696 lines
25 KiB
Python
696 lines
25 KiB
Python
"""AuctionHouse 与 WealthTracker 单元测试"""
|
||
|
||
import threading
|
||
|
||
import pytest
|
||
|
||
from agentkit.marketplace.auction import AuctionHouse, AuctionResult, Bid
|
||
from agentkit.marketplace.wealth import WealthTracker
|
||
|
||
|
||
# ---- Fixtures ----
|
||
|
||
|
||
@pytest.fixture
def wealth_tracker():
    """Provide a fresh WealthTracker built with its default arguments."""
    fresh_tracker = WealthTracker()
    return fresh_tracker
|
||
|
||
|
||
@pytest.fixture
def auction_house():
    """Provide an AuctionHouse constructed without an explicit wealth tracker."""
    default_house = AuctionHouse()
    return default_house
|
||
|
||
|
||
@pytest.fixture
def auction_house_with_tracker():
    """Provide an (AuctionHouse, WealthTracker) pair sharing one tracker instance."""
    shared_tracker = WealthTracker()
    house = AuctionHouse(wealth_tracker=shared_tracker)
    return house, shared_tracker
|
||
|
||
|
||
def make_bid(
    agent_name: str = "agent_a",
    architecture: str = "react",
    estimated_steps: int = 5,
    estimated_cost: float = 10.0,
    confidence: float = 0.8,
    payment_offer: float = 1.0,
    capabilities: list[str] | None = None,
) -> Bid:
    """Build a Bid for tests, filling every field the caller omits with a default.

    A missing (or empty) ``capabilities`` argument becomes a new empty list.
    """
    bid_fields = {
        "agent_name": agent_name,
        "architecture": architecture,
        "estimated_steps": estimated_steps,
        "estimated_cost": estimated_cost,
        "confidence": confidence,
        "payment_offer": payment_offer,
        "capabilities": capabilities if capabilities else [],
    }
    return Bid(**bid_fields)
|
||
|
||
|
||
# ---- AuctionHouse 测试 ----
|
||
|
||
|
||
class TestAuctionHouseSingleBidder:
    """A single bidder wins automatically."""

    @pytest.mark.asyncio
    async def test_single_bidder_wins(self, auction_house):
        """With exactly one bid submitted, that bidder is reported as the winner."""
        only_bid = make_bid(agent_name="solo_agent")
        outcome = await auction_house.run_auction("do something", [only_bid])
        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "solo_agent"
        assert outcome.total_bidders == 1
|
||
|
||
|
||
class TestAuctionHouseMultipleBidders:
    """With several bidders, the highest-scoring bid wins."""

    @pytest.mark.asyncio
    async def test_highest_score_wins(self, auction_house):
        """Equal cost but different confidence: the more confident bid should win."""
        weaker = make_bid(agent_name="low_agent", confidence=0.5, estimated_cost=10.0)
        stronger = make_bid(agent_name="high_agent", confidence=0.9, estimated_cost=10.0)
        contenders = [weaker, stronger]
        outcome = await auction_house.run_auction("do something", contenders)
        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "high_agent"
|
||
|
||
|
||
class TestAuctionHouseNoBidders:
    """An auction without bidders yields a None winner."""

    @pytest.mark.asyncio
    async def test_no_bidders_returns_none(self, auction_house):
        """An empty bid list produces no winner, zero bidders and no recorded bids."""
        no_bids = []
        outcome = await auction_house.run_auction("do something", no_bids)
        assert outcome.winner is None
        assert outcome.total_bidders == 0
        assert outcome.all_bids == []
