# AI Integration
Enterprise AI/ML model integration patterns for vision, audio, embeddings, and RAG systems. This skill covers API integration, prompt engineering, and production deployment.
## Purpose

Integrate AI capabilities into applications effectively:
- Implement vision and image understanding
- Add audio transcription and processing
- Build semantic search with embeddings
- Create RAG (Retrieval Augmented Generation) systems
- Handle rate limiting and error recovery
- Optimize costs and latency
## Features
### Vision API Integration

```typescript
import Anthropic from '@anthropic-ai/sdk';
import OpenAI from 'openai';
import PQueue from 'p-queue';

const anthropic = new Anthropic();
const openai = new OpenAI();

// Analyze an image with Claude (accepts a URL or a raw buffer)
async function analyzeImageWithClaude(
  imageUrl: string | Buffer,
  prompt: string
): Promise<string> {
  const imageSource = typeof imageUrl === 'string'
    ? { type: 'url' as const, url: imageUrl }
    : {
        type: 'base64' as const,
        media_type: 'image/jpeg' as const,
        data: imageUrl.toString('base64'),
      };

  const response = await anthropic.messages.create({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{
      role: 'user',
      content: [
        { type: 'image', source: imageSource },
        { type: 'text', text: prompt },
      ],
    }],
  });

  return response.content[0].type === 'text' ? response.content[0].text : '';
}

// Extract structured data from an image
interface ProductInfo {
  name: string;
  description: string;
  price?: string;
  category?: string;
  features: string[];
}

async function extractProductFromImage(imageBuffer: Buffer): Promise<ProductInfo> {
  const prompt = `Analyze this product image and extract:
- Product name
- Description (2-3 sentences)
- Price (if visible)
- Category
- Key features (list)

Return as JSON only, no explanation.`;

  const response = await analyzeImageWithClaude(imageBuffer, prompt);

  try {
    return JSON.parse(response);
  } catch {
    throw new Error('Failed to parse product information');
  }
}

// OCR with GPT-4o vision
async function extractTextFromImage(imageUrl: string): Promise<string> {
  const response = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{
      role: 'user',
      content: [
        { type: 'image_url', image_url: { url: imageUrl, detail: 'high' } },
        {
          type: 'text',
          text: 'Extract all text from this image. Preserve the original formatting and structure as much as possible.',
        },
      ],
    }],
    max_tokens: 4096,
  });

  return response.choices[0].message.content || '';
}

// Batch image processing with bounded concurrency
async function batchAnalyzeImages(
  images: Array<{ id: string; url: string }>,
  prompt: string,
  concurrency: number = 3
): Promise<Map<string, string>> {
  const results = new Map<string, string>();
  const queue = new PQueue({ concurrency });

  await Promise.all(
    images.map(image =>
      queue.add(async () => {
        try {
          const result = await analyzeImageWithClaude(image.url, prompt);
          results.set(image.id, result);
        } catch (error) {
          results.set(image.id, `Error: ${(error as Error).message}`);
        }
      })
    )
  );

  return results;
}
```
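A quick usage sketch for the batch helper; the image IDs and URLs below are placeholders:

```typescript
// Placeholder image records for illustration
const images = [
  { id: 'sku-1', url: 'https://example.com/product-1.jpg' },
  { id: 'sku-2', url: 'https://example.com/product-2.jpg' },
];

const captions = await batchAnalyzeImages(
  images,
  'Describe this product in one sentence.',
  2 // at most two requests in flight
);

for (const [id, caption] of captions) {
  console.log(`${id}: ${caption}`);
}
```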
### Audio Processing

```typescript
import fs from 'fs';
import { Readable } from 'stream';
import { Deepgram } from '@deepgram/sdk';

interface TranscriptionResult {
  text: string;
  segments?: Array<{ start: number; end: number; text: string }>;
  language?: string;
}

// Transcribe audio with Whisper
async function transcribeAudio(
  audioFile: Buffer | string,
  options: {
    language?: string;
    prompt?: string;
    responseFormat?: 'json' | 'text' | 'srt' | 'vtt';
    timestamps?: boolean;
  } = {}
): Promise<TranscriptionResult> {
  const { language, prompt, responseFormat = 'json', timestamps = false } = options;

  const file = typeof audioFile === 'string'
    ? fs.createReadStream(audioFile)
    : Readable.from(audioFile);

  const response = await openai.audio.transcriptions.create({
    file,
    model: 'whisper-1',
    language,
    prompt,
    response_format: timestamps ? 'verbose_json' : responseFormat,
  });

  if (timestamps && typeof response !== 'string') {
    return {
      text: response.text,
      segments: response.segments?.map(seg => ({
        start: seg.start,
        end: seg.end,
        text: seg.text,
      })),
      language: response.language,
    };
  }

  return { text: typeof response === 'string' ? response : response.text };
}

// Real-time transcription with streaming
// (Deepgram shown here; AssemblyAI offers a similar live API)
async function* streamTranscription(
  audioStream: ReadableStream<Uint8Array>
): AsyncGenerator<string> {
  const deepgram = new Deepgram(process.env.DEEPGRAM_API_KEY!);

  const connection = await deepgram.transcription.live({
    model: 'nova-2',
    language: 'en',
    smart_format: true,
    interim_results: true,
  });

  // `yield` cannot be called from inside an event callback, so buffer
  // transcripts here and drain them from the generator loop below
  const pending: string[] = [];
  connection.on('transcriptReceived', (message) => {
    const transcript = message.channel?.alternatives?.[0]?.transcript;
    if (transcript) pending.push(transcript);
  });

  // Pipe audio to the connection, yielding transcripts as they arrive
  const reader = audioStream.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    connection.send(value);
    while (pending.length > 0) yield pending.shift()!;
  }

  connection.close();
  while (pending.length > 0) yield pending.shift()!;
}

// Generate speech from text
async function generateSpeech(
  text: string,
  options: {
    voice?: 'alloy' | 'echo' | 'fable' | 'onyx' | 'nova' | 'shimmer';
    model?: 'tts-1' | 'tts-1-hd';
    speed?: number;
  } = {}
): Promise<Buffer> {
  const { voice = 'alloy', model = 'tts-1', speed = 1 } = options;

  const response = await openai.audio.speech.create({
    model,
    voice,
    input: text,
    speed,
  });

  return Buffer.from(await response.arrayBuffer());
}
```
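A round-trip usage sketch; the file paths are hypothetical:

```typescript
// Hypothetical input/output paths
const transcript = await transcribeAudio('./meeting.mp3', {
  language: 'en',
  timestamps: true,
});
console.log(transcript.text);
transcript.segments?.forEach(s =>
  console.log(`[${s.start.toFixed(1)}s - ${s.end.toFixed(1)}s] ${s.text}`)
);

const speech = await generateSpeech('Your meeting summary is ready.', { voice: 'nova' });
fs.writeFileSync('./summary.mp3', speech);
```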
### Embeddings & Vector Search

```typescript
import { Pinecone } from '@pinecone-database/pinecone';

const pinecone = new Pinecone();

// Generate embeddings
async function generateEmbeddings(
  texts: string[],
  model: string = 'text-embedding-3-small'
): Promise<number[][]> {
  const response = await openai.embeddings.create({ model, input: texts });
  return response.data.map(d => d.embedding);
}

// Index documents
interface Document {
  id: string;
  content: string;
  metadata?: Record<string, any>;
}

async function indexDocuments(
  documents: Document[],
  indexName: string,
  namespace: string = 'default'
): Promise<void> {
  const index = pinecone.index(indexName);

  // Process in batches
  const batchSize = 100;
  for (let i = 0; i < documents.length; i += batchSize) {
    const batch = documents.slice(i, i + batchSize);
    const embeddings = await generateEmbeddings(batch.map(d => d.content));

    const vectors = batch.map((doc, j) => ({
      id: doc.id,
      values: embeddings[j],
      metadata: {
        content: doc.content.substring(0, 1000), // Store truncated content
        ...doc.metadata,
      },
    }));

    await index.namespace(namespace).upsert(vectors);
  }
}

// Semantic search
interface SearchResult {
  id: string;
  score: number;
  content: string;
  metadata?: Record<string, any>;
}

async function semanticSearch(
  query: string,
  indexName: string,
  options: {
    namespace?: string;
    topK?: number;
    filter?: Record<string, any>;
    minScore?: number;
  } = {}
): Promise<SearchResult[]> {
  const { namespace = 'default', topK = 10, filter, minScore = 0.7 } = options;

  const [queryEmbedding] = await generateEmbeddings([query]);

  const index = pinecone.index(indexName);
  const results = await index.namespace(namespace).query({
    vector: queryEmbedding,
    topK,
    filter,
    includeMetadata: true,
  });

  return results.matches
    ?.filter(m => m.score && m.score >= minScore)
    .map(match => ({
      id: match.id,
      score: match.score || 0,
      content: (match.metadata?.content as string) || '',
      metadata: match.metadata,
    })) || [];
}
```
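A usage sketch tying the two functions together; it assumes a Pinecone index named 'articles' already exists with a dimension matching text-embedding-3-small (1536):

```typescript
// Assumed: an existing 'articles' index with dimension 1536
await indexDocuments(
  [
    { id: 'a1', content: 'How to configure SSO with SAML...', metadata: { topic: 'auth' } },
    { id: 'a2', content: 'Rotating API keys safely...', metadata: { topic: 'security' } },
  ],
  'articles'
);

const hits = await semanticSearch('single sign-on setup', 'articles', {
  topK: 3,
  minScore: 0.75,
});
hits.forEach(h => console.log(h.score.toFixed(2), h.content.slice(0, 60)));
```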
### RAG Implementation

```typescript
interface RAGResponse {
  answer: string;
  sources: Array<{ id: string; content: string; score: number }>;
  confidence: number;
}

interface RAGConfig {
  indexName: string;
  namespace?: string;
  topK?: number;
  model?: string;
  systemPrompt?: string;
}

class RAGSystem {
  private config: RAGConfig;

  constructor(config: RAGConfig) {
    this.config = {
      namespace: 'default',
      topK: 5,
      model: 'claude-sonnet-4-20250514',
      systemPrompt: 'You are a helpful assistant. Answer based on the provided context.',
      ...config,
    };
  }

  async query(question: string): Promise<RAGResponse> {
    // Step 1: Retrieve relevant documents
    const context = await semanticSearch(question, this.config.indexName, {
      namespace: this.config.namespace,
      topK: this.config.topK,
    });

    if (context.length === 0) {
      return {
        answer: "I couldn't find relevant information to answer your question.",
        sources: [],
        confidence: 0,
      };
    }

    // Steps 2-3: build context and generate (shared with hybridQuery)
    return this.generateAnswer(question, context);
  }

  // Hybrid search (keyword + semantic)
  async hybridQuery(question: string, keywords?: string[]): Promise<RAGResponse> {
    // Semantic search with a wider net
    const semanticResults = await semanticSearch(question, this.config.indexName, {
      namespace: this.config.namespace,
      topK: this.config.topK! * 2,
    });

    // Keyword filter (if provided)
    let results = semanticResults;
    if (keywords && keywords.length > 0) {
      results = semanticResults.filter(r =>
        keywords.some(k => r.content.toLowerCase().includes(k.toLowerCase()))
      );
    }

    // Rerank and take top K
    const topResults = results.slice(0, this.config.topK);

    // Generate answer using top results
    return this.generateAnswer(question, topResults);
  }

  private async generateAnswer(
    question: string,
    context: SearchResult[]
  ): Promise<RAGResponse> {
    // Step 2: Build context string
    const contextText = context
      .map((doc, i) => `[${i + 1}] ${doc.content}`)
      .join('\n\n');

    // Step 3: Generate answer
    const response = await anthropic.messages.create({
      model: this.config.model!,
      max_tokens: 2048,
      system: `${this.config.systemPrompt}

Use the following context to answer the user's question. If the answer is not in the context, say so.

Context:
${contextText}`,
      messages: [{ role: 'user', content: question }],
    });

    const answer = response.content[0].type === 'text' ? response.content[0].text : '';

    return {
      answer,
      sources: context.map(c => ({ id: c.id, content: c.content, score: c.score })),
      confidence: Math.max(...context.map(c => c.score)),
    };
  }
}

// Usage
const rag = new RAGSystem({
  indexName: 'knowledge-base',
  topK: 5,
  systemPrompt: 'You are a customer support agent. Be helpful and concise.',
});

const response = await rag.query('How do I reset my password?');
```
### Structured Output

```typescript
import { z } from 'zod';
import { zodResponseFormat } from 'openai/helpers/zod';

// Define schema
const SentimentSchema = z.object({
  sentiment: z.enum(['positive', 'negative', 'neutral']),
  confidence: z.number().min(0).max(1),
  topics: z.array(z.string()),
  summary: z.string(),
});

type SentimentAnalysis = z.infer<typeof SentimentSchema>;

// Get structured output validated against the schema
async function analyzeSentiment(text: string): Promise<SentimentAnalysis> {
  const response = await openai.beta.chat.completions.parse({
    model: 'gpt-4o',
    messages: [
      { role: 'system', content: 'Analyze the sentiment of the provided text.' },
      { role: 'user', content: text },
    ],
    response_format: zodResponseFormat(SentimentSchema, 'sentiment_analysis'),
  });

  return response.choices[0].message.parsed!;
}
```
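For example (the sample input is invented for illustration):

```typescript
const analysis = await analyzeSentiment(
  'The new dashboard loads quickly, but the export button keeps failing.'
);
console.log(analysis.sentiment);  // e.g. 'neutral' or 'negative'
console.log(analysis.topics);     // e.g. ['dashboard performance', 'export bug']
```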
```typescript
// Claude tool use for structured output
async function extractEntities(text: string): Promise<{
  people: string[];
  organizations: string[];
  locations: string[];
  dates: string[];
}> {
  const response = await anthropic.messages.create({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    tools: [{
      name: 'extract_entities',
      description: 'Extract named entities from text',
      input_schema: {
        type: 'object',
        properties: {
          people: {
            type: 'array',
            items: { type: 'string' },
            description: 'Names of people mentioned',
          },
          organizations: {
            type: 'array',
            items: { type: 'string' },
            description: 'Organization names',
          },
          locations: {
            type: 'array',
            items: { type: 'string' },
            description: 'Location names',
          },
          dates: {
            type: 'array',
            items: { type: 'string' },
            description: 'Dates mentioned',
          },
        },
        required: ['people', 'organizations', 'locations', 'dates'],
      },
    }],
    tool_choice: { type: 'tool', name: 'extract_entities' },
    messages: [{
      role: 'user',
      content: `Extract entities from: ${text}`,
    }],
  });

  const toolUse = response.content.find(c => c.type === 'tool_use');
  return toolUse?.input as any;
}
```
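Forcing the tool choice guarantees a structured result; a usage sketch with an invented sentence:

```typescript
const entities = await extractEntities(
  'Satya Nadella announced the Microsoft partnership in Seattle on March 3rd.'
);
console.log(entities.people);         // ['Satya Nadella']
console.log(entities.organizations);  // ['Microsoft']
```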
### Production Patterns

```typescript
// Rate limiting and retry
import Bottleneck from 'bottleneck';

const limiter = new Bottleneck({
  reservoir: 100,                      // Initial tokens
  reservoirRefreshAmount: 100,
  reservoirRefreshInterval: 60 * 1000, // Per minute
  maxConcurrent: 10,
});

async function withRateLimit<T>(fn: () => Promise<T>): Promise<T> {
  return limiter.schedule(fn);
}

// Retry with exponential backoff
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number = 3,
  baseDelay: number = 1000
): Promise<T> {
  let lastError: Error;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error as Error;
      const status = (error as { status?: number }).status;

      // Retry only rate limits (429) and server errors (5xx)
      if (status === 429 || (status !== undefined && status >= 500)) {
        const delay = baseDelay * Math.pow(2, attempt);
        await new Promise(r => setTimeout(r, delay));
        continue;
      }

      throw error;
    }
  }

  throw lastError!;
}
```
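The two wrappers compose; putting the retry inside the limiter means retried calls also draw on the rate-limit budget (the image URL below is a placeholder):

```typescript
// Retries run inside the limiter, so they also consume reservoir tokens
const description = await withRateLimit(() =>
  withRetry(() =>
    analyzeImageWithClaude('https://example.com/chart.png', 'Summarize this chart.')
  )
);
```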
```typescript
// Caching layer
import { createHash } from 'crypto';

const cache = new Map<string, { data: any; expires: number }>();

// Stable cache key derived from the text content
function hashString(text: string): string {
  return createHash('sha256').update(text).digest('hex');
}

async function cachedEmbedding(
  text: string,
  ttl: number = 3600000 // 1 hour
): Promise<number[]> {
  const key = `embedding:${hashString(text)}`;

  const cached = cache.get(key);
  if (cached && cached.expires > Date.now()) {
    return cached.data;
  }

  const [embedding] = await generateEmbeddings([text]);
  cache.set(key, { data: embedding, expires: Date.now() + ttl });

  return embedding;
}
```
```typescript
// Cost tracking (MODEL_PRICING is sketched below)
class CostTracker {
  private costs: Map<string, number> = new Map();

  track(model: string, inputTokens: number, outputTokens: number): void {
    const pricing = MODEL_PRICING[model] || { input: 0, output: 0 };
    const cost = (inputTokens * pricing.input + outputTokens * pricing.output) / 1000;

    const current = this.costs.get(model) || 0;
    this.costs.set(model, current + cost);
  }

  getReport(): Record<string, number> {
    return Object.fromEntries(this.costs);
  }

  getTotalCost(): number {
    return Array.from(this.costs.values()).reduce((a, b) => a + b, 0);
  }
}
```
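`MODEL_PRICING` above is assumed rather than defined; a sketch with per-1K-token USD rates (illustrative only, so verify against each provider's current pricing page):

```typescript
// Per-1K-token USD rates. Illustrative only; check current provider pricing.
const MODEL_PRICING: Record<string, { input: number; output: number }> = {
  'claude-sonnet-4-20250514': { input: 0.003, output: 0.015 },
  'gpt-4o': { input: 0.0025, output: 0.01 },
  'text-embedding-3-small': { input: 0.00002, output: 0 },
};

const tracker = new CostTracker();
tracker.track('gpt-4o', 1200, 300);
console.log(tracker.getReport());     // { 'gpt-4o': 0.006 }
console.log(tracker.getTotalCost());  // 0.006
```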
## Use Cases

### Document Q&A System
```typescript
// Build document Q&A (chunkText is sketched after this block)
async function buildDocumentQA(documents: string[]): Promise<RAGSystem> {
  // Chunk documents
  const chunks = documents.flatMap((doc, docIndex) =>
    chunkText(doc, 500, 50).map((chunk, chunkIndex) => ({
      id: `doc-${docIndex}-chunk-${chunkIndex}`,
      content: chunk,
      metadata: { documentIndex: docIndex },
    }))
  );

  // Index chunks
  await indexDocuments(chunks, 'document-qa');

  // Return RAG system
  return new RAGSystem({
    indexName: 'document-qa',
    topK: 5,
    systemPrompt: 'Answer questions based on the provided documents.',
  });
}
```
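`chunkText` is used above but never defined; a minimal word-based sketch, assuming the second argument is the chunk size in words and the third the overlap between consecutive chunks:

```typescript
// Minimal sliding-window chunker (assumed signature: size and overlap
// are word counts; requires size > overlap)
function chunkText(text: string, size: number, overlap: number): string[] {
  const words = text.split(/\s+/).filter(Boolean);
  const chunks: string[] = [];

  for (let i = 0; i < words.length; i += size - overlap) {
    chunks.push(words.slice(i, i + size).join(' '));
    if (i + size >= words.length) break; // last chunk covers the tail
  }

  return chunks;
}
```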
### Content Moderation

```typescript
interface ModerationResult {
  flagged: boolean;
  categories: string[];
  scores: Record<string, number>;
}

// Moderate content with AI
async function moderateContent(content: string): Promise<ModerationResult> {
  const response = await openai.moderations.create({ input: content });
  const result = response.results[0];

  return {
    flagged: result.flagged,
    categories: Object.entries(result.categories)
      .filter(([_, flagged]) => flagged)
      .map(([category]) => category),
    // Widen the SDK's fixed-key score object to a plain record
    scores: { ...result.category_scores } as Record<string, number>,
  };
}
```
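A typical guard before user input reaches a model (`userComment` is a placeholder):

```typescript
// Placeholder user input
const userComment = 'This product is amazing!';

const moderation = await moderateContent(userComment);
if (moderation.flagged) {
  throw new Error(`Content rejected: ${moderation.categories.join(', ')}`);
}
// Safe to pass along to downstream AI calls
```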
## Best Practices

### Do's

- Implement rate limiting - Respect API limits
- Cache embeddings - Avoid redundant API calls
- Handle errors gracefully - Implement retry logic
- Monitor costs - Track token usage
- Use streaming - For better UX with long responses (see the sketch after this list)
- Chunk appropriately - Balance context vs. relevance
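A minimal streaming sketch using the Anthropic SDK's streaming helper; the prompt is invented:

```typescript
// Stream tokens to stdout as they arrive instead of waiting
// for the complete response
const stream = anthropic.messages.stream({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Explain retrieval augmented generation.' }],
});

stream.on('text', text => process.stdout.write(text));

// Resolves once the full message has been assembled
const finalMessage = await stream.finalMessage();
```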
### Don'ts

- Don't expose API keys in frontend code
- Don't skip input validation
- Don't ignore rate limit errors
- Don't cache sensitive data inappropriately
- Don't use overly large context windows
- Don't forget fallback strategies
## Related Skills

- api-architecture - API design patterns
- caching-strategies - Caching for AI responses
- backend-development - Integration patterns
## Reference Resources

- OpenAI API Reference
- Anthropic API Reference
- Pinecone Documentation
- LangChain Documentation