"""Tests for PlanChecker — 计划检查与复盘""" from __future__ import annotations import pytest from datetime import datetime, timezone from typing import Any from agentkit.core.plan_checker import ( CheckResult, CheckStatus, PlanChecker, QualityGate, ReviewReport, RuleBasedStepReflector, ) from agentkit.core.plan_executor import PlanExecutionResult, StepExecutionResult from agentkit.core.plan_schema import ExecutionPlan, PlanStep, PlanStepStatus from agentkit.skills.base import QualityGateConfig from agentkit.evolution.experience_store import InMemoryExperienceStore from agentkit.evolution.experience_schema import TaskExperience # --- Helpers --- def make_step( step_id: str = "s0", name: str = "Test Step", description: str = "A test step", **kwargs, ) -> PlanStep: return PlanStep(step_id=step_id, name=name, description=description, **kwargs) def make_step_result( step_id: str = "s0", status: PlanStepStatus = PlanStepStatus.COMPLETED, result: dict[str, Any] | None = None, error: str | None = None, retry_count: int = 0, duration_ms: float = 100.0, ) -> StepExecutionResult: return StepExecutionResult( step_id=step_id, status=status, result=result, error=error, retry_count=retry_count, duration_ms=duration_ms, ) def make_plan_result( plan_id: str = "p1", step_results: dict[str, StepExecutionResult] | None = None, total_duration_ms: float = 500.0, ) -> PlanExecutionResult: from agentkit.core.protocol import TaskStatus if step_results is None: step_results = { "s0": make_step_result(), } return PlanExecutionResult( plan_id=plan_id, step_results=step_results, status=TaskStatus.COMPLETED, total_duration_ms=total_duration_ms, ) def make_plan( steps: list[PlanStep] | None = None, plan_id: str = "p1", goal: str = "test goal", ) -> ExecutionPlan: if steps is None: steps = [make_step()] return ExecutionPlan( plan_id=plan_id, goal=goal, steps=steps, parallel_groups=[[s.step_id for s in steps]], confirmed=True, ) # --- QualityGate Tests --- class TestQualityGate: """QualityGate 规则检查""" def test_pass_when_no_config(self): """无配置时所有结果通过""" gate = QualityGate() step = make_step() result = make_step_result(result={"data": "test"}) check = gate.check(step, result) assert check.status == CheckStatus.PASS def test_pass_with_required_fields_present(self): """必填字段全部存在时通过""" config = QualityGateConfig(required_fields=["name", "value"]) gate = QualityGate(config=config) step = make_step() result = make_step_result(result={"name": "test", "value": 42}) check = gate.check(step, result) assert check.status == CheckStatus.PASS def test_fail_with_missing_required_fields(self): """缺少必填字段时不通过""" config = QualityGateConfig(required_fields=["name", "value", "missing"]) gate = QualityGate(config=config) step = make_step() result = make_step_result(result={"name": "test", "value": 42}) check = gate.check(step, result) assert check.status == CheckStatus.FAIL assert "missing" in check.reason.lower() or "Missing required fields" in check.reason def test_fail_with_none_result_and_required_fields(self): """结果为 None 且有必填字段时不通过""" config = QualityGateConfig(required_fields=["name"]) gate = QualityGate(config=config) step = make_step() result = make_step_result(result=None) check = gate.check(step, result) assert check.status == CheckStatus.FAIL def test_pass_with_min_word_count_met(self): """字数满足最低要求时通过""" config = QualityGateConfig(min_word_count=3) gate = QualityGate(config=config) step = make_step() result = make_step_result(result={"text": "hello world foo"}) check = gate.check(step, result) assert check.status == CheckStatus.PASS def test_fail_with_min_word_count_not_met(self): """字数不满足最低要求时不通过""" config = QualityGateConfig(min_word_count=100) gate = QualityGate(config=config) step = make_step() result = make_step_result(result={"text": "hello"}) check = gate.check(step, result) assert check.status == CheckStatus.FAIL assert "word count" in check.reason.lower() or "Word count" in check.reason def test_skip_for_non_completed_step(self): """非完成步骤跳过检查""" gate = QualityGate() step = make_step() result = make_step_result(status=PlanStepStatus.FAILED, error="some error") check = gate.check(step, result) assert check.status == CheckStatus.SKIP def test_skip_for_skipped_step(self): """跳过的步骤跳过检查""" gate = QualityGate() step = make_step() result = make_step_result(status=PlanStepStatus.SKIPPED, error="skipped") check = gate.check(step, result) assert check.status == CheckStatus.SKIP def test_custom_validator_pass(self): """自定义校验通过""" def validator(result): return (True, "") gate = QualityGate(custom_validator=validator) step = make_step() result = make_step_result(result={"data": "test"}) check = gate.check(step, result) assert check.status == CheckStatus.PASS def test_custom_validator_fail(self): """自定义校验不通过""" def validator(result): return (False, "Output format incorrect") gate = QualityGate(custom_validator=validator) step = make_step() result = make_step_result(result={"data": "test"}) check = gate.check(step, result) assert check.status == CheckStatus.FAIL assert "Output format incorrect" in check.reason def test_custom_validator_exception(self): """自定义校验抛异常时不通过""" def validator(result): raise ValueError("Validator crashed") gate = QualityGate(custom_validator=validator) step = make_step() result = make_step_result(result={"data": "test"}) check = gate.check(step, result) assert check.status == CheckStatus.FAIL assert "error" in check.reason.lower() or "Validator crashed" in check.reason def test_combined_required_fields_and_word_count(self): """同时检查必填字段和字数""" config = QualityGateConfig(required_fields=["report"], min_word_count=5) gate = QualityGate(config=config) step = make_step() # 字数不足 result = make_step_result(result={"report": "hi"}) check = gate.check(step, result) assert check.status == CheckStatus.FAIL # 字数满足 result2 = make_step_result(result={"report": "This is a detailed report content"}) check2 = gate.check(step, result2) assert check2.status == CheckStatus.PASS def test_quality_score_decreases_with_failures(self): """失败项越多质量评分越低""" config = QualityGateConfig(required_fields=["a", "b"], min_word_count=100) gate = QualityGate(config=config) step = make_step() result = make_step_result(result={"a": "x"}) # missing b + word count check = gate.check(step, result) assert check.quality_score < 0.5 # --- RuleBasedStepReflector Tests --- class TestRuleBasedStepReflector: """基于规则的步骤反思器""" @pytest.mark.asyncio async def test_completed_step_score(self): """完成步骤获得合理评分""" reflector = RuleBasedStepReflector() step = make_step() result = make_step_result( result={"data": "test"}, retry_count=0, duration_ms=5000, ) score, suggestions = await reflector.reflect_step(step, result) assert score >= 0.8 assert len(suggestions) == 0 @pytest.mark.asyncio async def test_failed_step_zero_score(self): """失败步骤评分为零""" reflector = RuleBasedStepReflector() step = make_step() result = make_step_result( status=PlanStepStatus.FAILED, error="Something went wrong", ) score, suggestions = await reflector.reflect_step(step, result) assert score == 0.0 assert len(suggestions) > 0 @pytest.mark.asyncio async def test_retry_suggestion(self): """有重试的步骤生成改进建议""" reflector = RuleBasedStepReflector() step = make_step() result = make_step_result( result={"data": "test"}, retry_count=2, ) score, suggestions = await reflector.reflect_step(step, result) assert any("retries" in s.lower() or "retry" in s.lower() for s in suggestions) @pytest.mark.asyncio async def test_slow_step_suggestion(self): """慢步骤生成优化建议""" reflector = RuleBasedStepReflector() step = make_step() result = make_step_result( result={"data": "test"}, duration_ms=120000, # 120s ) score, suggestions = await reflector.reflect_step(step, result) assert any("slow" in s.lower() or "optimizing" in s.lower() for s in suggestions) @pytest.mark.asyncio async def test_timeout_error_suggestion(self): """超时错误生成超时相关建议""" reflector = RuleBasedStepReflector() step = make_step() result = make_step_result( status=PlanStepStatus.FAILED, error="Step timed out after 300s", ) score, suggestions = await reflector.reflect_step(step, result) assert any("timed out" in s.lower() or "timeout" in s.lower() for s in suggestions) # --- PlanChecker.check_step Tests --- class TestPlanCheckerCheckStep: """PlanChecker 单步检查""" @pytest.mark.asyncio async def test_check_step_pass(self): """步骤通过检查""" checker = PlanChecker() step = make_step() result = make_step_result(result={"data": "test"}) check = await checker.check_step(step, result) assert check.status == CheckStatus.PASS assert check.quality_score > 0.5 @pytest.mark.asyncio async def test_check_step_fail_quality_gate(self): """步骤不通过质量门控""" config = QualityGateConfig(required_fields=["missing_field"]) checker = PlanChecker(quality_gate_config=config) step = make_step() result = make_step_result(result={"data": "test"}) check = await checker.check_step(step, result) assert check.status == CheckStatus.FAIL @pytest.mark.asyncio async def test_check_step_skip_for_failed_status(self): """失败步骤跳过检查""" checker = PlanChecker() step = make_step() result = make_step_result(status=PlanStepStatus.FAILED, error="error") check = await checker.check_step(step, result) assert check.status == CheckStatus.SKIP @pytest.mark.asyncio async def test_check_step_records_result(self): """检查结果被记录""" checker = PlanChecker() step = make_step(step_id="s1") result = make_step_result(step_id="s1", result={"data": "test"}) await checker.check_step(step, result) assert "s1" in checker._check_results @pytest.mark.asyncio async def test_check_step_with_step_specific_config(self): """步骤独立质量配置""" step_configs = { "s0": QualityGateConfig(required_fields=["report"]), "s1": QualityGateConfig(required_fields=["analysis"]), } checker = PlanChecker(step_quality_configs=step_configs) # s0 缺少 report step0 = make_step(step_id="s0") result0 = make_step_result(step_id="s0", result={"data": "test"}) check0 = await checker.check_step(step0, result0) assert check0.status == CheckStatus.FAIL # s1 有 analysis step1 = make_step(step_id="s1") result1 = make_step_result(step_id="s1", result={"analysis": "result"}) check1 = await checker.check_step(step1, result1) assert check1.status == CheckStatus.PASS @pytest.mark.asyncio async def test_check_step_with_custom_validator(self): """自定义校验器""" def validator(result): if result and result.get("format") == "json": return (True, "") return (False, "Expected JSON format") checker = PlanChecker(custom_validator=validator) step = make_step() # 格式正确 result_ok = make_step_result(result={"format": "json", "data": {}}) check_ok = await checker.check_step(step, result_ok) assert check_ok.status == CheckStatus.PASS # 格式不正确 result_bad = make_step_result(result={"format": "xml", "data": {}}) check_bad = await checker.check_step(step, result_bad) assert check_bad.status == CheckStatus.FAIL # --- PlanChecker.should_retry / should_request_human Tests --- class TestPlanCheckerRetryAndHuman: """重试与人工介入判断""" def test_should_retry_on_fail_within_limit(self): """检查不通过且重试次数未耗尽时应重试""" checker = PlanChecker(max_check_retries=2) check = CheckResult(step_id="s0", status=CheckStatus.FAIL, reason="quality low") assert checker.should_retry(check, 0) is True assert checker.should_retry(check, 1) is True def test_should_not_retry_on_pass(self): """检查通过时不应重试""" checker = PlanChecker(max_check_retries=2) check = CheckResult(step_id="s0", status=CheckStatus.PASS) assert checker.should_retry(check, 0) is False def test_should_not_retry_on_skip(self): """跳过检查时不应重试""" checker = PlanChecker(max_check_retries=2) check = CheckResult(step_id="s0", status=CheckStatus.SKIP) assert checker.should_retry(check, 0) is False def test_should_not_retry_exhausted(self): """重试次数耗尽时不应重试""" checker = PlanChecker(max_check_retries=1) check = CheckResult(step_id="s0", status=CheckStatus.FAIL, reason="quality low") assert checker.should_retry(check, 1) is False def test_should_request_human_on_exhausted_retries(self): """重试耗尽后应请求人工介入""" checker = PlanChecker(max_check_retries=1) check = CheckResult(step_id="s0", status=CheckStatus.FAIL, reason="quality low") assert checker.should_request_human(check, 1) is True def test_should_not_request_human_on_pass(self): """检查通过时不应请求人工介入""" checker = PlanChecker(max_check_retries=1) check = CheckResult(step_id="s0", status=CheckStatus.PASS) assert checker.should_request_human(check, 0) is False def test_should_not_request_human_within_retries(self): """重试次数未耗尽时不应请求人工介入""" checker = PlanChecker(max_check_retries=2) check = CheckResult(step_id="s0", status=CheckStatus.FAIL, reason="quality low") assert checker.should_request_human(check, 0) is False # --- PlanChecker.review_plan Tests --- class TestPlanCheckerReviewPlan: """复盘报告生成""" @pytest.mark.asyncio async def test_all_steps_pass_review(self): """所有步骤通过检查 → 生成复盘报告""" checker = PlanChecker() step0 = make_step(step_id="s0", name="Search") step1 = make_step(step_id="s1", name="Analyze") plan = make_plan(steps=[step0, step1]) plan_result = make_plan_result( step_results={ "s0": make_step_result(step_id="s0", result={"data": "A"}), "s1": make_step_result(step_id="s1", result={"data": "B"}), }, ) # 先检查每步 await checker.check_step(step0, plan_result.step_results["s0"]) await checker.check_step(step1, plan_result.step_results["s1"]) # 复盘 report = await checker.review_plan(plan, plan_result) assert report.outcome == "success" assert "s0" in report.success_path assert "s1" in report.success_path assert len(report.failure_reasons) == 0 assert report.success_rate == 1.0 @pytest.mark.asyncio async def test_partial_failure_review(self): """部分步骤失败 → 复盘报告包含失败原因""" checker = PlanChecker() step0 = make_step(step_id="s0", name="Search") step1 = make_step(step_id="s1", name="Analyze") plan = make_plan(steps=[step0, step1]) plan_result = make_plan_result( step_results={ "s0": make_step_result(step_id="s0", result={"data": "A"}), "s1": make_step_result( step_id="s1", status=PlanStepStatus.FAILED, error="Agent crashed", ), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) await checker.check_step(step1, plan_result.step_results["s1"]) report = await checker.review_plan(plan, plan_result) assert report.outcome == "partial" assert "s0" in report.success_path assert len(report.failure_reasons) > 0 assert any("s1" in r for r in report.failure_reasons) assert report.success_rate == 0.5 @pytest.mark.asyncio async def test_all_failure_review(self): """全部步骤失败 → 复盘报告 outcome 为 failure""" checker = PlanChecker() step0 = make_step(step_id="s0", name="Search") plan = make_plan(steps=[step0]) plan_result = make_plan_result( step_results={ "s0": make_step_result( step_id="s0", status=PlanStepStatus.FAILED, error="Agent unavailable", ), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) report = await checker.review_plan(plan, plan_result) assert report.outcome == "failure" assert len(report.failure_reasons) > 0 @pytest.mark.asyncio async def test_review_report_contains_duration_distribution(self): """复盘报告包含耗时分布""" checker = PlanChecker() step0 = make_step(step_id="s0") step1 = make_step(step_id="s1") plan = make_plan(steps=[step0, step1]) plan_result = make_plan_result( step_results={ "s0": make_step_result(step_id="s0", result={"data": "A"}, duration_ms=100.0), "s1": make_step_result(step_id="s1", result={"data": "B"}, duration_ms=200.0), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) await checker.check_step(step1, plan_result.step_results["s1"]) report = await checker.review_plan(plan, plan_result) assert "s0" in report.duration_distribution assert "s1" in report.duration_distribution assert report.duration_distribution["s0"] == 100.0 assert report.duration_distribution["s1"] == 200.0 @pytest.mark.asyncio async def test_review_report_contains_quality_scores(self): """复盘报告包含质量评分""" checker = PlanChecker() step0 = make_step(step_id="s0") plan = make_plan(steps=[step0]) plan_result = make_plan_result( step_results={ "s0": make_step_result(step_id="s0", result={"data": "A"}), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) report = await checker.review_plan(plan, plan_result) assert "s0" in report.quality_scores assert report.quality_scores["s0"] > 0 @pytest.mark.asyncio async def test_review_report_contains_optimization_tips(self): """复盘报告包含优化建议""" checker = PlanChecker() step0 = make_step(step_id="s0") plan = make_plan(steps=[step0]) plan_result = make_plan_result( step_results={ "s0": make_step_result( step_id="s0", result={"data": "A"}, retry_count=2, duration_ms=120000, ), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) report = await checker.review_plan(plan, plan_result) assert len(report.optimization_tips) > 0 @pytest.mark.asyncio async def test_review_report_to_dict(self): """复盘报告可序列化为字典""" checker = PlanChecker() step0 = make_step(step_id="s0") plan = make_plan(steps=[step0]) plan_result = make_plan_result( step_results={ "s0": make_step_result(step_id="s0", result={"data": "A"}), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) report = await checker.review_plan(plan, plan_result) d = report.to_dict() assert d["plan_id"] == "p1" assert d["outcome"] == "success" assert isinstance(d["success_path"], list) assert isinstance(d["failure_reasons"], list) assert isinstance(d["optimization_tips"], list) # --- PlanChecker + ExperienceStore Integration Tests --- class TestPlanCheckerExperienceStore: """复盘结果写入经验库""" @pytest.mark.asyncio async def test_experience_written_on_review(self): """复盘结果写入 ExperienceStore""" store = InMemoryExperienceStore() checker = PlanChecker(experience_store=store) step0 = make_step(step_id="s0") plan = make_plan(steps=[step0]) plan_result = make_plan_result( step_results={ "s0": make_step_result(step_id="s0", result={"data": "A"}), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) report = await checker.review_plan( plan, plan_result, task_type="test_task", goal="test goal" ) # 验证经验已写入 results = await store.search("test_task", top_k=10) assert len(results) == 1 assert results[0].outcome == "success" assert results[0].task_type == "test_task" assert results[0].goal == "test goal" @pytest.mark.asyncio async def test_failure_experience_written(self): """失败经验写入后可检索到""" store = InMemoryExperienceStore() checker = PlanChecker(experience_store=store) step0 = make_step(step_id="s0") plan = make_plan(steps=[step0]) plan_result = make_plan_result( step_results={ "s0": make_step_result( step_id="s0", status=PlanStepStatus.FAILED, error="Agent crashed", ), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) report = await checker.review_plan( plan, plan_result, task_type="risky_task", goal="risky goal" ) # 验证失败经验已写入 results = await store.search("risky_task", top_k=10) assert len(results) == 1 assert results[0].outcome == "failure" assert len(results[0].failure_reasons) > 0 @pytest.mark.asyncio async def test_experience_searchable_by_failure_reason(self): """AE3: 错误经验写入后,后续任务能检索到避坑预警""" store = InMemoryExperienceStore() # 第一次:记录失败经验 checker = PlanChecker(experience_store=store) step0 = make_step(step_id="s0") plan = make_plan(steps=[step0]) plan_result = make_plan_result( step_results={ "s0": make_step_result( step_id="s0", status=PlanStepStatus.FAILED, error="Database connection timeout", ), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) await checker.review_plan( plan, plan_result, task_type="db_query", goal="query database" ) # 第二次:搜索相关经验 results = await store.search("database timeout", top_k=5, task_type="db_query") assert len(results) >= 1 assert results[0].outcome == "failure" assert any("timeout" in r.lower() for r in results[0].failure_reasons) @pytest.mark.asyncio async def test_no_experience_store_still_works(self): """无 ExperienceStore 时复盘仍正常工作""" checker = PlanChecker() # 无 experience_store step0 = make_step(step_id="s0") plan = make_plan(steps=[step0]) plan_result = make_plan_result( step_results={ "s0": make_step_result(step_id="s0", result={"data": "A"}), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) report = await checker.review_plan(plan, plan_result) assert report.outcome == "success" assert report.plan_id == "p1" @pytest.mark.asyncio async def test_experience_store_error_does_not_crash(self): """ExperienceStore 写入异常不影响复盘""" class FailingStore: async def record_experience(self, experience): raise RuntimeError("Store is down") checker = PlanChecker(experience_store=FailingStore()) step0 = make_step(step_id="s0") plan = make_plan(steps=[step0]) plan_result = make_plan_result( step_results={ "s0": make_step_result(step_id="s0", result={"data": "A"}), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) # 不应抛出异常 report = await checker.review_plan(plan, plan_result) assert report.outcome == "success" # --- PlanChecker + PlanExecutor Integration Pattern Tests --- class TestPlanCheckerExecutorIntegration: """PlanChecker 与 PlanExecutor 集成模式""" @pytest.mark.asyncio async def test_make_step_complete_callback(self): """make_step_complete_callback 创建的回调正确记录检查结果""" checker = PlanChecker() callback = checker.make_step_complete_callback() step = make_step(step_id="s0") result = make_step_result(step_id="s0", result={"data": "test"}) await callback(step, result) assert "s0" in checker._check_results assert checker._check_results["s0"].status == CheckStatus.PASS @pytest.mark.asyncio async def test_full_check_review_cycle(self): """完整的 检查→复盘→经验写入 闭环""" store = InMemoryExperienceStore() checker = PlanChecker(experience_store=store) # 模拟 3 步计划 step0 = make_step(step_id="s0", name="Search") step1 = make_step(step_id="s1", name="Analyze") step2 = make_step(step_id="s2", name="Report") plan = make_plan(steps=[step0, step1, step2]) plan_result = make_plan_result( step_results={ "s0": make_step_result(step_id="s0", result={"search": "data"}, duration_ms=500), "s1": make_step_result(step_id="s1", result={"analysis": "result"}, duration_ms=1500), "s2": make_step_result(step_id="s2", result={"report": "done"}, duration_ms=800), }, ) # 逐步检查 await checker.check_step(step0, plan_result.step_results["s0"]) await checker.check_step(step1, plan_result.step_results["s1"]) await checker.check_step(step2, plan_result.step_results["s2"]) # 复盘 report = await checker.review_plan( plan, plan_result, task_type="analysis", goal="analyze data" ) # 验证复盘报告 assert report.outcome == "success" assert len(report.success_path) == 3 assert report.success_rate == 1.0 assert len(report.duration_distribution) == 3 # 验证经验已写入 results = await store.search("analysis", top_k=10) assert len(results) == 1 assert results[0].success_rate == 1.0 # --- PlanChecker Reset Tests --- class TestPlanCheckerReset: """重置内部状态""" @pytest.mark.asyncio async def test_reset_clears_check_results(self): """reset 清除检查结果""" checker = PlanChecker() step = make_step(step_id="s0") result = make_step_result(result={"data": "test"}) await checker.check_step(step, result) assert len(checker._check_results) > 0 checker.reset() assert len(checker._check_results) == 0 @pytest.mark.asyncio async def test_reset_allows_new_check_cycle(self): """重置后可开始新一轮检查""" checker = PlanChecker() step = make_step(step_id="s0") # 第一轮 result1 = make_step_result(result={"data": "test1"}) await checker.check_step(step, result1) checker.reset() # 第二轮 result2 = make_step_result(result={"data": "test2"}) check = await checker.check_step(step, result2) assert check.status == CheckStatus.PASS # --- PlanChecker without LLM Tests --- class TestPlanCheckerWithoutLLM: """PlanChecker 无 LLM 回退到规则检查""" @pytest.mark.asyncio async def test_works_without_llm(self): """无 LLM 时使用 RuleBasedStepReflector""" checker = PlanChecker() # 默认使用 RuleBasedStepReflector step = make_step() result = make_step_result(result={"data": "test"}) check = await checker.check_step(step, result) assert check.status == CheckStatus.PASS assert check.quality_score > 0 @pytest.mark.asyncio async def test_custom_reflector(self): """自定义反思器""" class CustomReflector: async def reflect_step(self, step, exec_result): return (0.9, ["Custom suggestion"]) checker = PlanChecker(reflector=CustomReflector()) step = make_step() result = make_step_result(result={"data": "test"}) check = await checker.check_step(step, result) assert check.status == CheckStatus.PASS assert "Custom suggestion" in check.details.get("reflector_suggestions", []) # --- Edge Cases --- class TestPlanCheckerEdgeCases: """边界情况""" @pytest.mark.asyncio async def test_empty_plan_review(self): """空计划复盘""" checker = PlanChecker() plan = make_plan(steps=[]) plan_result = make_plan_result(step_results={}) report = await checker.review_plan(plan, plan_result) assert report.outcome == "success" assert report.success_rate == 0.0 @pytest.mark.asyncio async def test_all_skipped_steps_review(self): """全部跳过步骤的复盘""" checker = PlanChecker() step0 = make_step(step_id="s0") step1 = make_step(step_id="s1") plan = make_plan(steps=[step0, step1]) plan_result = make_plan_result( step_results={ "s0": make_step_result( step_id="s0", status=PlanStepStatus.SKIPPED, error="Dependency failed", ), "s1": make_step_result( step_id="s1", status=PlanStepStatus.SKIPPED, error="Dependency failed", ), }, ) report = await checker.review_plan(plan, plan_result) assert report.outcome == "failure" assert len(report.failure_reasons) > 0 @pytest.mark.asyncio async def test_quality_threshold_triggers_fail(self): """质量评分低于阈值触发不通过""" class LowScoreReflector: async def reflect_step(self, step, exec_result): return (0.2, ["Low quality output"]) checker = PlanChecker( reflector=LowScoreReflector(), quality_threshold=0.5, ) step = make_step() result = make_step_result(result={"data": "test"}) check = await checker.check_step(step, result) # 综合评分 = 0.4 * 1.0 (gate) + 0.6 * 0.2 (reflector) = 0.52 # 如果 reflector 评分很低,可能低于阈值 assert check.quality_score < 1.0 @pytest.mark.asyncio async def test_reflector_exception_handled(self): """Reflector 异常不影响检查""" class CrashingReflector: async def reflect_step(self, step, exec_result): raise RuntimeError("Reflector crashed") checker = PlanChecker(reflector=CrashingReflector()) step = make_step() result = make_step_result(result={"data": "test"}) check = await checker.check_step(step, result) # 应该回退到 gate 的评分 assert check.status in (CheckStatus.PASS, CheckStatus.FAIL) @pytest.mark.asyncio async def test_multiple_quality_failures_in_review(self): """多个步骤质量检查不通过,复盘报告汇总所有原因""" config = QualityGateConfig(required_fields=["report"]) checker = PlanChecker(quality_gate_config=config) step0 = make_step(step_id="s0") step1 = make_step(step_id="s1") plan = make_plan(steps=[step0, step1]) plan_result = make_plan_result( step_results={ "s0": make_step_result(step_id="s0", result={"data": "no report"}), "s1": make_step_result(step_id="s1", result={"data": "also no report"}), }, ) await checker.check_step(step0, plan_result.step_results["s0"]) await checker.check_step(step1, plan_result.step_results["s1"]) report = await checker.review_plan(plan, plan_result) # 质量检查不通过的原因应出现在 failure_reasons 中 quality_fail_reasons = [ r for r in report.failure_reasons if "quality check failed" in r ] assert len(quality_fail_reasons) == 2