"""API tests with the OCR engine mocked.

These tests do NOT load PaddleOCR — instead they monkeypatch the orchestrator
so we can exercise the FastAPI surface without the heavy ML init cost.
"""

from __future__ import annotations

from datetime import date

import pytest
from fastapi.testclient import TestClient

from ocr_sprint.main import create_app
from ocr_sprint.pipeline import orchestrator as orch_module
from ocr_sprint.pipeline.orchestrator import PipelineOutput
from ocr_sprint.schemas.document import DocumentStatus, SourceKind
from ocr_sprint.schemas.extraction import ExtractionResult, HeaderFields


def _pdf_upload() -> dict[str, tuple[str, bytes, str]]:
    """Return a fresh multipart ``files`` mapping carrying a tiny fake PDF.

    The payload only needs to be non-empty: every test that uses it replaces
    ``run_pipeline`` with a stub, so the bytes are never actually parsed.
    """
    return {"file": ("x.pdf", b"%PDF-1.4\n%fake", "application/pdf")}


@pytest.fixture
def client() -> TestClient:
    """Return a ``TestClient`` bound to a freshly created application."""
    return TestClient(create_app())


@pytest.fixture
def fake_pipeline(monkeypatch: pytest.MonkeyPatch) -> PipelineOutput:
    """Patch ``run_pipeline`` everywhere it's referenced.

    The stub is installed on the orchestrator module, on the documents route
    module and on the worker tasks module, because each of them holds its own
    reference to the function. Returns the canned ``PipelineOutput`` that the
    stub hands back, so tests can compare against it.
    """
    fake_result = ExtractionResult(
        header=HeaderFields(
            nomor_sprint="Sprin/1/I/2025",
            tanggal=date(2025, 1, 1),
            satuan_penerbit="POLRES TEST",
        ),
        confidence=0.97,
    )
    fake_output = PipelineOutput(
        source_kind=SourceKind.PDF,
        status=DocumentStatus.COMPLETED,
        confidence=0.97,
        result=fake_result,
    )

    def _fake_run(_content: bytes) -> PipelineOutput:
        return fake_output

    monkeypatch.setattr(orch_module, "run_pipeline", _fake_run)

    from ocr_sprint.api.routes import documents as docs_module

    monkeypatch.setattr(docs_module, "run_pipeline", _fake_run)

    from ocr_sprint.worker import tasks as tasks_module

    monkeypatch.setattr(tasks_module, "run_pipeline", _fake_run)
    return fake_output


def test_health_endpoint(client: TestClient) -> None:
    """The health endpoint answers 200 with ``status == "ok"``."""
    response = client.get("/api/v1/health")
    assert response.status_code == 200
    assert response.json()["status"] == "ok"


def test_documents_rejects_empty_upload(client: TestClient) -> None:
    """Uploading a zero-byte file is rejected with 400."""
    response = client.post(
        "/api/v1/documents",
        files={"file": ("empty.pdf", b"", "application/pdf")},
    )
    assert response.status_code == 400


def test_documents_sync_returns_pipeline_output(
    client: TestClient,
    fake_pipeline: PipelineOutput,
) -> None:
    """With ``?sync=true`` the POST itself returns the pipeline result."""
    response = client.post("/api/v1/documents?sync=true", files=_pdf_upload())
    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "completed"
    assert body["confidence"] == 0.97
    assert body["data"]["header"]["nomor_sprint"] == "Sprin/1/I/2025"


def test_documents_async_returns_202_then_polls_to_completion(
    client: TestClient,
    fake_pipeline: PipelineOutput,
) -> None:
    """Default flow: POST returns 202, GET returns the eventual completion.

    With CELERY_TASK_ALWAYS_EAGER set in conftest, the worker runs inline, so
    by the time POST returns the task has already finished and GET sees a
    `completed` row.
    """
    post = client.post("/api/v1/documents", files=_pdf_upload())
    assert post.status_code == 202
    job_id = post.json()["job_id"]

    get = client.get(f"/api/v1/documents/{job_id}")
    assert get.status_code == 200
    body = get.json()
    assert body["status"] == "completed"
    assert body["confidence"] == 0.97


