fischer-agentkit/tests/unit/test_terminal_whitelist.py

761 lines
27 KiB
Python

"""Tests for U6: terminal three-layer whitelist + audit logging.

Covers:
- terminal_security.py: pattern matching, builtin whitelist, DB-backed
  user whitelist, blocklist, audit log writing, and the top-level
  check_command_safety_v2
- terminal_whitelist.py routes: user whitelist CRUD, blocklist CRUD,
  audit log query
"""
from __future__ import annotations
from pathlib import Path
import pytest
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
from agentkit.server.auth.models import init_auth_db
from agentkit.server.auth.terminal_security import (
DECISION_BLOCKED,
DECISION_CONFIRMED,
DECISION_DENIED,
DECISION_EXECUTED,
_matches_any_pattern,
_matches_builtin_whitelist,
_matches_pattern,
check_command_safety_v2,
compute_whitelist_entry,
write_audit_log,
)
from agentkit.server.routes import terminal_whitelist
# ── Pattern matching unit tests ───────────────────────────────────────
class TestPatternMatching:
    """Exercise ``_matches_pattern`` and ``_matches_any_pattern``."""

    def test_matches_pattern_single_word_matches_binary(self):
        """A one-word pattern matches a command that starts with that binary."""
        for command, pattern in (("ls -la", "ls"), ("git status", "git")):
            assert _matches_pattern(command, pattern) is True

    def test_matches_pattern_single_word_does_not_match_different_binary(self):
        """A one-word pattern does not match a command with another binary."""
        outcome = _matches_pattern("ls -la", "cat")
        assert outcome is False

    def test_matches_pattern_multi_word_prefix(self):
        """A multi-word pattern matches when it is a prefix of the command."""
        cases = (
            ("git push origin main", "git push"),
            ("npm install --save", "npm install"),
        )
        for command, pattern in cases:
            assert _matches_pattern(command, pattern) is True

    def test_matches_pattern_multi_word_not_prefix(self):
        """A multi-word pattern is rejected when the sub-command differs."""
        outcome = _matches_pattern("git status", "git push")
        assert outcome is False

    def test_matches_pattern_case_insensitive(self):
        """Upper/mixed-case commands still match lower-case patterns."""
        for command, pattern in (("LS -la", "ls"), ("Git Push", "git push")):
            assert _matches_pattern(command, pattern) is True

    def test_matches_pattern_empty_pattern_returns_false(self):
        """Empty and whitespace-only patterns never match."""
        for blank_pattern in ("", " "):
            assert _matches_pattern("ls", blank_pattern) is False

    def test_matches_pattern_unparseable_command(self):
        """A command with an unbalanced quote does not match its own binary."""
        # Original author's note: the unbalanced quote makes shlex fail, so
        # no binary is extracted and the match is rejected.
        outcome = _matches_pattern('echo "unbalanced', "echo")
        assert outcome is False

    def test_matches_any_pattern_returns_first_match(self):
        """The helper reports a hit together with the pattern that matched."""
        hit, winning_pattern = _matches_any_pattern(
            "git push", ["ls", "git", "cat"]
        )
        assert hit is True
        assert winning_pattern == "git"

    def test_matches_any_pattern_no_match(self):
        """With no matching pattern the helper returns ``(False, None)``."""
        hit, winning_pattern = _matches_any_pattern("rm -rf /", ["ls", "git"])
        assert hit is False
        assert winning_pattern is None
