"""Synchronous pipeline orchestrator (Phase 1-3). Wires the individual stages together: bytes -> ingest -> document_detect -> preprocess -> OCR -> [PP-Structure tables -> personnel mapper] -> regex header extract -> validate -> score Phase 4 will replace this with a Celery task graph; Phase 5 will plug in an LLM extractor for variant fields. """ from __future__ import annotations from dataclasses import dataclass from ocr_sprint.config import get_settings from ocr_sprint.llm.extractor import llm_fill_header from ocr_sprint.pipeline.confidence import compute_confidence, route from ocr_sprint.pipeline.document_detect import DocumentDetectConfig, detect_and_correct from ocr_sprint.pipeline.extract.personnel import extract_personnel from ocr_sprint.pipeline.extract.regex_rules import extract_header, find_signatory from ocr_sprint.pipeline.extract.validators import validate_extraction from ocr_sprint.pipeline.ingest import NDArrayU8, detect_source_kind, ingest from ocr_sprint.pipeline.ocr import OCRPage, run_ocr from ocr_sprint.pipeline.preprocess import PreprocessConfig, preprocess from ocr_sprint.pipeline.table import DetectedTable, run_table_extraction from ocr_sprint.schemas.document import DocumentStatus, SourceKind from ocr_sprint.schemas.extraction import ExtractionResult, ReviewFlag from ocr_sprint.schemas.personnel import PersonnelEntry from ocr_sprint.utils.logging import get_logger _logger = get_logger(__name__) # Below this OCR confidence we automatically flag for review. _OCR_CONFIDENCE_FLAG_THRESHOLD = 0.80 def _header_has_gaps(header: object) -> bool: """True if any header field worth asking the LLM about is missing. Using ``getattr`` so this stays decoupled from the exact attribute names; the schema change cost was too large last time we hard-coded. """ for field in ("nomor_sprint", "tanggal", "satuan_penerbit", "perihal"): if not getattr(header, field, None): return True return not getattr(header, "dasar", None) @dataclass class PipelineOutput: """Bundle returned by the orchestrator.""" source_kind: SourceKind status: DocumentStatus confidence: float result: ExtractionResult def run_pipeline(content: bytes) -> PipelineOutput: """Execute the synchronous OCR + extraction pipeline on raw upload bytes.""" s = get_settings() kind = detect_source_kind(content) if kind == SourceKind.UNKNOWN: raise ValueError("Unsupported file type — only PDF and common image formats are accepted.") pages = ingest(content, kind, target_dpi=s.preprocess_target_dpi) _logger.info("pipeline.ingested", source_kind=kind.value, pages=len(pages)) pre_cfg = PreprocessConfig( max_side=s.ocr_max_image_side, denoise=s.preprocess_denoise, deskew=s.preprocess_deskew, adaptive_threshold=s.preprocess_adaptive_threshold, ) # Document detection only makes sense on photographed images. PDF renders # are already flat by construction, so we skip the heavy quad search there. detect_cfg = DocumentDetectConfig( detect_document=s.preprocess_detect_document and kind == SourceKind.IMAGE, remove_shadow=s.preprocess_remove_shadow and kind == SourceKind.IMAGE, min_area_fraction=s.preprocess_min_quad_area_fraction, ) ocr_pages: list[OCRPage] = [] cleaned_pages: list[NDArrayU8] = [] for page in pages: corrected = detect_and_correct(page.image, detect_cfg) cleaned = preprocess(corrected, pre_cfg) cleaned_pages.append(cleaned) ocr_pages.append(run_ocr(cleaned)) full_text = "\n".join(p.text for p in ocr_pages) mean_ocr_conf = sum(p.mean_confidence for p in ocr_pages) / len(ocr_pages) if ocr_pages else 0.0 header = extract_header(full_text) ttd = find_signatory(full_text) # Phase 5 — hybrid extraction. The regex layer is deterministic but # brittle to layout variants between satuan; if any header field is # still missing we ask the local LLM to fill the gaps. The merger # never lets the LLM overwrite a field that regex already captured. llm_flags: list[ReviewFlag] = [] if s.llm_enabled and _header_has_gaps(header): merged = llm_fill_header(full_text, header) if merged is None: llm_flags.append(ReviewFlag.LLM_UNAVAILABLE) else: if merged.model_dump() != header.model_dump(): llm_flags.append(ReviewFlag.LLM_FALLBACK) header = merged personel: list[PersonnelEntry] = [] if s.tables_enabled and cleaned_pages: all_tables: list[DetectedTable] = [] for img in cleaned_pages: try: all_tables.extend(run_table_extraction(img)) except Exception as exc: # pragma: no cover - defensive _logger.warning("pipeline.table_extraction_failed", error=str(exc)) personel = extract_personnel(all_tables) _logger.info( "pipeline.tables", tables=len(all_tables), personel_rows=len(personel), ) initial_flags: list[ReviewFlag] = list(llm_flags) if mean_ocr_conf < _OCR_CONFIDENCE_FLAG_THRESHOLD: initial_flags.append(ReviewFlag.LOW_OCR_CONFIDENCE) result = ExtractionResult( header=header, personel=personel, untuk=[], ttd=ttd, raw_text=full_text, confidence=mean_ocr_conf, review_flags=list(initial_flags), ) flags = validate_extraction(result) # merge initial OCR-confidence flag with validation flags, preserving uniqueness seen = set(flags) for f in initial_flags: if f not in seen: flags.append(f) seen.add(f) result.review_flags = flags final_conf = compute_confidence(mean_ocr_conf, flags) result.confidence = final_conf status = route(final_conf) return PipelineOutput( source_kind=kind, status=status, confidence=final_conf, result=result, )