Atlas Adoption

This page documents the 8 services adopted from a co-founder's parallel Atlas codebase into Trust Relay. Each service was analysed, extracted, and reimplemented in Trust Relay's tech stack (PydanticAI, SQLAlchemy, Alembic, Next.js/shadcn) — no foreign code was introduced. The source analysis is at docs/research/atlas-extraction/ and the design specification is at docs/plans/2026-03-31-atlas-adoption-roadmap-design.md.


Design Principle

Adopt the pattern, not the code. Atlas uses LangChain/LangGraph, Flyway, asyncpg, and Blueprint.js. Trust Relay uses PydanticAI, Alembic, SQLAlchemy, and Next.js. Every item is a reimplementation following existing Trust Relay conventions, not a port.


Architecture Overview — 4 Waves

The 8 services are organised into 4 dependency-ordered waves. Later waves depend on earlier ones: the EBA risk matrix (Wave 3) performs country risk lookups against the reference data loaded in Wave 1, and the survivorship resolver (Wave 4) consumes entity matches produced in Wave 3.

| Wave | Goal | Services |
| --- | --- | --- |
| Wave 1 | Structured reference data, quality measurement, cost-optimised models | ReferenceDataService, QualityScorer, ModelTiers |
| Wave 2 | Regulatory citations in prompts, segment-specific severity | Prompt templates (not standalone services — see app/prompts/) |
| Wave 3 | Regulatory-grade risk scoring, auditable event chains, entity deduplication | EBARiskMatrix, DomainEvents, EntityMatcher |
| Wave 4 | Provable data quality, declarative workflow types | SurvivorshipResolver, WorkflowSchema |

Wave 1 Services

ReferenceDataService

backend/app/services/reference_data_service.py

A singleton that loads 12 JSON compliance datasets at startup from backend/config/reference_data/. Every subsequent service that needs a country risk score, PEP tier, or industry classification calls this service rather than embedding hardcoded lists.

12 datasets:

| Dataset | Records | Purpose |
| --- | --- | --- |
| fatf_grey_list | 22 countries | FATF increased monitoring |
| fatf_black_list | 3 countries | FATF high-risk (IR, KP, MM) |
| eu_high_risk_third_countries | 27 countries | EU delegated regulation |
| eu_tax_blacklist | 10 jurisdictions | EU non-cooperative tax jurisdictions |
| secrecy_jurisdictions | 12 jurisdictions | Tax Justice Network FSI |
| cpi_below_40 | 44 countries | Corruption Perception Index < 40 |
| pep_tiers | 8 levels with scores | PEP classification hierarchy |
| industry_risk_classification | 15 sectors with scores | ML/TF industry risk |
| product_risk_taxonomy | 11 categories with scores | Product/service risk scoring |
| sanctions_defaults | 4 lists + match weights | Sanctions configuration |
| ubo_thresholds | Per-jurisdiction % | UBO identification thresholds |
| dataset_types | Schema definitions | JSON Schema per dataset type |

Key methods:

  • get_dataset(name) — returns the full dataset (list or dict)
  • is_in_dataset(dataset_name, value) — membership check for list and dict datasets
  • get_risk_score(dataset_name, key) — returns a numeric risk score, handling three payload shapes: direct numeric, {"score": N}, and {"risk_score": N}
  • get_all_datasets() — sorted list of all loaded dataset names
  • get_reference_data() — module-level factory returning the singleton
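
The three payload shapes handled by get_risk_score can be illustrated with a minimal sketch. This is not the service's actual code: the helper name extract_score, the datasets argument, and the sample tier keys and values are assumptions made for illustration only.

```python
from typing import Any, Optional


def extract_score(payload: Any) -> Optional[float]:
    """Return a numeric score from one dataset entry, or None if none is found."""
    # Shape 1: direct numeric value (bool is excluded because it subclasses int).
    if isinstance(payload, (int, float)) and not isinstance(payload, bool):
        return float(payload)
    # Shapes 2 and 3: {"score": N} or {"risk_score": N}.
    if isinstance(payload, dict):
        for key in ("score", "risk_score"):
            value = payload.get(key)
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return float(value)
    return None


def get_risk_score(datasets: dict, dataset_name: str, key: str) -> Optional[float]:
    """Look up one entry in a dict-shaped dataset and normalise it to a float."""
    dataset = datasets.get(dataset_name, {})
    if not isinstance(dataset, dict):
        return None  # list-shaped datasets carry membership only, not scores
    return extract_score(dataset.get(key))


# Hypothetical dataset contents, for illustration only.
SAMPLE = {"pep_tiers": {"tier_a": 90, "tier_b": {"score": 70}, "tier_c": {"risk_score": 40}}}
```

Normalising at the lookup boundary means callers such as the risk matrix never need to know which shape a given dataset file uses.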

Each JSON file uses a standard envelope:

{
  "list_key": "fatf_grey_list",
  "type_id": "country_risk_list",
  "source": "FATF Public Statement",
  "source_url": "https://www.fatf-gafi.org/en/topics/grey-list.html",
  "source_date": "2026-02-21",
  "data": ["AF", "KH", "KG", ...]
}
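
As a rough illustration of how such an envelope might be parsed and queried, the sketch below validates the six envelope keys and performs a membership check. The names DatasetEnvelope and parse_envelope are hypothetical, and the sample data is the abbreviated three-country excerpt shown above, not the full list.

```python
import json
from dataclasses import dataclass
from typing import Any

REQUIRED_KEYS = ("list_key", "type_id", "source", "source_url", "source_date", "data")


@dataclass(frozen=True)
class DatasetEnvelope:
    list_key: str
    type_id: str
    source: str
    source_url: str
    source_date: str
    data: Any  # a list (membership datasets) or a dict (scored datasets)


def parse_envelope(raw: str) -> DatasetEnvelope:
    """Parse one JSON envelope and fail loudly if a required key is missing."""
    doc = json.loads(raw)
    missing = [key for key in REQUIRED_KEYS if key not in doc]
    if missing:
        raise ValueError(f"envelope missing keys: {missing}")
    return DatasetEnvelope(**{key: doc[key] for key in REQUIRED_KEYS})


def is_in_dataset(envelope: DatasetEnvelope, value: str) -> bool:
    """Membership check that works for both list data and dict keys."""
    return value in envelope.data


RAW = json.dumps({
    "list_key": "fatf_grey_list",
    "type_id": "country_risk_list",
    "source": "FATF Public Statement",
    "source_url": "https://www.fatf-gafi.org/en/topics/grey-list.html",
    "source_date": "2026-02-21",
    "data": ["AF", "KH", "KG"],  # abbreviated excerpt, not the full dataset
})
grey_list = parse_envelope(RAW)
```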

Integration: The EBARiskMatrix calls get_reference_data() to check fatf_grey_list, eu_high_risk_third_countries, and cpi_below_40 during geographic scoring. The ARIA risk matrix service previously hardcoded these lists inline.

API endpoint: GET /api/reference-data/{dataset} — exposes all 12 datasets to the frontend. As of 2026-03-31 the reference data browser is integrated into the Risk Configuration admin page (/admin/risk-configuration, Reference Datasets tab) rather than the standalone /admin/reference-data page, which now redirects. The backing endpoint app/api/reference_data.py has been superseded by app/api/risk_config.py.


QualityScorer

backend/app/services/quality_scorer.py

An LLM-as-judge evaluator that runs after the pre-investigation OSINT pass, scores the investigation on 5 dimensions, and stores the result in additional_data.quality_scores. Scoring is gated by settings.quality_scoring_enabled; a quality_scoring_mock_mode is available for demos.

QualityScore dimensions (each scored 0.0–1.0 by the judge):

| Dimension | Weight | Criterion |
| --- | --- | --- |
| completeness | 0.25 | Are all required verification domains covered with substantive content? No obvious gaps? |
| source_diversity | 0.20 | Findings drawn from a diverse set of independent sources, not a single feed? |
| evidence_quality | 0.25 | Facts attributed to specific data points? No sign of hallucinated or unverified information? |
| risk_justification | 0.20 | Risk assessment logically follows from the evidence? |
| actionability | 0.10 | Recommendations specific and decision-ready for a compliance officer? |

overall is the weighted average of the five dimensions (weights above, summing to 1.0), rounded to 3 decimals via QualityScore.compute_overall().
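
The weighted average can be sketched as a standalone function. The weights are taken from the table above; the function shape and the sample scores are assumptions for illustration, not the actual QualityScore.compute_overall() method.

```python
WEIGHTS = {
    "completeness": 0.25,
    "source_diversity": 0.20,
    "evidence_quality": 0.25,
    "risk_justification": 0.20,
    "actionability": 0.10,
}


def compute_overall(scores: dict) -> float:
    """Weighted average of the five dimension scores, rounded to 3 decimals."""
    total = sum(weight * scores[name] for name, weight in WEIGHTS.items())
    return round(total, 3)


# Hypothetical judge output for one investigation.
example = {
    "completeness": 0.8,
    "source_diversity": 0.6,
    "evidence_quality": 0.9,
    "risk_justification": 0.7,
    "actionability": 0.5,
}
overall = compute_overall(example)  # 0.2 + 0.12 + 0.225 + 0.14 + 0.05 = 0.735
```

Because the weights sum to 1.0, no extra normalisation step is needed and overall stays in the 0.0–1.0 range.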

Key functions:

  • score_investigation(case_id, findings, synthesis, sources, discrepancies, verification_checks) — async entry point; renders the quality_scorer prompt via PromptRegistry, runs a PydanticAI Agent with output_type=QualityScore, and returns a QualityScore (or None on failure / when disabled)
  • QualityScore.compute_overall() — computes the weighted average and sets overall
  • QualityScore.to_dict() — serialises to dict for JSONB storage

Judge model: settings.quality_scoring_model, default openai:gpt-4.1 (overridable by env var). The findings list is truncated to the first 20 entries when building the prompt.

Best-effort, non-blocking: Scoring runs as the Temporal activity score_investigation_quality and is wrapped in a try/except in compliance_case.py — failures are logged and the workflow continues. There is no automatic re-investigation or synthesis-retry threshold; the score is stored and surfaced for human review only.

Integration: Invoked as the score_investigation_quality Temporal activity in the pre-investigation tail of compliance_case.py. Scores appear on the case detail page.

Why it exists: Establishes a quality baseline before prompt changes are made in Wave 2. Measures regression when switching model tiers.


ModelTiers

backend/app/services/model_tiers.py

Maps each agent to a cost tier, and each tier to a specific model. Environment variables override the defaults, enabling deployment-time model swaps without code changes.

4 tiers (defaults in _DEFAULT_TIER_MODELS):

| Tier | Criterion | Default Model |
| --- | --- | --- |
| premium | Accuracy critical (sanctions, synthesis) | openai:gpt-5.2 |
| mid | Judgment-heavy tasks (most agents) | openai:gpt-5.2 |
| value | Reserved for future cost optimisation | openai:gpt-4.1-mini |
| budget | Reserved for future cost optimisation | openai:gpt-4.1-mini |

13 agent-to-tier assignments (AGENT_TIERS). Note: several agents originally placed in value/budget were upgraded back to mid after the budget tier (gpt-4.1-mini) was found to drop documents with OCR noise and to under-synthesise officer-facing output. As a result only sanctions_resolver and synthesis use premium; everything else currently resolves to mid:

| Agent | Tier | Note |
| --- | --- | --- |
| sanctions_resolver | premium | |
| synthesis | premium | |
| registry_investigation | mid | |
| belgian_investigation | mid | |
| person_validation | mid | upgraded from value — identity validation is compliance-critical |
| adverse_media | mid | |
| task_generator | mid | upgraded from budget — synthesises all findings into officer actions |
| document_validator | mid | upgraded from budget — gpt-4.1-mini drops documents with OCR noise |
| mcc_classifier | mid | |
| scan_synthesis | mid | |
| case_intelligence | mid | |
| document_extraction | mid | upgraded from budget — same OCR-noise issue |
| precious_metals_risk | mid | |

Key functions:

  • get_model_for_agent(agent_name, override_tier=None) — resolves agent → tier → model, with fallback to DEFAULT_TIER (mid)
  • get_agent_tier(agent_name) — returns the tier string for a given agent
  • get_tier_config() — returns the current tier-to-model mapping (used by health endpoint)
  • _reload_config() — re-reads environment variables (called by tests to reset state)

Env var overrides: MODEL_TIER_PREMIUM, MODEL_TIER_MID, MODEL_TIER_VALUE, MODEL_TIER_BUDGET
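
The agent → tier → model resolution with env-var overrides might look roughly like the sketch below. It reuses the names given above, but the bodies are assumptions: only a subset of AGENT_TIERS is shown, and the environment is read on every call here, whereas the real module apparently caches the mapping and re-reads it via _reload_config().

```python
import os
from typing import Optional

_DEFAULT_TIER_MODELS = {
    "premium": "openai:gpt-5.2",
    "mid": "openai:gpt-5.2",
    "value": "openai:gpt-4.1-mini",
    "budget": "openai:gpt-4.1-mini",
}
DEFAULT_TIER = "mid"

# Subset of the 13 assignments, for illustration.
AGENT_TIERS = {
    "sanctions_resolver": "premium",
    "synthesis": "premium",
    "adverse_media": "mid",
}


def get_tier_config() -> dict:
    """Tier-to-model mapping, with MODEL_TIER_<TIER> env vars taking precedence."""
    return {
        tier: os.environ.get(f"MODEL_TIER_{tier.upper()}", default)
        for tier, default in _DEFAULT_TIER_MODELS.items()
    }


def get_model_for_agent(agent_name: str, override_tier: Optional[str] = None) -> str:
    """Resolve agent -> tier -> model, falling back to DEFAULT_TIER."""
    config = get_tier_config()
    tier = override_tier or AGENT_TIERS.get(agent_name, DEFAULT_TIER)
    if tier not in config:
        tier = DEFAULT_TIER  # unknown tier names fall back rather than raise
    return config[tier]
```

For example, setting MODEL_TIER_MID in the deployment environment changes the model for every mid-tier agent without touching AGENT_TIERS.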

Cost impact: The tier indirection lets a deployment swap models per tier without code changes. The aggressive value/budget assignments originally targeted a ~60–80% per-investigation cost reduction, but most of them have been rolled back to mid for accuracy reasons. The realised saving today is therefore smaller, and it materialises only when a deployment overrides the value/budget tiers to a cheaper model via env vars.


Wave 3 Services

EBARiskMatrix

backend/app/services/eba_risk_matrix.py

Implements EBA/GL/2021/02 (Guidelines on risk factors) as a scored 5-dimension matrix with SHA-256 determinism proof. It replaced ARIA's 4-dimension weighted-average scoring and is now the only risk scoring engine (see Production status below).

5 dimensions and 14 factors:

| Dimension | Weight | Factors |
| --- | --- | --- |
| Customer | 0.30 | ownership_complexity, pep_exposure, sanctions_exposure, adverse_media, business_profile |
| Geographic | 0.25 | jurisdiction_risk, operational_geography, ubo_geography |
| Product/Service | 0.20 | product_complexity, regulatory_status |
| Delivery Channel | 0.10 | non_face_to_face, digital_presence |
| Transaction | 0.15 | financial_profile, transaction_patterns |

Aggregation — weighted_max: The overall score is a weighted average of dimension scores, with a floor boost applied when any single dimension exceeds 80. This means a very high-risk dimension dominates the result rather than being averaged away — consistent with EBA guidance that a single critical risk factor can elevate the overall assessment.

5 risk levels:

| Level | Score Range |
| --- | --- |
| Critical | 90+ |
| High | 70–89 |
| Medium | 40–69 |
| Low | 20–39 |
| Clear | 0–19 |
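
A minimal sketch of the score-to-level mapping follows. The lower bounds come from the ranges above; the lowercase level strings and the treatment of fractional scores (each range read as a lower-bound threshold) are assumptions, not confirmed behaviour of map_risk_level.

```python
# (lower bound, level), ordered from highest to lowest.
RISK_LEVELS = (
    (90, "critical"),
    (70, "high"),
    (40, "medium"),
    (20, "low"),
    (0, "clear"),
)


def map_risk_level(score: float) -> str:
    """Convert a 0-100 score to a level string using lower-bound thresholds."""
    if not 0 <= score <= 100:
        raise ValueError(f"score out of range: {score}")
    for lower_bound, level in RISK_LEVELS:
        if score >= lower_bound:
            return level
    return "clear"  # unreachable for valid input, kept for completeness
```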

SHA-256 audit trail: EBARiskResult carries two hashes:

  • input_hash — SHA-256 of the canonical (sort_keys=True) JSON representation of EBARiskInput
  • output_hash — SHA-256 of the dimension and factor scores

These hashes allow an auditor to verify that a stored result was produced from a specific input without re-running the scorer. The matrix version (eba_standard_v1) is captured in every result, satisfying immutable audit requirements under AML 5-year retention rules.
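
Canonical hashing of this kind can be sketched with the standard library. The function name hash_canonical, the compact separators, and the sample input fields are assumptions; the source only states that the JSON is serialised with sort_keys=True before hashing.

```python
import hashlib
import json


def hash_canonical(obj: dict) -> str:
    """SHA-256 hex digest of a dict serialised as JSON with sorted keys."""
    # sort_keys=True makes the digest independent of key insertion order.
    # The compact separators are an assumption of this sketch.
    canonical = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Hypothetical input fields, for illustration only.
first = {"jurisdiction": "BE", "pep_exposure": 2, "matrix_version": "eba_standard_v1"}
second = {"matrix_version": "eba_standard_v1", "pep_exposure": 2, "jurisdiction": "BE"}
changed = {**first, "pep_exposure": 3}

same_digest = hash_canonical(first) == hash_canonical(second)      # key order is irrelevant
different_digest = hash_canonical(first) != hash_canonical(changed)  # any value change shows up
```

An auditor holding the original input can recompute the digest and compare it with the stored input_hash.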

Key functions:

  • compute_eba_risk(inp: EBARiskInput, config: dict | None = None) -> EBARiskResult — entry point; runs all 5 dimensions, applies _aggregate_weighted_max, hashes input and output. config allows the versioned risk_configurations row to override weights/thresholds
  • apply_risk_calibration(...) — applies segment-specific calibration to a raw score
  • map_risk_level(score) -> str — converts 0–100 score to level string
  • _hash_canonical(obj) -> str — SHA-256 of JSON-serialised dict with sorted keys (MATRIX_VERSION = "eba_standard_v1")

Reference data integration: Geographic dimension calls get_reference_data() for fatf_black_list, fatf_grey_list, eu_high_risk_third_countries, eu_tax_blacklist, secrecy_jurisdictions, and cpi_below_40. Customer dimension uses pep_tiers and product_risk_taxonomy.

Production status: As of 2026-03-31 EBARiskMatrix is the only active risk scoring engine. The ARIA risk matrix and the EBA_RISK_MATRIX_ENABLED feature flag have been removed. Risk configuration is now managed via the versioned risk_configurations table (see Risk Assessment).


DomainEvents

backend/app/services/domain_events.py

Adds correlation/causation chain semantics to the audit log. Every significant action emits a DomainEvent. Events from the same investigation share a correlation_id; each event records the causation_id of the event that triggered it.

Why this matters: The flat audit_events table records what happened and when. Domain event chains record why — "the sanctions match on officer X caused the escalation which caused the 60-day extension which caused the MLRO notification." This chain reconstruction is required for SAR filing and regulatory examination responses.

Data classes:

DomainEvent
  event_id: UUID (auto)
  event_type: str
  data: dict
  timestamp: datetime (UTC)
  correlation_id: UUID | None — groups all events in one investigation
  causation_id: UUID | None — points to the triggering event
  actor_id: str | None — user or agent ID
  actor_type: str | None — "system" | "user" | "agent"
  source_service: str | None — originating service name

EventChain class:

  • EventChain.start(context) — creates a new chain with a fresh correlation_id
  • chain.emit(event_type, data, caused_by, ...) — emits an event linked to caused_by
  • chain.get_chain() — returns all events sorted by timestamp
  • chain.get_caused_by(event) — returns all events directly caused by a specific event

Factory function: create_investigation_chain(case_id) — creates a chain scoped to one investigation; used by Temporal activities so all activities within the same workflow run share a correlation_id.
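
The correlation/causation semantics can be illustrated with a reduced in-memory sketch. It follows the class and method names above, but the bodies, the omission of the actor and source fields, and the event type strings are assumptions, and nothing is persisted to audit_events here.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
from uuid import UUID, uuid4


@dataclass
class DomainEvent:
    event_type: str
    data: dict
    correlation_id: Optional[UUID] = None
    causation_id: Optional[UUID] = None
    event_id: UUID = field(default_factory=uuid4)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class EventChain:
    def __init__(self, correlation_id: UUID) -> None:
        self.correlation_id = correlation_id
        self._events: List[DomainEvent] = []

    @classmethod
    def start(cls, context: Optional[dict] = None) -> "EventChain":
        """Create a chain with a fresh correlation_id (context is unused in this sketch)."""
        return cls(uuid4())

    def emit(self, event_type: str, data: dict,
             caused_by: Optional[DomainEvent] = None) -> DomainEvent:
        """Record an event that shares the chain's correlation_id."""
        event = DomainEvent(
            event_type=event_type,
            data=data,
            correlation_id=self.correlation_id,
            causation_id=caused_by.event_id if caused_by else None,
        )
        self._events.append(event)
        return event

    def get_chain(self) -> List[DomainEvent]:
        return sorted(self._events, key=lambda e: e.timestamp)

    def get_caused_by(self, event: DomainEvent) -> List[DomainEvent]:
        return [e for e in self._events if e.causation_id == event.event_id]


# Hypothetical event types mirroring the escalation example above.
chain = EventChain.start({"case_id": "case-123"})
match = chain.emit("sanctions_match", {"officer": "X"})
escalation = chain.emit("case_escalated", {}, caused_by=match)
notification = chain.emit("mlro_notified", {}, caused_by=escalation)
```

Walking get_caused_by() from the sanctions match reconstructs the "why" chain that a flat event list cannot express.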

Integration: Temporal activities call create_investigation_chain() at the start of each investigation workflow run. The chain is passed through activity parameters and used to emit events at each stage (documents submitted, OSINT started, finding discovered, decision made). The correlation_id and causation_id columns are added to the audit_events table via an Alembic migration.


EntityMatcher

backend/app/services/entity_matcher.py

Algorithmic entity matching for cross-investigation deduplication. When two investigations reference the same company or person under slightly different names or registration number formats, this service detects and links them.

Matching pipeline:

  1. Blocking key — {jurisdiction}:{first_3_chars_of_stripped_normalised_name}. Entities with different blocking keys are never compared, reducing the comparison space from O(n²) to roughly O(n) as long as blocks stay small.
  2. Registration number comparison — exact match after stripping country and court prefixes (CHE, BE, DE, FR9201., etc.). Match → 0.99 confidence.
  3. Legal suffix removal — 25 patterns: Ltd, GmbH, BV, NV, SA, SAS, SARL, SRL, LLC, PLC, Inc, Corp, AG, SE, AB, AS, OY, eG, KG, OHG, GbR, and others.
  4. NFKD normalisation — strips diacritics (Bolloré → Bollore, Société → Societe).
  5. Name similarity — SequenceMatcher on normalised strings after suffix removal.
  6. Person matching — parts sorted alphabetically before comparison, so "Jean-Pierre Dupont" matches "Dupont Jean-Pierre".

Match thresholds:

| Confidence | Decision |
| --- | --- |
| > 0.85 | Auto-match |
| 0.70 – 0.85 | Review queue |
| < 0.70 | Distinct entities |

Key functions:

  • match_entities(entity_a, entity_b) -> MatchResult — primary entry point; accepts dicts with type, name, registration_number, jurisdiction
  • compute_blocking_key(jurisdiction, name) -> str — generates the blocking key
  • normalize_company_name(name), normalize_person_name(name) — normalisation utilities
  • strip_legal_suffix(name), strip_country_prefix(reg_number) — cleaning utilities
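
The name-based part of the pipeline can be sketched with the standard library alone. This is an illustration under assumptions, not the service's code: the suffix list is a small subset of the 25 patterns, the lowercasing and punctuation handling are guesses, and classify and name_similarity are hypothetical helper names.

```python
import re
import unicodedata
from difflib import SequenceMatcher

# Illustrative subset of the legal suffix patterns.
LEGAL_SUFFIXES = {"ltd", "gmbh", "bv", "nv", "sa", "sas", "sarl", "srl", "llc", "plc", "inc", "corp", "ag"}


def strip_diacritics(text: str) -> str:
    """NFKD-decompose, then drop combining marks (Bolloré -> Bollore)."""
    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))


def normalize_company_name(name: str) -> str:
    text = re.sub(r"[^\w\s]", " ", strip_diacritics(name).lower())
    tokens = text.split()
    while tokens and tokens[-1] in LEGAL_SUFFIXES:
        tokens.pop()  # remove trailing legal suffixes
    return " ".join(tokens)


def normalize_person_name(name: str) -> str:
    text = re.sub(r"[^\w\s-]", " ", strip_diacritics(name).lower())
    return " ".join(sorted(text.split()))  # order-insensitive comparison


def compute_blocking_key(jurisdiction: str, name: str) -> str:
    return f"{jurisdiction}:{normalize_company_name(name)[:3]}"


def name_similarity(a: str, b: str) -> float:
    return SequenceMatcher(None, normalize_company_name(a), normalize_company_name(b)).ratio()


def classify(confidence: float) -> str:
    if confidence > 0.85:
        return "auto_match"
    if confidence >= 0.70:
        return "review"
    return "distinct"
```

In a full matcher, only entities sharing a blocking key would be passed to name_similarity, and a registration number match would short-circuit to 0.99 before any name comparison.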

Integration: Runs after graph ETL on newly created entity nodes. Detected potential matches are stored in a review queue. Confirmed matches feed into the SurvivorshipResolver (Wave 4) for field-level conflict resolution.


Wave 4 Services

SurvivorshipResolver

backend/app/services/survivorship.py

When the EntityMatcher links two records as the same entity, their field values may conflict. The SurvivorshipResolver selects the winning value using per-source trust scores, logs all conflicts, and protects sensitive fields from lower-trust overrides.

Trust hierarchy (representative scores):

| Source | Trust Score | Category |
| --- | --- | --- |
| kbo | 0.98 | Government registry |
| gleif | 0.97 | Government registry |
| nbb | 0.95 | Government registry |
| vies | 0.93 | EU VAT validation |
| eori | 0.93 | EU customs validation |
| peppol | 0.90 | Official e-invoicing |
| sanctions_resolver | 0.96 | Authoritative sanctions source |
| northdata | 0.85 | Structured third-party API |
| opencorporates | 0.84 | Structured third-party API |
| brightdata | 0.72 | Web scraping |
| crawl4ai | 0.70 | Web scraping |
| llm_extraction | 0.75 | AI-extracted data |

Protected fields: Certain fields can only be set by authorised sources regardless of trust score.

| Field | Authorised Sources |
| --- | --- |
| is_sanctioned | sanctions_resolver, kbo, gleif |
| is_pep | pep_resolver, kbo |
| sanctions_details | sanctions_resolver |
| pep_details | pep_resolver |

Conflict detection: When two sources provide different values for the same field and their trust scores differ by less than TRUST_CONFLICT_DELTA (0.02), both values are preserved as a ConflictRecord for human review rather than silently picking a winner.
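
The interplay of trust scores, protected fields, and the conflict delta can be sketched as follows. The trust scores and authorised sources are taken from the tables above (a subset); Candidate, Resolution, and the function body are simplified assumptions and do not reproduce the real FieldProvenance or ConflictRecord types.

```python
from dataclasses import dataclass
from typing import Any, List, Optional

TRUST_SCORES = {
    "kbo": 0.98, "gleif": 0.97, "sanctions_resolver": 0.96,
    "northdata": 0.85, "opencorporates": 0.84, "crawl4ai": 0.70,
}
PROTECTED_FIELDS = {
    "is_sanctioned": {"sanctions_resolver", "kbo", "gleif"},
    "is_pep": {"pep_resolver", "kbo"},
}
TRUST_CONFLICT_DELTA = 0.02


@dataclass
class Candidate:
    source: str
    value: Any


@dataclass
class Resolution:
    field_name: str
    value: Any
    source: str
    trust: float
    conflict: bool  # True when a near-equal source disagrees


def trust_of(source: str) -> float:
    return TRUST_SCORES.get(source, 0.0)  # unknown sources get no trust in this sketch


def resolve_field(field_name: str, candidates: List[Candidate]) -> Optional[Resolution]:
    allowed = PROTECTED_FIELDS.get(field_name)
    # Protected fields ignore candidates from unauthorised sources entirely.
    eligible = [c for c in candidates if allowed is None or c.source in allowed]
    if not eligible:
        return None
    ranked = sorted(eligible, key=lambda c: trust_of(c.source), reverse=True)
    winner = ranked[0]
    conflict = any(
        c.value != winner.value
        and trust_of(winner.source) - trust_of(c.source) < TRUST_CONFLICT_DELTA
        for c in ranked[1:]
    )
    return Resolution(field_name, winner.value, winner.source, trust_of(winner.source), conflict)
```

With these numbers, kbo versus crawl4ai resolves cleanly in favour of kbo, while northdata versus opencorporates (0.01 apart) is flagged for review.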

Key methods:

  • SurvivorshipResolver.resolve_field(field_name, candidates) -> FieldProvenance — selects the winning value
  • resolver.conflicts — list of ConflictRecord instances accumulated during a resolution session

Audit compliance: Every resolution decision is logged with the winning value, source, trust score, timestamp, and the reason for the decision. This satisfies EU AI Act Art. 12 (automatic logging) and AML 5-year retention requirements for data provenance.

Integration: Called from graph ETL during entity upsert when the EntityMatcher has flagged two records as the same entity. The ConflictRecord list is written to additional_data.survivorship_conflicts on the case for display in the Conflicts Panel (Wave 5 UI).


WorkflowSchema

backend/app/services/workflow_schema.py

A declarative YAML schema and compiler for compliance workflows. Defines the 5 phase types, validates schemas via Pydantic, and compiles them to dependency-ordered ExecutionPlan instances via Kahn's topological sort algorithm.

5 phase types:

| Type | Purpose |
| --- | --- |
| portal | Customer-facing data collection (forms + documents) |
| investigation | Automated due diligence modules (OSINT agents) |
| rule_evaluation | EBA risk matrix scoring |
| review | Human or auto decision with conditional routing |
| action | Post-decision steps (activation, notification, scheduling) |

Schema structure:

WorkflowSchema
  schema_id: str
  name: str
  version: int
  phases: list[WorkflowPhase]
  description: str | None
  category: str | None — kyb, periodic_review, vendor_dd, etc.
  inputs: list[dict] | None
  subject_entity_types: list[str] | None
  audit: dict | None — retention, hashing config

WorkflowPhase
  id: str — unique within the workflow
  type: PhaseType
  config: dict — phase-specific configuration
  depends_on: list[str] | None — phases that must complete first
  name: str | None
  condition: dict | None — conditional execution for action phases
  escalation: dict | None

Compilation: compile_workflow(schema) -> ExecutionPlan runs Kahn's algorithm to produce a topologically sorted list of ExecutionStep instances. Phases that have no unsatisfied dependencies at the same point in the sort are grouped into parallel_groups — these can execute as concurrent Temporal child workflows.
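
Kahn's algorithm with level grouping can be sketched as below. This is a generic illustration rather than the compile_workflow implementation: the function name plan_parallel_groups, the plain dict input, and the phase IDs in the example are assumptions.

```python
from collections import defaultdict
from typing import Dict, List


def plan_parallel_groups(depends_on: Dict[str, List[str]]) -> List[List[str]]:
    """Topologically sort phases, grouping those that become ready together."""
    indegree = {phase: len(deps) for phase, deps in depends_on.items()}
    dependents = defaultdict(list)
    for phase, deps in depends_on.items():
        for dep in deps:
            if dep not in depends_on:
                raise ValueError(f"unknown dependency {dep!r} for phase {phase!r}")
            dependents[dep].append(phase)

    ready = sorted(p for p, degree in indegree.items() if degree == 0)
    groups: List[List[str]] = []
    while ready:
        groups.append(ready)  # everything in `ready` can run concurrently
        next_ready = []
        for phase in ready:
            for child in dependents[phase]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Any phase never scheduled is part of a dependency cycle.
    if sum(len(group) for group in groups) != len(depends_on):
        raise ValueError("cyclic dependency detected")
    return groups


# Hypothetical phase IDs for a small onboarding workflow.
phases = {
    "portal": [],
    "registry": ["portal"],
    "adverse_media": ["portal"],
    "risk": ["registry", "adverse_media"],
    "review": ["risk"],
}
groups = plan_parallel_groups(phases)
```

Here registry and adverse_media land in the same group because both depend only on portal, which is the property that lets them run concurrently.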

Key functions:

  • validate_workflow(schema) -> list[str] — structural validation; returns error messages (empty = valid). Checks for empty schema_id, duplicate phase IDs, unknown depends_on references, and cyclic dependencies.
  • compile_workflow(schema) -> ExecutionPlan — runs validation then topological sort; returns ExecutionPlan with has_errors=True and populated errors if validation fails.
  • load_workflow_yaml(yaml_text) -> WorkflowSchema — parses YAML, constructs Pydantic model.

Migration path: The hardcoded compliance_case.py Temporal workflow is mirrored by the first YAML template, backend/config/workflow_schemas/kyb_onboarding_v1.yaml. Two further workflow types, periodic_review_v1.yaml and vendor_due_diligence_v1.yaml, already ship as YAML templates without code changes and are listed/inspectable through the workflow-schemas API and admin page. As described under Integration below, they are not yet registered as runnable Temporal workflows.

Integration: The compiled ExecutionPlan is consumed by DynamicWorkflowEngine in app/workflows/dynamic_workflow.py, which walks the topologically sorted steps via a DynamicWorkflowState machine (tested in tests/test_dynamic_workflow.py). The compiler and engine are exposed through the app/api/workflow_schemas.py router and the /admin/workflow-schemas frontend page (lists schemas, shows the compiled execution plan). Not yet fully wired to live Temporal execution: DynamicWorkflowEngine is not registered in app/worker.py (the worker registers ComplianceCaseWorkflow, ShipmentComplianceWorkflow, ContinuousMonitoringWorkflow, BatchRiskReEvaluationWorkflow, and SanctionsSuppressionRefreshWorkflow), so YAML-defined workflows are compiled and inspected but the hardcoded compliance_case.py workflow still runs production cases.


Integration Map

Where each service connects to the existing Trust Relay pipeline:

| Service | Called From | Returns |
| --- | --- | --- |
| ReferenceDataService | EBARiskMatrix, risk engine | Risk scores, membership checks |
| QualityScorer | Temporal score_investigation_quality activity | QualityScore dict → additional_data.quality_scores |
| ModelTiers | All agent invocations via get_model_for_agent() | Model name string for PydanticAI |
| EBARiskMatrix | Temporal compute_eba_risk activity | EBARiskResult with SHA-256 hashes |
| DomainEvents | All Temporal activities | DomainEvent → audit_events table |
| EntityMatcher | Post-ETL deduplication pass | MatchResult with confidence score |
| SurvivorshipResolver | Graph ETL entity upsert | Resolved field values + ConflictRecord list |
| WorkflowSchema | Workflow template compiler | ExecutionPlan → Temporal dynamic workflow |

Competitive Context

These services address areas where the Atlas co-founder's codebase had architectural depth that Trust Relay's first implementation lacked.

Before adoption: Risk scoring used a custom 4-dimension weighted-average (ARIA). Entity conflicts were silently resolved with no provenance. Investigation quality was not measured. All agents used the same model. Event chains were flat and uncorrelated.

After adoption:

  • Risk scoring follows EBA/GL/2021/02 with a published SHA-256 determinism proof — auditors can verify any stored result
  • Entity conflicts are resolved transparently by a trust hierarchy, with protected fields that cannot be overwritten by lower-trust sources
  • Every investigation receives a quality score on 5 weighted dimensions (best-effort, surfaced for human review)
  • LLM model selection is centralised per-agent through tiered assignments, enabling deployment-time cost/accuracy trade-offs via env-var tier overrides
  • Every audit event carries a correlation_id and causation_id, enabling full chain reconstruction ("everything that happened because of this sanctions hit")
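  • New compliance workflow types can be defined, validated, and compiled from YAML without code changes (live Temporal execution of YAML-defined workflows is not yet wired up)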

None of these capabilities require changes to the core investigation loop, the Temporal workflow state machine, or the customer portal. They integrate as services that the existing pipeline calls.

Trust Relay's architectural strengths that Atlas does not have — multi-tenancy, customer portal, Belgian regulatory depth (5 sources), EVOI decision theory, GovernanceEngine, supervised autonomy, compliance memory (Letta), and the Network Intelligence Hub — are preserved and unaffected by the adoption.