streaming

Implement real-time streaming responses from Claude API using Server-Sent Events (SSE).

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "streaming" with this command: npx skills add lobbi-docs/claude/lobbi-docs-claude-streaming

Streaming Skill

Implement real-time streaming responses from Claude API using Server-Sent Events (SSE).

When to Use This Skill

  • Real-time user interfaces

  • Long-running generations

  • Progressive output display

  • Tool use with streaming

  • Extended thinking visualization

SSE Event Flow

message_start → content_block_start → content_block_delta (repeated) → content_block_stop → (more blocks...) → message_delta → message_stop

Core Implementation

Basic Text Streaming (Python)

import anthropic

client = anthropic.Anthropic()

with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "Write a short story."}] ) as stream: for text in stream.text_stream: print(text, end="", flush=True)

Event-Based Streaming

with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "Hello"}] ) as stream: for event in stream: if event.type == "content_block_delta": if event.delta.type == "text_delta": print(event.delta.text, end="") elif event.delta.type == "input_json_delta": # Tool input (accumulate, don't parse yet!) tool_input_buffer += event.delta.partial_json elif event.type == "content_block_stop": # Now safe to parse tool input if tool_input_buffer: tool_input = json.loads(tool_input_buffer)

TypeScript Streaming

import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

const stream = client.messages.stream({ model: 'claude-sonnet-4-20250514', max_tokens: 1024, messages: [{ role: 'user', content: 'Write a story.' }] });

for await (const event of stream) { if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') { process.stdout.write(event.delta.text); } }

const finalMessage = await stream.finalMessage();

Event Types Reference

Event When Data

message_start

Beginning Message metadata

content_block_start

Block begins Block type, index

content_block_delta

Content chunk Delta content

content_block_stop

Block ends

message_delta

Message update Stop reason, usage

message_stop

Complete

Delta Types

Delta Type Content When

text_delta

.text

Text content

input_json_delta

.partial_json

Tool input

thinking_delta

.thinking

Extended thinking

signature_delta

.signature

Thinking signature

Tool Use Streaming

Critical Rule: Never Parse JSON Mid-Stream!

WRONG - Will fail on partial JSON!

for event in stream: if event.delta.type == "input_json_delta": tool_input = json.loads(event.delta.partial_json) # FAILS!

CORRECT - Accumulate then parse

tool_json_buffer = "" for event in stream: if event.delta.type == "input_json_delta": tool_json_buffer += event.delta.partial_json elif event.type == "content_block_stop": if tool_json_buffer: tool_input = json.loads(tool_json_buffer) # Safe now! tool_json_buffer = ""

Complete Tool Streaming Pattern

def stream_with_tools(client, messages, tools): current_block = None tool_input_buffer = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=4096,
    messages=messages,
    tools=tools
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            current_block = event.content_block
            tool_input_buffer = ""

        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                yield {"type": "text", "content": event.delta.text}
            elif event.delta.type == "input_json_delta":
                tool_input_buffer += event.delta.partial_json

        elif event.type == "content_block_stop":
            if current_block.type == "tool_use":
                yield {
                    "type": "tool_call",
                    "id": current_block.id,
                    "name": current_block.name,
                    "input": json.loads(tool_input_buffer)
                }

Extended Thinking Streaming

thinking_content = "" signature = ""

with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=16000, thinking={"type": "enabled", "budget_tokens": 10000}, messages=[{"role": "user", "content": "Solve this complex problem..."}] ) as stream: for event in stream: if event.type == "content_block_delta": if event.delta.type == "thinking_delta": thinking_content += event.delta.thinking # Optionally display thinking in UI elif event.delta.type == "signature_delta": signature = event.delta.signature elif event.delta.type == "text_delta": print(event.delta.text, end="")

Error Handling

Retriable Errors

import time

RETRIABLE_ERRORS = [529, 429, 500, 502, 503]

def stream_with_retry(client, **kwargs): max_retries = 3 base_delay = 1

for attempt in range(max_retries):
    try:
        with client.messages.stream(**kwargs) as stream:
            for event in stream:
                yield event
        return
    except anthropic.APIStatusError as e:
        if e.status_code in RETRIABLE_ERRORS and attempt < max_retries - 1:
            delay = base_delay * (2 ** attempt)
            time.sleep(delay)
        else:
            raise

Silent Overloaded Errors

CRITICAL: Check for error events even at HTTP 200!

for event in stream: if event.type == "error": if event.error.type == "overloaded_error": # Retry with backoff pass

Connection Management

Keep-Alive Configuration

import httpx

Proper timeout configuration

http_client = httpx.Client( timeout=httpx.Timeout( connect=10.0, # Connection timeout read=120.0, # Read timeout (long for streaming!) write=30.0, # Write timeout pool=30.0 # Pool timeout ) )

client = anthropic.Anthropic(http_client=http_client)

Connection Pooling

http_client = httpx.Client( limits=httpx.Limits( max_keepalive_connections=20, max_connections=100, keepalive_expiry=30.0 ) )

Best Practices

DO:

  • Set read timeout >= 60 seconds

  • Accumulate tool JSON, parse after block_stop

  • Handle error events even at HTTP 200

  • Use exponential backoff for retries

DON'T:

  • Parse partial JSON during streaming

  • Use short timeouts

  • Ignore overloaded_error events

  • Leave connections idle >5 minutes

UI Integration Pattern

async def stream_to_ui(websocket, prompt): """Stream Claude response to WebSocket client""" async with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, messages=[{"role": "user", "content": prompt}] ) as stream: async for text in stream.text_stream: await websocket.send_json({ "type": "chunk", "content": text })

    await websocket.send_json({
        "type": "complete",
        "usage": stream.get_final_message().usage
    })

See Also

  • [[llm-integration]] - API basics

  • [[tool-use]] - Tool calling

  • [[extended-thinking]] - Deep reasoning

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

vision-multimodal

No summary provided by upstream source.

Repository SourceNeeds Review
General

design-system

No summary provided by upstream source.

Repository SourceNeeds Review
General

kanban

No summary provided by upstream source.

Repository SourceNeeds Review
General

gcp

No summary provided by upstream source.

Repository SourceNeeds Review