diff --git a/tests/safety/test_destructive_guards.py b/tests/safety/test_destructive_guards.py index 1a77650..f9a5eba 100644 --- a/tests/safety/test_destructive_guards.py +++ b/tests/safety/test_destructive_guards.py @@ -129,7 +129,20 @@ class TestDestructiveActionRegistries: ) def test_no_delete_or_remove_mutations_missing_from_destructive(self) -> None: - """Any mutation with 'delete' or 'remove' in its name should be destructive.""" + """Any mutation with 'delete' or 'remove' in its name should be destructive. + + Exceptions (documented, intentional): + keys/remove_role — fully reversible; the role can always be re-added via add_role. + No data is lost and there is no irreversible side-effect. + """ + # Mutations explicitly exempted from the delete/remove heuristic with justification. + # Add entries here only when the action is demonstrably reversible and non-destructive. + _HEURISTIC_EXCEPTIONS: frozenset[str] = frozenset( + { + "keys/remove_role", # reversible — role can be re-added via add_role + } + ) + all_mutations = { "array": ARRAY_MUTATIONS, "vm": VM_MUTATIONS, @@ -156,6 +169,7 @@ class TestDestructiveActionRegistries: for action_name in mutations if ("delete" in action_name or "remove" in action_name) and action_name not in destructive + and f"{tool_key}/{action_name}" not in _HEURISTIC_EXCEPTIONS ) assert not missing, ( f"Mutations with 'delete'/'remove' not in DESTRUCTIVE_ACTIONS: {missing}" diff --git a/tests/schema/test_query_validation.py b/tests/schema/test_query_validation.py index b1730ca..dac6b71 100644 --- a/tests/schema/test_query_validation.py +++ b/tests/schema/test_query_validation.py @@ -58,12 +58,6 @@ class TestInfoQueries: errors = _validate_operation(schema, QUERIES["registration"]) assert not errors, f"registration query validation failed: {errors}" - def test_connect_query(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.info import QUERIES - - errors = _validate_operation(schema, QUERIES["connect"]) - assert not errors, f"connect query validation failed: {errors}" - def test_variables_query(self, schema: GraphQLSchema) -> None: from unraid_mcp.tools.info import QUERIES @@ -157,7 +151,6 @@ class TestInfoQueries: "array", "network", "registration", - "connect", "variables", "metrics", "services", @@ -229,7 +222,7 @@ class TestArrayMutations: # ============================================================================ -# Storage Tool (6 queries) +# Storage Tool (5 queries + 1 mutation) # ============================================================================ class TestStorageQueries: """Validate all queries from unraid_mcp/tools/storage.py.""" @@ -252,12 +245,6 @@ class TestStorageQueries: errors = _validate_operation(schema, QUERIES["disk_details"]) assert not errors, f"disk_details query validation failed: {errors}" - def test_unassigned_query(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.storage import QUERIES - - errors = _validate_operation(schema, QUERIES["unassigned"]) - assert not errors, f"unassigned query validation failed: {errors}" - def test_log_files_query(self, schema: GraphQLSchema) -> None: from unraid_mcp.tools.storage import QUERIES @@ -273,12 +260,27 @@ class TestStorageQueries: def test_all_storage_queries_covered(self, schema: GraphQLSchema) -> None: from unraid_mcp.tools.storage import QUERIES - expected = {"shares", "disks", "disk_details", "unassigned", "log_files", "logs"} + expected = {"shares", "disks", "disk_details", "log_files", "logs"} assert set(QUERIES.keys()) == expected +class TestStorageMutations: + """Validate all mutations from unraid_mcp/tools/storage.py.""" + + def test_flash_backup_mutation(self, schema: GraphQLSchema) -> None: + from unraid_mcp.tools.storage import MUTATIONS + + errors = _validate_operation(schema, MUTATIONS["flash_backup"]) + assert not errors, f"flash_backup mutation validation failed: {errors}" + + def test_all_storage_mutations_covered(self, schema: GraphQLSchema) -> None: + from unraid_mcp.tools.storage import MUTATIONS + + assert set(MUTATIONS.keys()) == {"flash_backup"} + + # ============================================================================ -# Docker Tool (7 queries + 7 mutations) +# Docker Tool (4 queries + 2 mutations) # ============================================================================ class TestDockerQueries: """Validate all queries from unraid_mcp/tools/docker.py.""" @@ -295,12 +297,6 @@ class TestDockerQueries: errors = _validate_operation(schema, QUERIES["details"]) assert not errors, f"details query validation failed: {errors}" - def test_logs_query(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.docker import QUERIES - - errors = _validate_operation(schema, QUERIES["logs"]) - assert not errors, f"logs query validation failed: {errors}" - def test_networks_query(self, schema: GraphQLSchema) -> None: from unraid_mcp.tools.docker import QUERIES @@ -313,29 +309,14 @@ class TestDockerQueries: errors = _validate_operation(schema, QUERIES["network_details"]) assert not errors, f"network_details query validation failed: {errors}" - def test_port_conflicts_query(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.docker import QUERIES - - errors = _validate_operation(schema, QUERIES["port_conflicts"]) - assert not errors, f"port_conflicts query validation failed: {errors}" - - def test_check_updates_query(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.docker import QUERIES - - errors = _validate_operation(schema, QUERIES["check_updates"]) - assert not errors, f"check_updates query validation failed: {errors}" - def test_all_docker_queries_covered(self, schema: GraphQLSchema) -> None: from unraid_mcp.tools.docker import QUERIES expected = { "list", "details", - "logs", "networks", "network_details", - "port_conflicts", - "check_updates", } assert set(QUERIES.keys()) == expected @@ -355,58 +336,12 @@ class TestDockerMutations: errors = _validate_operation(schema, MUTATIONS["stop"]) assert not errors, f"stop mutation validation failed: {errors}" - def test_pause_mutation(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.docker import MUTATIONS - - errors = _validate_operation(schema, MUTATIONS["pause"]) - assert not errors, f"pause mutation validation failed: {errors}" - - def test_unpause_mutation(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.docker import MUTATIONS - - errors = _validate_operation(schema, MUTATIONS["unpause"]) - assert not errors, f"unpause mutation validation failed: {errors}" - - def test_remove_mutation(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.docker import MUTATIONS - - errors = _validate_operation(schema, MUTATIONS["remove"]) - assert not errors, f"remove mutation validation failed: {errors}" - - def test_update_mutation(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.docker import MUTATIONS - - errors = _validate_operation(schema, MUTATIONS["update"]) - assert not errors, f"update mutation validation failed: {errors}" - - def test_update_all_mutation(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.docker import MUTATIONS - - errors = _validate_operation(schema, MUTATIONS["update_all"]) - assert not errors, f"update_all mutation validation failed: {errors}" - def test_all_docker_mutations_covered(self, schema: GraphQLSchema) -> None: from unraid_mcp.tools.docker import MUTATIONS expected = { "start", "stop", - "pause", - "unpause", - "remove", - "update", - "update_all", - "create_folder", - "set_folder_children", - "delete_entries", - "move_to_folder", - "move_to_position", - "rename_folder", - "create_folder_with_items", - "update_view_prefs", - "sync_templates", - "reset_template_mappings", - "refresh_digests", } assert set(MUTATIONS.keys()) == expected @@ -505,16 +440,10 @@ class TestNotificationQueries: errors = _validate_operation(schema, QUERIES["list"]) assert not errors, f"list query validation failed: {errors}" - def test_warnings_query(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.notifications import QUERIES - - errors = _validate_operation(schema, QUERIES["warnings"]) - assert not errors, f"warnings query validation failed: {errors}" - def test_all_notification_queries_covered(self, schema: GraphQLSchema) -> None: from unraid_mcp.tools.notifications import QUERIES - assert set(QUERIES.keys()) == {"overview", "list", "warnings"} + assert set(QUERIES.keys()) == {"overview", "list"} class TestNotificationMutations: @@ -562,12 +491,6 @@ class TestNotificationMutations: errors = _validate_operation(schema, MUTATIONS["archive_many"]) assert not errors, f"archive_many mutation validation failed: {errors}" - def test_create_unique_mutation(self, schema: GraphQLSchema) -> None: - from unraid_mcp.tools.notifications import MUTATIONS - - errors = _validate_operation(schema, MUTATIONS["create_unique"]) - assert not errors, f"create_unique mutation validation failed: {errors}" - def test_unarchive_many_mutation(self, schema: GraphQLSchema) -> None: from unraid_mcp.tools.notifications import MUTATIONS @@ -597,7 +520,6 @@ class TestNotificationMutations: "delete_archived", "archive_all", "archive_many", - "create_unique", "unarchive_many", "unarchive_all", "recalculate", @@ -713,10 +635,46 @@ class TestKeysMutations: errors = _validate_operation(schema, MUTATIONS["delete"]) assert not errors, f"delete mutation validation failed: {errors}" + def test_add_role_mutation(self, schema: GraphQLSchema) -> None: + from unraid_mcp.tools.keys import MUTATIONS + + errors = _validate_operation(schema, MUTATIONS["add_role"]) + assert not errors, f"add_role mutation validation failed: {errors}" + + def test_remove_role_mutation(self, schema: GraphQLSchema) -> None: + from unraid_mcp.tools.keys import MUTATIONS + + errors = _validate_operation(schema, MUTATIONS["remove_role"]) + assert not errors, f"remove_role mutation validation failed: {errors}" + def test_all_keys_mutations_covered(self, schema: GraphQLSchema) -> None: from unraid_mcp.tools.keys import MUTATIONS - assert set(MUTATIONS.keys()) == {"create", "update", "delete"} + assert set(MUTATIONS.keys()) == {"create", "update", "delete", "add_role", "remove_role"} + + +# ============================================================================ +# Settings Tool (0 queries + 2 mutations) +# ============================================================================ +class TestSettingsMutations: + """Validate all mutations from unraid_mcp/tools/settings.py.""" + + def test_update_mutation(self, schema: GraphQLSchema) -> None: + from unraid_mcp.tools.settings import MUTATIONS + + errors = _validate_operation(schema, MUTATIONS["update"]) + assert not errors, f"update mutation validation failed: {errors}" + + def test_configure_ups_mutation(self, schema: GraphQLSchema) -> None: + from unraid_mcp.tools.settings import MUTATIONS + + errors = _validate_operation(schema, MUTATIONS["configure_ups"]) + assert not errors, f"configure_ups mutation validation failed: {errors}" + + def test_all_settings_mutations_covered(self, schema: GraphQLSchema) -> None: + from unraid_mcp.tools.settings import MUTATIONS + + assert set(MUTATIONS.keys()) == {"update", "configure_ups"} # ============================================================================ @@ -770,6 +728,7 @@ class TestSchemaCompleteness: "unraid_mcp.tools.rclone", "unraid_mcp.tools.users", "unraid_mcp.tools.keys", + "unraid_mcp.tools.settings", ] failures: list[str] = [] @@ -820,6 +779,7 @@ class TestSchemaCompleteness: "unraid_mcp.tools.rclone", "unraid_mcp.tools.users", "unraid_mcp.tools.keys", + "unraid_mcp.tools.settings", ] total = 0 @@ -828,5 +788,5 @@ class TestSchemaCompleteness: total += len(getattr(mod, "QUERIES", {})) total += len(getattr(mod, "MUTATIONS", {})) - # 71 operations across all tools (queries + mutations in dicts) - assert total >= 50, f"Expected at least 50 operations, found {total}" + # Operations across all tools (queries + mutations in dicts) + assert total >= 40, f"Expected at least 40 operations, found {total}" diff --git a/tests/test_keys.py b/tests/test_keys.py index 28b23e4..b97a58e 100644 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -22,7 +22,7 @@ def _make_tool(): class TestKeysValidation: async def test_delete_requires_confirm(self, _mock_graphql: AsyncMock) -> None: tool_fn = _make_tool() - with pytest.raises(ToolError, match="destructive"): + with pytest.raises(ToolError, match="confirm=True"): await tool_fn(action="delete", key_id="k:1") async def test_get_requires_key_id(self, _mock_graphql: AsyncMock) -> None: @@ -108,3 +108,25 @@ class TestKeysActions: tool_fn = _make_tool() with pytest.raises(ToolError, match="Failed to execute keys/list"): await tool_fn(action="list") + + async def test_add_role_requires_key_id(self, _mock_graphql: AsyncMock) -> None: + tool_fn = _make_tool() + with pytest.raises(ToolError, match="key_id"): + await tool_fn(action="add_role", roles=["VIEWER"]) + + async def test_add_role_requires_role(self, _mock_graphql: AsyncMock) -> None: + tool_fn = _make_tool() + with pytest.raises(ToolError, match="role"): + await tool_fn(action="add_role", key_id="abc:local") + + async def test_add_role_success(self, _mock_graphql: AsyncMock) -> None: + _mock_graphql.return_value = {"apiKey": {"addRole": True}} + tool_fn = _make_tool() + result = await tool_fn(action="add_role", key_id="abc:local", roles=["VIEWER"]) + assert result["success"] is True + + async def test_remove_role_success(self, _mock_graphql: AsyncMock) -> None: + _mock_graphql.return_value = {"apiKey": {"removeRole": True}} + tool_fn = _make_tool() + result = await tool_fn(action="remove_role", key_id="abc:local", roles=["VIEWER"]) + assert result["success"] is True diff --git a/unraid_mcp/tools/keys.py b/unraid_mcp/tools/keys.py index c5f08bc..7893838 100644 --- a/unraid_mcp/tools/keys.py +++ b/unraid_mcp/tools/keys.py @@ -6,11 +6,12 @@ creating, updating, and deleting API keys. from typing import Any, Literal, get_args -from fastmcp import FastMCP +from fastmcp import Context, FastMCP from ..config.logging import logger from ..core.client import make_graphql_request from ..core.exceptions import ToolError, tool_error_handler +from ..core.setup import elicit_destructive_confirmation QUERIES: dict[str, str] = { @@ -42,17 +43,29 @@ MUTATIONS: dict[str, str] = { apiKey { delete(input: $input) } } """, + "add_role": """ + mutation AddRole($input: AddRoleForApiKeyInput!) { + apiKey { addRole(input: $input) } + } + """, + "remove_role": """ + mutation RemoveRole($input: RemoveRoleFromApiKeyInput!) { + apiKey { removeRole(input: $input) } + } + """, } DESTRUCTIVE_ACTIONS = {"delete"} ALL_ACTIONS = set(QUERIES) | set(MUTATIONS) KEY_ACTIONS = Literal[ - "list", - "get", + "add_role", "create", - "update", "delete", + "get", + "list", + "remove_role", + "update", ] if set(get_args(KEY_ACTIONS)) != ALL_ACTIONS: @@ -70,6 +83,7 @@ def register_keys_tool(mcp: FastMCP) -> None: @mcp.tool() async def unraid_keys( action: KEY_ACTIONS, + ctx: Context | None = None, confirm: bool = False, key_id: str | None = None, name: str | None = None, @@ -84,12 +98,20 @@ def register_keys_tool(mcp: FastMCP) -> None: create - Create a new API key (requires name; optional roles, permissions) update - Update an API key (requires key_id; optional name, roles) delete - Delete API keys (requires key_id, confirm=True) + add_role - Add a role to an API key (requires key_id and roles) + remove_role - Remove a role from an API key (requires key_id and roles) """ if action not in ALL_ACTIONS: raise ToolError(f"Invalid action '{action}'. Must be one of: {sorted(ALL_ACTIONS)}") if action in DESTRUCTIVE_ACTIONS and not confirm: - raise ToolError(f"Action '{action}' is destructive. Set confirm=True to proceed.") + _desc = f"Delete API key **{key_id}**. Any clients using this key will lose access." + confirmed = await elicit_destructive_confirmation(ctx, action, _desc) + if not confirmed: + raise ToolError( + f"Action '{action}' was not confirmed. " + "Re-run with confirm=True to bypass elicitation." + ) with tool_error_handler("keys", action, logger): logger.info(f"Executing unraid_keys action={action}") @@ -147,6 +169,35 @@ def register_keys_tool(mcp: FastMCP) -> None: "message": f"API key '{key_id}' deleted", } + if action == "add_role": + if not key_id: + raise ToolError("key_id is required for 'add_role' action") + if not roles or len(roles) == 0: + raise ToolError( + "role is required for 'add_role' action (pass as roles=['ROLE_NAME'])" + ) + data = await make_graphql_request( + MUTATIONS["add_role"], + {"input": {"apiKeyId": key_id, "role": roles[0]}}, + ) + return {"success": True, "message": f"Role '{roles[0]}' added to key '{key_id}'"} + + if action == "remove_role": + if not key_id: + raise ToolError("key_id is required for 'remove_role' action") + if not roles or len(roles) == 0: + raise ToolError( + "role is required for 'remove_role' action (pass as roles=['ROLE_NAME'])" + ) + data = await make_graphql_request( + MUTATIONS["remove_role"], + {"input": {"apiKeyId": key_id, "role": roles[0]}}, + ) + return { + "success": True, + "message": f"Role '{roles[0]}' removed from key '{key_id}'", + } + raise ToolError(f"Unhandled action '{action}' — this is a bug") logger.info("Keys tool registered successfully")