From cbfcce62cc1f0fcfbc17a541d660540952d5d9f4 Mon Sep 17 00:00:00 2001 From: "peter.fong" Date: Mon, 1 Dec 2025 20:37:13 +0100 Subject: [PATCH] refactor complete celery multithreaded downloads --- bookscraper/app.py | 45 +-- bookscraper/celery_app.py | 68 ++-- bookscraper/docker-compose.yml | 175 ++++++--- bookscraper/docker/Dockerfile.audio | 18 +- bookscraper/docker/Dockerfile.scraper | 20 +- bookscraper/docker/Dockerfile.web | 15 +- bookscraper/requirements.audio.txt | 7 + bookscraper/requirements.scraper.txt | 7 + bookscraper/requirements.txt | 3 + bookscraper/requirements.web.txt | 8 + bookscraper/scraper/book_scraper.py | 348 +++--------------- bookscraper/scraper/download_controller.py | 79 ++-- bookscraper/scraper/models/book_state.py | 16 + bookscraper/scraper/tasks/controller_tasks.py | 21 ++ bookscraper/scraper/tasks/download_tasks.py | 30 +- bookscraper/scraper/tasks/parse_tasks.py | 120 +++--- bookscraper/scraper/tasks/pipeline.py | 25 +- bookscraper/scraper/tasks/save_tasks.py | 42 +-- bookscraper/scraper/tasks/scraping.py | 69 ++-- bookscraper/scraper/tasks/utils.py | 57 +++ bookscraper/scraper/utils.py | 44 ++- bookscraper/templates/result.html | 118 +++--- 22 files changed, 674 insertions(+), 661 deletions(-) create mode 100644 bookscraper/requirements.audio.txt create mode 100644 bookscraper/requirements.scraper.txt create mode 100644 bookscraper/requirements.web.txt create mode 100644 bookscraper/scraper/tasks/controller_tasks.py diff --git a/bookscraper/app.py b/bookscraper/app.py index f6f4377..522c727 100644 --- a/bookscraper/app.py +++ b/bookscraper/app.py @@ -1,22 +1,20 @@ # ============================================ # File: bookscraper/app.py -# Ensure project directory is on PYTHONPATH # ============================================ -from scraper.logger import log_debug -from scraper.download_controller import DownloadController -from flask import Flask, render_template, request from dotenv import load_dotenv -import sys -from pathlib import Path - -# Add this directory (bookscraper/) to Python import path -PROJECT_ROOT = Path(__file__).resolve().parent -sys.path.insert(0, str(PROJECT_ROOT)) -# Load .env BEFORE any Celery app is imported load_dotenv() +print(">>> [WEB] Importing celery_app …") +from celery_app import celery_app # <<< MOET BOVEN TASK IMPORTS + +from scraper.logger import log_debug +from flask import Flask, render_template, request + +# Task imports komen pas na celery_app: +print(">>> [WEB] Importing tasks …") +from scraper.tasks.scraping import start_scrape_book app = Flask(__name__) @@ -33,20 +31,25 @@ def start_scraping(): if not url: return render_template("result.html", error="Geen URL opgegeven.") - try: - log_debug(f"[WEB] Start scraping: {url}") - - ctl = DownloadController(url) - result = ctl.start() + log_debug(f"[WEB] Scrape request for: {url}") - return render_template("result.html", result=result, url=url) + # Belangrijk: start_scrape_book komt uit DEZELFDE celery_app nu + result = start_scrape_book.delay(url) - except Exception as e: - log_debug(f"[WEB] ERROR: {e}") - return render_template("result.html", error=str(e), url=url) + return render_template( + "result.html", + message="Scraping gestart.", + task_id=result.id, + url=url, + ) if __name__ == "__main__": import os + debug = os.getenv("FLASK_DEBUG", "0") == "1" - app.run(host="0.0.0.0", port=5000, debug=debug) + host = os.getenv("HOST", "0.0.0.0") + port = int(os.getenv("PORT", "5000")) + + log_debug(f"[WEB] Starting Flask server on {host}:{port}, debug={debug}") + 
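The result page now only receives a Celery task id, so the browser has no way to follow progress yet. A minimal sketch of a polling endpoint, assuming a hypothetical /status/<task_id> route added to the same Flask app (not part of this patch):

    from celery.result import AsyncResult

    @app.route("/status/<task_id>")
    def task_status(task_id):
        # Look up the task on the same broker/backend the web container dispatched it to.
        res = AsyncResult(task_id, app=celery_app)
        payload = {"state": res.state}
        if res.successful():
            payload["result"] = res.result  # the dict returned by start_scrape_book
        return payload  # Flask serializes plain dicts to JSON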
app.run(host=host, port=port, debug=debug) diff --git a/bookscraper/celery_app.py b/bookscraper/celery_app.py index 569f0ad..2c3fcc4 100644 --- a/bookscraper/celery_app.py +++ b/bookscraper/celery_app.py @@ -1,45 +1,47 @@ -# ============================================ -# File: bookscraper/celery_app.py -# ============================================ - +# celery_app.py import os from celery import Celery from dotenv import load_dotenv -# Load environment variables (OK to do here) +print(">>> [celery_app] Loading .env BEFORE initializing Celery...") load_dotenv() -print(">>> DEBUG: celery_app.py LOADED") -print(">>> DEBUG: env REDIS_BROKER =", os.getenv("REDIS_BROKER")) -print(">>> DEBUG: env REDIS_URL =", os.getenv("REDIS_URL")) - -# Read broker settings -REDIS_BROKER = os.getenv("REDIS_BROKER") -REDIS_BACKEND = os.getenv("REDIS_BACKEND") +BROKER = os.getenv("REDIS_BROKER") +BACKEND = os.getenv("REDIS_BACKEND") -# Fallback ONLY if missing -if not REDIS_BROKER: - REDIS_BROKER = os.getenv( - "REDIS_URL", "redis://host.docker.internal:6379/0" - ) +print(">>> [celery_app] BROKER =", BROKER) +print(">>> [celery_app] BACKEND =", BACKEND) -if not REDIS_BACKEND: - REDIS_BACKEND = REDIS_BROKER # safe fallback - -# Create Celery app AFTER loading .env celery_app = Celery( "bookscraper", - broker=REDIS_BROKER, - backend=REDIS_BACKEND, + broker=BROKER, + backend=BACKEND, + include=[ + "scraper.tasks.scraping", + "scraper.tasks.controller_tasks", + "scraper.tasks.download_tasks", + "scraper.tasks.parse_tasks", + "scraper.tasks.save_tasks", + ], ) -celery_app.conf.update( - task_default_queue="default", - task_routes={ - "tasks.scraping.*": {"queue": "scraping"}, - "tasks.audio.*": {"queue": "audio"}, - "tasks.*": {"queue": "default"}, - }, - worker_prefetch_multiplier=1, - task_acks_late=True, -) +celery_app.conf.task_routes = { + "scraper.tasks.scraping.*": {"queue": "scraping"}, + "scraper.tasks.controller_tasks.*": {"queue": "controller"}, + "scraper.tasks.download_tasks.*": {"queue": "download"}, + "scraper.tasks.parse_tasks.*": {"queue": "parse"}, + "scraper.tasks.save_tasks.*": {"queue": "save"}, +} + +# ------------------------------------------------------------ +# EXTRA DEBUG: test import of included modules +# ------------------------------------------------------------ +print(">>> [celery_app] Testing imports for included task modules...") +for module in celery_app.conf.include: + try: + __import__(module) + print(f">>> [celery_app] OK import {module}") + except Exception as e: + print(f">>> [celery_app] FAILED import {module}: {e}") + +print(">>> [celery_app] Celery initialization complete.") diff --git a/bookscraper/docker-compose.yml b/bookscraper/docker-compose.yml index 3a02a0a..770f733 100644 --- a/bookscraper/docker-compose.yml +++ b/bookscraper/docker-compose.yml @@ -1,87 +1,152 @@ -version: "3.9" - services: + # ---------------------------------------------------------- + # Redis broker & backend + # ---------------------------------------------------------- + redis: + image: redis:7 + container_name: bookscraper_redis + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 2s + timeout: 2s + retries: 20 + restart: "no" + + # ---------------------------------------------------------- + # Controller Worker + # ---------------------------------------------------------- + worker_controller: + build: + context: . 
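Each entry in task_routes pairs with a docker-compose worker that subscribes to that queue via -Q, so a task published to "download" is only ever consumed by worker_download. A hedged illustration of targeting a queue explicitly (the chapter number and URL are placeholders):

    from celery_app import celery_app

    celery_app.send_task(
        "scraper.tasks.download_tasks.download_chapter",
        args=[1, "https://example.com/0001.html"],
        queue="download",  # must match the worker's `-Q download` flag
    )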
+ dockerfile: docker/Dockerfile.scraper + container_name: worker_controller + command: celery -A celery_app worker -Q controller -n controller@%h -l INFO + depends_on: + redis: + condition: service_healthy + env_file: + - .env + environment: + - PYTHONUNBUFFERED=1 + volumes: + - .:/app + - /Users/peter/Desktop/books:/app/output + restart: "no" - # ------------------------- - # WEB UI - # ------------------------- + # ---------------------------------------------------------- + # Web GUI + # ---------------------------------------------------------- web: build: context: . - dockerfile: Dockerfile - container_name: bookscraper - ports: - - "5050:5000" - + dockerfile: docker/Dockerfile.web + container_name: bookscraper_web volumes: - .:/app - /Users/peter/Desktop/books:/app/output - + depends_on: + redis: + condition: service_healthy + ports: + - "5011:5000" + environment: + - REDIS_BROKER=redis://redis:6379/0 + - REDIS_BACKEND=redis://redis:6379/1 env_file: - .env + restart: "no" - environment: - FLASK_ENV: "production" - - restart: unless-stopped - + # ---------------------------------------------------------- + # Download Worker + # ---------------------------------------------------------- + worker_download: + build: + context: . + dockerfile: docker/Dockerfile.scraper + container_name: worker_download + volumes: + - .:/app + - /Users/peter/Desktop/books:/app/output depends_on: - - redis - + redis: + condition: service_healthy + env_file: + - .env + command: celery -A celery_app worker -Q download -n download@%h -l INFO + restart: "no" - # ------------------------- - # SCRAPING WORKER - # (1 concurrency, 1 job tegelijk) - # ------------------------- - scraper_worker: + # ---------------------------------------------------------- + # Parse Worker + # ---------------------------------------------------------- + worker_parse: build: context: . - dockerfile: Dockerfile - container_name: scraper_worker - command: python worker/scrape_worker.py - + dockerfile: docker/Dockerfile.scraper + container_name: worker_parse volumes: - .:/app - /Users/peter/Desktop/books:/app/output - + depends_on: + redis: + condition: service_healthy env_file: - .env + command: celery -A celery_app worker -Q parse -n parse@%h -l INFO + restart: "no" - restart: unless-stopped - + # ---------------------------------------------------------- + # Save Worker + # ---------------------------------------------------------- + worker_save: + build: + context: . + dockerfile: docker/Dockerfile.scraper + container_name: worker_save + volumes: + - .:/app + - /Users/peter/Desktop/books:/app/output depends_on: - - redis - + redis: + condition: service_healthy + env_file: + - .env + command: celery -A celery_app worker -Q save -n save@%h -l INFO + restart: "no" - # ------------------------- - # AUDIO WORKER - # ------------------------- - audio_worker: + # ---------------------------------------------------------- + # Audio Worker (macOS only) + # ---------------------------------------------------------- + worker_audio: build: context: . 
- dockerfile: Dockerfile - container_name: audio_worker - command: python worker/audio_worker.py - + dockerfile: docker/Dockerfile.audio + container_name: worker_audio volumes: - .:/app - /Users/peter/Desktop/books:/app/output - + depends_on: + redis: + condition: service_healthy env_file: - .env + command: celery -A celery_app worker -Q audio -n audio@%h -l INFO + restart: "no" - restart: unless-stopped - + # ---------------------------------------------------------- + # Scraping Worker + # ---------------------------------------------------------- + worker_scraping: + build: + context: . + dockerfile: docker/Dockerfile.scraper + container_name: worker_scraping + volumes: + - .:/app + - /Users/peter/Desktop/books:/app/output depends_on: - - redis - - - # ------------------------- - # REDIS (LOGS & QUEUE) - # ------------------------- - redis: - image: redis:alpine - container_name: redis - ports: - - "6379:6379" - restart: unless-stopped + redis: + condition: service_healthy + env_file: + - .env + command: celery -A celery_app worker -Q scraping -n scraping@%h -l INFO + restart: "no" diff --git a/bookscraper/docker/Dockerfile.audio b/bookscraper/docker/Dockerfile.audio index 809b36b..c308a7c 100644 --- a/bookscraper/docker/Dockerfile.audio +++ b/bookscraper/docker/Dockerfile.audio @@ -1,17 +1,9 @@ -# docker/Dockerfile.audio -FROM python:3.11-slim - +FROM python:3.12-slim WORKDIR /app -# Install audio processing dependencies (extend later) -RUN apt-get update && apt-get install -y --no-install-recommends \ - ffmpeg \ - libavcodec-extra \ - && rm -rf /var/lib/apt/lists/* - -COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt +COPY requirements.audio.txt /app/requirements.audio.txt +RUN pip install --no-cache-dir -r /app/requirements.audio.txt -COPY . . +COPY . /app -CMD ["python", "worker/audio_worker.py"] +CMD ["python3", "-c", "print('audio worker ready')"] diff --git a/bookscraper/docker/Dockerfile.scraper b/bookscraper/docker/Dockerfile.scraper index 011cfcb..640b21c 100644 --- a/bookscraper/docker/Dockerfile.scraper +++ b/bookscraper/docker/Dockerfile.scraper @@ -1,17 +1,15 @@ -# docker/Dockerfile.scraper -FROM python:3.11-slim - +FROM python:3.12-slim WORKDIR /app -# Scraper-specific system dependencies -RUN apt-get update && apt-get install -y --no-install-recommends \ - libjpeg62-turbo-dev \ - zlib1g-dev \ +RUN apt-get update && apt-get install -y \ + build-essential \ + libxml2-dev \ + libxslt1-dev \ && rm -rf /var/lib/apt/lists/* -COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt +COPY requirements.scraper.txt /app/requirements.scraper.txt +RUN pip install --no-cache-dir -r /app/requirements.scraper.txt -COPY . . +COPY . /app -CMD ["python", "worker/scrape_worker.py"] +CMD ["python3", "-c", "print('scraper worker ready')"] diff --git a/bookscraper/docker/Dockerfile.web b/bookscraper/docker/Dockerfile.web index e9a946f..c2bd3d8 100644 --- a/bookscraper/docker/Dockerfile.web +++ b/bookscraper/docker/Dockerfile.web @@ -1,16 +1,17 @@ -# docker/Dockerfile.web FROM python:3.11-slim WORKDIR /app -# Install only Python deps -COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt +# Copy full requirements for both Flask + Celery + BookScraper +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt -# Copy the entire app -COPY . . +# Copy entire application (including .env so load_dotenv works) +COPY . 
/app + +# Ensure Celery + BookScraper modules load correctly +ENV PYTHONPATH=/app -# Flask runs on port 5000 EXPOSE 5000 CMD ["python", "app.py"] diff --git a/bookscraper/requirements.audio.txt b/bookscraper/requirements.audio.txt new file mode 100644 index 0000000..a67536f --- /dev/null +++ b/bookscraper/requirements.audio.txt @@ -0,0 +1,7 @@ +requests +beautifulsoup4 +lxml +pillow +redis +celery[redis] +python-dotenv diff --git a/bookscraper/requirements.scraper.txt b/bookscraper/requirements.scraper.txt new file mode 100644 index 0000000..a67536f --- /dev/null +++ b/bookscraper/requirements.scraper.txt @@ -0,0 +1,7 @@ +requests +beautifulsoup4 +lxml +pillow +redis +celery[redis] +python-dotenv diff --git a/bookscraper/requirements.txt b/bookscraper/requirements.txt index c51da14..2b271e3 100644 --- a/bookscraper/requirements.txt +++ b/bookscraper/requirements.txt @@ -3,3 +3,6 @@ requests beautifulsoup4 lxml pillow +redis +celery +python-dotenv diff --git a/bookscraper/requirements.web.txt b/bookscraper/requirements.web.txt new file mode 100644 index 0000000..2b271e3 --- /dev/null +++ b/bookscraper/requirements.web.txt @@ -0,0 +1,8 @@ +flask +requests +beautifulsoup4 +lxml +pillow +redis +celery +python-dotenv diff --git a/bookscraper/scraper/book_scraper.py b/bookscraper/scraper/book_scraper.py index 83b0348..597c313 100644 --- a/bookscraper/scraper/book_scraper.py +++ b/bookscraper/scraper/book_scraper.py @@ -1,25 +1,20 @@ +# scraper/book_scraper.py + import requests -import os -import time -from pathlib import Path from bs4 import BeautifulSoup from urllib.parse import urljoin, urlparse -from PIL import Image -from io import BytesIO from scraper.logger import log_debug from scraper.utils import clean_text, load_replacements - - -class Chapter: - def __init__(self, num, title, url): - self.number = num - self.title = title - self.url = url - self.text = "" +from scraper.models.book_state import Chapter class BookScraper: + """ + Lightweight scraper: only metadata + chapter list. + All downloading/parsing/saving is handled by Celery tasks. 
+ """ + def __init__(self, site, url): self.site = site self.url = url @@ -30,142 +25,59 @@ class BookScraper: self.cover_url = "" self.chapters = [] - self.base_path = None self.chapter_base = None - # ENV - self.DRY_RUN = os.getenv("DRY_RUN", "1") == "1" - self.TEST_LIMIT = int(os.getenv("TEST_LIMIT", "10")) - self.MAX_DL = float(os.getenv("MAX_DOWNLOADS_PER_SEC", "1")) - self.min_delay = 1.0 / self.MAX_DL if self.MAX_DL > 0 else 1.0 - self._last_download_time = 0 - - # replacements.txt - fp = os.path.join(os.getcwd(), "replacements.txt") - extra = load_replacements(fp) + # Load custom replacements + extra = load_replacements("replacements.txt") self.site.replacements.update(extra) - self.start_time = None - self.total_chapters = 0 - self.volume_dirs = {} - - # ------------------------------------------------------------ - # RATE LIMITER - # ------------------------------------------------------------ - - def throttle(self): - now = time.time() - elapsed = now - self._last_download_time - - if elapsed < self.min_delay: - time.sleep(self.min_delay - elapsed) - - self._last_download_time = time.time() - # ------------------------------------------------------------ - def execute(self): - log_debug(f"Starting scraper for {self.url}") - - self.start_time = time.time() + def parse_book_info(self): + """Parse title, author, description, cover from the main page.""" + soup = self._fetch(self.url) - soup = self.get_doc_with_retry(self.url) - self.parse_title(soup) - self.parse_author(soup) - self.parse_description(soup) - self.parse_cover(soup) - - self.prepare_output_folder() + self._parse_title(soup) + self._parse_author(soup) + self._parse_description(soup) + self._parse_cover(soup) + # Parse chapter list page + chapter links chapter_page = self.get_chapter_page(soup) self.parse_chapter_links(chapter_page) - self.prepare_volume_folders() - - if self.DRY_RUN: - self.download_some(self.TEST_LIMIT) - else: - self.download_all() - - return {"title": self.book_title} # ------------------------------------------------------------ - # HTTP GET WITH RETRIES + HARD 429 COOLDOWN WITH COUNTDOWN - # ------------------------------------------------------------ - def get_doc_with_retry(self, url): - attempt = 1 - - while True: - self.throttle() - log_debug(f"GET {url} (attempt {attempt})") - - try: - resp = requests.get( - url, - headers={"User-Agent": "Mozilla/5.0"}, - timeout=10, - ) - except Exception as e: - log_debug(f"Network error {e} → retry in {attempt + 1}s") - time.sleep(attempt + 1) - attempt += 1 - continue - - code = resp.status_code - log_debug(f"HTTP {code} for {url}") - - # 429 → hard cooldown with countdown - if code == 429: - cooldown = 60 - log_debug(f"429 detected — cooldown {cooldown}s") - for i in range(cooldown, 0, -1): - log_debug(f"429 cooldown… {i}s remaining") - time.sleep(1) - attempt += 1 - continue - - # recoverable - if code in (403, 500): - wait = min(5 * attempt, 30) - log_debug(f"HTTP {code} → retry in {wait}s") - time.sleep(wait) - attempt += 1 - continue - - if code == 200: - resp.encoding = self.site.encoding - return BeautifulSoup(resp.text, "lxml") - - # unexpected - wait = attempt + 1 - log_debug(f"Unexpected HTTP {code} → sleep {wait}s") - time.sleep(wait) - attempt += 1 + def _fetch(self, url): + """Simple fetch (no retry), DownloadController handles errors.""" + log_debug(f"[BookScraper] Fetch: {url}") + resp = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=10) + resp.encoding = self.site.encoding + return BeautifulSoup(resp.text, "lxml") # 
------------------------------------------------------------ - def parse_title(self, soup): + def _parse_title(self, soup): h1 = soup.find("h1") self.book_title = h1.get_text(strip=True) if h1 else "UnknownTitle" - log_debug(f"Book title = {self.book_title}") + log_debug(f"[BookScraper] Title = {self.book_title}") - def parse_author(self, soup): + def _parse_author(self, soup): td = soup.find("td", string=lambda t: t and "作" in t) - self.book_author = ( - td.get_text(strip=True).split(":")[1] - if td and ":" in td.get_text() - else "UnknownAuthor" - ) - log_debug(f"Book author = {self.book_author}") + raw = td.get_text(strip=True) if td else "" + self.book_author = raw.split(":")[1] if ":" in raw else "UnknownAuthor" + log_debug(f"[BookScraper] Author = {self.book_author}") - def parse_description(self, soup): + def _parse_description(self, soup): span = soup.find("span", string=lambda t: t and "内容简介" in t) if not span: - log_debug("No description found") self.book_description = "" + log_debug("[BookScraper] Description not found") return parts = [] for sib in span.next_siblings: + # Stop when next book section begins if getattr(sib, "name", None) == "span": break + text = ( sib.get_text(strip=True) if hasattr(sib, "get_text") @@ -175,52 +87,23 @@ class BookScraper: parts.append(text) self.book_description = "\n".join(parts) - log_debug(f"Description length = {len(self.book_description)}") + log_debug( + f"[BookScraper] Description length = {len(self.book_description)} characters" + ) # ------------------------------------------------------------ - def parse_cover(self, soup): - cover = soup.find( - "img", src=lambda v: v and "files/article/image" in v) + def _parse_cover(self, soup): + cover = soup.find("img", src=lambda v: v and "files/article/image" in v) if not cover: - log_debug("Cover not found") + log_debug("[BookScraper] No cover found") return self.cover_url = urljoin(self.site.root, cover.get("src")) - log_debug(f"Cover URL = {self.cover_url}") - - # ------------------------------------------------------------ - def prepare_output_folder(self): - self.base_path = Path("output") / self.book_title / self.site.name - self.base_path.mkdir(parents=True, exist_ok=True) - - if self.cover_url: - self.download_cover() - - def download_cover(self): - log_debug(f"Downloading cover: {self.cover_url}") - - resp = requests.get( - self.cover_url, - headers={"User-Agent": "Mozilla/5.0"}, - timeout=10, - ) - - if resp.status_code != 200: - return - - if "html" in resp.headers.get("Content-Type", ""): - return - - try: - img = Image.open(BytesIO(resp.content)) - except: - return - - img.save(self.base_path / "cover.jpg") - log_debug("Cover saved") + log_debug(f"[BookScraper] Cover URL = {self.cover_url}") # ------------------------------------------------------------ def get_chapter_page(self, soup): + """Return BeautifulSoup of the main chapter list page.""" node = soup.select_one( "html > body > div:nth-of-type(6) > div:nth-of-type(2) > div > table" ) @@ -231,7 +114,7 @@ class BookScraper: bp = parsed.path.rsplit("/", 1)[0] + "/" self.chapter_base = f"{parsed.scheme}://{parsed.netloc}{bp}" - return self.get_doc_with_retry(url) + return self._fetch(url) # ------------------------------------------------------------ def parse_chapter_links(self, soup): @@ -240,152 +123,21 @@ class BookScraper: self.chapters = [] idx = 1 + for a in items: href = a.get("href") if not href.endswith(".html"): continue + title = a.get_text(strip=True) full = urljoin(self.chapter_base, href) + 
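For reference, the slimmed-down scraper is driven by the scraping task roughly as sketched below (the book URL is a placeholder; see start_scrape_book further down for the real call site):

    from scraper.sites import BookSite
    from scraper.book_scraper import BookScraper

    scraper = BookScraper(BookSite(), "https://example.com/book/123/")
    scraper.parse_book_info()                # title, author, description, cover + chapter links
    chapters = scraper.get_chapter_list()    # list of Chapter(number, title, url) objects
    payload = [{"num": c.number, "title": c.title, "url": c.url} for c in chapters]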
self.chapters.append(Chapter(idx, title, full)) idx += 1 - self.total_chapters = len(self.chapters) - log_debug(f"Found {self.total_chapters} chapters") - - # ------------------------------------------------------------ - def prepare_volume_folders(self): - max_size = int(os.getenv("MAX_VOL_SIZE", "200")) - num_vols = (self.total_chapters + max_size - 1) // max_size - - for v in range(1, num_vols + 1): - d = self.base_path / f"v{v}" - d.mkdir(parents=True, exist_ok=True) - self.volume_dirs[v] = d - - # ------------------------------------------------------------ - def download_all(self): - for ch in self.chapters: - self.download_chapter(ch) - - def download_some(self, limit): - for ch in self.chapters[:limit]: - self.download_chapter(ch) - - # ------------------------------------------------------------ - def download_chapter(self, ch): - # Determine volume + filename - max_size = int(os.getenv("MAX_VOL_SIZE", "200")) - volume = ((ch.number - 1) // max_size) + 1 - vdir = self.volume_dirs.get(volume, self.base_path) - - expected_name = f"{ch.number:05d}_{ch.title}.txt" - fname = vdir / expected_name - expected_full_path = str(fname.resolve()) - - # STRICT SKIP CHECK - if fname.exists() and fname.is_file(): - actual_size = fname.stat().st_size - - # correct name? - if fname.name == expected_name: - expected_dir = str(vdir.resolve()) - actual_dir = str(fname.parent.resolve()) - - if expected_dir == actual_dir: - if actual_size > 300: - log_debug( - f"Skip chapter {ch.number}/{self.total_chapters}: already exists\n" - f" Path: {expected_full_path}\n" - f" Size: {actual_size} bytes" - ) - return - else: - log_debug( - f"Existing file too small ({actual_size} bytes), redownloading: {expected_full_path}" - ) - else: - log_debug( - f"Directory mismatch for chapter {ch.number}, redownloading" - ) - else: - log_debug( - f"Filename mismatch for chapter {ch.number}, redownloading\n" - f" Expected: {expected_name}\n" - f" Found: {fname.name}" - ) - - # PROGRESS INFO - percent = (ch.number / self.total_chapters) * 100 - elapsed = time.time() - self.start_time - avg_time = elapsed / max(ch.number - 1, 1) - remaining = self.total_chapters - ch.number - eta_seconds = max(0, remaining * avg_time) - - eta_min = int(eta_seconds // 60) - eta_sec = int(eta_seconds % 60) - - log_debug( - f"Fetching chapter {ch.number}/{self.total_chapters} " - f"({percent:.2f}%, ETA {eta_min}m {eta_sec}s): " - f"{ch.title}" - ) - - # RETRY EMPTY CONTENT - attempt = 1 - while True: - soup = self.get_doc_with_retry(ch.url) - text = self.parse_chapter_text(soup) - - if text.strip(): - ch.text = text - break - - wait = min(10 + attempt, 30) - log_debug(f"Empty chapter → retry in {wait}s") - time.sleep(wait) - attempt += 1 - - fname.write_text(ch.text, encoding="utf-8") - log_debug(f"Saved chapter to v{volume}: {fname}") - chapter_delay = float(os.getenv("CHAPTER_DELAY", "2")) - log_debug(f"Throttling {chapter_delay}s before next chapter") - time.sleep(chapter_delay) + log_debug(f"[BookScraper] Found {len(self.chapters)} chapters") # ------------------------------------------------------------ - - def parse_chapter_text(self, soup): - body = soup.body - if not body: - return "" - - h1 = body.find("h1") - if not h1: - return "" - - parts = [] - collecting = False - - for sib in h1.next_siblings: - if getattr(sib, "class", None) == ["toplink"]: - continue - if getattr(sib, "class", None) == ["bottomlink"]: - break - if getattr(sib, "name", None) in ["script", "style"]: - continue - - if not collecting: - if getattr(sib, "name", None) 
== "br": - collecting = True - continue - - text = ( - sib.get_text("\n", strip=True) - if hasattr(sib, "get_text") - else str(sib).strip() - ) - if text: - parts.append(text) - - raw = "\n".join(parts) - raw = clean_text(raw, self.site.replacements) - return raw.strip() + def get_chapter_list(self): + """Return the chapter list (DownloadController reads this).""" + return self.chapters diff --git a/bookscraper/scraper/download_controller.py b/bookscraper/scraper/download_controller.py index 8b54254..42d83f1 100644 --- a/bookscraper/scraper/download_controller.py +++ b/bookscraper/scraper/download_controller.py @@ -1,39 +1,72 @@ # scraper/download_controller.py -from logbus.publisher import log +from celery import group from scraper.tasks.pipeline import build_chapter_pipeline +from logbus.publisher import log +import os class DownloadController: + """Coordinates parallel chapter pipelines, with optional volume splitting.""" - def __init__(self, url: str): - self.url = url - self.scraper = None # door BookScraper gevuld - self.base_path = None + def __init__(self, scrape_result: dict): + self.scrape_result = scrape_result + self.title = scrape_result.get("title", "UnknownBook") + self.chapters = scrape_result.get("chapters", []) - def start(self): - log(f"[DL-CONTROLLER] Parsing metadata for {self.url}") + # Base output dir from .env + root = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output") + + # Volume size + self.max_vol = int(os.getenv("MAX_VOL_SIZE", "200")) - # 1) Boek info verzamelen - scraper = self.scraper = self._init_scraper() - scraper.parse_book_info() + # Base directory for the whole book + self.book_base = os.path.join(root, self.title) + os.makedirs(self.book_base, exist_ok=True) + + # constant metadata for all chapters + self.meta = { + "title": self.scrape_result.get("title"), + "author": self.scrape_result.get("author"), + "description": self.scrape_result.get("description"), + } + + def get_volume_path(self, chapter_num: int) -> str: + """Returns the correct volume directory based on chapter number.""" + vol_index = (chapter_num - 1) // self.max_vol + 1 + vol_name = f"Volume_{vol_index:03d}" + vol_path = os.path.join(self.book_base, vol_name) + os.makedirs(vol_path, exist_ok=True) + return vol_path + + def start(self): + log(f"[CTRL] Starting download pipeline for {self.title}") + log(f"[CTRL] Chapters: {len(self.chapters)}") + log(f"[CTRL] Output root: {self.book_base}") + log(f"[CTRL] MAX_VOL_SIZE = {self.max_vol}") - # base_path bepalen - self.base_path = scraper.get_base_path() + tasks = [] - # 2) Chapters ophalen - chapters = scraper.get_chapter_list() + for ch in self.chapters: + chapter_num = ch["num"] + chapter_url = ch["url"] - # 3) Per chapter een Celery pipeline starten - for ch in chapters: - log(f"[DL-CONTROLLER] Queue pipeline for chapter {ch.number}") + # compute volume directory + vol_path = self.get_volume_path(chapter_num) - workflow = build_chapter_pipeline( - chapter_number=ch.number, - chapter_url=ch.url, - base_path=self.base_path + # build the pipeline for this chapter + tasks.append( + build_chapter_pipeline( + chapter_num, + chapter_url, + vol_path, # ✔ correct volume path!! 
+ self.meta, # ✔ pass metadata once + ) ) - workflow.delay() # 🔥 dit start de chain + # parallel processing + job_group = group(tasks) + async_result = job_group.apply_async() - return {"status": "queued", "chapters": len(chapters)} + log("[CTRL] Pipelines launched.") + return async_result diff --git a/bookscraper/scraper/models/book_state.py b/bookscraper/scraper/models/book_state.py index e69de29..e0540b6 100644 --- a/bookscraper/scraper/models/book_state.py +++ b/bookscraper/scraper/models/book_state.py @@ -0,0 +1,16 @@ +# scraper/models/book_state.py + + +class Chapter: + """ + Lightweight chapter model used by DownloadController, BookScraper, + and Celery pipelines. + """ + + def __init__(self, number: int, title: str, url: str): + self.number = number + self.title = title + self.url = url + + def __repr__(self): + return f"Chapter(number={self.number}, title={self.title}, url={self.url})" diff --git a/bookscraper/scraper/tasks/controller_tasks.py b/bookscraper/scraper/tasks/controller_tasks.py new file mode 100644 index 0000000..691523e --- /dev/null +++ b/bookscraper/scraper/tasks/controller_tasks.py @@ -0,0 +1,21 @@ +# scraper/tasks/controller_tasks.py + +from celery_app import celery_app +from logbus.publisher import log +from scraper.download_controller import DownloadController + +print(">>> [IMPORT] controller_tasks.py loaded") + + +@celery_app.task(bind=True, queue="controller", ignore_result=False) +def launch_downloads(self, scrape_result: dict): + """Start complete download → parse → save pipeline.""" + + log("[CTRL] Launching DownloadController...") + + ctl = DownloadController(scrape_result) + async_result = ctl.start() + + log("[CTRL] Pipelines dispatched.") + + return {"pipelines_started": len(scrape_result.get("chapters", []))} diff --git a/bookscraper/scraper/tasks/download_tasks.py b/bookscraper/scraper/tasks/download_tasks.py index a8e5bad..c6b6202 100644 --- a/bookscraper/scraper/tasks/download_tasks.py +++ b/bookscraper/scraper/tasks/download_tasks.py @@ -1,33 +1,33 @@ # scraper/tasks/download_tasks.py - -from celery import shared_task +from celery_app import celery_app from logbus.publisher import log import requests +print(">>> [IMPORT] download_tasks.py loaded") -@shared_task(bind=True, queue="download", ignore_result=False) -def download_chapter(self, chapter_number: int, chapter_url: str): - """ - Download a chapter page and return raw HTML for parsing. 
- Does NOT save anything; that is done by save_tasks.py - """ - log(f"[DL] Downloading chapter {chapter_number}: {chapter_url}") +@celery_app.task(bind=True, queue="download", ignore_result=False) +def download_chapter(self, chapter_num: int, chapter_url: str): + log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}") try: - resp = requests.get(chapter_url, timeout=15) + resp = requests.get( + chapter_url, + headers={"User-Agent": "Mozilla/5.0"}, + timeout=20, + ) resp.raise_for_status() - html = resp.text - log(f"[DL] OK {chapter_number}: {len(html)} bytes") + resp.encoding = resp.apparent_encoding or "gb2312" + html = resp.text + log(f"[DL] OK {chapter_num}: {len(html)} bytes") - # Dit resultaat wordt doorgegeven aan parse_task return { - "chapter": chapter_number, + "chapter": chapter_num, "url": chapter_url, "html": html, } except Exception as exc: - log(f"[DL] ERROR downloading {chapter_url}: {exc}") + log(f"[DL] ERROR {chapter_url}: {exc}") raise diff --git a/bookscraper/scraper/tasks/parse_tasks.py b/bookscraper/scraper/tasks/parse_tasks.py index 03a01a8..83a25c3 100644 --- a/bookscraper/scraper/tasks/parse_tasks.py +++ b/bookscraper/scraper/tasks/parse_tasks.py @@ -1,57 +1,79 @@ # scraper/tasks/parse_tasks.py -from celery import shared_task +from celery_app import celery_app from logbus.publisher import log -from scraper.utils import clean_text from bs4 import BeautifulSoup +from scraper.utils import clean_text, load_replacements +print(">>> [IMPORT] parse_tasks.py loaded") -@shared_task(bind=True, queue="parse", ignore_result=False) -def parse_chapter(self, html: str, chapter_url: str): + +@celery_app.task(bind=True, queue="parse", ignore_result=False) +def parse_chapter(self, download_result: dict, meta: dict): """ - Parse downloaded chapter HTML into clean text. - Returns a dict: - { - "url": chapter_url, - "text": "...parsed text..." 
- } + download_result: + { + "chapter": int, + "url": str, + "html": str + } + + meta: + { + "title": str, + "author": str, + "description": str + } """ - try: - log(f"[PARSE] Start parsing: {chapter_url}") - - soup = BeautifulSoup(html, "html.parser") - - # Veel Chinese sites gebruiken dit soort containers: - possible_blocks = [ - "#content", - ".content", - "div#content", - "div.content", - "div#chaptercontent", - "#chapterContent" - ] - - node = None - for sel in possible_blocks: - r = soup.select_one(sel) - if r: - node = r - break - - if not node: - log( - f"[PARSE] WARNING: no known content block found in {chapter_url}") - text = clean_text(soup.get_text()) - else: - text = clean_text(node.get_text()) - - log(f"[PARSE] Finished parsing: {chapter_url} ({len(text)} chars)") - - return { - "url": chapter_url, - "text": text, - } - - except Exception as exc: - log(f"[PARSE] ERROR parsing {chapter_url}: {exc}") - raise + + chapter_num = download_result["chapter"] + url = download_result["url"] + html = download_result["html"] + + log(f"[PARSE] Parsing chapter {chapter_num}") + + soup = BeautifulSoup(html, "lxml") + + selectors = [ + "#content", + ".content", + "div#content", + "div.content", + "div#chaptercontent", + "#chapterContent", + ".read-content", + ] + + node = None + for sel in selectors: + tmp = soup.select_one(sel) + if tmp: + node = tmp + break + + raw = node.get_text() if node else soup.get_text() + + # replacements + REPL = load_replacements() + text = clean_text(raw, REPL) + + # --------------------------------------------------- + # HEADER ONLY FOR CHAPTER 1 + # --------------------------------------------------- + if chapter_num == 1: + header = ( + f"{meta.get('title','')}\n" + f"Author: {meta.get('author','')}\n" + f"Description:\n{meta.get('description','')}\n" + f"URL: {url}\n" + "-" * 50 + "\n\n" + ) + text = header + text + + log(f"[PARSE] Parsed chapter {chapter_num}: {len(text)} chars") + + return { + "chapter": chapter_num, + "url": url, + "text": text, + "length": len(text), + } diff --git a/bookscraper/scraper/tasks/pipeline.py b/bookscraper/scraper/tasks/pipeline.py index 3a3c442..93e98b0 100644 --- a/bookscraper/scraper/tasks/pipeline.py +++ b/bookscraper/scraper/tasks/pipeline.py @@ -1,28 +1,21 @@ # scraper/tasks/pipeline.py from celery import chain -from logbus.publisher import log - from scraper.tasks.download_tasks import download_chapter from scraper.tasks.parse_tasks import parse_chapter from scraper.tasks.save_tasks import save_chapter -def build_chapter_pipeline(chapter_number: int, chapter_url: str, base_path: str): +def build_chapter_pipeline( + chapter_number: int, chapter_url: str, base_path: str, meta: dict +): """ - Build a Celery pipeline for a single chapter: - download -> parse -> save + Build a download → parse → save pipeline for one chapter. 
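Because the three tasks are chained with partial signatures, Celery prepends each step's return value to the next signature's arguments. Conceptually the chain resolves as sketched here, with chapter_number, chapter_url, meta and base_path as passed to build_chapter_pipeline:

    from celery import chain
    from scraper.tasks.download_tasks import download_chapter
    from scraper.tasks.parse_tasks import parse_chapter
    from scraper.tasks.save_tasks import save_chapter

    # download_chapter(num, url)           -> {"chapter", "url", "html"}
    # parse_chapter(download_result, meta) -> {"chapter", "url", "text", "length"}
    # save_chapter(parsed, base_path)      -> {"chapter", "path"}
    workflow = chain(
        download_chapter.s(chapter_number, chapter_url),
        parse_chapter.s(meta),      # meta is appended after the injected download result
        save_chapter.s(base_path),  # base_path is appended after the injected parse result
    )
    workflow.apply_async()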
+ meta bevat: + title, author, description """ - - log(f"[PIPELINE] Building chain for chapter {chapter_number}") - - # Important: download returns dict {chapter, url, html} - # parse accepts html + chapter_url (via s()) - # save accepts chapter_number, text, base_path - workflow = chain( + return chain( download_chapter.s(chapter_number, chapter_url), - parse_chapter.s(), # takes previous result dict - save_chapter.s(base_path=base_path) + parse_chapter.s(meta), # ← METADATA DOORGEVEN + save_chapter.s(base_path), ) - - return workflow diff --git a/bookscraper/scraper/tasks/save_tasks.py b/bookscraper/scraper/tasks/save_tasks.py index cce44cf..3faebc9 100644 --- a/bookscraper/scraper/tasks/save_tasks.py +++ b/bookscraper/scraper/tasks/save_tasks.py @@ -1,4 +1,5 @@ # scraper/tasks/save_tasks.py +print(">>> [IMPORT] save_tasks.py loaded") from celery import shared_task from logbus.publisher import log @@ -6,26 +7,18 @@ import os @shared_task(bind=True, queue="save", ignore_result=False) -def save_chapter(self, result: dict, base_path: str): - """ - Save parsed chapter text to disk. - result = { - "url": ..., - "text": ... - } - """ +def save_chapter(self, parsed: dict, base_path: str): + print(f">>> [save_tasks] save_chapter() CALLED for chapter {parsed.get('chapter')}") try: - text = result.get("text", "") - url = result.get("url") + chapter_number = parsed.get("chapter") + url = parsed.get("url") + text = parsed.get("text", "") - # Haal chapter nummer uit URL - # Bijvoorbeeld: .../12345.html - # ⇒ 12345 - chapter_number = extract_chapter_number(url) + if not chapter_number: + raise ValueError("Missing chapter_number in parsed payload") - if not os.path.exists(base_path): - os.makedirs(base_path, exist_ok=True) + os.makedirs(base_path, exist_ok=True) filename = f"{chapter_number:05d}.txt" path = os.path.join(base_path, filename) @@ -34,24 +27,11 @@ def save_chapter(self, result: dict, base_path: str): f.write(text) log(f"[SAVE] Saved chapter {chapter_number} → {path}") + print(f">>> [save_tasks] SAVED {path}") return {"chapter": chapter_number, "path": path} except Exception as exc: log(f"[SAVE] ERROR saving chapter from {url}: {exc}") + print(f">>> [save_tasks] ERROR: {exc}") raise - - -def extract_chapter_number(url: str) -> int: - """ - Utility extractor for chapter numbers from a URL. 
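Combined with the controller's volume directories, save_chapter writes one zero-padded file per chapter. A small helper showing the resulting layout (illustrative only, assuming MAX_VOL_SIZE=200 and the default output root):

    import os

    def chapter_path(book_base: str, chapter_number: int, max_vol: int = 200) -> str:
        vol = f"Volume_{(chapter_number - 1) // max_vol + 1:03d}"
        return os.path.join(book_base, vol, f"{chapter_number:05d}.txt")

    # chapter_path("output/MyBook", 201) -> "output/MyBook/Volume_002/00201.txt"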
- Example: https://site.com/1234.html → 1234 - """ - try: - import re - m = re.search(r'(\d+)\.html?', url) - if m: - return int(m.group(1)) - except: - pass - return 0 diff --git a/bookscraper/scraper/tasks/scraping.py b/bookscraper/scraper/tasks/scraping.py index 2d435f8..bbe1d18 100644 --- a/bookscraper/scraper/tasks/scraping.py +++ b/bookscraper/scraper/tasks/scraping.py @@ -1,35 +1,52 @@ -from celery import shared_task -from scraper.book_scraper import BookScraper -from scraper.sites import BookSite +# scraper/tasks/scraping.py +# +from celery_app import celery_app from logbus.publisher import log +import os + +from scraper.sites import BookSite +from scraper.book_scraper import BookScraper +from scraper.tasks.controller_tasks import launch_downloads + +print(">>> [IMPORT] scraping.py loaded") + + +@celery_app.task(bind=True, queue="scraping", ignore_result=False) +def start_scrape_book(self, url: str): + """Scrapes metadata + chapter list.""" + + log(f"[SCRAPING] Start scraping for: {url}") + site = BookSite() + scraper = BookScraper(site, url) + scraper.parse_book_info() -@shared_task(bind=True, queue="scraping") -def scrape_book(self, url): - """ - HIGH-LEVEL SCRAPER TASK - Roept synchronen BookScraper aan voor een volledige scrape. - """ - log(f"[SCRAPER] Start scrape: {url}") + chapters = scraper.get_chapter_list() + full_count = len(chapters) - scraper = BookScraper(BookSite(), url) - result = scraper.execute() + DRY_RUN = os.getenv("DRY_RUN", "0") == "1" + TEST_LIMIT = int(os.getenv("TEST_LIMIT", "5")) - log(f"[SCRAPER] Finished scrape: {url}") - return {"title": result["title"]} + if DRY_RUN: + log(f"[SCRAPING] DRY_RUN: limiting chapters to first {TEST_LIMIT}") + chapters = chapters[:TEST_LIMIT] + result = { + "title": scraper.book_title, + "author": scraper.book_author, + "description": scraper.book_description, + "cover": scraper.cover_url, + "chapters": [ + {"num": ch.number, "title": ch.title, "url": ch.url} for ch in chapters + ], + } -@shared_task(bind=True, queue="download", max_retries=5) -def download_chapter_task(self, number, title, url, output_base): - """ - Download alleen één chapter. - download_worker.py voert dit uiteindelijk uit. - """ - from worker.download_worker import download_single_chapter + log(f"[SCRAPING] Completed scrape: {len(chapters)}/{full_count} chapters") - try: - return download_single_chapter(number, title, url, output_base) + celery_app.send_task( + "scraper.tasks.controller_tasks.launch_downloads", + args=[result], + queue="controller", + ) - except Exception as e: - log(f"[DOWNLOAD] Error while downloading chapter {number}: {e}") - raise self.retry(countdown=3) + return result diff --git a/bookscraper/scraper/tasks/utils.py b/bookscraper/scraper/tasks/utils.py index e69de29..03c9b9f 100644 --- a/bookscraper/scraper/tasks/utils.py +++ b/bookscraper/scraper/tasks/utils.py @@ -0,0 +1,57 @@ +# scraper/utils.py + +import re +import os +from pathlib import Path + + +# ------------------------------------------------------------ +# Load replacements from text_replacements.txt (optional file) +# ------------------------------------------------------------ +def load_replacements(filepath="text_replacements.txt") -> dict: + """ + Load key=value style replacements. + Empty or missing file → return {}. 
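The replacements file is a plain key=value list, one rule per line ('#' comment lines are skipped by the scraper/utils.py variant). A quick usage sketch, with made-up replacement rules:

    from scraper.tasks.utils import load_replacements, clean_text

    # text_replacements.txt could contain, for example:
    #   旧词=新词
    #   广告文字=
    repl = load_replacements("text_replacements.txt")   # {} when the file is missing
    cleaned = clean_text("旧词\r\n\n\n\n正文", repl)      # -> "新词\n\n正文"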
+ """ + path = Path(filepath) + + if not path.exists(): + return {} + + repl = {} + + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if "=" in line: + key, val = line.split("=", 1) + repl[key.strip()] = val.strip() + + return repl + + +# ------------------------------------------------------------ +# Clean extracted HTML text +# ------------------------------------------------------------ +def clean_text(raw: str, repl_dict: dict = None) -> str: + """ + Normalizes whitespace, removes junk, and applies replacements. + repl_dict is optional → falls back to {}. + """ + if repl_dict is None: + repl_dict = {} + + txt = raw + + # Normalize CRLF + txt = txt.replace("\r", "") + + # Collapse multiple blank lines + txt = re.sub(r"\n{3,}", "\n\n", txt) + + # Apply replacements + for key, val in repl_dict.items(): + txt = txt.replace(key, val) + + # Strip excessive whitespace at edges + return txt.strip() diff --git a/bookscraper/scraper/utils.py b/bookscraper/scraper/utils.py index dfe9c2a..6fa27e6 100644 --- a/bookscraper/scraper/utils.py +++ b/bookscraper/scraper/utils.py @@ -1,22 +1,36 @@ -import os - # scraper/utils.py +from pathlib import Path + +def load_replacements(path="text_replacements.txt") -> dict: + """ + Load key=value replacements from a simple text file. + Lines beginning with # are ignored. + """ + fp = Path(path) + if not fp.exists(): + return {} -def load_replacements(path): repl = {} - if not path or not os.path.exists(path): - return repl - - with open(path, encoding="utf-8") as f: - for line in f: - if "=>" in line: - k, v = line.strip().split("=>", 1) - repl[k.strip()] = v.strip() + for line in fp.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + + if "=" in line: + k, v = line.split("=", 1) + repl[k.strip()] = v.strip() + return repl -def clean_text(text, repl_dict): - for src, tgt in repl_dict.items(): - text = text.replace(src, tgt) - return text +def clean_text(raw: str, repl_dict: dict) -> str: + """ + Cleans text using user-defined replacements. + """ + txt = raw + + for k, v in repl_dict.items(): + txt = txt.replace(k, v) + + return txt.strip() diff --git a/bookscraper/templates/result.html b/bookscraper/templates/result.html index bbf4082..f3d2c5f 100644 --- a/bookscraper/templates/result.html +++ b/bookscraper/templates/result.html @@ -1,63 +1,85 @@ - - - Scrape Resultaat + + + Scrape & Download Resultaat - - + + + ← Terug -← Terug + {% if error %} +
    Fout: {{ error }}
  {% endif %}

  Scrape Resultaat

  {% if book %}
    Titel: {{ book.title }}
    Auteur: {{ book.author }}

    {% if book.description %}
      Beschrijving:
      {{ book.description }}
    {% endif %}

    Aantal chapters: {{ book.chapters|length }}

    {% if download_job_id %}
      Download pipeline gestart!
      Job ID: {{ download_job_id }}
    {% endif %}
  {% endif %}
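Taken together, the flow is: the Flask view enqueues start_scrape_book on the scraping queue; that task scrapes metadata plus the chapter list and hands the result to launch_downloads on the controller queue, which fans out one download → parse → save chain per chapter. A minimal end-to-end check without the web UI could look like this (the book URL is a placeholder; Redis and the workers from docker-compose must be running):

    from scraper.tasks.scraping import start_scrape_book

    result = start_scrape_book.delay("https://example.com/book/123/")
    info = result.get(timeout=600)   # metadata + chapter list; chapter pipelines keep running asynchronously
    print(info["title"], len(info["chapters"]))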