Scraping & parsing for Steam

stijndcl 2022-10-13 22:31:45 +02:00
parent deefeb1106
commit 855f60727b
10 changed files with 338 additions and 21 deletions

View File

@ -15,6 +15,6 @@ async def add_free_games(session: AsyncSession, game_ids: list[int]):
async def filter_present_games(session: AsyncSession, game_ids: list[int]) -> list[int]:
    """Filter a list of game IDs down to the ones that aren't in the database yet

    The result is de-duplicated and in no particular order, as it is built
    from a set difference.
    """
    # Nothing to look up, so skip the database round trip
    if not game_ids:
        return []

    # Select only the IDs that are already stored; the lookup runs exactly once
    statement = select(FreeGame.free_game_id).where(FreeGame.free_game_id.in_(game_ids))
    matches: list[int] = (await session.execute(statement)).scalars().all()

    # Whatever was requested but not found is new
    return list(set(game_ids).difference(matches))

View File

@ -2,14 +2,20 @@ import html
from typing import Optional
import discord
from aiohttp import ClientSession
from overrides import overrides
from pydantic import validator
from import EmbedPydantic
from import GameStorePage
from import get_steam_webpage_info
from didier.utils.discord import colours
__all__ = ["SEPARATOR", "FreeGameEmbed"]
from didier.utils.discord.constants import Limits
from didier.utils.types.string import abbreviate
SEPARATOR = " • Free • "
@ -45,23 +51,69 @@ class FreeGameEmbed(EmbedPydantic):
dc_identifier: int
link: str
summary: str = ""
title: str
name: Optional[str] = None
store: Optional[str] = None
store_page: Optional[GameStorePage] = None
def _clean_title(cls, value: str) -> str:
return html.unescape(value)
async def update(self, http_session: ClientSession):
"""Scrape the store page to fetch some information""", = self.title.split(SEPARATOR)
store = ( or "").lower()
if "steam" in store:
self.store_page = await get_steam_webpage_info(http_session,
if self.store_page is not None and self.store_page.url is not None: = self.store_page.url
def to_embed(self, **kwargs) -> discord.Embed:
name, store = self.title.split(SEPARATOR)
embed = discord.Embed(title=name,, description=self.summary or None)
embed = discord.Embed()
image, colour = _get_store_info(store)
if image is not None:
store_image, store_colour = _get_store_info(
if store_image is not None:
embed.colour = colour
# Populate with scraped info
if self.store_page is not None:
embed.title = self.store_page.title
embed.description = abbreviate(self.store_page.description, Limits.EMBED_DESCRIPTION_LENGTH)
if self.store_page.original_price is not None and self.store_page.discounted_price is not None:
if self.store_page.discount_percentage is not None:
discount_pct_str = f" ({self.store_page.discount_percentage})"
discount_pct_str = ""
value=f"~~{self.store_page.original_price}~~ **{self.store_page.discounted_price}** "
if self.store_page.xdg_open_url is not None:
embed.add_field(name="Open in browser", value=f"[{}]({})")
name="Open in app", value=f"[{self.store_page.xdg_open_url}]({self.store_page.xdg_open_url})"
embed.title =
embed.add_field(name="Open in browser", value=f"[{}]({})")
embed.url =
embed.colour = store_colour
return embed

View File

@ -5,7 +5,7 @@ import feedparser
from aiohttp import ClientSession
from sqlalchemy.ext.asyncio import AsyncSession
from database.crud.free_games import add_free_games, filter_present_games
from database.crud.free_games import filter_present_games
from import SEPARATOR, FreeGameEmbed
logger = logging.getLogger(__name__)
@ -40,6 +40,12 @@ async def fetch_free_games(http_session: ClientSession, database_session: AsyncS
filtered_ids = await filter_present_games(database_session, game_ids)
# Insert new games into the database
await add_free_games(database_session, filtered_ids)
# await add_free_games(database_session, filtered_ids) TODO uncomment
return list(filter(lambda x: x.dc_identifier in filtered_ids, games))
games = list(filter(lambda x: x.dc_identifier in filtered_ids, games))
# Look up additional info
for game in games:
await game.update(http_session)
return games

View File

@ -0,0 +1,58 @@
from dataclasses import dataclass
from typing import Optional, cast
from bs4 import BeautifulSoup, Tag
__all__ = ["GameStorePage", "parse_open_graph_tags"]
@dataclass
class GameStorePage:
    """Dataclass for information on a game's store page"""

    # Required fields: these have no default and must always be supplied
    description: str
    image: str
    title: str

    # Optional extras: these default to None until something fills them in
    xdg_open_url: Optional[str] = None
    url: Optional[str] = None
    discount_expiry: Optional[int] = None
    discounted_price: Optional[str] = None
    original_price: Optional[str] = None
    discount_percentage: Optional[str] = None
def parse_open_graph_tags(soup: BeautifulSoup) -> Optional[GameStorePage]:
    """Parse Open Graph Protocol tags out of a webpage

    If any of the required tags were not found, this returns None
    """
    head = soup.find("head")
    if head is None:
        return None

    head = cast(Tag, head)

    # Required tags: title, description and image
    title_tag = head.find("meta", property="og:title")
    if title_tag is None:
        return None

    description_tag = head.find("meta", property="og:description")
    if description_tag is None:
        return None

    image_tag = head.find("meta", property="og:image")
    if image_tag is None:
        return None

    # The URL is optional: only read its content when the tag exists
    url_tag = head.find("meta", property="og:url")
    if url_tag is None:
        url = None
    else:
        url = str(url_tag["content"])  # type: ignore

    description = str(description_tag["content"])  # type: ignore
    image = str(image_tag["content"])  # type: ignore
    title = str(title_tag["content"])  # type: ignore

    return GameStorePage(title=title, description=description, url=url, image=image)

View File

@ -74,15 +74,10 @@ def get_search_results(bs: BeautifulSoup) -> list[str]:
async def google_search(http_client: ClientSession, query: str):
"""Get the first 10 Google search results"""
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/83.0.4103.97 Safari/537.36"
query = urlencode({"q": query})
# Request 20 results in case of duplicates, bad matches, ...
async with http_client.get(f"{query}&num=20&hl=en", headers=headers) as response:
async with http_client.get(f"{query}&num=20&hl=en") as response:
# Something went wrong
if response.status != http.HTTPStatus.OK:
return SearchData(query, response.status)

View File

@ -0,0 +1,123 @@
import re
from dataclasses import dataclass
from http import HTTPStatus
from typing import Optional, cast
from aiohttp import ClientSession
from bs4 import BeautifulSoup, Tag
from import GameStorePage, parse_open_graph_tags
__all__ = ["get_steam_webpage_info"]
class _PriceInfo:
# These are strings because they aren't used as floats,
# and this avoids possible rounding errors
original_price: str
discounted_price: str
discount_percentage: Optional[str]
def __post_init__(self):
"""Fix the price formats"""
self.original_price = "" + self.original_price.replace(",--", ",00").removesuffix("")
self.discounted_price = "" + self.discounted_price.replace(",--", ",00").removesuffix("")
if self.discounted_price == "€0,00":
self.discounted_price = "Free"
def _shorten_url(url: str) -> str:
match ="\d+)/", url)
if match is None or not match.groups():
return url
return f"{match.groups()[0]}/"
def _parse_xdg_open_url(url: str) -> Optional[str]:
match ="/app/(\d+)/", url)
if match is None or is None:
return None
return f"steam://store/{}"
def _find_div_text(parent: Tag, class_name: str) -> Optional[str]:
    """Return the text of the first ``div`` with the given class inside ``parent``, or None"""
    tag = parent.find("div", class_=class_name)
    if tag is None:
        return None

    return tag.text


def _get_steam_discounts(soup: BeautifulSoup) -> Optional[_PriceInfo]:
    """Parse the discount block of a store page

    Returns None if the block, the original price, or the discounted price
    could not be found. The discount percentage is optional.
    """
    discount_wrapper_tag = soup.find("div", class_="discount_block")
    if discount_wrapper_tag is None:
        return None

    discount_wrapper_tag = cast(Tag, discount_wrapper_tag)

    # Parsing the original (non-discounted) price
    original_price = _find_div_text(discount_wrapper_tag, "discount_original_price")
    if original_price is None:
        return None

    # Parsing the discounted price
    discounted_price = _find_div_text(discount_wrapper_tag, "discount_final_price")
    if discounted_price is None:
        return None

    # The percentage may be absent; it then stays None instead of failing on a missing tag
    percentage = _find_div_text(discount_wrapper_tag, "discount_pct")

    return _PriceInfo(original_price=original_price, discounted_price=discounted_price, discount_percentage=percentage)
def _clean_title(title: str) -> str:
match ="Save [\d,]+% on (.*) on Steam", title)
if match is None or not match.groups():
return title
return match.groups()[0]
async def get_steam_webpage_info(http_session: ClientSession, url: str) -> Optional[GameStorePage]:
    """Scrape a Steam page

    Returns None if the page doesn't respond with 200 OK, or if the required
    Open Graph tags could not be parsed from it.
    """
    # If not currently on a Steam page, follow a redirect chain until you are
    # NOTE(review): the prefix is assumed to be the Steam store origin - confirm.
    # An empty prefix would match every URL and never follow any redirect.
    if not url.startswith("https://store.steampowered.com/"):
        async with http_session.head(url, allow_redirects=True) as response:
            url = str(response.url)

    async with http_session.get(url) as response:
        if response.status != HTTPStatus.OK:
            return None

        page = await response.text()

    soup = BeautifulSoup(page, "html.parser")

    page_tags = parse_open_graph_tags(soup)
    if page_tags is None:
        return None

    # Fall back to the requested URL when the page doesn't advertise its own
    if page_tags.url is None:
        page_tags.url = url

    page_tags.title = _clean_title(page_tags.title)
    page_tags.url = _shorten_url(page_tags.url)
    page_tags.xdg_open_url = _parse_xdg_open_url(page_tags.url)

    # Price information is optional: the fields stay None when no discount was found
    price_info = _get_steam_discounts(soup)
    if price_info is not None:
        page_tags.original_price = price_info.original_price
        page_tags.discounted_price = price_info.discounted_price
        page_tags.discount_percentage = price_info.discount_percentage

    return page_tags

