1116 lines
43 KiB
Python
1116 lines
43 KiB
Python
"""Plan-and-Execute 执行引擎适配器
|
||
|
||
将 GoalPlanner + PlanExecutor + PipelineReplanner 组合为 plan_exec 执行模式引擎,
|
||
兼容 ReActEngine 的接口(execute / execute_stream),复用 ReActResult / ReActEvent 数据结构。
|
||
|
||
三阶段流程:
|
||
1. Planner Phase: GoalPlanner 分解目标为 ExecutionPlan
|
||
2. Executor Phase: PlanExecutor 按 parallel_groups 执行 PlanStep
|
||
3. Replanner Phase: 步骤失败时,PipelineReplanner 修正计划后重试
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timezone
|
||
from typing import TYPE_CHECKING, Any, Awaitable, Callable
|
||
|
||
from agentkit.core.exceptions import LLMProviderError, TaskCancelledError, TaskTimeoutError
|
||
from agentkit.core.goal_planner import GoalPlanner
|
||
from agentkit.core.plan_executor import PlanExecutor, PlanExecutionResult
|
||
from agentkit.core.plan_schema import ExecutionPlan, PlanStep, PlanStepStatus
|
||
from agentkit.core.protocol import CancellationToken, TaskMessage, TaskResult, TaskStatus
|
||
from agentkit.core.react import ReActEvent, ReActResult, ReActStep
|
||
from agentkit.core.shared_workspace import SharedWorkspace
|
||
from agentkit.core.spec_manager import Spec, SpecManager, SpecStep
|
||
from agentkit.orchestrator.reflection import PipelineReflector, PipelineReplanner
|
||
from agentkit.orchestrator.pipeline_schema import Pipeline, PipelineResult, StageResult, StageStatus
|
||
|
||
if TYPE_CHECKING:
|
||
from agentkit.core.compressor import CompressionStrategy
|
||
from agentkit.core.trace import TraceRecorder
|
||
from agentkit.memory.retriever import MemoryRetriever
|
||
from agentkit.llm.gateway import LLMGateway
|
||
from agentkit.tools.base import Tool
|
||
|
||
logger = logging.getLogger(__name__)

# Default upper bound on how many replanning rounds follow a failed plan run.
_DEFAULT_MAX_REPLANS: int = 2
|
||
|
||
|
||
@dataclass
class _StreamState:
    """Mutable bookkeeping shared across the ``yield`` points of ``execute_stream``."""

    # Result of the most recent plan execution round (None until one has run).
    plan_result: PlanExecutionResult | None = None
    # Accumulated ReActStep records; a fresh list per instance.
    trajectory: list[ReActStep] = field(default_factory=list)
    # Token total reported in the final_answer event.
    total_tokens: int = 0
    # Monotonic counter used as the ``step`` of every emitted event.
    step_counter: int = 0
    # Whether at least one replanning round happened.
    replanned: bool = False
|
||
|
||
|
||
class PlanExecEngine:
    """Plan-and-Execute engine adapter.

    Composes GoalPlanner, PlanExecutor, PipelineReflector and PipelineReplanner
    behind the same ``execute`` / ``execute_stream`` interface as ReActEngine,
    reusing the ReActResult / ReActEvent data structures.

    Usage::

        engine = PlanExecEngine(llm_gateway=gateway)
        result = await engine.execute(
            messages=[{"role": "user", "content": "Research 3 competitors and write a report"}],
            tools=[...],
        )

    NOTE: ``confirmation_handler`` is stored on the instance for the duration of
    a call, so concurrent calls on one engine instance overwrite each other's
    handler.
    """

    # Exceptions from ``step_event_callback`` (or from building its payload)
    # that are logged and ignored so a faulty listener cannot abort execution.
    _CALLBACK_ERRORS: tuple[type[BaseException], ...] = (
        RuntimeError,
        ValueError,
        TypeError,
        KeyError,
        AttributeError,
        ConnectionError,
        asyncio.TimeoutError,
    )

    def __init__(
        self,
        llm_gateway: "LLMGateway | None" = None,
        max_replans: int = _DEFAULT_MAX_REPLANS,
        default_timeout: float = 300.0,
        workspace: SharedWorkspace | None = None,
        step_event_callback: "Callable[[str, dict[str, Any]], Awaitable[None]] | None" = None,
        spec_manager: SpecManager | None = None,
        step_max_retries: int = 1,
        step_timeout: float = 120.0,
    ):
        """
        Args:
            llm_gateway: LLM gateway shared by GoalPlanner, PipelineReflector,
                PipelineReplanner and the per-step ReAct agents.
            max_replans: Maximum number of replanning rounds after failed steps.
            default_timeout: Timeout (seconds) for ``execute`` when no
                ``timeout_seconds`` is given; a value <= 0 disables it.
            workspace: Optional SharedWorkspace; completed step results are
                written under ``plan:<plan_id>:step:<step_id>:result``.
            step_event_callback: Optional async ``(event_type, data)`` callback
                used to push progress from the non-streaming ``execute`` path.
            spec_manager: Optional SpecManager; the initial plan is persisted
                as a Spec document when provided.
            step_max_retries: ``max_retries`` passed to every PlanExecutor.
            step_timeout: ``step_timeout`` (seconds) passed to every PlanExecutor.
        """
        self._llm_gateway = llm_gateway
        self._max_replans = max_replans
        self._default_timeout = default_timeout
        self._workspace = workspace
        self._step_event_callback = step_event_callback
        self._spec_manager = spec_manager
        self._step_max_retries = step_max_retries
        self._step_timeout = step_timeout
        # Set per call by execute()/execute_stream() and handed to step agents.
        self._confirmation_handler: Any | None = None

        # Sub-components, all sharing the same LLM gateway.
        self._planner = GoalPlanner(llm_gateway=llm_gateway)
        self._reflector = PipelineReflector(llm_gateway=llm_gateway)
        self._replanner = PipelineReplanner(llm_gateway=llm_gateway)

    # ------------------------------------------------------------------
    # Public interface — same signatures as ReActEngine
    # ------------------------------------------------------------------

    async def execute(
        self,
        messages: list[dict[str, str]],
        tools: list["Tool"] | None = None,
        model: str = "default",
        agent_name: str = "",
        task_type: str = "",
        system_prompt: str | None = None,
        trace_recorder: "TraceRecorder | None" = None,
        memory_retriever: "MemoryRetriever | None" = None,
        task_id: str | None = None,
        compressor: "CompressionStrategy | None" = None,
        retrieval_config: dict[str, Any] | None = None,
        cancellation_token: CancellationToken | None = None,
        timeout_seconds: float | None = None,
        confirmation_handler: Any | None = None,
    ) -> ReActResult:
        """Run the Plan-and-Execute flow and return the aggregated result.

        1. Planner phase: generate an ExecutionPlan.
        2. Executor phase: run the plan's steps.
        3. Replanner phase: revise and re-run the plan when steps fail.

        ``compressor`` is accepted for interface compatibility but unused.

        Args:
            timeout_seconds: Overall timeout; falls back to ``default_timeout``.
                A value <= 0 disables the timeout.

        Returns:
            ReActResult whose ``status`` is "success", "partial" or "error".

        Raises:
            TaskTimeoutError: when the overall timeout elapses (any
                ``asyncio.TimeoutError`` escaping the run is also converted).
            TaskCancelledError: when ``cancellation_token`` reports cancellation.
        """
        self._confirmation_handler = confirmation_handler
        effective_timeout = timeout_seconds if timeout_seconds is not None else self._default_timeout

        run = self._execute_loop(
            messages=messages,
            tools=tools,
            model=model,
            agent_name=agent_name,
            task_type=task_type,
            system_prompt=system_prompt,
            trace_recorder=trace_recorder,
            memory_retriever=memory_retriever,
            task_id=task_id,
            compressor=compressor,
            retrieval_config=retrieval_config,
            cancellation_token=cancellation_token,
        )
        try:
            if effective_timeout > 0:
                return await asyncio.wait_for(run, timeout=effective_timeout)
            return await run
        except asyncio.TimeoutError as exc:
            raise TaskTimeoutError(
                task_id=task_id or "",
                timeout_seconds=int(effective_timeout),
            ) from exc

    async def execute_stream(
        self,
        messages: list[dict[str, str]],
        tools: list["Tool"] | None = None,
        model: str = "default",
        agent_name: str = "",
        task_type: str = "",
        system_prompt: str | None = None,
        trace_recorder: "TraceRecorder | None" = None,
        memory_retriever: "MemoryRetriever | None" = None,
        task_id: str | None = None,
        compressor: "CompressionStrategy | None" = None,
        retrieval_config: dict[str, Any] | None = None,
        cancellation_token: CancellationToken | None = None,
        timeout_seconds: float | None = None,
        confirmation_handler: Any | None = None,
    ):
        """Run the Plan-and-Execute flow, yielding ReActEvent objects.

        Event types, in emission order:
            - "planning": planning started
            - "plan_generated": plan produced (steps and parallel groups)
            - "spec_created": plan persisted as a Spec (only with spec_manager)
            - "step_executing" / "step_completed": a pair per step, emitted
              after each execution round has finished (not live)
            - "replanning": a replanning round was triggered
            - "final_answer": aggregated output and plan status

        NOTE(review): ``timeout_seconds`` / ``default_timeout`` are not enforced
        on the streaming path, and ``compressor`` is unused.
        """
        self._confirmation_handler = confirmation_handler
        system_prompt = await self._augment_system_prompt(
            messages, system_prompt, memory_retriever, retrieval_config,
        )
        self._start_trace(trace_recorder, agent_name, task_type, task_id)

        state = _StreamState()
        trace_outcome = "success"
        output = ""

        try:
            # ── Phase 1: planner ──
            state.step_counter += 1
            yield ReActEvent(
                event_type="planning",
                step=state.step_counter,
                data={"message": "Decomposing goal into execution plan..."},
            )

            if cancellation_token is not None:
                cancellation_token.check()

            plan = await self._generate_plan(messages, tools, system_prompt, task_type)

            state.step_counter += 1
            yield ReActEvent(
                event_type="plan_generated",
                step=state.step_counter,
                data={
                    "plan_id": plan.plan_id,
                    "goal": plan.goal,
                    "steps": [s.to_dict() for s in plan.steps],
                    "parallel_groups": plan.parallel_groups,
                },
            )
            state.trajectory.append(ReActStep(
                step=state.step_counter,
                action="plan_generated",
                content=f"Generated plan with {len(plan.steps)} steps",
                tokens=0,
            ))
            if trace_recorder is not None:
                trace_recorder.record_step(
                    step=state.step_counter,
                    action="plan_generated",
                    output_data={"plan_id": plan.plan_id, "num_steps": len(plan.steps)},
                )

            if self._spec_manager is not None:
                spec = self._plan_to_spec(plan)
                self._spec_manager.create(spec)
                state.step_counter += 1
                yield ReActEvent(
                    event_type="spec_created",
                    step=state.step_counter,
                    data={"spec_id": spec.spec_id, "goal": spec.goal, "num_steps": len(spec.steps)},
                )

            # ── Phase 2 & 3: execute, replanning on failure ──
            current_plan = plan
            replan_count = 0

            while True:
                if cancellation_token is not None:
                    cancellation_token.check()

                plan_result = await self._run_plan(
                    current_plan,
                    messages=messages,
                    model=model,
                    system_prompt=system_prompt,
                    tools=tools,
                    agent_name=agent_name,
                    task_type=task_type,
                    task_id=task_id,
                )

                for sid, step_result in plan_result.step_results.items():
                    step_name = self._step_name(current_plan, sid)
                    action = self._step_action(step_result)

                    state.step_counter += 1
                    yield ReActEvent(
                        event_type="step_executing",
                        step=state.step_counter,
                        data={"step_id": sid, "step_name": step_name},
                    )

                    state.step_counter += 1
                    yield ReActEvent(
                        event_type="step_completed",
                        step=state.step_counter,
                        data={
                            "step_id": sid,
                            "step_name": step_name,
                            "status": step_result.status.value,
                            "result": step_result.result,
                            "error": step_result.error,
                        },
                    )

                    state.trajectory.append(ReActStep(
                        step=state.step_counter,
                        action=action,
                        tool_name=step_name,
                        result=step_result.result,
                        tokens=0,
                    ))
                    if trace_recorder is not None:
                        trace_recorder.record_step(
                            step=state.step_counter,
                            action=action,
                            tool_name=step_name,
                            output_data=step_result.result,
                            error=step_result.error,
                        )

                if plan_result.status == TaskStatus.COMPLETED:
                    break
                if not plan_result.failed_steps or replan_count >= self._max_replans:
                    # Nothing to repair, or the replan budget is exhausted.
                    break

                replan_count += 1
                state.replanned = True
                state.step_counter += 1
                yield ReActEvent(
                    event_type="replanning",
                    step=state.step_counter,
                    data={
                        "replan_count": replan_count,
                        "failed_steps": plan_result.failed_steps,
                    },
                )

                current_plan, reflection_report = await self._replan(
                    current_plan, plan_result, agent_name, plan.goal, replan_count,
                )
                state.trajectory.append(ReActStep(
                    step=state.step_counter,
                    action="replanning",
                    content=f"Replanned (attempt {replan_count}): {reflection_report.root_cause}",
                    tokens=0,
                ))
                if trace_recorder is not None:
                    trace_recorder.record_step(
                        step=state.step_counter,
                        action="replanning",
                        output_data={
                            "replan_count": replan_count,
                            "root_cause": reflection_report.root_cause,
                            "new_plan_id": current_plan.plan_id,
                        },
                    )

            output = self._aggregate_output(plan, plan_result)
            trace_outcome = self._outcome_from_result(plan_result)

            state.step_counter += 1
            state.trajectory.append(ReActStep(
                step=state.step_counter,
                action="final_answer",
                content=output,
                tokens=0,
            ))
            yield ReActEvent(
                event_type="final_answer",
                step=state.step_counter,
                data={
                    "output": output,
                    "total_steps": len(state.trajectory),
                    "total_tokens": state.total_tokens,
                    "plan_id": plan.plan_id,
                    "plan_status": plan_result.status.value,
                    "replanned": state.replanned,
                },
            )

        except (TaskCancelledError, asyncio.CancelledError):
            trace_outcome = "cancelled"
            raise
        except Exception:
            # Previously the trace/memory recorded "success" for crashed runs.
            trace_outcome = "error"
            raise
        finally:
            if trace_recorder is not None:
                trace_recorder.end_trace(outcome=trace_outcome)
            await self._store_episode(
                memory_retriever, task_id, agent_name, task_type, output, trace_outcome,
            )

    # ------------------------------------------------------------------
    # Internal implementation
    # ------------------------------------------------------------------

    async def _execute_loop(
        self,
        messages: list[dict[str, str]],
        tools: list["Tool"] | None = None,
        model: str = "default",
        agent_name: str = "",
        task_type: str = "",
        system_prompt: str | None = None,
        trace_recorder: "TraceRecorder | None" = None,
        memory_retriever: "MemoryRetriever | None" = None,
        task_id: str | None = None,
        compressor: "CompressionStrategy | None" = None,
        retrieval_config: dict[str, Any] | None = None,
        cancellation_token: CancellationToken | None = None,
    ) -> ReActResult:
        """Core non-streaming Plan-and-Execute loop used by ``execute``."""
        system_prompt = await self._augment_system_prompt(
            messages, system_prompt, memory_retriever, retrieval_config,
        )
        self._start_trace(trace_recorder, agent_name, task_type, task_id)

        trajectory: list[ReActStep] = []
        total_tokens = 0
        trace_outcome = "success"
        output = ""

        try:
            # ── Phase 1: planner ──
            if cancellation_token is not None:
                cancellation_token.check()

            plan = await self._generate_plan(messages, tools, system_prompt, task_type)

            await self._emit_step_event("plan_generated", lambda: {
                "plan_id": plan.plan_id,
                "goal": plan.goal,
                "steps": [s.to_dict() for s in plan.steps],
            })
            trajectory.append(ReActStep(
                step=1,
                action="plan_generated",
                content=f"Generated plan with {len(plan.steps)} steps",
                tokens=0,
            ))

            if self._spec_manager is not None:
                spec = self._plan_to_spec(plan)
                self._spec_manager.create(spec)
                await self._emit_step_event("spec_created", lambda: {
                    "spec_id": spec.spec_id,
                    "goal": spec.goal,
                    "num_steps": len(spec.steps),
                })

            if trace_recorder is not None:
                trace_recorder.record_step(
                    step=1,
                    action="plan_generated",
                    output_data={"plan_id": plan.plan_id, "num_steps": len(plan.steps)},
                )

            # ── Phase 2 & 3: execute with replanning ──
            plan_result, trajectory, total_tokens = await self._execute_with_replanning(
                plan=plan,
                messages=messages,
                tools=tools,
                model=model,
                agent_name=agent_name,
                task_type=task_type,
                system_prompt=system_prompt,
                trace_recorder=trace_recorder,
                task_id=task_id,
                cancellation_token=cancellation_token,
                trajectory=trajectory,
                total_tokens=total_tokens,
            )

            output = self._aggregate_output(plan, plan_result)
            trace_outcome = self._outcome_from_result(plan_result)

            trajectory.append(ReActStep(
                step=len(trajectory) + 1,
                action="final_answer",
                content=output,
                tokens=0,
            ))

            return ReActResult(
                output=output,
                trajectory=trajectory,
                total_steps=len(trajectory),
                total_tokens=total_tokens,
                status=trace_outcome,
            )

        except (TaskCancelledError, asyncio.CancelledError):
            # asyncio.CancelledError is what wait_for() injects on timeout.
            trace_outcome = "cancelled"
            raise
        except Exception:
            trace_outcome = "error"
            raise
        finally:
            if trace_recorder is not None:
                trace_recorder.end_trace(outcome=trace_outcome)
            await self._store_episode(
                memory_retriever, task_id, agent_name, task_type, output, trace_outcome,
            )

    async def _execute_with_replanning(
        self,
        plan: ExecutionPlan,
        messages: list[dict[str, str]],
        tools: list["Tool"] | None,
        model: str,
        agent_name: str,
        task_type: str,
        system_prompt: str | None,
        trace_recorder: "TraceRecorder | None",
        task_id: str | None,
        cancellation_token: CancellationToken | None,
        trajectory: list[ReActStep],
        total_tokens: int,
    ) -> tuple[PlanExecutionResult, list[ReActStep], int]:
        """Execute ``plan``, replanning after failed steps up to ``max_replans``.

        ``trajectory`` is extended in place.

        Returns:
            (plan_result of the last round, trajectory, total_tokens)
        """
        current_plan = plan
        replan_count = 0

        while True:
            if cancellation_token is not None:
                cancellation_token.check()

            plan_result = await self._run_plan(
                current_plan,
                messages=messages,
                model=model,
                system_prompt=system_prompt,
                tools=tools,
                agent_name=agent_name,
                task_type=task_type,
                task_id=task_id,
            )

            for sid, step_result in plan_result.step_results.items():
                step_name = self._step_name(current_plan, sid)
                action = self._step_action(step_result)

                trajectory.append(ReActStep(
                    step=len(trajectory) + 1,
                    action=action,
                    tool_name=step_name,
                    result=step_result.result,
                    tokens=0,
                ))

                # The lambda is invoked immediately, so loop-variable capture is safe.
                await self._emit_step_event(action, lambda: {
                    "step_id": sid,
                    "step_name": step_name,
                    "status": step_result.status.value,
                    "result": step_result.result,
                    "error": step_result.error,
                })

                if trace_recorder is not None:
                    trace_recorder.record_step(
                        step=len(trajectory),
                        action=action,
                        tool_name=step_name,
                        output_data=step_result.result,
                        error=step_result.error,
                    )

            if plan_result.status == TaskStatus.COMPLETED:
                return plan_result, trajectory, total_tokens

            if not plan_result.failed_steps or replan_count >= self._max_replans:
                # Cannot replan (nothing failed, or budget exhausted): partial result.
                return plan_result, trajectory, total_tokens

            replan_count += 1
            current_plan, reflection_report = await self._replan(
                current_plan, plan_result, agent_name, plan.goal, replan_count,
            )

            await self._emit_step_event("replanning", lambda: {
                "replan_count": replan_count,
                "root_cause": reflection_report.root_cause,
                "new_plan_id": current_plan.plan_id,
            })

            trajectory.append(ReActStep(
                step=len(trajectory) + 1,
                action="replanning",
                content=f"Replanned (attempt {replan_count}): {reflection_report.root_cause}",
                tokens=0,
            ))

            if trace_recorder is not None:
                trace_recorder.record_step(
                    step=len(trajectory),
                    action="replanning",
                    output_data={
                        "replan_count": replan_count,
                        "root_cause": reflection_report.root_cause,
                        "new_plan_id": current_plan.plan_id,
                    },
                )

    # ------------------------------------------------------------------
    # Shared helpers for the streaming and non-streaming paths
    # ------------------------------------------------------------------

    @staticmethod
    async def _augment_system_prompt(
        messages: list[dict[str, str]],
        system_prompt: str | None,
        memory_retriever: "MemoryRetriever | None",
        retrieval_config: dict[str, Any] | None,
    ) -> str | None:
        """Append retrieved memory context (if any) to ``system_prompt``.

        The query is the content of the last message. Retrieval failures of
        the listed kinds are logged and the prompt is returned unchanged.
        """
        if not memory_retriever:
            return system_prompt
        config = retrieval_config or {}
        try:
            query = str(messages[-1].get("content", "")) if messages else ""
            memory_context = await memory_retriever.get_context_string(
                query=query,
                top_k=config.get("top_k", 5),
                token_budget=config.get("token_budget", 2000),
            )
        except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e:
            logger.warning("Memory retrieval failed, continuing without context: %s", e)
            return system_prompt

        if not memory_context:
            return system_prompt
        if system_prompt:
            return system_prompt + f"\n\n## 参考信息\n{memory_context}"
        return f"## 参考信息\n{memory_context}"

    @staticmethod
    async def _store_episode(
        memory_retriever: "MemoryRetriever | None",
        task_id: str | None,
        agent_name: str,
        task_type: str,
        output: str,
        outcome: str,
    ) -> None:
        """Best-effort storage of a run summary (first 500 chars) as an episode."""
        if not memory_retriever or not hasattr(memory_retriever, "store_episode"):
            return
        try:
            await memory_retriever.store_episode(
                key=f"task:{task_id or 'unknown'}",
                value={"output_summary": output[:500] if output else "", "agent_name": agent_name},
                metadata={"task_type": task_type, "outcome": outcome},
            )
        except (asyncio.TimeoutError, ConnectionError, ValueError) as e:
            logger.warning("Failed to store task result in episodic memory: %s", e)

    @staticmethod
    def _start_trace(
        trace_recorder: "TraceRecorder | None",
        agent_name: str,
        task_type: str,
        task_id: str | None,
    ) -> None:
        """Start a trace, carrying the caller's task id (previously always "")."""
        if trace_recorder is not None:
            trace_recorder.start_trace(
                task_id=task_id or "",
                agent_name=agent_name,
                skill_name=task_type or None,
            )

    async def _emit_step_event(
        self,
        event_type: str,
        build_data: Callable[[], dict[str, Any]],
    ) -> None:
        """Send a progress event to ``step_event_callback``, best effort.

        ``build_data`` is only called when a callback is registered; errors in
        ``_CALLBACK_ERRORS`` from building or sending the payload are logged.
        """
        if not self._step_event_callback:
            return
        try:
            await self._step_event_callback(event_type, build_data())
        except self._CALLBACK_ERRORS as e:
            logger.warning("Step event callback failed: %s", e)

    async def _generate_plan(
        self,
        messages: list[dict[str, str]],
        tools: list["Tool"] | None,
        system_prompt: str | None,
        task_type: str,
    ) -> ExecutionPlan:
        """Ask GoalPlanner to decompose the latest user goal into a plan."""
        goal = self._extract_goal(messages)
        available_skills = self._extract_skill_names(tools)
        return await self._planner.generate_plan(
            goal=goal,
            context={"system_prompt": system_prompt, "task_type": task_type},
            available_skills=available_skills,
        )

    async def _run_plan(
        self,
        plan: ExecutionPlan,
        *,
        messages: list[dict[str, str]],
        model: str,
        system_prompt: str | None,
        tools: list["Tool"] | None,
        agent_name: str,
        task_type: str,
        task_id: str | None,
    ) -> PlanExecutionResult:
        """Execute one round of ``plan`` with a fresh PlanExecutor."""
        task_msg = self._build_task_message(
            messages=messages,
            agent_name=agent_name,
            task_type=task_type,
            task_id=task_id,
        )
        executor = self._create_executor(
            messages=messages,
            model=model,
            system_prompt=system_prompt,
            tools=tools,
        )
        plan_result = await executor.execute(plan, task_msg)
        await self._write_results_to_workspace(plan, plan_result, agent_name)
        return plan_result

    async def _write_results_to_workspace(
        self,
        plan: ExecutionPlan,
        plan_result: PlanExecutionResult,
        agent_name: str,
    ) -> None:
        """Publish non-empty completed step results to the shared workspace."""
        if not self._workspace:
            return
        agent_id = agent_name or "plan_exec"
        for sid, step_result in plan_result.step_results.items():
            if step_result.status == PlanStepStatus.COMPLETED and step_result.result:
                await self._workspace.write(
                    f"plan:{plan.plan_id}:step:{sid}:result",
                    step_result.result,
                    agent_id=agent_id,
                )

    async def _replan(
        self,
        current_plan: ExecutionPlan,
        plan_result: PlanExecutionResult,
        agent_name: str,
        goal: str,
        replan_count: int,
    ) -> tuple[ExecutionPlan, Any]:
        """Reflect on a failed round and build a revised plan.

        Returns:
            (revised plan with completed/skipped results merged in, reflection report)
        """
        logger.info(
            "Plan execution has failed steps, triggering replan (attempt %d/%d)",
            replan_count,
            self._max_replans,
        )
        # The reflector/replanner work on Pipelines, so convert back and forth.
        pipeline = self._plan_to_pipeline(current_plan, agent_name)
        pipeline_result = self._plan_result_to_pipeline_result(current_plan, plan_result)
        reflection_report = await self._reflector.reflect(pipeline, pipeline_result, replan_count)
        revised_pipeline = await self._replanner.replan(pipeline, pipeline_result, reflection_report)
        new_plan = self._pipeline_to_plan(revised_pipeline, goal)
        self._merge_completed_results(new_plan, plan_result)
        return new_plan, reflection_report

    @staticmethod
    def _outcome_from_result(plan_result: PlanExecutionResult) -> str:
        """Map a plan status to the trace/ReActResult outcome string."""
        if plan_result.status == TaskStatus.FAILED:
            return "partial" if plan_result.completed_steps else "error"
        if plan_result.status == TaskStatus.PARTIALLY_COMPLETED:
            return "partial"
        return "success"

    @staticmethod
    def _step_name(plan: ExecutionPlan, step_id: str) -> str:
        """Display name of ``step_id`` in ``plan``, falling back to the id."""
        plan_step = plan.get_step(step_id)
        return plan_step.name if plan_step else step_id

    @staticmethod
    def _step_action(step_result: Any) -> str:
        """Trajectory/trace action label for a step result."""
        return "step_completed" if step_result.status == PlanStepStatus.COMPLETED else "step_failed"

    # ------------------------------------------------------------------
    # Conversion helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _plan_to_spec(plan: ExecutionPlan) -> Spec:
        """Convert an ExecutionPlan to a Spec for persistence."""
        return Spec(
            spec_id=plan.plan_id,
            goal=plan.goal,
            steps=[
                SpecStep(
                    step_id=s.step_id,
                    name=s.name,
                    description=s.description,
                    dependencies=s.dependencies,
                )
                for s in plan.steps
            ],
            metadata=plan.metadata,
        )

    @staticmethod
    def _extract_goal(messages: list[dict[str, str]]) -> str:
        """Return the content of the most recent user message ("" if none)."""
        for msg in reversed(messages):
            if msg.get("role") == "user":
                return msg.get("content") or ""
        return ""

    @staticmethod
    def _extract_skill_names(tools: list["Tool"] | None) -> list[str]:
        """Names of the available tools, offered to the planner as skills."""
        if not tools:
            return []
        return [t.name for t in tools]

    @staticmethod
    def _build_task_message(
        messages: list[dict[str, str]],
        agent_name: str,
        task_type: str,
        task_id: str | None,
    ) -> TaskMessage:
        """Build the TaskMessage handed to PlanExecutor."""
        return TaskMessage(
            task_id=task_id or "plan_exec",
            agent_name=agent_name,
            task_type=task_type,
            priority=0,
            input_data={"goal": PlanExecEngine._extract_goal(messages), "messages": messages},
            callback_url=None,
            created_at=datetime.now(timezone.utc),
        )

    def _create_executor(
        self,
        messages: list[dict[str, str]],
        model: str,
        system_prompt: str | None,
        tools: list["Tool"] | None,
    ) -> PlanExecutor:
        """Create a PlanExecutor whose steps run through ReActStepExecutor."""
        step_executor = ReActStepExecutor(
            llm_gateway=self._llm_gateway,
            messages=messages,
            model=model,
            system_prompt=system_prompt,
            tools=tools,
            confirmation_handler=self._confirmation_handler,
        )
        return PlanExecutor(
            agent_pool=step_executor,
            max_retries=self._step_max_retries,
            step_timeout=self._step_timeout,
        )

    @staticmethod
    def _plan_to_pipeline(plan: ExecutionPlan, agent_name: str) -> Pipeline:
        """Convert an ExecutionPlan to a Pipeline (input for PipelineReplanner)."""
        from agentkit.orchestrator.pipeline_schema import PipelineStage

        stages = [
            PipelineStage(
                name=step.step_id,
                agent=agent_name,
                action=step.description,
                depends_on=step.dependencies,
                inputs=step.input_data,
            )
            for step in plan.steps
        ]
        return Pipeline(
            name=f"plan_{plan.plan_id}",
            version="1.0",
            description=plan.goal,
            stages=stages,
        )

    @staticmethod
    def _plan_result_to_pipeline_result(
        plan: ExecutionPlan,
        plan_result: PlanExecutionResult,
    ) -> PipelineResult:
        """Convert a PlanExecutionResult to a PipelineResult (for the replanner)."""
        status_map = {
            PlanStepStatus.PENDING: StageStatus.PENDING,
            PlanStepStatus.RUNNING: StageStatus.RUNNING,
            PlanStepStatus.COMPLETED: StageStatus.COMPLETED,
            PlanStepStatus.FAILED: StageStatus.FAILED,
            PlanStepStatus.SKIPPED: StageStatus.SKIPPED,
        }
        stage_results = {
            sid: StageResult(
                stage_name=sid,
                status=status_map.get(sr.status, StageStatus.PENDING),
                output_data=sr.result,
                error_message=sr.error,
            )
            for sid, sr in plan_result.step_results.items()
        }

        # Partial completion counts as failure from the replanner's viewpoint.
        failed = plan_result.status in (TaskStatus.FAILED, TaskStatus.PARTIALLY_COMPLETED)
        return PipelineResult(
            pipeline_name=f"plan_{plan.plan_id}",
            status=StageStatus.FAILED if failed else StageStatus.COMPLETED,
            stage_results=stage_results,
        )

    @staticmethod
    def _pipeline_to_plan(pipeline: Pipeline, goal: str) -> ExecutionPlan:
        """Convert a revised Pipeline back into an ExecutionPlan.

        Step names equal step ids, because stages only carry a name.
        """
        steps = [
            PlanStep(
                step_id=stage.name,
                name=stage.name,
                description=stage.action,
                dependencies=stage.depends_on,
                input_data=stage.inputs,
                required_skills=[],
            )
            for stage in pipeline.stages
        ]
        plan = ExecutionPlan(goal=goal, steps=steps)
        # Rebuild parallel groups; relies on GoalPlanner's private helper.
        plan.parallel_groups = GoalPlanner()._build_parallel_groups(steps)
        return plan

    @staticmethod
    def _merge_completed_results(
        plan: ExecutionPlan,
        plan_result: PlanExecutionResult,
    ) -> None:
        """Carry completed/skipped statuses into the new plan to avoid re-running them."""
        for step in plan.steps:
            sr = plan_result.step_results.get(step.step_id)
            if sr is None:
                continue
            if sr.status == PlanStepStatus.COMPLETED:
                step.status = PlanStepStatus.COMPLETED
                step.result = sr.result
            elif sr.status == PlanStepStatus.SKIPPED:
                step.status = PlanStepStatus.SKIPPED

    @staticmethod
    def _aggregate_output(plan: ExecutionPlan, plan_result: PlanExecutionResult) -> str:
        """Format the completed step results as the final output text.

        Steps are listed in ``plan`` order first. Completed results for step ids
        that ``plan`` does not contain (e.g. steps introduced by replanning when
        the original plan is passed) are appended afterwards under their id
        instead of being dropped.
        """
        completed: list[tuple[str, Any]] = []
        known_ids: set[str] = set()
        for step in plan.steps:
            known_ids.add(step.step_id)
            sr = plan_result.step_results.get(step.step_id)
            if sr and sr.status == PlanStepStatus.COMPLETED and sr.result:
                completed.append((step.name, sr.result))
        for sid, sr in plan_result.step_results.items():
            if sid not in known_ids and sr.status == PlanStepStatus.COMPLETED and sr.result:
                completed.append((sid, sr.result))

        if not completed:
            failed_info = []
            for sid in plan_result.failed_steps:
                sr = plan_result.step_results.get(sid)
                name = PlanExecEngine._step_name(plan, sid)
                failed_info.append(f"- {name}: {sr.error if sr else 'unknown error'}")
            if failed_info:
                return "Plan execution failed.\nFailed steps:\n" + "\n".join(failed_info)
            return "Plan execution completed with no output."

        parts = []
        for name, result in completed:
            result_str = json.dumps(result, ensure_ascii=False) if isinstance(result, dict) else str(result)
            parts.append(f"**{name}**: {result_str}")
        return "\n\n".join(parts)
