You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kmftools/bookscraper/scraper/tasks/download_tasks.py

200 lines
6.1 KiB

# scraper/tasks/download_tasks.py
from celery_app import celery_app
from logbus.publisher import log
import requests
import os
import time
import redis
from scraper.utils import get_save_path
print(">>> [IMPORT] download_tasks.py loaded")


def _int_from_env(var_name, fallback):
    """Return environment variable *var_name* parsed as int (default *fallback*)."""
    return int(os.getenv(var_name, fallback))


# ---------------------------
# Retry parameters from .env
# ---------------------------
# Upper bound on Celery retries for one failing download.
MAX_RETRIES = _int_from_env("DOWNLOAD_MAX_RETRIES", "7")
# Exponential backoff: retry number n waits BASE_DELAY * BACKOFF**n seconds.
BASE_DELAY = _int_from_env("DOWNLOAD_BASE_DELAY", "2")
BACKOFF = _int_from_env("DOWNLOAD_BACKOFF_MULTIPLIER", "2")
# Extra seconds added on top of the backoff when the server answers HTTP 429.
DELAY_429 = _int_from_env("DOWNLOAD_429_DELAY", "10")

# ---------------------------
# GLOBAL CONCURRENCY LIMIT
# ---------------------------
# Number of downloads allowed at once, counted in a Redis semaphore (SEM_KEY).
MAX_CONCURRENCY = _int_from_env("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1")

# ---------------------------
# GLOBAL MINIMUM DELAY
# ---------------------------
# Seconds of enforced pause between downloads; a value <= 0 disables it.
GLOBAL_DELAY = _int_from_env("DOWNLOAD_GLOBAL_MIN_DELAY", "1")
DELAY_KEY = "download:delay_lock"  # Redis key carrying the TTL-based delay lock

# ---------------------------
# Redis connection
# ---------------------------
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
redis_client = redis.Redis.from_url(REDIS_URL)
SEM_KEY = "download:active"  # semaphore counter
# ======================================================
# GLOBAL DELAY FUNCTIONS
# ======================================================
def wait_for_global_delay():
    """
    Poll Redis until the shared delay lock (DELAY_KEY) has disappeared.

    Returns at once when the global delay is disabled (GLOBAL_DELAY <= 0).
    Otherwise re-checks every 0.1 s, which keeps the workers together from
    hammering the server too fast.

    NOTE(review): the existence check is not atomic with whatever the caller
    does next; several waiters can observe the key gone at the same moment
    and all proceed.
    """
    if GLOBAL_DELAY > 0:
        while redis_client.exists(DELAY_KEY):
            time.sleep(0.1)
def set_global_delay():
    """
    Arm the shared delay lock so that waiters pause before the next download.

    Meant to run after a download finishes (or is skipped). Writes DELAY_KEY
    with SET ... NX EX: it is only written when absent, so a delay that is
    already running is not extended, and it expires by itself after
    GLOBAL_DELAY seconds. Does nothing when GLOBAL_DELAY <= 0.
    """
    if GLOBAL_DELAY > 0:
        redis_client.set(DELAY_KEY, "1", ex=GLOBAL_DELAY, nx=True)
# ======================================================
# GLOBAL CONCURRENCY FUNCTIONS
# ======================================================
def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
    """
    Take one slot of the global Redis-backed semaphore, blocking until free.

    The counter at SEM_KEY is raised atomically with INCR. If the new value
    exceeds ``max_slots``, the increment is undone with DECR and the call
    sleeps ``retry_delay`` seconds before trying again.

    Args:
        max_slots: Maximum number of simultaneous holders; must be >= 1.
        retry_delay: Seconds to sleep between attempts while all slots are taken.

    Raises:
        ValueError: If ``max_slots`` < 1. INCR on a non-negative counter always
            returns at least 1, so such a limit could never be satisfied and
            the loop would otherwise spin forever.

    NOTE(review): the counter has no TTL, so a worker killed between acquire
    and release leaves its slot taken permanently. Confirm whether SEM_KEY is
    reset somewhere on startup.
    """
    if max_slots < 1:
        # Fail loudly instead of deadlocking every download (e.g. on a
        # misconfigured DOWNLOAD_MAX_GLOBAL_CONCURRENCY=0).
        raise ValueError(f"max_slots must be >= 1, got {max_slots}")
    while True:
        current = redis_client.incr(SEM_KEY)
        if current <= max_slots:
            return  # acquired OK
        # Over the limit: hand the increment back and poll again later.
        redis_client.decr(SEM_KEY)
        time.sleep(retry_delay)
def release_global_slot():
    """
    Give back one slot previously taken with acquire_global_slot.

    Decrements the shared SEM_KEY counter by one. Call exactly once per
    successful acquire, otherwise the counter drifts.
    """
    redis_client.decr(SEM_KEY, amount=1)
