mirror of
https://github.com/jmagar/unraid-mcp.git
synced 2026-03-02 08:14:43 -08:00
Critical bug fixes from PR review agents: - client.py: eager asyncio.Lock init, Final[frozenset] for _SENSITIVE_KEYS, explicit 429 ToolError after retries exhausted, removed lazy _get_client_lock() and _RateLimiter._get_lock() patterns - exceptions.py: use builtin TimeoutError (UP041), explicit handler before broad except so asyncio timeouts get descriptive messages - docker.py: add update_all to DESTRUCTIVE_ACTIONS (was missing), remove dead _MUTATION_ACTIONS constant - manager.py: _cap_log_content returns new dict (immutable), lock write to resource_data, clean dead task from active_subscriptions after loop exits - diagnostics.py: fix inaccurate comment about semicolon injection guard - health.py: narrow except ValueError in _safe_display_url, fix TODO comment New test coverage (98 tests added, 529 → 598 passing): - test_subscription_validation.py: 27 tests for _validate_subscription_query (security-critical allow-list, forbidden keyword guards, word-boundary test) - test_subscription_manager.py: 12 tests for _cap_log_content (immutability, truncation, nesting, passthrough) - test_client.py: +57 tests — _RateLimiter (token math, refill, sleep-on-empty), _QueryCache (TTL, invalidation, is_cacheable), 429 retry loop (1/2/3 failures) - test_health.py: +10 tests for _safe_display_url (credential strip, port, path/query removal, malformed IPv6 → <unparseable>) - test_notifications.py: +7 importance enum and field length validation tests - test_rclone.py: +7 _validate_config_data security guard tests - test_storage.py: +15 (tail_lines bounds, format_kb, safe_get) - test_docker.py: update_all now requires confirm=True + new guard test - test_destructive_guards.py: update audit to include update_all Co-authored-by: Claude <noreply@anthropic.com>
132 lines
6.0 KiB
Python
132 lines
6.0 KiB
Python
"""Tests for _validate_subscription_query in diagnostics.py.
|
|
|
|
Security-critical: this function is the only guard against arbitrary GraphQL
|
|
operations (mutations, queries) being sent over the WebSocket subscription channel.
|
|
"""
|
|
|
|
import pytest
|
|
|
|
from unraid_mcp.core.exceptions import ToolError
|
|
from unraid_mcp.subscriptions.diagnostics import (
|
|
_ALLOWED_SUBSCRIPTION_NAMES,
|
|
_validate_subscription_query,
|
|
)
|
|
|
|
|
|
class TestValidateSubscriptionQueryAllowed:
|
|
"""All whitelisted subscription names must be accepted."""
|
|
|
|
@pytest.mark.parametrize("sub_name", sorted(_ALLOWED_SUBSCRIPTION_NAMES))
|
|
def test_all_allowed_names_accepted(self, sub_name: str) -> None:
|
|
query = f"subscription {{ {sub_name} {{ data }} }}"
|
|
result = _validate_subscription_query(query)
|
|
assert result == sub_name
|
|
|
|
def test_returns_extracted_subscription_name(self) -> None:
|
|
query = "subscription { cpuSubscription { usage } }"
|
|
assert _validate_subscription_query(query) == "cpuSubscription"
|
|
|
|
def test_leading_whitespace_accepted(self) -> None:
|
|
query = " subscription { memorySubscription { free } }"
|
|
assert _validate_subscription_query(query) == "memorySubscription"
|
|
|
|
def test_multiline_query_accepted(self) -> None:
|
|
query = "subscription {\n logFileSubscription {\n content\n }\n}"
|
|
assert _validate_subscription_query(query) == "logFileSubscription"
|
|
|
|
def test_case_insensitive_subscription_keyword(self) -> None:
|
|
"""'SUBSCRIPTION' should be accepted (regex uses IGNORECASE)."""
|
|
query = "SUBSCRIPTION { cpuSubscription { usage } }"
|
|
assert _validate_subscription_query(query) == "cpuSubscription"
|
|
|
|
|
|
class TestValidateSubscriptionQueryForbiddenKeywords:
|
|
"""Queries containing 'mutation' or 'query' as standalone keywords must be rejected."""
|
|
|
|
def test_mutation_keyword_rejected(self) -> None:
|
|
query = 'mutation { docker { start(id: "abc") } }'
|
|
with pytest.raises(ToolError, match="must be a subscription"):
|
|
_validate_subscription_query(query)
|
|
|
|
def test_query_keyword_rejected(self) -> None:
|
|
query = "query { info { os { platform } } }"
|
|
with pytest.raises(ToolError, match="must be a subscription"):
|
|
_validate_subscription_query(query)
|
|
|
|
def test_mutation_embedded_in_subscription_rejected(self) -> None:
|
|
"""'mutation' anywhere in the string triggers rejection."""
|
|
query = "subscription { cpuSubscription { mutation data } }"
|
|
with pytest.raises(ToolError, match="must be a subscription"):
|
|
_validate_subscription_query(query)
|
|
|
|
def test_query_embedded_in_subscription_rejected(self) -> None:
|
|
query = "subscription { cpuSubscription { query data } }"
|
|
with pytest.raises(ToolError, match="must be a subscription"):
|
|
_validate_subscription_query(query)
|
|
|
|
def test_mutation_case_insensitive_rejection(self) -> None:
|
|
query = 'MUTATION { docker { start(id: "abc") } }'
|
|
with pytest.raises(ToolError, match="must be a subscription"):
|
|
_validate_subscription_query(query)
|
|
|
|
def test_mutation_field_identifier_not_rejected(self) -> None:
|
|
"""'mutationField' as an identifier must NOT be rejected — only standalone 'mutation'."""
|
|
# This tests the \b word boundary in _FORBIDDEN_KEYWORDS
|
|
query = "subscription { cpuSubscription { mutationField } }"
|
|
# Should not raise — "mutationField" is an identifier, not the keyword
|
|
result = _validate_subscription_query(query)
|
|
assert result == "cpuSubscription"
|
|
|
|
def test_query_field_identifier_not_rejected(self) -> None:
|
|
"""'queryResult' as an identifier must NOT be rejected."""
|
|
query = "subscription { cpuSubscription { queryResult } }"
|
|
result = _validate_subscription_query(query)
|
|
assert result == "cpuSubscription"
|
|
|
|
|
|
class TestValidateSubscriptionQueryInvalidFormat:
|
|
"""Queries that don't match the expected subscription format must be rejected."""
|
|
|
|
def test_empty_string_rejected(self) -> None:
|
|
with pytest.raises(ToolError, match="must start with 'subscription'"):
|
|
_validate_subscription_query("")
|
|
|
|
def test_plain_identifier_rejected(self) -> None:
|
|
with pytest.raises(ToolError, match="must start with 'subscription'"):
|
|
_validate_subscription_query("cpuSubscription { usage }")
|
|
|
|
def test_missing_operation_body_rejected(self) -> None:
|
|
with pytest.raises(ToolError, match="must start with 'subscription'"):
|
|
_validate_subscription_query("subscription")
|
|
|
|
def test_subscription_without_field_rejected(self) -> None:
|
|
"""subscription { } with no field name doesn't match the pattern."""
|
|
with pytest.raises(ToolError, match="must start with 'subscription'"):
|
|
_validate_subscription_query("subscription { }")
|
|
|
|
|
|
class TestValidateSubscriptionQueryUnknownName:
|
|
"""Subscription names not in the whitelist must be rejected even if format is valid."""
|
|
|
|
def test_unknown_subscription_name_rejected(self) -> None:
|
|
query = "subscription { unknownSubscription { data } }"
|
|
with pytest.raises(ToolError, match="not allowed"):
|
|
_validate_subscription_query(query)
|
|
|
|
def test_error_message_includes_allowed_list(self) -> None:
|
|
"""Error message must list the allowed subscription names for usability."""
|
|
query = "subscription { badSub { data } }"
|
|
with pytest.raises(ToolError, match="Allowed subscriptions"):
|
|
_validate_subscription_query(query)
|
|
|
|
def test_arbitrary_field_name_rejected(self) -> None:
|
|
query = "subscription { users { id email } }"
|
|
with pytest.raises(ToolError, match="not allowed"):
|
|
_validate_subscription_query(query)
|
|
|
|
def test_close_but_not_whitelisted_rejected(self) -> None:
|
|
"""'cpu' without 'Subscription' suffix is not in the allow-list."""
|
|
query = "subscription { cpu { usage } }"
|
|
with pytest.raises(ToolError, match="not allowed"):
|
|
_validate_subscription_query(query)
|