mirror of
https://github.com/jmagar/unraid-mcp.git
synced 2026-03-02 00:04:45 -08:00
lintfree
This commit is contained in:
@@ -5,7 +5,7 @@ array status with health analysis, network configuration, registration info,
|
||||
and system variables.
|
||||
"""
|
||||
|
||||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
|
||||
from fastmcp import FastMCP
|
||||
|
||||
@@ -15,7 +15,7 @@ from ..core.exceptions import ToolError
|
||||
|
||||
|
||||
# Standalone functions for use by subscription resources
|
||||
async def _get_system_info() -> Dict[str, Any]:
|
||||
async def _get_system_info() -> dict[str, Any]:
|
||||
"""Standalone function to get system info - used by subscriptions and tools."""
|
||||
query = """
|
||||
query GetSystemInfo {
|
||||
@@ -44,20 +44,20 @@ async def _get_system_info() -> Dict[str, Any]:
|
||||
raise ToolError("No system info returned from Unraid API")
|
||||
|
||||
# Process for human-readable output
|
||||
summary = {}
|
||||
summary: dict[str, Any] = {}
|
||||
if raw_info.get('os'):
|
||||
os_info = raw_info['os']
|
||||
summary['os'] = f"{os_info.get('distro', '')} {os_info.get('release', '')} ({os_info.get('platform', '')}, {os_info.get('arch', '')})"
|
||||
summary['hostname'] = os_info.get('hostname')
|
||||
summary['uptime'] = os_info.get('uptime')
|
||||
|
||||
summary['uptime'] = os_info.get('uptime')
|
||||
|
||||
if raw_info.get('cpu'):
|
||||
cpu_info = raw_info['cpu']
|
||||
summary['cpu'] = f"{cpu_info.get('manufacturer', '')} {cpu_info.get('brand', '')} ({cpu_info.get('cores')} cores, {cpu_info.get('threads')} threads)"
|
||||
|
||||
|
||||
if raw_info.get('memory') and raw_info['memory'].get('layout'):
|
||||
mem_layout = raw_info['memory']['layout']
|
||||
summary['memory_layout_details'] = [] # Renamed for clarity
|
||||
summary['memory_layout_details'] = [] # Renamed for clarity
|
||||
# The API is not returning 'size' for individual sticks in the layout, even if queried.
|
||||
# So, we cannot calculate total from layout currently.
|
||||
for stick in mem_layout:
|
||||
@@ -74,10 +74,10 @@ async def _get_system_info() -> Dict[str, Any]:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_system_info: {e}", exc_info=True)
|
||||
raise ToolError(f"Failed to retrieve system information: {str(e)}")
|
||||
raise ToolError(f"Failed to retrieve system information: {str(e)}") from e
|
||||
|
||||
|
||||
async def _get_array_status() -> Dict[str, Any]:
|
||||
async def _get_array_status() -> dict[str, Any]:
|
||||
"""Standalone function to get array status - used by subscriptions and tools."""
|
||||
query = """
|
||||
query GetArrayStatus {
|
||||
@@ -102,34 +102,38 @@ async def _get_array_status() -> Dict[str, Any]:
|
||||
if not raw_array_info:
|
||||
raise ToolError("No array information returned from Unraid API")
|
||||
|
||||
summary = {}
|
||||
summary: dict[str, Any] = {}
|
||||
summary['state'] = raw_array_info.get('state')
|
||||
|
||||
if raw_array_info.get('capacity') and raw_array_info['capacity'].get('kilobytes'):
|
||||
kb_cap = raw_array_info['capacity']['kilobytes']
|
||||
# Helper to format KB into TB/GB/MB
|
||||
def format_kb(k):
|
||||
if k is None: return "N/A"
|
||||
def format_kb(k: Any) -> str:
|
||||
if k is None:
|
||||
return "N/A"
|
||||
k = int(k) # Values are strings in SDL for PrefixedID containing types like capacity
|
||||
if k >= 1024*1024*1024: return f"{k / (1024*1024*1024):.2f} TB"
|
||||
if k >= 1024*1024: return f"{k / (1024*1024):.2f} GB"
|
||||
if k >= 1024: return f"{k / 1024:.2f} MB"
|
||||
if k >= 1024*1024*1024:
|
||||
return f"{k / (1024*1024*1024):.2f} TB"
|
||||
if k >= 1024*1024:
|
||||
return f"{k / (1024*1024):.2f} GB"
|
||||
if k >= 1024:
|
||||
return f"{k / 1024:.2f} MB"
|
||||
return f"{k} KB"
|
||||
|
||||
summary['capacity_total'] = format_kb(kb_cap.get('total'))
|
||||
summary['capacity_used'] = format_kb(kb_cap.get('used'))
|
||||
summary['capacity_free'] = format_kb(kb_cap.get('free'))
|
||||
|
||||
|
||||
summary['num_parity_disks'] = len(raw_array_info.get('parities', []))
|
||||
summary['num_data_disks'] = len(raw_array_info.get('disks', []))
|
||||
summary['num_cache_pools'] = len(raw_array_info.get('caches', [])) # Note: caches are pools, not individual cache disks
|
||||
|
||||
# Enhanced: Add disk health summary
|
||||
def analyze_disk_health(disks, disk_type):
|
||||
def analyze_disk_health(disks: list[dict[str, Any]], disk_type: str) -> dict[str, int]:
|
||||
"""Analyze health status of disk arrays"""
|
||||
if not disks:
|
||||
return {}
|
||||
|
||||
|
||||
health_counts = {
|
||||
'healthy': 0,
|
||||
'failed': 0,
|
||||
@@ -138,12 +142,12 @@ async def _get_array_status() -> Dict[str, Any]:
|
||||
'warning': 0,
|
||||
'unknown': 0
|
||||
}
|
||||
|
||||
|
||||
for disk in disks:
|
||||
status = disk.get('status', '').upper()
|
||||
warning = disk.get('warning')
|
||||
critical = disk.get('critical')
|
||||
|
||||
|
||||
if status == 'DISK_OK':
|
||||
if warning or critical:
|
||||
health_counts['warning'] += 1
|
||||
@@ -157,7 +161,7 @@ async def _get_array_status() -> Dict[str, Any]:
|
||||
health_counts['new'] += 1
|
||||
else:
|
||||
health_counts['unknown'] += 1
|
||||
|
||||
|
||||
return health_counts
|
||||
|
||||
# Analyze health for each disk type
|
||||
@@ -168,12 +172,12 @@ async def _get_array_status() -> Dict[str, Any]:
|
||||
health_summary['data_health'] = analyze_disk_health(raw_array_info['disks'], 'data')
|
||||
if raw_array_info.get('caches'):
|
||||
health_summary['cache_health'] = analyze_disk_health(raw_array_info['caches'], 'cache')
|
||||
|
||||
|
||||
# Overall array health assessment
|
||||
total_failed = sum(h.get('failed', 0) for h in health_summary.values())
|
||||
total_missing = sum(h.get('missing', 0) for h in health_summary.values())
|
||||
total_warning = sum(h.get('warning', 0) for h in health_summary.values())
|
||||
|
||||
|
||||
if total_failed > 0:
|
||||
overall_health = "CRITICAL"
|
||||
elif total_missing > 0:
|
||||
@@ -182,7 +186,7 @@ async def _get_array_status() -> Dict[str, Any]:
|
||||
overall_health = "WARNING"
|
||||
else:
|
||||
overall_health = "HEALTHY"
|
||||
|
||||
|
||||
summary['overall_health'] = overall_health
|
||||
summary['health_summary'] = health_summary
|
||||
|
||||
@@ -190,28 +194,28 @@ async def _get_array_status() -> Dict[str, Any]:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_array_status: {e}", exc_info=True)
|
||||
raise ToolError(f"Failed to retrieve array status: {str(e)}")
|
||||
raise ToolError(f"Failed to retrieve array status: {str(e)}") from e
|
||||
|
||||
|
||||
def register_system_tools(mcp: FastMCP):
|
||||
def register_system_tools(mcp: FastMCP) -> None:
|
||||
"""Register all system tools with the FastMCP instance.
|
||||
|
||||
|
||||
Args:
|
||||
mcp: FastMCP instance to register tools with
|
||||
"""
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def get_system_info() -> Dict[str, Any]:
|
||||
async def get_system_info() -> dict[str, Any]:
|
||||
"""Retrieves comprehensive information about the Unraid system, OS, CPU, memory, and baseboard."""
|
||||
return await _get_system_info()
|
||||
|
||||
@mcp.tool()
|
||||
async def get_array_status() -> Dict[str, Any]:
|
||||
async def get_array_status() -> dict[str, Any]:
|
||||
"""Retrieves the current status of the Unraid storage array, including its state, capacity, and details of all disks."""
|
||||
return await _get_array_status()
|
||||
|
||||
@mcp.tool()
|
||||
async def get_network_config() -> Dict[str, Any]:
|
||||
async def get_network_config() -> dict[str, Any]:
|
||||
"""Retrieves network configuration details, including access URLs."""
|
||||
query = """
|
||||
query GetNetworkConfig {
|
||||
@@ -224,13 +228,14 @@ def register_system_tools(mcp: FastMCP):
|
||||
try:
|
||||
logger.info("Executing get_network_config tool")
|
||||
response_data = await make_graphql_request(query)
|
||||
return response_data.get("network", {})
|
||||
network = response_data.get("network", {})
|
||||
return dict(network) if isinstance(network, dict) else {}
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_network_config: {e}", exc_info=True)
|
||||
raise ToolError(f"Failed to retrieve network configuration: {str(e)}")
|
||||
raise ToolError(f"Failed to retrieve network configuration: {str(e)}") from e
|
||||
|
||||
@mcp.tool()
|
||||
async def get_registration_info() -> Dict[str, Any]:
|
||||
async def get_registration_info() -> dict[str, Any]:
|
||||
"""Retrieves Unraid registration details."""
|
||||
query = """
|
||||
query GetRegistrationInfo {
|
||||
@@ -247,13 +252,14 @@ def register_system_tools(mcp: FastMCP):
|
||||
try:
|
||||
logger.info("Executing get_registration_info tool")
|
||||
response_data = await make_graphql_request(query)
|
||||
return response_data.get("registration", {})
|
||||
registration = response_data.get("registration", {})
|
||||
return dict(registration) if isinstance(registration, dict) else {}
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_registration_info: {e}", exc_info=True)
|
||||
raise ToolError(f"Failed to retrieve registration information: {str(e)}")
|
||||
raise ToolError(f"Failed to retrieve registration information: {str(e)}") from e
|
||||
|
||||
@mcp.tool()
|
||||
async def get_connect_settings() -> Dict[str, Any]:
|
||||
async def get_connect_settings() -> dict[str, Any]:
|
||||
"""Retrieves settings related to Unraid Connect."""
|
||||
# Based on actual schema: settings.unified.values contains the JSON settings
|
||||
query = """
|
||||
@@ -268,7 +274,7 @@ def register_system_tools(mcp: FastMCP):
|
||||
try:
|
||||
logger.info("Executing get_connect_settings tool")
|
||||
response_data = await make_graphql_request(query)
|
||||
|
||||
|
||||
# Navigate down to the unified settings values
|
||||
if response_data.get("settings") and response_data["settings"].get("unified"):
|
||||
values = response_data["settings"]["unified"].get("values", {})
|
||||
@@ -280,15 +286,15 @@ def register_system_tools(mcp: FastMCP):
|
||||
if 'connect' in key.lower() or key in ['accessType', 'forwardType', 'port']:
|
||||
connect_settings[key] = value
|
||||
return connect_settings if connect_settings else values
|
||||
return values
|
||||
return dict(values) if isinstance(values, dict) else {}
|
||||
return {}
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_connect_settings: {e}", exc_info=True)
|
||||
raise ToolError(f"Failed to retrieve Unraid Connect settings: {str(e)}")
|
||||
raise ToolError(f"Failed to retrieve Unraid Connect settings: {str(e)}") from e
|
||||
|
||||
@mcp.tool()
|
||||
async def get_unraid_variables() -> Dict[str, Any]:
|
||||
"""Retrieves a selection of Unraid system variables and settings.
|
||||
async def get_unraid_variables() -> dict[str, Any]:
|
||||
"""Retrieves a selection of Unraid system variables and settings.
|
||||
Note: Many variables are omitted due to API type issues (Int overflow/NaN).
|
||||
"""
|
||||
# Querying a smaller, curated set of fields to avoid Int overflow and NaN issues
|
||||
@@ -377,9 +383,10 @@ def register_system_tools(mcp: FastMCP):
|
||||
try:
|
||||
logger.info("Executing get_unraid_variables tool with a selective query")
|
||||
response_data = await make_graphql_request(query)
|
||||
return response_data.get("vars", {})
|
||||
vars_data = response_data.get("vars", {})
|
||||
return dict(vars_data) if isinstance(vars_data, dict) else {}
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_unraid_variables: {e}", exc_info=True)
|
||||
raise ToolError(f"Failed to retrieve Unraid variables: {str(e)}")
|
||||
raise ToolError(f"Failed to retrieve Unraid variables: {str(e)}") from e
|
||||
|
||||
logger.info("System tools registered successfully")
|
||||
logger.info("System tools registered successfully")
|
||||
|
||||
Reference in New Issue
Block a user