from decimal import Decimal
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.orm import Session
from allmende_payment_system.app import app
from allmende_payment_system.database import ensure_user
from allmende_payment_system.models import (
Account,
Area,
Permission,
Product,
User,
UserGroup,
)
@pytest.fixture
def admin_user(test_db):
    """Seed an administrator and return its username.

    Creates the user ``admin`` through ``ensure_user`` and puts it into an
    ``Admins`` group (id 1) that holds the ``edit`` action for the ``user``,
    ``account`` and ``product`` scopes.  The returned string is what the tests
    in this module hand to the client as ``user=``.
    """
    admin = ensure_user(
        {"username": "admin", "display_name": "The Administrator"}, test_db
    )

    admins = UserGroup(id=1, name="Admins")
    for scope in ("user", "account", "product"):
        admins.permissions.append(Permission(scope=scope, action="edit"))
    admin.user_groups.append(admins)
    test_db.add(admins)
    # Flushed, not committed; presumably the test_db fixture discards the
    # data after each test -- confirm in conftest.
    test_db.flush()
    return "admin"
def test_user_add_group(test_db, client, admin_user):
    """POSTing a group id to ``add_group`` attaches that group to the user.

    NOTE(review): groups carry permissions, so this route grants privileges.
    Only the call made as the admin fixture user is exercised here; a
    companion test issuing the same request as a user without the admin
    group's permissions would pin the access check.
    """
    member = ensure_user(
        {"username": "test", "display_name": "Display Test"}, test_db
    )

    bosses = UserGroup(name="Bosses")
    test_db.add(bosses)
    test_db.flush()

    # Precondition: a freshly ensured user belongs to no group.
    assert len(member.user_groups) == 0

    resp = client.post(
        f"/admin/users/{member.id}/add_group",
        data={"group_id": bosses.id},
        user=admin_user,
        follow_redirects=False,
    )
    assert resp.status_code == 303

    # Look the user up again by name and check the new membership.
    reloaded = test_db.execute(select(User).where(User.username == "test")).scalar()
    assert len(reloaded.user_groups) == 1
    assert reloaded.user_groups[0].name == "Bosses"
def test_user_remove_group(test_db, client, admin_user):
    """Requesting ``remove_group`` detaches the group from the user.

    NOTE(review): the membership change is triggered with GET.  A
    state-changing GET cannot be covered by form-based CSRF tokens and can be
    fired by a plain link or embedded resource; confirm whether the route
    should accept POST only.
    """
    member = ensure_user(
        {"username": "test", "display_name": "Display Test"}, test_db
    )

    bosses = UserGroup(name="Bosses")
    test_db.add(bosses)
    member.user_groups.append(bosses)
    test_db.flush()

    # Precondition: the user starts with exactly the one group added above.
    assert len(member.user_groups) == 1

    resp = client.get(
        f"/admin/users/{member.id}/remove_group/{bosses.id}",
        user=admin_user,
        follow_redirects=False,
    )
    assert resp.status_code == 303

    reloaded = test_db.execute(select(User).where(User.username == "test")).scalar()
    assert len(reloaded.user_groups) == 0
def test_group_add_permission(test_db, client, admin_user):
    """A ``scope:action`` string posted to ``add_permission`` lands on the group."""
    # Query.scalar() raises on more than one row, so this relies on the group
    # created by the admin_user fixture being the only UserGroup present.
    target = test_db.query(UserGroup).scalar()

    resp = client.post(
        f"/admin/groups/{target.id}/add_permission",
        data={"permission": "foo:bar"},
        user=admin_user,
        follow_redirects=False,
    )
    assert resp.status_code == 303

    reloaded = test_db.execute(
        select(UserGroup).where(UserGroup.id == target.id)
    ).scalar()
    # The posted string must have been split into scope "foo" and action "bar".
    matches = [
        perm
        for perm in reloaded.permissions
        if perm.scope == "foo" and perm.action == "bar"
    ]
    assert len(matches) > 0
def test_group_add_permission_illegal_format(test_db, client, admin_user):
    """A permission string without the ``:`` separator is rejected with 400.

    NOTE(review): only the status code is checked.  Whether the group's
    permission list is left untouched after the rejection is not asserted
    here.
    """
    target = test_db.query(UserGroup).scalar()
    url = f"/admin/groups/{target.id}/add_permission"

    resp = client.post(
        url,
        data={"permission": "foobar"},
        user=admin_user,
        follow_redirects=False,
    )
    assert resp.status_code == 400
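def test_group_remove_permission(test_db, client, admin_user):
    """Requesting ``remove_permission`` drops one permission from the group.

    The permission id is read from the group itself rather than hard-coded as
    ``1``, so the test does not depend on the ids the database happened to
    assign.

    NOTE(review): the removal is triggered with GET.  A state-changing GET
    cannot be covered by form-based CSRF tokens; confirm whether the route
    should accept POST only.
    """
    group = test_db.query(UserGroup).scalar()
    num_permissions_before = len(group.permissions)
    # Any permission of the group will do; remember its id to verify below
    # that this one in particular was removed.
    removed_id = group.permissions[0].id

    response = client.get(
        f"/admin/groups/{group.id}/remove_permission/{removed_id}",
        user=admin_user,
        follow_redirects=False,
    )
    assert response.status_code == 303

    group = test_db.execute(select(UserGroup).where(UserGroup.id == group.id)).scalar()
    assert num_permissions_before - 1 == len(group.permissions)
    assert all(perm.id != removed_id for perm in group.permissions)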
def test_create_group(test_db, client, admin_user):
    """Posting a name and description to ``/admin/groups/create`` stores the group."""
    form = {"name": "New Group", "description": "A newly created group"}

    resp = client.post(
        "/admin/groups/create",
        data=form,
        user=admin_user,
        follow_redirects=False,
    )
    assert resp.status_code == 303

    # The group must now be retrievable by the name that was submitted.
    created = test_db.query(UserGroup).filter_by(name="New Group").scalar()
    assert created is not None
def test_delete_group(test_db, client, admin_user):
    """Requesting ``/admin/groups/<id>/delete`` removes the group row.

    NOTE(review): the deletion is triggered with GET.  A state-changing GET
    cannot be covered by form-based CSRF tokens; confirm whether the route
    should accept POST only.
    """
    doomed = UserGroup(name="To Be Deleted")
    test_db.add(doomed)
    test_db.flush()

    # follow_redirects is not disabled here, unlike in the other tests of
    # this module; presumably 200 is the status of the page reached after a
    # redirect -- confirm against the route.
    resp = client.get(f"/admin/groups/{doomed.id}/delete", user=admin_user)
    assert resp.status_code == 200

    lookup = select(UserGroup).where(UserGroup.id == doomed.id)
    assert test_db.execute(lookup).scalar() is None
def test_create_account(test_db, client, admin_user):
    """An account can be created once; reusing its name is rejected with 400."""

    def submit():
        # Both requests are identical, so they share one helper.
        return client.post(
            "/admin/accounts/new",
            data={"account_name": "New Account"},
            user=admin_user,
            follow_redirects=False,
        )

    first = submit()
    assert first.status_code == 303
    assert test_db.query(Account).filter_by(name="New Account").scalar() is not None

    # Creating a second account with the same name must fail.
    duplicate = submit()
    assert duplicate.status_code == 400
def test_add_user_to_account(test_db, client, admin_user):
    """Posting a user id to ``add_user`` links that user to the account."""
    member = ensure_user(
        {"username": "test", "display_name": "Display Test"}, test_db
    )

    target = Account(name="Test Account")
    test_db.add(target)
    test_db.flush()

    resp = client.post(
        f"/admin/accounts/{target.id}/add_user",
        data={"user_id": member.id},
        user=admin_user,
        follow_redirects=False,
    )
    assert resp.status_code == 303

    reloaded = test_db.execute(
        select(Account).where(Account.id == target.id)
    ).scalar()
    # The account's user list must now contain the user created above.
    usernames = [u.username for u in reloaded.users]
    assert "test" in usernames
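def test_add_balance_to_account(test_db, client, admin_user):
    """Posting an amount to ``add_balance`` credits the account.

    The expected balance is written as a ``Decimal``, as in the product price
    check of this module, so that monetary values are not compared against
    float literals.

    NOTE(review): only a positive, well-formed amount sent by the admin
    fixture user is covered.  How the route treats negative or malformed
    amounts, or a caller without the admin group's permissions, is not pinned
    by this test.
    """
    account = Account(name="Test Account")
    test_db.add(account)
    test_db.flush()

    response = client.post(
        f"/admin/accounts/{account.id}/add_balance",
        data={"amount": "100.00"},
        user=admin_user,
        follow_redirects=False,
    )
    assert response.status_code == 303

    account = test_db.execute(select(Account).where(Account.id == account.id)).scalar()
    assert account.balance == Decimal("100.00")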
### area and product management
def test_add_product(test_db, client, admin_user):
    """Posting the product form to ``/admin/products/new`` stores the product."""
    test_area = Area(name="Test Area", description="An area for testing")
    test_db.add(test_area)
    # Flush so that test_area.id is assigned before it goes into the form.
    test_db.flush()

    form = {
        "name": "Test Product",
        "vat_rate": "19.00",
        "unit_of_measure": "piece",
        "price": 9.99,
        "area_id": test_area.id,
    }
    resp = client.post(
        "/admin/products/new",
        data=form,
        user=admin_user,
        follow_redirects=False,
    )
    assert resp.status_code == 303

    by_name = select(Product).where(Product.name == "Test Product")
    created = test_db.execute(by_name).scalar()
    assert created is not None
    assert created.name == "Test Product"
    # The stored price is compared as a Decimal, not as a float.
    assert created.price == Decimal("9.99")