|
||
|
||
|
||
class TestAuctionHouseWealthFactor:
    """The wealth factor influences bid scoring."""

    @pytest.mark.asyncio
    async def test_wealth_factor_affects_scoring(self):
        """Two otherwise identical bids: the wealthier agent is expected to win."""
        ledger = WealthTracker()
        # Only agent_rich receives extra wealth; agent_poor stays at the default.
        ledger.reward("agent_rich", 500.0)
        house = AuctionHouse(wealth_tracker=ledger)

        # Confidence and cost are equal, so wealth is the only differentiator.
        shared_terms = {"confidence": 0.8, "estimated_cost": 10.0}
        rich_bid = make_bid(agent_name="agent_rich", **shared_terms)
        poor_bid = make_bid(agent_name="agent_poor", **shared_terms)

        outcome = await house.run_auction("do something", [rich_bid, poor_bid])
        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "agent_rich"
|
||
|
||
|
||
class TestAuctionHouseZeroCost:
    """Handling of a zero estimated_cost (max with 0.001)."""

    @pytest.mark.asyncio
    async def test_zero_estimated_cost_handled(self, auction_house):
        """A bid with zero estimated cost can still run through an auction and win."""
        free_bid = make_bid(agent_name="zero_cost_agent", confidence=0.8, estimated_cost=0.0)
        outcome = await auction_house.run_auction("do something", [free_bid])
        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "zero_cost_agent"

    def test_score_bid_zero_cost(self, auction_house):
        """score_bid is expected to use 0.001 as the divisor when the cost is zero."""
        free_bid = make_bid(agent_name="zero_cost_agent", confidence=0.8, estimated_cost=0.0)
        # Expected: (0.8 / max(0.0, 0.001)) * 1.1 = (0.8 / 0.001) * 1.1 = 880.0
        expected = (0.8 / 0.001) * 1.1
        actual = auction_house.score_bid(free_bid)
        assert abs(actual - expected) < 0.01
|
||
|
||
|
||
class TestBidScoringFormula:
    """Verification of the bid scoring formula."""

    def test_score_formula(self):
        """Score for a default-wealth agent matches (confidence / cost) * wealth_factor."""
        # Default wealth is 100, so the expected factor is 1.0 + (100 / 1000.0) = 1.1
        wealth_factor = 1.0 + (100.0 / 1000.0)
        expected = (0.9 / 5.0) * wealth_factor

        house = AuctionHouse(wealth_tracker=WealthTracker())
        candidate = make_bid(agent_name="test_agent", confidence=0.9, estimated_cost=5.0)
        actual = house.score_bid(candidate)

        assert abs(actual - expected) < 0.0001

    def test_score_formula_with_custom_wealth(self):
        """Score reflects a larger wealth factor once the agent has been rewarded."""
        ledger = WealthTracker(initial_wealth=200.0)
        ledger.reward("rich_agent", 300.0)
        # Wealth is now 500, so the expected factor is 1.0 + 500/1000 = 1.5
        wealth_factor = 1.0 + (500.0 / 1000.0)
        expected = (0.6 / 3.0) * wealth_factor

        house = AuctionHouse(wealth_tracker=ledger)
        candidate = make_bid(agent_name="rich_agent", confidence=0.6, estimated_cost=3.0)
        actual = house.score_bid(candidate)

        assert abs(actual - expected) < 0.0001
|
||
|
||
|
||
# ---- WealthTracker 测试 ----
|
||
|
||
|
||
class TestWealthTrackerInitialWealth:
    """Default value of the initial wealth."""

    def test_default_initial_wealth(self):
        """An agent never seen before reports 100.0 on a default tracker."""
        ledger = WealthTracker()
        balance = ledger.get_wealth("unknown_agent")
        assert balance == 100.0

    def test_custom_initial_wealth(self):
        """An agent never seen before reports the configured initial_wealth."""
        ledger = WealthTracker(initial_wealth=50.0)
        balance = ledger.get_wealth("unknown_agent")
        assert balance == 50.0
|
||
|
||
|
||
class TestWealthTrackerReward:
    """Rewards increase wealth."""

    def test_reward_increases_wealth(self, wealth_tracker):
        """A single reward of 50 lifts the default 100 to 150."""
        wealth_tracker.reward("agent_a", 50.0)
        balance = wealth_tracker.get_wealth("agent_a")
        assert balance == 150.0

    def test_reward_multiple_times(self, wealth_tracker):
        """Successive rewards (30 then 20) accumulate on the same agent."""
        for amount in (30.0, 20.0):
            wealth_tracker.reward("agent_a", amount)
        balance = wealth_tracker.get_wealth("agent_a")
        assert balance == 150.0
