"""System information and server status queries.

Provides the `unraid_info` tool, whose read-only actions (one per entry in
`QUERIES`) retrieve system information, array status, network config, and
server metadata.
"""

from typing import Any, Literal, get_args

from fastmcp import FastMCP

from ..config.logging import logger
from ..core.client import make_graphql_request
from ..core.exceptions import ToolError


# Pre-built queries keyed by action name
QUERIES: dict[str, str] = {
    "overview": """
        query GetSystemInfo {
          info {
            os { platform distro release codename arch hostname logofile serial build uptime }
            cpu { manufacturer brand vendor family model stepping revision voltage speed speedmin speedmax threads cores processors socket cache flags }
            memory {
              layout { bank type clockSpeed formFactor manufacturer partNum serialNum }
            }
            baseboard { manufacturer model version serial assetTag }
            system { manufacturer model version serial uuid sku }
            machineId
            time
          }
        }
    """,
    "array": """
        query GetArrayStatus {
          array {
            id
            state
            capacity {
              kilobytes { free used total }
              disks { free used total }
            }
            boot { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
            parities { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
            disks { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
            caches { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
          }
        }
    """,
    "network": """
        query GetNetworkConfig {
          network {
            id
            accessUrls { type name ipv4 ipv6 }
          }
        }
    """,
    "registration": """
        query GetRegistrationInfo {
          registration {
            id
            type
            keyFile { location }
            state
            expiration
            updateExpiration
          }
        }
    """,
    "variables": """
        query GetSelectiveUnraidVariables {
          vars {
            id version name timeZone comment security workgroup domain domainShort
            hideDotFiles localMaster enableFruit useNtp domainLogin sysModel
            sysFlashSlots useSsl port portssl localTld bindMgt useTelnet porttelnet
            useSsh portssh startPage startArray shutdownTimeout
            shareSmbEnabled shareNfsEnabled shareAfpEnabled shareCacheEnabled
            shareAvahiEnabled safeMode startMode configValid configError joinStatus
            deviceCount flashGuid flashProduct flashVendor mdState mdVersion
            shareCount shareSmbCount shareNfsCount shareAfpCount shareMoverActive
            csrfToken
          }
        }
    """,
    "metrics": """
        query GetMetrics {
          metrics {
            cpu { percentTotal cpus { percentTotal } }
            memory { used total }
          }
        }
    """,
    "services": """
        query GetServices {
          services { name online uptime { timestamp } }
        }
    """,
    "display": """
        query GetDisplay {
          info { display { theme } }
        }
    """,
    "config": """
        query GetConfig {
          config { valid error }
        }
    """,
    "online": """
        query GetOnline { online }
    """,
    "owner": """
        query GetOwner {
          owner { username avatar }
        }
    """,
    "settings": """
        query GetSettings {
          settings { unified { values } }
        }
    """,
    "server": """
        query GetServer {
          info {
            os { hostname uptime }
            machineId
            time
          }
          array { state }
          online
        }
    """,
    "servers": """
        query GetServers {
          servers { id name status lanip wanip }
        }
    """,
    "flash": """
        query GetFlash {
          flash { id guid product vendor }
        }
    """,
    "ups_devices": """
        query GetUpsDevices {
          upsDevices {
            id model status name
            battery { chargeLevel estimatedRuntime health }
          }
        }
    """,
    "ups_device": """
        query GetUpsDevice($id: PrefixedID!) {
          upsDeviceById(id: $id) {
            id model status name
            battery { chargeLevel estimatedRuntime health }
            power { loadPercentage inputVoltage outputVoltage }
          }
        }
    """,
}

INFO_ACTIONS = Literal[
    "overview",
    "array",
    "network",
    "registration",
    "variables",
    "metrics",
    "services",
    "display",
    "config",
    "online",
    "owner",
    "settings",
    "server",
    "servers",
    "flash",
    "ups_devices",
    "ups_device",
]

# Import-time sanity check: every declared action must have a query and vice versa.
assert set(QUERIES) == set(get_args(INFO_ACTIONS)), (
    "QUERIES keys and INFO_ACTIONS are out of sync"
)

# Lookup tables for common response patterns. They never change, so they are
# built once at import time instead of on every tool invocation.
# Simple dict actions: action -> GraphQL response key
_DICT_ACTIONS: dict[str, str] = {
    "network": "network",
    "registration": "registration",
    "variables": "vars",
    "metrics": "metrics",
    "config": "config",
    "owner": "owner",
    "flash": "flash",
    "ups_device": "upsDeviceById",
}

# List-wrapped actions: action -> (GraphQL response key, output key)
_LIST_ACTIONS: dict[str, tuple[str, str]] = {
    "services": ("services", "services"),
    "servers": ("servers", "servers"),
    "ups_devices": ("upsDevices", "ups_devices"),
}


