This commit is contained in:
Nama Kamu
2026-04-26 22:08:41 +08:00
parent 5d9d9f784a
commit 9d969e61fd
6 changed files with 149 additions and 7 deletions

View File

@@ -10,7 +10,10 @@ flow on top:
* `POST /documents?sync=true` — runs the pipeline inline (the original * `POST /documents?sync=true` — runs the pipeline inline (the original
Phase 1 behaviour). Useful for tests and Phase 1 behaviour). Useful for tests and
small-volume single-tenant deploys without small-volume single-tenant deploys without
a Celery worker. a Celery worker. The heavy OCR work is
offloaded to a thread-pool executor so the
uvicorn event loop stays responsive during
processing (~30-120s on CPU).
* `GET /documents/{job_id}` — returns the current job state. Async * `GET /documents/{job_id}` — returns the current job state. Async
clients poll this until `status` is in a clients poll this until `status` is in a
terminal state (completed / needs_review / terminal state (completed / needs_review /
@@ -19,9 +22,19 @@ flow on top:
from __future__ import annotations from __future__ import annotations
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Annotated from typing import Annotated
from uuid import UUID, uuid4 from uuid import UUID, uuid4
# Thread pool dedicated to blocking OCR work. Using a *separate* pool
# (rather than the default loop executor) lets us cap the number of
# concurrent heavy OCR jobs independently of other thread-pool users.
# With 1 Celery worker + 1 sync slot we never exceed 2 parallel OCR
# runs; keep the pool at 1 so RAM stays bounded on the 7.4 GB server.
_OCR_EXECUTOR = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ocr-inline")
from fastapi import ( from fastapi import (
APIRouter, APIRouter,
Depends, Depends,
@@ -165,11 +178,13 @@ async def create_document(
async def _run_inline(job_id: UUID, content: bytes) -> DocumentResponse: async def _run_inline(job_id: UUID, content: bytes) -> DocumentResponse:
"""Synchronous pipeline execution. """Run the OCR pipeline without blocking the uvicorn event loop.
Each state transition opens its own short session so the request-scoped ``run_pipeline`` is CPU-bound and can take 30-120 s on a 2 vCPU server.
session's rollback-on-exception behaviour cannot wipe out the Awaiting it directly on the async handler would freeze the entire event
``mark_failed`` write or strand the blob on disk. loop (and therefore block health-checks, metrics, and every other request)
for the full duration. We push the work onto a dedicated single-thread
executor so the loop stays free while the OCR runs in the background.
""" """
import time import time
@@ -177,8 +192,13 @@ async def _run_inline(job_id: UUID, content: bytes) -> DocumentResponse:
JobRepository(s).mark_processing(job_id) JobRepository(s).mark_processing(job_id)
started = time.perf_counter() started = time.perf_counter()
loop = asyncio.get_event_loop()
try: try:
output = run_pipeline(content) # run_pipeline is synchronous; wrap it so asyncio can await it.
output = await loop.run_in_executor(
_OCR_EXECUTOR,
partial(run_pipeline, content),
)
except ValueError as exc: except ValueError as exc:
with session_scope() as s: with session_scope() as s:
JobRepository(s).mark_failed(job_id, error=str(exc)) JobRepository(s).mark_failed(job_id, error=str(exc))

View File

@@ -3,8 +3,11 @@
from __future__ import annotations from __future__ import annotations
from fastapi import APIRouter from fastapi import APIRouter
from fastapi.responses import JSONResponse
from ocr_sprint import __version__ from ocr_sprint import __version__
from ocr_sprint.pipeline import ocr as _ocr
from ocr_sprint.pipeline import table as _table
router = APIRouter(tags=["health"]) router = APIRouter(tags=["health"])
@@ -13,3 +16,20 @@ router = APIRouter(tags=["health"])
async def health() -> dict[str, str]: async def health() -> dict[str, str]:
"""Lightweight liveness check — does NOT touch the OCR engine.""" """Lightweight liveness check — does NOT touch the OCR engine."""
return {"status": "ok", "version": __version__} return {"status": "ok", "version": __version__}
@router.get("/health/ready")
async def readiness() -> JSONResponse:
"""Readiness check — returns 200 when OCR models are loaded, 503 if still warming up."""
ocr_ready = _ocr._instance is not None
table_ready = _table._instance is not None
ready = ocr_ready and table_ready
payload = {
"status": "ready" if ready else "warming_up",
"version": __version__,
"models": {
"paddleocr": "ready" if ocr_ready else "loading",
"pp_structure": "ready" if table_ready else "loading",
},
}
return JSONResponse(content=payload, status_code=200 if ready else 503)

View File

@@ -2,6 +2,10 @@
from __future__ import annotations from __future__ import annotations
import threading
from contextlib import asynccontextmanager
from typing import AsyncIterator
from fastapi import FastAPI from fastapi import FastAPI
from ocr_sprint import __version__ from ocr_sprint import __version__
@@ -11,7 +15,10 @@ from ocr_sprint.api.routes import documents, ground_truth, health
from ocr_sprint.config import get_settings from ocr_sprint.config import get_settings
from ocr_sprint.db import models as _models # noqa: F401 (register ORM tables) from ocr_sprint.db import models as _models # noqa: F401 (register ORM tables)
from ocr_sprint.db.base import Base, get_engine from ocr_sprint.db.base import Base, get_engine
from ocr_sprint.utils.logging import configure_logging from ocr_sprint.utils.logging import configure_logging, get_logger
_startup_logger = get_logger(__name__)
def _ensure_schema() -> None: def _ensure_schema() -> None:
@@ -24,6 +31,42 @@ def _ensure_schema() -> None:
Base.metadata.create_all(bind=get_engine()) Base.metadata.create_all(bind=get_engine())
def _warmup_models_background() -> None:
"""Load PaddleOCR and PP-Structure models in a background thread.
Running in a thread keeps the lifespan non-blocking so uvicorn can
start accepting health-check requests immediately while the heavy models
load (~5-15s on CPU). Requests that arrive before warmup completes will
wait on the existing _lock in each module rather than racing to load.
"""
from ocr_sprint.config import get_settings as _gs
from ocr_sprint.pipeline import ocr as _ocr
from ocr_sprint.pipeline import table as _table
s = _gs()
try:
_ocr.warmup()
except Exception as exc:
_startup_logger.warning("paddleocr.warmup.failed", error=str(exc))
if s.tables_enabled:
try:
_table.warmup()
except Exception as exc:
_startup_logger.warning("pp_structure.warmup.failed", error=str(exc))
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
"""FastAPI lifespan: warm OCR models on startup in a background thread."""
_startup_logger.info("startup.warmup.begin")
t = threading.Thread(target=_warmup_models_background, name="ocr-warmup", daemon=True)
t.start()
yield
# Shutdown: nothing to clean up (models are process-global singletons).
_startup_logger.info("shutdown.complete")
def create_app() -> FastAPI: def create_app() -> FastAPI:
"""Application factory — keeps top-level state easy to test.""" """Application factory — keeps top-level state easy to test."""
settings = get_settings() settings = get_settings()
@@ -34,6 +77,7 @@ def create_app() -> FastAPI:
root_path = getattr(settings, "root_path", "") root_path = getattr(settings, "root_path", "")
app = FastAPI( app = FastAPI(
lifespan=lifespan,
title="OCR Sprint Service", title="OCR Sprint Service",
version=__version__, version=__version__,
description="OCR + structured extraction for Indonesian police 'surat sprint' documents.", description="OCR + structured extraction for Indonesian police 'surat sprint' documents.",

View File

@@ -151,6 +151,19 @@ def get_ocr() -> PaddleOCR:
return _instance return _instance
def warmup() -> None:
"""Eagerly initialize the PaddleOCR engine.
Call this during application startup so the first real request does not
pay the model-loading cost (~2-5s on CPU). Also prevents the process from
entering Disk-Sleep state (state D) mid-request when memory is tight,
because the OS has already paged in all model weights during startup.
"""
_logger.info("paddleocr.warmup.start")
get_ocr()
_logger.info("paddleocr.warmup.done")
def run_ocr(image: NDArrayU8) -> OCRPage: def run_ocr(image: NDArrayU8) -> OCRPage:
"""Run OCR on a single BGR image and return a structured page result.""" """Run OCR on a single BGR image and return a structured page result."""
engine = get_ocr() engine = get_ocr()

View File

@@ -97,6 +97,18 @@ def get_pp_structure() -> PPStructure:
return _instance return _instance
def warmup() -> None:
"""Eagerly initialize the PP-Structure engine.
Call this during application startup so the first real request does not
pay the model-loading cost (~3-6s on CPU). Mirrors ocr.warmup() so the
lifespan handler can warm both engines in one place.
"""
_logger.info("pp_structure.warmup.start")
get_pp_structure()
_logger.info("pp_structure.warmup.done")
# ---------- table parsing ---------- # ---------- table parsing ----------

View File

@@ -15,8 +15,12 @@ from __future__ import annotations
import os import os
from celery import Celery from celery import Celery
from celery.signals import worker_ready
from ocr_sprint.config import get_settings from ocr_sprint.config import get_settings
from ocr_sprint.utils.logging import get_logger
_logger = get_logger(__name__)
def build_celery_app() -> Celery: def build_celery_app() -> Celery:
@@ -47,3 +51,32 @@ def build_celery_app() -> Celery:
celery_app = build_celery_app() celery_app = build_celery_app()
@worker_ready.connect
def preload_ocr_models(sender: object, **kwargs: object) -> None:
"""Warm up PaddleOCR and PP-Structure when the worker process is ready.
With ``--pool=solo`` the worker runs tasks in the *same* process that
receives this signal, so models loaded here are reused for every
subsequent task — no fork overhead, no duplicate model loading, and
RAM usage stays bounded (~1.5 GB instead of 1.5 GB × n_forks).
"""
from ocr_sprint.config import get_settings as _gs
from ocr_sprint.pipeline import ocr as _ocr
from ocr_sprint.pipeline import table as _table
_logger.info("celery.worker.warmup.start")
s = _gs()
try:
_ocr.warmup()
except Exception as exc:
_logger.warning("celery.worker.paddleocr.warmup.failed", error=str(exc))
if s.tables_enabled:
try:
_table.warmup()
except Exception as exc:
_logger.warning("celery.worker.pp_structure.warmup.failed", error=str(exc))
_logger.info("celery.worker.warmup.done")