首页/AI 系统可观测与 SRE/python-observability
P

python-observability

by @wshobsonv
4.5(74)

专注于Python后端系统的可观测性,提供智能自动化和多智能体编排能力,帮助开发者监控、调试和优化应用性能。

pythonobservabilityloggingmonitoringtracing-(opentelemetry)GitHub
安装方式
npx skills add wshobson/agents --skill python-observability
compare_arrows

Before / After 效果对比

1
使用前

过去,Python后端系统出现性能瓶颈或错误时,我们往往需要手动检查日志、追踪请求,耗费大量时间定位问题。这种被动式的故障排除效率低下,严重影响了开发和运维的响应速度,导致用户体验下降,甚至业务中断。

使用后

引入智能可观测性方案后,系统能自动收集并分析各项指标、日志和追踪数据。通过AI驱动的自动化和多智能体编排,我们能实时发现异常、预测潜在问题,并获得优化建议,从而实现主动式性能管理和故障预防。

SKILL.md

python-observability

Python Observability

Instrument Python applications with structured logs, metrics, and traces. When something breaks in production, you need to answer "what, where, and why" without deploying new code.

When to Use This Skill

  • Adding structured logging to applications

  • Implementing metrics collection with Prometheus

  • Setting up distributed tracing across services

  • Propagating correlation IDs through request chains

  • Debugging production issues

  • Building observability dashboards

Core Concepts

1. Structured Logging

Emit logs as JSON with consistent fields for production environments. Machine-readable logs enable powerful queries and alerts. For local development, consider human-readable formats.

2. The Four Golden Signals

Track latency, traffic, errors, and saturation for every service boundary.

3. Correlation IDs

Thread a unique ID through all logs and spans for a single request, enabling end-to-end tracing.

4. Bounded Cardinality

Keep metric label values bounded. Unbounded labels (like user IDs) explode storage costs.

Quick Start

import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

logger = structlog.get_logger()
logger.info("Request processed", user_id="123", duration_ms=45)

Fundamental Patterns

Pattern 1: Structured Logging with Structlog

Configure structlog for JSON output with consistent fields.

import logging
import structlog

def configure_logging(log_level: str = "INFO") -> None:
    """Configure structured logging for the application."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

# Initialize at application startup
configure_logging("INFO")
logger = structlog.get_logger()

Pattern 2: Consistent Log Fields

Every log entry should include standard fields for filtering and correlation.

import structlog
from contextvars import ContextVar

# Store correlation ID in context
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

logger = structlog.get_logger()

def process_request(request: Request) -> Response:
    """Process request with structured logging."""
    logger.info(
        "Request received",
        correlation_id=correlation_id.get(),
        method=request.method,
        path=request.path,
        user_id=request.user_id,
    )

    try:
        result = handle_request(request)
        logger.info(
            "Request completed",
            correlation_id=correlation_id.get(),
            status_code=200,
            duration_ms=elapsed,
        )
        return result
    except Exception as e:
        logger.error(
            "Request failed",
            correlation_id=correlation_id.get(),
            error_type=type(e).__name__,
            error_message=str(e),
        )
        raise

Pattern 3: Semantic Log Levels

Use log levels consistently across the application.

Level Purpose Examples

DEBUG Development diagnostics Variable values, internal state

INFO Request lifecycle, operations Request start/end, job completion

WARNING Recoverable anomalies Retry attempts, fallback used

ERROR Failures needing attention Exceptions, service unavailable

# DEBUG: Detailed internal information
logger.debug("Cache lookup", key=cache_key, hit=cache_hit)

# INFO: Normal operational events
logger.info("Order created", order_id=order.id, total=order.total)

# WARNING: Abnormal but handled situations
logger.warning(
    "Rate limit approaching",
    current_rate=950,
    limit=1000,
    reset_seconds=30,
)

# ERROR: Failures requiring investigation
logger.error(
    "Payment processing failed",
    order_id=order.id,
    error=str(e),
    payment_provider="stripe",
)

Never log expected behavior at ERROR. A user entering a wrong password is INFO, not ERROR.

Pattern 4: Correlation ID Propagation

Generate a unique ID at ingress and thread it through all operations.

from contextvars import ContextVar
import uuid
import structlog

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

def set_correlation_id(cid: str | None = None) -> str:
    """Set correlation ID for current context."""
    cid = cid or str(uuid.uuid4())
    correlation_id.set(cid)
    structlog.contextvars.bind_contextvars(correlation_id=cid)
    return cid

# FastAPI middleware example
from fastapi import Request

async def correlation_middleware(request: Request, call_next):
    """Middleware to set and propagate correlation ID."""
    # Use incoming header or generate new
    cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
    set_correlation_id(cid)

    response = await call_next(request)
    response.headers["X-Correlation-ID"] = cid
    return response

Propagate to outbound requests:

import httpx

async def call_downstream_service(endpoint: str, data: dict) -> dict:
    """Call downstream service with correlation ID."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json=data,
            headers={"X-Correlation-ID": correlation_id.get()},
        )
        return response.json()