View File

@ -90,7 +90,12 @@ class Didier(commands.Bot):
await self.database_caches.initialize_caches(session)
# Create aiohttp session
self.http_session = ClientSession()
self.http_session = ClientSession(
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/ Safari/537.36"
# Load extensions
await self._load_initial_extensions()

View File

@ -1,5 +1,7 @@
import asyncio
from typing import AsyncGenerator, Generator
import json
import pathlib
from typing import AsyncGenerator, Generator, Union
from unittest.mock import MagicMock
import pytest
@ -66,3 +68,22 @@ def mock_client() -> Didier:
mock_client.user = mock_user
return mock_client
"""Data providers"""
def _provide(name: str) -> Union[dict, str]:
    """Load a file from the ``test_data`` directory next to this module

    Files ending in ``.json`` are parsed; anything else is returned as raw text
    """
    location = pathlib.Path(__file__).parent / "test_data" / name

    # Explicit encoding so the result doesn't depend on the platform default
    with open(location, "r", encoding="utf-8") as fp:
        if name.endswith(".json"):
            return json.load(fp)

        return fp.read()
@pytest.fixture
def free_games_response() -> str:
    """Fixture to get an example response from the free games RSS feed"""
    return _provide("free_games.rss")

View File

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:content="" xmlns:wfw="" xmlns:dc="" xmlns:atom="" xmlns:sy="" xmlns:slash="" xmlns:media="" xmlns:wp="" xmlns:excerpt="" >
<title>pepeizq&#039;s deals</title>
<description>Follow the latest deals for PC games from legit stores such as Steam, Humble, Fanatical, Gamesplanet, GOG and more</description>
<lastBuildDate>Thu, 13 Oct 2022 17:11:24 +0000</lastBuildDate>
<title><![CDATA[Minion Masters &#8211; Torment • Free • Steam]]></title>
<pubDate>Thu, 13 Oct 2022 18:08:41 +0100</pubDate>
<dc:modified>2022-10-13 18:08:59</dc:modified>
<dc:created unix="1665684521">2022-10-13 18:08:41</dc:created>
<guid isPermaLink="true"><![CDATA[]]></guid><category>12</category>
<description><![CDATA[]]></description><content:encoded><![CDATA[]]></content:encoded><enclosure url=""/><media:content url="" height="150" width="150" type="image/jpeg"/>
<title><![CDATA[Darkwood + ToeJam &#038; Earl: Back in the Groove! • Free • Epic Games Store]]></title>
<pubDate>Thu, 13 Oct 2022 17:03:59 +0100</pubDate>
<dc:modified>2022-10-13 17:04:17</dc:modified>
<dc:created unix="1665680639">2022-10-13 17:03:59</dc:created>
<guid isPermaLink="true"><![CDATA[]]></guid><category>12</category>
<description><![CDATA[]]></description><content:encoded><![CDATA[]]></content:encoded><enclosure url=""/><media:content url="" height="150" width="150" type="image/jpeg"/>
<title><![CDATA[Rebel Inc: Escalation &#8211; Sand &#038; Secrets • Free • Steam]]></title>
<pubDate>Tue, 20 Sep 2022 18:08:52 +0100</pubDate>
<dc:modified>2022-09-20 18:09:03</dc:modified>
<dc:created unix="1663697332">2022-09-20 18:08:52</dc:created>
<guid isPermaLink="true"><![CDATA[]]></guid><category>12</category>
<description><![CDATA[]]></description><content:encoded><![CDATA[]]></content:encoded><enclosure url=""/><media:content url="" height="" width="" type=""/>
</item></channel></rss><!-- end of xml string -->

View File

@ -0,0 +1,17 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database.crud import free_games as crud
from database.schemas import FreeGame
async def test_add_games(postgres: AsyncSession):
    """Adding four game IDs should leave four rows in the table"""
    query = select(FreeGame)

    # The table starts out empty
    before = (await postgres.execute(query)).scalars().all()
    assert not before

    await crud.add_free_games(postgres, [1, 2, 3, 4])

    # One row per inserted ID
    after = (await postgres.execute(query)).scalars().all()
    assert len(after) == 4