# Source listing metadata: 761 lines, 27 KiB, Python
"""Tests for U6: terminal three-layer whitelist + audit logging.

Covers:
- terminal_security.py: pattern matching, builtin whitelist, DB-backed
  user whitelist, blocklist, audit log writing, top-level
  check_command_safety_v2
- terminal_whitelist.py routes: user whitelist CRUD, blocklist CRUD,
  audit log query
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from agentkit.server.auth.models import init_auth_db
|
|
from agentkit.server.auth.terminal_security import (
|
|
DECISION_BLOCKED,
|
|
DECISION_CONFIRMED,
|
|
DECISION_DENIED,
|
|
DECISION_EXECUTED,
|
|
_matches_any_pattern,
|
|
_matches_builtin_whitelist,
|
|
_matches_pattern,
|
|
check_command_safety_v2,
|
|
compute_whitelist_entry,
|
|
write_audit_log,
|
|
)
|
|
from agentkit.server.routes import terminal_whitelist
|
|
|
|
|
|
# ── Pattern matching unit tests ───────────────────────────────────────
|
|
|
|
|
|
class TestPatternMatching:
    """Unit tests for ``_matches_pattern`` and ``_matches_any_pattern``."""

    def test_matches_pattern_single_word_matches_binary(self):
        # A single-word pattern is expected to match a command that starts
        # with that binary.
        for command, pattern in (("ls -la", "ls"), ("git status", "git")):
            assert _matches_pattern(command, pattern) is True

    def test_matches_pattern_single_word_does_not_match_different_binary(self):
        outcome = _matches_pattern("ls -la", "cat")
        assert outcome is False

    def test_matches_pattern_multi_word_prefix(self):
        # A multi-word pattern is expected to match when it prefixes the command.
        cases = (
            ("git push origin main", "git push"),
            ("npm install --save", "npm install"),
        )
        for command, pattern in cases:
            assert _matches_pattern(command, pattern) is True

    def test_matches_pattern_multi_word_not_prefix(self):
        outcome = _matches_pattern("git status", "git push")
        assert outcome is False

    def test_matches_pattern_case_insensitive(self):
        for command, pattern in (("LS -la", "ls"), ("Git Push", "git push")):
            assert _matches_pattern(command, pattern) is True

    def test_matches_pattern_empty_pattern_returns_false(self):
        # Both the empty string and a whitespace-only pattern must be rejected.
        for blank_pattern in ("", " "):
            assert _matches_pattern("ls", blank_pattern) is False

    def test_matches_pattern_unparseable_command(self):
        # The unbalanced quote makes shlex fail, so no binary can be
        # extracted and nothing matches.
        broken_command = 'echo "unbalanced'
        assert _matches_pattern(broken_command, "echo") is False

    def test_matches_any_pattern_returns_first_match(self):
        candidates = ["ls", "git", "cat"]
        matched, pattern = _matches_any_pattern("git push", candidates)
        assert matched is True
        assert pattern == "git"

    def test_matches_any_pattern_no_match(self):
        candidates = ["ls", "git"]
        matched, pattern = _matches_any_pattern("rm -rf /", candidates)
        assert matched is False
        assert pattern is None
|
|
|
|
|
|
class TestBuiltinWhitelist:
    """Unit tests for ``_matches_builtin_whitelist``."""

    def test_builtin_whitelist_matches_ls(self):
        verdict = _matches_builtin_whitelist("ls -la")
        assert verdict is True

    def test_builtin_whitelist_matches_cat(self):
        verdict = _matches_builtin_whitelist("cat /etc/hosts")
        assert verdict is True

    def test_builtin_whitelist_matches_git_status(self):
        verdict = _matches_builtin_whitelist("git status")
        assert verdict is True

    def test_builtin_whitelist_matches_docker_ps(self):
        verdict = _matches_builtin_whitelist("docker ps")
        assert verdict is True

    def test_builtin_whitelist_does_not_match_rm(self):
        verdict = _matches_builtin_whitelist("rm -rf /tmp/test")
        assert verdict is False

    def test_builtin_whitelist_does_not_match_unknown_command(self):
        verdict = _matches_builtin_whitelist("somecmd --flag")
        assert verdict is False

    def test_builtin_whitelist_empty_command(self):
        verdict = _matches_builtin_whitelist("")
        assert verdict is False

    def test_builtin_whitelist_case_insensitive(self):
        # Upper-cased binary and flags are still expected to match.
        verdict = _matches_builtin_whitelist("LS -LA")
        assert verdict is True
|
|
|
|
|
|
class TestComputeWhitelistEntry:
    """Unit tests for ``compute_whitelist_entry`` (entry derived from a confirmed command)."""

    def test_compute_entry_for_safe_binary(self):
        # A binary outside _DANGEROUS_BINARY_FLAGS yields just the binary name.
        entry = compute_whitelist_entry("ls -la /tmp")
        assert entry == "ls"

    def test_compute_entry_for_dangerous_binary(self):
        # A binary listed in _DANGEROUS_BINARY_FLAGS is never whitelisted:
        # storing the bare binary name would let dangerous flag variants
        # such as "rm -rf" bypass danger detection.
        entry = compute_whitelist_entry("rm -rf /tmp/test")
        assert entry is None

    def test_compute_entry_for_binary_with_dangerous_variants(self):
        # docker is in _DANGEROUS_BINARY_FLAGS, so even a harmless invocation
        # yields None; whitelisting "docker" would auto-execute "docker rm".
        entry = compute_whitelist_entry("docker ps")
        assert entry is None

    def test_compute_entry_for_unparseable_command(self):
        broken_command = 'echo "unbalanced'
        assert compute_whitelist_entry(broken_command) is None

    def test_compute_entry_for_empty_command(self):
        entry = compute_whitelist_entry("")
        assert entry is None
|
|
|
|
|
|
# ── DB-backed tests (using temp SQLite) ───────────────────────────────
|
|
|
|
|
|
@pytest.fixture
async def temp_db_path(tmp_path: Path) -> Path:
    """Initialise an auth DB inside pytest's ``tmp_path`` and return its path."""
    database_file = tmp_path.joinpath("test_auth.db")
    await init_auth_db(database_file)
    return database_file
