feat: editable guardrails, refs NOISSUE

This commit is contained in:
2026-04-11 11:12:50 +02:00
parent 798bb218f8
commit e35db0a361
9 changed files with 1559 additions and 103 deletions

View File

@@ -13,6 +13,7 @@ The NiceGUI frontend provides:
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
import json
import re
@@ -29,6 +30,7 @@ try:
from . import database as database_module
from .agents.change_summary import ChangeSummaryGenerator
from .agents.database_manager import DatabaseManager
from .agents.home_assistant import HomeAssistantAgent
from .agents.request_interpreter import RequestInterpreter
from .agents.llm_service import LLMServiceClient
from .agents.orchestrator import AgentOrchestrator
@@ -41,6 +43,7 @@ except ImportError:
import database as database_module
from agents.change_summary import ChangeSummaryGenerator
from agents.database_manager import DatabaseManager
from agents.home_assistant import HomeAssistantAgent
from agents.request_interpreter import RequestInterpreter
from agents.llm_service import LLMServiceClient
from agents.orchestrator import AgentOrchestrator
@@ -59,7 +62,18 @@ async def lifespan(_app: FastAPI):
print(
f"Runtime configuration: database_backend={runtime['backend']} target={runtime['target']}"
)
yield
queue_worker = None
if database_module.settings.prompt_queue_enabled and database_module.settings.prompt_queue_auto_process:
queue_worker = asyncio.create_task(_prompt_queue_worker())
try:
yield
finally:
if queue_worker is not None:
queue_worker.cancel()
try:
await queue_worker
except asyncio.CancelledError:
pass
app = FastAPI(lifespan=lifespan)
@@ -94,6 +108,20 @@ class FreeformSoftwareRequest(BaseModel):
source: str = 'telegram'
chat_id: str | None = None
chat_type: str | None = None
process_now: bool = False
class PromptQueueProcessRequest(BaseModel):
    """Request body for manual queue processing (POST /queue/process)."""
    # When True, ask the gate check to bypass the Home Assistant energy gate.
    force: bool = False
    # Maximum number of queued prompts to process in this call (1..25).
    limit: int = Field(default=1, ge=1, le=25)
class LLMPromptSettingUpdateRequest(BaseModel):
    """Request body for persisting one editable LLM prompt override (PUT /llm/prompts/{prompt_key})."""
    # New override text for the prompt; empty string is accepted as a value.
    value: str = Field(default='')
class GiteaRepositoryOnboardRequest(BaseModel):
@@ -397,6 +425,275 @@ def _create_gitea_api():
)
def _create_home_assistant_agent() -> HomeAssistantAgent:
    """Build a Home Assistant client from the current runtime settings."""
    settings = database_module.settings
    return HomeAssistantAgent(
        base_url=settings.home_assistant_url,
        token=settings.home_assistant_token,
    )
def _get_gitea_health() -> dict:
    """Return current Gitea connectivity diagnostics.

    The payload always carries status/message/base_url/configured/checks so
    callers can render it uniformly regardless of outcome.
    """
    settings = database_module.settings

    def _unconfigured(message: str, base_url: str) -> dict:
        # Missing configuration: no check was attempted, so `checks` is empty.
        return {
            'status': 'error',
            'message': message,
            'base_url': base_url,
            'configured': False,
            'checks': [],
        }

    if not settings.gitea_url:
        return _unconfigured('Gitea URL is not configured.', '')
    if not settings.gitea_token:
        return _unconfigured('Gitea token is not configured.', settings.gitea_url)

    user_url = f"{settings.gitea_url}/api/v1/user"
    response = _create_gitea_api().get_current_user_sync()
    error = response.get('error')
    if error:
        return {
            'status': 'error',
            'message': error,
            'base_url': settings.gitea_url,
            'configured': True,
            'checks': [
                {
                    'name': 'token_auth',
                    'ok': False,
                    'message': error,
                    'url': user_url,
                    'status_code': response.get('status_code'),
                }
            ],
        }

    # Gitea variants expose the account name under different keys.
    username = (
        response.get('login')
        or response.get('username')
        or response.get('full_name')
        or 'unknown'
    )
    return {
        'status': 'success',
        'message': f'Authenticated as {username}.',
        'base_url': settings.gitea_url,
        'configured': True,
        'checks': [
            {
                'name': 'token_auth',
                'ok': True,
                'message': f'Authenticated as {username}',
                'url': user_url,
            }
        ],
        'user': username,
    }
def _get_home_assistant_health() -> dict:
    """Return current Home Assistant connectivity diagnostics."""
    agent = _create_home_assistant_agent()
    return agent.health_check_sync()
