Files
unraid-mcp/unraid_mcp/tools/health.py
Jacob Magar 8eab5992ba fix: resolve 21 pre-existing schema field drift failures
- Fix InfoOs: remove codepage (not in schema, codename already queried)
- Fix InfoVersions: use core { unraid api kernel } and packages { ... }
  subtype structure instead of flat field list; remove non-existent fields
- Fix Info: remove apps field from overview query (not in Info type)
- Fix Connect query: replace missing status/sandbox/flashGuid with
  dynamicRemoteAccess { enabledType runningType error }
- Fix CpuUtilization: replace used with percentTotal
- Fix Service: remove state field, add online and version
- Fix Server: replace ip/port with wanip/lanip/localurl/remoteurl
- Fix Flash: remove size field (not in schema)
- Fix UPSDevice: replace flat runtime/charge/load/voltage/frequency/temperature
  with nested battery { chargeLevel estimatedRuntime health } and
  power { loadPercentage inputVoltage outputVoltage } sub-types
- Fix ups_device variable type: PrefixedID! -> String! (schema uses String!)
- Fix UPSConfiguration: replace enabled/mode/cable/driver/port with
  service/upsCable/upsType/device/batteryLevel/minutes/timeout/killUps/upsName
- Fix storage unassigned query: unassignedDevices not in schema, use disks
- Fix docker logs: add subfield selection for DockerContainerLogs type
- Fix docker networks/network_details: move from root dockerNetworks/dockerNetwork
  to docker { networks { ... } }; filter by ID client-side for network_details
- Fix docker port_conflicts: replace containerName/port/conflictsWith with
  containerPorts { privatePort type containers { id name } } and lanPorts
- Fix docker check_updates: replace id/updateAvailable/currentVersion/latestVersion
  with name/updateStatus per ExplicitStatusItem schema type
- Fix keys queries: add subfield selection for permissions { resource actions },
  remove lastUsed (not on ApiKey type)
- Fix health.py comprehensive check: use versions { core { unraid } }
- Update docker mutations coverage assertion to include 11 organizer mutations
- Update test_networks mock to match new docker { networks } response shape
- Update health.py runtime accessor to follow new versions.core.unraid path
2026-03-13 11:19:40 -04:00

254 lines
8.9 KiB
Python

