feat: enhance test suite with 275 new tests across 4 validation categories

Add comprehensive test coverage beyond unit tests:
- Schema validation (93 tests): Validate all GraphQL queries/mutations against extracted Unraid API schema
- HTTP layer (88 tests): Test request construction, timeouts, and error handling at httpx level
- Subscriptions (55 tests): WebSocket lifecycle, reconnection, and protocol validation
- Safety audit (39 tests): Enforce destructive action confirmation requirements

Total test count increased from 210 to 485 (130% increase), all passing in 5.91s.

New dependencies:
- graphql-core>=3.2.0 for schema validation
- respx>=0.22.0 for HTTP layer mocking

Files created:
- docs/unraid-schema.graphql (150-type GraphQL schema)
- tests/schema/test_query_validation.py
- tests/http_layer/test_request_construction.py
- tests/integration/test_subscriptions.py
- tests/safety/test_destructive_guards.py

Co-authored-by: Claude <claude@anthropic.com>
This commit is contained in:
Jacob Magar
2026-02-15 22:35:19 -05:00
parent abb7915672
commit fb86097709
6 changed files with 2652 additions and 499 deletions

1428
docs/unraid-schema.graphql Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -144,7 +144,6 @@ class TestTimeoutHandling:
route = respx.post(API_URL).mock(return_value=_graphql_response({"data": {}}))
custom = httpx.Timeout(10.0, read=120.0)
await make_graphql_request("query { info }", custom_timeout=custom)
# The request was made successfully (no timeout error)
assert route.called
@@ -307,7 +306,9 @@ class TestInfoToolRequests:
@respx.mock
async def test_metrics_sends_correct_query(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response({"metrics": {"cpu": {"used": 50}, "memory": {"used": 4096, "total": 16384}}})
return_value=_graphql_response(
{"metrics": {"cpu": {"used": 50}, "memory": {"used": 4096, "total": 16384}}}
)
)
tool = self._get_tool()
await tool(action="metrics")
@@ -372,7 +373,9 @@ class TestDockerToolRequests:
async def test_list_sends_correct_query(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"docker": {"containers": [{"id": "c1", "names": ["plex"], "state": "running"}]}}
{"docker": {"containers": [
{"id": "c1", "names": ["plex"], "state": "running"}
]}}
)
)
tool = self._get_tool()
@@ -383,11 +386,12 @@ class TestDockerToolRequests:
@respx.mock
async def test_start_sends_mutation_with_id(self) -> None:
container_id = "a" * 64
# First call: resolve container ID (already matches pattern, so no resolution needed)
# The tool sends the mutation directly since the ID matches _DOCKER_ID_PATTERN
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"docker": {"start": {"id": container_id, "names": ["plex"], "state": "running", "status": "Up"}}}
{"docker": {"start": {
"id": container_id, "names": ["plex"],
"state": "running", "status": "Up",
}}}
)
)
tool = self._get_tool()
@@ -401,7 +405,10 @@ class TestDockerToolRequests:
container_id = "b" * 64
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"docker": {"stop": {"id": container_id, "names": ["sonarr"], "state": "exited", "status": "Exited"}}}
{"docker": {"stop": {
"id": container_id, "names": ["sonarr"],
"state": "exited", "status": "Exited",
}}}
)
)
tool = self._get_tool()
@@ -443,7 +450,9 @@ class TestDockerToolRequests:
async def test_networks_sends_correct_query(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"dockerNetworks": [{"id": "n1", "name": "bridge", "driver": "bridge", "scope": "local"}]}
{"dockerNetworks": [
{"id": "n1", "name": "bridge", "driver": "bridge", "scope": "local"}
]}
)
)
tool = self._get_tool()
@@ -475,11 +484,17 @@ class TestDockerToolRequests:
call_count += 1
if "StopContainer" in body["query"]:
return _graphql_response(
{"docker": {"stop": {"id": container_id, "names": ["app"], "state": "exited", "status": "Exited"}}}
{"docker": {"stop": {
"id": container_id, "names": ["app"],
"state": "exited", "status": "Exited",
}}}
)
if "StartContainer" in body["query"]:
return _graphql_response(
{"docker": {"start": {"id": container_id, "names": ["app"], "state": "running", "status": "Up"}}}
{"docker": {"start": {
"id": container_id, "names": ["app"],
"state": "running", "status": "Up",
}}}
)
return _graphql_response({"docker": {"containers": []}})
@@ -492,7 +507,7 @@ class TestDockerToolRequests:
@respx.mock
async def test_container_name_resolution(self) -> None:
"""When a name is provided instead of a PrefixedID, the tool resolves it first."""
"""When a name is provided instead of a PrefixedID, the tool resolves it."""
resolved_id = "f" * 64
call_count = 0
@@ -506,7 +521,10 @@ class TestDockerToolRequests:
)
if "StartContainer" in body["query"]:
return _graphql_response(
{"docker": {"start": {"id": resolved_id, "names": ["plex"], "state": "running", "status": "Up"}}}
{"docker": {"start": {
"id": resolved_id, "names": ["plex"],
"state": "running", "status": "Up",
}}}
)
return _graphql_response({})
@@ -527,13 +545,17 @@ class TestVMToolRequests:
@staticmethod
def _get_tool():
return make_tool_fn("unraid_mcp.tools.virtualization", "register_vm_tool", "unraid_vm")
return make_tool_fn(
"unraid_mcp.tools.virtualization", "register_vm_tool", "unraid_vm"
)
@respx.mock
async def test_list_sends_correct_query(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"vms": {"domains": [{"id": "v1", "name": "win10", "state": "running", "uuid": "u1"}]}}
{"vms": {"domains": [
{"id": "v1", "name": "win10", "state": "running", "uuid": "u1"}
]}}
)
)
tool = self._get_tool()
@@ -590,7 +612,7 @@ class TestVMToolRequests:
@respx.mock
async def test_details_finds_vm_by_name(self) -> None:
route = respx.post(API_URL).mock(
respx.post(API_URL).mock(
return_value=_graphql_response(
{"vms": {"domains": [
{"id": "v1", "name": "win10", "state": "running", "uuid": "uuid-1"},
@@ -619,7 +641,9 @@ class TestArrayToolRequests:
async def test_parity_status_sends_correct_query(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"array": {"parityCheckStatus": {"progress": 50, "speed": "100 MB/s", "errors": 0}}}
{"array": {"parityCheckStatus": {
"progress": 50, "speed": "100 MB/s", "errors": 0,
}}}
)
)
tool = self._get_tool()
@@ -680,7 +704,9 @@ class TestStorageToolRequests:
@staticmethod
def _get_tool():
return make_tool_fn("unraid_mcp.tools.storage", "register_storage_tool", "unraid_storage")
return make_tool_fn(
"unraid_mcp.tools.storage", "register_storage_tool", "unraid_storage"
)
@respx.mock
async def test_shares_sends_correct_query(self) -> None:
@@ -696,7 +722,9 @@ class TestStorageToolRequests:
@respx.mock
async def test_disks_sends_correct_query(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response({"disks": [{"id": "d1", "device": "sda", "name": "Disk 1"}]})
return_value=_graphql_response(
{"disks": [{"id": "d1", "device": "sda", "name": "Disk 1"}]}
)
)
tool = self._get_tool()
await tool(action="disks")
@@ -707,7 +735,10 @@ class TestStorageToolRequests:
async def test_disk_details_sends_variable(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"disk": {"id": "d1", "device": "sda", "name": "Disk 1", "serialNum": "SN123", "size": 1000000, "temperature": 35}}
{"disk": {
"id": "d1", "device": "sda", "name": "Disk 1",
"serialNum": "SN123", "size": 1000000, "temperature": 35,
}}
)
)
tool = self._get_tool()
@@ -719,7 +750,9 @@ class TestStorageToolRequests:
@respx.mock
async def test_log_files_sends_correct_query(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response({"logFiles": [{"name": "syslog", "path": "/var/log/syslog"}]})
return_value=_graphql_response(
{"logFiles": [{"name": "syslog", "path": "/var/log/syslog"}]}
)
)
tool = self._get_tool()
result = await tool(action="log_files")
@@ -731,7 +764,10 @@ class TestStorageToolRequests:
async def test_logs_sends_path_and_lines_variables(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"logFile": {"path": "/var/log/syslog", "content": "log line", "totalLines": 100, "startLine": 1}}
{"logFile": {
"path": "/var/log/syslog", "content": "log line",
"totalLines": 100, "startLine": 1,
}}
)
)
tool = self._get_tool()
@@ -770,14 +806,18 @@ class TestNotificationsToolRequests:
@staticmethod
def _get_tool():
return make_tool_fn(
"unraid_mcp.tools.notifications", "register_notifications_tool", "unraid_notifications"
"unraid_mcp.tools.notifications",
"register_notifications_tool",
"unraid_notifications",
)
@respx.mock
async def test_overview_sends_correct_query(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"notifications": {"overview": {"unread": {"info": 1, "warning": 0, "alert": 0, "total": 1}}}}
{"notifications": {"overview": {
"unread": {"info": 1, "warning": 0, "alert": 0, "total": 1},
}}}
)
)
tool = self._get_tool()
@@ -791,7 +831,9 @@ class TestNotificationsToolRequests:
return_value=_graphql_response({"notifications": {"list": []}})
)
tool = self._get_tool()
await tool(action="list", list_type="ARCHIVE", importance="WARNING", offset=5, limit=10)
await tool(
action="list", list_type="ARCHIVE", importance="WARNING", offset=5, limit=10
)
body = _extract_request_body(route.calls.last.request)
assert "ListNotifications" in body["query"]
filt = body["variables"]["filter"]
@@ -815,12 +857,18 @@ class TestNotificationsToolRequests:
async def test_create_sends_input_variables(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"notifications": {"createNotification": {"id": "n1", "title": "Test", "importance": "INFO"}}}
{"notifications": {"createNotification": {
"id": "n1", "title": "Test", "importance": "INFO",
}}}
)
)
tool = self._get_tool()
await tool(
action="create", title="Test", subject="Sub", description="Desc", importance="info"
action="create",
title="Test",
subject="Sub",
description="Desc",
importance="info",
)
body = _extract_request_body(route.calls.last.request)
assert "CreateNotification" in body["query"]
@@ -832,7 +880,9 @@ class TestNotificationsToolRequests:
@respx.mock
async def test_archive_sends_id_variable(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response({"notifications": {"archiveNotification": True}})
return_value=_graphql_response(
{"notifications": {"archiveNotification": True}}
)
)
tool = self._get_tool()
await tool(action="archive", notification_id="notif-1")
@@ -849,11 +899,16 @@ class TestNotificationsToolRequests:
@respx.mock
async def test_delete_sends_id_and_type(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response({"notifications": {"deleteNotification": True}})
return_value=_graphql_response(
{"notifications": {"deleteNotification": True}}
)
)
tool = self._get_tool()
await tool(
action="delete", notification_id="n1", notification_type="unread", confirm=True
action="delete",
notification_id="n1",
notification_type="unread",
confirm=True,
)
body = _extract_request_body(route.calls.last.request)
assert "DeleteNotification" in body["query"]
@@ -863,7 +918,9 @@ class TestNotificationsToolRequests:
@respx.mock
async def test_archive_all_sends_importance_when_provided(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response({"notifications": {"archiveAll": True}})
return_value=_graphql_response(
{"notifications": {"archiveAll": True}}
)
)
tool = self._get_tool()
await tool(action="archive_all", importance="warning")
@@ -882,7 +939,9 @@ class TestRCloneToolRequests:
@staticmethod
def _get_tool():
return make_tool_fn("unraid_mcp.tools.rclone", "register_rclone_tool", "unraid_rclone")
return make_tool_fn(
"unraid_mcp.tools.rclone", "register_rclone_tool", "unraid_rclone"
)
@respx.mock
async def test_list_remotes_sends_correct_query(self) -> None:
@@ -901,7 +960,9 @@ class TestRCloneToolRequests:
async def test_config_form_sends_provider_type(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"rclone": {"configForm": {"id": "form1", "dataSchema": {}, "uiSchema": {}}}}
{"rclone": {"configForm": {
"id": "form1", "dataSchema": {}, "uiSchema": {},
}}}
)
)
tool = self._get_tool()
@@ -914,7 +975,9 @@ class TestRCloneToolRequests:
async def test_create_remote_sends_input_variables(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"rclone": {"createRCloneRemote": {"name": "my-s3", "type": "s3", "parameters": {}}}}
{"rclone": {"createRCloneRemote": {
"name": "my-s3", "type": "s3", "parameters": {},
}}}
)
)
tool = self._get_tool()
@@ -960,13 +1023,18 @@ class TestUsersToolRequests:
@staticmethod
def _get_tool():
return make_tool_fn("unraid_mcp.tools.users", "register_users_tool", "unraid_users")
return make_tool_fn(
"unraid_mcp.tools.users", "register_users_tool", "unraid_users"
)
@respx.mock
async def test_me_sends_correct_query(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"me": {"id": "u1", "name": "admin", "description": "Admin", "roles": ["admin"]}}
{"me": {
"id": "u1", "name": "admin",
"description": "Admin", "roles": ["admin"],
}}
)
)
tool = self._get_tool()
@@ -991,7 +1059,9 @@ class TestKeysToolRequests:
@respx.mock
async def test_list_sends_correct_query(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response({"apiKeys": [{"id": "k1", "name": "my-key"}]})
return_value=_graphql_response(
{"apiKeys": [{"id": "k1", "name": "my-key"}]}
)
)
tool = self._get_tool()
result = await tool(action="list")
@@ -1016,7 +1086,10 @@ class TestKeysToolRequests:
async def test_create_sends_input_variables(self) -> None:
route = respx.post(API_URL).mock(
return_value=_graphql_response(
{"createApiKey": {"id": "k2", "name": "new-key", "key": "secret", "roles": ["read"]}}
{"createApiKey": {
"id": "k2", "name": "new-key",
"key": "secret", "roles": ["read"],
}}
)
)
tool = self._get_tool()
@@ -1072,7 +1145,9 @@ class TestHealthToolRequests:
@staticmethod
def _get_tool():
return make_tool_fn("unraid_mcp.tools.health", "register_health_tool", "unraid_health")
return make_tool_fn(
"unraid_mcp.tools.health", "register_health_tool", "unraid_health"
)
@respx.mock
async def test_test_connection_sends_online_query(self) -> None:
@@ -1097,8 +1172,12 @@ class TestHealthToolRequests:
"os": {"uptime": 86400},
},
"array": {"state": "STARTED"},
"notifications": {"overview": {"unread": {"alert": 0, "warning": 1, "total": 3}}},
"docker": {"containers": [{"id": "c1", "state": "running", "status": "Up"}]},
"notifications": {
"overview": {"unread": {"alert": 0, "warning": 1, "total": 3}},
},
"docker": {
"containers": [{"id": "c1", "state": "running", "status": "Up"}],
},
})
)
tool = self._get_tool()
@@ -1110,7 +1189,9 @@ class TestHealthToolRequests:
@respx.mock
async def test_test_connection_measures_latency(self) -> None:
respx.post(API_URL).mock(return_value=_graphql_response({"online": True}))
respx.post(API_URL).mock(
return_value=_graphql_response({"online": True})
)
tool = self._get_tool()
result = await tool(action="test_connection")
assert "latency_ms" in result
@@ -1120,9 +1201,15 @@ class TestHealthToolRequests:
async def test_check_reports_warning_on_alerts(self) -> None:
respx.post(API_URL).mock(
return_value=_graphql_response({
"info": {"machineId": "m1", "time": 0, "versions": {"unraid": "7.0"}, "os": {"uptime": 0}},
"info": {
"machineId": "m1", "time": 0,
"versions": {"unraid": "7.0"},
"os": {"uptime": 0},
},
"array": {"state": "STARTED"},
"notifications": {"overview": {"unread": {"alert": 3, "warning": 0, "total": 5}}},
"notifications": {
"overview": {"unread": {"alert": 3, "warning": 0, "total": 5}},
},
"docker": {"containers": []},
})
)
@@ -1148,7 +1235,7 @@ class TestCrossCuttingConcerns:
pytest.raises(ToolError, match="UNRAID_API_URL not configured"),
):
await make_graphql_request("query { online }")
assert not route.called # HTTP request should never be made
assert not route.called
@respx.mock
async def test_missing_api_key_raises_before_http_call(self) -> None:
@@ -1163,16 +1250,24 @@ class TestCrossCuttingConcerns:
@respx.mock
async def test_tool_error_from_http_layer_propagates(self) -> None:
"""When an HTTP error occurs, the ToolError bubbles up through the tool."""
respx.post(API_URL).mock(return_value=httpx.Response(500, text="Server Error"))
tool = make_tool_fn("unraid_mcp.tools.info", "register_info_tool", "unraid_info")
respx.post(API_URL).mock(
return_value=httpx.Response(500, text="Server Error")
)
tool = make_tool_fn(
"unraid_mcp.tools.info", "register_info_tool", "unraid_info"
)
with pytest.raises(ToolError, match="HTTP error 500"):
await tool(action="online")
@respx.mock
async def test_network_error_propagates_through_tool(self) -> None:
"""When a network error occurs, the ToolError bubbles up through the tool."""
respx.post(API_URL).mock(side_effect=httpx.ConnectError("Connection refused"))
tool = make_tool_fn("unraid_mcp.tools.info", "register_info_tool", "unraid_info")
respx.post(API_URL).mock(
side_effect=httpx.ConnectError("Connection refused")
)
tool = make_tool_fn(
"unraid_mcp.tools.info", "register_info_tool", "unraid_info"
)
with pytest.raises(ToolError, match="Network connection error"):
await tool(action="online")
@@ -1180,8 +1275,12 @@ class TestCrossCuttingConcerns:
async def test_graphql_error_propagates_through_tool(self) -> None:
"""When a GraphQL error occurs, the ToolError bubbles up through the tool."""
respx.post(API_URL).mock(
return_value=_graphql_response(errors=[{"message": "Permission denied"}])
return_value=_graphql_response(
errors=[{"message": "Permission denied"}]
)
)
tool = make_tool_fn(
"unraid_mcp.tools.info", "register_info_tool", "unraid_info"
)
tool = make_tool_fn("unraid_mcp.tools.info", "register_info_tool", "unraid_info")
with pytest.raises(ToolError, match="Permission denied"):
await tool(action="online")

