scaffolding new celery branch

celery-integration
peter.fong 2 weeks ago
parent 3ed85d08e3
commit fe4ed78802

@@ -2,6 +2,7 @@
from flask import Flask, request, Response, render_template
import time
import queue
import redis
import os
from scraper.book_scraper import BookScraper
@@ -10,6 +11,8 @@ from scraper.logger import add_listener, remove_listener, LOG_BUFFER
app = Flask(__name__)
r = redis.Redis.from_url(os.getenv("REDIS_URL", "redis://redis:6379/0"))
@app.route("/")
def index():
@@ -41,22 +44,12 @@ def run_scraper():
def stream():
    def event_stream():
        q = queue.Queue()
        # push log lines from BookScraper to SSE
        def listener(line):
            q.put(line)
        add_listener(listener)
        try:
            while True:
                msg = q.get()  # blocks until a log line arrives
                yield f"data: {msg}\n\n"
        except GeneratorExit:
            pass
        finally:
            remove_listener(listener)
        pub = r.pubsub()
        pub.subscribe("logs")
        try:
            for msg in pub.listen():
                if msg["type"] == "message":
                    yield f"data: {msg['data'].decode()}\n\n"
        finally:
            pub.close()
    return Response(event_stream(), mimetype="text/event-stream")
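Switching from the in-process listener to Redis pub/sub means any container (web or worker) can feed the same stream. As a quick smoke test, the endpoint can be tailed from the host with requests; a sketch, assuming the 5050:5000 port mapping from docker-compose.yml below:

    # sketch: tail the SSE endpoint; assumes the web service is running
    # and published on localhost:5050 as in docker-compose.yml below
    import requests

    with requests.get("http://localhost:5050/stream", stream=True) as resp:
        for raw in resp.iter_lines():           # iter_lines yields bytes
            if raw.startswith(b"data: "):
                print(raw[len(b"data: "):].decode())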

@@ -0,0 +1,21 @@
# celery_app.py
from celery import Celery
import os
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")

celery_app = Celery(
    "bookscraper",
    broker=REDIS_URL,
    backend=REDIS_URL,
    # import the task modules so their @shared_task definitions register on the workers
    include=["tasks.scraping", "tasks.audio", "tasks.pipeline"],
)

celery_app.conf.update(
    task_default_queue="default",
    task_routes={
        "tasks.scraping.*": {"queue": "scraping"},
        "tasks.audio.*": {"queue": "audio"},
    },
    worker_prefetch_multiplier=1,  # important for concurrency=1 workers
    task_acks_late=True,
)
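With both broker and result backend on Redis, the task_routes glob patterns route by task name, so tasks defined in tasks/scraping.py land on the "scraping" queue automatically. A sketch (not in this commit) of enqueuing a job and polling its state, with a stand-in URL:

    # assumes the compose stack below is up; the URL is a placeholder
    from tasks.scraping import scrape_book

    res = scrape_book.delay("https://example.com/book/1")  # routed to "scraping"
    print(res.id, res.status)    # backend=REDIS_URL makes task state queryable
    print(res.get(timeout=600))  # blocks until the worker returns {"title": ...}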

@@ -1,7 +1,11 @@
version: "3.9"
services:
  bookscraper:
  # -------------------------
  # WEB UI
  # -------------------------
  web:
    build:
      context: .
      dockerfile: Dockerfile
@@ -9,17 +13,75 @@ services:
    ports:
      - "5050:5000"
    # Mount everything the way it already works locally
    volumes:
      - .:/app  # full project directory
      - .:/app
      - /Users/peter/Desktop/books:/app/output
    # The existing .env is loaded automatically by Docker Compose
    env_file:
      - .env
    # Make sure Flask does NOT go into debug mode (your code controls this)
    environment:
      FLASK_ENV: "production"
    restart: unless-stopped
    depends_on:
      - redis
  # -------------------------
  # SCRAPING WORKER
  # (concurrency 1, one job at a time)
  # -------------------------
  scraper_worker:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: scraper_worker
    command: python worker/scrape_worker.py
    volumes:
      - .:/app
      - /Users/peter/Desktop/books:/app/output
    env_file:
      - .env
    restart: unless-stopped
    depends_on:
      - redis
  # -------------------------
  # AUDIO WORKER
  # -------------------------
  audio_worker:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: audio_worker
    command: python worker/audio_worker.py
    volumes:
      - .:/app
      - /Users/peter/Desktop/books:/app/output
    env_file:
      - .env
    restart: unless-stopped
    depends_on:
      - redis
  # -------------------------
  # REDIS (LOGS & QUEUE)
  # -------------------------
  redis:
    image: redis:alpine
    container_name: redis
    ports:
      - "6379:6379"
    restart: unless-stopped
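With this layout, "docker compose up --build" brings up all four services on one network, which is why redis://redis:6379/0 resolves by service name. One loose end worth flagging: the role-specific Dockerfiles added below (docker/Dockerfile.web, docker/Dockerfile.scraper, docker/Dockerfile.audio) are not referenced here yet; every service still builds from the root Dockerfile.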

@@ -0,0 +1,17 @@
# docker/Dockerfile.audio
FROM python:3.11-slim
WORKDIR /app

# Install audio processing dependencies (extend later)
RUN apt-get update && apt-get install -y --no-install-recommends \
    ffmpeg \
    libavcodec-extra \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "worker/audio_worker.py"]

@@ -0,0 +1,17 @@
# docker/Dockerfile.scraper
FROM python:3.11-slim
WORKDIR /app

# Scraper-specific system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    libjpeg62-turbo-dev \
    zlib1g-dev \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "worker/scrape_worker.py"]

@@ -0,0 +1,16 @@
# docker/Dockerfile.web
FROM python:3.11-slim
WORKDIR /app
# Install only Python deps
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire app
COPY . .
# Flask runs on port 5000
EXPOSE 5000
CMD ["python", "app.py"]

@@ -0,0 +1,12 @@
# logbus/publisher.py
import redis
import os

REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
r = redis.Redis.from_url(REDIS_URL)

def log(message):
    print("[LOG]", message)
    r.publish("logs", message)
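Anything that calls log() ends up on the "logs" channel the SSE route above subscribes to. Worth noting, and the reason for the .decode() in stream(): redis-py hands pub/sub payloads to subscribers as bytes. A tiny illustration, assuming a running Redis:

    from logbus.publisher import log, r

    pub = r.pubsub()
    pub.subscribe("logs")
    pub.get_message(timeout=1)        # first message is the subscribe confirmation
    log("chapter 3 downloaded")
    msg = pub.get_message(timeout=1)  # {'type': 'message', 'channel': b'logs',
                                      #  'data': b'chapter 3 downloaded'}
    print(msg["data"].decode())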

@@ -0,0 +1,5 @@
from tasks.audio import text_to_audio

def enqueue_audio(path):
    return text_to_audio.delay(path)

@@ -0,0 +1,5 @@
from tasks.scraping import scrape_book

def enqueue_book(url):
    return scrape_book.delay(url)
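These helpers keep Flask decoupled from Celery internals. A hypothetical route (the path and form field are assumptions, since the reworked run_scraper is not shown in this diff) would enqueue and return immediately:

    # sketch, inside app.py; route name and form field are hypothetical
    from flask import jsonify, request
    from queues.book_queue import enqueue_book

    @app.route("/run", methods=["POST"])
    def run_scraper_async():
        task = enqueue_book(request.form["url"])  # returns a Celery AsyncResult
        return jsonify({"task_id": task.id})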

@@ -0,0 +1,3 @@
# reserved for future job-tracking models
class BookJob:
    pass

@@ -0,0 +1,10 @@
# tasks/audio.py
from celery import shared_task
from logbus.publisher import log

@shared_task(bind=True, queue="audio")
def text_to_audio(self, text_file):
    log(f"[AUDIO] converting: {text_file}")
    # placeholder for macOS "say"
    return True
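A sketch of what the placeholder might grow into on a macOS host (the comment references "say"); inside the Linux containers defined above, ffmpeg plus some TTS engine would have to stand in instead:

    import subprocess
    from pathlib import Path

    def say_to_aiff(text_file):
        out = str(Path(text_file).with_suffix(".aiff"))
        # macOS built-in TTS: -f reads the text file, -o writes the audio
        subprocess.run(["say", "-f", text_file, "-o", out], check=True)
        return out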

@@ -0,0 +1,10 @@
# tasks/pipeline.py
from celery import shared_task
from tasks.scraping import scrape_book
from tasks.audio import text_to_audio  # not wired in yet; reserved for the audio step

@shared_task(bind=True)
def scrape_and_convert(self, url):
    # scaffold: only kicks off scraping for now
    result = scrape_book.delay(url)
    return result.id
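Once the audio step is real, Celery's chain would connect the two tasks so each stays on its own queue while the scrape result feeds the conversion. A sketch of that future shape (an assumption; the argument types would also need to line up, since scrape_book currently returns a dict while text_to_audio expects a file path):

    from celery import chain
    from tasks.scraping import scrape_book
    from tasks.audio import text_to_audio

    def scrape_then_convert(url):
        # .s() builds signatures; chain passes each result to the next task
        return chain(scrape_book.s(url), text_to_audio.s()).apply_async()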

@@ -0,0 +1,17 @@
# tasks/scraping.py
from celery import shared_task
from scraper.book_scraper import BookScraper
from scraper.sites import BookSite
from logbus.publisher import log

@shared_task(bind=True, queue="scraping")
def scrape_book(self, url):
    log(f"START scraping: {url}")
    site = BookSite()
    scraper = BookScraper(site, url)
    result = scraper.execute()
    log(f"FINISHED scraping: {url}")
    return {"title": result["title"]}

@@ -0,0 +1,10 @@
# worker/audio_worker.py
from celery_app import celery_app

if __name__ == "__main__":
    celery_app.worker_main([
        "worker",
        "-Q", "audio",
        "--concurrency=10",
        "--hostname=audio@%h",
    ])
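These entry points mirror the plain CLI invocation (celery -A celery_app worker -Q audio --concurrency=10 --hostname=audio@%h); wrapping them in Python just keeps the compose command: lines simple.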

@@ -0,0 +1,9 @@
# worker/general_worker.py
from celery_app import celery_app

if __name__ == "__main__":
    # consumes the "default" queue; not wired into docker-compose yet
    celery_app.worker_main([
        "worker",
        "--concurrency=4",
        "--hostname=general@%h",
    ])

@@ -0,0 +1,10 @@
# worker/scrape_worker.py
from celery_app import celery_app

if __name__ == "__main__":
    celery_app.worker_main([
        "worker",
        "-Q", "scraping",
        "--concurrency=1",
        "--hostname=scraper@%h",
    ])