Files
unraid-mcp/tests/test_client.py
Jacob Magar f76e676fd4 test: close critical coverage gaps and harden PR review fixes
Critical bug fixes from PR review agents:
- client.py: eager asyncio.Lock init, Final[frozenset] for _SENSITIVE_KEYS,
  explicit 429 ToolError after retries exhausted, removed lazy _get_client_lock()
  and _RateLimiter._get_lock() patterns
- exceptions.py: use builtin TimeoutError (UP041), explicit handler before broad
  except so asyncio timeouts get descriptive messages
- docker.py: add update_all to DESTRUCTIVE_ACTIONS (was missing), remove dead
  _MUTATION_ACTIONS constant
- manager.py: _cap_log_content returns new dict (immutable), lock write to
  resource_data, clean dead task from active_subscriptions after loop exits
- diagnostics.py: fix inaccurate comment about semicolon injection guard
- health.py: narrow except ValueError in _safe_display_url, fix TODO comment

New test coverage (98 tests added, 529 → 598 passing):
- test_subscription_validation.py: 27 tests for _validate_subscription_query
  (security-critical allow-list, forbidden keyword guards, word-boundary test)
- test_subscription_manager.py: 12 tests for _cap_log_content
  (immutability, truncation, nesting, passthrough)
- test_client.py: +57 tests — _RateLimiter (token math, refill, sleep-on-empty),
  _QueryCache (TTL, invalidation, is_cacheable), 429 retry loop (1/2/3 failures)
- test_health.py: +10 tests for _safe_display_url (credential strip, port,
  path/query removal, malformed IPv6 → <unparseable>)
- test_notifications.py: +7 importance enum and field length validation tests
- test_rclone.py: +7 _validate_config_data security guard tests
- test_storage.py: +15 (tail_lines bounds, format_kb, safe_get)
- test_docker.py: update_all now requires confirm=True + new guard test
- test_destructive_guards.py: update audit to include update_all

Co-authored-by: Claude <noreply@anthropic.com>
2026-02-18 01:28:40 -05:00

698 lines
27 KiB
Python

