fischer-agentkit/src/agentkit/core/trace.py

189 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""执行轨迹记录器
在 ReActEngine 执行过程中记录完整的执行轨迹每步动作、输入输出、耗时、Token 用量),
为反思和可观测性提供数据。
"""
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable
@dataclass
class TraceStep:
"""单步执行轨迹"""
step: int
action: str # "tool_call" | "llm_call" | "final_answer"
tool_name: str | None = None
input_data: dict | None = None
output_data: Any = None
duration_ms: int = 0
tokens_used: int = 0
error: str | None = None
def to_dict(self) -> dict:
d = {
"step": self.step,
"action": self.action,
"duration_ms": self.duration_ms,
"tokens_used": self.tokens_used,
}
if self.tool_name is not None:
d["tool_name"] = self.tool_name
if self.input_data is not None:
d["input_data"] = self.input_data
if self.output_data is not None:
d["output_data"] = self.output_data
if self.error is not None:
d["error"] = self.error
return d
@dataclass
class ExecutionTrace:
"""完整执行轨迹"""
task_id: str
agent_name: str
skill_name: str | None = None
steps: list[TraceStep] = field(default_factory=list)
total_duration_ms: int = 0
total_tokens: int = 0
outcome: str = "success" # "success" | "failure" | "partial"
quality_score: float = 1.0 # 0.0 - 1.0
def to_dict(self) -> dict:
return {
"task_id": self.task_id,
"agent_name": self.agent_name,
"skill_name": self.skill_name,
"steps": [s.to_dict() for s in self.steps],
"total_duration_ms": self.total_duration_ms,
"total_tokens": self.total_tokens,
"outcome": self.outcome,
"quality_score": self.quality_score,
}
class TraceRecorder:
"""执行轨迹记录器
用法:
recorder = TraceRecorder()
recorder.start_trace(task_id="t1", agent_name="agent1")
recorder.record_step(step=1, action="llm_call", ...)
recorder.record_step(step=2, action="tool_call", tool_name="search", ...)
trace = recorder.end_trace(outcome="success")
"""
def __init__(
self,
task_id: str = "",
agent_name: str = "",
skill_name: str | None = None,
on_trace_complete: Callable[[ExecutionTrace], None] | None = None,
):
self._trace: ExecutionTrace | None = None
self._completed_trace: ExecutionTrace | None = None
self._completed: bool = False
self._step_start_time: float = 0
self._trace_start_time: float = 0
self._on_trace_complete = on_trace_complete
# 如果构造时提供了参数,自动 start_trace
if task_id:
self.start_trace(task_id=task_id, agent_name=agent_name, skill_name=skill_name)
def start_trace(
self,
task_id: str = "",
agent_name: str = "",
skill_name: str | None = None,
) -> None:
"""开始记录执行轨迹"""
tid = task_id or str(uuid.uuid4())
self._trace = ExecutionTrace(
task_id=tid,
agent_name=agent_name,
skill_name=skill_name,
)
self._completed = False
self._trace_start_time = time.monotonic()
def record_step(
self,
step: int,
action: str,
tool_name: str | None = None,
input_data: dict | None = None,
output_data: Any = None,
duration_ms: int = 0,
tokens_used: int = 0,
error: str | None = None,
) -> None:
"""记录一个执行步骤"""
if self._trace is None or self._completed:
return
trace_step = TraceStep(
step=step,
action=action,
tool_name=tool_name,
input_data=input_data,
output_data=output_data,
duration_ms=duration_ms,
tokens_used=tokens_used,
error=error,
)
self._trace.steps.append(trace_step)
def end_trace(
self,
outcome: str = "success",
quality_score: float = 1.0,
) -> ExecutionTrace:
"""结束执行轨迹记录并返回 ExecutionTrace"""
if self._trace is None:
# 未 start_trace 就 end_trace返回一个空的默认轨迹
self._trace = ExecutionTrace(
task_id="unknown",
agent_name="",
)
self._trace.outcome = outcome
self._trace.quality_score = quality_score
# 计算总耗时
if self._trace_start_time > 0:
self._trace.total_duration_ms = int(
(time.monotonic() - self._trace_start_time) * 1000
)
# 计算总 token
self._trace.total_tokens = sum(s.tokens_used for s in self._trace.steps)
result = self._trace
self._completed = True
self._completed_trace = result
self._trace = None
if self._on_trace_complete is not None:
self._on_trace_complete(result)
return result
def get_trace(self) -> ExecutionTrace | None:
"""获取当前执行轨迹end_trace 后返回已完成的轨迹)"""
return self._completed_trace if self._completed else self._trace
def start_step_timer(self) -> None:
"""开始计时当前步骤"""
self._step_start_time = time.monotonic()
def elapsed_ms(self) -> int:
"""获取自 start_step_timer 以来的毫秒数"""
if self._step_start_time == 0:
return 0
return int((time.monotonic() - self._step_start_time) * 1000)