Phase 1 MVP: synchronous OCR + regex header extraction

Implements the foundation of the OCR Sprint service:
- FastAPI app with /api/v1/health and /api/v1/documents (sync upload)
- Pydantic v2 schemas for documents, extraction result, personnel
- Pipeline: PDF/image ingest (PyMuPDF), preprocessing (resize, deskew,
  denoise, optional adaptive threshold), PaddleOCR wrapper, regex-based
  header extraction (nomor sprint, tanggal, satuan, perihal, dasar),
  signatory NRP, master-pangkat validation, confidence scoring + routing.
- Tests: 61 unit tests covering regex rules, validators, preprocess,
  ingest, confidence, and API contract (PaddleOCR mocked).
- Tooling: pyproject (setuptools), ruff, mypy strict, pytest, pre-commit,
  Dockerfile, docker-compose, Makefile.
- Docs: README + docs/architecture.md (full hybrid stack rationale and
  6-phase roadmap).

Co-authored-by: adrian kuman firmansah <adriancuman@gmail.com>
This commit is contained in:
Devin AI
2026-04-25 14:58:50 +00:00
commit ca0c0a0428
45 changed files with 2457 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
"""OCR Sprint Service — extract structured data from Indonesian police 'surat sprint'."""
__version__ = "0.1.0"

View File

View File

@@ -0,0 +1,43 @@
"""HTTP error handlers."""
from __future__ import annotations
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from ocr_sprint.utils.logging import get_logger
_logger = get_logger(__name__)
class OCRServiceError(Exception):
"""Base class for application errors that should map to a 4xx response."""
http_status: int = status.HTTP_400_BAD_REQUEST
class UnsupportedDocumentError(OCRServiceError):
"""Uploaded file is neither a PDF nor a recognized image format."""
class JobNotFoundError(OCRServiceError):
http_status = status.HTTP_404_NOT_FOUND
def register_error_handlers(app: FastAPI) -> None:
"""Wire OCRServiceError + a final fallback for unexpected exceptions."""
@app.exception_handler(OCRServiceError)
async def _ocr_error_handler(_: Request, exc: OCRServiceError) -> JSONResponse:
return JSONResponse(
status_code=exc.http_status,
content={"error": exc.__class__.__name__, "message": str(exc)},
)
@app.exception_handler(Exception)
async def _unexpected_handler(_: Request, exc: Exception) -> JSONResponse:
_logger.exception("api.unhandled_exception", error=str(exc))
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"error": "InternalServerError", "message": "Unexpected error"},
)

View File

View File

@@ -0,0 +1,58 @@
"""Documents API — Phase 1 synchronous endpoint.
POST /documents accepts a single PDF or image upload, runs the synchronous
pipeline inline, and returns the structured result. This is suitable for
development and low-traffic production; Phase 4 will introduce an async
queue and a polling-style API at the same path.
"""
from __future__ import annotations
from uuid import uuid4
from fastapi import APIRouter, File, UploadFile, status
from ocr_sprint.api.errors import UnsupportedDocumentError
from ocr_sprint.pipeline.orchestrator import run_pipeline
from ocr_sprint.schemas.document import DocumentResponse
from ocr_sprint.utils.logging import get_logger
router = APIRouter(prefix="/documents", tags=["documents"])
_logger = get_logger(__name__)
_MAX_UPLOAD_BYTES = 25 * 1024 * 1024 # 25 MB
@router.post("", status_code=status.HTTP_200_OK, response_model=DocumentResponse)
async def create_document(file: UploadFile = File(...)) -> DocumentResponse:
"""Run OCR + extraction synchronously on a single upload."""
job_id = uuid4()
log = _logger.bind(job_id=str(job_id), filename=file.filename or "")
content = await file.read()
if not content:
raise UnsupportedDocumentError("Uploaded file is empty.")
if len(content) > _MAX_UPLOAD_BYTES:
raise UnsupportedDocumentError(
f"Uploaded file exceeds {_MAX_UPLOAD_BYTES // (1024 * 1024)} MB limit."
)
log.info("documents.received", size=len(content))
try:
output = run_pipeline(content)
except ValueError as exc:
raise UnsupportedDocumentError(str(exc)) from exc
log.info(
"documents.completed",
status=output.status.value,
confidence=round(output.confidence, 3),
flags=[f.value for f in output.result.review_flags],
)
return DocumentResponse(
job_id=job_id,
status=output.status,
confidence=output.confidence,
data=output.result,
review_flags=[f.value for f in output.result.review_flags],
)

View File

@@ -0,0 +1,15 @@
"""Liveness / readiness endpoints."""
from __future__ import annotations
from fastapi import APIRouter
from ocr_sprint import __version__
router = APIRouter(tags=["health"])
@router.get("/health")
async def health() -> dict[str, str]:
"""Lightweight liveness check — does NOT touch the OCR engine."""
return {"status": "ok", "version": __version__}

72
src/ocr_sprint/config.py Normal file
View File

