You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kmftools/bookscraper/scraper/tasks/download_tasks.py

213 lines
6.4 KiB

# ============================================================
# File: scraper/tasks/download_tasks.py
# Purpose:
# Download chapter HTML into payload["html"].
# Updated for book_idx unified ID model.
# ============================================================
from celery_app import celery_app
from scraper.utils.utils import get_save_path
from scraper.abort import abort_requested, chapter_started, mark_chapter_started
# Unified repository façade
from db.repository import (
set_status,
inc_download_done,
inc_download_skipped,
)
from logbus.publisher import log
from scraper.ui_log import push_ui
from scraper.logger_decorators import logcall
import requests
import redis
import os
import time
from datetime import datetime
# Debug banner: written to stdout at import time so it is visible in the
# process output that this task module was loaded.
print(">>> [IMPORT] download_tasks.py loaded")
# -----------------------------------------------------------
# TIMESTAMPED LOG WRAPPER
# -----------------------------------------------------------
def log_msg(book_idx: str, message: str):
    """Emit *message* prefixed with a local timestamp and the book id.

    The formatted line ("YYYY-MM-DD HH:MM:SS [book_idx] message") is sent
    first to the log bus (``log``) and then to the UI stream (``push_ui``).
    """
    line = f"{datetime.now():%Y-%m-%d %H:%M:%S} [{book_idx}] {message}"
    for sink in (log, push_ui):
        sink(line)
# -----------------------------------------------------------
# ENV CONFIG
# -----------------------------------------------------------
def _env_int(name: str, default: str) -> int:
    """Read environment variable *name* as an int, using *default* if unset."""
    return int(os.getenv(name, default))


# Retry / backoff tuning used by download_chapter's error handling.
MAX_RETRIES = _env_int("DOWNLOAD_MAX_RETRIES", "7")
BASE_DELAY = _env_int("DOWNLOAD_BASE_DELAY", "2")
BACKOFF = _env_int("DOWNLOAD_BACKOFF_MULTIPLIER", "2")
DELAY_429 = _env_int("DOWNLOAD_429_DELAY", "10")

# Throttling shared across workers through Redis.
MAX_CONCURRENCY = _env_int("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1")
# Kept an int: it is passed as the Redis ``ex`` (seconds) TTL.
GLOBAL_DELAY = _env_int("DOWNLOAD_GLOBAL_MIN_DELAY", "1")

REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
redis_client = redis.Redis.from_url(REDIS_URL)

# Redis keys: slot counter (semaphore) and the cooldown lock.
SEM_KEY = "download:active"
DELAY_KEY = "download:delay_lock"
# -----------------------------------------------------------
# DELAY + CONCURRENCY HELPERS
# -----------------------------------------------------------
def wait_for_global_delay():
    """Block until ``DELAY_KEY`` no longer exists in Redis.

    The key is polled every 0.1 s. When ``GLOBAL_DELAY`` is not positive
    the cooldown feature is disabled and this returns immediately.
    """
    if GLOBAL_DELAY > 0:
        while redis_client.exists(DELAY_KEY):
            time.sleep(0.1)
def set_global_delay():
    """Start a ``GLOBAL_DELAY``-second cooldown shared through Redis.

    Uses SET with NX, so a cooldown that is already running is left as-is
    (its TTL is not extended). Does nothing when ``GLOBAL_DELAY`` is not
    positive.
    """
    if GLOBAL_DELAY > 0:
        redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)
def acquire_global_slot(max_slots: int, retry_delay: float = 0.5) -> None:
    """Claim one of ``max_slots`` download slots shared through Redis.

    Optimistic counting semaphore on ``SEM_KEY``: INCR the counter and keep
    the slot if the new value is within ``max_slots``; otherwise DECR it
    back, sleep ``retry_delay`` seconds and try again. Every successful call
    must be paired with ``release_global_slot()``.

    Args:
        max_slots: Maximum number of concurrent holders; must be >= 1.
        retry_delay: Seconds to sleep between attempts while all slots are
            taken.

    Raises:
        ValueError: If ``max_slots`` < 1. No slot could ever be granted in
            that case, so failing loudly beats spinning forever.
    """
    if max_slots < 1:
        raise ValueError(f"max_slots must be >= 1, got {max_slots}")
    # NOTE(review): SEM_KEY carries no TTL, so a worker that dies between
    # acquire and release leaves the counter permanently inflated; an
    # operator reset of the key may be needed after crashes.
    while redis_client.incr(SEM_KEY) > max_slots:
        redis_client.decr(SEM_KEY)
        time.sleep(retry_delay)
def release_global_slot():
    """Give back a slot previously claimed with ``acquire_global_slot``."""
    redis_client.decrby(SEM_KEY, 1)
# ============================================================
# CELERY TASK — Payload v3 (book_idx model)
# ============================================================
def _mark_skipped(payload: dict, path) -> dict:
    """Flag *payload* as skipped (no HTML fetched), set its path, return it."""
    payload["html"] = None
    payload["skipped"] = True
    payload["path"] = path
    return payload