class TestBuiltinWhitelist:
    """Exercise ``_matches_builtin_whitelist`` against known commands."""

    def test_builtin_whitelist_matches_ls(self):
        """``ls`` with flags is on the built-in whitelist."""
        verdict = _matches_builtin_whitelist("ls -la")
        assert verdict is True

    def test_builtin_whitelist_matches_cat(self):
        """``cat`` with a file argument is on the built-in whitelist."""
        verdict = _matches_builtin_whitelist("cat /etc/hosts")
        assert verdict is True

    def test_builtin_whitelist_matches_git_status(self):
        """``git status`` is on the built-in whitelist."""
        verdict = _matches_builtin_whitelist("git status")
        assert verdict is True

    def test_builtin_whitelist_matches_docker_ps(self):
        """``docker ps`` is on the built-in whitelist."""
        verdict = _matches_builtin_whitelist("docker ps")
        assert verdict is True

    def test_builtin_whitelist_does_not_match_rm(self):
        """``rm -rf`` is not on the built-in whitelist."""
        verdict = _matches_builtin_whitelist("rm -rf /tmp/test")
        assert verdict is False

    def test_builtin_whitelist_does_not_match_unknown_command(self):
        """An unrecognised binary is not on the built-in whitelist."""
        verdict = _matches_builtin_whitelist("somecmd --flag")
        assert verdict is False

    def test_builtin_whitelist_empty_command(self):
        """The empty string never matches."""
        verdict = _matches_builtin_whitelist("")
        assert verdict is False

    def test_builtin_whitelist_case_insensitive(self):
        """An upper-case command still matches."""
        verdict = _matches_builtin_whitelist("LS -LA")
        assert verdict is True
class TestComputeWhitelistEntry:
    """Exercise ``compute_whitelist_entry`` for confirmed commands."""

    def test_compute_entry_for_safe_binary(self):
        """A non-dangerous binary yields just the binary name."""
        # Original author's note: ``ls`` is not in _DANGEROUS_BINARY_FLAGS.
        entry = compute_whitelist_entry("ls -la /tmp")
        assert entry == "ls"

    def test_compute_entry_for_dangerous_binary(self):
        """A dangerous binary yields ``None`` (it is never whitelisted)."""
        # Original author's note: whitelisting the bare binary name would
        # bypass danger detection for flag variants such as "rm -rf".
        entry = compute_whitelist_entry("rm -rf /tmp/test")
        assert entry is None

    def test_compute_entry_for_binary_with_dangerous_variants(self):
        """A safe invocation of a binary with dangerous variants yields ``None``."""
        # Original author's note: whitelisting "docker" would auto-execute
        # "docker rm", so even "docker ps" produces no entry.
        entry = compute_whitelist_entry("docker ps")
        assert entry is None

    def test_compute_entry_for_unparseable_command(self):
        """A command with an unbalanced quote yields ``None``."""
        entry = compute_whitelist_entry('echo "unbalanced')
        assert entry is None

    def test_compute_entry_for_empty_command(self):
        """The empty string yields ``None``."""
        entry = compute_whitelist_entry("")
        assert entry is None
# ── DB-backed tests (using temp SQLite) ───────────────────────────────
@pytest.fixture
async def temp_db_path(tmp_path: Path) -> Path:
    """Return the path of a fresh auth DB initialised under ``tmp_path``.

    The database is set up by ``init_auth_db``; the tests below rely on it
    providing the terminal whitelist, blocklist and audit-log tables.
    """
    database_file = tmp_path.joinpath("test_auth.db")
    await init_auth_db(database_file)
    return database_file
