fastapi observability

FastAPI Observability

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy the command below and send it to your AI assistant to install this skill.

Install skill "fastapi observability" with this command: npx skills add lobbi-docs/claude/lobbi-docs-claude-fastapi-observability

FastAPI Observability

This skill provides production-ready observability patterns including structured logging, Prometheus metrics, and OpenTelemetry tracing.

Structured Logging

Configuration with structlog

app/core/logging.py

import logging
import sys
from typing import Optional

import structlog


def setup_logging(log_level: str = "INFO", json_logs: bool = True) -> None:
    """Configure structlog for the application.

    Args:
        log_level: Minimum level name (e.g. "INFO", "DEBUG"); case-insensitive.
        json_logs: When True, emit one JSON object per line (production);
            otherwise use a human-readable console renderer (development).

    Raises:
        AttributeError: If ``log_level`` is not a valid ``logging`` level name.
    """
    # Processors shared by both output formats.
    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
    ]

    if json_logs:
        # JSON format for production log aggregation.
        processors = shared_processors + [
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ]
    else:
        # Pretty console format for local development.
        processors = shared_processors + [
            structlog.dev.ConsoleRenderer(),
        ]

    structlog.configure(
        processors=processors,
        # Filter at the bound-logger level for speed.
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )


def get_logger(name: Optional[str] = None) -> structlog.BoundLogger:
    """Return a structlog logger bound to *name* (conventionally ``__name__``)."""
    return structlog.get_logger(name)

Request Logging Middleware

app/middleware/logging.py

import time
import uuid

import structlog
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

logger = structlog.get_logger()


class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Assign each request a UUID, bind it to the log context, and log completion."""

    async def dispatch(self, request: Request, call_next):
        """Wrap the downstream handler with request-scoped logging context.

        Adds an ``X-Request-ID`` response header and emits one
        ``request_completed`` event with status, duration and size.
        """
        request_id = str(uuid.uuid4())
        start_time = time.perf_counter()

        # Bind context so every log line emitted while handling this
        # request carries the request metadata automatically.
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
            client_ip=request.client.host if request.client else None,
        )

        # Expose the request ID to clients for cross-system correlation.
        # NOTE(review): if call_next raises, no completion event is logged;
        # pair this with exception-handling middleware if that matters.
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id

        duration_ms = (time.perf_counter() - start_time) * 1000

        logger.info(
            "request_completed",
            status_code=response.status_code,
            duration_ms=round(duration_ms, 2),
            content_length=response.headers.get("content-length"),
        )

        return response

Application Logging

from app.core.logging import get_logger

# Use the importing module's dotted path as the logger name (the original
# listing's bare "name" lost its dunder underscores in markdown rendering).
logger = get_logger(__name__)


async def create_user(data: UserCreate) -> User:
    """Insert a new user document, logging the attempt and its outcome.

    Args:
        data: Validated user-creation payload.

    Returns:
        The persisted ``User`` document.

    Raises:
        Exception: Whatever the insert raised is re-raised after logging.
    """
    logger.info("creating_user", email=data.email)
    try:
        user = await User(**data.model_dump()).insert()
        logger.info("user_created", user_id=str(user.id))
        return user
    except Exception as e:
        # Log with context, then re-raise so error middleware can respond.
        logger.error("user_creation_failed", error=str(e), email=data.email)
        raise

Prometheus Metrics

Setup with prometheus-fastapi-instrumentator

app/core/metrics.py

from functools import wraps

from prometheus_client import Counter, Gauge, Histogram
from prometheus_fastapi_instrumentator import Instrumentator

# --- Custom application metrics -------------------------------------------

# Total requests by method/endpoint/status.
REQUEST_COUNT = Counter(
    "app_requests_total",
    "Total request count",
    ["method", "endpoint", "status"],
)

# Request latency histogram; buckets span 10ms to 10s.
REQUEST_LATENCY = Histogram(
    "app_request_latency_seconds",
    "Request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)

# Requests currently in flight.
ACTIVE_REQUESTS = Gauge(
    "app_active_requests",
    "Number of active requests",
)

# Database query latency by operation and collection.
DB_QUERY_LATENCY = Histogram(
    "app_db_query_latency_seconds",
    "Database query latency",
    ["operation", "collection"],
)

# Cache effectiveness counters.
CACHE_HITS = Counter(
    "app_cache_hits_total",
    "Cache hit count",
    ["cache_name"],
)

CACHE_MISSES = Counter(
    "app_cache_misses_total",
    "Cache miss count",
    ["cache_name"],
)


def setup_metrics(app) -> None:
    """Instrument the FastAPI app and expose Prometheus metrics at /metrics."""
    Instrumentator().instrument(app).expose(app, endpoint="/metrics")

Custom Metric Decorators

import time
from functools import wraps


def track_db_query(operation: str, collection: str):
    """Decorator that records an async DB call's duration in DB_QUERY_LATENCY.

    Args:
        operation: Label for the DB operation (e.g. "find", "insert").
        collection: Label for the target collection.

    The observation happens in ``finally`` so failed queries are timed too.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start
                DB_QUERY_LATENCY.labels(
                    operation=operation,
                    collection=collection,
                ).observe(duration)
        return wrapper
    return decorator