async def _get_queue_gate_status(force: bool = False) -> dict:
    """Return whether queued prompts may be processed now.

    Decision order: a disabled queue always allows processing; a configured
    Home Assistant delegates the decision; otherwise only an explicit
    override opens the gate.
    """
    settings = database_module.settings

    if not settings.prompt_queue_enabled:
        return {
            'status': 'disabled',
            'allowed': True,
            'forced': False,
            'reason': 'Prompt queue is disabled',
        }

    if settings.home_assistant_url:
        # Energy-aware gating is delegated to the Home Assistant agent.
        return await _create_home_assistant_agent().queue_gate_status(force=force)

    if force or settings.prompt_queue_force_process:
        return {
            'status': 'success',
            'allowed': True,
            'forced': True,
            'reason': 'Queue override is enabled',
        }

    return {
        'status': 'blocked',
        'allowed': False,
        'forced': False,
        'reason': 'Home Assistant URL is not configured',
    }
async def _interpret_freeform_request(request: FreeformSoftwareRequest, manager: DatabaseManager) -> tuple[SoftwareRequest, dict, dict]:
    """Interpret a free-form request and return the structured request plus routing trace."""
    context = manager.get_interpreter_context(chat_id=request.chat_id, source=request.source)
    interpreted, trace = await RequestInterpreter().interpret_with_trace(
        request.prompt_text,
        context=context,
    )
    routing = trace.get('routing') or {}

    matched_project = None
    if routing.get('project_id'):
        matched_project = manager.get_project_by_id(routing.get('project_id'), include_archived=False)

    if matched_project is not None and routing.get('intent') != 'new_project':
        # Prefer the stored identity of the matched project over the
        # interpreter's guess when routing to an existing project.
        interpreted['name'] = matched_project.project_name
        interpreted['description'] = matched_project.description or interpreted['description']

    return SoftwareRequest(**interpreted), routing, trace
def _log_stage_trace(manager: DatabaseManager, project_data: dict, prompt_id: int | None, trace: dict) -> None:
    """Persist one LLM stage trace against the generated project/history rows.

    `trace` must carry the stage/provider/model/prompt/response fields
    produced by RequestInterpreter.interpret_with_trace.
    """
    manager.log_llm_trace(
        project_id=project_data.get('project_id'),
        history_id=project_data.get('history_id'),
        prompt_id=prompt_id,
        stage=trace['stage'],
        provider=trace['provider'],
        model=trace['model'],
        system_prompt=trace['system_prompt'],
        user_prompt=trace['user_prompt'],
        assistant_response=trace['assistant_response'],
        raw_response=trace.get('raw_response'),
        fallback_used=trace.get('fallback_used', False),
    )


async def _run_freeform_generation(
    request: FreeformSoftwareRequest,
    db: Session,
    *,
    queue_item_id: int | None = None,
) -> dict:
    """Shared free-form request flow used by direct calls and queued processing.

    Interprets the prompt, runs the generation pipeline, records the LLM
    interpretation (and optional project-naming) traces, and — when
    `queue_item_id` is given — marks the queue item completed on success or
    failed before re-raising on error.

    Returns:
        The `_run_generation` response enriched with `interpreted_request`,
        `routing`, `llm_trace`, and `source` keys.
    """
    manager = DatabaseManager(db)
    try:
        structured_request, routing, interpretation_trace = await _interpret_freeform_request(request, manager)
        response = await _run_generation(
            structured_request,
            db,
            prompt_text=request.prompt_text,
            prompt_actor=request.source,
            prompt_source_context={
                'chat_id': request.chat_id,
                'chat_type': request.chat_type,
                'queue_item_id': queue_item_id,
            },
            prompt_routing=routing,
            preferred_project_id=routing.get('project_id') if routing.get('intent') != 'new_project' else None,
            repo_name_override=routing.get('repo_name') if routing.get('intent') == 'new_project' else None,
            related_issue={'number': routing.get('issue_number')} if routing.get('issue_number') is not None else None,
        )
        project_data = response.get('data', {})
        if project_data.get('history_id') is not None:
            # NOTE(review): the manager is rebuilt here in the original flow —
            # presumably to see rows written during _run_generation; kept as-is.
            manager = DatabaseManager(db)
            prompts = manager.get_prompt_events(project_id=project_data.get('project_id'))
            prompt_id = prompts[0]['id'] if prompts else None
            _log_stage_trace(manager, project_data, prompt_id, interpretation_trace)
            naming_trace = interpretation_trace.get('project_naming')
            if naming_trace:
                _log_stage_trace(manager, project_data, prompt_id, naming_trace)
        response['interpreted_request'] = structured_request.model_dump()
        response['routing'] = routing
        response['llm_trace'] = interpretation_trace
        response['source'] = {
            'type': request.source,
            'chat_id': request.chat_id,
            'chat_type': request.chat_type,
        }
        if queue_item_id is not None:
            DatabaseManager(db).complete_queued_prompt(
                queue_item_id,
                {
                    'project_id': project_data.get('project_id'),
                    'history_id': project_data.get('history_id'),
                    'status': response.get('status'),
                },
            )
        return response
    except Exception as exc:
        if queue_item_id is not None:
            # Mark the queue row failed before propagating to the caller.
            DatabaseManager(db).fail_queued_prompt(queue_item_id, str(exc))
        raise