|
|
|
|
|
|
class TestCheckCommandSafetyV2:
    """Tests for the top-level ``check_command_safety_v2`` layered check."""

    @staticmethod
    async def _evaluate(command, *, db_path, session_whitelist=(), user_id="user-1"):
        """Call ``check_command_safety_v2`` with the session id shared by these tests."""
        return await check_command_safety_v2(
            command,
            session_id="test-session",
            session_whitelist=set(session_whitelist),
            user_id=user_id,
            db_path=db_path,
        )

    @staticmethod
    async def _insert_blocklist_row(db_path, pattern, reason):
        """Insert one ``terminal_blocklist`` row (id ``bl-1``) directly via SQL."""
        import aiosqlite

        async with aiosqlite.connect(str(db_path)) as conn:
            await conn.execute(
                """INSERT INTO terminal_blocklist
                (id, command_pattern, reason, created_at)
                VALUES (?, ?, ?, ?)""",
                ("bl-1", pattern, reason, "2026-01-01T00:00:00Z"),
            )
            await conn.commit()

    @pytest.mark.asyncio
    async def test_builtin_whitelist_command_passes(self, temp_db_path: Path):
        """Built-in safe commands execute without confirmation."""
        result = await self._evaluate("ls -la", db_path=temp_db_path)

        assert result.safe is True
        assert result.decision == DECISION_EXECUTED
        assert result.matched_layer == "builtin"

    @pytest.mark.asyncio
    async def test_dangerous_command_requires_confirmation(self, temp_db_path: Path):
        """Non-whitelisted dangerous commands require confirmation."""
        result = await self._evaluate("rm -rf /tmp/test", db_path=temp_db_path)

        assert result.safe is False
        assert result.decision == DECISION_DENIED
        assert result.reason is not None

    @pytest.mark.asyncio
    async def test_session_whitelist_allows_command(self, temp_db_path: Path):
        """Session whitelist allows previously-confirmed commands."""
        result = await self._evaluate(
            "rm -rf /tmp/test",
            db_path=temp_db_path,
            session_whitelist={"rm"},  # rm is in session whitelist
        )

        assert result.safe is True
        assert result.matched_layer == "session_whitelist"

    @pytest.mark.asyncio
    async def test_user_whitelist_allows_command(self, temp_db_path: Path):
        """User whitelist (DB-backed) allows commands."""
        import aiosqlite

        # Seed a user-scoped whitelist row for "rm" owned by user-1.
        async with aiosqlite.connect(str(temp_db_path)) as conn:
            await conn.execute(
                """INSERT INTO terminal_whitelist_user
                (id, user_id, command_pattern, scope, created_at)
                VALUES (?, ?, ?, 'user', ?)""",
                ("wl-1", "user-1", "rm", "2026-01-01T00:00:00Z"),
            )
            await conn.commit()

        result = await self._evaluate("rm -rf /tmp/test", db_path=temp_db_path)

        assert result.safe is True
        assert result.matched_layer == "user_whitelist"

    @pytest.mark.asyncio
    async def test_blocklist_rejects_command(self, temp_db_path: Path):
        """Blocklist rejects commands regardless of other whitelists."""
        await self._insert_blocklist_row(temp_db_path, "rm", "rm is blocked by admin")

        # "rm" is also in the session whitelist; the blocklist must still win.
        result = await self._evaluate(
            "rm -rf /tmp/test",
            db_path=temp_db_path,
            session_whitelist={"rm"},  # session whitelist has rm
        )

        assert result.safe is False
        assert result.decision == DECISION_BLOCKED
        assert result.matched_layer == "blocklist"
        assert "rm is blocked by admin" in (result.reason or "")

    @pytest.mark.asyncio
    async def test_blocklist_takes_priority_over_builtin(self, temp_db_path: Path):
        """Blocklist rejects even built-in safe commands."""
        await self._insert_blocklist_row(temp_db_path, "ls", "ls is blocked")

        result = await self._evaluate("ls -la", db_path=temp_db_path)

        assert result.safe is False
        assert result.decision == DECISION_BLOCKED

    @pytest.mark.asyncio
    async def test_empty_command_denied(self, temp_db_path: Path):
        """Empty commands are denied."""
        result = await self._evaluate(" ", db_path=temp_db_path)

        assert result.safe is False
        assert result.decision == DECISION_DENIED

    @pytest.mark.asyncio
    async def test_dev_mode_no_db(self):
        """When db_path is None, DB-backed layers are skipped."""
        # Dangerous command, outside the builtin whitelist, with no DB.
        result = await self._evaluate(
            "rm -rf /tmp/test",
            db_path=None,  # No DB
            user_id=None,
        )

        # Should fall through to danger detection.
        assert result.safe is False
        assert result.decision == DECISION_DENIED

    @pytest.mark.asyncio
    async def test_non_dangerous_non_whitelisted_command_passes(self, temp_db_path: Path):
        """Commands that pass the danger check (not in any whitelist) execute."""
        # Author's notes, condensed: the default danger logic treats any
        # binary that is in neither _SAFE_COMMAND_PREFIXES nor
        # _DANGEROUS_BINARIES as dangerous, so the "danger_check_passed" layer
        # is rarely reached (e.g. "man ls" would not get there). A pipeline
        # whose parts are all safe is used instead.
        result = await self._evaluate("ls | grep test", db_path=temp_db_path)

        # ls and grep are both builtin-whitelisted, but the pipe routes the
        # command through _is_dangerous, which checks each part; both parts
        # are safe, so the whole command is safe.
        assert result.safe is True
