From e0695cf216ff8d7156fad01490a57038e22b5c7f Mon Sep 17 00:00:00 2001
From: "peter.fong"
Date: Wed, 3 Dec 2025 09:52:36 +0100
Subject: [PATCH] abort for audio tasks

---
 bookscraper/.gitignore                      |   3 +
 bookscraper/README.md                       |   3 +-
 bookscraper/audio_worker_local.py           |  65 +++++++
 bookscraper/celery_app.py                   |   8 +
 bookscraper/docker-compose.yml              |  21 +--
 bookscraper/docker/Dockerfile.audio         |   8 +-
 bookscraper/scraper/abort.py                | 106 ++++++++----
 bookscraper/scraper/download_controller.py  |  33 +++-
 bookscraper/scraper/tasks/audio.py          |  10 --
 bookscraper/scraper/tasks/audio_tasks.py    | 183 ++++++++++++++++++++
 bookscraper/scraper/tasks/download_tasks.py |   3 +
 bookscraper/scraper/tasks/parse_tasks.py    |   4 +
 bookscraper/scraper/tasks/pipeline.py       |  20 ++-
 bookscraper/scraper/tasks/progress_tasks.py |  11 +-
 bookscraper/scraper/tasks/save_tasks.py     |  74 ++++++--
 bookscraper/scraper/tasks/scraping.py       |  11 +-
 bookscraper/start_audio_worker.sh           |  46 +++++
 17 files changed, 506 insertions(+), 103 deletions(-)
 create mode 100644 bookscraper/audio_worker_local.py
 delete mode 100644 bookscraper/scraper/tasks/audio.py
 create mode 100644 bookscraper/scraper/tasks/audio_tasks.py
 create mode 100755 bookscraper/start_audio_worker.sh

diff --git a/bookscraper/.gitignore b/bookscraper/.gitignore
index ea1472e..08fedd4 100644
--- a/bookscraper/.gitignore
+++ b/bookscraper/.gitignore
@@ -1 +1,4 @@
 output/
+venv/
+*.log
+__pycache__/
\ No newline at end of file

diff --git a/bookscraper/README.md b/bookscraper/README.md
index d168f97..8cc03d0 100644
--- a/bookscraper/README.md
+++ b/bookscraper/README.md
@@ -123,7 +123,8 @@ docker run \
   bookscraper
+```
+
 docker compose down
 docker compose build --no-cache
 docker compose up
-```

diff --git a/bookscraper/audio_worker_local.py b/bookscraper/audio_worker_local.py
new file mode 100644
index 0000000..4affa93
--- /dev/null
+++ b/bookscraper/audio_worker_local.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python3
+"""
+Local macOS Audio Worker — runs outside Docker so macOS 'say' works.
+"""
+
+import os
+import subprocess
+from dotenv import load_dotenv
+
+BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+ENV_FILE = os.path.join(BASE_DIR, ".env")
+
+# Load .env if present
+if os.path.exists(ENV_FILE):
+    load_dotenv(ENV_FILE)
+    print(f"[AUDIO-LOCAL] Loaded .env from {ENV_FILE}")
+else:
+    print("[AUDIO-LOCAL] WARNING: no .env found")
+
+
+def main():
+    print("=====================================================")
+    print(" LOCAL macOS AUDIO WORKER")
+    print(" Queue : audio")
+    print(" Voice :", os.getenv("AUDIO_VOICE"))
+    print(" Rate  :", os.getenv("AUDIO_RATE"))
+    print("=====================================================")
+
+    # ----------------------------------------------------------
+    # OVERRIDES: Local Redis instead of Docker internal hostname
+    # ----------------------------------------------------------
+    broker = os.getenv("REDIS_BROKER_LOCAL", "redis://127.0.0.1:6379/0")
+    backend = os.getenv("REDIS_BACKEND_LOCAL", "redis://127.0.0.1:6379/1")
+
+    os.environ["CELERY_BROKER_URL"] = broker
+    os.environ["CELERY_RESULT_BACKEND"] = backend
+
+    print(f"[AUDIO-LOCAL] Using Redis broker : {broker}")
+    print(f"[AUDIO-LOCAL] Using Redis backend: {backend}")
+
+    # ----------------------------------------------------------
+    # Celery command
+    # macOS requires the prefork pool here.
+ # ---------------------------------------------------------- + cmd = [ + "celery", + "-A", + "celery_app", + "worker", + "-Q", + "audio", + "-n", + "audio_local@%h", + "-l", + "INFO", + "--pool=prefork", + ] + + print("[AUDIO-LOCAL] Launching Celery via subprocess…") + + subprocess.run(cmd, check=False) + + +if __name__ == "__main__": + main() diff --git a/bookscraper/celery_app.py b/bookscraper/celery_app.py index 2c3fcc4..58a034f 100644 --- a/bookscraper/celery_app.py +++ b/bookscraper/celery_app.py @@ -22,6 +22,10 @@ celery_app = Celery( "scraper.tasks.download_tasks", "scraper.tasks.parse_tasks", "scraper.tasks.save_tasks", + # -------------------------------------------------------- + # AUDIO TASKS (NEW) + # -------------------------------------------------------- + "scraper.tasks.audio_tasks", ], ) @@ -31,6 +35,10 @@ celery_app.conf.task_routes = { "scraper.tasks.download_tasks.*": {"queue": "download"}, "scraper.tasks.parse_tasks.*": {"queue": "parse"}, "scraper.tasks.save_tasks.*": {"queue": "save"}, + # ------------------------------------------------------------ + # AUDIO ROUTING (NEW) + # ------------------------------------------------------------ + "scraper.tasks.audio_tasks.*": {"queue": "audio"}, } # ------------------------------------------------------------ diff --git a/bookscraper/docker-compose.yml b/bookscraper/docker-compose.yml index 770f733..0f607b7 100644 --- a/bookscraper/docker-compose.yml +++ b/bookscraper/docker-compose.yml @@ -5,6 +5,8 @@ services: redis: image: redis:7 container_name: bookscraper_redis + ports: + - "6379:6379" healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 2s @@ -113,25 +115,6 @@ services: command: celery -A celery_app worker -Q save -n save@%h -l INFO restart: "no" - # ---------------------------------------------------------- - # Audio Worker (macOS only) - # ---------------------------------------------------------- - worker_audio: - build: - context: . - dockerfile: docker/Dockerfile.audio - container_name: worker_audio - volumes: - - .:/app - - /Users/peter/Desktop/books:/app/output - depends_on: - redis: - condition: service_healthy - env_file: - - .env - command: celery -A celery_app worker -Q audio -n audio@%h -l INFO - restart: "no" - # ---------------------------------------------------------- # Scraping Worker # ---------------------------------------------------------- diff --git a/bookscraper/docker/Dockerfile.audio b/bookscraper/docker/Dockerfile.audio index c308a7c..22c9056 100644 --- a/bookscraper/docker/Dockerfile.audio +++ b/bookscraper/docker/Dockerfile.audio @@ -1,9 +1,15 @@ FROM python:3.12-slim WORKDIR /app +# Install audio worker dependencies COPY requirements.audio.txt /app/requirements.audio.txt RUN pip install --no-cache-dir -r /app/requirements.audio.txt +# Celery is noodzakelijk voor de worker +RUN pip install --no-cache-dir celery + +# Copy project COPY . 
diff --git a/bookscraper/scraper/abort.py b/bookscraper/scraper/abort.py
index d9f7c92..2df7880 100644
--- a/bookscraper/scraper/abort.py
+++ b/bookscraper/scraper/abort.py
@@ -1,64 +1,106 @@
 import os
 import redis
 
+# GUI log (non-breaking)
+from scraper.ui_log import push_ui
+
 # ---------------------------------------------------------
-# Redis connection
+# Default Redis connection (Docker workers)
 # ---------------------------------------------------------
 REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
 r = redis.Redis.from_url(REDIS_URL, decode_responses=True)
 
+# Debug mode (optional)
+ABORT_DEBUG = os.getenv("ABORT_DEBUG", "1") == "1"
+
+# Internal set so each key is logged only once
+_seen_debug_keys = set()
+
 # =========================================================
 # ABORT FLAG
 # =========================================================
+def _debug(msg: str):
+    """Print + GUI log (non-breaking, minimal noise)."""
+    print(msg)
+    push_ui(msg)
+
+
 def set_abort(book_id: str):
-    """
-    Enable abort mode for this book.
-    All download tasks that haven't started yet will immediately exit.
-    """
-    r.set(f"abort:{book_id}", "1")
+    """Enable abort mode for this book."""
+    key = f"abort:{book_id}"
+    r.set(key, "1")
+
+    if ABORT_DEBUG:
+        _debug(f"[ABORT] SET {key}")
 
 
 def clear_abort(book_id: str):
-    """
-    Clear abort flag so future runs are unaffected.
-    """
-    r.delete(f"abort:{book_id}")
+    """Clear abort flag."""
+    key = f"abort:{book_id}"
+    r.delete(key)
+
+    if ABORT_DEBUG:
+        _debug(f"[ABORT] CLEAR {key}")
 
 
-def abort_requested(book_id: str) -> bool:
+def abort_requested(book_id: str, redis_client=None) -> bool:
     """
-    True if abort flag is set for this book.
+    Return True if abort flag is set.
+
+    redis_client:
+      - Docker workers → None → use default Redis (r)
+      - Local macOS audio → passes Redis(host=127.0.0.1)
     """
-    return r.exists(f"abort:{book_id}") == 1
+    client = redis_client or r
+    key = f"abort:{book_id}"
+
+    try:
+        exists = client.exists(key)
+
+        if ABORT_DEBUG:
+            # Log once per key
+            if key not in _seen_debug_keys:
+                try:
+                    conn = client.connection_pool.connection_kwargs
+                    host = conn.get("host")
+                    port = conn.get("port")
+                    db = conn.get("db")
+                    _debug(
+                        f"[ABORT_DEBUG] first check book_id={book_id} "
+                        f"redis={host}:{port} db={db}"
+                    )
+                except Exception:
+                    _debug(f"[ABORT_DEBUG] first check book_id={book_id}")
+
+                _seen_debug_keys.add(key)
+
+        # Only log abort ACTIVE
+        if exists == 1:
+            _debug(f"[ABORT] ACTIVE for {book_id}")
+
+        return exists == 1
+
+    except Exception as e:
+        if ABORT_DEBUG:
+            _debug(f"[ABORT_DEBUG] ERROR checking {key}: {e}")
+
+        return False
 
 
 # =========================================================
 # PER-CHAPTER STATE
 # =========================================================
-# We mark a chapter "started" once its download task begins.
-# If abort is activated AFTER download start:
-#   → download must complete
-#   → parse must complete
-#   → save must complete
-# All subsequent chapters will skip.
+# A chapter marked "started" must still complete download → parse
+# → save, even if abort is activated afterwards; only chapters that
+# have not started yet are skipped.
 
 
 def mark_chapter_started(book_id: str, chapter_num: int):
-    """
-    Mark this chapter as started. Parse/save will always run after this,
-    even if abort has been activated afterwards.
-    """
     key = f"started:{book_id}:{chapter_num}"
     r.set(key, "1")
 
 
 def chapter_started(book_id: str, chapter_num: int) -> bool:
-    """
-    Return True if this chapter has already started downloading.
-    """
     key = f"started:{book_id}:{chapter_num}"
     return r.exists(key) == 1
 
@@ -70,13 +112,11 @@ def chapter_started(book_id: str, chapter_num: int) -> bool:
 
 def reset_book_state(book_id: str):
     """
-    Optional utility: remove abort flag and all started-chapter markers.
-    Useful during testing or manual cleanup.
+    Remove abort flag and all chapter-start markers.
     """
-    # Remove abort flag
-    r.delete(f"abort:{book_id}")
+    key = f"abort:{book_id}"
+    r.delete(key)
 
-    # Remove all "started:*" keys for this book
     pattern = f"started:{book_id}:*"
-    for key in r.scan_iter(pattern):
-        r.delete(key)
+    for k in r.scan_iter(pattern):
+        r.delete(k)

diff --git a/bookscraper/scraper/download_controller.py b/bookscraper/scraper/download_controller.py
index aed1b2e..a75fe47 100644
--- a/bookscraper/scraper/download_controller.py
+++ b/bookscraper/scraper/download_controller.py
@@ -4,16 +4,17 @@
 # Build Celery pipelines for all chapters
 # and pass book_id for abort/progress/log functionality.
 # + Download and replicate cover image to all volume folders
-# + Generate scripts (allinone.txt, makebook.txt, say.txt)
+# + Generate scripts (allinone.txt, makebook, say)
 # =========================================================

 from celery import group
 from scraper.tasks.pipeline import build_chapter_pipeline
-from scraper.scriptgen import generate_all_scripts  # <-- ADDED
+from scraper.scriptgen import generate_all_scripts
 from logbus.publisher import log
 import os
 import requests
 import shutil
+from scraper.abort import abort_requested  # debug logging
 
 
 class DownloadController:
@@ -24,7 +25,7 @@ class DownloadController:
     - consistent meta propagation
     - book_id-based abort + progress tracking
     - cover download + volume replication
-    - script generation (allinone.txt, makebook.txt, say.txt)
+    - script generation (allinone.txt, makebook, say)
     """
 
     def __init__(self, book_id: str, scrape_result: dict):
@@ -54,6 +55,17 @@ class DownloadController:
             "book_url": scrape_result.get("book_url"),
         }
 
+        # -------------------------------------------------
+        # DEBUG — confirm the controller sees the correct book_id
+        # -------------------------------------------------
+        log(f"[CTRL_DEBUG] Controller init book_id={book_id} title='{self.title}'")
+
+        try:
+            abort_state = abort_requested(book_id)
+            log(f"[CTRL_DEBUG] abort_requested(book_id={book_id}) → {abort_state}")
+        except Exception as e:
+            log(f"[CTRL_DEBUG] abort_requested ERROR: {e}")
+
     # ---------------------------------------------------------
     # Cover Download
     # ---------------------------------------------------------
@@ -133,7 +145,7 @@ class DownloadController:
         log(f"[CTRL] Output root: {self.book_base}")
 
         # -------------------------------------
-        # 1) Download cover before any pipelines
+        # 1) Download cover
         # -------------------------------------
         self.download_cover()
 
@@ -147,7 +159,7 @@ class DownloadController:
             tasks.append(
                 build_chapter_pipeline(
-                    self.book_id,  # UUID
+                    self.book_id,
                     chapter_num,
                     chapter_url,
                     volume_path,
@@ -162,13 +174,16 @@ class DownloadController:
             f"(book_id={self.book_id}, group_id={async_result.id})"
         )
 
-        # -------------------------------------------------------
-        # 2) AFTER dispatch: cover replication to volume folders
+        # Debug abort state
+        try:
+            abort_state = abort_requested(self.book_id)
+            log(f"[CTRL_DEBUG] After-dispatch abort state: {abort_state}")
+        except Exception as e:
+            log(f"[CTRL_DEBUG] abort_requested error after dispatch: {e}")
+
         # -------------------------------------------------------
         self.replicate_cover_to_volumes()
 
-        # -------------------------------------------------------
-        # 3) Generate scripts (allinone, makebook, say)
         # -------------------------------------------------------
         try:
             generate_all_scripts(
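
For reference, the abort flow these debug lines check can be driven from any process that shares the same Redis. A minimal usage sketch, assuming a title-style book_id as assigned in scraping.py later in this patch:

```python
from scraper.abort import set_abort, clear_abort, abort_requested

book_id = "Some Book Title"  # hypothetical title used as book_id

set_abort(book_id)               # chapters that have not started will be skipped
print(abort_requested(book_id))  # True on every worker sharing this Redis

clear_abort(book_id)             # re-enable future runs for this book
```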
diff --git a/bookscraper/scraper/tasks/audio.py b/bookscraper/scraper/tasks/audio.py
deleted file mode 100644
index e3ee1ab..0000000
--- a/bookscraper/scraper/tasks/audio.py
+++ /dev/null
@@ -1,10 +0,0 @@
-# tasks/audio.py
-from celery import shared_task
-from logbus.publisher import log
-
-
-@shared_task(bind=True, queue="audio")
-def text_to_audio(self, text_file):
-    log(f"[AUDIO] converting: {text_file}")
-    # placeholder for macOS "say"
-    return True

diff --git a/bookscraper/scraper/tasks/audio_tasks.py b/bookscraper/scraper/tasks/audio_tasks.py
new file mode 100644
index 0000000..c1a3ff0
--- /dev/null
+++ b/bookscraper/scraper/tasks/audio_tasks.py
@@ -0,0 +1,183 @@
+# ============================================================
+# File: scraper/tasks/audio_tasks.py
+# ============================================================
+
+from celery_app import celery_app
+from logbus.publisher import log
+import os
+import subprocess
+import time
+
+from scraper.abort import abort_requested
+from redis import Redis
+from urllib.parse import urlparse
+
+# Prefer the local Redis URL if present, otherwise the default backend;
+# the final fallback mirrors abort.py so urlparse never receives None
+redis_url = (
+    os.getenv("REDIS_BACKEND_LOCAL")
+    or os.getenv("REDIS_BACKEND")
+    or "redis://redis:6379/0"
+)
+
+parsed = urlparse(redis_url)
+
+# ------------------------------------------------------------
+# REGULAR REDIS CLIENT (slots, file checks, state)
+# ------------------------------------------------------------
+redis_client = Redis(
+    host=parsed.hostname,
+    port=parsed.port,
+    db=parsed.path.strip("/") or 0,  # empty path → DB 0
+)
+
+# ------------------------------------------------------------
+# BACKEND CLIENT (abort flags, progress counters) - always DB 0
+# ------------------------------------------------------------
+backend_client = Redis(
+    host=parsed.hostname,
+    port=parsed.port,
+    db=0,
+)
+
+AUDIO_TIMEOUT = int(os.getenv("AUDIO_TIMEOUT_SECONDS", "300"))
+AUDIO_VOICE = os.getenv("AUDIO_VOICE", "SinJi")
+AUDIO_RATE = int(os.getenv("AUDIO_RATE", "200"))
+HOST_PATH = os.getenv("HOST_PATH", "/app/output")
+AUDIO_SLOTS = int(os.getenv("AUDIO_SLOTS", "1"))
+
+CONTAINER_PREFIX = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "/app/output")
+
+
+@celery_app.task(bind=True, queue="audio", ignore_result=True)
+def generate_audio(
+    self, book_id, volume_name, chapter_number, chapter_title, chapter_text
+):
+    # NOTE: chapter_text carries the *path* of the saved chapter file
+    # (see save_tasks.py), not the text itself.
+    log(f"[AUDIO] CH{chapter_number}: START task → raw_input={chapter_text}")
+
+    # Abort early
+    if abort_requested(book_id, backend_client):
+        log(f"[AUDIO] ABORT detected → skip CH{chapter_number}")
+        return
+
+    # ============================================================
+    # ACQUIRE AUDIO SLOT
+    # ============================================================
+    slot_key = None
+    ttl = AUDIO_TIMEOUT + 15
+
+    for i in range(1, AUDIO_SLOTS + 1):
+        key = f"audio_slot:{i}"
+        if redis_client.set(key, "1", nx=True, ex=ttl):
+            slot_key = key
+            log(f"[AUDIO] CH{chapter_number}: Acquired slot {i}/{AUDIO_SLOTS}")
+            break
+
+    if slot_key is None:
+        log(f"[AUDIO] CH{chapter_number}: All slots busy → waiting...")
+        start_wait = time.time()
+
+        while slot_key is None:
+            for i in range(1, AUDIO_SLOTS + 1):
+                key = f"audio_slot:{i}"
+                if redis_client.set(key, "1", nx=True, ex=ttl):
+                    slot_key = key
+                    log(f"[AUDIO] CH{chapter_number}: Slot acquired after wait")
+                    break
+
+            if slot_key:
+                break
+
+            if abort_requested(book_id, backend_client):
+                log(f"[AUDIO] ABORT while waiting → skip CH{chapter_number}")
+                return
+
+            if time.time() - start_wait > ttl:
+                log(f"[AUDIO] CH{chapter_number}: Slot wait timeout → aborting audio")
+                return
+
+            time.sleep(0.25)
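+
+    # chapter_text arrives as a container-side path, e.g.
+    #   /app/output/<BOOK>/<VOLUME>/0001.txt
+    # Below it is reduced to BOOK/VOLUME/FILE and re-rooted under
+    # HOST_PATH so this worker, which runs on the host, can read it.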
+    # ============================================================
+    # PATH NORMALISATION
+    # ============================================================
+
+    container_path = chapter_text
+    log(f"[AUDIO] CH{chapter_number}: container_path={container_path}")
+
+    # 1) Strip container prefix to get relative path: BOOK/VOLUME/FILE
+    if container_path.startswith(CONTAINER_PREFIX):
+        relative_path = container_path[len(CONTAINER_PREFIX) :].lstrip("/")
+    else:
+        relative_path = container_path  # fallback
+
+    parts = relative_path.split("/")
+    if len(parts) < 3:
+        log(
+            f"[AUDIO] CH{chapter_number}: FATAL — cannot parse book/volume from {relative_path}"
+        )
+        if slot_key:
+            redis_client.delete(slot_key)
+        return
+
+    book_from_path = parts[0]
+    volume_from_path = parts[1]
+
+    # 2) Construct real host path
+    host_path = os.path.join(HOST_PATH, relative_path)
+    log(f"[AUDIO] CH{chapter_number}: resolved_host_path={host_path}")
+
+    # ============================================================
+    # PREPARE OUTPUT DIR
+    # ============================================================
+
+    base_dir = os.path.join(HOST_PATH, book_from_path, volume_from_path, "Audio")
+    os.makedirs(base_dir, exist_ok=True)
+
+    safe_num = f"{chapter_number:04d}"
+    audio_file = os.path.join(base_dir, f"{safe_num}.m4a")
+
+    log(f"[AUDIO] CH{chapter_number}: output_file={audio_file}")
+
+    if os.path.exists(audio_file):
+        log(f"[AUDIO] Skip CH{chapter_number} → already exists")
+        redis_client.delete(slot_key)
+        return
+
+    # ============================================================
+    # BUILD CMD
+    # ============================================================
+
+    cmd = (
+        f"say --voice={AUDIO_VOICE} "
+        f"--input-file='{host_path}' "
+        f"--output-file='{audio_file}' "
+        f"--file-format=m4bf "
+        f"--quality=127 "
+        f"-r {AUDIO_RATE} "
+        f"--data-format=aac"
+    )
+
+    log(f"[AUDIO] CH{chapter_number}: CMD = {cmd}")
+
+    # ============================================================
+    # RUN TTS
+    # ============================================================
+
+    try:
+        subprocess.run(cmd, shell=True, check=True, timeout=AUDIO_TIMEOUT)
+        log(f"[AUDIO] CH{chapter_number}: Completed")
+
+    except subprocess.TimeoutExpired:
+        log(f"[AUDIO] CH{chapter_number}: TIMEOUT → remove incomplete file")
+        if os.path.exists(audio_file):
+            try:
+                os.remove(audio_file)
+            except Exception:
+                pass
+
+    except subprocess.CalledProcessError as e:
+        log(f"[AUDIO] CH{chapter_number}: ERROR during say → {e}")
+
+    except Exception as e:
+        log(f"[AUDIO] CH{chapter_number}: UNEXPECTED ERROR → {e}")
+
+    finally:
+        if slot_key:
+            redis_client.delete(slot_key)
+            log(f"[AUDIO] CH{chapter_number}: Released slot")
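
The `say` invocation above is built as a shell string with single quotes, which breaks if a book or volume name ever contains an apostrophe. A quoting-safe sketch of the same call, passing an argument list so no shell is involved (same flags and variables as the task above):

```python
import subprocess

cmd = [
    "say",
    f"--voice={AUDIO_VOICE}",
    f"--input-file={host_path}",
    f"--output-file={audio_file}",
    "--file-format=m4bf",
    "--quality=127",
    "-r", str(AUDIO_RATE),
    "--data-format=aac",
]
# Arguments are passed verbatim to 'say'; paths with spaces or quotes
# cannot break the command, and shell=True is no longer needed.
subprocess.run(cmd, check=True, timeout=AUDIO_TIMEOUT)
```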
diff --git a/bookscraper/scraper/tasks/download_tasks.py b/bookscraper/scraper/tasks/download_tasks.py
index 71a6da4..5110483 100644
--- a/bookscraper/scraper/tasks/download_tasks.py
+++ b/bookscraper/scraper/tasks/download_tasks.py
@@ -127,6 +127,7 @@ def download_chapter(
             msg = f"[ABORT] Skip chapter {chapter_num} (abort active, not started)"
             log_msg(book_id, msg)
             return {
+                "book_id": book_id,
                 "chapter": chapter_num,
                 "url": chapter_url,
                 "html": None,
@@ -146,6 +147,7 @@ def download_chapter(
     if os.path.exists(save_path):
         log_msg(book_id, f"[DL] SKIP {chapter_num} (exists) → {save_path}")
         return {
+            "book_id": book_id,
            "chapter": chapter_num,
             "url": chapter_url,
             "html": None,
@@ -185,6 +187,7 @@ def download_chapter(
     log_msg(book_id, f"[DL] OK {chapter_num}: {len(html)} bytes")
 
     return {
+        "book_id": book_id,
         "chapter": chapter_num,
         "url": chapter_url,
         "html": html,

diff --git a/bookscraper/scraper/tasks/parse_tasks.py b/bookscraper/scraper/tasks/parse_tasks.py
index f49c356..ddea90e 100644
--- a/bookscraper/scraper/tasks/parse_tasks.py
+++ b/bookscraper/scraper/tasks/parse_tasks.py
@@ -37,6 +37,10 @@ def parse_chapter(self, download_result: dict, meta: dict):
     if download_result.get("skipped"):
         chapter = download_result.get("chapter")
         log_msg(book_id, f"[PARSE] SKIP chapter {chapter} (download skipped)")
+
+        # Ensure book_id is present in the returned dict
+        download_result["book_id"] = book_id
+
         return download_result
 
     # ------------------------------------------------------------

diff --git a/bookscraper/scraper/tasks/pipeline.py b/bookscraper/scraper/tasks/pipeline.py
index 9396665..9da657e 100644
--- a/bookscraper/scraper/tasks/pipeline.py
+++ b/bookscraper/scraper/tasks/pipeline.py
@@ -3,8 +3,14 @@
 # Purpose:
 #   Build Celery chains for chapter processing.
 #
-#   download → parse → save → update_progress
+# Chain:
+#   download_chapter(book_id, chapter_num, url, base_path)
+#     → parse_chapter(download_result, meta)
+#     → save_chapter(parsed_result, base_path)
+#     → update_progress(final_result, book_id)
 #
+# All subtasks must pass through result dicts untouched so the
+# next stage receives the correct fields.
 # =========================================================
 
 from celery import chain
@@ -12,7 +18,7 @@ from celery import chain
 from scraper.tasks.download_tasks import download_chapter
 from scraper.tasks.parse_tasks import parse_chapter
 from scraper.tasks.save_tasks import save_chapter
-from scraper.tasks.progress_tasks import update_progress  # NEW
+from scraper.tasks.progress_tasks import update_progress
 
 
 def build_chapter_pipeline(
@@ -23,17 +29,17 @@ def build_chapter_pipeline(
     meta: dict,
 ):
     """
-    Chapter pipeline:
+    Build a Celery chain for one chapter.
 
-    download_chapter(book_id, chapter_num, url, base_path)
+    download_chapter(book_id, chapter_number, chapter_url, base_path)
       → parse_chapter(download_result, meta)
-      → save_chapter(parsed_result, base_path)
-      → update_progress(result, book_id)
+      → save_chapter(parsed_result, base_path)
+      → update_progress(result, book_id)
     """
     return chain(
         download_chapter.s(book_id, chapter_number, chapter_url, base_path),
         parse_chapter.s(meta),
         save_chapter.s(base_path),
-        update_progress.s(book_id),  # ← centrale progress update
+        update_progress.s(book_id),
     )

diff --git a/bookscraper/scraper/tasks/progress_tasks.py b/bookscraper/scraper/tasks/progress_tasks.py
index 2466150..9045fab 100644
--- a/bookscraper/scraper/tasks/progress_tasks.py
+++ b/bookscraper/scraper/tasks/progress_tasks.py
@@ -16,6 +16,10 @@ def update_progress(result: dict, book_id: str):
     Central progress logic:
     - result: output of save_chapter
     - book_id: explicitly passed by pipeline
+
+    IMPORTANT:
+    - save_chapter already updates counters for skipped & normal chapters
+    - update_progress MUST NOT double-increment
     """
 
     ch = result.get("chapter")
@@ -25,11 +29,14 @@ def update_progress(result: dict, book_id: str):
     if failed:
         inc_failed(book_id)
         log(f"[PROG] FAILED chapter {ch}")
+
     elif skipped:
-        inc_skipped(book_id)
-        inc_completed(book_id)
+        # save_chapter already did inc_skipped
         log(f"[PROG] SKIPPED chapter {ch}")
+
     else:
-        inc_completed(book_id)
+        # save_chapter already did inc_completed; do not double-count
         log(f"[PROG] DONE chapter {ch}")
diff --git a/bookscraper/scraper/tasks/save_tasks.py b/bookscraper/scraper/tasks/save_tasks.py
index 852b674..8aa0578 100644
--- a/bookscraper/scraper/tasks/save_tasks.py
+++ b/bookscraper/scraper/tasks/save_tasks.py
@@ -1,7 +1,7 @@
-# =========================================================
+# ============================================================
 # File: scraper/tasks/save_tasks.py
-# Purpose: Save parsed chapter text to disk.
-# =========================================================
+# Purpose: Save parsed chapter text to disk + trigger audio.
+# ============================================================
 
 print(">>> [IMPORT] save_tasks.py loaded")
 
@@ -14,9 +14,11 @@ from scraper.progress import (
     inc_completed,
     inc_skipped,
     inc_failed,
-    add_failed_chapter,  # <-- enige noodzakelijke aanvulling
+    add_failed_chapter,
 )
 
+from scraper.tasks.audio_tasks import generate_audio
+
 
 @shared_task(bind=True, queue="save", ignore_result=False)
 def save_chapter(self, parsed: dict, base_path: str):
@@ -37,32 +39,59 @@ def save_chapter(self, parsed: dict, base_path: str):
     chapter = parsed.get("chapter")
 
     # ------------------------------------------------------------
-    # SKIP CASE (from download or parse stage)
+    # SKIP CASE (download or parse skipped the chapter)
     # ------------------------------------------------------------
     if parsed.get("skipped"):
         path = parsed.get("path", "(no-path)")
         log_msg(book_id, f"[SAVE] SKIP chapter {chapter} → {path}")
         inc_skipped(book_id)
-        return {"chapter": chapter, "path": path, "skipped": True}
+
+        # Determine volume name from the base path
+        volume_name = os.path.basename(base_path.rstrip("/"))
+
+        # Queue audio using the existing saved file
+        try:
+            generate_audio.delay(
+                book_id,
+                volume_name,
+                chapter,
+                f"Chapter {chapter}",
+                path,  # always the real saved file path
+            )
+            log_msg(
+                book_id,
+                f"[AUDIO] Task queued (SKIPPED) for chapter {chapter} in {volume_name}",
+            )
+        except Exception as audio_exc:
+            log_msg(
+                book_id,
+                f"[AUDIO] ERROR queueing (SKIPPED) chapter {chapter}: {audio_exc}",
+            )
+
+        return {
+            "book_id": book_id,
+            "chapter": chapter,
+            "path": path,
+            "skipped": True,
+        }
 
     # ------------------------------------------------------------
-    # NORMAL SAVE
+    # NORMAL SAVE CASE
     # ------------------------------------------------------------
     try:
         text = parsed.get("text", "")
-        url = parsed.get("url")
 
         if chapter is None:
             raise ValueError("Missing chapter number in parsed payload")
 
-        # Ensure folder exists
+        # Ensure chapter folder exists
         os.makedirs(base_path, exist_ok=True)
 
-        # Build file path
+        # Build chapter file path
         path = get_save_path(chapter, base_path)
 
-        # Write chapter text
+        # Save chapter text to disk
         with open(path, "w", encoding="utf-8") as f:
             f.write(text)
 
@@ -70,12 +99,25 @@ def save_chapter(self, parsed: dict, base_path: str):
 
         inc_completed(book_id)
 
+        # Determine volume name
+        volume_name = os.path.basename(base_path.rstrip("/"))
+
+        # Queue audio task (always use the saved file path)
+        try:
+            generate_audio.delay(
+                book_id,
+                volume_name,
+                chapter,
+                f"Chapter {chapter}",
+                path,
+            )
+            log_msg(
+                book_id, f"[AUDIO] Task queued for chapter {chapter} in {volume_name}"
+            )
+        except Exception as audio_exc:
+            log_msg(book_id, f"[AUDIO] ERROR queueing chapter {chapter}: {audio_exc}")
+
         return {"book_id": book_id, "chapter": chapter, "path": path}
 
     except Exception as exc:
         log_msg(book_id, f"[SAVE] ERROR saving chapter {chapter}: {exc}")
-
-        inc_failed(book_id)
-        add_failed_chapter(book_id, chapter, str(exc))  # <-- essentieel
-
-        raise
+        inc_failed(book_id)
+        add_failed_chapter(book_id, chapter, str(exc))
+        raise
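
Because abort flags and chapter markers are plain Redis keys (see abort.py above) and this patch publishes port 6379, they can also be inspected from the host. An illustrative session with a title-style book_id:

```
redis-cli SET 'abort:Some Book Title' 1      # same effect as set_abort()
redis-cli EXISTS 'abort:Some Book Title'     # 1 while abort is active
redis-cli KEYS 'started:Some Book Title:*'   # chapters already past download
redis-cli DEL 'abort:Some Book Title'        # same effect as clear_abort()
```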
diff --git a/bookscraper/scraper/tasks/scraping.py b/bookscraper/scraper/tasks/scraping.py
index 04865df..8b0b9fe 100644
--- a/bookscraper/scraper/tasks/scraping.py
+++ b/bookscraper/scraper/tasks/scraping.py
@@ -7,7 +7,6 @@ from celery_app import celery_app
 from logbus.publisher import log
 
 import os
-import uuid
 import redis
 
 from scraper.sites import BookSite
@@ -51,12 +50,14 @@ def start_scrape_book(self, url: str):
     log(f"[SCRAPING] Completed scrape: {len(chapters)}/{full_count} chapters")
 
     # ------------------------------------------------------------
-    # BOOK RUN ID
+    # BOOK RUN ID (CHANGED: use book title instead of UUID)
     # ------------------------------------------------------------
-    book_id = str(uuid.uuid4())
+    title = result.get("title") or "UnknownBook"
+    book_id = title  # titles are assumed unique and stable across runs
+
     result["book_id"] = book_id
 
-    log(f"[SCRAPING] Assigned book_id = {book_id}")
+    log(f"[SCRAPING] Assigned book_id = '{book_id}'")
 
     # ------------------------------------------------------------
     # RESET ABORT + INITIALISE PROGRESS
@@ -80,7 +81,7 @@ def start_scrape_book(self, url: str):
         queue="controller",
     )
 
-    log(f"[SCRAPING] Dispatched download controller for {book_id}")
+    log(f"[SCRAPING] Dispatched download controller for '{book_id}'")
 
     return {
         "book_id": book_id,

diff --git a/bookscraper/start_audio_worker.sh b/bookscraper/start_audio_worker.sh
new file mode 100755
index 0000000..62acf21
--- /dev/null
+++ b/bookscraper/start_audio_worker.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+set -e
+
+echo ""
+echo "====================================================="
+echo " STARTING LOCAL macOS AUDIO WORKER"
+echo "====================================================="
+echo ""
+
+# ------------------------------------------------------
+# Create venv if needed
+# ------------------------------------------------------
+if [ ! -d ".venv" ]; then
+    echo "[AUDIO] No .venv found — creating virtualenv..."
+    python3 -m venv .venv
+else
+    echo "[AUDIO] Existing .venv found"
+fi
+
+# Activate virtualenv
+echo "[AUDIO] Activating .venv"
+source .venv/bin/activate
+
+# ------------------------------------------------------
+# Install requirements
+# ------------------------------------------------------
+REQ="requirements.audio.txt"
+
+if [ ! -f "$REQ" ]; then
+    echo "[AUDIO] ERROR — $REQ not found!"
+    exit 1
+fi
+
+echo "[AUDIO] Installing audio requirements..."
+pip install -r "$REQ"
+
+# Celery must be installed locally too
+echo "[AUDIO] Ensuring Celery installed..."
+pip install celery
+
+# ------------------------------------------------------
+# Start the worker
+# ------------------------------------------------------
+echo ""
+echo "[AUDIO] Starting audio worker..."
+python3 audio_worker_local.py
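
With the audio worker removed from docker-compose.yml, a full run is now two steps on the host (assuming `.env` is configured as sketched earlier):

```
docker compose up -d        # redis + scraping/controller/download/parse/save workers
./start_audio_worker.sh     # local macOS worker consuming the 'audio' queue
```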