# ============================================================
# File: scraper/tasks/download_tasks.py
# Purpose: Download chapter HTML with global concurrency,
#          retry/backoff logic, 429 support, and abort-awareness.
#
# Logging:
#   - every message is prefixed with a timestamp + book_id
#   - the message is sent to the console via publisher.py
#   - the message is pushed to the Redis GUI log buffer via ui_log.push_ui
#
# publisher.py and ui_log.py stay "dumb" transports: all message
# formatting happens in log_msg() below.
# ============================================================

from celery_app import celery_app
from scraper.utils import get_save_path
from scraper.abort import abort_requested, chapter_started, mark_chapter_started

from logbus.publisher import log  # console logging (plain transport)
from scraper.ui_log import push_ui  # GUI logging (plain transport)

import requests
import redis
import os
import time
from datetime import datetime

print(">>> [IMPORT] download_tasks.py loaded")


# -----------------------------------------------------------
# TIMESTAMPED LOG WRAPPER
# -----------------------------------------------------------
def log_msg(book_id: str, message: str):
    """Prefix *message* with a local timestamp and book id, then emit it
    to both the console logger and the GUI log buffer."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    full = f"{ts} [{book_id}] {message}"
    log(full)
    push_ui(full)


# -----------------------------------------------------------
# Retry parameters (ENV)
# -----------------------------------------------------------
MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7"))
BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2"))
BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2"))
DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10"))

# -----------------------------------------------------------
# Global concurrency
# -----------------------------------------------------------
MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1"))

# -----------------------------------------------------------
# Global delay sync
# -----------------------------------------------------------
GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1"))
DELAY_KEY = "download:delay_lock"

# -----------------------------------------------------------
# Redis
# -----------------------------------------------------------
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
redis_client = redis.Redis.from_url(REDIS_URL)

SEM_KEY = "download:active"


# ============================================================
# GLOBAL DELAY FUNCTIONS
# ============================================================
def wait_for_global_delay():
    """Block, polling every 0.1 s, while the shared delay key exists.

    No-op when GLOBAL_DELAY <= 0.
    """
    if GLOBAL_DELAY <= 0:
        return
    while redis_client.exists(DELAY_KEY):
        time.sleep(0.1)


def set_global_delay():
    """Open a shared delay window of GLOBAL_DELAY seconds.

    Uses SET NX, so an already-active window is left untouched (not
    extended). No-op when GLOBAL_DELAY <= 0.
    """
    if GLOBAL_DELAY <= 0:
        return
    redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)


# ============================================================
# GLOBAL CONCURRENCY FUNCTIONS
# ============================================================
def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
    """Spin until one of *max_slots* slots in the Redis counter is taken.

    Optimistically INCRs the counter; if that overshoots the limit, the
    increment is rolled back with DECR and the loop sleeps *retry_delay*.

    NOTE(review): the counter has no TTL, so a worker killed while holding
    a slot leaks it permanently — confirm whether SEM_KEY is reset elsewhere.
    """
    while True:
        current = redis_client.incr(SEM_KEY)
        if current <= max_slots:
            return
        redis_client.decr(SEM_KEY)
        time.sleep(retry_delay)


def release_global_slot():
    """Give back one slot taken by acquire_global_slot()."""
    redis_client.decr(SEM_KEY)


print(f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}")
print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s")
print(
    f">>> [CONFIG] Retries: MAX={MAX_RETRIES}, base={BASE_DELAY}, "
    f"backoff={BACKOFF}, 429={DELAY_429}"
)


# ============================================================
# HTTP FETCH + RETRY (private helper)
# ============================================================
def _fetch_chapter(task, book_id, chapter_num, chapter_url, save_path):
    """Fetch one chapter over HTTP; on failure schedule a Celery retry.

    Must be called while holding a global slot; the caller releases it.

    Raises:
        celery.exceptions.Retry (via ``task.retry``) on any exception.
        A 429 response sleeps DELAY_429 s first and retries immediately.
        Other errors retry with exponential backoff:
        BASE_DELAY * BACKOFF ** attempt.
    """
    try:
        log_msg(book_id, f"[DL] Downloading chapter {chapter_num}: {chapter_url}")

        resp = requests.get(
            chapter_url,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=20,
        )
        resp.raise_for_status()
        # Fall back to gb2312 when charset detection yields nothing.
        resp.encoding = resp.apparent_encoding or "gb2312"
        html = resp.text

        # NOTE: len(html) counts decoded characters, not bytes.
        log_msg(book_id, f"[DL] OK {chapter_num}: {len(html)} bytes")

        return {
            "book_id": book_id,
            "chapter": chapter_num,
            "url": chapter_url,
            "html": html,
            "skipped": False,
            "path": save_path,
        }

    except Exception as exc:
        attempt = task.request.retries

        # 429 hard block: wait in-process (still holding the slot), open a
        # global delay window, then retry without extra countdown.
        status = getattr(getattr(exc, "response", None), "status_code", None)
        if status == 429:
            log_msg(
                book_id,
                f"[DL] 429 {chapter_num} → WAIT {DELAY_429}s "
                f"(attempt {attempt}/{MAX_RETRIES})",
            )
            time.sleep(DELAY_429)
            set_global_delay()
            raise task.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES)

        # Normal error: exponential backoff.
        delay = BASE_DELAY * (BACKOFF**attempt)
        log_msg(
            book_id,
            f"[DL] ERROR {chapter_num}: {exc} → retry in {delay}s "
            f"(attempt {attempt}/{MAX_RETRIES})",
        )
        raise task.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)


# ============================================================
# CELERY TASK: DOWNLOAD CHAPTER
# ============================================================
@celery_app.task(bind=True, queue="download", ignore_result=False)
def download_chapter(
    self, book_id: str, chapter_num: int, chapter_url: str, base_path: str
):
    """
    Download chapter HTML.

    Abort logic:
    - If abort active AND chapter not started  → SKIP
    - If abort active BUT chapter already started → Proceed normally

    Args:
        book_id: identifier used for abort checks and log prefixes.
        chapter_num: chapter number (also used to build the save path).
        chapter_url: URL to fetch.
        base_path: directory passed to get_save_path().

    Returns:
        dict with keys book_id, chapter, url, html, skipped, path
        (plus ``abort: True`` when skipped because of an abort).
        ``html`` is None when the chapter was skipped.

    Raises:
        celery.exceptions.Retry while retries remain; per Celery's
        ``retry(exc=...)`` semantics the original exception is raised once
        MAX_RETRIES is exhausted.
    """
    # -----------------------------------------------------------
    # ABORT BEFORE START
    # -----------------------------------------------------------
    if abort_requested(book_id) and not chapter_started(book_id, chapter_num):
        msg = f"[ABORT] Skip chapter {chapter_num} (abort active, not started)"
        log_msg(book_id, msg)
        return {
            "book_id": book_id,
            "chapter": chapter_num,
            "url": chapter_url,
            "html": None,
            "skipped": True,
            "path": None,
            "abort": True,
        }

    # Mark started so a later abort no longer skips this chapter.
    mark_chapter_started(book_id, chapter_num)

    # -----------------------------------------------------------
    # Skip already-saved chapters (before any delay logic)
    # -----------------------------------------------------------
    save_path = get_save_path(chapter_num, base_path)
    if os.path.exists(save_path):
        log_msg(book_id, f"[DL] SKIP {chapter_num} (exists) → {save_path}")
        return {
            "book_id": book_id,
            "chapter": chapter_num,
            "url": chapter_url,
            "html": None,
            "skipped": True,
            "path": save_path,
        }

    # -----------------------------------------------------------
    # Hard delay (only for real downloads)
    # -----------------------------------------------------------
    if GLOBAL_DELAY > 0:
        time.sleep(GLOBAL_DELAY)

    # Sync delay with other workers.
    wait_for_global_delay()

    # Acquire concurrency slot. Everything after this point is inside the
    # try/finally, so the slot is always released — even if logging fails.
    acquire_global_slot(MAX_CONCURRENCY)
    try:
        log_msg(book_id, f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
        return _fetch_chapter(self, book_id, chapter_num, chapter_url, save_path)
    finally:
        try:
            set_global_delay()
        finally:
            # Release even if setting the delay key raised.
            release_global_slot()
        log_msg(book_id, f"[DL] RELEASED SLOT for chapter {chapter_num}")