From 855f60727b08f68270fc62ce1e64c2b3477b4272 Mon Sep 17 00:00:00 2001
From: stijndcl
Date: Thu, 13 Oct 2022 22:31:45 +0200
Subject: [PATCH] Scraping & parsing for Steam

---
Review notes (commentary only: text between the "---" line and the first
"diff --git" line is not part of the commit or of any hunk).

* didier/data/scrapers/steam.py
  - _parse_xdg_open_url(): interpolate match.group(1), the captured app id.
    match.group() is the whole match ("/app/<id>/"), so the previous code
    built "steam://store//app/<id>/". The "match.group() is None" test could
    never be true for a successful match and is dropped.
  - get_steam_webpage_info(): compute xdg_open_url before shortening the URL.
    The shortened form (https://s.team/a/<id>/) no longer contains
    "/app/<id>/", so xdg_open_url was always None once shortening succeeded.
  - _shorten_url(): escape the dots of the host name in the regex.
* tests/conftest.py
  - _provide(): open fixtures with encoding="utf-8"; free_games.rss contains
    non-ASCII characters and the platform default encoding may differ.
* All code fixes are line-count neutral, so hunk headers and the diffstat are
  unchanged. The "index" blob ids still refer to the original commit.
* NOTE(review): this patch text had its line breaks and indentation collapsed;
  they were restored from the Python syntax and the hunk line counts. The
  placement of "embed.url = self.link" at function level (not inside the
  "else" branch) is inferred - confirm against the original commit.
* NOTE(review): the XML markup of the tests/test_data/free_games.rss hunk was
  lost in extraction and cannot be reconstructed from this text. The hunk is
  kept as found (its line count no longer matches the "+1,40" header) and
  must be regenerated from the original commit before applying.

 database/crud/free_games.py                   |   4 +-
 didier/data/embeds/free_games.py              |  68 ++++++++--
 didier/data/rss_feeds/free_games.py           |  12 +-
 didier/data/scrapers/common.py                |  58 +++++++++
 didier/data/scrapers/google.py                |   7 +-
 didier/data/scrapers/steam.py                 | 123 ++++++++++++++++++
 didier/didier.py                              |   7 +-
 tests/conftest.py                             |  23 +++-
 tests/test_data/free_games.rss                |  40 ++++++
 .../test_crud/test_free_games.py              |  17 +++
 10 files changed, 338 insertions(+), 21 deletions(-)
 create mode 100644 didier/data/scrapers/common.py
 create mode 100644 didier/data/scrapers/steam.py
 create mode 100644 tests/test_data/free_games.rss
 create mode 100644 tests/test_database/test_crud/test_free_games.py

diff --git a/database/crud/free_games.py b/database/crud/free_games.py
index 39b98b6..b2d835d 100644
--- a/database/crud/free_games.py
+++ b/database/crud/free_games.py
@@ -15,6 +15,6 @@ async def add_free_games(session: AsyncSession, game_ids: list[int]):
 
 async def filter_present_games(session: AsyncSession, game_ids: list[int]) -> list[int]:
     """Filter a list of game IDs down to the ones that aren't in the database yet"""
-    query = select(FreeGame.free_game_id).where(FreeGame.free_game_id.in_(game_ids))
-    matches: list[int] = (await session.execute(query)).scalars().all()
+    statement = select(FreeGame.free_game_id).where(FreeGame.free_game_id.in_(game_ids))
+    matches: list[int] = (await session.execute(statement)).scalars().all()
     return list(set(game_ids).difference(matches))
diff --git a/didier/data/embeds/free_games.py b/didier/data/embeds/free_games.py
index 7949b30..a6b8421 100644
--- a/didier/data/embeds/free_games.py
+++ b/didier/data/embeds/free_games.py
@@ -2,14 +2,20 @@ import html
 from typing import Optional
 
 import discord
+from aiohttp import ClientSession
 from overrides import overrides
 from pydantic import validator
 
 from didier.data.embeds.base import EmbedPydantic
+from didier.data.scrapers.common import GameStorePage
+from didier.data.scrapers.steam import get_steam_webpage_info
 from didier.utils.discord import colours
 
 __all__ = ["SEPARATOR", "FreeGameEmbed"]
 
+from didier.utils.discord.constants import Limits
+from didier.utils.types.string import abbreviate
+
 SEPARATOR = " • Free • "
 
 
@@ -45,23 +51,69 @@ class FreeGameEmbed(EmbedPydantic):
 
     dc_identifier: int
     link: str
-    summary: str = ""
     title: str
 
+    name: Optional[str] = None
+    store: Optional[str] = None
+
+    store_page: Optional[GameStorePage] = None
+
     @validator("title")
     def _clean_title(cls, value: str) -> str:
         return html.unescape(value)
 
+    async def update(self, http_session: ClientSession):
+        """Scrape the store page to fetch some information"""
+        self.name, self.store = self.title.split(SEPARATOR)
+
+        store = (self.store or "").lower()
+
+        if "steam" in store:
+            self.store_page = await get_steam_webpage_info(http_session, self.link)
+
+        if self.store_page is not None and self.store_page.url is not None:
+            self.link = self.store_page.url
+
     @overrides
     def to_embed(self, **kwargs) -> discord.Embed:
-        name, store = self.title.split(SEPARATOR)
-        embed = discord.Embed(title=name, url=self.link, description=self.summary or None)
-        embed.set_author(name=store)
+        embed = discord.Embed()
+        embed.set_author(name=self.store)
 
-        image, colour = _get_store_info(store)
-        if image is not None:
-            embed.set_thumbnail(url=image)
+        store_image, store_colour = _get_store_info(self.store)
+        if store_image is not None:
+            embed.set_thumbnail(url=store_image)
 
-        embed.colour = colour
+        # Populate with scraped info
+        if self.store_page is not None:
+            embed.title = self.store_page.title
+            embed.set_image(url=self.store_page.image)
+            embed.description = abbreviate(self.store_page.description, Limits.EMBED_DESCRIPTION_LENGTH)
+
+            if self.store_page.original_price is not None and self.store_page.discounted_price is not None:
+                if self.store_page.discount_percentage is not None:
+                    discount_pct_str = f" ({self.store_page.discount_percentage})"
+                else:
+                    discount_pct_str = ""
+
+                embed.add_field(
+                    name="Price",
+                    value=f"~~{self.store_page.original_price}~~ **{self.store_page.discounted_price}** "
+                    f"{discount_pct_str}",
+                    inline=False,
+                )
+
+            if self.store_page.xdg_open_url is not None:
+                embed.add_field(name="Open in browser", value=f"[{self.link}]({self.link})")
+
+                embed.add_field(
+                    name="Open in app", value=f"[{self.store_page.xdg_open_url}]({self.store_page.xdg_open_url})"
+                )
+        else:
+            embed.title = self.name
+            embed.add_field(name="Open in browser", value=f"[{self.link}]({self.link})")
+
+        embed.url = self.link
+
+        embed.colour = store_colour
 
         return embed
diff --git a/didier/data/rss_feeds/free_games.py b/didier/data/rss_feeds/free_games.py
index abc8753..6aa576b 100644
--- a/didier/data/rss_feeds/free_games.py
+++ b/didier/data/rss_feeds/free_games.py
@@ -5,7 +5,7 @@ import feedparser
 from aiohttp import ClientSession
 from sqlalchemy.ext.asyncio import AsyncSession
 
-from database.crud.free_games import add_free_games, filter_present_games
+from database.crud.free_games import filter_present_games
 from didier.data.embeds.free_games import SEPARATOR, FreeGameEmbed
 
 logger = logging.getLogger(__name__)
@@ -40,6 +40,12 @@ async def fetch_free_games(http_session: ClientSession, database_session: AsyncS
     filtered_ids = await filter_present_games(database_session, game_ids)
 
     # Insert new games into the database
-    await add_free_games(database_session, filtered_ids)
+    # await add_free_games(database_session, filtered_ids) TODO uncomment
 
-    return list(filter(lambda x: x.dc_identifier in filtered_ids, games))
+    games = list(filter(lambda x: x.dc_identifier in filtered_ids, games))
+
+    # Look up additional info
+    for game in games:
+        await game.update(http_session)
+
+    return games
diff --git a/didier/data/scrapers/common.py b/didier/data/scrapers/common.py
new file mode 100644
index 0000000..1fa973e
--- /dev/null
+++ b/didier/data/scrapers/common.py
@@ -0,0 +1,58 @@
+from dataclasses import dataclass
+from typing import Optional, cast
+
+from bs4 import BeautifulSoup, Tag
+
+__all__ = ["GameStorePage", "parse_open_graph_tags"]
+
+
+@dataclass
+class GameStorePage:
+    """Dataclass for information on a game's store page"""
+
+    description: str
+    image: str
+    title: str
+    xdg_open_url: Optional[str] = None
+    url: Optional[str] = None
+    discount_expiry: Optional[int] = None
+    discounted_price: Optional[str] = None
+    original_price: Optional[str] = None
+    discount_percentage: Optional[str] = None
+
+
+def parse_open_graph_tags(soup: BeautifulSoup) -> Optional[GameStorePage]:
+    """Parse Open Graph Protocol tags out of a webpage
+
+    If any of the required tags were not found, this returns None
+    """
+    head = soup.find("head")
+
+    if head is None:
+        return None
+
+    head = cast(Tag, head)
+
+    title_tag = head.find("meta", property="og:title")
+    if title_tag is None:
+        return None
+
+    description_tag = head.find("meta", property="og:description")
+    if description_tag is None:
+        return None
+
+    image_tag = head.find("meta", property="og:image")
+    if image_tag is None:
+        return None
+
+    url_tag = head.find("meta", property="og:url")
+    if url_tag is None:
+        url = None
+    else:
+        url = str(url_tag["content"])  # type: ignore
+
+    description = str(description_tag["content"])  # type: ignore
+    image = str(image_tag["content"])  # type: ignore
+    title = str(title_tag["content"])  # type: ignore
+
+    return GameStorePage(title=title, description=description, url=url, image=image)
diff --git a/didier/data/scrapers/google.py b/didier/data/scrapers/google.py
index 9ebb003..389e9ae 100644
--- a/didier/data/scrapers/google.py
+++ b/didier/data/scrapers/google.py
@@ -74,15 +74,10 @@ def get_search_results(bs: BeautifulSoup) -> list[str]:
 
 async def google_search(http_client: ClientSession, query: str):
     """Get the first 10 Google search results"""
-    headers = {
-        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) "
-        "Chrome/83.0.4103.97 Safari/537.36"
-    }
-
     query = urlencode({"q": query})
 
     # Request 20 results in case of duplicates, bad matches, ...
-    async with http_client.get(f"https://www.google.com/search?{query}&num=20&hl=en", headers=headers) as response:
+    async with http_client.get(f"https://www.google.com/search?{query}&num=20&hl=en") as response:
         # Something went wrong
         if response.status != http.HTTPStatus.OK:
             return SearchData(query, response.status)
diff --git a/didier/data/scrapers/steam.py b/didier/data/scrapers/steam.py
new file mode 100644
index 0000000..2099cd1
--- /dev/null
+++ b/didier/data/scrapers/steam.py
@@ -0,0 +1,123 @@
+import re
+from dataclasses import dataclass
+from http import HTTPStatus
+from typing import Optional, cast
+
+from aiohttp import ClientSession
+from bs4 import BeautifulSoup, Tag
+
+from didier.data.scrapers.common import GameStorePage, parse_open_graph_tags
+
+__all__ = ["get_steam_webpage_info"]
+
+
+@dataclass
+class _PriceInfo:
+    # These are strings because they aren't used as floats,
+    # and this avoids possible rounding errors
+    original_price: str
+    discounted_price: str
+    discount_percentage: Optional[str]
+
+    def __post_init__(self):
+        """Fix the price formats"""
+        self.original_price = "€" + self.original_price.replace(",--", ",00").removesuffix("€")
+        self.discounted_price = "€" + self.discounted_price.replace(",--", ",00").removesuffix("€")
+        if self.discounted_price == "€0,00":
+            self.discounted_price = "Free"
+
+
+def _shorten_url(url: str) -> str:
+    match = re.search(r"https://store\.steampowered\.com/app/(\d+)/", url)
+    if match is None or not match.groups():
+        return url
+
+    return f"https://s.team/a/{match.groups()[0]}/"
+
+
+def _parse_xdg_open_url(url: str) -> Optional[str]:
+    match = re.search(r"/app/(\d+)/", url)
+    if match is None:
+        return None
+
+    return f"steam://store/{match.group(1)}"
+
+
+def _get_steam_discounts(soup: BeautifulSoup) -> Optional[_PriceInfo]:
+    discount_wrapper_tag = soup.find("div", class_="discount_block")
+    if discount_wrapper_tag is None:
+        return None
+
+    discount_wrapper_tag = cast(Tag, discount_wrapper_tag)
+
+    # Parsing the original (non-discounted) price
+    original_price_tag = discount_wrapper_tag.find("div", class_="discount_original_price")
+    if original_price_tag is None:
+        return None
+
+    original_price_tag = cast(Tag, original_price_tag)
+    original_price = original_price_tag.text
+    if original_price is None:
+        return None
+
+    # Parsing the discounted price
+    discounted_price_tag = discount_wrapper_tag.find("div", class_="discount_final_price")
+    if discounted_price_tag is None:
+        return None
+
+    discounted_price_tag = cast(Tag, discounted_price_tag)
+    discounted_price = discounted_price_tag.text
+    if discounted_price is None:
+        return None
+
+    percentage_tag = discount_wrapper_tag.find("div", class_="discount_pct")
+    if percentage_tag is None:
+        percentage = None
+    else:
+        percentage = percentage_tag.text
+
+    return _PriceInfo(original_price=original_price, discounted_price=discounted_price, discount_percentage=percentage)
+
+
+def _clean_title(title: str) -> str:
+    match = re.search(r"Save [\d,]+% on (.*) on Steam", title)
+    if match is None or not match.groups():
+        return title
+
+    return match.groups()[0]
+
+
+async def get_steam_webpage_info(http_session: ClientSession, url: str) -> Optional[GameStorePage]:
+    """Scrape a Steam page"""
+    # If not currently on a Steam page, follow a redirect chain until you are
+    if not url.startswith("https://store.steampowered.com/"):
+        async with http_session.head(url, allow_redirects=True) as response:
+            url = str(response.url)
+
+    async with http_session.get(url) as response:
+        if response.status != HTTPStatus.OK:
+            return None
+
+        page = await response.text()
+
+        soup = BeautifulSoup(page, "html.parser")
+
+        page_tags = parse_open_graph_tags(soup)
+        if page_tags is None:
+            return None
+
+        if page_tags.url is None:
+            page_tags.url = url
+
+        page_tags.title = _clean_title(page_tags.title)
+        page_tags.xdg_open_url = _parse_xdg_open_url(page_tags.url)
+        page_tags.url = _shorten_url(page_tags.url)
+
+        price_info = _get_steam_discounts(soup)
+
+        if price_info is not None:
+            page_tags.original_price = price_info.original_price
+            page_tags.discounted_price = price_info.discounted_price
+            page_tags.discount_percentage = price_info.discount_percentage
+
+        return page_tags
diff --git a/didier/didier.py b/didier/didier.py
index 06db727..ed1dd5b 100644
--- a/didier/didier.py
+++ b/didier/didier.py
@@ -90,7 +90,12 @@ class Didier(commands.Bot):
             await self.database_caches.initialize_caches(session)
 
         # Create aiohttp session
-        self.http_session = ClientSession()
+        self.http_session = ClientSession(
+            headers={
+                "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
+                "Chrome/105.0.0.0 Safari/537.36"
+            }
+        )
 
         # Load extensions
         await self._load_initial_extensions()
diff --git a/tests/conftest.py b/tests/conftest.py
index c218524..2a1a4a2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,5 +1,7 @@
 import asyncio
-from typing import AsyncGenerator, Generator
+import json
+import pathlib
+from typing import AsyncGenerator, Generator, Union
 from unittest.mock import MagicMock
 
 import pytest
@@ -66,3 +68,22 @@ def mock_client() -> Didier:
     mock_client.user = mock_user
 
     return mock_client
+
+
+"""Data providers"""
+
+
+def _provide(name: str) -> Union[dict, str]:
+    location = pathlib.Path(__file__).parent / "test_data" / name
+
+    with open(location, "r", encoding="utf-8") as fp:
+        if name.endswith(".json"):
+            return json.load(fp)
+
+        return fp.read()
+
+
+@pytest.fixture
+def free_games_response() -> str:
+    """Fixture to get an example response from the free games RSS feed"""
+    return _provide("free_games.rss")
diff --git a/tests/test_data/free_games.rss b/tests/test_data/free_games.rss
new file mode 100644
index 0000000..8df12a4
--- /dev/null
+++ b/tests/test_data/free_games.rss
@@ -0,0 +1,40 @@
+
+
+
+ pepeizq's deals
+ Follow the latest deals for PC games from legit stores such as Steam, Humble, Fanatical, Gamesplanet, GOG and more
+ https://pepeizqdeals.com
+ Thu, 13 Oct 2022 17:11:24 +0000
+
+ <![CDATA[Minion Masters – Torment • Free • Steam]]>
+
+ Thu, 13 Oct 2022 18:08:41 +0100
+ pepeizq
+ 55623
+ 2022-10-13 18:08:59
+ 2022-10-13 18:08:41
+ 12
+
+
+ <![CDATA[Darkwood + ToeJam & Earl: Back in the Groove! • Free • Epic Games Store]]>
+
+ Thu, 13 Oct 2022 17:03:59 +0100
+ pepeizq
+ 55616
+ 2022-10-13 17:04:17
+ 2022-10-13 17:03:59
+ 12
+
+
+ <![CDATA[Rebel Inc: Escalation – Sand & Secrets • Free • Steam]]>
+
+ Tue, 20 Sep 2022 18:08:52 +0100
+ pepeizq
+ 54874
+ 2022-09-20 18:09:03
+ 2022-09-20 18:08:52
+ 12
+
+
diff --git a/tests/test_database/test_crud/test_free_games.py b/tests/test_database/test_crud/test_free_games.py
new file mode 100644
index 0000000..d05df15
--- /dev/null
+++ b/tests/test_database/test_crud/test_free_games.py
@@ -0,0 +1,17 @@
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from database.crud import free_games as crud
+from database.schemas import FreeGame
+
+
+async def test_add_games(postgres: AsyncSession):
+    """Test adding new games"""
+    statement = select(FreeGame)
+    games = (await postgres.execute(statement)).scalars().all()
+    assert not games
+
+    await crud.add_free_games(postgres, [1, 2, 3, 4])
+
+    games = (await postgres.execute(statement)).scalars().all()
+    assert len(games) == 4