"""Unit tests for OrganizationContext and AgentDiscovery."""
|
||
|
||
import pytest
|
||
|
||
from agentkit.org.context import AgentProfile, OrganizationContext
|
||
from agentkit.org.discovery import AgentDiscovery
|
||
from agentkit.skills.base import Skill, SkillConfig
|
||
from agentkit.skills.registry import SkillRegistry
|
||
|
||
|
||
# ---- Fixtures ----
|
||
|
||
|
||
@pytest.fixture
def org_context():
    """Provide a newly constructed OrganizationContext with nothing registered."""
    context = OrganizationContext()
    return context
|
||
|
||
|
||
@pytest.fixture
def profile_rag():
    """Build the ``rag_agent`` profile: react type/mode, gpt-4, rag + search."""
    rag_profile = AgentProfile(
        name="rag_agent",
        agent_type="react",
        capabilities=["rag", "search"],
        skills=["rag_skill"],
        execution_mode="react",
        model="gpt-4",
    )
    return rag_profile
|
||
|
||
|
||
@pytest.fixture
def profile_terminal():
    """Build the ``terminal_agent`` profile: react type/mode, gpt-4, terminal + shell."""
    terminal_profile = AgentProfile(
        name="terminal_agent",
        agent_type="react",
        capabilities=["terminal", "shell"],
        skills=["terminal_skill"],
        execution_mode="react",
        model="gpt-4",
    )
    return terminal_profile
|
||
|
||
|
||
@pytest.fixture
def profile_coder():
    """Build the ``coder_agent`` profile: rewoo type/mode, claude-3, concurrency 3.

    Its capabilities (rag, terminal, code_gen) overlap with both the rag and
    terminal fixtures, which the lookup tests rely on.
    """
    coder_profile = AgentProfile(
        name="coder_agent",
        agent_type="rewoo",
        capabilities=["rag", "terminal", "code_gen"],
        skills=["coder_skill"],
        execution_mode="rewoo",
        model="claude-3",
        max_concurrency=3,
    )
    return coder_profile
|
||
|
||
|
||
# ---- OrganizationContext: 注册与注销 ----
|
||
|
||
|
||
class TestOrganizationContextRegister:
    """Registering, overwriting, unregistering and listing agent profiles."""

    def test_register_agent(self, org_context, profile_rag):
        """A registered profile is handed back as the very same object."""
        org_context.register_agent(profile_rag)
        stored = org_context.get_agent_profile("rag_agent")
        assert stored is profile_rag

    def test_unregister_agent(self, org_context, profile_rag):
        """After unregistering, looking the agent up yields None."""
        org_context.register_agent(profile_rag)
        org_context.unregister_agent("rag_agent")
        lookup = org_context.get_agent_profile("rag_agent")
        assert lookup is None

    def test_unregister_nonexistent_no_error(self, org_context):
        """Unregistering an unknown name must not raise."""
        org_context.unregister_agent("nonexistent")

    def test_register_overwrites_existing(self, org_context, profile_rag):
        """Registering a second profile under the same name replaces the first."""
        org_context.register_agent(profile_rag)
        replacement = AgentProfile(
            name="rag_agent",
            agent_type="react",
            capabilities=["rag", "search", "summarize"],
            skills=["rag_skill"],
        )
        org_context.register_agent(replacement)

        current = org_context.get_agent_profile("rag_agent")
        assert current is replacement
        assert "summarize" in current.capabilities

    def test_list_agents(self, org_context, profile_rag, profile_terminal):
        """list_agents returns every registered profile."""
        for candidate in (profile_rag, profile_terminal):
            org_context.register_agent(candidate)

        listed = org_context.list_agents()
        assert len(listed) == 2
        listed_names = {agent.name for agent in listed}
        assert listed_names == {"rag_agent", "terminal_agent"}

    def test_list_agents_empty(self, org_context):
        """A context with no registrations lists an empty list."""
        listed = org_context.list_agents()
        assert listed == []
