Implement global download delay, Redis-based concurrency, cleanup utils, update pipelines

feat/download-progress-abort
peter.fong 2 weeks ago
parent f27b33a882
commit 788572e1fa

@@ -24,11 +24,14 @@ class DownloadController:
self.book_base = os.path.join(root, self.title)
os.makedirs(self.book_base, exist_ok=True)
# constant metadata for all chapters
# ------------------------------------------
# FIXED: meta now includes book_url
# ------------------------------------------
self.meta = {
"title": self.scrape_result.get("title"),
"author": self.scrape_result.get("author"),
"description": self.scrape_result.get("description"),
"book_url": self.scrape_result.get("book_url"),
}
def get_volume_path(self, chapter_num: int) -> str:
@@ -51,20 +54,17 @@ class DownloadController:
chapter_num = ch["num"]
chapter_url = ch["url"]
# compute volume directory
vol_path = self.get_volume_path(chapter_num)
# build the pipeline for this chapter
tasks.append(
build_chapter_pipeline(
chapter_num,
chapter_url,
vol_path, # ✔ correct volume path!!
self.meta, # ✔ pass metadata once
vol_path,
self.meta,
)
)
# parallel processing
job_group = group(tasks)
async_result = job_group.apply_async()

@@ -2,15 +2,150 @@
from celery_app import celery_app
from logbus.publisher import log
import requests
import os
import time
import redis
from scraper.utils import get_save_path
print(">>> [IMPORT] download_tasks.py loaded")
# ---------------------------
# Retry parameters from .env
# ---------------------------
MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7"))
BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2"))
BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2"))
DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10"))
@celery_app.task(bind=True, queue="download", ignore_result=False)
def download_chapter(self, chapter_num: int, chapter_url: str):
log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
# ---------------------------
# GLOBAL CONCURRENCY LIMIT
# ---------------------------
MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1"))
# ---------------------------
# GLOBAL MINIMUM DELAY
# ---------------------------
GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1"))
DELAY_KEY = "download:delay_lock"
# ---------------------------
# Redis connection
# ---------------------------
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
redis_client = redis.Redis.from_url(REDIS_URL)
SEM_KEY = "download:active" # semaphore counter
# ======================================================
# GLOBAL DELAY FUNCTIONS
# ======================================================
def wait_for_global_delay():
"""
Block until no global delay lock exists.
Prevents hammering the server too fast.
"""
if GLOBAL_DELAY <= 0:
return
while redis_client.exists(DELAY_KEY):
time.sleep(0.1)
def set_global_delay():
"""
After finishing a download (or skip),
set a TTL lock so all workers wait a minimum time.
"""
if GLOBAL_DELAY <= 0:
return
# SET key NX EX:
# - only set if not existing
# - expires automatically
redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)
# ======================================================
# GLOBAL CONCURRENCY FUNCTIONS
# ======================================================
def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
"""
GLOBAL semaphore with Redis.
Atomic INCR. If limit exceeded, undo & wait.
"""
while True:
current = redis_client.incr(SEM_KEY)
if current <= max_slots:
return # acquired OK
redis_client.decr(SEM_KEY)
time.sleep(retry_delay)
def release_global_slot():
"""Release semaphore."""
redis_client.decr(SEM_KEY)
print(f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}")
print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s")
print(
f">>> [CONFIG] download retries = "
f"max={MAX_RETRIES}, base={BASE_DELAY}, backoff={BACKOFF}, 429={DELAY_429}"
)
# ======================================================
# CELERY TASK
# ======================================================
@celery_app.task(
bind=True,
queue="download",
ignore_result=False,
)
def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
"""
base_path comes from pipeline.py.
The download is SKIPPED if the file already exists.
"""
save_path = get_save_path(chapter_num, base_path)
# ------------------------------------------------------------------
# 1. SKIP IF EXISTS, but still set the global delay!
# ------------------------------------------------------------------
if os.path.exists(save_path):
wait_for_global_delay()
set_global_delay()
log(f"[DL] SKIP chapter {chapter_num} (exists) → {save_path}")
return {
"chapter": chapter_num,
"url": chapter_url,
"html": None,
"skipped": True,
"path": save_path,
}
# ------------------------------------------------------------------
# 2. GLOBAL DELAY — throttle downloads globally
# ------------------------------------------------------------------
wait_for_global_delay()
# ------------------------------------------------------------------
# 3. GLOBAL CONCURRENCY — only X downloads at the same time
# ------------------------------------------------------------------
acquire_global_slot(MAX_CONCURRENCY)
log(f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
try:
# ------------------------------------------------------------------
# 4. DOWNLOAD LOGIC
# ------------------------------------------------------------------
log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
resp = requests.get(
chapter_url,
headers={"User-Agent": "Mozilla/5.0"},
@@ -20,14 +155,45 @@ def download_chapter(self, chapter_num: int, chapter_url: str):
resp.encoding = resp.apparent_encoding or "gb2312"
html = resp.text
log(f"[DL] OK {chapter_num}: {len(html)} bytes")
return {
"chapter": chapter_num,
"url": chapter_url,
"html": html,
"skipped": False,
"path": save_path,
}
except Exception as exc:
log(f"[DL] ERROR {chapter_url}: {exc}")
raise
# ------------------------------------------------------------------
# 5. RETRY LOGIC
# ------------------------------------------------------------------
attempt = self.request.retries
delay = BASE_DELAY * (BACKOFF**attempt)
if (
hasattr(exc, "response")
and getattr(exc.response, "status_code", None) == 429
):
delay = DELAY_429 + delay
log(
f"[DL] 429 Too Many Requests → retry in {delay}s "
f"(attempt {attempt}/{MAX_RETRIES})"
)
raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
log(
f"[DL] ERROR on {chapter_url}: {exc} → retry in {delay}s "
f"(attempt {attempt}/{MAX_RETRIES})"
)
raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
finally:
# ------------------------------------------------------------------
# 6. ALWAYS set delay + release semaphore
# ------------------------------------------------------------------
set_global_delay()
release_global_slot()
log(f"[DL] RELEASED SLOT for chapter {chapter_num}")

@@ -10,24 +10,16 @@ print(">>> [IMPORT] parse_tasks.py loaded")
@celery_app.task(bind=True, queue="parse", ignore_result=False)
def parse_chapter(self, download_result: dict, meta: dict):
"""
download_result:
{
"chapter": int,
"url": str,
"html": str
}
meta:
{
"title": str,
"author": str,
"description": str
}
"""
# 1) SKIP mode
if download_result.get("skipped"):
chapter = download_result.get("chapter")
log(f"[PARSE] SKIP chapter {chapter} (download skipped)")
return download_result
# 2) Normal mode
chapter_num = download_result["chapter"]
url = download_result["url"]
chapter_url = download_result["url"]
html = download_result["html"]
log(f"[PARSE] Parsing chapter {chapter_num}")
@@ -53,19 +45,20 @@ def parse_chapter(self, download_result: dict, meta: dict):
raw = node.get_text() if node else soup.get_text()
# replacements
REPL = load_replacements()
text = clean_text(raw, REPL)
# ---------------------------------------------------
# HEADER ONLY FOR CHAPTER 1
# ---------------------------------------------------
# -----------------------------
# FIXED: chapter 1 header = book URL
# -----------------------------
if chapter_num == 1:
book_url = meta.get("book_url") or meta.get("url") or "UNKNOWN"
header = (
f"{meta.get('title','')}\n"
f"Author: {meta.get('author','')}\n"
f"Description:\n{meta.get('description','')}\n"
f"URL: {url}\n" + "-" * 50 + "\n\n"
f"Book URL: {book_url}\n" + "-" * 50 + "\n\n"
)
text = header + text
@@ -73,7 +66,7 @@ def parse_chapter(self, download_result: dict, meta: dict):
return {
"chapter": chapter_num,
"url": url,
"url": chapter_url,
"text": text,
"length": len(text),
}
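
For clarity, the chapter-1 header built above renders like this for sample metadata (illustrative values only, not part of this diff):

# Sketch only: the header parse_chapter prepends to chapter 1.
meta = {
    "title": "Example Book",
    "author": "Example Author",
    "description": "Example description.",
    "book_url": "https://example.com/book/123",
}
book_url = meta.get("book_url") or meta.get("url") or "UNKNOWN"
header = (
    f"{meta.get('title','')}\n"
    f"Author: {meta.get('author','')}\n"
    f"Description:\n{meta.get('description','')}\n"
    f"Book URL: {book_url}\n" + "-" * 50 + "\n\n"
)
print(header)
# Example Book
# Author: Example Author
# Description:
# Example description.
# Book URL: https://example.com/book/123
# followed by a 50-dash separator line and a blank line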

@@ -1,6 +1,16 @@
# scraper/tasks/pipeline.py
"""
Build the pipeline for a single chapter:
download → parse → save
This module must NOT import scraping.py or controllers,
otherwise Celery will hit circular imports on worker startup.
Only import task functions here.
"""
from celery import chain
from scraper.tasks.download_tasks import download_chapter
from scraper.tasks.parse_tasks import parse_chapter
from scraper.tasks.save_tasks import save_chapter
@@ -10,12 +20,17 @@ def build_chapter_pipeline(
chapter_number: int, chapter_url: str, base_path: str, meta: dict
):
"""
Build a download → parse → save pipeline for one chapter.
meta contains:
title, author, description
Construct a Celery chain for one chapter:
1. download_chapter
2. parse_chapter
3. save_chapter
"""
return chain(
download_chapter.s(chapter_number, chapter_url),
parse_chapter.s(meta), # ← PASS METADATA ALONG
# download_chapter needs ALL THREE arguments
download_chapter.s(chapter_number, chapter_url, base_path),
# parse_chapter gets the output of download_chapter + meta as extra arg
parse_chapter.s(meta),
# save_chapter needs base_path as extra arg
save_chapter.s(base_path),
)
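
One detail worth spelling out: in a Celery chain, each task's return value is injected as the first positional argument of the next signature, so parse_chapter.s(meta) runs as parse_chapter(download_result, meta) and save_chapter.s(base_path) runs as save_chapter(parsed, base_path). A minimal sketch with hypothetical stand-in tasks and an assumed broker URL (not part of this commit):

# Sketch only: chained signatures receive the previous task's result first.
from celery import Celery, chain

app = Celery("sketch", broker="redis://redis:6379/0")  # assumed broker URL

@app.task
def step_download(num: int, url: str, base_path: str) -> dict:
    return {"chapter": num, "url": url, "path": base_path}

@app.task
def step_parse(download_result: dict, meta: dict) -> dict:
    # download_result is injected by the chain; meta was bound with .s(meta)
    return {**download_result, "title": meta.get("title")}

@app.task
def step_save(parsed: dict, base_path: str) -> dict:
    # parsed is injected by the chain; base_path was bound with .s(base_path)
    return {**parsed, "saved_under": base_path}

pipeline = chain(
    step_download.s(1, "https://example.com/ch1", "/tmp/book/vol_001"),
    step_parse.s({"title": "Example"}),
    step_save.s("/tmp/book/vol_001"),
)

On the controller side, group(tasks).apply_async() then fans one such chain out per chapter.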

@@ -4,12 +4,23 @@ print(">>> [IMPORT] save_tasks.py loaded")
from celery import shared_task
from logbus.publisher import log
import os
from scraper.utils import get_save_path
@shared_task(bind=True, queue="save", ignore_result=False)
def save_chapter(self, parsed: dict, base_path: str):
print(f">>> [save_tasks] save_chapter() CALLED for chapter {parsed.get('chapter')}")
# ----------------------------
# SKIP: If pipeline marked skip
# ----------------------------
if parsed.get("skipped"):
chapter = parsed.get("chapter")
path = parsed.get("path")
log(f"[SAVE] SKIP chapter {chapter} (already exists) → {path}")
print(f">>> [save_tasks] SKIPPED {path}")
return {"chapter": chapter, "path": path, "skipped": True}
try:
chapter_number = parsed.get("chapter")
url = parsed.get("url")
@@ -20,8 +31,8 @@ def save_chapter(self, parsed: dict, base_path: str):
os.makedirs(base_path, exist_ok=True)
filename = f"{chapter_number:05d}.txt"
path = os.path.join(base_path, filename)
# unified filename logic
path = get_save_path(chapter_number, base_path)
with open(path, "w", encoding="utf-8") as f:
f.write(text)

@@ -31,11 +31,15 @@ def start_scrape_book(self, url: str):
log(f"[SCRAPING] DRY_RUN: limiting chapters to first {TEST_LIMIT}")
chapters = chapters[:TEST_LIMIT]
# ---------------------------------------------------
# FIX: add book_url so parse_chapter has the real url
# ---------------------------------------------------
result = {
"title": scraper.book_title,
"author": scraper.book_author,
"description": scraper.book_description,
"cover": scraper.cover_url,
"book_url": url,
"chapters": [
{"num": ch.number, "title": ch.title, "url": ch.url} for ch in chapters
],

@@ -1,57 +0,0 @@
# scraper/utils.py
import re
import os
from pathlib import Path
# ------------------------------------------------------------
# Load replacements from text_replacements.txt (optional file)
# ------------------------------------------------------------
def load_replacements(filepath="text_replacements.txt") -> dict:
"""
Load key=value style replacements.
Empty or missing file → return {}.
"""
path = Path(filepath)
if not path.exists():
return {}
repl = {}
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if "=" in line:
key, val = line.split("=", 1)
repl[key.strip()] = val.strip()
return repl
# ------------------------------------------------------------
# Clean extracted HTML text
# ------------------------------------------------------------
def clean_text(raw: str, repl_dict: dict = None) -> str:
"""
Normalizes whitespace, removes junk, and applies replacements.
repl_dict is optional → falls back to {}.
"""
if repl_dict is None:
repl_dict = {}
txt = raw
# Normalize CRLF
txt = txt.replace("\r", "")
# Collapse multiple blank lines
txt = re.sub(r"\n{3,}", "\n\n", txt)
# Apply replacements
for key, val in repl_dict.items():
txt = txt.replace(key, val)
# Strip excessive whitespace at edges
return txt.strip()

@@ -1,36 +1,67 @@
# scraper/utils.py
import os
import re
from pathlib import Path
def load_replacements(path="text_replacements.txt") -> dict:
# ------------------------------------------------------------
# Load replacements from text_replacements.txt (optional file)
# ------------------------------------------------------------
def load_replacements(filepath="text_replacements.txt") -> dict:
"""
Load key=value replacements from a simple text file.
Lines beginning with # are ignored.
Load key=value style replacements.
Empty or missing file → return {}.
Lines starting with '#' are ignored.
"""
fp = Path(path)
if not fp.exists():
path = Path(filepath)
if not path.exists():
return {}
repl = {}
for line in fp.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
k, v = line.split("=", 1)
repl[k.strip()] = v.strip()
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, val = line.split("=", 1)
repl[key.strip()] = val.strip()
return repl
def clean_text(raw: str, repl_dict: dict) -> str:
# ------------------------------------------------------------
# Clean extracted HTML text
# ------------------------------------------------------------
def clean_text(raw: str, repl_dict: dict = None) -> str:
"""
Cleans text using user-defined replacements.
Normalize whitespace, remove junk, apply replacements.
repl_dict is optional → {} if none provided.
"""
txt = raw
if repl_dict is None:
repl_dict = {}
txt = raw.replace("\r", "") # normalize CRLF
for k, v in repl_dict.items():
txt = txt.replace(k, v)
# Collapse 3+ blank lines → max 1 empty line
txt = re.sub(r"\n{3,}", "\n\n", txt)
# Apply replacements
for key, val in repl_dict.items():
txt = txt.replace(key, val)
return txt.strip()
# ------------------------------------------------------------
# Determine save path for a chapter (shared by download & save)
# ------------------------------------------------------------
def get_save_path(chapter_num: int, base_path: str) -> str:
"""
Returns the filesystem path where this chapter should be saved.
Formats the filename as 0001.txt, 0002.txt, ...
"""
filename = f"{chapter_num:04d}.txt"
return os.path.join(base_path, filename)
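
Quick usage note for the shared helper, since both download_chapter (skip check) and save_chapter (write path) now go through it (paths below are illustrative):

# Sketch only: the unified filename logic.
from scraper.utils import get_save_path

get_save_path(1, "/books/example/vol_001")
# -> "/books/example/vol_001/0001.txt"
get_save_path(123, "/books/example/vol_002")
# -> "/books/example/vol_002/0123.txt"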

@@ -20,8 +20,10 @@
返回飘天文学网首页=
永久地址www.piaotia.com=
www.piaotia.com=
www.piaotia.com
piaotia.com=
piaotian.com=
飘天文学
www.piaotian.com=
www.piaotian.net=
@@ -54,6 +56,7 @@ Copyright ©=
本小说来自互联网资源,如果侵犯您的权益请联系我们=
本站立场无关=
均由网友发表或上传=
感谢各位书友的支持,您的支持就是我们最大的动力
# ---------- COMMON NOISE ----------
广告=
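
A note on this file's format: an entry such as "www.piaotia.com=" maps the phrase to an empty string, so load_replacements() turns it into {"www.piaotia.com": ""} and clean_text() deletes it from the extracted text; lines without "=" are ignored by the loader. A tiny illustration with a made-up input string (not part of this diff):

# Sketch only: how 'phrase=' entries strip boilerplate from extracted text.
from scraper.utils import clean_text

repl = {"www.piaotia.com": "", "广告": ""}  # as parsed from lines like "www.piaotia.com="
cleaned = clean_text("www.piaotia.com第一章\n\n广告正文", repl)
# -> "第一章\n\n正文"  (the noise phrases are removed, edges are stripped)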
