Streaming AI Responses Through the Gateway
The Keeptrusts gateway supports streaming for all OpenAI-compatible endpoints. This guide covers SSE mechanics, how output policies apply to streams, chunked transfer patterns, and edge cases you need to handle.
Use this page when
- You need to handle SSE streaming responses through the Keeptrusts gateway.
- You want to understand how output policies evaluate streamed content (assembled evaluation).
- You are handling mid-stream policy terminations gracefully in Python or TypeScript.
- You are configuring reverse proxies (nginx) to pass through SSE streams without buffering.
Primary audience
- Primary: Backend developers implementing streaming AI responses in production applications
- Secondary: DevOps Engineers configuring reverse proxies for streaming, Frontend developers consuming SSE streams
How Streaming Works Through the Gateway
Client → Gateway → Provider
                        ↓ (SSE stream)
Client ← Gateway ← Provider
            ↑ policy evaluation on assembled output
- The client sends a request with stream: true.
- The gateway forwards the request to the upstream provider.
- The provider streams SSE chunks back.
- The gateway forwards each chunk to the client in real time.
- Output policies evaluate the assembled content as it accumulates.
- If a policy triggers, the gateway terminates the stream with an error event.
Python Streaming
Basic Stream Consumption
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:41002/v1",
    api_key="sk-...",
)

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain zero-trust architecture."}],
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
print()
Collecting Full Response
def stream_and_collect(prompt: str) -> str:
    chunks: list[str] = []
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            chunks.append(content)
            print(content, end="", flush=True)
    print()
    return "".join(chunks)
Async Streaming
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="http://localhost:41002/v1",
    api_key="sk-...",
)

async def stream_response(prompt: str) -> str:
    chunks: list[str] = []
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            chunks.append(content)
    return "".join(chunks)

result = asyncio.run(stream_response("What is prompt injection?"))
TypeScript Streaming
Node.js with OpenAI SDK
import OpenAI from "openai";

const client = new OpenAI({
  baseURL: process.env.LLM_GATEWAY_URL ?? "http://localhost:41002/v1",
  apiKey: process.env.OPENAI_API_KEY,
});

async function streamResponse(prompt: string): Promise<string> {
  const stream = await client.chat.completions.create({
    model: "gpt-4o",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  const chunks: string[] = [];
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      process.stdout.write(content);
      chunks.push(content);
    }
  }
  return chunks.join("");
}
Next.js Server-Side Streaming
import { streamText } from "ai";
import { createOpenAI } from "@ai-sdk/openai";

const provider = createOpenAI({
  baseURL: process.env.LLM_GATEWAY_URL ?? "http://localhost:41002/v1",
  apiKey: process.env.OPENAI_API_KEY,
});

export async function POST(req: Request) {
  const { messages } = await req.json();
  const result = streamText({
    model: provider("gpt-4o"),
    messages,
  });
  return result.toDataStreamResponse();
}
Raw SSE with curl
For debugging, stream directly with curl:
curl -N http://localhost:41002/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "gpt-4o",
    "stream": true,
    "messages": [{"role": "user", "content": "Hello, world!"}]
  }'
Each SSE event arrives as:
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"Hello"},"index":0}]}
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"!"},"index":0}]}
data: [DONE]
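If you need to consume this format without an SDK, the parsing loop is short. The sketch below is a minimal illustration, not part of the gateway itself: it uses httpx (an arbitrary choice of streaming HTTP client) to read lines, strip the data: prefix, stop at the [DONE] sentinel, and surface error events.

import json

import httpx

def stream_raw(prompt: str) -> str:
    """Minimal raw-SSE consumer; add an Authorization header if your gateway key requires one."""
    chunks: list[str] = []
    payload = {
        "model": "gpt-4o",
        "stream": True,
        "messages": [{"role": "user", "content": prompt}],
    }
    with httpx.stream(
        "POST",
        "http://localhost:41002/v1/chat/completions",
        json=payload,
        timeout=None,  # streams can stay open for minutes
    ) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue  # skip blank keep-alive lines and SSE comments
            data = line[len("data: "):]
            if data == "[DONE]":
                break  # end-of-stream sentinel
            event = json.loads(data)
            if "error" in event:
                # mid-stream policy termination arrives as an error event (see below)
                raise RuntimeError(event["error"]["message"])
            content = event["choices"][0]["delta"].get("content")
            if content:
                chunks.append(content)
    return "".join(chunks)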
Policy Evaluation on Streams
How Output Policies Apply
The gateway buffers the assembled output text as chunks arrive. Output policies (PII filters, content blockers, redaction rules) evaluate against this growing buffer.
Observe-mode policies log findings but do not interrupt the stream.
Block-mode policies terminate the stream when triggered:
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"The SSN is "},"index":0}]}
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"123-"},"index":0}]}
data: {"error":{"type":"policy_violation","message":"Stream terminated: PII detected","policy":"block-pii-output","code":"output_blocked"}}
data: [DONE]
Handling Mid-Stream Termination
from openai import APIStatusError

def safe_stream(prompt: str) -> str:
    chunks: list[str] = []
    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                chunks.append(content)
    except APIStatusError as e:
        if e.status_code == 409:
            error_body = e.response.json()
            policy = error_body.get("error", {}).get("policy", "unknown")
            chunks.append(f"\n\n[Stream terminated by policy: {policy}]")
        else:
            raise
    return "".join(chunks)
TypeScript Mid-Stream Handling
async function safeStream(prompt: string): Promise<string> {
  const chunks: string[] = [];
  try {
    const stream = await client.chat.completions.create({
      model: "gpt-4o",
      messages: [{ role: "user", content: prompt }],
      stream: true,
    });
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) chunks.push(content);
    }
  } catch (err) {
    if (err instanceof OpenAI.APIError && err.status === 409) {
      chunks.push("\n\n[Stream terminated by governance policy]");
    } else {
      throw err;
    }
  }
  return chunks.join("");
}
Chunked Transfer Encoding
The gateway uses HTTP chunked transfer encoding for streaming responses. Key behaviors:
| Aspect | Behavior |
|---|---|
| Content-Type | text/event-stream |
| Transfer-Encoding | chunked |
| Connection | keep-alive |
| Policy eval | Runs on accumulated buffer |
| Termination | Error event + [DONE] |
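To verify these headers against your own deployment, a quick check can print the response headers before reading any events. This is an illustrative sketch using httpx, assuming the gateway listens on localhost:41002 as in the examples above:

import httpx

payload = {
    "model": "gpt-4o",
    "stream": True,
    "messages": [{"role": "user", "content": "ping"}],
}

with httpx.stream(
    "POST", "http://localhost:41002/v1/chat/completions", json=payload, timeout=30.0
) as response:
    # Expect Content-Type: text/event-stream and Transfer-Encoding: chunked,
    # as listed in the table above.
    for name, value in response.headers.items():
        print(f"{name}: {value}")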
Proxy Considerations
If you run a reverse proxy (nginx, Caddy) in front of the gateway, disable response buffering:
# nginx.conf
location /v1/ {
proxy_pass http://localhost:41002;
proxy_buffering off;
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding on;
}
Without proxy_buffering off, nginx may buffer the entire response before forwarding, defeating streaming.
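Caddy is generally more forgiving: its reverse_proxy streams responses of unknown length (including text/event-stream) without extra configuration, though you can make the no-buffering intent explicit. A minimal Caddyfile sketch, with example.com standing in for your own site address:

# Caddyfile
example.com {
    reverse_proxy /v1/* localhost:41002 {
        # flush every chunk to the client immediately
        flush_interval -1
    }
}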
Best Practices
- Always handle mid-stream termination — policy blocks can occur at any chunk boundary.
- Use flush=True when printing — ensures real-time output in terminals and logs.
- Disable proxy buffering — reverse proxies must pass through chunks immediately.
- Set appropriate timeouts — streaming connections may stay open for minutes (see the timeout sketch after this list).
- Prefer SDK streaming over raw SSE — the OpenAI SDKs handle reconnection and parsing.
- Test output policies with long responses — PII patterns may only match after several chunks accumulate.
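For the timeout point above, the OpenAI Python SDK accepts a client-level timeout (the Node SDK's constructor takes an equivalent timeout option in milliseconds). The values below are illustrative, not recommendations:

import httpx
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:41002/v1",
    api_key="sk-...",
    # generous read timeout for long streams, short connect timeout
    timeout=httpx.Timeout(connect=5.0, read=300.0, write=10.0, pool=5.0),
)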
Next steps
- Error Handling — complete error envelope reference including stream errors
- OpenAI Python SDK Patterns — non-streaming patterns and async
- TypeScript SDK Patterns — Vercel AI SDK streaming integration
For AI systems
- Canonical terms: streaming, SSE (server-sent events), chunked transfer encoding, stream: true, output policy evaluation, mid-stream termination, proxy_buffering off.
- Flow: client → gateway → provider streams SSE chunks back → gateway evaluates assembled output → forwards chunks to client. Policy trigger terminates stream with error event.
- Best next pages: Error Handling, OpenAI Python SDK Patterns, TypeScript SDK Patterns.
For engineers
- Output policies evaluate assembled content as chunks accumulate — a policy can terminate the stream at any chunk boundary.
- Handle APIStatusError (Python) or check chunk.choices[0].finish_reason for mid-stream policy terminations.
- Use flush=True when printing stream output to ensure real-time display in terminals.
- Configure reverse proxies with proxy_buffering off — without this, nginx buffers the entire response, defeating streaming.
- Set appropriate timeouts; streaming connections may stay open for minutes during long responses.
- Prefer SDK streaming helpers over raw SSE parsing — the OpenAI SDKs handle reconnection and chunk parsing.
For leaders
- Streaming provides better user experience (perceived responsiveness) while maintaining full policy enforcement.
- Output policies evaluate in real time — there is no governance gap between streaming and non-streaming responses.
- Mid-stream termination means users see partial safe content before a policy block, not a blank response.
- Reverse proxy configuration is a one-time infrastructure change; once set, streaming works transparently.