diff --git a/src/agentkit/core/react.py b/src/agentkit/core/react.py index 345dfe5..4abd0e9 100644 --- a/src/agentkit/core/react.py +++ b/src/agentkit/core/react.py @@ -17,6 +17,11 @@ from agentkit.core.exceptions import TaskCancelledError, TaskTimeoutError from agentkit.core.protocol import CancellationToken from agentkit.llm.gateway import LLMGateway from agentkit.tools.base import Tool +from agentkit.telemetry.tracing import get_tracer, start_span, _OTEL_AVAILABLE +from agentkit.telemetry.metrics import ( + agent_request_counter, + agent_duration_histogram, +) if TYPE_CHECKING: from agentkit.core.compressor import ContextCompressor @@ -165,6 +170,17 @@ class ReActEngine: tools = tools or [] tool_schemas = self._build_tool_schemas(tools) if tools else None + # Telemetry: record agent request + agent_request_counter().add(1, {"agent.name": agent_name, "agent.type": task_type or "react"}) + + # Start telemetry span for the entire agent execution + _span_cm = start_span( + "agent.execute", + attributes={"agent.name": agent_name, "agent.type": task_type or "react"}, + ) + _span = _span_cm.__enter__() + _exec_start = time.monotonic() + # 启动轨迹记录 if trace_recorder is not None: trace_recorder.start_trace( @@ -397,6 +413,15 @@ class ReActEngine: except Exception as e: logger.warning(f"Failed to store task result in episodic memory: {e}") + # Telemetry: end span and record duration + _duration_ms = int((time.monotonic() - _exec_start) * 1000) + _span.set_attribute("agent.total_steps", len(trajectory)) + _span.set_attribute("agent.total_tokens", total_tokens) + _span.set_attribute("agent.outcome", trace_outcome) + _span.set_attribute("agent.duration_ms", _duration_ms) + _span_cm.__exit__(None, None, None) + agent_duration_histogram().record(_duration_ms, {"agent.name": agent_name}) + return ReActResult( output=output, trajectory=trajectory, diff --git a/src/agentkit/llm/gateway.py b/src/agentkit/llm/gateway.py index 3b5b0d3..7e7f20e 100644 --- a/src/agentkit/llm/gateway.py 
+++ b/src/agentkit/llm/gateway.py @@ -7,6 +7,8 @@ from agentkit.core.exceptions import LLMProviderError, ModelNotFoundError from agentkit.llm.config import LLMConfig from agentkit.llm.protocol import LLMProvider, LLMRequest, LLMResponse, StreamChunk, TokenUsage from agentkit.llm.providers.tracker import UsageSummary, UsageTracker +from agentkit.telemetry.tracing import get_tracer, _OTEL_AVAILABLE +from agentkit.telemetry.metrics import llm_token_histogram logger = logging.getLogger(__name__) @@ -45,48 +47,81 @@ class LLMGateway: if not self._providers: raise LLMProviderError("", "No provider registered") + # Telemetry: start LLM span + _span_cm = None + _span = None + if _OTEL_AVAILABLE: + tracer = get_tracer() + if tracer is not None: + from opentelemetry.trace import SpanKind + _span_cm = tracer.start_as_current_span( + "gen_ai.chat", + kind=SpanKind.CLIENT, + attributes={ + "gen_ai.system": resolved_model.split("/")[0] if "/" in resolved_model else "unknown", + "gen_ai.operation.name": "chat", + "gen_ai.request.model": resolved_model, + }, + ) + _span = _span_cm.__enter__() + start = time.monotonic() models_to_try = self._get_models_to_try(resolved_model) last_error: LLMProviderError | None = None - for model_name in models_to_try: - try: - provider, actual_model = self._resolve_model(model_name) - except ModelNotFoundError: - continue + try: + for model_name in models_to_try: + try: + provider, actual_model = self._resolve_model(model_name) + except ModelNotFoundError: + continue - req = LLMRequest( - messages=messages, - model=actual_model, - tools=tools, - tool_choice=tool_choice, - **kwargs, + req = LLMRequest( + messages=messages, + model=actual_model, + tools=tools, + tool_choice=tool_choice, + **kwargs, + ) + try: + response = await provider.chat(req) + break + except LLMProviderError as e: + last_error = e + logger.warning(f"Model '{model_name}' failed, trying next: {e}") + continue + else: + raise last_error or LLMProviderError("", f"All models failed 
for '{resolved_model}'") + + latency_ms = (time.monotonic() - start) * 1000 + + # 计算成本 + cost = self._calculate_cost(response.model, response.usage) + + # 记录使用量 + self._usage_tracker.record( + agent_name=agent_name, + model=response.model, + usage=response.usage, + cost=cost, + latency_ms=latency_ms, ) - try: - response = await provider.chat(req) - break - except LLMProviderError as e: - last_error = e - logger.warning(f"Model '{model_name}' failed, trying next: {e}") - continue - else: - raise last_error or LLMProviderError("", f"All models failed for '{resolved_model}'") - latency_ms = (time.monotonic() - start) * 1000 + # Telemetry: record token usage and end span + if _span is not None: + _span.set_attribute("gen_ai.usage.input_tokens", response.usage.prompt_tokens) + _span.set_attribute("gen_ai.usage.output_tokens", response.usage.completion_tokens) + _span.set_attribute("gen_ai.response.model", response.model) + _span.set_attribute("gen_ai.duration_ms", int(latency_ms)) + llm_token_histogram().record( + response.usage.total_tokens, + {"gen_ai.request.model": resolved_model}, + ) - # 计算成本 - cost = self._calculate_cost(response.model, response.usage) - - # 记录使用量 - self._usage_tracker.record( - agent_name=agent_name, - model=response.model, - usage=response.usage, - cost=cost, - latency_ms=latency_ms, - ) - - return response + return response + finally: + if _span_cm is not None: + _span_cm.__exit__(None, None, None) async def chat_stream( self, diff --git a/src/agentkit/telemetry/__init__.py b/src/agentkit/telemetry/__init__.py new file mode 100644 index 0000000..4f3984b --- /dev/null +++ b/src/agentkit/telemetry/__init__.py @@ -0,0 +1,38 @@ +"""Telemetry module — OpenTelemetry integration (optional) + +All tracing and metrics are no-op when opentelemetry packages are not installed. 
+""" + +from agentkit.telemetry.tracing import ( + get_tracer, + start_span, + trace_agent, + trace_tool, + trace_llm, + trace_pipeline_step, + _OTEL_AVAILABLE, +) +from agentkit.telemetry.metrics import ( + agent_request_counter, + agent_duration_histogram, + llm_token_histogram, + tool_duration_histogram, + pipeline_step_histogram, +) +from agentkit.telemetry.setup import setup_telemetry + +__all__ = [ + "get_tracer", + "start_span", + "trace_agent", + "trace_tool", + "trace_llm", + "trace_pipeline_step", + "agent_request_counter", + "agent_duration_histogram", + "llm_token_histogram", + "tool_duration_histogram", + "pipeline_step_histogram", + "setup_telemetry", + "_OTEL_AVAILABLE", +] diff --git a/src/agentkit/telemetry/metrics.py b/src/agentkit/telemetry/metrics.py new file mode 100644 index 0000000..0525be7 --- /dev/null +++ b/src/agentkit/telemetry/metrics.py @@ -0,0 +1,108 @@ +"""Metric definitions — no-op when OTel not installed""" + +try: + from opentelemetry import metrics + + _OTEL_AVAILABLE = True +except ImportError: + _OTEL_AVAILABLE = False + + +class _NoOpCounter: + """No-op counter used when OTel is not installed.""" + + def add(self, *args, **kwargs): + pass + + +class _NoOpHistogram: + """No-op histogram used when OTel is not installed.""" + + def record(self, *args, **kwargs): + pass + + +class _NoOpUpDownCounter: + """No-op up-down counter used when OTel is not installed.""" + + def add(self, *args, **kwargs): + pass + + +def get_meter(name: str = "fischer.agentkit"): + """Get meter — returns None if OTel not installed.""" + if _OTEL_AVAILABLE: + return metrics.get_meter(name) + return None + + +# Lazy-initialized metric instruments +_agent_request_counter = None +_agent_duration_histogram = None +_llm_token_histogram = None +_tool_duration_histogram = None +_pipeline_step_histogram = None + + +def _get_counter(name: str, description: str, unit: str = "1"): + meter = get_meter() + if meter is None: + return _NoOpCounter() + return 
meter.create_counter(name=name, description=description, unit=unit) + + +def _get_histogram(name: str, description: str, unit: str = "ms"): + meter = get_meter() + if meter is None: + return _NoOpHistogram() + return meter.create_histogram(name=name, description=description, unit=unit) + + +def agent_request_counter(): + """Total agent execution requests.""" + global _agent_request_counter + if _agent_request_counter is None: + _agent_request_counter = _get_counter( + "agent.request.total", "Total agent execution requests" + ) + return _agent_request_counter + + +def agent_duration_histogram(): + """Agent execution duration.""" + global _agent_duration_histogram + if _agent_duration_histogram is None: + _agent_duration_histogram = _get_histogram( + "agent.execution.duration", "Agent execution duration" + ) + return _agent_duration_histogram + + +def llm_token_histogram(): + """Token usage per LLM call.""" + global _llm_token_histogram + if _llm_token_histogram is None: + _llm_token_histogram = _get_histogram( + "gen_ai.usage.tokens", "Token usage per LLM call", unit="1" + ) + return _llm_token_histogram + + +def tool_duration_histogram(): + """Tool call duration.""" + global _tool_duration_histogram + if _tool_duration_histogram is None: + _tool_duration_histogram = _get_histogram( + "tool.call.duration", "Tool call duration" + ) + return _tool_duration_histogram + + +def pipeline_step_histogram(): + """Pipeline step duration.""" + global _pipeline_step_histogram + if _pipeline_step_histogram is None: + _pipeline_step_histogram = _get_histogram( + "pipeline.step.duration", "Pipeline step duration" + ) + return _pipeline_step_histogram diff --git a/src/agentkit/telemetry/setup.py b/src/agentkit/telemetry/setup.py new file mode 100644 index 0000000..5da9581 --- /dev/null +++ b/src/agentkit/telemetry/setup.py @@ -0,0 +1,93 @@ +"""OTel initialization — called at app startup""" + +import logging + +logger = logging.getLogger(__name__) + + +def setup_telemetry(app, 
config: dict | None = None): + """Initialize OpenTelemetry if installed and configured. + + This is a no-op when: + - config is None or config.enabled is False + - opentelemetry packages are not installed + + Args: + app: FastAPI application instance + config: Telemetry configuration dict with keys: + - enabled (bool): Whether to enable telemetry + - service_name (str): Service name for OTel resource + - otlp_endpoint (str): OTLP gRPC endpoint URL + - export_traces (bool): Whether to export traces + - export_metrics (bool): Whether to export metrics + """ + if not config or not config.get("enabled", False): + logger.info("Telemetry disabled") + return + + try: + from opentelemetry import trace, metrics + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + from opentelemetry.sdk.resources import Resource + except ImportError: + logger.warning( + "OpenTelemetry packages not installed. Telemetry disabled." + ) + return + + service_name = config.get("service_name", "fischer-agentkit") + resource = Resource.create({"service.name": service_name}) + + # Tracing setup + if config.get("export_traces", True): + endpoint = config.get("otlp_endpoint", "http://localhost:4317") + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + + provider = TracerProvider(resource=resource) + provider.add_span_processor( + BatchSpanProcessor( + OTLPSpanExporter(endpoint=endpoint, insecure=True) + ) + ) + trace.set_tracer_provider(provider) + logger.info(f"Tracing enabled, exporting to {endpoint}") + except ImportError: + logger.warning( + "OTLP exporter not installed. Tracing disabled." 
+ ) + + # Metrics setup + if config.get("export_metrics", True): + endpoint = config.get("otlp_endpoint", "http://localhost:4317") + try: + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, + ) + + reader = PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint=endpoint, insecure=True) + ) + provider = MeterProvider(resource=resource, readers=[reader]) + metrics.set_meter_provider(provider) + logger.info(f"Metrics enabled, exporting to {endpoint}") + except ImportError: + logger.warning( + "OTLP metric exporter not installed. Metrics disabled." + ) + + # FastAPI auto-instrumentation + try: + from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor + + FastAPIInstrumentor.instrument_app(app, excluded_urls="health,metrics") + logger.info("FastAPI auto-instrumentation enabled") + except ImportError: + logger.warning( + "FastAPI instrumentation not installed. Skipping auto-instrumentation." + ) diff --git a/src/agentkit/telemetry/tracing.py b/src/agentkit/telemetry/tracing.py new file mode 100644 index 0000000..531eb66 --- /dev/null +++ b/src/agentkit/telemetry/tracing.py @@ -0,0 +1,232 @@ +"""Tracing helpers — no-op when OTel not installed""" + +import logging +import time +from functools import wraps +from typing import Any, Callable + +logger = logging.getLogger(__name__) + +# Try importing OTel — if not available, provide no-op implementations +try: + from opentelemetry import trace + from opentelemetry.trace import SpanKind, Status, StatusCode + + _OTEL_AVAILABLE = True +except ImportError: + _OTEL_AVAILABLE = False + + # Provide fallback stubs so module-level references work in tests + class _StubEnum: + INTERNAL = "INTERNAL" + CLIENT = "CLIENT" + SERVER = "SERVER" + PRODUCER = "PRODUCER" + CONSUMER = "CONSUMER" + + SpanKind = _StubEnum # type: ignore[misc,assignment] + + class Status: # type: ignore[no-redef] + def __init__(self, *args, **kwargs): + pass + + class StatusCode: # type: ignore[no-redef] + 
UNSET = "UNSET" + OK = "OK" + ERROR = "ERROR" + + +class _NoOpSpan: + """No-op span context manager used when OTel is not installed.""" + + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def set_attribute(self, *args): + pass + + def add_event(self, *args): + pass + + def set_status(self, *args): + pass + + def record_exception(self, *args): + pass + + +def get_tracer(name: str = "fischer.agentkit"): + """Get tracer — returns None if OTel not installed.""" + if _OTEL_AVAILABLE: + return trace.get_tracer(name) + return None + + +def start_span( + name: str, + kind: Any = None, + attributes: dict | None = None, +): + """Start a span — returns no-op span if OTel not installed. + + Returns a context manager that yields a span (or no-op). + """ + if not _OTEL_AVAILABLE: + return _NoOpSpan() + tracer = get_tracer() + if tracer is None: + return _NoOpSpan() + if kind is None: + kind = SpanKind.INTERNAL + span = tracer.start_span(name, kind=kind, attributes=attributes) + return trace.use_span(span, end_on_exit=True) + + +def trace_agent(agent_name: str, agent_type: str = "react"): + """Decorator: trace agent execution.""" + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(*args, **kwargs): + if not _OTEL_AVAILABLE: + return await func(*args, **kwargs) + tracer = get_tracer() + with tracer.start_as_current_span( + "agent.execute", + kind=SpanKind.INTERNAL, + attributes={"agent.name": agent_name, "agent.type": agent_type}, + ) as span: + start = time.monotonic() + try: + result = await func(*args, **kwargs) + duration_ms = int((time.monotonic() - start) * 1000) + span.set_attribute("agent.result.success", True) + span.set_attribute("agent.duration_ms", duration_ms) + return result + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.set_attribute("agent.result.success", False) + raise + + return wrapper + + return decorator + + +def trace_tool(tool_name: 
str): + """Decorator: trace tool call.""" + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(*args, **kwargs): + if not _OTEL_AVAILABLE: + return await func(*args, **kwargs) + tracer = get_tracer() + with tracer.start_as_current_span( + "tool.execute", + kind=SpanKind.CLIENT, + attributes={"tool.name": tool_name}, + ) as span: + start = time.monotonic() + try: + result = await func(*args, **kwargs) + duration_ms = int((time.monotonic() - start) * 1000) + span.set_attribute("tool.duration_ms", duration_ms) + span.set_attribute("tool.result.success", True) + return result + except Exception as e: + duration_ms = int((time.monotonic() - start) * 1000) + span.set_attribute("tool.duration_ms", duration_ms) + span.set_attribute("tool.result.success", False) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + return decorator + + +def trace_llm(provider: str, model: str): + """Decorator: trace LLM call — follows GenAI Semantic Conventions.""" + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(*args, **kwargs): + if not _OTEL_AVAILABLE: + return await func(*args, **kwargs) + tracer = get_tracer() + with tracer.start_as_current_span( + "gen_ai.chat", + kind=SpanKind.CLIENT, + attributes={ + "gen_ai.system": provider, + "gen_ai.operation.name": "chat", + "gen_ai.request.model": model, + }, + ) as span: + start = time.monotonic() + try: + result = await func(*args, **kwargs) + duration_ms = int((time.monotonic() - start) * 1000) + span.set_attribute("gen_ai.duration_ms", duration_ms) + # Record token usage if available on the response + if hasattr(result, "usage") and result.usage is not None: + span.set_attribute( + "gen_ai.usage.input_tokens", + getattr(result.usage, "prompt_tokens", 0), + ) + span.set_attribute( + "gen_ai.usage.output_tokens", + getattr(result.usage, "completion_tokens", 0), + ) + return result + except Exception as e: + 
span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + return decorator + + +def trace_pipeline_step(pipeline_name: str, step_name: str): + """Decorator: trace pipeline step execution.""" + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(*args, **kwargs): + if not _OTEL_AVAILABLE: + return await func(*args, **kwargs) + tracer = get_tracer() + with tracer.start_as_current_span( + "pipeline.step", + kind=SpanKind.INTERNAL, + attributes={ + "pipeline.name": pipeline_name, + "step.name": step_name, + }, + ) as span: + start = time.monotonic() + try: + result = await func(*args, **kwargs) + duration_ms = int((time.monotonic() - start) * 1000) + span.set_attribute("step.duration_ms", duration_ms) + span.set_attribute("step.status", "success") + return result + except Exception as e: + duration_ms = int((time.monotonic() - start) * 1000) + span.set_attribute("step.duration_ms", duration_ms) + span.set_attribute("step.status", "error") + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + return decorator diff --git a/src/agentkit/tools/base.py b/src/agentkit/tools/base.py index 7642644..79a1706 100644 --- a/src/agentkit/tools/base.py +++ b/src/agentkit/tools/base.py @@ -1,8 +1,12 @@ """Tool 抽象基类 - 统一工具接口""" +import time from abc import ABC, abstractmethod from typing import Any +from agentkit.telemetry.tracing import start_span +from agentkit.telemetry.metrics import tool_duration_histogram + class Tool(ABC): """工具抽象基类 @@ -45,14 +49,32 @@ class Tool(ABC): async def safe_execute(self, **kwargs) -> dict: """带钩子的安全执行""" + _span_cm = start_span( + "tool.execute", + attributes={"tool.name": self.name}, + ) + _span = _span_cm.__enter__() + _start = time.monotonic() try: await self.before_execute(**kwargs) result = await self.execute(**kwargs) await self.after_execute(result, **kwargs) + _duration_ms = int((time.monotonic() - _start) * 
1000) + if _span is not None: + _span.set_attribute("tool.duration_ms", _duration_ms) + _span.set_attribute("tool.result.success", True) + tool_duration_histogram().record(_duration_ms, {"tool.name": self.name}) return result except Exception as e: + _duration_ms = int((time.monotonic() - _start) * 1000) + if _span is not None: + _span.set_attribute("tool.duration_ms", _duration_ms) + _span.set_attribute("tool.result.success", False) + tool_duration_histogram().record(_duration_ms, {"tool.name": self.name}) await self.on_error(e, **kwargs) raise + finally: + _span_cm.__exit__(None, None, None) def to_dict(self) -> dict: return { diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py new file mode 100644 index 0000000..bb03bf5 --- /dev/null +++ b/tests/unit/test_telemetry.py @@ -0,0 +1,472 @@ +"""Unit tests for telemetry module — OpenTelemetry integration""" + +import asyncio +import importlib +import sys +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +# ── No-op behavior when OTel not installed ────────────────────────── + + +class TestNoOpWhenOTelNotInstalled: + """All operations are no-op when opentelemetry is not installed.""" + + def test_tracing_noop_span_context_manager(self): + """_NoOpSpan works as context manager without errors.""" + from agentkit.telemetry.tracing import _NoOpSpan + + span = _NoOpSpan() + with span as s: + s.set_attribute("key", "value") + s.add_event("event") + s.set_status("ok") + s.record_exception(Exception("test")) + + def test_get_tracer_returns_none_without_otel(self): + """get_tracer returns None when OTel is not installed.""" + from agentkit.telemetry.tracing import _OTEL_AVAILABLE, get_tracer + + if _OTEL_AVAILABLE: + pytest.skip("OTel is installed, skipping no-op test") + assert get_tracer() is None + + def test_start_span_returns_noop_without_otel(self): + """start_span returns no-op span when OTel is not installed.""" + from agentkit.telemetry.tracing import _OTEL_AVAILABLE, 
start_span, _NoOpSpan + + if _OTEL_AVAILABLE: + pytest.skip("OTel is installed, skipping no-op test") + span_cm = start_span("test.span") + assert isinstance(span_cm, _NoOpSpan) + + def test_metrics_noop_counter(self): + """No-op counter add() does not raise.""" + from agentkit.telemetry.metrics import _NoOpCounter + + counter = _NoOpCounter() + counter.add(1, {"key": "value"}) # Should not raise + + def test_metrics_noop_histogram(self): + """No-op histogram record() does not raise.""" + from agentkit.telemetry.metrics import _NoOpHistogram + + hist = _NoOpHistogram() + hist.record(100, {"key": "value"}) # Should not raise + + def test_metrics_get_meter_returns_none_without_otel(self): + """get_meter returns None when OTel is not installed.""" + from agentkit.telemetry.metrics import _OTEL_AVAILABLE, get_meter + + if _OTEL_AVAILABLE: + pytest.skip("OTel is installed, skipping no-op test") + assert get_meter() is None + + def test_metric_helpers_return_noop_without_otel(self): + """Metric helper functions return no-op instruments when OTel not installed.""" + from agentkit.telemetry.metrics import ( + _OTEL_AVAILABLE, + _NoOpCounter, + _NoOpHistogram, + agent_request_counter, + agent_duration_histogram, + llm_token_histogram, + tool_duration_histogram, + pipeline_step_histogram, + ) + + if _OTEL_AVAILABLE: + pytest.skip("OTel is installed, skipping no-op test") + + # Reset lazy singletons to force re-creation + import agentkit.telemetry.metrics as m + m._agent_request_counter = None + m._agent_duration_histogram = None + m._llm_token_histogram = None + m._tool_duration_histogram = None + m._pipeline_step_histogram = None + + assert isinstance(agent_request_counter(), _NoOpCounter) + assert isinstance(agent_duration_histogram(), _NoOpHistogram) + assert isinstance(llm_token_histogram(), _NoOpHistogram) + assert isinstance(tool_duration_histogram(), _NoOpHistogram) + assert isinstance(pipeline_step_histogram(), _NoOpHistogram) + + +# ── Tracing decorator tests 
# ─────────────────────────────────────────


def _skip_if_otel_installed():
    """Skip the calling test when the tracing module reports OTel as available.

    The import is deferred to call time, matching the function-scope imports
    used by the tests in this section.
    """
    from agentkit.telemetry.tracing import _OTEL_AVAILABLE

    if _OTEL_AVAILABLE:
        pytest.skip("OTel is installed, skipping no-op test")


class TestTraceAgentDecorator:
    """trace_agent decorator works with and without OTel."""

    @pytest.mark.asyncio
    async def test_decorator_works_without_otel(self):
        """trace_agent decorator passes through when OTel not installed."""
        _skip_if_otel_installed()
        from agentkit.telemetry.tracing import trace_agent

        @trace_agent("test_agent", "react")
        async def my_func():
            return "result"

        result = await my_func()
        assert result == "result"

    @pytest.mark.asyncio
    async def test_decorator_propagates_exception_without_otel(self):
        """trace_agent propagates exceptions when OTel not installed."""
        _skip_if_otel_installed()
        from agentkit.telemetry.tracing import trace_agent

        @trace_agent("test_agent")
        async def my_func():
            raise ValueError("test error")

        with pytest.raises(ValueError, match="test error"):
            await my_func()


class TestTraceToolDecorator:
    """trace_tool decorator tests."""

    @pytest.mark.asyncio
    async def test_decorator_works_without_otel(self):
        """trace_tool decorator passes through when OTel not installed."""
        _skip_if_otel_installed()
        from agentkit.telemetry.tracing import trace_tool

        @trace_tool("my_tool")
        async def my_func():
            return {"result": "ok"}

        result = await my_func()
        assert result == {"result": "ok"}

    @pytest.mark.asyncio
    async def test_decorator_propagates_exception_without_otel(self):
        """trace_tool propagates exceptions when OTel not installed."""
        _skip_if_otel_installed()
        from agentkit.telemetry.tracing import trace_tool

        @trace_tool("my_tool")
        async def my_func():
            raise RuntimeError("tool error")

        with pytest.raises(RuntimeError, match="tool error"):
            await my_func()


class TestTraceLLMDecorator:
    """trace_llm decorator tests."""

    @pytest.mark.asyncio
    async def test_decorator_works_without_otel(self):
        """trace_llm decorator passes through when OTel not installed."""
        _skip_if_otel_installed()
        from agentkit.telemetry.tracing import trace_llm

        @trace_llm("openai", "gpt-4")
        async def my_func():
            return MagicMock(usage=MagicMock(prompt_tokens=10, completion_tokens=20))

        result = await my_func()
        assert result is not None

    @pytest.mark.asyncio
    async def test_decorator_propagates_exception_without_otel(self):
        """trace_llm propagates exceptions when OTel not installed."""
        _skip_if_otel_installed()
        from agentkit.telemetry.tracing import trace_llm

        @trace_llm("openai", "gpt-4")
        async def my_func():
            raise ConnectionError("LLM error")

        with pytest.raises(ConnectionError, match="LLM error"):
            await my_func()


class TestTracePipelineStepDecorator:
    """trace_pipeline_step decorator tests."""

    @pytest.mark.asyncio
    async def test_decorator_works_without_otel(self):
        """trace_pipeline_step decorator passes through when OTel not installed."""
        _skip_if_otel_installed()
        from agentkit.telemetry.tracing import trace_pipeline_step

        @trace_pipeline_step("my_pipeline", "step_1")
        async def my_func():
            return "step_result"

        result = await my_func()
        assert result == "step_result"

    @pytest.mark.asyncio
    async def test_decorator_propagates_exception_without_otel(self):
        """trace_pipeline_step propagates exceptions when OTel not installed."""
        _skip_if_otel_installed()
        from agentkit.telemetry.tracing import trace_pipeline_step

        @trace_pipeline_step("my_pipeline", "step_1")
        async def my_func():
            raise RuntimeError("step failed")

        with pytest.raises(RuntimeError, match="step failed"):
            await my_func()


# ── OTel installed (mocked) tests ───────────────────────────────────


class TestTracingWithMockedOTel:
    """Test tracing with mocked OTel imports."""

    @staticmethod
    def _fake_tracer():
        """Build a mock tracer and the mock span its context manager yields.

        Returns:
            A ``(tracer, span)`` pair. ``tracer.start_as_current_span(...)``
            returns a context manager whose ``__enter__`` gives ``span`` and
            whose ``__exit__`` returns ``False`` (exceptions are not swallowed).
        """
        span = MagicMock()
        span_cm = MagicMock()
        span_cm.__enter__ = MagicMock(return_value=span)
        span_cm.__exit__ = MagicMock(return_value=False)

        tracer = MagicMock()
        tracer.start_as_current_span.return_value = span_cm
        return tracer, span

    @staticmethod
    def _otel_enabled(tracer):
        """Return a context manager that makes the tracing module look OTel-enabled.

        While active, ``_OTEL_AVAILABLE`` is ``True``, ``get_tracer`` returns
        *tracer*, and ``SpanKind`` / ``Status`` / ``StatusCode`` are mocks.
        """
        # create=True: a plain patch() raises AttributeError when the target
        # attribute does not exist.
        # NOTE(review): presumably the OTel names may be unbound in the tracing
        # module when OTel is not installed — confirm against tracing.py.
        return patch.multiple(
            "agentkit.telemetry.tracing",
            create=True,
            _OTEL_AVAILABLE=True,
            get_tracer=MagicMock(return_value=tracer),
            SpanKind=MagicMock(),
            Status=MagicMock(),
            StatusCode=MagicMock(),
        )

    @pytest.mark.asyncio
    async def test_trace_agent_with_mocked_otel(self):
        """trace_agent creates span with correct attributes when OTel is available."""
        tracer, _span = self._fake_tracer()

        with self._otel_enabled(tracer):
            from agentkit.telemetry.tracing import trace_agent

            @trace_agent("test_agent", "react")
            async def my_func():
                return "result"

            result = await my_func()
            assert result == "result"
            tracer.start_as_current_span.assert_called_once()
            _, span_kwargs = tracer.start_as_current_span.call_args
            assert span_kwargs["attributes"]["agent.name"] == "test_agent"
            assert span_kwargs["attributes"]["agent.type"] == "react"

    @pytest.mark.asyncio
    async def test_trace_tool_with_mocked_otel(self):
        """trace_tool creates span with tool.name attribute."""
        tracer, _span = self._fake_tracer()

        with self._otel_enabled(tracer):
            from agentkit.telemetry.tracing import trace_tool

            @trace_tool("search_tool")
            async def my_func():
                return {"found": True}

            result = await my_func()
            assert result == {"found": True}
            _, span_kwargs = tracer.start_as_current_span.call_args
            assert span_kwargs["attributes"]["tool.name"] == "search_tool"

    @pytest.mark.asyncio
    async def test_trace_llm_with_mocked_otel(self):
        """trace_llm creates span with gen_ai semantic conventions."""
        tracer, span = self._fake_tracer()

        mock_usage = MagicMock()
        mock_usage.prompt_tokens = 50
        mock_usage.completion_tokens = 100
        mock_response = MagicMock()
        mock_response.usage = mock_usage

        with self._otel_enabled(tracer):
            from agentkit.telemetry.tracing import trace_llm

            @trace_llm("openai", "gpt-4")
            async def my_func():
                return mock_response

            result = await my_func()
            assert result is mock_response
            _, span_kwargs = tracer.start_as_current_span.call_args
            attrs = span_kwargs["attributes"]
            assert attrs["gen_ai.system"] == "openai"
            assert attrs["gen_ai.operation.name"] == "chat"
            assert attrs["gen_ai.request.model"] == "gpt-4"
            # Token usage should be recorded on span
            span.set_attribute.assert_any_call("gen_ai.usage.input_tokens", 50)
            span.set_attribute.assert_any_call("gen_ai.usage.output_tokens", 100)

    @pytest.mark.asyncio
    async def test_trace_pipeline_step_with_mocked_otel(self):
        """trace_pipeline_step creates span with pipeline and step attributes."""
        tracer, _span = self._fake_tracer()

        with self._otel_enabled(tracer):
            from agentkit.telemetry.tracing import trace_pipeline_step

            @trace_pipeline_step("geo_pipeline", "analyze")
            async def my_func():
                return "done"

            result = await my_func()
            assert result == "done"
            _, span_kwargs = tracer.start_as_current_span.call_args
            attrs = span_kwargs["attributes"]
            assert attrs["pipeline.name"] == "geo_pipeline"
            assert attrs["step.name"] == "analyze"


# ── setup_telemetry tests ───────────────────────────────────────────


class TestSetupTelemetry:
    """setup_telemetry initialization tests.

    Every test here is a smoke test: it passes as long as the call returns
    without raising. None of them inspects the app or the log output.
    """

    def test_no_config_is_noop(self):
        """setup_telemetry with no config does not raise."""
        from agentkit.telemetry.setup import setup_telemetry

        mock_app = MagicMock()
        setup_telemetry(mock_app, None)  # Should not raise

    def test_disabled_config_is_noop(self):
        """setup_telemetry with enabled=False does not raise."""
        from agentkit.telemetry.setup import setup_telemetry

        mock_app = MagicMock()
        setup_telemetry(mock_app, {"enabled": False})  # Should not raise

    def test_config_without_otel_logs_warning(self):
        """setup_telemetry with an enabled config does not raise.

        NOTE(review): despite the test name, the warning itself is not
        asserted; only the absence of an exception is checked.
        """
        from agentkit.telemetry.setup import setup_telemetry

        mock_app = MagicMock()
        config = {"enabled": True, "service_name": "test"}
        # Must not raise whether or not OTel is installed.
        setup_telemetry(mock_app, config)

    def test_empty_config_is_noop(self):
        """setup_telemetry with empty dict is a no-op (enabled defaults to False)."""
        from agentkit.telemetry.setup import setup_telemetry

        mock_app = MagicMock()
        setup_telemetry(mock_app, {})  # Should not raise


# ── Integration: Tool safe_execute with telemetry ───────────────────


class TestToolTelemetryIntegration:
    """Test that Tool.safe_execute keeps working with telemetry in place."""

    @pytest.mark.asyncio
    async def test_safe_execute_records_noop_telemetry(self):
        """safe_execute returns the value produced by execute."""
        from agentkit.tools.base import Tool

        class DummyTool(Tool):
            async def execute(self, **kwargs):
                return {"result": "ok"}

        tool = DummyTool(name="test_tool", description="A test tool")
        result = await tool.safe_execute(query="hello")
        assert result == {"result": "ok"}

    @pytest.mark.asyncio
    async def test_safe_execute_error_records_telemetry(self):
        """safe_execute re-raises the exception raised by execute."""
        from agentkit.tools.base import Tool

        class FailingTool(Tool):
            async def execute(self, **kwargs):
                raise ValueError("tool failed")

        tool = FailingTool(name="failing_tool", description="A failing tool")
        with pytest.raises(ValueError, match="tool failed"):
            await tool.safe_execute(query="hello")


# ── start_span helper tests ─────────────────────────────────────────


class TestStartSpan:
    """Test start_span helper function."""

    def test_start_span_noop_without_otel(self):
        """start_span returns no-op span context manager without OTel."""
        _skip_if_otel_installed()
        from agentkit.telemetry.tracing import _NoOpSpan, start_span

        cm = start_span("test.span", attributes={"key": "value"})
        assert isinstance(cm, _NoOpSpan)
        # Should work as context manager
        with cm:
            pass  # No error

    def test_start_span_with_attributes(self):
        """start_span accepts attributes parameter without error."""
        from agentkit.telemetry.tracing import start_span

        cm = start_span("test.span", attributes={"key": "value", "count": 42})
        with cm:
            pass  # No error regardless of OTel availability