# ============================================================
# File: scraper/tasks/audio_tasks.py
# Purpose: Convert chapter text files into audio using macOS
#          “say”, with Redis-based slot control.
# Updated: now uses db.repository for audio counters.
# ============================================================

from celery_app import celery_app
from logbus.publisher import log
import os
import subprocess
import time
import socket

from scraper.abort import abort_requested
from scraper.logger_decorators import logcall
from redis import Redis
from urllib.parse import urlparse

# NEW — unified repository façade
from db.repository import (
    inc_audio_done,
    inc_audio_skipped,
)

HOST = socket.gethostname()

# ------------------------------------------------------------
# REDIS CLIENT SETUP
# ------------------------------------------------------------
redis_url = os.getenv("REDIS_BACKEND_LOCAL") or os.getenv("REDIS_BACKEND")
parsed = urlparse(redis_url)

# Slot locking Redis client (database taken from the URL path)
redis_client = Redis(
    host=parsed.hostname,
    port=parsed.port,
    db=parsed.path.strip("/"),
)

# Abort + global progress flags always live in DB 0
backend_client = Redis(
    host=parsed.hostname,
    port=parsed.port,
    db=0,
)

# ------------------------------------------------------------
# CONFIG
# ------------------------------------------------------------
AUDIO_TIMEOUT = int(os.getenv("AUDIO_TIMEOUT_SECONDS", "300"))
AUDIO_VOICE = os.getenv("AUDIO_VOICE", "SinJi")
AUDIO_RATE = int(os.getenv("AUDIO_RATE", "200"))

HOST_PATH = os.getenv("HOST_PATH", "/app/output")
CONTAINER_PREFIX = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "/app/output")

AUDIO_SLOTS = int(os.getenv("AUDIO_SLOTS", "1"))


# ============================================================
# HELPERS
# ============================================================
def _claim_free_slot(ttl):
    """Try each slot key once; return ``(key, index)`` or ``(None, None)``.

    A slot is claimed with ``SET key 1 NX EX ttl`` so the claim is atomic
    and expires on its own if the holder never releases it.
    """
    for index in range(1, AUDIO_SLOTS + 1):
        key = f"audio_slot:{index}"
        if redis_client.set(key, "1", nx=True, ex=ttl):
            return key, index
    return None, None


def _acquire_slot(book_id, chapter_number):
    """Block until a slot is claimed; return its key, or ``None`` on skip.

    Returns ``None`` (after counting the chapter as skipped) when an abort
    is requested for ``book_id`` or when waiting exceeds the slot TTL.
    """
    # The TTL outlives the subprocess timeout so a running job keeps its slot.
    ttl = AUDIO_TIMEOUT + 15

    slot_key, index = _claim_free_slot(ttl)
    if slot_key is not None:
        log(f"[AUDIO] CH{chapter_number}: Acquired slot {index}/{AUDIO_SLOTS}")
        return slot_key

    log(f"[AUDIO] CH{chapter_number}: All slots busy → waiting…")
    # Monotonic clock: the wait budget must not be affected by clock changes.
    start_wait = time.monotonic()

    while True:
        slot_key, _ = _claim_free_slot(ttl)
        if slot_key is not None:
            log(f"[AUDIO] CH{chapter_number}: Slot acquired after wait")
            return slot_key

        if abort_requested(book_id, backend_client):
            log(f"[AUDIO] ABORT while waiting → skip CH{chapter_number}")
            inc_audio_skipped(book_id)
            return None

        if time.monotonic() - start_wait > ttl:
            log(f"[AUDIO] CH{chapter_number}: Wait timeout → abort audio")
            inc_audio_skipped(book_id)
            return None

        time.sleep(0.25)


def _remove_partial_output(audio_file, chapter_number):
    """Delete a possibly half-written audio file left by a failed run.

    A leftover file would otherwise make later runs report
    "Already exists" and skip the chapter.
    """
    try:
        os.remove(audio_file)
    except FileNotFoundError:
        # Nothing was written; nothing to clean up.
        pass
    except OSError as exc:
        log(f"[AUDIO] CH{chapter_number}: could not remove partial file → {exc}")


