# ============================================================
# File: scraper/tasks/controller_tasks.py
# Purpose:
#   FULL scrape entrypoint + launching download/parse/save pipelines.
#   NO result.get() anywhere. Scraping is done inline.
# ============================================================

from celery_app import celery_app
from logbus.publisher import log

import os
import time
import redis
from urllib.parse import urlparse

from scraper.logger_decorators import logcall
from scraper.abort import abort_requested

from scraper.services.scrape_engine import ScrapeEngine
from scraper.services.site_resolver import SiteResolver

from db.repository import fetch_book, set_chapters_total
from scraper.download_controller import DownloadController


print(">>> [IMPORT] controller_tasks.py loaded")


# =============================================================
# 1) PUBLIC ENTRYPOINT — CALLED FROM /start
# =============================================================
@celery_app.task(
    bind=True,
    queue="controller",
    ignore_result=False,
    name="scraper.tasks.controller_tasks.start_full_scrape",
)
@logcall
def start_full_scrape(self, book_idx: str):
    """
    FULL SCRAPE ENTRYPOINT.
    Scraping is done inline → no Celery .get() needed.
    """

    log(f"[CTRL] start_full_scrape(book_idx={book_idx})")

    # Abort before doing anything
    if abort_requested(book_idx):
        log(f"[CTRL] PRE-ABORT flag detected for {book_idx}")
        return {"book_idx": book_idx, "aborted": True, "reason": "pre-abort"}

    # --------------------------------------------------------
    # 1) Load book metadata from SQLite
    # --------------------------------------------------------
    book = fetch_book(book_idx)
    if not book:
        msg = f"[CTRL] Book '{book_idx}' not found in DB"
        log(msg)
        raise ValueError(msg)

    url = book.get("book_url")
    if not url:
        msg = f"[CTRL] No book_url stored for {book_idx}"
        log(msg)
        raise ValueError(msg)

    # --------------------------------------------------------
    # 2) INLINE SCRAPE (fast, no Celery wait)
    # --------------------------------------------------------
    site = SiteResolver.resolve(url)
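    # NOTE (assumption): SiteResolver.resolve() is understood to map the book
    # URL to a site-specific scraping profile/handler; the concrete contract
    # lives in scraper/services/site_resolver.py.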

    try:
        scrape_result = ScrapeEngine.fetch_metadata_and_chapters(site, url)
        log(f"[CTRL] Scrape OK for {book_idx}: {scrape_result.get('title')}")
    except Exception as e:
        log(f"[CTRL] ERROR during scrape of {book_idx}: {e}")
        raise

    # --------------------------------------------------------
    # 3) Continue → dispatch pipelines
    # --------------------------------------------------------
    return launch_downloads(book_idx, scrape_result)
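

# ------------------------------------------------------------
# Usage sketch (assumption, not part of this module): the /start
# endpoint is expected to enqueue this task by name rather than
# calling it directly, e.g. from the web layer:
#
#   from celery_app import celery_app
#
#   async_result = celery_app.send_task(
#       "scraper.tasks.controller_tasks.start_full_scrape",
#       args=["book-123"],          # illustrative book_idx
#       queue="controller",
#   )
#
# The calling module and "book-123" are illustrative only.
# ------------------------------------------------------------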


# =============================================================
# 2) PIPELINE DISPATCH (NOT a Celery task)
# =============================================================
@logcall
def launch_downloads(book_idx: str, scrape_result: dict):
    """
    Launches the entire processing pipeline:
      - initialize Redis UI state
      - initialize SQLite totals
      - dispatch per-chapter pipelines via DownloadController
    """

    title = scrape_result.get("title", "UnknownBook")
    chapters = scrape_result.get("chapters", []) or []
    total = len(chapters)

    # ------------------------------------------------------------
    # INIT REDIS STATE
    # ------------------------------------------------------------
    broker_url = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
    parsed = urlparse(broker_url)

    r = redis.Redis(
        host=parsed.hostname,
        port=parsed.port,
        db=int(parsed.path.strip("/")),
        decode_responses=True,
    )
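    # NOTE (assumption): this parsing expects REDIS_BROKER to always carry an
    # explicit port and db index (e.g. redis://redis:6379/0); for URLs of that
    # shape, redis.Redis.from_url(broker_url, decode_responses=True) would be
    # an equivalent one-liner.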

    base = f"book:{book_idx}:state"

    r.hset(base, "title", title)
    r.hset(base, "status", "starting")
    r.hset(base, "chapters_total", total)
    r.hset(base, "chapters_download_done", 0)
    r.hset(base, "chapters_download_skipped", 0)
    r.hset(base, "chapters_parsed_done", 0)
    r.hset(base, "audio_done", 0)
    r.hset(base, "audio_skipped", 0)
    r.hset(base, "last_update", int(time.time()))

    # ------------------------------------------------------------
    # INIT SQLITE SNAPSHOT
    # ------------------------------------------------------------
    try:
        set_chapters_total(book_idx, total)
    except Exception as e:
        log(f"[CTRL] ERROR updating SQLite totals: {e}")
        raise

    log(f"[CTRL] Initialized totals for {book_idx}: {total}")

    # ------------------------------------------------------------
    # ABORT CHECK BEFORE LAUNCHING JOBS
    # ------------------------------------------------------------
    if abort_requested(book_idx):
        log(f"[CTRL] ABORT flag detected — stopping BEFORE dispatch for {book_idx}")
        r.hset(base, "status", "aborted")
        return {"book_idx": book_idx, "aborted": True, "reason": "abort-before-start"}

    # ------------------------------------------------------------
    # BUILD + DISPATCH PER-CHAPTER PIPELINES
    # ------------------------------------------------------------
    controller = DownloadController(book_idx, scrape_result)

    try:
        group_result = controller.start()
        gid = getattr(group_result, "id", None)
        log(f"[CTRL] Pipelines dispatched for {book_idx} (group_id={gid})")
    except Exception as e:
        log(f"[CTRL] ERROR dispatching pipelines for {book_idx}: {e}")
        raise

    # Update UI state to "downloading"
    r.hset(base, "status", "downloading")
    r.hset(base, "last_update", int(time.time()))

    return {
        "book_idx": book_idx,
        "total": total,
        "started": True,
        "group_id": gid,
    }
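

# ------------------------------------------------------------
# DownloadController contract sketch (assumption: the real class lives in
# scraper/download_controller.py and may differ). launch_downloads() only
# relies on start() dispatching one pipeline per chapter and returning an
# object that exposes an `.id`, e.g. a celery GroupResult:
#
#   from celery import chain, group
#
#   group_result = group(
#       chain(download_chapter.s(book_idx, ch),
#             parse_chapter.s(),
#             save_chapter.s())
#       for ch in chapters
#   ).apply_async()
#
# Task names download_chapter/parse_chapter/save_chapter are illustrative
# only; the actual pipeline shape is defined by DownloadController.
# ------------------------------------------------------------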