mirror of https://github.com/stijndcl/didier
				
				
				
			
						commit
						1c249adb46
					
				|  | @ -0,0 +1,30 @@ | |||
| """Add free games | ||||
| 
 | ||||
| Revision ID: 9fb84b4d9f0b | ||||
| Revises: 11388e39bb90 | ||||
| Create Date: 2022-10-13 19:17:58.032182 | ||||
| 
 | ||||
| """ | ||||
| import sqlalchemy as sa | ||||
| 
 | ||||
| from alembic import op | ||||
| 
 | ||||
| # revision identifiers, used by Alembic. | ||||
| revision = "9fb84b4d9f0b" | ||||
| down_revision = "11388e39bb90" | ||||
| branch_labels = None | ||||
| depends_on = None | ||||
| 
 | ||||
| 
 | ||||
| def upgrade() -> None: | ||||
|     # ### commands auto generated by Alembic - please adjust! ### | ||||
|     op.create_table( | ||||
|         "free_games", sa.Column("free_game_id", sa.Integer(), nullable=False), sa.PrimaryKeyConstraint("free_game_id") | ||||
|     ) | ||||
|     # ### end Alembic commands ### | ||||
| 
 | ||||
| 
 | ||||
| def downgrade() -> None: | ||||
|     # ### commands auto generated by Alembic - please adjust! ### | ||||
|     op.drop_table("free_games") | ||||
|     # ### end Alembic commands ### | ||||
|  | @ -0,0 +1,20 @@ | |||
| from sqlalchemy import select | ||||
| from sqlalchemy.ext.asyncio import AsyncSession | ||||
| 
 | ||||
| from database.schemas import FreeGame | ||||
| 
 | ||||
| __all__ = ["add_free_games", "filter_present_games"] | ||||
| 
 | ||||
| 
 | ||||
| async def add_free_games(session: AsyncSession, game_ids: list[int]): | ||||
|     """Bulk-add a list of IDs into the database""" | ||||
|     games = [FreeGame(free_game_id=game_id) for game_id in game_ids] | ||||
|     session.add_all(games) | ||||
|     await session.commit() | ||||
| 
 | ||||
| 
 | ||||
