fischer-agentkit/tests/unit/test_skill_pipeline.py

456 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SkillPipeline 单元测试"""
import pytest
from agentkit.skills.pipeline import SkillPipeline
from agentkit.skills.registry import SkillRegistry
# ---- Helpers ----
async def _mock_agent_factory(skill_name: str, input_data: dict) -> dict:
"""Mock agent factory: 返回包含 skill_name 和输入数据的字典"""
return {"skill": skill_name, "processed": True, **input_data}
async def _failing_agent_factory(skill_name: str, input_data: dict) -> dict:
"""Mock agent factory: 特定 skill 抛出异常"""
if skill_name == "failing_skill":
raise RuntimeError("Skill execution failed")
return {"skill": skill_name, "processed": True, **input_data}
async def _transform_agent_factory(skill_name: str, input_data: dict) -> dict:
"""Mock agent factory: 根据技能名做不同转换"""
if skill_name == "extract":
return {"title": input_data.get("raw_text", "").split()[0], "score": 0.9}
if skill_name == "enrich":
return {"title": input_data.get("title", ""), "enriched": True}
if skill_name == "format":
return {"result": f"Formatted: {input_data.get('title', '')}", "enriched": input_data.get("enriched", False)}
return {"skill": skill_name, **input_data}
# ---- SkillPipeline 核心测试 ----
class TestSkillPipelineSequential:
"""顺序执行测试"""
@pytest.mark.asyncio
async def test_sequential_three_skills(self):
"""3 个 Skill 顺序执行,输出在步骤间传递"""
pipeline = SkillPipeline(
name="seq_pipeline",
steps=[
{"skill_name": "skill_a"},
{"skill_name": "skill_b"},
{"skill_name": "skill_c"},
],
)
result = await pipeline.execute(
input_data={"query": "hello"},
agent_factory=_mock_agent_factory,
)
assert result["pipeline"] == "seq_pipeline"
assert len(result["steps"]) == 3
assert result["steps"][0]["status"] == "success"
assert result["steps"][0]["skill"] == "skill_a"
assert result["steps"][1]["status"] == "success"
assert result["steps"][1]["skill"] == "skill_b"
assert result["steps"][2]["status"] == "success"
assert result["steps"][2]["skill"] == "skill_c"
# 验证输出传递:第二步输入包含第一步输出
assert result["steps"][1]["output"]["query"] == "hello"
assert result["steps"][1]["output"]["processed"] is True
@pytest.mark.asyncio
async def test_output_passes_between_steps(self):
"""输出在步骤间正确传递"""
pipeline = SkillPipeline(
name="transform_pipeline",
steps=[
{"skill_name": "extract"},
{"skill_name": "enrich"},
{"skill_name": "format"},
],
)
result = await pipeline.execute(
input_data={"raw_text": "Hello World"},
agent_factory=_transform_agent_factory,
)
# 第一步: extract → {"title": "Hello", "score": 0.9}
assert result["steps"][0]["output"]["title"] == "Hello"
assert result["steps"][0]["output"]["score"] == 0.9
# 第二步: enrich → {"title": "Hello", "enriched": True}
assert result["steps"][1]["output"]["title"] == "Hello"
assert result["steps"][1]["output"]["enriched"] is True
# 第三步: format → {"result": "Formatted: Hello", "enriched": True}
assert result["steps"][2]["output"]["result"] == "Formatted: Hello"
assert result["steps"][2]["output"]["enriched"] is True
# final_output 是最后一步的输出
assert result["final_output"]["result"] == "Formatted: Hello"
class TestSkillPipelineConditional:
"""条件分支测试"""
@pytest.mark.asyncio
async def test_condition_met_executes_step(self):
"""条件满足时执行步骤"""
pipeline = SkillPipeline(
name="cond_pipeline",
steps=[
{"skill_name": "skill_a"},
{"skill_name": "skill_b", "condition": "status == 'ok'"},
],
)
async def factory(name, data):
if name == "skill_a":
return {"status": "ok", "data": "test"}
return {"skill": name, **data}
result = await pipeline.execute(input_data={}, agent_factory=factory)
assert result["steps"][0]["status"] == "success"
assert result["steps"][1]["status"] == "success"
@pytest.mark.asyncio
async def test_condition_not_met_skips_step(self):
"""条件不满足时跳过步骤"""
pipeline = SkillPipeline(
name="cond_pipeline_skip",
steps=[
{"skill_name": "skill_a"},
{"skill_name": "skill_b", "condition": "status == 'ok'"},
],
)
async def factory(name, data):
if name == "skill_a":
return {"status": "error", "data": "test"}
return {"skill": name, **data}
result = await pipeline.execute(input_data={}, agent_factory=factory)
assert result["steps"][0]["status"] == "success"
assert result["steps"][1]["status"] == "skipped"
@pytest.mark.asyncio
async def test_numeric_condition(self):
"""数值条件判断"""
pipeline = SkillPipeline(
name="num_cond_pipeline",
steps=[
{"skill_name": "skill_a"},
{"skill_name": "skill_b", "condition": "score > 0.5"},
],
)
async def factory(name, data):
if name == "skill_a":
return {"score": 0.9}
return {"skill": name, **data}
result = await pipeline.execute(input_data={}, agent_factory=factory)
assert result["steps"][1]["status"] == "success"
@pytest.mark.asyncio
async def test_numeric_condition_not_met(self):
"""数值条件不满足时跳过"""
pipeline = SkillPipeline(
name="num_cond_pipeline_fail",
steps=[
{"skill_name": "skill_a"},
{"skill_name": "skill_b", "condition": "score > 0.5"},
],
)
async def factory(name, data):
if name == "skill_a":
return {"score": 0.3}
return {"skill": name, **data}
result = await pipeline.execute(input_data={}, agent_factory=factory)
assert result["steps"][1]["status"] == "skipped"
class TestSkillPipelineFailure:
"""Pipeline 失败测试"""
@pytest.mark.asyncio
async def test_step_failure_stops_pipeline(self):
"""步骤失败时中止 Pipeline"""
pipeline = SkillPipeline(
name="fail_pipeline",
steps=[
{"skill_name": "skill_a"},
{"skill_name": "failing_skill"},
{"skill_name": "skill_c"},
],
)
result = await pipeline.execute(
input_data={"query": "test"},
agent_factory=_failing_agent_factory,
)
assert len(result["steps"]) == 2
assert result["steps"][0]["status"] == "success"
assert result["steps"][1]["status"] == "failed"
assert result["steps"][1]["skill"] == "failing_skill"
assert "Skill execution failed" in result["steps"][1]["error"]
assert result["success"] is False
assert result["final_output"] is None
@pytest.mark.asyncio
async def test_no_registry_no_factory_marks_step_failed(self):
"""无 registry 也无 factory 时步骤标记为 failed"""
pipeline = SkillPipeline(
name="no_exec_pipeline",
steps=[{"skill_name": "skill_a"}],
)
result = await pipeline.execute(input_data={})
assert len(result["steps"]) == 1
assert result["steps"][0]["status"] == "failed"
assert "no agent_factory or skill_registry" in result["steps"][0]["error"]
assert result["success"] is False
assert result["final_output"] is None
class TestSkillPipelineEmpty:
"""空 Pipeline 测试"""
@pytest.mark.asyncio
async def test_empty_pipeline(self):
"""空步骤列表返回空结果"""
pipeline = SkillPipeline(name="empty_pipeline", steps=[])
result = await pipeline.execute(input_data={"key": "value"})
assert result["pipeline"] == "empty_pipeline"
assert result["steps"] == []
assert result["final_output"] == {"key": "value"}
assert result["success"] is True
class TestSkillPipelineInputMapping:
"""输入映射测试"""
@pytest.mark.asyncio
async def test_input_mapping(self):
"""将上一步输出字段映射到下一步输入字段"""
pipeline = SkillPipeline(
name="mapping_pipeline",
steps=[
{"skill_name": "extract"},
{
"skill_name": "enrich",
"input_mapping": {"title": "title"},
},
],
)
result = await pipeline.execute(
input_data={"raw_text": "Hello World"},
agent_factory=_transform_agent_factory,
)
# 第一步输出 {"title": "Hello", "score": 0.9}
# 映射后第二步输入 {"title": "Hello"}
assert result["steps"][1]["output"]["title"] == "Hello"
assert result["steps"][1]["output"]["enriched"] is True
@pytest.mark.asyncio
async def test_nested_path_mapping(self):
"""嵌套路径映射"""
pipeline = SkillPipeline(
name="nested_mapping",
steps=[
{"skill_name": "skill_a"},
{
"skill_name": "skill_b",
"input_mapping": {"name": "user.name"},
},
],
)
async def factory(name, data):
if name == "skill_a":
return {"user": {"name": "Alice"}, "age": 30}
return {"skill": name, **data}
result = await pipeline.execute(input_data={}, agent_factory=factory)
# 第二步输入应为 {"name": "Alice"}
assert result["steps"][1]["output"]["name"] == "Alice"
@pytest.mark.asyncio
async def test_mapping_missing_field_omitted(self):
"""映射字段不存在时省略该字段"""
pipeline = SkillPipeline(
name="missing_mapping",
steps=[
{"skill_name": "skill_a"},
{
"skill_name": "skill_b",
"input_mapping": {"title": "nonexistent.field"},
},
],
)
async def factory(name, data):
if name == "skill_a":
return {"other": "data"}
return {"skill": name, **data}
result = await pipeline.execute(input_data={}, agent_factory=factory)
# 映射字段不存在,第二步输入为空字典
assert result["steps"][1]["status"] == "success"
class TestSkillPipelineRegistry:
"""SkillPipeline 在 SkillRegistry 中的注册与查询"""
def test_register_pipeline(self):
registry = SkillRegistry()
pipeline = SkillPipeline(name="test_pipe", steps=[{"skill_name": "a"}])
registry.register_pipeline(pipeline)
assert registry.get_pipeline("test_pipe") is pipeline
def test_get_pipeline_not_found(self):
registry = SkillRegistry()
assert registry.get_pipeline("nonexistent") is None
def test_list_pipelines(self):
registry = SkillRegistry()
registry.register_pipeline(SkillPipeline(name="p1", steps=[]))
registry.register_pipeline(SkillPipeline(name="p2", steps=[]))
names = registry.list_pipelines()
assert "p1" in names
assert "p2" in names
def test_list_pipelines_empty(self):
registry = SkillRegistry()
assert registry.list_pipelines() == []
def test_unregister_pipeline(self):
registry = SkillRegistry()
registry.register_pipeline(SkillPipeline(name="p1", steps=[]))
registry.unregister_pipeline("p1")
assert registry.get_pipeline("p1") is None
def test_unregister_pipeline_nonexistent(self):
"""注销不存在的 Pipeline 不抛异常"""
registry = SkillRegistry()
registry.unregister_pipeline("nonexistent")
def test_register_pipeline_overwrites(self):
"""同名 Pipeline 覆盖注册"""
registry = SkillRegistry()
p1 = SkillPipeline(name="dup", steps=[{"skill_name": "a"}])
p2 = SkillPipeline(name="dup", steps=[{"skill_name": "b"}])
registry.register_pipeline(p1)
registry.register_pipeline(p2)
assert registry.get_pipeline("dup") is p2
class TestSkillPipelineAPI:
"""Pipeline API 端点测试"""
@pytest.fixture
def app(self):
from agentkit.server.app import create_app
application = create_app()
return application
@pytest.fixture
async def client(self, app):
from httpx import ASGITransport, AsyncClient
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as c:
yield c
@pytest.mark.asyncio
async def test_create_pipeline(self, client):
response = await client.post(
"/api/v1/skills/pipelines",
json={
"name": "test_pipe",
"steps": [
{"skill_name": "skill_a"},
{"skill_name": "skill_b"},
],
},
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "test_pipe"
assert len(data["steps"]) == 2
@pytest.mark.asyncio
async def test_create_pipeline_missing_skill_name(self, client):
response = await client.post(
"/api/v1/skills/pipelines",
json={
"name": "bad_pipe",
"steps": [{"no_skill_name": "oops"}],
},
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_list_pipelines_empty(self, client):
response = await client.get("/api/v1/skills/pipelines")
assert response.status_code == 200
assert response.json() == []
@pytest.mark.asyncio
async def test_list_pipelines_after_create(self, client):
await client.post(
"/api/v1/skills/pipelines",
json={"name": "pipe1", "steps": [{"skill_name": "a"}]},
)
response = await client.get("/api/v1/skills/pipelines")
assert response.status_code == 200
assert "pipe1" in response.json()
@pytest.mark.asyncio
async def test_execute_pipeline_not_found(self, client):
response = await client.post(
"/api/v1/skills/pipelines/nonexistent/execute",
json={"input_data": {}},
)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_execute_pipeline_no_executor(self, client):
"""Pipeline 存在但 registry 中无 Skill 时步骤标记为 failed"""
await client.post(
"/api/v1/skills/pipelines",
json={"name": "exec_pipe", "steps": [{"skill_name": "missing_skill"}]},
)
response = await client.post(
"/api/v1/skills/pipelines/exec_pipe/execute",
json={"input_data": {"query": "test"}},
)
# Pipeline 执行返回 200但步骤标记为 failed
assert response.status_code == 200
data = response.json()
assert data["steps"][0]["status"] == "failed"