Refactor: complete Celery multithreaded downloads

celery-integration
peter.fong 2 weeks ago
parent 8e2d3cec49
commit cbfcce62cc

@@ -1,22 +1,20 @@
# ============================================
# File: bookscraper/app.py
# Ensure project directory is on PYTHONPATH
# ============================================
from scraper.logger import log_debug
from scraper.download_controller import DownloadController
from flask import Flask, render_template, request
from dotenv import load_dotenv
import sys
from pathlib import Path
# Add this directory (bookscraper/) to Python import path
PROJECT_ROOT = Path(__file__).resolve().parent
sys.path.insert(0, str(PROJECT_ROOT))
# Load .env BEFORE any Celery app is imported
load_dotenv()
print(">>> [WEB] Importing celery_app …")
from celery_app import celery_app  # <<< MUST BE IMPORTED BEFORE TASK IMPORTS
from scraper.logger import log_debug
from flask import Flask, render_template, request
# Task imports come only after celery_app:
print(">>> [WEB] Importing tasks …")
from scraper.tasks.scraping import start_scrape_book
app = Flask(__name__)
@@ -33,20 +31,25 @@ def start_scraping():
if not url:
return render_template("result.html", error="Geen URL opgegeven.")
try:
log_debug(f"[WEB] Start scraping: {url}")
ctl = DownloadController(url)
result = ctl.start()
log_debug(f"[WEB] Scrape request for: {url}")
return render_template("result.html", result=result, url=url)
# Important: start_scrape_book now comes from the SAME celery_app
result = start_scrape_book.delay(url)
except Exception as e:
log_debug(f"[WEB] ERROR: {e}")
return render_template("result.html", error=str(e), url=url)
return render_template(
"result.html",
message="Scraping gestart.",
task_id=result.id,
url=url,
)
if __name__ == "__main__":
import os
debug = os.getenv("FLASK_DEBUG", "0") == "1"
app.run(host="0.0.0.0", port=5000, debug=debug)
host = os.getenv("HOST", "0.0.0.0")
port = int(os.getenv("PORT", "5000"))
log_debug(f"[WEB] Starting Flask server on {host}:{port}, debug={debug}")
app.run(host=host, port=port, debug=debug)
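A hypothetical status endpoint (not part of this commit) sketches how the web app could poll the task it queues via start_scrape_book.delay(url); it assumes the same celery_app imported above:

    from celery.result import AsyncResult

    @app.route("/status/<task_id>")
    def task_status(task_id):
        # Look up the queued task in the Redis result backend
        res = AsyncResult(task_id, app=celery_app)
        return {"task_id": task_id, "state": res.state}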

@@ -1,45 +1,47 @@
# ============================================
# File: bookscraper/celery_app.py
# ============================================
# celery_app.py
import os
from celery import Celery
from dotenv import load_dotenv
# Load environment variables (OK to do here)
print(">>> [celery_app] Loading .env BEFORE initializing Celery...")
load_dotenv()
print(">>> DEBUG: celery_app.py LOADED")
print(">>> DEBUG: env REDIS_BROKER =", os.getenv("REDIS_BROKER"))
print(">>> DEBUG: env REDIS_URL =", os.getenv("REDIS_URL"))
# Read broker settings
REDIS_BROKER = os.getenv("REDIS_BROKER")
REDIS_BACKEND = os.getenv("REDIS_BACKEND")
BROKER = os.getenv("REDIS_BROKER")
BACKEND = os.getenv("REDIS_BACKEND")
# Fallback ONLY if missing
if not REDIS_BROKER:
REDIS_BROKER = os.getenv(
"REDIS_URL", "redis://host.docker.internal:6379/0"
)
print(">>> [celery_app] BROKER =", BROKER)
print(">>> [celery_app] BACKEND =", BACKEND)
if not REDIS_BACKEND:
REDIS_BACKEND = REDIS_BROKER # safe fallback
# Create Celery app AFTER loading .env
celery_app = Celery(
"bookscraper",
broker=REDIS_BROKER,
backend=REDIS_BACKEND,
broker=BROKER,
backend=BACKEND,
include=[
"scraper.tasks.scraping",
"scraper.tasks.controller_tasks",
"scraper.tasks.download_tasks",
"scraper.tasks.parse_tasks",
"scraper.tasks.save_tasks",
],
)
celery_app.conf.update(
task_default_queue="default",
task_routes={
"tasks.scraping.*": {"queue": "scraping"},
"tasks.audio.*": {"queue": "audio"},
"tasks.*": {"queue": "default"},
},
worker_prefetch_multiplier=1,
task_acks_late=True,
)
celery_app.conf.task_routes = {
"scraper.tasks.scraping.*": {"queue": "scraping"},
"scraper.tasks.controller_tasks.*": {"queue": "controller"},
"scraper.tasks.download_tasks.*": {"queue": "download"},
"scraper.tasks.parse_tasks.*": {"queue": "parse"},
"scraper.tasks.save_tasks.*": {"queue": "save"},
}
# ------------------------------------------------------------
# EXTRA DEBUG: test import of included modules
# ------------------------------------------------------------
print(">>> [celery_app] Testing imports for included task modules...")
for module in celery_app.conf.include:
try:
__import__(module)
print(f">>> [celery_app] OK import {module}")
except Exception as e:
print(f">>> [celery_app] FAILED import {module}: {e}")
print(">>> [celery_app] Celery initialization complete.")

@@ -1,87 +1,152 @@
version: "3.9"
services:
# ----------------------------------------------------------
# Redis broker & backend
# ----------------------------------------------------------
redis:
image: redis:7
container_name: bookscraper_redis
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 2s
timeout: 2s
retries: 20
restart: "no"
# ----------------------------------------------------------
# Controller Worker
# ----------------------------------------------------------
worker_controller:
build:
context: .
dockerfile: docker/Dockerfile.scraper
container_name: worker_controller
command: celery -A celery_app worker -Q controller -n controller@%h -l INFO
depends_on:
redis:
condition: service_healthy
env_file:
- .env
environment:
- PYTHONUNBUFFERED=1
volumes:
- .:/app
- /Users/peter/Desktop/books:/app/output
restart: "no"
# -------------------------
# WEB UI
# -------------------------
# ----------------------------------------------------------
# Web GUI
# ----------------------------------------------------------
web:
build:
context: .
dockerfile: Dockerfile
container_name: bookscraper
ports:
- "5050:5000"
dockerfile: docker/Dockerfile.web
container_name: bookscraper_web
volumes:
- .:/app
- /Users/peter/Desktop/books:/app/output
depends_on:
redis:
condition: service_healthy
ports:
- "5011:5000"
environment:
- REDIS_BROKER=redis://redis:6379/0
- REDIS_BACKEND=redis://redis:6379/1
env_file:
- .env
restart: "no"
environment:
FLASK_ENV: "production"
restart: unless-stopped
# ----------------------------------------------------------
# Download Worker
# ----------------------------------------------------------
worker_download:
build:
context: .
dockerfile: docker/Dockerfile.scraper
container_name: worker_download
volumes:
- .:/app
- /Users/peter/Desktop/books:/app/output
depends_on:
- redis
redis:
condition: service_healthy
env_file:
- .env
command: celery -A celery_app worker -Q download -n download@%h -l INFO
restart: "no"
# -------------------------
# SCRAPING WORKER
# (concurrency 1, one job at a time)
# -------------------------
scraper_worker:
# ----------------------------------------------------------
# Parse Worker
# ----------------------------------------------------------
worker_parse:
build:
context: .
dockerfile: Dockerfile
container_name: scraper_worker
command: python worker/scrape_worker.py
dockerfile: docker/Dockerfile.scraper
container_name: worker_parse
volumes:
- .:/app
- /Users/peter/Desktop/books:/app/output
depends_on:
redis:
condition: service_healthy
env_file:
- .env
command: celery -A celery_app worker -Q parse -n parse@%h -l INFO
restart: "no"
restart: unless-stopped
# ----------------------------------------------------------
# Save Worker
# ----------------------------------------------------------
worker_save:
build:
context: .
dockerfile: docker/Dockerfile.scraper
container_name: worker_save
volumes:
- .:/app
- /Users/peter/Desktop/books:/app/output
depends_on:
- redis
redis:
condition: service_healthy
env_file:
- .env
command: celery -A celery_app worker -Q save -n save@%h -l INFO
restart: "no"
# -------------------------
# AUDIO WORKER
# -------------------------
audio_worker:
# ----------------------------------------------------------
# Audio Worker (macOS only)
# ----------------------------------------------------------
worker_audio:
build:
context: .
dockerfile: Dockerfile
container_name: audio_worker
command: python worker/audio_worker.py
dockerfile: docker/Dockerfile.audio
container_name: worker_audio
volumes:
- .:/app
- /Users/peter/Desktop/books:/app/output
depends_on:
redis:
condition: service_healthy
env_file:
- .env
command: celery -A celery_app worker -Q audio -n audio@%h -l INFO
restart: "no"
restart: unless-stopped
# ----------------------------------------------------------
# Scraping Worker
# ----------------------------------------------------------
worker_scraping:
build:
context: .
dockerfile: docker/Dockerfile.scraper
container_name: worker_scraping
volumes:
- .:/app
- /Users/peter/Desktop/books:/app/output
depends_on:
- redis
# -------------------------
# REDIS (LOGS & QUEUE)
# -------------------------
redis:
image: redis:alpine
container_name: redis
ports:
- "6379:6379"
restart: unless-stopped
redis:
condition: service_healthy
env_file:
- .env
command: celery -A celery_app worker -Q scraping -n scraping@%h -l INFO
restart: "no"

@@ -1,17 +1,9 @@
# docker/Dockerfile.audio
FROM python:3.11-slim
FROM python:3.12-slim
WORKDIR /app
# Install audio processing dependencies (extend later)
RUN apt-get update && apt-get install -y --no-install-recommends \
ffmpeg \
libavcodec-extra \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY requirements.audio.txt /app/requirements.audio.txt
RUN pip install --no-cache-dir -r /app/requirements.audio.txt
COPY . .
COPY . /app
CMD ["python", "worker/audio_worker.py"]
CMD ["python3", "-c", "print('audio worker ready')"]

@@ -1,17 +1,15 @@
# docker/Dockerfile.scraper
FROM python:3.11-slim
FROM python:3.12-slim
WORKDIR /app
# Scraper-specific system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
libjpeg62-turbo-dev \
zlib1g-dev \
RUN apt-get update && apt-get install -y \
build-essential \
libxml2-dev \
libxslt1-dev \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY requirements.scraper.txt /app/requirements.scraper.txt
RUN pip install --no-cache-dir -r /app/requirements.scraper.txt
COPY . .
COPY . /app
CMD ["python", "worker/scrape_worker.py"]
CMD ["python3", "-c", "print('scraper worker ready')"]

@@ -1,16 +1,17 @@
# docker/Dockerfile.web
FROM python:3.11-slim
WORKDIR /app
# Install only Python deps
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy full requirements for both Flask + Celery + BookScraper
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
# Copy the entire app
COPY . .
# Copy entire application (including .env so load_dotenv works)
COPY . /app
# Ensure Celery + BookScraper modules load correctly
ENV PYTHONPATH=/app
# Flask runs on port 5000
EXPOSE 5000
CMD ["python", "app.py"]

@@ -0,0 +1,7 @@
requests
beautifulsoup4
lxml
pillow
redis
celery[redis]
python-dotenv

@@ -0,0 +1,7 @@
requests
beautifulsoup4
lxml
pillow
redis
celery[redis]
python-dotenv

@@ -3,3 +3,6 @@ requests
beautifulsoup4
lxml
pillow
redis
celery
python-dotenv

@@ -0,0 +1,8 @@
flask
requests
beautifulsoup4
lxml
pillow
redis
celery
python-dotenv

@@ -1,25 +1,20 @@
# scraper/book_scraper.py
import requests
import os
import time
from pathlib import Path
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from PIL import Image
from io import BytesIO
from scraper.logger import log_debug
from scraper.utils import clean_text, load_replacements
class Chapter:
def __init__(self, num, title, url):
self.number = num
self.title = title
self.url = url
self.text = ""
from scraper.models.book_state import Chapter
class BookScraper:
"""
Lightweight scraper: only metadata + chapter list.
All downloading/parsing/saving is handled by Celery tasks.
"""
def __init__(self, site, url):
self.site = site
self.url = url
@@ -30,142 +25,59 @@ class BookScraper:
self.cover_url = ""
self.chapters = []
self.base_path = None
self.chapter_base = None
# ENV
self.DRY_RUN = os.getenv("DRY_RUN", "1") == "1"
self.TEST_LIMIT = int(os.getenv("TEST_LIMIT", "10"))
self.MAX_DL = float(os.getenv("MAX_DOWNLOADS_PER_SEC", "1"))
self.min_delay = 1.0 / self.MAX_DL if self.MAX_DL > 0 else 1.0
self._last_download_time = 0
# replacements.txt
fp = os.path.join(os.getcwd(), "replacements.txt")
extra = load_replacements(fp)
# Load custom replacements
extra = load_replacements("replacements.txt")
self.site.replacements.update(extra)
self.start_time = None
self.total_chapters = 0
self.volume_dirs = {}
# ------------------------------------------------------------
# RATE LIMITER
# ------------------------------------------------------------
def throttle(self):
now = time.time()
elapsed = now - self._last_download_time
if elapsed < self.min_delay:
time.sleep(self.min_delay - elapsed)
self._last_download_time = time.time()
# ------------------------------------------------------------
def execute(self):
log_debug(f"Starting scraper for {self.url}")
self.start_time = time.time()
def parse_book_info(self):
"""Parse title, author, description, cover from the main page."""
soup = self._fetch(self.url)
soup = self.get_doc_with_retry(self.url)
self.parse_title(soup)
self.parse_author(soup)
self.parse_description(soup)
self.parse_cover(soup)
self.prepare_output_folder()
self._parse_title(soup)
self._parse_author(soup)
self._parse_description(soup)
self._parse_cover(soup)
# Parse chapter list page + chapter links
chapter_page = self.get_chapter_page(soup)
self.parse_chapter_links(chapter_page)
self.prepare_volume_folders()
if self.DRY_RUN:
self.download_some(self.TEST_LIMIT)
else:
self.download_all()
return {"title": self.book_title}
# ------------------------------------------------------------
# HTTP GET WITH RETRIES + HARD 429 COOLDOWN WITH COUNTDOWN
# ------------------------------------------------------------
def get_doc_with_retry(self, url):
attempt = 1
while True:
self.throttle()
log_debug(f"GET {url} (attempt {attempt})")
try:
resp = requests.get(
url,
headers={"User-Agent": "Mozilla/5.0"},
timeout=10,
)
except Exception as e:
log_debug(f"Network error {e} → retry in {attempt + 1}s")
time.sleep(attempt + 1)
attempt += 1
continue
code = resp.status_code
log_debug(f"HTTP {code} for {url}")
# 429 → hard cooldown with countdown
if code == 429:
cooldown = 60
log_debug(f"429 detected — cooldown {cooldown}s")
for i in range(cooldown, 0, -1):
log_debug(f"429 cooldown… {i}s remaining")
time.sleep(1)
attempt += 1
continue
# recoverable
if code in (403, 500):
wait = min(5 * attempt, 30)
log_debug(f"HTTP {code} → retry in {wait}s")
time.sleep(wait)
attempt += 1
continue
if code == 200:
resp.encoding = self.site.encoding
return BeautifulSoup(resp.text, "lxml")
# unexpected
wait = attempt + 1
log_debug(f"Unexpected HTTP {code} → sleep {wait}s")
time.sleep(wait)
attempt += 1
def _fetch(self, url):
"""Simple fetch (no retry), DownloadController handles errors."""
log_debug(f"[BookScraper] Fetch: {url}")
resp = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=10)
resp.encoding = self.site.encoding
return BeautifulSoup(resp.text, "lxml")
# ------------------------------------------------------------
def parse_title(self, soup):
def _parse_title(self, soup):
h1 = soup.find("h1")
self.book_title = h1.get_text(strip=True) if h1 else "UnknownTitle"
log_debug(f"Book title = {self.book_title}")
log_debug(f"[BookScraper] Title = {self.book_title}")
def parse_author(self, soup):
def _parse_author(self, soup):
td = soup.find("td", string=lambda t: t and "" in t)
self.book_author = (
td.get_text(strip=True).split("")[1]
if td and "" in td.get_text()
else "UnknownAuthor"
)
log_debug(f"Book author = {self.book_author}")
raw = td.get_text(strip=True) if td else ""
self.book_author = raw.split("")[1] if "" in raw else "UnknownAuthor"
log_debug(f"[BookScraper] Author = {self.book_author}")
def parse_description(self, soup):
def _parse_description(self, soup):
span = soup.find("span", string=lambda t: t and "内容简介" in t)
if not span:
log_debug("No description found")
self.book_description = ""
log_debug("[BookScraper] Description not found")
return
parts = []
for sib in span.next_siblings:
# Stop when next book section begins
if getattr(sib, "name", None) == "span":
break
text = (
sib.get_text(strip=True)
if hasattr(sib, "get_text")
@@ -175,52 +87,23 @@ class BookScraper:
parts.append(text)
self.book_description = "\n".join(parts)
log_debug(f"Description length = {len(self.book_description)}")
log_debug(
f"[BookScraper] Description length = {len(self.book_description)} characters"
)
# ------------------------------------------------------------
def parse_cover(self, soup):
cover = soup.find(
"img", src=lambda v: v and "files/article/image" in v)
def _parse_cover(self, soup):
cover = soup.find("img", src=lambda v: v and "files/article/image" in v)
if not cover:
log_debug("Cover not found")
log_debug("[BookScraper] No cover found")
return
self.cover_url = urljoin(self.site.root, cover.get("src"))
log_debug(f"Cover URL = {self.cover_url}")
# ------------------------------------------------------------
def prepare_output_folder(self):
self.base_path = Path("output") / self.book_title / self.site.name
self.base_path.mkdir(parents=True, exist_ok=True)
if self.cover_url:
self.download_cover()
def download_cover(self):
log_debug(f"Downloading cover: {self.cover_url}")
resp = requests.get(
self.cover_url,
headers={"User-Agent": "Mozilla/5.0"},
timeout=10,
)
if resp.status_code != 200:
return
if "html" in resp.headers.get("Content-Type", ""):
return
try:
img = Image.open(BytesIO(resp.content))
except:
return
img.save(self.base_path / "cover.jpg")
log_debug("Cover saved")
log_debug(f"[BookScraper] Cover URL = {self.cover_url}")
# ------------------------------------------------------------
def get_chapter_page(self, soup):
"""Return BeautifulSoup of the main chapter list page."""
node = soup.select_one(
"html > body > div:nth-of-type(6) > div:nth-of-type(2) > div > table"
)
@@ -231,7 +114,7 @@ class BookScraper:
bp = parsed.path.rsplit("/", 1)[0] + "/"
self.chapter_base = f"{parsed.scheme}://{parsed.netloc}{bp}"
return self.get_doc_with_retry(url)
return self._fetch(url)
# ------------------------------------------------------------
def parse_chapter_links(self, soup):
@@ -240,152 +123,21 @@ class BookScraper:
self.chapters = []
idx = 1
for a in items:
href = a.get("href")
if not href.endswith(".html"):
continue
title = a.get_text(strip=True)
full = urljoin(self.chapter_base, href)
self.chapters.append(Chapter(idx, title, full))
idx += 1
self.total_chapters = len(self.chapters)
log_debug(f"Found {self.total_chapters} chapters")
# ------------------------------------------------------------
def prepare_volume_folders(self):
max_size = int(os.getenv("MAX_VOL_SIZE", "200"))
num_vols = (self.total_chapters + max_size - 1) // max_size
for v in range(1, num_vols + 1):
d = self.base_path / f"v{v}"
d.mkdir(parents=True, exist_ok=True)
self.volume_dirs[v] = d
# ------------------------------------------------------------
def download_all(self):
for ch in self.chapters:
self.download_chapter(ch)
def download_some(self, limit):
for ch in self.chapters[:limit]:
self.download_chapter(ch)
# ------------------------------------------------------------
def download_chapter(self, ch):
# Determine volume + filename
max_size = int(os.getenv("MAX_VOL_SIZE", "200"))
volume = ((ch.number - 1) // max_size) + 1
vdir = self.volume_dirs.get(volume, self.base_path)
expected_name = f"{ch.number:05d}_{ch.title}.txt"
fname = vdir / expected_name
expected_full_path = str(fname.resolve())
# STRICT SKIP CHECK
if fname.exists() and fname.is_file():
actual_size = fname.stat().st_size
# correct name?
if fname.name == expected_name:
expected_dir = str(vdir.resolve())
actual_dir = str(fname.parent.resolve())
if expected_dir == actual_dir:
if actual_size > 300:
log_debug(
f"Skip chapter {ch.number}/{self.total_chapters}: already exists\n"
f" Path: {expected_full_path}\n"
f" Size: {actual_size} bytes"
)
return
else:
log_debug(
f"Existing file too small ({actual_size} bytes), redownloading: {expected_full_path}"
)
else:
log_debug(
f"Directory mismatch for chapter {ch.number}, redownloading"
)
else:
log_debug(
f"Filename mismatch for chapter {ch.number}, redownloading\n"
f" Expected: {expected_name}\n"
f" Found: {fname.name}"
)
# PROGRESS INFO
percent = (ch.number / self.total_chapters) * 100
elapsed = time.time() - self.start_time
avg_time = elapsed / max(ch.number - 1, 1)
remaining = self.total_chapters - ch.number
eta_seconds = max(0, remaining * avg_time)
eta_min = int(eta_seconds // 60)
eta_sec = int(eta_seconds % 60)
log_debug(
f"Fetching chapter {ch.number}/{self.total_chapters} "
f"({percent:.2f}%, ETA {eta_min}m {eta_sec}s): "
f"{ch.title}"
)
# RETRY EMPTY CONTENT
attempt = 1
while True:
soup = self.get_doc_with_retry(ch.url)
text = self.parse_chapter_text(soup)
if text.strip():
ch.text = text
break
wait = min(10 + attempt, 30)
log_debug(f"Empty chapter → retry in {wait}s")
time.sleep(wait)
attempt += 1
fname.write_text(ch.text, encoding="utf-8")
log_debug(f"Saved chapter to v{volume}: {fname}")
chapter_delay = float(os.getenv("CHAPTER_DELAY", "2"))
log_debug(f"Throttling {chapter_delay}s before next chapter")
time.sleep(chapter_delay)
log_debug(f"[BookScraper] Found {len(self.chapters)} chapters")
# ------------------------------------------------------------
def parse_chapter_text(self, soup):
body = soup.body
if not body:
return ""
h1 = body.find("h1")
if not h1:
return ""
parts = []
collecting = False
for sib in h1.next_siblings:
if getattr(sib, "class", None) == ["toplink"]:
continue
if getattr(sib, "class", None) == ["bottomlink"]:
break
if getattr(sib, "name", None) in ["script", "style"]:
continue
if not collecting:
if getattr(sib, "name", None) == "br":
collecting = True
continue
text = (
sib.get_text("\n", strip=True)
if hasattr(sib, "get_text")
else str(sib).strip()
)
if text:
parts.append(text)
raw = "\n".join(parts)
raw = clean_text(raw, self.site.replacements)
return raw.strip()
def get_chapter_list(self):
"""Return the chapter list (DownloadController reads this)."""
return self.chapters

@@ -1,39 +1,72 @@
# scraper/download_controller.py
from logbus.publisher import log
from celery import group
from scraper.tasks.pipeline import build_chapter_pipeline
from logbus.publisher import log
import os
class DownloadController:
"""Coordinates parallel chapter pipelines, with optional volume splitting."""
def __init__(self, url: str):
self.url = url
self.scraper = None  # filled in by BookScraper
self.base_path = None
def __init__(self, scrape_result: dict):
self.scrape_result = scrape_result
self.title = scrape_result.get("title", "UnknownBook")
self.chapters = scrape_result.get("chapters", [])
def start(self):
log(f"[DL-CONTROLLER] Parsing metadata for {self.url}")
# Base output dir from .env
root = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output")
# Volume size
self.max_vol = int(os.getenv("MAX_VOL_SIZE", "200"))
# 1) Collect book info
scraper = self.scraper = self._init_scraper()
scraper.parse_book_info()
# Base directory for the whole book
self.book_base = os.path.join(root, self.title)
os.makedirs(self.book_base, exist_ok=True)
# constant metadata for all chapters
self.meta = {
"title": self.scrape_result.get("title"),
"author": self.scrape_result.get("author"),
"description": self.scrape_result.get("description"),
}
def get_volume_path(self, chapter_num: int) -> str:
"""Returns the correct volume directory based on chapter number."""
vol_index = (chapter_num - 1) // self.max_vol + 1
vol_name = f"Volume_{vol_index:03d}"
vol_path = os.path.join(self.book_base, vol_name)
os.makedirs(vol_path, exist_ok=True)
return vol_path
def start(self):
log(f"[CTRL] Starting download pipeline for {self.title}")
log(f"[CTRL] Chapters: {len(self.chapters)}")
log(f"[CTRL] Output root: {self.book_base}")
log(f"[CTRL] MAX_VOL_SIZE = {self.max_vol}")
# determine base_path
self.base_path = scraper.get_base_path()
tasks = []
# 2) Fetch chapters
chapters = scraper.get_chapter_list()
for ch in self.chapters:
chapter_num = ch["num"]
chapter_url = ch["url"]
# 3) Start a Celery pipeline per chapter
for ch in chapters:
log(f"[DL-CONTROLLER] Queue pipeline for chapter {ch.number}")
# compute volume directory
vol_path = self.get_volume_path(chapter_num)
workflow = build_chapter_pipeline(
chapter_number=ch.number,
chapter_url=ch.url,
base_path=self.base_path
# build the pipeline for this chapter
tasks.append(
build_chapter_pipeline(
chapter_num,
chapter_url,
vol_path, # ✔ correct volume path!!
self.meta, # ✔ pass metadata once
)
)
workflow.delay()  # this starts the chain
# parallel processing
job_group = group(tasks)
async_result = job_group.apply_async()
return {"status": "queued", "chapters": len(chapters)}
log("[CTRL] Pipelines launched.")
return async_result
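A worked example of the volume split performed by get_volume_path, assuming the default MAX_VOL_SIZE of 200:

    # vol_index = (chapter_num - 1) // 200 + 1
    assert (1 - 1) // 200 + 1 == 1    # chapter 1   -> Volume_001
    assert (200 - 1) // 200 + 1 == 1  # chapter 200 -> Volume_001
    assert (201 - 1) // 200 + 1 == 2  # chapter 201 -> Volume_002
    assert (401 - 1) // 200 + 1 == 3  # chapter 401 -> Volume_003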

@@ -0,0 +1,16 @@
# scraper/models/book_state.py
class Chapter:
"""
Lightweight chapter model used by DownloadController, BookScraper,
and Celery pipelines.
"""
def __init__(self, number: int, title: str, url: str):
self.number = number
self.title = title
self.url = url
def __repr__(self):
return f"Chapter(number={self.number}, title={self.title}, url={self.url})"

@@ -0,0 +1,21 @@
# scraper/tasks/controller_tasks.py
from celery_app import celery_app
from logbus.publisher import log
from scraper.download_controller import DownloadController
print(">>> [IMPORT] controller_tasks.py loaded")
@celery_app.task(bind=True, queue="controller", ignore_result=False)
def launch_downloads(self, scrape_result: dict):
"""Start complete download → parse → save pipeline."""
log("[CTRL] Launching DownloadController...")
ctl = DownloadController(scrape_result)
async_result = ctl.start()
log("[CTRL] Pipelines dispatched.")
return {"pipelines_started": len(scrape_result.get("chapters", []))}

@@ -1,33 +1,33 @@
# scraper/tasks/download_tasks.py
from celery import shared_task
from celery_app import celery_app
from logbus.publisher import log
import requests
print(">>> [IMPORT] download_tasks.py loaded")
@shared_task(bind=True, queue="download", ignore_result=False)
def download_chapter(self, chapter_number: int, chapter_url: str):
"""
Download a chapter page and return raw HTML for parsing.
Does NOT save anything; that is done by save_tasks.py
"""
log(f"[DL] Downloading chapter {chapter_number}: {chapter_url}")
@celery_app.task(bind=True, queue="download", ignore_result=False)
def download_chapter(self, chapter_num: int, chapter_url: str):
log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
try:
resp = requests.get(chapter_url, timeout=15)
resp = requests.get(
chapter_url,
headers={"User-Agent": "Mozilla/5.0"},
timeout=20,
)
resp.raise_for_status()
html = resp.text
log(f"[DL] OK {chapter_number}: {len(html)} bytes")
resp.encoding = resp.apparent_encoding or "gb2312"
html = resp.text
log(f"[DL] OK {chapter_num}: {len(html)} bytes")
# This result is passed on to the parse task
return {
"chapter": chapter_number,
"chapter": chapter_num,
"url": chapter_url,
"html": html,
}
except Exception as exc:
log(f"[DL] ERROR downloading {chapter_url}: {exc}")
log(f"[DL] ERROR {chapter_url}: {exc}")
raise

@@ -1,57 +1,79 @@
# scraper/tasks/parse_tasks.py
from celery import shared_task
from celery_app import celery_app
from logbus.publisher import log
from scraper.utils import clean_text
from bs4 import BeautifulSoup
from scraper.utils import clean_text, load_replacements
print(">>> [IMPORT] parse_tasks.py loaded")
@shared_task(bind=True, queue="parse", ignore_result=False)
def parse_chapter(self, html: str, chapter_url: str):
@celery_app.task(bind=True, queue="parse", ignore_result=False)
def parse_chapter(self, download_result: dict, meta: dict):
"""
Parse downloaded chapter HTML into clean text.
Returns a dict:
{
"url": chapter_url,
"text": "...parsed text..."
}
download_result:
{
"chapter": int,
"url": str,
"html": str
}
meta:
{
"title": str,
"author": str,
"description": str
}
"""
try:
log(f"[PARSE] Start parsing: {chapter_url}")
soup = BeautifulSoup(html, "html.parser")
# Many Chinese sites use containers like these:
possible_blocks = [
"#content",
".content",
"div#content",
"div.content",
"div#chaptercontent",
"#chapterContent"
]
node = None
for sel in possible_blocks:
r = soup.select_one(sel)
if r:
node = r
break
if not node:
log(
f"[PARSE] WARNING: no known content block found in {chapter_url}")
text = clean_text(soup.get_text())
else:
text = clean_text(node.get_text())
log(f"[PARSE] Finished parsing: {chapter_url} ({len(text)} chars)")
return {
"url": chapter_url,
"text": text,
}
except Exception as exc:
log(f"[PARSE] ERROR parsing {chapter_url}: {exc}")
raise
chapter_num = download_result["chapter"]
url = download_result["url"]
html = download_result["html"]
log(f"[PARSE] Parsing chapter {chapter_num}")
soup = BeautifulSoup(html, "lxml")
selectors = [
"#content",
".content",
"div#content",
"div.content",
"div#chaptercontent",
"#chapterContent",
".read-content",
]
node = None
for sel in selectors:
tmp = soup.select_one(sel)
if tmp:
node = tmp
break
raw = node.get_text() if node else soup.get_text()
# replacements
REPL = load_replacements()
text = clean_text(raw, REPL)
# ---------------------------------------------------
# HEADER ONLY FOR CHAPTER 1
# ---------------------------------------------------
if chapter_num == 1:
header = (
f"{meta.get('title','')}\n"
f"Author: {meta.get('author','')}\n"
f"Description:\n{meta.get('description','')}\n"
f"URL: {url}\n" + "-" * 50 + "\n\n"
)
text = header + text
log(f"[PARSE] Parsed chapter {chapter_num}: {len(text)} chars")
return {
"chapter": chapter_num,
"url": url,
"text": text,
"length": len(text),
}

@@ -1,28 +1,21 @@
# scraper/tasks/pipeline.py
from celery import chain
from logbus.publisher import log
from scraper.tasks.download_tasks import download_chapter
from scraper.tasks.parse_tasks import parse_chapter
from scraper.tasks.save_tasks import save_chapter
def build_chapter_pipeline(chapter_number: int, chapter_url: str, base_path: str):
def build_chapter_pipeline(
chapter_number: int, chapter_url: str, base_path: str, meta: dict
):
"""
Build a Celery pipeline for a single chapter:
download -> parse -> save
Build a download -> parse -> save pipeline for one chapter.
meta contains:
title, author, description
"""
log(f"[PIPELINE] Building chain for chapter {chapter_number}")
# Important: download returns dict {chapter, url, html}
# parse accepts html + chapter_url (via s())
# save accepts chapter_number, text, base_path
workflow = chain(
return chain(
download_chapter.s(chapter_number, chapter_url),
parse_chapter.s(), # takes previous result dict
save_chapter.s(base_path=base_path)
parse_chapter.s(meta),  # ← pass metadata along
save_chapter.s(base_path),
)
return workflow
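A note on argument passing in this chain: Celery prepends each task's return value to the next signature's stored arguments, so with the task definitions in this commit the calls resolve roughly as sketched here:

    # download_chapter(chapter_number, chapter_url)
    #   -> {"chapter": ..., "url": ..., "html": ...}
    # parse_chapter(download_result, meta)
    #   -> {"chapter": ..., "url": ..., "text": ..., "length": ...}
    # save_chapter(parsed, base_path)
    #   -> {"chapter": ..., "path": ...}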

@@ -1,4 +1,5 @@
# scraper/tasks/save_tasks.py
print(">>> [IMPORT] save_tasks.py loaded")
from celery import shared_task
from logbus.publisher import log
@@ -6,26 +7,18 @@ import os
@shared_task(bind=True, queue="save", ignore_result=False)
def save_chapter(self, result: dict, base_path: str):
"""
Save parsed chapter text to disk.
result = {
"url": ...,
"text": ...
}
"""
def save_chapter(self, parsed: dict, base_path: str):
print(f">>> [save_tasks] save_chapter() CALLED for chapter {parsed.get('chapter')}")
try:
text = result.get("text", "")
url = result.get("url")
chapter_number = parsed.get("chapter")
url = parsed.get("url")
text = parsed.get("text", "")
# Extract the chapter number from the URL
# For example: .../12345.html
# ⇒ 12345
chapter_number = extract_chapter_number(url)
if not chapter_number:
raise ValueError("Missing chapter_number in parsed payload")
if not os.path.exists(base_path):
os.makedirs(base_path, exist_ok=True)
os.makedirs(base_path, exist_ok=True)
filename = f"{chapter_number:05d}.txt"
path = os.path.join(base_path, filename)
@@ -34,24 +27,11 @@ def save_chapter(self, result: dict, base_path: str):
f.write(text)
log(f"[SAVE] Saved chapter {chapter_number}{path}")
print(f">>> [save_tasks] SAVED {path}")
return {"chapter": chapter_number, "path": path}
except Exception as exc:
log(f"[SAVE] ERROR saving chapter from {url}: {exc}")
print(f">>> [save_tasks] ERROR: {exc}")
raise
def extract_chapter_number(url: str) -> int:
"""
Utility extractor for chapter numbers from a URL.
Example: https://site.com/1234.html -> 1234
"""
try:
import re
m = re.search(r'(\d+)\.html?', url)
if m:
return int(m.group(1))
except:
pass
return 0

@@ -1,35 +1,52 @@
from celery import shared_task
from scraper.book_scraper import BookScraper
from scraper.sites import BookSite
# scraper/tasks/scraping.py
#
from celery_app import celery_app
from logbus.publisher import log
import os
from scraper.sites import BookSite
from scraper.book_scraper import BookScraper
from scraper.tasks.controller_tasks import launch_downloads
print(">>> [IMPORT] scraping.py loaded")
@celery_app.task(bind=True, queue="scraping", ignore_result=False)
def start_scrape_book(self, url: str):
"""Scrapes metadata + chapter list."""
log(f"[SCRAPING] Start scraping for: {url}")
site = BookSite()
scraper = BookScraper(site, url)
scraper.parse_book_info()
@shared_task(bind=True, queue="scraping")
def scrape_book(self, url):
"""
HIGH-LEVEL SCRAPER TASK
Calls the synchronous BookScraper for a full scrape.
"""
log(f"[SCRAPER] Start scrape: {url}")
chapters = scraper.get_chapter_list()
full_count = len(chapters)
scraper = BookScraper(BookSite(), url)
result = scraper.execute()
DRY_RUN = os.getenv("DRY_RUN", "0") == "1"
TEST_LIMIT = int(os.getenv("TEST_LIMIT", "5"))
log(f"[SCRAPER] Finished scrape: {url}")
return {"title": result["title"]}
if DRY_RUN:
log(f"[SCRAPING] DRY_RUN: limiting chapters to first {TEST_LIMIT}")
chapters = chapters[:TEST_LIMIT]
result = {
"title": scraper.book_title,
"author": scraper.book_author,
"description": scraper.book_description,
"cover": scraper.cover_url,
"chapters": [
{"num": ch.number, "title": ch.title, "url": ch.url} for ch in chapters
],
}
@shared_task(bind=True, queue="download", max_retries=5)
def download_chapter_task(self, number, title, url, output_base):
"""
Download a single chapter only.
download_worker.py ultimately executes this.
"""
from worker.download_worker import download_single_chapter
log(f"[SCRAPING] Completed scrape: {len(chapters)}/{full_count} chapters")
try:
return download_single_chapter(number, title, url, output_base)
celery_app.send_task(
"scraper.tasks.controller_tasks.launch_downloads",
args=[result],
queue="controller",
)
except Exception as e:
log(f"[DOWNLOAD] Error while downloading chapter {number}: {e}")
raise self.retry(countdown=3)
return result
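A quick manual smoke test (run inside the web or a worker container; the URL is a placeholder) that enqueues the task above and prints its id:

    python -c "from scraper.tasks.scraping import start_scrape_book; print(start_scrape_book.delay('https://example.com/book/123/').id)"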

@@ -0,0 +1,57 @@
# scraper/utils.py
import re
import os
from pathlib import Path
# ------------------------------------------------------------
# Load replacements from text_replacements.txt (optional file)
# ------------------------------------------------------------
def load_replacements(filepath="text_replacements.txt") -> dict:
"""
Load key=value style replacements.
An empty or missing file returns {}.
"""
path = Path(filepath)
if not path.exists():
return {}
repl = {}
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if "=" in line:
key, val = line.split("=", 1)
repl[key.strip()] = val.strip()
return repl
# ------------------------------------------------------------
# Clean extracted HTML text
# ------------------------------------------------------------
def clean_text(raw: str, repl_dict: dict = None) -> str:
"""
Normalizes whitespace, removes junk, and applies replacements.
repl_dict is optional and falls back to {}.
"""
if repl_dict is None:
repl_dict = {}
txt = raw
# Normalize CRLF
txt = txt.replace("\r", "")
# Collapse multiple blank lines
txt = re.sub(r"\n{3,}", "\n\n", txt)
# Apply replacements
for key, val in repl_dict.items():
txt = txt.replace(key, val)
# Strip excessive whitespace at edges
return txt.strip()
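An illustrative usage of the helpers above, with a hypothetical text_replacements.txt in the key=value format that load_replacements expects:

    # text_replacements.txt (illustrative contents):
    #   some unwanted phrase=replacement
    #   trailing ad line=
    from scraper.utils import load_replacements, clean_text

    repl = load_replacements("text_replacements.txt")
    print(clean_text("raw text\r\nwith some unwanted phrase\n\n\n\nleft over", repl))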

@@ -1,22 +1,36 @@
import os
# scraper/utils.py
from pathlib import Path
def load_replacements(path="text_replacements.txt") -> dict:
"""
Load key=value replacements from a simple text file.
Lines beginning with # are ignored.
"""
fp = Path(path)
if not fp.exists():
return {}
def load_replacements(path):
repl = {}
if not path or not os.path.exists(path):
return repl
with open(path, encoding="utf-8") as f:
for line in f:
if "=>" in line:
k, v = line.strip().split("=>", 1)
repl[k.strip()] = v.strip()
for line in fp.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
k, v = line.split("=", 1)
repl[k.strip()] = v.strip()
return repl
def clean_text(text, repl_dict):
for src, tgt in repl_dict.items():
text = text.replace(src, tgt)
return text
def clean_text(raw: str, repl_dict: dict) -> str:
"""
Cleans text using user-defined replacements.
"""
txt = raw
for k, v in repl_dict.items():
txt = txt.replace(k, v)
return txt.strip()

@@ -1,63 +1,85 @@
<!DOCTYPE html>
<html lang="nl">
<head>
<meta charset="UTF-8">
<title>Scrape Resultaat</title>
<head>
<meta charset="UTF-8" />
<title>Scrape & Download Resultaat</title>
<style>
body { font-family: Arial, sans-serif; padding: 40px; max-width: 900px; margin: auto; }
h1 { margin-bottom: 10px; }
.error { padding: 15px; background: #ffdddd; border-left: 5px solid #ff4444; margin-bottom: 20px; }
.box { padding: 15px; background: #f7f7f7; border: 1px solid #ddd; margin-bottom: 20px; border-radius: 6px; }
a { color: #007bff; text-decoration: none; }
a:hover { text-decoration: underline; }
pre { background: #222; color: #eee; padding: 10px; border-radius: 6px; overflow-x: auto; }
small { color: #555; }
body {
font-family: Arial, sans-serif;
padding: 40px;
max-width: 900px;
margin: auto;
}
h1 {
margin-bottom: 10px;
}
.error {
padding: 15px;
background: #ffdddd;
border-left: 5px solid #ff4444;
margin-bottom: 20px;
}
.box {
padding: 15px;
background: #f7f7f7;
border: 1px solid #ddd;
margin-bottom: 20px;
border-radius: 6px;
}
a {
color: #007bff;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
</style>
</head>
<body>
</head>
<body>
<a href="/">&larr; Terug</a>
<a href="/">&larr; Terug</a>
{% if error %}
<div class="error"><strong>Fout:</strong><br />{{ error }}</div>
{% endif %}
{% if error %}
<div class="error">
<strong>Fout:</strong><br>{{ error }}
</div>
{% endif %}
<h1>Scrape Resultaat</h1>
<h1>Scrape resultaat</h1>
{% if book %}
{% if result %}
<div class="box">
<strong>Titel:</strong> {{ result.title }}<br>
<strong>Auteur:</strong> {{ result.author }}<br>
</div>
<div class="box">
<strong>Titel:</strong> {{ book.title }}<br />
<strong>Auteur:</strong> {{ book.author }}<br />
</div>
{% if result.description %}
<div class="box">
<strong>Beschrijving:</strong><br>
<p>{{ result.description }}</p>
</div>
{% endif %}
{% if book.description %}
<div class="box">
<strong>Beschrijving:</strong><br />
<p>{{ book.description }}</p>
</div>
{% endif %}
<div class="box">
<strong>Aantal chapters:</strong> {{ result.chapters|length }}
</div>
<div class="box">
<strong>Aantal chapters:</strong> {{ book.chapters|length }}
</div>
{% if result.chapters %}
<div class="box">
<strong>Chapters:</strong><br><br>
<ul>
{% for ch in result.chapters %}
{% if book.chapters %}
<div class="box">
<strong>Chapters:</strong><br /><br />
<ul>
{% for ch in book.chapters %}
<li>
<a href="{{ ch.url }}" target="_blank">
Chapter {{ ch.number }} — {{ ch.title }}
</a>
<a href="{{ ch.url }}" target="_blank">
Chapter {{ ch.num }} — {{ ch.title }}
</a>
</li>
{% endfor %}
</ul>
</div>
{% endif %}
{% endif %}
</body>
</ul>
</div>
{% endif %} {% if download_job_id %}
<div class="box">
<strong>Download pipeline gestart!</strong><br />
Job ID: <code>{{ download_job_id }}</code>
</div>
{% endif %} {% endif %}
</body>
</html>
