gcp-pubsub

Implements Google Cloud Pub/Sub integration in Python by configuring topics, subscriptions, publishing/subscribing, dead letter queues, and local emulator setup. Use when building event-driven architectures, implementing message queuing, or managing high-throughput systems. Triggers on "setup Pub/Sub", "publish messages", "create subscription", "configure DLQ", or "test with emulator". Works with google-cloud-pubsub library and includes reliability, idempotency, and testing patterns.

Install skill "gcp-pubsub" with this command: npx skills add dawiddutoit/custom-claude/dawiddutoit-custom-claude-gcp-pubsub

Google Cloud Pub/Sub

Purpose

Build robust, production-ready event-driven systems using Google Cloud Pub/Sub with Python. Covers setup, publishing, subscribing, error handling, dead letter queues, and local development with the emulator.

When to Use

Use this skill when you need to:

  • Build event-driven architectures with message-based communication
  • Implement reliable message queuing between services
  • Handle at-least-once message delivery guarantees
  • Manage high-throughput message systems (1000+ msgs/sec)
  • Configure local development with Pub/Sub emulator
  • Implement dead letter queues for failed message handling

Quick Start

Install and authenticate:

pip install google-cloud-pubsub
gcloud auth application-default login
python -c "from google.cloud import pubsub_v1; print('Ready')"

Publish a message:

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

# Create the topic, ignoring the error if it already exists
try:
    publisher.create_topic(request={"name": topic_path})
except AlreadyExists:
    pass

# Publish
future = publisher.publish(topic_path, b"Hello, World!")
print(f"Published: {future.result()}")

Subscribe to messages:

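from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    print(f"Received: {message.data.decode()}")
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Listen for 30 seconds, then shut down cleanly
with subscriber:
    try:
        future.result(timeout=30)
    except TimeoutError:
        future.cancel()  # Stop pulling messages
        future.result()  # Wait for the shutdown to finish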

Instructions

Step 1: Set Up Development Environment

Install dependencies and configure authentication:

pip install google-cloud-pubsub
gcloud auth application-default login

For production, use a service account:

export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"

Step 2: Create Topics and Subscriptions

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Create topic
topic_path = publisher.topic_path("my-project", "my-topic")
publisher.create_topic(request={"name": topic_path})

# Create subscription
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription_config = {
    "name": subscription_path,
    "topic": topic_path,
    "ack_deadline_seconds": 60,
}
subscriber.create_subscription(request=subscription_config)

See references/detailed-guide.md for advanced configuration options.

Step 3: Publish Messages

Simple publishing:

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

future = publisher.publish(topic_path, b"Message data")
message_id = future.result()

Publish with attributes:

import json

data = json.dumps({"event": "user.created", "user_id": "123"}).encode()
future = publisher.publish(
    topic_path,
    data,
    event_type="user.created",
    timestamp="2024-01-15T10:30:00Z"
)

See references/detailed-guide.md for production-ready publisher with batching and error handling.
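
Pub/Sub message data must be bytes, and attribute keys and values must be strings. The sketch below shows one way to prepare both before calling publisher.publish(). The helper name build_message and its parameters are hypothetical, not part of the google-cloud-pubsub library.

```python
import json
from datetime import datetime, timezone


def build_message(event_type, payload, now=None):
    """Return (data, attributes) ready to pass to publisher.publish().

    Hypothetical helper: data is UTF-8 encoded JSON, and every attribute
    value is a string, since Pub/Sub attributes must be strings.
    """
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    data = json.dumps(payload, sort_keys=True).encode("utf-8")
    attributes = {
        "event_type": str(event_type),
        "timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    return data, attributes


data, attributes = build_message(
    "user.created",
    {"event": "user.created", "user_id": "123"},
    now=datetime(2024, 1, 15, 10, 30, tzinfo=timezone.utc),
)
# With a real client: publisher.publish(topic_path, data, **attributes)
```

Keeping serialization in one place makes it harder to pass a non-string attribute (such as an int or a datetime) by accident.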

Step 4: Subscribe to Messages

Basic subscriber:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    try:
        print(f"Received: {message.data.decode()}")
        # Process message
        message.ack()
    except Exception as e:
        print(f"Error: {e}")
        message.nack()  # Will be redelivered

future = subscriber.subscribe(subscription_path, callback=callback)

try:
    future.result()  # Block until interrupted or the stream fails
except KeyboardInterrupt:
    future.cancel()  # Stop pulling messages
    future.result()  # Wait for the shutdown to finish

With flow control:

future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    flow_control=pubsub_v1.types.FlowControl(
        max_messages=100,
        max_bytes=100 * 1024 * 1024,  # 100 MB
    ),
)

See references/detailed-guide.md for production subscriber with monitoring.
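
The basic callback in this step nacks on every exception, so a malformed message is redelivered until something else stops it. A minimal sketch of one alternative is to separate failures that a retry cannot fix from those it might. PermanentError, make_callback, FakeMessage, and flaky_handler are hypothetical names used for illustration; only message.data, message.message_id, message.ack(), and message.nack() come from the subscriber API used above.

```python
import json


class PermanentError(Exception):
    """Hypothetical marker for failures that a retry cannot fix."""


def make_callback(handler):
    """Wrap a handler(payload) function in a Pub/Sub-style callback."""

    def callback(message):
        try:
            payload = json.loads(message.data.decode("utf-8"))
            handler(payload)
        except (UnicodeDecodeError, json.JSONDecodeError, PermanentError) as exc:
            # Redelivery cannot fix a malformed message, so ack it to stop retries.
            print(f"Dropping {message.message_id}: {exc}")
            message.ack()
        except Exception as exc:
            # Possibly transient (for example a downstream outage): redeliver.
            print(f"Retrying {message.message_id}: {exc}")
            message.nack()
        else:
            message.ack()

    return callback


class FakeMessage:
    """Stand-in for a received message, used only to exercise the sketch."""

    def __init__(self, data, message_id="1"):
        self.data = data
        self.message_id = message_id
        self.outcome = None

    def ack(self):
        self.outcome = "ack"

    def nack(self):
        self.outcome = "nack"


def flaky_handler(payload):
    raise ConnectionError("database unavailable")


malformed = FakeMessage(b"not json")
make_callback(print)(malformed)          # acked and dropped
transient = FakeMessage(b'{"user_id": "123"}')
make_callback(flaky_handler)(transient)  # nacked for redelivery
```

Acking a malformed message discards it. If the subscription has a dead letter topic, nacking instead lets Pub/Sub forward the message there once the delivery limit is reached, which may be preferable when bad messages need inspection.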

Step 5: Configure Dead Letter Queue

from google.cloud import pubsub_v1
from google.protobuf.duration_pb2 import Duration

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Create dead letter topic
dlq_topic_path = publisher.topic_path("my-project", "my-topic-dlq")
publisher.create_topic(request={"name": dlq_topic_path})

# Create subscription with DLQ
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription = pubsub_v1.types.Subscription(
    name=subscription_path,
    topic=publisher.topic_path("my-project", "my-topic"),
    dead_letter_policy=pubsub_v1.types.DeadLetterPolicy(
        dead_letter_topic=dlq_topic_path,
        max_delivery_attempts=5,
    ),
    retry_policy=pubsub_v1.types.RetryPolicy(
        minimum_backoff=Duration(seconds=10),
        maximum_backoff=Duration(seconds=600),
    ),
)
subscriber.create_subscription(request=subscription)

See references/detailed-guide.md for complete DLQ setup with monitoring.
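
When a subscription has a dead letter policy, received messages expose a delivery_attempt counter (it is None otherwise). The sketch below uses it to estimate how many deliveries remain before a message is forwarded to the dead letter topic. The helper attempts_remaining and the constant MAX_DELIVERY_ATTEMPTS are hypothetical, and the constant is assumed to match max_delivery_attempts in the policy above. SimpleNamespace objects stand in for real messages.

```python
from types import SimpleNamespace

MAX_DELIVERY_ATTEMPTS = 5  # assumed to match max_delivery_attempts in the policy


def attempts_remaining(message, max_attempts=MAX_DELIVERY_ATTEMPTS):
    """Return deliveries left before dead-lettering, or None without a policy."""
    attempt = getattr(message, "delivery_attempt", None)
    if attempt is None:
        return None
    return max(max_attempts - attempt, 0)


# Stand-in messages, used only to exercise the helper.
first = attempts_remaining(SimpleNamespace(delivery_attempt=1))
last = attempts_remaining(SimpleNamespace(delivery_attempt=5))
no_policy = attempts_remaining(SimpleNamespace(delivery_attempt=None))
```

A callback could log a warning when the result reaches 0, so failures are visible before the message leaves the main subscription. Treat the count as approximate rather than exact. Forwarding also depends on the Pub/Sub service account having permission to publish to the dead letter topic and to subscribe to the source subscription.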

Step 6: Implement Idempotency

Track processed messages to avoid duplicate processing:

class IdempotentProcessor:
    def __init__(self):
        self.processed_ids = set()

    def process(self, message):
        msg_id = message.message_id

        if msg_id in self.processed_ids:
            print(f"Already processed: {msg_id}")
            message.ack()
            return

        try:
            # Process message
            print(f"Processing: {message.data.decode()}")
            self.processed_ids.add(msg_id)
            message.ack()
        except Exception as e:
            print(f"Failed: {e}")
            message.nack()

See references/detailed-guide.md for production-ready idempotency patterns.
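
The set in IdempotentProcessor grows without bound and is lost on restart. As a small step toward something sturdier, the sketch below bounds the record of processed IDs by both size and age. SeenMessageCache and its parameters are hypothetical names, and the cache is still in-memory: several subscriber processes would need a shared store instead.

```python
import threading
import time
from collections import OrderedDict


class SeenMessageCache:
    """Bounded, time-limited record of processed message IDs (in-memory sketch)."""

    def __init__(self, max_size=10_000, ttl_seconds=600.0, clock=time.monotonic):
        self._seen = OrderedDict()  # message_id -> time it was recorded
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._clock = clock
        self._lock = threading.Lock()  # callbacks may run on several threads

    def _evict(self):
        now = self._clock()
        # Entries are kept oldest-first, so only the front needs checking.
        while self._seen:
            _, recorded_at = next(iter(self._seen.items()))
            expired = now - recorded_at >= self._ttl
            if not expired and len(self._seen) <= self._max_size:
                break
            self._seen.popitem(last=False)

    def seen(self, message_id):
        with self._lock:
            self._evict()
            return message_id in self._seen

    def record(self, message_id):
        with self._lock:
            self._seen[message_id] = self._clock()
            self._seen.move_to_end(message_id)
            self._evict()


# Demo with a controllable clock so expiry can be shown without sleeping.
fake_now = [0.0]
cache = SeenMessageCache(max_size=2, ttl_seconds=10.0, clock=lambda: fake_now[0])
cache.record("a")
cache.record("b")
cache.record("c")  # over capacity, so the oldest entry ("a") is evicted
```

In IdempotentProcessor, `msg_id in self.processed_ids` would become `cache.seen(msg_id)` and `self.processed_ids.add(msg_id)` would become `cache.record(msg_id)`. The TTL should comfortably exceed the window in which redeliveries are expected.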

Step 7: Local Development with Emulator

# Install and start emulator
gcloud components install pubsub-emulator
gcloud beta emulators pubsub start

# In another terminal
export PUBSUB_EMULATOR_HOST=localhost:8085
python your_script.py  # Uses emulator automatically

See references/detailed-guide.md for emulator configuration patterns.
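
Because the client library switches to the emulator only when PUBSUB_EMULATOR_HOST is set, a test run that forgets the export will talk to a real project. A minimal sketch of a guard that test setup code could call first is shown below. The function name require_emulator is hypothetical.

```python
import os


def require_emulator(environ=None):
    """Return the emulator host, or raise if PUBSUB_EMULATOR_HOST is unset."""
    environ = os.environ if environ is None else environ
    host = environ.get("PUBSUB_EMULATOR_HOST", "").strip()
    if not host:
        raise RuntimeError(
            "PUBSUB_EMULATOR_HOST is not set; refusing to run against a real project"
        )
    return host


# Demo with explicit mappings instead of the real environment.
host = require_emulator({"PUBSUB_EMULATOR_HOST": "localhost:8085"})

try:
    require_emulator({})
except RuntimeError as exc:
    refusal = str(exc)
```

Calling require_emulator() with no arguments reads the real environment, so it can sit at the top of a test fixture before any client is created.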

Step 8: Monitor Operations

Enable logging and inspect the subscription's configuration:

import logging

from google.cloud import pubsub_v1

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Fetch the subscription's configuration
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription = subscriber.get_subscription(request={"subscription": subscription_path})

print(f"Topic: {subscription.topic}")
print(f"Ack deadline: {subscription.ack_deadline_seconds}s")

See references/detailed-guide.md for comprehensive monitoring patterns.
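
get_subscription returns configuration rather than throughput or backlog figures, which are exposed through Cloud Monitoring. For a quick in-process view, the sketch below counts callback outcomes and processing time. CallbackStats and FakeMessage are hypothetical names; the wrapper relies only on message.message_id, message.ack(), and message.nack().

```python
import logging
import threading
import time
from collections import Counter

logger = logging.getLogger("pubsub.metrics")


class CallbackStats:
    """Count outcomes and time spent in a subscriber callback (in-process only)."""

    def __init__(self):
        self._lock = threading.Lock()  # callbacks may run on several threads
        self.counts = Counter()
        self.total_seconds = 0.0

    def wrap(self, handler):
        """Return a callback that runs handler(message), then acks or nacks."""

        def callback(message):
            started = time.perf_counter()
            try:
                handler(message)
            except Exception:
                outcome = "failed"
                logger.exception("Failed to process message %s", message.message_id)
                message.nack()
            else:
                outcome = "succeeded"
                message.ack()
            elapsed = time.perf_counter() - started
            with self._lock:
                self.counts[outcome] += 1
                self.total_seconds += elapsed

        return callback


class FakeMessage:
    """Stand-in for a received message, used only to exercise the sketch."""

    def __init__(self, data, message_id="1"):
        self.data = data
        self.message_id = message_id

    def ack(self):
        pass

    def nack(self):
        pass


stats = CallbackStats()
callback = stats.wrap(lambda message: message.data.decode("utf-8"))
callback(FakeMessage(b"ok"))
callback(FakeMessage(b"\xff"))  # invalid UTF-8, so the handler raises
```

The wrapped callback can be passed to subscriber.subscribe() in place of a plain one, and stats.counts can be logged periodically.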

Requirements

  • Python: 3.7+
  • Dependencies:
    pip install "google-cloud-pubsub>=2.18.0"
    
  • GCP Project: Active project with Pub/Sub API enabled
  • Authentication: Application Default Credentials or service account key
  • IAM Permissions:
    • roles/pubsub.publisher - Publish messages
    • roles/pubsub.subscriber - Subscribe to messages
    • roles/pubsub.admin - Create/delete topics and subscriptions
  • For Local Development:
    gcloud components install pubsub-emulator
    
