Governing AI in Data Pipelines
Data pipelines that incorporate LLM calls need the same governance controls as real-time APIs. This guide covers patterns for routing batch AI processing through the Keeptrusts gateway, scoring output quality, and handling failures gracefully.
Use this page when
- You are routing batch AI processing (ETL, classification, summarization) through the gateway
- You need concurrency control and retry patterns for high-volume LLM batch jobs
- You want to use `action: flag` for output policies to record without blocking in batch scenarios
- You are scoring AI-generated output quality before loading into target systems
Primary audience
- Primary: Technical Engineers
- Secondary: AI Agents, Technical Leaders
Batch Processing Through the Gateway
The gateway handles batch workloads the same way it handles interactive requests. Each call is individually evaluated against the policy chain:
Batch Client Implementation
```python
# batch_processor.py
import asyncio
import httpx
from dataclasses import dataclass

GATEWAY_URL = "http://localhost:41002"

@dataclass
class BatchResult:
    record_id: str
    status: str  # "success", "blocked", "error"
    response: dict | None = None
    error: str | None = None

async def process_batch(
    records: list[dict],
    concurrency: int = 10,
    gateway_key: str = "",
) -> list[BatchResult]:
    """Process records through the governed gateway with concurrency control."""
    semaphore = asyncio.Semaphore(concurrency)

    # One shared client pools connections across the whole batch.
    async with httpx.AsyncClient(timeout=60.0) as client:

        async def process_one(record: dict) -> BatchResult:
            async with semaphore:
                try:
                    resp = await client.post(
                        f"{GATEWAY_URL}/v1/chat/completions",
                        json={
                            "model": "gpt-4o-mini",
                            "messages": [
                                {"role": "system", "content": record["prompt"]},
                                {"role": "user", "content": record["content"]},
                            ],
                        },
                        headers={
                            "Authorization": f"Bearer {gateway_key}",
                            "X-Batch-Id": record["batch_id"],
                            "X-Record-Id": record["id"],
                        },
                    )
                    # 409 signals a policy block: deterministic, never retried.
                    if resp.status_code == 409:
                        return BatchResult(
                            record_id=record["id"],
                            status="blocked",
                            error=resp.json().get("error", {}).get("message"),
                        )
                    resp.raise_for_status()
                    return BatchResult(
                        record_id=record["id"],
                        status="success",
                        response=resp.json(),
                    )
                except httpx.HTTPError as e:
                    # httpx.HTTPError covers status errors, timeouts, and
                    # transport failures, so transient faults land here too.
                    return BatchResult(
                        record_id=record["id"],
                        status="error",
                        error=str(e),
                    )

        tasks = [process_one(r) for r in records]
        return list(await asyncio.gather(*tasks))
```
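For reference, a minimal invocation could look like the sketch below. The `GATEWAY_KEY` environment variable and the sample record fields are illustrative assumptions, not fixed names:

```python
# Example driver: hypothetical record content; field names match what
# process_batch reads (id, batch_id, prompt, content).
import os

records = [
    {
        "id": "rec-001",
        "batch_id": "batch_20260423",
        "prompt": "Classify the sentiment of the text as positive, negative, or neutral.",
        "content": "The onboarding flow was painless and fast.",
    },
]

results = asyncio.run(
    process_batch(records, concurrency=10, gateway_key=os.environ["GATEWAY_KEY"])
)
for r in results:
    print(r.record_id, r.status)
```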
ETL with Governed LLM Calls
Integrate governed AI calls into standard ETL stages:
```python
# etl_pipeline.py
import logging
from datetime import datetime, timezone

from batch_processor import process_batch

logger = logging.getLogger(__name__)

async def extract(source_config: dict) -> list[dict]:
    """Extract raw records from source system."""
    records: list[dict] = []
    # ... database query or file read
    return records

async def transform_with_ai(records: list[dict], gateway_key: str) -> list[dict]:
    """Transform records using governed LLM calls."""
    results = await process_batch(records, concurrency=10, gateway_key=gateway_key)
    transformed = []
    blocked = []
    errors = []
    for result in results:
        if result.status == "success":
            transformed.append({
                "record_id": result.record_id,
                "ai_output": result.response["choices"][0]["message"]["content"],
                "processed_at": datetime.now(timezone.utc).isoformat(),
            })
        elif result.status == "blocked":
            blocked.append({
                "record_id": result.record_id,
                "reason": result.error,
            })
        else:
            errors.append({
                "record_id": result.record_id,
                "error": result.error,
            })
    # Log governance outcomes
    logger.info(
        f"Batch complete: {len(transformed)} processed, "
        f"{len(blocked)} blocked, {len(errors)} errors"
    )
    return transformed

async def load(records: list[dict], target_config: dict):
    """Load transformed records into target system."""
    # ... database insert or API call
    pass
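```

A thin orchestrator ties the stages together. A minimal sketch, assuming the config dicts come from your scheduler:

```python
async def run_pipeline(source_config: dict, target_config: dict, gateway_key: str):
    """Run extract -> governed transform -> load as a single pipeline pass."""
    raw = await extract(source_config)
    transformed = await transform_with_ai(raw, gateway_key=gateway_key)
    await load(transformed, target_config)
```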
Gateway Configuration for Batch Workloads
Tune the gateway for throughput-oriented batch processing:
```yaml
# policy-config.yaml — batch-optimized
gateway:
  port: 41002
  secret_key_ref:
    env: OPENAI_API_KEY
policies:
  - name: batch-classification
    input:
      - type: content_safety
        action: block
        categories: [hate, violence, self_harm]
      - type: pii_detection
        action: redact
        entities: [ssn, credit_card]
    output:
      - type: content_safety
        action: flag
```
Use `action: flag` on output policies for batch workloads where you want to record violations without blocking. Review flagged records in post-processing.
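One way to review flagged records in post-processing is to pull the batch's events and keep the flagged ones. A sketch, assuming the events API accepts a `decision` query filter (hypothetical here; adjust to the filters your deployment exposes):

```python
import httpx

def fetch_flagged_records(api_token: str, batch_id: str) -> list[dict]:
    """Return events for records the output policy flagged rather than blocked."""
    resp = httpx.get(
        "https://api.keeptrusts.example/v1/events",
        # The decision filter is an assumed parameter; batch_id filtering is
        # shown in the monitoring example later on this page.
        params={"batch_id": batch_id, "decision": "flag"},
        headers={"Authorization": f"Bearer {api_token}"},
    )
    resp.raise_for_status()
    return resp.json()["data"]
```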
Data Quality Scoring
Score AI-generated outputs before loading into your target system:
```python
# quality_scorer.py
from dataclasses import dataclass

from batch_processor import BatchResult

@dataclass
class QualityScore:
    completeness: float  # 0.0 - 1.0
    consistency: float
    governance_clean: bool  # No policy violations
    confidence: float

def score_output(record: dict, batch_result: BatchResult) -> QualityScore:
    """Score the quality of a governed AI output."""
    content = ""
    if batch_result.response:
        content = batch_result.response["choices"][0]["message"]["content"]
    return QualityScore(
        completeness=1.0 if len(content) > 50 else len(content) / 50,
        # check_format_consistency and extract_confidence are domain-specific
        # helpers you supply elsewhere in the pipeline.
        consistency=check_format_consistency(content, record.get("expected_format")),
        governance_clean=batch_result.status == "success",
        confidence=extract_confidence(content),
    )

def filter_by_quality(
    results: list[tuple[dict, BatchResult]],
    min_completeness: float = 0.8,
    min_consistency: float = 0.7,
) -> list[dict]:
    """Filter batch outputs by quality thresholds."""
    passed = []
    for record, result in results:
        score = score_output(record, result)
        if (score.completeness >= min_completeness
                and score.consistency >= min_consistency
                and score.governance_clean):
            passed.append(record)
    return passed
```
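Wiring the scorer into the transform stage is then one pairing step. A sketch (`transform_and_filter` is an illustrative name), relying on the fact that `asyncio.gather` preserves input order so records and results line up:

```python
from batch_processor import process_batch
from quality_scorer import filter_by_quality

async def transform_and_filter(records: list[dict], gateway_key: str) -> list[dict]:
    """Run the governed batch, then keep only records that pass quality gates."""
    results = await process_batch(records, gateway_key=gateway_key)
    # asyncio.gather preserves task order, so zip pairs each record with its result.
    return filter_by_quality(list(zip(records, results)))
```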
Pipeline Retry Patterns
Handle transient failures with exponential backoff:
```python
# retry.py
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

class TransientError(Exception):
    """Raised for failures worth retrying: rate limits, 5xx, timeouts."""

async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Retry a batch operation with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except TransientError:
            if attempt == max_retries:
                raise
            # Cap the exponential delay and add jitter to avoid thundering herds.
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            await asyncio.sleep(delay + jitter)
```
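A sketch of how `retry_with_backoff` might wrap `process_batch`, resubmitting only the records that failed transiently (successes and policy blocks are final and never retried):

```python
from batch_processor import BatchResult, process_batch

async def process_batch_with_retry(
    records: list[dict], gateway_key: str
) -> list[BatchResult]:
    """Retry only transiently failed records; keep successes and blocks as-is."""
    done: list[BatchResult] = []
    pending = records

    async def attempt() -> list[BatchResult]:
        nonlocal pending
        results = await process_batch(pending, gateway_key=gateway_key)
        failed_ids = {r.record_id for r in results if r.status == "error"}
        # Successes and policy blocks are final; only errors get resubmitted.
        done.extend(r for r in results if r.status != "error")
        pending = [rec for rec in pending if rec["id"] in failed_ids]
        if pending:
            raise TransientError(f"{len(pending)} records failed transiently")
        return done

    return await retry_with_backoff(attempt, max_retries=3)
```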
Classify failures for retry decisions:
| Failure Type | HTTP Status | Retry? | Action |
|---|---|---|---|
| Policy block | 409 | No | Log and skip — policy won't change on retry |
| Rate limit | 429 | Yes | Backoff with Retry-After header |
| Provider error | 502/503 | Yes | Exponential backoff |
| Timeout | N/A | Yes | Increase timeout, reduce concurrency |
| Auth failure | 401/403 | No | Fix credentials, halt pipeline |
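The table translates directly into a decision helper. A minimal sketch (`should_retry` is an illustrative name, and a `None` status stands in for a client-side timeout):

```python
def should_retry(
    status_code: int | None, retry_after: str | None = None
) -> tuple[bool, float | None]:
    """Map a failure to (retry?, suggested delay in seconds) per the table above."""
    if status_code == 429:
        # Honor the Retry-After hint when it is a plain seconds value.
        delay = float(retry_after) if retry_after and retry_after.isdigit() else None
        return True, delay
    if status_code in (502, 503):
        return True, None   # provider error: exponential backoff
    if status_code is None:
        return True, None   # timeout: retry, and consider lowering concurrency
    return False, None      # 409 policy block, 401/403 auth: do not retry
```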
Monitoring Batch Pipelines
Track batch governance metrics through the events API:
```bash
# Query batch processing summary
curl -s "https://api.keeptrusts.example/v1/events?batch_id=batch_20260423&limit=0" \
  -H "Authorization: Bearer $API_TOKEN" | jq '{
  total: .meta.total,
  by_decision: [.data[] | .decision] | group_by(.) | map({(.[0]): length}) | add
}'
```
Key Takeaways
- Route every batch LLM call through the gateway — batch workloads need governance too
- Use concurrency semaphores to control throughput and avoid overwhelming the gateway
- Use `action: flag` for output policies in batch scenarios to record without blocking
- Score AI outputs for quality before loading into target systems
- Never retry `409` policy blocks; the policy evaluation is deterministic
- Monitor batch governance outcomes through the events API and console dashboard
For AI systems
- Canonical terms: batch processing, concurrency semaphore, `action: flag`, `action: block`, `X-Batch-Id`, `X-Record-Id`, ETL governance, quality scoring, retry with backoff, `policy-config.yaml`
- Key pattern: Never retry `409` policy blocks; the evaluation is deterministic. Retry only `429` (rate limit) and `5xx` (transient)
- Best next pages: Capacity Planning for AI Workloads, Resilience Engineering, Observability Patterns
For engineers
- Use `asyncio.Semaphore(concurrency)` to control throughput; start with 10 concurrent requests and tune based on gateway/provider capacity
- Pass `X-Batch-Id` and `X-Record-Id` headers for traceability through the events API
- Use `action: flag` on output policies for batch workloads where you want to record violations without blocking
- Query batch outcomes: `GET /v1/events?batch_id=<id>&limit=0` to get total counts by decision
- Quality scoring: filter outputs by completeness ≥ 0.8 and consistency ≥ 0.7 before loading
For leaders
- Batch AI workloads (data enrichment, classification, summarization) need the same governance controls as real-time APIs to meet compliance requirements
- `action: flag` mode enables observation without blocking, useful during initial rollout to measure policy impact before enforcement
- Concurrency controls prevent batch jobs from exhausting provider rate limits that interactive users depend on
Next steps
- Capacity Planning for AI Workloads — size the gateway for batch throughput
- Resilience Engineering for AI Services — retry strategies and circuit breakers
- Observability for AI-Governed Systems — monitor batch governance metrics
- Database Patterns for AI-Augmented Systems — store and query batch results