Add m4b-tool worker, statuscheck task and audio completion flow

- Add dedicated m4b Docker image and mp4v2 tooling
- Introduce statuscheck task for filesystem-based validation
- Add audio completion service and m4b task pipeline
- Align repository counters with audio/m4b workflow
- Clean up legacy project.zip artifact
main
peter.fong 18 hours ago
parent 65842505b0
commit 7c2f65bbf4

@ -125,7 +125,8 @@ docker run \
``` ```
docker compose down docker compose down --remove-orphans
docker image prune -f
docker builder prune -af docker builder prune -af
docker volume prune -f docker volume prune -f
docker compose build --no-cache docker compose build --no-cache
@ -141,6 +142,10 @@ docker compose build --no-cache web && docker compose up web
docker compose build worker_download && docker compose up worker_download docker compose build worker_download && docker compose up worker_download
docker compose down --remove-orphans
docker compose build --no-cache worker_m4b
docker compose up -d worker_m4b
docker compose up web docker compose up web
docker compose build web docker compose build web
docker compose restart web docker compose restart web
@ -152,3 +157,9 @@ tar \
--exclude=".venv" \ --exclude=".venv" \
--exclude="venv" \ --exclude="venv" \
-czvf project.tar.gz . -czvf project.tar.gz .
docker compose down
docker image rm bookscraper-worker_m4b || true
docker builder prune -af
docker compose build --no-cache worker_m4b
docker compose up -d worker_m4b

