From a6e97c6170edae7977e2e8387e29b9076603b001 Mon Sep 17 00:00:00 2001 From: Niklas Meinzer Date: Sat, 3 Jan 2026 10:39:52 +0100 Subject: [PATCH] feat(admin): Add group admin and test for admin views --- src/allmende_payment_system/api/admin.py | 96 ++++++++++++- .../templates/base.html.jinja | 5 + .../templates/groups.html.jinja | 94 +++++++++++++ test/conftest.py | 15 +- test/test_admin.py | 130 ++++++++++++++++++ test/test_shop.py | 26 ++-- 6 files changed, 352 insertions(+), 14 deletions(-) create mode 100644 src/allmende_payment_system/templates/groups.html.jinja create mode 100644 test/test_admin.py diff --git a/src/allmende_payment_system/api/admin.py b/src/allmende_payment_system/api/admin.py index 499da86..eefb380 100644 --- a/src/allmende_payment_system/api/admin.py +++ b/src/allmende_payment_system/api/admin.py @@ -6,11 +6,13 @@ from starlette import status from starlette.responses import RedirectResponse from allmende_payment_system.api.dependencies import SessionDep, UserDep -from allmende_payment_system.models import User, UserGroup +from allmende_payment_system.models import Permission, User, UserGroup from allmende_payment_system.tools import get_jinja_renderer admin_router = APIRouter(prefix="/admin") +# USERS + @admin_router.get("/users") async def user_list(request: Request, session: SessionDep, user: UserDep): @@ -62,3 +64,95 @@ async def user_remove_group( print(user) user.user_groups.remove(group) return RedirectResponse(url="/admin/users", status_code=status.HTTP_303_SEE_OTHER) + + +# GROUPS + + +@admin_router.get("/groups") +async def group_list(request: Request, session: SessionDep, user: UserDep): + if not user.has_permission("user", "edit"): + raise HTTPException(status_code=403, detail="Insufficient permissions") + + groups = session.scalars(select(UserGroup)).all() + templates = get_jinja_renderer() + return templates.TemplateResponse( + "groups.html.jinja", + context={"request": request, "groups": groups}, + ) + + +@admin_router.post("/groups/{group_id}/add_permission") +async def group_add_permission( + request: Request, session: SessionDep, user: UserDep, group_id: int +): + if not user.has_permission("user", "edit"): + raise HTTPException(status_code=403, detail="Insufficient permissions") + + data = await request.form() + scope_action = data["permission"].split(":") + if len(scope_action) != 2: + raise HTTPException( + status_code=400, detail="Permission must be in the format 'scope:action'" + ) + + permission = Permission(scope=scope_action[0], action=scope_action[1]) + group = session.execute( + select(UserGroup).where(UserGroup.id == group_id) + ).scalar_one() + + session.add(permission) + group.permissions.append(permission) + + return RedirectResponse(url="/admin/groups", status_code=status.HTTP_303_SEE_OTHER) + + +@admin_router.get("/groups/{group_id}/remove_permission/{permission_id}") +async def group_remove_permission( + request: Request, + session: SessionDep, + user: UserDep, + group_id: int, + permission_id: int, +): + if not user.has_permission("user", "edit"): + raise HTTPException(status_code=403, detail="Insufficient permissions") + + permission = session.execute( + select(Permission).where(Permission.id == permission_id) + ).scalar_one() + group = session.execute( + select(UserGroup).where(UserGroup.id == group_id) + ).scalar_one() + group.permissions.remove(permission) + session.delete(permission) + return RedirectResponse(url="/admin/groups", status_code=status.HTTP_303_SEE_OTHER) + + +@admin_router.post("/groups/create") +async def create_group(request: Request, session: SessionDep, user: UserDep): + if not user.has_permission("user", "edit"): + raise HTTPException(status_code=403, detail="Insufficient permissions") + + data = await request.form() + group = UserGroup(name=data["name"], description=data["description"]) + session.add(group) + + return RedirectResponse(url="/admin/groups", status_code=status.HTTP_303_SEE_OTHER) + + +@admin_router.get("/groups/{group_id}/delete") +async def delete_group( + request: Request, + session: SessionDep, + user: UserDep, + group_id: int, +): + if not user.has_permission("user", "edit"): + raise HTTPException(status_code=403, detail="Insufficient permissions") + + group = session.execute( + select(UserGroup).where(UserGroup.id == group_id) + ).scalar_one() + session.delete(group) + return RedirectResponse(url="/admin/groups", status_code=status.HTTP_303_SEE_OTHER) diff --git a/src/allmende_payment_system/templates/base.html.jinja b/src/allmende_payment_system/templates/base.html.jinja index b56a066..f7a6039 100644 --- a/src/allmende_payment_system/templates/base.html.jinja +++ b/src/allmende_payment_system/templates/base.html.jinja @@ -35,6 +35,11 @@ Nutzerverwaltung + {% endif %} diff --git a/src/allmende_payment_system/templates/groups.html.jinja b/src/allmende_payment_system/templates/groups.html.jinja new file mode 100644 index 0000000..be019b7 --- /dev/null +++ b/src/allmende_payment_system/templates/groups.html.jinja @@ -0,0 +1,94 @@ +{% extends "base.html.jinja" %} +{% block content %} +
+

