Sending schedules in discord, small fixes

pull/131/head
stijndcl 2022-09-18 01:16:19 +02:00
parent 14ccb42424
commit 13f7d03bbb
17 changed files with 354 additions and 135 deletions

View File

@ -1,8 +1,8 @@
"""Initial migration
Revision ID: 5bdb99885a5d
Revision ID: 515dc3f52c6d
Revises:
Create Date: 2022-09-17 22:39:15.969694
Create Date: 2022-09-18 00:30:56.348634
"""
import sqlalchemy as sa
@ -10,7 +10,7 @@ import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "5bdb99885a5d"
revision = "515dc3f52c6d"
down_revision = None
branch_labels = None
depends_on = None
@ -69,8 +69,8 @@ def upgrade() -> None:
sa.Column("code", sa.Text(), nullable=False),
sa.Column("year", sa.Integer(), nullable=False),
sa.Column("compulsory", sa.Boolean(), server_default="1", nullable=False),
sa.Column("role_id", sa.Integer(), nullable=True),
sa.Column("overarching_role_id", sa.Integer(), nullable=True),
sa.Column("role_id", sa.BigInteger(), nullable=True),
sa.Column("overarching_role_id", sa.BigInteger(), nullable=True),
sa.Column("log_announcements", sa.Boolean(), server_default="0", nullable=False),
sa.PrimaryKeyConstraint("course_id"),
sa.UniqueConstraint("code"),

View File

