Cost & Latency Optimizer
Optimize LLM applications for cost and performance.
Cost Breakdown Analysis
class CostAnalyzer:
    """Accumulate spend per cost category and summarize it.

    ``costs`` holds the running cost per category and ``counts`` holds the
    number of tracked events per category. Only LLM calls have a tracking
    method in this class; the other categories stay at zero unless the
    caller updates ``costs`` directly.
    """

    def __init__(self):
        self.costs = {
            "llm_calls": 0,
            "embeddings": 0,
            "tool_calls": 0,
        }
        self.counts = {
            "llm_calls": 0,
            "embeddings": 0,
        }

    def track_llm_call(
        self,
        tokens_in: int,
        tokens_out: int,
        input_price_per_1k: float = 0.03,
        output_price_per_1k: float = 0.06,
    ):
        """Record one LLM call and add its cost to the running total.

        Args:
            tokens_in: Number of prompt tokens sent.
            tokens_out: Number of completion tokens received.
            input_price_per_1k: Price per 1,000 input tokens. Defaults to
                the GPT-4 rate (presumably USD - confirm).
            output_price_per_1k: Price per 1,000 output tokens. Defaults to
                the GPT-4 rate.
        """
        cost = (
            (tokens_in / 1000) * input_price_per_1k
            + (tokens_out / 1000) * output_price_per_1k
        )
        self.costs["llm_calls"] += cost
        self.counts["llm_calls"] += 1

    def report(self):
        """Return the total cost, a per-category breakdown and the mean LLM call cost.

        Returns:
            A dict with keys ``total_cost``, ``breakdown`` (a snapshot copy
            of the per-category costs) and ``avg_cost_per_call``.
            ``avg_cost_per_call`` is ``0.0`` when no LLM call has been
            tracked yet.
        """
        llm_call_count = self.counts["llm_calls"]
        # Guard the division: a report requested before any call is tracked
        # must not raise ZeroDivisionError.
        if llm_call_count:
            avg_cost_per_call = self.costs["llm_calls"] / llm_call_count
        else:
            avg_cost_per_call = 0.0
        return {
            "total_cost": sum(self.costs.values()),
            # Copy so that later tracking does not mutate an issued report.
            "breakdown": dict(self.costs),
            "avg_cost_per_call": avg_cost_per_call,
        }
Caching Strategy
import hashlib
from functools import lru_cache
class LLMCache:
    """Cache LLM responses in a Redis-style store, keyed by model and prompt.

    The client only needs ``get(key)`` and ``setex(key, ttl, value)``.
    """

    def __init__(self, redis_client, ttl: int = 3600):
        """Store the backing client and the entry lifetime.

        Args:
            redis_client: Object exposing ``get`` and ``setex``.
            ttl: Expiry passed to ``setex`` for every entry; defaults to
                3600 (1 hour).
        """
        self.cache = redis_client
        self.ttl = ttl

    def get_cache_key(self, prompt: str, model: str) -> str:
        """Return the cache key for a (prompt, model) pair.

        The key is ``llm_cache:`` followed by the SHA-256 hex digest of
        ``"<model>:<prompt>"``, so its length is fixed regardless of prompt
        size.
        """
        content = f"{model}:{prompt}"
        return f"llm_cache:{hashlib.sha256(content.encode()).hexdigest()}"

    def get(self, prompt: str, model: str):
        """Return whatever the client holds for this prompt and model.

        The value is passed through from ``redis_client.get`` unchanged.
        NOTE(review): a redis-py client without ``decode_responses=True``
        presumably returns ``bytes`` here and ``None`` on a miss - confirm.
        """
        key = self.get_cache_key(prompt, model)
        return self.cache.get(key)

    def set(self, prompt: str, model: str, response: str):
        """Store ``response`` for this prompt and model with the configured TTL."""
        key = self.get_cache_key(prompt, model)
        self.cache.setex(key, self.ttl, response)
Usage
cache = LLMCache(redis_client)


def cached_llm_call(prompt: str, model: str = "gpt-4"):
    """Return the response for ``prompt``, using the cache when possible.

    On a cache hit the stored value is returned as-is. On a miss the LLM is
    called, its response is written to the cache, and the response is
    returned.

    Args:
        prompt: Prompt text sent to the model.
        model: Model identifier; also part of the cache key.
    """
    cached = cache.get(prompt, model)
    # Compare against None rather than truthiness: an empty response that
    # was cached is still a hit and must not trigger another paid LLM call.
    if cached is not None:
        return cached

    # Cache miss: call the LLM and remember the result.
    response = llm(prompt, model=model)
    cache.set(prompt, model, response)
    return response
