Files
unraid-mcp/unraid_mcp/core/setup.py
Jacob Magar 252ec520d1 fix(lint): remove __future__ annotations from new tools, fix 4 failing tests
- Remove `from __future__ import annotations` from array.py, live.py,
  oidc.py, plugins.py to match existing tool pattern and resolve TC002
  ruff errors (fastmcp imports only needed in annotations under PEP 563)
- Add `# noqa: ASYNC109` to live.py timeout parameter (asyncio.timeout
  already used internally)
- Fix test_network_sends_correct_query: query name is GetNetworkInfo
- Fix test_delete_requires_confirm: match "not confirmed" not "destructive"
- Fix test_destructive_set_matches_audit[settings]: add setup_remote_access
  and enable_dynamic_remote_access to KNOWN_DESTRUCTIVE
- Fix test_logs: update mock to dict format {lines: [{timestamp, message}]}

742 tests passing, ruff clean
2026-03-15 19:57:46 -04:00

173 lines
5.8 KiB
Python

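"""Interactive credential setup and confirmation prompts via MCP elicitation.

When UNRAID_API_URL or UNRAID_API_KEY are absent, tools call
`elicit_and_configure(ctx)` to collect them from the user and persist
them to ~/.unraid-mcp/.env with restricted permissions.

`elicit_destructive_confirmation(ctx, action, description)` uses the same
elicitation mechanism to ask the user to confirm a destructive action.
"""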
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fastmcp import Context
from ..config.logging import logger
from ..config.settings import (
CREDENTIALS_DIR,
CREDENTIALS_ENV_PATH,
PROJECT_ROOT,
apply_runtime_config,
)
@dataclass
class _UnraidCredentials:
    """Unraid connection details collected from the user.

    Used as the ``response_type`` of the credential elicitation prompt.
    """

    # Unraid API endpoint URL as entered by the user.
    api_url: str
    # Secret API key; never shown by repr() (see __repr__ below).
    api_key: str

    def __repr__(self) -> str:
        """Return a repr that masks the API key."""
        # The repr that @dataclass would generate prints api_key verbatim,
        # which leaks the secret into logs and tracebacks. @dataclass leaves
        # an explicitly defined __repr__ in place, so this one wins.
        return f"{type(self).__name__}(api_url={self.api_url!r}, api_key='***')"
async def elicit_destructive_confirmation(
    ctx: Context | None, action: str, description: str
) -> bool:
    """Ask the user, via MCP elicitation, to approve a destructive action.

    Args:
        ctx: MCP context used to send the prompt. When None, nothing is
            asked and the function returns False.
        action: Name of the action, shown in the prompt and in log messages.
        description: Human-readable explanation of what the action will do.

    Returns:
        True only when the user accepted the prompt and answered
        affirmatively. False when there is no context, the client raises
        NotImplementedError for elicitation, the prompt was not accepted,
        or the answer was negative.
    """
    if ctx is None:
        logger.warning(
            "Cannot elicit confirmation for '%s': no MCP context available. "
            "Re-run with confirm=True to bypass elicitation.",
            action,
        )
        return False

    prompt = (
        f"**Confirm destructive action: `{action}`**\n\n"
        f"{description}\n\n"
        "Are you sure you want to proceed?"
    )
    try:
        response = await ctx.elicit(message=prompt, response_type=bool)
    except NotImplementedError:
        logger.warning(
            "MCP client does not support elicitation for action '%s'. "
            "Re-run with confirm=True to bypass.",
            action,
        )
        return False

    if response.action == "accept":
        # An accepted prompt still carries the user's yes/no answer.
        answer: bool = response.data  # type: ignore[union-attr]
        if not answer:
            logger.info("Destructive action '%s' not confirmed by user.", action)
        return answer

    # Any other action value means the prompt was not accepted.
    logger.info("Destructive action '%s' declined by user (%s).", action, response.action)
    return False
async def elicit_and_configure(ctx: Context | None) -> bool:
    """Prompt the user for Unraid credentials via MCP elicitation.

    Accepted credentials are normalized (surrounding whitespace removed,
    trailing slashes removed from the URL), written to CREDENTIALS_ENV_PATH
    and applied to the running process via apply_runtime_config().

    Args:
        ctx: The MCP context for elicitation. If None, returns False
            immediately (no context available to prompt the user).

    Returns:
        True if credentials were accepted, non-empty, persisted and applied.
        False if the prompt was declined/cancelled, if the MCP client does
        not support elicitation, or if either submitted value is blank.
    """
    if ctx is None:
        logger.warning(
            "Cannot elicit credentials: no MCP context available. "
            "Run unraid_health action=setup to configure credentials."
        )
        return False

    try:
        result = await ctx.elicit(
            message=(
                "Unraid MCP needs your Unraid server credentials to connect.\n\n"
                "• **API URL**: Your Unraid GraphQL endpoint "
                "(e.g. `https://10-1-0-2.xxx.myunraid.net:31337`)\n"
                "• **API Key**: Found in Unraid → Settings → Management Access → API Keys"
            ),
            response_type=_UnraidCredentials,
        )
    except NotImplementedError:
        logger.warning(
            "MCP client does not support elicitation. "
            "Use unraid_health action=setup or create %s manually.",
            CREDENTIALS_ENV_PATH,
        )
        return False

    if result.action != "accept":
        logger.warning("Credential elicitation %s — server remains unconfigured.", result.action)
        return False

    # Strip whitespace *before* removing trailing slashes: pasted values such
    # as "https://host/ " would otherwise keep both the space and the slash.
    api_url: str = result.data.api_url.strip().rstrip("/")  # type: ignore[union-attr]
    api_key: str = result.data.api_key.strip()  # type: ignore[union-attr]

    # Never persist blank credentials: they would overwrite any existing
    # values and make the server look configured while being unusable.
    if not api_url or not api_key:
        logger.warning(
            "Credential elicitation returned an empty API URL or API key — "
            "server remains unconfigured."
        )
        return False

    _write_env(api_url, api_key)
    apply_runtime_config(api_url, api_key)
    logger.info("Credentials configured via elicitation and persisted to %s.", CREDENTIALS_ENV_PATH)
    return True
def _write_env(api_url: str, api_key: str) -> None:
    """Write or update credentials in CREDENTIALS_ENV_PATH.

    Creates CREDENTIALS_DIR (mode 700) if needed. When the credentials file
    does not exist yet, its content is seeded from PROJECT_ROOT/.env.example
    (if present) to preserve comments and structure. Existing
    ``UNRAID_API_URL=`` / ``UNRAID_API_KEY=`` lines are replaced in place;
    missing ones are appended. The file is restricted to mode 600 before any
    secret is written to it.

    Args:
        api_url: Value stored as UNRAID_API_URL.
        api_key: Value stored as UNRAID_API_KEY.
    """
    # mode= covers a freshly created directory; the explicit chmod also
    # tightens a pre-existing directory and is not subject to the umask.
    CREDENTIALS_DIR.mkdir(mode=0o700, parents=True, exist_ok=True)
    CREDENTIALS_DIR.chmod(0o700)

    # Explicit UTF-8 so the result does not depend on the process locale.
    if CREDENTIALS_ENV_PATH.exists():
        template_lines = CREDENTIALS_ENV_PATH.read_text(encoding="utf-8").splitlines()
    else:
        example_path = PROJECT_ROOT / ".env.example"
        template_lines = (
            example_path.read_text(encoding="utf-8").splitlines() if example_path.exists() else []
        )

    # Insertion order matters: it is the order in which missing keys are appended.
    credentials = {
        "UNRAID_API_URL": api_url,
        "UNRAID_API_KEY": api_key,
    }
    written: set[str] = set()
    new_lines: list[str] = []

    # Replace credentials in place; every other line is kept untouched.
    for line in template_lines:
        stripped = line.strip()
        for name, value in credentials.items():
            if stripped.startswith(f"{name}="):
                new_lines.append(f"{name}={value}")
                written.add(name)
                break
        else:
            new_lines.append(line)

    # If not found in the template (empty or missing keys), append at the end.
    new_lines.extend(
        f"{name}={value}" for name, value in credentials.items() if name not in written
    )

    # Create/tighten the file BEFORE writing so the secret never sits in a
    # file with umask-default (possibly group/world-readable) permissions.
    CREDENTIALS_ENV_PATH.touch(mode=0o600, exist_ok=True)
    CREDENTIALS_ENV_PATH.chmod(0o600)
    CREDENTIALS_ENV_PATH.write_text("\n".join(new_lines) + "\n", encoding="utf-8")
    logger.info("Credentials written to %s (mode 600)", CREDENTIALS_ENV_PATH)