"""Tests for unraid_mcp.core.client — GraphQL client infrastructure."""
import json
import time
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from unraid_mcp.core.client import (
DEFAULT_TIMEOUT,
DISK_TIMEOUT,
_QueryCache,
_RateLimiter,
_redact_sensitive,
is_idempotent_error,
make_graphql_request,
)
from unraid_mcp.core.exceptions import ToolError
# ---------------------------------------------------------------------------
# is_idempotent_error
# ---------------------------------------------------------------------------
class TestIsIdempotentError:
"""Verify all idempotent error pattern matches."""
def test_start_already_started(self) -> None:
assert is_idempotent_error("Container already started", "start") is True
def test_start_container_already_running(self) -> None:
assert is_idempotent_error("container already running", "start") is True
def test_start_http_code_304(self) -> None:
assert is_idempotent_error("HTTP code 304 - not modified", "start") is True
def test_stop_already_stopped(self) -> None:
assert is_idempotent_error("Container already stopped", "stop") is True
def test_stop_not_running(self) -> None:
assert is_idempotent_error("container not running", "stop") is True
def test_stop_http_code_304(self) -> None:
assert is_idempotent_error("HTTP code 304", "stop") is True
def test_start_unrelated_error(self) -> None:
assert is_idempotent_error("permission denied", "start") is False
def test_stop_unrelated_error(self) -> None:
assert is_idempotent_error("image not found", "stop") is False
def test_unknown_operation(self) -> None:
assert is_idempotent_error("already started", "restart") is False
def test_case_insensitive(self) -> None:
assert is_idempotent_error("ALREADY STARTED", "start") is True
assert is_idempotent_error("ALREADY STOPPED", "stop") is True
# ---------------------------------------------------------------------------
# _redact_sensitive
# ---------------------------------------------------------------------------
class TestRedactSensitive:
"""Verify recursive redaction of sensitive keys."""
def test_flat_dict(self) -> None:
data = {"username": "admin", "password": "hunter2", "host": "10.0.0.1"}
result = _redact_sensitive(data)
assert result["username"] == "admin"
assert result["password"] == "***"
assert result["host"] == "10.0.0.1"
def test_nested_dict(self) -> None:
data = {"config": {"apiKey": "abc123", "url": "http://host"}}
result = _redact_sensitive(data)
assert result["config"]["apiKey"] == "***"
assert result["config"]["url"] == "http://host"
def test_list_of_dicts(self) -> None:
data = [{"token": "t1"}, {"name": "safe"}]
result = _redact_sensitive(data)
assert result[0]["token"] == "***"
assert result[1]["name"] == "safe"
def test_deeply_nested(self) -> None:
data = {"a": {"b": {"c": {"secret": "deep"}}}}
result = _redact_sensitive(data)
assert result["a"]["b"]["c"]["secret"] == "***"
def test_non_dict_passthrough(self) -> None:
assert _redact_sensitive("plain_string") == "plain_string"
assert _redact_sensitive(42) == 42
assert _redact_sensitive(None) is None
def test_case_insensitive_keys(self) -> None:
data = {"Password": "p1", "TOKEN": "t1", "ApiKey": "k1", "Secret": "s1", "Key": "x1"}
result = _redact_sensitive(data)
for v in result.values():
assert v == "***"
def test_compound_key_names(self) -> None:
"""Keys containing sensitive substrings (e.g. 'user_password') are redacted."""
data = {
"user_password": "p1",
"api_key_value": "k1",
"auth_token_expiry": "t1",
"client_secret_id": "s1",
"username": "safe",
"host": "safe",
}
result = _redact_sensitive(data)
assert result["user_password"] == "***"
assert result["api_key_value"] == "***"
assert result["auth_token_expiry"] == "***"
assert result["client_secret_id"] == "***"
assert result["username"] == "safe"
assert result["host"] == "safe"
def test_mixed_list_content(self) -> None:
data = [{"key": "val"}, "string", 123, [{"token": "inner"}]]
result = _redact_sensitive(data)
assert result[0]["key"] == "***"
assert result[1] == "string"
assert result[2] == 123
assert result[3][0]["token"] == "***"
# ---------------------------------------------------------------------------
# Timeout constants
# ---------------------------------------------------------------------------
class TestTimeoutConstants:
def test_default_timeout_read(self) -> None:
assert DEFAULT_TIMEOUT.read == 30.0
def test_default_timeout_connect(self) -> None:
assert DEFAULT_TIMEOUT.connect == 5.0
def test_disk_timeout_read(self) -> None:
assert DISK_TIMEOUT.read == 90.0
def test_disk_timeout_connect(self) -> None:
assert DISK_TIMEOUT.connect == 5.0
# ---------------------------------------------------------------------------
# make_graphql_request — success paths
# ---------------------------------------------------------------------------
class TestMakeGraphQLRequestSuccess:
"""Test successful request paths."""
@pytest.fixture(autouse=True)
def _patch_config(self):
with (
patch("unraid_mcp.core.client.UNRAID_API_URL", "https://unraid.local/graphql"),
patch("unraid_mcp.core.client.UNRAID_API_KEY", "test-key"),
):
yield
async def test_simple_query(self) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"data": {"info": {"os": "Unraid"}}}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch("unraid_mcp.core.client.get_http_client", return_value=mock_client):
result = await make_graphql_request("{ info { os } }")
assert result == {"info": {"os": "Unraid"}}
async def test_query_with_variables(self) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"data": {"container": {"name": "plex"}}}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch("unraid_mcp.core.client.get_http_client", return_value=mock_client):
result = await make_graphql_request(
"query ($id: String!) { container(id: $id) { name } }",
variables={"id": "abc123"},
)
assert result == {"container": {"name": "plex"}}
# Verify variables were passed in the payload
call_kwargs = mock_client.post.call_args
assert call_kwargs.kwargs["json"]["variables"] == {"id": "abc123"}
async def test_custom_timeout_passed(self) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"data": {}}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
custom_timeout = httpx.Timeout(10.0, read=90.0)
with patch("unraid_mcp.core.client.get_http_client", return_value=mock_client):
await make_graphql_request("{ info }", custom_timeout=custom_timeout)
call_kwargs = mock_client.post.call_args
assert call_kwargs.kwargs["timeout"] is custom_timeout
async def test_empty_data_returns_empty_dict(self) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"data": None}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch("unraid_mcp.core.client.get_http_client", return_value=mock_client):
result = await make_graphql_request("{ info }")
assert result == {}
async def test_missing_data_key_returns_empty_dict(self) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch("unraid_mcp.core.client.get_http_client", return_value=mock_client):
result = await make_graphql_request("{ info }")
assert result == {}
# ---------------------------------------------------------------------------
# make_graphql_request — error paths
# ---------------------------------------------------------------------------
class TestMakeGraphQLRequestErrors:
"""Test error handling in make_graphql_request."""
@pytest.fixture(autouse=True)
def _patch_config(self):
with (
patch("unraid_mcp.core.client.UNRAID_API_URL", "https://unraid.local/graphql"),
patch("unraid_mcp.core.client.UNRAID_API_KEY", "test-key"),
):
yield
async def test_missing_api_url(self) -> None:
with (
patch("unraid_mcp.core.client.UNRAID_API_URL", ""),
pytest.raises(ToolError, match="UNRAID_API_URL not configured"),
):
await make_graphql_request("{ info }")
async def test_missing_api_key(self) -> None:
with (
patch("unraid_mcp.core.client.UNRAID_API_KEY", ""),
pytest.raises(ToolError, match="UNRAID_API_KEY not configured"),
):
await make_graphql_request("{ info }")
async def test_http_401_error(self) -> None:
mock_response = MagicMock()
mock_response.status_code = 401
mock_response.text = "Unauthorized"
http_error = httpx.HTTPStatusError(
"401 Unauthorized", request=MagicMock(), response=mock_response
)
mock_response.raise_for_status.side_effect = http_error
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="Unraid API returned HTTP 401"),
):
await make_graphql_request("{ info }")
async def test_http_500_error(self) -> None:
mock_response = MagicMock()
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
http_error = httpx.HTTPStatusError(
"500 Internal Server Error", request=MagicMock(), response=mock_response
)
mock_response.raise_for_status.side_effect = http_error
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="Unraid API returned HTTP 500"),
):
await make_graphql_request("{ info }")
async def test_http_503_error(self) -> None:
mock_response = MagicMock()
mock_response.status_code = 503
mock_response.text = "Service Unavailable"
http_error = httpx.HTTPStatusError(
"503 Service Unavailable", request=MagicMock(), response=mock_response
)
mock_response.raise_for_status.side_effect = http_error
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="Unraid API returned HTTP 503"),
):
await make_graphql_request("{ info }")
async def test_network_connection_refused(self) -> None:
mock_client = AsyncMock()
mock_client.post.side_effect = httpx.ConnectError("Connection refused")
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="Network error connecting to Unraid API"),
):
await make_graphql_request("{ info }")
async def test_network_timeout(self) -> None:
mock_client = AsyncMock()
mock_client.post.side_effect = httpx.ReadTimeout("Read timed out")
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="Network error connecting to Unraid API"),
):
await make_graphql_request("{ info }")
async def test_json_decode_error(self) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.side_effect = json.JSONDecodeError("Expecting value", "", 0)
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="invalid response.*not valid JSON"),
):
await make_graphql_request("{ info }")
# ---------------------------------------------------------------------------
# make_graphql_request — GraphQL error handling
# ---------------------------------------------------------------------------
class TestGraphQLErrorHandling:
"""Test GraphQL-level error parsing and idempotent handling."""
@pytest.fixture(autouse=True)
def _patch_config(self):
with (
patch("unraid_mcp.core.client.UNRAID_API_URL", "https://unraid.local/graphql"),
patch("unraid_mcp.core.client.UNRAID_API_KEY", "test-key"),
):
yield
async def test_graphql_error_raises_tool_error(self) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"errors": [{"message": "Field 'bogus' not found"}]}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="Field 'bogus' not found"),
):
await make_graphql_request("{ bogus }")
async def test_multiple_graphql_errors_joined(self) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {
"errors": [
{"message": "Error one"},
{"message": "Error two"},
]
}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="Error one; Error two"),
):
await make_graphql_request("{ info }")
async def test_idempotent_start_returns_success(self) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"errors": [{"message": "Container already running"}]}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch("unraid_mcp.core.client.get_http_client", return_value=mock_client):
result = await make_graphql_request(
'mutation { docker { start(id: "x") } }',
operation_context={"operation": "start"},
)
assert result["idempotent_success"] is True
assert result["operation"] == "start"
async def test_idempotent_stop_returns_success(self) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"errors": [{"message": "Container not running"}]}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch("unraid_mcp.core.client.get_http_client", return_value=mock_client):
result = await make_graphql_request(
'mutation { docker { stop(id: "x") } }',
operation_context={"operation": "stop"},
)
assert result["idempotent_success"] is True
assert result["operation"] == "stop"
async def test_non_idempotent_error_with_context_raises(self) -> None:
"""An error that doesn't match idempotent patterns still raises even with context."""
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"errors": [{"message": "Permission denied"}]}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="Permission denied"),
):
await make_graphql_request(
'mutation { docker { start(id: "x") } }',
operation_context={"operation": "start"},
)
async def test_graphql_error_without_message_key(self) -> None:
"""Error objects without a 'message' key fall back to str()."""
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {
"errors": [{"code": "UNKNOWN", "detail": "something broke"}]
}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="GraphQL API error"),
):
await make_graphql_request("{ info }")
# ---------------------------------------------------------------------------
# _RateLimiter
# ---------------------------------------------------------------------------
class TestRateLimiter:
"""Unit tests for the token bucket rate limiter."""
async def test_acquire_consumes_one_token(self) -> None:
limiter = _RateLimiter(max_tokens=10, refill_rate=1.0)
initial = limiter.tokens
await limiter.acquire()
assert limiter.tokens == initial - 1
async def test_acquire_succeeds_when_tokens_available(self) -> None:
limiter = _RateLimiter(max_tokens=5, refill_rate=1.0)
# Should complete without sleeping
for _ in range(5):
await limiter.acquire()
# _refill() runs during each acquire() call and adds a tiny time-based
# amount; check < 1.0 (not enough for another immediate request) rather
# than == 0.0 to avoid flakiness from timing.
assert limiter.tokens < 1.0
async def test_tokens_do_not_exceed_max(self) -> None:
limiter = _RateLimiter(max_tokens=10, refill_rate=1.0)
# Force refill with large elapsed time
limiter.last_refill = time.monotonic() - 100.0 # 100 seconds ago
limiter._refill()
assert limiter.tokens == 10.0 # Capped at max_tokens
async def test_refill_adds_tokens_based_on_elapsed(self) -> None:
limiter = _RateLimiter(max_tokens=100, refill_rate=10.0)
limiter.tokens = 0.0
limiter.last_refill = time.monotonic() - 1.0 # 1 second ago
limiter._refill()
# Should have refilled ~10 tokens (10.0 rate * 1.0 sec)
assert 9.5 < limiter.tokens < 10.5
async def test_acquire_sleeps_when_no_tokens(self) -> None:
"""When tokens are exhausted, acquire should sleep before consuming."""
limiter = _RateLimiter(max_tokens=1, refill_rate=1.0)
limiter.tokens = 0.0
sleep_calls = []
async def fake_sleep(duration: float) -> None:
sleep_calls.append(duration)
# Simulate refill by advancing last_refill so tokens replenish
limiter.tokens = 1.0
limiter.last_refill = time.monotonic()
with patch("unraid_mcp.core.client.asyncio.sleep", side_effect=fake_sleep):
await limiter.acquire()
assert len(sleep_calls) == 1
assert sleep_calls[0] > 0
async def test_default_params_match_api_limits(self) -> None:
"""Default rate limiter must use 90 tokens at 9.0/sec (10% headroom from 100/10s)."""
limiter = _RateLimiter()
assert limiter.max_tokens == 90
assert limiter.refill_rate == 9.0
# ---------------------------------------------------------------------------
# _QueryCache
# ---------------------------------------------------------------------------
class TestQueryCache:
"""Unit tests for the TTL query cache."""
def test_miss_on_empty_cache(self) -> None:
cache = _QueryCache()
assert cache.get("{ info }", None) is None
def test_put_and_get_hit(self) -> None:
cache = _QueryCache()
data = {"result": "ok"}
cache.put("GetNetworkConfig { }", None, data)
result = cache.get("GetNetworkConfig { }", None)
assert result == data
def test_expired_entry_returns_none(self) -> None:
cache = _QueryCache()
data = {"result": "ok"}
cache.put("GetNetworkConfig { }", None, data)
# Manually expire the entry
key = cache._cache_key("GetNetworkConfig { }", None)
cache._store[key] = (time.monotonic() - 1.0, data) # expired 1 sec ago
assert cache.get("GetNetworkConfig { }", None) is None
def test_invalidate_all_clears_store(self) -> None:
cache = _QueryCache()
cache.put("GetNetworkConfig { }", None, {"x": 1})
cache.put("GetOwner { }", None, {"y": 2})
assert len(cache._store) == 2
cache.invalidate_all()
assert len(cache._store) == 0
def test_variables_affect_cache_key(self) -> None:
"""Different variables produce different cache keys."""
cache = _QueryCache()
q = "GetNetworkConfig($id: ID!) { network(id: $id) { name } }"
cache.put(q, {"id": "1"}, {"name": "eth0"})
cache.put(q, {"id": "2"}, {"name": "eth1"})
assert cache.get(q, {"id": "1"}) == {"name": "eth0"}
assert cache.get(q, {"id": "2"}) == {"name": "eth1"}
def test_is_cacheable_returns_true_for_known_prefixes(self) -> None:
assert _QueryCache.is_cacheable("GetNetworkConfig { ... }") is True
assert _QueryCache.is_cacheable("GetRegistrationInfo { ... }") is True
assert _QueryCache.is_cacheable("GetOwner { ... }") is True
assert _QueryCache.is_cacheable("GetFlash { ... }") is True
def test_is_cacheable_returns_false_for_mutations(self) -> None:
assert _QueryCache.is_cacheable('mutation { docker { start(id: "x") } }') is False
def test_is_cacheable_returns_false_for_unlisted_queries(self) -> None:
assert _QueryCache.is_cacheable("{ docker { containers { id } } }") is False
assert _QueryCache.is_cacheable("{ info { os } }") is False
def test_is_cacheable_mutation_check_is_prefix(self) -> None:
"""Queries that start with 'mutation' after whitespace are not cacheable."""
assert _QueryCache.is_cacheable(" mutation { ... }") is False
def test_expired_entry_removed_from_store(self) -> None:
"""Accessing an expired entry should remove it from the internal store."""
cache = _QueryCache()
cache.put("GetOwner { }", None, {"owner": "root"})
key = cache._cache_key("GetOwner { }", None)
cache._store[key] = (time.monotonic() - 1.0, {"owner": "root"})
assert key in cache._store
cache.get("GetOwner { }", None) # triggers deletion
assert key not in cache._store
# ---------------------------------------------------------------------------
# make_graphql_request — 429 retry behavior
# ---------------------------------------------------------------------------
class TestRateLimitRetry:
"""Tests for the 429 retry loop in make_graphql_request."""
@pytest.fixture(autouse=True)
def _patch_config(self):
with (
patch("unraid_mcp.core.client.UNRAID_API_URL", "https://unraid.local/graphql"),
patch("unraid_mcp.core.client.UNRAID_API_KEY", "test-key"),
patch("unraid_mcp.core.client.asyncio.sleep", new_callable=AsyncMock),
):
yield
def _make_429_response(self) -> MagicMock:
resp = MagicMock()
resp.status_code = 429
resp.raise_for_status = MagicMock()
return resp
def _make_ok_response(self, data: dict) -> MagicMock:
resp = MagicMock()
resp.status_code = 200
resp.raise_for_status = MagicMock()
resp.json.return_value = {"data": data}
return resp
async def test_single_429_then_success_retries(self) -> None:
"""One 429 followed by a success should return the data."""
mock_client = AsyncMock()
mock_client.post.side_effect = [
self._make_429_response(),
self._make_ok_response({"info": {"os": "Unraid"}}),
]
with patch("unraid_mcp.core.client.get_http_client", return_value=mock_client):
result = await make_graphql_request("{ info { os } }")
assert result == {"info": {"os": "Unraid"}}
assert mock_client.post.call_count == 2
async def test_two_429s_then_success(self) -> None:
"""Two 429s followed by success returns data after 2 retries."""
mock_client = AsyncMock()
mock_client.post.side_effect = [
self._make_429_response(),
self._make_429_response(),
self._make_ok_response({"x": 1}),
]
with patch("unraid_mcp.core.client.get_http_client", return_value=mock_client):
result = await make_graphql_request("{ x }")
assert result == {"x": 1}
assert mock_client.post.call_count == 3
async def test_three_429s_raises_tool_error(self) -> None:
"""Three consecutive 429s (all retries exhausted) raises ToolError."""
mock_client = AsyncMock()
mock_client.post.side_effect = [
self._make_429_response(),
self._make_429_response(),
self._make_429_response(),
]
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="rate limiting"),
):
await make_graphql_request("{ info }")
async def test_rate_limit_error_message_advises_wait(self) -> None:
"""The ToolError message should tell the user to wait ~10 seconds."""
mock_client = AsyncMock()
mock_client.post.side_effect = [
self._make_429_response(),
self._make_429_response(),
self._make_429_response(),
]
with (
patch("unraid_mcp.core.client.get_http_client", return_value=mock_client),
pytest.raises(ToolError, match="10 seconds"),
):
await make_graphql_request("{ info }")