def _process_system_info(raw_info: dict[str, Any]) -> dict[str, Any]:
    """Process raw system info into summary + details.

    Args:
        raw_info: The ``info`` object from the ``overview`` query response.

    Returns:
        A dict with a human-readable ``summary`` and the untouched input
        under ``details``.
    """
    summary: dict[str, Any] = {}

    if raw_info.get("os"):
        os_info = raw_info["os"]
        summary["os"] = (
            f"{os_info.get('distro', '')} {os_info.get('release', '')} "
            f"({os_info.get('platform', '')}, {os_info.get('arch', '')})"
        )
        summary["hostname"] = os_info.get("hostname")
        summary["uptime"] = os_info.get("uptime")

    if raw_info.get("cpu"):
        cpu = raw_info["cpu"]
        summary["cpu"] = (
            f"{cpu.get('manufacturer', '')} {cpu.get('brand', '')} "
            f"({cpu.get('cores', '?')} cores, {cpu.get('threads', '?')} threads)"
        )

    if raw_info.get("memory") and raw_info["memory"].get("layout"):
        mem_layout = raw_info["memory"]["layout"]
        summary["memory_layout_details"] = [
            f"Bank {stick.get('bank', '?')}: Type {stick.get('type', '?')}, "
            f"Speed {stick.get('clockSpeed', '?')}MHz, "
            f"Manufacturer: {stick.get('manufacturer', '?')}, "
            f"Part: {stick.get('partNum', '?')}"
            for stick in mem_layout
        ]
        summary["memory_summary"] = (
            "Stick layout details retrieved. Overall total/used/free memory stats "
            "are unavailable due to API limitations."
        )
    else:
        summary["memory_summary"] = "Memory information not available."

    return {"summary": summary, "details": raw_info}


def _analyze_disk_health(disks: list[dict[str, Any]]) -> dict[str, int]:
    """Analyze health status of disk arrays.

    Args:
        disks: Disk objects as returned by the ``array`` query; each may carry
            ``status``, ``warning`` and ``critical`` keys.

    Returns:
        Counts per bucket: healthy, failed, missing, new, warning, critical,
        unknown.
    """
    counts = {
        "healthy": 0,
        "failed": 0,
        "missing": 0,
        "new": 0,
        "warning": 0,
        "critical": 0,
        "unknown": 0,
    }

    for disk in disks:
        # `status` may be present but null in the response; `.get(key, "")`
        # would hand back None in that case, so coalesce explicitly.
        status = str(disk.get("status") or "").upper()
        warning = disk.get("warning")
        critical = disk.get("critical")

        if status == "DISK_OK":
            # A disk that reports OK can still have a warning/critical flag
            # set; critical takes precedence over warning.
            if critical:
                counts["critical"] += 1
            elif warning:
                counts["warning"] += 1
            else:
                counts["healthy"] += 1
        elif status in ("DISK_DSBL", "DISK_INVALID"):
            counts["failed"] += 1
        elif status == "DISK_NP":
            counts["missing"] += 1
        elif status == "DISK_NEW":
            counts["new"] += 1
        else:
            counts["unknown"] += 1

    return counts


def _format_kb(k: Any) -> str:
    """Format kilobyte values into human-readable sizes.

    Args:
        k: A kilobyte count (int, or anything ``int()`` accepts).

    Returns:
        A string in KB/MB/GB/TB using 1024-based steps, or ``"N/A"`` when the
        value is ``None`` or cannot be converted to an integer.
    """
    if k is None:
        return "N/A"
    try:
        k = int(k)
    except (ValueError, TypeError):
        return "N/A"

    if k >= 1024 * 1024 * 1024:
        return f"{k / (1024 * 1024 * 1024):.2f} TB"
    if k >= 1024 * 1024:
        return f"{k / (1024 * 1024):.2f} GB"
    if k >= 1024:
        return f"{k / 1024:.2f} MB"
    return f"{k} KB"


