from multiprocessing.dummy import Pool as ThreadPool
from urllib.parse import urlunsplit
import os
import re
import shutil

import requests
from bs4 import BeautifulSoup


DOMAIN = "4kwallpapers.com"


class PageHandler:
    def __init__(self, domain, path):
        self.domain = domain
        self.path = path

    @property
    def url(self):
        return urlunsplit(("https", self.domain, self.path, "", ""))

    def get(self, path="", query="", soup=True):
        r = requests.get(self.relative_url(path, query), allow_redirects=True)

        if r.status_code != 200:
            raise requests.exceptions.RequestException(
                "Unexpected status code {} for {}".format(r.status_code, r.url))

        # Return a parsed tree by default; pass soup=False for the raw response
        if soup:
            return BeautifulSoup(r.content, "html.parser")

        return r

    def relative_url(self, path="", query=""):
        return urlunsplit(("https", self.domain,
                           "/".join((self.path, path.strip("/"))),
                           query, "")).strip("/")


class Scraper(PageHandler):
    def __init__(self, domain=DOMAIN):
        super().__init__(domain, "")

        self._categories = None

    @property
    def categories(self):
        if not self._categories:
            # Get them if not cached
            soup = self.get(soup=True)
            ul = soup.find("ul", class_="cats")
            cats = []

            for li in ul.find_all("li"):
                anchor = li.find("a")

                cats.append(Category(anchor.get_text(), self.domain,
                                     anchor["href"].strip("/")))

            self._categories = cats

        return self._categories

    @property
    def images(self):
        # Flatten the per-category image lists into a single list
        return sum([cat.images for cat in self.categories], [])

    def download(self, dir_path):
        for cat in self.categories:
            cat.download(dir_path)


class Category(PageHandler):
    def __init__(self, name, domain, path):
        super().__init__(domain, path)

        self.name = name
        self._images = None
        self._pool = ThreadPool(50)

    @property
    def images(self):
        if not self._images:
            # Get base page
            soup = self.get()

            # Get how many listing pages there are
            pages_p = soup.find("p", class_="pages")

            count = 1

            # The paragraph doesn't exist if there's only one page
            if pages_p:
                anchors = pages_p.find_all("a")
                count = max([int(res.group(1))
                             for anchor in anchors
                             if (res := re.match(r"\?page=([0-9]+)",
                                                 anchor["href"]))])

            # Collect the wallpaper pages linked from one listing page
            def helper(page_num):
                soup = self.get(query="page={}".format(page_num))

                pics_list = soup.find("div", id="pics-list")

                return [Image(self.domain, anchor["href"])
                        for anchor in pics_list.find_all(
                            "a", class_="wallpapers__canvas_image")]

            # Scrape all listing pages in parallel and flatten the results
            self._images = sum(self._pool.map(helper, range(1, count + 1)), [])

        return self._images

    def download(self, dir_path):
        dir_path = os.path.join(dir_path, self.name)

        os.makedirs(dir_path, exist_ok=True)

        self._pool.map(lambda image: image.download(dir_path), self.images)


class Image(PageHandler):
    def __init__(self, domain, path):
        super().__init__(domain, path)

        self._image_url = None

    def _get_urls(self):
        soup = self.get()

        # The "original" anchor links to the full-resolution file
        self._image_url = urlunsplit(
            ("https", self.domain,
             soup.find("a", class_="original")["href"].strip("/"),
             "", ""))

    @property
    def image_url(self):
        if not self._image_url:
            self._get_urls()

        return self._image_url

    def download(self, dir_path):
        filename = os.path.basename(self.image_url)

        # Stream the file to disk instead of loading it into memory
        with requests.get(self.image_url, stream=True) as r:
            # Ensure compressed responses are decoded when reading from r.raw
            r.raw.decode_content = True

            with open(os.path.join(dir_path, filename), "wb") as f:
                shutil.copyfileobj(r.raw, f)
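

# Example entry point: a minimal usage sketch that scrapes every category and
# downloads all wallpapers. The "wallpapers" output directory is an assumption;
# adjust the path as needed.
if __name__ == "__main__":
    scraper = Scraper()
    scraper.download("wallpapers")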