You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
kmftools/bookscraper/scraper/tasks/controller_tasks.py

82 lines
2.9 KiB

# ============================================================
# File: scraper/tasks/controller_tasks.py
# Purpose:
# Start the download → parse → save pipeline for a scraped book,
# including progress/abort tracking via book_id.
# ONLY THE CONTROLLER UPDATES PROGRESS (initial total).
# ============================================================
from celery_app import celery_app
from logbus.publisher import log
from scraper.download_controller import DownloadController
from scraper.progress import (
    set_total,
)
from scraper.abort import abort_requested

# NOTE(review): import-time side effect — this prints every time a worker
# process imports the module (debug leftover); consider removing or routing
# through log() once the import issue being traced is resolved.
print(">>> [IMPORT] controller_tasks.py loaded")
@celery_app.task(bind=True, queue="controller", ignore_result=False)
def launch_downloads(self, book_id: str, scrape_result: dict):
    """
    Launch the download → parse → save pipeline for one scraped book
    and initialize its progress counters.

    Only the controller sets the initial total; chapter-level progress
    is updated INSIDE the download/parse/save subtasks. This task MUST
    NOT call .get() on async subtasks (Celery forbids synchronously
    waiting on subresults from within a task).

    Args:
        book_id: Identifier used for progress and abort tracking.
        scrape_result: Scrape payload; reads the "title" (str) and
            "chapters" (list) keys.

    Returns:
        dict: ``{"book_id", "aborted": True}`` when an abort was
        requested before dispatch, otherwise
        ``{"book_id", "total", "started": True, "group_id"}``.

    Raises:
        Exception: re-raised unchanged if pipeline dispatch fails.
    """
    title = scrape_result.get("title", "UnknownBook")
    chapters = scrape_result.get("chapters", []) or []
    total = len(chapters)
    # FIX: original message ran title and count together ('{title}'{total}).
    log(f"[CTRL] Book '{title}': {total} chapters (book_id={book_id})")

    # ------------------------------------------------------------
    # INIT PROGRESS (controller is the sole writer of the total)
    # ------------------------------------------------------------
    set_total(book_id, total)
    log(f"[CTRL] Progress initialized for {book_id}: total={total}")

    # ------------------------------------------------------------
    # ABORT CHECK — FIX: this ran AFTER ctl.start() in the original,
    # so an abort requested before start still dispatched every
    # subtask. Checking first means cancelled books launch nothing.
    # ------------------------------------------------------------
    if abort_requested(book_id):
        log(f"[CTRL] ABORT requested before tasks start (book_id={book_id})")
        return {"book_id": book_id, "aborted": True}

    # ------------------------------------------------------------
    # BUILD CONTROLLER + START PIPELINES (ASYNC)
    # start() returns a celery group AsyncResult. We DO NOT iterate
    # or get(); progress & failures are handled by the subtasks.
    # ------------------------------------------------------------
    ctl = DownloadController(book_id, scrape_result)
    try:
        # try-body narrowed to the one call that can actually raise
        group_result = ctl.start()
    except Exception as exc:
        log(f"[CTRL] ERROR while dispatching pipelines: {exc}")
        raise
    log(
        f"[CTRL] Pipelines dispatched for '{title}' "
        f"(book_id={book_id}, group_id={group_result.id})"
    )

    # Controller does not wait for subtask results.
    log(f"[CTRL] Controller finished dispatch for book_id={book_id}")
    return {
        "book_id": book_id,
        "total": total,
        "started": True,
        "group_id": group_result.id,
    }