|
||
|
||
|
||
# ---- OrganizationContext: 能力查找 ----
|
||
|
||
|
||
class TestOrganizationContextFind:
    """Tests for find_best_agent()."""

    def test_find_by_required_capabilities(self, org_context, profile_rag, profile_terminal):
        """Only the agent that offers the requested capability is selected."""
        for candidate in (profile_rag, profile_terminal):
            org_context.register_agent(candidate)

        best = org_context.find_best_agent(["rag"])
        assert best is not None
        assert best.name == "rag_agent"

    def test_find_exact_capability_match(self, org_context, profile_rag, profile_coder):
        """When several capabilities are required, the agent covering all wins."""
        for candidate in (profile_rag, profile_coder):
            org_context.register_agent(candidate)

        # Both agents offer "rag", but only the coder also offers "terminal".
        required = ["rag", "terminal"]
        best = org_context.find_best_agent(required)
        assert best is not None
        assert best.name == "coder_agent"

    def test_find_no_match_returns_none(self, org_context, profile_rag):
        """An unknown capability yields no agent."""
        org_context.register_agent(profile_rag)
        best = org_context.find_best_agent(["nonexistent_capability"])
        assert best is None

    def test_find_excluded_agents_skipped(self, org_context, profile_rag, profile_coder):
        """Agents named in ``exclude`` are not considered."""
        for candidate in (profile_rag, profile_coder):
            org_context.register_agent(candidate)

        best = org_context.find_best_agent(["rag"], exclude=["coder_agent"])
        assert best is not None
        assert best.name == "rag_agent"

    def test_find_unavailable_agents_skipped(self, org_context, profile_rag, profile_coder):
        """An agent marked unavailable is not returned even if it matches."""
        for candidate in (profile_rag, profile_coder):
            org_context.register_agent(candidate)
        org_context.set_availability("coder_agent", False)

        best = org_context.find_best_agent(["rag", "terminal"])
        # coder is unavailable, and rag_agent lacks the "terminal" capability
        assert best is None

    def test_find_best_agent_with_load_balancing(self, org_context):
        """Between two equally capable agents, the one with lower load is chosen."""
        idle = AgentProfile(
            name="low_load_agent",
            agent_type="react",
            capabilities=["rag"],
            skills=["rag_skill"],
            current_load=0,
        )
        busy = AgentProfile(
            name="high_load_agent",
            agent_type="react",
            capabilities=["rag"],
            skills=["rag_skill"],
            current_load=5,
        )
        org_context.register_agent(idle)
        org_context.register_agent(busy)

        best = org_context.find_best_agent(["rag"])
        assert best is not None
        assert best.name == "low_load_agent"

    def test_find_capability_case_insensitive(self, org_context, profile_rag):
        """An upper-case request ("RAG") still matches the lower-case capability."""
        org_context.register_agent(profile_rag)
        best = org_context.find_best_agent(["RAG"])
        assert best is not None
        assert best.name == "rag_agent"
|
||
|
||
|
||
# ---- OrganizationContext: 负载与可用性 ----
|
||
|
||
|
||
class TestOrganizationContextLoadAvailability:
    """Tests for update_load() and set_availability()."""

    def test_update_load_increase(self, org_context, profile_rag):
        """A positive delta of 3 leaves current_load at 3."""
        org_context.register_agent(profile_rag)
        org_context.update_load("rag_agent", 3)
        tracked = org_context.get_agent_profile("rag_agent")
        assert tracked.current_load == 3

    def test_update_load_decrease(self, org_context, profile_rag):
        """Applying +5 and then -2 leaves current_load at 3."""
        org_context.register_agent(profile_rag)
        for delta in (5, -2):
            org_context.update_load("rag_agent", delta)
        tracked = org_context.get_agent_profile("rag_agent")
        assert tracked.current_load == 3

    def test_update_load_never_below_zero(self, org_context, profile_rag):
        """A large negative delta leaves current_load at 0, not below it."""
        org_context.register_agent(profile_rag)
        org_context.update_load("rag_agent", -10)
        tracked = org_context.get_agent_profile("rag_agent")
        assert tracked.current_load == 0

    def test_update_load_nonexistent_no_error(self, org_context):
        """Updating the load of an unknown agent must not raise."""
        org_context.update_load("nonexistent", 1)

    def test_set_availability(self, org_context, profile_rag):
        """Availability can be switched off and back on again."""
        org_context.register_agent(profile_rag)

        org_context.set_availability("rag_agent", False)
        after_disable = org_context.get_agent_profile("rag_agent")
        assert after_disable.availability is False

        org_context.set_availability("rag_agent", True)
        after_enable = org_context.get_agent_profile("rag_agent")
        assert after_enable.availability is True

    def test_set_availability_nonexistent_no_error(self, org_context):
        """Setting availability for an unknown agent must not raise."""
        org_context.set_availability("nonexistent", False)
