From aa5fa3e177810a365cd6c2c21bc10ebcf6b70fd2 Mon Sep 17 00:00:00 2001 From: Jacob Magar Date: Sun, 15 Mar 2026 23:25:39 -0400 Subject: [PATCH] feat(guards): add core/guards.py with gate_destructive_action helper --- tests/test_guards.py | 90 ++++++++++++++++++++++++++++++++++++ unraid_mcp/core/guards.py | 96 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+) create mode 100644 tests/test_guards.py create mode 100644 unraid_mcp/core/guards.py diff --git a/tests/test_guards.py b/tests/test_guards.py new file mode 100644 index 0000000..2095c45 --- /dev/null +++ b/tests/test_guards.py @@ -0,0 +1,90 @@ +"""Unit tests for unraid_mcp.core.guards.""" + +from unittest.mock import AsyncMock, patch + +import pytest +from fastmcp.exceptions import ToolError + +from unraid_mcp.core.guards import gate_destructive_action + + +DESTRUCTIVE = {"delete", "wipe"} + + +class TestGateDestructiveAction: + """gate_destructive_action raises ToolError or elicits based on state.""" + + @pytest.mark.asyncio + async def test_non_destructive_action_passes_through(self) -> None: + """Non-destructive actions are never blocked.""" + await gate_destructive_action(None, "list", DESTRUCTIVE, False, "irrelevant") + + @pytest.mark.asyncio + async def test_confirm_true_bypasses_elicitation(self) -> None: + """confirm=True skips elicitation entirely.""" + with patch("unraid_mcp.core.guards.elicit_destructive_confirmation") as mock_elicit: + await gate_destructive_action(None, "delete", DESTRUCTIVE, True, "desc") + mock_elicit.assert_not_called() + + @pytest.mark.asyncio + async def test_no_ctx_raises_tool_error(self) -> None: + """ctx=None means elicitation returns False → ToolError.""" + with pytest.raises(ToolError, match="not confirmed"): + await gate_destructive_action(None, "delete", DESTRUCTIVE, False, "desc") + + @pytest.mark.asyncio + async def test_elicitation_accepted_does_not_raise(self) -> None: + """When elicitation returns True, no ToolError is raised.""" + with patch( + "unraid_mcp.core.guards.elicit_destructive_confirmation", + new_callable=AsyncMock, + return_value=True, + ): + await gate_destructive_action(object(), "delete", DESTRUCTIVE, False, "desc") + + @pytest.mark.asyncio + async def test_elicitation_declined_raises_tool_error(self) -> None: + """When elicitation returns False, ToolError is raised.""" + with ( + patch( + "unraid_mcp.core.guards.elicit_destructive_confirmation", + new_callable=AsyncMock, + return_value=False, + ) as mock_elicit, + pytest.raises(ToolError, match="confirm=True"), + ): + await gate_destructive_action(object(), "delete", DESTRUCTIVE, False, "desc") + mock_elicit.assert_called_once() + + @pytest.mark.asyncio + async def test_string_description_passed_to_elicitation(self) -> None: + """A plain string description is forwarded as-is.""" + with patch( + "unraid_mcp.core.guards.elicit_destructive_confirmation", + new_callable=AsyncMock, + return_value=True, + ) as mock_elicit: + await gate_destructive_action( + object(), "delete", DESTRUCTIVE, False, "Delete everything." + ) + _, _, desc = mock_elicit.call_args.args + assert desc == "Delete everything." + + @pytest.mark.asyncio + async def test_dict_description_resolves_by_action(self) -> None: + """A dict description is resolved by action key.""" + descs = {"delete": "Delete desc.", "wipe": "Wipe desc."} + with patch( + "unraid_mcp.core.guards.elicit_destructive_confirmation", + new_callable=AsyncMock, + return_value=True, + ) as mock_elicit: + await gate_destructive_action(object(), "wipe", DESTRUCTIVE, False, descs) + _, _, desc = mock_elicit.call_args.args + assert desc == "Wipe desc." + + @pytest.mark.asyncio + async def test_error_message_contains_action_name(self) -> None: + """ToolError message includes the action name.""" + with pytest.raises(ToolError, match="'delete'"): + await gate_destructive_action(None, "delete", DESTRUCTIVE, False, "desc") diff --git a/unraid_mcp/core/guards.py b/unraid_mcp/core/guards.py new file mode 100644 index 0000000..6a9088c --- /dev/null +++ b/unraid_mcp/core/guards.py @@ -0,0 +1,96 @@ +"""Destructive action gating via MCP elicitation. + +Provides gate_destructive_action() — a single call to guard any destructive +tool action with interactive user confirmation or confirm=True bypass. +""" + +from typing import TYPE_CHECKING + + +if TYPE_CHECKING: + from fastmcp import Context + +from fastmcp.exceptions import ToolError + +from ..config.logging import logger + + +async def elicit_destructive_confirmation( + ctx: "Context | None", action: str, description: str +) -> bool: + """Prompt the user to confirm a destructive action via MCP elicitation. + + Args: + ctx: The MCP context. If None, returns False immediately. + action: Action name shown in the prompt. + description: Human-readable description of what the action will do. + + Returns: + True if the user confirmed, False otherwise. + """ + if ctx is None: + logger.warning( + "Cannot elicit confirmation for '%s': no MCP context available. " + "Re-run with confirm=True to bypass elicitation.", + action, + ) + return False + + try: + result = await ctx.elicit( + message=( + f"**Confirm destructive action: `{action}`**\n\n" + f"{description}\n\n" + "Are you sure you want to proceed?" + ), + response_type=bool, + ) + except NotImplementedError: + logger.warning( + "MCP client does not support elicitation for action '%s'. " + "Re-run with confirm=True to bypass.", + action, + ) + return False + + if result.action != "accept": + logger.info("Destructive action '%s' declined by user (%s).", action, result.action) + return False + + confirmed: bool = result.data # type: ignore[union-attr] + if not confirmed: + logger.info("Destructive action '%s' not confirmed by user.", action) + return confirmed + + +async def gate_destructive_action( + ctx: "Context | None", + action: str, + destructive_actions: set[str], + confirm: bool, + description: str | dict[str, str], +) -> None: + """Gate a destructive action with elicitation or confirm=True bypass. + + Does nothing if the action is not in destructive_actions or confirm=True. + Otherwise calls elicit_destructive_confirmation; raises ToolError if the + user declines or elicitation is unavailable. + + Args: + ctx: MCP context for elicitation (None skips elicitation). + action: The action being requested. + destructive_actions: Set of action names considered destructive. + confirm: When True, bypasses elicitation and proceeds immediately. + description: Human-readable description of the action's impact. + Pass a str when one description covers all destructive actions. + Pass a dict[action_name, description] when descriptions differ. + """ + if action not in destructive_actions or confirm: + return + + desc = description[action] if isinstance(description, dict) else description + confirmed = await elicit_destructive_confirmation(ctx, action, desc) + if not confirmed: + raise ToolError( + f"Action '{action}' was not confirmed. Re-run with confirm=True to bypass elicitation." + )