Remove unused MCP resources and update documentation

- Remove array_status, system_info, notifications_overview, and parity_status resources
- Keep only logs_stream resource (unraid://logs/stream) which is working properly
- Update README.md with current resource documentation and modern docker compose syntax
- Fix import path issues that were causing subscription errors
- Update environment configuration examples
- Clean up subscription manager to only include working log streaming

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Jacob Magar
2025-08-11 14:19:27 -04:00
parent f355511fe6
commit b00d78f408
29 changed files with 3641 additions and 2561 deletions

View File

@@ -0,0 +1 @@
"""MCP tools organized by functional domain."""

387
unraid_mcp/tools/docker.py Normal file
View File

@@ -0,0 +1,387 @@
"""Docker container management tools.
This module provides tools for Docker container lifecycle and management
including listing containers with caching options, start/stop operations,
and detailed container information retrieval.
"""
from typing import Any
from fastmcp import FastMCP
from ..config.logging import logger
from ..core.client import make_graphql_request
from ..core.exceptions import ToolError
def find_container_by_identifier(container_identifier: str, containers: list[dict[str, Any]]) -> dict[str, Any] | None:
"""Find a container by ID or name with fuzzy matching.
Args:
container_identifier: Container ID or name to find
containers: List of container dictionaries to search
Returns:
Container dictionary if found, None otherwise
"""
if not containers:
return None
# Exact matches first
for container in containers:
if container.get("id") == container_identifier:
return container
# Check all names for exact match
names = container.get("names", [])
if container_identifier in names:
return container
# Fuzzy matching - case insensitive partial matches
container_identifier_lower = container_identifier.lower()
for container in containers:
names = container.get("names", [])
for name in names:
if container_identifier_lower in name.lower() or name.lower() in container_identifier_lower:
logger.info(f"Found container via fuzzy match: '{container_identifier}' -> '{name}'")
return container
return None
def get_available_container_names(containers: list[dict[str, Any]]) -> list[str]:
"""Extract all available container names for error reporting.
Args:
containers: List of container dictionaries
Returns:
List of container names
"""
names = []
for container in containers:
container_names = container.get("names", [])
names.extend(container_names)
return names
def register_docker_tools(mcp: FastMCP):
"""Register all Docker tools with the FastMCP instance.
Args:
mcp: FastMCP instance to register tools with
"""
@mcp.tool()
async def list_docker_containers() -> list[dict[str, Any]]:
"""Lists all Docker containers on the Unraid system.
Returns:
List of Docker container information dictionaries
"""
query = """
query ListDockerContainers {
docker {
containers(skipCache: false) {
id
names
image
state
status
autoStart
}
}
}
"""
try:
logger.info("Executing list_docker_containers tool")
response_data = await make_graphql_request(query)
if response_data.get("docker"):
return response_data["docker"].get("containers", [])
return []
except Exception as e:
logger.error(f"Error in list_docker_containers: {e}", exc_info=True)
raise ToolError(f"Failed to list Docker containers: {str(e)}")
@mcp.tool()
async def manage_docker_container(container_id: str, action: str) -> dict[str, Any]:
"""Starts or stops a specific Docker container. Action must be 'start' or 'stop'.
Args:
container_id: Container ID to manage
action: Action to perform - 'start' or 'stop'
Returns:
Dict containing operation result and container information
"""
import asyncio
if action.lower() not in ["start", "stop"]:
logger.warning(f"Invalid action '{action}' for manage_docker_container")
raise ToolError("Invalid action. Must be 'start' or 'stop'.")
mutation_name = action.lower()
# Step 1: Execute the operation mutation
operation_query = f"""
mutation ManageDockerContainer($id: PrefixedID!) {{
docker {{
{mutation_name}(id: $id) {{
id
names
state
status
}}
}}
}}
"""
variables = {"id": container_id}
try:
logger.info(f"Executing manage_docker_container: action={action}, id={container_id}")
# Step 1: Resolve container identifier to actual container ID if needed
actual_container_id = container_id
if not container_id.startswith("3cb1026338736ed07b8afec2c484e429710b0f6550dc65d0c5c410ea9d0fa6b2:"):
# This looks like a name, not a full container ID - need to resolve it
logger.info(f"Resolving container identifier '{container_id}' to actual container ID")
list_query = """
query ResolveContainerID {
docker {
containers(skipCache: true) {
id
names
}
}
}
"""
list_response = await make_graphql_request(list_query)
if list_response.get("docker"):
containers = list_response["docker"].get("containers", [])
resolved_container = find_container_by_identifier(container_id, containers)
if resolved_container:
actual_container_id = resolved_container.get("id")
logger.info(f"Resolved '{container_id}' to container ID: {actual_container_id}")
else:
available_names = get_available_container_names(containers)
error_msg = f"Container '{container_id}' not found for {action} operation."
if available_names:
error_msg += f" Available containers: {', '.join(available_names[:10])}"
raise ToolError(error_msg)
# Update variables with the actual container ID
variables = {"id": actual_container_id}
# Execute the operation with idempotent error handling
operation_context = {"operation": action}
operation_response = await make_graphql_request(
operation_query,
variables,
operation_context=operation_context
)
# Handle idempotent success case
if operation_response.get("idempotent_success"):
logger.info(f"Container {action} operation was idempotent: {operation_response.get('message')}")
# Get current container state since the operation was already complete
try:
list_query = """
query GetContainerStateAfterIdempotent($skipCache: Boolean!) {
docker {
containers(skipCache: $skipCache) {
id
names
image
state
status
autoStart
}
}
}
"""
list_response = await make_graphql_request(list_query, {"skipCache": True})
if list_response.get("docker"):
containers = list_response["docker"].get("containers", [])
container = find_container_by_identifier(container_id, containers)
if container:
return {
"operation_result": {"id": container_id, "names": container.get("names", [])},
"container_details": container,
"success": True,
"message": f"Container {action} operation was already complete - current state returned",
"idempotent": True
}
except Exception as lookup_error:
logger.warning(f"Could not retrieve container state after idempotent operation: {lookup_error}")
return {
"operation_result": {"id": container_id},
"container_details": None,
"success": True,
"message": f"Container {action} operation was already complete",
"idempotent": True
}
# Handle normal successful operation
if not (operation_response.get("docker") and operation_response["docker"].get(mutation_name)):
raise ToolError(f"Failed to execute {action} operation on container")
operation_result = operation_response["docker"][mutation_name]
logger.info(f"Container {action} operation completed for {container_id}")
# Step 2: Wait briefly for state to propagate, then fetch current container details
await asyncio.sleep(1.0) # Give the container state time to update
# Step 3: Try to get updated container details with retry logic
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
# Query all containers and find the one we just operated on
list_query = """
query GetUpdatedContainerState($skipCache: Boolean!) {
docker {
containers(skipCache: $skipCache) {
id
names
image
state
status
autoStart
}
}
}
"""
# Skip cache to get fresh data
list_response = await make_graphql_request(list_query, {"skipCache": True})
if list_response.get("docker"):
containers = list_response["docker"].get("containers", [])
# Find the container using our helper function
container = find_container_by_identifier(container_id, containers)
if container:
logger.info(f"Found updated container state for {container_id}")
return {
"operation_result": operation_result,
"container_details": container,
"success": True,
"message": f"Container {action} operation completed successfully"
}
# If not found in this attempt, wait and retry
if attempt < max_retries - 1:
logger.warning(f"Container {container_id} not found after {action}, retrying in {retry_delay}s (attempt {attempt + 1}/{max_retries})")
await asyncio.sleep(retry_delay)
retry_delay *= 1.5 # Exponential backoff
except Exception as query_error:
logger.warning(f"Error querying updated container state (attempt {attempt + 1}): {query_error}")
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
retry_delay *= 1.5
else:
# On final attempt failure, still return operation success
logger.warning(f"Could not retrieve updated container details after {action}, but operation succeeded")
return {
"operation_result": operation_result,
"container_details": None,
"success": True,
"message": f"Container {action} operation completed, but updated state could not be retrieved",
"warning": "Container state query failed after operation - this may be due to timing or the container not being found in the updated state"
}
# If we get here, all retries failed to find the container
logger.warning(f"Container {container_id} not found in any retry attempt after {action}")
return {
"operation_result": operation_result,
"container_details": None,
"success": True,
"message": f"Container {action} operation completed, but container not found in subsequent queries",
"warning": "Container not found in updated state - this may indicate the operation succeeded but container is no longer listed"
}
except Exception as e:
logger.error(f"Error in manage_docker_container ({action}): {e}", exc_info=True)
raise ToolError(f"Failed to {action} Docker container: {str(e)}")
@mcp.tool()
async def get_docker_container_details(container_identifier: str) -> dict[str, Any]:
"""Retrieves detailed information for a specific Docker container by its ID or name.
Args:
container_identifier: Container ID or name to retrieve details for
Returns:
Dict containing detailed container information
"""
# This tool fetches all containers and then filters by ID or name.
# More detailed query for a single container if found:
detailed_query_fields = """
id
names
image
imageId
command
created
ports { ip privatePort publicPort type }
sizeRootFs
labels # JSONObject
state
status
hostConfig { networkMode }
networkSettings # JSONObject
mounts # JSONObject array
autoStart
"""
# Fetch all containers first
list_query = f"""
query GetAllContainerDetailsForFiltering {{
docker {{
containers(skipCache: false) {{
{detailed_query_fields}
}}
}}
}}
"""
try:
logger.info(f"Executing get_docker_container_details for identifier: {container_identifier}")
response_data = await make_graphql_request(list_query)
containers = []
if response_data.get("docker"):
containers = response_data["docker"].get("containers", [])
# Use our enhanced container lookup
container = find_container_by_identifier(container_identifier, containers)
if container:
logger.info(f"Found container {container_identifier}")
return container
# Container not found - provide helpful error message with available containers
available_names = get_available_container_names(containers)
logger.warning(f"Container with identifier '{container_identifier}' not found.")
logger.info(f"Available containers: {available_names}")
error_msg = f"Container '{container_identifier}' not found."
if available_names:
error_msg += f" Available containers: {', '.join(available_names[:10])}" # Limit to first 10
if len(available_names) > 10:
error_msg += f" (and {len(available_names) - 10} more)"
else:
error_msg += " No containers are currently available."
raise ToolError(error_msg)
except Exception as e:
logger.error(f"Error in get_docker_container_details: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve Docker container details: {str(e)}")
logger.info("Docker tools registered successfully")

187
unraid_mcp/tools/health.py Normal file
View File

@@ -0,0 +1,187 @@
"""Comprehensive health monitoring tools.
This module provides tools for comprehensive health checks of the Unraid MCP server
and the underlying Unraid system, including performance metrics, system status,
notifications, Docker services, and API responsiveness.
"""
import datetime
import time
from typing import Any, Dict
from fastmcp import FastMCP
from ..config.logging import logger
from ..config.settings import UNRAID_API_URL, UNRAID_MCP_HOST, UNRAID_MCP_PORT, UNRAID_MCP_TRANSPORT
from ..core.client import make_graphql_request
from ..core.exceptions import ToolError
def register_health_tools(mcp: FastMCP):
"""Register all health tools with the FastMCP instance.
Args:
mcp: FastMCP instance to register tools with
"""
@mcp.tool()
async def health_check() -> Dict[str, Any]:
"""Returns comprehensive health status of the Unraid MCP server and system for monitoring purposes."""
start_time = time.time()
health_status = "healthy"
issues = []
try:
# Enhanced health check with multiple system components
comprehensive_query = """
query ComprehensiveHealthCheck {
info {
machineId
time
versions { unraid }
os { uptime }
}
array {
state
}
notifications {
overview {
unread { alert warning total }
}
}
docker {
containers(skipCache: true) {
id
state
status
}
}
}
"""
response_data = await make_graphql_request(comprehensive_query)
api_latency = round((time.time() - start_time) * 1000, 2) # ms
# Base health info
health_info = {
"status": health_status,
"timestamp": datetime.datetime.utcnow().isoformat(),
"api_latency_ms": api_latency,
"server": {
"name": "Unraid MCP Server",
"version": "0.1.0",
"transport": UNRAID_MCP_TRANSPORT,
"host": UNRAID_MCP_HOST,
"port": UNRAID_MCP_PORT,
"process_uptime_seconds": time.time() - start_time # Rough estimate
}
}
if not response_data:
health_status = "unhealthy"
issues.append("No response from Unraid API")
health_info["status"] = health_status
health_info["issues"] = issues
return health_info
# System info analysis
info = response_data.get("info", {})
if info:
health_info["unraid_system"] = {
"status": "connected",
"url": UNRAID_API_URL,
"machine_id": info.get("machineId"),
"time": info.get("time"),
"version": info.get("versions", {}).get("unraid"),
"uptime": info.get("os", {}).get("uptime")
}
else:
health_status = "degraded"
issues.append("Unable to retrieve system info")
# Array health analysis
array_info = response_data.get("array", {})
if array_info:
array_state = array_info.get("state", "unknown")
health_info["array_status"] = {
"state": array_state,
"healthy": array_state in ["STARTED", "STOPPED"]
}
if array_state not in ["STARTED", "STOPPED"]:
health_status = "warning"
issues.append(f"Array in unexpected state: {array_state}")
else:
health_status = "warning"
issues.append("Unable to retrieve array status")
# Notifications analysis
notifications = response_data.get("notifications", {})
if notifications and notifications.get("overview"):
unread = notifications["overview"].get("unread", {})
alert_count = unread.get("alert", 0)
warning_count = unread.get("warning", 0)
total_unread = unread.get("total", 0)
health_info["notifications"] = {
"unread_total": total_unread,
"unread_alerts": alert_count,
"unread_warnings": warning_count,
"has_critical_notifications": alert_count > 0
}
if alert_count > 0:
health_status = "warning"
issues.append(f"{alert_count} unread alert notification(s)")
# Docker services analysis
docker_info = response_data.get("docker", {})
if docker_info and docker_info.get("containers"):
containers = docker_info["containers"]
running_containers = [c for c in containers if c.get("state") == "running"]
stopped_containers = [c for c in containers if c.get("state") == "exited"]
health_info["docker_services"] = {
"total_containers": len(containers),
"running_containers": len(running_containers),
"stopped_containers": len(stopped_containers),
"containers_healthy": len([c for c in containers if c.get("status", "").startswith("Up")])
}
# API performance assessment
if api_latency > 5000: # > 5 seconds
health_status = "warning"
issues.append(f"High API latency: {api_latency}ms")
elif api_latency > 10000: # > 10 seconds
health_status = "degraded"
issues.append(f"Very high API latency: {api_latency}ms")
# Final status determination
health_info["status"] = health_status
if issues:
health_info["issues"] = issues
# Add performance metrics
health_info["performance"] = {
"api_response_time_ms": api_latency,
"health_check_duration_ms": round((time.time() - start_time) * 1000, 2)
}
return health_info
except Exception as e:
logger.error(f"Health check failed: {e}")
return {
"status": "unhealthy",
"timestamp": datetime.datetime.utcnow().isoformat(),
"error": str(e),
"api_latency_ms": round((time.time() - start_time) * 1000, 2) if 'start_time' in locals() else None,
"server": {
"name": "Unraid MCP Server",
"version": "0.1.0",
"transport": UNRAID_MCP_TRANSPORT,
"host": UNRAID_MCP_HOST,
"port": UNRAID_MCP_PORT
}
}
logger.info("Health tools registered successfully")

178
unraid_mcp/tools/rclone.py Normal file
View File

@@ -0,0 +1,178 @@
"""RClone cloud storage remote management tools.
This module provides tools for managing RClone remotes including listing existing
remotes, getting configuration forms, creating new remotes, and deleting remotes
for various cloud storage providers (S3, Google Drive, Dropbox, FTP, etc.).
"""
from typing import Any, Dict, List, Optional
from fastmcp import FastMCP
from ..config.logging import logger
from ..core.client import make_graphql_request
from ..core.exceptions import ToolError
def register_rclone_tools(mcp: FastMCP):
"""Register all RClone tools with the FastMCP instance.
Args:
mcp: FastMCP instance to register tools with
"""
@mcp.tool()
async def list_rclone_remotes() -> List[Dict[str, Any]]:
"""Retrieves all configured RClone remotes with their configuration details."""
try:
query = """
query ListRCloneRemotes {
rclone {
remotes {
name
type
parameters
config
}
}
}
"""
response_data = await make_graphql_request(query)
if "rclone" in response_data and "remotes" in response_data["rclone"]:
remotes = response_data["rclone"]["remotes"]
logger.info(f"Retrieved {len(remotes)} RClone remotes")
return remotes
return []
except Exception as e:
logger.error(f"Failed to list RClone remotes: {str(e)}")
raise ToolError(f"Failed to list RClone remotes: {str(e)}")
@mcp.tool()
async def get_rclone_config_form(provider_type: Optional[str] = None) -> Dict[str, Any]:
"""
Get RClone configuration form schema for setting up new remotes.
Args:
provider_type: Optional provider type to get specific form (e.g., 's3', 'drive', 'dropbox')
"""
try:
query = """
query GetRCloneConfigForm($formOptions: RCloneConfigFormInput) {
rclone {
configForm(formOptions: $formOptions) {
id
dataSchema
uiSchema
}
}
}
"""
variables = {}
if provider_type:
variables["formOptions"] = {"providerType": provider_type}
response_data = await make_graphql_request(query, variables)
if "rclone" in response_data and "configForm" in response_data["rclone"]:
form_data = response_data["rclone"]["configForm"]
logger.info(f"Retrieved RClone config form for {provider_type or 'general'}")
return form_data
raise ToolError("No RClone config form data received")
except Exception as e:
logger.error(f"Failed to get RClone config form: {str(e)}")
raise ToolError(f"Failed to get RClone config form: {str(e)}")
@mcp.tool()
async def create_rclone_remote(name: str, provider_type: str, config_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Create a new RClone remote with the specified configuration.
Args:
name: Name for the new remote
provider_type: Type of provider (e.g., 's3', 'drive', 'dropbox', 'ftp')
config_data: Configuration parameters specific to the provider type
"""
try:
mutation = """
mutation CreateRCloneRemote($input: CreateRCloneRemoteInput!) {
rclone {
createRCloneRemote(input: $input) {
name
type
parameters
}
}
}
"""
variables = {
"input": {
"name": name,
"type": provider_type,
"config": config_data
}
}
response_data = await make_graphql_request(mutation, variables)
if "rclone" in response_data and "createRCloneRemote" in response_data["rclone"]:
remote_info = response_data["rclone"]["createRCloneRemote"]
logger.info(f"Successfully created RClone remote: {name}")
return {
"success": True,
"message": f"RClone remote '{name}' created successfully",
"remote": remote_info
}
raise ToolError("Failed to create RClone remote")
except Exception as e:
logger.error(f"Failed to create RClone remote {name}: {str(e)}")
raise ToolError(f"Failed to create RClone remote {name}: {str(e)}")
@mcp.tool()
async def delete_rclone_remote(name: str) -> Dict[str, Any]:
"""
Delete an existing RClone remote by name.
Args:
name: Name of the remote to delete
"""
try:
mutation = """
mutation DeleteRCloneRemote($input: DeleteRCloneRemoteInput!) {
rclone {
deleteRCloneRemote(input: $input)
}
}
"""
variables = {
"input": {
"name": name
}
}
response_data = await make_graphql_request(mutation, variables)
if "rclone" in response_data and response_data["rclone"]["deleteRCloneRemote"]:
logger.info(f"Successfully deleted RClone remote: {name}")
return {
"success": True,
"message": f"RClone remote '{name}' deleted successfully"
}
raise ToolError(f"Failed to delete RClone remote '{name}'")
except Exception as e:
logger.error(f"Failed to delete RClone remote {name}: {str(e)}")
raise ToolError(f"Failed to delete RClone remote {name}: {str(e)}")
logger.info("RClone tools registered successfully")

270
unraid_mcp/tools/storage.py Normal file
View File

@@ -0,0 +1,270 @@
"""Storage, disk, and notification management tools.
This module provides tools for managing user shares, notifications,
log files, physical disks with SMART data, and system storage operations
with custom timeout configurations for disk-intensive operations.
"""
from typing import Any, Dict, List, Optional
import httpx
from fastmcp import FastMCP
from ..config.logging import logger
from ..core.client import make_graphql_request
from ..core.exceptions import ToolError
def register_storage_tools(mcp: FastMCP):
"""Register all storage tools with the FastMCP instance.
Args:
mcp: FastMCP instance to register tools with
"""
@mcp.tool()
async def get_shares_info() -> List[Dict[str, Any]]:
"""Retrieves information about user shares."""
query = """
query GetSharesInfo {
shares {
id
name
free
used
size
include
exclude
cache
nameOrig
comment
allocator
splitLevel
floor
cow
color
luksStatus
}
}
"""
try:
logger.info("Executing get_shares_info tool")
response_data = await make_graphql_request(query)
return response_data.get("shares", [])
except Exception as e:
logger.error(f"Error in get_shares_info: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve shares information: {str(e)}")
@mcp.tool()
async def get_notifications_overview() -> Dict[str, Any]:
"""Retrieves an overview of system notifications (unread and archive counts by severity)."""
query = """
query GetNotificationsOverview {
notifications {
overview {
unread { info warning alert total }
archive { info warning alert total }
}
}
}
"""
try:
logger.info("Executing get_notifications_overview tool")
response_data = await make_graphql_request(query)
if response_data.get("notifications"):
return response_data["notifications"].get("overview", {})
return {}
except Exception as e:
logger.error(f"Error in get_notifications_overview: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve notifications overview: {str(e)}")
@mcp.tool()
async def list_notifications(
type: str,
offset: int,
limit: int,
importance: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Lists notifications with filtering. Type: UNREAD/ARCHIVE. Importance: INFO/WARNING/ALERT."""
query = """
query ListNotifications($filter: NotificationFilter!) {
notifications {
list(filter: $filter) {
id
title
subject
description
importance
link
type
timestamp
formattedTimestamp
}
}
}
"""
variables = {
"filter": {
"type": type.upper(),
"offset": offset,
"limit": limit,
"importance": importance.upper() if importance else None
}
}
# Remove null importance from variables if not provided, as GraphQL might be strict
if not importance:
del variables["filter"]["importance"]
try:
logger.info(f"Executing list_notifications: type={type}, offset={offset}, limit={limit}, importance={importance}")
response_data = await make_graphql_request(query, variables)
if response_data.get("notifications"):
return response_data["notifications"].get("list", [])
return []
except Exception as e:
logger.error(f"Error in list_notifications: {e}", exc_info=True)
raise ToolError(f"Failed to list notifications: {str(e)}")
@mcp.tool()
async def list_available_log_files() -> List[Dict[str, Any]]:
"""Lists all available log files that can be queried."""
query = """
query ListLogFiles {
logFiles {
name
path
size
modifiedAt
}
}
"""
try:
logger.info("Executing list_available_log_files tool")
response_data = await make_graphql_request(query)
return response_data.get("logFiles", [])
except Exception as e:
logger.error(f"Error in list_available_log_files: {e}", exc_info=True)
raise ToolError(f"Failed to list available log files: {str(e)}")
@mcp.tool()
async def get_logs(log_file_path: str, tail_lines: int = 100) -> Dict[str, Any]:
"""Retrieves content from a specific log file, defaulting to the last 100 lines."""
# The Unraid GraphQL API Query.logFile takes 'lines' and 'startLine'.
# To implement 'tail_lines', we would ideally need to know the total lines first,
# then calculate startLine. However, Query.logFile itself returns totalLines.
# A simple approach for 'tail' is to request a large number of lines if totalLines is not known beforehand,
# and let the API handle it, or make two calls (one to get totalLines, then another).
# For now, let's assume 'lines' parameter in Query.logFile effectively means tail if startLine is not given.
# If not, this tool might need to be smarter or the API might not directly support 'tail' easily.
# The SDL for LogFileContent implies it returns startLine, so it seems aware of ranges.
# Let's try fetching with just 'lines' to see if it acts as a tail,
# or if we need Query.logFiles first to get totalLines for calculation.
# For robust tailing, one might need to fetch totalLines first, then calculate start_line for the tail.
# Simplified: query for the last 'tail_lines'. If the API doesn't support tailing this way, we may need adjustment.
# The current plan is to pass 'lines=tail_lines' directly.
query = """
query GetLogContent($path: String!, $lines: Int) {
logFile(path: $path, lines: $lines) {
path
content
totalLines
startLine
}
}
"""
variables = {"path": log_file_path, "lines": tail_lines}
try:
logger.info(f"Executing get_logs for {log_file_path}, tail_lines={tail_lines}")
response_data = await make_graphql_request(query, variables)
return response_data.get("logFile", {})
except Exception as e:
logger.error(f"Error in get_logs for {log_file_path}: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve logs from {log_file_path}: {str(e)}")
@mcp.tool()
async def list_physical_disks() -> List[Dict[str, Any]]:
"""Lists all physical disks recognized by the Unraid system."""
# Querying an extremely minimal set of fields for diagnostics
query = """
query ListPhysicalDisksMinimal {
disks {
id
device
name
}
}
"""
try:
logger.info("Executing list_physical_disks tool with minimal query and increased timeout")
# Increased read timeout for this potentially slow query
long_timeout = httpx.Timeout(10.0, read=90.0, connect=5.0)
response_data = await make_graphql_request(query, custom_timeout=long_timeout)
return response_data.get("disks", [])
except Exception as e:
logger.error(f"Error in list_physical_disks: {e}", exc_info=True)
raise ToolError(f"Failed to list physical disks: {str(e)}")
@mcp.tool()
async def get_disk_details(disk_id: str) -> Dict[str, Any]:
"""Retrieves detailed SMART information and partition data for a specific physical disk."""
# Enhanced query with more comprehensive disk information
query = """
query GetDiskDetails($id: PrefixedID!) {
disk(id: $id) {
id
device
name
serialNum
size
temperature
}
}
"""
variables = {"id": disk_id}
try:
logger.info(f"Executing get_disk_details for disk: {disk_id}")
response_data = await make_graphql_request(query, variables)
raw_disk = response_data.get("disk", {})
if not raw_disk:
raise ToolError(f"Disk '{disk_id}' not found")
# Process disk information for human-readable output
def format_bytes(bytes_value):
if bytes_value is None: return "N/A"
bytes_value = int(bytes_value)
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
if bytes_value < 1024.0:
return f"{bytes_value:.2f} {unit}"
bytes_value /= 1024.0
return f"{bytes_value:.2f} EB"
summary = {
'disk_id': raw_disk.get('id'),
'device': raw_disk.get('device'),
'name': raw_disk.get('name'),
'serial_number': raw_disk.get('serialNum'),
'size_formatted': format_bytes(raw_disk.get('size')),
'temperature': f"{raw_disk.get('temperature')}°C" if raw_disk.get('temperature') else 'N/A',
'interface_type': raw_disk.get('interfaceType'),
'smart_status': raw_disk.get('smartStatus'),
'is_spinning': raw_disk.get('isSpinning'),
'power_on_hours': raw_disk.get('powerOnHours'),
'reallocated_sectors': raw_disk.get('reallocatedSectorCount'),
'partition_count': len(raw_disk.get('partitions', [])),
'total_partition_size': format_bytes(sum(p.get('size', 0) for p in raw_disk.get('partitions', []) if p.get('size')))
}
return {
'summary': summary,
'partitions': raw_disk.get('partitions', []),
'details': raw_disk
}
except Exception as e:
logger.error(f"Error in get_disk_details for {disk_id}: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve disk details for {disk_id}: {str(e)}")
logger.info("Storage tools registered successfully")

385
unraid_mcp/tools/system.py Normal file
View File

@@ -0,0 +1,385 @@
"""System information and array status tools.
This module provides tools for retrieving core Unraid system information,
array status with health analysis, network configuration, registration info,
and system variables.
"""
from typing import Any, Dict
from fastmcp import FastMCP
from ..config.logging import logger
from ..core.client import make_graphql_request
from ..core.exceptions import ToolError
# Standalone functions for use by subscription resources
async def _get_system_info() -> Dict[str, Any]:
"""Standalone function to get system info - used by subscriptions and tools."""
query = """
query GetSystemInfo {
info {
os { platform distro release codename kernel arch hostname codepage logofile serial build uptime }
cpu { manufacturer brand vendor family model stepping revision voltage speed speedmin speedmax threads cores processors socket cache flags }
memory {
# Avoid fetching problematic fields that cause type errors
layout { bank type clockSpeed formFactor manufacturer partNum serialNum }
}
baseboard { manufacturer model version serial assetTag }
system { manufacturer model version serial uuid sku }
versions { kernel openssl systemOpenssl systemOpensslLib node v8 npm yarn pm2 gulp grunt git tsc mysql redis mongodb apache nginx php docker postfix postgresql perl python gcc unraid }
apps { installed started }
# Remove devices section as it has non-nullable fields that might be null
machineId
time
}
}
"""
try:
logger.info("Executing get_system_info")
response_data = await make_graphql_request(query)
raw_info = response_data.get("info", {})
if not raw_info:
raise ToolError("No system info returned from Unraid API")
# Process for human-readable output
summary = {}
if raw_info.get('os'):
os_info = raw_info['os']
summary['os'] = f"{os_info.get('distro', '')} {os_info.get('release', '')} ({os_info.get('platform', '')}, {os_info.get('arch', '')})"
summary['hostname'] = os_info.get('hostname')
summary['uptime'] = os_info.get('uptime')
if raw_info.get('cpu'):
cpu_info = raw_info['cpu']
summary['cpu'] = f"{cpu_info.get('manufacturer', '')} {cpu_info.get('brand', '')} ({cpu_info.get('cores')} cores, {cpu_info.get('threads')} threads)"
if raw_info.get('memory') and raw_info['memory'].get('layout'):
mem_layout = raw_info['memory']['layout']
summary['memory_layout_details'] = [] # Renamed for clarity
# The API is not returning 'size' for individual sticks in the layout, even if queried.
# So, we cannot calculate total from layout currently.
for stick in mem_layout:
# stick_size = stick.get('size') # This is None in the actual API response
summary['memory_layout_details'].append(
f"Bank {stick.get('bank', '?')}: Type {stick.get('type', '?')}, Speed {stick.get('clockSpeed', '?')}MHz, Manufacturer: {stick.get('manufacturer','?')}, Part: {stick.get('partNum', '?')}"
)
summary['memory_summary'] = "Stick layout details retrieved. Overall total/used/free memory stats are unavailable due to API limitations (Int overflow or data not provided by API)."
else:
summary['memory_summary'] = "Memory information (layout or stats) not available or failed to retrieve."
# Include a key for the full details if needed by an LLM for deeper dives
return {"summary": summary, "details": raw_info}
except Exception as e:
logger.error(f"Error in get_system_info: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve system information: {str(e)}")
async def _get_array_status() -> Dict[str, Any]:
"""Standalone function to get array status - used by subscriptions and tools."""
query = """
query GetArrayStatus {
array {
id
state
capacity {
kilobytes { free used total }
disks { free used total }
}
boot { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
parities { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
disks { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
caches { id idx name device size status rotational temp numReads numWrites numErrors fsSize fsFree fsUsed exportable type warning critical fsType comment format transport color }
}
}
"""
try:
logger.info("Executing get_array_status")
response_data = await make_graphql_request(query)
raw_array_info = response_data.get("array", {})
if not raw_array_info:
raise ToolError("No array information returned from Unraid API")
summary = {}
summary['state'] = raw_array_info.get('state')
if raw_array_info.get('capacity') and raw_array_info['capacity'].get('kilobytes'):
kb_cap = raw_array_info['capacity']['kilobytes']
# Helper to format KB into TB/GB/MB
def format_kb(k):
if k is None: return "N/A"
k = int(k) # Values are strings in SDL for PrefixedID containing types like capacity
if k >= 1024*1024*1024: return f"{k / (1024*1024*1024):.2f} TB"
if k >= 1024*1024: return f"{k / (1024*1024):.2f} GB"
if k >= 1024: return f"{k / 1024:.2f} MB"
return f"{k} KB"
summary['capacity_total'] = format_kb(kb_cap.get('total'))
summary['capacity_used'] = format_kb(kb_cap.get('used'))
summary['capacity_free'] = format_kb(kb_cap.get('free'))
summary['num_parity_disks'] = len(raw_array_info.get('parities', []))
summary['num_data_disks'] = len(raw_array_info.get('disks', []))
summary['num_cache_pools'] = len(raw_array_info.get('caches', [])) # Note: caches are pools, not individual cache disks
# Enhanced: Add disk health summary
def analyze_disk_health(disks, disk_type):
"""Analyze health status of disk arrays"""
if not disks:
return {}
health_counts = {
'healthy': 0,
'failed': 0,
'missing': 0,
'new': 0,
'warning': 0,
'unknown': 0
}
for disk in disks:
status = disk.get('status', '').upper()
warning = disk.get('warning')
critical = disk.get('critical')
if status == 'DISK_OK':
if warning or critical:
health_counts['warning'] += 1
else:
health_counts['healthy'] += 1
elif status in ['DISK_DSBL', 'DISK_INVALID']:
health_counts['failed'] += 1
elif status == 'DISK_NP':
health_counts['missing'] += 1
elif status == 'DISK_NEW':
health_counts['new'] += 1
else:
health_counts['unknown'] += 1
return health_counts
# Analyze health for each disk type
health_summary = {}
if raw_array_info.get('parities'):
health_summary['parity_health'] = analyze_disk_health(raw_array_info['parities'], 'parity')
if raw_array_info.get('disks'):
health_summary['data_health'] = analyze_disk_health(raw_array_info['disks'], 'data')
if raw_array_info.get('caches'):
health_summary['cache_health'] = analyze_disk_health(raw_array_info['caches'], 'cache')
# Overall array health assessment
total_failed = sum(h.get('failed', 0) for h in health_summary.values())
total_missing = sum(h.get('missing', 0) for h in health_summary.values())
total_warning = sum(h.get('warning', 0) for h in health_summary.values())
if total_failed > 0:
overall_health = "CRITICAL"
elif total_missing > 0:
overall_health = "DEGRADED"
elif total_warning > 0:
overall_health = "WARNING"
else:
overall_health = "HEALTHY"
summary['overall_health'] = overall_health
summary['health_summary'] = health_summary
return {"summary": summary, "details": raw_array_info}
except Exception as e:
logger.error(f"Error in get_array_status: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve array status: {str(e)}")
def register_system_tools(mcp: FastMCP):
"""Register all system tools with the FastMCP instance.
Args:
mcp: FastMCP instance to register tools with
"""
@mcp.tool()
async def get_system_info() -> Dict[str, Any]:
"""Retrieves comprehensive information about the Unraid system, OS, CPU, memory, and baseboard."""
return await _get_system_info()
@mcp.tool()
async def get_array_status() -> Dict[str, Any]:
"""Retrieves the current status of the Unraid storage array, including its state, capacity, and details of all disks."""
return await _get_array_status()
@mcp.tool()
async def get_network_config() -> Dict[str, Any]:
"""Retrieves network configuration details, including access URLs."""
query = """
query GetNetworkConfig {
network {
id
accessUrls { type name ipv4 ipv6 }
}
}
"""
try:
logger.info("Executing get_network_config tool")
response_data = await make_graphql_request(query)
return response_data.get("network", {})
except Exception as e:
logger.error(f"Error in get_network_config: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve network configuration: {str(e)}")
@mcp.tool()
async def get_registration_info() -> Dict[str, Any]:
"""Retrieves Unraid registration details."""
query = """
query GetRegistrationInfo {
registration {
id
type
keyFile { location contents }
state
expiration
updateExpiration
}
}
"""
try:
logger.info("Executing get_registration_info tool")
response_data = await make_graphql_request(query)
return response_data.get("registration", {})
except Exception as e:
logger.error(f"Error in get_registration_info: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve registration information: {str(e)}")
@mcp.tool()
async def get_connect_settings() -> Dict[str, Any]:
"""Retrieves settings related to Unraid Connect."""
# Based on actual schema: settings.unified.values contains the JSON settings
query = """
query GetConnectSettingsForm {
settings {
unified {
values
}
}
}
"""
try:
logger.info("Executing get_connect_settings tool")
response_data = await make_graphql_request(query)
# Navigate down to the unified settings values
if response_data.get("settings") and response_data["settings"].get("unified"):
values = response_data["settings"]["unified"].get("values", {})
# Filter for Connect-related settings if values is a dict
if isinstance(values, dict):
# Look for connect-related keys in the unified settings
connect_settings = {}
for key, value in values.items():
if 'connect' in key.lower() or key in ['accessType', 'forwardType', 'port']:
connect_settings[key] = value
return connect_settings if connect_settings else values
return values
return {}
except Exception as e:
logger.error(f"Error in get_connect_settings: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve Unraid Connect settings: {str(e)}")
@mcp.tool()
async def get_unraid_variables() -> Dict[str, Any]:
"""Retrieves a selection of Unraid system variables and settings.
Note: Many variables are omitted due to API type issues (Int overflow/NaN).
"""
# Querying a smaller, curated set of fields to avoid Int overflow and NaN issues
# pending Unraid API schema fixes for the full Vars type.
query = """
query GetSelectiveUnraidVariables {
vars {
id
version
name
timeZone
comment
security
workgroup
domain
domainShort
hideDotFiles
localMaster
enableFruit
useNtp
# ntpServer1, ntpServer2, ... are strings, should be okay but numerous
domainLogin # Boolean
sysModel # String
# sysArraySlots, sysCacheSlots are Int, were problematic (NaN)
sysFlashSlots # Int, might be okay if small and always set
useSsl # Boolean
port # Int, usually small
portssl # Int, usually small
localTld # String
bindMgt # Boolean
useTelnet # Boolean
porttelnet # Int, usually small
useSsh # Boolean
portssh # Int, usually small
startPage # String
startArray # Boolean
# spindownDelay, queueDepth are Int, potentially okay if always set
# defaultFormat, defaultFsType are String
shutdownTimeout # Int, potentially okay
# luksKeyfile is String
# pollAttributes, pollAttributesDefault, pollAttributesStatus are Int/String, were problematic (NaN or type)
# nrRequests, nrRequestsDefault, nrRequestsStatus were problematic
# mdNumStripes, mdNumStripesDefault, mdNumStripesStatus were problematic
# mdSyncWindow, mdSyncWindowDefault, mdSyncWindowStatus were problematic
# mdSyncThresh, mdSyncThreshDefault, mdSyncThreshStatus were problematic
# mdWriteMethod, mdWriteMethodDefault, mdWriteMethodStatus were problematic
# shareDisk, shareUser, shareUserInclude, shareUserExclude are String arrays/String
shareSmbEnabled # Boolean
shareNfsEnabled # Boolean
shareAfpEnabled # Boolean
# shareInitialOwner, shareInitialGroup are String
shareCacheEnabled # Boolean
# shareCacheFloor is String (numeric string?)
# shareMoverSchedule, shareMoverLogging are String
# fuseRemember, fuseRememberDefault, fuseRememberStatus are String/Boolean, were problematic
# fuseDirectio, fuseDirectioDefault, fuseDirectioStatus are String/Boolean, were problematic
shareAvahiEnabled # Boolean
# shareAvahiSmbName, shareAvahiSmbModel, shareAvahiAfpName, shareAvahiAfpModel are String
safeMode # Boolean
startMode # String
configValid # Boolean
configError # String
joinStatus # String
deviceCount # Int, might be okay
flashGuid # String
flashProduct # String
flashVendor # String
# regCheck, regFile, regGuid, regTy, regState, regTo, regTm, regTm2, regGen are varied, mostly String/Int
# sbName, sbVersion, sbUpdated, sbEvents, sbState, sbClean, sbSynced, sbSyncErrs, sbSynced2, sbSyncExit are varied
# mdColor, mdNumDisks, mdNumDisabled, mdNumInvalid, mdNumMissing, mdNumNew, mdNumErased are Int, potentially okay if counts
# mdResync, mdResyncCorr, mdResyncPos, mdResyncDb, mdResyncDt, mdResyncAction are varied (Int/Boolean/String)
# mdResyncSize was an overflow
mdState # String (enum)
mdVersion # String
# cacheNumDevices, cacheSbNumDisks were problematic (NaN)
# fsState, fsProgress, fsCopyPrcnt, fsNumMounted, fsNumUnmountable, fsUnmountableMask are varied
shareCount # Int, might be okay
shareSmbCount # Int, might be okay
shareNfsCount # Int, might be okay
shareAfpCount # Int, might be okay
shareMoverActive # Boolean
csrfToken # String
}
}
"""
try:
logger.info("Executing get_unraid_variables tool with a selective query")
response_data = await make_graphql_request(query)
return response_data.get("vars", {})
except Exception as e:
logger.error(f"Error in get_unraid_variables: {e}", exc_info=True)
raise ToolError(f"Failed to retrieve Unraid variables: {str(e)}")
logger.info("System tools registered successfully")

View File

@@ -0,0 +1,162 @@
"""Virtual machine management tools.
This module provides tools for VM lifecycle management and monitoring
including listing VMs, VM operations (start/stop/pause/reboot/etc),
and detailed VM information retrieval.
"""
from typing import Any, Dict, List
from fastmcp import FastMCP
from ..config.logging import logger
from ..core.client import make_graphql_request
from ..core.exceptions import ToolError
def register_vm_tools(mcp: FastMCP):
"""Register all VM tools with the FastMCP instance.
Args:
mcp: FastMCP instance to register tools with
"""
@mcp.tool()
async def list_vms() -> List[Dict[str, Any]]:
"""Lists all Virtual Machines (VMs) on the Unraid system and their current state.
Returns:
List of VM information dictionaries with UUID, name, and state
"""
query = """
query ListVMs {
vms {
id
domains {
id
name
state
uuid
}
}
}
"""
try:
logger.info("Executing list_vms tool")
response_data = await make_graphql_request(query)
logger.info(f"VM query response: {response_data}")
if response_data.get("vms") and response_data["vms"].get("domains"):
vms = response_data["vms"]["domains"]
logger.info(f"Found {len(vms)} VMs")
return vms
else:
logger.info("No VMs found in domains field")
return []
except Exception as e:
logger.error(f"Error in list_vms: {e}", exc_info=True)
error_msg = str(e)
if "VMs are not available" in error_msg:
raise ToolError("VMs are not available on this Unraid server. This could mean: 1) VM support is not enabled, 2) VM service is not running, or 3) no VMs are configured. Check Unraid VM settings.")
else:
raise ToolError(f"Failed to list virtual machines: {error_msg}")
@mcp.tool()
async def manage_vm(vm_uuid: str, action: str) -> Dict[str, Any]:
"""Manages a VM: start, stop, pause, resume, force_stop, reboot, reset. Uses VM UUID.
Args:
vm_uuid: UUID of the VM to manage
action: Action to perform - one of: start, stop, pause, resume, forceStop, reboot, reset
Returns:
Dict containing operation success status and details
"""
valid_actions = ["start", "stop", "pause", "resume", "forceStop", "reboot", "reset"] # Added reset operation
if action not in valid_actions:
logger.warning(f"Invalid action '{action}' for manage_vm")
raise ToolError(f"Invalid action. Must be one of {valid_actions}.")
mutation_name = action
query = f"""
mutation ManageVM($id: PrefixedID!) {{
vm {{
{mutation_name}(id: $id)
}}
}}
"""
variables = {"id": vm_uuid}
try:
logger.info(f"Executing manage_vm tool: action={action}, uuid={vm_uuid}")
response_data = await make_graphql_request(query, variables)
if response_data.get("vm") and mutation_name in response_data["vm"]:
# Mutations for VM return Boolean for success
success = response_data["vm"][mutation_name]
return {"success": success, "action": action, "vm_uuid": vm_uuid}
raise ToolError(f"Failed to {action} VM or unexpected response structure.")
except Exception as e:
logger.error(f"Error in manage_vm ({action}): {e}", exc_info=True)
raise ToolError(f"Failed to {action} virtual machine: {str(e)}")
@mcp.tool()
async def get_vm_details(vm_identifier: str) -> Dict[str, Any]:
"""Retrieves detailed information for a specific VM by its UUID or name.
Args:
vm_identifier: VM UUID or name to retrieve details for
Returns:
Dict containing detailed VM information
"""
# Make direct GraphQL call instead of calling list_vms() tool
query = """
query GetVmDetails {
vms {
domains {
id
name
state
uuid
}
domain {
id
name
state
uuid
}
}
}
"""
try:
logger.info(f"Executing get_vm_details for identifier: {vm_identifier}")
response_data = await make_graphql_request(query)
if response_data.get("vms"):
vms_data = response_data["vms"]
# Try to get VMs from either domains or domain field
vms = vms_data.get("domains") or vms_data.get("domain") or []
if vms:
for vm_data in vms:
if (vm_data.get("uuid") == vm_identifier or
vm_data.get("id") == vm_identifier or
vm_data.get("name") == vm_identifier):
logger.info(f"Found VM {vm_identifier}")
return vm_data
logger.warning(f"VM with identifier '{vm_identifier}' not found.")
available_vms = [f"{vm.get('name')} (UUID: {vm.get('uuid')}, ID: {vm.get('id')})" for vm in vms]
raise ToolError(f"VM '{vm_identifier}' not found. Available VMs: {', '.join(available_vms)}")
else:
raise ToolError("No VMs available or VMs not accessible")
else:
raise ToolError("No VMs data returned from server")
except Exception as e:
logger.error(f"Error in get_vm_details: {e}", exc_info=True)
error_msg = str(e)
if "VMs are not available" in error_msg:
raise ToolError("VMs are not available on this Unraid server. This could mean: 1) VM support is not enabled, 2) VM service is not running, or 3) no VMs are configured. Check Unraid VM settings.")
else:
raise ToolError(f"Failed to retrieve VM details: {error_msg}")
logger.info("VM tools registered successfully")