feat/download-progress-abort
peter.fong 2 weeks ago
parent 6577d997ed
commit ed341c727a

@@ -1,5 +1,5 @@
# ============================================
# File: bookscraper/app.py (OPTION A — Sync Scraping)
# File: bookscraper/app.py (ASYNC SCRAPING)
# ============================================
from dotenv import load_dotenv
@@ -9,24 +9,32 @@ load_dotenv()
print(">>> [WEB] Importing celery_app …")
from celery_app import celery_app
from flask import Flask, render_template, request, jsonify
from scraper.logger import log_debug
from flask import Flask, render_template, request
# Import SCRAPER (sync)
from scraper.book_scraper import BookScraper
from scraper.sites import BookSite
# Abort + Progress (per book_id)
from scraper.abort import set_abort
from scraper.progress import get_progress
# Import Download Controller
from scraper.download_controller import DownloadController
# UI LOGS (GLOBAL — no book_id)
from scraper.ui_log import get_ui_logs
from celery.result import AsyncResult
app = Flask(__name__)
# =====================================================
# HOME PAGE
# =====================================================
@app.route("/", methods=["GET"])
def index():
return render_template("index.html")
# =====================================================
# START SCRAPING (async via Celery)
# =====================================================
@app.route("/start", methods=["POST"])
def start_scraping():
url = request.form.get("url", "").strip()
@@ -34,48 +42,66 @@ def start_scraping():
if not url:
return render_template("result.html", error="Geen URL opgegeven.")
log_debug(f"[WEB] Sync scraping for: {url}")
# -----------------------------------------------
# 1. SCRAPE DIRECTLY (NOT via Celery)
# -----------------------------------------------
site = BookSite()
scraper = BookScraper(site, url)
scrape_result = scraper.execute() # THIS RETURNS METADATA + THE CHAPTER LIST
log_debug(f"[WEB] Scraping via Celery: {url}")
# -----------------------------------------------
# 2. START THE DOWNLOAD PIPELINE VIA CELERY
# -----------------------------------------------
controller = DownloadController(scrape_result)
job = controller.start()
async_result = celery_app.send_task(
"scraper.tasks.scraping.start_scrape_book",
args=[url],
queue="scraping",
)
# -----------------------------------------------
# 3. RENDER THE TEMPLATE (FULLY POPULATED)
# -----------------------------------------------
return render_template(
"result.html",
book=scrape_result,
download_job_id=job.id,
message="Scraping gestart.",
scraping_task_id=async_result.id,
)
# ABORT ROUTE (stays the same)
from scraper.abort import set_abort, clear_abort
# =====================================================
# ABORT (per book_id)
# =====================================================
@app.route("/abort/<book_id>", methods=["POST"])
def abort_download(book_id):
log_debug(f"[WEB] Abort requested for book: {book_id}")
set_abort(book_id)
return jsonify({"status": "ok", "aborted": book_id})
return render_template(
"result.html",
aborted=True,
book={"title": book_id, "author": "", "chapters": []},
message=f"Abort requested voor boek: {book_id}",
)
# =====================================================
# PROGRESS (per book_id)
# =====================================================
@app.route("/progress/<book_id>", methods=["GET"])
def progress(book_id):
return jsonify(get_progress(book_id))
# =====================================================
# LOGS — GLOBAL UI LOGS
# =====================================================
@app.route("/logs", methods=["GET"])
def logs():
return jsonify({"logs": get_ui_logs()})
# =====================================================
# CELERY RESULT → return book_id when scraping finishes
# =====================================================
@app.route("/celery-result/<task_id>", methods=["GET"])
def celery_result(task_id):
result = AsyncResult(task_id, app=celery_app)
if result.successful():
return jsonify({"ready": True, "result": result.get()})
if result.failed():
return jsonify({"ready": True, "error": "failed"})
return jsonify({"ready": False})
# =====================================================
# RUN FLASK
# =====================================================
if __name__ == "__main__":
import os
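A minimal sketch of how a client drives these routes end to end, assuming the app listens on localhost:5000 (hypothetical client code; in the browser this is done by the script in result.html):

import time
import requests

BASE = "http://localhost:5000"

def wait_for_book_id(task_id: str) -> str:
    """Poll /celery-result until scraping finishes and yields the book_id."""
    while True:
        data = requests.get(f"{BASE}/celery-result/{task_id}").json()
        if data.get("ready"):
            if "error" in data:
                raise RuntimeError("scraping task failed")
            return data["result"]["book_id"]
        time.sleep(0.8)

def poll_progress(book_id: str) -> dict:
    """Poll /progress until the run completes or is aborted."""
    while True:
        p = requests.get(f"{BASE}/progress/{book_id}").json()
        print(f"{p['completed']}/{p['total']} skipped={p['skipped']} failed={p['failed']}")
        if p["abort"] or (p["total"] and p["completed"] >= p["total"]):
            return p
        time.sleep(1.0)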

@@ -7,13 +7,23 @@ logger = logging.getLogger("logbus")
def log(message: str):
"""
Compact logging:
- No blank lines
- Only add the '[LOG]' prefix when the message is not empty
- The message stays on a single line
Dumb logger:
- skip empty messages
- pass the message through 1:1
- no prefixes
- no mutations
"""
if not message or not message.strip():
return # skip log entirely
return
logger.warning(f"[LOG] {message}")
# console
logger.warning(message)
# UI-echo
try:
from scraper.ui_log import push_ui
push_ui(message)
except Exception:
pass
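The net behavior of log() after this change, for reference (illustrative calls):

from logbus.publisher import log

log("")              # dropped: blank messages are skipped entirely
log("   ")           # dropped as well (whitespace-only)
log("[DL] OK 12")    # to the console via logger.warning, then echoed to the UI buffer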

@@ -1,9 +1,8 @@
# =========================================================
# File: scraper/download_controller.py
# Purpose: Build Celery pipelines for all chapters and
# pass book_id for abort functionality.
#
# book_id = self.title
# Purpose:
# Build Celery pipelines for all chapters
# and pass book_id for abort/progress/log functionality.
# =========================================================
from celery import group
@@ -13,68 +12,86 @@ import os
class DownloadController:
"""Coordinates parallel chapter pipelines, with optional volume splitting."""
def __init__(self, scrape_result: dict):
"""
Coordinates all chapter pipelines (download → parse → save),
including:
- volume splitting
- consistent meta propagation
- book_id-based abort + progress tracking
"""
def __init__(self, book_id: str, scrape_result: dict):
self.book_id = book_id
self.scrape_result = scrape_result
# Core metadata
self.title = scrape_result.get("title", "UnknownBook")
self.chapters = scrape_result.get("chapters", [])
self.chapters = scrape_result.get("chapters", []) or []
# Base output dir from .env
# Output base dir
root = os.getenv("BOOKSCRAPER_OUTPUT_DIR", "output")
# Volume size
self.max_vol = int(os.getenv("MAX_VOL_SIZE", "200"))
# Base directory for the whole book
# Base folder for the whole book
self.book_base = os.path.join(root, self.title)
os.makedirs(self.book_base, exist_ok=True)
# ------------------------------------------
# FIXED: meta now includes book_url
# ------------------------------------------
# Meta passed to parse/save stage
self.meta = {
"title": self.scrape_result.get("title"),
"author": self.scrape_result.get("author"),
"description": self.scrape_result.get("description"),
"book_url": self.scrape_result.get("book_url"),
"title": self.title,
"author": scrape_result.get("author"),
"description": scrape_result.get("description"),
"book_url": scrape_result.get("book_url"),
}
# ---------------------------------------------------------
# Volume isolation
# ---------------------------------------------------------
def get_volume_path(self, chapter_num: int) -> str:
"""Returns the correct volume directory based on chapter number."""
"""Returns the correct volume directory for a chapter."""
vol_index = (chapter_num - 1) // self.max_vol + 1
vol_name = f"Volume_{vol_index:03d}"
vol_path = os.path.join(self.book_base, vol_name)
os.makedirs(vol_path, exist_ok=True)
return vol_path
# ---------------------------------------------------------
# Pipeline launcher
# ---------------------------------------------------------
def start(self):
log(f"[CTRL] Starting download pipeline for {self.title}")
log(f"[CTRL] Chapters: {len(self.chapters)}")
total = len(self.chapters)
log(
f"[CTRL] Initialising pipeline for '{self.title}' "
f"(book_id={self.book_id}, chapters={total}, max_vol={self.max_vol})"
)
log(f"[CTRL] Output root: {self.book_base}")
log(f"[CTRL] MAX_VOL_SIZE = {self.max_vol}")
tasks = []
book_id = self.title # Use title as book_id for abort logic
for ch in self.chapters:
chapter_num = ch["num"]
chapter_url = ch["url"]
vol_path = self.get_volume_path(chapter_num)
volume_path = self.get_volume_path(chapter_num)
tasks.append(
build_chapter_pipeline(
book_id, # ← NEW: abort requires book_id
self.book_id, # ← UUID from scraping.py
chapter_num,
chapter_url,
vol_path,
volume_path,
self.meta,
)
)
job_group = group(tasks)
async_result = job_group.apply_async()
async_result = group(tasks).apply_async()
log(
f"[CTRL] Pipelines dispatched for '{self.title}' "
f"(book_id={self.book_id}, group_id={async_result.id})"
)
log("[CTRL] Pipelines launched.")
return async_result
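The volume split in get_volume_path is plain integer math; a quick worked example with the default MAX_VOL_SIZE of 200 (chapter numbers illustrative):

for num in (1, 200, 201, 401):
    vol_index = (num - 1) // 200 + 1
    print(num, f"Volume_{vol_index:03d}")
# 1   Volume_001
# 200 Volume_001
# 201 Volume_002
# 401 Volume_003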

@@ -0,0 +1,66 @@
# ============================================================
# File: scraper/progress.py
# Purpose: Track chapter counters for WebGUI progress.
# ============================================================
import os
import redis
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
r = redis.Redis.from_url(REDIS_URL, decode_responses=True)
# ------------------------------------------------------------
# SET TOTAL
# ------------------------------------------------------------
def set_total(book_id: str, total: int):
r.set(f"progress:{book_id}:total", total)
# ------------------------------------------------------------
# COUNTERS
# ------------------------------------------------------------
def inc_completed(book_id: str):
r.incr(f"progress:{book_id}:completed")
def inc_skipped(book_id: str):
r.incr(f"progress:{book_id}:skipped")
def inc_failed(book_id: str):
r.incr(f"progress:{book_id}:failed")
# ------------------------------------------------------------
# FAILED CHAPTER LIST
# ------------------------------------------------------------
def add_failed_chapter(book_id: str, chapter: int, reason: str):
entry = f"Chapter {chapter}: {reason}"
r.rpush(f"progress:{book_id}:failed_list", entry)
def get_failed_list(book_id: str):
return r.lrange(f"progress:{book_id}:failed_list", 0, -1)
# ------------------------------------------------------------
# READ STRUCT FOR UI
# ------------------------------------------------------------
def get_progress(book_id: str):
total = int(r.get(f"progress:{book_id}:total") or 0)
completed = int(r.get(f"progress:{book_id}:completed") or 0)
skipped = int(r.get(f"progress:{book_id}:skipped") or 0)
failed = int(r.get(f"progress:{book_id}:failed") or 0)
abort = r.exists(f"abort:{book_id}") == 1
failed_list = get_failed_list(book_id)
return {
"book_id": book_id,
"total": total,
"completed": completed,
"skipped": skipped,
"failed": failed,
"failed_list": failed_list,
"abort": abort,
}
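Usage sketch for these counters (assumes the Redis instance from REDIS_BROKER is reachable; the book id is illustrative):

from scraper.progress import set_total, inc_completed, add_failed_chapter, get_progress

bid = "demo-book-id"
set_total(bid, 3)
inc_completed(bid)
add_failed_chapter(bid, 2, "HTTP 500")  # note: does not touch the failed counter itself
print(get_progress(bid))
# {'book_id': 'demo-book-id', 'total': 3, 'completed': 1, 'skipped': 0,
#  'failed': 0, 'failed_list': ['Chapter 2: HTTP 500'], 'abort': False}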

@@ -1,21 +1,95 @@
# scraper/tasks/controller_tasks.py
# ============================================================
# File: scraper/tasks/controller_tasks.py
# Purpose:
# Start the download → parse → save pipeline for a scraped book,
# including progress/abort tracking via book_id.
# ONLY THE CONTROLLER UPDATES PROGRESS.
# ============================================================
from celery_app import celery_app
from logbus.publisher import log
from scraper.download_controller import DownloadController
from scraper.progress import (
set_total,
inc_completed,
inc_skipped,
inc_failed,
)
from scraper.abort import abort_requested
print(">>> [IMPORT] controller_tasks.py loaded")
@celery_app.task(bind=True, queue="controller", ignore_result=False)
def launch_downloads(self, scrape_result: dict):
"""Start complete download → parse → save pipeline."""
def launch_downloads(self, book_id: str, scrape_result: dict):
"""
Launch the entire pipeline (download → parse → save),
AND maintain progress counters.
EXPECTS:
book_id: ID generated in scraping.start_scrape_book
scrape_result: dict with title, author, url, chapters[]
"""
title = scrape_result.get("title", "UnknownBook")
chapters = scrape_result.get("chapters", []) or []
total = len(chapters)
log(f"[CTRL] Book '{title}'{total} chapters (book_id={book_id})")
# ------------------------------------------------------------
# INIT PROGRESS
# ------------------------------------------------------------
set_total(book_id, total)
log(f"[CTRL] Progress initialized for {book_id}: total={total}")
# ------------------------------------------------------------
# BUILD CONTROLLER
# ------------------------------------------------------------
ctl = DownloadController(book_id, scrape_result)
# ------------------------------------------------------------
# RUN PIPELINE IN SYNC LOOP
# (DownloadController.start() returns a per-chapter generator)
# ------------------------------------------------------------
try:
for result in ctl.start(): # new generator mode
ch = result.get("chapter")
if result.get("skipped"):
inc_skipped(book_id)
inc_completed(book_id)
log(f"[CTRL] SKIPPED chapter {ch}")
continue
if result.get("failed"):
inc_failed(book_id)
inc_completed(book_id)
log(f"[CTRL] FAILED chapter {ch}")
continue
# Normal success
inc_completed(book_id)
log(f"[CTRL] DONE chapter {ch}")
log("[CTRL] Launching DownloadController...")
# Abort requested mid-run?
if abort_requested(book_id):
log(f"[CTRL] ABORT after chapter {ch}")
break
ctl = DownloadController(scrape_result)
async_result = ctl.start()
except Exception as exc:
log(f"[CTRL] ERROR while processing pipeline: {exc}")
inc_failed(book_id)
raise
log("[CTRL] Pipelines dispatched.")
# ------------------------------------------------------------
# FINISHED
# ------------------------------------------------------------
log(f"[CTRL] Pipeline finished for book_id={book_id}")
return {"pipelines_started": len(scrape_result.get("chapters", []))}
return {
"book_id": book_id,
"total": total,
"completed": int(total), # For safety
}
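The scrape_result shape this task expects, reconstructed from the fields read here and in DownloadController (values illustrative):

example_scrape_result = {
    "title": "Some Book",
    "author": "Some Author",
    "description": "A short synopsis.",
    "book_url": "https://example.com/book/123/",
    "chapters": [
        {"num": 1, "url": "https://example.com/book/123/1.html"},
        {"num": 2, "url": "https://example.com/book/123/2.html"},
    ],
}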

@@ -1,80 +1,101 @@
# =========================================================
# ============================================================
# File: scraper/tasks/download_tasks.py
# Purpose: Download chapter HTML with global concurrency,
# retry/backoff logic, 429 hard delay support,
# and abort-aware chapter skipping.
# retry/backoff logic, 429 support, and abort-awareness.
#
# Abort behavior implemented here:
# - If abort is active AND chapter not started → skip
# - If abort is active BUT chapter already started → complete normally
# (download → parse → save)
# =========================================================
# Logging:
# - timestamp + book_id in the message
# - the message is sent to the console via publisher.py
# - the message is sent to the Redis GUI log buffer via ui_log.push_ui
#
# publisher.py and ui_log.py stay DUMB.
# ============================================================
from celery_app import celery_app
from logbus.publisher import log
from scraper.utils import get_save_path
from scraper.abort import abort_requested, chapter_started, mark_chapter_started
from logbus.publisher import log # console logging (dumb)
from scraper.ui_log import push_ui # GUI logging (dumb)
import requests
import redis
import os
import time
import redis
from datetime import datetime
print(">>> [IMPORT] download_tasks.py loaded")
# ---------------------------------------------------------
# Retry parameters (.env)
# ---------------------------------------------------------
# -----------------------------------------------------------
# TIMESTAMPED LOG WRAPPER
# -----------------------------------------------------------
def log_msg(book_id: str, message: str):
"""
Log with compact timestamp + book_id.
Pushes to:
- console (publisher.log)
- GUI Redis (push_ui)
"""
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
full = f"{ts} [{book_id}] {message}"
# console
log(full)
# GUI (Redis rolling list)
push_ui(full) # NO book_id param — ui_log is dumb
# -----------------------------------------------------------
# Retry parameters (ENV)
# -----------------------------------------------------------
MAX_RETRIES = int(os.getenv("DOWNLOAD_MAX_RETRIES", "7"))
BASE_DELAY = int(os.getenv("DOWNLOAD_BASE_DELAY", "2"))
BACKOFF = int(os.getenv("DOWNLOAD_BACKOFF_MULTIPLIER", "2"))
DELAY_429 = int(os.getenv("DOWNLOAD_429_DELAY", "10"))
# ---------------------------------------------------------
# Global concurrency (.env)
# ---------------------------------------------------------
# -----------------------------------------------------------
# Global concurrency
# -----------------------------------------------------------
MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1"))
# ---------------------------------------------------------
# Global minimum delay (.env)
# ---------------------------------------------------------
# -----------------------------------------------------------
# Global delay sync
# -----------------------------------------------------------
GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1"))
DELAY_KEY = "download:delay_lock"
# ---------------------------------------------------------
# -----------------------------------------------------------
# Redis connection
# ---------------------------------------------------------
# -----------------------------------------------------------
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
redis_client = redis.Redis.from_url(REDIS_URL)
SEM_KEY = "download:active" # semaphore key
SEM_KEY = "download:active" # semaphore counter
# =========================================================
# ============================================================
# GLOBAL DELAY FUNCTIONS
# =========================================================
# ============================================================
def wait_for_global_delay():
"""Block while delay-lock exists."""
if GLOBAL_DELAY <= 0:
return
while redis_client.exists(DELAY_KEY):
time.sleep(0.1)
def set_global_delay():
"""Set TTL lock after completing download."""
if GLOBAL_DELAY <= 0:
return
redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)
# =========================================================
# ============================================================
# GLOBAL CONCURRENCY FUNCTIONS
# =========================================================
# ============================================================
def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
"""Semaphore using Redis atomic INCR."""
while True:
current = redis_client.incr(SEM_KEY)
if current <= max_slots:
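# NOTE: the hunk above cuts off mid-function. For readability, a sketch of the
# full acquire/release pair implied by the INCR counter; the decr-on-overflow
# and the sleep are assumptions, only the lines shown above are in this commit.
#
#     while True:
#         current = redis_client.incr(SEM_KEY)
#         if current <= max_slots:
#             return                      # slot acquired
#         redis_client.decr(SEM_KEY)      # over the limit: give the slot back
#         time.sleep(retry_delay)         # back off briefly and retry
#
# def release_global_slot():
#     redis_client.decr(SEM_KEY)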
@@ -88,37 +109,33 @@ def release_global_slot():
print(f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}")
print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s")
print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s")
print(
f">>> [CONFIG] download retries = "
f"max={MAX_RETRIES}, base={BASE_DELAY}, backoff={BACKOFF}, 429={DELAY_429}"
f">>> [CONFIG] Retries: MAX={MAX_RETRIES}, base={BASE_DELAY}, "
f"backoff={BACKOFF}, 429={DELAY_429}"
)
# =========================================================
# CELERY DOWNLOAD TASK
# =========================================================
@celery_app.task(
bind=True,
queue="download",
ignore_result=False,
)
# ============================================================
# CELERY TASK: DOWNLOAD CHAPTER
# ============================================================
@celery_app.task(bind=True, queue="download", ignore_result=False)
def download_chapter(
self, book_id: str, chapter_num: int, chapter_url: str, base_path: str
):
"""
Download chapter HTML.
Abort behavior:
- If abort is active AND this chapter has not started → skip immediately
- If abort is active BUT this chapter already started → finish download/parse/save
Abort logic:
- If abort active AND chapter not started → SKIP
- If abort active BUT chapter already started → proceed normally
"""
# ------------------------------------------------------------
# ABORT CHECK BEFORE ANYTHING STARTS
# ------------------------------------------------------------
# -----------------------------------------------------------
# ABORT BEFORE START
# -----------------------------------------------------------
if abort_requested(book_id) and not chapter_started(book_id, chapter_num):
log(f"[ABORT] Skip chapter {chapter_num} (abort active, not started)")
msg = f"[ABORT] Skip chapter {chapter_num} (abort active, not started)"
log_msg(book_id, msg)
return {
"chapter": chapter_num,
"url": chapter_url,
@@ -128,27 +145,22 @@ def download_chapter(
"abort": True,
}
# ------------------------------------------------------------
# MARK CHAPTER AS STARTED
# Ensures parse/save must always run even after abort is triggered.
# ------------------------------------------------------------
# Mark started — ensures parse/save must run
mark_chapter_started(book_id, chapter_num)
# ------------------------------------------------------------
# HARD START DELAY
# ------------------------------------------------------------
# Hard delay
if GLOBAL_DELAY > 0:
time.sleep(GLOBAL_DELAY)
save_path = get_save_path(chapter_num, base_path)
# ------------------------------------------------------------
# SKIP IF EXISTS
# ------------------------------------------------------------
# -----------------------------------------------------------
# SKIP existing
# -----------------------------------------------------------
if os.path.exists(save_path):
wait_for_global_delay()
set_global_delay()
log(f"[DL] SKIP chapter {chapter_num} (exists) → {save_path}")
log_msg(book_id, f"[DL] SKIP {chapter_num} (exists) → {save_path}")
return {
"chapter": chapter_num,
"url": chapter_url,
@@ -157,22 +169,18 @@ def download_chapter(
"path": save_path,
}
# ------------------------------------------------------------
# GLOBAL DELAY SYNC
# ------------------------------------------------------------
# Sync delay
wait_for_global_delay()
# ------------------------------------------------------------
# GLOBAL CONCURRENCY
# ------------------------------------------------------------
# Acquire concurrency slot
acquire_global_slot(MAX_CONCURRENCY)
log(f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
log_msg(book_id, f"[DL] ACQUIRED SLOT for chapter {chapter_num}")
try:
# ------------------------------------------------------------
# ACTUAL DOWNLOAD
# ------------------------------------------------------------
log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
# -----------------------------------------------------------
# HTTP DOWNLOAD
# -----------------------------------------------------------
log_msg(book_id, f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
resp = requests.get(
chapter_url,
@@ -184,7 +192,7 @@ def download_chapter(
resp.encoding = resp.apparent_encoding or "gb2312"
html = resp.text
log(f"[DL] OK {chapter_num}: {len(html)} bytes")
log_msg(book_id, f"[DL] OK {chapter_num}: {len(html)} bytes")
return {
"chapter": chapter_num,
@@ -198,33 +206,30 @@ def download_chapter(
attempt = self.request.retries
delay = BASE_DELAY * (BACKOFF**attempt)
# ------------------------------------------------------------
# 429 HANDLING
# ------------------------------------------------------------
# 429 hard block
if (
hasattr(exc, "response")
and getattr(exc.response, "status_code", None) == 429
):
log(
f"[DL] 429 → HARD WAIT {DELAY_429}s "
f"(attempt {attempt}/{MAX_RETRIES})"
log_msg(
book_id,
f"[DL] 429 {chapter_num} → WAIT {DELAY_429}s "
f"(attempt {attempt}/{MAX_RETRIES})",
)
time.sleep(DELAY_429)
set_global_delay()
raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES)
# ------------------------------------------------------------
# NORMAL ERRORS
# ------------------------------------------------------------
log(
f"[DL] ERROR on {chapter_url}: {exc} → retry in {delay}s "
f"(attempt {attempt}/{MAX_RETRIES})"
# Normal error
log_msg(
book_id,
f"[DL] ERROR {chapter_num}: {exc} → retry in {delay}s "
f"(attempt {attempt}/{MAX_RETRIES})",
)
raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
finally:
set_global_delay()
release_global_slot()
log(f"[DL] RELEASED SLOT for chapter {chapter_num}")
log_msg(book_id, f"[DL] RELEASED SLOT for chapter {chapter_num}")

@@ -7,13 +7,17 @@
# - even if the user triggers abort afterwards
# - (abort only prevents new chapters from starting)
#
# Parsing avoids skipping except when download_result indicates skip.
# Logging:
# - Same unified log_msg(book_id, message) as download_tasks
# - publisher.log → console
# - ui_log.push_ui → GUI
# =========================================================
from celery_app import celery_app
from logbus.publisher import log
from bs4 import BeautifulSoup
from scraper.utils import clean_text, load_replacements
from scraper.tasks.download_tasks import log_msg # unified logger
print(">>> [IMPORT] parse_tasks.py loaded")
@@ -24,13 +28,15 @@ def parse_chapter(self, download_result: dict, meta: dict):
Parse raw HTML returned by download_chapter into clean chapter text.
"""
# Extract book_id stored by download_tasks
book_id = download_result.get("book_id", "NOBOOK")
# ------------------------------------------------------------
# 1) DOWNLOAD SKIPPED → PARSE ALSO SKIPS
# (This is the ONLY valid skip in parse)
# ------------------------------------------------------------
if download_result.get("skipped"):
chapter = download_result.get("chapter")
log(f"[PARSE] SKIP chapter {chapter} (download skipped)")
log_msg(book_id, f"[PARSE] SKIP chapter {chapter} (download skipped)")
return download_result
# ------------------------------------------------------------
@@ -40,7 +46,7 @@ def parse_chapter(self, download_result: dict, meta: dict):
chapter_url = download_result["url"]
html = download_result["html"]
log(f"[PARSE] Parsing chapter {chapter_num}")
log_msg(book_id, f"[PARSE] Parsing chapter {chapter_num}")
soup = BeautifulSoup(html, "lxml")
@@ -64,13 +70,13 @@ def parse_chapter(self, download_result: dict, meta: dict):
raw = node.get_text() if node else soup.get_text()
# ------------------------------------------------------------
# Apply global replacements (from text_replacements file)
# Apply global replacements
# ------------------------------------------------------------
REPL = load_replacements()
text = clean_text(raw, REPL)
# ------------------------------------------------------------
# FIX: chapter 1 header now includes meta information
# Chapter 1 gets full header
# ------------------------------------------------------------
if chapter_num == 1:
book_url = meta.get("book_url") or meta.get("url") or "UNKNOWN"
@@ -83,9 +89,10 @@ def parse_chapter(self, download_result: dict, meta: dict):
)
text = header + text
log(f"[PARSE] Parsed chapter {chapter_num}: {len(text)} chars")
log_msg(book_id, f"[PARSE] Parsed chapter {chapter_num}: {len(text)} chars")
return {
"book_id": book_id,
"chapter": chapter_num,
"url": chapter_url,
"text": text,

@@ -1,31 +1,18 @@
# =========================================================
# File: scraper/tasks/pipeline.py
# Purpose: Construct Celery chains for chapter processing.
# Purpose:
# Build Celery chains for chapter processing.
#
# Pipeline:
# download_chapter(book_id, chapter_num, url, base_path)
# → parse_chapter(download_result, meta)
# → save_chapter(parsed_result, base_path)
# download → parse → save → update_progress
#
# Abort behavior:
# - download_chapter uses book_id to decide skip vs execute
# - parse/save always run once download has started
# =========================================================
"""
Build the pipeline for a single chapter:
download → parse → save
This module must NOT import scraping.py or controllers,
otherwise Celery will hit circular imports on worker startup.
Only import task functions here.
"""
from celery import chain
from scraper.tasks.download_tasks import download_chapter
from scraper.tasks.parse_tasks import parse_chapter
from scraper.tasks.save_tasks import save_chapter
from scraper.tasks.progress_tasks import update_progress # NEW
def build_chapter_pipeline(
@@ -36,17 +23,17 @@
meta: dict,
):
"""
Construct a Celery chain for one chapter:
1. download_chapter(book_id, chapter_number, chapter_url, base_path)
2. parse_chapter(download_result, meta)
3. save_chapter(parsed_result, base_path)
Chapter pipeline:
download_chapter(book_id, chapter_num, url, base_path)
→ parse_chapter(download_result, meta)
→ save_chapter(parsed_result, base_path)
→ update_progress(result, book_id)
"""
return chain(
# download_chapter needs: book_id, chapter_num, url, base_path
download_chapter.s(book_id, chapter_number, chapter_url, base_path),
# parse_chapter gets output of download_chapter + meta
parse_chapter.s(meta),
# save_chapter gets parsed result + base_path
save_chapter.s(base_path),
update_progress.s(book_id), # ← central progress update
)
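Usage sketch for a single chapter (arguments positional, values illustrative). In a Celery chain each task receives the previous task's return value as its first argument, which is why update_progress.s(book_id) ends up invoked as update_progress(save_result, book_id):

from scraper.tasks.pipeline import build_chapter_pipeline

sig = build_chapter_pipeline(
    "demo-book-id",                     # book_id
    1,                                  # chapter number
    "https://example.com/book/1.html",  # chapter url
    "output/DemoBook/Volume_001",       # base_path (volume folder)
    {
        "title": "DemoBook",
        "author": "Unknown",
        "description": "",
        "book_url": "https://example.com/book/",
    },
)
async_result = sig.apply_async()  # download → parse → save → update_progress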

@@ -0,0 +1,36 @@
# ============================================================
# File: scraper/tasks/progress_tasks.py
# Purpose: Central progress updater for chapter pipelines.
# ============================================================
from celery_app import celery_app
from scraper.progress import inc_completed, inc_skipped, inc_failed
from logbus.publisher import log
print(">>> [IMPORT] progress_tasks.py loaded")
@celery_app.task(bind=False, name="progress.update", queue="controller")
def update_progress(result: dict, book_id: str):
"""
Central progress logic:
- result: output of save_chapter
- book_id: explicitly passed by pipeline
"""
ch = result.get("chapter")
skipped = result.get("skipped", False)
failed = result.get("failed", False)
if failed:
inc_failed(book_id)
log(f"[PROG] FAILED chapter {ch}")
elif skipped:
inc_skipped(book_id)
inc_completed(book_id)
log(f"[PROG] SKIPPED chapter {ch}")
else:
inc_completed(book_id)
log(f"[PROG] DONE chapter {ch}")
return result
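Accounting check: skipped chapters increment both skipped and completed, so the template's "Completed: X / Y" still reaches Y when nothing fails. For a hypothetical 5-chapter run with one skip and one failure:

results = [{}, {}, {}, {"skipped": True}, {"failed": True}]
completed = sum(1 for res in results if not res.get("failed"))  # skips count too
skipped = sum(1 for res in results if res.get("skipped"))
failed = sum(1 for res in results if res.get("failed"))
print(completed, skipped, failed)  # 4 1 1 → "Completed: 4 / 5 | Skipped: 1 | Failed: 1"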

@@ -1,65 +1,81 @@
# =========================================================
# File: scraper/tasks/save_tasks.py
# Purpose: Save parsed chapter text to disk.
#
# Abort Behavior:
# - Save MUST ALWAYS RUN once download has started.
# - Abort only prevents new chapters from starting (download skip).
# - Save is skipped ONLY when download/parse indicated "skipped".
#
# This guarantees no half-written chapters.
# =========================================================
print(">>> [IMPORT] save_tasks.py loaded")
from celery import shared_task
from logbus.publisher import log
import os
from scraper.utils import get_save_path
from scraper.tasks.download_tasks import log_msg # unified logger
from scraper.progress import (
inc_completed,
inc_skipped,
inc_failed,
add_failed_chapter, # <-- the only necessary addition
)
@shared_task(bind=True, queue="save", ignore_result=False)
def save_chapter(self, parsed: dict, base_path: str):
print(f">>> [save_tasks] save_chapter() CALLED for chapter {parsed.get('chapter')}")
"""
Save parsed chapter text to disk.
parsed = {
"book_id": str,
"chapter": int,
"text": str,
"url": str,
"skipped": bool,
"path": optional str
}
"""
book_id = parsed.get("book_id", "NOBOOK")
chapter = parsed.get("chapter")
# ------------------------------------------------------------
# SKIP CASE:
# - Only skip when download OR parse indicated skip
# - NOT related to abort (abort never skips parse/save)
# SKIP CASE (from download or parse stage)
# ------------------------------------------------------------
if parsed.get("skipped"):
chapter = parsed.get("chapter")
path = parsed.get("path")
log(f"[SAVE] SKIP chapter {chapter} (already exists or skipped) → {path}")
print(f">>> [save_tasks] SKIPPED {path}")
path = parsed.get("path", "(no-path)")
log_msg(book_id, f"[SAVE] SKIP chapter {chapter}{path}")
inc_skipped(book_id)
return {"chapter": chapter, "path": path, "skipped": True}
# ------------------------------------------------------------
# NORMAL SAVE
# ------------------------------------------------------------
try:
chapter_number = parsed.get("chapter")
url = parsed.get("url")
text = parsed.get("text", "")
url = parsed.get("url")
if not chapter_number:
raise ValueError("Missing chapter_number in parsed payload")
if chapter is None:
raise ValueError("Missing chapter number in parsed payload")
# Ensure base path exists
# Ensure folder exists
os.makedirs(base_path, exist_ok=True)
# Unified filename logic
path = get_save_path(chapter_number, base_path)
# Build file path
path = get_save_path(chapter, base_path)
# ------------------------------------------------------------
# WRITE CHAPTER TEXT TO FILE
# ------------------------------------------------------------
# Write chapter text
with open(path, "w", encoding="utf-8") as f:
f.write(text)
log(f"[SAVE] Saved chapter {chapter_number}{path}")
print(f">>> [save_tasks] SAVED {path}")
log_msg(book_id, f"[SAVE] Saved chapter {chapter}{path}")
inc_completed(book_id)
return {"chapter": chapter_number, "path": path}
return {"book_id": book_id, "chapter": chapter, "path": path}
except Exception as exc:
log(f"[SAVE] ERROR saving chapter from {url}: {exc}")
print(f">>> [save_tasks] ERROR: {exc}")
log_msg(book_id, f"[SAVE] ERROR saving chapter {chapter}: {exc}")
inc_failed(book_id)
add_failed_chapter(book_id, chapter, str(exc)) # <-- essential
raise
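get_save_path comes from scraper/utils.py and is not part of this diff; a sketch of the (chapter_num, base_path) → path contract shared with download_tasks. The filename pattern is an assumption:

import os

def get_save_path(chapter_num: int, base_path: str) -> str:
    # one text file per chapter inside the volume folder (pattern assumed)
    return os.path.join(base_path, f"chapter_{chapter_num:04d}.txt")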

@@ -1,35 +1,45 @@
# scraper/tasks/scraping.py
# ============================================================
# File: scraper/tasks/scraping.py
# Purpose: Scrape metadata + chapter list and initialise
# Redis progress tracking + launch download controller
# ============================================================
from celery_app import celery_app
from logbus.publisher import log
import os
import uuid
import redis
from scraper.sites import BookSite
from scraper.book_scraper import BookScraper
from scraper.abort import clear_abort # no circular deps
print(">>> [IMPORT] scraping.py loaded")
# Redis connection (same as Celery broker)
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
r = redis.Redis.from_url(REDIS_URL, decode_responses=True)
@celery_app.task(bind=True, queue="scraping", ignore_result=False)
def start_scrape_book(self, url: str):
"""Scrapes metadata + chapter list using new BookScraper.execute()."""
"""Scrapes metadata + chapters and prepares download tracking."""
log(f"[SCRAPING] Start scraping for: {url}")
# ------------------------------------------------------------
# Book scrape
# ------------------------------------------------------------
site = BookSite()
scraper = BookScraper(site, url)
# ----------------------------------------
# NEW API (old: scraper.parse_book_info())
# ----------------------------------------
result = scraper.execute()
result = scraper.execute() # returns dict with metadata + chapters
chapters = result.get("chapters", [])
full_count = len(chapters)
# ----------------------------------------
# DRY RUN logic
# ----------------------------------------
# ------------------------------------------------------------
# DRY RUN
# ------------------------------------------------------------
DRY_RUN = os.getenv("DRY_RUN", "0") == "1"
TEST_LIMIT = int(os.getenv("TEST_LIMIT", "5"))
@@ -40,13 +50,41 @@ def start_scrape_book(self, url: str):
log(f"[SCRAPING] Completed scrape: {len(chapters)}/{full_count} chapters")
# ----------------------------------------
# Dispatch download pipelines
# ----------------------------------------
# ------------------------------------------------------------
# BOOK RUN ID
# ------------------------------------------------------------
book_id = str(uuid.uuid4())
result["book_id"] = book_id
log(f"[SCRAPING] Assigned book_id = {book_id}")
# ------------------------------------------------------------
# RESET ABORT + INITIALISE PROGRESS
# ------------------------------------------------------------
clear_abort(book_id)
r.set(f"progress:{book_id}:total", len(chapters))
r.set(f"progress:{book_id}:done", 0)
r.delete(f"logs:{book_id}") # clear old logs if any
r.rpush(f"logs:{book_id}", f":: SCRAPING STARTED for {url}")
r.rpush(f"logs:{book_id}", f":: Found {len(chapters)} chapters")
# ------------------------------------------------------------
# DISPATCH DOWNLOAD CONTROLLER
# ------------------------------------------------------------
# controller task signature = launch_downloads(book_id, scrape_result)
celery_app.send_task(
"scraper.tasks.controller_tasks.launch_downloads",
args=[result],
args=[book_id, result],
queue="controller",
)
return result
log(f"[SCRAPING] Dispatched download controller for {book_id}")
return {
"book_id": book_id,
"title": result.get("title"),
"author": result.get("author"),
"chapters": len(chapters),
}
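For reference, the task's return value as /celery-result and the template's pollForBookId() consume it (values illustrative):

example_return = {
    "book_id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",  # str(uuid.uuid4())
    "title": "Some Book",
    "author": "Some Author",
    "chapters": 1234,  # count only; the full chapter list stays server-side
}
# /celery-result/<task_id> then answers: {"ready": true, "result": example_return}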

@@ -0,0 +1,36 @@
# ============================================
# File: scraper/ui_log.py
# Purpose: Central UI log buffer for WebGUI
# Single global buffer. No book_id.
# ============================================
import redis
import os
from datetime import datetime
REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
LOG_BUFFER_SIZE = int(os.getenv("LOG_BUFFER_SIZE", "1000"))
r = redis.Redis.from_url(REDIS_URL, decode_responses=True)
UI_LOG_KEY = "logs:ui"
def push_ui(message: str):
"""Push a message into the global UI log (no book_id)."""
if not message or not message.strip():
return
ts = datetime.now().strftime("%H:%M:%S")
entry = f"[{ts}] {message}"
r.rpush(UI_LOG_KEY, entry)
r.ltrim(UI_LOG_KEY, -LOG_BUFFER_SIZE, -1)
def get_ui_logs(limit: int = None):
"""Return last N global UI log lines."""
if limit is None:
limit = LOG_BUFFER_SIZE
return r.lrange(UI_LOG_KEY, -limit, -1)
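Usage sketch; the LTRIM caps the list at LOG_BUFFER_SIZE entries (assumes the Redis instance from REDIS_BROKER):

from scraper.ui_log import push_ui, get_ui_logs

push_ui("pipeline started")
push_ui("   ")               # dropped: blank messages are ignored
print(get_ui_logs(limit=5))  # e.g. ['[14:03:07] pipeline started']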

@@ -2,103 +2,194 @@
<html lang="nl">
<head>
<meta charset="UTF-8" />
<title>Scrape & Download Resultaat</title>
<title>BookScraper Resultaat</title>
<style>
body {
font-family: Arial, sans-serif;
padding: 40px;
padding: 30px;
max-width: 900px;
margin: auto;
}
h1 {
margin-bottom: 10px;
}
.error {
padding: 15px;
background: #ffdddd;
border-left: 5px solid #ff4444;
margin-bottom: 20px;
}
.box {
padding: 15px;
background: #f7f7f7;
border: 1px solid #ddd;
margin-bottom: 20px;
background: #f8f8f8;
border-radius: 6px;
margin-bottom: 20px;
}
a {
color: #007bff;
text-decoration: none;
}
a:hover {
text-decoration: underline;
.logbox {
background: #000;
color: #0f0;
padding: 12px;
height: 70vh;
overflow-y: auto;
font-family: monospace;
border-radius: 6px;
font-size: 13px;
}
.abort-btn {
padding: 10px 15px;
background: #cc0000;
color: #fff;
#abortBtn {
padding: 12px 20px;
background: #d9534f;
color: white;
border: none;
border-radius: 4px;
border-radius: 6px;
cursor: pointer;
font-size: 15px;
margin-top: 10px;
}
#abortBtn:hover {
background: #c9302c;
}
#statusLine {
font-size: 18px;
font-weight: bold;
}
.abort-btn:hover {
background: #a30000;
.hidden {
display: none;
}
</style>
</head>
<body>
<a href="/">&larr; Terug</a>
<h1>Scrape Resultaat--</h1>
{% if error %}
<div class="error"><strong>Fout:</strong><br />{{ error }}</div>
<div
class="box"
style="background: #ffdddd; border-left: 5px solid #ff4444"
>
<strong>Fout:</strong> {{ error }}
</div>
{% endif %} {% if message %}
<div class="box">{{ message }}</div>
{% endif %}
<h1>Scrape Resultaat</h1>
{% if book %}
<div class="box">
<strong>Titel:</strong> {{ book.title }}<br />
<strong>Auteur:</strong> {{ book.author }}<br />
<div id="statusBox" class="box hidden">
<div id="statusLine">Status: bezig…</div>
<div id="progressText"></div>
<button id="abortBtn" class="hidden">ABORT</button>
</div>
{% if book.description %}
<div class="box">
<strong>Beschrijving:</strong><br />
<p>{{ book.description }}</p>
<!-- FAILED LIST -->
<div
id="failedBox"
class="box hidden"
style="background: #ffefef; border-left: 5px solid #cc0000"
>
<strong>Failed chapters:</strong>
<ul id="failedList" style="margin-top: 10px"></ul>
</div>
{% endif %}
<div class="box">
<strong>Aantal chapters:</strong> {{ book.chapters|length }}
<strong>Live log:</strong>
<div id="logbox" class="logbox"></div>
</div>
{% if book.chapters %}
<div class="box">
<strong>Chapters:</strong><br /><br />
<ul>
{% for ch in book.chapters %}
<li>
<a href="{{ ch.url }}" target="_blank">
Chapter {{ ch.num }} — {{ ch.title }}
</a>
</li>
{% endfor %}
</ul>
</div>
{% endif %} {% if download_job_id %}
<div class="box">
<strong>Download pipeline gestart!</strong><br />
Job ID: <code>{{ download_job_id }}</code>
</div>
<script>
const scrapingTaskId = "{{ scraping_task_id or '' }}";
<div class="box">
<form method="POST" action="/abort/{{ book.title }}">
<button class="abort-btn" type="submit">⛔ Abort Download</button>
</form>
</div>
{% endif %} {% endif %}
let bookId = null;
let polling = true;
if (scrapingTaskId) pollForBookId();
function pollForBookId() {
fetch(`/celery-result/${scrapingTaskId}`)
.then((r) => r.json())
.then((data) => {
if (data.ready && data.result && data.result.book_id) {
bookId = data.result.book_id;
startLiveUI();
} else setTimeout(pollForBookId, 800);
})
.catch(() => setTimeout(pollForBookId, 1200));
}
function startLiveUI() {
document.getElementById("statusBox").classList.remove("hidden");
document.getElementById("abortBtn").classList.remove("hidden");
document.getElementById("abortBtn").onclick = () => {
fetch(`/abort/${bookId}`, { method: "POST" });
};
pollProgress();
pollLogs();
}
function pollProgress() {
// FIX → the UI keeps rendering; polling only stops repeating
if (!bookId) return;
fetch(`/progress/${bookId}`)
.then((r) => r.json())
.then((p) => {
const done = p.completed || 0;
const total = p.total || 0;
document.getElementById(
"progressText"
).innerText = `Completed: ${done} / ${total} | Skipped: ${
p.skipped || 0
} | Failed: ${p.failed || 0}`;
// FAILED LIST
const failedBox = document.getElementById("failedBox");
const failedList = document.getElementById("failedList");
if (p.failed_list && p.failed_list.length > 0) {
failedBox.classList.remove("hidden");
failedList.innerHTML = "";
p.failed_list.forEach((entry) => {
const li = document.createElement("li");
li.textContent = entry;
failedList.appendChild(li);
});
}
// STATUS
if (p.abort) {
document.getElementById("statusLine").innerText = "ABORTED";
polling = false;
} else if (done >= total && total > 0) {
document.getElementById("statusLine").innerText = "KLAAR ✔";
polling = false;
} else {
document.getElementById("statusLine").innerText = "Bezig…";
}
// Stop repeated polling, but keep rendering
if (polling) setTimeout(pollProgress, 1000);
})
.catch(() => {
if (polling) setTimeout(pollProgress, 1500);
});
}
function pollLogs() {
if (!polling) return;
fetch(`/logs`)
.then((r) => r.json())
.then((data) => {
const logbox = document.getElementById("logbox");
logbox.innerHTML = "";
data.logs.forEach((line) => {
const div = document.createElement("div");
div.textContent = line;
logbox.appendChild(div);
});
logbox.scrollTop = logbox.scrollHeight;
setTimeout(pollLogs, 1000);
})
.catch(() => setTimeout(pollLogs, 1500));
}
</script>
</body>
</html>

@@ -0,0 +1,15 @@
<h2>{{ title }}</h2>
<div>
<button id="abortBtn">Abort Download</button>
</div>
<div id="progressBox">
<div id="bar"></div>
</div>
<pre id="logBox">Loading…</pre>
<script>
// Poll every second
</script>