@@ -0,0 +1,72 @@
"""Application settings loaded from environment / .env file."""
from __future__ import annotations
from functools import lru_cache
from pathlib import Path
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Runtime configuration. Override via environment variables or a .env file."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore",
)
# App
app_env: str = "local"
app_host: str = "0.0.0.0"
app_port: int = 8000
app_log_level: str = "INFO"
# Storage (Phase 1: local fs)
storage_local_dir: Path = Path("./storage")
# OCR
ocr_lang: str = "latin"
ocr_use_gpu: bool = False
ocr_det_model_dir: str | None = None
ocr_rec_model_dir: str | None = None
ocr_cls_model_dir: str | None = None
ocr_max_image_side: int = 2200
# Preprocessing
preprocess_target_dpi: int = 300
preprocess_denoise: bool = True
preprocess_deskew: bool = True
preprocess_adaptive_threshold: bool = False
# Confidence thresholds (Phase 5 routing)
confidence_auto_approve: float = Field(0.95, ge=0.0, le=1.0)
confidence_needs_review: float = Field(0.85, ge=0.0, le=1.0)
# LLM (Phase 5)
llm_enabled: bool = False
llm_provider: str = "ollama"
llm_model: str = "qwen2.5:1.5b"
llm_base_url: str = "http://localhost:11434"
llm_timeout_s: int = 60
# Async pipeline (Phase 4)
queue_enabled: bool = False
redis_url: str = "redis://localhost:6379/0"
database_url: str = "postgresql+psycopg://ocr:ocr@localhost:5432/ocr_sprint"
minio_endpoint: str = "localhost:9000"
minio_access_key: str = "minioadmin"
minio_secret_key: str = "minioadmin"
minio_bucket: str = "ocr-sprint"
minio_secure: bool = False
@lru_cache(maxsize=1)
def get_settings() -> Settings:
"""Cached accessor so settings are loaded once per process."""
settings = Settings()
settings.storage_local_dir.mkdir(parents=True, exist_ok=True)
return settings

View File

View File

@@ -0,0 +1,66 @@
"""Master data for Polri ranks ('pangkat').
Used by the validation layer to:
1. Confirm that a recognized rank string is a real Polri rank.
2. Normalize abbreviated forms ("AKP""AKP", "Brigadir Polisi""Brigadir") to a canonical form.
Source: Peraturan Kapolri tentang Pangkat (publicly available, 2024).
Update this file when ranks are reorganized.
"""
from __future__ import annotations
# Canonical abbreviation → list of accepted variants (case-insensitive).
PANGKAT_VARIANTS: dict[str, tuple[str, ...]] = {
# Tamtama
"BHARADA": ("BHARADA", "BHRD"),
"BHARATU": ("BHARATU", "BHRT"),
"BHARAKA": ("BHARAKA", "BHRK"),
"ABRIP": ("ABRIP",),
"ABRIPTU": ("ABRIPTU",),
"ABRIPKA": ("ABRIPKA",),
# Bintara
"BRIPDA": ("BRIPDA",),
"BRIPTU": ("BRIPTU",),
"BRIGADIR": ("BRIGADIR", "BRIG", "BRIG POL"),
"BRIPKA": ("BRIPKA",),
"AIPDA": ("AIPDA",),
"AIPTU": ("AIPTU",),
# Perwira Pertama
"IPDA": ("IPDA",),
"IPTU": ("IPTU",),
"AKP": ("AKP",),
# Perwira Menengah
"KOMPOL": ("KOMPOL",),
"AKBP": ("AKBP",),
"KOMBES POL": ("KOMBES POL", "KOMBESPOL", "KBP"),
# Perwira Tinggi
"BRIGJEN POL": ("BRIGJEN POL", "BRIGJENPOL", "BRIGJEN"),
"IRJEN POL": ("IRJEN POL", "IRJENPOL", "IRJEN"),
"KOMJEN POL": ("KOMJEN POL", "KOMJENPOL", "KOMJEN"),
"JENDERAL POL": ("JENDERAL POL", "JENDERALPOL", "JENDERAL"),
}
# Reverse lookup: any variant (uppercased) → canonical form.
_VARIANT_TO_CANONICAL: dict[str, str] = {
variant.upper(): canonical
for canonical, variants in PANGKAT_VARIANTS.items()
for variant in variants
}
def normalize_pangkat(raw: str | None) -> str | None:
"""Return canonical Polri rank, or None if input is empty/unknown."""
if not raw:
return None
cleaned = " ".join(raw.strip().upper().split())
if cleaned in _VARIANT_TO_CANONICAL:
return _VARIANT_TO_CANONICAL[cleaned]
# tolerate trailing punctuation like "AKP."
stripped = cleaned.rstrip(".,;:")
return _VARIANT_TO_CANONICAL.get(stripped)
def is_valid_pangkat(raw: str | None) -> bool:
"""True if the string maps to a known Polri rank after normalization."""
return normalize_pangkat(raw) is not None

42
src/ocr_sprint/main.py Normal file
View File

