# Source listing header (converted to comments so the module stays importable):
# Files: ai_software_factory/ai_software_factory/main.py
# 1248 lines, 50 KiB, Python
#!/usr/bin/env python3
"""AI Software Factory - Main application with FastAPI backend and NiceGUI frontend.
This application uses FastAPI to:
1. Provide HTTP API endpoints
2. Host NiceGUI frontend via ui.run_with()
The NiceGUI frontend provides:
1. Interactive dashboard at /
2. Real-time data visualization
3. Audit trail display
"""
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
import json
import re
from pathlib import Path
from typing import Annotated
from uuid import uuid4
from fastapi import Depends, FastAPI, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
try:
from . import __version__, frontend
from . import database as database_module
from .agents.change_summary import ChangeSummaryGenerator
from .agents.database_manager import DatabaseManager
from .agents.home_assistant import HomeAssistantAgent
from .agents.request_interpreter import RequestInterpreter
from .agents.llm_service import LLMServiceClient
from .agents.orchestrator import AgentOrchestrator
from .agents.n8n_setup import N8NSetupAgent
from .agents.prompt_workflow import PromptWorkflowManager
from .agents.ui_manager import UIManager
from .models import ProjectHistory, ProjectLog, SystemLog
except ImportError:
import frontend
import database as database_module
from agents.change_summary import ChangeSummaryGenerator
from agents.database_manager import DatabaseManager
from agents.home_assistant import HomeAssistantAgent
from agents.request_interpreter import RequestInterpreter
from agents.llm_service import LLMServiceClient
from agents.orchestrator import AgentOrchestrator
from agents.n8n_setup import N8NSetupAgent
from agents.prompt_workflow import PromptWorkflowManager
from agents.ui_manager import UIManager
from models import ProjectHistory, ProjectLog, SystemLog
__version__ = "0.0.1"
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Log resolved runtime configuration when the app starts."""
    # Announce the effective database backend/target once at startup.
    runtime = database_module.get_database_runtime_summary()
    print(
        f"Runtime configuration: database_backend={runtime['backend']} target={runtime['target']}"
    )
    queue_worker = None
    # Spawn the background queue drainer only when both the queue itself and
    # automatic processing are enabled in settings.
    if database_module.settings.prompt_queue_enabled and database_module.settings.prompt_queue_auto_process:
        queue_worker = asyncio.create_task(_prompt_queue_worker())
    try:
        yield  # the application serves requests while suspended here
    finally:
        # On shutdown: cancel the worker and await it so the task is fully
        # finalized; the resulting CancelledError is expected and swallowed.
        if queue_worker is not None:
            queue_worker.cancel()
            try:
                await queue_worker
            except asyncio.CancelledError:
                pass
app = FastAPI(lifespan=lifespan)
# Per-request SQLAlchemy session injected via FastAPI's dependency system.
DbSession = Annotated[Session, Depends(database_module.get_db)]
# Matches any run of characters that is NOT a lowercase letter or digit;
# used to slugify project names into kebab-case identifiers.
PROJECT_ID_PATTERN = re.compile(r"[^a-z0-9]+")
class SoftwareRequest(BaseModel):
    """Request body for software generation."""
    name: str = Field(min_length=1, max_length=255)
    description: str = Field(min_length=1, max_length=255)
    features: list[str] = Field(default_factory=list)  # optional feature bullet points
    tech_stack: list[str] = Field(default_factory=list)  # optional technology hints
class N8NSetupRequest(BaseModel):
    """Request body for n8n workflow provisioning."""
    api_url: str | None = None  # overrides the configured n8n API URL when set
    api_key: str | None = None  # overrides the configured n8n API key when set
    webhook_path: str = "telegram"
    backend_url: str | None = None  # where the workflow should call back into this app
    force_update: bool = False  # re-provision even if a workflow already exists
class FreeformSoftwareRequest(BaseModel):
    """Request body for free-form software generation."""
    prompt_text: str = Field(min_length=1)
    source: str = 'telegram'  # origin channel of the prompt
    chat_id: str | None = None  # originating chat, when the source is a chat app
    chat_type: str | None = None
    process_now: bool = False  # bypass the prompt queue and process immediately
class PromptQueueProcessRequest(BaseModel):
    """Request body for manual queue processing."""
    force: bool = False  # bypass the Home Assistant energy gate
    limit: int = Field(default=1, ge=1, le=25)  # max queued prompts per batch
class LLMPromptSettingUpdateRequest(BaseModel):
    """Request body for persisting one editable LLM prompt override."""
    value: str = Field(default='')  # empty string is allowed and stored as-is
class GiteaRepositoryOnboardRequest(BaseModel):
    """Request body for onboarding a manually created Gitea repository."""
    repo_name: str = Field(min_length=1, max_length=255)
    owner: str | None = None  # defaults to the configured Gitea owner when omitted
    sync_commits: bool = True  # import recent commit history on onboard
    commit_limit: int = Field(default=25, ge=1, le=200)
def _build_project_id(name: str) -> str:
    """Create a stable project id from the requested name.

    The id is the kebab-case slug of *name* (falling back to "project" for
    names with no usable characters) plus a short random hex suffix so that
    repeated requests with the same name never collide.
    """
    # Delegate to the shared slug helper instead of duplicating the regex
    # normalization that _build_project_slug already implements.
    return f"{_build_project_slug(name)}-{uuid4().hex[:8]}"
def _build_project_slug(name: str) -> str:
    """Normalize a project name into a kebab-case identifier slug."""
    lowered = name.strip().lower()
    # Collapse every non [a-z0-9] run into a single dash, then trim edges.
    slug = PROJECT_ID_PATTERN.sub("-", lowered).strip("-")
    return slug if slug else "project"
def _ensure_unique_identifier(base_slug: str, reserved_ids: set[str]) -> str:
    """Return a unique identifier using deterministic numeric suffixes when needed."""
    candidate = _build_project_slug(base_slug)
    if candidate in reserved_ids:
        # Already taken: probe candidate-2, candidate-3, ... until free.
        counter = 2
        while f"{candidate}-{counter}" in reserved_ids:
            counter += 1
        candidate = f"{candidate}-{counter}"
    return candidate
def _build_project_identity_context(manager: DatabaseManager) -> list[dict]:
"""Build a compact project catalog for naming stages."""
projects = []
for history in manager.get_all_projects(include_archived=True):
repository = manager._get_project_repository(history) or {}
projects.append(
{
'project_id': history.project_id,
'name': history.project_name,
'description': history.description,
'repository': {
'owner': repository.get('owner'),
'name': repository.get('name'),
},
}
)
return projects
async def _derive_project_id_for_request(
    request: SoftwareRequest,
    *,
    prompt_text: str,
    prompt_routing: dict | None,
    existing_projects: list[dict],
) -> tuple[str, dict | None]:
    """Derive a stable project id for a newly created project.

    Asks the LLM naming stage for a suggestion and falls back to a
    deterministic slug when the response is empty or unparsable. Returns
    the chosen id plus the LLM trace (None if no trace was produced).
    """
    # Every id already in use, so any suggestion can be de-duplicated.
    reserved_ids = {str(project.get('project_id')).strip() for project in existing_projects if project.get('project_id')}
    # Deterministic fallback: prefer the routing-provided name, then the request name.
    fallback_id = _ensure_unique_identifier((prompt_routing or {}).get('project_name') or request.name, reserved_ids)
    user_prompt = (
        f"Original user prompt:\n{prompt_text}\n\n"
        f"Structured request:\n{json.dumps({'name': request.name, 'description': request.description, 'features': request.features, 'tech_stack': request.tech_stack}, indent=2)}\n\n"
        f"Naming context:\n{json.dumps(prompt_routing or {}, indent=2)}\n\n"
        f"Reserved project ids:\n{json.dumps(sorted(reserved_ids))}\n\n"
        "Suggest the best stable project id for this new project."
    )
    content, trace = await LLMServiceClient().chat_with_trace(
        stage='project_id_naming',
        system_prompt=database_module.settings.llm_project_id_system_prompt,
        user_prompt=user_prompt,
        tool_context_input={'projects': existing_projects},
        expect_json=True,
    )
    if content:
        try:
            parsed = json.loads(content)
            candidate = parsed.get('project_id') or parsed.get('slug') or request.name
            # Even an LLM suggestion is re-normalized and de-duplicated.
            return _ensure_unique_identifier(str(candidate), reserved_ids), trace
        except Exception:
            # Malformed JSON or unexpected shape: fall through to the fallback id.
            pass
    return fallback_id, trace
def _serialize_project(history: ProjectHistory) -> dict:
"""Serialize a project history row for API responses."""
return {
"history_id": history.id,
"project_id": history.project_id,
"name": history.project_name,
"description": history.description,
"status": history.status,
"progress": history.progress,
"message": history.message,
"current_step": history.current_step,
"error_message": history.error_message,
"created_at": history.created_at.isoformat() if history.created_at else None,
"updated_at": history.updated_at.isoformat() if history.updated_at else None,
"completed_at": history.completed_at.isoformat() if history.completed_at else None,
}
def _serialize_project_log(log: ProjectLog) -> dict:
"""Serialize a project log row."""
return {
"id": log.id,
"history_id": log.history_id,
"level": log.log_level,
"message": log.log_message,
"timestamp": log.timestamp.isoformat() if log.timestamp else None,
}
def _serialize_system_log(log: SystemLog) -> dict:
"""Serialize a system log row."""
return {
"id": log.id,
"component": log.component,
"level": log.log_level,
"message": log.log_message,
"user_agent": log.user_agent,
"ip_address": log.ip_address,
"timestamp": log.created_at.isoformat() if log.created_at else None,
}
def _serialize_audit_item(item: dict) -> dict:
    """Return audit-shaped dictionaries unchanged for API output."""
    # Identity pass-through: callers feed it dicts produced by DatabaseManager
    # (presumably already JSON-safe — confirm). Kept for symmetry with the
    # other serializer helpers and as a single hook to reshape output later.
    return item
def _compose_prompt_text(request: SoftwareRequest) -> str:
"""Render the originating software request into a stable prompt string."""
features = ", ".join(request.features) if request.features else "None"
tech_stack = ", ".join(request.tech_stack) if request.tech_stack else "None"
return (
f"Name: {request.name}\n"
f"Description: {request.description}\n"
f"Features: {features}\n"
f"Tech Stack: {tech_stack}"
)
async def _run_generation(
    request: SoftwareRequest,
    db: Session,
    prompt_text: str | None = None,
    prompt_actor: str = 'api',
    prompt_source_context: dict | None = None,
    prompt_routing: dict | None = None,
    preferred_project_id: str | None = None,
    repo_name_override: str | None = None,
    related_issue: dict | None = None,
) -> dict:
    """Run the shared generation pipeline for a structured request.

    Resolves whether to reuse an existing project or mint a new project id,
    runs the AgentOrchestrator, records system/LLM audit rows, and returns a
    serialized response dict with a generated change summary.
    """
    database_module.init_db()
    manager = DatabaseManager(db)
    is_explicit_new_project = (prompt_routing or {}).get('intent') == 'new_project'
    # Reuse resolution: an explicitly preferred id wins; otherwise, unless the
    # routing explicitly requested a new project, fall back to the most recent
    # project with the same name.
    reusable_history = manager.get_project_by_id(preferred_project_id, include_archived=False) if preferred_project_id else (None if is_explicit_new_project else manager.get_latest_project_by_name(request.name))
    if reusable_history and database_module.settings.gitea_url and database_module.settings.gitea_token:
        # Lazy dual-path import mirrors the module-level try/except strategy.
        try:
            from .agents.gitea import GiteaAPI
        except ImportError:
            from agents.gitea import GiteaAPI
        # Refresh locally-tracked PR states before deciding on reuse.
        # NOTE(review): uppercase settings attributes (GITEA_TOKEN, ...) are
        # used here while the guard reads lowercase ones — presumably the
        # settings object exposes both casings; confirm.
        manager.sync_pull_request_states(
            GiteaAPI(
                token=database_module.settings.GITEA_TOKEN,
                base_url=database_module.settings.GITEA_URL,
                owner=database_module.settings.GITEA_OWNER,
                repo=database_module.settings.GITEA_REPO or '',
            ),
            project_id=reusable_history.project_id,
        )
    project_id_trace = None
    resolved_prompt_text = prompt_text or _compose_prompt_text(request)
    if preferred_project_id and reusable_history is not None:
        # Caller pinned the project explicitly.
        project_id = reusable_history.project_id
    elif reusable_history and not is_explicit_new_project and manager.get_open_pull_request(project_id=reusable_history.project_id):
        # Same-named project with an open PR: continue on that project.
        project_id = reusable_history.project_id
    else:
        if is_explicit_new_project or prompt_text:
            # Free-form or explicit-new flows get an LLM-suggested id.
            project_id, project_id_trace = await _derive_project_id_for_request(
                request,
                prompt_text=resolved_prompt_text,
                prompt_routing=prompt_routing,
                existing_projects=_build_project_identity_context(manager),
            )
        else:
            # Plain structured request: deterministic slug + random suffix.
            project_id = _build_project_id(request.name)
        # A fresh id means no history is carried into the orchestrator.
        reusable_history = None
    orchestrator = AgentOrchestrator(
        project_id=project_id,
        project_name=request.name,
        description=request.description,
        features=request.features,
        tech_stack=request.tech_stack,
        db=db,
        prompt_text=resolved_prompt_text,
        prompt_actor=prompt_actor,
        existing_history=reusable_history,
        prompt_source_context=prompt_source_context,
        prompt_routing=prompt_routing,
        repo_name_override=repo_name_override,
        related_issue_hint=related_issue,
    )
    result = await orchestrator.run()
    # Re-create the manager: the orchestrator may have touched the session.
    manager = DatabaseManager(db)
    manager.log_system_event(
        component='api',
        level='INFO' if result['status'] == 'completed' else 'ERROR',
        message=f"Generated project {project_id} with {len(result.get('changed_files', []))} artifact(s)",
    )
    history = manager.get_project_by_id(project_id)
    project_logs = manager.get_project_logs(history.id)
    response_data = _serialize_project(history)
    response_data['logs'] = [_serialize_project_log(log) for log in project_logs]
    response_data['ui_data'] = result.get('ui_data')
    response_data['features'] = request.features
    response_data['tech_stack'] = request.tech_stack
    response_data['project_root'] = result.get('project_root', str(_project_root(project_id)))
    response_data['changed_files'] = result.get('changed_files', [])
    response_data['repository'] = result.get('repository')
    response_data['related_issue'] = result.get('related_issue') or (result.get('ui_data') or {}).get('related_issue')
    response_data['pull_request'] = result.get('pull_request') or manager.get_open_pull_request(project_id=project_id)
    if project_id_trace:
        # Persist the naming-stage LLM exchange for the audit trail.
        manager.log_llm_trace(
            project_id=project_id,
            history_id=history.id if history else None,
            prompt_id=orchestrator.prompt_audit.id if orchestrator.prompt_audit else None,
            stage=project_id_trace['stage'],
            provider=project_id_trace['provider'],
            model=project_id_trace['model'],
            system_prompt=project_id_trace['system_prompt'],
            user_prompt=project_id_trace['user_prompt'],
            assistant_response=project_id_trace['assistant_response'],
            raw_response=project_id_trace.get('raw_response'),
            fallback_used=project_id_trace.get('fallback_used', False),
        )
    # Context handed to the change-summary LLM stage; repository URL is only
    # exposed when the repo reached a usable state.
    summary_context = {
        'name': response_data['name'],
        'description': response_data['description'],
        'features': response_data['features'],
        'tech_stack': response_data['tech_stack'],
        'changed_files': response_data['changed_files'],
        'repository_url': (
            (response_data.get('repository') or {}).get('url')
            if isinstance(response_data.get('repository'), dict)
            and (response_data.get('repository') or {}).get('status') in {'created', 'exists', 'ready', 'shared'}
            else None
        ),
        'repository_status': (response_data.get('repository') or {}).get('status') if isinstance(response_data.get('repository'), dict) else None,
        'pull_request_url': (response_data.get('pull_request') or {}).get('pr_url') if isinstance(response_data.get('pull_request'), dict) else None,
        'pull_request_state': (response_data.get('pull_request') or {}).get('pr_state') if isinstance(response_data.get('pull_request'), dict) else None,
        'related_issue': response_data.get('related_issue'),
        'message': response_data.get('message'),
        'logs': [log.get('message', '') for log in response_data.get('logs', []) if isinstance(log, dict)],
    }
    summary_message, summary_trace = await ChangeSummaryGenerator().summarize_with_trace(summary_context)
    if orchestrator.db_manager and orchestrator.history and orchestrator.prompt_audit:
        # Persist the summary-stage LLM exchange as well.
        orchestrator.db_manager.log_llm_trace(
            project_id=project_id,
            history_id=orchestrator.history.id,
            prompt_id=orchestrator.prompt_audit.id,
            stage=summary_trace['stage'],
            provider=summary_trace['provider'],
            model=summary_trace['model'],
            system_prompt=summary_trace['system_prompt'],
            user_prompt=summary_trace['user_prompt'],
            assistant_response=summary_trace['assistant_response'],
            raw_response=summary_trace.get('raw_response'),
            fallback_used=summary_trace.get('fallback_used', False),
        )
    response_data['summary_message'] = summary_message
    # Re-read the PR: trace logging above may have refreshed state.
    response_data['pull_request'] = result.get('pull_request') or manager.get_open_pull_request(project_id=project_id)
    return {'status': result['status'], 'data': response_data, 'summary_message': summary_message}
def _project_root(project_id: str) -> Path:
    """Resolve the filesystem location for a generated project."""
    base = database_module.settings.projects_root
    return base / project_id
def _create_gitea_api():
    """Create a configured Gitea client or raise an HTTP error if unavailable."""
    # Guard: both URL and token must be configured before building a client.
    if not database_module.settings.gitea_url or not database_module.settings.gitea_token:
        raise HTTPException(status_code=400, detail='Gitea integration is not configured')
    # Lazy dual-path import mirrors the module-level try/except strategy.
    try:
        from .agents.gitea import GiteaAPI
    except ImportError:
        from agents.gitea import GiteaAPI
    # NOTE(review): the guard reads lowercase attributes (gitea_url/gitea_token)
    # while construction reads uppercase ones (GITEA_TOKEN/...) — presumably the
    # settings object exposes both casings; confirm.
    return GiteaAPI(
        token=database_module.settings.GITEA_TOKEN,
        base_url=database_module.settings.GITEA_URL,
        owner=database_module.settings.GITEA_OWNER,
        repo=database_module.settings.GITEA_REPO or '',
    )
def _create_home_assistant_agent() -> HomeAssistantAgent:
    """Build a Home Assistant client from the configured URL and token."""
    settings = database_module.settings
    return HomeAssistantAgent(
        base_url=settings.home_assistant_url,
        token=settings.home_assistant_token,
    )
def _get_gitea_health() -> dict:
    """Return current Gitea connectivity diagnostics."""
    base_url = database_module.settings.gitea_url
    # Missing URL: not configured at all.
    if not base_url:
        return {
            'status': 'error',
            'message': 'Gitea URL is not configured.',
            'base_url': '',
            'configured': False,
            'checks': [],
        }
    # URL without a token: still unusable.
    if not database_module.settings.gitea_token:
        return {
            'status': 'error',
            'message': 'Gitea token is not configured.',
            'base_url': base_url,
            'configured': False,
            'checks': [],
        }
    user_endpoint = f"{base_url}/api/v1/user"
    response = _create_gitea_api().get_current_user_sync()
    error = response.get('error')
    if error:
        return {
            'status': 'error',
            'message': error,
            'base_url': base_url,
            'configured': True,
            'checks': [
                {
                    'name': 'token_auth',
                    'ok': False,
                    'message': error,
                    'url': user_endpoint,
                    'status_code': response.get('status_code'),
                }
            ],
        }
    # Pick the first available display name for the authenticated user.
    username = response.get('login') or response.get('username') or response.get('full_name') or 'unknown'
    return {
        'status': 'success',
        'message': f'Authenticated as {username}.',
        'base_url': base_url,
        'configured': True,
        'checks': [
            {
                'name': 'token_auth',
                'ok': True,
                'message': f'Authenticated as {username}',
                'url': user_endpoint,
            }
        ],
        'user': username,
    }
def _get_home_assistant_health() -> dict:
    """Return current Home Assistant connectivity diagnostics."""
    agent = _create_home_assistant_agent()
    return agent.health_check_sync()
async def _get_queue_gate_status(force: bool = False) -> dict:
    """Return whether queued prompts may be processed now."""
    settings = database_module.settings
    # Disabled queue: always allowed (nothing is ever queued anyway).
    if not settings.prompt_queue_enabled:
        return {
            'status': 'disabled',
            'allowed': True,
            'forced': False,
            'reason': 'Prompt queue is disabled',
        }
    # With a configured Home Assistant, it owns the gate decision.
    if settings.home_assistant_url:
        return await _create_home_assistant_agent().queue_gate_status(force=force)
    # No Home Assistant: allow only when explicitly overridden.
    if force or settings.prompt_queue_force_process:
        return {
            'status': 'success',
            'allowed': True,
            'forced': True,
            'reason': 'Queue override is enabled',
        }
    return {
        'status': 'blocked',
        'allowed': False,
        'forced': False,
        'reason': 'Home Assistant URL is not configured',
    }
async def _interpret_freeform_request(request: FreeformSoftwareRequest, manager: DatabaseManager) -> tuple[SoftwareRequest, dict, dict]:
    """Interpret a free-form request and return the structured request plus routing trace."""
    context = manager.get_interpreter_context(chat_id=request.chat_id, source=request.source)
    interpreted, trace = await RequestInterpreter().interpret_with_trace(
        request.prompt_text,
        context=context,
    )
    routing = trace.get('routing') or {}
    routed_project_id = routing.get('project_id')
    selected = manager.get_project_by_id(routed_project_id, include_archived=False) if routed_project_id else None
    # When routing targets an existing project, pin name/description to it so
    # the downstream pipeline reuses that project.
    if selected is not None and routing.get('intent') != 'new_project':
        interpreted['name'] = selected.project_name
        interpreted['description'] = selected.description or interpreted['description']
    return SoftwareRequest(**interpreted), routing, trace
async def _run_freeform_generation(
    request: FreeformSoftwareRequest,
    db: Session,
    *,
    queue_item_id: int | None = None,
) -> dict:
    """Shared free-form request flow used by direct calls and queued processing.

    Interprets the prompt, runs generation, persists interpretation/naming
    LLM traces, and — when the request came from the queue — marks the queue
    item completed on success or failed on any exception (which is re-raised).
    """
    manager = DatabaseManager(db)
    try:
        structured_request, routing, interpretation_trace = await _interpret_freeform_request(request, manager)
        response = await _run_generation(
            structured_request,
            db,
            prompt_text=request.prompt_text,
            prompt_actor=request.source,
            prompt_source_context={
                'chat_id': request.chat_id,
                'chat_type': request.chat_type,
                'queue_item_id': queue_item_id,
            },
            prompt_routing=routing,
            # Reuse a routed project only when the intent is NOT a new project;
            # conversely, honor a repo-name override only FOR a new project.
            preferred_project_id=routing.get('project_id') if routing.get('intent') != 'new_project' else None,
            repo_name_override=routing.get('repo_name') if routing.get('intent') == 'new_project' else None,
            related_issue={'number': routing.get('issue_number')} if routing.get('issue_number') is not None else None,
        )
        project_data = response.get('data', {})
        if project_data.get('history_id') is not None:
            # Fresh manager: _run_generation may have touched the session.
            manager = DatabaseManager(db)
            # Link traces to the most recent prompt event for this project.
            prompts = manager.get_prompt_events(project_id=project_data.get('project_id'))
            prompt_id = prompts[0]['id'] if prompts else None
            manager.log_llm_trace(
                project_id=project_data.get('project_id'),
                history_id=project_data.get('history_id'),
                prompt_id=prompt_id,
                stage=interpretation_trace['stage'],
                provider=interpretation_trace['provider'],
                model=interpretation_trace['model'],
                system_prompt=interpretation_trace['system_prompt'],
                user_prompt=interpretation_trace['user_prompt'],
                assistant_response=interpretation_trace['assistant_response'],
                raw_response=interpretation_trace.get('raw_response'),
                fallback_used=interpretation_trace.get('fallback_used', False),
            )
            # The interpreter may embed a secondary naming trace; persist it too.
            naming_trace = interpretation_trace.get('project_naming')
            if naming_trace:
                manager.log_llm_trace(
                    project_id=project_data.get('project_id'),
                    history_id=project_data.get('history_id'),
                    prompt_id=prompt_id,
                    stage=naming_trace['stage'],
                    provider=naming_trace['provider'],
                    model=naming_trace['model'],
                    system_prompt=naming_trace['system_prompt'],
                    user_prompt=naming_trace['user_prompt'],
                    assistant_response=naming_trace['assistant_response'],
                    raw_response=naming_trace.get('raw_response'),
                    fallback_used=naming_trace.get('fallback_used', False),
                )
        response['interpreted_request'] = structured_request.model_dump()
        response['routing'] = routing
        response['llm_trace'] = interpretation_trace
        response['source'] = {
            'type': request.source,
            'chat_id': request.chat_id,
            'chat_type': request.chat_type,
        }
        if queue_item_id is not None:
            # Mark the originating queue item done with a compact result record.
            DatabaseManager(db).complete_queued_prompt(
                queue_item_id,
                {
                    'project_id': project_data.get('project_id'),
                    'history_id': project_data.get('history_id'),
                    'status': response.get('status'),
                },
            )
        return response
    except Exception as exc:
        # Queue items must never be left stuck in a processing state.
        if queue_item_id is not None:
            DatabaseManager(db).fail_queued_prompt(queue_item_id, str(exc))
        raise
async def _process_prompt_queue_batch(limit: int = 1, force: bool = False) -> dict:
    """Process up to `limit` queued prompts if the energy gate allows it.

    Each claim, each item's work, and the final summary use their own
    short-lived DB session so a failure in one item cannot poison the rest.
    """
    queue_gate = await _get_queue_gate_status(force=force)
    if not queue_gate.get('allowed'):
        # Gate closed: report the current queue summary without processing.
        db = database_module.get_db_sync()
        try:
            summary = DatabaseManager(db).get_prompt_queue_summary()
        finally:
            db.close()
        return {
            'status': queue_gate.get('status', 'blocked'),
            'processed_count': 0,
            'queue_gate': queue_gate,
            'queue_summary': summary,
            'processed': [],
        }
    processed = []
    for _ in range(max(limit, 1)):
        # Claim with a dedicated session so the claim commits immediately.
        claim_db = database_module.get_db_sync()
        try:
            claimed = DatabaseManager(claim_db).claim_next_queued_prompt()
        finally:
            claim_db.close()
        if claimed is None:
            break  # queue drained
        work_db = database_module.get_db_sync()
        try:
            request = FreeformSoftwareRequest(
                prompt_text=claimed['prompt_text'],
                source=claimed['source'] or 'telegram',
                chat_id=claimed.get('chat_id'),
                chat_type=claimed.get('chat_type'),
                process_now=True,
            )
            response = await _run_freeform_generation(request, work_db, queue_item_id=claimed['id'])
            processed.append(
                {
                    'queue_item_id': claimed['id'],
                    'project_id': (response.get('data') or {}).get('project_id'),
                    'status': response.get('status'),
                }
            )
        except Exception as exc:
            # Record the failure on the queue item but keep draining the batch.
            DatabaseManager(work_db).fail_queued_prompt(claimed['id'], str(exc))
            processed.append({'queue_item_id': claimed['id'], 'status': 'failed', 'error': str(exc)})
        finally:
            work_db.close()
    # Final summary from a fresh session reflects everything just processed.
    summary_db = database_module.get_db_sync()
    try:
        summary = DatabaseManager(summary_db).get_prompt_queue_summary()
    finally:
        summary_db.close()
    return {
        'status': 'success',
        'processed_count': len(processed),
        'processed': processed,
        'queue_gate': queue_gate,
        'queue_summary': summary,
    }
async def _prompt_queue_worker() -> None:
    """Background worker that drains the prompt queue when the energy gate opens."""
    # Runs for the lifetime of the app; cancelled from lifespan shutdown.
    while True:
        try:
            await _process_prompt_queue_batch(
                limit=database_module.settings.prompt_queue_max_batch_size,
                force=database_module.settings.prompt_queue_force_process,
            )
        except Exception as exc:
            # Never let a batch failure kill the loop; record it instead.
            # (asyncio.CancelledError derives from BaseException, so cancellation
            # from shutdown still propagates past this handler.)
            db = database_module.get_db_sync()
            try:
                DatabaseManager(db).log_system_event('prompt-queue', 'ERROR', f'Queue worker error: {exc}')
            finally:
                db.close()
        await asyncio.sleep(database_module.settings.prompt_queue_poll_interval_seconds)
def _resolve_n8n_api_url(explicit_url: str | None = None) -> str:
"""Resolve the effective n8n API URL from explicit input or settings."""
if explicit_url and explicit_url.strip():
return explicit_url.strip()
if database_module.settings.n8n_api_url:
return database_module.settings.n8n_api_url
webhook_url = database_module.settings.n8n_webhook_url
if webhook_url:
return webhook_url.split("/webhook", 1)[0].rstrip("/")
return ""
@app.get('/api')
def read_api_info():
    """Return service metadata for API clients."""
    # NOTE(review): this endpoint list is maintained by hand — verify it stays
    # in sync with the routes actually registered on `app`.
    return {
        'service': 'AI Software Factory',
        'version': __version__,
        'endpoints': [
            '/',
            '/api',
            '/health',
            '/llm/runtime',
            '/llm/prompts',
            '/llm/prompts/{prompt_key}',
            '/generate',
            '/generate/text',
            '/queue',
            '/queue/process',
            '/projects',
            '/status/{project_id}',
            '/audit/projects',
            '/audit/logs',
            '/audit/system/logs',
            '/audit/prompts',
            '/audit/changes',
            '/audit/issues',
            '/audit/commit-context',
            '/audit/timeline',
            '/audit/llm-traces',
            '/audit/pull-requests',
            '/audit/lineage',
            '/audit/correlations',
            '/projects/{project_id}/archive',
            '/projects/{project_id}/unarchive',
            '/projects/{project_id}',
            '/projects/{project_id}/prompts/{prompt_id}/undo',
            '/projects/{project_id}/sync-repository',
            '/gitea/repos',
            '/gitea/health',
            '/gitea/repos/onboard',
            '/home-assistant/health',
            '/n8n/health',
            '/n8n/setup',
        ],
    }
@app.get('/health')
def health_check():
    """Health check endpoint."""
    runtime = database_module.get_database_runtime_summary()
    # Default summary used when the queue table cannot be read.
    queue_summary = {'queued': 0, 'processing': 0, 'completed': 0, 'failed': 0, 'total': 0, 'next_item': None}
    db = database_module.get_db_sync()
    try:
        try:
            queue_summary = DatabaseManager(db).get_prompt_queue_summary()
        except Exception:
            # Deliberate best-effort: health must not fail because the queue
            # summary query does; the zeroed default above is reported instead.
            pass
    finally:
        db.close()
    return {
        'status': 'healthy',
        'database': runtime['backend'],
        'database_target': runtime['target'],
        'database_name': runtime['database'],
        'integrations': {
            'gitea': _get_gitea_health(),
            'home_assistant': _get_home_assistant_health(),
        },
        'prompt_queue': {
            'enabled': database_module.settings.prompt_queue_enabled,
            'auto_process': database_module.settings.prompt_queue_auto_process,
            'force_process': database_module.settings.prompt_queue_force_process,
            'summary': queue_summary,
        },
    }
@app.get('/llm/runtime')
def get_llm_runtime():
    """Expose the active external LLM runtime, guardrail, and tool configuration."""
    client = LLMServiceClient()
    return client.get_runtime_configuration()
@app.get('/llm/prompts')
def get_llm_prompt_settings(db: DbSession):
    """Return editable LLM prompt settings (DB overrides merged over env defaults)."""
    manager = DatabaseManager(db)
    return {'prompts': manager.get_llm_prompt_settings()}
@app.put('/llm/prompts/{prompt_key}')
def update_llm_prompt_setting(prompt_key: str, request: LLMPromptSettingUpdateRequest, db: DbSession):
    """Persist one editable LLM prompt override into the database."""
    database_module.init_db()
    outcome = DatabaseManager(db).save_llm_prompt_setting(prompt_key, request.value, actor='api')
    if outcome.get('status') != 'error':
        return outcome
    # Save rejected (e.g. unknown key): surface as a 400.
    raise HTTPException(status_code=400, detail=outcome.get('message', 'Prompt save failed'))
@app.delete('/llm/prompts/{prompt_key}')
def reset_llm_prompt_setting(prompt_key: str, db: DbSession):
    """Reset one editable LLM prompt override back to the environment/default value."""
    database_module.init_db()
    outcome = DatabaseManager(db).reset_llm_prompt_setting(prompt_key, actor='api')
    if outcome.get('status') != 'error':
        return outcome
    raise HTTPException(status_code=400, detail=outcome.get('message', 'Prompt reset failed'))
@app.post('/generate')
async def generate_software(request: SoftwareRequest, db: DbSession):
    """Create and record a software-generation request."""
    result = await _run_generation(request, db)
    return result
@app.post('/generate/text')
async def generate_software_from_text(request: FreeformSoftwareRequest, db: DbSession):
    """Interpret a free-form request and run generation."""
    # Telegram allow-listing: when a chat id is configured, silently ignore
    # messages arriving from any other chat.
    if (
        request.source == 'telegram'
        and database_module.settings.telegram_chat_id
        and request.chat_id
        and str(request.chat_id) != str(database_module.settings.telegram_chat_id)
    ):
        return {
            'status': 'ignored',
            'message': f"Ignoring Telegram message from chat {request.chat_id}",
            'source': {
                'type': request.source,
                'chat_id': request.chat_id,
                'chat_type': request.chat_type,
            },
        }
    # Telegram prompts go through the energy-aware queue unless the caller
    # explicitly asked for immediate processing.
    if request.source == 'telegram' and database_module.settings.prompt_queue_enabled and not request.process_now:
        manager = DatabaseManager(db)
        queue_item = manager.enqueue_prompt(
            prompt_text=request.prompt_text,
            source=request.source,
            chat_id=request.chat_id,
            chat_type=request.chat_type,
            source_context={'chat_id': request.chat_id, 'chat_type': request.chat_type},
        )
        return {
            'status': 'queued',
            'message': 'Prompt queued for energy-aware processing.',
            'queue_item': queue_item,
            'queue_summary': manager.get_prompt_queue_summary(),
            'queue_gate': await _get_queue_gate_status(force=False),
            'source': {
                'type': request.source,
                'chat_id': request.chat_id,
                'chat_type': request.chat_type,
            },
        }
    # Non-Telegram sources (and process_now requests) run synchronously.
    return await _run_freeform_generation(request, db)
@app.get('/queue')
def get_prompt_queue(db: DbSession):
    """Return queued prompt items and prompt queue configuration."""
    manager = DatabaseManager(db)
    settings = database_module.settings
    config = {
        'enabled': settings.prompt_queue_enabled,
        'auto_process': settings.prompt_queue_auto_process,
        'force_process': settings.prompt_queue_force_process,
        'poll_interval_seconds': settings.prompt_queue_poll_interval_seconds,
        'max_batch_size': settings.prompt_queue_max_batch_size,
    }
    return {
        'queue': manager.get_prompt_queue(),
        'summary': manager.get_prompt_queue_summary(),
        'config': config,
    }
@app.post('/queue/process')
async def process_prompt_queue(request: PromptQueueProcessRequest):
    """Manually process queued prompts, optionally bypassing the HA gate."""
    result = await _process_prompt_queue_batch(limit=request.limit, force=request.force)
    return result
@app.get('/gitea/health')
def get_gitea_health():
    """Expose Gitea integration connectivity diagnostics."""
    diagnostics = _get_gitea_health()
    return diagnostics
@app.get('/home-assistant/health')
def get_home_assistant_health():
    """Expose Home Assistant integration connectivity diagnostics."""
    diagnostics = _get_home_assistant_health()
    return diagnostics
@app.get('/projects')
def list_projects(
    db: DbSession,
    include_archived: bool = Query(default=False),
    archived_only: bool = Query(default=False),
):
    """List recorded projects."""
    rows = DatabaseManager(db).get_all_projects(
        include_archived=include_archived,
        archived_only=archived_only,
    )
    serialized = [_serialize_project(row) for row in rows]
    return {'projects': serialized}
@app.get('/status/{project_id}')
def get_project_status(project_id: str, db: DbSession):
    """Get the current status for a single project."""
    history = DatabaseManager(db).get_project_by_id(project_id)
    if history is not None:
        return _serialize_project(history)
    raise HTTPException(status_code=404, detail='Project not found')
@app.get('/audit/projects')
def get_audit_projects(db: DbSession):
    """Return projects together with their related logs and audit data."""
    manager = DatabaseManager(db)
    def enrich(history):
        # Merge serialized project fields with the per-project audit bundle.
        data = _serialize_project(history)
        audit = manager.get_project_audit_data(history.project_id)
        data['logs'] = audit['logs']
        data['actions'] = audit['actions']
        data['audit_trail'] = audit['audit_trail']
        return data
    return {'projects': [enrich(history) for history in manager.get_all_projects()]}
@app.get('/audit/prompts')
def get_prompt_audit(db: DbSession, project_id: str | None = Query(default=None)):
    """Return stored prompt submissions."""
    events = DatabaseManager(db).get_prompt_events(project_id=project_id)
    return {'prompts': [_serialize_audit_item(event) for event in events]}
@app.get('/audit/changes')
def get_code_change_audit(db: DbSession, project_id: str | None = Query(default=None)):
    """Return recorded code changes."""
    changes = DatabaseManager(db).get_code_changes(project_id=project_id)
    return {'changes': [_serialize_audit_item(change) for change in changes]}
@app.get('/audit/issues')
def get_issue_audit(
    db: DbSession,
    project_id: str | None = Query(default=None),
    state: str | None = Query(default=None),
):
    """Return tracked repository issues and issue-work events."""
    manager = DatabaseManager(db)
    issues = manager.get_repository_issues(project_id=project_id, state=state)
    work_events = manager.get_issue_work_events(project_id=project_id)
    return {'issues': issues, 'issue_work': work_events}
@app.get('/audit/commit-context')
def get_commit_context_audit(
    db: DbSession,
    commit_hash: str = Query(min_length=4),
    project_id: str | None = Query(default=None),
    branch_scope: str | None = Query(default=None, pattern='^(main|pr|manual)?$'),
):
    """Return the recorded context explaining how a commit came to be."""
    context = DatabaseManager(db).get_commit_context(
        commit_hash=commit_hash,
        project_id=project_id,
        branch_scope=branch_scope,
    )
    if context is not None:
        return context
    raise HTTPException(status_code=404, detail='Commit context not found')
@app.get('/audit/timeline')
def get_project_timeline_audit(
    db: DbSession,
    project_id: str = Query(min_length=1),
    branch_scope: str | None = Query(default=None, pattern='^(main|pr|manual)?$'),
):
    """Return the mixed audit timeline for one project."""
    timeline = DatabaseManager(db).get_project_timeline(
        project_id=project_id,
        branch_scope=branch_scope,
    )
    return {'timeline': timeline}
@app.get('/audit/llm-traces')
def get_llm_trace_audit(
    db: DbSession,
    project_id: str | None = Query(default=None),
    prompt_id: int | None = Query(default=None),
    stage: str | None = Query(default=None),
    model: str | None = Query(default=None),
    search: str | None = Query(default=None),
):
    """Return persisted LLM traces, filtered by any of the optional query params."""
    traces = DatabaseManager(db).get_llm_traces(
        project_id=project_id,
        prompt_id=prompt_id,
        stage=stage,
        model=model,
        # The HTTP query param is `search`; the DB layer names it `search_query`.
        search_query=search,
    )
    return {'llm_traces': traces}
@app.get('/audit/lineage')
def get_prompt_change_lineage(db: DbSession, project_id: str | None = Query(default=None)):
    """Return explicit prompt-to-code lineage rows."""
    links = DatabaseManager(db).get_prompt_change_links(project_id=project_id)
    return {'lineage': links}
@app.get('/audit/correlations')
def get_prompt_change_correlations(db: DbSession, project_id: str | None = Query(default=None)):
    """Return prompt-to-change correlations for generated projects."""
    correlations = DatabaseManager(db).get_prompt_change_correlations(project_id=project_id)
    return {'correlations': correlations}
@app.get('/audit/pull-requests')
def get_pull_request_audit(db: DbSession, project_id: str | None = Query(default=None), open_only: bool = Query(default=False)):
    """Return tracked pull requests for generated projects."""
    pull_requests = DatabaseManager(db).get_pull_requests(project_id=project_id, only_open=open_only)
    return {'pull_requests': pull_requests}
@app.post('/projects/{project_id}/prompts/{prompt_id}/undo')
async def undo_prompt_changes(project_id: str, prompt_id: int, db: DbSession):
    """Undo all changes associated with a specific prompt.

    Responds 404 for unknown projects, and 400 when the project is archived
    or the undo workflow reports an error.
    """
    record = DatabaseManager(db).get_project_by_id(project_id)
    if record is None:
        raise HTTPException(status_code=404, detail='Project not found')
    if record.status == 'archived':
        raise HTTPException(status_code=400, detail='Archived projects cannot be modified')
    outcome = await PromptWorkflowManager(db).undo_prompt(project_id=project_id, prompt_id=prompt_id)
    if outcome.get('status') == 'error':
        raise HTTPException(status_code=400, detail=outcome.get('message', 'Undo failed'))
    return outcome
@app.post('/projects/{project_id}/archive')
def archive_project(project_id: str, db: DbSession):
    """Archive a project so it no longer participates in active automation."""
    outcome = DatabaseManager(db).archive_project(project_id)
    if outcome.get('status') == 'error':
        raise HTTPException(status_code=404, detail=outcome.get('message', 'Archive failed'))
    return outcome
@app.post('/projects/{project_id}/unarchive')
def unarchive_project(project_id: str, db: DbSession):
    """Restore an archived project back into the active automation set."""
    outcome = DatabaseManager(db).unarchive_project(project_id)
    if outcome.get('status') == 'error':
        raise HTTPException(status_code=404, detail=outcome.get('message', 'Restore failed'))
    return outcome
@app.delete('/projects/{project_id}')
def delete_project(project_id: str, db: DbSession):
    """Delete a project, its local project directory, and project-scoped DB traces.

    When the project owns a dedicated (non-shared) Gitea repository and Gitea
    credentials are configured, the remote repository is deleted as well; a
    remote failure is logged as a warning but does not block local deletion.
    """
    db_manager = DatabaseManager(db)
    audit_data = db_manager.get_project_audit_data(project_id)
    if audit_data.get('project') is None:
        raise HTTPException(status_code=404, detail='Project not found')
    repository = audit_data.get('repository') or audit_data['project'].get('repository') or {}
    remote_delete = None
    settings = database_module.settings
    can_delete_remote = (
        repository
        and repository.get('mode') != 'shared'
        and repository.get('owner')
        and repository.get('name')
        and settings.gitea_url
        and settings.gitea_token
    )
    if can_delete_remote:
        remote_delete = _create_gitea_api().delete_repo_sync(owner=repository.get('owner'), repo=repository.get('name'))
        if remote_delete.get('error'):
            # Record the remote failure but continue with local cleanup below.
            db_manager.log_system_event(
                component='gitea',
                level='WARNING',
                message=f"Remote repository delete failed for {repository.get('owner')}/{repository.get('name')}: {remote_delete.get('error')}",
            )
    result = db_manager.delete_project(project_id)
    if result.get('status') == 'error':
        raise HTTPException(status_code=400, detail=result.get('message', 'Project deletion failed'))
    # Surface the remote-deletion outcome alongside the local result.
    result['remote_repository_deleted'] = bool(remote_delete and not remote_delete.get('error'))
    result['remote_repository_delete_error'] = remote_delete.get('error') if remote_delete else None
    result['remote_repository'] = repository if repository else None
    return result
@app.post('/projects/{project_id}/sync-repository')
def sync_project_repository(project_id: str, db: DbSession, commit_limit: int = Query(default=25, ge=1, le=200)):
    """Import recent repository activity from Gitea for a tracked project."""
    db_manager = DatabaseManager(db)
    record = db_manager.get_project_by_id(project_id)
    if record is None:
        raise HTTPException(status_code=404, detail='Project not found')
    if record.status == 'archived':
        raise HTTPException(status_code=400, detail='Archived projects cannot be synced')
    gitea_api = _create_gitea_api()
    outcome = db_manager.sync_repository_activity(project_id=project_id, gitea_api=gitea_api, commit_limit=commit_limit)
    if outcome.get('status') == 'error':
        raise HTTPException(status_code=400, detail=outcome.get('message', 'Repository sync failed'))
    # Follow-up issue sync; its result is not surfaced to the caller.
    db_manager.sync_repository_issues(project_id=project_id, gitea_api=gitea_api, state='open')
    return outcome
@app.get('/gitea/repos')
def list_gitea_repositories(db: DbSession, owner: str | None = Query(default=None)):
    """List repositories in the configured Gitea organization and whether they are already onboarded."""
    resolved_owner = owner or database_module.settings.gitea_owner
    repos = _create_gitea_api().list_repositories_sync(owner=resolved_owner)
    # The Gitea client signals failure with an {'error': ...} dict instead of a list.
    if isinstance(repos, dict) and repos.get('error'):
        raise HTTPException(status_code=502, detail=repos.get('error'))
    db_manager = DatabaseManager(db)
    repo_list = repos if isinstance(repos, list) else []
    items = []
    for repo in repo_list:
        tracked = db_manager.get_project_by_repository(resolved_owner, repo.get('name', ''))
        items.append(
            {
                'name': repo.get('name'),
                'full_name': repo.get('full_name') or f"{resolved_owner}/{repo.get('name')}",
                'description': repo.get('description'),
                'html_url': repo.get('html_url'),
                'clone_url': repo.get('clone_url'),
                'default_branch': repo.get('default_branch'),
                'private': bool(repo.get('private', False)),
                'onboarded': tracked is not None,
                'project_id': tracked.project_id if tracked is not None else None,
            }
        )
    return {'repositories': items}
@app.post('/gitea/repos/onboard')
async def onboard_gitea_repository(request: GiteaRepositoryOnboardRequest, db: DbSession):
    """Onboard a manually created Gitea repository into the factory dashboard.

    Responds 404 when the repository cannot be fetched from Gitea.
    """
    gitea_api = _create_gitea_api()
    owner = request.owner or database_module.settings.gitea_owner
    repo = await gitea_api.get_repo_info(owner=owner, repo=request.repo_name)
    if isinstance(repo, dict) and repo.get('error'):
        raise HTTPException(status_code=404, detail=repo.get('error'))
    db_manager = DatabaseManager(db)
    onboarded = db_manager.onboard_repository(owner=owner, repo_name=request.repo_name, repository_data=repo)
    # Always pull the currently open issues for the freshly onboarded project.
    db_manager.sync_repository_issues(project_id=onboarded['project_id'], gitea_api=gitea_api, state='open')
    sync_result = None
    if request.sync_commits:
        sync_result = db_manager.sync_repository_activity(
            project_id=onboarded['project_id'],
            gitea_api=gitea_api,
            commit_limit=request.commit_limit,
        )
    return {
        'status': 'success',
        'onboarded': onboarded,
        'sync_result': sync_result,
    }
@app.get('/audit/logs')
def get_audit_logs(db: DbSession):
    """Return all project logs ordered newest first."""
    newest_first = db.query(ProjectLog).order_by(ProjectLog.id.desc())
    return {'logs': [_serialize_project_log(entry) for entry in newest_first.all()]}
@app.get('/audit/system/logs')
def get_system_audit_logs(
    db: DbSession,
    component: str | None = Query(default=None),
):
    """Return system logs newest first, optionally filtered by component."""
    query = db.query(SystemLog).order_by(SystemLog.id.desc())
    if component:
        query = query.filter(SystemLog.component == component)
    entries = query.all()
    return {'logs': [_serialize_system_log(entry) for entry in entries]}
@app.get('/n8n/health')
async def get_n8n_health():
    """Check whether the configured n8n instance is reachable."""
    settings = database_module.settings
    api_url = _resolve_n8n_api_url()
    if not api_url:
        # Without a base URL there is nothing to probe; report the misconfiguration.
        return {
            'status': 'error',
            'message': 'N8N_API_URL or N8N_WEBHOOK_URL is not configured.',
            'api_url': '',
            'auth_configured': bool(settings.n8n_api_key),
            'checks': [],
            'suggestion': 'Set N8N_API_URL to the base n8n address before provisioning workflows.',
        }
    agent = N8NSetupAgent(api_url=api_url, webhook_token=settings.n8n_api_key)
    return await agent.health_check()
@app.post('/n8n/setup')
async def setup_n8n_workflow(request: N8NSetupRequest, db: DbSession):
    """Create or update the n8n Telegram workflow.

    Responds 400 when no n8n API URL can be resolved from the request or settings.
    """
    api_url = _resolve_n8n_api_url(request.api_url)
    if not api_url:
        raise HTTPException(status_code=400, detail='n8n API URL is not configured')
    settings = database_module.settings
    agent = N8NSetupAgent(
        api_url=api_url,
        webhook_token=(request.api_key or settings.n8n_api_key),
    )
    result = await agent.setup(
        webhook_path=request.webhook_path,
        backend_url=request.backend_url or f"{settings.backend_public_url}/generate/text",
        force_update=request.force_update,
        telegram_bot_token=settings.telegram_bot_token,
        telegram_credential_name=settings.n8n_telegram_credential_name,
    )
    # Mirror the provisioning outcome into the system log for the audit trail.
    level = 'ERROR' if result.get('status') == 'error' else 'INFO'
    DatabaseManager(db).log_system_event(
        component='n8n',
        level=level,
        message=result.get('message', json.dumps(result)),
    )
    return result
@app.post('/init-db')
def initialize_database():
    """Initialize database tables (POST endpoint for NiceGUI to call before dashboard)."""
    try:
        database_module.init_db()
    except Exception as e:
        # Report the failure in the payload rather than surfacing a 500.
        return {'message': f'Error initializing database: {str(e)}', 'status': 'error'}
    return {'message': 'Database tables created successfully', 'status': 'success'}
# Attach the NiceGUI frontend to the FastAPI app after all routes are registered.
frontend.init(app)
if __name__ == '__main__':
    # This module is meant to be served by an ASGI server; direct execution only
    # points the user at the documented uvicorn start command.
    print('Please start the app with the "uvicorn" command as shown in the start.sh script')