"""Scraper for 4kwallpapers.com.

Downloads wallpapers per category into a target directory, using a thread
pool to fetch listing pages and images concurrently.
"""

from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool as ThreadPool
from urllib.parse import urlunsplit
import os
import re
import requests
import shutil
import sys

DOMAIN = "4kwallpapers.com"


class PageHandler:
    """Base class wrapping a domain + path and providing HTTP helpers."""

    def __init__(self, domain, path):
        self.domain = domain
        self.path = path

    @property
    def url(self):
        return urlunsplit(("https", self.domain, self.path, "", ""))

    def get(self, path="", query="", soup=True, max_tries=5):
        """GET the page, retrying on error responses up to max_tries times."""
        r = requests.get(self.relative_url(path, query), allow_redirects=True)

        if r.status_code >= 400 and max_tries > 0:
            return self.get(path, query, soup, max_tries - 1)
        elif r.status_code != 200:
            raise requests.exceptions.RequestException()

        if soup:
            return BeautifulSoup(r.content, "html.parser")
        return r

    def relative_url(self, path="", query=""):
        return urlunsplit(
            ("https", self.domain, "/".join((self.path, path.strip("/"))), query, "")
        ).strip("/")


class Scraper(PageHandler):
    """Entry point: discovers categories and delegates downloading to them."""

    def __init__(self, domain=DOMAIN):
        super().__init__(domain, "")
        self._categories = None

    @property
    def categories(self):
        if not self._categories:
            # Get them if not cached
            soup = self.get(soup=True)
            ul = soup.find("ul", class_="cats")

            cats = []
            for li in ul.find_all("li"):
                anchor = li.find("a")
                cats.append(
                    Category(anchor.get_text(), self.domain, anchor["href"].strip("/"))
                )

            self._categories = cats

        return self._categories

    @property
    def images(self):
        return sum([cat.images for cat in self.categories], [])

    def download(self, dir_path, cats=None):
        # If no category names are given, download everything
        for cat in self.categories:
            if not cats or cat.name in cats:
                cat.download(dir_path)


class Category(PageHandler):
    """A wallpaper category; spans one or more paginated listing pages."""

    def __init__(self, name, domain, path):
        super().__init__(domain, path)

        self.name = name
        self._images = None
        self._pool = ThreadPool(50)

    @property
    def images(self):
        if not self._images:
            # Get base page
            soup = self.get()

            # Get how many pages there are; the paragraph doesn't exist if
            # there's only one page
            pages_p = soup.find("p", class_="pages")
            count = 1
            if pages_p:
                anchors = pages_p.find_all("a")
                count = max(
                    [int(res.group(1)) for anchor in anchors
                     if (res := re.match(r"\?page=([0-9]+)", anchor["href"]))],
                    default=1,
                )

            # Now, collect the URL of every wallpaper's page, one listing
            # page at a time, using the thread pool
            def helper(page_num):
                soup = self.get(query="page={}".format(page_num))
                pics_list = soup.find("div", id="pics-list")

                return [
                    Image(self.domain, anchor["href"])
                    for anchor in pics_list.find_all("a", class_="wallpapers__canvas_image")
                ]

            self._images = sum(self._pool.map(helper, range(1, count + 1)), [])

        return self._images

    def download(self, dir_path):
        dir_path = os.path.join(dir_path, self.name.replace("/", "-"))
        os.makedirs(dir_path, exist_ok=True)

        self._pool.map(lambda image: image.download(dir_path), self.images)


class Image(PageHandler):
    """A single wallpaper page; resolves and downloads the original image."""

    def __init__(self, domain, path):
        super().__init__(domain, path)

        self._image_url = None

    def _get_urls(self):
        soup = self.get()
        self._image_url = urlunsplit(
            ("https", self.domain,
             soup.find("a", class_="original")["href"].strip("/"), "", "")
        )

    @property
    def image_url(self):
        if not self._image_url:
            self._get_urls()

        return self._image_url

    def download(self, dir_path):
        filename = os.path.basename(self.image_url)

        with requests.get(self.image_url, stream=True) as r:
            # Make sure compressed responses are decoded when read from r.raw
            r.raw.decode_content = True

            with open(os.path.join(dir_path, filename), "wb") as f:
                shutil.copyfileobj(r.raw, f)


if __name__ == "__main__":
    scraper = Scraper()

    if len(sys.argv) == 1:
        print("No path provided.")
        sys.exit(1)
    elif len(sys.argv) == 2:
        scraper.download(sys.argv[1])
    else:
        scraper.download(sys.argv[1], sys.argv[2:])
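
# Usage sketch (assumptions: the file is saved as e.g. scraper.py, and the
# category names passed on the command line must match the link text on the
# site's category list; "Nature" below is a hypothetical example, not a
# confirmed category name):
#
#   python scraper.py ~/Pictures/wallpapers            # download every category
#   python scraper.py ~/Pictures/wallpapers Nature     # download only "Nature"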