@@ -0,0 +1,42 @@
"""FastAPI entrypoint."""
from __future__ import annotations
from fastapi import FastAPI
from ocr_sprint import __version__
from ocr_sprint.api.errors import register_error_handlers
from ocr_sprint.api.routes import documents, health
from ocr_sprint.config import get_settings
from ocr_sprint.utils.logging import configure_logging
def create_app() -> FastAPI:
"""Application factory — keeps top-level state easy to test."""
settings = get_settings()
configure_logging(settings.app_log_level)
app = FastAPI(
title="OCR Sprint Service",
version=__version__,
description="OCR + structured extraction for Indonesian police 'surat sprint' documents.",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
)
register_error_handlers(app)
app.include_router(health.router, prefix="/api/v1")
app.include_router(documents.router, prefix="/api/v1")
return app
app = create_app()
def run() -> None:
"""Console-script entrypoint (`ocr-sprint-api`)."""
import uvicorn
s = get_settings()
uvicorn.run("ocr_sprint.main:app", host=s.app_host, port=s.app_port, reload=False)

View File

@@ -0,0 +1 @@
"""OCR pipeline: ingest → preprocess → OCR → extract → validate."""

View File

@@ -0,0 +1,51 @@
"""Confidence scoring + routing decision.
The score is a weighted blend of:
- mean OCR confidence across all detected lines
- validation pass rate (1.0 if no review flags, decreases per flag)
This is intentionally simple for Phase 1; Phase 5 will add LLM logprob
contributions and per-field confidences.
"""
from __future__ import annotations
from ocr_sprint.config import get_settings
from ocr_sprint.schemas.document import DocumentStatus
from ocr_sprint.schemas.extraction import ReviewFlag
# Per-flag penalty applied to the validation component of the score.
_FLAG_PENALTY: dict[ReviewFlag, float] = {
ReviewFlag.LOW_OCR_CONFIDENCE: 0.10,
ReviewFlag.MISSING_FIELD: 0.20,
ReviewFlag.INVALID_NRP: 0.10,
ReviewFlag.UNKNOWN_PANGKAT: 0.05,
ReviewFlag.PERSONNEL_COUNT_MISMATCH: 0.15,
ReviewFlag.DATE_PARSE_FAILED: 0.10,
}
OCR_WEIGHT = 0.6
VALIDATION_WEIGHT = 0.4
def compute_confidence(
ocr_confidence: float,
flags: list[ReviewFlag],
) -> float:
"""Blend OCR confidence with validation penalties into a single 0-1 score."""
validation_score = 1.0
for flag in flags:
validation_score -= _FLAG_PENALTY.get(flag, 0.05)
validation_score = max(0.0, validation_score)
blended = OCR_WEIGHT * ocr_confidence + VALIDATION_WEIGHT * validation_score
return max(0.0, min(1.0, blended))
def route(confidence: float) -> DocumentStatus:
"""Map a final confidence score onto the job's terminal status."""
s = get_settings()
if confidence >= s.confidence_auto_approve:
return DocumentStatus.COMPLETED
if confidence >= s.confidence_needs_review:
return DocumentStatus.NEEDS_REVIEW
return DocumentStatus.NEEDS_REVIEW # below review threshold also goes to humans

View File

@@ -0,0 +1 @@
"""Information extraction layer (regex Phase 1, LLM Phase 5)."""

View File

