Phase 2: document detection + perspective correction + shadow removal
Adds OpenCV-based phone-photo handling that runs before the standard
preprocessing pipeline for IMAGE source kinds (PDF renders are flat by
construction and skip this stage).

Pipeline additions in src/ocr_sprint/pipeline/document_detect.py:
- _find_document_quad: Canny + dilate + contour search, picks the largest
  convex 4-point polygon above a configurable area threshold; fails
  gracefully and returns None when no usable quad is found.
- _four_point_warp: orders corners (TL/TR/BR/BL via sum/diff trick) and runs
  cv2.getPerspectiveTransform + warpPerspective.
- _remove_shadow: per-channel background-division (dilate + median blur +
  255 - absdiff + normalize) for uneven phone-shot lighting.
- detect_and_correct: top-level entrypoint with graceful fallback to the
  original image when detection fails.

Wired into the synchronous orchestrator: only enabled for IMAGE sources,
skipped for PDF.

New settings:
- preprocess_detect_document (default: true)
- preprocess_remove_shadow (default: true)
- preprocess_min_quad_area_fraction (default: 0.20)

Tests: 9 new unit tests covering corner ordering, quad detection on synthetic
skewed documents, perspective warp output sanity, shadow removal shape
preservation, full-pipeline behavior, and graceful fallback when detection
fails. 70 tests total, all green.

ML-based dewarping (DewarpNet) and DocTR detector are deferred to a future
phase per the roadmap; the existing API is structured so they can be added as
alternative backends behind DocumentDetectConfig.

Co-authored-by: adrian kuman firmansah <adriancuman@gmail.com>
This commit is contained in:
@@ -42,6 +42,11 @@ class Settings(BaseSettings):
|
||||
preprocess_deskew: bool = True
|
||||
preprocess_adaptive_threshold: bool = False
|
||||
|
||||
# Document detection (Phase 2) — applied to IMAGE sources only
|
||||
preprocess_detect_document: bool = True
|
||||
preprocess_remove_shadow: bool = True
|
||||
preprocess_min_quad_area_fraction: float = Field(0.20, ge=0.0, le=1.0)
|
||||
|
||||
# Confidence thresholds (Phase 5 routing)
|
||||
confidence_auto_approve: float = Field(0.95, ge=0.0, le=1.0)
|
||||
confidence_needs_review: float = Field(0.85, ge=0.0, le=1.0)
|
||||
|
||||
205
src/ocr_sprint/pipeline/document_detect.py
Normal file
205
src/ocr_sprint/pipeline/document_detect.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""Phase 2 — document detection + perspective correction + shadow removal.
|
||||
|
||||
Targets phone photos of surat sprint where the page is shot at an angle, with
|
||||
uneven lighting, and not perfectly centered. The pipeline below uses pure
|
||||
OpenCV (no ML model dependency) to:
|
||||
|
||||
1. detect the four corners of the document inside the image,
|
||||
2. apply a perspective transform to obtain a flat top-down rectangle,
|
||||
3. remove shadows via morphological background division.
|
||||
|
||||
Failure mode is **graceful**: if no usable document quadrilateral is found,
the perspective warp is skipped and the event is logged at info level. Shadow
removal (when enabled) and the downstream `preprocess` stage still run.
|
||||
|
||||
Future work (tracked in docs/architecture.md):
|
||||
- swap the contour heuristic for a DocTR / MobileSAM model when accuracy
|
||||
on real Polri photos isn't enough,
|
||||
- add full ML-based dewarping (DewarpNet) for curved/folded pages.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from ocr_sprint.pipeline.ingest import NDArrayU8
|
||||
from ocr_sprint.utils.logging import get_logger
|
||||
|
||||
# Module-level logger, named after this module.
_logger = get_logger(__name__)

# Internal working height (px) for contour detection. Images taller than this
# are downscaled to this height before the edge search; shorter images are
# searched at their native size. Smaller = faster + less noise.
_DETECT_HEIGHT = 500