class TestCheckCommandSafetyV2:
    """Test the top-level three-layer whitelist check.

    Every test calls ``check_command_safety_v2`` through ``_check`` so that
    the fixed arguments (session id, default user) live in one place, and
    seeds DB rows through ``_seed_blocklist`` / ``_seed_user_whitelist``.
    """

    @staticmethod
    async def _check(command, *, db_path, session_whitelist=None, user_id="user-1"):
        """Run ``check_command_safety_v2`` with the shared test arguments.

        ``session_whitelist`` defaults to a fresh empty set for each call.
        """
        return await check_command_safety_v2(
            command,
            session_id="test-session",
            session_whitelist=set() if session_whitelist is None else session_whitelist,
            user_id=user_id,
            db_path=db_path,
        )

    @staticmethod
    async def _seed_blocklist(db_path: Path, pattern: str, reason: str) -> None:
        """Insert one ``terminal_blocklist`` row directly into the test DB."""
        import aiosqlite

        async with aiosqlite.connect(str(db_path)) as db:
            await db.execute(
                """INSERT INTO terminal_blocklist
                (id, command_pattern, reason, created_at)
                VALUES (?, ?, ?, ?)""",
                ("bl-1", pattern, reason, "2026-01-01T00:00:00Z"),
            )
            await db.commit()

    @staticmethod
    async def _seed_user_whitelist(db_path: Path, user_id: str, pattern: str) -> None:
        """Insert one ``terminal_whitelist_user`` row directly into the test DB."""
        import aiosqlite

        async with aiosqlite.connect(str(db_path)) as db:
            await db.execute(
                """INSERT INTO terminal_whitelist_user
                (id, user_id, command_pattern, scope, created_at)
                VALUES (?, ?, ?, 'user', ?)""",
                ("wl-1", user_id, pattern, "2026-01-01T00:00:00Z"),
            )
            await db.commit()

    @pytest.mark.asyncio
    async def test_builtin_whitelist_command_passes(self, temp_db_path: Path):
        """Built-in safe commands execute without confirmation."""
        decision = await self._check("ls -la", db_path=temp_db_path)
        assert decision.safe is True
        assert decision.decision == DECISION_EXECUTED
        assert decision.matched_layer == "builtin"

    @pytest.mark.asyncio
    async def test_dangerous_command_requires_confirmation(self, temp_db_path: Path):
        """Non-whitelisted dangerous commands are denied with a reason."""
        decision = await self._check("rm -rf /tmp/test", db_path=temp_db_path)
        assert decision.safe is False
        assert decision.decision == DECISION_DENIED
        assert decision.reason is not None

    @pytest.mark.asyncio
    async def test_session_whitelist_allows_command(self, temp_db_path: Path):
        """Session whitelist allows previously-confirmed commands."""
        decision = await self._check(
            "rm -rf /tmp/test",
            db_path=temp_db_path,
            session_whitelist={"rm"},  # rm is in session whitelist
        )
        assert decision.safe is True
        assert decision.matched_layer == "session_whitelist"

    @pytest.mark.asyncio
    async def test_user_whitelist_allows_command(self, temp_db_path: Path):
        """User whitelist (DB-backed) allows commands."""
        await self._seed_user_whitelist(temp_db_path, "user-1", "rm")
        decision = await self._check("rm -rf /tmp/test", db_path=temp_db_path)
        assert decision.safe is True
        assert decision.matched_layer == "user_whitelist"

    @pytest.mark.asyncio
    async def test_blocklist_rejects_command(self, temp_db_path: Path):
        """Blocklist rejects commands regardless of other whitelists."""
        await self._seed_blocklist(temp_db_path, "rm", "rm is blocked by admin")
        # Even though "rm" is in the session whitelist, blocklist takes priority.
        decision = await self._check(
            "rm -rf /tmp/test",
            db_path=temp_db_path,
            session_whitelist={"rm"},
        )
        assert decision.safe is False
        assert decision.decision == DECISION_BLOCKED
        assert decision.matched_layer == "blocklist"
        assert "rm is blocked by admin" in (decision.reason or "")

    @pytest.mark.asyncio
    async def test_blocklist_takes_priority_over_builtin(self, temp_db_path: Path):
        """Blocklist rejects even built-in safe commands."""
        await self._seed_blocklist(temp_db_path, "ls", "ls is blocked")
        decision = await self._check("ls -la", db_path=temp_db_path)
        assert decision.safe is False
        assert decision.decision == DECISION_BLOCKED
        # Same layer check as the other blocklist tests: the rejection must
        # come from the blocklist, not from some other layer.
        assert decision.matched_layer == "blocklist"

    @pytest.mark.asyncio
    async def test_empty_command_denied(self, temp_db_path: Path):
        """Whitespace-only commands are denied."""
        decision = await self._check(" ", db_path=temp_db_path)
        assert decision.safe is False
        assert decision.decision == DECISION_DENIED

    @pytest.mark.asyncio
    async def test_dev_mode_no_db(self):
        """When db_path is None, a dangerous command is still denied."""
        # No DB and no user: the DB-backed layers cannot apply, so the
        # command is expected to fall through to danger detection.
        decision = await self._check(
            "rm -rf /tmp/test",
            db_path=None,
            user_id=None,
        )
        assert decision.safe is False
        assert decision.decision == DECISION_DENIED

    @pytest.mark.asyncio
    async def test_non_dangerous_non_whitelisted_command_passes(self, temp_db_path: Path):
        """A pipeline whose segments are all safe commands is allowed."""
        # Per the original author's notes, any binary that is neither a known
        # safe prefix nor a known dangerous binary is treated as dangerous, so
        # a "not whitelisted but passes the danger check" single command is
        # hard to construct. A pipe of two builtin-safe commands is used
        # instead: each part is checked individually and both are safe.
        decision = await self._check("ls | grep test", db_path=temp_db_path)
        assert decision.safe is True
