fischer-agentkit/src/agentkit/core/plan_exec_engine.py

1201 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Plan-and-Execute 执行引擎适配器
将 GoalPlanner + PlanExecutor + PipelineReplanner 组合为 plan_exec 执行模式引擎,
兼容 ReActEngine 的接口 (execute / execute_stream), 复用 ReActResult / ReActEvent 数据结构。
三阶段流程:
1. Planner Phase: GoalPlanner 分解目标为 ExecutionPlan
2. Executor Phase: PlanExecutor 按 parallel_groups 执行 PlanStep
3. Replanner Phase: 步骤失败时, PipelineReplanner 修正计划后重试
"""
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Awaitable, Callable
from agentkit.core.exceptions import TaskCancelledError, TaskTimeoutError
from agentkit.core.goal_planner import GoalPlanner
from agentkit.core.plan_executor import PlanExecutor, PlanExecutionResult
from agentkit.core.plan_schema import ExecutionPlan, PlanStep, PlanStepStatus
from agentkit.core.protocol import CancellationToken, TaskMessage, TaskResult, TaskStatus
from agentkit.core.react import ReActEvent, ReActResult, ReActStep
from agentkit.core.shared_workspace import SharedWorkspace
from agentkit.orchestrator.reflection import PipelineReflector, PipelineReplanner
from agentkit.orchestrator.pipeline_schema import Pipeline, PipelineResult, StageResult, StageStatus
if TYPE_CHECKING:
from agentkit.core.compressor import CompressionStrategy
from agentkit.core.trace import TraceRecorder
from agentkit.memory.retriever import MemoryRetriever
from agentkit.llm.gateway import LLMGateway
from agentkit.tools.base import Tool
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(name=__name__)

# Default cap on reflect/replan rounds after a plan execution with failed
# steps (default value of PlanExecEngine's ``max_replans`` argument).
_DEFAULT_MAX_REPLANS: int = 2
@dataclass
class _StreamState:
    """Mutable bookkeeping carried across the ``yield`` points of ``execute_stream``."""

    # Latest plan execution result, if recorded; None by default.
    plan_result: PlanExecutionResult | None = field(default=None)
    # ReActStep records accumulated while streaming.
    trajectory: list[ReActStep] = field(default_factory=list)
    # Running token count reported with the final answer.
    total_tokens: int = field(default=0)
    # Counter used as the ``step`` number of each emitted event.
    step_counter: int = field(default=0)
    # Becomes True once at least one replanning round has happened.
    replanned: bool = field(default=False)
class PlanExecEngine:
    """Plan-and-Execute engine adapter.

    Combines GoalPlanner, PlanExecutor, PipelineReflector and PipelineReplanner
    behind ``execute`` / ``execute_stream`` methods whose signatures match
    ReActEngine, reusing the ReActResult / ReActEvent data structures.

    Usage::

        engine = PlanExecEngine(llm_gateway=gateway)
        result = await engine.execute(
            messages=[{"role": "user", "content": "Research 3 competitors and write a report"}],
            tools=[...],
        )
    """

    def __init__(
        self,
        llm_gateway: "LLMGateway | None" = None,
        max_replans: int = _DEFAULT_MAX_REPLANS,
        default_timeout: float = 300.0,
        workspace: SharedWorkspace | None = None,
        step_event_callback: "Callable[[str, dict[str, Any]], Awaitable[None]] | None" = None,
    ):
        """Create the engine and its planner / reflector / replanner components.

        Args:
            llm_gateway: LLM gateway passed to GoalPlanner, PipelineReflector,
                PipelineReplanner and the per-step executors.
            max_replans: Maximum number of reflect/replan rounds after a plan
                execution with failed steps.
            default_timeout: Timeout in seconds used by ``execute`` when the
                caller passes no ``timeout_seconds``; values <= 0 disable it.
            workspace: Optional SharedWorkspace. Completed, non-empty step
                results are written to it under
                ``plan:<plan_id>:step:<step_id>:result``.
            step_event_callback: Optional async callback ``(event_type, payload)``
                used by the non-streaming path to report progress
                ("plan_generated", "step_completed"/"step_failed",
                "replanning"). Exceptions it raises are logged and ignored.
        """
        self._llm_gateway = llm_gateway
        self._max_replans = max_replans
        self._default_timeout = default_timeout
        self._workspace = workspace
        self._step_event_callback = step_event_callback
        # Set per call by execute()/execute_stream(); read by _create_executor().
        self._confirmation_handler: Any | None = None
        # Sub-components all share the same LLM gateway.
        self._planner = GoalPlanner(llm_gateway=llm_gateway)
        self._reflector = PipelineReflector(llm_gateway=llm_gateway)
        self._replanner = PipelineReplanner(llm_gateway=llm_gateway)

    # ------------------------------------------------------------------
    # Public interface — same signatures as ReActEngine
    # ------------------------------------------------------------------

    async def execute(
        self,
        messages: list[dict[str, str]],
        tools: list["Tool"] | None = None,
        model: str = "default",
        agent_name: str = "",
        task_type: str = "",
        system_prompt: str | None = None,
        trace_recorder: "TraceRecorder | None" = None,
        memory_retriever: "MemoryRetriever | None" = None,
        task_id: str | None = None,
        compressor: "CompressionStrategy | None" = None,
        retrieval_config: dict[str, Any] | None = None,
        cancellation_token: CancellationToken | None = None,
        timeout_seconds: float | None = None,
        confirmation_handler: Any | None = None,
    ) -> ReActResult:
        """Run the Plan-and-Execute flow and return the aggregated result.

        1. Planner phase: generate an ExecutionPlan.
        2. Executor phase: execute the plan steps.
        3. Replanner phase: when steps fail, reflect and replan
           (at most ``max_replans`` times).

        Args:
            timeout_seconds: Overall timeout; falls back to ``default_timeout``
                when None. A value <= 0 disables the timeout.
            confirmation_handler: Forwarded to the per-step ReAct executors.
                NOTE: it is stored on the engine instance, so concurrent calls
                on the same engine share whichever handler was set last.
            compressor: Accepted for ReActEngine compatibility; not used.
            Remaining arguments mirror ReActEngine.execute.

        Returns:
            ReActResult whose ``status`` is "success", "partial" or "error".

        Raises:
            TaskTimeoutError: If the timeout elapses.
            TaskCancelledError: Propagated from ``cancellation_token.check()``.
        """
        self._confirmation_handler = confirmation_handler
        effective_timeout = (
            timeout_seconds if timeout_seconds is not None else self._default_timeout
        )
        pending = self._execute_loop(
            messages=messages,
            tools=tools,
            model=model,
            agent_name=agent_name,
            task_type=task_type,
            system_prompt=system_prompt,
            trace_recorder=trace_recorder,
            memory_retriever=memory_retriever,
            task_id=task_id,
            compressor=compressor,
            retrieval_config=retrieval_config,
            cancellation_token=cancellation_token,
        )
        try:
            if effective_timeout > 0:
                return await asyncio.wait_for(pending, timeout=effective_timeout)
            return await pending
        except asyncio.TimeoutError as exc:
            raise TaskTimeoutError(
                task_id=task_id or "",
                timeout_seconds=int(effective_timeout),
            ) from exc

    async def execute_stream(
        self,
        messages: list[dict[str, str]],
        tools: list["Tool"] | None = None,
        model: str = "default",
        agent_name: str = "",
        task_type: str = "",
        system_prompt: str | None = None,
        trace_recorder: "TraceRecorder | None" = None,
        memory_retriever: "MemoryRetriever | None" = None,
        task_id: str | None = None,
        compressor: "CompressionStrategy | None" = None,
        retrieval_config: dict[str, Any] | None = None,
        cancellation_token: CancellationToken | None = None,
        timeout_seconds: float | None = None,
        confirmation_handler: Any | None = None,
    ):
        """Run the Plan-and-Execute flow, yielding ReActEvent objects as it goes.

        Event types:
            - "planning": planning started
            - "plan_generated": the plan is ready
            - "step_executing": a step is being reported. It is emitted after the
              whole execution round has finished, immediately before the
              matching "step_completed" event.
            - "step_completed": a step finished; ``data["status"]`` carries its
              status, including failures
            - "replanning": a replanning round was triggered
            - "final_answer": the final aggregated result

        NOTE: ``timeout_seconds`` and ``compressor`` are accepted for interface
        compatibility but are not enforced/used in streaming mode.
        """
        self._confirmation_handler = confirmation_handler
        system_prompt = await self._inject_memory_context(
            messages, system_prompt, memory_retriever, retrieval_config,
        )
        if trace_recorder is not None:
            trace_recorder.start_trace(
                task_id=task_id or "",
                agent_name=agent_name,
                skill_name=task_type or None,
            )

        state = _StreamState()
        trace_outcome = "success"
        output = ""
        try:
            if cancellation_token is not None:
                cancellation_token.check()

            # ── Phase 1: Planner ──
            state.step_counter += 1
            yield ReActEvent(
                event_type="planning",
                step=state.step_counter,
                data={"message": "Decomposing goal into execution plan..."},
            )
            plan = await self._planner.generate_plan(
                goal=self._extract_goal(messages),
                context={"system_prompt": system_prompt, "task_type": task_type},
                available_skills=self._extract_skill_names(tools),
            )
            state.step_counter += 1
            yield ReActEvent(
                event_type="plan_generated",
                step=state.step_counter,
                data={
                    "plan_id": plan.plan_id,
                    "goal": plan.goal,
                    "steps": [s.to_dict() for s in plan.steps],
                    "parallel_groups": plan.parallel_groups,
                },
            )
            state.trajectory.append(ReActStep(
                step=state.step_counter,
                action="plan_generated",
                content=f"Generated plan with {len(plan.steps)} steps",
                tokens=0,
            ))
            if trace_recorder is not None:
                trace_recorder.record_step(
                    step=state.step_counter,
                    action="plan_generated",
                    output_data={"plan_id": plan.plan_id, "num_steps": len(plan.steps)},
                )

            # ── Phase 2 & 3: Execute with optional replanning ──
            current_plan = plan
            replan_count = 0
            while True:
                if cancellation_token is not None:
                    cancellation_token.check()
                plan_result = await self._run_plan(
                    current_plan,
                    messages=messages,
                    tools=tools,
                    model=model,
                    agent_name=agent_name,
                    task_type=task_type,
                    task_id=task_id,
                    system_prompt=system_prompt,
                )
                state.plan_result = plan_result

                for sid, step_result in plan_result.step_results.items():
                    plan_step = current_plan.get_step(sid)
                    step_name = plan_step.name if plan_step else sid
                    action = self._step_action(step_result)
                    state.step_counter += 1
                    yield ReActEvent(
                        event_type="step_executing",
                        step=state.step_counter,
                        data={"step_id": sid, "step_name": step_name},
                    )
                    state.step_counter += 1
                    yield ReActEvent(
                        event_type="step_completed",
                        step=state.step_counter,
                        data={
                            "step_id": sid,
                            "step_name": step_name,
                            "status": step_result.status.value,
                            "result": step_result.result,
                            "error": step_result.error,
                        },
                    )
                    state.trajectory.append(ReActStep(
                        step=state.step_counter,
                        action=action,
                        tool_name=step_name,
                        result=step_result.result,
                        tokens=0,
                    ))
                    if trace_recorder is not None:
                        trace_recorder.record_step(
                            step=state.step_counter,
                            action=action,
                            tool_name=step_name,
                            output_data=step_result.result,
                            error=step_result.error,
                        )

                if plan_result.status == TaskStatus.COMPLETED:
                    break
                if not plan_result.failed_steps or replan_count >= self._max_replans:
                    # Nothing to repair, or the replan budget is exhausted.
                    break

                replan_count += 1
                state.replanned = True
                logger.info(
                    "Plan execution has failed steps, triggering replan (attempt %s/%s)",
                    replan_count,
                    self._max_replans,
                )
                state.step_counter += 1
                yield ReActEvent(
                    event_type="replanning",
                    step=state.step_counter,
                    data={
                        "replan_count": replan_count,
                        "failed_steps": plan_result.failed_steps,
                    },
                )
                current_plan, reflection_report = await self._reflect_and_replan(
                    current_plan,
                    plan_result,
                    goal=plan.goal,
                    agent_name=agent_name,
                    replan_count=replan_count,
                )
                state.trajectory.append(ReActStep(
                    step=state.step_counter,
                    action="replanning",
                    content=f"Replanned (attempt {replan_count}): {reflection_report.root_cause}",
                    tokens=0,
                ))
                if trace_recorder is not None:
                    trace_recorder.record_step(
                        step=state.step_counter,
                        action="replanning",
                        output_data={
                            "replan_count": replan_count,
                            "root_cause": reflection_report.root_cause,
                            "new_plan_id": current_plan.plan_id,
                        },
                    )

            output = self._aggregate_output(plan, plan_result)
            trace_outcome = self._outcome_for(plan_result)

            state.step_counter += 1
            state.trajectory.append(ReActStep(
                step=state.step_counter,
                action="final_answer",
                content=output,
                tokens=0,
            ))
            yield ReActEvent(
                event_type="final_answer",
                step=state.step_counter,
                data={
                    "output": output,
                    "total_steps": len(state.trajectory),
                    "total_tokens": state.total_tokens,
                    "plan_id": plan.plan_id,
                    "plan_status": plan_result.status.value,
                    "replanned": state.replanned,
                },
            )
        except (TaskCancelledError, asyncio.CancelledError):
            trace_outcome = "cancelled"
            raise
        except Exception:
            # Without this, an aborted run would be traced as "success".
            trace_outcome = "error"
            raise
        finally:
            if trace_recorder is not None:
                trace_recorder.end_trace(outcome=trace_outcome)
            await self._store_episode(
                memory_retriever,
                task_id=task_id,
                agent_name=agent_name,
                task_type=task_type,
                output=output,
                outcome=trace_outcome,
            )

    # ------------------------------------------------------------------
    # Internal implementation
    # ------------------------------------------------------------------

    async def _execute_loop(
        self,
        messages: list[dict[str, str]],
        tools: list["Tool"] | None = None,
        model: str = "default",
        agent_name: str = "",
        task_type: str = "",
        system_prompt: str | None = None,
        trace_recorder: "TraceRecorder | None" = None,
        memory_retriever: "MemoryRetriever | None" = None,
        task_id: str | None = None,
        compressor: "CompressionStrategy | None" = None,
        retrieval_config: dict[str, Any] | None = None,
        cancellation_token: CancellationToken | None = None,
    ) -> ReActResult:
        """Non-streaming Plan-and-Execute core loop (wrapped by ``execute``).

        Progress is reported through ``step_event_callback`` instead of yielded
        events. ``compressor`` is accepted for compatibility but not used.
        """
        system_prompt = await self._inject_memory_context(
            messages, system_prompt, memory_retriever, retrieval_config,
        )
        if trace_recorder is not None:
            trace_recorder.start_trace(
                task_id=task_id or "",
                agent_name=agent_name,
                skill_name=task_type or None,
            )

        trajectory: list[ReActStep] = []
        total_tokens = 0
        trace_outcome = "success"
        try:
            # ── Phase 1: Planner ──
            if cancellation_token is not None:
                cancellation_token.check()
            plan = await self._planner.generate_plan(
                goal=self._extract_goal(messages),
                context={"system_prompt": system_prompt, "task_type": task_type},
                available_skills=self._extract_skill_names(tools),
            )
            await self._emit_step_event("plan_generated", {
                "plan_id": plan.plan_id,
                "goal": plan.goal,
                "steps": [s.to_dict() for s in plan.steps],
            })
            trajectory.append(ReActStep(
                step=1,
                action="plan_generated",
                content=f"Generated plan with {len(plan.steps)} steps",
                tokens=0,
            ))
            if trace_recorder is not None:
                trace_recorder.record_step(
                    step=1,
                    action="plan_generated",
                    output_data={"plan_id": plan.plan_id, "num_steps": len(plan.steps)},
                )

            # ── Phase 2 & 3: Execute with replanning ──
            plan_result, trajectory, total_tokens = await self._execute_with_replanning(
                plan=plan,
                messages=messages,
                tools=tools,
                model=model,
                agent_name=agent_name,
                task_type=task_type,
                system_prompt=system_prompt,
                trace_recorder=trace_recorder,
                task_id=task_id,
                cancellation_token=cancellation_token,
                trajectory=trajectory,
                total_tokens=total_tokens,
            )

            output = self._aggregate_output(plan, plan_result)
            trace_outcome = self._outcome_for(plan_result)
            trajectory.append(ReActStep(
                step=len(trajectory) + 1,
                action="final_answer",
                content=output,
                tokens=0,
            ))
            return ReActResult(
                output=output,
                trajectory=trajectory,
                total_steps=len(trajectory),
                total_tokens=total_tokens,
                status=trace_outcome,
            )
        except (TaskCancelledError, asyncio.CancelledError):
            # asyncio.CancelledError also covers execute()'s wait_for timeout.
            trace_outcome = "cancelled"
            raise
        except Exception:
            # Without this, an aborted run would be traced as "success".
            trace_outcome = "error"
            raise
        finally:
            if trace_recorder is not None:
                trace_recorder.end_trace(outcome=trace_outcome)
            await self._store_episode(
                memory_retriever,
                task_id=task_id,
                agent_name=agent_name,
                task_type=task_type,
                output=trajectory[-1].content if trajectory else "",
                outcome=trace_outcome,
            )

    async def _execute_with_replanning(
        self,
        plan: ExecutionPlan,
        messages: list[dict[str, str]],
        tools: list["Tool"] | None,
        model: str,
        agent_name: str,
        task_type: str,
        system_prompt: str | None,
        trace_recorder: "TraceRecorder | None",
        task_id: str | None,
        cancellation_token: CancellationToken | None,
        trajectory: list[ReActStep],
        total_tokens: int,
    ) -> tuple[PlanExecutionResult, list[ReActStep], int]:
        """Execute ``plan``; while steps fail, reflect, replan and retry.

        ``trajectory`` is extended in place with one entry per reported step and
        per replanning round.

        Returns:
            ``(plan_result, trajectory, total_tokens)``. ``plan_result`` comes
            from the last execution round, which may be of a revised plan.
            ``total_tokens`` is returned unchanged.
        """
        current_plan = plan
        replan_count = 0
        while True:
            if cancellation_token is not None:
                cancellation_token.check()
            plan_result = await self._run_plan(
                current_plan,
                messages=messages,
                tools=tools,
                model=model,
                agent_name=agent_name,
                task_type=task_type,
                task_id=task_id,
                system_prompt=system_prompt,
            )

            for sid, step_result in plan_result.step_results.items():
                plan_step = current_plan.get_step(sid)
                step_name = plan_step.name if plan_step else sid
                action = self._step_action(step_result)
                trajectory.append(ReActStep(
                    step=len(trajectory) + 1,
                    action=action,
                    tool_name=step_name,
                    result=step_result.result,
                    tokens=0,
                ))
                await self._emit_step_event(action, {
                    "step_id": sid,
                    "step_name": step_name,
                    "status": step_result.status.value,
                    "result": step_result.result,
                    "error": step_result.error,
                })
                if trace_recorder is not None:
                    trace_recorder.record_step(
                        step=len(trajectory),
                        action=action,
                        tool_name=step_name,
                        output_data=step_result.result,
                        error=step_result.error,
                    )

            if plan_result.status == TaskStatus.COMPLETED:
                return plan_result, trajectory, total_tokens
            if not plan_result.failed_steps or replan_count >= self._max_replans:
                # Cannot (or may no longer) replan: return the partial result.
                return plan_result, trajectory, total_tokens

            replan_count += 1
            logger.info(
                "Plan execution has failed steps, triggering replan (attempt %s/%s)",
                replan_count,
                self._max_replans,
            )
            current_plan, reflection_report = await self._reflect_and_replan(
                current_plan,
                plan_result,
                goal=plan.goal,
                agent_name=agent_name,
                replan_count=replan_count,
            )
            await self._emit_step_event("replanning", {
                "replan_count": replan_count,
                "root_cause": reflection_report.root_cause,
                "new_plan_id": current_plan.plan_id,
            })
            trajectory.append(ReActStep(
                step=len(trajectory) + 1,
                action="replanning",
                content=f"Replanned (attempt {replan_count}): {reflection_report.root_cause}",
                tokens=0,
            ))
            if trace_recorder is not None:
                trace_recorder.record_step(
                    step=len(trajectory),
                    action="replanning",
                    output_data={
                        "replan_count": replan_count,
                        "root_cause": reflection_report.root_cause,
                        "new_plan_id": current_plan.plan_id,
                    },
                )

    # ------------------------------------------------------------------
    # Shared helpers for the streaming and non-streaming paths
    # ------------------------------------------------------------------

    @staticmethod
    async def _inject_memory_context(
        messages: list[dict[str, str]],
        system_prompt: str | None,
        memory_retriever: "MemoryRetriever | None",
        retrieval_config: dict[str, Any] | None,
    ) -> str | None:
        """Return ``system_prompt`` extended with retrieved memory context.

        The query is the content of the last message. Retrieval is best effort:
        failures are logged and the prompt is returned unchanged.
        """
        if not memory_retriever:
            return system_prompt
        config = retrieval_config or {}
        try:
            query = str(messages[-1].get("content", "")) if messages else ""
            memory_context = await memory_retriever.get_context_string(
                query=query,
                top_k=config.get("top_k", 5),
                token_budget=config.get("token_budget", 2000),
            )
        except Exception as e:
            logger.warning("Memory retrieval failed, continuing without context: %s", e)
            return system_prompt
        if not memory_context:
            return system_prompt
        if system_prompt:
            return system_prompt + f"\n\n## 参考信息\n{memory_context}"
        return f"## 参考信息\n{memory_context}"

    @staticmethod
    async def _store_episode(
        memory_retriever: "MemoryRetriever | None",
        *,
        task_id: str | None,
        agent_name: str,
        task_type: str,
        output: str | None,
        outcome: str,
    ) -> None:
        """Store a short (<= 500 chars) output summary as an episode (best effort)."""
        if not memory_retriever or not hasattr(memory_retriever, "store_episode"):
            return
        try:
            await memory_retriever.store_episode(
                key=f"task:{task_id or 'unknown'}",
                value={"output_summary": output[:500] if output else "", "agent_name": agent_name},
                metadata={"task_type": task_type, "outcome": outcome},
            )
        except Exception as e:
            logger.warning("Failed to store task result in episodic memory: %s", e)

    async def _emit_step_event(self, event_type: str, payload: dict[str, Any]) -> None:
        """Forward a progress event to ``step_event_callback``, if one is set.

        Callback failures are logged and never abort plan execution.
        """
        if not self._step_event_callback:
            return
        try:
            await self._step_event_callback(event_type, payload)
        except Exception as e:
            logger.warning("Step event callback failed: %s", e)

    async def _run_plan(
        self,
        plan: ExecutionPlan,
        *,
        messages: list[dict[str, str]],
        tools: list["Tool"] | None,
        model: str,
        agent_name: str,
        task_type: str,
        task_id: str | None,
        system_prompt: str | None,
    ) -> PlanExecutionResult:
        """Execute one round of ``plan`` with a fresh PlanExecutor.

        Completed results are mirrored to the workspace afterwards.
        """
        task_msg = self._build_task_message(
            messages=messages,
            agent_name=agent_name,
            task_type=task_type,
            task_id=task_id,
        )
        executor = self._create_executor(
            messages=messages,
            model=model,
            system_prompt=system_prompt,
            tools=tools,
        )
        plan_result = await executor.execute(plan, task_msg)
        await self._write_results_to_workspace(plan, plan_result, agent_name)
        return plan_result

    async def _write_results_to_workspace(
        self,
        plan: ExecutionPlan,
        plan_result: PlanExecutionResult,
        agent_name: str,
    ) -> None:
        """Write completed, non-empty step results to the shared workspace.

        Does nothing when no workspace is configured. Results are shared across
        executions under ``plan:<plan_id>:step:<step_id>:result``.
        """
        if not self._workspace:
            return
        for sid, step_result in plan_result.step_results.items():
            if step_result.status == PlanStepStatus.COMPLETED and step_result.result:
                await self._workspace.write(
                    f"plan:{plan.plan_id}:step:{sid}:result",
                    step_result.result,
                    agent_id=agent_name or "plan_exec",
                )

    async def _reflect_and_replan(
        self,
        plan: ExecutionPlan,
        plan_result: PlanExecutionResult,
        *,
        goal: str,
        agent_name: str,
        replan_count: int,
    ) -> tuple[ExecutionPlan, Any]:
        """Run one reflection + replanning round.

        The plan and its result are converted to Pipeline form for
        PipelineReflector / PipelineReplanner. The revised Pipeline is converted
        back to an ExecutionPlan for ``goal``. Results of already completed
        steps are merged into it, which is intended to avoid re-executing them.

        Returns:
            ``(revised_plan, reflection_report)``.
        """
        pipeline = self._plan_to_pipeline(plan, agent_name)
        pipeline_result = self._plan_result_to_pipeline_result(plan, plan_result)
        reflection_report = await self._reflector.reflect(pipeline, pipeline_result, replan_count)
        revised_pipeline = await self._replanner.replan(pipeline, pipeline_result, reflection_report)
        revised_plan = self._pipeline_to_plan(revised_pipeline, goal)
        self._merge_completed_results(revised_plan, plan_result)
        return revised_plan, reflection_report

    @staticmethod
    def _step_action(step_result: Any) -> str:
        """Return the trajectory/trace action name for a step result."""
        return "step_completed" if step_result.status == PlanStepStatus.COMPLETED else "step_failed"

    @staticmethod
    def _outcome_for(plan_result: PlanExecutionResult) -> str:
        """Map a plan execution status to "success" / "partial" / "error"."""
        if plan_result.status == TaskStatus.FAILED:
            # A failed plan that still completed some steps counts as partial.
            return "partial" if plan_result.completed_steps else "error"
        if plan_result.status == TaskStatus.PARTIALLY_COMPLETED:
            return "partial"
        return "success"

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_goal(messages: list[dict[str, str]]) -> str:
        """Return the content of the last user message, or "" if there is none."""
        for msg in reversed(messages):
            if msg.get("role") == "user":
                return msg.get("content", "")
        return ""

    @staticmethod
    def _extract_skill_names(tools: list["Tool"] | None) -> list[str]:
        """Return the names of ``tools``, offered to the planner as available skills."""
        if not tools:
            return []
        return [t.name for t in tools]

    @staticmethod
    def _build_task_message(
        messages: list[dict[str, str]],
        agent_name: str,
        task_type: str,
        task_id: str | None,
    ) -> TaskMessage:
        """Build the TaskMessage handed to PlanExecutor.execute."""
        return TaskMessage(
            task_id=task_id or "plan_exec",
            agent_name=agent_name,
            task_type=task_type,
            priority=0,
            input_data={"goal": PlanExecEngine._extract_goal(messages), "messages": messages},
            callback_url=None,
            created_at=datetime.now(timezone.utc),
        )

    def _create_executor(
        self,
        messages: list[dict[str, str]],
        model: str,
        system_prompt: str | None,
        tools: list["Tool"] | None,
        step_executor_type: str = "react",
    ) -> PlanExecutor:
        """Create a PlanExecutor whose agent pool runs steps through the LLM.

        Args:
            step_executor_type: "react" (default) uses ReActStepExecutor, which
                supports tool calls. "llm" uses _LLMStepExecutor, a single plain
                LLM call per step.

        The executor is created with ``max_retries=1`` and ``step_timeout=120.0``.
        """
        if step_executor_type == "llm":
            step_executor: _LLMStepExecutor | ReActStepExecutor = _LLMStepExecutor(
                llm_gateway=self._llm_gateway,
                messages=messages,
                model=model,
                system_prompt=system_prompt,
                tools=tools,
            )
        else:
            step_executor = ReActStepExecutor(
                llm_gateway=self._llm_gateway,
                messages=messages,
                model=model,
                system_prompt=system_prompt,
                tools=tools,
                confirmation_handler=self._confirmation_handler,
            )
        return PlanExecutor(
            agent_pool=step_executor,
            max_retries=1,
            step_timeout=120.0,
        )

    @staticmethod
    def _plan_to_pipeline(plan: ExecutionPlan, agent_name: str) -> Pipeline:
        """Convert an ExecutionPlan into a Pipeline (one stage per step).

        Stage names are step IDs. This form is consumed by PipelineReflector /
        PipelineReplanner.
        """
        from agentkit.orchestrator.pipeline_schema import PipelineStage

        stages = [
            PipelineStage(
                name=step.step_id,
                agent=agent_name,
                action=step.description,
                depends_on=step.dependencies,
                inputs=step.input_data,
            )
            for step in plan.steps
        ]
        return Pipeline(
            name=f"plan_{plan.plan_id}",
            version="1.0",
            description=plan.goal,
            stages=stages,
        )

    @staticmethod
    def _plan_result_to_pipeline_result(
        plan: ExecutionPlan,
        plan_result: PlanExecutionResult,
    ) -> PipelineResult:
        """Convert a PlanExecutionResult into a PipelineResult for the replanner.

        Unknown step statuses map to PENDING. FAILED and PARTIALLY_COMPLETED plans
        both map to an overall FAILED pipeline status.
        """
        status_map = {
            PlanStepStatus.PENDING: StageStatus.PENDING,
            PlanStepStatus.RUNNING: StageStatus.RUNNING,
            PlanStepStatus.COMPLETED: StageStatus.COMPLETED,
            PlanStepStatus.FAILED: StageStatus.FAILED,
            PlanStepStatus.SKIPPED: StageStatus.SKIPPED,
        }
        stage_results = {
            sid: StageResult(
                stage_name=sid,
                status=status_map.get(sr.status, StageStatus.PENDING),
                output_data=sr.result,
                error_message=sr.error,
            )
            for sid, sr in plan_result.step_results.items()
        }
        if plan_result.status in (TaskStatus.FAILED, TaskStatus.PARTIALLY_COMPLETED):
            overall_status = StageStatus.FAILED
        else:
            overall_status = StageStatus.COMPLETED
        return PipelineResult(
            pipeline_name=f"plan_{plan.plan_id}",
            status=overall_status,
            stage_results=stage_results,
        )

    @staticmethod
    def _pipeline_to_plan(pipeline: Pipeline, goal: str) -> ExecutionPlan:
        """Convert a revised Pipeline back into an ExecutionPlan.

        Each stage becomes a step whose ID and name are both the stage name.
        Parallel groups are rebuilt from the step dependencies.
        """
        steps = [
            PlanStep(
                step_id=stage.name,
                name=stage.name,
                description=stage.action,
                dependencies=stage.depends_on,
                input_data=stage.inputs,
                required_skills=[],
            )
            for stage in pipeline.stages
        ]
        plan = ExecutionPlan(goal=goal, steps=steps)
        # Reuses GoalPlanner's (private) grouping logic; no LLM is needed for it.
        planner = GoalPlanner()
        plan.parallel_groups = planner._build_parallel_groups(steps)
        return plan

    @staticmethod
    def _merge_completed_results(
        plan: ExecutionPlan,
        plan_result: PlanExecutionResult,
    ) -> None:
        """Copy COMPLETED/SKIPPED statuses (and completed results) into ``plan`` in place.

        Only steps whose IDs appear in ``plan_result`` are touched. The goal is
        to avoid re-executing them in the next round.
        """
        for step in plan.steps:
            sr = plan_result.step_results.get(step.step_id)
            if sr is None:
                continue
            if sr.status == PlanStepStatus.COMPLETED:
                step.status = PlanStepStatus.COMPLETED
                step.result = sr.result
            elif sr.status == PlanStepStatus.SKIPPED:
                step.status = PlanStepStatus.SKIPPED

    @staticmethod
    def _aggregate_output(plan: ExecutionPlan, plan_result: PlanExecutionResult) -> str:
        """Aggregate the results of completed steps into the final output string.

        Completed results are listed in ``plan.steps`` order under the step
        display name. Completed results for step IDs that ``plan`` does not
        contain are appended afterwards under their step ID. This happens when
        ``plan`` is the original plan and the result comes from a replanned one.
        Dict results are JSON-encoded; other results use ``str``.

        When nothing completed, the failed steps and their errors are listed
        instead.
        """
        completed: list[tuple[str, Any]] = []
        known_ids: set[str] = set()
        for step in plan.steps:
            known_ids.add(step.step_id)
            sr = plan_result.step_results.get(step.step_id)
            if sr and sr.status == PlanStepStatus.COMPLETED and sr.result:
                completed.append((step.name, sr.result))
        # Results of stages introduced by replanning would otherwise be dropped.
        for sid, sr in plan_result.step_results.items():
            if sid not in known_ids and sr.status == PlanStepStatus.COMPLETED and sr.result:
                completed.append((sid, sr.result))

        if not completed:
            failed_info = []
            for sid in plan_result.failed_steps:
                sr = plan_result.step_results.get(sid)
                plan_step = plan.get_step(sid)
                name = plan_step.name if plan_step else sid
                failed_info.append(f"- {name}: {sr.error if sr else 'unknown error'}")
            if failed_info:
                return "Plan execution failed.\nFailed steps:\n" + "\n".join(failed_info)
            return "Plan execution completed with no output."

        parts = []
        for name, result in completed:
            result_str = (
                json.dumps(result, ensure_ascii=False) if isinstance(result, dict) else str(result)
            )
            parts.append(f"**{name}**: {result_str}")
        return "\n\n".join(parts)
class _LLMStepExecutor:
    """Agent-pool stand-in that executes every PlanStep as one direct LLM call.

    It is passed to PlanExecutor as ``agent_pool`` in place of an AgentPool.
    It exposes ``create_agent_from_skill`` / ``get_agent`` and hands out
    ``_LLMStepAgent`` instances, cached by name.
    """

    def __init__(
        self,
        llm_gateway: "LLMGateway | None" = None,
        messages: list[dict[str, str]] | None = None,
        model: str = "default",
        system_prompt: str | None = None,
        tools: list["Tool"] | None = None,
    ):
        self._llm_gateway = llm_gateway
        # Falsy input (None or an empty list) is replaced by a fresh list.
        self._messages = messages if messages else []
        self._model = model
        self._system_prompt = system_prompt
        self._tools = tools
        self._agents: dict[str, _LLMStepAgent] = {}

    def _spawn(self, agent_key: str) -> "_LLMStepAgent":
        """Build a step agent with this executor's settings and cache it."""
        fresh = _LLMStepAgent(
            name=agent_key,
            llm_gateway=self._llm_gateway,
            messages=self._messages,
            model=self._model,
            system_prompt=self._system_prompt,
            tools=self._tools,
        )
        self._agents[agent_key] = fresh
        return fresh

    async def create_agent_from_skill(self, skill_name: str):
        """Create (replacing any cached one) the step agent for ``skill_name``."""
        return self._spawn(skill_name)

    def get_agent(self, key: str):
        """Return the cached agent for ``key``; build a default one on a miss."""
        cached = self._agents.get(key)
        if cached is not None:
            return cached
        return self._spawn(key)
