191 lines
8.3 KiB
Python
191 lines
8.3 KiB
Python
|
import inspect
|
||
|
from unittest import mock
|
||
|
|
||
|
from django.test import TestCase, Client
|
||
|
from django.contrib.auth.models import Permission
|
||
|
from knox.models import AuthToken
|
||
|
|
||
|
from authentication.models import ExtendedUser
|
||
|
from core.settings import MAIL_DOMAIN
|
||
|
from inventory.models import Event
|
||
|
from mail.models import Email
|
||
|
from mail.protocol import LMTPHandler
|
||
|
from tickets.models import IssueThread
|
||
|
|
||
|
|
||
|
def make_mocked_coro(return_value=mock.sentinel, raise_exception=mock.sentinel):
    """Build a ``mock.Mock`` that wraps a coroutine function.

    Calling the returned mock records the call on the mock and hands back a
    coroutine object that must be awaited by the caller.

    Args:
        return_value: Value produced by the coroutine.  If it is itself
            awaitable it is awaited and its result is returned; otherwise it
            is returned unchanged.  When omitted, the coroutine returns the
            ``mock.sentinel`` object.
        raise_exception: Exception raised when the coroutine is awaited.
            ``mock.sentinel`` (the default) means "do not raise".

    Returns:
        A ``mock.Mock`` wrapping the coroutine function, so the usual
        ``assert_called_*`` helpers work on it.
    """

    async def mock_coro(*args, **kwargs):
        # ``mock.sentinel`` is used purely as an identity marker for "not set".
        if raise_exception is not mock.sentinel:
            raise raise_exception
        if inspect.isawaitable(return_value):
            # Propagate the awaited result instead of silently dropping it.
            return await return_value
        return return_value

    return mock.Mock(wraps=mock_coro)
|
||
|
|
||
|
|
||
|
class EmailsApiTest(TestCase):
    """Tests for the ``/api/2/mails/`` listing endpoint using a token-authenticated client."""

    def setUp(self):
        """Create a user holding every permission and a client that sends its knox token."""
        super().setUp()
        account = ExtendedUser.objects.create_user('testuser', 'test', 'test')
        account.user_permissions.add(*Permission.objects.all())
        account.save()
        self.user = account
        # ``AuthToken.objects.create`` result is indexed with [1] to obtain the token string.
        self.token = AuthToken.objects.create(user=account)
        auth_headers = {'Authorization': 'Token ' + self.token[1]}
        self.client = Client(headers=auth_headers)

    def test_mails(self):
        """A single stored email is returned by the endpoint with its fields intact."""
        # NOTE(review): the Event row is presumably needed by the endpoint or
        # the models involved — confirm; the assertions do not reference it.
        Event.objects.get_or_create(name="Test event", slug="test-event")
        Email.objects.create(subject='test', body='test', sender='test', recipient='test')

        response = self.client.get('/api/2/mails/')
        self.assertEqual(response.status_code, 200)

        payload = response.json()
        self.assertEqual(len(payload), 1)
        first = payload[0]
        for field in ('subject', 'body', 'sender', 'recipient'):
            self.assertEqual(first[field], 'test')

    def test_mails_empty(self):
        """With no emails stored the endpoint answers 200 with an empty list."""
        response = self.client.get('/api/2/mails/')
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json(), [])
|
||
|
|
||
|
|
||
|
class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test
|
||
|
|
||
|
def setUp(self):
    """Create a user holding every permission and a client that sends its knox token."""
    super().setUp()
    account = ExtendedUser.objects.create_user('testuser', 'test', 'test')
    account.user_permissions.add(*Permission.objects.all())
    account.save()
    self.user = account
    # ``AuthToken.objects.create`` result is indexed with [1] to obtain the token string.
    self.token = AuthToken.objects.create(user=account)
    auth_headers = {'Authorization': 'Token ' + self.token[1]}
    self.client = Client(headers=auth_headers)
|
||
|
|
||
|
def test_handle_client(self):
    """Feed a new message to ``LMTPHandler.handle_DATA`` and verify what gets stored.

    Expected outcome, as asserted below: the handler reports acceptance, one
    ``IssueThread`` and two ``Email`` rows exist (the incoming message and a
    'Message received' confirmation addressed back to the envelope sender),
    and ``aiosmtplib.send`` was invoked exactly once.
    """
    from aiosmtpd.smtp import Envelope
    from asgiref.sync import async_to_sync
    import aiosmtplib

    envelope = Envelope()
    envelope.mail_from = 'test1@test'
    envelope.rcpt_tos = ['test2@test']
    envelope.content = b'Subject: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\ntest'

    # Scope the stub to this call so the replaced ``aiosmtplib.send`` is
    # restored afterwards and cannot leak into other tests.
    with mock.patch.object(aiosmtplib, 'send', make_mocked_coro()) as send_mock:
        handler = LMTPHandler()
        server = mock.Mock()
        session = mock.Mock()
        result = async_to_sync(handler.handle_DATA)(server, session, envelope)

    self.assertEqual(result, '250 Message accepted for delivery')

    # Fetch each table once instead of re-querying for every assertion.
    emails = list(Email.objects.all())
    threads = list(IssueThread.objects.all())
    self.assertEqual(len(emails), 2)
    self.assertEqual(len(threads), 1)
    send_mock.assert_called_once()

    incoming, confirmation = emails
    thread = threads[0]

    # The stored addresses are the envelope ones, not the From/To headers.
    self.assertEqual(incoming.subject, 'test')
    self.assertEqual(incoming.sender, 'test1@test')
    self.assertEqual(incoming.recipient, 'test2@test')
    self.assertEqual(incoming.body, 'test')
    self.assertEqual(incoming.issue_thread, thread)
    self.assertEqual(incoming.reference, '<1@test>')
    self.assertIsNone(incoming.in_reply_to)

    self.assertEqual(confirmation.subject, 'Message received')
    self.assertEqual(confirmation.sender, 'test2@test')
    self.assertEqual(confirmation.recipient, 'test1@test')
    self.assertEqual(confirmation.body, 'Thank you for your message.')
    self.assertEqual(confirmation.issue_thread, thread)
    self.assertTrue(confirmation.reference.startswith("<"))
    self.assertTrue(confirmation.reference.endswith("@localhost>"))
    self.assertEqual(confirmation.in_reply_to, "<1@test>")

    self.assertEqual(thread.name, 'test')
    self.assertEqual(thread.state, 'new')
    self.assertIsNone(thread.assigned_to)
