Streaming AI Responses Through the Gateway

The Keeptrusts gateway supports streaming for all OpenAI-compatible endpoints. This guide covers SSE mechanics, how output policies apply to streams, chunked transfer patterns, and edge cases you need to handle.

Use this page when

  • You need to handle SSE streaming responses through the Keeptrusts gateway.
  • You want to understand how output policies evaluate streamed content (assembled evaluation).
  • You are handling mid-stream policy terminations gracefully in Python or TypeScript.
  • You are configuring reverse proxies (nginx) to pass through SSE streams without buffering.

Primary audience

  • Primary: Backend developers implementing streaming AI responses in production applications
  • Secondary: DevOps engineers configuring reverse proxies for streaming; frontend developers consuming SSE streams

How Streaming Works Through the Gateway

Client → Gateway → Provider
                 ↓ (SSE stream)
Client ← Gateway ← Provider
       ↑ policy evaluation on assembled output
  1. The client sends a request with stream: true.
  2. The gateway forwards the request to the upstream provider.
  3. The provider streams SSE chunks back.
  4. The gateway forwards each chunk to the client in real time.
  5. Output policies evaluate the assembled content as it accumulates.
  6. If a policy triggers, the gateway terminates the stream with an error event.
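Steps 5 and 6 can be sketched as a buffer-and-check loop. The sketch below is illustrative only — SSN_PATTERN, PolicyViolation, and forward_stream are hypothetical stand-ins, not Keeptrusts internals:

```python
import re

# Stand-in for a block-mode PII policy (hypothetical pattern, not the real engine).
SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")

class PolicyViolation(Exception):
    """Raised when the assembled output trips a block-mode policy."""

def forward_stream(chunks):
    """Yield chunks to the client while evaluating the growing buffer."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        if SSN_PATTERN.search(buffer):
            # Policy triggered: stop forwarding and emit an error instead.
            raise PolicyViolation("PII detected in assembled output")
        yield chunk
```

Note that the pattern may only match once several chunks have accumulated, so the stream is cut at whatever chunk boundary completes the match.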

Python Streaming

Basic Stream Consumption

from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:41002/v1",
    api_key="sk-...",
)

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain zero-trust architecture."}],
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
print()

Collecting Full Response

def stream_and_collect(prompt: str) -> str:
    chunks: list[str] = []
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            chunks.append(content)
            print(content, end="", flush=True)
    print()
    return "".join(chunks)

Async Streaming

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="http://localhost:41002/v1",
    api_key="sk-...",
)

async def stream_response(prompt: str) -> str:
    chunks: list[str] = []
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            chunks.append(content)
    return "".join(chunks)

result = asyncio.run(stream_response("What is prompt injection?"))
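Because AsyncOpenAI streams are ordinary async iterators, several prompts can be streamed concurrently with asyncio.gather. In this sketch, fake_stream is a stand-in for the stream_response coroutine above, so the example runs without a live gateway:

```python
import asyncio

async def fake_stream(prompt: str) -> str:
    # Stand-in for stream_response: pretend chunks arrive one at a time.
    out = []
    for word in prompt.split():
        await asyncio.sleep(0)  # yield control, as a real SSE read would
        out.append(word)
    return " ".join(out)

async def stream_many(prompts: list[str]) -> list[str]:
    # gather runs all streams concurrently on one event loop; results come
    # back in the same order as the prompts. Swap in stream_response to hit
    # the gateway for real.
    return await asyncio.gather(*(fake_stream(p) for p in prompts))

results = asyncio.run(stream_many([
    "What is prompt injection?",
    "Define SSE",
]))
```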

TypeScript Streaming

Node.js with OpenAI SDK

import OpenAI from "openai";

const client = new OpenAI({
  baseURL: process.env.LLM_GATEWAY_URL ?? "http://localhost:41002/v1",
  apiKey: process.env.OPENAI_API_KEY,
});

async function streamResponse(prompt: string): Promise<string> {
  const stream = await client.chat.completions.create({
    model: "gpt-4o",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  const chunks: string[] = [];
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      process.stdout.write(content);
      chunks.push(content);
    }
  }
  return chunks.join("");
}

Next.js Server-Side Streaming

import { streamText } from "ai";
import { createOpenAI } from "@ai-sdk/openai";

const provider = createOpenAI({
  baseURL: process.env.LLM_GATEWAY_URL ?? "http://localhost:41002/v1",
  apiKey: process.env.OPENAI_API_KEY,
});

export async function POST(req: Request) {
  const { messages } = await req.json();
  const result = streamText({
    model: provider("gpt-4o"),
    messages,
  });
  return result.toDataStreamResponse();
}

Raw SSE with curl

For debugging, stream directly with curl:

curl -N http://localhost:41002/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer sk-..." \
  -d '{
    "model": "gpt-4o",
    "stream": true,
    "messages": [{"role": "user", "content": "Hello, world!"}]
  }'

Each SSE event arrives as:

data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"Hello"},"index":0}]}

data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"!"},"index":0}]}

data: [DONE]
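The SDK iterators do little more than strip the data: prefix, JSON-decode each payload, and stop at the [DONE] sentinel. A minimal parser for lines in the shape shown above (illustrative; full SSE also permits comments, event: fields, and multi-line data):

```python
import json

def parse_sse_lines(lines):
    """Yield content deltas from OpenAI-style SSE 'data:' lines."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank keep-alive lines and SSE comments
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return  # end-of-stream sentinel
        event = json.loads(payload)
        delta = event["choices"][0].get("delta", {})
        if delta.get("content"):
            yield delta["content"]

raw = [
    'data: {"id":"chatcmpl-1","choices":[{"delta":{"content":"Hello"},"index":0}]}',
    "",
    'data: {"id":"chatcmpl-1","choices":[{"delta":{"content":"!"},"index":0}]}',
    "",
    "data: [DONE]",
]
text = "".join(parse_sse_lines(raw))  # → "Hello!"
```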

Policy Evaluation on Streams

How Output Policies Apply

The gateway buffers the assembled output text as chunks arrive. Output policies (PII filters, content blockers, redaction rules) evaluate against this growing buffer.

Observe-mode policies log findings but do not interrupt the stream.

Block-mode policies terminate the stream when triggered:

data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"The SSN is "},"index":0}]}

data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"123-"},"index":0}]}

data: {"error":{"type":"policy_violation","message":"Stream terminated: PII detected","policy":"block-pii-output","code":"output_blocked"}}

data: [DONE]
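When consuming the raw stream, the terminating event can be distinguished from normal deltas by checking for an error key before reading choices. The classify_event helper below is a hypothetical sketch, not part of any SDK:

```python
import json

def classify_event(payload: str):
    """Return ('done' | 'error' | 'delta', detail) for one SSE data payload."""
    if payload == "[DONE]":
        return ("done", None)
    event = json.loads(payload)
    if "error" in event:
        # Mid-stream policy termination: an error object instead of choices.
        return ("error", event["error"].get("policy"))
    return ("delta", event["choices"][0]["delta"].get("content"))

kind, detail = classify_event(
    '{"error":{"type":"policy_violation","message":"Stream terminated: PII detected",'
    '"policy":"block-pii-output","code":"output_blocked"}}'
)
```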

Handling Mid-Stream Termination

from openai import APIStatusError

def safe_stream(prompt: str) -> str:
    chunks: list[str] = []
    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                chunks.append(content)
    except APIStatusError as e:
        if e.status_code == 409:
            error_body = e.response.json()
            policy = error_body.get("error", {}).get("policy", "unknown")
            chunks.append(f"\n\n[Stream terminated by policy: {policy}]")
        else:
            raise
    return "".join(chunks)

TypeScript Mid-Stream Handling

async function safeStream(prompt: string): Promise<string> {
  const chunks: string[] = [];
  try {
    const stream = await client.chat.completions.create({
      model: "gpt-4o",
      messages: [{ role: "user", content: prompt }],
      stream: true,
    });
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) chunks.push(content);
    }
  } catch (err) {
    if (err instanceof OpenAI.APIError && err.status === 409) {
      chunks.push("\n\n[Stream terminated by governance policy]");
    } else {
      throw err;
    }
  }
  return chunks.join("");
}

Chunked Transfer Encoding

The gateway uses HTTP chunked transfer encoding for streaming responses. Key behaviors:

Aspect               Behavior
Content-Type         text/event-stream
Transfer-Encoding    chunked
Connection           keep-alive
Policy evaluation    Runs on accumulated buffer
Termination          Error event + [DONE]

Proxy Considerations

If you run a reverse proxy (nginx, Caddy) in front of the gateway, disable response buffering:

# nginx.conf
location /v1/ {
    proxy_pass http://localhost:41002;
    proxy_buffering off;
    proxy_cache off;
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding on;
}

Without proxy_buffering off, nginx may buffer the entire response before forwarding, defeating streaming.

Best Practices

  1. Always handle mid-stream termination — policy blocks can occur at any chunk boundary.
  2. Use flush=True when printing — ensures real-time output in terminals and logs.
  3. Disable proxy buffering — reverse proxies must pass through chunks immediately.
  4. Set appropriate timeouts — streaming connections may stay open for minutes.
  5. Prefer SDK streaming over raw SSE — the OpenAI SDKs handle reconnection and parsing.
  6. Test output policies with long responses — PII patterns may only match after several chunks accumulate.
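For item 4, the OpenAI Python SDK accepts an httpx.Timeout, which lets you fail fast on connect while tolerating long reads during slow streams. The values below are illustrative starting points, not Keeptrusts recommendations:

```python
import httpx
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:41002/v1",
    api_key="sk-...",
    # Short connect timeout, generous read timeout for long-running streams.
    timeout=httpx.Timeout(connect=5.0, read=300.0, write=10.0, pool=5.0),
)
```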

Next steps

For AI systems

  • Canonical terms: streaming, SSE (server-sent events), chunked transfer encoding, stream: true, output policy evaluation, mid-stream termination, proxy_buffering off.
  • Flow: client → gateway → provider streams SSE chunks back → gateway evaluates assembled output → forwards chunks to client. Policy trigger terminates stream with error event.
  • Best next pages: Error Handling, OpenAI Python SDK Patterns, TypeScript SDK Patterns.

For engineers

  • Output policies evaluate assembled content as chunks accumulate — a policy can terminate the stream at any chunk boundary.
  • Handle APIStatusError (Python) or OpenAI.APIError (TypeScript) to catch mid-stream policy terminations.
  • Use flush=True when printing stream output to ensure real-time display in terminals.
  • Configure reverse proxies with proxy_buffering off — without this, nginx buffers the entire response, defeating streaming.
  • Set appropriate timeouts; streaming connections may stay open for minutes during long responses.
  • Prefer SDK streaming helpers over raw SSE parsing — the OpenAI SDKs handle reconnection and chunk parsing.

For leaders

  • Streaming provides better user experience (perceived responsiveness) while maintaining full policy enforcement.
  • Output policies evaluate in real-time — there is no governance gap between streaming and non-streaming responses.
  • Mid-stream termination means users see partial safe content before a policy block, not a blank response.
  • Reverse proxy configuration is a one-time infrastructure change; once set, streaming works transparently.