Advanced Patterns

Pattern 5: The Four Golden Signals with Prometheus

Track these metrics for every service boundary:

from prometheus_client import Counter, Histogram, Gauge

# Latency: How long requests take
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "Request latency in seconds",
    ["method", "endpoint", "status"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)

# Traffic: Request rate
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)

# Errors: Error rate
ERROR_COUNT = Counter(
    "http_errors_total",
    "Total HTTP errors",
    ["method", "endpoint", "error_type"],
)

# Saturation: Resource utilization
DB_POOL_USAGE = Gauge(
    "db_connection_pool_used",
    "Number of database connections in use",
)

Instrument your endpoints:

import time
from functools import wraps

def track_request(func):
    """Decorator to track request metrics."""
    @wraps(func)
    async def wrapper(request: Request, *args, **kwargs):
        method = request.method
        endpoint = request.url.path
        start = time.perf_counter()

        try:
            response = await func(request, *args, **kwargs)
            status = str(response.status_code)
            return response
        except Exception as e:
            status = "500"
            ERROR_COUNT.labels(
                method=method,
                endpoint=endpoint,
                error_type=type(e).__name__,
            ).inc()
            raise
        finally:
            duration = time.perf_counter() - start
            REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
            REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)

    return wrapper

Pattern 6: Bounded Cardinality

Avoid labels with unbounded values to prevent metric explosion.

# BAD: User ID has potentially millions of values
REQUEST_COUNT.labels(method="GET", user_id=user.id)  # Don't do this!

# GOOD: Bounded values only
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")

# If you need per-user metrics, use a different approach:
# - Log the user_id and query logs
# - Use a separate analytics system
# - Bucket users by type/tier
REQUEST_COUNT.labels(
    method="GET",
    endpoint="/users",
    user_tier="premium",  # Bounded set of values
)

Pattern 7: Timed Operations with Context Manager

Create a reusable timing context manager for operations.

from contextlib import contextmanager
import time
import structlog

logger = structlog.get_logger()

@contextmanager
def timed_operation(name: str, **extra_fields):
    """Context manager for timing and logging operations."""
    start = time.perf_counter()
    logger.debug("Operation started", operation=name, **extra_fields)

    try:
        yield
    except Exception as e:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.error(
            "Operation failed",
            operation=name,
            duration_ms=round(elapsed_ms, 2),
            error=str(e),
            **extra_fields,
        )
        raise
    else:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info(
            "Operation completed",
            operation=name,
            duration_ms=round(elapsed_ms, 2),
            **extra_fields,
        )

# Usage
with timed_operation("fetch_user_orders", user_id=user.id):
    orders = await order_repository.get_by_user(user.id)

Pattern 8: OpenTelemetry Tracing

Set up distributed tracing with OpenTelemetry.

Note: OpenTelemetry is actively evolving. Check the official Python documentation for the latest API patterns and best practices.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
    """Configure OpenTelemetry tracing."""
    provider = TracerProvider()
    processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

async def process_order(order_id: str) -> Order:
    """Process order with tracing."""
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        with tracer.start_as_current_span("validate_order"):
            validate_order(order_id)

        with tracer.start_as_current_span("charge_payment"):
            charge_payment(order_id)

        with tracer.start_as_current_span("send_confirmation"):
            send_confirmation(order_id)

        return order

Best Practices Summary

  • Use structured logging - JSON logs with consistent fields

  • Propagate correlation IDs - Thread through all requests and logs

  • Track the four golden signals - Latency, traffic, errors, saturation

  • Bound label cardinality - Never use unbounded values as metric labels

  • Log at appropriate levels - Don't cry wolf with ERROR

  • Include context - User ID, request ID, operation name in logs

  • Use context managers - Consistent timing and error handling

  • Separate concerns - Observability code shouldn't pollute business logic

  • Test your observability - Verify logs and metrics in integration tests

  • Set up alerts - Metrics are useless without alerting

Weekly Installs2.7KRepositorywshobson/agentsGitHub Stars31.5KFirst SeenJan 30, 2026Security AuditsGen Agent Trust HubFailSocketPassSnykPassInstalled onopencode2.1Kgemini-cli2.1Kcodex2.1Kclaude-code2.0Kcursor1.9Kgithub-copilot1.9K

用户评价 (0)

发表评价

效果
易用性
文档
兼容性

暂无评价

统计数据

安装量6.6K
评分4.5 / 5.0
版本
更新日期2026年5月22日
对比案例1 组

用户评分

4.5(74)
5
27%
4
51%
3
20%
2
1%
1
0%

为此 Skill 评分

0.0

兼容平台

🔧Claude Code
🔧OpenClaw
🔧OpenCode
🔧Codex
🔧Gemini CLI
🔧GitHub Copilot
🔧Amp
🔧Kimi CLI

时间线

创建2026年3月17日
最后更新2026年5月22日