|
||
|
||
|
||
class ReActStepExecutor:
    """Agent-pool stand-in for PlanExecutor that runs each PlanStep via ReAct.

    Every agent it hands out is a ``_ReActStepAgent``. Each agent shares this
    executor's gateway, conversation messages, model, system prompt, tools,
    step budget and confirmation handler. Agents are cached by name.
    """

    def __init__(
        self,
        llm_gateway: "LLMGateway | None" = None,
        messages: list[dict[str, str]] | None = None,
        model: str = "default",
        system_prompt: str | None = None,
        tools: list["Tool"] | None = None,
        max_steps: int = 5,
        confirmation_handler: Any | None = None,
    ):
        self._llm_gateway = llm_gateway
        self._messages = messages or []
        self._model = model
        self._system_prompt = system_prompt
        self._tools = tools or []
        self._max_steps = max_steps
        self._confirmation_handler = confirmation_handler
        self._agents: dict[str, _ReActStepAgent] = {}

    def _new_agent(self, name: str) -> "_ReActStepAgent":
        """Build, cache and return a step agent with this executor's settings."""
        agent = _ReActStepAgent(
            name=name,
            llm_gateway=self._llm_gateway,
            messages=self._messages,
            model=self._model,
            system_prompt=self._system_prompt,
            tools=self._tools,
            max_steps=self._max_steps,
            confirmation_handler=self._confirmation_handler,
        )
        self._agents[name] = agent
        return agent

    async def create_agent_from_skill(self, skill_name: str) -> "_ReActStepAgent":
        """Create the agent for ``skill_name``, replacing any cached one."""
        return self._new_agent(skill_name)

    def get_agent(self, key: str) -> "_ReActStepAgent":
        """Return the cached agent for ``key``, creating it on first use.

        Lazily created agents now receive the confirmation handler as well.
        Previously they were built without it, which bypassed confirmation.
        """
        if key in self._agents:
            return self._agents[key]
        return self._new_agent(key)