class TestWriteAuditLog:
    """Exercise ``write_audit_log`` against a temporary auth DB."""

    @staticmethod
    async def _fetch_one(db_path: Path, sql: str, params: tuple, *, by_name: bool = False):
        """Run ``sql`` on the test DB and return the first row (or ``None``).

        With ``by_name=True`` rows are ``aiosqlite.Row`` objects so columns
        can be read by name; otherwise they are plain tuples.
        """
        import aiosqlite

        async with aiosqlite.connect(str(db_path)) as db:
            if by_name:
                db.row_factory = aiosqlite.Row
            cursor = await db.execute(sql, params)
            return await cursor.fetchone()

    @pytest.mark.asyncio
    async def test_write_audit_log_success(self, temp_db_path: Path):
        """Audit log entry is written to the DB."""
        await write_audit_log(
            temp_db_path,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command="ls -la",
            decision=DECISION_EXECUTED,
            reason="matched_layer=builtin",
            cwd="/tmp",
            exit_code=0,
            terminal_mode="local",
        )
        stored = await self._fetch_one(
            temp_db_path,
            "SELECT * FROM terminal_audit_logs WHERE session_id = ?",
            ("session-1",),
            by_name=True,
        )
        assert stored is not None
        assert stored["user_id"] == "user-1"
        assert stored["username"] == "alice"
        assert stored["command"] == "ls -la"
        assert stored["decision"] == DECISION_EXECUTED
        assert stored["exit_code"] == 0
        assert stored["terminal_mode"] == "local"

    @pytest.mark.asyncio
    async def test_write_audit_log_null_user(self, temp_db_path: Path):
        """Audit log accepts None user_id (dev mode)."""
        await write_audit_log(
            temp_db_path,
            user_id=None,
            username=None,
            session_id="session-dev",
            command="rm -rf /tmp",
            decision=DECISION_CONFIRMED,
            terminal_mode="local",
        )
        stored = await self._fetch_one(
            temp_db_path,
            "SELECT user_id, username FROM terminal_audit_logs WHERE session_id = ?",
            ("session-dev",),
        )
        assert stored is not None
        stored_user_id, stored_username = stored[0], stored[1]
        assert stored_user_id is None
        assert stored_username is None

    @pytest.mark.asyncio
    async def test_write_audit_log_best_effort(self, tmp_path: Path):
        """Audit log failures are swallowed (never raise)."""
        # The parent directory does not exist, so the DB cannot be opened;
        # the test passes as long as the call returns without raising.
        unreachable_db = tmp_path / "nonexistent" / "auth.db"
        await write_audit_log(
            unreachable_db,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command="ls",
            decision=DECISION_EXECUTED,
        )

    @pytest.mark.asyncio
    async def test_write_audit_log_truncates_long_command(self, temp_db_path: Path):
        """Commands longer than 4096 chars are truncated."""
        oversized_command = "echo " + "a" * 5000
        await write_audit_log(
            temp_db_path,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command=oversized_command,
            decision=DECISION_EXECUTED,
        )
        stored = await self._fetch_one(
            temp_db_path,
            "SELECT command FROM terminal_audit_logs WHERE session_id = ?",
            ("session-1",),
        )
        assert stored is not None
        assert len(stored[0]) <= 4096
