# =========================================================
# File: scraper/tasks/parse_tasks.py
# Purpose: Parse downloaded HTML into clean chapter text.
# Enhanced version: Piaotia <h1> → content extractor + clean pipeline
# NO HARDCODED REPLACEMENTS — everything comes from replacement files
# =========================================================
from celery_app import celery_app
from bs4 import BeautifulSoup, NavigableString, Comment

from scraper.utils import clean_text, load_all_replacements
from scraper.tasks.download_tasks import log_msg  # unified logger

print(">>> [IMPORT] parse_tasks.py loaded (enhanced parser)")


def extract_piaotia_content(soup):
    """
    Extract clean chapter content from Piaotia pages.

    Start after the table that follows the <h1> title.
    End before nav/ads/footer/copyright blocks.
    """
    h1 = soup.find("h1")
    if not h1:
        return None

    # -------- Find first table after <h1> --------
    table = None
    for sib in h1.next_siblings:
        if getattr(sib, "name", None) == "table":
            table = sib
            break
    if not table:
        return None

    parts = []

    # -------- Iterate over siblings after the table --------
    for sib in table.next_siblings:
        name = getattr(sib, "name", None)
        text = None
        if hasattr(sib, "get_text"):
            text = sib.get_text(strip=True)

        # === STOP CONDITIONS ===

        # HTML comments that mark the page-turn region (contain 翻页)
        if isinstance(sib, Comment) and ("翻页" in sib):
            break

        # Explicit footer blocks
        if name == "div":
            sid = sib.get("id", "")
            cls = sib.get("class", [])
            if sid in ("thumb", "tags", "tips", "Commenddiv", "feit2"):
                break

        # Copyright block — strongest indicator
        if text and ("重要声明" in text or "Copyright" in text):
            break

        # Navigation or 推荐阅读 (recommended-reading) links
        if text and text.startswith(("推荐阅读", "目录", "目 录")):
            break

        # Skip scripts, ads, styles
        if name in ("script", "style"):
            continue

        # Skip JS/ad containers wrapped in <center> blocks
        if name == "center":
            continue

        # === ACCUMULATE TEXT ===
        if isinstance(sib, NavigableString):
            s = sib.strip()
            if s:
                parts.append(s)
        elif hasattr(sib, "get_text"):
            t = sib.get_text(separator="\n").strip()
            if t:
                parts.append(t)

    return "\n".join(parts).strip()
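# ---------------------------------------------------------
# (Sketch) Stand-alone check for the extractor above, outside the Celery
# pipeline. The fixture path and the GBK-first decode are assumptions for
# illustration only — Piaotia pages are commonly GBK-encoded, but adjust
# this to however the downloader actually stores the HTML.
# ---------------------------------------------------------
def _debug_extract(path="sample_piaotia_chapter.html"):
    """Parse a saved chapter page from disk and run the extractor on it."""
    with open(path, "rb") as fh:
        raw_bytes = fh.read()
    try:
        html = raw_bytes.decode("gbk")
    except UnicodeDecodeError:
        html = raw_bytes.decode("utf-8", errors="ignore")
    soup = BeautifulSoup(html, "lxml")
    return extract_piaotia_content(soup)
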
if name == "center": continue # === ACCUMULATE TEXT === if isinstance(sib, NavigableString): s = sib.strip() if s: parts.append(s) elif hasattr(sib, "get_text"): t = sib.get_text(separator="\n").strip() if t: parts.append(t) return "\n".join(parts).strip() @celery_app.task(bind=True, queue="parse", ignore_result=False) def parse_chapter(self, download_result: dict): """ New signature under chapter_dict pipeline: - receives ONLY the output dict from download_chapter - book_meta is inside download_result["book_meta"] - chapter_dict is inside download_result["chapter"] """ book_id = download_result.get("book_id", "NOBOOK") chapter_dict = download_result.get("chapter") or {} book_meta = download_result.get("book_meta") or {} chapter_title = chapter_dict.get("title") chapter_num = chapter_dict.get("num") chapter_url = chapter_dict.get("url") html = download_result.get("html") # ------------------------------------------------------------ # SKIPPED DOWNLOAD → SKIP PARSE # ------------------------------------------------------------ if download_result.get("skipped"): log_msg(book_id, f"[PARSE] SKIP chapter {chapter_num} (download skipped)") return download_result # already has chapter + book_meta + skipped log_msg(book_id, f"[PARSE] Parsing chapter {chapter_num}") soup = BeautifulSoup(html, "lxml") # ------------------------------------------------------------ # STRICT SELECTORS (direct content blocks) # ------------------------------------------------------------ selectors = [ "#content", "div#content", ".content", "div.content", "#chaptercontent", "div#chaptercontent", "#chapterContent", ".read-content", "div.read-content", ] node = None for sel in selectors: tmp = soup.select_one(sel) if tmp: node = tmp break raw = None # --- STRICT SELECTOR FAILED → Try Piaotia extractor --- if node is None: raw = extract_piaotia_content(soup) # # ------------------------------------------------------------ # # PIAOTIA FALLBACK: # # Extract content between

and the "bottomlink" block. # # ------------------------------------------------------------ # raw = None # if node is None: # h1 = soup.find("h1") # if h1: # content_parts = [] # for sib in h1.next_siblings: # sib_class = getattr(sib, "get", lambda *_: None)("class") # if sib_class and ( # "bottomlink" in sib_class or sib_class == "bottomlink" # ): # break # if getattr(sib, "name", None) in ["script", "style", "center"]: # continue # if hasattr(sib, "get_text"): # content_parts.append(sib.get_text(separator="\n")) # else: # content_parts.append(str(sib)) # raw = "\n".join(content_parts) # ------------------------------------------------------------ # FINAL FALLBACK # ------------------------------------------------------------ if raw is None: if node: raw = node.get_text(separator="\n") else: for tag in soup(["script", "style", "noscript"]): tag.decompose() raw = soup.get_text(separator="\n") # ------------------------------------------------------------ # MULTIPASS CLEANING via replacement files ONLY # ------------------------------------------------------------ REPL = load_all_replacements() text = raw for _ in range(5): text = clean_text(text, REPL) # ------------------------------------------------------------ # Collapse excessive empty lines # ------------------------------------------------------------ cleaned = [] prev_blank = False for line in text.split("\n"): stripped = line.rstrip() if stripped == "": if prev_blank: continue prev_blank = True cleaned.append("") else: prev_blank = False cleaned.append(stripped) text = "\n".join(cleaned) text = chapter_title + "\n" + text # ------------------------------------------------------------ # Add header to chapter 1 # ------------------------------------------------------------ if chapter_num == 1: book_url = book_meta.get("book_url") or book_meta.get("url") or "UNKNOWN" header = ( f"{book_meta.get('title','')}\n" f"Author: {book_meta.get('author','')}\n" f"Description:\n{book_meta.get('description','')}\n" f"Book URL: {book_url}\n" + "-" * 50 + "\n\n" ) text = header + text log_msg(book_id, f"[PARSE] Parsed chapter {chapter_num}: {len(text)} chars") # NEW RETURN FORMAT: chapter_dict stays intact return { "book_id": book_id, "chapter": chapter_dict, "text": text, "length": len(text), "book_meta": book_meta, }