Files
unraid-mcp/unraid_mcp/tools/storage.py
Jacob Magar 482da4485d fix: flash_backup validation, smoke test assertions, docker/notification test coverage
- storage.py: validate initiateFlashBackup response before returning success=True
- test-tools.sh: remove set -e/inherit_errexit; add success assertion to smoke tests
- test_destructive_guards.py: add confirm-guard tests for new docker destructive actions
- test_docker.py: assert mutation variables in organizer tests; add items branch test
- test_query_validation.py: add 5 missing notification mutation schema test methods
- test_notifications.py: use lowercase importance to test uppercasing logic

Resolves review threads PRRT_kwDOO6Hdxs50FgPb PRRT_kwDOO6Hdxs50FgO4 PRRT_kwDOO6Hdxs50FgO8 PRRT_kwDOO6Hdxs50FgPI PRRT_kwDOO6Hdxs50FgPL PRRT_kwDOO6Hdxs50FgPm PRRT_kwDOO6Hdxs50E2iK PRRT_kwDOO6Hdxs50E2im
2026-03-13 10:41:43 -04:00

218 lines
7.8 KiB
Python

"""Storage and disk management.
Provides the `unraid_storage` tool with 7 actions for shares, physical disks,
unassigned devices, log files, log content retrieval, and flash backup.
"""
import os
from typing import Any, Literal, get_args
from fastmcp import FastMCP
from ..config.logging import logger
from ..core.client import DISK_TIMEOUT, make_graphql_request
from ..core.exceptions import ToolError, tool_error_handler
from ..core.utils import format_bytes
# Directory prefixes a resolved log path must start with to be accepted by the
# `logs` action. Each entry ends with "/" so that a sibling such as
# "/var/logs-other" cannot match "/var/log/".
_ALLOWED_LOG_PREFIXES: tuple[str, ...] = (
    "/var/log/",
    "/boot/logs/",
    "/mnt/",
)
# Largest value accepted for the `tail_lines` argument of the `logs` action.
_MAX_TAIL_LINES: int = 10000
# GraphQL documents for the read-only storage actions. Each document is kept in
# its own named constant and then collected into QUERIES, keyed by action name.

# Lists every user share together with its capacity and configuration fields.
_SHARES_QUERY = """
query GetSharesInfo {
shares {
id name free used size include exclude cache nameOrig
comment allocator splitLevel floor cow color luksStatus
}
}
"""

# Lists physical disks (identifier, device, and name only).
_DISKS_QUERY = """
query ListPhysicalDisks {
disks { id device name }
}
"""

# Fetches one disk by id, including serial number, size, and temperature.
_DISK_DETAILS_QUERY = """
query GetDiskDetails($id: PrefixedID!) {
disk(id: $id) {
id device name serialNum size temperature
}
}
"""

# Lists devices that are not assigned to the array.
_UNASSIGNED_QUERY = """
query GetUnassignedDevices {
unassignedDevices { id device name size type }
}
"""

# Lists the log files the server exposes.
_LOG_FILES_QUERY = """
query ListLogFiles {
logFiles { name path size modifiedAt }
}
"""

# Retrieves the content of a single log file, optionally limited by line count.
_LOGS_QUERY = """
query GetLogContent($path: String!, $lines: Int) {
logFile(path: $path, lines: $lines) {
path content totalLines startLine
}
}
"""

# Action name -> GraphQL query document.
QUERIES: dict[str, str] = {
    "shares": _SHARES_QUERY,
    "disks": _DISKS_QUERY,
    "disk_details": _DISK_DETAILS_QUERY,
    "unassigned": _UNASSIGNED_QUERY,
    "log_files": _LOG_FILES_QUERY,
    "logs": _LOGS_QUERY,
}
# GraphQL document that asks the server to start a flash backup; the response
# selection is limited to the job's status and jobId.
_FLASH_BACKUP_MUTATION = """
mutation InitiateFlashBackup($input: InitiateFlashBackupInput!) {
initiateFlashBackup(input: $input) { status jobId }
}
"""

