kmftools/bookscraper/scraper/book_scraper.py

import requests
import os
import time
from pathlib import Path
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from PIL import Image
from io import BytesIO
from dotenv import load_dotenv
from scraper.logger import setup_logger, LOG_BUFFER
from scraper.utils import clean_text, load_replacements
load_dotenv()
logger = setup_logger()


class Chapter:
    def __init__(self, number, title, url):
        self.number = number
        self.title = title
        self.url = url
        self.text = ""


class BookScraper:
    def __init__(self, site, url):
        self.site = site
        self.url = url
        self.book_title = ""
        self.book_author = ""
        self.book_description = ""
        self.cover_url = ""
        self.chapters = []
        self.chapter_base = None
        self.base_path = None
        # ENV settings
        self.DRY_RUN = os.getenv("DRY_RUN", "0") == "1"
        self.TEST_CHAPTER_LIMIT = int(os.getenv("TEST_CHAPTER_LIMIT", "10"))
        self.MAX_VOL_SIZE = int(os.getenv("MAX_VOL_SIZE", "1500"))
        self.MAX_DL_PER_SEC = int(os.getenv("MAX_DL_PER_SEC", "2"))
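        # Environment knobs read above:
        #   DRY_RUN            - "1" restricts the run to TEST_CHAPTER_LIMIT chapters
        #   TEST_CHAPTER_LIMIT - chapters fetched during a dry run (default 10)
        #   MAX_VOL_SIZE       - chapters written per volume folder (default 1500)
        #   MAX_DL_PER_SEC     - request rate cap used by get_document (default 2)
        # OUTPUT_DIR (default "./output") is read later in prepare_output_folder().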
        # Load text replacements
        self.replacements = load_replacements("replacements.txt")

    # -----------------------------------------------------
    def execute(self):
        # Clear the shared in-memory log buffer so the debug output returned
        # below covers only this run.
        LOG_BUFFER.seek(0)
        LOG_BUFFER.truncate(0)
        logger.debug("Starting scraper for %s", self.url)
        soup = self.get_document(self.url)
        self.parse_title(soup)
        self.parse_author(soup)
        self.parse_description(soup)
        self.parse_cover(soup)
        self.prepare_output_folder()
        chapter_page = self.get_chapter_page(soup)
        self.parse_chapter_links(chapter_page)
        if self.DRY_RUN:
            logger.debug(
                "DRY RUN → downloading only first %s chapters", self.TEST_CHAPTER_LIMIT)
            self.get_some_chapters(self.TEST_CHAPTER_LIMIT)
        else:
            self.get_all_chapters()
            self.split_into_volumes()
        return {
            "title": self.book_title,
            "debug": LOG_BUFFER.getvalue()
        }

    # -----------------------------------------------------
    # NETWORK
    # -----------------------------------------------------
    def get_document(self, url):
        logger.debug("GET %s", url)
        # Simple rate limit: pause 1 / MAX_DL_PER_SEC seconds before each request
        # (0.5 s with the default of 2 downloads per second).
        time.sleep(1 / max(1, self.MAX_DL_PER_SEC))
        resp = requests.get(
            url, headers={"User-Agent": "Mozilla/5.0"}, timeout=10)
        # Decode with the encoding configured for this site before parsing.
        resp.encoding = self.site.encoding
        logger.debug("HTTP %s for %s", resp.status_code, url)
        return BeautifulSoup(resp.text, "lxml")

    # -----------------------------------------------------
    # BASIC PARSERS (piaotia structure)
    # -----------------------------------------------------
    def parse_title(self, soup):
        h1 = soup.find("h1")
        if h1:
            self.book_title = h1.get_text(strip=True)
        else:
            self.book_title = "UnknownTitle"
        logger.debug("Book title: %s", self.book_title)

    def parse_author(self, soup):
        # Assumes piaotia's "作    者：XXX" metadata cell; the label characters and
        # the full-width colon were dropped in the published copy and are
        # reconstructed here from that layout.
        td = soup.find("td", string=lambda t: t and "作" in t and "者" in t)
        if td:
            raw = td.get_text(strip=True)
            if "：" in raw:
                self.book_author = raw.split("：", 1)[1].strip()
            else:
                self.book_author = "UnknownAuthor"
        else:
            self.book_author = "UnknownAuthor"
        logger.debug("Book author: %s", self.book_author)

    def parse_description(self, soup):
        span = soup.find("span", string=lambda t: t and "内容简介" in t)
        if not span:
            self.book_description = ""
            return
        parts = []
        for sib in span.next_siblings:
            if getattr(sib, "name", None) == "span":
                break
            txt = sib.strip() if isinstance(sib, str) else sib.get_text(strip=True)
            if txt:
                parts.append(txt)
        self.book_description = "\n".join(parts)
        logger.debug("Description parsed (%s chars)",
                     len(self.book_description))

    def parse_cover(self, soup):
        selector = (
            "html > body > div:nth-of-type(6) > div:nth-of-type(2) > div > table "
            "> tr:nth-of-type(4) > td:nth-of-type(1) > table > tr:nth-of-type(1) "
            "> td:nth-of-type(2) > a:nth-of-type(1) > img"
        )
        img = soup.select_one(selector)
        if img:
            self.cover_url = urljoin(self.site.root, img.get("src"))
        else:
            logger.debug("Cover not found!")
        logger.debug("Cover URL = %s", self.cover_url)

    # -----------------------------------------------------
    def prepare_output_folder(self):
        output_root = os.getenv("OUTPUT_DIR", "./output")
        self.base_path = Path(output_root) / self.book_title / self.site.name
        self.base_path.mkdir(parents=True, exist_ok=True)
        logger.debug("Output directory: %s", self.base_path)
        if self.cover_url:
            self.save_image(self.cover_url, self.base_path / "cover.jpg")

    def save_image(self, url, path):
        logger.debug("Downloading cover: %s", url)
        resp = requests.get(
            url, headers={"User-Agent": "Mozilla/5.0"}, timeout=10)
        if resp.status_code == 200:
            img = Image.open(BytesIO(resp.content))
            img.save(path)
            logger.debug("Cover saved to %s", path)

    # -----------------------------------------------------
    # CHAPTER PAGE
    # -----------------------------------------------------
    def get_chapter_page(self, soup):
        node = soup.select_one(
            "html > body > div:nth-of-type(6) > div:nth-of-type(2) > div > table")
        link = node.select_one("a")
        href = link.get("href")
        chapter_url = urljoin(self.site.root, href)
        # Keep the directory of the chapter index so relative chapter hrefs
        # can be resolved against it later.
        parsed = urlparse(chapter_url)
        base = parsed.path.rsplit("/", 1)[0] + "/"
        self.chapter_base = f"{parsed.scheme}://{parsed.netloc}{base}"
        logger.debug("Chapter index URL = %s", chapter_url)
        logger.debug("CHAPTER_BASE = %s", self.chapter_base)
        return self.get_document(chapter_url)

    def parse_chapter_links(self, soup):
        container = soup.select_one("div.centent")
        links = container.select("ul li a[href]")
        for i, a in enumerate(links, 1):
            href = a.get("href")
            # Only *.html links are treated as chapter pages.
            if not href.endswith(".html"):
                continue
            abs_url = urljoin(self.chapter_base, href)
            title = a.get_text(strip=True)
            self.chapters.append(Chapter(i, title, abs_url))
        logger.debug("Total chapters: %s", len(self.chapters))

    # -----------------------------------------------------
    # DOWNLOAD CHAPTERS
    # -----------------------------------------------------
    def get_all_chapters(self):
        for ch in self.chapters:
            ch.text = self.fetch_chapter(ch)
            logger.debug("CH %s length = %s", ch.number, len(ch.text))

    def get_some_chapters(self, limit):
        for ch in self.chapters[:limit]:
            ch.text = self.fetch_chapter(ch)
            filename = self.base_path / f"{ch.number:05d}_{ch.title}.txt"
            filename.write_text(ch.text, encoding="utf-8")
            logger.debug("Saved test chapter: %s", filename)

    def fetch_chapter(self, ch):
        soup = self.get_document(ch.url)
        text = self.parse_chapter_text(soup)
        return clean_text(text, self.replacements)

    def parse_chapter_text(self, soup):
        body = soup.body
        h1 = body.find("h1")
        parts = []
        collecting = False
        for sib in h1.next_siblings:
            # Chapter text sits between the first <br> after the <h1> and the
            # "bottomlink" navigation block.
            if getattr(sib, "get", None) and sib.get("class") == ["bottomlink"]:
                break
            if getattr(sib, "get", None) and sib.get("class") == ["toplink"]:
                continue
            if getattr(sib, "name", None) in ["script", "style"]:
                continue
            if not collecting:
                # Skip everything until the first <br> is seen.
                if getattr(sib, "name", None) == "br":
                    collecting = True
                continue
            txt = sib.strip() if isinstance(sib, str) else sib.get_text("\n", strip=True)
            if txt:
                parts.append(txt)
        return "\n".join(parts).strip()

    # -----------------------------------------------------
    # SPLIT VOLUMES
    # -----------------------------------------------------
    def split_into_volumes(self):
        logger.debug(
            "Splitting into volumes (max %s chapters per volume)", self.MAX_VOL_SIZE)
        chapters = len(self.chapters)
        volume = 1
        index = 0
        while index < chapters:
            chunk = self.chapters[index:index + self.MAX_VOL_SIZE]
            volume_dir = self.base_path / f"v{volume}"
            volume_dir.mkdir(exist_ok=True)
            for ch in chunk:
                filename = volume_dir / f"{ch.number:05d}_{ch.title}.txt"
                filename.write_text(ch.text, encoding="utf-8")
            logger.debug("Volume %s saved (%s chapters)", volume, len(chunk))
            volume += 1
            index += self.MAX_VOL_SIZE
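

# Hypothetical usage sketch (not part of the original module): BookScraper only
# relies on site.name, site.root and site.encoding in the code above, so a bare
# namespace with those attributes is enough to drive it. The URL and encoding
# values below are placeholders, not taken from the real project configuration.
if __name__ == "__main__":
    from types import SimpleNamespace

    demo_site = SimpleNamespace(
        name="piaotia",                    # used for the output sub-folder
        root="https://www.example.com/",   # placeholder site root
        encoding="utf-8",                  # whatever the target site actually serves
    )
    result = BookScraper(demo_site, "https://www.example.com/book/1/").execute()
    print(result["title"])
    print(result["debug"])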