Files
unraid-mcp/tests/test_subscription_validation.py

132 lines
6.0 KiB
Python

"""Tests for _validate_subscription_query in diagnostics.py.
Security-critical: this function is the only guard against arbitrary GraphQL
operations (mutations, queries) being sent over the WebSocket subscription channel.
"""
import pytest
from unraid_mcp.core.exceptions import ToolError
from unraid_mcp.subscriptions.diagnostics import (
_ALLOWED_SUBSCRIPTION_FIELDS,
_validate_subscription_query,
)
class TestValidateSubscriptionQueryAllowed:
"""All whitelisted subscription names must be accepted."""
@pytest.mark.parametrize("sub_name", sorted(_ALLOWED_SUBSCRIPTION_FIELDS))
def test_all_allowed_names_accepted(self, sub_name: str) -> None:
query = f"subscription {{ {sub_name} {{ data }} }}"
result = _validate_subscription_query(query)
assert result == sub_name
def test_returns_extracted_subscription_name(self) -> None:
query = "subscription { cpuSubscription { usage } }"
assert _validate_subscription_query(query) == "cpuSubscription"
def test_leading_whitespace_accepted(self) -> None:
query = " subscription { memorySubscription { free } }"
assert _validate_subscription_query(query) == "memorySubscription"
def test_multiline_query_accepted(self) -> None:
query = "subscription {\n logFileSubscription {\n content\n }\n}"
assert _validate_subscription_query(query) == "logFileSubscription"
def test_case_insensitive_subscription_keyword(self) -> None:
"""'SUBSCRIPTION' should be accepted (regex uses IGNORECASE)."""
query = "SUBSCRIPTION { cpuSubscription { usage } }"
assert _validate_subscription_query(query) == "cpuSubscription"
class TestValidateSubscriptionQueryForbiddenKeywords:
"""Queries containing 'mutation' or 'query' as standalone keywords must be rejected."""
def test_mutation_keyword_rejected(self) -> None:
query = 'mutation { docker { start(id: "abc") } }'
with pytest.raises(ToolError, match="must be a subscription"):
_validate_subscription_query(query)
def test_query_keyword_rejected(self) -> None:
query = "query { info { os { platform } } }"
with pytest.raises(ToolError, match="must be a subscription"):
_validate_subscription_query(query)
def test_mutation_embedded_in_subscription_rejected(self) -> None:
"""'mutation' anywhere in the string triggers rejection."""
query = "subscription { cpuSubscription { mutation data } }"
with pytest.raises(ToolError, match="must be a subscription"):
_validate_subscription_query(query)
def test_query_embedded_in_subscription_rejected(self) -> None:
query = "subscription { cpuSubscription { query data } }"
with pytest.raises(ToolError, match="must be a subscription"):
_validate_subscription_query(query)
def test_mutation_case_insensitive_rejection(self) -> None:
query = 'MUTATION { docker { start(id: "abc") } }'
with pytest.raises(ToolError, match="must be a subscription"):
_validate_subscription_query(query)
def test_mutation_field_identifier_not_rejected(self) -> None:
"""'mutationField' as an identifier must NOT be rejected — only standalone 'mutation'."""
# This tests the \b word boundary in _FORBIDDEN_KEYWORDS
query = "subscription { cpuSubscription { mutationField } }"
# Should not raise — "mutationField" is an identifier, not the keyword
result = _validate_subscription_query(query)
assert result == "cpuSubscription"
def test_query_field_identifier_not_rejected(self) -> None:
"""'queryResult' as an identifier must NOT be rejected."""
query = "subscription { cpuSubscription { queryResult } }"
result = _validate_subscription_query(query)
assert result == "cpuSubscription"
class TestValidateSubscriptionQueryInvalidFormat:
"""Queries that don't match the expected subscription format must be rejected."""
def test_empty_string_rejected(self) -> None:
with pytest.raises(ToolError, match="must start with 'subscription'"):
_validate_subscription_query("")
def test_plain_identifier_rejected(self) -> None:
with pytest.raises(ToolError, match="must start with 'subscription'"):
_validate_subscription_query("cpuSubscription { usage }")
def test_missing_operation_body_rejected(self) -> None:
with pytest.raises(ToolError, match="must start with 'subscription'"):
_validate_subscription_query("subscription")
def test_subscription_without_field_rejected(self) -> None:
"""subscription { } with no field name doesn't match the pattern."""
with pytest.raises(ToolError, match="must start with 'subscription'"):
_validate_subscription_query("subscription { }")
class TestValidateSubscriptionQueryUnknownName:
"""Subscription names not in the whitelist must be rejected even if format is valid."""
def test_unknown_subscription_name_rejected(self) -> None:
query = "subscription { unknownSubscription { data } }"
with pytest.raises(ToolError, match="not allowed"):
_validate_subscription_query(query)
def test_error_message_includes_allowed_list(self) -> None:
"""Error message must list the allowed subscription names for usability."""
query = "subscription { badSub { data } }"
with pytest.raises(ToolError, match="Allowed subscriptions"):
_validate_subscription_query(query)
def test_arbitrary_field_name_rejected(self) -> None:
query = "subscription { users { id email } }"
with pytest.raises(ToolError, match="not allowed"):
_validate_subscription_query(query)
def test_close_but_not_whitelisted_rejected(self) -> None:
"""'cpu' without 'Subscription' suffix is not in the allow-list."""
query = "subscription { cpu { usage } }"
with pytest.raises(ToolError, match="not allowed"):
_validate_subscription_query(query)