OSINT Investigation Pipeline
The OSINT (Open Source Intelligence) pipeline is the core automated investigation engine. It coordinates multiple AI agents to cross-reference customer-provided documents against public registries, LinkedIn profiles, adverse media sources, and sanctions databases.
Pipeline DAG
The investigation pipeline tracked in agent_progress_service.py now spans 23 pipeline stages across 10 columns (0–9), including a full pre-enrichment phase (vies_validation, northdata_lookup, gleif_check, website_discovery, initial_risk) ahead of the core OSINT stages. The mermaid diagram below shows the core post-pre-enrichment investigation agents. Notable additions since the original design: social_intelligence (BrightData, runs parallel with person/media screening), financial_analysis (Altman/Ohlson/Zmijewski distress models, ADR-0048), verification_checks, gap_analysis, and quality_scorer. MCC classification runs before risk reassessment (after synthesis).
Execution Order
- Document Validator (sequential) -- Validates uploaded documents against template requirements
- Registry Agent (sequential) -- Must complete first to extract director and UBO names
- Person Validation + Adverse Media + Social Intelligence + Financial Analysis (parallel) -- Run concurrently via `asyncio.gather`
- Synthesis Agent (sequential) -- Combines the parallel agent outputs into a unified risk assessment
- MCC Classifier (sequential) -- Assigns Merchant Category Code based on OSINT findings (moved before risk reassessment as of 2026-03-31, so MCC context is available during EBA scoring)
- Risk Reassessment (sequential) -- Re-runs EBA risk matrix with MCC context
- Verification Checks (sequential) -- Runs `inhoudingsplicht_check` and other verification tools; progress tracked in `agent_executions`
- Quality Scorer (sequential) -- LLM-as-judge scores synthesis report on 4 dimensions; progress tracked in `agent_executions`
- Inhoudingsplicht Check (sequential, BE only) -- Social/tax debt status via PEPPOL (Belgian cases only)
- Task Generator (sequential) -- Suggests follow-up actions based on the investigation
- Graph ETL (sequential, KYB only) -- Syncs investigation results to Neo4j knowledge graph
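The ordering constraint in the first steps above (registry first, then four agents in parallel, then synthesis) can be sketched as follows. This is a minimal illustration, not the project's actual code: `run_stage` and `run_core_stages` are hypothetical names, and each stage is a stub that only records when it starts.

```python
import asyncio
from typing import Any


async def run_stage(name: str, log: list[str]) -> dict[str, Any]:
    """Hypothetical stand-in for a real agent call; records the start order."""
    log.append(name)
    await asyncio.sleep(0)  # yield control, as an I/O-bound agent would
    return {"agent": name}


async def run_core_stages(log: list[str]) -> dict[str, Any]:
    # Sequential: the registry agent must finish first because the
    # parallel agents need the director and UBO names it extracts.
    await run_stage("document_validator", log)
    registry = await run_stage("registry", log)

    # Parallel: the four screening/analysis agents run concurrently.
    parallel_names = [
        "person_validation",
        "adverse_media",
        "social_intelligence",
        "financial_analysis",
    ]
    parallel = await asyncio.gather(*(run_stage(n, log) for n in parallel_names))

    # Sequential: synthesis only starts once every parallel agent has returned.
    synthesis = await run_stage("synthesis", log)
    return {"registry": registry, "parallel": list(parallel), "synthesis": synthesis}
```

Calling `asyncio.run(run_core_stages([]))` returns the three groups of stub outputs; the later sequential stages (MCC Classifier onward) would follow the same `await` pattern.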
Pipeline Strip (Frontend)
The officer dashboard Pipeline Strip shows 8 stages that map to the DAG above:
| Stage | Agents covered |
|---|---|
| Registry | Document Validator + Registry Agent |
| MCC | MCC Classifier |
| Screening | Person Validation + Adverse Media |
| Financial | Risk Reassessment |
| Synthesis | Synthesis Agent |
| Quality | Quality Scorer |
| Tasks | Task Generator |
| Graph | Graph ETL |
Standard Pipeline (Non-Belgian)
For non-Belgian cases, the registry agent uses NorthData MCP tools:
| Step | Agent | Data Source | Output |
|---|---|---|---|
| 1 | Registry Agent | NorthData MCP (Stdio; DACH, NL coverage) | Company status, directors, UBOs, financials |
| 2a | Person Validation | BrightData MCP (StreamableHTTP) | LinkedIn profiles, company-role match, legitimacy scores |
| 2b | Adverse Media | Tavily REST (httpx, no MCP) | Sanctions matches, PEP flags, adverse news articles |
| 2c | Social Intelligence | BrightData MCP (StreamableHTTP) | Social/web presence (LinkedIn, Google Maps, Yahoo Finance) |
| 2d | Financial Analysis | None (deterministic, NBB/NorthData financials) | Ratios, Altman/Ohlson/Zmijewski distress scores, peer benchmarks |
| 3 | Synthesis | None (reasoning only) | Risk score (0.0-1.0), findings, discrepancies, narrative summary |
Belgian Pipeline (4 Official Sources)
When country=BE, the system routes to the Belgian agent, which queries four official data sources:
Belgian Data Sources
| Source | Data Retrieved | Tool Used | Why This Tool |
|---|---|---|---|
| KBO/BCE | Company name, legal form, NACE codes, directors (with roles and mandate dates), establishments | Custom HTML scraper (kbo_service.py) | Dedicated parser for well-known HTML structure |
| Belgian Gazette | Board publications, capital changes, statutory modifications, full publication text, official PDF documents | crawl4ai (crawl4ai_service.py) | Static HTML, no bot protection, crawl4ai works perfectly. Gazette publications include articles of association and statutory documents with downloadable PDFs |
| NBB CBSO | Financial filings, annual accounts CSV, solvency/debt ratios, filing regularity | Direct REST API (nbb_service.py) via httpx | Public REST API behind the Angular SPA -- no scraping needed |
| Inhoudingsplicht | Social debt and tax debt status | PEPPOL (primary) or crawl4ai (fallback) | PEPPOL inhoudingsplicht check is the primary reliable source |
:::note Scraping Tool Selection
Each Belgian data source uses the tool best suited to its characteristics. The Belgian Gazette provides comprehensive corporate documentation including articles of association, capital changes, and statutory modifications with full-text content and official PDF downloads. See ADR-0012 (scraping tool selection) for the full tool selection rationale.
:::
NBB Financial Data
The NBB service parses CSV financial accounts and extracts key metrics:
| Rubric Code | Metric | Description |
|---|---|---|
| 20/58 | Total Assets | Balance sheet total |
| 10/15 | Equity | Shareholder equity |
| 70 | Revenue | Turnover |
| 9904 | Profit/Loss | Net result |
| 9087 | Employees | Staff count |
Computed ratios:
- Solvency ratio = equity / total_assets (below 0.3 = concerning, below 0.1 = critical)
- Debt ratio = (total_assets - equity) / total_assets
These metrics feed into the FinancialHealthReport model, which is displayed in the dashboard's Financial Health Card.
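The two computed ratios and the solvency thresholds can be expressed as a short sketch. The function names, the `None` return for non-positive assets, and the `"ok"` / `"unknown"` labels are assumptions for illustration; only the formulas and the 0.3 / 0.1 thresholds come from the text above.

```python
from typing import Optional


def solvency_ratio(equity: float, total_assets: float) -> Optional[float]:
    """equity / total_assets; None when total assets are not positive."""
    if total_assets <= 0:
        return None
    return equity / total_assets


def debt_ratio(equity: float, total_assets: float) -> Optional[float]:
    """(total_assets - equity) / total_assets; None when total assets are not positive."""
    if total_assets <= 0:
        return None
    return (total_assets - equity) / total_assets


def solvency_flag(ratio: Optional[float]) -> str:
    """Map a solvency ratio to the thresholds described above."""
    if ratio is None:
        return "unknown"      # hypothetical label
    if ratio < 0.1:
        return "critical"
    if ratio < 0.3:
        return "concerning"
    return "ok"               # hypothetical label
```

For example, equity of 250 (rubric 10/15) against total assets of 1000 (rubric 20/58) gives a solvency ratio of 0.25, which falls in the "concerning" band, and a debt ratio of 0.75.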
Deep Gazette Scraping
The gazette scraper uses a two-phase approach:
- Phase 1 -- Search the gazette for the company name, extract publication summaries
- Phase 2 -- Follow up to 5 publication URLs to extract full text content
This provides the synthesis agent with complete publication text (capital changes, director appointments, statutory modifications) rather than just titles.
Evidence Chain
Belgian Evidence Service
Every Belgian data source response is hashed and persisted:
Per-source: SHA-256(json.dumps(data, sort_keys=True))
Bundle: SHA-256(json.dumps({source: hash, ...}, sort_keys=True))
Evidence is stored in two locations:
- PostgreSQL (`belgian_evidence` table) -- source, URL, hash, raw JSON data, timestamp
- MinIO -- Archived JSON for long-term storage
The bundle hash creates a tamper-evident fingerprint of the entire evidence collection for a case iteration.
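The hashing scheme above can be sketched with the standard library. The function names are hypothetical, and UTF-8 encoding with a hex digest is an assumption; the source only specifies SHA-256 over `json.dumps(..., sort_keys=True)`.

```python
import hashlib
import json


def hash_source(data: dict) -> str:
    """SHA-256 over the canonical (key-sorted) JSON form of one source response."""
    canonical = json.dumps(data, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def hash_bundle(per_source: dict[str, dict]) -> str:
    """SHA-256 over the mapping {source: per-source hash}, also key-sorted."""
    source_hashes = {source: hash_source(data) for source, data in per_source.items()}
    canonical = json.dumps(source_hashes, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Because of `sort_keys=True`, two responses that differ only in key order produce the same hash, while any change to a value in any source changes the bundle hash.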
PEPPOL Evidence
The PEPPOL verification service uses the same hashing pattern, with results stored in the peppol_verifications table and an evidence_bundle_hash field.
Cache and Reuse
On follow-up iterations (iteration > 1), the pipeline checks for cached agent outputs from the previous iteration:
if iteration > 1 and case_id and not force_full_investigation:
    cached = _load_osint_cache(case_id)
    if cached:
        registry_data, person_data, media_data, metadata = cached
        # Skip agents 1-3, only run Synthesis with new documents
What Gets Cached
Cached to MinIO at {case_id}/osint_cache/:
| File | Content |
|---|---|
| `registry_output.json` | Registry agent structured output |
| `person_validation_output.json` | Person validation results |
| `adverse_media_output.json` | Adverse media screening results |
| `metadata.json` | Cache timestamp and source iteration number |
What Gets Re-Run
The Synthesis agent always re-runs because it needs to incorporate:
- New documents from the latest iteration
- Customer responses to follow-up questions
- The cumulative evidence picture
Cache Bypass
Set force_full_investigation: true in the case's additional_data to force a full re-run of all agents, bypassing the cache.
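The reuse decision, including the bypass flag, can be sketched as a single predicate. `should_reuse_cache` is a hypothetical helper; the assumption here is that `force_full_investigation` is read from the case's `additional_data` dictionary and defaults to false when absent.

```python
from typing import Any, Optional


def should_reuse_cache(
    iteration: int,
    case_id: Optional[str],
    additional_data: dict[str, Any],
) -> bool:
    """True when cached agent outputs from the previous iteration may be used."""
    # Assumption: a missing flag means "do not force a full re-run".
    force_full = bool(additional_data.get("force_full_investigation", False))
    return iteration > 1 and bool(case_id) and not force_full
```

Even when this returns true, the pipeline still has to find a cache in MinIO; if none exists, the full set of agents runs.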
Pipeline Observability
Each agent reports its status to the agent_executions PostgreSQL table:
| Field | Description |
|---|---|
| `agent_name` | e.g., "registry", "person_validation", "synthesis" |
| `status` | "pending", "running", "success", "failed", "reused" |
| `started_at` / `completed_at` | Timestamps for duration calculation |
| `duration_ms` | Execution time in milliseconds |
| `model` | LLM model used (e.g., "openai:gpt-5.2") |
| `findings_count` | Number of findings produced |
| `output_summary` | Human-readable summary (e.g., "Found 3 directors, 1 UBO") |
The frontend displays this data as a real-time pipeline visualization (PipelineDAG, AgentCard, PipelineTimingBar components), showing officers which agents are running, completed, or failed.
Network Scan Phase
After the Synthesis agent completes its primary investigation, the EVOI engine initiates a network scan phase that evaluates connected entities discovered via NorthData relatedCompanies.
Trigger
The network scan phase is triggered automatically after synthesis when:
- NorthData returned one or more `relatedCompanies` entries for the subject entity
- At least one connected entity has a positive Network EVOI score
Per-Entity Scan
For each entity with a positive Network EVOI decision, the engine runs three checks in parallel:
| Check | Tool | Output |
|---|---|---|
| Company lookup | NorthData API (euId passed as registration ID) | company_status, directors, further relatedCompanies |
| Sanctions screening | OpenSanctions local database (pg_trgm fuzzy match) | sanctions_status, matched entity names |
| Jurisdiction risk | FATF + EU high-risk countries lookup | jurisdiction_risk (LOW / MEDIUM / HIGH / CRITICAL) |
The euId extracted from the relatedCompanies response is passed as the registration ID for the NorthData lookup. This dramatically improves match quality compared to name-based searches, especially for entities with common names.
Timing
Scanning 16 entities typically completes in 35–60 seconds. The bottleneck is NorthData, which is rate-limited to a 2-second interval between requests enforced by a global async rate limiter shared across all concurrent network scans.
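If each entity needs one NorthData request, 16 requests spaced 2 seconds apart take at least 15 × 2 = 30 seconds, which is consistent with the 35–60 second range. A shared interval limiter of this kind might look like the sketch below; `IntervalRateLimiter` is a hypothetical class, not the project's implementation.

```python
import asyncio
import time


class IntervalRateLimiter:
    """Allows at most one call per `interval` seconds across all awaiting tasks.

    Create the instance inside the running event loop so the lock binds to it.
    """

    def __init__(self, interval: float) -> None:
        self._interval = interval
        self._lock = asyncio.Lock()
        self._next_allowed = 0.0  # monotonic time of the next permitted call

    async def wait(self) -> None:
        # The lock serializes callers, so concurrent scans queue up here.
        async with self._lock:
            delay = self._next_allowed - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)
            self._next_allowed = time.monotonic() + self._interval
```

Each NorthData call site would `await limiter.wait()` on one shared instance (constructed with `interval=2.0`) before issuing its request, so parallel per-entity scans still respect the global spacing.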
Corroboration Analysis
Once all entities are scanned, a cross-network corroboration analysis identifies structural risk patterns:
- Shared directors -- the same natural person governing multiple network entities
- Jurisdiction concentration -- multiple entities in high-risk jurisdictions
- Dissolved entities -- terminated companies in the corporate chain
- Sanctions hits -- any sanctions match in the network
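As an illustration of the first pattern, shared directors can be detected by inverting the entity-to-director mapping. The input shape (`name` and `directors` keys) and the simple case/whitespace normalization are assumptions; real matching of natural persons would likely need stronger identity resolution.

```python
from collections import defaultdict


def find_shared_directors(entities: list[dict]) -> dict[str, list[str]]:
    """Return {normalized director name: [entity names]} for directors of 2+ entities."""
    companies_by_director: dict[str, set[str]] = defaultdict(set)
    for entity in entities:
        for director in entity.get("directors", []):
            # Naive normalization: lowercase and collapse whitespace.
            key = " ".join(director.lower().split())
            companies_by_director[key].add(entity["name"])
    return {
        director: sorted(companies)
        for director, companies in companies_by_director.items()
        if len(companies) > 1
    }
```

The other three patterns (jurisdiction concentration, dissolved entities, sanctions hits) reduce to similar counts or filters over the per-entity scan results.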
Result Propagation
HIGH-severity findings from the network scan cascade to the parent case investigation result. These findings appear in the officer review dashboard alongside primary OSINT findings, clearly attributed to the network scan phase. The OSINT pipeline DAG displays a dedicated "Network Scan" step with real-time progress messages as each entity is evaluated.
Data Merge Points
The OSINT pipeline collects data from multiple independent sources that must be merged at specific points in the pipeline. Director data, financial data, and company identity data each have different merge strategies and timing constraints.
Key merge points:
- Pre-validation director merge -- Registry agent directors + NorthData pre-enrichment directors + UBOs are merged and deduplicated before person validation and adverse media screening run in parallel.
- Post-synthesis financial enrichment -- NorthData API financials and country-specific financial APIs are fetched after synthesis and added to the result.
- Post-OSINT UBO merge -- Document-extracted UBOs are merged into the investigation result at the Temporal workflow level, after the OSINT pipeline completes.
For the full data flow with Mermaid diagrams, merge decision rationale, and documented gaps, see Data Merge Architecture.
Error Handling
The pipeline uses graceful degradation at every level:
- Agent failure -- Each agent wrapper catches exceptions and returns a safe fallback output with an error finding
- Pipeline failure -- The `run_osint_investigation` function catches top-level exceptions and returns a minimal result with a 0.5 risk score and a recommendation for manual review
- Temporal retry -- The activity has a 3-attempt retry policy with exponential backoff
This means a case never gets stuck due to a transient API failure. The worst case is a degraded investigation result that flags the need for manual review.
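The first two levels of degradation can be sketched as two small wrappers. The function names, the result dictionary shape, and the `"agent_error"` / `"manual_review"` values are hypothetical; only the 0.5 fallback risk score and the manual-review recommendation come from the description above.

```python
import asyncio
from typing import Any, Awaitable, Callable

AsyncCall = Callable[[], Awaitable[dict[str, Any]]]


async def run_agent_safely(name: str, agent: AsyncCall) -> dict[str, Any]:
    """Agent level: convert any exception into a fallback output with an error finding."""
    try:
        return await agent()
    except Exception as exc:  # deliberately broad; cancellation is not an Exception
        return {
            "agent": name,
            "findings": [{"category": "agent_error", "description": str(exc)}],
        }


async def run_pipeline_safely(pipeline: AsyncCall) -> dict[str, Any]:
    """Pipeline level: return a minimal result that flags the case for manual review."""
    try:
        return await pipeline()
    except Exception as exc:
        return {"risk_score": 0.5, "recommendation": "manual_review", "error": str(exc)}
```

The third level, the Temporal retry policy, is configured on the activity rather than in pipeline code, so it is not shown here.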
BrightData MCP Hard Timeouts (2026-04-13)
To prevent worker zombification from unbounded MCP connections (observed
during multi-country E2E validation), three BrightData-backed call sites
now call `agent.run()` inside `async with agent:` and wrap that call in
`asyncio.wait_for(..., timeout=N)`:
| Call site | Timeout | Purpose |
|---|---|---|
| `app.agents.social_intelligence_agent` | 180s | Social data (LinkedIn, Google Maps, Yahoo Finance, etc.) |
| `app.agents.person_validation_agent` | 300s | LinkedIn people search, up to 150 tool calls |
| `app.services.brightdata_enrichment_service.lookup_crunchbase` | 90s | Pre-enrichment Crunchbase lookup at case creation |
On timeout or cancellation the agent catches
`(asyncio.TimeoutError, asyncio.CancelledError)` (not the broad
`BaseException` the previous implementation used) and returns a
LOW-severity Finding with a dedicated category, e.g.
`social_intelligence_timeout` or `person_validation_timeout`. The
MCP client's `__aexit__` runs cleanly, releasing the streaming-HTTP
socket, so no CLOSE_WAIT sockets leak.
Root cause (commit 1baa085c): PydanticAI's
MCPServerStreamableHTTP held streaming connections open when inner
tasks were cancelled, which caused Temporal activity heartbeats to
lapse and the workflow to transition to FAILED. Observed on
NL/DE/CZ/DK during validation; post-patch 13 consecutive cases completed
cleanly on the same worker process with no leak.
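The timeout pattern described in this section can be sketched with a stand-in agent. `FakeAgent` and `run_with_hard_timeout` are hypothetical; the real call sites use a PydanticAI agent with a BrightData MCP server, which this sketch replaces with a sleep so it stays self-contained.

```python
import asyncio
from typing import Any


class FakeAgent:
    """Hypothetical stand-in for an agent that owns an MCP connection."""

    def __init__(self, delay_s: float) -> None:
        self.delay_s = delay_s
        self.closed = False

    async def __aenter__(self) -> "FakeAgent":
        return self

    async def __aexit__(self, *exc_info: Any) -> bool:
        self.closed = True  # the real client would release its socket here
        return False        # never swallow the exception

    async def run(self) -> str:
        await asyncio.sleep(self.delay_s)
        return "done"


async def run_with_hard_timeout(agent: FakeAgent, timeout_s: float, category: str) -> dict:
    try:
        async with agent:
            output = await asyncio.wait_for(agent.run(), timeout=timeout_s)
        return {"status": "success", "output": output}
    except (asyncio.TimeoutError, asyncio.CancelledError):
        # Narrow catch, mirroring the description above (not BaseException).
        return {"status": "timeout", "finding": {"severity": "LOW", "category": category}}
```

Because the `try` block surrounds the `async with`, the context manager's `__aexit__` runs before the timeout is converted into a finding, which is the property that avoids the leaked connection.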
Website Scrape Reuse from Pre-Enrichment
When a website URL is discovered during pre-enrichment (via Tavily search at case creation time), the workflow reuses this URL during the investigation iteration instead of repeating the discovery:
- Pre-enrichment (case creation): `_website_discover()` searches Tavily for the company website, stores result in `additional_data.website_discovery`
- Workflow iteration: `fetch-website-v1` reads the stored `website_url` from the DB via `fetch_company_details` activity, avoiding a redundant Tavily call
- Crawl4ai scrape: Uses the known URL directly, saving 2-5 seconds of discovery time
The website URL from pre-enrichment is auto-selected (first non-directory candidate) and stored in the case's additional_data.website_url field.
Agent Error Tracking in Audit Log
As of 2026-04-06, every agent failure is tracked in two places:
- `agent_executions` table: Status set to "failed" with `error_message` field (truncated to 200 chars)
- Workflow audit log: `agent_error` audit event with `agent_name`, `error_type`, and `error_message`
Agent errors include duration tracking -- the duration_ms field is populated even on failure, enabling analysis of whether timeouts are the primary failure mode.
PydanticAI ModelRetry Quality Gates
Two pipeline agents use PydanticAI's ModelRetry mechanism to enforce output quality:
Synthesis Agent
The synthesis agent validates its output against structural requirements, including:
- Must contain all 5 sections (Executive Summary, Key Risk Signals, Verification Status, Recommendation, Next Steps)
- Executive Summary must be 2-3 sentences
- Risk signals must not be empty
- Verified items must not appear in the Unverified section
On validation failure, ModelRetry re-runs the LLM with feedback about what was wrong, up to 2 retries. This catches misclassification errors where the LLM puts verified sanctions results under "Unverified" instead of "Verified".
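The structural checks can be sketched as a pure function that returns a list of problems. The report shape (`sections`, `risk_signals`, `verified`, `unverified` keys) is an assumption, and the sentence-count check on the Executive Summary is omitted for brevity. In a PydanticAI output validator, a non-empty problem list would typically be raised as `ModelRetry` with the problems as feedback; that wiring is not shown here.

```python
REQUIRED_SECTIONS = (
    "Executive Summary",
    "Key Risk Signals",
    "Verification Status",
    "Recommendation",
    "Next Steps",
)


def validate_synthesis(report: dict) -> list[str]:
    """Return human-readable problems; an empty list means the report passes."""
    problems: list[str] = []

    sections = report.get("sections", {})
    missing = [name for name in REQUIRED_SECTIONS if name not in sections]
    if missing:
        problems.append("Missing sections: " + ", ".join(missing))

    if not report.get("risk_signals"):
        problems.append("Risk signals must not be empty")

    # Catches verified items that were also listed as unverified.
    overlap = set(report.get("verified", [])) & set(report.get("unverified", []))
    if overlap:
        problems.append("Verified items listed as unverified: " + ", ".join(sorted(overlap)))

    return problems
```

Returning every problem at once, rather than failing on the first, gives the model complete feedback within the limited retry budget.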
Document Validator
The document validator uses ModelRetry to catch:
- Missing confidence scores
- Generic validation reasons (e.g., "document looks valid")
- Requirement ID mismatches
MCC-Aware License Verification
As of 2026-04-06, the verification checks pipeline includes regulatory license verification that is MCC-aware:
- Vertical resolution: The `resolve_vertical()` function determines the business vertical from either the template ID (e.g., `psp_merchant_onboarding` -> `payments`) or the MCC code (e.g., MCC 6012 -> `payments`)
- Registry check: For payments/banking verticals, the EBA Payment Institutions Register (CSV) is searched by company name and registration number
- Country routing: National supervisory authority checks are routed by country -- FSMA for Belgium, CNB for Czech Republic
- Circuit breaker protection: The EBA register download uses the `eba_register` circuit breaker to prevent cascading failures
License check results are converted to standard VerificationResult findings and merged into the investigation output.
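Vertical resolution and country routing can be illustrated with two lookup tables. This is a sketch, not the actual `resolve_vertical()` signature: the function names, the precedence of template ID over MCC code, and the two-letter country keys are assumptions, and the tables contain only the examples given above.

```python
from typing import Optional

# Only the mappings mentioned in this section; the real tables are larger.
TEMPLATE_VERTICALS = {"psp_merchant_onboarding": "payments"}
MCC_VERTICALS = {"6012": "payments"}
NATIONAL_AUTHORITIES = {"BE": "FSMA", "CZ": "CNB"}


def resolve_vertical_sketch(template_id: Optional[str], mcc_code: Optional[str]) -> Optional[str]:
    """Resolve the business vertical; template ID is checked first (assumed precedence)."""
    if template_id in TEMPLATE_VERTICALS:
        return TEMPLATE_VERTICALS[template_id]
    if mcc_code in MCC_VERTICALS:
        return MCC_VERTICALS[mcc_code]
    return None


def national_authority(country: str) -> Optional[str]:
    """Route to the national supervisory authority, if one is configured."""
    return NATIONAL_AUTHORITIES.get(country.upper())
```

A case with an unknown template but MCC 6012 still resolves to `payments`, which is what makes the license check MCC-aware.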