kmftools/tmp/stash.patch2


diff --git a/bookscraper/.gitignore b/bookscraper/.gitignore
index 08fedd4..cd78ff3 100644
--- a/bookscraper/.gitignore
+++ b/bookscraper/.gitignore
@@ -1,4 +1,164 @@
-output/
+# ============================================
+# PYTHON
+# ============================================
+
+# Bytecode
+__pycache__/
+*.pyc
+*.pyo
+*.pyd
+
+# Virtual environments
venv/
+env/
+.venv/
+
+# Python build artifacts
+build/
+dist/
+*.egg-info/
+
+
+# ============================================
+# PROJECT-SPECIFIC IGNORE RULES
+# ============================================
+
+# Output generated by BookScraper
+output/
+audio_output/
+m4b_output/
+covers/
+
+# Logs
*.log
-__pycache__/
\ No newline at end of file
+logs/
+log/
+celerybeat-schedule
+celerybeat.pid
+
+# Redis dump (if ever created)
+dump.rdb
+
+# Temporary HTML/debug scrapings
+tmp/
+temp/
+*.html.tmp
+*.debug.html
+
+
+# ============================================
+# CELERY / RUNTIME
+# ============================================
+
+celerybeat-schedule
+*.pid
+*.worker
+
+# Celery progress / abort temporary files (if any)
+abort_flags/
+progress_cache/
+
+
+# ============================================
+# DOCKER
+# ============================================
+
+# Docker build cache
+**/.dockerignore
+**/Dockerfile~
+docker-compose.override.yml
+docker-compose.local.yml
+docker-compose*.backup
+
+# Local bind mounts from Docker
+**/.volumes/
+**/mnt/
+**/cache/
+
+
+# ============================================
+# FRONTEND / STATIC FILES
+# ============================================
+
+# Node / JS (if ever used)
+node_modules/
+npm-debug.log
+yarn-debug.log
+yarn-error.log
+dist/
+.bundle/
+
+
+# ============================================
+# VS CODE / EDITORS
+# ============================================
+
+# VSCode
+.vscode/
+.history/
+.code-workspace
+
+# PyCharm / JetBrains
+.idea/
+
+# Editor backups
+*.swp
+*.swo
+*~
+
+# Autosave files
+*.bak
+*.tmp
+
+
+# ============================================
+# SYSTEM / OS FILES
+# ============================================
+
+# macOS
+.DS_Store
+.AppleDouble
+.LSOverride
+Icon?
+.Trashes
+
+# Windows
+Thumbs.db
+Desktop.ini
+
+
+# ============================================
+# ARCHIVES
+# ============================================
+
+*.zip
+*.tar.gz
+*.tgz
+*.7z
+*.rar
+
+
+# ============================================
+# AUDIO / TTS / TEMPORARY
+# ============================================
+
+*.wav
+*.mp3
+*.m4a
+*.m4b
+*.aac
+*.flac
+
+tts_temp/
+audio_temp/
+tts_cache/
+
+
+# ============================================
+# GIT INTERNAL SAFETY
+# ============================================
+
+# Never track your global git config junk
+.gitignore_global
+.gitconfig
+.gitattributes-global
diff --git a/bookscraper/app.py b/bookscraper/app.py
index bf758c8..241bc22 100644
--- a/bookscraper/app.py
+++ b/bookscraper/app.py
@@ -1,5 +1,5 @@
# ============================================
-# File: bookscraper/app.py (ASYNC SCRAPING)
+# File: bookscraper/app.py (ASYNC SCRAPING + DASHBOARD)
# ============================================
from dotenv import load_dotenv
@@ -9,34 +9,45 @@ load_dotenv()
print(">>> [WEB] Importing celery_app …")
from celery_app import celery_app
-from flask import Flask, render_template, request, jsonify
+from flask import (
+ Flask,
+ render_template,
+ request,
+ jsonify,
+ redirect,
+ send_from_directory,
+)
from scraper.logger import log_debug
-# Abort + Progress (per book_id)
+# Abort + Progress (legacy)
from scraper.abort import set_abort
from scraper.progress import get_progress
-# UI LOGS (GLOBAL — no book_id)
-from scraper.ui_log import get_ui_logs, reset_ui_logs # <-- ADDED
+# NEW: Full Redis Book State Model
+from scraper.progress import get_state
+from scraper.progress import r as redis_client
-from celery.result import AsyncResult
+# NEW: Indexed log fetchers
+from scraper.log_index import fetch_logs, fetch_recent_logs, fetch_global_logs
+
+# UI LOGS (legacy)
+from scraper.ui_log import get_ui_logs, reset_ui_logs
-# ⬇⬇⬇ ADDED for cover serving
-from flask import send_from_directory
+from celery.result import AsyncResult
import os
+import time
+import re
app = Flask(__name__)
-
# =====================================================
-# STATIC FILE SERVING FOR OUTPUT ← ADDED
+# STATIC FILE SERVING FOR OUTPUT
# =====================================================
OUTPUT_ROOT = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output")
@app.route("/output/<path:filename>")
def serve_output(filename):
- """Serve output files such as cover.jpg and volumes."""
return send_from_directory(OUTPUT_ROOT, filename, as_attachment=False)
@@ -49,7 +60,7 @@ def index():
# =====================================================
-# START SCRAPING (async via Celery)
+# START SCRAPING → DIRECT REDIRECT TO DASHBOARD (book_idx native)
# =====================================================
@app.route("/start", methods=["POST"])
def start_scraping():
@@ -58,64 +69,150 @@ def start_scraping():
if not url:
return render_template("result.html", error="Geen URL opgegeven.")
- # ---------------------------------------------------------
- # NEW: Clear UI log buffer when starting a new scrape
- # ---------------------------------------------------------
reset_ui_logs()
-
log_debug(f"[WEB] Scraping via Celery: {url}")
- async_result = celery_app.send_task(
+ # --------------------------------------------
+ # Extract book_idx from URL
+ # Supports:
+ # - /15/15618.html
+ # - /15618/
+ # - /15618.html
+ # --------------------------------------------
+ m = re.search(r"/(\d+)(?:\.html|/)?$", url)
+ if not m:
+ return render_template(
+ "result.html", error="Kan book_idx niet bepalen uit URL."
+ )
+
+ book_idx = m.group(1)
+
+ # --------------------------------------------
+ # Start async scraping task
+ # --------------------------------------------
+ celery_app.send_task(
"scraper.tasks.scraping.start_scrape_book",
args=[url],
queue="scraping",
)
- return render_template(
- "result.html",
- message="Scraping gestart.",
- scraping_task_id=async_result.id,
- book_title=None,
- )
+ # --------------------------------------------
+ # DIRECT redirect — no waiting on Celery
+ # --------------------------------------------
+ return redirect(f"/book/{book_idx}")
# =====================================================
-# CLEAR UI LOGS MANUALLY (NEW)
+# CLEAR UI LOGS (legacy)
# =====================================================
@app.route("/clear-logs", methods=["POST"])
def clear_logs():
reset_ui_logs()
- return jsonify({"status": "ok", "message": "UI logs cleared"})
+ return jsonify({"status": "ok"})
+
+
+# =====================================================
+# ABORT (per book_idx)
+# =====================================================
+@app.route("/abort/<book_idx>", methods=["POST"])
+def abort_download(book_idx):
+ log_debug(f"[WEB] Abort requested for book: {book_idx}")
+ set_abort(book_idx)
+ return jsonify({"status": "ok", "aborted": book_idx})
+
+
+# =====================================================
+# LEGACY PROGRESS ENDPOINT
+# =====================================================
+@app.route("/progress/<book_idx>", methods=["GET"])
+def progress(book_idx):
+ return jsonify(get_progress(book_idx))
+
+
+# =====================================================
+# REDIS STATE ENDPOINT
+# =====================================================
+@app.route("/state/<book_idx>", methods=["GET"])
+def full_state(book_idx):
+ return jsonify(get_state(book_idx))
# =====================================================
-# ABORT (per book_id)
+# LIST ALL BOOKS — METADATA
# =====================================================
-@app.route("/abort/<book_id>", methods=["POST"])
-def abort_download(book_id):
- log_debug(f"[WEB] Abort requested for book: {book_id}")
- set_abort(book_id)
- return jsonify({"status": "ok", "aborted": book_id})
+@app.route("/books", methods=["GET"])
+def list_books():
+ books = sorted(redis_client.smembers("books") or [])
+ result = []
+
+ for book_idx in books:
+ meta = redis_client.hgetall(f"book:{book_idx}:meta") or {}
+ state = get_state(book_idx) or {}
+
+ result.append(
+ {
+ "id": book_idx,
+ "title": meta.get("title", book_idx),
+ "author": meta.get("author"),
+ "url": meta.get("url"),
+ "cover_url": meta.get("cover_url"),
+ "scraped_at": meta.get("scraped_at"),
+ "status": state.get("status"),
+ "last_update": state.get("last_update"),
+ "chapters_total": state.get("chapters_total"),
+ "chapters_done": state.get("chapters_done"),
+ }
+ )
+
+ return jsonify(result)
# =====================================================
-# PROGRESS (per book_id)
+# LIBRARY DASHBOARD PAGE
# =====================================================
-@app.route("/progress/<book_id>", methods=["GET"])
-def progress(book_id):
- return jsonify(get_progress(book_id))
+@app.route("/library", methods=["GET"])
+def library_page():
+ return render_template("library.html")
# =====================================================
-# LOGS — GLOBAL UI LOGS
+# BOOK DASHBOARD PAGE — book_idx native
# =====================================================
-@app.route("/logs", methods=["GET"])
-def logs():
- return jsonify({"logs": get_ui_logs()})
+@app.route("/book/<book_idx>", methods=["GET"])
+def book_dashboard(book_idx):
+ return render_template(
+ "book_dashboard.html",
+ book_id=book_idx, # for template backward compatibility
+ book_idx=book_idx,
+ )
# =====================================================
-# CELERY RESULT → return book_id when scraping finishes
+# INDEXED LOG API — book_idx direct
+# =====================================================
+@app.route("/api/book/<book_idx>/logs", methods=["GET"])
+def api_book_logs(book_idx):
+ cursor = int(request.args.get("cursor", "0"))
+ logs, new_cursor = fetch_logs(book_idx, cursor)
+ return jsonify({"logs": logs, "cursor": new_cursor})
+
+
+@app.route("/api/book/<book_idx>/logs/recent", methods=["GET"])
+def api_book_logs_recent(book_idx):
+ limit = int(request.args.get("limit", "200"))
+ logs = fetch_recent_logs(book_idx, limit)
+ return jsonify({"logs": logs})
+
+
+@app.route("/api/logs/global", methods=["GET"])
+def api_global_logs():
+ cursor = int(request.args.get("cursor", "0"))
+ logs, new_cursor = fetch_global_logs(cursor)
+ return jsonify({"logs": logs, "cursor": new_cursor})
+
+
+# =====================================================
+# CELERY RESULT
# =====================================================
@app.route("/celery-result/<task_id>", methods=["GET"])
def celery_result(task_id):
@@ -123,10 +220,8 @@ def celery_result(task_id):
if result.successful():
return jsonify({"ready": True, "result": result.get()})
-
if result.failed():
return jsonify({"ready": True, "error": "failed"})
-
return jsonify({"ready": False})
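
The /start route above derives book_idx from the submitted URL with the pattern r"/(\d+)(?:\.html|/)?$". As a quick standalone check, the sketch below runs that regex against the three URL shapes listed in the route comment; the sample URLs are illustrative only.

import re

BOOK_IDX_RE = re.compile(r"/(\d+)(?:\.html|/)?$")

for url in (
    "https://www.piaotia.com/bookinfo/15/15618.html",  # /15/15618.html
    "https://www.piaotia.com/bookinfo/15618/",         # /15618/
    "https://www.piaotia.com/bookinfo/15618.html",     # /15618.html
):
    m = BOOK_IDX_RE.search(url)
    print(url, "->", m.group(1) if m else None)        # -> 15618 for all three
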
diff --git a/bookscraper/logbus/publisher.py b/bookscraper/logbus/publisher.py
index 9a597db..5476475 100644
--- a/bookscraper/logbus/publisher.py
+++ b/bookscraper/logbus/publisher.py
@@ -1,4 +1,11 @@
-# logbus/publisher.py
+# ============================================================
+# File: logbus/publisher.py
+# Purpose:
+# Centralized logger:
+# - console logging
+# - UI legacy log echo
+# - NEW: indexed Redis log ingest (non-blocking)
+# ============================================================
import logging
@@ -10,20 +17,45 @@ def log(message: str):
Dumb logger:
- skip empty messages
- pass the message through 1:1
- - no prefixes
+ - do not change prefixes
- no mutations
"""
if not message or not message.strip():
return
- # console
+ # ============================================================
+ # SAFETY FIX (C&U):
+ # prevent an infinite loop: messages that come from log_index
+ # start with "[IDX]" and must NOT go through the pipeline again.
+ # ============================================================
+ if message.startswith("[IDX]"):
+ logger.warning(message)
+ return
+
+ # ---------------------------------------
+ # Console log
+ # ---------------------------------------
logger.warning(message)
- # UI-echo
+ # ---------------------------------------
+ # Legacy UI log (existing behaviour)
+ # ---------------------------------------
try:
- from scraper.ui_log import push_ui
+ from scraper.ui_log import push_ui # delayed import
push_ui(message)
except Exception:
+ # The UI log must never cause a crash
+ pass
+
+ # ---------------------------------------
+ # NEW: Indexed Redis log entry
+ # ---------------------------------------
+ try:
+ from scraper.log_index import ingest_indexed_log # delayed import
+
+ ingest_indexed_log(message)
+ except Exception:
+ # Fail silently — logging must never break the pipeline
pass
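
For reference, a minimal sketch of what an indexed Redis log ingest along these lines could look like. The real ingest_indexed_log lives in scraper/log_index.py, which is not part of this hunk, so the key names and entry shape below are assumptions rather than the project's actual implementation.

import json
import time
from typing import Optional

import redis

r = redis.Redis.from_url("redis://redis:6379/0", decode_responses=True)

def ingest_indexed_log(message: str, book_idx: Optional[str] = None) -> None:
    # Guard mirrors publisher.py: never re-ingest messages emitted by the index.
    if message.startswith("[IDX]"):
        return
    entry = json.dumps({"ts": time.time(), "msg": message})
    r.rpush("logs:global", entry)                # assumed global log list
    if book_idx:
        r.rpush(f"logs:book:{book_idx}", entry)  # assumed per-book log list
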
diff --git a/bookscraper/scraper/book_scraper.py b/bookscraper/scraper/book_scraper.py
index 922d0c7..bf54375 100644
--- a/bookscraper/scraper/book_scraper.py
+++ b/bookscraper/scraper/book_scraper.py
@@ -13,10 +13,7 @@ from scraper.models.book_state import Chapter
class BookScraper:
"""
Minimal scraper: only metadata + chapter list.
- The DownloadController handles Celery pipelines for:
- - download
- - parse
- - save
+ The DownloadController handles Celery pipelines.
"""
def __init__(self, site, url):
@@ -29,6 +26,8 @@ class BookScraper:
self.cover_url = ""
self.chapter_base = None
+ self.book_idx = None # NUMERIC ID FROM URL (only source)
+
self.chapters = []
# Load custom replacements
@@ -40,6 +39,9 @@ class BookScraper:
"""Main entry point. Returns metadata + chapter URLs."""
soup = self._fetch(self.url)
+ # book_idx comes only from the main URL (by agreement)
+ self._extract_book_idx()
+
self._parse_title(soup)
self._parse_author(soup)
self._parse_description(soup)
@@ -54,14 +56,30 @@ class BookScraper:
"title": self.book_title,
"author": self.book_author,
"description": self.book_description,
- "cover_url": self.cover_url, # ← used by DownloadController
+ "cover_url": self.cover_url,
"book_url": self.url,
+ "book_idx": self.book_idx, # <<< belangrijk voor logging pipeline
"chapters": [
{"num": ch.number, "title": ch.title, "url": ch.url}
for ch in self.chapters
],
}
+ # ------------------------------------------------------------
+ def _extract_book_idx(self):
+ """
+ Extract numeric ID from main URL such as:
+ https://www.piaotia.com/bookinfo/15/15618.html
+ This is the ONLY allowed source.
+ """
+ m = re.search(r"/(\d+)\.html$", self.url)
+ if m:
+ self.book_idx = m.group(1)
+ log_debug(f"[BookScraper] Extracted book_idx = {self.book_idx}")
+ else:
+ self.book_idx = None
+ log_debug("[BookScraper] book_idx NOT FOUND in URL")
+
# ------------------------------------------------------------
def _fetch(self, url):
log_debug(f"[BookScraper] Fetch: {url}")
@@ -109,10 +127,7 @@ class BookScraper:
def _parse_cover(self, soup):
"""
Extract correct cover based on book_id path logic.
- 1. primary: match "/files/article/image/{vol}/{book_id}/"
- 2. fallback: endswith "/{book_id}s.jpg"
"""
- # Extract book_id from URL
m = re.search(r"/(\d+)\.html$", self.url)
if not m:
log_debug("[BookScraper] No book_id found in URL → cannot match cover")
@@ -120,20 +135,15 @@ class BookScraper:
book_id = m.group(1)
- # Extract vol folder from URL (bookinfo/<vol>/<id>.html)
m2 = re.search(r"/bookinfo/(\d+)/", self.url)
volume = m2.group(1) if m2 else None
log_debug(f"[BookScraper] Book ID = {book_id}, Volume = {volume}")
imgs = soup.find_all("img", src=True)
-
chosen = None
- # --------------------------------------------------------
- # PRIORITY 1: Path-match
- # /files/article/image/{vol}/{book_id}/
- # --------------------------------------------------------
+ # PATH-MATCH
if volume:
target_path = f"/files/article/image/{volume}/{book_id}/"
for img in imgs:
@@ -143,9 +153,7 @@ class BookScraper:
log_debug(f"[BookScraper] Cover matched by PATH: {src}")
break
- # --------------------------------------------------------
- # PRIORITY 2: endswith "/{book_id}s.jpg"
- # --------------------------------------------------------
+ # SUFFIX-MATCH
if not chosen:
target_suffix = f"/{book_id}s.jpg"
for img in imgs:
@@ -155,9 +163,6 @@ class BookScraper:
log_debug(f"[BookScraper] Cover matched by SUFFIX: {src}")
break
- # --------------------------------------------------------
- # No match
- # --------------------------------------------------------
if not chosen:
log_debug("[BookScraper] No matching cover found")
return
@@ -167,14 +172,12 @@ class BookScraper:
# ------------------------------------------------------------
def get_chapter_page(self, soup):
- """Return BeautifulSoup of the main chapter list page."""
node = soup.select_one(
"html > body > div:nth-of-type(6) > div:nth-of-type(2) > div > table"
)
href = node.select_one("a").get("href")
chapter_url = urljoin(self.site.root, href)
- # base for chapter links
parts = chapter_url.rsplit("/", 1)
self.chapter_base = parts[0] + "/"
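
The cover selection in _parse_cover() boils down to a two-step preference: a path match on /files/article/image/<volume>/<book_id>/ first, then a suffix match on /<book_id>s.jpg. Below is a condensed sketch of that order; the sample img src values are made up.

def pick_cover(srcs, book_id, volume=None):
    # 1) path match has priority
    if volume:
        target_path = f"/files/article/image/{volume}/{book_id}/"
        for src in srcs:
            if target_path in src:
                return src
    # 2) fallback: suffix match
    target_suffix = f"/{book_id}s.jpg"
    for src in srcs:
        if src.endswith(target_suffix):
            return src
    return None

srcs = ["/images/banner.png", "/files/article/image/15/15618/15618s.jpg"]
print(pick_cover(srcs, "15618", volume="15"))  # -> /files/article/image/15/15618/15618s.jpg
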
diff --git a/bookscraper/scraper/download_controller.py b/bookscraper/scraper/download_controller.py
index 9a9e978..4f2cf32 100644
--- a/bookscraper/scraper/download_controller.py
+++ b/bookscraper/scraper/download_controller.py
@@ -1,231 +1,133 @@
-# =========================================================
+# ============================================================
# File: scraper/download_controller.py
# Purpose:
-# Build Celery pipelines for all chapters
-# and pass book_id for abort/progress/log functionality.
-# + Download and replicate cover image to all volume folders
-# + Generate scripts (allinone.txt, makebook, say)
-# + Initialize Redis Book State Model (status + counters)
-# =========================================================
-
-from celery import group
-from scraper.tasks.pipeline import build_chapter_pipeline
-from scraper.scriptgen import generate_all_scripts
-from logbus.publisher import log
+# Prepare folder structure, volumes, cover, and Celery pipelines
+# using ONLY Celery-safe primitive arguments.
+#
+# Workers never receive BookContext/ChapterContext.
+# ============================================================
+
import os
import requests
-import shutil
-from scraper.abort import abort_requested # DEBUG allowed
+from logbus.publisher import log
+from celery import chain, group
+
+from scraper.tasks.download_tasks import download_chapter
+from scraper.tasks.parse_tasks import parse_chapter
+from scraper.tasks.save_tasks import save_chapter
+from scraper.tasks.progress_tasks import update_progress
+
-# NEW: Redis State Model (C&U)
-from scraper.progress import (
- init_book_state,
- set_status,
- set_chapter_total,
-)
+print(">>> [IMPORT] download_controller.py loaded (final Celery-safe mode)")
class DownloadController:
"""
- Coordinates all chapter pipelines (download → parse → save),
- including:
- - volume splitting
- - consistent meta propagation
- - book_id-based abort + progress tracking
- - cover download + volume replication
- - script generation (allinone.txt, makebook, say)
- - Redis book state initialisation and status updates
+ Responsibilities:
+ • Determine output root
+ • Download cover
+ • Assign chapters → volumes
+ • Build Celery pipelines (primitive-only arguments)
"""
- def __init__(self, book_id: str, scrape_result: dict):
- self.book_id = book_id
+ def __init__(self, scrape_result: dict):
+ self.book_idx = scrape_result.get("book_idx")
+ self.title = scrape_result.get("title", "Unknown")
+ self.author = scrape_result.get("author")
+ self.cover_url = scrape_result.get("cover_url")
self.scrape_result = scrape_result
- # Core metadata
- self.title = scrape_result.get("title", "UnknownBook")
- self.chapters = scrape_result.get("chapters", []) or []
- self.cover_url = scrape_result.get("cover_url")
-
- # Output base dir
- root = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output")
-
- # Volume size
- self.max_vol = int(os.getenv("MAX_VOL_SIZE", "200"))
-
- # Base folder for the whole book
- self.book_base = os.path.join(root, self.title)
- os.makedirs(self.book_base, exist_ok=True)
-
- # Meta passed to parse/save stage
- self.meta = {
- "title": self.title,
- "author": scrape_result.get("author"),
- "description": scrape_result.get("description"),
- "book_url": scrape_result.get("book_url"),
- }
-
- # -------------------------------------------------
- # DEBUG — confirm that the controller sees the correct book_id
- # -------------------------------------------------
- log(f"[CTRL_DEBUG] Controller init book_id={book_id} title='{self.title}'")
-
- try:
- abort_state = abort_requested(book_id)
- log(f"[CTRL_DEBUG] abort_requested(book_id={book_id}) → {abort_state}")
- except Exception as e:
- log(f"[CTRL_DEBUG] abort_requested ERROR: {e}")
-
- # -------------------------------------------------
- # NEW: Initialize Redis Book State Model
- # -------------------------------------------------
- try:
- init_book_state(
- book_id=self.book_id,
- title=self.title,
- url=self.scrape_result.get("book_url"),
- chapters_total=len(self.chapters),
- )
- log(f"[CTRL_STATE] init_book_state() completed for {self.title}")
- except Exception as e:
- log(f"[CTRL_STATE] init_book_state FAILED: {e}")
-
- # ---------------------------------------------------------
- # Cover Download
- # ---------------------------------------------------------
- def download_cover(self):
- """Download one cover image into the root of the book folder."""
- if not self.cover_url:
- log(f"[CTRL] No cover URL found for '{self.title}'")
- return
-
- cover_path = os.path.join(self.book_base, "cover.jpg")
-
- headers = {
- "User-Agent": (
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:118.0) "
- "Gecko/20100101 Firefox/118.0"
- ),
- "Referer": self.scrape_result.get("book_url") or "https://www.piaotia.com/",
- }
-
- try:
- log(f"[CTRL] Downloading cover: {self.cover_url}")
-
- resp = requests.get(self.cover_url, timeout=10, headers=headers)
- resp.raise_for_status()
+ # List of dicts from scraper: [{num, title, url}]
+ self.chapters = scrape_result.get("chapters", [])
+ self.chapter_count = len(self.chapters)
+
+ # Output root
+ self.output_root = os.path.join("/app/output", self.title)
+
+ # Will be filled by assign_volumes()
+ self.volume_paths = {} # chapter_num → volume_path
+
+ # -----------------------------------------------------------
+ # Create root folder + download cover
+ # -----------------------------------------------------------
+ def init_book_output(self):
+ os.makedirs(self.output_root, exist_ok=True)
+
+ if self.cover_url:
+ try:
+ resp = requests.get(self.cover_url, timeout=10)
+ if resp.ok:
+ p = os.path.join(self.output_root, "cover.jpg")
+ with open(p, "wb") as f:
+ f.write(resp.content)
+ log(f"[CTRL] Cover saved: {p}")
+ except Exception as exc:
+ log(f"[CTRL] Cover download error: {exc}")
+
+ # -----------------------------------------------------------
+ # Volume assignment (no BookContext needed)
+ # -----------------------------------------------------------
+ def assign_volumes(self, max_chapters_per_volume=200):
+ """
+ Determine per-chapter volume_path and ensure folders exist.
+ """
+ volume_index = 1
+ chapters_in_volume = 0
- with open(cover_path, "wb") as f:
- f.write(resp.content)
-
- log(f"[CTRL] Cover saved to: {cover_path}")
-
- except Exception as e:
- log(f"[CTRL] Cover download failed: {e} (url={self.cover_url})")
-
- # ---------------------------------------------------------
- # Cover Replication to Volumes
- # ---------------------------------------------------------
- def replicate_cover_to_volumes(self):
- """Copy cover.jpg into each existing Volume_xxx directory."""
- src = os.path.join(self.book_base, "cover.jpg")
- if not os.path.exists(src):
- log("[CTRL] No cover.jpg found, replication skipped")
- return
+ for ch in self.chapters:
+ if chapters_in_volume >= max_chapters_per_volume:
+ volume_index += 1
+ chapters_in_volume = 0
- try:
- for entry in os.listdir(self.book_base):
- if entry.lower().startswith("volume_"):
- vol_dir = os.path.join(self.book_base, entry)
- dst = os.path.join(vol_dir, "cover.jpg")
-
- shutil.copyfile(src, dst)
- log(f"[CTRL] Cover replicated into: {dst}")
-
- except Exception as e:
- log(f"[CTRL] Cover replication failed: {e}")
-
- # ---------------------------------------------------------
- # Volume isolation
- # ---------------------------------------------------------
- def get_volume_path(self, chapter_num: int) -> str:
- """Returns the correct volume directory for a chapter."""
- vol_index = (chapter_num - 1) // self.max_vol + 1
- vol_name = f"Volume_{vol_index:03d}"
- vol_path = os.path.join(self.book_base, vol_name)
- os.makedirs(vol_path, exist_ok=True)
- return vol_path
-
- # ---------------------------------------------------------
- # Pipeline launcher
- # ---------------------------------------------------------
- def start(self):
- total = len(self.chapters)
+ volume_path = os.path.join(self.output_root, f"Volume{volume_index}")
+ os.makedirs(volume_path, exist_ok=True)
- log(
- f"[CTRL] Initialising pipeline for '{self.title}' "
- f"(book_id={self.book_id}, chapters={total}, max_vol={self.max_vol})"
- )
- log(f"[CTRL] Output root: {self.book_base}")
+ # C&U FIX — scraper outputs key "num", NOT "number"
+ chapter_num = ch["num"]
+ self.volume_paths[chapter_num] = volume_path
- # -------------------------------------
- # NEW: Redis state update
- # -------------------------------------
- try:
- set_status(self.book_id, "downloading")
- set_chapter_total(self.book_id, total)
- log(f"[CTRL_STATE] Status set to 'downloading' for {self.book_id}")
- except Exception as e:
- log(f"[CTRL_STATE] set_status/set_chapter_total FAILED: {e}")
+ chapters_in_volume += 1
- # -------------------------------------
- # 1) Download cover
- # -------------------------------------
- self.download_cover()
+ log(f"[CTRL] Volume assignment complete: {len(self.volume_paths)} chapters")
- tasks = []
+ # -----------------------------------------------------------
+ # Build Celery pipelines (primitive-only)
+ # -----------------------------------------------------------
+ def build_pipelines(self):
+ pipelines = []
for ch in self.chapters:
- chapter_num = ch["num"]
- chapter_url = ch["url"]
-
- volume_path = self.get_volume_path(chapter_num)
-
- tasks.append(
- build_chapter_pipeline(
- self.book_id,
- chapter_num,
- chapter_url,
- volume_path,
- self.meta,
- )
+ # C&U FIX — must use "num"
+ num = ch["num"]
+ url = ch["url"]
+ vp = self.volume_paths[num]
+
+ p = chain(
+ download_chapter.s(self.book_idx, num, url, vp),
+ parse_chapter.s(self.book_idx, num),
+ save_chapter.s(self.book_idx, num),
+ update_progress.s(self.book_idx),
)
+ pipelines.append(p)
- async_result = group(tasks).apply_async()
-
- log(
- f"[CTRL] Pipelines dispatched for '{self.title}' "
- f"(book_id={self.book_id}, group_id={async_result.id})"
- )
-
- # Debug abort state
- try:
- abort_state = abort_requested(self.book_id)
- log(f"[CTRL_DEBUG] After-dispatch abort state: {abort_state}")
- except Exception as e:
- log(f"[CTRL_DEBUG] abort_requested error after dispatch: {e}")
-
- # -------------------------------------------------------
- self.replicate_cover_to_volumes()
+ return pipelines
- # -------------------------------------------------------
+ # -----------------------------------------------------------
+ # Launch Celery group(pipelines)
+ # -----------------------------------------------------------
+ def dispatch_pipelines(self, pipelines):
try:
- generate_all_scripts(
- self.book_base,
- self.title,
- self.meta.get("author"),
- )
- log(f"[CTRL] Scripts generated for '{self.title}'")
- except Exception as e:
- log(f"[CTRL] Script generation failed: {e}")
-
- return async_result
+ g = group(pipelines)
+ result = g.apply_async()
+ return result
+ except Exception as exc:
+ log(f"[CTRL] ERROR dispatching pipelines: {exc}")
+ raise
+
+ # -----------------------------------------------------------
+ # Legacy convenience entrypoint
+ # -----------------------------------------------------------
+ def start(self):
+ self.init_book_output()
+ self.assign_volumes()
+ return self.dispatch_pipelines(self.build_pipelines())
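
assign_volumes() walks the chapter list with a running counter and starts a new VolumeN folder every max_chapters_per_volume chapters. The same mapping can be written with plain arithmetic; a small sketch under the default of 200 chapters per volume, with a placeholder book title:

import os

def volume_for(position: int, max_per_volume: int = 200) -> str:
    # position is the 0-based index of the chapter in the scraped list
    return f"Volume{position // max_per_volume + 1}"

chapters = [{"num": n} for n in range(1, 451)]
volume_paths = {
    ch["num"]: os.path.join("/app/output", "SomeBook", volume_for(i))
    for i, ch in enumerate(chapters)
}
print(volume_paths[200])  # .../SomeBook/Volume1
print(volume_paths[201])  # .../SomeBook/Volume2
print(volume_paths[450])  # .../SomeBook/Volume3
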
diff --git a/bookscraper/scraper/tasks/audio_tasks.py b/bookscraper/scraper/tasks/audio_tasks.py
index fea3285..f43d3a9 100644
--- a/bookscraper/scraper/tasks/audio_tasks.py
+++ b/bookscraper/scraper/tasks/audio_tasks.py
@@ -1,5 +1,8 @@
# ============================================================
# File: scraper/tasks/audio_tasks.py
+# Purpose:
+# Convert a saved chapter's text file into an audio file (.m4a)
+# using macOS "say". Uses BookContext + ChapterContext.
# ============================================================
from celery_app import celery_app
@@ -12,29 +15,38 @@ from scraper.abort import abort_requested
from redis import Redis
from urllib.parse import urlparse
-# Use local redis if present, otherwise the default backend
-redis_url = os.getenv("REDIS_BACKEND_LOCAL") or os.getenv("REDIS_BACKEND")
-
-parsed = urlparse(redis_url)
+from scraper.progress import (
+ set_status,
+ set_last_update,
+ inc_audio_done,
+ save_skip_reason,
+)
# ------------------------------------------------------------
-# REGULAR REDIS CLIENT (slots, file checks, state)
+# REDIS CLIENTS
# ------------------------------------------------------------
+
+redis_url = os.getenv("REDIS_BACKEND_LOCAL") or os.getenv("REDIS_BACKEND")
+parsed = urlparse(redis_url)
+
+# Slot management DB (same as download slot handling)
redis_client = Redis(
host=parsed.hostname,
port=parsed.port,
db=parsed.path.strip("/"),
)
-# ------------------------------------------------------------
-# BACKEND CLIENT (abort flags, progress counters) - always DB 0
-# ------------------------------------------------------------
+# Backend DB (abort + progress counters)
backend_client = Redis(
host=parsed.hostname,
port=parsed.port,
db=0,
)
+# ------------------------------------------------------------
+# ENVIRONMENT
+# ------------------------------------------------------------
+
AUDIO_TIMEOUT = int(os.getenv("AUDIO_TIMEOUT_SECONDS", "300"))
AUDIO_VOICE = os.getenv("AUDIO_VOICE", "SinJi")
AUDIO_RATE = int(os.getenv("AUDIO_RATE", "200"))
@@ -44,20 +56,54 @@ AUDIO_SLOTS = int(os.getenv("AUDIO_SLOTS", "1"))
CONTAINER_PREFIX = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "/app/output")
+# ============================================================
+# CELERY TASK — NOW USING CONTEXT OBJECTS
+# ============================================================
+
+
@celery_app.task(bind=True, queue="audio", ignore_result=True)
-def generate_audio(
- self, book_id, volume_name, chapter_number, chapter_title, chapter_text
-):
- log(f"[AUDIO] CH{chapter_number}: START task → raw_input={chapter_text}")
-
- # Abort early
- if abort_requested(book_id, backend_client):
- log(f"[AUDIO] ABORT detected → skip CH{chapter_number}")
+def generate_audio(self, book_context, chapter_context):
+ """
+ Create audio using:
+ - book_context.book_idx
+ - chapter_context.number
+ - chapter_context.path (text filepath)
+ """
+
+ # ------------------------------------------------------------
+ # IDENTIFIERS
+ # ------------------------------------------------------------
+ book_idx = book_context.book_idx
+ chapter_number = chapter_context.number
+ text_file_path = chapter_context.path
+
+ log(f"[AUDIO] CH{chapter_number}: START task → {text_file_path}")
+
+ # ------------------------------------------------------------
+ # Update state: audio stage active
+ # ------------------------------------------------------------
+ try:
+ set_status(book_idx, "audio")
+ set_last_update(book_idx)
+ except Exception:
+ pass
+
+ # ------------------------------------------------------------
+ # Abort BEFORE doing anything
+ # ------------------------------------------------------------
+ if abort_requested(book_idx, backend_client):
+ log(f"[AUDIO] ABORT detected → skip chapter {chapter_number}")
+ try:
+ chapter_context.add_skip("audio", "abort")
+ save_skip_reason(book_idx, chapter_number, "audio_abort")
+ set_last_update(book_idx)
+ except Exception:
+ pass
return
- # ============================================================
+ # ------------------------------------------------------------
# ACQUIRE AUDIO SLOT
- # ============================================================
+ # ------------------------------------------------------------
slot_key = None
ttl = AUDIO_TIMEOUT + 15
@@ -68,11 +114,13 @@ def generate_audio(
log(f"[AUDIO] CH{chapter_number}: Acquired slot {i}/{AUDIO_SLOTS}")
break
+ # If no slot free → wait
if slot_key is None:
log(f"[AUDIO] CH{chapter_number}: All slots busy → waiting...")
start_wait = time.time()
while slot_key is None:
+ # retry each slot
for i in range(1, AUDIO_SLOTS + 1):
key = f"audio_slot:{i}"
if redis_client.set(key, "1", nx=True, ex=ttl):
@@ -80,101 +128,135 @@ def generate_audio(
log(f"[AUDIO] CH{chapter_number}: Slot acquired after wait")
break
- if slot_key:
- break
-
- if abort_requested(book_id, backend_client):
+ # abort while waiting
+ if abort_requested(book_idx, backend_client):
log(f"[AUDIO] ABORT while waiting → skip CH{chapter_number}")
+ try:
+ chapter_context.add_skip("audio", "abort_wait")
+ save_skip_reason(book_idx, chapter_number, "audio_abort_wait")
+ set_last_update(book_idx)
+ except Exception:
+ pass
return
+ # timeout
if time.time() - start_wait > ttl:
- log(f"[AUDIO] CH{chapter_number}: Slot wait timeout → aborting audio")
+ log(f"[AUDIO] CH{chapter_number}: WAIT TIMEOUT → aborting")
+ try:
+ chapter_context.add_skip("audio", "timeout_wait")
+ save_skip_reason(book_idx, chapter_number, "audio_timeout_wait")
+ set_last_update(book_idx)
+ except Exception:
+ pass
return
time.sleep(0.25)
- # ============================================================
- # PATH NORMALISATION
- # ============================================================
-
- container_path = chapter_text
-
- # Fix 1 — container_path can be None → abort without crashing
- if not container_path:
- log(f"[AUDIO] CH{chapter_number}: FATAL — no input path provided")
- redis_client.delete(slot_key)
+ # ------------------------------------------------------------
+ # VALIDATE INPUT PATH
+ # ------------------------------------------------------------
+ if not text_file_path:
+ log(f"[AUDIO] CH{chapter_number}: No input path")
+ try:
+ chapter_context.add_skip("audio", "missing_input")
+ save_skip_reason(book_idx, chapter_number, "audio_missing_input")
+ set_last_update(book_idx)
+ except Exception:
+ pass
+ if slot_key:
+ redis_client.delete(slot_key)
return
- # Fix 2 — safe startswith
- if CONTAINER_PREFIX and container_path.startswith(CONTAINER_PREFIX):
- relative_path = container_path[len(CONTAINER_PREFIX) :].lstrip("/")
+ # Convert container path → host FS path
+ if text_file_path.startswith(CONTAINER_PREFIX):
+ relative = text_file_path[len(CONTAINER_PREFIX) :].lstrip("/")
else:
- relative_path = container_path
-
- parts = relative_path.split("/")
- if len(parts) < 3:
- log(
- f"[AUDIO] CH{chapter_number}: FATAL — cannot parse book/volume from {relative_path}"
- )
- redis_client.delete(slot_key)
- return
-
- book_from_path = parts[0]
- volume_from_path = parts[1]
-
- host_path = os.path.join(HOST_PATH, relative_path)
-
- # ============================================================
- # OUTPUT PREP
- # ============================================================
-
- base_dir = os.path.join(HOST_PATH, book_from_path, volume_from_path, "Audio")
- os.makedirs(base_dir, exist_ok=True)
+ relative = text_file_path
+
+ host_input_path = os.path.join(HOST_PATH, relative)
+
+ # ------------------------------------------------------------
+ # Determine output directory
+ # ------------------------------------------------------------
+ volume_name = os.path.basename(os.path.dirname(text_file_path))
+ audio_output_dir = os.path.join(
+ HOST_PATH,
+ os.path.basename(book_context.book_base),
+ volume_name,
+ "Audio",
+ )
+ os.makedirs(audio_output_dir, exist_ok=True)
- safe_num = f"{chapter_number:04d}"
- audio_file = os.path.join(base_dir, f"{safe_num}.m4a")
+ audio_file = os.path.join(audio_output_dir, f"{chapter_number:04d}.m4a")
+ # ------------------------------------------------------------
+ # Skip if output already exists
+ # ------------------------------------------------------------
if os.path.exists(audio_file):
log(f"[AUDIO] Skip CH{chapter_number} → already exists")
- redis_client.delete(slot_key)
+ try:
+ chapter_context.add_skip("audio", "already_exists")
+ save_skip_reason(book_idx, chapter_number, "audio_exists")
+ set_last_update(book_idx)
+ except Exception:
+ pass
+ if slot_key:
+ redis_client.delete(slot_key)
return
- # ============================================================
- # BUILD CMD
- # ============================================================
-
+ # ------------------------------------------------------------
+ # BUILD COMMAND
+ # ------------------------------------------------------------
cmd = (
f"say --voice={AUDIO_VOICE} "
- f"--input-file='{host_path}' "
+ f"--input-file='{host_input_path}' "
f"--output-file='{audio_file}' "
- f"--file-format=m4bf "
- f"--quality=127 "
- f"-r {AUDIO_RATE} "
- f"--data-format=aac"
+ f"--file-format=m4bf --quality=127 "
+ f"-r {AUDIO_RATE} --data-format=aac"
)
log(f"[AUDIO] CH{chapter_number}: CMD = {cmd}")
- # ============================================================
+ # ------------------------------------------------------------
# RUN TTS
- # ============================================================
+ # ------------------------------------------------------------
try:
subprocess.run(cmd, shell=True, check=True, timeout=AUDIO_TIMEOUT)
log(f"[AUDIO] CH{chapter_number}: Completed")
+ inc_audio_done(book_idx)
+ set_last_update(book_idx)
+
+ chapter_context.audio_done = True
+
except subprocess.TimeoutExpired:
- log(f"[AUDIO] CH{chapter_number}: TIMEOUT → remove incomplete file")
+ log(f"[AUDIO] CH{chapter_number}: TIMEOUT → removing incomplete file")
+ try:
+ chapter_context.add_skip("audio", "timeout")
+ save_skip_reason(book_idx, chapter_number, "audio_timeout")
+ set_last_update(book_idx)
+ except Exception:
+ pass
if os.path.exists(audio_file):
- try:
- os.remove(audio_file)
- except Exception:
- pass
+ os.remove(audio_file)
except subprocess.CalledProcessError as e:
- log(f"[AUDIO] CH{chapter_number}: ERROR during say → {e}")
+ log(f"[AUDIO] CH{chapter_number}: ERROR in say → {e}")
+ try:
+ chapter_context.add_skip("audio", f"cmd_error:{e}")
+ save_skip_reason(book_idx, chapter_number, "audio_cmd_error")
+ set_last_update(book_idx)
+ except Exception:
+ pass
except Exception as e:
log(f"[AUDIO] CH{chapter_number}: UNEXPECTED ERROR → {e}")
+ try:
+ chapter_context.add_skip("audio", f"unexpected:{e}")
+ save_skip_reason(book_idx, chapter_number, "audio_unexpected_error")
+ set_last_update(book_idx)
+ except Exception:
+ pass
finally:
if slot_key:
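
The audio slot handling above relies on a simple Redis pattern: each of AUDIO_SLOTS slots is a key claimed atomically with SET NX EX, and released by deleting the key in the finally block. A compact sketch of just that pattern; the connection details and wait timeout are assumptions.

import time
from redis import Redis

r = Redis(host="redis", port=6379, db=0)

def acquire_audio_slot(slots: int, ttl: int, timeout: float = 60.0):
    deadline = time.time() + timeout
    while time.time() < deadline:
        for i in range(1, slots + 1):
            key = f"audio_slot:{i}"
            if r.set(key, "1", nx=True, ex=ttl):  # claimed atomically
                return key
        time.sleep(0.25)
    return None  # no slot became free within the timeout

def release_audio_slot(key):
    if key:
        r.delete(key)
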
diff --git a/bookscraper/scraper/tasks/controller_tasks.py b/bookscraper/scraper/tasks/controller_tasks.py
index 0f06405..b7336c8 100644
--- a/bookscraper/scraper/tasks/controller_tasks.py
+++ b/bookscraper/scraper/tasks/controller_tasks.py
@@ -1,81 +1,98 @@
# ============================================================
# File: scraper/tasks/controller_tasks.py
# Purpose:
-# Start the download → parse → save pipeline for a scraped book,
-# including progress/abort tracking via book_id.
-# ONLY THE CONTROLLER UPDATES PROGRESS (initial total).
+# Launch the full download/parse/save pipeline.
+#
+# JSON-safe Celery architecture:
+# • controller receives primitive scrape_result
+# • controller builds DownloadController locally
+# • controller assigns volumes & prepares folders
+# • Celery pipelines receive ONLY (book_idx, chapter_num)
# ============================================================
from celery_app import celery_app
from logbus.publisher import log
from scraper.download_controller import DownloadController
-from scraper.progress import (
- set_total,
-)
+from scraper.progress import set_total, set_status
from scraper.abort import abort_requested
-print(">>> [IMPORT] controller_tasks.py loaded")
+print(">>> [IMPORT] controller_tasks.py loaded (ID-only mode)")
@celery_app.task(bind=True, queue="controller", ignore_result=False)
def launch_downloads(self, book_id: str, scrape_result: dict):
- """
- Launch the entire pipeline (download → parse → save),
- AND initialize progress counters.
-
- Chapter-level progress is updated INSIDE the download/parse/save tasks.
- This task MUST NOT call .get() on async subtasks (Celery restriction).
- """
-
- title = scrape_result.get("title", "UnknownBook")
- chapters = scrape_result.get("chapters", []) or []
- total = len(chapters)
-
- log(f"[CTRL] Book '{title}' → {total} chapters (book_id={book_id})")
-
- # ------------------------------------------------------------
- # INIT PROGRESS
- # ------------------------------------------------------------
- set_total(book_id, total)
- log(f"[CTRL] Progress initialized for {book_id}: total={total}")
-
- # ------------------------------------------------------------
- # BUILD CONTROLLER
- # ------------------------------------------------------------
- ctl = DownloadController(book_id, scrape_result)
-
- # ------------------------------------------------------------
- # START PIPELINES (ASYNC)
- # Returns a celery group AsyncResult. We DO NOT iterate or get().
- # Progress & failures are handled by the worker subtasks.
- # ------------------------------------------------------------
+
+ title = scrape_result.get("title", "Unknown")
+ chapter_count = len(scrape_result.get("chapters", []) or [])
+ book_idx = scrape_result.get("book_idx")
+
+ log(f"[CTRL] Book '{title}' → {chapter_count} chapters (book_idx={book_idx})")
+
+ # -----------------------------------------------------------
+ # Initialize progress counters
+ # -----------------------------------------------------------
+ try:
+ set_total(book_idx, chapter_count)
+ set_status(book_idx, "downloading")
+ except Exception as exc:
+ log(f"[CTRL] ERROR setting up progress counters: {exc}")
+ raise
+
+ # -----------------------------------------------------------
+ # Build controller
+ # -----------------------------------------------------------
+ try:
+ ctl = DownloadController(scrape_result)
+ except Exception as exc:
+ log(f"[CTRL] ERROR constructing DownloadController: {exc}")
+ raise
+
+ # -----------------------------------------------------------
+ # Prepare folders + cover (NEW: init_book_output)
+ # -----------------------------------------------------------
try:
- group_result = ctl.start()
+ ctl.init_book_output()
+ except Exception as exc:
+ log(f"[CTRL] ERROR initializing context: {exc}")
+ raise
+
+ log(f"[CTRL_STATE] init_book_output() completed for {title}")
- log(
- f"[CTRL] Pipelines dispatched for '{title}' "
- f"(book_id={book_id}, group_id={group_result.id})"
- )
+ # -----------------------------------------------------------
+ # Abort check BEFORE pipelines
+ # -----------------------------------------------------------
+ if abort_requested(book_idx):
+ log(f"[CTRL] ABORT requested BEFORE pipeline dispatch → stopping.")
+ set_status(book_idx, "aborted")
+ return {"status": "aborted"}
- # Abort flag set BEFORE tasks start?
- if abort_requested(book_id):
- log(f"[CTRL] ABORT requested before tasks start")
- return {"book_id": book_id, "aborted": True}
+ # -----------------------------------------------------------
+ # Assign volumes
+ # -----------------------------------------------------------
+ try:
+ ctl.assign_volumes()
+ except Exception as exc:
+ log(f"[CTRL] ERROR during volume assignment: {exc}")
+ raise
+ # -----------------------------------------------------------
+ # Build pipelines (primitive arguments)
+ # -----------------------------------------------------------
+ try:
+ tasks = ctl.build_pipelines()
+ except Exception as exc:
+ log(f"[CTRL] ERROR building pipelines: {exc}")
+ raise
+
+ # -----------------------------------------------------------
+ # Dispatch all pipelines
+ # -----------------------------------------------------------
+ try:
+ result_group = ctl.dispatch_pipelines(tasks)
except Exception as exc:
- log(f"[CTRL] ERROR while dispatching pipelines: {exc}")
+ log(f"[CTRL] ERROR dispatching pipelines: {exc}")
raise
- # ------------------------------------------------------------
- # CONTROLLER DOES NOT WAIT FOR SUBTASK RESULTS
- # (Download/parse/save tasks update progress themselves)
- # ------------------------------------------------------------
- log(f"[CTRL] Controller finished dispatch for book_id={book_id}")
-
- return {
- "book_id": book_id,
- "total": total,
- "started": True,
- "group_id": group_result.id,
- }
+ log(f"[CTRL] Pipelines dispatched for {title} (book_idx={book_idx})")
+ return {"book_idx": book_idx, "pipelines": len(tasks)}
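
For orientation, a sketch of the primitive-only chain/group wiring the controller builds, mirroring the argument lists used in build_pipelines() above. One Celery detail worth noting: with .s() the parent task's return value is prepended to the child's arguments, while .si() (immutable signature) passes only the listed primitives; which variant fits depends on the downstream task signatures, so the .si() below is an assumption, not the project's settled choice.

from celery import chain, group

from scraper.tasks.download_tasks import download_chapter
from scraper.tasks.parse_tasks import parse_chapter
from scraper.tasks.save_tasks import save_chapter
from scraper.tasks.progress_tasks import update_progress

def build_pipelines(book_idx, chapters, volume_paths):
    pipelines = []
    for ch in chapters:
        num, url = ch["num"], ch["url"]
        pipelines.append(
            chain(
                download_chapter.si(book_idx, num, url, volume_paths[num]),
                parse_chapter.si(book_idx, num),
                save_chapter.si(book_idx, num),
                update_progress.si(book_idx),
            )
        )
    return group(pipelines)

# dispatched as: build_pipelines(book_idx, chapters, volume_paths).apply_async()
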
diff --git a/bookscraper/scraper/tasks/download_tasks.py b/bookscraper/scraper/tasks/download_tasks.py
index 5110483..1e01630 100644
--- a/bookscraper/scraper/tasks/download_tasks.py
+++ b/bookscraper/scraper/tasks/download_tasks.py
@@ -1,22 +1,41 @@
# ============================================================
# File: scraper/tasks/download_tasks.py
-# Purpose: Download chapter HTML with global concurrency,
-# retry/backoff logic, 429 support, and abort-awareness.
#
-# Logging:
-# - timestamp + book_id in the message
-# - the message is sent to the console via publisher.py
-# - the message is sent to the Redis GUI log buffer via ui_log.push_ui
+# FINAL ARCHITECTURE — CELERY-SAFE VERSION
+#
+# This task downloads EXACTLY ONE CHAPTER using ONLY primitive
+# Celery-safe arguments:
+#
+# download_chapter(book_idx, chapter_num, chapter_url, volume_path)
+#
+# BookContext / ChapterContext are NOT passed into Celery and
+# NOT loaded inside workers. Workers ONLY manipulate:
+#
+# • Redis abort flags
+# • Redis chapter-start markers
+# • Redis progress counters
+# • save_path derived from volume_path + chapter_num
+#
+# The chapter HTML is written to disk and a small Redis skip-
+# reason is stored when needed. All other state lives outside
+# Celery.
+#
+# This is the architecture we agreed on to avoid JSON
+# serialization errors and to remove the need for
+# load_book_context/save_book_context.
#
-# publisher.py and ui_log.py stay DUMB.
# ============================================================
from celery_app import celery_app
from scraper.utils import get_save_path
from scraper.abort import abort_requested, chapter_started, mark_chapter_started
-from logbus.publisher import log # console logging (DOM)
-from scraper.ui_log import push_ui # GUI logging (DOM)
+from scraper.progress import (
+ inc_chapter_done,
+ save_skip_reason,
+)
+
+from logbus.publisher import log
import requests
import redis
@@ -25,50 +44,38 @@ import time
from datetime import datetime
-print(">>> [IMPORT] download_tasks.py loaded")
+print(">>> [IMPORT] download_tasks.py loaded (final Celery-safe mode)")
# -----------------------------------------------------------
-# TIMESTAMPED LOG WRAPPER
+# TIMESTAMPED LOGGER (book_idx ONLY)
# -----------------------------------------------------------
-def log_msg(book_id: str, message: str):
+def log_msg(book_idx: str, message: str):
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- full = f"{ts} [{book_id}] {message}"
- log(full)
- push_ui(full)
+ log(f"{ts} [{book_idx}] {message}")
# -----------------------------------------------------------
-# Retry parameters (ENV)
+# ENV SETTINGS
# -----------------------------------------------------------
MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7"))
BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2"))
BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2"))
DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10"))
-# -----------------------------------------------------------
-# Global concurrency
-# -----------------------------------------------------------
MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1"))
-
-# -----------------------------------------------------------
-# Global delay sync
-# -----------------------------------------------------------
GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1"))
-DELAY_KEY = "download:delay_lock"
-# -----------------------------------------------------------
-# Redis
-# -----------------------------------------------------------
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
redis_client = redis.Redis.from_url(REDIS_URL)
SEM_KEY = "download:active"
+DELAY_KEY = "download:delay_lock"
-# ============================================================
-# GLOBAL DELAY FUNCTIONS
-# ============================================================
+# -----------------------------------------------------------
+# Delay + concurrency helpers
+# -----------------------------------------------------------
def wait_for_global_delay():
if GLOBAL_DELAY <= 0:
return
@@ -82,13 +89,10 @@ def set_global_delay():
redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)
-# ============================================================
-# GLOBAL CONCURRENCY FUNCTIONS
-# ============================================================
-def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
+def acquire_global_slot(max_slots: int, retry_delay=0.5):
while True:
- current = redis_client.incr(SEM_KEY)
- if current <= max_slots:
+ cur = redis_client.incr(SEM_KEY)
+ if cur <= max_slots:
return
redis_client.decr(SEM_KEY)
time.sleep(retry_delay)
@@ -98,81 +102,71 @@ def release_global_slot():
redis_client.decr(SEM_KEY)
-print(f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}")
-print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s")
-print(
- f">>> [CONFIG] Retries: MAX={MAX_RETRIES}, base={BASE_DELAY}, "
- f"backoff={BACKOFF}, 429={DELAY_429}"
-)
-
-
# ============================================================
-# CELERY TASK: DOWNLOAD CHAPTER
+# CELERY TASK — PRIMITIVE ARG MODE
+#
+# book_idx: str Unique book identifier used for state tracking
+# chapter_num: int Chapter number (1-based)
+# chapter_url: str URL to download this chapter
+# volume_path: str Filesystem directory where this chapter belongs
# ============================================================
@celery_app.task(bind=True, queue="download", ignore_result=False)
def download_chapter(
- self, book_id: str, chapter_num: int, chapter_url: str, base_path: str
+ self, book_idx: str, chapter_num: int, chapter_url: str, volume_path: str
):
"""
- Download chapter HTML.
- Abort logic:
- - If abort active AND chapter not started → SKIP
- - If abort active BUT chapter already started → Proceed normally
+ Download a single chapter using ONLY Celery-safe primitives.
+ No BookContext or ChapterContext is ever loaded.
+
+ Writes:
+ <volume_path>/<chapter_num>.html
"""
# -----------------------------------------------------------
- # ABORT BEFORE START
+ # Abort BEFORE start
# -----------------------------------------------------------
- if abort_requested(book_id) and not chapter_started(book_id, chapter_num):
- msg = f"[ABORT] Skip chapter {chapter_num} (abort active, not started)"
- log_msg(book_id, msg)
- return {
- "book_id": book_id,
- "chapter": chapter_num,
- "url": chapter_url,
- "html": None,
- "skipped": True,
- "path": None,
- "abort": True,
- }
-
- # Mark started
- mark_chapter_started(book_id, chapter_num)
+ if abort_requested(book_idx) and not chapter_started(book_idx, chapter_num):
+ log_msg(book_idx, f"[ABORT] Skip chapter {chapter_num}")
+
+ save_skip_reason(book_idx, chapter_num, "abort_before_start")
+ inc_chapter_done(book_idx)
+ return None
+
+ # Mark chapter as started
+ mark_chapter_started(book_idx, chapter_num)
# -----------------------------------------------------------
- # NEW POSITION FOR SKIP BLOCK (before any delay logic)
+ # Path resolution
# -----------------------------------------------------------
- save_path = get_save_path(chapter_num, base_path)
+ save_path = get_save_path(chapter_num, volume_path)
+ os.makedirs(volume_path, exist_ok=True)
+ # -----------------------------------------------------------
+ # Skip if file already exists
+ # -----------------------------------------------------------
if os.path.exists(save_path):
- log_msg(book_id, f"[DL] SKIP {chapter_num} (exists) → {save_path}")
- return {
- "book_id": book_id,
- "chapter": chapter_num,
- "url": chapter_url,
- "html": None,
- "skipped": True,
- "path": save_path,
- }
+ log_msg(book_idx, f"[DL] SKIP {chapter_num} (exists)")
+
+ save_skip_reason(book_idx, chapter_num, "already_exists")
+ inc_chapter_done(book_idx)
+ return None
# -----------------------------------------------------------
- # Hard delay (only for real downloads)
+ # Delay + concurrency enforcement
# -----------------------------------------------------------
if GLOBAL_DELAY > 0:
time.sleep(GLOBAL_DELAY)
- # Sync delay
wait_for_global_delay()
-
- # Acquire concurrency slot
acquire_global_slot(MAX_CONCURRENCY)
- log_msg(book_id, f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
+ log_msg(book_idx, f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
+
+ # -----------------------------------------------------------
+ # HTTP Download
+ # -----------------------------------------------------------
try:
- # -----------------------------------------------------------
- # HTTP DOWNLOAD
- # -----------------------------------------------------------
- log_msg(book_id, f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
+ log_msg(book_idx, f"[DL] GET chapter {chapter_num}: {chapter_url}")
resp = requests.get(
chapter_url,
@@ -184,42 +178,31 @@ def download_chapter(
resp.encoding = resp.apparent_encoding or "gb2312"
html = resp.text
- log_msg(book_id, f"[DL] OK {chapter_num}: {len(html)} bytes")
+ # Write file
+ with open(save_path, "w", encoding="utf-8") as f:
+ f.write(html)
- return {
- "book_id": book_id,
- "chapter": chapter_num,
- "url": chapter_url,
- "html": html,
- "skipped": False,
- "path": save_path,
- }
+ log_msg(book_idx, f"[DL] OK {chapter_num}: {len(html)} bytes")
+ inc_chapter_done(book_idx)
+ return None
except Exception as exc:
- attempt = self.request.retries
- delay = BASE_DELAY * (BACKOFF**attempt)
-
- # 429 hard block
+ # 429: Too Many Requests
if getattr(getattr(exc, "response", None), "status_code", None) == 429:
- log_msg(
- book_id,
- f"[DL] 429 {chapter_num} → WAIT {DELAY_429}s "
- f"(attempt {attempt}/{MAX_RETRIES})",
- )
-
+ log_msg(book_idx, f"[DL] 429 → wait {DELAY_429}s")
time.sleep(DELAY_429)
set_global_delay()
raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES)
- # Normal error
+ # Standard error
+ delay = BASE_DELAY * (BACKOFF ** min(self.request.retries, 5))
log_msg(
- book_id,
+ book_idx,
f"[DL] ERROR {chapter_num}: {exc} → retry in {delay}s "
- f"(attempt {attempt}/{MAX_RETRIES})",
+ f"(attempt {self.request.retries + 1}/{MAX_RETRIES})",
)
+
raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
finally:
- set_global_delay()
release_global_slot()
- log_msg(book_id, f"[DL] RELEASED SLOT for chapter {chapter_num}")
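
With the default environment values (DOWNLOAD_MAX_RETRIES=7, DOWNLOAD_BASE_DELAY=2, DOWNLOAD_BACKOFF_MULTIPLIER=2), the retry branch above produces an exponential schedule capped at exponent 5, while 429 responses wait a flat DOWNLOAD_429_DELAY instead. A quick illustration of the resulting delays:

BASE_DELAY, BACKOFF, MAX_RETRIES = 2, 2, 7

for attempt in range(MAX_RETRIES):
    delay = BASE_DELAY * (BACKOFF ** min(attempt, 5))
    print(f"attempt {attempt + 1}/{MAX_RETRIES}: retry in {delay}s")
# 2s, 4s, 8s, 16s, 32s, 64s, 64s
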
diff --git a/bookscraper/scraper/tasks/parse_tasks.py b/bookscraper/scraper/tasks/parse_tasks.py
index 52066f9..7460f70 100644
--- a/bookscraper/scraper/tasks/parse_tasks.py
+++ b/bookscraper/scraper/tasks/parse_tasks.py
@@ -1,155 +1,132 @@
-# =========================================================
+# ============================================================
# File: scraper/tasks/parse_tasks.py
-# Purpose: Parse downloaded HTML into clean chapter text.
-# Enhanced version: Piaotia H1→content extractor + clean pipeline
-# NO HARDCODED REPLACEMENTS — everything comes from replacement files
-# =========================================================
+#
+# FINAL ARCHITECTURE — CELERY-SAFE VERSION
+#
+# This task parses a downloaded chapter using ONLY primitive,
+# Celery-safe arguments:
+#
+# parse_chapter(book_idx, chapter_num, html_path, text_path)
+#
+# NO BookContext or ChapterContext are ever loaded.
+#
+# Responsibilities:
+# • Read HTML file from disk
+# • Convert to cleaned text
+# • Write .txt file back to disk
+# • Update Redis progress counters
+# • Abort-aware
+# • Skip if HTML missing
+#
+# This matches the structure of download_tasks.py and avoids all
+# serialization issues.
+# ============================================================
from celery_app import celery_app
-from bs4 import BeautifulSoup
-from scraper.utils import clean_text, load_all_replacements
-from scraper.tasks.download_tasks import log_msg # unified logger
+from scraper.utils import clean_text
+from scraper.abort import abort_requested, chapter_started, mark_chapter_started
+from scraper.progress import (
+ inc_chapter_done,
+ save_skip_reason,
+)
-print(">>> [IMPORT] parse_tasks.py loaded (enhanced parser)")
+from logbus.publisher import log
+import os
+from datetime import datetime
+
+print(">>> [IMPORT] parse_tasks.py loaded (final Celery-safe mode)")
+
+
+# -----------------------------------------------------------
+# TIMESTAMPED LOGGER (book_idx ONLY)
+# -----------------------------------------------------------
+def log_msg(book_idx: str, message: str):
+ ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ log(f"{ts} [{book_idx}] {message}")
+
+
+# ============================================================
+# CELERY TASK — PRIMITIVE ARGUMENT MODE
+#
+# book_idx: str Unique Redis state key
+# chapter_num: int Chapter number (1-based)
+# html_path: str Path to downloaded HTML file
+# text_path: str Path where parsed/cleaned text must be written
+# ============================================================
@celery_app.task(bind=True, queue="parse", ignore_result=False)
-def parse_chapter(self, download_result: dict, meta: dict):
-
- book_id = download_result.get("book_id", "NOBOOK")
-
- # ------------------------------------------------------------
- # SKIPPED DOWNLOAD → SKIP PARSE
- # ------------------------------------------------------------
- if download_result.get("skipped"):
- chapter = download_result.get("chapter")
- log_msg(book_id, f"[PARSE] SKIP chapter {chapter} (download skipped)")
- download_result["book_id"] = book_id
- return download_result
-
- # ------------------------------------------------------------
- # NORMAL PARSE
- # ------------------------------------------------------------
- chapter_num = download_result["chapter"]
- chapter_url = download_result["url"]
- html = download_result["html"]
-
- log_msg(book_id, f"[PARSE] Parsing chapter {chapter_num}")
-
- soup = BeautifulSoup(html, "lxml")
-
- # ------------------------------------------------------------
- # STRICT SELECTORS (direct content blocks)
- # ------------------------------------------------------------
- selectors = [
- "#content",
- "div#content",
- ".content",
- "div.content",
- "#chaptercontent",
- "div#chaptercontent",
- "#chapterContent",
- ".read-content",
- "div.read-content",
- ]
-
- node = None
- for sel in selectors:
- tmp = soup.select_one(sel)
- if tmp:
- node = tmp
- break
-
- # ------------------------------------------------------------
- # PIAOTIA FALLBACK:
- # Extract content between <H1> and the "bottomlink" block.
- # ------------------------------------------------------------
- raw = None
- if node is None:
- h1 = soup.find("h1")
- if h1:
- content_parts = []
- for sib in h1.next_siblings:
-
- # stop at bottom navigation/footer block
- sib_class = getattr(sib, "get", lambda *_: None)("class")
- if sib_class and (
- "bottomlink" in sib_class or sib_class == "bottomlink"
- ):
- break
-
- # ignore typical noise containers
- if getattr(sib, "name", None) in ["script", "style", "center"]:
- continue
-
- if hasattr(sib, "get_text"):
- content_parts.append(sib.get_text(separator="\n"))
- else:
- content_parts.append(str(sib))
-
- raw = "\n".join(content_parts)
-
- # ------------------------------------------------------------
- # FINAL FALLBACK
- # ------------------------------------------------------------
- if raw is None:
- if node:
- raw = node.get_text(separator="\n")
- else:
- # drop scripts & styles
- for tag in soup(["script", "style", "noscript"]):
- tag.decompose()
-
- raw = soup.get_text(separator="\n")
-
- # ------------------------------------------------------------
- # MULTIPASS CLEANING via replacement files ONLY
- # ------------------------------------------------------------
- REPL = load_all_replacements()
-
- text = raw
- for _ in range(5): # like the C# CleanText loop
- text = clean_text(text, REPL)
-
- # ------------------------------------------------------------
- # Collapse excessive empty lines
- # ------------------------------------------------------------
- cleaned = []
- prev_blank = False
-
- for line in text.split("\n"):
- stripped = line.rstrip()
- if stripped == "":
- if prev_blank:
- continue
- prev_blank = True
- cleaned.append("")
- else:
- prev_blank = False
- cleaned.append(stripped)
-
- text = "\n".join(cleaned)
-
- # ------------------------------------------------------------
- # Add header to chapter 1
- # ------------------------------------------------------------
- if chapter_num == 1:
- book_url = meta.get("book_url") or meta.get("url") or "UNKNOWN"
- header = (
- f"{meta.get('title','')}\n"
- f"Author: {meta.get('author','')}\n"
- f"Description:\n{meta.get('description','')}\n"
- f"Book URL: {book_url}\n" + "-" * 50 + "\n\n"
- )
- text = header + text
-
- log_msg(book_id, f"[PARSE] Parsed chapter {chapter_num}: {len(text)} chars")
-
- return {
- "book_id": book_id,
- "chapter": chapter_num,
- "url": chapter_url,
- "text": text,
- "length": len(text),
- }
+def parse_chapter(
+ self, book_idx: str, chapter_num: int, html_path: str, text_path: str
+):
+ """
+ Parse a downloaded chapter using ONLY primitive arguments.
+ Converts HTML → cleaned text and writes it to disk.
+ """
+
+ # -----------------------------------------------------------
+ # Abort BEFORE start
+ # -----------------------------------------------------------
+ if abort_requested(book_idx) and not chapter_started(book_idx, chapter_num):
+ log_msg(book_idx, f"[ABORT] Skip PARSE {chapter_num}")
+
+ save_skip_reason(book_idx, chapter_num, "abort_before_start")
+ inc_chapter_done(book_idx)
+ return None
+
+ # Mark as started
+ mark_chapter_started(book_idx, chapter_num)
+
+ # -----------------------------------------------------------
+ # Check if HTML file exists
+ # -----------------------------------------------------------
+ if not os.path.exists(html_path):
+ log_msg(book_idx, f"[PARSE] SKIP {chapter_num} (no HTML file)")
+
+ save_skip_reason(book_idx, chapter_num, "no_html_file")
+ inc_chapter_done(book_idx)
+ return None
+
+ # -----------------------------------------------------------
+ # Load HTML
+ # -----------------------------------------------------------
+ try:
+ with open(html_path, "r", encoding="utf-8") as f:
+ raw_html = f.read()
+ except Exception as exc:
+ log_msg(book_idx, f"[PARSE] ERROR reading HTML {chapter_num}: {exc}")
+
+ save_skip_reason(book_idx, chapter_num, f"read_error: {exc}")
+ inc_chapter_done(book_idx)
+ return None
+
+ if not raw_html.strip():
+ log_msg(book_idx, f"[PARSE] SKIP {chapter_num} (empty HTML)")
+
+ save_skip_reason(book_idx, chapter_num, "html_empty")
+ inc_chapter_done(book_idx)
+ return None
+
+ # -----------------------------------------------------------
+ # Clean HTML → Text
+ # -----------------------------------------------------------
+ try:
+ log_msg(book_idx, f"[PARSE] Start {chapter_num}")
+
+ cleaned = clean_text(raw_html)
+
+ os.makedirs(os.path.dirname(text_path), exist_ok=True)
+ with open(text_path, "w", encoding="utf-8") as f:
+ f.write(cleaned)
+
+ log_msg(book_idx, f"[PARSE] OK {chapter_num}: {len(cleaned)} chars")
+ inc_chapter_done(book_idx)
+ return None
+
+ except Exception as exc:
+ log_msg(book_idx, f"[PARSE] ERROR {chapter_num}: {exc}")
+
+ save_skip_reason(book_idx, chapter_num, f"parse_error: {exc}")
+ inc_chapter_done(book_idx)
+ return None
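Because parse_chapter now takes only primitive arguments, it can be queued on its own without any context objects. A minimal usage sketch; the book index and paths below are hypothetical, the real values come from the controller/pipeline:

    from scraper.tasks.parse_tasks import parse_chapter

    # Hypothetical identifiers and paths, for illustration only.
    parse_chapter.apply_async(
        args=["book-0001", 7,
              "output/book-0001/html/0007.html",
              "output/book-0001/text/0007.txt"],
        queue="parse",
    )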
diff --git a/bookscraper/scraper/tasks/pipeline.py b/bookscraper/scraper/tasks/pipeline.py
index 9da657e..2dae558 100644
--- a/bookscraper/scraper/tasks/pipeline.py
+++ b/bookscraper/scraper/tasks/pipeline.py
@@ -1,17 +1,19 @@
-# =========================================================
+# ============================================================
# File: scraper/tasks/pipeline.py
# Purpose:
-# Build Celery chains for chapter processing.
+# Build a per-chapter Celery pipeline using ONLY JSON-safe
+# arguments: (book_idx, chapter_num).
#
-# Chain:
-# download_chapter(book_id, chapter_num, url, base_path)
-# → parse_chapter(download_result, meta)
-# → save_chapter(parsed_result, base_path)
-# → update_progress(final_result, book_id)
+# Pipeline:
+# download_chapter(book_idx, chapter_num)
+# → parse_chapter(book_idx, chapter_num)
+# → save_chapter(book_idx, chapter_num)
+# → update_progress(book_idx)
#
-# All subtasks must pass through result dicts untouched so the
-# next stage receives the correct fields.
-# =========================================================
+# No BookContext/ChapterContext objects are passed through
+# Celery anymore. All context is loaded internally inside
+# each task.
+# ============================================================
from celery import chain
@@ -21,25 +23,41 @@ from scraper.tasks.save_tasks import save_chapter
from scraper.tasks.progress_tasks import update_progress
-def build_chapter_pipeline(
- book_id: str,
- chapter_number: int,
- chapter_url: str,
- base_path: str,
- meta: dict,
-):
+print(">>> [IMPORT] pipeline.py loaded (ID-only mode)")
+
+
+# ============================================================
+# Build chapter pipeline
+# ============================================================
+def build_chapter_pipeline(book_idx: str, chapter_num: int):
"""
- Build a Celery chain for one chapter.
+ Constructs the Celery chain for a single chapter.
- download_chapter(book_id, chapter_number, chapter_url, base_path)
- → parse_chapter(download_result, meta)
- → save_chapter(parsed_result, base_path)
- → update_progress(result, book_id)
+ Every task receives only JSON-safe arguments. All state
+ (BookContext + ChapterContext) is loaded inside the task.
"""
return chain(
- download_chapter.s(book_id, chapter_number, chapter_url, base_path),
- parse_chapter.s(meta),
- save_chapter.s(base_path),
- update_progress.s(book_id),
+ download_chapter.s(book_idx, chapter_num),
+ parse_chapter.s(book_idx, chapter_num),
+ save_chapter.s(book_idx, chapter_num),
+        # update_progress only needs book_idx, but the chain prepends the
+        # previous task's result, so the task accepts *args.
+ update_progress.s(book_idx),
)
+
+
+# ============================================================
+# Build pipelines for all chapters
+# (usually called from download_controller)
+# ============================================================
+def build_all_pipelines(book_idx: str, chapters):
+ """
+ Utility: given a list of chapter numbers, build a list of chains.
+
+ chapters = [1, 2, 3, ...]
+ """
+ pipelines = []
+ for ch in chapters:
+ pipelines.append(build_chapter_pipeline(book_idx, ch))
+ return pipelines
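A caveat about the chain above: Celery prepends each task's return value to the next signature's arguments, which is why update_progress accepts *args. Since every task in this patch returns None, immutable signatures are the usual way to make each task receive exactly the arguments listed. A sketch under the assumption that the tasks take only (book_idx, chapter_num), as this file's header describes (parse_chapter and save_chapter in this patch additionally expect path arguments, so their call sites would need those too):

    from celery import chain

    def build_chapter_pipeline_immutable(book_idx: str, chapter_num: int):
        # .si() creates an immutable signature: the parent task's result is
        # NOT prepended, so every task sees exactly the arguments given here.
        return chain(
            download_chapter.si(book_idx, chapter_num),
            parse_chapter.si(book_idx, chapter_num),
            save_chapter.si(book_idx, chapter_num),
            update_progress.si(book_idx),
        )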
diff --git a/bookscraper/scraper/tasks/progress_tasks.py b/bookscraper/scraper/tasks/progress_tasks.py
index 9045fab..717a70c 100644
--- a/bookscraper/scraper/tasks/progress_tasks.py
+++ b/bookscraper/scraper/tasks/progress_tasks.py
@@ -1,43 +1,72 @@
# ============================================================
# File: scraper/tasks/progress_tasks.py
-# Purpose: Central progress updater for chapter pipelines.
+# Purpose:
+# Update pipeline progress after each chapter finishes.
+#
+# MUST accept chain-call semantics:
+# update_progress(previous_result, book_idx)
+#
+# Only book_idx is meaningful; previous_result is ignored.
+#
+# JSON-safe: no BookContext or ChapterContext objects are
+# ever passed into Celery tasks.
# ============================================================
from celery_app import celery_app
-from scraper.progress import inc_completed, inc_skipped, inc_failed
+from scraper.progress import inc_chapter_done, set_status, get_progress
+
from logbus.publisher import log
+from datetime import datetime
+
+
+print(">>> [IMPORT] progress_tasks.py loaded (ID-only mode)")
-print(">>> [IMPORT] progress_tasks.py loaded")
+# -----------------------------------------------------------
+# TIMESTAMPED LOGGER (book_idx ONLY)
+# -----------------------------------------------------------
+def log_msg(book_idx: str, msg: str):
+ ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ log(f"{ts} [{book_idx}] {msg}")
-@celery_app.task(bind=False, name="progress.update", queue="controller")
-def update_progress(result: dict, book_id: str):
+
+# ============================================================
+# CELERY TASK — must accept chain semantics
+# ============================================================
+@celery_app.task(bind=True, queue="progress", ignore_result=False)
+def update_progress(self, *args):
"""
- Central progress logic:
- - result: output of save_chapter
- - book_id: explicitly passed by pipeline
+ Chain-safe progress update.
+
+ Celery chain will call:
+ update_progress(previous_result, book_idx)
+
+ Therefore:
+ book_idx = args[-1]
- IMPORTANT:
- - save_chapter already updates counters for skipped & normal chapters
- - progress.update MUST NOT double-increment
+ This increments the done counter and sets status to
+ "completed" once all chapters are done.
"""
- ch = result.get("chapter")
- skipped = result.get("skipped", False)
- failed = result.get("failed", False)
+ if not args:
+ return None
+
+ # Last argument is ALWAYS the book_idx
+ book_idx = args[-1]
+
+ # Increment chapter_done counter
+ inc_chapter_done(book_idx)
- if failed:
- inc_failed(book_id)
- log(f"[PROG] FAILED chapter {ch}")
+ # Fetch progress counters
+ prog = get_progress(book_idx)
+ done = prog.get("done", 0)
+ total = prog.get("total", 0)
- elif skipped:
- # save_chapter already did:
- # inc_skipped + inc_completed
- log(f"[PROG] SKIPPED chapter {ch}")
+ log_msg(book_idx, f"[PROGRESS] Updated: {done}/{total}")
- else:
- # Normal completion: save_chapter only does inc_completed
- inc_completed(book_id)
- log(f"[PROG] DONE chapter {ch}")
+ # If finished → update status
+ if total > 0 and done >= total:
+ set_status(book_idx, "completed")
+ log_msg(book_idx, "[PROGRESS] All chapters completed")
- return result
+ return None
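inc_chapter_done, get_progress and set_status are imported from scraper.progress, which is not part of this hunk. A minimal sketch of what that module could look like, assuming the progress:{book_idx}:done / :total keys that scraping.py initialises; note that save_chapter also calls inc_chapter_done on success, so under these assumptions a chain ending in update_progress would advance the counter twice per chapter:

    import os
    import redis

    r = redis.Redis.from_url(
        os.getenv("REDIS_BROKER", "redis://redis:6379/0"),
        decode_responses=True,
    )

    # Sketch only: key layout assumed from scraping.py; scraper/progress.py may differ.
    def inc_chapter_done(book_idx: str) -> int:
        return r.incr(f"progress:{book_idx}:done")

    def get_progress(book_idx: str) -> dict:
        return {
            "done": int(r.get(f"progress:{book_idx}:done") or 0),
            "total": int(r.get(f"progress:{book_idx}:total") or 0),
        }

    def set_status(book_idx: str, status: str) -> None:
        r.set(f"progress:{book_idx}:status", status)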
diff --git a/bookscraper/scraper/tasks/save_tasks.py b/bookscraper/scraper/tasks/save_tasks.py
index 8aa0578..e615e9a 100644
--- a/bookscraper/scraper/tasks/save_tasks.py
+++ b/bookscraper/scraper/tasks/save_tasks.py
@@ -1,123 +1,128 @@
# ============================================================
# File: scraper/tasks/save_tasks.py
-# Purpose: Save parsed chapter text to disk + trigger audio.
+#
+# FINAL ARCHITECTURE — CELERY-SAFE VERSION
+#
+# save_chapter(book_idx, chapter_num, text_path, json_path)
+#
+# This task no longer stores a BookContext, nor does it load one.
+# All data comes from PRIMITIVE arguments so Celery never has to
+# serialize or pickle anything.
+#
+# Functionality:
+#   • Abort-aware
+#   • Skips when the parsed text is missing
+#   • Writes the JSON chapter object to disk
+#   • Keeps the Redis progress state up to date
+#
+# This version is 100% consistent with the download → parse → save pipeline.
# ============================================================
-print(">>> [IMPORT] save_tasks.py loaded")
+from celery_app import celery_app
-from celery import shared_task
+from scraper.abort import abort_requested, chapter_started, mark_chapter_started
+from scraper.progress import inc_chapter_done, save_skip_reason
+
+from logbus.publisher import log
+import json
import os
+from datetime import datetime
+
-from scraper.utils import get_save_path
-from scraper.tasks.download_tasks import log_msg # unified logger
-from scraper.progress import (
- inc_completed,
- inc_skipped,
- inc_failed,
- add_failed_chapter,
-)
+print(">>> [IMPORT] save_tasks.py loaded (final Celery-safe mode)")
-from scraper.tasks.audio_tasks import generate_audio
+# -----------------------------------------------------------
+# TIMESTAMP LOGGER
+# -----------------------------------------------------------
+def log_msg(book_idx: str, message: str):
+ ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ log(f"{ts} [{book_idx}] {message}")
-@shared_task(bind=True, queue="save", ignore_result=False)
-def save_chapter(self, parsed: dict, base_path: str):
+
+# ============================================================
+# CELERY TASK — primitive arguments only
+#
+# book_idx: str
+# chapter_num: int
+# text_path: str path to parsed .txt file
+# json_path: str output path for chapter JSON model
+# ============================================================
+@celery_app.task(bind=True, queue="save", ignore_result=False)
+def save_chapter(self, book_idx: str, chapter_num: int, text_path: str, json_path: str):
"""
- Save parsed chapter text to disk.
-
- parsed = {
- "book_id": str,
- "chapter": int,
- "text": str,
- "url": str,
- "skipped": bool,
- "path": optional str
- }
+ Save parsed chapter text + metadata to JSON on disk.
+ No BookContext is loaded or saved.
"""
- book_id = parsed.get("book_id", "NOBOOK")
- chapter = parsed.get("chapter")
-
- # ------------------------------------------------------------
- # SKIP CASE (download or parse skipped the chapter)
- # ------------------------------------------------------------
- if parsed.get("skipped"):
- path = parsed.get("path", "(no-path)")
- log_msg(book_id, f"[SAVE] SKIP chapter {chapter} → {path}")
-
- inc_skipped(book_id)
-
- # Determine volume name from the base path
- volume_name = os.path.basename(base_path.rstrip("/"))
-
- # Queue audio using the existing saved file
- try:
- generate_audio.delay(
- book_id,
- volume_name,
- chapter,
- f"Chapter {chapter}",
- path, # <<-- correct: this is always the real file path
- )
- log_msg(
- book_id,
- f"[AUDIO] Task queued (SKIPPED) for chapter {chapter} in {volume_name}",
- )
- except Exception as audio_exc:
- log_msg(
- book_id,
- f"[AUDIO] ERROR queueing (SKIPPED) chapter {chapter}: {audio_exc}",
- )
-
- return {
- "book_id": book_id, # <<< FIXED
- "chapter": chapter,
- "path": path,
- "skipped": True,
- }
-
- # ------------------------------------------------------------
- # NORMAL SAVE CASE
- # ------------------------------------------------------------
- try:
- text = parsed.get("text", "")
+ # -----------------------------------------------------------
+ # Abort BEFORE the task starts
+ # -----------------------------------------------------------
+ if abort_requested(book_idx) and not chapter_started(book_idx, chapter_num):
+ log_msg(book_idx, f"[ABORT] Skip SAVE {chapter_num}")
+
+ save_skip_reason(book_idx, chapter_num, "abort_before_start")
+ inc_chapter_done(book_idx)
+ return None
- if chapter is None:
- raise ValueError("Missing chapter number in parsed payload")
+ # Mark chapter as started
+ mark_chapter_started(book_idx, chapter_num)
- # Ensure chapter folder exists
- os.makedirs(base_path, exist_ok=True)
+ # -----------------------------------------------------------
+ # Ensure parsed text exists
+ # -----------------------------------------------------------
+ if not os.path.exists(text_path):
+ log_msg(book_idx, f"[SAVE] SKIP {chapter_num} (missing text file)")
- # Build chapter file path
- path = get_save_path(chapter, base_path)
+ save_skip_reason(book_idx, chapter_num, "no_text_file")
+ inc_chapter_done(book_idx)
+ return None
- # Save chapter text to disk
- with open(path, "w", encoding="utf-8") as f:
- f.write(text)
+ # -----------------------------------------------------------
+ # Read parsed text
+ # -----------------------------------------------------------
+ try:
+ with open(text_path, "r", encoding="utf-8") as f:
+ text = f.read()
+ except Exception as exc:
+ log_msg(book_idx, f"[SAVE] ERROR reading text {chapter_num}: {exc}")
+
+ save_skip_reason(book_idx, chapter_num, f"text_read_error: {exc}")
+ inc_chapter_done(book_idx)
+ return None
+
+ if not text.strip():
+ log_msg(book_idx, f"[SAVE] SKIP {chapter_num} (text empty)")
- log_msg(book_id, f"[SAVE] Saved chapter {chapter} → {path}")
+ save_skip_reason(book_idx, chapter_num, "text_empty")
+ inc_chapter_done(book_idx)
+ return None
- inc_completed(book_id)
+ # -----------------------------------------------------------
+ # Build JSON chapter representation
+ # -----------------------------------------------------------
+ chapter_obj = {
+ "chapter_num": chapter_num,
+ "text": text,
+ }
+
+ # -----------------------------------------------------------
+ # Write JSON output
+ # -----------------------------------------------------------
+ try:
+ os.makedirs(os.path.dirname(json_path), exist_ok=True)
- # Determine volume name
- volume_name = os.path.basename(base_path.rstrip("/"))
+ with open(json_path, "w", encoding="utf-8") as f:
+ json.dump(chapter_obj, f, ensure_ascii=False, indent=2)
- # Queue audio task (always use the saved file path)
- try:
- generate_audio.delay(
- book_id,
- volume_name,
- chapter,
- f"Chapter {chapter}",
- path,
- )
- log_msg(
- book_id, f"[AUDIO] Task queued for chapter {chapter} in {volume_name}"
- )
- except Exception as audio_exc:
- log_msg(book_id, f"[AUDIO] ERROR queueing chapter {chapter}: {audio_exc}")
+ log_msg(book_idx, f"[SAVE] OK {chapter_num}: {json_path}")
- return {"book_id": book_id, "chapter": chapter, "path": path}
+ inc_chapter_done(book_idx)
+ return None
except Exception as exc:
- log_msg(book_id, f"[SAVE] ERROR saving chapter {chapter}: {exc}")
+ log_msg(book_idx, f"[SAVE] ERROR writing JSON {chapter_num}: {exc}")
+
+ save_skip_reason(book_idx, chapter_num, f"json_write_error: {exc}")
+ inc_chapter_done(book_idx)
+ return None
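The chapter JSON written above contains only the chapter number and the cleaned text, so reading it back is symmetric. A small sketch; the path is hypothetical:

    import json

    # Hypothetical path; the real json_path is supplied by the caller of save_chapter.
    with open("output/book-0001/json/0007.json", encoding="utf-8") as f:
        chapter = json.load(f)

    print(chapter["chapter_num"])   # e.g. 7
    print(len(chapter["text"]))     # length of the cleaned chapter text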
diff --git a/bookscraper/scraper/tasks/scraping.py b/bookscraper/scraper/tasks/scraping.py
index 0694089..70198d0 100644
--- a/bookscraper/scraper/tasks/scraping.py
+++ b/bookscraper/scraper/tasks/scraping.py
@@ -12,11 +12,11 @@ import redis
from scraper.sites import BookSite
from scraper.book_scraper import BookScraper
from scraper.abort import clear_abort # no circular deps
-from scraper.ui_log import reset_ui_logs # <-- NEW IMPORT
+from scraper.ui_log import reset_ui_logs # NEW
print(">>> [IMPORT] scraping.py loaded")
-# Redis connection (same as Celery broker)
+# Redis = same URL as Celery broker
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
r = redis.Redis.from_url(REDIS_URL, decode_responses=True)
@@ -26,24 +26,24 @@ def start_scrape_book(self, url: str):
"""Scrapes metadata + chapters and prepares download tracking."""
# ------------------------------------------------------------
- # NEW: clear UI log buffer at start of new run
+ # Clear UI logs for a fresh run
# ------------------------------------------------------------
reset_ui_logs()
log(f"[SCRAPING] Start scraping for: {url}")
# ------------------------------------------------------------
- # Book scrape
+ # Scrape metadata + chapters
# ------------------------------------------------------------
site = BookSite()
scraper = BookScraper(site, url)
- result = scraper.execute() # returns dict with metadata + chapters
+ result = scraper.execute() # dict with metadata + chapters list
chapters = result.get("chapters", [])
full_count = len(chapters)
# ------------------------------------------------------------
- # DRY RUN
+ # DRY RUN (limit number of chapters)
# ------------------------------------------------------------
DRY_RUN = os.getenv("DRY_RUN", "0") == "1"
TEST_LIMIT = int(os.getenv("TEST_LIMIT", "5"))
@@ -51,34 +51,45 @@ def start_scrape_book(self, url: str):
if DRY_RUN:
log(f"[SCRAPING] DRY_RUN: limiting chapters to {TEST_LIMIT}")
chapters = chapters[:TEST_LIMIT]
- result["chapters"] = chapters
+
+ # ------------------------------------------------------------
+ # NORMALISE OUTPUT FORMAT
+ # - chapters = INT
+ # - chapter_list = LIST
+ # ------------------------------------------------------------
+ result["chapter_list"] = chapters
+ result["chapters"] = len(chapters)
log(f"[SCRAPING] Completed scrape: {len(chapters)}/{full_count} chapters")
# ------------------------------------------------------------
- # BOOK RUN ID (using title as ID)
+ # Ensure book_id exists
# ------------------------------------------------------------
- title = result.get("title") or "UnknownBook"
- book_id = title # user requirement
+ book_idx = result.get("book_idx")
+ if not book_idx:
+ raise ValueError("BookScraper did not return book_idx")
+ book_id = book_idx
result["book_id"] = book_id
- log(f"[SCRAPING] Assigned book_id = '{book_id}'")
+ log(f"[SCRAPING] Assigned book_id = {book_id}")
# ------------------------------------------------------------
# RESET ABORT + INITIALISE PROGRESS
# ------------------------------------------------------------
clear_abort(book_id)
- r.set(f"progress:{book_id}:total", len(chapters))
+ r.set(f"progress:{book_id}:total", result["chapters"])
r.set(f"progress:{book_id}:done", 0)
- r.delete(f"logs:{book_id}") # clear old logs if any
+
+ # clear legacy logs
+ r.delete(f"logs:{book_id}")
r.rpush(f"logs:{book_id}", f":: SCRAPING STARTED for {url}")
- r.rpush(f"logs:{book_id}", f":: Found {len(chapters)} chapters")
+ r.rpush(f"logs:{book_id}", f":: Found {result['chapters']} chapters")
# ------------------------------------------------------------
- # DISPATCH DOWNLOAD CONTROLLER
+ # DISPATCH CONTROLLER (book_idx + primitive metadata)
# ------------------------------------------------------------
celery_app.send_task(
"scraper.tasks.controller_tasks.launch_downloads",
@@ -86,11 +97,11 @@ def start_scrape_book(self, url: str):
queue="controller",
)
- log(f"[SCRAPING] Dispatched download controller for '{book_id}'")
+ log(f"[SCRAPING] Dispatched download controller for {book_id}")
return {
"book_id": book_id,
"title": result.get("title"),
"author": result.get("author"),
- "chapters": len(chapters),
+ "chapters": result["chapters"], # integer
}
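After the normalisation above, downstream code can rely on result["chapters"] being an int and result["chapter_list"] being the list of chapter entries. A sketch of the resulting shape, with all values hypothetical:

    # Illustrative only; real values come from BookScraper.execute().
    result = {
        "book_idx": "book-0001",    # required: start_scrape_book raises if missing
        "book_id": "book-0001",     # mirrored from book_idx
        "title": "Example Title",
        "author": "Example Author",
        "chapter_list": ["https://example.com/ch/1", "https://example.com/ch/2"],
        "chapters": 2,              # always len(chapter_list) after normalisation
    }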
diff --git a/bookscraper/templates/index.html b/bookscraper/templates/index.html
index a8a4b76..a751f12 100644
--- a/bookscraper/templates/index.html
+++ b/bookscraper/templates/index.html
@@ -1,34 +1,28 @@
-<!DOCTYPE html>
-<html lang="nl">
-<head>
- <meta charset="UTF-8">
- <title>BookScraper</title>
- <style>
- body { font-family: Arial, sans-serif; padding: 40px; max-width: 600px; margin: auto; }
- h1 { margin-bottom: 20px; }
- input[type="text"] {
- width: 100%; padding: 12px; font-size: 16px;
- border: 1px solid #ccc; border-radius: 6px;
- }
- button {
- margin-top: 20px;
- padding: 12px 20px;
- background: #007bff; color: white;
- border: none; border-radius: 6px;
- font-size: 16px; cursor: pointer;
- }
- button:hover { background: #0056b3; }
- </style>
-</head>
-<body>
+{% extends "base.html" %} {% block content %}
-<h1>BookScraper WebGUI</h1>
+<h1>BookScraper</h1>
+
+<div class="card">
+ <h3>Start new scrape</h3>
+
+ <form action="/start" method="POST">
+ <label for="url">Book URL:</label>
+ <input
+ type="text"
+ id="url"
+ name="url"
+ placeholder="https://www.example.com/book/123"
+ required
+ />
-<form action="/start" method="POST">
- <label for="url">Geef een boek-URL op:</label><br><br>
- <input type="text" id="url" name="url" placeholder="https://example.com/book/12345" required>
<button type="submit">Start Scraping</button>
-</form>
+ </form>
+</div>
+
+<div class="card">
+ <h3>Library</h3>
+  <p>Browse all previously scraped books.</p>
+ <a href="/library">Open Library</a>
+</div>
-</body>
-</html>
+{% endblock %}
diff --git a/bookscraper/templates/result.html b/bookscraper/templates/result.html
index 57aabf9..379cf6b 100644
--- a/bookscraper/templates/result.html
+++ b/bookscraper/templates/result.html
@@ -1,239 +1,33 @@
-<!DOCTYPE html>
-<html lang="nl">
- <head>
- <meta charset="UTF-8" />
- <title>BookScraper Resultaat</title>
+{% extends "base.html" %} {% block content %}
- <style>
- body {
- font-family: Arial, sans-serif;
- padding: 30px;
- max-width: 900px;
- margin: auto;
- }
- h1 {
- margin-bottom: 10px;
- }
- .box {
- padding: 15px;
- border: 1px solid #ddd;
- background: #f8f8f8;
- border-radius: 6px;
- margin-bottom: 20px;
- }
- .logbox {
- background: #000;
- color: #0f0;
- padding: 12px;
- height: 70vh;
- overflow-y: auto;
- font-family: monospace;
- border-radius: 6px;
- font-size: 13px;
- }
+<h1>Scraping started</h1>
- /* NEW: Clear button */
- #clearLogBtn {
- margin-bottom: 10px;
- padding: 8px 16px;
- background: #777;
- color: white;
- border: none;
- border-radius: 6px;
- cursor: pointer;
- }
- #clearLogBtn:hover {
- background: #555;
- }
+<div class="card">
+ {% if error %}
+  <div class="alert alert-error"><strong>Error:</strong> {{ error }}</div>
- #abortBtn {
- padding: 12px 20px;
- background: #d9534f;
- color: white;
- border: none;
- border-radius: 6px;
- cursor: pointer;
- margin-top: 10px;
- }
- #abortBtn:hover {
- background: #c9302c;
- }
- #statusLine {
- font-size: 18px;
- font-weight: bold;
- }
- .hidden {
- display: none;
- }
- </style>
- </head>
+ <div class="actions">
+    <a href="/" class="button">Back</a>
+ </div>
- <body>
- <a href="/">&larr; Terug</a>
- <h1>Scrape Resultaat--</h1>
+ {% else %} {% if message %}
+ <p>{{ message }}</p>
+ {% endif %} {% if scraping_task_id %}
+ <p><strong>Task ID:</strong> {{ scraping_task_id }}</p>
+ {% endif %}
- {% if error %}
- <div
- class="box"
- style="background: #ffdddd; border-left: 5px solid #ff4444"
- >
- <strong>Fout:</strong> {{ error }}
- </div>
- {% endif %} {% if message %}
- <div class="box">{{ message }}</div>
- {% endif %}
+  <p>
+    Your scrape has started.<br />
+    As soon as the first results are available, the book will automatically
+    appear in the <strong>Library</strong>.
+  </p>
- <!-- COVER -->
- {% if book_title %}
- <div class="box">
- <strong>Cover:</strong><br />
- <img
- src="/output/{{ book_title }}/cover.jpg"
- alt="Cover"
- style="
- margin-top: 10px;
- max-width: 250px;
- border: 1px solid #ccc;
- border-radius: 4px;
- "
- onerror="this.style.display='none'"
- />
- </div>
- {% endif %}
+ <div class="actions">
+    <a href="/library" class="button">Go to Library</a>
+    <a href="/" class="button">New scrape</a>
+ </div>
- <div id="statusBox" class="box hidden">
- <div id="statusLine">Status: bezig…</div>
- <div id="progressText"></div>
- <button id="abortBtn" class="hidden">ABORT</button>
- </div>
+ {% endif %}
+</div>
- <!-- FAILED LIST -->
- <div
- id="failedBox"
- class="box hidden"
- style="background: #ffefef; border-left: 5px solid #cc0000"
- >
- <strong>Failed chapters:</strong>
- <ul id="failedList" style="margin-top: 10px"></ul>
- </div>
-
- <div class="box">
- <strong>Live log:</strong><br />
-
- <!-- NEW BUTTON -->
- <button id="clearLogBtn" onclick="clearLogs()">Clear logs</button>
-
- <div id="logbox" class="logbox"></div>
- </div>
-
- <script>
- const scrapingTaskId = "{{ scraping_task_id or '' }}";
-
- let bookId = null;
- let polling = true;
-
- if (scrapingTaskId) pollForBookId();
-
- function pollForBookId() {
- fetch(`/celery-result/${scrapingTaskId}`)
- .then((r) => r.json())
- .then((data) => {
- if (data.ready && data.result && data.result.book_id) {
- bookId = data.result.book_id;
- startLiveUI();
- } else setTimeout(pollForBookId, 800);
- })
- .catch(() => setTimeout(pollForBookId, 1200));
- }
-
- function startLiveUI() {
- document.getElementById("statusBox").classList.remove("hidden");
- document.getElementById("abortBtn").classList.remove("hidden");
-
- document.getElementById("abortBtn").onclick = () => {
- fetch(`/abort/${bookId}`, { method: "POST" });
- };
-
- pollProgress();
- pollLogs();
- }
-
- function pollProgress() {
- if (!bookId) return;
-
- fetch(`/progress/${bookId}`)
- .then((r) => r.json())
- .then((p) => {
- const done = p.completed || 0;
- const total = p.total || 0;
-
- document.getElementById(
- "progressText"
- ).innerText = `Completed: ${done} / ${total} | Skipped: ${
- p.skipped || 0
- } | Failed: ${p.failed || 0}`;
-
- const failedBox = document.getElementById("failedBox");
- const failedList = document.getElementById("failedList");
-
- if (p.failed_list && p.failed_list.length > 0) {
- failedBox.classList.remove("hidden");
- failedList.innerHTML = "";
- p.failed_list.forEach((entry) => {
- const li = document.createElement("li");
- li.textContent = entry;
- failedList.appendChild(li);
- });
- }
-
- if (p.abort) {
- document.getElementById("statusLine").innerText = "ABORTED";
- polling = false;
- } else if (done >= total && total > 0) {
- document.getElementById("statusLine").innerText = "KLAAR ✔";
- polling = false;
- } else {
- document.getElementById("statusLine").innerText = "Bezig…";
- }
-
- if (polling) setTimeout(pollProgress, 1000);
- })
- .catch(() => {
- if (polling) setTimeout(pollProgress, 1500);
- });
- }
-
- function pollLogs() {
- if (!polling) return;
-
- fetch(`/logs`)
- .then((r) => r.json())
- .then((data) => {
- const logbox = document.getElementById("logbox");
- logbox.innerHTML = "";
-
- data.logs.forEach((line) => {
- const div = document.createElement("div");
- div.textContent = line;
- logbox.appendChild(div);
- });
-
- logbox.scrollTop = logbox.scrollHeight;
- setTimeout(pollLogs, 1000);
- })
- .catch(() => setTimeout(pollLogs, 1500));
- }
-
- // =========================================================
- // NEW: Clear logs button handler
- // =========================================================
- function clearLogs() {
- fetch("/clear-logs", { method: "POST" })
- .then(() => {
- document.getElementById("logbox").innerHTML = "";
- })
- .catch((e) => console.error("Clear logs failed:", e));
- }
- </script>
- </body>
-</html>
+{% endblock %}