@ -9,7 +9,7 @@
# - Provide a clean API for tasks and Flask UI # - Provide a clean API for tasks and Flask UI
# ============================================================ # ============================================================
# ============================================================ # ============================================================
# File: db/repository.py (UPDATED for book_idx-only architecture) # UPDATED — canonical read model via get_book_state
# ============================================================ # ============================================================
from scraper.logger_decorators import logcall from scraper.logger_decorators import logcall
@ -17,7 +17,6 @@ from logbus.publisher import log
import redis import redis
import os import os
import time
# ============================================================ # ============================================================
# SQL low-level engines (snapshot storage) # SQL low-level engines (snapshot storage)
@ -29,10 +28,6 @@ from db.state_sql import (
sql_set_chapters_total, sql_set_chapters_total,
sql_register_book, sql_register_book,
sql_update_book, sql_update_book,
sql_inc_downloaded,
sql_inc_parsed,
sql_inc_audio_done,
sql_inc_audio_skipped,
) )
# ============================================================ # ============================================================
@ -49,80 +44,34 @@ from db.state_redis import (
) )
# ============================================================ # ============================================================
# Redis setup for legacy progress paths # Redis client (read-only for legacy + guards)
# ============================================================ # ============================================================
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0") REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
_r = redis.Redis.from_url(REDIS_URL, decode_responses=True) _r = redis.Redis.from_url(REDIS_URL, decode_responses=True)
# ============================================================ # ============================================================
# INTERNAL — LEGACY PROGRESS HELPERS (kept for UI) # LEGACY PROGRESS (UI only, unchanged)
# Keys remain: progress:{book_idx}:*
# ============================================================ # ============================================================
def _legacy_set_total(book_idx, total):
_r.set(f"progress:{book_idx}:total", total)
def _legacy_inc_completed(book_idx):
_r.incr(f"progress:{book_idx}:completed")
def _legacy_inc_skipped(book_idx):
_r.incr(f"progress:{book_idx}:skipped")
def _legacy_inc_failed(book_idx):
_r.incr(f"progress:{book_idx}:failed")
def _legacy_add_failed_chapter(book_idx, chapter, reason):
entry = f"Chapter {chapter}: {reason}"
_r.rpush(f"progress:{book_idx}:failed_list", entry)
def _legacy_get_failed_list(book_idx):
return _r.lrange(f"progress:{book_idx}:failed_list", 0, -1)
def _legacy_get_progress(book_idx): def _legacy_get_progress(book_idx):
total = int(_r.get(f"progress:{book_idx}:total") or 0)
completed = int(_r.get(f"progress:{book_idx}:completed") or 0)
skipped = int(_r.get(f"progress:{book_idx}:skipped") or 0)
failed = int(_r.get(f"progress:{book_idx}:failed") or 0)
abort = _r.exists(f"abort:{book_idx}") == 1
failed_list = _legacy_get_failed_list(book_idx)
return { return {
"book_idx": book_idx, "book_idx": book_idx,
"total": total, "total": int(_r.get(f"progress:{book_idx}:total") or 0),
"completed": completed, "completed": int(_r.get(f"progress:{book_idx}:completed") or 0),
"skipped": skipped, "skipped": int(_r.get(f"progress:{book_idx}:skipped") or 0),
"failed": failed, "failed": int(_r.get(f"progress:{book_idx}:failed") or 0),
"failed_list": failed_list, "abort": _r.exists(f"abort:{book_idx}") == 1,
"abort": abort, "failed_list": _r.lrange(f"progress:{book_idx}:failed_list", 0, -1),
} }
# ============================================================
# PUBLIC — PROGRESS API
# ============================================================
@logcall @logcall
def get_progress(book_idx): def get_progress(book_idx):
return _legacy_get_progress(book_idx) return _legacy_get_progress(book_idx)
@logcall
def add_failed_chapter(book_idx, chapter, reason):
_legacy_add_failed_chapter(book_idx, chapter, reason)
@logcall
def get_failed_list(book_idx):
return _legacy_get_failed_list(book_idx)
# ============================================================ # ============================================================
# FETCH OPERATIONS (SQLite snapshot) # FETCH (SQLite snapshot)
# ============================================================ # ============================================================
@logcall @logcall
def fetch_book(book_idx): def fetch_book(book_idx):
@ -135,7 +84,7 @@ def fetch_all_books():
# ============================================================ # ============================================================
# INIT-FLOW (SQLite metadata only) # INIT / UPDATE METADATA
# ============================================================ # ============================================================
@logcall @logcall
def register_book( def register_book(
@ -147,26 +96,22 @@ def register_book(
cover_path=None, cover_path=None,
book_url=None, book_url=None,
): ):
sql_register_book(
fields = { book_idx,
"book_idx": book_idx, {
"title": title, "book_idx": book_idx,
"author": author, "title": title,
"description": description, "author": author,
"cover_url": cover_url, "description": description,
"cover_path": cover_path, "cover_url": cover_url,
"book_url": book_url, "cover_path": cover_path,
"chapters_total": 0, "book_url": book_url,
"status": "registered", "chapters_total": 0,
} "status": "registered",
},
log(f"[DB] Registering new book_idx={book_idx} title='{title}'") )
sql_register_book(book_idx, fields)
# ============================================================
# SCRAPE-FLOW UPDATE
# ============================================================
@logcall @logcall
def update_book_after_full_scrape( def update_book_after_full_scrape(
book_idx, book_idx,
@ -176,9 +121,7 @@ def update_book_after_full_scrape(
cover_url=None, cover_url=None,
chapters_total=None, chapters_total=None,
): ):
fields = {} fields = {}
if title is not None: if title is not None:
fields["title"] = title fields["title"] = title
if author is not None: if author is not None:
@ -191,143 +134,72 @@ def update_book_after_full_scrape(
fields["chapters_total"] = chapters_total fields["chapters_total"] = chapters_total
fields["status"] = "active" fields["status"] = "active"
log(f"[DB] update metadata for book_idx={book_idx}")
sql_update_book(book_idx, fields) sql_update_book(book_idx, fields)
# ============================================================ # ============================================================
# ACTIVE BOOK LISTS # STATUS
# ============================================================
@logcall
def get_registered_books():
all_books = sql_fetch_all_books()
HIDDEN_STATES = {"hidden"}
log(f"[DB] Fetched all books for registered filter, total={len(all_books)}")
return [b for b in all_books if b.get("status") not in HIDDEN_STATES]
@logcall
def get_active_books():
all_books = sql_fetch_all_books()
HIDDEN_STATES = {"hidden", "done"}
log(f"[DB] Fetched all books for active filter, total={len(all_books)}")
return [b for b in all_books if b.get("status") not in HIDDEN_STATES]
# ============================================================
# STATUS MANAGEMENT
# ============================================================ # ============================================================
@logcall @logcall
def set_status(book_idx, status): def set_status(book_idx, status):
log(f"[DB] Setting status for {book_idx} to '{status}'")
redis_set_status(book_idx, status) redis_set_status(book_idx, status)
sql_set_status(book_idx, status) sql_set_status(book_idx, status)
# ============================================================ # ============================================================
# CHAPTER TOTALS # TOTALS
# ============================================================ # ============================================================
@logcall @logcall
def set_chapters_total(book_idx, total): def set_chapters_total(book_idx, total):
log(f"[DB] Setting chapter total for {book_idx} to {total}")
redis_set_chapters_total(book_idx, total) redis_set_chapters_total(book_idx, total)
sql_set_chapters_total(book_idx, total) sql_set_chapters_total(book_idx, total)
# _legacy_set_total(book_idx, total)
# ============================================================ # ============================================================
# COUNTERS — DOWNLOAD # COUNTERS — WRITE ONLY
# ============================================================ # ============================================================
@logcall @logcall
def inc_download_done(book_idx, amount=1): def inc_download_done(book_idx, amount=1):
log(f"[DB] Incrementing download done for {book_idx} by {amount}")
redis_inc_download_done(book_idx, amount) redis_inc_download_done(book_idx, amount)
# sql_inc_downloaded(book_idx, amount)
# _legacy_inc_completed(book_idx)
@logcall @logcall
def inc_download_skipped(book_idx, amount=1): def inc_download_skipped(book_idx, amount=1):
log(f"[DB] Incrementing download skipped for {book_idx} by {amount}")
redis_inc_download_skipped(book_idx, amount) redis_inc_download_skipped(book_idx, amount)
# _legacy_inc_skipped(book_idx)
# ============================================================
# COUNTERS — PARSE
# ============================================================
@logcall @logcall
def inc_parsed_done(book_idx, amount=1): def inc_parsed_done(book_idx, amount=1):
log(f"[DB] Incrementing parsed done for {book_idx} by {amount}")
redis_inc_parsed_done(book_idx, amount) redis_inc_parsed_done(book_idx, amount)
# sql_inc_parsed(book_idx, amount)
# ============================================================
# COUNTERS — AUDIO
# ============================================================
@logcall
def inc_audio_skipped(book_idx, amount=1):
log(f"[DB] Incrementing audio skipped for {book_idx} by {amount}")
# sql_inc_audio_skipped(book_idx, amount)
redis_inc_audio_skipped(book_idx, amount)
@logcall @logcall
def inc_audio_done(book_idx, amount=1): def inc_audio_done(book_idx, amount=1):
log(f"[DB] Incrementing audio done for {book_idx} by {amount}")
redis_inc_audio_done(book_idx, amount) redis_inc_audio_done(book_idx, amount)
# sql_inc_audio_done(book_idx, amount)
# ============================================================
# BACKWARDS COMPATIBILITY SHIMS
# These map the old API (book_id) to the new book_idx-only system
# ============================================================
@logcall @logcall
def inc_downloaded(book_idx, amount=1): def inc_audio_skipped(book_idx, amount=1):
return inc_download_done(book_idx, amount) redis_inc_audio_skipped(book_idx, amount)
@logcall
def inc_parsed(book_idx, amount=1):
return inc_parsed_done(book_idx, amount)
@logcall
def inc_audio_done_legacy(book_idx, amount=1):
return inc_audio_done(book_idx, amount)
# ============================================================ # ============================================================
# READ — DERIVED BOOK STATE # CANONICAL READ MODEL
# ============================================================ # ============================================================
@logcall @logcall
def get_book_state(book_idx): def get_book_state(book_idx):
""" """
Canonical merged read-model for a single book. Canonical merged read model.
Gedrag: Rules:
- Leest SQL (snapshot) - SQL = snapshot baseline
- Leest Redis (live counters) - Redis = live counters
- Rekent naar merged
- GEEN writes
- GEEN side-effects
Merge-regels:
- merged = max(sql, redis) - merged = max(sql, redis)
- merged wordt gecapt op chapters_total - capped at chapters_total
""" """
# ----------------------------------------------------
# 1. Fetch bronnen
# ----------------------------------------------------
sqlite_row = sql_fetch_book(book_idx) or {} sqlite_row = sql_fetch_book(book_idx) or {}
redis_state = _r.hgetall(f"book:{book_idx}:state") or {}
key = f"book:{book_idx}:state"
redis_state = _r.hgetall(key) or {}
def _int(v): def _int(v):
try: try:
@ -335,56 +207,114 @@ def get_book_state(book_idx):
except Exception: except Exception:
return 0 return 0
# ----------------------------------------------------
# 2. SQL snapshot
# ----------------------------------------------------
chapters_total = _int(sqlite_row.get("chapters_total")) chapters_total = _int(sqlite_row.get("chapters_total"))
# SQL snapshot
sql_downloaded = _int(sqlite_row.get("downloaded")) sql_downloaded = _int(sqlite_row.get("downloaded"))
sql_audio_done = _int(sqlite_row.get("audio_done")) sql_audio_done = _int(sqlite_row.get("audio_done"))
sql_audio_skipped = _int(sqlite_row.get("audio_skipped"))
# ---------------------------------------------------- # Redis live
# 3. Redis live counters
# ----------------------------------------------------
redis_downloaded = _int(redis_state.get("chapters_download_done")) + _int( redis_downloaded = _int(redis_state.get("chapters_download_done")) + _int(
redis_state.get("chapters_download_skipped") redis_state.get("chapters_download_skipped")
) )
redis_audio_done = _int(redis_state.get("audio_done")) redis_audio_done = _int(redis_state.get("audio_done"))
redis_audio_skipped = _int(redis_state.get("audio_skipped"))
# ---------------------------------------------------- # Merge
# 4. Merge (SQL vs Redis)
# ----------------------------------------------------
merged_downloaded = max(sql_downloaded, redis_downloaded) merged_downloaded = max(sql_downloaded, redis_downloaded)
merged_audio_done = max(sql_audio_done, redis_audio_done) merged_audio_done = max(sql_audio_done, redis_audio_done)
merged_audio_skipped = max(sql_audio_skipped, redis_audio_skipped)
if chapters_total > 0: if chapters_total > 0:
merged_downloaded = min(merged_downloaded, chapters_total) merged_downloaded = min(merged_downloaded, chapters_total)
merged_audio_done = min(merged_audio_done, chapters_total) merged_audio_done = min(merged_audio_done, chapters_total)
merged_audio_skipped = min(merged_audio_skipped, chapters_total)
audio_completed = merged_audio_done + merged_audio_skipped
# Build state
state = dict(sqlite_row)
state.update(
{
"downloaded": merged_downloaded,
"audio_done": merged_audio_done,
"audio_skipped": merged_audio_skipped,
"chapters_total": chapters_total,
}
)
# ---------------------------------------------------- # Derived status
# 5. Bouw merged state (read-only) status = sqlite_row.get("status") or "unknown"
# ----------------------------------------------------
state = {}
# Basis = SQL
state.update(sqlite_row)
# Overschrijf alleen met merged conclusies
state["downloaded"] = merged_downloaded
state["audio_done"] = merged_audio_done
state["chapters_total"] = chapters_total
# ----------------------------------------------------
# 4b. Derive status (READ-ONLY)
# ----------------------------------------------------
derived_status = sqlite_row.get("status") or "unknown"
if chapters_total > 0: if chapters_total > 0:
if merged_downloaded < chapters_total: if merged_downloaded < chapters_total:
derived_status = "downloading" status = "downloading"
elif merged_downloaded == chapters_total and merged_audio_done < chapters_total: elif merged_downloaded == chapters_total and audio_completed < chapters_total:
derived_status = "audio" status = "audio"
elif merged_audio_done == chapters_total: elif audio_completed >= chapters_total:
derived_status = "done" status = "done"
state["status"] = derived_status
state["status"] = status
return state return state
# ============================================================
# READ HELPERS (VIA get_book_state ONLY)
# ============================================================
@logcall
def get_chapters_total(book_idx):
    """Total chapter count for the book, read via the canonical read model."""
    snapshot = get_book_state(book_idx)
    return int(snapshot.get("chapters_total", 0))
@logcall
def get_audio_done(book_idx):
    """Merged audio_done counter, read via the canonical read model."""
    snapshot = get_book_state(book_idx)
    return int(snapshot.get("audio_done", 0))
@logcall
def get_audio_completed_total(book_idx):
    """Sum of audio_done and audio_skipped from the canonical read model."""
    snapshot = get_book_state(book_idx)
    done = int(snapshot.get("audio_done", 0))
    skipped = int(snapshot.get("audio_skipped", 0))
    return done + skipped
# ============================================================
# STATUSCHECK GUARD (INTENTIONAL DIRECT REDIS)
# ============================================================
@logcall
def try_trigger_statuscheck(book_idx):
    """
    One-shot guard: True only for the first caller per book.

    Uses Redis SET NX directly (intentional) so the test-and-set is atomic
    across workers.
    """
    guard_key = f"book:{book_idx}:statuscheck:triggered"
    return bool(_r.set(guard_key, "1", nx=True))
# ============================================================
# ACTIVE / REGISTERED BOOK LISTS (UI API)
# ============================================================
@logcall
def get_registered_books():
    """Books visible in the UI 'registered' list (everything not hidden)."""
    hidden_states = {"hidden"}
    return [
        book
        for book in sql_fetch_all_books()
        if book.get("status") not in hidden_states
    ]
@logcall
def get_active_books():
    """Books shown as active on the dashboard (not hidden, not done)."""
    hidden_states = {"hidden", "done"}
    return [
        book
        for book in sql_fetch_all_books()
        if book.get("status") not in hidden_states
    ]
@logcall
def store_m4b_error(book_idx: str, volume: str, error_text: str):
    """
    Append an m4b failure entry to the book's error list in Redis.

    Passive storage only — no retries, no state transitions.
    """
    _r.rpush(f"book:{book_idx}:m4b:errors", f"{volume}: {error_text}")

@ -149,3 +149,22 @@ services:
- .env - .env
command: celery -A celery_app worker -Q scraping -n scraping@%h -l INFO command: celery -A celery_app worker -Q scraping -n scraping@%h -l INFO
restart: "no" restart: "no"
# ----------------------------------------------------------
# M4B Worker (Finalization)
# ----------------------------------------------------------
worker_m4b:
build:
context: .
dockerfile: docker/Dockerfile.m4b
container_name: worker_m4b
command: celery -A celery_app worker -Q m4b -n m4b@%h -l INFO
depends_on:
redis:
condition: service_healthy
env_file:
- .env
volumes:
- .:/app
- /Users/peter/mnt/asustor/Sync/bookscraper/books:/Users/peter/mnt/asustor/Sync/bookscraper/books
- /Users/peter/mnt/asustor/Sync/bookscraper/db:/Users/peter/mnt/asustor/Sync/bookscraper/db
restart: "no"

@ -0,0 +1,70 @@
FROM debian:12

# Non-interactive apt so the build never blocks on configuration prompts.
ENV DEBIAN_FRONTEND=noninteractive

# ----------------------------------------------------------
# System + PHP (PHP 8.2 native)
# ----------------------------------------------------------
# ffmpeg + php-* are runtime dependencies of m4b-tool; the trailing
# git/build-essential/autotools group is only needed to compile mp4v2 below.
# NOTE(review): PHP >= 8.0 ships JSON in core — confirm a 'php-json'
# package still exists on Debian 12, otherwise this apt-get line fails.
RUN apt-get update && apt-get install -y \
    ffmpeg \
    curl \
    ca-certificates \
    bash \
    php-cli \
    php-intl \
    php-json \
    php-mbstring \
    php-xml \
    php-curl \
    php-zip \
    python3 \
    python3-pip \
    python3-venv \
    \
    git \
    build-essential \
    autoconf \
    automake \
    libtool \
    pkg-config \
    && rm -rf /var/lib/apt/lists/*

# ----------------------------------------------------------
# Python venv (PEP 668 compliant)
# ----------------------------------------------------------
# The venv's bin dir leads PATH, so 'pip'/'python' below resolve to the venv.
RUN python3 -m venv /opt/venv
ENV PATH="/opt/venv/bin:/usr/local/bin:$PATH"

# ----------------------------------------------------------
# Build & install mp4v2 (mp4info)
# ----------------------------------------------------------
# Built from source because Debian 12 carries no mp4v2 tooling package.
# NOTE(review): assumes the sandreas/mp4v2 checkout ships a ready-made
# ./configure script — verify, or an autoreconf step is needed first.
WORKDIR /tmp
RUN git clone https://github.com/sandreas/mp4v2 \
    && cd mp4v2 \
    && ./configure \
    && make -j$(nproc) \
    && make install \
    && echo "/usr/local/lib" > /etc/ld.so.conf.d/mp4v2.conf \
    && ldconfig \
    && cd / \
    && rm -rf /tmp/mp4v2

# ----------------------------------------------------------
# Install m4b-tool
# ----------------------------------------------------------
# Single-file phar release installed as an executable on PATH.
RUN curl -L https://github.com/sandreas/m4b-tool/releases/latest/download/m4b-tool.phar \
    -o /usr/local/bin/m4b-tool \
    && chmod +x /usr/local/bin/m4b-tool

# ----------------------------------------------------------
# App
# ----------------------------------------------------------
# requirements.txt copied first so dependency layers cache across code edits.
WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY . /app

# Celery worker command is supplied by docker-compose; bash is a fallback.
CMD ["bash"]

@ -0,0 +1 @@
Subproject commit 480a73324f53d0d24bea4931c3902097f8e2a663

Binary file not shown.

@ -0,0 +1,94 @@
# ============================================================
# File: scraper/services/audio_completion.py
# Purpose:
# Orchestration hook after audio completion.
#
# Rules (STRICT):
# - ALWAYS read via get_book_state()
# - Use ONLY merged counters from repository
# - NO usage of derived status field
# - Completion rule:
# audio_completed < chapters_total → NOT DONE
# ============================================================
from logbus.publisher import log
from scraper.logger_decorators import logcall
from db.repository import (
get_book_state,
try_trigger_statuscheck,
)
from scraper.services.status_check_service import StatusCheckService
from scraper.tasks.m4b_tasks import queue_m4b_for_book
@logcall
def trigger_audio_completion_check(book_idx: str):
    """
    Hook invoked after every inc_audio_done() / inc_audio_skipped().

    Reads the canonical merged state, rejects early on the merged counters,
    validates the filesystem (authoritative), applies the one-shot guard and
    finally queues the m4b merge. Never raises — audio workers must not be
    affected by failures here.
    """
    try:
        # Canonical merged state: SQL snapshot combined with Redis counters.
        state = get_book_state(book_idx)
        total = int(state.get("chapters_total", 0))
        done = int(state.get("audio_done", 0))
        skipped = int(state.get("audio_skipped", 0))
        completed = done + skipped

        log(
            f"[AUDIO-COMPLETION] book={book_idx} "
            f"audio_completed={completed} chapters_total={total}"
        )

        # Fast reject on merged counters alone (no FS work yet).
        if total <= 0 or completed < total:
            log(f"[AUDIO-COMPLETION] not yet complete for book={book_idx}")
            return

        # Filesystem validation is the authoritative check.
        check = StatusCheckService.run(book_idx)
        fs_info = check.get("filesystem", {})
        on_disk = fs_info.get("audio_files", 0)
        txt_count = fs_info.get("chapters_txt", 0)

        if on_disk + skipped < txt_count:
            log(
                f"[AUDIO-COMPLETION] FS validation failed "
                f"(audio_files={on_disk}, skipped={skipped}, txt={txt_count})"
            )
            return

        # One-shot guard — only the first caller past FS validation proceeds.
        if not try_trigger_statuscheck(book_idx):
            log(f"[AUDIO-COMPLETION] statuscheck already triggered for {book_idx}")
            return

        log(f"[AUDIO-COMPLETION] DONE → queue m4b for book={book_idx}")
        queue_m4b_for_book(book_idx)

    except Exception as exc:
        # Deliberately broad: this hook must never break the audio workers.
        log(f"[AUDIO-COMPLETION][ERROR] book={book_idx} error={exc}")

@ -17,6 +17,7 @@ from scraper.abort import abort_requested
from scraper.logger_decorators import logcall from scraper.logger_decorators import logcall
from redis import Redis from redis import Redis
from urllib.parse import urlparse from urllib.parse import urlparse
from scraper.services.audio_completion import trigger_audio_completion_check
# NEW — unified repository façade # NEW — unified repository façade
from db.repository import ( from db.repository import (
@ -166,6 +167,7 @@ def generate_audio(
log(f"[AUDIO] CH{chapter_number}: Already exists → skip") log(f"[AUDIO] CH{chapter_number}: Already exists → skip")
redis_client.delete(slot_key) redis_client.delete(slot_key)
inc_audio_skipped(book_id) inc_audio_skipped(book_id)
trigger_audio_completion_check(book_id)
return return
# ------------------------------------------------------------ # ------------------------------------------------------------
@ -191,6 +193,8 @@ def generate_audio(
# NEW — repository façade # NEW — repository façade
inc_audio_done(book_id) inc_audio_done(book_id)
trigger_audio_completion_check(book_id)
log(f"trigger_audio_completion_check ")
log(f"[AUDIO]({HOST}) CH{chapter_number}: Completed") log(f"[AUDIO]({HOST}) CH{chapter_number}: Completed")
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:

@ -0,0 +1,132 @@
# ============================================================
# File: scraper/tasks/m4b_tasks.py
# ============================================================
import os
import subprocess
from typing import List
from celery_app import celery_app
from logbus.publisher import log
from scraper.logger_decorators import logcall
from db.repository import fetch_book, store_m4b_error
from scraper.scriptgen import build_merge_block
# ------------------------------------------------------------
# Helper: detect volumes (UNCHANGED)
# ------------------------------------------------------------
def detect_volumes(book_base: str) -> List[str]:
    """Return the sorted names of Volume_* subdirectories of *book_base*.

    Matching is case-insensitive on the "volume_" prefix; plain files are
    ignored even if their name matches.
    """
    found = [
        entry
        for entry in os.listdir(book_base)
        if entry.lower().startswith("volume_")
        and os.path.isdir(os.path.join(book_base, entry))
    ]
    return sorted(found)
# ------------------------------------------------------------
# Celery task
# ------------------------------------------------------------
@celery_app.task(bind=True, queue="m4b", ignore_result=True)
@logcall
def run_m4btool(self, book_idx: str):
    """
    Merge each detected volume of a book into an m4b audiobook.

    Resolves the book directory from SQL metadata, derives one shell command
    per volume via scriptgen's build_merge_block, runs them sequentially and
    stores failures via store_m4b_error. Per-volume failures never abort the
    remaining volumes.
    """
    log(f"[M4B] START book_idx={book_idx}")

    book = fetch_book(book_idx)
    if not book:
        log(f"[M4B] Book not found in SQL: book_idx={book_idx}")
        return

    # Book directory is keyed by title, not book_idx.
    title = book.get("title", book_idx)
    author = book.get("author", "Unknown")

    output_root = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output")
    book_base = os.path.join(output_root, title)
    log(f"[M4B] Book base directory: {book_base}")

    if not os.path.isdir(book_base):
        log(f"[M4B] Book directory missing: {book_base}")
        return

    volumes = detect_volumes(book_base)
    if not volumes:
        log(f"[M4B] No volumes found for book_idx={book_idx}")
        return

    log(f"[M4B] Volumes detected: {volumes}")

    # Build the canonical command string via scriptgen, then split it back
    # into one command per volume.
    # NOTE(review): splitting on "&&" corrupts any command that itself
    # contains "&&" (e.g. inside quoted metadata) — verify build_merge_block
    # never emits such commands. Also assumes it yields exactly one command
    # per volume, in the same order; zip() silently truncates on mismatch.
    merge_block = build_merge_block(
        title, author, [(i + 1, v) for i, v in enumerate(volumes)]
    )
    commands = [c.strip() for c in merge_block.split("&&") if c.strip()]

    for volume, cmd in zip(volumes, commands):
        audio_dir = os.path.join(book_base, volume, "Audio")
        if not os.path.isdir(audio_dir):
            log(f"[M4B] SKIP {volume}: no Audio directory")
            continue

        log(f"[M4B] Running for volume={volume}")
        log(f"[M4B] CMD: {cmd}")

        try:
            # shell=True is required because cmd is a pre-built shell string.
            # NOTE(review): title/author flow into this string — quoting is
            # build_merge_block's responsibility; confirm it escapes them.
            result = subprocess.run(
                cmd,
                cwd=book_base,
                shell=True,
                capture_output=True,
                text=True,
                check=True,
            )
            if result.stdout:
                log(f"[M4B][STDOUT] {result.stdout}")
        except subprocess.CalledProcessError as exc:
            # Non-zero exit: record and move on to the next volume.
            log(f"[M4B][FAILED] volume={volume}")
            if exc.stdout:
                log(f"[M4B][STDOUT] {exc.stdout}")
            if exc.stderr:
                log(f"[M4B][STDERR] {exc.stderr}")
            store_m4b_error(
                book_idx=book_idx,
                volume=volume,
                error_text=exc.stderr or str(exc),
            )
            continue
        except Exception as exc:
            # Anything else (OSError, etc.): record and keep going.
            log(f"[M4B][UNEXPECTED ERROR] volume={volume}: {exc}")
            store_m4b_error(
                book_idx=book_idx,
                volume=volume,
                error_text=str(exc),
            )
            continue

    log(f"[M4B] FINISHED book_idx={book_idx}")
# ------------------------------------------------------------
# Orchestration helper (UNCHANGED)
# ------------------------------------------------------------
@logcall
def queue_m4b_for_book(book_idx: str):
    """Enqueue the m4b merge task for *book_idx* on the dedicated m4b queue."""
    log(f"[M4B] Queuing m4b-tool for book_idx={book_idx}")
    task_name = "scraper.tasks.m4b_tasks.run_m4btool"
    celery_app.send_task(task_name, args=[book_idx], queue="m4b")

@ -0,0 +1,149 @@
# ============================================================
# File: scraper/tasks/statuscheck.py
# Purpose:
# Final status check after audio completion.
#
# Responsibilities:
# - Verify Redis counters (sanity check)
# - Verify filesystem (Audio files present)
# - Queue m4btool task
#
# Design rules:
# - Book-scope ONLY
# - No direct Redis usage
# - Repository is the single source of truth
# - Idempotent, defensive, non-blocking
# ============================================================
import os
from celery_app import celery_app
from logbus.publisher import log
from scraper.logger_decorators import logcall
from db.repository import (
get_audio_done,
get_chapters_total,
set_status,
fetch_book,
)
from scraper.tasks.m4b_tasks import run_m4btool
# ------------------------------------------------------------
# Helpers
# ------------------------------------------------------------
# BUG FIX: this helper was decorated with @log, but `log` is the logbus
# publish *function*, not a decorator — importing the module would replace
# _detect_volumes with log()'s return value. @logcall is the tracing
# decorator used on every other helper in this file.
@logcall
def _detect_volumes(book_base: str):
    """
    Return the sorted list of Volume_XXX directory names under *book_base*.

    The "volume_" prefix match is case-insensitive; non-directories are
    ignored.
    """
    vols = [
        name
        for name in os.listdir(book_base)
        if name.lower().startswith("volume_")
        and os.path.isdir(os.path.join(book_base, name))
    ]
    vols.sort()
    return vols
@logcall
def _count_audio_files(audio_dir: str) -> int:
    """Return how many .m4b files sit directly in *audio_dir* (0 if absent)."""
    if not os.path.isdir(audio_dir):
        return 0
    return sum(
        1 for entry in os.listdir(audio_dir) if entry.lower().endswith(".m4b")
    )
# ------------------------------------------------------------
# Celery task
# ------------------------------------------------------------
@celery_app.task(bind=True, queue="controller", ignore_result=True)
@logcall
def run_statuscheck(self, book_idx: str):
    """
    Final statuscheck before m4b-tool execution.

    Triggered exactly once per book by the audio-completion quickcheck.
    Verifies the repository counters, runs a light filesystem check,
    marks the book as 'm4b_running' and queues the m4b merge task.
    """
    log(f"[STATUSCHECK] START book={book_idx}")

    # --------------------------------------------------------
    # 1. Counter sanity check (via repository)
    # --------------------------------------------------------
    # FIX: completion must include skipped chapters. The previous check
    # compared audio_done alone against chapters_total, so any book with
    # skipped audio could never pass this gate — inconsistent with the
    # audio-completion service, which treats done + skipped as complete.
    from db.repository import get_audio_completed_total

    audio_completed = get_audio_completed_total(book_idx)
    chapters_total = get_chapters_total(book_idx)

    log(
        f"[STATUSCHECK] Counters book={book_idx} "
        f"audio_completed={audio_completed} chapters_total={chapters_total}"
    )

    if chapters_total <= 0:
        log(f"[STATUSCHECK] No chapters_total → abort")
        return

    if audio_completed < chapters_total:
        # Defensive: should not happen, but never assume.
        log(
            f"[STATUSCHECK] Audio not complete yet "
            f"({audio_completed}/{chapters_total}) → abort"
        )
        return

    # --------------------------------------------------------
    # 2. Fetch book metadata (for filesystem paths)
    # --------------------------------------------------------
    book = fetch_book(book_idx)
    if not book:
        log(f"[STATUSCHECK] Book not found in DB: {book_idx}")
        return

    # Output tree is keyed by title, falling back to the idx.
    title = book.get("title") or book_idx

    root = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output")
    book_base = os.path.join(root, title)

    if not os.path.isdir(book_base):
        log(f"[STATUSCHECK] Book directory missing: {book_base}")
        return

    # --------------------------------------------------------
    # 3. Filesystem validation (light, non-blocking)
    # --------------------------------------------------------
    volumes = _detect_volumes(book_base)
    if not volumes:
        # Still allow m4b-tool to decide (it will no-op).
        log(f"[STATUSCHECK] No volumes found for {book_idx}")
    else:
        for vol in volumes:
            audio_dir = os.path.join(book_base, vol, "Audio")
            count = _count_audio_files(audio_dir)
            log(f"[STATUSCHECK] {vol}: {count} audio files detected")

    # --------------------------------------------------------
    # 4. Queue m4btool (final pipeline step)
    # --------------------------------------------------------
    log(f"[STATUSCHECK] Queue m4btool for book={book_idx}")
    set_status(book_idx, "m4b_running")

    # BUG FIX: run_m4btool(self, book_idx) accepts only book_idx — the old
    # call passed book_base= and meta= kwargs, crashing the m4b worker with
    # a TypeError before any merging started. The task derives paths and
    # metadata itself from SQL.
    run_m4btool.delay(book_idx)

    log(f"[STATUSCHECK] DONE book={book_idx}")

@ -0,0 +1,13 @@
#!/bin/sh
# mp4info shim for m4b-tool (ffprobe-based)
#
# m4b-tool probes track durations through mp4info; this shim answers with
# the container duration reported by ffprobe, rounded to whole seconds.

if [ -z "$1" ]; then
    echo "Usage: mp4info <file>" >&2
    exit 1
fi

# FIX: fail loudly on a missing/unreadable file. Previously ffprobe's error
# went to stderr and awk printed nothing, leaving m4b-tool to parse empty
# output instead of seeing a failure.
if [ ! -r "$1" ]; then
    echo "mp4info: cannot read '$1'" >&2
    exit 1
fi

# ffprobe outputs float seconds; m4b-tool expects an integer.
ffprobe -v error \
    -show_entries format=duration \
    -of default=noprint_wrappers=1:nokey=1 \
    "$1" | awk '{ printf "%d\n", ($1 + 0.5) }'
Loading…
Cancel
Save