Compare commits

..

140 commits

Author SHA1 Message Date
b6b29d9fed update admin interface
All checks were successful
/ test (push) Successful in 52s
/ deploy (push) Successful in 4m37s
2024-11-24 16:28:14 +01:00
61c4beda47 add timeline information to the /items endpoint
All checks were successful
/ test (push) Successful in 55s
/ deploy (push) Successful in 3m40s
2024-11-23 16:58:55 +01:00
e8887fee8b add frontend to edit event details
All checks were successful
/ test (push) Successful in 51s
/ deploy (push) Successful in 4m37s
2024-11-23 01:12:45 +01:00
8786f4b845 ensure creation file date
All checks were successful
/ test (push) Successful in 49s
/ deploy (push) Successful in 5m5s
2024-11-23 00:59:45 +01:00
0eaff2266c fix untested error in /tickets endpoint
All checks were successful
/ test (push) Successful in 48s
/ deploy (push) Successful in 4m45s
2024-11-21 22:37:49 +01:00
81a0959547 Speed up sql
All checks were successful
/ test (pull_request) Successful in 48s
/ test (push) Successful in 49s
/ deploy (push) Successful in 5m13s
2024-11-21 21:26:16 +01:00
fdf5ab8ad1 implement simple backend search for items and tickets
All checks were successful
/ test (push) Successful in 51s
/ deploy (push) Successful in 4m36s
2024-11-21 01:45:47 +01:00
f2647f0dbd add /matches endpoint ad return match information in tickets and /item endpoints
All checks were successful
/ test (push) Successful in 52s
/ deploy (push) Successful in 4m42s
2024-11-21 00:59:03 +01:00
26255fadec drop v1 API and rename id columns
All checks were successful
/ test (push) Successful in 51s
2024-11-20 02:39:49 +01:00
2fce260ba8 test on every push
All checks were successful
/ test (push) Successful in 50s
/ deploy (push) Successful in 4m41s
2024-11-18 02:17:30 +01:00
c9d58191b3 show tickets filtered by active event 2024-11-18 02:17:30 +01:00
41b71bd51a partition tickets by event 2024-11-18 02:17:30 +01:00
d73bebd5de match incoming mail to event 2024-11-18 02:16:55 +01:00
2c609427ec add simple issue templates
All checks were successful
/ test (push) Successful in 54s
/ deploy (push) Successful in 4m55s
2024-11-13 23:13:13 +01:00
120507512d deploy: Simple protection for metrics endpoint
All checks were successful
/ test (pull_request) Successful in 52s
/ test (push) Successful in 52s
/ deploy (push) Successful in 3m45s
2024-11-13 18:15:00 +00:00
63d6b7a5a8 cicd: run on every pull request, but only deploy on testing
All checks were successful
/ test (pull_request) Successful in 52s
/ test (push) Successful in 54s
/ deploy (push) Successful in 4m45s
2024-11-12 17:05:03 +01:00
5ba4085e60 cicd: Run tests automatically
All checks were successful
/ test (push) Successful in 54s
/ deploy (push) Successful in 4m41s
2024-11-11 22:10:35 +01:00
be02a3e163 cicd: Deploy testing automatically
All checks were successful
/ deploy (push) Successful in 4m56s
2024-11-11 19:37:37 +00:00
444c2de16c change the ticket.state in the backend too 2024-11-10 19:01:45 +00:00
8831f67f00 ticket state changes to pending_open on first view 2024-11-10 19:01:45 +00:00
5a6349c5d3 train spam on state change to 'closed_spam' 2024-11-09 02:58:21 +01:00
a6a8b0defe add functions to train mails as spam/ham 2024-11-09 00:03:21 +01:00
4272aab643 save raw_mails as file 2024-11-08 22:54:57 +01:00
0c4995db2b add docker env for integration testing 2024-11-08 20:09:51 +01:00
269f02c2ce Add django standard metrics 2024-11-07 19:51:31 +01:00
b9cfdf5456 fixed race contidion on ticket view loading 2024-11-06 21:26:01 +01:00
0f8462dc7c enforce startup order in docker-compose.yml 2024-11-06 21:13:44 +01:00
242066ada4 Add dev environment using docker 2024-11-06 00:51:25 +01:00
d13687a910 cleanup admin 'dashboard' 2024-11-06 00:26:56 +01:00
f44da341b4 add event mail addresses to the /events endpoints 2024-11-06 00:08:52 +01:00
55cef1128e extract the search box into its own component 2024-11-05 23:36:05 +01:00
6e38ff7ac7 use an 'AsyncButton' that waits for completion of an async onClick handler for mail replies and comments in tickets 2024-11-05 23:32:53 +01:00
3a8fa8cdcf show an animation to signify that the page is still loading 2024-11-05 23:25:44 +01:00
767d34f8b7 state button has the state pre selected 2024-11-03 21:30:08 +01:00
dffd3531fa add form to create new event 2024-11-02 22:12:00 +01:00
f9409bb823 fix related item section in ticket view 2024-06-26 20:53:19 +02:00
2ece0cefd8 add links between items and tickets /tickets endpoint 2024-06-23 04:23:48 +02:00
d8be7f09e4 add frontend to manage shipping vouchers 2024-06-23 02:58:31 +02:00
f11758607e add /shipping_vouchers endpoint 2024-06-23 02:52:15 +02:00
2f354130da add timeline nodes for linked items and shipping vouchers 2024-06-23 01:42:10 +02:00
a59509a432 always use get_model in migrations 2024-06-23 01:33:24 +02:00
7d1786f143 refactor inventory serializers 2024-06-23 01:28:21 +02:00
4152034e4a better email error logging and some pretty printing for admin interface 2024-06-23 01:23:34 +02:00
67375bd281 add shared-state-plugin 2024-06-23 01:04:32 +02:00
e91b64ca97 software caching thumbnails in javascript 2024-06-22 22:11:38 +02:00
facefc1cc7 add persistent-state-plugin 2024-06-22 20:56:51 +02:00
575d43acbd add rspamd spamfilter to ansible 2024-06-22 19:51:16 +02:00
bb71c44aa7 migrate to vue 3 2024-06-22 19:50:05 +02:00
bb07a6b641 handle 'quoted-printable' shorthand version as Transfer-Encoding 2024-04-24 19:38:52 +02:00
49d3b02b9c also handle other text encodings than UTF8 2024-04-22 20:15:12 +02:00
b3c2233454 remove use of v1 api in item edit modal, which caused a prompt for http basic auth 2024-01-23 19:05:20 +01:00
41d983ccbb encode open expanded section of /tickets view in url 2024-01-23 19:04:07 +01:00
79cfbdbe2f fix logout button 2024-01-23 19:03:02 +01:00
e605292bf0 make tickets assignable to users 2024-01-22 18:29:39 +01:00
4be8109753 'Back' button in ticket view doesn't clear url query parameter anymore 2024-01-19 21:42:05 +01:00
26ce3e23a4 add more ticket state options 2024-01-19 21:18:50 +01:00
b28bd7b23b add alternative layout for /tickets page 2024-01-19 20:50:36 +01:00
a3f6a96f95 fix database problem with 4 byte utf-8 2024-01-16 00:43:51 +01:00
5e1890e990 handle html only email 2024-01-16 00:42:26 +01:00
892493a300 use first mail in tickets instead of last to choose reply address. mitigates problem with shipping confirmation mails 2024-01-15 22:04:59 +01:00
7e3a151ead frontend use Lightbox to enlarge mail attachments 2024-01-15 20:52:11 +01:00
027cf4fca4 show sender and recipient addresses in ticket view 2024-01-14 19:02:01 +01:00
04f774404a show mail attachments in frontend 2024-01-14 18:21:16 +01:00
d1626d1777 add text field to add comment to ticket 2024-01-13 20:31:46 +01:00
5a1de437b6 add button to copy shipping contact email to clipboard 2024-01-13 20:31:46 +01:00
d6df034ad0 quickfix for "Reply-To is noreply+None@domain.tld" 2024-01-13 03:31:17 +01:00
9269f2ec48 automatically attach mails addressed to ticket+<uuid>@domain.tld to ticket with matching uuid 2024-01-13 00:00:19 +01:00
5a1cfedd56 send more informative auto reply 2024-01-12 23:12:18 +01:00
f7002c5548 use uuid in tickets 2024-01-12 23:02:30 +01:00
4664d6255d handle empty Subject and empty body in incoming mails 2024-01-12 21:19:14 +01:00
c79b3185e5 do not send an auto reply to 'noreply*' addresses 2024-01-12 20:58:37 +01:00
1804939407 make the 'last_activity' field in /tickets show the actual last activity by making it a virtual field 2024-01-10 22:02:00 +01:00
9aeb6a319f set state to 'open' when receiving reply on a ticket that is not in state 'new' 2024-01-10 19:03:40 +01:00
734af10525 parse and save email attachments 2024-01-10 16:22:29 +01:00
f9a95317a2 restore session from localstorage 2024-01-07 23:03:22 +01:00
54e5ba6e8e add permissions in ticket models 2024-01-07 22:29:37 +01:00
2e29b8b046 add virtual item field 'returned' in item 2024-01-07 22:11:13 +01:00
83565d4b2a fix ticket link in overview 2024-01-07 21:56:40 +01:00
3a5f35fa5d format tickets in Tickets.vue 2024-01-07 21:56:40 +01:00
6dad675d1e change ticket reply box to textarea 2024-01-07 21:56:40 +01:00
7b77c183fb don't save ticket state in multiple locations 2024-01-07 21:56:40 +01:00
fd7847993b fix async bug in mailadress target event lookup 2024-01-07 21:56:40 +01:00
68812fe6b9 fix case sensitivity bug in mail domain lookup 2024-01-07 21:56:40 +01:00
b4687a8821 convert Quoted Printable email subject and body to utf-8 2024-01-07 21:56:35 +01:00
07213bd421 delete box button works, with
\confirmation
2024-01-07 21:54:57 +01:00
affd6f6c86 timeline icon colors for ticket
\according to state
2024-01-07 21:54:57 +01:00
54a8445cd4 refactor: use slot name 'actions' in Table component 2024-01-07 21:54:57 +01:00
a458d4f850 ask for confirmation before returning or deleting items 2024-01-07 21:54:57 +01:00
c2e73afb35 add admin panel for boxes 2024-01-07 21:54:54 +01:00
fe9795d147 add migrations for state change 2024-01-07 21:53:43 +01:00
626c9f23fe add dropdown selection to change state of tickets 2024-01-07 21:53:43 +01:00
515648ffa8 add import for authenticatedImage in Lightbox.vue 2024-01-07 21:53:43 +01:00
2ba96c8b16 enforce active user in router 2024-01-07 21:45:31 +01:00
8c85aa4fe4 create admin panel views 2024-01-07 21:45:03 +01:00
48b2752a1e add modal to manually create tickets 2024-01-07 21:44:22 +01:00
ad7528fe36 add more Icons 2024-01-07 21:43:28 +01:00
e4188df18e disable Toasts for socket errors 2024-01-07 21:42:23 +01:00
c5023202fc use AuthenticatedImage component in Items view 2024-01-07 21:41:40 +01:00
21ec29caa8 update vuex store to use API v2 2024-01-07 21:38:25 +01:00
0ebfe3adfb add ticket views 2024-01-07 21:32:42 +01:00
7f546ed13e add Login and Register views 2024-01-07 21:32:10 +01:00
fac00735ad frontend: add ticket timeline component 2024-01-07 21:30:47 +01:00
b1a8702932 add API endpoint for mails and tests 2024-01-07 21:27:22 +01:00
cbc27b143f tickets module: api_v2, admin views and tests 2024-01-07 21:23:29 +01:00
434dfe807e add notify_sessions module 2024-01-07 21:16:34 +01:00
fc05c7c1d8 create inventory API v2 und update tests 2024-01-07 21:14:28 +01:00
77828295f8 create file API v2 and X-Accel-Redirect endpoint for images 2024-01-07 21:10:40 +01:00
ab5e8f36d1 add API endpoints and admin views for authentication module 2024-01-07 20:48:31 +01:00
e5cd901b76 add mail module 2024-01-07 20:42:42 +01:00
649fff80f8 add tickets module 2024-01-07 20:42:00 +01:00
6abf3af6c0 add item match permission 2024-01-07 19:53:25 +01:00
0438eb0b6a add custom authentication and know in settings.py 2024-01-07 17:53:52 +01:00
0f911589ca use custom user and permission model 2024-01-07 17:45:42 +01:00
258065eec3 fix initial nginx ssl deployment in ansible playbook 2024-01-07 17:40:09 +01:00
8f7c037606 add requirements.dev.txt 2024-01-07 17:26:20 +01:00
8399b7e125 fix typo 2024-01-07 16:32:58 +01:00
9e1d570b07 move event loop to globals.py 2023-11-27 20:23:21 +01:00
101fa7b69d add /ws/2/notify/ socket 2023-11-27 01:48:48 +01:00
97503e91e0 update requirements.prod.txt 2023-11-27 01:34:58 +01:00
9f63414ba2 make indentation consistent 2023-11-27 01:14:52 +01:00
d52575aa42 experimental mail transport 2023-11-24 03:53:05 +01:00
e43d4837c3 temporary fix for websockets 2023-11-23 20:21:28 +01:00
e45a1f271e disable http auth for websockets 2023-11-22 22:47:35 +01:00
c88d434f39 connect to /notify WebSocket from frontend on app startup 2023-11-22 21:57:14 +01:00
6b3cc4c168 switch from WSGI using uWSGI to ASGI using custom event loop based on uvicorn 2023-11-22 21:41:49 +01:00
b103205dfe only use jpeg for thumbnails 2023-11-20 21:34:48 +01:00
153d79f126 serve images via X-Sendfile 2023-11-20 21:08:01 +01:00
27589a09bd fix embedded file upload 2023-11-20 20:29:46 +01:00
aa0bb9fd0d remove trailing slash requirement in api v1 2023-11-20 16:46:58 +01:00
e2a2e3e117 make container and item SoftDeleteModel 2023-11-20 12:33:40 +01:00
04581b66b6 return 404 when events doesn't exist 2023-11-20 07:02:14 +01:00
dedf98a12e do not check permissions in api v1 as they are checked by nginx already 2023-11-20 06:48:39 +01:00
7369db8512 add /api url prefix in backend 2023-11-19 01:55:19 +01:00
e5bec44164 bump versions in package.json 2023-11-18 23:40:38 +01:00
f2720e4fb2 load MEDIA_ROOT AND STATIC_ROOT from .env 2023-11-18 21:13:41 +01:00
a340154cd9 add public domain to ALLOWED_HOSTS 2023-11-18 19:57:32 +01:00
ac6eade412 read database settings from .env file 2023-11-18 17:46:54 +01:00
618ede273d unpin some python requirements 2023-11-18 16:49:41 +01:00
1f41b81b8f add django backend in /core
ported from laravel in c3lf/lfbackend repo
2023-11-18 12:57:50 +01:00
dd75c2b0d6 move frontend to /web 2023-11-18 12:51:24 +01:00
223 changed files with 12671 additions and 22082 deletions

View file

@ -1,7 +0,0 @@
root = true
[*.js]
indent_size = 2
[*.vue]
indent_size = 2

View file

@ -0,0 +1,35 @@
name: Bug Report
about: File a bug report
labels:
- Kind/Bug
body:
- type: markdown
attributes:
value: |
Thanks for taking the time to fill out this bug report!
- type: textarea
id: what-happened
attributes:
label: What happened?
description: Also tell us, what did you expect to happen?
placeholder: Tell us what you see!
validations:
required: true
- type: dropdown
id: browsers
attributes:
label: What browsers are you seeing the problem on?
multiple: true
options:
- Firefox (Windows)
- Firefox (MacOS)
- Firefox (Linux)
- Firefox (Android)
- Firefox (iOS)
- Chrome (Windows)
- Chrome (MacOS)
- Chrome (Linux)
- Chrome (Android)
- Chrome (iOS)
- Safari
- Microsoft Edge

View file

@ -0,0 +1,27 @@
name: 'New Feature'
about: 'This template is for new features'
labels:
- Kind/Feature
body:
- type: markdown
attributes:
value: |
Before creating a Feature Ticket, please check for duplicates.
- type: markdown
attributes:
value: |
### Implementation Checklist
- [ ] concept
- [ ] frontend
- [ ] backend
- [ ] unittests
- [ ] tested on staging
visible: [ content ]
- type: textarea
id: description
attributes:
label: 'Feature Description'
description: 'Explain the the feature.'
placeholder: Description
validations:
required: true

View file

@ -0,0 +1,60 @@
on:
push:
branches:
- testing
jobs:
test:
runs-on: docker
container:
image: ghcr.io/catthehacker/ubuntu:act-22.04
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
cache-dependency-path: '**/requirements.dev.txt'
- name: Install dependencies
working-directory: core
run: pip3 install -r requirements.dev.txt
- name: Run django tests
working-directory: core
run: python3 manage.py test
deploy:
needs: [test]
runs-on: docker
steps:
- uses: actions/checkout@v4
- name: Install ansible
run: |
apt update -y
apt install python3-pip -y
python3 -m pip install ansible
python3 -m pip install ansible-lint
- name: Populate relevant files
run: |
mkdir ~/.ssh
echo "${{ secrets.C3LF_SSH_TESTING }}" > ~/.ssh/id_ed25519
chmod 0600 ~/.ssh/id_ed25519
ls -lah ~/.ssh
command -v ssh-agent >/dev/null || ( apt-get update -y && apt-get install openssh-client -y )
eval $(ssh-agent -s)
ssh-add ~/.ssh/id_ed25519
echo "andromeda.lab.or.it ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIDXPoO0PE+B9PYwbGaLo98zhbmjAkp6eBtVeZe43v/+T" >> ~/.ssh/known_hosts
mkdir /etc/ansible
echo "${{ secrets.C3LF_INVENTORY_TESTING }}" > /etc/ansible/hosts
- name: Check ansible version
run: |
ansible --version
- name: List ansible hosts
run: |
ansible -m ping Andromeda
- name: Deploy testing
run: |
cd deploy/ansible
ansible-playbook playbooks/deploy-c3lf-sys3.yml

View file

@ -0,0 +1,21 @@
on:
pull_request:
push:
jobs:
test:
runs-on: docker
container:
image: ghcr.io/catthehacker/ubuntu:act-22.04
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
cache-dependency-path: '**/requirements.dev.txt'
- name: Install dependencies
working-directory: core
run: pip3 install -r requirements.dev.txt
- name: Run django tests
working-directory: core
run: python3 manage.py test

29
.gitignore vendored
View file

@ -1,27 +1,8 @@
.DS_Store
node_modules
/dist
# local env files /.idea
.env .env
.env.local .local
.env.*.local
# Log files staticfiles/
npm-debug.log* userfiles/
yarn-debug.log* *.db
yarn-error.log*
yarn.lock
# Editor directories and files
.idea
.vscode
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?
src/store/auth.js
src/config.js

View file

@ -1,5 +0,0 @@
module.exports = {
presets: [
'@vue/cli-plugin-babel/preset'
]
};

14
core/.coveragerc Normal file
View file

