Fix personnel extraction + header bugs on real Polres Cimahi sprint

This fixes 4 bugs found on a real Polres Cimahi SPRIN PDF (items 1-4) and
adds two related hardening changes (items 5-6):

1. satuan_penerbit captured the generic 'KEPOLISIAN NEGARA REPUBLIK
   INDONESIA' letterhead line instead of the most-specific issuing unit
   (e.g. RESOR CIMAHI / SEKTOR PADALARANG). Reworked find_satuan to
   scan for each level independently and return the deepest available.

2. find_dasar_list dropped numbered items when OCR put the marker on
   its own line ("1.\n Undang-Undang ..."). Refactored into
   _collect_numbered_section that buffers a bare-number line and uses
   the next non-empty line as the body. Also reused for the new
   find_untuk_list which extracts the previously-empty 'untuk' bullets.

3. find_perihal returned None for documents that use 'Pertimbangan'
   (very common in Polres-level sprint), forcing the LLM to guess.
   Added a regex fallback that picks up the first line under a
   'Pertimbangan' label so we keep extraction deterministic.

4. Personnel rows were emitted with only nama populated when
   PP-Structure detected a table but the column mapper degraded.
   Added a text-based fallback (extract_personnel_from_text) that
   scans raw OCR for <rank> + <8-digit NRP> patterns. Triggered when
   the PP-Structure result has fewer than 30% rank/NRP-bearing rows.
   Documents that take this path are marked for review by raising the
   new PERSONNEL_TEXT_FALLBACK flag.

5. Validation now flags rows with neither pangkat nor nrp as
   INCOMPLETE_PERSONNEL_ROW, so the document routes to needs_review
   even when individual nrp/pangkat checks pass on empty values.

6. Added 'BRIGPOL' as a variant of BRIGADIR (seen in real scans).

Tests: 229 (was 203) — 26 new tests covering the regex fixes,
text-based personnel extractor, low-quality detector, validator
behaviour, and orchestrator wiring of the fallback path.

Co-Authored-By: adrian kuman firmansah <adriancuman@gmail.com>
This commit is contained in:
Devin AI
2026-04-26 05:35:42 +00:00
parent dce77e80e1
commit 58a2bf2648
11 changed files with 747 additions and 39 deletions

View File