def _choose_encoding(detected) -> str:
    """Pick the codec used to decode a chapter page.

    GB18030 is a superset of GBK and GB2312. It is therefore used when
    nothing was detected, and also in place of a GB2312/GBK detection, so
    characters outside GB2312 are not turned into U+FFFD. Any other
    detected encoding is returned unchanged.
    """
    if not detected or detected.lower() in ("gb2312", "gbk"):
        return "gb18030"
    return detected


@celery_app.task(bind=True, queue="download", ignore_result=False)
@logcall
def download_chapter(self, payload: dict):
    """
    Download one chapter's HTML into ``payload["html"]`` and return payload.

    Payload format:
    {
        "book_idx": str,
        "chapter": {
            "num": int,
            "title": str,
            "url": str,
            "volume_path": str
        },
        "book_meta": dict,
        # fields filled during pipeline:
        "html": None | str,
        "parsed": None | str,
        "skipped": bool,
        "path": None | str
    }

    Flow:
      1. Set the book status to 'downloading'.
      2. If an abort was requested and this chapter has not started yet,
         return a skipped payload with ``path`` None.
      3. If the chapter file already exists, return a skipped payload whose
         ``path`` points at it.
      4. Otherwise sleep ``GLOBAL_DELAY``, take a global download slot,
         honour the shared cooldown lock, fetch the page, and fill
         ``html`` / ``skipped=False`` / ``path``.

    Failures inside the download are rescheduled via ``self.retry``:
      * HTTP 429: sleep ``DELAY_429`` seconds, then retry with countdown 0.
      * Anything else: retry after ``BASE_DELAY * BACKOFF**attempt`` seconds.
    Both paths allow at most ``MAX_RETRIES`` retries.

    Raises:
        ValueError: If *payload* is empty.
    """
    if not payload:
        raise ValueError("download_chapter received empty payload")

    book_idx = payload["book_idx"]
    chapter = payload["chapter"]
    chapter_num = chapter["num"]
    chapter_url = chapter["url"]
    chapter_title = chapter.get("title") or f"Chapter {chapter_num}"
    volume_path = chapter["volume_path"]

    # -----------------------------------------------------------
    # STATUS UPDATE (book is now in 'downloading')
    # -----------------------------------------------------------
    set_status(book_idx, "downloading")

    # -----------------------------------------------------------
    # ABORT CHECK (skip only if not yet started; in-flight chapters finish)
    # -----------------------------------------------------------
    if abort_requested(book_idx) and not chapter_started(book_idx, chapter_num):
        log_msg(book_idx, f"[ABORT] Skip chapter {chapter_num}")
        inc_download_skipped(book_idx)
        return _mark_skipped(payload, None)

    mark_chapter_started(book_idx, chapter_num)

    # -----------------------------------------------------------
    # SKIP IF FILE ALREADY EXISTS
    # -----------------------------------------------------------
    save_path = get_save_path(chapter_num, volume_path)
    if os.path.exists(save_path):
        log_msg(book_idx, f"[DL] SKIP {chapter_num} → {save_path}")
        inc_download_skipped(book_idx)
        return _mark_skipped(payload, save_path)

    # -----------------------------------------------------------
    # GLOBAL DELAY + CONCURRENCY
    # -----------------------------------------------------------
    if GLOBAL_DELAY > 0:
        time.sleep(GLOBAL_DELAY)

    # Take the slot BEFORE checking the cooldown lock. In the opposite order
    # a worker already blocked in acquire_global_slot() starts the moment
    # the previous holder releases its slot, skipping the cooldown that
    # holder just set in its finally clause.
    acquire_global_slot(MAX_CONCURRENCY)

    # -----------------------------------------------------------
    # HTTP DOWNLOAD
    # -----------------------------------------------------------
    try:
        wait_for_global_delay()

        log_msg(book_idx, f"[DL] Downloading {chapter_num} ({chapter_title})")
        resp = requests.get(
            chapter_url,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=20,
        )
        resp.raise_for_status()
        resp.encoding = _choose_encoding(resp.apparent_encoding)
        html = resp.text
        log_msg(book_idx, f"[DL] OK {chapter_num}: {len(resp.content)} bytes")

        # NOTE(review): inc_download_done is imported by this module but not
        # called here; confirm a later pipeline stage increments it.
        payload["html"] = html
        payload["skipped"] = False
        payload["path"] = save_path
        return payload

    except Exception as exc:
        response = getattr(exc, "response", None)
        if getattr(response, "status_code", None) == 429:
            # Sleeps while still holding the download slot, so with
            # MAX_CONCURRENCY=1 no other worker downloads during the wait.
            log_msg(book_idx, f"[DL] 429 → WAIT {DELAY_429}s")
            time.sleep(DELAY_429)
            set_global_delay()
            raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES)

        # General retry with exponential backoff.
        attempt = self.request.retries
        delay = BASE_DELAY * (BACKOFF**attempt)
        log_msg(book_idx, f"[DL] ERROR {chapter_num}: {exc} → retry {delay}s")
        raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)

    finally:
        # Start the cooldown before freeing the slot; the nested finally
        # guarantees the slot is released even if the Redis SET fails.
        try:
            set_global_delay()
        finally:
            release_global_slot()