Model Selection
# Per-model token prices, split into input and output rates.
# NOTE(review): presumably USD per 1,000 tokens - confirm against the
# provider price sheets before relying on these numbers.
MODEL_PRICING = {
    "gpt-4": {
        "input": 0.03,
        "output": 0.06,
    },
    "gpt-3.5-turbo": {
        "input": 0.0005,
        "output": 0.0015,
    },
    "claude-3-opus": {
        "input": 0.015,
        "output": 0.075,
    },
    "claude-3-sonnet": {
        "input": 0.003,
        "output": 0.015,
    },
}
def select_model_by_complexity(query: str) -> str:
    """Pick a model name for ``query`` so that easy queries use cheaper models.

    The query is classified with ``classify_complexity``: "simple" maps to
    "gpt-3.5-turbo", "medium" maps to "claude-3-sonnet", and any other
    label maps to "gpt-4".
    """
    model_for_tier = {
        "simple": "gpt-3.5-turbo",  # cheapest tier
        "medium": "claude-3-sonnet",
    }
    tier = classify_complexity(query)
    # Anything that is neither simple nor medium gets the strongest model.
    return model_for_tier.get(tier, "gpt-4")
def classify_complexity(query: str) -> str:
    """Label ``query`` as "simple", "complex" or "medium" using cheap heuristics.

    A query shorter than 100 characters that contains a question mark is
    "simple". Otherwise a query containing "analyze", "complex" or
    "detailed" (case-insensitive substring match) is "complex". Everything
    else is "medium".
    """
    is_short_question = len(query) < 100 and "?" in query
    if is_short_question:
        return "simple"

    lowered = query.lower()
    for keyword in ("analyze", "complex", "detailed"):
        if keyword in lowered:
            return "complex"

    return "medium"
Prompt Optimization
def optimize_prompt(prompt: str) -> str:
    """Shrink ``prompt`` with a few lossy text rewrites and return the result.

    The rewrites run in this order:

    1. Every run of whitespace (including newlines) collapses to one space.
    2. Everything from the first "Examples:" marker onward is dropped.
    3. "For example" is abbreviated to "E.g.".

    Leading and trailing whitespace is stripped from the result. Step 2
    removes all text after the marker, not only the examples, so use this
    only on prompts where the examples section comes last.
    """
    # Function-scope import: the surrounding file never imports `re`.
    import re

    # Collapse whitespace runs.
    prompt = re.sub(r'\s+', ' ', prompt)
    # Keep only the text before the first "Examples:" marker; partition
    # returns the whole string as the head when the marker is absent.
    prompt = prompt.partition("Examples:")[0]
    # Abbreviate.
    prompt = prompt.replace("For example", "E.g.")
    return prompt.strip()
