"""Plan-and-Execute 执行引擎适配器 将 GoalPlanner + PlanExecutor + PipelineReplanner 组合为 plan_exec 执行模式引擎, 兼容 ReActEngine 的接口(execute / execute_stream),复用 ReActResult / ReActEvent 数据结构。 三阶段流程: 1. Planner Phase: GoalPlanner 分解目标为 ExecutionPlan 2. Executor Phase: PlanExecutor 按 parallel_groups 执行 PlanStep 3. Replanner Phase: 步骤失败时,PipelineReplanner 修正计划后重试 """ from __future__ import annotations import asyncio import json import logging import time from dataclasses import dataclass, field from datetime import datetime, timezone from typing import TYPE_CHECKING, Any from agentkit.core.exceptions import TaskCancelledError, TaskTimeoutError from agentkit.core.goal_planner import GoalPlanner from agentkit.core.plan_executor import PlanExecutor, PlanExecutionResult, StepExecutionResult from agentkit.core.plan_schema import ExecutionPlan, PlanStep, PlanStepStatus from agentkit.core.protocol import CancellationToken, TaskMessage, TaskStatus from agentkit.core.react import ReActEvent, ReActResult, ReActStep from agentkit.orchestrator.reflection import PipelineReflector, PipelineReplanner from agentkit.orchestrator.pipeline_schema import Pipeline, PipelineResult, ReflectionReport, StageResult, StageStatus if TYPE_CHECKING: from agentkit.core.compressor import CompressionStrategy, ContextCompressor from agentkit.core.trace import TraceRecorder from agentkit.memory.retriever import MemoryRetriever from agentkit.llm.gateway import LLMGateway from agentkit.tools.base import Tool logger = logging.getLogger(__name__) # 最大重规划次数 _DEFAULT_MAX_REPLANS = 2 @dataclass class _StreamState: """流式执行内部状态,用于在 execute_stream 中跨 yield 传递""" plan_result: PlanExecutionResult | None = None trajectory: list[ReActStep] = field(default_factory=list) total_tokens: int = 0 step_counter: int = 0 replanned: bool = False class PlanExecEngine: """Plan-and-Execute 执行引擎适配器 组合 GoalPlanner、PlanExecutor、PipelineReplanner, 对外暴露与 ReActEngine 兼容的 execute / execute_stream 接口。 使用方式: engine = PlanExecEngine(llm_gateway=gateway) result = await engine.execute( messages=[{"role": "user", "content": "调研3个竞品并生成报告"}], tools=[...], ) """ def __init__( self, llm_gateway: "LLMGateway | None" = None, max_replans: int = _DEFAULT_MAX_REPLANS, default_timeout: float = 300.0, ): """ Args: llm_gateway: LLM Gateway,传递给 GoalPlanner / PipelineReplanner max_replans: 最大重规划次数 default_timeout: 默认超时秒数 """ self._llm_gateway = llm_gateway self._max_replans = max_replans self._default_timeout = default_timeout # 组合子组件 self._planner = GoalPlanner(llm_gateway=llm_gateway) self._reflector = PipelineReflector(llm_gateway=llm_gateway) self._replanner = PipelineReplanner(llm_gateway=llm_gateway) # ------------------------------------------------------------------ # 公开接口 — 与 ReActEngine 签名一致 # ------------------------------------------------------------------ async def execute( self, messages: list[dict[str, str]], tools: list["Tool"] | None = None, model: str = "default", agent_name: str = "", task_type: str = "", system_prompt: str | None = None, trace_recorder: "TraceRecorder | None" = None, memory_retriever: "MemoryRetriever | None" = None, task_id: str | None = None, compressor: "CompressionStrategy | None" = None, retrieval_config: dict[str, Any] | None = None, cancellation_token: CancellationToken | None = None, timeout_seconds: float | None = None, ) -> ReActResult: """执行 Plan-and-Execute 流程 1. Planner Phase: 生成 ExecutionPlan 2. Executor Phase: 逐步执行 3. Replanner Phase: 失败时重规划 """ effective_timeout = timeout_seconds if timeout_seconds is not None else self._default_timeout try: if effective_timeout > 0: result = await asyncio.wait_for( self._execute_loop( messages=messages, tools=tools, model=model, agent_name=agent_name, task_type=task_type, system_prompt=system_prompt, trace_recorder=trace_recorder, memory_retriever=memory_retriever, task_id=task_id, compressor=compressor, retrieval_config=retrieval_config, cancellation_token=cancellation_token, ), timeout=effective_timeout, ) else: result = await self._execute_loop( messages=messages, tools=tools, model=model, agent_name=agent_name, task_type=task_type, system_prompt=system_prompt, trace_recorder=trace_recorder, memory_retriever=memory_retriever, task_id=task_id, compressor=compressor, retrieval_config=retrieval_config, cancellation_token=cancellation_token, ) except asyncio.TimeoutError: raise TaskTimeoutError( task_id=task_id or "", timeout_seconds=int(effective_timeout), ) except TaskCancelledError: raise return result async def execute_stream( self, messages: list[dict[str, str]], tools: list["Tool"] | None = None, model: str = "default", agent_name: str = "", task_type: str = "", system_prompt: str | None = None, trace_recorder: "TraceRecorder | None" = None, memory_retriever: "MemoryRetriever | None" = None, task_id: str | None = None, compressor: "CompressionStrategy | None" = None, retrieval_config: dict[str, Any] | None = None, cancellation_token: CancellationToken | None = None, timeout_seconds: float | None = None, ): """执行 Plan-and-Execute 流程,逐步 yield ReActEvent 事件类型: - "planning": 开始规划 - "plan_generated": 计划生成完成 - "step_executing": 步骤开始执行 - "step_completed": 步骤执行完成 - "replanning": 触发重规划 - "final_answer": 最终结果 """ # Memory retrieval if memory_retriever: try: query = str(messages[-1].get("content", "")) if messages else "" top_k = (retrieval_config or {}).get("top_k", 5) token_budget = (retrieval_config or {}).get("token_budget", 2000) memory_context = await memory_retriever.get_context_string( query=query, top_k=top_k, token_budget=token_budget, ) if memory_context: if system_prompt: system_prompt += f"\n\n## 参考信息\n{memory_context}" else: system_prompt = f"## 参考信息\n{memory_context}" except Exception as e: logger.warning(f"Memory retrieval failed, continuing without context: {e}") # 启动轨迹记录 if trace_recorder is not None: trace_recorder.start_trace( task_id="", agent_name=agent_name, skill_name=task_type or None, ) state = _StreamState() trace_outcome = "success" output = "" try: # ── Phase 1: Planner ── state.step_counter += 1 yield ReActEvent( event_type="planning", step=state.step_counter, data={"message": "Decomposing goal into execution plan..."}, ) goal = self._extract_goal(messages) available_skills = self._extract_skill_names(tools) plan = await self._planner.generate_plan( goal=goal, context={"system_prompt": system_prompt, "task_type": task_type}, available_skills=available_skills, ) state.step_counter += 1 yield ReActEvent( event_type="plan_generated", step=state.step_counter, data={ "plan_id": plan.plan_id, "goal": plan.goal, "steps": [s.to_dict() for s in plan.steps], "parallel_groups": plan.parallel_groups, }, ) state.trajectory.append(ReActStep( step=state.step_counter, action="plan_generated", content=f"Generated plan with {len(plan.steps)} steps", tokens=0, )) # ── Phase 2 & 3: Execute with optional replanning ── current_plan = plan replan_count = 0 while True: if cancellation_token is not None: cancellation_token.check() task_msg = self._build_task_message( messages=messages, agent_name=agent_name, task_type=task_type, task_id=task_id, ) executor = self._create_executor( messages=messages, model=model, system_prompt=system_prompt, tools=tools, ) plan_result = await executor.execute(current_plan, task_msg) # 将步骤结果映射到 trajectory 并 yield 事件 for sid, step_result in plan_result.step_results.items(): plan_step = current_plan.get_step(sid) step_name = plan_step.name if plan_step else sid state.step_counter += 1 yield ReActEvent( event_type="step_executing", step=state.step_counter, data={"step_id": sid, "step_name": step_name}, ) state.step_counter += 1 yield ReActEvent( event_type="step_completed", step=state.step_counter, data={ "step_id": sid, "step_name": step_name, "status": step_result.status.value, "result": step_result.result, "error": step_result.error, }, ) state.trajectory.append(ReActStep( step=state.step_counter, action="step_completed" if step_result.status == PlanStepStatus.COMPLETED else "step_failed", tool_name=step_name, result=step_result.result, tokens=0, )) if trace_recorder is not None: trace_recorder.record_step( step=state.step_counter, action="step_completed" if step_result.status == PlanStepStatus.COMPLETED else "step_failed", tool_name=step_name, output_data=step_result.result, error=step_result.error, ) # 全部成功 if plan_result.status == TaskStatus.COMPLETED: break # 失败且可重规划 if plan_result.failed_steps and replan_count < self._max_replans: replan_count += 1 state.replanned = True state.step_counter += 1 yield ReActEvent( event_type="replanning", step=state.step_counter, data={ "replan_count": replan_count, "failed_steps": plan_result.failed_steps, }, ) pipeline = self._plan_to_pipeline(current_plan, agent_name) pipeline_result = self._plan_result_to_pipeline_result(current_plan, plan_result) reflection_report = await self._reflector.reflect(pipeline, pipeline_result, replan_count) revised_pipeline = await self._replanner.replan(pipeline, pipeline_result, reflection_report) current_plan = self._pipeline_to_plan(revised_pipeline, plan.goal) self._merge_completed_results(current_plan, plan_result) state.trajectory.append(ReActStep( step=state.step_counter, action="replanning", content=f"Replanned (attempt {replan_count}): {reflection_report.root_cause}", tokens=0, )) continue # 无法重规划或已达到上限 break # 确定输出 output = self._aggregate_output(plan, plan_result) # 确定状态 if plan_result.status == TaskStatus.FAILED: trace_outcome = "partial" if plan_result.completed_steps else "error" elif plan_result.status == TaskStatus.PARTIALLY_COMPLETED: trace_outcome = "partial" else: trace_outcome = "success" # 最终步骤 state.step_counter += 1 state.trajectory.append(ReActStep( step=state.step_counter, action="final_answer", content=output, tokens=0, )) yield ReActEvent( event_type="final_answer", step=state.step_counter, data={ "output": output, "total_steps": len(state.trajectory), "total_tokens": state.total_tokens, "plan_id": plan.plan_id, "plan_status": plan_result.status.value, "replanned": state.replanned, }, ) except TaskCancelledError: trace_outcome = "cancelled" raise finally: if trace_recorder is not None: trace_recorder.end_trace(outcome=trace_outcome) # Memory storage if memory_retriever and hasattr(memory_retriever, "store_episode"): try: summary = output[:500] if output else "" await memory_retriever.store_episode( key=f"task:{task_id or 'unknown'}", value={"output_summary": summary, "agent_name": agent_name}, metadata={"task_type": task_type, "outcome": trace_outcome}, ) except Exception as e: logger.warning(f"Failed to store task result in episodic memory: {e}") # ------------------------------------------------------------------ # 内部实现 # ------------------------------------------------------------------ async def _execute_loop( self, messages: list[dict[str, str]], tools: list["Tool"] | None = None, model: str = "default", agent_name: str = "", task_type: str = "", system_prompt: str | None = None, trace_recorder: "TraceRecorder | None" = None, memory_retriever: "MemoryRetriever | None" = None, task_id: str | None = None, compressor: "CompressionStrategy | None" = None, retrieval_config: dict[str, Any] | None = None, cancellation_token: CancellationToken | None = None, ) -> ReActResult: """Plan-and-Execute 核心循环(非流式)""" # Memory retrieval if memory_retriever: try: query = str(messages[-1].get("content", "")) if messages else "" top_k = (retrieval_config or {}).get("top_k", 5) token_budget = (retrieval_config or {}).get("token_budget", 2000) memory_context = await memory_retriever.get_context_string( query=query, top_k=top_k, token_budget=token_budget, ) if memory_context: if system_prompt: system_prompt += f"\n\n## 参考信息\n{memory_context}" else: system_prompt = f"## 参考信息\n{memory_context}" except Exception as e: logger.warning(f"Memory retrieval failed, continuing without context: {e}") # 启动轨迹记录 if trace_recorder is not None: trace_recorder.start_trace( task_id="", agent_name=agent_name, skill_name=task_type or None, ) trajectory: list[ReActStep] = [] total_tokens = 0 trace_outcome = "success" try: # ── Phase 1: Planner ── if cancellation_token is not None: cancellation_token.check() goal = self._extract_goal(messages) available_skills = self._extract_skill_names(tools) plan = await self._planner.generate_plan( goal=goal, context={"system_prompt": system_prompt, "task_type": task_type}, available_skills=available_skills, ) trajectory.append(ReActStep( step=1, action="plan_generated", content=f"Generated plan with {len(plan.steps)} steps", tokens=0, )) if trace_recorder is not None: trace_recorder.record_step( step=1, action="plan_generated", output_data={"plan_id": plan.plan_id, "num_steps": len(plan.steps)}, ) # ── Phase 2 & 3: Execute with replanning ── plan_result, trajectory, total_tokens = await self._execute_with_replanning( plan=plan, messages=messages, tools=tools, model=model, agent_name=agent_name, task_type=task_type, system_prompt=system_prompt, trace_recorder=trace_recorder, task_id=task_id, cancellation_token=cancellation_token, trajectory=trajectory, total_tokens=total_tokens, ) # 聚合输出 output = self._aggregate_output(plan, plan_result) # 确定状态 if plan_result.status == TaskStatus.FAILED: trace_outcome = "partial" if plan_result.completed_steps else "error" elif plan_result.status == TaskStatus.PARTIALLY_COMPLETED: trace_outcome = "partial" else: trace_outcome = "success" trajectory.append(ReActStep( step=len(trajectory) + 1, action="final_answer", content=output, tokens=0, )) return ReActResult( output=output, trajectory=trajectory, total_steps=len(trajectory), total_tokens=total_tokens, status=trace_outcome, ) except TaskCancelledError: trace_outcome = "cancelled" raise finally: if trace_recorder is not None: trace_recorder.end_trace(outcome=trace_outcome) # Memory storage if memory_retriever and hasattr(memory_retriever, "store_episode"): try: output = trajectory[-1].content if trajectory else "" summary = output[:500] if output else "" await memory_retriever.store_episode( key=f"task:{task_id or 'unknown'}", value={"output_summary": summary, "agent_name": agent_name}, metadata={"task_type": task_type, "outcome": trace_outcome}, ) except Exception as e: logger.warning(f"Failed to store task result in episodic memory: {e}") async def _execute_with_replanning( self, plan: ExecutionPlan, messages: list[dict[str, str]], tools: list["Tool"] | None, model: str, agent_name: str, task_type: str, system_prompt: str | None, trace_recorder: "TraceRecorder | None", task_id: str | None, cancellation_token: CancellationToken | None, trajectory: list[ReActStep], total_tokens: int, ) -> tuple[PlanExecutionResult, list[ReActStep], int]: """执行计划,失败时触发重规划 Returns: (plan_result, trajectory, total_tokens) """ current_plan = plan replan_count = 0 while True: if cancellation_token is not None: cancellation_token.check() # 构建 TaskMessage 用于 PlanExecutor task_msg = self._build_task_message( messages=messages, agent_name=agent_name, task_type=task_type, task_id=task_id, ) # 创建 PlanExecutor(使用 LLM 直接调用模式) executor = self._create_executor( messages=messages, model=model, system_prompt=system_prompt, tools=tools, ) plan_result = await executor.execute(current_plan, task_msg) # 将步骤结果映射到 trajectory for sid, step_result in plan_result.step_results.items(): plan_step = current_plan.get_step(sid) step_name = plan_step.name if plan_step else sid trajectory.append(ReActStep( step=len(trajectory) + 1, action="step_completed" if step_result.status == PlanStepStatus.COMPLETED else "step_failed", tool_name=step_name, result=step_result.result, tokens=0, )) if trace_recorder is not None: trace_recorder.record_step( step=len(trajectory), action="step_completed" if step_result.status == PlanStepStatus.COMPLETED else "step_failed", tool_name=step_name, output_data=step_result.result, error=step_result.error, ) # 如果全部成功,直接返回 if plan_result.status == TaskStatus.COMPLETED: return plan_result, trajectory, total_tokens # 如果有失败步骤且还可以重规划 if plan_result.failed_steps and replan_count < self._max_replans: replan_count += 1 logger.info( f"Plan execution has failed steps, triggering replan " f"(attempt {replan_count}/{self._max_replans})" ) # 将 ExecutionPlan 转换为 Pipeline 用于反思-重规划 pipeline = self._plan_to_pipeline(current_plan, agent_name) pipeline_result = self._plan_result_to_pipeline_result(current_plan, plan_result) # 反思 reflection_report = await self._reflector.reflect(pipeline, pipeline_result, replan_count) # 重规划 revised_pipeline = await self._replanner.replan(pipeline, pipeline_result, reflection_report) # 将修正后的 Pipeline 转回 ExecutionPlan current_plan = self._pipeline_to_plan(revised_pipeline, plan.goal) # 保留已完成步骤的结果到新计划 self._merge_completed_results(current_plan, plan_result) trajectory.append(ReActStep( step=len(trajectory) + 1, action="replanning", content=f"Replanned (attempt {replan_count}): {reflection_report.root_cause}", tokens=0, )) if trace_recorder is not None: trace_recorder.record_step( step=len(trajectory), action="replanning", output_data={ "replan_count": replan_count, "root_cause": reflection_report.root_cause, "new_plan_id": current_plan.plan_id, }, ) continue # 无法重规划或已达到上限,返回部分结果 return plan_result, trajectory, total_tokens # ------------------------------------------------------------------ # 辅助方法 # ------------------------------------------------------------------ @staticmethod def _extract_goal(messages: list[dict[str, str]]) -> str: """从消息列表中提取用户目标""" for msg in reversed(messages): if msg.get("role") == "user": return msg.get("content", "") return "" @staticmethod def _extract_skill_names(tools: list["Tool"] | None) -> list[str]: """从工具列表中提取 Skill 名称""" if not tools: return [] return [t.name for t in tools] @staticmethod def _build_task_message( messages: list[dict[str, str]], agent_name: str, task_type: str, task_id: str | None, ) -> TaskMessage: """构建 TaskMessage 用于 PlanExecutor""" goal = "" for msg in reversed(messages): if msg.get("role") == "user": goal = msg.get("content", "") break return TaskMessage( task_id=task_id or "plan_exec", agent_name=agent_name, task_type=task_type, priority=0, input_data={"goal": goal, "messages": messages}, callback_url=None, created_at=datetime.now(timezone.utc), ) def _create_executor( self, messages: list[dict[str, str]], model: str, system_prompt: str | None, tools: list["Tool"] | None, ) -> PlanExecutor: """创建 PlanExecutor 实例 使用 _LLMStepExecutor 作为 agent_pool,使每个步骤通过 LLM 直接调用执行。 """ step_executor = _LLMStepExecutor( llm_gateway=self._llm_gateway, messages=messages, model=model, system_prompt=system_prompt, tools=tools, ) return PlanExecutor( agent_pool=step_executor, max_retries=1, step_timeout=120.0, ) @staticmethod def _plan_to_pipeline(plan: ExecutionPlan, agent_name: str) -> Pipeline: """将 ExecutionPlan 转换为 Pipeline(用于 PipelineReplanner)""" from agentkit.orchestrator.pipeline_schema import PipelineStage stages = [] for step in plan.steps: stages.append(PipelineStage( name=step.step_id, agent=agent_name, action=step.description, depends_on=step.dependencies, inputs=step.input_data, )) return Pipeline( name=f"plan_{plan.plan_id}", version="1.0", description=plan.goal, stages=stages, ) @staticmethod def _plan_result_to_pipeline_result( plan: ExecutionPlan, plan_result: PlanExecutionResult, ) -> PipelineResult: """将 PlanExecutionResult 转换为 PipelineResult(用于 PipelineReplanner)""" stage_results = {} for sid, sr in plan_result.step_results.items(): status_map = { PlanStepStatus.PENDING: StageStatus.PENDING, PlanStepStatus.RUNNING: StageStatus.RUNNING, PlanStepStatus.COMPLETED: StageStatus.COMPLETED, PlanStepStatus.FAILED: StageStatus.FAILED, PlanStepStatus.SKIPPED: StageStatus.SKIPPED, } stage_results[sid] = StageResult( stage_name=sid, status=status_map.get(sr.status, StageStatus.PENDING), output_data=sr.result, error_message=sr.error, ) overall_status = StageStatus.COMPLETED if plan_result.status == TaskStatus.FAILED: overall_status = StageStatus.FAILED elif plan_result.status == TaskStatus.PARTIALLY_COMPLETED: overall_status = StageStatus.FAILED return PipelineResult( pipeline_name=f"plan_{plan.plan_id}", status=overall_status, stage_results=stage_results, ) @staticmethod def _pipeline_to_plan(pipeline: Pipeline, goal: str) -> ExecutionPlan: """将修正后的 Pipeline 转回 ExecutionPlan""" steps = [] for stage in pipeline.stages: steps.append(PlanStep( step_id=stage.name, name=stage.name, description=stage.action, dependencies=stage.depends_on, input_data=stage.inputs, required_skills=[], )) plan = ExecutionPlan( goal=goal, steps=steps, ) # 重建并行组 planner = GoalPlanner() plan.parallel_groups = planner._build_parallel_groups(steps) return plan @staticmethod def _merge_completed_results( plan: ExecutionPlan, plan_result: PlanExecutionResult, ) -> None: """将已完成步骤的结果合并到新计划中,避免重复执行""" for step in plan.steps: if step.step_id in plan_result.step_results: sr = plan_result.step_results[step.step_id] if sr.status == PlanStepStatus.COMPLETED: step.status = PlanStepStatus.COMPLETED step.result = sr.result elif sr.status == PlanStepStatus.SKIPPED: step.status = PlanStepStatus.SKIPPED @staticmethod def _aggregate_output(plan: ExecutionPlan, plan_result: PlanExecutionResult) -> str: """聚合步骤结果为最终输出""" completed_results = [] for step in plan.steps: sr = plan_result.step_results.get(step.step_id) if sr and sr.status == PlanStepStatus.COMPLETED and sr.result: completed_results.append({ "step": step.name, "result": sr.result, }) if not completed_results: # 没有成功步骤 failed_info = [] for sid in plan_result.failed_steps: sr = plan_result.step_results.get(sid) plan_step = plan.get_step(sid) name = plan_step.name if plan_step else sid failed_info.append(f"- {name}: {sr.error if sr else 'unknown error'}") if failed_info: return f"Plan execution failed.\nFailed steps:\n" + "\n".join(failed_info) return "Plan execution completed with no output." # 简单聚合:将所有成功步骤结果格式化 parts = [] for item in completed_results: result_str = json.dumps(item["result"], ensure_ascii=False) if isinstance(item["result"], dict) else str(item["result"]) parts.append(f"**{item['step']}**: {result_str}") return "\n\n".join(parts) class _LLMStepExecutor: """LLM 直接调用步骤执行器 作为 PlanExecutor 的 agent_pool 替代品, 使每个 PlanStep 通过 LLM 直接调用执行,而非通过 AgentPool。 """ def __init__( self, llm_gateway: "LLMGateway | None" = None, messages: list[dict[str, str]] | None = None, model: str = "default", system_prompt: str | None = None, tools: list["Tool"] | None = None, ): self._llm_gateway = llm_gateway self._messages = messages or [] self._model = model self._system_prompt = system_prompt self._tools = tools self._agents: dict[str, _LLMStepAgent] = {} async def create_agent_from_skill(self, skill_name: str): """创建 LLM 步骤 Agent""" agent = _LLMStepAgent( name=skill_name, llm_gateway=self._llm_gateway, messages=self._messages, model=self._model, system_prompt=self._system_prompt, tools=self._tools, ) self._agents[skill_name] = agent return agent def get_agent(self, key: str): """获取已创建的 Agent""" if key in self._agents: return self._agents[key] # 回退:创建一个默认 Agent agent = _LLMStepAgent( name=key, llm_gateway=self._llm_gateway, messages=self._messages, model=self._model, system_prompt=self._system_prompt, tools=self._tools, ) self._agents[key] = agent return agent class _LLMStepAgent: """LLM 直接调用步骤 Agent 将 PlanStep 的描述作为 prompt 发送给 LLM, 返回 LLM 的响应作为步骤结果。 """ def __init__( self, name: str, llm_gateway: "LLMGateway | None" = None, messages: list[dict[str, str]] | None = None, model: str = "default", system_prompt: str | None = None, tools: list["Tool"] | None = None, ): self.name = name self._llm_gateway = llm_gateway self._messages = messages or [] self._model = model self._system_prompt = system_prompt self._tools = tools async def execute(self, task_msg: TaskMessage) -> "TaskResult": """执行步骤:通过 LLM 直接调用""" if self._llm_gateway is None: raise RuntimeError(f"No LLM gateway available for step '{task_msg.task_id}'") # 构建步骤 prompt input_data = task_msg.input_data step_name = input_data.get("step_name", task_msg.task_id) step_description = input_data.get("step_description", "") dep_results = input_data.get("dependency_results", {}) prompt_parts = [f"Execute the following task step:\n\nStep: {step_name}\nDescription: {step_description}"] if dep_results: prompt_parts.append(f"\nResults from previous steps:\n{json.dumps(dep_results, ensure_ascii=False, indent=2)}") prompt_parts.append("\nProvide a clear, structured result for this step.") conversation: list[dict[str, Any]] = [] if self._system_prompt: conversation.append({"role": "system", "content": self._system_prompt}) # 添加原始对话上下文 for msg in self._messages: conversation.append(msg) conversation.append({"role": "user", "content": "\n".join(prompt_parts)}) response = await self._llm_gateway.chat( messages=conversation, model=self._model, ) now = datetime.now(timezone.utc) return TaskResult( task_id=task_msg.task_id, agent_name=self.name, status=TaskStatus.COMPLETED.value, output_data={"content": response.content or ""}, error_message=None, started_at=now, completed_at=now, )