# Default minimum quad area, as a fraction of the image area. Reject any
# candidate quad below this fraction — they're almost certainly not the
# document but logos, stamps, or text blocks.
_MIN_AREA_FRACTION = 0.20

# Polygon-approximation epsilon, as a fraction of the contour perimeter
# (0.02 = 2%). 2% works well for paper.
_POLY_EPSILON = 0.02
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class DocumentDetectConfig:
    """Immutable switches and thresholds for the document-detection stage.

    Every field has a default, so ``DocumentDetectConfig()`` enables both the
    quad search/warp and shadow removal.
    """

    # Search for the page outline and perspective-warp to it when one is found.
    detect_document: bool = True
    # Apply per-channel shadow removal to the (possibly warped) image.
    remove_shadow: bool = True
    # Minimum area a candidate quad must cover, as a fraction of the image
    # area, to be accepted as the document.
    min_area_fraction: float = _MIN_AREA_FRACTION
|
||||
|
||||
|
||||
# ---------- public surface ----------
|
||||
|
||||
|
||||
def detect_and_correct(
    img: NDArrayU8,
    cfg: DocumentDetectConfig | None = None,
) -> NDArrayU8:
    """Flatten a photographed page and even out its lighting.

    Two steps run in order, each gated by ``cfg``:

    1. ``cfg.detect_document`` — search for the page outline and, when one is
       found, warp the image to a top-down view of it. When none is found the
       input image is carried forward unchanged.
    2. ``cfg.remove_shadow`` — apply shadow removal to whatever step 1
       produced, so it also runs when no outline was found.

    Args:
        img: Source image (the module documents it as BGR uint8).
        cfg: Stage settings; ``DocumentDetectConfig()`` is used when omitted.

    Returns:
        The processed image, or ``img`` itself when no step modified it.
    """
    settings = DocumentDetectConfig() if cfg is None else cfg
    result = img

    if settings.detect_document:
        corners = _find_document_quad(img, min_area_fraction=settings.min_area_fraction)
        if corners is None:
            _logger.info("document_detect.no_quad_found", shape=img.shape[:2])
        else:
            result = _four_point_warp(img, corners)
            _logger.info("document_detect.warped", shape=result.shape[:2])

    return _remove_shadow(result) if settings.remove_shadow else result
|
||||
|
||||
|
||||
# ---------- corner detection ----------
|
||||
|
||||
|
||||
def _find_document_quad(
    img: NDArrayU8,
    min_area_fraction: float = _MIN_AREA_FRACTION,
) -> NDArrayU8 | None:
    """Locate the document outline and return its four corners.

    The search runs on a copy downscaled to ``_DETECT_HEIGHT`` rows (images
    that are already that short are used as-is): grayscale -> bilateral filter
    -> Canny -> dilation -> external contours. The five largest contours are
    simplified with ``cv2.approxPolyDP``; the first one that simplifies to a
    convex quadrilateral covering at least ``min_area_fraction`` of the frame
    is returned.

    Args:
        img: Image to search. It is converted with ``cv2.COLOR_BGR2GRAY``, so
            a BGR array is expected.
        min_area_fraction: Smallest acceptable quad area, as a fraction of the
            (downscaled) image area.

    Returns:
        A ``(4, 2)`` float32 array of ``(x, y)`` corners in the coordinate
        system of ``img`` (contour order, not yet TL/TR/BR/BL), or ``None``
        when the image is under 50 px on a side or no candidate qualifies.
    """
    h_orig, w_orig = img.shape[:2]
    if h_orig < 50 or w_orig < 50:
        return None

    scale = _DETECT_HEIGHT / float(h_orig)
    if scale >= 1.0:
        small = img
    else:
        # Clamp the width: a very tall, narrow image would otherwise round to
        # a zero-width target, which cv2.resize rejects.
        small_w = max(1, round(w_orig * scale))
        small = cv2.resize(img, (small_w, _DETECT_HEIGHT), interpolation=cv2.INTER_AREA)

    # Per-axis factors for mapping corners back to the original image. The
    # resized width is rounded to a whole pixel, so the horizontal factor is
    # not exactly the vertical one.
    to_small = np.array(
        [small.shape[1] / float(w_orig), small.shape[0] / float(h_orig)],
        dtype=np.float32,
    )

    gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
    gray = cv2.bilateralFilter(gray, 9, 75, 75)
    edges = cv2.Canny(gray, 60, 180)
    # close small gaps so contours are continuous
    edges = cv2.dilate(edges, np.ones((3, 3), np.uint8), iterations=2)

    contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return None

    min_area = min_area_fraction * float(small.shape[0] * small.shape[1])
    candidates = sorted(contours, key=cv2.contourArea, reverse=True)[:5]

    for contour in candidates:
        perimeter = cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour, _POLY_EPSILON * perimeter, True)
        if len(approx) != 4 or not cv2.isContourConvex(approx):
            continue
        # Candidates are ranked by raw contour area, but the threshold applies
        # to the simplified polygon, whose area can differ. A too-small quad
        # here therefore does not prove the remaining candidates are too small
        # as well, so keep looking instead of giving up.
        if cv2.contourArea(approx) < min_area:
            continue
        return approx.reshape(4, 2).astype(np.float32) / to_small

    return None