|
||
|
||
|
||
class TestWealthTrackerPenalize:
    """Penalties decrease wealth."""

    def test_penalize_decreases_wealth(self, wealth_tracker):
        """A penalty of 30 lowers the default 100 to 70."""
        wealth_tracker.penalize("agent_a", 30.0)
        balance = wealth_tracker.get_wealth("agent_a")
        assert balance == 70.0

    def test_penalize_below_zero(self, wealth_tracker):
        """A penalty larger than the balance is expected to leave negative wealth."""
        wealth_tracker.penalize("agent_a", 150.0)
        balance = wealth_tracker.get_wealth("agent_a")
        assert balance == -50.0
|
||
|
||
|
||
class TestWealthTrackerBankrupt:
    """Bankruptcy check (wealth <= -100)."""

    def test_bankrupt_at_negative_100(self, wealth_tracker):
        """Exactly -100 counts as bankrupt."""
        wealth_tracker.penalize("agent_a", 200.0)
        balance = wealth_tracker.get_wealth("agent_a")
        assert balance == -100.0
        assert wealth_tracker.is_bankrupt("agent_a") is True

    def test_bankrupt_below_negative_100(self, wealth_tracker):
        """Anything under -100 counts as bankrupt."""
        wealth_tracker.penalize("agent_a", 250.0)
        bankrupt = wealth_tracker.is_bankrupt("agent_a")
        assert bankrupt is True

    def test_not_bankrupt_above_negative_100(self, wealth_tracker):
        """A negative balance above -100 is not bankrupt."""
        # 100 - 150 = -50, which is still greater than -100.
        wealth_tracker.penalize("agent_a", 150.0)
        bankrupt = wealth_tracker.is_bankrupt("agent_a")
        assert bankrupt is False

    def test_not_bankrupt_at_default(self, wealth_tracker):
        """An untouched agent is not bankrupt."""
        bankrupt = wealth_tracker.is_bankrupt("agent_a")
        assert bankrupt is False
|
||
|
||
|
||
class TestWealthTrackerReset:
    """Reset restores the initial wealth."""

    def test_reset_restores_initial_wealth(self, wealth_tracker):
        """After a reward and a reset, the agent is back at the default 100."""
        wealth_tracker.reward("agent_a", 500.0)
        wealth_tracker.reset("agent_a")
        balance = wealth_tracker.get_wealth("agent_a")
        assert balance == 100.0

    def test_reset_with_custom_initial(self):
        """After a penalty and a reset, the agent is back at the custom initial wealth."""
        ledger = WealthTracker(initial_wealth=200.0)
        ledger.penalize("agent_a", 50.0)
        ledger.reset("agent_a")
        balance = ledger.get_wealth("agent_a")
        assert balance == 200.0
|
||
|
||
|
||
class TestWealthTrackerRankings:
    """Rankings are ordered by wealth, descending."""

    def test_rankings_sorted_descending(self, wealth_tracker):
        """The first element of each ranking entry is the agent name, richest first."""
        wealth_tracker.reward("agent_a", 100.0)  # expected wealth: 200
        wealth_tracker.reward("agent_b", 300.0)  # expected wealth: 400
        wealth_tracker.penalize("agent_c", 50.0)  # expected wealth: 50

        rankings = wealth_tracker.get_rankings()
        expected_order = ["agent_b", "agent_a", "agent_c"]
        for position, expected_name in enumerate(expected_order):
            assert rankings[position][0] == expected_name

    def test_rankings_empty(self, wealth_tracker):
        """A tracker that has seen no agents returns an empty ranking list."""
        rankings = wealth_tracker.get_rankings()
        assert rankings == []
