680 lines
33 KiB
Python
680 lines
33 KiB
Python
import inspect
|
||
from unittest import mock
|
||
|
||
from django.test import TestCase, Client
|
||
from django.contrib.auth.models import Permission
|
||
from knox.models import AuthToken
|
||
|
||
from authentication.models import ExtendedUser
|
||
from core.settings import MAIL_DOMAIN
|
||
from inventory.models import Event
|
||
from mail.models import Email, EventAddress, EmailAttachment
|
||
from mail.protocol import LMTPHandler
|
||
from tickets.models import IssueThread, StateChange
|
||
|
||
expected_auto_reply_subject = 'Re: {} [#{}]'
|
||
|
||
expected_auto_reply = '''Your request (#{}) has been received and will be reviewed by our lost&found angels.
|
||
|
||
We are reviewing incoming requests during the event and teardown. Immediately after the event, expect a delay as the \
|
||
workload is high. We will not forget about your request and get back in touch once we have updated information on your \
|
||
request. Requests for devices, wallets, credit cards or similar items will be handled with priority.
|
||
|
||
If you happen to find your lost item or just want to add additional information, please reply to this email. Please \
|
||
do not create a new request.
|
||
|
||
Your c3lf (Cloakroom + Lost&Found) Team'''
|
||
|
||
|
||
def make_mocked_coro(return_value=mock.sentinel, raise_exception=mock.sentinel):
|
||
async def mock_coro(*args, **kwargs):
|
||
if raise_exception is not mock.sentinel:
|
||
raise raise_exception
|
||
if not inspect.isawaitable(return_value):
|
||
return return_value
|
||
await return_value
|
||
|
||
return mock.Mock(wraps=mock_coro)
|
||
|
||
|
||
class EmailsApiTest(TestCase):
|
||
|
||
def setUp(self):
|
||
super().setUp()
|
||
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
|
||
self.user.user_permissions.add(*Permission.objects.all())
|
||
self.user.save()
|
||
self.token = AuthToken.objects.create(user=self.user)
|
||
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
|
||
|
||
def test_mails(self):
|
||
Event.objects.get_or_create(
|
||
name="Test event",
|
||
slug="test-event",
|
||
)
|
||
Email.objects.create(
|
||
subject='test',
|
||
body='test',
|
||
sender='test',
|
||
recipient='test',
|
||
)
|
||
response = self.client.get('/api/2/mails/')
|
||
self.assertEqual(response.status_code, 200)
|
||
self.assertEqual(len(response.json()), 1)
|
||
self.assertEqual(response.json()[0]['subject'], 'test')
|
||
self.assertEqual(response.json()[0]['body'], 'test')
|
||
self.assertEqual(response.json()[0]['sender'], 'test')
|
||
self.assertEqual(response.json()[0]['recipient'], 'test')
|
||
|
||
def test_mails_empty(self):
|
||
response = self.client.get('/api/2/mails/')
|
||
self.assertEqual(response.status_code, 200)
|
||
self.assertEqual(response.json(), [])
|
||
|
||
|
||
class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test
|
||
|
||
def setUp(self):
|
||
super().setUp()
|
||
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
|
||
self.user.user_permissions.add(*Permission.objects.all())
|
||
self.user.save()
|
||
self.token = AuthToken.objects.create(user=self.user)
|
||
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
|
||
|
||
def test_handle_client(self):
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'test1@test'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = b'Subject: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\ntest'
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
self.assertEqual(result, '250 Message accepted for delivery')
|
||
self.assertEqual(len(Email.objects.all()), 2)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_called_once()
|
||
self.assertEqual('test', Email.objects.all()[0].subject)
|
||
self.assertEqual('test1@test', Email.objects.all()[0].sender)
|
||
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
|
||
self.assertEqual('test', Email.objects.all()[0].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
|
||
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
|
||
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
|
||
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].subject)
|
||
self.assertEqual('test2@test', Email.objects.all()[1].sender)
|
||
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
|
||
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
|
||
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
|
||
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
|
||
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
|
||
self.assertEqual('test', IssueThread.objects.all()[0].name)
|
||
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
|
||
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
|
||
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
|
||
self.assertEqual(1, len(states))
|
||
self.assertEqual('pending_new', states[0].state)
|
||
|
||
def test_handle_quoted_printable(self):
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'test1@test'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = b'Subject: test =?utf-8?Q?=C3=A4?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Quoted-Printable-Kodierung: =?utf-8?Q?=C3=A4=C3=B6=C3=BC=C3=9F?='
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
self.assertEqual(result, '250 Message accepted for delivery')
|
||
self.assertEqual(len(Email.objects.all()), 2)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_called_once()
|
||
self.assertEqual('test ä', Email.objects.all()[0].subject)
|
||
self.assertEqual('Text mit Quoted-Printable-Kodierung: äöüß', Email.objects.all()[0].body)
|
||
|
||
def test_handle_quoted_printable_2(self):
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'test1@test'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = b'Subject: =?UTF-8?Q?suche_M=C3=BCtze?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Quoted-Printable-Kodierung: =?utf-8?Q?=C3=A4=C3=B6=C3=BC=C3=9F?='
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
self.assertEqual(result, '250 Message accepted for delivery')
|
||
self.assertEqual(len(Email.objects.all()), 2)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_called_once()
|
||
self.assertEqual('suche_Mütze', Email.objects.all()[0].subject)
|
||
self.assertEqual('Text mit Quoted-Printable-Kodierung: äöüß', Email.objects.all()[0].body)
|
||
|
||
def test_handle_base64(self):
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'test1@test'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = b'Subject: =?utf-8?B?dGVzdA==?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Base64-Kodierung: =?utf-8?B?w6TDtsO8w58=?='
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
self.assertEqual(result, '250 Message accepted for delivery')
|
||
self.assertEqual(len(Email.objects.all()), 2)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_called_once()
|
||
self.assertEqual('test', Email.objects.all()[0].subject)
|
||
self.assertEqual('Text mit Base64-Kodierung: äöüß', Email.objects.all()[0].body)
|
||
|
||
def test_handle_client_reply(self):
|
||
issue_thread = IssueThread.objects.create(
|
||
name="test",
|
||
)
|
||
mail1 = Email.objects.create(
|
||
subject='test subject',
|
||
body='test',
|
||
sender='test1@test',
|
||
recipient='test2@test',
|
||
issue_thread=issue_thread,
|
||
)
|
||
mail1_reply = Email.objects.create(
|
||
subject='Message received',
|
||
body='Thank you for your message.',
|
||
sender='test2@test',
|
||
recipient='test1@test',
|
||
in_reply_to=mail1.reference,
|
||
issue_thread=issue_thread,
|
||
)
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'test1@test'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n'
|
||
f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest')
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
self.assertEqual(result, '250 Message accepted for delivery')
|
||
self.assertEqual(len(Email.objects.all()), 3)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_not_called()
|
||
self.assertEqual(Email.objects.all()[2].subject, 'Re: test')
|
||
self.assertEqual(Email.objects.all()[2].sender, 'test1@test')
|
||
self.assertEqual(Email.objects.all()[2].recipient, 'test2@test')
|
||
self.assertEqual(Email.objects.all()[2].body, 'test')
|
||
self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread)
|
||
self.assertEqual(Email.objects.all()[2].reference, '<3@test>')
|
||
self.assertEqual(Email.objects.all()[2].in_reply_to, mail1_reply.reference)
|
||
self.assertEqual(IssueThread.objects.all()[0].name, 'test')
|
||
self.assertEqual(IssueThread.objects.all()[0].state, 'pending_new')
|
||
self.assertEqual(IssueThread.objects.all()[0].assigned_to, None)
|
||
|
||
def test_handle_client_reply_2(self):
|
||
issue_thread = IssueThread.objects.create(
|
||
name="test",
|
||
)
|
||
mail1 = Email.objects.create(
|
||
subject='test subject',
|
||
body='test',
|
||
sender='test1@test',
|
||
recipient='test2@test',
|
||
issue_thread=issue_thread,
|
||
)
|
||
mail1_reply = Email.objects.create(
|
||
subject='Message received',
|
||
body='Thank you for your message.',
|
||
sender='test2@test',
|
||
recipient='test1@test',
|
||
in_reply_to=mail1.reference,
|
||
issue_thread=issue_thread,
|
||
)
|
||
StateChange.objects.create(
|
||
issue_thread=issue_thread,
|
||
state='waiting_details',
|
||
)
|
||
self.assertEqual(IssueThread.objects.all()[0].state, 'waiting_details')
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'test1@test'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n'
|
||
f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest')
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
self.assertEqual(result, '250 Message accepted for delivery')
|
||
self.assertEqual(len(Email.objects.all()), 3)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_not_called()
|
||
self.assertEqual(Email.objects.all()[2].subject, 'Re: test')
|
||
self.assertEqual(Email.objects.all()[2].sender, 'test1@test')
|
||
self.assertEqual(Email.objects.all()[2].recipient, 'test2@test')
|
||
self.assertEqual(Email.objects.all()[2].body, 'test')
|
||
self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread)
|
||
self.assertEqual(Email.objects.all()[2].reference, '<3@test>')
|
||
self.assertEqual(Email.objects.all()[2].in_reply_to, mail1_reply.reference)
|
||
self.assertEqual(IssueThread.objects.all()[0].name, 'test')
|
||
self.assertEqual(IssueThread.objects.all()[0].state, 'pending_open')
|
||
self.assertEqual(IssueThread.objects.all()[0].assigned_to, None)
|
||
|
||
def test_mail_reply(self):
|
||
issue_thread = IssueThread.objects.create(
|
||
name="test",
|
||
)
|
||
mail1 = Email.objects.create(
|
||
subject='test subject',
|
||
body='test',
|
||
sender='test1@test',
|
||
recipient='test2@' + MAIL_DOMAIN,
|
||
issue_thread=issue_thread,
|
||
)
|
||
mail1_reply = Email.objects.create(
|
||
subject='Re: test subject',
|
||
body='Thank you for your message.',
|
||
sender='test2@' + MAIL_DOMAIN,
|
||
recipient='test1@test',
|
||
in_reply_to=mail1.reference,
|
||
issue_thread=issue_thread,
|
||
)
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
response = self.client.post(f'/api/2/tickets/{issue_thread.id}/reply/', {
|
||
'message': 'test'
|
||
})
|
||
self.assertEqual(response.status_code, 201)
|
||
self.assertEqual(len(Email.objects.all()), 3)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_called_once()
|
||
self.assertEqual(Email.objects.all()[2].subject, 'Re: test subject')
|
||
self.assertEqual(Email.objects.all()[2].sender, 'test2@' + MAIL_DOMAIN)
|
||
self.assertEqual(Email.objects.all()[2].recipient, 'test1@test')
|
||
self.assertEqual(Email.objects.all()[2].body, 'test')
|
||
self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread)
|
||
self.assertTrue(Email.objects.all()[2].reference.startswith("<"))
|
||
self.assertTrue(Email.objects.all()[2].reference.endswith("@localhost>"))
|
||
self.assertEqual(Email.objects.all()[2].in_reply_to, mail1.reference)
|
||
|
||
def test_match_event(self):
|
||
event = Event.objects.create(
|
||
name="Test event",
|
||
slug="test-event",
|
||
)
|
||
event_address = EventAddress.objects.create(
|
||
event=event,
|
||
address="test_event@localhost",
|
||
)
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'test1@test'
|
||
envelope.rcpt_tos = ['test_event@localhost']
|
||
envelope.content = b'Subject: test\nFrom: test1@test\nTo: test_event@localhost\nMessage-ID: <1@test>\n\ntest'
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
self.assertEqual(result, '250 Message accepted for delivery')
|
||
self.assertEqual(len(Email.objects.all()), 2)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_called_once()
|
||
self.assertEqual(event, Email.objects.all()[0].event)
|
||
self.assertEqual(event, Email.objects.all()[1].event)
|
||
self.assertEqual('test', Email.objects.all()[0].subject)
|
||
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].subject)
|
||
self.assertEqual('test1@test', Email.objects.all()[0].sender)
|
||
self.assertEqual('test_event@localhost', Email.objects.all()[0].recipient)
|
||
self.assertEqual('test_event@localhost', Email.objects.all()[1].sender)
|
||
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
|
||
self.assertEqual('test', Email.objects.all()[0].body)
|
||
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
|
||
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
|
||
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
|
||
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
|
||
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
|
||
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
|
||
self.assertEqual('test', IssueThread.objects.all()[0].name)
|
||
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
|
||
self.assertEqual(1, len(IssueThread.objects.all()))
|
||
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
|
||
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
|
||
self.assertEqual(1, len(states))
|
||
self.assertEqual('pending_new', states[0].state)
|
||
|
||
def test_split_text_inline_image(self):
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'test1@test'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = b'''Subject: test
|
||
From: test1@test
|
||
To: test2@test
|
||
Message-ID: <1@test>
|
||
Content-Type: multipart/alternative; boundary="abc"
|
||
|
||
--abc
|
||
Content-Type: text/plain; charset=utf-8
|
||
|
||
test1
|
||
|
||
--abc
|
||
Content-Type: image/jpeg; name="test.jpg"
|
||
Content-Disposition: inline; filename="test.jpg"
|
||
Content-Transfer-Encoding: base64
|
||
Content-ID: <1>
|
||
X-Attachment-Id: 1
|
||
|
||
dGVzdGltYWdl
|
||
|
||
--abc
|
||
Content-Type: text/plain; charset=utf-8
|
||
|
||
test2
|
||
|
||
--abc--'''
|
||
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
self.assertEqual(result, '250 Message accepted for delivery')
|
||
self.assertEqual(len(Email.objects.all()), 2)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_called_once()
|
||
self.assertEqual('test', Email.objects.all()[0].subject)
|
||
self.assertEqual('test1@test', Email.objects.all()[0].sender)
|
||
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
|
||
self.assertEqual('test1\n<img src="cid:1">test2\n', Email.objects.all()[0].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
|
||
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
|
||
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
|
||
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].subject)
|
||
self.assertEqual('test2@test', Email.objects.all()[1].sender)
|
||
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
|
||
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
|
||
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
|
||
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
|
||
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
|
||
self.assertEqual('test', IssueThread.objects.all()[0].name)
|
||
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
|
||
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
|
||
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
|
||
self.assertEqual(1, len(states))
|
||
self.assertEqual('pending_new', states[0].state)
|
||
self.assertEqual(1, len(EmailAttachment.objects.all()))
|
||
self.assertEqual(1, EmailAttachment.objects.all()[0].id)
|
||
self.assertEqual('image/jpeg', EmailAttachment.objects.all()[0].mime_type)
|
||
self.assertEqual('test.jpg', EmailAttachment.objects.all()[0].name)
|
||
file_content = EmailAttachment.objects.all()[0].file.read()
|
||
self.assertEqual(b'testimage', file_content)
|
||
|
||
def test_text_with_attachment(self):
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'test1@test'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = b'''Subject: test
|
||
From: test1@test
|
||
To: test2@test
|
||
Message-ID: <1@test>
|
||
Content-Type: multipart/mixed; boundary="abc"
|
||
|
||
--abc
|
||
Content-Type: text/plain; charset=utf-8
|
||
|
||
test1
|
||
|
||
--abc
|
||
Content-Type: image/jpeg; name="test.jpg"
|
||
Content-Disposition: attachment; filename="test.jpg"
|
||
Content-Transfer-Encoding: base64
|
||
Content-ID: <1>
|
||
X-Attachment-Id: 1
|
||
|
||
dGVzdGltYWdl
|
||
|
||
--abc--'''
|
||
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
self.assertEqual(result, '250 Message accepted for delivery')
|
||
self.assertEqual(len(Email.objects.all()), 2)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_called_once()
|
||
self.assertEqual('test', Email.objects.all()[0].subject)
|
||
self.assertEqual('test1@test', Email.objects.all()[0].sender)
|
||
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
|
||
self.assertEqual('test1\n', Email.objects.all()[0].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
|
||
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
|
||
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
|
||
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].subject)
|
||
self.assertEqual('test2@test', Email.objects.all()[1].sender)
|
||
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
|
||
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
|
||
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
|
||
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
|
||
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
|
||
self.assertEqual('test', IssueThread.objects.all()[0].name)
|
||
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
|
||
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
|
||
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
|
||
self.assertEqual(1, len(states))
|
||
self.assertEqual('pending_new', states[0].state)
|
||
self.assertEqual(1, len(EmailAttachment.objects.all()))
|
||
self.assertEqual(1, EmailAttachment.objects.all()[0].id)
|
||
self.assertEqual('image/jpeg', EmailAttachment.objects.all()[0].mime_type)
|
||
self.assertEqual('test.jpg', EmailAttachment.objects.all()[0].name)
|
||
file_content = EmailAttachment.objects.all()[0].file.read()
|
||
self.assertEqual(b'testimage', file_content)
|
||
|
||
def test_mail_noreply(self):
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'noreply@test'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = b'Subject: test\nFrom: noreply@test\nTo: test2@test\nMessage-ID: <1@test>\n\ntest'
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
self.assertEqual(result, '250 Message accepted for delivery')
|
||
self.assertEqual(len(Email.objects.all()), 1)
|
||
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
aiosmtplib.send.assert_not_called()
|
||
self.assertEqual('test', Email.objects.all()[0].subject)
|
||
self.assertEqual('noreply@test', Email.objects.all()[0].sender)
|
||
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
|
||
self.assertEqual('test', Email.objects.all()[0].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
|
||
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
|
||
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
|
||
self.assertEqual('test', IssueThread.objects.all()[0].name)
|
||
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
|
||
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
|
||
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
|
||
self.assertEqual(1, len(states))
|
||
self.assertEqual('pending_new', states[0].state)
|
||
|
||
def test_mail_empty_subject(self):
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
import logging
|
||
logging.disable(logging.CRITICAL)
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = 'test1@test'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = b'From: noreply@test\nTo: test2@test\nMessage-ID: <1@test>\n\ntest'
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
logging.disable(logging.NOTSET)
|
||
self.assertEqual('250 Message accepted for delivery', result)
|
||
self.assertEqual(2, len(Email.objects.all()))
|
||
self.assertEqual(1, len(IssueThread.objects.all()))
|
||
aiosmtplib.send.assert_called_once()
|
||
self.assertEqual('No subject', Email.objects.all()[0].subject)
|
||
self.assertEqual('test1@test', Email.objects.all()[0].sender)
|
||
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
|
||
self.assertEqual('test', Email.objects.all()[0].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
|
||
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
|
||
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
|
||
self.assertEqual(expected_auto_reply_subject.format('No subject', IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].subject)
|
||
self.assertEqual('test2@test', Email.objects.all()[1].sender)
|
||
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
|
||
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
|
||
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
|
||
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
|
||
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
|
||
self.assertEqual('No subject', IssueThread.objects.all()[0].name)
|
||
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
|
||
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
|
||
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
|
||
self.assertEqual(1, len(states))
|
||
self.assertEqual('pending_new', states[0].state)
|
||
|
||
def test_mail_empty_body(self):
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
import logging
|
||
logging.disable(logging.CRITICAL)
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = '<test1@test>'
|
||
envelope.rcpt_tos = ['test2@test']
|
||
envelope.content = b'Subject: test\nFrom: <test1@test>\nTo: test2@test\nMessage-ID: <1@test>\n\n'
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
logging.disable(logging.NOTSET)
|
||
self.assertEqual('250 Message accepted for delivery', result)
|
||
self.assertEqual(2, len(Email.objects.all()))
|
||
self.assertEqual(1, len(IssueThread.objects.all()))
|
||
aiosmtplib.send.assert_called_once()
|
||
self.assertEqual('test', Email.objects.all()[0].subject)
|
||
self.assertEqual('<test1@test>', Email.objects.all()[0].sender)
|
||
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
|
||
self.assertEqual('', Email.objects.all()[0].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
|
||
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
|
||
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
|
||
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].subject)
|
||
self.assertEqual('test2@test', Email.objects.all()[1].sender)
|
||
self.assertEqual('<test1@test>', Email.objects.all()[1].recipient)
|
||
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
|
||
Email.objects.all()[1].body)
|
||
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
|
||
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
|
||
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
|
||
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
|
||
self.assertEqual('test', IssueThread.objects.all()[0].name)
|
||
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
|
||
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
|
||
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
|
||
self.assertEqual(1, len(states))
|
||
self.assertEqual('pending_new', states[0].state)
|
||
|
||
def test_mail_plus_issue_thread(self):
|
||
issue_thread = IssueThread.objects.create(
|
||
name="test",
|
||
)
|
||
mail1 = Email.objects.create(
|
||
subject='test subject',
|
||
body='test',
|
||
sender='test1@test',
|
||
recipient='test2@test',
|
||
issue_thread=issue_thread,
|
||
)
|
||
mail1_reply = Email.objects.create(
|
||
subject='Message received',
|
||
body='Thank you for your message.',
|
||
sender='test2@test',
|
||
recipient='test1@test',
|
||
in_reply_to=mail1.reference,
|
||
issue_thread=issue_thread,
|
||
)
|
||
from aiosmtpd.smtp import Envelope
|
||
from asgiref.sync import async_to_sync
|
||
import aiosmtplib
|
||
import logging
|
||
logging.disable(logging.CRITICAL)
|
||
aiosmtplib.send = make_mocked_coro()
|
||
handler = LMTPHandler()
|
||
server = mock.Mock()
|
||
session = mock.Mock()
|
||
envelope = Envelope()
|
||
envelope.mail_from = '<test1@test>'
|
||
envelope.rcpt_tos = ['ticket+{}@test'.format(issue_thread.uuid)]
|
||
envelope.content = (f'Subject: foo\nFrom: <test3@test>\nTo: ticket+{issue_thread.uuid}@test\n'
|
||
f'Message-ID: <3@test>\n\nbar'.encode('utf-8'))
|
||
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
|
||
logging.disable(logging.NOTSET)
|
||
self.assertEqual('250 Message accepted for delivery', result)
|
||
self.assertEqual(3, len(Email.objects.all()))
|
||
self.assertEqual(3, len(Email.objects.filter(issue_thread=issue_thread)))
|
||
self.assertEqual(1, len(IssueThread.objects.all()))
|
||
aiosmtplib.send.assert_not_called()
|
||
self.assertEqual(Email.objects.all()[2].subject, 'foo')
|
||
self.assertEqual(Email.objects.all()[2].sender, '<test1@test>')
|
||
self.assertEqual(Email.objects.all()[2].recipient, 'ticket+{}@test'.format(issue_thread.uuid))
|
||
self.assertEqual(Email.objects.all()[2].body, 'bar')
|
||
self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread)
|
||
self.assertEqual(Email.objects.all()[2].reference, '<3@test>')
|
||
self.assertEqual('test', IssueThread.objects.all()[0].name)
|