190 lines
5.8 KiB
Python
190 lines
5.8 KiB
Python
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from allmende_payment_system.app import app
|
|
from allmende_payment_system.database import ensure_user
|
|
from allmende_payment_system.models import Account, Permission, User, UserGroup
|
|
|
|
|
|
@pytest.fixture
def admin_user(test_db):
    """Provision an administrator and return its username.

    Ensures a user named "admin" exists, puts it into an "Admins" group
    (id 1) that holds the ``user:edit`` and ``account:edit`` permissions,
    flushes the session, and returns the string ``"admin"``.
    """
    administrator = ensure_user(
        {"username": "admin", "display_name": "The Administrator"}, test_db
    )

    admins = UserGroup(id=1, name="Admins")
    admins.permissions.extend(
        [
            Permission(scope="user", action="edit"),
            Permission(scope="account", action="edit"),
        ]
    )
    administrator.user_groups.append(admins)

    test_db.add(admins)
    test_db.flush()
    return "admin"
|
|
|
|
|
|
def test_user_add_group(test_db, client, admin_user):
    """POSTing a group id to the add_group endpoint attaches the group to the user."""
    member = ensure_user(
        {"username": "test", "display_name": "Display Test"}, test_db
    )

    bosses = UserGroup(name="Bosses")
    test_db.add(bosses)
    test_db.flush()

    # Precondition: the freshly ensured user has no groups yet.
    assert len(member.user_groups) == 0

    endpoint = f"/admin/users/{member.id}/add_group"
    response = client.post(
        endpoint,
        data={"group_id": bosses.id},
        user=admin_user,
        follow_redirects=False,
    )
    assert response.status_code == 303

    # Re-read the user by username and check the membership.
    by_username = select(User).where(User.username == "test")
    reloaded = test_db.execute(by_username).scalar()
    assert len(reloaded.user_groups) == 1
    assert reloaded.user_groups[0].name == "Bosses"
|
|
|
|
|
|
def test_user_remove_group(test_db, client, admin_user):
    """A GET on the remove_group endpoint detaches the group from the user."""
    member = ensure_user(
        {"username": "test", "display_name": "Display Test"}, test_db
    )

    bosses = UserGroup(name="Bosses")
    test_db.add(bosses)
    member.user_groups.append(bosses)
    test_db.flush()

    # Precondition: the user starts out with exactly one group.
    assert len(member.user_groups) == 1

    endpoint = f"/admin/users/{member.id}/remove_group/{bosses.id}"
    response = client.get(endpoint, user=admin_user, follow_redirects=False)
    assert response.status_code == 303

    by_username = select(User).where(User.username == "test")
    reloaded = test_db.execute(by_username).scalar()
    assert len(reloaded.user_groups) == 0
|
|
|
|
|
|
def test_group_add_permission(test_db, client, admin_user):
    """POSTing "foo:bar" to add_permission gives the group a foo/bar permission."""
    # The only group expected in the database is the one the admin_user
    # fixture created; .scalar() would raise if there were several.
    admins = test_db.query(UserGroup).scalar()

    endpoint = f"/admin/groups/{admins.id}/add_permission"
    response = client.post(
        endpoint,
        data={"permission": "foo:bar"},
        user=admin_user,
        follow_redirects=False,
    )
    assert response.status_code == 303

    by_id = select(UserGroup).where(UserGroup.id == admins.id)
    refreshed = test_db.execute(by_id).scalar()
    granted = [(perm.scope, perm.action) for perm in refreshed.permissions]
    assert ("foo", "bar") in granted
|
|
|
|
|
|
def test_group_add_permission_illegal_format(test_db, client, admin_user):
    """A permission string without a colon ("foobar") is answered with HTTP 400."""
    admins = test_db.query(UserGroup).scalar()

    endpoint = f"/admin/groups/{admins.id}/add_permission"
    malformed = {"permission": "foobar"}
    response = client.post(
        endpoint,
        data=malformed,
        user=admin_user,
        follow_redirects=False,
    )
    assert response.status_code == 400