|
||
|
||
|
||
class TestWealthTrackerWealthFactor:
    """Wealth factor computation."""

    def test_wealth_factor_default(self, wealth_tracker):
        """Default wealth 100 is expected to give 1.0 + 100/1000 = 1.1."""
        actual = wealth_tracker.get_wealth_factor("agent_a")
        assert abs(actual - 1.1) < 0.0001

    def test_wealth_factor_with_wealth(self, wealth_tracker):
        """Wealth 500 is expected to give 1.0 + 500/1000 = 1.5."""
        wealth_tracker.reward("agent_a", 400.0)  # 100 + 400 = 500
        actual = wealth_tracker.get_wealth_factor("agent_a")
        assert abs(actual - 1.5) < 0.0001

    def test_wealth_factor_negative_wealth(self, wealth_tracker):
        """Wealth -50 is expected to give 1.0 + (-50)/1000 = 0.95."""
        wealth_tracker.penalize("agent_a", 150.0)  # 100 - 150 = -50
        actual = wealth_tracker.get_wealth_factor("agent_a")
        assert abs(actual - 0.95) < 0.0001
|
||
|
||
|
||
# ---- Auction 默认禁用验证 ----
|
||
|
||
|
||
class TestAuctionDefaultDisabled:
    """The auction mechanism is disabled by default."""

    def test_auction_not_in_default_config(self):
        """Verify that a default ServerConfig does not have auction_enabled switched on."""
        from agentkit.server.config import ServerConfig

        default_config = ServerConfig()
        # A missing marketplace section, or a section without auction_enabled,
        # both resolve to False here: the auction is then implicitly disabled.
        marketplace_section = getattr(default_config, "marketplace", None)
        enabled_flag = getattr(marketplace_section, "auction_enabled", False)
        assert enabled_flag is False
|
||
|
||
|
||
# ---- Vickrey Auction 测试 ----
|
||
|
||
|
||
class TestVickreySingleBidder:
    """Vickrey auction: a single bidder wins and pays its own cost (zero profit)."""

    @pytest.mark.asyncio
    async def test_single_bidder_wins_pays_own_cost(self):
        """The lone bidder wins and its tracked wealth stays at the default."""
        ledger = WealthTracker()
        house = AuctionHouse(wealth_tracker=ledger)
        only_bid = make_bid(agent_name="solo_agent", estimated_cost=5.0)

        outcome = await house.run_vickrey_auction("task", [only_bid])

        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "solo_agent"
        assert outcome.total_bidders == 1
        # Expected: payment equals own cost, so profit is 0 and wealth is unchanged.
        assert ledger.get_wealth("solo_agent") == 100.0

    @pytest.mark.asyncio
    async def test_single_bidder_selection_reason(self):
        """The selection reason mentions that there was a sole eligible bidder."""
        house = AuctionHouse()
        only_bid = make_bid(agent_name="solo_agent", estimated_cost=10.0)

        outcome = await house.run_vickrey_auction("task", [only_bid])

        reason = outcome.selection_reason
        assert "sole eligible bidder" in reason
