You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kmftools/bookscraper/db/db.py

161 lines
4.6 KiB

# ============================================================
# File: db/db.py (UPDATED for book_idx-only architecture)
# Purpose:
# Raw SQLite engine for BookScraper.
# - Connection management
# - init_db() schema creation + safe schema upgrade
# - upsert_book() atomic write (now uses book_idx)
# - raw fetch helpers
# ============================================================
import os
import sqlite3
from threading import Lock
DB_PATH = os.environ.get("BOOKSCRAPER_DB", "/app/data/books.db")
# Ensure directory exists
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
# Per-process connection cache
_connection_cache = {}
_connection_lock = Lock()
# ------------------------------------------------------------
# Connection handling
# ------------------------------------------------------------
def get_db():
pid = os.getpid()
if pid not in _connection_cache:
with _connection_lock:
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
enable_wal_mode(conn)
_connection_cache[pid] = conn
return _connection_cache[pid]
def enable_wal_mode(conn):
conn.execute("PRAGMA journal_mode=DELETE;")
conn.execute("PRAGMA synchronous=NORMAL;")
conn.commit()
# ------------------------------------------------------------
# Schema creation + SAFE schema upgrades
# ------------------------------------------------------------
def init_db():
conn = get_db()
# --------------------------------------------------------
# BASE SCHEMA — book_idx is now PRIMARY KEY
# --------------------------------------------------------
conn.execute(
"""
CREATE TABLE IF NOT EXISTS books (
book_idx INTEGER PRIMARY KEY,
title TEXT,
author TEXT,
description TEXT,
cover_url TEXT,
cover_path TEXT,
book_url TEXT,
chapters_total INTEGER,
status TEXT,
downloaded INTEGER DEFAULT 0,
parsed INTEGER DEFAULT 0,
audio_done INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
processdate DATETIME,
last_update DATETIME
);
"""
)
conn.commit()
# --------------------------------------------------------
# SCHEMA UPGRADE UTILITY
# --------------------------------------------------------
def add_column(name, type_):
try:
conn.execute(f"ALTER TABLE books ADD COLUMN {name} {type_};")
except:
pass # column already exists
cols = conn.execute("PRAGMA table_info(books);").fetchall()
colnames = [c[1] for c in cols]
# --------------------------------------------------------
# UPGRADE NEW FIELDS — future-proof, matched with Redis state model
# --------------------------------------------------------
# (book_idx already exists as PRIMARY KEY — no need to add)
add_column("description", "TEXT")
add_column("cover_path", "TEXT")
add_column("book_url", "TEXT")
# Download counters
add_column("chapters_download_done", "INTEGER DEFAULT 0")
add_column("chapters_download_skipped", "INTEGER DEFAULT 0")
# Audio counters
add_column("audio_skipped", "INTEGER DEFAULT 0")
# Optional future fields
add_column("audio_total", "INTEGER DEFAULT 0")
conn.commit()
# ------------------------------------------------------------
# WRITE OPERATIONS (book_idx-based UPSERT)
# ------------------------------------------------------------
def upsert_book(book_idx, **fields):
"""
UPSERT by book_idx.
Replaces old upsert that used book_id.
"""
conn = get_db()
keys = ["book_idx"] + list(fields.keys())
values = [book_idx] + list(fields.values())
placeholders = ",".join(["?"] * len(values))
updates = ", ".join([f"{k} = excluded.{k}" for k in fields.keys()])
sql = f"""
INSERT INTO books ({','.join(keys)})
VALUES ({placeholders})
ON CONFLICT(book_idx)
DO UPDATE SET {updates},
last_update = CURRENT_TIMESTAMP;
"""
conn.execute(sql, values)
conn.commit()
# ------------------------------------------------------------
# RAW READ OPERATIONS
# ------------------------------------------------------------
def _raw_get_book(book_idx):
conn = get_db()
row = conn.execute(
"SELECT * FROM books WHERE book_idx = ?;", (book_idx,)
).fetchone()
return dict(row) if row else None
def _raw_get_all_books():
conn = get_db()
cur = conn.execute("SELECT * FROM books ORDER BY created_at DESC;")
return [dict(row) for row in cur.fetchall()]