Gruppen verwalten

+ +
+ + + + + {% if groups|length == 0 %} +
Keine Gruppen vorhanden.
+ {% else %} +
+ + + + + + + + + + + {% for group in groups %} + + + + + + + + + + {% endfor %} + +
IDNameBeschreibung
{{ group.id }}{{ group.name }}{{ group.description }} + {% for permission in group.permissions %} + {{ permission.scope }}:{{ permission.action }} + {% endfor %} + + + Gruppe löschen +
+
+ {% endif %} + +{% endblock %} \ No newline at end of file diff --git a/test/conftest.py b/test/conftest.py index 0f90af9..d61fabd 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -41,10 +41,23 @@ def get_test_session(): app.dependency_overrides[get_session] = get_test_session +class APSTestClient(TestClient): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def post(self, *args, user: str = "test", **kwargs): + with mock.patch.dict(os.environ, {"APS_username": user}, clear=False): + return super().post(*args, **kwargs) + + def get(self, *args, user: str = "test", **kwargs): + with mock.patch.dict(os.environ, {"APS_username": user}, clear=False): + return super().get(*args, **kwargs) + + @pytest.fixture(scope="session") def client(): os.environ["APS_username"] = "test" - return TestClient(app) + return APSTestClient(app) @pytest.fixture(scope="session") diff --git a/test/test_admin.py b/test/test_admin.py new file mode 100644 index 0000000..e886c29 --- /dev/null +++ b/test/test_admin.py @@ -0,0 +1,130 @@ +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import select +from sqlalchemy.orm import Session + +from allmende_payment_system.app import app +from allmende_payment_system.database import ensure_user +from allmende_payment_system.models import Permission, User, UserGroup + + +@pytest.fixture +def admin_user(test_db): + user_info = {"username": "admin", "display_name": "The Administrator"} + user = ensure_user(user_info, test_db) + + group = UserGroup(id=1, name="Admins") + group.permissions.append(Permission(scope="user", action="edit")) + user.user_groups.append(group) + test_db.add(group) + test_db.flush() + return "admin" + + +def test_user_add_group(test_db, client, admin_user): + user_info = {"username": "test", "display_name": "Display Test"} + user = ensure_user(user_info, test_db) + + group = UserGroup(name="Bosses") + test_db.add(group) + test_db.flush() + + assert 0 == len(user.user_groups) + + response = client.post( + f"/admin/users/{user.id}/add_group", + data={"group_id": group.id}, + user=admin_user, + follow_redirects=False, + ) + assert response.status_code == 303 + + user = test_db.execute(select(User).where(User.username == "test")).scalar() + assert 1 == len(user.user_groups) + assert "Bosses" == user.user_groups[0].name + + +def test_user_remove_group(test_db, client, admin_user): + user_info = {"username": "test", "display_name": "Display Test"} + user = ensure_user(user_info, test_db) + + group = UserGroup(name="Bosses") + test_db.add(group) + user.user_groups.append(group) + test_db.flush() + + assert 1 == len(user.user_groups) + + response = client.get( + f"/admin/users/{user.id}/remove_group/{group.id}", + user=admin_user, + follow_redirects=False, + ) + assert response.status_code == 303 + + user = test_db.execute(select(User).where(User.username == "test")).scalar() + assert 0 == len(user.user_groups) + + +def test_group_add_permission(test_db, client, admin_user): + group = test_db.query(UserGroup).scalar() + + response = client.post( + f"/admin/groups/{group.id}/add_permission", + data={"permission": "foo:bar"}, + user=admin_user, + follow_redirects=False, + ) + assert response.status_code == 303 + + group = test_db.execute(select(UserGroup).where(UserGroup.id == group.id)).scalar() + assert any( + perm.scope == "foo" and perm.action == "bar" for perm in group.permissions + ) + + +def test_group_add_permission_illegal_format(test_db, client, admin_user): + group = test_db.query(UserGroup).scalar() + + response = client.post( + f"/admin/groups/{group.id}/add_permission", + data={"permission": "foobar"}, + user=admin_user, + follow_redirects=False, + ) + assert response.status_code == 400 + + +def test_group_remove_permission(test_db, client, admin_user): + group = test_db.query(UserGroup).scalar() + response = client.get( + f"/admin/groups/{group.id}/remove_permission/1", + user=admin_user, + follow_redirects=False, + ) + assert response.status_code == 303 + group = test_db.execute(select(UserGroup).where(UserGroup.id == group.id)).scalar() + assert 0 == len(group.permissions) + + +def test_create_group(test_db, client, admin_user): + response = client.post( + "/admin/groups/create", + data={"name": "New Group", "description": "A newly created group"}, + user=admin_user, + follow_redirects=False, + ) + assert response.status_code == 303 + assert test_db.query(UserGroup).filter_by(name="New Group").scalar() is not None + + +def test_delete_group(test_db, client, admin_user): + group = UserGroup(name="To Be Deleted") + test_db.add(group) + test_db.flush() + response = client.get(f"/admin/groups/{group.id}/delete", user=admin_user) + assert response.status_code == 200 + assert ( + test_db.execute(select(UserGroup).where(UserGroup.id == group.id)).scalar() + is None + ) diff --git a/test/test_shop.py b/test/test_shop.py index d6eeaff..8682093 100644 --- a/test/test_shop.py +++ b/test/test_shop.py @@ -2,9 +2,9 @@ from decimal import Decimal from unittest import mock import pytest +from conftest import APSTestClient from fake_data import fake from sqlalchemy import select -from starlette.testclient import TestClient from allmende_payment_system.database import ensure_user from allmende_payment_system.models import ( @@ -56,7 +56,7 @@ def product(test_db): return product -def test_add_item_to_cart(client: TestClient, test_db, product): +def test_add_item_to_cart(client: APSTestClient, test_db, product): form_data = {"product_id": product.id, "quantity": 2, "area_id": product.area.id} @@ -72,7 +72,7 @@ def test_add_item_to_cart(client: TestClient, test_db, product): assert len(cart.items) == 1 -def test_edit_item_in_cart(client: TestClient, test_db, product): +def test_edit_item_in_cart(client: APSTestClient, test_db, product): form_data = {"product_id": product.id, "quantity": 2, "area_id": product.area.id} response = client.post( @@ -99,7 +99,7 @@ def test_edit_item_in_cart(client: TestClient, test_db, product): assert cart.items[0].total_amount == product.price * 3 -def test_remove_item_from_cart(client: TestClient, test_db, product): +def test_remove_item_from_cart(client: APSTestClient, test_db, product): user = create_user_with_account(test_db, "test") @@ -122,7 +122,7 @@ def test_remove_item_from_cart(client: TestClient, test_db, product): assert len(test_db.scalars(select(OrderItem)).all()) == 0 -def test_remove_item_from_cart_wrong_user(client: TestClient, test_db, product): +def test_remove_item_from_cart_wrong_user(client: APSTestClient, test_db, product): user = create_user_with_account(test_db, "test") user.shopping_cart.items.append( @@ -135,15 +135,16 @@ def test_remove_item_from_cart_wrong_user(client: TestClient, test_db, product): id_ = user.shopping_cart.items[0].id - with mock.patch.dict("os.environ", {"APS_username": "other_user"}): - response = client.get(f"/shop/cart/remove/{id_}", follow_redirects=False) + response = client.get( + f"/shop/cart/remove/{id_}", follow_redirects=False, user="other_user" + ) assert response.status_code == 404 cart = test_db.scalar(select(Order)) assert len(cart.items) == 1 -def test_finalize_order(client: TestClient, test_db, product): +def test_finalize_order(client: APSTestClient, test_db, product): user = create_user_with_account(test_db, "test", balance=100.0) @@ -161,7 +162,7 @@ def test_finalize_order(client: TestClient, test_db, product): assert user.accounts[0].balance == Decimal(100.0) - (product.price * 2) -def test_view_order(client: TestClient, test_db, product): +def test_view_order(client: APSTestClient, test_db, product): user = create_user_with_account(test_db, "test") order = add_finalized_order_to_user(test_db, user, product) @@ -172,11 +173,12 @@ def test_view_order(client: TestClient, test_db, product): assert product.name in response.text -def test_view_order_wrong_user(client: TestClient, test_db, product): +def test_view_order_wrong_user(client: APSTestClient, test_db, product): user = create_user_with_account(test_db, "test") order = add_finalized_order_to_user(test_db, user, product) - with mock.patch.dict("os.environ", {"APS_username": "other_user"}): - response = client.get(f"/shop/order/{order.id}") + response = client.get( + f"/shop/order/{order.id}", follow_redirects=False, user="other_user" + ) assert response.status_code == 403