|
||
|
||
|
||
class TestVickreyTwoBidders:
    """Vickrey auction: two bidders, lowest cost wins, paid the second-lowest price."""

    @pytest.mark.asyncio
    async def test_lowest_cost_wins_pays_second(self):
        """The cheaper bid wins and earns the gap to the second-lowest cost."""
        ledger = WealthTracker()
        house = AuctionHouse(wealth_tracker=ledger)
        cheap = make_bid(agent_name="cheap_agent", estimated_cost=5.0)
        pricey = make_bid(agent_name="expensive_agent", estimated_cost=10.0)

        outcome = await house.run_vickrey_auction("task", [cheap, pricey])

        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "cheap_agent"
        # Expected: payment is the second-lowest cost 10.0, profit = 10.0 - 5.0 = 5.0
        assert ledger.get_wealth("cheap_agent") == 100.0 + 5.0

    @pytest.mark.asyncio
    async def test_second_price_not_own_price(self):
        """The winner's profit is based on the other bid's cost; the loser is untouched."""
        ledger = WealthTracker()
        house = AuctionHouse(wealth_tracker=ledger)
        low = make_bid(agent_name="agent_a", estimated_cost=3.0)
        high = make_bid(agent_name="agent_b", estimated_cost=7.0)

        await house.run_vickrey_auction("task", [low, high])

        # Expected: agent_a wins at price 7.0 (not 3.0), profit = 7.0 - 3.0 = 4.0
        assert ledger.get_wealth("agent_a") == 100.0 + 4.0
        # The losing bidder's wealth does not change.
        assert ledger.get_wealth("agent_b") == 100.0

    @pytest.mark.asyncio
    async def test_selection_reason_contains_second_price(self):
        """The selection reason names both the second price and the second bidder."""
        house = AuctionHouse()
        low = make_bid(agent_name="agent_a", estimated_cost=3.0)
        high = make_bid(agent_name="agent_b", estimated_cost=7.0)

        outcome = await house.run_vickrey_auction("task", [low, high])

        reason = outcome.selection_reason
        assert "7.0" in reason
        assert "agent_b" in reason
|
||
|
||
|
||
class TestVickreyThreeBidders:
    """Vickrey auction: three bidders, lowest cost wins, paid the second-lowest price."""

    @pytest.mark.asyncio
    async def test_lowest_wins_pays_second_lowest(self):
        """The cheapest of three wins; the most expensive bid does not set the price."""
        ledger = WealthTracker()
        house = AuctionHouse(wealth_tracker=ledger)
        entries = [
            make_bid(agent_name="agent_a", estimated_cost=3.0),
            make_bid(agent_name="agent_b", estimated_cost=7.0),
            make_bid(agent_name="agent_c", estimated_cost=12.0),
        ]

        outcome = await house.run_vickrey_auction("task", entries)

        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "agent_a"
        # Expected: payment is the second-lowest cost 7.0, profit = 7.0 - 3.0 = 4.0
        assert ledger.get_wealth("agent_a") == 100.0 + 4.0
        # All three bids are counted even though the third does not affect payment.
        assert outcome.total_bidders == 3

    @pytest.mark.asyncio
    async def test_middle_bidder_wins_when_cheapest_bankrupt(self):
        """With the cheapest agent bankrupt, the next-cheapest bid wins instead."""
        ledger = WealthTracker()
        # Push agent_a into bankruptcy before the auction is set up.
        ledger.penalize("agent_a", 300.0)
        assert ledger.is_bankrupt("agent_a") is True
        house = AuctionHouse(wealth_tracker=ledger)
        entries = [
            make_bid(agent_name="agent_a", estimated_cost=3.0),
            make_bid(agent_name="agent_b", estimated_cost=7.0),
            make_bid(agent_name="agent_c", estimated_cost=12.0),
        ]

        outcome = await house.run_vickrey_auction("task", entries)

        # Expected: agent_a is excluded, so agent_b wins at price 12.0.
        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "agent_b"
        # Expected profit = 12.0 - 7.0 = 5.0
        assert ledger.get_wealth("agent_b") == 100.0 + 5.0
