mirror of
https://github.com/jmagar/unraid-mcp.git
synced 2026-03-23 12:39:24 -07:00
Threads 1, 2, 3 — test hygiene:
- Move elicit_and_configure/elicit_reset_confirmation to module-level imports
in unraid.py so tests can patch at unraid_mcp.tools.unraid.* (thread 2)
- Add return type annotations to _make_tool() in test_customization.py (thread 1)
- Replace unused _mock_ensure_started fixture params with @usefixtures (thread 3)
Thread 4 — remove dead 'connect' subaction from _SYSTEM_QUERIES; the subaction
was always rejected with a ToolError, creating an inconsistent contract.
Thread 5 — centralize two inline "query { online }" strings by reusing
_SYSTEM_QUERIES["online"]; add _DOCKER_QUERIES["_resolve"] for container-name
resolution instead of an inline query literal.
Threads 14, 15, 16, 17, 18 — test improvements:
- test-tools.sh: reword header to "broad non-destructive smoke coverage" (t14)
- test-tools.sh: add _json_payload() helper using jq --arg for safe JSON
construction; replace all printf-based payloads (thread 15)
- test_input_validation.py: add return type annotations to _make_tool and all
nested _run_test coroutines (thread 16)
- test_query_validation.py: extract _all_domain_dicts() shared helper to
eliminate the duplicate 22-item registry (thread 17)
- test_query_validation.py: tighten regression threshold from 50 → 90 (thread 18)
156 lines
6.8 KiB
Python
156 lines
6.8 KiB
Python
"""Tests for MCP subscription resources."""
|
|
|
|
import json
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
from fastmcp import FastMCP
|
|
|
|
from unraid_mcp.subscriptions.queries import SNAPSHOT_ACTIONS
|
|
from unraid_mcp.subscriptions.resources import register_subscription_resources
|
|
|
|
|
|
def _make_resources():
|
|
"""Register resources on a test FastMCP instance and return it."""
|
|
test_mcp = FastMCP("test")
|
|
register_subscription_resources(test_mcp)
|
|
return test_mcp
|
|
|
|
|
|
@pytest.fixture
|
|
def _mock_ensure_started():
|
|
with patch(
|
|
"unraid_mcp.subscriptions.resources.ensure_subscriptions_started",
|
|
new_callable=AsyncMock,
|
|
) as mock:
|
|
yield mock
|
|
|
|
|
|
class TestLiveResourcesUseManagerCache:
|
|
"""All live resources must read from the persistent SubscriptionManager cache."""
|
|
|
|
@pytest.mark.parametrize("action", list(SNAPSHOT_ACTIONS.keys()))
|
|
@pytest.mark.usefixtures("_mock_ensure_started")
|
|
async def test_resource_returns_cached_data(self, action: str) -> None:
|
|
cached = {"systemMetricsCpu": {"percentTotal": 12.5}}
|
|
with patch("unraid_mcp.subscriptions.resources.subscription_manager") as mock_mgr:
|
|
mock_mgr.get_resource_data = AsyncMock(return_value=cached)
|
|
mcp = _make_resources()
|
|
resource = mcp.providers[0]._components[f"resource:unraid://live/{action}@"]
|
|
result = await resource.fn()
|
|
assert json.loads(result) == cached
|
|
|
|
@pytest.mark.parametrize("action", list(SNAPSHOT_ACTIONS.keys()))
|
|
@pytest.mark.usefixtures("_mock_ensure_started")
|
|
async def test_resource_returns_connecting_when_no_cache_and_no_error(
|
|
self, action: str
|
|
) -> None:
|
|
with patch("unraid_mcp.subscriptions.resources.subscription_manager") as mock_mgr:
|
|
mock_mgr.get_resource_data = AsyncMock(return_value=None)
|
|
mock_mgr.last_error = {}
|
|
mcp = _make_resources()
|
|
resource = mcp.providers[0]._components[f"resource:unraid://live/{action}@"]
|
|
result = await resource.fn()
|
|
parsed = json.loads(result)
|
|
assert parsed["status"] == "connecting"
|
|
|
|
@pytest.mark.parametrize("action", list(SNAPSHOT_ACTIONS.keys()))
|
|
@pytest.mark.usefixtures("_mock_ensure_started")
|
|
async def test_resource_returns_error_status_on_permanent_failure(self, action: str) -> None:
|
|
with patch("unraid_mcp.subscriptions.resources.subscription_manager") as mock_mgr:
|
|
mock_mgr.get_resource_data = AsyncMock(return_value=None)
|
|
mock_mgr.last_error = {action: "WebSocket auth failed"}
|
|
mcp = _make_resources()
|
|
resource = mcp.providers[0]._components[f"resource:unraid://live/{action}@"]
|
|
result = await resource.fn()
|
|
parsed = json.loads(result)
|
|
assert parsed["status"] == "error"
|
|
assert "auth failed" in parsed["message"]
|
|
|
|
|
|
class TestSnapshotSubscriptionsRegistered:
|
|
"""All SNAPSHOT_ACTIONS must be registered in the SubscriptionManager with auto_start=True."""
|
|
|
|
def test_all_snapshot_actions_in_configs(self) -> None:
|
|
from unraid_mcp.subscriptions.manager import subscription_manager
|
|
|
|
for action in SNAPSHOT_ACTIONS:
|
|
assert action in subscription_manager.subscription_configs, (
|
|
f"'{action}' not registered in subscription_configs"
|
|
)
|
|
|
|
def test_all_snapshot_actions_autostart(self) -> None:
|
|
from unraid_mcp.subscriptions.manager import subscription_manager
|
|
|
|
for action in SNAPSHOT_ACTIONS:
|
|
config = subscription_manager.subscription_configs[action]
|
|
assert config.get("auto_start") is True, (
|
|
f"'{action}' missing auto_start=True in subscription_configs"
|
|
)
|
|
|
|
|
|
class TestLogsStreamResource:
|
|
@pytest.mark.usefixtures("_mock_ensure_started")
|
|
async def test_logs_stream_no_data(self) -> None:
|
|
with patch("unraid_mcp.subscriptions.resources.subscription_manager") as mock_mgr:
|
|
mock_mgr.get_resource_data = AsyncMock(return_value=None)
|
|
mcp = _make_resources()
|
|
local_provider = mcp.providers[0]
|
|
resource = local_provider._components["resource:unraid://logs/stream@"]
|
|
result = await resource.fn()
|
|
parsed = json.loads(result)
|
|
assert "status" in parsed
|
|
|
|
@pytest.mark.usefixtures("_mock_ensure_started")
|
|
async def test_logs_stream_returns_data_with_empty_dict(self) -> None:
|
|
"""Empty dict cache hit must return data, not 'connecting' status."""
|
|
with patch("unraid_mcp.subscriptions.resources.subscription_manager") as mock_mgr:
|
|
mock_mgr.get_resource_data = AsyncMock(return_value={})
|
|
mcp = _make_resources()
|
|
local_provider = mcp.providers[0]
|
|
resource = local_provider._components["resource:unraid://logs/stream@"]
|
|
result = await resource.fn()
|
|
assert json.loads(result) == {}
|
|
|
|
|
|
class TestAutoStartDisabledFallback:
|
|
"""When auto_start is disabled, resources fall back to on-demand subscribe_once."""
|
|
|
|
@pytest.mark.parametrize("action", list(SNAPSHOT_ACTIONS.keys()))
|
|
@pytest.mark.usefixtures("_mock_ensure_started")
|
|
async def test_fallback_returns_subscribe_once_data(self, action: str) -> None:
|
|
fallback_data = {"systemMetricsCpu": {"percentTotal": 42.0}}
|
|
with (
|
|
patch("unraid_mcp.subscriptions.resources.subscription_manager") as mock_mgr,
|
|
patch(
|
|
"unraid_mcp.subscriptions.resources.subscribe_once",
|
|
new=AsyncMock(return_value=fallback_data),
|
|
),
|
|
):
|
|
mock_mgr.get_resource_data = AsyncMock(return_value=None)
|
|
mock_mgr.last_error = {}
|
|
mock_mgr.auto_start_enabled = False
|
|
mcp = _make_resources()
|
|
resource = mcp.providers[0]._components[f"resource:unraid://live/{action}@"]
|
|
result = await resource.fn()
|
|
assert json.loads(result) == fallback_data
|
|
|
|
@pytest.mark.parametrize("action", list(SNAPSHOT_ACTIONS.keys()))
|
|
@pytest.mark.usefixtures("_mock_ensure_started")
|
|
async def test_fallback_failure_returns_connecting(self, action: str) -> None:
|
|
"""When on-demand fallback itself fails, still return 'connecting' status."""
|
|
with (
|
|
patch("unraid_mcp.subscriptions.resources.subscription_manager") as mock_mgr,
|
|
patch(
|
|
"unraid_mcp.subscriptions.resources.subscribe_once",
|
|
new=AsyncMock(side_effect=Exception("WebSocket failed")),
|
|
),
|
|
):
|
|
mock_mgr.get_resource_data = AsyncMock(return_value=None)
|
|
mock_mgr.last_error = {}
|
|
mock_mgr.auto_start_enabled = False
|
|
mcp = _make_resources()
|
|
resource = mcp.providers[0]._components[f"resource:unraid://live/{action}@"]
|
|
result = await resource.fn()
|
|
assert json.loads(result)["status"] == "connecting"
|