# Action name -> GraphQL mutation document.
MUTATIONS: dict[str, str] = {
    "flash_backup": _FLASH_BACKUP_MUTATION,
}
# Actions that are only executed when the caller passes confirm=True.
DESTRUCTIVE_ACTIONS = {"flash_backup"}
# Every action name the tool can dispatch: all query keys plus all mutation keys.
ALL_ACTIONS = {*QUERIES, *MUTATIONS}
# Static Literal used to annotate the tool's `action` parameter. It has to be
# spelled out by hand, so the guard below keeps it aligned with ALL_ACTIONS.
STORAGE_ACTIONS = Literal[
    "shares", "disks", "disk_details", "unassigned", "log_files", "logs", "flash_backup"
]
# Import-time guard: fail loudly if the Literal and the query/mutation tables
# have drifted apart, reporting which names are missing or extra.
if ALL_ACTIONS.symmetric_difference(get_args(STORAGE_ACTIONS)):
    _missing = ALL_ACTIONS.difference(get_args(STORAGE_ACTIONS))
    _extra = set(get_args(STORAGE_ACTIONS)).difference(ALL_ACTIONS)
    raise RuntimeError(
        "STORAGE_ACTIONS and ALL_ACTIONS are out of sync. "
        f"Missing from Literal: {_missing or 'none'}. Extra in Literal: {_extra or 'none'}"
    )