@@ -0,0 +1,169 @@
"""Regex-based extraction for the deterministic header fields of a surat sprint.
Targets header fields whose layout is highly standardized across Polri units:
- Nomor sprint, e.g. "Sprin / 123 / IV / 2025 / Reskrim"
- Tanggal (date the sprint was issued)
- Satuan penerbit (issuing unit)
- Perihal
- Dasar (numbered list of legal/operational basis)
Personnel table extraction is intentionally NOT done here — that needs
PP-Structure + cell-aware logic and lives in `pipeline/table.py` (Phase 3).
"""
from __future__ import annotations
import re
from datetime import date
from ocr_sprint.schemas.extraction import HeaderFields, Signatory
# ---------- regex patterns ----------
# Nomor sprint, tolerant of spacing and OCR noise.
# Examples it should match:
# "Sprin / 123 / IV / 2025 / Reskrim"
# "SPRIN/345/X/2024"
# "Nomor : Sprin/12/I/2025/Sat Intelkam"
_RE_NOMOR_SPRINT = re.compile(
r"\bSPRIN[\s./-]*\d+[\s./-]*[IVXLCDM]+[\s./-]*\d{2,4}(?:[\s./-]*[\w .-]+?)?",
re.IGNORECASE,
)
# Indonesian month names.
_BULAN_MAP: dict[str, int] = {
"JANUARI": 1,
"FEBRUARI": 2,
"MARET": 3,
"APRIL": 4,
"MEI": 5,
"JUNI": 6,
"JULI": 7,
"AGUSTUS": 8,
"SEPTEMBER": 9,
"OKTOBER": 10,
"NOVEMBER": 11,
"DESEMBER": 12,
}
# Date in Indonesian, e.g. "21 April 2025" or "21 - April - 2025"
_RE_TANGGAL_ID = re.compile(
r"\b(\d{1,2})\s*[-./\s]\s*(" + "|".join(_BULAN_MAP.keys()) + r")\s*[-./\s]\s*(\d{4})\b",
re.IGNORECASE,
)
# Satuan penerbit usually appears in the document letterhead, prefixed by
# KEPOLISIAN <NEGARA|DAERAH|RESORT|SEKTOR>.
_RE_SATUAN = re.compile(
r"KEPOLISIAN\s+(?:NEGARA\s+REPUBLIK\s+INDONESIA|DAERAH|RESOR(?:T)?|SEKTOR|RESORT)"
r"[^\n]{0,80}",
re.IGNORECASE,
)
# "Perihal : ...." up to end of line.
_RE_PERIHAL = re.compile(r"PERIHAL\s*[:\-]\s*(.+)", re.IGNORECASE)
# A dasar entry typically begins with a number and dot, e.g. "1. UU No. 2 Tahun 2002 ..."
_RE_DASAR_ITEM = re.compile(r"^\s*(\d+)\s*[.)]\s*(.+)$")
# Signatory NRP — Polri NRPs are 8 digits, civil servant NIPs are 18 digits.
_RE_NRP = re.compile(r"\b(NRP|NIP)\s*[.:]?\s*(\d{8,20})\b", re.IGNORECASE)
def find_nomor_sprint(text: str) -> str | None:
"""Return the first nomor sprint found, normalized (no extra spaces)."""
match = _RE_NOMOR_SPRINT.search(text)
if not match:
return None
return " ".join(match.group(0).split())
def find_tanggal(text: str) -> date | None:
"""Find the issuance date.
Surat sprint typically contains multiple dates: one or more in the 'Dasar'
section (citing prior documents) and one near the signatory at the bottom
(the actual issuance date, usually formatted as 'Tempat, DD Month YYYY').
We prefer the **last** date in the document since the issuance date appears
after the dasar items in the standard layout.
"""
matches = list(_RE_TANGGAL_ID.finditer(text))
if not matches:
return None
last = matches[-1]
day_s, bulan, year_s = last.group(1), last.group(2).upper(), last.group(3)
try:
return date(int(year_s), _BULAN_MAP[bulan], int(day_s))
except (KeyError, ValueError):
return None
def find_satuan(text: str) -> str | None:
"""Return the first letterhead match (issuing unit), normalized."""
match = _RE_SATUAN.search(text)
if not match:
return None
return " ".join(match.group(0).split())
def find_perihal(text: str) -> str | None:
"""Return the first 'Perihal: ...' line, trimmed to that line only."""
for line in text.splitlines():
m = _RE_PERIHAL.search(line)
if m:
return m.group(1).strip()
return None
def find_dasar_list(text: str) -> list[str]:
"""Extract numbered 'Dasar' items from the text.
Heuristic: locate a line containing 'DASAR' (Indonesian: "DASAR :") and
collect subsequent lines that start with a number. Stops at a blank line
or a line beginning with another section header keyword.
"""
lines = text.splitlines()
items: list[str] = []
in_dasar = False
section_terminators = ("DIPERINTAHKAN", "UNTUK", "DASAR HUKUM", "PERIHAL")
for raw_line in lines:
line = raw_line.strip()
if not in_dasar:
if re.match(r"^\s*DASAR\b", line, re.IGNORECASE):
in_dasar = True
continue
if not line:
if items:
break
continue
upper = line.upper()
if any(upper.startswith(term) for term in section_terminators):
break
m = _RE_DASAR_ITEM.match(line)
if m:
items.append(m.group(2).strip())
elif items:
# continuation of the previous dasar item
items[-1] = (items[-1] + " " + line).strip()
return items
def find_signatory(text: str) -> Signatory:
"""Best-effort extraction of the signatory block (last NRP in the document)."""
matches = list(_RE_NRP.finditer(text))
if not matches:
return Signatory()
last = matches[-1]
return Signatory(nrp=last.group(2))
def extract_header(text: str) -> HeaderFields:
"""Run all header-level regex extractors and return a populated schema."""
return HeaderFields(
nomor_sprint=find_nomor_sprint(text),
tanggal=find_tanggal(text),
satuan_penerbit=find_satuan(text),
perihal=find_perihal(text),
dasar=find_dasar_list(text),
)

View File

