fix(pipeline): address code review findings for adversarial loop

Critical:
- C1: Add verifier_timeout_seconds for independent Verifier timeout
- C2: Verifier parse failure now raises RuntimeError instead of returning a synthetic failing review that sends the Worker into futile retry rounds

Major:
- M1: Inject previous_output into Worker retry context
- M2: Add Pydantic ge/le constraint on ReviewFeedback.score
- M3: Use Literal type for feedback_mode enum validation
- M4: Use Literal types for ReviewIssue severity and category
- M5: Merge error messages when escalation agent also fails

Tests: 8 new test cases added (19 total), all passing
This commit is contained in:
chiguyong 2026-06-12 10:02:37 +08:00
parent ddc735b078
commit 8c365486e2
4 changed files with 279 additions and 33 deletions

View File

@ -36,6 +36,7 @@ stages:
depends_on:
- test
max_adversarial_rounds: 3
verifier_timeout_seconds: 120
feedback_mode: "structured+natural"
escalate_on_exhaust: human_approval
inputs:

View File

@ -446,7 +446,6 @@ class PipelineEngine:
resolved_inputs = self._resolve_variables(stage.inputs, pipeline_result.variables)
current_context = resolved_inputs.copy()
last_worker_result: StageResult | None = None
for round_num in range(1, stage.max_adversarial_rounds + 1):
adversarial_state.current_round = round_num
@ -468,8 +467,6 @@ class PipelineEngine:
# Worker 执行失败,直接返回
return worker_result
last_worker_result = worker_result
# 2. 执行 Verifier 审查
try:
verifier_feedback = await self._execute_verifier(
@ -526,12 +523,16 @@ class PipelineEngine:
started_at,
)
# 5. 打回 Worker 重做,附带反馈
# 5. 打回 Worker 重做,附带反馈和前次产出
feedback_context = self._build_feedback_context(
verifier_feedback,
stage.feedback_mode,
)
current_context = {**resolved_inputs, **feedback_context}
current_context = {
**resolved_inputs,
"previous_output": worker_result.output_data,
**feedback_context,
}
# 不应该到达这里,但以防万一
return StageResult(
@ -549,8 +550,19 @@ class PipelineEngine:
input_data: dict[str, Any],
stage: PipelineStage,
started_at: str,
timeout_seconds: int | None = None,
) -> StageResult:
"""执行单个 Agent stage不含对抗逻辑"""
"""执行单个 Agent stage不含对抗逻辑
Args:
agent_name: Agent 名称
action: 执行动作
input_data: 输入数据
stage: 所属 stage
started_at: 开始时间
timeout_seconds: 独立超时时间不传则使用 stage.timeout_seconds
"""
effective_timeout = timeout_seconds if timeout_seconds is not None else stage.timeout_seconds
if self._dispatcher is None:
# Dry-run 模式
return StageResult(
@ -572,14 +584,14 @@ class PipelineEngine:
input_data=input_data,
callback_url=None,
created_at=datetime.now(timezone.utc),
timeout_seconds=stage.timeout_seconds,
timeout_seconds=effective_timeout,
)
async def _dispatch_and_wait() -> StageResult:
"""Dispatch task and wait for result"""
await self._dispatcher.dispatch(task)
for _ in range(stage.timeout_seconds):
for _ in range(effective_timeout):
status = await self._dispatcher.get_task_status(task.task_id)
if status["status"] in ("completed", "failed", "cancelled"):
return StageResult(
@ -595,7 +607,7 @@ class PipelineEngine:
return StageResult(
stage_name=stage.name,
status=StageStatus.FAILED,
error_message=f"Timeout after {stage.timeout_seconds}s",
error_message=f"Timeout after {effective_timeout}s",
started_at=started_at,
completed_at=datetime.now(timezone.utc).isoformat(),
)
@ -639,13 +651,14 @@ class PipelineEngine:
),
}
# 执行 Verifier Agent
# 执行 Verifier Agent(使用独立超时)
verifier_result = await self._execute_agent_stage(
verifier_name,
"review",
verifier_input,
stage,
started_at,
timeout_seconds=stage.verifier_timeout_seconds,
)
if verifier_result.status != StageStatus.COMPLETED:
@ -667,20 +680,12 @@ class PipelineEngine:
)
return feedback
except Exception as e:
# 如果解析失败,创建默认反馈
logger.warning(f"Failed to parse verifier output: {e}")
return ReviewFeedback(
passed=False,
issues=[
ReviewIssue(
severity="major",
category="logic_error",
description=f"Failed to parse verifier output: {e}",
)
],
summary="Verifier output parsing failed",
score=0.0,
)
# 解析失败时直接抛出异常,避免死循环
logger.error(f"Failed to parse verifier output: {e}")
raise RuntimeError(
f"Verifier '{verifier_name}' returned unparseable output: {e}. "
f"Raw output keys: {list(output_data.keys())}"
) from e
def _build_feedback_context(
self,
@ -742,7 +747,8 @@ class PipelineEngine:
"Please regenerate addressing the feedback."
)
else:
# 默认使用 structured+natural
# 未知模式fallback 到 structured+natural
logger.warning(f"Unknown feedback_mode '{feedback_mode}', falling back to structured+natural")
feedback_context["review_feedback"] = {
"summary": feedback.summary,
"issues": issues_list,
@ -750,7 +756,8 @@ class PipelineEngine:
}
feedback_context["instruction"] = (
"Your previous output did not pass review. "
"Please fix the issues listed above and regenerate."
"Please fix the issues listed above and regenerate. "
f"Review summary: {feedback.summary}"
)
return feedback_context
@ -803,6 +810,13 @@ class PipelineEngine:
for i, fb in enumerate(adversarial_state.feedback_history)
],
}
# 如果升级 Agent 也失败了,合并错误信息
if escalate_result.status == StageStatus.FAILED:
escalate_result.error_message = (
f"Escalation to '{stage.escalate_on_exhaust}' also failed: "
f"{escalate_result.error_message}. "
f"Original adversarial rounds exhausted: {adversarial_state.current_round}/{adversarial_state.max_rounds}"
)
return escalate_result
else:
# 返回失败结果,附带审查历史