@@ -0,0 +1,203 @@
"""Text-based fallback personnel extractor.
PP-Structure (Phase 3) is the primary path for personnel rows because it
preserves the table grid. But PP-Structure can fail in two ways on real
sprint scans:
1. The table is not detected at all (low-quality scan, watermark, atypical
layout) — `extract_personnel` returns an empty list.
2. The table IS detected but the column mapping is too sparse, so each row
collapses to a single ``nama`` cell with all other fields ``None``. This
is what was observed on a real Polres Cimahi sprint where the OCR
produced 24 rows with only ``nama`` populated.
This module provides a regex/heuristic fallback that operates directly on
the flat OCR text. It is deliberately conservative: a row must have BOTH a
recognizable Polri rank AND an 8-digit NRP to be emitted, so we never
generate the kind of "name-only" rows that motivated the fallback in the
first place.
"""
from __future__ import annotations
import re
from ocr_sprint.data.master_pangkat import (
PANGKAT_VARIANTS,
is_valid_pangkat,
normalize_pangkat,
)
from ocr_sprint.schemas.personnel import PersonnelEntry
# Build a single alternation of all known rank tokens, longest first, so that
# multi-word ranks like "KOMBES POL" win over the single-word "KOMBES".
# Regex alternation tries its branches left to right, which is why the longer
# spelling has to be listed before any shorter token it starts with.
_RANK_TOKENS: tuple[str, ...] = tuple(
sorted(
{variant for variants in PANGKAT_VARIANTS.values() for variant in variants},
key=lambda v: -len(v),
)
)
# Each token is escaped so it is matched literally inside the pattern.
_RANK_ALT = "|".join(re.escape(tok) for tok in _RANK_TOKENS)
# A rank token followed by an 8-digit NRP. Only separators may sit between the
# two: '/', '.', '-', ',', ':' or whitespace, in any number (including none).
# Any other text between the rank and the digits prevents a match. The rank
# token must be word-bounded so "BRIPDA" doesn't match inside e.g.
# "ABRIPDA-style" text, and the trailing \b means a longer digit run (such as
# an 18-digit NIP) is not accepted as an NRP. Named groups: ``rank``, ``nrp``.
_RE_RANK_NRP_LINE = re.compile(
rf"\b(?P<rank>{_RANK_ALT})\b[\s/.\-,:]*?(?P<nrp>\d{{8}})\b",
re.IGNORECASE,
)
# A bare row number marker like "1." or "12)": one to three digits, then '.'
# or ')', and nothing else on the line. OCR often puts it on its own line in
# tabular layouts.
_RE_ROW_NUMBER = re.compile(r"^\s*(\d{1,3})\s*[.)]\s*$")
# Lines that should never be interpreted as a personnel name. These are
# section headers, OCR garbage anchors, and column header tokens.
# The functions in this module upper-case a line and compare it against these
# entries with ``str.startswith``, so every entry must be upper-case.
_NAME_BLOCKLIST_PREFIXES: tuple[str, ...] = (
"DASAR",
"PERIHAL",
"PERTIMBANGAN",
"DIPERINTAHKAN",
"KEPADA",
"UNTUK",
"TEMBUSAN",
"DIKELUARKAN",
"PADA TANGGAL",
"SELESAI",
"DAFTAR",
"LAMPIRAN",
"NOMOR",
"TANGGAL",
"KEPOLISIAN",
"DAERAH",
"RESOR",
"SEKTOR",
"MABES",
"SURAT PERINTAH",
"NRP",
"NIP",
"PANGKAT",
"JABATAN",
"NAMA",
"KETERANGAN",
"KET",
"NO",
)
# Minimum requirement for a name: the line contains at least one ASCII
# letter. Pure-numeric or pure-symbol lines fail this check. The pattern does
# not enforce that the line is *mostly* letters.
_RE_NAME_OK = re.compile(r"[A-Za-z]")
def _is_plausible_name(line: str) -> bool:
    """Return True iff ``line`` could plausibly be a personnel name.

    A line is rejected when it is empty, contains no ASCII letter, starts
    with a blocklisted header keyword, is a bare row-number marker, itself
    contains a rank+NRP pair, or consists solely of digits and list
    punctuation.

    Blocklist entries of up to three characters ("NO", "KET", "NRP", "NIP")
    only count when they stand alone as a token, i.e. when they are not
    directly followed by another letter. With plain prefix matching,
    ordinary names such as "NOVAL ..." or "KETUT ..." were discarded merely
    because they begin with "NO" or "KET". Longer keywords keep plain prefix
    matching so spelling variants like "RESORT" are still caught by "RESOR".
    """
    stripped = line.strip()
    if not stripped or not _RE_NAME_OK.search(stripped):
        return False
    upper = stripped.upper()
    # Entries at or below this length are abbreviations that are also common
    # leading letters of real names, so they need a token boundary.
    max_abbrev_len = 3
    for prefix in _NAME_BLOCKLIST_PREFIXES:
        if not upper.startswith(prefix):
            continue
        # Character right after the prefix; "" when the prefix is the whole
        # line, and "".isalpha() is False, so a bare "NO" is still blocked.
        follower = upper[len(prefix):len(prefix) + 1]
        if len(prefix) > max_abbrev_len or not follower.isalpha():
            return False
    if _RE_ROW_NUMBER.match(stripped):
        return False
    if _RE_RANK_NRP_LINE.search(stripped):
        return False
    # Reject lines that are nothing but a row number with extra punctuation
    # ("1 .", "2)") which the bare-number regex above might miss.
    return not re.fullmatch(r"[\s\d.)(\-]+", stripped)
def _following_jabatan(lines: list[str], idx: int) -> str | None:
    """Gather the jabatan text from up to three lines after ``lines[idx]``.

    Leading blank lines are skipped; a blank line after some text has been
    gathered ends the scan. The scan also ends at the next rank+NRP line, at
    a bare row-number line, or at a line starting with a blocklisted
    section/column header keyword.

    Returns the gathered lines joined with single spaces (internal runs of
    whitespace collapsed), or ``None`` when nothing was gathered.
    """
    collected: list[str] = []
    stop = min(idx + 4, len(lines))
    for pos in range(idx + 1, stop):
        text = lines[pos].strip()
        if not text:
            if collected:
                break
            continue
        hits_boundary = (
            _RE_RANK_NRP_LINE.search(text) is not None
            or _RE_ROW_NUMBER.match(text) is not None
            or text.upper().startswith(_NAME_BLOCKLIST_PREFIXES)
        )
        if hits_boundary:
            break
        collected.append(text)
    # Joining then re-splitting collapses whitespace inside each line too;
    # an empty result falls through to None.
    normalized = " ".join(" ".join(collected).split())
    return normalized or None
