fix(live): validate log_tail path against allowlist, move guards before error handler

Add _ALLOWED_LOG_PREFIXES allowlist check to log_tail (mirrors storage.py pattern)
to prevent path traversal attacks. Move path/required guards before tool_error_handler
context so validation errors raise cleanly. Add two tests: ToolError propagation and
invalid path rejection.
This commit is contained in:
Jacob Magar
2026-03-15 19:08:43 -04:00
parent 3a72f6c6b9
commit 0d4a3fa4e2
2 changed files with 36 additions and 2 deletions

View File

@@ -93,3 +93,22 @@ async def test_invalid_action_raises(mcp):
tool_fn = _make_live_tool(mcp)
with pytest.raises(ToolError, match="Invalid action"):
await tool_fn(action="nonexistent") # type: ignore[arg-type]
@pytest.mark.asyncio
async def test_snapshot_propagates_tool_error(mcp, _mock_subscribe_once):
from unraid_mcp.core.exceptions import ToolError
_mock_subscribe_once.side_effect = ToolError("Subscription timed out after 10s")
tool_fn = _make_live_tool(mcp)
with pytest.raises(ToolError, match="timed out"):
await tool_fn(action="cpu")
@pytest.mark.asyncio
async def test_log_tail_rejects_invalid_path(mcp, _mock_subscribe_collect):
from unraid_mcp.core.exceptions import ToolError
tool_fn = _make_live_tool(mcp)
with pytest.raises(ToolError, match="must start with"):
await tool_fn(action="log_tail", path="/etc/shadow")

View File

@@ -10,6 +10,7 @@ Use `subscribe_collect` actions for event streams (notification_feed, log_tail).
from __future__ import annotations
import os
from typing import Any, Literal, get_args
from fastmcp import FastMCP
@@ -19,6 +20,8 @@ from ..core.exceptions import ToolError, tool_error_handler
from ..subscriptions.snapshot import subscribe_collect, subscribe_once
_ALLOWED_LOG_PREFIXES = ("/var/log/", "/boot/logs/", "/mnt/")
SNAPSHOT_ACTIONS = {
"cpu": """
subscription { systemMetricsCpu { id percentTotal cpus { percentTotal percentUser percentSystem percentIdle } } }
@@ -122,6 +125,20 @@ def register_live_tool(mcp: FastMCP) -> None:
f"Invalid action '{action}'. Must be one of: {sorted(ALL_LIVE_ACTIONS)}"
)
# Validate log_tail path before entering the error handler context.
if action == "log_tail":
if not path:
raise ToolError("path is required for 'log_tail' action")
# Resolve to prevent path traversal attacks (same as storage.py).
# Using os.path.realpath instead of anyio.Path.resolve() because the
# async variant blocks on NFS-mounted paths under /mnt/ (Perf-AI-1).
normalized = os.path.realpath(path) # noqa: ASYNC240
if not any(normalized.startswith(p) for p in _ALLOWED_LOG_PREFIXES):
raise ToolError(
f"path must start with one of: {', '.join(_ALLOWED_LOG_PREFIXES)}. Got: {path!r}"
)
path = normalized
with tool_error_handler("live", action, logger):
logger.info(f"Executing unraid_live action={action} timeout={timeout}")
@@ -131,8 +148,6 @@ def register_live_tool(mcp: FastMCP) -> None:
# Collect actions
if action == "log_tail":
if not path:
raise ToolError("path is required for 'log_tail' action")
events = await subscribe_collect(
COLLECT_ACTIONS["log_tail"],
variables={"path": path},