async def _process_prompt_queue_batch(limit: int = 1, force: bool = False) -> dict:
    """Process up to `limit` queued prompts if the energy gate allows it.

    Args:
        limit: Maximum number of prompts to claim and run (clamped to >= 1).
        force: Passed through to the gate check to request an override.

    Returns:
        dict with the gate decision, per-item results, and a fresh queue summary.
    """
    queue_gate = await _get_queue_gate_status(force=force)
    if not queue_gate.get('allowed'):
        # Gate closed: report current queue state without processing anything.
        db = database_module.get_db_sync()
        try:
            summary = DatabaseManager(db).get_prompt_queue_summary()
        finally:
            db.close()
        return {
            'status': queue_gate.get('status', 'blocked'),
            'processed_count': 0,
            'queue_gate': queue_gate,
            'queue_summary': summary,
            'processed': [],
        }
    processed = []
    for _ in range(max(limit, 1)):
        # Claim in a short-lived session so the row is marked as taken before
        # the (potentially long) generation work starts.
        claim_db = database_module.get_db_sync()
        try:
            claimed = DatabaseManager(claim_db).claim_next_queued_prompt()
        finally:
            claim_db.close()
        if claimed is None:
            break  # queue drained
        # Each claimed item gets its own session for the generation run.
        work_db = database_module.get_db_sync()
        try:
            request = FreeformSoftwareRequest(
                prompt_text=claimed['prompt_text'],
                source=claimed['source'] or 'telegram',
                chat_id=claimed.get('chat_id'),
                chat_type=claimed.get('chat_type'),
                process_now=True,
            )
            response = await _run_freeform_generation(request, work_db, queue_item_id=claimed['id'])
            processed.append(
                {
                    'queue_item_id': claimed['id'],
                    'project_id': (response.get('data') or {}).get('project_id'),
                    'status': response.get('status'),
                }
            )
        except Exception as exc:
            # NOTE(review): _run_freeform_generation already fails the queue item
            # before re-raising; this second call looks like a defensive repeat.
            DatabaseManager(work_db).fail_queued_prompt(claimed['id'], str(exc))
            processed.append({'queue_item_id': claimed['id'], 'status': 'failed', 'error': str(exc)})
        finally:
            work_db.close()
    # Fresh session for the final summary so it reflects all work above.
    summary_db = database_module.get_db_sync()
    try:
        summary = DatabaseManager(summary_db).get_prompt_queue_summary()
    finally:
        summary_db.close()
    return {
        'status': 'success',
        'processed_count': len(processed),
        'processed': processed,
        'queue_gate': queue_gate,
        'queue_summary': summary,
    }
async def _prompt_queue_worker() -> None:
    """Background worker that drains the prompt queue when the energy gate opens."""
    while True:
        # Re-read settings every cycle so runtime configuration changes apply.
        settings = database_module.settings
        try:
            await _process_prompt_queue_batch(
                limit=settings.prompt_queue_max_batch_size,
                force=settings.prompt_queue_force_process,
            )
        except Exception as exc:
            # A batch failure must never kill the worker loop; record it instead.
            log_db = database_module.get_db_sync()
            try:
                DatabaseManager(log_db).log_system_event('prompt-queue', 'ERROR', f'Queue worker error: {exc}')
            finally:
                log_db.close()
        await asyncio.sleep(settings.prompt_queue_poll_interval_seconds)
def _resolve_n8n_api_url(explicit_url: str | None = None) -> str:
"""Resolve the effective n8n API URL from explicit input or settings."""
if explicit_url and explicit_url.strip():
@@ -420,8 +717,12 @@ def read_api_info():
'/api',
'/health',
'/llm/runtime',
'/llm/prompts',
'/llm/prompts/{prompt_key}',
'/generate',
'/generate/text',
'/queue',
'/queue/process',
'/projects',
'/status/{project_id}',
'/audit/projects',
@@ -442,7 +743,9 @@ def read_api_info():
'/projects/{project_id}/prompts/{prompt_id}/undo',
'/projects/{project_id}/sync-repository',
'/gitea/repos',
'/gitea/health',
'/gitea/repos/onboard',
'/home-assistant/health',
'/n8n/health',
'/n8n/setup',
],
@@ -453,11 +756,30 @@ def read_api_info():
def health_check():
    """Health check endpoint."""
    runtime = database_module.get_database_runtime_summary()
    settings = database_module.settings

    # Best effort: a broken queue table must not make the health probe fail.
    queue_summary = {'queued': 0, 'processing': 0, 'completed': 0, 'failed': 0, 'total': 0, 'next_item': None}
    db = database_module.get_db_sync()
    try:
        queue_summary = DatabaseManager(db).get_prompt_queue_summary()
    except Exception:
        pass
    finally:
        db.close()

    return {
        'status': 'healthy',
        'database': runtime['backend'],
        'database_target': runtime['target'],
        'database_name': runtime['database'],
        'integrations': {
            'gitea': _get_gitea_health(),
            'home_assistant': _get_home_assistant_health(),
        },
        'prompt_queue': {
            'enabled': settings.prompt_queue_enabled,
            'auto_process': settings.prompt_queue_auto_process,
            'force_process': settings.prompt_queue_force_process,
            'summary': queue_summary,
        },
    }
@@ -467,6 +789,30 @@ def get_llm_runtime():
return LLMServiceClient().get_runtime_configuration()
@app.get('/llm/prompts')
def get_llm_prompt_settings(db: DbSession):
    """Return editable LLM prompt settings with DB overrides merged over environment defaults."""
    prompts = DatabaseManager(db).get_llm_prompt_settings()
    return {'prompts': prompts}
@app.put('/llm/prompts/{prompt_key}')
def update_llm_prompt_setting(prompt_key: str, request: LLMPromptSettingUpdateRequest, db: DbSession):
    """Persist one editable LLM prompt override into the database."""
    result = DatabaseManager(db).save_llm_prompt_setting(prompt_key, request.value, actor='api')
    if result.get('status') != 'error':
        return result
    # Surface manager-level validation failures as a client error.
    raise HTTPException(status_code=400, detail=result.get('message', 'Prompt save failed'))
@app.delete('/llm/prompts/{prompt_key}')
def reset_llm_prompt_setting(prompt_key: str, db: DbSession):
    """Reset one editable LLM prompt override back to the environment/default value."""
    result = DatabaseManager(db).reset_llm_prompt_setting(prompt_key, actor='api')
    if result.get('status') != 'error':
        return result
    # Surface manager-level validation failures as a client error.
    raise HTTPException(status_code=400, detail=result.get('message', 'Prompt reset failed'))
@app.post('/generate')
async def generate_software(request: SoftwareRequest, db: DbSession):
"""Create and record a software-generation request."""
@@ -492,74 +838,64 @@ async def generate_software_from_text(request: FreeformSoftwareRequest, db: DbSe
},
}
manager = DatabaseManager(db)
interpreter_context = manager.get_interpreter_context(chat_id=request.chat_id, source=request.source)
interpreted, interpretation_trace = await RequestInterpreter().interpret_with_trace(
request.prompt_text,
context=interpreter_context,
)
routing = interpretation_trace.get('routing') or {}
selected_history = manager.get_project_by_id(routing.get('project_id'), include_archived=False) if routing.get('project_id') else None
if selected_history is not None and routing.get('intent') != 'new_project':
interpreted['name'] = selected_history.project_name
interpreted['description'] = selected_history.description or interpreted['description']
structured_request = SoftwareRequest(**interpreted)
response = await _run_generation(
structured_request,
db,
prompt_text=request.prompt_text,
prompt_actor=request.source,
prompt_source_context={
'chat_id': request.chat_id,
'chat_type': request.chat_type,
},
prompt_routing=routing,
preferred_project_id=routing.get('project_id') if routing.get('intent') != 'new_project' else None,
repo_name_override=routing.get('repo_name') if routing.get('intent') == 'new_project' else None,
related_issue={'number': routing.get('issue_number')} if routing.get('issue_number') is not None else None,
)
project_data = response.get('data', {})
if project_data.get('history_id') is not None:
if request.source == 'telegram' and database_module.settings.prompt_queue_enabled and not request.process_now:
manager = DatabaseManager(db)
prompts = manager.get_prompt_events(project_id=project_data.get('project_id'))
prompt_id = prompts[0]['id'] if prompts else None
manager.log_llm_trace(
project_id=project_data.get('project_id'),
history_id=project_data.get('history_id'),
prompt_id=prompt_id,
stage=interpretation_trace['stage'],
provider=interpretation_trace['provider'],
model=interpretation_trace['model'],
system_prompt=interpretation_trace['system_prompt'],
user_prompt=interpretation_trace['user_prompt'],
assistant_response=interpretation_trace['assistant_response'],
raw_response=interpretation_trace.get('raw_response'),
fallback_used=interpretation_trace.get('fallback_used', False),
queue_item = manager.enqueue_prompt(
prompt_text=request.prompt_text,
source=request.source,
chat_id=request.chat_id,
chat_type=request.chat_type,
source_context={'chat_id': request.chat_id, 'chat_type': request.chat_type},
)
naming_trace = interpretation_trace.get('project_naming')
if naming_trace:
manager.log_llm_trace(
project_id=project_data.get('project_id'),
history_id=project_data.get('history_id'),
prompt_id=prompt_id,
stage=naming_trace['stage'],
provider=naming_trace['provider'],
model=naming_trace['model'],
system_prompt=naming_trace['system_prompt'],
user_prompt=naming_trace['user_prompt'],
assistant_response=naming_trace['assistant_response'],
raw_response=naming_trace.get('raw_response'),
fallback_used=naming_trace.get('fallback_used', False),
)
response['interpreted_request'] = interpreted
response['routing'] = routing
response['llm_trace'] = interpretation_trace
response['source'] = {
'type': request.source,
'chat_id': request.chat_id,
'chat_type': request.chat_type,
return {
'status': 'queued',
'message': 'Prompt queued for energy-aware processing.',
'queue_item': queue_item,
'queue_summary': manager.get_prompt_queue_summary(),
'queue_gate': await _get_queue_gate_status(force=False),
'source': {
'type': request.source,
'chat_id': request.chat_id,
'chat_type': request.chat_type,
},
}
return await _run_freeform_generation(request, db)
@app.get('/queue')
def get_prompt_queue(db: DbSession):
    """Return queued prompt items and prompt queue configuration."""
    manager = DatabaseManager(db)
    settings = database_module.settings
    config = {
        'enabled': settings.prompt_queue_enabled,
        'auto_process': settings.prompt_queue_auto_process,
        'force_process': settings.prompt_queue_force_process,
        'poll_interval_seconds': settings.prompt_queue_poll_interval_seconds,
        'max_batch_size': settings.prompt_queue_max_batch_size,
    }
    return {
        'queue': manager.get_prompt_queue(),
        'summary': manager.get_prompt_queue_summary(),
        'config': config,
    }
return response
@app.post('/queue/process')
async def process_prompt_queue(request: PromptQueueProcessRequest):
    """Manually process queued prompts, optionally bypassing the HA gate."""
    outcome = await _process_prompt_queue_batch(limit=request.limit, force=request.force)
    return outcome
@app.get('/gitea/health')
def get_gitea_health():
    """Return Gitea integration connectivity diagnostics."""
    # Thin endpoint wrapper around the shared diagnostics helper.
    health = _get_gitea_health()
    return health
@app.get('/home-assistant/health')
def get_home_assistant_health():
    """Return Home Assistant integration connectivity diagnostics."""
    # Thin endpoint wrapper around the shared diagnostics helper.
    health = _get_home_assistant_health()
    return health
@app.get('/projects')
@@ -743,13 +1079,18 @@ def delete_project(project_id: str, db: DbSession):
remote_delete = None
if repository and repository.get('mode') != 'shared' and repository.get('owner') and repository.get('name') and database_module.settings.gitea_url and database_module.settings.gitea_token:
remote_delete = _create_gitea_api().delete_repo_sync(owner=repository.get('owner'), repo=repository.get('name'))
if remote_delete.get('error') and remote_delete.get('status_code') not in {404, None}:
raise HTTPException(status_code=502, detail=remote_delete.get('error'))
if remote_delete.get('error'):
manager.log_system_event(
component='gitea',
level='WARNING',
message=f"Remote repository delete failed for {repository.get('owner')}/{repository.get('name')}: {remote_delete.get('error')}",
)
result = manager.delete_project(project_id)
if result.get('status') == 'error':
raise HTTPException(status_code=400, detail=result.get('message', 'Project deletion failed'))
result['remote_repository_deleted'] = bool(remote_delete and not remote_delete.get('error'))
result['remote_repository_delete_error'] = remote_delete.get('error') if remote_delete else None
result['remote_repository'] = repository if repository else None
return result