gRPC Expert
Expert guidance for gRPC services, Protocol Buffers, microservices communication, and streaming patterns.
Core Concepts
gRPC Fundamentals
-
Protocol Buffers (protobuf)
-
Service definitions
-
RPC patterns (unary, server streaming, client streaming, bidirectional)
-
HTTP/2 transport
-
Code generation
-
Interceptors and middleware
Communication Patterns
-
Unary RPC (request-response)
-
Server streaming RPC
-
Client streaming RPC
-
Bidirectional streaming RPC
-
Deadline/timeout handling
-
Error handling and status codes
Production Features
-
Load balancing
-
Service discovery
-
Health checking
-
Authentication (TLS, tokens)
-
Monitoring and tracing
-
Retry policies
Protocol Buffer Definition
syntax = "proto3";
package user.v1;
import "google/protobuf/timestamp.proto";
service UserService { // Unary RPC rpc GetUser(GetUserRequest) returns (GetUserResponse);
// Server streaming rpc ListUsers(ListUsersRequest) returns (stream User);
// Client streaming rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);
// Bidirectional streaming rpc Chat(stream ChatMessage) returns (stream ChatMessage); }
message User { string id = 1; string email = 2; string name = 3; google.protobuf.Timestamp created_at = 4; UserRole role = 5; }
enum UserRole { USER_ROLE_UNSPECIFIED = 0; USER_ROLE_USER = 1; USER_ROLE_ADMIN = 2; }
message GetUserRequest { string id = 1; }
message GetUserResponse { User user = 1; }
message ListUsersRequest { int32 page_size = 1; string page_token = 2; }
message CreateUserRequest { string email = 1; string name = 2; }
message CreateUsersResponse { repeated string user_ids = 1; int32 created_count = 2; }
message ChatMessage { string user_id = 1; string message = 2; google.protobuf.Timestamp timestamp = 3; }
Python gRPC Server
import grpc from concurrent import futures import logging from typing import Iterator
import user_pb2 import user_pb2_grpc
class UserService(user_pb2_grpc.UserServiceServicer): def init(self): self.users = {}
def GetUser(self, request, context):
"""Unary RPC"""
user_id = request.id
if user_id not in self.users:
context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found")
user = self.users[user_id]
return user_pb2.GetUserResponse(user=user)
def ListUsers(self, request, context):
"""Server streaming RPC"""
page_size = request.page_size or 10
for i, user in enumerate(self.users.values()):
if i >= page_size:
break
yield user
def CreateUsers(self, request_iterator, context):
"""Client streaming RPC"""
created_ids = []
for request in request_iterator:
user_id = self._generate_id()
user = user_pb2.User(
id=user_id,
email=request.email,
name=request.name
)
self.users[user_id] = user
created_ids.append(user_id)
return user_pb2.CreateUsersResponse(
user_ids=created_ids,
created_count=len(created_ids)
)
def Chat(self, request_iterator, context):
"""Bidirectional streaming RPC"""
for message in request_iterator:
# Echo back with modification
response = user_pb2.ChatMessage(
user_id="server",
message=f"Echo: {message.message}",
timestamp=message.timestamp
)
yield response
def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) user_pb2_grpc.add_UserServiceServicer_to_server(UserService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("Server started on port 50051")
server.wait_for_termination()
if name == 'main': logging.basicConfig() serve()
Go gRPC Server
package main
import ( "context" "io" "log" "net" "time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "example.com/user/v1"
)
type userServer struct { pb.UnimplementedUserServiceServer users map[string]*pb.User }
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) { user, exists := s.users[req.Id] if !exists { return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id) }
return &pb.GetUserResponse{User: user}, nil
}
func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error { pageSize := req.PageSize if pageSize == 0 { pageSize = 10 }
count := 0
for _, user := range s.users {
if count >= int(pageSize) {
break
}
if err := stream.Send(user); err != nil {
return err
}
count++
}
return nil
}
func (s *userServer) CreateUsers(stream pb.UserService_CreateUsersServer) error { var userIds []string
for {
req, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.CreateUsersResponse{
UserIds: userIds,
CreatedCount: int32(len(userIds)),
})
}
if err != nil {
return err
}
userId := generateID()
user := &pb.User{
Id: userId,
Email: req.Email,
Name: req.Name,
}
s.users[userId] = user
userIds = append(userIds, userId)
}
}
func (s *userServer) Chat(stream pb.UserService_ChatServer) error { for { msg, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err }
response := &pb.ChatMessage{
UserId: "server",
Message: "Echo: " + msg.Message,
Timestamp: msg.Timestamp,
}
if err := stream.Send(response); err != nil {
return err
}
}
}
func main() { lis, err := net.Listen("tcp", ":50051") if err != nil { log.Fatalf("failed to listen: %v", err) }
s := grpc.NewServer()
pb.RegisterUserServiceServer(s, &userServer{
users: make(map[string]*pb.User),
})
log.Println("Server listening on :50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
gRPC Client with Interceptors
import grpc import user_pb2 import user_pb2_grpc from typing import Callable
def logging_interceptor( method: Callable, request, call_details: grpc.ClientCallDetails ): """Unary-unary interceptor for logging""" print(f"Calling method: {call_details.method}") print(f"Request: {request}")
response = method(request, call_details)
print(f"Response: {response}")
return response
class UserClient: def init(self, host: str = 'localhost:50051'): # Create channel with interceptor self.channel = grpc.insecure_channel(host) self.channel = grpc.intercept_channel( self.channel, logging_interceptor ) self.stub = user_pb2_grpc.UserServiceStub(self.channel)
def get_user(self, user_id: str) -> user_pb2.User:
"""Call unary RPC"""
request = user_pb2.GetUserRequest(id=user_id)
try:
response = self.stub.GetUser(
request,
timeout=5.0 # 5 second timeout
)
return response.user
except grpc.RpcError as e:
print(f"RPC failed: {e.code()} - {e.details()}")
raise
def list_users(self, page_size: int = 10):
"""Call server streaming RPC"""
request = user_pb2.ListUsersRequest(page_size=page_size)
try:
for user in self.stub.ListUsers(request):
yield user
except grpc.RpcError as e:
print(f"RPC failed: {e.code()} - {e.details()}")
def create_users_batch(self, users: list):
"""Call client streaming RPC"""
def request_generator():
for user in users:
yield user_pb2.CreateUserRequest(
email=user['email'],
name=user['name']
)
try:
response = self.stub.CreateUsers(request_generator())
return response
except grpc.RpcError as e:
print(f"RPC failed: {e.code()} - {e.details()}")
def close(self):
self.channel.close()
Best Practices
Design
-
Use semantic versioning for protobuf packages
-
Design backward-compatible changes
-
Use proper field numbering (never reuse)
-
Include metadata in messages
-
Use enums with explicit UNSPECIFIED value
-
Design for pagination in list operations
Performance
-
Enable HTTP/2 connection pooling
-
Use streaming for large data transfers
-
Implement proper timeouts
-
Use compression for large payloads
-
Batch operations when possible
-
Monitor and tune thread pool sizes
Production
-
Implement health checks
-
Use TLS for secure communication
-
Add authentication/authorization
-
Implement retry policies with backoff
-
Monitor gRPC metrics (latency, errors)
-
Use service discovery for dynamic endpoints
Anti-Patterns
❌ Breaking backward compatibility ❌ No timeout configuration ❌ Ignoring error status codes ❌ Not handling stream cancellation ❌ Over-sized messages ❌ No health checking ❌ Missing authentication
Resources
-
gRPC: https://grpc.io/
-
Protocol Buffers: https://protobuf.dev/
-
gRPC-Go: https://github.com/grpc/grpc-go
-
gRPC-Python: https://grpc.io/docs/languages/python/