# ============================================================
# File: scraper/tasks/controller_tasks.py
# Purpose:
#   Start the download → parse → save pipeline for a scraped book,
#   including progress/abort tracking via book_id.
#   ONLY THE CONTROLLER UPDATES PROGRESS (initial total).
# ============================================================

from celery_app import celery_app
from logbus.publisher import log

from scraper.download_controller import DownloadController
from scraper.progress import (
    set_total,
)
from scraper.abort import abort_requested

print(">>> [IMPORT] controller_tasks.py loaded")


@celery_app.task(bind=True, queue="controller", ignore_result=False)
def launch_downloads(self, book_id: str, scrape_result: dict) -> dict:
    """
    Launch the entire pipeline (download → parse → save) for one book and
    initialize its progress counters.

    Only the initial chapter total is set here; chapter-level progress is
    updated INSIDE the download/parse/save tasks.

    This task MUST NOT call .get() on async subtasks (Celery restriction):
    it dispatches the pipelines and returns immediately without waiting.

    Args:
        book_id: Identifier used for progress and abort tracking.
        scrape_result: Scraper output. ``"title"`` and ``"chapters"`` are
            read here; the whole dict is handed to ``DownloadController``.

    Returns:
        If an abort was already requested, nothing is dispatched and
        ``{"book_id", "total", "started": False, "aborted": True}`` is
        returned. Otherwise ``{"book_id", "total", "started": True,
        "group_id"}`` where ``group_id`` is the id of the object returned
        by ``ctl.start()``.

    Raises:
        Any exception raised by ``ctl.start()`` is logged and re-raised.
    """
    title = scrape_result.get("title", "UnknownBook")
    chapters = scrape_result.get("chapters", []) or []
    total = len(chapters)

    log(f"[CTRL] Book '{title}' → {total} chapters (book_id={book_id})")

    # ------------------------------------------------------------
    # INIT PROGRESS
    # ------------------------------------------------------------
    set_total(book_id, total)
    log(f"[CTRL] Progress initialized for {book_id}: total={total}")

    # ------------------------------------------------------------
    # ABORT CHECK — must happen BEFORE dispatch. Checking after
    # ctl.start() would report "aborted" while every subtask had
    # already been launched.
    # ------------------------------------------------------------
    if abort_requested(book_id):
        log(f"[CTRL] ABORT requested before tasks start (book_id={book_id})")
        return {
            "book_id": book_id,
            "total": total,
            "started": False,
            "aborted": True,
        }

    # ------------------------------------------------------------
    # BUILD CONTROLLER
    # ------------------------------------------------------------
    ctl = DownloadController(book_id, scrape_result)

    # ------------------------------------------------------------
    # START PIPELINES (ASYNC)
    # Returns a celery group AsyncResult. We DO NOT iterate or get().
    # Progress & failures are handled by the worker subtasks.
    # ------------------------------------------------------------
    try:
        group_result = ctl.start()
    except Exception as exc:
        # Top-level boundary: record the failure, then let Celery see it.
        log(f"[CTRL] ERROR while dispatching pipelines: {exc}")
        raise

    log(
        f"[CTRL] Pipelines dispatched for '{title}' "
        f"(book_id={book_id}, group_id={group_result.id})"
    )

    # ------------------------------------------------------------
    # CONTROLLER DOES NOT WAIT FOR SUBTASK RESULTS
    # (Download/parse/save tasks update progress themselves)
    # ------------------------------------------------------------
    log(f"[CTRL] Controller finished dispatch for book_id={book_id}")

    return {
        "book_id": book_id,
        "total": total,
        "started": True,
        "group_id": group_result.id,
    }