|
|
|
|
|
|
class TestWriteAuditLog:
    """Tests for ``write_audit_log``."""

    @pytest.mark.asyncio
    async def test_write_audit_log_success(self, temp_db_path: Path):
        """Audit log entry is written to the DB."""
        fields = {
            "user_id": "user-1",
            "username": "alice",
            "session_id": "session-1",
            "command": "ls -la",
            "decision": DECISION_EXECUTED,
            "reason": "matched_layer=builtin",
            "cwd": "/tmp",
            "exit_code": 0,
            "terminal_mode": "local",
        }
        await write_audit_log(temp_db_path, **fields)

        import aiosqlite

        async with aiosqlite.connect(str(temp_db_path)) as conn:
            # Row factory gives name-based column access below.
            conn.row_factory = aiosqlite.Row
            cursor = await conn.execute(
                "SELECT * FROM terminal_audit_logs WHERE session_id = ?",
                ("session-1",),
            )
            stored = await cursor.fetchone()

        assert stored is not None
        assert stored["user_id"] == "user-1"
        assert stored["username"] == "alice"
        assert stored["command"] == "ls -la"
        assert stored["decision"] == DECISION_EXECUTED
        assert stored["exit_code"] == 0
        assert stored["terminal_mode"] == "local"

    @pytest.mark.asyncio
    async def test_write_audit_log_null_user(self, temp_db_path: Path):
        """Audit log accepts None user_id (dev mode)."""
        await write_audit_log(
            temp_db_path,
            user_id=None,
            username=None,
            session_id="session-dev",
            command="rm -rf /tmp",
            decision=DECISION_CONFIRMED,
            terminal_mode="local",
        )

        import aiosqlite

        async with aiosqlite.connect(str(temp_db_path)) as conn:
            cursor = await conn.execute(
                "SELECT user_id, username FROM terminal_audit_logs WHERE session_id = ?",
                ("session-dev",),
            )
            stored = await cursor.fetchone()

        assert stored is not None
        stored_user_id, stored_username = stored[0], stored[1]
        assert stored_user_id is None
        assert stored_username is None

    @pytest.mark.asyncio
    async def test_write_audit_log_best_effort(self, tmp_path: Path):
        """Audit log failures are swallowed (never raise)."""
        # The parent directory does not exist, so this path is invalid; the
        # test passes as long as the call returns without raising.
        missing_db = tmp_path / "nonexistent" / "auth.db"
        await write_audit_log(
            missing_db,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command="ls",
            decision=DECISION_EXECUTED,
        )

    @pytest.mark.asyncio
    async def test_write_audit_log_truncates_long_command(self, temp_db_path: Path):
        """Commands longer than 4096 chars are truncated."""
        oversized_command = "echo " + "a" * 5000
        await write_audit_log(
            temp_db_path,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command=oversized_command,
            decision=DECISION_EXECUTED,
        )

        import aiosqlite

        async with aiosqlite.connect(str(temp_db_path)) as conn:
            cursor = await conn.execute(
                "SELECT command FROM terminal_audit_logs WHERE session_id = ?",
                ("session-1",),
            )
            stored = await cursor.fetchone()

        assert stored is not None
        stored_command = stored[0]
        assert len(stored_command) <= 4096