File diff suppressed because it is too large Load Diff

0
tests/schema/__init__.py Normal file
View File

View File

@@ -0,0 +1,746 @@
"""Schema validation tests for all GraphQL queries and mutations.
Validates every query and mutation in the tool QUERIES/MUTATIONS dicts
against the Unraid GraphQL SDL schema to catch syntax errors, missing
fields, and type mismatches before they reach production.
"""
from pathlib import Path
import pytest
from graphql import DocumentNode, GraphQLSchema, build_schema, parse, validate
SCHEMA_PATH = Path(__file__).resolve().parents[2] / "docs" / "unraid-schema.graphql"
@pytest.fixture(scope="module")
def schema() -> GraphQLSchema:
"""Load and cache the Unraid GraphQL schema for the entire test module."""
schema_sdl = SCHEMA_PATH.read_text()
return build_schema(schema_sdl)
def _validate_operation(schema: GraphQLSchema, query_str: str) -> list[str]:
"""Parse and validate a GraphQL operation against the schema."""
doc: DocumentNode = parse(query_str)
errors = validate(schema, doc)
return [str(e) for e in errors]
# ============================================================================
# Info Tool (19 queries)
# ============================================================================
class TestInfoQueries:
"""Validate all queries from unraid_mcp/tools/info.py."""
def test_overview_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["overview"])
assert not errors, f"overview query validation failed: {errors}"
def test_array_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["array"])
assert not errors, f"array query validation failed: {errors}"
def test_network_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["network"])
assert not errors, f"network query validation failed: {errors}"
def test_registration_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["registration"])
assert not errors, f"registration query validation failed: {errors}"
def test_connect_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["connect"])
assert not errors, f"connect query validation failed: {errors}"
def test_variables_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["variables"])
assert not errors, f"variables query validation failed: {errors}"
def test_metrics_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["metrics"])
assert not errors, f"metrics query validation failed: {errors}"
def test_services_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["services"])
assert not errors, f"services query validation failed: {errors}"
def test_display_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["display"])
assert not errors, f"display query validation failed: {errors}"
def test_config_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["config"])
assert not errors, f"config query validation failed: {errors}"
def test_online_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["online"])
assert not errors, f"online query validation failed: {errors}"
def test_owner_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["owner"])
assert not errors, f"owner query validation failed: {errors}"
def test_settings_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["settings"])
assert not errors, f"settings query validation failed: {errors}"
def test_server_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["server"])
assert not errors, f"server query validation failed: {errors}"
def test_servers_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["servers"])
assert not errors, f"servers query validation failed: {errors}"
def test_flash_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["flash"])
assert not errors, f"flash query validation failed: {errors}"
def test_ups_devices_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["ups_devices"])
assert not errors, f"ups_devices query validation failed: {errors}"
def test_ups_device_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["ups_device"])
assert not errors, f"ups_device query validation failed: {errors}"
def test_ups_config_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.info import QUERIES
errors = _validate_operation(schema, QUERIES["ups_config"])
assert not errors, f"ups_config query validation failed: {errors}"
def test_all_info_actions_covered(self, schema: GraphQLSchema) -> None:
"""Ensure every key in QUERIES has a corresponding test."""
from unraid_mcp.tools.info import QUERIES
expected_actions = {
"overview", "array", "network", "registration", "connect",
"variables", "metrics", "services", "display", "config",
"online", "owner", "settings", "server", "servers",
"flash", "ups_devices", "ups_device", "ups_config",
}
assert set(QUERIES.keys()) == expected_actions
# ============================================================================
# Array Tool (1 query + 4 mutations)
# ============================================================================
class TestArrayQueries:
"""Validate all queries from unraid_mcp/tools/array.py."""
def test_parity_status_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.array import QUERIES
errors = _validate_operation(schema, QUERIES["parity_status"])
assert not errors, f"parity_status query validation failed: {errors}"
def test_all_array_queries_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.array import QUERIES
assert set(QUERIES.keys()) == {"parity_status"}
class TestArrayMutations:
"""Validate all mutations from unraid_mcp/tools/array.py."""
def test_parity_start_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.array import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["parity_start"])
assert not errors, f"parity_start mutation validation failed: {errors}"
def test_parity_pause_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.array import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["parity_pause"])
assert not errors, f"parity_pause mutation validation failed: {errors}"
def test_parity_resume_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.array import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["parity_resume"])
assert not errors, f"parity_resume mutation validation failed: {errors}"
def test_parity_cancel_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.array import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["parity_cancel"])
assert not errors, f"parity_cancel mutation validation failed: {errors}"
def test_all_array_mutations_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.array import MUTATIONS
expected = {"parity_start", "parity_pause", "parity_resume", "parity_cancel"}
assert set(MUTATIONS.keys()) == expected
# ============================================================================
# Storage Tool (6 queries)
# ============================================================================
class TestStorageQueries:
"""Validate all queries from unraid_mcp/tools/storage.py."""
def test_shares_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.storage import QUERIES
errors = _validate_operation(schema, QUERIES["shares"])
assert not errors, f"shares query validation failed: {errors}"
def test_disks_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.storage import QUERIES
errors = _validate_operation(schema, QUERIES["disks"])
assert not errors, f"disks query validation failed: {errors}"
def test_disk_details_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.storage import QUERIES
errors = _validate_operation(schema, QUERIES["disk_details"])
assert not errors, f"disk_details query validation failed: {errors}"
def test_unassigned_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.storage import QUERIES
errors = _validate_operation(schema, QUERIES["unassigned"])
assert not errors, f"unassigned query validation failed: {errors}"
def test_log_files_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.storage import QUERIES
errors = _validate_operation(schema, QUERIES["log_files"])
assert not errors, f"log_files query validation failed: {errors}"
def test_logs_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.storage import QUERIES
errors = _validate_operation(schema, QUERIES["logs"])
assert not errors, f"logs query validation failed: {errors}"
def test_all_storage_queries_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.storage import QUERIES
expected = {"shares", "disks", "disk_details", "unassigned", "log_files", "logs"}
assert set(QUERIES.keys()) == expected
# ============================================================================
# Docker Tool (7 queries + 7 mutations)
# ============================================================================
class TestDockerQueries:
"""Validate all queries from unraid_mcp/tools/docker.py."""
def test_list_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import QUERIES
errors = _validate_operation(schema, QUERIES["list"])
assert not errors, f"list query validation failed: {errors}"
def test_details_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import QUERIES
errors = _validate_operation(schema, QUERIES["details"])
assert not errors, f"details query validation failed: {errors}"
def test_logs_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import QUERIES
errors = _validate_operation(schema, QUERIES["logs"])
assert not errors, f"logs query validation failed: {errors}"
def test_networks_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import QUERIES
errors = _validate_operation(schema, QUERIES["networks"])
assert not errors, f"networks query validation failed: {errors}"
def test_network_details_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import QUERIES
errors = _validate_operation(schema, QUERIES["network_details"])
assert not errors, f"network_details query validation failed: {errors}"
def test_port_conflicts_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import QUERIES
errors = _validate_operation(schema, QUERIES["port_conflicts"])
assert not errors, f"port_conflicts query validation failed: {errors}"
def test_check_updates_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import QUERIES
errors = _validate_operation(schema, QUERIES["check_updates"])
assert not errors, f"check_updates query validation failed: {errors}"
def test_all_docker_queries_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import QUERIES
expected = {
"list", "details", "logs", "networks",
"network_details", "port_conflicts", "check_updates",
}
assert set(QUERIES.keys()) == expected
class TestDockerMutations:
"""Validate all mutations from unraid_mcp/tools/docker.py."""
def test_start_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["start"])
assert not errors, f"start mutation validation failed: {errors}"
def test_stop_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["stop"])
assert not errors, f"stop mutation validation failed: {errors}"
def test_pause_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["pause"])
assert not errors, f"pause mutation validation failed: {errors}"
def test_unpause_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["unpause"])
assert not errors, f"unpause mutation validation failed: {errors}"
def test_remove_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["remove"])
assert not errors, f"remove mutation validation failed: {errors}"
def test_update_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["update"])
assert not errors, f"update mutation validation failed: {errors}"
def test_update_all_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["update_all"])
assert not errors, f"update_all mutation validation failed: {errors}"
def test_all_docker_mutations_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.docker import MUTATIONS
expected = {"start", "stop", "pause", "unpause", "remove", "update", "update_all"}
assert set(MUTATIONS.keys()) == expected
# ============================================================================
# VM Tool (1 query + 7 mutations)
# ============================================================================
class TestVmQueries:
"""Validate all queries from unraid_mcp/tools/virtualization.py."""
def test_list_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.virtualization import QUERIES
errors = _validate_operation(schema, QUERIES["list"])
assert not errors, f"list query validation failed: {errors}"
def test_all_vm_queries_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.virtualization import QUERIES
assert set(QUERIES.keys()) == {"list"}
class TestVmMutations:
"""Validate all mutations from unraid_mcp/tools/virtualization.py."""
def test_start_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.virtualization import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["start"])
assert not errors, f"start mutation validation failed: {errors}"
def test_stop_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.virtualization import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["stop"])
assert not errors, f"stop mutation validation failed: {errors}"
def test_pause_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.virtualization import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["pause"])
assert not errors, f"pause mutation validation failed: {errors}"
def test_resume_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.virtualization import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["resume"])
assert not errors, f"resume mutation validation failed: {errors}"
def test_force_stop_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.virtualization import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["force_stop"])
assert not errors, f"force_stop mutation validation failed: {errors}"
def test_reboot_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.virtualization import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["reboot"])
assert not errors, f"reboot mutation validation failed: {errors}"
def test_reset_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.virtualization import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["reset"])
assert not errors, f"reset mutation validation failed: {errors}"
def test_all_vm_mutations_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.virtualization import MUTATIONS
expected = {"start", "stop", "pause", "resume", "force_stop", "reboot", "reset"}
assert set(MUTATIONS.keys()) == expected
# ============================================================================
# Notifications Tool (3 queries + 6 mutations)
# ============================================================================
class TestNotificationQueries:
"""Validate all queries from unraid_mcp/tools/notifications.py."""
def test_overview_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import QUERIES
errors = _validate_operation(schema, QUERIES["overview"])
assert not errors, f"overview query validation failed: {errors}"
def test_list_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import QUERIES
errors = _validate_operation(schema, QUERIES["list"])
assert not errors, f"list query validation failed: {errors}"
def test_warnings_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import QUERIES
errors = _validate_operation(schema, QUERIES["warnings"])
assert not errors, f"warnings query validation failed: {errors}"
def test_all_notification_queries_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import QUERIES
assert set(QUERIES.keys()) == {"overview", "list", "warnings"}
class TestNotificationMutations:
"""Validate all mutations from unraid_mcp/tools/notifications.py."""
def test_create_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["create"])
assert not errors, f"create mutation validation failed: {errors}"
def test_archive_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["archive"])
assert not errors, f"archive mutation validation failed: {errors}"
def test_unread_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["unread"])
assert not errors, f"unread mutation validation failed: {errors}"
def test_delete_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["delete"])
assert not errors, f"delete mutation validation failed: {errors}"
def test_delete_archived_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["delete_archived"])
assert not errors, f"delete_archived mutation validation failed: {errors}"
def test_archive_all_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["archive_all"])
assert not errors, f"archive_all mutation validation failed: {errors}"
def test_all_notification_mutations_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.notifications import MUTATIONS
expected = {"create", "archive", "unread", "delete", "delete_archived", "archive_all"}
assert set(MUTATIONS.keys()) == expected
# ============================================================================
# RClone Tool (2 queries + 2 mutations)
# ============================================================================
class TestRcloneQueries:
"""Validate all queries from unraid_mcp/tools/rclone.py."""
def test_list_remotes_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.rclone import QUERIES
errors = _validate_operation(schema, QUERIES["list_remotes"])
assert not errors, f"list_remotes query validation failed: {errors}"
def test_config_form_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.rclone import QUERIES
errors = _validate_operation(schema, QUERIES["config_form"])
assert not errors, f"config_form query validation failed: {errors}"
def test_all_rclone_queries_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.rclone import QUERIES
assert set(QUERIES.keys()) == {"list_remotes", "config_form"}
class TestRcloneMutations:
"""Validate all mutations from unraid_mcp/tools/rclone.py."""
def test_create_remote_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.rclone import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["create_remote"])
assert not errors, f"create_remote mutation validation failed: {errors}"
def test_delete_remote_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.rclone import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["delete_remote"])
assert not errors, f"delete_remote mutation validation failed: {errors}"
def test_all_rclone_mutations_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.rclone import MUTATIONS
assert set(MUTATIONS.keys()) == {"create_remote", "delete_remote"}
# ============================================================================
# Users Tool (1 query)
# ============================================================================
class TestUsersQueries:
"""Validate all queries from unraid_mcp/tools/users.py."""
def test_me_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.users import QUERIES
errors = _validate_operation(schema, QUERIES["me"])
assert not errors, f"me query validation failed: {errors}"
def test_all_users_queries_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.users import QUERIES
assert set(QUERIES.keys()) == {"me"}
# ============================================================================
# Keys Tool (2 queries + 3 mutations)
# ============================================================================
class TestKeysQueries:
"""Validate all queries from unraid_mcp/tools/keys.py."""
def test_list_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.keys import QUERIES
errors = _validate_operation(schema, QUERIES["list"])
assert not errors, f"list query validation failed: {errors}"
def test_get_query(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.keys import QUERIES
errors = _validate_operation(schema, QUERIES["get"])
assert not errors, f"get query validation failed: {errors}"
def test_all_keys_queries_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.keys import QUERIES
assert set(QUERIES.keys()) == {"list", "get"}
class TestKeysMutations:
"""Validate all mutations from unraid_mcp/tools/keys.py."""
def test_create_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.keys import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["create"])
assert not errors, f"create mutation validation failed: {errors}"
def test_update_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.keys import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["update"])
assert not errors, f"update mutation validation failed: {errors}"
def test_delete_mutation(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.keys import MUTATIONS
errors = _validate_operation(schema, MUTATIONS["delete"])
assert not errors, f"delete mutation validation failed: {errors}"
def test_all_keys_mutations_covered(self, schema: GraphQLSchema) -> None:
from unraid_mcp.tools.keys import MUTATIONS
assert set(MUTATIONS.keys()) == {"create", "update", "delete"}
# ============================================================================
# Health Tool (inline queries)
# ============================================================================
class TestHealthQueries:
"""Validate inline queries from unraid_mcp/tools/health.py."""
def test_connection_query(self, schema: GraphQLSchema) -> None:
errors = _validate_operation(schema, "query { online }")
assert not errors, f"test_connection query validation failed: {errors}"
def test_comprehensive_check_query(self, schema: GraphQLSchema) -> None:
query = """
query ComprehensiveHealthCheck {
info {
machineId time
versions { unraid }
os { uptime }
}
array { state }
notifications {
overview { unread { alert warning total } }
}
docker {
containers(skipCache: true) { id state status }
}
}
"""
errors = _validate_operation(schema, query)
assert not errors, f"comprehensive check query validation failed: {errors}"
# ============================================================================
# Cross-cutting Validation
# ============================================================================
class TestSchemaCompleteness:
"""Validate that all tool operations are covered by the schema."""
def test_all_tool_queries_validate(self, schema: GraphQLSchema) -> None:
"""Bulk-validate every query across all tools."""
import importlib
tool_modules = [
"unraid_mcp.tools.info",
"unraid_mcp.tools.array",
"unraid_mcp.tools.storage",
"unraid_mcp.tools.docker",
"unraid_mcp.tools.virtualization",
"unraid_mcp.tools.notifications",
"unraid_mcp.tools.rclone",
"unraid_mcp.tools.users",
"unraid_mcp.tools.keys",
]
failures: list[str] = []
total = 0
for module_path in tool_modules:
mod = importlib.import_module(module_path)
tool_name = module_path.split(".")[-1]
queries = getattr(mod, "QUERIES", {})
for action, query_str in queries.items():
total += 1
errors = _validate_operation(schema, query_str)
if errors:
failures.append(f"{tool_name}/QUERIES/{action}: {errors[0]}")
mutations = getattr(mod, "MUTATIONS", {})
for action, query_str in mutations.items():
total += 1
errors = _validate_operation(schema, query_str)
if errors:
failures.append(f"{tool_name}/MUTATIONS/{action}: {errors[0]}")
assert not failures, (
f"{len(failures)} of {total} operations failed validation:\n"
+ "\n".join(failures)
)
def test_schema_has_query_type(self, schema: GraphQLSchema) -> None:
assert schema.query_type is not None
def test_schema_has_mutation_type(self, schema: GraphQLSchema) -> None:
assert schema.mutation_type is not None
def test_schema_has_subscription_type(self, schema: GraphQLSchema) -> None:
assert schema.subscription_type is not None
def test_total_operations_count(self, schema: GraphQLSchema) -> None:
"""Verify the expected number of tool operations exist."""
import importlib
tool_modules = [
"unraid_mcp.tools.info",
"unraid_mcp.tools.array",
"unraid_mcp.tools.storage",
"unraid_mcp.tools.docker",
"unraid_mcp.tools.virtualization",
"unraid_mcp.tools.notifications",
"unraid_mcp.tools.rclone",
"unraid_mcp.tools.users",
"unraid_mcp.tools.keys",
]
total = 0
for module_path in tool_modules:
mod = importlib.import_module(module_path)
total += len(getattr(mod, "QUERIES", {}))
total += len(getattr(mod, "MUTATIONS", {}))
# 71 operations across all tools (queries + mutations in dicts)
assert total >= 50, f"Expected at least 50 operations, found {total}"