# scraper/tasks/download_tasks.py
"""Celery task for downloading chapter HTML with global throttling.

All download workers coordinate through Redis:
  * a global concurrency semaphore (``SEM_KEY``) capping simultaneous downloads,
  * a global minimum-delay TTL lock (``DELAY_KEY``) spacing requests apart,
  * a hard blocking sleep on HTTP 429 before the task is retried.
"""

from celery_app import celery_app
from logbus.publisher import log
import requests
import os
import time
import redis

from scraper.utils import get_save_path

print(">>> [IMPORT] download_tasks.py loaded")

# ---------------------------
# Retry parameters from .env
# ---------------------------
MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7"))
BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2"))
BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2"))
DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10"))

# ---------------------------
# GLOBAL CONCURRENCY LIMIT
# ---------------------------
MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1"))

# ---------------------------
# GLOBAL MINIMUM DELAY
# ---------------------------
GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1"))
DELAY_KEY = "download:delay_lock"

# ---------------------------
# Redis connection
# ---------------------------
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
redis_client = redis.Redis.from_url(REDIS_URL)

SEM_KEY = "download:active"  # semaphore counter


# ======================================================
# GLOBAL DELAY FUNCTIONS
# ======================================================
def wait_for_global_delay():
    """Block (poll every 100 ms) while the global delay lock exists.

    No-op when GLOBAL_DELAY is disabled (<= 0).
    """
    if GLOBAL_DELAY <= 0:
        return
    while redis_client.exists(DELAY_KEY):
        time.sleep(0.1)


def set_global_delay():
    """Set the TTL delay lock after a download completes.

    ``nx=True`` means an existing lock is NOT refreshed — only created when
    absent — so concurrent finishers do not keep extending the delay window.
    No-op when GLOBAL_DELAY is disabled (<= 0).
    """
    if GLOBAL_DELAY <= 0:
        return
    redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)


# ======================================================
# GLOBAL CONCURRENCY FUNCTIONS
# ======================================================
def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
    """Acquire one slot of a Redis-backed semaphore via atomic INCR.

    Spins (sleeping ``retry_delay`` between attempts) until the counter fits
    under ``max_slots``.

    NOTE(review): if a worker dies between INCR and the matching DECR the
    slot leaks permanently (SEM_KEY has no TTL) — confirm whether an
    operational reset exists before relying on this in production.
    """
    while True:
        current = redis_client.incr(SEM_KEY)
        if current <= max_slots:
            return  # acquired OK
        # Over capacity: undo our increment and wait before re-trying.
        redis_client.decr(SEM_KEY)
        time.sleep(retry_delay)


def release_global_slot():
    """Release one semaphore slot (counterpart of acquire_global_slot)."""
    redis_client.decr(SEM_KEY)


print(f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}")
print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s")
print(
    f">>> [CONFIG] download retries = "
    f"max={MAX_RETRIES}, base={BASE_DELAY}, backoff={BACKOFF}, 429={DELAY_429}"
)


# ======================================================
# CELERY TASK
# ======================================================
@celery_app.task(
    bind=True,
    queue="download",
    ignore_result=False,
)
def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
    """
    Download chapter HTML.

    Ensures:
      - global delay throttle
      - global concurrency limit
      - hard blocking on 429

    Returns a dict: {chapter, url, html, skipped, path}. ``html`` is None on
    skip; writing the file to ``path`` is presumably done by a downstream
    task — this task never writes to disk.
    """
    # HARD DELAY at every execution to prevent retry storms
    if GLOBAL_DELAY > 0:
        time.sleep(GLOBAL_DELAY)

    save_path = get_save_path(chapter_num, base_path)

    # ------------------------------------------------------------------
    # 1. SKIP IF EXISTS (still delay to maintain consistency)
    # ------------------------------------------------------------------
    if os.path.exists(save_path):
        wait_for_global_delay()
        set_global_delay()
        log(f"[DL] SKIP chapter {chapter_num} (exists) → {save_path}")
        return {
            "chapter": chapter_num,
            "url": chapter_url,
            "html": None,
            "skipped": True,
            "path": save_path,
        }

    # ------------------------------------------------------------------
    # 2. GLOBAL DELAY SYNC
    # ------------------------------------------------------------------
    wait_for_global_delay()

    # ------------------------------------------------------------------
    # 3. GLOBAL CONCURRENCY
    # ------------------------------------------------------------------
    acquire_global_slot(MAX_CONCURRENCY)
    log(f"[DL] ACQUIRED SLOT for chapter {chapter_num}")

    try:
        # ------------------------------------------------------------------
        # 4. DO THE DOWNLOAD
        # ------------------------------------------------------------------
        log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}")

        resp = requests.get(
            chapter_url,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=20,
        )
        resp.raise_for_status()

        # Prefer the sniffed encoding; fall back to gb2312 (source site is
        # presumably Chinese — confirm against the scraped site).
        resp.encoding = resp.apparent_encoding or "gb2312"
        html = resp.text

        # len(html) counts decoded characters, not raw bytes.
        log(f"[DL] OK {chapter_num}: {len(html)} chars")

        return {
            "chapter": chapter_num,
            "url": chapter_url,
            "html": html,
            "skipped": False,
            "path": save_path,
        }

    except Exception as exc:
        # self.request.retries is 0 on the first run, so the logged
        # "attempt N" is zero-based.
        attempt = self.request.retries
        delay = BASE_DELAY * (BACKOFF**attempt)

        # =============================================================
        # HARD 429 BLOCK — DO NOT RELEASE SLOT YET
        # =============================================================
        # HTTPError carries .response; connection errors have response=None,
        # which getattr handles safely.
        if (
            hasattr(exc, "response")
            and getattr(exc.response, "status_code", None) == 429
        ):
            log(
                f"[DL] 429 Too Many Requests → HARD WAIT {DELAY_429}s "
                f"(attempt {attempt}/{MAX_RETRIES})"
            )

            # HARD BLOCK: worker sleeps, still holding the slot
            time.sleep(DELAY_429)

            # After 429 wait, also apply global delay
            set_global_delay()

            # Retry immediately (countdown=0); self.retry raises Retry,
            # so the finally block below still releases the slot.
            raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES)

        # =============================================================
        # NORMAL ERRORS — exponential backoff
        # =============================================================
        log(
            f"[DL] ERROR on {chapter_url}: {exc} → retry in {delay}s "
            f"(attempt {attempt}/{MAX_RETRIES})"
        )
        raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)

    finally:
        # =============================================================
        # ALWAYS RELEASE SLOT AFTER HARD BLOCK / NORMAL WORK
        # =============================================================
        set_global_delay()
        release_global_slot()
        log(f"[DL] RELEASED SLOT for chapter {chapter_num}")