@ -0,0 +1,14 @@
[run]
source = .
[report]
fail_under = 100
show_missing = True
skip_covered = True
omit =
*/tests/*
*/migrations/*
core/asgi.py
core/wsgi.py
core/settings.py
manage.py

129
core/.gitignore vendored Normal file
View file

@ -0,0 +1,129 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/

View file

View file

@ -0,0 +1,14 @@
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin
from authentication.models import ExtendedUser
class ExtendedUserAdmin(UserAdmin):
list_display = ('username', 'email', 'first_name', 'last_name', 'is_staff', 'is_superuser')
search_fields = ('username', 'email', 'first_name', 'last_name')
ordering = ('username',)
filter_horizontal = ('groups', 'user_permissions', 'permissions')
admin.site.register(ExtendedUser, ExtendedUserAdmin)

View file

@ -0,0 +1,89 @@
from rest_framework import routers, viewsets, serializers, permissions
from rest_framework.decorators import api_view, permission_classes, authentication_classes
from rest_framework.authtoken.serializers import AuthTokenSerializer
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from django.contrib.auth import login
from django.urls import path
from django.dispatch import receiver
from django.db.models.signals import post_save
from django.contrib.auth.models import Group
from knox.models import AuthToken
from knox.views import LoginView as KnoxLoginView
from authentication.models import ExtendedUser
from authentication.serializers import UserSerializer, GroupSerializer
class UserViewSet(viewsets.ModelViewSet):
queryset = ExtendedUser.objects.all()
serializer_class = UserSerializer
class GroupViewSet(viewsets.ModelViewSet):
queryset = Group.objects.all()
serializer_class = GroupSerializer
@receiver(post_save, sender=ExtendedUser)
def create_auth_token(sender, instance=None, created=False, **kwargs):
if created:
AuthToken.objects.create(user=instance)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def selfUser(request):
serializer = UserSerializer(request.user)
return Response(serializer.data, status=200)
@api_view(['POST'])
@permission_classes([])
@authentication_classes([])
def registerUser(request):
try:
username = request.data.get('username')
password = request.data.get('password')
email = request.data.get('email')
errors = {}
if not username:
errors['username'] = 'Username is required'
if not password:
errors['password'] = 'Password is required'
if not email:
errors['email'] = 'Email is required'
if ExtendedUser.objects.filter(email=email).exists():
errors['email'] = 'Email already exists'
if ExtendedUser.objects.filter(username=username).exists():
errors['username'] = 'Username already exists'
if errors:
return Response({'errors': errors}, status=400)
user = ExtendedUser.objects.create_user(username, email, password)
return Response({'username': user.username, 'email': user.email}, status=201)
except Exception as e:
return Response({'errors': str(e)}, status=400)
class LoginView(KnoxLoginView):
permission_classes = (permissions.AllowAny,)
authentication_classes = ()
def post(self, request, format=None):
serializer = AuthTokenSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = serializer.validated_data['user']
login(request, user)
return super(LoginView, self).post(request, format=None)
router = routers.SimpleRouter()
router.register(r'users', UserViewSet, basename='users')
router.register(r'groups', GroupViewSet, basename='groups')
urlpatterns = router.urls + [
path('self/', selfUser),
path('login/', LoginView.as_view()),
path('register/', registerUser),
]

View file

@ -0,0 +1,67 @@
# Generated by Django 4.2.7 on 2023-12-11 21:10
from django.conf import settings
import django.contrib.auth.models
import django.contrib.auth.validators
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
('inventory', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='ExtendedUser',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
],
options={
'verbose_name': 'Extended user',
'verbose_name_plural': 'Extended users',
},
managers=[
('objects', django.contrib.auth.models.UserManager()),
],
),
migrations.CreateModel(
name='EventPermission',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('event', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='inventory.event')),
('permission', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='auth.permission')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
options={
'unique_together': {('user', 'permission', 'event')},
},
),
migrations.AddField(
model_name='extendeduser',
name='permissions',
field=models.ManyToManyField(through='authentication.EventPermission', to='auth.permission'),
),
migrations.AddField(
model_name='extendeduser',
name='user_permissions',
field=models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions'),
),
]

View file

@ -0,0 +1,45 @@
# Generated by Django 4.2.7 on 2023-12-11 21:11
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('inventory', '0001_initial'),
('knox', '0008_remove_authtoken_salt'),
('authentication', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='AuthTokenEventPermissions',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('event', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='inventory.event')),
('permission', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='authentication.eventpermission')),
],
),
migrations.CreateModel(
name='ExtendedAuthToken',
fields=[
('authtoken_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='knox.authtoken')),
('permissions', models.ManyToManyField(through='authentication.AuthTokenEventPermissions', to='authentication.eventpermission')),
],
options={
'verbose_name': 'Extended auth token',
'verbose_name_plural': 'Extended auth tokens',
},
bases=('knox.authtoken',),
),
migrations.AddField(
model_name='authtokeneventpermissions',
name='token',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='authentication.extendedauthtoken'),
),
migrations.AlterUniqueTogether(
name='authtokeneventpermissions',
unique_together={('token', 'permission', 'event')},
),
]

View file

@ -0,0 +1,33 @@
# Generated by Django 4.2.7 on 2023-11-26 00:16
from django.conf import settings
from django.db import migrations
from django.contrib.auth.models import Permission, Group
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('authentication', '0002_authtokeneventpermissions_extendedauthtoken_and_more'),
('tickets', '0001_initial'),
]
def create_groups(apps, schema_editor):
admins = Group.objects.create(name='Admin')
orga = Group.objects.create(name='Orga')
team = Group.objects.create(name='Team')
users = Group.objects.create(name='User')
admins.permissions.add(*Permission.objects.all())
users.permissions.add(*Permission.objects.filter(codename__in=
['view_item', 'add_item', 'change_item', 'match_item']))
team.permissions.add(*Permission.objects.filter(codename__in=
['delete_item', 'view_issuethread', 'add_issuethread',
'change_issuethread', 'delete_issuethread', 'send_mail']),
*users.permissions.all())
orga.permissions.add(*Permission.objects.filter(codename__in=['add_event']),
*team.permissions.all())
operations = [
migrations.RunPython(create_groups),
]

View file

@ -0,0 +1,18 @@
from django.conf import settings
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('authentication', '0003_groups'),
]
def create_legacy_user(apps, schema_editor):
ExtendedUser = apps.get_model('authentication', 'ExtendedUser')
ExtendedUser.objects.create_user(settings.LEGACY_USER_NAME, 'mail@' + settings.MAIL_DOMAIN,
settings.LEGACY_USER_PASSWORD)
operations = [
migrations.RunPython(create_legacy_user)
]

View file

@ -0,0 +1,26 @@
# Generated by Django 4.2.7 on 2023-12-13 16:28
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('inventory', '0001_initial'),
('authentication', '0004_legacy_user'),
]
operations = [
migrations.AlterField(
model_name='eventpermission',
name='event',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='inventory.event'),
),
migrations.AlterField(
model_name='eventpermission',
name='user',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='event_permissions', to=settings.AUTH_USER_MODEL),
),
]

View file

@ -0,0 +1,62 @@
from django.db import models
from django.contrib.auth.models import Permission, AbstractUser
from knox.models import AuthToken
from inventory.models import Event
class ExtendedUser(AbstractUser):
permissions = models.ManyToManyField(Permission, through='EventPermission', through_fields=('user', 'permission'))
class Meta:
verbose_name = 'Extended user'
verbose_name_plural = 'Extended users'
def get_permissions(self):
if self.is_superuser:
for permission in Permission.objects.all():
yield "*:" + permission.codename
for permission in self.user_permissions.all():
yield "*:" + permission.codename
for group in self.groups.all():
for permission in group.permissions.all():
yield "*:" + permission.codename
for permission in self.event_permissions.all():
yield permission.event.slug + ":" + permission.permission.codename
def has_event_perm(self, event, permission):
if self.is_superuser:
return True
permissions = set(self.get_permissions())
if "*:" + permission in permissions:
return True
if event.slug + ":" + permission in permissions:
return True
return False
class ExtendedAuthToken(AuthToken):
permissions = models.ManyToManyField('EventPermission', through='AuthTokenEventPermissions',
through_fields=('token', 'permission'))
class Meta:
verbose_name = 'Extended auth token'
verbose_name_plural = 'Extended auth tokens'
class EventPermission(models.Model):
user = models.ForeignKey(ExtendedUser, on_delete=models.CASCADE, related_name='event_permissions')
permission = models.ForeignKey(Permission, on_delete=models.CASCADE)
event = models.ForeignKey(Event, on_delete=models.CASCADE, null=True, blank=True)
class Meta:
unique_together = ('user', 'permission', 'event')
class AuthTokenEventPermissions(models.Model):
token = models.ForeignKey(ExtendedAuthToken, on_delete=models.CASCADE)
permission = models.ForeignKey(EventPermission, on_delete=models.CASCADE)
event = models.ForeignKey(Event, on_delete=models.CASCADE)
class Meta:
unique_together = ('token', 'permission', 'event')

View file

@ -0,0 +1,32 @@
from rest_framework import serializers
from django.contrib.auth.models import Group
from authentication.models import ExtendedUser
class UserSerializer(serializers.ModelSerializer):
permissions = serializers.SerializerMethodField()
groups = serializers.SlugRelatedField(many=True, read_only=True, slug_field='name')
class Meta:
model = ExtendedUser
fields = ('id', 'username', 'email', 'first_name', 'last_name', 'permissions', 'groups')
read_only_fields = ('id', 'username', 'email', 'first_name', 'last_name', 'permissions', 'groups')
def get_permissions(self, obj):
return list(set(obj.get_permissions()))
class GroupSerializer(serializers.ModelSerializer):
permissions = serializers.SerializerMethodField()
members = serializers.SerializerMethodField()
class Meta:
model = Group
fields = ('id', 'name', 'permissions', 'members')
def get_permissions(self, obj):
return ["*:" + p.codename for p in obj.permissions.all()]
def get_members(self, obj):
return [u.username for u in obj.user_set.all()]

View file

View file

View file

@ -0,0 +1,90 @@
from django.test import TestCase, Client
from django.contrib.auth.models import Permission
from knox.models import AuthToken
from authentication.models import EventPermission, ExtendedUser
from inventory.models import Event
class PermissionsTestCase(TestCase):
def setUp(self):
super().setUp()
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
self.user.user_permissions.add(*Permission.objects.all())
event1 = Event.objects.create(slug='testevent1', name='testevent1')
event2 = Event.objects.create(slug='testevent2', name='testevent2')
permission1 = Permission.objects.get(codename='view_event')
EventPermission.objects.create(user=self.user, permission=permission1, event=event1)
EventPermission.objects.create(user=self.user, permission=permission1, event=event2)
self.token = AuthToken.objects.create(user=self.user)
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
self.newuser = ExtendedUser.objects.create_user('newuser', 'test', 'test')
self.newuser_token = AuthToken.objects.create(user=self.newuser)
self.newuser_client = Client(headers={'Authorization': 'Token ' + self.newuser_token[1]})
def test_user_permissions(self):
"""
Test that a user can only access their own data.
"""
response = self.client.get('/api/2/users/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 3)
self.assertEqual(response.json()[0]['username'], 'legacy_user')
self.assertEqual(response.json()[0]['email'], 'mail@localhost')
self.assertEqual(response.json()[0]['first_name'], '')
self.assertEqual(response.json()[0]['last_name'], '')
self.assertEqual(response.json()[0]['id'], 1)
self.assertEqual(response.json()[1]['username'], 'testuser')
self.assertEqual(response.json()[1]['email'], 'test')
self.assertEqual(response.json()[1]['first_name'], '')
self.assertEqual(response.json()[1]['last_name'], '')
def test_user_permission(self):
"""
Test that a user can only access their own data.
"""
#ä['add_logentry', 'change_logentry', 'delete_logentry', 'view_logentry', 'add_group', 'change_group',
#ä 'delete_group', 'view_group', 'add_permission', 'change_permission', 'delete_permission', 'view_permission',
#ä 'add_authtokeneventpermissions', 'change_authtokeneventpermissions', 'delete_authtokeneventpermissions',
#ä 'view_authtokeneventpermissions', 'add_eventpermission', 'change_eventpermission', 'delete_eventpermission',
#ä 'view_eventpermission', 'add_extendedauthtoken', 'change_extendedauthtoken', 'delete_extendedauthtoken',
#ä 'view_extendedauthtoken', 'add_extendeduser', 'change_extendeduser', 'delete_extendeduser',
#ä 'view_extendeduser', 'add_contenttype', 'change_contenttype', 'delete_contenttype', 'view_contenttype',
#ä 'add_file', 'change_file', 'delete_file', 'view_file', 'add_container', 'change_container', 'delete_container',
#ä 'view_container', 'add_event', 'change_event', 'delete_event', 'view_event', 'add_item', 'change_item',
#ä 'delete_item', 'match_item', 'view_item', 'add_authtoken', 'change_authtoken', 'delete_authtoken',
#ä 'view_authtoken', 'add_email', 'change_email', 'delete_email', 'view_email', 'add_eventaddress',
#ä 'change_eventaddress', 'delete_eventaddress', 'view_eventaddress', 'add_systemevent', 'change_systemevent',
#ä 'delete_systemevent', 'view_systemevent', 'add_session', 'change_session', 'delete_session', 'view_session',
#ä 'add_comment', 'change_comment', 'delete_comment', 'view_comment', 'add_issuethread', 'change_issuethread',
#ä 'delete_issuethread', 'send_mail', 'view_issuethread', 'add_statechange', 'change_statechange',
#ä 'delete_statechange', 'view_statechange']
user = ExtendedUser.objects.create_user('testuser2', 'test', 'test')
user.event_permissions.create(permission=Permission.objects.get(codename='view_item'), event=Event.objects.get(slug='testevent1'))
user.event_permissions.create(permission=Permission.objects.get(codename='view_item'), event=Event.objects.get(slug='testevent2'))
user.event_permissions.create(permission=Permission.objects.get(codename='add_item'), event=Event.objects.get(slug='testevent1'))
user.save()
#self.assertTrue(user.has_perm('inventory.view_event', Event.objects.get(slug='testevent1')))
#self.assertTrue(user.has_perm('inventory.view_event', Event.objects.get(slug='testevent2')))
#self.assertFalse(user.has_perm('inventory.add_event', Event.objects.get(slug='testevent1')))
#self.assertFalse(user.has_perm('inventory.add_event', Event.objects.get(slug='testevent2')))
def test_item_api_permissions(self):
"""
Test that a user can only access their own data.
"""
response = self.client.get('/api/2/testevent1/items/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 0)
response = self.client.get('/api/2/testevent2/items/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 0)
response = self.newuser_client.get('/api/2/testevent1/items/')
self.assertEqual(response.status_code, 403)
response = self.newuser_client.get('/api/2/testevent2/items/')
self.assertEqual(response.status_code, 403)

View file

@ -0,0 +1,183 @@
from django.test import TestCase, Client
from django.contrib.auth.models import Permission, Group
from knox.models import AuthToken
from authentication.models import ExtendedUser, EventPermission
from core import settings
from inventory.models import Event
class UserApiTest(TestCase):
def setUp(self):
self.event = Event.objects.create(name='testevent', slug='testevent')
self.group1 = Group.objects.create(name='testgroup1')
self.group2 = Group.objects.create(name='testgroup2')
self.group1.permissions.add(Permission.objects.get(codename='add_item'))
self.group1.permissions.add(Permission.objects.get(codename='view_item'))
self.group2.permissions.add(Permission.objects.get(codename='view_event'))
self.group2.permissions.add(Permission.objects.get(codename='view_item'))
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
self.user.user_permissions.add(Permission.objects.get(codename='add_event'))
self.user.groups.add(self.group1)
self.user.groups.add(self.group2)
self.user.save()
EventPermission.objects.create(event=self.event, user=self.user,
permission=Permission.objects.get(codename='delete_item'))
self.user.save()
self.token = AuthToken.objects.create(user=self.user)
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
def test_users(self):
response = self.client.get('/api/2/users/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 2)
self.assertEqual(response.json()[0]['username'], settings.LEGACY_USER_NAME)
self.assertEqual(response.json()[0]['email'], 'mail@' + settings.MAIL_DOMAIN)
self.assertEqual(response.json()[0]['first_name'], '')
self.assertEqual(response.json()[0]['last_name'], '')
self.assertEqual(response.json()[0]['id'], 1)
self.assertEqual(response.json()[0]['groups'], [])
self.assertEqual(response.json()[1]['username'], 'testuser')
self.assertEqual(response.json()[1]['email'], 'test')
self.assertEqual(response.json()[1]['first_name'], '')
self.assertEqual(response.json()[1]['last_name'], '')
self.assertEqual(response.json()[1]['id'], 2)
self.assertEqual(response.json()[1]['groups'], ['testgroup1', 'testgroup2'])
def test_self_user(self):
response = self.client.get('/api/2/self/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['username'], 'testuser')
self.assertEqual(response.json()['email'], 'test')
self.assertEqual(response.json()['first_name'], '')
self.assertEqual(response.json()['last_name'], '')
permissions = response.json()['permissions']
self.assertEqual(len(permissions), 5)
self.assertTrue('*:add_item' in permissions)
self.assertTrue('*:view_item' in permissions)
self.assertTrue('*:view_event' in permissions)
self.assertTrue('testevent:delete_item' in permissions)
self.assertTrue('*:add_event' in permissions)
def test_register_user(self):
anonymous = Client()
response = anonymous.post('/api/2/register/', {'username': 'testuser2', 'password': 'test', 'email': 'test2'},
content_type='application/json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()['username'], 'testuser2')
self.assertEqual(response.json()['email'], 'test2')
self.assertEqual(len(ExtendedUser.objects.all()), 3)
self.assertEqual(ExtendedUser.objects.get(username='testuser2').email, 'test2')
self.assertTrue(ExtendedUser.objects.get(username='testuser2').check_password('test'))
def test_register_user_duplicate(self):
anonymous = Client()
response = anonymous.post('/api/2/register/', {'username': 'testuser', 'password': 'test', 'email': 'test2'},
content_type='application/json')
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json()['errors']['username'], 'Username already exists')
self.assertEqual(len(ExtendedUser.objects.all()), 2)
def test_register_user_no_username(self):
anonymous = Client()
response = anonymous.post('/api/2/register/', {'password': 'test', 'email': 'test2'},
content_type='application/json')
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json()['errors']['username'], 'Username is required')
self.assertEqual(len(ExtendedUser.objects.all()), 2)
def test_register_user_no_password(self):
anonymous = Client()
response = anonymous.post('/api/2/register/', {'username': 'testuser2', 'email': 'test2'},
content_type='application/json')
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json()['errors']['password'], 'Password is required')
self.assertEqual(len(ExtendedUser.objects.all()), 2)
def test_register_user_no_email(self):
anonymous = Client()
response = anonymous.post('/api/2/register/', {'username': 'testuser2', 'password': 'test'},
content_type='application/json')
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json()['errors']['email'], 'Email is required')
self.assertEqual(len(ExtendedUser.objects.all()), 2)
def test_register_user_duplicate_email(self):
anonymous = Client()
response = anonymous.post('/api/2/register/', {'username': 'testuser2', 'password': 'test', 'email': 'test'},
content_type='application/json')
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json()['errors']['email'], 'Email already exists')
self.assertEqual(len(ExtendedUser.objects.all()), 2)
def test_get_token(self):
anonymous = Client()
response = anonymous.post('/api/2/login/', {'username': 'testuser', 'password': 'test'},
content_type='application/json')
self.assertEqual(response.status_code, 200)
self.assertTrue('token' in response.json())
def test_legacy_user(self):
response = self.client.get('/api/2/users/1/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['username'], settings.LEGACY_USER_NAME)
self.assertEqual(response.json()['email'], 'mail@' + settings.MAIL_DOMAIN)
self.assertEqual(response.json()['first_name'], '')
self.assertEqual(response.json()['last_name'], '')
self.assertEqual(response.json()['id'], 1)
def test_get_legacy_user_token(self):
anonymous = Client()
response = anonymous.post('/api/2/login/', {
'username': settings.LEGACY_USER_NAME, 'password': settings.LEGACY_USER_PASSWORD},
content_type='application/json')
self.assertEqual(response.status_code, 200)
self.assertTrue('token' in response.json())
class GroupApiTest(TestCase):
def setUp(self):
self.event = Event.objects.create(name='testevent', slug='testevent')
# Admin, Orga, Team, User are created by default
self.group1 = Group.objects.create(name='testgroup1')
self.group2 = Group.objects.create(name='testgroup2')
self.group1.permissions.add(Permission.objects.get(codename='add_item'))
self.group1.permissions.add(Permission.objects.get(codename='view_item'))
self.group2.permissions.add(Permission.objects.get(codename='view_event'))
self.group2.permissions.add(Permission.objects.get(codename='view_item'))
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
self.user.user_permissions.add(Permission.objects.get(codename='add_event'))
self.user.groups.add(self.group1)
self.user.groups.add(self.group2)
self.user.save()
EventPermission.objects.create(event=self.event, user=self.user,
permission=Permission.objects.get(codename='delete_item'))
self.user.save()
self.token = AuthToken.objects.create(user=self.user)
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
def test_groups(self):
response = self.client.get('/api/2/groups/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 6)
self.assertEqual(response.json()[0]['name'], 'Admin')
self.assertEqual(response.json()[1]['name'], 'Orga')
self.assertEqual(response.json()[2]['name'], 'Team')
self.assertEqual(response.json()[3]['name'], 'User')
self.assertEqual(response.json()[4]['name'], 'testgroup1')
self.assertEqual(response.json()[5]['name'], 'testgroup2')
def test_group(self):
response = self.client.get('/api/2/groups/5/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['name'], 'testgroup1')
permissions = response.json()['permissions']
self.assertEqual(len(permissions), 2)
self.assertTrue('*:add_item' in permissions)
self.assertTrue('*:view_item' in permissions)
members = response.json()['members']
self.assertEqual(len(members), 1)
self.assertEqual(members[0], 'testuser')

0
core/core/__init__.py Normal file
View file

66
core/core/asgi.py Normal file
View file

@ -0,0 +1,66 @@
"""
ASGI config for core project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/howto/deployment/asgi/
"""
import os
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application
from notify_sessions.routing import websocket_urlpatterns
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
django_asgi_app = get_asgi_application()
class TokenAuthMiddleware:
"""
Token authorization middleware for Django Channels 2
"""
def __init__(self, inner):
self.inner = inner
def __call__(self, scope):
import base64
headers = dict(scope['headers'])
if b'authorization' in headers:
try:
token_name, token_key = headers[b'authorization'].decode().split()
if token_name == 'Basic':
b64 = base64.b64decode(token_key)
user = b64.decode().split(':')[0]
password = b64.decode().split(':')[1]
print(user, password)
else:
print("Token name is not Basic")
scope['user'] = None
except:
print("Token is not valid")
scope['user'] = None
else:
print("Token is not in headers")
scope['user'] = None
TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddleware(AuthMiddlewareStack(inner))
websocket_asgi_app = AllowedHostsOriginValidator(
AuthMiddlewareStack(
URLRouter(
websocket_urlpatterns
)
)
)
application = ProtocolTypeRouter({
"http": django_asgi_app,
"websocket": websocket_asgi_app,
})

29
core/core/globals.py Normal file
View file

@ -0,0 +1,29 @@
import asyncio
import logging
import signal
loop = asyncio.get_event_loop()
def create_task(coro):
global loop
loop.create_task(coro)
async def shutdown(sig, loop):
log = logging.getLogger()
log.info(f"Received exit signal {sig.name}...")
tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()]
[task.cancel() for task in tasks]
log.info(f"Cancelling {len(tasks)} outstanding tasks")
await asyncio.wait_for(loop.shutdown_asyncgens(), timeout=10)
loop.stop()
log.info("Shutdown complete.")
def init_loop():
global loop
loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(shutdown(signal.SIGTERM, loop)))
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(signal.SIGINT, loop)))
return loop

228
core/core/settings.py Normal file
View file

@ -0,0 +1,228 @@
"""
Django settings for core project.
Generated by 'django-admin startproject' using Django 4.2.7.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/4.2/ref/settings/
"""
import os
import sys
import dotenv
from pathlib import Path
def truthy_str(s):
return s.lower() in ['true', '1', 't', 'y', 'yes', 'yeah', 'yup', 'certainly', 'sure', 'positive', 'uh-huh', '👍']
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
dotenv.load_dotenv(BASE_DIR / '.env')
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = os.getenv('DJANGO_SECRET_KEY', 'django-insecure-tm*$w_14iqbiy-!7(8#ba7j+_@(7@rf2&a^!=shs&$03b%2*rv')
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = truthy_str(os.getenv('DEBUG_MODE_ACTIVE', 'False'))
PRIMARY_HOST = os.getenv('HTTP_HOST', 'localhost')
ALLOWED_HOSTS = [PRIMARY_HOST]
MAIL_DOMAIN = os.getenv('MAIL_DOMAIN', 'localhost')
CSRF_TRUSTED_ORIGINS = ["https://" + host for host in ALLOWED_HOSTS]
LEGACY_USER_NAME = os.getenv('LEGACY_API_USER', 'legacy_user')
LEGACY_USER_PASSWORD = os.getenv('LEGACY_API_PASSWORD', 'legacy_password')
SYSTEM3_VERSION = "0.0.0-dev.0"
ACTIVE_SPAM_TRAINING = truthy_str(os.getenv('ACTIVE_SPAM_TRAINING', 'False'))
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_extensions',
'django_prometheus',
'rest_framework',
'knox',
'drf_yasg',
'channels',
'authentication',
'files',
'tickets',
'inventory',
'mail',
'notify_sessions',
]
REST_FRAMEWORK = {
'TEST_REQUEST_DEFAULT_FORMAT': 'json',
'DEFAULT_AUTHENTICATION_CLASSES': ('knox.auth.TokenAuthentication',),
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated', 'rest_framework.permissions.DjangoModelPermissions'),
}
AUTH_USER_MODEL = 'authentication.ExtendedUser'
SWAGGER_SETTINGS = {
'SECURITY_DEFINITIONS': {
'api_key': {
'type': 'apiKey',
'in': 'header',
'name': 'Authorization'
}
},
'USE_SESSION_AUTH': False,
'JSON_EDITOR': True,
'DEFAULT_INFO': 'core.urls.openapi_info',
}
MIDDLEWARE = [
'django_prometheus.middleware.PrometheusBeforeMiddleware',
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'django_prometheus.middleware.PrometheusAfterMiddleware',
]
ROOT_URLCONF = 'core.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [BASE_DIR / 'templates'],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'core.wsgi.application'
# Database
# https://docs.djangoproject.com/en/4.2/ref/settings/#databases
if 'test' in sys.argv:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
}
}
else:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'HOST': os.getenv('DB_HOST', 'localhost'),
'PORT': os.getenv('DB_PORT', '3306'),
'NAME': os.getenv('DB_NAME', 'system3'),
'USER': os.getenv('DB_USER', 'system3'),
'PASSWORD': os.getenv('DB_PASSWORD', 'system3'),
'OPTIONS': {
'charset': 'utf8mb4',
'init_command': "SET sql_mode='STRICT_TRANS_TABLES'"
}
}
}
# Password validation
# https://docs.djangoproject.com/en/4.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/4.2/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/4.2/howto/static-files/
STATIC_ROOT = os.getenv('STATIC_ROOT', 'staticfiles')
STATIC_URL = '/static/'
MEDIA_ROOT = os.getenv('MEDIA_ROOT', 'userfiles')
MEDIA_URL = '/media/'
STORAGES = {
'default': {
'BACKEND': 'django.core.files.storage.FileSystemStorage',
'OPTIONS': {
'base_url': MEDIA_URL,
'location': BASE_DIR / MEDIA_ROOT
},
},
'staticfiles': {
'BACKEND': 'django.core.files.storage.FileSystemStorage',
'OPTIONS': {
'base_url': STATIC_URL,
'location': BASE_DIR / STATIC_ROOT
},
},
}
DATA_UPLOAD_MAX_MEMORY_SIZE = 1024 * 1024 * 128 # 128 MB
# Default primary key field type
# https://docs.djangoproject.com/en/4.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
'hosts': [(os.getenv('REDIS_HOST', 'localhost'), 6379)],
},
}
}
PROMETHEUS_METRIC_NAMESPACE = 'c3lf'
TEST_RUNNER = 'core.test_runner.FastTestRunner'

33
core/core/test_runner.py Normal file
View file

@ -0,0 +1,33 @@
from django.conf import settings
from django.test.runner import DiscoverRunner
class FastTestRunner(DiscoverRunner):
def setup_test_environment(self):
super(FastTestRunner, self).setup_test_environment()
# Don't write files
settings.STORAGES = {
'default': {
'BACKEND': 'django.core.files.storage.InMemoryStorage',
'OPTIONS': {
'base_url': '/media/',
'location': '',
},
},
}
# Bonus: Use a faster password hasher. This REALLY helps.
settings.PASSWORD_HASHERS = (
'django.contrib.auth.hashers.MD5PasswordHasher',
)
settings.CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels.layers.InMemoryChannelLayer'
}
}
settings.DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
}
}

33
core/core/urls.py Normal file
View file

@ -0,0 +1,33 @@
"""
URL configuration for core project.
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/4.2/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path, include
from .version import get_info
urlpatterns = [
path('djangoadmin/', admin.site.urls),
path('api/2/', include('inventory.api_v2')),
path('api/2/', include('files.api_v2')),
path('media/2/', include('files.media_v2')),
path('api/2/', include('tickets.api_v2')),
path('api/2/', include('mail.api_v2')),
path('api/2/', include('notify_sessions.api_v2')),
path('api/2/', include('authentication.api_v2')),
path('api/', get_info),
path('', include('django_prometheus.urls')),
]

15
core/core/version.py Normal file
View file

@ -0,0 +1,15 @@
from rest_framework.decorators import api_view, permission_classes, authentication_classes
from rest_framework.response import Response
from .settings import SYSTEM3_VERSION
@api_view(['GET'])
@permission_classes([])
@authentication_classes([])
def get_info(request):
return Response({
"framework_version": SYSTEM3_VERSION,
"api_min_version": "1.0",
"api_max_version": "1.0",
})

0
core/files/__init__.py Normal file
View file

10
core/files/admin.py Normal file
View file

@ -0,0 +1,10 @@
from django.contrib import admin
from files.models import File
class FileAdmin(admin.ModelAdmin):
pass
admin.site.register(File, FileAdmin)

24
core/files/api_v2.py Normal file
View file

@ -0,0 +1,24 @@
from rest_framework import serializers, viewsets, routers
from files.models import File
class FileSerializer(serializers.ModelSerializer):
data = serializers.CharField(max_length=1000000, write_only=True)
class Meta:
model = File
fields = ['hash', 'data']
read_only_fields = ['hash']
class FileViewSet(viewsets.ModelViewSet):
serializer_class = FileSerializer
queryset = File.objects.all()
lookup_field = 'hash'
router = routers.SimpleRouter()
router.register(r'files', FileViewSet, basename='files')
urlpatterns = router.urls

92
core/files/media_v2.py Normal file
View file

@ -0,0 +1,92 @@
from datetime import datetime, timedelta
import os
from django.http import HttpResponse
from django.urls import path
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from core.settings import MEDIA_ROOT
from files.models import File
from mail.models import EmailAttachment
@swagger_auto_schema(method='GET', auto_schema=None)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def media_urls(request, hash):
try:
if request.META.get('HTTP_IF_NONE_MATCH') and request.META.get('HTTP_IF_NONE_MATCH') == hash:
return HttpResponse(status=status.HTTP_304_NOT_MODIFIED)
file = File.objects.filter(hash=hash).first()
attachment = EmailAttachment.objects.filter(hash=hash).first()
file = file if file else attachment
if not file:
return Response(status=status.HTTP_404_NOT_FOUND)
hash_path = file.file
return HttpResponse(status=status.HTTP_200_OK,
content_type=file.mime_type,
headers={
'X-Accel-Redirect': f'/redirect_media/{hash_path}',
'Access-Control-Allow-Origin': '*',
'Cache-Control': 'max-age=31536000, private, immutable',
'Expires': datetime.utcnow() + timedelta(days=365),
'Age': 0,
'ETag': file.hash,
})
except File.DoesNotExist:
return Response(status=status.HTTP_404_NOT_FOUND)
except EmailAttachment.DoesNotExist:
return Response(status=status.HTTP_404_NOT_FOUND)
@swagger_auto_schema(method='GET', auto_schema=None)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def thumbnail_urls(request, size, hash):
if size not in [32, 64, 256]:
return Response(status=status.HTTP_404_NOT_FOUND)
if request.META.get('HTTP_IF_NONE_MATCH') and request.META.get('HTTP_IF_NONE_MATCH') == hash + "_" + str(size):
return HttpResponse(status=status.HTTP_304_NOT_MODIFIED)
try:
file = File.objects.filter(hash=hash).first()
attachment = EmailAttachment.objects.filter(hash=hash).first()
file = file if file else attachment
if not file:
return Response(status=status.HTTP_404_NOT_FOUND)
hash_path = file.file
if not os.path.exists(MEDIA_ROOT + f'/thumbnails/{size}/{hash_path}'):
from PIL import Image
image = Image.open(file.file)
image.thumbnail((size, size))
rgb_image = image.convert('RGB')
thumb_dir = os.path.dirname(MEDIA_ROOT + f'/thumbnails/{size}/{hash_path}')
if not os.path.exists(thumb_dir):
os.makedirs(thumb_dir)
rgb_image.save(MEDIA_ROOT + f'/thumbnails/{size}/{hash_path}', 'jpeg', quality=90)
return HttpResponse(status=status.HTTP_200_OK,
content_type="image/jpeg",
headers={
'X-Accel-Redirect': f'/redirect_thumbnail/{size}/{hash_path}',
'Access-Control-Allow-Origin': '*',
'Cache-Control': 'max-age=31536000, private, immutable',
'Expires': datetime.utcnow() + timedelta(days=365),
'Age': 0,
'ETag': file.hash + "_" + str(size),
})
except File.DoesNotExist:
return Response(status=status.HTTP_404_NOT_FOUND)
except EmailAttachment.DoesNotExist:
return Response(status=status.HTTP_404_NOT_FOUND)
urlpatterns = [
path('<int:size>/<path:hash>/', thumbnail_urls),
path('<path:hash>/', media_urls),
]

View file

@ -0,0 +1,30 @@
# Generated by Django 4.2.7 on 2023-12-09 02:13
from django.db import migrations, models
import django.db.models.deletion
import files.models
class Migration(migrations.Migration):
initial = True
dependencies = [
('inventory', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='File',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(blank=True, null=True)),
('updated_at', models.DateTimeField(blank=True, null=True)),
('deleted_at', models.DateTimeField(blank=True, null=True)),
('file', models.ImageField(upload_to=files.models.hash_upload)),
('mime_type', models.CharField(max_length=255)),
('hash', models.CharField(max_length=64, unique=True)),
('item', models.ForeignKey(blank=True, db_column='iid', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='files', to='inventory.item')),
],
),
]

View file

@ -0,0 +1,19 @@
# Generated by Django 4.2.7 on 2024-01-10 19:04
from django.db import migrations, models
import files.models
class Migration(migrations.Migration):
dependencies = [
('files', '0001_initial'),
]
operations = [
migrations.AlterField(
model_name='file',
name='file',
field=models.FileField(upload_to=files.models.hash_upload),
),
]

View file

@ -0,0 +1,24 @@
# Generated by Django 4.2.7 on 2024-11-21 22:40
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('files', '0002_alter_file_file'),
]
def set_creation_date(apps, schema_editor):
File = apps.get_model('files', 'File')
for file in File.objects.all():
if file.created_at is None:
if not file.item.created_at is None:
file.created_at = file.item.created_at
else:
file.created_at = max(File.objects.filter(
id__lt=file.id, created_at__isnull=False).values_list('created_at', flat=True))
file.save()
operations = [
migrations.RunPython(set_creation_date),
]

View file

95
core/files/models.py Normal file
View file

@ -0,0 +1,95 @@
from django.core.files.base import ContentFile
from django.db import models, IntegrityError
from django_softdelete.models import SoftDeleteModel
from inventory.models import Item
def hash_upload(instance, filename):
return f"{instance.hash[:2]}/{instance.hash[2:4]}/{instance.hash[4:6]}/{instance.hash[6:]}"
class FileManager(models.Manager):
def get_or_create(self, **kwargs):
if 'data' in kwargs and type(kwargs['data']) == str:
import base64
from hashlib import sha256
raw = kwargs['data']
if not raw.startswith('data:'):
raise ValueError('data must be a base64 encoded string or file and hash must be provided')
raw = raw.split(';base64,')
if len(raw) != 2:
raise ValueError('data must be a base64 encoded string or file and hash must be provided')
mime_type = raw[0].split(':')[1]
content = base64.b64decode(raw[1], validate=True)
kwargs.pop('data')
content_hash = sha256(content).hexdigest()
kwargs['file'] = ContentFile(content, content_hash)
kwargs['hash'] = content_hash
kwargs['mime_type'] = mime_type
elif 'file' in kwargs and 'hash' in kwargs and type(kwargs['file']) == ContentFile and 'mime_type' in kwargs:
pass
else:
raise ValueError('data must be a base64 encoded string or file and hash must be provided')
try:
return self.get(hash=kwargs['hash']), False
except self.model.DoesNotExist:
obj = super().create(**kwargs)
obj.file.save(content=kwargs['file'], name=kwargs['hash'])
return obj, True
def create(self, **kwargs):
if 'data' in kwargs and type(kwargs['data']) == str:
import base64
from hashlib import sha256
raw = kwargs['data']
if not raw.startswith('data:'):
raise ValueError('data must be a base64 encoded string or file and hash must be provided')
raw = raw.split(';base64,')
if len(raw) != 2:
raise ValueError('data must be a base64 encoded string or file and hash must be provided')
mime_type = raw[0].split(':')[1]
content = base64.b64decode(raw[1], validate=True)
kwargs.pop('data')
content_hash = sha256(content).hexdigest()
kwargs['file'] = ContentFile(content, content_hash)
kwargs['hash'] = content_hash
kwargs['mime_type'] = mime_type
elif 'file' in kwargs and 'hash' in kwargs and type(kwargs['file']) == ContentFile and 'mime_type' in kwargs:
pass
else:
raise ValueError('data must be a base64 encoded string or file and hash must be provided')
if not self.filter(hash=kwargs['hash']).exists():
obj = super().create(**kwargs)
obj.file.save(content=kwargs['file'], name=kwargs['hash'])
return obj
else:
raise IntegrityError('File with this hash already exists')
class AbstractFile(models.Model):
created_at = models.DateTimeField(blank=True, null=True)
updated_at = models.DateTimeField(blank=True, null=True)
deleted_at = models.DateTimeField(blank=True, null=True)
file = models.FileField(upload_to=hash_upload)
mime_type = models.CharField(max_length=255, null=False, blank=False)
hash = models.CharField(max_length=64, null=False, blank=False, unique=True)
objects = FileManager()
def save(self, *args, **kwargs):
from django.utils import timezone
if not self.created_at:
self.created_at = timezone.now()
self.updated_at = timezone.now()
super().save(*args, **kwargs)
class Meta:
abstract = True
class File(AbstractFile):
item = models.ForeignKey(Item, models.CASCADE, db_column='iid', null=True, blank=True, related_name='files')
def __str__(self):
return self.hash

View file

View file

View file

@ -0,0 +1,55 @@
from django.test import TestCase, Client
from django.contrib.auth.models import Permission
from authentication.models import ExtendedUser
from files.models import File
from inventory.models import Event, Container, Item
from knox.models import AuthToken
class FileTestCase(TestCase):
def setUp(self):
super().setUp()
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
self.user.user_permissions.add(*Permission.objects.all())
self.user.save()
self.token = AuthToken.objects.create(user=self.user)
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
self.event = Event.objects.create(slug='EVENT', name='Event')
self.box = Container.objects.create(name='BOX')
def test_list_files(self):
import base64
item = File.objects.create(data="data:text/plain;base64," + base64.b64encode(b"foo").decode('utf-8'))
response = self.client.get('/api/2/files/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()[0]['hash'], item.hash)
self.assertEqual(len(response.json()[0]['hash']), 64)
def test_one_file(self):
import base64
item = File.objects.create(data="data:text/plain;base64," + base64.b64encode(b"foo").decode('utf-8'))
response = self.client.get(f'/api/2/files/{item.hash}/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['hash'], item.hash)
self.assertEqual(len(response.json()['hash']), 64)
def test_create_file(self):
import base64
Item.objects.create(container=self.box, event=self.event, description='1')
item = Item.objects.create(container=self.box, event=self.event, description='2')
response = self.client.post('/api/2/files/',
{'data': "data:text/plain;base64," + base64.b64encode(b"foo").decode('utf-8')},
content_type='application/json')
self.assertEqual(response.status_code, 201)
self.assertEqual(len(response.json()['hash']), 64)
def test_delete_file(self):
import base64
item = Item.objects.create(container=self.box, event=self.event, description='1')
File.objects.create(item=item, data="data:text/plain;base64," + base64.b64encode(b"foo").decode('utf-8'))
file = File.objects.create(item=item, data="data:text/plain;base64," + base64.b64encode(b"bar").decode('utf-8'))
self.assertEqual(len(File.objects.all()), 2)
response = self.client.delete(f'/api/2/files/{file.hash}/')
self.assertEqual(response.status_code, 204)

30
core/helper.py Normal file
View file

@ -0,0 +1,30 @@
import asyncio
import logging
import signal
loop = None
def create_task(coro):
global loop
loop.create_task(coro)
async def shutdown(sig, loop):
log = logging.getLogger()
log.info(f"Received exit signal {sig.name}...")
tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()]
[task.cancel() for task in tasks]
log.info(f"Cancelling {len(tasks)} outstanding tasks")
await asyncio.wait_for(loop.shutdown_asyncgens(), timeout=10)
loop.stop()
log.info("Shutdown complete.")
def init_loop():
global loop
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(shutdown(signal.SIGTERM, loop)))
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(signal.SIGINT, loop)))
return loop

View file

38
core/inventory/admin.py Normal file
View file

@ -0,0 +1,38 @@
from django.contrib import admin
from inventory.models import Item, Container, ItemPlacement, Comment, Event
class ItemAdmin(admin.ModelAdmin):
pass
admin.site.register(Item, ItemAdmin)
class ContainerAdmin(admin.ModelAdmin):
pass
admin.site.register(Container, ContainerAdmin)
class EventAdmin(admin.ModelAdmin):
pass
admin.site.register(Event, EventAdmin)
class ItemPlacementAdmin(admin.ModelAdmin):
pass
admin.site.register(ItemPlacement, ItemPlacementAdmin)
class CommentAdmin(admin.ModelAdmin):
pass
admin.site.register(Comment, CommentAdmin)

121
core/inventory/api_v2.py Normal file
View file

@ -0,0 +1,121 @@
from django.urls import re_path
from django.contrib.auth.decorators import permission_required
from rest_framework import routers, viewsets
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from inventory.models import Event, Container, Item
from inventory.serializers import EventSerializer, ContainerSerializer, ItemSerializer, SearchResultSerializer
from base64 import b64decode
class EventViewSet(viewsets.ModelViewSet):
serializer_class = EventSerializer
queryset = Event.objects.all()
permission_classes = []
class ContainerViewSet(viewsets.ModelViewSet):
serializer_class = ContainerSerializer
queryset = Container.objects.all()
def filter_items(items, query):
query_tokens = query.split(' ')
for item in items:
value = 0
for token in query_tokens:
if token in item.description:
value += 1
if value > 0:
yield {'search_score': value, 'item': item}
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def search_items(request, event_slug, query):
try:
event = Event.objects.get(slug=event_slug)
if not request.user.has_event_perm(event, 'view_item'):
return Response(status=403)
items = filter_items(Item.objects.filter(event=event), b64decode(query).decode('utf-8'))
return Response(SearchResultSerializer(items, many=True).data)
except Event.DoesNotExist:
return Response(status=404)
@api_view(['GET', 'POST'])
@permission_classes([IsAuthenticated])
def item(request, event_slug):
try:
event = None
if event_slug != 'none':
event = Event.objects.get(slug=event_slug)
if request.method == 'GET':
if not request.user.has_event_perm(event, 'view_item'):
return Response(status=403)
return Response(ItemSerializer(Item.objects.filter(event=event), many=True).data)
elif request.method == 'POST':
if not request.user.has_event_perm(event, 'add_item'):
return Response(status=403)
validated_data = ItemSerializer(data=request.data)
if validated_data.is_valid():
validated_data.save(event=event)
return Response(validated_data.data, status=201)
return Response(status=400)
except Event.DoesNotExist:
return Response(status=404)
except KeyError:
return Response(status=400)
@api_view(['GET', 'PUT', 'DELETE', 'PATCH'])
@permission_classes([IsAuthenticated])
def item_by_id(request, event_slug, id):
try:
event = Event.objects.get(slug=event_slug)
item = Item.objects.get(event=event, id=id)
if request.method == 'GET':
if not request.user.has_event_perm(event, 'view_item'):
return Response(status=403)
return Response(ItemSerializer(item).data)
elif request.method == 'PUT':
if not request.user.has_event_perm(event, 'change_item'):
return Response(status=403)
validated_data = ItemSerializer(item, data=request.data)
if validated_data.is_valid():
validated_data.save()
return Response(validated_data.data)
return Response(validated_data.errors, status=400)
elif request.method == 'PATCH':
if not request.user.has_event_perm(event, 'change_item'):
return Response(status=403)
validated_data = ItemSerializer(item, data=request.data, partial=True)
if validated_data.is_valid():
validated_data.save()
return Response(validated_data.data)
return Response(validated_data.errors, status=400)
elif request.method == 'DELETE':
if not request.user.has_event_perm(event, 'delete_item'):
return Response(status=403)
item.delete()
return Response(status=204)
except Item.DoesNotExist:
return Response(status=404)
except Event.DoesNotExist:
return Response(status=404)
router = routers.SimpleRouter()
router.register(r'events', EventViewSet, basename='events')
router.register(r'boxes', ContainerViewSet, basename='boxes')
router.register(r'box', ContainerViewSet, basename='boxes')
urlpatterns = router.urls + [
re_path(r'^(?P<event_slug>[\w-]+)/items/$', item, name='item'),
re_path(r'^(?P<event_slug>[\w-]+)/items/(?P<query>[-A-Za-z0-9+/]*={0,3})/$', search_items, name='search_items'),
re_path(r'^(?P<event_slug>[\w-]+)/item/$', item, name='item'),
re_path(r'^(?P<event_slug>[\w-]+)/item/(?P<id>\d+)/$', item_by_id, name='item_by_id'),
]

View file

@ -0,0 +1,54 @@
# Generated by Django 4.2.7 on 2023-11-18 11:28
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Container',
fields=[
('cid', models.AutoField(primary_key=True, serialize=False)),
('name', models.CharField(max_length=255)),
('created_at', models.DateTimeField(blank=True, null=True)),
('updated_at', models.DateTimeField(blank=True, null=True)),
],
),
migrations.CreateModel(
name='Event',
fields=[
('eid', models.AutoField(primary_key=True, serialize=False)),
('name', models.CharField(max_length=255)),
('slug', models.CharField(max_length=255, unique=True)),
('start', models.DateTimeField(blank=True, null=True)),
('end', models.DateTimeField(blank=True, null=True)),
('pre_start', models.DateTimeField(blank=True, null=True)),
('post_end', models.DateTimeField(blank=True, null=True)),
('created_at', models.DateTimeField(blank=True, null=True)),
('updated_at', models.DateTimeField(blank=True, null=True)),
],
),
migrations.CreateModel(
name='Item',
fields=[
('iid', models.AutoField(primary_key=True, serialize=False)),
('uid', models.IntegerField()),
('description', models.TextField()),
('returned_at', models.DateTimeField(blank=True, null=True)),
('created_at', models.DateTimeField(blank=True, null=True)),
('updated_at', models.DateTimeField(blank=True, null=True)),
('container', models.ForeignKey(db_column='cid', on_delete=django.db.models.deletion.CASCADE, to='inventory.container')),
('event', models.ForeignKey(db_column='eid', on_delete=django.db.models.deletion.CASCADE, to='inventory.event')),
],
options={
'unique_together': {('uid', 'event')},
},
),
]

View file

@ -0,0 +1,33 @@
# Generated by Django 4.2.7 on 2023-11-20 11:23
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('inventory', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='container',
name='deleted_at',
field=models.DateTimeField(blank=True, null=True),
),
migrations.AddField(
model_name='container',
name='is_deleted',
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name='item',
name='deleted_at',
field=models.DateTimeField(blank=True, null=True),
),
migrations.AddField(
model_name='item',
name='is_deleted',
field=models.BooleanField(default=False),
),
]

View file

@ -0,0 +1,17 @@
# Generated by Django 4.2.7 on 2024-01-07 18:46
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('inventory', '0002_container_deleted_at_container_is_deleted_and_more'),
]
operations = [
migrations.AlterModelOptions(
name='item',
options={'permissions': [('match_item', 'Can match item')]},
),
]

View file

@ -0,0 +1,23 @@
# Generated by Django 4.2.7 on 2024-01-22 16:02
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('inventory', '0003_alter_item_options'),
]
operations = [
migrations.AlterField(
model_name='event',
name='created_at',
field=models.DateTimeField(auto_now_add=True, null=True),
),
migrations.AlterField(
model_name='item',
name='created_at',
field=models.DateTimeField(auto_now_add=True, null=True),
),
]

View file

@ -0,0 +1,52 @@
# Generated by Django 4.2.7 on 2024-11-19 22:56
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('inventory', '0004_alter_event_created_at_alter_item_created_at'),
]
operations = [
migrations.RenameField(
model_name='container',
old_name='cid',
new_name='id',
),
migrations.RenameField(
model_name='event',
old_name='eid',
new_name='id',
),
migrations.RenameField(
model_name='item',
old_name='iid',
new_name='id',
),
migrations.RenameField(
model_name='item',
old_name='uid',
new_name='uid_deprecated',
),
migrations.AlterUniqueTogether(
name='item',
unique_together=set(),
),
migrations.AlterField(
model_name='item',
name='container',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='inventory.container'),
),
migrations.AlterField(
model_name='item',
name='event',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='inventory.event'),
),
migrations.AlterUniqueTogether(
name='item',
unique_together={('uid_deprecated', 'event')},
),
]

View file

@ -0,0 +1,17 @@
# Generated by Django 4.2.7 on 2024-11-20 01:39
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('inventory', '0005_rename_cid_container_id_rename_eid_event_id_and_more'),
]
operations = [
migrations.AlterModelTable(
name='event',
table='common_event',
),
]

View file

@ -0,0 +1,52 @@
# Generated by Django 4.2.7 on 2024-11-23 15:27
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('inventory', '0006_alter_event_table'),
]
def set_initial_container(apps, schema_editor):
Item = apps.get_model('inventory', 'Item')
for item in Item.objects.all():
item.container_history.get_or_create(container=item.container_old)
item.save()
operations = [
migrations.RenameField(
model_name='item',
old_name='container',
new_name='container_old',
),
migrations.CreateModel(
name='ItemPlacement',
fields=[
('id', models.AutoField(primary_key=True, serialize=False)),
('timestamp', models.DateTimeField(auto_now_add=True)),
('container',
models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='item_history',
to='inventory.container')),
('item',
models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='container_history',
to='inventory.item')),
],
),
migrations.CreateModel(
name='Comment',
fields=[
('id', models.AutoField(primary_key=True, serialize=False)),
('comment', models.TextField()),
('timestamp', models.DateTimeField(auto_now_add=True)),
('item', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='comments',
to='inventory.item')),
],
),
migrations.RunPython(set_initial_container),
migrations.RemoveField(
model_name='item',
name='container_old',
),
]

View file

113
core/inventory/models.py Normal file
View file

@ -0,0 +1,113 @@
from itertools import groupby
from django.db import models
from django_softdelete.models import SoftDeleteModel, SoftDeleteManager
class ItemManager(SoftDeleteManager):
def create(self, **kwargs):
container = kwargs.pop('container')
if 'uid_deprecated' in kwargs:
raise ValueError('uid_deprecated must not be set manually')
uid_deprecated = Item.all_objects.filter(event=kwargs['event']).count() + 1
kwargs['uid_deprecated'] = uid_deprecated
item = super().create(**kwargs)
item.container = container
return item
def get_queryset(self):
return super().get_queryset().filter(returned_at__isnull=True)
class Item(SoftDeleteModel):
id = models.AutoField(primary_key=True)
uid_deprecated = models.IntegerField()
description = models.TextField()
event = models.ForeignKey('Event', models.CASCADE)
returned_at = models.DateTimeField(blank=True, null=True)
created_at = models.DateTimeField(null=True, auto_now_add=True)
updated_at = models.DateTimeField(blank=True, null=True)
@property
def container(self):
try:
return self.container_history.order_by('-timestamp').first().container
except AttributeError:
return None
@container.setter
def container(self, value):
if self.container == value:
return
self.container_history.create(container=value)
@property
def related_issues(self):
groups = groupby(self.issue_relation_changes.all(), lambda rel: rel.issue_thread.id)
return [sorted(v, key=lambda r: r.timestamp)[0].issue_thread for k, v in groups]
objects = ItemManager()
all_objects = models.Manager()
class Meta:
unique_together = (('uid_deprecated', 'event'),)
permissions = [
('match_item', 'Can match item')
]
def __str__(self):
return '[' + str(self.id) + ']' + self.description
class Container(SoftDeleteModel):
id = models.AutoField(primary_key=True)
name = models.CharField(max_length=255)
created_at = models.DateTimeField(blank=True, null=True)
updated_at = models.DateTimeField(blank=True, null=True)
@property
def items(self):
try:
history = self.item_history.order_by('-timestamp').all()
return [v for k, v in groupby(history, key=lambda item: item.item.id)]
except AttributeError:
return []
def __str__(self):
return '[' + str(self.id) + ']' + self.name
class ItemPlacement(models.Model):
id = models.AutoField(primary_key=True)
item = models.ForeignKey('Item', models.CASCADE, related_name='container_history')
container = models.ForeignKey('Container', models.CASCADE, related_name='item_history')
timestamp = models.DateTimeField(auto_now_add=True)
class Comment(models.Model):
id = models.AutoField(primary_key=True)
item = models.ForeignKey(Item, on_delete=models.CASCADE, related_name='comments')
comment = models.TextField()
timestamp = models.DateTimeField(auto_now_add=True)
def __str__(self):
return str(self.item) + ' comment #' + str(self.id)
class Event(models.Model):
id = models.AutoField(primary_key=True)
name = models.CharField(max_length=255)
slug = models.CharField(max_length=255, unique=True)
start = models.DateTimeField(blank=True, null=True)
end = models.DateTimeField(blank=True, null=True)
pre_start = models.DateTimeField(blank=True, null=True)
post_end = models.DateTimeField(blank=True, null=True)
created_at = models.DateTimeField(null=True, auto_now_add=True)
updated_at = models.DateTimeField(blank=True, null=True)
def __str__(self):
return '[' + str(self.slug) + ']' + self.name
class Meta:
db_table = 'common_event'

View file

@ -0,0 +1,128 @@
from django.utils import timezone
from rest_framework import serializers
from rest_framework.relations import SlugRelatedField
from files.models import File
from inventory.models import Event, Container, Item
from inventory.shared_serializers import BasicItemSerializer
from mail.models import EventAddress
from tickets.shared_serializers import BasicIssueSerializer
class EventSerializer(serializers.ModelSerializer):
addresses = SlugRelatedField(many=True, slug_field='address', queryset=EventAddress.objects.all())
class Meta:
model = Event
fields = ['id', 'slug', 'name', 'start', 'end', 'pre_start', 'post_end', 'addresses']
read_only_fields = ['id']
def to_internal_value(self, data):
data = data.copy()
addresses = data.pop('addresses', None)
dict = super().to_internal_value(data)
if addresses:
dict['addresses'] = [EventAddress.objects.get_or_create(address=x)[0] for x in addresses]
return dict
class ContainerSerializer(serializers.ModelSerializer):
itemCount = serializers.SerializerMethodField()
class Meta:
model = Container
fields = ['id', 'name', 'itemCount']
read_only_fields = ['id', 'itemCount']
def get_itemCount(self, instance):
return len(instance.items)
class ItemSerializer(BasicItemSerializer):
timeline = serializers.SerializerMethodField()
dataImage = serializers.CharField(write_only=True, required=False)
related_issues = BasicIssueSerializer(many=True, read_only=True)
class Meta:
model = Item
fields = ['cid', 'box', 'id', 'description', 'file', 'dataImage', 'returned', 'event', 'related_issues',
'timeline']
read_only_fields = ['id']
def to_internal_value(self, data):
container = None
returned = False
if 'cid' in data:
container = Container.objects.get(id=data['cid'])
if 'returned' in data:
returned = data['returned']
internal = super().to_internal_value(data)
if container:
internal['container'] = container
if returned:
internal['returned_at'] = timezone.now()
return internal
def validate(self, attrs):
if not 'container' in attrs and not self.partial:
raise serializers.ValidationError("This field cannot be empty.")
return super().validate(attrs)
def create(self, validated_data):
if 'dataImage' in validated_data:
file = File.objects.create(data=validated_data['dataImage'])
validated_data.pop('dataImage')
item = Item.objects.create(**validated_data)
item.files.set([file])
return item
return Item.objects.create(**validated_data)
def update(self, instance, validated_data):
if 'returned' in validated_data:
if validated_data['returned']:
validated_data['returned_at'] = timezone.now()
validated_data.pop('returned')
if 'dataImage' in validated_data:
file = File.objects.create(data=validated_data['dataImage'])
validated_data.pop('dataImage')
instance.files.add(file)
return super().update(instance, validated_data)
@staticmethod
def get_timeline(obj):
timeline = []
for comment in obj.comments.all():
timeline.append({
'type': 'comment',
'id': comment.id,
'timestamp': comment.timestamp,
'comment': comment.comment,
})
for relation in (obj.issue_relation_changes.all()):
timeline.append({
'type': 'issue_relation',
'id': relation.id,
'status': relation.status,
'timestamp': relation.timestamp,
'issue_thread': BasicIssueSerializer(relation.issue_thread).data,
})
for placement in (obj.container_history.all()):
timeline.append({
'type': 'placement',
'id': placement.id,
'timestamp': placement.timestamp,
'cid': placement.container.id,
'box': placement.container.name
})
return sorted(timeline, key=lambda x: x['timestamp'])
class SearchResultSerializer(serializers.Serializer):
search_score = serializers.IntegerField()
item = ItemSerializer()
def to_representation(self, instance):
return {**ItemSerializer(instance['item']).data, 'search_score': instance['search_score']}
class Meta:
model = Item

View file

@ -0,0 +1,31 @@
from rest_framework import serializers
from inventory.models import Event, Item
class BasicItemSerializer(serializers.ModelSerializer):
cid = serializers.SerializerMethodField()
box = serializers.SerializerMethodField()
file = serializers.SerializerMethodField()
returned = serializers.SerializerMethodField(required=False)
event = serializers.SlugRelatedField(slug_field='slug', queryset=Event.objects.all(),
allow_null=True, required=False)
class Meta:
model = Item
fields = ['cid', 'box', 'id', 'description', 'file', 'returned', 'event']
read_only_fields = ['id']
def get_cid(self, instance):
return instance.container.id if instance.container else None
def get_box(self, instance):
return instance.container.name if instance.container else None
def get_file(self, instance):
if len(instance.files.all()) > 0:
return sorted(instance.files.all(), key=lambda x: x.created_at, reverse=True)[0].hash
return None
def get_returned(self, instance):
return instance.returned_at is not None

View file

View file

View file

@ -0,0 +1,44 @@
from django.test import TestCase, Client
from django.contrib.auth.models import Permission
from knox.models import AuthToken
from authentication.models import ExtendedUser
class ApiTest(TestCase):
def setUp(self):
super().setUp()
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
self.user.user_permissions.add(*Permission.objects.all())
self.user.save()
self.token = AuthToken.objects.create(user=self.user)
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
def test_root(self):
from core.settings import SYSTEM3_VERSION
response = self.client.get('/api/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()["framework_version"], SYSTEM3_VERSION)
def test_events(self):
response = self.client.get('/api/2/events/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), [])
def test_containers(self):
response = self.client.get('/api/2/boxes/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), [])
def test_files(self):
response = self.client.get('/api/2/files/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), [])
def test_items(self):
from inventory.models import Event
Event.objects.create(slug='TEST1', name='Event')
response = self.client.get('/api/2/TEST1/items/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), [])

View file

@ -0,0 +1,66 @@
from django.test import TestCase, Client
from django.contrib.auth.models import Permission
from knox.models import AuthToken
from authentication.models import ExtendedUser
from inventory.models import Container
class ContainerTestCase(TestCase):
def setUp(self):
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
self.user.user_permissions.add(*Permission.objects.all())
self.token = AuthToken.objects.create(user=self.user)
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
def test_empty(self):
response = self.client.get('/api/2/boxes/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), [])
def test_members(self):
Container.objects.create(name='BOX')
response = self.client.get('/api/2/boxes/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)
self.assertEqual(response.json()[0]['id'], 1)
self.assertEqual(response.json()[0]['name'], 'BOX')
self.assertEqual(response.json()[0]['itemCount'], 0)
def test_multi_members(self):
Container.objects.create(name='BOX 1')
Container.objects.create(name='BOX 2')
Container.objects.create(name='BOX 3')
response = self.client.get('/api/2/boxes/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 3)
def test_create_container(self):
response = self.client.post('/api/2/box/', {'name': 'BOX'})
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()['id'], 1)
self.assertEqual(response.json()['name'], 'BOX')
self.assertEqual(response.json()['itemCount'], 0)
self.assertEqual(len(Container.objects.all()), 1)
self.assertEqual(Container.objects.all()[0].id, 1)
self.assertEqual(Container.objects.all()[0].name, 'BOX')
def test_update_container(self):
box = Container.objects.create(name='BOX 1')
response = self.client.put(f'/api/2/box/{box.id}/', {'name': 'BOX 2'}, content_type='application/json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['id'], 1)
self.assertEqual(response.json()['name'], 'BOX 2')
self.assertEqual(response.json()['itemCount'], 0)
self.assertEqual(len(Container.objects.all()), 1)
self.assertEqual(Container.objects.all()[0].id, 1)
self.assertEqual(Container.objects.all()[0].name, 'BOX 2')
def test_delete_container(self):
box = Container.objects.create(name='BOX 1')
Container.objects.create(name='BOX 2')
self.assertEqual(len(Container.objects.all()), 2)
response = self.client.delete(f'/api/2/box/{box.id}/')
self.assertEqual(response.status_code, 204)
self.assertEqual(len(Container.objects.all()), 1)

View file

@ -0,0 +1,94 @@
from django.test import TestCase, Client
from inventory.models import Event
client = Client()
class EventTestCase(TestCase):
def test_empty(self):
response = client.get('/api/2/events/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), [])
def test_members(self):
Event.objects.create(slug='EVENT', name='Event')
response = client.get('/api/2/events/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)
self.assertEqual(response.json()[0]['slug'], 'EVENT')
self.assertEqual(response.json()[0]['name'], 'Event')
def test_multi_members(self):
Event.objects.create(slug='EVENT1', name='Event 1')
Event.objects.create(slug='EVENT2', name='Event 2')
Event.objects.create(slug='EVENT3', name='Event 3')
response = client.get('/api/2/events/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 3)
def test_create_event(self):
response = client.post('/api/2/events/', {'slug': 'EVENT', 'name': 'Event'})
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()['slug'], 'EVENT')
self.assertEqual(response.json()['name'], 'Event')
self.assertEqual(len(Event.objects.all()), 1)
self.assertEqual(Event.objects.all()[0].slug, 'EVENT')
self.assertEqual(Event.objects.all()[0].name, 'Event')
def test_update_event(self):
from rest_framework.test import APIClient
event = Event.objects.create(slug='EVENT1', name='Event 1')
response = APIClient().put(f'/api/2/events/{event.id}/', {'slug': 'EVENT2', 'name': 'Event 2 new'})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['slug'], 'EVENT2')
self.assertEqual(response.json()['name'], 'Event 2 new')
self.assertEqual(len(Event.objects.all()), 1)
self.assertEqual(Event.objects.all()[0].slug, 'EVENT2')
self.assertEqual(Event.objects.all()[0].name, 'Event 2 new')
def test_update_event(self):
from rest_framework.test import APIClient
event = Event.objects.create(slug='EVENT1', name='Event 1')
response = APIClient().patch(f'/api/2/events/{event.id}/', {'addresses': ['foo@bar.baz', 'foo1@bar.baz']})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['slug'], 'EVENT1')
self.assertEqual(response.json()['name'], 'Event 1')
self.assertEqual(2, len(response.json()['addresses']))
self.assertEqual('foo@bar.baz', response.json()['addresses'][0])
self.assertEqual('foo1@bar.baz', response.json()['addresses'][1])
self.assertEqual(len(Event.objects.all()), 1)
self.assertEqual(Event.objects.all()[0].slug, 'EVENT1')
self.assertEqual(Event.objects.all()[0].name, 'Event 1')
def test_remove_event(self):
event = Event.objects.create(slug='EVENT1', name='Event 1')
Event.objects.create(slug='EVENT2', name='Event 2')
self.assertEqual(len(Event.objects.all()), 2)
response = client.delete(f'/api/2/events/{event.id}/')
self.assertEqual(response.status_code, 204)
self.assertEqual(len(Event.objects.all()), 1)
def test_event_with_address(self):
from mail.models import EventAddress
event1 = Event.objects.create(slug='TEST1', name='Event')
EventAddress.objects.create(event=event1, address='foo@bar.baz')
response = self.client.get('/api/2/events/')
self.assertEqual(response.status_code, 200)
self.assertEqual(1, len(response.json()))
self.assertEqual('TEST1', response.json()[0]['slug'])
self.assertEqual('Event', response.json()[0]['name'])
self.assertEqual(1, len(response.json()[0]['addresses']))
def test_items_remove_addresss(self):
from mail.models import EventAddress
from rest_framework.test import APIClient
event1 = Event.objects.create(slug='TEST1', name='Event')
EventAddress.objects.create(event=event1, address='foo@bar.baz')
EventAddress.objects.create(event=event1, address='fo1o@bar.baz')
response = APIClient().patch(f'/api/2/events/{event1.id}/', {'addresses': ['foo1@bar.baz']})
self.assertEqual(response.status_code, 200)
self.assertEqual('TEST1', response.json()['slug'])
self.assertEqual('Event', response.json()['name'])
self.assertEqual(1, len(response.json()['addresses']))
self.assertEqual('foo1@bar.baz', response.json()['addresses'][0])

View file

@ -0,0 +1,345 @@
from datetime import datetime, timedelta
from django.utils import timezone
from django.test import TestCase, Client
from django.contrib.auth.models import Permission
from knox.models import AuthToken
from authentication.models import ExtendedUser
from files.models import File
from inventory.models import Event, Container, Item, Comment, ItemPlacement
from base64 import b64encode
from tickets.models import IssueThread, ItemRelation
class ItemTestCase(TestCase):
def setUp(self):
super().setUp()
self.event = Event.objects.create(slug='EVENT', name='Event')
self.box1 = Container.objects.create(name='BOX1')
self.box2 = Container.objects.create(name='BOX2')
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
self.user.user_permissions.add(*Permission.objects.all())
self.token = AuthToken.objects.create(user=self.user)
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
self.issue = IssueThread.objects.create(
name="test issue",
event=self.event,
)
def test_empty(self):
response = self.client.get(f'/api/2/{self.event.slug}/item/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, b'[]')
def test_members_and_timeline(self):
now = datetime.now()
item = Item.objects.create(container=self.box1, event=self.event, description='1')
comment = Comment.objects.create(
item=item,
comment="test",
timestamp=now + timedelta(seconds=3),
)
match = ItemRelation.objects.create(
issue_thread=self.issue,
item=item,
timestamp=now + timedelta(seconds=4),
)
placement = ItemPlacement.objects.create(
container=self.box2,
item=item,
timestamp=now + timedelta(seconds=5),
)
response = self.client.get(f'/api/2/{self.event.slug}/item/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)
self.assertEqual(response.json()[0]['id'], item.id)
self.assertEqual(response.json()[0]['description'], '1')
self.assertEqual(response.json()[0]['box'], 'BOX2')
self.assertEqual(response.json()[0]['cid'], self.box2.id)
self.assertEqual(response.json()[0]['file'], None)
self.assertEqual(response.json()[0]['returned'], False)
self.assertEqual(response.json()[0]['event'], self.event.slug)
self.assertEqual(len(response.json()[0]['timeline']), 4)
self.assertEqual(response.json()[0]['timeline'][0]['type'], 'placement')
self.assertEqual(response.json()[0]['timeline'][1]['type'], 'comment')
self.assertEqual(response.json()[0]['timeline'][2]['type'], 'issue_relation')
self.assertEqual(response.json()[0]['timeline'][3]['type'], 'placement')
self.assertEqual(response.json()[0]['timeline'][1]['id'], comment.id)
self.assertEqual(response.json()[0]['timeline'][2]['id'], match.id)
self.assertEqual(response.json()[0]['timeline'][3]['id'], placement.id)
self.assertEqual(response.json()[0]['timeline'][0]['box'], 'BOX1')
self.assertEqual(response.json()[0]['timeline'][0]['cid'], self.box1.id)
self.assertEqual(response.json()[0]['timeline'][1]['comment'], 'test')
self.assertEqual(response.json()[0]['timeline'][1]['timestamp'],
comment.timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
self.assertEqual(response.json()[0]['timeline'][2]['status'], 'possible')
self.assertEqual(response.json()[0]['timeline'][2]['timestamp'],
match.timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
self.assertEqual(response.json()[0]['timeline'][2]['issue_thread']['name'], "test issue")
self.assertEqual(response.json()[0]['timeline'][2]['issue_thread']['event'], "EVENT")
self.assertEqual(response.json()[0]['timeline'][2]['issue_thread']['state'], "pending_new")
self.assertEqual(response.json()[0]['timeline'][3]['box'], 'BOX2')
self.assertEqual(response.json()[0]['timeline'][3]['cid'], self.box2.id)
self.assertEqual(response.json()[0]['timeline'][3]['timestamp'],
placement.timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
self.assertEqual(len(response.json()[0]['related_issues']), 1)
self.assertEqual(response.json()[0]['related_issues'][0]['name'], "test issue")
self.assertEqual(response.json()[0]['related_issues'][0]['event'], "EVENT")
self.assertEqual(response.json()[0]['related_issues'][0]['state'], "pending_new")
def test_members_with_file(self):
import base64
item = Item.objects.create(container=self.box1, event=self.event, description='1')
file = File.objects.create(item=item, data="data:text/plain;base64," + base64.b64encode(b"foo").decode('utf-8'))
response = self.client.get(f'/api/2/{self.event.slug}/item/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)
self.assertEqual(response.json()[0]['id'], item.id)
self.assertEqual(response.json()[0]['description'], '1')
self.assertEqual(response.json()[0]['box'], 'BOX1')
self.assertEqual(response.json()[0]['cid'], self.box1.id)
self.assertEqual(response.json()[0]['file'], file.hash)
self.assertEqual(response.json()[0]['returned'], False)
self.assertEqual(response.json()[0]['event'], self.event.slug)
self.assertEqual(len(response.json()[0]['related_issues']), 0)
def test_members_with_two_file(self):
import base64
item = Item.objects.create(container=self.box1, event=self.event, description='1')
file1 = File.objects.create(item=item,
data="data:text/plain;base64," + base64.b64encode(b"foo").decode('utf-8'))
file2 = File.objects.create(item=item,
data="data:text/plain;base64," + base64.b64encode(b"bar").decode('utf-8'))
response = self.client.get(f'/api/2/{self.event.slug}/item/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)
self.assertEqual(response.json()[0]['id'], item.id)
self.assertEqual(response.json()[0]['description'], '1')
self.assertEqual(response.json()[0]['box'], 'BOX1')
self.assertEqual(response.json()[0]['cid'], self.box1.id)
self.assertEqual(response.json()[0]['file'], file2.hash)
self.assertEqual(response.json()[0]['returned'], False)
self.assertEqual(response.json()[0]['event'], self.event.slug)
self.assertEqual(len(response.json()[0]['related_issues']), 0)
def test_multi_members(self):
Item.objects.create(container=self.box1, event=self.event, description='1')
Item.objects.create(container=self.box1, event=self.event, description='2')
Item.objects.create(container=self.box1, event=self.event, description='3')
response = self.client.get(f'/api/2/{self.event.slug}/item/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 3)
def test_create_item(self):
response = self.client.post(f'/api/2/{self.event.slug}/item/', {'cid': self.box1.id, 'description': '1'})
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()['id'], 1)
self.assertEqual(response.json()['description'], '1')
self.assertEqual(response.json()['box'], 'BOX1')
self.assertEqual(response.json()['cid'], self.box1.id)
self.assertEqual(response.json()['file'], None)
self.assertEqual(response.json()['returned'], False)
self.assertEqual(response.json()['event'], self.event.slug)
self.assertEqual(len(response.json()['related_issues']), 0)
self.assertEqual(len(Item.objects.all()), 1)
self.assertEqual(Item.objects.all()[0].id, 1)
self.assertEqual(Item.objects.all()[0].description, '1')
self.assertEqual(Item.objects.all()[0].container.id, self.box1.id)
def test_create_item_without_container(self):
response = self.client.post(f'/api/2/{self.event.slug}/item/', {'description': '1'})
self.assertEqual(response.status_code, 400)
def test_create_item_without_description(self):
response = self.client.post(f'/api/2/{self.event.slug}/item/', {'cid': self.box1.id})
self.assertEqual(response.status_code, 400)
def test_create_item_with_file(self):
import base64
response = self.client.post(f'/api/2/{self.event.slug}/item/',
{'cid': self.box1.id, 'description': '1',
'dataImage': "data:text/plain;base64," + base64.b64encode(b"foo").decode(
'utf-8')}, content_type='application/json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()['id'], 1)
self.assertEqual(response.json()['description'], '1')
self.assertEqual(response.json()['box'], 'BOX1')
self.assertEqual(response.json()['id'], self.box1.id)
self.assertEqual(len(response.json()['file']), 64)
self.assertEqual(len(Item.objects.all()), 1)
self.assertEqual(Item.objects.all()[0].id, 1)
self.assertEqual(Item.objects.all()[0].description, '1')
self.assertEqual(Item.objects.all()[0].container.id, self.box1.id)
self.assertEqual(len(File.objects.all()), 1)
def test_update_item(self):
item = Item.objects.create(container=self.box1, event=self.event, description='1')
response = self.client.patch(f'/api/2/{self.event.slug}/item/{item.id}/', {'description': '2'},
content_type='application/json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['id'], item.id)
self.assertEqual(response.json()['description'], '2')
self.assertEqual(response.json()['box'], 'BOX1')
self.assertEqual(response.json()['cid'], self.box1.id)
self.assertEqual(response.json()['file'], None)
self.assertEqual(response.json()['returned'], False)
self.assertEqual(response.json()['event'], self.event.slug)
self.assertEqual(len(response.json()['related_issues']), 0)
self.assertEqual(len(Item.objects.all()), 1)
self.assertEqual(Item.objects.all()[0].id, 1)
self.assertEqual(Item.objects.all()[0].description, '2')
self.assertEqual(Item.objects.all()[0].container.id, self.box1.id)
def test_update_item_with_file(self):
import base64
item = Item.objects.create(container=self.box1, event=self.event, description='1')
response = self.client.patch(f'/api/2/{self.event.slug}/item/{item.id}/',
{'description': '2',
'dataImage': "data:text/plain;base64," + base64.b64encode(b"foo").decode(
'utf-8')},
content_type='application/json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['id'], 1)
self.assertEqual(response.json()['description'], '2')
self.assertEqual(response.json()['box'], 'BOX1')
self.assertEqual(response.json()['id'], self.box1.id)
self.assertEqual(len(response.json()['file']), 64)
self.assertEqual(len(Item.objects.all()), 1)
self.assertEqual(Item.objects.all()[0].id, 1)
self.assertEqual(Item.objects.all()[0].description, '2')
self.assertEqual(Item.objects.all()[0].container.id, self.box1.id)
self.assertEqual(len(File.objects.all()), 1)
def test_delete_item(self):
item = Item.objects.create(container=self.box1, event=self.event, description='1')
Item.objects.create(container=self.box1, event=self.event, description='2')
self.assertEqual(len(Item.objects.all()), 2)
response = self.client.delete(f'/api/2/{self.event.slug}/item/{item.id}/')
self.assertEqual(response.status_code, 204)
self.assertEqual(len(Item.objects.all()), 1)
def test_delete_item2(self):
Item.objects.create(container=self.box1, event=self.event, description='1')
item2 = Item.objects.create(container=self.box1, event=self.event, description='2')
self.assertEqual(len(Item.objects.all()), 2)
response = self.client.delete(f'/api/2/{self.event.slug}/item/{item2.id}/')
self.assertEqual(response.status_code, 204)
self.assertEqual(len(Item.objects.all()), 1)
item3 = Item.objects.create(container=self.box1, event=self.event, description='3')
self.assertEqual(item3.id, 3)
self.assertEqual(len(Item.objects.all()), 2)
def test_item_count(self):
Item.objects.create(container=self.box1, event=self.event, description='1')
Item.objects.create(container=self.box1, event=self.event, description='2')
response = self.client.get('/api/2/boxes/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 2)
self.assertEqual(response.json()[0]['itemCount'], 2)
self.assertEqual(response.json()[1]['itemCount'], 0)
def test_item_nonexistent(self):
response = self.client.get(f'/api/2/NOEVENT/item/')
self.assertEqual(response.status_code, 404)
def test_item_return(self):
item = Item.objects.create(container=self.box1, event=self.event, description='1')
self.assertEqual(item.returned_at, None)
response = self.client.get(f'/api/2/{self.event.slug}/item/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)
response = self.client.patch(f'/api/2/{self.event.slug}/item/{item.id}/', {'returned': True},
content_type='application/json')
self.assertEqual(response.status_code, 200)
item.refresh_from_db()
self.assertNotEqual(item.returned_at, None)
response = self.client.get(f'/api/2/{self.event.slug}/item/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 0)
def test_item_show_not_returned(self):
item1 = Item.objects.create(container=self.box1, event=self.event, description='1')
item2 = Item.objects.create(container=self.box1, event=self.event, description='2')
response = self.client.get(f'/api/2/{self.event.slug}/item/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 2)
item2.returned_at = timezone.now()
item2.save()
response = self.client.get(f'/api/2/{self.event.slug}/item/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)
self.assertEqual(response.json()[0]['id'], item1.id)
class ItemSearchTestCase(TestCase):
def setUp(self):
super().setUp()
self.event = Event.objects.create(slug='EVENT', name='Event')
self.box1 = Container.objects.create(name='BOX1')
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
self.user.user_permissions.add(*Permission.objects.all())
self.token = AuthToken.objects.create(user=self.user)
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
self.item1 = Item.objects.create(container=self.box1, event=self.event, description='abc def')
self.item2 = Item.objects.create(container=self.box1, event=self.event, description='def ghi')
self.item3 = Item.objects.create(container=self.box1, event=self.event, description='jkl mno pqr')
self.item4 = Item.objects.create(container=self.box1, event=self.event, description='stu vwx')
def test_search(self):
search_query = b64encode(b'abc').decode('utf-8')
response = self.client.get(f'/api/2/{self.event.slug}/items/{search_query}/')
self.assertEqual(200, response.status_code)
self.assertEqual(1, len(response.json()))
self.assertEqual(self.item1.id, response.json()[0]['id'])
self.assertEqual('abc def', response.json()[0]['description'])
self.assertEqual('BOX1', response.json()[0]['box'])
self.assertEqual(self.box1.id, response.json()[0]['cid'])
self.assertEqual(1, response.json()[0]['search_score'])
def test_search2(self):
search_query = b64encode(b'def').decode('utf-8')
response = self.client.get(f'/api/2/{self.event.slug}/items/{search_query}/')
self.assertEqual(200, response.status_code)
self.assertEqual(2, len(response.json()))
self.assertEqual(self.item1.id, response.json()[0]['id'])
self.assertEqual('abc def', response.json()[0]['description'])
self.assertEqual('BOX1', response.json()[0]['box'])
self.assertEqual(self.box1.id, response.json()[0]['cid'])
self.assertEqual(1, response.json()[0]['search_score'])
self.assertEqual(self.item2.id, response.json()[1]['id'])
self.assertEqual('def ghi', response.json()[1]['description'])
self.assertEqual('BOX1', response.json()[1]['box'])
self.assertEqual(self.box1.id, response.json()[1]['cid'])
self.assertEqual(1, response.json()[0]['search_score'])
def test_search3(self):
search_query = b64encode(b'jkl').decode('utf-8')
response = self.client.get(f'/api/2/{self.event.slug}/items/{search_query}/')
self.assertEqual(200, response.status_code)
self.assertEqual(1, len(response.json()))
self.assertEqual(self.item3.id, response.json()[0]['id'])
self.assertEqual('jkl mno pqr', response.json()[0]['description'])
self.assertEqual('BOX1', response.json()[0]['box'])
self.assertEqual(self.box1.id, response.json()[0]['cid'])
self.assertEqual(1, response.json()[0]['search_score'])
def test_search4(self):
search_query = b64encode(b'abc def').decode('utf-8')
response = self.client.get(f'/api/2/{self.event.slug}/items/{search_query}/')
self.assertEqual(200, response.status_code)
self.assertEqual(2, len(response.json()))
self.assertEqual(self.item1.id, response.json()[0]['id'])
self.assertEqual('abc def', response.json()[0]['description'])
self.assertEqual('BOX1', response.json()[0]['box'])
self.assertEqual(self.box1.id, response.json()[0]['cid'])
self.assertEqual(2, response.json()[0]['search_score'])
self.assertEqual(self.item2.id, response.json()[1]['id'])
self.assertEqual('def ghi', response.json()[1]['description'])
self.assertEqual('BOX1', response.json()[1]['box'])
self.assertEqual(self.box1.id, response.json()[1]['cid'])
self.assertEqual(1, response.json()[1]['search_score'])

0
core/mail/__init__.py Normal file
View file

17
core/mail/admin.py Normal file
View file

@ -0,0 +1,17 @@
from django.contrib import admin
from mail.models import Email, EventAddress
class EmailAdmin(admin.ModelAdmin):
pass
admin.site.register(Email, EmailAdmin)
class EventAddressAdmin(admin.ModelAdmin):
pass
admin.site.register(EventAddress, EventAddressAdmin)

26
core/mail/api_v2.py Normal file
View file

@ -0,0 +1,26 @@
from rest_framework import routers, viewsets, serializers
from mail.models import Email, EmailAttachment
class AttachmentSerializer(serializers.ModelSerializer):
class Meta:
model = EmailAttachment
fields = ['hash', 'mime_type', 'name']
class EmailSerializer(serializers.ModelSerializer):
class Meta:
model = Email
fields = '__all__'
class EmailViewSet(viewsets.ModelViewSet):
serializer_class = EmailSerializer
queryset = Email.objects.all()
router = routers.SimpleRouter()
router.register(r'mails', EmailViewSet, basename='mails')
urlpatterns = router.urls

View file

@ -0,0 +1,46 @@
# Generated by Django 4.2.7 on 2023-12-09 02:13
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
('inventory', '0001_initial'),
('tickets', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='EventAddress',
fields=[
('id', models.AutoField(primary_key=True, serialize=False)),
('address', models.CharField(max_length=255)),
('event', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to='inventory.event')),
],
),
migrations.CreateModel(
name='Email',
fields=[
('is_deleted', models.BooleanField(default=False)),
('deleted_at', models.DateTimeField(blank=True, null=True)),
('id', models.AutoField(primary_key=True, serialize=False)),
('timestamp', models.DateTimeField(auto_now_add=True)),
('body', models.TextField()),
('subject', models.CharField(max_length=255)),
('sender', models.CharField(max_length=255)),
('recipient', models.CharField(max_length=255)),
('reference', models.CharField(max_length=255, null=True, unique=True)),
('in_reply_to', models.CharField(max_length=255, null=True)),
('raw', models.TextField()),
('event', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to='inventory.event')),
('issue_thread', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='emails', to='tickets.issuethread')),
],
options={
'abstract': False,
},
),
]

View file

@ -0,0 +1,32 @@
# Generated by Django 4.2.7 on 2023-12-09 02:13
import quopri
from django.db import migrations
from mail.protocol import unescape_and_decode_quoted_printable, unescape_and_decode_base64
class Migration(migrations.Migration):
initial = True
dependencies = [
('mail', '0001_initial'),
]
def convert_printed_quotable(apps, schema_editor):
Email = apps.get_model('mail', 'Email')
for mail in Email.objects.all():
mail.body = unescape_and_decode_quoted_printable(mail.body)
mail.body = unescape_and_decode_base64(mail.body)
mail.subject = unescape_and_decode_quoted_printable(mail.subject)
mail.subject = unescape_and_decode_base64(mail.subject)
mail.save()
IssueThread = apps.get_model('tickets', 'IssueThread')
for issue in IssueThread.objects.all():
issue.name = unescape_and_decode_quoted_printable(issue.name)
issue.name = unescape_and_decode_base64(issue.name)
issue.save()
operations = [
migrations.RunPython(convert_printed_quotable),
]

View file

@ -0,0 +1,59 @@
# Generated by Django 4.2.7 on 2024-01-09 20:56
from django.db import migrations, models
import django.db.models.deletion
import files.models
from mail.protocol import parse_email_body
class NullLogger:
def info(self, *args, **kwargs):
pass
def warning(self, *args, **kwargs):
pass
def debug(self, *args, **kwargs):
pass
class Migration(migrations.Migration):
dependencies = [
('mail', '0002_printed_quotable'),
]
def generate_email_attachments(apps, schema_editor):
Email = apps.get_model('mail', 'Email')
for email in Email.objects.all():
raw = email.raw
if raw is None or raw == '':
continue
parsed, body, attachments = parse_email_body(raw.encode('utf-8'), NullLogger())
email.attachments.clear()
for attachment in attachments:
email.attachments.add(attachment)
email.body = body
email.save()
operations = [
migrations.CreateModel(
name='EmailAttachment',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(blank=True, null=True)),
('updated_at', models.DateTimeField(blank=True, null=True)),
('deleted_at', models.DateTimeField(blank=True, null=True)),
('file', models.ImageField(upload_to=files.models.hash_upload)),
('mime_type', models.CharField(max_length=255)),
('hash', models.CharField(max_length=64, unique=True)),
('name', models.CharField(max_length=255)),
('email',
models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='attachments',
to='mail.email')),
],
options={
'abstract': False,
},
),
migrations.RunPython(generate_email_attachments),
]

View file

@ -0,0 +1,19 @@
# Generated by Django 4.2.7 on 2024-01-10 19:04
from django.db import migrations, models
import files.models
class Migration(migrations.Migration):
dependencies = [
('mail', '0003_emailattachment'),
]
operations = [
migrations.AlterField(
model_name='emailattachment',
name='file',
field=models.FileField(upload_to=files.models.hash_upload),
),
]

View file

@ -0,0 +1,20 @@
# Generated by Django 4.2.7 on 2024-11-03 18:30
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('inventory', '0004_alter_event_created_at_alter_item_created_at'),
('mail', '0004_alter_emailattachment_file'),
]
operations = [
migrations.AlterField(
model_name='eventaddress',
name='event',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='addresses', to='inventory.event'),
),
]

View file

@ -0,0 +1,36 @@
# Generated by Django 4.2.7 on 2024-11-08 20:37
from django.core.files.base import ContentFile
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('mail', '0005_alter_eventaddress_event'),
]
def move_raw_mails_to_file(apps, schema_editor):
Email = apps.get_model('mail', 'Email')
for email in Email.objects.all():
raw_content = email.raw
path = "mail_{}".format(email.id)
if len(raw_content):
email.raw_file.save(path, ContentFile(raw_content))
email.save()
operations = [
migrations.AddField(
model_name='email',
name='raw_file',
field=models.FileField(null=True, upload_to='raw_mail/'),
),
migrations.RunPython(move_raw_mails_to_file),
migrations.RemoveField(
model_name='email',
name='raw',
),
migrations.AlterField(
model_name='email',
name='raw_file',
field=models.FileField(upload_to='raw_mail/'),
),
]

View file

52
core/mail/models.py Normal file
View file

@ -0,0 +1,52 @@
import random
from django.db import models
from django_softdelete.models import SoftDeleteModel
from core.settings import MAIL_DOMAIN, ACTIVE_SPAM_TRAINING
from files.models import AbstractFile
from inventory.models import Event
from tickets.models import IssueThread
class Email(SoftDeleteModel):
id = models.AutoField(primary_key=True)
timestamp = models.DateTimeField(auto_now_add=True)
body = models.TextField()
subject = models.CharField(max_length=255)
sender = models.CharField(max_length=255)
recipient = models.CharField(max_length=255)
reference = models.CharField(max_length=255, null=True, unique=True)
in_reply_to = models.CharField(max_length=255, null=True)
raw_file = models.FileField(upload_to='raw_mail/')
issue_thread = models.ForeignKey(IssueThread, models.SET_NULL, null=True, related_name='emails')
event = models.ForeignKey(Event, models.SET_NULL, null=True)
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
if not self.reference:
self.reference = f'<{random.randint(0, 1000000000):09}@{MAIL_DOMAIN}>'
self.save()
def train_spam(self):
if ACTIVE_SPAM_TRAINING and self.raw_file.path:
import subprocess
path = self.raw_file.path
subprocess.run(["rspamc", "learn_spam", path])
def train_ham(self):
if ACTIVE_SPAM_TRAINING and self.raw_file.path:
import subprocess
path = self.raw_file.path
subprocess.run(["rspamc", "learn_ham", path])
class EventAddress(models.Model):
id = models.AutoField(primary_key=True)
event = models.ForeignKey(Event, models.SET_NULL, null=True, related_name='addresses')
address = models.CharField(max_length=255)
class EmailAttachment(AbstractFile):
email = models.ForeignKey(Email, models.CASCADE, related_name='attachments', null=True)
name = models.CharField(max_length=255)

295
core/mail/protocol.py Normal file
View file

@ -0,0 +1,295 @@
import logging
from re import match
import aiosmtplib
from channels.layers import get_channel_layer
from channels.db import database_sync_to_async
from django.core.files.base import ContentFile
from mail.models import Email, EventAddress, EmailAttachment
from notify_sessions.models import SystemEvent
from tickets.models import IssueThread
class SpecialMailException(Exception):
pass
def find_quoted_printable(s, marker):
positions = [i for i in range(len(s)) if s.lower().startswith('=?utf-8?' + marker + '?', i)]
for pos in positions:
end = s.find('?=', pos + 10)
if end == -1:
continue
yield pos, end + 3
def unescape_and_decode_quoted_printable(s):
import quopri
decoded = ''
last_end = 0
for start, end in find_quoted_printable(s, 'q'):
decoded += s[last_end:start]
decoded += quopri.decodestring(s[start + 10:end - 3]).decode('utf-8')
last_end = end
decoded += s[last_end:]
return decoded
def unescape_and_decode_base64(s):
import base64
decoded = ''
last_end = 0
for start, end in find_quoted_printable(s, 'b'):
decoded += s[last_end:start]
decoded += base64.b64decode(s[start + 10:end - 3]).decode('utf-8')
last_end = end
decoded += s[last_end:]
return decoded
def unescape_simplified_quoted_printable(s):
import quopri
return quopri.decodestring(s).decode('utf-8')
def collect_references(issue_thread):
mails = issue_thread.emails.order_by('timestamp')
references = []
for mail in mails:
if mail.reference:
references.append(mail.reference)
return references
def make_reply(reply_email, references=None, event=None):
from email.message import EmailMessage
from core.settings import MAIL_DOMAIN
event = event or "mail"
reply = EmailMessage()
reply["From"] = reply_email.sender
reply["To"] = reply_email.recipient
reply["Subject"] = reply_email.subject
reply["Reply-To"] = f"{event}@{MAIL_DOMAIN}"
if reply_email.in_reply_to:
reply["In-Reply-To"] = reply_email.in_reply_to
if reply_email.reference:
reply["Message-ID"] = reply_email.reference
else:
reply["Message-ID"] = reply_email.id + "@" + MAIL_DOMAIN
reply_email.reference = reply["Message-ID"]
reply_email.save()
if references:
reply["References"] = " ".join(references)
reply.set_content(reply_email.body)
return reply
async def send_smtp(message):
await aiosmtplib.send(message, hostname="127.0.0.1", port=25, use_tls=False, start_tls=False)
def find_active_issue_thread(in_reply_to, address, subject, event):
from re import match
uuid_match = match(r'^ticket\+([a-f0-9-]{36})@', address)
if uuid_match:
issue = IssueThread.objects.filter(uuid=uuid_match.group(1))
if issue.exists():
return issue.first(), False
reply_to = Email.objects.filter(reference=in_reply_to)
if reply_to.exists():
return reply_to.first().issue_thread, False
else:
issue = IssueThread.objects.create(name=subject, event=event)
return issue, True
def find_target_event(address):
try:
address_map = EventAddress.objects.get(address=address)
if address_map.event:
return address_map.event
except EventAddress.DoesNotExist:
pass
return None
def parse_email_body(raw, log=None):
import email
from hashlib import sha256
attachments = []
parsed = email.message_from_bytes(raw)
body = ""
if parsed.is_multipart():
for part in parsed.walk():
ctype = part.get_content_type()
cdispo = str(part.get('Content-Disposition'))
# skip any text/plain (txt) attachments
if ctype == 'text/plain' and 'attachment' not in cdispo:
segment = part.get_payload()
if not segment:
continue
segment = unescape_and_decode_quoted_printable(segment)
segment = unescape_and_decode_base64(segment)
if part.get('Content-Transfer-Encoding') == 'quoted-printable':
segment = unescape_simplified_quoted_printable(segment)
log.debug(segment)
body = body + segment
elif 'attachment' in cdispo or 'inline' in cdispo:
file = ContentFile(part.get_payload(decode=True))
chash = sha256(file.read()).hexdigest()
name = part.get_filename()
if name is None:
name = "unnamed"
attachment, _ = EmailAttachment.objects.get_or_create(
name=name, mime_type=ctype, file=file, hash=chash)
attachment.save()
attachments.append(attachment)
if 'inline' in cdispo:
body = body + f'<img src="cid:{attachment.id}">'
log.info("Image %s %s", ctype, attachment.id)
else:
log.info("Attachment %s %s", ctype, cdispo)
else:
if parsed.get_content_type() == 'text/plain':
body = parsed.get_payload()
elif parsed.get_content_type() == 'text/html':
from bs4 import BeautifulSoup
import re
body = parsed.get_payload()
soup = BeautifulSoup(body, 'html.parser')
body = re.sub(r'([\r\n]+.?)*[\r\n]', r'\n', soup.get_text()).strip('\n')
else:
log.warning("Unknown content type %s", parsed.get_content_type())
body = "Unknown content type"
body = unescape_and_decode_quoted_printable(body)
body = unescape_and_decode_base64(body)
if parsed.get('Content-Transfer-Encoding') == 'quoted-printable':
body = unescape_simplified_quoted_printable(body)
log.debug(body)
return parsed, body, attachments
@database_sync_to_async
def receive_email(envelope, log=None):
parsed, body, attachments = parse_email_body(envelope.content, log)
header_from = parsed.get('From')
header_to = parsed.get('To')
header_in_reply_to = parsed.get('In-Reply-To')
header_message_id = parsed.get('Message-ID')
if match(r'^([a-zA-Z ]*<)?MAILER-DAEMON@', header_from) and envelope.mail_from.strip("<>") == "":
log.warning("Ignoring mailer daemon")
raise SpecialMailException("Ignoring mailer daemon")
if Email.objects.filter(reference=header_message_id).exists(): # break before issue thread is created
log.warning("Email already exists")
raise Exception("Email already exists")
recipient = envelope.rcpt_tos[0].lower() if envelope.rcpt_tos else header_to.lower()
sender = envelope.mail_from if envelope.mail_from else header_from
subject = parsed.get('Subject')
if not subject:
subject = "No subject"
subject = unescape_and_decode_quoted_printable(subject)
subject = unescape_and_decode_base64(subject)
target_event = find_target_event(recipient)
active_issue_thread, new = find_active_issue_thread(header_in_reply_to, recipient, subject, target_event)
from hashlib import sha256
random_filename = 'mail-' + sha256(envelope.content).hexdigest()
email = Email.objects.create(
sender=sender, recipient=recipient, body=body, subject=subject, reference=header_message_id,
in_reply_to=header_in_reply_to, raw_file=ContentFile(envelope.content, name=random_filename), event=target_event,
issue_thread=active_issue_thread)
for attachment in attachments:
email.attachments.add(attachment)
email.save()
reply = None
if new:
# auto reply if new issue
references = collect_references(active_issue_thread)
if not sender.startswith('noreply'):
subject = f"Re: {subject} [#{active_issue_thread.short_uuid()}]"
body = '''Your request (#{}) has been received and will be reviewed by our lost&found angels.
We are reviewing incoming requests during the event and teardown. Immediately after the event, expect a delay as the \
workload is high. We will not forget about your request and get back in touch once we have updated information on your \
request. Requests for devices, wallets, credit cards or similar items will be handled with priority.
If you happen to find your lost item or just want to add additional information, please reply to this email. Please \
do not create a new request.
Your c3lf (Cloakroom + Lost&Found) Team'''.format(active_issue_thread.short_uuid())
reply_email = Email.objects.create(
sender=recipient, recipient=sender, body=body, subject=subject,
in_reply_to=header_message_id, event=target_event, issue_thread=active_issue_thread)
reply = make_reply(reply_email, references, event=target_event.slug if target_event else None)
else:
# change state if not new
if active_issue_thread.state != 'pending_new':
active_issue_thread.state = 'pending_open'
active_issue_thread.save()
return email, new, reply, active_issue_thread
class LMTPHandler:
async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
from core.settings import MAIL_DOMAIN
address = address.lower()
if not address.endswith('@' + MAIL_DOMAIN):
return '550 not relaying to that domain'
envelope.rcpt_tos.append(address)
return '250 OK'
async def handle_DATA(self, server, session, envelope):
log = logging.getLogger('mail.log')
log.setLevel(logging.DEBUG)
log.info('Message from %s' % envelope.mail_from)
log.info('Message for %s' % envelope.rcpt_tos)
log.info('Message data:\n')
content = None
try:
content = envelope.content
email, new, reply, thread = await receive_email(envelope, log)
log.info(f"Created email {email.id}")
systemevent = await database_sync_to_async(SystemEvent.objects.create)(type='email received',
reference=email.id)
log.info(f"Created system event {systemevent.id}")
channel_layer = get_channel_layer()
await channel_layer.group_send(
'general', {"type": "generic.event", "name": "send_message_to_frontend", "event_id": systemevent.id,
"message": "email received"})
log.info(f"Sent message to frontend")
if new and reply:
log.info('Sending message to %s' % reply['To'])
await send_smtp(reply)
log.info("Sent auto reply")
return '250 Message accepted for delivery'
except SpecialMailException as e:
import uuid
random_filename = 'special-' + str(uuid.uuid4())
with open(random_filename, 'wb') as f:
f.write(content)
log.warning(f"Special mail exception: {e} saved to {random_filename}")
return '250 Message accepted for delivery'
except Exception as e:
from hashlib import sha256
random_filename = 'mail-' + sha256(content).hexdigest()
with open(random_filename, 'wb') as f:
f.write(content)
log.error(f"Saved email to {random_filename} because of error %s (%s)", e, type(e))
return '451 Internal server error'

31
core/mail/socket.py Normal file
View file

@ -0,0 +1,31 @@
from abc import ABCMeta
from aiosmtpd.controller import BaseController, UnixSocketMixin
from aiosmtpd.lmtp import LMTP
class BaseAsyncController(BaseController, metaclass=ABCMeta):
def __init__(
self,
handler,
loop,
**SMTP_parameters,
):
super().__init__(
handler,
loop,
**SMTP_parameters,
)
def serve(self):
return self._create_server()
class UnixSocketLMTPController(UnixSocketMixin, BaseAsyncController):
def factory(self):
return LMTP(self.handler)
def _trigger_server(self): # pragma: no-unixsock
# Prevent confusion on which _trigger_server() to invoke.
# Or so LGTM.com claimed
UnixSocketMixin._trigger_server(self)

View file

View file

View file

@ -0,0 +1,941 @@
import inspect
from unittest import mock
from django.test import TestCase, Client
from django.contrib.auth.models import Permission
from knox.models import AuthToken
from authentication.models import ExtendedUser
from core.settings import MAIL_DOMAIN
from inventory.models import Event
from mail.models import Email, EventAddress, EmailAttachment
from mail.protocol import LMTPHandler
from tickets.models import IssueThread, StateChange
expected_auto_reply_subject = 'Re: {} [#{}]'
expected_auto_reply = '''Your request (#{}) has been received and will be reviewed by our lost&found angels.
We are reviewing incoming requests during the event and teardown. Immediately after the event, expect a delay as the \
workload is high. We will not forget about your request and get back in touch once we have updated information on your \
request. Requests for devices, wallets, credit cards or similar items will be handled with priority.
If you happen to find your lost item or just want to add additional information, please reply to this email. Please \
do not create a new request.
Your c3lf (Cloakroom + Lost&Found) Team'''
def make_mocked_coro(return_value=mock.sentinel, raise_exception=mock.sentinel):
async def mock_coro(*args, **kwargs):
if raise_exception is not mock.sentinel:
raise raise_exception
if not inspect.isawaitable(return_value):
return return_value
await return_value
return mock.Mock(wraps=mock_coro)
class EmailsApiTest(TestCase):
def setUp(self):
super().setUp()
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
self.user.user_permissions.add(*Permission.objects.all())
self.user.save()
self.token = AuthToken.objects.create(user=self.user)
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
def test_mails(self):
Event.objects.get_or_create(
name="Test event",
slug="test-event",
)
Email.objects.create(
subject='test',
body='test',
sender='test',
recipient='test',
)
response = self.client.get('/api/2/mails/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)
self.assertEqual(response.json()[0]['subject'], 'test')
self.assertEqual(response.json()[0]['body'], 'test')
self.assertEqual(response.json()[0]['sender'], 'test')
self.assertEqual(response.json()[0]['recipient'], 'test')
def test_mails_empty(self):
response = self.client.get('/api/2/mails/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), [])
class LMTPHandlerTestCase(TestCase): # TODO replace with less hacky test
def setUp(self):
super().setUp()
self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test')
self.user.user_permissions.add(*Permission.objects.all())
self.user.save()
self.token = AuthToken.objects.create(user=self.user)
self.client = Client(headers={'Authorization': 'Token ' + self.token[1]})
def test_handle_client(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@localhost']
envelope.content = b'Subject: test\nFrom: test3@test\nTo: test4@localhost\nMessage-ID: <1@test>\n\ntest'
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 2)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_called_once()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('test1@test', Email.objects.all()[0].sender)
self.assertEqual('test2@localhost', Email.objects.all()[0].recipient)
self.assertEqual('test', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].subject)
self.assertEqual('test2@localhost', Email.objects.all()[1].sender)
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)
def test_handle_quoted_printable(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'Subject: test =?utf-8?Q?=C3=A4?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Quoted-Printable-Kodierung: =?utf-8?Q?=C3=A4=C3=B6=C3=BC=C3=9F?='
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 2)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_called_once()
self.assertEqual('test ä', Email.objects.all()[0].subject)
self.assertEqual('Text mit Quoted-Printable-Kodierung: äöüß', Email.objects.all()[0].body)
self.assertTrue( Email.objects.all()[0].raw_file.path)
def test_handle_quoted_printable_2(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'Subject: =?UTF-8?Q?suche_M=C3=BCtze?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Quoted-Printable-Kodierung: =?utf-8?Q?=C3=A4=C3=B6=C3=BC=C3=9F?='
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 2)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_called_once()
self.assertEqual('suche_Mütze', Email.objects.all()[0].subject)
self.assertEqual('Text mit Quoted-Printable-Kodierung: äöüß', Email.objects.all()[0].body)
self.assertTrue( Email.objects.all()[0].raw_file.path)
def test_handle_base64(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'Subject: =?utf-8?B?dGVzdA==?=\nFrom: test3@test\nTo: test4@test\nMessage-ID: <1@test>\n\nText mit Base64-Kodierung: =?utf-8?B?w6TDtsO8w58=?='
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 2)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_called_once()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('Text mit Base64-Kodierung: äöüß', Email.objects.all()[0].body)
self.assertTrue( Email.objects.all()[0].raw_file.path)
def test_handle_client_reply(self):
issue_thread = IssueThread.objects.create(
name="test",
)
mail1 = Email.objects.create(
subject='test subject',
body='test',
sender='test1@test',
recipient='test2@test',
issue_thread=issue_thread,
)
mail1_reply = Email.objects.create(
subject='Message received',
body='Thank you for your message.',
sender='test2@test',
recipient='test1@test',
in_reply_to=mail1.reference,
issue_thread=issue_thread,
)
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n'
f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest')
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 3)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_not_called()
self.assertEqual(Email.objects.all()[2].subject, 'Re: test')
self.assertEqual(Email.objects.all()[2].sender, 'test1@test')
self.assertEqual(Email.objects.all()[2].recipient, 'test2@test')
self.assertEqual(Email.objects.all()[2].body, 'test')
self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread)
self.assertEqual(Email.objects.all()[2].reference, '<3@test>')
self.assertEqual(Email.objects.all()[2].in_reply_to, mail1_reply.reference)
self.assertEqual(IssueThread.objects.all()[0].name, 'test')
self.assertEqual(IssueThread.objects.all()[0].state, 'pending_new')
self.assertEqual(IssueThread.objects.all()[0].assigned_to, None)
self.assertTrue( Email.objects.all()[2].raw_file.path)
def test_handle_client_reply_2(self):
issue_thread = IssueThread.objects.create(
name="test",
)
mail1 = Email.objects.create(
subject='test subject',
body='test',
sender='test1@test',
recipient='test2@test',
issue_thread=issue_thread,
)
mail1_reply = Email.objects.create(
subject='Message received',
body='Thank you for your message.',
sender='test2@test',
recipient='test1@test',
in_reply_to=mail1.reference,
issue_thread=issue_thread,
)
StateChange.objects.create(
issue_thread=issue_thread,
state='waiting_details',
)
self.assertEqual(IssueThread.objects.all()[0].state, 'waiting_details')
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = (f'Subject: Re: test\nFrom: test3@test\nTo: test4@test\nMessage-ID: <3@test>\n'
f'In-Reply-To: {mail1_reply.reference}'.encode('utf-8') + b'\n\ntest')
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 3)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_not_called()
self.assertEqual(Email.objects.all()[2].subject, 'Re: test')
self.assertEqual(Email.objects.all()[2].sender, 'test1@test')
self.assertEqual(Email.objects.all()[2].recipient, 'test2@test')
self.assertEqual(Email.objects.all()[2].body, 'test')
self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread)
self.assertEqual(Email.objects.all()[2].reference, '<3@test>')
self.assertEqual(Email.objects.all()[2].in_reply_to, mail1_reply.reference)
self.assertEqual(IssueThread.objects.all()[0].name, 'test')
self.assertEqual(IssueThread.objects.all()[0].state, 'pending_open')
self.assertEqual(IssueThread.objects.all()[0].assigned_to, None)
self.assertTrue( Email.objects.all()[2].raw_file.path)
def test_mail_reply(self):
issue_thread = IssueThread.objects.create(
name="test subject",
)
mail1 = Email.objects.create(
subject='test subject',
body='test',
sender='test1@test',
recipient='test2@localhost',
issue_thread=issue_thread,
)
mail1_reply = Email.objects.create(
subject='Message received',
body='Thank you for your message.',
sender='test2@localhost',
recipient='test1@test',
in_reply_to=mail1.reference,
issue_thread=issue_thread,
)
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
response = self.client.post(f'/api/2/tickets/{issue_thread.id}/reply/', {
'message': 'test'
})
self.assertEqual(response.status_code, 201)
self.assertEqual(len(Email.objects.all()), 3)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_called_once()
self.assertEqual(Email.objects.all()[0].subject, 'test subject')
self.assertEqual(Email.objects.all()[0].sender, 'test1@test')
self.assertEqual(Email.objects.all()[0].recipient, 'test2@localhost')
self.assertEqual(Email.objects.all()[0].body, 'test')
self.assertEqual(Email.objects.all()[0].issue_thread, issue_thread)
self.assertEqual(Email.objects.all()[0].reference, mail1.reference)
self.assertEqual(Email.objects.all()[1].subject, 'Message received')
self.assertEqual(Email.objects.all()[1].sender, 'test2@localhost')
self.assertEqual(Email.objects.all()[1].recipient, 'test1@test')
self.assertEqual(Email.objects.all()[1].body, 'Thank you for your message.')
self.assertEqual(Email.objects.all()[1].issue_thread, issue_thread)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual(Email.objects.all()[1].in_reply_to, mail1.reference)
self.assertEqual(Email.objects.all()[2].subject, 'Re: test subject [#{0}]'.format(issue_thread.short_uuid()))
self.assertEqual(Email.objects.all()[2].sender, 'test2@localhost')
self.assertEqual(Email.objects.all()[2].recipient, 'test1@test')
self.assertEqual(Email.objects.all()[2].body, 'test')
self.assertEqual(Email.objects.all()[2].issue_thread, issue_thread)
self.assertTrue(Email.objects.all()[2].reference.startswith("<"))
self.assertTrue(Email.objects.all()[2].reference.endswith("@localhost>"))
self.assertEqual(Email.objects.all()[2].in_reply_to, mail1.reference)
def test_match_event(self):
event = Event.objects.create(
name="Test event",
slug="test-event",
)
event_address = EventAddress.objects.create(
event=event,
address="test_event@localhost",
)
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test_event@localhost']
envelope.content = b'Subject: test\nFrom: test1@test\nTo: test_event@localhost\nMessage-ID: <1@test>\n\ntest'
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 2)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_called_once()
self.assertEqual(event, Email.objects.all()[0].event)
self.assertEqual(event, Email.objects.all()[1].event)
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].subject)
self.assertEqual('test1@test', Email.objects.all()[0].sender)
self.assertEqual('test_event@localhost', Email.objects.all()[0].recipient)
self.assertEqual('test_event@localhost', Email.objects.all()[1].sender)
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
self.assertEqual('test', Email.objects.all()[0].body)
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
self.assertEqual(1, len(IssueThread.objects.all()))
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)
self.assertEqual(event, IssueThread.objects.all()[0].event)
def test_mail_html_body(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'''Subject: test
From: test1@test
To: test2@test
Message-ID: <1@test>
Content-Type: text/html; charset=utf-8
<div>
<div>
<p>test</p>
</div>
</div>'''
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 2)
self.assertEqual(len(IssueThread.objects.all()), 1)
self.assertEqual(len(EmailAttachment.objects.all()), 0)
aiosmtplib.send.assert_called_once()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('test1@test', Email.objects.all()[0].sender)
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
self.assertEqual('test', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].subject)
self.assertEqual('test2@test', Email.objects.all()[1].sender)
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)
def test_split_text_inline_image(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'''Subject: test
From: test1@test
To: test2@test
Message-ID: <1@test>
Content-Type: multipart/alternative; boundary="abc"
--abc
Content-Type: text/plain; charset=utf-8
test1
--abc
Content-Type: image/jpeg; name="test.jpg"
Content-Disposition: inline; filename="test.jpg"
Content-Transfer-Encoding: base64
Content-ID: <1>
X-Attachment-Id: 1
dGVzdGltYWdl
--abc
Content-Type: text/plain; charset=utf-8
test2
--abc--'''
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 2)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_called_once()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('test1@test', Email.objects.all()[0].sender)
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
self.assertEqual('test1\n<img src="cid:1">test2\n', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].subject)
self.assertEqual('test2@test', Email.objects.all()[1].sender)
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)
self.assertEqual(1, len(EmailAttachment.objects.all()))
self.assertEqual(1, EmailAttachment.objects.all()[0].id)
self.assertEqual('image/jpeg', EmailAttachment.objects.all()[0].mime_type)
self.assertEqual('test.jpg', EmailAttachment.objects.all()[0].name)
file_content = EmailAttachment.objects.all()[0].file.read()
self.assertEqual(b'testimage', file_content)
def test_text_with_attachment(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'''Subject: test
From: test1@test
To: test2@test
Message-ID: <1@test>
Content-Type: multipart/mixed; boundary="abc"
--abc
Content-Type: text/plain; charset=utf-8
test1
--abc
Content-Type: image/jpeg; name="test.jpg"
Content-Disposition: attachment; filename="test.jpg"
Content-Transfer-Encoding: base64
Content-ID: <1>
X-Attachment-Id: 1
dGVzdGltYWdl
--abc--'''
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 2)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_called_once()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('test1@test', Email.objects.all()[0].sender)
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
self.assertEqual('test1\n', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].subject)
self.assertEqual('test2@test', Email.objects.all()[1].sender)
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)
self.assertEqual(1, len(EmailAttachment.objects.all()))
self.assertEqual(1, EmailAttachment.objects.all()[0].id)
self.assertEqual('image/jpeg', EmailAttachment.objects.all()[0].mime_type)
self.assertEqual('test.jpg', EmailAttachment.objects.all()[0].name)
file_content = EmailAttachment.objects.all()[0].file.read()
self.assertEqual(b'testimage', file_content)
def test_mail_noreply(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'noreply@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'Subject: test\nFrom: noreply@test\nTo: test2@test\nMessage-ID: <1@test>\n\ntest'
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual(result, '250 Message accepted for delivery')
self.assertEqual(len(Email.objects.all()), 1)
self.assertEqual(len(IssueThread.objects.all()), 1)
aiosmtplib.send.assert_not_called()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('noreply@test', Email.objects.all()[0].sender)
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
self.assertEqual('test', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)
def test_mail_empty_subject(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
import logging
logging.disable(logging.CRITICAL)
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'From: noreply@test\nTo: test2@test\nMessage-ID: <1@test>\n\ntest'
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
logging.disable(logging.NOTSET)
self.assertEqual('250 Message accepted for delivery', result)
self.assertEqual(2, len(Email.objects.all()))
self.assertEqual(1, len(IssueThread.objects.all()))
aiosmtplib.send.assert_called_once()
self.assertEqual('No subject', Email.objects.all()[0].subject)
self.assertEqual('test1@test', Email.objects.all()[0].sender)
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
self.assertEqual('test', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual(expected_auto_reply_subject.format('No subject', IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].subject)
self.assertEqual('test2@test', Email.objects.all()[1].sender)
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
self.assertEqual('No subject', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)
def test_mail_empty_body(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
import logging
logging.disable(logging.CRITICAL)
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = '<test1@test>'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'Subject: test\nFrom: <test1@test>\nTo: test2@test\nMessage-ID: <1@test>\n\n'
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
logging.disable(logging.NOTSET)
self.assertEqual('250 Message accepted for delivery', result)
self.assertEqual(2, len(Email.objects.all()))
self.assertEqual(1, len(IssueThread.objects.all()))
aiosmtplib.send.assert_called_once()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('<test1@test>', Email.objects.all()[0].sender)
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
self.assertEqual('', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].subject)
self.assertEqual('test2@test', Email.objects.all()[1].sender)
self.assertEqual('<test1@test>', Email.objects.all()[1].recipient)
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)
def test_mail_plus_issue_thread(self):
issue_thread = IssueThread.objects.create(
name="test subject",
)
mail1 = Email.objects.create(
subject='test subject',
body='test',
sender='test1@test',
recipient='test2@localhost',
issue_thread=issue_thread,
)
mail2 = Email.objects.create(
subject='Message received',
body='Thank you for your message.',
sender='test2@localhost',
recipient='test1@test',
in_reply_to=mail1.reference,
issue_thread=issue_thread,
)
Email.objects.create(
subject='Re: Message received',
body='any updates?',
sender='test3@test',
recipient='test2@localhost',
in_reply_to=mail2.reference,
issue_thread=issue_thread,
)
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
import logging
logging.disable(logging.CRITICAL)
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = '<test1@test>'
envelope.rcpt_tos = ['ticket+{}@test'.format(issue_thread.uuid)]
envelope.content = (f'Subject: foo\nFrom: <test3@test>\nTo: ticket+{issue_thread.uuid}@test\n'
f'Message-ID: <3@test>\n\nbar'.encode('utf-8'))
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
logging.disable(logging.NOTSET)
self.assertEqual('250 Message accepted for delivery', result)
self.assertEqual(4, len(Email.objects.all()))
self.assertEqual(4, len(Email.objects.filter(issue_thread=issue_thread)))
self.assertEqual(1, len(IssueThread.objects.all()))
aiosmtplib.send.assert_not_called()
self.assertEqual(Email.objects.all()[3].subject, 'foo')
self.assertEqual(Email.objects.all()[3].sender, '<test1@test>')
self.assertEqual(Email.objects.all()[3].recipient, 'ticket+{}@test'.format(issue_thread.uuid))
self.assertEqual(Email.objects.all()[3].body, 'bar')
self.assertEqual(Email.objects.all()[3].issue_thread, issue_thread)
self.assertEqual(Email.objects.all()[3].reference, '<3@test>')
self.assertEqual('test subject', IssueThread.objects.all()[0].name)
response = self.client.post(f'/api/2/tickets/{issue_thread.id}/reply/', {
'message': 'test'
})
self.assertEqual(response.status_code, 201)
self.assertEqual(5, len(Email.objects.all()))
self.assertEqual(5, len(Email.objects.filter(issue_thread=issue_thread)))
self.assertEqual(1, len(IssueThread.objects.all()))
self.assertEqual(Email.objects.all()[4].subject, 'Re: test subject [#{0}]'.format(issue_thread.short_uuid()))
self.assertEqual(Email.objects.all()[4].sender, 'test2@localhost')
self.assertEqual(Email.objects.all()[4].recipient, 'test1@test')
self.assertEqual(Email.objects.all()[4].body, 'test')
self.assertEqual(Email.objects.all()[4].issue_thread, issue_thread)
self.assertTrue(Email.objects.all()[4].reference.startswith("<"))
self.assertTrue(Email.objects.all()[4].reference.endswith("@localhost>"))
self.assertEqual(Email.objects.all()[4].in_reply_to, mail1.reference)
self.assertEqual('test subject', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
aiosmtplib.send.assert_called_once()
def test_mail_4byte_unicode_emoji(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'''Subject: test
From: test1@test
To: test2@test
Message-ID: <1@test>
Content-Type: text/html; charset=utf-8
thank you =?utf-8?Q?=F0=9F=98=8A?=''' # thank you 😊
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual('250 Message accepted for delivery', result)
self.assertEqual(2, len(Email.objects.all()))
self.assertEqual(1, len(IssueThread.objects.all()))
aiosmtplib.send.assert_called_once()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('test1@test', Email.objects.all()[0].sender)
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
self.assertEqual('thank you 😊', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].subject)
self.assertEqual('test2@test', Email.objects.all()[1].sender)
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)
def test_mail_non_utf8(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'''Subject: test
From: test1@test
To: test2@test
Message-ID: <1@test>
Content-Type: text/html; charset=iso-8859-1
hello \xe4\xf6\xfc'''
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual('250 Message accepted for delivery', result)
self.assertEqual(2, len(Email.objects.all()))
self.assertEqual(1, len(IssueThread.objects.all()))
aiosmtplib.send.assert_called_once()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('test1@test', Email.objects.all()[0].sender)
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
self.assertEqual('hello äöü', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].subject)
self.assertEqual('test2@test', Email.objects.all()[1].sender)
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)
def test_mail_quoted_printable_transfer_encoding(self):
from aiosmtpd.smtp import Envelope
from asgiref.sync import async_to_sync
import aiosmtplib
aiosmtplib.send = make_mocked_coro()
handler = LMTPHandler()
server = mock.Mock()
session = mock.Mock()
envelope = Envelope()
envelope.mail_from = 'test1@test'
envelope.rcpt_tos = ['test2@test']
envelope.content = b'''Subject: test
From: test1@test
To: test2@test
Message-ID: <1@test>
Content-Type: text/html; charset=utf-8
Content-Transfer-Encoding: quoted-printable
hello =C3=A4=C3=B6=C3=BC'''
result = async_to_sync(handler.handle_DATA)(server, session, envelope)
self.assertEqual('250 Message accepted for delivery', result)
self.assertEqual(2, len(Email.objects.all()))
self.assertEqual(1, len(IssueThread.objects.all()))
aiosmtplib.send.assert_called_once()
self.assertEqual('test', Email.objects.all()[0].subject)
self.assertEqual('test1@test', Email.objects.all()[0].sender)
self.assertEqual('test2@test', Email.objects.all()[0].recipient)
self.assertEqual('hello äöü', Email.objects.all()[0].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[0].issue_thread)
self.assertEqual('<1@test>', Email.objects.all()[0].reference)
self.assertEqual(None, Email.objects.all()[0].in_reply_to)
self.assertEqual(expected_auto_reply_subject.format('test', IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].subject)
self.assertEqual('test2@test', Email.objects.all()[1].sender)
self.assertEqual('test1@test', Email.objects.all()[1].recipient)
self.assertEqual(expected_auto_reply.format(IssueThread.objects.all()[0].short_uuid()),
Email.objects.all()[1].body)
self.assertEqual(IssueThread.objects.all()[0], Email.objects.all()[1].issue_thread)
self.assertTrue(Email.objects.all()[1].reference.startswith("<"))
self.assertTrue(Email.objects.all()[1].reference.endswith("@localhost>"))
self.assertEqual("<1@test>", Email.objects.all()[1].in_reply_to)
self.assertEqual('test', IssueThread.objects.all()[0].name)
self.assertEqual('pending_new', IssueThread.objects.all()[0].state)
self.assertEqual(None, IssueThread.objects.all()[0].assigned_to)
states = StateChange.objects.filter(issue_thread=IssueThread.objects.all()[0])
self.assertEqual(1, len(states))
self.assertEqual('pending_new', states[0].state)

22
core/manage.py Executable file
View file

@ -0,0 +1,22 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()

View file

View file

@ -0,0 +1,10 @@
from django.contrib import admin
from notify_sessions.models import SystemEvent
class SystemEventAdmin(admin.ModelAdmin):
pass
admin.site.register(SystemEvent, SystemEventAdmin)

View file

@ -0,0 +1,21 @@
from rest_framework import routers, viewsets, serializers
from tickets.models import IssueThread
from notify_sessions.models import SystemEvent
class SystemEventSerializer(serializers.ModelSerializer):
class Meta:
model = SystemEvent
fields = '__all__'
class SystemEventViewSet(viewsets.ModelViewSet):
serializer_class = SystemEventSerializer
queryset = SystemEvent.objects.all()
router = routers.SimpleRouter()
router.register(r'systemevents', SystemEventViewSet, basename='systemevents')
urlpatterns = router.urls

View file

@ -0,0 +1,57 @@
import logging
from json.decoder import JSONDecodeError
from json import loads as json_loads
from json import dumps as json_dumps
from channels.generic.websocket import AsyncWebsocketConsumer
class NotifyConsumer(AsyncWebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(args, kwargs)
self.log = logging.getLogger("server.log")
self.room_group_name = "general"
async def connect(self):
# Join room group
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
self.log.info(f"Added {self.channel_name} channel to {self.room_group_name} group")
await self.accept()
async def disconnect(self, close_code):
# Leave room group
await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
# Receive message from WebSocket
async def receive(self, text_data=None, bytes_data=None):
self.log.info(f"Received message: {text_data}")
try:
text_data_json = json_loads(text_data)
message = text_data_json["message"]
# Send message to room group
await self.channel_layer.group_send(
self.room_group_name,
{"type": "generic.event", "message": message, "name": "send_message_to_frontend", "event_id": 1}
)
except JSONDecodeError as e:
await self.send(text_data=json_dumps({"message": "error", "error": "malformed json"}))
self.log.error(e)
except KeyError as e:
await self.send(text_data=json_dumps({"message": "error", "error": f"missing key: {str(e)}"}))
self.log.error(e)
except Exception as e:
await self.send(text_data=json_dumps({"message": "error", "error": "unknown error"}))
self.log.error(e)
raise e
# Receive message from room group
async def generic_event(self, event):
self.log.info(f"Received event: {event}")
message = event["message"]
name = event["name"]
event_id = event["event_id"]
# Send message to WebSocket
await self.send(text_data=json_dumps({"message": message, "name": name, "event_id": event_id}))

View file

@ -0,0 +1,27 @@
# Generated by Django 4.2.7 on 2023-12-09 02:13
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='SystemEvent',
fields=[
('id', models.AutoField(primary_key=True, serialize=False)),
('timestamp', models.DateTimeField(auto_now_add=True)),
('type', models.CharField(choices=[('ticket_created', 'ticket_created'), ('ticket_updated', 'ticket_updated'), ('ticket_deleted', 'ticket_deleted'), ('item_created', 'item_created'), ('item_updated', 'item_updated'), ('item_deleted', 'item_deleted'), ('user_created', 'user_created'), ('event_created', 'event_created'), ('event_updated', 'event_updated'), ('event_deleted', 'event_deleted')], max_length=255)),
('reference', models.IntegerField(blank=True, null=True)),
('user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL)),
],
),
]

View file

@ -0,0 +1,48 @@
import logging
from django.db import models
from asgiref.sync import sync_to_async
from channels.layers import get_channel_layer
from authentication.models import ExtendedUser
class SystemEvent(models.Model):
TYPE_CHOICES = [('ticket_created', 'ticket_created'),
('ticket_updated', 'ticket_updated'),
('ticket_deleted', 'ticket_deleted'),
('item_created', 'item_created'),
('item_updated', 'item_updated'),
('item_deleted', 'item_deleted'),
('user_created', 'user_created'),
('event_created', 'event_created'),
('event_updated', 'event_updated'),
('event_deleted', 'event_deleted'), ]
id = models.AutoField(primary_key=True)
timestamp = models.DateTimeField(auto_now_add=True)
user = models.ForeignKey(ExtendedUser, models.SET_NULL, null=True)
type = models.CharField(max_length=255, choices=TYPE_CHOICES)
reference = models.IntegerField(blank=True, null=True)
async def trigger_event(user, type, reference=None):
log = logging.getLogger('server.log')
log.info(f"Triggering event {type} for user {user} with reference {reference}")
try:
event = await sync_to_async(SystemEvent.objects.create, thread_sensitive=True)(user=user, type=type,
reference=reference)
channel_layer = get_channel_layer()
await channel_layer.group_send(
'general',
{
'type': 'generic.event',
'name': 'send_message_to_frontend',
'message': "event_trigered_from_views",
'event_id': event.id,
}
)
log.info(f"SystemEvent {event.id} triggered")
return event
except Exception as e:
log.error(e)
raise e

View file

@ -0,0 +1,7 @@
from django.urls import path
from .consumers import NotifyConsumer
websocket_urlpatterns = [
path('ws/2/notify/', NotifyConsumer.as_asgi()),
]

View file

View file

@ -0,0 +1,66 @@
from django.test import TestCase
from channels.testing import WebsocketCommunicator
from notify_sessions.consumers import NotifyConsumer
from asgiref.sync import async_to_sync
class NotifyWebsocketTestCase(TestCase):
async def test_connect(self):
communicator = WebsocketCommunicator(NotifyConsumer.as_asgi(), "/ws/2/notify/")
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
await communicator.disconnect()
async def fut_send_message(self):
communicator = WebsocketCommunicator(NotifyConsumer.as_asgi(), "/ws/2/notify/")
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
await communicator.send_json_to({
"name": "foo",
"message": "bar",
})
response = await communicator.receive_json_from()
await communicator.disconnect()
return response
def test_send_message(self):
response = async_to_sync(self.fut_send_message)()
self.assertEqual(response["message"], "bar")
self.assertEqual(response["event_id"], 1)
self.assertEqual(response["name"], "send_message_to_frontend")
# events = SystemEvent.objects.all()
# self.assertEqual(len(events), 1)
# event = events[0]
# self.assertEqual(event.event_id, 1)
# self.assertEqual(event.name, "send_message_to_frontend")
# self.assertEqual(event.message, "bar")
async def fut_send_and_receive_message(self):
communicator1 = WebsocketCommunicator(NotifyConsumer.as_asgi(), "/ws/2/notify/")
communicator2 = WebsocketCommunicator(NotifyConsumer.as_asgi(), "/ws/2/notify/")
connected1, subprotocol1 = await communicator1.connect()
connected2, subprotocol2 = await communicator2.connect()
self.assertTrue(connected1)
self.assertTrue(connected2)
await communicator1.send_json_to({
"name": "foo",
"message": "bar",
})
response = await communicator2.receive_json_from()
await communicator1.disconnect()
await communicator2.disconnect()
return response
def test_send_and_receive_message(self):
response = async_to_sync(self.fut_send_and_receive_message)()
self.assertEqual(response["message"], "bar")
self.assertEqual(response["event_id"], 1)
self.assertEqual(response["name"], "send_message_to_frontend")
# events = SystemEvent.objects.all()
# self.assertEqual(len(events), 1)
# event = events[0]
# self.assertEqual(event.event_id, 1)
# self.assertEqual(event.name, "send_message_to_frontend")
# self.assertEqual(event.message, "bar")

77
core/requirements.dev.txt Normal file
View file

@ -0,0 +1,77 @@
aiodns==3.2.0
aiohttp==3.9.5
aiosignal==1.3.1
aiosmtpd==1.4.4.post2
aiosmtplib==3.0.1
anyio==4.1.0
asgiref==3.7.2
asynctest==0.7.1
atpublic==4.0
attrs==23.1.0
autobahn==23.6.2
Automat==22.10.0
beautifulsoup4==4.12.2
bs4==0.0.1
certifi==2023.11.17
cffi==1.16.0
channels==4.0.0
channels-redis==4.1.0
charset-normalizer==3.3.2
click==8.1.7
constantly==23.10.4
coreapi==2.3.3
coreschema==0.0.4
coverage==7.3.2
cryptography==41.0.5
daphne==4.0.0
Django==4.2.7
django-async-test==0.2.2
django-extensions==3.2.3
django-rest-knox==4.2.0
django-soft-delete==0.9.21
djangorestframework==3.14.0
drf-yasg==1.21.7
frozenlist==1.4.1
h11==0.14.0
hyperlink==21.0.0
idna==3.4
incremental==22.10.0
inflection==0.5.1
itypes==1.2.0
Jinja2==3.1.2
MarkupSafe==2.1.3
msgpack==1.0.7
msgpack-python==0.5.6
multidict==6.0.5
openapi-codec==1.3.2
packaging==23.2
Pillow==10.1.0
pyasn1==0.5.1
pyasn1-modules==0.3.0
pycares==4.4.0
pycparser==2.21
pyOpenSSL==23.3.0
python-dotenv==1.0.0
pytz==2023.3.post1
PyYAML==6.0.1
redis==5.0.1
requests==2.31.0
sdnotify==0.3.2
service-identity==23.1.0
setproctitle==1.3.3
six==1.16.0
sniffio==1.3.0
soupsieve==2.5
sqlparse==0.4.4
Twisted==23.10.0
txaio==23.1.1
typing_extensions==4.8.0
uritemplate==4.1.1
urllib3==2.1.0
uvicorn==0.24.0.post1
watchfiles==0.21.0
websockets==12.0
yarl==1.9.4
zope.interface==6.1
django-prometheus==2.3.1
prometheus_client==0.21.0

View file

@ -0,0 +1,45 @@
aiodns==3.2.0
aiohttp==3.9.5
aiosignal==1.3.1
aiosmtpd==1.4.4.post2
aiosmtplib==3.0.1
asgiref==3.7.2
attrs==23.1.0
beautifulsoup4==4.12.2
bs4==0.0.1
certifi==2023.11.17
channels==4.0.0
channels-redis==4.1.0
coreapi==2.3.3
coreschema==0.0.4
Django==4.2.7
django-extensions==3.2.3
django-mysql==4.12.0
django-rest-knox==4.2.0
django-soft-delete==0.9.21
djangorestframework==3.14.0
drf-yasg==1.21.7
MarkupSafe==2.1.3
msgpack==1.0.7
msgpack-python==0.5.6
mysqlclient==2.2.0
openapi-codec==1.3.2
packaging==23.2
Pillow==10.1.0
python-dotenv==1.0.0
pytz==2023.3.post1
PyYAML==6.0.1
requests==2.31.0
sdnotify==0.3.2
setproctitle==1.3.3
sniffio==1.3.0
soupsieve==2.5
sqlparse==0.4.4
typing_extensions==4.8.0
uritemplate==4.1.1
urllib3==2.1.0
uvicorn==0.24.0.post1
watchfiles==0.21.0
websockets==12.0
django-prometheus==2.3.1
prometheus_client==0.21.0

92
core/server.py Normal file
View file

@ -0,0 +1,92 @@
#!/usr/bin/env python3
import logging
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
import django
import uvicorn
django.setup()
from helper import init_loop
from mail.protocol import LMTPHandler
from mail.socket import UnixSocketLMTPController
class UvicornServer(uvicorn.Server):
def install_signal_handlers(self):
pass
async def web(loop):
log_config = uvicorn.config.LOGGING_CONFIG
log_config["handlers"]["default"] = {"class": "logging.FileHandler", "filename": "web.log", "formatter": "default"}
log_config["handlers"]["access"] = {"class": "logging.FileHandler", "filename": "web-access.log",
"formatter": "access"}
config = uvicorn.Config("core.asgi:application", uds="web.sock", log_config=log_config)
server = UvicornServer(config=config)
await server.serve()
async def lmtp(loop):
import grp
log = logging.getLogger('mail.log')
log.addHandler(logging.FileHandler('mail.log'))
# log.setLevel(logging.WARNING)
log.setLevel(logging.INFO)
log.info("Starting LMTP server")
server = await UnixSocketLMTPController(LMTPHandler(), unix_socket='lmtp.sock', loop=loop).serve()
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
log.info(f'Serving on {addrs}')
try:
os.chmod('lmtp.sock', 0o775)
current_uid = os.getuid()
posix_gid = grp.getgrnam('postfix').gr_gid
os.chown('lmtp.sock', current_uid, posix_gid)
except Exception as e:
log.error(e)
async with server:
await server.serve_forever()
log.info("LMTP done")
def main():
import sdnotify
import setproctitle
import os
setproctitle.setproctitle("c3lf-sys3")
log = logging.getLogger('server.log')
log.addHandler(logging.FileHandler('server.log'))
log.setLevel(logging.DEBUG)
log.info("Starting server")
loop = init_loop()
loop.create_task(web(loop))
# loop.create_task(tcp(loop))
loop.create_task(lmtp(loop))
n = sdnotify.SystemdNotifier()
n.notify("READY=1")
log.info("Server ready")
try:
loop.run_forever()
finally:
loop.close()
try:
os.remove("lmtp.sock")
except Exception as e:
log.error(e)
try:
os.remove("web.sock")
except Exception as e:
log.error(e)
log.error(e)
logging.info("Server stopped")
logging.shutdown()
if __name__ == '__main__':
main()

0
core/tickets/__init__.py Normal file
View file

35
core/tickets/admin.py Normal file
View file

@ -0,0 +1,35 @@
from django.contrib import admin
from tickets.models import IssueThread, Comment, StateChange, Assignment, ItemRelation, ShippingVoucher
class IssueThreadAdmin(admin.ModelAdmin):
pass
class CommentAdmin(admin.ModelAdmin):
pass
class StateChangeAdmin(admin.ModelAdmin):
pass
class AssignmentAdmin(admin.ModelAdmin):
pass
class ItemRelationAdmin(admin.ModelAdmin):
pass
class ShippingVouchersAdmin(admin.ModelAdmin):
pass
admin.site.register(IssueThread, IssueThreadAdmin)
admin.site.register(Comment, CommentAdmin)
admin.site.register(StateChange, StateChangeAdmin)
admin.site.register(Assignment, AssignmentAdmin)
admin.site.register(ItemRelation, ItemRelationAdmin)
admin.site.register(ShippingVoucher, ShippingVouchersAdmin)

175
core/tickets/api_v2.py Normal file
View file

@ -0,0 +1,175 @@
from base64 import b64decode
from django.urls import re_path
from django.contrib.auth.decorators import permission_required
from rest_framework import routers, viewsets, status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from core.settings import MAIL_DOMAIN
from inventory.models import Event
from mail.models import Email
from mail.protocol import send_smtp, make_reply, collect_references
from notify_sessions.models import SystemEvent
from tickets.models import IssueThread, Comment, STATE_CHOICES, ShippingVoucher, ItemRelation
from tickets.serializers import IssueSerializer, CommentSerializer, ShippingVoucherSerializer, SearchResultSerializer
from tickets.shared_serializers import RelationSerializer
class IssueViewSet(viewsets.ModelViewSet):
serializer_class = IssueSerializer
queryset = IssueThread.objects.all().prefetch_related('state_changes', 'comments', 'emails', 'emails__attachments', 'assignments', 'item_relation_changes', 'shipping_vouchers')
class RelationViewSet(viewsets.ModelViewSet):
serializer_class = RelationSerializer
queryset = ItemRelation.objects.all()
class CommentViewSet(viewsets.ModelViewSet):
serializer_class = CommentSerializer
queryset = Comment.objects.all()
class ShippingVoucherViewSet(viewsets.ModelViewSet):
serializer_class = ShippingVoucherSerializer
queryset = ShippingVoucher.objects.all()
@api_view(['POST'])
@permission_classes([IsAuthenticated])
@permission_required('tickets.add_issuethread', raise_exception=True)
def reply(request, pk):
issue = IssueThread.objects.get(pk=pk)
# email = issue.reply(request.data['body']) # TODO evaluate if this is a useful abstraction
references = collect_references(issue)
first_mail = Email.objects.filter(issue_thread=issue, recipient__endswith='@' + MAIL_DOMAIN).order_by(
'timestamp').first()
if not first_mail:
return Response({'status': 'error', 'message': 'no previous mail found, reply would not make sense.'},
status=status.HTTP_400_BAD_REQUEST)
mail = Email.objects.create(
issue_thread=issue,
sender=first_mail.recipient,
recipient=first_mail.sender,
subject=f'Re: {issue.name} [#{issue.short_uuid()}]',
body=request.data['message'],
in_reply_to=first_mail.reference,
)
async_to_sync(send_smtp)(make_reply(mail, references))
return Response({'status': 'ok'}, status=status.HTTP_201_CREATED)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
@permission_required('tickets.add_issuethread_manual', raise_exception=True)
def manual_ticket(request, event_slug):
if 'name' not in request.data:
return Response({'status': 'error', 'message': 'missing name'}, status=status.HTTP_400_BAD_REQUEST)
if 'sender' not in request.data:
return Response({'status': 'error', 'message': 'missing sender'}, status=status.HTTP_400_BAD_REQUEST)
if 'recipient' not in request.data:
return Response({'status': 'error', 'message': 'missing recipient'}, status=status.HTTP_400_BAD_REQUEST)
if 'body' not in request.data:
return Response({'status': 'error', 'message': 'missing body'}, status=status.HTTP_400_BAD_REQUEST)
event = None
if event_slug != 'none':
try:
event = Event.objects.get(slug=event_slug)
except:
return Response({'status': 'error', 'message': 'invalid event'}, status=status.HTTP_400_BAD_REQUEST)
issue = IssueThread.objects.create(
name=request.data['name'],
event=event,
manually_created=True,
)
email = Email.objects.create(
issue_thread=issue,
sender=request.data['sender'],
recipient=request.data['recipient'],
subject=request.data['name'],
body=request.data['body'],
)
systemevent = SystemEvent.objects.create(type='email received', reference=email.id)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
'general', {"type": "generic.event", "name": "send_message_to_frontend", "event_id": systemevent.id,
"message": "email received"}
)
return Response(IssueSerializer(issue).data, status=status.HTTP_201_CREATED)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def get_available_states(request):
def get_state_choices():
for state in STATE_CHOICES:
yield {'value': list(state)[0], 'text': list(state)[1]}
return Response(get_state_choices())
@api_view(['POST'])
@permission_classes([IsAuthenticated])
@permission_required('tickets.add_comment', raise_exception=True)
def add_comment(request, pk):
issue = IssueThread.objects.get(pk=pk)
if 'comment' not in request.data or request.data['comment'] == '':
return Response({'status': 'error', 'message': 'missing comment'}, status=status.HTTP_400_BAD_REQUEST)
comment = Comment.objects.create(
issue_thread=issue,
comment=request.data['comment'],
)
systemevent = SystemEvent.objects.create(type='comment added', reference=comment.id)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
'general', {"type": "generic.event", "name": "send_message_to_frontend", "event_id": systemevent.id,
"message": "comment added"}
)
return Response(CommentSerializer(comment).data, status=status.HTTP_201_CREATED)
def filter_issues(issues, query):
query_tokens = query.split(' ')
for issue in issues:
value = 0
for token in query_tokens:
if token in issue.description:
value += 1
if value > 0:
yield {'search_score': value, 'issue': issue}
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def search_issues(request, event_slug, query):
try:
event = Event.objects.get(slug=event_slug)
if not request.user.has_event_perm(event, 'view_issuethread'):
return Response(status=403)
items = filter_issues(IssueThread.objects.filter(event=event), b64decode(query).decode('utf-8'))
return Response(SearchResultSerializer(items, many=True).data)
except Event.DoesNotExist:
return Response(status=404)
router = routers.SimpleRouter()
router.register(r'tickets', IssueViewSet, basename='issues')
router.register(r'matches', RelationViewSet, basename='matches')
router.register(r'shipping_vouchers', ShippingVoucherViewSet, basename='shipping_vouchers')
urlpatterns = ([
re_path(r'tickets/states/$', get_available_states, name='get_available_states'),
re_path(r'^tickets/(?P<pk>\d+)/reply/$', reply, name='reply'),
re_path(r'^tickets/(?P<pk>\d+)/comment/$', add_comment, name='add_comment'),
re_path(r'^(?P<event_slug>[\w-]+)/tickets/manual/$', manual_ticket, name='manual_ticket'),
re_path(r'^(?P<event_slug>[\w-]+)/tickets/(?P<query>[-A-Za-z0-9+/]*={0,3})/$', search_issues,
name='search_issues'),
] + router.urls)

View file

@ -0,0 +1,48 @@
# Generated by Django 4.2.7 on 2023-12-09 02:13
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='IssueThread',
fields=[
('is_deleted', models.BooleanField(default=False)),
('deleted_at', models.DateTimeField(blank=True, null=True)),
('id', models.AutoField(primary_key=True, serialize=False)),
('name', models.CharField(max_length=255)),
('state', models.CharField(default='new', max_length=255)),
('assigned_to', models.CharField(max_length=255, null=True)),
('last_activity', models.DateTimeField(auto_now=True)),
],
options={
'permissions': [('send_mail', 'Can send mail')],
},
),
migrations.CreateModel(
name='StateChange',
fields=[
('id', models.AutoField(primary_key=True, serialize=False)),
('state', models.CharField(max_length=255)),
('timestamp', models.DateTimeField(auto_now_add=True)),
('issue_thread', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='state_changes', to='tickets.issuethread')),
],
),
migrations.CreateModel(
name='Comment',
fields=[
('id', models.AutoField(primary_key=True, serialize=False)),
('comment', models.TextField()),
('timestamp', models.DateTimeField(auto_now_add=True)),
('issue_thread', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='comments', to='tickets.issuethread')),
],
),
]

View file

@ -0,0 +1,22 @@
# Generated by Django 4.2.7 on 2023-12-22 20:57
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('tickets', '0001_initial'),
]
operations = [
migrations.AlterModelOptions(
name='issuethread',
options={'permissions': [('send_mail', 'Can send mail'), ('add_issuethread_manual', 'Can add issue thread manually')]},
),
migrations.AddField(
model_name='issuethread',
name='manually_created',
field=models.BooleanField(default=False),
),
]

View file

@ -0,0 +1,39 @@
# Generated by Django 4.2.7 on 2023-12-28 20:15
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('tickets', '0002_alter_issuethread_options_and_more'),
]
def convert_state(apps, schema_editor):
IssueThread = apps.get_model('tickets', 'IssueThread')
for issue in IssueThread.objects.all():
if issue.state == 'new':
issue.state = 'pending_new'
issue.save()
StateChange = apps.get_model('tickets', 'StateChange')
for change in StateChange.objects.all():
if change.state == 'new':
change.state = 'pending_new'
change.save()
operations = [
migrations.AlterField(
model_name='issuethread',
name='state',
field=models.CharField(
choices=[('pending_new', 'New'), ('pending_open', 'Open'), ('pending_shipping', 'Needs to be shipped'),
('pending_physical_confirmation', 'Needs to be confirmed physically'),
('pending_return', 'Needs to be returned'), ('waiting_details', 'Waiting for details'),
('waiting_pre_shipping', 'Waiting for Address/Shipping Info'),
('closed_returned', 'Closed: Returned'), ('closed_shipped', 'Closed: Shipped'),
('closed_not_found', 'Closed: Not found'),
('closed_not_our_problem', 'Closed: Not our problem'),
('closed_duplicate', 'Closed: Duplicate'), ('closed_timeout', 'Closed: Timeout'),
('closed_spam', 'Closed: Spam')], default='pending_new', max_length=32, verbose_name='state'),
),
migrations.RunPython(convert_state),
]

Some files were not shown because too many files have changed in this diff Show more