
Pipeline Resilience (Circuit Breakers)

The OSINT investigation pipeline queries 10+ external services per case. Without explicit resilience patterns, a single degraded upstream — a registry API under maintenance, a rate-limited third-party feed — causes the entire investigation to stall: activity retries pile up, Temporal activity timeouts are consumed, and the parallel pipeline (ADR-0019) is blocked at the registry agent, preventing downstream agents from starting.

Trust Relay uses a circuit breaker registry (ADR-0032) to detect persistent failures early, fail fast on subsequent calls, and allow graceful degradation rather than full pipeline stall.

How It Works

A global singleton circuit_registry (instance of CircuitRegistry) holds one named pybreaker.CircuitBreaker per external service. All service clients call their upstream through:

```python
from app.services.circuit_breaker import circuit_registry, CircuitOpenError

result = await circuit_registry.call("northdata", fetch_company_data(company_id))
```

The breaker transitions through three states:

| State | Behavior |
|---|---|
| CLOSED | Calls pass through. Failures are counted. |
| OPEN | After fail_max consecutive failures, all calls raise CircuitOpenError immediately (no network request). |
| HALF-OPEN | After reset_timeout seconds, one probe request is allowed through. Success → CLOSED. Failure → OPEN again. |

Each service has its own breaker. A NorthData outage does not affect the GLEIF breaker.
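The state table can be made concrete with a minimal, self-contained sketch of a per-service breaker registry. It is not the Trust Relay implementation, which wraps pybreaker.CircuitBreaker. SketchBreaker, SketchRegistry and the injectable clock are hypothetical names used only for illustration. The sketch also leaves out two things: the 4xx exclusion rule, and strict single-probe enforcement (once reset_timeout has elapsed, it does not stop concurrent callers from probing).

```python
import asyncio
import time
from typing import Any, Callable, Coroutine


class CircuitOpenError(Exception):
    """Raised instead of contacting the upstream while its breaker is open."""

    def __init__(self, service: str) -> None:
        super().__init__(f"Circuit breaker open for '{service}'")
        self.service = service


class SketchBreaker:
    """Hypothetical breaker: CLOSED -> OPEN -> HALF-OPEN -> CLOSED or OPEN."""

    def __init__(self, fail_max: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.state = "closed"
        self.failures = 0  # consecutive failures; reset by any success
        self.opened_at = 0.0

    async def call(self, name: str, coro: Coroutine[Any, Any, Any]) -> Any:
        if self.state == "open":
            if self.clock() - self.opened_at < self.reset_timeout:
                coro.close()  # never awaited: close it to avoid a RuntimeWarning
                raise CircuitOpenError(name)
            self.state = "half-open"  # reset_timeout elapsed: this call is the probe
        try:
            result = await coro
        except Exception:
            self.failures += 1
            # A failed probe, or fail_max consecutive failures, (re)opens the breaker.
            if self.state == "half-open" or self.failures >= self.fail_max:
                self.state = "open"
                self.opened_at = self.clock()  # (re)start the reset_timeout timer
            raise
        self.state = "closed"
        self.failures = 0
        return result


class SketchRegistry:
    """Lazily creates one independent breaker per service name."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._breakers: dict[str, SketchBreaker] = {}

    def get(self, name: str) -> SketchBreaker:
        if name not in self._breakers:
            self._breakers[name] = SketchBreaker(clock=self._clock)
        return self._breakers[name]

    async def call(self, name: str, coro: Coroutine[Any, Any, Any]) -> Any:
        return await self.get(name).call(name, coro)


if __name__ == "__main__":
    async def _down() -> None:
        raise ConnectionError("upstream unreachable")

    async def demo() -> None:
        registry = SketchRegistry()
        for _ in range(5):
            try:
                await registry.call("northdata", _down())
            except ConnectionError:
                pass
        print(registry.get("northdata").state)  # open
        print(registry.get("gleif").state)      # closed: a separate breaker

    asyncio.run(demo())
```

Keeping one breaker object per name is what isolates services. Five NorthData failures move only the northdata entry to OPEN, and a gleif call still goes through.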

Default Thresholds

| Parameter | Default | Meaning |
|---|---|---|
| fail_max | 5 | Trip after 5 consecutive failures |
| reset_timeout | 30 s | Allow a probe request after 30 seconds |

Service-Specific Overrides

Some services have different risk profiles:

| Service | fail_max | reset_timeout | Reason |
|---|---|---|---|
| brightdata_social | 3 | 120 s | BrightData MCP polling takes up to 90 s per attempt; 3 failures = 4.5 min wasted |
| brightdata_person | 3 | 120 s | Same as above |
| tavily | 3 | 60 s | Strict per-minute rate limits; fast trip prevents credit burn on concurrent investigations |
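The default-plus-override lookup can be pictured as a small table in code. This is a sketch that assumes a plain dict keyed by breaker name. BreakerSettings, OVERRIDES, settings_for and worst_case_time_before_trip are hypothetical names, and the time arithmetic assumes failing attempts run one after another.

```python
from typing import NamedTuple


class BreakerSettings(NamedTuple):
    fail_max: int
    reset_timeout_s: float


DEFAULT_SETTINGS = BreakerSettings(fail_max=5, reset_timeout_s=30.0)

# Per-service overrides from the table above; every other breaker uses the defaults.
OVERRIDES: dict[str, BreakerSettings] = {
    "brightdata_social": BreakerSettings(fail_max=3, reset_timeout_s=120.0),
    "brightdata_person": BreakerSettings(fail_max=3, reset_timeout_s=120.0),
    "tavily": BreakerSettings(fail_max=3, reset_timeout_s=60.0),
}


def settings_for(service: str) -> BreakerSettings:
    """Return the override for a breaker name, or the defaults."""
    return OVERRIDES.get(service, DEFAULT_SETTINGS)


def worst_case_time_before_trip(service: str, seconds_per_attempt: float) -> float:
    """Upper bound on time spent on sequential failing calls before the breaker opens."""
    return settings_for(service).fail_max * seconds_per_attempt
```

For brightdata_social, 3 × 90 s = 270 s, which is the 4.5 minutes quoted in the table. With the default fail_max of 5, the same upstream could burn 450 s (7.5 minutes) before tripping.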

4xx Exclusion Rule

HTTP 4xx responses (400–499) are excluded from failure counting. A 404 "company not found" or a 403 "invalid API key" means the service is responding correctly — only 5xx server errors and connection failures count toward the breaker threshold.
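The exclusion rule can be sketched as a predicate. This assumes the failure surfaces as an exception that carries the HTTP response, as httpx.HTTPStatusError does via .response.status_code. counts_toward_breaker is a hypothetical name, and the predicate duck-types the exception instead of importing httpx. How the registry actually wires the rule in is not shown here; pybreaker's CircuitBreaker does accept an exclude list, but this page does not say whether the registry uses it.

```python
def counts_toward_breaker(exc: BaseException) -> bool:
    """Return True if exc should increment the breaker's failure count.

    HTTP 4xx (400-499) means the upstream answered and rejected the request,
    so it is excluded. 5xx responses and errors with no HTTP status at all
    (connection failures, timeouts) count.
    """
    response = getattr(exc, "response", None)          # httpx.HTTPStatusError has .response
    status = getattr(response, "status_code", None)    # absent on connection errors
    if isinstance(status, int) and 400 <= status <= 499:
        return False
    return True
```

Read literally, the 400–499 range also excludes 429 Too Many Requests, so under this predicate rate limiting alone does not trip a breaker.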

Current Coverage (as of 2026-06-11)

The resilience rollout is complete (ADR-0039): 49 circuit_registry.call sites now span the OSINT, registry, and verification services. Breaker names are listed below, one per upstream; endpoints that share a name also share failure state:

| Breaker name | Upstream | File(s) |
|---|---|---|
| northdata | NorthData company lookup + search | northdata_service.py |
| gleif | GLEIF LEI lookup (multiple methods) | gleif_service.py, verification/gleif.py |
| nominatim | Nominatim geocoding | geocoding_service.py |
| tavily | Tavily adverse media search | adverse_media_agent.py |
| eba_register | EBA register license fetch | verification/license_registry.py |
| vies | EU VIES VAT validation + VATSense fallback | vies_service.py |
| crawl4ai | crawl4ai website scraper | crawl4ai_service.py |
| opensanctions | OpenSanctions feed download | sanctions_feed_service.py |
| brightdata_enrichment | BrightData Crunchbase enrichment | brightdata_enrichment_service.py |
| brightdata_social | BrightData social intelligence | agents/osint_agent.py |
| kbo | KBO lookup + establishments | kbo_service.py |
| nbb | NBB filings/financials | nbb_service.py |
| peppol_directory | PEPPOL directory | peppol_directory_service.py |
| cz_ares | Czech ARES lookup + health probe | registries/cz_ares_service.py |
| cz_justice | Czech Justice registry | registries/cz_justice_service.py |
| cz_isir | Czech ISIR insolvency | registries/cz_isir_service.py |
| cz_ubo | Czech UBO registry | registries/cz_ubo_service.py |
| registry_sk | SK ORSR search + detail | registries/sk_orsr_service.py |
| sk_ruz | SK RUZ financials | registries/sk_ruz_service.py |
| fr_inpi | FR INPI | registries/fr_inpi_service.py |
| fr_insee | FR INSEE | registries/fr_insee_service.py |
| fr_bodacc | FR BODACC | registries/fr_bodacc_service.py |
| dk_cvr | DK CVR | registries/dk_cvr_service.py |
| ee_ariregister | EE Ariregister | registries/ee_ariregister_service.py |
| fi_ytj | FI YTJ | registries/fi_ytj_service.py |
| nl_kvk | NL KvK | registries/nl_kvk_service.py |
| no_brreg | NO BRREG | registries/no_brreg_service.py |
| ro_anaf | RO ANAF (company + VAT) | registries/ro_anaf_service.py, registries/ro_anaf_company_service.py |
| ch_zefix | CH Zefix | registries/ch_zefix_service.py |

Shared-breaker design: services with multiple endpoints share one breaker name (e.g. kbo covers both lookup and _fetch_establishments; vies covers both VIES SOAP and VATSense fallback). All endpoints of the same service hit the same upstream, so one outage trips one breaker — not parallel breakers that never share failure state.
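The effect of sharing a name can be shown by replaying the same outage against two naming schemes. This sketch only counts failures per name, with no timing or reset logic. tripped_breakers is a hypothetical helper, and the split names kbo_lookup and kbo_establishments are hypothetical too; they exist only as a counter-example.

```python
from collections import Counter
from typing import Iterable

FAIL_MAX = 5


def tripped_breakers(failing_calls: Iterable[str], fail_max: int = FAIL_MAX) -> set[str]:
    """Replay failed calls (one breaker name each, no successes in between)
    and return the breaker names whose failure count reached fail_max."""
    counts: Counter[str] = Counter(failing_calls)
    return {name for name, n in counts.items() if n >= fail_max}


# The same KBO outage seen by two endpoints, as endpoint -> breaker name maps.
SHARED = {"lookup": "kbo", "establishments": "kbo"}
SPLIT = {"lookup": "kbo_lookup", "establishments": "kbo_establishments"}  # hypothetical
outage = ["lookup", "establishments", "lookup", "establishments", "lookup"]

shared_result = tripped_breakers(SHARED[e] for e in outage)  # {"kbo"}: 5 failures on one name
split_result = tripped_breakers(SPLIT[e] for e in outage)    # set(): 3 and 2, neither reaches 5
```

With split names, the outage is spread across two counters, so neither reaches fail_max and both endpoints keep sending requests to a dead upstream.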

Remaining Gaps (Not Yet Wrapped)

A small number of low-traffic services still call upstreams directly. PEPPOL Verify (the REST verification endpoint, distinct from the wrapped peppol_directory) is the main remaining target for a follow-up pass.

Usage Pattern

Wrap the upstream request in an inner async function, route it through circuit_registry.call, and catch CircuitOpenError to apply a fallback:

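```python
import logging

import httpx

from app.services.circuit_breaker import CircuitOpenError, circuit_registry

logger = logging.getLogger(__name__)

# CompanyData and BASE_URL are defined elsewhere in the service module.


async def lookup(company_id: str) -> CompanyData | None:
    async def _do_lookup() -> CompanyData:
        # The whole request lifecycle runs inside this coroutine, so the breaker sees every failure.
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{BASE_URL}/{company_id}")
            # Raises for 4xx and 5xx; only 5xx and connection errors count toward the breaker.
            response.raise_for_status()
            return CompanyData(**response.json())

    try:
        return await circuit_registry.call("my_registry", _do_lookup())
    except CircuitOpenError:
        # Breaker is open: fail fast without a network request.
        logger.warning("my_registry circuit open — skipping lookup")
        return None
    except Exception as exc:
        logger.error("my_registry lookup failed: %s", exc)
        return None
```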

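The inner function (_do_lookup) packages the whole request lifecycle (client setup, request, raise_for_status(), parsing) into one coroutine, so every failure inside it is seen by the breaker. Call _do_lookup() inline, as the argument to circuit_registry.call, so that each lookup gets a fresh coroutine. A coroutine object can be awaited only once, so one created earlier and awaited elsewhere cannot be passed in again.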
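The single-use nature of coroutine objects is easy to check in plain asyncio. This sketch uses hypothetical helpers (fetch, await_same_coroutine_twice, call_twice) and no Trust Relay code. Awaiting the same coroutine object twice raises RuntimeError, while calling an async function once per attempt always produces a fresh coroutine.

```python
import asyncio
from typing import Awaitable, Callable


async def fetch() -> str:
    """Stand-in for an upstream request (hypothetical)."""
    return "payload"


async def await_same_coroutine_twice() -> str:
    coro = fetch()  # created ahead of time; nothing runs until it is awaited
    await coro      # first await runs it to completion
    try:
        await coro  # a second await of the same object fails
    except RuntimeError as exc:
        return str(exc)
    return "no error"


async def call_twice(factory: Callable[[], Awaitable[str]]) -> list[str]:
    # Calling the factory per attempt yields a fresh coroutine each time.
    return [await factory(), await factory()]


if __name__ == "__main__":
    print(asyncio.run(await_same_coroutine_twice()))
    print(asyncio.run(call_twice(fetch)))
```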

Operational Behavior

When a breaker opens:

  • CircuitOpenError(service) is raised immediately, with the message "Circuit breaker open for '{service}' — failing fast"
  • The calling service catches it and applies its fallback (returning None, an empty result, or a degraded Finding)
  • The investigation continues with reduced data quality for that service; other services are unaffected
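The fail-fast-and-degrade behavior can be sketched with asyncio.gather. Each source is wrapped so that an open breaker yields a "skipped" marker instead of an exception, and the other sources finish normally. This is an illustration only. It assumes a local CircuitOpenError stand-in and hypothetical with_fallback and run_sources helpers. The real pipeline runs as Temporal activities (ADR-0019), not as a single gather.

```python
import asyncio
from typing import Any, Awaitable, Callable


class CircuitOpenError(Exception):
    """Local stand-in for app.services.circuit_breaker.CircuitOpenError."""


Lookup = Callable[[], Awaitable[Any]]


async def with_fallback(service: str, lookup: Lookup) -> tuple[str, Any, bool]:
    """Run one source; on an open breaker return (service, None, True) instead of raising."""
    try:
        return service, await lookup(), False
    except CircuitOpenError:
        return service, None, True


async def run_sources(lookups: dict[str, Lookup]) -> tuple[dict[str, Any], list[str]]:
    """Query all sources concurrently; an open breaker degrades only its own source."""
    outcomes = await asyncio.gather(
        *(with_fallback(name, fn) for name, fn in lookups.items())
    )
    results = {name: value for name, value, skipped in outcomes if not skipped}
    degraded = sorted(name for name, _, skipped in outcomes if skipped)
    return results, degraded
```

An explicit skipped flag is used instead of treating None as "degraded", because a healthy lookup can legitimately return None (for example, "company not found").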

Recovery (after reset_timeout: 30 s by default, 60 s for tavily, 120 s for brightdata_social and brightdata_person):

  • After reset_timeout seconds, the breaker enters HALF-OPEN state
  • The next call is allowed through as a probe. If it succeeds, the breaker returns to CLOSED and full operation resumes
  • If the probe fails, the breaker returns to OPEN and the timer resets
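When several coroutines hit a HALF-OPEN breaker at once, "one probe request" needs a guard. One way to enforce it in asyncio is a flag that is checked and set with no await in between, so no other task can interleave. HalfOpenProbe is a hypothetical class that models only the half-open window; closed-state counting and the timer restart are omitted. This is an assumption, not a description of how the registry does it.

```python
import asyncio
from typing import Any, Coroutine


class CircuitOpenError(Exception):
    """Local stand-in for the registry's CircuitOpenError."""


class HalfOpenProbe:
    """Hypothetical model of the HALF-OPEN window only: one probe in flight at a
    time; every other caller fails fast until the probe resolves."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.state = "half-open"
        self._probe_in_flight = False

    async def call(self, coro: Coroutine[Any, Any, Any]) -> Any:
        if self.state == "closed":
            return await coro  # normal operation; failure counting omitted here
        if self.state == "open" or self._probe_in_flight:
            coro.close()  # not awaited, so close it explicitly
            raise CircuitOpenError(self.name)
        # Check and set happen with no await in between, so no other task interleaves.
        self._probe_in_flight = True
        try:
            result = await coro
        except Exception:
            self.state = "open"  # failed probe (timer restart not modelled)
            raise
        finally:
            self._probe_in_flight = False
        self.state = "closed"  # successful probe closes the breaker
        return result
```

asyncio switches tasks only at await points, so a plain boolean is enough here. Under threads, the same guard would need a lock.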

After worker restart (state persistence — ADR-0040):

  • On startup the worker calls restore_breakers_from_redis(circuit_registry), rehydrating each breaker's open/half-open state from Redis snapshots written by schedule_publish(). A breaker that was open before the restart stays open until its reset_timeout has elapsed (time already spent open before the restart counts), so a known-broken upstream is not re-probed by every fresh worker.
  • If Redis is unavailable, restore is skipped (logged, non-fatal) and breakers start CLOSED — they then re-trip after fail_max failures if the upstream is still broken.
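The restore rule can be sketched as a pure function over a snapshot. The JSON shape ({"state": ..., "opened_at": ...}) and restored_state are assumptions, not the format schedule_publish() actually writes. The sketch also assumes downtime counts toward reset_timeout. It compares against wall-clock time.time() because time.monotonic() has an undefined reference point, so its values cannot be relied on across a process restart.

```python
import json
import time
from typing import Optional


def restored_state(snapshot_json: Optional[str], reset_timeout: float,
                   now: Optional[float] = None) -> str:
    """Decide a breaker's starting state from a persisted snapshot (hypothetical shape).

    - No snapshot (e.g. Redis unavailable): start CLOSED.
    - OPEN and reset_timeout not yet elapsed: stay OPEN.
    - OPEN and reset_timeout already elapsed: HALF-OPEN, so the next call probes.
    """
    if snapshot_json is None:
        return "closed"
    snapshot = json.loads(snapshot_json)
    state = snapshot.get("state", "closed")
    if state == "open":
        current = time.time() if now is None else now
        elapsed = current - float(snapshot["opened_at"])  # epoch seconds, survives restarts
        # Time spent down counts toward reset_timeout; only the remainder is still OPEN.
        return "open" if elapsed < reset_timeout else "half-open"
    if state == "half-open":
        return "half-open"
    return "closed"
```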

Observability (ADR-0040):

  • Each state change publishes the Prometheus gauge trustrelay_circuit_breaker_state (labelled by service; 0=closed, 1=open, 2=half-open) via app/observability/metrics.py.
  • The same hook also schedules a fire-and-forget Redis snapshot of the breaker state, which provides cross-process visibility and feeds the restart-restore path above.
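Below is a sketch of such a state-change hook. It assumes a plain dict in place of the Prometheus gauge; with the prometheus_client library, this would be a Gauge labelled by service. It also assumes an injected async publish callable in place of the Redis client. StateChangeHook and its methods are hypothetical. The detail worth copying is the fire-and-forget pattern. asyncio keeps only weak references to tasks, so the hook holds each pending task until it completes, and it logs a failed write instead of raising it.

```python
import asyncio
import json
import logging
from typing import Any, Callable, Coroutine

logger = logging.getLogger(__name__)

# Gauge encoding documented above: 0=closed, 1=open, 2=half-open.
STATE_VALUES = {"closed": 0, "open": 1, "half-open": 2}

Publisher = Callable[[str, str], Coroutine[Any, Any, None]]


class StateChangeHook:
    """Hypothetical listener: update the gauge synchronously, then schedule the
    snapshot write without awaiting it (fire-and-forget)."""

    def __init__(self, publish: Publisher) -> None:
        self.gauge: dict[str, int] = {}  # stand-in for a gauge labelled by service
        self._publish = publish
        self._pending: set[asyncio.Task[None]] = set()

    def on_state_change(self, service: str, new_state: str) -> None:
        self.gauge[service] = STATE_VALUES[new_state]
        payload = json.dumps({"service": service, "state": new_state})
        # Requires a running event loop; the caller does not wait for the write.
        task = asyncio.get_running_loop().create_task(self._publish(service, payload))
        # The event loop keeps only weak references to tasks, so hold one here.
        self._pending.add(task)
        task.add_done_callback(self._on_done)

    def _on_done(self, task: asyncio.Task[None]) -> None:
        self._pending.discard(task)
        # A failed snapshot write is logged, never raised into the caller.
        if not task.cancelled() and task.exception() is not None:
            logger.warning("breaker snapshot publish failed: %r", task.exception())
```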