@@ -0,0 +1,64 @@
"""Cross-field validation, with structured review-flag output."""
from __future__ import annotations
import re
from ocr_sprint.data.master_pangkat import is_valid_pangkat
from ocr_sprint.schemas.extraction import (
ExtractionResult,
HeaderFields,
ReviewFlag,
)
from ocr_sprint.schemas.personnel import PersonnelEntry
# Polri NRP = 8 digits.
_RE_NRP_8 = re.compile(r"^\d{8}$")
def validate_nrp(nrp: str | None) -> bool:
"""Return True when the value is a well-formed Polri NRP (8 digits)."""
if nrp is None:
return False
return bool(_RE_NRP_8.match(nrp.strip()))
def validate_personnel_entry(entry: PersonnelEntry) -> list[ReviewFlag]:
"""Inspect a single personnel row and return any review flags it triggers."""
flags: list[ReviewFlag] = []
if entry.nrp and not validate_nrp(entry.nrp):
flags.append(ReviewFlag.INVALID_NRP)
if entry.pangkat and not is_valid_pangkat(entry.pangkat):
flags.append(ReviewFlag.UNKNOWN_PANGKAT)
return flags
def validate_header(header: HeaderFields) -> list[ReviewFlag]:
"""Flag missing required fields or unparseable dates in the header."""
flags: list[ReviewFlag] = []
if header.nomor_sprint is None:
flags.append(ReviewFlag.MISSING_FIELD)
if header.tanggal is None:
flags.append(ReviewFlag.DATE_PARSE_FAILED)
return flags
def validate_extraction(
result: ExtractionResult,
expected_personnel_count: int | None = None,
) -> list[ReviewFlag]:
"""Run all validators across the full extraction and dedupe the flags."""
flags: list[ReviewFlag] = []
flags.extend(validate_header(result.header))
for entry in result.personel:
flags.extend(validate_personnel_entry(entry))
if expected_personnel_count is not None and expected_personnel_count != len(result.personel):
flags.append(ReviewFlag.PERSONNEL_COUNT_MISMATCH)
# dedupe while preserving order
seen: set[ReviewFlag] = set()
deduped: list[ReviewFlag] = []
for flag in flags:
if flag not in seen:
seen.add(flag)
deduped.append(flag)
return deduped

View File

@@ -0,0 +1,81 @@
"""Ingest layer: convert uploaded bytes (PDF/IMG) into a list of numpy images."""
from __future__ import annotations
import io
from dataclasses import dataclass
from typing import Any
import fitz # PyMuPDF
import numpy as np
from PIL import Image
from ocr_sprint.schemas.document import SourceKind
# Generic alias used across the pipeline. We don't constrain dtype/shape because
# OpenCV operations accept multiple dtypes and numpy generics are still rough.
NDArrayU8 = np.ndarray[Any, Any]
PDF_MAGIC = b"%PDF-"
PNG_MAGIC = b"\x89PNG\r\n\x1a\n"
JPEG_MAGIC = b"\xff\xd8\xff"
TIFF_MAGIC_LE = b"II*\x00"
TIFF_MAGIC_BE = b"MM\x00*"
@dataclass(frozen=True)
class IngestedPage:
"""One page worth of image data ready for preprocessing."""
image: NDArrayU8 # HxWx3 BGR uint8 (OpenCV convention)
page_index: int
def detect_source_kind(content: bytes) -> SourceKind:
"""Best-effort sniff of an uploaded payload."""
if content.startswith(PDF_MAGIC):
return SourceKind.PDF
if content.startswith((PNG_MAGIC, JPEG_MAGIC, TIFF_MAGIC_LE, TIFF_MAGIC_BE)):
return SourceKind.IMAGE
return SourceKind.UNKNOWN
def _pil_to_bgr(img: Image.Image) -> NDArrayU8:
"""Convert PIL image to OpenCV BGR numpy array."""
if img.mode != "RGB":
img = img.convert("RGB")
arr = np.asarray(img, dtype=np.uint8)
# RGB to BGR
return arr[:, :, ::-1].copy()
def ingest_pdf(content: bytes, target_dpi: int = 300) -> list[IngestedPage]:
"""Render every page of a PDF to a numpy image at the target DPI.
Uses PyMuPDF (no poppler dependency). DPI is enforced via a transform matrix:
fitz's default is 72 DPI, so the zoom factor is target_dpi / 72.
"""
pages: list[IngestedPage] = []
zoom = target_dpi / 72.0
matrix = fitz.Matrix(zoom, zoom)
with fitz.open(stream=content, filetype="pdf") as doc:
for idx, page in enumerate(doc):
pix = page.get_pixmap(matrix=matrix, alpha=False)
img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
pages.append(IngestedPage(image=_pil_to_bgr(img), page_index=idx))
return pages
def ingest_image(content: bytes) -> list[IngestedPage]:
"""Decode a single image into a one-element page list."""
img = Image.open(io.BytesIO(content))
return [IngestedPage(image=_pil_to_bgr(img), page_index=0)]
def ingest(content: bytes, kind: SourceKind, target_dpi: int = 300) -> list[IngestedPage]:
"""Dispatch to the right loader based on declared source kind."""
if kind == SourceKind.PDF:
return ingest_pdf(content, target_dpi=target_dpi)
if kind == SourceKind.IMAGE:
return ingest_image(content)
raise ValueError(f"Unsupported source kind: {kind}")

View File

