OSINT Investigation Pipeline

The OSINT (Open Source Intelligence) pipeline is the core automated investigation engine. It coordinates multiple AI agents to cross-reference customer-provided documents against public registries, LinkedIn profiles, adverse media sources, and sanctions databases.

Pipeline DAG

The investigation pipeline tracked in agent_progress_service.py spans 23 stages across 10 columns (0–9). A full pre-enrichment phase (vies_validation, northdata_lookup, gleif_check, website_discovery, initial_risk) runs ahead of the core OSINT stages. The mermaid diagram below shows the core investigation agents that run after pre-enrichment.

Notable additions since the original design: social_intelligence (BrightData, runs in parallel with person and media screening), financial_analysis (Altman/Ohlson/Zmijewski distress models, ADR-0048), verification_checks, gap_analysis, and quality_scorer. MCC classification runs after synthesis and before risk reassessment.

Execution Order

  1. Document Validator (sequential) -- Validates uploaded documents against template requirements
  2. Registry Agent (sequential) -- Must complete first to extract director and UBO names
  3. Person Validation + Adverse Media + Social Intelligence + Financial Analysis (parallel) -- Run concurrently via asyncio.gather
  4. Synthesis Agent (sequential) -- Combines the parallel agent outputs into a unified risk assessment
  5. MCC Classifier (sequential) -- Assigns Merchant Category Code based on OSINT findings (moved before risk reassessment as of 2026-03-31, so MCC context is available during EBA scoring)
  6. Risk Reassessment (sequential) -- Re-runs EBA risk matrix with MCC context
  7. Verification Checks (sequential) -- Runs inhoudingsplicht_check and other verification tools; progress tracked in agent_executions
  8. Quality Scorer (sequential) -- LLM-as-judge scores synthesis report on 4 dimensions; progress tracked in agent_executions
  9. Inhoudingsplicht Check (sequential, BE only) -- Checks social and tax debt status via PEPPOL
  10. Task Generator (sequential) -- Suggests follow-up actions based on the investigation
  11. Graph ETL (sequential, KYB only) -- Syncs investigation results to Neo4j knowledge graph
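The ordering of steps 2 to 4 can be sketched as follows. This is a minimal illustration, not the pipeline's actual code: `run_registry`, `run_stage`, and `run_core_stages` are hypothetical stand-ins, and the returned dictionaries are invented for the example. Only the use of asyncio.gather for the parallel step comes from the description above.

```python
import asyncio


async def run_registry(company: str) -> dict:
    # Hypothetical stand-in: the real registry agent queries external registries.
    return {"company": company, "directors": ["Alice Example", "Bob Example"]}


async def run_stage(name: str, directors: list) -> dict:
    # Hypothetical stand-in for one of the parallel agents.
    await asyncio.sleep(0)  # yield control, as an I/O-bound agent would
    return {"agent": name, "subjects": len(directors)}


async def run_core_stages(company: str) -> dict:
    # Step 2: the registry agent runs first because later agents need the names.
    registry = await run_registry(company)
    directors = registry["directors"]

    # Step 3: the four screening and analysis agents run concurrently.
    parallel_names = [
        "person_validation",
        "adverse_media",
        "social_intelligence",
        "financial_analysis",
    ]
    outputs = await asyncio.gather(
        *(run_stage(name, directors) for name in parallel_names)
    )

    # Step 4: synthesis would consume all parallel outputs (reduced to a dict here).
    return {"registry": registry, "parallel": {o["agent"]: o for o in outputs}}
```

Calling `asyncio.run(run_core_stages("Example BV"))` returns one entry per parallel agent. asyncio.gather preserves the order of its arguments, so results can also be unpacked positionally.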

Pipeline Strip (Frontend)

The officer dashboard Pipeline Strip shows 8 stages that map to the DAG above:

| Stage | Agents covered |
|---|---|
| Registry | Document Validator + Registry Agent |
| MCC | MCC Classifier |
| Screening | Person Validation + Adverse Media |
| Financial | Risk Reassessment |
| Synthesis | Synthesis Agent |
| Quality | Quality Scorer |
| Tasks | Task Generator |
| Graph | Graph ETL |

Standard Pipeline (Non-Belgian)

For non-Belgian cases, the registry agent uses NorthData MCP tools:

| Step | Agent | Data Source | Output |
|---|---|---|---|
| 1 | Registry Agent | NorthData MCP (Stdio; DACH, NL coverage) | Company status, directors, UBOs, financials |
| 2a | Person Validation | BrightData MCP (StreamableHTTP) | LinkedIn profiles, company-role match, legitimacy scores |
| 2b | Adverse Media | Tavily REST (httpx, no MCP) | Sanctions matches, PEP flags, adverse news articles |
| 2c | Social Intelligence | BrightData MCP (StreamableHTTP) | Social/web presence (LinkedIn, Google Maps, Yahoo Finance) |
| 2d | Financial Analysis | None (deterministic, NBB/NorthData financials) | Ratios, Altman/Ohlson/Zmijewski distress scores, peer benchmarks |
| 3 | Synthesis | None (reasoning only) | Risk score (0.0-1.0), findings, discrepancies, narrative summary |

Belgian Pipeline (4 Official Sources)

When country=BE, the system routes to the Belgian agent, which queries four official data sources:

Belgian Data Sources

| Source | Data Retrieved | Tool Used | Why This Tool |
|---|---|---|---|
| KBO/BCE | Company name, legal form, NACE codes, directors (with roles and mandate dates), establishments | Custom HTML scraper (kbo_service.py) | Dedicated parser for well-known HTML structure |
| Belgian Gazette | Board publications, capital changes, statutory modifications, full publication text, official PDF documents | crawl4ai (crawl4ai_service.py) | Static HTML, no bot protection, crawl4ai works perfectly. Gazette publications include articles of association and statutory documents with downloadable PDFs |
| NBB CBSO | Financial filings, annual accounts CSV, solvency/debt ratios, filing regularity | Direct REST API (nbb_service.py) via httpx | Public REST API behind the Angular SPA -- no scraping needed |
| Inhoudingsplicht | Social debt and tax debt status | PEPPOL (primary) or crawl4ai (fallback) | PEPPOL inhoudingsplicht check is the primary reliable source |

:::note Scraping Tool Selection
Each Belgian data source uses the tool best suited to its characteristics. The Belgian Gazette provides comprehensive corporate documentation including articles of association, capital changes, and statutory modifications with full-text content and official PDF downloads. See ADR-0012 (scraping tool selection) for the full tool selection rationale.
:::

NBB Financial Data

The NBB service parses CSV financial accounts and extracts key metrics:

| Rubric Code | Metric | Description |
|---|---|---|
| 20/58 | Total Assets | Balance sheet total |
| 10/15 | Equity | Shareholder equity |
| 70 | Revenue | Turnover |
| 9904 | Profit/Loss | Net result |
| 9087 | Employees | Staff count |

Computed ratios:

  • Solvency ratio = equity / total_assets (below 0.3 = concerning, below 0.1 = critical)
  • Debt ratio = (total_assets - equity) / total_assets

These metrics feed into the FinancialHealthReport model, which is displayed in the dashboard's Financial Health Card.
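The two ratios and the solvency thresholds can be sketched as below. The function name `compute_ratios`, the returned dictionary shape, and the "ok" and "unknown" labels are assumptions made for illustration; only the formulas and the 0.3 / 0.1 thresholds come from the text above.

```python
def compute_ratios(total_assets: float, equity: float) -> dict:
    """Compute solvency and debt ratios from rubric 20/58 (assets) and 10/15 (equity)."""
    if total_assets <= 0:
        # Ratios are undefined without a positive balance sheet total.
        return {"solvency_ratio": None, "debt_ratio": None, "solvency_flag": "unknown"}

    solvency = equity / total_assets
    debt = (total_assets - equity) / total_assets

    # Documented thresholds: below 0.1 is critical, below 0.3 is concerning.
    if solvency < 0.1:
        flag = "critical"
    elif solvency < 0.3:
        flag = "concerning"
    else:
        flag = "ok"  # assumed label for a healthy ratio
    return {"solvency_ratio": solvency, "debt_ratio": debt, "solvency_flag": flag}
```

For a company with total assets of 1000 and equity of 250, the solvency ratio is 0.25 and the debt ratio is 0.75, which falls in the "concerning" band. The two ratios always sum to 1, so the debt ratio carries no extra information beyond a different framing.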

Deep Gazette Scraping

The gazette scraper uses a two-phase approach:

  1. Phase 1 -- Search the gazette for the company name, extract publication summaries
  2. Phase 2 -- Follow up to 5 publication URLs to extract full text content

This provides the synthesis agent with complete publication text (capital changes, director appointments, statutory modifications) rather than just titles.
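A minimal sketch of the two-phase flow, with the network access injected as callables so the example stays self-contained. `scrape_gazette`, `search`, `fetch_text`, and the record fields are hypothetical names; following the first five results (rather than, say, the five most recent) is also an assumption.

```python
from typing import Callable

MAX_FULL_TEXT_FETCHES = 5  # "up to 5 publication URLs"


def scrape_gazette(
    company_name: str,
    search: Callable[[str], list],
    fetch_text: Callable[[str], str],
) -> list:
    # Phase 1: search the gazette and collect publication summaries.
    summaries = search(company_name)

    # Phase 2: fetch full text for a bounded number of publications only.
    enriched = []
    for index, summary in enumerate(summaries):
        record = dict(summary)  # copy so the caller's data is not mutated
        if index < MAX_FULL_TEXT_FETCHES:
            record["full_text"] = fetch_text(summary["url"])
        enriched.append(record)
    return enriched
```

Publications beyond the limit keep their summary but have no `full_text` key, so downstream consumers can tell which entries were followed.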

Evidence Chain

Belgian Evidence Service

Every Belgian data source response is hashed and persisted:

Per-source: SHA-256(json.dumps(data, sort_keys=True))
Bundle: SHA-256(json.dumps({source: hash, ...}, sort_keys=True))

Evidence is stored in two locations:

  1. PostgreSQL (belgian_evidence table) -- source, URL, hash, raw JSON data, timestamp
  2. MinIO -- Archived JSON for long-term storage

The bundle hash creates a tamper-evident fingerprint of the entire evidence collection for a case iteration.
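The hashing scheme can be sketched with the standard library. The function names are hypothetical, and the UTF-8 encoding and hex-digest output are assumptions; the source only specifies SHA-256 over json.dumps(..., sort_keys=True).

```python
import hashlib
import json


def hash_source(data: dict) -> str:
    # sort_keys=True makes the serialization independent of key insertion order.
    payload = json.dumps(data, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def hash_bundle(sources: dict) -> tuple:
    """Return (per-source hashes, bundle hash) for a mapping of source name to data."""
    per_source = {name: hash_source(data) for name, data in sources.items()}
    # The bundle hash covers the per-source hashes, not the raw data itself.
    bundle_payload = json.dumps(per_source, sort_keys=True).encode("utf-8")
    return per_source, hashlib.sha256(bundle_payload).hexdigest()
```

Because the bundle is computed over the per-source hashes, changing any single source response changes both its own hash and the bundle hash, while the order in which sources were collected has no effect.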

PEPPOL Evidence

The PEPPOL verification service uses the same hashing pattern, with results stored in the peppol_verifications table and an evidence_bundle_hash field.

Cache and Reuse

On follow-up iterations (iteration > 1), the pipeline checks for cached agent outputs from the previous iteration:

if iteration > 1 and case_id and not force_full_investigation:
    cached = _load_osint_cache(case_id)
    if cached:
        registry_data, person_data, media_data, metadata = cached
        # Skip agents 1-3, only run Synthesis with new documents

What Gets Cached

Cached to MinIO at {case_id}/osint_cache/:

| File | Content |
|---|---|
| registry_output.json | Registry agent structured output |
| person_validation_output.json | Person validation results |
| adverse_media_output.json | Adverse media screening results |
| metadata.json | Cache timestamp and source iteration number |
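A sketch of how a loader could assemble the four cached files into the tuple that the snippet above unpacks. The object store is modeled as a plain dictionary of object key to bytes so the example runs without MinIO; `load_osint_cache` and the all-or-nothing behavior (any missing file invalidates the cache) are assumptions, not the behavior of the real `_load_osint_cache`.

```python
import json
from typing import Optional

CACHE_FILES = (
    "registry_output.json",
    "person_validation_output.json",
    "adverse_media_output.json",
    "metadata.json",
)


def load_osint_cache(case_id: str, objects: dict) -> Optional[tuple]:
    """Return (registry, person, media, metadata), or None if any file is missing."""
    loaded = []
    for filename in CACHE_FILES:
        raw = objects.get(f"{case_id}/osint_cache/{filename}")
        if raw is None:
            return None  # incomplete cache: the caller falls back to a full run
        loaded.append(json.loads(raw))
    return tuple(loaded)
```

Returning None for a partial cache keeps the caller's `if cached:` check simple and avoids running Synthesis on a mix of fresh and stale agent outputs.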

What Gets Re-Run

The Synthesis agent always re-runs because it needs to incorporate:

  • New documents from the latest iteration
  • Customer responses to follow-up questions
  • The cumulative evidence picture

Cache Bypass

Set force_full_investigation: true in the case's additional_data to force a full re-run of all agents, bypassing the cache.

Pipeline Observability

Each agent reports its status to the agent_executions PostgreSQL table:

| Field | Description |
|---|---|
| agent_name | e.g., "registry", "person_validation", "synthesis" |
| status | "pending", "running", "success", "failed", "reused" |
| started_at / completed_at | Timestamps for duration calculation |
| duration_ms | Execution time in milliseconds |
| model | LLM model used (e.g., "openai:gpt-5.2") |
| findings_count | Number of findings produced |
| output_summary | Human-readable summary (e.g., "Found 3 directors, 1 UBO") |

The frontend displays this data as a real-time pipeline visualization (PipelineDAG, AgentCard, PipelineTimingBar components), showing officers which agents are running, completed, or failed.

Network Scan Phase

After the Synthesis agent completes its primary investigation, the EVOI engine initiates a network scan phase that evaluates connected entities discovered via NorthData relatedCompanies.

Trigger

The network scan phase is triggered automatically after synthesis when:

  • NorthData returned one or more relatedCompanies entries for the subject entity
  • At least one connected entity has a positive Network EVOI score

Per-Entity Scan

For each entity with a positive Network EVOI decision, the engine runs three checks in parallel:

| Check | Tool | Output |
|---|---|---|
| Company lookup | NorthData API (euId passed as registration ID) | company_status, directors, further relatedCompanies |
| Sanctions screening | OpenSanctions local database (pg_trgm fuzzy match) | sanctions_status, matched entity names |
| Jurisdiction risk | FATF + EU high-risk countries lookup | jurisdiction_risk (LOW / MEDIUM / HIGH / CRITICAL) |

The euId extracted from the relatedCompanies response is passed as the registration ID for the NorthData lookup. This dramatically improves match quality compared to name-based searches, especially for entities with common names.

Timing

Scanning 16 entities typically completes in 35–60 seconds. The bottleneck is NorthData, which is rate-limited to a 2-second interval between requests enforced by a global async rate limiter shared across all concurrent network scans.
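A shared interval limiter of the kind described above could look like the following sketch. `IntervalRateLimiter` is a hypothetical name and this is not the pipeline's actual limiter; it only illustrates how one lock-protected timestamp can space out requests from many concurrent tasks.

```python
import asyncio
import time


class IntervalRateLimiter:
    """Allow one call per `interval` seconds across all tasks sharing the instance.

    Create the instance inside a running event loop so the lock binds to that loop.
    """

    def __init__(self, interval: float) -> None:
        self._interval = interval
        self._lock = asyncio.Lock()
        self._next_allowed = 0.0  # monotonic time of the next permitted call

    async def wait(self) -> None:
        # Holding the lock while sleeping serializes callers, so each one
        # is released at least `interval` seconds after the previous one.
        async with self._lock:
            now = time.monotonic()
            delay = self._next_allowed - now
            if delay > 0:
                await asyncio.sleep(delay)
                now = time.monotonic()
            self._next_allowed = now + self._interval
```

Each NorthData call would `await limiter.wait()` first. Assuming one NorthData request per entity, 16 entities at a 2-second interval need at least 15 × 2 = 30 seconds of spacing, which is consistent with the 35–60 second figure above.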

Corroboration Analysis

Once all entities are scanned, a cross-network corroboration analysis identifies structural risk patterns:

  • Shared directors — the same natural person governing multiple network entities
  • Jurisdiction concentration — multiple entities in high-risk jurisdictions
  • Dissolved entities — terminated companies in the corporate chain
  • Sanctions hits — any sanctions match in the network
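As an illustration of the first pattern, shared directors can be found by inverting the entity-to-directors mapping. The function name, the input shape (`name` and `directors` keys), and the name normalization are assumptions for this sketch; real matching would likely need more than case and whitespace folding.

```python
from collections import defaultdict


def find_shared_directors(entities: list) -> dict:
    """Map each director who appears in two or more entities to those entity names."""
    seen = defaultdict(set)
    for entity in entities:
        for director in entity.get("directors", []):
            # Normalize case and whitespace so trivial variants still match.
            key = " ".join(director.lower().split())
            seen[key].add(entity["name"])
    return {name: sorted(companies) for name, companies in seen.items() if len(companies) > 1}
```

The other three patterns follow the same shape: group the scanned entities by one attribute (jurisdiction risk, company status, sanctions status) and report groups that cross a threshold.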

Result Propagation

HIGH-severity findings from the network scan cascade to the parent case investigation result. These findings appear in the officer review dashboard alongside primary OSINT findings, clearly attributed to the network scan phase. The OSINT pipeline DAG displays a dedicated "Network Scan" step with real-time progress messages as each entity is evaluated.


Data Merge Points

The OSINT pipeline collects data from multiple independent sources that must be merged at specific points in the pipeline. Director data, financial data, and company identity data each have different merge strategies and timing constraints.

Key merge points:

  • Pre-validation director merge -- Registry agent directors + NorthData pre-enrichment directors + UBOs are merged and deduplicated before person validation and adverse media screening run in parallel.
  • Post-synthesis financial enrichment -- NorthData API financials and country-specific financial APIs are fetched after synthesis and added to the result.
  • Post-OSINT UBO merge -- Document-extracted UBOs are merged into the investigation result at the Temporal workflow level, after the OSINT pipeline completes.

For the full data flow with Mermaid diagrams, merge decision rationale, and documented gaps, see Data Merge Architecture.
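The pre-validation director merge can be sketched as a keyed merge in which the first source to mention a person wins and later sources only fill in missing fields. `merge_directors`, the record fields, and matching on a normalized name are all assumptions for illustration; the actual merge strategy is documented in Data Merge Architecture.

```python
def merge_directors(*sources: list) -> list:
    """Merge person records from several sources, deduplicating by normalized name."""
    merged = {}
    for source in sources:
        for person in source:
            key = " ".join(person["name"].casefold().split())
            if key not in merged:
                merged[key] = dict(person)  # first occurrence defines the record
            else:
                # Later sources only contribute fields that are still missing.
                for field, value in person.items():
                    merged[key].setdefault(field, value)
    return list(merged.values())
```

Passing the sources in priority order (for example registry agent, then NorthData pre-enrichment, then UBOs) makes the precedence explicit at the call site.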


Error Handling

The pipeline uses graceful degradation at every level:

  1. Agent failure -- Each agent wrapper catches exceptions and returns a safe fallback output with an error finding
  2. Pipeline failure -- The run_osint_investigation function catches top-level exceptions and returns a minimal result with a 0.5 risk score and a recommendation for manual review
  3. Temporal retry -- The activity has a 3-attempt retry policy with exponential backoff

This means a case never gets stuck due to a transient API failure. The worst case is a degraded investigation result that flags the need for manual review.
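The first two levels can be sketched as two small wrappers. The function names, the `agent_error` category, and the result fields other than the 0.5 risk score and the manual-review recommendation are hypothetical; the third level (Temporal retry) is configured on the activity and is not shown.

```python
import asyncio
from typing import Awaitable, Callable


async def run_agent_safely(name: str, agent_fn: Callable[[], Awaitable[dict]]) -> dict:
    """Level 1: turn an agent exception into a fallback output with an error finding."""
    try:
        return await agent_fn()
    except Exception as exc:
        return {
            "agent": name,
            "findings": [
                {"category": "agent_error", "detail": f"{type(exc).__name__}: {exc}"}
            ],
        }


async def run_investigation_safely(pipeline_fn: Callable[[], Awaitable[dict]]) -> dict:
    """Level 2: turn a top-level failure into a minimal manual-review result."""
    try:
        return await pipeline_fn()
    except Exception as exc:
        return {
            "risk_score": 0.5,  # fallback score documented above
            "recommendation": "manual_review",
            "error": str(exc),
        }
```

Both wrappers catch Exception rather than BaseException, so task cancellation still propagates to the caller instead of being converted into a degraded result.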


BrightData MCP Hard Timeouts (2026-04-13)

To prevent worker zombification from unbounded MCP connections (observed during multi-country E2E validation), three BrightData-backed call sites now enter async with agent: and wrap agent.run() in asyncio.wait_for(..., timeout=N):

| Call site | Timeout | Purpose |
|---|---|---|
| app.agents.social_intelligence_agent | 180s | Social data (LinkedIn, Google Maps, Yahoo Finance, etc.) |
| app.agents.person_validation_agent | 300s | LinkedIn people search, up to 150 tool calls |
| app.services.brightdata_enrichment_service.lookup_crunchbase | 90s | Pre-enrichment Crunchbase lookup at case creation |

On timeout or cancellation the agent catches (asyncio.TimeoutError, asyncio.CancelledError) (not the broad BaseException the previous implementation used) and returns a LOW-severity Finding with a dedicated category — e.g. social_intelligence_timeout, person_validation_timeout. The MCP client's __aexit__ runs cleanly, releasing the streaming-HTTP socket; no CLOSE_WAIT leak.

Root cause (commit 1baa085c): PydanticAI's MCPServerStreamableHTTP held streaming connections open when inner tasks were cancelled, which caused Temporal activity heartbeats to lapse and the workflow to transition to FAILED. Observed on NL/DE/CZ/DK during validation; post-patch 13 consecutive cases completed cleanly on the same worker process with no leak.
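The timeout pattern can be sketched without any MCP dependency. `FakeAgent` is a hypothetical stand-in for an agent used as an async context manager, and `run_with_hard_timeout` and the returned dictionary are invented for the example; only the structure (asyncio.wait_for inside async with, catching the two named exceptions, returning a LOW-severity finding) follows the description above.

```python
import asyncio


class FakeAgent:
    """Hypothetical stand-in for an MCP-backed agent with connection cleanup."""

    def __init__(self, delay: float) -> None:
        self.delay = delay
        self.closed = False

    async def __aenter__(self) -> "FakeAgent":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        self.closed = True  # a real client would release its HTTP connection here
        return False  # do not suppress the exception

    async def run(self, prompt: str) -> str:
        await asyncio.sleep(self.delay)
        return f"result for {prompt}"


async def run_with_hard_timeout(agent, prompt: str, timeout: float, category: str):
    try:
        async with agent:
            # wait_for cancels the inner call once the timeout elapses.
            return await asyncio.wait_for(agent.run(prompt), timeout=timeout)
    except (asyncio.TimeoutError, asyncio.CancelledError):
        # __aexit__ has already run by the time control reaches this handler.
        return {"severity": "LOW", "category": category}
```

Placing the try block outside the async with is what guarantees cleanup before the fallback finding is built. Note that catching asyncio.CancelledError, as the source describes, converts an external cancellation into a normal return value.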


Website Scrape Reuse from Pre-Enrichment

When a website URL is discovered during pre-enrichment (via Tavily search at case creation time), the workflow reuses this URL during the investigation iteration instead of repeating the discovery:

  1. Pre-enrichment (case creation): _website_discover() searches Tavily for the company website, stores result in additional_data.website_discovery
  2. Workflow iteration: fetch-website-v1 reads the stored website_url from the DB via fetch_company_details activity, avoiding a redundant Tavily call
  3. Crawl4ai scrape: Uses the known URL directly, saving 2-5 seconds of discovery time

The website URL from pre-enrichment is auto-selected (first non-directory candidate) and stored in the case's additional_data.website_url field.


Agent Error Tracking in Audit Log

As of 2026-04-06, every agent failure is tracked in two places:

  1. agent_executions table: Status set to "failed" with error_message field (truncated to 200 chars)
  2. Workflow audit log: agent_error audit event with agent_name, error_type, and error_message

Agent errors include duration tracking -- the duration_ms field is populated even on failure, enabling analysis of whether timeouts are the primary failure mode.
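A sketch of how the two failure records could be built from one exception. `build_failure_records` and the exact dictionary shapes are hypothetical, and truncating the message in the audit event as well as in the table row is an assumption; the 200-character limit and the field names come from the list above.

```python
import time
from typing import Optional

MAX_ERROR_CHARS = 200


def build_failure_records(
    agent_name: str,
    started_at: float,
    exc: Exception,
    finished_at: Optional[float] = None,
) -> tuple:
    """Return (agent_executions row, audit event) for a failed agent run."""
    finished = time.monotonic() if finished_at is None else finished_at
    message = str(exc)[:MAX_ERROR_CHARS]
    execution_row = {
        "agent_name": agent_name,
        "status": "failed",
        "error_message": message,
        # Duration is recorded even though the agent failed.
        "duration_ms": int(round((finished - started_at) * 1000)),
    }
    audit_event = {
        "event": "agent_error",
        "agent_name": agent_name,
        "error_type": type(exc).__name__,
        "error_message": message,
    }
    return execution_row, audit_event
```

With `duration_ms` populated on failures, comparing it against each agent's timeout budget shows whether failures cluster at the timeout or occur early.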


PydanticAI ModelRetry Quality Gates

Two pipeline agents use PydanticAI's ModelRetry mechanism to enforce output quality:

Synthesis Agent

The synthesis agent validates its output against the following structural requirements:

  • Must contain all 5 sections (Executive Summary, Key Risk Signals, Verification Status, Recommendation, Next Steps)
  • Executive Summary must be 2-3 sentences
  • Risk signals must not be empty
  • Verified items must not appear in the Unverified section

On validation failure, ModelRetry re-runs the LLM with feedback about what was wrong, up to 2 retries. This catches misclassification errors where the LLM puts verified sanctions results under "Unverified" instead of "Verified".
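The structural checks can be sketched as a plain function that returns a list of problems. The report shape (`sections`, `risk_signals`, `verified`, `unverified`) and the sentence-splitting heuristic are assumptions; in a PydanticAI output validator, a non-empty list would presumably be turned into a raised ModelRetry carrying these messages as feedback.

```python
import re

REQUIRED_SECTIONS = (
    "Executive Summary",
    "Key Risk Signals",
    "Verification Status",
    "Recommendation",
    "Next Steps",
)


def validate_synthesis(report: dict) -> list:
    """Return human-readable problems; an empty list means the report passes."""
    problems = []
    sections = report.get("sections", {})

    missing = [name for name in REQUIRED_SECTIONS if name not in sections]
    if missing:
        problems.append("Missing sections: " + ", ".join(missing))

    # Naive sentence split on terminal punctuation followed by whitespace.
    summary = sections.get("Executive Summary", "").strip()
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", summary) if s]
    if not 2 <= len(sentences) <= 3:
        problems.append("Executive Summary must be 2-3 sentences")

    if not report.get("risk_signals"):
        problems.append("Risk signals must not be empty")

    overlap = set(report.get("verified", [])) & set(report.get("unverified", []))
    if overlap:
        problems.append("Verified items listed as unverified: " + ", ".join(sorted(overlap)))
    return problems
```

Returning all problems at once, rather than stopping at the first, gives the model complete feedback within the limited retry budget.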

Document Validator

The document validator uses ModelRetry to catch:

  • Missing confidence scores
  • Generic validation reasons (e.g., "document looks valid")
  • Requirement ID mismatches

MCC-Aware License Verification

As of 2026-04-06, the verification checks pipeline includes regulatory license verification that is MCC-aware:

  1. Vertical resolution: The resolve_vertical() function determines the business vertical from either the template ID (e.g., psp_merchant_onboarding -> payments) or the MCC code (e.g., MCC 6012 -> payments)
  2. Registry check: For payments/banking verticals, the EBA Payment Institutions Register (CSV) is searched by company name and registration number
  3. Country routing: National supervisory authority checks are routed by country -- FSMA for Belgium, CNB for Czech Republic
  4. Circuit breaker protection: The EBA register download uses the eba_register circuit breaker to prevent cascading failures

License check results are converted to standard VerificationResult findings and merged into the investigation output.
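Steps 1 to 3 can be sketched as table lookups. The mappings contain only the examples given above; `resolve_vertical_sketch` and `plan_license_checks` are hypothetical names, and both the template-before-MCC precedence and the rule that national authority checks run only for payments/banking verticals are assumptions.

```python
from typing import Optional

TEMPLATE_VERTICALS = {"psp_merchant_onboarding": "payments"}
MCC_VERTICALS = {"6012": "payments"}
NATIONAL_AUTHORITIES = {"BE": "FSMA", "CZ": "CNB"}
EBA_VERTICALS = {"payments", "banking"}


def resolve_vertical_sketch(template_id: Optional[str], mcc: Optional[str]) -> Optional[str]:
    # Assumed precedence: an explicit template mapping wins over the MCC code.
    if template_id in TEMPLATE_VERTICALS:
        return TEMPLATE_VERTICALS[template_id]
    return MCC_VERTICALS.get(mcc) if mcc else None


def plan_license_checks(vertical: Optional[str], country: str) -> list:
    """List the registers to consult for a vertical and an ISO country code."""
    checks = []
    if vertical in EBA_VERTICALS:
        checks.append("eba_register")
        authority = NATIONAL_AUTHORITIES.get(country)
        if authority:
            checks.append(authority)
    return checks
```

For example, a Belgian case on the psp_merchant_onboarding template resolves to the payments vertical and yields the EBA register plus the FSMA check, while a case with no resolvable vertical yields no license checks.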