mirror of https://github.com/stijndcl/didier
Compare commits
3 Commits
bacd2d77fb
...
d75831f848
| Author | SHA1 | Date |
|---|---|---|
|
|
d75831f848 | |
|
|
d7262595c6 | |
|
|
6873cab955 |
|
|
@ -1,8 +1,8 @@
|
||||||
"""Initial migration: Ufora announcements
|
"""Initial migration
|
||||||
|
|
||||||
Revision ID: 9e8ce58c0a26
|
Revision ID: 4ec79dd5b191
|
||||||
Revises:
|
Revises:
|
||||||
Create Date: 2022-06-17 01:36:02.767151
|
Create Date: 2022-06-19 00:31:58.384360
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from alembic import op
|
from alembic import op
|
||||||
|
|
@ -10,7 +10,7 @@ import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
# revision identifiers, used by Alembic.
|
||||||
revision = '9e8ce58c0a26'
|
revision = '4ec79dd5b191'
|
||||||
down_revision = None
|
down_revision = None
|
||||||
branch_labels = None
|
branch_labels = None
|
||||||
depends_on = None
|
depends_on = None
|
||||||
|
|
@ -23,6 +23,7 @@ def upgrade() -> None:
|
||||||
sa.Column('name', sa.Text(), nullable=False),
|
sa.Column('name', sa.Text(), nullable=False),
|
||||||
sa.Column('code', sa.Text(), nullable=False),
|
sa.Column('code', sa.Text(), nullable=False),
|
||||||
sa.Column('year', sa.Integer(), nullable=False),
|
sa.Column('year', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('log_announcements', sa.Boolean(), nullable=False),
|
||||||
sa.PrimaryKeyConstraint('course_id'),
|
sa.PrimaryKeyConstraint('course_id'),
|
||||||
sa.UniqueConstraint('code'),
|
sa.UniqueConstraint('code'),
|
||||||
sa.UniqueConstraint('name')
|
sa.UniqueConstraint('name')
|
||||||
|
|
@ -30,6 +31,7 @@ def upgrade() -> None:
|
||||||
op.create_table('ufora_announcements',
|
op.create_table('ufora_announcements',
|
||||||
sa.Column('announcement_id', sa.Integer(), nullable=False),
|
sa.Column('announcement_id', sa.Integer(), nullable=False),
|
||||||
sa.Column('course_id', sa.Integer(), nullable=True),
|
sa.Column('course_id', sa.Integer(), nullable=True),
|
||||||
|
sa.Column('publication_date', sa.DateTime(timezone=True), nullable=True),
|
||||||
sa.ForeignKeyConstraint(['course_id'], ['ufora_courses.course_id'], ),
|
sa.ForeignKeyConstraint(['course_id'], ['ufora_courses.course_id'], ),
|
||||||
sa.PrimaryKeyConstraint('announcement_id')
|
sa.PrimaryKeyConstraint('announcement_id')
|
||||||
)
|
)
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
from sqlalchemy import select, delete
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from database.models import UforaCourse, UforaAnnouncement
|
||||||
|
|
||||||
|
|
||||||
|
async def get_courses_with_announcements(session: AsyncSession) -> list[UforaCourse]:
|
||||||
|
"""Get all courses where announcements are enabled"""
|
||||||
|
statement = select(UforaCourse).where(UforaCourse.log_announcements)
|
||||||
|
return (await session.execute(statement)).scalars().all()
|
||||||
|
|
||||||
|
|
||||||
|
async def create_new_announcement(
|
||||||
|
session: AsyncSession, announcement_id: int, course: UforaCourse, publication_date: datetime
|
||||||
|
) -> UforaAnnouncement:
|
||||||
|
"""Add a new announcement to the database"""
|
||||||
|
new_announcement = UforaAnnouncement(
|
||||||
|
announcement_id=announcement_id, course=course, publication_date=publication_date
|
||||||
|
)
|
||||||
|
session.add(new_announcement)
|
||||||
|
await session.commit()
|
||||||
|
return new_announcement
|
||||||
|
|
||||||
|
|
||||||
|
async def remove_old_announcements(session: AsyncSession):
|
||||||
|
"""Delete all announcements that are > 8 days old
|
||||||
|
The RSS feed only goes back 7 days, so all of these old announcements never have to
|
||||||
|
be checked again when checking if an announcement is fresh or not.
|
||||||
|
"""
|
||||||
|
limit = datetime.datetime.utcnow() - datetime.timedelta(days=8)
|
||||||
|
statement = delete(UforaAnnouncement).where(UforaAnnouncement.publication_date < limit)
|
||||||
|
await session.execute(statement)
|
||||||
|
await session.commit()
|
||||||
|
|
@ -7,7 +7,7 @@ from sqlalchemy.orm import sessionmaker
|
||||||
import settings
|
import settings
|
||||||
|
|
||||||
# Run local tests against SQLite instead of Postgres
|
# Run local tests against SQLite instead of Postgres
|
||||||
if settings.DB_TEST_SQLITE:
|
if settings.TESTING and settings.DB_TEST_SQLITE:
|
||||||
engine = create_async_engine(
|
engine = create_async_engine(
|
||||||
URL.create(
|
URL.create(
|
||||||
drivername="sqlite+aiosqlite",
|
drivername="sqlite+aiosqlite",
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from sqlalchemy import Column, Integer, Text, ForeignKey
|
from datetime import datetime
|
||||||
|
|
||||||
|
from sqlalchemy import Column, Integer, Text, ForeignKey, Boolean, DateTime
|
||||||
from sqlalchemy.orm import declarative_base, relationship
|
from sqlalchemy.orm import declarative_base, relationship
|
||||||
|
|
||||||
Base = declarative_base()
|
Base = declarative_base()
|
||||||
|
|
@ -15,12 +17,13 @@ class UforaCourse(Base):
|
||||||
name: str = Column(Text, nullable=False, unique=True)
|
name: str = Column(Text, nullable=False, unique=True)
|
||||||
code: str = Column(Text, nullable=False, unique=True)
|
code: str = Column(Text, nullable=False, unique=True)
|
||||||
year: int = Column(Integer, nullable=False)
|
year: int = Column(Integer, nullable=False)
|
||||||
|
log_announcements: bool = Column(Boolean, default=False, nullable=False)
|
||||||
|
|
||||||
announcements: list[UforaAnnouncement] = relationship(
|
announcements: list[UforaAnnouncement] = relationship(
|
||||||
"UforaAnnouncement", back_populates="course", cascade="all, delete-orphan"
|
"UforaAnnouncement", back_populates="course", cascade="all, delete-orphan", lazy="selectin"
|
||||||
)
|
)
|
||||||
aliases: list[UforaCourseAlias] = relationship(
|
aliases: list[UforaCourseAlias] = relationship(
|
||||||
"UforaCourseAlias", back_populates="course", cascade="all, delete-orphan"
|
"UforaCourseAlias", back_populates="course", cascade="all, delete-orphan", lazy="selectin"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -33,7 +36,7 @@ class UforaCourseAlias(Base):
|
||||||
alias: str = Column(Text, nullable=False, unique=True)
|
alias: str = Column(Text, nullable=False, unique=True)
|
||||||
course_id: int = Column(Integer, ForeignKey("ufora_courses.course_id"))
|
course_id: int = Column(Integer, ForeignKey("ufora_courses.course_id"))
|
||||||
|
|
||||||
course: UforaCourse = relationship("UforaCourse", back_populates="aliases", uselist=False)
|
course: UforaCourse = relationship("UforaCourse", back_populates="aliases", uselist=False, lazy="selectin")
|
||||||
|
|
||||||
|
|
||||||
class UforaAnnouncement(Base):
|
class UforaAnnouncement(Base):
|
||||||
|
|
@ -43,5 +46,6 @@ class UforaAnnouncement(Base):
|
||||||
|
|
||||||
announcement_id = Column(Integer, primary_key=True)
|
announcement_id = Column(Integer, primary_key=True)
|
||||||
course_id = Column(Integer, ForeignKey("ufora_courses.course_id"))
|
course_id = Column(Integer, ForeignKey("ufora_courses.course_id"))
|
||||||
|
publication_date: datetime = Column(DateTime(timezone=True))
|
||||||
|
|
||||||
course: UforaCourse = relationship("UforaCourse", back_populates="announcements", uselist=False)
|
course: UforaCourse = relationship("UforaCourse", back_populates="announcements", uselist=False, lazy="selectin")
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,59 @@
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
from discord.ext import commands, tasks
|
||||||
|
|
||||||
|
import settings
|
||||||
|
from database.crud.ufora_announcements import remove_old_announcements
|
||||||
|
from didier import Didier
|
||||||
|
from didier.data.embeds.ufora.announcements import fetch_ufora_announcements
|
||||||
|
|
||||||
|
|
||||||
|
class Tasks(commands.Cog):
|
||||||
|
"""Task loops that run periodically"""
|
||||||
|
|
||||||
|
client: Didier
|
||||||
|
|
||||||
|
def __init__(self, client: Didier): # pylint: disable=no-member
|
||||||
|
self.client = client
|
||||||
|
|
||||||
|
# Only pull announcements if a token was provided
|
||||||
|
if settings.UFORA_RSS_TOKEN is not None and settings.UFORA_ANNOUNCEMENTS_CHANNEL is not None:
|
||||||
|
self.pull_ufora_announcements.start()
|
||||||
|
self.remove_old_ufora_announcements.start()
|
||||||
|
|
||||||
|
@tasks.loop(minutes=10)
|
||||||
|
async def pull_ufora_announcements(self):
|
||||||
|
"""Task that checks for new Ufora announcements & logs them in a channel"""
|
||||||
|
# In theory this shouldn't happen but just to please Mypy
|
||||||
|
if settings.UFORA_RSS_TOKEN is None or settings.UFORA_ANNOUNCEMENTS_CHANNEL is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
announcements_channel = self.client.get_channel(settings.UFORA_ANNOUNCEMENTS_CHANNEL)
|
||||||
|
announcements = await fetch_ufora_announcements(self.client.db_session)
|
||||||
|
|
||||||
|
for announcement in announcements:
|
||||||
|
await announcements_channel.send(embed=announcement.to_embed())
|
||||||
|
|
||||||
|
@pull_ufora_announcements.before_loop
|
||||||
|
async def _before_ufora_announcements(self):
|
||||||
|
"""Don't try to get announcements if the bot isn't ready yet"""
|
||||||
|
await self.client.wait_until_ready()
|
||||||
|
|
||||||
|
@pull_ufora_announcements.error
|
||||||
|
async def _on_announcements_error(self, error: BaseException):
|
||||||
|
"""Error handler for the Ufora Announcements task"""
|
||||||
|
print("".join(traceback.format_exception(type(error), error, error.__traceback__)))
|
||||||
|
|
||||||
|
@tasks.loop(hours=24)
|
||||||
|
async def remove_old_ufora_announcements(self):
|
||||||
|
"""Remove all announcements that are over 1 week old, once per day"""
|
||||||
|
await remove_old_announcements(self.client.db_session)
|
||||||
|
|
||||||
|
@remove_old_ufora_announcements.before_loop
|
||||||
|
async def _before_remove_old_ufora_announcements(self):
|
||||||
|
await self.client.wait_until_ready()
|
||||||
|
|
||||||
|
|
||||||
|
async def setup(client: Didier):
|
||||||
|
"""Load the cog"""
|
||||||
|
await client.add_cog(Tasks(client))
|
||||||
|
|
@ -0,0 +1,153 @@
|
||||||
|
import re
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import discord
|
||||||
|
import feedparser
|
||||||
|
import pytz
|
||||||
|
from markdownify import markdownify as md
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
import settings
|
||||||
|
from database.crud import ufora_announcements as crud
|
||||||
|
from database.models import UforaCourse
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class UforaNotification:
|
||||||
|
"""A single notification from Ufora"""
|
||||||
|
|
||||||
|
content: dict
|
||||||
|
course: UforaCourse
|
||||||
|
notification_id: Optional[int] = None
|
||||||
|
course_id: Optional[int] = None
|
||||||
|
|
||||||
|
_view_url: str = field(init=False)
|
||||||
|
_title: str = field(init=False)
|
||||||
|
_description: str = field(init=False)
|
||||||
|
published_dt: datetime = field(init=False)
|
||||||
|
_published: str = field(init=False)
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
self._view_url = self._create_url()
|
||||||
|
self._title = self._clean_content(self.content["title"])
|
||||||
|
self._description = self._get_description()
|
||||||
|
self.published_dt = self._published_datetime()
|
||||||
|
self._published = self._get_published()
|
||||||
|
|
||||||
|
def to_embed(self) -> discord.Embed:
|
||||||
|
"""Turn the notification into an embed"""
|
||||||
|
embed = discord.Embed(colour=discord.Colour.from_rgb(30, 100, 200))
|
||||||
|
|
||||||
|
embed.set_author(name=self.course.name)
|
||||||
|
embed.title = self._title
|
||||||
|
embed.url = self._view_url
|
||||||
|
embed.description = self._description
|
||||||
|
embed.set_footer(text=self._published)
|
||||||
|
|
||||||
|
return embed
|
||||||
|
|
||||||
|
def get_id(self) -> int:
|
||||||
|
"""Parse the id out of the notification"""
|
||||||
|
return int(self.notification_id) if self.notification_id is not None else self.content["id"]
|
||||||
|
|
||||||
|
def _create_url(self):
|
||||||
|
if self.notification_id is None or self.course_id is None:
|
||||||
|
return self.content["link"]
|
||||||
|
|
||||||
|
return f"https://ufora.ugent.be/d2l/le/news/{self.course_id}/{self.notification_id}/view?ou={self.course_id}"
|
||||||
|
|
||||||
|
def _get_description(self):
|
||||||
|
desc = self._clean_content(self.content["summary"])
|
||||||
|
|
||||||
|
if len(desc) > 4096:
|
||||||
|
return desc[:4093] + "..."
|
||||||
|
|
||||||
|
return desc
|
||||||
|
|
||||||
|
def _clean_content(self, text: str):
|
||||||
|
# Escape *-characters because they mess up the layout
|
||||||
|
text = text.replace("*", "\\*")
|
||||||
|
return md(text)
|
||||||
|
|
||||||
|
def _published_datetime(self) -> datetime:
|
||||||
|
"""Get a datetime instance of the publication date"""
|
||||||
|
# Datetime is unable to parse the timezone because it's useless
|
||||||
|
# We will hereby cut it out and pray the timezone will always be UTC+0
|
||||||
|
published = self.content["published"].rsplit(" ", 1)[0]
|
||||||
|
time_string = "%a, %d %b %Y %H:%M:%S"
|
||||||
|
dt = datetime.strptime(published, time_string).astimezone(pytz.timezone("Europe/Brussels"))
|
||||||
|
|
||||||
|
# Apply timezone offset in a hacky way
|
||||||
|
offset = dt.utcoffset()
|
||||||
|
if offset is not None:
|
||||||
|
dt += offset
|
||||||
|
|
||||||
|
return dt
|
||||||
|
|
||||||
|
def _get_published(self) -> str:
|
||||||
|
"""Get a formatted string that represents when this announcement was published"""
|
||||||
|
# TODO
|
||||||
|
return "Placeholder :) TODO make the functions to format this"
|
||||||
|
|
||||||
|
|
||||||
|
def parse_ids(url: str) -> Optional[tuple[int, int]]:
|
||||||
|
"""Parse the notification & course id out of a notification url"""
|
||||||
|
match = re.search(r"\d+-\d+$", url)
|
||||||
|
|
||||||
|
if not match:
|
||||||
|
return None
|
||||||
|
|
||||||
|
spl = match[0].split("-")
|
||||||
|
return int(spl[0]), int(spl[1])
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_ufora_announcements(session: AsyncSession) -> list[UforaNotification]:
|
||||||
|
"""Fetch all new announcements"""
|
||||||
|
notifications: list[UforaNotification] = []
|
||||||
|
|
||||||
|
# No token provided, don't fetch announcements
|
||||||
|
if settings.UFORA_RSS_TOKEN is None:
|
||||||
|
return notifications
|
||||||
|
|
||||||
|
courses = await crud.get_courses_with_announcements(session)
|
||||||
|
|
||||||
|
for course in courses:
|
||||||
|
course_announcement_ids = list(map(lambda announcement: announcement.announcement_id, course.announcements))
|
||||||
|
|
||||||
|
course_url = (
|
||||||
|
f"https://ufora.ugent.be/d2l/le/news/rss/{course.course_id}/course?token={settings.UFORA_RSS_TOKEN}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get the updated feed
|
||||||
|
feed = feedparser.parse(course_url)
|
||||||
|
|
||||||
|
# Remove old notifications
|
||||||
|
fresh_feed: list[dict] = []
|
||||||
|
for entry in feed["entries"]:
|
||||||
|
parsed = parse_ids(entry["id"])
|
||||||
|
if parsed is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if parsed[0] not in course_announcement_ids:
|
||||||
|
fresh_feed.append(entry)
|
||||||
|
|
||||||
|
if fresh_feed:
|
||||||
|
for item in fresh_feed:
|
||||||
|
# Parse id's out
|
||||||
|
# Technically this can't happen but Mypy angry
|
||||||
|
parsed = parse_ids(item["id"])
|
||||||
|
|
||||||
|
if parsed is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Create a new notification
|
||||||
|
notification_id, course_id = parsed
|
||||||
|
notification = UforaNotification(item, course, notification_id, course_id)
|
||||||
|
notifications.append(notification)
|
||||||
|
|
||||||
|
# Create new db entry
|
||||||
|
await crud.create_new_announcement(session, notification_id, course, notification.published_dt)
|
||||||
|
|
||||||
|
return notifications
|
||||||
|
|
@ -6,23 +6,25 @@ plugins = [
|
||||||
"sqlalchemy.ext.mypy.plugin"
|
"sqlalchemy.ext.mypy.plugin"
|
||||||
]
|
]
|
||||||
[[tool.mypy.overrides]]
|
[[tool.mypy.overrides]]
|
||||||
module = "discord.*"
|
module = ["discord.*", "feedparser.*", "markdownify.*"]
|
||||||
ignore_missing_imports = true
|
ignore_missing_imports = true
|
||||||
|
|
||||||
[tool.pylint.master]
|
[tool.pylint.master]
|
||||||
disable = [
|
disable = [
|
||||||
"missing-module-docstring",
|
"missing-module-docstring",
|
||||||
"too-few-public-methods",
|
"too-few-public-methods",
|
||||||
"too-many-arguments"
|
"too-many-arguments",
|
||||||
|
"too-many-instance-attributes"
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.pylint.format]
|
[tool.pylint.format]
|
||||||
max-line-length = 120
|
max-line-length = 120
|
||||||
good-names = ["i"]
|
good-names = ["i", "dt"]
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
asyncio_mode = "auto"
|
asyncio_mode = "auto"
|
||||||
env = [
|
env = [
|
||||||
|
"TESTING = true",
|
||||||
"DB_NAME = didier_action",
|
"DB_NAME = didier_action",
|
||||||
"DB_USERNAME = postgres",
|
"DB_USERNAME = postgres",
|
||||||
"DB_HOST = localhost",
|
"DB_HOST = localhost",
|
||||||
|
|
|
||||||
|
|
@ -6,3 +6,4 @@ pytest==7.1.2
|
||||||
pytest-asyncio==0.18.3
|
pytest-asyncio==0.18.3
|
||||||
pytest-env==0.6.2
|
pytest-env==0.6.2
|
||||||
sqlalchemy2-stubs==0.0.2a23
|
sqlalchemy2-stubs==0.0.2a23
|
||||||
|
types-pytz==2021.3.8
|
||||||
|
|
@ -4,4 +4,7 @@ asyncpg==0.25.0
|
||||||
# Dev version of dpy
|
# Dev version of dpy
|
||||||
git+https://github.com/Rapptz/discord.py
|
git+https://github.com/Rapptz/discord.py
|
||||||
environs==9.5.0
|
environs==9.5.0
|
||||||
|
feedparser==6.0.10
|
||||||
|
markdownify==0.11.2
|
||||||
|
pytz==2022.1
|
||||||
sqlalchemy[asyncio]==1.4.37
|
sqlalchemy[asyncio]==1.4.37
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from environs import Env
|
from environs import Env
|
||||||
|
|
||||||
# Read the .env file (if present)
|
# Read the .env file (if present)
|
||||||
|
|
@ -7,6 +9,7 @@ env.read_env()
|
||||||
"""General config"""
|
"""General config"""
|
||||||
SANDBOX: bool = env.bool("SANDBOX", True)
|
SANDBOX: bool = env.bool("SANDBOX", True)
|
||||||
LOGFILE: str = env.str("LOGFILE", "didier.log")
|
LOGFILE: str = env.str("LOGFILE", "didier.log")
|
||||||
|
TESTING: bool = env.bool("TESTING", False)
|
||||||
|
|
||||||
"""Database"""
|
"""Database"""
|
||||||
DB_NAME: str = env.str("DB_NAME", "didier")
|
DB_NAME: str = env.str("DB_NAME", "didier")
|
||||||
|
|
@ -21,3 +24,7 @@ DISCORD_TOKEN: str = env.str("DISC_TOKEN")
|
||||||
DISCORD_READY_MESSAGE: str = env.str("DISC_READY_MESSAGE", "I'M READY I'M READY I'M READY")
|
DISCORD_READY_MESSAGE: str = env.str("DISC_READY_MESSAGE", "I'M READY I'M READY I'M READY")
|
||||||
DISCORD_STATUS_MESSAGE: str = env.str("DISC_STATUS_MESSAGE", "with your Didier Dinks.")
|
DISCORD_STATUS_MESSAGE: str = env.str("DISC_STATUS_MESSAGE", "with your Didier Dinks.")
|
||||||
DISCORD_TEST_GUILDS: list[int] = env.list("DISC_TEST_GUILDS", [], subcast=int)
|
DISCORD_TEST_GUILDS: list[int] = env.list("DISC_TEST_GUILDS", [], subcast=int)
|
||||||
|
UFORA_ANNOUNCEMENTS_CHANNEL: Optional[int] = env.int("UFORA_ANNOUNCEMENTS_CHANNEL", None)
|
||||||
|
|
||||||
|
"""API Keys"""
|
||||||
|
UFORA_RSS_TOKEN: Optional[str] = env.str("UFORA_RSS_TOKEN", None)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue