azure-eventhub-ts

Azure Event Hubs SDK for TypeScript

Install skill "azure-eventhub-ts" with this command: npx skills add claudedjale/skillset/claudedjale-skillset-azure-eventhub-ts

High-throughput event streaming and real-time data ingestion.

Installation

npm install @azure/event-hubs @azure/identity

For checkpointing with consumer groups:

npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob

Environment Variables

EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints
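
The snippets in this document read these variables with a non-null assertion (`!`), which hides a missing value until the first network call. A minimal sketch of a fail-fast loader follows; `requireEnv`, `loadSettings`, and `EventHubSettings` are hypothetical names introduced here, not part of any Azure SDK.

```typescript
// Hypothetical helper (not part of any Azure SDK): read the four settings
// listed above from an environment-like object and fail fast if one is missing.
type Env = Record<string, string | undefined>;

interface EventHubSettings {
  fullyQualifiedNamespace: string;
  eventHubName: string;
  storageAccountName: string;
  storageContainerName: string;
}

function requireEnv(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value.trim();
}

function loadSettings(env: Env): EventHubSettings {
  return {
    fullyQualifiedNamespace: requireEnv(env, "EVENTHUB_NAMESPACE"),
    eventHubName: requireEnv(env, "EVENTHUB_NAME"),
    storageAccountName: requireEnv(env, "STORAGE_ACCOUNT_NAME"),
    storageContainerName: requireEnv(env, "STORAGE_CONTAINER_NAME"),
  };
}
```

In a Node.js process this would be called once at startup as `loadSettings(process.env)`.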

Authentication

import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";

const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();

// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);

// Consumer
const consumer = new EventHubConsumerClient(
  "$Default", // Consumer group
  fullyQualifiedNamespace,
  eventHubName,
  credential
);

Core Workflow

Send Events

const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);

// Create a batch and add events.
// tryAdd() returns false when the event does not fit in the batch.
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });

await producer.sendBatch(batch);
await producer.close();
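
The example above ignores the boolean returned by `tryAdd()`. With more events than fit in one batch, a common pattern is to send the full batch and start a new one. The sketch below illustrates that loop against small structural interfaces (`EventLike`, `BatchLike`, `ProducerLike`, and `sendAll` are hypothetical names). The interfaces cover only `createBatch()`, `tryAdd()`, `count`, and `sendBatch()`, and are intended to be structurally compatible with the SDK's producer client and batch.

```typescript
// Hypothetical structural types covering only the calls used below.
interface EventLike { body: unknown }
interface BatchLike {
  readonly count: number;
  tryAdd(event: EventLike): boolean;
}
interface ProducerLike {
  createBatch(): Promise<BatchLike>;
  sendBatch(batch: BatchLike): Promise<void>;
}

// Sends every event, starting a new batch whenever the current one is full.
// Returns the number of batches sent.
async function sendAll(producer: ProducerLike, events: EventLike[]): Promise<number> {
  let batchesSent = 0;
  let batch = await producer.createBatch();

  for (const event of events) {
    if (batch.tryAdd(event)) continue;

    // The batch rejected the event. If the batch is empty, the event can never fit.
    if (batch.count === 0) {
      throw new Error("Event is too large to fit in an empty batch");
    }

    // Flush the full batch, then retry the event in a fresh one.
    await producer.sendBatch(batch);
    batchesSent++;
    batch = await producer.createBatch();
    if (!batch.tryAdd(event)) {
      throw new Error("Event is too large to fit in an empty batch");
    }
  }

  // Flush whatever remains.
  if (batch.count > 0) {
    await producer.sendBatch(batch);
    batchesSent++;
  }
  return batchesSent;
}
```

Throwing on an oversized event is one design choice; a pipeline that prefers to skip such events could log and continue instead.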

Send to Specific Partition

// By partition ID
const batchForPartition = await producer.createBatch({ partitionId: "0" });

// By partition key (events with the same key are hashed to the same partition)
const batchForKey = await producer.createBatch({ partitionKey: "device-123" });
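
Because `createBatch({ partitionKey })` fixes one key for the whole batch, a mixed stream of events has to be grouped by key before batching. A minimal sketch of that grouping step follows; `groupByPartitionKey` and the `Reading` shape are hypothetical and exist only for illustration.

```typescript
// Hypothetical helper: bucket events by the key that will be used as partitionKey.
function groupByPartitionKey<T>(events: T[], keyOf: (event: T) => string): Map<string, T[]> {
  const groups = new Map<string, T[]>();
  for (const event of events) {
    const key = keyOf(event);
    const group = groups.get(key);
    if (group) {
      group.push(event);
    } else {
      groups.set(key, [event]);
    }
  }
  return groups;
}

// Example payload shape, assumed for illustration only.
interface Reading { deviceId: string; temperature: number }

const readings: Reading[] = [
  { deviceId: "sensor-1", temperature: 72.5 },
  { deviceId: "sensor-2", temperature: 68.2 },
  { deviceId: "sensor-1", temperature: 73.1 },
];

// One group per device; insertion order is preserved within each group.
const byDevice = groupByPartitionKey(readings, (r) => r.deviceId);
```

Each entry of `byDevice` can then be sent in its own batch created with `partitionKey` set to the map key.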

Receive Events (Simple)

const consumer = new EventHubConsumerClient("$Default", fullyQualifiedNamespace, eventHubName, credential);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
    }
  },
  processError: async (err, context) => {
    console.error(`Error on partition ${context.partitionId}: ${err.message}`);
  },
});

// Stop after 60 seconds
setTimeout(async () => {
  await subscription.close();
  await consumer.close();
}, 60000);

Receive with Checkpointing (Production)

import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";

const storageAccount = process.env.STORAGE_ACCOUNT_NAME!;
const containerName = process.env.STORAGE_CONTAINER_NAME!;

const containerClient = new ContainerClient(
  `https://${storageAccount}.blob.core.windows.net/${containerName}`,
  credential
);

const checkpointStore = new BlobCheckpointStore(containerClient);

const consumer = new EventHubConsumerClient(
  "$Default",
  fullyQualifiedNamespace,
  eventHubName,
  credential,
  checkpointStore
);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Processing: ${JSON.stringify(event.body)}`);
    }
    // Checkpoint after processing the batch
    if (events.length > 0) {
      await context.updateCheckpoint(events[events.length - 1]);
    }
  },
  processError: async (err, context) => {
    console.error(`Error: ${err.message}`);
  },
});
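
Checkpointing after every batch means one blob write per batch. Under high throughput it may be worth checkpointing less often, at the cost of reprocessing more events after a restart. The following is a sketch of one possible throttle, not something the SDK provides: `CheckpointThrottle` is a hypothetical class that reports a checkpoint as due after a given number of events or a given time interval, whichever comes first.

```typescript
// Hypothetical helper: decides when a checkpoint is due.
// Use one instance per partition, since checkpoints are per partition.
class CheckpointThrottle {
  private eventsSinceCheckpoint = 0;
  private lastCheckpointAt: number;
  private readonly maxEvents: number;
  private readonly maxIntervalMs: number;

  constructor(maxEvents: number, maxIntervalMs: number, now: number = Date.now()) {
    this.maxEvents = maxEvents;
    this.maxIntervalMs = maxIntervalMs;
    this.lastCheckpointAt = now;
  }

  // Records processed events and returns true when a checkpoint should be written.
  // A checkpoint is never due while no events have been processed since the last one.
  record(eventCount: number, now: number = Date.now()): boolean {
    this.eventsSinceCheckpoint += eventCount;
    const countReached = this.eventsSinceCheckpoint >= this.maxEvents;
    const intervalReached = now - this.lastCheckpointAt >= this.maxIntervalMs;
    const due = this.eventsSinceCheckpoint > 0 && (countReached || intervalReached);
    if (due) {
      this.eventsSinceCheckpoint = 0;
      this.lastCheckpointAt = now;
    }
    return due;
  }
}

// Checkpoint every 100 events or every 5 seconds (timestamps passed explicitly here).
const throttle = new CheckpointThrottle(100, 5000, 0);
const first = throttle.record(40, 1000);  // 40 events, 1 s elapsed: not due
const second = throttle.record(70, 2000); // 110 events: due, counters reset
const third = throttle.record(1, 3000);   // 1 event, 1 s since checkpoint: not due
const fourth = throttle.record(0, 8000);  // empty batch, but 6 s elapsed with 1 pending: due
```

Inside `processEvents`, the handler would call `record(events.length)` and, when it returns true, pass the most recently processed event to `context.updateCheckpoint()`. The handler needs to remember that event itself, since an empty batch can still make a checkpoint due.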

Receive from Specific Position

import { earliestEventPosition, latestEventPosition } from "@azure/event-hubs";

const subscription = consumer.subscribe(
  {
    processEvents: async (events, context) => { /* ... */ },
    processError: async (err, context) => { /* ... */ },
  },
  {
    // Keys are partition IDs
    startPosition: {
      // Start from beginning
      "0": earliestEventPosition,
      // Start from end (new events only)
      "1": latestEventPosition,
      // Start from specific offset
      "2": { offset: "12345" },
      // Start from specific time
      "3": { enqueuedOn: new Date("2024-01-01") },
    },
  }
);
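
Writing the per-partition map by hand does not scale past a few partitions. A small sketch of building it from a list of partition IDs follows. `PositionLike` is a local stand-in for the SDK's `EventPosition` type and `startPositionsFor` is a hypothetical helper; neither is part of the SDK.

```typescript
// Local stand-in for the SDK's EventPosition shape (assumption: only these fields are needed).
interface PositionLike {
  offset?: string;
  sequenceNumber?: number;
  enqueuedOn?: Date;
  isInclusive?: boolean;
}

// Hypothetical helper: give every partition a fallback position,
// with optional per-partition overrides.
function startPositionsFor(
  partitionIds: string[],
  fallback: PositionLike,
  overrides: Record<string, PositionLike> = {}
): Record<string, PositionLike> {
  const positions: Record<string, PositionLike> = {};
  for (const id of partitionIds) {
    positions[id] = overrides[id] ?? fallback;
  }
  return positions;
}

// All partitions start at a point in time, except partition "2",
// which resumes just after a known sequence number.
const since = new Date("2024-01-01T00:00:00Z");
const positions = startPositionsFor(
  ["0", "1", "2", "3"],
  { enqueuedOn: since },
  { "2": { sequenceNumber: 500, isInclusive: false } }
);
```

The partition IDs would typically come from `await consumer.getPartitionIds()`. When every partition should start from the same place, the SDK's `startPosition` option also accepts a single position instead of a map, so the helper is only useful when partitions differ.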

Event Hub Properties

// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);

// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);

Batch Processing Options

const subscription = consumer.subscribe(
  {
    processEvents: async (events, context) => { /* ... */ },
    processError: async (err, context) => { /* ... */ },
  },
  {
    maxBatchSize: 100, // Max events per batch
    maxWaitTimeInSeconds: 30, // Max wait for batch
  }
);

Key Types

import {
  EventHubProducerClient,
  EventHubConsumerClient,
  EventData,
  ReceivedEventData,
  PartitionContext,
  Subscription,
  SubscriptionEventHandlers,
  CreateBatchOptions,
  EventPosition,
} from "@azure/event-hubs";

import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";

Event Properties

// Send with properties
const batch = await producer.createBatch();
batch.tryAdd({
  body: { data: "payload" },
  properties: {
    eventType: "telemetry",
    deviceId: "sensor-1",
  },
  contentType: "application/json",
  correlationId: "request-123",
});

// Access in receiver
consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Type: ${event.properties?.eventType}`);
      console.log(`Sequence: ${event.sequenceNumber}`);
      console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
      console.log(`Offset: ${event.offset}`);
    }
  },
  // processError is a required handler
  processError: async (err, context) => {
    console.error(`Error: ${err.message}`);
  },
});

Error Handling

consumer.subscribe({
  processEvents: async (events, context) => {
    // processEvents can be called with an empty array when the wait time elapses
    if (events.length === 0) {
      return;
    }
    try {
      for (const event of events) {
        await processEvent(event); // processEvent is your own handler
      }
      await context.updateCheckpoint(events[events.length - 1]);
    } catch (error) {
      // Don't checkpoint on error - events will be reprocessed
      console.error("Processing failed:", error);
    }
  },
  processError: async (err, context) => {
    if (err.name === "MessagingError") {
      // Messaging-layer error, often transient - the SDK retries retryable ones
      console.warn("Transient error:", err.message);
    } else {
      // Fatal error
      console.error("Fatal error:", err);
    }
  },
});
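
Matching on `err.name` alone treats every messaging error as transient. A slightly finer sketch follows, assuming the error object carries a boolean `retryable` field (as the SDK's `MessagingError` does in the versions I am aware of). `ErrorLike` and `classifyError` are hypothetical names introduced for illustration.

```typescript
// Hypothetical structural view of the error passed to processError.
// Assumption: messaging errors expose a boolean `retryable` flag.
interface ErrorLike {
  name: string;
  message: string;
  retryable?: boolean;
}

type Severity = "transient" | "fatal";

// Treats an error as transient only when it is a messaging error
// that is explicitly marked retryable; everything else is fatal.
function classifyError(err: ErrorLike): Severity {
  if (err.name === "MessagingError" && err.retryable === true) {
    return "transient";
  }
  return "fatal";
}

const timeout = classifyError({ name: "MessagingError", message: "operation timed out", retryable: true });
const unauthorized = classifyError({ name: "MessagingError", message: "unauthorized", retryable: false });
const unknown = classifyError({ name: "TypeError", message: "x is not a function" });
```

A `processError` handler could log a warning for `"transient"` and raise an alert or close the subscription for `"fatal"`.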

Best Practices

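  • Use checkpointing - Always checkpoint in production so a restarted consumer resumes from its last checkpoint; this gives at-least-once (not exactly-once) processing, so handlers should be idempotent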

  • Batch sends - Use createBatch() for efficient sending

  • Partition keys - Use partition keys to ensure ordering for related events

  • Consumer groups - Use separate consumer groups for different processing pipelines

  • Handle errors gracefully - Don't checkpoint on processing failures

  • Close clients - Always close producer/consumer when done

  • Monitor lag - Track lastEnqueuedSequenceNumber vs processed sequence
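
The lag mentioned in the last item can be computed per partition as the difference between the newest sequence number in the partition and the last one the consumer processed. A minimal sketch follows; `PartitionProgress`, `partitionLag`, and `totalLag` are hypothetical names, and the input numbers are made up for illustration.

```typescript
// Hypothetical record combining service-side and consumer-side progress.
interface PartitionProgress {
  partitionId: string;
  lastEnqueuedSequenceNumber: number;  // from getPartitionProperties()
  lastProcessedSequenceNumber: number; // from the last event the consumer handled
}

// Number of events enqueued but not yet processed; never negative.
function partitionLag(p: PartitionProgress): number {
  return Math.max(0, p.lastEnqueuedSequenceNumber - p.lastProcessedSequenceNumber);
}

// Sum of lag across all partitions.
function totalLag(progress: PartitionProgress[]): number {
  return progress.reduce((sum, p) => sum + partitionLag(p), 0);
}

const progress: PartitionProgress[] = [
  { partitionId: "0", lastEnqueuedSequenceNumber: 1200, lastProcessedSequenceNumber: 1150 },
  { partitionId: "1", lastEnqueuedSequenceNumber: 980, lastProcessedSequenceNumber: 980 },
];

const lag = totalLag(progress); // 50 + 0
```

The service-side value would come from `getPartitionProperties()` as shown under Event Hub Properties, and the consumer-side value from `event.sequenceNumber` of the last processed event.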
