331 lines
12 KiB
Python
331 lines
12 KiB
Python
"""Tests for the async recalc pipeline (U3).
|
|
|
|
Requires PostgreSQL — marked ``postgres``. Tests the full pipeline:
|
|
record write → recalc enqueue → worker processing → formula value written.
|
|
|
|
Also covers: crash recovery, deduplication, and error handling.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
|
|
from agentkit.bitable.models import FieldOwner, FieldType, RecalcStatus
|
|
from agentkit.bitable.recalc_worker import RecalcWorker
|
|
from agentkit.bitable.service import BitableService
|
|
|
|
# Module-level pytest mark: tags every test in this file as ``postgres``,
# since the suite needs a PostgreSQL database to run.
pytestmark = pytest.mark.postgres
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helper: process all pending recalc tasks synchronously
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def _process_all_pending(service: BitableService) -> None:
    """Run one batch of pending recalc tasks inline, without a background worker.

    Asks ``service`` for up to 100 pending tasks and awaits
    ``process_recalc_task`` on each, one at a time, in the order returned.
    """
    for pending_task in await service.get_pending_recalc_tasks(limit=100):
        await service.process_recalc_task(pending_task)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Happy path: formula recalc after record write
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def test_recalc_simple_formula_after_create(bitable_service: BitableService) -> None:
    """Creating a record with a source value yields a computed formula value."""
    svc = bitable_service

    table = await svc.create_table(name="T")
    src_field = await svc.create_field(
        table_id=table.id,
        name="src",
        field_type=FieldType.number,
        owner=FieldOwner.agent,
    )
    calc_field = await svc.create_field(
        table_id=table.id,
        name="calc",
        field_type=FieldType.formula,
        config={"formula_expr": f"={{{src_field.id}}} * 2"},
    )

    # The write below is expected to enqueue a recalc for the formula field.
    created = await svc.create_record(table_id=table.id, values={src_field.id: 21})

    # Drain the queue inline rather than waiting on a background worker.
    await _process_all_pending(svc)

    # The formula doubles the source value: 21 * 2.
    refreshed = await svc.get_record(created.id)
    assert refreshed is not None
    assert refreshed.values[calc_field.id] == 42
|
|
|
|
|
|
async def test_recalc_aggregate_formula(bitable_service: BitableService) -> None:
    """A ``SUM`` formula evaluates to the column total on every record."""
    svc = bitable_service

    table = await svc.create_table(name="T")
    src_field = await svc.create_field(
        table_id=table.id,
        name="amt",
        field_type=FieldType.number,
        owner=FieldOwner.agent,
    )
    total_field = await svc.create_field(
        table_id=table.id,
        name="total",
        field_type=FieldType.formula,
        config={"formula_expr": f"=SUM({{{src_field.id}}})"},
    )

    # Three records whose amounts add up to 60.
    amounts = [10, 20, 30]
    for amt in amounts:
        await svc.create_record(table_id=table.id, values={src_field.id: amt})

    # Drain the recalc queue inline.
    await _process_all_pending(svc)

    records, _ = await svc.list_records(table.id)
    assert len(records) == 3

    # Every record carries the same aggregate: the sum over all records.
    totals = [rec.values[total_field.id] for rec in records]
    assert totals == [60] * len(records)
|
|
|
|
|
|
async def test_recalc_after_upsert(bitable_service: BitableService) -> None:
    """An upserted record gets its dependent formula field recalculated."""
    svc = bitable_service

    table = await svc.create_table(name="T")
    pk_field = await svc.create_field(
        table_id=table.id,
        name="id",
        field_type=FieldType.text,
        owner=FieldOwner.agent,
    )
    data_field = await svc.create_field(
        table_id=table.id,
        name="data",
        field_type=FieldType.number,
        owner=FieldOwner.agent,
    )
    calc_field = await svc.create_field(
        table_id=table.id,
        name="doubled",
        field_type=FieldType.formula,
        config={"formula_expr": f"={{{data_field.id}}} * 2"},
    )
    await svc.update_table(table.id, primary_key_field_id=pk_field.id)

    # Upsert a single row keyed on the primary-key field.
    rows = [{pk_field.id: "r1", data_field.id: 15}]
    await svc.upsert_records(table.id, rows, pk_field.id)

    # Drain the recalc queue inline.
    await _process_all_pending(svc)

    # Exactly one record exists and its formula value is 15 * 2.
    records, _ = await svc.list_records(table.id)
    assert len(records) == 1
    sole = records[0]
    assert sole.values[calc_field.id] == 30
|
|
|
|
|
|
async def test_recalc_formula_chain(bitable_service: BitableService) -> None:
    """Chained formulas resolve end to end: b = a*2 and c = b*2, so c = a*4."""
    svc = bitable_service

    table = await svc.create_table(name="T")
    src = await svc.create_field(
        table_id=table.id,
        name="a",
        field_type=FieldType.number,
        owner=FieldOwner.agent,
    )
    mid = await svc.create_field(
        table_id=table.id,
        name="b",
        field_type=FieldType.formula,
        config={"formula_expr": f"={{{src.id}}} * 2"},
    )
    top = await svc.create_field(
        table_id=table.id,
        name="c",
        field_type=FieldType.formula,
        config={"formula_expr": f"={{{mid.id}}} * 2"},
    )

    await svc.create_record(table_id=table.id, values={src.id: 5})

    # NOTE: tasks are handled in queue order rather than topological order,
    # so a formula chain may need two passes: the first computes b, the
    # second computes c (which depends on b).
    for _ in range(2):
        await _process_all_pending(svc)

    records, _ = await svc.list_records(table.id)
    assert len(records) == 1
    chained = records[0]
    assert chained.values[mid.id] == 10  # 5 * 2
    assert chained.values[top.id] == 20  # 10 * 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Crash recovery
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def test_crash_recovery_resets_calculating_tasks(
    bitable_service: BitableService,
) -> None:
    """A task stuck in ``calculating`` is reset to ``pending`` and can then be processed."""
    from agentkit.bitable.repository import BitableRepository

    svc = bitable_service

    table = await svc.create_table(name="T")
    src = await svc.create_field(
        table_id=table.id,
        name="s",
        field_type=FieldType.number,
        owner=FieldOwner.agent,
    )
    calc = await svc.create_field(
        table_id=table.id,
        name="c",
        field_type=FieldType.formula,
        config={"formula_expr": f"={{{src.id}}} + 1"},
    )

    record = await svc.create_record(table_id=table.id, values={src.id: 10})

    # create_record has already queued one recalc task; pick it up.
    queued = await svc.get_pending_recalc_tasks()
    assert len(queued) == 1
    stuck = queued[0]

    # Simulate a crash mid-calculation by forcing the task into 'calculating'.
    repo = BitableRepository(svc._db)
    await repo.update_recalc_status(stuck.id, RecalcStatus.calculating)

    # A calculating task is no longer reported as pending.
    assert len(await svc.get_pending_recalc_tasks()) == 0

    # Recovery resets exactly that one task...
    assert await svc.reset_stale_recalc_tasks() == 1

    # ...which makes it pending again.
    assert len(await svc.get_pending_recalc_tasks()) == 1

    # Processing the recovered task writes the formula value: 10 + 1.
    await _process_all_pending(svc)
    recovered = await svc.get_record(record.id)
    assert recovered is not None
    assert recovered.values[calc.id] == 11
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Deduplication
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def test_recalc_deduplication(bitable_service: BitableService) -> None:
    """Enqueuing the same (record_id, field_id) twice leaves a single queued task."""
    svc = bitable_service

    table = await svc.create_table(name="T")
    src = await svc.create_field(
        table_id=table.id,
        name="s",
        field_type=FieldType.number,
        owner=FieldOwner.agent,
    )
    calc = await svc.create_field(
        table_id=table.id,
        name="c",
        field_type=FieldType.formula,
        config={"formula_expr": f"={{{src.id}}} + 1"},
    )

    # create_record queues the first task for (record, calc).
    record = await svc.create_record(table_id=table.id, values={src.id: 10})

    # A manual second enqueue of the same pair is a duplicate and is
    # expected to return None (ON CONFLICT DO NOTHING).
    duplicate = await svc.trigger_recalc(table.id, record.id, calc.id)
    assert duplicate is None

    # The queue still holds exactly one pending task.
    pending = await svc.get_pending_recalc_tasks()
    assert len(pending) == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Error handling
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def test_recalc_error_marks_task_as_error(bitable_service: BitableService) -> None:
    """A formula that divides by zero leaves its queue row in the error state.

    The queue row for the (record, formula field) pair is read directly with
    SQL and must carry the ``error`` status plus a message that mentions the
    division/zero failure.
    """
    table = await bitable_service.create_table(name="T")
    src = await bitable_service.create_field(
        table_id=table.id, name="s", field_type=FieldType.number, owner=FieldOwner.agent
    )
    calc = await bitable_service.create_field(
        table_id=table.id,
        name="c",
        field_type=FieldType.formula,
        config={"formula_expr": f"={{{src.id}}} / 0"},
    )

    record = await bitable_service.create_record(table_id=table.id, values={src.id: 10})

    # Process recalc — should fail with division by zero
    await _process_all_pending(bitable_service)

    # Read the queue row straight from the database to inspect its status.
    from sqlalchemy import text

    db = bitable_service._db
    async with db.session_factory() as session:
        result = await session.execute(
            text(
                "SELECT status, error_message FROM bitable.bitable_recalc_queue "
                "WHERE record_id = :rid AND field_id = :fid"
            ),
            {"rid": record.id, "fid": calc.id},
        )
        row = result.fetchone()

    assert row is not None
    status, message = row[0], row[1]
    assert status == RecalcStatus.error.value
    # Check for a missing message explicitly: calling .lower() on a NULL
    # column would raise AttributeError and hide what actually went wrong.
    assert message is not None, "errored recalc task has no error_message"
    lowered = message.lower()
    assert "division" in lowered or "zero" in lowered
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Worker lifecycle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def test_worker_starts_and_stops(bitable_service: BitableService) -> None:
    """RecalcWorker starts and stops gracefully.

    After ``start()`` both ``_task`` and ``_reaper_task`` must be set; after
    ``stop()`` both must be cleared again.
    """
    worker = RecalcWorker(bitable_service._db, bitable_service, poll_interval=0.1)
    await worker.start()
    try:
        assert worker._task is not None
        assert worker._reaper_task is not None

        # Let it run briefly
        await asyncio.sleep(0.2)
    finally:
        # Always stop the worker, even when an assertion above fails, so its
        # background tasks do not leak into subsequent tests.
        await worker.stop()

    assert worker._task is None
    assert worker._reaper_task is None
|
|
|
|
|
|
async def test_worker_processes_tasks(bitable_service: BitableService) -> None:
    """Background worker picks up and processes recalc tasks.

    Rather than sleeping for a fixed interval, the test polls the record until
    the formula value appears or a deadline passes. The final assertions then
    report a clear failure if the worker never produced the value.
    """
    table = await bitable_service.create_table(name="T")
    src = await bitable_service.create_field(
        table_id=table.id, name="s", field_type=FieldType.number, owner=FieldOwner.agent
    )
    calc = await bitable_service.create_field(
        table_id=table.id,
        name="c",
        field_type=FieldType.formula,
        config={"formula_expr": f"={{{src.id}}} + 100"},
    )

    record = await bitable_service.create_record(table_id=table.id, values={src.id: 5})

    # Start worker — it should pick up the pending task
    worker = RecalcWorker(bitable_service._db, bitable_service, poll_interval=0.1)
    await worker.start()
    try:
        # Poll with a deadline: returns as soon as the value is written, and
        # tolerates a slow machine better than a fixed one-second sleep.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 5.0
        while loop.time() < deadline:
            current = await bitable_service.get_record(record.id)
            if (
                current is not None
                and calc.id in current.values
                and current.values[calc.id] == 105
            ):
                break
            await asyncio.sleep(0.05)
    finally:
        # Stop the worker even if the wait is interrupted, so no background
        # task outlives this test.
        await worker.stop()

    # Verify formula was computed
    rec = await bitable_service.get_record(record.id)
    assert rec is not None
    assert rec.values[calc.id] == 105
|