diff --git a/.env.example b/.env.example
index 530eff9..07585c9 100644
--- a/.env.example
+++ b/.env.example
@@ -26,6 +26,9 @@ PREPROCESS_DETECT_DOCUMENT=true
PREPROCESS_REMOVE_SHADOW=true
PREPROCESS_MIN_QUAD_AREA_FRACTION=0.20
+# ==== Table extraction (Phase 3, PaddleOCR PP-Structure) ====
+TABLES_ENABLED=true
+
# ==== Confidence / routing (Phase 5) ====
CONFIDENCE_AUTO_APPROVE=0.95
CONFIDENCE_NEEDS_REVIEW=0.85
diff --git a/README.md b/README.md
index b8d2de0..c952258 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
OCR + structured extraction service for Indonesian police "surat sprint" (surat perintah) documents. Built around **FastAPI + PaddleOCR + hybrid extraction (regex → LLM lokal → validation)** with **on-premise** deployment as a hard requirement.
-> **Status:** Phase 1+2 — synchronous PDF/image OCR with regex header extraction, validation, confidence scoring, and **document detection + perspective correction + shadow removal** for phone photos. Phase 3–6 (table extraction, async pipeline, LLM extraction, HITL) are tracked in [`docs/architecture.md`](docs/architecture.md).
+> **Status:** Phase 1+2+3 — synchronous PDF/image OCR with regex header extraction, validation, confidence scoring, document detection + perspective correction + shadow removal for phone photos, and **PP-Structure table extraction** for personnel rows. Phase 4–6 (async pipeline, LLM extraction, HITL) are tracked in [`docs/architecture.md`](docs/architecture.md).
## Why this stack
@@ -67,7 +67,7 @@ Expected response (truncated):
}
```
-> **Note:** Phase 1 does not yet populate the `personel[]` table — that requires PP-Structure (Phase 3). Header fields, signatory NRP, confidence, and HITL routing are fully wired.
+> **Note:** As of Phase 3 the `personel[]` array is populated from PP-Structure table recognition. Set `TABLES_ENABLED=false` in `.env` to skip the table stage (faster on documents that you know contain no personnel table).
### Docker
@@ -97,13 +97,13 @@ Pre-commit hooks run ruff on every commit. Install once with `pre-commit install
src/ocr_sprint/
api/ # FastAPI routes + error handlers
schemas/ # Pydantic v2 models (request/response, extraction, personnel)
- pipeline/ # ingest → document_detect → preprocess → ocr → extract → validate → score
- extract/ # regex_rules.py (Phase 1) → llm.py (Phase 5)
+ pipeline/ # ingest → document_detect → preprocess → ocr + table → extract → validate → score
+ extract/ # regex_rules.py (Phase 1) + personnel.py (Phase 3) → llm.py (Phase 5)
data/ # master data (Polri ranks, etc.)
utils/ # logging, helpers
config.py # pydantic-settings
main.py # app factory
-tests/unit/ # ~60 unit tests, no PaddleOCR dependency
+tests/unit/ # 100+ unit tests, PaddleOCR / PP-Structure mocked
docs/ # architecture & decision records
```
@@ -113,7 +113,7 @@ docs/ # architecture & decision records
|---|---|---|
| 1 | Sync API, PDF/image ingest, basic preprocessing, PaddleOCR, regex header extraction, validation, confidence scoring | **Done** |
| 2 | OpenCV-based document detection, perspective transform, shadow removal for phone photos | **Done** |
-| 3 | PP-Structure table extraction for personnel rows | Planned |
+| 3 | PP-Structure table extraction for personnel rows + column mapper | **Done** |
| 4 | Async pipeline (Celery + Redis), Postgres + MinIO, auth, observability | Planned |
| 5 | LLM hybrid extraction (Ollama + structured output) | Planned |
| 6 | HITL review endpoints + audit trail | Planned |
diff --git a/src/ocr_sprint/config.py b/src/ocr_sprint/config.py
index 1e2e8a5..b85a40a 100644
--- a/src/ocr_sprint/config.py
+++ b/src/ocr_sprint/config.py
@@ -47,6 +47,9 @@ class Settings(BaseSettings):
preprocess_remove_shadow: bool = True
preprocess_min_quad_area_fraction: float = Field(0.20, ge=0.0, le=1.0)
+ # Table extraction (Phase 3) via PaddleOCR PP-Structure
+ tables_enabled: bool = True
+
# Confidence thresholds (Phase 5 routing)
confidence_auto_approve: float = Field(0.95, ge=0.0, le=1.0)
confidence_needs_review: float = Field(0.85, ge=0.0, le=1.0)
diff --git a/src/ocr_sprint/pipeline/extract/personnel.py b/src/ocr_sprint/pipeline/extract/personnel.py
new file mode 100644
index 0000000..26c0ded
--- /dev/null
+++ b/src/ocr_sprint/pipeline/extract/personnel.py
@@ -0,0 +1,316 @@
+"""Map a raw 2D table grid into a list of `PersonnelEntry`.
+
+Surat sprint personnel tables don't have a fixed schema across satuan: column
+order, header phrasing, and even whether pangkat/NRP are merged into one cell
+all vary. We deal with this by:
+
+1. Detecting the header row by keyword scoring (rows that contain "PANGKAT"
+ or "NRP" or "NAMA" are headers; the row with the highest score wins).
+2. Mapping each header cell to one of the canonical PersonnelEntry fields
+ via a synonym dictionary.
+3. Walking the remaining rows and slotting cells into fields by column
+ index. A combined "PANGKAT/NRP" or "PANGKAT/NRP/NAMA" cell is split
+ heuristically (8-digit token → NRP, known-rank token → pangkat, the
+ leftover words → nama).
+
+The mapper is deliberately conservative: when in doubt it leaves a field
+None and lets validation flag the row for HITL review.
+"""
+
+from __future__ import annotations
+
+import re
+
+from ocr_sprint.data.master_pangkat import normalize_pangkat
+from ocr_sprint.pipeline.table import DetectedTable
+from ocr_sprint.schemas.personnel import PersonnelEntry
+
+# ---------- column synonyms ----------
+
+# header keyword → canonical column id. Lowercased, whitespace-collapsed.
+_HEADER_SYNONYMS: dict[str, str] = {
+ # row index column
+ "no": "no",
+ "nomor": "no",
+ "no.": "no",
+ # rank
+ "pangkat": "pangkat",
+ "pkt": "pangkat",
+ # NRP / NIP / NIPK
+ "nrp": "nrp",
+ "no nrp": "nrp",
+ "nrp / nip": "nrp",
+ "nrp/nip": "nrp",
+ "nrp nip": "nrp",
+ "no. mhs": "nrp", # taruna
+ # combined pangkat + NRP + nama cell, seen in compact Polri layouts.
+ # Order matters here only for readability; `_classify_header_cell` tries
+ # synonyms longest-first, so the longer 'pangkat / nrp / nama' wins over
+ # both 'pangkat / nrp' and 'pangkat'.
+ "pangkat / nrp / nama": "pangkat_nrp_nama",
+ "pangkat/nrp/nama": "pangkat_nrp_nama",
+ "pangkat nrp nama": "pangkat_nrp_nama",
+ "pangkat, nrp, nama": "pangkat_nrp_nama",
+ # combined pangkat + NRP cell, common in Polres-level sprint
+ "pangkat / nrp": "pangkat_nrp",
+ "pangkat/nrp": "pangkat_nrp",
+ "pangkat dan nrp": "pangkat_nrp",
+ "pangkat nrp": "pangkat_nrp",
+ # name
+ "nama": "nama",
+ "nama lengkap": "nama",
+ # jabatan dalam dinas (permanent post)
+ "jabatan": "jabatan_dinas",
+ "jabatan dinas": "jabatan_dinas",
+ "jabatan dalam dinas": "jabatan_dinas",
+ "jbt dinas": "jabatan_dinas",
+ # jabatan dalam sprint (role for this dispatch)
+ "jabatan dalam sprint": "jabatan_sprint",
+ "jabatan dalam sprin": "jabatan_sprint",
+ "jabatan dalam surat perintah": "jabatan_sprint",
+ "jabatan sprint": "jabatan_sprint",
+ "jabatan sprin": "jabatan_sprint",
+ "tugas": "jabatan_sprint",
+ "penugasan": "jabatan_sprint",
+ # remarks
+ "keterangan": "keterangan",
+ "ket": "keterangan",
+ "ket.": "keterangan",
+}
+
+# 8-digit NRP. We don't anchor on word boundaries because OCR sometimes glues
+# the rank directly onto the digits ("BRIPKA98050505"). We use (? str:
+ return " ".join(text.lower().split()).strip(" .:")
+
+
+# Synonym keywords sorted by length (descending) so that substring matching
+# in `_classify_header_cell` prefers the most specific match. Without this,
+# the short keyword 'pangkat' could be tried first and match a cell such as
+# 'pangkat / nrp / nama personel' before the longer 'pangkat / nrp / nama'
+# synonym gets a chance, silently misclassifying combined-cell headers and
+# dropping rows.
+_SORTED_HEADER_KEYWORDS: list[tuple[str, str]] = sorted(
+ _HEADER_SYNONYMS.items(), key=lambda kv: -len(kv[0])
+)
+
+
+def _classify_header_cell(text: str) -> str | None:
+    """Classify one header cell as a canonical column id.
+
+    The cell is normalized with `_normalize_header_cell` and looked up
+    verbatim in `_HEADER_SYNONYMS`. When there is no exact hit, the first
+    keyword of `_SORTED_HEADER_KEYWORDS` (longest keywords first) that occurs
+    as a substring of the normalized text decides the column. Trying long
+    keywords first keeps a header such as 'Pangkat / NRP / Nama Personel' on
+    `pangkat_nrp_nama` instead of letting the shorter 'pangkat' claim it.
+
+    Returns None when the normalized cell is empty or no synonym matches.
+    """
+    normalized = _normalize_header_cell(text)
+    if not normalized:
+        return None
+    exact = _HEADER_SYNONYMS.get(normalized)
+    if exact is not None:
+        return exact
+    # Substring fallback: the first (i.e. longest) keyword contained in the cell.
+    return next(
+        (column for keyword, column in _SORTED_HEADER_KEYWORDS if keyword in normalized),
+        None,
+    )
+
+
+def detect_header_row(table: DetectedTable) -> tuple[int, list[str | None]] | None:
+    """Locate the header row of `table` and the column id of each of its cells.
+
+    Only the first three rows are candidates. Each candidate is scored by the
+    number of cells that `_classify_header_cell` recognises. A row needs at
+    least two recognised cells to qualify, and on a tie the earliest row is
+    kept because a later row must score strictly higher to replace it.
+
+    Returns `(row_index, column_mapping)` for the winning row, where
+    `column_mapping[i]` is the canonical column id of cell i (or None), or
+    None when no candidate row qualifies.
+    """
+    winner: tuple[int, list[str | None]] | None = None
+    # Starting the bar at 1 means only rows with two or more recognised cells
+    # can ever become the winner.
+    score_to_beat = 1
+    for row_index, row in enumerate(table.cells[:3]):
+        columns = [_classify_header_cell(cell) for cell in row]
+        recognised = len(columns) - columns.count(None)
+        if recognised > score_to_beat:
+            score_to_beat = recognised
+            winner = (row_index, columns)
+    return winner
+
+
+# ---------- combined-cell splitting ----------
+
+
+def _split_pangkat_nrp(cell: str) -> tuple[str | None, str | None]:
+    """Split a combined 'PANGKAT NRP' cell into `(pangkat, nrp)`.
+
+    The NRP is whatever group 1 of `_NRP_RE` captures; the text left after
+    cutting the whole match out of the cell is trimmed of separators and
+    handed to `normalize_pangkat`.
+
+    The two elements are independent: `nrp` is None when `_NRP_RE` finds
+    nothing, and `pangkat` is whatever `normalize_pangkat` returns for the
+    remaining text. An empty cell yields `(None, None)`.
+    """
+    if not cell:
+        return None, None
+    match = _NRP_RE.search(cell)
+    if match is None:
+        nrp = None
+        remainder = cell
+    else:
+        nrp = match.group(1)
+        remainder = cell[: match.start()] + cell[match.end() :]
+    # Drop separators commonly seen between rank and NRP ("AKP / 87010101",
+    # "AKP. 87010101", "AKP - 87010101") before normalizing the rank.
+    rank_text = remainder.strip(" /-.,;:|").strip()
+    return normalize_pangkat(rank_text), nrp
+
+
+def _split_pangkat_nrp_nama(cell: str) -> tuple[str | None, str | None, str | None]:
+    """Split a single 'PANGKAT NRP NAMA' cell into `(pangkat, nrp, nama)`.
+
+    The NRP is group 1 of the first `_NRP_RE` match and is cut out of the cell
+    at the matched position. The remaining whitespace-separated tokens are
+    then split into a leading rank and a trailing name.
+
+    Multi-word ranks like 'KOMBES POL' or 'BRIGJEN POL' must be matched as
+    contiguous token sequences, otherwise tokens like 'POL' leak into the
+    name. The longest leading prefix (at most three tokens) that
+    `normalize_pangkat` accepts becomes the rank; if none is accepted, the
+    rank is None and every token stays in the name.
+
+    Returns `(None, None, None)` for an empty cell and `(None, nrp, None)`
+    when nothing but the NRP is present.
+    """
+    if not cell:
+        return None, None, None
+    nrp_match = _NRP_RE.search(cell)
+    nrp = nrp_match.group(1) if nrp_match else None
+    rest = cell
+    if nrp_match and nrp:
+        # Cut the NRP out by match position rather than by text. A textual
+        # replace would remove the first occurrence of the digits, which may
+        # sit inside an earlier, longer digit run that the regex rejected.
+        start, end = nrp_match.span(1)
+        rest = f"{cell[:start]} {cell[end:]}"
+    tokens = rest.split()
+    if not tokens:
+        return None, nrp, None
+
+    # Try the longest leading sub-sequence first so 'KOMBES POL' wins over
+    # 'KOMBES' (which alone is not a valid pangkat anyway).
+    pangkat: str | None = None
+    consumed = 0
+    for prefix_len in range(min(len(tokens), 3), 0, -1):
+        normalized = normalize_pangkat(" ".join(tokens[:prefix_len]))
+        if normalized is not None:
+            pangkat = normalized
+            consumed = prefix_len
+            break
+
+    # A falsy rank consumes nothing, so an unknown rank token stays in the name.
+    name_tokens = tokens[consumed:] if pangkat else tokens
+    nama = " ".join(name_tokens) if name_tokens else None
+    return pangkat, nrp, nama
+
+
+# ---------- row mapping ----------
+
+
+def _parse_int(value: str) -> int | None:
+    """Return group 1 of `_NUMBER_RE` as an int, or None when there is no match.
+
+    The pattern is applied with `match`, i.e. anchored at the start of `value`.
+    """
+    found = _NUMBER_RE.match(value)
+    if found is None:
+        return None
+    return int(found.group(1))
+
+
+def map_row(row: list[str], mapping: list[str | None]) -> PersonnelEntry | None:
+    """Build a `PersonnelEntry` from one data row, or None if it names nobody.
+
+    `mapping[i]` is the canonical column id of `row[i]`. Cells whose column is
+    None, and cells beyond the end of `mapping`, are ignored. Combined
+    'pangkat_nrp' / 'pangkat_nrp_nama' cells are split and only their
+    non-empty parts are stored. A plain 'pangkat' cell keeps its raw text when
+    `normalize_pangkat` does not recognise it, and a plain 'nrp' cell keeps
+    its raw text when `_NRP_RE` finds no match.
+
+    Returns None when the row yields neither an NRP nor a name, which is
+    typical for separators, footnotes and merged cells.
+    """
+    no: int | None = None
+    texts: dict[str, str | None] = dict.fromkeys(
+        ("pangkat", "nrp", "nama", "jabatan_dinas", "jabatan_sprint", "keterangan")
+    )
+
+    # Slicing the mapping to the row length ignores surplus entries on either side.
+    for idx, column in enumerate(mapping[: len(row)]):
+        if column is None:
+            continue
+        text = row[idx].strip()
+        if column == "no":
+            no = _parse_int(text)
+        elif column == "pangkat_nrp_nama":
+            pangkat, nrp, nama = _split_pangkat_nrp_nama(text)
+            for key, value in (("pangkat", pangkat), ("nrp", nrp), ("nama", nama)):
+                if value:
+                    texts[key] = value
+        elif column == "pangkat_nrp":
+            pangkat, nrp = _split_pangkat_nrp(text)
+            for key, value in (("pangkat", pangkat), ("nrp", nrp)):
+                if value:
+                    texts[key] = value
+        elif column == "pangkat":
+            texts["pangkat"] = normalize_pangkat(text) or text or None
+        elif column == "nrp":
+            found = _NRP_RE.search(text)
+            texts["nrp"] = found.group(1) if found else (text or None)
+        elif column in texts:
+            texts[column] = text or None
+
+    # require at least nama OR nrp to consider this a real personnel row;
+    # otherwise it's likely a separator / footnote / merged cell.
+    if not texts["nrp"] and not texts["nama"]:
+        return None
+
+    return PersonnelEntry(
+        no=no,
+        **{name: value if isinstance(value, str) else None for name, value in texts.items()},
+    )
+
+
+# ---------- table-level entrypoint ----------
+
+
+def is_personnel_table(table: DetectedTable) -> bool:
+    """Tell whether `table` looks like the personnel list.
+
+    A table qualifies when `detect_header_row` finds a header that contains
+    both an identity column (rank and/or NRP, alone or combined) and a name
+    column. False is returned when no header row is detected at all.
+    """
+    detected = detect_header_row(table)
+    if detected is None:
+        return False
+    columns = set(detected[1])
+    # `pangkat_nrp` is an id-only signal (rank + NRP, no name), while
+    # `pangkat_nrp_nama` carries a name too. Counting `pangkat_nrp` as a name
+    # column would let id-only tables (e.g. ['No', 'Pangkat / NRP',
+    # 'Jabatan']) be mistaken for personnel tables.
+    id_columns = {"nrp", "pangkat", "pangkat_nrp", "pangkat_nrp_nama"}
+    name_columns = {"nama", "pangkat_nrp_nama"}
+    return not columns.isdisjoint(id_columns) and not columns.isdisjoint(name_columns)
+
+
+def extract_personnel(tables: list[DetectedTable]) -> list[PersonnelEntry]:
+    """Convert every personnel-looking table into `PersonnelEntry` rows.
+
+    Tables rejected by `is_personnel_table` are skipped. For each remaining
+    table the rows below its detected header are passed through `map_row`,
+    and rows for which `map_row` returns None are dropped. When several
+    tables qualify (rare), their entries are concatenated in the order the
+    tables were given so nothing is silently dropped.
+    """
+    entries: list[PersonnelEntry] = []
+    for table in tables:
+        detected = detect_header_row(table) if is_personnel_table(table) else None
+        if detected is None:
+            continue
+        header_idx, mapping = detected
+        mapped = (map_row(row, mapping) for row in table.cells[header_idx + 1 :])
+        entries.extend(entry for entry in mapped if entry is not None)
+    return entries
diff --git a/src/ocr_sprint/pipeline/orchestrator.py b/src/ocr_sprint/pipeline/orchestrator.py
index 980f30b..f42e810 100644
--- a/src/ocr_sprint/pipeline/orchestrator.py
+++ b/src/ocr_sprint/pipeline/orchestrator.py
@@ -1,11 +1,13 @@
-"""Synchronous pipeline orchestrator (Phase 1).
+"""Synchronous pipeline orchestrator (Phase 1-3).
Wires the individual stages together:
- bytes → ingest → preprocess → OCR → regex extract → validate → score
+ bytes -> ingest -> document_detect -> preprocess -> OCR
+ -> [PP-Structure tables -> personnel mapper]
+ -> regex header extract -> validate -> score
-Phase 4 will replace this with a Celery task graph; Phase 3/5 will plug
-in PP-Structure for tables and an LLM extractor for variant fields.
+Phase 4 will replace this with a Celery task graph; Phase 5 will plug
+in an LLM extractor for variant fields.
"""
from __future__ import annotations
@@ -15,13 +17,16 @@ from dataclasses import dataclass
from ocr_sprint.config import get_settings
from ocr_sprint.pipeline.confidence import compute_confidence, route
from ocr_sprint.pipeline.document_detect import DocumentDetectConfig, detect_and_correct
+from ocr_sprint.pipeline.extract.personnel import extract_personnel
from ocr_sprint.pipeline.extract.regex_rules import extract_header, find_signatory
from ocr_sprint.pipeline.extract.validators import validate_extraction
-from ocr_sprint.pipeline.ingest import detect_source_kind, ingest
+from ocr_sprint.pipeline.ingest import NDArrayU8, detect_source_kind, ingest
from ocr_sprint.pipeline.ocr import OCRPage, run_ocr
from ocr_sprint.pipeline.preprocess import PreprocessConfig, preprocess
+from ocr_sprint.pipeline.table import DetectedTable, run_table_extraction
from ocr_sprint.schemas.document import DocumentStatus, SourceKind
from ocr_sprint.schemas.extraction import ExtractionResult, ReviewFlag
+from ocr_sprint.schemas.personnel import PersonnelEntry
from ocr_sprint.utils.logging import get_logger
_logger = get_logger(__name__)
@@ -66,9 +71,11 @@ def run_pipeline(content: bytes) -> PipelineOutput:
)
ocr_pages: list[OCRPage] = []
+ cleaned_pages: list[NDArrayU8] = []
for page in pages:
corrected = detect_and_correct(page.image, detect_cfg)
cleaned = preprocess(corrected, pre_cfg)
+ cleaned_pages.append(cleaned)
ocr_pages.append(run_ocr(cleaned))
full_text = "\n".join(p.text for p in ocr_pages)
@@ -77,13 +84,28 @@ def run_pipeline(content: bytes) -> PipelineOutput:
header = extract_header(full_text)
ttd = find_signatory(full_text)
+ personel: list[PersonnelEntry] = []
+ if s.tables_enabled and cleaned_pages:
+ all_tables: list[DetectedTable] = []
+ for img in cleaned_pages:
+ try:
+ all_tables.extend(run_table_extraction(img))
+ except Exception as exc: # pragma: no cover - defensive
+ _logger.warning("pipeline.table_extraction_failed", error=str(exc))
+ personel = extract_personnel(all_tables)
+ _logger.info(
+ "pipeline.tables",
+ tables=len(all_tables),
+ personel_rows=len(personel),
+ )
+
initial_flags: list[ReviewFlag] = []
if mean_ocr_conf < _OCR_CONFIDENCE_FLAG_THRESHOLD:
initial_flags.append(ReviewFlag.LOW_OCR_CONFIDENCE)
result = ExtractionResult(
header=header,
- personel=[], # Phase 3 will populate from PP-Structure
+ personel=personel,
untuk=[],
ttd=ttd,
raw_text=full_text,
diff --git a/src/ocr_sprint/pipeline/table.py b/src/ocr_sprint/pipeline/table.py
new file mode 100644
index 0000000..b93ccd1
--- /dev/null
+++ b/src/ocr_sprint/pipeline/table.py
@@ -0,0 +1,155 @@
+"""Phase 3 — table extraction via PaddleOCR PP-Structure.
+
+The personnel section of a surat sprint is almost always a table with columns
+like (No, Pangkat, NRP, Nama, Jabatan dalam Dinas, Jabatan dalam Sprint,
+Keterangan). Plain OCR on the page produces a flat stream of text lines that
+makes column reconstruction brittle, so we use PP-Structure's table recognizer
+which returns a 2D cell grid directly.
+
+Like the OCR engine wrapper, PP-Structure has a heavy initialization cost
+(~3-6s on CPU) and an API that has shifted across paddleocr releases, so we
+hide it behind a small process-global accessor and a stable dataclass surface.
+
+Tests do NOT require paddleocr installed — `parse_table_html`,
+`extract_tables_from_pp_result` and the personnel column mapper are
+pure-Python and only parse PP-Structure's HTML output.
+"""
+
+from __future__ import annotations
+
+import html
+import re
+from dataclasses import dataclass, field
+from threading import Lock
+from typing import TYPE_CHECKING
+
+from ocr_sprint.config import get_settings
+from ocr_sprint.pipeline.ingest import NDArrayU8
+from ocr_sprint.utils.logging import get_logger
+
+if TYPE_CHECKING:
+ from paddleocr import PPStructure
+
+_logger = get_logger(__name__)
+_lock = Lock()
+_instance: PPStructure | None = None
+
+
+@dataclass(frozen=True)
+class TableCell:
+    """One parsed table cell."""
+
+    # Text content of the cell.
+    text: str
+    # Row position of the cell in its table (index base not established here).
+    row: int
+    # Column position of the cell in its table (index base not established here).
+    col: int
+
+
+@dataclass
+class DetectedTable:
+    """A table region detected by PP-Structure, parsed into a 2D grid.
+
+    `cells[r]` holds the cell strings of row r. Rows may differ in length
+    when the table has merged cells (they are not un-merged), so callers
+    should index defensively.
+    """
+
+    # Row-major grid of cell texts; may be ragged.
+    cells: list[list[str]] = field(default_factory=list)
+    # The table HTML this grid was parsed from ("" when not provided).
+    html: str = ""
+
+    @property
+    def n_rows(self) -> int:
+        """Number of rows in the grid."""
+        return len(self.cells)
+
+    @property
+    def n_cols(self) -> int:
+        """Length of the longest row, or 0 when the grid has no rows."""
+        return max(map(len, self.cells), default=0)
+
+
+# ---------- PP-Structure singleton ----------
+
+
+def _build_pp_structure() -> PPStructure:
+    """Construct a PP-Structure engine from the OCR settings.
+
+    paddleocr is imported inside the function, so importing this module does
+    not require it. The language and GPU flag come from `get_settings()` and
+    are logged before the engine is built.
+    """
+    from paddleocr import PPStructure
+
+    settings = get_settings()
+    lang, use_gpu = settings.ocr_lang, settings.ocr_use_gpu
+    _logger.info("pp_structure.init", lang=lang, use_gpu=use_gpu)
+    # layout=True so that PP-Structure also returns figure/text regions; we
+    # filter to tables only afterwards. show_log=False to keep stdout clean.
+    return PPStructure(lang=lang, use_gpu=use_gpu, layout=True, show_log=False)
+
+
+def get_pp_structure() -> PPStructure:
+    """Return the shared PP-Structure engine, building it on first use.
+
+    The engine is built at most once: the fast path returns the existing
+    instance without locking, and the slow path re-checks under `_lock`
+    before calling `_build_pp_structure()`.
+    """
+    global _instance
+    if _instance is not None:
+        return _instance
+    with _lock:
+        # Another thread may have built the engine while we waited for the lock.
+        if _instance is None:
+            _instance = _build_pp_structure()
+        return _instance
+
+
+# ---------- table parsing ----------
+
+
+_TR_RE = re.compile(r"
]*>(.*?)
", re.IGNORECASE | re.DOTALL)
+_TD_RE = re.compile(r"]*>(.*?)", re.IGNORECASE | re.DOTALL)
+_TAG_RE = re.compile(r"<[^>]+>")
+
+
+def _strip_html(fragment: str) -> str:
+    """Reduce an HTML fragment to plain, single-spaced text.
+
+    Every `_TAG_RE` match is replaced by a space (so text on either side of a
+    tag is not glued together), HTML entities are decoded, and all runs of
+    whitespace are collapsed to single spaces with none at either end.
+    """
+    text = html.unescape(_TAG_RE.sub(" ", fragment))
+    return " ".join(text.split())
+
+
+def parse_table_html(table_html: str) -> list[list[str]]:
+    """Parse table HTML into a 2D list of cell texts.
+
+    Each `_TR_RE` match is one row, and each `_TD_RE` match inside it is one
+    cell, cleaned with `_strip_html`. This is a regex scan rather than a real
+    HTML parser: it only needs rows x cells and is meant to tolerate
+    PP-Structure's slightly inconsistent markup. A row without any cell
+    matches is kept as an empty list; input without row matches gives `[]`.
+    """
+    return [
+        [_strip_html(cell_html) for cell_html in _TD_RE.findall(row_html)]
+        for row_html in _TR_RE.findall(table_html)
+    ]
+
+
+def extract_tables_from_pp_result(
+    pp_result: list[dict[str, object]],
+) -> list[DetectedTable]:
+    """Collect the parsed tables from a PP-Structure region list.
+
+    PP-Structure returns one dict per detected region. A region contributes a
+    `DetectedTable` only when its `type` is `"table"`, its `res` is a dict
+    carrying a non-empty string under `"html"`, and that HTML parses to at
+    least one row. Everything else is skipped without error.
+    """
+    found: list[DetectedTable] = []
+    for region in pp_result:
+        res = region.get("res") if region.get("type") == "table" else None
+        if not isinstance(res, dict):
+            continue
+        markup = res.get("html", "")
+        if isinstance(markup, str) and markup:
+            grid = parse_table_html(markup)
+            if grid:
+                found.append(DetectedTable(cells=grid, html=markup))
+    return found
+
+
+def run_table_extraction(image: NDArrayU8) -> list[DetectedTable]:
+    """Run the shared PP-Structure engine on one page image.
+
+    Returns the tables parsed from the engine's region list, or an empty list
+    when the engine returns something other than a list.
+    """
+    raw = get_pp_structure()(image)
+    return extract_tables_from_pp_result(raw) if isinstance(raw, list) else []
diff --git a/tests/unit/test_personnel_mapper.py b/tests/unit/test_personnel_mapper.py
new file mode 100644
index 0000000..ab10397
--- /dev/null
+++ b/tests/unit/test_personnel_mapper.py
@@ -0,0 +1,300 @@
+"""Tests for the personnel-row mapper."""
+
+from __future__ import annotations
+
+import pytest
+
+from ocr_sprint.pipeline.extract.personnel import (
+ _classify_header_cell,
+ _split_pangkat_nrp,
+ _split_pangkat_nrp_nama,
+ detect_header_row,
+ extract_personnel,
+ is_personnel_table,
+ map_row,
+)
+from ocr_sprint.pipeline.table import DetectedTable
+
+# ---------- header detection ----------
+
+
+class TestClassifyHeaderCell:
+    """`_classify_header_cell`: exact synonyms, substring fallback and misses."""
+
+    @pytest.mark.parametrize(
+        ("text", "expected"),
+        [
+            ("No", "no"),
+            ("NO.", "no"),
+            ("Nomor", "no"),
+            ("Pangkat", "pangkat"),
+            ("NRP", "nrp"),
+            ("Pangkat / NRP", "pangkat_nrp"),
+            ("PANGKAT/NRP", "pangkat_nrp"),
+            ("Pangkat / NRP / Nama", "pangkat_nrp_nama"),
+            ("PANGKAT/NRP/NAMA", "pangkat_nrp_nama"),
+            ("Pangkat, NRP, Nama", "pangkat_nrp_nama"),
+            ("Nama", "nama"),
+            ("Nama Lengkap", "nama"),
+            ("Jabatan dalam Dinas", "jabatan_dinas"),
+            ("Jabatan dalam Sprint", "jabatan_sprint"),
+            ("Keterangan", "keterangan"),
+        ],
+    )
+    def test_known_header(self, text: str, expected: str) -> None:
+        """Header spellings in varying case and punctuation map to their column id."""
+        assert _classify_header_cell(text) == expected
+
+    def test_substring_match_prefers_longest_synonym(self) -> None:
+        """Cells with extra words still resolve to the most specific synonym."""
+        # 'pangkat' is a shorter prefix of 'pangkat / nrp / nama'. Without
+        # length-sorted iteration we'd misclassify combined headers as plain
+        # 'pangkat' and downstream map_row would drop every row.
+        assert _classify_header_cell("Pangkat / NRP / Nama Personel") == "pangkat_nrp_nama"
+        assert _classify_header_cell("Pangkat / NRP Polri") == "pangkat_nrp"
+
+    def test_unknown_header(self) -> None:
+        """Unrelated text and the empty string classify as None."""
+        assert _classify_header_cell("Random Text") is None
+        assert _classify_header_cell("") is None
+
+
+class TestDetectHeaderRow:
+    """`detect_header_row`: header position and column mapping."""
+
+    def test_detects_first_row_as_header(self) -> None:
+        """A fully recognised first row is the header, with one id per cell."""
+        table = DetectedTable(
+            cells=[
+                ["No", "Pangkat", "NRP", "Nama"],
+                ["1", "AKP", "87010101", "Budi"],
+            ]
+        )
+        result = detect_header_row(table)
+        assert result is not None
+        idx, mapping = result
+        assert idx == 0
+        assert mapping == ["no", "pangkat", "nrp", "nama"]
+
+    def test_detects_second_row_when_first_is_title(self) -> None:
+        """A title row above the real header is skipped."""
+        table = DetectedTable(
+            cells=[
+                ["DAFTAR PERSONEL"],  # title row, not a header
+                ["No", "Pangkat / NRP", "Nama", "Jabatan dalam Dinas"],
+                ["1", "AKP 87010101", "Budi", "Kanit"],
+            ]
+        )
+        result = detect_header_row(table)
+        assert result is not None
+        idx, _ = result
+        assert idx == 1
+
+    def test_returns_none_when_no_header_found(self) -> None:
+        """No row with recognised header cells means no header."""
+        table = DetectedTable(cells=[["foo", "bar"], ["baz", "qux"]])
+        assert detect_header_row(table) is None
+
+
+# ---------- combined-cell splitting ----------
+
+
+class TestSplitPangkatNrp:
+    """`_split_pangkat_nrp`: splitting a combined rank + NRP cell."""
+
+    @pytest.mark.parametrize(
+        ("text", "expected"),
+        [
+            ("AKP 87010101", ("AKP", "87010101")),
+            ("IPDA / 92030404", ("IPDA", "92030404")),
+            ("BRIPKA98050505", ("BRIPKA", "98050505")),
+            ("KOMPOL 88123456", ("KOMPOL", "88123456")),
+        ],
+    )
+    def test_known_combos(self, text: str, expected: tuple[str, str]) -> None:
+        """Space-separated, slash-separated and glued rank/NRP cells all split."""
+        assert _split_pangkat_nrp(text) == expected
+
+    def test_returns_none_when_no_nrp(self) -> None:
+        """Without digits only the NRP is None; the rank is still returned."""
+        pangkat, nrp = _split_pangkat_nrp("AKP")
+        assert pangkat == "AKP"
+        assert nrp is None
+
+
+class TestSplitPangkatNrpNama:
+    """`_split_pangkat_nrp_nama`: splitting a rank + NRP + name cell."""
+
+    def test_three_way_split(self) -> None:
+        """A single-word rank, an NRP and a two-word name are separated."""
+        pangkat, nrp, nama = _split_pangkat_nrp_nama("AKP 87010101 Budi Santoso")
+        assert pangkat == "AKP"
+        assert nrp == "87010101"
+        assert nama == "Budi Santoso"
+
+    @pytest.mark.parametrize(
+        ("text", "expected_pangkat", "expected_name"),
+        [
+            # multi-word ranks must be matched as contiguous token sequences,
+            # otherwise tokens like 'POL' would leak into the name.
+            ("KOMBES POL 88123456 John Doe", "KOMBES POL", "John Doe"),
+            ("BRIGJEN POL 99887766 Jane Doe", "BRIGJEN POL", "Jane Doe"),
+            ("IRJEN POL 77665544 Ahmad Hidayat", "IRJEN POL", "Ahmad Hidayat"),
+            ("JENDERAL POL 11223344 Sari Wulandari", "JENDERAL POL", "Sari Wulandari"),
+        ],
+    )
+    def test_multi_word_ranks(self, text: str, expected_pangkat: str, expected_name: str) -> None:
+        """Two-word ranks are consumed whole and never bleed into the name."""
+        pangkat, _nrp, nama = _split_pangkat_nrp_nama(text)
+        assert pangkat == expected_pangkat
+        assert nama == expected_name
+
+    def test_unknown_rank_returns_none_pangkat(self) -> None:
+        """An unrecognised leading token yields no rank and stays in the name."""
+        pangkat, nrp, nama = _split_pangkat_nrp_nama("Foobar 87010101 Budi Santoso")
+        assert pangkat is None
+        assert nrp == "87010101"
+        # name keeps the unknown rank token; validators will flag the row.
+        assert nama == "Foobar Budi Santoso"
+
+
+# ---------- row mapping ----------
+
+
+class TestMapRow:
+    """`map_row`: turning one data row into a `PersonnelEntry`."""
+
+    def test_split_columns_polres_layout(self) -> None:
+        """One column per field: every cell lands in its own attribute."""
+        mapping = ["no", "pangkat", "nrp", "nama", "jabatan_dinas", "jabatan_sprint"]
+        row = ["1", "AKP", "87010101", "Budi Santoso", "Kanit Reskrim", "Ketua Tim"]
+        entry = map_row(row, mapping)
+        assert entry is not None
+        assert entry.no == 1
+        assert entry.pangkat == "AKP"
+        assert entry.nrp == "87010101"
+        assert entry.nama == "Budi Santoso"
+        assert entry.jabatan_dinas == "Kanit Reskrim"
+        assert entry.jabatan_sprint == "Ketua Tim"
+
+    def test_combined_pangkat_nrp_nama_cell(self) -> None:
+        """A three-in-one cell is split into rank, NRP and name."""
+        mapping = ["no", "pangkat_nrp_nama", "jabatan_dinas", "jabatan_sprint"]
+        row = ["1", "AKP 87010101 Budi Santoso", "Kanit Reskrim", "Ketua Tim"]
+        entry = map_row(row, mapping)
+        assert entry is not None
+        assert entry.no == 1
+        assert entry.pangkat == "AKP"
+        assert entry.nrp == "87010101"
+        assert entry.nama == "Budi Santoso"
+        assert entry.jabatan_dinas == "Kanit Reskrim"
+        assert entry.jabatan_sprint == "Ketua Tim"
+
+    def test_combined_pangkat_nrp_cell(self) -> None:
+        """A rank + NRP cell is split while the name comes from its own column."""
+        mapping = ["no", "pangkat_nrp", "nama", "jabatan_dinas"]
+        row = ["1", "AKP 87010101", "Budi Santoso", "Kanit Reskrim"]
+        entry = map_row(row, mapping)
+        assert entry is not None
+        assert entry.pangkat == "AKP"
+        assert entry.nrp == "87010101"
+        assert entry.nama == "Budi Santoso"
+
+    def test_skips_row_without_nama_or_nrp(self) -> None:
+        """A row that yields neither a name nor an NRP is dropped."""
+        mapping = ["no", "pangkat"]
+        row = ["", ""]
+        assert map_row(row, mapping) is None
+
+    def test_unknown_pangkat_kept_verbatim(self) -> None:
+        """An unrecognised rank in a dedicated column is kept as raw text."""
+        mapping = ["no", "pangkat", "nrp", "nama"]
+        row = ["1", "Foobar", "87010101", "Budi"]
+        entry = map_row(row, mapping)
+        assert entry is not None
+        # unknown pangkat is preserved so the validation layer can flag it
+        assert entry.pangkat == "Foobar"
+
+
+# ---------- end-to-end extraction ----------
+
+
+class TestExtractPersonnel:
+    """`extract_personnel`: end-to-end mapping from tables to entries."""
+
+    def test_full_table_with_header(self) -> None:
+        """A Polres-style table with a combined rank/NRP column yields all rows."""
+        table = DetectedTable(
+            cells=[
+                [
+                    "No",
+                    "Pangkat / NRP",
+                    "Nama",
+                    "Jabatan dalam Dinas",
+                    "Jabatan dalam Sprint",
+                ],
+                ["1", "AKP 87010101", "Budi Santoso", "Kanit Reskrim", "Ketua Tim"],
+                ["2", "IPDA 92030404", "Sari Wulandari", "Banit Reskrim", "Anggota"],
+                ["3", "BRIPKA 98050505", "Ahmad Hidayat", "Banit Reskrim", "Anggota"],
+            ]
+        )
+        entries = extract_personnel([table])
+        assert len(entries) == 3
+        assert entries[0].nama == "Budi Santoso"
+        assert entries[0].nrp == "87010101"
+        assert entries[1].pangkat == "IPDA"
+        assert entries[2].pangkat == "BRIPKA"
+
+    def test_full_table_with_triple_combined_header(self) -> None:
+        """A 'Pangkat / NRP / Nama' header keeps every personnel row."""
+        # Regression test for header misclassification: 'Pangkat / NRP / Nama'
+        # used to be classified as 'pangkat' due to substring matching, which
+        # silently dropped every personnel row.
+        table = DetectedTable(
+            cells=[
+                ["No", "Pangkat / NRP / Nama", "Jabatan dalam Sprint"],
+                ["1", "AKP 87010101 Budi Santoso", "Ketua Tim"],
+                ["2", "IPDA 92030404 Sari Wulandari", "Anggota"],
+            ]
+        )
+        entries = extract_personnel([table])
+        assert len(entries) == 2
+        assert entries[0].pangkat == "AKP"
+        assert entries[0].nrp == "87010101"
+        assert entries[0].nama == "Budi Santoso"
+        assert entries[1].nama == "Sari Wulandari"
+
+    def test_skips_non_personnel_table(self) -> None:
+        """A table without personnel headers contributes nothing."""
+        table = DetectedTable(
+            cells=[["Tahun", "Anggaran"], ["2024", "100M"]],
+        )
+        assert extract_personnel([table]) == []
+
+    def test_concatenates_multiple_personnel_tables(self) -> None:
+        """Entries from several personnel tables are returned in table order."""
+        t1 = DetectedTable(
+            cells=[
+                ["No", "Pangkat", "NRP", "Nama"],
+                ["1", "AKP", "87010101", "Budi"],
+            ]
+        )
+        t2 = DetectedTable(
+            cells=[
+                ["No", "Pangkat", "NRP", "Nama"],
+                ["1", "IPDA", "92030404", "Sari"],
+            ]
+        )
+        entries = extract_personnel([t1, t2])
+        assert len(entries) == 2
+        assert entries[0].nama == "Budi"
+        assert entries[1].nama == "Sari"
+
+
+class TestIsPersonnelTable:
+    """`is_personnel_table`: needs both an identity column and a name column."""
+
+    def test_matches_with_pangkat_and_nama(self) -> None:
+        """Separate rank, NRP and name columns qualify."""
+        table = DetectedTable(
+            cells=[["No", "Pangkat", "NRP", "Nama"], ["1", "AKP", "87010101", "X"]]
+        )
+        assert is_personnel_table(table) is True
+
+    def test_rejects_unrelated_table(self) -> None:
+        """A table with no recognisable header is rejected."""
+        table = DetectedTable(cells=[["A", "B"], ["1", "2"]])
+        assert is_personnel_table(table) is False
+
+    def test_rejects_id_only_table_without_name_column(self) -> None:
+        """Rank/NRP without any name column is not a personnel table."""
+        # 'Pangkat / NRP' carries id but no name; without a name signal
+        # this should not be classified as a personnel table.
+        table = DetectedTable(
+            cells=[
+                ["No", "Pangkat / NRP", "Jabatan"],
+                ["1", "AKP 87010101", "Kanit Reskrim"],
+            ]
+        )
+        assert is_personnel_table(table) is False
+
+    def test_accepts_pangkat_nrp_when_separate_nama_present(self) -> None:
+        """A combined rank/NRP column plus a separate name column qualifies."""
+        table = DetectedTable(
+            cells=[
+                ["No", "Pangkat / NRP", "Nama"],
+                ["1", "AKP 87010101", "Budi"],
+            ]
+        )
+        assert is_personnel_table(table) is True
+
+    def test_accepts_pangkat_nrp_nama_combined(self) -> None:
+        """A three-in-one column provides both the identity and the name signal."""
+        table = DetectedTable(
+            cells=[
+                ["No", "Pangkat / NRP / Nama", "Jabatan"],
+                ["1", "AKP 87010101 Budi", "Kanit"],
+            ]
+        )
+        assert is_personnel_table(table) is True
diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py
new file mode 100644
index 0000000..c944269
--- /dev/null
+++ b/tests/unit/test_table.py
@@ -0,0 +1,94 @@
+"""Tests for the PP-Structure table parsing helpers (no paddleocr required)."""
+
+from __future__ import annotations
+
+import pytest
+
+from ocr_sprint.pipeline.table import (
+ DetectedTable,
+ extract_tables_from_pp_result,
+ parse_table_html,
+)
+
+
+class TestParseTableHtml:
+    """`parse_table_html`: HTML table markup to a rows x cells grid of text."""
+
+    def test_simple_grid(self) -> None:
+        """A plain table of tr/td elements becomes one list per row."""
+        html_str = """
+        <table>
+          <tr><td>No</td><td>Pangkat</td><td>NRP</td><td>Nama</td></tr>
+          <tr><td>1</td><td>AKP</td><td>87010101</td><td>Budi Santoso</td></tr>
+          <tr><td>2</td><td>IPDA</td><td>92030404</td><td>Sari Wulandari</td></tr>
+        </table>
+        """
+        rows = parse_table_html(html_str)
+        assert rows == [
+            ["No", "Pangkat", "NRP", "Nama"],
+            ["1", "AKP", "87010101", "Budi Santoso"],
+            ["2", "IPDA", "92030404", "Sari Wulandari"],
+        ]
+
+    def test_handles_th_and_entities_and_inline_tags(self) -> None:
+        """Header cells, HTML entities and inline tags reduce to clean text."""
+        # &nbsp; decodes to a non-breaking space, which the whitespace
+        # collapse turns into a regular single space.
+        html_str = (
+            "<table><tr><th>Pangkat&nbsp;/&nbsp;NRP</th><th><b>Nama</b></th></tr>"
+            "<tr><td>AKP <span>87010101</span></td><td>Budi&nbsp;Santoso</td></tr></table>"
+        )
+        rows = parse_table_html(html_str)
+        assert rows[0] == ["Pangkat / NRP", "Nama"]
+        assert rows[1] == ["AKP 87010101", "Budi Santoso"]
+
+    def test_empty_table_returns_empty_list(self) -> None:
+        """Empty input and a table without rows both give an empty grid."""
+        assert parse_table_html("") == []
+        assert parse_table_html("<table></table>") == []
+
+
+class TestExtractTablesFromPpResult:
+    """`extract_tables_from_pp_result`: region filtering and HTML parsing."""
+
+    def test_filters_table_regions_and_parses_html(self) -> None:
+        """Only table regions with non-empty HTML produce a `DetectedTable`."""
+        pp_result = [
+            {"type": "text", "res": [{"text": "ignore me", "confidence": 0.9}]},
+            {
+                "type": "table",
+                "res": {
+                    # The one region that must survive: a 1x2 table.
+                    "html": "<table><tr><td>A</td><td>B</td></tr></table>",
+                    "cell_bbox": [],
+                },
+            },
+            {
+                "type": "table",
+                "res": {"html": ""},  # empty html → ignored
+            },
+            {
+                "type": "figure",
+                "res": [],
+            },
+        ]
+        tables = extract_tables_from_pp_result(pp_result)
+        assert len(tables) == 1
+        assert tables[0].cells == [["A", "B"]]
+
+    def test_no_tables_returns_empty_list(self) -> None:
+        """A result without table regions yields no tables."""
+        pp_result = [{"type": "text", "res": [{"text": "x"}]}]
+        assert extract_tables_from_pp_result(pp_result) == []
+
+
+class TestDetectedTable:
+    """`DetectedTable` dimension properties."""
+
+    def test_dimensions(self) -> None:
+        """For a ragged grid, n_cols is the length of the longest row."""
+        table = DetectedTable(cells=[["a", "b", "c"], ["d", "e"]])
+        assert table.n_rows == 2
+        assert table.n_cols == 3
+
+    def test_zero_rows(self) -> None:
+        """A default-constructed table has no rows and no columns."""
+        table = DetectedTable()
+        assert table.n_rows == 0
+        assert table.n_cols == 0
+
+
+@pytest.fixture
+def sample_personnel_table() -> DetectedTable:
+    """Typical Polres-level personnel table: one header row, three data rows."""
+    header = ["No", "Pangkat / NRP", "Nama", "Jabatan dalam Dinas", "Jabatan dalam Sprint"]
+    personnel = [
+        ["1", "AKP 87010101", "Budi Santoso", "Kanit Reskrim", "Ketua Tim"],
+        ["2", "IPDA 92030404", "Sari Wulandari", "Banit Reskrim", "Anggota"],
+        ["3", "BRIPKA 98050505", "Ahmad Hidayat", "Banit Reskrim", "Anggota"],
+    ]
+    return DetectedTable(cells=[header, *personnel])