You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kmftools/bookscraper/scraper/tasks/audio_tasks.py

189 lines
6.1 KiB

# ============================================================
# File: scraper/tasks/audio_tasks.py
# ============================================================
from celery_app import celery_app
from logbus.publisher import log
import os
import subprocess
import time
from scraper.progress import inc_audio_done, inc_audio_skipped
# from db.repository import inc_audio_done
from scraper.abort import abort_requested
from redis import Redis
from urllib.parse import urlparse
# Prefer the local Redis instance when one is configured; otherwise fall back
# to the default backend URL.
redis_url = os.getenv("REDIS_BACKEND_LOCAL") or os.getenv("REDIS_BACKEND")
parsed = urlparse(redis_url)

# Host/port pair shared by both clients below.
_redis_endpoint = {"host": parsed.hostname, "port": parsed.port}

# ------------------------------------------------------------
# REGULAR REDIS CLIENT (slots, file checks, state)
# Targets the database named by the URL path.
# ------------------------------------------------------------
redis_client = Redis(db=parsed.path.strip("/"), **_redis_endpoint)

# ------------------------------------------------------------
# BACKEND CLIENT (abort flags, progress counters) - always DB 0
# ------------------------------------------------------------
backend_client = Redis(db=0, **_redis_endpoint)

# ------------------------------------------------------------
# Tunables, each overridable through the environment.
# ------------------------------------------------------------
# Maximum runtime of one TTS subprocess, in seconds.
AUDIO_TIMEOUT = int(os.environ.get("AUDIO_TIMEOUT_SECONDS", "300"))
# Voice name and speaking rate handed to the `say` command.
AUDIO_VOICE = os.environ.get("AUDIO_VOICE", "SinJi")
AUDIO_RATE = int(os.environ.get("AUDIO_RATE", "200"))
# Root directory under which input files are read and audio is written.
HOST_PATH = os.environ.get("HOST_PATH", "/app/output")
# Number of `audio_slot:<n>` keys that may be held at the same time.
AUDIO_SLOTS = int(os.environ.get("AUDIO_SLOTS", "1"))
# Leading path component stripped from incoming chapter paths.
CONTAINER_PREFIX = os.environ.get("BOOKSCRAPER_OUTPUT_DIR", "/app/output")
def _try_acquire_audio_slot(ttl):
    """Make one pass over all audio slots and try to claim a free one.

    Args:
        ttl: Expiry in seconds set on the slot key when it is claimed.

    Returns:
        The 1-based index of the claimed slot, or None when every slot key
        already exists.
    """
    for index in range(1, AUDIO_SLOTS + 1):
        # SET with nx=True only succeeds when the key is absent; the expiry
        # lets the slot free itself if the holder never deletes the key.
        if redis_client.set(f"audio_slot:{index}", "1", nx=True, ex=ttl):
            return index
    return None


def _remove_partial_audio(audio_file, chapter_number):
    """Delete a possibly incomplete output file; log instead of raising."""
    try:
        os.remove(audio_file)
    except FileNotFoundError:
        # Nothing was written, so there is nothing to clean up.
        pass
    except OSError as exc:
        log(
            f"[AUDIO] CH{chapter_number}: could not remove incomplete file → {exc}"
        )


