"""Tests for the admin endpoints that manage users, groups and permissions."""

import pytest
from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.orm import Session

from allmende_payment_system.app import app
from allmende_payment_system.database import ensure_user
from allmende_payment_system.models import Permission, User, UserGroup


def _fetch_user(db, username):
    """Return the ``User`` row with the given username, re-queried via *db*."""
    statement = select(User).where(User.username == username)
    return db.execute(statement).scalar()


def _fetch_group(db, group_id):
    """Return the ``UserGroup`` row with the given id, or ``None`` if absent."""
    statement = select(UserGroup).where(UserGroup.id == group_id)
    return db.execute(statement).scalar()


@pytest.fixture
def admin_user(test_db):
    """Create the "admin" user inside an "Admins" group and return its username.

    The group is created with id 1 and carries a single permission with
    scope "user" and action "edit". Everything is flushed to *test_db*
    before the username is handed to the test.
    """
    administrator = ensure_user(
        {"username": "admin", "display_name": "The Administrator"},
        test_db,
    )

    admins = UserGroup(id=1, name="Admins")
    admins.permissions.append(Permission(scope="user", action="edit"))
    administrator.user_groups.append(admins)

    test_db.add(admins)
    test_db.flush()
    return "admin"


def test_user_add_group(test_db, client, admin_user):
    """Posting a group id to ``add_group`` redirects and attaches the group."""
    member = ensure_user(
        {"username": "test", "display_name": "Display Test"},
        test_db,
    )
    bosses = UserGroup(name="Bosses")
    test_db.add(bosses)
    test_db.flush()
    assert len(member.user_groups) == 0

    url = f"/admin/users/{member.id}/add_group"
    form = {"group_id": bosses.id}
    response = client.post(url, data=form, user=admin_user, follow_redirects=False)
    assert response.status_code == 303

    # Re-query the user so the assertion reflects what the session returns now.
    reloaded = _fetch_user(test_db, "test")
    assert [g.name for g in reloaded.user_groups] == ["Bosses"]


def test_user_remove_group(test_db, client, admin_user):
    """Requesting ``remove_group`` redirects and detaches the group."""
    member = ensure_user(
        {"username": "test", "display_name": "Display Test"},
        test_db,
    )
    bosses = UserGroup(name="Bosses")
    test_db.add(bosses)
    member.user_groups.append(bosses)
    test_db.flush()
    assert len(member.user_groups) == 1

    url = f"/admin/users/{member.id}/remove_group/{bosses.id}"
    response = client.get(url, user=admin_user, follow_redirects=False)
    assert response.status_code == 303

    reloaded = _fetch_user(test_db, "test")
    assert len(reloaded.user_groups) == 0


def test_group_add_permission(test_db, client, admin_user):
    """A "scope:action" permission string is added to the group."""
    # The only group present is the one created by the admin_user fixture.
    target = test_db.query(UserGroup).scalar()

    url = f"/admin/groups/{target.id}/add_permission"
    response = client.post(
        url,
        data={"permission": "foo:bar"},
        user=admin_user,
        follow_redirects=False,
    )
    assert response.status_code == 303

    reloaded = _fetch_group(test_db, target.id)
    granted = [(perm.scope, perm.action) for perm in reloaded.permissions]
    assert ("foo", "bar") in granted


def test_group_add_permission_illegal_format(test_db, client, admin_user):
    """A permission string without a colon is answered with status 400."""
    target = test_db.query(UserGroup).scalar()

    url = f"/admin/groups/{target.id}/add_permission"
    response = client.post(
        url,
        data={"permission": "foobar"},
        user=admin_user,
        follow_redirects=False,
    )

    assert response.status_code == 400


def test_group_remove_permission(test_db, client, admin_user):
    """Requesting ``remove_permission/1`` leaves the group without permissions."""
    target = test_db.query(UserGroup).scalar()

    url = f"/admin/groups/{target.id}/remove_permission/1"
    response = client.get(url, user=admin_user, follow_redirects=False)
    assert response.status_code == 303

    reloaded = _fetch_group(test_db, target.id)
    assert len(reloaded.permissions) == 0


def test_create_group(test_db, client, admin_user):
    """Posting a name and description creates a group with that name."""
    form = {"name": "New Group", "description": "A newly created group"}
    response = client.post(
        "/admin/groups/create",
        data=form,
        user=admin_user,
        follow_redirects=False,
    )
    assert response.status_code == 303

    created = test_db.query(UserGroup).filter_by(name="New Group").scalar()
    assert created is not None


def test_delete_group(test_db, client, admin_user):
    """Requesting the delete endpoint removes the group from the database."""
    doomed = UserGroup(name="To Be Deleted")
    test_db.add(doomed)
    test_db.flush()

    # Unlike the other tests, no follow_redirects argument is passed here and
    # the response is expected to be a 200.
    response = client.get(f"/admin/groups/{doomed.id}/delete", user=admin_user)
    assert response.status_code == 200

    assert _fetch_group(test_db, doomed.id) is None