import inspect from unittest import mock from django.test import TestCase, Client from django.contrib.auth.models import Permission from knox.models import AuthToken from authentication.models import ExtendedUser from core.settings import MAIL_DOMAIN from inventory.models import Event from mail.models import Email, EventAddress, EmailAttachment from mail.protocol import LMTPHandler from tickets.models import IssueThread, StateChange expected_auto_reply_subject = 'Re: {} [#{}]' expected_auto_reply = '''Your request (#{}) has been received and will be reviewed by our lost&found angels. We are reviewing incoming requests during the event and teardown. Immediately after the event, expect a delay as the \ workload is high. We will not forget about your request and get back in touch once we have updated information on your \ request. Requests for devices, wallets, credit cards or similar items will be handled with priority. If you happen to find your lost item or just want to add additional information, please reply to this email. Please \ do not create a new request. Your c3lf (Cloakroom + Lost&Found) Team''' def make_mocked_coro(return_value=mock.sentinel, raise_exception=mock.sentinel): async def mock_coro(*args, **kwargs): if raise_exception is not mock.sentinel: raise raise_exception if not inspect.isawaitable(return_value): return return_value await return_value return mock.Mock(wraps=mock_coro) class EmailsApiTest(TestCase): def setUp(self): super().setUp() self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test') self.user.user_permissions.add(*Permission.objects.all()) self.user.save() self.token = AuthToken.objects.create(user=self.user) self.client = Client(headers={'Authorization': 'Token ' + self.token[1]}) def test_mails(self): Event.objects.get_or_create( name="Test event", slug="test-event", ) Email.objects.create( subject='test', body='test', sender='test', recipient='test', ) response = self.client.get('/api/2/mails/') self.assertEqual(response.status_code, 200) self.assertEqual(len(response.json()), 1) self.assertEqual(response.json()[0]['subject'], 'test') self.assertEqual(response.json()[0]['body'], 'test') self.assertEqual(response.json()[0]['sender'], 'test') self.assertEqual(response.json()[0]['recipient'], 'test') def test_mails_empty(self): response = self.client.get('/api/2/mails/') self.assertEqual(response.status_code, 200) self.assertEqual(response.json(), []) class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test def setUp(self): super().setUp() self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test') self.user.user_permissions.add(*Permission.objects.all()) self.user.save() self.token = AuthToken.objects.create(user=self.user) self.client = Client(headers={'Authorization': 'Token ' + self.token[1]}) def test_handle_client(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@localhost'] envelope.content = b'Subject: test\nFrom: test3@test\nTo: test4@localhost\nMessage-ID: <1@test>\n\ntest' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@localhost', Email.objects.all()[0].recipient) self.assertEqual('test', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].subject) self.assertEqual('test2@localhost', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) def test_handle_quoted_printable(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'Subject: test =?utf-8?Q?=C3=A4?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Quoted-Printable-Kodierung: =?utf-8?Q?=C3=A4=C3=B6=C3=BC=C3=9F?=' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('test ä', Email.objects.all()[0].subject) self.assertEqual('Text mit Quoted-Printable-Kodierung: äöüß', Email.objects.all()[0].body) def test_handle_quoted_printable_2(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'Subject: =?UTF-8?Q?suche_M=C3=BCtze?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Quoted-Printable-Kodierung: =?utf-8?Q?=C3=A4=C3=B6=C3=BC=C3=9F?=' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('suche_Mütze', Email.objects.all()[0].subject) self.assertEqual('Text mit Quoted-Printable-Kodierung: äöüß', Email.objects.all()[0].body) def test_handle_base64(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'Subject: =?utf-8?B?dGVzdA==?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Base64-Kodierung: =?utf-8?B?w6TDtsO8w58=?=' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('Text mit Base64-Kodierung: äöüß', Email.objects.all()[0].body) def test_handle_client_reply(self): issue_thread = IssueThread.objects.create( name="test", ) mail1 = Email.objects.create( subject='test subject', body='test', sender='test1@test', recipient='test2@test', issue_thread=issue_thread, ) mail1_reply = Email.objects.create( subject='Message received', body='Thank you for your message.', sender='test2@test', recipient='test1@test', in_reply_to=mail1.reference, issue_thread=issue_thread, ) from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n' f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest') result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 3) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_not_called() self.assertEqual(Email.objects.all()[2].subject, 'Re: test') self.assertEqual(Email.objects.all()[2].sender, 'test1@test') self.assertEqual(Email.objects.all()[2].recipient, 'test2@test') self.assertEqual(Email.objects.all()[2].body, 'test') self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread) self.assertEqual(Email.objects.all()[2].reference, '<3@test>') self.assertEqual(Email.objects.all()[2].in_reply_to, mail1_reply.reference) self.assertEqual(IssueThread.objects.all()[0].name, 'test') self.assertEqual(IssueThread.objects.all()[0].state, 'pending_new') self.assertEqual(IssueThread.objects.all()[0].assigned_to, None) def test_handle_client_reply_2(self): issue_thread = IssueThread.objects.create( name="test", ) mail1 = Email.objects.create( subject='test subject', body='test', sender='test1@test', recipient='test2@test', issue_thread=issue_thread, ) mail1_reply = Email.objects.create( subject='Message received', body='Thank you for your message.', sender='test2@test', recipient='test1@test', in_reply_to=mail1.reference, issue_thread=issue_thread, ) StateChange.objects.create( issue_thread=issue_thread, state='waiting_details', ) self.assertEqual(IssueThread.objects.all()[0].state, 'waiting_details') from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n' f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest') result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 3) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_not_called() self.assertEqual(Email.objects.all()[2].subject, 'Re: test') self.assertEqual(Email.objects.all()[2].sender, 'test1@test') self.assertEqual(Email.objects.all()[2].recipient, 'test2@test') self.assertEqual(Email.objects.all()[2].body, 'test') self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread) self.assertEqual(Email.objects.all()[2].reference, '<3@test>') self.assertEqual(Email.objects.all()[2].in_reply_to, mail1_reply.reference) self.assertEqual(IssueThread.objects.all()[0].name, 'test') self.assertEqual(IssueThread.objects.all()[0].state, 'pending_open') self.assertEqual(IssueThread.objects.all()[0].assigned_to, None) def test_mail_reply(self): issue_thread = IssueThread.objects.create( name="test subject", ) mail1 = Email.objects.create( subject='test subject', body='test', sender='test1@test', recipient='test2@localhost', issue_thread=issue_thread, ) mail1_reply = Email.objects.create( subject='Message received', body='Thank you for your message.', sender='test2@localhost', recipient='test1@test', in_reply_to=mail1.reference, issue_thread=issue_thread, ) import aiosmtplib aiosmtplib.send = make_mocked_coro() response = self.client.post(f'/api/2/tickets/{issue_thread.id}/reply/', { 'message': 'test' }) self.assertEqual(response.status_code, 201) self.assertEqual(len(Email.objects.all()), 3) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual(Email.objects.all()[0].subject, 'test subject') self.assertEqual(Email.objects.all()[0].sender, 'test1@test') self.assertEqual(Email.objects.all()[0].recipient, 'test2@localhost') self.assertEqual(Email.objects.all()[0].body, 'test') self.assertEqual(Email.objects.all()[0].issue_thread, issue_thread) self.assertEqual(Email.objects.all()[0].reference, mail1.reference) self.assertEqual(Email.objects.all()[1].subject, 'Message received') self.assertEqual(Email.objects.all()[1].sender, 'test2@localhost') self.assertEqual(Email.objects.all()[1].recipient, 'test1@test') self.assertEqual(Email.objects.all()[1].body, 'Thank you for your message.') self.assertEqual(Email.objects.all()[1].issue_thread, issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual(Email.objects.all()[1].in_reply_to, mail1.reference) self.assertEqual(Email.objects.all()[2].subject, 'Re: test subject [#{0}]'.format(issue_thread.short_uuid())) self.assertEqual(Email.objects.all()[2].sender, 'test2@localhost') self.assertEqual(Email.objects.all()[2].recipient, 'test1@test') self.assertEqual(Email.objects.all()[2].body, 'test') self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread) self.assertTrue(Email.objects.all()[2].reference.startswith("<")) self.assertTrue(Email.objects.all()[2].reference.endswith("@localhost>")) self.assertEqual(Email.objects.all()[2].in_reply_to, mail1.reference) def test_match_event(self): event = Event.objects.create( name="Test event", slug="test-event", ) event_address = EventAddress.objects.create( event=event, address="test_event@localhost", ) from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test_event@localhost'] envelope.content = b'Subject: test\nFrom: test1@test\nTo: test_event@localhost\nMessage-ID: <1@test>\n\ntest' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual(event, Email.objects.all()[0].event) self.assertEqual(event, Email.objects.all()[1].event) self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test_event@localhost', Email.objects.all()[0].recipient) self.assertEqual('test_event@localhost', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual('test', Email.objects.all()[0].body) self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) self.assertEqual(1, len(IssueThread.objects.all())) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) def test_mail_html_body(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'''Subject: test From: test1@test To: test2@test Message-ID: <1@test> Content-Type: text/html; charset=utf-8

test

''' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) self.assertEqual(len(EmailAttachment.objects.all()), 0) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('test', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) def test_split_text_inline_image(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'''Subject: test From: test1@test To: test2@test Message-ID: <1@test> Content-Type: multipart/alternative; boundary="abc" --abc Content-Type: text/plain; charset=utf-8 test1 --abc Content-Type: image/jpeg; name="test.jpg" Content-Disposition: inline; filename="test.jpg" Content-Transfer-Encoding: base64 Content-ID: <1> X-Attachment-Id: 1 dGVzdGltYWdl --abc Content-Type: text/plain; charset=utf-8 test2 --abc--''' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('test1\ntest2\n', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) self.assertEqual(1, len(EmailAttachment.objects.all())) self.assertEqual(1, EmailAttachment.objects.all()[0].id) self.assertEqual('image/jpeg', EmailAttachment.objects.all()[0].mime_type) self.assertEqual('test.jpg', EmailAttachment.objects.all()[0].name) file_content = EmailAttachment.objects.all()[0].file.read() self.assertEqual(b'testimage', file_content) def test_text_with_attachment(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'''Subject: test From: test1@test To: test2@test Message-ID: <1@test> Content-Type: multipart/mixed; boundary="abc" --abc Content-Type: text/plain; charset=utf-8 test1 --abc Content-Type: image/jpeg; name="test.jpg" Content-Disposition: attachment; filename="test.jpg" Content-Transfer-Encoding: base64 Content-ID: <1> X-Attachment-Id: 1 dGVzdGltYWdl --abc--''' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 2) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('test1\n', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) self.assertEqual(1, len(EmailAttachment.objects.all())) self.assertEqual(1, EmailAttachment.objects.all()[0].id) self.assertEqual('image/jpeg', EmailAttachment.objects.all()[0].mime_type) self.assertEqual('test.jpg', EmailAttachment.objects.all()[0].name) file_content = EmailAttachment.objects.all()[0].file.read() self.assertEqual(b'testimage', file_content) def test_mail_noreply(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'noreply@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'Subject: test\nFrom: noreply@test\nTo: test2@test\nMessage-ID: <1@test>\n\ntest' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual(result, '250 Message accepted for delivery') self.assertEqual(len(Email.objects.all()), 1) self.assertEqual(len(IssueThread.objects.all()), 1) aiosmtplib.send.assert_not_called() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('noreply@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('test', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) def test_mail_empty_subject(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib import logging logging.disable(logging.CRITICAL) aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'From: noreply@test\nTo: test2@test\nMessage-ID: <1@test>\n\ntest' result = async_to_sync(handler.handle_DATA)(server, session, envelope) logging.disable(logging.NOTSET) self.assertEqual('250 Message accepted for delivery', result) self.assertEqual(2, len(Email.objects.all())) self.assertEqual(1, len(IssueThread.objects.all())) aiosmtplib.send.assert_called_once() self.assertEqual('No subject', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('test', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual(expected_auto_reply_subject.format('No subject', IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('No subject', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) def test_mail_empty_body(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib import logging logging.disable(logging.CRITICAL) aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = '' envelope.rcpt_tos = ['test2@test'] envelope.content = b'Subject: test\nFrom: \nTo: test2@test\nMessage-ID: <1@test>\n\n' result = async_to_sync(handler.handle_DATA)(server, session, envelope) logging.disable(logging.NOTSET) self.assertEqual('250 Message accepted for delivery', result) self.assertEqual(2, len(Email.objects.all())) self.assertEqual(1, len(IssueThread.objects.all())) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('', Email.objects.all()[1].recipient) self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) def test_mail_plus_issue_thread(self): issue_thread = IssueThread.objects.create( name="test subject", ) mail1 = Email.objects.create( subject='test subject', body='test', sender='test1@test', recipient='test2@localhost', issue_thread=issue_thread, ) mail2 = Email.objects.create( subject='Message received', body='Thank you for your message.', sender='test2@localhost', recipient='test1@test', in_reply_to=mail1.reference, issue_thread=issue_thread, ) Email.objects.create( subject='Re: Message received', body='any updates?', sender='test3@test', recipient='test2@localhost', in_reply_to=mail2.reference, issue_thread=issue_thread, ) from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib import logging logging.disable(logging.CRITICAL) aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = '' envelope.rcpt_tos = ['ticket+{}@test'.format(issue_thread.uuid)] envelope.content = (f'Subject: foo\nFrom: \nTo: ticket+{issue_thread.uuid}@test\n' f'Message-ID: <3@test>\n\nbar'.encode('utf-8')) result = async_to_sync(handler.handle_DATA)(server, session, envelope) logging.disable(logging.NOTSET) self.assertEqual('250 Message accepted for delivery', result) self.assertEqual(4, len(Email.objects.all())) self.assertEqual(4, len(Email.objects.filter(issue_thread=issue_thread))) self.assertEqual(1, len(IssueThread.objects.all())) aiosmtplib.send.assert_not_called() self.assertEqual(Email.objects.all()[3].subject, 'foo') self.assertEqual(Email.objects.all()[3].sender, '') self.assertEqual(Email.objects.all()[3].recipient, 'ticket+{}@test'.format(issue_thread.uuid)) self.assertEqual(Email.objects.all()[3].body, 'bar') self.assertEqual(Email.objects.all()[3].issue_thread, issue_thread) self.assertEqual(Email.objects.all()[3].reference, '<3@test>') self.assertEqual('test subject', IssueThread.objects.all()[0].name) response = self.client.post(f'/api/2/tickets/{issue_thread.id}/reply/', { 'message': 'test' }) self.assertEqual(response.status_code, 201) self.assertEqual(5, len(Email.objects.all())) self.assertEqual(5, len(Email.objects.filter(issue_thread=issue_thread))) self.assertEqual(1, len(IssueThread.objects.all())) self.assertEqual(Email.objects.all()[4].subject, 'Re: test subject [#{0}]'.format(issue_thread.short_uuid())) self.assertEqual(Email.objects.all()[4].sender, 'test2@localhost') self.assertEqual(Email.objects.all()[4].recipient, 'test1@test') self.assertEqual(Email.objects.all()[4].body, 'test') self.assertEqual(Email.objects.all()[4].issue_thread, issue_thread) self.assertTrue(Email.objects.all()[4].reference.startswith("<")) self.assertTrue(Email.objects.all()[4].reference.endswith("@localhost>")) self.assertEqual(Email.objects.all()[4].in_reply_to, mail1.reference) self.assertEqual('test subject', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) aiosmtplib.send.assert_called_once() def test_mail_4byte_unicode_emoji(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'''Subject: test From: test1@test To: test2@test Message-ID: <1@test> Content-Type: text/html; charset=utf-8 thank you =?utf-8?Q?=F0=9F=98=8A?=''' # thank you 😊 result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual('250 Message accepted for delivery', result) self.assertEqual(2, len(Email.objects.all())) self.assertEqual(1, len(IssueThread.objects.all())) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('thank you 😊', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) def test_mail_non_utf8(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'''Subject: test From: test1@test To: test2@test Message-ID: <1@test> Content-Type: text/html; charset=iso-8859-1 hello \xe4\xf6\xfc''' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual('250 Message accepted for delivery', result) self.assertEqual(2, len(Email.objects.all())) self.assertEqual(1, len(IssueThread.objects.all())) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('hello äöü', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) def test_mail_quoted_printable_transfer_encoding(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync import aiosmtplib aiosmtplib.send = make_mocked_coro() handler = LMTPHandler() server = mock.Mock() session = mock.Mock() envelope = Envelope() envelope.mail_from = 'test1@test' envelope.rcpt_tos = ['test2@test'] envelope.content = b'''Subject: test From: test1@test To: test2@test Message-ID: <1@test> Content-Type: text/html; charset=utf-8 Content-Transfer-Encoding: quoted-printable hello =C3=A4=C3=B6=C3=BC''' result = async_to_sync(handler.handle_DATA)(server, session, envelope) self.assertEqual('250 Message accepted for delivery', result) self.assertEqual(2, len(Email.objects.all())) self.assertEqual(1, len(IssueThread.objects.all())) aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('test1@test', Email.objects.all()[0].sender) self.assertEqual('test2@test', Email.objects.all()[0].recipient) self.assertEqual('hello äöü', Email.objects.all()[0].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) self.assertEqual('<1@test>', Email.objects.all()[0].reference) self.assertEqual(None, Email.objects.all()[0].in_reply_to) self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].subject) self.assertEqual('test2@test', Email.objects.all()[1].sender) self.assertEqual('test1@test', Email.objects.all()[1].recipient) self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), Email.objects.all()[1].body) self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) self.assertTrue(Email.objects.all()[1].reference.startswith("<")) self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) self.assertEqual('test', IssueThread.objects.all()[0].name) self.assertEqual('pending_new', IssueThread.objects.all()[0].state) self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state)