|
||
|
||
|
||
class TestVickreyCapabilityFiltering:
    """Vickrey auction: capability filtering."""

    @staticmethod
    def _mixed_capability_bids():
        """Return three fresh bids (a, b, c) with overlapping capability sets."""
        return [
            make_bid(agent_name="a", capabilities=["search", "analysis"]),
            make_bid(agent_name="b", capabilities=["search"]),
            make_bid(agent_name="c", capabilities=["analysis", "coding"]),
        ]

    def test_filter_by_capabilities_basic(self):
        """Requiring "search" keeps exactly the two bids that list it."""
        house = AuctionHouse()
        kept = house.filter_by_capabilities(self._mixed_capability_bids(), ["search"])
        assert len(kept) == 2
        kept_names = {entry.agent_name for entry in kept}
        assert kept_names == {"a", "b"}

    def test_filter_by_capabilities_requires_all(self):
        """Requiring two capabilities keeps only the bid that lists both."""
        house = AuctionHouse()
        required = ["search", "analysis"]
        kept = house.filter_by_capabilities(self._mixed_capability_bids(), required)
        assert len(kept) == 1
        assert kept[0].agent_name == "a"

    def test_filter_by_capabilities_case_insensitive(self):
        """Capitalised capabilities on the bid still match lower-case requirements."""
        house = AuctionHouse()
        capitalised = [make_bid(agent_name="a", capabilities=["Search", "Analysis"])]
        kept = house.filter_by_capabilities(capitalised, ["search", "analysis"])
        assert len(kept) == 1

    def test_filter_by_capabilities_no_match(self):
        """A requirement no bid satisfies leaves nothing."""
        house = AuctionHouse()
        search_only = [make_bid(agent_name="a", capabilities=["search"])]
        kept = house.filter_by_capabilities(search_only, ["coding"])
        assert len(kept) == 0

    def test_filter_by_capabilities_empty_requirements(self):
        """An empty requirement list keeps the bid."""
        house = AuctionHouse()
        search_only = [make_bid(agent_name="a", capabilities=["search"])]
        kept = house.filter_by_capabilities(search_only, [])
        assert len(kept) == 1

    @pytest.mark.asyncio
    async def test_vickrey_with_capability_filtering(self):
        """The cheapest bid overall loses when it lacks a required capability."""
        ledger = WealthTracker()
        house = AuctionHouse(wealth_tracker=ledger)
        both = ["search", "analysis"]
        entries = [
            make_bid(agent_name="a", estimated_cost=5.0, capabilities=["search", "analysis"]),
            make_bid(agent_name="b", estimated_cost=3.0, capabilities=["search"]),
            make_bid(agent_name="c", estimated_cost=8.0, capabilities=["search", "analysis"]),
        ]

        # Requiring both capabilities should leave only a and c eligible.
        outcome = await house.run_vickrey_auction("task", entries, required_capabilities=both)

        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "a"
        # Expected: a (cost 5.0) is paid the second-lowest eligible cost, c's 8.0.
        assert ledger.get_wealth("a") == 100.0 + (8.0 - 5.0)
|
||
|
||
|
||
class TestVickreyBankruptAgent:
    """Vickrey auction: bankrupt agents are excluded."""

    @pytest.mark.asyncio
    async def test_bankrupt_agent_excluded(self):
        """A bankrupt agent's cheaper bid is ignored in favour of the solvent one."""
        ledger = WealthTracker()
        ledger.penalize("bankrupt_agent", 300.0)  # 100 - 300 = -200, i.e. bankrupt
        house = AuctionHouse(wealth_tracker=ledger)
        insolvent_bid = make_bid(agent_name="bankrupt_agent", estimated_cost=1.0)
        solvent_bid = make_bid(agent_name="ok_agent", estimated_cost=10.0)

        outcome = await house.run_vickrey_auction("task", [insolvent_bid, solvent_bid])

        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "ok_agent"
        # Expected: a single eligible bidder pays its own cost, so profit is 0.
        assert ledger.get_wealth("ok_agent") == 100.0

    @pytest.mark.asyncio
    async def test_all_bankrupt_returns_none(self):
        """When every bidder is bankrupt there is no winner and the reason says why."""
        ledger = WealthTracker()
        for name in ("a", "b"):
            ledger.penalize(name, 300.0)
        house = AuctionHouse(wealth_tracker=ledger)
        entries = [
            make_bid(agent_name="a", estimated_cost=1.0),
            make_bid(agent_name="b", estimated_cost=2.0),
        ]

        outcome = await house.run_vickrey_auction("task", entries)

        assert outcome.winner is None
        assert "bankrupt" in outcome.selection_reason.lower() or "eligible" in outcome.selection_reason.lower()