Example: trimming a 500-token prompt to 350 tokens cuts input-token cost by 30%.
Batching
async def batch_llm_calls(prompts: list[str], batch_size: int = 5):
    """Run ``llm_async`` over ``prompts`` in concurrent batches.

    Prompts are processed ``batch_size`` at a time. Calls within a batch
    are awaited together, and batches run one after another. Results come
    back in the same order as ``prompts``.

    Args:
        prompts: Prompts to send.
        batch_size: Maximum number of calls in flight at once.

    Returns:
        A list with one result per prompt, or an empty list when
        ``prompts`` is empty.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    # Function-scope import: the surrounding file never imports `asyncio`.
    import asyncio

    # A negative step would make range() empty and silently drop every
    # prompt, so reject it up front (zero already raised ValueError).
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    results = []
    for start in range(0, len(prompts), batch_size):
        batch = prompts[start:start + batch_size]
        # gather() preserves argument order, so results line up with prompts.
        results.extend(await asyncio.gather(*(llm_async(p) for p in batch)))
    return results
10 sequential calls: ~30 seconds
10 batched calls (5 parallel): ~6 seconds
Latency Hotspot Analysis
import time
class LatencyTracker:
    """Record how long named operations take and summarize the timings."""

    def __init__(self):
        # Maps operation name -> list of durations in seconds, in call order.
        self.timings = {}

    def track(self, operation: str):
        """Return a decorator that times each call under the name ``operation``.

        The duration of every call that returns normally is appended to
        ``self.timings[operation]``. A call that raises is not recorded and
        the exception propagates unchanged.
        """
        # Function-scope import keeps this class usable on its own.
        from functools import wraps

        def decorator(func):
            @wraps(func)  # keep the wrapped function's name and docstring
            def wrapper(*args, **kwargs):
                # perf_counter is monotonic, so system clock adjustments
                # cannot produce negative or skewed durations.
                start = time.perf_counter()
                result = func(*args, **kwargs)
                duration = time.perf_counter() - start
                self.timings.setdefault(operation, []).append(duration)
                return result
            return wrapper
        return decorator

    def report(self):
        """Return per-operation statistics.

        Returns:
            A dict mapping each operation name to a dict with ``count``,
            ``total``, ``avg`` and ``p95``. ``p95`` is the sorted duration
            at index ``int(count * 0.95)``.
        """
        summary = {}
        for op, times in self.timings.items():
            ordered = sorted(times)
            count = len(ordered)
            total = sum(ordered)
            summary[op] = {
                "count": count,
                "total": total,
                "avg": total / count,
                "p95": ordered[int(count * 0.95)],
            }
        return summary
Usage
tracker = LatencyTracker()


@tracker.track("llm_call")
def call_llm(prompt):
    """Forward ``prompt`` to ``llm``; each call is timed under "llm_call"."""
    response = llm(prompt)
    return response
After 100 calls
print(tracker.report())
{"llm_call": {"avg": 2.3, "p95": 4.1, ...}}
Optimization Recommendations
def generate_recommendations(cost_analysis, latency_analysis):
    """Turn cost and latency reports into a list of optimization suggestions.

    Args:
        cost_analysis: Mapping with the per-category costs under
            ``"breakdown"`` (the key ``CostAnalyzer.report()`` emits) or,
            for older callers, under ``"costs"``, plus an optional
            ``"avg_cost_per_call"``.
        latency_analysis: Mapping of operation name to a stats dict with an
            ``"avg"`` entry, as produced by ``LatencyTracker.report()``.

    Returns:
        A list of dicts, each with ``issue``, ``recommendation`` and
        ``impact`` keys. Missing inputs produce no recommendation instead
        of raising.
    """
    recs = []

    # Accept the "breakdown" key and fall back to the legacy "costs" key.
    breakdown = cost_analysis.get("breakdown")
    if breakdown is None:
        breakdown = cost_analysis.get("costs", {})
    llm_cost = breakdown.get("llm_calls", 0)
    avg_cost = cost_analysis.get("avg_cost_per_call") or 0

    # High LLM costs
    if llm_cost > 10:
        recs.append({
            "issue": "High LLM costs",
            "recommendation": "Implement caching for repeated queries",
            "impact": "50-80% cost reduction",
        })

    if avg_cost > 0.01:
        recs.append({
            "issue": "Using expensive model for all queries",
            "recommendation": "Use gpt-3.5-turbo for simple queries",
            "impact": "60% cost reduction",
        })

    # High latency; the operation may never have been tracked.
    llm_latency = latency_analysis.get("llm_call")
    if llm_latency and llm_latency.get("avg", 0) > 3:
        recs.append({
            "issue": "High LLM latency",
            "recommendation": "Batch parallel calls, use streaming",
            "impact": "50% latency reduction",
        })

    return recs
Streaming for Faster TTFB
async def streaming_llm(prompt: str):
    """Yield the chunks produced by ``llm_stream(prompt)`` one at a time."""
    chunk_stream = llm_stream(prompt)
    async for piece in chunk_stream:
        # Forward each piece immediately so the caller can show partial output.
        yield piece
Time to first byte: ~200 ms with streaming vs. ~2 s when waiting for the full response.
Best Practices
- Cache aggressively: Serve identical queries from the cache
- Model selection: Use cheaper models when possible
- Prompt optimization: Reduce unnecessary tokens
- Batching: Parallel execution for throughput
- Streaming: Faster perceived latency
- Monitor costs: Track per-user, per-feature
- Set budgets: Alert on anomalies
Output Checklist
- Cost tracking implementation
- Caching layer
- Model selection logic
- Prompt optimization
- Batching for parallel calls
- Latency tracking
- Hotspot analysis
- Optimization recommendations
- Budget alerts
- Performance dashboard