import inspect
from unittest import mock

from django.test import TestCase, Client

from inventory.models import Event
from mail.models import Email
from mail.protocol import LMTPHandler
from tickets.models import IssueThread

# Kept for backward compatibility with anything importing it from this module;
# the tests below use the per-test ``self.client`` that TestCase provides.
client = Client()


def make_mocked_coro(return_value=mock.sentinel, raise_exception=mock.sentinel):
    """Build a ``mock.Mock`` that wraps a coroutine function.

    The returned mock records calls like any other ``Mock``; calling it yields
    a coroutine that behaves as follows when awaited:

    * if ``raise_exception`` was supplied, that exception is raised;
    * otherwise, if ``return_value`` is not awaitable, it is returned as-is;
    * otherwise ``return_value`` is awaited and its result is discarded, so
      the coroutine returns ``None``.

    ``mock.sentinel`` serves as the "argument not supplied" marker for both
    parameters.
    """

    async def mock_coro(*args, **kwargs):
        if raise_exception is not mock.sentinel:
            raise raise_exception
        if not inspect.isawaitable(return_value):
            return return_value
        await return_value

    return mock.Mock(wraps=mock_coro)


class EmailsApiTest(TestCase):
    """Tests for the ``/api/2/mails/`` listing endpoint."""

    def test_mails(self):
        """A stored email is returned by the endpoint with its fields intact."""
        Event.objects.get_or_create(
            name="Test event",
            slug="test-event",
        )
        Email.objects.create(
            subject='test',
            body='test',
            sender='test',
            recipient='test',
        )

        response = self.client.get('/api/2/mails/')
        self.assertEqual(response.status_code, 200)

        # Parse the payload once instead of re-decoding it per assertion.
        payload = response.json()
        self.assertEqual(len(payload), 1)
        mail = payload[0]
        self.assertEqual(mail['subject'], 'test')
        self.assertEqual(mail['body'], 'test')
        self.assertEqual(mail['sender'], 'test')
        self.assertEqual(mail['recipient'], 'test')

    def test_mails_empty(self):
        """With no emails stored the endpoint returns an empty list."""
        response = self.client.get('/api/2/mails/')
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json(), [])


class LMTPHandlerTestCase(TestCase):
    """Tests that feed envelopes directly into ``LMTPHandler.handle_DATA``."""

    def _deliver(self, content):
        """Pass one message through ``LMTPHandler.handle_DATA``.

        The envelope always goes from ``test1@test`` to ``test2@test`` and
        carries ``content`` (bytes) as the raw message.

        ``aiosmtplib.send`` is replaced with a coroutine mock only for the
        duration of the call and restored afterwards, so the patch cannot leak
        into other tests.

        Returns a ``(result, send_mock)`` tuple: the value returned by
        ``handle_DATA`` and the mock that stood in for ``aiosmtplib.send``.
        """
        # Imported lazily, as in the original tests.
        from aiosmtpd.smtp import Envelope
        from asgiref.sync import async_to_sync
        import aiosmtplib

        envelope = Envelope()
        envelope.mail_from = 'test1@test'
        envelope.rcpt_tos = ['test2@test']
        envelope.content = content

        handler = LMTPHandler()
        server = mock.Mock()
        session = mock.Mock()
        send_mock = make_mocked_coro()
        with mock.patch.object(aiosmtplib, 'send', send_mock):
            result = async_to_sync(handler.handle_DATA)(server, session, envelope)
        return result, send_mock

    # TODO replace with less hacky test
    def test_handle_client(self):
        """A fresh message creates a thread, is stored, and gets an auto-reply."""
        result, send_mock = self._deliver(
            b'Subject: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\ntest'
        )

        self.assertEqual(result, '250 Message accepted for delivery')

        # Fetch each table once so every assertion sees the same snapshot.
        emails = list(Email.objects.all())
        threads = list(IssueThread.objects.all())
        self.assertEqual(len(emails), 2)
        self.assertEqual(len(threads), 1)
        send_mock.assert_called_once()

        incoming, auto_reply = emails
        thread = threads[0]

        self.assertEqual(incoming.subject, 'test')
        self.assertEqual(incoming.sender, 'test1@test')
        self.assertEqual(incoming.recipient, 'test2@test')
        self.assertEqual(incoming.body, 'test')
        self.assertEqual(incoming.issue_thread, thread)
        self.assertEqual(incoming.reference, '<1@test>')
        self.assertIsNone(incoming.in_reply_to)

        self.assertEqual(auto_reply.subject, 'Message received')
        self.assertEqual(auto_reply.sender, 'test2@test')
        self.assertEqual(auto_reply.recipient, 'test1@test')
        self.assertEqual(auto_reply.body, 'Thank you for your message.')
        self.assertEqual(auto_reply.issue_thread, thread)
        self.assertTrue(auto_reply.reference.startswith("<"))
        self.assertTrue(auto_reply.reference.endswith("@localhost>"))
        self.assertEqual(auto_reply.in_reply_to, "<1@test>")

        self.assertEqual(thread.name, 'test')
        self.assertEqual(thread.state, 'new')
        self.assertIsNone(thread.assigned_to)

    def test_handle_client_reply(self):
        """A reply to a known message is attached to the existing thread."""
        issue_thread = IssueThread.objects.create(
            name="test",
        )
        mail1 = Email.objects.create(
            subject='test subject',
            body='test',
            sender='test1@test',
            recipient='test2@test',
            issue_thread=issue_thread,
        )
        mail1_reply = Email.objects.create(
            subject='Message received',
            body='Thank you for your message.',
            sender='test2@test',
            recipient='test1@test',
            in_reply_to=mail1.reference,
            issue_thread=issue_thread,
        )

        # envelope.content = b'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: 3@test\nIn-Reply-To: 2@localhost\n\ntest'
        content = (
            'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n'
            f'In-Reply-To: {mail1_reply.reference}\n\ntest'
        ).encode('utf-8')
        result, send_mock = self._deliver(content)

        self.assertEqual(result, '250 Message accepted for delivery')

        # Fetch each table once so every assertion sees the same snapshot.
        emails = list(Email.objects.all())
        threads = list(IssueThread.objects.all())
        self.assertEqual(len(emails), 4)
        self.assertEqual(len(threads), 1)
        send_mock.assert_called_once()

        # Entries 0 and 1 are the fixtures created above.
        incoming, auto_reply = emails[2], emails[3]
        thread = threads[0]

        self.assertEqual(incoming.subject, 'Re: test')
        self.assertEqual(incoming.sender, 'test1@test')
        self.assertEqual(incoming.recipient, 'test2@test')
        self.assertEqual(incoming.body, 'test')
        self.assertEqual(incoming.issue_thread, issue_thread)
        self.assertEqual(incoming.reference, '<3@test>')
        self.assertEqual(incoming.in_reply_to, mail1_reply.reference)

        self.assertEqual(auto_reply.subject, 'Message received')
        self.assertEqual(auto_reply.sender, 'test2@test')
        self.assertEqual(auto_reply.recipient, 'test1@test')
        self.assertEqual(auto_reply.body, 'Thank you for your message.')
        self.assertEqual(auto_reply.issue_thread, issue_thread)
        self.assertTrue(auto_reply.reference.startswith("<"))
        self.assertTrue(auto_reply.reference.endswith("@localhost>"))
        self.assertEqual(auto_reply.in_reply_to, "<3@test>")

        self.assertEqual(thread.name, 'test')
        self.assertEqual(thread.state, 'new')
        self.assertIsNone(thread.assigned_to)

