feat(telemetry): U7 OpenTelemetry integration with zero-dependency no-op pattern

Add telemetry module with tracing (agent/tool/llm/pipeline_step spans),
metrics (5 histograms/counters), and setup with optional OTLP exporters.
Uses no-op pattern when opentelemetry not installed. GenAI Semantic
Conventions for LLM spans. Integrated into ReactEngine, LLMGateway,
ToolBase, and FastAPI app.
This commit is contained in:
chiguyong 2026-06-07 17:26:21 +08:00
parent 03a5167366
commit 239009357a
8 changed files with 1059 additions and 34 deletions

View File

@ -17,6 +17,11 @@ from agentkit.core.exceptions import TaskCancelledError, TaskTimeoutError
from agentkit.core.protocol import CancellationToken
from agentkit.llm.gateway import LLMGateway
from agentkit.tools.base import Tool
from agentkit.telemetry.tracing import get_tracer, start_span, _OTEL_AVAILABLE
from agentkit.telemetry.metrics import (
agent_request_counter,
agent_duration_histogram,
)
if TYPE_CHECKING:
from agentkit.core.compressor import ContextCompressor
@ -165,6 +170,17 @@ class ReActEngine:
tools = tools or []
tool_schemas = self._build_tool_schemas(tools) if tools else None
# Telemetry: record agent request
agent_request_counter().add(1, {"agent.name": agent_name, "agent.type": task_type or "react"})
# Start telemetry span for the entire agent execution
_span_cm = start_span(
"agent.execute",
attributes={"agent.name": agent_name, "agent.type": task_type or "react"},
)
_span = _span_cm.__enter__()
_exec_start = time.monotonic()
# 启动轨迹记录
if trace_recorder is not None:
trace_recorder.start_trace(
@ -397,6 +413,15 @@ class ReActEngine:
except Exception as e:
logger.warning(f"Failed to store task result in episodic memory: {e}")
# Telemetry: end span and record duration
_duration_ms = int((time.monotonic() - _exec_start) * 1000)
_span.set_attribute("agent.total_steps", len(trajectory))
_span.set_attribute("agent.total_tokens", total_tokens)
_span.set_attribute("agent.outcome", trace_outcome)
_span.set_attribute("agent.duration_ms", _duration_ms)
_span_cm.__exit__(None, None, None)
agent_duration_histogram().record(_duration_ms, {"agent.name": agent_name})
return ReActResult(
output=output,
trajectory=trajectory,

View File

@ -7,6 +7,8 @@ from agentkit.core.exceptions import LLMProviderError, ModelNotFoundError
from agentkit.llm.config import LLMConfig
from agentkit.llm.protocol import LLMProvider, LLMRequest, LLMResponse, StreamChunk, TokenUsage
from agentkit.llm.providers.tracker import UsageSummary, UsageTracker
from agentkit.telemetry.tracing import get_tracer, _OTEL_AVAILABLE
from agentkit.telemetry.metrics import llm_token_histogram
logger = logging.getLogger(__name__)
@ -45,48 +47,81 @@ class LLMGateway:
if not self._providers:
raise LLMProviderError("", "No provider registered")
# Telemetry: start LLM span
_span_cm = None
_span = None
if _OTEL_AVAILABLE:
tracer = get_tracer()
if tracer is not None:
from opentelemetry.trace import SpanKind
_span_cm = tracer.start_as_current_span(
"gen_ai.chat",
kind=SpanKind.CLIENT,
attributes={
"gen_ai.system": resolved_model.split("/")[0] if "/" in resolved_model else "unknown",
"gen_ai.operation.name": "chat",
"gen_ai.request.model": resolved_model,
},
)
_span = _span_cm.__enter__()
start = time.monotonic()
models_to_try = self._get_models_to_try(resolved_model)
last_error: LLMProviderError | None = None
for model_name in models_to_try:
try:
provider, actual_model = self._resolve_model(model_name)
except ModelNotFoundError:
continue
try:
for model_name in models_to_try:
try:
provider, actual_model = self._resolve_model(model_name)
except ModelNotFoundError:
continue
req = LLMRequest(
messages=messages,
model=actual_model,
tools=tools,
tool_choice=tool_choice,
**kwargs,
req = LLMRequest(
messages=messages,
model=actual_model,
tools=tools,
tool_choice=tool_choice,
**kwargs,
)
try:
response = await provider.chat(req)
break
except LLMProviderError as e:
last_error = e
logger.warning(f"Model '{model_name}' failed, trying next: {e}")
continue
else:
raise last_error or LLMProviderError("", f"All models failed for '{resolved_model}'")
latency_ms = (time.monotonic() - start) * 1000
# 计算成本
cost = self._calculate_cost(response.model, response.usage)
# 记录使用量
self._usage_tracker.record(
agent_name=agent_name,
model=response.model,
usage=response.usage,
cost=cost,
latency_ms=latency_ms,
)
try:
response = await provider.chat(req)
break
except LLMProviderError as e:
last_error = e
logger.warning(f"Model '{model_name}' failed, trying next: {e}")
continue
else:
raise last_error or LLMProviderError("", f"All models failed for '{resolved_model}'")
latency_ms = (time.monotonic() - start) * 1000
# Telemetry: record token usage and end span
if _span is not None:
_span.set_attribute("gen_ai.usage.input_tokens", response.usage.prompt_tokens)
_span.set_attribute("gen_ai.usage.output_tokens", response.usage.completion_tokens)
_span.set_attribute("gen_ai.response.model", response.model)
_span.set_attribute("gen_ai.duration_ms", int(latency_ms))
llm_token_histogram().record(
response.usage.total_tokens,
{"gen_ai.request.model": resolved_model},
)
# 计算成本
cost = self._calculate_cost(response.model, response.usage)
# 记录使用量
self._usage_tracker.record(
agent_name=agent_name,
model=response.model,
usage=response.usage,
cost=cost,
latency_ms=latency_ms,
)
return response
return response
finally:
if _span_cm is not None:
_span_cm.__exit__(None, None, None)
async def chat_stream(
self,

View File

@ -0,0 +1,38 @@
"""Telemetry module — OpenTelemetry integration (optional)
All tracing and metrics are no-op when opentelemetry packages are not installed.
"""
from agentkit.telemetry.tracing import (
get_tracer,
start_span,
trace_agent,
trace_tool,
trace_llm,
trace_pipeline_step,
_OTEL_AVAILABLE,
)
from agentkit.telemetry.metrics import (
agent_request_counter,
agent_duration_histogram,
llm_token_histogram,
tool_duration_histogram,
pipeline_step_histogram,
)
from agentkit.telemetry.setup import setup_telemetry
__all__ = [
"get_tracer",
"start_span",
"trace_agent",
"trace_tool",
"trace_llm",
"trace_pipeline_step",
"agent_request_counter",
"agent_duration_histogram",
"llm_token_histogram",
"tool_duration_histogram",
"pipeline_step_histogram",
"setup_telemetry",
"_OTEL_AVAILABLE",
]

View File

@ -0,0 +1,108 @@
"""Metric definitions — no-op when OTel not installed"""
try:
from opentelemetry import metrics
_OTEL_AVAILABLE = True
except ImportError:
_OTEL_AVAILABLE = False
class _NoOpCounter:
"""No-op counter used when OTel is not installed."""
def add(self, *args, **kwargs):
pass
class _NoOpHistogram:
"""No-op histogram used when OTel is not installed."""
def record(self, *args, **kwargs):
pass
class _NoOpUpDownCounter:
"""No-op up-down counter used when OTel is not installed."""
def add(self, *args, **kwargs):
pass
def get_meter(name: str = "fischer.agentkit"):
"""Get meter — returns None if OTel not installed."""
if _OTEL_AVAILABLE:
return metrics.get_meter(name)
return None
# Lazy-initialized metric instruments
_agent_request_counter = None
_agent_duration_histogram = None
_llm_token_histogram = None
_tool_duration_histogram = None
_pipeline_step_histogram = None
def _get_counter(name: str, description: str, unit: str = "1"):
meter = get_meter()
if meter is None:
return _NoOpCounter()
return meter.create_counter(name=name, description=description, unit=unit)
def _get_histogram(name: str, description: str, unit: str = "ms"):
meter = get_meter()
if meter is None:
return _NoOpHistogram()
return meter.create_histogram(name=name, description=description, unit=unit)
def agent_request_counter():
"""Total agent execution requests."""
global _agent_request_counter
if _agent_request_counter is None:
_agent_request_counter = _get_counter(
"agent.request.total", "Total agent execution requests"
)
return _agent_request_counter
def agent_duration_histogram():
"""Agent execution duration."""
global _agent_duration_histogram
if _agent_duration_histogram is None:
_agent_duration_histogram = _get_histogram(
"agent.execution.duration", "Agent execution duration"
)
return _agent_duration_histogram
def llm_token_histogram():
"""Token usage per LLM call."""
global _llm_token_histogram
if _llm_token_histogram is None:
_llm_token_histogram = _get_histogram(
"gen_ai.usage.tokens", "Token usage per LLM call", unit="1"
)
return _llm_token_histogram
def tool_duration_histogram():
"""Tool call duration."""
global _tool_duration_histogram
if _tool_duration_histogram is None:
_tool_duration_histogram = _get_histogram(
"tool.call.duration", "Tool call duration"
)
return _tool_duration_histogram
def pipeline_step_histogram():
"""Pipeline step duration."""
global _pipeline_step_histogram
if _pipeline_step_histogram is None:
_pipeline_step_histogram = _get_histogram(
"pipeline.step.duration", "Pipeline step duration"
)
return _pipeline_step_histogram

View File

@ -0,0 +1,93 @@
"""OTel initialization — called at app startup"""
import logging
logger = logging.getLogger(__name__)
def setup_telemetry(app, config: dict | None = None):
"""Initialize OpenTelemetry if installed and configured.
This is a no-op when:
- config is None or config.enabled is False
- opentelemetry packages are not installed
Args:
app: FastAPI application instance
config: Telemetry configuration dict with keys:
- enabled (bool): Whether to enable telemetry
- service_name (str): Service name for OTel resource
- otlp_endpoint (str): OTLP gRPC endpoint URL
- export_traces (bool): Whether to export traces
- export_metrics (bool): Whether to export metrics
"""
if not config or not config.get("enabled", False):
logger.info("Telemetry disabled")
return
try:
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
except ImportError:
logger.warning(
"OpenTelemetry packages not installed. Telemetry disabled."
)
return
service_name = config.get("service_name", "fischer-agentkit")
resource = Resource.create({"service.name": service_name})
# Tracing setup
if config.get("export_traces", True):
endpoint = config.get("otlp_endpoint", "http://localhost:4317")
try:
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(
BatchSpanProcessor(
OTLPSpanExporter(endpoint=endpoint, insecure=True)
)
)
trace.set_tracer_provider(provider)
logger.info(f"Tracing enabled, exporting to {endpoint}")
except ImportError:
logger.warning(
"OTLP exporter not installed. Tracing disabled."
)
# Metrics setup
if config.get("export_metrics", True):
endpoint = config.get("otlp_endpoint", "http://localhost:4317")
try:
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)
reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=endpoint, insecure=True)
)
provider = MeterProvider(resource=resource, readers=[reader])
metrics.set_meter_provider(provider)
logger.info(f"Metrics enabled, exporting to {endpoint}")
except ImportError:
logger.warning(
"OTLP metric exporter not installed. Metrics disabled."
)
# FastAPI auto-instrumentation
try:
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
FastAPIInstrumentor.instrument_app(app, excluded_urls="health,metrics")
logger.info("FastAPI auto-instrumentation enabled")
except ImportError:
logger.warning(
"FastAPI instrumentation not installed. Skipping auto-instrumentation."
)

View File

@ -0,0 +1,232 @@
"""Tracing helpers — no-op when OTel not installed"""
import logging
import time
from functools import wraps
from typing import Any, Callable
logger = logging.getLogger(__name__)
# Try importing OTel — if not available, provide no-op implementations
try:
from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode
_OTEL_AVAILABLE = True
except ImportError:
_OTEL_AVAILABLE = False
# Provide fallback stubs so module-level references work in tests
class _StubEnum:
INTERNAL = "INTERNAL"
CLIENT = "CLIENT"
SERVER = "SERVER"
PRODUCER = "PRODUCER"
CONSUMER = "CONSUMER"
SpanKind = _StubEnum # type: ignore[misc,assignment]
class Status: # type: ignore[no-redef]
def __init__(self, *args, **kwargs):
pass
class StatusCode: # type: ignore[no-redef]
UNSET = "UNSET"
OK = "OK"
ERROR = "ERROR"
class _NoOpSpan:
"""No-op span context manager used when OTel is not installed."""
def __enter__(self):
return self
def __exit__(self, *args):
pass
def set_attribute(self, *args):
pass
def add_event(self, *args):
pass
def set_status(self, *args):
pass
def record_exception(self, *args):
pass
def get_tracer(name: str = "fischer.agentkit"):
"""Get tracer — returns None if OTel not installed."""
if _OTEL_AVAILABLE:
return trace.get_tracer(name)
return None
def start_span(
name: str,
kind: Any = None,
attributes: dict | None = None,
):
"""Start a span — returns no-op span if OTel not installed.
Returns a context manager that yields a span (or no-op).
"""
if not _OTEL_AVAILABLE:
return _NoOpSpan()
tracer = get_tracer()
if tracer is None:
return _NoOpSpan()
if kind is None:
kind = SpanKind.INTERNAL
span = tracer.start_span(name, kind=kind, attributes=attributes)
return trace.use_span(span, end_on_exit=True)
def trace_agent(agent_name: str, agent_type: str = "react"):
"""Decorator: trace agent execution."""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
if not _OTEL_AVAILABLE:
return await func(*args, **kwargs)
tracer = get_tracer()
with tracer.start_as_current_span(
"agent.execute",
kind=SpanKind.INTERNAL,
attributes={"agent.name": agent_name, "agent.type": agent_type},
) as span:
start = time.monotonic()
try:
result = await func(*args, **kwargs)
duration_ms = int((time.monotonic() - start) * 1000)
span.set_attribute("agent.result.success", True)
span.set_attribute("agent.duration_ms", duration_ms)
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
span.set_attribute("agent.result.success", False)
raise
return wrapper
return decorator
def trace_tool(tool_name: str):
"""Decorator: trace tool call."""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
if not _OTEL_AVAILABLE:
return await func(*args, **kwargs)
tracer = get_tracer()
with tracer.start_as_current_span(
"tool.execute",
kind=SpanKind.CLIENT,
attributes={"tool.name": tool_name},
) as span:
start = time.monotonic()
try:
result = await func(*args, **kwargs)
duration_ms = int((time.monotonic() - start) * 1000)
span.set_attribute("tool.duration_ms", duration_ms)
span.set_attribute("tool.result.success", True)
return result
except Exception as e:
duration_ms = int((time.monotonic() - start) * 1000)
span.set_attribute("tool.duration_ms", duration_ms)
span.set_attribute("tool.result.success", False)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
return wrapper
return decorator
def trace_llm(provider: str, model: str):
"""Decorator: trace LLM call — follows GenAI Semantic Conventions."""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
if not _OTEL_AVAILABLE:
return await func(*args, **kwargs)
tracer = get_tracer()
with tracer.start_as_current_span(
"gen_ai.chat",
kind=SpanKind.CLIENT,
attributes={
"gen_ai.system": provider,
"gen_ai.operation.name": "chat",
"gen_ai.request.model": model,
},
) as span:
start = time.monotonic()
try:
result = await func(*args, **kwargs)
duration_ms = int((time.monotonic() - start) * 1000)
span.set_attribute("gen_ai.duration_ms", duration_ms)
# Record token usage if available on the response
if hasattr(result, "usage") and result.usage is not None:
span.set_attribute(
"gen_ai.usage.input_tokens",
getattr(result.usage, "prompt_tokens", 0),
)
span.set_attribute(
"gen_ai.usage.output_tokens",
getattr(result.usage, "completion_tokens", 0),
)
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
return wrapper
return decorator
def trace_pipeline_step(pipeline_name: str, step_name: str):
"""Decorator: trace pipeline step execution."""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
if not _OTEL_AVAILABLE:
return await func(*args, **kwargs)
tracer = get_tracer()
with tracer.start_as_current_span(
"pipeline.step",
kind=SpanKind.INTERNAL,
attributes={
"pipeline.name": pipeline_name,
"step.name": step_name,
},
) as span:
start = time.monotonic()
try:
result = await func(*args, **kwargs)
duration_ms = int((time.monotonic() - start) * 1000)
span.set_attribute("step.duration_ms", duration_ms)
span.set_attribute("step.status", "success")
return result
except Exception as e:
duration_ms = int((time.monotonic() - start) * 1000)
span.set_attribute("step.duration_ms", duration_ms)
span.set_attribute("step.status", "error")
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
return wrapper
return decorator

View File

@ -1,8 +1,12 @@
"""Tool 抽象基类 - 统一工具接口"""
import time
from abc import ABC, abstractmethod
from typing import Any
from agentkit.telemetry.tracing import start_span
from agentkit.telemetry.metrics import tool_duration_histogram
class Tool(ABC):
"""工具抽象基类
@ -45,14 +49,32 @@ class Tool(ABC):
async def safe_execute(self, **kwargs) -> dict:
"""带钩子的安全执行"""
_span_cm = start_span(
"tool.execute",
attributes={"tool.name": self.name},
)
_span = _span_cm.__enter__()
_start = time.monotonic()
try:
await self.before_execute(**kwargs)
result = await self.execute(**kwargs)
await self.after_execute(result, **kwargs)
_duration_ms = int((time.monotonic() - _start) * 1000)
if _span is not None:
_span.set_attribute("tool.duration_ms", _duration_ms)
_span.set_attribute("tool.result.success", True)
tool_duration_histogram().record(_duration_ms, {"tool.name": self.name})
return result
except Exception as e:
_duration_ms = int((time.monotonic() - _start) * 1000)
if _span is not None:
_span.set_attribute("tool.duration_ms", _duration_ms)
_span.set_attribute("tool.result.success", False)
tool_duration_histogram().record(_duration_ms, {"tool.name": self.name})
await self.on_error(e, **kwargs)
raise
finally:
_span_cm.__exit__(None, None, None)
def to_dict(self) -> dict:
return {

View File

@ -0,0 +1,472 @@
"""Unit tests for telemetry module — OpenTelemetry integration"""
import asyncio
import importlib
import sys
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ── No-op behavior when OTel not installed ──────────────────────────
class TestNoOpWhenOTelNotInstalled:
"""All operations are no-op when opentelemetry is not installed."""
def test_tracing_noop_span_context_manager(self):
"""_NoOpSpan works as context manager without errors."""
from agentkit.telemetry.tracing import _NoOpSpan
span = _NoOpSpan()
with span as s:
s.set_attribute("key", "value")
s.add_event("event")
s.set_status("ok")
s.record_exception(Exception("test"))
def test_get_tracer_returns_none_without_otel(self):
"""get_tracer returns None when OTel is not installed."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, get_tracer
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
assert get_tracer() is None
def test_start_span_returns_noop_without_otel(self):
"""start_span returns no-op span when OTel is not installed."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, start_span, _NoOpSpan
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
span_cm = start_span("test.span")
assert isinstance(span_cm, _NoOpSpan)
def test_metrics_noop_counter(self):
"""No-op counter add() does not raise."""
from agentkit.telemetry.metrics import _NoOpCounter
counter = _NoOpCounter()
counter.add(1, {"key": "value"}) # Should not raise
def test_metrics_noop_histogram(self):
"""No-op histogram record() does not raise."""
from agentkit.telemetry.metrics import _NoOpHistogram
hist = _NoOpHistogram()
hist.record(100, {"key": "value"}) # Should not raise
def test_metrics_get_meter_returns_none_without_otel(self):
"""get_meter returns None when OTel is not installed."""
from agentkit.telemetry.metrics import _OTEL_AVAILABLE, get_meter
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
assert get_meter() is None
def test_metric_helpers_return_noop_without_otel(self):
"""Metric helper functions return no-op instruments when OTel not installed."""
from agentkit.telemetry.metrics import (
_OTEL_AVAILABLE,
_NoOpCounter,
_NoOpHistogram,
agent_request_counter,
agent_duration_histogram,
llm_token_histogram,
tool_duration_histogram,
pipeline_step_histogram,
)
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
# Reset lazy singletons to force re-creation
import agentkit.telemetry.metrics as m
m._agent_request_counter = None
m._agent_duration_histogram = None
m._llm_token_histogram = None
m._tool_duration_histogram = None
m._pipeline_step_histogram = None
assert isinstance(agent_request_counter(), _NoOpCounter)
assert isinstance(agent_duration_histogram(), _NoOpHistogram)
assert isinstance(llm_token_histogram(), _NoOpHistogram)
assert isinstance(tool_duration_histogram(), _NoOpHistogram)
assert isinstance(pipeline_step_histogram(), _NoOpHistogram)
# ── Tracing decorator tests ─────────────────────────────────────────
class TestTraceAgentDecorator:
"""trace_agent decorator works with and without OTel."""
@pytest.mark.asyncio
async def test_decorator_works_without_otel(self):
"""trace_agent decorator passes through when OTel not installed."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, trace_agent
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
@trace_agent("test_agent", "react")
async def my_func():
return "result"
result = await my_func()
assert result == "result"
@pytest.mark.asyncio
async def test_decorator_propagates_exception_without_otel(self):
"""trace_agent propagates exceptions when OTel not installed."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, trace_agent
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
@trace_agent("test_agent")
async def my_func():
raise ValueError("test error")
with pytest.raises(ValueError, match="test error"):
await my_func()
class TestTraceToolDecorator:
"""trace_tool decorator tests."""
@pytest.mark.asyncio
async def test_decorator_works_without_otel(self):
"""trace_tool decorator passes through when OTel not installed."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, trace_tool
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
@trace_tool("my_tool")
async def my_func():
return {"result": "ok"}
result = await my_func()
assert result == {"result": "ok"}
@pytest.mark.asyncio
async def test_decorator_propagates_exception_without_otel(self):
"""trace_tool propagates exceptions when OTel not installed."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, trace_tool
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
@trace_tool("my_tool")
async def my_func():
raise RuntimeError("tool error")
with pytest.raises(RuntimeError, match="tool error"):
await my_func()
class TestTraceLLMDecorator:
"""trace_llm decorator tests."""
@pytest.mark.asyncio
async def test_decorator_works_without_otel(self):
"""trace_llm decorator passes through when OTel not installed."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, trace_llm
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
@trace_llm("openai", "gpt-4")
async def my_func():
return MagicMock(usage=MagicMock(prompt_tokens=10, completion_tokens=20))
result = await my_func()
assert result is not None
@pytest.mark.asyncio
async def test_decorator_propagates_exception_without_otel(self):
"""trace_llm propagates exceptions when OTel not installed."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, trace_llm
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
@trace_llm("openai", "gpt-4")
async def my_func():
raise ConnectionError("LLM error")
with pytest.raises(ConnectionError, match="LLM error"):
await my_func()
class TestTracePipelineStepDecorator:
"""trace_pipeline_step decorator tests."""
@pytest.mark.asyncio
async def test_decorator_works_without_otel(self):
"""trace_pipeline_step decorator passes through when OTel not installed."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, trace_pipeline_step
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
@trace_pipeline_step("my_pipeline", "step_1")
async def my_func():
return "step_result"
result = await my_func()
assert result == "step_result"
@pytest.mark.asyncio
async def test_decorator_propagates_exception_without_otel(self):
"""trace_pipeline_step propagates exceptions when OTel not installed."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, trace_pipeline_step
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
@trace_pipeline_step("my_pipeline", "step_1")
async def my_func():
raise RuntimeError("step failed")
with pytest.raises(RuntimeError, match="step failed"):
await my_func()
# ── OTel installed (mocked) tests ───────────────────────────────────
class TestTracingWithMockedOTel:
"""Test tracing with mocked OTel imports."""
@pytest.mark.asyncio
async def test_trace_agent_with_mocked_otel(self):
"""trace_agent creates span with correct attributes when OTel is available."""
mock_span = MagicMock()
mock_span_cm = MagicMock()
mock_span_cm.__enter__ = MagicMock(return_value=mock_span)
mock_span_cm.__exit__ = MagicMock(return_value=False)
mock_tracer = MagicMock()
mock_tracer.start_as_current_span.return_value = mock_span_cm
with patch("agentkit.telemetry.tracing._OTEL_AVAILABLE", True), \
patch("agentkit.telemetry.tracing.get_tracer", return_value=mock_tracer), \
patch("agentkit.telemetry.tracing.SpanKind"), \
patch("agentkit.telemetry.tracing.Status"), \
patch("agentkit.telemetry.tracing.StatusCode"):
from agentkit.telemetry.tracing import trace_agent
@trace_agent("test_agent", "react")
async def my_func():
return "result"
result = await my_func()
assert result == "result"
mock_tracer.start_as_current_span.assert_called_once()
call_kwargs = mock_tracer.start_as_current_span.call_args
assert call_kwargs[1]["attributes"]["agent.name"] == "test_agent"
assert call_kwargs[1]["attributes"]["agent.type"] == "react"
@pytest.mark.asyncio
async def test_trace_tool_with_mocked_otel(self):
"""trace_tool creates span with tool.name attribute."""
mock_span = MagicMock()
mock_span_cm = MagicMock()
mock_span_cm.__enter__ = MagicMock(return_value=mock_span)
mock_span_cm.__exit__ = MagicMock(return_value=False)
mock_tracer = MagicMock()
mock_tracer.start_as_current_span.return_value = mock_span_cm
with patch("agentkit.telemetry.tracing._OTEL_AVAILABLE", True), \
patch("agentkit.telemetry.tracing.get_tracer", return_value=mock_tracer), \
patch("agentkit.telemetry.tracing.SpanKind"), \
patch("agentkit.telemetry.tracing.Status"), \
patch("agentkit.telemetry.tracing.StatusCode"):
from agentkit.telemetry.tracing import trace_tool
@trace_tool("search_tool")
async def my_func():
return {"found": True}
result = await my_func()
assert result == {"found": True}
call_kwargs = mock_tracer.start_as_current_span.call_args
assert call_kwargs[1]["attributes"]["tool.name"] == "search_tool"
@pytest.mark.asyncio
async def test_trace_llm_with_mocked_otel(self):
"""trace_llm creates span with gen_ai semantic conventions."""
mock_span = MagicMock()
mock_span_cm = MagicMock()
mock_span_cm.__enter__ = MagicMock(return_value=mock_span)
mock_span_cm.__exit__ = MagicMock(return_value=False)
mock_tracer = MagicMock()
mock_tracer.start_as_current_span.return_value = mock_span_cm
mock_usage = MagicMock()
mock_usage.prompt_tokens = 50
mock_usage.completion_tokens = 100
mock_response = MagicMock()
mock_response.usage = mock_usage
with patch("agentkit.telemetry.tracing._OTEL_AVAILABLE", True), \
patch("agentkit.telemetry.tracing.get_tracer", return_value=mock_tracer), \
patch("agentkit.telemetry.tracing.SpanKind"), \
patch("agentkit.telemetry.tracing.Status"), \
patch("agentkit.telemetry.tracing.StatusCode"):
from agentkit.telemetry.tracing import trace_llm
@trace_llm("openai", "gpt-4")
async def my_func():
return mock_response
result = await my_func()
assert result is mock_response
call_kwargs = mock_tracer.start_as_current_span.call_args
attrs = call_kwargs[1]["attributes"]
assert attrs["gen_ai.system"] == "openai"
assert attrs["gen_ai.operation.name"] == "chat"
assert attrs["gen_ai.request.model"] == "gpt-4"
# Token usage should be recorded on span
mock_span.set_attribute.assert_any_call("gen_ai.usage.input_tokens", 50)
mock_span.set_attribute.assert_any_call("gen_ai.usage.output_tokens", 100)
@pytest.mark.asyncio
async def test_trace_pipeline_step_with_mocked_otel(self):
"""trace_pipeline_step creates span with pipeline and step attributes."""
mock_span = MagicMock()
mock_span_cm = MagicMock()
mock_span_cm.__enter__ = MagicMock(return_value=mock_span)
mock_span_cm.__exit__ = MagicMock(return_value=False)
mock_tracer = MagicMock()
mock_tracer.start_as_current_span.return_value = mock_span_cm
with patch("agentkit.telemetry.tracing._OTEL_AVAILABLE", True), \
patch("agentkit.telemetry.tracing.get_tracer", return_value=mock_tracer), \
patch("agentkit.telemetry.tracing.SpanKind"), \
patch("agentkit.telemetry.tracing.Status"), \
patch("agentkit.telemetry.tracing.StatusCode"):
from agentkit.telemetry.tracing import trace_pipeline_step
@trace_pipeline_step("geo_pipeline", "analyze")
async def my_func():
return "done"
result = await my_func()
assert result == "done"
call_kwargs = mock_tracer.start_as_current_span.call_args
attrs = call_kwargs[1]["attributes"]
assert attrs["pipeline.name"] == "geo_pipeline"
assert attrs["step.name"] == "analyze"
# ── setup_telemetry tests ───────────────────────────────────────────
class TestSetupTelemetry:
"""setup_telemetry initialization tests."""
def test_no_config_is_noop(self):
"""setup_telemetry with no config is a no-op."""
from agentkit.telemetry.setup import setup_telemetry
mock_app = MagicMock()
setup_telemetry(mock_app, None) # Should not raise
# No auto-instrumentation should happen
mock_app.state = MagicMock() # Just ensure no crash
def test_disabled_config_is_noop(self):
"""setup_telemetry with enabled=False is a no-op."""
from agentkit.telemetry.setup import setup_telemetry
mock_app = MagicMock()
setup_telemetry(mock_app, {"enabled": False}) # Should not raise
def test_config_without_otel_logs_warning(self):
"""setup_telemetry with config but OTel not installed logs warning."""
from agentkit.telemetry.setup import setup_telemetry
mock_app = MagicMock()
# This should not raise even if OTel is not installed
# It will log a warning internally
config = {"enabled": True, "service_name": "test"}
# If OTel is installed, this will try to set up providers
# If not, it will log a warning and return
setup_telemetry(mock_app, config) # Should not raise
def test_empty_config_is_noop(self):
"""setup_telemetry with empty dict is a no-op (enabled defaults to False)."""
from agentkit.telemetry.setup import setup_telemetry
mock_app = MagicMock()
setup_telemetry(mock_app, {}) # Should not raise
# ── Integration: Tool safe_execute with telemetry ───────────────────
class TestToolTelemetryIntegration:
"""Test that Tool.safe_execute records telemetry."""
@pytest.mark.asyncio
async def test_safe_execute_records_noop_telemetry(self):
"""safe_execute works with no-op telemetry (OTel not installed)."""
from agentkit.tools.base import Tool
class DummyTool(Tool):
async def execute(self, **kwargs):
return {"result": "ok"}
tool = DummyTool(name="test_tool", description="A test tool")
result = await tool.safe_execute(query="hello")
assert result == {"result": "ok"}
@pytest.mark.asyncio
async def test_safe_execute_error_records_telemetry(self):
"""safe_execute records error telemetry on exception."""
from agentkit.tools.base import Tool
class FailingTool(Tool):
async def execute(self, **kwargs):
raise ValueError("tool failed")
tool = FailingTool(name="failing_tool", description="A failing tool")
with pytest.raises(ValueError, match="tool failed"):
await tool.safe_execute(query="hello")
# ── start_span helper tests ─────────────────────────────────────────
class TestStartSpan:
"""Test start_span helper function."""
def test_start_span_noop_without_otel(self):
"""start_span returns no-op span context manager without OTel."""
from agentkit.telemetry.tracing import _OTEL_AVAILABLE, start_span, _NoOpSpan
if _OTEL_AVAILABLE:
pytest.skip("OTel is installed, skipping no-op test")
cm = start_span("test.span", attributes={"key": "value"})
assert isinstance(cm, _NoOpSpan)
# Should work as context manager
with cm:
pass # No error
def test_start_span_with_attributes(self):
"""start_span accepts attributes parameter without error."""
from agentkit.telemetry.tracing import start_span
cm = start_span("test.span", attributes={"key": "value", "count": 42})
with cm:
pass # No error regardless of OTel availability