@ -198,8 +198,8 @@ class UforaCourse(Base):
code: str = Column(Text, nullable=False, unique=True)
year: int = Column(Integer, nullable=False)
compulsory: bool = Column(Boolean, server_default="1", nullable=False)
role_id: Optional[int] = Column(Integer, nullable=True, unique=False)
overarching_role_id: Optional[int] = Column(Integer, nullable=True, unique=False)
role_id: Optional[int] = Column(BigInteger, nullable=True, unique=False)
overarching_role_id: Optional[int] = Column(BigInteger, nullable=True, unique=False)
log_announcements: bool = Column(Boolean, server_default="0", nullable=False)
announcements: list[UforaAnnouncement] = relationship(

View File

@ -10,7 +10,14 @@ async def main():
"""Add debug Ufora courses"""
session: AsyncSession
async with DBSession() as session:
modsim = UforaCourse(course_id=439235, code="C003786", name="Modelleren en Simuleren", year=3, compulsory=False)
modsim = UforaCourse(
course_id=439235,
code="C003786",
name="Modelleren en Simuleren",
year=3,
compulsory=False,
role_id=785577582561067028,
)
session.add_all([modsim])
await session.commit()

View File

@ -11,9 +11,12 @@ from didier import Didier
from didier.data.apis.hydra import fetch_menu
from didier.data.embeds.deadlines import Deadlines
from didier.data.embeds.hydra import no_menu_found
from didier.exceptions import HTTPException
from didier.data.embeds.schedules import Schedule, get_schedule_for_user
from didier.exceptions import HTTPException, NotInMainGuildException
from didier.utils.discord.converters.time import DateTransformer
from didier.utils.discord.flags.school import StudyGuideFlags
from didier.utils.discord.users import to_main_guild_member
from didier.utils.types.datetime import skip_weekends
class School(commands.Cog):
@ -33,6 +36,30 @@ class School(commands.Cog):
embed = Deadlines(deadlines).to_embed()
await ctx.reply(embed=embed, mention_author=False, ephemeral=False)
@commands.hybrid_command(
name="les", description="Show your personalized schedule for a given day.", aliases=["Sched", "Schedule"]
)
@app_commands.rename(day_dt="date")
async def les(self, ctx: commands.Context, day_dt: Optional[app_commands.Transform[date, DateTransformer]] = None):
"""Show your personalized schedule for a given day."""
if day_dt is None:
day_dt = date.today()
day_dt = skip_weekends(day_dt)
async with ctx.typing():
try:
member_instance = to_main_guild_member(self.client, ctx.author)
# Always make sure there is at least one schedule in case it returns None
# this allows proper error messages
schedule = get_schedule_for_user(self.client, member_instance, day_dt) or Schedule()
return await ctx.reply(embed=schedule.to_embed(day=day_dt), mention_author=False)
except NotInMainGuildException:
return await ctx.reply(f"You are not a member of {self.client.main_guild.name}.", mention_author=False)
@commands.hybrid_command(
name="menu",
description="Show the menu in the Ghent University restaurants.",

View File

@ -11,8 +11,8 @@ from database.crud.birthdays import get_birthdays_on_day
from database.crud.ufora_announcements import remove_old_announcements
from database.crud.wordle import set_daily_word
from didier import Didier
from didier.data.embeds.schedules import Schedule, parse_schedule_from_content
from didier.data.embeds.ufora.announcements import fetch_ufora_announcements
from didier.data.schedules import parse_schedule_from_content
from didier.decorators.tasks import timed_task
from didier.utils.discord.checks import is_owner
from didier.utils.types.datetime import LOCAL_TIMEZONE, tz_aware_now
@ -122,6 +122,8 @@ class Tasks(commands.Cog):
"""
_ = kwargs
new_schedules: dict[settings.ScheduleType, Schedule] = {}
async with self.client.postgres_session as session:
for data in settings.SCHEDULE_DATA:
if data.schedule_url is None:
@ -140,7 +142,14 @@ class Tasks(commands.Cog):
with open(f"files/schedules/{data.name}.ics", "w+") as fp:
fp.write(content)
await parse_schedule_from_content(content, database_session=session)
schedule = await parse_schedule_from_content(content, database_session=session)
if schedule is None:
continue
new_schedules[data.name] = schedule
# Only replace cached version if all schedules succeeded
self.client.schedules = new_schedules
@tasks.loop(minutes=10)
@timed_task(enums.TaskType.UFORA_ANNOUNCEMENTS)
@ -198,4 +207,3 @@ async def setup(client: Didier):
cog = Tasks(client)
await client.add_cog(cog)
await cog.reset_wordle_word()
# await cog.pull_schedules()

View File

@ -0,0 +1,219 @@
from __future__ import annotations
import pathlib
import re
from dataclasses import dataclass, field
from datetime import date, datetime
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from didier import Didier
import discord
from ics import Calendar
from overrides import overrides
from sqlalchemy.ext.asyncio import AsyncSession
from database.crud.ufora_courses import get_course_by_code
from database.schemas import UforaCourse
from didier.data.embeds.base import EmbedBaseModel
from didier.utils.discord import colours
from didier.utils.types.datetime import LOCAL_TIMEZONE, int_to_weekday, time_string
from didier.utils.types.string import leading
from settings import ScheduleType
__all__ = ["Schedule", "get_schedule_for_user", "parse_schedule_from_content", "parse_schedule"]
@dataclass
class Schedule(EmbedBaseModel):
"""An entire schedule"""
slots: set[ScheduleSlot] = field(default_factory=set)
def __add__(self, other) -> Schedule:
"""Combine schedules using the + operator"""
if not isinstance(other, Schedule):
raise TypeError("Argument to __add__ must be a Schedule")
return Schedule(slots=self.slots.union(other.slots))
def __bool__(self) -> bool:
"""Make empty schedules falsy"""
return bool(self.slots)
def on_day(self, day: date) -> Schedule:
"""Only show courses on a given day"""
return Schedule(set(filter(lambda slot: slot.start_time.date() == day, self.slots)))
def personalize(self, roles: set[int]) -> Schedule:
"""Personalize a schedule for a user, only adding courses they follow"""
personal_slots = set()
for slot in self.slots:
role_found = slot.role_id is not None and slot.role_id in roles
overarching_role_found = slot.overarching_role_id is not None and slot.overarching_role_id in roles
if role_found or overarching_role_found:
personal_slots.add(slot)
return Schedule(personal_slots)
@overrides
def to_embed(self, **kwargs) -> discord.Embed:
day: date = kwargs.get("day", date.today())
day_str = f"{leading('0', str(day.day))}/{leading('0', str(day.month))}/{leading('0', str(day.year))}"
embed = discord.Embed(title=f"Schedule - {int_to_weekday(day.weekday())} {day_str}")
if self:
embed.colour = colours.ghent_university_blue()
else:
embed.colour = colours.error_red()
embed.description = (
"No planned classes found.\n\n"
"In case this doesn't seem right, "
"make sure that you've got the roles of all of courses that you're taking on.\n\n"
"In case it does, enjoy your day off!"
)
return embed
slots_sorted = sorted(list(self.slots), key=lambda k: k.start_time)
description_data = []
for slot in slots_sorted:
description_data.append(
f"{time_string(slot.start_time)} - {time_string(slot.end_time)}: {slot.course.name} "
f"in **{slot.location}**"
)
embed.description = "\n".join(description_data)
return embed
@dataclass
class ScheduleSlot:
"""A slot in the schedule"""
course: UforaCourse
start_time: datetime
end_time: datetime
location: str
_hash: int = field(init=False)
def __post_init__(self):
"""Fix some properties to display more nicely"""
# Re-format the location data
room, building, campus = re.search(r"(.*)\. Gebouw (.*)\. Campus (.*)\. ", self.location).groups()
self.location = f"{campus} {building} {room}"
self._hash = hash(f"{self.course.course_id} {str(self.start_time)}")
@property
def overarching_role_id(self) -> Optional[int]:
"""Shortcut to getting the overarching role id for this slot"""
return self.course.overarching_role_id
@property
def role_id(self) -> Optional[int]:
"""Shortcut to getting the role id for this slot"""
return self.course.role_id
@overrides
def __hash__(self) -> int:
return self._hash
@overrides
def __eq__(self, other):
if not isinstance(other, ScheduleSlot):
return False
return self._hash == other._hash
def get_schedule_for_user(client: Didier, member: discord.Member, day_dt: date) -> Optional[Schedule]:
"""Get a user's schedule"""
roles: set[int] = {role.id for role in member.roles}
main_schedule: Optional[Schedule] = None
for schedule in client.schedules.values():
personalized_schedule = schedule.on_day(day_dt).personalize(roles)
if not personalized_schedule:
continue
# Add the personalized one to the current main schedule
if main_schedule is None:
main_schedule = personalized_schedule
else:
main_schedule = main_schedule + personalized_schedule
return main_schedule
def parse_course_code(summary: str) -> str:
"""Parse a course's code out of the summary"""
code = re.search(r"^([^ ]+)\. ", summary)
if code is None:
return summary
code_group = code.groups()[0]
# Strip off last character as it's not relevant
if code_group[-1].isalpha():
return code_group[:-1]
return code_group
def parse_time_string(string: str) -> datetime:
"""Parse an ISO string to a timezone-aware datetime instance"""
return datetime.fromisoformat(string).astimezone(LOCAL_TIMEZONE)
async def parse_schedule_from_content(content: str, *, database_session: AsyncSession) -> Schedule:
"""Parse a schedule file, taking the file content as an argument
This can be used to avoid unnecessarily opening the file again if you already have its contents
"""
calendar = Calendar(content)
events = list(calendar.events)
course_codes: dict[str, UforaCourse] = {}
slots: set[ScheduleSlot] = set()
for event in events:
code = parse_course_code(event.name)
if code not in course_codes:
course = await get_course_by_code(database_session, code)
if course is None:
# raise ValueError(f"Unable to find course with code {code} (event {event.name})") # noqa: E800
continue # TODO uncomment the line above after all courses have been added
course_codes[code] = course
# Overwrite the name to be the sanitized value
event.name = code
slot = ScheduleSlot(
course=course_codes[code],
start_time=parse_time_string(str(event.begin)),
end_time=parse_time_string(str(event.end)),
location=event.location,
)
slots.add(slot)
return Schedule(slots=slots)
async def parse_schedule(name: ScheduleType, *, database_session: AsyncSession) -> Optional[Schedule]:
"""Read and then parse a schedule file"""
schedule_path = pathlib.Path(f"files/schedules/{name}.ics")
if not schedule_path.exists():
return None
with open(schedule_path, "r", encoding="utf-8") as fp:
return await parse_schedule_from_content(fp.read(), database_session=database_session)

View File

@ -1,116 +0,0 @@
from __future__ import annotations
import pathlib
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from arrow import Arrow
from ics import Calendar
from overrides import overrides
from sqlalchemy.ext.asyncio import AsyncSession
from database.crud.ufora_courses import get_course_by_code
from database.schemas import UforaCourse
from didier.utils.types.datetime import LOCAL_TIMEZONE
from settings import ScheduleType
__all__ = ["Schedule", "parse_schedule_from_content", "parse_schedule"]
@dataclass
class Schedule:
"""An entire schedule"""
slots: set[ScheduleSlot]
@dataclass
class ScheduleSlot:
"""A slot in the schedule"""
course: UforaCourse
start_time: datetime
end_time: datetime
location: str
_hash: int = field(init=False)
def __post_init__(self):
"""Fix some properties to display more nicely"""
# Re-format the location data
room, building, campus = re.search(r"Leslokaal (.*)\. Gebouw (.*)\. Campus (.*)\. ", self.location).groups()
self.location = f"{campus} {building} {room}"
self._hash = hash(f"{self.course.course_id} {str(self.start_time)}")
@overrides
def __hash__(self) -> int:
return self._hash
@overrides
def __eq__(self, other: ScheduleSlot):
return self._hash == other._hash
def parse_course_code(summary: str) -> str:
"""Parse a course's code out of the summary"""
code = re.search(r"^([^ ]+)\. ", summary).groups()[0]
# Strip off last character as it's not relevant
if code[-1].isalpha():
return code[:-1]
return code
def parse_time_string(string: str) -> datetime:
"""Parse an ISO string to a timezone-aware datetime instance"""
return datetime.fromisoformat(string).astimezone(LOCAL_TIMEZONE)
async def parse_schedule_from_content(content: str, *, database_session: AsyncSession) -> Schedule:
"""Parse a schedule file, taking the file content as an argument
This can be used to avoid unnecessarily opening the file again if you already have its contents
"""
calendar = Calendar(content)
day = Arrow(year=2022, month=9, day=26)
events = list(calendar.timeline.on(day))
course_codes: dict[str, UforaCourse] = {}
slots: set[ScheduleSlot] = set()
for event in events:
code = parse_course_code(event.name)
if code not in course_codes:
course = await get_course_by_code(database_session, code)
if course is None:
# raise ValueError(f"Unable to find course with code {code} (event {event.name})")
continue # TODO uncomment the line above
course_codes[code] = course
# Overwrite the name to be the sanitized value
event.name = code
slot = ScheduleSlot(
course=course_codes[code],
start_time=parse_time_string(str(event.begin)),
end_time=parse_time_string(str(event.end)),
location=event.location,
)
slots.add(slot)
return Schedule(slots=slots)
async def parse_schedule(name: ScheduleType, *, database_session: AsyncSession) -> Optional[Schedule]:
"""Read and then parse a schedule file"""
schedule_path = pathlib.Path(f"files/schedules/{name}.ics")
if not schedule_path.exists():
return None
with open(schedule_path, "r", encoding="utf-8") as fp:
return await parse_schedule_from_content(fp.read(), database_session=database_session)

View File

@ -1,6 +1,7 @@
import logging
import os
import pathlib
from functools import cached_property
import discord
from aiohttp import ClientSession
@ -13,7 +14,7 @@ from database.crud import custom_commands
from database.engine import DBSession
from database.utils.caches import CacheManager
from didier.data.embeds.error_embed import create_error_embed
from didier.data.schedules import Schedule, parse_schedule
from didier.data.embeds.schedules import Schedule, parse_schedule
from didier.exceptions import HTTPException, NoMatch
from didier.utils.discord.prefix import get_prefix
@ -52,6 +53,11 @@ class Didier(commands.Bot):
self.tree.on_error = self.on_app_command_error
@cached_property
def main_guild(self) -> discord.Guild:
"""Obtain a reference to the main guild"""
return self.get_guild(settings.DISCORD_MAIN_GUILD)
@property
def postgres_session(self) -> AsyncSession:
"""Obtain a session for the PostgreSQL database"""

View File

@ -1,5 +1,6 @@
from .http_exception import HTTPException
from .missing_env import MissingEnvironmentVariable
from .no_match import NoMatch, expect
from .not_in_main_guild_exception import NotInMainGuildException
__all__ = ["HTTPException", "MissingEnvironmentVariable", "NoMatch", "expect"]
__all__ = ["HTTPException", "MissingEnvironmentVariable", "NoMatch", "expect", "NotInMainGuildException"]

View File

@ -0,0 +1,17 @@
from typing import Union
import discord
import settings
__all__ = ["NotInMainGuildException"]
class NotInMainGuildException(ValueError):
"""Exception raised when a user is not a member of the main guild"""
def __init__(self, user: Union[discord.User, discord.Member]):
super().__init__(
f"User {user.display_name} (id {user.id}) "
f"is not a member of the configured main guild (id {settings.DISCORD_MAIN_GUILD})."
)

View File

@ -1,6 +1,10 @@
import discord
__all__ = ["ghent_university_blue", "ghent_university_yellow", "google_blue", "urban_dictionary_green"]
__all__ = ["error_red", "ghent_university_blue", "ghent_university_yellow", "google_blue", "urban_dictionary_green"]
def error_red() -> discord.Colour:
return discord.Colour.red()
def ghent_university_blue() -> discord.Colour:

View File

@ -53,7 +53,7 @@ def date_converter(argument: Optional[str]) -> date:
raise commands.ArgumentParsingError(f"Unable to interpret `{original_argument}` as a date.")
class DateTransformer(app_commands.Transformer):
class DateTransformer(commands.Converter, app_commands.Transformer):
"""Application commands transformer for dates"""
@overrides
@ -62,6 +62,10 @@ class DateTransformer(app_commands.Transformer):
) -> list[app_commands.Choice[Union[int, float, str]]]:
return autocomplete_day(str(value))
@overrides
async def convert(self, ctx: commands.Context, argument: str) -> datetime.date:
return date_converter(argument)
@overrides
async def transform(self, interaction: discord.Interaction, value: str) -> datetime.date:
return date_converter(value)

View File

@ -0,0 +1,26 @@
from typing import Union
import discord
from didier import Didier
from didier.exceptions import NotInMainGuildException
__all__ = ["to_main_guild_member"]
def to_main_guild_member(client: Didier, user: Union[discord.User, discord.Member]) -> discord.Member:
"""Turn a discord.User into a discord.Member instance
This assumes the user is in CoC. If not, it raises a NotInMainGuildException
"""
main_guild = client.main_guild
# Already a discord.Member instance
if isinstance(user, discord.Member) and user.guild == main_guild:
return user
member = main_guild.get_member(user.id)
if member is None:
raise NotInMainGuildException(user)
return member

View File

@ -8,9 +8,11 @@ __all__ = [
"forward_to_next_weekday",
"int_to_weekday",
"parse_dm_string",
"skip_weekends",
"str_to_date",
"str_to_month",
"str_to_weekday",
"time_string",
"tz_aware_now",
]
@ -86,6 +88,12 @@ def parse_dm_string(argument: str) -> datetime.date:
raise ValueError
def skip_weekends(dt_instance: datetime.date) -> datetime.date:
"""Fast-forward a date instance until its weekday is no longer a weekend"""
to_skip = (7 - dt_instance.weekday()) if dt_instance.weekday() > 4 else 0
return dt_instance + datetime.timedelta(days=to_skip)
def str_to_date(date_str: str, formats: Union[list[str], str] = "%d/%m/%Y") -> datetime.date:
"""Turn a string into a DD/MM/YYYY date"""
# Allow passing multiple formats in a list
@ -171,6 +179,11 @@ def str_to_weekday(argument: str) -> int:
raise ValueError
def time_string(dt_instance: datetime.datetime) -> str:
"""Get an HH:MM representation of a datetime instance"""
return dt_instance.strftime("%H:%M")
def tz_aware_now() -> datetime.datetime:
"""Get the current date & time, but timezone-aware"""
return datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).astimezone(LOCAL_TIMEZONE)

