Files
unraid-mcp/tests/test_resources.py
Jacob Magar cc24f1ec62 feat: add API key bearer token authentication
- ApiKeyVerifier(TokenVerifier) — validates Authorization: Bearer <key>
  against UNRAID_MCP_API_KEY; guards against empty-key bypass
- _build_auth() replaces module-level _build_google_auth() call:
  returns MultiAuth(server=google, verifiers=[api_key]) when both set,
  GoogleProvider alone, ApiKeyVerifier alone, or None
- settings.py: add UNRAID_MCP_API_KEY + is_api_key_auth_configured()
  + api_key_auth_enabled in get_config_summary()
- run_server(): improved auth status logging for all three states
- tests/test_api_key_auth.py: 9 tests covering verifier + _build_auth
- .env.example: add UNRAID_MCP_API_KEY section
- docs/GOOGLE_OAUTH.md: add API Key section
- README.md / CLAUDE.md: rename section, document both auth methods
- Fix pre-existing: test_health.py patched cache_middleware/error_middleware
  now match renamed _cache_middleware/_error_middleware in server.py
2026-03-16 11:11:38 -04:00

170 lines
8.1 KiB
Python

"""Tests for MCP subscription resources."""
import json
from unittest.mock import AsyncMock, patch
import pytest
from fastmcp import FastMCP
from unraid_mcp.subscriptions.queries import SNAPSHOT_ACTIONS
from unraid_mcp.subscriptions.resources import register_subscription_resources
def _make_resources():
    """Build a fresh FastMCP instance with the subscription resources registered.

    Returns:
        The ``FastMCP`` instance named ``"test"`` after
        ``register_subscription_resources`` has been applied to it.
    """
    server = FastMCP("test")
    register_subscription_resources(server)
    return server
@pytest.fixture
def _mock_ensure_started():
    """Patch ``ensure_subscriptions_started`` in the resources module with an AsyncMock.

    Yields:
        The ``AsyncMock`` that stands in for ``ensure_subscriptions_started``
        for as long as the fixture is active.
    """
    patcher = patch(
        "unraid_mcp.subscriptions.resources.ensure_subscriptions_started",
        new_callable=AsyncMock,
    )
    with patcher as started_mock:
        yield started_mock
class TestLiveResourcesUseManagerCache:
    """All live resources must read from the persistent SubscriptionManager cache.

    Each test runs once per key of ``SNAPSHOT_ACTIONS`` and replaces
    ``unraid_mcp.subscriptions.resources.subscription_manager`` with a mock
    whose ``get_resource_data`` / ``last_error`` are set by the test.
    """

    @pytest.mark.parametrize("action", list(SNAPSHOT_ACTIONS))
    @pytest.mark.usefixtures("_mock_ensure_started")
    async def test_resource_returns_cached_data(self, action: str) -> None:
        """A cached payload comes back as its JSON encoding."""
        expected = {"systemMetricsCpu": {"percentTotal": 12.5}}
        manager_patch = patch("unraid_mcp.subscriptions.resources.subscription_manager")
        with manager_patch as manager:
            manager.get_resource_data = AsyncMock(return_value=expected)
            server = _make_resources()
            # Deliberately reaches into FastMCP's private component registry so the
            # resource function can be awaited directly; a FastMCP upgrade may break
            # this lookup, in which case a make_resource_fn() helper is the way out.
            registry = server.providers[0]._components
            live_resource = registry[f"resource:unraid://live/{action}@"]
            payload = await live_resource.fn()
        assert json.loads(payload) == expected

    @pytest.mark.parametrize("action", list(SNAPSHOT_ACTIONS))
    @pytest.mark.usefixtures("_mock_ensure_started")
    async def test_resource_returns_connecting_when_no_cache_and_no_error(
        self, action: str
    ) -> None:
        """With no cached data and an empty ``last_error``, status is 'connecting'."""
        manager_patch = patch("unraid_mcp.subscriptions.resources.subscription_manager")
        with manager_patch as manager:
            manager.get_resource_data = AsyncMock(return_value=None)
            manager.last_error = {}
            server = _make_resources()
            # Deliberately reaches into FastMCP's private component registry so the
            # resource function can be awaited directly; a FastMCP upgrade may break
            # this lookup, in which case a make_resource_fn() helper is the way out.
            registry = server.providers[0]._components
            live_resource = registry[f"resource:unraid://live/{action}@"]
            payload = await live_resource.fn()
        body = json.loads(payload)
        assert body["status"] == "connecting"

    @pytest.mark.parametrize("action", list(SNAPSHOT_ACTIONS))
    @pytest.mark.usefixtures("_mock_ensure_started")
    async def test_resource_returns_error_status_on_permanent_failure(self, action: str) -> None:
        """With no cached data and an error recorded for the action, status is 'error'."""
        manager_patch = patch("unraid_mcp.subscriptions.resources.subscription_manager")
        with manager_patch as manager:
            manager.get_resource_data = AsyncMock(return_value=None)
            manager.last_error = {action: "WebSocket auth failed"}
            server = _make_resources()
            # Deliberately reaches into FastMCP's private component registry so the
            # resource function can be awaited directly; a FastMCP upgrade may break
            # this lookup, in which case a make_resource_fn() helper is the way out.
            registry = server.providers[0]._components
            live_resource = registry[f"resource:unraid://live/{action}@"]
            payload = await live_resource.fn()
        body = json.loads(payload)
        assert body["status"] == "error"
        assert "auth failed" in body["message"]
