Batch Processing Skill

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "batch-processing" with this command: npx skills add lobbi-docs/claude/lobbi-docs-claude-batch-processing

Leverage Anthropic's Message Batches API for 50% cost savings on bulk processing workloads.

When to Use This Skill

  • Processing 10K+ documents

  • Model evaluation and benchmarks

  • ETL and data enrichment

  • Training data generation

  • Bulk content analysis

  • Any non-time-sensitive workload

Cost Savings

Processing Type | Cost
--------------- | ----
Standard API    | 100%
Batch API       | 50%

Example: 1M tokens @ $3/1M = $3 (standard) vs $1.50 (batch)
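The arithmetic above can be sketched as a quick savings calculator. This is a minimal illustration using the figures quoted above (the 50% batch discount and $3 per million tokens); `batch_savings` is not part of any SDK:

```python
def batch_savings(tokens: int, price_per_mtok: float, discount: float = 0.5):
    """Compare standard vs. batch cost for a given token volume."""
    standard = tokens / 1_000_000 * price_per_mtok
    batch = standard * (1 - discount)
    return standard, batch

standard, batch = batch_savings(1_000_000, 3.00)
print(f"Standard: ${standard:.2f}, Batch: ${batch:.2f}")
# Standard: $3.00, Batch: $1.50
```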

Batch Lifecycle

Created → Processing → Ended (completed/failed/expired/canceled)

  • Processing time: Up to 24 hours

  • Results retention: 29 days

  • Full feature support: Tools, vision, system prompts
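The lifecycle above can be exercised without API access by polling a stub. A minimal sketch, where `FakeClient` is a hypothetical stand-in that steps through the states so polling logic can be tested offline:

```python
import time

class FakeBatch:
    def __init__(self, status):
        self.processing_status = status

class FakeClient:
    """Hypothetical stub that walks through the batch lifecycle."""
    def __init__(self):
        self._states = iter(["in_progress", "in_progress", "ended"])

    def retrieve(self, batch_id):
        return FakeBatch(next(self._states))

def poll_until_ended(client, batch_id, poll_interval=0):
    """Poll until the batch reaches the terminal 'ended' state."""
    seen = []
    while True:
        batch = client.retrieve(batch_id)
        seen.append(batch.processing_status)
        if batch.processing_status == "ended":
            return seen
        time.sleep(poll_interval)

print(poll_until_ended(FakeClient(), "batch_123"))
# ['in_progress', 'in_progress', 'ended']
```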

Core Implementation

Step 1: Create JSONL File

import json

# Each line is a complete request
requests = [
    {
        "custom_id": "request-1",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": "Summarize: Document 1..."}]
        }
    },
    {
        "custom_id": "request-2",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": "Summarize: Document 2..."}]
        }
    }
]

# Write JSONL file
with open("batch_requests.jsonl", "w") as f:
    for request in requests:
        f.write(json.dumps(request) + "\n")

Step 2: Submit Batch

import anthropic
import json

client = anthropic.Anthropic()

# Create batch from the JSONL file. The SDK expects a list of request
# objects, not a raw file handle, so parse each line first.
with open("batch_requests.jsonl") as f:
    batch = client.beta.messages.batches.create(
        requests=[json.loads(line) for line in f]
    )

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")

Step 3: Poll for Completion

import time

def wait_for_batch(client, batch_id, poll_interval=60):
    """Poll until batch completes"""
    while True:
        batch = client.beta.messages.batches.retrieve(batch_id)

        if batch.processing_status == "ended":
            print("Batch completed!")
            print(f"  Succeeded: {batch.request_counts.succeeded}")
            print(f"  Errored: {batch.request_counts.errored}")
            print(f"  Expired: {batch.request_counts.expired}")
            return batch

        print(f"Status: {batch.processing_status} - waiting...")
        time.sleep(poll_interval)

Step 4: Stream Results

def process_results(client, batch_id):
    """Stream and process batch results"""
    results = {}

    for result in client.beta.messages.batches.results(batch_id):
        custom_id = result.custom_id

        if result.result.type == "succeeded":
            message = result.result.message
            results[custom_id] = {
                "status": "success",
                "content": message.content[0].text,
                "usage": message.usage
            }
        elif result.result.type == "errored":
            results[custom_id] = {
                "status": "error",
                "error": result.result.error
            }
        elif result.result.type == "expired":
            results[custom_id] = {
                "status": "expired"
            }

    return results

Complete Workflow

import anthropic
import json
import time

def run_batch_job(documents):
    """Complete batch processing workflow"""
    client = anthropic.Anthropic()

    # 1. Create requests
    requests = []
    for i, doc in enumerate(documents):
        requests.append({
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": f"Summarize: {doc}"}]
            }
        })

    # 2. Write JSONL (an audit trail, also useful for retry logic)
    with open("batch.jsonl", "w") as f:
        for req in requests:
            f.write(json.dumps(req) + "\n")

    # 3. Submit batch (the SDK takes the request list directly)
    batch = client.beta.messages.batches.create(requests=requests)

    print(f"Batch submitted: {batch.id}")

    # 4. Wait for completion
    while True:
        batch = client.beta.messages.batches.retrieve(batch.id)
        if batch.processing_status == "ended":
            break
        time.sleep(60)

    # 5. Collect results
    results = {}
    for result in client.beta.messages.batches.results(batch.id):
        if result.result.type == "succeeded":
            results[result.custom_id] = result.result.message.content[0].text

    return results

Error Handling

Retry Failed Requests

import json

def retry_failed(client, original_batch_id):
    """Retry failed and expired requests"""
    retry_requests = []

    for result in client.beta.messages.batches.results(original_batch_id):
        if result.result.type in ("errored", "expired"):
            # Re-create the request (you'll need to store original params)
            retry_requests.append({
                "custom_id": result.custom_id,
                "params": get_original_params(result.custom_id)  # Your implementation
            })

    if retry_requests:
        # Keep an audit copy, then submit the retry batch
        with open("retry.jsonl", "w") as f:
            for req in retry_requests:
                f.write(json.dumps(req) + "\n")

        return client.beta.messages.batches.create(requests=retry_requests)

    return None

Error Types

Result Type | Action
----------- | ------
succeeded   | Process normally
errored     | Check error, may retry
expired     | Request took >24h, retry
canceled    | Batch was canceled

API Reference

Endpoints

Method | Endpoint                          | Purpose
------ | --------------------------------- | -------
POST   | /v1/messages/batches              | Create batch
GET    | /v1/messages/batches/{id}         | Get batch status
GET    | /v1/messages/batches              | List batches
POST   | /v1/messages/batches/{id}/cancel  | Cancel batch
GET    | /v1/messages/batches/{id}/results | Stream results

Request Format

{
  "custom_id": "unique-identifier",
  "params": {
    "model": "claude-sonnet-4-20250514",
    "max_tokens": 1024,
    "system": "Optional system prompt",
    "messages": [{"role": "user", "content": "..."}],
    "tools": [],
    "temperature": 0.7
  }
}
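A small builder can enforce the shape above when generating many requests. A sketch, assuming the format shown; `build_request` and its defaults are illustrative helpers, not part of the SDK:

```python
import json

def build_request(custom_id, prompt, model="claude-sonnet-4-20250514",
                  max_tokens=1024, system=None, **extra):
    """Build one batch request matching the documented format."""
    params = {
        "model": model,
        "max_tokens": max_tokens,
        "messages": [{"role": "user", "content": prompt}],
        **extra,  # e.g. tools, temperature
    }
    if system is not None:
        params["system"] = system
    return {"custom_id": custom_id, "params": params}

# One JSONL line per request
line = json.dumps(build_request("doc-1", "Summarize: ...", temperature=0.7))
```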

Best Practices

DO:

  • Use for workloads >100 requests

  • Include unique custom_id for tracking

  • Store original params for retry logic

  • Monitor batch status via polling

  • Process results as streaming JSONL

DON'T:

  • Use for time-sensitive requests

  • Expect immediate results

  • Forget to handle expired requests

  • Ignore error results
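The "store original params" practice is what the retry example's `get_original_params` depends on. A minimal sketch of one way to do it, persisting params keyed by `custom_id` in a JSON file (the file path and helper names are illustrative):

```python
import json

def save_params(requests, path="batch_params.json"):
    """Persist params keyed by custom_id so failed requests can be rebuilt."""
    index = {req["custom_id"]: req["params"] for req in requests}
    with open(path, "w") as f:
        json.dump(index, f)

def get_original_params(custom_id, path="batch_params.json"):
    """Look up the original params for a custom_id (used by retry logic)."""
    with open(path) as f:
        return json.load(f)[custom_id]
```

Call `save_params` right after building the request list; a retry batch can then rebuild each failed request verbatim.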

Cost Optimization Tips

  • Batch similar requests - Same model, similar prompts

  • Combine with caching - Cache static context before batch

  • Use appropriate model - Haiku for simple tasks, Sonnet for complex

  • Optimize prompts - Shorter prompts = lower cost
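The "combine with caching" tip can be sketched by marking the shared static context with a `cache_control` block, the mechanism prompt caching uses, inside each batch request's params. A hedged example (`STATIC_CONTEXT` and `cached_request` are illustrative; cache hits within a batch are best-effort, not guaranteed):

```python
STATIC_CONTEXT = "Long reference document shared by every request..."

def cached_request(custom_id, question):
    """Batch request whose shared system context is marked cacheable."""
    return {
        "custom_id": custom_id,
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "system": [
                {
                    "type": "text",
                    "text": STATIC_CONTEXT,
                    "cache_control": {"type": "ephemeral"},
                }
            ],
            "messages": [{"role": "user", "content": question}],
        },
    }

requests = [cached_request(f"q-{i}", q) for i, q in enumerate(["Q1", "Q2"])]
```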

Use Case: Document Processing

# Process 10,000 documents at 50% cost
documents = load_documents("data/*.txt")  # 10K files

# Create batch with consistent format
requests = [
    {
        "custom_id": doc.id,
        "params": {
            "model": "claude-haiku-4-20250514",  # Cheapest for bulk
            "max_tokens": 512,
            "system": "Extract key information as JSON.",
            "messages": [{"role": "user", "content": doc.content}]
        }
    }
    for doc in documents
]

# Run batch job (a variant of run_batch_job that accepts pre-built requests)
results = run_batch_job(requests)

# Cost: ~50% of standard API!

See Also

  • llm-integration - API basics

  • prompt-caching - Cache context

  • streaming - Real-time output