def track_cache(cache_name: str):
    """Decorator that counts cache hits and misses for an async lookup.

    A non-None return value counts as a hit; None counts as a miss.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            result = await func(*args, **kwargs)
            if result is not None:
                CACHE_HITS.labels(cache_name=cache_name).inc()
            else:
                CACHE_MISSES.labels(cache_name=cache_name).inc()
            return result
        return wrapper
    return decorator

Usage

class UserRepository:
    """User data access, instrumented with the custom metric decorators."""

    @track_db_query("find", "users")
    async def find_by_id(self, user_id: str):
        """Fetch a user document by primary key; latency is recorded."""
        return await User.get(user_id)

    @track_cache("users")
    async def get_cached(self, user_id: str):
        """Look up a user in the cache; a None result is counted as a miss."""
        return await cache.get(f"user:{user_id}")

OpenTelemetry Tracing

Setup

app/core/tracing.py

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def setup_tracing(app, service_name: str, otlp_endpoint: str) -> None:
    """Configure OpenTelemetry tracing and auto-instrument the stack.

    Args:
        app: The FastAPI application to instrument.
        service_name: Value for the ``service.name`` resource attribute.
        otlp_endpoint: OTLP/gRPC collector endpoint to export spans to.
    """
    # Identify this service in exported traces.
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
    })

    provider = TracerProvider(resource=resource)

    # Batch spans before export to reduce collector round-trips.
    otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    provider.add_span_processor(BatchSpanProcessor(otlp_exporter))

    trace.set_tracer_provider(provider)

    # Auto-instrument inbound requests, outbound HTTP, and Redis calls.
    FastAPIInstrumentor.instrument_app(app)
    HTTPXClientInstrumentor().instrument()
    RedisInstrumentor().instrument()


def get_tracer(name: str) -> trace.Tracer:
    """Return a tracer for *name* (conventionally the module's ``__name__``)."""
    return trace.get_tracer(name)

Custom Spans

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

# Use the module path as tracer name (the original listing's bare "name"
# lost its dunder underscores in markdown rendering).
tracer = trace.get_tracer(__name__)


async def process_order(order_id: str):
    """Process an order under a traced span with child spans per stage.

    Args:
        order_id: Identifier of the order to process.

    Returns:
        The validated order on success.

    Raises:
        Exception: Re-raised after recording it on the span and marking
            the span status as ERROR.
    """
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        try:
            # Validate order. NOTE: `span` is the outer process_order span,
            # so these attributes land there, not on the child spans.
            with tracer.start_as_current_span("validate_order"):
                order = await validate_order(order_id)
                span.set_attribute("order.total", order.total)

            # Process payment.
            with tracer.start_as_current_span("process_payment"):
                payment = await process_payment(order)
                span.set_attribute("payment.id", payment.id)

            # Update inventory.
            with tracer.start_as_current_span("update_inventory"):
                await update_inventory(order.items)

            span.set_status(Status(StatusCode.OK))
            return order

        except Exception as e:
            # Mark the trace as failed and attach the exception details.
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

Health Check Endpoints

app/routes/health.py

import json
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict

from fastapi import APIRouter, Depends, Response

# NOTE(review): Database, RedisCache, get_db and get_cache are project
# dependencies — import them from your app modules (omitted upstream).


class HealthStatus(str, Enum):
    """Tri-state health used in readiness responses."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


router = APIRouter(tags=["Health"])


@router.get("/health")
async def health() -> Dict[str, str]:
    """Kubernetes liveness probe."""
    return {"status": "ok"}


@router.get("/health/ready")
async def ready(
    db: Database = Depends(get_db),
    cache: RedisCache = Depends(get_cache),
) -> Response:
    """Kubernetes readiness probe with dependency checks.

    Returns 503 only when the primary datastore is down; a cache failure
    degrades the service but keeps it routable (200).
    """
    checks: Dict[str, Any] = {}
    status = HealthStatus.HEALTHY

    # MongoDB check — measure real ping latency instead of reporting 0.
    try:
        start = time.perf_counter()
        await db.command("ping")
        latency = (time.perf_counter() - start) * 1000
        checks["mongodb"] = {"status": "ok", "latency_ms": round(latency, 2)}
    except Exception as e:
        checks["mongodb"] = {"status": "error", "error": str(e)}
        status = HealthStatus.UNHEALTHY

    # Redis check — only downgrade to DEGRADED if still healthy, so a cache
    # failure cannot mask an UNHEALTHY datastore (bug in the original).
    try:
        start = time.perf_counter()
        await cache.client.ping()
        latency = (time.perf_counter() - start) * 1000
        checks["redis"] = {"status": "ok", "latency_ms": round(latency, 2)}
    except Exception as e:
        checks["redis"] = {"status": "error", "error": str(e)}
        if status == HealthStatus.HEALTHY:
            status = HealthStatus.DEGRADED  # cache failure = degraded

    response_data = {
        "status": status.value,
        "checks": checks,
        # Timezone-aware UTC; datetime.utcnow() is deprecated since 3.12.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    status_code = 200 if status != HealthStatus.UNHEALTHY else 503
    return Response(
        content=json.dumps(response_data),
        status_code=status_code,
        media_type="application/json",
    )


@router.get("/health/live")
async def live() -> Dict[str, str]:
    """Simple liveness check."""
    return {"status": "alive"}

Application Integration

from fastapi import FastAPI

from app.core.logging import setup_logging
from app.core.metrics import setup_metrics
from app.core.tracing import setup_tracing

# NOTE(review): `settings` and RequestLoggingMiddleware are used below but
# not imported upstream — bring them in from your config/middleware modules.


def create_app() -> FastAPI:
    """Application factory: wire logging, metrics, tracing and middleware.

    Returns:
        A configured FastAPI instance.
    """
    # Configure logging first so later setup steps log in the right format;
    # JSON output only in production.
    setup_logging(
        log_level=settings.log_level,
        json_logs=settings.environment == "production",
    )

    app = FastAPI(title="API Service")

    # Prometheus instrumentation and /metrics endpoint.
    setup_metrics(app)

    # Tracing is opt-in: enabled only when an OTLP endpoint is configured.
    if settings.otlp_endpoint:
        setup_tracing(
            app,
            service_name="api-service",
            otlp_endpoint=settings.otlp_endpoint,
        )

    # Request-ID + structured request logging.
    app.add_middleware(RequestLoggingMiddleware)

    return app

Additional Resources

Reference Files

For detailed configuration:

  • references/grafana-dashboards.md

  • Grafana dashboard JSON

  • references/alerting.md

  • Prometheus alerting rules

  • references/elk-setup.md

  • Elasticsearch/Kibana log aggregation

Example Files

Working examples are provided in the `examples/` directory:

  • examples/logging_config.py

  • Complete logging setup

  • examples/metrics_middleware.py

  • Custom metrics middleware

  • examples/tracing_service.py

  • Service with tracing

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

vision-multimodal

No summary provided by upstream source.

Repository Source — Needs Review
General

design-system

No summary provided by upstream source.

Repository Source — Needs Review
General

kanban

No summary provided by upstream source.

Repository Source — Needs Review