feat(guards): add core/guards.py with gate_destructive_action helper

This commit is contained in:
Jacob Magar
2026-03-15 23:25:39 -04:00
parent a3ea468bd9
commit aa5fa3e177
2 changed files with 186 additions and 0 deletions

90
tests/test_guards.py Normal file
View File

@@ -0,0 +1,90 @@
"""Unit tests for unraid_mcp.core.guards."""
from unittest.mock import AsyncMock, patch
import pytest
from fastmcp.exceptions import ToolError
from unraid_mcp.core.guards import gate_destructive_action
DESTRUCTIVE = {"delete", "wipe"}
class TestGateDestructiveAction:
"""gate_destructive_action raises ToolError or elicits based on state."""
@pytest.mark.asyncio
async def test_non_destructive_action_passes_through(self) -> None:
"""Non-destructive actions are never blocked."""
await gate_destructive_action(None, "list", DESTRUCTIVE, False, "irrelevant")
@pytest.mark.asyncio
async def test_confirm_true_bypasses_elicitation(self) -> None:
"""confirm=True skips elicitation entirely."""
with patch("unraid_mcp.core.guards.elicit_destructive_confirmation") as mock_elicit:
await gate_destructive_action(None, "delete", DESTRUCTIVE, True, "desc")
mock_elicit.assert_not_called()
@pytest.mark.asyncio
async def test_no_ctx_raises_tool_error(self) -> None:
"""ctx=None means elicitation returns False → ToolError."""
with pytest.raises(ToolError, match="not confirmed"):
await gate_destructive_action(None, "delete", DESTRUCTIVE, False, "desc")
@pytest.mark.asyncio
async def test_elicitation_accepted_does_not_raise(self) -> None:
"""When elicitation returns True, no ToolError is raised."""
with patch(
"unraid_mcp.core.guards.elicit_destructive_confirmation",
new_callable=AsyncMock,
return_value=True,
):
await gate_destructive_action(object(), "delete", DESTRUCTIVE, False, "desc")
@pytest.mark.asyncio
async def test_elicitation_declined_raises_tool_error(self) -> None:
"""When elicitation returns False, ToolError is raised."""
with (
patch(
"unraid_mcp.core.guards.elicit_destructive_confirmation",
new_callable=AsyncMock,
return_value=False,
) as mock_elicit,
pytest.raises(ToolError, match="confirm=True"),
):
await gate_destructive_action(object(), "delete", DESTRUCTIVE, False, "desc")
mock_elicit.assert_called_once()
@pytest.mark.asyncio
async def test_string_description_passed_to_elicitation(self) -> None:
"""A plain string description is forwarded as-is."""
with patch(
"unraid_mcp.core.guards.elicit_destructive_confirmation",
new_callable=AsyncMock,
return_value=True,
) as mock_elicit:
await gate_destructive_action(
object(), "delete", DESTRUCTIVE, False, "Delete everything."
)
_, _, desc = mock_elicit.call_args.args
assert desc == "Delete everything."
@pytest.mark.asyncio
async def test_dict_description_resolves_by_action(self) -> None:
"""A dict description is resolved by action key."""
descs = {"delete": "Delete desc.", "wipe": "Wipe desc."}
with patch(
"unraid_mcp.core.guards.elicit_destructive_confirmation",
new_callable=AsyncMock,
return_value=True,
) as mock_elicit:
await gate_destructive_action(object(), "wipe", DESTRUCTIVE, False, descs)
_, _, desc = mock_elicit.call_args.args
assert desc == "Wipe desc."
@pytest.mark.asyncio
async def test_error_message_contains_action_name(self) -> None:
"""ToolError message includes the action name."""
with pytest.raises(ToolError, match="'delete'"):
await gate_destructive_action(None, "delete", DESTRUCTIVE, False, "desc")

96
unraid_mcp/core/guards.py Normal file
View File

@@ -0,0 +1,96 @@
"""Destructive action gating via MCP elicitation.
Provides gate_destructive_action() — a single call to guard any destructive
tool action with interactive user confirmation or confirm=True bypass.
"""
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fastmcp import Context
from fastmcp.exceptions import ToolError
from ..config.logging import logger
async def elicit_destructive_confirmation(
ctx: "Context | None", action: str, description: str
) -> bool:
"""Prompt the user to confirm a destructive action via MCP elicitation.
Args:
ctx: The MCP context. If None, returns False immediately.
action: Action name shown in the prompt.
description: Human-readable description of what the action will do.
Returns:
True if the user confirmed, False otherwise.
"""
if ctx is None:
logger.warning(
"Cannot elicit confirmation for '%s': no MCP context available. "
"Re-run with confirm=True to bypass elicitation.",
action,
)
return False
try:
result = await ctx.elicit(
message=(
f"**Confirm destructive action: `{action}`**\n\n"
f"{description}\n\n"
"Are you sure you want to proceed?"
),
response_type=bool,
)
except NotImplementedError:
logger.warning(
"MCP client does not support elicitation for action '%s'. "
"Re-run with confirm=True to bypass.",
action,
)
return False
if result.action != "accept":
logger.info("Destructive action '%s' declined by user (%s).", action, result.action)
return False
confirmed: bool = result.data # type: ignore[union-attr]
if not confirmed:
logger.info("Destructive action '%s' not confirmed by user.", action)
return confirmed
async def gate_destructive_action(
ctx: "Context | None",
action: str,
destructive_actions: set[str],
confirm: bool,
description: str | dict[str, str],
) -> None:
"""Gate a destructive action with elicitation or confirm=True bypass.
Does nothing if the action is not in destructive_actions or confirm=True.
Otherwise calls elicit_destructive_confirmation; raises ToolError if the
user declines or elicitation is unavailable.
Args:
ctx: MCP context for elicitation (None skips elicitation).
action: The action being requested.
destructive_actions: Set of action names considered destructive.
confirm: When True, bypasses elicitation and proceeds immediately.
description: Human-readable description of the action's impact.
Pass a str when one description covers all destructive actions.
Pass a dict[action_name, description] when descriptions differ.
"""
if action not in destructive_actions or confirm:
return
desc = description[action] if isinstance(description, dict) else description
confirmed = await elicit_destructive_confirmation(ctx, action, desc)
if not confirmed:
raise ToolError(
f"Action '{action}' was not confirmed. Re-run with confirm=True to bypass elicitation."
)