Event-Driven AI Architecture

Every request through the Keeptrusts gateway produces a structured decision event. This guide shows how to build event-driven architectures that consume these events for real-time alerting, audit trails, analytics, and compliance workflows.

Use this page when

  • You are building real-time alerting, audit trails, or analytics on gateway decision events
  • You need to configure webhook subscriptions for blocked or escalated events
  • You want to implement event sourcing or CQRS patterns around governance events
  • You are replaying events for compliance audit or post-mortem analysis

Primary audience

  • Primary: Technical Engineers
  • Secondary: AI Agents, Technical Leaders

Gateway Decision Events

The gateway emits a decision event for every evaluated request, regardless of outcome:

{
  "id": "evt_a1b2c3d4",
  "timestamp": "2026-04-23T14:30:00Z",
  "decision": "allowed",
  "model": "gpt-4o",
  "provider": "openai",
  "input_tokens": 1250,
  "output_tokens": 890,
  "policies_evaluated": [
    {
      "name": "content_safety",
      "result": "pass",
      "latency_ms": 12
    },
    {
      "name": "pii_detection",
      "result": "redacted",
      "entities_found": 3,
      "latency_ms": 8
    }
  ],
  "gateway_id": "gw_prod_east",
  "user_id": "user_789",
  "cost_usd": 0.0034
}

Events flow from the gateway to the control-plane API via POST /v1/events. From there, you can consume them through webhooks, the events API, or exports.
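Consumers usually benefit from parsing the raw payload into a typed structure once, at the edge. The following is a minimal sketch based only on the fields in the sample event above; the class names `DecisionEvent` and `PolicyResult` and the helper `parse_event` are illustrative and not part of any Keeptrusts SDK, and treating `gateway_id` and `user_id` as optional is an assumption.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional


@dataclass(frozen=True)
class PolicyResult:
    name: str
    result: str
    latency_ms: int
    # Policy-specific fields such as entities_found end up here.
    extra: dict = field(default_factory=dict)


@dataclass(frozen=True)
class DecisionEvent:
    id: str
    timestamp: datetime
    decision: str
    model: str
    provider: str
    input_tokens: int
    output_tokens: int
    policies: tuple
    gateway_id: Optional[str]
    user_id: Optional[str]
    cost_usd: float


def parse_event(raw: dict) -> DecisionEvent:
    """Convert a raw decision event dict into a typed DecisionEvent."""
    known = {"name", "result", "latency_ms"}
    policies = tuple(
        PolicyResult(
            name=p["name"],
            result=p["result"],
            latency_ms=p.get("latency_ms", 0),
            extra={k: v for k, v in p.items() if k not in known},
        )
        for p in raw.get("policies_evaluated", [])
    )
    return DecisionEvent(
        id=raw["id"],
        # Older Python versions reject a trailing "Z", so normalise it first.
        timestamp=datetime.fromisoformat(raw["timestamp"].replace("Z", "+00:00")),
        decision=raw["decision"],
        model=raw["model"],
        provider=raw["provider"],
        input_tokens=raw.get("input_tokens", 0),
        output_tokens=raw.get("output_tokens", 0),
        policies=policies,
        gateway_id=raw.get("gateway_id"),
        user_id=raw.get("user_id"),
        cost_usd=raw.get("cost_usd", 0.0),
    )
```

Parsing the timestamp into a timezone-aware `datetime` up front avoids string comparisons later, when events from several gateways are merged.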

Webhook Subscriptions

Configure webhooks to receive events in real time:

# Create a webhook subscription for blocked events
curl -X POST https://api.keeptrusts.example/v1/webhooks \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://your-service.example/hooks/keeptrusts",
    "events": ["event.blocked", "event.escalated"],
    "secret": "whsec_your_signing_secret"
  }'

Verifying Webhook Signatures

Always verify the HMAC signature before processing:

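// webhook-handler.ts
import { createHmac, timingSafeEqual } from "crypto";

// Signing secret supplied when the subscription was created.
// Reading it from the WEBHOOK_SECRET environment variable is an assumption.
const WEBHOOK_SECRET: string = process.env.WEBHOOK_SECRET ?? "";

function verifyWebhookSignature(
  payload: string,
  signature: string,
  secret: string
): boolean {
  const expected = createHmac("sha256", secret)
    .update(payload)
    .digest("hex");
  const received = Buffer.from(signature);
  const computed = Buffer.from(expected);
  // timingSafeEqual throws when lengths differ, so reject those up front.
  // An empty secret also fails closed.
  if (!secret || received.length !== computed.length) {
    return false;
  }
  return timingSafeEqual(received, computed);
}

export async function handleWebhook(req: Request): Promise<Response> {
  // Verify against the raw body; re-serialized JSON may differ byte for byte.
  const payload = await req.text();
  const signature = req.headers.get("x-keeptrusts-signature") ?? "";

  if (!verifyWebhookSignature(payload, signature, WEBHOOK_SECRET)) {
    return new Response("Invalid signature", { status: 401 });
  }

  const event = JSON.parse(payload);
  await processEvent(event); // processEvent is your application's handler
  return new Response("OK", { status: 200 });
}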
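For consumers written in Python, the same check can be sketched with the standard library. This assumes, as the TypeScript handler does, that the `x-keeptrusts-signature` header carries a hex-encoded HMAC-SHA256 of the raw request body; the function name is illustrative.

```python
import hashlib
import hmac


def verify_webhook_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Return True when signature is the hex HMAC-SHA256 of payload under secret."""
    if not secret:
        # Fail closed if the secret was never configured.
        return False
    expected = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()
    # compare_digest runs in constant time and accepts inputs of unequal length.
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```

Pass the raw request bytes, not a re-serialized JSON object, because any change in whitespace or key order changes the digest.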

Event Sourcing Pattern

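Treat decision events as the source of truth for AI usage analytics: persist every event to an append-only store first, then derive alerts, aggregates, and reports from that store.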

Event Processor Implementation

# event_processor.py

class GovernanceEventProcessor:
    """Process and route gateway decision events."""

    def __init__(self, event_store, alert_service):
        self.event_store = event_store
        self.alert_service = alert_service
        self.handlers = {
            "allowed": self._handle_allowed,
            "blocked": self._handle_blocked,
            "escalated": self._handle_escalated,
            "redacted": self._handle_redacted,
        }

    async def process(self, event: dict):
        # Persist every event for audit
        await self.event_store.append(event)

        # Route to decision-specific handler; unknown decisions are only stored
        handler = self.handlers.get(event["decision"])
        if handler:
            await handler(event)

    async def _handle_blocked(self, event: dict):
        """Immediate alert on blocked requests."""
        # Report the first evaluated policy; guard against an empty list
        policies = event.get("policies_evaluated") or [{}]
        policy_name = policies[0].get("name", "unknown")
        await self.alert_service.send(
            channel="governance-alerts",
            severity="high",
            message=f"Request blocked: {event['id']} "
                    f"policy={policy_name} "
                    f"user={event.get('user_id', 'unknown')}",
        )

    async def _handle_escalated(self, event: dict):
        """Route escalated decisions to human reviewers."""
        await self.alert_service.send(
            channel="escalation-queue",
            severity="critical",
            message=f"Escalation required: {event['id']}",
        )

    async def _handle_allowed(self, event: dict):
        """Track allowed requests for spend analytics."""
        pass  # Aggregated in analytics pipeline

    async def _handle_redacted(self, event: dict):
        """Log redaction events for compliance reporting."""
        pass  # Stored in event store for audit trail
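Because the processor appends every event to the store before routing it, any aggregate can be rebuilt later by folding over the stored events. The sketch below shows one such projection, a per-user usage summary. The function name and the shape of the summary are illustrative assumptions; only the event fields come from the sample event.

```python
from collections import defaultdict
from typing import Iterable


def project_usage(events: Iterable[dict]) -> dict:
    """Fold decision events into a per-user usage summary (a read model)."""
    usage = defaultdict(
        lambda: {"requests": 0, "blocked": 0, "tokens": 0, "cost_usd": 0.0}
    )
    for event in events:
        row = usage[event.get("user_id", "unknown")]
        row["requests"] += 1
        if event["decision"] == "blocked":
            row["blocked"] += 1
        row["tokens"] += event.get("input_tokens", 0) + event.get("output_tokens", 0)
        row["cost_usd"] += event.get("cost_usd", 0.0)
    return dict(usage)
```

Since the projection is a pure function of the event list, changing the summary's shape only requires re-running it over the store; no event needs to be migrated.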

Async Processing Patterns

Buffered Event Ingestion

For high-throughput gateways, buffer events before processing:

# policy-config.yaml: event batching
gateway:
  port: 41002
  events:
    batch_size: 100
    flush_interval_ms: 5000
    retry_max_attempts: 3
    retry_backoff_ms: 1000
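To illustrate how `batch_size` and `flush_interval_ms` interact, here is a small buffer that flushes when either threshold is reached, whichever comes first. It is a sketch of the general size-or-age batching technique for your own consumers, not the gateway's actual implementation; the class and its `clock` parameter are hypothetical.

```python
import time


class EventBuffer:
    """Collect events and flush them in batches by size or by age."""

    def __init__(self, flush, batch_size=100, flush_interval_ms=5000,
                 clock=time.monotonic):
        self._flush = flush                      # callable receiving a list of events
        self._batch_size = batch_size
        self._flush_interval_s = flush_interval_ms / 1000
        self._clock = clock                      # injectable for testing
        self._pending = []
        self._oldest = None                      # arrival time of the oldest pending event

    def add(self, event):
        if not self._pending:
            self._oldest = self._clock()
        self._pending.append(event)
        self.flush_if_due()

    def flush_if_due(self):
        """Flush when the batch is full or the oldest event has waited too long."""
        if not self._pending:
            return
        too_many = len(self._pending) >= self._batch_size
        too_old = self._clock() - self._oldest >= self._flush_interval_s
        if too_many or too_old:
            self.flush()

    def flush(self):
        batch, self._pending = self._pending, []
        self._oldest = None
        if batch:
            self._flush(batch)
```

The age check only runs when `add` or `flush_if_due` is called, so a real consumer would also call `flush_if_due` from a periodic timer and call `flush` on shutdown.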

Dead Letter Queue

Handle event delivery failures with a DLQ pattern:

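# dlq_handler.py

class TransientError(Exception):
    """Failure that may succeed on retry, such as a timeout."""


class PermanentError(Exception):
    """Failure that will not succeed on retry, such as a malformed event."""


async def process_with_dlq(event: dict, processor, dlq, alert_service):
    """Attempt processing with dead-letter fallback."""
    try:
        await processor.process(event)
    except TransientError:
        # Park the event and retry after a delay
        await dlq.enqueue(event, retry_after_seconds=60)
    except PermanentError as e:
        await dlq.enqueue(event, retry_after_seconds=None, error=str(e))
        # Alert on permanent failures: these need manual review
        await alert_service.send(
            channel="event-failures",
            message=f"Permanent event processing failure: {event['id']}",
        )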
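The handler above only needs a queue object with an `enqueue` method. A minimal in-memory stand-in, useful for tests or local development, might look like the sketch below. The `InMemoryDLQ` class, its `due` and `parked` methods, and the `clock` parameter are hypothetical; a production queue would be backed by durable storage.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class DeadLetter:
    event: dict
    retry_at: Optional[float]      # None means "do not retry automatically"
    error: Optional[str] = None


class InMemoryDLQ:
    """Dead-letter queue kept in process memory (not durable)."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self._entries = []

    async def enqueue(self, event, retry_after_seconds=None, error=None):
        retry_at = None
        if retry_after_seconds is not None:
            retry_at = self._clock() + retry_after_seconds
        self._entries.append(DeadLetter(event, retry_at, error))

    def due(self):
        """Remove and return events whose retry time has passed."""
        now = self._clock()
        ready, waiting = [], []
        for entry in self._entries:
            is_due = entry.retry_at is not None and entry.retry_at <= now
            (ready if is_due else waiting).append(entry)
        self._entries = waiting
        return [entry.event for entry in ready]

    def parked(self):
        """Return entries that need manual review."""
        return [entry for entry in self._entries if entry.retry_at is None]
```

A background task can poll `due()` and feed the returned events back through the processor, while `parked()` gives reviewers the list of permanent failures.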

Event Replay for Audit

Replay historical events to reconstruct what happened during an incident:

# Export events for a specific time range
curl -X POST https://api.keeptrusts.example/v1/exports \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "type": "events",
    "format": "jsonl",
    "filter": {
      "start_time": "2026-04-22T00:00:00Z",
      "end_time": "2026-04-23T00:00:00Z",
      "decision": "blocked",
      "gateway_id": "gw_prod_east"
    }
  }'

Reconstructing an Audit Trail

# audit_replay.py
async def reconstruct_audit_trail(events: list[dict]) -> dict:
    """Rebuild the timeline of governance decisions."""
    timeline = []
    # ISO 8601 UTC timestamps in the same format sort correctly as strings
    for event in sorted(events, key=lambda e: e["timestamp"]):
        timeline.append({
            "time": event["timestamp"],
            "decision": event["decision"],
            "model": event["model"],
            "user": event.get("user_id"),
            "policies": [p["name"] for p in event["policies_evaluated"]],
            "cost": event.get("cost_usd", 0),
        })

    return {
        "period": {
            # An empty export has no period; avoid indexing an empty list
            "start": timeline[0]["time"] if timeline else None,
            "end": timeline[-1]["time"] if timeline else None,
        },
        "total_events": len(timeline),
        "blocked_count": sum(1 for e in timeline if e["decision"] == "blocked"),
        "total_cost": sum(e["cost"] for e in timeline),
        "timeline": timeline,
    }
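An export requested with `"format": "jsonl"` contains one JSON object per line, so it has to be parsed line by line before it can be passed to a function that expects a list of event dicts. A minimal loader, with the helper name `iter_jsonl` chosen for illustration:

```python
import json
from typing import Iterable, Iterator


def iter_jsonl(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one parsed event per non-empty line of a JSONL export."""
    for line_no, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # tolerate blank lines, e.g. a trailing newline
        try:
            yield json.loads(line)
        except json.JSONDecodeError as exc:
            # Include the line number so a damaged export is easy to locate.
            raise ValueError(f"Invalid JSON on line {line_no}: {exc.msg}") from exc
```

Any iterable of strings works, including an open file handle, so a downloaded export can be streamed without loading the whole file into memory first.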

Real-Time Dashboards

Stream events to monitoring tools using webhook forwarding:

# Webhook → metrics bridge
# Forward governance events to your observability stack
webhooks:
  - url: https://metrics.internal/keeptrusts/ingest
    events: ["event.allowed", "event.blocked", "event.redacted"]
    headers:
      Authorization: "Bearer ${METRICS_TOKEN}"

Useful metrics to derive from events:

| Metric | Source Field | Aggregation |
| --- | --- | --- |
| Block rate | decision == "blocked" | count / total per minute |
| Policy latency p99 | policies_evaluated[].latency_ms | percentile per policy |
| Token spend | input_tokens + output_tokens | sum per hour per team |
| Redaction frequency | policies_evaluated[].entities_found | count per entity type |
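Two of these metrics can be sketched directly from the event fields. The example assumes timestamps are UTC strings in the format of the sample event, so the first 16 characters identify the minute, and it uses the nearest-rank method for the percentile. Both function names are illustrative.

```python
from collections import defaultdict


def block_rate_per_minute(events):
    """Return {minute: blocked / total}, keyed by 'YYYY-MM-DDTHH:MM'."""
    totals = defaultdict(int)
    blocked = defaultdict(int)
    for event in events:
        minute = event["timestamp"][:16]
        totals[minute] += 1
        if event["decision"] == "blocked":
            blocked[minute] += 1
    return {minute: blocked[minute] / totals[minute] for minute in totals}


def policy_latency_p99(events):
    """Return {policy name: p99 latency in ms} using the nearest-rank method."""
    samples = defaultdict(list)
    for event in events:
        for policy in event.get("policies_evaluated", []):
            if "latency_ms" in policy:
                samples[policy["name"]].append(policy["latency_ms"])
    result = {}
    for name, values in samples.items():
        values.sort()
        # Integer form of ceil(0.99 * n), which avoids floating-point rounding.
        rank = (99 * len(values) + 99) // 100
        result[name] = values[rank - 1]
    return result
```

With few samples the nearest-rank p99 is simply the maximum, so this figure only becomes meaningful once each policy has on the order of a hundred observations per window.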

Key Takeaways

  • Every gateway request generates a structured decision event — treat these as your AI audit log
  • Use webhook subscriptions to trigger real-time alerts on blocked or escalated events
  • Buffer events for high-throughput scenarios with batch size and flush interval tuning
  • Event replay via the exports API enables post-incident audit reconstruction
  • Derive operational metrics (block rate, spend, latency) from event streams

For AI systems

  • Canonical terms: decision events, POST /v1/events, POST /v1/webhooks, x-keeptrusts-signature, HMAC-SHA256, event.blocked, event.escalated, event sourcing, event replay, webhook subscription
  • Key event fields: id, timestamp, decision, model, provider, input_tokens, output_tokens, policies_evaluated, cost_usd
  • Best next pages: Database Patterns for AI-Augmented Systems, Observability Patterns, Incident Response

For engineers

  • Subscribe to real-time events: POST /v1/webhooks with events: ["event.blocked", "event.escalated"]
  • Always verify webhook HMAC signature with x-keeptrusts-signature before processing — use timingSafeEqual
  • Events are emitted asynchronously from the gateway — event ordering is not guaranteed across gateway instances
  • Use GET /v1/events?since=<timestamp> for catch-up reads if a webhook consumer was offline
  • Event replay: export and re-ingest for audit or analytics reprocessing
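Because ordering is not guaranteed across gateway instances, and a catch-up read may overlap with events already delivered by webhook, consumers should be idempotent. The sketch below deduplicates by event `id` and handles each batch in timestamp order. The class is hypothetical, and the in-memory set stands in for a durable store of processed IDs.

```python
class IdempotentConsumer:
    """Handle each event at most once, in timestamp order within a batch."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # in production, persist processed IDs durably

    def consume(self, events):
        """Handle unseen events and return how many were handled."""
        handled = 0
        # Break timestamp ties by id so the order is deterministic.
        for event in sorted(events, key=lambda e: (e["timestamp"], e["id"])):
            if event["id"] in self._seen:
                continue
            self._handler(event)
            # Mark as seen only after the handler succeeds, so failures are retried.
            self._seen.add(event["id"])
            handled += 1
        return handled
```

Sorting only orders events within a single batch; a late event from another gateway can still arrive after newer ones have been handled, so downstream logic should not depend on strict global ordering.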

For leaders

  • Event-driven architecture enables real-time compliance monitoring without polling — alerts fire within seconds of policy violations
  • Webhook integrations connect governance events to existing alerting tools (PagerDuty, Slack, SIEM) without custom code
  • Complete audit trails built from events can serve as evidence toward SOC 2, ISO 27001, and EU AI Act requirements
