Drop Sqlmodel and use plain Sqlalchemy
This commit is contained in:
@@ -8,9 +8,10 @@ from fastapi import Depends, FastAPI, Request
|
||||
from fastapi.responses import RedirectResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from sqlmodel import Session, SQLModel, create_engine, select
|
||||
from sqlalchemy import create_engine, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from models import Event, Household, Registration, Subscription, TeamRegistration
|
||||
from models import Base, Event, Household, Registration, Subscription, TeamRegistration
|
||||
|
||||
sqlite_file_name = "database.db"
|
||||
sqlite_url = f"sqlite:///{sqlite_file_name}"
|
||||
@@ -27,7 +28,7 @@ def get_session():
|
||||
|
||||
|
||||
def create_db_and_tables():
|
||||
SQLModel.metadata.create_all(engine)
|
||||
Base.metadata.create_all(engine)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@@ -54,7 +55,7 @@ async def index(request: Request, session: SessionDep):
|
||||
.order_by(Event.event_time)
|
||||
.where(Event.event_time >= now - timedelta(days=1))
|
||||
)
|
||||
events = session.exec(statement).all()
|
||||
events = session.scalars(statement)
|
||||
return templates.TemplateResponse(
|
||||
request=request,
|
||||
name="index.html",
|
||||
@@ -71,7 +72,7 @@ async def past_events(request: Request, session: SessionDep):
|
||||
.order_by(Event.event_time)
|
||||
.where(Event.event_time < now - timedelta(days=1))
|
||||
)
|
||||
events = session.exec(statement).all()
|
||||
events = session.scalars(statement)
|
||||
return templates.TemplateResponse(
|
||||
request=request,
|
||||
name="index.html",
|
||||
@@ -82,9 +83,9 @@ async def past_events(request: Request, session: SessionDep):
|
||||
@app.get("/subscribe")
|
||||
async def subscribe(request: Request, session: SessionDep):
|
||||
statement = select(Household)
|
||||
households = session.exec(statement).all()
|
||||
households = session.scalars(statement)
|
||||
|
||||
subscriptions = session.exec(select(Subscription)).all()
|
||||
subscriptions = session.scalars(select(Subscription))
|
||||
|
||||
# filter out households with existing subscriptions
|
||||
households = [
|
||||
@@ -138,7 +139,7 @@ async def add_subscribe(request: Request, session: SessionDep):
|
||||
async def delete_subscription(request: Request, session: SessionDep, household_id: int):
|
||||
|
||||
statement = select(Subscription).where(Subscription.household_id == household_id)
|
||||
sub = session.exec(statement).one()
|
||||
sub = session.scalars(statement).one()
|
||||
|
||||
session.delete(sub)
|
||||
session.commit()
|
||||
@@ -183,10 +184,10 @@ async def add_event(request: Request, session: SessionDep):
|
||||
@app.get("/event/{event_id}")
|
||||
async def read_event(request: Request, event_id: int, session: SessionDep):
|
||||
statement = select(Event).where(Event.id == event_id)
|
||||
event = session.exec(statement).one()
|
||||
event = session.scalars(statement).one()
|
||||
|
||||
statement = select(Household)
|
||||
households = session.exec(statement).all()
|
||||
households = session.scalars(statement)
|
||||
|
||||
# filter out households with existing registrations
|
||||
households = [
|
||||
@@ -240,7 +241,7 @@ async def delete_registration(
|
||||
statement = select(Registration).where(
|
||||
Registration.household_id == household_id, Registration.event_id == event_id
|
||||
)
|
||||
session.delete(session.exec(statement).one())
|
||||
session.delete(session.scalars(statement).one())
|
||||
session.commit()
|
||||
return RedirectResponse(url=f"/event/{event_id}", status_code=status.HTTP_302_FOUND)
|
||||
|
||||
@@ -256,13 +257,12 @@ async def add_team_registration(request: Request, event_id: int, session: Sessio
|
||||
TeamRegistration.person_name == person, TeamRegistration.work_type == work_type
|
||||
)
|
||||
# if the person has already registered for the same work type, just ignore
|
||||
if session.exec(statement).one_or_none() is None:
|
||||
if session.scalars(statement).one_or_none() is None:
|
||||
registration = TeamRegistration(
|
||||
person_name=person,
|
||||
event_id=event_id,
|
||||
work_type=form_data["workType"],
|
||||
)
|
||||
TeamRegistration.model_validate(registration)
|
||||
session.add(registration)
|
||||
session.commit()
|
||||
return RedirectResponse(url=f"/event/{event_id}", status_code=status.HTTP_302_FOUND)
|
||||
@@ -276,6 +276,6 @@ async def delete_team_registration(
|
||||
session: SessionDep,
|
||||
):
|
||||
statement = select(TeamRegistration).where(TeamRegistration.id == entry_id)
|
||||
session.delete(session.exec(statement).one())
|
||||
session.delete(session.scalars(statement).one())
|
||||
session.commit()
|
||||
return RedirectResponse(url=f"/event/{event_id}", status_code=status.HTTP_302_FOUND)
|
||||
|
||||
Reference in New Issue
Block a user