755 lines
25 KiB
Python
755 lines
25 KiB
Python
"""Tests for Workflow API routes"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
from agentkit.llm.gateway import LLMGateway
|
|
from agentkit.server.app import create_app
|
|
from agentkit.server.routes.workflows import WorkflowStore, router as workflow_router
|
|
from agentkit.skills.registry import SkillRegistry
|
|
from agentkit.tools.registry import ToolRegistry
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_llm_gateway():
|
|
return LLMGateway()
|
|
|
|
|
|
@pytest.fixture
|
|
def skill_registry():
|
|
return SkillRegistry()
|
|
|
|
|
|
@pytest.fixture
|
|
def tool_registry():
|
|
return ToolRegistry()
|
|
|
|
|
|
@pytest.fixture
|
|
def app(mock_llm_gateway, skill_registry, tool_registry):
|
|
application = create_app(
|
|
llm_gateway=mock_llm_gateway,
|
|
skill_registry=skill_registry,
|
|
tool_registry=tool_registry,
|
|
)
|
|
# Register workflow routes (not yet in app.py)
|
|
application.include_router(workflow_router, prefix="/api/v1")
|
|
# Attach a fresh WorkflowStore to app state for isolation
|
|
application.state.workflow_store = WorkflowStore()
|
|
return application
|
|
|
|
|
|
@pytest.fixture
|
|
def client(app):
|
|
return TestClient(app)
|
|
|
|
|
|
def _create_workflow(client, name: str = "测试工作流", stages: list | None = None):
|
|
"""Helper to create a workflow via API."""
|
|
body: dict = {"name": name}
|
|
if stages is not None:
|
|
body["stages"] = stages
|
|
response = client.post("/api/v1/workflows", json=body)
|
|
return response
|
|
|
|
|
|
def _sample_stages():
|
|
"""Return a list of sample workflow stages."""
|
|
return [
|
|
{
|
|
"name": "开始",
|
|
"agent": "default",
|
|
"action": "start",
|
|
"depends_on": [],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
{
|
|
"name": "检查条件",
|
|
"agent": "default",
|
|
"action": "evaluate",
|
|
"depends_on": ["开始"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 60,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "condition",
|
|
"config": {"expression": "status == 'approved'"},
|
|
},
|
|
{
|
|
"name": "人工审批",
|
|
"agent": "default",
|
|
"action": "approve",
|
|
"depends_on": ["检查条件"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 3600,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "approval",
|
|
"config": {"approver": "admin", "timeout": 3600},
|
|
},
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WorkflowStore unit tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestWorkflowStore:
|
|
def test_save_and_get(self):
|
|
from agentkit.orchestrator.workflow_schema import WorkflowDefinition
|
|
|
|
store = WorkflowStore()
|
|
wf = WorkflowDefinition(workflow_id="test-1", name="Test")
|
|
store.save(wf)
|
|
result = store.get("test-1")
|
|
assert result is not None
|
|
assert result.name == "Test"
|
|
|
|
def test_get_not_found(self):
|
|
store = WorkflowStore()
|
|
assert store.get("nonexistent") is None
|
|
|
|
def test_list(self):
|
|
from agentkit.orchestrator.workflow_schema import WorkflowDefinition
|
|
|
|
store = WorkflowStore()
|
|
for i in range(3):
|
|
store.save(WorkflowDefinition(workflow_id=f"wf-{i}", name=f"Workflow {i}"))
|
|
summaries = store.list()
|
|
assert len(summaries) == 3
|
|
|
|
def test_list_limit(self):
|
|
from agentkit.orchestrator.workflow_schema import WorkflowDefinition
|
|
|
|
store = WorkflowStore()
|
|
for i in range(5):
|
|
store.save(WorkflowDefinition(workflow_id=f"wf-{i}", name=f"Workflow {i}"))
|
|
summaries = store.list(limit=2)
|
|
assert len(summaries) == 2
|
|
|
|
def test_delete(self):
|
|
from agentkit.orchestrator.workflow_schema import WorkflowDefinition
|
|
|
|
store = WorkflowStore()
|
|
store.save(WorkflowDefinition(workflow_id="del-1", name="Delete Me"))
|
|
assert store.delete("del-1") is True
|
|
assert store.get("del-1") is None
|
|
|
|
def test_delete_not_found(self):
|
|
store = WorkflowStore()
|
|
assert store.delete("nonexistent") is False
|
|
|
|
def test_create_and_get_execution(self):
|
|
store = WorkflowStore()
|
|
execution = store.create_execution("wf-1")
|
|
assert execution.workflow_id == "wf-1"
|
|
assert execution.status == "pending"
|
|
|
|
fetched = store.get_execution(execution.execution_id)
|
|
assert fetched is not None
|
|
assert fetched.execution_id == execution.execution_id
|
|
|
|
def test_get_execution_not_found(self):
|
|
store = WorkflowStore()
|
|
assert store.get_execution("nonexistent") is None
|
|
|
|
def test_update_execution(self):
|
|
store = WorkflowStore()
|
|
execution = store.create_execution("wf-1")
|
|
updated = store.update_execution(
|
|
execution.execution_id, status="running", current_stage="step-1"
|
|
)
|
|
assert updated.status == "running"
|
|
assert updated.current_stage == "step-1"
|
|
|
|
def test_update_execution_not_found(self):
|
|
store = WorkflowStore()
|
|
with pytest.raises(KeyError):
|
|
store.update_execution("nonexistent", status="running")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /workflows - Create
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCreateWorkflow:
|
|
def test_create_empty_workflow(self, client):
|
|
response = _create_workflow(client)
|
|
assert response.status_code == 201
|
|
data = response.json()
|
|
assert data["name"] == "测试工作流"
|
|
assert data["workflow_id"] is not None
|
|
assert data["version"] == 1
|
|
|
|
def test_create_workflow_with_stages(self, client):
|
|
response = _create_workflow(client, stages=_sample_stages())
|
|
assert response.status_code == 201
|
|
data = response.json()
|
|
assert len(data["stages"]) == 3
|
|
assert data["stages"][0]["type"] == "skill"
|
|
assert data["stages"][1]["type"] == "condition"
|
|
assert data["stages"][2]["type"] == "approval"
|
|
|
|
def test_create_workflow_missing_dependency(self, client):
|
|
stages = [
|
|
{
|
|
"name": "步骤B",
|
|
"agent": "default",
|
|
"action": "do_b",
|
|
"depends_on": ["不存在的步骤"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
}
|
|
]
|
|
response = _create_workflow(client, stages=stages)
|
|
assert response.status_code == 400
|
|
assert "不存在" in response.json()["detail"]
|
|
|
|
def test_create_workflow_circular_dependency(self, client):
|
|
stages = [
|
|
{
|
|
"name": "A",
|
|
"agent": "default",
|
|
"action": "do_a",
|
|
"depends_on": ["B"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
{
|
|
"name": "B",
|
|
"agent": "default",
|
|
"action": "do_b",
|
|
"depends_on": ["A"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
]
|
|
response = _create_workflow(client, stages=stages)
|
|
assert response.status_code == 400
|
|
assert "循环依赖" in response.json()["detail"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /workflows - List
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestListWorkflows:
|
|
def test_list_empty(self, client):
|
|
response = client.get("/api/v1/workflows")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["workflows"] == []
|
|
assert data["total"] == 0
|
|
|
|
def test_list_after_create(self, client):
|
|
_create_workflow(client, "工作流1")
|
|
_create_workflow(client, "工作流2")
|
|
response = client.get("/api/v1/workflows")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["total"] == 2
|
|
|
|
def test_list_with_limit(self, client):
|
|
for i in range(5):
|
|
_create_workflow(client, f"工作流{i}")
|
|
response = client.get("/api/v1/workflows?limit=2")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data["workflows"]) == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /workflows/{id} - Get
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetWorkflow:
|
|
def test_get_existing(self, client):
|
|
create_resp = _create_workflow(client, stages=_sample_stages())
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
response = client.get(f"/api/v1/workflows/{workflow_id}")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["workflow_id"] == workflow_id
|
|
assert len(data["stages"]) == 3
|
|
|
|
def test_get_not_found(self, client):
|
|
response = client.get("/api/v1/workflows/nonexistent-id")
|
|
assert response.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /workflows/{id} - Update
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUpdateWorkflow:
|
|
def test_update_name(self, client):
|
|
create_resp = _create_workflow(client, "原始名称")
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
response = client.put(
|
|
f"/api/v1/workflows/{workflow_id}",
|
|
json={"name": "更新名称", "stages": []},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["name"] == "更新名称"
|
|
assert data["version"] == 2
|
|
|
|
def test_update_with_stages(self, client):
|
|
create_resp = _create_workflow(client)
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
response = client.put(
|
|
f"/api/v1/workflows/{workflow_id}",
|
|
json={"name": "更新工作流", "stages": _sample_stages()},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data["stages"]) == 3
|
|
|
|
def test_update_not_found(self, client):
|
|
response = client.put(
|
|
"/api/v1/workflows/nonexistent-id",
|
|
json={"name": "不存在", "stages": []},
|
|
)
|
|
assert response.status_code == 404
|
|
|
|
def test_update_circular_dependency(self, client):
|
|
create_resp = _create_workflow(client)
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
stages = [
|
|
{
|
|
"name": "A",
|
|
"agent": "default",
|
|
"action": "do_a",
|
|
"depends_on": ["B"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
{
|
|
"name": "B",
|
|
"agent": "default",
|
|
"action": "do_b",
|
|
"depends_on": ["A"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
]
|
|
response = client.put(
|
|
f"/api/v1/workflows/{workflow_id}",
|
|
json={"name": "循环依赖", "stages": stages},
|
|
)
|
|
assert response.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE /workflows/{id} - Delete
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDeleteWorkflow:
|
|
def test_delete_existing(self, client):
|
|
create_resp = _create_workflow(client)
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
response = client.delete(f"/api/v1/workflows/{workflow_id}")
|
|
assert response.status_code == 200
|
|
|
|
# Verify deleted
|
|
get_resp = client.get(f"/api/v1/workflows/{workflow_id}")
|
|
assert get_resp.status_code == 404
|
|
|
|
def test_delete_not_found(self, client):
|
|
response = client.delete("/api/v1/workflows/nonexistent-id")
|
|
assert response.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /workflows/{id}/execute - Execute
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestExecuteWorkflow:
|
|
def test_execute_workflow(self, client):
|
|
create_resp = _create_workflow(client, stages=_sample_stages())
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
response = client.post(
|
|
f"/api/v1/workflows/{workflow_id}/execute",
|
|
json={"variables": {"status": "approved"}},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["execution_id"] is not None
|
|
assert data["workflow_id"] == workflow_id
|
|
assert data["status"] in ("pending", "running")
|
|
|
|
def test_execute_not_found(self, client):
|
|
response = client.post(
|
|
"/api/v1/workflows/nonexistent-id/execute",
|
|
json={"variables": {}},
|
|
)
|
|
assert response.status_code == 404
|
|
|
|
def test_execute_empty_workflow(self, client):
|
|
create_resp = _create_workflow(client)
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
response = client.post(
|
|
f"/api/v1/workflows/{workflow_id}/execute",
|
|
json={"variables": {}},
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /workflows/executions/{id} - Execution status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetExecution:
|
|
def test_get_execution_status(self, client):
|
|
import time
|
|
|
|
create_resp = _create_workflow(client, stages=_sample_stages())
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
exec_resp = client.post(
|
|
f"/api/v1/workflows/{workflow_id}/execute",
|
|
json={"variables": {}},
|
|
)
|
|
execution_id = exec_resp.json()["execution_id"]
|
|
|
|
# Wait a bit for execution to progress
|
|
time.sleep(0.5)
|
|
|
|
response = client.get(f"/api/v1/workflows/executions/{execution_id}")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["execution_id"] == execution_id
|
|
assert data["status"] in ("pending", "running", "completed", "paused")
|
|
|
|
def test_get_execution_not_found(self, client):
|
|
response = client.get("/api/v1/workflows/executions/nonexistent-id")
|
|
assert response.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /workflows/executions/{id}/approve - Approval
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestApproveExecution:
|
|
def test_approve_not_paused(self, client):
|
|
create_resp = _create_workflow(client, stages=_sample_stages())
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
exec_resp = client.post(
|
|
f"/api/v1/workflows/{workflow_id}/execute",
|
|
json={"variables": {}},
|
|
)
|
|
execution_id = exec_resp.json()["execution_id"]
|
|
|
|
# Try to approve when not paused (may already be completed)
|
|
response = client.post(
|
|
f"/api/v1/workflows/executions/{execution_id}/approve",
|
|
json={"approved": True, "comment": "同意"},
|
|
)
|
|
# Should be 400 if not paused, or 200 if already completed
|
|
assert response.status_code in (200, 400)
|
|
|
|
def test_approve_not_found(self, client):
|
|
response = client.post(
|
|
"/api/v1/workflows/executions/nonexistent-id/approve",
|
|
json={"approved": True},
|
|
)
|
|
assert response.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /workflows/executions/{id}/cancel - Cancel
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCancelExecution:
|
|
def test_cancel_execution(self, client):
|
|
create_resp = _create_workflow(client, stages=_sample_stages())
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
exec_resp = client.post(
|
|
f"/api/v1/workflows/{workflow_id}/execute",
|
|
json={"variables": {}},
|
|
)
|
|
execution_id = exec_resp.json()["execution_id"]
|
|
|
|
# Try to cancel
|
|
response = client.post(
|
|
f"/api/v1/workflows/executions/{execution_id}/cancel"
|
|
)
|
|
# May be 200 (cancelled) or 400 (already completed)
|
|
assert response.status_code in (200, 400)
|
|
|
|
def test_cancel_not_found(self, client):
|
|
response = client.post(
|
|
"/api/v1/workflows/executions/nonexistent-id/cancel"
|
|
)
|
|
assert response.status_code == 404
|
|
|
|
def test_cancel_completed_execution(self, client):
|
|
import time
|
|
|
|
# Create a workflow with stages that will complete
|
|
create_resp = _create_workflow(client, stages=_sample_stages())
|
|
workflow_id = create_resp.json()["workflow_id"]
|
|
|
|
exec_resp = client.post(
|
|
f"/api/v1/workflows/{workflow_id}/execute",
|
|
json={"variables": {}},
|
|
)
|
|
execution_id = exec_resp.json()["execution_id"]
|
|
|
|
# Wait for completion
|
|
time.sleep(2)
|
|
|
|
# Check the execution status first
|
|
status_resp = client.get(f"/api/v1/workflows/executions/{execution_id}")
|
|
exec_status = status_resp.json()["status"]
|
|
|
|
# Try to cancel - should fail if already completed
|
|
response = client.post(
|
|
f"/api/v1/workflows/executions/{execution_id}/cancel"
|
|
)
|
|
if exec_status in ("completed", "failed"):
|
|
assert response.status_code == 400
|
|
else:
|
|
# If still running/paused, cancel should succeed
|
|
assert response.status_code == 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Validation tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestValidation:
|
|
def test_missing_dependency_validation(self, client):
|
|
stages = [
|
|
{
|
|
"name": "步骤A",
|
|
"agent": "default",
|
|
"action": "do_a",
|
|
"depends_on": [],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
{
|
|
"name": "步骤B",
|
|
"agent": "default",
|
|
"action": "do_b",
|
|
"depends_on": ["步骤C"], # C doesn't exist
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
]
|
|
response = _create_workflow(client, stages=stages)
|
|
assert response.status_code == 400
|
|
assert "不存在" in response.json()["detail"]
|
|
|
|
def test_circular_dependency_validation(self, client):
|
|
stages = [
|
|
{
|
|
"name": "X",
|
|
"agent": "default",
|
|
"action": "do_x",
|
|
"depends_on": ["Y"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
{
|
|
"name": "Y",
|
|
"agent": "default",
|
|
"action": "do_y",
|
|
"depends_on": ["X"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
]
|
|
response = _create_workflow(client, stages=stages)
|
|
assert response.status_code == 400
|
|
assert "循环依赖" in response.json()["detail"]
|
|
|
|
def test_valid_linear_workflow(self, client):
|
|
stages = [
|
|
{
|
|
"name": "步骤1",
|
|
"agent": "default",
|
|
"action": "do_1",
|
|
"depends_on": [],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
{
|
|
"name": "步骤2",
|
|
"agent": "default",
|
|
"action": "do_2",
|
|
"depends_on": ["步骤1"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
]
|
|
response = _create_workflow(client, stages=stages)
|
|
assert response.status_code == 201
|
|
|
|
def test_valid_dag_workflow(self, client):
|
|
stages = [
|
|
{
|
|
"name": "开始",
|
|
"agent": "default",
|
|
"action": "start",
|
|
"depends_on": [],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
{
|
|
"name": "并行A",
|
|
"agent": "default",
|
|
"action": "do_a",
|
|
"depends_on": ["开始"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
{
|
|
"name": "并行B",
|
|
"agent": "default",
|
|
"action": "do_b",
|
|
"depends_on": ["开始"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
{
|
|
"name": "合并",
|
|
"agent": "default",
|
|
"action": "merge",
|
|
"depends_on": ["并行A", "并行B"],
|
|
"inputs": {},
|
|
"outputs": [],
|
|
"timeout_seconds": 300,
|
|
"retry_count": 0,
|
|
"continue_on_failure": False,
|
|
"condition": None,
|
|
"type": "skill",
|
|
"config": {},
|
|
},
|
|
]
|
|
response = _create_workflow(client, stages=stages)
|
|
assert response.status_code == 201
|