kmftools/bookscraper/scraper/services/scrape_engine.py


# ============================================================
# File: scraper/services/scrape_engine.py
# Purpose:
# Unified scraping engine for INIT-flow and Celery tasks.
# All functions are fully logged via @logcall.
# ============================================================
import os
import time
import re
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from logbus.publisher import log
from scraper.logger import log_debug
from scraper.logger_decorators import logcall
from scraper.utils.utils import load_replacements
class ScrapeEngine:
"""
Central scraping engine.
Metadata + chapterlist scraping.
All methods logged with @logcall.
"""
# ------------------------------------------------------------
# REPLACEMENTS LOADER
# ------------------------------------------------------------
@staticmethod
@logcall
def _apply_replacements(site):
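        # Merge replacement pairs from a replacements.txt in the current
        # working directory into the site's replacement map.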
fp = os.path.join(os.getcwd(), "replacements.txt")
extra = load_replacements(fp)
if not hasattr(site, "replacements"):
site.replacements = {}
site.replacements.update(extra)
return True
# ------------------------------------------------------------
# RATE LIMITER
# ------------------------------------------------------------
MIN_DELAY = 1.0 / float(os.getenv("MAX_DOWNLOADS_PER_SEC", "1"))
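    # e.g. MAX_DOWNLOADS_PER_SEC=2 → at most one HTTP request every 0.5 s.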
@staticmethod
@logcall
def _throttle(last_time=[0]):
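        # The mutable default list deliberately persists between calls and
        # holds the timestamp of the last request (process-wide throttle state).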
now = time.time()
elapsed = now - last_time[0]
if elapsed < ScrapeEngine.MIN_DELAY:
time.sleep(ScrapeEngine.MIN_DELAY - elapsed)
last_time[0] = time.time()
return True
# ------------------------------------------------------------
# HTTP GET
# ------------------------------------------------------------
@staticmethod
@logcall
def _get_doc(url: str, site):
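        # Retries indefinitely: network errors and unexpected statuses back off
        # linearly, HTTP 403/500 back off up to 30 s, HTTP 429 waits a fixed 60 s.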
attempt = 1
while True:
ScrapeEngine._throttle()
log_debug(f"[SCRAPER] GET {url} (attempt {attempt})")
try:
resp = requests.get(
url,
headers={"User-Agent": "Mozilla/5.0"},
timeout=10,
)
except Exception as e:
log_debug(f"Network error {e} → retry {attempt + 1}s")
time.sleep(attempt + 1)
attempt += 1
continue
code = resp.status_code
if code == 200:
resp.encoding = getattr(site, "encoding", "utf-8")
return BeautifulSoup(resp.text, "lxml")
if code == 429:
cooldown = 60
log_debug("429 detected — cooldown 60s")
for i in range(cooldown, 0, -1):
log_debug(f" cooldown {i}s…")
time.sleep(1)
attempt += 1
continue
if code in (403, 500):
wait = min(5 * attempt, 30)
log_debug(f"HTTP {code} → retry in {wait}s")
time.sleep(wait)
attempt += 1
continue
wait = attempt + 1
log_debug(f"Unexpected HTTP {code} → sleep {wait}s")
time.sleep(wait)
attempt += 1
# ------------------------------------------------------------
# PARSER HELPERS
# ------------------------------------------------------------
@staticmethod
@logcall
def _parse_title(soup):
h1 = soup.find("h1")
return h1.get_text(strip=True) if h1 else "UnknownTitle"
@staticmethod
@logcall
def _parse_author(soup):
td = soup.find("td", string=lambda t: t and "" in t)
if td and "" in td.get_text():
return td.get_text(strip=True).split("")[1]
return "UnknownAuthor"
@staticmethod
@logcall
def _parse_description(soup):
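        # "内容简介" is the "synopsis" heading on the (Chinese-language) book page.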
span = soup.find("span", string=lambda t: t and "内容简介" in t)
if not span:
return ""
parts = []
for sib in span.next_siblings:
if getattr(sib, "name", None) == "span":
break
txt = (
sib.get_text(strip=True)
if hasattr(sib, "get_text")
else str(sib).strip()
)
if txt:
parts.append(txt)
return "\n".join(parts)
# ------------------------------------------------------------
# COVER PARSER
# ------------------------------------------------------------
@staticmethod
@logcall
def _parse_cover(soup, site):
"""
Vind cover door book_id substring matching:
- haal book_id uit site.url
- zoek IMG-tags waarvan filename book_id bevat
- kies kortste filename als beste match
"""
try:
parsed = urlparse(site.url)
m = re.search(r"/(\d+)\.html$", parsed.path)
if m:
book_id = m.group(1)
else:
book_id = parsed.path.rstrip("/").split("/")[-1]
except Exception:
return None
imgs = soup.find_all("img", src=True)
candidates = []
for img in imgs:
src = img["src"].strip()
filename = os.path.basename(src)
if book_id in filename:
candidates.append((filename, src))
if not candidates:
return None
        candidates.sort(key=lambda t: len(t[0]))  # shortest filename wins
best_src = candidates[0][1]
return urljoin(site.root, best_src)
# ------------------------------------------------------------
# RESOLVE CHAPTER PAGE
# ------------------------------------------------------------
@staticmethod
@logcall
def _resolve_chapter_page(soup, site):
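        # The positional selector below pins the chapter-list link to a fixed
        # spot in the book page layout; it breaks if the site markup changes.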
node = soup.select_one(
"html > body > div:nth-of-type(6) > div:nth-of-type(2) > div > table"
)
if not node:
raise ValueError("Could not locate chapter list base node")
href = node.select_one("a").get("href")
url = urljoin(site.root, href)
parsed = urlparse(url)
basepath = parsed.path.rsplit("/", 1)[0] + "/"
chapter_base = f"{parsed.scheme}://{parsed.netloc}{basepath}"
return url, chapter_base
# ------------------------------------------------------------
# PARSE CHAPTER LINKS
# ------------------------------------------------------------
@staticmethod
@logcall
def _parse_chapter_links(soup, chapter_base, selector):
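        # Only anchors ending in ".html" are treated as chapters; numbering is
        # assigned sequentially in document order, starting at 1.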
cont = soup.select_one(selector)
if not cont:
return []
items = cont.select("ul li a[href]")
chapters = []
idx = 1
for a in items:
href = a.get("href")
if not href.endswith(".html"):
continue
title = a.get_text(strip=True)
full = urljoin(chapter_base, href)
chapters.append({"num": idx, "title": title, "url": full})
idx += 1
return chapters
# ============================================================
# PUBLIC APIS
# ============================================================
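    # fetch_metadata_only         → metadata dict for a book page
    # fetch_metadata_and_chapters → metadata plus resolved chapter list
    # fetch_chapterlist           → chapter list only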
@staticmethod
@logcall
def fetch_metadata_only(site, url: str) -> dict:
ScrapeEngine._apply_replacements(site)
soup = ScrapeEngine._get_doc(url, site)
        site.url = url  # needed for cover parsing
return {
"title": ScrapeEngine._parse_title(soup),
"author": ScrapeEngine._parse_author(soup),
"description": ScrapeEngine._parse_description(soup),
"cover_url": ScrapeEngine._parse_cover(soup, site),
"book_url": url,
}
@staticmethod
@logcall
def fetch_metadata_and_chapters(site, url: str) -> dict:
ScrapeEngine._apply_replacements(site)
soup = ScrapeEngine._get_doc(url, site)
site.url = url
title = ScrapeEngine._parse_title(soup)
author = ScrapeEngine._parse_author(soup)
desc = ScrapeEngine._parse_description(soup)
cover = ScrapeEngine._parse_cover(soup, site)
chapter_page_url, chapter_base = ScrapeEngine._resolve_chapter_page(soup, site)
chapter_soup = ScrapeEngine._get_doc(chapter_page_url, site)
chapters = ScrapeEngine._parse_chapter_links(
chapter_soup, chapter_base, site.chapter_list_selector
)
return {
"title": title,
"author": author,
"description": desc,
"cover_url": cover,
"chapters": chapters,
"chapters_total": len(chapters),
"book_url": url,
}
@staticmethod
@logcall
def fetch_chapterlist(site, url: str):
ScrapeEngine._apply_replacements(site)
soup = ScrapeEngine._get_doc(url, site)
chapter_page_url, chapter_base = ScrapeEngine._resolve_chapter_page(soup, site)
chapter_soup = ScrapeEngine._get_doc(chapter_page_url, site)
return ScrapeEngine._parse_chapter_links(
chapter_soup, chapter_base, site.chapter_list_selector
)
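

# ------------------------------------------------------------
# USAGE SKETCH (illustrative only)
# ------------------------------------------------------------
# A minimal sketch of how INIT-flow or Celery code might call the engine.
# The demo_site object, its selector and the URL below are hypothetical
# placeholders, not values from a real configured site: any object exposing
# .root, .encoding and .chapter_list_selector (and accepting .url and
# .replacements being set) should work here.
if __name__ == "__main__":
    from types import SimpleNamespace

    demo_site = SimpleNamespace(
        root="https://example.com/",           # assumed site root
        encoding="utf-8",
        chapter_list_selector="div.booklist",  # placeholder CSS selector
    )
    meta = ScrapeEngine.fetch_metadata_only(demo_site, "https://example.com/12345.html")
    print(meta["title"], meta["author"], meta["cover_url"])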