From 9d969e61fd8cbffdb665d43df89b34e830c32050 Mon Sep 17 00:00:00 2001 From: Nama Kamu Date: Sun, 26 Apr 2026 22:08:41 +0800 Subject: [PATCH] update --- src/ocr_sprint/api/routes/documents.py | 32 ++++++++++++++---- src/ocr_sprint/api/routes/health.py | 20 +++++++++++ src/ocr_sprint/main.py | 46 +++++++++++++++++++++++++- src/ocr_sprint/pipeline/ocr.py | 13 ++++++++ src/ocr_sprint/pipeline/table.py | 12 +++++++ src/ocr_sprint/worker/celery_app.py | 33 ++++++++++++++++++ 6 files changed, 149 insertions(+), 7 deletions(-) diff --git a/src/ocr_sprint/api/routes/documents.py b/src/ocr_sprint/api/routes/documents.py index ea5e970..88d42ac 100644 --- a/src/ocr_sprint/api/routes/documents.py +++ b/src/ocr_sprint/api/routes/documents.py @@ -10,7 +10,10 @@ flow on top: * `POST /documents?sync=true` — runs the pipeline inline (the original Phase 1 behaviour). Useful for tests and small-volume single-tenant deploys without - a Celery worker. + a Celery worker. The heavy OCR work is + offloaded to a thread-pool executor so the + uvicorn event loop stays responsive during + processing (~30-120s on CPU). * `GET /documents/{job_id}` — returns the current job state. Async clients poll this until `status` is in a terminal state (completed / needs_review / @@ -19,9 +22,19 @@ flow on top: from __future__ import annotations +import asyncio +from concurrent.futures import ThreadPoolExecutor +from functools import partial from typing import Annotated from uuid import UUID, uuid4 +# Thread pool dedicated to blocking OCR work. Using a *separate* pool +# (rather than the default loop executor) lets us cap the number of +# concurrent heavy OCR jobs independently of other thread-pool users. +# With 1 Celery worker + 1 sync slot we never exceed 2 parallel OCR +# runs; keep the pool at 1 so RAM stays bounded on the 7.4 GB server. 
+_OCR_EXECUTOR = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ocr-inline") + from fastapi import ( APIRouter, Depends, @@ -165,11 +178,13 @@ async def create_document( async def _run_inline(job_id: UUID, content: bytes) -> DocumentResponse: - """Synchronous pipeline execution. + """Run the OCR pipeline without blocking the uvicorn event loop. - Each state transition opens its own short session so the request-scoped - session's rollback-on-exception behaviour cannot wipe out the - ``mark_failed`` write or strand the blob on disk. + ``run_pipeline`` is CPU-bound and can take 30-120 s on a 2 vCPU server. + Calling it directly in the async handler would freeze the entire event + loop (and therefore block health-checks, metrics, and every other request) + for the full duration. We push the work onto a dedicated single-thread + executor so the loop stays free while the OCR runs in the background. """ import time @@ -177,8 +192,13 @@ async def _run_inline(job_id: UUID, content: bytes) -> DocumentResponse: JobRepository(s).mark_processing(job_id) started = time.perf_counter() + loop = asyncio.get_running_loop() try: - output = run_pipeline(content) + # run_pipeline is synchronous; wrap it so asyncio can await it.
+ output = await loop.run_in_executor( + _OCR_EXECUTOR, + partial(run_pipeline, content), + ) except ValueError as exc: with session_scope() as s: JobRepository(s).mark_failed(job_id, error=str(exc)) diff --git a/src/ocr_sprint/api/routes/health.py b/src/ocr_sprint/api/routes/health.py index 7a01b81..d3786a7 100644 --- a/src/ocr_sprint/api/routes/health.py +++ b/src/ocr_sprint/api/routes/health.py @@ -3,8 +3,12 @@ from __future__ import annotations from fastapi import APIRouter +from fastapi.responses import JSONResponse from ocr_sprint import __version__ +from ocr_sprint.config import get_settings +from ocr_sprint.pipeline import ocr as _ocr +from ocr_sprint.pipeline import table as _table router = APIRouter(tags=["health"]) @@ -13,3 +17,22 @@ router = APIRouter(tags=["health"]) async def health() -> dict[str, str]: """Lightweight liveness check — does NOT touch the OCR engine.""" return {"status": "ok", "version": __version__} + + +@router.get("/health/ready") +async def readiness() -> JSONResponse: + """Readiness check — returns 200 when every enabled OCR model is loaded, 503 if still warming up.""" + ocr_ready = _ocr._instance is not None + # PP-Structure is only warmed when tables are enabled; a disabled engine must not pin readiness at 503. + tables_enabled = get_settings().tables_enabled + table_ready = not tables_enabled or _table._instance is not None + ready = ocr_ready and table_ready + payload = { + "status": "ready" if ready else "warming_up", + "version": __version__, + "models": { + "paddleocr": "ready" if ocr_ready else "loading", + "pp_structure": ("ready" if table_ready else "loading") if tables_enabled else "disabled", + }, + } + return JSONResponse(content=payload, status_code=200 if ready else 503) diff --git a/src/ocr_sprint/main.py b/src/ocr_sprint/main.py index 9005ae9..21acd41 100644 --- a/src/ocr_sprint/main.py +++ b/src/ocr_sprint/main.py @@ -2,6 +2,10 @@ from __future__ import annotations +import threading +from contextlib import asynccontextmanager +from typing import AsyncIterator + from fastapi import FastAPI from ocr_sprint import __version__ @@ -11,7 +15,10 @@ from ocr_sprint.api.routes import documents, ground_truth, health from ocr_sprint.config import get_settings from ocr_sprint.db import
models as _models # noqa: F401 (register ORM tables) from ocr_sprint.db.base import Base, get_engine -from ocr_sprint.utils.logging import configure_logging +from ocr_sprint.utils.logging import configure_logging, get_logger + + +_startup_logger = get_logger(__name__) def _ensure_schema() -> None: @@ -24,6 +31,42 @@ def _ensure_schema() -> None: Base.metadata.create_all(bind=get_engine()) +def _warmup_models_background() -> None: + """Load PaddleOCR and PP-Structure models in a background thread. + + Running in a thread keeps the lifespan non-blocking so uvicorn can + start accepting health-check requests immediately while the heavy models + load (~5-15s on CPU). Warmup failures are logged as warnings and swallowed; + PP-Structure is warmed only when ``tables_enabled`` is set. NOTE(review): confirm get_ocr()/get_pp_structure() serialise concurrent first loads. + """ + from ocr_sprint.config import get_settings as _gs + from ocr_sprint.pipeline import ocr as _ocr + from ocr_sprint.pipeline import table as _table + + s = _gs() + try: + _ocr.warmup() + except Exception as exc: + _startup_logger.warning("paddleocr.warmup.failed", error=str(exc)) + + if s.tables_enabled: + try: + _table.warmup() + except Exception as exc: + _startup_logger.warning("pp_structure.warmup.failed", error=str(exc)) + + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncIterator[None]: + """FastAPI lifespan: warm OCR models on startup in a background thread.""" + _startup_logger.info("startup.warmup.begin") + t = threading.Thread(target=_warmup_models_background, name="ocr-warmup", daemon=True) + t.start() + yield + # Shutdown: nothing to clean up (models are process-global singletons).
+ _startup_logger.info("shutdown.complete") + + def create_app() -> FastAPI: """Application factory — keeps top-level state easy to test.""" settings = get_settings() @@ -34,6 +77,7 @@ def create_app() -> FastAPI: root_path = getattr(settings, "root_path", "") app = FastAPI( + lifespan=lifespan, title="OCR Sprint Service", version=__version__, description="OCR + structured extraction for Indonesian police 'surat sprint' documents.", diff --git a/src/ocr_sprint/pipeline/ocr.py b/src/ocr_sprint/pipeline/ocr.py index a3d8775..c9893b3 100644 --- a/src/ocr_sprint/pipeline/ocr.py +++ b/src/ocr_sprint/pipeline/ocr.py @@ -151,6 +151,19 @@ def get_ocr() -> PaddleOCR: return _instance +def warmup() -> None: + """Eagerly initialize the PaddleOCR engine. + + Call this during application startup so the first real request does not + pay the model-loading cost (~2-5s on CPU). Loading the weights up front is + also meant to keep requests from stalling in disk sleep (state D) while + they are first paged in. NOTE(review): pages can still be evicted later under memory pressure. + """ + _logger.info("paddleocr.warmup.start") + get_ocr() + _logger.info("paddleocr.warmup.done") + + def run_ocr(image: NDArrayU8) -> OCRPage: """Run OCR on a single BGR image and return a structured page result.""" engine = get_ocr() diff --git a/src/ocr_sprint/pipeline/table.py b/src/ocr_sprint/pipeline/table.py index 135f0a2..a6bd7bd 100644 --- a/src/ocr_sprint/pipeline/table.py +++ b/src/ocr_sprint/pipeline/table.py @@ -97,6 +97,18 @@ def get_pp_structure() -> PPStructure: return _instance +def warmup() -> None: + """Eagerly initialize the PP-Structure engine. + + Call this during application startup so the first real request does not + pay the model-loading cost (~3-6s on CPU). Mirrors ocr.warmup() so the + lifespan handler can warm both engines in one place.
+ """ + _logger.info("pp_structure.warmup.start") + get_pp_structure() + _logger.info("pp_structure.warmup.done") + + # ---------- table parsing ---------- diff --git a/src/ocr_sprint/worker/celery_app.py b/src/ocr_sprint/worker/celery_app.py index 68d427e..1a05252 100644 --- a/src/ocr_sprint/worker/celery_app.py +++ b/src/ocr_sprint/worker/celery_app.py @@ -15,8 +15,12 @@ from __future__ import annotations import os from celery import Celery +from celery.signals import worker_ready from ocr_sprint.config import get_settings +from ocr_sprint.utils.logging import get_logger + +_logger = get_logger(__name__) def build_celery_app() -> Celery: @@ -47,3 +51,32 @@ def build_celery_app() -> Celery: celery_app = build_celery_app() + + +@worker_ready.connect +def preload_ocr_models(sender: object, **kwargs: object) -> None: + """Warm up PaddleOCR and PP-Structure when the worker process is ready. + + With ``--pool=solo`` the worker runs tasks in the *same* process that + receives this signal, so models loaded here are reused for every + subsequent task and RAM stays bounded (~1.5 GB, not 1.5 GB × n_forks). + NOTE(review): under a prefork pool this presumably warms only the parent process — confirm solo is used. + """ + from ocr_sprint.config import get_settings as _gs + from ocr_sprint.pipeline import ocr as _ocr + from ocr_sprint.pipeline import table as _table + + _logger.info("celery.worker.warmup.start") + s = _gs() + try: + _ocr.warmup() + except Exception as exc: + _logger.warning("celery.worker.paddleocr.warmup.failed", error=str(exc)) + + if s.tables_enabled: + try: + _table.warmup() + except Exception as exc: + _logger.warning("celery.worker.pp_structure.warmup.failed", error=str(exc)) + + _logger.info("celery.worker.warmup.done")