feat: add 28 GraphQL mutations across storage, info, docker, and new settings tool

- storage: flash_backup mutation (initiates rclone flash backup, destructive)
- info: update_server and update_ssh mutations
- docker: 11 organizer mutations (create_folder, set_folder_children,
  delete_entries, move_to_folder, move_to_position, rename_folder,
  create_folder_with_items, update_view_prefs, sync_templates,
  reset_template_mappings, refresh_digests); delete_entries and
  reset_template_mappings added to DESTRUCTIVE_ACTIONS
- settings: new unraid_settings tool with 9 mutations (update,
  update_temperature, update_time, configure_ups, update_api,
  connect_sign_in, connect_sign_out, setup_remote_access,
  enable_dynamic_remote_access); registered in server.py
- tests: 82 new tests (28 settings, 23 docker organizer, 7 info, 6 storage
  + 18 existing fixes for notification regex and safety audit list)
- bump version 0.3.0 → 0.4.0 (11 tools, ~104 actions)

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Jacob Magar
2026-03-13 03:03:37 -04:00
parent 4af1e74b4a
commit 9aee3a2448
11 changed files with 994 additions and 7 deletions

View File

@@ -97,9 +97,80 @@ MUTATIONS: dict[str, str] = {
docker { updateAllContainers { id names state status } }
}
""",
"create_folder": """
mutation CreateDockerFolder($name: String!, $parentId: String, $childrenIds: [String!]) {
createDockerFolder(name: $name, parentId: $parentId, childrenIds: $childrenIds) {
version views { id name rootId flatEntries { id type name parentId depth position path hasChildren childrenIds } }
}
}
""",
"set_folder_children": """
mutation SetDockerFolderChildren($folderId: String, $childrenIds: [String!]!) {
setDockerFolderChildren(folderId: $folderId, childrenIds: $childrenIds) {
version views { id name rootId flatEntries { id type name parentId depth position path hasChildren childrenIds } }
}
}
""",
"delete_entries": """
mutation DeleteDockerEntries($entryIds: [String!]!) {
deleteDockerEntries(entryIds: $entryIds) {
version views { id name rootId flatEntries { id type name parentId depth position path hasChildren childrenIds } }
}
}
""",
"move_to_folder": """
mutation MoveDockerEntriesToFolder($sourceEntryIds: [String!]!, $destinationFolderId: String!) {
moveDockerEntriesToFolder(sourceEntryIds: $sourceEntryIds, destinationFolderId: $destinationFolderId) {
version views { id name rootId flatEntries { id type name parentId depth position path hasChildren childrenIds } }
}
}
""",
"move_to_position": """
mutation MoveDockerItemsToPosition($sourceEntryIds: [String!]!, $destinationFolderId: String!, $position: Float!) {
moveDockerItemsToPosition(sourceEntryIds: $sourceEntryIds, destinationFolderId: $destinationFolderId, position: $position) {
version views { id name rootId flatEntries { id type name parentId depth position path hasChildren childrenIds } }
}
}
""",
"rename_folder": """
mutation RenameDockerFolder($folderId: String!, $newName: String!) {
renameDockerFolder(folderId: $folderId, newName: $newName) {
version views { id name rootId flatEntries { id type name parentId depth position path hasChildren childrenIds } }
}
}
""",
"create_folder_with_items": """
mutation CreateDockerFolderWithItems($name: String!, $parentId: String, $sourceEntryIds: [String!], $position: Float) {
createDockerFolderWithItems(name: $name, parentId: $parentId, sourceEntryIds: $sourceEntryIds, position: $position) {
version views { id name rootId flatEntries { id type name parentId depth position path hasChildren childrenIds } }
}
}
""",
"update_view_prefs": """
mutation UpdateDockerViewPreferences($viewId: String, $prefs: JSON!) {
updateDockerViewPreferences(viewId: $viewId, prefs: $prefs) {
version views { id name rootId }
}
}
""",
"sync_templates": """
mutation SyncDockerTemplatePaths {
syncDockerTemplatePaths { scanned matched skipped errors }
}
""",
"reset_template_mappings": """
mutation ResetDockerTemplateMappings {
resetDockerTemplateMappings
}
""",
"refresh_digests": """
mutation RefreshDockerDigests {
refreshDockerDigests
}
""",
}
DESTRUCTIVE_ACTIONS = {"remove", "update_all"}
DESTRUCTIVE_ACTIONS = {"remove", "update_all", "delete_entries", "reset_template_mappings"}
# NOTE (Code-M-07): "details" and "logs" are listed here because they require a
# container_id parameter, but unlike mutations they use fuzzy name matching (not
# strict). This is intentional: read-only queries are safe with fuzzy matching.
@@ -133,6 +204,17 @@ DOCKER_ACTIONS = Literal[
"network_details",
"port_conflicts",
"check_updates",
"create_folder",
"set_folder_children",
"delete_entries",
"move_to_folder",
"move_to_position",
"rename_folder",
"create_folder_with_items",
"update_view_prefs",
"sync_templates",
"reset_template_mappings",
"refresh_digests",
]
if set(get_args(DOCKER_ACTIONS)) != ALL_ACTIONS:
@@ -283,6 +365,17 @@ def register_docker_tool(mcp: FastMCP) -> None:
*,
confirm: bool = False,
tail_lines: int = 100,
folder_name: str | None = None,
folder_id: str | None = None,
parent_id: str | None = None,
children_ids: list[str] | None = None,
entry_ids: list[str] | None = None,
source_entry_ids: list[str] | None = None,
destination_folder_id: str | None = None,
position: float | None = None,
new_folder_name: str | None = None,
view_id: str = "default",
view_prefs: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Manage Docker containers, networks, and updates.
@@ -398,6 +491,100 @@ def register_docker_tool(mcp: FastMCP) -> None:
results = safe_get(data, "docker", "updateAllContainers", default=[])
return {"success": True, "action": "update_all", "containers": results}
# --- Docker organizer mutations ---
if action == "create_folder":
if not folder_name:
raise ToolError("folder_name is required for 'create_folder' action")
_vars: dict[str, Any] = {"name": folder_name}
if parent_id is not None:
_vars["parentId"] = parent_id
if children_ids is not None:
_vars["childrenIds"] = children_ids
data = await make_graphql_request(MUTATIONS["create_folder"], _vars)
return {"success": True, "action": "create_folder", "organizer": data.get("createDockerFolder")}
if action == "set_folder_children":
if not children_ids:
raise ToolError("children_ids is required for 'set_folder_children' action")
_vars = {"childrenIds": children_ids}
if folder_id is not None:
_vars["folderId"] = folder_id
data = await make_graphql_request(MUTATIONS["set_folder_children"], _vars)
return {"success": True, "action": "set_folder_children", "organizer": data.get("setDockerFolderChildren")}
if action == "delete_entries":
if not entry_ids:
raise ToolError("entry_ids is required for 'delete_entries' action")
data = await make_graphql_request(MUTATIONS["delete_entries"], {"entryIds": entry_ids})
return {"success": True, "action": "delete_entries", "organizer": data.get("deleteDockerEntries")}
if action == "move_to_folder":
if not source_entry_ids:
raise ToolError("source_entry_ids is required for 'move_to_folder' action")
if not destination_folder_id:
raise ToolError("destination_folder_id is required for 'move_to_folder' action")
data = await make_graphql_request(
MUTATIONS["move_to_folder"],
{"sourceEntryIds": source_entry_ids, "destinationFolderId": destination_folder_id},
)
return {"success": True, "action": "move_to_folder", "organizer": data.get("moveDockerEntriesToFolder")}
if action == "move_to_position":
if not source_entry_ids:
raise ToolError("source_entry_ids is required for 'move_to_position' action")
if not destination_folder_id:
raise ToolError("destination_folder_id is required for 'move_to_position' action")
if position is None:
raise ToolError("position is required for 'move_to_position' action")
data = await make_graphql_request(
MUTATIONS["move_to_position"],
{"sourceEntryIds": source_entry_ids, "destinationFolderId": destination_folder_id, "position": position},
)
return {"success": True, "action": "move_to_position", "organizer": data.get("moveDockerItemsToPosition")}
if action == "rename_folder":
if not folder_id:
raise ToolError("folder_id is required for 'rename_folder' action")
if not new_folder_name:
raise ToolError("new_folder_name is required for 'rename_folder' action")
data = await make_graphql_request(
MUTATIONS["rename_folder"], {"folderId": folder_id, "newName": new_folder_name}
)
return {"success": True, "action": "rename_folder", "organizer": data.get("renameDockerFolder")}
if action == "create_folder_with_items":
if not folder_name:
raise ToolError("folder_name is required for 'create_folder_with_items' action")
_vars = {"name": folder_name}
if parent_id is not None:
_vars["parentId"] = parent_id
if entry_ids is not None:
_vars["sourceEntryIds"] = entry_ids
if position is not None:
_vars["position"] = position
data = await make_graphql_request(MUTATIONS["create_folder_with_items"], _vars)
return {"success": True, "action": "create_folder_with_items", "organizer": data.get("createDockerFolderWithItems")}
if action == "update_view_prefs":
if view_prefs is None:
raise ToolError("view_prefs is required for 'update_view_prefs' action")
data = await make_graphql_request(
MUTATIONS["update_view_prefs"], {"viewId": view_id, "prefs": view_prefs}
)
return {"success": True, "action": "update_view_prefs", "organizer": data.get("updateDockerViewPreferences")}
if action == "sync_templates":
data = await make_graphql_request(MUTATIONS["sync_templates"])
return {"success": True, "action": "sync_templates", "result": data.get("syncDockerTemplatePaths")}
if action == "reset_template_mappings":
data = await make_graphql_request(MUTATIONS["reset_template_mappings"])
return {"success": True, "action": "reset_template_mappings", "result": data.get("resetDockerTemplateMappings")}
if action == "refresh_digests":
data = await make_graphql_request(MUTATIONS["refresh_digests"])
return {"success": True, "action": "refresh_digests", "result": data.get("refreshDockerDigests")}
# Single-container mutations
if action in MUTATIONS:
actual_id = await _resolve_container_id(container_id or "", strict=True)

View File

@@ -156,7 +156,22 @@ QUERIES: dict[str, str] = {
""",
}
ALL_ACTIONS = set(QUERIES)
MUTATIONS: dict[str, str] = {
"update_server": """
mutation UpdateServerIdentity($name: String!, $comment: String, $sysModel: String) {
updateServerIdentity(name: $name, comment: $comment, sysModel: $sysModel) {
id name comment status
}
}
""",
"update_ssh": """
mutation UpdateSshSettings($input: UpdateSshInput!) {
updateSshSettings(input: $input) { id useSsh portssh }
}
""",
}
ALL_ACTIONS = set(QUERIES) | set(MUTATIONS)
INFO_ACTIONS = Literal[
"overview",
@@ -178,6 +193,8 @@ INFO_ACTIONS = Literal[
"ups_devices",
"ups_device",
"ups_config",
"update_server",
"update_ssh",
]
if set(get_args(INFO_ACTIONS)) != ALL_ACTIONS:
@@ -310,6 +327,11 @@ def register_info_tool(mcp: FastMCP) -> None:
async def unraid_info(
action: INFO_ACTIONS,
device_id: str | None = None,
server_name: str | None = None,
server_comment: str | None = None,
sys_model: str | None = None,
ssh_enabled: bool | None = None,
ssh_port: int | None = None,
) -> dict[str, Any]:
"""Query Unraid system information.
@@ -333,6 +355,8 @@ def register_info_tool(mcp: FastMCP) -> None:
ups_devices - List UPS devices
ups_device - Single UPS device (requires device_id)
ups_config - UPS configuration
update_server - Update server name, comment, and model (requires server_name)
update_ssh - Enable/disable SSH and set port (requires ssh_enabled, ssh_port)
"""
if action not in ALL_ACTIONS:
raise ToolError(f"Invalid action '{action}'. Must be one of: {sorted(ALL_ACTIONS)}")
@@ -340,6 +364,40 @@ def register_info_tool(mcp: FastMCP) -> None:
if action == "ups_device" and not device_id:
raise ToolError("device_id is required for ups_device action")
# Mutation handlers — must return before query = QUERIES[action]
if action == "update_server":
if server_name is None:
raise ToolError("server_name is required for 'update_server' action")
variables_mut: dict[str, Any] = {"name": server_name}
if server_comment is not None:
variables_mut["comment"] = server_comment
if sys_model is not None:
variables_mut["sysModel"] = sys_model
with tool_error_handler("info", action, logger):
logger.info("Executing unraid_info action=update_server")
data = await make_graphql_request(MUTATIONS["update_server"], variables_mut)
return {
"success": True,
"action": "update_server",
"data": data.get("updateServerIdentity"),
}
if action == "update_ssh":
if ssh_enabled is None:
raise ToolError("ssh_enabled is required for 'update_ssh' action")
if ssh_port is None:
raise ToolError("ssh_port is required for 'update_ssh' action")
with tool_error_handler("info", action, logger):
logger.info("Executing unraid_info action=update_ssh")
data = await make_graphql_request(
MUTATIONS["update_ssh"], {"input": {"enabled": ssh_enabled, "port": ssh_port}}
)
return {
"success": True,
"action": "update_ssh",
"data": data.get("updateSshSettings"),
}
query = QUERIES[action]
variables: dict[str, Any] | None = None
if action == "ups_device":

View File

@@ -0,0 +1,248 @@
"""System settings, time, UPS, and remote access mutations.
Provides the `unraid_settings` tool with 9 actions for updating system
configuration, time settings, UPS, API settings, and Unraid Connect.
"""
from typing import Any, Literal, get_args
from fastmcp import FastMCP
from ..config.logging import logger
from ..core.client import make_graphql_request
from ..core.exceptions import ToolError, tool_error_handler
MUTATIONS: dict[str, str] = {
"update": """
mutation UpdateSettings($input: JSON!) {
updateSettings(input: $input) { restartRequired values warnings }
}
""",
"update_temperature": """
mutation UpdateTemperatureConfig($input: TemperatureConfigInput!) {
updateTemperatureConfig(input: $input)
}
""",
"update_time": """
mutation UpdateSystemTime($input: UpdateSystemTimeInput!) {
updateSystemTime(input: $input) { currentTime timeZone useNtp ntpServers }
}
""",
"configure_ups": """
mutation ConfigureUps($config: UPSConfigInput!) {
configureUps(config: $config)
}
""",
"update_api": """
mutation UpdateApiSettings($input: ConnectSettingsInput!) {
updateApiSettings(input: $input) { accessType forwardType port }
}
""",
"connect_sign_in": """
mutation ConnectSignIn($input: ConnectSignInInput!) {
connectSignIn(input: $input)
}
""",
"connect_sign_out": """
mutation ConnectSignOut {
connectSignOut
}
""",
"setup_remote_access": """
mutation SetupRemoteAccess($input: SetupRemoteAccessInput!) {
setupRemoteAccess(input: $input)
}
""",
"enable_dynamic_remote_access": """
mutation EnableDynamicRemoteAccess($input: EnableDynamicRemoteAccessInput!) {
enableDynamicRemoteAccess(input: $input)
}
""",
}
DESTRUCTIVE_ACTIONS = {"configure_ups", "setup_remote_access", "enable_dynamic_remote_access"}
ALL_ACTIONS = set(MUTATIONS)
SETTINGS_ACTIONS = Literal[
"update",
"update_temperature",
"update_time",
"configure_ups",
"update_api",
"connect_sign_in",
"connect_sign_out",
"setup_remote_access",
"enable_dynamic_remote_access",
]
if set(get_args(SETTINGS_ACTIONS)) != ALL_ACTIONS:
_missing = ALL_ACTIONS - set(get_args(SETTINGS_ACTIONS))
_extra = set(get_args(SETTINGS_ACTIONS)) - ALL_ACTIONS
raise RuntimeError(
f"SETTINGS_ACTIONS and ALL_ACTIONS are out of sync. "
f"Missing from Literal: {_missing or 'none'}. Extra in Literal: {_extra or 'none'}"
)
def register_settings_tool(mcp: FastMCP) -> None:
"""Register the unraid_settings tool with the FastMCP instance."""
@mcp.tool()
async def unraid_settings(
action: SETTINGS_ACTIONS,
confirm: bool = False,
settings_input: dict[str, Any] | None = None,
temperature_config: dict[str, Any] | None = None,
time_zone: str | None = None,
use_ntp: bool | None = None,
ntp_servers: list[str] | None = None,
manual_datetime: str | None = None,
ups_config: dict[str, Any] | None = None,
access_type: str | None = None,
forward_type: str | None = None,
port: int | None = None,
api_key: str | None = None,
username: str | None = None,
email: str | None = None,
avatar: str | None = None,
access_url_type: str | None = None,
access_url_name: str | None = None,
access_url_ipv4: str | None = None,
access_url_ipv6: str | None = None,
dynamic_enabled: bool | None = None,
) -> dict[str, Any]:
"""Update Unraid system settings, time, UPS, and remote access configuration.
Actions:
update - Update system settings (requires settings_input dict)
update_temperature - Update temperature sensor config (requires temperature_config dict)
update_time - Update time/timezone/NTP (requires at least one of: time_zone, use_ntp, ntp_servers, manual_datetime)
configure_ups - Configure UPS monitoring (requires ups_config dict, confirm=True)
update_api - Update API/Connect settings (requires at least one of: access_type, forward_type, port)
connect_sign_in - Sign in to Unraid Connect (requires api_key)
connect_sign_out - Sign out from Unraid Connect
setup_remote_access - Configure remote access (requires access_type, confirm=True)
enable_dynamic_remote_access - Enable/disable dynamic remote access (requires access_url_type, dynamic_enabled, confirm=True)
"""
if action not in ALL_ACTIONS:
raise ToolError(f"Invalid action '{action}'. Must be one of: {sorted(ALL_ACTIONS)}")
if action in DESTRUCTIVE_ACTIONS and not confirm:
raise ToolError(f"Action '{action}' is destructive. Set confirm=True to proceed.")
with tool_error_handler("settings", action, logger):
logger.info(f"Executing unraid_settings action={action}")
if action == "update":
if settings_input is None:
raise ToolError("settings_input is required for 'update' action")
data = await make_graphql_request(MUTATIONS["update"], {"input": settings_input})
return {"success": True, "action": "update", "data": data.get("updateSettings")}
if action == "update_temperature":
if temperature_config is None:
raise ToolError("temperature_config is required for 'update_temperature' action")
data = await make_graphql_request(
MUTATIONS["update_temperature"], {"input": temperature_config}
)
return {"success": True, "action": "update_temperature", "result": data.get("updateTemperatureConfig")}
if action == "update_time":
time_input: dict[str, Any] = {}
if time_zone is not None:
time_input["timeZone"] = time_zone
if use_ntp is not None:
time_input["useNtp"] = use_ntp
if ntp_servers is not None:
time_input["ntpServers"] = ntp_servers
if manual_datetime is not None:
time_input["manualDateTime"] = manual_datetime
if not time_input:
raise ToolError(
"update_time requires at least one of: time_zone, use_ntp, ntp_servers, manual_datetime"
)
data = await make_graphql_request(MUTATIONS["update_time"], {"input": time_input})
return {"success": True, "action": "update_time", "data": data.get("updateSystemTime")}
if action == "configure_ups":
if ups_config is None:
raise ToolError("ups_config is required for 'configure_ups' action")
data = await make_graphql_request(MUTATIONS["configure_ups"], {"config": ups_config})
return {"success": True, "action": "configure_ups", "result": data.get("configureUps")}
if action == "update_api":
api_input: dict[str, Any] = {}
if access_type is not None:
api_input["accessType"] = access_type
if forward_type is not None:
api_input["forwardType"] = forward_type
if port is not None:
api_input["port"] = port
if not api_input:
raise ToolError(
"update_api requires at least one of: access_type, forward_type, port"
)
data = await make_graphql_request(MUTATIONS["update_api"], {"input": api_input})
return {"success": True, "action": "update_api", "data": data.get("updateApiSettings")}
if action == "connect_sign_in":
if not api_key:
raise ToolError("api_key is required for 'connect_sign_in' action")
sign_in_input: dict[str, Any] = {"apiKey": api_key}
if username or email:
user_info: dict[str, Any] = {}
if username:
user_info["preferred_username"] = username
if email:
user_info["email"] = email
if avatar:
user_info["avatar"] = avatar
sign_in_input["userInfo"] = user_info
data = await make_graphql_request(
MUTATIONS["connect_sign_in"], {"input": sign_in_input}
)
return {"success": True, "action": "connect_sign_in", "result": data.get("connectSignIn")}
if action == "connect_sign_out":
data = await make_graphql_request(MUTATIONS["connect_sign_out"])
return {"success": True, "action": "connect_sign_out", "result": data.get("connectSignOut")}
if action == "setup_remote_access":
if not access_type:
raise ToolError("access_type is required for 'setup_remote_access' action")
remote_input: dict[str, Any] = {"accessType": access_type}
if forward_type is not None:
remote_input["forwardType"] = forward_type
if port is not None:
remote_input["port"] = port
data = await make_graphql_request(
MUTATIONS["setup_remote_access"], {"input": remote_input}
)
return {"success": True, "action": "setup_remote_access", "result": data.get("setupRemoteAccess")}
if action == "enable_dynamic_remote_access":
if not access_url_type:
raise ToolError(
"access_url_type is required for 'enable_dynamic_remote_access' action"
)
if dynamic_enabled is None:
raise ToolError(
"dynamic_enabled is required for 'enable_dynamic_remote_access' action"
)
url_input: dict[str, Any] = {"type": access_url_type}
if access_url_name is not None:
url_input["name"] = access_url_name
if access_url_ipv4 is not None:
url_input["ipv4"] = access_url_ipv4
if access_url_ipv6 is not None:
url_input["ipv6"] = access_url_ipv6
data = await make_graphql_request(
MUTATIONS["enable_dynamic_remote_access"],
{"input": {"url": url_input, "enabled": dynamic_enabled}},
)
return {"success": True, "action": "enable_dynamic_remote_access", "result": data.get("enableDynamicRemoteAccess")}
raise ToolError(f"Unhandled action '{action}' — this is a bug")
logger.info("Settings tool registered successfully")

View File

@@ -58,7 +58,16 @@ QUERIES: dict[str, str] = {
""",
}
ALL_ACTIONS = set(QUERIES)
MUTATIONS: dict[str, str] = {
"flash_backup": """
mutation InitiateFlashBackup($input: InitiateFlashBackupInput!) {
initiateFlashBackup(input: $input) { status jobId }
}
""",
}
DESTRUCTIVE_ACTIONS = {"flash_backup"}
ALL_ACTIONS = set(QUERIES) | set(MUTATIONS)
STORAGE_ACTIONS = Literal[
"shares",
@@ -67,6 +76,7 @@ STORAGE_ACTIONS = Literal[
"unassigned",
"log_files",
"logs",
"flash_backup",
]
if set(get_args(STORAGE_ACTIONS)) != ALL_ACTIONS:
@@ -87,6 +97,11 @@ def register_storage_tool(mcp: FastMCP) -> None:
disk_id: str | None = None,
log_path: str | None = None,
tail_lines: int = 100,
confirm: bool = False,
remote_name: str | None = None,
source_path: str | None = None,
destination_path: str | None = None,
backup_options: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Manage Unraid storage, disks, and logs.
@@ -97,10 +112,14 @@ def register_storage_tool(mcp: FastMCP) -> None:
unassigned - List unassigned devices
log_files - List available log files
logs - Retrieve log content (requires log_path, optional tail_lines)
flash_backup - Initiate flash backup via rclone (requires remote_name, source_path, destination_path, confirm=True)
"""
if action not in ALL_ACTIONS:
raise ToolError(f"Invalid action '{action}'. Must be one of: {sorted(ALL_ACTIONS)}")
if action in DESTRUCTIVE_ACTIONS and not confirm:
raise ToolError(f"Action '{action}' is destructive. Set confirm=True to proceed.")
if action == "disk_details" and not disk_id:
raise ToolError("disk_id is required for 'disk_details' action")
@@ -121,6 +140,29 @@ def register_storage_tool(mcp: FastMCP) -> None:
)
log_path = normalized
if action == "flash_backup":
if not remote_name:
raise ToolError("remote_name is required for 'flash_backup' action")
if not source_path:
raise ToolError("source_path is required for 'flash_backup' action")
if not destination_path:
raise ToolError("destination_path is required for 'flash_backup' action")
input_data: dict[str, Any] = {
"remoteName": remote_name,
"sourcePath": source_path,
"destinationPath": destination_path,
}
if backup_options is not None:
input_data["options"] = backup_options
with tool_error_handler("storage", action, logger):
logger.info("Executing unraid_storage action=flash_backup")
data = await make_graphql_request(MUTATIONS["flash_backup"], {"input": input_data})
return {
"success": True,
"action": "flash_backup",
"data": data.get("initiateFlashBackup"),
}
query = QUERIES[action]
variables: dict[str, Any] | None = None
custom_timeout = DISK_TIMEOUT if action in ("disks", "disk_details") else None