|
|
|
|
|
|
# ── Whitelist management API tests ────────────────────────────────────
|
|
|
|
|
|
@pytest.fixture
async def whitelist_app(tmp_path: Path) -> FastAPI:
    """Return a FastAPI app with the whitelist router and a fixed admin user.

    The app gets a freshly initialised auth DB under ``tmp_path`` (exposed as
    ``app.state.auth_db_path``) and an HTTP middleware that sets
    ``request.state.current_user`` to an admin identity on every request.
    """
    auth_db = tmp_path / "test_auth.db"
    await init_auth_db(auth_db)

    application = FastAPI()
    application.state.auth_db_path = str(auth_db)
    application.include_router(terminal_whitelist.router, prefix="/api/v1")

    admin_identity = {
        "user_id": "dev-admin-id",
        "username": "dev-admin",
        "role": "admin",
        "dev_mode": False,  # Use real DB path
    }

    # Dev-admin middleware: injects an admin user so permission checks pass.
    @application.middleware("http")
    async def _set_dev_admin_user(request, call_next):
        # Hand each request its own copy so handlers cannot share mutations.
        request.state.current_user = dict(admin_identity)
        return await call_next(request)

    return application
|
|
|
|
|
|
@pytest.fixture
async def whitelist_client(whitelist_app: FastAPI):
    """Yield an ``AsyncClient`` bound to ``whitelist_app`` through ``ASGITransport``."""
    asgi_transport = ASGITransport(app=whitelist_app)
    async with AsyncClient(transport=asgi_transport, base_url="http://test") as http:
        yield http