|
||
|
||
|
||
# ---- OrganizationContext: from_agent_pool ----
|
||
|
||
|
||
class TestOrganizationContextFromPool:
    """Tests for from_agent_pool()."""

    def test_from_agent_pool_builds_context(self):
        """Build an OrganizationContext from an agent pool plus a SkillRegistry."""
        config = SkillConfig(
            name="my_skill",
            agent_type="react",
            capabilities=["rag", "search"],
            execution_mode="react",
            llm={"model": "gpt-4"},
            max_concurrency=2,
            prompt={"identity": "Test"},
        )
        registry = SkillRegistry()
        registry.register(Skill(config=config))

        # Minimal stand-in for the agent pool: only list_agents() is provided.
        class StubAgentPool:
            def list_agents(self):
                return [{"name": "my_skill", "agent_type": "react"}]

        built = OrganizationContext.from_agent_pool(StubAgentPool(), registry)
        profile = built.get_agent_profile("my_skill")

        assert profile is not None
        assert profile.agent_type == "react"
        assert "rag" in profile.capabilities
        assert "search" in profile.capabilities
        assert profile.execution_mode == "react"
        assert profile.model == "gpt-4"
        assert profile.max_concurrency == 2

    def test_from_agent_pool_none_graceful(self):
        """A None agent_pool or None skill_registry yields an empty context."""
        without_pool = OrganizationContext.from_agent_pool(None, SkillRegistry())
        assert without_pool.list_agents() == []

        class EmptyPool:
            def list_agents(self):
                return []

        without_registry = OrganizationContext.from_agent_pool(EmptyPool(), None)
        assert without_registry.list_agents() == []

    def test_from_agent_pool_agent_not_in_registry(self):
        """An agent missing from the skill registry gets default profile values."""
        registry = SkillRegistry()

        class StubAgentPool:
            def list_agents(self):
                return [{"name": "unknown_agent", "agent_type": "direct"}]

        built = OrganizationContext.from_agent_pool(StubAgentPool(), registry)
        profile = built.get_agent_profile("unknown_agent")

        assert profile is not None
        assert profile.agent_type == "direct"
        assert profile.capabilities == []
        # "react" is the default execution mode expected by this test
        assert profile.execution_mode == "react"
        assert profile.model == "default"
|
||
|
||
|
||
# ---- AgentDiscovery ----
|
||
|
||
|
||
class TestAgentDiscoveryByCapability:
    """Tests for discover_by_capability()."""

    def test_discover_by_capability(self, org_context, profile_rag, profile_coder):
        """Every registered agent offering the capability is returned."""
        for candidate in (profile_rag, profile_coder):
            org_context.register_agent(candidate)

        finder = AgentDiscovery(org_context)
        matches = finder.discover_by_capability(["rag"])
        matched_names = {profile.name for profile in matches}
        assert matched_names == {"rag_agent", "coder_agent"}

    def test_discover_by_capability_no_match(self, org_context, profile_rag):
        """An unknown capability produces an empty list."""
        org_context.register_agent(profile_rag)
        finder = AgentDiscovery(org_context)
        matches = finder.discover_by_capability(["nonexistent"])
        assert matches == []
