mirror of https://github.com/stijndcl/didier
Ufora announcements
parent
bacd2d77fb
commit
6873cab955
|
@ -0,0 +1,32 @@
|
|||
"""Add option to disable announcement fetching for courses
|
||||
|
||||
Revision ID: d3cd92cb9efe
|
||||
Revises: 9e8ce58c0a26
|
||||
Create Date: 2022-06-18 00:36:00.484627
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "d3cd92cb9efe"
|
||||
down_revision = "9e8ce58c0a26"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table("ufora_courses", schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column("log_announcements", sa.Boolean(), nullable=False, server_default=sa.false()))
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table("ufora_courses", schema=None) as batch_op:
|
||||
batch_op.drop_column("log_announcements")
|
||||
|
||||
# ### end Alembic commands ###
|
|
@ -0,0 +1,17 @@
|
|||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from database.models import UforaCourse, UforaAnnouncement
|
||||
|
||||
|
||||
async def get_courses_with_announcements(session: AsyncSession) -> list[UforaCourse]:
|
||||
"""Get all courses where announcements are enabled"""
|
||||
query = select(UforaCourse).where(UforaCourse.log_announcements)
|
||||
return (await session.execute(query)).scalars().all()
|
||||
|
||||
|
||||
async def create_new_announcement(session: AsyncSession, announcement_id: int, course: UforaCourse):
|
||||
"""Add a new announcement to the database"""
|
||||
new_announcement = UforaAnnouncement(announcement_id=announcement_id, course=course)
|
||||
session.add(new_announcement)
|
||||
await session.commit()
|
|
@ -7,7 +7,7 @@ from sqlalchemy.orm import sessionmaker
|
|||
import settings
|
||||
|
||||
# Run local tests against SQLite instead of Postgres
|
||||
if settings.DB_TEST_SQLITE:
|
||||
if settings.TESTING and settings.DB_TEST_SQLITE:
|
||||
engine = create_async_engine(
|
||||
URL.create(
|
||||
drivername="sqlite+aiosqlite",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from sqlalchemy import Column, Integer, Text, ForeignKey
|
||||
from sqlalchemy import Column, Integer, Text, ForeignKey, Boolean
|
||||
from sqlalchemy.orm import declarative_base, relationship
|
||||
|
||||
Base = declarative_base()
|
||||
|
@ -15,12 +15,13 @@ class UforaCourse(Base):
|
|||
name: str = Column(Text, nullable=False, unique=True)
|
||||
code: str = Column(Text, nullable=False, unique=True)
|
||||
year: int = Column(Integer, nullable=False)
|
||||
log_announcements: bool = Column(Boolean, default=False, nullable=False)
|
||||
|
||||
announcements: list[UforaAnnouncement] = relationship(
|
||||
"UforaAnnouncement", back_populates="course", cascade="all, delete-orphan"
|
||||
"UforaAnnouncement", back_populates="course", cascade="all, delete-orphan", lazy="selectin"
|
||||
)
|
||||
aliases: list[UforaCourseAlias] = relationship(
|
||||
"UforaCourseAlias", back_populates="course", cascade="all, delete-orphan"
|
||||
"UforaCourseAlias", back_populates="course", cascade="all, delete-orphan", lazy="selectin"
|
||||
)
|
||||
|
||||
|
||||
|
@ -33,7 +34,7 @@ class UforaCourseAlias(Base):
|
|||
alias: str = Column(Text, nullable=False, unique=True)
|
||||
course_id: int = Column(Integer, ForeignKey("ufora_courses.course_id"))
|
||||
|
||||
course: UforaCourse = relationship("UforaCourse", back_populates="aliases", uselist=False)
|
||||
course: UforaCourse = relationship("UforaCourse", back_populates="aliases", uselist=False, lazy="selectin")
|
||||
|
||||
|
||||
class UforaAnnouncement(Base):
|
||||
|
@ -44,4 +45,4 @@ class UforaAnnouncement(Base):
|
|||
announcement_id = Column(Integer, primary_key=True)
|
||||
course_id = Column(Integer, ForeignKey("ufora_courses.course_id"))
|
||||
|
||||
course: UforaCourse = relationship("UforaCourse", back_populates="announcements", uselist=False)
|
||||
course: UforaCourse = relationship("UforaCourse", back_populates="announcements", uselist=False, lazy="selectin")
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
import traceback
|
||||
|
||||
from discord.ext import commands, tasks
|
||||
|
||||
import settings
|
||||
from didier import Didier
|
||||
from didier.data.embeds.ufora.announcements import fetch_ufora_announcements
|
||||
|
||||
|
||||
# TODO task to clean up old announcements? (> 1 week)
|
||||
class Tasks(commands.Cog):
|
||||
"""Task loops that run periodically"""
|
||||
|
||||
client: Didier
|
||||
|
||||
def __init__(self, client: Didier):
|
||||
self.client = client
|
||||
|
||||
# Only pull announcements if a token was provided
|
||||
if settings.UFORA_RSS_TOKEN is not None and settings.UFORA_ANNOUNCEMENTS_CHANNEL is not None:
|
||||
self.pull_ufora_announcements.start() # pylint: disable=no-member
|
||||
|
||||
@tasks.loop(minutes=10)
|
||||
async def pull_ufora_announcements(self):
|
||||
"""Task that checks for new Ufora announcements & logs them in a channel"""
|
||||
# In theory this shouldn't happen but just to please Mypy
|
||||
if settings.UFORA_RSS_TOKEN is None or settings.UFORA_ANNOUNCEMENTS_CHANNEL is None:
|
||||
return
|
||||
|
||||
announcements_channel = self.client.get_channel(settings.UFORA_ANNOUNCEMENTS_CHANNEL)
|
||||
announcements = await fetch_ufora_announcements(self.client.db_session)
|
||||
|
||||
for announcement in announcements:
|
||||
await announcements_channel.send(embed=announcement.to_embed())
|
||||
|
||||
@pull_ufora_announcements.before_loop
|
||||
async def _before_ufora_announcements(self):
|
||||
"""Don't try to get announcements if the bot isn't ready yet"""
|
||||
await self.client.wait_until_ready()
|
||||
|
||||
@pull_ufora_announcements.error
|
||||
async def _on_announcements_error(self, error: BaseException):
|
||||
"""Error handler for the Ufora Announcements task"""
|
||||
print("".join(traceback.format_exception(type(error), error, error.__traceback__)))
|
||||
|
||||
|
||||
async def setup(client: Didier):
|
||||
"""Load the cog"""
|
||||
await client.add_cog(Tasks(client))
|
|
@ -0,0 +1,145 @@
|
|||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
import discord
|
||||
import feedparser
|
||||
import pytz
|
||||
from markdownify import markdownify as md
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
import settings
|
||||
from database.crud import ufora_announcements as crud
|
||||
from database.models import UforaCourse
|
||||
|
||||
|
||||
@dataclass
|
||||
class UforaNotification:
|
||||
"""A single notification from Ufora"""
|
||||
|
||||
content: dict
|
||||
course: UforaCourse
|
||||
notification_id: Optional[int] = None
|
||||
course_id: Optional[int] = None
|
||||
|
||||
_view_url: str = field(init=False)
|
||||
_title: str = field(init=False)
|
||||
_description: str = field(init=False)
|
||||
_published: str = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
self._view_url = self._create_url()
|
||||
self._title = self._clean_content(self.content["title"])
|
||||
self._description = self._get_description()
|
||||
self._published = self._get_published()
|
||||
|
||||
def to_embed(self) -> discord.Embed:
|
||||
"""Turn the notification into an embed"""
|
||||
embed = discord.Embed(colour=discord.Colour.from_rgb(30, 100, 200))
|
||||
|
||||
embed.set_author(name=self.course.name)
|
||||
embed.title = self._title
|
||||
embed.url = self._view_url
|
||||
embed.description = self._description
|
||||
embed.set_footer(text=self._published)
|
||||
|
||||
return embed
|
||||
|
||||
def get_id(self) -> int:
|
||||
"""Parse the id out of the notification"""
|
||||
return int(self.notification_id) if self.notification_id is not None else self.content["id"]
|
||||
|
||||
def _create_url(self):
|
||||
if self.notification_id is None or self.course_id is None:
|
||||
return self.content["link"]
|
||||
|
||||
return f"https://ufora.ugent.be/d2l/le/news/{self.course_id}/{self.notification_id}/view?ou={self.course_id}"
|
||||
|
||||
def _get_description(self):
|
||||
desc = self._clean_content(self.content["summary"])
|
||||
|
||||
if len(desc) > 4096:
|
||||
return desc[:4093] + "..."
|
||||
|
||||
return desc
|
||||
|
||||
def _clean_content(self, text: str):
|
||||
# Escape *-characters because they mess up the layout
|
||||
text = text.replace("*", "\\*")
|
||||
return md(text)
|
||||
|
||||
def _get_published(self) -> str:
|
||||
# Datetime is unable to parse the timezone because it's useless
|
||||
# We will hereby cut it out and pray the timezone will always be UTC+0
|
||||
published = self.content["published"].rsplit(" ", 1)[0]
|
||||
time_string = "%a, %d %b %Y %H:%M:%S"
|
||||
dt = datetime.strptime(published, time_string).astimezone(pytz.timezone("Europe/Brussels"))
|
||||
|
||||
# Apply timezone offset in a hacky way
|
||||
offset = dt.utcoffset()
|
||||
if offset is not None:
|
||||
dt += offset
|
||||
|
||||
# TODO
|
||||
return "Placeholder :) TODO make the functions to format this"
|
||||
|
||||
|
||||
def parse_ids(url: str) -> Optional[tuple[int, int]]:
|
||||
"""Parse the notification & course id out of a notification url"""
|
||||
match = re.search(r"\d+-\d+$", url)
|
||||
|
||||
if not match:
|
||||
return None
|
||||
|
||||
spl = match[0].split("-")
|
||||
return int(spl[0]), int(spl[1])
|
||||
|
||||
|
||||
async def fetch_ufora_announcements(session: AsyncSession) -> list[UforaNotification]:
|
||||
"""Fetch all new announcements"""
|
||||
notifications: list[UforaNotification] = []
|
||||
|
||||
# No token provided, don't fetch announcements
|
||||
if settings.UFORA_RSS_TOKEN is None:
|
||||
return notifications
|
||||
|
||||
courses = await crud.get_courses_with_announcements(session)
|
||||
|
||||
for course in courses:
|
||||
course_announcement_ids = list(map(lambda announcement: announcement.announcement_id, course.announcements))
|
||||
|
||||
course_url = (
|
||||
f"https://ufora.ugent.be/d2l/le/news/rss/{course.course_id}/course?token={settings.UFORA_RSS_TOKEN}"
|
||||
)
|
||||
|
||||
# Get the updated feed
|
||||
feed = feedparser.parse(course_url)
|
||||
|
||||
# Remove old notifications
|
||||
fresh_feed: list[dict] = []
|
||||
for entry in feed["entries"]:
|
||||
parsed = parse_ids(entry["id"])
|
||||
if parsed is None:
|
||||
continue
|
||||
|
||||
if parsed[0] not in course_announcement_ids:
|
||||
fresh_feed.append(entry)
|
||||
|
||||
if fresh_feed:
|
||||
for item in fresh_feed:
|
||||
# Parse id's out
|
||||
# Technically this can't happen but Mypy angry
|
||||
parsed = parse_ids(item["id"])
|
||||
|
||||
if parsed is None:
|
||||
continue
|
||||
|
||||
# Create a new notification
|
||||
notification_id, course_id = parsed
|
||||
notifications.append(UforaNotification(item, course, notification_id, course_id))
|
||||
|
||||
# Create new db entry
|
||||
await crud.create_new_announcement(session, notification_id, course)
|
||||
|
||||
return notifications
|
|
@ -6,23 +6,25 @@ plugins = [
|
|||
"sqlalchemy.ext.mypy.plugin"
|
||||
]
|
||||
[[tool.mypy.overrides]]
|
||||
module = "discord.*"
|
||||
module = ["discord.*", "feedparser.*", "markdownify.*"]
|
||||
ignore_missing_imports = true
|
||||
|
||||
[tool.pylint.master]
|
||||
disable = [
|
||||
"missing-module-docstring",
|
||||
"too-few-public-methods",
|
||||
"too-many-arguments"
|
||||
"too-many-arguments",
|
||||
"too-many-instance-attributes"
|
||||
]
|
||||
|
||||
[tool.pylint.format]
|
||||
max-line-length = 120
|
||||
good-names = ["i"]
|
||||
good-names = ["i", "dt"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
asyncio_mode = "auto"
|
||||
env = [
|
||||
"TESTING = true",
|
||||
"DB_NAME = didier_action",
|
||||
"DB_USERNAME = postgres",
|
||||
"DB_HOST = localhost",
|
||||
|
|
|
@ -6,3 +6,4 @@ pytest==7.1.2
|
|||
pytest-asyncio==0.18.3
|
||||
pytest-env==0.6.2
|
||||
sqlalchemy2-stubs==0.0.2a23
|
||||
types-pytz==2021.3.8
|
|
@ -4,4 +4,7 @@ asyncpg==0.25.0
|
|||
# Dev version of dpy
|
||||
git+https://github.com/Rapptz/discord.py
|
||||
environs==9.5.0
|
||||
feedparser==6.0.10
|
||||
markdownify==0.11.2
|
||||
pytz==2022.1
|
||||
sqlalchemy[asyncio]==1.4.37
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
from typing import Optional
|
||||
|
||||
from environs import Env
|
||||
|
||||
# Read the .env file (if present)
|
||||
|
@ -7,6 +9,7 @@ env.read_env()
|
|||
"""General config"""
|
||||
SANDBOX: bool = env.bool("SANDBOX", True)
|
||||
LOGFILE: str = env.str("LOGFILE", "didier.log")
|
||||
TESTING: bool = env.bool("TESTING", False)
|
||||
|
||||
"""Database"""
|
||||
DB_NAME: str = env.str("DB_NAME", "didier")
|
||||
|
@ -21,3 +24,7 @@ DISCORD_TOKEN: str = env.str("DISC_TOKEN")
|
|||
DISCORD_READY_MESSAGE: str = env.str("DISC_READY_MESSAGE", "I'M READY I'M READY I'M READY")
|
||||
DISCORD_STATUS_MESSAGE: str = env.str("DISC_STATUS_MESSAGE", "with your Didier Dinks.")
|
||||
DISCORD_TEST_GUILDS: list[int] = env.list("DISC_TEST_GUILDS", [], subcast=int)
|
||||
UFORA_ANNOUNCEMENTS_CHANNEL: Optional[int] = env.int("UFORA_ANNOUNCEMENTS_CHANNEL", None)
|
||||
|
||||
"""API Keys"""
|
||||
UFORA_RSS_TOKEN: Optional[str] = env.str("UFORA_RSS_TOKEN", None)
|
||||
|
|
Loading…
Reference in New Issue