"""Health monitoring and diagnostics.
Provides the `unraid_health` tool with 3 actions for system health checks,
connection testing, and subscription diagnostics.
"""
import datetime
import time
from typing import Any, Literal, get_args
from fastmcp import FastMCP
from ..config.logging import logger
from ..config.settings import (
UNRAID_API_URL,
UNRAID_MCP_HOST,
UNRAID_MCP_PORT,
UNRAID_MCP_TRANSPORT,
VERSION,
)
from ..core.client import make_graphql_request
from ..core.exceptions import ToolError, tool_error_handler
from ..core.utils import safe_display_url
from ..subscriptions.utils import _analyze_subscription_status
ALL_ACTIONS = {"check", "test_connection", "diagnose"}
HEALTH_ACTIONS = Literal["check", "test_connection", "diagnose"]
if set(get_args(HEALTH_ACTIONS)) != ALL_ACTIONS:
_missing = ALL_ACTIONS - set(get_args(HEALTH_ACTIONS))
_extra = set(get_args(HEALTH_ACTIONS)) - ALL_ACTIONS
raise RuntimeError(
"HEALTH_ACTIONS and ALL_ACTIONS are out of sync. "
f"Missing in HEALTH_ACTIONS: {_missing}; extra in HEALTH_ACTIONS: {_extra}"
)
# Severity ordering: only upgrade, never downgrade
_SEVERITY = {"healthy": 0, "warning": 1, "degraded": 2, "unhealthy": 3}
def _server_info() -> dict[str, Any]:
"""Return the standard server info block used in health responses."""
return {
"name": "Unraid MCP Server",
"version": VERSION,
"transport": UNRAID_MCP_TRANSPORT,
"host": UNRAID_MCP_HOST,
"port": UNRAID_MCP_PORT,
}
def register_health_tool(mcp: FastMCP) -> None:
"""Register the unraid_health tool with the FastMCP instance."""
@mcp.tool()
async def unraid_health(
action: HEALTH_ACTIONS,
) -> dict[str, Any]:
"""Monitor Unraid MCP server and system health.
Actions:
check - Comprehensive health check (API latency, array, notifications, Docker)
test_connection - Quick connectivity test (just checks { online })
diagnose - Subscription system diagnostics
"""
if action not in ALL_ACTIONS:
raise ToolError(f"Invalid action '{action}'. Must be one of: {sorted(ALL_ACTIONS)}")
with tool_error_handler("health", action, logger):
logger.info(f"Executing unraid_health action={action}")
if action == "test_connection":
start = time.time()
data = await make_graphql_request("query { online }")
latency = round((time.time() - start) * 1000, 2)
return {
"status": "connected",
"online": data.get("online"),
"latency_ms": latency,
}
if action == "check":
return await _comprehensive_check()
if action == "diagnose":
return await _diagnose_subscriptions()
raise ToolError(f"Unhandled action '{action}' — this is a bug")
logger.info("Health tool registered successfully")
async def _comprehensive_check() -> dict[str, Any]:
"""Run comprehensive health check against the Unraid system."""
start_time = time.time()
health_severity = 0 # Track as int to prevent downgrade
issues: list[str] = []
def _escalate(level: str) -> None:
nonlocal health_severity
health_severity = max(health_severity, _SEVERITY.get(level, 0))
try:
query = """
query ComprehensiveHealthCheck {
info {
machineId time
versions { core { unraid } }
os { uptime }
}
array { state }
notifications {
overview { unread { alert warning total } }
}
docker {
containers(skipCache: true) { id state status }
}
}
"""
data = await make_graphql_request(query)
api_latency = round((time.time() - start_time) * 1000, 2)
health_info: dict[str, Any] = {
"status": "healthy",
"timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
"api_latency_ms": api_latency,
"server": _server_info(),
}
if not data:
health_info["status"] = "unhealthy"
health_info["issues"] = ["No response from Unraid API"]
return health_info
# System info
info = data.get("info", {})
if info:
health_info["unraid_system"] = {
"status": "connected",
"url": safe_display_url(UNRAID_API_URL),
"machine_id": info.get("machineId"),
"version": (info.get("versions") or {}).get("core", {}).get("unraid"),
"uptime": info.get("os", {}).get("uptime"),
}
else:
_escalate("degraded")
issues.append("Unable to retrieve system info")
# Array
array_info = data.get("array", {})
if array_info:
state = array_info.get("state", "unknown")
health_info["array_status"] = {
"state": state,
"healthy": state in ("STARTED", "STOPPED"),
}
if state not in ("STARTED", "STOPPED"):
_escalate("warning")
issues.append(f"Array in unexpected state: {state}")
else:
_escalate("warning")
issues.append("Unable to retrieve array status")
# Notifications
notifications = data.get("notifications", {})
if notifications and notifications.get("overview"):
unread = notifications["overview"].get("unread", {})
alerts = unread.get("alert", 0)
health_info["notifications"] = {
"unread_total": unread.get("total", 0),
"unread_alerts": alerts,
"unread_warnings": unread.get("warning", 0),
}
if alerts > 0:
_escalate("warning")
issues.append(f"{alerts} unread alert(s)")
# Docker
docker = data.get("docker", {})
if docker and docker.get("containers"):
containers = docker["containers"]
health_info["docker_services"] = {
"total": len(containers),
"running": len([c for c in containers if c.get("state") == "running"]),
"stopped": len([c for c in containers if c.get("state") == "exited"]),
}
# Latency assessment
if api_latency > 10000:
_escalate("degraded")
issues.append(f"Very high API latency: {api_latency}ms")
elif api_latency > 5000:
_escalate("warning")
issues.append(f"High API latency: {api_latency}ms")
# Resolve final status from severity level
severity_to_status = {v: k for k, v in _SEVERITY.items()}
health_info["status"] = severity_to_status.get(health_severity, "healthy")
if issues:
health_info["issues"] = issues
health_info["performance"] = {
"api_response_time_ms": api_latency,
"check_duration_ms": round((time.time() - start_time) * 1000, 2),
}
return health_info
except Exception as e:
# Intentionally broad: health checks must always return a result,
# even on unexpected failures, so callers never get an unhandled exception.
logger.error(f"Health check failed: {e}", exc_info=True)
return {
"status": "unhealthy",
"timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
"error": str(e),
"server": _server_info(),
}
async def _diagnose_subscriptions() -> dict[str, Any]:
"""Import and run subscription diagnostics."""
try:
from ..subscriptions.manager import subscription_manager
from ..subscriptions.resources import ensure_subscriptions_started
await ensure_subscriptions_started()
status = await subscription_manager.get_subscription_status()
error_count, connection_issues = _analyze_subscription_status(status)
return {
"timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
"environment": {
"auto_start_enabled": subscription_manager.auto_start_enabled,
"max_reconnect_attempts": subscription_manager.max_reconnect_attempts,
"api_url_configured": bool(UNRAID_API_URL),
},
"subscriptions": status,
"summary": {
"total_configured": len(subscription_manager.subscription_configs),
"active_count": len(subscription_manager.active_subscriptions),
"with_data": len(subscription_manager.resource_data),
"in_error_state": error_count,
"connection_issues": connection_issues,
},
}
except ImportError as e:
raise ToolError("Subscription modules not available") from e
except Exception as e:
raise ToolError(f"Failed to generate diagnostics: {e!s}") from e