# ============================================================
# File: scraper/tasks/download_tasks.py
# Purpose:
#   Download chapter HTML into payload["html"].
#   Updated for the book_idx unified ID model.
# ============================================================

from celery_app import celery_app
from scraper.utils.utils import get_save_path
from scraper.abort import abort_requested, chapter_started, mark_chapter_started

# Unified repository façade
from db.repository import (
    set_status,
    inc_download_done,
    inc_download_skipped,
)

from logbus.publisher import log
from scraper.ui_log import push_ui
from scraper.logger_decorators import logcall

import requests
import redis
import os
import time
from datetime import datetime

print(">>> [IMPORT] download_tasks.py loaded")


# -----------------------------------------------------------
# TIMESTAMPED LOG WRAPPER
# -----------------------------------------------------------
def log_msg(book_idx: str, message: str):
    """Log a timestamped message to both the log bus and the UI."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    full = f"{ts} [{book_idx}] {message}"
    log(full)
    push_ui(full)


# -----------------------------------------------------------
# ENV CONFIG
# -----------------------------------------------------------
MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7"))
BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2"))
BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2"))
DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10"))
MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1"))
GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1"))

REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
redis_client = redis.Redis.from_url(REDIS_URL)

SEM_KEY = "download:active"        # global concurrency counter
DELAY_KEY = "download:delay_lock"  # shared rate-limit lock


# -----------------------------------------------------------
# DELAY + CONCURRENCY HELPERS
# -----------------------------------------------------------
def wait_for_global_delay():
    """Block until the shared delay lock expires."""
    if GLOBAL_DELAY <= 0:
        return
    while redis_client.exists(DELAY_KEY):
        time.sleep(0.1)


def set_global_delay():
    """Arm the shared delay lock; NX makes this a no-op if already set."""
    if GLOBAL_DELAY <= 0:
        return
    redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)


def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
    """Busy-wait for a slot in the Redis-backed global semaphore."""
    while True:
        current = redis_client.incr(SEM_KEY)
        if current <= max_slots:
            return
        redis_client.decr(SEM_KEY)
        time.sleep(retry_delay)


def release_global_slot():
    """Return a slot to the global semaphore."""
    redis_client.decr(SEM_KEY)
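
# -----------------------------------------------------------
# ILLUSTRATIVE WRAPPER (sketch, not used by the task below)
# -----------------------------------------------------------
# The helpers above implement a crude distributed semaphore
# (INCR/DECR on SEM_KEY) plus a shared rate-limit lock
# (DELAY_KEY). This context manager is a minimal sketch of the
# intended acquire/release pairing; "global_download_slot" is a
# hypothetical name, and download_chapter below inlines the
# same pattern with an explicit try/finally instead.
from contextlib import contextmanager


@contextmanager
def global_download_slot(max_slots: int = MAX_CONCURRENCY):
    wait_for_global_delay()         # block while the shared delay lock is held
    acquire_global_slot(max_slots)  # claim one of the global download slots
    try:
        yield
    finally:
        set_global_delay()          # re-arm the shared delay window
        release_global_slot()       # free the slot for other workers
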
# ============================================================
# CELERY TASK — Payload v3 (book_idx model)
# ============================================================
@celery_app.task(bind=True, queue="download", ignore_result=False)
@logcall
def download_chapter(self, payload: dict):
    """
    Payload format:

    {
        "book_idx": str,
        "chapter": {
            "num": int,
            "title": str,
            "url": str,
            "volume_path": str
        },
        "book_meta": dict,

        # fields filled during pipeline:
        "html":    None | str,
        "parsed":  None | str,
        "skipped": bool,
        "path":    None | str
    }
    """
    if not payload:
        raise ValueError("download_chapter received empty payload")

    book_idx = payload["book_idx"]
    chapter = payload["chapter"]
    book_meta = payload.get("book_meta") or {}

    chapter_num = chapter["num"]
    chapter_url = chapter["url"]
    chapter_title = chapter.get("title") or f"Chapter {chapter_num}"
    volume_path = chapter["volume_path"]

    # -----------------------------------------------------------
    # STATUS UPDATE (book is now in 'downloading')
    # -----------------------------------------------------------
    set_status(book_idx, "downloading")

    # -----------------------------------------------------------
    # ABORT CHECK (skip if not yet started)
    # -----------------------------------------------------------
    if abort_requested(book_idx) and not chapter_started(book_idx, chapter_num):
        log_msg(book_idx, f"[ABORT] Skip chapter {chapter_num}")
        inc_download_skipped(book_idx)
        payload["html"] = None
        payload["skipped"] = True
        payload["path"] = None
        return payload

    mark_chapter_started(book_idx, chapter_num)

    # -----------------------------------------------------------
    # SKIP IF FILE ALREADY EXISTS
    # -----------------------------------------------------------
    save_path = get_save_path(chapter_num, volume_path)

    if os.path.exists(save_path):
        log_msg(book_idx, f"[DL] SKIP {chapter_num} → {save_path}")
        inc_download_skipped(book_idx)
        payload["html"] = None
        payload["skipped"] = True
        payload["path"] = save_path
        return payload

    # -----------------------------------------------------------
    # GLOBAL DELAY + CONCURRENCY
    # -----------------------------------------------------------
    if GLOBAL_DELAY > 0:
        time.sleep(GLOBAL_DELAY)

    wait_for_global_delay()
    acquire_global_slot(MAX_CONCURRENCY)

    # -----------------------------------------------------------
    # HTTP DOWNLOAD
    # -----------------------------------------------------------
    try:
        log_msg(book_idx, f"[DL] Downloading {chapter_num} ({chapter_title})")

        resp = requests.get(
            chapter_url,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=20,
        )
        resp.raise_for_status()

        # Prefer the detected encoding; fall back to gb2312 for legacy pages.
        resp.encoding = resp.apparent_encoding or "gb2312"
        html = resp.text

        log_msg(book_idx, f"[DL] OK {chapter_num}: {len(html)} chars")

        payload["html"] = html
        payload["skipped"] = False
        payload["path"] = save_path
        return payload

    except Exception as exc:
        attempt = self.request.retries
        delay = BASE_DELAY * (BACKOFF ** attempt)

        # Handle 429: honour the server's rate limit before retrying.
        if getattr(getattr(exc, "response", None), "status_code", None) == 429:
            log_msg(book_idx, f"[DL] 429 → WAIT {DELAY_429}s")
            time.sleep(DELAY_429)
            set_global_delay()
            raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES)

        # General retry with exponential backoff.
        log_msg(book_idx, f"[DL] ERROR {chapter_num}: {exc} → retry {delay}s")
        raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)

    finally:
        set_global_delay()
        release_global_slot()
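
# -----------------------------------------------------------
# EXAMPLE PAYLOAD (sketch)
# -----------------------------------------------------------
# A minimal sketch of the Payload v3 shape this task expects.
# All values below are illustrative placeholders, not real IDs,
# URLs, or paths. Enqueueing requires a running broker/worker.
if __name__ == "__main__":
    example_payload = {
        "book_idx": "book-0001",                            # placeholder
        "chapter": {
            "num": 1,
            "title": "Chapter 1",
            "url": "https://example.com/book/1/chapter/1",  # placeholder
            "volume_path": "/data/books/book-0001/vol1",    # placeholder
        },
        "book_meta": {},
        "html": None,
        "parsed": None,
        "skipped": False,
        "path": None,
    }
    # Send onto the "download" queue via Celery's standard .delay() API.
    download_chapter.delay(example_payload)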