|
||||
|
||||
|
||||
# ---------- perspective transform ----------
|
||||
|
||||
|
||||
def _order_corners(pts: NDArrayU8) -> NDArrayU8:
    """Return four corners ordered (top-left, top-right, bottom-right, bottom-left).

    The corners are sorted by angle around their centroid, which in image
    coordinates (y pointing down) walks the outline clockwise as seen on
    screen. The walk is then rotated to start at the corner with the smallest
    ``x + y``, which is taken as top-left.

    Unlike picking each corner independently by min/max of ``x + y`` and
    ``y - x``, this always yields four distinct corners: when those sums or
    differences tie (e.g. a page rotated about 45 degrees), the independent
    picks can select the same point twice and produce a degenerate quad.

    Args:
        pts: Four ``(x, y)`` points of a convex quadrilateral, shaped
            ``(4, 2)`` or OpenCV's ``(4, 1, 2)``.

    Returns:
        A new ``(4, 2)`` float32 array in TL, TR, BR, BL order.

    Raises:
        ValueError: If ``pts`` does not contain exactly four 2-D points.
    """
    corners = np.asarray(pts, dtype=np.float32).reshape(4, 2)

    # Angles are computed in float64 to keep the sort stable for near-ties.
    offsets = corners.astype(np.float64) - corners.mean(axis=0, dtype=np.float64)
    angles = np.arctan2(offsets[:, 1], offsets[:, 0])
    ring = corners[np.argsort(angles, kind="stable")]

    # Rotate the clockwise ring so that the min-sum corner comes first.
    start = int(np.argmin(ring.sum(axis=1)))
    return np.roll(ring, -start, axis=0)
|
||||
|
||||
|
||||
def _four_point_warp(img: NDArrayU8, quad: NDArrayU8) -> NDArrayU8:
    """Rectify the region bounded by ``quad`` into an upright rectangle.

    The output width is the longer of the quad's top and bottom edges and the
    output height the longer of its left and right edges, each rounded to a
    whole pixel.

    Args:
        img: Image to sample from.
        quad: Four ``(x, y)`` corners in any order; they are ordered with
            ``_order_corners`` before use.

    Returns:
        The warped image, or ``img`` unchanged when the computed output would
        be at most one pixel wide or tall.
    """
    src = _order_corners(quad)
    top_left, top_right, bottom_right, bottom_left = src

    top_edge = float(np.linalg.norm(top_right - top_left))
    bottom_edge = float(np.linalg.norm(bottom_right - bottom_left))
    left_edge = float(np.linalg.norm(bottom_left - top_left))
    right_edge = float(np.linalg.norm(bottom_right - top_right))

    out_w = round(max(top_edge, bottom_edge))
    out_h = round(max(left_edge, right_edge))
    if min(out_w, out_h) <= 1:
        # Degenerate quad: nothing sensible to warp to.
        return img

    last_col, last_row = out_w - 1, out_h - 1
    target = np.array(
        [[0, 0], [last_col, 0], [last_col, last_row], [0, last_row]],
        dtype=np.float32,
    )
    homography = cv2.getPerspectiveTransform(src, target)
    return cv2.warpPerspective(img, homography, (out_w, out_h))