# ── Whitelist management API tests ────────────────────────────────────
@pytest.fixture
async def whitelist_app(tmp_path: Path) -> FastAPI:
    """Return a FastAPI app exposing the whitelist routes as an admin user.

    The app points ``app.state.auth_db_path`` at a freshly initialised
    temporary auth DB and mounts the ``terminal_whitelist`` router under
    ``/api/v1``. An HTTP middleware stamps every request with an admin
    ``current_user`` so permission checks pass.
    """
    auth_db = tmp_path / "test_auth.db"
    await init_auth_db(auth_db)

    application = FastAPI()
    application.state.auth_db_path = str(auth_db)
    application.include_router(terminal_whitelist.router, prefix="/api/v1")

    async def _inject_admin(request, call_next):
        # A new dict per request, so handlers never share user state.
        admin_identity = {
            "user_id": "dev-admin-id",
            "username": "dev-admin",
            "role": "admin",
            "dev_mode": False,  # Use real DB path
        }
        request.state.current_user = admin_identity
        return await call_next(request)

    application.middleware("http")(_inject_admin)
    return application
@pytest.fixture
async def whitelist_client(whitelist_app: FastAPI):
    """Yield an ``AsyncClient`` wired to ``whitelist_app`` through ASGI.

    The client is closed when the test finishes.
    """
    async with AsyncClient(
        transport=ASGITransport(app=whitelist_app),
        base_url="http://test",
    ) as http_client:
        yield http_client
class TestUserWhitelistAPI:
    """Test the user whitelist CRUD endpoints.

    Setup requests (the POSTs that create entries a test then lists,
    duplicates or deletes) assert a 201 response, so a broken create
    endpoint fails at the setup step instead of surfacing later as a
    ``KeyError`` or a misleading status mismatch.
    """

    # Collection endpoint shared by every test in this class.
    URL = "/api/v1/terminal/whitelist/user"

    async def _post_pattern(self, client: AsyncClient, pattern: str):
        """POST ``pattern`` to the user whitelist and return the raw response."""
        return await client.post(self.URL, json={"command_pattern": pattern})

    async def _create(self, client: AsyncClient, pattern: str) -> dict:
        """Create an entry, assert it was accepted, and return its JSON body."""
        resp = await self._post_pattern(client, pattern)
        assert resp.status_code == 201
        return resp.json()

    @pytest.mark.asyncio
    async def test_list_empty_user_whitelist(self, whitelist_client: AsyncClient):
        """A fresh DB lists no user whitelist entries."""
        resp = await whitelist_client.get(self.URL)
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []

    @pytest.mark.asyncio
    async def test_add_user_whitelist_entry(self, whitelist_client: AsyncClient):
        """Creating an entry returns 201 with the pattern, scope and an id."""
        resp = await self._post_pattern(whitelist_client, "docker")
        assert resp.status_code == 201
        data = resp.json()
        assert data["command_pattern"] == "docker"
        assert data["scope"] == "user"
        assert "id" in data

    @pytest.mark.asyncio
    async def test_add_duplicate_user_whitelist_returns_409(
        self, whitelist_client: AsyncClient
    ):
        """Posting the same pattern twice yields 409 on the second attempt."""
        await self._create(whitelist_client, "git push")
        resp = await self._post_pattern(whitelist_client, "git push")
        assert resp.status_code == 409

    @pytest.mark.asyncio
    async def test_list_user_whitelist_after_add(self, whitelist_client: AsyncClient):
        """Both created entries appear in the listing."""
        await self._create(whitelist_client, "docker")
        await self._create(whitelist_client, "git push")
        resp = await whitelist_client.get(self.URL)
        assert resp.status_code == 200
        entries = resp.json()["entries"]
        assert len(entries) == 2
        patterns = {e["command_pattern"] for e in entries}
        assert patterns == {"docker", "git push"}

    @pytest.mark.asyncio
    async def test_delete_user_whitelist_entry(self, whitelist_client: AsyncClient):
        """Deleting an entry returns 204 and removes it from the listing."""
        entry_id = (await self._create(whitelist_client, "docker"))["id"]
        del_resp = await whitelist_client.delete(f"{self.URL}/{entry_id}")
        assert del_resp.status_code == 204
        # Verify it's gone
        list_resp = await whitelist_client.get(self.URL)
        assert list_resp.json()["entries"] == []

    @pytest.mark.asyncio
    async def test_delete_nonexistent_user_whitelist_returns_404(
        self, whitelist_client: AsyncClient
    ):
        """Deleting an unknown id yields 404."""
        resp = await whitelist_client.delete(f"{self.URL}/nonexistent-id")
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_add_user_whitelist_invalid_pattern_rejected(
        self, whitelist_client: AsyncClient
    ):
        """A pattern containing a shell separator is rejected with 422."""
        resp = await self._post_pattern(whitelist_client, "ls; rm -rf /")
        assert resp.status_code == 422

    @pytest.mark.asyncio
    async def test_add_user_whitelist_empty_pattern_rejected(
        self, whitelist_client: AsyncClient
    ):
        """An empty pattern is rejected with 422."""
        resp = await self._post_pattern(whitelist_client, "")
        assert resp.status_code == 422
