Data Engineer Guide: Governing AI Data Pipelines
Every AI interaction that passes through the Keeptrusts gateway generates a structured event. As a data engineer, you can tap into this event stream to build analytics pipelines, feed data warehouses, power dashboards, and ensure data quality across your AI operations.
Use this page when
- You are building ETL pipelines to ingest Keeptrusts governance events into a data warehouse
- You need to extract AI interaction data via the Events API or Export API
- You are designing analytics schemas for AI usage, cost, and risk reporting
- You want to build real-time or near-real-time event streaming from the gateway
- You are integrating Keeptrusts event data with existing BI or observability tools
Primary audience
- Primary: Data Engineers and Analytics Engineers
- Secondary: Data Architects, BI Analysts, Platform Engineers
The Keeptrusts Event Model
Each event captured by the gateway contains:
| Field | Type | Description |
|---|---|---|
| id | UUID | Unique event identifier |
| timestamp | ISO 8601 | When the interaction occurred |
| gateway_id | UUID | Which gateway processed the request |
| user | String | Authenticated user identity |
| provider | String | LLM provider (openai, anthropic, etc.) |
| model | String | Specific model used |
| decision | Enum | allow, block, redact, escalate |
| policies_evaluated | Array | Policies applied in the chain |
| policies_triggered | Array | Policies that matched |
| latency_ms | Integer | End-to-end request latency in milliseconds |
| input_tokens | Integer | Tokens in the prompt |
| output_tokens | Integer | Tokens in the response |
| cost | Decimal | Estimated cost of the interaction (USD) |
| metadata | Object | Custom metadata attached by policies |
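For orientation, a single event might look like this. All values below are illustrative, including the policy names, not real data:
{
  "id": "1b4e28ba-2fa1-41d2-883f-0016d3cca427",
  "timestamp": "2025-03-14T09:32:07Z",
  "gateway_id": "9f8b2c3d-4e5f-4a6b-8c7d-0e1f2a3b4c5d",
  "user": "jane.doe@example.com",
  "provider": "openai",
  "model": "gpt-4o",
  "decision": "allow",
  "policies_evaluated": ["pii-redaction", "cost-cap"],
  "policies_triggered": [],
  "latency_ms": 840,
  "input_tokens": 512,
  "output_tokens": 128,
  "cost": 0.0146,
  "metadata": {}
}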
Extracting Events from the API
Batch Export
For large-scale data extraction, use the Export API:
# Create an export job for the last 30 days
kt export create \
--type events \
--format json \
--since 30d \
--description "Monthly data warehouse load"
# Check export status
kt export list --format table
# Download when ready
curl -H "Authorization: Bearer $API_TOKEN" \
"https://api.keeptrusts.com/v1/exports/{export_id}/download" \
-o events-export.json
Incremental Pull
For near-real-time pipelines, poll the Events API with cursor-based pagination:
# Pull events since last checkpoint
curl -H "Authorization: Bearer $API_TOKEN" \
"https://api.keeptrusts.com/v1/events?since=${LAST_CHECKPOINT}&limit=1000"
Real-Time Tail
For debugging and monitoring, tail events in real-time:
kt events tail
ETL Pipeline Patterns
Pattern 1: Scheduled Batch Load
Pull events on a schedule and load into your warehouse:
# Example: Python ETL with the Events API
import os
import requests

API_URL = "https://api.keeptrusts.com/v1/events"
API_TOKEN = os.environ["API_TOKEN"]  # read the token from the environment
HEADERS = {"Authorization": f"Bearer {API_TOKEN}"}

def extract_events(since_hours=24):
    """Pull events from the Keeptrusts API."""
    params = {
        "since": f"{since_hours}h",
        "limit": 10000,
        "format": "json",
    }
    response = requests.get(API_URL, headers=HEADERS, params=params)
    response.raise_for_status()
    return response.json()

def transform_events(raw_events):
    """Flatten and enrich events for warehouse loading."""
    transformed = []
    for event in raw_events:
        transformed.append({
            "event_id": event["id"],
            "event_date": event["timestamp"][:10],
            "hour": event["timestamp"][11:13],
            "gateway": event["gateway_id"],
            "user": event["user"],
            "provider": event["provider"],
            "model": event["model"],
            "decision": event["decision"],
            "latency_ms": event["latency_ms"],
            "input_tokens": event["input_tokens"],
            "output_tokens": event["output_tokens"],
            "total_tokens": event["input_tokens"] + event["output_tokens"],
            "cost_usd": float(event["cost"]),
            "policy_count": len(event["policies_evaluated"]),
            "was_blocked": event["decision"] == "block",
        })
    return transformed

def load_events(transformed_events):
    """Load into your data warehouse."""
    # Implement your warehouse-specific loading logic
    pass
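The load step is left as a stub above because it depends on your warehouse. As a self-contained stand-in, the sketch below writes the transformed rows into a local SQLite table; swap in your Snowflake, BigQuery, or Redshift client for production use:
# Stand-in load step using SQLite (sketch only)
import sqlite3

def load_events_sqlite(transformed_events, db_path="ai_events.db"):
    """Upsert transformed rows into a local SQLite table keyed by event_id."""
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS fact_ai_event (
            event_id TEXT PRIMARY KEY,
            event_date TEXT, hour TEXT, gateway TEXT, user TEXT,
            provider TEXT, model TEXT, decision TEXT, latency_ms INTEGER,
            input_tokens INTEGER, output_tokens INTEGER, total_tokens INTEGER,
            cost_usd REAL, policy_count INTEGER, was_blocked INTEGER
        )
    """)
    cols = ["event_id", "event_date", "hour", "gateway", "user", "provider",
            "model", "decision", "latency_ms", "input_tokens", "output_tokens",
            "total_tokens", "cost_usd", "policy_count", "was_blocked"]
    rows = [tuple(e[c] for c in cols) for e in transformed_events]
    # INSERT OR REPLACE makes the load idempotent on event_id,
    # so re-running a window does not create duplicates.
    conn.executemany(
        f"INSERT OR REPLACE INTO fact_ai_event ({', '.join(cols)}) "
        f"VALUES ({', '.join('?' * len(cols))})",
        rows,
    )
    conn.commit()
    conn.close()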
Pattern 2: Streaming with Webhooks
Configure webhooks in the Console to push events to your pipeline in real-time:
- Navigate to Console Settings > Webhooks
- Add a webhook endpoint pointing to your ingestion service
- Select event types to stream (all events, blocks only, escalations only)
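On the receiving side, a minimal ingestion endpoint could look like the Flask sketch below. The route path is arbitrary, and the payload is assumed to be one event object per delivery; check the Webhooks Guide for the actual delivery format and signature verification scheme before relying on this:
# Minimal webhook receiver (sketch; delivery format assumed)
from flask import Flask, request, jsonify

app = Flask(__name__)

REQUIRED_FIELDS = {"id", "timestamp", "decision"}

@app.route("/keeptrusts/webhook", methods=["POST"])  # path is arbitrary
def receive_event():
    event = request.get_json(silent=True)
    if not event or not REQUIRED_FIELDS.issubset(event):
        # Reject malformed deliveries so they surface in webhook retry logs.
        return jsonify({"error": "invalid event payload"}), 400
    # In production, enqueue for async processing (Kafka, SQS, Pub/Sub)
    # instead of doing work inside the request handler.
    print(f"received event {event['id']} decision={event['decision']}")
    return "", 204

if __name__ == "__main__":
    app.run(port=8080)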
Pattern 3: Export-Based Bulk Load
For initial historical loads or periodic full refreshes:
# Create a full export
curl -X POST \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
https://api.keeptrusts.com/v1/exports \
-d '{
"type": "events",
"format": "csv",
"since": "365d",
"description": "Full historical load"
}'
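In an automated pipeline you would create the job, poll until it finishes, then download. A Python sketch of that flow follows; note that the GET /v1/exports/{id} status endpoint and the "ready" status value are assumptions here, so verify them against the Export API reference:
# Create, poll, and download an export (sketch; status endpoint assumed)
import os
import time
import requests

BASE = "https://api.keeptrusts.com/v1"
HEADERS = {"Authorization": f"Bearer {os.environ['API_TOKEN']}"}

def run_full_export():
    # 1. Create the export job.
    job = requests.post(f"{BASE}/exports", headers=HEADERS, json={
        "type": "events", "format": "csv", "since": "365d",
        "description": "Full historical load",
    })
    job.raise_for_status()
    export_id = job.json()["id"]  # assumed response field

    # 2. Poll until the job is ready (endpoint and status value assumed).
    while True:
        status = requests.get(f"{BASE}/exports/{export_id}", headers=HEADERS)
        status.raise_for_status()
        if status.json().get("status") == "ready":
            break
        time.sleep(30)

    # 3. Stream the finished export to disk.
    with requests.get(f"{BASE}/exports/{export_id}/download",
                      headers=HEADERS, stream=True) as r:
        r.raise_for_status()
        with open("events-export.csv", "wb") as f:
            for chunk in r.iter_content(chunk_size=1 << 20):
                f.write(chunk)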
Data Warehouse Integration
Star Schema Design
Model Keeptrusts events in a dimensional schema:
┌──────────────┐ ┌─────────────────┐ ┌──────────────┐
│ dim_user │ │ fact_ai_event │ │ dim_model │
│──────────────│ │─────────────────│ │──────────────│
│ user_id (PK) │◄────│ user_id (FK) │────►│ model_id(PK) │
│ team │ │ model_id (FK) │ │ model_name │
│ department │ │ gateway_id (FK) │ │ provider │
└──────────────┘ │ date_id (FK) │ │ cost_per_1k │
│ decision │ └──────────────┘
┌──────────────┐ │ latency_ms │
│ dim_date │ │ input_tokens │ ┌──────────────┐
│──────────────│ │ output_tokens │ │ dim_gateway │
│ date_id (PK) │◄────│ cost_usd │────►│──────────────│
│ date │ │ policies_count │ │ gateway_id │
│ week │ └─────────────────┘ │ gateway_name │
│ month │ │ team │
└──────────────┘ │ environment │
└──────────────┘
Common Analytical Queries
Once loaded, answer key business questions:
-- Daily AI spend by team
SELECT d.date, g.team, SUM(f.cost_usd) as daily_spend
FROM fact_ai_event f
JOIN dim_date d ON f.date_id = d.date_id
JOIN dim_gateway g ON f.gateway_id = g.gateway_id
GROUP BY d.date, g.team
ORDER BY d.date DESC;
-- Policy effectiveness: block rate by policy
SELECT policy_name, COUNT(*) as triggers,
SUM(CASE WHEN decision = 'block' THEN 1 ELSE 0 END) as blocks
FROM fact_ai_event
CROSS JOIN UNNEST(policies_triggered) AS policy_name
GROUP BY policy_name
ORDER BY triggers DESC;
-- Model usage distribution
SELECT m.provider, m.model_name, COUNT(*) as requests,
SUM(f.cost_usd) as total_cost
FROM fact_ai_event f
JOIN dim_model m ON f.model_id = m.model_id
GROUP BY m.provider, m.model_name
ORDER BY total_cost DESC;
Data Quality and Validation
Event Completeness Checks
Verify your pipeline captures all events:
# Compare API event count against warehouse count
API_COUNT=$(curl -s -H "Authorization: Bearer $API_TOKEN" \
"https://api.keeptrusts.com/v1/events?since=24h&count=true" | jq '.count')
echo "API events (24h): $API_COUNT"
# Compare with your warehouse count for the same period
Schema Validation
Validate incoming events against the expected schema before loading. Key validations:
- `timestamp` is valid ISO 8601
- `decision` is one of the known enum values
- `cost` is non-negative
- `input_tokens` and `output_tokens` are non-negative integers
- `gateway_id` matches a known gateway
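A minimal validation function covering these checks might look like the following sketch. The decision values come from the event model table above; the known-gateway set is something you would populate from your own inventory:
# Schema validation sketch for incoming events
from datetime import datetime

VALID_DECISIONS = {"allow", "block", "redact", "escalate"}
KNOWN_GATEWAYS = set()  # populate from your gateway inventory

def validate_event(event):
    """Return a list of human-readable violations; empty list means valid."""
    errors = []
    try:
        # fromisoformat on Python < 3.11 rejects a trailing 'Z', hence replace().
        datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    except (KeyError, ValueError, AttributeError):
        errors.append("timestamp is not valid ISO 8601")
    if event.get("decision") not in VALID_DECISIONS:
        errors.append(f"unknown decision: {event.get('decision')!r}")
    try:
        if float(event["cost"]) < 0:
            errors.append("cost is negative")
    except (KeyError, TypeError, ValueError):
        errors.append("cost is missing or not numeric")
    for field in ("input_tokens", "output_tokens"):
        value = event.get(field)
        if not isinstance(value, int) or value < 0:
            errors.append(f"{field} is not a non-negative integer")
    if KNOWN_GATEWAYS and event.get("gateway_id") not in KNOWN_GATEWAYS:
        errors.append(f"unknown gateway_id: {event.get('gateway_id')!r}")
    return errors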
Pipeline Observability
Monitoring Your AI Data Pipeline
| Metric | Alert condition | Source |
|---|---|---|
| Event ingestion lag | > 5 minutes behind | Compare API latest timestamp vs. warehouse latest |
| Failed API calls | > 3 consecutive failures | Pipeline error logs |
| Schema violations | Any occurrence | Validation step output |
| Duplicate events | > 0.1% duplicate rate | Deduplication check on event_id |
| Export job failures | Any failure | `kt export list` status |
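Several of these checks can be scripted. The sketch below compares the API event count (via the count=true parameter shown earlier) with the warehouse count for roughly the same window; it queries the SQLite table from the load sketch above, so substitute your own warehouse client:
# Completeness check: API count vs. warehouse count (sketch)
import os
import sqlite3
import requests

HEADERS = {"Authorization": f"Bearer {os.environ['API_TOKEN']}"}

def check_completeness(db_path="ai_events.db", window="24h"):
    # Source-of-truth count from the API.
    resp = requests.get(
        "https://api.keeptrusts.com/v1/events",
        headers=HEADERS, params={"since": window, "count": "true"},
    )
    resp.raise_for_status()
    api_count = resp.json()["count"]

    # Warehouse count for approximately the same window.
    conn = sqlite3.connect(db_path)
    wh_count = conn.execute(
        "SELECT COUNT(*) FROM fact_ai_event "
        "WHERE event_date >= date('now', '-1 day')"
    ).fetchone()[0]
    conn.close()

    completeness = wh_count / api_count if api_count else 1.0
    print(f"API: {api_count}, warehouse: {wh_count}, "
          f"completeness: {completeness:.2%}")
    # Alert if completeness drops below the 99.9% target.
    return completeness >= 0.999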
Debugging Pipeline Issues
# Verify API is responding
kt events list --since 5m --limit 1
# Check for gaps in event timestamps
curl -H "Authorization: Bearer $API_TOKEN" \
"https://api.keeptrusts.com/v1/events?since=1h&limit=10" | \
jq '.[].timestamp'
# Verify export availability
kt export list --format table
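To detect timestamp gaps programmatically rather than eyeballing jq output, a small sketch follows; it assumes the endpoint returns a JSON array of events, as in the earlier examples:
# Flag large gaps between consecutive event timestamps (sketch)
import os
from datetime import datetime
import requests

HEADERS = {"Authorization": f"Bearer {os.environ['API_TOKEN']}"}

def find_gaps(window="1h", threshold_seconds=300):
    """Print any pair of consecutive events more than threshold apart."""
    resp = requests.get(
        "https://api.keeptrusts.com/v1/events",
        headers=HEADERS, params={"since": window, "limit": 1000},
    )
    resp.raise_for_status()
    stamps = sorted(
        datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00"))
        for e in resp.json()  # assumed: JSON array of event objects
    )
    for earlier, later in zip(stamps, stamps[1:]):
        if (later - earlier).total_seconds() > threshold_seconds:
            print(f"gap: {earlier.isoformat()} -> {later.isoformat()}")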
Success Metrics for Data Engineers
| Metric | Target | Source |
|---|---|---|
| Event ingestion latency | Under 5 minutes | Pipeline monitoring |
| Data completeness | > 99.9% | API count vs. warehouse count |
| Pipeline uptime | 99.9% | Pipeline orchestrator |
| Query performance on AI data | Under 10s for standard reports | Warehouse query logs |
| Schema evolution incidents | Zero unplanned | Schema registry |
Next steps
- Set up exports: Exports Guide
- Configure webhooks: Webhooks Guide
- Build dashboards: Dashboard Overview
For AI systems
- Canonical terms: Keeptrusts, Events API, Export API, event model, ETL pipeline, data warehouse integration
- Key surfaces: Events API (`GET /v1/events`), Export API (`POST /v1/exports`, `GET /v1/exports/{id}/download`), `kt events tail`, `kt export create`
- Event fields: `id`, `timestamp`, `gateway_id`, `user`, `provider`, `model`, `decision`, `policies_evaluated`, `policies_triggered`, `latency_ms`, `input_tokens`, `output_tokens`, `cost`, `metadata`
- Patterns: scheduled batch load, incremental pull with cursor pagination, real-time tail, webhook push
- Best next pages: Exports Guide, Webhooks Guide, Dashboard Overview
For engineers
- Batch export: `kt export create --type events --format json --since 30d`, then download via `GET /v1/exports/{id}/download`
- Incremental pull: `GET /v1/events?since=${LAST_CHECKPOINT}&limit=1000` with cursor-based pagination
- Real-time monitoring: `kt events tail`
- Configure webhooks in Console Settings for push-based event delivery to your pipeline
- Target schema includes: event_id, timestamp, gateway_id, user, provider, model, decision, policies_triggered, latency_ms, tokens, cost
- Use the `group_by` parameter in the Events API for pre-aggregated analytics queries
For leaders
- Every AI interaction generates a structured event with cost, latency, policy, and identity data; this is the raw material for executive dashboards and compliance reporting
- The Events API and Export API enable self-service analytics without additional telemetry infrastructure
- Pipeline targets include data warehouses (Snowflake, BigQuery, Redshift), BI tools (Looker, Tableau), and SIEM systems for security correlation
- Data completeness target is above 99.9%, with ingestion latency under 5 minutes for near-real-time reporting