From 8c365486e23eed6eb90307e41ebaad9ce66b864d Mon Sep 17 00:00:00 2001 From: chiguyong Date: Fri, 12 Jun 2026 10:02:37 +0800 Subject: [PATCH] fix(pipeline): address code review findings for adversarial loop Critical: - C1: Add verifier_timeout_seconds for independent Verifier timeout - C2: Verifier parse failure raises RuntimeError instead of dead-loop Major: - M1: Inject previous_output into Worker retry context - M2: Add Pydantic ge/le constraint on ReviewFeedback.score - M3: Use Literal type for feedback_mode enum validation - M4: Use Literal types for ReviewIssue severity and category - M5: Merge error messages when escalation agent also fails Tests: 8 new test cases added (19 total), all passing --- configs/pipelines/coding_harness.yaml | 1 + src/agentkit/orchestrator/pipeline_engine.py | 66 +++--- src/agentkit/orchestrator/pipeline_schema.py | 15 +- tests/unit/test_pipeline_adversarial.py | 230 +++++++++++++++++++ 4 files changed, 279 insertions(+), 33 deletions(-) diff --git a/configs/pipelines/coding_harness.yaml b/configs/pipelines/coding_harness.yaml index 30223f5..8aa7c2a 100644 --- a/configs/pipelines/coding_harness.yaml +++ b/configs/pipelines/coding_harness.yaml @@ -36,6 +36,7 @@ stages: depends_on: - test max_adversarial_rounds: 3 + verifier_timeout_seconds: 120 feedback_mode: "structured+natural" escalate_on_exhaust: human_approval inputs: diff --git a/src/agentkit/orchestrator/pipeline_engine.py b/src/agentkit/orchestrator/pipeline_engine.py index 103dfa7..f00bcb8 100644 --- a/src/agentkit/orchestrator/pipeline_engine.py +++ b/src/agentkit/orchestrator/pipeline_engine.py @@ -446,7 +446,6 @@ class PipelineEngine: resolved_inputs = self._resolve_variables(stage.inputs, pipeline_result.variables) current_context = resolved_inputs.copy() - last_worker_result: StageResult | None = None for round_num in range(1, stage.max_adversarial_rounds + 1): adversarial_state.current_round = round_num @@ -468,8 +467,6 @@ class PipelineEngine: # Worker 执行失败,直接返回 return worker_result - last_worker_result = worker_result - # 2. 执行 Verifier 审查 try: verifier_feedback = await self._execute_verifier( @@ -526,12 +523,16 @@ class PipelineEngine: started_at, ) - # 5. 打回 Worker 重做,附带反馈 + # 5. 打回 Worker 重做,附带反馈和前次产出 feedback_context = self._build_feedback_context( verifier_feedback, stage.feedback_mode, ) - current_context = {**resolved_inputs, **feedback_context} + current_context = { + **resolved_inputs, + "previous_output": worker_result.output_data, + **feedback_context, + } # 不应该到达这里,但以防万一 return StageResult( @@ -549,8 +550,19 @@ class PipelineEngine: input_data: dict[str, Any], stage: PipelineStage, started_at: str, + timeout_seconds: int | None = None, ) -> StageResult: - """执行单个 Agent stage(不含对抗逻辑)""" + """执行单个 Agent stage(不含对抗逻辑) + + Args: + agent_name: Agent 名称 + action: 执行动作 + input_data: 输入数据 + stage: 所属 stage + started_at: 开始时间 + timeout_seconds: 独立超时时间,不传则使用 stage.timeout_seconds + """ + effective_timeout = timeout_seconds if timeout_seconds is not None else stage.timeout_seconds if self._dispatcher is None: # Dry-run 模式 return StageResult( @@ -572,14 +584,14 @@ class PipelineEngine: input_data=input_data, callback_url=None, created_at=datetime.now(timezone.utc), - timeout_seconds=stage.timeout_seconds, + timeout_seconds=effective_timeout, ) async def _dispatch_and_wait() -> StageResult: """Dispatch task and wait for result""" await self._dispatcher.dispatch(task) - for _ in range(stage.timeout_seconds): + for _ in range(effective_timeout): status = await self._dispatcher.get_task_status(task.task_id) if status["status"] in ("completed", "failed", "cancelled"): return StageResult( @@ -595,7 +607,7 @@ class PipelineEngine: return StageResult( stage_name=stage.name, status=StageStatus.FAILED, - error_message=f"Timeout after {stage.timeout_seconds}s", + error_message=f"Timeout after {effective_timeout}s", started_at=started_at, completed_at=datetime.now(timezone.utc).isoformat(), ) @@ -639,13 +651,14 @@ class PipelineEngine: ), } - # 执行 Verifier Agent + # 执行 Verifier Agent(使用独立超时) verifier_result = await self._execute_agent_stage( verifier_name, "review", verifier_input, stage, started_at, + timeout_seconds=stage.verifier_timeout_seconds, ) if verifier_result.status != StageStatus.COMPLETED: @@ -667,20 +680,12 @@ class PipelineEngine: ) return feedback except Exception as e: - # 如果解析失败,创建默认反馈 - logger.warning(f"Failed to parse verifier output: {e}") - return ReviewFeedback( - passed=False, - issues=[ - ReviewIssue( - severity="major", - category="logic_error", - description=f"Failed to parse verifier output: {e}", - ) - ], - summary="Verifier output parsing failed", - score=0.0, - ) + # 解析失败时直接抛出异常,避免死循环 + logger.error(f"Failed to parse verifier output: {e}") + raise RuntimeError( + f"Verifier '{verifier_name}' returned unparseable output: {e}. " + f"Raw output keys: {list(output_data.keys())}" + ) from e def _build_feedback_context( self, @@ -742,7 +747,8 @@ class PipelineEngine: "Please regenerate addressing the feedback." ) else: - # 默认使用 structured+natural + # 未知模式,fallback 到 structured+natural + logger.warning(f"Unknown feedback_mode '{feedback_mode}', falling back to structured+natural") feedback_context["review_feedback"] = { "summary": feedback.summary, "issues": issues_list, @@ -750,7 +756,8 @@ class PipelineEngine: } feedback_context["instruction"] = ( "Your previous output did not pass review. " - "Please fix the issues listed above and regenerate." + "Please fix the issues listed above and regenerate. " + f"Review summary: {feedback.summary}" ) return feedback_context @@ -803,6 +810,13 @@ class PipelineEngine: for i, fb in enumerate(adversarial_state.feedback_history) ], } + # 如果升级 Agent 也失败了,合并错误信息 + if escalate_result.status == StageStatus.FAILED: + escalate_result.error_message = ( + f"Escalation to '{stage.escalate_on_exhaust}' also failed: " + f"{escalate_result.error_message}. " + f"Original adversarial rounds exhausted: {adversarial_state.current_round}/{adversarial_state.max_rounds}" + ) return escalate_result else: # 返回失败结果,附带审查历史 diff --git a/src/agentkit/orchestrator/pipeline_schema.py b/src/agentkit/orchestrator/pipeline_schema.py index 17419b8..5f3cf0a 100644 --- a/src/agentkit/orchestrator/pipeline_schema.py +++ b/src/agentkit/orchestrator/pipeline_schema.py @@ -1,7 +1,7 @@ """Pipeline 数据模型""" from enum import Enum -from typing import Any +from typing import Any, Literal from pydantic import BaseModel, Field @@ -18,9 +18,9 @@ class StageStatus(str, Enum): class ReviewIssue(BaseModel): """单条审查问题""" - severity: str = Field(description="问题严重程度: critical/major/minor") - category: str = Field(description="问题类别: logic_error/security/style/test_failure/architecture") - description: str = Field(description="问题描述") + severity: Literal["critical", "major", "minor"] = Field(description="问题严重程度") + category: Literal["logic_error", "security", "style", "test_failure", "architecture"] = Field(description="问题类别") + description: str = Field(min_length=1, description="问题描述") location: str | None = Field(default=None, description="文件路径/行号") suggestion: str | None = Field(default=None, description="修复建议") @@ -29,8 +29,8 @@ class ReviewFeedback(BaseModel): """Verifier 返回的结构化审查反馈""" passed: bool = Field(description="是否通过审查") issues: list[ReviewIssue] = Field(default_factory=list, description="问题列表") - summary: str = Field(description="自然语言审查报告") - score: float = Field(description="质量评分 (0-1)") + summary: str = Field(min_length=1, description="自然语言审查报告") + score: float = Field(ge=0.0, le=1.0, description="质量评分 (0-1)") class AdversarialState(BaseModel): @@ -58,7 +58,8 @@ class PipelineStage(BaseModel): # 对抗闭环相关字段 verifier: str | None = Field(default=None, description="Verifier Agent 名称,配置后启用对抗模式") max_adversarial_rounds: int = Field(default=3, description="最大对抗轮次") - feedback_mode: str = Field(default="structured+natural", description="反馈模式: structured+natural / structured / natural") + verifier_timeout_seconds: int = Field(default=120, description="Verifier Agent 独立超时时间(秒),避免与 Worker 共享 timeout_seconds") + feedback_mode: Literal["structured+natural", "structured", "natural"] = Field(default="structured+natural", description="反馈模式") escalate_on_exhaust: str | None = Field(default=None, description="对抗轮次耗尽后的升级目标") model_config = {"arbitrary_types_allowed": True} diff --git a/tests/unit/test_pipeline_adversarial.py b/tests/unit/test_pipeline_adversarial.py index 01769ab..b4c3bb5 100644 --- a/tests/unit/test_pipeline_adversarial.py +++ b/tests/unit/test_pipeline_adversarial.py @@ -29,12 +29,14 @@ class TestPipelineSchemaAdversarial: action="fix_code_issues", verifier="code_reviewer", max_adversarial_rounds=3, + verifier_timeout_seconds=120, feedback_mode="structured+natural", escalate_on_exhaust="human_approval", ) assert stage.verifier == "code_reviewer" assert stage.max_adversarial_rounds == 3 + assert stage.verifier_timeout_seconds == 120 assert stage.feedback_mode == "structured+natural" assert stage.escalate_on_exhaust == "human_approval" @@ -50,6 +52,7 @@ class TestPipelineSchemaAdversarial: assert stage.max_adversarial_rounds == 3 # 默认值 assert stage.feedback_mode == "structured+natural" # 默认值 assert stage.escalate_on_exhaust is None + assert stage.verifier_timeout_seconds == 120 # 默认值 def test_review_feedback_serialization(self): """Happy path: 创建 ReviewFeedback 对象,验证序列化和反序列化正常""" @@ -86,6 +89,60 @@ class TestPipelineSchemaAdversarial: assert len(restored.issues) == 2 assert restored.issues[0].severity == "critical" + def test_review_feedback_score_validation(self): + """Edge case: score 超出 0-1 范围时校验失败""" + import pydantic + + with pytest.raises(pydantic.ValidationError): + ReviewFeedback( + passed=False, + issues=[], + summary="Test", + score=1.5, + ) + + with pytest.raises(pydantic.ValidationError): + ReviewFeedback( + passed=False, + issues=[], + summary="Test", + score=-0.3, + ) + + def test_review_issue_invalid_severity(self): + """Edge case: severity 不在枚举范围内时校验失败""" + import pydantic + + with pytest.raises(pydantic.ValidationError): + ReviewIssue( + severity="invalid", + category="logic_error", + description="Test", + ) + + def test_review_issue_invalid_category(self): + """Edge case: category 不在枚举范围内时校验失败""" + import pydantic + + with pytest.raises(pydantic.ValidationError): + ReviewIssue( + severity="major", + category="invalid", + description="Test", + ) + + def test_feedback_mode_invalid(self): + """Edge case: feedback_mode 不在枚举范围内时校验失败""" + import pydantic + + with pytest.raises(pydantic.ValidationError): + PipelineStage( + name="review", + agent="developer", + action="fix", + feedback_mode="invalid_mode", + ) + def test_adversarial_state_tracking(self): """Happy path: AdversarialState 正确追踪对抗轮次""" state = AdversarialState( @@ -366,3 +423,176 @@ class TestEscalation: assert "Adversarial rounds exhausted" in result.error_message assert "adversarial_metadata" in result.output_data assert result.output_data["adversarial_metadata"]["total_rounds"] == 3 + + @pytest.mark.asyncio + async def test_escalate_to_agent_success(self, engine, started_at): + """Happy path: 配置升级且升级 Agent 成功""" + stage = PipelineStage( + name="review", + agent="developer", + action="fix", + verifier="reviewer", + max_adversarial_rounds=3, + escalate_on_exhaust="human_approval", + ) + + worker_result = StageResult( + stage_name="review", + status=StageStatus.COMPLETED, + output_data={"code": "bad code"}, + ) + + adversarial_state = AdversarialState( + current_round=3, + max_rounds=3, + feedback_history=[ + ReviewFeedback( + passed=False, + issues=[ReviewIssue(severity="major", category="logic_error", description="Bug")], + summary="Failed review", + score=0.3, + ) + ], + ) + + # Mock 升级 Agent 成功 + engine._dispatcher.dispatch = AsyncMock() + engine._dispatcher.get_task_status = AsyncMock(return_value={ + "status": "completed", + "output_data": {"approved": True, "decision": "Accept with modifications"}, + }) + + result = await engine._escalate(stage, worker_result, adversarial_state, started_at) + + assert result.status == StageStatus.COMPLETED + assert "adversarial_metadata" in result.output_data + assert result.output_data["adversarial_metadata"]["escalated_to"] == "human_approval" + + @pytest.mark.asyncio + async def test_escalate_to_agent_failure(self, engine, started_at): + """Error path: 配置升级但升级 Agent 也失败""" + stage = PipelineStage( + name="review", + agent="developer", + action="fix", + verifier="reviewer", + max_adversarial_rounds=3, + escalate_on_exhaust="human_approval", + ) + + worker_result = StageResult( + stage_name="review", + status=StageStatus.COMPLETED, + output_data={"code": "bad code"}, + ) + + adversarial_state = AdversarialState( + current_round=3, + max_rounds=3, + feedback_history=[ + ReviewFeedback( + passed=False, + issues=[ReviewIssue(severity="major", category="logic_error", description="Bug")], + summary="Failed review", + score=0.3, + ) + ], + ) + + # Mock 升级 Agent 失败 + engine._dispatcher.dispatch = AsyncMock() + engine._dispatcher.get_task_status = AsyncMock(return_value={ + "status": "failed", + "error_message": "Human not available", + }) + + result = await engine._escalate(stage, worker_result, adversarial_state, started_at) + + assert result.status == StageStatus.FAILED + assert "Escalation to 'human_approval' also failed" in result.error_message + assert "adversarial_metadata" in result.output_data + + +class TestVerifierFailure: + """测试 Verifier 执行异常和解析失败""" + + @pytest.fixture + def engine(self): + dispatcher = AsyncMock() + return PipelineEngine(dispatcher=dispatcher) + + @pytest.fixture + def saga(self): + return SagaOrchestrator() + + @pytest.fixture + def pipeline_result(self): + return PipelineResult(pipeline_name="test") + + @pytest.mark.asyncio + async def test_verifier_parse_failure_raises_error(self, engine, saga, pipeline_result): + """Error path: Verifier 输出无法解析时抛出异常,而非静默继续""" + stage = PipelineStage( + name="review", + agent="developer_agent", + action="fix", + verifier="code_reviewer", + max_adversarial_rounds=3, + ) + + call_count = 0 + + async def mock_dispatch(task): + pass + + async def mock_get_status(task_id): + nonlocal call_count + call_count += 1 + if call_count % 2 == 1: + # Worker 成功 + return { + "status": "completed", + "output_data": {"code": "some code"}, + } + else: + # Verifier 返回 score 超出范围的数据,触发 Pydantic 校验失败 + return { + "status": "completed", + "output_data": { + "passed": False, + "score": 5.0, # 超出 0-1 范围 + "summary": "Bad score", + "issues": [], + }, + } + + engine._dispatcher.dispatch = AsyncMock(side_effect=mock_dispatch) + engine._dispatcher.get_task_status = AsyncMock(side_effect=mock_get_status) + + result = await engine._execute_stage(stage, pipeline_result, saga) + + # Verifier 解析失败应该导致 FAILED,而非死循环 + assert result.status == StageStatus.FAILED + assert "Verifier failed" in result.error_message + + @pytest.mark.asyncio + async def test_worker_failure_short_circuits(self, engine, saga, pipeline_result): + """Error path: Worker 执行失败时直接返回,不进入 Verifier""" + stage = PipelineStage( + name="review", + agent="developer_agent", + action="fix", + verifier="code_reviewer", + max_adversarial_rounds=3, + ) + + engine._dispatcher.dispatch = AsyncMock() + engine._dispatcher.get_task_status = AsyncMock(return_value={ + "status": "failed", + "error_message": "Worker crashed", + }) + + result = await engine._execute_stage(stage, pipeline_result, saga) + + assert result.status == StageStatus.FAILED + assert "Worker crashed" in (result.error_message or "")