This commit is contained in:
j3d1 2023-12-06 14:56:00 +01:00
parent a1d41430bf
commit 77a5b56642
7 changed files with 80 additions and 66 deletions

View file

@ -116,7 +116,6 @@ class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
# envelope.content = b'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: 3@test\nIn-Reply-To: 2@localhost\n\ntest'
envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n'
f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest')
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
@ -134,65 +133,3 @@ class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test
self.assertEqual(IssueThread.objects.all()[0].name, 'test')
self.assertEqual(IssueThread.objects.all()[0].state, 'new')
self.assertEqual(IssueThread.objects.all()[0].assigned_to, None)
# class AsyncLMTPTestCase(TestCase):
#
# def setUp(self):
# server = mock.Mock()
# self.create_unix_server = make_mocked_coro(server)
# self.wait_closed = make_mocked_coro()
# self.loop = asyncio.new_event_loop()
# self.loop.create_unix_server = self.create_unix_server
# asyncio.set_event_loop(self.loop)
#
# async def test_connect(self):
# server = await UnixSocketLMTPController(LMTPHandler(), unix_socket='lmtp.sock', loop=self.loop).serve()
# self.assertEqual(self.create_unix_server.call_count, 1)
# self.assertEqual(self.wait_closed.call_count, 0)
# server.close()
# # self.assertEqual(self.wait_closed.call_count, 1)
#
# def test_receive_mail(self):
# from logging import getLogger
# from aiosmtpd.lmtp import LMTP
# from asgiref.sync import async_to_sync
# log = getLogger('mail.log')
# log.addHandler(logging.StreamHandler())
# log.setLevel(logging.DEBUG)
# handler = LMTP(LMTPHandler(), loop=self.loop)
# transport = mock.Mock()
# handler.connection_made(transport)
#
# def _handle_client():
# print("Handling client")
# async_to_sync(handler._handle_client)()
# print("Client handled")
#
# thread = threading.Thread(target=_handle_client)
# thread.start()
#
# # handler.data_received(
# # b'HELO test\nMAIL FROM:<test1@test>\nRCPT TO:<test2@test>\nDATA\nSubject: test\nFrom: test1@test\nTo: '
# # b'test2@test\n\ntest\n.\nQUIT')
# handler.data_received(b'HELO test\n')
# handler.data_received(b'MAIL FROM:<test1@test>\n')
# handler.data_received(b'RCPT TO:<test2@test>\n')
# handler.data_received(b'DATA\n')
# handler.data_received(b'Subject: test\n')
# handler.data_received(b'From: test1@test\n')
# handler.data_received(b'To: test2@test\n')
# handler.data_received(b'\n')
# handler.data_received(b'test\n')
# handler.data_received(b'.\n')
# handler.data_received(b'QUIT\n')
#
# thread.join()
#
# handler.connection_lost(None)
# thread.join()
#
# # self.assertEqual(len(Email.objects.all()), 1)
# # self.assertEqual(Email.objects.all()[0].subject, 'test')
# # self.assertEqual(Email.objects.all()[0].body, 'test')
# # self.assertEqual(Email.objects.all()[0].sender, 'test')
# # self.assertEqual(Email.objects.all()[0].recipient, 'test@test')