|
||
|
|
||
|
def test_handle_client_reply(self):
    """Feed a reply to an existing thread through ``LMTPHandler.handle_DATA``.

    A thread with an initial mail and its confirmation is created first.
    The incoming message carries an ``In-Reply-To`` header pointing at the
    confirmation's reference.  As asserted below, the message is stored on
    the existing thread (no new thread) and ``aiosmtplib.send`` is not called.
    """
    from aiosmtpd.smtp import Envelope
    from asgiref.sync import async_to_sync
    import aiosmtplib

    issue_thread = IssueThread.objects.create(
        name="test",
    )
    mail1 = Email.objects.create(
        subject='test subject',
        body='test',
        sender='test1@test',
        recipient='test2@test',
        issue_thread=issue_thread,
    )
    mail1_reply = Email.objects.create(
        subject='Message received',
        body='Thank you for your message.',
        sender='test2@test',
        recipient='test1@test',
        in_reply_to=mail1.reference,
        issue_thread=issue_thread,
    )

    envelope = Envelope()
    envelope.mail_from = 'test1@test'
    envelope.rcpt_tos = ['test2@test']
    headers = (
        f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n'
        f'In-Reply-To: {mail1_reply.reference}'
    )
    envelope.content = headers.encode('utf-8') + b'\n\ntest'

    # Scope the stub to this call so the replaced ``aiosmtplib.send`` is
    # restored afterwards and cannot leak into other tests.
    with mock.patch.object(aiosmtplib, 'send', make_mocked_coro()) as send_mock:
        handler = LMTPHandler()
        server = mock.Mock()
        session = mock.Mock()
        result = async_to_sync(handler.handle_DATA)(server, session, envelope)

    self.assertEqual(result, '250 Message accepted for delivery')

    # Fetch each table once instead of re-querying for every assertion.
    emails = list(Email.objects.all())
    threads = list(IssueThread.objects.all())
    self.assertEqual(len(emails), 3)
    self.assertEqual(len(threads), 1)
    send_mock.assert_not_called()

    received = emails[2]
    thread = threads[0]

    # The stored addresses are the envelope ones, not the From/To headers.
    self.assertEqual(received.subject, 'Re: test')
    self.assertEqual(received.sender, 'test1@test')
    self.assertEqual(received.recipient, 'test2@test')
    self.assertEqual(received.body, 'test')
    self.assertEqual(received.issue_thread, issue_thread)
    self.assertEqual(received.reference, '<3@test>')
    self.assertEqual(received.in_reply_to, mail1_reply.reference)

    self.assertEqual(thread.name, 'test')
    self.assertEqual(thread.state, 'new')
    self.assertIsNone(thread.assigned_to)
|
||
|
|
||
|
def test_mail_reply(self):
|
||
|
issue_thread = IssueThread.objects.create(
|
||
|
name="test",
|
||
|
)
|
||
|
mail1 = Email.objects.create(
|
||
|
subject='test subject',
|
||
|
body='test',
|
||
|
sender='test1@test',
|
||
|
recipient='test2@' + MAIL_DOMAIN,
|
||
|
issue_thread=issue_thread,
|
||
|
)
|
||
|
mail1_reply = Email.objects.create(
|
||
|
subject='Re: test subject',
|
||
|
body='Thank you for your message.',
|
||
|
sender='test2@' + MAIL_DOMAIN,
|
||
|
recipient='test1@test',
|
||
|
in_reply_to=mail1.reference,
|
||
|
issue_thread=issue_thread,
|
||
|
)
|
||
|
import aiosmtplib
|
||
|
aiosmtplib.send = make_mocked_coro()
|
||
|
response = self.client.post(f'/api/2/tickets/{issue_thread.id}/reply/', {
|
||
|
'message': 'test'
|
||
|
})
|
||
|
self.assertEqual(response.status_code, 201)
|
||
|
self.assertEqual(len(Email.objects.all()), 3)
|
||
|
self.assertEqual(len(IssueThread.objects.all()), 1)
|
||
|
aiosmtplib.send.assert_called_once()
|
||
|
self.assertEqual(Email.objects.all()[2].subject, 'Re: test subject')
|
||
|
self.assertEqual(Email.objects.all()[2].sender, 'test2@' + MAIL_DOMAIN)
|
||
|
self.assertEqual(Email.objects.all()[2].recipient, 'test1@test')
|
||
|
self.assertEqual(Email.objects.all()[2].body, 'test')
|
||
|
self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread)
|
||
|
self.assertTrue(Email.objects.all()[2].reference.startswith("<"))
|
||
|
self.assertTrue(Email.objects.all()[2].reference.endswith("@localhost>"))
|
||
|
self.assertEqual(Email.objects.all()[2].in_reply_to, mail1.reference)
|