Event-Driven AI Architecture
Every request through the Keeptrusts gateway produces a structured decision event. This guide shows how to build event-driven architectures that consume these events for real-time alerting, audit trails, analytics, and compliance workflows.
Use this page when
- You are building real-time alerting, audit trails, or analytics on gateway decision events
- You need to configure webhook subscriptions for blocked or escalated events
- You want to implement event sourcing or CQRS patterns around governance events
- You are replaying events for compliance audit or post-mortem analysis
Primary audience
- Primary: Technical Engineers
- Secondary: AI Agents, Technical Leaders
Gateway Decision Events
The gateway emits a decision event for every evaluated request, regardless of outcome:
{
  "id": "evt_a1b2c3d4",
  "timestamp": "2026-04-23T14:30:00Z",
  "decision": "allowed",
  "model": "gpt-4o",
  "provider": "openai",
  "input_tokens": 1250,
  "output_tokens": 890,
  "policies_evaluated": [
    {
      "name": "content_safety",
      "result": "pass",
      "latency_ms": 12
    },
    {
      "name": "pii_detection",
      "result": "redacted",
      "entities_found": 3,
      "latency_ms": 8
    }
  ],
  "gateway_id": "gw_prod_east",
  "user_id": "user_789",
  "cost_usd": 0.0034
}
Events flow from the gateway to the control-plane API via POST /v1/events. From there, you can consume them through webhooks, the events API, or exports.
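For the events-API path, catch-up reads are typically paginated. A minimal sketch, assuming a cursor-style response shape of `{"events": [...], "next_cursor": ...}` (an assumption, not a documented contract), with `fetch_page` standing in for your HTTP client:

```python
from typing import Callable, Iterator

def iter_events(fetch_page: Callable[[str], dict], since: str) -> Iterator[dict]:
    """Page through the events API starting from a timestamp or cursor.

    fetch_page(cursor) is assumed to return
    {"events": [...], "next_cursor": "<cursor>" | None}.
    """
    cursor = since
    while cursor is not None:
        page = fetch_page(cursor)
        yield from page["events"]
        # None signals the final page
        cursor = page.get("next_cursor")
```

Because the generator yields as it pages, a consumer can process a large backlog without holding every event in memory.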
Webhook Subscriptions
Configure webhooks to receive events in real time:
# Create a webhook subscription for blocked events
curl -X POST https://api.keeptrusts.example/v1/webhooks \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://your-service.example/hooks/keeptrusts",
    "events": ["event.blocked", "event.escalated"],
    "secret": "whsec_your_signing_secret"
  }'
Verifying Webhook Signatures
Always verify the HMAC signature before processing:
// webhook-handler.ts
import { createHmac, timingSafeEqual } from "crypto";

function verifyWebhookSignature(
  payload: string,
  signature: string,
  secret: string
): boolean {
  const expected = createHmac("sha256", secret)
    .update(payload)
    .digest("hex");
  // timingSafeEqual throws if the buffers differ in length,
  // so reject mismatched lengths before comparing
  if (signature.length !== expected.length) {
    return false;
  }
  return timingSafeEqual(
    Buffer.from(signature),
    Buffer.from(expected)
  );
}

export async function handleWebhook(req: Request): Promise<Response> {
  const payload = await req.text();
  const signature = req.headers.get("x-keeptrusts-signature") ?? "";
  if (!verifyWebhookSignature(payload, signature, WEBHOOK_SECRET)) {
    return new Response("Invalid signature", { status: 401 });
  }
  const event = JSON.parse(payload);
  await processEvent(event);
  return new Response("OK", { status: 200 });
}
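Consumers not running on Node can do the same check with Python's standard library (same header and secret as above; `hmac.compare_digest` provides the constant-time comparison):

```python
import hashlib
import hmac

def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Check the x-keeptrusts-signature header against the raw request body."""
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking the match position via timing
    return hmac.compare_digest(signature, expected)
```

As in the TypeScript handler, verify against the raw request bytes before any JSON parsing; re-serialized JSON will not reproduce the signed payload.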
Event Sourcing Pattern
Use decision events as the source of truth for AI usage analytics:
Event Processor Implementation
# event_processor.py
import json
from datetime import datetime
from collections import defaultdict

class GovernanceEventProcessor:
    """Process and route gateway decision events."""

    def __init__(self, event_store, alert_service):
        self.event_store = event_store
        self.alert_service = alert_service
        self.handlers = {
            "allowed": self._handle_allowed,
            "blocked": self._handle_blocked,
            "escalated": self._handle_escalated,
            "redacted": self._handle_redacted,
        }

    async def process(self, event: dict):
        # Persist every event for audit
        await self.event_store.append(event)
        # Route to decision-specific handler
        handler = self.handlers.get(event["decision"])
        if handler:
            await handler(event)

    async def _handle_blocked(self, event: dict):
        """Immediate alert on blocked requests."""
        await self.alert_service.send(
            channel="governance-alerts",
            severity="high",
            message=f"Request blocked: {event['id']} "
                    f"policy={event['policies_evaluated'][0]['name']} "
                    f"user={event.get('user_id', 'unknown')}"
        )

    async def _handle_escalated(self, event: dict):
        """Route escalated decisions to human reviewers."""
        await self.alert_service.send(
            channel="escalation-queue",
            severity="critical",
            message=f"Escalation required: {event['id']}"
        )

    async def _handle_allowed(self, event: dict):
        """Track allowed requests for spend analytics."""
        pass  # Aggregated in analytics pipeline

    async def _handle_redacted(self, event: dict):
        """Log redaction events for compliance reporting."""
        pass  # Stored in event store for audit trail
Async Processing Patterns
Buffered Event Ingestion
For high-throughput gateways, buffer events before processing:
# policy-config.yaml — event batching
gateway:
  port: 41002
  events:
    batch_size: 100
    flush_interval_ms: 5000
    retry_max_attempts: 3
    retry_backoff_ms: 1000
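The batching settings above can be mirrored on the consumer side. A minimal in-process sketch (`EventBuffer` is illustrative, not a Keeptrusts SDK class):

```python
import time
from typing import Callable

class EventBuffer:
    """Accumulate events and flush in batches.

    Mirrors the batch_size / flush_interval_ms settings from the
    gateway batching config.
    """

    def __init__(self, flush_fn: Callable[[list], None],
                 batch_size: int = 100, flush_interval_ms: int = 5000):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.flush_interval = flush_interval_ms / 1000.0
        self.buffer: list = []
        self.last_flush = time.monotonic()

    def add(self, event: dict) -> None:
        self.buffer.append(event)
        # Flush when the batch is full or the interval has elapsed
        if (len(self.buffer) >= self.batch_size
                or time.monotonic() - self.last_flush >= self.flush_interval):
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []
        self.last_flush = time.monotonic()
```

Tune `batch_size` and `flush_interval_ms` together: larger batches cut per-request overhead, while the interval bounds how stale a buffered event can get.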
Dead Letter Queue
Handle event delivery failures with a DLQ pattern:
# dlq_handler.py
async def process_with_dlq(event: dict, processor, dlq, alert_service):
    """Attempt processing with dead-letter fallback.

    TransientError and PermanentError are application-defined exception
    classes raised by the processor.
    """
    try:
        await processor.process(event)
    except TransientError:
        # Recoverable — retry after a delay
        await dlq.enqueue(event, retry_after_seconds=60)
    except PermanentError as e:
        await dlq.enqueue(event, retry_after_seconds=None, error=str(e))
        # Alert on permanent failures — these need manual review
        await alert_service.send(
            channel="event-failures",
            message=f"Permanent event processing failure: {event['id']}"
        )
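Transient failures are usually retried before an event lands in the DLQ. A minimal retry helper mirroring the `retry_max_attempts` and `retry_backoff_ms` settings from the batching config (linear backoff is used here for brevity; exponential backoff is a common alternative):

```python
import asyncio

async def retry_async(fn, max_attempts: int = 3, backoff_ms: int = 1000):
    """Retry a zero-argument coroutine function with linear backoff.

    Re-raises the last exception once max_attempts is exhausted, at
    which point the caller can route the event to the DLQ.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts:
                raise
            # Attempt-scaled delay before the next try
            await asyncio.sleep(backoff_ms * attempt / 1000)
```

Keep retries bounded: an unbounded retry loop on a poisoned event blocks the stream behind it, which is exactly what the DLQ exists to prevent.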
Event Replay for Audit
Replay historical events to reconstruct what happened during an incident:
# Export events for a specific time range
curl -X POST https://api.keeptrusts.example/v1/exports \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "type": "events",
    "format": "jsonl",
    "filter": {
      "start_time": "2026-04-22T00:00:00Z",
      "end_time": "2026-04-23T00:00:00Z",
      "decision": "blocked",
      "gateway_id": "gw_prod_east"
    }
  }'
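The export arrives as JSONL, one decision event per line. A minimal reader whose output can be fed into the replay code below:

```python
import json
from typing import Iterable

def read_jsonl(lines: Iterable[str]) -> list[dict]:
    """Parse a JSONL export — one decision event per non-empty line."""
    return [json.loads(line) for line in lines if line.strip()]
```

For large exports, swap the list comprehension for a generator so replay can stream events without loading the whole file.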
Reconstructing an Audit Trail
# audit_replay.py
async def reconstruct_audit_trail(events: list[dict]) -> dict:
    """Rebuild the timeline of governance decisions."""
    timeline = []
    for event in sorted(events, key=lambda e: e["timestamp"]):
        timeline.append({
            "time": event["timestamp"],
            "decision": event["decision"],
            "model": event["model"],
            "user": event.get("user_id"),
            "policies": [p["name"] for p in event["policies_evaluated"]],
            "cost": event.get("cost_usd", 0),
        })
    if not timeline:
        # Guard against an empty export window
        return {"period": None, "total_events": 0,
                "blocked_count": 0, "total_cost": 0, "timeline": []}
    return {
        "period": {
            "start": timeline[0]["time"],
            "end": timeline[-1]["time"],
        },
        "total_events": len(timeline),
        "blocked_count": sum(1 for e in timeline if e["decision"] == "blocked"),
        "total_cost": sum(e["cost"] for e in timeline),
        "timeline": timeline,
    }
Real-Time Dashboards
Stream events to monitoring tools using webhook forwarding:
# Webhook → metrics bridge
# Forward governance events to your observability stack
webhooks:
  - url: https://metrics.internal/keeptrusts/ingest
    events: ["event.allowed", "event.blocked", "event.redacted"]
    headers:
      Authorization: "Bearer ${METRICS_TOKEN}"
Useful metrics to derive from events:
| Metric | Source Field | Aggregation |
|---|---|---|
| Block rate | decision == "blocked" | count / total per minute |
| Policy latency p99 | policies_evaluated[].latency_ms | percentile per policy |
| Token spend | input_tokens + output_tokens | sum per hour per team |
| Redaction frequency | policies_evaluated[].entities_found | count per entity type |
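Two of these aggregations sketched in Python (field names come from the event schema above; `statistics.quantiles` stands in for whatever percentile function your metrics stack provides):

```python
from statistics import quantiles

def block_rate(events: list[dict]) -> float:
    """Fraction of decisions that were blocked."""
    if not events:
        return 0.0
    blocked = sum(1 for e in events if e["decision"] == "blocked")
    return blocked / len(events)

def policy_latency_p99(events: list[dict], policy: str) -> float:
    """p99 of latency_ms across all evaluations of one policy."""
    samples = [p["latency_ms"]
               for e in events
               for p in e.get("policies_evaluated", [])
               if p["name"] == policy]
    if len(samples) < 2:
        return float(samples[0]) if samples else 0.0
    # quantiles(n=100) returns 99 cut points; the last is the p99
    return quantiles(samples, n=100)[-1]
```

In production these run over windowed streams (per minute, per hour) rather than a full event list, but the per-window arithmetic is the same.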
Key Takeaways
- Every gateway request generates a structured decision event — treat these as your AI audit log
- Use webhook subscriptions to trigger real-time alerts on blocked or escalated events
- Buffer events for high-throughput scenarios with batch size and flush interval tuning
- Event replay via the exports API enables post-incident audit reconstruction
- Derive operational metrics (block rate, spend, latency) from event streams
For AI systems
- Canonical terms: decision events, POST /v1/events, POST /v1/webhooks, x-keeptrusts-signature, HMAC-SHA256, event.blocked, event.escalated, event sourcing, event replay, webhook subscription
- Key event fields: id, timestamp, decision, model, provider, input_tokens, output_tokens, policies_evaluated, cost_usd
- Best next pages: Database Patterns for AI-Augmented Systems, Observability Patterns, Incident Response
For engineers
- Subscribe to real-time events: POST /v1/webhooks with events: ["event.blocked", "event.escalated"]
- Always verify the webhook HMAC signature from x-keeptrusts-signature before processing — use timingSafeEqual
- Events are emitted asynchronously from the gateway — event ordering is not guaranteed across gateway instances
- Use GET /v1/events?since=<timestamp> for catch-up reads if a webhook consumer was offline
- Event replay: export and re-ingest for audit or analytics reprocessing
For leaders
- Event-driven architecture enables real-time compliance monitoring without polling — alerts fire within seconds of policy violations
- Webhook integrations connect governance events to existing alerting tools (PagerDuty, Slack, SIEM) without custom code
- Complete audit trails built from events satisfy SOC 2, ISO 27001, and EU AI Act evidence requirements
Next steps
- Database Patterns for AI-Augmented Systems — store and query event data
- Observability for AI-Governed Systems — correlate events with traces and metrics
- Incident Response for AI System Failures — use events for incident detection
- Security Engineering for AI Pipelines — secure webhook endpoints