|
|
|
|
|
|
class TestUserWhitelistAPI:
    """Test the user whitelist CRUD endpoints.

    Setup requests (the POSTs that create entries before the behaviour under
    test) are asserted to return 201, so a broken precondition fails at the
    point of setup instead of as a confusing later assertion or ``KeyError``.
    """

    # Collection endpoint shared by every test in this class.
    _URL = "/api/v1/terminal/whitelist/user"

    @pytest.mark.asyncio
    async def test_list_empty_user_whitelist(self, whitelist_client: AsyncClient):
        resp = await whitelist_client.get(self._URL)
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []

    @pytest.mark.asyncio
    async def test_add_user_whitelist_entry(self, whitelist_client: AsyncClient):
        resp = await whitelist_client.post(
            self._URL,
            json={"command_pattern": "docker"},
        )
        assert resp.status_code == 201
        data = resp.json()
        assert data["command_pattern"] == "docker"
        assert data["scope"] == "user"
        assert "id" in data

    @pytest.mark.asyncio
    async def test_add_duplicate_user_whitelist_returns_409(
        self, whitelist_client: AsyncClient
    ):
        payload = {"command_pattern": "git push"}

        # Add once; this must succeed for the second attempt to be a duplicate.
        first = await whitelist_client.post(self._URL, json=payload)
        assert first.status_code == 201

        # Add again → 409
        resp = await whitelist_client.post(self._URL, json=payload)
        assert resp.status_code == 409

    @pytest.mark.asyncio
    async def test_list_user_whitelist_after_add(self, whitelist_client: AsyncClient):
        for pattern in ("docker", "git push"):
            created = await whitelist_client.post(
                self._URL,
                json={"command_pattern": pattern},
            )
            assert created.status_code == 201

        resp = await whitelist_client.get(self._URL)
        assert resp.status_code == 200
        entries = resp.json()["entries"]
        assert len(entries) == 2
        patterns = {e["command_pattern"] for e in entries}
        assert patterns == {"docker", "git push"}

    @pytest.mark.asyncio
    async def test_delete_user_whitelist_entry(self, whitelist_client: AsyncClient):
        add_resp = await whitelist_client.post(
            self._URL,
            json={"command_pattern": "docker"},
        )
        # Check creation before reading "id" so a failed POST is reported as such.
        assert add_resp.status_code == 201
        entry_id = add_resp.json()["id"]

        del_resp = await whitelist_client.delete(f"{self._URL}/{entry_id}")
        assert del_resp.status_code == 204

        # Verify it's gone
        list_resp = await whitelist_client.get(self._URL)
        assert list_resp.status_code == 200
        assert list_resp.json()["entries"] == []

    @pytest.mark.asyncio
    async def test_delete_nonexistent_user_whitelist_returns_404(
        self, whitelist_client: AsyncClient
    ):
        resp = await whitelist_client.delete(f"{self._URL}/nonexistent-id")
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_add_user_whitelist_invalid_pattern_rejected(
        self, whitelist_client: AsyncClient
    ):
        # Pattern with shell separator
        resp = await whitelist_client.post(
            self._URL,
            json={"command_pattern": "ls; rm -rf /"},
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    async def test_add_user_whitelist_empty_pattern_rejected(
        self, whitelist_client: AsyncClient
    ):
        resp = await whitelist_client.post(
            self._URL,
            json={"command_pattern": ""},
        )
        assert resp.status_code == 422
|
|
|
|
|
|
class TestBlocklistAPI:
    """Test the blocklist CRUD endpoints (admin only).

    Setup POSTs are asserted to return 201 so that a failed precondition is
    reported where it happens rather than as a later ``KeyError`` or a
    misleading status-code mismatch.
    """

    # Collection endpoint shared by every test in this class.
    _URL = "/api/v1/terminal/blocklist"

    @pytest.mark.asyncio
    async def test_list_empty_blocklist(self, whitelist_client: AsyncClient):
        resp = await whitelist_client.get(self._URL)
        assert resp.status_code == 200
        assert resp.json()["entries"] == []

    @pytest.mark.asyncio
    async def test_add_blocklist_entry(self, whitelist_client: AsyncClient):
        resp = await whitelist_client.post(
            self._URL,
            json={"command_pattern": "mkfs", "reason": "formatting is dangerous"},
        )
        assert resp.status_code == 201
        data = resp.json()
        assert data["command_pattern"] == "mkfs"
        assert data["reason"] == "formatting is dangerous"

    @pytest.mark.asyncio
    async def test_add_duplicate_blocklist_returns_409(self, whitelist_client: AsyncClient):
        payload = {"command_pattern": "mkfs"}

        # The first insert must succeed for the second to count as a duplicate.
        first = await whitelist_client.post(self._URL, json=payload)
        assert first.status_code == 201

        resp = await whitelist_client.post(self._URL, json=payload)
        assert resp.status_code == 409

    @pytest.mark.asyncio
    async def test_delete_blocklist_entry(self, whitelist_client: AsyncClient):
        add_resp = await whitelist_client.post(
            self._URL,
            json={"command_pattern": "mkfs"},
        )
        # Check creation before reading "id" so a failed POST is reported as such.
        assert add_resp.status_code == 201
        entry_id = add_resp.json()["id"]

        del_resp = await whitelist_client.delete(f"{self._URL}/{entry_id}")
        assert del_resp.status_code == 204

        # Verify deleted
        list_resp = await whitelist_client.get(self._URL)
        assert list_resp.status_code == 200
        assert list_resp.json()["entries"] == []

    @pytest.mark.asyncio
    async def test_delete_nonexistent_blocklist_returns_404(
        self, whitelist_client: AsyncClient
    ):
        resp = await whitelist_client.delete(f"{self._URL}/nonexistent")
        assert resp.status_code == 404
|
|
|
|
|
|
class TestAuditLogAPI:
    """Test the audit log query endpoint."""

    # Query endpoint shared by every test in this class.
    _URL = "/api/v1/terminal/audit-logs"

    @pytest.mark.asyncio
    async def test_list_empty_audit_logs(self, whitelist_client: AsyncClient):
        resp = await whitelist_client.get(self._URL)
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []
        assert data["total"] == 0

    @pytest.mark.asyncio
    async def test_list_audit_logs_after_writes(
        self, whitelist_client: AsyncClient, whitelist_app: FastAPI
    ):
        db_path = whitelist_app.state.auth_db_path

        # Three rows: two for user-1/session-1 and one blocked row for
        # user-2/session-2, so each filter below has a distinct expected count.
        seeded = (
            {
                "user_id": "user-1",
                "username": "alice",
                "session_id": "session-1",
                "command": "ls -la",
                "decision": DECISION_EXECUTED,
                "exit_code": 0,
            },
            {
                "user_id": "user-1",
                "username": "alice",
                "session_id": "session-1",
                "command": "rm -rf /tmp",
                "decision": DECISION_CONFIRMED,
                "exit_code": 0,
            },
            {
                "user_id": "user-2",
                "username": "bob",
                "session_id": "session-2",
                "command": "mkfs /dev/sda",
                "decision": DECISION_BLOCKED,
            },
        )
        for record in seeded:
            await write_audit_log(db_path, **record)

        # List all
        resp = await whitelist_client.get(self._URL)
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 3
        assert len(data["entries"]) == 3

        # Filter by user_id
        resp = await whitelist_client.get(f"{self._URL}?user_id=user-1")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 2
        for entry in data["entries"]:
            assert entry["user_id"] == "user-1"

        # Filter by decision
        resp = await whitelist_client.get(f"{self._URL}?decision=confirmed")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["decision"] == "confirmed"

        # Filter by session_id
        resp = await whitelist_client.get(f"{self._URL}?session_id=session-2")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["session_id"] == "session-2"

    @pytest.mark.asyncio
    async def test_audit_logs_pagination(
        self, whitelist_client: AsyncClient, whitelist_app: FastAPI
    ):
        db_path = whitelist_app.state.auth_db_path
        for i in range(5):
            await write_audit_log(
                db_path,
                user_id="user-1",
                username="alice",
                session_id=f"session-{i}",
                command=f"ls {i}",
                decision=DECISION_EXECUTED,
            )

        # Five rows with limit=2: two full pages followed by one leftover row.
        # ``total`` must report the full count on every page.
        expected_page_sizes = ((0, 2), (2, 2), (4, 1))
        for offset, expected_size in expected_page_sizes:
            resp = await whitelist_client.get(f"{self._URL}?limit=2&offset={offset}")
            # Check the status first so an error response is not misread as
            # a failed count assertion or a KeyError.
            assert resp.status_code == 200
            data = resp.json()
            assert data["total"] == 5
            assert len(data["entries"]) == expected_size
|
|
|
|
|
|
# ── Integration: blocklist + safety check ─────────────────────────────
|
|
|
|
|
|
class TestBlocklistIntegration:
    """Integration test: a blocklist entry created via the API affects the safety check."""

    @pytest.mark.asyncio
    async def test_blocklist_blocks_command_in_safety_check(
        self, whitelist_client: AsyncClient, whitelist_app: FastAPI
    ):
        """Add a blocklist entry via API, then verify safety check blocks it."""
        # Register "rm" in the blocklist through the HTTP endpoint.
        blocklist_payload = {"command_pattern": "rm", "reason": "rm disabled by policy"}
        created = await whitelist_client.post(
            "/api/v1/terminal/blocklist",
            json=blocklist_payload,
        )
        assert created.status_code == 201

        # Run the safety check against the same DB the app wrote to.
        auth_db = whitelist_app.state.auth_db_path
        verdict = await check_command_safety_v2(
            "rm -rf /tmp/test",
            session_id="test-session",
            session_whitelist={"rm"},  # Even with session whitelist
            user_id="user-1",
            db_path=auth_db,
        )

        assert verdict.safe is False
        assert verdict.decision == DECISION_BLOCKED
        assert verdict.matched_layer == "blocklist"
|