Files
unraid-mcp/unraid_mcp/subscriptions/resources.py
Jacob Magar f5978d67ec feat(resources): add unraid://live/{action} MCP resources for 9 snapshot subscriptions
Registers cpu, memory, cpu_telemetry, array_state, parity_progress,
ups_status, notifications_overview, owner, and server_status as MCP
resources under unraid://live/{action}. Each opens a transient WebSocket
via subscribe_once() and returns JSON; exceptions degrade gracefully to
an error JSON dict rather than raising. Skips log_tail and
notification_feed (require params, not suitable as resources).
2026-03-15 21:51:20 -04:00

126 lines
4.7 KiB
Python

"""MCP resources that expose subscription data.
This module defines MCP resources that bridge between the subscription manager
and the MCP protocol, providing fallback queries when subscription data is unavailable.
"""
import asyncio
import json
import os
from typing import Final
import anyio
from fastmcp import FastMCP
from ..config.logging import logger
from .manager import subscription_manager
from .queries import SNAPSHOT_ACTIONS
from .snapshot import subscribe_once
# Global flag to track subscription startup
_subscriptions_started = False
_startup_lock: Final[asyncio.Lock] = asyncio.Lock()
async def ensure_subscriptions_started() -> None:
"""Ensure subscriptions are started, called from async context."""
global _subscriptions_started
# Fast-path: skip lock if already started
if _subscriptions_started:
return
# Slow-path: acquire lock for initialization (double-checked locking)
async with _startup_lock:
if _subscriptions_started:
return
logger.info("[STARTUP] First async operation detected, starting subscriptions...")
try:
await autostart_subscriptions()
_subscriptions_started = True
logger.info("[STARTUP] Subscriptions started successfully")
except Exception as e:
logger.error(f"[STARTUP] Failed to start subscriptions: {e}", exc_info=True)
async def autostart_subscriptions() -> None:
"""Auto-start all subscriptions marked for auto-start in SubscriptionManager."""
logger.info("[AUTOSTART] Initiating subscription auto-start process...")
try:
# Use the SubscriptionManager auto-start method
await subscription_manager.auto_start_all_subscriptions()
logger.info("[AUTOSTART] Auto-start process completed successfully")
except Exception as e:
logger.error(f"[AUTOSTART] Failed during auto-start process: {e}", exc_info=True)
raise # Propagate so ensure_subscriptions_started doesn't mark as started
# Optional log file subscription
log_path = os.getenv("UNRAID_AUTOSTART_LOG_PATH")
if log_path is None:
# Default to syslog if available
default_path = "/var/log/syslog"
if await anyio.Path(default_path).exists():
log_path = default_path
logger.info(f"[AUTOSTART] Using default log path: {default_path}")
if log_path:
try:
logger.info(f"[AUTOSTART] Starting log file subscription for: {log_path}")
config = subscription_manager.subscription_configs.get("logFileSubscription")
if config:
await subscription_manager.start_subscription(
"logFileSubscription", str(config["query"]), {"path": log_path}
)
logger.info(f"[AUTOSTART] Log file subscription started for: {log_path}")
else:
logger.error("[AUTOSTART] logFileSubscription config not found")
except Exception as e:
logger.error(f"[AUTOSTART] Failed to start log file subscription: {e}", exc_info=True)
else:
logger.info("[AUTOSTART] No log file path configured for auto-start")
def register_subscription_resources(mcp: FastMCP) -> None:
"""Register all subscription resources with the FastMCP instance.
Args:
mcp: FastMCP instance to register resources with
"""
@mcp.resource("unraid://logs/stream")
async def logs_stream_resource() -> str:
"""Real-time log stream data from subscription."""
await ensure_subscriptions_started()
data = await subscription_manager.get_resource_data("logFileSubscription")
if data:
return json.dumps(data, indent=2)
return json.dumps(
{
"status": "No subscription data yet",
"message": "Subscriptions auto-start on server boot. If this persists, check server logs for WebSocket/auth issues.",
}
)
def _make_resource_fn(action: str, query: str):
async def _live_resource() -> str:
await ensure_subscriptions_started()
try:
data = await subscribe_once(query)
return json.dumps(data, indent=2)
except Exception as exc:
return json.dumps({"error": str(exc), "action": action})
_live_resource.__name__ = f"{action}_resource"
_live_resource.__doc__ = (
f"Real-time {action.replace('_', ' ')} data via WebSocket subscription."
)
return _live_resource
for _action, _query in SNAPSHOT_ACTIONS.items():
mcp.resource(f"unraid://live/{_action}")(_make_resource_fn(_action, _query))
logger.info("Subscription resources registered successfully")