abort for audiotasks

feat/audiotasks
peter.fong 2 weeks ago
parent 6154b396e3
commit e0695cf216

@ -1 +1,4 @@
output/ output/
venv/
*.log
__pycache__/

@ -123,7 +123,8 @@ docker run \
bookscraper bookscraper
```
docker compose down docker compose down
docker compose build --no-cache docker compose build --no-cache
docker compose up docker compose up
```

@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""
Local macOS Audio Worker — runs outside Docker so macOS 'say' works.

Loads .env (if present), overrides the Celery broker/backend to a
locally reachable Redis (instead of the Docker-internal hostname),
then launches a Celery worker bound to the 'audio' queue as a
subprocess. The worker's exit status is propagated to the caller.
"""
import os
import subprocess
import sys

from dotenv import load_dotenv

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ENV_FILE = os.path.join(BASE_DIR, ".env")

# Load .env if present so AUDIO_VOICE / AUDIO_RATE / REDIS_* are available.
if os.path.exists(ENV_FILE):
    load_dotenv(ENV_FILE)
    print(f"[AUDIO-LOCAL] Loaded .env from {ENV_FILE}")
else:
    print("[AUDIO-LOCAL] WARNING: no .env found")


def main():
    """Configure Redis overrides and run the Celery audio worker.

    Exits the process with the Celery worker's return code so shell
    wrappers (e.g. run_audio_local.sh) can detect failures.
    """
    print("=====================================================")
    print(" LOCAL macOS AUDIO WORKER")
    print(" Queue : audio")
    print(" Voice :", os.getenv("AUDIO_VOICE"))
    print(" Rate  :", os.getenv("AUDIO_RATE"))
    print("=====================================================")

    # ----------------------------------------------------------
    # OVERRIDES: Local Redis instead of Docker internal hostname
    # ----------------------------------------------------------
    broker = os.getenv("REDIS_BROKER_LOCAL", "redis://127.0.0.1:6379/0")
    backend = os.getenv("REDIS_BACKEND_LOCAL", "redis://127.0.0.1:6379/1")
    os.environ["CELERY_BROKER_URL"] = broker
    os.environ["CELERY_RESULT_BACKEND"] = backend
    print(f"[AUDIO-LOCAL] Using Redis broker : {broker}")
    print(f"[AUDIO-LOCAL] Using Redis backend: {backend}")

    # ----------------------------------------------------------
    # Celery command.
    # macOS requires the prefork pool for 'say' subprocesses.
    # ----------------------------------------------------------
    cmd = [
        "celery",
        "-A",
        "celery_app",
        "worker",
        "-Q",
        "audio",
        "-n",
        "audio_local@%h",
        "-l",
        "INFO",
        "--pool=prefork",
    ]

    print("[AUDIO-LOCAL] Launching Celery via subprocess…")
    try:
        result = subprocess.run(cmd, check=False)
    except KeyboardInterrupt:
        # Conventional exit code for SIGINT (128 + 2).
        print("[AUDIO-LOCAL] Interrupted — shutting down")
        sys.exit(130)
    # Propagate the worker's exit status instead of discarding it.
    sys.exit(result.returncode)


if __name__ == "__main__":
    main()

@ -22,6 +22,10 @@ celery_app = Celery(
"scraper.tasks.download_tasks", "scraper.tasks.download_tasks",
"scraper.tasks.parse_tasks", "scraper.tasks.parse_tasks",
"scraper.tasks.save_tasks", "scraper.tasks.save_tasks",
# --------------------------------------------------------
# AUDIO TASKS (NEW)
# --------------------------------------------------------
"scraper.tasks.audio_tasks",
], ],
) )
@ -31,6 +35,10 @@ celery_app.conf.task_routes = {
"scraper.tasks.download_tasks.*": {"queue": "download"}, "scraper.tasks.download_tasks.*": {"queue": "download"},
"scraper.tasks.parse_tasks.*": {"queue": "parse"}, "scraper.tasks.parse_tasks.*": {"queue": "parse"},
"scraper.tasks.save_tasks.*": {"queue": "save"}, "scraper.tasks.save_tasks.*": {"queue": "save"},
# ------------------------------------------------------------
# AUDIO ROUTING (NEW)
# ------------------------------------------------------------
"scraper.tasks.audio_tasks.*": {"queue": "audio"},
} }
# ------------------------------------------------------------ # ------------------------------------------------------------

@ -5,6 +5,8 @@ services:
redis: redis:
image: redis:7 image: redis:7
container_name: bookscraper_redis container_name: bookscraper_redis
ports:
- "6379:6379"
healthcheck: healthcheck:
test: ["CMD", "redis-cli", "ping"] test: ["CMD", "redis-cli", "ping"]
interval: 2s interval: 2s
@ -113,25 +115,6 @@ services:
command: celery -A celery_app worker -Q save -n save@%h -l INFO command: celery -A celery_app worker -Q save -n save@%h -l INFO
restart: "no" restart: "no"
# ----------------------------------------------------------
# Audio Worker (macOS only)
# ----------------------------------------------------------
worker_audio:
build:
context: .
dockerfile: docker/Dockerfile.audio
container_name: worker_audio
volumes:
- .:/app
- /Users/peter/Desktop/books:/app/output
depends_on:
redis:
condition: service_healthy
env_file:
- .env
command: celery -A celery_app worker -Q audio -n audio@%h -l INFO
restart: "no"
# ---------------------------------------------------------- # ----------------------------------------------------------
# Scraping Worker # Scraping Worker
# ---------------------------------------------------------- # ----------------------------------------------------------

@ -1,9 +1,15 @@
FROM python:3.12-slim FROM python:3.12-slim
WORKDIR /app WORKDIR /app
# Install audio worker dependencies
COPY requirements.audio.txt /app/requirements.audio.txt COPY requirements.audio.txt /app/requirements.audio.txt
RUN pip install --no-cache-dir -r /app/requirements.audio.txt RUN pip install --no-cache-dir -r /app/requirements.audio.txt
# Celery is required for the worker itself
RUN pip install --no-cache-dir celery
# Copy project
COPY . /app COPY . /app
CMD ["python3", "-c", "print('audio worker ready')"] # Start the AUDIO Celery worker
CMD ["celery", "-A", "celery_app", "worker", "-Q", "audio", "-n", "audio@%h", "-l", "INFO"]

@ -1,64 +1,106 @@
import os import os
import redis import redis
# GUI log (non-breaking)
from scraper.ui_log import push_ui
# --------------------------------------------------------- # ---------------------------------------------------------
# Redis connection # Default Redis connection (Docker workers)
# --------------------------------------------------------- # ---------------------------------------------------------
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0") REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
r = redis.Redis.from_url(REDIS_URL, decode_responses=True) r = redis.Redis.from_url(REDIS_URL, decode_responses=True)
# Debug mode (optional)
ABORT_DEBUG = os.getenv("ABORT_DEBUG", "1") == "1"
# Internal flag to avoid spamming the same message
_seen_debug_keys = set()
# ========================================================= # =========================================================
# ABORT FLAG # ABORT FLAG
# ========================================================= # =========================================================
def _debug(msg: str):
"""Print + GUI log (non-breaking, minimal noise)."""
print(msg)
push_ui(msg)
def set_abort(book_id: str): def set_abort(book_id: str):
""" """Enable abort mode for this book."""
Enable abort mode for this book. key = f"abort:{book_id}"
All download tasks that haven't started yet will immediately exit. r.set(key, "1")
"""
r.set(f"abort:{book_id}", "1") if ABORT_DEBUG:
_debug(f"[ABORT] SET {key}")
def clear_abort(book_id: str): def clear_abort(book_id: str):
""" """Clear abort flag."""
Clear abort flag so future runs are unaffected. key = f"abort:{book_id}"
""" r.delete(key)
r.delete(f"abort:{book_id}")
if ABORT_DEBUG:
_debug(f"[ABORT] CLEAR {key}")
def abort_requested(book_id: str) -> bool: def abort_requested(book_id: str, redis_client=None) -> bool:
""" """
True if abort flag is set for this book. Return True if abort flag is set.
redis_client:
- Docker workers None use default Redis (r)
- Local macOS audio passes Redis(host=127.0.0.1)
""" """
return r.exists(f"abort:{book_id}") == 1 client = redis_client or r
key = f"abort:{book_id}"
try:
exists = client.exists(key)
if ABORT_DEBUG:
# Log once per key
if key not in _seen_debug_keys:
try:
conn = client.connection_pool.connection_kwargs
host = conn.get("host")
port = conn.get("port")
db = conn.get("db")
_debug(
f"[ABORT_DEBUG] first check book_id={book_id} "
f"redis={host}:{port} db={db}"
)
except Exception:
_debug(f"[ABORT_DEBUG] first check book_id={book_id}")
_seen_debug_keys.add(key)
# Only log abort ACTIVE
if exists == 1:
_debug(f"[ABORT] ACTIVE for {book_id}")
return exists == 1
except Exception as e:
if ABORT_DEBUG:
_debug(f"[ABORT_DEBUG] ERROR checking {key}: {e}")
return False
# ========================================================= # =========================================================
# PER-CHAPTER STATE # PER-CHAPTER STATE
# ========================================================= # =========================================================
# We mark a chapter "started" once its download task begins.
# If abort is activated AFTER download start:
# → download must complete
# → parse must complete
# → save must complete
# All subsequent chapters will skip.
def mark_chapter_started(book_id: str, chapter_num: int): def mark_chapter_started(book_id: str, chapter_num: int):
"""
Mark this chapter as started. Parse/save will always run after this,
even if abort has been activated afterwards.
"""
key = f"started:{book_id}:{chapter_num}" key = f"started:{book_id}:{chapter_num}"
r.set(key, "1") r.set(key, "1")
def chapter_started(book_id: str, chapter_num: int) -> bool: def chapter_started(book_id: str, chapter_num: int) -> bool:
"""
Return True if this chapter has already started downloading.
"""
key = f"started:{book_id}:{chapter_num}" key = f"started:{book_id}:{chapter_num}"
return r.exists(key) == 1 return r.exists(key) == 1
@ -70,13 +112,11 @@ def chapter_started(book_id: str, chapter_num: int) -> bool:
def reset_book_state(book_id: str): def reset_book_state(book_id: str):
""" """
Optional utility: remove abort flag and all started-chapter markers. Remove abort flag and all chapter-start markers.
Useful during testing or manual cleanup.
""" """
# Remove abort flag key = f"abort:{book_id}"
r.delete(f"abort:{book_id}") r.delete(key)
# Remove all "started:*" keys for this book
pattern = f"started:{book_id}:*" pattern = f"started:{book_id}:*"
for key in r.scan_iter(pattern): for k in r.scan_iter(pattern):
r.delete(key) r.delete(k)

@ -4,16 +4,17 @@
# Build Celery pipelines for all chapters # Build Celery pipelines for all chapters
# and pass book_id for abort/progress/log functionality. # and pass book_id for abort/progress/log functionality.
# + Download and replicate cover image to all volume folders # + Download and replicate cover image to all volume folders
# + Generate scripts (allinone.txt, makebook.txt, say.txt) # + Generate scripts (allinone.txt, makebook, say)
# ========================================================= # =========================================================
from celery import group from celery import group
from scraper.tasks.pipeline import build_chapter_pipeline from scraper.tasks.pipeline import build_chapter_pipeline
from scraper.scriptgen import generate_all_scripts # <-- ADDED from scraper.scriptgen import generate_all_scripts
from logbus.publisher import log from logbus.publisher import log
import os import os
import requests import requests
import shutil import shutil
from scraper.abort import abort_requested # DEBUG allowed
class DownloadController: class DownloadController:
@ -24,7 +25,7 @@ class DownloadController:
- consistent meta propagation - consistent meta propagation
- book_id-based abort + progress tracking - book_id-based abort + progress tracking
- cover download + volume replication - cover download + volume replication
- script generation (allinone.txt, makebook.txt, say.txt) - script generation (allinone.txt, makebook, say)
""" """
def __init__(self, book_id: str, scrape_result: dict): def __init__(self, book_id: str, scrape_result: dict):
@ -54,6 +55,17 @@ class DownloadController:
"book_url": scrape_result.get("book_url"), "book_url": scrape_result.get("book_url"),
} }
# -------------------------------------------------
# DEBUG — confirm the controller sees the correct book_id
# -------------------------------------------------
log(f"[CTRL_DEBUG] Controller init book_id={book_id} title='{self.title}'")
try:
abort_state = abort_requested(book_id)
log(f"[CTRL_DEBUG] abort_requested(book_id={book_id}) → {abort_state}")
except Exception as e:
log(f"[CTRL_DEBUG] abort_requested ERROR: {e}")
# --------------------------------------------------------- # ---------------------------------------------------------
# Cover Download # Cover Download
# --------------------------------------------------------- # ---------------------------------------------------------
@ -133,7 +145,7 @@ class DownloadController:
log(f"[CTRL] Output root: {self.book_base}") log(f"[CTRL] Output root: {self.book_base}")
# ------------------------------------- # -------------------------------------
# 1) Download cover before any pipelines # 1) Download cover
# ------------------------------------- # -------------------------------------
self.download_cover() self.download_cover()
@ -147,7 +159,7 @@ class DownloadController:
tasks.append( tasks.append(
build_chapter_pipeline( build_chapter_pipeline(
self.book_id, # UUID self.book_id,
chapter_num, chapter_num,
chapter_url, chapter_url,
volume_path, volume_path,
@ -162,13 +174,16 @@ class DownloadController:
f"(book_id={self.book_id}, group_id={async_result.id})" f"(book_id={self.book_id}, group_id={async_result.id})"
) )
# ------------------------------------------------------- # Debug abort state
# 2) AFTER dispatch: cover replication to volume folders try:
abort_state = abort_requested(self.book_id)
log(f"[CTRL_DEBUG] After-dispatch abort state: {abort_state}")
except Exception as e:
log(f"[CTRL_DEBUG] abort_requested error after dispatch: {e}")
# ------------------------------------------------------- # -------------------------------------------------------
self.replicate_cover_to_volumes() self.replicate_cover_to_volumes()
# -------------------------------------------------------
# 3) Generate scripts (allinone, makebook, say)
# ------------------------------------------------------- # -------------------------------------------------------
try: try:
generate_all_scripts( generate_all_scripts(

@ -1,10 +0,0 @@
# tasks/audio.py
from celery import shared_task
from logbus.publisher import log
@shared_task(bind=True, queue="audio")
def text_to_audio(self, text_file):
log(f"[AUDIO] converting: {text_file}")
# placeholder for macOS "say"
return True

@ -0,0 +1,183 @@
# ============================================================
# File: scraper/tasks/audio_tasks.py
# Purpose: Celery task that converts a saved chapter text file
#          into an .m4a audiobook file via macOS `say`, with a
#          Redis-based concurrency-slot limiter and abort support.
# ============================================================
from celery_app import celery_app
from logbus.publisher import log
import os
import subprocess
import time
from scraper.abort import abort_requested
from redis import Redis
from urllib.parse import urlparse

# Prefer the local Redis (this worker runs outside Docker), fall back to
# the standard backend; a final localhost default prevents urlparse(None)
# from raising TypeError at import time when neither env var is set.
redis_url = (
    os.getenv("REDIS_BACKEND_LOCAL")
    or os.getenv("REDIS_BACKEND")
    or "redis://127.0.0.1:6379/0"
)
parsed = urlparse(redis_url)

# ------------------------------------------------------------
# REGULAR REDIS CLIENT (slots, file checks, state)
# ------------------------------------------------------------
# The db index must be an int; an empty URL path means database 0.
redis_client = Redis(
    host=parsed.hostname,
    port=parsed.port,
    db=int(parsed.path.strip("/") or 0),
)

# ------------------------------------------------------------
# BACKEND CLIENT (abort flags, progress counters) — always DB 0
# ------------------------------------------------------------
backend_client = Redis(
    host=parsed.hostname,
    port=parsed.port,
    db=0,
)

AUDIO_TIMEOUT = int(os.getenv("AUDIO_TIMEOUT_SECONDS", "300"))
AUDIO_VOICE = os.getenv("AUDIO_VOICE", "SinJi")
AUDIO_RATE = int(os.getenv("AUDIO_RATE", "200"))
HOST_PATH = os.getenv("HOST_PATH", "/app/output")
AUDIO_SLOTS = int(os.getenv("AUDIO_SLOTS", "1"))
CONTAINER_PREFIX = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "/app/output")


def _acquire_slot(book_id, chapter_number, ttl):
    """Claim one of the AUDIO_SLOTS concurrency slots (SET NX with TTL).

    Polls until a slot frees up. Returns the claimed slot's Redis key,
    or None when the wait exceeded `ttl` or an abort was requested.
    """
    for i in range(1, AUDIO_SLOTS + 1):
        key = f"audio_slot:{i}"
        if redis_client.set(key, "1", nx=True, ex=ttl):
            log(f"[AUDIO] CH{chapter_number}: Acquired slot {i}/{AUDIO_SLOTS}")
            return key

    log(f"[AUDIO] CH{chapter_number}: All slots busy → waiting...")
    start_wait = time.time()
    while True:
        for i in range(1, AUDIO_SLOTS + 1):
            key = f"audio_slot:{i}"
            if redis_client.set(key, "1", nx=True, ex=ttl):
                log(f"[AUDIO] CH{chapter_number}: Slot acquired after wait")
                return key
        if abort_requested(book_id, backend_client):
            log(f"[AUDIO] ABORT while waiting → skip CH{chapter_number}")
            return None
        if time.time() - start_wait > ttl:
            log(f"[AUDIO] CH{chapter_number}: Slot wait timeout → aborting audio")
            return None
        time.sleep(0.25)


@celery_app.task(bind=True, queue="audio", ignore_result=True)
def generate_audio(
    self, book_id, volume_name, chapter_number, chapter_title, chapter_text
):
    """Convert a saved chapter text file into {NNNN}.m4a via macOS `say`.

    Args:
        book_id: book identifier used for abort flags.
        volume_name: volume folder name (informational; path is derived
            from `chapter_text`).
        chapter_number: 1-based chapter index, used for the output name.
        chapter_title: human-readable title (logging only).
        chapter_text: container-side path to the saved chapter .txt file.

    Side effects: creates <book>/<volume>/Audio/<NNNN>.m4a under HOST_PATH.
    Skips silently when abort is active or the file already exists.
    """
    log(f"[AUDIO] CH{chapter_number}: START task → raw_input={chapter_text}")

    # Abort early — before claiming a slot.
    if abort_requested(book_id, backend_client):
        log(f"[AUDIO] ABORT detected → skip CH{chapter_number}")
        return

    # ============================================================
    # ACQUIRE AUDIO SLOT
    # ============================================================
    ttl = AUDIO_TIMEOUT + 15
    slot_key = _acquire_slot(book_id, chapter_number, ttl)
    if slot_key is None:
        return

    try:
        # ============================================================
        # PATH NORMALISATION
        # ============================================================
        container_path = chapter_text
        log(f"[AUDIO] CH{chapter_number}: container_path={container_path}")

        # 1) Strip container prefix to get relative path: BOOK/VOLUME/FILE
        if container_path.startswith(CONTAINER_PREFIX):
            relative_path = container_path[len(CONTAINER_PREFIX):].lstrip("/")
        else:
            relative_path = container_path  # fallback: assume already relative

        parts = relative_path.split("/")
        if len(parts) < 3:
            log(
                f"[AUDIO] CH{chapter_number}: FATAL — cannot parse book/volume from {relative_path}"
            )
            return

        book_from_path = parts[0]
        volume_from_path = parts[1]

        # 2) Construct real host path
        host_path = os.path.join(HOST_PATH, relative_path)
        log(f"[AUDIO] CH{chapter_number}: resolved_host_path={host_path}")

        # ============================================================
        # PREPARE OUTPUT DIR
        # ============================================================
        base_dir = os.path.join(HOST_PATH, book_from_path, volume_from_path, "Audio")
        os.makedirs(base_dir, exist_ok=True)

        safe_num = f"{chapter_number:04d}"
        audio_file = os.path.join(base_dir, f"{safe_num}.m4a")
        log(f"[AUDIO] CH{chapter_number}: output_file={audio_file}")

        if os.path.exists(audio_file):
            log(f"[AUDIO] Skip CH{chapter_number} → already exists")
            return

        # ============================================================
        # BUILD CMD
        # ============================================================
        # Argv list with shell=False: paths containing spaces or quotes
        # (e.g. apostrophes in book titles) are passed through safely,
        # with no shell-injection surface.
        cmd = [
            "say",
            f"--voice={AUDIO_VOICE}",
            f"--input-file={host_path}",
            f"--output-file={audio_file}",
            "--file-format=m4bf",
            "--quality=127",
            "-r",
            str(AUDIO_RATE),
            "--data-format=aac",
        ]
        log(f"[AUDIO] CH{chapter_number}: CMD = {cmd}")

        # ============================================================
        # RUN TTS
        # ============================================================
        try:
            subprocess.run(cmd, check=True, timeout=AUDIO_TIMEOUT)
            log(f"[AUDIO] CH{chapter_number}: Completed")
        except subprocess.TimeoutExpired:
            log(f"[AUDIO] CH{chapter_number}: TIMEOUT → remove incomplete file")
            if os.path.exists(audio_file):
                try:
                    os.remove(audio_file)
                except OSError:
                    pass  # best-effort cleanup
        except subprocess.CalledProcessError as e:
            log(f"[AUDIO] CH{chapter_number}: ERROR during say → {e}")
        except Exception as e:
            log(f"[AUDIO] CH{chapter_number}: UNEXPECTED ERROR → {e}")
    finally:
        # Always release the concurrency slot, whatever path we took.
        redis_client.delete(slot_key)
        log(f"[AUDIO] CH{chapter_number}: Released slot")

@ -127,6 +127,7 @@ def download_chapter(
msg = f"[ABORT] Skip chapter {chapter_num} (abort active, not started)" msg = f"[ABORT] Skip chapter {chapter_num} (abort active, not started)"
log_msg(book_id, msg) log_msg(book_id, msg)
return { return {
"book_id": book_id,
"chapter": chapter_num, "chapter": chapter_num,
"url": chapter_url, "url": chapter_url,
"html": None, "html": None,
@ -146,6 +147,7 @@ def download_chapter(
if os.path.exists(save_path): if os.path.exists(save_path):
log_msg(book_id, f"[DL] SKIP {chapter_num} (exists) → {save_path}") log_msg(book_id, f"[DL] SKIP {chapter_num} (exists) → {save_path}")
return { return {
"book_id": book_id,
"chapter": chapter_num, "chapter": chapter_num,
"url": chapter_url, "url": chapter_url,
"html": None, "html": None,
@ -185,6 +187,7 @@ def download_chapter(
log_msg(book_id, f"[DL] OK {chapter_num}: {len(html)} bytes") log_msg(book_id, f"[DL] OK {chapter_num}: {len(html)} bytes")
return { return {
"book_id": book_id,
"chapter": chapter_num, "chapter": chapter_num,
"url": chapter_url, "url": chapter_url,
"html": html, "html": html,

@ -37,6 +37,10 @@ def parse_chapter(self, download_result: dict, meta: dict):
if download_result.get("skipped"): if download_result.get("skipped"):
chapter = download_result.get("chapter") chapter = download_result.get("chapter")
log_msg(book_id, f"[PARSE] SKIP chapter {chapter} (download skipped)") log_msg(book_id, f"[PARSE] SKIP chapter {chapter} (download skipped)")
# Ensure book_id is present in the returned dict
download_result["book_id"] = book_id
return download_result return download_result
# ------------------------------------------------------------ # ------------------------------------------------------------

@ -3,8 +3,14 @@
# Purpose: # Purpose:
# Build Celery chains for chapter processing. # Build Celery chains for chapter processing.
# #
# download → parse → save → update_progress # Chain:
# download_chapter(book_id, chapter_num, url, base_path)
# → parse_chapter(download_result, meta)
# → save_chapter(parsed_result, base_path)
# → update_progress(final_result, book_id)
# #
# All subtasks must pass through result dicts untouched so the
# next stage receives the correct fields.
# ========================================================= # =========================================================
from celery import chain from celery import chain
@ -12,7 +18,7 @@ from celery import chain
from scraper.tasks.download_tasks import download_chapter from scraper.tasks.download_tasks import download_chapter
from scraper.tasks.parse_tasks import parse_chapter from scraper.tasks.parse_tasks import parse_chapter
from scraper.tasks.save_tasks import save_chapter from scraper.tasks.save_tasks import save_chapter
from scraper.tasks.progress_tasks import update_progress # NEW from scraper.tasks.progress_tasks import update_progress
def build_chapter_pipeline( def build_chapter_pipeline(
@ -23,17 +29,17 @@ def build_chapter_pipeline(
meta: dict, meta: dict,
): ):
""" """
Chapter pipeline: Build a Celery chain for one chapter.
download_chapter(book_id, chapter_num, url, base_path) download_chapter(book_id, chapter_number, chapter_url, base_path)
parse_chapter(download_result, meta) parse_chapter(download_result, meta)
save_chapter(parsed_result, base_path) save_chapter(parsed_result, base_path)
update_progress(result, book_id) update_progress(result, book_id)
""" """
return chain( return chain(
download_chapter.s(book_id, chapter_number, chapter_url, base_path), download_chapter.s(book_id, chapter_number, chapter_url, base_path),
parse_chapter.s(meta), parse_chapter.s(meta),
save_chapter.s(base_path), save_chapter.s(base_path),
update_progress.s(book_id), # ← centrale progress update update_progress.s(book_id),
) )

@ -16,6 +16,10 @@ def update_progress(result: dict, book_id: str):
Central progress logic: Central progress logic:
- result: output of save_chapter - result: output of save_chapter
- book_id: explicitly passed by pipeline - book_id: explicitly passed by pipeline
IMPORTANT:
- save_chapter already updates counters for skipped & normal chapters
- progress.update MUST NOT double-increment
""" """
ch = result.get("chapter") ch = result.get("chapter")
@ -25,11 +29,14 @@ def update_progress(result: dict, book_id: str):
if failed: if failed:
inc_failed(book_id) inc_failed(book_id)
log(f"[PROG] FAILED chapter {ch}") log(f"[PROG] FAILED chapter {ch}")
elif skipped: elif skipped:
inc_skipped(book_id) # save_chapter already did:
inc_completed(book_id) # inc_skipped + inc_completed
log(f"[PROG] SKIPPED chapter {ch}") log(f"[PROG] SKIPPED chapter {ch}")
else: else:
# Normal completion: save_chapter only does inc_completed
inc_completed(book_id) inc_completed(book_id)
log(f"[PROG] DONE chapter {ch}") log(f"[PROG] DONE chapter {ch}")

@ -1,7 +1,7 @@
# ========================================================= # ============================================================
# File: scraper/tasks/save_tasks.py # File: scraper/tasks/save_tasks.py
# Purpose: Save parsed chapter text to disk. # Purpose: Save parsed chapter text to disk + trigger audio.
# ========================================================= # ============================================================
print(">>> [IMPORT] save_tasks.py loaded") print(">>> [IMPORT] save_tasks.py loaded")
@ -14,9 +14,11 @@ from scraper.progress import (
inc_completed, inc_completed,
inc_skipped, inc_skipped,
inc_failed, inc_failed,
add_failed_chapter, # <-- enige noodzakelijke aanvulling add_failed_chapter,
) )
from scraper.tasks.audio_tasks import generate_audio
@shared_task(bind=True, queue="save", ignore_result=False) @shared_task(bind=True, queue="save", ignore_result=False)
def save_chapter(self, parsed: dict, base_path: str): def save_chapter(self, parsed: dict, base_path: str):
@ -37,32 +39,59 @@ def save_chapter(self, parsed: dict, base_path: str):
chapter = parsed.get("chapter") chapter = parsed.get("chapter")
# ------------------------------------------------------------ # ------------------------------------------------------------
# SKIP CASE (from download or parse stage) # SKIP CASE (download or parse skipped the chapter)
# ------------------------------------------------------------ # ------------------------------------------------------------
if parsed.get("skipped"): if parsed.get("skipped"):
path = parsed.get("path", "(no-path)") path = parsed.get("path", "(no-path)")
log_msg(book_id, f"[SAVE] SKIP chapter {chapter}{path}") log_msg(book_id, f"[SAVE] SKIP chapter {chapter}{path}")
inc_skipped(book_id) inc_skipped(book_id)
return {"chapter": chapter, "path": path, "skipped": True}
# Determine volume name from the base path
volume_name = os.path.basename(base_path.rstrip("/"))
# Queue audio using the existing saved file
try:
generate_audio.delay(
book_id,
volume_name,
chapter,
f"Chapter {chapter}",
path, # <<-- correct: this is always the real file path
)
log_msg(
book_id,
f"[AUDIO] Task queued (SKIPPED) for chapter {chapter} in {volume_name}",
)
except Exception as audio_exc:
log_msg(
book_id,
f"[AUDIO] ERROR queueing (SKIPPED) chapter {chapter}: {audio_exc}",
)
return {
"book_id": book_id, # <<< FIXED
"chapter": chapter,
"path": path,
"skipped": True,
}
# ------------------------------------------------------------ # ------------------------------------------------------------
# NORMAL SAVE # NORMAL SAVE CASE
# ------------------------------------------------------------ # ------------------------------------------------------------
try: try:
text = parsed.get("text", "") text = parsed.get("text", "")
url = parsed.get("url")
if chapter is None: if chapter is None:
raise ValueError("Missing chapter number in parsed payload") raise ValueError("Missing chapter number in parsed payload")
# Ensure folder exists # Ensure chapter folder exists
os.makedirs(base_path, exist_ok=True) os.makedirs(base_path, exist_ok=True)
# Build file path # Build chapter file path
path = get_save_path(chapter, base_path) path = get_save_path(chapter, base_path)
# Write chapter text # Save chapter text to disk
with open(path, "w", encoding="utf-8") as f: with open(path, "w", encoding="utf-8") as f:
f.write(text) f.write(text)
@ -70,12 +99,25 @@ def save_chapter(self, parsed: dict, base_path: str):
inc_completed(book_id) inc_completed(book_id)
# Determine volume name
volume_name = os.path.basename(base_path.rstrip("/"))
# Queue audio task (always use the saved file path)
try:
generate_audio.delay(
book_id,
volume_name,
chapter,
f"Chapter {chapter}",
path,
)
log_msg(
book_id, f"[AUDIO] Task queued for chapter {chapter} in {volume_name}"
)
except Exception as audio_exc:
log_msg(book_id, f"[AUDIO] ERROR queueing chapter {chapter}: {audio_exc}")
return {"book_id": book_id, "chapter": chapter, "path": path} return {"book_id": book_id, "chapter": chapter, "path": path}
except Exception as exc: except Exception as exc:
log_msg(book_id, f"[SAVE] ERROR saving chapter {chapter}: {exc}") log_msg(book_id, f"[SAVE] ERROR saving chapter {chapter}: {exc}")
inc_failed(book_id)
add_failed_chapter(book_id, chapter, str(exc)) # <-- essentieel
raise

@ -7,7 +7,6 @@
from celery_app import celery_app from celery_app import celery_app
from logbus.publisher import log from logbus.publisher import log
import os import os
import uuid
import redis import redis
from scraper.sites import BookSite from scraper.sites import BookSite
@ -51,12 +50,14 @@ def start_scrape_book(self, url: str):
log(f"[SCRAPING] Completed scrape: {len(chapters)}/{full_count} chapters") log(f"[SCRAPING] Completed scrape: {len(chapters)}/{full_count} chapters")
# ------------------------------------------------------------ # ------------------------------------------------------------
# BOOK RUN ID # BOOK RUN ID (CHANGED: use book title instead of UUID)
# ------------------------------------------------------------ # ------------------------------------------------------------
book_id = str(uuid.uuid4()) title = result.get("title") or "UnknownBook"
book_id = title # ← your requirement: title is unique and consistent
result["book_id"] = book_id result["book_id"] = book_id
log(f"[SCRAPING] Assigned book_id = {book_id}") log(f"[SCRAPING] Assigned book_id = '{book_id}'")
# ------------------------------------------------------------ # ------------------------------------------------------------
# RESET ABORT + INITIALISE PROGRESS # RESET ABORT + INITIALISE PROGRESS
@ -80,7 +81,7 @@ def start_scrape_book(self, url: str):
queue="controller", queue="controller",
) )
log(f"[SCRAPING] Dispatched download controller for {book_id}") log(f"[SCRAPING] Dispatched download controller for '{book_id}'")
return { return {
"book_id": book_id, "book_id": book_id,

@ -0,0 +1,46 @@
#!/bin/bash
# Bootstrap and launch the local macOS audio worker:
# ensures a virtualenv exists, installs audio requirements + Celery,
# then hands off to audio_worker_local.py.
set -e

banner() {
    echo ""
    echo "====================================================="
    echo " STARTING LOCAL macOS AUDIO WORKER"
    echo "====================================================="
    echo ""
}

banner

# ------------------------------------------------------
# Virtualenv: reuse when present, create on first run
# ------------------------------------------------------
if [ -d ".venv" ]; then
    echo "[AUDIO] Existing .venv found"
else
    echo "[AUDIO] No .venv found — creating virtualenv..."
    python3 -m venv .venv
fi

echo "[AUDIO] Activating .venv"
source .venv/bin/activate

# ------------------------------------------------------
# Dependencies
# ------------------------------------------------------
REQ_FILE="requirements.audio.txt"
if [ ! -f "$REQ_FILE" ]; then
    echo "[AUDIO] ERROR — $REQ_FILE not found!"
    exit 1
fi

echo "[AUDIO] Installing audio requirements..."
pip install -r "$REQ_FILE"

# Celery must be installed locally as well
echo "[AUDIO] Ensuring Celery installed..."
pip install celery

# ------------------------------------------------------
# Launch
# ------------------------------------------------------
echo ""
echo "[AUDIO] Starting audio worker..."
python3 audio_worker_local.py
Loading…
Cancel
Save