add more tests for encodings
All checks were successful
/ test (push) Successful in 2m52s

This commit is contained in:
j3d1 2025-01-09 18:39:39 +01:00
parent 3635a55e39
commit 5f0d9b8626
2 changed files with 162 additions and 22 deletions

View file

@ -48,9 +48,15 @@ def unescape_and_decode_base64(s):
return decoded
def unescape_simplified_quoted_printable(s, encoding='utf-8'):
    """Decode a quoted-printable payload and return it as text.

    :param s: quoted-printable data; ``bytes`` or an ASCII-only ``str``
        (``quopri`` rejects a ``str`` containing non-ASCII characters).
    :param encoding: codec used to turn the decoded bytes into ``str``.
        Defaults to ``'utf-8'`` so existing one-argument callers keep
        their behaviour.
    :return: the decoded text.
    :raises UnicodeDecodeError: if the decoded bytes are not valid in
        ``encoding``.
    :raises LookupError: if ``encoding`` is not a known codec.
    """
    import quopri

    # quopri only undoes the transfer encoding ("=XX" escapes and soft line
    # breaks); the resulting bytes still have to be decoded with the
    # charset the sender declared.
    raw = quopri.decodestring(s)
    return raw.decode(encoding)
def ascii_strip(s):
    """Return ``s`` reduced to printable ASCII, or ``None`` if ``s`` is falsy.

    ``s`` is converted with ``str()`` first, so non-string values are
    accepted. Every character outside 0x20-0x7E is dropped: control
    characters (including CR, LF and DEL) as well as all non-ASCII
    characters. Note that a non-empty input made up only of such
    characters yields ``''``, not ``None``.

    :param s: value to sanitise; may be ``None``.
    :return: the filtered string, or ``None`` when ``s`` is falsy.
    """
    if not s:
        return None
    # Upper bound is exclusive of 127: DEL is a control character, not a
    # printable one, so it must not survive the filter.
    return ''.join(c for c in str(s) if 31 < ord(c) < 127)
def collect_references(issue_thread):
@ -116,6 +122,19 @@ def find_target_event(address):
return None
def decode_email_segment(segment, charset, transfer_encoding):
    """Decode one text segment of an email into a readable string.

    The segment is first passed through ``unescape_and_decode_quoted_printable``
    and ``unescape_and_decode_base64``. If the transfer encoding is
    quoted-printable it is then decoded with
    ``unescape_simplified_quoted_printable`` using the declared charset.

    :param segment: raw payload of the message or MIME part.
    :param charset: declared charset of the part (e.g. ``'windows-1251'``),
        or ``None``. Unknown, empty or non-text codecs fall back to UTF-8.
    :param transfer_encoding: value of the ``Content-Transfer-Encoding``
        header, or ``None``. Compared case-insensitively.
    :return: the decoded segment.
    """
    import codecs

    decode_as = 'utf-8'
    if charset:
        try:
            codec = codecs.lookup(str(charset).strip())
        except (LookupError, ValueError):
            # Unknown or malformed charset label: keep the UTF-8 default
            # instead of failing on sender-controlled input.
            codec = None
        # Bytes-to-bytes codecs such as "base64" or "hex" are registered too
        # but cannot be used with bytes.decode(); skip them.
        if codec is not None and getattr(codec, '_is_text_encoding', True):
            decode_as = codec.name
        # UTF-8 is a strict superset of ASCII, so keep decoding "us-ascii"
        # parts as UTF-8: mail that under-declares its charset still decodes.
        if decode_as == 'ascii':
            decode_as = 'utf-8'

    segment = unescape_and_decode_quoted_printable(segment)
    segment = unescape_and_decode_base64(segment)

    # Header values are case-insensitive and may carry stray whitespace.
    if (transfer_encoding or '').strip().lower() == 'quoted-printable':
        segment = unescape_simplified_quoted_printable(segment, decode_as)
    return segment
def parse_email_body(raw, log=None):
import email
from hashlib import sha256
@ -127,9 +146,9 @@ def parse_email_body(raw, log=None):
if parsed.is_multipart():
for part in parsed.walk():
ctype = part.get_content_type()
charset = part.get_content_charset()
cdispo = str(part.get('Content-Disposition'))
if ctype == 'multipart/mixed':
log.debug("Ignoring Multipart %s %s", ctype, cdispo)
# skip any text/plain (txt) attachments
@ -137,14 +156,14 @@ def parse_email_body(raw, log=None):
segment = part.get_payload()
if not segment:
continue
segment = unescape_and_decode_quoted_printable(segment)
segment = unescape_and_decode_base64(segment)
if part.get('Content-Transfer-Encoding') == 'quoted-printable':
segment = unescape_simplified_quoted_printable(segment)
segment = decode_email_segment(segment, charset, part.get('Content-Transfer-Encoding'))
log.debug(segment)
body = body + segment
elif 'attachment' in cdispo or 'inline' in cdispo:
file = ContentFile(part.get_payload(decode=True))
content = part.get_payload(decode=True)
if content is None:
continue
file = ContentFile(content)
chash = sha256(file.read()).hexdigest()
name = part.get_filename()
if name is None:
@ -170,10 +189,7 @@ def parse_email_body(raw, log=None):
else:
log.warning("Unknown content type %s", parsed.get_content_type())
body = "Unknown content type"
body = unescape_and_decode_quoted_printable(body)
body = unescape_and_decode_base64(body)
if parsed.get('Content-Transfer-Encoding') == 'quoted-printable':
body = unescape_simplified_quoted_printable(body)
body = decode_email_segment(body, parsed.get_content_charset(), parsed.get('Content-Transfer-Encoding'))
log.debug(body)
return parsed, body, attachments
@ -185,8 +201,8 @@ def receive_email(envelope, log=None):
header_from = parsed.get('From')
header_to = parsed.get('To')
header_in_reply_to = parsed.get('In-Reply-To')
header_message_id = parsed.get('Message-ID')
header_in_reply_to = ascii_strip(parsed.get('In-Reply-To'))
header_message_id = ascii_strip(parsed.get('Message-ID'))
if match(r'^([a-zA-Z ]*<)?MAILER-DAEMON@', header_from) and envelope.mail_from.strip("<>") == "":
log.warning("Ignoring mailer daemon")
@ -198,7 +214,7 @@ def receive_email(envelope, log=None):
recipient = envelope.rcpt_tos[0].lower() if envelope.rcpt_tos else header_to.lower()
sender = envelope.mail_from if envelope.mail_from else header_from
subject = parsed.get('Subject')
subject = ascii_strip(parsed.get('Subject'))
if not subject:
subject = "No subject"
subject = unescape_and_decode_quoted_printable(subject)
@ -236,7 +252,7 @@ do not create a new request.
Your c3lf (Cloakroom + Lost&Found) Team'''.format(active_issue_thread.short_uuid())
reply_email = Email.objects.create(
sender=recipient, recipient=sender, body=body, subject=subject,
sender=recipient, recipient=sender, body=body, subject=ascii_strip(subject),
in_reply_to=header_message_id, event=target_event, issue_thread=active_issue_thread)
reply = make_reply(reply_email, references, event=target_event.slug if target_event else None)
else: