"""Tests for payment order creation, payment callbacks, order queries, and subscription enforcement."""
import uuid
|
|
from unittest.mock import AsyncMock, patch, MagicMock
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from fastapi import Depends, FastAPI
|
|
from httpx import ASGITransport, AsyncClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
from sqlalchemy.pool import StaticPool
|
|
|
|
from app.api.deps import get_current_user, get_db
|
|
from app.database import Base
|
|
from app.main import app
|
|
from app.middleware.subscription_enforcement import SubscriptionEnforcement
|
|
from app.models.payment_order import PaymentOrder as PaymentOrderModel
|
|
from app.models.user import User
|
|
from app.services.auth import hash_password
|
|
from app.services.payment.base import PaymentCallback
|
|
|
|
|
|
def _make_user(
    user_id: str | None = None,
    email: str = "test@example.com",
    plan: str = "free",
) -> User:
    """Build an unsaved, active, email-verified ``User`` for use in tests.

    Args:
        user_id: Primary key to assign; a random UUID string is generated
            when this is falsy.
        email: Email address for the user.
        plan: Plan name stored on the ``plan`` attribute.

    Returns:
        The new ``User`` with ``plan`` and ``max_queries`` set as attributes
        after construction (``max_queries`` is 5 for "free", 50 otherwise).
    """
    account = User(
        id=user_id or str(uuid.uuid4()),
        email=email,
        password=hash_password("Test@123456"),
        firstName="Test",
        lastName="User",
        isActive=True,
        emailVerified=True,
    )
    account.plan = plan
    if plan == "free":
        account.max_queries = 5
    else:
        account.max_queries = 50
    return account
|
|
|
|
|
|
@pytest_asyncio.fixture
async def async_engine():
    """Yield an async engine on in-memory SQLite with the schema created.

    All tables registered on ``Base.metadata`` are created before the engine
    is handed to the test; the engine is disposed once the test is done.
    """
    db_engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    async with db_engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
    yield db_engine
    await db_engine.dispose()
|
|
|
|
|
|
@pytest_asyncio.fixture
async def async_session(async_engine):
    """Yield one ``AsyncSession`` bound to the ``async_engine`` fixture.

    The session is configured with ``expire_on_commit=False`` and
    ``autoflush=False`` and is closed when the context manager exits.
    """
    session_factory = async_sessionmaker(
        async_engine,
        class_=AsyncSession,
        autocommit=False,
        autoflush=False,
        expire_on_commit=False,
    )
    async with session_factory() as db_session:
        yield db_session
|
|
|
|
|
|
@pytest_asyncio.fixture
async def free_user(async_session):
    """Persist a "free"-plan user through ``async_session`` and return it."""
    account = _make_user(plan="free")
    async_session.add(account)
    await async_session.commit()
    # Reload from the database so the returned object reflects the stored row.
    await async_session.refresh(account)
    return account
|
|
|
|
|
|
@pytest_asyncio.fixture
async def pro_user(async_session):
    """Persist a "pro"-plan user (pro@example.com) and return it."""
    account = _make_user(
        user_id=str(uuid.uuid4()),
        email="pro@example.com",
        plan="pro",
    )
    async_session.add(account)
    await async_session.commit()
    # Reload from the database so the returned object reflects the stored row.
    await async_session.refresh(account)
    return account
|
|
|
|
|
|
@pytest_asyncio.fixture
async def enterprise_user(async_session):
    """Persist an "enterprise"-plan user (enterprise@example.com) and return it."""
    account = _make_user(
        user_id=str(uuid.uuid4()),
        email="enterprise@example.com",
        plan="enterprise",
    )
    async_session.add(account)
    await async_session.commit()
    # Reload from the database so the returned object reflects the stored row.
    await async_session.refresh(account)
    return account
|
|
|
|
|
|
@pytest_asyncio.fixture
async def client_with_free_user(async_session, free_user):
    """Yield an HTTP client for ``app`` that is authenticated as ``free_user``.

    ``get_db`` is overridden to yield the test ``async_session`` and
    ``get_current_user`` to return ``free_user``. The overrides live on the
    shared, module-level ``app``, so they are cleared in a ``finally`` block:
    that way they are removed even if opening the client fails before the
    test body ever runs, and cannot leak into later tests.
    """

    async def override_get_db():
        yield async_session

    async def override_get_current_user():
        return free_user

    app.dependency_overrides[get_db] = override_get_db
    app.dependency_overrides[get_current_user] = override_get_current_user

    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            yield client
    finally:
        # Always restore the global app to its un-overridden state.
        app.dependency_overrides.clear()
|
|
|
|
|
|
@pytest_asyncio.fixture
async def client_with_pro_user(async_session, pro_user):
    """Yield an HTTP client for ``app`` that is authenticated as ``pro_user``.

    ``get_db`` is overridden to yield the test ``async_session`` and
    ``get_current_user`` to return ``pro_user``. The overrides live on the
    shared, module-level ``app``, so they are cleared in a ``finally`` block:
    that way they are removed even if opening the client fails before the
    test body ever runs, and cannot leak into later tests.
    """

    async def override_get_db():
        yield async_session

    async def override_get_current_user():
        return pro_user

    app.dependency_overrides[get_db] = override_get_db
    app.dependency_overrides[get_current_user] = override_get_current_user

    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            yield client
    finally:
        # Always restore the global app to its un-overridden state.
        app.dependency_overrides.clear()
|
|
|
|
|
|
@pytest_asyncio.fixture
async def client_with_enterprise_user(async_session, enterprise_user):
    """Yield an HTTP client for ``app`` that is authenticated as ``enterprise_user``.

    ``get_db`` is overridden to yield the test ``async_session`` and
    ``get_current_user`` to return ``enterprise_user``. The overrides live on
    the shared, module-level ``app``, so they are cleared in a ``finally``
    block: that way they are removed even if opening the client fails before
    the test body ever runs, and cannot leak into later tests.
    """

    async def override_get_db():
        yield async_session

    async def override_get_current_user():
        return enterprise_user

    app.dependency_overrides[get_db] = override_get_db
    app.dependency_overrides[get_current_user] = override_get_current_user

    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            yield client
    finally:
        # Always restore the global app to its un-overridden state.
        app.dependency_overrides.clear()
|
|
|
|
|
|
class TestCreatePaymentOrder:
    """Order creation via POST /api/v1/payments/orders as a free-plan user."""

    # Endpoint shared by every test in this class.
    _ORDERS_URL = "/api/v1/payments/orders"

    @pytest.mark.asyncio
    async def test_create_order_returns_201_with_order_id_and_pay_url(self, client_with_free_user):
        """A starter/wechat order is created as pending, priced 199 CNY."""
        body = {"plan": "starter", "payment_provider": "wechat"}
        resp = await client_with_free_user.post(self._ORDERS_URL, json=body)

        assert resp.status_code == 201
        payload = resp.json()
        assert "order_id" in payload
        assert "pay_url" in payload
        assert payload["amount"] == 199
        assert payload["status"] == "pending"
        assert payload["currency"] == "CNY"

    @pytest.mark.asyncio
    async def test_create_order_with_invalid_plan_returns_422(self, client_with_free_user):
        """An unknown plan name is rejected with 422."""
        body = {"plan": "invalid_plan", "payment_provider": "wechat"}
        resp = await client_with_free_user.post(self._ORDERS_URL, json=body)

        assert resp.status_code == 422

    @pytest.mark.asyncio
    async def test_create_order_for_free_plan_returns_400(self, client_with_free_user):
        """Ordering the free plan is rejected with 400."""
        body = {"plan": "free", "payment_provider": "wechat"}
        resp = await client_with_free_user.post(self._ORDERS_URL, json=body)

        assert resp.status_code == 400

    @pytest.mark.asyncio
    async def test_create_order_pro_plan(self, client_with_free_user):
        """A pro/alipay order is created with amount 599."""
        body = {"plan": "pro", "payment_provider": "alipay"}
        resp = await client_with_free_user.post(self._ORDERS_URL, json=body)

        assert resp.status_code == 201
        payload = resp.json()
        assert payload["amount"] == 599

    @pytest.mark.asyncio
    async def test_create_order_enterprise_plan(self, client_with_free_user):
        """An enterprise/wechat order is created with amount 1999."""
        body = {"plan": "enterprise", "payment_provider": "wechat"}
        resp = await client_with_free_user.post(self._ORDERS_URL, json=body)

        assert resp.status_code == 201
        payload = resp.json()
        assert payload["amount"] == 1999
|
|
|
|
|
|
class TestWechatCallback:
    """Processing of a successful WeChat payment callback."""

    @pytest.mark.asyncio
    async def test_wechat_callback_updates_order_and_activates_subscription(
        self, async_session, free_user
    ):
        """A "success" callback marks a pending starter order as paid.

        Seeds a pending order for ``free_user``, feeds a matching
        ``PaymentCallback`` to ``_process_callback`` with provider "wechat",
        and checks the stored order's status and payment id.
        """
        order_id = uuid.uuid4()
        order = PaymentOrderModel(
            id=order_id,
            user_id=free_user.id,
            plan="starter",
            amount=199,
            payment_provider="wechat",
            status="pending",
            pay_url=f"mock://pay/{order_id}",
        )
        async_session.add(order)
        await async_session.commit()

        callback = PaymentCallback(
            order_id=str(order_id),
            payment_id=f"wx_pay_{order_id}",
            amount=199,
            status="success",
            raw_data={"out_trade_no": str(order_id), "result_code": "SUCCESS"},
        )

        # Imported inside the test, as in the original code.
        from app.api.payments import _process_callback

        # The return value is not inspected, so it is not bound to a name.
        await _process_callback(async_session, callback, "wechat")
        await async_session.commit()

        # Re-read the row so the assertions see what was actually persisted.
        await async_session.refresh(order)
        assert order.status == "paid"
        assert order.payment_id is not None
        # NOTE(review): the test name mentions subscription activation, but no
        # subscription/plan state is asserted here — consider adding a check
        # once the expected post-callback state is confirmed.
|
|
|
|
|
|
class TestAlipayCallback:
    """Processing of a successful Alipay payment callback."""

    @pytest.mark.asyncio
    async def test_alipay_callback_updates_order_and_activates_subscription(
        self, async_session, free_user
    ):
        """A "success" callback marks a pending pro order as paid.

        Seeds a pending order for ``free_user``, feeds a matching
        ``PaymentCallback`` to ``_process_callback`` with provider "alipay",
        and checks the stored order's status and payment id.
        """
        order_id = uuid.uuid4()
        order = PaymentOrderModel(
            id=order_id,
            user_id=free_user.id,
            plan="pro",
            amount=599,
            payment_provider="alipay",
            status="pending",
            pay_url=f"mock://pay/{order_id}",
        )
        async_session.add(order)
        await async_session.commit()

        callback = PaymentCallback(
            order_id=str(order_id),
            payment_id=f"ali_pay_{order_id}",
            amount=599,
            status="success",
            raw_data={"out_trade_no": str(order_id), "trade_status": "TRADE_SUCCESS"},
        )

        # Imported inside the test, as in the original code.
        from app.api.payments import _process_callback

        # The return value is not inspected, so it is not bound to a name.
        await _process_callback(async_session, callback, "alipay")
        await async_session.commit()

        # Re-read the row so the assertions see what was actually persisted.
        await async_session.refresh(order)
        assert order.status == "paid"
        assert order.payment_id is not None
        # NOTE(review): the test name mentions subscription activation, but no
        # subscription/plan state is asserted here — consider adding a check
        # once the expected post-callback state is confirmed.
|
|
|
|
|
|
class TestQueryOrder:
    """Order lookup via GET /api/v1/payments/orders/{order_id}."""

    @pytest.mark.asyncio
    async def test_query_order_returns_current_status(self, client_with_free_user, async_session, free_user):
        """A stored pending order is returned with its id, status, plan and amount."""
        order_id = uuid.uuid4()
        pending_order = PaymentOrderModel(
            id=order_id,
            user_id=free_user.id,
            plan="starter",
            amount=199,
            payment_provider="wechat",
            status="pending",
            pay_url=f"mock://pay/{order_id}",
        )
        async_session.add(pending_order)
        await async_session.commit()

        url = f"/api/v1/payments/orders/{order_id}"
        resp = await client_with_free_user.get(url)

        assert resp.status_code == 200
        payload = resp.json()
        assert payload["order_id"] == str(order_id)
        assert payload["status"] == "pending"
        assert payload["plan"] == "starter"
        assert payload["amount"] == 199

    @pytest.mark.asyncio
    async def test_query_nonexistent_order_returns_404(self, client_with_free_user):
        """Looking up a random, never-stored order id yields 404."""
        missing_id = uuid.uuid4()
        url = f"/api/v1/payments/orders/{missing_id}"
        resp = await client_with_free_user.get(url)

        assert resp.status_code == 404
|
|
|
|
|
|
class TestSubscriptionEnforcement:
    """Plan gating through ``SubscriptionEnforcement.require_plan``.

    Each test builds its own throwaway ``FastAPI`` app and client, with
    ``get_current_user`` overridden on that app. The ``client_with_*_user``
    fixtures were previously requested but never used; they only set up a
    database and mutated the unrelated global ``app``, so they are no longer
    requested.
    """

    @pytest.mark.asyncio
    async def test_free_user_accessing_pro_only_endpoint_gets_403(self):
        """A free-plan user is refused with 403 and a detail naming both plans."""
        test_app = FastAPI()

        @test_app.get("/test/pro-only")
        async def pro_only(user: User = Depends(SubscriptionEnforcement.require_plan("pro", "enterprise"))):
            return {"message": "ok"}

        async def override_get_current_user():
            return _make_user(plan="free")

        test_app.dependency_overrides[get_current_user] = override_get_current_user

        transport = ASGITransport(app=test_app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.get("/test/pro-only")

        assert response.status_code == 403
        data = response.json()
        assert data["detail"]["required_plan"] == "pro"
        assert data["detail"]["current_plan"] == "free"

    @pytest.mark.asyncio
    async def test_pro_user_accessing_pro_only_endpoint_succeeds(self):
        """A pro-plan user passes the same gate and receives 200."""
        test_app = FastAPI()

        @test_app.get("/test/pro-only-2")
        async def pro_only(user: User = Depends(SubscriptionEnforcement.require_plan("pro", "enterprise"))):
            return {"message": "ok"}

        async def override_get_current_user():
            return _make_user(plan="pro")

        test_app.dependency_overrides[get_current_user] = override_get_current_user

        transport = ASGITransport(app=test_app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.get("/test/pro-only-2")

        assert response.status_code == 200
|
|
|
|
|
|
class TestQuotaCheck:
    """Quota reporting through ``SubscriptionEnforcement.check_quota``.

    The test builds its own throwaway ``FastAPI`` app and client. The
    ``client_with_free_user`` fixture was previously requested but never
    used; it only set up a database and mutated the unrelated global ``app``,
    so it is no longer requested.
    """

    @pytest.mark.asyncio
    async def test_quota_check_returns_remaining_usage_info(self):
        """The "queries" quota for a free user reports resource, plan and remaining."""
        test_app = FastAPI()

        @test_app.get("/test/quota")
        async def check_quota(quota=Depends(SubscriptionEnforcement.check_quota("queries"))):
            return quota

        async def override_get_current_user():
            return _make_user(plan="free")

        async def override_get_db():
            # No real database: the dependency receives a bare AsyncMock session.
            yield AsyncMock()

        test_app.dependency_overrides[get_current_user] = override_get_current_user
        test_app.dependency_overrides[get_db] = override_get_db

        transport = ASGITransport(app=test_app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.get("/test/quota")

        assert response.status_code == 200
        data = response.json()
        assert data["resource"] == "queries"
        assert data["plan"] == "free"
        assert "remaining" in data
|