# ============================================================
# File: scraper/tasks/controller_tasks.py
# Purpose:
#   FULL scrape entrypoint + launching download/parse/save pipelines.
#   NO result.get() anywhere. Scraping is done inline.
# ============================================================

from celery_app import celery_app
from logbus.publisher import log

import os
import time
import redis
from urllib.parse import urlparse

from scraper.logger_decorators import logcall
from scraper.abort import abort_requested
from scraper.services.scrape_engine import ScrapeEngine
from scraper.services.site_resolver import SiteResolver
from db.repository import fetch_book, set_chapters_total
from scraper.download_controller import DownloadController

print(">>> [IMPORT] controller_tasks.py loaded")


# =============================================================
# 1) PUBLIC ENTRYPOINT — CALLED FROM /start
# =============================================================
@celery_app.task(
    bind=True,
    queue="controller",
    ignore_result=False,
    name="scraper.tasks.controller_tasks.start_full_scrape",
)
@logcall
def start_full_scrape(self, book_idx: str):
    """
    FULL SCRAPE ENTRYPOINT.

    Scraping is done inline → no Celery .get() needed.
    """
    log(f"[CTRL] start_full_scrape(book_idx={book_idx})")

    # Abort before doing anything
    if abort_requested(book_idx):
        log(f"[CTRL] PRE-ABORT flag detected for {book_idx}")
        return {"book_idx": book_idx, "aborted": True, "reason": "pre-abort"}

    # --------------------------------------------------------
    # 1) Load book metadata from SQLite
    # --------------------------------------------------------
    book = fetch_book(book_idx)
    if not book:
        msg = f"[CTRL] Book '{book_idx}' not found in DB"
        log(msg)
        raise ValueError(msg)

    url = book.get("book_url")
    if not url:
        msg = f"[CTRL] No book_url stored for {book_idx}"
        log(msg)
        raise ValueError(msg)

    # --------------------------------------------------------
    # 2) INLINE SCRAPE (fast, no Celery wait)
    # --------------------------------------------------------
    site = SiteResolver.resolve(url)
    try:
        scrape_result = ScrapeEngine.fetch_metadata_and_chapters(site, url)
        log(f"[CTRL] Scrape OK for {book_idx}: {scrape_result.get('title')}")
    except Exception as e:
        log(f"[CTRL] ERROR during scrape of {book_idx}: {e}")
        raise

    # --------------------------------------------------------
    # 3) Continue → dispatch pipelines
    # --------------------------------------------------------
    return launch_downloads(book_idx, scrape_result)


# =============================================================
# 2) PIPELINE DISPATCH (NOT a Celery task)
# =============================================================
@logcall
def launch_downloads(book_idx: str, scrape_result: dict):
    """
    Launches the entire processing pipeline:
      - initialize Redis UI state
      - initialize SQLite totals
      - dispatch per-chapter pipelines via DownloadController
    """
    title = scrape_result.get("title", "UnknownBook")
    chapters = scrape_result.get("chapters", []) or []
    total = len(chapters)

    # ------------------------------------------------------------
    # INIT REDIS STATE
    # ------------------------------------------------------------
    broker_url = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
    parsed = urlparse(broker_url)
    r = redis.Redis(
        host=parsed.hostname,
        port=parsed.port,
        db=int(parsed.path.strip("/")),
        decode_responses=True,
    )

    base = f"book:{book_idx}:state"
    # Seed the per-book UI state hash in a single HSET call.
    r.hset(
        base,
        mapping={
            "title": title,
            "status": "starting",
            "chapters_total": total,
            "chapters_download_done": 0,
            "chapters_download_skipped": 0,
            "chapters_parsed_done": 0,
            "audio_done": 0,
            "audio_skipped": 0,
            "last_update": int(time.time()),
        },
    )

    # ------------------------------------------------------------
    # INIT SQLITE SNAPSHOT
    # ------------------------------------------------------------
    try:
        set_chapters_total(book_idx, total)
    except Exception as e:
        log(f"[CTRL] ERROR updating SQLite totals: {e}")
        raise

    log(f"[CTRL] Initialized totals for {book_idx}: {total}")

    # ------------------------------------------------------------
    # ABORT CHECK BEFORE LAUNCHING JOBS
    # ------------------------------------------------------------
    if abort_requested(book_idx):
        log(f"[CTRL] ABORT flag detected — stopping BEFORE dispatch for {book_idx}")
        r.hset(base, "status", "aborted")
        return {"book_idx": book_idx, "aborted": True, "reason": "abort-before-start"}

    # ------------------------------------------------------------
    # BUILD + DISPATCH PER-CHAPTER PIPELINES
    # ------------------------------------------------------------
    controller = DownloadController(book_idx, scrape_result)
    try:
        group_result = controller.start()
        gid = getattr(group_result, "id", None)
        log(f"[CTRL] Pipelines dispatched for {book_idx} (group_id={gid})")
    except Exception as e:
        log(f"[CTRL] ERROR dispatching pipelines for {book_idx}: {e}")
        raise

    # Update UI state to "downloading"
    r.hset(base, "status", "downloading")
    r.hset(base, "last_update", int(time.time()))

    return {
        "book_idx": book_idx,
        "total": total,
        "started": True,
        "group_id": gid,
    }