"""FastAPI application for managing events and their registrations.

The app lists upcoming and past events, lets users create events, register
households for meals at an event, and sign people up for team work.
Data is stored in a local SQLite database through SQLModel.
"""

import locale
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from typing import Annotated

import starlette.status as status
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlmodel import Session, SQLModel, create_engine, select

from models import Event, Household, Registration, TeamRegistration

sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"

# The engine is shared by all request handlers, so SQLite's same-thread
# check is switched off.
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, connect_args=connect_args)

# NOTE: raises locale.Error at import time if this locale is not installed.
locale.setlocale(locale.LC_ALL, "de_DE.UTF-8")


def get_session():
    """Yield a database session that is closed when the request is done."""
    with Session(engine) as session:
        yield session


def create_db_and_tables():
    """Create all tables registered on the SQLModel metadata."""
    SQLModel.metadata.create_all(engine)


@asynccontextmanager
async def on_startup(app_: FastAPI):
    """Lifespan hook: make sure the database tables exist before serving."""
    create_db_and_tables()
    yield


app = FastAPI(lifespan=on_startup)
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

SessionDep = Annotated[Session, Depends(get_session)]


def _default_registration_deadline(event_time: datetime) -> datetime:
    """Return 19:30 on the most recent Sunday on or before *event_time*.

    If the event itself takes place on a Sunday, that same day is used.
    The step back uses ``timedelta`` so it also works when the Sunday lies
    in the previous month or year.
    """
    # weekday(): Monday == 0 ... Sunday == 6, so Sunday maps to 0 days back.
    days_since_sunday = (event_time.weekday() + 1) % 7
    sunday = event_time - timedelta(days=days_since_sunday)
    return sunday.replace(hour=19, minute=30, second=0, microsecond=0)


def _parse_meal_count(raw_value) -> int:
    """Convert a form value to an int, treating empty or missing as 0.

    Raises:
        ValueError: if the value is non-empty but not an integer.
    """
    if not raw_value:
        return 0
    return int(raw_value)


def _redirect_to_event(event_id: int) -> RedirectResponse:
    """Build the redirect back to the detail page of the given event."""
    return RedirectResponse(
        url=f"/event/{event_id}", status_code=status.HTTP_302_FOUND
    )


@app.get("/")
async def index(request: Request, session: SessionDep):
    """Render the index page with events from one day ago onwards."""
    now = datetime.now()
    # TODO: Once we refactored to use SQLAlchemy directly, we can probably
    # do a nicer filtering on the date alone
    statement = (
        select(Event)
        .order_by(Event.event_time)
        .where(Event.event_time >= now - timedelta(days=1))
    )
    events = session.exec(statement).all()
    return templates.TemplateResponse(
        request=request,
        name="index.html",
        context={"events": events, "current_page": "home", "now": now},
    )


@app.get("/past_events")
async def past_events(request: Request, session: SessionDep):
    """Render the index page with events older than one day."""
    now = datetime.now()
    # TODO: Once we refactored to use SQLAlchemy directly, we can probably
    # do a nicer filtering on the date alone
    statement = (
        select(Event)
        .order_by(Event.event_time)
        .where(Event.event_time < now - timedelta(days=1))
    )
    events = session.exec(statement).all()
    return templates.TemplateResponse(
        request=request,
        name="index.html",
        context={"events": events, "current_page": "past", "now": now},
    )


@app.get("/event/add")
async def add_event_form(request: Request, session: SessionDep):
    """Render the form for creating a new event."""
    return templates.TemplateResponse(request=request, name="add_event.html")


@app.post("/event/add")
async def add_event(request: Request, session: SessionDep):
    """Create an event from the submitted form and redirect to the index.

    When no registration deadline is submitted, it defaults to 19:30 on the
    most recent Sunday on or before the event.
    """
    form_data = await request.form()
    event_time = datetime.fromisoformat(form_data["eventTime"])

    raw_deadline = form_data.get("registrationDeadline")
    if raw_deadline:
        registration_deadline = datetime.fromisoformat(raw_deadline)
    else:
        registration_deadline = _default_registration_deadline(event_time)

    event = Event(
        title=form_data["eventName"],
        event_time=event_time,
        registration_deadline=registration_deadline,
        description=form_data.get("eventDescription"),
        recipe_link=form_data.get("recipeLink"),
    )
    session.add(event)
    session.commit()
    return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)