def _render_chapter(book_id, chapter_number, chapter_path):
    """Resolve paths and run ``say`` for one chapter.

    Must be called while holding a slot; the caller releases it. Every
    handled outcome updates exactly one counter (done or skipped).
    """
    # ------------------------------------------------------------
    # PATH NORMALISATION
    # ------------------------------------------------------------
    if not chapter_path:
        log(f"[AUDIO] CH{chapter_number}: ERROR — no input file path provided")
        inc_audio_skipped(book_id)
        return

    # Strip container prefix so that host path is resolvable
    if CONTAINER_PREFIX and chapter_path.startswith(CONTAINER_PREFIX):
        relative_path = chapter_path[len(CONTAINER_PREFIX):].lstrip("/")
    else:
        relative_path = chapter_path

    # Expected layout: <book>/<volume>/<chapter file>
    parts = relative_path.split("/")
    if len(parts) < 3:
        log(
            f"[AUDIO] CH{chapter_number}: ERROR — cannot parse book/volume from {relative_path}"
        )
        inc_audio_skipped(book_id)
        return

    host_path = os.path.join(HOST_PATH, relative_path)

    # ------------------------------------------------------------
    # OUTPUT DIRECTORY
    # ------------------------------------------------------------
    base_dir = os.path.join(HOST_PATH, parts[0], parts[1], "Audio")
    os.makedirs(base_dir, exist_ok=True)

    # int() lets a numeric string format the same way as an int.
    safe_num = f"{int(chapter_number):04d}"
    audio_file = os.path.join(base_dir, f"{safe_num}.m4b")

    # Skip if audio already exists
    if os.path.exists(audio_file):
        log(f"[AUDIO] CH{chapter_number}: Already exists → skip")
        inc_audio_skipped(book_id)
        return

    # ------------------------------------------------------------
    # BUILD TTS COMMAND
    # ------------------------------------------------------------
    # Argument list, no shell: quotes or metacharacters in a path are
    # passed to `say` verbatim instead of being parsed by a shell.
    cmd = [
        "say",
        f"--voice={AUDIO_VOICE}",
        f"--input-file={host_path}",
        f"--output-file={audio_file}",
        "--file-format=m4bf",
        "--quality=127",
        "-r",
        str(AUDIO_RATE),
        "--data-format=aac",
    ]

    log(f"[AUDIO]({HOST}) CH{chapter_number} → output: {audio_file}")

    # ------------------------------------------------------------
    # EXECUTE
    # ------------------------------------------------------------
    try:
        subprocess.run(cmd, check=True, timeout=AUDIO_TIMEOUT)

        # NEW — repository façade
        inc_audio_done(book_id)
        log(f"[AUDIO]({HOST}) CH{chapter_number}: Completed")

    except subprocess.TimeoutExpired:
        log(f"[AUDIO]({HOST}) CH{chapter_number}: TIMEOUT → removing file")
        _remove_partial_output(audio_file, chapter_number)
        inc_audio_skipped(book_id)

    except subprocess.CalledProcessError as e:
        log(f"[AUDIO] CH{chapter_number}: ERROR during say → {e}")
        _remove_partial_output(audio_file, chapter_number)
        inc_audio_skipped(book_id)

    except Exception as e:
        # Task boundary: log and count as skipped rather than crash the worker.
        log(f"[AUDIO] CH{chapter_number}: UNEXPECTED ERROR → {e}")
        _remove_partial_output(audio_file, chapter_number)
        inc_audio_skipped(book_id)


# ============================================================
# CELERY TASK
# ============================================================
@celery_app.task(bind=True, queue="audio", ignore_result=True)
@logcall
def generate_audio(
    self, book_id, volume_name, chapter_number, chapter_title, chapter_path
):
    """
    Convert one chapter text file to an .m4b file with macOS ``say``.

    Concurrency is limited to AUDIO_SLOTS simultaneous runs through
    ``audio_slot:<n>`` keys in Redis.

    Args:
        self: bound Celery task instance (not used in the body).
        book_id: key used for the abort check and the audio counters.
        volume_name: accepted for the task signature; the output folder
            is derived from ``chapter_path`` instead.
        chapter_number: chapter index; used in log lines and, zero-padded
            to four digits, as the output file name.
        chapter_title: used only in the START log line.
        chapter_path: absolute container path to chapter text file.

    Returns:
        None. The outcome is recorded via ``inc_audio_done`` or
        ``inc_audio_skipped``.
    """
    log(f"[AUDIO]({HOST}) CH{chapter_number}: START → {chapter_title}")

    # ------------------------------------------------------------
    # ABORT CHECK
    # ------------------------------------------------------------
    if abort_requested(book_id, backend_client):
        inc_audio_skipped(book_id)
        log(f"[AUDIO]({HOST}) ABORT detected → skip CH{chapter_number}")
        return

    # ------------------------------------------------------------
    # ACQUIRE SLOT
    # ------------------------------------------------------------
    slot_key = _acquire_slot(book_id, chapter_number)
    if slot_key is None:
        # Abort or wait timeout; the skip has already been counted.
        return

    # ------------------------------------------------------------
    # RENDER (slot is released on every exit path, including errors)
    # ------------------------------------------------------------
    try:
        _render_chapter(book_id, chapter_number, chapter_path)
    finally:
        redis_client.delete(slot_key)
        log(f"[AUDIO] CH{chapter_number}: Released slot")