gRPC Python Patterns
High-performance RPC framework for microservice communication.
Overview
-
Internal microservice communication (lower latency than REST)
-
Streaming data (real-time updates, file transfers)
-
Polyglot environments (shared proto definitions)
-
Strong typing between services (compile-time validation)
-
Bidirectional streaming (chat, gaming, real-time sync)
When NOT to Use
-
Public APIs (prefer REST/GraphQL for browser compatibility)
-
Simple CRUD with few services (REST is simpler)
-
When HTTP/2 is not available
Proto Definition
// protos/user_service.proto syntax = "proto3"; package user.v1;
import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto";
service UserService { rpc GetUser(GetUserRequest) returns (User); rpc CreateUser(CreateUserRequest) returns (User); rpc ListUsers(ListUsersRequest) returns (stream User); // Server streaming rpc BulkCreateUsers(stream CreateUserRequest) returns (BulkCreateResponse); // Client streaming rpc UserUpdates(stream UserUpdateRequest) returns (stream User); // Bidirectional }
message User { string id = 1; string email = 2; string name = 3; UserStatus status = 4; google.protobuf.Timestamp created_at = 5; }
enum UserStatus { USER_STATUS_UNSPECIFIED = 0; USER_STATUS_ACTIVE = 1; USER_STATUS_INACTIVE = 2; }
message GetUserRequest { string user_id = 1; } message CreateUserRequest { string email = 1; string name = 2; string password = 3; } message ListUsersRequest { int32 page_size = 1; string page_token = 2; } message BulkCreateResponse { int32 created_count = 1; repeated string user_ids = 2; }
Code Generation
pip install grpcio grpcio-tools python -m grpc_tools.protoc -I./protos --python_out=./app/protos --pyi_out=./app/protos --grpc_python_out=./app/protos ./protos/user_service.proto
Server Implementation
import grpc from concurrent import futures from google.protobuf.timestamp_pb2 import Timestamp from app.protos import user_service_pb2 as pb2 from app.protos import user_service_pb2_grpc as pb2_grpc
class UserServiceServicer(pb2_grpc.UserServiceServicer): def init(self, user_repo): self.user_repo = user_repo
def GetUser(self, request, context):
user = self.user_repo.get(request.user_id)
if not user:
context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.user_id} not found")
return self._to_proto(user)
def CreateUser(self, request, context):
if not request.email or "@" not in request.email:
context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid email")
if self.user_repo.get_by_email(request.email):
context.abort(grpc.StatusCode.ALREADY_EXISTS, "Email already registered")
user = self.user_repo.create(email=request.email, name=request.name)
return self._to_proto(user)
def ListUsers(self, request, context):
"""Server streaming: yield users one by one."""
for user in self.user_repo.iterate(page_size=request.page_size or 100):
if not context.is_active():
return
yield self._to_proto(user)
def BulkCreateUsers(self, request_iterator, context):
"""Client streaming: receive multiple requests."""
created_ids = []
for request in request_iterator:
try:
user = self.user_repo.create(email=request.email, name=request.name)
created_ids.append(user.id)
except Exception as e:
pass # Log error, continue
return pb2.BulkCreateResponse(created_count=len(created_ids), user_ids=created_ids)
def _to_proto(self, user) -> pb2.User:
created_at = Timestamp()
created_at.FromDatetime(user.created_at)
return pb2.User(id=user.id, email=user.email, name=user.name, created_at=created_at)
def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(user_repo), server) from grpc_health.v1 import health, health_pb2_grpc health_pb2_grpc.add_HealthServicer_to_server(health.HealthServicer(), server) server.add_insecure_port("[::]:50051") server.start() server.wait_for_termination()
Async Server (grpcio >= 1.50)
import grpc.aio
class AsyncUserServiceServicer(pb2_grpc.UserServiceServicer): async def GetUser(self, request, context): user = await self.user_repo.get(request.user_id) if not user: await context.abort(grpc.StatusCode.NOT_FOUND, "User not found") return self._to_proto(user)
async def serve_async(): server = grpc.aio.server() pb2_grpc.add_UserServiceServicer_to_server(AsyncUserServiceServicer(), server) server.add_insecure_port("[::]:50051") await server.start() await server.wait_for_termination()
Client Implementation
class UserServiceClient: def init(self, host: str = "localhost:50051"): self.channel = grpc.insecure_channel(host, options=[ ("grpc.keepalive_time_ms", 30000), ("grpc.keepalive_timeout_ms", 10000), ]) self.stub = pb2_grpc.UserServiceStub(self.channel)
def get_user(self, user_id: str, timeout: float = 5.0):
try:
return self.stub.GetUser(pb2.GetUserRequest(user_id=user_id), timeout=timeout)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
raise UserNotFoundError(user_id)
raise
def list_users(self, page_size: int = 100):
for user in self.stub.ListUsers(pb2.ListUsersRequest(page_size=page_size)):
yield user
def close(self):
self.channel.close()
Interceptors
class LoggingInterceptor(grpc.ServerInterceptor): def intercept_service(self, continuation, handler_call_details): start = time.time() handler = continuation(handler_call_details) logger.info(f"{handler_call_details.method} in {time.time() - start:.3f}s") return handler
class AuthInterceptor(grpc.ServerInterceptor): def init(self, auth_service): self.auth_service = auth_service self.public_methods = {"/user.v1.UserService/CreateUser"}
def intercept_service(self, continuation, handler_call_details):
if handler_call_details.method not in self.public_methods:
metadata = dict(handler_call_details.invocation_metadata)
token = metadata.get("authorization", "").replace("Bearer ", "")
if not token or not self.auth_service.verify(token):
return grpc.unary_unary_rpc_method_handler(
lambda req, ctx: ctx.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid token")
)
return continuation(handler_call_details)
class RetryInterceptor(grpc.UnaryUnaryClientInterceptor): def init(self, max_retries: int = 3): self.max_retries = max_retries self.retry_codes = {grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED}
def intercept_unary_unary(self, continuation, client_call_details, request):
for attempt in range(self.max_retries):
try:
return continuation(client_call_details, request)
except grpc.RpcError as e:
if e.code() not in self.retry_codes or attempt == self.max_retries - 1:
raise
time.sleep(2 ** attempt)
Key Decisions
Decision Recommendation
Proto organization One service per file, shared messages in common.proto
Versioning Package version (user.v1, user.v2), backward compatible
Streaming Server stream for large lists, bidirectional for real-time
Error codes Use standard gRPC codes, add details for validation
Auth Interceptor with metadata, JWT tokens
Timeouts Always set client-side deadlines
Health checks Required for load balancers
Anti-Patterns (FORBIDDEN)
NEVER skip deadline/timeout
stub.GetUser(request) # Can hang forever! Use timeout=5.0
NEVER ignore streaming cancellation
def ListUsers(self, request, context): for user in all_users: if not context.is_active(): # Check if client disconnected return yield user
NEVER return None for message fields
NEVER use proto2 syntax for new services
ALWAYS close channels to prevent resource leaks
Related Skills
-
api-design-framework
-
REST/OpenAPI patterns
-
strawberry-graphql
-
GraphQL alternative
-
streaming-api-patterns
-
SSE/WebSocket patterns