@app.get("/event/{event_id}")
async def read_event(request: Request, event_id: int, session: SessionDep):
    """Render an event together with the households not yet registered.

    Raises:
        HTTPException: 404 if no event with *event_id* exists.
    """
    event = session.exec(select(Event).where(Event.id == event_id)).first()
    if event is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Event not found"
        )

    # Filter out households with existing registrations. The ids are
    # collected once instead of once per household.
    registered_ids = {reg.household_id for reg in event.registrations}
    households = [
        household
        for household in session.exec(select(Household)).all()
        if household.id not in registered_ids
    ]
    return templates.TemplateResponse(
        request=request,
        name="event.html",
        context={"event": event, "households": households, "now": datetime.now()},
    )


@app.post("/event/{event_id}/register")
async def add_registration(request: Request, event_id: int, session: SessionDep):
    """Register a household for an event with the submitted meal counts.

    Empty or missing count fields are stored as 0.

    Raises:
        HTTPException: 400 if a count field is not an integer.
    """
    form_data = await request.form()
    try:
        num_adult_meals = _parse_meal_count(form_data.get("numAdults"))
        num_children_meals = _parse_meal_count(form_data.get("numKids"))
        num_small_children_meals = _parse_meal_count(form_data.get("numSmallKids"))
    except (TypeError, ValueError) as err:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="All number fields must be integers",
        ) from err

    registration = Registration(
        household_id=form_data["household"],
        event_id=event_id,
        num_adult_meals=num_adult_meals,
        num_children_meals=num_children_meals,
        num_small_children_meals=num_small_children_meals,
    )
    session.add(registration)
    session.commit()
    return _redirect_to_event(event_id)


@app.get("/event/{event_id}/registration/{household_id}/delete")
async def delete_registration(
    request: Request, event_id: int, household_id: int, session: SessionDep
):
    """Delete a household's registration for an event and redirect back.

    Raises:
        HTTPException: 404 if the household has no registration for the
            event (for example when the link is followed twice).
    """
    statement = select(Registration).where(
        Registration.household_id == household_id,
        Registration.event_id == event_id,
    )
    registration = session.exec(statement).first()
    if registration is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Registration not found"
        )
    session.delete(registration)
    session.commit()
    return _redirect_to_event(event_id)


@app.post("/event/{event_id}/register_team")
async def add_team_registration(request: Request, event_id: int, session: SessionDep):
    """Sign a person up for a type of work at an event.

    A repeated sign-up of the same person for the same work type at the
    same event is ignored.
    """
    form_data = await request.form()
    person = form_data["personName"].strip()
    work_type = form_data["workType"]

    # The duplicate check is scoped to this event, so a sign-up at one
    # event does not block the same person at another.
    statement = select(TeamRegistration).where(
        TeamRegistration.person_name == person,
        TeamRegistration.work_type == work_type,
        TeamRegistration.event_id == event_id,
    )
    if session.exec(statement).first() is None:
        registration = TeamRegistration(
            person_name=person,
            event_id=event_id,
            work_type=work_type,
        )
        # Validate before storing; this raises if the data is invalid.
        TeamRegistration.model_validate(registration)
        session.add(registration)
        session.commit()
    return _redirect_to_event(event_id)


@app.get("/event/{event_id}/register_team/{entry_id}/delete")
async def delete_team_registration(
    request: Request,
    event_id: int,
    entry_id: int,
    session: SessionDep,
):
    """Delete a team registration by its id and redirect to the event.

    Raises:
        HTTPException: 404 if no team registration with *entry_id* exists.
    """
    statement = select(TeamRegistration).where(TeamRegistration.id == entry_id)
    registration = session.exec(statement).first()
    if registration is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Team registration not found",
        )
    session.delete(registration)
    session.commit()
    return _redirect_to_event(event_id)