# ============================================================
# File: scraper/tasks/controller_tasks.py
# Purpose:
#   Start the download → parse → save pipeline for a scraped book,
#   including progress/abort tracking via book_id.
#   ONLY THE CONTROLLER UPDATES PROGRESS.
# ============================================================

from celery_app import celery_app
from logbus.publisher import log
from scraper.download_controller import DownloadController
from scraper.progress import (
    set_total,
    inc_completed,
    inc_skipped,
    inc_failed,
)
from scraper.abort import abort_requested

print(">>> [IMPORT] controller_tasks.py loaded")


@celery_app.task(bind=True, queue="controller", ignore_result=False)
def launch_downloads(self, book_id: str, scrape_result: dict):
    """
    Launch the entire pipeline (download → parse → save) and maintain
    the progress counters for book_id.

    EXPECTS:
        book_id:        ID generated in scraping.start_scrape_book
        scrape_result:  dict with title, author, url, chapters[]
    """
    title = scrape_result.get("title", "UnknownBook")
    chapters = scrape_result.get("chapters", []) or []
    total = len(chapters)

    log(f"[CTRL] Book '{title}' → {total} chapters (book_id={book_id})")

    # ------------------------------------------------------------
    # INIT PROGRESS
    # ------------------------------------------------------------
    set_total(book_id, total)
    log(f"[CTRL] Progress initialized for {book_id}: total={total}")

    # ------------------------------------------------------------
    # BUILD CONTROLLER
    # ------------------------------------------------------------
    ctl = DownloadController(book_id, scrape_result)

    # ------------------------------------------------------------
    # RUN PIPELINE IN SYNC LOOP
    # (DownloadController.start() yields one result dict per chapter)
    # ------------------------------------------------------------
    processed = 0  # chapters handled by this run (success, skip, or fail)

    try:
        for result in ctl.start():  # generator mode
            ch = result.get("chapter")
            processed += 1

            if result.get("skipped"):
                inc_skipped(book_id)
                inc_completed(book_id)
                log(f"[CTRL] SKIPPED chapter {ch}")

            elif result.get("failed"):
                inc_failed(book_id)
                inc_completed(book_id)
                log(f"[CTRL] FAILED chapter {ch}")

            else:
                # Normal success
                inc_completed(book_id)
                log(f"[CTRL] DONE chapter {ch}")

            # Abort requested mid-run? Checked after every chapter,
            # whether it succeeded, was skipped, or failed.
            if abort_requested(book_id):
                log(f"[CTRL] ABORT after chapter {ch}")
                break

    except Exception as exc:
        log(f"[CTRL] ERROR while processing pipeline: {exc}")
        inc_failed(book_id)
        raise

    # ------------------------------------------------------------
    # FINISHED
    # ------------------------------------------------------------
    log(f"[CTRL] Pipeline finished for book_id={book_id}")

    return {
        "book_id": book_id,
        "total": total,
        "completed": processed,  # actual count, in case the run was aborted early
    }
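

# ------------------------------------------------------------
# USAGE (a minimal sketch; assumes the caller already holds
# book_id and scrape_result from scraping.start_scrape_book):
#
#     launch_downloads.delay(book_id, scrape_result)
#
# ------------------------------------------------------------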