class TestBlocklistAPI:
    """Test the blocklist CRUD endpoints (admin only).

    Setup POSTs assert a 201 response so that a failing create endpoint is
    reported where it happens rather than as a later ``KeyError`` on
    ``["id"]`` or an unrelated status mismatch.
    """

    # Collection endpoint shared by every test in this class.
    URL = "/api/v1/terminal/blocklist"

    async def _create(self, client: AsyncClient, payload: dict) -> dict:
        """Create a blocklist entry, assert 201, and return its JSON body."""
        resp = await client.post(self.URL, json=payload)
        assert resp.status_code == 201
        return resp.json()

    @pytest.mark.asyncio
    async def test_list_empty_blocklist(self, whitelist_client: AsyncClient):
        """A fresh DB lists no blocklist entries."""
        resp = await whitelist_client.get(self.URL)
        assert resp.status_code == 200
        assert resp.json()["entries"] == []

    @pytest.mark.asyncio
    async def test_add_blocklist_entry(self, whitelist_client: AsyncClient):
        """Creating an entry returns 201 and echoes pattern and reason."""
        resp = await whitelist_client.post(
            self.URL,
            json={"command_pattern": "mkfs", "reason": "formatting is dangerous"},
        )
        assert resp.status_code == 201
        data = resp.json()
        assert data["command_pattern"] == "mkfs"
        assert data["reason"] == "formatting is dangerous"

    @pytest.mark.asyncio
    async def test_add_duplicate_blocklist_returns_409(self, whitelist_client: AsyncClient):
        """Posting the same pattern twice yields 409 on the second attempt."""
        await self._create(whitelist_client, {"command_pattern": "mkfs"})
        resp = await whitelist_client.post(
            self.URL,
            json={"command_pattern": "mkfs"},
        )
        assert resp.status_code == 409

    @pytest.mark.asyncio
    async def test_delete_blocklist_entry(self, whitelist_client: AsyncClient):
        """Deleting an entry returns 204 and removes it from the listing."""
        created = await self._create(whitelist_client, {"command_pattern": "mkfs"})
        entry_id = created["id"]
        del_resp = await whitelist_client.delete(f"{self.URL}/{entry_id}")
        assert del_resp.status_code == 204
        # Verify deleted
        list_resp = await whitelist_client.get(self.URL)
        assert list_resp.json()["entries"] == []

    @pytest.mark.asyncio
    async def test_delete_nonexistent_blocklist_returns_404(
        self, whitelist_client: AsyncClient
    ):
        """Deleting an unknown id yields 404."""
        resp = await whitelist_client.delete(f"{self.URL}/nonexistent")
        assert resp.status_code == 404
