diff --git a/bookscraper/README.md b/bookscraper/README.md index e53e997..9fdb189 100644 --- a/bookscraper/README.md +++ b/bookscraper/README.md @@ -125,7 +125,8 @@ docker run \ ``` -docker compose down +docker compose down --remove-orphans +docker image prune -f docker builder prune -af docker volume prune -f docker compose build --no-cache @@ -141,6 +142,10 @@ docker compose build --no-cache web && docker compose up web docker compose build worker_download && docker compose up worker_download +docker compose down --remove-orphans +docker compose build --no-cache worker_m4b +docker compose up -d worker_m4b + docker compose up web docker compose build web docker compose restart web @@ -152,3 +157,9 @@ tar \ --exclude=".venv" \ --exclude="venv" \ -czvf project.tar.gz . + +docker compose down +docker image rm bookscraper-worker_m4b || true +docker builder prune -af +docker compose build --no-cache worker_m4b +docker compose up -d worker_m4b diff --git a/bookscraper/db/repository.py b/bookscraper/db/repository.py index b58b7bb..83e6f77 100644 --- a/bookscraper/db/repository.py +++ b/bookscraper/db/repository.py @@ -9,7 +9,7 @@ # - Provide a clean API for tasks and Flask UI # ============================================================ # ============================================================ -# File: db/repository.py (UPDATED for book_idx-only architecture) +# UPDATED — canonical read model via get_book_state # ============================================================ from scraper.logger_decorators import logcall @@ -17,7 +17,6 @@ from logbus.publisher import log import redis import os -import time # ============================================================ # SQL low-level engines (snapshot storage) @@ -29,10 +28,6 @@ from db.state_sql import ( sql_set_chapters_total, sql_register_book, sql_update_book, - sql_inc_downloaded, - sql_inc_parsed, - sql_inc_audio_done, - sql_inc_audio_skipped, ) # ============================================================ @@ -49,80 +44,34 @@ from db.state_redis import ( ) # ============================================================ -# Redis setup for legacy progress paths +# Redis client (read-only for legacy + guards) # ============================================================ REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0") _r = redis.Redis.from_url(REDIS_URL, decode_responses=True) # ============================================================ -# INTERNAL — LEGACY PROGRESS HELPERS (kept for UI) -# Keys remain: progress:{book_idx}:* +# LEGACY PROGRESS (UI only, unchanged) # ============================================================ -def _legacy_set_total(book_idx, total): - _r.set(f"progress:{book_idx}:total", total) - - -def _legacy_inc_completed(book_idx): - _r.incr(f"progress:{book_idx}:completed") - - -def _legacy_inc_skipped(book_idx): - _r.incr(f"progress:{book_idx}:skipped") - - -def _legacy_inc_failed(book_idx): - _r.incr(f"progress:{book_idx}:failed") - - -def _legacy_add_failed_chapter(book_idx, chapter, reason): - entry = f"Chapter {chapter}: {reason}" - _r.rpush(f"progress:{book_idx}:failed_list", entry) - - -def _legacy_get_failed_list(book_idx): - return _r.lrange(f"progress:{book_idx}:failed_list", 0, -1) - - def _legacy_get_progress(book_idx): - total = int(_r.get(f"progress:{book_idx}:total") or 0) - completed = int(_r.get(f"progress:{book_idx}:completed") or 0) - skipped = int(_r.get(f"progress:{book_idx}:skipped") or 0) - failed = int(_r.get(f"progress:{book_idx}:failed") or 0) - abort = 
_r.exists(f"abort:{book_idx}") == 1 - failed_list = _legacy_get_failed_list(book_idx) - return { "book_idx": book_idx, - "total": total, - "completed": completed, - "skipped": skipped, - "failed": failed, - "failed_list": failed_list, - "abort": abort, + "total": int(_r.get(f"progress:{book_idx}:total") or 0), + "completed": int(_r.get(f"progress:{book_idx}:completed") or 0), + "skipped": int(_r.get(f"progress:{book_idx}:skipped") or 0), + "failed": int(_r.get(f"progress:{book_idx}:failed") or 0), + "abort": _r.exists(f"abort:{book_idx}") == 1, + "failed_list": _r.lrange(f"progress:{book_idx}:failed_list", 0, -1), } -# ============================================================ -# PUBLIC — PROGRESS API -# ============================================================ @logcall def get_progress(book_idx): return _legacy_get_progress(book_idx) -@logcall -def add_failed_chapter(book_idx, chapter, reason): - _legacy_add_failed_chapter(book_idx, chapter, reason) - - -@logcall -def get_failed_list(book_idx): - return _legacy_get_failed_list(book_idx) - - # ============================================================ -# FETCH OPERATIONS (SQLite snapshot) +# FETCH (SQLite snapshot) # ============================================================ @logcall def fetch_book(book_idx): @@ -135,7 +84,7 @@ def fetch_all_books(): # ============================================================ -# INIT-FLOW (SQLite metadata only) +# INIT / UPDATE METADATA # ============================================================ @logcall def register_book( @@ -147,26 +96,22 @@ def register_book( cover_path=None, book_url=None, ): - - fields = { - "book_idx": book_idx, - "title": title, - "author": author, - "description": description, - "cover_url": cover_url, - "cover_path": cover_path, - "book_url": book_url, - "chapters_total": 0, - "status": "registered", - } - - log(f"[DB] Registering new book_idx={book_idx} title='{title}'") - sql_register_book(book_idx, fields) + sql_register_book( + book_idx, + { + "book_idx": book_idx, + "title": title, + "author": author, + "description": description, + "cover_url": cover_url, + "cover_path": cover_path, + "book_url": book_url, + "chapters_total": 0, + "status": "registered", + }, + ) -# ============================================================ -# SCRAPE-FLOW UPDATE -# ============================================================ @logcall def update_book_after_full_scrape( book_idx, @@ -176,9 +121,7 @@ def update_book_after_full_scrape( cover_url=None, chapters_total=None, ): - fields = {} - if title is not None: fields["title"] = title if author is not None: @@ -191,143 +134,72 @@ def update_book_after_full_scrape( fields["chapters_total"] = chapters_total fields["status"] = "active" - - log(f"[DB] update metadata for book_idx={book_idx}") sql_update_book(book_idx, fields) # ============================================================ -# ACTIVE BOOK LISTS -# ============================================================ -@logcall -def get_registered_books(): - all_books = sql_fetch_all_books() - HIDDEN_STATES = {"hidden"} - log(f"[DB] Fetched all books for registered filter, total={len(all_books)}") - return [b for b in all_books if b.get("status") not in HIDDEN_STATES] - - -@logcall -def get_active_books(): - all_books = sql_fetch_all_books() - - HIDDEN_STATES = {"hidden", "done"} - log(f"[DB] Fetched all books for active filter, total={len(all_books)}") - return [b for b in all_books if b.get("status") not in HIDDEN_STATES] - - -# 
============================================================ -# STATUS MANAGEMENT +# STATUS # ============================================================ @logcall def set_status(book_idx, status): - log(f"[DB] Setting status for {book_idx} to '{status}'") redis_set_status(book_idx, status) sql_set_status(book_idx, status) # ============================================================ -# CHAPTER TOTALS +# TOTALS # ============================================================ @logcall def set_chapters_total(book_idx, total): - log(f"[DB] Setting chapter total for {book_idx} to {total}") redis_set_chapters_total(book_idx, total) sql_set_chapters_total(book_idx, total) - # _legacy_set_total(book_idx, total) # ============================================================ -# COUNTERS — DOWNLOAD +# COUNTERS — WRITE ONLY # ============================================================ @logcall def inc_download_done(book_idx, amount=1): - log(f"[DB] Incrementing download done for {book_idx} by {amount}") redis_inc_download_done(book_idx, amount) - # sql_inc_downloaded(book_idx, amount) - # _legacy_inc_completed(book_idx) @logcall def inc_download_skipped(book_idx, amount=1): - log(f"[DB] Incrementing download skipped for {book_idx} by {amount}") redis_inc_download_skipped(book_idx, amount) - # _legacy_inc_skipped(book_idx) -# ============================================================ -# COUNTERS — PARSE -# ============================================================ @logcall def inc_parsed_done(book_idx, amount=1): - log(f"[DB] Incrementing parsed done for {book_idx} by {amount}") redis_inc_parsed_done(book_idx, amount) - # sql_inc_parsed(book_idx, amount) - - -# ============================================================ -# COUNTERS — AUDIO -# ============================================================ -@logcall -def inc_audio_skipped(book_idx, amount=1): - log(f"[DB] Incrementing audio skipped for {book_idx} by {amount}") - # sql_inc_audio_skipped(book_idx, amount) - redis_inc_audio_skipped(book_idx, amount) @logcall def inc_audio_done(book_idx, amount=1): - log(f"[DB] Incrementing audio done for {book_idx} by {amount}") redis_inc_audio_done(book_idx, amount) - # sql_inc_audio_done(book_idx, amount) -# ============================================================ -# BACKWARDS COMPATIBILITY SHIMS -# These map the old API (book_id) to the new book_idx-only system -# ============================================================ @logcall -def inc_downloaded(book_idx, amount=1): - return inc_download_done(book_idx, amount) - - -@logcall -def inc_parsed(book_idx, amount=1): - return inc_parsed_done(book_idx, amount) - - -@logcall -def inc_audio_done_legacy(book_idx, amount=1): - return inc_audio_done(book_idx, amount) +def inc_audio_skipped(book_idx, amount=1): + redis_inc_audio_skipped(book_idx, amount) # ============================================================ -# READ — DERIVED BOOK STATE +# CANONICAL READ MODEL # ============================================================ @logcall def get_book_state(book_idx): """ - Canonical merged read-model for a single book. + Canonical merged read model. - Gedrag: - - Leest SQL (snapshot) - - Leest Redis (live counters) - - Rekent naar merged - - GEEN writes - - GEEN side-effects - - Merge-regels: + Rules: + - SQL = snapshot baseline + - Redis = live counters - merged = max(sql, redis) - - merged wordt gecapt op chapters_total + - capped at chapters_total """ - # ---------------------------------------------------- - # 1. 
Fetch bronnen - # ---------------------------------------------------- sqlite_row = sql_fetch_book(book_idx) or {} - - key = f"book:{book_idx}:state" - redis_state = _r.hgetall(key) or {} + redis_state = _r.hgetall(f"book:{book_idx}:state") or {} def _int(v): try: @@ -335,56 +207,114 @@ def get_book_state(book_idx): except Exception: return 0 - # ---------------------------------------------------- - # 2. SQL snapshot - # ---------------------------------------------------- chapters_total = _int(sqlite_row.get("chapters_total")) + + # SQL snapshot sql_downloaded = _int(sqlite_row.get("downloaded")) sql_audio_done = _int(sqlite_row.get("audio_done")) + sql_audio_skipped = _int(sqlite_row.get("audio_skipped")) - # ---------------------------------------------------- - # 3. Redis live counters - # ---------------------------------------------------- + # Redis live redis_downloaded = _int(redis_state.get("chapters_download_done")) + _int( redis_state.get("chapters_download_skipped") ) - redis_audio_done = _int(redis_state.get("audio_done")) + redis_audio_skipped = _int(redis_state.get("audio_skipped")) - # ---------------------------------------------------- - # 4. Merge (SQL vs Redis) - # ---------------------------------------------------- + # Merge merged_downloaded = max(sql_downloaded, redis_downloaded) merged_audio_done = max(sql_audio_done, redis_audio_done) + merged_audio_skipped = max(sql_audio_skipped, redis_audio_skipped) if chapters_total > 0: merged_downloaded = min(merged_downloaded, chapters_total) merged_audio_done = min(merged_audio_done, chapters_total) + merged_audio_skipped = min(merged_audio_skipped, chapters_total) + + audio_completed = merged_audio_done + merged_audio_skipped + + # Build state + state = dict(sqlite_row) + state.update( + { + "downloaded": merged_downloaded, + "audio_done": merged_audio_done, + "audio_skipped": merged_audio_skipped, + "chapters_total": chapters_total, + } + ) - # ---------------------------------------------------- - # 5. Bouw merged state (read-only) - # ---------------------------------------------------- - state = {} - - # Basis = SQL - state.update(sqlite_row) - - # Overschrijf alleen met merged conclusies - state["downloaded"] = merged_downloaded - state["audio_done"] = merged_audio_done - state["chapters_total"] = chapters_total - # ---------------------------------------------------- - # 4b. 
Derive status (READ-ONLY) - # ---------------------------------------------------- - derived_status = sqlite_row.get("status") or "unknown" - + # Derived status + status = sqlite_row.get("status") or "unknown" if chapters_total > 0: if merged_downloaded < chapters_total: - derived_status = "downloading" - elif merged_downloaded == chapters_total and merged_audio_done < chapters_total: - derived_status = "audio" - elif merged_audio_done == chapters_total: - derived_status = "done" - state["status"] = derived_status + status = "downloading" + elif merged_downloaded == chapters_total and audio_completed < chapters_total: + status = "audio" + elif audio_completed >= chapters_total: + status = "done" + state["status"] = status return state + + +# ============================================================ +# READ HELPERS (VIA get_book_state ONLY) +# ============================================================ +@logcall +def get_chapters_total(book_idx): + return int(get_book_state(book_idx).get("chapters_total", 0)) + + +@logcall +def get_audio_done(book_idx): + return int(get_book_state(book_idx).get("audio_done", 0)) + + +@logcall +def get_audio_completed_total(book_idx): + state = get_book_state(book_idx) + return int(state.get("audio_done", 0)) + int(state.get("audio_skipped", 0)) + + +# ============================================================ +# STATUSCHECK GUARD (INTENTIONAL DIRECT REDIS) +# ============================================================ +@logcall +def try_trigger_statuscheck(book_idx): + return bool(_r.set(f"book:{book_idx}:statuscheck:triggered", "1", nx=True)) + + +# ============================================================ +# ACTIVE / REGISTERED BOOK LISTS (UI API) +# ============================================================ +@logcall +def get_registered_books(): + """ + Books visible in the 'registered' list in the UI. + """ + all_books = sql_fetch_all_books() + HIDDEN_STATES = {"hidden"} + return [b for b in all_books if b.get("status") not in HIDDEN_STATES] + + +@logcall +def get_active_books(): + """ + Books currently active in the dashboard. + """ + all_books = sql_fetch_all_books() + HIDDEN_STATES = {"hidden", "done"} + return [b for b in all_books if b.get("status") not in HIDDEN_STATES] + + +@logcall +def store_m4b_error(book_idx: str, volume: str, error_text: str): + """ + Passive storage of m4b errors. + No logic, no retries, no state transitions. + """ + key = f"book:{book_idx}:m4b:errors" + entry = f"{volume}: {error_text}" + + _r.rpush(key, entry) diff --git a/bookscraper/docker-compose.yml b/bookscraper/docker-compose.yml index 5c6b8c8..c0a4df7 100644 --- a/bookscraper/docker-compose.yml +++ b/bookscraper/docker-compose.yml @@ -149,3 +149,22 @@ services: - .env command: celery -A celery_app worker -Q scraping -n scraping@%h -l INFO restart: "no" + # ---------------------------------------------------------- + # M4B Worker (Finalization) + # ---------------------------------------------------------- + worker_m4b: + build: + context: . 
+ dockerfile: docker/Dockerfile.m4b + container_name: worker_m4b + command: celery -A celery_app worker -Q m4b -n m4b@%h -l INFO + depends_on: + redis: + condition: service_healthy + env_file: + - .env + volumes: + - .:/app + - /Users/peter/mnt/asustor/Sync/bookscraper/books:/Users/peter/mnt/asustor/Sync/bookscraper/books + - /Users/peter/mnt/asustor/Sync/bookscraper/db:/Users/peter/mnt/asustor/Sync/bookscraper/db + restart: "no" diff --git a/bookscraper/docker/Dockerfile.m4b b/bookscraper/docker/Dockerfile.m4b new file mode 100644 index 0000000..b2eecf6 --- /dev/null +++ b/bookscraper/docker/Dockerfile.m4b @@ -0,0 +1,70 @@ +FROM debian:12 + +ENV DEBIAN_FRONTEND=noninteractive + +# ---------------------------------------------------------- +# System + PHP (PHP 8.2 native) +# ---------------------------------------------------------- +RUN apt-get update && apt-get install -y \ + ffmpeg \ + curl \ + ca-certificates \ + bash \ + php-cli \ + php-intl \ + php-json \ + php-mbstring \ + php-xml \ + php-curl \ + php-zip \ + python3 \ + python3-pip \ + python3-venv \ + \ + # build deps for mp4v2 + git \ + build-essential \ + autoconf \ + automake \ + libtool \ + pkg-config \ + && rm -rf /var/lib/apt/lists/* + +# ---------------------------------------------------------- +# Python venv (PEP 668 compliant) +# ---------------------------------------------------------- +RUN python3 -m venv /opt/venv +ENV PATH="/opt/venv/bin:/usr/local/bin:$PATH" + +# ---------------------------------------------------------- +# Build & install mp4v2 (mp4info) +# ---------------------------------------------------------- +WORKDIR /tmp + +RUN git clone https://github.com/sandreas/mp4v2 \ + && cd mp4v2 \ + && ./configure \ + && make -j$(nproc) \ + && make install \ + && echo "/usr/local/lib" > /etc/ld.so.conf.d/mp4v2.conf \ + && ldconfig \ + && cd / \ + && rm -rf /tmp/mp4v2 + +# ---------------------------------------------------------- +# Install m4b-tool +# ---------------------------------------------------------- +RUN curl -L https://github.com/sandreas/m4b-tool/releases/latest/download/m4b-tool.phar \ + -o /usr/local/bin/m4b-tool \ + && chmod +x /usr/local/bin/m4b-tool + +# ---------------------------------------------------------- +# App +# ---------------------------------------------------------- +WORKDIR /app +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY . /app + +CMD ["bash"] diff --git a/bookscraper/mp4v2 b/bookscraper/mp4v2 new file mode 160000 index 0000000..480a733 --- /dev/null +++ b/bookscraper/mp4v2 @@ -0,0 +1 @@ +Subproject commit 480a73324f53d0d24bea4931c3902097f8e2a663 diff --git a/bookscraper/project.zip b/bookscraper/project.zip deleted file mode 100644 index 6da01aa..0000000 Binary files a/bookscraper/project.zip and /dev/null differ diff --git a/bookscraper/scraper/services/audio_completion.py b/bookscraper/scraper/services/audio_completion.py new file mode 100644 index 0000000..f8c0115 --- /dev/null +++ b/bookscraper/scraper/services/audio_completion.py @@ -0,0 +1,94 @@ +# ============================================================ +# File: scraper/services/audio_completion.py +# Purpose: +# Orchestration hook after audio completion. 
+# +# Rules (STRICT): +# - ALWAYS read via get_book_state() +# - Use ONLY merged counters from repository +# - NO usage of derived status field +# - Completion rule: +# audio_completed < chapters_total → NOT DONE +# ============================================================ + +from logbus.publisher import log +from scraper.logger_decorators import logcall + +from db.repository import ( + get_book_state, + try_trigger_statuscheck, +) + +from scraper.services.status_check_service import StatusCheckService +from scraper.tasks.m4b_tasks import queue_m4b_for_book + + +@logcall +def trigger_audio_completion_check(book_idx: str): + """ + Called after inc_audio_done() OR inc_audio_skipped(). + + Flow: + 1. Fetch canonical merged state from repository + 2. Evaluate completion via merged counters ONLY + 3. Run filesystem validation (authoritative) + 4. Apply idempotency guard + 5. Queue m4b exactly once + """ + + try: + # ---------------------------------------------------- + # STEP 1 — CANONICAL MERGED STATE + # ---------------------------------------------------- + state = get_book_state(book_idx) + + chapters_total = int(state.get("chapters_total", 0)) + audio_done = int(state.get("audio_done", 0)) + audio_skipped = int(state.get("audio_skipped", 0)) + audio_completed = audio_done + audio_skipped + + log( + f"[AUDIO-COMPLETION] book={book_idx} " + f"audio_completed={audio_completed} chapters_total={chapters_total}" + ) + + # ---------------------------------------------------- + # STEP 2 — FAST REJECT (MERGED COUNTERS ONLY) + # ---------------------------------------------------- + if chapters_total <= 0 or audio_completed < chapters_total: + log(f"[AUDIO-COMPLETION] not yet complete for book={book_idx}") + return + + # ---------------------------------------------------- + # STEP 3 — FILESYSTEM VALIDATION (AUTHORITATIVE) + # ---------------------------------------------------- + result = StatusCheckService.run(book_idx) + fs = result.get("filesystem", {}) + + audio_files = fs.get("audio_files", 0) + chapters_txt = fs.get("chapters_txt", 0) + effective_audio = audio_files + audio_skipped + + if effective_audio < chapters_txt: + log( + f"[AUDIO-COMPLETION] FS validation failed " + f"(audio_files={audio_files}, skipped={audio_skipped}, txt={chapters_txt})" + ) + return + + # ---------------------------------------------------- + # STEP 4 — IDEMPOTENCY GUARD (AFTER FS CONFIRMATION) + # ---------------------------------------------------- + if not try_trigger_statuscheck(book_idx): + log(f"[AUDIO-COMPLETION] statuscheck already triggered for {book_idx}") + return + + # ---------------------------------------------------- + # STEP 5 — FINAL ACTION + # ---------------------------------------------------- + log(f"[AUDIO-COMPLETION] DONE → queue m4b for book={book_idx}") + queue_m4b_for_book(book_idx) + + except Exception as exc: + # MUST NEVER break audio workers + log(f"[AUDIO-COMPLETION][ERROR] book={book_idx} error={exc}") diff --git a/bookscraper/scraper/tasks/audio_tasks.py b/bookscraper/scraper/tasks/audio_tasks.py index 82cd692..e04e55a 100644 --- a/bookscraper/scraper/tasks/audio_tasks.py +++ b/bookscraper/scraper/tasks/audio_tasks.py @@ -17,6 +17,7 @@ from scraper.abort import abort_requested from scraper.logger_decorators import logcall from redis import Redis from urllib.parse import urlparse +from scraper.services.audio_completion import trigger_audio_completion_check # NEW — unified repository façade from db.repository import ( @@ -166,6 +167,7 @@ def generate_audio( log(f"[AUDIO] 
CH{chapter_number}: Already exists → skip") redis_client.delete(slot_key) inc_audio_skipped(book_id) + trigger_audio_completion_check(book_id) return # ------------------------------------------------------------ @@ -191,6 +193,8 @@ def generate_audio( # NEW — repository façade inc_audio_done(book_id) + trigger_audio_completion_check(book_id) + log(f"trigger_audio_completion_check ") log(f"[AUDIO]({HOST}) CH{chapter_number}: Completed") except subprocess.TimeoutExpired: diff --git a/bookscraper/scraper/tasks/m4b_tasks.py b/bookscraper/scraper/tasks/m4b_tasks.py new file mode 100644 index 0000000..e1a1a44 --- /dev/null +++ b/bookscraper/scraper/tasks/m4b_tasks.py @@ -0,0 +1,132 @@ +# ============================================================ +# File: scraper/tasks/m4b_tasks.py +# ============================================================ + +import os +import subprocess +from typing import List + +from celery_app import celery_app +from logbus.publisher import log +from scraper.logger_decorators import logcall + +from db.repository import fetch_book, store_m4b_error +from scraper.scriptgen import build_merge_block + + +# ------------------------------------------------------------ +# Helper: detect volumes (UNCHANGED) +# ------------------------------------------------------------ +def detect_volumes(book_base: str) -> List[str]: + volumes = [] + for name in os.listdir(book_base): + if name.lower().startswith("volume_"): + full = os.path.join(book_base, name) + if os.path.isdir(full): + volumes.append(name) + volumes.sort() + return volumes + + +# ------------------------------------------------------------ +# Celery task +# ------------------------------------------------------------ +@celery_app.task(bind=True, queue="m4b", ignore_result=True) +@logcall +def run_m4btool(self, book_idx: str): + + log(f"[M4B] START book_idx={book_idx}") + + book = fetch_book(book_idx) + if not book: + log(f"[M4B] Book not found in SQL: book_idx={book_idx}") + return + + title = book.get("title", book_idx) + author = book.get("author", "Unknown") + + output_root = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output") + book_base = os.path.join(output_root, title) + + log(f"[M4B] Book base directory: {book_base}") + + if not os.path.isdir(book_base): + log(f"[M4B] Book directory missing: {book_base}") + return + + volumes = detect_volumes(book_base) + if not volumes: + log(f"[M4B] No volumes found for book_idx={book_idx}") + return + + log(f"[M4B] Volumes detected: {volumes}") + + # -------------------------------------------------------- + # Build canonical commands via scriptgen + # -------------------------------------------------------- + merge_block = build_merge_block( + title, author, [(i + 1, v) for i, v in enumerate(volumes)] + ) + commands = [c.strip() for c in merge_block.split("&&") if c.strip()] + + for volume, cmd in zip(volumes, commands): + audio_dir = os.path.join(book_base, volume, "Audio") + if not os.path.isdir(audio_dir): + log(f"[M4B] SKIP {volume}: no Audio directory") + continue + + log(f"[M4B] Running for volume={volume}") + log(f"[M4B] CMD: {cmd}") + + try: + result = subprocess.run( + cmd, + cwd=book_base, + shell=True, + capture_output=True, + text=True, + check=True, + ) + + if result.stdout: + log(f"[M4B][STDOUT] {result.stdout}") + + except subprocess.CalledProcessError as exc: + log(f"[M4B][FAILED] volume={volume}") + + if exc.stdout: + log(f"[M4B][STDOUT] {exc.stdout}") + if exc.stderr: + log(f"[M4B][STDERR] {exc.stderr}") + + store_m4b_error( + book_idx=book_idx, + volume=volume, 
+                error_text=exc.stderr or str(exc),
+            )
+            continue
+
+        except Exception as exc:
+            log(f"[M4B][UNEXPECTED ERROR] volume={volume}: {exc}")
+
+            store_m4b_error(
+                book_idx=book_idx,
+                volume=volume,
+                error_text=str(exc),
+            )
+            continue
+
+    log(f"[M4B] FINISHED book_idx={book_idx}")
+
+
+# ------------------------------------------------------------
+# Orchestration helper (UNCHANGED)
+# ------------------------------------------------------------
+@logcall
+def queue_m4b_for_book(book_idx: str):
+    log(f"[M4B] Queuing m4b-tool for book_idx={book_idx}")
+    celery_app.send_task(
+        "scraper.tasks.m4b_tasks.run_m4btool",
+        args=[book_idx],
+        queue="m4b",
+    )
diff --git a/bookscraper/scraper/tasks/statuscheck.py b/bookscraper/scraper/tasks/statuscheck.py
new file mode 100644
index 0000000..51be307
--- /dev/null
+++ b/bookscraper/scraper/tasks/statuscheck.py
@@ -0,0 +1,144 @@
+# ============================================================
+# File: scraper/tasks/statuscheck.py
+# Purpose:
+#   Final status check after audio completion.
+#
+# Responsibilities:
+#   - Verify Redis counters (sanity check)
+#   - Verify filesystem (Audio files present)
+#   - Queue m4btool task
+#
+# Design rules:
+#   - Book-scope ONLY
+#   - No direct Redis usage
+#   - Repository is the single source of truth
+#   - Idempotent, defensive, non-blocking
+# ============================================================
+
+import os
+from celery_app import celery_app
+from logbus.publisher import log
+
+from scraper.logger_decorators import logcall
+
+from db.repository import (
+    get_audio_done,
+    get_chapters_total,
+    set_status,
+    fetch_book,
+)
+
+from scraper.tasks.m4b_tasks import run_m4btool
+
+
+# ------------------------------------------------------------
+# Helpers
+# ------------------------------------------------------------
+@logcall
+def _detect_volumes(book_base: str):
+    """
+    Return sorted list of Volume_XXX directories.
+    """
+    vols = []
+    for name in os.listdir(book_base):
+        if name.lower().startswith("volume_"):
+            full = os.path.join(book_base, name)
+            if os.path.isdir(full):
+                vols.append(name)
+    vols.sort()
+    return vols
+
+
+@logcall
+def _count_audio_files(audio_dir: str) -> int:
+    """
+    Count .m4b files in an Audio directory.
+    """
+    if not os.path.isdir(audio_dir):
+        return 0
+    return len([f for f in os.listdir(audio_dir) if f.lower().endswith(".m4b")])
+
+
+# ------------------------------------------------------------
+# Celery task
+# ------------------------------------------------------------
+@celery_app.task(bind=True, queue="controller", ignore_result=True)
+@logcall
+def run_statuscheck(self, book_idx: str):
+    """
+    Final statuscheck before m4btool execution.
+
+    Triggered exactly once by audio_completion quickcheck.
+    """
+
+    log(f"[STATUSCHECK] START book={book_idx}")
+
+    # --------------------------------------------------------
+    # 1. Redis sanity check (via repository)
+    # --------------------------------------------------------
+    audio_done = get_audio_done(book_idx)
+    chapters_total = get_chapters_total(book_idx)
+
+    log(
+        f"[STATUSCHECK] Counters book={book_idx} "
+        f"audio_done={audio_done} chapters_total={chapters_total}"
+    )
+
+    if chapters_total <= 0:
+        log(f"[STATUSCHECK] No chapters_total → abort")
+        return
+
+    if audio_done < chapters_total:
+        # Defensive: should not happen, but never assume
+        log(
+            f"[STATUSCHECK] Audio not complete yet "
+            f"({audio_done}/{chapters_total}) → abort"
+        )
+        return
+
+    # --------------------------------------------------------
+    # 2. 
Fetch book metadata (for paths & m4b meta)
+    # --------------------------------------------------------
+    book = fetch_book(book_idx)
+    if not book:
+        log(f"[STATUSCHECK] Book not found in DB: {book_idx}")
+        return
+
+    title = book.get("title") or book_idx
+    author = book.get("author") or "Unknown"
+
+    # Base output directory
+    root = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output")
+    book_base = os.path.join(root, title)
+
+    if not os.path.isdir(book_base):
+        log(f"[STATUSCHECK] Book directory missing: {book_base}")
+        return
+
+    # --------------------------------------------------------
+    # 3. Filesystem validation (light, non-blocking)
+    # --------------------------------------------------------
+    volumes = _detect_volumes(book_base)
+
+    if not volumes:
+        log(f"[STATUSCHECK] No volumes found for {book_idx}")
+        # Still allow m4btool to decide (it will no-op)
+    else:
+        for vol in volumes:
+            audio_dir = os.path.join(book_base, vol, "Audio")
+            count = _count_audio_files(audio_dir)
+
+            log(f"[STATUSCHECK] {vol}: {count} audio files detected")
+
+    # --------------------------------------------------------
+    # 4. Queue m4btool (final pipeline step)
+    # --------------------------------------------------------
+    log(f"[STATUSCHECK] Queue m4btool for book={book_idx}")
+
+    set_status(book_idx, "m4b_running")
+
+    # run_m4btool derives paths and metadata from book_idx itself,
+    # so only the id is passed (its signature is run_m4btool(self, book_idx)).
+    run_m4btool.delay(book_idx=book_idx)
+
+    log(f"[STATUSCHECK] DONE book={book_idx}")
diff --git a/bookscraper/tools/mp4info b/bookscraper/tools/mp4info
new file mode 100644
index 0000000..86b2f30
--- /dev/null
+++ b/bookscraper/tools/mp4info
@@ -0,0 +1,13 @@
+#!/bin/sh
+# mp4info shim for m4b-tool (ffprobe-based)
+
+if [ -z "$1" ]; then
+  echo "Usage: mp4info <file>" >&2
+  exit 1
+fi
+
+# ffprobe outputs float seconds; m4b-tool expects an integer
+ffprobe -v error \
+  -show_entries format=duration \
+  -of default=noprint_wrappers=1:nokey=1 \
+  "$1" | awk '{ printf "%d\n", ($1 + 0.5) }'
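
Reviewer sketch (not part of the patch): the merge rule that get_book_state() applies to the SQL snapshot and the Redis live counters, reduced to a pure helper so it can be unit-tested without SQLite or Redis. The helper name merge_counters and the sample values are illustrative only; the field names mirror db/repository.py.

def merge_counters(sql_row: dict, redis_state: dict) -> dict:
    """merged = max(sql, redis), capped at chapters_total (see get_book_state)."""

    def _int(v):
        try:
            return int(v)
        except Exception:
            return 0

    total = _int(sql_row.get("chapters_total"))
    downloaded = max(
        _int(sql_row.get("downloaded")),
        _int(redis_state.get("chapters_download_done"))
        + _int(redis_state.get("chapters_download_skipped")),
    )
    audio_done = max(_int(sql_row.get("audio_done")), _int(redis_state.get("audio_done")))
    audio_skipped = max(_int(sql_row.get("audio_skipped")), _int(redis_state.get("audio_skipped")))

    if total > 0:
        # Live counters may run ahead after retries; never report more than the book has.
        downloaded = min(downloaded, total)
        audio_done = min(audio_done, total)
        audio_skipped = min(audio_skipped, total)

    return {
        "chapters_total": total,
        "downloaded": downloaded,
        "audio_done": audio_done,
        "audio_skipped": audio_skipped,
    }


# Redis is ahead of the stale SQL snapshot, so the live values win, capped at the total.
assert merge_counters(
    {"chapters_total": 10, "downloaded": 4, "audio_done": 2, "audio_skipped": 0},
    {"chapters_download_done": 9, "chapters_download_skipped": 2,
     "audio_done": 3, "audio_skipped": 1},
) == {"chapters_total": 10, "downloaded": 10, "audio_done": 3, "audio_skipped": 1}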
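
Reviewer sketch (not part of the patch): try_trigger_statuscheck() gets its run-exactly-once behaviour from Redis SET with NX. A minimal standalone illustration of that guard, assuming a reachable Redis at the same REDIS_BROKER URL the compose services use; the key name mirrors the repository, the helper name try_trigger and the "demo-book" id are made up.

import os

import redis

r = redis.Redis.from_url(
    os.getenv("REDIS_BROKER", "redis://redis:6379/0"), decode_responses=True
)


def try_trigger(book_idx: str) -> bool:
    # SET ... NX only writes the key if it does not exist yet, so exactly one
    # caller ever sees True; every later completion event is a no-op.
    return bool(r.set(f"book:{book_idx}:statuscheck:triggered", "1", nx=True))


print(try_trigger("demo-book"))  # True  -> this caller queues the m4b step
print(try_trigger("demo-book"))  # False -> guard already taken

This is why trigger_audio_completion_check() can safely run after every inc_audio_done() or inc_audio_skipped() without queueing run_m4btool more than once per book.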