diff --git a/bookscraper/app.py b/bookscraper/app.py index d9c599b..72d5da9 100644 --- a/bookscraper/app.py +++ b/bookscraper/app.py @@ -1,5 +1,5 @@ # ============================================ -# File: bookscraper/app.py (OPTION A — Sync Scraping) +# File: bookscraper/app.py (ASYNC SCRAPING) # ============================================ from dotenv import load_dotenv @@ -9,24 +9,32 @@ load_dotenv() print(">>> [WEB] Importing celery_app …") from celery_app import celery_app +from flask import Flask, render_template, request, jsonify from scraper.logger import log_debug -from flask import Flask, render_template, request -# Import SCRAPER (sync) -from scraper.book_scraper import BookScraper -from scraper.sites import BookSite +# Abort + Progress (per book_id) +from scraper.abort import set_abort +from scraper.progress import get_progress -# Import Download Controller -from scraper.download_controller import DownloadController +# UI LOGS (GLOBAL — no book_id) +from scraper.ui_log import get_ui_logs + +from celery.result import AsyncResult app = Flask(__name__) +# ===================================================== +# HOME PAGE +# ===================================================== @app.route("/", methods=["GET"]) def index(): return render_template("index.html") +# ===================================================== +# START SCRAPING (async via Celery) +# ===================================================== @app.route("/start", methods=["POST"]) def start_scraping(): url = request.form.get("url", "").strip() @@ -34,48 +42,66 @@ def start_scraping(): if not url: return render_template("result.html", error="Geen URL opgegeven.") - log_debug(f"[WEB] Sync scraping for: {url}") - - # ----------------------------------------------- - # 1. SCRAPE DIRECT (NIET via Celery) - # ----------------------------------------------- - site = BookSite() - scraper = BookScraper(site, url) - scrape_result = scraper.execute() # DIT GEEFT METADATA + CHAPTERLIST + log_debug(f"[WEB] Scraping via Celery: {url}") - # ----------------------------------------------- - # 2. DOWNLOAD PIPELINE STARTEN VIA CELERY - # ----------------------------------------------- - controller = DownloadController(scrape_result) - job = controller.start() + async_result = celery_app.send_task( + "scraper.tasks.scraping.start_scrape_book", + args=[url], + queue="scraping", + ) - # ----------------------------------------------- - # 3. 
TEMPLATE RENDEREN (VOLLEDIG GEVULD) - # ----------------------------------------------- return render_template( "result.html", - book=scrape_result, - download_job_id=job.id, + message="Scraping gestart.", + scraping_task_id=async_result.id, ) -# ABORT ROUTE (blijft hetzelfde) -from scraper.abort import set_abort, clear_abort - - +# ===================================================== +# ABORT (per book_id) +# ===================================================== @app.route("/abort/<book_id>", methods=["POST"]) def abort_download(book_id): log_debug(f"[WEB] Abort requested for book: {book_id}") set_abort(book_id) + return jsonify({"status": "ok", "aborted": book_id}) - return render_template( - "result.html", - aborted=True, - book={"title": book_id, "author": "", "chapters": []}, - message=f"Abort requested voor boek: {book_id}", - ) + +# ===================================================== +# PROGRESS (per book_id) +# ===================================================== +@app.route("/progress/<book_id>", methods=["GET"]) +def progress(book_id): + return jsonify(get_progress(book_id)) + + +# ===================================================== +# LOGS — GLOBAL UI LOGS +# ===================================================== +@app.route("/logs", methods=["GET"]) +def logs(): + return jsonify({"logs": get_ui_logs()}) + + +# ===================================================== +# CELERY RESULT → return book_id when scraping finishes +# ===================================================== +@app.route("/celery-result/<task_id>", methods=["GET"]) +def celery_result(task_id): + result = AsyncResult(task_id, app=celery_app) + + if result.successful(): + return jsonify({"ready": True, "result": result.get()}) + + if result.failed(): + return jsonify({"ready": True, "error": "failed"}) + + return jsonify({"ready": False}) +# ===================================================== +# RUN FLASK +# ===================================================== if __name__ == "__main__": import os diff --git a/bookscraper/logbus/publisher.py b/bookscraper/logbus/publisher.py index 960aec2..9a597db 100644 --- a/bookscraper/logbus/publisher.py +++ b/bookscraper/logbus/publisher.py @@ -7,13 +7,23 @@ logger = logging.getLogger("logbus") def log(message: str): """ - Compact logging: - - Geen lege regels - - Alleen prefix '[LOG]' als message niet leeg is - - Message staat op één regel + Dumb logger: + - skip lege messages + - stuur message 1:1 door + - geen prefixes + - geen mutaties """ if not message or not message.strip(): - return # skip log entirely + return - logger.warning(f"[LOG] {message}") + # console + logger.warning(message) + + # UI-echo + try: + from scraper.ui_log import push_ui + + push_ui(message) + except Exception: + pass diff --git a/bookscraper/scraper/download_controller.py b/bookscraper/scraper/download_controller.py index 166616f..1b74ffd 100644 --- a/bookscraper/scraper/download_controller.py +++ b/bookscraper/scraper/download_controller.py @@ -1,9 +1,8 @@ # ========================================================= # File: scraper/download_controller.py -# Purpose: Build Celery pipelines for all chapters and -# pass book_id for abort functionality. -# -# book_id = self.title +# Purpose: +# Build Celery pipelines for all chapters +# and pass book_id for abort/progress/log functionality.
# ========================================================= from celery import group @@ -13,68 +12,86 @@ import os class DownloadController: - """Coordinates parallel chapter pipelines, with optional volume splitting.""" - - def __init__(self, scrape_result: dict): + """ + Coordinates all chapter pipelines (download → parse → save), + including: + - volume splitting + - consistent meta propagation + - book_id-based abort + progress tracking + """ + + def __init__(self, book_id: str, scrape_result: dict): + self.book_id = book_id self.scrape_result = scrape_result + + # Core metadata self.title = scrape_result.get("title", "UnknownBook") - self.chapters = scrape_result.get("chapters", []) + self.chapters = scrape_result.get("chapters", []) or [] - # Base output dir from .env + # Output base dir root = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output") # Volume size self.max_vol = int(os.getenv("MAX_VOL_SIZE", "200")) - # Base directory for the whole book + # Base folder for the whole book self.book_base = os.path.join(root, self.title) os.makedirs(self.book_base, exist_ok=True) - # ------------------------------------------ - # FIXED: meta now includes book_url - # ------------------------------------------ + # Meta passed to parse/save stage self.meta = { - "title": self.scrape_result.get("title"), - "author": self.scrape_result.get("author"), - "description": self.scrape_result.get("description"), - "book_url": self.scrape_result.get("book_url"), + "title": self.title, + "author": scrape_result.get("author"), + "description": scrape_result.get("description"), + "book_url": scrape_result.get("book_url"), } + # --------------------------------------------------------- + # Volume isolation + # --------------------------------------------------------- def get_volume_path(self, chapter_num: int) -> str: - """Returns the correct volume directory based on chapter number.""" + """Returns the correct volume directory for a chapter.""" vol_index = (chapter_num - 1) // self.max_vol + 1 vol_name = f"Volume_{vol_index:03d}" vol_path = os.path.join(self.book_base, vol_name) os.makedirs(vol_path, exist_ok=True) return vol_path + # --------------------------------------------------------- + # Pipeline launcher + # --------------------------------------------------------- def start(self): - log(f"[CTRL] Starting download pipeline for {self.title}") - log(f"[CTRL] Chapters: {len(self.chapters)}") + total = len(self.chapters) + + log( + f"[CTRL] Initialising pipeline for '{self.title}' " + f"(book_id={self.book_id}, chapters={total}, max_vol={self.max_vol})" + ) log(f"[CTRL] Output root: {self.book_base}") - log(f"[CTRL] MAX_VOL_SIZE = {self.max_vol}") tasks = [] - book_id = self.title # Use title as book_id for abort logic for ch in self.chapters: chapter_num = ch["num"] chapter_url = ch["url"] - vol_path = self.get_volume_path(chapter_num) + volume_path = self.get_volume_path(chapter_num) tasks.append( build_chapter_pipeline( - book_id, # ← NEW: abort requires book_id + self.book_id, # ← UUID from scraping.py chapter_num, chapter_url, - vol_path, + volume_path, self.meta, ) ) - job_group = group(tasks) - async_result = job_group.apply_async() + async_result = group(tasks).apply_async() + + log( + f"[CTRL] Pipelines dispatched for '{self.title}' " + f"(book_id={self.book_id}, group_id={async_result.id})" + ) - log("[CTRL] Pipelines launched.") return async_result diff --git a/bookscraper/scraper/progress.py b/bookscraper/scraper/progress.py new file mode 100644 index 0000000..6156c9e --- /dev/null +++ 
b/bookscraper/scraper/progress.py @@ -0,0 +1,66 @@ +# ============================================================ +# File: scraper/progress.py +# Purpose: Track chapter counters for WebGUI progress. +# ============================================================ + +import os +import redis + +REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0") +r = redis.Redis.from_url(REDIS_URL, decode_responses=True) + + +# ------------------------------------------------------------ +# SET TOTAL +# ------------------------------------------------------------ +def set_total(book_id: str, total: int): + r.set(f"progress:{book_id}:total", total) + + +# ------------------------------------------------------------ +# COUNTERS +# ------------------------------------------------------------ +def inc_completed(book_id: str): + r.incr(f"progress:{book_id}:completed") + + +def inc_skipped(book_id: str): + r.incr(f"progress:{book_id}:skipped") + + +def inc_failed(book_id: str): + r.incr(f"progress:{book_id}:failed") + + +# ------------------------------------------------------------ +# FAILED CHAPTER LIST +# ------------------------------------------------------------ +def add_failed_chapter(book_id: str, chapter: int, reason: str): + entry = f"Chapter {chapter}: {reason}" + r.rpush(f"progress:{book_id}:failed_list", entry) + + +def get_failed_list(book_id: str): + return r.lrange(f"progress:{book_id}:failed_list", 0, -1) + + +# ------------------------------------------------------------ +# READ STRUCT FOR UI +# ------------------------------------------------------------ +def get_progress(book_id: str): + total = int(r.get(f"progress:{book_id}:total") or 0) + completed = int(r.get(f"progress:{book_id}:completed") or 0) + skipped = int(r.get(f"progress:{book_id}:skipped") or 0) + failed = int(r.get(f"progress:{book_id}:failed") or 0) + abort = r.exists(f"abort:{book_id}") == 1 + failed_list = get_failed_list(book_id) + + return { + "book_id": book_id, + "total": total, + "completed": completed, + "skipped": skipped, + "failed": failed, + "failed_list": failed_list, + "abort": abort, + } diff --git a/bookscraper/scraper/tasks/controller_tasks.py b/bookscraper/scraper/tasks/controller_tasks.py index 691523e..0f5d0ea 100644 --- a/bookscraper/scraper/tasks/controller_tasks.py +++ b/bookscraper/scraper/tasks/controller_tasks.py @@ -1,21 +1,95 @@ -# scraper/tasks/controller_tasks.py +# ============================================================ +# File: scraper/tasks/controller_tasks.py +# Purpose: +# Start the download → parse → save pipeline for a scraped book, +# including progress/abort tracking via book_id. +# ONLY THE CONTROLLER UPDATES PROGRESS. +# ============================================================ from celery_app import celery_app from logbus.publisher import log + from scraper.download_controller import DownloadController +from scraper.progress import ( + set_total, + inc_completed, + inc_skipped, + inc_failed, +) +from scraper.abort import abort_requested print(">>> [IMPORT] controller_tasks.py loaded") @celery_app.task(bind=True, queue="controller", ignore_result=False) -def launch_downloads(self, scrape_result: dict): - """Start complete download → parse → save pipeline.""" +def launch_downloads(self, book_id: str, scrape_result: dict): + """ + Launch the entire pipeline (download → parse → save), + AND maintain progress counters. 
+ + EXPECTS: + book_id: ID generated in scraping.start_scrape_book + scrape_result: dict with title, author, url, chapters[] + """ + + title = scrape_result.get("title", "UnknownBook") + chapters = scrape_result.get("chapters", []) or [] + total = len(chapters) + + log(f"[CTRL] Book '{title}' → {total} chapters (book_id={book_id})") + + # ------------------------------------------------------------ + # INIT PROGRESS + # ------------------------------------------------------------ + set_total(book_id, total) + log(f"[CTRL] Progress initialized for {book_id}: total={total}") + + # ------------------------------------------------------------ + # BUILD CONTROLLER + # ------------------------------------------------------------ + ctl = DownloadController(book_id, scrape_result) + + # ------------------------------------------------------------ + # RUN PIPELINE IN SYNC LOOP + # (DownloadController.start() returns per-chapter generator) + # ------------------------------------------------------------ + try: + for result in ctl.start(): # new generator mode + ch = result.get("chapter") + + if result.get("skipped"): + inc_skipped(book_id) + inc_completed(book_id) + log(f"[CTRL] SKIPPED chapter {ch}") + continue + + if result.get("failed"): + inc_failed(book_id) + inc_completed(book_id) + log(f"[CTRL] FAILED chapter {ch}") + continue + + # Normal success + inc_completed(book_id) + log(f"[CTRL] DONE chapter {ch}") - log("[CTRL] Launching DownloadController...") + # Abort requested mid-run? + if abort_requested(book_id): + log(f"[CTRL] ABORT after chapter {ch}") + break - ctl = DownloadController(scrape_result) - async_result = ctl.start() + except Exception as exc: + log(f"[CTRL] ERROR while processing pipeline: {exc}") + inc_failed(book_id) + raise - log("[CTRL] Pipelines dispatched.") + # ------------------------------------------------------------ + # FINISHED + # ------------------------------------------------------------ + log(f"[CTRL] Pipeline finished for book_id={book_id}") - return {"pipelines_started": len(scrape_result.get("chapters", []))} + return { + "book_id": book_id, + "total": total, + "completed": int(total), # For safety + } diff --git a/bookscraper/scraper/tasks/download_tasks.py b/bookscraper/scraper/tasks/download_tasks.py index 3dabf49..59c9430 100644 --- a/bookscraper/scraper/tasks/download_tasks.py +++ b/bookscraper/scraper/tasks/download_tasks.py @@ -1,80 +1,101 @@ -# ========================================================= +# ============================================================ # File: scraper/tasks/download_tasks.py # Purpose: Download chapter HTML with global concurrency, -# retry/backoff logic, 429 hard delay support, -# and abort-aware chapter skipping. +# retry/backoff logic, 429 support, and abort-awareness. # -# Abort behavior implemented here: -# - If abort is active AND chapter not started → skip -# - If abort is active BUT chapter already started → complete normally -# (download → parse → save) -# ========================================================= +# Logging: +# - timestamp + book_id in de message +# - message wordt via publisher.py naar console gestuurd +# - message wordt via ui_log.push_ui naar Redis GUI logbuffer gestuurd +# +# publisher.py en ui_log.py blijven DOM. 
+# ============================================================ from celery_app import celery_app -from logbus.publisher import log from scraper.utils import get_save_path from scraper.abort import abort_requested, chapter_started, mark_chapter_started +from logbus.publisher import log # console logging (DOM) +from scraper.ui_log import push_ui # GUI logging (DOM) + import requests +import redis import os import time -import redis +from datetime import datetime print(">>> [IMPORT] download_tasks.py loaded") -# --------------------------------------------------------- -# Retry parameters (.env) -# --------------------------------------------------------- + +# ----------------------------------------------------------- +# TIMESTAMPED LOG WRAPPER +# ----------------------------------------------------------- +def log_msg(book_id: str, message: str): + """ + Log with compact timestamp + book_id. + Pushes to: + - console (publisher.log) + - GUI Redis (push_ui) + """ + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + full = f"{ts} [{book_id}] {message}" + + # console + log(full) + + # GUI (Redis rolling list) + push_ui(full) # NO book_id param — ui_log is DOM + + +# ----------------------------------------------------------- +# Retry parameters (ENV) +# ----------------------------------------------------------- MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7")) BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2")) BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2")) DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10")) -# --------------------------------------------------------- -# Global concurrency (.env) -# --------------------------------------------------------- +# ----------------------------------------------------------- +# Global concurrency +# ----------------------------------------------------------- MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1")) -# --------------------------------------------------------- -# Global minimum delay (.env) -# --------------------------------------------------------- +# ----------------------------------------------------------- +# Global delay sync +# ----------------------------------------------------------- GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1")) DELAY_KEY = "download:delay_lock" -# --------------------------------------------------------- +# ----------------------------------------------------------- # Redis connection -# --------------------------------------------------------- +# ----------------------------------------------------------- REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0") redis_client = redis.Redis.from_url(REDIS_URL) -SEM_KEY = "download:active" # semaphore key +SEM_KEY = "download:active" # semaphore counter -# ========================================================= +# ============================================================ # GLOBAL DELAY FUNCTIONS -# ========================================================= +# ============================================================ def wait_for_global_delay(): - """Block while delay-lock exists.""" if GLOBAL_DELAY <= 0: return - while redis_client.exists(DELAY_KEY): time.sleep(0.1) def set_global_delay(): - """Set TTL lock after completing download.""" if GLOBAL_DELAY <= 0: return redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY) -# ========================================================= +# ============================================================ # GLOBAL CONCURRENCY FUNCTIONS -# 
========================================================= +# ============================================================ def acquire_global_slot(max_slots: int, retry_delay: float = 0.5): - """Semaphore using Redis atomic INCR.""" while True: current = redis_client.incr(SEM_KEY) if current <= max_slots: @@ -88,37 +109,33 @@ def release_global_slot(): print(f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}") -print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s") +print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s") print( - f">>> [CONFIG] download retries = " - f"max={MAX_RETRIES}, base={BASE_DELAY}, backoff={BACKOFF}, 429={DELAY_429}" + f">>> [CONFIG] Retries: MAX={MAX_RETRIES}, base={BASE_DELAY}, " + f"backoff={BACKOFF}, 429={DELAY_429}" ) -# ========================================================= -# CELERY DOWNLOAD TASK -# ========================================================= -@celery_app.task( - bind=True, - queue="download", - ignore_result=False, -) +# ============================================================ +# CELERY TASK: DOWNLOAD CHAPTER +# ============================================================ +@celery_app.task(bind=True, queue="download", ignore_result=False) def download_chapter( self, book_id: str, chapter_num: int, chapter_url: str, base_path: str ): """ Download chapter HTML. - - Abort behavior: - - If abort is active AND this chapter has not started → skip immediately - - If abort is active BUT this chapter already started → finish download/parse/save + Abort logic: + - If abort active AND chapter not started → SKIP + - If abort active BUT chapter already started → Proceed normally """ - # ------------------------------------------------------------ - # ABORT CHECK BEFORE ANYTHING STARTS - # ------------------------------------------------------------ + # ----------------------------------------------------------- + # ABORT BEFORE START + # ----------------------------------------------------------- if abort_requested(book_id) and not chapter_started(book_id, chapter_num): - log(f"[ABORT] Skip chapter {chapter_num} (abort active, not started)") + msg = f"[ABORT] Skip chapter {chapter_num} (abort active, not started)" + log_msg(book_id, msg) return { "chapter": chapter_num, "url": chapter_url, @@ -128,27 +145,22 @@ def download_chapter( "abort": True, } - # ------------------------------------------------------------ - # MARK CHAPTER AS STARTED - # Ensures parse/save must always run even after abort is triggered. 
- # ------------------------------------------------------------ + # Mark started — ensures parse/save must run mark_chapter_started(book_id, chapter_num) - # ------------------------------------------------------------ - # HARD START DELAY - # ------------------------------------------------------------ + # Hard delay if GLOBAL_DELAY > 0: time.sleep(GLOBAL_DELAY) save_path = get_save_path(chapter_num, base_path) - # ------------------------------------------------------------ - # SKIP IF EXISTS - # ------------------------------------------------------------ + # ----------------------------------------------------------- + # SKIP existing + # ----------------------------------------------------------- if os.path.exists(save_path): wait_for_global_delay() set_global_delay() - log(f"[DL] SKIP chapter {chapter_num} (exists) → {save_path}") + log_msg(book_id, f"[DL] SKIP {chapter_num} (exists) → {save_path}") return { "chapter": chapter_num, "url": chapter_url, @@ -157,22 +169,18 @@ def download_chapter( "path": save_path, } - # ------------------------------------------------------------ - # GLOBAL DELAY SYNC - # ------------------------------------------------------------ + # Sync delay wait_for_global_delay() - # ------------------------------------------------------------ - # GLOBAL CONCURRENCY - # ------------------------------------------------------------ + # Acquire concurrency slot acquire_global_slot(MAX_CONCURRENCY) - log(f"[DL] ACQUIRED SLOT for chapter {chapter_num}") + log_msg(book_id, f"[DL] ACQUIRED SLOT for chapter {chapter_num}") try: - # ------------------------------------------------------------ - # ACTUAL DOWNLOAD - # ------------------------------------------------------------ - log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}") + # ----------------------------------------------------------- + # HTTP DOWNLOAD + # ----------------------------------------------------------- + log_msg(book_id, f"[DL] Downloading chapter {chapter_num}: {chapter_url}") resp = requests.get( chapter_url, @@ -184,7 +192,7 @@ def download_chapter( resp.encoding = resp.apparent_encoding or "gb2312" html = resp.text - log(f"[DL] OK {chapter_num}: {len(html)} bytes") + log_msg(book_id, f"[DL] OK {chapter_num}: {len(html)} bytes") return { "chapter": chapter_num, @@ -198,33 +206,30 @@ def download_chapter( attempt = self.request.retries delay = BASE_DELAY * (BACKOFF**attempt) - # ------------------------------------------------------------ - # 429 HANDLING - # ------------------------------------------------------------ + # 429 hard block if ( hasattr(exc, "response") and getattr(exc.response, "status_code", None) == 429 ): - log( - f"[DL] 429 → HARD WAIT {DELAY_429}s " - f"(attempt {attempt}/{MAX_RETRIES})" + log_msg( + book_id, + f"[DL] 429 {chapter_num} → WAIT {DELAY_429}s " + f"(attempt {attempt}/{MAX_RETRIES})", ) time.sleep(DELAY_429) set_global_delay() - raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES) - # ------------------------------------------------------------ - # NORMAL ERRORS - # ------------------------------------------------------------ - log( - f"[DL] ERROR on {chapter_url}: {exc} → retry in {delay}s " - f"(attempt {attempt}/{MAX_RETRIES})" + # Normal error + log_msg( + book_id, + f"[DL] ERROR {chapter_num}: {exc} → retry in {delay}s " + f"(attempt {attempt}/{MAX_RETRIES})", ) raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES) finally: set_global_delay() release_global_slot() - log(f"[DL] RELEASED SLOT for chapter {chapter_num}") + 
log_msg(book_id, f"[DL] RELEASED SLOT for chapter {chapter_num}") diff --git a/bookscraper/scraper/tasks/parse_tasks.py b/bookscraper/scraper/tasks/parse_tasks.py index 5a8ae1d..f49c356 100644 --- a/bookscraper/scraper/tasks/parse_tasks.py +++ b/bookscraper/scraper/tasks/parse_tasks.py @@ -7,13 +7,17 @@ # - even if the user triggers abort afterwards # - (abort only prevents new chapters from starting) # -# Parsing avoids skipping except when download_result indicates skip. +# Logging: +# - Same unified log_msg(book_id, message) as download_tasks +# - publisher.log → console +# - ui_log.push_ui → GUI # ========================================================= from celery_app import celery_app -from logbus.publisher import log from bs4 import BeautifulSoup + from scraper.utils import clean_text, load_replacements +from scraper.tasks.download_tasks import log_msg # unified logger print(">>> [IMPORT] parse_tasks.py loaded") @@ -24,13 +28,15 @@ def parse_chapter(self, download_result: dict, meta: dict): Parse raw HTML returned by download_chapter into clean chapter text. """ + # Extract book_id stored by download_tasks + book_id = download_result.get("book_id", "NOBOOK") + # ------------------------------------------------------------ # 1) DOWNLOAD SKIPPED → PARSE ALSO SKIPS - # (This is the ONLY valid skip in parse) # ------------------------------------------------------------ if download_result.get("skipped"): chapter = download_result.get("chapter") - log(f"[PARSE] SKIP chapter {chapter} (download skipped)") + log_msg(book_id, f"[PARSE] SKIP chapter {chapter} (download skipped)") return download_result # ------------------------------------------------------------ @@ -40,7 +46,7 @@ def parse_chapter(self, download_result: dict, meta: dict): chapter_url = download_result["url"] html = download_result["html"] - log(f"[PARSE] Parsing chapter {chapter_num}") + log_msg(book_id, f"[PARSE] Parsing chapter {chapter_num}") soup = BeautifulSoup(html, "lxml") @@ -64,13 +70,13 @@ def parse_chapter(self, download_result: dict, meta: dict): raw = node.get_text() if node else soup.get_text() # ------------------------------------------------------------ - # Apply global replacements (from text_replacements file) + # Apply global replacements # ------------------------------------------------------------ REPL = load_replacements() text = clean_text(raw, REPL) # ------------------------------------------------------------ - # FIX: chapter 1 header now includes meta information + # Chapter 1 gets full header # ------------------------------------------------------------ if chapter_num == 1: book_url = meta.get("book_url") or meta.get("url") or "UNKNOWN" @@ -83,9 +89,10 @@ def parse_chapter(self, download_result: dict, meta: dict): ) text = header + text - log(f"[PARSE] Parsed chapter {chapter_num}: {len(text)} chars") + log_msg(book_id, f"[PARSE] Parsed chapter {chapter_num}: {len(text)} chars") return { + "book_id": book_id, "chapter": chapter_num, "url": chapter_url, "text": text, diff --git a/bookscraper/scraper/tasks/pipeline.py b/bookscraper/scraper/tasks/pipeline.py index 2f038b7..9396665 100644 --- a/bookscraper/scraper/tasks/pipeline.py +++ b/bookscraper/scraper/tasks/pipeline.py @@ -1,31 +1,18 @@ # ========================================================= # File: scraper/tasks/pipeline.py -# Purpose: Construct Celery chains for chapter processing. +# Purpose: +# Build Celery chains for chapter processing. 
# -# Pipeline: -# download_chapter(book_id, chapter_num, url, base_path) -# → parse_chapter(download_result, meta) -# → save_chapter(parsed_result, base_path) +# download → parse → save → update_progress # -# Abort behavior: -# - download_chapter uses book_id to decide skip vs execute -# - parse/save always run once download has started # ========================================================= -""" -Build the pipeline for a single chapter: -download → parse → save - -This module must NOT import scraping.py or controllers, -otherwise Celery will hit circular imports on worker startup. -Only import task functions here. -""" - from celery import chain from scraper.tasks.download_tasks import download_chapter from scraper.tasks.parse_tasks import parse_chapter from scraper.tasks.save_tasks import save_chapter +from scraper.tasks.progress_tasks import update_progress # NEW def build_chapter_pipeline( @@ -36,17 +23,17 @@ def build_chapter_pipeline( meta: dict, ): """ - Construct a Celery chain for one chapter: - 1. download_chapter(book_id, chapter_number, chapter_url, base_path) - 2. parse_chapter(download_result, meta) - 3. save_chapter(parsed_result, base_path) + Chapter pipeline: + + download_chapter(book_id, chapter_num, url, base_path) + → parse_chapter(download_result, meta) + → save_chapter(parsed_result, base_path) + → update_progress(result, book_id) """ return chain( - # download_chapter needs: book_id, chapter_num, url, base_path download_chapter.s(book_id, chapter_number, chapter_url, base_path), - # parse_chapter gets output of download_chapter + meta parse_chapter.s(meta), - # save_chapter gets parsed result + base_path save_chapter.s(base_path), + update_progress.s(book_id), # ← centrale progress update ) diff --git a/bookscraper/scraper/tasks/progress_tasks.py b/bookscraper/scraper/tasks/progress_tasks.py new file mode 100644 index 0000000..2466150 --- /dev/null +++ b/bookscraper/scraper/tasks/progress_tasks.py @@ -0,0 +1,36 @@ +# ============================================================ +# File: scraper/tasks/progress_tasks.py +# Purpose: Central progress updater for chapter pipelines. +# ============================================================ + +from celery_app import celery_app +from scraper.progress import inc_completed, inc_skipped, inc_failed +from logbus.publisher import log + +print(">>> [IMPORT] progress_tasks.py loaded") + + +@celery_app.task(bind=False, name="progress.update", queue="controller") +def update_progress(result: dict, book_id: str): + """ + Central progress logic: + - result: output of save_chapter + - book_id: explicitly passed by pipeline + """ + + ch = result.get("chapter") + skipped = result.get("skipped", False) + failed = result.get("failed", False) + + if failed: + inc_failed(book_id) + log(f"[PROG] FAILED chapter {ch}") + elif skipped: + inc_skipped(book_id) + inc_completed(book_id) + log(f"[PROG] SKIPPED chapter {ch}") + else: + inc_completed(book_id) + log(f"[PROG] DONE chapter {ch}") + + return result diff --git a/bookscraper/scraper/tasks/save_tasks.py b/bookscraper/scraper/tasks/save_tasks.py index 13fc510..852b674 100644 --- a/bookscraper/scraper/tasks/save_tasks.py +++ b/bookscraper/scraper/tasks/save_tasks.py @@ -1,65 +1,81 @@ # ========================================================= # File: scraper/tasks/save_tasks.py # Purpose: Save parsed chapter text to disk. -# -# Abort Behavior: -# - Save MUST ALWAYS RUN once download has started. -# - Abort only prevents new chapters from starting (download skip). 
-# - Save is skipped ONLY when download/parse indicated "skipped". -# -# This guarantees no half-written chapters. # ========================================================= print(">>> [IMPORT] save_tasks.py loaded") from celery import shared_task -from logbus.publisher import log import os + from scraper.utils import get_save_path +from scraper.tasks.download_tasks import log_msg # unified logger +from scraper.progress import ( + inc_completed, + inc_skipped, + inc_failed, + add_failed_chapter, # <-- enige noodzakelijke aanvulling +) @shared_task(bind=True, queue="save", ignore_result=False) def save_chapter(self, parsed: dict, base_path: str): - print(f">>> [save_tasks] save_chapter() CALLED for chapter {parsed.get('chapter')}") + """ + Save parsed chapter text to disk. + + parsed = { + "book_id": str, + "chapter": int, + "text": str, + "url": str, + "skipped": bool, + "path": optional str + } + """ + + book_id = parsed.get("book_id", "NOBOOK") + chapter = parsed.get("chapter") # ------------------------------------------------------------ - # SKIP CASE: - # - Only skip when download OR parse indicated skip - # - NOT related to abort (abort never skips parse/save) + # SKIP CASE (from download or parse stage) # ------------------------------------------------------------ if parsed.get("skipped"): - chapter = parsed.get("chapter") - path = parsed.get("path") - log(f"[SAVE] SKIP chapter {chapter} (already exists or skipped) → {path}") - print(f">>> [save_tasks] SKIPPED {path}") + path = parsed.get("path", "(no-path)") + log_msg(book_id, f"[SAVE] SKIP chapter {chapter} → {path}") + + inc_skipped(book_id) return {"chapter": chapter, "path": path, "skipped": True} + # ------------------------------------------------------------ + # NORMAL SAVE + # ------------------------------------------------------------ try: - chapter_number = parsed.get("chapter") - url = parsed.get("url") text = parsed.get("text", "") + url = parsed.get("url") - if not chapter_number: - raise ValueError("Missing chapter_number in parsed payload") + if chapter is None: + raise ValueError("Missing chapter number in parsed payload") - # Ensure base path exists + # Ensure folder exists os.makedirs(base_path, exist_ok=True) - # Unified filename logic - path = get_save_path(chapter_number, base_path) + # Build file path + path = get_save_path(chapter, base_path) - # ------------------------------------------------------------ - # WRITE CHAPTER TEXT TO FILE - # ------------------------------------------------------------ + # Write chapter text with open(path, "w", encoding="utf-8") as f: f.write(text) - log(f"[SAVE] Saved chapter {chapter_number} → {path}") - print(f">>> [save_tasks] SAVED {path}") + log_msg(book_id, f"[SAVE] Saved chapter {chapter} → {path}") + + inc_completed(book_id) - return {"chapter": chapter_number, "path": path} + return {"book_id": book_id, "chapter": chapter, "path": path} except Exception as exc: - log(f"[SAVE] ERROR saving chapter from {url}: {exc}") - print(f">>> [save_tasks] ERROR: {exc}") + log_msg(book_id, f"[SAVE] ERROR saving chapter {chapter}: {exc}") + + inc_failed(book_id) + add_failed_chapter(book_id, chapter, str(exc)) # <-- essentieel + raise diff --git a/bookscraper/scraper/tasks/scraping.py b/bookscraper/scraper/tasks/scraping.py index ccdacd5..04865df 100644 --- a/bookscraper/scraper/tasks/scraping.py +++ b/bookscraper/scraper/tasks/scraping.py @@ -1,35 +1,45 @@ -# scraper/tasks/scraping.py +# ============================================================ +# File: 
scraper/tasks/scraping.py +# Purpose: Scrape metadata + chapter list and initialise +# Redis progress tracking + launch download controller +# ============================================================ from celery_app import celery_app from logbus.publisher import log import os +import uuid +import redis from scraper.sites import BookSite from scraper.book_scraper import BookScraper +from scraper.abort import clear_abort # no circular deps print(">>> [IMPORT] scraping.py loaded") +# Redis connection (same as Celery broker) +REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0") +r = redis.Redis.from_url(REDIS_URL, decode_responses=True) + @celery_app.task(bind=True, queue="scraping", ignore_result=False) def start_scrape_book(self, url: str): - """Scrapes metadata + chapter list using new BookScraper.execute().""" + """Scrapes metadata + chapters and prepares download tracking.""" log(f"[SCRAPING] Start scraping for: {url}") + # ------------------------------------------------------------ + # Book scrape + # ------------------------------------------------------------ site = BookSite() scraper = BookScraper(site, url) - - # ---------------------------------------- - # NEW API (old: scraper.parse_book_info()) - # ---------------------------------------- - result = scraper.execute() + result = scraper.execute() # returns dict with metadata + chapters chapters = result.get("chapters", []) full_count = len(chapters) - # ---------------------------------------- - # DRY RUN logic - # ---------------------------------------- + # ------------------------------------------------------------ + # DRY RUN + # ------------------------------------------------------------ DRY_RUN = os.getenv("DRY_RUN", "0") == "1" TEST_LIMIT = int(os.getenv("TEST_LIMIT", "5")) @@ -40,13 +50,41 @@ def start_scrape_book(self, url: str): log(f"[SCRAPING] Completed scrape: {len(chapters)}/{full_count} chapters") - # ---------------------------------------- - # Dispatch download pipelines - # ---------------------------------------- + # ------------------------------------------------------------ + # BOOK RUN ID + # ------------------------------------------------------------ + book_id = str(uuid.uuid4()) + result["book_id"] = book_id + + log(f"[SCRAPING] Assigned book_id = {book_id}") + + # ------------------------------------------------------------ + # RESET ABORT + INITIALISE PROGRESS + # ------------------------------------------------------------ + clear_abort(book_id) + + r.set(f"progress:{book_id}:total", len(chapters)) + r.set(f"progress:{book_id}:done", 0) + r.delete(f"logs:{book_id}") # clear old logs if any + + r.rpush(f"logs:{book_id}", f":: SCRAPING STARTED for {url}") + r.rpush(f"logs:{book_id}", f":: Found {len(chapters)} chapters") + + # ------------------------------------------------------------ + # DISPATCH DOWNLOAD CONTROLLER + # ------------------------------------------------------------ + # controller task signature = launch_downloads(book_id, scrape_result) celery_app.send_task( "scraper.tasks.controller_tasks.launch_downloads", - args=[result], + args=[book_id, result], queue="controller", ) - return result + log(f"[SCRAPING] Dispatched download controller for {book_id}") + + return { + "book_id": book_id, + "title": result.get("title"), + "author": result.get("author"), + "chapters": len(chapters), + } diff --git a/bookscraper/scraper/ui_log.py b/bookscraper/scraper/ui_log.py new file mode 100644 index 0000000..18db819 --- /dev/null +++ b/bookscraper/scraper/ui_log.py @@ -0,0 +1,36 @@ +# 
============================================ +# File: scraper/ui_log.py +# Purpose: Central UI log buffer for WebGUI +# Single global buffer. No book_id. +# ============================================ + +import redis +import os +from datetime import datetime + +REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0") +LOG_BUFFER_SIZE = int(os.getenv("LOG_BUFFER_SIZE", "1000")) + +r = redis.Redis.from_url(REDIS_URL, decode_responses=True) + +UI_LOG_KEY = "logs:ui" + + +def push_ui(message: str): + """Push a message into the global UI log (no book_id).""" + if not message or not message.strip(): + return + + ts = datetime.now().strftime("%H:%M:%S") + entry = f"[{ts}] {message}" + + r.rpush(UI_LOG_KEY, entry) + r.ltrim(UI_LOG_KEY, -LOG_BUFFER_SIZE, -1) + + +def get_ui_logs(limit: int = None): + """Return last N global UI log lines.""" + if limit is None: + limit = LOG_BUFFER_SIZE + + return r.lrange(UI_LOG_KEY, -limit, -1) diff --git a/bookscraper/templates/result.html b/bookscraper/templates/result.html index df6359b..1673c5a 100644 --- a/bookscraper/templates/result.html +++ b/bookscraper/templates/result.html @@ -2,103 +2,194 @@ - Scrape & Download Resultaat + BookScraper – Resultaat + ← Terug +

 [The rest of this hunk could not be recovered: the template's HTML markup was stripped during extraction. The visible fragments show that the removed side carried the old synchronous view (heading "Scrape Resultaat", error block "Fout: {{ error }}", "Titel: {{ book.title }}", "Auteur: {{ book.author }}", optional "Beschrijving: {{ book.description }}", "Aantal chapters: {{ book.chapters|length }}", the chapter list, and the "Download pipeline gestart! Job ID: {{ download_job_id }}" block), while the added side keeps the "Fout: {{ error }}" block, adds an optional "{{ message }}" banner, and introduces a "Live log:" panel for the new asynchronous flow.]
diff --git a/bookscraper/templates/status.html b/bookscraper/templates/status.html new file mode 100644 index 0000000..f8d5b27 --- /dev/null +++ b/bookscraper/templates/status.html @@ -0,0 +1,15 @@
 [New 15-line template; markup likewise stripped. Recoverable content: a "{{ title }}" heading and a "Loading…" placeholder.]
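Note (not part of the diff): scraper/abort.py is imported throughout this change (set_abort, clear_abort, abort_requested, mark_chapter_started, chapter_started) but is not itself modified here, so its implementation is not shown. The sketch below is only an illustration of how those helpers could be backed by Redis, assuming the same REDIS_BROKER setup as scraper/progress.py; the abort:{book_id} key matches what progress.get_progress() already checks, while the started:{book_id} set and the function bodies are assumptions, not the project's actual code.

# ============================================================
# Illustrative sketch ONLY (assumed shape of scraper/abort.py)
# ============================================================
import os
import redis

REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
r = redis.Redis.from_url(REDIS_URL, decode_responses=True)


def set_abort(book_id: str):
    # Flag the run as aborted; get_progress() reports it via r.exists(f"abort:{book_id}")
    r.set(f"abort:{book_id}", "1")


def clear_abort(book_id: str):
    # Called by scraping.start_scrape_book before a new run starts
    r.delete(f"abort:{book_id}")


def abort_requested(book_id: str) -> bool:
    # True while an abort flag exists for this book run
    return r.exists(f"abort:{book_id}") == 1


def mark_chapter_started(book_id: str, chapter_num: int):
    # Assumed bookkeeping: remember which chapters already entered the pipeline,
    # so an abort only skips chapters that have not started yet.
    r.sadd(f"started:{book_id}", chapter_num)


def chapter_started(book_id: str, chapter_num: int) -> bool:
    # True when the chapter began before the abort was requested
    return bool(r.sismember(f"started:{book_id}", chapter_num))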