Files
unraid-mcp/unraid_mcp/tools/info.py

510 lines
19 KiB
Python

"""System information and server status queries.
Provides the `unraid_info` tool with 19 read-only query actions for retrieving
system information, array status, network config, and server metadata, plus
two mutation actions (`update_server`, `update_ssh`) that change server settings.
"""
from typing import Any, Literal, get_args
from fastmcp import FastMCP
from ..config.logging import logger
from ..core.client import make_graphql_request
from ..core.exceptions import ToolError, tool_error_handler
from ..core.utils import format_kb
# Pre-built read-only GraphQL queries, keyed by the `unraid_info` action name
# that selects them. Each string is passed unchanged to make_graphql_request,
# so explanatory notes live between entries rather than inside the strings.
QUERIES: dict[str, str] = {
# Hardware/OS inventory; the response is summarized by _process_system_info.
"overview": """
query GetSystemInfo {
info {
os { platform distro release codename kernel arch hostname logofile serial build uptime }
cpu { manufacturer brand vendor family model stepping revision voltage speed speedmin speedmax threads cores processors socket cache }
memory {
layout { bank type clockSpeed formFactor manufacturer partNum serialNum }
}
baseboard { manufacturer model version serial assetTag }
system { manufacturer model version serial uuid sku }
versions { core { unraid api kernel } packages { openssl node npm pm2 git nginx php docker } }
machineId
time
}
}
""",
# Array state, capacity and per-disk fields; summarized by _process_array_status.
"array": """
query GetArrayStatus {
array {
id
state
capacity {
kilobytes { free used total }
disks { free used total }
}
boot { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
parities { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
disks { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
caches { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
}
}
""",
# The tool handler builds LAN/WAN access URLs from `servers` and reads the
# port/TLS fields from `vars`.
"network": """
query GetNetworkInfo {
servers { id name status wanip lanip localurl remoteurl }
vars { id port portssl localTld useSsl }
}
""",
"registration": """
query GetRegistrationInfo {
registration {
id type
keyFile { location }
state expiration updateExpiration
}
}
""",
# NOTE: the tool handler currently raises ToolError for the "connect" action
# before any request is made, so this query is never sent.
"connect": """
query GetConnectSettings {
connect { id dynamicRemoteAccess { enabledType runningType error } }
}
""",
"variables": """
query GetSelectiveUnraidVariables {
vars {
id version name timeZone comment security workgroup domain domainShort
hideDotFiles localMaster enableFruit useNtp domainLogin sysModel
sysFlashSlots useSsl port portssl localTld bindMgt useTelnet porttelnet
useSsh portssh startPage startArray shutdownTimeout
shareSmbEnabled shareNfsEnabled shareAfpEnabled shareCacheEnabled
shareAvahiEnabled safeMode startMode configValid configError joinStatus
deviceCount flashGuid flashProduct flashVendor mdState mdVersion
shareCount shareSmbCount shareNfsCount shareAfpCount shareMoverActive
}
}
""",
"metrics": """
query GetMetrics {
metrics { cpu { percentTotal } memory { total used free available buffcache percentTotal } }
}
""",
"services": """
query GetServices {
services { name online version }
}
""",
"display": """
query GetDisplay {
info { display { theme } }
}
""",
"config": """
query GetConfig {
config { valid error }
}
""",
"online": """
query GetOnline { online }
""",
"owner": """
query GetOwner {
owner { username avatar url }
}
""",
"settings": """
query GetSettings {
settings { unified { values } }
}
""",
# The response for "server" is returned to the caller without post-processing.
"server": """
query GetServer {
info {
os { hostname uptime }
versions { core { unraid } }
machineId time
}
array { state }
online
}
""",
"servers": """
query GetServers {
servers { id name status wanip lanip localurl remoteurl }
}
""",
"flash": """
query GetFlash {
flash { id vendor product }
}
""",
"ups_devices": """
query GetUpsDevices {
upsDevices { id name model status battery { chargeLevel estimatedRuntime health } power { loadPercentage inputVoltage outputVoltage } }
}
""",
# The only query that takes a variable: $id is filled from the tool's
# `device_id` argument.
"ups_device": """
query GetUpsDevice($id: String!) {
upsDeviceById(id: $id) { id name model status battery { chargeLevel estimatedRuntime health } power { loadPercentage inputVoltage outputVoltage nominalPower currentPower } }
}
""",
"ups_config": """
query GetUpsConfig {
upsConfiguration { service upsCable upsType device batteryLevel minutes timeout killUps upsName }
}
""",
}
# GraphQL mutations, keyed by the `unraid_info` action name that selects them.
# The tool handler dispatches these before it looks anything up in QUERIES.
MUTATIONS: dict[str, str] = {
# Variables: `name` is always sent; `comment` and `sysModel` are sent only
# when the caller supplies them.
"update_server": """
mutation UpdateServerIdentity($name: String!, $comment: String, $sysModel: String) {
updateServerIdentity(name: $name, comment: $comment, sysModel: $sysModel) {
id name comment status
}
}
""",
# Listed in DESTRUCTIVE_ACTIONS, so the tool refuses it unless confirm=True.
# The handler sends `input` as {"enabled": ..., "port": ...}.
"update_ssh": """
mutation UpdateSshSettings($input: UpdateSshInput!) {
updateSshSettings(input: $input) { id useSsh portssh }
}
""",
}
# Actions the tool refuses to run unless the caller passes confirm=True.
DESTRUCTIVE_ACTIONS = {"update_ssh"}
# Every action name the tool accepts: read-only queries plus mutations.
ALL_ACTIONS = set(QUERIES) | set(MUTATIONS)
# Literal type used to annotate the tool's `action` parameter. It must list
# exactly the names in ALL_ACTIONS; the import-time check that follows this
# definition raises RuntimeError if the two ever drift apart.
INFO_ACTIONS = Literal[
    "overview",
    "array",
    "network",
    "registration",
    "connect",
    "variables",
    "metrics",
    "services",
    "display",
    "config",
    "online",
    "owner",
    "settings",
    "server",
    "servers",
    "flash",
    "ups_devices",
    "ups_device",
    "ups_config",
    "update_server",
    "update_ssh",
]
# Import-time guard: the Literal used for the tool signature and the
# QUERIES/MUTATIONS tables are maintained by hand, so fail fast (at import)
# if an action was added to one but not the other.
if set(get_args(INFO_ACTIONS)) != ALL_ACTIONS:
    _missing = ALL_ACTIONS - set(get_args(INFO_ACTIONS))
    _extra = set(get_args(INFO_ACTIONS)) - ALL_ACTIONS
    # ALL_ACTIONS is built from both QUERIES and MUTATIONS, so name both in
    # the message; sort the names so the message is deterministic.
    raise RuntimeError(
        "QUERIES/MUTATIONS keys and INFO_ACTIONS are out of sync. "
        f"Missing from Literal: {sorted(_missing) or 'none'}. "
        f"Extra in Literal: {sorted(_extra) or 'none'}"
    )