@celery_app.task(bind=True, queue="audio", ignore_result=True)
def generate_audio(
    self, book_id, volume_name, chapter_number, chapter_title, chapter_text
):
    """Render one chapter to an .m4b file with the `say` command.

    The task claims one of AUDIO_SLOTS Redis slot keys, maps the incoming
    path onto HOST_PATH, runs `say`, and always releases the slot afterwards.

    Args:
        self: Bound Celery task instance (unused in the body).
        book_id: Identifier passed to the abort check and progress counters.
        volume_name: Not used by this function; book and volume are taken
            from the first two components of the input path instead.
        chapter_number: Integer chapter index; used in log lines and as the
            zero-padded output file name.
        chapter_title: Not used by this function.
        chapter_text: Despite the name, this is handled as the PATH of the
            input text file, not as the text itself.

    Returns:
        None. Failures are logged rather than raised.
    """
    # Function-scope import: only needed to render the command for the log.
    import shlex

    log(f"[AUDIO] CH{chapter_number}: START task → raw_input={chapter_text}")

    # Abort early, before touching any slot.
    if abort_requested(book_id, backend_client):
        inc_audio_skipped(book_id)
        log(f"[AUDIO] ABORT detected → skip CH{chapter_number}")
        return

    # ============================================================
    # ACQUIRE AUDIO SLOT
    # ============================================================
    # The slot outlives the subprocess timeout by a small margin so it does
    # not expire while `say` is still running.
    ttl = AUDIO_TIMEOUT + 15
    slot = _try_acquire_audio_slot(ttl)
    if slot is not None:
        log(f"[AUDIO] CH{chapter_number}: Acquired slot {slot}/{AUDIO_SLOTS}")
    else:
        log(f"[AUDIO] CH{chapter_number}: All slots busy → waiting...")
        # Monotonic clock: the deadline must not move with wall-clock changes.
        start_wait = time.monotonic()
        while True:
            slot = _try_acquire_audio_slot(ttl)
            if slot is not None:
                log(f"[AUDIO] CH{chapter_number}: Slot acquired after wait")
                break
            # NOTE(review): unlike the early abort above, the two exits below
            # do not call inc_audio_skipped — confirm whether that is intended.
            if abort_requested(book_id, backend_client):
                log(f"[AUDIO] ABORT while waiting → skip CH{chapter_number}")
                return
            if time.monotonic() - start_wait > ttl:
                log(f"[AUDIO] CH{chapter_number}: Slot wait timeout → aborting audio")
                return
            time.sleep(0.25)
    slot_key = f"audio_slot:{slot}"

    # From here on the slot is held: every exit path, including unexpected
    # exceptions, goes through the finally block that releases it.
    try:
        # ============================================================
        # PATH NORMALISATION
        # ============================================================
        container_path = chapter_text
        # The path may be None or empty → stop without crashing.
        if not container_path:
            log(f"[AUDIO] CH{chapter_number}: FATAL — no input path provided")
            return

        # Strip the container prefix (when configured) to get a path
        # relative to the output root.
        if CONTAINER_PREFIX and container_path.startswith(CONTAINER_PREFIX):
            relative_path = container_path[len(CONTAINER_PREFIX):].lstrip("/")
        else:
            relative_path = container_path

        # Expected layout: <book>/<volume>/<...file>
        parts = relative_path.split("/")
        if len(parts) < 3:
            log(
                f"[AUDIO] CH{chapter_number}: FATAL — cannot parse book/volume from {relative_path}"
            )
            return
        book_from_path = parts[0]
        volume_from_path = parts[1]
        host_path = os.path.join(HOST_PATH, relative_path)

        # ============================================================
        # OUTPUT PREP
        # ============================================================
        base_dir = os.path.join(HOST_PATH, book_from_path, volume_from_path, "Audio")
        os.makedirs(base_dir, exist_ok=True)
        safe_num = f"{chapter_number:04d}"
        audio_file = os.path.join(base_dir, f"{safe_num}.m4b")
        # An existing file is treated as "already rendered".
        if os.path.exists(audio_file):
            log(f"[AUDIO] Skip CH{chapter_number} → already exists")
            return

        # ============================================================
        # BUILD CMD
        # ============================================================
        # Argument list instead of a shell string: paths or voice names with
        # quotes, spaces or shell metacharacters are passed through verbatim.
        cmd = [
            "say",
            f"--voice={AUDIO_VOICE}",
            f"--input-file={host_path}",
            f"--output-file={audio_file}",
            "--file-format=m4bf",
            "--quality=127",
            "-r",
            str(AUDIO_RATE),
            "--data-format=aac",
        ]
        printable_cmd = " ".join(shlex.quote(arg) for arg in cmd)
        log(f"[AUDIO] CH{chapter_number}: CMD = {printable_cmd}")

        # ============================================================
        # RUN TTS
        # ============================================================
        try:
            subprocess.run(cmd, check=True, timeout=AUDIO_TIMEOUT)
            inc_audio_done(book_id)
            log(f"[AUDIO] CH{chapter_number}: Completed")
        except subprocess.TimeoutExpired:
            log(f"[AUDIO] CH{chapter_number}: TIMEOUT → remove incomplete file")
            _remove_partial_audio(audio_file, chapter_number)
        except subprocess.CalledProcessError as e:
            log(f"[AUDIO] CH{chapter_number}: ERROR during say → {e}")
            # A leftover partial file would make the "already exists" check
            # skip this chapter on every later run.
            _remove_partial_audio(audio_file, chapter_number)
        except Exception as e:
            # Top-level task boundary: log and swallow so the worker survives.
            # The output file is left alone because the failure may have
            # happened after `say` finished successfully.
            log(f"[AUDIO] CH{chapter_number}: UNEXPECTED ERROR → {e}")
    finally:
        redis_client.delete(slot_key)
        log(f"[AUDIO] CH{chapter_number}: Released slot")