def extract_personnel_from_text(raw_text: str) -> list[PersonnelEntry]:
    """Extract personnel rows from flat OCR text, one per rank+NRP line.

    For every line on which ``_RE_RANK_NRP_LINE`` matches and whose rank
    normalizes to a valid pangkat:

    * ``nama`` is the nearest plausible name line among the five lines
      above it that has not already been assigned to an earlier row
      (``None`` when there is none);
    * ``jabatan_dinas`` is whatever ``_following_jabatan`` collects from the
      lines below it.

    Only the first rank+NRP match on a line is used, and each name line is
    handed out at most once. ``no``, ``jabatan_sprint`` and ``keterangan``
    are always ``None``. Lines without a rank+NRP match produce no row.
    """
    lines = raw_text.splitlines()
    used_name_lines: set[int] = set()
    entries: list[PersonnelEntry] = []
    for idx, raw_line in enumerate(lines):
        hit = _RE_RANK_NRP_LINE.search(raw_line.strip())
        if hit is None:
            continue
        pangkat = normalize_pangkat(hit.group("rank"))
        if not (pangkat and is_valid_pangkat(pangkat)):
            continue
        # Walk upwards (nearest line first) over at most five lines.
        name_idx = next(
            (
                back
                for back in range(idx - 1, max(idx - 6, -1), -1)
                if back not in used_name_lines
                and _is_plausible_name(lines[back].strip())
            ),
            None,
        )
        nama: str | None = None
        if name_idx is not None:
            used_name_lines.add(name_idx)
            nama = lines[name_idx].strip()
        entries.append(
            PersonnelEntry(
                no=None,
                pangkat=pangkat,
                nrp=hit.group("nrp"),
                nama=nama,
                jabatan_dinas=_following_jabatan(lines, idx),
                jabatan_sprint=None,
                keterangan=None,
            )
        )
    return entries
def is_low_quality(rows: list[PersonnelEntry]) -> bool:
    """Tell whether a table-extraction result is too sparse to trust.

    A row carries signal when its ``pangkat`` or its ``nrp`` is truthy. The
    result is low quality when there are no rows at all, or when fewer than
    30% of the rows carry signal; exactly 30% is still accepted.
    """
    if not rows:
        return True
    with_signal = [row for row in rows if row.pangkat or row.nrp]
    share = len(with_signal) / len(rows)
    return share < 0.3

View File