class ReActStepExecutor:
    """Agent-pool stand-in that executes each PlanStep through a ReAct loop.

    It is passed to PlanExecutor as ``agent_pool`` and hands out
    ``_ReActStepAgent`` instances, cached by name. Each step can therefore call
    tools and reason over several think-act-observe iterations.
    """

    def __init__(
        self,
        llm_gateway: "LLMGateway | None" = None,
        messages: list[dict[str, str]] | None = None,
        model: str = "default",
        system_prompt: str | None = None,
        tools: list["Tool"] | None = None,
        max_steps: int = 5,
        confirmation_handler: Any | None = None,
    ):
        """
        Args:
            llm_gateway: Gateway used by each step's ReActEngine.
            messages: Original conversation, prepended to every step prompt.
            model: Model name forwarded to ReActEngine.execute.
            system_prompt: System prompt forwarded to ReActEngine.execute.
            tools: Tools available to every step (None becomes ``[]``).
            max_steps: ReAct step budget per plan step.
            confirmation_handler: Forwarded to every step agent.
        """
        self._llm_gateway = llm_gateway
        self._messages = messages or []
        self._model = model
        self._system_prompt = system_prompt
        self._tools = tools or []
        self._max_steps = max_steps
        self._confirmation_handler = confirmation_handler
        self._agents: dict[str, _ReActStepAgent] = {}

    def _new_agent(self, name: str) -> "_ReActStepAgent":
        """Build a step agent with this executor's settings and cache it under ``name``."""
        agent = _ReActStepAgent(
            name=name,
            llm_gateway=self._llm_gateway,
            messages=self._messages,
            model=self._model,
            system_prompt=self._system_prompt,
            tools=self._tools,
            max_steps=self._max_steps,
            # Forwarded on every construction path. Agents created lazily by
            # get_agent() thus get the same handler as those from
            # create_agent_from_skill().
            confirmation_handler=self._confirmation_handler,
        )
        self._agents[name] = agent
        return agent

    async def create_agent_from_skill(self, skill_name: str):
        """Create (replacing any cached one) the step agent for ``skill_name``."""
        return self._new_agent(skill_name)

    def get_agent(self, key: str):
        """Return the cached agent for ``key``, creating one on a cache miss."""
        if key in self._agents:
            return self._agents[key]
        return self._new_agent(key)
