Files
allmende-essen/new-registration-app/main.py

285 lines
9.5 KiB
Python

import locale
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from typing import Annotated
import starlette.status as status
from fastapi import Depends, FastAPI, Request
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlmodel import Session, SQLModel, create_engine, select
from models import (Event, Household, Registration, Subscription,
TeamRegistration)
# --- Database and locale configuration (runs at import time) ---------------
# Name of the SQLite database file; it is interpolated into the URL below.
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
# Extra arguments forwarded to the underlying sqlite3 connection.
# `check_same_thread=False` turns off sqlite3's same-thread check.
# NOTE(review): presumably needed because a session may be used from a
# different thread than the one that opened the connection — confirm.
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, connect_args=connect_args)
# Switch all locale categories to German (UTF-8) for the whole process.
# NOTE(review): `locale.setlocale` raises `locale.Error` when this locale is
# not installed on the host, which would abort the import of this module.
locale.setlocale(locale.LC_ALL, "de_DE.UTF-8")
def get_session():
    """Yield a `Session` bound to the module-level `engine`.

    Written as a generator so it can be plugged into `Depends`. The session
    is used as a context manager, so it is exited once the generator is
    resumed after the `yield` or closed.
    """
    with Session(engine) as db_session:
        yield db_session
def create_db_and_tables():
    """Create the tables registered on `SQLModel.metadata` via the module-level `engine`."""
    metadata = SQLModel.metadata
    metadata.create_all(engine)
@asynccontextmanager
async def on_startup(app_: FastAPI):
    """Lifespan context manager: create the database tables, then yield.

    `app_` is accepted but not used. No teardown work runs after the `yield`.
    """
    # Startup phase: make sure the schema exists before the app is served.
    create_db_and_tables()
    yield
# Application instance; `on_startup` is passed as its lifespan handler.
app = FastAPI(lifespan=on_startup)
# Mount the local "static" directory under the "/static" path.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 templates are loaded from the local "templates" directory.
templates = Jinja2Templates(directory="templates")
# Annotated alias used by the route handlers: a `Session` parameter that is
# provided through `Depends(get_session)`.
SessionDep = Annotated[Session, Depends(get_session)]
@app.get("/")
async def index(request: Request, session: SessionDep):
    """Render `index.html` with the coming events.

    An event is included when its `event_time` is no earlier than one day
    before the current local time; results are ordered by `event_time`.
    The template also receives `current_page="home"` and the current time.
    """
    now = datetime.now()
    # TODO: Once we refactored to use SQLAlchemy directly, we can probably do a nicer filtering on the date alone
    cutoff = now - timedelta(days=1)
    upcoming_query = (
        select(Event)
        .where(Event.event_time >= cutoff)
        .order_by(Event.event_time)
    )
    upcoming_events = session.exec(upcoming_query).all()
    page_context = {"events": upcoming_events, "current_page": "home", "now": now}
    return templates.TemplateResponse(
        request=request,
        name="index.html",
        context=page_context,
    )
@app.get("/past_events")
async def past_events(request: Request, session: SessionDep):
    """Render `index.html` with the past events.

    An event is included when its `event_time` is more than one day before
    the current local time; results are ordered by `event_time` (ascending).
    The template also receives `current_page="past"` and the current time.
    """
    now = datetime.now()
    # TODO: Once we refactored to use SQLAlchemy directly, we can probably do a nicer filtering on the date alone
    cutoff = now - timedelta(days=1)
    past_query = (
        select(Event)
        .where(Event.event_time < cutoff)
        .order_by(Event.event_time)
    )
    older_events = session.exec(past_query).all()
    page_context = {"events": older_events, "current_page": "past", "now": now}
    return templates.TemplateResponse(
        request=request,
        name="index.html",
        context=page_context,
    )
@app.get("/subscribe")
async def subscribe(request: Request, session: SessionDep):
    """Render `subscribe.html` with every household and every subscription."""
    all_households = session.exec(select(Household)).all()
    # NOTE: households are handed to the template unfiltered; households that
    # already have a subscription are not removed from the list here.
    all_subscriptions = session.exec(select(Subscription)).all()
    page_context = {
        "households": all_households,
        "subscriptions": all_subscriptions,
    }
    return templates.TemplateResponse(
        request=request,
        name="subscribe.html",
        context=page_context,
    )
