Atlas Adoption
This page documents the 8 services adopted from a co-founder's parallel Atlas codebase into Trust Relay. Each service was analysed, extracted, and reimplemented in Trust Relay's tech stack (PydanticAI, SQLAlchemy, Alembic, Next.js/shadcn) — no foreign code was introduced. The source analysis is at docs/research/atlas-extraction/ and the design specification is at docs/plans/2026-03-31-atlas-adoption-roadmap-design.md.
Design Principle
Adopt the pattern, not the code. Atlas uses LangChain/LangGraph, Flyway, asyncpg, and Blueprint.js. Trust Relay uses PydanticAI, Alembic, SQLAlchemy, and Next.js. Every item is a reimplementation following existing Trust Relay conventions, not a port.
Architecture Overview — 4 Waves
The 8 services are organised into 4 dependency-ordered waves. Later waves depend on earlier ones: the EBA risk matrix (Wave 3) performs country risk lookups against the reference data loaded in Wave 1, and the survivorship resolver (Wave 4) consumes entity matches produced in Wave 3.
| Wave | Goal | Services |
|---|---|---|
| Wave 1 | Structured reference data, quality measurement, cost-optimised models | ReferenceDataService, QualityScorer, ModelTiers |
| Wave 2 | Regulatory citations in prompts, segment-specific severity | Prompt templates (not standalone services — see app/prompts/) |
| Wave 3 | Regulatory-grade risk scoring, auditable event chains, entity deduplication | EBARiskMatrix, DomainEvents, EntityMatcher |
| Wave 4 | Provable data quality, declarative workflow types | SurvivorshipResolver, WorkflowSchema |
Wave 1 Services
ReferenceDataService
backend/app/services/reference_data_service.py
A singleton that loads 12 JSON compliance datasets at startup from backend/config/reference_data/. Every subsequent service that needs a country risk score, PEP tier, or industry classification calls this service rather than embedding hardcoded lists.
12 datasets:
| Dataset | Records | Purpose |
|---|---|---|
| fatf_grey_list | 22 countries | FATF increased monitoring |
| fatf_black_list | 3 countries | FATF high-risk (IR, KP, MM) |
| eu_high_risk_third_countries | 27 countries | EU delegated regulation |
| eu_tax_blacklist | 10 jurisdictions | EU non-cooperative tax jurisdictions |
| secrecy_jurisdictions | 12 jurisdictions | Tax Justice Network FSI |
| cpi_below_40 | 44 countries | Corruption Perception Index < 40 |
| pep_tiers | 8 levels with scores | PEP classification hierarchy |
| industry_risk_classification | 15 sectors with scores | ML/TF industry risk |
| product_risk_taxonomy | 11 categories with scores | Product/service risk scoring |
| sanctions_defaults | 4 lists + match weights | Sanctions configuration |
| ubo_thresholds | Per-jurisdiction % | UBO identification thresholds |
| dataset_types | Schema definitions | JSON Schema per dataset type |
Key methods:
- get_dataset(name): returns the full dataset (list or dict)
- is_in_dataset(dataset_name, value): membership check for list and dict datasets
- get_risk_score(dataset_name, key): returns a numeric risk score and handles three payload shapes (direct numeric, {"score": N}, and {"risk_score": N})
- get_all_datasets(): sorted list of all loaded dataset names
- get_reference_data(): module-level factory returning the singleton
Each JSON file uses a standard envelope:
{
  "list_key": "fatf_grey_list",
  "type_id": "country_risk_list",
  "source": "FATF Public Statement",
  "source_url": "https://www.fatf-gafi.org/en/topics/grey-list.html",
  "source_date": "2026-02-21",
  "data": ["AF", "KH", "KG", ...]
}
Integration: The EBARiskMatrix calls get_reference_data() to check fatf_grey_list, eu_high_risk_third_countries, and cpi_below_40 during geographic scoring. The ARIA risk matrix service was previously hardcoding these lists inline.
API endpoint: GET /api/reference-data/{dataset} — exposes all 12 datasets to the frontend. As of 2026-03-31 the reference data browser is integrated into the Risk Configuration admin page (/admin/risk-configuration, Reference Datasets tab) rather than the standalone /admin/reference-data page, which now redirects. The backing endpoint app/api/reference_data.py has been superseded by app/api/risk_config.py.
QualityScorer
backend/app/services/quality_scorer.py
An LLM-as-judge evaluator that runs after the pre-investigation OSINT pass and stores the result in additional_data.quality_scores. It scores the investigation on 5 dimensions, gated by settings.quality_scoring_enabled (with a quality_scoring_mock_mode for demos).
QualityScore dimensions (each scored 0.0–1.0 by the judge):
| Dimension | Weight | Criterion |
|---|---|---|
| completeness | 0.25 | Are all required verification domains covered with substantive content? No obvious gaps? |
| source_diversity | 0.20 | Findings drawn from a diverse set of independent sources, not a single feed? |
| evidence_quality | 0.25 | Facts attributed to specific data points? No sign of hallucinated or unverified information? |
| risk_justification | 0.20 | Risk assessment logically follows from the evidence? |
| actionability | 0.10 | Recommendations specific and decision-ready for a compliance officer? |
overall is the weighted average of the five dimensions (weights above, summing to 1.0), rounded to 3 decimals via QualityScore.compute_overall().
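As a worked example of the weighting, the sketch below recomputes overall from five dimension scores. The weights and the 3-decimal rounding come from the table and sentence above; the dataclass name QualityScoreSketch and the sample scores are assumptions, and the real QualityScore is a PydanticAI output model rather than a dataclass.

```python
from dataclasses import dataclass

# Weights taken from the dimension table; they sum to 1.0.
WEIGHTS = {
    "completeness": 0.25,
    "source_diversity": 0.20,
    "evidence_quality": 0.25,
    "risk_justification": 0.20,
    "actionability": 0.10,
}


@dataclass
class QualityScoreSketch:
    completeness: float
    source_diversity: float
    evidence_quality: float
    risk_justification: float
    actionability: float
    overall: float = 0.0

    def compute_overall(self) -> float:
        """Set and return the weighted average, rounded to 3 decimals."""
        total = sum(getattr(self, name) * weight for name, weight in WEIGHTS.items())
        self.overall = round(total, 3)
        return self.overall


# Sample scores are illustrative only.
sample = QualityScoreSketch(
    completeness=0.9,
    source_diversity=0.6,
    evidence_quality=0.8,
    risk_justification=0.7,
    actionability=0.5,
)
sample.compute_overall()
# 0.9*0.25 + 0.6*0.20 + 0.8*0.25 + 0.7*0.20 + 0.5*0.10 = 0.735
```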
Key functions:
- score_investigation(case_id, findings, synthesis, sources, discrepancies, verification_checks): async entry point; renders the quality_scorer prompt via PromptRegistry, runs a PydanticAI Agent with output_type=QualityScore, and returns a QualityScore (or None on failure or when disabled)
- QualityScore.compute_overall(): computes the weighted average and sets overall
- QualityScore.to_dict(): serialises to dict for JSONB storage
Judge model: settings.quality_scoring_model, default openai:gpt-4.1 (overridable by env var). The findings list is truncated to the first 20 entries when building the prompt.
Best-effort, non-blocking: Scoring runs as the Temporal activity score_investigation_quality and is wrapped in a try/except in compliance_case.py — failures are logged and the workflow continues. There is no automatic re-investigation or synthesis-retry threshold; the score is stored and surfaced for human review only.
Integration: Invoked as the score_investigation_quality Temporal activity in the pre-investigation tail of compliance_case.py. Scores appear on the case detail page.
Why it exists: Establishes a quality baseline before prompt changes are made in Wave 2. Measures regression when switching model tiers.
ModelTiers
backend/app/services/model_tiers.py
Maps each agent to a cost tier, and each tier to a specific model. Environment variables override the defaults, enabling deployment-time model swaps without code changes.
4 tiers (defaults in _DEFAULT_TIER_MODELS):
| Tier | Criterion | Default Model |
|---|---|---|
| premium | Accuracy critical (sanctions, synthesis) | openai:gpt-5.2 |
| mid | Judgment-heavy tasks (most agents) | openai:gpt-5.2 |
| value | Reserved for future cost optimisation | openai:gpt-4.1-mini |
| budget | Reserved for future cost optimisation | openai:gpt-4.1-mini |
13 agent-to-tier assignments (AGENT_TIERS). Note: several agents originally placed in value/budget were upgraded back to mid after the budget tier (gpt-4.1-mini) was found to drop documents with OCR noise and to under-synthesise officer-facing output. As a result only sanctions_resolver and synthesis use premium; everything else currently resolves to mid:
| Agent | Tier | Note |
|---|---|---|
| sanctions_resolver | premium | |
| synthesis | premium | |
| registry_investigation | mid | |
| belgian_investigation | mid | |
| person_validation | mid | upgraded from value: identity validation is compliance-critical |
| adverse_media | mid | |
| task_generator | mid | upgraded from budget: synthesises all findings into officer actions |
| document_validator | mid | upgraded from budget: gpt-4.1-mini drops documents with OCR noise |
| mcc_classifier | mid | |
| scan_synthesis | mid | |
| case_intelligence | mid | |
| document_extraction | mid | upgraded from budget: same OCR-noise issue |
| precious_metals_risk | mid | |
Key functions:
- get_model_for_agent(agent_name, override_tier=None): resolves agent → tier → model, with fallback to DEFAULT_TIER (mid)
- get_agent_tier(agent_name): returns the tier string for a given agent
- get_tier_config(): returns the current tier-to-model mapping (used by health endpoint)
- _reload_config(): re-reads environment variables (called by tests to reset state)
Env var overrides: MODEL_TIER_PREMIUM, MODEL_TIER_MID, MODEL_TIER_VALUE, MODEL_TIER_BUDGET
Cost impact: The tier indirection lets a deployment swap models per tier without code changes. The aggressive value/budget assignments that originally targeted a ~60–80% per-investigation cost reduction have largely been rolled back to mid for accuracy reasons, so the realised cost saving today is smaller; the saving is realised only when a deployment overrides the value/budget tiers to a cheaper model via env vars.
Wave 3 Services
EBARiskMatrix
backend/app/services/eba_risk_matrix.py
Implements EBA/GL/2021/02 (Guidelines on risk factors) as a scored 5-dimension matrix with SHA-256 determinism proof. It has replaced ARIA's 4-dimension weighted-average scoring as the risk scoring engine.
5 dimensions and 14 factors:
| Dimension | Weight | Factors |
|---|---|---|
| Customer | 0.30 | ownership_complexity, pep_exposure, sanctions_exposure, adverse_media, business_profile |
| Geographic | 0.25 | jurisdiction_risk, operational_geography, ubo_geography |
| Product/Service | 0.20 | product_complexity, regulatory_status |
| Delivery Channel | 0.10 | non_face_to_face, digital_presence |
| Transaction | 0.15 | financial_profile, transaction_patterns |
Aggregation — weighted_max: The overall score is a weighted average of dimension scores, with a floor boost applied when any single dimension exceeds 80. This means a very high-risk dimension dominates the result rather than being averaged away — consistent with EBA guidance that a single critical risk factor can elevate the overall assessment.
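The sketch below illustrates the shape of this aggregation. The dimension weights and the 80-point trigger come from this page, but the size of the boost is not stated in the source: ASSUMED_FLOOR = 70.0 is a placeholder chosen for illustration, and the function name and dimension keys are likewise assumptions rather than the signature of _aggregate_weighted_max.

```python
# Weights from the dimension table above (they sum to 1.0).
DIMENSION_WEIGHTS = {
    "customer": 0.30,
    "geographic": 0.25,
    "product_service": 0.20,
    "delivery_channel": 0.10,
    "transaction": 0.15,
}
DOMINANCE_THRESHOLD = 80.0  # from the text: boost applies when a dimension exceeds 80
ASSUMED_FLOOR = 70.0        # ASSUMPTION: the real floor value is not given in the source


def aggregate_weighted_max(dimension_scores: dict[str, float]) -> float:
    """Weighted average of 0-100 dimension scores, raised to a floor when one dimension dominates."""
    weighted = sum(
        DIMENSION_WEIGHTS[name] * dimension_scores[name] for name in DIMENSION_WEIGHTS
    )
    if max(dimension_scores.values()) > DOMINANCE_THRESHOLD:
        weighted = max(weighted, ASSUMED_FLOOR)
    return round(weighted, 2)


# A single very high geographic score: the plain weighted average is 40.75,
# which would read as borderline Medium without the floor.
dominated = aggregate_weighted_max({
    "customer": 30.0,
    "geographic": 95.0,
    "product_service": 20.0,
    "delivery_channel": 10.0,
    "transaction": 20.0,
})
```

Under the placeholder floor the dominated case is lifted from 40.75 to 70.0, which shows why the boost exists: a plain average would let four quiet dimensions dilute one critical one.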
5 risk levels:
| Level | Score Range |
|---|---|
| Critical | 90+ |
| High | 70–89 |
| Medium | 40–69 |
| Low | 20–39 |
| Clear | 0–19 |
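The band table maps directly to a threshold ladder. This sketch uses the boundaries above; returning lowercase level names is an assumption, since the source only says that map_risk_level returns a level string.

```python
def map_risk_level(score: float) -> str:
    """Map a 0-100 score to its risk band (lowercase names are an assumption)."""
    if score >= 90:
        return "critical"
    if score >= 70:
        return "high"
    if score >= 40:
        return "medium"
    if score >= 20:
        return "low"
    return "clear"
```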
SHA-256 audit trail: EBARiskResult carries two hashes:
- input_hash: SHA-256 of the canonical (sort_keys=True) JSON representation of EBARiskInput
- output_hash: SHA-256 of the dimension and factor scores
These hashes allow an auditor to verify that a stored result was produced from a specific input without re-running the scorer. The matrix version (eba_standard_v1) is captured in every result, satisfying immutable audit requirements under AML 5-year retention rules.
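A minimal sketch of canonical hashing follows. Sorting keys before serialisation is stated in the source; the compact separators, the UTF-8 encoding, and the sample input fields are assumptions of this example.

```python
import hashlib
import json

MATRIX_VERSION = "eba_standard_v1"


def hash_canonical(obj: dict) -> str:
    """SHA-256 hex digest of a dict serialised with sorted keys."""
    # Compact separators are an assumption; they remove whitespace differences.
    payload = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Hypothetical input fields, used only to show order independence.
first = {"country": "BE", "pep_exposure": False, "matrix_version": MATRIX_VERSION}
second = {"matrix_version": MATRIX_VERSION, "pep_exposure": False, "country": "BE"}
same_digest = hash_canonical(first) == hash_canonical(second)
```

Because key order no longer affects the serialised bytes, an auditor who recomputes the digest from an archived input can compare it with the stored input_hash.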
Key functions:
- compute_eba_risk(inp: EBARiskInput, config: dict | None = None) -> EBARiskResult: entry point; runs all 5 dimensions, applies _aggregate_weighted_max, and hashes input and output. config allows the versioned risk_configurations row to override weights/thresholds
- apply_risk_calibration(...): applies segment-specific calibration to a raw score
- map_risk_level(score) -> str: converts a 0–100 score to a level string
- _hash_canonical(obj) -> str: SHA-256 of the JSON-serialised dict with sorted keys (MATRIX_VERSION = "eba_standard_v1")
Reference data integration: Geographic dimension calls get_reference_data() for fatf_black_list, fatf_grey_list, eu_high_risk_third_countries, eu_tax_blacklist, secrecy_jurisdictions, and cpi_below_40. Customer dimension uses pep_tiers and product_risk_taxonomy.
Production status: As of 2026-03-31 EBARiskMatrix is the only active risk scoring engine. The ARIA risk matrix and the EBA_RISK_MATRIX_ENABLED feature flag have been removed. Risk configuration is now managed via the versioned risk_configurations table (see Risk Assessment).
DomainEvents
backend/app/services/domain_events.py
Adds correlation/causation chain semantics to the audit log. Every significant action emits a DomainEvent. Events from the same investigation share a correlation_id; each event records the causation_id of the event that triggered it.
Why this matters: The flat audit_events table records what happened and when. Domain event chains record why — "the sanctions match on officer X caused the escalation which caused the 60-day extension which caused the MLRO notification." This chain reconstruction is required for SAR filing and regulatory examination responses.
Data classes:
DomainEvent
event_id: UUID (auto)
event_type: str
data: dict
timestamp: datetime (UTC)
correlation_id: UUID | None — groups all events in one investigation
causation_id: UUID | None — points to the triggering event
actor_id: str | None — user or agent ID
actor_type: str | None — "system" | "user" | "agent"
source_service: str | None — originating service name
EventChain class:
- EventChain.start(context): creates a new chain with a fresh correlation_id
- chain.emit(event_type, data, caused_by, ...): emits an event linked to caused_by
- chain.get_chain(): returns all events sorted by timestamp
- chain.get_caused_by(event): returns all events directly caused by a specific event
Factory function: create_investigation_chain(case_id) — creates a chain scoped to one investigation; used by Temporal activities so all activities within the same workflow run share a correlation_id.
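The correlation and causation semantics can be sketched with plain dataclasses. The field and method names follow the listings above; the in-memory event list, the omission of the actor and source fields, and the sample event types are simplifications assumed for this example, which does not persist anything to audit_events.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import UUID, uuid4


@dataclass
class DomainEvent:
    event_type: str
    data: dict
    correlation_id: UUID | None = None  # shared by every event in one investigation
    causation_id: UUID | None = None    # event_id of the triggering event
    event_id: UUID = field(default_factory=uuid4)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class EventChain:
    def __init__(self, correlation_id: UUID, context: dict | None = None) -> None:
        self.correlation_id = correlation_id
        self.context = context or {}
        self._events: list[DomainEvent] = []

    @classmethod
    def start(cls, context: dict | None = None) -> EventChain:
        return cls(uuid4(), context)

    def emit(self, event_type: str, data: dict, caused_by: DomainEvent | None = None) -> DomainEvent:
        event = DomainEvent(
            event_type=event_type,
            data=data,
            correlation_id=self.correlation_id,
            causation_id=caused_by.event_id if caused_by else None,
        )
        self._events.append(event)
        return event

    def get_chain(self) -> list[DomainEvent]:
        # sorted() is stable, so events with equal timestamps keep emission order.
        return sorted(self._events, key=lambda e: e.timestamp)

    def get_caused_by(self, event: DomainEvent) -> list[DomainEvent]:
        return [e for e in self._events if e.causation_id == event.event_id]


def create_investigation_chain(case_id: str) -> EventChain:
    return EventChain.start({"case_id": case_id})


chain = create_investigation_chain("case-123")  # hypothetical case ID
hit = chain.emit("sanctions_match", {"officer": "X"})
escalation = chain.emit("escalation", {"level": "mlro"}, caused_by=hit)
```

Walking get_caused_by from the sanctions hit outwards reconstructs the "why" chain described above, one causal hop at a time.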
Integration: Temporal activities call create_investigation_chain() at the start of each investigation workflow run. The chain is passed through activity parameters and used to emit events at each stage (documents submitted, OSINT started, finding discovered, decision made). The correlation_id and causation_id columns are added to the audit_events table via an Alembic migration.
EntityMatcher
backend/app/services/entity_matcher.py
Algorithmic entity matching for cross-investigation deduplication. When two investigations reference the same company or person under slightly different names or registration number formats, this service detects and links them.
Matching pipeline:
- Blocking key: {jurisdiction}:{first_3_chars_of_stripped_normalised_name}. Entities with different blocking keys are never compared, which cuts the comparison space from O(n²) pairs to roughly O(n) as long as blocks stay small.
- Registration number comparison: exact match after stripping country and court prefixes (CHE, BE, DE, FR9201., etc.). A match yields 0.99 confidence.
- Legal suffix removal: 25 patterns, including Ltd, GmbH, BV, NV, SA, SAS, SARL, SRL, LLC, PLC, Inc, Corp, AG, SE, AB, AS, OY, eG, KG, OHG, and GbR.
- NFKD normalisation: strips diacritics (Bolloré → Bollore, Société → Societe).
- Name similarity: SequenceMatcher on normalised strings after suffix removal.
- Person matching: name parts are sorted alphabetically before comparison, so "Jean-Pierre Dupont" matches "Dupont Jean-Pierre".
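The normalisation steps above can be sketched with the standard library. Function names follow the Key functions list where one exists; the abbreviated suffix tuple, the punctuation handling, the upper-cased jurisdiction in the blocking key, and the name_similarity helper are assumptions of this sketch, not the contents of entity_matcher.py.

```python
import re
import unicodedata
from difflib import SequenceMatcher

# Abbreviated: the real service is described as having 25 patterns.
LEGAL_SUFFIXES = ("ltd", "gmbh", "bv", "nv", "sa", "sas", "sarl", "srl", "llc", "plc", "inc", "corp", "ag", "se")
_SUFFIX_RE = re.compile(
    # Longest alternatives first so "sarl" is tried before "sa".
    r"\b(?:" + "|".join(sorted(LEGAL_SUFFIXES, key=len, reverse=True)) + r")\b\.?\s*$",
    re.IGNORECASE,
)


def strip_diacritics(text: str) -> str:
    """NFKD-decompose, then drop combining marks (Bolloré -> Bollore)."""
    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))


def strip_legal_suffix(name: str) -> str:
    return _SUFFIX_RE.sub("", name.strip()).strip(" ,.")


def normalize_company_name(name: str) -> str:
    text = strip_legal_suffix(strip_diacritics(name).lower())
    text = re.sub(r"[^a-z0-9 ]+", " ", text)  # punctuation to spaces (assumption)
    return " ".join(text.split())


def normalize_person_name(name: str) -> str:
    parts = strip_diacritics(name).lower().split()
    return " ".join(sorted(parts))  # order-insensitive comparison


def compute_blocking_key(jurisdiction: str, name: str) -> str:
    return f"{jurisdiction.upper()}:{normalize_company_name(name)[:3]}"


def name_similarity(a: str, b: str) -> float:
    # Hypothetical helper: ratio of the two already-normalised strings.
    return SequenceMatcher(None, a, b).ratio()
```

Two records are only worth scoring when compute_blocking_key returns the same key for both, which is what keeps the pairwise comparison cheap.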
Match thresholds:
| Confidence | Decision |
|---|---|
| > 0.85 | Auto-match |
| 0.70 – 0.85 | Review queue |
| < 0.70 | Distinct entities |
Key functions:
- match_entities(entity_a, entity_b) -> MatchResult: primary entry point; accepts dicts with type, name, registration_number, jurisdiction
- compute_blocking_key(jurisdiction, name) -> str: generates the blocking key
- normalize_company_name(name), normalize_person_name(name): normalisation utilities
- strip_legal_suffix(name), strip_country_prefix(reg_number): cleaning utilities
Integration: Runs after graph ETL on newly created entity nodes. Detected potential matches are stored in a review queue. Confirmed matches feed into the SurvivorshipResolver (Wave 4) for field-level conflict resolution.
Wave 4 Services
SurvivorshipResolver
backend/app/services/survivorship.py
When the EntityMatcher links two records as the same entity, their field values may conflict. The SurvivorshipResolver selects the winning value using per-source trust scores, logs all conflicts, and protects sensitive fields from lower-trust overrides.
Trust hierarchy (representative scores):
| Source | Trust Score | Category |
|---|---|---|
| kbo | 0.98 | Government registry |
| gleif | 0.97 | Government registry |
| nbb | 0.95 | Government registry |
| vies | 0.93 | EU VAT validation |
| eori | 0.93 | EU customs validation |
| peppol | 0.90 | Official e-invoicing |
| sanctions_resolver | 0.96 | Authoritative sanctions source |
| northdata | 0.85 | Structured third-party API |
| opencorporates | 0.84 | Structured third-party API |
| brightdata | 0.72 | Web scraping |
| crawl4ai | 0.70 | Web scraping |
| llm_extraction | 0.75 | AI-extracted data |
Protected fields: Certain fields can only be set by authorised sources regardless of trust score.
| Field | Authorised Sources |
|---|---|
| is_sanctioned | sanctions_resolver, kbo, gleif |
| is_pep | pep_resolver, kbo |
| sanctions_details | sanctions_resolver |
| pep_details | pep_resolver |
Conflict detection: When two sources provide different values for the same field and their trust scores differ by less than TRUST_CONFLICT_DELTA (0.02), both values are preserved as a ConflictRecord for human review rather than silently picking a winner.
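The interplay of trust scores, protected fields, and the conflict delta can be sketched as follows. The scores, protected-field lists, and TRUST_CONFLICT_DELTA are taken from this page; the Candidate type, the fields on ConflictRecord and FieldProvenance, and the choice to still return the higher-trust value while recording the conflict are assumptions of this example.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

# Subset of the representative trust scores above.
TRUST_SCORES = {"kbo": 0.98, "gleif": 0.97, "sanctions_resolver": 0.96,
                "northdata": 0.85, "opencorporates": 0.84, "crawl4ai": 0.70}
PROTECTED_FIELDS = {
    "is_sanctioned": {"sanctions_resolver", "kbo", "gleif"},
    "is_pep": {"pep_resolver", "kbo"},
    "sanctions_details": {"sanctions_resolver"},
    "pep_details": {"pep_resolver"},
}
TRUST_CONFLICT_DELTA = 0.02


@dataclass
class Candidate:  # hypothetical input shape
    source: str
    value: Any


@dataclass
class ConflictRecord:
    field_name: str
    candidates: list[Candidate]


@dataclass
class FieldProvenance:
    field_name: str
    value: Any
    source: str
    trust_score: float
    reason: str


class SurvivorshipResolver:
    def __init__(self) -> None:
        self.conflicts: list[ConflictRecord] = []

    def resolve_field(self, field_name: str, candidates: list[Candidate]) -> FieldProvenance | None:
        allowed = PROTECTED_FIELDS.get(field_name)
        # Protected fields ignore unauthorised sources regardless of trust score.
        eligible = [c for c in candidates if allowed is None or c.source in allowed]
        if not eligible:
            return None
        ranked = sorted(eligible, key=lambda c: TRUST_SCORES.get(c.source, 0.0), reverse=True)
        winner = ranked[0]
        winner_trust = TRUST_SCORES.get(winner.source, 0.0)
        close = [
            c for c in ranked[1:]
            if c.value != winner.value
            and winner_trust - TRUST_SCORES.get(c.source, 0.0) < TRUST_CONFLICT_DELTA
        ]
        if close:  # near-equal trust with different values: keep both for review
            self.conflicts.append(ConflictRecord(field_name, [winner, *close]))
        reason = "conflict_flagged" if close else "highest_trust"
        return FieldProvenance(field_name, winner.value, winner.source, winner_trust, reason)
```

With these scores, northdata (0.85) and opencorporates (0.84) disagreeing on a name produces a ConflictRecord, whereas a crawl4ai value for is_sanctioned is discarded before trust is even compared.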
Key methods:
- SurvivorshipResolver.resolve_field(field_name, candidates) -> FieldProvenance: selects the winning value
- resolver.conflicts: list of ConflictRecord instances accumulated during a resolution session
Audit compliance: Every resolution decision is logged with the winning value, source, trust score, timestamp, and the reason for the decision. This satisfies EU AI Act Art. 12 (automatic logging) and AML 5-year retention requirements for data provenance.
Integration: Called from graph ETL during entity upsert when the EntityMatcher has flagged two records as the same entity. The ConflictRecord list is written to additional_data.survivorship_conflicts on the case for display in the Conflicts Panel (Wave 5 UI).
WorkflowSchema
backend/app/services/workflow_schema.py
A declarative YAML schema and compiler for compliance workflows. Defines the 5 phase types, validates schemas via Pydantic, and compiles them to dependency-ordered ExecutionPlan instances via Kahn's topological sort algorithm.
5 phase types:
| Type | Purpose |
|---|---|
| portal | Customer-facing data collection (forms + documents) |
| investigation | Automated due diligence modules (OSINT agents) |
| rule_evaluation | EBA risk matrix scoring |
| review | Human or auto decision with conditional routing |
| action | Post-decision steps (activation, notification, scheduling) |
Schema structure:
WorkflowSchema
schema_id: str
name: str
version: int
phases: list[WorkflowPhase]
description: str | None
category: str | None — kyb, periodic_review, vendor_dd, etc.
inputs: list[dict] | None
subject_entity_types: list[str] | None
audit: dict | None — retention, hashing config
WorkflowPhase
id: str — unique within the workflow
type: PhaseType
config: dict — phase-specific configuration
depends_on: list[str] | None — phases that must complete first
name: str | None
condition: dict | None — conditional execution for action phases
escalation: dict | None
Compilation: compile_workflow(schema) -> ExecutionPlan runs Kahn's algorithm to produce a topologically sorted list of ExecutionStep instances. Phases that have no unsatisfied dependencies at the same point in the sort are grouped into parallel_groups — these can execute as concurrent Temporal child workflows.
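Kahn's algorithm with level grouping can be sketched as below. It works on a plain mapping of phase ID to depends_on list instead of the Pydantic WorkflowSchema, and it raises ValueError on bad input, whereas the source describes compile_workflow returning an ExecutionPlan with has_errors set. The function name and the sample phase IDs are hypothetical.

```python
from __future__ import annotations

from collections import defaultdict


def compile_parallel_groups(phases: dict[str, list[str]]) -> list[list[str]]:
    """Group phase IDs into dependency-ordered levels using Kahn's algorithm."""
    indegree: dict[str, int] = {}
    dependents: dict[str, list[str]] = defaultdict(list)
    for phase_id, deps in phases.items():
        unique_deps = set(deps)
        indegree[phase_id] = len(unique_deps)
        for dep in unique_deps:
            if dep not in phases:
                raise ValueError(f"unknown depends_on reference: {dep}")
            dependents[dep].append(phase_id)

    # Phases with no unmet dependencies form the first parallel group.
    ready = sorted(p for p, degree in indegree.items() if degree == 0)
    groups: list[list[str]] = []
    placed = 0
    while ready:
        groups.append(ready)
        placed += len(ready)
        next_ready = []
        for phase_id in ready:
            for child in dependents[phase_id]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)  # sorted for deterministic output

    if placed != len(phases):
        # Some phases never reached indegree 0, so they sit on a cycle.
        raise ValueError("cyclic dependency detected")
    return groups


# Hypothetical phase IDs, loosely modelled on the five phase types.
plan = compile_parallel_groups({
    "portal": [],
    "registry_check": ["portal"],
    "adverse_media": ["portal"],
    "risk_scoring": ["registry_check", "adverse_media"],
    "review": ["risk_scoring"],
})
```

In this example the two investigation phases land in the same group because both depend only on the portal phase, so they are the candidates for concurrent execution.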
Key functions:
- validate_workflow(schema) -> list[str]: structural validation; returns error messages (empty = valid). Checks for empty schema_id, duplicate phase IDs, unknown depends_on references, and cyclic dependencies.
- compile_workflow(schema) -> ExecutionPlan: runs validation then topological sort; returns ExecutionPlan with has_errors=True and populated errors if validation fails.
- load_workflow_yaml(yaml_text) -> WorkflowSchema: parses YAML, constructs Pydantic model.
Migration path: The hardcoded compliance_case.py Temporal workflow is mirrored by the first YAML template, backend/config/workflow_schemas/kyb_onboarding_v1.yaml. Two further workflow types, periodic_review_v1.yaml and vendor_due_diligence_v1.yaml, already ship as YAML templates without code changes and can be listed and inspected through the workflow-schemas API and admin page. As the Integration note below explains, they are not yet registered as runnable Temporal workflows.
Integration: The compiled ExecutionPlan is consumed by DynamicWorkflowEngine in app/workflows/dynamic_workflow.py, which walks the topologically sorted steps via a DynamicWorkflowState machine (tested in tests/test_dynamic_workflow.py). The compiler and engine are exposed through the app/api/workflow_schemas.py router and the /admin/workflow-schemas frontend page (lists schemas, shows the compiled execution plan). Not yet fully wired to live Temporal execution: DynamicWorkflowEngine is not registered in app/worker.py (the worker registers ComplianceCaseWorkflow, ShipmentComplianceWorkflow, ContinuousMonitoringWorkflow, BatchRiskReEvaluationWorkflow, and SanctionsSuppressionRefreshWorkflow), so YAML-defined workflows are compiled and inspected but the hardcoded compliance_case.py workflow still runs production cases.
Integration Map
Where each service connects to the existing Trust Relay pipeline:
| Service | Called From | Returns |
|---|---|---|
| ReferenceDataService | EBARiskMatrix, risk engine | Risk scores, membership checks |
| QualityScorer | Temporal score_investigation_quality activity | QualityScore dict → additional_data.quality_scores |
| ModelTiers | All agent invocations via get_model_for_agent() | Model name string for PydanticAI |
| EBARiskMatrix | Temporal compute_eba_risk activity | EBARiskResult with SHA-256 hashes |
| DomainEvents | All Temporal activities | DomainEvent → audit_events table |
| EntityMatcher | Post-ETL deduplication pass | MatchResult with confidence score |
| SurvivorshipResolver | Graph ETL entity upsert | Resolved field values + ConflictRecord list |
| WorkflowSchema | Workflow template compiler | ExecutionPlan → Temporal dynamic workflow |
Competitive Context
These services address areas where the Atlas co-founder's codebase had architectural depth that Trust Relay's first implementation lacked.
Before adoption: Risk scoring used a custom 4-dimension weighted-average (ARIA). Entity conflicts were silently resolved with no provenance. Investigation quality was not measured. All agents used the same model. Event chains were flat and uncorrelated.
After adoption:
- Risk scoring follows EBA/GL/2021/02 with a published SHA-256 determinism proof — auditors can verify any stored result
- Entity conflicts are resolved transparently by a trust hierarchy, with protected fields that cannot be overwritten by lower-trust sources
- Every investigation receives a quality score on 5 weighted dimensions (best-effort, surfaced for human review)
- LLM model selection is centralised per-agent through tiered assignments, enabling deployment-time cost/accuracy trade-offs via env-var tier overrides
- Every audit event carries a correlation_id and causation_id, enabling full chain reconstruction ("everything that happened because of this sanctions hit")
- New compliance workflow types can be defined, compiled, and inspected from YAML without code changes; live Temporal execution of YAML-defined workflows is not yet wired
None of these capabilities require changes to the core investigation loop, the Temporal workflow state machine, or the customer portal. They integrate as services that the existing pipeline calls.
Trust Relay's architectural strengths that Atlas does not have — multi-tenancy, customer portal, Belgian regulatory depth (5 sources), EVOI decision theory, GovernanceEngine, supervised autonomy, compliance memory (Letta), and the Network Intelligence Hub — are preserved and unaffected by the adoption.