class _ReActStepAgent:
    """Step agent that executes a PlanStep with a nested ReActEngine loop.

    The step's name and description, plus any dependency results, become a
    user message appended to the original conversation. ReActEngine then runs
    with the configured tools, so the step can use multi-step
    think-act-observe reasoning.
    """

    def __init__(
        self,
        name: str,
        llm_gateway: "LLMGateway | None" = None,
        messages: list[dict[str, str]] | None = None,
        model: str = "default",
        system_prompt: str | None = None,
        tools: list["Tool"] | None = None,
        max_steps: int = 5,
        confirmation_handler: Any | None = None,
    ):
        self.name = name
        self._llm_gateway = llm_gateway
        self._messages = messages or []
        self._model = model
        self._system_prompt = system_prompt
        self._tools = tools or []
        self._max_steps = max_steps
        self._confirmation_handler = confirmation_handler

    async def execute(self, task_msg: TaskMessage) -> "TaskResult":
        """Execute one plan step through ReActEngine.

        Args:
            task_msg: Step task. ``input_data`` may carry ``step_name`` (defaults
                to the task id), ``step_description`` and ``dependency_results``.

        Returns:
            TaskResult whose ``output_data`` holds ``content``, ``steps`` and
            ``tokens`` from the ReAct run. Status is FAILED only when the run
            ended as "timeout" or "cancelled". Any other non-"success" status is
            reported as COMPLETED, with the status string in ``error_message``.

        Raises:
            RuntimeError: If no LLM gateway is configured.
        """
        if self._llm_gateway is None:
            raise RuntimeError(f"No LLM gateway available for step '{task_msg.task_id}'")

        # ReActEngine is imported here; the module level only imports
        # ReActEvent/ReActResult/ReActStep from agentkit.core.react.
        from agentkit.core.react import ReActEngine

        # Capture the real start time before the (potentially long) ReAct run.
        started_at = datetime.now(timezone.utc)

        input_data = task_msg.input_data
        step_name = input_data.get("step_name", task_msg.task_id)
        step_description = input_data.get("step_description", "")
        dep_results = input_data.get("dependency_results", {})

        prompt_parts = [f"Execute the following task step:\n\nStep: {step_name}\nDescription: {step_description}"]
        if dep_results:
            prompt_parts.append(
                f"\nResults from previous steps:\n{json.dumps(dep_results, ensure_ascii=False, indent=2)}"
            )
        prompt_parts.append("\nProvide a clear, structured result for this step.")

        engine = ReActEngine(
            llm_gateway=self._llm_gateway,
            max_steps=self._max_steps,
        )
        step_messages: list[dict[str, str]] = [
            *self._messages,
            {"role": "user", "content": "\n".join(prompt_parts)},
        ]
        react_result = await engine.execute(
            messages=step_messages,
            tools=self._tools if self._tools else None,
            model=self._model,
            agent_name=self.name,
            system_prompt=self._system_prompt,
            confirmation_handler=self._confirmation_handler,
        )
        completed_at = datetime.now(timezone.utc)

        failed = react_result.status in ("timeout", "cancelled")
        return TaskResult(
            task_id=task_msg.task_id,
            agent_name=self.name,
            status=(TaskStatus.FAILED if failed else TaskStatus.COMPLETED).value,
            output_data={
                "content": react_result.output,
                "steps": react_result.total_steps,
                "tokens": react_result.total_tokens,
            },
            error_message=None if react_result.status == "success" else react_result.status,
            started_at=started_at,
            completed_at=completed_at,
        )
