Files
unraid-mcp/unraid_mcp/subscriptions/utils.py
Jacob Magar 2b777be927 fix(security): path traversal, timing-safe auth, stale credential bindings
Security:
- Remove /mnt/ from _ALLOWED_LOG_PREFIXES to prevent Unraid share exposure
- Add early .. detection for disk/logs and live/log_tail path validation
- Add /boot/ prefix restriction for flash_backup source_path
- Use hmac.compare_digest for timing-safe API key verification in server.py
- Gate include_traceback on DEBUG log level (no tracebacks in production)

Correctness:
- Re-raise CredentialsNotConfiguredError in health check instead of swallowing
- Fix ups_device query (remove non-existent nominalPower/currentPower fields)

Best practices (BP-01, BP-05, BP-06):
- Add `# noqa: ASYNC109` to timeout params in `_handle_live` and `unraid()`
- Fix start_array* → start_array in docstring (not in ARRAY_DESTRUCTIVE)
- Remove `from __future__ import annotations` from snapshot.py
- Replace import-time UNRAID_API_KEY/URL bindings with _settings.ATTR pattern
  in manager.py, snapshot.py, utils.py, diagnostics.py — fixes stale binding
  after apply_runtime_config() post-elicitation (BP-05)

CI/CD:
- Add .github/workflows/ci.yml (5-job pipeline: lint, typecheck, test, version-sync, audit)
- Add fail_under = 80 to [tool.coverage.report]
- Add version sync check to scripts/validate-marketplace.sh

Documentation:
- Sync plugin.json version 1.1.1 → 1.1.2 with pyproject.toml
- Update CLAUDE.md: 3 tools, system domain count 18, scripts comment fix
- Update README.md: 3 tools, security notes
- Update docs/AUTHENTICATION.md: H1 title fix
- Add UNRAID_CREDENTIALS_DIR to .env.example

Bump: 1.1.1 → 1.1.2

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-23 11:37:05 -04:00

99 lines
3.4 KiB
Python

"""Shared utilities for the subscription system."""
import ssl as _ssl
from typing import Any
from ..config import settings as _settings
def build_ws_url() -> str:
    """Build a WebSocket URL from the configured UNRAID_API_URL setting.

    Converts an http(s) scheme to ws(s), passes ws(s) URLs through, and makes
    sure the result ends with exactly one ``/graphql`` path suffix. Trailing
    slashes are removed before the suffix check, so ``https://host/graphql/``
    yields ``wss://host/graphql`` instead of ``wss://host/graphql/graphql``.

    Returns:
        The WebSocket URL string (e.g. "wss://10.1.0.2:31337/graphql").

    Raises:
        ValueError: If UNRAID_API_URL is not configured or has an unrecognised scheme.
    """
    # Look the value up on the settings module at call time and only once, so
    # every check below operates on the same string.
    api_url = _settings.UNRAID_API_URL
    if not api_url:
        raise ValueError("UNRAID_API_URL is not configured")

    if api_url.startswith("https://"):
        ws_url = "wss://" + api_url[len("https://") :]
    elif api_url.startswith("http://"):
        ws_url = "ws://" + api_url[len("http://") :]
    elif api_url.startswith(("ws://", "wss://")):
        ws_url = api_url  # Already a WebSocket URL
    else:
        # Only the first 20 characters of the configured value are echoed back.
        raise ValueError(
            f"UNRAID_API_URL must start with http://, https://, ws://, or wss://. "
            f"Got: {api_url[:20]}..."
        )

    # Normalise trailing slashes first: checking the suffix on the raw string
    # would miss ".../graphql/" and append a second "/graphql".
    ws_url = ws_url.rstrip("/")
    if not ws_url.endswith("/graphql"):
        ws_url += "/graphql"
    return ws_url
def build_ws_ssl_context(ws_url: str) -> _ssl.SSLContext | None:
"""Build an SSL context for WebSocket connections when using wss://.
Args:
ws_url: The WebSocket URL to connect to.
Returns:
An SSLContext configured per _settings.UNRAID_VERIFY_SSL, or None for non-TLS URLs.
"""
if not ws_url.startswith("wss://"):
return None
if isinstance(_settings.UNRAID_VERIFY_SSL, str):
return _ssl.create_default_context(cafile=_settings.UNRAID_VERIFY_SSL)
if _settings.UNRAID_VERIFY_SSL:
return _ssl.create_default_context()
# Explicitly disable verification (equivalent to verify=False)
ctx = _ssl.SSLContext(_ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = _ssl.CERT_NONE
return ctx
def _analyze_subscription_status(
    status: dict[str, Any],
) -> tuple[int, list[dict[str, Any]]]:
    """Count failing subscriptions and collect the errors they report.

    A subscription counts as failing when the ``connection_state`` in its
    ``runtime`` entry is one of the known failure states (a missing state is
    read as "unknown", which is not a failure). A ``last_error`` is reported
    only for a subscription that is failing right now; one that has recovered
    but still carries an old ``last_error`` is left out.

    Args:
        status: Dict of subscription name -> status info from get_subscription_status().

    Returns:
        Tuple of (error_count, connection_issues_list).
    """
    failure_states = frozenset(
        {"error", "auth_failed", "timeout", "max_retries_exceeded", "invalid_uri"}
    )

    # Pass 1: keep only the subscriptions whose current state is a failure.
    failing: list[tuple[str, Any, Any]] = []
    for name, info in status.items():
        runtime = info.get("runtime", {})
        state = runtime.get("connection_state", "unknown")
        if state in failure_states:
            failing.append((name, state, runtime.get("last_error")))

    # Pass 2: of those, report the ones that carry a non-empty last_error.
    issues: list[dict[str, Any]] = [
        {"subscription": name, "state": state, "error": last_error}
        for name, state, last_error in failing
        if last_error
    ]
    return len(failing), issues