# LLM Streaming

Deliver LLM responses in real time for better UX.
## Basic Streaming (OpenAI)

```python
from openai import OpenAI

client = OpenAI()

def stream_response(prompt: str):
    """Stream tokens as they're generated."""
    # The sync client returns a sync iterator, so a plain generator is correct here
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```
## Streaming with Async

```python
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def async_stream(prompt: str):
    """Async streaming for better concurrency."""
    stream = await client.chat.completions.create(
        model="gpt-5.2",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```
## FastAPI SSE Endpoint

```python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.get("/chat/stream")
async def stream_chat(prompt: str):
    """Server-Sent Events endpoint for streaming."""
    async def generate():
        async for token in async_stream(prompt):
            yield {"event": "token", "data": token}
        yield {"event": "done", "data": ""}

    return EventSourceResponse(generate())
```
## Frontend SSE Consumer

```typescript
async function streamChat(prompt: string, onToken: (t: string) => void) {
  const response = await fetch("/chat/stream?prompt=" + encodeURIComponent(prompt));
  const reader = response.body?.getReader();
  if (!reader) return;
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters intact across chunk boundaries
    const text = decoder.decode(value, { stream: true });
    for (const line of text.split("\n")) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6);
        if (data !== "[DONE]") {
          onToken(data);
        }
      }
    }
  }
}

// Usage
let fullResponse = "";
await streamChat("Hello", (token) => {
  fullResponse += token;
  setDisplayText(fullResponse); // Update UI incrementally
});
```
Streaming with Tool Calls
async def stream_with_tools(messages: list, tools: list): """Handle streaming responses that include tool calls.""" stream = await client.chat.completions.create( model="gpt-5.2", messages=messages, tools=tools, stream=True )
collected_content = ""
collected_tool_calls = []
async for chunk in stream:
delta = chunk.choices[0].delta
# Collect content tokens
if delta.content:
collected_content += delta.content
yield {"type": "content", "data": delta.content}
# Collect tool call chunks
if delta.tool_calls:
for tc in delta.tool_calls:
# Tool calls come in chunks, accumulate them
if tc.index >= len(collected_tool_calls):
collected_tool_calls.append({
"id": tc.id,
"function": {"name": "", "arguments": ""}
})
if tc.function.name:
collected_tool_calls[tc.index]["function"]["name"] += tc.function.name
if tc.function.arguments:
collected_tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments
# If tool calls, execute them
if collected_tool_calls:
yield {"type": "tool_calls", "data": collected_tool_calls}
## Backpressure Handling

```python
import asyncio

async def stream_with_backpressure(prompt: str, max_buffer: int = 100):
    """Handle slow consumers with backpressure."""
    buffer: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        async for token in async_stream(prompt):
            await buffer.put(token)  # Blocks if buffer is full
        await buffer.put(None)  # Signal completion

    async def consumer():
        while True:
            token = await buffer.get()
            if token is None:
                break
            yield token
            await asyncio.sleep(0)  # Yield control to the event loop

    # Start producer in the background; keep a reference so the task
    # isn't garbage-collected, and cancel it if the consumer bails early
    producer_task = asyncio.create_task(producer())
    try:
        async for token in consumer():
            yield token
    finally:
        producer_task.cancel()
```
## Key Decisions

| Decision | Recommendation |
|---|---|
| Protocol | SSE for web, WebSocket for bidirectional |
| Buffer size | 50-200 tokens |
| Timeout | 30-60s for long responses |
| Retry | Reconnect on disconnect |
## Common Mistakes

- No timeout (hangs on network issues)
- Missing error handling in the stream
- Not closing connections properly
- Buffering the entire response (defeats the purpose)
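The no-timeout mistake can be avoided with a per-token watchdog. A minimal sketch (the `stream_with_timeout` name is mine): each chunk read is wrapped in `asyncio.wait_for`, so a stalled connection raises `TimeoutError` instead of hanging forever.

```python
import asyncio
from typing import AsyncIterator

async def stream_with_timeout(
    tokens: AsyncIterator[str],
    per_token_timeout: float = 30.0,
) -> AsyncIterator[str]:
    """Raise TimeoutError if the producer stalls between tokens."""
    iterator = tokens.__aiter__()
    while True:
        try:
            # Bound the wait for each individual chunk
            token = await asyncio.wait_for(iterator.__anext__(), per_token_timeout)
        except StopAsyncIteration:
            return  # stream finished normally
        yield token
```

A per-token timeout is usually more useful than a whole-response deadline: long answers legitimately take minutes, but a healthy stream rarely goes 30s between chunks.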
## Related Skills

- `streaming-api-patterns`: SSE/WebSocket deep dive
- `function-calling`: Tool calls in streams
- `react-streaming-ui`: React streaming components
## Capability Details

### token-streaming

Keywords: streaming, token, stream response, real-time, incremental

Solves:

- Stream tokens as they're generated
- Display real-time LLM output
- Reduce time to first byte
### sse-responses

Keywords: SSE, Server-Sent Events, event stream, text/event-stream

Solves:

- Implement SSE for streaming
- Handle SSE reconnection
- Parse SSE event data
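As a concrete illustration of parsing event data, here is a minimal `text/event-stream` parser (the `parse_sse` name is mine). It follows the wire format's core rules: events are separated by blank lines, each field is a `name: value` line, lines starting with `:` are comments/keep-alives, and repeated fields (e.g. multi-line `data`) are joined with newlines.

```python
def parse_sse(raw: str) -> list[dict[str, str]]:
    """Parse a text/event-stream payload into a list of event dicts."""
    events: list[dict[str, str]] = []
    current: dict[str, str] = {}
    for line in raw.split("\n"):
        if line == "":
            # A blank line terminates the current event
            if current:
                events.append(current)
                current = {}
        elif line.startswith(":"):
            continue  # comment / keep-alive line
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # the spec strips one leading space
            # Repeated fields concatenate with '\n' (multi-line data)
            current[field] = current[field] + "\n" + value if field in current else value
    if current:
        events.append(current)
    return events
```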
### streaming-with-tools

Keywords: stream tools, tool streaming, function call stream

Solves:

- Stream responses with tool calls
- Handle partial tool call data
- Coordinate streaming and tool execution
### partial-json-parsing

Keywords: partial JSON, incremental parse, streaming JSON

Solves:

- Parse JSON as it streams
- Handle incomplete JSON safely
- Display partial structured data
### stream-cancellation

Keywords: cancel, abort, stop stream, AbortController

Solves:

- Cancel ongoing streams
- Handle user interrupts
- Clean up stream resources
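On the server side (the frontend equivalent is an `AbortController` passed to `fetch`), cancellation is an asyncio task cancel plus generator cleanup. A minimal sketch with hypothetical names: run the consumer as a task, and on cancellation close the underlying async generator so the HTTP connection is released.

```python
import asyncio
from typing import AsyncGenerator, Callable

async def consume_cancellable(
    tokens: AsyncGenerator[str, None],
    on_token: Callable[[str], None],
) -> str:
    """Consume a token stream; on cancellation, close the generator."""
    parts: list[str] = []
    try:
        async for token in tokens:
            parts.append(token)
            on_token(token)
    except asyncio.CancelledError:
        # Close the generator explicitly so its finally blocks run and
        # the underlying connection is released, then re-raise
        await tokens.aclose()
        raise
    return "".join(parts)
```

Usage: `task = asyncio.create_task(consume_cancellable(async_stream(prompt), on_token))`, then `task.cancel()` when the user hits stop.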