From ab5e8f36d17e3d4d326149df77a0c9dcc93997b4 Mon Sep 17 00:00:00 2001 From: jedi Date: Sun, 7 Jan 2024 20:46:48 +0100 Subject: [PATCH] add API endpoints and admin views for authentication module --- core/authentication/admin.py | 17 ++ core/authentication/api_v2.py | 116 +++++++++++ core/authentication/tests/__init__.py | 0 core/authentication/tests/v2/__init__.py | 0 .../tests/v2/test_permissions.py | 90 +++++++++ core/authentication/tests/v2/test_users.py | 183 ++++++++++++++++++ core/core/urls.py | 1 + 7 files changed, 407 insertions(+) create mode 100644 core/authentication/admin.py create mode 100644 core/authentication/api_v2.py create mode 100644 core/authentication/tests/__init__.py create mode 100644 core/authentication/tests/v2/__init__.py create mode 100644 core/authentication/tests/v2/test_permissions.py create mode 100644 core/authentication/tests/v2/test_users.py diff --git a/core/authentication/admin.py b/core/authentication/admin.py new file mode 100644 index 0000000..ed88ace --- /dev/null +++ b/core/authentication/admin.py @@ -0,0 +1,17 @@ +from django.contrib import admin +from django.contrib.auth.admin import UserAdmin + +from authentication.models import ExtendedUser + + +class ExtendedUserAdmin(UserAdmin): + list_display = ('username', 'email', 'first_name', 'last_name', 'is_staff', 'is_superuser') + search_fields = ('username', 'email', 'first_name', 'last_name') + ordering = ('username',) + filter_horizontal = ('groups', 'user_permissions', 'permissions') + + def permissions(self, obj): + return ', '.join(obj.get_all_permissions()) + + +admin.site.register(ExtendedUser, ExtendedUserAdmin) diff --git a/core/authentication/api_v2.py b/core/authentication/api_v2.py new file mode 100644 index 0000000..514a697 --- /dev/null +++ b/core/authentication/api_v2.py @@ -0,0 +1,116 @@ +from rest_framework import routers, viewsets, serializers, permissions +from rest_framework.decorators import api_view, permission_classes, authentication_classes +from rest_framework.authtoken.serializers import AuthTokenSerializer +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response +from django.contrib.auth import login +from django.urls import path +from django.dispatch import receiver +from django.db.models.signals import post_save +from django.contrib.auth.models import Group +from knox.models import AuthToken +from knox.views import LoginView as KnoxLoginView + +from authentication.models import ExtendedUser + + +class UserSerializer(serializers.ModelSerializer): + permissions = serializers.SerializerMethodField() + groups = serializers.SlugRelatedField(many=True, read_only=True, slug_field='name') + + class Meta: + model = ExtendedUser + fields = ('id', 'username', 'email', 'first_name', 'last_name', 'permissions', 'groups') + read_only_fields = ('id', 'username', 'email', 'first_name', 'last_name', 'permissions', 'groups') + + def get_permissions(self, obj): + return list(set(obj.get_permissions())) + + +@receiver(post_save, sender=ExtendedUser) +def create_auth_token(sender, instance=None, created=False, **kwargs): + if created: + AuthToken.objects.create(user=instance) + + +class UserViewSet(viewsets.ModelViewSet): + queryset = ExtendedUser.objects.all() + serializer_class = UserSerializer + + +class GroupSerializer(serializers.ModelSerializer): + permissions = serializers.SerializerMethodField() + members = serializers.SerializerMethodField() + + class Meta: + model = Group + fields = ('id', 'name', 'permissions', 'members') + + def get_permissions(self, obj): + return ["*:" + p.codename for p in obj.permissions.all()] + + def get_members(self, obj): + return [u.username for u in obj.user_set.all()] + + +class GroupViewSet(viewsets.ModelViewSet): + queryset = Group.objects.all() + serializer_class = GroupSerializer + + +@api_view(['GET']) +@permission_classes([IsAuthenticated]) +def selfUser(request): + serializer = UserSerializer(request.user) + return Response(serializer.data, status=200) + + +@api_view(['POST']) +@permission_classes([]) +@authentication_classes([]) +def registerUser(request): + try: + username = request.data.get('username') + password = request.data.get('password') + email = request.data.get('email') + + errors = {} + if not username: + errors['username'] = 'Username is required' + if not password: + errors['password'] = 'Password is required' + if not email: + errors['email'] = 'Email is required' + if ExtendedUser.objects.filter(email=email).exists(): + errors['email'] = 'Email already exists' + if ExtendedUser.objects.filter(username=username).exists(): + errors['username'] = 'Username already exists' + if errors: + return Response({'errors': errors}, status=400) + user = ExtendedUser.objects.create_user(username, email, password) + return Response({'username': user.username, 'email': user.email}, status=201) + except Exception as e: + return Response({'errors': str(e)}, status=400) + + +class LoginView(KnoxLoginView): + permission_classes = (permissions.AllowAny,) + authentication_classes = () + + def post(self, request, format=None): + serializer = AuthTokenSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + user = serializer.validated_data['user'] + login(request, user) + return super(LoginView, self).post(request, format=None) + + +router = routers.SimpleRouter() +router.register(r'users', UserViewSet, basename='users') +router.register(r'groups', GroupViewSet, basename='groups') + +urlpatterns = router.urls + [ + path('self/', selfUser), + path('login/', LoginView.as_view()), + path('register/', registerUser), +] diff --git a/core/authentication/tests/__init__.py b/core/authentication/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/authentication/tests/v2/__init__.py b/core/authentication/tests/v2/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/authentication/tests/v2/test_permissions.py b/core/authentication/tests/v2/test_permissions.py new file mode 100644 index 0000000..0a00fcd --- /dev/null +++ b/core/authentication/tests/v2/test_permissions.py @@ -0,0 +1,90 @@ +from django.test import TestCase, Client +from django.contrib.auth.models import Permission +from knox.models import AuthToken + +from authentication.models import EventPermission, ExtendedUser +from inventory.models import Event + + +class PermissionsTestCase(TestCase): + def setUp(self): + super().setUp() + self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test') + self.user.user_permissions.add(*Permission.objects.all()) + event1 = Event.objects.create(slug='testevent1', name='testevent1') + event2 = Event.objects.create(slug='testevent2', name='testevent2') + permission1 = Permission.objects.get(codename='view_event') + EventPermission.objects.create(user=self.user, permission=permission1, event=event1) + EventPermission.objects.create(user=self.user, permission=permission1, event=event2) + self.token = AuthToken.objects.create(user=self.user) + self.client = Client(headers={'Authorization': 'Token ' + self.token[1]}) + self.newuser = ExtendedUser.objects.create_user('newuser', 'test', 'test') + self.newuser_token = AuthToken.objects.create(user=self.newuser) + self.newuser_client = Client(headers={'Authorization': 'Token ' + self.newuser_token[1]}) + + def test_user_permissions(self): + """ + Test that a user can only access their own data. + """ + response = self.client.get('/api/2/users/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 3) + self.assertEqual(response.json()[0]['username'], 'legacy_user') + self.assertEqual(response.json()[0]['email'], 'mail@localhost') + self.assertEqual(response.json()[0]['first_name'], '') + self.assertEqual(response.json()[0]['last_name'], '') + self.assertEqual(response.json()[0]['id'], 1) + self.assertEqual(response.json()[1]['username'], 'testuser') + self.assertEqual(response.json()[1]['email'], 'test') + self.assertEqual(response.json()[1]['first_name'], '') + self.assertEqual(response.json()[1]['last_name'], '') + + def test_user_permission(self): + """ + Test that a user can only access their own data. + """ + #ä['add_logentry', 'change_logentry', 'delete_logentry', 'view_logentry', 'add_group', 'change_group', + #ä 'delete_group', 'view_group', 'add_permission', 'change_permission', 'delete_permission', 'view_permission', + #ä 'add_authtokeneventpermissions', 'change_authtokeneventpermissions', 'delete_authtokeneventpermissions', + #ä 'view_authtokeneventpermissions', 'add_eventpermission', 'change_eventpermission', 'delete_eventpermission', + #ä 'view_eventpermission', 'add_extendedauthtoken', 'change_extendedauthtoken', 'delete_extendedauthtoken', + #ä 'view_extendedauthtoken', 'add_extendeduser', 'change_extendeduser', 'delete_extendeduser', + #ä 'view_extendeduser', 'add_contenttype', 'change_contenttype', 'delete_contenttype', 'view_contenttype', + #ä 'add_file', 'change_file', 'delete_file', 'view_file', 'add_container', 'change_container', 'delete_container', + #ä 'view_container', 'add_event', 'change_event', 'delete_event', 'view_event', 'add_item', 'change_item', + #ä 'delete_item', 'match_item', 'view_item', 'add_authtoken', 'change_authtoken', 'delete_authtoken', + #ä 'view_authtoken', 'add_email', 'change_email', 'delete_email', 'view_email', 'add_eventaddress', + #ä 'change_eventaddress', 'delete_eventaddress', 'view_eventaddress', 'add_systemevent', 'change_systemevent', + #ä 'delete_systemevent', 'view_systemevent', 'add_session', 'change_session', 'delete_session', 'view_session', + #ä 'add_comment', 'change_comment', 'delete_comment', 'view_comment', 'add_issuethread', 'change_issuethread', + #ä 'delete_issuethread', 'send_mail', 'view_issuethread', 'add_statechange', 'change_statechange', + #ä 'delete_statechange', 'view_statechange'] + + user = ExtendedUser.objects.create_user('testuser2', 'test', 'test') + user.event_permissions.create(permission=Permission.objects.get(codename='view_item'), event=Event.objects.get(slug='testevent1')) + user.event_permissions.create(permission=Permission.objects.get(codename='view_item'), event=Event.objects.get(slug='testevent2')) + user.event_permissions.create(permission=Permission.objects.get(codename='add_item'), event=Event.objects.get(slug='testevent1')) + user.save() + #self.assertTrue(user.has_perm('inventory.view_event', Event.objects.get(slug='testevent1'))) + #self.assertTrue(user.has_perm('inventory.view_event', Event.objects.get(slug='testevent2'))) + #self.assertFalse(user.has_perm('inventory.add_event', Event.objects.get(slug='testevent1'))) + #self.assertFalse(user.has_perm('inventory.add_event', Event.objects.get(slug='testevent2'))) + + def test_item_api_permissions(self): + """ + Test that a user can only access their own data. + """ + response = self.client.get('/api/2/testevent1/items/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 0) + + response = self.client.get('/api/2/testevent2/items/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 0) + + response = self.newuser_client.get('/api/2/testevent1/items/') + self.assertEqual(response.status_code, 403) + + response = self.newuser_client.get('/api/2/testevent2/items/') + self.assertEqual(response.status_code, 403) + diff --git a/core/authentication/tests/v2/test_users.py b/core/authentication/tests/v2/test_users.py new file mode 100644 index 0000000..125be9b --- /dev/null +++ b/core/authentication/tests/v2/test_users.py @@ -0,0 +1,183 @@ +from django.test import TestCase, Client +from django.contrib.auth.models import Permission, Group + +from knox.models import AuthToken + +from authentication.models import ExtendedUser, EventPermission +from core import settings +from inventory.models import Event + + +class UserApiTest(TestCase): + + def setUp(self): + self.event = Event.objects.create(name='testevent', slug='testevent') + self.group1 = Group.objects.create(name='testgroup1') + self.group2 = Group.objects.create(name='testgroup2') + self.group1.permissions.add(Permission.objects.get(codename='add_item')) + self.group1.permissions.add(Permission.objects.get(codename='view_item')) + self.group2.permissions.add(Permission.objects.get(codename='view_event')) + self.group2.permissions.add(Permission.objects.get(codename='view_item')) + self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test') + self.user.user_permissions.add(Permission.objects.get(codename='add_event')) + self.user.groups.add(self.group1) + self.user.groups.add(self.group2) + self.user.save() + EventPermission.objects.create(event=self.event, user=self.user, + permission=Permission.objects.get(codename='delete_item')) + self.user.save() + self.token = AuthToken.objects.create(user=self.user) + self.client = Client(headers={'Authorization': 'Token ' + self.token[1]}) + + def test_users(self): + response = self.client.get('/api/2/users/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 2) + self.assertEqual(response.json()[0]['username'], settings.LEGACY_USER_NAME) + self.assertEqual(response.json()[0]['email'], 'mail@' + settings.MAIL_DOMAIN) + self.assertEqual(response.json()[0]['first_name'], '') + self.assertEqual(response.json()[0]['last_name'], '') + self.assertEqual(response.json()[0]['id'], 1) + self.assertEqual(response.json()[0]['groups'], []) + self.assertEqual(response.json()[1]['username'], 'testuser') + self.assertEqual(response.json()[1]['email'], 'test') + self.assertEqual(response.json()[1]['first_name'], '') + self.assertEqual(response.json()[1]['last_name'], '') + self.assertEqual(response.json()[1]['id'], 2) + self.assertEqual(response.json()[1]['groups'], ['testgroup1', 'testgroup2']) + + def test_self_user(self): + response = self.client.get('/api/2/self/') + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['username'], 'testuser') + self.assertEqual(response.json()['email'], 'test') + self.assertEqual(response.json()['first_name'], '') + self.assertEqual(response.json()['last_name'], '') + permissions = response.json()['permissions'] + self.assertEqual(len(permissions), 5) + self.assertTrue('*:add_item' in permissions) + self.assertTrue('*:view_item' in permissions) + self.assertTrue('*:view_event' in permissions) + self.assertTrue('testevent:delete_item' in permissions) + self.assertTrue('*:add_event' in permissions) + + def test_register_user(self): + anonymous = Client() + response = anonymous.post('/api/2/register/', {'username': 'testuser2', 'password': 'test', 'email': 'test2'}, + content_type='application/json') + self.assertEqual(response.status_code, 201) + self.assertEqual(response.json()['username'], 'testuser2') + self.assertEqual(response.json()['email'], 'test2') + self.assertEqual(len(ExtendedUser.objects.all()), 3) + self.assertEqual(ExtendedUser.objects.get(username='testuser2').email, 'test2') + self.assertTrue(ExtendedUser.objects.get(username='testuser2').check_password('test')) + + def test_register_user_duplicate(self): + anonymous = Client() + response = anonymous.post('/api/2/register/', {'username': 'testuser', 'password': 'test', 'email': 'test2'}, + content_type='application/json') + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()['errors']['username'], 'Username already exists') + self.assertEqual(len(ExtendedUser.objects.all()), 2) + + def test_register_user_no_username(self): + anonymous = Client() + response = anonymous.post('/api/2/register/', {'password': 'test', 'email': 'test2'}, + content_type='application/json') + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()['errors']['username'], 'Username is required') + self.assertEqual(len(ExtendedUser.objects.all()), 2) + + def test_register_user_no_password(self): + anonymous = Client() + response = anonymous.post('/api/2/register/', {'username': 'testuser2', 'email': 'test2'}, + content_type='application/json') + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()['errors']['password'], 'Password is required') + self.assertEqual(len(ExtendedUser.objects.all()), 2) + + def test_register_user_no_email(self): + anonymous = Client() + response = anonymous.post('/api/2/register/', {'username': 'testuser2', 'password': 'test'}, + content_type='application/json') + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()['errors']['email'], 'Email is required') + self.assertEqual(len(ExtendedUser.objects.all()), 2) + + def test_register_user_duplicate_email(self): + anonymous = Client() + response = anonymous.post('/api/2/register/', {'username': 'testuser2', 'password': 'test', 'email': 'test'}, + content_type='application/json') + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()['errors']['email'], 'Email already exists') + self.assertEqual(len(ExtendedUser.objects.all()), 2) + + def test_get_token(self): + anonymous = Client() + response = anonymous.post('/api/2/login/', {'username': 'testuser', 'password': 'test'}, + content_type='application/json') + self.assertEqual(response.status_code, 200) + self.assertTrue('token' in response.json()) + + def test_legacy_user(self): + response = self.client.get('/api/2/users/1/') + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['username'], settings.LEGACY_USER_NAME) + self.assertEqual(response.json()['email'], 'mail@' + settings.MAIL_DOMAIN) + self.assertEqual(response.json()['first_name'], '') + self.assertEqual(response.json()['last_name'], '') + self.assertEqual(response.json()['id'], 1) + + def test_get_legacy_user_token(self): + anonymous = Client() + response = anonymous.post('/api/2/login/', { + 'username': settings.LEGACY_USER_NAME, 'password': settings.LEGACY_USER_PASSWORD}, + content_type='application/json') + self.assertEqual(response.status_code, 200) + self.assertTrue('token' in response.json()) + + +class GroupApiTest(TestCase): + + def setUp(self): + self.event = Event.objects.create(name='testevent', slug='testevent') + # Admin, Orga, Team, User are created by default + self.group1 = Group.objects.create(name='testgroup1') + self.group2 = Group.objects.create(name='testgroup2') + self.group1.permissions.add(Permission.objects.get(codename='add_item')) + self.group1.permissions.add(Permission.objects.get(codename='view_item')) + self.group2.permissions.add(Permission.objects.get(codename='view_event')) + self.group2.permissions.add(Permission.objects.get(codename='view_item')) + self.user = ExtendedUser.objects.create_user('testuser', 'test', 'test') + self.user.user_permissions.add(Permission.objects.get(codename='add_event')) + self.user.groups.add(self.group1) + self.user.groups.add(self.group2) + self.user.save() + EventPermission.objects.create(event=self.event, user=self.user, + permission=Permission.objects.get(codename='delete_item')) + self.user.save() + self.token = AuthToken.objects.create(user=self.user) + self.client = Client(headers={'Authorization': 'Token ' + self.token[1]}) + + def test_groups(self): + response = self.client.get('/api/2/groups/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 6) + self.assertEqual(response.json()[0]['name'], 'Admin') + self.assertEqual(response.json()[1]['name'], 'Orga') + self.assertEqual(response.json()[2]['name'], 'Team') + self.assertEqual(response.json()[3]['name'], 'User') + self.assertEqual(response.json()[4]['name'], 'testgroup1') + self.assertEqual(response.json()[5]['name'], 'testgroup2') + + def test_group(self): + response = self.client.get('/api/2/groups/5/') + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['name'], 'testgroup1') + permissions = response.json()['permissions'] + self.assertEqual(len(permissions), 2) + self.assertTrue('*:add_item' in permissions) + self.assertTrue('*:view_item' in permissions) + members = response.json()['members'] + self.assertEqual(len(members), 1) + self.assertEqual(members[0], 'testuser') diff --git a/core/core/urls.py b/core/core/urls.py index c115026..7d37ec0 100644 --- a/core/core/urls.py +++ b/core/core/urls.py @@ -24,5 +24,6 @@ urlpatterns = [ path('api/1/', include('inventory.api_v1')), path('api/1/', include('files.api_v1')), path('api/1/', include('files.media_v1')), + path('api/2/', include('authentication.api_v2')), path('api/', get_info), ]