|
||
|
||
|
||
class TestVickreyNoBidders:
    """Vickrey auction: no bidders."""

    @pytest.mark.asyncio
    async def test_no_bidders_returns_none(self):
        """An empty bid list produces no winner, zero bidders and no recorded bids."""
        house = AuctionHouse()
        no_bids = []
        outcome = await house.run_vickrey_auction("task", no_bids)
        assert outcome.winner is None
        assert outcome.total_bidders == 0
        assert outcome.all_bids == []

    @pytest.mark.asyncio
    async def test_no_eligible_after_capability_filter(self):
        """A bid filtered out by capabilities is still counted, but nobody wins."""
        house = AuctionHouse()
        search_bid = make_bid(agent_name="a", estimated_cost=5.0, capabilities=["search"])
        outcome = await house.run_vickrey_auction(
            "task", [search_bid], required_capabilities=["coding"]
        )
        assert outcome.winner is None
        assert outcome.total_bidders == 1
|
||
|
||
|
||
class TestVickreyWealthTrackerUpdate:
    """Vickrey auction: the WealthTracker is updated correctly."""

    @pytest.mark.asyncio
    async def test_winner_earns_payment_minus_cost(self):
        """The winner gains (second price - own cost); the loser's wealth is unchanged."""
        ledger = WealthTracker(initial_wealth=50.0)
        house = AuctionHouse(wealth_tracker=ledger)
        entries = [
            make_bid(agent_name="a", estimated_cost=4.0),
            make_bid(agent_name="b", estimated_cost=9.0),
        ]

        await house.run_vickrey_auction("task", entries)

        # Expected: a wins at price 9.0, profit = 9.0 - 4.0 = 5.0
        assert ledger.get_wealth("a") == 50.0 + 5.0
        # b lost, so its wealth stays at the initial value.
        assert ledger.get_wealth("b") == 50.0

    @pytest.mark.asyncio
    async def test_single_bidder_zero_profit(self):
        """A lone bidder pays its own cost, so its wealth does not move."""
        ledger = WealthTracker(initial_wealth=100.0)
        house = AuctionHouse(wealth_tracker=ledger)
        only_bid = make_bid(agent_name="a", estimated_cost=10.0)

        await house.run_vickrey_auction("task", [only_bid])

        assert ledger.get_wealth("a") == 100.0
|
||
|
||
|
||
class TestVickreyBackwardCompat:
    """Vickrey auction: the pre-existing score_bid method still works."""

    def test_score_bid_still_works(self):
        """score_bid still matches (confidence / cost) * wealth_factor at default wealth."""
        wealth_factor = 1.0 + (100.0 / 1000.0)
        expected = (0.9 / 5.0) * wealth_factor

        house = AuctionHouse(wealth_tracker=WealthTracker())
        candidate = make_bid(agent_name="test_agent", confidence=0.9, estimated_cost=5.0)
        actual = house.score_bid(candidate)

        assert abs(actual - expected) < 0.0001

    @pytest.mark.asyncio
    async def test_run_auction_still_works(self):
        """run_auction still picks the higher-confidence bid at equal cost."""
        house = AuctionHouse()
        weaker = make_bid(agent_name="low_agent", confidence=0.5, estimated_cost=10.0)
        stronger = make_bid(agent_name="high_agent", confidence=0.9, estimated_cost=10.0)

        outcome = await house.run_auction("do something", [weaker, stronger])

        winner = outcome.winner
        assert winner is not None
        assert winner.agent_name == "high_agent"
