add notify_sessions module

This commit is contained in:
j3d1 2024-01-07 21:16:34 +01:00
parent fc05c7c1d8
commit 434dfe807e
7 changed files with 189 additions and 10 deletions

View file

View file

@ -0,0 +1,66 @@
from django.test import TestCase
from channels.testing import WebsocketCommunicator
from notify_sessions.consumers import NotifyConsumer
from asgiref.sync import async_to_sync
class NotifyWebsocketTestCase(TestCase):
async def test_connect(self):
communicator = WebsocketCommunicator(NotifyConsumer.as_asgi(), "/ws/2/notify/")
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
await communicator.disconnect()
async def fut_send_message(self):
communicator = WebsocketCommunicator(NotifyConsumer.as_asgi(), "/ws/2/notify/")
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
await communicator.send_json_to({
"name": "foo",
"message": "bar",
})
response = await communicator.receive_json_from()
await communicator.disconnect()
return response
def test_send_message(self):
response = async_to_sync(self.fut_send_message)()
self.assertEqual(response["message"], "bar")
self.assertEqual(response["event_id"], 1)
self.assertEqual(response["name"], "send_message_to_frontend")
# events = SystemEvent.objects.all()
# self.assertEqual(len(events), 1)
# event = events[0]
# self.assertEqual(event.event_id, 1)
# self.assertEqual(event.name, "send_message_to_frontend")
# self.assertEqual(event.message, "bar")
async def fut_send_and_receive_message(self):
communicator1 = WebsocketCommunicator(NotifyConsumer.as_asgi(), "/ws/2/notify/")
communicator2 = WebsocketCommunicator(NotifyConsumer.as_asgi(), "/ws/2/notify/")
connected1, subprotocol1 = await communicator1.connect()
connected2, subprotocol2 = await communicator2.connect()
self.assertTrue(connected1)
self.assertTrue(connected2)
await communicator1.send_json_to({
"name": "foo",
"message": "bar",
})
response = await communicator2.receive_json_from()
await communicator1.disconnect()
await communicator2.disconnect()
return response
def test_send_and_receive_message(self):
response = async_to_sync(self.fut_send_and_receive_message)()
self.assertEqual(response["message"], "bar")
self.assertEqual(response["event_id"], 1)
self.assertEqual(response["name"], "send_message_to_frontend")
# events = SystemEvent.objects.all()
# self.assertEqual(len(events), 1)
# event = events[0]
# self.assertEqual(event.event_id, 1)
# self.assertEqual(event.name, "send_message_to_frontend")
# self.assertEqual(event.message, "bar")