View File

@ -1,7 +1,7 @@
"""Pipeline 数据模型"""
from enum import Enum
from typing import Any
from typing import Any, Literal
from pydantic import BaseModel, Field
@ -18,9 +18,9 @@ class StageStatus(str, Enum):
class ReviewIssue(BaseModel):
"""单条审查问题"""
severity: str = Field(description="问题严重程度: critical/major/minor")
category: str = Field(description="问题类别: logic_error/security/style/test_failure/architecture")
description: str = Field(description="问题描述")
severity: Literal["critical", "major", "minor"] = Field(description="问题严重程度")
category: Literal["logic_error", "security", "style", "test_failure", "architecture"] = Field(description="问题类别")
description: str = Field(min_length=1, description="问题描述")
location: str | None = Field(default=None, description="文件路径/行号")
suggestion: str | None = Field(default=None, description="修复建议")
@ -29,8 +29,8 @@ class ReviewFeedback(BaseModel):
"""Verifier 返回的结构化审查反馈"""
passed: bool = Field(description="是否通过审查")
issues: list[ReviewIssue] = Field(default_factory=list, description="问题列表")
summary: str = Field(description="自然语言审查报告")
score: float = Field(description="质量评分 (0-1)")
summary: str = Field(min_length=1, description="自然语言审查报告")
score: float = Field(ge=0.0, le=1.0, description="质量评分 (0-1)")
class AdversarialState(BaseModel):
@ -58,7 +58,8 @@ class PipelineStage(BaseModel):
# 对抗闭环相关字段
verifier: str | None = Field(default=None, description="Verifier Agent 名称,配置后启用对抗模式")
max_adversarial_rounds: int = Field(default=3, description="最大对抗轮次")
feedback_mode: str = Field(default="structured+natural", description="反馈模式: structured+natural / structured / natural")
verifier_timeout_seconds: int = Field(default=120, description="Verifier Agent 独立超时时间(秒),避免与 Worker 共享 timeout_seconds")
feedback_mode: Literal["structured+natural", "structured", "natural"] = Field(default="structured+natural", description="反馈模式")
escalate_on_exhaust: str | None = Field(default=None, description="对抗轮次耗尽后的升级目标")
model_config = {"arbitrary_types_allowed": True}

View File

