mirror of
https://github.com/stijndcl/didier.git
synced 2026-04-07 23:55:46 +02:00
Create first implementation of events
This commit is contained in:
parent
5deb312474
commit
1831446f65
4 changed files with 111 additions and 9 deletions
|
|
@ -1,13 +1,14 @@
|
|||
import datetime
|
||||
from typing import Optional
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from dateutil.parser import parse
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import delete, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from database.schemas import Event
|
||||
|
||||
__all__ = ["add_event", "get_event_by_id", "get_events", "get_next_event"]
|
||||
__all__ = ["add_event", "delete_event_by_id", "get_event_by_id", "get_events", "get_next_event"]
|
||||
|
||||
|
||||
async def add_event(
|
||||
|
|
@ -19,23 +20,31 @@ async def add_event(
|
|||
event = Event(name=name, description=description, timestamp=date_dt, notification_channel=channel_id)
|
||||
session.add(event)
|
||||
await session.commit()
|
||||
await session.refresh(event)
|
||||
|
||||
return event
|
||||
|
||||
|
||||
async def delete_event_by_id(session: AsyncSession, event_id: int):
|
||||
"""Delete an event by its id"""
|
||||
statement = delete(Event).where(Event.event_id == event_id)
|
||||
await session.execute(statement)
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def get_event_by_id(session: AsyncSession, event_id: int) -> Optional[Event]:
|
||||
"""Get an event by its id"""
|
||||
statement = select(Event).where(Event.event_id == event_id)
|
||||
return (await session.execute(statement)).scalar_one_or_none()
|
||||
|
||||
|
||||
async def get_events(session: AsyncSession) -> list[Event]:
|
||||
async def get_events(session: AsyncSession, *, now: datetime.datetime) -> list[Event]:
|
||||
"""Get a list of all upcoming events"""
|
||||
statement = select(Event)
|
||||
statement = select(Event).where(Event.timestamp > now)
|
||||
return (await session.execute(statement)).scalars().all()
|
||||
|
||||
|
||||
async def get_next_event(session: AsyncSession) -> Optional[Event]:
|
||||
async def get_next_event(session: AsyncSession, *, now: datetime.datetime) -> Optional[Event]:
|
||||
"""Get the first upcoming event"""
|
||||
statement = select(Event).order_by(Event.timestamp)
|
||||
statement = select(Event).where(Event.timestamp > now).order_by(Event.timestamp)
|
||||
return (await session.execute(statement)).scalar_one_or_none()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue