fix: split subscription_lock, fix safe_get None semantics, validate notification enums

P-01: Replace single subscription_lock with two fine-grained locks:
- _task_lock guards active_subscriptions (task lifecycle operations)
- _data_lock guards resource_data (WebSocket message writes and reads)
Eliminates serialization between WebSocket updates and tool reads.

CQ-05: safe_get now preserves explicit None at terminal key.
Uses sentinel _MISSING to distinguish "key absent" (returns default)
from "key=null" (returns None). Fixes conflation that masked
intentional null values from the Unraid API.

SEC-M04: Validate list_type, importance, and notification_type against
known enums before dispatching to GraphQL. Prevents wasting rate-limited
requests on invalid values and avoids leaking schema details in errors.
This commit is contained in:
Jacob Magar
2026-03-13 02:44:26 -04:00
parent bdb2155366
commit ac5639301c
5 changed files with 794 additions and 14 deletions

View File

@@ -80,7 +80,13 @@ class SubscriptionManager:
def __init__(self) -> None:
self.active_subscriptions: dict[str, asyncio.Task[None]] = {}
self.resource_data: dict[str, SubscriptionData] = {}
self.subscription_lock = asyncio.Lock()
# Two fine-grained locks instead of one coarse lock (P-01):
# _task_lock guards active_subscriptions dict (task lifecycle).
# _data_lock guards resource_data dict (WebSocket message writes + reads).
# Splitting prevents WebSocket message updates from blocking tool reads
# of active_subscriptions and vice versa.
self._task_lock = asyncio.Lock()
self._data_lock = asyncio.Lock()
# Configuration
self.auto_start_enabled = (
@@ -161,7 +167,7 @@ class SubscriptionManager:
self.connection_states[subscription_name] = "starting"
self._connection_start_times.pop(subscription_name, None)
async with self.subscription_lock:
async with self._task_lock:
try:
task = asyncio.create_task(
self._subscription_loop(subscription_name, query, variables or {})
@@ -183,7 +189,7 @@ class SubscriptionManager:
"""Stop a specific subscription."""
logger.info(f"[SUBSCRIPTION:{subscription_name}] Stopping subscription...")
async with self.subscription_lock:
async with self._task_lock:
if subscription_name in self.active_subscriptions:
task = self.active_subscriptions[subscription_name]
task.cancel()
@@ -392,7 +398,7 @@ class SubscriptionManager:
last_updated=datetime.now(UTC),
subscription_type=subscription_name,
)
async with self.subscription_lock:
async with self._data_lock:
self.resource_data[subscription_name] = new_entry
logger.debug(
f"[RESOURCE:{subscription_name}] Resource data updated successfully"
@@ -531,7 +537,7 @@ class SubscriptionManager:
# The while loop exited (via break or max_retries exceeded).
# Remove from active_subscriptions so start_subscription() can restart it.
async with self.subscription_lock:
async with self._task_lock:
self.active_subscriptions.pop(subscription_name, None)
logger.info(
f"[SUBSCRIPTION:{subscription_name}] Subscription loop ended — "
@@ -543,7 +549,7 @@ class SubscriptionManager:
"""Get current resource data with enhanced logging."""
logger.debug(f"[RESOURCE:{resource_name}] Resource data requested")
async with self.subscription_lock:
async with self._data_lock:
if resource_name in self.resource_data:
data = self.resource_data[resource_name]
age_seconds = (datetime.now(UTC) - data.last_updated).total_seconds()
@@ -562,7 +568,7 @@ class SubscriptionManager:
"""Get detailed status of all subscriptions for diagnostics."""
status = {}
async with self.subscription_lock:
async with self._task_lock, self._data_lock:
for sub_name, config in self.subscription_configs.items():
sub_status = {
"config": {