@app.post("/subscribe")
async def add_subscribe(request: Request, session: SessionDep):
    """Create a `Subscription` from the submitted form and redirect to /subscribe.

    Form fields read: `household`, `numAdults`, `numKids`, `numSmallKids`
    and the multi-valued `days` ("1" = Monday ... "7" = Sunday). An empty
    count field is treated as 0.

    Raises:
        ValueError: if a non-empty count field is not an integer.
        KeyError: if one of the required form fields is missing.
    """
    form = await request.form()

    # Maps each Subscription attribute to the form field that feeds it;
    # insertion order is the order in which the fields are parsed.
    meal_fields = {
        "num_adult_meals": "numAdults",
        "num_children_meals": "numKids",
        "num_small_children_meals": "numSmallKids",
    }
    # TODO: Make this return a nicer error message
    try:
        meal_counts = {
            attribute: int(form[field]) if form[field] else 0
            for attribute, field in meal_fields.items()
        }
    except ValueError:
        raise ValueError("All number fields must be integers")

    subscription = Subscription(household_id=form["household"], **meal_counts)

    # The weekday flags are only assigned when at least one day was submitted;
    # otherwise the attributes keep whatever the model gives them by default.
    chosen_days = form.getlist("days")
    if chosen_days:
        weekday_attributes = (
            "monday",
            "tuesday",
            "wednesday",
            "thursday",
            "friday",
            "saturday",
            "sunday",
        )
        for day_number, attribute in enumerate(weekday_attributes, start=1):
            setattr(subscription, attribute, str(day_number) in chosen_days)

    session.add(subscription)
    session.commit()
    return RedirectResponse(url="/subscribe", status_code=status.HTTP_302_FOUND)
@app.get("/subscribe/{household_id}/delete")
async def delete_subscription(request: Request, session: SessionDep, household_id: int):
    """Delete the subscription of `household_id` and redirect to /subscribe.

    The lookup uses `.one()`, so the request fails with an exception unless
    exactly one subscription matches the household.
    """
    lookup = select(Subscription).where(Subscription.household_id == household_id)
    session.delete(session.exec(lookup).one())
    session.commit()
    return RedirectResponse(url="/subscribe", status_code=status.HTTP_302_FOUND)
@app.get("/event/add")
async def add_event_form(request: Request, session: SessionDep):
    """Render the `add_event.html` form; the injected session is not used."""
    return templates.TemplateResponse(
        name="add_event.html",
        request=request,
    )
@app.post("/event/add")
async def add_event(request: Request, session: SessionDep):
    """Create an `Event` from the submitted form and redirect to the home page.

    Form fields read: `eventName` and `eventTime` (ISO format) are required;
    `registrationDeadline` (ISO format), `eventDescription` and `recipeLink`
    are optional.

    When no registration deadline is submitted, it defaults to 19:30 on the
    most recent Sunday on or before the event date (the event's own day when
    the event takes place on a Sunday).

    Raises:
        KeyError: if `eventName` or `eventTime` is missing from the form.
        ValueError: if a submitted date/time is not valid ISO format.
    """
    form_data = await request.form()
    event_time = datetime.fromisoformat(form_data["eventTime"])

    submitted_deadline = form_data.get("registrationDeadline")
    if submitted_deadline:
        registration_deadline = datetime.fromisoformat(submitted_deadline)
    else:
        # weekday() is Monday == 0 ... Sunday == 6, so this is the number of
        # days to step back to reach a Sunday (0 if the event is on a Sunday).
        # timedelta arithmetic is used instead of `replace(day=day - 1)`,
        # which raised ValueError as soon as the step crossed a month boundary.
        days_since_sunday = (event_time.weekday() + 1) % 7
        deadline_day = event_time - timedelta(days=days_since_sunday)
        registration_deadline = deadline_day.replace(
            hour=19, minute=30, second=0, microsecond=0
        )

    event = Event(
        title=form_data["eventName"],
        event_time=event_time,
        registration_deadline=registration_deadline,
        description=form_data.get("eventDescription"),
        recipe_link=form_data.get("recipeLink"),
    )
    session.add(event)
    session.commit()
    return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