def register_storage_tool(mcp: FastMCP) -> None:
    """Register the unraid_storage tool with the FastMCP instance.

    Defines the ``unraid_storage`` coroutine, attaches it to ``mcp`` through the
    ``mcp.tool()`` decorator, and logs that registration completed.
    """

    @mcp.tool()
    async def unraid_storage(
        action: STORAGE_ACTIONS,
        disk_id: str | None = None,
        log_path: str | None = None,
        tail_lines: int = 100,
        confirm: bool = False,
        remote_name: str | None = None,
        source_path: str | None = None,
        destination_path: str | None = None,
        backup_options: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Manage Unraid storage, disks, and logs.

        Actions:
          shares - List all user shares with capacity info
          disks - List all physical disks
          disk_details - Detailed SMART info for a disk (requires disk_id)
          unassigned - List unassigned devices
          log_files - List available log files
          logs - Retrieve log content (requires log_path, optional tail_lines)
          flash_backup - Initiate flash backup via rclone (requires remote_name, source_path, destination_path, confirm=True)

        Args:
          action: Which operation to perform (see Actions).
          disk_id: Disk identifier; required for disk_details.
          log_path: Log file path; required for logs. The path is resolved with
            os.path.realpath and must then start with /var/log/, /boot/logs/ or /mnt/.
          tail_lines: Line count requested for logs; must be at least 1 and no
            greater than the module's _MAX_TAIL_LINES.
          confirm: Must be True for destructive actions (flash_backup).
          remote_name: Required for flash_backup; sent as remoteName.
          source_path: Required for flash_backup; sent as sourcePath.
          destination_path: Required for flash_backup; sent as destinationPath.
          backup_options: Optional mapping forwarded as the mutation's options field.

        Raises:
          ToolError: On an unknown action, a missing or invalid argument, an
            unconfirmed destructive action, a disk that is not found, or a flash
            backup the server does not confirm.
        """
        # --- Argument validation (no network traffic before this passes) ---
        if action not in ALL_ACTIONS:
            raise ToolError(f"Invalid action '{action}'. Must be one of: {sorted(ALL_ACTIONS)}")

        if action in DESTRUCTIVE_ACTIONS and not confirm:
            raise ToolError(f"Action '{action}' is destructive. Set confirm=True to proceed.")

        if action == "disk_details" and not disk_id:
            raise ToolError("disk_id is required for 'disk_details' action")

        if action == "logs" and (tail_lines < 1 or tail_lines > _MAX_TAIL_LINES):
            raise ToolError(f"tail_lines must be between 1 and {_MAX_TAIL_LINES}, got {tail_lines}")

        if action == "logs":
            if not log_path:
                raise ToolError("log_path is required for 'logs' action")
            # Resolve path synchronously to prevent traversal attacks.
            # Using os.path.realpath instead of anyio.Path.resolve() because the
            # async variant blocks on NFS-mounted paths under /mnt/ (Perf-AI-1).
            normalized = os.path.realpath(log_path)  # noqa: ASYNC240
            # str.startswith accepts a tuple, so one call checks every prefix.
            if not normalized.startswith(_ALLOWED_LOG_PREFIXES):
                raise ToolError(
                    f"log_path must start with one of: {', '.join(_ALLOWED_LOG_PREFIXES)}. "
                    f"Use log_files action to discover valid paths."
                )
            # Send the resolved path, not the caller's spelling, to the server.
            log_path = normalized

        # --- Mutation: flash backup ---
        if action == "flash_backup":
            if not remote_name:
                raise ToolError("remote_name is required for 'flash_backup' action")
            if not source_path:
                raise ToolError("source_path is required for 'flash_backup' action")
            if not destination_path:
                raise ToolError("destination_path is required for 'flash_backup' action")

            input_data: dict[str, Any] = {
                "remoteName": remote_name,
                "sourcePath": source_path,
                "destinationPath": destination_path,
            }
            if backup_options is not None:
                input_data["options"] = backup_options

            with tool_error_handler("storage", action, logger):
                logger.info("Executing unraid_storage action=flash_backup")
                data = await make_graphql_request(MUTATIONS["flash_backup"], {"input": input_data})
                backup = data.get("initiateFlashBackup")
                # Only report success when the server actually returned a payload.
                if not backup:
                    raise ToolError("Failed to start flash backup: no confirmation from server")
                return {
                    "success": True,
                    "action": "flash_backup",
                    "data": backup,
                }

        # --- Queries ---
        query = QUERIES[action]
        variables: dict[str, Any] | None = None
        # Disk queries get the dedicated DISK_TIMEOUT; others use the client default.
        custom_timeout = DISK_TIMEOUT if action in ("disks", "disk_details") else None

        if action == "disk_details":
            variables = {"id": disk_id}
        elif action == "logs":
            variables = {"path": log_path, "lines": tail_lines}

        with tool_error_handler("storage", action, logger):
            logger.info(f"Executing unraid_storage action={action}")
            data = await make_graphql_request(query, variables, custom_timeout=custom_timeout)

            # `or []` (rather than a .get default) also covers a field that is
            # present but null, matching how the `logs` branch treats logFile.
            if action == "shares":
                return {"shares": data.get("shares") or []}

            if action == "disks":
                return {"disks": data.get("disks") or []}

            if action == "disk_details":
                raw = data.get("disk") or {}
                if not raw:
                    raise ToolError(f"Disk '{disk_id}' not found")
                summary = {
                    "disk_id": raw.get("id"),
                    "device": raw.get("device"),
                    "name": raw.get("name"),
                    "serial_number": raw.get("serialNum"),
                    "size_formatted": format_bytes(raw.get("size")),
                    "temperature": (
                        f"{raw['temperature']}\u00b0C"
                        if raw.get("temperature") is not None
                        else "N/A"
                    ),
                }
                return {"summary": summary, "details": raw}

            if action == "unassigned":
                return {"devices": data.get("unassignedDevices") or []}

            if action == "log_files":
                return {"log_files": data.get("logFiles") or []}

            if action == "logs":
                return dict(data.get("logFile") or {})

            # Reachable only if an action is added to QUERIES without a branch above.
            raise ToolError(f"Unhandled action '{action}' — this is a bug")

    logger.info("Storage tool registered successfully")