@@ -0,0 +1,106 @@
"""PaddleOCR wrapper.
PaddleOCR has a heavy initialization cost (~2-5s on CPU as model files load),
so we keep a process-global instance behind a lazy accessor.
The wrapper exposes a small, stable surface so the rest of the pipeline does
not depend directly on paddleocr's evolving API.
"""
from __future__ import annotations
from dataclasses import dataclass
from threading import Lock
from typing import TYPE_CHECKING
import numpy as np
from ocr_sprint.config import get_settings
from ocr_sprint.pipeline.ingest import NDArrayU8
from ocr_sprint.utils.logging import get_logger
if TYPE_CHECKING:
from paddleocr import PaddleOCR
_logger = get_logger(__name__)
_lock = Lock()
_instance: PaddleOCR | None = None
@dataclass(frozen=True)
class OCRLine:
"""One recognized line with its bounding polygon and confidence."""
text: str
confidence: float
box: tuple[tuple[float, float], ...] # 4 (x, y) corner points
@dataclass(frozen=True)
class OCRPage:
"""OCR output for a single page."""
lines: list[OCRLine]
@property
def text(self) -> str:
"""Reconstruct page text by concatenating lines (order = paddle's output order)."""
return "\n".join(line.text for line in self.lines)
@property
def mean_confidence(self) -> float:
if not self.lines:
return 0.0
return float(np.mean([line.confidence for line in self.lines]))
def _build_paddleocr() -> PaddleOCR:
from paddleocr import PaddleOCR
s = get_settings()
kwargs: dict[str, object] = {
"lang": s.ocr_lang,
"use_angle_cls": True,
"use_gpu": s.ocr_use_gpu,
"show_log": False,
}
if s.ocr_det_model_dir:
kwargs["det_model_dir"] = s.ocr_det_model_dir
if s.ocr_rec_model_dir:
kwargs["rec_model_dir"] = s.ocr_rec_model_dir
if s.ocr_cls_model_dir:
kwargs["cls_model_dir"] = s.ocr_cls_model_dir
_logger.info("paddleocr.init", lang=s.ocr_lang, use_gpu=s.ocr_use_gpu)
return PaddleOCR(**kwargs)
def get_ocr() -> PaddleOCR:
"""Lazy, thread-safe singleton accessor for the PaddleOCR engine."""
global _instance
if _instance is None:
with _lock:
if _instance is None:
_instance = _build_paddleocr()
return _instance
def run_ocr(image: NDArrayU8) -> OCRPage:
"""Run OCR on a single BGR image and return a structured page result."""
engine = get_ocr()
raw = engine.ocr(image, cls=True)
# PaddleOCR returns [[ [box, (text, conf)], ... ]] — one outer list per image.
if not raw or raw[0] is None:
return OCRPage(lines=[])
page_raw = raw[0]
lines: list[OCRLine] = []
for item in page_raw:
if not item or len(item) < 2:
continue
box_raw, text_conf = item[0], item[1]
text, conf = text_conf[0], float(text_conf[1])
try:
box = tuple((float(p[0]), float(p[1])) for p in box_raw)
except (TypeError, ValueError, IndexError):
continue
lines.append(OCRLine(text=text, confidence=conf, box=box))
return OCRPage(lines=lines)

View File

@@ -0,0 +1,103 @@
"""Synchronous pipeline orchestrator (Phase 1).
Wires the individual stages together:
bytes → ingest → preprocess → OCR → regex extract → validate → score
Phase 4 will replace this with a Celery task graph; Phase 3/5 will plug
in PP-Structure for tables and an LLM extractor for variant fields.
"""
from __future__ import annotations
from dataclasses import dataclass
from ocr_sprint.config import get_settings
from ocr_sprint.pipeline.confidence import compute_confidence, route
from ocr_sprint.pipeline.extract.regex_rules import extract_header, find_signatory
from ocr_sprint.pipeline.extract.validators import validate_extraction
from ocr_sprint.pipeline.ingest import detect_source_kind, ingest
from ocr_sprint.pipeline.ocr import OCRPage, run_ocr
from ocr_sprint.pipeline.preprocess import PreprocessConfig, preprocess
from ocr_sprint.schemas.document import DocumentStatus, SourceKind
from ocr_sprint.schemas.extraction import ExtractionResult, ReviewFlag
from ocr_sprint.utils.logging import get_logger
_logger = get_logger(__name__)
# Below this OCR confidence we automatically flag for review.
_OCR_CONFIDENCE_FLAG_THRESHOLD = 0.80
@dataclass
class PipelineOutput:
"""Bundle returned by the orchestrator."""
source_kind: SourceKind
status: DocumentStatus
confidence: float
result: ExtractionResult
def run_pipeline(content: bytes) -> PipelineOutput:
"""Execute the synchronous OCR + extraction pipeline on raw upload bytes."""
s = get_settings()
kind = detect_source_kind(content)
if kind == SourceKind.UNKNOWN:
raise ValueError("Unsupported file type — only PDF and common image formats are accepted.")
pages = ingest(content, kind, target_dpi=s.preprocess_target_dpi)
_logger.info("pipeline.ingested", source_kind=kind.value, pages=len(pages))
pre_cfg = PreprocessConfig(
max_side=s.ocr_max_image_side,
denoise=s.preprocess_denoise,
deskew=s.preprocess_deskew,
adaptive_threshold=s.preprocess_adaptive_threshold,
)
ocr_pages: list[OCRPage] = []
for page in pages:
cleaned = preprocess(page.image, pre_cfg)
ocr_pages.append(run_ocr(cleaned))
full_text = "\n".join(p.text for p in ocr_pages)
mean_ocr_conf = sum(p.mean_confidence for p in ocr_pages) / len(ocr_pages) if ocr_pages else 0.0
header = extract_header(full_text)
ttd = find_signatory(full_text)
initial_flags: list[ReviewFlag] = []
if mean_ocr_conf < _OCR_CONFIDENCE_FLAG_THRESHOLD:
initial_flags.append(ReviewFlag.LOW_OCR_CONFIDENCE)
result = ExtractionResult(
header=header,
personel=[], # Phase 3 will populate from PP-Structure
untuk=[],
ttd=ttd,
raw_text=full_text,
confidence=mean_ocr_conf,
review_flags=list(initial_flags),
)
flags = validate_extraction(result)
# merge initial OCR-confidence flag with validation flags, preserving uniqueness
seen = set(flags)
for f in initial_flags:
if f not in seen:
flags.append(f)
seen.add(f)
result.review_flags = flags
final_conf = compute_confidence(mean_ocr_conf, flags)
result.confidence = final_conf
status = route(final_conf)
return PipelineOutput(
source_kind=kind,
status=status,
confidence=final_conf,
result=result,
)