@ -29,12 +29,14 @@ class TestPipelineSchemaAdversarial:
action="fix_code_issues",
verifier="code_reviewer",
max_adversarial_rounds=3,
verifier_timeout_seconds=120,
feedback_mode="structured+natural",
escalate_on_exhaust="human_approval",
)
assert stage.verifier == "code_reviewer"
assert stage.max_adversarial_rounds == 3
assert stage.verifier_timeout_seconds == 120
assert stage.feedback_mode == "structured+natural"
assert stage.escalate_on_exhaust == "human_approval"
@ -50,6 +52,7 @@ class TestPipelineSchemaAdversarial:
assert stage.max_adversarial_rounds == 3 # 默认值
assert stage.feedback_mode == "structured+natural" # 默认值
assert stage.escalate_on_exhaust is None
assert stage.verifier_timeout_seconds == 120 # 默认值
def test_review_feedback_serialization(self):
"""Happy path: 创建 ReviewFeedback 对象,验证序列化和反序列化正常"""
@ -86,6 +89,60 @@ class TestPipelineSchemaAdversarial:
assert len(restored.issues) == 2
assert restored.issues[0].severity == "critical"
def test_review_feedback_score_validation(self):
    """Edge case: a score outside the 0-1 range fails validation."""
    from pydantic import ValidationError

    # Everything except the score is valid, so only the score can be rejected.
    valid_fields = {"passed": False, "issues": [], "summary": "Test"}

    # One value above the upper bound, one below the lower bound.
    for out_of_range in (1.5, -0.3):
        with pytest.raises(ValidationError):
            ReviewFeedback(score=out_of_range, **valid_fields)
def test_review_issue_invalid_severity(self):
    """Edge case: a severity outside the allowed values fails validation."""
    from pydantic import ValidationError

    # Only `severity` is bad; category and description are acceptable.
    issue_fields = {
        "severity": "invalid",
        "category": "logic_error",
        "description": "Test",
    }
    with pytest.raises(ValidationError):
        ReviewIssue(**issue_fields)
def test_review_issue_invalid_category(self):
    """Edge case: a category outside the allowed values fails validation."""
    from pydantic import ValidationError

    # Only `category` is bad; severity and description are acceptable.
    issue_fields = {
        "severity": "major",
        "category": "invalid",
        "description": "Test",
    }
    with pytest.raises(ValidationError):
        ReviewIssue(**issue_fields)
def test_feedback_mode_invalid(self):
    """Edge case: a feedback_mode outside the allowed values fails validation."""
    from pydantic import ValidationError

    # A minimal stage definition whose only problem is the feedback mode.
    stage_fields = {
        "name": "review",
        "agent": "developer",
        "action": "fix",
        "feedback_mode": "invalid_mode",
    }
    with pytest.raises(ValidationError):
        PipelineStage(**stage_fields)
