# kmftools/bookscraper/scraper/book_scraper.py
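"""Scrape an online book into per-volume text files.

BookScraper walks a book's index page, collects chapter links, and writes
each chapter as a numbered .txt file under output/<title>/<site>/v<N>/,
throttling requests and backing off on HTTP 429/403/500 responses.
"""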
import requests
import os
import time
from pathlib import Path
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from PIL import Image
from io import BytesIO
from scraper.logger import log_debug
from scraper.utils import clean_text, load_replacements


class Chapter:
    def __init__(self, num, title, url):
        self.number = num
        self.title = title
        self.url = url
        self.text = ""
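

# Behaviour is tuned through environment variables (all optional):
#   DRY_RUN                → "1" (default) downloads only TEST_LIMIT chapters
#   TEST_LIMIT             → number of chapters fetched in a dry run (default 10)
#   MAX_DOWNLOADS_PER_SEC  → request rate cap enforced by throttle() (default 1)
#   MAX_VOL_SIZE           → chapters per volume folder (default 200)
#   CHAPTER_DELAY          → extra sleep between chapters in seconds (default 2)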
class BookScraper:
    def __init__(self, site, url):
        self.site = site
        self.url = url
        self.book_title = ""
        self.book_author = ""
        self.book_description = ""
        self.cover_url = ""
        self.chapters = []
        self.base_path = None
        self.chapter_base = None
        # ENV
        self.DRY_RUN = os.getenv("DRY_RUN", "1") == "1"
        self.TEST_LIMIT = int(os.getenv("TEST_LIMIT", "10"))
        self.MAX_DL = float(os.getenv("MAX_DOWNLOADS_PER_SEC", "1"))
        self.min_delay = 1.0 / self.MAX_DL if self.MAX_DL > 0 else 1.0
        self._last_download_time = 0
        # replacements.txt
        fp = os.path.join(os.getcwd(), "replacements.txt")
        extra = load_replacements(fp)
        self.site.replacements.update(extra)
        self.start_time = None
        self.total_chapters = 0
        self.volume_dirs = {}

    # ------------------------------------------------------------
    # RATE LIMITER
    # ------------------------------------------------------------
    def throttle(self):
        now = time.time()
        elapsed = now - self._last_download_time
        if elapsed < self.min_delay:
            time.sleep(self.min_delay - elapsed)
        self._last_download_time = time.time()

    # ------------------------------------------------------------
    def execute(self):
        log_debug(f"Starting scraper for {self.url}")
        self.start_time = time.time()
        soup = self.get_doc_with_retry(self.url)
        self.parse_title(soup)
        self.parse_author(soup)
        self.parse_description(soup)
        self.parse_cover(soup)
        self.prepare_output_folder()
        chapter_page = self.get_chapter_page(soup)
        self.parse_chapter_links(chapter_page)
        self.prepare_volume_folders()
        if self.DRY_RUN:
            self.download_some(self.TEST_LIMIT)
        else:
            self.download_all()
        return {"title": self.book_title}

    # ------------------------------------------------------------
    # HTTP GET WITH RETRIES + HARD 429 COOLDOWN WITH COUNTDOWN
    # ------------------------------------------------------------
    def get_doc_with_retry(self, url):
        attempt = 1
        while True:
            self.throttle()
            log_debug(f"GET {url} (attempt {attempt})")
            try:
                resp = requests.get(
                    url,
                    headers={"User-Agent": "Mozilla/5.0"},
                    timeout=10,
                )
            except Exception as e:
                log_debug(f"Network error {e} → retry in {attempt + 1}s")
                time.sleep(attempt + 1)
                attempt += 1
                continue
            code = resp.status_code
            log_debug(f"HTTP {code} for {url}")
            # 429 → hard cooldown with countdown
            if code == 429:
                cooldown = 60
                log_debug(f"429 detected — cooldown {cooldown}s")
                for i in range(cooldown, 0, -1):
                    log_debug(f"429 cooldown… {i}s remaining")
                    time.sleep(1)
                attempt += 1
                continue
            # recoverable
            if code in (403, 500):
                wait = min(5 * attempt, 30)
                log_debug(f"HTTP {code} → retry in {wait}s")
                time.sleep(wait)
                attempt += 1
                continue
            if code == 200:
                resp.encoding = self.site.encoding
                return BeautifulSoup(resp.text, "lxml")
            # unexpected
            wait = attempt + 1
            log_debug(f"Unexpected HTTP {code} → sleep {wait}s")
            time.sleep(wait)
            attempt += 1
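
    # Retry policy summary (see get_doc_with_retry above):
    #   network error      → sleep attempt+1 seconds, retry
    #   HTTP 429           → fixed 60 s cooldown with a per-second countdown
    #   HTTP 403 / 500     → sleep min(5 * attempt, 30) seconds
    #   any other non-200  → sleep attempt+1 seconds
    # The loop only returns once a 200 response has been parsed with lxml.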

    # ------------------------------------------------------------
    def parse_title(self, soup):
        h1 = soup.find("h1")
        self.book_title = h1.get_text(strip=True) if h1 else "UnknownTitle"
        log_debug(f"Book title = {self.book_title}")

    def parse_author(self, soup):
        # The marker characters were lost in this copy of the file; a table
        # cell such as "作者：<name>" is assumed here when locating and
        # splitting the author field.
        td = soup.find("td", string=lambda t: t and "作者" in t)
        self.book_author = (
            td.get_text(strip=True).split("：", 1)[1]
            if td and "：" in td.get_text()
            else "UnknownAuthor"
        )
        log_debug(f"Book author = {self.book_author}")

    def parse_description(self, soup):
        span = soup.find("span", string=lambda t: t and "内容简介" in t)
        if not span:
            log_debug("No description found")
            self.book_description = ""
            return
        parts = []
        for sib in span.next_siblings:
            if getattr(sib, "name", None) == "span":
                break
            text = (
                sib.get_text(strip=True)
                if hasattr(sib, "get_text")
                else str(sib).strip()
            )
            if text:
                parts.append(text)
        self.book_description = "\n".join(parts)
        log_debug(f"Description length = {len(self.book_description)}")

    # ------------------------------------------------------------
    def parse_cover(self, soup):
        cover = soup.find(
            "img", src=lambda v: v and "files/article/image" in v)
        if not cover:
            log_debug("Cover not found")
            return
        self.cover_url = urljoin(self.site.root, cover.get("src"))
        log_debug(f"Cover URL = {self.cover_url}")

    # ------------------------------------------------------------
    def prepare_output_folder(self):
        self.base_path = Path("output") / self.book_title / self.site.name
        self.base_path.mkdir(parents=True, exist_ok=True)
        if self.cover_url:
            self.download_cover()

    def download_cover(self):
        log_debug(f"Downloading cover: {self.cover_url}")
        resp = requests.get(
            self.cover_url,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=10,
        )
        if resp.status_code != 200:
            return
        if "html" in resp.headers.get("Content-Type", ""):
            return
        try:
            img = Image.open(BytesIO(resp.content))
        except Exception:
            # Not a decodable image (e.g. an error page); skip the cover.
            return
        img.save(self.base_path / "cover.jpg")
        log_debug("Cover saved")

    # ------------------------------------------------------------
    def get_chapter_page(self, soup):
        node = soup.select_one(
            "html > body > div:nth-of-type(6) > div:nth-of-type(2) > div > table"
        )
        href = node.select_one("a").get("href")
        url = urljoin(self.site.root, href)
        parsed = urlparse(url)
        bp = parsed.path.rsplit("/", 1)[0] + "/"
        self.chapter_base = f"{parsed.scheme}://{parsed.netloc}{bp}"
        return self.get_doc_with_retry(url)

    # ------------------------------------------------------------
    def parse_chapter_links(self, soup):
        cont = soup.select_one(self.site.chapter_list_selector)
        items = cont.select("ul li a[href]")
        self.chapters = []
        idx = 1
        for a in items:
            href = a.get("href")
            if not href.endswith(".html"):
                continue
            title = a.get_text(strip=True)
            full = urljoin(self.chapter_base, href)
            self.chapters.append(Chapter(idx, title, full))
            idx += 1
        self.total_chapters = len(self.chapters)
        log_debug(f"Found {self.total_chapters} chapters")

    # ------------------------------------------------------------
    def prepare_volume_folders(self):
        max_size = int(os.getenv("MAX_VOL_SIZE", "200"))
        num_vols = (self.total_chapters + max_size - 1) // max_size
        for v in range(1, num_vols + 1):
            d = self.base_path / f"v{v}"
            d.mkdir(parents=True, exist_ok=True)
            self.volume_dirs[v] = d
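
    # Output layout produced by the methods below:
    #   output/<book_title>/<site_name>/v<volume>/<NNNNN>_<chapter title>.txt
    # where volume = ((chapter_number - 1) // MAX_VOL_SIZE) + 1 and NNNNN is
    # the chapter number zero-padded to five digits. A chapter is skipped if a
    # file with the expected name already exists and is larger than 300 bytes.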

    # ------------------------------------------------------------
    def download_all(self):
        for ch in self.chapters:
            self.download_chapter(ch)

    def download_some(self, limit):
        for ch in self.chapters[:limit]:
            self.download_chapter(ch)

    # ------------------------------------------------------------
    def download_chapter(self, ch):
        # Determine volume + filename
        max_size = int(os.getenv("MAX_VOL_SIZE", "200"))
        volume = ((ch.number - 1) // max_size) + 1
        vdir = self.volume_dirs.get(volume, self.base_path)
        expected_name = f"{ch.number:05d}_{ch.title}.txt"
        fname = vdir / expected_name
        expected_full_path = str(fname.resolve())
        # STRICT SKIP CHECK
        if fname.exists() and fname.is_file():
            actual_size = fname.stat().st_size
            # correct name?
            if fname.name == expected_name:
                expected_dir = str(vdir.resolve())
                actual_dir = str(fname.parent.resolve())
                if expected_dir == actual_dir:
                    if actual_size > 300:
                        log_debug(
                            f"Skip chapter {ch.number}/{self.total_chapters}: already exists\n"
                            f" Path: {expected_full_path}\n"
                            f" Size: {actual_size} bytes"
                        )
                        return
                    else:
                        log_debug(
                            f"Existing file too small ({actual_size} bytes), redownloading: {expected_full_path}"
                        )
                else:
                    log_debug(
                        f"Directory mismatch for chapter {ch.number}, redownloading"
                    )
            else:
                log_debug(
                    f"Filename mismatch for chapter {ch.number}, redownloading\n"
                    f" Expected: {expected_name}\n"
                    f" Found: {fname.name}"
                )
        # PROGRESS INFO
        percent = (ch.number / self.total_chapters) * 100
        elapsed = time.time() - self.start_time
        avg_time = elapsed / max(ch.number - 1, 1)
        remaining = self.total_chapters - ch.number
        eta_seconds = max(0, remaining * avg_time)
        eta_min = int(eta_seconds // 60)
        eta_sec = int(eta_seconds % 60)
        log_debug(
            f"Fetching chapter {ch.number}/{self.total_chapters} "
            f"({percent:.2f}%, ETA {eta_min}m {eta_sec}s): "
            f"{ch.title}"
        )
        # RETRY EMPTY CONTENT
        attempt = 1
        while True:
            soup = self.get_doc_with_retry(ch.url)
            text = self.parse_chapter_text(soup)
            if text.strip():
                ch.text = text
                break
            wait = min(10 + attempt, 30)
            log_debug(f"Empty chapter → retry in {wait}s")
            time.sleep(wait)
            attempt += 1
        fname.write_text(ch.text, encoding="utf-8")
        log_debug(f"Saved chapter to v{volume}: {fname}")
        chapter_delay = float(os.getenv("CHAPTER_DELAY", "2"))
        log_debug(f"Throttling {chapter_delay}s before next chapter")
        time.sleep(chapter_delay)

    # ------------------------------------------------------------
    def parse_chapter_text(self, soup):
        body = soup.body
        if not body:
            return ""
        h1 = body.find("h1")
        if not h1:
            return ""
        parts = []
        collecting = False
        for sib in h1.next_siblings:
            # Tags expose their CSS classes via .attrs; plain strings have none.
            # (getattr(sib, "class", None) never returns the class list in bs4.)
            classes = getattr(sib, "attrs", {}).get("class")
            if classes == ["toplink"]:
                continue
            if classes == ["bottomlink"]:
                break
            if getattr(sib, "name", None) in ["script", "style"]:
                continue
            if not collecting:
                # Chapter text starts after the first <br> following the <h1>.
                if getattr(sib, "name", None) == "br":
                    collecting = True
                continue
            text = (
                sib.get_text("\n", strip=True)
                if hasattr(sib, "get_text")
                else str(sib).strip()
            )
            if text:
                parts.append(text)
        raw = "\n".join(parts)
        raw = clean_text(raw, self.site.replacements)
        return raw.strip()
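
# Example usage (sketch, not part of the original file): the `site` argument is
# expected to provide the attributes accessed above (.name, .root, .encoding,
# .replacements, .chapter_list_selector). A hypothetical invocation:
#
#     site = SomeSiteConfig()          # supplies the attributes listed above
#     scraper = BookScraper(site, "https://example.com/book/12345/")
#     result = scraper.execute()       # returns {"title": <book title>}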