|
|
|
|
@ -1,47 +1,61 @@
|
|
|
|
|
# =========================================================
# File: scraper/tasks/download_tasks.py
# Purpose: Download chapter HTML with global concurrency,
#          retry/backoff logic, 429 hard delay support,
#          and abort-aware chapter skipping.
#
# Abort behavior implemented here:
#   - If abort is active AND chapter not started → skip
#   - If abort is active BUT chapter already started → complete normally
#     (download → parse → save)
# =========================================================
|
|
|
|
|
|
|
|
|
|
from celery_app import celery_app
|
|
|
|
|
from logbus.publisher import log
|
|
|
|
|
from scraper.utils import get_save_path
|
|
|
|
|
from scraper.abort import abort_requested, chapter_started, mark_chapter_started
|
|
|
|
|
|
|
|
|
|
import requests
|
|
|
|
|
import os
|
|
|
|
|
import time
|
|
|
|
|
import redis
|
|
|
|
|
from scraper.utils import get_save_path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Import-time trace: runs once when this module is first imported.
print(">>> [IMPORT] download_tasks.py loaded")
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------
# Retry parameters (.env)
# ---------------------------------------------------------
# Values are read once at import time; the second argument to
# os.getenv is the default used when the variable is unset.

# Passed as max_retries to self.retry() in download_chapter.
MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7"))

# Retry countdown for normal errors is BASE_DELAY * (BACKOFF ** attempt).
BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2"))
BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2"))

# Seconds the worker sleeps after an HTTP 429 before retrying.
DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10"))
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------
# Global concurrency (.env)
# ---------------------------------------------------------
# Slot limit handed to acquire_global_slot() by download_chapter.
MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1"))
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------
# Global minimum delay (.env)
# ---------------------------------------------------------
# Seconds. Used as the TTL of the Redis delay lock and as the
# start-of-task sleep in download_chapter; values <= 0 disable both.
GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1"))

# Redis key of the delay lock.
DELAY_KEY = "download:delay_lock"
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------
# Redis connection
# ---------------------------------------------------------
# URL comes from REDIS_BROKER; the default below is used when it is unset.
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
redis_client = redis.Redis.from_url(REDIS_URL)

# Redis key holding the global download semaphore counter.
SEM_KEY = "download:active"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# =========================================================
# GLOBAL DELAY FUNCTIONS
# =========================================================
|
|
|
|
|
def wait_for_global_delay():
|
|
|
|
|
"""Block while delay lock exists."""
|
|
|
|
|
"""Block while delay-lock exists."""
|
|
|
|
|
if GLOBAL_DELAY <= 0:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
@ -50,23 +64,21 @@ def wait_for_global_delay():
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_global_delay():
    """Set the Redis delay lock after a download completes.

    The lock is the key ``DELAY_KEY`` with a TTL of ``GLOBAL_DELAY``
    seconds. Does nothing when ``GLOBAL_DELAY`` is zero or negative.
    """
    if GLOBAL_DELAY <= 0:
        return

    # nx=True: only create the key when it is absent, so a lock that is
    # already running keeps its original TTL instead of being extended.
    redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# =========================================================
# GLOBAL CONCURRENCY FUNCTIONS
# =========================================================
|
|
|
|
|
def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
    """Block until a global download slot is acquired.

    Implements a counting semaphore on the Redis key ``SEM_KEY`` using
    atomic INCR: the counter is incremented first, and the slot is kept
    only if the new value does not exceed ``max_slots``.

    Args:
        max_slots: Maximum number of slots that may be held at once.
        retry_delay: Seconds to sleep between attempts while all slots
            are taken.

    NOTE(review): this loop has no timeout, and no expiry is set on the
    counter here, so a holder that never releases its slot keeps it taken.
    Confirm that the release path always runs.
    """
    while True:
        current = redis_client.incr(SEM_KEY)
        if current <= max_slots:
            return

        # Over the limit: undo our increment and wait before trying again.
        redis_client.decr(SEM_KEY)
        time.sleep(retry_delay)
