77 lines
2.2 KiB
Python
77 lines
2.2 KiB
Python
import os
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import StaticPool, create_engine, event
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
from allmende_payment_system.api.dependencies import get_session
|
|
from allmende_payment_system.app import app
|
|
from allmende_payment_system.models import Base
|
|
|
|
engine = create_engine(
|
|
"sqlite:///:memory:",
|
|
connect_args={"check_same_thread": False},
|
|
poolclass=StaticPool,
|
|
)
|
|
|
|
# Create a single connection and an outer transaction which we can rollback
|
|
# at the end of the test run. Individual tests will use nested transactions
|
|
# (SAVEPOINTs) for isolation. This ensures the TestClient (app) and the
|
|
# test fixture sessions see the same in-memory database state.
|
|
connection = engine.connect()
|
|
transaction = connection.begin()
|
|
|
|
Base.metadata.create_all(bind=connection)
|
|
|
|
# Bind sessions to the single connection so all sessions share the same DB
|
|
TestSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=connection)
|
|
|
|
|
|
def get_test_session():
|
|
"""Dependency override for get_session"""
|
|
db = TestSessionLocal()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
app.dependency_overrides[get_session] = get_test_session
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def client():
|
|
os.environ["APS_username"] = "test"
|
|
return TestClient(app)
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def unauthorized_client():
|
|
os.environ.pop("APS_username", None)
|
|
return TestClient(app)
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def test_db():
|
|
"""Provides a database session for direct test usage"""
|
|
db = TestSessionLocal()
|
|
|
|
# Start a SAVEPOINT so test changes can be rolled back without
|
|
# closing the shared connection. Also restart the nested transaction
|
|
# when the session issues commits internally.
|
|
db.begin_nested()
|
|
|
|
@event.listens_for(db, "after_transaction_end")
|
|
def restart_savepoint(session, transaction):
|
|
# If the nested transaction ended, re-open it for continued isolation
|
|
if transaction.nested and not session.is_active:
|
|
session.begin_nested()
|
|
|
|
try:
|
|
yield db
|
|
finally:
|
|
# Rollback to the SAVEPOINT and close the session to clean up
|
|
db.rollback()
|
|
db.close() |