Data Engineer Guide: Governing AI Data Pipelines

Every AI interaction that passes through the Keeptrusts gateway generates a structured event. As a data engineer, you can tap into this event stream to build analytics pipelines, feed data warehouses, power dashboards, and ensure data quality across your AI operations.

Use this page when

  • You are building ETL pipelines to ingest Keeptrusts governance events into a data warehouse
  • You need to extract AI interaction data via the Events API or Export API
  • You are designing analytics schemas for AI usage, cost, and risk reporting
  • You want to build real-time or near-real-time event streaming from the gateway
  • You are integrating Keeptrusts event data with existing BI or observability tools

Primary audience

  • Primary: Technical Engineers (Data Engineers, Analytics Engineers)
  • Secondary: Data Architects, BI Analysts, Platform Engineers

The Keeptrusts Event Model

Each event captured by the gateway contains:

Field                Type       Description
id                   UUID       Unique event identifier
timestamp            ISO 8601   When the interaction occurred
gateway_id           UUID       Which gateway processed the request
user                 String     Authenticated user identity
provider             String     LLM provider (openai, anthropic, etc.)
model                String     Specific model used
decision             Enum       allow, block, redact, escalate
policies_evaluated   Array      Policies applied in the chain
policies_triggered   Array      Policies that matched
latency_ms           Integer    End-to-end request latency
input_tokens         Integer    Tokens in the prompt
output_tokens        Integer    Tokens in the response
cost                 Decimal    Estimated cost of the interaction
metadata             Object     Custom metadata attached by policies
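
For orientation, a single event might look like the following (an illustrative, fabricated example; values and policy names are placeholders that show the shape, not real output):

{
  "id": "8a1f4c2e-0b3d-4e5f-9a6b-7c8d9e0f1a2b",
  "timestamp": "2024-05-14T09:32:11Z",
  "gateway_id": "1b2c3d4e-5f60-7a8b-9c0d-1e2f3a4b5c6d",
  "user": "jane.doe@example.com",
  "provider": "openai",
  "model": "gpt-4o",
  "decision": "redact",
  "policies_evaluated": ["pii-detection", "cost-guard"],
  "policies_triggered": ["pii-detection"],
  "latency_ms": 842,
  "input_tokens": 512,
  "output_tokens": 256,
  "cost": 0.0123,
  "metadata": {"team": "growth"}
}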

Extracting Events from the API

Batch Export

For large-scale data extraction, use the Export API:

# Create an export job for the last 30 days
kt export create \
  --type events \
  --format json \
  --since 30d \
  --description "Monthly data warehouse load"

# Check export status
kt export list --format table

# Download when ready
curl -H "Authorization: Bearer $API_TOKEN" \
  "https://api.keeptrusts.com/v1/exports/{export_id}/download" \
  -o events-export.json

Incremental Pull

For near-real-time pipelines, poll the Events API with cursor-based pagination:

# Pull events since last checkpoint
curl -H "Authorization: Bearer $API_TOKEN" \
  "https://api.keeptrusts.com/v1/events?since=${LAST_CHECKPOINT}&limit=1000"

Real-Time Tail

For debugging and monitoring, tail events in real time:

kt events tail

ETL Pipeline Patterns

Pattern 1: Scheduled Batch Load

Pull events on a schedule and load them into your warehouse:

# Example: Python ETL with the Events API
import os
import requests

API_URL = "https://api.keeptrusts.com/v1/events"
API_TOKEN = os.environ["KEEPTRUSTS_API_TOKEN"]  # or however you manage secrets
HEADERS = {"Authorization": f"Bearer {API_TOKEN}"}

def extract_events(since_hours=24):
    """Pull events from the Keeptrusts API."""
    params = {
        "since": f"{since_hours}h",
        "limit": 10000,
        "format": "json",
    }
    response = requests.get(API_URL, headers=HEADERS, params=params)
    response.raise_for_status()
    return response.json()

def transform_events(raw_events):
    """Flatten and enrich events for warehouse loading."""
    transformed = []
    for event in raw_events:
        transformed.append({
            "event_id": event["id"],
            "event_date": event["timestamp"][:10],
            "hour": event["timestamp"][11:13],
            "gateway": event["gateway_id"],
            "user": event["user"],
            "provider": event["provider"],
            "model": event["model"],
            "decision": event["decision"],
            "latency_ms": event["latency_ms"],
            "input_tokens": event["input_tokens"],
            "output_tokens": event["output_tokens"],
            "total_tokens": event["input_tokens"] + event["output_tokens"],
            "cost_usd": float(event["cost"]),
            "policy_count": len(event["policies_evaluated"]),
            "was_blocked": event["decision"] == "block",
        })
    return transformed

def load_events(transformed_events):
    """Load into your data warehouse."""
    # Implement your warehouse-specific loading logic
    pass
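
Wired together under your scheduler of choice (cron, Airflow, and similar all work), a daily run reduces to:

if __name__ == "__main__":
    load_events(transform_events(extract_events(since_hours=24)))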

Pattern 2: Streaming with Webhooks

Configure webhooks in the Console to push events to your pipeline in real time (a minimal receiver sketch follows the steps):

  1. Navigate to Console Settings > Webhooks
  2. Add a webhook endpoint pointing to your ingestion service
  3. Select event types to stream (all events, blocks only, escalations only)
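
On the receiving side, any HTTPS endpoint that accepts a JSON POST will do. Here is a minimal sketch using Python's standard library (it assumes one event per delivery; batching, retries, and signature verification are deliberately omitted):

import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class EventReceiver(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        event = json.loads(self.rfile.read(length))
        # Hand off to your ingestion queue here; printing is a placeholder
        print(event["id"], event["decision"])
        self.send_response(204)
        self.end_headers()

HTTPServer(("0.0.0.0", 8080), EventReceiver).serve_forever()

In production, put the receiver behind TLS and verify the webhook's authenticity before trusting the payload.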

Pattern 3: Export-Based Bulk Load

For initial historical loads or periodic full refreshes:

# Create a full export
curl -X POST \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  https://api.keeptrusts.com/v1/exports \
  -d '{
    "type": "events",
    "format": "csv",
    "since": "365d",
    "description": "Full historical load"
  }'
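
Export jobs run asynchronously, so a pipeline needs to wait for completion before downloading. A polling sketch (it assumes GET /v1/exports/{id} returns a JSON body with a status field whose terminal value is "completed"; only the download endpoint is shown in the examples above):

import os
import time
import requests

BASE = "https://api.keeptrusts.com/v1/exports"
HEADERS = {"Authorization": f"Bearer {os.environ['KEEPTRUSTS_API_TOKEN']}"}

def download_export(export_id, dest="events-export.csv"):
    """Poll an export job until it finishes, then download the file."""
    while True:
        job = requests.get(f"{BASE}/{export_id}", headers=HEADERS)
        job.raise_for_status()
        if job.json().get("status") == "completed":  # assumed field and value
            break
        time.sleep(30)
    data = requests.get(f"{BASE}/{export_id}/download", headers=HEADERS)
    data.raise_for_status()
    with open(dest, "wb") as f:
        f.write(data.content)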

Data Warehouse Integration

Star Schema Design

Model Keeptrusts events in a dimensional schema:

┌──────────────┐      ┌─────────────────┐      ┌──────────────┐
│   dim_user   │      │  fact_ai_event  │      │  dim_model   │
│──────────────│      │─────────────────│      │──────────────│
│ user_id (PK) │◄─────│ user_id (FK)    │─────►│ model_id(PK) │
│ team         │      │ model_id (FK)   │      │ model_name   │
│ department   │      │ gateway_id (FK) │      │ provider     │
└──────────────┘      │ date_id (FK)    │      │ cost_per_1k  │
                      │ decision        │      └──────────────┘
┌──────────────┐      │ latency_ms      │
│   dim_date   │      │ input_tokens    │      ┌──────────────┐
│──────────────│      │ output_tokens   │      │ dim_gateway  │
│ date_id (PK) │◄─────│ cost_usd        │─────►│──────────────│
│ date         │      │ policies_count  │      │ gateway_id   │
│ week         │      └─────────────────┘      │ gateway_name │
│ month        │                               │ team         │
└──────────────┘                               │ environment  │
                                               └──────────────┘
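
Translated to DDL, the central fact table looks roughly like this (a generic ANSI SQL sketch; adjust types, keys, and partitioning or distribution clauses for your warehouse):

-- Central fact table; the dimension tables follow the same pattern
CREATE TABLE fact_ai_event (
    event_id        VARCHAR(36)    PRIMARY KEY,  -- Keeptrusts event UUID
    user_id         VARCHAR(64)    REFERENCES dim_user (user_id),
    model_id        VARCHAR(64)    REFERENCES dim_model (model_id),
    gateway_id      VARCHAR(36)    REFERENCES dim_gateway (gateway_id),
    date_id         INTEGER        REFERENCES dim_date (date_id),
    decision        VARCHAR(16),                 -- allow, block, redact, escalate
    latency_ms      INTEGER,
    input_tokens    INTEGER,
    output_tokens   INTEGER,
    cost_usd        DECIMAL(12, 6),
    policies_count  INTEGER
);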

Common Analytical Queries

Once the events are loaded, you can answer key business questions:

-- Daily AI spend by team
SELECT d.date, g.team, SUM(f.cost_usd) AS daily_spend
FROM fact_ai_event f
JOIN dim_date d ON f.date_id = d.date_id
JOIN dim_gateway g ON f.gateway_id = g.gateway_id
GROUP BY d.date, g.team
ORDER BY d.date DESC;

-- Policy effectiveness: block rate by policy
-- (assumes the policies_triggered array is retained on the fact table)
SELECT policy_name, COUNT(*) AS triggers,
       SUM(CASE WHEN decision = 'block' THEN 1 ELSE 0 END) AS blocks
FROM fact_ai_event
CROSS JOIN UNNEST(policies_triggered) AS policy_name
GROUP BY policy_name
ORDER BY triggers DESC;

-- Model usage distribution
SELECT m.provider, m.model_name, COUNT(*) AS requests,
       SUM(f.cost_usd) AS total_cost
FROM fact_ai_event f
JOIN dim_model m ON f.model_id = m.model_id
GROUP BY m.provider, m.model_name
ORDER BY total_cost DESC;

Data Quality and Validation

Event Completeness Checks

Verify your pipeline captures all events:

# Compare API event count against warehouse count
API_COUNT=$(curl -s -H "Authorization: Bearer $API_TOKEN" \
  "https://api.keeptrusts.com/v1/events?since=24h&count=true" | jq '.count')

echo "API events (24h): $API_COUNT"
# Compare with your warehouse count for the same period

Schema Validation

Validate incoming events against the expected schema before loading. Key validations (a sketch implementing them follows the list):

  • timestamp is valid ISO 8601
  • decision is one of the known enum values
  • cost is non-negative
  • input_tokens and output_tokens are non-negative integers
  • gateway_id matches a known gateway
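
A minimal validation pass covering these checks might look like this (a sketch; KNOWN_GATEWAYS is a placeholder for however you track your gateway inventory):

from datetime import datetime

VALID_DECISIONS = {"allow", "block", "redact", "escalate"}
KNOWN_GATEWAYS = set()  # placeholder: load from your gateway inventory

def validate_event(event):
    """Return a list of violations; an empty list means the event is clean."""
    violations = []
    try:
        datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    except (KeyError, ValueError):
        violations.append("timestamp is not valid ISO 8601")
    if event.get("decision") not in VALID_DECISIONS:
        violations.append("unknown decision value")
    if float(event.get("cost", 0)) < 0:
        violations.append("negative cost")
    for field in ("input_tokens", "output_tokens"):
        value = event.get(field)
        if not isinstance(value, int) or value < 0:
            violations.append(f"{field} is not a non-negative integer")
    if KNOWN_GATEWAYS and event.get("gateway_id") not in KNOWN_GATEWAYS:
        violations.append("unknown gateway_id")
    return violations

Quarantine events with violations rather than dropping them silently, so your completeness checks still reconcile.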

Pipeline Observability

Monitoring Your AI Data Pipeline

Metric                Alert condition            Source
Event ingestion lag   > 5 minutes behind         Compare API latest timestamp vs. warehouse latest
Failed API calls      > 3 consecutive failures   Pipeline error logs
Schema violations     Any occurrence             Validation step output
Duplicate events      > 0.1% duplicate rate      Deduplication check on event_id
Export job failures   Any failure                kt export list status
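
The first row in the table, ingestion lag, can be computed by comparing the newest event timestamp on each side. A sketch (the warehouse-side timestamp is left to your own connector, and newest-first ordering of the Events API response is an assumption):

import os
from datetime import datetime
import requests

HEADERS = {"Authorization": f"Bearer {os.environ['KEEPTRUSTS_API_TOKEN']}"}

def ingestion_lag_seconds(warehouse_latest: datetime) -> float:
    """Seconds between the newest API event and the newest warehouse event.
    Both timestamps must be timezone-aware."""
    resp = requests.get(
        "https://api.keeptrusts.com/v1/events",
        headers=HEADERS,
        params={"since": "1h", "limit": 1},
    )
    resp.raise_for_status()
    latest = resp.json()[0]["timestamp"]  # assumes newest-first ordering
    api_latest = datetime.fromisoformat(latest.replace("Z", "+00:00"))
    return (api_latest - warehouse_latest).total_seconds()

If the result exceeds 300 seconds, alert per the condition in the table above.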

Debugging Pipeline Issues

# Verify API is responding
kt events list --since 5m --limit 1

# Check for gaps in event timestamps
curl -H "Authorization: Bearer $API_TOKEN" \
  "https://api.keeptrusts.com/v1/events?since=1h&limit=10" | \
  jq '.[].timestamp'

# Verify export availability
kt export list --format table

Success Metrics for Data Engineers

Metric                         Target                           Source
Event ingestion latency        Under 5 minutes                  Pipeline monitoring
Data completeness              > 99.9%                          API count vs. warehouse count
Pipeline uptime                99.9%                            Pipeline orchestrator
Query performance on AI data   Under 10s for standard reports   Warehouse query logs
Schema evolution incidents     Zero unplanned                   Schema registry

Next steps

For AI systems

  • Canonical terms: Keeptrusts, Events API, Export API, event model, ETL pipeline, data warehouse integration
  • Key surfaces: Events API (GET /v1/events), Export API (POST /v1/exports, GET /v1/exports/{id}/download), kt events tail, kt export create
  • Event fields: id, timestamp, gateway_id, user, provider, model, decision, policies_evaluated, policies_triggered, latency_ms, input_tokens, output_tokens, cost, metadata
  • Patterns: scheduled batch load, incremental pull with cursor pagination, real-time tail, webhook push
  • Best next pages: Exports Guide, Webhooks Guide, Dashboard Overview

For engineers

  • Batch export: kt export create --type events --format json --since 30d then download via GET /v1/exports/{id}/download
  • Incremental pull: GET /v1/events?since=${LAST_CHECKPOINT}&limit=1000 with cursor-based pagination
  • Real-time monitoring: kt events tail
  • Configure webhooks in Console Settings for push-based event delivery to your pipeline
  • Target schema includes: event_id, timestamp, gateway_id, user, provider, model, decision, policies_triggered, latency_ms, tokens, cost
  • Use the group_by parameter in the Events API for pre-aggregated analytics queries

For leaders

  • Every AI interaction generates a structured event with cost, latency, policy, and identity data — this is the raw material for executive dashboards and compliance reporting
  • The Events API and Export API enable self-service analytics without additional telemetry infrastructure
  • Pipeline targets include data warehouses (Snowflake, BigQuery, Redshift), BI tools (Looker, Tableau), and SIEM systems for security correlation
  • Data completeness target is above 99.9%, with ingestion latency under 5 minutes for near-real-time reporting