View File

@@ -0,0 +1,108 @@
"""Image preprocessing for OCR.
Phase 1 implements the "always-on" steps that work for both clean PDF scans
and reasonable phone photos:
- resize to a reasonable max side (PaddleOCR runs faster on smaller inputs)
- convert to grayscale for analysis (kept as 3-channel BGR for paddle)
- denoise (Non-Local Means, gentle)
- deskew via Hough line angle estimate
- optional adaptive threshold for low-quality phone photos
Phase 2 will add document-corner detection + perspective transform + dewarping
for tilted phone shots; those live in `document_detect.py` (added later).
"""
from __future__ import annotations
from dataclasses import dataclass
import cv2
import numpy as np
from ocr_sprint.pipeline.ingest import NDArrayU8
@dataclass(frozen=True)
class PreprocessConfig:
"""Tunable knobs for the preprocessing pipeline."""
max_side: int = 2200
denoise: bool = True
deskew: bool = True
adaptive_threshold: bool = False
def _resize_max_side(img: NDArrayU8, max_side: int) -> NDArrayU8:
h, w = img.shape[:2]
longest = max(h, w)
if longest <= max_side:
return img
scale = max_side / longest
new_w, new_h = round(w * scale), round(h * scale)
return cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
def _estimate_skew_angle(gray: NDArrayU8) -> float:
"""Estimate skew using Canny + Hough; returns angle in degrees within [-15, 15]."""
edges = cv2.Canny(gray, 50, 150, apertureSize=3)
lines = cv2.HoughLines(edges, 1, np.pi / 360, threshold=200)
if lines is None or len(lines) == 0:
return 0.0
angles: list[float] = []
for line in lines[:200]:
rho, theta = line[0]
del rho
# convert to angle relative to horizontal (degrees)
angle = (theta * 180.0 / np.pi) - 90.0
# only keep nearly-horizontal lines (within ±15°)
if -15.0 < angle < 15.0:
angles.append(angle)
if not angles:
return 0.0
return float(np.median(angles))
def _rotate(img: NDArrayU8, angle_deg: float) -> NDArrayU8:
if abs(angle_deg) < 0.1:
return img
h, w = img.shape[:2]
center = (w / 2, h / 2)
matrix = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
return cv2.warpAffine(
img,
matrix,
(w, h),
flags=cv2.INTER_CUBIC,
borderMode=cv2.BORDER_REPLICATE,
)
def preprocess(img: NDArrayU8, cfg: PreprocessConfig | None = None) -> NDArrayU8:
"""Run preprocessing and return a clean BGR uint8 image suitable for OCR."""
if cfg is None:
cfg = PreprocessConfig()
out = _resize_max_side(img, cfg.max_side)
if cfg.deskew:
gray = cv2.cvtColor(out, cv2.COLOR_BGR2GRAY)
angle = _estimate_skew_angle(gray)
out = _rotate(out, -angle)
if cfg.denoise:
out = cv2.fastNlMeansDenoisingColored(out, None, 5, 5, 7, 21)
if cfg.adaptive_threshold:
gray = cv2.cvtColor(out, cv2.COLOR_BGR2GRAY)
binarized = cv2.adaptiveThreshold(
gray,
255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY,
blockSize=31,
C=15,
)
out = cv2.cvtColor(binarized, cv2.COLOR_GRAY2BGR)
return out

0
src/ocr_sprint/py.typed Normal file
View File

View File

@@ -0,0 +1,27 @@
"""Pydantic schemas for input/output of the OCR Sprint service."""
from ocr_sprint.schemas.document import (
DocumentJob,
DocumentResponse,
DocumentStatus,
SourceKind,
)
from ocr_sprint.schemas.extraction import (
ExtractionResult,
HeaderFields,
ReviewFlag,
Signatory,
)
from ocr_sprint.schemas.personnel import PersonnelEntry
__all__ = [
"DocumentJob",
"DocumentResponse",
"DocumentStatus",
"ExtractionResult",
"HeaderFields",
"PersonnelEntry",
"ReviewFlag",
"Signatory",
"SourceKind",
]

View File

