wip
This commit is contained in:
parent
0f911589ca
commit
5bdfe313de
65 changed files with 2219 additions and 77 deletions
198
core/mail/tests/v2/test_mails.py
Normal file
198
core/mail/tests/v2/test_mails.py
Normal file
|
@ -0,0 +1,198 @@
|
|||
import inspect
|
||||
from unittest import mock
|
||||
|
||||
from django.test import TestCase, Client
|
||||
|
||||
from inventory.models import Event
|
||||
from mail.models import Email
|
||||
from mail.protocol import LMTPHandler
|
||||
from tickets.models import IssueThread
|
||||
|
||||
client = Client()
|
||||
|
||||
|
||||
def make_mocked_coro(return_value=mock.sentinel, raise_exception=mock.sentinel):
|
||||
async def mock_coro(*args, **kwargs):
|
||||
if raise_exception is not mock.sentinel:
|
||||
raise raise_exception
|
||||
if not inspect.isawaitable(return_value):
|
||||
return return_value
|
||||
await return_value
|
||||
|
||||
return mock.Mock(wraps=mock_coro)
|
||||
|
||||
|
||||
class EmailsApiTest(TestCase):
|
||||
|
||||
def test_mails(self):
|
||||
Event.objects.get_or_create(
|
||||
name="Test event",
|
||||
slug="test-event",
|
||||
)
|
||||
Email.objects.create(
|
||||
subject='test',
|
||||
body='test',
|
||||
sender='test',
|
||||
recipient='test',
|
||||
)
|
||||
response = client.get('/api/2/mails/')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(response.json()), 1)
|
||||
self.assertEqual(response.json()[0]['subject'], 'test')
|
||||
self.assertEqual(response.json()[0]['body'], 'test')
|
||||
self.assertEqual(response.json()[0]['sender'], 'test')
|
||||
self.assertEqual(response.json()[0]['recipient'], 'test')
|
||||
|
||||
def test_mails_empty(self):
|
||||
response = client.get('/api/2/mails/')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.json(), [])
|
||||
|
||||
|
||||
class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test
|
||||
|
||||
def test_handle_client(self):
|
||||
from aiosmtpd.smtp import Envelope
|
||||
from asgiref.sync import async_to_sync
|
||||
import aiosmtplib
|
||||
aiosmtplib.send = make_mocked_coro()
|
||||
handler = LMTPHandler()
|
||||
server = mock.Mock()
|
||||
session = mock.Mock()
|
||||
envelope = Envelope()
|
||||
envelope.mail_from = 'test1@test'
|
||||
envelope.rcpt_tos = ['test2@test']
|
||||
envelope.content = b'Subject: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\ntest'
|
||||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||||
self.assertEqual(result, '250 Message accepted for delivery')
|
||||
self.assertEqual(len(Email.objects.all()), 2)
|
||||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||||
aiosmtplib.send.assert_called_once()
|
||||
self.assertEqual(Email.objects.all()[0].subject, 'test')
|
||||
self.assertEqual(Email.objects.all()[0].sender, 'test1@test')
|
||||
self.assertEqual(Email.objects.all()[0].recipient, 'test2@test')
|
||||
self.assertEqual(Email.objects.all()[0].body, 'test')
|
||||
self.assertEqual(Email.objects.all()[0].issue_thread, IssueThread.objects.all()[0])
|
||||
self.assertEqual(Email.objects.all()[0].reference, '<1@test>')
|
||||
self.assertEqual(Email.objects.all()[0].in_reply_to, None)
|
||||
self.assertEqual(Email.objects.all()[1].subject, 'Message received')
|
||||
self.assertEqual(Email.objects.all()[1].sender, 'test2@test')
|
||||
self.assertEqual(Email.objects.all()[1].recipient, 'test1@test')
|
||||
self.assertEqual(Email.objects.all()[1].body, 'Thank you for your message.')
|
||||
self.assertEqual(Email.objects.all()[1].issue_thread, IssueThread.objects.all()[0])
|
||||
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
|
||||
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
|
||||
self.assertEqual(Email.objects.all()[1].in_reply_to, "<1@test>")
|
||||
|
||||
def test_handle_client_reply(self):
|
||||
issue_thread = IssueThread.objects.create()
|
||||
mail1 = Email.objects.create(
|
||||
subject='test',
|
||||
body='test',
|
||||
sender='test1@test',
|
||||
recipient='test2@test',
|
||||
issue_thread=issue_thread,
|
||||
)
|
||||
mail1_reply = Email.objects.create(
|
||||
subject='Message received',
|
||||
body='Thank you for your message.',
|
||||
sender='test2@test',
|
||||
recipient='test1@test',
|
||||
in_reply_to=mail1.reference,
|
||||
issue_thread=issue_thread,
|
||||
)
|
||||
from aiosmtpd.smtp import Envelope
|
||||
from asgiref.sync import async_to_sync
|
||||
import aiosmtplib
|
||||
aiosmtplib.send = make_mocked_coro()
|
||||
handler = LMTPHandler()
|
||||
server = mock.Mock()
|
||||
session = mock.Mock()
|
||||
envelope = Envelope()
|
||||
envelope.mail_from = 'test1@test'
|
||||
envelope.rcpt_tos = ['test2@test']
|
||||
# envelope.content = b'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: 3@test\nIn-Reply-To: 2@localhost\n\ntest'
|
||||
envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n'
|
||||
f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest')
|
||||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||||
self.assertEqual(result, '250 Message accepted for delivery')
|
||||
self.assertEqual(len(Email.objects.all()), 4)
|
||||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||||
aiosmtplib.send.assert_called_once()
|
||||
self.assertEqual(Email.objects.all()[2].subject, 'Re: test')
|
||||
self.assertEqual(Email.objects.all()[2].sender, 'test1@test')
|
||||
self.assertEqual(Email.objects.all()[2].recipient, 'test2@test')
|
||||
self.assertEqual(Email.objects.all()[2].body, 'test')
|
||||
self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread)
|
||||
self.assertEqual(Email.objects.all()[2].reference, '<3@test>')
|
||||
self.assertEqual(Email.objects.all()[2].in_reply_to, mail1_reply.reference)
|
||||
self.assertEqual(Email.objects.all()[3].subject, 'Message received')
|
||||
self.assertEqual(Email.objects.all()[3].sender, 'test2@test')
|
||||
self.assertEqual(Email.objects.all()[3].recipient, 'test1@test')
|
||||
self.assertEqual(Email.objects.all()[3].body, 'Thank you for your message.')
|
||||
self.assertEqual(Email.objects.all()[3].issue_thread, issue_thread)
|
||||
self.assertTrue(Email.objects.all()[3].reference.startswith("<"))
|
||||
self.assertTrue(Email.objects.all()[3].reference.endswith("@localhost>"))
|
||||
self.assertEqual(Email.objects.all()[3].in_reply_to, "<3@test>")
|
||||
|
||||
# class AsyncLMTPTestCase(TestCase):
|
||||
#
|
||||
# def setUp(self):
|
||||
# server = mock.Mock()
|
||||
# self.create_unix_server = make_mocked_coro(server)
|
||||
# self.wait_closed = make_mocked_coro()
|
||||
# self.loop = asyncio.new_event_loop()
|
||||
# self.loop.create_unix_server = self.create_unix_server
|
||||
# asyncio.set_event_loop(self.loop)
|
||||
#
|
||||
# async def test_connect(self):
|
||||
# server = await UnixSocketLMTPController(LMTPHandler(), unix_socket='lmtp.sock', loop=self.loop).serve()
|
||||
# self.assertEqual(self.create_unix_server.call_count, 1)
|
||||
# self.assertEqual(self.wait_closed.call_count, 0)
|
||||
# server.close()
|
||||
# # self.assertEqual(self.wait_closed.call_count, 1)
|
||||
#
|
||||
# def test_receive_mail(self):
|
||||
# from logging import getLogger
|
||||
# from aiosmtpd.lmtp import LMTP
|
||||
# from asgiref.sync import async_to_sync
|
||||
# log = getLogger('mail.log')
|
||||
# log.addHandler(logging.StreamHandler())
|
||||
# log.setLevel(logging.DEBUG)
|
||||
# handler = LMTP(LMTPHandler(), loop=self.loop)
|
||||
# transport = mock.Mock()
|
||||
# handler.connection_made(transport)
|
||||
#
|
||||
# def _handle_client():
|
||||
# print("Handling client")
|
||||
# async_to_sync(handler._handle_client)()
|
||||
# print("Client handled")
|
||||
#
|
||||
# thread = threading.Thread(target=_handle_client)
|
||||
# thread.start()
|
||||
#
|
||||
# # handler.data_received(
|
||||
# # b'HELO test\nMAIL FROM:<test1@test>\nRCPT TO:<test2@test>\nDATA\nSubject: test\nFrom: test1@test\nTo: '
|
||||
# # b'test2@test\n\ntest\n.\nQUIT')
|
||||
# handler.data_received(b'HELO test\n')
|
||||
# handler.data_received(b'MAIL FROM:<test1@test>\n')
|
||||
# handler.data_received(b'RCPT TO:<test2@test>\n')
|
||||
# handler.data_received(b'DATA\n')
|
||||
# handler.data_received(b'Subject: test\n')
|
||||
# handler.data_received(b'From: test1@test\n')
|
||||
# handler.data_received(b'To: test2@test\n')
|
||||
# handler.data_received(b'\n')
|
||||
# handler.data_received(b'test\n')
|
||||
# handler.data_received(b'.\n')
|
||||
# handler.data_received(b'QUIT\n')
|
||||
#
|
||||
# thread.join()
|
||||
#
|
||||
# handler.connection_lost(None)
|
||||
# thread.join()
|
||||
#
|
||||
# # self.assertEqual(len(Email.objects.all()), 1)
|
||||
# # self.assertEqual(Email.objects.all()[0].subject, 'test')
|
||||
# # self.assertEqual(Email.objects.all()[0].body, 'test')
|
||||
# # self.assertEqual(Email.objects.all()[0].sender, 'test')
|
||||
# # self.assertEqual(Email.objects.all()[0].recipient, 'test@test')
|
Loading…
Add table
Add a link
Reference in a new issue