def _process_system_info(raw_info: dict[str, Any]) -> dict[str, Any]:
"""Process raw system info into summary + details."""
summary: dict[str, Any] = {}
if raw_info.get("os"):
os_info = raw_info["os"]
summary["os"] = (
f"{os_info.get('distro') or 'unknown'} {os_info.get('release') or 'unknown'} "
f"({os_info.get('platform') or 'unknown'}, {os_info.get('arch') or 'unknown'})"
)
summary["hostname"] = os_info.get("hostname") or "unknown"
summary["uptime"] = os_info.get("uptime")
if raw_info.get("cpu"):
cpu = raw_info["cpu"]
summary["cpu"] = (
f"{cpu.get('manufacturer') or 'unknown'} {cpu.get('brand') or 'unknown'} "
f"({cpu.get('cores') or '?'} cores, {cpu.get('threads') or '?'} threads)"
)
if raw_info.get("memory") and raw_info["memory"].get("layout"):
mem_layout = raw_info["memory"]["layout"]
summary["memory_layout_details"] = []
for stick in mem_layout:
summary["memory_layout_details"].append(
f"Bank {stick.get('bank') or '?'}: Type {stick.get('type') or '?'}, "
f"Speed {stick.get('clockSpeed') or '?'}MHz, "
f"Manufacturer: {stick.get('manufacturer') or '?'}, "
f"Part: {stick.get('partNum') or '?'}"
)
summary["memory_summary"] = (
"Stick layout details retrieved. Overall total/used/free memory stats "
"are unavailable due to API limitations."
)
else:
summary["memory_summary"] = "Memory information not available."
return {"summary": summary, "details": raw_info}
def _analyze_disk_health(disks: list[dict[str, Any]]) -> dict[str, int]:
"""Analyze health status of disk arrays."""
counts = {
"healthy": 0,
"failed": 0,
"missing": 0,
"new": 0,
"warning": 0,
"critical": 0,
"unknown": 0,
}
for disk in disks:
status = disk.get("status", "").upper()
warning = disk.get("warning")
critical = disk.get("critical")
if status == "DISK_OK":
if critical:
counts["critical"] += 1
elif warning:
counts["warning"] += 1
else:
counts["healthy"] += 1
elif status in ("DISK_DSBL", "DISK_INVALID"):
counts["failed"] += 1
elif status == "DISK_NP":
counts["missing"] += 1
elif status == "DISK_NEW":
counts["new"] += 1
else:
counts["unknown"] += 1
return counts
def _process_array_status(raw: dict[str, Any]) -> dict[str, Any]:
"""Process raw array data into summary + details."""
summary: dict[str, Any] = {"state": raw.get("state")}
if raw.get("capacity") and raw["capacity"].get("kilobytes"):
kb = raw["capacity"]["kilobytes"]
summary["capacity_total"] = format_kb(kb.get("total"))
summary["capacity_used"] = format_kb(kb.get("used"))
summary["capacity_free"] = format_kb(kb.get("free"))
summary["num_parity_disks"] = len(raw.get("parities", []))
summary["num_data_disks"] = len(raw.get("disks", []))
summary["num_cache_pools"] = len(raw.get("caches", []))
health_summary: dict[str, Any] = {}
for key, label in [
("parities", "parity_health"),
("disks", "data_health"),
("caches", "cache_health"),
]:
if raw.get(key):
health_summary[label] = _analyze_disk_health(raw[key])
total_failed = sum(h.get("failed", 0) for h in health_summary.values())
total_critical = sum(h.get("critical", 0) for h in health_summary.values())
total_missing = sum(h.get("missing", 0) for h in health_summary.values())
total_warning = sum(h.get("warning", 0) for h in health_summary.values())
if total_failed > 0 or total_critical > 0:
overall = "CRITICAL"
elif total_missing > 0:
overall = "DEGRADED"
elif total_warning > 0:
overall = "WARNING"
else:
overall = "HEALTHY"
summary["overall_health"] = overall
summary["health_summary"] = health_summary
return {"summary": summary, "details": raw}
def register_info_tool(mcp: FastMCP) -> None:
    """Register the unraid_info tool with the FastMCP instance.

    Args:
        mcp: The FastMCP server instance the tool is attached to.
    """
    # Lookup tables for common response patterns. They never change, so they
    # are built once at registration time rather than on every tool call.
    # Simple dict actions: action -> GraphQL response key
    dict_actions: dict[str, str] = {
        "registration": "registration",
        "variables": "vars",
        "metrics": "metrics",
        "config": "config",
        "owner": "owner",
        "flash": "flash",
        "ups_device": "upsDeviceById",
        "ups_config": "upsConfiguration",
    }
    # List-wrapped actions: action -> (GraphQL response key, output key)
    list_actions: dict[str, tuple[str, str]] = {
        "services": ("services", "services"),
        "servers": ("servers", "servers"),
        "ups_devices": ("upsDevices", "ups_devices"),
    }

    @mcp.tool()
    async def unraid_info(
        action: INFO_ACTIONS,
        confirm: bool = False,
        device_id: str | None = None,
        server_name: str | None = None,
        server_comment: str | None = None,
        sys_model: str | None = None,
        ssh_enabled: bool | None = None,
        ssh_port: int | None = None,
    ) -> dict[str, Any]:
        """Query Unraid system information.

        Actions:
        overview - OS, CPU, memory, baseboard, versions
        array - Array state, capacity, disk health
        network - Access URLs, interfaces
        registration - License type, state, expiration
        connect - Unraid Connect settings
        variables - System variables and configuration
        metrics - CPU and memory utilization
        services - Running services
        display - Theme settings
        config - Configuration validity
        online - Server online status
        owner - Server owner info
        settings - All unified settings
        server - Quick server summary
        servers - Connected servers list
        flash - Flash drive info
        ups_devices - List UPS devices
        ups_device - Single UPS device (requires device_id)
        ups_config - UPS configuration
        update_server - Update server name, comment, and model (requires server_name)
        update_ssh - Enable/disable SSH and set port (requires ssh_enabled, ssh_port)

        Notes:
        update_ssh is destructive and is refused unless confirm=True; ssh_port
        must be between 1 and 65535.
        connect is currently rejected with an error; use settings instead.
        """
        # --- Argument validation (nothing is sent to the API yet) ---
        if action not in ALL_ACTIONS:
            raise ToolError(f"Invalid action '{action}'. Must be one of: {sorted(ALL_ACTIONS)}")
        if action in DESTRUCTIVE_ACTIONS and not confirm:
            raise ToolError(f"Action '{action}' is destructive. Set confirm=True to proceed.")
        if action == "ups_device" and not device_id:
            raise ToolError("device_id is required for ups_device action")

        # Mutation handlers — must return before query = QUERIES[action]
        if action == "update_server":
            if server_name is None:
                raise ToolError("server_name is required for 'update_server' action")
            # Optional fields are only sent when the caller supplied them.
            variables_mut: dict[str, Any] = {"name": server_name}
            if server_comment is not None:
                variables_mut["comment"] = server_comment
            if sys_model is not None:
                variables_mut["sysModel"] = sys_model
            with tool_error_handler("info", action, logger):
                logger.info("Executing unraid_info action=update_server")
                data = await make_graphql_request(MUTATIONS["update_server"], variables_mut)
                return {
                    "success": True,
                    "action": "update_server",
                    "data": data.get("updateServerIdentity"),
                }

        if action == "update_ssh":
            if ssh_enabled is None:
                raise ToolError("ssh_enabled is required for 'update_ssh' action")
            if ssh_port is None:
                raise ToolError("ssh_port is required for 'update_ssh' action")
            # Reject impossible TCP ports locally instead of forwarding them
            # to a mutation that changes how the server is reached.
            if not 1 <= ssh_port <= 65535:
                raise ToolError(f"ssh_port must be between 1 and 65535, got {ssh_port}")
            with tool_error_handler("info", action, logger):
                logger.info("Executing unraid_info action=update_ssh")
                data = await make_graphql_request(
                    MUTATIONS["update_ssh"], {"input": {"enabled": ssh_enabled, "port": ssh_port}}
                )
                return {
                    "success": True,
                    "action": "update_ssh",
                    "data": data.get("updateSshSettings"),
                }

        # connect is not available on all Unraid API versions
        if action == "connect":
            raise ToolError(
                "The 'connect' query is not available on this Unraid API version. "
                "Use the 'settings' action for API and SSO configuration."
            )

        # --- Read-only queries ---
        query = QUERIES[action]
        variables: dict[str, Any] | None = None
        if action == "ups_device":
            # The only query that takes a GraphQL variable.
            variables = {"id": device_id}

        with tool_error_handler("info", action, logger):
            logger.info(f"Executing unraid_info action={action}")
            data = await make_graphql_request(query, variables)

            # Special-case actions with custom processing
            if action == "overview":
                raw = data.get("info") or {}
                if not raw:
                    raise ToolError("No system info returned from Unraid API")
                return _process_system_info(raw)

            if action == "array":
                raw = data.get("array") or {}
                if not raw:
                    raise ToolError("No array information returned from Unraid API")
                return _process_array_status(raw)

            if action == "display":
                info = data.get("info") or {}
                return dict(info.get("display") or {})

            if action == "online":
                return {"online": data.get("online")}

            if action == "settings":
                settings = data.get("settings") or {}
                if not settings:
                    raise ToolError(
                        "No settings data returned from Unraid API. Check API permissions."
                    )
                if not settings.get("unified"):
                    logger.warning(f"Settings returned unexpected structure: {settings.keys()}")
                    raise ToolError(
                        f"Unexpected settings structure. Expected 'unified' key, got: {list(settings.keys())}"
                    )
                values = settings["unified"].get("values") or {}
                # Non-dict payloads are wrapped so the tool always returns a dict.
                return dict(values) if isinstance(values, dict) else {"raw": values}

            if action == "server":
                # Returned as-is: the query already selects only summary fields.
                return data

            if action == "network":
                servers_data = data.get("servers") or []
                vars_data = data.get("vars") or {}
                # One entry per address a server reports (LAN and/or WAN).
                access_urls = []
                for srv in servers_data:
                    if srv.get("lanip"):
                        access_urls.append(
                            {"type": "LAN", "ipv4": srv["lanip"], "url": srv.get("localurl")}
                        )
                    if srv.get("wanip"):
                        access_urls.append(
                            {"type": "WAN", "ipv4": srv["wanip"], "url": srv.get("remoteurl")}
                        )
                return {
                    "accessUrls": access_urls,
                    "httpPort": vars_data.get("port"),
                    "httpsPort": vars_data.get("portssl"),
                    "localTld": vars_data.get("localTld"),
                    "useSsl": vars_data.get("useSsl"),
                }

            # Simple dict-returning actions
            if action in dict_actions:
                return dict(data.get(dict_actions[action]) or {})

            # List-wrapped actions
            if action in list_actions:
                response_key, output_key = list_actions[action]
                items = data.get(response_key) or []
                normalized_items = list(items) if isinstance(items, list) else []
                return {output_key: normalized_items}

            # Reached only if an action was added to QUERIES without a handler.
            raise ToolError(f"Unhandled action '{action}' — this is a bug")

    logger.info("Info tool registered successfully")