
Governing AI in Data Pipelines

Data pipelines that incorporate LLM calls need the same governance controls as real-time APIs. This guide covers patterns for routing batch AI processing through the Keeptrusts gateway, scoring output quality, and handling failures gracefully.

Use this page when

  • You are routing batch AI processing (ETL, classification, summarization) through the gateway
  • You need concurrency control and retry patterns for high-volume LLM batch jobs
  • You want to use action: flag on output policies to record violations without blocking in batch scenarios
  • You are scoring AI-generated output quality before loading into target systems

Primary audience

  • Primary: Technical Engineers
  • Secondary: AI Agents, Technical Leaders

Batch Processing Through the Gateway

The gateway handles batch workloads the same way it handles interactive requests. Each call is individually evaluated against the policy chain:

Batch Client Implementation

# batch_processor.py
import asyncio
import httpx
from dataclasses import dataclass

GATEWAY_URL = "http://localhost:41002"

@dataclass
class BatchResult:
    record_id: str
    status: str  # "success", "blocked", or "error"
    response: dict | None = None
    error: str | None = None

async def process_batch(
    records: list[dict],
    concurrency: int = 10,
    gateway_key: str = "",
) -> list[BatchResult]:
    """Process records through the governed gateway with concurrency control."""
    semaphore = asyncio.Semaphore(concurrency)

    # Share one client across the batch instead of opening a connection per record.
    async with httpx.AsyncClient(timeout=60.0) as client:

        async def process_one(record: dict) -> BatchResult:
            async with semaphore:
                try:
                    resp = await client.post(
                        f"{GATEWAY_URL}/v1/chat/completions",
                        json={
                            "model": "gpt-4o-mini",
                            "messages": [
                                {"role": "system", "content": record["prompt"]},
                                {"role": "user", "content": record["content"]},
                            ],
                        },
                        headers={
                            "Authorization": f"Bearer {gateway_key}",
                            "X-Batch-Id": record["batch_id"],
                            "X-Record-Id": record["id"],
                        },
                    )
                    if resp.status_code == 409:
                        # Policy block: deterministic, so record it and move on.
                        return BatchResult(
                            record_id=record["id"],
                            status="blocked",
                            error=resp.json().get("error", {}).get("message"),
                        )
                    resp.raise_for_status()
                    return BatchResult(
                        record_id=record["id"],
                        status="success",
                        response=resp.json(),
                    )
                except httpx.HTTPError as e:
                    # Covers both status errors and transport errors (timeouts, etc.)
                    return BatchResult(
                        record_id=record["id"],
                        status="error",
                        error=str(e),
                    )

        return await asyncio.gather(*(process_one(r) for r in records))

ETL with Governed LLM Calls

Integrate governed AI calls into standard ETL stages:

# etl_pipeline.py
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

async def extract(source_config: dict) -> list[dict]:
    """Extract raw records from the source system."""
    # ... database query or file read
    return records

async def transform_with_ai(records: list[dict], gateway_key: str) -> list[dict]:
    """Transform records using governed LLM calls."""
    results = await process_batch(records, concurrency=10, gateway_key=gateway_key)

    transformed = []
    blocked = []
    errors = []

    for result in results:
        if result.status == "success":
            transformed.append({
                "record_id": result.record_id,
                "ai_output": result.response["choices"][0]["message"]["content"],
                "processed_at": datetime.now(timezone.utc).isoformat(),
            })
        elif result.status == "blocked":
            blocked.append({
                "record_id": result.record_id,
                "reason": result.error,
            })
        else:
            errors.append({
                "record_id": result.record_id,
                "error": result.error,
            })

    # Log governance outcomes
    logger.info(
        f"Batch complete: {len(transformed)} processed, "
        f"{len(blocked)} blocked, {len(errors)} errors"
    )
    return transformed

async def load(records: list[dict], target_config: dict):
    """Load transformed records into the target system."""
    # ... database insert or API call
    pass
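The stages above compose into a single pipeline run. The sketch below is illustrative only: the stub stages stand in for the real extract/transform_with_ai/load functions defined in this guide, so the orchestration shape can run on its own.

```python
import asyncio

# Hypothetical stand-ins for the real pipeline stages, used here only to
# demonstrate how the stages chain together.
async def extract_stub(source_config: dict) -> list[dict]:
    return [{"id": "r1"}, {"id": "r2"}]

async def transform_stub(records: list[dict], gateway_key: str) -> list[dict]:
    # In the real pipeline, this calls process_batch() through the gateway.
    return [r | {"ai_output": "..."} for r in records]

async def load_stub(records: list[dict], target_config: dict) -> int:
    return len(records)

async def run_pipeline(source_config: dict, target_config: dict, gateway_key: str) -> dict:
    """Run extract -> governed transform -> load and report counts."""
    records = await extract_stub(source_config)
    transformed = await transform_stub(records, gateway_key)
    loaded = await load_stub(transformed, target_config)
    return {"extracted": len(records), "loaded": loaded}

summary = asyncio.run(run_pipeline({}, {}, "test-key"))
```

In the real pipeline, the count of records dropped between "extracted" and "loaded" is itself a governance signal worth logging.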

Gateway Configuration for Batch Workloads

Tune the gateway for throughput-oriented batch processing:

# policy-config.yaml — batch-optimized
gateway:
  port: 41002
  secret_key_ref:
    env: OPENAI_API_KEY

policies:
  - name: batch-classification
    input:
      - type: content_safety
        action: block
        categories: [hate, violence, self_harm]
      - type: pii_detection
        action: redact
        entities: [ssn, credit_card]
    output:
      - type: content_safety
        action: flag

Use action: flag on output policies for batch workloads where you want to record violations without blocking. Review flagged records in post-processing.
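A post-processing review pass can be as simple as filtering governance events by decision. This sketch assumes each event dict carries a "decision" field ("allow", "flag", or "block") and a "record_id", roughly matching what the events API returns; the exact field names are assumptions.

```python
# Hypothetical post-processing pass over governance events for a batch.
def flagged_records(events: list[dict]) -> list[str]:
    """Collect record IDs whose output policies fired with action: flag."""
    return [e["record_id"] for e in events if e.get("decision") == "flag"]

# Example: two events, one flagged
events = [
    {"record_id": "rec-001", "decision": "flag"},
    {"record_id": "rec-002", "decision": "allow"},
]
to_review = flagged_records(events)
```

Feed the resulting IDs into whatever human-review or re-processing queue your pipeline uses.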

Data Quality Scoring

Score AI-generated outputs before loading into your target system:

# quality_scorer.py
from dataclasses import dataclass

@dataclass
class QualityScore:
    completeness: float  # 0.0 - 1.0
    consistency: float
    governance_clean: bool  # No policy violations
    confidence: float

def score_output(record: dict, batch_result: BatchResult) -> QualityScore:
    """Score the quality of a governed AI output."""
    content = ""
    if batch_result.response:
        content = batch_result.response["choices"][0]["message"]["content"]

    return QualityScore(
        completeness=1.0 if len(content) > 50 else len(content) / 50,
        consistency=check_format_consistency(content, record.get("expected_format")),
        governance_clean=batch_result.status == "success",
        confidence=extract_confidence(content),
    )

def filter_by_quality(
    results: list[tuple[dict, BatchResult]],
    min_completeness: float = 0.8,
    min_consistency: float = 0.7,
) -> list[dict]:
    """Filter batch outputs by quality thresholds."""
    passed = []
    for record, result in results:
        score = score_output(record, result)
        if (score.completeness >= min_completeness
                and score.consistency >= min_consistency
                and score.governance_clean):
            passed.append(record)
    return passed

Pipeline Retry Patterns

Handle transient failures with exponential backoff:

# retry.py
import asyncio
import random

class TransientError(Exception):
    """Raised for retryable failures (e.g. 429 rate limits, 5xx provider errors)."""

async def retry_with_backoff(
    fn,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
):
    """Retry a batch operation with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except TransientError:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            await asyncio.sleep(delay + jitter)

Classify failures for retry decisions:

Failure Type      HTTP Status   Retry?   Action
Policy block      409           No       Log and skip; the policy won't change on retry
Rate limit        429           Yes      Back off, honoring the Retry-After header
Provider error    502/503       Yes      Exponential backoff
Timeout           N/A           Yes      Increase timeout, reduce concurrency
Auth failure      401/403       No       Fix credentials, halt pipeline

Monitoring Batch Pipelines

Track batch governance metrics through the events API:

# Query batch processing summary
curl -s "https://api.keeptrusts.example/v1/events?batch_id=batch_20260423&limit=0" \
  -H "Authorization: Bearer $API_TOKEN" | jq '{
    total: .meta.total,
    by_decision: [.data[] | .decision] | group_by(.) | map({(.[0]): length}) | add
  }'
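The same aggregation can be done in pipeline code. This sketch assumes the payload shape used by the curl example above ({"meta": {"total": ...}, "data": [{"decision": ...}, ...]}); the field names are taken from that example, not from a formal schema.

```python
from collections import Counter

def summarize_batch(payload: dict) -> dict:
    """Aggregate governance decisions from an events API response."""
    by_decision = Counter(e["decision"] for e in payload.get("data", []))
    return {
        "total": payload.get("meta", {}).get("total", 0),
        "by_decision": dict(by_decision),
    }

# Example payload in the assumed response shape
payload = {
    "meta": {"total": 3},
    "data": [
        {"decision": "allow"},
        {"decision": "allow"},
        {"decision": "block"},
    ],
}
summary = summarize_batch(payload)
```

A spike in "block" or "flag" counts between batch runs is usually the first sign that upstream data or a policy definition has changed.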

Key Takeaways

  • Route every batch LLM call through the gateway — batch workloads need governance too
  • Use concurrency semaphores to control throughput and avoid overwhelming the gateway
  • Use action: flag for output policies in batch scenarios to record without blocking
  • Score AI outputs for quality before loading into target systems
  • Never retry 409 policy blocks — the policy evaluation is deterministic
  • Monitor batch governance outcomes through the events API and console dashboard

For AI systems

  • Canonical terms: batch processing, concurrency semaphore, action: flag, action: block, X-Batch-Id, X-Record-Id, ETL governance, quality scoring, retry with backoff, policy-config.yaml
  • Key pattern: Never retry 409 policy blocks — the evaluation is deterministic. Retry only 429 (rate limit) and 5xx (transient)
  • Best next pages: Capacity Planning for AI Workloads, Resilience Engineering, Observability Patterns

For engineers

  • Use asyncio.Semaphore(concurrency) to control throughput — start with 10 concurrent requests and tune based on gateway/provider capacity
  • Pass X-Batch-Id and X-Record-Id headers for traceability through the events API
  • Use action: flag on output policies for batch workloads where you want to record violations without blocking
  • Query batch outcomes: GET /v1/events?batch_id=<id>&limit=0 to get total counts by decision
  • Quality scoring: filter outputs by completeness ≥ 0.8 and consistency ≥ 0.7 before loading

For leaders

  • Batch AI workloads (data enrichment, classification, summarization) need the same governance controls as real-time APIs to meet compliance requirements
  • action: flag mode enables observation without blocking — useful during initial rollout to measure policy impact before enforcement
  • Concurrency controls prevent batch jobs from exhausting provider rate limits that interactive users depend on

Next steps