Phase 3: PP-Structure table extraction + personnel column mapper (#2)

* Phase 3: PP-Structure table extraction + personnel column mapper

Adds the personnel-table stage of the pipeline. PaddleOCR's PP-Structure
recognizes table regions and emits HTML, which we parse into a 2D cell
grid. A separate column mapper detects the header row, classifies each
column to a canonical PersonnelEntry field via a synonym dictionary,
and walks the data rows.

Variant handling:
- Different satuan use different column orders and header phrasing.
  Supported synonyms for each canonical field are listed in
  pipeline/extract/personnel.py (Pangkat / NRP / Pangkat-NRP combo /
  Nama / Jabatan dalam Dinas / Jabatan dalam Sprint / Keterangan).
- A merged 'PANGKAT NRP' or 'PANGKAT NRP NAMA' cell is split using
  the 8-digit NRP regex (with look-arounds so glued forms like
  'BRIPKA98050505' work) and the master pangkat lookup.
- Unknown ranks are kept verbatim so the validation layer can flag
  them as UNKNOWN_PANGKAT for HITL review.
- Rows without nrp AND nama are dropped (separators / merged cells).

New module pipeline/table.py:
- DetectedTable dataclass (cells + html).
- parse_table_html: tag/entity-tolerant HTML -> 2D grid.
- extract_tables_from_pp_result: filter PP-Structure regions to type=table.
- run_table_extraction: top-level entrypoint with lazy-init singleton
  for the heavy PP-Structure engine.

Orchestrator now invokes table extraction (gated by TABLES_ENABLED) on
every preprocessed page and merges the discovered personnel into the
ExtractionResult. Failures are caught and logged so a flaky table
recognizer never blocks header extraction.

Tests: 38 new unit tests covering HTML parsing, region filtering,
header classification, column mapping (split, combined, glued cells),
and end-to-end personnel extraction. Total 108 tests, all green.
PaddleOCR / PP-Structure remain optional - no test imports them.

Co-authored-by: adrian kuman firmansah <adriancuman@gmail.com>

* Phase 3: fix header misclassification for combined Pangkat/NRP/Nama columns

Devin Review caught two related bugs in personnel column mapping:

1. _classify_header_cell iterated _HEADER_SYNONYMS in insertion order
   when falling back to substring matching. The dict listed shorter
   keywords first ('pangkat' before 'pangkat / nrp'), so a header like
   'Pangkat / NRP / Nama' classified as plain 'pangkat'. map_row then
   tried to normalize the whole '"AKP 87010101 Budi Santoso"' cell
   as a rank, normalize_pangkat returned None, and the row failed the
   nrp-or-nama gate at the bottom of map_row -- silently dropping
   every personnel row in tables using this layout.

2. _split_pangkat_nrp_nama existed and was unit-tested but was never
   wired up in map_row, so even if classification had worked, the
   three-way split would not have run. The module docstring claimed
   the split was supported.

Fix:
- Iterate the synonym table sorted by keyword length descending in the
  substring-match fallback so the most specific synonym wins.
- Add 'pangkat_nrp_nama' synonym entries for typical separators
  (' / ', '/', whitespace, comma).
- Wire 'pangkat_nrp_nama' into map_row using the existing helper.
- Update is_personnel_table so combined headers count as both an id
  signal and a name signal.

Tests: 6 new asserts (parametrized), 1 regression test for triple-
combined header end-to-end, 1 dedicated map_row test for the new
column type. 114 tests total, all green.

Co-authored-by: adrian kuman firmansah <adriancuman@gmail.com>

* Phase 3: handle multi-word Polri ranks in _split_pangkat_nrp_nama

Devin Review caught: token-by-token is_valid_pangkat() check could not
recognize multi-word ranks ('KOMBES POL', 'BRIGJEN POL', 'IRJEN POL',
'KOMJEN POL', 'JENDERAL POL'). For 'KOMBES POL 88123456 John Doe' the
old code returned pangkat=None, nama='KOMBES POL John Doe', and the
validator's UNKNOWN_PANGKAT flag never fired because pangkat was falsy.

New behavior: greedy longest-prefix match. After stripping the NRP we
try the leading 3-token, 2-token, 1-token slice against
normalize_pangkat() and take the longest that maps to a canonical
rank. Tokens after the matched rank become the nama. Unknown ranks
fall through to pangkat=None and the rank text stays in the nama
field, where downstream validation already flags the row.

Tests: 5 new asserts (4 multi-word ranks + 1 unknown-rank fallback),
119 total green.

Co-authored-by: adrian kuman firmansah <adriancuman@gmail.com>

* Phase 3: don't count pangkat_nrp as a name signal in is_personnel_table

Devin Review caught: a table with header ['No', 'Pangkat / NRP',
'Jabatan'] (no name column) was wrongly classified as a personnel
table because pangkat_nrp was lumped into has_name. Such a table
would produce PersonnelEntry rows with nama=None passing the nrp-or-
nama gate, polluting the personel[] output with id-only fragments.

Split the combined-cell set into combined_id (counts toward has_id)
and combined_name (counts toward has_name). Only pangkat_nrp_nama,
which actually embeds a name, qualifies for has_name. pangkat_nrp
remains an id-only signal.

Tests: 3 new asserts (rejects id-only, accepts pangkat_nrp + separate
nama, accepts pangkat_nrp_nama). 122 total green.

Co-authored-by: adrian kuman firmansah <adriancuman@gmail.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: adrian kuman firmansah <adriancuman@gmail.com>
This commit is contained in:
devin-ai-integration[bot]
2026-04-25 16:10:48 +00:00
committed by GitHub
parent 812ea7e030
commit 33b38aacc7
8 changed files with 905 additions and 12 deletions

View File

@@ -26,6 +26,9 @@ PREPROCESS_DETECT_DOCUMENT=true
PREPROCESS_REMOVE_SHADOW=true
PREPROCESS_MIN_QUAD_AREA_FRACTION=0.20
# ==== Table extraction (Phase 3, PaddleOCR PP-Structure) ====
TABLES_ENABLED=true
# ==== Confidence / routing (Phase 5) ====
CONFIDENCE_AUTO_APPROVE=0.95
CONFIDENCE_NEEDS_REVIEW=0.85

View File

@@ -2,7 +2,7 @@
OCR + structured extraction service for Indonesian police "surat sprint" (surat perintah) documents. Built around **FastAPI + PaddleOCR + hybrid extraction (regex → LLM lokal → validation)** with **on-premise** deployment as a hard requirement.
> **Status:** Phase 1+2 — synchronous PDF/image OCR with regex header extraction, validation, confidence scoring, and **document detection + perspective correction + shadow removal** for phone photos. Phase 36 (table extraction, async pipeline, LLM extraction, HITL) are tracked in [`docs/architecture.md`](docs/architecture.md).
> **Status:** Phase 1+2+3 — synchronous PDF/image OCR with regex header extraction, validation, confidence scoring, document detection + perspective correction + shadow removal for phone photos, and **PP-Structure table extraction** for personnel rows. Phase 46 (async pipeline, LLM extraction, HITL) are tracked in [`docs/architecture.md`](docs/architecture.md).
## Why this stack
@@ -67,7 +67,7 @@ Expected response (truncated):
}
```
> **Note:** Phase 1 does not yet populate the `personel[]` table — that requires PP-Structure (Phase 3). Header fields, signatory NRP, confidence, and HITL routing are fully wired.
> **Note:** As of Phase 3 the `personel[]` array is populated from PP-Structure table recognition. Set `TABLES_ENABLED=false` in `.env` to skip the table stage (faster on documents that you know contain no personnel table).
### Docker
@@ -97,13 +97,13 @@ Pre-commit hooks run ruff on every commit. Install once with `pre-commit install
src/ocr_sprint/
api/ # FastAPI routes + error handlers
schemas/ # Pydantic v2 models (request/response, extraction, personnel)
pipeline/ # ingest → document_detect → preprocess → ocr → extract → validate → score
extract/ # regex_rules.py (Phase 1) → llm.py (Phase 5)
pipeline/ # ingest → document_detect → preprocess → ocr + table → extract → validate → score
extract/ # regex_rules.py (Phase 1) + personnel.py (Phase 3) → llm.py (Phase 5)
data/ # master data (Polri ranks, etc.)
utils/ # logging, helpers
config.py # pydantic-settings
main.py # app factory
tests/unit/ # ~60 unit tests, no PaddleOCR dependency
tests/unit/ # 100+ unit tests, PaddleOCR / PP-Structure mocked
docs/ # architecture & decision records
```
@@ -113,7 +113,7 @@ docs/ # architecture & decision records
|---|---|---|
| 1 | Sync API, PDF/image ingest, basic preprocessing, PaddleOCR, regex header extraction, validation, confidence scoring | **Done** |
| 2 | OpenCV-based document detection, perspective transform, shadow removal for phone photos | **Done** |
| 3 | PP-Structure table extraction for personnel rows | Planned |
| 3 | PP-Structure table extraction for personnel rows + column mapper | **Done** |
| 4 | Async pipeline (Celery + Redis), Postgres + MinIO, auth, observability | Planned |
| 5 | LLM hybrid extraction (Ollama + structured output) | Planned |
| 6 | HITL review endpoints + audit trail | Planned |

View File

@@ -47,6 +47,9 @@ class Settings(BaseSettings):
preprocess_remove_shadow: bool = True
preprocess_min_quad_area_fraction: float = Field(0.20, ge=0.0, le=1.0)
# Table extraction (Phase 3) via PaddleOCR PP-Structure
tables_enabled: bool = True
# Confidence thresholds (Phase 5 routing)
confidence_auto_approve: float = Field(0.95, ge=0.0, le=1.0)
confidence_needs_review: float = Field(0.85, ge=0.0, le=1.0)

View File

@@ -0,0 +1,316 @@
"""Map a raw 2D table grid into a list of `PersonnelEntry`.
Surat sprint personnel tables don't have a fixed schema across satuan: column
order, header phrasing, and even whether pangkat/NRP are merged into one cell
all vary. We deal with this by:
1. Detecting the header row by keyword scoring (rows that contain "PANGKAT"
or "NRP" or "NAMA" are headers; the row with the highest score wins).
2. Mapping each header cell to one of the canonical PersonnelEntry fields
via a synonym dictionary.
3. Walking the remaining rows and slotting cells into fields by column
index. A combined "PANGKAT/NRP" or "PANGKAT/NRP/NAMA" cell is split
heuristically (8-digit token → NRP, known-rank token → pangkat, the
leftover words → nama).
The mapper is deliberately conservative: when in doubt it leaves a field
None and lets validation flag the row for HITL review.
"""
from __future__ import annotations
import re
from ocr_sprint.data.master_pangkat import normalize_pangkat
from ocr_sprint.pipeline.table import DetectedTable
from ocr_sprint.schemas.personnel import PersonnelEntry
# ---------- column synonyms ----------
# header keyword → canonical column id. Lowercased, whitespace-collapsed.
_HEADER_SYNONYMS: dict[str, str] = {
# row index column
"no": "no",
"nomor": "no",
"no.": "no",
# rank
"pangkat": "pangkat",
"pkt": "pangkat",
# NRP / NIP / NIPK
"nrp": "nrp",
"no nrp": "nrp",
"nrp / nip": "nrp",
"nrp/nip": "nrp",
"nrp nip": "nrp",
"no. mhs": "nrp", # taruna
# combined pangkat + NRP + nama cell, seen in compact Polri layouts.
# Order matters here only for readability; classify_header_cell ranks
# synonyms by length, so the longer 'pangkat / nrp / nama' wins over
# both 'pangkat / nrp' and 'pangkat'.
"pangkat / nrp / nama": "pangkat_nrp_nama",
"pangkat/nrp/nama": "pangkat_nrp_nama",
"pangkat nrp nama": "pangkat_nrp_nama",
"pangkat, nrp, nama": "pangkat_nrp_nama",
# combined pangkat + NRP cell, common in Polres-level sprint
"pangkat / nrp": "pangkat_nrp",
"pangkat/nrp": "pangkat_nrp",
"pangkat dan nrp": "pangkat_nrp",
"pangkat nrp": "pangkat_nrp",
# name
"nama": "nama",
"nama lengkap": "nama",
# jabatan dalam dinas (permanent post)
"jabatan": "jabatan_dinas",
"jabatan dinas": "jabatan_dinas",
"jabatan dalam dinas": "jabatan_dinas",
"jbt dinas": "jabatan_dinas",
# jabatan dalam sprint (role for this dispatch)
"jabatan dalam sprint": "jabatan_sprint",
"jabatan dalam sprin": "jabatan_sprint",
"jabatan dalam surat perintah": "jabatan_sprint",
"jabatan sprint": "jabatan_sprint",
"jabatan sprin": "jabatan_sprint",
"tugas": "jabatan_sprint",
"penugasan": "jabatan_sprint",
# remarks
"keterangan": "keterangan",
"ket": "keterangan",
"ket.": "keterangan",
}
# 8-digit NRP. We don't anchor on word boundaries because OCR sometimes glues
# the rank directly onto the digits ("BRIPKA98050505"). We use (?<!\d) and (?!\d)
# look-arounds to make sure we don't match a substring of a longer number.
_NRP_RE = re.compile(r"(?<!\d)(\d{8})(?!\d)")
_NUMBER_RE = re.compile(r"^\s*(\d{1,3})[.)\s]*$")
# ---------- header detection ----------
def _normalize_header_cell(text: str) -> str:
return " ".join(text.lower().split()).strip(" .:")
# Synonym keywords sorted by length (descending) so that substring matching
# in `_classify_header_cell` prefers the most specific match. Without this,
# 'pangkat' would match 'pangkat / nrp / nama' before 'pangkat / nrp / nama'
# itself, silently misclassifying combined-cell headers and dropping rows.
_SORTED_HEADER_KEYWORDS: list[tuple[str, str]] = sorted(
_HEADER_SYNONYMS.items(), key=lambda kv: -len(kv[0])
)
def _classify_header_cell(text: str) -> str | None:
"""Return the canonical column id for a header cell, or None.
First tries an exact match against the synonym table; if that fails,
falls back to substring matching against the *longest* synonym that is
contained in the cell text. The longest-first ordering matters: a header
like 'Pangkat / NRP / Nama' must classify as `pangkat_nrp_nama`, not
`pangkat`, otherwise downstream `map_row` would treat the whole cell as
a rank string and drop the row when normalize_pangkat returns None.
"""
norm = _normalize_header_cell(text)
if not norm:
return None
if norm in _HEADER_SYNONYMS:
return _HEADER_SYNONYMS[norm]
for keyword, canonical in _SORTED_HEADER_KEYWORDS:
if keyword in norm:
return canonical
return None
def detect_header_row(table: DetectedTable) -> tuple[int, list[str | None]] | None:
"""Find the most likely header row and return (row_index, column_mapping).
Strategy: score each of the first ~3 rows by how many cells classify as a
known column. Pick the highest-scoring row provided it covers at least
two known fields (otherwise we don't have enough signal to trust it).
"""
best_idx: int | None = None
best_mapping: list[str | None] = []
best_score = 0
for r_idx in range(min(3, table.n_rows)):
row = table.cells[r_idx]
mapping = [_classify_header_cell(cell) for cell in row]
score = sum(1 for m in mapping if m is not None)
if score >= 2 and score > best_score:
best_score = score
best_idx = r_idx
best_mapping = mapping
if best_idx is None:
return None
return best_idx, best_mapping
# ---------- combined-cell splitting ----------
def _split_pangkat_nrp(cell: str) -> tuple[str | None, str | None]:
"""Split a 'PANGKAT NRP' cell into (pangkat, nrp).
Returns (None, None) if the cell can't be split confidently.
"""
if not cell:
return None, None
nrp_match = _NRP_RE.search(cell)
nrp = nrp_match.group(1) if nrp_match else None
pangkat_part = cell
if nrp_match:
pangkat_part = cell[: nrp_match.start()] + cell[nrp_match.end() :]
# Strip separators commonly seen between rank and NRP ("AKP / 87010101",
# "AKP. 87010101", "AKP - 87010101") before normalizing.
pangkat_part = pangkat_part.strip(" /-.,;:|").strip()
pangkat = normalize_pangkat(pangkat_part)
return pangkat, nrp
def _split_pangkat_nrp_nama(cell: str) -> tuple[str | None, str | None, str | None]:
"""Split a 'PANGKAT NRP NAMA' single-cell into its three components.
Multi-word ranks like 'KOMBES POL' or 'BRIGJEN POL' must be matched as
contiguous token sequences, otherwise tokens like 'POL' leak into the
name. We greedily try the longest leading token-prefix that normalizes
to a known pangkat, then fall back to shorter prefixes.
"""
if not cell:
return None, None, None
nrp_match = _NRP_RE.search(cell)
nrp = nrp_match.group(1) if nrp_match else None
rest = cell
if nrp:
rest = cell.replace(nrp, " ", 1)
tokens = rest.split()
if not tokens:
return None, nrp, None
# Try the longest leading sub-sequence first so 'KOMBES POL' wins over
# 'KOMBES' (which alone is not a valid pangkat anyway).
pangkat: str | None = None
consumed = 0
for prefix_len in range(min(len(tokens), 3), 0, -1):
candidate = " ".join(tokens[:prefix_len])
normalized = normalize_pangkat(candidate)
if normalized is not None:
pangkat = normalized
consumed = prefix_len
break
name_tokens = tokens[consumed:] if pangkat else tokens
nama = " ".join(name_tokens) if name_tokens else None
return pangkat, nrp, nama
# ---------- row mapping ----------
def _parse_int(value: str) -> int | None:
m = _NUMBER_RE.match(value)
return int(m.group(1)) if m else None
def map_row(row: list[str], mapping: list[str | None]) -> PersonnelEntry | None:
"""Convert one data row into a PersonnelEntry using the column mapping."""
fields: dict[str, str | int | None] = {
"no": None,
"pangkat": None,
"nrp": None,
"nama": None,
"jabatan_dinas": None,
"jabatan_sprint": None,
"keterangan": None,
}
for idx, cell in enumerate(row):
if idx >= len(mapping):
break
column = mapping[idx]
if column is None:
continue
text = cell.strip()
if column == "no":
fields["no"] = _parse_int(text)
elif column == "pangkat_nrp_nama":
pangkat, nrp, nama = _split_pangkat_nrp_nama(text)
if pangkat:
fields["pangkat"] = pangkat
if nrp:
fields["nrp"] = nrp
if nama:
fields["nama"] = nama
elif column == "pangkat_nrp":
pangkat, nrp = _split_pangkat_nrp(text)
if pangkat:
fields["pangkat"] = pangkat
if nrp:
fields["nrp"] = nrp
elif column == "pangkat":
fields["pangkat"] = normalize_pangkat(text) or text or None
elif column == "nrp":
m = _NRP_RE.search(text)
fields["nrp"] = m.group(1) if m else (text or None)
elif column in fields:
fields[column] = text or None
# require at least nama OR nrp to consider this a real personnel row;
# otherwise it's likely a separator / footnote / merged cell.
if not (fields["nrp"] or fields["nama"]):
return None
return PersonnelEntry(
no=fields["no"] if isinstance(fields["no"], int) else None,
pangkat=fields["pangkat"] if isinstance(fields["pangkat"], str) else None,
nrp=fields["nrp"] if isinstance(fields["nrp"], str) else None,
nama=fields["nama"] if isinstance(fields["nama"], str) else None,
jabatan_dinas=(
fields["jabatan_dinas"] if isinstance(fields["jabatan_dinas"], str) else None
),
jabatan_sprint=(
fields["jabatan_sprint"] if isinstance(fields["jabatan_sprint"], str) else None
),
keterangan=(fields["keterangan"] if isinstance(fields["keterangan"], str) else None),
)
# ---------- table-level entrypoint ----------
def is_personnel_table(table: DetectedTable) -> bool:
"""Heuristic: a table is the personnel list if its header row contains
at least one rank/NRP indicator and one name indicator.
"""
detected = detect_header_row(table)
if detected is None:
return False
_, mapping = detected
# `pangkat_nrp` is an id-only signal (rank + NRP, no name), while
# `pangkat_nrp_nama` carries a name too. Counting `pangkat_nrp` toward
# `has_name` would let id-only tables (e.g. ['No', 'Pangkat / NRP',
# 'Jabatan']) be mistaken for personnel tables.
combined_id = {"pangkat_nrp", "pangkat_nrp_nama"}
combined_name = {"pangkat_nrp_nama"}
has_id = any(m in {"nrp", "pangkat"} | combined_id for m in mapping)
has_name = any(m == "nama" or m in combined_name for m in mapping)
return has_id and has_name
def extract_personnel(tables: list[DetectedTable]) -> list[PersonnelEntry]:
"""Pick the best-matching personnel table and convert its rows.
If multiple tables look like personnel lists (rare), we concatenate them
in document order so nothing is silently dropped.
"""
rows: list[PersonnelEntry] = []
for table in tables:
if not is_personnel_table(table):
continue
detected = detect_header_row(table)
if detected is None:
continue
header_idx, mapping = detected
for r_idx in range(header_idx + 1, table.n_rows):
entry = map_row(table.cells[r_idx], mapping)
if entry is not None:
rows.append(entry)
return rows

View File

@@ -1,11 +1,13 @@
"""Synchronous pipeline orchestrator (Phase 1).
"""Synchronous pipeline orchestrator (Phase 1-3).
Wires the individual stages together:
bytes ingest preprocess OCR → regex extract → validate → score
bytes -> ingest -> document_detect -> preprocess -> OCR
-> [PP-Structure tables -> personnel mapper]
-> regex header extract -> validate -> score
Phase 4 will replace this with a Celery task graph; Phase 3/5 will plug
in PP-Structure for tables and an LLM extractor for variant fields.
Phase 4 will replace this with a Celery task graph; Phase 5 will plug
in an LLM extractor for variant fields.
"""
from __future__ import annotations
@@ -15,13 +17,16 @@ from dataclasses import dataclass
from ocr_sprint.config import get_settings
from ocr_sprint.pipeline.confidence import compute_confidence, route
from ocr_sprint.pipeline.document_detect import DocumentDetectConfig, detect_and_correct
from ocr_sprint.pipeline.extract.personnel import extract_personnel
from ocr_sprint.pipeline.extract.regex_rules import extract_header, find_signatory
from ocr_sprint.pipeline.extract.validators import validate_extraction
from ocr_sprint.pipeline.ingest import detect_source_kind, ingest
from ocr_sprint.pipeline.ingest import NDArrayU8, detect_source_kind, ingest
from ocr_sprint.pipeline.ocr import OCRPage, run_ocr
from ocr_sprint.pipeline.preprocess import PreprocessConfig, preprocess
from ocr_sprint.pipeline.table import DetectedTable, run_table_extraction
from ocr_sprint.schemas.document import DocumentStatus, SourceKind
from ocr_sprint.schemas.extraction import ExtractionResult, ReviewFlag
from ocr_sprint.schemas.personnel import PersonnelEntry
from ocr_sprint.utils.logging import get_logger
_logger = get_logger(__name__)
@@ -66,9 +71,11 @@ def run_pipeline(content: bytes) -> PipelineOutput:
)
ocr_pages: list[OCRPage] = []
cleaned_pages: list[NDArrayU8] = []
for page in pages:
corrected = detect_and_correct(page.image, detect_cfg)
cleaned = preprocess(corrected, pre_cfg)
cleaned_pages.append(cleaned)
ocr_pages.append(run_ocr(cleaned))
full_text = "\n".join(p.text for p in ocr_pages)
@@ -77,13 +84,28 @@ def run_pipeline(content: bytes) -> PipelineOutput:
header = extract_header(full_text)
ttd = find_signatory(full_text)
personel: list[PersonnelEntry] = []
if s.tables_enabled and cleaned_pages:
all_tables: list[DetectedTable] = []
for img in cleaned_pages:
try:
all_tables.extend(run_table_extraction(img))
except Exception as exc: # pragma: no cover - defensive
_logger.warning("pipeline.table_extraction_failed", error=str(exc))
personel = extract_personnel(all_tables)
_logger.info(
"pipeline.tables",
tables=len(all_tables),
personel_rows=len(personel),
)
initial_flags: list[ReviewFlag] = []
if mean_ocr_conf < _OCR_CONFIDENCE_FLAG_THRESHOLD:
initial_flags.append(ReviewFlag.LOW_OCR_CONFIDENCE)
result = ExtractionResult(
header=header,
personel=[], # Phase 3 will populate from PP-Structure
personel=personel,
untuk=[],
ttd=ttd,
raw_text=full_text,

View File

@@ -0,0 +1,155 @@
"""Phase 3 — table extraction via PaddleOCR PP-Structure.
The personnel section of a surat sprint is almost always a table with columns
like (No, Pangkat, NRP, Nama, Jabatan dalam Dinas, Jabatan dalam Sprint,
Keterangan). Plain OCR on the page produces a flat stream of text lines that
makes column reconstruction brittle, so we use PP-Structure's table recognizer
which returns a 2D cell grid directly.
Like the OCR engine wrapper, PP-Structure has a heavy initialization cost
(~3-6s on CPU) and an API that has shifted across paddleocr releases, so we
hide it behind a small process-global accessor and a stable dataclass surface.
Tests do NOT require paddleocr installed — `extract_tables_from_html` and the
personnel column mapper are pure-Python and parse PP-Structure's HTML output.
"""
from __future__ import annotations
import html
import re
from dataclasses import dataclass, field
from threading import Lock
from typing import TYPE_CHECKING
from ocr_sprint.config import get_settings
from ocr_sprint.pipeline.ingest import NDArrayU8
from ocr_sprint.utils.logging import get_logger
if TYPE_CHECKING:
from paddleocr import PPStructure
_logger = get_logger(__name__)
_lock = Lock()
_instance: PPStructure | None = None
@dataclass(frozen=True)
class TableCell:
"""One parsed table cell."""
text: str
row: int
col: int
@dataclass
class DetectedTable:
"""One table region detected by PP-Structure, parsed into a 2D grid.
`cells[r]` is a list of strings for row r. The list is ragged if the table
has merged cells (we don't currently un-merge), so callers should treat it
defensively.
"""
cells: list[list[str]] = field(default_factory=list)
html: str = ""
@property
def n_rows(self) -> int:
return len(self.cells)
@property
def n_cols(self) -> int:
return max((len(r) for r in self.cells), default=0)
# ---------- PP-Structure singleton ----------
def _build_pp_structure() -> PPStructure:
from paddleocr import PPStructure
s = get_settings()
_logger.info("pp_structure.init", lang=s.ocr_lang, use_gpu=s.ocr_use_gpu)
# layout=True so that PP-Structure also returns figure/text regions; we
# filter to tables only afterwards. show_log=False to keep stdout clean.
return PPStructure(
lang=s.ocr_lang,
use_gpu=s.ocr_use_gpu,
layout=True,
show_log=False,
)
def get_pp_structure() -> PPStructure:
"""Lazy, thread-safe singleton accessor for PP-Structure."""
global _instance
if _instance is None:
with _lock:
if _instance is None:
_instance = _build_pp_structure()
return _instance
# ---------- table parsing ----------
_TR_RE = re.compile(r"<tr[^>]*>(.*?)</tr>", re.IGNORECASE | re.DOTALL)
_TD_RE = re.compile(r"<t[dh][^>]*>(.*?)</t[dh]>", re.IGNORECASE | re.DOTALL)
_TAG_RE = re.compile(r"<[^>]+>")
def _strip_html(fragment: str) -> str:
"""Remove inner tags + collapse whitespace + decode HTML entities."""
no_tags = _TAG_RE.sub(" ", fragment)
decoded = html.unescape(no_tags)
return " ".join(decoded.split()).strip()
def parse_table_html(table_html: str) -> list[list[str]]:
"""Parse an HTML <table> string into a 2D list of cell text values.
Tolerant to PP-Structure's slight HTML inconsistencies (no closing tags,
nested spans, &nbsp; entities) — we don't need full HTML compliance,
just rows x cells.
"""
rows: list[list[str]] = []
for tr in _TR_RE.findall(table_html):
cells = [_strip_html(td) for td in _TD_RE.findall(tr)]
rows.append(cells)
return rows
def extract_tables_from_pp_result(
pp_result: list[dict[str, object]],
) -> list[DetectedTable]:
"""Pull tables out of PP-Structure's region list.
PP-Structure returns one dict per detected region; tables have
`type == "table"` and the recognized table HTML inside `res["html"]`.
"""
tables: list[DetectedTable] = []
for region in pp_result:
if region.get("type") != "table":
continue
res = region.get("res")
if not isinstance(res, dict):
continue
table_html = res.get("html", "")
if not isinstance(table_html, str) or not table_html:
continue
cells = parse_table_html(table_html)
if not cells:
continue
tables.append(DetectedTable(cells=cells, html=table_html))
return tables
def run_table_extraction(image: NDArrayU8) -> list[DetectedTable]:
"""Run PP-Structure on a single page and return the parsed tables."""
engine = get_pp_structure()
raw = engine(image)
if not isinstance(raw, list):
return []
return extract_tables_from_pp_result(raw)

View File

@@ -0,0 +1,300 @@
"""Tests for the personnel-row mapper."""
from __future__ import annotations
import pytest
from ocr_sprint.pipeline.extract.personnel import (
_classify_header_cell,
_split_pangkat_nrp,
_split_pangkat_nrp_nama,
detect_header_row,
extract_personnel,
is_personnel_table,
map_row,
)
from ocr_sprint.pipeline.table import DetectedTable
# ---------- header detection ----------
class TestClassifyHeaderCell:
@pytest.mark.parametrize(
("text", "expected"),
[
("No", "no"),
("NO.", "no"),
("Nomor", "no"),
("Pangkat", "pangkat"),
("NRP", "nrp"),
("Pangkat / NRP", "pangkat_nrp"),
("PANGKAT/NRP", "pangkat_nrp"),
("Pangkat / NRP / Nama", "pangkat_nrp_nama"),
("PANGKAT/NRP/NAMA", "pangkat_nrp_nama"),
("Pangkat, NRP, Nama", "pangkat_nrp_nama"),
("Nama", "nama"),
("Nama Lengkap", "nama"),
("Jabatan dalam Dinas", "jabatan_dinas"),
("Jabatan dalam Sprint", "jabatan_sprint"),
("Keterangan", "keterangan"),
],
)
def test_known_header(self, text: str, expected: str) -> None:
assert _classify_header_cell(text) == expected
def test_substring_match_prefers_longest_synonym(self) -> None:
# 'pangkat' is a shorter prefix of 'pangkat / nrp / nama'. Without
# length-sorted iteration we'd misclassify combined headers as plain
# 'pangkat' and downstream map_row would drop every row.
assert _classify_header_cell("Pangkat / NRP / Nama Personel") == "pangkat_nrp_nama"
assert _classify_header_cell("Pangkat / NRP Polri") == "pangkat_nrp"
def test_unknown_header(self) -> None:
assert _classify_header_cell("Random Text") is None
assert _classify_header_cell("") is None
class TestDetectHeaderRow:
def test_detects_first_row_as_header(self) -> None:
table = DetectedTable(
cells=[
["No", "Pangkat", "NRP", "Nama"],
["1", "AKP", "87010101", "Budi"],
]
)
result = detect_header_row(table)
assert result is not None
idx, mapping = result
assert idx == 0
assert mapping == ["no", "pangkat", "nrp", "nama"]
def test_detects_second_row_when_first_is_title(self) -> None:
table = DetectedTable(
cells=[
["DAFTAR PERSONEL"], # title row, not a header
["No", "Pangkat / NRP", "Nama", "Jabatan dalam Dinas"],
["1", "AKP 87010101", "Budi", "Kanit"],
]
)
result = detect_header_row(table)
assert result is not None
idx, _ = result
assert idx == 1
def test_returns_none_when_no_header_found(self) -> None:
table = DetectedTable(cells=[["foo", "bar"], ["baz", "qux"]])
assert detect_header_row(table) is None
# ---------- combined-cell splitting ----------
class TestSplitPangkatNrp:
@pytest.mark.parametrize(
("text", "expected"),
[
("AKP 87010101", ("AKP", "87010101")),
("IPDA / 92030404", ("IPDA", "92030404")),
("BRIPKA98050505", ("BRIPKA", "98050505")),
("KOMPOL 88123456", ("KOMPOL", "88123456")),
],
)
def test_known_combos(self, text: str, expected: tuple[str, str]) -> None:
assert _split_pangkat_nrp(text) == expected
def test_returns_none_when_no_nrp(self) -> None:
pangkat, nrp = _split_pangkat_nrp("AKP")
assert pangkat == "AKP"
assert nrp is None
class TestSplitPangkatNrpNama:
def test_three_way_split(self) -> None:
pangkat, nrp, nama = _split_pangkat_nrp_nama("AKP 87010101 Budi Santoso")
assert pangkat == "AKP"
assert nrp == "87010101"
assert nama == "Budi Santoso"
@pytest.mark.parametrize(
("text", "expected_pangkat", "expected_name"),
[
# multi-word ranks must be matched as contiguous token sequences,
# otherwise tokens like 'POL' would leak into the name.
("KOMBES POL 88123456 John Doe", "KOMBES POL", "John Doe"),
("BRIGJEN POL 99887766 Jane Doe", "BRIGJEN POL", "Jane Doe"),
("IRJEN POL 77665544 Ahmad Hidayat", "IRJEN POL", "Ahmad Hidayat"),
("JENDERAL POL 11223344 Sari Wulandari", "JENDERAL POL", "Sari Wulandari"),
],
)
def test_multi_word_ranks(self, text: str, expected_pangkat: str, expected_name: str) -> None:
pangkat, _nrp, nama = _split_pangkat_nrp_nama(text)
assert pangkat == expected_pangkat
assert nama == expected_name
def test_unknown_rank_returns_none_pangkat(self) -> None:
pangkat, nrp, nama = _split_pangkat_nrp_nama("Foobar 87010101 Budi Santoso")
assert pangkat is None
assert nrp == "87010101"
# name keeps the unknown rank token; validators will flag the row.
assert nama == "Foobar Budi Santoso"
# ---------- row mapping ----------
class TestMapRow:
def test_split_columns_polres_layout(self) -> None:
mapping = ["no", "pangkat", "nrp", "nama", "jabatan_dinas", "jabatan_sprint"]
row = ["1", "AKP", "87010101", "Budi Santoso", "Kanit Reskrim", "Ketua Tim"]
entry = map_row(row, mapping)
assert entry is not None
assert entry.no == 1
assert entry.pangkat == "AKP"
assert entry.nrp == "87010101"
assert entry.nama == "Budi Santoso"
assert entry.jabatan_dinas == "Kanit Reskrim"
assert entry.jabatan_sprint == "Ketua Tim"
def test_combined_pangkat_nrp_nama_cell(self) -> None:
mapping = ["no", "pangkat_nrp_nama", "jabatan_dinas", "jabatan_sprint"]
row = ["1", "AKP 87010101 Budi Santoso", "Kanit Reskrim", "Ketua Tim"]
entry = map_row(row, mapping)
assert entry is not None
assert entry.no == 1
assert entry.pangkat == "AKP"
assert entry.nrp == "87010101"
assert entry.nama == "Budi Santoso"
assert entry.jabatan_dinas == "Kanit Reskrim"
assert entry.jabatan_sprint == "Ketua Tim"
def test_combined_pangkat_nrp_cell(self) -> None:
mapping = ["no", "pangkat_nrp", "nama", "jabatan_dinas"]
row = ["1", "AKP 87010101", "Budi Santoso", "Kanit Reskrim"]
entry = map_row(row, mapping)
assert entry is not None
assert entry.pangkat == "AKP"
assert entry.nrp == "87010101"
assert entry.nama == "Budi Santoso"
def test_skips_row_without_nama_or_nrp(self) -> None:
mapping = ["no", "pangkat"]
row = ["", ""]
assert map_row(row, mapping) is None
def test_unknown_pangkat_kept_verbatim(self) -> None:
mapping = ["no", "pangkat", "nrp", "nama"]
row = ["1", "Foobar", "87010101", "Budi"]
entry = map_row(row, mapping)
assert entry is not None
# unknown pangkat is preserved so the validation layer can flag it
assert entry.pangkat == "Foobar"
# ---------- end-to-end extraction ----------
class TestExtractPersonnel:
def test_full_table_with_header(self) -> None:
table = DetectedTable(
cells=[
[
"No",
"Pangkat / NRP",
"Nama",
"Jabatan dalam Dinas",
"Jabatan dalam Sprint",
],
["1", "AKP 87010101", "Budi Santoso", "Kanit Reskrim", "Ketua Tim"],
["2", "IPDA 92030404", "Sari Wulandari", "Banit Reskrim", "Anggota"],
["3", "BRIPKA 98050505", "Ahmad Hidayat", "Banit Reskrim", "Anggota"],
]
)
entries = extract_personnel([table])
assert len(entries) == 3
assert entries[0].nama == "Budi Santoso"
assert entries[0].nrp == "87010101"
assert entries[1].pangkat == "IPDA"
assert entries[2].pangkat == "BRIPKA"
def test_full_table_with_triple_combined_header(self) -> None:
# Regression test for header misclassification: 'Pangkat / NRP / Nama'
# used to be classified as 'pangkat' due to substring matching, which
# silently dropped every personnel row.
table = DetectedTable(
cells=[
["No", "Pangkat / NRP / Nama", "Jabatan dalam Sprint"],
["1", "AKP 87010101 Budi Santoso", "Ketua Tim"],
["2", "IPDA 92030404 Sari Wulandari", "Anggota"],
]
)
entries = extract_personnel([table])
assert len(entries) == 2
assert entries[0].pangkat == "AKP"
assert entries[0].nrp == "87010101"
assert entries[0].nama == "Budi Santoso"
assert entries[1].nama == "Sari Wulandari"
def test_skips_non_personnel_table(self) -> None:
table = DetectedTable(
cells=[["Tahun", "Anggaran"], ["2024", "100M"]],
)
assert extract_personnel([table]) == []
def test_concatenates_multiple_personnel_tables(self) -> None:
t1 = DetectedTable(
cells=[
["No", "Pangkat", "NRP", "Nama"],
["1", "AKP", "87010101", "Budi"],
]
)
t2 = DetectedTable(
cells=[
["No", "Pangkat", "NRP", "Nama"],
["1", "IPDA", "92030404", "Sari"],
]
)
entries = extract_personnel([t1, t2])
assert len(entries) == 2
assert entries[0].nama == "Budi"
assert entries[1].nama == "Sari"
class TestIsPersonnelTable:
def test_matches_with_pangkat_and_nama(self) -> None:
table = DetectedTable(
cells=[["No", "Pangkat", "NRP", "Nama"], ["1", "AKP", "87010101", "X"]]
)
assert is_personnel_table(table) is True
def test_rejects_unrelated_table(self) -> None:
table = DetectedTable(cells=[["A", "B"], ["1", "2"]])
assert is_personnel_table(table) is False
def test_rejects_id_only_table_without_name_column(self) -> None:
# 'Pangkat / NRP' carries id but no name; without a name signal
# this should not be classified as a personnel table.
table = DetectedTable(
cells=[
["No", "Pangkat / NRP", "Jabatan"],
["1", "AKP 87010101", "Kanit Reskrim"],
]
)
assert is_personnel_table(table) is False
def test_accepts_pangkat_nrp_when_separate_nama_present(self) -> None:
table = DetectedTable(
cells=[
["No", "Pangkat / NRP", "Nama"],
["1", "AKP 87010101", "Budi"],
]
)
assert is_personnel_table(table) is True
def test_accepts_pangkat_nrp_nama_combined(self) -> None:
table = DetectedTable(
cells=[
["No", "Pangkat / NRP / Nama", "Jabatan"],
["1", "AKP 87010101 Budi", "Kanit"],
]
)
assert is_personnel_table(table) is True

94
tests/unit/test_table.py Normal file
View File

@@ -0,0 +1,94 @@
"""Tests for the PP-Structure table parsing helpers (no paddleocr required)."""
from __future__ import annotations
import pytest
from ocr_sprint.pipeline.table import (
DetectedTable,
extract_tables_from_pp_result,
parse_table_html,
)
class TestParseTableHtml:
def test_simple_grid(self) -> None:
html_str = """
<html><body><table>
<tr><td>No</td><td>Pangkat</td><td>NRP</td><td>Nama</td></tr>
<tr><td>1</td><td>AKP</td><td>87010101</td><td>Budi Santoso</td></tr>
<tr><td>2</td><td>IPDA</td><td>92030404</td><td>Sari Wulandari</td></tr>
</table></body></html>
"""
rows = parse_table_html(html_str)
assert rows == [
["No", "Pangkat", "NRP", "Nama"],
["1", "AKP", "87010101", "Budi Santoso"],
["2", "IPDA", "92030404", "Sari Wulandari"],
]
def test_handles_th_and_entities_and_inline_tags(self) -> None:
html_str = (
"<table><tr><th>Pangkat&nbsp;/ NRP</th><th>Nama</th></tr>"
"<tr><td>AKP <b>87010101</b></td><td>Budi&nbsp;Santoso</td></tr></table>"
)
rows = parse_table_html(html_str)
assert rows[0] == ["Pangkat / NRP", "Nama"]
assert rows[1] == ["AKP 87010101", "Budi Santoso"]
def test_empty_table_returns_empty_list(self) -> None:
assert parse_table_html("<table></table>") == []
assert parse_table_html("") == []
class TestExtractTablesFromPpResult:
def test_filters_table_regions_and_parses_html(self) -> None:
pp_result = [
{"type": "text", "res": [{"text": "ignore me", "confidence": 0.9}]},
{
"type": "table",
"res": {
"html": "<table><tr><td>A</td><td>B</td></tr></table>",
"cell_bbox": [],
},
},
{
"type": "table",
"res": {"html": ""}, # empty html → ignored
},
{
"type": "figure",
"res": [],
},
]
tables = extract_tables_from_pp_result(pp_result)
assert len(tables) == 1
assert tables[0].cells == [["A", "B"]]
def test_no_tables_returns_empty_list(self) -> None:
pp_result = [{"type": "text", "res": [{"text": "x"}]}]
assert extract_tables_from_pp_result(pp_result) == []
class TestDetectedTable:
def test_dimensions(self) -> None:
table = DetectedTable(cells=[["a", "b", "c"], ["d", "e"]])
assert table.n_rows == 2
assert table.n_cols == 3
def test_zero_rows(self) -> None:
table = DetectedTable()
assert table.n_rows == 0
assert table.n_cols == 0
@pytest.fixture
def sample_personnel_table() -> DetectedTable:
"""Header + three personnel rows in a typical Polres-level format."""
cells = [
["No", "Pangkat / NRP", "Nama", "Jabatan dalam Dinas", "Jabatan dalam Sprint"],
["1", "AKP 87010101", "Budi Santoso", "Kanit Reskrim", "Ketua Tim"],
["2", "IPDA 92030404", "Sari Wulandari", "Banit Reskrim", "Anggota"],
["3", "BRIPKA 98050505", "Ahmad Hidayat", "Banit Reskrim", "Anggota"],
]
return DetectedTable(cells=cells)