when the 'X-Spam' flag is set in the mail header, set the state to 'pending_suspected_spam' and do not send auto-reply
All checks were successful
/ test (push) Successful in 2m37s

This commit is contained in:
j3d1 2025-02-07 23:34:39 +01:00
parent 9395226c5f
commit 554bc70413
3 changed files with 55 additions and 6 deletions

View file

@ -103,7 +103,7 @@ async def send_smtp(message):
await aiosmtplib.send(message, hostname="127.0.0.1", port=25, use_tls=False, start_tls=False)
def find_active_issue_thread(in_reply_to, address, subject, event):
def find_active_issue_thread(in_reply_to, address, subject, event, spam=False):
from re import match
uuid_match = match(r'^ticket\+([a-f0-9-]{36})@', address)
if uuid_match:
@ -114,7 +114,8 @@ def find_active_issue_thread(in_reply_to, address, subject, event):
if reply_to.exists():
return reply_to.first().issue_thread, False
else:
issue = IssueThread.objects.create(name=subject, event=event)
issue = IssueThread.objects.create(name=subject, event=event,
initial_state='pending_suspected_spam' if spam else 'pending_new')
return issue, True
@ -213,6 +214,8 @@ def receive_email(envelope, log=None):
header_to = parsed.get('To')
header_in_reply_to = ascii_strip(parsed.get('In-Reply-To'))
header_message_id = ascii_strip(parsed.get('Message-ID'))
maybe_spam = parsed.get('X-Spam')
suspected_spam = (maybe_spam and maybe_spam.lower() == 'yes')
if match(r'^([a-zA-Z ]*<)?MAILER-DAEMON@', header_from) and envelope.mail_from.strip("<>") == "":
log.warning("Ignoring mailer daemon")
@ -232,7 +235,8 @@ def receive_email(envelope, log=None):
sender = decode_inline_encodings(sender)
target_event = find_target_event(recipient)
active_issue_thread, new = find_active_issue_thread(header_in_reply_to, recipient, subject, target_event)
active_issue_thread, new = find_active_issue_thread(
header_in_reply_to, recipient, subject, target_event, suspected_spam)
from hashlib import sha256
random_filename = 'mail-' + sha256(envelope.content).hexdigest()
@ -250,7 +254,7 @@ def receive_email(envelope, log=None):
if new:
# auto reply if new issue
references = collect_references(active_issue_thread)
if not sender.startswith('noreply'):
if not sender.startswith('noreply') and not sender.startswith('no-reply') and not suspected_spam:
subject = f"Re: {subject} [#{active_issue_thread.short_uuid()}]"
body = '''Your request (#{}) has been received and will be reviewed by our lost&found angels.

View file

@ -812,6 +812,44 @@ dGVzdGltYWdl
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
aiosmtplib.send.assert_called_once()
def test_mail_spam_header(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'''Subject: test
From: test1@test
To: test2@test
Message-ID: <1@test>
X-Spam: Yes
test'''
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 1) # do not send auto reply if spam is suspected
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_not_called()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('test1@test', Email.objects.all()[0].sender)
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
self.assertEqual('test', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual('pending_suspected_spam', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_suspected_spam', states[0].state)
def test_mail_4byte_unicode_emoji(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync

View file

@ -16,6 +16,7 @@ STATE_CHOICES = (
('pending_physical_confirmation', 'Needs to be confirmed physically'),
('pending_return', 'Needs to be returned'),
('pending_postponed', 'Postponed'),
('pending_suspected_spam', 'Suspected Spam'),
('waiting_details', 'Waiting for details'),
('waiting_pre_shipping', 'Waiting for Address/Shipping Info'),
('closed_returned', 'Closed: Returned'),
@ -46,6 +47,11 @@ class IssueThread(SoftDeleteModel):
event = models.ForeignKey(Event, null=True, on_delete=models.SET_NULL, related_name='issue_threads')
manually_created = models.BooleanField(default=False)
def __init__(self, *args, **kwargs):
if 'initial_state' in kwargs:
self._initial_state = kwargs.pop('initial_state')
super().__init__(*args, **kwargs)
def short_uuid(self):
return self.uuid[:8]
@ -110,8 +116,9 @@ def set_uuid(sender, instance, **kwargs):
@receiver(post_save, sender=IssueThread)
def create_issue_thread(sender, instance, created, **kwargs):
if created:
StateChange.objects.create(issue_thread=instance, state='pending_new')
if created and instance.state_changes.count() == 0:
initial_state = getattr(instance, '_initial_state', None)
StateChange.objects.create(issue_thread=instance, state=initial_state if initial_state else 'pending_new')
class Comment(models.Model):