* display date on event page * Filter existing subscriptions for new subscriptions dropdown * Add Typeahead for Team registration
282 lines
9.4 KiB
Python
282 lines
9.4 KiB
Python
import locale
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, timedelta
|
|
from typing import Annotated
|
|
|
|
import starlette.status as status
|
|
from fastapi import Depends, FastAPI, Request
|
|
from fastapi.responses import RedirectResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlmodel import Session, SQLModel, create_engine, select
|
|
|
|
from models import Event, Household, Registration, Subscription, TeamRegistration
|
|
|
|
sqlite_file_name = "database.db"
|
|
sqlite_url = f"sqlite:///{sqlite_file_name}"
|
|
|
|
connect_args = {"check_same_thread": False}
|
|
engine = create_engine(sqlite_url, connect_args=connect_args)
|
|
|
|
locale.setlocale(locale.LC_ALL, "de_DE.UTF-8")
|
|
|
|
|
|
def get_session():
|
|
with Session(engine) as session:
|
|
yield session
|
|
|
|
|
|
def create_db_and_tables():
|
|
SQLModel.metadata.create_all(engine)
|
|
|
|
|
|
@asynccontextmanager
|
|
async def on_startup(app_: FastAPI):
|
|
create_db_and_tables()
|
|
yield
|
|
|
|
|
|
app = FastAPI(lifespan=on_startup)
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
SessionDep = Annotated[Session, Depends(get_session)]
|
|
|
|
|
|
@app.get("/")
|
|
async def index(request: Request, session: SessionDep):
|
|
"""Displays coming events and a button to register new ones"""
|
|
now = datetime.now()
|
|
# TODO: Once we refactored to use SQLAlchemy directly, we can probably do a nicer filtering on the date alone
|
|
statement = (
|
|
select(Event)
|
|
.order_by(Event.event_time)
|
|
.where(Event.event_time >= now - timedelta(days=1))
|
|
)
|
|
events = session.exec(statement).all()
|
|
return templates.TemplateResponse(
|
|
request=request,
|
|
name="index.html",
|
|
context={"events": events, "current_page": "home", "now": now},
|
|
)
|
|
|
|
|
|
@app.get("/past_events")
|
|
async def past_events(request: Request, session: SessionDep):
|
|
now = datetime.now()
|
|
# TODO: Once we refactored to use SQLAlchemy directly, we can probably do a nicer filtering on the date alone
|
|
statement = (
|
|
select(Event)
|
|
.order_by(Event.event_time)
|
|
.where(Event.event_time < now - timedelta(days=1))
|
|
)
|
|
events = session.exec(statement).all()
|
|
return templates.TemplateResponse(
|
|
request=request,
|
|
name="index.html",
|
|
context={"events": events, "current_page": "past", "now": now},
|
|
)
|
|
|
|
|
|
@app.get("/subscribe")
|
|
async def subscribe(request: Request, session: SessionDep):
|
|
statement = select(Household)
|
|
households = session.exec(statement).all()
|
|
|
|
subscriptions = session.exec(select(Subscription)).all()
|
|
|
|
# filter out households with existing subscriptions
|
|
households = [
|
|
h for h in households if h.id not in [sub.household_id for sub in subscriptions]
|
|
]
|
|
|
|
return templates.TemplateResponse(
|
|
request=request,
|
|
name="subscribe.html",
|
|
context={"households": households, "subscriptions": subscriptions},
|
|
)
|
|
|
|
|
|
@app.post("/subscribe")
|
|
async def add_subscribe(request: Request, session: SessionDep):
|
|
form_data = await request.form()
|
|
# TODO: Make this return a nicer error message
|
|
try:
|
|
num_adult_meals = int(form_data["numAdults"]) if form_data["numAdults"] else 0
|
|
num_children_meals = int(form_data["numKids"]) if form_data["numKids"] else 0
|
|
num_small_children_meals = (
|
|
int(form_data["numSmallKids"]) if form_data["numSmallKids"] else 0
|
|
)
|
|
except ValueError:
|
|
raise ValueError("All number fields must be integers")
|
|
|
|
subscription = Subscription(
|
|
household_id=form_data["household"],
|
|
num_adult_meals=num_adult_meals,
|
|
num_children_meals=num_children_meals,
|
|
num_small_children_meals=num_small_children_meals,
|
|
)
|
|
|
|
selected_days = form_data.getlist("days")
|
|
if selected_days:
|
|
subscription.monday = "1" in selected_days
|
|
subscription.tuesday = "2" in selected_days
|
|
subscription.wednesday = "3" in selected_days
|
|
subscription.thursday = "4" in selected_days
|
|
subscription.friday = "5" in selected_days
|
|
subscription.saturday = "6" in selected_days
|
|
subscription.sunday = "7" in selected_days
|
|
|
|
session.add(subscription)
|
|
session.commit()
|
|
|
|
return RedirectResponse(url="/subscribe", status_code=status.HTTP_302_FOUND)
|
|
|
|
|
|
@app.get("/subscribe/{household_id}/delete")
|
|
async def delete_subscription(request: Request, session: SessionDep, household_id: int):
|
|
|
|
statement = select(Subscription).where(Subscription.household_id == household_id)
|
|
sub = session.exec(statement).one()
|
|
|
|
session.delete(sub)
|
|
session.commit()
|
|
|
|
return RedirectResponse(url="/subscribe", status_code=status.HTTP_302_FOUND)
|
|
|
|
|
|
@app.get("/event/add")
|
|
async def add_event_form(request: Request, session: SessionDep):
|
|
return templates.TemplateResponse(request=request, name="add_event.html")
|
|
|
|
|
|
@app.post("/event/add")
|
|
async def add_event(request: Request, session: SessionDep):
|
|
form_data = await request.form()
|
|
|
|
event_time = datetime.fromisoformat(form_data["eventTime"])
|
|
registration_deadline = form_data.get("registrationDeadline")
|
|
if not registration_deadline:
|
|
# Find the last Sunday before event_time
|
|
deadline = event_time
|
|
while deadline.weekday() != 6: # 6 represents Sunday
|
|
deadline = deadline.replace(day=deadline.day - 1)
|
|
registration_deadline = deadline.replace(
|
|
hour=19, minute=30, second=0, microsecond=0
|
|
)
|
|
else:
|
|
registration_deadline = datetime.fromisoformat(registration_deadline)
|
|
|
|
event = Event(
|
|
title=form_data["eventName"],
|
|
event_time=event_time,
|
|
registration_deadline=registration_deadline,
|
|
description=form_data.get("eventDescription"),
|
|
recipe_link=form_data.get("recipeLink"),
|
|
)
|
|
session.add(event)
|
|
session.commit()
|
|
return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
|
|
|
|
|
|
@app.get("/event/{event_id}")
|
|
async def read_event(request: Request, event_id: int, session: SessionDep):
|
|
statement = select(Event).where(Event.id == event_id)
|
|
event = session.exec(statement).one()
|
|
|
|
statement = select(Household)
|
|
households = session.exec(statement).all()
|
|
|
|
# filter out households with existing registrations
|
|
households = [
|
|
h
|
|
for h in households
|
|
if h.id not in [reg.household_id for reg in event.registrations]
|
|
]
|
|
|
|
return templates.TemplateResponse(
|
|
request=request,
|
|
name="event.html",
|
|
context={"event": event, "households": households, "now": datetime.now()},
|
|
)
|
|
|
|
|
|
@app.post("/event/{event_id}/register")
|
|
async def add_registration(request: Request, event_id: int, session: SessionDep):
|
|
form_data = await request.form()
|
|
|
|
# TODO: Make this return a nicer error message
|
|
try:
|
|
num_adult_meals = int(form_data["numAdults"]) if form_data["numAdults"] else 0
|
|
num_children_meals = int(form_data["numKids"]) if form_data["numKids"] else 0
|
|
num_small_children_meals = (
|
|
int(form_data["numSmallKids"]) if form_data["numSmallKids"] else 0
|
|
)
|
|
except ValueError:
|
|
raise ValueError("All number fields must be integers")
|
|
|
|
registration = Registration(
|
|
household_id=form_data["household"],
|
|
event_id=event_id,
|
|
num_adult_meals=num_adult_meals,
|
|
num_children_meals=num_children_meals,
|
|
num_small_children_meals=num_small_children_meals,
|
|
)
|
|
session.add(registration)
|
|
session.commit()
|
|
return RedirectResponse(url=f"/event/{event_id}", status_code=status.HTTP_302_FOUND)
|
|
|
|
|
|
@app.get("/event/{event_id}/registration/{household_id}/delete")
|
|
async def delete_registration(
|
|
request: Request, event_id: int, household_id: int, session: SessionDep
|
|
):
|
|
"""
|
|
Deletes a registration record for a specific household at a given event. This endpoint
|
|
handles the removal of the registration, commits the change to the database, and
|
|
redirects the user to the event page.
|
|
"""
|
|
statement = select(Registration).where(
|
|
Registration.household_id == household_id, Registration.event_id == event_id
|
|
)
|
|
session.delete(session.exec(statement).one())
|
|
session.commit()
|
|
return RedirectResponse(url=f"/event/{event_id}", status_code=status.HTTP_302_FOUND)
|
|
|
|
|
|
@app.post("/event/{event_id}/register_team")
|
|
async def add_team_registration(request: Request, event_id: int, session: SessionDep):
|
|
form_data = await request.form()
|
|
|
|
person = form_data["personName"].strip()
|
|
work_type = form_data["workType"]
|
|
|
|
statement = select(TeamRegistration).where(
|
|
TeamRegistration.person_name == person, TeamRegistration.work_type == work_type
|
|
)
|
|
# if the person has already registered for the same work type, just ignore
|
|
if session.exec(statement).one_or_none() is None:
|
|
registration = TeamRegistration(
|
|
person_name=person,
|
|
event_id=event_id,
|
|
work_type=form_data["workType"],
|
|
)
|
|
TeamRegistration.model_validate(registration)
|
|
session.add(registration)
|
|
session.commit()
|
|
return RedirectResponse(url=f"/event/{event_id}", status_code=status.HTTP_302_FOUND)
|
|
|
|
|
|
@app.get("/event/{event_id}/register_team/{entry_id}/delete")
|
|
async def delete_team_registration(
|
|
request: Request,
|
|
event_id: int,
|
|
entry_id: int,
|
|
session: SessionDep,
|
|
):
|
|
statement = select(TeamRegistration).where(TeamRegistration.id == entry_id)
|
|
session.delete(session.exec(statement).one())
|
|
session.commit()
|
|
return RedirectResponse(url=f"/event/{event_id}", status_code=status.HTTP_302_FOUND)
|