Database Patterns for AI-Augmented Systems

Keeptrusts generates structured decision events for every governed AI interaction. This guide covers schema design for storing and querying these events, building analytics on governance data, and exporting to external data warehouses.

Use this page when

  • You are exporting decision events from Keeptrusts into your own analytics database
  • You need to design schemas for storing and querying governance event data
  • You are building time-series dashboards on AI interaction metrics
  • You want to export events to a data warehouse (Parquet, S3) for compliance or BI workflows

Primary audience

  • Primary: Technical Engineers
  • Secondary: AI Agents, Technical Leaders

Event Storage Schema

The control-plane API stores decision events in a normalized schema. When exporting events to your own analytics database, mirror this structure:

-- Event storage schema for exported governance data
CREATE TABLE ai_decision_events (
    id TEXT PRIMARY KEY,
    timestamp TIMESTAMPTZ NOT NULL,
    decision TEXT NOT NULL, -- 'allowed', 'blocked', 'escalated', 'redacted'
    model TEXT NOT NULL,
    provider TEXT NOT NULL,
    gateway_id TEXT NOT NULL,
    user_id TEXT,
    team_id TEXT,
    input_tokens INTEGER NOT NULL DEFAULT 0,
    output_tokens INTEGER NOT NULL DEFAULT 0,
    cost_usd NUMERIC(12, 6),
    latency_ms INTEGER,
    request_source TEXT,
    batch_id TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Policy evaluation details (one-to-many)
CREATE TABLE policy_evaluations (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    event_id TEXT NOT NULL REFERENCES ai_decision_events(id),
    policy_name TEXT NOT NULL,
    result TEXT NOT NULL, -- 'pass', 'block', 'redact', 'flag', 'escalate'
    latency_ms INTEGER,
    details JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes for common query patterns
CREATE INDEX idx_events_timestamp ON ai_decision_events (timestamp DESC);
CREATE INDEX idx_events_decision ON ai_decision_events (decision, timestamp DESC);
CREATE INDEX idx_events_gateway ON ai_decision_events (gateway_id, timestamp DESC);
CREATE INDEX idx_events_user ON ai_decision_events (user_id, timestamp DESC);
CREATE INDEX idx_events_team ON ai_decision_events (team_id, timestamp DESC);
CREATE INDEX idx_evaluations_event ON policy_evaluations (event_id);
CREATE INDEX idx_evaluations_policy ON policy_evaluations (policy_name, result);
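To sanity-check the schema shape end to end, the two tables can be exercised with a minimal Python sketch. SQLite stands in for PostgreSQL here (it has no TIMESTAMPTZ, JSONB, or NUMERIC precision, so types are simplified), and the event values are invented for illustration:

```python
import sqlite3

# Mirror the two-table layout with simplified SQLite types.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE ai_decision_events (
    id TEXT PRIMARY KEY,
    timestamp TEXT NOT NULL,
    decision TEXT NOT NULL,
    model TEXT NOT NULL,
    provider TEXT NOT NULL,
    gateway_id TEXT NOT NULL,
    input_tokens INTEGER NOT NULL DEFAULT 0,
    output_tokens INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE policy_evaluations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    event_id TEXT NOT NULL REFERENCES ai_decision_events(id),
    policy_name TEXT NOT NULL,
    result TEXT NOT NULL
);
""")

# One event with one policy evaluation (the one-to-many relationship).
conn.execute(
    "INSERT INTO ai_decision_events VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
    ("evt_1", "2026-04-01T12:00:00Z", "blocked", "example-model",
     "example-provider", "gw_1", 120, 0),
)
conn.execute(
    "INSERT INTO policy_evaluations (event_id, policy_name, result)"
    " VALUES (?, ?, ?)",
    ("evt_1", "pii-redaction", "block"),
)

# Join events to the policies that produced each decision.
row = conn.execute("""
    SELECT e.id, e.decision, p.policy_name, p.result
    FROM ai_decision_events e
    JOIN policy_evaluations p ON p.event_id = e.id
""").fetchone()
print(row)  # ('evt_1', 'blocked', 'pii-redaction', 'block')
```

The one-to-many join between events and their policy evaluations is the shape the analytics queries in this guide rely on.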

Partitioning for Scale

For high-volume deployments, partition the events table by time:

-- Time-based partitioning (PostgreSQL)
CREATE TABLE ai_decision_events (
    id TEXT NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    decision TEXT NOT NULL,
    model TEXT NOT NULL,
    provider TEXT NOT NULL,
    gateway_id TEXT NOT NULL,
    user_id TEXT,
    input_tokens INTEGER NOT NULL DEFAULT 0,
    output_tokens INTEGER NOT NULL DEFAULT 0,
    cost_usd NUMERIC(12, 6),
    PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);

-- Create monthly partitions
CREATE TABLE events_2026_04 PARTITION OF ai_decision_events
    FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
CREATE TABLE events_2026_05 PARTITION OF ai_decision_events
    FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
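Creating partitions by hand does not scale past a few months. A small helper can emit the DDL for a scheduled job; this is an illustrative sketch (the function name is ours, and extensions such as pg_partman automate the same task natively):

```python
from datetime import date

def monthly_partition_ddl(year: int, month: int) -> str:
    """Generate DDL for one monthly partition of ai_decision_events."""
    start = date(year, month, 1)
    # The first day of the following month is the exclusive upper bound.
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return (
        f"CREATE TABLE events_{start:%Y_%m} PARTITION OF ai_decision_events\n"
        f"    FOR VALUES FROM ('{start}') TO ('{end}');"
    )

print(monthly_partition_ddl(2026, 4))
```

Run this from a monthly job (or generate several months ahead) so inserts never land without a matching partition.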

Analytics Queries

Governance Summary Dashboard

-- Daily governance summary
SELECT
    date_trunc('day', timestamp) AS day,
    decision,
    COUNT(*) AS event_count,
    SUM(cost_usd) AS total_cost,
    AVG(latency_ms) AS avg_latency_ms,
    SUM(input_tokens + output_tokens) AS total_tokens
FROM ai_decision_events
WHERE timestamp >= NOW() - INTERVAL '30 days'
GROUP BY day, decision
ORDER BY day DESC, decision;
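As a quick cross-check of the rollup logic, the same daily aggregation can be computed in plain Python over a handful of exported rows (the sample rows here are synthetic):

```python
from collections import defaultdict

# Synthetic exported rows: (day, decision, cost_usd, total_tokens).
rows = [
    ("2026-04-01", "allowed", 0.012, 900),
    ("2026-04-01", "allowed", 0.020, 1500),
    ("2026-04-01", "blocked", 0.000, 120),
    ("2026-04-02", "allowed", 0.005, 400),
]

# Group by (day, decision), mirroring the SQL GROUP BY.
summary = defaultdict(
    lambda: {"event_count": 0, "total_cost": 0.0, "total_tokens": 0}
)
for day, decision, cost, tokens in rows:
    bucket = summary[(day, decision)]
    bucket["event_count"] += 1
    bucket["total_cost"] += cost
    bucket["total_tokens"] += tokens

print(summary[("2026-04-01", "allowed")])
```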

Policy Effectiveness Report

-- Which policies block the most requests?
SELECT
    pe.policy_name,
    COUNT(*) AS evaluation_count,
    COUNT(*) FILTER (WHERE pe.result = 'block') AS block_count,
    ROUND(
        100.0 * COUNT(*) FILTER (WHERE pe.result = 'block') / COUNT(*),
        2
    ) AS block_rate_pct
FROM policy_evaluations pe
JOIN ai_decision_events e ON pe.event_id = e.id
WHERE e.timestamp >= NOW() - INTERVAL '7 days'
GROUP BY pe.policy_name
ORDER BY block_count DESC;

Spend by Team and Model

-- Token spend breakdown by team and model
SELECT
    COALESCE(team_id, 'unassigned') AS team,
    model,
    COUNT(*) AS request_count,
    SUM(input_tokens) AS total_input_tokens,
    SUM(output_tokens) AS total_output_tokens,
    SUM(cost_usd) AS total_cost_usd
FROM ai_decision_events
WHERE timestamp >= NOW() - INTERVAL '30 days'
  AND decision = 'allowed'
GROUP BY team_id, model
ORDER BY total_cost_usd DESC;

Anomaly Detection Query

-- Detect unusual block rate spikes (hourly)
WITH hourly_stats AS (
    SELECT
        date_trunc('hour', timestamp) AS hour,
        COUNT(*) AS total,
        COUNT(*) FILTER (WHERE decision = 'blocked') AS blocked
    FROM ai_decision_events
    WHERE timestamp >= NOW() - INTERVAL '48 hours'
    GROUP BY hour
),
baseline AS (
    SELECT
        AVG(blocked::FLOAT / NULLIF(total, 0)) AS avg_block_rate,
        STDDEV(blocked::FLOAT / NULLIF(total, 0)) AS stddev_block_rate
    FROM hourly_stats
)
SELECT
    h.hour,
    h.total,
    h.blocked,
    ROUND((h.blocked::FLOAT / NULLIF(h.total, 0))::NUMERIC, 4) AS block_rate,
    CASE
        WHEN (h.blocked::FLOAT / NULLIF(h.total, 0)) >
             b.avg_block_rate + 2 * b.stddev_block_rate
        THEN 'ANOMALY'
        ELSE 'normal'
    END AS status
FROM hourly_stats h, baseline b
ORDER BY h.hour DESC;
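The same two-sigma rule is easy to verify offline. This Python sketch applies it to synthetic hourly counts, flagging the hour whose block rate exceeds the baseline mean plus two sample standard deviations:

```python
from statistics import mean, stdev

# Synthetic hourly counts: (total_requests, blocked_requests).
# The last hour has an obvious block-rate spike.
hourly = [
    (100, 4), (100, 5), (100, 6), (120, 6), (100, 4),
    (100, 6), (200, 10), (100, 5), (100, 4), (100, 30),
]

rates = [blocked / total for total, blocked in hourly if total]
threshold = mean(rates) + 2 * stdev(rates)  # 2-sigma cutoff

flags = ["ANOMALY" if r > threshold else "normal" for r in rates]
print(flags)  # the final hour is flagged, the rest are 'normal'
```

Note that with short baselines a single large spike inflates the standard deviation itself; the SQL version mitigates this by computing the baseline over a full 48-hour window.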

Time-Series AI Metrics

Track governance metrics over time for trend analysis:

-- Materialized view for time-series metrics
CREATE MATERIALIZED VIEW governance_metrics_hourly AS
SELECT
    date_trunc('hour', timestamp) AS hour,
    gateway_id,
    model,
    COUNT(*) AS total_requests,
    COUNT(*) FILTER (WHERE decision = 'allowed') AS allowed,
    COUNT(*) FILTER (WHERE decision = 'blocked') AS blocked,
    COUNT(*) FILTER (WHERE decision = 'escalated') AS escalated,
    SUM(cost_usd) AS total_cost,
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY latency_ms) AS p50_latency,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99_latency,
    SUM(input_tokens) AS total_input_tokens,
    SUM(output_tokens) AS total_output_tokens
FROM ai_decision_events
GROUP BY hour, gateway_id, model;

-- REFRESH ... CONCURRENTLY requires a unique index on the view
CREATE UNIQUE INDEX idx_metrics_hourly_key
    ON governance_metrics_hourly (hour, gateway_id, model);

-- Refresh periodically without blocking readers
REFRESH MATERIALIZED VIEW CONCURRENTLY governance_metrics_hourly;

Export to Data Warehouse

Using the Exports API

# Export one month of decision events to S3 as Parquet
curl -X POST https://api.keeptrusts.example/v1/exports \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "type": "events",
    "format": "parquet",
    "destination": {
      "type": "s3",
      "bucket": "analytics-datalake",
      "prefix": "keeptrusts/events/",
      "region": "us-east-1"
    },
    "filter": {
      "start_time": "2026-04-01T00:00:00Z",
      "end_time": "2026-05-01T00:00:00Z"
    }
  }'
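If you drive the Exports API from code rather than curl, the request body can be built programmatically. This sketch only constructs the JSON payload shown above (field names follow the curl example; sending it with the HTTP client of your choice is left out):

```python
import json
from datetime import datetime, timezone

def build_export_request(bucket: str, prefix: str,
                         start: datetime, end: datetime) -> str:
    """Build the JSON body for POST /v1/exports, mirroring the curl example."""
    payload = {
        "type": "events",
        "format": "parquet",
        "destination": {
            "type": "s3",
            "bucket": bucket,
            "prefix": prefix,
            "region": "us-east-1",
        },
        "filter": {
            # The API expects UTC timestamps in RFC 3339 format.
            "start_time": start.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "end_time": end.strftime("%Y-%m-%dT%H:%M:%SZ"),
        },
    }
    return json.dumps(payload)

body = build_export_request(
    "analytics-datalake",
    "keeptrusts/events/",
    datetime(2026, 4, 1, tzinfo=timezone.utc),
    datetime(2026, 5, 1, tzinfo=timezone.utc),
)
```

Generating the filter window from code makes it straightforward to run this on a schedule, exporting the previous month on the first of each month.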

Loading into a Data Warehouse

-- Snowflake: load exported Parquet files
COPY INTO analytics.governance.decision_events
FROM @analytics.governance.keeptrusts_stage/events/
FILE_FORMAT = (TYPE = PARQUET)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- BigQuery: load from GCS
LOAD DATA INTO analytics.governance_events
FROM FILES (
    format = 'PARQUET',
    uris = ['gs://analytics-bucket/keeptrusts/events/*.parquet']
);

Retention and Archival

Manage data lifecycle for compliance:

-- Archive old events to cold storage before deletion
INSERT INTO ai_decision_events_archive
SELECT * FROM ai_decision_events
WHERE timestamp < NOW() - INTERVAL '90 days';

-- Drop old partitions
DROP TABLE IF EXISTS events_2026_01;

The Keeptrusts API supports configurable event retention via KEEPTRUSTS_EVENT_RETENTION_HOURS. A background worker automatically prunes events past the retention window.

Key Takeaways

  • Mirror the event schema with proper indexes for timestamp, decision, gateway_id, and user_id
  • Partition event tables by time for high-volume deployments
  • Use materialized views for time-series governance metrics
  • Export to data warehouses in Parquet format for integration with existing analytics tools
  • Implement retention policies aligned with your compliance requirements and the API's built-in retention worker

For AI systems

  • Canonical terms: decision events, ai_decision_events table, policy_evaluations table, time-based partitioning, materialized views, Parquet export, kt export create, KEEPTRUSTS_EVENT_RETENTION_HOURS, event retention worker
  • Key queries: GET /v1/events, POST /v1/exports, governance summary by decision/model/team
  • Best next pages: Event-Driven AI Architecture, Observability Patterns, Capacity Planning

For engineers

  • Mirror the event schema with indexes on timestamp DESC, decision, gateway_id, and user_id for common query patterns
  • Partition by month for >100K events/day: PARTITION BY RANGE (timestamp)
  • Use materialized views for time-series metrics (hourly/daily aggregations) to avoid expensive real-time aggregation
  • Export to warehouses in Parquet format via kt export create --format parquet --delivery s3://bucket/
  • Align retention with the API’s built-in retention worker (KEEPTRUSTS_EVENT_RETENTION_HOURS)

For leaders

  • Governance event data is a compliance asset — proper schema design enables audit readiness and reduces evidence-gathering time from days to minutes
  • Time-series analytics on AI interactions provide leading indicators for cost trends, risk patterns, and team adoption
  • Data warehouse exports integrate governance metrics into existing BI tools without custom engineering

Next steps