291 lines
9.5 KiB
Python
291 lines
9.5 KiB
Python
"""AuctionHouse 与 WealthTracker 单元测试"""
|
||
|
||
import pytest
|
||
|
||
from agentkit.marketplace.auction import AuctionHouse, AuctionResult, Bid
|
||
from agentkit.marketplace.wealth import WealthTracker
|
||
|
||
|
||
# ---- Fixtures ----
|
||
|
||
|
||
@pytest.fixture
|
||
def wealth_tracker():
|
||
return WealthTracker()
|
||
|
||
|
||
@pytest.fixture
|
||
def auction_house():
|
||
return AuctionHouse()
|
||
|
||
|
||
@pytest.fixture
|
||
def auction_house_with_tracker():
|
||
tracker = WealthTracker()
|
||
return AuctionHouse(wealth_tracker=tracker), tracker
|
||
|
||
|
||
def make_bid(
|
||
agent_name: str = "agent_a",
|
||
architecture: str = "react",
|
||
estimated_steps: int = 5,
|
||
estimated_cost: float = 10.0,
|
||
confidence: float = 0.8,
|
||
payment_offer: float = 1.0,
|
||
capabilities: list[str] | None = None,
|
||
) -> Bid:
|
||
return Bid(
|
||
agent_name=agent_name,
|
||
architecture=architecture,
|
||
estimated_steps=estimated_steps,
|
||
estimated_cost=estimated_cost,
|
||
confidence=confidence,
|
||
payment_offer=payment_offer,
|
||
capabilities=capabilities or [],
|
||
)
|
||
|
||
|
||
# ---- AuctionHouse 测试 ----
|
||
|
||
|
||
class TestAuctionHouseSingleBidder:
|
||
"""单一竞价者自动获胜"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_single_bidder_wins(self, auction_house):
|
||
bid = make_bid(agent_name="solo_agent")
|
||
result = await auction_house.run_auction("do something", [bid])
|
||
assert result.winner is not None
|
||
assert result.winner.agent_name == "solo_agent"
|
||
assert result.total_bidders == 1
|
||
|
||
|
||
class TestAuctionHouseMultipleBidders:
|
||
"""多竞价者,最高分获胜"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_highest_score_wins(self, auction_house):
|
||
bid_low = make_bid(
|
||
agent_name="low_agent",
|
||
confidence=0.5,
|
||
estimated_cost=10.0,
|
||
)
|
||
bid_high = make_bid(
|
||
agent_name="high_agent",
|
||
confidence=0.9,
|
||
estimated_cost=10.0,
|
||
)
|
||
result = await auction_house.run_auction("do something", [bid_low, bid_high])
|
||
assert result.winner is not None
|
||
assert result.winner.agent_name == "high_agent"
|
||
|
||
|
||
class TestAuctionHouseNoBidders:
|
||
"""无竞价者返回 None winner"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_no_bidders_returns_none(self, auction_house):
|
||
result = await auction_house.run_auction("do something", [])
|
||
assert result.winner is None
|
||
assert result.total_bidders == 0
|
||
assert result.all_bids == []
|
||
|
||
|
||
class TestAuctionHouseWealthFactor:
|
||
"""财富因子影响评分"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_wealth_factor_affects_scoring(self):
|
||
tracker = WealthTracker()
|
||
# Give agent_rich more wealth
|
||
tracker.reward("agent_rich", 500.0)
|
||
house = AuctionHouse(wealth_tracker=tracker)
|
||
|
||
# Same confidence and cost, but different wealth
|
||
bid_rich = make_bid(agent_name="agent_rich", confidence=0.8, estimated_cost=10.0)
|
||
bid_poor = make_bid(agent_name="agent_poor", confidence=0.8, estimated_cost=10.0)
|
||
|
||
result = await house.run_auction("do something", [bid_rich, bid_poor])
|
||
assert result.winner is not None
|
||
assert result.winner.agent_name == "agent_rich"
|
||
|
||
|
||
class TestAuctionHouseZeroCost:
|
||
"""零 estimated_cost 处理(max 与 0.001)"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_zero_estimated_cost_handled(self, auction_house):
|
||
bid = make_bid(agent_name="zero_cost_agent", confidence=0.8, estimated_cost=0.0)
|
||
result = await auction_house.run_auction("do something", [bid])
|
||
assert result.winner is not None
|
||
assert result.winner.agent_name == "zero_cost_agent"
|
||
|
||
def test_score_bid_zero_cost(self, auction_house):
|
||
bid = make_bid(agent_name="zero_cost_agent", confidence=0.8, estimated_cost=0.0)
|
||
score = auction_house.score_bid(bid)
|
||
# score = (0.8 / max(0.0, 0.001)) * 1.1 = (0.8 / 0.001) * 1.1 = 880.0
|
||
expected = (0.8 / 0.001) * 1.1
|
||
assert abs(score - expected) < 0.01
|
||
|
||
|
||
class TestBidScoringFormula:
|
||
"""竞价评分公式验证"""
|
||
|
||
def test_score_formula(self):
|
||
tracker = WealthTracker()
|
||
# Default wealth = 100, so wealth_factor = 1.0 + (100 / 1000.0) = 1.1
|
||
house = AuctionHouse(wealth_tracker=tracker)
|
||
|
||
bid = make_bid(agent_name="test_agent", confidence=0.9, estimated_cost=5.0)
|
||
score = house.score_bid(bid)
|
||
|
||
wealth_factor = 1.0 + (100.0 / 1000.0) # 1.1
|
||
expected = (0.9 / 5.0) * wealth_factor
|
||
assert abs(score - expected) < 0.0001
|
||
|
||
def test_score_formula_with_custom_wealth(self):
|
||
tracker = WealthTracker(initial_wealth=200.0)
|
||
tracker.reward("rich_agent", 300.0)
|
||
# wealth = 500, factor = 1.0 + 500/1000 = 1.5
|
||
house = AuctionHouse(wealth_tracker=tracker)
|
||
|
||
bid = make_bid(agent_name="rich_agent", confidence=0.6, estimated_cost=3.0)
|
||
score = house.score_bid(bid)
|
||
|
||
wealth_factor = 1.0 + (500.0 / 1000.0) # 1.5
|
||
expected = (0.6 / 3.0) * wealth_factor
|
||
assert abs(score - expected) < 0.0001
|
||
|
||
|
||
# ---- WealthTracker 测试 ----
|
||
|
||
|
||
class TestWealthTrackerInitialWealth:
|
||
"""初始财富默认值"""
|
||
|
||
def test_default_initial_wealth(self):
|
||
tracker = WealthTracker()
|
||
assert tracker.get_wealth("unknown_agent") == 100.0
|
||
|
||
def test_custom_initial_wealth(self):
|
||
tracker = WealthTracker(initial_wealth=50.0)
|
||
assert tracker.get_wealth("unknown_agent") == 50.0
|
||
|
||
|
||
class TestWealthTrackerReward:
|
||
"""奖励增加财富"""
|
||
|
||
def test_reward_increases_wealth(self, wealth_tracker):
|
||
wealth_tracker.reward("agent_a", 50.0)
|
||
assert wealth_tracker.get_wealth("agent_a") == 150.0
|
||
|
||
def test_reward_multiple_times(self, wealth_tracker):
|
||
wealth_tracker.reward("agent_a", 30.0)
|
||
wealth_tracker.reward("agent_a", 20.0)
|
||
assert wealth_tracker.get_wealth("agent_a") == 150.0
|
||
|
||
|
||
class TestWealthTrackerPenalize:
|
||
"""惩罚减少财富"""
|
||
|
||
def test_penalize_decreases_wealth(self, wealth_tracker):
|
||
wealth_tracker.penalize("agent_a", 30.0)
|
||
assert wealth_tracker.get_wealth("agent_a") == 70.0
|
||
|
||
def test_penalize_below_zero(self, wealth_tracker):
|
||
wealth_tracker.penalize("agent_a", 150.0)
|
||
assert wealth_tracker.get_wealth("agent_a") == -50.0
|
||
|
||
|
||
class TestWealthTrackerBankrupt:
|
||
"""破产检查(wealth <= -100)"""
|
||
|
||
def test_bankrupt_at_negative_100(self, wealth_tracker):
|
||
wealth_tracker.penalize("agent_a", 200.0)
|
||
assert wealth_tracker.get_wealth("agent_a") == -100.0
|
||
assert wealth_tracker.is_bankrupt("agent_a") is True
|
||
|
||
def test_bankrupt_below_negative_100(self, wealth_tracker):
|
||
wealth_tracker.penalize("agent_a", 250.0)
|
||
assert wealth_tracker.is_bankrupt("agent_a") is True
|
||
|
||
def test_not_bankrupt_above_negative_100(self, wealth_tracker):
|
||
wealth_tracker.penalize("agent_a", 150.0)
|
||
# wealth = -50, which is > -100
|
||
assert wealth_tracker.is_bankrupt("agent_a") is False
|
||
|
||
def test_not_bankrupt_at_default(self, wealth_tracker):
|
||
assert wealth_tracker.is_bankrupt("agent_a") is False
|
||
|
||
|
||
class TestWealthTrackerReset:
|
||
"""重置恢复初始财富"""
|
||
|
||
def test_reset_restores_initial_wealth(self, wealth_tracker):
|
||
wealth_tracker.reward("agent_a", 500.0)
|
||
wealth_tracker.reset("agent_a")
|
||
assert wealth_tracker.get_wealth("agent_a") == 100.0
|
||
|
||
def test_reset_with_custom_initial(self):
|
||
tracker = WealthTracker(initial_wealth=200.0)
|
||
tracker.penalize("agent_a", 50.0)
|
||
tracker.reset("agent_a")
|
||
assert tracker.get_wealth("agent_a") == 200.0
|
||
|
||
|
||
class TestWealthTrackerRankings:
|
||
"""排名按财富降序"""
|
||
|
||
def test_rankings_sorted_descending(self, wealth_tracker):
|
||
wealth_tracker.reward("agent_a", 100.0) # 200
|
||
wealth_tracker.reward("agent_b", 300.0) # 400
|
||
wealth_tracker.penalize("agent_c", 50.0) # 50
|
||
|
||
rankings = wealth_tracker.get_rankings()
|
||
assert rankings[0][0] == "agent_b"
|
||
assert rankings[1][0] == "agent_a"
|
||
assert rankings[2][0] == "agent_c"
|
||
|
||
def test_rankings_empty(self, wealth_tracker):
|
||
assert wealth_tracker.get_rankings() == []
|
||
|
||
|
||
class TestWealthTrackerWealthFactor:
|
||
"""财富因子计算"""
|
||
|
||
def test_wealth_factor_default(self, wealth_tracker):
|
||
# wealth = 100, factor = 1.0 + 100/1000 = 1.1
|
||
factor = wealth_tracker.get_wealth_factor("agent_a")
|
||
assert abs(factor - 1.1) < 0.0001
|
||
|
||
def test_wealth_factor_with_wealth(self, wealth_tracker):
|
||
wealth_tracker.reward("agent_a", 400.0) # wealth = 500
|
||
factor = wealth_tracker.get_wealth_factor("agent_a")
|
||
# factor = 1.0 + 500/1000 = 1.5
|
||
assert abs(factor - 1.5) < 0.0001
|
||
|
||
def test_wealth_factor_negative_wealth(self, wealth_tracker):
|
||
wealth_tracker.penalize("agent_a", 150.0) # wealth = -50
|
||
factor = wealth_tracker.get_wealth_factor("agent_a")
|
||
# factor = 1.0 + (-50)/1000 = 0.95
|
||
assert abs(factor - 0.95) < 0.0001
|
||
|
||
|
||
# ---- Auction 默认禁用验证 ----
|
||
|
||
|
||
class TestAuctionDefaultDisabled:
|
||
"""拍卖机制默认禁用"""
|
||
|
||
def test_auction_not_in_default_config(self):
|
||
"""验证默认配置中不包含 auction_enabled"""
|
||
from agentkit.server.config import ServerConfig
|
||
|
||
config = ServerConfig()
|
||
# marketplace section should not exist or auction_enabled should be False
|
||
marketplace_cfg = getattr(config, "marketplace", None)
|
||
if marketplace_cfg is not None:
|
||
auction_enabled = getattr(marketplace_cfg, "auction_enabled", False)
|
||
assert auction_enabled is False
|
||
# If marketplace doesn't exist at all, auction is implicitly disabled
|