mirror of https://github.com/stijndcl/didier

Google search

parent 1aeaa71ef8
commit 0c810d84e9
@@ -1,7 +1,10 @@
from discord import app_commands
from discord.ext import commands

from didier import Didier
from didier.data.apis import urban_dictionary
from didier.data.embeds.google import GoogleSearch
from didier.data.scrapers import google


class Other(commands.Cog):
@@ -15,9 +18,19 @@ class Other(commands.Cog):
    @commands.hybrid_command(name="define", description="Urban Dictionary", aliases=["Ud", "Urban"], usage="[Woord]")
    async def define(self, ctx: commands.Context, *, query: str):
        """Look up the definition of a word on the Urban Dictionary"""
        async with ctx.typing():
            definitions = await urban_dictionary.lookup(self.client.http_session, query)
            await ctx.reply(embed=definitions[0].to_embed(), mention_author=False)

    @commands.hybrid_command(name="google", description="Google search", usage="[Query]")
    @app_commands.describe(query="Search query")
    async def google(self, ctx: commands.Context, *, query: str):
        """Google something"""
        async with ctx.typing():
            results = await google.google_search(self.client.http_session, query)
            embed = GoogleSearch(results).to_embed()
            await ctx.reply(embed=embed, mention_author=False)


async def setup(client: Didier):
    """Load the cog"""
@@ -0,0 +1,3 @@
from .google_search import GoogleSearch

__all__ = ["GoogleSearch"]
@@ -0,0 +1,48 @@
from http import HTTPStatus

import discord

from didier.data.embeds.base import EmbedBaseModel
from didier.data.scrapers.google import SearchData

__all__ = ["GoogleSearch"]


class GoogleSearch(EmbedBaseModel):
    """Embed to display Google search results"""

    data: SearchData

    def __init__(self, data: SearchData):
        self.data = data

    def _error_embed(self) -> discord.Embed:
        """Custom embed for unsuccessful requests"""
        embed = discord.Embed(colour=discord.Colour.red())
        embed.set_author(name="Google Search")

        # Empty embed
        if not self.data.results:
            embed.description = "Geen resultaten gevonden"
            return embed

        # Error embed
        embed.description = f"Status {self.data.status_code}"

        return embed

    def to_embed(self) -> discord.Embed:
        if not self.data.results or self.data.status_code != HTTPStatus.OK:
            return self._error_embed()

        embed = discord.Embed(colour=discord.Colour.blue())
        embed.set_author(name="Google Search")
        embed.set_footer(text=self.data.result_stats or None)

        # Add all results into the description
        results = []
        for index, url in enumerate(self.data.results):
            results.append(f"{index + 1}: {url}")

        embed.description = "\n".join(results)
        return embed
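The two branches above in a nutshell (editor's illustration, not part of the commit; the SearchData values are made up):

import discord

from didier.data.embeds.google import GoogleSearch
from didier.data.scrapers.google import SearchData

# No results -> the red "Geen resultaten gevonden" error embed
empty = GoogleSearch(SearchData(query="didier", status_code=200))
assert empty.to_embed().colour == discord.Colour.red()

# At least one result with a 200 status -> the blue embed with a numbered link list
hit = GoogleSearch(SearchData(query="didier", status_code=200, results=["[Example](https://example.com)"]))
assert hit.to_embed().colour == discord.Colour.blue()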
@@ -0,0 +1,85 @@
import http
from dataclasses import dataclass, field
from typing import Optional
from urllib.parse import unquote_plus, urlencode

from aiohttp import ClientSession
from bs4 import BeautifulSoup
from bs4.element import Tag

__all__ = ["google_search", "SearchData"]


@dataclass
class SearchData:
    """Dataclass to store some data about a search query"""

    query: str
    status_code: int
    results: list[str] = field(default_factory=list)
    result_stats: str = ""

    def __post_init__(self):
        self.query = unquote_plus(self.query)


def get_result_stats(bs: BeautifulSoup) -> Optional[str]:
    """Parse the result stats

    Example result: "About 16.570.000 results (0,84 seconds)"
    """
    # The stats div is not always present, so guard against None before reading .text
    stats_div = bs.find("div", id="result-stats")
    if stats_div is None:
        return None

    stats = stats_div.text
    return stats and stats.removesuffix("\xa0")


def parse_result(element: Tag) -> Optional[str]:
    """Parse 1 wrapper into a link"""
    a_tag = element.find("a", href=True)
    # Not every result wrapper contains a link
    if a_tag is None:
        return None

    url = a_tag["href"]
    title = a_tag.find("h3")

    if (
        url is None
        or not url.startswith(
            (
                "http://",
                "https://",
            )
        )
        or title is None
    ):
        return None

    text = unquote_plus(title.text)
    return f"[{text}]({url})"


def get_search_results(bs: BeautifulSoup) -> list[str]:
    """Parse the search results"""
    result_wrappers = bs.find_all("div", class_="g")
    results = filter(lambda x: x is not None, map(parse_result, result_wrappers))

    # Remove duplicates
    # (sets don't preserve the order!)
    return list(dict.fromkeys(results))
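Why dict.fromkeys instead of a set: a dict keeps insertion order, so duplicates are dropped without losing the ranking Google returned. A one-line sanity check (editor's illustration, not part of the commit):

assert list(dict.fromkeys(["b", "a", "b", "c", "a"])) == ["b", "a", "c"]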
async def google_search(http_client: ClientSession, query: str):
    """Get the first 10 Google search results"""
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36"
    }

    query = urlencode({"q": query})

    # Request 20 results in case of duplicates, bad matches, ...
    async with http_client.get(f"https://www.google.com/search?{query}&num=20&hl=en", headers=headers) as response:
        # Something went wrong
        if response.status != http.HTTPStatus.OK:
            return SearchData(query, response.status)

        bs = BeautifulSoup(await response.text(), "html.parser")
        result_stats = get_result_stats(bs)
        results = get_search_results(bs)

        return SearchData(query, 200, results[:10], result_stats)
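Roughly how the scraper and the embed fit together outside the cog (editor's sketch, not part of the commit; it assumes a standalone aiohttp ClientSession where the bot would pass self.client.http_session):

import asyncio

from aiohttp import ClientSession

from didier.data.embeds.google import GoogleSearch
from didier.data.scrapers.google import google_search


async def main():
    async with ClientSession() as session:
        data = await google_search(session, "stijndcl didier")
        embed = GoogleSearch(data).to_embed()
        print(embed.description)


asyncio.run(main())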
@@ -1,6 +1,7 @@
aiohttp==3.8.1
alembic==1.8.0
asyncpg==0.25.0
beautifulsoup4==4.11.1
# Dev version of dpy
git+https://github.com/Rapptz/discord.py
environs==9.5.0