import inspect from unittest import mock from django.test import TestCase, Client from django.contrib.auth.models import Permission from knox.models import AuthToken from authentication.models import ExtendedUser from core.settings import MAIL_DOMAIN from inventory.models import Event from mail.models import Email, EventAddress, EmailAttachment from mail.protocol import LMTPHandler from tickets.models import IssueThread, StateChange def make_mocked_coro(return_value=mock.sentinel, raise_exception=mock.sentinel): async def mock_coro(*args, **kwargs): if raise_exception is not mock.sentinel: raise raise_exception if not inspect.isawaitable(return_value): return return_value await return_value return mock.Mock(wraps=mock_coro) class EmailsApiTest(TestCase): def setUp(self): super().setUp() self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test') self.user.user_permissions.add(*Permission.objects.all()) self.user.save() self.token = AuthToken.objects.create(user=self.user) self.client = Client(headers={'Authorization': 'Token ' + self.token[1]}) def test_mails(self): Event.objects.get_or_create( name="Test event", slug="test-event", ) Email.objects.create( subject='test', body='test', sender='test', recipient='test', ) response = self.client.get('/api/2/mails/') self.assertEqual(response.status_code, 200) self.assertEqual(len(response.json()), 1) self.assertEqual(response.json()[0]['subject'], 'test') self.assertEqual(response.json()[0]['body'], 'test') self.assertEqual(response.json()[0]['sender'], 'test') self.assertEqual(response.json()[0]['recipient'], 'test') def test_mails_empty(self): response = self.client.get('/api/2/mails/') self.assertEqual(response.status_code, 200) self.assertEqual(response.json(), []) class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test def setUp(self): super().setUp() self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test') self.user.user_permissions.add(*Permission.objects.all()) self.user.save() self.token = AuthToken.objects.create(user=self.user) self.client = Client(headers={'Authorization': 'Token ' + self.token[1]}) def test_handle_client(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'Subject: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\ntest' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('test', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual('Message received', Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual('Thank you for your message.', Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) def test_handle_quoted_printable(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'Subject: test =?utf-8?Q?=C3=A4?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Quoted-Printable-Kodierung: =?utf-8?Q?=C3=A4=C3=B6=C3=BC=C3=9F?=' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('test ä', Email.objects.all()[0].subject) self.assertEqual('Text mit Quoted-Printable-Kodierung: äöüß', Email.objects.all()[0].body) def test_handle_quoted_printable_2(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'Subject: =?UTF-8?Q?suche_M=C3=BCtze?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Quoted-Printable-Kodierung: =?utf-8?Q?=C3=A4=C3=B6=C3=BC=C3=9F?=' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('suche_Mütze', Email.objects.all()[0].subject) self.assertEqual('Text mit Quoted-Printable-Kodierung: äöüß', Email.objects.all()[0].body) def test_handle_base64(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'Subject: =?utf-8?B?dGVzdA==?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Base64-Kodierung: =?utf-8?B?w6TDtsO8w58=?=' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('Text mit Base64-Kodierung: äöüß', Email.objects.all()[0].body) def test_handle_client_reply(self): issue_thread = IssueThread.objects.create( name="test", ) mail1 = Email.objects.create( subject='test subject', body='test', sender='test1@test', recipient='test2@test', issue_thread=issue_thread, ) mail1_reply = Email.objects.create( subject='Message received', body='Thank you for your message.', sender='test2@test', recipient='test1@test', in_reply_to=mail1.reference, issue_thread=issue_thread, ) from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n' f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest') result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 3) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_not_called() self.assertEqual(Email.objects.all()[2].subject, 'Re: test') self.assertEqual(Email.objects.all()[2].sender, 'test1@test') self.assertEqual(Email.objects.all()[2].recipient, 'test2@test') self.assertEqual(Email.objects.all()[2].body, 'test') self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread) self.assertEqual(Email.objects.all()[2].reference, '<3@test>') self.assertEqual(Email.objects.all()[2].in_reply_to, mail1_reply.reference) self.assertEqual(IssueThread.objects.all()[0].name, 'test') self.assertEqual(IssueThread.objects.all()[0].state, 'pending_new') self.assertEqual(IssueThread.objects.all()[0].assigned_to, None) def test_handle_client_reply_2(self): issue_thread = IssueThread.objects.create( name="test", ) mail1 = Email.objects.create( subject='test subject', body='test', sender='test1@test', recipient='test2@test', issue_thread=issue_thread, ) mail1_reply = Email.objects.create( subject='Message received', body='Thank you for your message.', sender='test2@test', recipient='test1@test', in_reply_to=mail1.reference, issue_thread=issue_thread, ) StateChange.objects.create( issue_thread=issue_thread, state='waiting_details', ) self.assertEqual(IssueThread.objects.all()[0].state, 'waiting_details') from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n' f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest') result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 3) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_not_called() self.assertEqual(Email.objects.all()[2].subject, 'Re: test') self.assertEqual(Email.objects.all()[2].sender, 'test1@test') self.assertEqual(Email.objects.all()[2].recipient, 'test2@test') self.assertEqual(Email.objects.all()[2].body, 'test') self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread) self.assertEqual(Email.objects.all()[2].reference, '<3@test>') self.assertEqual(Email.objects.all()[2].in_reply_to, mail1_reply.reference) self.assertEqual(IssueThread.objects.all()[0].name, 'test') self.assertEqual(IssueThread.objects.all()[0].state, 'pending_open') self.assertEqual(IssueThread.objects.all()[0].assigned_to, None) def test_mail_reply(self): issue_thread = IssueThread.objects.create( name="test", ) mail1 = Email.objects.create( subject='test subject', body='test', sender='test1@test', recipient='test2@' + MAIL_DOMAIN, issue_thread=issue_thread, ) mail1_reply = Email.objects.create( subject='Re: test subject', body='Thank you for your message.', sender='test2@' + MAIL_DOMAIN, recipient='test1@test', in_reply_to=mail1.reference, issue_thread=issue_thread, ) import aiosmtplib aiosmtplib.send = make_mocked_coro() response = self.client.post(f'/api/2/tickets/{issue_thread.id}/reply/', { 'message': 'test' }) self.assertEqual(response.status_code, 201) self.assertEqual(len(Email.objects.all()), 3) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual(Email.objects.all()[2].subject, 'Re: test subject') self.assertEqual(Email.objects.all()[2].sender, 'test2@' + MAIL_DOMAIN) self.assertEqual(Email.objects.all()[2].recipient, 'test1@test') self.assertEqual(Email.objects.all()[2].body, 'test') self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread) self.assertTrue(Email.objects.all()[2].reference.startswith("<")) self.assertTrue(Email.objects.all()[2].reference.endswith("@localhost>")) self.assertEqual(Email.objects.all()[2].in_reply_to, mail1.reference) def test_match_event(self): event = Event.objects.create( name="Test event", slug="test-event", ) event_address = EventAddress.objects.create( event=event, address="test_event@localhost", ) from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test_event@localhost'] envelope.content = b'Subject: test\nFrom: test1@test\nTo: test_event@localhost\nMessage-ID: <1@test>\n\ntest' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual(event, Email.objects.all()[0].event) self.assertEqual(event, Email.objects.all()[1].event) self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('Message received', Email.objects.all()[1].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test_event@localhost', Email.objects.all()[0].recipient) self.assertEqual('test_event@localhost', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual('test', Email.objects.all()[0].body) self.assertEqual('Thank you for your message.', Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) self.assertEqual(1, len(IssueThread.objects.all())) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) def test_split_text_inline_image(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'''Subject: test From: test1@test To: test2@test Message-ID: <1@test> Content-Type: multipart/alternative; boundary="abc" --abc Content-Type: text/plain; charset=utf-8 test1 --abc Content-Type: image/jpeg; name="test.jpg" Content-Disposition: inline; filename="test.jpg" Content-Transfer-Encoding: base64 Content-ID: <1> X-Attachment-Id: 1 dGVzdGltYWdl --abc Content-Type: text/plain; charset=utf-8 test2 --abc--''' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('test1\ntest2\n', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual('Message received', Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual('Thank you for your message.', Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) self.assertEqual(1, len(EmailAttachment.objects.all())) self.assertEqual(1, EmailAttachment.objects.all()[0].id) self.assertEqual('image/jpeg', EmailAttachment.objects.all()[0].mime_type) self.assertEqual('test.jpg', EmailAttachment.objects.all()[0].name) file_content = EmailAttachment.objects.all()[0].file.read() self.assertEqual(b'testimage', file_content) def test_text_with_attachment(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'''Subject: test From: test1@test To: test2@test Message-ID: <1@test> Content-Type: multipart/mixed; boundary="abc" --abc Content-Type: text/plain; charset=utf-8 test1 --abc Content-Type: image/jpeg; name="test.jpg" Content-Disposition: attachment; filename="test.jpg" Content-Transfer-Encoding: base64 Content-ID: <1> X-Attachment-Id: 1 dGVzdGltYWdl --abc--''' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('test1\n', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual('Message received', Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual('Thank you for your message.', Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) self.assertEqual(1, len(EmailAttachment.objects.all())) self.assertEqual(1, EmailAttachment.objects.all()[0].id) self.assertEqual('image/jpeg', EmailAttachment.objects.all()[0].mime_type) self.assertEqual('test.jpg', EmailAttachment.objects.all()[0].name) file_content = EmailAttachment.objects.all()[0].file.read() self.assertEqual(b'testimage', file_content) def test_mail_noreply(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'noreply@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'Subject: test\nFrom: noreply@test\nTo: test2@test\nMessage-ID: <1@test>\n\ntest' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 1) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_not_called() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('noreply@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('test', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state)