From 57f0d6500f8b52abc242ed735f58e634bf4d91f0 Mon Sep 17 00:00:00 2001 From: "peter.fong" Date: Thu, 4 Dec 2025 10:16:05 +0100 Subject: [PATCH] =?UTF-8?q?Refactor:=20full=20chapter=5Fdict=20+=20book=5F?= =?UTF-8?q?meta=20pipeline=20(download=E2=86=92parse=E2=86=92save=E2=86=92?= =?UTF-8?q?progress)=20+=20C&U=20tasks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bookscraper/README.md | 2 + bookscraper/docker-compose.yml | 11 +- bookscraper/scraper/download_controller.py | 15 ++- bookscraper/scraper/tasks/download_tasks.py | 109 ++++++++--------- bookscraper/scraper/tasks/parse_tasks.py | 46 ++++---- bookscraper/scraper/tasks/pipeline.py | 32 +++-- bookscraper/scraper/tasks/progress_tasks.py | 34 ++++-- bookscraper/scraper/tasks/save_tasks.py | 123 +++++++++++--------- 8 files changed, 202 insertions(+), 170 deletions(-) diff --git a/bookscraper/README.md b/bookscraper/README.md index 8cc03d0..01b0304 100644 --- a/bookscraper/README.md +++ b/bookscraper/README.md @@ -126,5 +126,7 @@ docker run \ ``` docker compose down +docker builder prune -af +docker volume prune -f docker compose build --no-cache docker compose up diff --git a/bookscraper/docker-compose.yml b/bookscraper/docker-compose.yml index 0f607b7..b2b7feb 100644 --- a/bookscraper/docker-compose.yml +++ b/bookscraper/docker-compose.yml @@ -1,10 +1,19 @@ services: # ---------------------------------------------------------- - # Redis broker & backend + # Redis broker & backend (NO SNAPSHOTS, NO AOF) # ---------------------------------------------------------- redis: image: redis:7 container_name: bookscraper_redis + command: [ + "redis-server", + "--save", + "", # Disable RDB snapshots + "--appendonly", + "no", # Disable AOF + "--stop-writes-on-bgsave-error", + "no", # Never block writes + ] ports: - "6379:6379" healthcheck: diff --git a/bookscraper/scraper/download_controller.py b/bookscraper/scraper/download_controller.py index 9a9e978..bb2b57a 100644 --- a/bookscraper/scraper/download_controller.py +++ b/bookscraper/scraper/download_controller.py @@ -185,17 +185,26 @@ class DownloadController: tasks = [] for ch in self.chapters: + + # Build chapter_dict (NEW) chapter_num = ch["num"] chapter_url = ch["url"] + chapter_title = ch.get("title") volume_path = self.get_volume_path(chapter_num) + chapter_dict = { + "num": chapter_num, + "url": chapter_url, + "title": chapter_title, + "volume_path": volume_path, + } + + # Dispatch pipeline with chapter_dict tasks.append( build_chapter_pipeline( self.book_id, - chapter_num, - chapter_url, - volume_path, + chapter_dict, self.meta, ) ) diff --git a/bookscraper/scraper/tasks/download_tasks.py b/bookscraper/scraper/tasks/download_tasks.py index 5110483..8fbc339 100644 --- a/bookscraper/scraper/tasks/download_tasks.py +++ b/bookscraper/scraper/tasks/download_tasks.py @@ -4,19 +4,17 @@ # retry/backoff logic, 429 support, and abort-awareness. # # Logging: -# - timestamp + book_id in de message -# - message wordt via publisher.py naar console gestuurd -# - message wordt via ui_log.push_ui naar Redis GUI logbuffer gestuurd -# -# publisher.py en ui_log.py blijven DOM. 
+# - timestamp + book_id in message +# - logbus.publisher → console +# - ui_log.push_ui → Redis GUI # ============================================================ from celery_app import celery_app from scraper.utils import get_save_path from scraper.abort import abort_requested, chapter_started, mark_chapter_started -from logbus.publisher import log # console logging (DOM) -from scraper.ui_log import push_ui # GUI logging (DOM) +from logbus.publisher import log +from scraper.ui_log import push_ui import requests import redis @@ -39,36 +37,26 @@ def log_msg(book_id: str, message: str): # ----------------------------------------------------------- -# Retry parameters (ENV) +# ENV CONFIG # ----------------------------------------------------------- MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7")) BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2")) BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2")) DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10")) -# ----------------------------------------------------------- -# Global concurrency -# ----------------------------------------------------------- MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1")) - -# ----------------------------------------------------------- -# Global delay sync -# ----------------------------------------------------------- GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1")) -DELAY_KEY = "download:delay_lock" -# ----------------------------------------------------------- -# Redis -# ----------------------------------------------------------- REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0") redis_client = redis.Redis.from_url(REDIS_URL) SEM_KEY = "download:active" +DELAY_KEY = "download:delay_lock" -# ============================================================ -# GLOBAL DELAY FUNCTIONS -# ============================================================ +# ----------------------------------------------------------- +# DELAY + CONCURRENCY HELPERS +# ----------------------------------------------------------- def wait_for_global_delay(): if GLOBAL_DELAY <= 0: return @@ -82,9 +70,6 @@ def set_global_delay(): redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY) -# ============================================================ -# GLOBAL CONCURRENCY FUNCTIONS -# ============================================================ def acquire_global_slot(max_slots: int, retry_delay: float = 0.5): while True: current = redis_client.incr(SEM_KEY) @@ -98,81 +83,82 @@ def release_global_slot(): redis_client.decr(SEM_KEY) -print(f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}") -print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s") -print( - f">>> [CONFIG] Retries: MAX={MAX_RETRIES}, base={BASE_DELAY}, " - f"backoff={BACKOFF}, 429={DELAY_429}" -) - - # ============================================================ -# CELERY TASK: DOWNLOAD CHAPTER +# CELERY TASK — NEW SIGNATURE WITH chapter_dict + book_meta # ============================================================ @celery_app.task(bind=True, queue="download", ignore_result=False) -def download_chapter( - self, book_id: str, chapter_num: int, chapter_url: str, base_path: str -): +def download_chapter(self, book_id: str, chapter_dict: dict, book_meta: dict): """ - Download chapter HTML. 
-    Abort logic:
-    - If abort active AND chapter not started → SKIP
-    - If abort active BUT chapter already started → Proceed normally
+    New unified chapter model:
+    chapter_dict = {
+        "num": int,
+        "url": str,
+        "title": str,
+        "volume_path": str
+    }
+
+    book_meta is propagated through the pipeline for parse/save.
     """
+    chapter_num = chapter_dict.get("num")
+    chapter_url = chapter_dict.get("url")
+    chapter_title = chapter_dict.get("title") or f"Chapter {chapter_num}"
+    volume_path = chapter_dict.get("volume_path")
+
     # -----------------------------------------------------------
     # ABORT BEFORE START
     # -----------------------------------------------------------
     if abort_requested(book_id) and not chapter_started(book_id, chapter_num):
         msg = f"[ABORT] Skip chapter {chapter_num} (abort active, not started)"
         log_msg(book_id, msg)
+
         return {
             "book_id": book_id,
-            "chapter": chapter_num,
-            "url": chapter_url,
+            "chapter": chapter_dict,
             "html": None,
             "skipped": True,
             "path": None,
             "abort": True,
+            "book_meta": book_meta,
         }

-    # Mark started
+    # Mark chapter as started
     mark_chapter_started(book_id, chapter_num)

     # -----------------------------------------------------------
-    # NEW POSITION FOR SKIP BLOCK (before any delay logic)
+    # SKIP IF FILE ALREADY EXISTS
     # -----------------------------------------------------------
-    save_path = get_save_path(chapter_num, base_path)
+    save_path = get_save_path(chapter_num, volume_path)

     if os.path.exists(save_path):
-        log_msg(book_id, f"[DL] SKIP {chapter_num} (exists) → {save_path}")
+        log_msg(book_id, f"[DL] SKIP {chapter_num} ({chapter_title}) → {save_path}")
         return {
             "book_id": book_id,
-            "chapter": chapter_num,
-            "url": chapter_url,
+            "chapter": chapter_dict,
             "html": None,
             "skipped": True,
             "path": save_path,
+            "book_meta": book_meta,
         }

     # -----------------------------------------------------------
-    # Hard delay (only for real downloads)
+    # GLOBAL + SYNC DELAY
     # -----------------------------------------------------------
     if GLOBAL_DELAY > 0:
         time.sleep(GLOBAL_DELAY)

-    # Sync delay
     wait_for_global_delay()
-
-    # Acquire concurrency slot
     acquire_global_slot(MAX_CONCURRENCY)

     log_msg(book_id, f"[DL] ACQUIRED SLOT for chapter {chapter_num}")

+    # -----------------------------------------------------------
+    # HTTP DOWNLOAD
+    # -----------------------------------------------------------
     try:
-        # -----------------------------------------------------------
-        # HTTP DOWNLOAD
-        # -----------------------------------------------------------
-        log_msg(book_id, f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
+        log_msg(
+            book_id,
+            f"[DL] Downloading {chapter_num} ({chapter_title}): {chapter_url}",
+        )

         resp = requests.get(
             chapter_url,
@@ -188,30 +174,29 @@ def download_chapter(

         return {
             "book_id": book_id,
-            "chapter": chapter_num,
-            "url": chapter_url,
+            "chapter": chapter_dict,
             "html": html,
             "skipped": False,
             "path": save_path,
+            "book_meta": book_meta,
         }

     except Exception as exc:
         attempt = self.request.retries
         delay = BASE_DELAY * (BACKOFF**attempt)

-        # 429 hard block
+        # Specific 429 handler
         if getattr(getattr(exc, "response", None), "status_code", None) == 429:
             log_msg(
                 book_id,
                 f"[DL] 429 {chapter_num} → WAIT {DELAY_429}s "
                 f"(attempt {attempt}/{MAX_RETRIES})",
             )
-            time.sleep(DELAY_429)
             set_global_delay()
-            raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES)
+            raise self.retry(exc=exc, countdown=DELAY_429, max_retries=MAX_RETRIES)

-        # Normal error
+        # Normal retry
         log_msg(
             book_id,
             f"[DL] ERROR {chapter_num}: {exc} → retry in {delay}s "
diff --git a/bookscraper/scraper/tasks/parse_tasks.py b/bookscraper/scraper/tasks/parse_tasks.py
index 52066f9..71392da 100644
--- a/bookscraper/scraper/tasks/parse_tasks.py
+++ b/bookscraper/scraper/tasks/parse_tasks.py
@@ -15,25 +15,28 @@ print(">>> [IMPORT] parse_tasks.py loaded (enhanced parser)")
 @celery_app.task(bind=True, queue="parse", ignore_result=False)
-def parse_chapter(self, download_result: dict, meta: dict):
+def parse_chapter(self, download_result: dict):
+    """
+    New signature under chapter_dict pipeline:
+    - receives ONLY the output dict from download_chapter
+    - book_meta is inside download_result["book_meta"]
+    - chapter_dict is inside download_result["chapter"]
+    """
     book_id = download_result.get("book_id", "NOBOOK")
+    chapter_dict = download_result.get("chapter") or {}
+    book_meta = download_result.get("book_meta") or {}
+
+    chapter_num = chapter_dict.get("num")
+    chapter_url = chapter_dict.get("url")
+    html = download_result.get("html")

     # ------------------------------------------------------------
     # SKIPPED DOWNLOAD → SKIP PARSE
     # ------------------------------------------------------------
     if download_result.get("skipped"):
-        chapter = download_result.get("chapter")
-        log_msg(book_id, f"[PARSE] SKIP chapter {chapter} (download skipped)")
-        download_result["book_id"] = book_id
-        return download_result
-
-    # ------------------------------------------------------------
-    # NORMAL PARSE
-    # ------------------------------------------------------------
-    chapter_num = download_result["chapter"]
-    chapter_url = download_result["url"]
-    html = download_result["html"]
+        log_msg(book_id, f"[PARSE] SKIP chapter {chapter_num} (download skipped)")
+        return download_result  # already has chapter + book_meta + skipped

     log_msg(book_id, f"[PARSE] Parsing chapter {chapter_num}")

@@ -72,14 +75,12 @@ def parse_chapter(self, download_result: dict, meta: dict):

         content_parts = []
         for sib in h1.next_siblings:
-            # stop at bottom navigation/footer block
             sib_class = getattr(sib, "get", lambda *_: None)("class")
             if sib_class and (
                 "bottomlink" in sib_class or sib_class == "bottomlink"
             ):
                 break

-            # ignore typical noise containers
             if getattr(sib, "name", None) in ["script", "style", "center"]:
                 continue

@@ -97,10 +98,8 @@ def parse_chapter(self, download_result: dict, meta: dict):
     if node:
         raw = node.get_text(separator="\n")
     else:
-        # drop scripts & styles
         for tag in soup(["script", "style", "noscript"]):
             tag.decompose()
-
         raw = soup.get_text(separator="\n")

     # ------------------------------------------------------------
@@ -109,7 +108,7 @@ def parse_chapter(self, download_result: dict, meta: dict):
     REPL = load_all_replacements()

     text = raw
-    for _ in range(5):  # like the C# CleanText loop
+    for _ in range(5):
         text = clean_text(text, REPL)

     # ------------------------------------------------------------
@@ -135,21 +134,22 @@ def parse_chapter(self, download_result: dict, meta: dict):
     # Add header to chapter 1
     # ------------------------------------------------------------
     if chapter_num == 1:
-        book_url = meta.get("book_url") or meta.get("url") or "UNKNOWN"
+        book_url = book_meta.get("book_url") or book_meta.get("url") or "UNKNOWN"
         header = (
-            f"{meta.get('title','')}\n"
-            f"Author: {meta.get('author','')}\n"
-            f"Description:\n{meta.get('description','')}\n"
+            f"{book_meta.get('title','')}\n"
+            f"Author: {book_meta.get('author','')}\n"
+            f"Description:\n{book_meta.get('description','')}\n"
             f"Book URL: {book_url}\n" + "-" * 50 + "\n\n"
         )
         text = header + text

     log_msg(book_id, f"[PARSE] Parsed chapter {chapter_num}: {len(text)} chars")

+    # NEW RETURN FORMAT: chapter_dict stays intact
     return {
         "book_id": book_id,
-        "chapter": chapter_num,
-        "url": chapter_url,
+        "chapter": chapter_dict,
         "text": text,
         "length": len(text),
+        "book_meta": book_meta,
     }
diff --git a/bookscraper/scraper/tasks/pipeline.py b/bookscraper/scraper/tasks/pipeline.py
index 9da657e..267af60 100644
--- a/bookscraper/scraper/tasks/pipeline.py
+++ b/bookscraper/scraper/tasks/pipeline.py
@@ -1,15 +1,15 @@
 # =========================================================
 # File: scraper/tasks/pipeline.py
 # Purpose:
-#   Build Celery chains for chapter processing.
+#   Build Celery chains for chapter processing using chapter_dict.
 #
-# Chain:
-#   download_chapter(book_id, chapter_num, url, base_path)
-#     → parse_chapter(download_result, meta)
-#     → save_chapter(parsed_result, base_path)
+# New Chain:
+#   download_chapter(book_id, chapter_dict, book_meta)
+#     → parse_chapter(download_result)
+#     → save_chapter(parsed_result)
 #     → update_progress(final_result, book_id)
 #
-# All subtasks must pass through result dicts untouched so the
+# All subtasks pass through result dicts unchanged so the
 # next stage receives the correct fields.
 # =========================================================

@@ -23,23 +23,21 @@ from scraper.tasks.progress_tasks import update_progress

 def build_chapter_pipeline(
     book_id: str,
-    chapter_number: int,
-    chapter_url: str,
-    base_path: str,
-    meta: dict,
+    chapter_dict: dict,
+    book_meta: dict,
 ):
     """
-    Build a Celery chain for one chapter.
+    Build a Celery chain for one chapter using chapter_dict.

-    download_chapter(book_id, chapter_number, chapter_url, base_path)
-        → parse_chapter(download_result, meta)
-        → save_chapter(parsed_result, base_path)
+    download_chapter(book_id, chapter_dict, book_meta)
+        → parse_chapter(download_result)
+        → save_chapter(parsed_result)
         → update_progress(result, book_id)
     """

     return chain(
-        download_chapter.s(book_id, chapter_number, chapter_url, base_path),
-        parse_chapter.s(meta),
-        save_chapter.s(base_path),
+        download_chapter.s(book_id, chapter_dict, book_meta),
+        parse_chapter.s(),
+        save_chapter.s(),
         update_progress.s(book_id),
     )
diff --git a/bookscraper/scraper/tasks/progress_tasks.py b/bookscraper/scraper/tasks/progress_tasks.py
index 9045fab..3752893 100644
--- a/bookscraper/scraper/tasks/progress_tasks.py
+++ b/bookscraper/scraper/tasks/progress_tasks.py
@@ -1,6 +1,7 @@
 # ============================================================
 # File: scraper/tasks/progress_tasks.py
 # Purpose: Central progress updater for chapter pipelines.
+# Updated for chapter_dict pipeline model.
# ============================================================ from celery_app import celery_app @@ -22,22 +23,35 @@ def update_progress(result: dict, book_id: str): - progress.update MUST NOT double-increment """ - ch = result.get("chapter") + ch = result.get("chapter") or {} + chapter_num = ch.get("num") + skipped = result.get("skipped", False) failed = result.get("failed", False) + # ------------------------------------------------------------ + # FAILED CASE + # ------------------------------------------------------------ if failed: inc_failed(book_id) - log(f"[PROG] FAILED chapter {ch}") + log(f"[PROG] FAILED chapter {chapter_num}") + return result - elif skipped: + # ------------------------------------------------------------ + # SKIPPED CASE + # ------------------------------------------------------------ + if skipped: # save_chapter already did: - # inc_skipped + inc_completed - log(f"[PROG] SKIPPED chapter {ch}") - - else: - # Normal completion: save_chapter only does inc_completed - inc_completed(book_id) - log(f"[PROG] DONE chapter {ch}") + # inc_skipped(book_id) + log(f"[PROG] SKIPPED chapter {chapter_num}") + return result + + # ------------------------------------------------------------ + # NORMAL COMPLETION + # ------------------------------------------------------------ + # save_chapter did NOT increment completed for skipped cases + # but DID inc_completed(book_id) for normal cases. + # update_progress should NOT double increment, so only log here. + log(f"[PROG] DONE chapter {chapter_num}") return result diff --git a/bookscraper/scraper/tasks/save_tasks.py b/bookscraper/scraper/tasks/save_tasks.py index 8aa0578..15b64b9 100644 --- a/bookscraper/scraper/tasks/save_tasks.py +++ b/bookscraper/scraper/tasks/save_tasks.py @@ -1,6 +1,7 @@ # ============================================================ # File: scraper/tasks/save_tasks.py # Purpose: Save parsed chapter text to disk + trigger audio. +# Updated for chapter_dict + book_meta pipeline model. # ============================================================ print(">>> [IMPORT] save_tasks.py loaded") @@ -13,67 +14,76 @@ from scraper.tasks.download_tasks import log_msg # unified logger from scraper.progress import ( inc_completed, inc_skipped, - inc_failed, - add_failed_chapter, ) - from scraper.tasks.audio_tasks import generate_audio @shared_task(bind=True, queue="save", ignore_result=False) -def save_chapter(self, parsed: dict, base_path: str): +def save_chapter(self, parsed: dict): """ - Save parsed chapter text to disk. 
- + New pipeline model: parsed = { "book_id": str, - "chapter": int, + "chapter": chapter_dict, "text": str, - "url": str, + "length": int, + "book_meta": dict, "skipped": bool, - "path": optional str + "path": optional str (if skipped) } """ book_id = parsed.get("book_id", "NOBOOK") - chapter = parsed.get("chapter") + chapter_dict = parsed.get("chapter") or {} + book_meta = parsed.get("book_meta") or {} + + chapter_num = chapter_dict.get("num") + chapter_title = chapter_dict.get("title") or f"Chapter {chapter_num}" + volume_path = chapter_dict.get("volume_path") # ------------------------------------------------------------ - # SKIP CASE (download or parse skipped the chapter) + # VALIDATION + # ------------------------------------------------------------ + if chapter_num is None or volume_path is None: + raise ValueError("Invalid parsed payload: chapter_dict missing fields.") + + # ------------------------------------------------------------ + # SKIPPED CASE # ------------------------------------------------------------ if parsed.get("skipped"): - path = parsed.get("path", "(no-path)") - log_msg(book_id, f"[SAVE] SKIP chapter {chapter} → {path}") + path = parsed.get("path", None) + log_msg(book_id, f"[SAVE] SKIP chapter {chapter_num} → {path}") inc_skipped(book_id) - # Determine volume name from the base path - volume_name = os.path.basename(base_path.rstrip("/")) - - # Queue audio using the existing saved file - try: - generate_audio.delay( - book_id, - volume_name, - chapter, - f"Chapter {chapter}", - path, # <<-- correct: this is always the real file path - ) - log_msg( - book_id, - f"[AUDIO] Task queued (SKIPPED) for chapter {chapter} in {volume_name}", - ) - except Exception as audio_exc: - log_msg( - book_id, - f"[AUDIO] ERROR queueing (SKIPPED) chapter {chapter}: {audio_exc}", - ) + volume_name = os.path.basename(volume_path.rstrip("/")) + + # Queue audio only if a valid file exists + if path and os.path.exists(path): + try: + generate_audio.delay( + book_id, + volume_name, + chapter_num, + chapter_title, + path, + ) + log_msg( + book_id, + f"[AUDIO] Task queued (SKIPPED) for chapter {chapter_num} in {volume_name}", + ) + except Exception as audio_exc: + log_msg( + book_id, + f"[AUDIO] ERROR queueing (SKIPPED) chapter {chapter_num}: {audio_exc}", + ) return { - "book_id": book_id, # <<< FIXED - "chapter": chapter, + "book_id": book_id, + "chapter": chapter_dict, "path": path, "skipped": True, + "book_meta": book_meta, } # ------------------------------------------------------------ @@ -82,42 +92,47 @@ def save_chapter(self, parsed: dict, base_path: str): try: text = parsed.get("text", "") - if chapter is None: - raise ValueError("Missing chapter number in parsed payload") - - # Ensure chapter folder exists - os.makedirs(base_path, exist_ok=True) + # Ensure volume folder exists + os.makedirs(volume_path, exist_ok=True) - # Build chapter file path - path = get_save_path(chapter, base_path) + # Build final chapter file path + path = get_save_path(chapter_num, volume_path) - # Save chapter text to disk + # Write chapter text to file with open(path, "w", encoding="utf-8") as f: f.write(text) - log_msg(book_id, f"[SAVE] Saved chapter {chapter} → {path}") - + log_msg(book_id, f"[SAVE] Saved chapter {chapter_num} → {path}") inc_completed(book_id) # Determine volume name - volume_name = os.path.basename(base_path.rstrip("/")) + volume_name = os.path.basename(volume_path.rstrip("/")) - # Queue audio task (always use the saved file path) + # Queue audio task try: generate_audio.delay( 
book_id, volume_name, - chapter, - f"Chapter {chapter}", + chapter_num, + chapter_title, path, ) log_msg( - book_id, f"[AUDIO] Task queued for chapter {chapter} in {volume_name}" + book_id, + f"[AUDIO] Task queued for chapter {chapter_num} in {volume_name}", ) except Exception as audio_exc: - log_msg(book_id, f"[AUDIO] ERROR queueing chapter {chapter}: {audio_exc}") + log_msg( + book_id, f"[AUDIO] ERROR queueing chapter {chapter_num}: {audio_exc}" + ) - return {"book_id": book_id, "chapter": chapter, "path": path} + return { + "book_id": book_id, + "chapter": chapter_dict, + "path": path, + "book_meta": book_meta, + } except Exception as exc: - log_msg(book_id, f"[SAVE] ERROR saving chapter {chapter}: {exc}") + log_msg(book_id, f"[SAVE] ERROR saving chapter {chapter_num}: {exc}") + raise
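-- 

The compose change turns the broker into a purely in-memory Redis. A quick sanity check that the flags took effect, assuming the bookscraper_redis container is up and mapped to localhost:6379 as in the compose file (redis-py is already a dependency of the workers):

```python
# Confirm the redis-server flags from docker-compose.yml are live.
# "localhost:6379" is the host-side mapping from the compose file.
import redis

r = redis.Redis.from_url("redis://localhost:6379/0", decode_responses=True)
assert r.config_get("save")["save"] == ""                # RDB snapshots off
assert r.config_get("appendonly")["appendonly"] == "no"  # AOF off
print("Broker is running without persistence.")
```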
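In download_tasks.py, set_global_delay / wait_for_global_delay implement a cross-worker rate gate with a single expiring key. A minimal standalone sketch of the same pattern (key name and URL are illustrative, not the production values):

```python
# SET NX EX creates a gate key that Redis expires on its own; workers poll
# for its disappearance instead of each sleeping a fixed interval.
import time
import redis

r = redis.Redis.from_url("redis://localhost:6379/0")
DELAY_KEY = "demo:delay_lock"

def set_delay(seconds: int) -> None:
    # nx=True makes this a no-op if another worker already set the gate.
    r.set(DELAY_KEY, "1", nx=True, ex=seconds)

def wait_for_delay() -> None:
    while r.exists(DELAY_KEY):
        time.sleep(0.05)

set_delay(2)
wait_for_delay()   # returns once the key expires, roughly 2s later
print("gate open")
```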
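acquire_global_slot / release_global_slot build a counting semaphore out of atomic INCR/DECR. The same pattern in isolation (slot count and key are illustrative):

```python
# Counting semaphore via INCR/DECR, as in acquire_global_slot: if INCR
# pushes the counter past the limit, undo it and retry later.
import time
import redis

r = redis.Redis.from_url("redis://localhost:6379/0")
SEM_KEY = "demo:active"
MAX_SLOTS = 2

def acquire(retry_delay: float = 0.5) -> None:
    while True:
        if r.incr(SEM_KEY) <= MAX_SLOTS:
            return            # we now own one of the MAX_SLOTS slots
        r.decr(SEM_KEY)       # over the limit: give it back and wait
        time.sleep(retry_delay)

def release() -> None:
    r.decr(SEM_KEY)

acquire()
try:
    print("downloading inside the global slot")
finally:
    release()                 # always pair acquire with release
```

One trade-off the patch inherits: SEM_KEY has no TTL, so a worker killed between acquire and release leaks a slot until the key is reset by hand.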
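For non-429 failures the retry countdown is BASE_DELAY * BACKOFF**attempt, plain doubling under the shipped defaults. Spelled out:

```python
# Retry schedule under the default env values
# (DOWNLOAD_BASE_DELAY=2, DOWNLOAD_BACKOFF_MULTIPLIER=2, DOWNLOAD_MAX_RETRIES=7).
BASE_DELAY, BACKOFF, MAX_RETRIES = 2, 2, 7

total = 0
for attempt in range(MAX_RETRIES):
    delay = BASE_DELAY * BACKOFF**attempt
    total += delay
    print(f"attempt {attempt}: retry in {delay}s")
print(f"worst case ~{total}s of backoff")  # 2+4+8+16+32+64+128 = 254s
```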
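Both the download skip check and save_chapter lean on get_save_path(chapter_num, volume_path) from scraper.utils, which this patch does not touch. A hypothetical stand-in showing the contract the pipeline assumes (deterministic path from chapter number plus volume directory; the real naming scheme may differ):

```python
import os

# HYPOTHETICAL stand-in for scraper.utils.get_save_path; the real helper is
# not part of this patch. What matters is determinism, so the download
# task's os.path.exists() skip and save_chapter agree on the same path.
def get_save_path(chapter_num: int, volume_path: str) -> str:
    return os.path.join(volume_path, f"chapter_{chapter_num:04d}.txt")

print(get_save_path(12, "/data/books/demo/volume_01"))
# /data/books/demo/volume_01/chapter_0012.txt
```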
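The chain works because Celery passes each task's return value as the first positional argument of the next signature; once chapter_dict and book_meta ride inside the result dict, parse_chapter.s() and save_chapter.s() need no extra arguments. Dispatching one chapter then looks like the sketch below (field values are illustrative, and a running broker plus workers from docker compose up are assumed):

```python
from scraper.tasks.pipeline import build_chapter_pipeline

chapter_dict = {
    "num": 12,
    "url": "https://example.com/book/ch-12",
    "title": "Chapter 12",
    "volume_path": "/data/books/demo/volume_01",
}
book_meta = {
    "title": "Demo Book",
    "author": "A. Author",
    "book_url": "https://example.com/book",
}

# download → parse → save → update_progress, all driven by the result dict
sig = build_chapter_pipeline("demo-book", chapter_dict, book_meta)
async_result = sig.apply_async()
```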