@@ -53,19 +53,52 @@ _RE_TANGGAL_ID = re.compile(
re.IGNORECASE,
)
# Satuan penerbit usually appears in the document letterhead, prefixed by
# KEPOLISIAN <NEGARA|DAERAH|RESORT|SEKTOR>.
_RE_SATUAN = re.compile(
r"KEPOLISIAN\s+(?:NEGARA\s+REPUBLIK\s+INDONESIA|DAERAH|RESOR(?:T)?|SEKTOR|RESORT)"
r"[^\n]{0,80}",
# Polri letterhead pieces. The full letterhead spans multiple lines that are
# often broken across separate OCR rows like:
#
# KEPOLISIAN NEGARA REPUBLIK INDONESIA
# DAERAH JAWA BARAT
# RESOR CIMAHI
#
# We capture each individual level so we can reconstruct the most-specific
# unit (RESOR / SEKTOR > DAERAH > NEGARA) — a downstream consumer cares
# about *which* unit issued the sprint, not just that some Polri unit did.
#
# All patterns are case-insensitive. The DAERAH / RESOR / SEKTOR patterns
# accept an optional leading "KEPOLISIAN" and capture the place name in
# group 1: it must start with a letter and may continue with letters, spaces
# and . ' / - (2 to 61 characters in total), ending at the end of its line.
# NOTE(review): these three patterns have no line-start anchor or leading
# \b, so the keyword may also match in the middle of a line — confirm this
# is acceptable for the text they are run against.
# Top-level generic letterhead line; no capture group.
_RE_LEVEL_NEGARA = re.compile(
r"KEPOLISIAN\s+NEGARA\s+REPUBLIK\s+INDONESIA",
re.IGNORECASE,
)
_RE_LEVEL_DAERAH = re.compile(
r"(?:KEPOLISIAN\s+)?DAERAH\s+([A-Z][A-Z .'/-]{1,60}?)(?:$|\s*\n)",
re.IGNORECASE | re.MULTILINE,
)
# "RESORT?" accepts both the "RESOR" and "RESORT" spellings.
_RE_LEVEL_RESOR = re.compile(
r"(?:KEPOLISIAN\s+)?RESORT?\s+([A-Z][A-Z .'/-]{1,60}?)(?:$|\s*\n)",
re.IGNORECASE | re.MULTILINE,
)
_RE_LEVEL_SEKTOR = re.compile(
r"(?:KEPOLISIAN\s+)?SEKTOR\s+([A-Z][A-Z .'/-]{1,60}?)(?:$|\s*\n)",
re.IGNORECASE | re.MULTILINE,
)
# Headquarters letterhead; no capture group.
_RE_LEVEL_MABES = re.compile(r"MABES\s+POLRI\b", re.IGNORECASE)
# "Perihal : ...." up to end of line. The separator may be ':' or '-';
# group 1 is the text after it.
_RE_PERIHAL = re.compile(r"PERIHAL\s*[:\-]\s*(.+)", re.IGNORECASE)
# Many sprint docs (especially Polres-level) use 'Pertimbangan' as the
# single-paragraph rationale block instead of (or alongside) 'Perihal'.
# When `perihal` is missing we fall back to the first non-empty line under
# 'Pertimbangan :' so the LLM doesn't have to guess and so a downstream
# audit trail still has *something* in the perihal slot.
# This pattern only recognises the label at the start of a line; it does not
# capture any content.
_RE_PERTIMBANGAN_LABEL = re.compile(r"^\s*PERTIMBANGAN\b", re.IGNORECASE)
# A dasar entry typically begins with a number and dot, e.g. "1. UU No. 2 Tahun 2002 ..."
# Group 1 is the number, group 2 the item text; ')' is accepted instead of '.'.
_RE_DASAR_ITEM = re.compile(r"^\s*(\d+)\s*[.)]\s*(.+)$")
# OCR sometimes splits the number from its content across two lines:
# 1.
# Undang-Undang Nomor 2 Tahun 2002 ...
# We detect a bare-number line and merge with the next non-empty line.
_RE_DASAR_BARE_NUMBER = re.compile(r"^\s*(\d+)\s*[.)]\s*$")
# Generic 'untuk' bullet — same shape as a dasar item (the pattern is
# character-for-character identical to _RE_DASAR_ITEM).
_RE_UNTUK_ITEM = re.compile(r"^\s*(\d+)\s*[.)]\s*(.+)$")
# Signatory NRP — Polri NRPs are 8 digits, civil servant NIPs are 18 digits.
# Group 1 is the label (NRP or NIP), group 2 the digits. The pattern itself
# accepts any run of 8 to 20 digits, so it is wider than the two lengths
# named above — presumably the exact length is validated elsewhere; TODO
# confirm.
_RE_NRP = re.compile(r"\b(NRP|NIP)\s*[.:]?\s*(\d{8,20})\b", re.IGNORECASE)
@@ -99,54 +132,159 @@ def find_tanggal(text: str) -> date | None:
return None
def _clean_unit_tail(tail: str) -> str:
"""Strip trailing punctuation/noise from the captured place name."""
return " ".join(tail.split()).strip(" .,;:'\"")
def find_satuan(text: str) -> str | None:
    """Return the issuing unit, preferring the most-specific letterhead level.

    Polri letterheads are hierarchical (Negara > Daerah > Resor/Sektor). The
    actual *issuing* unit is the deepest level present in the letterhead, not
    the topmost generic 'KEPOLISIAN NEGARA REPUBLIK INDONESIA' line. Each
    level is searched independently and the first hit in the order
    SEKTOR, RESOR, DAERAH, MABES POLRI, NEGARA is returned.

    Only the first 25 lines of ``text`` are examined. For SEKTOR / RESOR /
    DAERAH the result is rebuilt as ``"KEPOLISIAN <LEVEL> <place>"`` with
    the captured place name cleaned by ``_clean_unit_tail``. Returns
    ``None`` when no level is found.

    Examples
    --------
    >>> find_satuan("KEPOLISIAN NEGARA REPUBLIK INDONESIA\\n"
    ...             "DAERAH JAWA BARAT\\nRESOR CIMAHI")
    'KEPOLISIAN RESOR CIMAHI'
    >>> find_satuan("KEPOLISIAN NEGARA REPUBLIK INDONESIA")
    'KEPOLISIAN NEGARA REPUBLIK INDONESIA'
    """
    # We only look at the document head — letterheads always sit at the
    # very top, and constraining the search prevents false positives from
    # body text like '... Polres Cimahi ...' deep in a paragraph.
    head = "\n".join(text.splitlines()[:25])
    # Ordered from most to least specific; the first level present wins.
    named_levels = (
        ("SEKTOR", _RE_LEVEL_SEKTOR),
        ("RESOR", _RE_LEVEL_RESOR),
        ("DAERAH", _RE_LEVEL_DAERAH),
    )
    for label, pattern in named_levels:
        found = pattern.search(head)
        if found:
            return f"KEPOLISIAN {label} {_clean_unit_tail(found.group(1))}"
    if _RE_LEVEL_MABES.search(head):
        return "MABES POLRI"
    if _RE_LEVEL_NEGARA.search(head):
        return "KEPOLISIAN NEGARA REPUBLIK INDONESIA"
    return None
def find_perihal(text: str) -> str | None:
    """Return the first 'Perihal: ...' line, trimmed to that line only.

    Falls back to the text of a 'Pertimbangan' block (a common variant in
    Polres-level surat sprint that doesn't have a distinct 'Perihal'
    field). We deliberately keep this in regex-land rather than deferring
    to the LLM because the LLM tends to hallucinate perihal content from
    arbitrary paragraphs.

    For the fallback, only the first line starting with 'Pertimbangan' is
    considered:

    * if that line carries text after the label
      (``"Pertimbangan : bahwa ..."``), that text is returned;
    * otherwise the first non-empty line among the next four lines is
      returned (OCR often puts the label, the colon and the content on
      separate lines).

    Returns ``None`` when neither a 'Perihal' line nor usable
    'Pertimbangan' content is found.
    """
    lines = text.splitlines()
    for line in lines:
        m = _RE_PERIHAL.search(line)
        if m:
            return m.group(1).strip()
    for idx, line in enumerate(lines):
        label = _RE_PERTIMBANGAN_LABEL.match(line)
        if not label:
            continue
        # Content on the label line itself takes precedence; without this
        # the continuation line (or the next section header) was returned.
        inline = line[label.end():].lstrip(" \t:-").rstrip(" \t:")
        if inline:
            return inline
        for follow in lines[idx + 1 : idx + 5]:
            # Lines holding only a stray ':' strip down to "" and are skipped.
            stripped = follow.strip(" :\t")
            if stripped:
                return stripped
        # Only the first 'Pertimbangan' label is examined.
        break
    return None
def _collect_numbered_section(
    lines: list[str],
    start_idx: int,
    terminators: tuple[str, ...],
) -> list[str]:
    """Collect the numbered list items found from ``lines[start_idx]`` onward.

    Each line is stripped and then handled as follows, in this order:

    * a line whose upper-cased text starts with one of ``terminators`` ends
      the scan;
    * a blank line is skipped, except that the second consecutive blank
      line ends the scan once at least one item exists and no number
      marker is waiting for its body;
    * a bare number marker ("1." / "2)") is remembered, so the next text
      line becomes the body of a new item;
    * an inline numbered line ("1. text") starts a new item with the text
      after the marker;
    * any other line becomes the body of a waiting marker, or else is
      appended (space-separated) to the most recent item; if there is no
      item yet it is ignored.

    Returns the item texts in document order, without their number markers.
    """
    stop_words = tuple(terminators)
    collected: list[str] = []
    awaiting_body = False
    blanks = 0
    for raw in lines[start_idx:]:
        text = raw.strip()
        if text.upper().startswith(stop_words):
            break
        if not text:
            blanks += 1
            # One blank line is tolerated (OCR sprinkles them); two in a row
            # close the section.
            if blanks >= 2 and collected and not awaiting_body:
                break
            continue
        blanks = 0
        is_bare_marker = _RE_DASAR_BARE_NUMBER.match(text) is not None
        numbered = None if is_bare_marker else _RE_DASAR_ITEM.match(text)
        if is_bare_marker:
            awaiting_body = True
        elif numbered is not None:
            collected.append(numbered.group(2).strip())
            awaiting_body = False
        elif awaiting_body:
            collected.append(text)
            awaiting_body = False
        elif collected:
            collected[-1] = f"{collected[-1]} {text}".strip()
    return collected
def find_dasar_list(text: str) -> list[str]:
    """Extract numbered 'Dasar' items from the text.

    Heuristic: locate the first line that starts with 'DASAR' (Indonesian:
    "DASAR :") and delegate to ``_collect_numbered_section``, which handles
    three OCR artefacts:

    1. Inline numbered items: ``"1. Undang-Undang ..."``.
    2. Bare-number lines (the OCR engine puts the number alone on a line):
       ``"1.\\n Undang-Undang ..."``.
    3. Continuation lines (a line that is the wrapped tail of the previous
       item gets appended back onto it).

    When the numbering already starts on the header line itself
    (``"Dasar : 1. Undang-Undang ..."``), the text after the label is fed to
    the collector as the first line so that item 1 is not lost.

    Collection stops at a line beginning with 'DIPERINTAHKAN', 'UNTUK',
    'DASAR HUKUM' or 'PERIHAL'. Returns ``[]`` when no 'Dasar' header
    exists.
    """
    lines = text.splitlines()
    section_terminators = ("DIPERINTAHKAN", "UNTUK", "DASAR HUKUM", "PERIHAL")
    for idx, raw_line in enumerate(lines):
        header = re.match(r"^\s*DASAR\b", raw_line, re.IGNORECASE)
        if not header:
            continue
        body = lines[idx + 1 :]
        # Text after the label, minus the ':' / '-' separator.
        inline = raw_line[header.end():].lstrip(" \t:-").strip()
        # Only prepend it when it actually opens a numbered item; arbitrary
        # trailing text on the header line is left out as before.
        if _RE_DASAR_ITEM.match(inline) or _RE_DASAR_BARE_NUMBER.match(inline):
            body = [inline, *body]
        return _collect_numbered_section(body, 0, section_terminators)
    return []
def find_untuk_list(text: str) -> list[str]:
    """Extract the numbered 'Untuk' bullets from the text.

    The 'Untuk' section lists the tasks assigned to the personnel. It has
    the same OCR shape as the 'Dasar' section, so the same collector is
    reused with different terminators. The section is located via the first
    line that starts with 'Untuk'.

    When the numbering already starts on the header line itself
    (``"Untuk : 1. melaksanakan ..."``), the text after the label is fed to
    the collector as the first line so that item 1 is not lost.

    Returns ``[]`` when no line starts with 'Untuk'.
    """
    lines = text.splitlines()
    # Stop conditions: 'Selesai' (boilerplate), 'Dikeluarkan di' (signature
    # block), 'Tembusan' (carbon-copy section).
    terminators = ("SELESAI", "DIKELUARKAN", "TEMBUSAN", "PADA TANGGAL")
    for idx, raw_line in enumerate(lines):
        header = re.match(r"^\s*UNTUK\b", raw_line, re.IGNORECASE)
        if not header:
            continue
        body = lines[idx + 1 :]
        # Text after the label, minus the ':' / '-' separator.
        inline = raw_line[header.end():].lstrip(" \t:-").strip()
        # Checked with the same patterns the collector uses, so only text
        # the collector will treat as an item start is prepended.
        if _RE_DASAR_ITEM.match(inline) or _RE_DASAR_BARE_NUMBER.match(inline):
            body = [inline, *body]
        return _collect_numbered_section(body, 0, terminators)
    return []
def find_signatory(text: str) -> Signatory:

View File

@@ -30,6 +30,13 @@ def validate_personnel_entry(entry: PersonnelEntry) -> list[ReviewFlag]:
flags.append(ReviewFlag.INVALID_NRP)
if entry.pangkat and not is_valid_pangkat(entry.pangkat):
flags.append(ReviewFlag.UNKNOWN_PANGKAT)
# Identification of a personnel row requires at least pangkat OR nrp.
# A row carrying only a name is structurally incomplete - likely a
# mis-aligned table cell or a leaked tembusan/dasar fragment - and must
# be flagged for human review even though pangkat/nrp validation
# individually pass (because they're empty).
if not entry.pangkat and not entry.nrp:
flags.append(ReviewFlag.INCOMPLETE_PERSONNEL_ROW)
return flags