llm-streaming

Deliver LLM responses in real-time for better UX.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.


Install skill "llm-streaming" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-llm-streaming

LLM Streaming


Basic Streaming (OpenAI)

```python
from openai import OpenAI

client = OpenAI()

def stream_response(prompt: str):
    """Stream tokens as they're generated (sync client, so a sync generator)."""
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```

Streaming with Async

```python
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def async_stream(prompt: str):
    """Async streaming for better concurrency."""
    stream = await client.chat.completions.create(
        model="gpt-5.2",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```

FastAPI SSE Endpoint

```python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.get("/chat/stream")
async def stream_chat(prompt: str):
    """Server-Sent Events endpoint for streaming."""
    async def generate():
        async for token in async_stream(prompt):
            yield {"event": "token", "data": token}
        yield {"event": "done", "data": ""}

    return EventSourceResponse(generate())
```

Frontend SSE Consumer

```typescript
async function streamChat(prompt: string, onToken: (t: string) => void) {
  const response = await fetch("/chat/stream?prompt=" + encodeURIComponent(prompt));
  const reader = response.body?.getReader();
  if (!reader) return;
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true buffers partial multi-byte characters between reads
    const text = decoder.decode(value, { stream: true });
    const lines = text.split('\n');

    for (const line of lines) {
      // The backend above signals completion with an "event: done" frame
      if (line.startsWith('event: ') && line.slice(7) === 'done') {
        return;
      }
      if (line.startsWith('data: ')) {
        onToken(line.slice(6));
      }
    }
  }
}

// Usage
let fullResponse = '';
await streamChat('Hello', (token) => {
  fullResponse += token;
  setDisplayText(fullResponse);  // Update UI incrementally
});
```

Note: splitting on `\n` per read is a simplification; a robust consumer buffers across reads, since an SSE frame can straddle chunk boundaries.

Streaming with Tool Calls

```python
async def stream_with_tools(messages: list, tools: list):
    """Handle streaming responses that include tool calls.

    Assumes the AsyncOpenAI `client` from the async example above.
    """
    stream = await client.chat.completions.create(
        model="gpt-5.2",
        messages=messages,
        tools=tools,
        stream=True,
    )

    collected_content = ""
    collected_tool_calls = []

    async for chunk in stream:
        delta = chunk.choices[0].delta

        # Collect content tokens
        if delta.content:
            collected_content += delta.content
            yield {"type": "content", "data": delta.content}

        # Collect tool call chunks
        if delta.tool_calls:
            for tc in delta.tool_calls:
                # Tool calls arrive in fragments; accumulate them by index
                while tc.index >= len(collected_tool_calls):
                    collected_tool_calls.append({
                        "id": tc.id,
                        "function": {"name": "", "arguments": ""},
                    })

                if tc.function.name:
                    collected_tool_calls[tc.index]["function"]["name"] += tc.function.name
                if tc.function.arguments:
                    collected_tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments

    # If tool calls were collected, hand them off for execution
    if collected_tool_calls:
        yield {"type": "tool_calls", "data": collected_tool_calls}
```

Backpressure Handling

```python
import asyncio

async def stream_with_backpressure(prompt: str, max_buffer: int = 100):
    """Handle slow consumers with backpressure."""
    buffer = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        async for token in async_stream(prompt):
            await buffer.put(token)  # Blocks if buffer full
        await buffer.put(None)  # Signal completion

    async def consumer():
        while True:
            token = await buffer.get()
            if token is None:
                break
            yield token
            await asyncio.sleep(0)  # Yield control

    # Start producer in background; keep a reference so it isn't GC'd
    producer_task = asyncio.create_task(producer())

    # Return consumer generator
    async for token in consumer():
        yield token

    await producer_task  # Surface any producer exception
```

Key Decisions

| Decision | Recommendation |
|----------|----------------|
| Protocol | SSE for web, WebSocket for bidirectional |
| Buffer size | 50-200 tokens |
| Timeout | 30-60s for long responses |
| Retry | Reconnect on disconnect |
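The retry recommendation can be sketched as a reconnect loop with jittered exponential backoff. This is a minimal sketch: `stream_with_retry`, `connect`, and `base` are illustrative names, and note that naive reconnection replays the stream from the start — production SSE clients resume via `Last-Event-ID` instead.

```python
import asyncio
import random

async def stream_with_retry(connect, max_retries: int = 3, base: float = 1.0):
    """Re-open a dropped stream with jittered exponential backoff.

    `connect` is any zero-arg function returning a fresh async token stream.
    Caveat: reconnecting replays from the start; dedupe or resume upstream.
    """
    for attempt in range(max_retries + 1):
        try:
            async for token in connect():
                yield token
            return  # stream completed cleanly
        except ConnectionError:
            if attempt == max_retries:
                raise
            await asyncio.sleep(base * (2 ** attempt) * (0.5 + random.random()))

# Demo with a hypothetical flaky source that drops once, then succeeds
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionError("connection dropped mid-stream")
    for t in ["Hel", "lo"]:
        yield t

async def main():
    return [t async for t in stream_with_retry(flaky, base=0.01)]

tokens = asyncio.run(main())
```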

Common Mistakes

  • No timeout (hangs on network issues)

  • Missing error handling in stream

  • Not closing connections properly

  • Buffering entire response (defeats purpose)
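The first two mistakes can be guarded with a per-token timeout wrapper — unlike a whole-response timeout, it preserves streaming while still detecting a hung connection. A minimal sketch (`stream_with_timeout` and `fake_stream` are illustrative names, not library APIs):

```python
import asyncio

async def stream_with_timeout(stream, per_token_timeout: float = 30.0):
    """Yield tokens, aborting if the next one takes too long to arrive."""
    it = stream.__aiter__()
    while True:
        try:
            token = await asyncio.wait_for(it.__anext__(), per_token_timeout)
        except StopAsyncIteration:
            break  # stream finished normally
        except asyncio.TimeoutError:
            # Cleanup point: close the connection, report partial output
            raise RuntimeError("stream stalled past timeout") from None
        yield token

# Demo with an in-memory stand-in for an LLM token stream
async def fake_stream():
    for t in ["Hello", ", ", "world"]:
        await asyncio.sleep(0)
        yield t

async def main():
    return [t async for t in stream_with_timeout(fake_stream(), per_token_timeout=1.0)]

tokens = asyncio.run(main())
```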

Related Skills

  • streaming-api-patterns: SSE/WebSocket deep dive

  • function-calling: Tool calls in streams

  • react-streaming-ui: React streaming components

Capability Details

token-streaming

Keywords: streaming, token, stream response, real-time, incremental

Solves:

  • Stream tokens as they're generated

  • Display real-time LLM output

  • Reduce time to first byte

sse-responses

Keywords: SSE, Server-Sent Events, event stream, text/event-stream

Solves:

  • Implement SSE for streaming

  • Handle SSE reconnection

  • Parse SSE event data
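Parsing SSE event data follows a simple line protocol: `event:` and `data:` field lines, with a blank line dispatching each event. A minimal sketch (`parse_sse` is a hypothetical helper; the real spec also covers comments, `retry:` fields, `\r\n` line endings, and exact leading-space handling):

```python
def parse_sse(raw: str):
    """Parse a raw SSE stream into (event, data) pairs."""
    events = []
    event, data = "message", []  # "message" is the spec's default event type
    for line in raw.split("\n"):
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())
        elif line == "":  # blank line dispatches the accumulated event
            if data:
                events.append((event, "\n".join(data)))
            event, data = "message", []
    return events

raw = (
    "event: token\n"
    "data: Hel\n"
    "\n"
    "event: token\n"
    "data: lo\n"
    "\n"
    "event: done\n"
    "data: \n"
    "\n"
)
events = parse_sse(raw)
```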

streaming-with-tools

Keywords: stream tools, tool streaming, function call stream

Solves:

  • Stream responses with tool calls

  • Handle partial tool call data

  • Coordinate streaming and tool execution

partial-json-parsing

Keywords: partial JSON, incremental parse, streaming JSON

Solves:

  • Parse JSON as it streams

  • Handle incomplete JSON safely

  • Display partial structured data
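The safest baseline for streamed JSON (e.g. tool-call arguments arriving in fragments) is to accumulate a buffer and attempt a full parse on each chunk, acting only once it succeeds. A minimal sketch with hypothetical chunk data; dedicated partial-JSON parsers go further and repair truncated objects for live display:

```python
import json

def try_parse(buffer: str):
    """Return the parsed object if `buffer` is complete JSON, else None."""
    try:
        return json.loads(buffer)
    except json.JSONDecodeError:
        return None

# Hypothetical fragments, as a streamed tool-call argument might arrive
chunks = ['{"name": "Ada"', ', "age": 36}']

buffer = ""
snapshots = []
for chunk in chunks:
    buffer += chunk
    snapshots.append(try_parse(buffer))  # None until the JSON is complete
```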

stream-cancellation

Keywords: cancel, abort, stop stream, AbortController

Solves:

  • Cancel ongoing streams

  • Handle user interrupts

  • Clean up stream resources
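On the backend, cancelling a stream maps onto asyncio task cancellation, with cleanup done where `CancelledError` surfaces (the frontend counterpart is passing an `AbortController` signal to `fetch`). A minimal sketch using an in-memory stand-in for the LLM stream:

```python
import asyncio

async def fake_stream():
    # Hypothetical endless token source standing in for an LLM stream
    i = 0
    while True:
        await asyncio.sleep(0)
        yield f"tok{i}"
        i += 1

async def main():
    received = []
    stopped = asyncio.Event()

    async def consume():
        try:
            async for token in fake_stream():
                received.append(token)
                if len(received) == 5:
                    stopped.set()  # pretend the user clicked "stop"
        except asyncio.CancelledError:
            # Cleanup point: close connections, flush partial output
            raise

    task = asyncio.create_task(consume())
    await stopped.wait()
    task.cancel()            # cooperative cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received

received = asyncio.run(main())
```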