def test_documents_defaults_to_sync_when_queue_disabled(
    client: TestClient,
    fake_pipeline: PipelineOutput,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Regression: with ``QUEUE_ENABLED=false`` the route must NOT enqueue,
    otherwise a default install with no Redis returns 500.
    """
    monkeypatch.setenv("QUEUE_ENABLED", "false")
    from ocr_sprint.config import get_settings

    # Drop any cached settings so the overridden env var is picked up.
    get_settings.cache_clear()
    try:
        # Pretend the broker is unreachable; if the route still enqueues, the
        # call would blow up here. The stub accepts any call shape: it is set
        # as a plain attribute on the task object, so no ``self`` is passed.
        def _no_broker(*_args: object, **_kwargs: object) -> None:
            raise AssertionError("queue path taken when queue is disabled")

        from ocr_sprint.worker import tasks as task_module

        monkeypatch.setattr(task_module.process_document_task, "delay", _no_broker)

        post = client.post("/api/v1/documents", files=_pdf_upload())
        assert post.status_code == 200, post.text
        body = post.json()
        assert body["status"] == "completed"
    finally:
        # Settings cached during this test reflect QUEUE_ENABLED=false. Clear
        # them so the next test re-reads the environment after monkeypatch
        # has restored it, instead of inheriting the disabled queue.
        get_settings.cache_clear()


def test_documents_get_unknown_id_returns_404(client: TestClient) -> None:
    """GET for a job id that was never created answers 404."""
    response = client.get("/api/v1/documents/00000000-0000-0000-0000-000000000000")
    assert response.status_code == 404


def test_documents_async_marks_failed_on_pipeline_error(
    client: TestClient,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A pipeline exception in the worker surfaces as a ``failed`` job.

    POST still returns 202; the follow-up GET reports ``status == "failed"``
    and an ``error`` containing the exception message.
    """

    def _explode(_content: bytes) -> PipelineOutput:
        raise RuntimeError("boom")

    from ocr_sprint.worker import tasks as tasks_module

    monkeypatch.setattr(tasks_module, "run_pipeline", _explode)

    post = client.post("/api/v1/documents", files=_pdf_upload())
    assert post.status_code == 202
    job_id = post.json()["job_id"]

    get = client.get(f"/api/v1/documents/{job_id}")
    # Same contract the sync-failure test checks: a failed job is still a
    # readable resource, so a non-200 here should fail loudly with the body.
    assert get.status_code == 200, get.text
    body = get.json()
    assert body["status"] == "failed"
    assert "boom" in (body.get("error") or "")


def test_documents_sync_persists_failed_row_when_pipeline_raises(
    client: TestClient,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Regression: an exception in the sync pipeline must NOT roll back the
    pending row + ``mark_failed`` write. Otherwise the blob on disk has no DB
    record pointing at it.
    """

    def _explode(_content: bytes) -> PipelineOutput:
        raise RuntimeError("kapow")

    from ocr_sprint.api.routes import documents as docs_module

    monkeypatch.setattr(docs_module, "run_pipeline", _explode)

    # ``raise_server_exceptions=False`` lets the test see the 500 response
    # rather than re-raising the underlying RuntimeError from the route.
    silent = TestClient(client.app, raise_server_exceptions=False)
    post = silent.post("/api/v1/documents?sync=true", files=_pdf_upload())
    assert post.status_code == 500

    # The row must still be visible to GET, with status=failed.
    from ocr_sprint.db.base import session_scope
    from ocr_sprint.db.models import JobRow

    with session_scope() as session:
        # Find the most recent row.
        row = session.query(JobRow).order_by(JobRow.created_at.desc()).first()
        assert row is not None, "create() must persist even when pipeline blows up"
        assert row.status == "failed"
        assert "kapow" in (row.error or "")
        assert row.blob_key  # blob is referenced — not orphaned
        # Read the id while the session is still open so the GET below does
        # not depend on attribute access against a closed session.
        job_id = row.job_id

    # GET must surface the failure too (this is the client-visible contract).
    get = client.get(f"/api/v1/documents/{job_id}")
    assert get.status_code == 200
    assert get.json()["status"] == "failed"


def test_metrics_endpoint_exposes_request_counter(
    client: TestClient,
    fake_pipeline: PipelineOutput,
) -> None:
    """``/metrics`` lists both the HTTP request and the OCR job metrics."""
    client.post("/api/v1/documents?sync=true", files=_pdf_upload())
    metrics = client.get("/metrics")
    assert metrics.status_code == 200
    body = metrics.text
    assert "http_requests_total" in body
    assert "ocr_jobs_total" in body


def test_metrics_jobs_total_reflects_worker_writes(
    client: TestClient,
    fake_pipeline: PipelineOutput,
) -> None:
    """Regression: when the worker (eager mode here) marks a job complete,
    /metrics must reflect that — the previous Counter-based implementation
    would have stayed at zero because the worker's increments don't reach the
    API process's in-memory registry.
    """
    post = client.post("/api/v1/documents", files=_pdf_upload())
    assert post.status_code == 202

    body = client.get("/metrics").text
    # ``ocr_jobs_total{status="completed"} 1.0`` — exact match to make sure
    # the gauge-style metric is being populated from the DB.
    assert 'ocr_jobs_total{status="completed"} 1.0' in body