Files
OCR-SPRIN-SERVICE/src/ocr_sprint/pipeline/orchestrator.py

216 lines
8.4 KiB
Python

"""Synchronous pipeline orchestrator (Phase 1-3).
Wires the individual stages together:
bytes -> ingest -> document_detect -> preprocess -> OCR
-> [PP-Structure tables -> personnel mapper]
-> regex header extract -> validate -> score
Phase 4 will replace this with a Celery task graph; Phase 5 will plug
in an LLM extractor for variant fields.
"""
from __future__ import annotations
from dataclasses import dataclass
from ocr_sprint.config import get_settings
from ocr_sprint.llm.extractor import llm_fill_header
from ocr_sprint.pipeline.confidence import compute_confidence, route
from ocr_sprint.pipeline.document_detect import DocumentDetectConfig, detect_and_correct
from ocr_sprint.pipeline.extract.personnel import extract_personnel
from ocr_sprint.pipeline.extract.personnel_text import (
extract_personnel_from_ocr_lines,
extract_personnel_from_text,
is_low_quality,
)
from ocr_sprint.pipeline.extract.regex_rules import (
extract_header,
find_signatory,
find_untuk_list,
)
from ocr_sprint.pipeline.extract.validators import validate_extraction
from ocr_sprint.pipeline.ingest import NDArrayU8, detect_source_kind, ingest
from ocr_sprint.pipeline.ocr import OCRPage, run_ocr
from ocr_sprint.pipeline.preprocess import PreprocessConfig, preprocess
from ocr_sprint.pipeline.table import DetectedTable, run_table_extraction
from ocr_sprint.schemas.document import DocumentStatus, SourceKind
from ocr_sprint.schemas.extraction import ExtractionResult, ReviewFlag
from ocr_sprint.schemas.personnel import PersonnelEntry
from ocr_sprint.utils.logging import get_logger
_logger = get_logger(__name__)
# Below this OCR confidence we automatically flag for review.
_OCR_CONFIDENCE_FLAG_THRESHOLD = 0.80
def _header_has_gaps(header: object) -> bool:
"""True if any header field worth asking the LLM about is missing.
Using ``getattr`` so this stays decoupled from the exact attribute
names; the schema change cost was too large last time we hard-coded.
"""
for field in ("nomor_sprint", "tanggal", "satuan_penerbit", "perihal"):
if not getattr(header, field, None):
return True
return not getattr(header, "dasar", None)
@dataclass
class PipelineOutput:
"""Bundle returned by the orchestrator."""
source_kind: SourceKind
status: DocumentStatus
confidence: float
result: ExtractionResult
def run_pipeline(content: bytes) -> PipelineOutput:
"""Execute the synchronous OCR + extraction pipeline on raw upload bytes."""
s = get_settings()
kind = detect_source_kind(content)
if kind == SourceKind.UNKNOWN:
raise ValueError("Unsupported file type — only PDF and common image formats are accepted.")
pages = ingest(content, kind, target_dpi=s.preprocess_target_dpi)
_logger.info("pipeline.ingested", source_kind=kind.value, pages=len(pages))
pre_cfg = PreprocessConfig(
max_side=s.ocr_max_image_side,
denoise=s.preprocess_denoise,
deskew=s.preprocess_deskew,
adaptive_threshold=s.preprocess_adaptive_threshold,
)
# Document detection only makes sense on photographed images. PDF renders
# are already flat by construction, so we skip the heavy quad search there.
detect_cfg = DocumentDetectConfig(
detect_document=s.preprocess_detect_document and kind == SourceKind.IMAGE,
remove_shadow=s.preprocess_remove_shadow and kind == SourceKind.IMAGE,
min_area_fraction=s.preprocess_min_quad_area_fraction,
)
ocr_pages: list[OCRPage] = []
cleaned_pages: list[NDArrayU8] = []
for page in pages:
corrected = detect_and_correct(page.image, detect_cfg)
cleaned = preprocess(corrected, pre_cfg)
cleaned_pages.append(cleaned)
ocr_pages.append(run_ocr(cleaned))
full_text = "\n".join(p.text for p in ocr_pages)
mean_ocr_conf = sum(p.mean_confidence for p in ocr_pages) / len(ocr_pages) if ocr_pages else 0.0
header = extract_header(full_text)
ttd = find_signatory(full_text)
# Phase 5 — hybrid extraction. The regex layer is deterministic but
# brittle to layout variants between satuan; if any header field is
# still missing we ask the local LLM to fill the gaps. The merger
# never lets the LLM overwrite a field that regex already captured.
llm_flags: list[ReviewFlag] = []
if s.llm_enabled and _header_has_gaps(header):
merged = llm_fill_header(full_text, header)
if merged is None:
llm_flags.append(ReviewFlag.LLM_UNAVAILABLE)
else:
if merged.model_dump() != header.model_dump():
llm_flags.append(ReviewFlag.LLM_FALLBACK)
header = merged
personel: list[PersonnelEntry] = []
table_flags: list[ReviewFlag] = []
if s.tables_enabled and cleaned_pages:
all_tables: list[DetectedTable] = []
for img in cleaned_pages:
try:
all_tables.extend(run_table_extraction(img))
except Exception as exc: # pragma: no cover - defensive
_logger.warning("pipeline.table_extraction_failed", error=str(exc))
personel = extract_personnel(all_tables)
_logger.info(
"pipeline.tables",
tables=len(all_tables),
personel_rows=len(personel),
)
# Text-based fallback: PP-Structure can succeed structurally but emit
# rows with only ``nama`` populated (column mapper degraded), or fail to
# detect the table at all. In both cases the regex fallback that scans
# raw OCR for rank+NRP pairs produces a much more useful result. We
# always run it when the structured path is empty or low-quality, and
# raise a review flag so the operator knows the document didn't go
# through the preferred path.
if is_low_quality(personel):
fallback_rows = extract_personnel_from_text(full_text)
# If text-based fallback produced rows but they all lack NRP
# (Pass 3 territory), retry with the column-aware extractor that
# uses OCR bounding boxes. On dense tables (e.g. Polda Kalbar
# Akpol-panitia), text-only Pass 3 bleeds adjacent columns into
# nama/jabatan because lines are interleaved within each Y-band;
# the columnar variant restricts each field to its visual column.
text_only_no_nrp = bool(fallback_rows) and all(
r.nrp is None for r in fallback_rows
)
if (not fallback_rows) or text_only_no_nrp:
ocr_lines = [ln for page in ocr_pages for ln in page.lines]
columnar_rows = extract_personnel_from_ocr_lines(ocr_lines)
if columnar_rows and (
not fallback_rows or len(columnar_rows) >= len(fallback_rows)
):
fallback_rows = columnar_rows
if fallback_rows:
personel = fallback_rows
# Pass 3 / columnar emit rows with nrp=None for sprint
# templates without an NRP column. Surface that with a
# distinct flag so operators know to expect missing NRPs by
# design rather than by OCR failure.
no_nrp = all(r.nrp is None for r in fallback_rows)
if no_nrp:
table_flags.append(ReviewFlag.PERSONNEL_TEXT_FALLBACK_NO_NRP)
else:
table_flags.append(ReviewFlag.PERSONNEL_TEXT_FALLBACK)
_logger.info(
"pipeline.personnel_text_fallback",
fallback_rows=len(fallback_rows),
no_nrp=no_nrp,
)
untuk_items = find_untuk_list(full_text)
initial_flags: list[ReviewFlag] = list(llm_flags) + list(table_flags)
if mean_ocr_conf < _OCR_CONFIDENCE_FLAG_THRESHOLD:
initial_flags.append(ReviewFlag.LOW_OCR_CONFIDENCE)
result = ExtractionResult(
header=header,
personel=personel,
untuk=untuk_items,
ttd=ttd,
raw_text=full_text,
confidence=mean_ocr_conf,
review_flags=list(initial_flags),
)
flags = validate_extraction(result)
# merge initial OCR-confidence flag with validation flags, preserving uniqueness
seen = set(flags)
for f in initial_flags:
if f not in seen:
flags.append(f)
seen.add(f)
result.review_flags = flags
final_conf = compute_confidence(mean_ocr_conf, flags)
result.confidence = final_conf
status = route(final_conf)
return PipelineOutput(
source_kind=kind,
status=status,
confidence=final_conf,
result=result,
)