View File

@ -11,7 +11,10 @@ from didier import Didier
async def run_bot():
"""Run Didier"""
didier = Didier()
await didier.start(settings.DISCORD_TOKEN)
try:
await didier.start(settings.DISCORD_TOKEN)
finally:
await didier.http_session.close()
def setup_logging():

View File

@ -38,7 +38,7 @@ plugins = [
"sqlalchemy.ext.mypy.plugin"
]
[[tool.mypy.overrides]]
module = ["discord.*", "feedparser.*", "markdownify.*", "motor.*"]
module = ["discord.*", "feedparser.*", "ics.*", "markdownify.*"]
ignore_missing_imports = true
[tool.pytest.ini_options]

View File

@ -56,7 +56,7 @@ POSTGRES_PORT: int = env.int("POSTGRES_PORT", "5432")
DISCORD_TOKEN: str = env.str("DISCORD_TOKEN")
DISCORD_READY_MESSAGE: str = env.str("DISCORD_READY_MESSAGE", "I'M READY I'M READY I'M READY")
DISCORD_STATUS_MESSAGE: str = env.str("DISCORD_STATUS_MESSAGE", "with your Didier Dinks.")
DISCORD_MAIN_GUILD: Optional[int] = env.int("DISCORD_MAIN_GUILD", 626699611192688641)
DISCORD_MAIN_GUILD: int = env.int("DISCORD_MAIN_GUILD")
DISCORD_TEST_GUILDS: list[int] = env.list("DISCORD_TEST_GUILDS", [], subcast=int)
DISCORD_OWNER_GUILDS: Optional[list[int]] = env.list("DISCORD_OWNER_GUILDS", [], subcast=int) or None
DISCORD_BOOS_REACT: str = env.str("DISCORD_BOOS_REACT", "<:boos:629603785840263179>")