| async def filter_present_games(session: AsyncSession, game_ids: list[int]) -> list[int]: | ||||
|     """Filter a list of game IDs down to the ones that aren't in the database yet""" | ||||
|     statement = select(FreeGame.free_game_id).where(FreeGame.free_game_id.in_(game_ids)) | ||||
|     matches: list[int] = (await session.execute(statement)).scalars().all() | ||||
|     return list(set(game_ids).difference(matches)) | ||||
|  | @ -33,6 +33,7 @@ __all__ = [ | |||
|     "DadJoke", | ||||
|     "Deadline", | ||||
|     "EasterEgg", | ||||
|     "FreeGame", | ||||
|     "GitHubLink", | ||||
|     "Link", | ||||
|     "MemeTemplate", | ||||
|  | @ -174,6 +175,14 @@ class EasterEgg(Base): | |||
|     startswith: bool = Column(Boolean, nullable=False, server_default="1") | ||||
| 
 | ||||
| 
 | ||||
| class FreeGame(Base): | ||||
|     """A temporarily free game""" | ||||
| 
 | ||||
|     __tablename__ = "free_games" | ||||
| 
 | ||||
|     free_game_id: int = Column(Integer, primary_key=True) | ||||
| 
 | ||||
| 
 | ||||
| class GitHubLink(Base): | ||||
|     """A user's GitHub link""" | ||||
| 
 | ||||
|  |  | |||
|  | @ -18,7 +18,8 @@ from didier.data.embeds.schedules import ( | |||
|     get_schedule_for_day, | ||||
|     parse_schedule_from_content, | ||||
| ) | ||||
| from didier.data.embeds.ufora.announcements import fetch_ufora_announcements | ||||
| from didier.data.rss_feeds.free_games import fetch_free_games | ||||
| from didier.data.rss_feeds.ufora import fetch_ufora_announcements | ||||
| from didier.decorators.tasks import timed_task | ||||
| from didier.utils.discord.checks import is_owner | ||||
| from didier.utils.types.datetime import LOCAL_TIMEZONE, tz_aware_now | ||||
|  | @ -48,6 +49,7 @@ class Tasks(commands.Cog): | |||
| 
 | ||||
|         self._tasks = { | ||||
|             "birthdays": self.check_birthdays, | ||||
|             "free_games": self.pull_free_games, | ||||
|             "schedules": self.pull_schedules, | ||||
|             "reminders": self.reminders, | ||||
|             "ufora": self.pull_ufora_announcements, | ||||
|  | @ -61,12 +63,17 @@ class Tasks(commands.Cog): | |||
|         if settings.BIRTHDAY_ANNOUNCEMENT_CHANNEL is not None: | ||||
|             self.check_birthdays.start() | ||||
| 
 | ||||
|         # Only pull free gmaes if a channel was provided | ||||
|         if settings.FREE_GAMES_CHANNEL is not None: | ||||
|             self.pull_free_games.start() | ||||
| 
 | ||||
|         # Only pull announcements if a token was provided | ||||
|         if settings.UFORA_RSS_TOKEN is not None and settings.UFORA_ANNOUNCEMENTS_CHANNEL is not None: | ||||
|             self.pull_ufora_announcements.start() | ||||
|             self.remove_old_ufora_announcements.start() | ||||
| 
 | ||||
|         # Start other tasks | ||||
|         self.init_schedules.start() | ||||
|         self.reminders.start() | ||||
|         self.reset_wordle_word.start() | ||||
|         self.pull_schedules.start() | ||||
|  | @ -128,6 +135,36 @@ class Tasks(commands.Cog): | |||
|     async def _before_check_birthdays(self): | ||||
|         await self.client.wait_until_ready() | ||||
| 
 | ||||
|     @tasks.loop(count=1) | ||||
|     async def init_schedules(self, **kwargs): | ||||
|         """Tasks that loads the schedules in memory on startup""" | ||||
|         _ = kwargs | ||||
|         await self.client.load_schedules() | ||||
| 
 | ||||
|     @init_schedules.before_loop | ||||
|     async def _before_init_schedules(self): | ||||
|         await self.client.wait_until_ready() | ||||
| 
 | ||||
|     @tasks.loop(minutes=15) | ||||
|     async def pull_free_games(self, **kwargs): | ||||
|         """Task that checks for free games occasionally""" | ||||
|         _ = kwargs | ||||
| 
 | ||||
|         # No channel to send the embeds to | ||||
|         if settings.FREE_GAMES_CHANNEL is None: | ||||
|             return | ||||
| 
 | ||||
|         async with self.client.postgres_session as session: | ||||
|             games = await fetch_free_games(self.client.http_session, session) | ||||
|             channel = self.client.get_channel(settings.FREE_GAMES_CHANNEL) | ||||
| 
 | ||||
|             for game in games: | ||||
|                 await channel.send(embed=game.to_embed()) | ||||
| 
 | ||||
|     @pull_free_games.before_loop | ||||
|     async def _before_free_games(self): | ||||
|         await self.client.wait_until_ready() | ||||
| 
 | ||||
|     @tasks.loop(time=DAILY_RESET_TIME) | ||||
|     @timed_task(enums.TaskType.SCHEDULES) | ||||
|     async def pull_schedules(self, **kwargs): | ||||
|  | @ -166,6 +203,10 @@ class Tasks(commands.Cog): | |||
|         # Only replace cached version if all schedules succeeded | ||||
|         self.client.schedules = new_schedules | ||||
| 
 | ||||
|     @pull_schedules.before_loop | ||||
|     async def _before_pull_schedules(self): | ||||
|         await self.client.wait_until_ready() | ||||
| 
 | ||||
|     @tasks.loop(minutes=10) | ||||
|     @timed_task(enums.TaskType.UFORA_ANNOUNCEMENTS) | ||||
|     async def pull_ufora_announcements(self, **kwargs): | ||||
|  |  | |||
|  | @ -0,0 +1,122 @@ | |||
| import html | ||||
| from typing import Optional | ||||
| 
 | ||||
| import discord | ||||
| from aiohttp import ClientSession | ||||
| from overrides import overrides | ||||
| from pydantic import validator | ||||
| 
 | ||||
| from didier.data.embeds.base import EmbedPydantic | ||||
| from didier.data.scrapers.common import GameStorePage | ||||
| from didier.data.scrapers.steam import get_steam_webpage_info | ||||
| from didier.utils.discord import colours | ||||
| 
 | ||||
| __all__ = ["SEPARATOR", "FreeGameEmbed"] | ||||
| 
 | ||||
| from didier.utils.discord.constants import Limits | ||||
| from didier.utils.types.string import abbreviate | ||||
| 
 | ||||
| SEPARATOR = " • Free • " | ||||
| 
 | ||||
| 
 | ||||
| def _get_store_info(store: str) -> tuple[Optional[str], discord.Colour]: | ||||
|     """Get the image url for a given store""" | ||||
|     store = store.lower() | ||||
| 
 | ||||
|     if "epic" in store: | ||||
|         return ( | ||||
|             "https://cdn2.unrealengine.com/" | ||||
|             "Unreal+Engine%2Feg-logo-filled-1255x1272-0eb9d144a0f981d1cbaaa1eb957de7a3207b31bb.png", | ||||
|             colours.epic_games_white(), | ||||
|         ) | ||||
| 
 | ||||
|     if "gog" in store: | ||||
|         return ( | ||||
|             "https://upload.wikimedia.org/wikipedia/commons/thumb/2/2e/GOG.com_logo.svg/1679px-GOG.com_logo.svg.png", | ||||
|             colours.gog_purple(), | ||||
|         ) | ||||
| 
 | ||||
|     if "steam" in store: | ||||
|         return ( | ||||
|             "https://upload.wikimedia.org/wikipedia/commons/thumb/8/83/" | ||||
|             "Steam_icon_logo.svg/2048px-Steam_icon_logo.svg.png", | ||||
|             colours.steam_blue(), | ||||
|         ) | ||||
| 
 | ||||
|     return None, discord.Colour.random() | ||||
| 
 | ||||
| 
 | ||||
| class FreeGameEmbed(EmbedPydantic): | ||||
|     """Embed for free games""" | ||||
| 
 | ||||
|     dc_identifier: int | ||||
|     link: str | ||||
|     title: str | ||||
| 
 | ||||
|     name: Optional[str] = None | ||||
|     store: Optional[str] = None | ||||
| 
 | ||||
|     store_page: Optional[GameStorePage] = None | ||||
| 
 | ||||
|     @validator("title") | ||||
|     def _clean_title(cls, value: str) -> str: | ||||
|         return html.unescape(value) | ||||
| 
 | ||||
|     async def update(self, http_session: ClientSession): | ||||
|         """Scrape the store page to fetch some information""" | ||||
|         self.name, self.store = self.title.split(SEPARATOR) | ||||
| 
 | ||||
|         store = (self.store or "").lower() | ||||
| 
 | ||||
|         if "steam" in store: | ||||
|             self.store_page = await get_steam_webpage_info(http_session, self.link) | ||||
|         elif "epic" in store: | ||||
|             self.link = "https://store.epicgames.com/free-games" | ||||
| 
 | ||||
|         if self.store_page is not None and self.store_page.url is not None: | ||||
|             self.link = self.store_page.url | ||||
| 
 | ||||
|     @overrides | ||||
|     def to_embed(self, **kwargs) -> discord.Embed: | ||||
|         embed = discord.Embed() | ||||
|         embed.set_author(name=self.store) | ||||
| 
 | ||||
|         store_image, store_colour = _get_store_info(self.store or "") | ||||
|         if store_image is not None: | ||||
|             embed.set_thumbnail(url=store_image) | ||||
| 
 | ||||
|         # Populate with scraped info | ||||
|         if self.store_page is not None: | ||||
|             embed.title = self.store_page.title | ||||
|             embed.set_image(url=self.store_page.image) | ||||
|             embed.description = abbreviate(self.store_page.description, Limits.EMBED_DESCRIPTION_LENGTH) | ||||
| 
 | ||||
|             if self.store_page.original_price is not None and self.store_page.discounted_price is not None: | ||||
|                 if self.store_page.discount_percentage is not None: | ||||
|                     discount_pct_str = f" ({self.store_page.discount_percentage})" | ||||
|                 else: | ||||
|                     discount_pct_str = "" | ||||
| 
 | ||||
|                 embed.add_field( | ||||
|                     name="Price", | ||||
|                     value=f"~~{self.store_page.original_price}~~ **{self.store_page.discounted_price}** " | ||||
|                     f"{discount_pct_str}", | ||||
|                     inline=False, | ||||
|                 ) | ||||
| 
 | ||||
|             embed.add_field(name="Open in browser", value=f"[{self.link}]({self.link})") | ||||
| 
 | ||||
|             if self.store_page.xdg_open_url is not None: | ||||
| 
 | ||||
|                 embed.add_field( | ||||
|                     name="Open in app", value=f"[{self.store_page.xdg_open_url}]({self.store_page.xdg_open_url})" | ||||
|                 ) | ||||
|         else: | ||||
|             embed.title = self.name | ||||
|             embed.add_field(name="Open in browser", value=f"[{self.link}]({self.link})") | ||||
| 
 | ||||
|         embed.url = self.link | ||||
| 
 | ||||
|         embed.colour = store_colour | ||||
| 
 | ||||
|         return embed | ||||
|  | @ -1,18 +1,11 @@ | |||
| import re | ||||
| from dataclasses import dataclass, field | ||||
| from datetime import datetime | ||||
| from typing import Optional | ||||
| from zoneinfo import ZoneInfo | ||||
| 
 | ||||
| import async_timeout | ||||
| import discord | ||||
| import feedparser | ||||
| from aiohttp import ClientSession | ||||
| from markdownify import markdownify as md | ||||
| from sqlalchemy.ext.asyncio import AsyncSession | ||||
| 
 | ||||
| import settings | ||||
| from database.crud import ufora_announcements as crud | ||||
| from database.schemas import UforaCourse | ||||
| from didier.data.embeds.base import EmbedBaseModel | ||||
| from didier.utils.discord.colours import ghent_university_blue | ||||
|  | @ -20,8 +13,6 @@ from didier.utils.types.datetime import LOCAL_TIMEZONE, int_to_weekday | |||
| from didier.utils.types.string import leading | ||||
| 
 | ||||
| __all__ = [ | ||||
|     "fetch_ufora_announcements", | ||||
|     "parse_ids", | ||||
|     "UforaNotification", | ||||
| ] | ||||
| 
 | ||||
|  | @ -107,68 +98,3 @@ class UforaNotification(EmbedBaseModel): | |||
|             ":" | ||||
|             f"{leading('0', str(self.published_dt.second))}" | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| def parse_ids(url: str) -> Optional[tuple[int, int]]: | ||||
|     """Parse the notification & course id out of a notification url""" | ||||
|     match = re.search(r"\d+-\d+$", url) | ||||
| 
 | ||||
|     if not match: | ||||
|         return None | ||||
| 
 | ||||
|     spl = match[0].split("-") | ||||
|     return int(spl[0]), int(spl[1]) | ||||
| 
 | ||||
| 
 | ||||
| async def fetch_ufora_announcements( | ||||
|     http_session: ClientSession, database_session: AsyncSession | ||||
| ) -> list[UforaNotification]: | ||||
|     """Fetch all new announcements""" | ||||
|     notifications: list[UforaNotification] = [] | ||||
| 
 | ||||
|     # No token provided, don't fetch announcements | ||||
|     if settings.UFORA_RSS_TOKEN is None: | ||||
|         return notifications | ||||
| 
 | ||||
|     courses = await crud.get_courses_with_announcements(database_session) | ||||
| 
 | ||||
|     for course in courses: | ||||
|         course_announcement_ids = list(map(lambda announcement: announcement.announcement_id, course.announcements)) | ||||
| 
 | ||||
|         course_url = ( | ||||
|             f"https://ufora.ugent.be/d2l/le/news/rss/{course.course_id}/course?token={settings.UFORA_RSS_TOKEN}" | ||||
|         ) | ||||
| 
 | ||||
|         # Get the updated feed | ||||
|         with async_timeout.timeout(10): | ||||
|             async with http_session.get(course_url) as response: | ||||
|                 feed = feedparser.parse(await response.text()) | ||||
| 
 | ||||
|         # Remove old notifications | ||||
|         fresh_feed: list[dict] = [] | ||||
|         for entry in feed["entries"]: | ||||
|             parsed = parse_ids(entry["id"]) | ||||
|             if parsed is None: | ||||
|                 continue | ||||
| 
 | ||||
|             if parsed[0] not in course_announcement_ids: | ||||
|                 fresh_feed.append(entry) | ||||
| 
 | ||||
|         if fresh_feed: | ||||
|             for item in fresh_feed: | ||||
|                 # Parse id's out | ||||
|                 # Technically this can't happen but Mypy angry | ||||
|                 parsed = parse_ids(item["id"]) | ||||
| 
 | ||||
|                 if parsed is None: | ||||
|                     continue | ||||
| 
 | ||||
|                 # Create a new notification | ||||
|                 notification_id, course_id = parsed | ||||
|                 notification = UforaNotification(item, course, notification_id, course_id) | ||||
|                 notifications.append(notification) | ||||
| 
 | ||||
|                 # Create new db entry | ||||
|                 await crud.create_new_announcement(database_session, notification_id, course, notification.published_dt) | ||||
| 
 | ||||
|     return notifications | ||||
|  |  | |||
|  | @ -0,0 +1,51 @@ | |||
| import logging | ||||
| from http import HTTPStatus | ||||
| 
 | ||||
| import feedparser | ||||
| from aiohttp import ClientSession | ||||
| from sqlalchemy.ext.asyncio import AsyncSession | ||||
| 
 | ||||
| from database.crud.free_games import add_free_games, filter_present_games | ||||
| from didier.data.embeds.free_games import SEPARATOR, FreeGameEmbed | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| __all__ = ["fetch_free_games"] | ||||
| 
 | ||||
| 
 | ||||
| async def fetch_free_games(http_session: ClientSession, database_session: AsyncSession) -> list[FreeGameEmbed]: | ||||
|     """Get a fresh list of free games""" | ||||
|     url = "https://pepeizqdeals.com/?call_custom_simple_rss=1&csrp_cat=12" | ||||
|     async with http_session.get(url) as response: | ||||
|         if response.status != HTTPStatus.OK: | ||||
|             logger.error("Free games GET-request failed with status code %d." % response.status) | ||||
|             return [] | ||||
| 
 | ||||
|         feed = feedparser.parse(await response.text()) | ||||
| 
 | ||||
|     games: list[FreeGameEmbed] = [] | ||||
|     game_ids: list[int] = [] | ||||
| 
 | ||||
|     for entry in feed["entries"]: | ||||
|         # Game isn't free | ||||
|         if SEPARATOR not in entry["title"]: | ||||
|             continue | ||||
| 
 | ||||
|         game = FreeGameEmbed.parse_obj(entry) | ||||
|         games.append(game) | ||||
|         game_ids.append(game.dc_identifier) | ||||
| 
 | ||||
|     # Filter out games that we already know | ||||
|     filtered_ids = await filter_present_games(database_session, game_ids) | ||||
| 
 | ||||
|     # Insert new games into the database | ||||
|     await add_free_games(database_session, filtered_ids) | ||||
| 
 | ||||
|     games = list(filter(lambda x: x.dc_identifier in filtered_ids, games)) | ||||
| 
 | ||||
|     # Look up additional info | ||||
|     for game in games: | ||||
|         await game.update(http_session) | ||||
| 
 | ||||
|     return games | ||||
|  | @ -0,0 +1,78 @@ | |||
| import re | ||||
| from typing import Optional | ||||
| 
 | ||||
| import async_timeout | ||||
| import feedparser | ||||
| from aiohttp import ClientSession | ||||
| from sqlalchemy.ext.asyncio import AsyncSession | ||||
| 
 | ||||
| import settings | ||||
| from database.crud import ufora_announcements as crud | ||||
| from didier.data.embeds.ufora.announcements import UforaNotification | ||||
| 
 | ||||
| __all__ = ["parse_ids", "fetch_ufora_announcements"] | ||||
| 
 | ||||
| 
 | ||||
| def parse_ids(url: str) -> Optional[tuple[int, int]]: | ||||
|     """Parse the notification & course id out of a notification url""" | ||||
|     match = re.search(r"\d+-\d+$", url) | ||||
| 
 | ||||
|     if not match: | ||||
|         return None | ||||
| 
 | ||||
|     spl = match[0].split("-") | ||||
|     return int(spl[0]), int(spl[1]) | ||||
| 
 | ||||
| 
 | ||||
| async def fetch_ufora_announcements( | ||||
|     http_session: ClientSession, database_session: AsyncSession | ||||
| ) -> list[UforaNotification]: | ||||
|     """Fetch all new announcements""" | ||||
|     notifications: list[UforaNotification] = [] | ||||
| 
 | ||||
|     # No token provided, don't fetch announcements | ||||
|     if settings.UFORA_RSS_TOKEN is None: | ||||
|         return notifications | ||||
| 
 | ||||
|     courses = await crud.get_courses_with_announcements(database_session) | ||||
| 
 | ||||
|     for course in courses: | ||||
|         course_announcement_ids = list(map(lambda announcement: announcement.announcement_id, course.announcements)) | ||||
| 
 | ||||
|         course_url = ( | ||||
|             f"https://ufora.ugent.be/d2l/le/news/rss/{course.course_id}/course?token={settings.UFORA_RSS_TOKEN}" | ||||
|         ) | ||||
| 
 | ||||
|         # Get the updated feed | ||||
|         with async_timeout.timeout(10): | ||||
|             async with http_session.get(course_url) as response: | ||||
|                 feed = feedparser.parse(await response.text()) | ||||
| 
 | ||||
|         # Remove old notifications | ||||
|         fresh_feed: list[dict] = [] | ||||
|         for entry in feed["entries"]: | ||||
|             parsed = parse_ids(entry["id"]) | ||||
|             if parsed is None: | ||||
|                 continue | ||||
| 
 | ||||
|             if parsed[0] not in course_announcement_ids: | ||||
|                 fresh_feed.append(entry) | ||||
| 
 | ||||
|         if fresh_feed: | ||||
|             for item in fresh_feed: | ||||
|                 # Parse id's out | ||||
|                 # Technically this can't happen but Mypy angry | ||||
|                 parsed = parse_ids(item["id"]) | ||||
| 
 | ||||
|                 if parsed is None: | ||||
|                     continue | ||||
| 
 | ||||
|                 # Create a new notification | ||||
|                 notification_id, course_id = parsed | ||||
|                 notification = UforaNotification(item, course, notification_id, course_id) | ||||
|                 notifications.append(notification) | ||||
| 
 | ||||
|                 # Create new db entry | ||||
|                 await crud.create_new_announcement(database_session, notification_id, course, notification.published_dt) | ||||
| 
 | ||||
|     return notifications | ||||
|  | @ -0,0 +1,58 @@ | |||
| from dataclasses import dataclass | ||||
| from typing import Optional, cast | ||||
| 
 | ||||
| from bs4 import BeautifulSoup, Tag | ||||
| 
 | ||||
| __all__ = ["GameStorePage", "parse_open_graph_tags"] | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class GameStorePage: | ||||
|     """Dataclass for information on a game's store page""" | ||||
| 
 | ||||
|     description: str | ||||
|     image: str | ||||
|     title: str | ||||
|     xdg_open_url: Optional[str] = None | ||||
|     url: Optional[str] = None | ||||
|     discount_expiry: Optional[int] = None  # TODO | ||||
|     discounted_price: Optional[str] = None | ||||
|     original_price: Optional[str] = None | ||||
|     discount_percentage: Optional[str] = None | ||||
| 
 | ||||
| 
 | ||||
| def parse_open_graph_tags(soup: BeautifulSoup) -> Optional[GameStorePage]: | ||||
|     """Parse Open Graph Protocol tags out of a webpage | ||||
| 
 | ||||
|     If any of the required tags were not found, this returns None | ||||
|     """ | ||||
|     head = soup.find("head") | ||||
| 
 | ||||
|     if head is None: | ||||
|         return None | ||||
| 
 | ||||
|     head = cast(Tag, head) | ||||
| 
 | ||||
|     title_tag = head.find("meta", property="og:title") | ||||
|     if title_tag is None: | ||||
|         return None | ||||
| 
 | ||||
|     description_tag = head.find("meta", property="og:description") | ||||
|     if description_tag is None: | ||||
|         return None | ||||
| 
 | ||||
|     image_tag = head.find("meta", property="og:image") | ||||
|     if image_tag is None: | ||||
|         return None | ||||
| 
 | ||||
|     url_tag = head.find("meta", property="og:url") | ||||
|     if url_tag is None: | ||||
|         url = None | ||||
|     else: | ||||
|         url = str(url_tag["content"])  # type: ignore | ||||
| 
 | ||||
|     description = str(description_tag["content"])  # type: ignore | ||||
|     image = str(image_tag["content"])  # type: ignore | ||||
|     title = str(title_tag["content"])  # type: ignore | ||||
| 
 | ||||
|     return GameStorePage(title=title, description=description, url=url, image=image) | ||||
|  | @ -74,15 +74,10 @@ def get_search_results(bs: BeautifulSoup) -> list[str]: | |||
| 
 | ||||
| async def google_search(http_client: ClientSession, query: str): | ||||
|     """Get the first 10 Google search results""" | ||||
|     headers = { | ||||
|         "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) " | ||||
|         "Chrome/83.0.4103.97 Safari/537.36" | ||||
|     } | ||||
| 
 | ||||
|     query = urlencode({"q": query}) | ||||
| 
 | ||||
|     # Request 20 results in case of duplicates, bad matches, ... | ||||
|     async with http_client.get(f"https://www.google.com/search?{query}&num=20&hl=en", headers=headers) as response: | ||||
|     async with http_client.get(f"https://www.google.com/search?{query}&num=20&hl=en") as response: | ||||
|         # Something went wrong | ||||
|         if response.status != http.HTTPStatus.OK: | ||||
|             return SearchData(query, response.status) | ||||
|  |  | |||
|  | @ -0,0 +1,123 @@ | |||
| import re | ||||
| from dataclasses import dataclass | ||||
| from http import HTTPStatus | ||||
| from typing import Optional, cast | ||||
| 
 | ||||
| from aiohttp import ClientSession | ||||
| from bs4 import BeautifulSoup, Tag | ||||
| 
 | ||||
| from didier.data.scrapers.common import GameStorePage, parse_open_graph_tags | ||||
| 
 | ||||
| __all__ = ["get_steam_webpage_info"] | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class _PriceInfo: | ||||
|     # These are strings because they aren't used as floats, | ||||
|     # and this avoids possible rounding errors | ||||
|     original_price: str | ||||
|     discounted_price: str | ||||
|     discount_percentage: Optional[str] | ||||
| 
 | ||||
|     def __post_init__(self): | ||||
|         """Fix the price formats""" | ||||
|         self.original_price = "€" + self.original_price.replace(",--", ",00").removesuffix("€") | ||||
|         self.discounted_price = "€" + self.discounted_price.replace(",--", ",00").removesuffix("€") | ||||
|         if self.discounted_price == "€0,00": | ||||
|             self.discounted_price = "Free" | ||||
| 
 | ||||
| 
 | ||||
| def _shorten_url(url: str) -> str: | ||||
|     match = re.search(r"https://store.steampowered.com/app/(\d+)/", url) | ||||
|     if match is None or not match.groups(): | ||||
|         return url | ||||
| 
 | ||||
|     return f"https://s.team/a/{match.groups()[0]}" | ||||
| 
 | ||||
| 
 | ||||
| def _parse_xdg_open_url(url: str) -> Optional[str]: | ||||
|     match = re.search(r"/app/(\d+)/", url) | ||||
|     if match is None or not match.groups(): | ||||
|         return None | ||||
| 
 | ||||
|     return f"steam://store/{match.groups()[0]}" | ||||
| 
 | ||||
| 
 | ||||
| def _get_steam_discounts(soup: BeautifulSoup) -> Optional[_PriceInfo]: | ||||
|     discount_wrapper_tag = soup.find("div", class_="discount_block") | ||||
|     if discount_wrapper_tag is None: | ||||
|         return None | ||||
| 
 | ||||
|     discount_wrapper_tag = cast(Tag, discount_wrapper_tag) | ||||
| 
 | ||||
|     # Parsing the original (non-discounted) price | ||||
|     original_price_tag = discount_wrapper_tag.find("div", class_="discount_original_price") | ||||
|     if original_price_tag is None: | ||||
|         return None | ||||
| 
 | ||||
|     original_price_tag = cast(Tag, original_price_tag) | ||||
|     original_price = original_price_tag.text | ||||
|     if original_price is None: | ||||
|         return None | ||||
| 
 | ||||
|     # Parsing the discounted price | ||||
|     discounted_price_tag = discount_wrapper_tag.find("div", class_="discount_final_price") | ||||
|     if discounted_price_tag is None: | ||||
|         return None | ||||
| 
 | ||||
|     discounted_price_tag = cast(Tag, discounted_price_tag) | ||||
|     discounted_price = discounted_price_tag.text | ||||
|     if discounted_price is None: | ||||
|         return None | ||||
| 
 | ||||
|     percentage_tag = discount_wrapper_tag.find("div", class_="discount_pct") | ||||
|     if percentage_tag is None: | ||||
|         percentage = None | ||||
|     else: | ||||
|         percentage = percentage_tag.text | ||||
| 
 | ||||
|     return _PriceInfo(original_price=original_price, discounted_price=discounted_price, discount_percentage=percentage) | ||||
| 
 | ||||
| 
 | ||||
| def _clean_title(title: str) -> str: | ||||
|     match = re.search(r"Save [\d,]+% on (.*) on Steam", title) | ||||
|     if match is None or not match.groups(): | ||||
|         return title | ||||
| 
 | ||||
|     return match.groups()[0] | ||||
| 
 | ||||
| 
 | ||||
| async def get_steam_webpage_info(http_session: ClientSession, url: str) -> Optional[GameStorePage]: | ||||
|     """Scrape a Steam page""" | ||||
|     # If not currently on a Steam page, follow a redirect chain until you are | ||||
|     if not url.startswith("https://store.steampowered.com/"): | ||||
|         async with http_session.head(url, allow_redirects=True) as response: | ||||
|             url = str(response.url) | ||||
| 
 | ||||
|     async with http_session.get(url) as response: | ||||
|         if response.status != HTTPStatus.OK: | ||||
|             return None | ||||
| 
 | ||||
|         page = await response.text() | ||||
| 
 | ||||
|     soup = BeautifulSoup(page, "html.parser") | ||||
| 
 | ||||
|     page_tags = parse_open_graph_tags(soup) | ||||
|     if page_tags is None: | ||||
|         return None | ||||
| 
 | ||||
|     if page_tags.url is None: | ||||
|         page_tags.url = url | ||||
| 
 | ||||
|     page_tags.title = _clean_title(page_tags.title) | ||||
|     page_tags.xdg_open_url = _parse_xdg_open_url(page_tags.url) | ||||
|     page_tags.url = _shorten_url(page_tags.url) | ||||
| 
 | ||||
|     price_info = _get_steam_discounts(soup) | ||||
| 
 | ||||
|     if price_info is not None: | ||||
|         page_tags.original_price = price_info.original_price | ||||
|         page_tags.discounted_price = price_info.discounted_price | ||||
|         page_tags.discount_percentage = price_info.discount_percentage | ||||
| 
 | ||||
|     return page_tags | ||||
|  | @ -78,9 +78,6 @@ class Didier(commands.Bot): | |||
|         # Create directories that are ignored on GitHub | ||||
|         self._create_ignored_directories() | ||||
| 
 | ||||
|         # Load schedules | ||||
|         await self.load_schedules() | ||||
| 
 | ||||
|         # Load the Wordle dictionary | ||||
|         self._load_wordle_words() | ||||
| 
 | ||||
|  | @ -90,7 +87,12 @@ class Didier(commands.Bot): | |||
|             await self.database_caches.initialize_caches(session) | ||||
| 
 | ||||
|         # Create aiohttp session | ||||
|         self.http_session = ClientSession() | ||||
|         self.http_session = ClientSession( | ||||
|             headers={ | ||||
|                 "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " | ||||
|                 "Chrome/105.0.0.0 Safari/537.36" | ||||
|             } | ||||
|         ) | ||||
| 
 | ||||
|         # Load extensions | ||||
|         await self._load_initial_extensions() | ||||
|  |  | |||
|  | @ -1,15 +1,21 @@ | |||
| import discord | ||||
| 
 | ||||
| __all__ = [ | ||||
|     "epic_games_white", | ||||
|     "error_red", | ||||
|     "github_white", | ||||
|     "ghent_university_blue", | ||||
|     "ghent_university_yellow", | ||||
|     "google_blue", | ||||
|     "steam_blue", | ||||
|     "urban_dictionary_green", | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| def epic_games_white() -> discord.Colour: | ||||
|     return discord.Colour.from_rgb(255, 255, 255) | ||||
| 
 | ||||
| 
 | ||||
| def error_red() -> discord.Colour: | ||||
|     return discord.Colour.red() | ||||
| 
 | ||||
|  | @ -26,9 +32,17 @@ def ghent_university_yellow() -> discord.Colour: | |||
|     return discord.Colour.from_rgb(255, 210, 0) | ||||
| 
 | ||||
| 
 | ||||
| def gog_purple() -> discord.Colour: | ||||
|     return discord.Colour.purple() | ||||
| 
 | ||||
| 
 | ||||
| def google_blue() -> discord.Colour: | ||||
|     return discord.Colour.from_rgb(66, 133, 244) | ||||
| 
 | ||||
| 
 | ||||
| def steam_blue() -> discord.Colour: | ||||
|     return discord.Colour.from_rgb(102, 192, 244) | ||||
| 
 | ||||
| 
 | ||||
| def urban_dictionary_green() -> discord.Colour: | ||||
|     return discord.Colour.from_rgb(220, 255, 0) | ||||
|  |  | |||
|  | @ -27,6 +27,8 @@ __all__ = [ | |||
|     "DISCORD_TEST_GUILDS", | ||||
|     "DISCORD_BOOS_REACT", | ||||
|     "DISCORD_CUSTOM_COMMAND_PREFIX", | ||||
|     "ERRORS_CHANNEL", | ||||
|     "FREE_GAMES_CHANNEL", | ||||
|     "UFORA_ANNOUNCEMENTS_CHANNEL", | ||||
|     "UFORA_RSS_TOKEN", | ||||
|     "IMGFLIP_NAME", | ||||
|  | @ -65,6 +67,7 @@ DISCORD_BOOS_REACT: str = env.str("DISCORD_BOOS_REACT", "<:boos:6296037858402631 | |||
| DISCORD_CUSTOM_COMMAND_PREFIX: str = env.str("DISCORD_CUSTOM_COMMAND_PREFIX", "?") | ||||
| BIRTHDAY_ANNOUNCEMENT_CHANNEL: Optional[int] = env.int("BIRTHDAY_ANNOUNCEMENT_CHANNEL", None) | ||||
| ERRORS_CHANNEL: Optional[int] = env.int("ERRORS_CHANNEL", None) | ||||
| FREE_GAMES_CHANNEL: Optional[int] = env.int("FREE_GAMES_CHANNEL", None) | ||||
| UFORA_ANNOUNCEMENTS_CHANNEL: Optional[int] = env.int("UFORA_ANNOUNCEMENTS_CHANNEL", None) | ||||
| 
 | ||||
| """Discord Role ID's""" | ||||
|  |  | |||
|  | @ -1,5 +1,7 @@ | |||
| import asyncio | ||||
| from typing import AsyncGenerator, Generator | ||||
| import json | ||||
| import pathlib | ||||
| from typing import AsyncGenerator, Generator, Union | ||||
| from unittest.mock import MagicMock | ||||
| 
 | ||||
| import pytest | ||||
|  | @ -66,3 +68,22 @@ def mock_client() -> Didier: | |||
|     mock_client.user = mock_user | ||||
| 
 | ||||
|     return mock_client | ||||
| 
 | ||||
| 
 | ||||
| """Data providers""" | ||||
| 
 | ||||
| 
 | ||||
| def _provide(name: str) -> Union[dict, str]: | ||||
|     location = pathlib.Path(__file__).parent / "test_data" / name | ||||
| 
 | ||||
|     with open(location, "r") as fp: | ||||
|         if name.endswith(".json"): | ||||
|             return json.load(fp) | ||||
| 
 | ||||
|         return fp.read() | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture | ||||
| def free_games_response() -> str: | ||||
|     """Fixture to get an example response from the free games RSS feed""" | ||||
|     return _provide("free_games.rss") | ||||
|  |  | |||
|  | @ -0,0 +1,40 @@ | |||
| <?xml version="1.0" encoding="UTF-8"?> | ||||
| 	<rss version="2.0" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:wfw="http://wellformedweb.org/CommentAPI/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:sy="http://purl.org/rss/1.0/modules/syndication/" xmlns:slash="http://purl.org/rss/1.0/modules/slash/" xmlns:media="http://search.yahoo.com/mrss/" xmlns:wp="http://wordpress.org/export/1.2/" xmlns:excerpt="http://wordpress.org/export/1.2/excerpt/"  > | ||||
| 		<channel> | ||||
| 		<title>pepeizq's deals</title> | ||||
|         <description>Follow the latest deals for PC games from legit stores such as Steam, Humble, Fanatical, Gamesplanet, GOG and more</description> | ||||
|         <link>https://pepeizqdeals.com</link> | ||||
| 		<lastBuildDate>Thu, 13 Oct 2022 17:11:24 +0000</lastBuildDate> | ||||
| 					<item> | ||||
| 							<title><![CDATA[Minion Masters – Torment • Free • Steam]]></title> | ||||
| 							<link><![CDATA[https://pepeizqdeals.com/55623/minion-masters-torment-free-steam/]]></link> | ||||
| 							<pubDate>Thu, 13 Oct 2022 18:08:41 +0100</pubDate> | ||||
| 							<dc:creator>pepeizq</dc:creator> | ||||
| 							<dc:identifier>55623</dc:identifier> | ||||
| 							<dc:modified>2022-10-13 18:08:59</dc:modified> | ||||
| 							<dc:created unix="1665684521">2022-10-13 18:08:41</dc:created> | ||||
| 							<guid isPermaLink="true"><![CDATA[https://pepeizqdeals.com/55623/minion-masters-torment-free-steam/]]></guid><category>12</category> | ||||
| 							<description><![CDATA[]]></description><content:encoded><![CDATA[]]></content:encoded><enclosure url="https://pepeizqdeals.com/wp-content/uploads/2022/10/imagenWeb286-19-8-510-en.webp"/><media:content url="https://pepeizqdeals.com/wp-content/uploads/2022/10/imagenWeb286-19-8-510-en.webp" height="150" width="150" type="image/jpeg"/> | ||||
| 					</item> | ||||
| 					<item> | ||||
| 							<title><![CDATA[Darkwood + ToeJam & Earl: Back in the Groove! • Free • Epic Games Store]]></title> | ||||
| 							<link><![CDATA[https://pepeizqdeals.com/55616/darkwood-toejam-earl-back-in-the-groove-free-epic-games-store/]]></link> | ||||
| 							<pubDate>Thu, 13 Oct 2022 17:03:59 +0100</pubDate> | ||||
| 							<dc:creator>pepeizq</dc:creator> | ||||
| 							<dc:identifier>55616</dc:identifier> | ||||
| 							<dc:modified>2022-10-13 17:04:17</dc:modified> | ||||
| 							<dc:created unix="1665680639">2022-10-13 17:03:59</dc:created> | ||||
| 							<guid isPermaLink="true"><![CDATA[https://pepeizqdeals.com/55616/darkwood-toejam-earl-back-in-the-groove-free-epic-games-store/]]></guid><category>12</category> | ||||
| 							<description><![CDATA[]]></description><content:encoded><![CDATA[]]></content:encoded><enclosure url="https://pepeizqdeals.com/wp-content/uploads/2022/10/imagenWeb286-18-3-139-en.webp"/><media:content url="https://pepeizqdeals.com/wp-content/uploads/2022/10/imagenWeb286-18-3-139-en.webp" height="150" width="150" type="image/jpeg"/> | ||||
| 					</item> | ||||
| 					<item> | ||||
| 							<title><![CDATA[Rebel Inc: Escalation – Sand & Secrets • Free • Steam]]></title> | ||||
| 							<link><![CDATA[https://pepeizqdeals.com/54874/rebel-inc-escalation-sand-secrets-free-steam/]]></link> | ||||
| 							<pubDate>Tue, 20 Sep 2022 18:08:52 +0100</pubDate> | ||||
| 							<dc:creator>pepeizq</dc:creator> | ||||
| 							<dc:identifier>54874</dc:identifier> | ||||
| 							<dc:modified>2022-09-20 18:09:03</dc:modified> | ||||
| 							<dc:created unix="1663697332">2022-09-20 18:08:52</dc:created> | ||||
| 							<guid isPermaLink="true"><![CDATA[https://pepeizqdeals.com/54874/rebel-inc-escalation-sand-secrets-free-steam/]]></guid><category>12</category> | ||||
| 							<description><![CDATA[]]></description><content:encoded><![CDATA[]]></content:encoded><enclosure url=""/><media:content url="" height="" width="" type=""/> | ||||
| 					</item></channel></rss><!-- end of xml string --> | ||||
|  | @ -0,0 +1,17 @@ | |||
| from sqlalchemy import select | ||||
| from sqlalchemy.ext.asyncio import AsyncSession | ||||
| 
 | ||||
| from database.crud import free_games as crud | ||||
| from database.schemas import FreeGame | ||||
| 
 | ||||
| 
 | ||||
| async def test_add_games(postgres: AsyncSession): | ||||
|     """Test adding new games""" | ||||
|     statement = select(FreeGame) | ||||
|     games = (await postgres.execute(statement)).scalars().all() | ||||
|     assert not games | ||||
| 
 | ||||
|     await crud.add_free_games(postgres, [1, 2, 3, 4]) | ||||
| 
 | ||||
|     games = (await postgres.execute(statement)).scalars().all() | ||||
|     assert len(games) == 4 | ||||
		Loading…
	
		Reference in New Issue