class TestSnapshotSubscriptionsRegistered:
    """All SNAPSHOT_ACTIONS must be registered in the SubscriptionManager with auto_start=True."""

    def test_all_snapshot_actions_in_configs(self) -> None:
        """Every snapshot action appears as a key of ``subscription_configs``."""
        from unraid_mcp.subscriptions.manager import subscription_manager

        registered = subscription_manager.subscription_configs
        for action in SNAPSHOT_ACTIONS:
            assert action in registered, (
                f"'{action}' not registered in subscription_configs"
            )

    def test_all_snapshot_actions_autostart(self) -> None:
        """Every snapshot action's config entry has ``auto_start`` set to ``True``."""
        from unraid_mcp.subscriptions.manager import subscription_manager

        registered = subscription_manager.subscription_configs
        for action in SNAPSHOT_ACTIONS:
            entry = registered[action]
            # Identity check: a merely truthy value is not accepted.
            assert entry.get("auto_start") is True, (
                f"'{action}' missing auto_start=True in subscription_configs"
            )
class TestLogsStreamResource:
    """Tests for the ``unraid://logs/stream`` resource with a mocked subscription manager."""

    @pytest.mark.usefixtures("_mock_ensure_started")
    async def test_logs_stream_no_data(self) -> None:
        """When the manager has no data, the JSON response carries a 'status' key."""
        manager_patch = patch("unraid_mcp.subscriptions.resources.subscription_manager")
        with manager_patch as manager:
            manager.get_resource_data = AsyncMock(return_value=None)
            server = _make_resources()
            provider = server.providers[0]
            # Deliberately reaches into FastMCP's private component registry so the
            # resource function can be awaited directly; a FastMCP upgrade may break
            # this lookup, in which case a make_resource_fn() helper is the way out.
            stream_key = "resource:unraid://logs/stream@"
            stream_resource = provider._components[stream_key]
            payload = await stream_resource.fn()
        body = json.loads(payload)
        assert "status" in body

    @pytest.mark.usefixtures("_mock_ensure_started")
    async def test_logs_stream_returns_data_with_empty_dict(self) -> None:
        """Empty dict cache hit must return data, not 'connecting' status."""
        manager_patch = patch("unraid_mcp.subscriptions.resources.subscription_manager")
        with manager_patch as manager:
            manager.get_resource_data = AsyncMock(return_value={})
            server = _make_resources()
            provider = server.providers[0]
            # Deliberately reaches into FastMCP's private component registry so the
            # resource function can be awaited directly; a FastMCP upgrade may break
            # this lookup, in which case a make_resource_fn() helper is the way out.
            stream_key = "resource:unraid://logs/stream@"
            stream_resource = provider._components[stream_key]
            payload = await stream_resource.fn()
        assert json.loads(payload) == {}
class TestAutoStartDisabledFallback:
    """When auto_start is disabled, resources fall back to on-demand subscribe_once."""

    @pytest.mark.parametrize("action", list(SNAPSHOT_ACTIONS))
    @pytest.mark.usefixtures("_mock_ensure_started")
    async def test_fallback_returns_subscribe_once_data(self, action: str) -> None:
        """With no cache and auto-start off, the ``subscribe_once`` result is returned."""
        fallback_data = {"systemMetricsCpu": {"percentTotal": 42.0}}
        manager_patch = patch("unraid_mcp.subscriptions.resources.subscription_manager")
        once_patch = patch(
            "unraid_mcp.subscriptions.resources.subscribe_once",
            new=AsyncMock(return_value=fallback_data),
        )
        with manager_patch as manager, once_patch:
            manager.configure_mock(
                get_resource_data=AsyncMock(return_value=None),
                last_error={},
                auto_start_enabled=False,
            )
            server = _make_resources()
            # Deliberately reaches into FastMCP's private component registry so the
            # resource function can be awaited directly; a FastMCP upgrade may break
            # this lookup, in which case a make_resource_fn() helper is the way out.
            registry = server.providers[0]._components
            live_resource = registry[f"resource:unraid://live/{action}@"]
            payload = await live_resource.fn()
        assert json.loads(payload) == fallback_data

    @pytest.mark.parametrize("action", list(SNAPSHOT_ACTIONS))
    @pytest.mark.usefixtures("_mock_ensure_started")
    async def test_fallback_failure_returns_connecting(self, action: str) -> None:
        """When on-demand fallback itself fails, still return 'connecting' status."""
        manager_patch = patch("unraid_mcp.subscriptions.resources.subscription_manager")
        once_patch = patch(
            "unraid_mcp.subscriptions.resources.subscribe_once",
            new=AsyncMock(side_effect=Exception("WebSocket failed")),
        )
        with manager_patch as manager, once_patch:
            manager.configure_mock(
                get_resource_data=AsyncMock(return_value=None),
                last_error={},
                auto_start_enabled=False,
            )
            server = _make_resources()
            # Deliberately reaches into FastMCP's private component registry so the
            # resource function can be awaited directly; a FastMCP upgrade may break
            # this lookup, in which case a make_resource_fn() helper is the way out.
            registry = server.providers[0]._components
            live_resource = registry[f"resource:unraid://live/{action}@"]
            payload = await live_resource.fn()
        assert json.loads(payload)["status"] == "connecting"