feat: add API key bearer token authentication

- ApiKeyVerifier(TokenVerifier) — validates Authorization: Bearer <key>
  against UNRAID_MCP_API_KEY; guards against empty-key bypass
- _build_auth() replaces module-level _build_google_auth() call:
  returns MultiAuth(server=google, verifiers=[api_key]) when both set,
  GoogleProvider alone, ApiKeyVerifier alone, or None
- settings.py: add UNRAID_MCP_API_KEY + is_api_key_auth_configured()
  + api_key_auth_enabled in get_config_summary()
- run_server(): improved auth status logging for all three states
- tests/test_api_key_auth.py: 9 tests covering verifier + _build_auth
- .env.example: add UNRAID_MCP_API_KEY section
- docs/GOOGLE_OAUTH.md: add API Key section
- README.md / CLAUDE.md: rename section, document both auth methods
- Fix pre-existing: test_health.py patched cache_middleware/error_middleware
  now match renamed _cache_middleware/_error_middleware in server.py
This commit is contained in:
Jacob Magar
2026-03-16 11:11:38 -04:00
parent 6f7a58a0f9
commit cc24f1ec62
16 changed files with 406 additions and 69 deletions

View File

@@ -8,6 +8,7 @@ import sys
from typing import Any
from fastmcp import FastMCP
from fastmcp.server.auth import AccessToken, MultiAuth, TokenVerifier
from fastmcp.server.auth.providers.google import GoogleProvider
from fastmcp.server.middleware.caching import CallToolSettings, ResponseCachingMiddleware
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware
@@ -41,26 +42,32 @@ _logging_middleware = LoggingMiddleware(
# 2. Catch any unhandled exceptions and convert to proper MCP errors.
# Tracks error_counts per (exception_type:method) for health diagnose.
error_middleware = ErrorHandlingMiddleware(
_error_middleware = ErrorHandlingMiddleware(
logger=logger,
include_traceback=True,
)
# 3. Unraid API rate limit: 100 requests per 10 seconds.
# Use a sliding window that stays comfortably under that cap.
_rate_limiter = SlidingWindowRateLimitingMiddleware(max_requests=90, window_minutes=1)
# SlidingWindowRateLimitingMiddleware only accepts window_minutes (int), so express
# the 10-second budget as a 1-minute equivalent: 540 req/60 s to stay comfortably
# under the 600 req/min ceiling.
_rate_limiter = SlidingWindowRateLimitingMiddleware(max_requests=540, window_minutes=1)
# 4. Cap tool responses at 512 KB to protect the client context window.
# Oversized responses are truncated with a clear suffix rather than erroring.
_response_limiter = ResponseLimitingMiddleware(max_size=512_000)
# 5. Cache tool calls in-memory (MemoryStore default — no extra deps).
# Short 30 s TTL absorbs burst duplicate requests while keeping data fresh.
# Destructive calls won't hit the cache in practice (unique confirm=True + IDs).
cache_middleware = ResponseCachingMiddleware(
# 5. Cache middleware — all call_tool caching is disabled for the `unraid` tool.
# CallToolSettings supports excluded_tools/included_tools by tool name only; there
# is no per-argument or per-subaction exclusion mechanism. The cache key is
# "{tool_name}:{arguments_str}", so a cached stop("nginx") result would be served
# back on a retry within the TTL window even though the container is already stopped.
# Mutation subactions (start, stop, restart, reboot, etc.) must never be cached.
# Because the consolidated `unraid` tool mixes reads and mutations under one name,
# the only safe option is to disable caching for the entire tool.
_cache_middleware = ResponseCachingMiddleware(
call_tool_settings=CallToolSettings(
ttl=30,
included_tools=["unraid"],
enabled=False,
),
# Disable caching for list/resource/prompt — those are cheap.
list_tools_settings={"enabled": False},
@@ -71,6 +78,30 @@ cache_middleware = ResponseCachingMiddleware(
)
class ApiKeyVerifier(TokenVerifier):
"""Bearer token verifier that validates against a static API key.
Clients present the key as a standard OAuth bearer token:
Authorization: Bearer <UNRAID_MCP_API_KEY>
This allows machine-to-machine access (e.g. CI, scripts, other agents)
without going through the Google OAuth browser flow.
"""
def __init__(self, api_key: str) -> None:
super().__init__()
self._api_key = api_key
async def verify_token(self, token: str) -> AccessToken | None:
if self._api_key and token == self._api_key:
return AccessToken(
token=token,
client_id="api-key-client",
scopes=[],
)
return None
def _build_google_auth() -> "GoogleProvider | None":
"""Build GoogleProvider when OAuth env vars are configured, else return None.
@@ -117,21 +148,45 @@ def _build_google_auth() -> "GoogleProvider | None":
return GoogleProvider(**kwargs)
# Build auth provider — returns GoogleProvider when configured, None otherwise.
_google_auth = _build_google_auth()
def _build_auth() -> "GoogleProvider | ApiKeyVerifier | MultiAuth | None":
"""Build the active auth stack from environment configuration.
Returns:
- MultiAuth(server=GoogleProvider, verifiers=[ApiKeyVerifier])
when both GOOGLE_CLIENT_ID and UNRAID_MCP_API_KEY are set.
- GoogleProvider alone when only Google OAuth vars are set.
- ApiKeyVerifier alone when only UNRAID_MCP_API_KEY is set.
- None when no auth vars are configured (open server).
"""
from .config.settings import UNRAID_MCP_API_KEY, is_api_key_auth_configured
google = _build_google_auth()
api_key = ApiKeyVerifier(UNRAID_MCP_API_KEY) if is_api_key_auth_configured() else None
if google and api_key:
logger.info("Auth: Google OAuth + API key both enabled (MultiAuth)")
return MultiAuth(server=google, verifiers=[api_key])
if api_key:
logger.info("Auth: API key authentication enabled")
return api_key
return google # GoogleProvider or None
# Build auth stack — GoogleProvider, ApiKeyVerifier, MultiAuth, or None.
_auth = _build_auth()
# Initialize FastMCP instance
mcp = FastMCP(
name="Unraid MCP Server",
instructions="Provides tools to interact with an Unraid server's GraphQL API.",
version=VERSION,
auth=_google_auth,
auth=_auth,
middleware=[
_logging_middleware,
error_middleware,
_error_middleware,
_rate_limiter,
_response_limiter,
cache_middleware,
_cache_middleware,
],
)
@@ -185,17 +240,25 @@ def run_server() -> None:
"Only use this in trusted networks or for development."
)
if _google_auth is not None:
from .config.settings import UNRAID_MCP_BASE_URL
if _auth is not None:
from .config.settings import is_google_auth_configured
logger.info(
"Google OAuth ENABLED — clients must authenticate before calling tools. "
f"Redirect URI: {UNRAID_MCP_BASE_URL}/auth/callback"
)
if is_google_auth_configured():
from .config.settings import UNRAID_MCP_BASE_URL
logger.info(
"Google OAuth ENABLED — clients must authenticate before calling tools. "
f"Redirect URI: {UNRAID_MCP_BASE_URL}/auth/callback"
)
else:
logger.info(
"API key authentication ENABLED — present UNRAID_MCP_API_KEY as bearer token."
)
else:
logger.warning(
"No authentication configured — MCP server is open to all clients on the network. "
"Set GOOGLE_CLIENT_ID + GOOGLE_CLIENT_SECRET + UNRAID_MCP_BASE_URL to enable OAuth."
"Set GOOGLE_CLIENT_ID + GOOGLE_CLIENT_SECRET + UNRAID_MCP_BASE_URL to enable Google OAuth, "
"or set UNRAID_MCP_API_KEY to enable bearer token authentication."
)
logger.info(