grpc-python

High-performance RPC framework for microservice communication.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "grpc-python" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-grpc-python

gRPC Python Patterns

High-performance RPC framework for microservice communication.

Overview

  • Internal microservice communication (lower latency than REST)

  • Streaming data (real-time updates, file transfers)

  • Polyglot environments (shared proto definitions)

  • Strong typing between services (compile-time validation)

  • Bidirectional streaming (chat, gaming, real-time sync)

When NOT to Use

  • Public APIs (prefer REST/GraphQL for browser compatibility)

  • Simple CRUD with few services (REST is simpler)

  • When HTTP/2 is not available

Proto Definition

// protos/user_service.proto syntax = "proto3"; package user.v1;

import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto";

service UserService { rpc GetUser(GetUserRequest) returns (User); rpc CreateUser(CreateUserRequest) returns (User); rpc ListUsers(ListUsersRequest) returns (stream User); // Server streaming rpc BulkCreateUsers(stream CreateUserRequest) returns (BulkCreateResponse); // Client streaming rpc UserUpdates(stream UserUpdateRequest) returns (stream User); // Bidirectional }

message User { string id = 1; string email = 2; string name = 3; UserStatus status = 4; google.protobuf.Timestamp created_at = 5; }

enum UserStatus { USER_STATUS_UNSPECIFIED = 0; USER_STATUS_ACTIVE = 1; USER_STATUS_INACTIVE = 2; }

message GetUserRequest { string user_id = 1; } message CreateUserRequest { string email = 1; string name = 2; string password = 3; } message ListUsersRequest { int32 page_size = 1; string page_token = 2; } message BulkCreateResponse { int32 created_count = 1; repeated string user_ids = 2; }

Code Generation

pip install grpcio grpcio-tools python -m grpc_tools.protoc -I./protos --python_out=./app/protos --pyi_out=./app/protos --grpc_python_out=./app/protos ./protos/user_service.proto

Server Implementation

import grpc from concurrent import futures from google.protobuf.timestamp_pb2 import Timestamp from app.protos import user_service_pb2 as pb2 from app.protos import user_service_pb2_grpc as pb2_grpc

class UserServiceServicer(pb2_grpc.UserServiceServicer): def init(self, user_repo): self.user_repo = user_repo

def GetUser(self, request, context):
    user = self.user_repo.get(request.user_id)
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.user_id} not found")
    return self._to_proto(user)

def CreateUser(self, request, context):
    if not request.email or "@" not in request.email:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid email")
    if self.user_repo.get_by_email(request.email):
        context.abort(grpc.StatusCode.ALREADY_EXISTS, "Email already registered")
    user = self.user_repo.create(email=request.email, name=request.name)
    return self._to_proto(user)

def ListUsers(self, request, context):
    """Server streaming: yield users one by one."""
    for user in self.user_repo.iterate(page_size=request.page_size or 100):
        if not context.is_active():
            return
        yield self._to_proto(user)

def BulkCreateUsers(self, request_iterator, context):
    """Client streaming: receive multiple requests."""
    created_ids = []
    for request in request_iterator:
        try:
            user = self.user_repo.create(email=request.email, name=request.name)
            created_ids.append(user.id)
        except Exception as e:
            pass  # Log error, continue
    return pb2.BulkCreateResponse(created_count=len(created_ids), user_ids=created_ids)

def _to_proto(self, user) -> pb2.User:
    created_at = Timestamp()
    created_at.FromDatetime(user.created_at)
    return pb2.User(id=user.id, email=user.email, name=user.name, created_at=created_at)

def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(user_repo), server) from grpc_health.v1 import health, health_pb2_grpc health_pb2_grpc.add_HealthServicer_to_server(health.HealthServicer(), server) server.add_insecure_port("[::]:50051") server.start() server.wait_for_termination()

Async Server (grpcio >= 1.50)

import grpc.aio

class AsyncUserServiceServicer(pb2_grpc.UserServiceServicer): async def GetUser(self, request, context): user = await self.user_repo.get(request.user_id) if not user: await context.abort(grpc.StatusCode.NOT_FOUND, "User not found") return self._to_proto(user)

async def serve_async(): server = grpc.aio.server() pb2_grpc.add_UserServiceServicer_to_server(AsyncUserServiceServicer(), server) server.add_insecure_port("[::]:50051") await server.start() await server.wait_for_termination()

Client Implementation

class UserServiceClient: def init(self, host: str = "localhost:50051"): self.channel = grpc.insecure_channel(host, options=[ ("grpc.keepalive_time_ms", 30000), ("grpc.keepalive_timeout_ms", 10000), ]) self.stub = pb2_grpc.UserServiceStub(self.channel)

def get_user(self, user_id: str, timeout: float = 5.0):
    try:
        return self.stub.GetUser(pb2.GetUserRequest(user_id=user_id), timeout=timeout)
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.NOT_FOUND:
            raise UserNotFoundError(user_id)
        raise

def list_users(self, page_size: int = 100):
    for user in self.stub.ListUsers(pb2.ListUsersRequest(page_size=page_size)):
        yield user

def close(self):
    self.channel.close()

Interceptors

class LoggingInterceptor(grpc.ServerInterceptor): def intercept_service(self, continuation, handler_call_details): start = time.time() handler = continuation(handler_call_details) logger.info(f"{handler_call_details.method} in {time.time() - start:.3f}s") return handler

class AuthInterceptor(grpc.ServerInterceptor): def init(self, auth_service): self.auth_service = auth_service self.public_methods = {"/user.v1.UserService/CreateUser"}

def intercept_service(self, continuation, handler_call_details):
    if handler_call_details.method not in self.public_methods:
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get("authorization", "").replace("Bearer ", "")
        if not token or not self.auth_service.verify(token):
            return grpc.unary_unary_rpc_method_handler(
                lambda req, ctx: ctx.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid token")
            )
    return continuation(handler_call_details)

class RetryInterceptor(grpc.UnaryUnaryClientInterceptor): def init(self, max_retries: int = 3): self.max_retries = max_retries self.retry_codes = {grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED}

def intercept_unary_unary(self, continuation, client_call_details, request):
    for attempt in range(self.max_retries):
        try:
            return continuation(client_call_details, request)
        except grpc.RpcError as e:
            if e.code() not in self.retry_codes or attempt == self.max_retries - 1:
                raise
            time.sleep(2 ** attempt)

Key Decisions

Decision Recommendation

Proto organization One service per file, shared messages in common.proto

Versioning Package version (user.v1, user.v2), backward compatible

Streaming Server stream for large lists, bidirectional for real-time

Error codes Use standard gRPC codes, add details for validation

Auth Interceptor with metadata, JWT tokens

Timeouts Always set client-side deadlines

Health checks Required for load balancers

Anti-Patterns (FORBIDDEN)

NEVER skip deadline/timeout

stub.GetUser(request) # Can hang forever! Use timeout=5.0

NEVER ignore streaming cancellation

def ListUsers(self, request, context): for user in all_users: if not context.is_active(): # Check if client disconnected return yield user

NEVER return None for message fields

NEVER use proto2 syntax for new services

ALWAYS close channels to prevent resource leaks

Related Skills

  • api-design-framework

  • REST/OpenAPI patterns

  • strawberry-graphql

  • GraphQL alternative

  • streaming-api-patterns

  • SSE/WebSocket patterns

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

devops-deployment

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

code-review-playbook

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

github-operations

No summary provided by upstream source.

Repository SourceNeeds Review