class TestAuditLogAPI:
    """Test the audit log query endpoint.

    Rows are written with ``write_audit_log`` against the DB path stored on
    ``whitelist_app.state.auth_db_path`` and then read back over HTTP.
    """

    # Query endpoint shared by every test in this class.
    URL = "/api/v1/terminal/audit-logs"

    @pytest.mark.asyncio
    async def test_list_empty_audit_logs(self, whitelist_client: AsyncClient):
        """A fresh DB returns no entries and a total of zero."""
        resp = await whitelist_client.get(self.URL)
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []
        assert data["total"] == 0

    @pytest.mark.asyncio
    async def test_list_audit_logs_after_writes(
        self, whitelist_client: AsyncClient, whitelist_app: FastAPI
    ):
        """Listing returns all rows and honours user/decision/session filters."""
        db_path = whitelist_app.state.auth_db_path
        await write_audit_log(
            db_path,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command="ls -la",
            decision=DECISION_EXECUTED,
            exit_code=0,
        )
        await write_audit_log(
            db_path,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command="rm -rf /tmp",
            decision=DECISION_CONFIRMED,
            exit_code=0,
        )
        await write_audit_log(
            db_path,
            user_id="user-2",
            username="bob",
            session_id="session-2",
            command="mkfs /dev/sda",
            decision=DECISION_BLOCKED,
        )

        # List all
        resp = await whitelist_client.get(self.URL)
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 3
        assert len(data["entries"]) == 3

        # Filter by user_id
        resp = await whitelist_client.get(f"{self.URL}?user_id=user-1")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 2
        for entry in data["entries"]:
            assert entry["user_id"] == "user-1"

        # Filter by decision
        resp = await whitelist_client.get(f"{self.URL}?decision=confirmed")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["decision"] == "confirmed"

        # Filter by session_id
        resp = await whitelist_client.get(f"{self.URL}?session_id=session-2")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["session_id"] == "session-2"

    @pytest.mark.asyncio
    async def test_audit_logs_pagination(
        self, whitelist_client: AsyncClient, whitelist_app: FastAPI
    ):
        """``limit``/``offset`` page through five rows as 2 + 2 + 1."""
        db_path = whitelist_app.state.auth_db_path
        for i in range(5):
            await write_audit_log(
                db_path,
                user_id="user-1",
                username="alice",
                session_id=f"session-{i}",
                command=f"ls {i}",
                decision=DECISION_EXECUTED,
            )

        # (offset, expected page size) for limit=2 over five rows.
        pages = ((0, 2), (2, 2), (4, 1))
        for offset, expected_count in pages:
            resp = await whitelist_client.get(
                f"{self.URL}?limit=2&offset={offset}"
            )
            # Check the status first so an error response is reported as
            # such instead of as a KeyError on the JSON body.
            assert resp.status_code == 200
            data = resp.json()
            assert data["total"] == 5
            assert len(data["entries"]) == expected_count
# ── Integration: blocklist + safety check ─────────────────────────────
class TestBlocklistIntegration:
    """Integration: an entry created over HTTP is enforced by the safety check."""

    @pytest.mark.asyncio
    async def test_blocklist_blocks_command_in_safety_check(
        self, whitelist_client: AsyncClient, whitelist_app: FastAPI
    ):
        """Blocklist "rm" through the API, then confirm the check rejects it."""
        blocklist_payload = {"command_pattern": "rm", "reason": "rm disabled by policy"}
        create_resp = await whitelist_client.post(
            "/api/v1/terminal/blocklist",
            json=blocklist_payload,
        )
        assert create_resp.status_code == 201

        # The safety check reads the same DB the API just wrote to. "rm" is
        # also placed in the session whitelist to show the blocklist wins.
        verdict = await check_command_safety_v2(
            "rm -rf /tmp/test",
            session_id="test-session",
            session_whitelist={"rm"},
            user_id="user-1",
            db_path=whitelist_app.state.auth_db_path,
        )
        assert verdict.safe is False
        assert verdict.decision == DECISION_BLOCKED
        assert verdict.matched_layer == "blocklist"