@app.get("/event/{event_id}")
async def read_event(request: Request, event_id: int, session: SessionDep):
    """Render `event.html` for one event.

    The template receives the event, the households that do not yet have a
    registration for it, and the current local time.

    The event lookup uses `.one()`, so the request fails with an exception
    unless exactly one event has the given id.
    """
    event = session.exec(select(Event).where(Event.id == event_id)).one()
    all_households = session.exec(select(Household)).all()

    # Collect the ids of already-registered households once, instead of
    # rebuilding the list for every household that is checked.
    registered_household_ids = {reg.household_id for reg in event.registrations}
    households = [h for h in all_households if h.id not in registered_household_ids]

    return templates.TemplateResponse(
        request=request,
        name="event.html",
        context={"event": event, "households": households, "now": datetime.now()},
    )
@app.post("/event/{event_id}/register")
async def add_registration(request: Request, event_id: int, session: SessionDep):
    """Register a household for `event_id` and redirect back to the event page.

    Form fields read: `household`, `numAdults`, `numKids`, `numSmallKids`.
    An empty count field is treated as 0.

    Raises:
        ValueError: if a non-empty count field is not an integer.
        KeyError: if one of the form fields is missing.
    """
    form = await request.form()

    def meal_count(field_name):
        """Return the submitted count for `field_name`, or 0 when it is empty."""
        raw_value = form[field_name]
        return int(raw_value) if raw_value else 0

    # TODO: Make this return a nicer error message
    try:
        adults = meal_count("numAdults")
        children = meal_count("numKids")
        small_children = meal_count("numSmallKids")
    except ValueError:
        raise ValueError("All number fields must be integers")

    new_registration = Registration(
        household_id=form["household"],
        event_id=event_id,
        num_adult_meals=adults,
        num_children_meals=children,
        num_small_children_meals=small_children,
    )
    session.add(new_registration)
    session.commit()

    event_url = f"/event/{event_id}"
    return RedirectResponse(url=event_url, status_code=status.HTTP_302_FOUND)
@app.get("/event/{event_id}/registration/{household_id}/delete")
async def delete_registration(
    request: Request, event_id: int, household_id: int, session: SessionDep
):
    """Delete a household's registration for an event, then redirect to the event page.

    The registration is looked up by `household_id` and `event_id` with
    `.one()`, so the request fails with an exception unless exactly one
    registration matches.
    """
    lookup = select(Registration).where(
        Registration.household_id == household_id,
        Registration.event_id == event_id,
    )
    registration = session.exec(lookup).one()
    session.delete(registration)
    session.commit()

    event_url = f"/event/{event_id}"
    return RedirectResponse(url=event_url, status_code=status.HTTP_302_FOUND)
@app.post("/event/{event_id}/register_team")
async def add_team_registration(request: Request, event_id: int, session: SessionDep):
    """Sign a person up for a work type at `event_id`, then redirect to the event page.

    Form fields read: `personName` (surrounding whitespace is stripped) and
    `workType`. If the same person is already signed up for the same work
    type at this event, the request is silently ignored.

    Raises:
        KeyError: if `personName` or `workType` is missing from the form.
    """
    form_data = await request.form()
    person = form_data["personName"].strip()
    work_type = form_data["workType"]

    # The duplicate check is scoped to this event: an existing sign-up for the
    # same work type at a *different* event must not block this one.
    statement = select(TeamRegistration).where(
        TeamRegistration.event_id == event_id,
        TeamRegistration.person_name == person,
        TeamRegistration.work_type == work_type,
    )
    # if the person has already registered for the same work type, just ignore
    if session.exec(statement).one_or_none() is None:
        registration = TeamRegistration(
            person_name=person,
            event_id=event_id,
            work_type=work_type,
        )
        # Run the model's validation explicitly before the row is stored;
        # the return value is not needed.
        TeamRegistration.model_validate(registration)
        session.add(registration)
        session.commit()
    return RedirectResponse(url=f"/event/{event_id}", status_code=status.HTTP_302_FOUND)
@app.get("/event/{event_id}/register_team/{entry_id}/delete")
async def delete_team_registration(
    request: Request,
    event_id: int,
    entry_id: int,
    session: SessionDep,
):
    """Delete the team registration with id `entry_id`, then redirect to the event page.

    Only `entry_id` is used for the lookup; `event_id` is used solely to build
    the redirect URL. The lookup uses `.one()`, so the request fails with an
    exception unless exactly one entry matches.
    """
    lookup = select(TeamRegistration).where(TeamRegistration.id == entry_id)
    entry = session.exec(lookup).one()
    session.delete(entry)
    session.commit()

    event_url = f"/event/{event_id}"
    return RedirectResponse(url=event_url, status_code=status.HTTP_302_FOUND)