class _LLMStepAgent:
    """Step agent that executes a PlanStep with a single LLM chat call.

    The step description is sent as the last user message, after the optional
    system prompt and the original conversation. The response text becomes the
    step result.
    """

    def __init__(
        self,
        name: str,
        llm_gateway: "LLMGateway | None" = None,
        messages: list[dict[str, str]] | None = None,
        model: str = "default",
        system_prompt: str | None = None,
        tools: list["Tool"] | None = None,
    ):
        self.name = name
        self._llm_gateway = llm_gateway
        self._messages = messages or []
        self._model = model
        self._system_prompt = system_prompt
        # NOTE: stored for interface parity only; the chat call below passes no tools.
        self._tools = tools

    async def execute(self, task_msg: TaskMessage) -> "TaskResult":
        """Execute one plan step with a direct ``llm_gateway.chat`` call.

        Args:
            task_msg: Step task. ``input_data`` may carry ``step_name`` (defaults
                to the task id), ``step_description`` and ``dependency_results``.

        Returns:
            A COMPLETED TaskResult whose ``output_data["content"]`` is the
            response text ("" when the response has no content).

        Raises:
            RuntimeError: If no LLM gateway is configured.
        """
        if self._llm_gateway is None:
            raise RuntimeError(f"No LLM gateway available for step '{task_msg.task_id}'")

        # Capture the real start time before awaiting the LLM.
        started_at = datetime.now(timezone.utc)

        input_data = task_msg.input_data
        step_name = input_data.get("step_name", task_msg.task_id)
        step_description = input_data.get("step_description", "")
        dep_results = input_data.get("dependency_results", {})

        prompt_parts = [f"Execute the following task step:\n\nStep: {step_name}\nDescription: {step_description}"]
        if dep_results:
            prompt_parts.append(f"\nResults from previous steps:\n{json.dumps(dep_results, ensure_ascii=False, indent=2)}")
        prompt_parts.append("\nProvide a clear, structured result for this step.")

        conversation: list[dict[str, Any]] = []
        if self._system_prompt:
            conversation.append({"role": "system", "content": self._system_prompt})
        # The original conversation provides context for the step.
        conversation.extend(self._messages)
        conversation.append({"role": "user", "content": "\n".join(prompt_parts)})

        response = await self._llm_gateway.chat(
            messages=conversation,
            model=self._model,
        )
        completed_at = datetime.now(timezone.utc)
        return TaskResult(
            task_id=task_msg.task_id,
            agent_name=self.name,
            status=TaskStatus.COMPLETED.value,
            output_data={"content": response.content or ""},
            error_message=None,
            started_at=started_at,
            completed_at=completed_at,
        )