diff --git a/bookscraper/logbus/publisher.py b/bookscraper/logbus/publisher.py
index faa9f2d..960aec2 100644
--- a/bookscraper/logbus/publisher.py
+++ b/bookscraper/logbus/publisher.py
@@ -1,12 +1,19 @@
 # logbus/publisher.py
-import redis
-import os
-
-REDIS_URL = os.getenv("REDIS_BROKER", "redis://redis:6379/0")
-
-r = redis.Redis.from_url(REDIS_URL)
-
-
-def log(message):
-    print("[LOG]", message)
-    r.publish("logs", message)
+import logging
+
+logger = logging.getLogger("logbus")
+
+
+def log(message: str):
+    """
+    Compact logging:
+    - no blank lines
+    - the '[LOG]' prefix is added only when the message is non-empty
+    - the whole message stays on a single line
+    """
+
+    if not message or not message.strip():
+        return  # skip the log entirely
+
+    logger.warning(f"[LOG] {message}")
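
Note: the rewritten log() no longer publishes to the Redis "logs" channel; it
now goes through the stdlib logging module at WARNING level. A minimal sketch
of how a consumer process could surface these messages (the basicConfig call
is illustrative and not part of this diff):

    import logging
    logging.basicConfig(level=logging.WARNING, format="%(message)s")

    from logbus.publisher import log
    log("scrape started")   # -> "[LOG] scrape started"
    log("   ")              # suppressed: blank messages are skipped
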
diff --git a/bookscraper/scraper/tasks/download_tasks.py b/bookscraper/scraper/tasks/download_tasks.py
index 837c99e..479ee47 100644
--- a/bookscraper/scraper/tasks/download_tasks.py
+++ b/bookscraper/scraper/tasks/download_tasks.py
@@ -28,7 +28,6 @@
 MAX_CONCURRENCY = int(os.getenv("DOWNLOAD_MAX_GLOBAL_CONCURRENCY", "1"))
 GLOBAL_DELAY = int(os.getenv("DOWNLOAD_GLOBAL_MIN_DELAY", "1"))
 DELAY_KEY = "download:delay_lock"
-
 # ---------------------------
 # Redis connection
 # ---------------------------
@@ -42,10 +41,7 @@ SEM_KEY = "download:active"  # semaphore counter
 # ======================================================
 # GLOBAL DELAY FUNCTIONS
 # ======================================================
 def wait_for_global_delay():
-    """
-    Block until no global delay lock exists.
-    Prevents hammering the server too fast.
-    """
+    """Block while the delay lock exists."""
     if GLOBAL_DELAY <= 0:
         return

@@ -54,16 +50,10 @@
 def set_global_delay():
-    """
-    After finishing a download (or skip),
-    set a TTL lock so all workers wait a minimum time.
-    """
+    """Set a TTL lock after a download completes."""
     if GLOBAL_DELAY <= 0:
         return

-    # SET key NX EX:
-    # - only set if not existing
-    # - expires automatically
     redis_client.set(DELAY_KEY, "1", nx=True, ex=GLOBAL_DELAY)

@@ -71,10 +61,7 @@
 # ======================================================
 # GLOBAL CONCURRENCY FUNCTIONS
 # ======================================================
 def acquire_global_slot(max_slots: int, retry_delay: float = 0.5):
-    """
-    GLOBAL semaphore with Redis.
-    Atomic INCR. If limit exceeded, undo & wait.
-    """
+    """Global semaphore using Redis, based on an atomic INCR."""
     while True:
         current = redis_client.incr(SEM_KEY)
         if current <= max_slots:
@@ -85,12 +72,11 @@
 def release_global_slot():
-    """Release semaphore."""
     redis_client.decr(SEM_KEY)

 print(f">>> [CONFIG] Global concurrency = {MAX_CONCURRENCY}")
-print(f">>> [CONFIG] Global min delay = {GLOBAL_DELAY}s")
+print(f">>> [CONFIG] Global min delay   = {GLOBAL_DELAY}s")
 print(
     f">>> [CONFIG] download retries = "
     f"max={MAX_RETRIES}, base={BASE_DELAY}, backoff={BACKOFF}, 429={DELAY_429}"
 )
@@ -107,14 +93,21 @@ print(
 )
 def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
     """
-    base_path komt uit pipeline.py
-    Download wordt SKIPPED als het bestand al bestaat.
+    Download chapter HTML.
+    Ensures:
+    - global delay throttle
+    - global concurrency limit
+    - hard blocking on 429
     """
+    # HARD DELAY at every execution to prevent retry storms
+    if GLOBAL_DELAY > 0:
+        time.sleep(GLOBAL_DELAY)
+
     save_path = get_save_path(chapter_num, base_path)

     # ------------------------------------------------------------------
-    # 1. SKIP IF EXISTS — maar WEL global delay zetten!
+    # 1. SKIP IF EXISTS (still apply the global delay for consistency)
     # ------------------------------------------------------------------
     if os.path.exists(save_path):
         wait_for_global_delay()
@@ -130,19 +123,19 @@ def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
         }

     # ------------------------------------------------------------------
-    # 2. GLOBAL DELAY — throttle downloads globally
+    # 2. GLOBAL DELAY SYNC
     # ------------------------------------------------------------------
     wait_for_global_delay()

     # ------------------------------------------------------------------
-    # 3. GLOBAL CONCURRENCY — only X downloads at the same time
+    # 3. GLOBAL CONCURRENCY
     # ------------------------------------------------------------------
     acquire_global_slot(MAX_CONCURRENCY)
     log(f"[DL] ACQUIRED SLOT for chapter {chapter_num}")

     try:
         # ------------------------------------------------------------------
-        # 4. DOWNLOAD LOGIC
+        # 4. DO THE DOWNLOAD
         # ------------------------------------------------------------------
         log(f"[DL] Downloading chapter {chapter_num}: {chapter_url}")
@@ -167,23 +160,34 @@ def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
         }

     except Exception as exc:
-        # ------------------------------------------------------------------
-        # 5. RETRY LOGIC
-        # ------------------------------------------------------------------
         attempt = self.request.retries
         delay = BASE_DELAY * (BACKOFF**attempt)

+        # =============================================================
+        # HARD 429 BLOCK — DO NOT RELEASE SLOT YET
+        # =============================================================
         if (
             hasattr(exc, "response")
             and getattr(exc.response, "status_code", None) == 429
         ):
-            delay = DELAY_429 + delay
+
             log(
-                f"[DL] 429 Too Many Requests → retry in {delay}s "
+                f"[DL] 429 Too Many Requests → HARD WAIT {DELAY_429}s "
                 f"(attempt {attempt}/{MAX_RETRIES})"
             )
-            raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)
+            # HARD BLOCK: worker sleeps, still holding the slot
+            time.sleep(DELAY_429)
+
+            # After the 429 wait, also apply the global delay
+            set_global_delay()
+
+            # Retry immediately (countdown=0)
+            raise self.retry(exc=exc, countdown=0, max_retries=MAX_RETRIES)
+
+        # =============================================================
+        # NORMAL ERRORS
+        # =============================================================
         log(
             f"[DL] ERROR on {chapter_url}: {exc} → retry in {delay}s "
             f"(attempt {attempt}/{MAX_RETRIES})"
@@ -191,9 +195,9 @@ def download_chapter(self, chapter_num: int, chapter_url: str, base_path: str):
         )
         raise self.retry(exc=exc, countdown=delay, max_retries=MAX_RETRIES)

     finally:
-        # ------------------------------------------------------------------
-        # 6. ALWAYS set delay + release semaphore
-        # ------------------------------------------------------------------
+        # =============================================================
+        # ALWAYS RELEASE SLOT AFTER HARD BLOCK / NORMAL WORK
+        # =============================================================
         set_global_delay()
         release_global_slot()
         log(f"[DL] RELEASED SLOT for chapter {chapter_num}")
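
Note: the throttle above rests on two small Redis primitives: a TTL lock
(SET NX EX) for the minimum delay and an INCR/DECR counter as a global
semaphore. A self-contained sketch of the pattern, assuming redis-py and the
key names from the task module; the undo-and-wait branch is spelled out here
because the hunk above elides it as context:

    import time
    import redis

    redis_client = redis.Redis.from_url("redis://redis:6379/0")  # URL assumed
    SEM_KEY = "download:active"
    DELAY_KEY = "download:delay_lock"

    def acquire_global_slot(max_slots: int, retry_delay: float = 0.5) -> None:
        # Claim a slot atomically; if over the limit, undo the INCR and wait.
        while True:
            if redis_client.incr(SEM_KEY) <= max_slots:
                return
            redis_client.decr(SEM_KEY)
            time.sleep(retry_delay)

    def release_global_slot() -> None:
        redis_client.decr(SEM_KEY)

    def set_global_delay(seconds: int = 1) -> None:
        # NX: only set when absent; EX: the lock expires on its own.
        redis_client.set(DELAY_KEY, "1", nx=True, ex=seconds)

One caveat worth noting: the counter is not crash-safe. If a worker dies
between the INCR and the finally block, SEM_KEY stays inflated until it is
reset by hand.
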
diff --git a/bookscraper/scraper/tasks/scraping.py b/bookscraper/scraper/tasks/scraping.py
index 8003cf8..ccdacd5 100644
--- a/bookscraper/scraper/tasks/scraping.py
+++ b/bookscraper/scraper/tasks/scraping.py
@@ -1,52 +1,48 @@
 # scraper/tasks/scraping.py
-#
+
 from celery_app import celery_app
 from logbus.publisher import log
 import os
 from scraper.sites import BookSite
 from scraper.book_scraper import BookScraper
-from scraper.tasks.controller_tasks import launch_downloads

 print(">>> [IMPORT] scraping.py loaded")

 @celery_app.task(bind=True, queue="scraping", ignore_result=False)
 def start_scrape_book(self, url: str):
-    """Scrapes metadata + chapter list."""
+    """Scrapes metadata + chapter list using the new BookScraper.execute()."""
     log(f"[SCRAPING] Start scraping for: {url}")

     site = BookSite()
     scraper = BookScraper(site, url)

-    scraper.parse_book_info()
-    chapters = scraper.get_chapter_list()
+    # ----------------------------------------
+    # NEW API (old: scraper.parse_book_info())
+    # ----------------------------------------
+    result = scraper.execute()
+
+    chapters = result.get("chapters", [])
     full_count = len(chapters)

+    # ----------------------------------------
+    # DRY RUN logic
+    # ----------------------------------------
     DRY_RUN = os.getenv("DRY_RUN", "0") == "1"
     TEST_LIMIT = int(os.getenv("TEST_LIMIT", "5"))

     if DRY_RUN:
-        log(f"[SCRAPING] DRY_RUN: limiting chapters to first {TEST_LIMIT}")
+        log(f"[SCRAPING] DRY_RUN: limiting chapters to {TEST_LIMIT}")
         chapters = chapters[:TEST_LIMIT]
-
-    # ---------------------------------------------------
-    # FIX: add book_url so parse_chapter has the real url
-    # ---------------------------------------------------
-    result = {
-        "title": scraper.book_title,
-        "author": scraper.book_author,
-        "description": scraper.book_description,
-        "cover": scraper.cover_url,
-        "book_url": url,
-        "chapters": [
-            {"num": ch.number, "title": ch.title, "url": ch.url} for ch in chapters
-        ],
-    }
+    result["chapters"] = chapters

     log(f"[SCRAPING] Completed scrape: {len(chapters)}/{full_count} chapters")

+    # ----------------------------------------
+    # Dispatch download pipelines
+    # ----------------------------------------
     celery_app.send_task(
         "scraper.tasks.controller_tasks.launch_downloads",
         args=[result],
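
Note: launch_downloads now receives result unchanged, so BookScraper.execute()
must return a dict that already carries the metadata the removed inline dict
used to build, including "book_url", which the removed FIX comment says
parse_chapter needs. A sketch of the assumed shape, inferred from the removed
code (all values are placeholders):

    result = {
        "title": "Example Book",
        "author": "Jane Doe",
        "description": "…",
        "cover": "https://example.com/cover.jpg",
        "book_url": "https://example.com/book",
        "chapters": [
            {"num": 1, "title": "Chapter 1", "url": "https://example.com/ch/1"},
        ],
    }

If execute() omits "book_url", the download pipeline would silently lose the
fix that the removed block introduced.
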