kinesis-stream-processor

AWS Kinesis Stream Processor

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "kinesis-stream-processor" with this command: npx skills add dengineproblem/agents-monorepo/dengineproblem-agents-monorepo-kinesis-stream-processor

AWS Kinesis Stream Processor

Expert in building real-time data streaming applications with AWS Kinesis.

Core Concepts

Kinesis Components

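| Component | Purpose | Use Case |
|-----------|---------|----------|
| Data Streams | Real-time data ingestion | Custom processing, low latency |
| Data Firehose (now Amazon Data Firehose) | Delivery to destinations | S3, Redshift, OpenSearch (formerly Elasticsearch) |
| Data Analytics (now Amazon Managed Service for Apache Flink) | SQL-based / Flink stream processing | Real-time analytics |
| Video Streams | Video streaming | IoT, media processing |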

Key Limits

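Kinesis Data Streams:
  per_shard:
    write: "1,000 records/sec OR 1 MB/sec"
    read: "5 transactions/sec, up to 10,000 records (max 10 MB) per GetRecords call"
    read_throughput: "2 MB/sec (shared by all standard consumers; each enhanced fan-out consumer gets its own 2 MB/sec)"

  per_stream:
    max_shards: "500 (soft limit; actually an account-level, per-Region shard quota — 500 in us-east-1, us-west-2 and eu-west-1, 200 in other Regions)"
    retention: "24 hours (default) to 365 days"

  per_record:
    max_size: "1 MB (data payload before Base64 encoding)"
    partition_key: "256 characters max (Unicode string)"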

Producer Implementation

Python Producer with Batching

import json
import time
from typing import Any, Dict, List, Optional


class KinesisPutError(RuntimeError):
    """Raised when PutRecords keeps rejecting records after all retry rounds.

    ``failed_records`` holds the request entries (``Data``/``PartitionKey``
    dicts) that were never accepted, so the caller can re-queue or dead-letter
    them instead of losing them silently.
    """

    def __init__(self, failed_records: List[Dict]):
        super().__init__(f"{len(failed_records)} record(s) still failing after retries")
        self.failed_records = failed_records


class KinesisProducer:
    """Optimized Kinesis producer with batching and error handling.

    Records are buffered and sent with PutRecords once ``buffer_size`` records
    are queued, or on the next ``put_record`` call after ``buffer_time``
    seconds have passed since the last flush. There is no background timer:
    call ``flush()`` yourself or use the producer as a context manager so the
    tail of the buffer is sent. Entries rejected by a partial PutRecords
    failure are retried with exponential backoff.
    """

    # PutRecords accepts at most 500 records per request. The API also caps a
    # request at 5 MB in total; that size limit is NOT enforced here.
    MAX_BATCH_RECORDS = 500

    def __init__(self, stream_name: str, region: str = 'us-east-1',
                 client: Optional[Any] = None, max_retries: int = 3,
                 base_backoff: float = 0.1):
        """Create a producer for ``stream_name``.

        Args:
            stream_name: Target Kinesis data stream.
            region: AWS region used when ``client`` is not supplied.
            client: Optional pre-built Kinesis client; a boto3 client is
                created when omitted.
            max_retries: Retry rounds for records rejected by PutRecords.
            base_backoff: Delay in seconds before the first retry; doubles on
                every following round.
        """
        self.stream_name = stream_name
        if client is None:
            import boto3  # deferred so an injected client does not need boto3
            client = boto3.client('kinesis', region_name=region)
        self.client = client
        self.buffer: List[Dict] = []
        self.buffer_size = 500  # Max records per batch
        self.buffer_time = 0.1  # Flush every 100ms (checked on put_record)
        self.max_retries = max_retries
        self.base_backoff = base_backoff
        self.last_flush = time.time()

    def put_record(self, data: Dict[str, Any], partition_key: str) -> None:
        """JSON-encode ``data``, buffer it, and flush if a size/time threshold is hit."""
        self.buffer.append({
            'Data': json.dumps(data).encode('utf-8'),
            'PartitionKey': partition_key,
        })

        size_reached = len(self.buffer) >= self.buffer_size
        time_reached = time.time() - self.last_flush > self.buffer_time
        if size_reached or time_reached:
            self.flush()

    def flush(self) -> None:
        """Send up to MAX_BATCH_RECORDS buffered records in one PutRecords call.

        Raises:
            Exception: whatever the client raises for the PutRecords call; the
                batch is put back at the front of the buffer first, so it is
                not lost and keeps its order.
            KinesisPutError: if some records are still rejected after
                ``max_retries`` retry rounds.
        """
        if not self.buffer:
            return

        records = self.buffer[:self.MAX_BATCH_RECORDS]
        del self.buffer[:self.MAX_BATCH_RECORDS]

        try:
            response = self.client.put_records(
                StreamName=self.stream_name,
                Records=records
            )
        except Exception as e:
            self.buffer[:0] = records  # re-queue instead of dropping the batch
            print(f"Kinesis put_records error: {e}")
            raise

        self.last_flush = time.time()

        # PutRecords is not atomic: individual entries can fail (e.g. throttling).
        if response.get('FailedRecordCount', 0) > 0:
            self._handle_failures(response, records)

    def _handle_failures(self, response: Dict, records: List[Dict]) -> None:
        """Retry records rejected by PutRecords with exponential backoff.

        Raises:
            KinesisPutError: if records are still failing after ``max_retries`` rounds.
        """
        pending = self._failed_records(response, records)

        for attempt in range(self.max_retries):
            if not pending:
                return
            time.sleep(self.base_backoff * (2 ** attempt))
            retry_response = self.client.put_records(
                StreamName=self.stream_name,
                Records=pending
            )
            pending = self._failed_records(retry_response, pending)

        if pending:
            raise KinesisPutError(pending)

    @staticmethod
    def _failed_records(response: Dict, records: List[Dict]) -> List[Dict]:
        """Return the request entries whose result carries an ErrorCode.

        The response's ``Records`` array is positionally aligned with the request.
        """
        failed = []
        for result, record in zip(response['Records'], records):
            if 'ErrorCode' in result:
                failed.append(record)
                print(f"Failed record: {result['ErrorCode']} - {result.get('ErrorMessage')}")
        return failed

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drain the whole buffer; a single flush() sends at most 500 records.
        while self.buffer:
            self.flush()

Node.js Producer

const { KinesisClient, PutRecordsCommand } = require('@aws-sdk/client-kinesis');

/**
 * Buffered Kinesis producer that batches records into PutRecords calls.
 *
 * Records are flushed when 500 are queued, or by a timer every `flushInterval`
 * ms. Call `close()` when finished: it stops the timer (which otherwise keeps
 * the Node.js process alive) and sends everything still buffered.
 */
class KinesisProducer {
  /**
   * @param {string} streamName Target stream.
   * @param {string} [region='us-east-1'] Region used when no client is given.
   * @param {object} [options]
   * @param {KinesisClient} [options.client] Pre-built client to use instead of a new one.
   * @param {number} [options.maxRetries=3] Retry rounds for records rejected by PutRecords.
   * @param {number} [options.baseBackoffMs=100] First retry delay in ms; doubles each round.
   */
  constructor(streamName, region = 'us-east-1', options = {}) {
    this.streamName = streamName;
    this.client = options.client || new KinesisClient({ region });
    this.buffer = [];
    this.bufferSize = 500; // PutRecords accepts at most 500 records per call
    this.flushInterval = 100; // ms
    this.maxRetries = options.maxRetries ?? 3;
    this.baseBackoffMs = options.baseBackoffMs ?? 100;

    // Auto-flush timer. The rejection must be caught here: an error escaping an
    // async timer callback becomes an unhandled promise rejection, which
    // terminates the process by default on Node.js 15+.
    this.timer = setInterval(() => {
      this.flush().catch(err => console.error('Kinesis auto-flush failed:', err));
    }, this.flushInterval);
  }

  /** Buffer one JSON-serialized record; flush immediately once the batch is full. */
  async putRecord(data, partitionKey) {
    this.buffer.push({ Data: Buffer.from(JSON.stringify(data)), PartitionKey: partitionKey });

    if (this.buffer.length >= this.bufferSize) {
      await this.flush();
    }
  }

  /**
   * Send up to 500 buffered records. If the request itself fails, the batch is
   * put back at the front of the buffer (nothing is lost) and the error rethrown.
   */
  async flush() {
    if (this.buffer.length === 0) return;

    const records = this.buffer.splice(0, 500);

    let response;
    try {
      response = await this.client.send(new PutRecordsCommand({
        StreamName: this.streamName,
        Records: records,
      }));
    } catch (error) {
      this.buffer.unshift(...records);
      console.error('Kinesis error:', error);
      throw error;
    }

    // PutRecords is not atomic: individual entries can fail (e.g. throttling).
    if (response.FailedRecordCount > 0) {
      await this.handleFailures(response, records);
    }
  }

  /**
   * Retry rejected records with exponential backoff. Throws an Error carrying
   * `failedRecords` if some are still rejected after `maxRetries` rounds.
   */
  async handleFailures(response, records) {
    let pending = KinesisProducer.failedRecords(response, records);

    for (let attempt = 0; attempt < this.maxRetries && pending.length > 0; attempt++) {
      const delay = this.baseBackoffMs * 2 ** attempt; // 1x, 2x, 4x, ...
      await new Promise(resolve => setTimeout(resolve, delay));

      const retry = await this.client.send(new PutRecordsCommand({
        StreamName: this.streamName,
        Records: pending,
      }));
      pending = retry.FailedRecordCount > 0 ? KinesisProducer.failedRecords(retry, pending) : [];
    }

    if (pending.length > 0) {
      const error = new Error(`${pending.length} Kinesis record(s) still failing after ${this.maxRetries} retries`);
      error.failedRecords = pending;
      throw error;
    }
  }

  /** Request entries whose positional result in `response.Records` has an ErrorCode. */
  static failedRecords(response, records) {
    return response.Records
      .map((result, i) => (result.ErrorCode ? records[i] : null))
      .filter(Boolean);
  }

  /** Stop the auto-flush timer and send everything still buffered. */
  async close() {
    clearInterval(this.timer);
    while (this.buffer.length > 0) {
      await this.flush();
    }
  }
}

Consumer Patterns

Lambda Consumer

import base64
import json
from datetime import datetime, timezone
from typing import Any, Dict, List


def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Process Kinesis records from Lambda trigger.

    Records are processed in order. On the first failure the handler stops and
    reports that record's sequence number in ``batchItemFailures``. For stream
    sources Lambda resumes from the lowest reported sequence number, so later
    records would be redelivered anyway and processing them now only creates
    duplicates. Lambda honours this response only when the event source
    mapping has ``FunctionResponseTypes: [ReportBatchItemFailures]``.
    """
    processed_records: List[Dict[str, Any]] = []

    for record in event['Records']:
        sequence_number = record['kinesis']['sequenceNumber']
        try:
            # Kinesis delivers the payload base64-encoded
            payload = base64.b64decode(record['kinesis']['data'])
            data = json.loads(payload)
            result = process_record(data)
        except Exception as e:
            print(f"Error processing record: {e}")
            print(f"Processed: {len(processed_records)}, Failed: 1")
            return {'batchItemFailures': [{'itemIdentifier': sequence_number}]}

        processed_records.append({
            'sequenceNumber': sequence_number,
            'result': result
        })

    print(f"Processed: {len(processed_records)}, Failed: 0")
    # An empty list tells Lambda the whole batch succeeded
    return {'batchItemFailures': []}


def process_record(data: Dict) -> Dict:
    """Business logic for processing each record.

    Builds a transformed copy of ``data``, writes it downstream, and returns it.
    Any exception propagates so that lambda_handler reports the record as failed.
    """
    transformed = {
        'id': data.get('id'),
        'timestamp': data.get('timestamp'),
        # timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        'processed_at': datetime.now(timezone.utc).isoformat(),
        'value': data.get('value', 0) * 2  # Example transformation
    }

    # Write to downstream (DynamoDB, S3, etc.)
    write_to_downstream(transformed)

    return transformed


def write_to_downstream(item: Dict) -> None:
    """Placeholder sink: replace with a real DynamoDB/S3/etc. write.

    It must raise on failure so the record is reported for retry.
    """
    print(f"Writing downstream: {item}")

KCL-Style Consumer (hand-rolled GetRecords polling in Python — not the actual Kinesis Client Library)

import json
import time
from typing import Any, Dict, Optional


class KinesisConsumer:
    """KCL-style consumer with checkpointing.

    This is a single-process GetRecords polling loop, not the Kinesis Client
    Library. It has no lease coordination between workers and does not follow
    child shards after a reshard. ``checkpoint()`` only prints; persist the
    sequence number (e.g. in DynamoDB) for real recovery. The checkpoint timer
    (``last_checkpoint``) is shared by every shard this instance processes.
    """

    def __init__(self, stream_name: str, region: str = 'us-east-1',
                 client: Optional[Any] = None):
        """Create a consumer for ``stream_name``.

        Args:
            stream_name: Stream to read.
            region: AWS region used when ``client`` is not supplied.
            client: Optional pre-built Kinesis client; a boto3 client is
                created when omitted.
        """
        self.stream_name = stream_name
        if client is None:
            import boto3  # deferred so an injected client does not need boto3
            client = boto3.client('kinesis', region_name=region)
        self.client = client
        self.checkpoint_interval = 60  # seconds
        self.last_checkpoint = time.time()
        # GetRecords allows 5 calls/sec per shard, shared by all standard
        # consumers of that shard, so pause between calls even when busy.
        self.poll_interval = 0.2  # seconds after a response that had records
        self.idle_sleep = 0.5  # seconds after an empty response
        self.error_sleep = 1.0  # seconds after an error

    def _get_iterator(self, shard_id: str, iterator_type: str,
                      sequence_number: Optional[str] = None) -> str:
        """Request a shard iterator of ``iterator_type`` (with a sequence number if given)."""
        params = {
            'StreamName': self.stream_name,
            'ShardId': shard_id,
            'ShardIteratorType': iterator_type,
        }
        if sequence_number is not None:
            params['StartingSequenceNumber'] = sequence_number
        return self.client.get_shard_iterator(**params)['ShardIterator']

    def process_shard(self, shard_id: str, iterator_type: str = 'LATEST',
                      starting_sequence_number: Optional[str] = None) -> None:
        """Poll ``shard_id`` until the shard is closed, processing records in order.

        Args:
            shard_id: Shard to read.
            iterator_type: Initial position: 'LATEST' (default), 'TRIM_HORIZON',
                'AT_SEQUENCE_NUMBER' or 'AFTER_SEQUENCE_NUMBER'.
            starting_sequence_number: Required by the two *_SEQUENCE_NUMBER
                types, e.g. a sequence number restored from a saved checkpoint.

        Delivery is at-least-once. After any error inside the loop, the method
        waits ``error_sleep`` seconds and requests a new iterator. The iterator
        starts at the record that failed, or just after the last processed
        record, which also recovers from iterator expiry. A record that always
        fails is retried forever. Errors from the initial GetShardIterator call
        propagate to the caller. The method returns when GetRecords no longer
        returns a NextShardIterator.
        """
        # Position to resume from if the iterator must be re-acquired.
        resume_type, resume_seq = iterator_type, starting_sequence_number
        last_processed: Optional[str] = None
        shard_iterator: Optional[str] = self._get_iterator(
            shard_id, iterator_type, starting_sequence_number)

        while True:
            try:
                if shard_iterator is None:
                    shard_iterator = self._get_iterator(shard_id, resume_type, resume_seq)

                response = self.client.get_records(
                    ShardIterator=shard_iterator,
                    Limit=100
                )
                records = response['Records']

                for record in records:
                    seq = record['SequenceNumber']
                    resume_type, resume_seq = 'AT_SEQUENCE_NUMBER', seq  # retry this one on failure
                    self.process_record(record)
                    resume_type, resume_seq = 'AFTER_SEQUENCE_NUMBER', seq
                    last_processed = seq

                # Checkpoint periodically (only once something has been processed)
                if (last_processed is not None
                        and time.time() - self.last_checkpoint > self.checkpoint_interval):
                    self.checkpoint(shard_id, last_processed)

                next_iterator = response.get('NextShardIterator')
            except Exception as e:
                print(f"Error processing shard {shard_id}: {e}")
                time.sleep(self.error_sleep)
                # Iterators expire after 5 minutes, and the old one may point past
                # a failed record: re-acquire from the resume position instead.
                shard_iterator = None
                continue

            if not next_iterator:
                break  # shard closed (e.g. after a reshard) and fully read
            shard_iterator = next_iterator

            # Respect the per-shard GetRecords rate limit
            time.sleep(self.poll_interval if records else self.idle_sleep)

        if last_processed is not None:
            self.checkpoint(shard_id, last_processed)

    def process_record(self, record: Dict) -> None:
        """Process individual record (``Data`` is JSON bytes)."""
        data = json.loads(record['Data'])
        # Business logic here
        print(f"Processing: {data}")

    def checkpoint(self, shard_id: str, sequence_number: str) -> None:
        """Save checkpoint for recovery."""
        # Store in DynamoDB or other persistent store
        print(f"Checkpoint: shard={shard_id}, seq={sequence_number}")
        self.last_checkpoint = time.time()

Enhanced Fan-Out Consumer

import time
from typing import Any, Callable, Dict, Optional


def setup_enhanced_fanout(stream_arn: str, consumer_name: str,
                          client: Optional[Any] = None,
                          poll_interval: float = 2.0,
                          timeout: float = 300.0) -> str:
    """Register enhanced fan-out consumer for dedicated throughput.

    Registers ``consumer_name`` on the stream and blocks until it is ACTIVE.

    Args:
        stream_arn: ARN of the stream.
        consumer_name: Name for the new consumer.
        client: Optional pre-built Kinesis client; a boto3 client is created
            when omitted.
        poll_interval: Seconds between status checks.
        timeout: Maximum seconds to wait for the ACTIVE status.

    Returns:
        The consumer ARN.

    Raises:
        TimeoutError: if the consumer is not ACTIVE within ``timeout`` seconds.
    """
    if client is None:
        import boto3  # deferred so an injected client does not need boto3
        client = boto3.client('kinesis')

    response = client.register_stream_consumer(
        StreamARN=stream_arn,
        ConsumerName=consumer_name
    )
    consumer_arn = response['Consumer']['ConsumerARN']

    # boto3 provides no waiter for stream consumers (only stream_exists /
    # stream_not_exists), so poll DescribeStreamConsumer instead.
    deadline = time.monotonic() + timeout
    while True:
        description = client.describe_stream_consumer(ConsumerARN=consumer_arn)
        if description['ConsumerDescription']['ConsumerStatus'] == 'ACTIVE':
            return consumer_arn
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Consumer {consumer_arn} not ACTIVE after {timeout}s")
        time.sleep(poll_interval)


def subscribe_to_shard(consumer_arn: str, shard_id: str,
                       handler: Optional[Callable[[Dict[str, Any]], None]] = None,
                       client: Optional[Any] = None,
                       starting_position: Optional[Dict[str, Any]] = None) -> None:
    """Subscribe to shard with enhanced fan-out and read it until it is closed.

    A single SubscribeToShard subscription lasts at most 5 minutes, so this
    resubscribes in a loop. Each new subscription starts AFTER the last
    ContinuationSequenceNumber received. The function returns when an event
    carries a null ContinuationSequenceNumber, which marks the shard as
    closed by a reshard.

    Args:
        consumer_arn: ARN returned by ``setup_enhanced_fanout``.
        shard_id: Shard to read.
        handler: Called with each record dict; defaults to ``process_record``,
            which must be defined elsewhere.
        client: Optional pre-built Kinesis client; a boto3 client is created
            when omitted.
        starting_position: Initial StartingPosition; defaults to {'Type': 'LATEST'}.
    """
    if client is None:
        import boto3  # deferred so an injected client does not need boto3
        client = boto3.client('kinesis')
    if handler is None:
        handler = process_record

    position = dict(starting_position) if starting_position else {'Type': 'LATEST'}

    while True:
        response = client.subscribe_to_shard(
            ConsumerARN=consumer_arn,
            ShardId=shard_id,
            StartingPosition=position
        )

        shard_closed = False
        # Process events from subscription
        for event in response['EventStream']:
            if 'SubscribeToShardEvent' not in event:
                continue
            shard_event = event['SubscribeToShardEvent']
            for record in shard_event['Records']:
                handler(record)

            continuation = shard_event.get('ContinuationSequenceNumber')
            if continuation is None:
                shard_closed = True
            else:
                position = {'Type': 'AFTER_SEQUENCE_NUMBER', 'SequenceNumber': continuation}

        if shard_closed:
            return

Infrastructure as Code

CloudFormation

AWSTemplateFormatVersion: '2010-09-09'
Description: Kinesis Data Stream with Lambda Consumer

Parameters:
  StreamName:
    Type: String
    Default: my-data-stream
  ShardCount:
    Type: Number
    Default: 2
    MinValue: 1
  RetentionPeriod:
    Type: Number
    Default: 24
    Description: Retention period in hours
    # Kinesis accepts 24 hours up to 365 days (8760 hours)
    MinValue: 24
    MaxValue: 8760

Resources:
  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Ref StreamName
      ShardCount: !Ref ShardCount
      RetentionPeriodHours: !Ref RetentionPeriod
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis
      Tags:
        - Key: Environment
          Value: production

  ProcessorFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: kinesis-processor
      Runtime: python3.11
      Handler: index.lambda_handler
      MemorySize: 256
      Timeout: 60
      Role: !GetAtt ProcessorRole.Arn
      Code:
        ZipFile: |
          import json
          import base64

          def lambda_handler(event, context):
              for record in event['Records']:
                  payload = base64.b64decode(record['kinesis']['data'])
                  print(f"Processed: {payload}")
              # Empty list = whole batch succeeded (ReportBatchItemFailures contract)
              return {'batchItemFailures': []}

  # Receives metadata (shard, sequence-number range) about batches that still
  # fail after MaximumRetryAttempts, instead of them being silently discarded.
  FailedRecordsQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600  # 14 days, the SQS maximum

  EventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      EventSourceArn: !GetAtt KinesisStream.Arn
      FunctionName: !Ref ProcessorFunction
      StartingPosition: LATEST
      BatchSize: 100
      MaximumBatchingWindowInSeconds: 5
      MaximumRetryAttempts: 3
      BisectBatchOnFunctionError: true
      ParallelizationFactor: 1
      # Without this, Lambda ignores the batchItemFailures list the handler returns
      FunctionResponseTypes:
        - ReportBatchItemFailures
      DestinationConfig:
        OnFailure:
          Destination: !GetAtt FailedRecordsQueue.Arn

  ProcessorRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        # The event source mapping's on-failure destination sends with the function role
        - PolicyName: kinesis-failure-destination
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: sqs:SendMessage
                Resource: !GetAtt FailedRecordsQueue.Arn

  # CloudWatch Alarms
  IteratorAgeAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: kinesis-iterator-age
      MetricName: GetRecords.IteratorAgeMilliseconds
      Namespace: AWS/Kinesis
      Dimensions:
        - Name: StreamName
          Value: !Ref KinesisStream  # stream name; also orders creation after the stream
      Statistic: Maximum
      Period: 60
      EvaluationPeriods: 5
      Threshold: 60000  # 1 minute
      ComparisonOperator: GreaterThanThreshold
      AlarmActions:
        - !Ref AlertTopic

  AlertTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: kinesis-alerts

Outputs:
  StreamArn:
    Value: !GetAtt KinesisStream.Arn
  StreamName:
    Value: !Ref KinesisStream
  FailedRecordsQueueArn:
    Value: !GetAtt FailedRecordsQueue.Arn

Terraform

# Provisioned-mode data stream; all sizing comes from input variables.
resource "aws_kinesis_stream" "main" {
  name             = var.stream_name
  shard_count      = var.shard_count
  retention_period = var.retention_hours

  # Server-side encryption with the AWS managed Kinesis key
  encryption_type = "KMS"
  kms_key_id      = "alias/aws/kinesis"

  # Enhanced (shard-level) CloudWatch metrics; billed separately from stream-level metrics
  shard_level_metrics = [
    "IncomingBytes",
    "IncomingRecords",
    "OutgoingBytes",
    "OutgoingRecords",
    "WriteProvisionedThroughputExceeded",
    "ReadProvisionedThroughputExceeded",
    "IteratorAgeMilliseconds",
  ]

  tags = {
    Environment = var.environment
  }
}

resource "aws_lambda_event_source_mapping" "kinesis" {
  event_source_arn                   = aws_kinesis_stream.main.arn
  function_name                      = aws_lambda_function.processor.arn
  starting_position                  = "LATEST"
  batch_size                         = 100
  maximum_batching_window_in_seconds = 5
  maximum_retry_attempts             = 3
  bisect_batch_on_function_error     = true
  parallelization_factor             = 1

  # Required for Lambda to honour the batchItemFailures list returned by the
  # handler; without it a partial failure cannot be reported.
  function_response_types = ["ReportBatchItemFailures"]

  # NOTE: after maximum_retry_attempts the failing batch is dropped. Add
  #   destination_config { on_failure { destination_arn = <SQS or SNS ARN> } }
  # (and grant the function role permission to send to it) to keep a record.
}

Monitoring and Alerting

Key CloudWatch Metrics

| Metric | Description | Alert Threshold |
|--------|-------------|-----------------|
| IncomingRecords | Records put (sum over the period) | Monitor for traffic patterns |
| IncomingBytes | Bytes put (sum over the period) | 80% of shard limit |
| WriteProvisionedThroughputExceeded | Throttled writes | > 0 |
| ReadProvisionedThroughputExceeded | Throttled reads | > 0 |
| GetRecords.IteratorAgeMilliseconds | Consumer lag | > 60000 ms |
| GetRecords.Success | Successful GetRecords calls | Monitor for drops |

Monitoring Dashboard

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional


def get_stream_metrics(stream_name: str, period_minutes: int = 5,
                       client: Optional[Any] = None,
                       period_seconds: int = 60) -> Dict[str, List[Dict[str, Any]]]:
    """Get key Kinesis metrics for monitoring.

    Args:
        stream_name: Stream to query (the ``StreamName`` dimension).
        period_minutes: Length of the look-back window ending now.
        client: Optional pre-built CloudWatch client; a boto3 client is
            created when omitted.
        period_seconds: CloudWatch aggregation period for each datapoint.

    Returns:
        Metric name -> datapoints (with Sum/Average/Maximum), sorted by
        Timestamp because CloudWatch returns them in no guaranteed order.
    """
    if client is None:
        import boto3  # deferred so an injected client does not need boto3
        client = boto3.client('cloudwatch')

    metrics = [
        'IncomingRecords',
        'IncomingBytes',
        'WriteProvisionedThroughputExceeded',
        'GetRecords.IteratorAgeMilliseconds'
    ]

    # Compute the window once so every metric covers exactly the same range.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(minutes=period_minutes)

    results = {}
    for metric in metrics:
        response = client.get_metric_statistics(
            Namespace='AWS/Kinesis',
            MetricName=metric,
            Dimensions=[{'Name': 'StreamName', 'Value': stream_name}],
            StartTime=start_time,
            EndTime=end_time,
            Period=period_seconds,
            Statistics=['Sum', 'Average', 'Maximum']
        )
        results[metric] = sorted(response['Datapoints'], key=lambda dp: dp['Timestamp'])

    return results

Best Practices

  • Partition key design — distribute data evenly across shards

  • Batch writes — use PutRecords instead of PutRecord

  • Handle throttling — implement exponential backoff

  • Monitor iterator age — track how far consumers are lagging

  • Use enhanced fan-out — for multiple consumers that need low latency

  • Enable encryption — KMS encryption for sensitive data

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

social-media-marketing

No summary provided by upstream source.

Repository Source · Needs Review
Automation

video-marketing

No summary provided by upstream source.

Repository Source · Needs Review
Automation

frontend-design

No summary provided by upstream source.

Repository Source · Needs Review