|
||
|
||
|
||
class _ReActStepAgent:
    """Step agent that executes one PlanStep with a nested ReActEngine loop.

    It reads the step's name, description and dependency results from
    ``task_msg.input_data`` and turns them into a user prompt. That prompt is
    appended to the shared conversation, which ReActEngine then runs with the
    configured tools (think-act-observe loop).
    """

    def __init__(
        self,
        name: str,
        llm_gateway: "LLMGateway | None" = None,
        messages: list[dict[str, str]] | None = None,
        model: str = "default",
        system_prompt: str | None = None,
        tools: list["Tool"] | None = None,
        max_steps: int = 5,
        confirmation_handler: Any | None = None,
    ):
        self.name = name
        self._llm_gateway = llm_gateway
        self._messages = messages or []
        self._model = model
        self._system_prompt = system_prompt
        self._tools = tools or []
        self._max_steps = max_steps
        self._confirmation_handler = confirmation_handler

    async def execute(self, task_msg: TaskMessage) -> "TaskResult":
        """Run the step through ReActEngine and wrap the outcome as a TaskResult.

        Args:
            task_msg: Message whose ``input_data`` may carry ``step_name``,
                ``step_description`` and ``dependency_results``.

        Returns:
            TaskResult whose ``output_data`` contains the "content", "steps"
            and "tokens" keys. ``status`` is FAILED when the ReAct run ended
            with "timeout" or "cancelled", otherwise COMPLETED.
            ``error_message`` is the ReAct status whenever it is not "success".
            ``started_at`` / ``completed_at`` bracket the ReAct run.

        Raises:
            RuntimeError: if no LLM gateway is configured.
        """
        if self._llm_gateway is None:
            raise RuntimeError(f"No LLM gateway available for step '{task_msg.task_id}'")

        from agentkit.core.react import ReActEngine

        # Captured before running; previously both timestamps were taken
        # after the run, so every step reported zero duration.
        started_at = datetime.now(timezone.utc)

        input_data = task_msg.input_data or {}
        step_name = input_data.get("step_name", task_msg.task_id)
        step_description = input_data.get("step_description", "")
        dep_results = input_data.get("dependency_results", {})

        prompt_parts = [f"Execute the following task step:\n\nStep: {step_name}\nDescription: {step_description}"]
        if dep_results:
            prompt_parts.append(
                f"\nResults from previous steps:\n{json.dumps(dep_results, ensure_ascii=False, indent=2)}"
            )
        prompt_parts.append("\nProvide a clear, structured result for this step.")

        engine = ReActEngine(
            llm_gateway=self._llm_gateway,
            max_steps=self._max_steps,
        )

        # Copy the shared conversation so the step prompt is not appended to it.
        step_messages: list[dict[str, str]] = [
            *self._messages,
            {"role": "user", "content": "\n".join(prompt_parts)},
        ]

        react_result = await engine.execute(
            messages=step_messages,
            tools=self._tools if self._tools else None,
            model=self._model,
            agent_name=self.name,
            system_prompt=self._system_prompt,
            confirmation_handler=self._confirmation_handler,
        )
        completed_at = datetime.now(timezone.utc)

        # NOTE(review): any other non-"success" ReAct status (e.g. "error", if
        # ReActEngine produces it) still maps to COMPLETED — confirm intended.
        if react_result.status in ("timeout", "cancelled"):
            status = TaskStatus.FAILED.value
        else:
            status = TaskStatus.COMPLETED.value

        return TaskResult(
            task_id=task_msg.task_id,
            agent_name=self.name,
            status=status,
            output_data={
                "content": react_result.output,
                "steps": react_result.total_steps,
                "tokens": react_result.total_tokens,
            },
            error_message=None if react_result.status == "success" else react_result.status,
            started_at=started_at,
            completed_at=completed_at,
        )
|