def test_adversarial_state_tracking(self):
"""Happy path: AdversarialState 正确追踪对抗轮次"""
state = AdversarialState(
@ -366,3 +423,176 @@ class TestEscalation:
assert "Adversarial rounds exhausted" in result.error_message
assert "adversarial_metadata" in result.output_data
assert result.output_data["adversarial_metadata"]["total_rounds"] == 3
@pytest.mark.asyncio
async def test_escalate_to_agent_success(self, engine, started_at):
    """Happy path: escalation is configured and the escalation agent succeeds."""
    # Dispatcher stub: every status poll reports the escalation task as completed.
    escalation_status = {
        "status": "completed",
        "output_data": {"approved": True, "decision": "Accept with modifications"},
    }
    engine._dispatcher.dispatch = AsyncMock()
    engine._dispatcher.get_task_status = AsyncMock(return_value=escalation_status)

    # A single failed review, recorded in a state that has used all 3 rounds.
    failed_review = ReviewFeedback(
        passed=False,
        issues=[ReviewIssue(severity="major", category="logic_error", description="Bug")],
        summary="Failed review",
        score=0.3,
    )
    exhausted_state = AdversarialState(
        current_round=3,
        max_rounds=3,
        feedback_history=[failed_review],
    )

    rejected_output = StageResult(
        stage_name="review",
        status=StageStatus.COMPLETED,
        output_data={"code": "bad code"},
    )
    escalating_stage = PipelineStage(
        name="review",
        agent="developer",
        action="fix",
        verifier="reviewer",
        max_adversarial_rounds=3,
        escalate_on_exhaust="human_approval",
    )

    result = await engine._escalate(escalating_stage, rejected_output, exhausted_state, started_at)

    assert result.status == StageStatus.COMPLETED
    assert "adversarial_metadata" in result.output_data
    metadata = result.output_data["adversarial_metadata"]
    assert metadata["escalated_to"] == "human_approval"
@pytest.mark.asyncio
async def test_escalate_to_agent_failure(self, engine, started_at):
    """Error path: escalation is configured but the escalation agent fails too."""
    # Dispatcher stub: every status poll reports the escalation task as failed.
    escalation_status = {
        "status": "failed",
        "error_message": "Human not available",
    }
    engine._dispatcher.dispatch = AsyncMock()
    engine._dispatcher.get_task_status = AsyncMock(return_value=escalation_status)

    # A single failed review, recorded in a state that has used all 3 rounds.
    failed_review = ReviewFeedback(
        passed=False,
        issues=[ReviewIssue(severity="major", category="logic_error", description="Bug")],
        summary="Failed review",
        score=0.3,
    )
    exhausted_state = AdversarialState(
        current_round=3,
        max_rounds=3,
        feedback_history=[failed_review],
    )

    rejected_output = StageResult(
        stage_name="review",
        status=StageStatus.COMPLETED,
        output_data={"code": "bad code"},
    )
    escalating_stage = PipelineStage(
        name="review",
        agent="developer",
        action="fix",
        verifier="reviewer",
        max_adversarial_rounds=3,
        escalate_on_exhaust="human_approval",
    )

    result = await engine._escalate(escalating_stage, rejected_output, exhausted_state, started_at)

    assert result.status == StageStatus.FAILED
    # The error message must say that the escalation itself failed.
    assert "Escalation to 'human_approval' also failed" in result.error_message
    assert "adversarial_metadata" in result.output_data
class TestVerifierFailure:
    """Tests for Verifier execution errors and unparseable Verifier output."""

    @pytest.fixture
    def engine(self):
        """Pipeline engine backed by a fully mocked dispatcher."""
        return PipelineEngine(dispatcher=AsyncMock())

    @pytest.fixture
    def saga(self):
        """Fresh saga orchestrator for each test."""
        return SagaOrchestrator()

    @pytest.fixture
    def pipeline_result(self):
        """Empty pipeline result named "test"."""
        return PipelineResult(pipeline_name="test")

    @pytest.mark.asyncio
    async def test_verifier_parse_failure_raises_error(self, engine, saga, pipeline_result):
        """Error path: unparseable Verifier output fails the stage instead of being silently retried."""
        adversarial_stage = PipelineStage(
            name="review",
            agent="developer_agent",
            action="fix",
            verifier="code_reviewer",
            max_adversarial_rounds=3,
        )

        call_count = 0

        async def mock_dispatch(task):
            return None

        async def mock_get_status(task_id):
            nonlocal call_count
            call_count += 1

            # Even-numbered polls play the Verifier: the score is outside the
            # 0-1 range, which triggers a Pydantic validation failure.
            if call_count % 2 != 1:
                return {
                    "status": "completed",
                    "output_data": {
                        "passed": False,
                        "score": 5.0,
                        "summary": "Bad score",
                        "issues": [],
                    },
                }

            # Odd-numbered polls play the Worker, which succeeds.
            return {
                "status": "completed",
                "output_data": {"code": "some code"},
            }

        engine._dispatcher.dispatch = AsyncMock(side_effect=mock_dispatch)
        engine._dispatcher.get_task_status = AsyncMock(side_effect=mock_get_status)

        result = await engine._execute_stage(adversarial_stage, pipeline_result, saga)

        # A Verifier parse failure must end in FAILED rather than more rounds.
        assert result.status == StageStatus.FAILED
        assert "Verifier failed" in result.error_message

    @pytest.mark.asyncio
    async def test_worker_failure_short_circuits(self, engine, saga, pipeline_result):
        """Error path: a failed Worker run is returned directly, without entering the Verifier."""
        adversarial_stage = PipelineStage(
            name="review",
            agent="developer_agent",
            action="fix",
            verifier="code_reviewer",
            max_adversarial_rounds=3,
        )

        # Every status poll reports a failed task.
        worker_failure = {
            "status": "failed",
            "error_message": "Worker crashed",
        }
        engine._dispatcher.dispatch = AsyncMock()
        engine._dispatcher.get_task_status = AsyncMock(return_value=worker_failure)

        result = await engine._execute_stage(adversarial_stage, pipeline_result, saga)

        assert result.status == StageStatus.FAILED
        assert "Worker crashed" in (result.error_message or "")