def _process_array_status(raw: dict[str, Any]) -> dict[str, Any]:
    """Process raw array data into summary + details.

    Args:
        raw: The ``array`` object from the ``array`` query response.

    Returns:
        A dict with a ``summary`` (state, formatted capacity, disk counts,
        per-group health counts and an overall health label) and the untouched
        input under ``details``.
    """
    summary: dict[str, Any] = {"state": raw.get("state")}

    if raw.get("capacity") and raw["capacity"].get("kilobytes"):
        kb = raw["capacity"]["kilobytes"]
        summary["capacity_total"] = _format_kb(kb.get("total"))
        summary["capacity_used"] = _format_kb(kb.get("used"))
        summary["capacity_free"] = _format_kb(kb.get("free"))

    # `or []` rather than a .get() default: a key that is present with a null
    # value would otherwise reach len() as None and raise TypeError.
    summary["num_parity_disks"] = len(raw.get("parities") or [])
    summary["num_data_disks"] = len(raw.get("disks") or [])
    summary["num_cache_pools"] = len(raw.get("caches") or [])

    health_summary: dict[str, Any] = {}
    for key, label in [
        ("parities", "parity_health"),
        ("disks", "data_health"),
        ("caches", "cache_health"),
    ]:
        if raw.get(key):
            health_summary[label] = _analyze_disk_health(raw[key])

    total_failed = sum(h.get("failed", 0) for h in health_summary.values())
    total_critical = sum(h.get("critical", 0) for h in health_summary.values())
    total_missing = sum(h.get("missing", 0) for h in health_summary.values())
    total_warning = sum(h.get("warning", 0) for h in health_summary.values())

    # Severity order: failed/critical > missing > warning > healthy.
    if total_failed > 0 or total_critical > 0:
        overall = "CRITICAL"
    elif total_missing > 0:
        overall = "DEGRADED"
    elif total_warning > 0:
        overall = "WARNING"
    else:
        overall = "HEALTHY"

    summary["overall_health"] = overall
    summary["health_summary"] = health_summary

    return {"summary": summary, "details": raw}


def register_info_tool(mcp: FastMCP) -> None:
    """Register the unraid_info tool with the FastMCP instance."""

    @mcp.tool()
    async def unraid_info(
        action: INFO_ACTIONS,
        device_id: str | None = None,
    ) -> dict[str, Any]:
        """Query Unraid system information.

        Actions:
            overview - OS, CPU, memory, baseboard, versions
            array - Array state, capacity, disk health
            network - Access URLs, interfaces
            registration - License type, state, expiration
            variables - System variables and configuration
            metrics - CPU and memory utilization
            services - Running services
            display - Theme settings
            config - Configuration validity
            online - Server online status
            owner - Server owner info
            settings - All unified settings
            server - Quick server summary
            servers - Connected servers list
            flash - Flash drive info
            ups_devices - List UPS devices
            ups_device - Single UPS device (requires device_id)
        """
        if action not in QUERIES:
            raise ToolError(
                f"Invalid action '{action}'. Must be one of: {list(QUERIES.keys())}"
            )

        if action == "ups_device" and not device_id:
            raise ToolError("device_id is required for ups_device action")

        query = QUERIES[action]
        # Only the single-device UPS query takes GraphQL variables.
        variables: dict[str, Any] | None = None
        if action == "ups_device":
            variables = {"id": device_id}

        try:
            logger.info(f"Executing unraid_info action={action}")
            data = await make_graphql_request(query, variables)

            # Special-case actions with custom processing
            if action == "overview":
                raw = data.get("info") or {}
                if not raw:
                    raise ToolError("No system info returned from Unraid API")
                return _process_system_info(raw)

            if action == "array":
                raw = data.get("array") or {}
                if not raw:
                    raise ToolError("No array information returned from Unraid API")
                return _process_array_status(raw)

            if action == "display":
                info = data.get("info") or {}
                return dict(info.get("display") or {})

            if action == "online":
                return {"online": data.get("online")}

            if action == "settings":
                settings = data.get("settings") or {}
                if not settings:
                    raise ToolError(
                        "No settings data returned from Unraid API. Check API permissions."
                    )
                if not settings.get("unified"):
                    logger.warning(f"Settings returned unexpected structure: {settings.keys()}")
                    raise ToolError(
                        f"Unexpected settings structure. Expected 'unified' key, got: {list(settings.keys())}"
                    )
                values = settings["unified"].get("values") or {}
                # Non-dict payloads are wrapped so the return type stays a dict.
                return dict(values) if isinstance(values, dict) else {"raw": values}

            if action == "server":
                return data

            # Simple dict-returning actions
            if action in _DICT_ACTIONS:
                return dict(data.get(_DICT_ACTIONS[action]) or {})

            # List-wrapped actions
            if action in _LIST_ACTIONS:
                response_key, output_key = _LIST_ACTIONS[action]
                items = data.get(response_key) or []
                return {output_key: list(items) if isinstance(items, list) else []}

            raise ToolError(f"Unhandled action '{action}' — this is a bug")

        except ToolError:
            # Already a user-facing error; pass it through unwrapped.
            raise
        except Exception as e:
            logger.error(f"Error in unraid_info action={action}: {e}", exc_info=True)
            raise ToolError(f"Failed to execute info/{action}: {e!s}") from e

    logger.info("Info tool registered successfully")