diff --git a/bookscraper/.gitignore b/bookscraper/.gitignore
index 08fedd4..cd78ff3 100644
--- a/bookscraper/.gitignore
+++ b/bookscraper/.gitignore
@@ -1,4 +1,164 @@
-output/
+# ============================================
+# PYTHON
+# ============================================
+
+# Bytecode
+__pycache__/
+*.pyc
+*.pyo
+*.pyd
+
+# Virtual environments
 venv/
+env/
+.venv/
+
+# Python build artifacts
+build/
+dist/
+*.egg-info/
+
+
+# ============================================
+# PROJECT-SPECIFIC IGNORE RULES
+# ============================================
+
+# Output generated by BookScraper
+output/
+audio_output/
+m4b_output/
+covers/
+
+# Logs
 *.log
-__pycache__/
\ No newline at end of file
+logs/
+log/
+celerybeat-schedule
+celerybeat.pid
+
+# Redis dump (if ever created)
+dump.rdb
+
+# Temporary HTML/debug scrapings
+tmp/
+temp/
+*.html.tmp
+*.debug.html
+
+
+# ============================================
+# CELERY / RUNTIME
+# ============================================
+
+celerybeat-schedule
+*.pid
+*.worker
+
+# Celery progress / abort temporary files (if any)
+abort_flags/
+progress_cache/
+
+
+# ============================================
+# DOCKER
+# ============================================
+
+# Docker build cache
+**/.dockerignore
+**/Dockerfile~
+docker-compose.override.yml
+docker-compose.local.yml
+docker-compose*.backup
+
+# Local bind mounts from Docker
+**/.volumes/
+**/mnt/
+**/cache/
+
+
+# ============================================
+# FRONTEND / STATIC FILES
+# ============================================
+
+# Node / JS (if ever used)
+node_modules/
+npm-debug.log
+yarn-debug.log
+yarn-error.log
+dist/
+.bundle/
+
+
+# ============================================
+# VS CODE / EDITORS
+# ============================================
+
+# VSCode
+.vscode/
+.history/
+*.code-workspace
+
+# PyCharm / JetBrains
+.idea/
+
+# Editor backups
+*.swp
+*.swo
+*~
+
+# Autosave files
+*.bak
+*.tmp
+
+
+# ============================================
+# SYSTEM / OS FILES
+# ============================================
+
+# macOS
+.DS_Store
+.AppleDouble
+.LSOverride
+Icon?
+.Trashes
+
+# Windows
+Thumbs.db
+Desktop.ini
+
+
+# ============================================
+# ARCHIVES
+# ============================================
+
+*.zip
+*.tar.gz
+*.tgz
+*.7z
+*.rar
+
+
+# ============================================
+# AUDIO / TTS / TEMPORARY
+# ============================================
+
+*.wav
+*.mp3
+*.m4a
+*.m4b
+*.aac
+*.flac
+
+tts_temp/
+audio_temp/
+tts_cache/
+
+
+# ============================================
+# GIT INTERNAL SAFETY
+# ============================================
+
+# Never track your global git config junk
+.gitignore_global
+.gitconfig
+.gitattributes-global
diff --git a/bookscraper/app.py b/bookscraper/app.py
index bf758c8..241bc22 100644
--- a/bookscraper/app.py
+++ b/bookscraper/app.py
@@ -1,5 +1,5 @@
 # ============================================
-# File: bookscraper/app.py (ASYNC SCRAPING)
+# File: bookscraper/app.py (ASYNC SCRAPING + DASHBOARD)
 # ============================================
 
 from dotenv import load_dotenv
@@ -9,34 +9,45 @@ load_dotenv()
 print(">>> [WEB] Importing celery_app …")
 
 from celery_app import celery_app
-from flask import Flask, render_template, request, jsonify
+from flask import (
+    Flask,
+    render_template,
+    request,
+    jsonify,
+    redirect,
+    send_from_directory,
+)
 from scraper.logger import log_debug
 
-# Abort + Progress (per book_id)
+# Abort + Progress (legacy)
 from scraper.abort import set_abort
 from scraper.progress import get_progress
 
-# UI LOGS (GLOBAL — no book_id)
-from scraper.ui_log import get_ui_logs, reset_ui_logs  # <-- ADDED
+# NEW: Full Redis Book State Model
+from scraper.progress import get_state
+from scraper.progress import r as redis_client
 
-from celery.result import AsyncResult
+# NEW: Indexed log fetchers
+from scraper.log_index import fetch_logs, fetch_recent_logs, fetch_global_logs
+
+# UI LOGS (legacy)
+from scraper.ui_log import get_ui_logs, reset_ui_logs
 
-# ⬇⬇⬇ TOEGEVOEGD voor cover-serving
-from flask import send_from_directory
+from celery.result import AsyncResult
 
 import os
+import time
+import re
 
 app = Flask(__name__)
 
-
 # =====================================================
-# STATIC FILE SERVING FOR OUTPUT  ← TOEGEVOEGD
+# STATIC FILE SERVING FOR OUTPUT
 # =====================================================
 OUTPUT_ROOT = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output")
 
 @app.route("/output/<path:filename>")
 def serve_output(filename):
-    """Serve output files such as cover.jpg and volumes."""
     return send_from_directory(OUTPUT_ROOT, filename, as_attachment=False)
 
@@ -49,7 +60,7 @@ def index():
 
 
 # =====================================================
-# START SCRAPING (async via Celery)
+# START SCRAPING → DIRECT REDIRECT TO DASHBOARD (book_idx native)
 # =====================================================
 @app.route("/start", methods=["POST"])
 def start_scraping():
@@ -58,64 +69,150 @@ def start_scraping():
     if not url:
         return render_template("result.html", error="Geen URL opgegeven.")
 
-    # ---------------------------------------------------------
-    # NEW: Clear UI log buffer when starting a new scrape
-    # ---------------------------------------------------------
     reset_ui_logs()
-
     log_debug(f"[WEB] Scraping via Celery: {url}")
 
-    async_result = celery_app.send_task(
+    # --------------------------------------------
+    # Extract book_idx from URL
+    # Supports:
+    #   - /15/15618.html
+    #   - /15618/
+    #   - /15618.html
+    # --------------------------------------------
+    m = re.search(r"/(\d+)(?:\.html|/)?$", url)
+    if not m:
+        return render_template(
+            "result.html", error="Kan book_idx niet bepalen uit URL."
+        )
+
+    book_idx = m.group(1)
+
+    # --------------------------------------------
+    # Start async scraping task
+    # --------------------------------------------
+    celery_app.send_task(
         "scraper.tasks.scraping.start_scrape_book",
         args=[url],
         queue="scraping",
    )
 
-    return render_template(
-        "result.html",
-        message="Scraping gestart.",
-        scraping_task_id=async_result.id,
-        book_title=None,
-    )
+    # --------------------------------------------
+    # DIRECT redirect — no waiting on Celery
+    # --------------------------------------------
+    return redirect(f"/book/{book_idx}")
 
 
 # =====================================================
-# CLEAR UI LOGS MANUALLY (NEW)
+# CLEAR UI LOGS (legacy)
 # =====================================================
 @app.route("/clear-logs", methods=["POST"])
 def clear_logs():
     reset_ui_logs()
-    return jsonify({"status": "ok", "message": "UI logs cleared"})
+    return jsonify({"status": "ok"})
+
+
+# =====================================================
+# ABORT (per book_idx)
+# =====================================================
+@app.route("/abort/<book_idx>", methods=["POST"])
+def abort_download(book_idx):
+    log_debug(f"[WEB] Abort requested for book: {book_idx}")
+    set_abort(book_idx)
+    return jsonify({"status": "ok", "aborted": book_idx})
+
+
+# =====================================================
+# LEGACY PROGRESS ENDPOINT
+# =====================================================
+@app.route("/progress/<book_idx>", methods=["GET"])
+def progress(book_idx):
+    return jsonify(get_progress(book_idx))
+
+
+# =====================================================
+# REDIS STATE ENDPOINT
+# =====================================================
+@app.route("/state/<book_idx>", methods=["GET"])
+def full_state(book_idx):
+    return jsonify(get_state(book_idx))
 
 
 # =====================================================
-# ABORT (per book_id)
+# LIST ALL BOOKS — METADATA
 # =====================================================
-@app.route("/abort/<book_id>", methods=["POST"])
-def abort_download(book_id):
-    log_debug(f"[WEB] Abort requested for book: {book_id}")
-    set_abort(book_id)
-    return jsonify({"status": "ok", "aborted": book_id})
+@app.route("/books", methods=["GET"])
+def list_books():
+    books = sorted(redis_client.smembers("books") or [])
+    result = []
+
+    for book_idx in books:
+        meta = redis_client.hgetall(f"book:{book_idx}:meta") or {}
+        state = get_state(book_idx) or {}
+
+        result.append(
+            {
+                "id": book_idx,
+                "title": meta.get("title", book_idx),
+                "author": meta.get("author"),
+                "url": meta.get("url"),
+                "cover_url": meta.get("cover_url"),
+                "scraped_at": meta.get("scraped_at"),
+                "status": state.get("status"),
+                "last_update": state.get("last_update"),
+                "chapters_total": state.get("chapters_total"),
+                "chapters_done": state.get("chapters_done"),
+            }
+        )
+
+    return jsonify(result)
 
 
 # =====================================================
-# PROGRESS (per book_id)
+# LIBRARY DASHBOARD PAGE
 # =====================================================
-@app.route("/progress/<book_id>", methods=["GET"])
-def progress(book_id):
-    return jsonify(get_progress(book_id))
+@app.route("/library", methods=["GET"])
+def library_page():
+    return render_template("library.html")
 
 
 # =====================================================
-# LOGS — GLOBAL UI LOGS
+# BOOK DASHBOARD PAGE — book_idx native
 # =====================================================
-@app.route("/logs", methods=["GET"])
-def logs():
-    return jsonify({"logs": get_ui_logs()})
+@app.route("/book/<book_idx>", methods=["GET"])
+def book_dashboard(book_idx):
+    return render_template(
+        "book_dashboard.html",
+        book_id=book_idx,  # for template backward compatibility
+        book_idx=book_idx,
+    )
 
 
 # =====================================================
-# CELERY RESULT → return book_id when scraping finishes
+# INDEXED LOG API — book_idx direct
+# =====================================================
+@app.route("/api/book/<book_idx>/logs", methods=["GET"])
+def api_book_logs(book_idx):
+    cursor = int(request.args.get("cursor", "0"))
+    logs, new_cursor = fetch_logs(book_idx, cursor)
+    return jsonify({"logs": logs, "cursor": new_cursor})
+
+
+@app.route("/api/book/<book_idx>/logs/recent", methods=["GET"])
+def api_book_logs_recent(book_idx):
+    limit = int(request.args.get("limit", "200"))
+    logs = fetch_recent_logs(book_idx, limit)
+    return jsonify({"logs": logs})
+
+
+@app.route("/api/logs/global", methods=["GET"])
+def api_global_logs():
+    cursor = int(request.args.get("cursor", "0"))
+    logs, new_cursor = fetch_global_logs(cursor)
+    return jsonify({"logs": logs, "cursor": new_cursor})
+
+
+# =====================================================
+# CELERY RESULT
 # =====================================================
 @app.route("/celery-result/<task_id>", methods=["GET"])
 def celery_result(task_id):
@@ -123,10 +220,8 @@ def celery_result(task_id):
 
     if result.successful():
         return jsonify({"ready": True, "result": result.get()})
-
     if result.failed():
         return jsonify({"ready": True, "error": "failed"})
-
     return jsonify({"ready": False})
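Since /start now redirects to the dashboard before the Celery task finishes, a non-browser client has to poll for completion itself. A minimal sketch against the endpoints above; the dev host/port is an assumption, and the only status strings confirmed by this diff are "downloading", "audio", and "aborted":

    import time
    import requests

    BASE = "http://localhost:5000"  # assumed dev address

    def wait_until_done(book_idx, interval=2.0):
        """Poll /state/<book_idx> until the book leaves an active status."""
        while True:
            state = requests.get(f"{BASE}/state/{book_idx}", timeout=5).json()
            print(state.get("status"),
                  state.get("chapters_done"), "/", state.get("chapters_total"))
            if state.get("status") not in ("downloading", "audio"):
                return state
            time.sleep(interval)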
diff --git a/bookscraper/logbus/publisher.py b/bookscraper/logbus/publisher.py
index 9a597db..5476475 100644
--- a/bookscraper/logbus/publisher.py
+++ b/bookscraper/logbus/publisher.py
@@ -1,4 +1,11 @@
-# logbus/publisher.py
+# ============================================================
+# File: logbus/publisher.py
+# Purpose:
+#   Centralized logger:
+#     - console logging
+#     - UI legacy log echo
+#     - NEW: indexed Redis log ingest (non-blocking)
+# ============================================================
 
 import logging
 
@@ -10,20 +17,45 @@ def log(message: str):
     """
     Dumb logger:
     - skip lege messages
     - stuur message 1:1 door
-    - geen prefixes
+    - geen prefixes wijzigen
     - geen mutaties
     """
     if not message or not message.strip():
         return
 
-    # console
+    # ============================================================
+    # SAFETY FIX (C&U):
+    # Prevent an infinite loop: messages coming from log_index
+    # start with "[IDX]" and must NOT re-enter the pipeline.
+    # ============================================================
+    if message.startswith("[IDX]"):
+        logger.warning(message)
+        return
+
+    # ---------------------------------------
+    # Console log
+    # ---------------------------------------
     logger.warning(message)
 
-    # UI-echo
+    # ---------------------------------------
+    # Legacy UI log (existing behavior)
+    # ---------------------------------------
     try:
-        from scraper.ui_log import push_ui
+        from scraper.ui_log import push_ui  # delayed import
 
         push_ui(message)
     except Exception:
+        # The UI log must never cause a crash
+        pass
+
+    # ---------------------------------------
+    # NEW: Indexed Redis log entry
+    # ---------------------------------------
+    try:
+        from scraper.log_index import ingest_indexed_log  # delayed import
+
+        ingest_indexed_log(message)
+    except Exception:
+        # Fail silently: logging must never break the pipeline
        pass
diff --git a/bookscraper/scraper/book_scraper.py b/bookscraper/scraper/book_scraper.py
index 922d0c7..bf54375 100644
--- a/bookscraper/scraper/book_scraper.py
+++ b/bookscraper/scraper/book_scraper.py
@@ -13,10 +13,7 @@ from scraper.models.book_state import Chapter
 class BookScraper:
     """
     Minimal scraper: only metadata + chapter list.
-    The DownloadController handles Celery pipelines for:
-    - download
-    - parse
-    - save
+    The DownloadController handles Celery pipelines.
     """
 
     def __init__(self, site, url):
@@ -29,6 +26,8 @@ class BookScraper:
         self.cover_url = ""
         self.chapter_base = None
 
+        self.book_idx = None  # numeric ID from the URL (the only source)
+
         self.chapters = []
 
         # Load custom replacements
@@ -40,6 +39,9 @@ class BookScraper:
         """Main entry point. Returns metadata + chapter URLs."""
         soup = self._fetch(self.url)
 
+        # book_idx comes from the main URL only (by convention)
+        self._extract_book_idx()
+
         self._parse_title(soup)
         self._parse_author(soup)
         self._parse_description(soup)
@@ -54,14 +56,30 @@ class BookScraper:
             "title": self.book_title,
             "author": self.book_author,
             "description": self.book_description,
-            "cover_url": self.cover_url,  # ← used by DownloadController
+            "cover_url": self.cover_url,
             "book_url": self.url,
+            "book_idx": self.book_idx,  # <<< required by the logging pipeline
             "chapters": [
                 {"num": ch.number, "title": ch.title, "url": ch.url}
                 for ch in self.chapters
             ],
         }
 
+    # ------------------------------------------------------------
+    def _extract_book_idx(self):
+        """
+        Extract numeric ID from main URL such as:
+            https://www.piaotia.com/bookinfo/15/15618.html
+        This is the ONLY allowed source.
+        """
+        m = re.search(r"/(\d+)\.html$", self.url)
+        if m:
+            self.book_idx = m.group(1)
+            log_debug(f"[BookScraper] Extracted book_idx = {self.book_idx}")
+        else:
+            self.book_idx = None
+            log_debug("[BookScraper] book_idx NOT FOUND in URL")
+
     # ------------------------------------------------------------
     def _fetch(self, url):
         log_debug(f"[BookScraper] Fetch: {url}")
@@ -109,10 +127,7 @@ class BookScraper:
     def _parse_cover(self, soup):
         """
         Extract correct cover based on book_id path logic.
-        1. primary:  match "/files/article/image/{vol}/{book_id}/"
-        2. fallback: endswith "/{book_id}s.jpg"
         """
-        # Extract book_id from URL
         m = re.search(r"/(\d+)\.html$", self.url)
         if not m:
             log_debug("[BookScraper] No book_id found in URL → cannot match cover")
@@ -120,20 +135,15 @@ class BookScraper:
 
         book_id = m.group(1)
 
-        # Extract vol folder from URL (bookinfo/<vol>/<book_id>.html)
         m2 = re.search(r"/bookinfo/(\d+)/", self.url)
         volume = m2.group(1) if m2 else None
 
         log_debug(f"[BookScraper] Book ID = {book_id}, Volume = {volume}")
 
         imgs = soup.find_all("img", src=True)
-
         chosen = None
 
-        # --------------------------------------------------------
-        # PRIORITY 1: Path-match
-        #   /files/article/image/{vol}/{book_id}/
-        # --------------------------------------------------------
+        # PATH-MATCH
         if volume:
             target_path = f"/files/article/image/{volume}/{book_id}/"
             for img in imgs:
@@ -143,9 +153,7 @@ class BookScraper:
                     log_debug(f"[BookScraper] Cover matched by PATH: {src}")
                     break
 
-        # --------------------------------------------------------
-        # PRIORITY 2: endswith "/{book_id}s.jpg"
-        # --------------------------------------------------------
+        # SUFFIX-MATCH
         if not chosen:
             target_suffix = f"/{book_id}s.jpg"
             for img in imgs:
@@ -155,9 +163,6 @@ class BookScraper:
                     log_debug(f"[BookScraper] Cover matched by SUFFIX: {src}")
                     break
 
-        # --------------------------------------------------------
-        # No match
-        # --------------------------------------------------------
         if not chosen:
             log_debug("[BookScraper] No matching cover found")
             return
@@ -167,14 +172,12 @@ class BookScraper:
 
     # ------------------------------------------------------------
     def get_chapter_page(self, soup):
-        """Return BeautifulSoup of the main chapter list page."""
         node = soup.select_one(
             "html > body > div:nth-of-type(6) > div:nth-of-type(2) > div > table"
         )
 
         href = node.select_one("a").get("href")
         chapter_url = urljoin(self.site.root, href)
 
-        # base for chapter links
         parts = chapter_url.rsplit("/", 1)
         self.chapter_base = parts[0] + "/"
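The book_idx contract is easy to verify in isolation; this snippet reuses the exact pattern from _extract_book_idx on the URL shape named in its docstring:

    import re

    def extract_book_idx(url):
        # the numeric ID must come from a ".../<id>.html" main URL
        m = re.search(r"/(\d+)\.html$", url)
        return m.group(1) if m else None

    print(extract_book_idx("https://www.piaotia.com/bookinfo/15/15618.html"))  # 15618
    print(extract_book_idx("https://www.piaotia.com/bookinfo/15/"))            # None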
diff --git a/bookscraper/scraper/download_controller.py b/bookscraper/scraper/download_controller.py
index 9a9e978..4f2cf32 100644
--- a/bookscraper/scraper/download_controller.py
+++ b/bookscraper/scraper/download_controller.py
@@ -1,231 +1,133 @@
-# =========================================================
+# ============================================================
 # File: scraper/download_controller.py
 # Purpose:
-#   Build Celery pipelines for all chapters
-#   and pass book_id for abort/progress/log functionality.
-#   + Download and replicate cover image to all volume folders
-#   + Generate scripts (allinone.txt, makebook, say)
-#   + Initialize Redis Book State Model (status + counters)
-# =========================================================
-
-from celery import group
-from scraper.tasks.pipeline import build_chapter_pipeline
-from scraper.scriptgen import generate_all_scripts
-from logbus.publisher import log
+#   Prepare folder structure, volumes, cover, and Celery pipelines
+#   using ONLY Celery-safe primitive arguments.
+#
+#   Workers never receive BookContext/ChapterContext.
+# ============================================================
 
 import os
 import requests
-import shutil
+from logbus.publisher import log
+from celery import chain, group
+
+from scraper.utils import get_save_path
+from scraper.tasks.download_tasks import download_chapter
+from scraper.tasks.parse_tasks import parse_chapter
+from scraper.tasks.save_tasks import save_chapter
+from scraper.tasks.progress_tasks import update_progress
+
-from scraper.abort import abort_requested  # DEBUG allowed
+print(">>> [IMPORT] download_controller.py loaded (final Celery-safe mode)")
 
-# NEW: Redis State Model (C&U)
-from scraper.progress import (
-    init_book_state,
-    set_status,
-    set_chapter_total,
-)
 
 class DownloadController:
     """
-    Coordinates all chapter pipelines (download → parse → save),
-    including:
-    - volume splitting
-    - consistent meta propagation
-    - book_id-based abort + progress tracking
-    - cover download + volume replication
-    - script generation (allinone.txt, makebook, say)
-    - Redis book state initialisation and status updates
+    Responsibilities:
+      • Determine output root
+      • Download cover
+      • Assign chapters → volumes
+      • Build Celery pipelines (primitive-only arguments)
     """
 
-    def __init__(self, book_id: str, scrape_result: dict):
-        self.book_id = book_id
+    def __init__(self, scrape_result: dict):
+        # Keys match BookScraper.scrape(): book_idx / cover_url / chapters
+        self.book_idx = scrape_result.get("book_idx")
+        self.title = scrape_result.get("title", "Unknown")
+        self.author = scrape_result.get("author")
+        self.cover_url = scrape_result.get("cover_url")
         self.scrape_result = scrape_result
 
-        # Core metadata
-        self.title = scrape_result.get("title", "UnknownBook")
-        self.chapters = scrape_result.get("chapters", []) or []
-        self.cover_url = scrape_result.get("cover_url")
-
-        # Output base dir
-        root = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output")
-
-        # Volume size
-        self.max_vol = int(os.getenv("MAX_VOL_SIZE", "200"))
-
-        # Base folder for the whole book
-        self.book_base = os.path.join(root, self.title)
-        os.makedirs(self.book_base, exist_ok=True)
-
-        # Meta passed to parse/save stage
-        self.meta = {
-            "title": self.title,
-            "author": scrape_result.get("author"),
-            "description": scrape_result.get("description"),
-            "book_url": scrape_result.get("book_url"),
-        }
-
-        # -------------------------------------------------
-        # DEBUG — bevestig dat controller correct book_id ziet
-        # -------------------------------------------------
-        log(f"[CTRL_DEBUG] Controller init book_id={book_id} title='{self.title}'")
-
-        try:
-            abort_state = abort_requested(book_id)
-            log(f"[CTRL_DEBUG] abort_requested(book_id={book_id}) → {abort_state}")
-        except Exception as e:
-            log(f"[CTRL_DEBUG] abort_requested ERROR: {e}")
-
-        # -------------------------------------------------
-        # NEW: Initialize Redis Book State Model
-        # -------------------------------------------------
-        try:
-            init_book_state(
-                book_id=self.book_id,
-                title=self.title,
-                url=self.scrape_result.get("book_url"),
-                chapters_total=len(self.chapters),
-            )
-            log(f"[CTRL_STATE] init_book_state() completed for {self.title}")
-        except Exception as e:
-            log(f"[CTRL_STATE] init_book_state FAILED: {e}")
-
-    # ---------------------------------------------------------
-    # Cover Download
-    # ---------------------------------------------------------
-    def download_cover(self):
-        """Download one cover image into the root of the book folder."""
-        if not self.cover_url:
-            log(f"[CTRL] No cover URL found for '{self.title}'")
-            return
-
-        cover_path = os.path.join(self.book_base, "cover.jpg")
-
-        headers = {
-            "User-Agent": (
-                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:118.0) "
-                "Gecko/20100101 Firefox/118.0"
-            ),
-            "Referer": self.scrape_result.get("book_url") or "https://www.piaotia.com/",
-        }
-
-        try:
-            log(f"[CTRL] Downloading cover: {self.cover_url}")
-
-            resp = requests.get(self.cover_url, timeout=10, headers=headers)
-            resp.raise_for_status()
+        # List of dicts from scraper: [{num, title, url}]
+        self.chapters = scrape_result.get("chapters", [])
+        self.chapter_count = len(self.chapters)
+
+        # Output root
+        self.output_root = os.path.join("/app/output", self.title)
+
+        # Will be filled by assign_volumes()
+        self.volume_paths = {}  # chapter_num → volume_path
+
+    # -----------------------------------------------------------
+    # Create root folder + download cover
+    # -----------------------------------------------------------
+    def init_book_output(self):
+        os.makedirs(self.output_root, exist_ok=True)
+
+        if self.cover_url:
+            try:
+                resp = requests.get(self.cover_url, timeout=10)
+                if resp.ok:
+                    p = os.path.join(self.output_root, "cover.jpg")
+                    with open(p, "wb") as f:
+                        f.write(resp.content)
+                    log(f"[CTRL] Cover saved: {p}")
+            except Exception as exc:
+                log(f"[CTRL] Cover download error: {exc}")
+
+    # -----------------------------------------------------------
+    # Volume assignment (no BookContext needed)
+    # -----------------------------------------------------------
+    def assign_volumes(self, max_chapters_per_volume=200):
+        """
+        Determine per-chapter volume_path and ensure folders exist.
+        """
+        volume_index = 1
+        chapters_in_volume = 0
 
-            with open(cover_path, "wb") as f:
-                f.write(resp.content)
-
-            log(f"[CTRL] Cover saved to: {cover_path}")
-
-        except Exception as e:
-            log(f"[CTRL] Cover download failed: {e} (url={self.cover_url})")
-
-    # ---------------------------------------------------------
-    # Cover Replication to Volumes
-    # ---------------------------------------------------------
-    def replicate_cover_to_volumes(self):
-        """Copy cover.jpg into each existing Volume_xxx directory."""
-        src = os.path.join(self.book_base, "cover.jpg")
-        if not os.path.exists(src):
-            log("[CTRL] No cover.jpg found, replication skipped")
-            return
+        for ch in self.chapters:
+            if chapters_in_volume >= max_chapters_per_volume:
+                volume_index += 1
+                chapters_in_volume = 0
 
-        try:
-            for entry in os.listdir(self.book_base):
-                if entry.lower().startswith("volume_"):
-                    vol_dir = os.path.join(self.book_base, entry)
-                    dst = os.path.join(vol_dir, "cover.jpg")
-
-                    shutil.copyfile(src, dst)
-                    log(f"[CTRL] Cover replicated into: {dst}")
-
-        except Exception as e:
-            log(f"[CTRL] Cover replication failed: {e}")
-
-    # ---------------------------------------------------------
-    # Volume isolation
-    # ---------------------------------------------------------
-    def get_volume_path(self, chapter_num: int) -> str:
-        """Returns the correct volume directory for a chapter."""
-        vol_index = (chapter_num - 1) // self.max_vol + 1
-        vol_name = f"Volume_{vol_index:03d}"
-        vol_path = os.path.join(self.book_base, vol_name)
-        os.makedirs(vol_path, exist_ok=True)
-        return vol_path
-
-    # ---------------------------------------------------------
-    # Pipeline launcher
-    # ---------------------------------------------------------
-    def start(self):
-        total = len(self.chapters)
+            volume_path = os.path.join(self.output_root, f"Volume{volume_index}")
+            os.makedirs(volume_path, exist_ok=True)
 
-        log(
-            f"[CTRL] Initialising pipeline for '{self.title}' "
-            f"(book_id={self.book_id}, chapters={total}, max_vol={self.max_vol})"
-        )
-        log(f"[CTRL] Output root: {self.book_base}")
+            # C&U FIX — scraper outputs key "num", NOT "number"
+            chapter_num = ch["num"]
+            self.volume_paths[chapter_num] = volume_path
 
-        # -------------------------------------
-        # NEW: Redis state update
-        # -------------------------------------
-        try:
-            set_status(self.book_id, "downloading")
-            set_chapter_total(self.book_id, total)
-            log(f"[CTRL_STATE] Status set to 'downloading' for {self.book_id}")
-        except Exception as e:
-            log(f"[CTRL_STATE] set_status/set_chapter_total FAILED: {e}")
+            chapters_in_volume += 1
 
-        # -------------------------------------
-        # 1) Download cover
-        # -------------------------------------
-        self.download_cover()
+        log(f"[CTRL] Volume assignment complete: {len(self.volume_paths)} chapters")
 
-        tasks = []
+    # -----------------------------------------------------------
+    # Build Celery pipelines (primitive-only)
+    # -----------------------------------------------------------
+    def build_pipelines(self):
+        pipelines = []
 
         for ch in self.chapters:
-            chapter_num = ch["num"]
-            chapter_url = ch["url"]
-
-            volume_path = self.get_volume_path(chapter_num)
-
-            tasks.append(
-                build_chapter_pipeline(
-                    self.book_id,
-                    chapter_num,
-                    chapter_url,
-                    volume_path,
-                    self.meta,
-                )
+            # C&U FIX — must use "num"
+            num = ch["num"]
+            url = ch["url"]
+            vp = self.volume_paths[num]
+
+            # Derive file paths here so every stage receives explicit
+            # primitives (".txt next to the HTML" is an assumed layout).
+            html_path = get_save_path(num, vp)
+            text_path = os.path.splitext(html_path)[0] + ".txt"
+
+            # NOTE: .si() keeps the signatures immutable: these tasks
+            # return None, and with .s() Celery would prepend that None
+            # to the next task's arguments.
+            p = chain(
+                download_chapter.si(self.book_idx, num, url, vp),
+                parse_chapter.si(self.book_idx, num, html_path, text_path),
+                save_chapter.si(self.book_idx, num),
+                update_progress.si(self.book_idx),
            )
+            pipelines.append(p)
 
-        async_result = group(tasks).apply_async()
-
-        log(
-            f"[CTRL] Pipelines dispatched for '{self.title}' "
-            f"(book_id={self.book_id}, group_id={async_result.id})"
-        )
-
-        # Debug abort state
-        try:
-            abort_state = abort_requested(self.book_id)
-            log(f"[CTRL_DEBUG] After-dispatch abort state: {abort_state}")
-        except Exception as e:
-            log(f"[CTRL_DEBUG] abort_requested error after dispatch: {e}")
-
-        # -------------------------------------------------------
-        self.replicate_cover_to_volumes()
+        return pipelines
 
-        # -------------------------------------------------------
+    # -----------------------------------------------------------
+    # Launch Celery group(pipelines)
+    # -----------------------------------------------------------
+    def dispatch_pipelines(self, pipelines):
         try:
-            generate_all_scripts(
-                self.book_base,
-                self.title,
-                self.meta.get("author"),
-            )
-            log(f"[CTRL] Scripts generated for '{self.title}'")
-        except Exception as e:
-            log(f"[CTRL] Script generation failed: {e}")
-
-        return async_result
+            g = group(pipelines)
+            result = g.apply_async()
+            return result
+        except Exception as exc:
+            log(f"[CTRL] ERROR dispatching pipelines: {exc}")
+            raise
+
+    # -----------------------------------------------------------
+    # Legacy convenience entrypoint
+    # -----------------------------------------------------------
+    def start(self):
+        self.init_book_output()
+        self.assign_volumes()
+        return self.dispatch_pipelines(self.build_pipelines())
diff --git a/bookscraper/scraper/tasks/audio_tasks.py b/bookscraper/scraper/tasks/audio_tasks.py
index fea3285..f43d3a9 100644
--- a/bookscraper/scraper/tasks/audio_tasks.py
+++ b/bookscraper/scraper/tasks/audio_tasks.py
@@ -1,5 +1,8 @@
 # ============================================================
 # File: scraper/tasks/audio_tasks.py
+# Purpose:
+#   Convert a saved chapter's text file into an audio file (.m4a)
+#   using macOS "say". Uses BookContext + ChapterContext.
 # ============================================================
 
 from celery_app import celery_app
@@ -12,29 +15,38 @@ from scraper.abort import abort_requested
 from redis import Redis
 from urllib.parse import urlparse
 
-# Kies lokale redis als aanwezig, anders standaard backend
-redis_url = os.getenv("REDIS_BACKEND_LOCAL") or os.getenv("REDIS_BACKEND")
-
-parsed = urlparse(redis_url)
+from scraper.progress import (
+    set_status,
+    set_last_update,
+    inc_audio_done,
+    save_skip_reason,
+)
 
 # ------------------------------------------------------------
-# REGULIER REDIS CLIENT (slots, file checks, state)
+# REDIS CLIENTS
 # ------------------------------------------------------------
+
+redis_url = os.getenv("REDIS_BACKEND_LOCAL") or os.getenv("REDIS_BACKEND")
+parsed = urlparse(redis_url)
+
+# Slot management DB (same as download slot handling)
 redis_client = Redis(
     host=parsed.hostname,
     port=parsed.port,
     db=parsed.path.strip("/"),
 )
 
-# ------------------------------------------------------------
-# BACKEND CLIENT (abort flags, progress counters) - altijd DB 0
-# ------------------------------------------------------------
+# Backend DB (abort + progress counters)
 backend_client = Redis(
     host=parsed.hostname,
     port=parsed.port,
     db=0,
 )
 
+# ------------------------------------------------------------
+# ENVIRONMENT
+# ------------------------------------------------------------
+
 AUDIO_TIMEOUT = int(os.getenv("AUDIO_TIMEOUT_SECONDS", "300"))
 AUDIO_VOICE = os.getenv("AUDIO_VOICE", "SinJi")
 AUDIO_RATE = int(os.getenv("AUDIO_RATE", "200"))
@@ -44,20 +56,54 @@ AUDIO_SLOTS = int(os.getenv("AUDIO_SLOTS", "1"))
 
 CONTAINER_PREFIX = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "/app/output")
 
+# ============================================================
+# CELERY TASK — NOW USING CONTEXT OBJECTS
+# ============================================================
+
+
 @celery_app.task(bind=True, queue="audio", ignore_result=True)
-def generate_audio(
-    self, book_id, volume_name, chapter_number, chapter_title, chapter_text
-):
-    log(f"[AUDIO] CH{chapter_number}: START task → raw_input={chapter_text}")
-
-    # Abort early
-    if abort_requested(book_id, backend_client):
-        log(f"[AUDIO] ABORT detected → skip CH{chapter_number}")
+def generate_audio(self, book_context, chapter_context):
+    """
+    Create audio using:
+      - book_context.book_idx
+      - chapter_context.number
+      - chapter_context.path (text filepath)
+    """
+
+    # ------------------------------------------------------------
+    # IDENTIFIERS
+    # ------------------------------------------------------------
+    book_idx = book_context.book_idx
+    chapter_number = chapter_context.number
+    text_file_path = chapter_context.path
+
+    log(f"[AUDIO] CH{chapter_number}: START task → {text_file_path}")
+
+    # ------------------------------------------------------------
+    # Update state: audio stage active
+    # ------------------------------------------------------------
+    try:
+        set_status(book_idx, "audio")
+        set_last_update(book_idx)
+    except Exception:
+        pass
+
+    # ------------------------------------------------------------
+    # Abort BEFORE doing anything
+    # ------------------------------------------------------------
+    if abort_requested(book_idx, backend_client):
+        log(f"[AUDIO] ABORT detected → skip chapter {chapter_number}")
+        try:
+            chapter_context.add_skip("audio", "abort")
+            save_skip_reason(book_idx, chapter_number, "audio_abort")
+            set_last_update(book_idx)
+        except Exception:
+            pass
         return
 
-    # ============================================================
+    # ------------------------------------------------------------
     # ACQUIRE AUDIO SLOT
-    # ============================================================
+    # ------------------------------------------------------------
 
     slot_key = None
     ttl = AUDIO_TIMEOUT + 15
@@ -68,11 +114,13 @@ def generate_audio(
             log(f"[AUDIO] CH{chapter_number}: Acquired slot {i}/{AUDIO_SLOTS}")
             break
 
+    # If no slot free → wait
     if slot_key is None:
         log(f"[AUDIO] CH{chapter_number}: All slots busy → waiting...")
         start_wait = time.time()
 
         while slot_key is None:
+            # retry each slot
             for i in range(1, AUDIO_SLOTS + 1):
                 key = f"audio_slot:{i}"
                 if redis_client.set(key, "1", nx=True, ex=ttl):
@@ -80,101 +128,135 @@ def generate_audio(
                     slot_key = key
                     log(f"[AUDIO] CH{chapter_number}: Slot acquired after wait")
                     break
 
-            if slot_key:
-                break
-
-            if abort_requested(book_id, backend_client):
+            # abort while waiting
+            if abort_requested(book_idx, backend_client):
                 log(f"[AUDIO] ABORT while waiting → skip CH{chapter_number}")
+                try:
+                    chapter_context.add_skip("audio", "abort_wait")
+                    save_skip_reason(book_idx, chapter_number, "audio_abort_wait")
+                    set_last_update(book_idx)
+                except Exception:
+                    pass
                 return
 
+            # timeout
             if time.time() - start_wait > ttl:
-                log(f"[AUDIO] CH{chapter_number}: Slot wait timeout → aborting audio")
+                log(f"[AUDIO] CH{chapter_number}: WAIT TIMEOUT → aborting")
+                try:
+                    chapter_context.add_skip("audio", "timeout_wait")
+                    save_skip_reason(book_idx, chapter_number, "audio_timeout_wait")
+                    set_last_update(book_idx)
+                except Exception:
+                    pass
                 return
 
             time.sleep(0.25)
 
-    # ============================================================
-    # PATH NORMALISATION
-    # ============================================================
-
-    container_path = chapter_text
-
-    # Fix 1 — container_path kan None zijn → abort zonder crash
-    if not container_path:
-        log(f"[AUDIO] CH{chapter_number}: FATAL — no input path provided")
-        redis_client.delete(slot_key)
+    # ------------------------------------------------------------
+    # VALIDATE INPUT PATH
+    # ------------------------------------------------------------
+    if not text_file_path:
+        log(f"[AUDIO] CH{chapter_number}: No input path")
+        try:
+            chapter_context.add_skip("audio", "missing_input")
+            save_skip_reason(book_idx, chapter_number, "audio_missing_input")
+            set_last_update(book_idx)
+        except Exception:
+            pass
+        if slot_key:
+            redis_client.delete(slot_key)
         return
 
-    # Fix 2 — veilige startswith
-    if CONTAINER_PREFIX and container_path.startswith(CONTAINER_PREFIX):
-        relative_path = container_path[len(CONTAINER_PREFIX) :].lstrip("/")
+    # Convert container path → host FS path
+    if text_file_path.startswith(CONTAINER_PREFIX):
+        relative = text_file_path[len(CONTAINER_PREFIX) :].lstrip("/")
     else:
-        relative_path = container_path
-
-    parts = relative_path.split("/")
-    if len(parts) < 3:
-        log(
-            f"[AUDIO] CH{chapter_number}: FATAL — cannot parse book/volume from {relative_path}"
-        )
-        redis_client.delete(slot_key)
-        return
-
-    book_from_path = parts[0]
-    volume_from_path = parts[1]
-
-    host_path = os.path.join(HOST_PATH, relative_path)
-
-    # ============================================================
-    # OUTPUT PREP
-    # ============================================================
-
-    base_dir = os.path.join(HOST_PATH, book_from_path, volume_from_path, "Audio")
-    os.makedirs(base_dir, exist_ok=True)
+        relative = text_file_path
+
+    host_input_path = os.path.join(HOST_PATH, relative)
+
+    # ------------------------------------------------------------
+    # Determine output directory
+    # ------------------------------------------------------------
+    volume_name = os.path.basename(os.path.dirname(text_file_path))
+    audio_output_dir = os.path.join(
+        HOST_PATH,
+        os.path.basename(book_context.book_base),
+        volume_name,
+        "Audio",
+    )
+    os.makedirs(audio_output_dir, exist_ok=True)
 
-    safe_num = f"{chapter_number:04d}"
-    audio_file = os.path.join(base_dir, f"{safe_num}.m4a")
+    audio_file = os.path.join(audio_output_dir, f"{chapter_number:04d}.m4a")
 
+    # ------------------------------------------------------------
+    # Skip if output already exists
+    # ------------------------------------------------------------
     if os.path.exists(audio_file):
         log(f"[AUDIO] Skip CH{chapter_number} → already exists")
-        redis_client.delete(slot_key)
+        try:
+            chapter_context.add_skip("audio", "already_exists")
+            save_skip_reason(book_idx, chapter_number, "audio_exists")
+            set_last_update(book_idx)
+        except Exception:
+            pass
+        if slot_key:
+            redis_client.delete(slot_key)
         return
 
-    # ============================================================
-    # BUILD CMD
-    # ============================================================
-
+    # ------------------------------------------------------------
+    # BUILD COMMAND
+    # ------------------------------------------------------------
     cmd = (
         f"say --voice={AUDIO_VOICE} "
-        f"--input-file='{host_path}' "
+        f"--input-file='{host_input_path}' "
         f"--output-file='{audio_file}' "
-        f"--file-format=m4bf "
-        f"--quality=127 "
-        f"-r {AUDIO_RATE} "
-        f"--data-format=aac"
+        f"--file-format=m4bf --quality=127 "
+        f"-r {AUDIO_RATE} --data-format=aac"
     )
 
     log(f"[AUDIO] CH{chapter_number}: CMD = {cmd}")
 
-    # ============================================================
+    # ------------------------------------------------------------
     # RUN TTS
-    # ============================================================
+    # ------------------------------------------------------------
     try:
         subprocess.run(cmd, shell=True, check=True, timeout=AUDIO_TIMEOUT)
         log(f"[AUDIO] CH{chapter_number}: Completed")
+        inc_audio_done(book_idx)
+        set_last_update(book_idx)
+
+        chapter_context.audio_done = True
+
     except subprocess.TimeoutExpired:
-        log(f"[AUDIO] CH{chapter_number}: TIMEOUT → remove incomplete file")
+        log(f"[AUDIO] CH{chapter_number}: TIMEOUT → removing incomplete file")
+        try:
+            chapter_context.add_skip("audio", "timeout")
+            save_skip_reason(book_idx, chapter_number, "audio_timeout")
+            set_last_update(book_idx)
+        except Exception:
+            pass
         if os.path.exists(audio_file):
-            try:
-                os.remove(audio_file)
-            except Exception:
-                pass
+            os.remove(audio_file)
 
     except subprocess.CalledProcessError as e:
-        log(f"[AUDIO] CH{chapter_number}: ERROR during say → {e}")
+        log(f"[AUDIO] CH{chapter_number}: ERROR in say → {e}")
+        try:
+            chapter_context.add_skip("audio", f"cmd_error:{e}")
+            save_skip_reason(book_idx, chapter_number, "audio_cmd_error")
+            set_last_update(book_idx)
+        except Exception:
+            pass
 
     except Exception as e:
         log(f"[AUDIO] CH{chapter_number}: UNEXPECTED ERROR → {e}")
+        try:
+            chapter_context.add_skip("audio", f"unexpected:{e}")
+            save_skip_reason(book_idx, chapter_number, "audio_unexpected_error")
+            set_last_update(book_idx)
+        except Exception:
+            pass
 
     finally:
         if slot_key:
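The slot handling above is a plain Redis semaphore: SET with NX and EX either claims a slot atomically or fails, and the TTL guarantees a crashed worker cannot hold a slot forever. A stripped-down sketch of the same idea, using the task's audio_slot:<i> key scheme (a local Redis is assumed; the task derives its connection from REDIS_BACKEND instead):

    from redis import Redis

    r = Redis()

    def acquire_slot(slots, ttl):
        for i in range(1, slots + 1):
            key = f"audio_slot:{i}"
            if r.set(key, "1", nx=True, ex=ttl):  # atomic claim
                return key
        return None

    slot = acquire_slot(slots=1, ttl=315)  # AUDIO_TIMEOUT + 15 in the task
    try:
        if slot:
            pass  # run the "say" command here
    finally:
        if slot:
            r.delete(slot)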
diff --git a/bookscraper/scraper/tasks/controller_tasks.py b/bookscraper/scraper/tasks/controller_tasks.py
index 0f06405..b7336c8 100644
--- a/bookscraper/scraper/tasks/controller_tasks.py
+++ b/bookscraper/scraper/tasks/controller_tasks.py
@@ -1,81 +1,98 @@
 # ============================================================
 # File: scraper/tasks/controller_tasks.py
 # Purpose:
-#   Start the download → parse → save pipeline for a scraped book,
-#   including progress/abort tracking via book_id.
-#   ONLY THE CONTROLLER UPDATES PROGRESS (initial total).
+#   Launch the full download/parse/save pipeline.
+#
+#   JSON-safe Celery architecture:
+#     • controller receives primitive scrape_result
+#     • controller builds DownloadController locally
+#     • controller assigns volumes & prepares folders
+#     • Celery pipelines receive ONLY (book_idx, chapter_num)
 # ============================================================
 
 from celery_app import celery_app
 from logbus.publisher import log
 from scraper.download_controller import DownloadController
 
-from scraper.progress import (
-    set_total,
-)
+from scraper.progress import set_total, set_status
 from scraper.abort import abort_requested
 
-print(">>> [IMPORT] controller_tasks.py loaded")
+print(">>> [IMPORT] controller_tasks.py loaded (ID-only mode)")
 
 
 @celery_app.task(bind=True, queue="controller", ignore_result=False)
 def launch_downloads(self, book_id: str, scrape_result: dict):
-    """
-    Launch the entire pipeline (download → parse → save),
-    AND initialize progress counters.
-
-    Chapter-level progress is updated INSIDE the download/parse/save tasks.
-    This task MUST NOT call .get() on async subtasks (Celery restriction).
-    """
-
-    title = scrape_result.get("title", "UnknownBook")
-    chapters = scrape_result.get("chapters", []) or []
-    total = len(chapters)
-
-    log(f"[CTRL] Book '{title}' → {total} chapters (book_id={book_id})")
-
-    # ------------------------------------------------------------
-    # INIT PROGRESS
-    # ------------------------------------------------------------
-    set_total(book_id, total)
-    log(f"[CTRL] Progress initialized for {book_id}: total={total}")
-
-    # ------------------------------------------------------------
-    # BUILD CONTROLLER
-    # ------------------------------------------------------------
-    ctl = DownloadController(book_id, scrape_result)
-
-    # ------------------------------------------------------------
-    # START PIPELINES (ASYNC)
-    # Returns a celery group AsyncResult. We DO NOT iterate or get().
-    # Progress & failures are handled by the worker subtasks.
-    # ------------------------------------------------------------
+
+    title = scrape_result.get("title", "Unknown")
+    chapter_count = len(scrape_result.get("chapters", []) or [])
+    book_idx = scrape_result.get("book_idx")
+
+    log(f"[CTRL] Book '{title}' → {chapter_count} chapters (book_idx={book_idx})")
+
+    # -----------------------------------------------------------
+    # Initialize progress counters
+    # -----------------------------------------------------------
+    try:
+        set_total(book_idx, chapter_count)
+        set_status(book_idx, "downloading")
+    except Exception as exc:
+        log(f"[CTRL] ERROR setting up progress counters: {exc}")
+        raise
+
+    # -----------------------------------------------------------
+    # Build controller
+    # -----------------------------------------------------------
+    try:
+        ctl = DownloadController(scrape_result)
+    except Exception as exc:
+        log(f"[CTRL] ERROR constructing DownloadController: {exc}")
+        raise
+
+    # -----------------------------------------------------------
+    # Prepare folders + cover (NEW: init_book_output)
+    # -----------------------------------------------------------
     try:
-        group_result = ctl.start()
+        ctl.init_book_output()
+    except Exception as exc:
+        log(f"[CTRL] ERROR initializing context: {exc}")
+        raise
+
+    log(f"[CTRL_STATE] init_book_output() completed for {title}")
 
-        log(
-            f"[CTRL] Pipelines dispatched for '{title}' "
-            f"(book_id={book_id}, group_id={group_result.id})"
-        )
+    # -----------------------------------------------------------
+    # Abort check BEFORE pipelines
+    # -----------------------------------------------------------
+    if abort_requested(book_idx):
+        log("[CTRL] ABORT requested BEFORE pipeline dispatch → stopping.")
+        set_status(book_idx, "aborted")
+        return {"status": "aborted"}
 
-        # Abort flag set BEFORE tasks start?
-        if abort_requested(book_id):
-            log(f"[CTRL] ABORT requested before tasks start")
-            return {"book_id": book_id, "aborted": True}
+    # -----------------------------------------------------------
+    # Assign volumes
+    # -----------------------------------------------------------
+    try:
+        ctl.assign_volumes()
+    except Exception as exc:
+        log(f"[CTRL] ERROR during volume assignment: {exc}")
+        raise
 
+    # -----------------------------------------------------------
+    # Build pipelines (primitive arguments)
+    # -----------------------------------------------------------
+    try:
+        tasks = ctl.build_pipelines()
+    except Exception as exc:
+        log(f"[CTRL] ERROR building pipelines: {exc}")
+        raise
+
+    # -----------------------------------------------------------
+    # Dispatch all pipelines
+    # -----------------------------------------------------------
+    try:
+        result_group = ctl.dispatch_pipelines(tasks)
     except Exception as exc:
-        log(f"[CTRL] ERROR while dispatching pipelines: {exc}")
+        log(f"[CTRL] ERROR dispatching pipelines: {exc}")
         raise
 
-    # ------------------------------------------------------------
-    # CONTROLLER DOES NOT WAIT FOR SUBTASK RESULTS
-    # (Download/parse/save tasks update progress themselves)
-    # ------------------------------------------------------------
-    log(f"[CTRL] Controller finished dispatch for book_id={book_id}")
-
-    return {
-        "book_id": book_id,
-        "total": total,
-        "started": True,
-        "group_id": group_result.id,
-    }
+    log(f"[CTRL] Pipelines dispatched for {title} (book_idx={book_idx})")
+    return {"book_idx": book_idx, "pipelines": len(tasks)}
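For reference, dispatching this task needs nothing but JSON primitives. A hedged sketch: the task name assumes Celery's default module-path naming, and the scrape_result shape mirrors BookScraper.scrape()'s return value:

    from celery_app import celery_app

    scrape_result = {
        "title": "Example Book",
        "book_idx": "15618",
        "book_url": "https://www.piaotia.com/bookinfo/15/15618.html",
        "chapters": [
            {"num": 1, "title": "Chapter 1", "url": "https://example.com/1.html"},
        ],
    }

    celery_app.send_task(
        "scraper.tasks.controller_tasks.launch_downloads",
        args=[scrape_result["book_idx"], scrape_result],  # book_id arg kept for compat
        queue="controller",
    )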
diff --git a/bookscraper/scraper/tasks/download_tasks.py b/bookscraper/scraper/tasks/download_tasks.py
index 5110483..1e01630 100644
--- a/bookscraper/scraper/tasks/download_tasks.py
+++ b/bookscraper/scraper/tasks/download_tasks.py
@@ -1,22 +1,41 @@
 # ============================================================
 # File: scraper/tasks/download_tasks.py
-# Purpose: Download chapter HTML with global concurrency,
-#          retry/backoff logic, 429 support, and abort-awareness.
 #
-# Logging:
-#   - timestamp + book_id in de message
-#   - message wordt via publisher.py naar console gestuurd
-#   - message wordt via ui_log.push_ui naar Redis GUI logbuffer gestuurd
+# FINAL ARCHITECTURE — CELERY-SAFE VERSION
+#
+# This task downloads EXACTLY ONE CHAPTER using ONLY primitive
+# Celery-safe arguments:
+#
+#     download_chapter(book_idx, chapter_num, chapter_url, volume_path)
+#
+# BookContext / ChapterContext are NOT passed into Celery and
+# NOT loaded inside workers. Workers ONLY manipulate:
+#
+#     • Redis abort flags
+#     • Redis chapter-start markers
+#     • Redis progress counters
+#     • save_path derived from volume_path + chapter_num
+#
+# The chapter HTML is written to disk and a small Redis skip-
+# reason is stored when needed. All other state lives outside
+# Celery.
+#
+# This is the architecture we agreed on to avoid JSON
+# serialization errors and to remove the need for
+# load_book_context/save_book_context.
 #
-# publisher.py en ui_log.py blijven DOM.
 # ============================================================
 
 from celery_app import celery_app
 from scraper.utils import get_save_path
 from scraper.abort import abort_requested, chapter_started, mark_chapter_started
-from logbus.publisher import log  # console logging (DOM)
-from scraper.ui_log import push_ui  # GUI logging (DOM)
+from scraper.progress import (
+    inc_chapter_done,
+    save_skip_reason,
+)
+
+from logbus.publisher import log
 
 import requests
 import redis
@@ -25,50 +44,38 @@ import time
 from datetime import datetime
 
 
-print(">>> [IMPORT] download_tasks.py loaded")
+print(">>> [IMPORT] download_tasks.py loaded (final Celery-safe mode)")
 
 
 # -----------------------------------------------------------
-# TIMESTAMPED LOG WRAPPER
+# TIMESTAMPED LOGGER (book_idx ONLY)
 # -----------------------------------------------------------
-def log_msg(book_id: str, message: str):
+def log_msg(book_idx: str, message: str):
     ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-    full = f"{ts} [{book_id}] {message}"
-    log(full)
-    push_ui(full)
+    log(f"{ts} [{book_idx}] {message}")
 
 
 # -----------------------------------------------------------
-# Retry parameters (ENV)
+# ENV SETTINGS
 # -----------------------------------------------------------
 MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7"))
 BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2"))
 BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2"))
 DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10"))
 
-# -----------------------------------------------------------
-# Global concurrency
-# -----------------------------------------------------------
 MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1"))
-
-# -----------------------------------------------------------
-# Global delay sync
-# -----------------------------------------------------------
 GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1"))
-DELAY_KEY = "download:delay_lock"
 
-# -----------------------------------------------------------
-# Redis
-# -----------------------------------------------------------
 REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
 redis_client = redis.Redis.from_url(REDIS_URL)
 
 SEM_KEY = "download:active"
+DELAY_KEY = "download:delay_lock"
 
 
-# ============================================================
-# GLOBAL DELAY FUNCTIONS
-# ============================================================
+# -----------------------------------------------------------
+# Delay + concurrency helpers
+# -----------------------------------------------------------
 def wait_for_global_delay():
     if GLOBAL_DELAY <= 0:
         return
@@ -82,13 +89,10 @@ def set_global_delay():
     redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)
 
 
-# ============================================================
-# GLOBAL CONCURRENCY FUNCTIONS
-# ============================================================
-def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
+def acquire_global_slot(max_slots: int, retry_delay=0.5):
     while True:
-        current = redis_client.incr(SEM_KEY)
-        if current <= max_slots:
+        cur = redis_client.incr(SEM_KEY)
+        if cur <= max_slots:
             return
         redis_client.decr(SEM_KEY)
         time.sleep(retry_delay)
@@ -98,81 +102,71 @@ def release_global_slot():
     redis_client.decr(SEM_KEY)
 
 
-print(f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}")
-print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s")
-print(
-    f">>> [CONFIG] Retries: MAX={MAX_RETRIES}, base={BASE_DELAY}, "
-    f"backoff={BACKOFF}, 429={DELAY_429}"
-)
-
-
 # ============================================================
-# CELERY TASK: DOWNLOAD CHAPTER
+# CELERY TASK — PRIMITIVE ARG MODE
+#
+# book_idx:     str  Unique book identifier used for state tracking
+# chapter_num:  int  Chapter number (1-based)
+# chapter_url:  str  URL to download this chapter
+# volume_path:  str  Filesystem directory where this chapter belongs
 # ============================================================
 @celery_app.task(bind=True, queue="download", ignore_result=False)
 def download_chapter(
-    self, book_id: str, chapter_num: int, chapter_url: str, base_path: str
+    self, book_idx: str, chapter_num: int, chapter_url: str, volume_path: str
 ):
     """
-    Download chapter HTML.
-    Abort logic:
-    - If abort active AND chapter not started → SKIP
-    - If abort active BUT chapter already started → Proceed normally
+    Download a single chapter using ONLY Celery-safe primitives.
+    No BookContext or ChapterContext is ever loaded.
+
+    Writes:
+        <volume_path>/<chapter_num>.html
     """
 
     # -----------------------------------------------------------
-    # ABORT BEFORE START
+    # Abort BEFORE start
     # -----------------------------------------------------------
-    if abort_requested(book_id) and not chapter_started(book_id, chapter_num):
-        msg = f"[ABORT] Skip chapter {chapter_num} (abort active, not started)"
-        log_msg(book_id, msg)
-        return {
-            "book_id": book_id,
-            "chapter": chapter_num,
-            "url": chapter_url,
-            "html": None,
-            "skipped": True,
-            "path": None,
-            "abort": True,
-        }
-
-    # Mark started
-    mark_chapter_started(book_id, chapter_num)
+    if abort_requested(book_idx) and not chapter_started(book_idx, chapter_num):
+        log_msg(book_idx, f"[ABORT] Skip chapter {chapter_num}")
+
+        save_skip_reason(book_idx, chapter_num, "abort_before_start")
+        inc_chapter_done(book_idx)
+        return None
+
+    # Mark chapter as started
+    mark_chapter_started(book_idx, chapter_num)
 
     # -----------------------------------------------------------
-    # NEW POSITION FOR SKIP BLOCK (before any delay logic)
+    # Path resolution
     # -----------------------------------------------------------
-    save_path = get_save_path(chapter_num, base_path)
+    save_path = get_save_path(chapter_num, volume_path)
+    os.makedirs(volume_path, exist_ok=True)
 
+    # -----------------------------------------------------------
+    # Skip if file already exists
+    # -----------------------------------------------------------
     if os.path.exists(save_path):
-        log_msg(book_id, f"[DL] SKIP {chapter_num} (exists) → {save_path}")
-        return {
-            "book_id": book_id,
-            "chapter": chapter_num,
-            "url": chapter_url,
-            "html": None,
-            "skipped": True,
-            "path": save_path,
-        }
+        log_msg(book_idx, f"[DL] SKIP {chapter_num} (exists)")
+
+        save_skip_reason(book_idx, chapter_num, "already_exists")
+        inc_chapter_done(book_idx)
+        return None
 
     # -----------------------------------------------------------
-    # Hard delay (only for real downloads)
+    # Delay + concurrency enforcement
     # -----------------------------------------------------------
     if GLOBAL_DELAY > 0:
         time.sleep(GLOBAL_DELAY)
 
-    # Sync delay
     wait_for_global_delay()
-
-    # Acquire concurrency slot
     acquire_global_slot(MAX_CONCURRENCY)
-    log_msg(book_id, f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
+    log_msg(book_idx, f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
 
+    # -----------------------------------------------------------
+    # HTTP Download
+    # -----------------------------------------------------------
     try:
-        # -----------------------------------------------------------
-        # HTTP DOWNLOAD
-        # -----------------------------------------------------------
-        log_msg(book_id, f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
+        log_msg(book_idx, f"[DL] GET chapter {chapter_num}: {chapter_url}")
 
         resp = requests.get(
             chapter_url,
@@ -184,42 +178,31 @@ def download_chapter(
         resp.encoding = resp.apparent_encoding or "gb2312"
         html = resp.text
 
-        log_msg(book_id, f"[DL] OK {chapter_num}: {len(html)} bytes")
+        # Write file
+        with open(save_path, "w", encoding="utf-8") as f:
+            f.write(html)
 
-        return {
-            "book_id": book_id,
-            "chapter": chapter_num,
-            "url": chapter_url,
-            "html": html,
-            "skipped": False,
-            "path": save_path,
-        }
+        log_msg(book_idx, f"[DL] OK {chapter_num}: {len(html)} bytes")
+        inc_chapter_done(book_idx)
+        return None
 
     except Exception as exc:
-        attempt = self.request.retries
-        delay = BASE_DELAY * (BACKOFF**attempt)
-
-        # 429 hard block
+        # 429: Too Many Requests
         if getattr(getattr(exc, "response", None), "status_code", None) == 429:
-            log_msg(
-                book_id,
-                f"[DL] 429 {chapter_num} → WAIT {DELAY_429}s "
-                f"(attempt {attempt}/{MAX_RETRIES})",
-            )
-
+            log_msg(book_idx, f"[DL] 429 → wait {DELAY_429}s")
             time.sleep(DELAY_429)
             set_global_delay()
             raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES)
 
-        # Normal error
+        # Standard error
+        delay = BASE_DELAY * (BACKOFF ** min(self.request.retries, 5))
         log_msg(
-            book_id,
+            book_idx,
             f"[DL] ERROR {chapter_num}: {exc} → retry in {delay}s "
-            f"(attempt {attempt}/{MAX_RETRIES})",
+            f"(attempt {self.request.retries + 1}/{MAX_RETRIES})",
        )
+
         raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
 
     finally:
-        set_global_delay()
         release_global_slot()
-        log_msg(book_id, f"[DL] RELEASED SLOT for chapter {chapter_num}")
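With the defaults above (BASE_DELAY=2, BACKOFF=2, MAX_RETRIES=7), the retry schedule grows exponentially and is capped by the min(retries, 5) exponent. Computed from the same formula as the except branch:

    BASE_DELAY, BACKOFF = 2, 2

    for retries in range(7):  # MAX_RETRIES default
        print(retries, BASE_DELAY * BACKOFF ** min(retries, 5), "s")
    # 0: 2s, 1: 4s, 2: 8s, 3: 16s, 4: 32s, 5: 64s, 6: 64s (capped)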
+# ============================================================ from celery_app import celery_app -from bs4 import BeautifulSoup -from scraper.utils import clean_text, load_all_replacements -from scraper.tasks.download_tasks import log_msg # unified logger +from scraper.utils import clean_text +from scraper.abort import abort_requested, chapter_started, mark_chapter_started +from scraper.progress import ( + inc_chapter_done, + save_skip_reason, +) -print(">>> [IMPORT] parse_tasks.py loaded (enhanced parser)") +from logbus.publisher import log +import os +from datetime import datetime + +print(">>> [IMPORT] parse_tasks.py loaded (final Celery-safe mode)") + + +# ----------------------------------------------------------- +# TIMESTAMPED LOGGER (book_idx ONLY) +# ----------------------------------------------------------- +def log_msg(book_idx: str, message: str): + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + log(f"{ts} [{book_idx}] {message}") + + +# ============================================================ +# CELERY TASK — PRIMITIVE ARGUMENT MODE +# +# book_idx: str Unique Redis state key +# chapter_num: int Chapter number (1-based) +# html_path: str Path to downloaded HTML file +# text_path: str Path where parsed/cleaned text must be written +# ============================================================ @celery_app.task(bind=True, queue="parse", ignore_result=False) -def parse_chapter(self, download_result: dict, meta: dict): - - book_id = download_result.get("book_id", "NOBOOK") - - # ------------------------------------------------------------ - # SKIPPED DOWNLOAD → SKIP PARSE - # ------------------------------------------------------------ - if download_result.get("skipped"): - chapter = download_result.get("chapter") - log_msg(book_id, f"[PARSE] SKIP chapter {chapter} (download skipped)") - download_result["book_id"] = book_id - return download_result - - # ------------------------------------------------------------ - # NORMAL PARSE - # ------------------------------------------------------------ - chapter_num = download_result["chapter"] - chapter_url = download_result["url"] - html = download_result["html"] - - log_msg(book_id, f"[PARSE] Parsing chapter {chapter_num}") - - soup = BeautifulSoup(html, "lxml") - - # ------------------------------------------------------------ - # STRICT SELECTORS (direct content blocks) - # ------------------------------------------------------------ - selectors = [ - "#content", - "div#content", - ".content", - "div.content", - "#chaptercontent", - "div#chaptercontent", - "#chapterContent", - ".read-content", - "div.read-content", - ] - - node = None - for sel in selectors: - tmp = soup.select_one(sel) - if tmp: - node = tmp - break - - # ------------------------------------------------------------ - # PIAOTIA FALLBACK: - # Extract content between
<h1>
and the "bottomlink" block. - # ------------------------------------------------------------ - raw = None - if node is None: - h1 = soup.find("h1") - if h1: - content_parts = [] - for sib in h1.next_siblings: - - # stop at bottom navigation/footer block - sib_class = getattr(sib, "get", lambda *_: None)("class") - if sib_class and ( - "bottomlink" in sib_class or sib_class == "bottomlink" - ): - break - - # ignore typical noise containers - if getattr(sib, "name", None) in ["script", "style", "center"]: - continue - - if hasattr(sib, "get_text"): - content_parts.append(sib.get_text(separator="\n")) - else: - content_parts.append(str(sib)) - - raw = "\n".join(content_parts) - - # ------------------------------------------------------------ - # FINAL FALLBACK - # ------------------------------------------------------------ - if raw is None: - if node: - raw = node.get_text(separator="\n") - else: - # drop scripts & styles - for tag in soup(["script", "style", "noscript"]): - tag.decompose() - - raw = soup.get_text(separator="\n") - - # ------------------------------------------------------------ - # MULTIPASS CLEANING via replacement files ONLY - # ------------------------------------------------------------ - REPL = load_all_replacements() - - text = raw - for _ in range(5): # like the C# CleanText loop - text = clean_text(text, REPL) - - # ------------------------------------------------------------ - # Collapse excessive empty lines - # ------------------------------------------------------------ - cleaned = [] - prev_blank = False - - for line in text.split("\n"): - stripped = line.rstrip() - if stripped == "": - if prev_blank: - continue - prev_blank = True - cleaned.append("") - else: - prev_blank = False - cleaned.append(stripped) - - text = "\n".join(cleaned) - - # ------------------------------------------------------------ - # Add header to chapter 1 - # ------------------------------------------------------------ - if chapter_num == 1: - book_url = meta.get("book_url") or meta.get("url") or "UNKNOWN" - header = ( - f"{meta.get('title','')}\n" - f"Author: {meta.get('author','')}\n" - f"Description:\n{meta.get('description','')}\n" - f"Book URL: {book_url}\n" + "-" * 50 + "\n\n" - ) - text = header + text - - log_msg(book_id, f"[PARSE] Parsed chapter {chapter_num}: {len(text)} chars") - - return { - "book_id": book_id, - "chapter": chapter_num, - "url": chapter_url, - "text": text, - "length": len(text), - } +def parse_chapter( + self, book_idx: str, chapter_num: int, html_path: str, text_path: str +): + """ + Parse a downloaded chapter using ONLY primitive arguments. + Converts HTML → cleaned text and writes it to disk. 
+    """
+
+    # -----------------------------------------------------------
+    # Abort BEFORE start
+    # -----------------------------------------------------------
+    if abort_requested(book_idx) and not chapter_started(book_idx, chapter_num):
+        log_msg(book_idx, f"[ABORT] Skip PARSE {chapter_num}")
+
+        save_skip_reason(book_idx, chapter_num, "abort_before_start")
+        inc_chapter_done(book_idx)
+        return None
+
+    # Mark as started
+    mark_chapter_started(book_idx, chapter_num)
+
+    # -----------------------------------------------------------
+    # Check if HTML file exists
+    # -----------------------------------------------------------
+    if not os.path.exists(html_path):
+        log_msg(book_idx, f"[PARSE] SKIP {chapter_num} (no HTML file)")
+
+        save_skip_reason(book_idx, chapter_num, "no_html_file")
+        inc_chapter_done(book_idx)
+        return None
+
+    # -----------------------------------------------------------
+    # Load HTML
+    # -----------------------------------------------------------
+    try:
+        with open(html_path, "r", encoding="utf-8") as f:
+            raw_html = f.read()
+    except Exception as exc:
+        log_msg(book_idx, f"[PARSE] ERROR reading HTML {chapter_num}: {exc}")
+
+        save_skip_reason(book_idx, chapter_num, f"read_error: {exc}")
+        inc_chapter_done(book_idx)
+        return None
+
+    if not raw_html.strip():
+        log_msg(book_idx, f"[PARSE] SKIP {chapter_num} (empty HTML)")
+
+        save_skip_reason(book_idx, chapter_num, "html_empty")
+        inc_chapter_done(book_idx)
+        return None
+
+    # -----------------------------------------------------------
+    # Clean HTML → Text
+    # -----------------------------------------------------------
+    try:
+        log_msg(book_idx, f"[PARSE] Start {chapter_num}")
+
+        cleaned = clean_text(raw_html)
+
+        os.makedirs(os.path.dirname(text_path), exist_ok=True)
+        with open(text_path, "w", encoding="utf-8") as f:
+            f.write(cleaned)
+
+        log_msg(book_idx, f"[PARSE] OK {chapter_num}: {len(cleaned)} chars")
+        inc_chapter_done(book_idx)
+        return None
+
+    except Exception as exc:
+        log_msg(book_idx, f"[PARSE] ERROR {chapter_num}: {exc}")
+
+        save_skip_reason(book_idx, chapter_num, f"parse_error: {exc}")
+        inc_chapter_done(book_idx)
+        return None
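Because each stage now hands its result to the next one through the filesystem rather than through Celery's result backend, the only contract left between the tasks is the path scheme. The diff never shows how `html_path`, `text_path` and `json_path` are derived, so the helper below is a hypothetical sketch of that convention (the directory layout is an assumption made for illustration, not the project's actual scheme):

```python
import os

# Hypothetical path helper: the "output/<book_idx>/{html,text,json}/"
# layout is an assumption; the real layout is decided by the download
# controller, which is not part of this diff.
OUTPUT_ROOT = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output")


def chapter_paths(book_idx: str, chapter_num: int) -> tuple[str, str, str]:
    """Return (html_path, text_path, json_path) for one chapter."""
    base = os.path.join(OUTPUT_ROOT, book_idx)
    return (
        os.path.join(base, "html", f"{chapter_num:05d}.html"),
        os.path.join(base, "text", f"{chapter_num:05d}.txt"),
        os.path.join(base, "json", f"{chapter_num:05d}.json"),
    )
```

Zero-padding the chapter number keeps the files lexically sorted, which matters once later stages glob a directory instead of consulting state.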
diff --git a/bookscraper/scraper/tasks/pipeline.py b/bookscraper/scraper/tasks/pipeline.py
index 9da657e..2dae558 100644
--- a/bookscraper/scraper/tasks/pipeline.py
+++ b/bookscraper/scraper/tasks/pipeline.py
@@ -1,17 +1,19 @@
-# =========================================================
+# ============================================================
 # File: scraper/tasks/pipeline.py
 # Purpose:
-#     Build Celery chains for chapter processing.
+#     Build a per-chapter Celery pipeline using ONLY JSON-safe,
+#     primitive arguments.
 #
-# Chain:
-#     download_chapter(book_id, chapter_num, url, base_path)
-#       → parse_chapter(download_result, meta)
-#       → save_chapter(parsed_result, base_path)
-#       → update_progress(final_result, book_id)
+# Pipeline:
+#     download_chapter(book_idx, chapter_num)
+#       → parse_chapter(book_idx, chapter_num, html_path, text_path)
+#       → save_chapter(book_idx, chapter_num, text_path, json_path)
+#       → update_progress(book_idx)
 #
-# All subtasks must pass through result dicts untouched so the
-# next stage receives the correct fields.
-# =========================================================
+# No BookContext/ChapterContext objects are passed through
+# Celery anymore. All state travels as primitive arguments or
+# lives in Redis and on disk.
+# ============================================================

from celery import chain

@@ -21,25 +23,41 @@
 from scraper.tasks.save_tasks import save_chapter
 from scraper.tasks.progress_tasks import update_progress


-def build_chapter_pipeline(
-    book_id: str,
-    chapter_number: int,
-    chapter_url: str,
-    base_path: str,
-    meta: dict,
-):
+print(">>> [IMPORT] pipeline.py loaded (ID-only mode)")
+
+
+# ============================================================
+# Build chapter pipeline
+# ============================================================
+def build_chapter_pipeline(
+    book_idx: str,
+    chapter_num: int,
+    html_path: str,
+    text_path: str,
+    json_path: str,
+):
     """
-    Build a Celery chain for one chapter.
+    Constructs the Celery chain for a single chapter.

-    download_chapter(book_id, chapter_number, chapter_url, base_path)
-      → parse_chapter(download_result, meta)
-      → save_chapter(parsed_result, base_path)
-      → update_progress(result, book_id)
+    Every task receives only JSON-safe, primitive arguments that
+    match the task signatures in download/parse/save_tasks.py.
     """
     return chain(
-        download_chapter.s(book_id, chapter_number, chapter_url, base_path),
-        parse_chapter.s(meta),
-        save_chapter.s(base_path),
-        update_progress.s(book_id),
+        download_chapter.s(book_idx, chapter_num),
+        # Immutable signatures (.si): every task in this pipeline
+        # returns None, and a mutable .s() signature would make the
+        # chain prepend that None to the next task's argument list.
+        parse_chapter.si(book_idx, chapter_num, html_path, text_path),
+        save_chapter.si(book_idx, chapter_num, text_path, json_path),
+        # update_progress accepts *args, so it works with .si too.
+        update_progress.si(book_idx),
     )
+
+
+# ============================================================
+# Build pipelines for all chapters
+# (usually called from download_controller)
+# ============================================================
+def build_all_pipelines(book_idx: str, chapters):
+    """
+    Utility: given an iterable of (chapter_num, html_path,
+    text_path, json_path) tuples, build a list of chains.
+    """
+    return [
+        build_chapter_pipeline(book_idx, num, html, txt, js)
+        for num, html, txt, js in chapters
+    ]
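The switch from mutable `.s()` signatures to immutable `.si()` ones is the crux of this pipeline and easy to get wrong, so here is a minimal, self-contained demonstration of the difference. The toy tasks and the broker URL are illustrative only; `chain`, `.s()` and `.si()` are the real Celery API:

```python
from celery import Celery, chain

# Example broker URL, not the project's actual configuration.
app = Celery("demo", broker="redis://localhost:6379/0")


@app.task
def download(book_idx, chapter_num):
    return None  # side effects only, like the real tasks


@app.task
def parse(book_idx, chapter_num):
    print(f"parse({book_idx!r}, {chapter_num!r})")


# BAD:  parse.s(...) is a partial signature. chain() prepends the
#       previous task's return value, so parse would be invoked as
#       parse(None, "15618", 1) and raise a TypeError.
# GOOD: parse.si(...) is immutable, so parse is invoked exactly
#       as written: parse("15618", 1).
flow = chain(download.s("15618", 1), parse.si("15618", 1))
# flow.apply_async() runs download first, then parse, in order.
```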
diff --git a/bookscraper/scraper/tasks/progress_tasks.py b/bookscraper/scraper/tasks/progress_tasks.py
index 9045fab..717a70c 100644
--- a/bookscraper/scraper/tasks/progress_tasks.py
+++ b/bookscraper/scraper/tasks/progress_tasks.py
@@ -1,43 +1,72 @@
 # ============================================================
 # File: scraper/tasks/progress_tasks.py
-# Purpose: Central progress updater for chapter pipelines.
+# Purpose:
+#     Check pipeline progress after each chapter finishes.
+#
+# Defensive calling convention: the task accepts *args so it
+# works whether or not the chain forwards a previous result;
+# the LAST positional argument is always book_idx.
+#
+# JSON-safe: no BookContext or ChapterContext objects are
+# ever passed into Celery tasks.
 # ============================================================

from celery_app import celery_app

-from scraper.progress import inc_completed, inc_skipped, inc_failed
+from scraper.progress import set_status, get_progress
+
 from logbus.publisher import log
+from datetime import datetime
+
+
+print(">>> [IMPORT] progress_tasks.py loaded (ID-only mode)")

-print(">>> [IMPORT] progress_tasks.py loaded")

+# -----------------------------------------------------------
+# TIMESTAMPED LOGGER (book_idx ONLY)
+# -----------------------------------------------------------
+def log_msg(book_idx: str, msg: str):
+    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    log(f"{ts} [{book_idx}] {msg}")

-@celery_app.task(bind=False, name="progress.update", queue="controller")
-def update_progress(result: dict, book_id: str):
+
+# ============================================================
+# CELERY TASK — accepts chain semantics
+# ============================================================
+@celery_app.task(bind=True, queue="progress", ignore_result=False)
+def update_progress(self, *args):
     """
-    Central progress logic:
-    - result: output of save_chapter
-    - book_id: explicitly passed by pipeline
+    Chain-safe progress check. The last positional argument is
+    always the book_idx.

-    IMPORTANT:
-    - save_chapter already updates counters for skipped & normal chapters
-    - progress.update MUST NOT double-increment
+    IMPORTANT: download/parse/save already call inc_chapter_done()
+    on every success/skip/error path, so this task MUST NOT
+    increment the counter again. It only reads the counters and
+    flips the status to "completed" once all chapters are done.
     """
-    ch = result.get("chapter")
-    skipped = result.get("skipped", False)
-    failed = result.get("failed", False)
+    if not args:
+        return None
+
+    # Last argument is ALWAYS the book_idx
+    book_idx = args[-1]

-    if failed:
-        inc_failed(book_id)
-        log(f"[PROG] FAILED chapter {ch}")
+    # Fetch progress counters
+    prog = get_progress(book_idx)
+    done = prog.get("done", 0)
+    total = prog.get("total", 0)

-    elif skipped:
-        # save_chapter already did:
-        # inc_skipped + inc_completed
-        log(f"[PROG] SKIPPED chapter {ch}")
+    log_msg(book_idx, f"[PROGRESS] Updated: {done}/{total}")

-    else:
-        # Normal completion: save_chapter only does inc_completed
-        inc_completed(book_id)
-        log(f"[PROG] DONE chapter {ch}")
+    # If finished → update status
+    if total > 0 and done >= total:
+        set_status(book_idx, "completed")
+        log_msg(book_idx, "[PROGRESS] All chapters completed")

-    return result
+    return None
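`get_progress()` and `set_status()` come from `scraper/progress.py`, which is not part of this diff. A minimal sketch of what they plausibly do, assuming the `progress:<book_idx>:total` / `:done` key layout that scraping.py initialises later in this diff; the status key name is an assumption, and the real implementations may differ in detail:

```python
import redis

# Same default URL as the Celery broker used elsewhere in the project.
r = redis.Redis.from_url("redis://redis:6379/0", decode_responses=True)


def get_progress(book_idx: str) -> dict:
    """Read the per-book chapter counters initialised by scraping.py."""
    return {
        "done": int(r.get(f"progress:{book_idx}:done") or 0),
        "total": int(r.get(f"progress:{book_idx}:total") or 0),
    }


def set_status(book_idx: str, status: str) -> None:
    r.set(f"status:{book_idx}", status)  # key name assumed for illustration
```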
diff --git a/bookscraper/scraper/tasks/save_tasks.py b/bookscraper/scraper/tasks/save_tasks.py
index 8aa0578..e615e9a 100644
--- a/bookscraper/scraper/tasks/save_tasks.py
+++ b/bookscraper/scraper/tasks/save_tasks.py
@@ -1,123 +1,128 @@
 # ============================================================
 # File: scraper/tasks/save_tasks.py
-# Purpose: Save parsed chapter text to disk + trigger audio.
+#
+# FINAL ARCHITECTURE — CELERY-SAFE VERSION
+#
+# save_chapter(book_idx, chapter_num, text_path, json_path)
+#
+# This task no longer stores or loads a BookContext. All data
+# arrives as PRIMITIVE arguments, so Celery never has to
+# serialize or pickle anything.
+#
+# Functionality:
+#     • Abort-aware
+#     • Skips when the parsed text is missing
+#     • Writes the JSON chapter object to disk
+#     • Keeps the Redis progress state up to date
+#
+# This version is fully consistent with the
+# download → parse → save pipeline.
 # ============================================================

-print(">>> [IMPORT] save_tasks.py loaded")
+from celery_app import celery_app

-from celery import shared_task
+from scraper.abort import abort_requested, chapter_started, mark_chapter_started
+from scraper.progress import inc_chapter_done, save_skip_reason
+
+from logbus.publisher import log
+
+import json
 import os
+from datetime import datetime
+

-from scraper.utils import get_save_path
-from scraper.tasks.download_tasks import log_msg  # unified logger
-from scraper.progress import (
-    inc_completed,
-    inc_skipped,
-    inc_failed,
-    add_failed_chapter,
-)
+print(">>> [IMPORT] save_tasks.py loaded (final Celery-safe mode)")

-from scraper.tasks.audio_tasks import generate_audio

+# -----------------------------------------------------------
+# TIMESTAMP LOGGER
+# -----------------------------------------------------------
+def log_msg(book_idx: str, message: str):
+    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    log(f"{ts} [{book_idx}] {message}")

-@shared_task(bind=True, queue="save", ignore_result=False)
-def save_chapter(self, parsed: dict, base_path: str):
+
+# ============================================================
+# CELERY TASK — primitive arguments only
+#
+# book_idx:     str
+# chapter_num:  int
+# text_path:    str   path to parsed .txt file
+# json_path:    str   output path for chapter JSON model
+# ============================================================
+@celery_app.task(bind=True, queue="save", ignore_result=False)
+def save_chapter(self, book_idx: str, chapter_num: int, text_path: str, json_path: str):
     """
-    Save parsed chapter text to disk.
-
-    parsed = {
-        "book_id": str,
-        "chapter": int,
-        "text": str,
-        "url": str,
-        "skipped": bool,
-        "path": optional str
-    }
+    Save parsed chapter text + metadata to JSON on disk.
+    No BookContext is loaded or saved.
""" - book_id = parsed.get("book_id", "NOBOOK") - chapter = parsed.get("chapter") - - # ------------------------------------------------------------ - # SKIP CASE (download or parse skipped the chapter) - # ------------------------------------------------------------ - if parsed.get("skipped"): - path = parsed.get("path", "(no-path)") - log_msg(book_id, f"[SAVE] SKIP chapter {chapter} → {path}") - - inc_skipped(book_id) - - # Determine volume name from the base path - volume_name = os.path.basename(base_path.rstrip("/")) - - # Queue audio using the existing saved file - try: - generate_audio.delay( - book_id, - volume_name, - chapter, - f"Chapter {chapter}", - path, # <<-- correct: this is always the real file path - ) - log_msg( - book_id, - f"[AUDIO] Task queued (SKIPPED) for chapter {chapter} in {volume_name}", - ) - except Exception as audio_exc: - log_msg( - book_id, - f"[AUDIO] ERROR queueing (SKIPPED) chapter {chapter}: {audio_exc}", - ) - - return { - "book_id": book_id, # <<< FIXED - "chapter": chapter, - "path": path, - "skipped": True, - } - - # ------------------------------------------------------------ - # NORMAL SAVE CASE - # ------------------------------------------------------------ - try: - text = parsed.get("text", "") + # ----------------------------------------------------------- + # Abort BEFORE the task starts + # ----------------------------------------------------------- + if abort_requested(book_idx) and not chapter_started(book_idx, chapter_num): + log_msg(book_idx, f"[ABORT] Skip SAVE {chapter_num}") + + save_skip_reason(book_idx, chapter_num, "abort_before_start") + inc_chapter_done(book_idx) + return None - if chapter is None: - raise ValueError("Missing chapter number in parsed payload") + # Mark chapter as started + mark_chapter_started(book_idx, chapter_num) - # Ensure chapter folder exists - os.makedirs(base_path, exist_ok=True) + # ----------------------------------------------------------- + # Ensure parsed text exists + # ----------------------------------------------------------- + if not os.path.exists(text_path): + log_msg(book_idx, f"[SAVE] SKIP {chapter_num} (missing text file)") - # Build chapter file path - path = get_save_path(chapter, base_path) + save_skip_reason(book_idx, chapter_num, "no_text_file") + inc_chapter_done(book_idx) + return None - # Save chapter text to disk - with open(path, "w", encoding="utf-8") as f: - f.write(text) + # ----------------------------------------------------------- + # Read parsed text + # ----------------------------------------------------------- + try: + with open(text_path, "r", encoding="utf-8") as f: + text = f.read() + except Exception as exc: + log_msg(book_idx, f"[SAVE] ERROR reading text {chapter_num}: {exc}") + + save_skip_reason(book_idx, chapter_num, f"text_read_error: {exc}") + inc_chapter_done(book_idx) + return None + + if not text.strip(): + log_msg(book_idx, f"[SAVE] SKIP {chapter_num} (text empty)") - log_msg(book_id, f"[SAVE] Saved chapter {chapter} → {path}") + save_skip_reason(book_idx, chapter_num, "text_empty") + inc_chapter_done(book_idx) + return None - inc_completed(book_id) + # ----------------------------------------------------------- + # Build JSON chapter representation + # ----------------------------------------------------------- + chapter_obj = { + "chapter_num": chapter_num, + "text": text, + } + + # ----------------------------------------------------------- + # Write JSON output + # ----------------------------------------------------------- + try: + 
        os.makedirs(os.path.dirname(json_path), exist_ok=True)

-        # Determine volume name
-        volume_name = os.path.basename(base_path.rstrip("/"))
+        with open(json_path, "w", encoding="utf-8") as f:
+            json.dump(chapter_obj, f, ensure_ascii=False, indent=2)

-        # Queue audio task (always use the saved file path)
-        try:
-            generate_audio.delay(
-                book_id,
-                volume_name,
-                chapter,
-                f"Chapter {chapter}",
-                path,
-            )
-            log_msg(
-                book_id, f"[AUDIO] Task queued for chapter {chapter} in {volume_name}"
-            )
-        except Exception as audio_exc:
-            log_msg(book_id, f"[AUDIO] ERROR queueing chapter {chapter}: {audio_exc}")
+        log_msg(book_idx, f"[SAVE] OK {chapter_num}: {json_path}")

-        return {"book_id": book_id, "chapter": chapter, "path": path}
+        inc_chapter_done(book_idx)
+        return None

     except Exception as exc:
-        log_msg(book_id, f"[SAVE] ERROR saving chapter {chapter}: {exc}")
+        log_msg(book_idx, f"[SAVE] ERROR writing JSON {chapter_num}: {exc}")
+
+        save_skip_reason(book_idx, chapter_num, f"json_write_error: {exc}")
+        inc_chapter_done(book_idx)
+        return None
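The chapter artifact that save_chapter writes is deliberately tiny, just the chapter number and its cleaned text, which keeps downstream consumers simple. A quick sketch of reading one back; the path follows the hypothetical layout sketched earlier and is illustrative only:

```python
import json

# Illustrative path; see the hypothetical chapter_paths() sketch above.
with open("output/15618/json/00001.json", encoding="utf-8") as f:
    chapter = json.load(f)

# The shape matches the chapter_obj built by save_chapter.
assert set(chapter) == {"chapter_num", "text"}
print(f"chapter {chapter['chapter_num']}: {len(chapter['text'])} chars")
```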
diff --git a/bookscraper/scraper/tasks/scraping.py b/bookscraper/scraper/tasks/scraping.py
index 0694089..70198d0 100644
--- a/bookscraper/scraper/tasks/scraping.py
+++ b/bookscraper/scraper/tasks/scraping.py
@@ -12,11 +12,11 @@ import redis
 from scraper.sites import BookSite
 from scraper.book_scraper import BookScraper
 from scraper.abort import clear_abort  # no circular deps
-from scraper.ui_log import reset_ui_logs  # <-- NEW IMPORT
+from scraper.ui_log import reset_ui_logs  # NEW

 print(">>> [IMPORT] scraping.py loaded")

-# Redis connection (same as Celery broker)
+# Redis = same URL as Celery broker
 REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
 r = redis.Redis.from_url(REDIS_URL, decode_responses=True)

@@ -26,24 +26,24 @@ def start_scrape_book(self, url: str):
     """Scrapes metadata + chapters and prepares download tracking."""

     # ------------------------------------------------------------
-    # NEW: clear UI log buffer at start of new run
+    # Clear UI logs for a fresh run
     # ------------------------------------------------------------
     reset_ui_logs()

     log(f"[SCRAPING] Start scraping for: {url}")

     # ------------------------------------------------------------
-    # Book scrape
+    # Scrape metadata + chapters
     # ------------------------------------------------------------
     site = BookSite()
     scraper = BookScraper(site, url)

-    result = scraper.execute()  # returns dict with metadata + chapters
+    result = scraper.execute()  # dict with metadata + chapters list

     chapters = result.get("chapters", [])
     full_count = len(chapters)

     # ------------------------------------------------------------
-    # DRY RUN
+    # DRY RUN (limit number of chapters)
     # ------------------------------------------------------------
     DRY_RUN = os.getenv("DRY_RUN", "0") == "1"
     TEST_LIMIT = int(os.getenv("TEST_LIMIT", "5"))
@@ -51,34 +51,45 @@
     if DRY_RUN:
         log(f"[SCRAPING] DRY_RUN: limiting chapters to {TEST_LIMIT}")
         chapters = chapters[:TEST_LIMIT]
-        result["chapters"] = chapters
+
+    # ------------------------------------------------------------
+    # NORMALISE OUTPUT FORMAT
+    #   - chapters     = INT
+    #   - chapter_list = LIST
+    # ------------------------------------------------------------
+    result["chapter_list"] = chapters
+    result["chapters"] = len(chapters)

     log(f"[SCRAPING] Completed scrape: {len(chapters)}/{full_count} chapters")

     # ------------------------------------------------------------
-    # BOOK RUN ID (using title as ID)
+    # Ensure book_idx exists
     # ------------------------------------------------------------
-    title = result.get("title") or "UnknownBook"
-    book_id = title  # user requirement
+    book_idx = result.get("book_idx")
+    if not book_idx:
+        raise ValueError("BookScraper did not return book_idx")
+    book_id = book_idx
     result["book_id"] = book_id

-    log(f"[SCRAPING] Assigned book_id = '{book_id}'")
+    log(f"[SCRAPING] Assigned book_id = {book_id}")

     # ------------------------------------------------------------
     # RESET ABORT + INITIALISE PROGRESS
     # ------------------------------------------------------------
     clear_abort(book_id)
-    r.set(f"progress:{book_id}:total", len(chapters))
+    r.set(f"progress:{book_id}:total", result["chapters"])
     r.set(f"progress:{book_id}:done", 0)
-    r.delete(f"logs:{book_id}")  # clear old logs if any
+
+    # clear legacy logs
+    r.delete(f"logs:{book_id}")

     r.rpush(f"logs:{book_id}", f":: SCRAPING STARTED for {url}")
-    r.rpush(f"logs:{book_id}", f":: Found {len(chapters)} chapters")
+    r.rpush(f"logs:{book_id}", f":: Found {result['chapters']} chapters")

     # ------------------------------------------------------------
-    # DISPATCH DOWNLOAD CONTROLLER
+    # DISPATCH CONTROLLER (book_idx + primitive metadata)
     # ------------------------------------------------------------
     celery_app.send_task(
         "scraper.tasks.controller_tasks.launch_downloads",
@@ -86,11 +97,11 @@ def start_scrape_book(self, url: str):
         queue="controller",
     )

-    log(f"[SCRAPING] Dispatched download controller for '{book_id}'")
+    log(f"[SCRAPING] Dispatched download controller for {book_id}")

     return {
         "book_id": book_id,
         "title": result.get("title"),
         "author": result.get("author"),
-        "chapters": len(chapters),
+        "chapters": result["chapters"],  # integer
     }
diff --git a/bookscraper/templates/index.html b/bookscraper/templates/index.html
index a8a4b76..a751f12 100644
--- a/bookscraper/templates/index.html
+++ b/bookscraper/templates/index.html
@@ -1,34 +1,28 @@
[template markup not recoverable from this extract: the old standalone
"BookScraper WebGUI" page is replaced by a template that extends base.html
and renders a "BookScraper" heading, a "Start new scrape" card containing
the scrape form, and a "Library" card ("View all previously scraped books."
with an "Open Library" link)]
+{% endblock %}
diff --git a/bookscraper/templates/result.html b/bookscraper/templates/result.html
index 57aabf9..379cf6b 100644
--- a/bookscraper/templates/result.html
+++ b/bookscraper/templates/result.html
@@ -1,239 +1,33 @@
[template markup not recoverable from this extract: the old standalone
"Scrape Resultaat" page (inline "Fout: {{ error }}" banner, cover preview,
live-log polling script) is reduced to a template that extends base.html
and renders a "Back" link, optional {{ message }} and
"Task ID: {{ scraping_task_id }}" banners, and the notice "Your scraper has
started. As soon as the first result is available, the book will
automatically appear in the Library."]
+{% endblock %}
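With the live-log polling script gone from result.html, the UI has to read progress from the Redis-backed state instead. A hedged sketch of what such a polling endpoint could look like; the route name and response shape are assumptions for illustration, not the project's actual app.py implementation:

```python
# Hypothetical dashboard-polling endpoint. get_progress is the helper
# imported elsewhere in this diff; the "/api/progress/..." route and the
# JSON shape are assumptions.
from flask import Flask, jsonify

from scraper.progress import get_progress

app = Flask(__name__)


@app.route("/api/progress/<book_idx>")
def api_progress(book_idx: str):
    prog = get_progress(book_idx)
    return jsonify(
        done=prog.get("done", 0),
        total=prog.get("total", 0),
    )
```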