|
|
|
|
|
|
|
|
|
|
@ -83,36 +95,59 @@ print(
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# =========================================================
# CELERY DOWNLOAD TASK
# =========================================================
|
|
|
|
|
@celery_app.task(
|
|
|
|
|
bind=True,
|
|
|
|
|
queue="download",
|
|
|
|
|
ignore_result=False,
|
|
|
|
|
)
|
|
|
|
|
def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
|
|
|
|
|
def download_chapter(
|
|
|
|
|
self, book_id: str, chapter_num: int, chapter_url: str, base_path: str
|
|
|
|
|
):
|
|
|
|
|
"""
|
|
|
|
|
Download chapter HTML.
|
|
|
|
|
Ensures:
|
|
|
|
|
- global delay throttle
|
|
|
|
|
- global concurrency limit
|
|
|
|
|
- hard blocking on 429
|
|
|
|
|
|
|
|
|
|
Abort behavior:
|
|
|
|
|
- If abort is active AND this chapter has not started → skip immediately
|
|
|
|
|
- If abort is active BUT this chapter already started → finish download/parse/save
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# HARD DELAY at every execution to prevent retry storms
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
# ABORT CHECK BEFORE ANYTHING STARTS
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
if abort_requested(book_id) and not chapter_started(book_id, chapter_num):
|
|
|
|
|
log(f"[ABORT] Skip chapter {chapter_num} (abort active, not started)")
|
|
|
|
|
return {
|
|
|
|
|
"chapter": chapter_num,
|
|
|
|
|
"url": chapter_url,
|
|
|
|
|
"html": None,
|
|
|
|
|
"skipped": True,
|
|
|
|
|
"path": None,
|
|
|
|
|
"abort": True,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
# MARK CHAPTER AS STARTED
|
|
|
|
|
# Ensures parse/save must always run even after abort is triggered.
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
mark_chapter_started(book_id, chapter_num)
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
# HARD START DELAY
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
if GLOBAL_DELAY > 0:
|
|
|
|
|
time.sleep(GLOBAL_DELAY)
|
|
|
|
|
|
|
|
|
|
save_path = get_save_path(chapter_num, base_path)
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# 1. SKIP IF EXISTS (still delay to maintain consistency)
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
# SKIP IF EXISTS
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
if os.path.exists(save_path):
|
|
|
|
|
wait_for_global_delay()
|
|
|
|
|
set_global_delay()
|
|
|
|
|
|
|
|
|
|
log(f"[DL] SKIP chapter {chapter_num} (exists) → {save_path}")
|
|
|
|
|
return {
|
|
|
|
|
"chapter": chapter_num,
|
|
|
|
|
@ -122,21 +157,21 @@ def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
|
|
|
|
|
"path": save_path,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# 2. GLOBAL DELAY SYNC
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
# GLOBAL DELAY SYNC
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
wait_for_global_delay()
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# 3. GLOBAL CONCURRENCY
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
# GLOBAL CONCURRENCY
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
acquire_global_slot(MAX_CONCURRENCY)
|
|
|
|
|
log(f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# 4. DO THE DOWNLOAD
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
# ACTUAL DOWNLOAD
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
|
|
|
|
|
|
|
|
|
|
resp = requests.get(
|
|
|
|
|
@ -163,31 +198,26 @@ def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
|
|
|
|
|
attempt = self.request.retries
|
|
|
|
|
delay = BASE_DELAY * (BACKOFF**attempt)
|
|
|
|
|
|
|
|
|
|
# =============================================================
|
|
|
|
|
# HARD 429 BLOCK — DO NOT RELEASE SLOT YET
|
|
|
|
|
# =============================================================
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
# 429 HANDLING
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
if (
|
|
|
|
|
hasattr(exc, "response")
|
|
|
|
|
and getattr(exc.response, "status_code", None) == 429
|
|
|
|
|
):
|
|
|
|
|
|
|
|
|
|
log(
|
|
|
|
|
f"[DL] 429 Too Many Requests → HARD WAIT {DELAY_429}s "
|
|
|
|
|
f"[DL] 429 → HARD WAIT {DELAY_429}s "
|
|
|
|
|
f"(attempt {attempt}/{MAX_RETRIES})"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# HARD BLOCK: worker sleeps, still holding the slot
|
|
|
|
|
time.sleep(DELAY_429)
|
|
|
|
|
|
|
|
|
|
# After 429 wait, also apply global delay
|
|
|
|
|
set_global_delay()
|
|
|
|
|
|
|
|
|
|
# Retry immediately (countdown=0)
|
|
|
|
|
raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES)
|
|
|
|
|
|
|
|
|
|
# =============================================================
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
# NORMAL ERRORS
|
|
|
|
|
# =============================================================
|
|
|
|
|
# ------------------------------------------------------------
|
|
|
|
|
log(
|
|
|
|
|
f"[DL] ERROR on {chapter_url}: {exc} → retry in {delay}s "
|
|
|
|
|
f"(attempt {attempt}/{MAX_RETRIES})"
|
|
|
|
|
@ -195,9 +225,6 @@ def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
|
|
|
|
|
raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
|
|
|
|
|
|
|
|
|
|
finally:
|
|
|
|
|
# =============================================================
|
|
|
|
|
# ALWAYS RELEASE SLOT AFTER HARD BLOCK / NORMAL WORK
|
|
|
|
|
# =============================================================
|
|
|
|
|
set_global_delay()
|
|
|
|
|
release_global_slot()
|
|
|
|
|
log(f"[DL] RELEASED SLOT for chapter {chapter_num}")
|
|
|
|
|
|