c3lf-system-3/core/mail/tests/v2/test_mails.py
2024-01-07 17:50:16 +01:00

198 lines
8.4 KiB
Python

import inspect
from unittest import mock
from django.test import TestCase, Client
from inventory.models import Event
from mail.models import Email
from mail.protocol import LMTPHandler
from tickets.models import IssueThread
client = Client()
def make_mocked_coro(return_value=mock.sentinel, raise_exception=mock.sentinel):
async def mock_coro(*args, **kwargs):
if raise_exception is not mock.sentinel:
raise raise_exception
if not inspect.isawaitable(return_value):
return return_value
await return_value
return mock.Mock(wraps=mock_coro)
class EmailsApiTest(TestCase):
def test_mails(self):
Event.objects.get_or_create(
name="Test event",
slug="test-event",
)
Email.objects.create(
subject='test',
body='test',
sender='test',
recipient='test',
)
response = client.get('/api/2/mails/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)
self.assertEqual(response.json()[0]['subject'], 'test')
self.assertEqual(response.json()[0]['body'], 'test')
self.assertEqual(response.json()[0]['sender'], 'test')
self.assertEqual(response.json()[0]['recipient'], 'test')
def test_mails_empty(self):
response = client.get('/api/2/mails/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), [])
class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test
def test_handle_client(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'Subject: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\ntest'
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 2)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_called_once()
self.assertEqual(Email.objects.all()[0].subject, 'test')
self.assertEqual(Email.objects.all()[0].sender, 'test1@test')
self.assertEqual(Email.objects.all()[0].recipient, 'test2@test')
self.assertEqual(Email.objects.all()[0].body, 'test')
self.assertEqual(Email.objects.all()[0].issue_thread, IssueThread.objects.all()[0])
self.assertEqual(Email.objects.all()[0].reference, '<1@test>')
self.assertEqual(Email.objects.all()[0].in_reply_to, None)
self.assertEqual(Email.objects.all()[1].subject, 'Message received')
self.assertEqual(Email.objects.all()[1].sender, 'test2@test')
self.assertEqual(Email.objects.all()[1].recipient, 'test1@test')
self.assertEqual(Email.objects.all()[1].body, 'Thank you for your message.')
self.assertEqual(Email.objects.all()[1].issue_thread, IssueThread.objects.all()[0])
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual(Email.objects.all()[1].in_reply_to, "<1@test>")
self.assertEqual(IssueThread.objects.all()[0].name, 'test')
self.assertEqual(IssueThread.objects.all()[0].state, 'new')
self.assertEqual(IssueThread.objects.all()[0].assigned_to, None)
def test_handle_client_reply(self):
issue_thread = IssueThread.objects.create(
name="test",
)
mail1 = Email.objects.create(
subject='test subject',
body='test',
sender='test1@test',
recipient='test2@test',
issue_thread=issue_thread,
)
mail1_reply = Email.objects.create(
subject='Message received',
body='Thank you for your message.',
sender='test2@test',
recipient='test1@test',
in_reply_to=mail1.reference,
issue_thread=issue_thread,
)
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
# envelope.content = b'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: 3@test\nIn-Reply-To: 2@localhost\n\ntest'
envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n'
f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest')
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 3)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_not_called()
self.assertEqual(Email.objects.all()[2].subject, 'Re: test')
self.assertEqual(Email.objects.all()[2].sender, 'test1@test')
self.assertEqual(Email.objects.all()[2].recipient, 'test2@test')
self.assertEqual(Email.objects.all()[2].body, 'test')
self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread)
self.assertEqual(Email.objects.all()[2].reference, '<3@test>')
self.assertEqual(Email.objects.all()[2].in_reply_to, mail1_reply.reference)
self.assertEqual(IssueThread.objects.all()[0].name, 'test')
self.assertEqual(IssueThread.objects.all()[0].state, 'new')
self.assertEqual(IssueThread.objects.all()[0].assigned_to, None)
# class AsyncLMTPTestCase(TestCase):
#
# def setUp(self):
# server = mock.Mock()
# self.create_unix_server = make_mocked_coro(server)
# self.wait_closed = make_mocked_coro()
# self.loop = asyncio.new_event_loop()
# self.loop.create_unix_server = self.create_unix_server
# asyncio.set_event_loop(self.loop)
#
# async def test_connect(self):
# server = await UnixSocketLMTPController(LMTPHandler(), unix_socket='lmtp.sock', loop=self.loop).serve()
# self.assertEqual(self.create_unix_server.call_count, 1)
# self.assertEqual(self.wait_closed.call_count, 0)
# server.close()
# # self.assertEqual(self.wait_closed.call_count, 1)
#
# def test_receive_mail(self):
# from logging import getLogger
# from aiosmtpd.lmtp import LMTP
# from asgiref.sync import async_to_sync
# log = getLogger('mail.log')
# log.addHandler(logging.StreamHandler())
# log.setLevel(logging.DEBUG)
# handler = LMTP(LMTPHandler(), loop=self.loop)
# transport = mock.Mock()
# handler.connection_made(transport)
#
# def _handle_client():
# print("Handling client")
# async_to_sync(handler._handle_client)()
# print("Client handled")
#
# thread = threading.Thread(target=_handle_client)
# thread.start()
#
# # handler.data_received(
# # b'HELO test\nMAIL FROM:<test1@test>\nRCPT TO:<test2@test>\nDATA\nSubject: test\nFrom: test1@test\nTo: '
# # b'test2@test\n\ntest\n.\nQUIT')
# handler.data_received(b'HELO test\n')
# handler.data_received(b'MAIL FROM:<test1@test>\n')
# handler.data_received(b'RCPT TO:<test2@test>\n')
# handler.data_received(b'DATA\n')
# handler.data_received(b'Subject: test\n')
# handler.data_received(b'From: test1@test\n')
# handler.data_received(b'To: test2@test\n')
# handler.data_received(b'\n')
# handler.data_received(b'test\n')
# handler.data_received(b'.\n')
# handler.data_received(b'QUIT\n')
#
# thread.join()
#
# handler.connection_lost(None)
# thread.join()
#
# # self.assertEqual(len(Email.objects.all()), 1)
# # self.assertEqual(Email.objects.all()[0].subject, 'test')
# # self.assertEqual(Email.objects.all()[0].body, 'test')
# # self.assertEqual(Email.objects.all()[0].sender, 'test')
# # self.assertEqual(Email.objects.all()[0].recipient, 'test@test')