|
|
|
|
|
|
def test_group_remove_permission(test_db, client, admin_user):
    """A GET on remove_permission leaves the group with one permission fewer."""
    admins = test_db.query(UserGroup).scalar()
    count_before = len(admins.permissions)

    # NOTE(review): the permission id 1 is hard-coded; presumably it is the
    # first permission created by the admin_user fixture -- confirm.
    endpoint = f"/admin/groups/{admins.id}/remove_permission/1"
    response = client.get(endpoint, user=admin_user, follow_redirects=False)
    assert response.status_code == 303

    by_id = select(UserGroup).where(UserGroup.id == admins.id)
    refreshed = test_db.execute(by_id).scalar()
    assert len(refreshed.permissions) == count_before - 1
|
|
|
|
|
|
def test_create_group(test_db, client, admin_user):
    """POSTing a name and description to groups/create stores a group of that name."""
    form = {"name": "New Group", "description": "A newly created group"}
    response = client.post(
        "/admin/groups/create",
        data=form,
        user=admin_user,
        follow_redirects=False,
    )
    assert response.status_code == 303

    created = test_db.query(UserGroup).filter_by(name="New Group").scalar()
    assert created is not None
|
|
|
|
|
|
def test_delete_group(test_db, client, admin_user):
    """A GET on the delete endpoint removes the group from the database.

    Unlike the other tests in this module, this request does not pass
    ``follow_redirects=False`` and a 200 status is expected.
    """
    doomed = UserGroup(name="To Be Deleted")
    test_db.add(doomed)
    test_db.flush()

    endpoint = f"/admin/groups/{doomed.id}/delete"
    response = client.get(endpoint, user=admin_user)
    assert response.status_code == 200

    # Looking the group up by its id must now come back empty.
    by_id = select(UserGroup).where(UserGroup.id == doomed.id)
    leftover = test_db.execute(by_id).scalar()
    assert leftover is None
|
|
|
|
|
|
def test_create_account(test_db, client, admin_user):
    """Creating an account succeeds once; reusing the same name yields HTTP 400."""

    def submit():
        # Both requests are identical; build a fresh payload for each call.
        return client.post(
            "/admin/accounts/new",
            data={"account_name": "New Account"},
            user=admin_user,
            follow_redirects=False,
        )

    first = submit()
    assert first.status_code == 303
    stored = test_db.query(Account).filter_by(name="New Account").scalar()
    assert stored is not None

    # A second account with the same name should be rejected.
    duplicate = submit()
    assert duplicate.status_code == 400
|
|
|
|
|
|
def test_add_user_to_account(test_db, client, admin_user):
    """POSTing a user id to an account's add_user endpoint links the user to it."""
    member = ensure_user(
        {"username": "test", "display_name": "Display Test"}, test_db
    )

    ledger = Account(name="Test Account")
    test_db.add(ledger)
    test_db.flush()

    endpoint = f"/admin/accounts/{ledger.id}/add_user"
    response = client.post(
        endpoint,
        data={"user_id": member.id},
        user=admin_user,
        follow_redirects=False,
    )
    assert response.status_code == 303

    by_id = select(Account).where(Account.id == ledger.id)
    refreshed = test_db.execute(by_id).scalar()
    usernames = [linked.username for linked in refreshed.users]
    assert "test" in usernames
|
|
|
|
|
|
def test_add_balance_to_account(test_db, client, admin_user):
    """POSTing an amount of "100.00" to add_balance leaves the balance at 100."""
    ledger = Account(name="Test Account")
    test_db.add(ledger)
    test_db.flush()

    endpoint = f"/admin/accounts/{ledger.id}/add_balance"
    response = client.post(
        endpoint,
        data={"amount": "100.00"},
        user=admin_user,
        follow_redirects=False,
    )
    assert response.status_code == 303

    by_id = select(Account).where(Account.id == ledger.id)
    refreshed = test_db.execute(by_id).scalar()
    assert refreshed.balance == 100.00
|