@@ -0,0 +1,57 @@
"""Job-level schemas (request, response, status)."""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any
from uuid import UUID, uuid4
from pydantic import BaseModel, ConfigDict, Field
from ocr_sprint.schemas.extraction import ExtractionResult
class SourceKind(str, Enum):
"""High-level type of the uploaded document."""
PDF = "pdf"
IMAGE = "image"
UNKNOWN = "unknown"
class DocumentStatus(str, Enum):
"""Lifecycle status of an OCR job."""
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
NEEDS_REVIEW = "needs_review"
FAILED = "failed"
class DocumentJob(BaseModel):
"""Internal representation of a job (Phase 1 holds it in-memory)."""
model_config = ConfigDict(use_enum_values=False)
job_id: UUID = Field(default_factory=uuid4)
source_kind: SourceKind = SourceKind.UNKNOWN
filename: str
status: DocumentStatus = DocumentStatus.PENDING
created_at: datetime = Field(default_factory=lambda: datetime.utcnow())
updated_at: datetime = Field(default_factory=lambda: datetime.utcnow())
error: str | None = None
result: ExtractionResult | None = None
debug: dict[str, Any] = Field(default_factory=dict)
class DocumentResponse(BaseModel):
"""Public response payload returned by the documents API."""
job_id: UUID
status: DocumentStatus
confidence: float | None = None
data: ExtractionResult | None = None
review_flags: list[str] = Field(default_factory=list)
error: str | None = None

View File

@@ -0,0 +1,55 @@
"""Top-level extraction result schemas."""
from __future__ import annotations
from datetime import date
from enum import Enum
from pydantic import BaseModel, Field
from ocr_sprint.schemas.personnel import PersonnelEntry
class ReviewFlag(str, Enum):
"""Reasons a document was routed to human review."""
LOW_OCR_CONFIDENCE = "low_ocr_confidence"
MISSING_FIELD = "missing_field"
INVALID_NRP = "invalid_nrp"
UNKNOWN_PANGKAT = "unknown_pangkat"
PERSONNEL_COUNT_MISMATCH = "personnel_count_mismatch"
DATE_PARSE_FAILED = "date_parse_failed"
class Signatory(BaseModel):
"""The official signing the sprint (Penandatangan)."""
nama: str | None = None
pangkat: str | None = None
nrp: str | None = None
jabatan: str | None = None
class HeaderFields(BaseModel):
"""Header fields parsed from the top portion of a sprint."""
nomor_sprint: str | None = Field(None, description="e.g. Sprin/123/IV/2025/Reskrim.")
tanggal: date | None = Field(None, description="Date the sprint was issued.")
satuan_penerbit: str | None = Field(None, description="Issuing unit, e.g. 'Polres Bandung'.")
perihal: str | None = None
dasar: list[str] = Field(default_factory=list, description="List of legal/operational basis.")
class ExtractionResult(BaseModel):
"""Full structured payload extracted from a single sprint document."""
header: HeaderFields = Field(default_factory=HeaderFields)
personel: list[PersonnelEntry] = Field(default_factory=list)
untuk: list[str] = Field(
default_factory=list,
description="Bulleted task descriptions in the 'Untuk' / 'Dikerjakan' section.",
)
ttd: Signatory = Field(default_factory=Signatory)
raw_text: str = Field(default="", description="Concatenated OCR text for debugging.")
confidence: float = Field(0.0, ge=0.0, le=1.0)
review_flags: list[ReviewFlag] = Field(default_factory=list)

View File

@@ -0,0 +1,18 @@
"""Schema for a single personnel row in a surat sprint."""
from __future__ import annotations
from pydantic import BaseModel, Field
class PersonnelEntry(BaseModel):
"""One row from the personnel table."""
no: int | None = Field(None, description="Row number as printed on the document.")
pangkat: str | None = Field(None, description="Rank, normalized when possible.")
nrp: str | None = Field(None, description="8-digit Polri NRP, or blank if not detected.")
nama: str | None = Field(None, description="Full name.")
jabatan_dinas: str | None = Field(None, description="Permanent post (jabatan dalam dinas).")
jabatan_sprint: str | None = Field(None, description="Role within this sprint.")
keterangan: str | None = None
confidence: float = Field(0.0, ge=0.0, le=1.0)

View File

View File

@@ -0,0 +1,45 @@
"""Structured logging setup using structlog."""
from __future__ import annotations
import logging
import sys
from typing import Any
import structlog
def configure_logging(level: str = "INFO") -> None:
"""Configure structlog to emit JSON-friendly key=value records to stdout."""
log_level = getattr(logging, level.upper(), logging.INFO)
logging.basicConfig(
format="%(message)s",
stream=sys.stdout,
level=log_level,
)
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.dev.ConsoleRenderer(colors=False),
],
wrapper_class=structlog.make_filtering_bound_logger(log_level),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
def get_logger(name: str | None = None, **initial_values: Any) -> Any:
"""Return a bound logger with optional initial context.
The return type is ``Any`` because structlog's BoundLogger generic typing
is too restrictive in practice; callers treat it as a duck-typed logger.
"""
logger = structlog.get_logger(name)
if initial_values:
logger = logger.bind(**initial_values)
return logger