# Echo the effective throttling / retry configuration once, at import time.
print(
    f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}",
    f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s",
    ">>> [CONFIG] download retries = "
    f"max={MAX_RETRIES}, base={BASE_DELAY}, backoff={BACKOFF}, 429={DELAY_429}",
    sep="\n",
)
# ======================================================
# CELERY TASK
# ======================================================
@celery_app.task(
    bind=True,
    queue="download",
    ignore_result=False,
)
def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
    """
    Download one chapter's HTML, throttled by the global Redis delay lock and
    the global concurrency semaphore.

    ``base_path`` comes from pipeline.py. The download is SKIPPED when the
    file at the computed save path already exists.

    Args:
        chapter_num: Chapter number; used for the save path and in log lines.
        chapter_url: URL of the chapter page to fetch.
        base_path: Root passed to ``get_save_path`` together with ``chapter_num``.

    Returns:
        dict with keys ``chapter``, ``url``, ``html`` (``None`` when skipped),
        ``skipped`` and ``path``.

    Raises:
        Whatever ``self.retry`` raises: every error during the download is
        handed to Celery's retry with exponential backoff
        (``BASE_DELAY * BACKOFF**attempt`` seconds, plus ``DELAY_429`` extra
        seconds on HTTP 429), with ``max_retries=MAX_RETRIES``.
    """
    save_path = get_save_path(chapter_num, base_path)

    # ------------------------------------------------------------------
    # 1. SKIP IF EXISTS — but still wait for and set the global delay, so
    #    skipped chapters are paced like real downloads.
    # ------------------------------------------------------------------
    if os.path.exists(save_path):
        wait_for_global_delay()
        set_global_delay()
        log(f"[DL] SKIP chapter {chapter_num} (exists) → {save_path}")
        return {
            "chapter": chapter_num,
            "url": chapter_url,
            "html": None,
            "skipped": True,
            "path": save_path,
        }

    # ------------------------------------------------------------------
    # 2. GLOBAL CONCURRENCY — only MAX_CONCURRENCY downloads at the same time
    # ------------------------------------------------------------------
    acquire_global_slot(MAX_CONCURRENCY)
    log(f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
    try:
        # --------------------------------------------------------------
        # 3. GLOBAL DELAY — waited for *after* taking the slot. The previous
        #    holder sets the delay lock in its ``finally`` before releasing
        #    the slot, so waiting here always honours that pause. Waiting
        #    before the acquire let a worker already queued on the semaphore
        #    start the moment a slot was freed, skipping the delay entirely.
        #    Being inside ``try`` also guarantees the slot is released if
        #    the wait itself fails.
        # --------------------------------------------------------------
        wait_for_global_delay()

        # --------------------------------------------------------------
        # 4. DOWNLOAD LOGIC
        # --------------------------------------------------------------
        log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
        resp = requests.get(
            chapter_url,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=20,
        )
        resp.raise_for_status()
        # Prefer the detected charset; fall back to gb2312 if detection fails.
        resp.encoding = resp.apparent_encoding or "gb2312"
        html = resp.text
        # NOTE: len(html) counts decoded characters, not bytes.
        log(f"[DL] OK {chapter_num}: {len(html)} bytes")
        return {
            "chapter": chapter_num,
            "url": chapter_url,
            "html": html,
            "skipped": False,
            "path": save_path,
        }
    except Exception as exc:
        # --------------------------------------------------------------
        # 5. RETRY LOGIC — exponential backoff per completed retry.
        # --------------------------------------------------------------
        attempt = self.request.retries
        delay = BASE_DELAY * (BACKOFF**attempt)
        # Exceptions without a response (or with response=None) yield None.
        status = getattr(getattr(exc, "response", None), "status_code", None)
        if status == 429:
            # Rate limited: add a fixed penalty on top of the backoff.
            delay = DELAY_429 + delay
            log(
                f"[DL] 429 Too Many Requests → retry in {delay}s "
                f"(attempt {attempt}/{MAX_RETRIES})"
            )
            raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
        log(
            f"[DL] ERROR on {chapter_url}: {exc} → retry in {delay}s "
            f"(attempt {attempt}/{MAX_RETRIES})"
        )
        raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
    finally:
        # --------------------------------------------------------------
        # 6. ALWAYS set delay + release semaphore. The delay is armed before
        #    the slot is released so the next holder sees it; the release is
        #    in a nested ``finally`` so a Redis error while setting the delay
        #    cannot leak the slot.
        # --------------------------------------------------------------
        try:
            set_global_delay()
        finally:
            release_global_slot()
            log(f"[DL] RELEASED SLOT for chapter {chapter_num}")