From 788572e1fac3ea2198cc367d54bd1905ca87a273 Mon Sep 17 00:00:00 2001
From: "peter.fong"
Date: Tue, 2 Dec 2025 09:42:32 +0100
Subject: [PATCH] Implement global download delay, Redis-based concurrency,
 cleanup utils, update pipelines

---
 bookscraper/scraper/download_controller.py  |  12 +-
 bookscraper/scraper/tasks/download_tasks.py | 176 +++++++++++++++++++-
 bookscraper/scraper/tasks/parse_tasks.py    |  37 ++--
 bookscraper/scraper/tasks/pipeline.py       |  25 ++-
 bookscraper/scraper/tasks/save_tasks.py     |  15 +-
 bookscraper/scraper/tasks/scraping.py       |   4 +
 bookscraper/scraper/tasks/utils.py          |  57 -------
 bookscraper/scraper/utils.py                |  67 ++++++--
 bookscraper/text_replacements.txt           |   3 +
 9 files changed, 281 insertions(+), 115 deletions(-)
 delete mode 100644 bookscraper/scraper/tasks/utils.py

diff --git a/bookscraper/scraper/download_controller.py b/bookscraper/scraper/download_controller.py
index 42d83f1..8538cd1 100644
--- a/bookscraper/scraper/download_controller.py
+++ b/bookscraper/scraper/download_controller.py
@@ -24,11 +24,14 @@ class DownloadController:
         self.book_base = os.path.join(root, self.title)
         os.makedirs(self.book_base, exist_ok=True)
 
-        # constant metadata for all chapters
+        # ------------------------------------------
+        # FIXED: meta now includes book_url
+        # ------------------------------------------
         self.meta = {
             "title": self.scrape_result.get("title"),
             "author": self.scrape_result.get("author"),
             "description": self.scrape_result.get("description"),
+            "book_url": self.scrape_result.get("book_url"),
         }
 
     def get_volume_path(self, chapter_num: int) -> str:
@@ -51,20 +54,17 @@ class DownloadController:
             chapter_num = ch["num"]
             chapter_url = ch["url"]
 
-            # compute volume directory
             vol_path = self.get_volume_path(chapter_num)
 
-            # build the pipeline for this chapter
             tasks.append(
                 build_chapter_pipeline(
                     chapter_num,
                     chapter_url,
-                    vol_path,  # ✔ correct volume path!!
-                    self.meta,  # ✔ pass metadata once
+                    vol_path,
+                    self.meta,
                 )
             )
 
-        # parallel processing
         job_group = group(tasks)
         async_result = job_group.apply_async()
 
diff --git a/bookscraper/scraper/tasks/download_tasks.py b/bookscraper/scraper/tasks/download_tasks.py
index c6b6202..837c99e 100644
--- a/bookscraper/scraper/tasks/download_tasks.py
+++ b/bookscraper/scraper/tasks/download_tasks.py
@@ -2,15 +2,150 @@
 from celery_app import celery_app
 from logbus.publisher import log
 import requests
+import os
+import time
+import redis
+from scraper.utils import get_save_path
 
 print(">>> [IMPORT] download_tasks.py loaded")
 
+# ---------------------------
+# Retry parameters from .env
+# ---------------------------
+MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7"))
+BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2"))
+BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2"))
+DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10"))
 
-@celery_app.task(bind=True, queue="download", ignore_result=False)
-def download_chapter(self, chapter_num: int, chapter_url: str):
-    log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
+# ---------------------------
+# GLOBAL CONCURRENCY LIMIT
+# ---------------------------
+MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1"))
+
+# ---------------------------
+# GLOBAL MINIMUM DELAY
+# ---------------------------
+GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1"))
+DELAY_KEY = "download:delay_lock"
+
+
+# ---------------------------
+# Redis connection
+# ---------------------------
+REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
+redis_client = redis.Redis.from_url(REDIS_URL)
+
+SEM_KEY = "download:active"  # semaphore counter
+
+
+# ======================================================
+# GLOBAL DELAY FUNCTIONS
+# ======================================================
+def wait_for_global_delay():
+    """
+    Block until no global delay lock exists.
+    Prevents hammering the server too fast.
+    """
+    if GLOBAL_DELAY <= 0:
+        return
+
+    while redis_client.exists(DELAY_KEY):
+        time.sleep(0.1)
+
+
+def set_global_delay():
+    """
+    After finishing a download (or skip),
+    set a TTL lock so all workers wait a minimum time.
+    """
+    if GLOBAL_DELAY <= 0:
+        return
+
+    # SET key NX EX:
+    # - only set if not existing
+    # - expires automatically
+    redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)
+
+
+# ======================================================
+# GLOBAL CONCURRENCY FUNCTIONS
+# ======================================================
+def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
+    """
+    GLOBAL semaphore with Redis.
+    Atomic INCR. If limit exceeded, undo & wait.
+    """
+    while True:
+        current = redis_client.incr(SEM_KEY)
+        if current <= max_slots:
+            return  # acquired OK
+
+        redis_client.decr(SEM_KEY)
+        time.sleep(retry_delay)
+
+
+def release_global_slot():
+    """Release semaphore."""
+    redis_client.decr(SEM_KEY)
+
+
+print(f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}")
+print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s")
+print(
+    f">>> [CONFIG] download retries = "
+    f"max={MAX_RETRIES}, base={BASE_DELAY}, backoff={BACKOFF}, 429={DELAY_429}"
+)
+
+
+# ======================================================
+# CELERY TASK
+# ======================================================
+@celery_app.task(
+    bind=True,
+    queue="download",
+    ignore_result=False,
+)
+def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
+    """
+    base_path comes from pipeline.py.
+    The download is SKIPPED if the file already exists.
+    """
+
+    save_path = get_save_path(chapter_num, base_path)
+
+    # ------------------------------------------------------------------
+    # 1. SKIP IF EXISTS — but still set the global delay!
+    # ------------------------------------------------------------------
+    if os.path.exists(save_path):
+        wait_for_global_delay()
+        set_global_delay()
+
+        log(f"[DL] SKIP chapter {chapter_num} (exists) → {save_path}")
+        return {
+            "chapter": chapter_num,
+            "url": chapter_url,
+            "html": None,
+            "skipped": True,
+            "path": save_path,
+        }
+
+    # ------------------------------------------------------------------
+    # 2. GLOBAL DELAY — throttle downloads globally
+    # ------------------------------------------------------------------
+    wait_for_global_delay()
+
+    # ------------------------------------------------------------------
+    # 3. GLOBAL CONCURRENCY — only X downloads at the same time
+    # ------------------------------------------------------------------
+    acquire_global_slot(MAX_CONCURRENCY)
+    log(f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
 
     try:
+        # ------------------------------------------------------------------
+        # 4. DOWNLOAD LOGIC
+        # ------------------------------------------------------------------
+        log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
+
         resp = requests.get(
             chapter_url,
             headers={"User-Agent": "Mozilla/5.0"},
@@ -20,14 +155,45 @@ def download_chapter(self, chapter_num: int, chapter_url: str):
         resp.encoding = resp.apparent_encoding or "gb2312"
         html = resp.text
 
+        log(f"[DL] OK {chapter_num}: {len(html)} chars")
+
         return {
             "chapter": chapter_num,
             "url": chapter_url,
             "html": html,
+            "skipped": False,
+            "path": save_path,
         }
 
     except Exception as exc:
-        log(f"[DL] ERROR {chapter_url}: {exc}")
-        raise
+        # ------------------------------------------------------------------
+        # 5. RETRY LOGIC
+        # ------------------------------------------------------------------
+        attempt = self.request.retries
+        delay = BASE_DELAY * (BACKOFF**attempt)
+
+        if (
+            hasattr(exc, "response")
+            and getattr(exc.response, "status_code", None) == 429
+        ):
+            delay = DELAY_429 + delay
+            log(
+                f"[DL] 429 Too Many Requests → retry in {delay}s "
+                f"(attempt {attempt}/{MAX_RETRIES})"
+            )
+            raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
+
+        log(
+            f"[DL] ERROR on {chapter_url}: {exc} → retry in {delay}s "
+            f"(attempt {attempt}/{MAX_RETRIES})"
+        )
+        raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
+
+    finally:
+        # ------------------------------------------------------------------
+        # 6. ALWAYS set delay + release semaphore
+        # ------------------------------------------------------------------
+        set_global_delay()
+        release_global_slot()
+        log(f"[DL] RELEASED SLOT for chapter {chapter_num}")
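
For reference, the two Redis primitives used above can be exercised on their own. The sketch below assumes a throwaway local Redis at redis://localhost:6379/0 (not the compose service URL) and only borrows the DELAY_KEY / SEM_KEY names from download_tasks.py; everything else is illustrative.

    # Hedged sketch of the Redis primitives used by download_tasks.py.
    # Assumes a local Redis at redis://localhost:6379/0; key names mirror
    # DELAY_KEY / SEM_KEY above, the rest is made up for illustration.
    import time
    import redis

    r = redis.Redis.from_url("redis://localhost:6379/0")
    DELAY_KEY = "download:delay_lock"
    SEM_KEY = "download:active"
    r.delete(DELAY_KEY, SEM_KEY)

    # TTL delay lock: SET NX EX only succeeds while the key is absent,
    # and the key expires on its own, so no worker has to clean it up.
    assert r.set(DELAY_KEY, "1", nx=True, ex=1)      # first worker takes it
    assert not r.set(DELAY_KEY, "1", nx=True, ex=1)  # second worker has to wait
    time.sleep(1.1)
    assert r.set(DELAY_KEY, "1", nx=True, ex=1)      # lock expired, free again

    # Counter semaphore: INCR is atomic, so with max_slots = 1 only one
    # worker ever sees a value <= 1; the loser undoes its INCR and retries.
    assert r.incr(SEM_KEY) == 1  # slot acquired
    assert r.incr(SEM_KEY) == 2  # over the limit
    r.decr(SEM_KEY)              # loser rolls back and retries later
    r.decr(SEM_KEY)              # winner releases its slot when done
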
diff --git a/bookscraper/scraper/tasks/parse_tasks.py b/bookscraper/scraper/tasks/parse_tasks.py
index 83a25c3..3aa270e 100644
--- a/bookscraper/scraper/tasks/parse_tasks.py
+++ b/bookscraper/scraper/tasks/parse_tasks.py
@@ -10,24 +10,16 @@ print(">>> [IMPORT] parse_tasks.py loaded")
 
 @celery_app.task(bind=True, queue="parse", ignore_result=False)
 def parse_chapter(self, download_result: dict, meta: dict):
-    """
-    download_result:
-        {
-            "chapter": int,
-            "url": str,
-            "html": str
-        }
-
-    meta:
-        {
-            "title": str,
-            "author": str,
-            "description": str
-        }
-    """
+    # 1) SKIP mode
+    if download_result.get("skipped"):
+        chapter = download_result.get("chapter")
+        log(f"[PARSE] SKIP chapter {chapter} (download skipped)")
+        return download_result
+
+    # 2) Normal mode
     chapter_num = download_result["chapter"]
-    url = download_result["url"]
+    chapter_url = download_result["url"]
     html = download_result["html"]
 
     log(f"[PARSE] Parsing chapter {chapter_num}")
@@ -53,19 +45,20 @@ def parse_chapter(self, download_result: dict, meta: dict):
 
     raw = node.get_text() if node else soup.get_text()
 
-    # replacements
     REPL = load_replacements()
     text = clean_text(raw, REPL)
 
-    # ---------------------------------------------------
-    # HEADER ONLY FOR CHAPTER 1
-    # ---------------------------------------------------
+    # -----------------------------
+    # FIXED: chapter 1 header = book URL
+    # -----------------------------
     if chapter_num == 1:
+        book_url = meta.get("book_url") or meta.get("url") or "UNKNOWN"
+
         header = (
             f"{meta.get('title','')}\n"
             f"Author: {meta.get('author','')}\n"
             f"Description:\n{meta.get('description','')}\n"
-            f"URL: {url}\n" + "-" * 50 + "\n\n"
+            f"Book URL: {book_url}\n" + "-" * 50 + "\n\n"
         )
         text = header + text
 
@@ -73,7 +66,7 @@ def parse_chapter(self, download_result: dict, meta: dict):
 
     return {
         "chapter": chapter_num,
-        "url": url,
+        "url": chapter_url,
         "text": text,
         "length": len(text),
     }
diff --git a/bookscraper/scraper/tasks/pipeline.py b/bookscraper/scraper/tasks/pipeline.py
index 93e98b0..e71b99c 100644
--- a/bookscraper/scraper/tasks/pipeline.py
+++ b/bookscraper/scraper/tasks/pipeline.py
@@ -1,6 +1,16 @@
 # scraper/tasks/pipeline.py
 
+"""
+Build the pipeline for a single chapter:
+download → parse → save
+
+This module must NOT import scraping.py or controllers,
+otherwise Celery will hit circular imports on worker startup.
+Only import task functions here.
+"""
+
 from celery import chain
+
 from scraper.tasks.download_tasks import download_chapter
 from scraper.tasks.parse_tasks import parse_chapter
 from scraper.tasks.save_tasks import save_chapter
@@ -10,12 +20,17 @@ def build_chapter_pipeline(
     chapter_number: int, chapter_url: str, base_path: str, meta: dict
 ):
     """
-    Build a download → parse → save pipeline for one chapter.
-    meta bevat:
-        title, author, description
+    Construct a Celery chain for one chapter:
+      1. download_chapter
+      2. parse_chapter
+      3. save_chapter
     """
+
     return chain(
-        download_chapter.s(chapter_number, chapter_url),
-        parse_chapter.s(meta),  # ← METADATA DOORGEVEN
+        # download_chapter needs ALL THREE arguments
+        download_chapter.s(chapter_number, chapter_url, base_path),
+        # parse_chapter gets the output of download_chapter + meta as extra arg
+        parse_chapter.s(meta),
+        # save_chapter needs base_path as extra arg
         save_chapter.s(base_path),
     )
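
For context, this is roughly how the chains built by build_chapter_pipeline() are fanned out (DownloadController does the real wiring); the book metadata, chapter list and volume path below are invented purely for illustration.

    # Hedged sketch of how build_chapter_pipeline() is typically consumed.
    # Chapter data and paths are made up; the real values come from
    # DownloadController in download_controller.py.
    from celery import group

    from scraper.tasks.pipeline import build_chapter_pipeline

    meta = {
        "title": "Example Book",
        "author": "Unknown",
        "description": "Demo only.",
        "book_url": "https://example.com/book/123/",
    }
    chapters = [
        {"num": 1, "url": "https://example.com/book/123/1.html"},
        {"num": 2, "url": "https://example.com/book/123/2.html"},
    ]

    job_group = group(
        build_chapter_pipeline(ch["num"], ch["url"], "/books/Example Book/Volume_1", meta)
        for ch in chapters
    )
    async_result = job_group.apply_async()
    # Each chain runs download_chapter -> parse_chapter -> save_chapter; the
    # Redis semaphore in download_tasks.py serializes the actual HTTP fetches
    # no matter how many chains the group starts at once.
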
diff --git a/bookscraper/scraper/tasks/save_tasks.py b/bookscraper/scraper/tasks/save_tasks.py
index 3faebc9..f1acbe5 100644
--- a/bookscraper/scraper/tasks/save_tasks.py
+++ b/bookscraper/scraper/tasks/save_tasks.py
@@ -4,12 +4,23 @@ print(">>> [IMPORT] save_tasks.py loaded")
 from celery import shared_task
 from logbus.publisher import log
 import os
+from scraper.utils import get_save_path
 
 
 @shared_task(bind=True, queue="save", ignore_result=False)
 def save_chapter(self, parsed: dict, base_path: str):
     print(f">>> [save_tasks] save_chapter() CALLED for chapter {parsed.get('chapter')}")
 
+    # ----------------------------
+    # SKIP: If pipeline marked skip
+    # ----------------------------
+    if parsed.get("skipped"):
+        chapter = parsed.get("chapter")
+        path = parsed.get("path")
+        log(f"[SAVE] SKIP chapter {chapter} (already exists) → {path}")
+        print(f">>> [save_tasks] SKIPPED {path}")
+        return {"chapter": chapter, "path": path, "skipped": True}
+
     try:
         chapter_number = parsed.get("chapter")
         url = parsed.get("url")
@@ -20,8 +31,8 @@ def save_chapter(self, parsed: dict, base_path: str):
 
         os.makedirs(base_path, exist_ok=True)
 
-        filename = f"{chapter_number:05d}.txt"
-        path = os.path.join(base_path, filename)
+        # unified filename logic
+        path = get_save_path(chapter_number, base_path)
 
         with open(path, "w", encoding="utf-8") as f:
             f.write(text)
diff --git a/bookscraper/scraper/tasks/scraping.py b/bookscraper/scraper/tasks/scraping.py
index bbe1d18..8003cf8 100644
--- a/bookscraper/scraper/tasks/scraping.py
+++ b/bookscraper/scraper/tasks/scraping.py
@@ -31,11 +31,15 @@ def start_scrape_book(self, url: str):
         log(f"[SCRAPING] DRY_RUN: limiting chapters to first {TEST_LIMIT}")
         chapters = chapters[:TEST_LIMIT]
 
+    # ---------------------------------------------------
+    # FIX: add book_url so parse_chapter has the real url
+    # ---------------------------------------------------
     result = {
         "title": scraper.book_title,
         "author": scraper.book_author,
         "description": scraper.book_description,
         "cover": scraper.cover_url,
+        "book_url": url,
         "chapters": [
             {"num": ch.number, "title": ch.title, "url": ch.url} for ch in chapters
         ],
diff --git a/bookscraper/scraper/tasks/utils.py b/bookscraper/scraper/tasks/utils.py
deleted file mode 100644
index 03c9b9f..0000000
--- a/bookscraper/scraper/tasks/utils.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# scraper/utils.py
-
-import re
-import os
-from pathlib import Path
-
-
-# ------------------------------------------------------------
-# Load replacements from text_replacements.txt (optional file)
-# ------------------------------------------------------------
-def load_replacements(filepath="text_replacements.txt") -> dict:
-    """
-    Load key=value style replacements.
-    Empty or missing file → return {}.
-    """
-    path = Path(filepath)
-
-    if not path.exists():
-        return {}
-
-    repl = {}
-
-    with open(path, "r", encoding="utf-8") as f:
-        for line in f:
-            line = line.strip()
-            if "=" in line:
-                key, val = line.split("=", 1)
-                repl[key.strip()] = val.strip()
-
-    return repl
-
-
-# ------------------------------------------------------------
-# Clean extracted HTML text
-# ------------------------------------------------------------
-def clean_text(raw: str, repl_dict: dict = None) -> str:
-    """
-    Normalizes whitespace, removes junk, and applies replacements.
-    repl_dict is optional → falls back to {}.
-    """
-    if repl_dict is None:
-        repl_dict = {}
-
-    txt = raw
-
-    # Normalize CRLF
-    txt = txt.replace("\r", "")
-
-    # Collapse multiple blank lines
-    txt = re.sub(r"\n{3,}", "\n\n", txt)
-
-    # Apply replacements
-    for key, val in repl_dict.items():
-        txt = txt.replace(key, val)
-
-    # Strip excessive whitespace at edges
-    return txt.strip()
diff --git a/bookscraper/scraper/utils.py b/bookscraper/scraper/utils.py
index 6fa27e6..08e45f0 100644
--- a/bookscraper/scraper/utils.py
+++ b/bookscraper/scraper/utils.py
@@ -1,36 +1,67 @@
-# scraper/utils.py
+import os
+import re
 from pathlib import Path
 
 
-def load_replacements(path="text_replacements.txt") -> dict:
+# ------------------------------------------------------------
+# Load replacements from text_replacements.txt (optional file)
+# ------------------------------------------------------------
+def load_replacements(filepath="text_replacements.txt") -> dict:
     """
-    Load key=value replacements from a simple text file.
-    Lines beginning with # are ignored.
+    Load key=value style replacements.
+    Empty or missing file → return {}.
+    Lines starting with '#' are ignored.
     """
-    fp = Path(path)
-    if not fp.exists():
+    path = Path(filepath)
+
+    if not path.exists():
         return {}
 
     repl = {}
-    for line in fp.read_text(encoding="utf-8").splitlines():
-        line = line.strip()
-        if not line or line.startswith("#"):
-            continue
-        if "=" in line:
-            k, v = line.split("=", 1)
-            repl[k.strip()] = v.strip()
+    with open(path, "r", encoding="utf-8") as f:
+        for line in f:
+            line = line.strip()
+            if not line or line.startswith("#"):
+                continue
+            if "=" in line:
+                key, val = line.split("=", 1)
+                repl[key.strip()] = val.strip()
 
     return repl
 
 
-def clean_text(raw: str, repl_dict: dict) -> str:
+# ------------------------------------------------------------
+# Clean extracted HTML text
+# ------------------------------------------------------------
+def clean_text(raw: str, repl_dict: dict = None) -> str:
     """
-    Cleans text using user-defined replacements.
+    Normalize whitespace, remove junk, apply replacements.
+    repl_dict is optional → {} if none provided.
     """
-    txt = raw
+    if repl_dict is None:
+        repl_dict = {}
+
+    txt = raw.replace("\r", "")  # normalize CRLF
 
-    for k, v in repl_dict.items():
-        txt = txt.replace(k, v)
+    # Collapse 3+ blank lines → max 1 empty line
+    txt = re.sub(r"\n{3,}", "\n\n", txt)
+
+    # Apply replacements
+    for key, val in repl_dict.items():
+        txt = txt.replace(key, val)
 
     return txt.strip()
 
 
+# ------------------------------------------------------------
+# Determine save path for a chapter (shared by download & save)
+# ------------------------------------------------------------
+def get_save_path(chapter_num: int, base_path: str) -> str:
+    """
+    Returns the filesystem path where this chapter should be saved.
+    Formats the filename as 0001.txt, 0002.txt, ...
+ """ + + filename = f"{chapter_num:04d}.txt" + return os.path.join(base_path, filename) diff --git a/bookscraper/text_replacements.txt b/bookscraper/text_replacements.txt index 73c2339..a2a6525 100644 --- a/bookscraper/text_replacements.txt +++ b/bookscraper/text_replacements.txt @@ -20,8 +20,10 @@ 返回飘天文学网首页= 永久地址:www.piaotia.com= www.piaotia.com= +www.piaotia.com piaotia.com= piaotian.com= +飘天文学 www.piaotian.com= www.piaotian.net= @@ -54,6 +56,7 @@ Copyright ©= 本小说来自互联网资源,如果侵犯您的权益请联系我们= 本站立场无关= 均由网友发表或上传= +感谢各位书友的支持,您的支持就是我们最大的动力 # ---------- COMMON NOISE ---------- 广告=