# ============================================================
# File: scraper/tasks/controller_tasks.py
# Purpose:
#   Start the download → parse → save pipeline for a scraped book,
#   including progress/abort tracking via book_id.
#   ONLY THE CONTROLLER UPDATES PROGRESS (initial total).
# ============================================================

import os
from urllib.parse import urlparse

import redis

from celery_app import celery_app
from logbus.publisher import log
from scraper.abort import abort_requested
from scraper.download_controller import DownloadController
from scraper.progress import set_total

print(">>> [IMPORT] controller_tasks.py loaded")


@celery_app.task(bind=True, queue="controller", ignore_result=False)
def launch_downloads(self, book_id: str, scrape_result: dict):
    """
    Launch the entire pipeline (download → parse → save) and
    initialize progress counters.

    Chapter-level progress is updated INSIDE the download/parse/save
    tasks. This task MUST NOT call .get() on async subtasks (Celery
    forbids synchronous waits inside a task).
    """
    title = scrape_result.get("title", "UnknownBook")
    chapters = scrape_result.get("chapters", []) or []
    total = len(chapters)

    # ------------------------------------------------------------
    # INIT BOOK STATE MODEL (required for Active Books dashboard)
    # ------------------------------------------------------------
    broker_url = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
    parsed = urlparse(broker_url)
    state = redis.Redis(
        host=parsed.hostname,
        port=parsed.port or 6379,
        db=int(parsed.path.lstrip("/") or 0),
        decode_responses=True,
    )

    # Book metadata
    state.set(f"book:{book_id}:title", title)
    state.set(f"book:{book_id}:status", "starting")

    # Download counters
    state.set(f"book:{book_id}:download:total", total)
    state.set(f"book:{book_id}:download:done", 0)

    # Audio counters (start at zero)
    state.set(f"book:{book_id}:audio:done", 0)

    # ------------------------------------------------------------
    # INIT PROGRESS
    # ------------------------------------------------------------
    set_total(book_id, total)
    log(f"[CTRL] Progress initialized for {book_id}: total={total}")

    # ------------------------------------------------------------
    # BUILD CONTROLLER
    # ------------------------------------------------------------
    ctl = DownloadController(book_id, scrape_result)

    # ------------------------------------------------------------
    # ABORT CHECK
    # Checked BEFORE dispatch: if the flag is already set, no
    # subtasks should be launched at all.
    # ------------------------------------------------------------
    if abort_requested(book_id):
        log(f"[CTRL] ABORT requested before tasks start (book_id={book_id})")
        return {"book_id": book_id, "aborted": True}

    # ------------------------------------------------------------
    # START PIPELINES (ASYNC)
    # ctl.start() returns a Celery GroupResult. We DO NOT iterate
    # over it or call .get(). Progress & failures are handled by
    # the worker subtasks.
    # ------------------------------------------------------------
    try:
        group_result = ctl.start()
        log(
            f"[CTRL] Pipelines dispatched for '{title}' "
            f"(book_id={book_id}, group_id={group_result.id})"
        )
    except Exception as exc:
        log(f"[CTRL] ERROR while dispatching pipelines: {exc}")
        raise

    # ------------------------------------------------------------
    # CONTROLLER DOES NOT WAIT FOR SUBTASK RESULTS
    # (Download/parse/save tasks update progress themselves)
    # ------------------------------------------------------------
    log(f"[CTRL] Controller finished dispatch for book_id={book_id}")
    return {
        "book_id": book_id,
        "total": total,
        "started": True,
        "group_id": group_result.id,
    }
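

# ------------------------------------------------------------
# USAGE (illustrative sketch, kept as comments so nothing runs
# on import). The caller context and the exact shape of
# scrape_result["chapters"] are assumptions about the surrounding
# project, not guarantees from this module; only "title" and
# "chapters" are read here.
# ------------------------------------------------------------
# from scraper.tasks.controller_tasks import launch_downloads
#
# scrape_result = {
#     "title": "Example Book",
#     "chapters": [
#         {"index": 1, "url": "https://example.com/ch1"},  # hypothetical shape
#         {"index": 2, "url": "https://example.com/ch2"},
#     ],
# }
#
# # Dispatch asynchronously onto the "controller" queue; returns an
# # AsyncResult for THIS controller task, not for the subtask group.
# async_result = launch_downloads.delay("book-123", scrape_result)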