|
||
|
||
|
||
class TestAgentDiscoveryByMode:
    """Tests for discover_by_execution_mode()."""

    def test_discover_by_execution_mode(self, org_context, profile_rag, profile_coder):
        """Only the agent whose execution mode is "rewoo" is returned."""
        for candidate in (profile_rag, profile_coder):
            org_context.register_agent(candidate)

        finder = AgentDiscovery(org_context)
        matches = finder.discover_by_execution_mode("rewoo")
        assert len(matches) == 1
        only_match = matches[0]
        assert only_match.name == "coder_agent"

    def test_discover_by_execution_mode_no_match(self, org_context, profile_rag):
        """A mode no registered agent uses produces an empty list."""
        org_context.register_agent(profile_rag)
        finder = AgentDiscovery(org_context)
        matches = finder.discover_by_execution_mode("plan_exec")
        assert matches == []
|
||
|
||
|
||
class TestAgentDiscoveryAvailable:
    """Tests for discover_available()."""

    def test_discover_available(self, org_context, profile_rag, profile_coder):
        """Agents marked unavailable are left out of the result."""
        for candidate in (profile_rag, profile_coder):
            org_context.register_agent(candidate)
        org_context.set_availability("coder_agent", False)

        finder = AgentDiscovery(org_context)
        available = finder.discover_available()
        available_names = {profile.name for profile in available}
        assert available_names == {"rag_agent"}
|
||
|
||
|
||
class TestAgentDiscoveryRecommend:
    """Tests for recommend_agent()."""

    def test_recommend_with_preferred_mode(self, org_context, profile_rag, profile_coder):
        """With two capable agents, the one in the preferred mode is recommended."""
        org_context.register_agent(profile_rag)
        org_context.register_agent(profile_coder)
        discovery = AgentDiscovery(org_context)

        result = discovery.recommend_agent(["rag"], preferred_mode="rewoo")
        assert result is not None
        assert result.name == "coder_agent"

    def test_recommend_without_preferred_mode(self, org_context, profile_rag, profile_coder):
        """Without a preferred mode, one of the capable agents is recommended."""
        org_context.register_agent(profile_rag)
        org_context.register_agent(profile_coder)
        discovery = AgentDiscovery(org_context)

        result = discovery.recommend_agent(["rag"])
        assert result is not None
        # Neither fixture sets a load, so the load check alone cannot tell a
        # correct recommendation from a wrong one; also pin the result to the
        # two agents that actually offer "rag".
        assert result.name in {"rag_agent", "coder_agent"}
        # Both have rag, should pick lower load
        assert result.current_load == 0

    def test_recommend_fallback_when_no_capability_match(self, org_context, profile_rag):
        """With no capability match, the test expects the available agent back."""
        org_context.register_agent(profile_rag)
        discovery = AgentDiscovery(org_context)

        result = discovery.recommend_agent(["nonexistent"])
        # Falls back to any available agent
        assert result is not None
        assert result.name == "rag_agent"

    def test_recommend_returns_none_when_no_available(self, org_context, profile_rag):
        """If the only registered agent is unavailable, nothing is recommended."""
        org_context.register_agent(profile_rag)
        org_context.set_availability("rag_agent", False)
        discovery = AgentDiscovery(org_context)

        result = discovery.recommend_agent(["rag"])
        assert result is None

    def test_recommend_preferred_mode_no_match_uses_any_match(self, org_context, profile_rag):
        """A preferred mode nobody has does not block a capability match."""
        org_context.register_agent(profile_rag)
        discovery = AgentDiscovery(org_context)

        # rag_agent runs in react mode, but plan_exec is requested as preferred
        result = discovery.recommend_agent(["rag"], preferred_mode="plan_exec")
        # No plan_exec agent exists, yet the capability match is still returned
        assert result is not None
        assert result.name == "rag_agent"
|