# class AsyncLMTPTestCase(TestCase):
#
#     def setUp(self):
#         server = mock.Mock()
#         self.create_unix_server = make_mocked_coro(server)
#         self.wait_closed = make_mocked_coro()
#         self.loop = asyncio.new_event_loop()
#         self.loop.create_unix_server = self.create_unix_server
#         asyncio.set_event_loop(self.loop)
#
#     async def test_connect(self):
#         server = await UnixSocketLMTPController(LMTPHandler(), unix_socket='lmtp.sock', loop=self.loop).serve()
#         self.assertEqual(self.create_unix_server.call_count, 1)
#         self.assertEqual(self.wait_closed.call_count, 0)
#         server.close()
#         # self.assertEqual(self.wait_closed.call_count, 1)
#
#     def test_receive_mail(self):
#         from logging import getLogger
#         from aiosmtpd.lmtp import LMTP
#         from asgiref.sync import async_to_sync
#         log = getLogger('mail.log')
#         log.addHandler(logging.StreamHandler())
#         log.setLevel(logging.DEBUG)
#         handler = LMTP(LMTPHandler(), loop=self.loop)
#         transport = mock.Mock()
#         handler.connection_made(transport)
#
#         def _handle_client():
#             print("Handling client")
#             async_to_sync(handler._handle_client)()
#             print("Client handled")
#
#         thread = threading.Thread(target=_handle_client)
#         thread.start()
#
#         # handler.data_received(
#         #     b'HELO test\nMAIL FROM:\nRCPT TO:\nDATA\nSubject: test\nFrom: test1@test\nTo: '
#         #     b'test2@test\n\ntest\n.\nQUIT')
#         handler.data_received(b'HELO test\n')
#         handler.data_received(b'MAIL FROM:\n')
#         handler.data_received(b'RCPT TO:\n')
#         handler.data_received(b'DATA\n')
#         handler.data_received(b'Subject: test\n')
#         handler.data_received(b'From: test1@test\n')
#         handler.data_received(b'To: test2@test\n')
#         handler.data_received(b'\n')
#         handler.data_received(b'test\n')
#         handler.data_received(b'.\n')
#         handler.data_received(b'QUIT\n')
#         # thread.join()
#         # handler.connection_lost(None)
#         thread.join()
#
#         # self.assertEqual(len(Email.objects.all()), 1)
#         # self.assertEqual(Email.objects.all()[0].subject, 'test')
#         # self.assertEqual(Email.objects.all()[0].body, 'test')
#         # self.assertEqual(Email.objects.all()[0].sender, 'test')
#         # self.assertEqual(Email.objects.all()[0].recipient, 'test@test')