From 6577d997ed83c9804ade091c951de440abc0c424 Mon Sep 17 00:00:00 2001
From: "peter.fong" 
Date: Tue, 2 Dec 2025 10:51:20 +0100
Subject: [PATCH] abort functionality

---
 bookscraper/app.py                          |  54 +++++--
 bookscraper/scraper/abort.py                |  82 +++++++++++
 bookscraper/scraper/download_controller.py  |  10 +-
 bookscraper/scraper/tasks/download_tasks.py | 147 ++++++++++++--------
 bookscraper/scraper/tasks/parse_tasks.py    |  35 ++++-
 bookscraper/scraper/tasks/pipeline.py       |  34 +++--
 bookscraper/scraper/tasks/save_tasks.py     |  29 +++-
 bookscraper/templates/result.html           |  19 +++
 8 files changed, 316 insertions(+), 94 deletions(-)
 create mode 100644 bookscraper/scraper/abort.py

diff --git a/bookscraper/app.py b/bookscraper/app.py
index 522c727..d9c599b 100644
--- a/bookscraper/app.py
+++ b/bookscraper/app.py
@@ -1,5 +1,5 @@
 # ============================================
-# File: bookscraper/app.py
+# File: bookscraper/app.py (OPTION A — Sync Scraping)
 # ============================================
 
 from dotenv import load_dotenv
@@ -7,14 +7,17 @@ from dotenv import load_dotenv
 load_dotenv()
 
 print(">>> [WEB] Importing celery_app …")
-from celery_app import celery_app  # <<< MOET BOVEN TASK IMPORTS
+from celery_app import celery_app
 
 from scraper.logger import log_debug
 from flask import Flask, render_template, request
 
-# Task imports komen pas na celery_app:
-print(">>> [WEB] Importing tasks …")
-from scraper.tasks.scraping import start_scrape_book
+# Import SCRAPER (sync)
+from scraper.book_scraper import BookScraper
+from scraper.sites import BookSite
+
+# Import Download Controller
+from scraper.download_controller import DownloadController
 
 app = Flask(__name__)
 
@@ -31,16 +34,45 @@ def start_scraping():
     if not url:
         return render_template("result.html", error="Geen URL opgegeven.")
 
-    log_debug(f"[WEB] Scrape request for: {url}")
+    log_debug(f"[WEB] Sync scraping for: {url}")
+
+    # -----------------------------------------------
+    # 1. SCRAPE DIRECTLY (NOT via Celery)
+    # -----------------------------------------------
+    site = BookSite()
+    scraper = BookScraper(site, url)
+    scrape_result = scraper.execute()  # RETURNS METADATA + CHAPTER LIST
+
+    # -----------------------------------------------
+    # 2. START DOWNLOAD PIPELINE VIA CELERY
+    # -----------------------------------------------
+    controller = DownloadController(scrape_result)
+    job = controller.start()
+
+    # -----------------------------------------------
+    # 3. RENDER TEMPLATE (FULLY POPULATED)
+    # -----------------------------------------------
+    return render_template(
+        "result.html",
+        book=scrape_result,
+        download_job_id=job.id,
+    )
+
+
+# ABORT ROUTE (unchanged)
+from scraper.abort import set_abort, clear_abort
+
 
-    # Belangrijk: start_scrape_book komt uit DEZELFDE celery_app nu
-    result = start_scrape_book.delay(url)
 
+@app.route("/abort/<book_id>", methods=["POST"])
+def abort_download(book_id):
+    log_debug(f"[WEB] Abort requested for book: {book_id}")
+    set_abort(book_id)
 
     return render_template(
         "result.html",
-        message="Scraping gestart.",
-        task_id=result.id,
-        url=url,
+        aborted=True,
+        book={"title": book_id, "author": "", "chapters": []},
+        message=f"Abort requested for book: {book_id}",
     )

diff --git a/bookscraper/scraper/abort.py b/bookscraper/scraper/abort.py
new file mode 100644
index 0000000..d9f7c92
--- /dev/null
+++ b/bookscraper/scraper/abort.py
@@ -0,0 +1,82 @@
+import os
+import redis
+
+# ---------------------------------------------------------
+# Redis connection
+# ---------------------------------------------------------
+REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
+r = redis.Redis.from_url(REDIS_URL, decode_responses=True)
+
+
+# =========================================================
+# ABORT FLAG
+# =========================================================
+
+
+def set_abort(book_id: str):
+    """
+    Enable abort mode for this book.
+    All download tasks that haven't started yet will immediately exit.
+    """
+    r.set(f"abort:{book_id}", "1")
+
+
+def clear_abort(book_id: str):
+    """
+    Clear abort flag so future runs are unaffected.
+    """
+    r.delete(f"abort:{book_id}")
+
+
+def abort_requested(book_id: str) -> bool:
+    """
+    True if abort flag is set for this book.
+    """
+    return r.exists(f"abort:{book_id}") == 1
+
+
+# =========================================================
+# PER-CHAPTER STATE
+# =========================================================
+# We mark a chapter "started" once its download task begins.
+# If abort is activated AFTER download start:
+#     → download must complete
+#     → parse must complete
+#     → save must complete
+# All subsequent chapters will skip.
+
+
+def mark_chapter_started(book_id: str, chapter_num: int):
+    """
+    Mark this chapter as started. Parse/save will always run after this,
+    even if abort has been activated afterwards.
+    """
+    key = f"started:{book_id}:{chapter_num}"
+    r.set(key, "1")
+
+
+def chapter_started(book_id: str, chapter_num: int) -> bool:
+    """
+    Return True if this chapter has already started downloading.
+    """
+    key = f"started:{book_id}:{chapter_num}"
+    return r.exists(key) == 1
+
+
+# =========================================================
+# UTILITY: RESET FOR A BOOK
+# =========================================================
+
+
+def reset_book_state(book_id: str):
+    """
+    Optional utility: remove abort flag and all started-chapter markers.
+    Useful during testing or manual cleanup.
+ """ + # Remove abort flag + r.delete(f"abort:{book_id}") + + # Remove all "started:*" keys for this book + pattern = f"started:{book_id}:*" + for key in r.scan_iter(pattern): + r.delete(key) diff --git a/bookscraper/scraper/download_controller.py b/bookscraper/scraper/download_controller.py index 8538cd1..166616f 100644 --- a/bookscraper/scraper/download_controller.py +++ b/bookscraper/scraper/download_controller.py @@ -1,4 +1,10 @@ -# scraper/download_controller.py +# ========================================================= +# File: scraper/download_controller.py +# Purpose: Build Celery pipelines for all chapters and +# pass book_id for abort functionality. +# +# book_id = self.title +# ========================================================= from celery import group from scraper.tasks.pipeline import build_chapter_pipeline @@ -49,6 +55,7 @@ class DownloadController: log(f"[CTRL] MAX_VOL_SIZE = {self.max_vol}") tasks = [] + book_id = self.title # Use title as book_id for abort logic for ch in self.chapters: chapter_num = ch["num"] @@ -58,6 +65,7 @@ class DownloadController: tasks.append( build_chapter_pipeline( + book_id, # ← NEW: abort requires book_id chapter_num, chapter_url, vol_path, diff --git a/bookscraper/scraper/tasks/download_tasks.py b/bookscraper/scraper/tasks/download_tasks.py index 479ee47..3dabf49 100644 --- a/bookscraper/scraper/tasks/download_tasks.py +++ b/bookscraper/scraper/tasks/download_tasks.py @@ -1,47 +1,61 @@ -# scraper/tasks/download_tasks.py +# ========================================================= +# File: scraper/tasks/download_tasks.py +# Purpose: Download chapter HTML with global concurrency, +# retry/backoff logic, 429 hard delay support, +# and abort-aware chapter skipping. +# +# Abort behavior implemented here: +# - If abort is active AND chapter not started → skip +# - If abort is active BUT chapter already started → complete normally +# (download → parse → save) +# ========================================================= + from celery_app import celery_app from logbus.publisher import log +from scraper.utils import get_save_path +from scraper.abort import abort_requested, chapter_started, mark_chapter_started + import requests import os import time import redis -from scraper.utils import get_save_path + print(">>> [IMPORT] download_tasks.py loaded") -# --------------------------- -# Retry parameters from .env -# --------------------------- +# --------------------------------------------------------- +# Retry parameters (.env) +# --------------------------------------------------------- MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7")) BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2")) BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2")) DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10")) -# --------------------------- -# GLOBAL CONCURRENCY LIMIT -# --------------------------- +# --------------------------------------------------------- +# Global concurrency (.env) +# --------------------------------------------------------- MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1")) -# --------------------------- -# GLOBAL MINIMUM DELAY -# --------------------------- +# --------------------------------------------------------- +# Global minimum delay (.env) +# --------------------------------------------------------- GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1")) DELAY_KEY = "download:delay_lock" -# --------------------------- +# --------------------------------------------------------- # 
Redis connection -# --------------------------- +# --------------------------------------------------------- REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0") redis_client = redis.Redis.from_url(REDIS_URL) -SEM_KEY = "download:active" # semaphore counter +SEM_KEY = "download:active" # semaphore key -# ====================================================== +# ========================================================= # GLOBAL DELAY FUNCTIONS -# ====================================================== +# ========================================================= def wait_for_global_delay(): - """Block while delay lock exists.""" + """Block while delay-lock exists.""" if GLOBAL_DELAY <= 0: return @@ -50,23 +64,21 @@ def wait_for_global_delay(): def set_global_delay(): - """Set TTL lock after a download completes.""" + """Set TTL lock after completing download.""" if GLOBAL_DELAY <= 0: return - redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY) -# ====================================================== +# ========================================================= # GLOBAL CONCURRENCY FUNCTIONS -# ====================================================== +# ========================================================= def acquire_global_slot(max_slots: int, retry_delay: float = 0.5): - """Semaphore using Redis, atomic INCR.""" + """Semaphore using Redis atomic INCR.""" while True: current = redis_client.incr(SEM_KEY) if current <= max_slots: - return # acquired OK - + return redis_client.decr(SEM_KEY) time.sleep(retry_delay) @@ -83,36 +95,59 @@ print( ) -# ====================================================== -# CELERY TASK -# ====================================================== +# ========================================================= +# CELERY DOWNLOAD TASK +# ========================================================= @celery_app.task( bind=True, queue="download", ignore_result=False, ) -def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str): +def download_chapter( + self, book_id: str, chapter_num: int, chapter_url: str, base_path: str +): """ Download chapter HTML. - Ensures: - - global delay throttle - - global concurrency limit - - hard blocking on 429 + + Abort behavior: + - If abort is active AND this chapter has not started → skip immediately + - If abort is active BUT this chapter already started → finish download/parse/save """ - # HARD DELAY at every execution to prevent retry storms + # ------------------------------------------------------------ + # ABORT CHECK BEFORE ANYTHING STARTS + # ------------------------------------------------------------ + if abort_requested(book_id) and not chapter_started(book_id, chapter_num): + log(f"[ABORT] Skip chapter {chapter_num} (abort active, not started)") + return { + "chapter": chapter_num, + "url": chapter_url, + "html": None, + "skipped": True, + "path": None, + "abort": True, + } + + # ------------------------------------------------------------ + # MARK CHAPTER AS STARTED + # Ensures parse/save must always run even after abort is triggered. + # ------------------------------------------------------------ + mark_chapter_started(book_id, chapter_num) + + # ------------------------------------------------------------ + # HARD START DELAY + # ------------------------------------------------------------ if GLOBAL_DELAY > 0: time.sleep(GLOBAL_DELAY) save_path = get_save_path(chapter_num, base_path) - # ------------------------------------------------------------------ - # 1. 
SKIP IF EXISTS (still delay to maintain consistency) - # ------------------------------------------------------------------ + # ------------------------------------------------------------ + # SKIP IF EXISTS + # ------------------------------------------------------------ if os.path.exists(save_path): wait_for_global_delay() set_global_delay() - log(f"[DL] SKIP chapter {chapter_num} (exists) → {save_path}") return { "chapter": chapter_num, @@ -122,21 +157,21 @@ def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str): "path": save_path, } - # ------------------------------------------------------------------ - # 2. GLOBAL DELAY SYNC - # ------------------------------------------------------------------ + # ------------------------------------------------------------ + # GLOBAL DELAY SYNC + # ------------------------------------------------------------ wait_for_global_delay() - # ------------------------------------------------------------------ - # 3. GLOBAL CONCURRENCY - # ------------------------------------------------------------------ + # ------------------------------------------------------------ + # GLOBAL CONCURRENCY + # ------------------------------------------------------------ acquire_global_slot(MAX_CONCURRENCY) log(f"[DL] ACQUIRED SLOT for chapter {chapter_num}") try: - # ------------------------------------------------------------------ - # 4. DO THE DOWNLOAD - # ------------------------------------------------------------------ + # ------------------------------------------------------------ + # ACTUAL DOWNLOAD + # ------------------------------------------------------------ log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}") resp = requests.get( @@ -163,31 +198,26 @@ def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str): attempt = self.request.retries delay = BASE_DELAY * (BACKOFF**attempt) - # ============================================================= - # HARD 429 BLOCK — DO NOT RELEASE SLOT YET - # ============================================================= + # ------------------------------------------------------------ + # 429 HANDLING + # ------------------------------------------------------------ if ( hasattr(exc, "response") and getattr(exc.response, "status_code", None) == 429 ): - log( - f"[DL] 429 Too Many Requests → HARD WAIT {DELAY_429}s " + f"[DL] 429 → HARD WAIT {DELAY_429}s " f"(attempt {attempt}/{MAX_RETRIES})" ) - # HARD BLOCK: worker sleeps, still holding the slot time.sleep(DELAY_429) - - # After 429 wait, also apply global delay set_global_delay() - # Retry immediately (countdown=0) raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES) - # ============================================================= + # ------------------------------------------------------------ # NORMAL ERRORS - # ============================================================= + # ------------------------------------------------------------ log( f"[DL] ERROR on {chapter_url}: {exc} → retry in {delay}s " f"(attempt {attempt}/{MAX_RETRIES})" @@ -195,9 +225,6 @@ def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str): raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES) finally: - # ============================================================= - # ALWAYS RELEASE SLOT AFTER HARD BLOCK / NORMAL WORK - # ============================================================= set_global_delay() release_global_slot() log(f"[DL] RELEASED SLOT for chapter {chapter_num}") diff --git 
a/bookscraper/scraper/tasks/parse_tasks.py b/bookscraper/scraper/tasks/parse_tasks.py index 3aa270e..5a8ae1d 100644 --- a/bookscraper/scraper/tasks/parse_tasks.py +++ b/bookscraper/scraper/tasks/parse_tasks.py @@ -1,4 +1,14 @@ -# scraper/tasks/parse_tasks.py +# ========================================================= +# File: scraper/tasks/parse_tasks.py +# Purpose: Parse downloaded HTML into clean chapter text. +# +# Abort Behavior: +# - parse MUST ALWAYS RUN once download has started +# - even if the user triggers abort afterwards +# - (abort only prevents new chapters from starting) +# +# Parsing avoids skipping except when download_result indicates skip. +# ========================================================= from celery_app import celery_app from logbus.publisher import log @@ -10,14 +20,22 @@ print(">>> [IMPORT] parse_tasks.py loaded") @celery_app.task(bind=True, queue="parse", ignore_result=False) def parse_chapter(self, download_result: dict, meta: dict): - - # 1) SKIP mode + """ + Parse raw HTML returned by download_chapter into clean chapter text. + """ + + # ------------------------------------------------------------ + # 1) DOWNLOAD SKIPPED → PARSE ALSO SKIPS + # (This is the ONLY valid skip in parse) + # ------------------------------------------------------------ if download_result.get("skipped"): chapter = download_result.get("chapter") log(f"[PARSE] SKIP chapter {chapter} (download skipped)") return download_result - # 2) Normal mode + # ------------------------------------------------------------ + # 2) Normal Parsing + # ------------------------------------------------------------ chapter_num = download_result["chapter"] chapter_url = download_result["url"] html = download_result["html"] @@ -45,12 +63,15 @@ def parse_chapter(self, download_result: dict, meta: dict): raw = node.get_text() if node else soup.get_text() + # ------------------------------------------------------------ + # Apply global replacements (from text_replacements file) + # ------------------------------------------------------------ REPL = load_replacements() text = clean_text(raw, REPL) - # ----------------------------- - # FIXED: chapter 1 header = book URL - # ----------------------------- + # ------------------------------------------------------------ + # FIX: chapter 1 header now includes meta information + # ------------------------------------------------------------ if chapter_num == 1: book_url = meta.get("book_url") or meta.get("url") or "UNKNOWN" diff --git a/bookscraper/scraper/tasks/pipeline.py b/bookscraper/scraper/tasks/pipeline.py index e71b99c..2f038b7 100644 --- a/bookscraper/scraper/tasks/pipeline.py +++ b/bookscraper/scraper/tasks/pipeline.py @@ -1,4 +1,16 @@ -# scraper/tasks/pipeline.py +# ========================================================= +# File: scraper/tasks/pipeline.py +# Purpose: Construct Celery chains for chapter processing. 
+# +# Pipeline: +# download_chapter(book_id, chapter_num, url, base_path) +# → parse_chapter(download_result, meta) +# → save_chapter(parsed_result, base_path) +# +# Abort behavior: +# - download_chapter uses book_id to decide skip vs execute +# - parse/save always run once download has started +# ========================================================= """ Build the pipeline for a single chapter: @@ -17,20 +29,24 @@ from scraper.tasks.save_tasks import save_chapter def build_chapter_pipeline( - chapter_number: int, chapter_url: str, base_path: str, meta: dict + book_id: str, + chapter_number: int, + chapter_url: str, + base_path: str, + meta: dict, ): """ Construct a Celery chain for one chapter: - 1. download_chapter - 2. parse_chapter - 3. save_chapter + 1. download_chapter(book_id, chapter_number, chapter_url, base_path) + 2. parse_chapter(download_result, meta) + 3. save_chapter(parsed_result, base_path) """ return chain( - # download_chapter needs ALL THREE arguments - download_chapter.s(chapter_number, chapter_url, base_path), - # parse_chapter gets the output of download_chapter + meta as extra arg + # download_chapter needs: book_id, chapter_num, url, base_path + download_chapter.s(book_id, chapter_number, chapter_url, base_path), + # parse_chapter gets output of download_chapter + meta parse_chapter.s(meta), - # save_chapter needs base_path as extra arg + # save_chapter gets parsed result + base_path save_chapter.s(base_path), ) diff --git a/bookscraper/scraper/tasks/save_tasks.py b/bookscraper/scraper/tasks/save_tasks.py index f1acbe5..13fc510 100644 --- a/bookscraper/scraper/tasks/save_tasks.py +++ b/bookscraper/scraper/tasks/save_tasks.py @@ -1,4 +1,15 @@ -# scraper/tasks/save_tasks.py +# ========================================================= +# File: scraper/tasks/save_tasks.py +# Purpose: Save parsed chapter text to disk. +# +# Abort Behavior: +# - Save MUST ALWAYS RUN once download has started. +# - Abort only prevents new chapters from starting (download skip). +# - Save is skipped ONLY when download/parse indicated "skipped". +# +# This guarantees no half-written chapters. 
+# ========================================================= + print(">>> [IMPORT] save_tasks.py loaded") from celery import shared_task @@ -11,13 +22,15 @@ from scraper.utils import get_save_path def save_chapter(self, parsed: dict, base_path: str): print(f">>> [save_tasks] save_chapter() CALLED for chapter {parsed.get('chapter')}") - # ---------------------------- - # SKIP: If pipeline marked skip - # ---------------------------- + # ------------------------------------------------------------ + # SKIP CASE: + # - Only skip when download OR parse indicated skip + # - NOT related to abort (abort never skips parse/save) + # ------------------------------------------------------------ if parsed.get("skipped"): chapter = parsed.get("chapter") path = parsed.get("path") - log(f"[SAVE] SKIP chapter {chapter} (already exists) → {path}") + log(f"[SAVE] SKIP chapter {chapter} (already exists or skipped) → {path}") print(f">>> [save_tasks] SKIPPED {path}") return {"chapter": chapter, "path": path, "skipped": True} @@ -29,11 +42,15 @@ def save_chapter(self, parsed: dict, base_path: str): if not chapter_number: raise ValueError("Missing chapter_number in parsed payload") + # Ensure base path exists os.makedirs(base_path, exist_ok=True) - # unified filename logic + # Unified filename logic path = get_save_path(chapter_number, base_path) + # ------------------------------------------------------------ + # WRITE CHAPTER TEXT TO FILE + # ------------------------------------------------------------ with open(path, "w", encoding="utf-8") as f: f.write(text) diff --git a/bookscraper/templates/result.html b/bookscraper/templates/result.html index f3d2c5f..df6359b 100644 --- a/bookscraper/templates/result.html +++ b/bookscraper/templates/result.html @@ -33,8 +33,21 @@ a:hover { text-decoration: underline; } + .abort-btn { + padding: 10px 15px; + background: #cc0000; + color: #fff; + border: none; + border-radius: 4px; + cursor: pointer; + font-size: 15px; + } + .abort-btn:hover { + background: #a30000; + } + ← Terug @@ -80,6 +93,12 @@ Download pipeline gestart!
             Job ID: {{ download_job_id }}
+
+            <form method="POST" action="/abort/{{ book.title }}">
+                <button type="submit" class="abort-btn">
+                    Abort download
+                </button>
+            </form>
         {% endif %}
     {% endif %}
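
Usage sketch (not part of the diff itself): a minimal example of how the abort flow
added above is meant to be exercised, assuming the helpers from scraper/abort.py are
importable and that a Redis instance is reachable at REDIS_URL. The book_id value is
the book title, since DownloadController passes self.title into each chapter pipeline.

    # Minimal sketch; "Some Book Title" is a placeholder for a real scraped title.
    from scraper.abort import (
        set_abort,
        abort_requested,
        chapter_started,
        reset_book_state,
    )

    book_id = "Some Book Title"

    # Flag the book: download tasks that have not yet called
    # mark_chapter_started() will return a skipped result immediately.
    set_abort(book_id)
    assert abort_requested(book_id)

    # A chapter that was already marked as started keeps running
    # download → parse → save to completion.
    if chapter_started(book_id, 12):
        print("chapter 12 will still be finished")

    # Clear the abort flag and per-chapter markers so the next run is unaffected.
    reset_book_state(book_id)

The same flag can be set over HTTP through the new route, for example
curl -X POST http://localhost:5000/abort/Some%20Book%20Title — the host and port are
assumptions (Flask defaults); only the /abort/<book_id> path comes from this patch.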