|
||||
|
||||
|
||||
# ---------- shadow removal ----------
|
||||
|
||||
|
||||
def _remove_shadow(img: NDArrayU8) -> NDArrayU8:
    """Flatten uneven lighting, one channel at a time.

    For each channel: dilate with a 7x7 kernel and median-blur (aperture 21)
    to estimate the local background, take ``255 - |channel - background|`` so
    pixels close to the background become bright, then min-max stretch the
    result to 0-255. The processed channels are merged back together.

    Args:
        img: Image whose channels are processed independently.

    Returns:
        The merged uint8 image built from the processed channels.
    """
    structuring = np.ones((7, 7), np.uint8)
    flattened: list[NDArrayU8] = []

    for channel in cv2.split(img):
        background = cv2.medianBlur(cv2.dilate(channel, structuring), 21)
        inverted_delta: NDArrayU8 = (255 - cv2.absdiff(channel, background)).astype(np.uint8)
        # cv2 stubs reject None dst even though the runtime accepts it.
        stretched = cv2.normalize(  # type: ignore[call-overload]
            inverted_delta,
            None,
            alpha=0,
            beta=255,
            norm_type=cv2.NORM_MINMAX,
            dtype=cv2.CV_8U,
        )
        flattened.append(stretched)

    return cv2.merge(flattened)
|
||||
@@ -14,6 +14,7 @@ from dataclasses import dataclass
|
||||
|
||||
from ocr_sprint.config import get_settings
|
||||
from ocr_sprint.pipeline.confidence import compute_confidence, route
|
||||
from ocr_sprint.pipeline.document_detect import DocumentDetectConfig, detect_and_correct
|
||||
from ocr_sprint.pipeline.extract.regex_rules import extract_header, find_signatory
|
||||
from ocr_sprint.pipeline.extract.validators import validate_extraction
|
||||
from ocr_sprint.pipeline.ingest import detect_source_kind, ingest
|
||||
@@ -56,10 +57,18 @@ def run_pipeline(content: bytes) -> PipelineOutput:
|
||||
deskew=s.preprocess_deskew,
|
||||
adaptive_threshold=s.preprocess_adaptive_threshold,
|
||||
)
|
||||
# Document detection only makes sense on photographed images. PDF renders
|
||||
# are already flat by construction, so we skip the heavy quad search there.
|
||||
detect_cfg = DocumentDetectConfig(
|
||||
detect_document=s.preprocess_detect_document and kind == SourceKind.IMAGE,
|
||||
remove_shadow=s.preprocess_remove_shadow and kind == SourceKind.IMAGE,
|
||||
min_area_fraction=s.preprocess_min_quad_area_fraction,
|
||||
)
|
||||
|
||||
ocr_pages: list[OCRPage] = []
|
||||
for page in pages:
|
||||
cleaned = preprocess(page.image, pre_cfg)
|
||||
corrected = detect_and_correct(page.image, detect_cfg)
|
||||
cleaned = preprocess(corrected, pre_cfg)
|
||||
ocr_pages.append(run_ocr(cleaned))
|
||||
|
||||
full_text = "\n".join(p.text for p in ocr_pages)
|
||||
|
||||
Reference in New Issue
Block a user