parent
f7fda52fd0
commit
cd1ff4fe2d
11 changed files with 93 additions and 44 deletions
|
@ -1,3 +1,4 @@
|
|||
name: Test
|
||||
on:
|
||||
pull_request:
|
||||
push:
|
||||
|
@ -16,6 +17,12 @@ jobs:
|
|||
- name: Install dependencies
|
||||
working-directory: core
|
||||
run: pip3 install -r requirements.dev.txt
|
||||
- name: Run django tests
|
||||
- name: Run django tests with coverage
|
||||
working-directory: core
|
||||
run: python3 manage.py test
|
||||
run: coverage run manage.py test
|
||||
- name: Run integration tests with coverage
|
||||
working-directory: core
|
||||
run: python3 integration_tests/main.py
|
||||
- name: Evaluate coverage
|
||||
working-directory: core
|
||||
run: coverage report
|
||||
|
|
|
@ -8,7 +8,7 @@ skip_covered = True
|
|||
omit =
|
||||
*/tests/*
|
||||
*/migrations/*
|
||||
integration_tests/*
|
||||
core/asgi.py
|
||||
core/wsgi.py
|
||||
core/settings.py
|
||||
manage.py
|
|
@ -54,9 +54,9 @@ def registerUser(request):
|
|||
errors['password'] = 'Password is required'
|
||||
if not email:
|
||||
errors['email'] = 'Email is required'
|
||||
if ExtendedUser.objects.filter(email=email).exists():
|
||||
if email and ExtendedUser.objects.filter(email=email).exists():
|
||||
errors['email'] = 'Email already exists'
|
||||
if ExtendedUser.objects.filter(username=username).exists():
|
||||
if username and ExtendedUser.objects.filter(username=username).exists():
|
||||
errors['username'] = 'Username already exists'
|
||||
if errors:
|
||||
return Response({'errors': errors}, status=400)
|
||||
|
|
|
@ -72,6 +72,17 @@ class UserApiTest(TestCase):
|
|||
self.assertEqual(ExtendedUser.objects.get(username='testuser2').email, 'test2')
|
||||
self.assertTrue(ExtendedUser.objects.get(username='testuser2').check_password('test'))
|
||||
|
||||
def test_register_user_fail(self):
|
||||
anonymous = Client()
|
||||
response = anonymous.post('/api/2/register/', {'username': 'testuser2', 'password': 'test', 'email': 'test2'},
|
||||
content_type='application/json')
|
||||
self.assertEqual(response.status_code, 201)
|
||||
self.assertEqual(response.json()['username'], 'testuser2')
|
||||
self.assertEqual(response.json()['email'], 'test2')
|
||||
self.assertEqual(len(ExtendedUser.objects.all()), 3)
|
||||
self.assertEqual(ExtendedUser.objects.get(username='testuser2').email, 'test2')
|
||||
self.assertTrue(ExtendedUser.objects.get(username='testuser2').check_password('test'))
|
||||
|
||||
def test_register_user_duplicate(self):
|
||||
anonymous = Client()
|
||||
response = anonymous.post('/api/2/register/', {'username': 'testuser', 'password': 'test', 'email': 'test2'},
|
||||
|
|
|
@ -2,12 +2,7 @@ import asyncio
|
|||
import logging
|
||||
import signal
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
|
||||
def create_task(coro):
|
||||
global loop
|
||||
loop.create_task(coro)
|
||||
loop = None
|
||||
|
||||
|
||||
async def shutdown(sig, loop):
|
||||
|
@ -24,6 +19,7 @@ async def shutdown(sig, loop):
|
|||
|
||||
def init_loop():
|
||||
global loop
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(shutdown(signal.SIGTERM, loop)))
|
||||
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(signal.SIGINT, loop)))
|
||||
return loop
|
||||
|
|
|
@ -1,30 +0,0 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import signal
|
||||
|
||||
loop = None
|
||||
|
||||
|
||||
def create_task(coro):
|
||||
global loop
|
||||
loop.create_task(coro)
|
||||
|
||||
|
||||
async def shutdown(sig, loop):
|
||||
log = logging.getLogger()
|
||||
log.info(f"Received exit signal {sig.name}...")
|
||||
tasks = [t for t in asyncio.all_tasks() if t is not
|
||||
asyncio.current_task()]
|
||||
[task.cancel() for task in tasks]
|
||||
log.info(f"Cancelling {len(tasks)} outstanding tasks")
|
||||
await asyncio.wait_for(loop.shutdown_asyncgens(), timeout=10)
|
||||
loop.stop()
|
||||
log.info("Shutdown complete.")
|
||||
|
||||
|
||||
def init_loop():
|
||||
global loop
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(shutdown(signal.SIGTERM, loop)))
|
||||
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(signal.SIGINT, loop)))
|
||||
return loop
|
0
core/integration_tests/__init__.py
Normal file
0
core/integration_tests/__init__.py
Normal file
63
core/integration_tests/main.py
Normal file
63
core/integration_tests/main.py
Normal file
|
@ -0,0 +1,63 @@
|
|||
import time
|
||||
import os
|
||||
import signal
|
||||
import re
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
from django.db.models.expressions import result
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
|
||||
def run():
|
||||
while True:
|
||||
newpid = os.fork()
|
||||
if newpid == 0:
|
||||
import coverage
|
||||
cov = coverage.Coverage()
|
||||
cov.load()
|
||||
cov.start()
|
||||
signal.signal(signal.SIGINT, signal.default_int_handler)
|
||||
try:
|
||||
from server import main
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
cov.stop()
|
||||
cov.save()
|
||||
os._exit(0)
|
||||
else:
|
||||
return newpid
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pid = run()
|
||||
time.sleep(1)
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
pattern = re.compile(r'^test_.*\.(sh|py)$')
|
||||
dotpy = re.compile(r'^.*\.py$')
|
||||
dotsh = re.compile(r'^.*\.sh$')
|
||||
matching_files = [
|
||||
filename for filename in os.listdir(script_dir)
|
||||
if os.path.isfile(os.path.join(script_dir, filename)) and pattern.match(filename)
|
||||
]
|
||||
total = 0
|
||||
failed = 0
|
||||
for file in matching_files:
|
||||
file_path = os.path.join(script_dir, file)
|
||||
if dotpy.match(file):
|
||||
result = subprocess.run(['python3', file_path], capture_output=True, text=True)
|
||||
elif dotsh.match(file):
|
||||
result = subprocess.run(['bash', file_path], capture_output=True, text=True)
|
||||
else:
|
||||
result = subprocess.run(['bash', '-c', file_path], capture_output=True, text=True)
|
||||
print('{} returned {}'.format(file, result.returncode))
|
||||
if result.returncode != 0:
|
||||
print(result.stderr, result.stdout)
|
||||
failed += 1
|
||||
total += 1
|
||||
time.sleep(1)
|
||||
os.kill(pid, signal.SIGINT)
|
||||
print(f'{total - failed} out of {total} tests succeeded, {failed} failed')
|
||||
os._exit(0 if failed == 0 else 1)
|
1
core/integration_tests/test_bar.py
Normal file
1
core/integration_tests/test_bar.py
Normal file
|
@ -0,0 +1 @@
|
|||
#!/usr/bin/env python3
|
3
core/integration_tests/test_foo.sh
Normal file
3
core/integration_tests/test_foo.sh
Normal file
|
@ -0,0 +1,3 @@
|
|||
#!/bin/bash
|
||||
|
||||
exit 1
|
|
@ -9,7 +9,7 @@ import uvicorn
|
|||
|
||||
django.setup()
|
||||
|
||||
from helper import init_loop
|
||||
from core.globals import init_loop
|
||||
from mail.protocol import LMTPHandler
|
||||
from mail.socket import UnixSocketLMTPController
|
||||
|
||||
|
@ -51,7 +51,6 @@ async def lmtp(loop):
|
|||
|
||||
async with server:
|
||||
await server.serve_forever()
|
||||
log.info("LMTP done")
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -82,7 +81,6 @@ def main():
|
|||
os.remove("web.sock")
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
log.error(e)
|
||||
logging.info("Server stopped")
|
||||
|
||||
logging.shutdown()
|
||||
|
|
Loading…
Add table
Reference in a new issue