|
||
|
||
|
||
# ---- Bid Validation 测试 ----
|
||
|
||
|
||
class TestBidValidation:
    """Bid __post_init__ validation."""

    @staticmethod
    def _bid(estimated_cost=10.0, confidence=0.8, payment_offer=1.0):
        """Construct a Bid directly; only the three validated fields vary per test."""
        return Bid(
            agent_name="a",
            architecture="react",
            estimated_steps=5,
            estimated_cost=estimated_cost,
            confidence=confidence,
            payment_offer=payment_offer,
            capabilities=[],
        )

    def test_negative_estimated_cost_raises(self):
        """A negative estimated_cost is rejected with ValueError."""
        with pytest.raises(ValueError, match="estimated_cost must be non-negative"):
            self._bid(estimated_cost=-1.0)

    def test_confidence_above_one_raises(self):
        """A confidence greater than 1 is rejected with ValueError."""
        with pytest.raises(ValueError, match="confidence must be between"):
            self._bid(confidence=1.5)

    def test_confidence_below_zero_raises(self):
        """A confidence less than 0 is rejected with ValueError."""
        with pytest.raises(ValueError, match="confidence must be between"):
            self._bid(confidence=-0.1)

    def test_negative_payment_offer_raises(self):
        """A negative payment_offer is rejected with ValueError."""
        with pytest.raises(ValueError, match="payment_offer must be non-negative"):
            self._bid(payment_offer=-5.0)

    def test_zero_values_allowed(self):
        """Zero is accepted for cost, confidence and payment offer alike."""
        zeroed = self._bid(estimated_cost=0.0, confidence=0.0, payment_offer=0.0)
        assert zeroed.estimated_cost == 0.0
        assert zeroed.confidence == 0.0
        assert zeroed.payment_offer == 0.0

    def test_boundary_confidence_one_allowed(self):
        """The upper boundary confidence of exactly 1.0 is accepted."""
        certain = self._bid(confidence=1.0)
        assert certain.confidence == 1.0
|
||
|
||
|
||
# ---- WealthTracker Thread Safety 测试 ----
|
||
|
||
|
||
class TestWealthTrackerThreadSafety:
    """WealthTracker thread safety."""

    def test_concurrent_reward_penalize(self):
        """Concurrent rewards and penalties end at the arithmetically expected totals."""
        tracker = WealthTracker(initial_wealth=1000.0)
        errors: list[Exception] = []

        def worker(action: str, name: str, amount: float, count: int):
            # Collect failures rather than letting them die silently with the thread.
            try:
                remaining = count
                while remaining > 0:
                    if action != "reward":
                        tracker.penalize(name, amount)
                    else:
                        tracker.reward(name, amount)
                    remaining -= 1
            except Exception as e:
                errors.append(e)

        jobs = (
            ("reward", "agent_a", 1.0, 100),
            ("penalize", "agent_a", 1.0, 100),
            ("reward", "agent_b", 2.0, 50),
        )
        threads = [threading.Thread(target=worker, args=job) for job in jobs]
        # Start every thread before joining any, so the workers can overlap.
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        assert not errors
        # agent_a: 1000 + 100*1.0 - 100*1.0 = 1000
        assert tracker.get_wealth("agent_a") == 1000.0
        # agent_b: 1000 + 50*2.0 = 1100
        assert tracker.get_wealth("agent_b") == 1100.0
|
||
|
||
|
||
# ---- Wealth Factor Lower Bound 测试 ----
|
||
|
||
|
||
class TestWealthFactorLowerBound:
    """Lower-bound protection of get_wealth_factor."""

    def test_extremely_negative_wealth_clamped(self):
        """A very negative wealth is expected to clamp the factor at 0.01."""
        ledger = WealthTracker(initial_wealth=100.0)
        ledger.penalize("agent_a", 5000.0)
        # Wealth is 100 - 5000 = -4900; unclamped that would be 1.0 + (-4900/1000) = -3.9,
        # so the expected result is the floor: max(0.01, -3.9) = 0.01.
        actual = ledger.get_wealth_factor("agent_a")
        assert actual == 0.01

    def test_slightly_negative_wealth_not_clamped(self):
        """A mildly negative wealth stays above the floor and is not clamped."""
        ledger = WealthTracker(initial_wealth=100.0)
        ledger.penalize("agent_a", 150.0)
        # Wealth is -50, so the expected factor is 1.0 + (-50/1000) = 0.95.
        actual = ledger.get_wealth_factor("agent_a")
        assert abs(actual - 0.95) < 0.0001
|