From 5f0d9b86267667aaf8a58b28c340f46042345dbf Mon Sep 17 00:00:00 2001 From: jedi Date: Thu, 9 Jan 2025 18:39:39 +0100 Subject: [PATCH] add more tests for encodings --- core/mail/protocol.py | 48 +++++++---- core/mail/tests/v2/test_mails.py | 136 +++++++++++++++++++++++++++++-- 2 files changed, 162 insertions(+), 22 deletions(-) diff --git a/core/mail/protocol.py b/core/mail/protocol.py index 36bff20..7fe6942 100644 --- a/core/mail/protocol.py +++ b/core/mail/protocol.py @@ -48,9 +48,15 @@ def unescape_and_decode_base64(s): return decoded -def unescape_simplified_quoted_printable(s): +def unescape_simplified_quoted_printable(s, encoding='utf-8'): import quopri - return quopri.decodestring(s).decode('utf-8') + return quopri.decodestring(s).decode(encoding) + + +def ascii_strip(s): + if not s: + return None + return ''.join([c for c in str(s) if 128 > ord(c) > 31]) def collect_references(issue_thread): @@ -116,6 +122,19 @@ def find_target_event(address): return None +def decode_email_segment(segment, charset, transfer_encoding): + decode_as = 'utf-8' + if charset == 'windows-1251': + decode_as = 'cp1251' + elif charset == 'iso-8859-1': + decode_as = 'latin1' + segment = unescape_and_decode_quoted_printable(segment) + segment = unescape_and_decode_base64(segment) + if transfer_encoding == 'quoted-printable': + segment = unescape_simplified_quoted_printable(segment, decode_as) + return segment + + def parse_email_body(raw, log=None): import email from hashlib import sha256 @@ -127,9 +146,9 @@ def parse_email_body(raw, log=None): if parsed.is_multipart(): for part in parsed.walk(): ctype = part.get_content_type() + charset = part.get_content_charset() cdispo = str(part.get('Content-Disposition')) - if ctype == 'multipart/mixed': log.debug("Ignoring Multipart %s %s", ctype, cdispo) # skip any text/plain (txt) attachments @@ -137,14 +156,14 @@ def parse_email_body(raw, log=None): segment = part.get_payload() if not segment: continue - segment = unescape_and_decode_quoted_printable(segment) - segment = unescape_and_decode_base64(segment) - if part.get('Content-Transfer-Encoding') == 'quoted-printable': - segment = unescape_simplified_quoted_printable(segment) + segment = decode_email_segment(segment, charset, part.get('Content-Transfer-Encoding')) log.debug(segment) body = body + segment elif 'attachment' in cdispo or 'inline' in cdispo: - file = ContentFile(part.get_payload(decode=True)) + content = part.get_payload(decode=True) + if content is None: + continue + file = ContentFile(content) chash = sha256(file.read()).hexdigest() name = part.get_filename() if name is None: @@ -170,10 +189,7 @@ def parse_email_body(raw, log=None): else: log.warning("Unknown content type %s", parsed.get_content_type()) body = "Unknown content type" - body = unescape_and_decode_quoted_printable(body) - body = unescape_and_decode_base64(body) - if parsed.get('Content-Transfer-Encoding') == 'quoted-printable': - body = unescape_simplified_quoted_printable(body) + body = decode_email_segment(body, parsed.get_content_charset(), parsed.get('Content-Transfer-Encoding')) log.debug(body) return parsed, body, attachments @@ -185,8 +201,8 @@ def receive_email(envelope, log=None): header_from = parsed.get('From') header_to = parsed.get('To') - header_in_reply_to = parsed.get('In-Reply-To') - header_message_id = parsed.get('Message-ID') + header_in_reply_to = ascii_strip(parsed.get('In-Reply-To')) + header_message_id = ascii_strip(parsed.get('Message-ID')) if match(r'^([a-zA-Z ]*<)?MAILER-DAEMON@', header_from) and envelope.mail_from.strip("<>") == "": log.warning("Ignoring mailer daemon") @@ -198,7 +214,7 @@ def receive_email(envelope, log=None): recipient = envelope.rcpt_tos[0].lower() if envelope.rcpt_tos else header_to.lower() sender = envelope.mail_from if envelope.mail_from else header_from - subject = parsed.get('Subject') + subject = ascii_strip(parsed.get('Subject')) if not subject: subject = "No subject" subject = unescape_and_decode_quoted_printable(subject) @@ -236,7 +252,7 @@ do not create a new request. Your c3lf (Cloakroom + Lost&Found) Team'''.format(active_issue_thread.short_uuid()) reply_email = Email.objects.create( - sender=recipient, recipient=sender, body=body, subject=subject, + sender=recipient, recipient=sender, body=body, subject=ascii_strip(subject), in_reply_to=header_message_id, event=target_event, issue_thread=active_issue_thread) reply = make_reply(reply_email, references, event=target_event.slug if target_event else None) else: diff --git a/core/mail/tests/v2/test_mails.py b/core/mail/tests/v2/test_mails.py index 0f34c41..455faf1 100644 --- a/core/mail/tests/v2/test_mails.py +++ b/core/mail/tests/v2/test_mails.py @@ -142,7 +142,7 @@ class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test aiosmtplib.send.assert_called_once() self.assertEqual('test ä', Email.objects.all()[0].subject) self.assertEqual('Text mit Quoted-Printable-Kodierung: äöüß', Email.objects.all()[0].body) - self.assertTrue( Email.objects.all()[0].raw_file.path) + self.assertTrue(Email.objects.all()[0].raw_file.path) def test_handle_quoted_printable_2(self): from aiosmtpd.smtp import Envelope @@ -163,7 +163,7 @@ class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test aiosmtplib.send.assert_called_once() self.assertEqual('suche_Mütze', Email.objects.all()[0].subject) self.assertEqual('Text mit Quoted-Printable-Kodierung: äöüß', Email.objects.all()[0].body) - self.assertTrue( Email.objects.all()[0].raw_file.path) + self.assertTrue(Email.objects.all()[0].raw_file.path) def test_handle_base64(self): from aiosmtpd.smtp import Envelope @@ -184,7 +184,7 @@ class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test aiosmtplib.send.assert_called_once() self.assertEqual('test', Email.objects.all()[0].subject) self.assertEqual('Text mit Base64-Kodierung: äöüß', Email.objects.all()[0].body) - self.assertTrue( Email.objects.all()[0].raw_file.path) + self.assertTrue(Email.objects.all()[0].raw_file.path) def test_handle_client_reply(self): issue_thread = IssueThread.objects.create( @@ -232,7 +232,7 @@ class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test self.assertEqual(IssueThread.objects.all()[0].name, 'test') self.assertEqual(IssueThread.objects.all()[0].state, 'pending_new') self.assertEqual(IssueThread.objects.all()[0].assigned_to, None) - self.assertTrue( Email.objects.all()[2].raw_file.path) + self.assertTrue(Email.objects.all()[2].raw_file.path) def test_handle_client_reply_2(self): issue_thread = IssueThread.objects.create( @@ -285,7 +285,7 @@ class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test self.assertEqual(IssueThread.objects.all()[0].name, 'test') self.assertEqual(IssueThread.objects.all()[0].state, 'pending_open') self.assertEqual(IssueThread.objects.all()[0].assigned_to, None) - self.assertTrue( Email.objects.all()[2].raw_file.path) + self.assertTrue(Email.objects.all()[2].raw_file.path) def test_mail_reply(self): issue_thread = IssueThread.objects.create( @@ -887,6 +887,59 @@ hello \xe4\xf6\xfc''' self.assertEqual(1, len(states)) self.assertEqual('pending_new', states[0].state) + def test_mail_windows_1252(self): + from aiosmtpd.smtp import Envelope + from asgiref.sync import async_to_sync + import aiosmtplib + + aiosmtplib.send = make_mocked_coro() + + handler = LMTPHandler() + server = mock.Mock() + session = mock.Mock() + envelope = Envelope() + + envelope.mail_from = 'test1@test' + envelope.rcpt_tos = ['test2@test'] + + envelope.content = b'''Subject: test +From: test1@test +To: test2@test +Message-ID: <1@test> +Content-Type: text/html; charset=windows-1252 +Content-Transfer-Encoding: quoted-printable + +=0D=0Ahello=''' + + result = async_to_sync(handler.handle_DATA)(server, session, envelope) + self.assertEqual('250 Message accepted for delivery', result) + self.assertEqual(2, len(Email.objects.all())) + self.assertEqual(1, len(IssueThread.objects.all())) + aiosmtplib.send.assert_called_once() + self.assertEqual('test', Email.objects.all()[0].subject) + self.assertEqual('test1@test', Email.objects.all()[0].sender) + self.assertEqual('test2@test', Email.objects.all()[0].recipient) + self.assertEqual('\r\nhello', Email.objects.all()[0].body) + self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) + self.assertEqual('<1@test>', Email.objects.all()[0].reference) + self.assertEqual(None, Email.objects.all()[0].in_reply_to) + self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), + Email.objects.all()[1].subject) + self.assertEqual('test2@test', Email.objects.all()[1].sender) + self.assertEqual('test1@test', Email.objects.all()[1].recipient) + self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), + Email.objects.all()[1].body) + self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) + self.assertTrue(Email.objects.all()[1].reference.startswith("<")) + self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) + self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) + self.assertEqual('test', IssueThread.objects.all()[0].name) + self.assertEqual('pending_new', IssueThread.objects.all()[0].state) + self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) + states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) + self.assertEqual(1, len(states)) + self.assertEqual('pending_new', states[0].state) + def test_mail_quoted_printable_transfer_encoding(self): from aiosmtpd.smtp import Envelope from asgiref.sync import async_to_sync @@ -1010,4 +1063,75 @@ dGVzdGltYWdl self.assertEqual('image/jpeg', EmailAttachment.objects.all()[0].mime_type) self.assertEqual('test.jpg', EmailAttachment.objects.all()[0].name) file_content = EmailAttachment.objects.all()[0].file.read() - self.assertEqual(b'testimage', file_content) \ No newline at end of file + self.assertEqual(b'testimage', file_content) + + + def test_text_non_utf8_in_multipart(self): + from aiosmtpd.smtp import Envelope + from asgiref.sync import async_to_sync + import aiosmtplib + + aiosmtplib.send = make_mocked_coro() + + handler = LMTPHandler() + server = mock.Mock() + session = mock.Mock() + envelope = Envelope() + + envelope.mail_from = 'test1@test' + envelope.rcpt_tos = ['test2@test'] + + envelope.content = b'''Subject: test +From: test1@test +To: test2@test +Message-ID: <1@test> +Content-Type: multipart/alternative; boundary="abc" + +--abc +Content-Type: text/plain; charset=utf-8 +Content-Transfer-Encoding: 8bit + +test1 + +--abc +Content-Type: text/plain; charset=iso-8859-1 +Content-Transfer-Encoding: quoted-printable + +hello =E4 + +--abc +Content-Type: text/plain; charset=windows-1252 +Content-Transfer-Encoding: quoted-printable + +=0D=0Ahello + +--abc--''' + + result = async_to_sync(handler.handle_DATA)(server, session, envelope) + self.assertEqual(result, '250 Message accepted for delivery') + self.assertEqual(len(Email.objects.all()), 2) + self.assertEqual(len(IssueThread.objects.all()), 1) + aiosmtplib.send.assert_called_once() + self.assertEqual('test', Email.objects.all()[0].subject) + self.assertEqual('test1@test', Email.objects.all()[0].sender) + self.assertEqual('test2@test', Email.objects.all()[0].recipient) + self.assertEqual('test1\nhello ä\n\r\nhello\n', Email.objects.all()[0].body) + self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread) + self.assertEqual('<1@test>', Email.objects.all()[0].reference) + self.assertEqual(None, Email.objects.all()[0].in_reply_to) + self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()), + Email.objects.all()[1].subject) + self.assertEqual('test2@test', Email.objects.all()[1].sender) + self.assertEqual('test1@test', Email.objects.all()[1].recipient) + self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()), + Email.objects.all()[1].body) + self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread) + self.assertTrue(Email.objects.all()[1].reference.startswith("<")) + self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>")) + self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to) + self.assertEqual('test', IssueThread.objects.all()[0].name) + self.assertEqual('pending_new', IssueThread.objects.all()[0].state) + self.assertEqual(None, IssueThread.objects.all()[0].assigned_to) + states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0]) + self.assertEqual(1, len(states)) + self.assertEqual('pending_new', states[0].state)