# File: ai_software_factory/ai_software_factory/main.py (357 lines, 12 KiB, Python)

#!/usr/bin/env python3
"""AI Software Factory - Main application with FastAPI backend and NiceGUI frontend.
This application uses FastAPI to:
1. Provide HTTP API endpoints
2. Host NiceGUI frontend via ui.run_with()
The NiceGUI frontend provides:
1. Interactive dashboard at /show
2. Real-time data visualization
3. Audit trail display
"""
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Annotated
from uuid import uuid4
from fastapi import Depends, FastAPI, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
try:
from . import __version__, frontend
from . import database as database_module
from .agents.database_manager import DatabaseManager
from .agents.orchestrator import AgentOrchestrator
from .agents.n8n_setup import N8NSetupAgent
from .agents.ui_manager import UIManager
from .models import ProjectHistory, ProjectLog, SystemLog
except ImportError:
import frontend
import database as database_module
from agents.database_manager import DatabaseManager
from agents.orchestrator import AgentOrchestrator
from agents.n8n_setup import N8NSetupAgent
from agents.ui_manager import UIManager
from models import ProjectHistory, ProjectLog, SystemLog
__version__ = "0.0.1"
# FastAPI application instance; the route handlers below register on it and
# it is handed to frontend.init() at the bottom of this module.
app = FastAPI()
# Dependency alias: a handler parameter annotated with DbSession receives a
# Session provided by database_module.get_db.
DbSession = Annotated[Session, Depends(database_module.get_db)]
# Matches every run of characters other than lowercase ASCII letters and
# digits; used to turn a project name into a slug.
PROJECT_ID_PATTERN = re.compile(r"[^a-z0-9]+")
class SoftwareRequest(BaseModel):
    """Payload accepted by the software-generation endpoint.

    ``name`` and ``description`` are required and limited to 1-255
    characters each; ``features`` and ``tech_stack`` default to empty lists.
    """

    # Required free-text fields, each bounded to 1..255 characters.
    name: str = Field(min_length=1, max_length=255)
    description: str = Field(min_length=1, max_length=255)
    # Optional lists; default_factory gives every instance its own list.
    features: list[str] = Field(default_factory=list)
    tech_stack: list[str] = Field(default_factory=list)
class N8NSetupRequest(BaseModel):
    """Payload accepted by the n8n workflow setup endpoint.

    Every field is optional. ``api_url``, ``api_key`` and ``backend_url``
    default to ``None``, in which case the setup handler falls back to
    values from the application settings.
    """

    # Overrides for the configured n8n connection.
    api_url: str | None = None
    api_key: str | None = None
    # Forwarded to the setup agent as ``webhook_path``.
    webhook_path: str = "telegram"
    # Overrides the backend URL derived from settings.
    backend_url: str | None = None
    # Forwarded to the setup agent as ``force_update``.
    force_update: bool = False
def _build_project_id(name: str) -> str:
    """Derive a unique project id from the requested project name.

    The name is trimmed and lowercased, every run of characters outside
    ``[a-z0-9]`` collapses into a single hyphen, and leading/trailing
    hyphens are dropped. If nothing remains, the slug falls back to
    ``"project"``. Eight hex characters from a fresh UUID4 are appended, so
    two calls with the same name yield different ids.

    Args:
        name: Human-readable project name.

    Returns:
        A string of the form ``"<slug>-<8 hex chars>"``.
    """
    normalized = name.strip().lower()
    slug = PROJECT_ID_PATTERN.sub("-", normalized).strip("-")
    if not slug:
        slug = "project"
    suffix = uuid4().hex[:8]
    return f"{slug}-{suffix}"
def _serialize_project(history: ProjectHistory) -> dict:
    """Convert a project history row into a plain dict for API responses.

    Args:
        history: Row object exposing the project history attributes read
            below.

    Returns:
        A dict with the row's identifying, status and progress fields. The
        three timestamp fields are rendered with ``isoformat()``, or set to
        ``None`` when the underlying value is falsy.
    """

    def _iso(moment):
        # Missing timestamps are reported as None instead of raising.
        return moment.isoformat() if moment else None

    payload = {
        "history_id": history.id,
        "project_id": history.project_id,
        "name": history.project_name,
        "description": history.description,
        "status": history.status,
        "progress": history.progress,
        "message": history.message,
        "current_step": history.current_step,
        "error_message": history.error_message,
    }
    # Appended in this order so the key order matches the documented shape.
    for stamp in ("created_at", "updated_at", "completed_at"):
        payload[stamp] = _iso(getattr(history, stamp))
    return payload
def _serialize_project_log(log: ProjectLog) -> dict:
    """Convert a project log row into a plain dict for API responses.

    Args:
        log: Row object exposing ``id``, ``history_id``, ``log_level``,
            ``log_message`` and ``timestamp``.

    Returns:
        A dict with keys ``id``, ``history_id``, ``level``, ``message`` and
        ``timestamp`` (ISO string, or ``None`` when the timestamp is falsy).
    """
    logged_at = log.timestamp
    if logged_at:
        timestamp_text = logged_at.isoformat()
    else:
        timestamp_text = None
    return {
        "id": log.id,
        "history_id": log.history_id,
        "level": log.log_level,
        "message": log.log_message,
        "timestamp": timestamp_text,
    }
def _serialize_system_log(log: SystemLog) -> dict:
    """Convert a system log row into a plain dict for API responses.

    Args:
        log: Row object exposing ``id``, ``component``, ``log_level``,
            ``log_message``, ``user_agent``, ``ip_address`` and
            ``created_at``.

    Returns:
        A dict whose ``timestamp`` entry is taken from ``created_at``
        (ISO string, or ``None`` when that value is falsy).
    """
    created = log.created_at
    serialized = {
        "id": log.id,
        "component": log.component,
        "level": log.log_level,
        "message": log.log_message,
        "user_agent": log.user_agent,
        "ip_address": log.ip_address,
    }
    # The row's created_at is exposed under the "timestamp" key.
    serialized["timestamp"] = created.isoformat() if created else None
    return serialized
def _serialize_audit_item(item: dict) -> dict:
    """Pass an audit record through untouched.

    The very same object is handed back (no copy is made); the function
    exists so audit endpoints have a single serialization hook.
    """
    return item
def _compose_prompt_text(request: SoftwareRequest) -> str:
    """Render a software request as a four-line prompt string.

    Args:
        request: Object exposing ``name``, ``description``, ``features``
            and ``tech_stack``.

    Returns:
        ``Name``, ``Description``, ``Features`` and ``Tech Stack`` lines
        joined by newlines. List fields are comma-separated; an empty list
        is rendered as the literal text ``None``.
    """
    if request.features:
        feature_text = ", ".join(request.features)
    else:
        feature_text = "None"

    if request.tech_stack:
        stack_text = ", ".join(request.tech_stack)
    else:
        stack_text = "None"

    lines = [
        f"Name: {request.name}",
        f"Description: {request.description}",
        f"Features: {feature_text}",
        f"Tech Stack: {stack_text}",
    ]
    return "\n".join(lines)
def _project_root(project_id: str) -> Path:
    """Return the directory for a project under the configured projects root.

    Args:
        project_id: Identifier used as the final path component.

    Returns:
        ``settings.projects_root / project_id``; nothing is created on disk.
    """
    base_dir = database_module.settings.projects_root
    return base_dir / project_id
def _resolve_n8n_api_url(explicit_url: str | None = None) -> str:
    """Pick the n8n API URL to use, in order of precedence.

    1. ``explicit_url``, stripped, when it is not blank.
    2. ``settings.n8n_api_url`` when set (returned as configured).
    3. ``settings.n8n_webhook_url`` cut at its first ``/webhook`` and with
       trailing slashes removed.

    Args:
        explicit_url: Caller-supplied URL; ``None`` or blank to use settings.

    Returns:
        The resolved URL, or an empty string when nothing is configured.
    """
    candidate = explicit_url.strip() if explicit_url else ""
    if candidate:
        return candidate

    settings = database_module.settings
    configured = settings.n8n_api_url
    if configured:
        return configured

    webhook_url = settings.n8n_webhook_url
    if not webhook_url:
        return ""
    # Everything before the first "/webhook" is treated as the n8n base URL.
    base, _, _ = webhook_url.partition("/webhook")
    return base.rstrip("/")
@app.get('/')
def read_root():
    """Describe the service: its name, version and advertised endpoints."""
    # NOTE(review): the POST /init-db route defined in this module is not
    # listed here - confirm that the omission is intentional.
    advertised = [
        '/',
        '/health',
        '/generate',
        '/projects',
        '/status/{project_id}',
        '/audit/projects',
        '/audit/logs',
        '/audit/system/logs',
        '/audit/prompts',
        '/audit/changes',
        '/audit/lineage',
        '/audit/correlations',
        '/n8n/health',
        '/n8n/setup',
    ]
    return {
        'service': 'AI Software Factory',
        'version': __version__,
        'endpoints': advertised,
    }
@app.get('/health')
def health_check():
    """Report service health and the configured database backend.

    The status is always ``'healthy'``; no connection is attempted here.
    The backend name is chosen from the ``USE_SQLITE`` setting.
    """
    if database_module.settings.USE_SQLITE:
        backend = 'sqlite'
    else:
        backend = 'postgresql'
    return {'status': 'healthy', 'database': backend}
@app.post('/generate')
async def generate_software(request: SoftwareRequest, db: DbSession):
    """Run a software-generation request and report the recorded outcome.

    Initializes the database, builds a project id from the request name,
    runs the agent orchestrator, writes a system log entry, and returns the
    stored project history enriched with logs and orchestrator output.

    Args:
        request: Validated generation request body.
        db: Database session injected by FastAPI.

    Returns:
        ``{'status': <orchestrator status>, 'data': <project payload>}``,
        where the payload holds the serialized history row plus ``logs``,
        ``ui_data``, ``features``, ``tech_stack``, ``project_root`` and
        ``changed_files``.

    Raises:
        HTTPException: 500 when no history row exists for the new project
            id after the orchestrator has run.
    """
    database_module.init_db()
    project_id = _build_project_id(request.name)
    prompt_text = _compose_prompt_text(request)
    orchestrator = AgentOrchestrator(
        project_id=project_id,
        project_name=request.name,
        description=request.description,
        features=request.features,
        tech_stack=request.tech_stack,
        db=db,
        prompt_text=prompt_text,
    )
    result = await orchestrator.run()
    changed_files = result.get('changed_files', [])

    manager = DatabaseManager(db)
    manager.log_system_event(
        component='api',
        level='INFO' if result['status'] == 'completed' else 'ERROR',
        message=f"Generated project {project_id} with {len(changed_files)} artifact(s)",
    )

    history = manager.get_project_by_id(project_id)
    if history is None:
        # get_project_by_id can return None (the status endpoint guards for
        # it too); fail with an explicit error instead of an AttributeError
        # on ``history.id`` below.
        raise HTTPException(
            status_code=500,
            detail=f'Project {project_id} was not recorded',
        )

    project_logs = manager.get_project_logs(history.id)
    response_data = _serialize_project(history)
    response_data['logs'] = [_serialize_project_log(log) for log in project_logs]
    response_data['ui_data'] = result.get('ui_data')
    response_data['features'] = request.features
    response_data['tech_stack'] = request.tech_stack
    response_data['project_root'] = result.get('project_root', str(_project_root(project_id)))
    response_data['changed_files'] = changed_files
    return {'status': result['status'], 'data': response_data}
@app.get('/projects')
def list_projects(db: DbSession):
    """Return every recorded project, serialized for the API.

    Args:
        db: Database session injected by FastAPI.

    Returns:
        ``{'projects': [...]}`` with one serialized entry per project row.
    """
    all_rows = DatabaseManager(db).get_all_projects()
    return {'projects': list(map(_serialize_project, all_rows))}
@app.get('/status/{project_id}')
def get_project_status(project_id: str, db: DbSession):
    """Look up one project and return its serialized history row.

    Args:
        project_id: Project identifier taken from the URL path.
        db: Database session injected by FastAPI.

    Raises:
        HTTPException: 404 when no project with that id is recorded.
    """
    history = DatabaseManager(db).get_project_by_id(project_id)
    if history is not None:
        return _serialize_project(history)
    raise HTTPException(status_code=404, detail='Project not found')
@app.get('/audit/projects')
def get_audit_projects(db: DbSession):
    """Return all projects, each merged with its audit data.

    Args:
        db: Database session injected by FastAPI.

    Returns:
        ``{'projects': [...]}`` where each entry is the serialized project
        extended with ``logs``, ``actions`` and ``audit_trail``.
    """
    manager = DatabaseManager(db)

    def _with_audit(history):
        # Serialize first, then attach the three audit sections.
        entry = _serialize_project(history)
        audit = manager.get_project_audit_data(history.project_id)
        entry.update(
            logs=audit['logs'],
            actions=audit['actions'],
            audit_trail=audit['audit_trail'],
        )
        return entry

    return {'projects': [_with_audit(row) for row in manager.get_all_projects()]}
@app.get('/audit/prompts')
def get_prompt_audit(db: DbSession, project_id: str | None = Query(default=None)):
    """Return stored prompt events, optionally limited to one project.

    Args:
        db: Database session injected by FastAPI.
        project_id: Optional query parameter passed through to the manager.

    Returns:
        ``{'prompts': [...]}``.
    """
    events = DatabaseManager(db).get_prompt_events(project_id=project_id)
    return {'prompts': list(map(_serialize_audit_item, events))}
@app.get('/audit/changes')
def get_code_change_audit(db: DbSession, project_id: str | None = Query(default=None)):
    """Return recorded code changes, optionally limited to one project.

    Args:
        db: Database session injected by FastAPI.
        project_id: Optional query parameter passed through to the manager.

    Returns:
        ``{'changes': [...]}``.
    """
    recorded = DatabaseManager(db).get_code_changes(project_id=project_id)
    return {'changes': list(map(_serialize_audit_item, recorded))}
@app.get('/audit/lineage')
def get_prompt_change_lineage(db: DbSession, project_id: str | None = Query(default=None)):
    """Return prompt-to-code lineage rows, optionally for one project.

    Args:
        db: Database session injected by FastAPI.
        project_id: Optional query parameter passed through to the manager.

    Returns:
        ``{'lineage': ...}`` holding the manager's result unchanged.
    """
    links = DatabaseManager(db).get_prompt_change_links(project_id=project_id)
    return {'lineage': links}
@app.get('/audit/correlations')
def get_prompt_change_correlations(db: DbSession, project_id: str | None = Query(default=None)):
    """Return prompt-to-change correlations, optionally for one project.

    Args:
        db: Database session injected by FastAPI.
        project_id: Optional query parameter passed through to the manager.

    Returns:
        ``{'correlations': ...}`` holding the manager's result unchanged.
    """
    correlations = DatabaseManager(db).get_prompt_change_correlations(project_id=project_id)
    return {'correlations': correlations}
@app.get('/audit/logs')
def get_audit_logs(db: DbSession):
    """Return every project log, ordered by descending id.

    Args:
        db: Database session injected by FastAPI.

    Returns:
        ``{'logs': [...]}`` with one serialized entry per log row.
    """
    newest_first = db.query(ProjectLog).order_by(ProjectLog.id.desc())
    return {'logs': [_serialize_project_log(entry) for entry in newest_first.all()]}
@app.get('/audit/system/logs')
def get_system_audit_logs(
    db: DbSession,
    component: str | None = Query(default=None),
):
    """Return system logs by descending id, optionally for one component.

    Args:
        db: Database session injected by FastAPI.
        component: When non-empty, only logs whose ``component`` equals this
            value are returned; ``None`` or an empty string disables the
            filter.

    Returns:
        ``{'logs': [...]}`` with one serialized entry per log row.
    """
    log_query = db.query(SystemLog).order_by(SystemLog.id.desc())
    if component:
        log_query = log_query.filter(SystemLog.component == component)
    rows = log_query.all()
    return {'logs': [_serialize_system_log(row) for row in rows]}
@app.get('/n8n/health')
async def get_n8n_health():
    """Run the n8n agent's health check against the configured instance.

    Returns:
        An error payload (in a normal response, no exception is raised)
        when no n8n URL can be resolved from settings. Otherwise
        ``{'status': 'ok' | 'error', 'data': <agent result>}``, where the
        status is ``'error'`` if the result carries a truthy ``error`` entry.
    """
    api_url = _resolve_n8n_api_url()
    if not api_url:
        return {'status': 'error', 'message': 'N8N_API_URL or N8N_WEBHOOK_URL is not configured'}

    agent = N8NSetupAgent(
        api_url=api_url,
        webhook_token=database_module.settings.n8n_api_key,
    )
    result = await agent.health_check()
    status = 'error' if result.get('error') else 'ok'
    return {'status': status, 'data': result}
@app.post('/n8n/setup')
async def setup_n8n_workflow(request: N8NSetupRequest, db: DbSession):
    """Run the n8n setup agent and record the outcome as a system log.

    Request values take precedence over settings for the API URL, the API
    key and the backend URL. The backend URL defaults to the configured
    public backend URL with ``/generate`` appended.

    Args:
        request: Validated setup request body.
        db: Database session injected by FastAPI.

    Returns:
        The setup agent's result, unchanged.

    Raises:
        HTTPException: 400 when no n8n API URL can be resolved.
    """
    settings = database_module.settings

    api_url = _resolve_n8n_api_url(request.api_url)
    if not api_url:
        raise HTTPException(status_code=400, detail='n8n API URL is not configured')

    agent = N8NSetupAgent(
        api_url=api_url,
        webhook_token=(request.api_key or settings.n8n_api_key),
    )
    backend_url = request.backend_url or f"{settings.backend_public_url}/generate"
    result = await agent.setup(
        webhook_path=request.webhook_path,
        backend_url=backend_url,
        force_update=request.force_update,
        telegram_bot_token=settings.telegram_bot_token,
        telegram_credential_name=settings.n8n_telegram_credential_name,
    )

    # A result status of 'error' is logged at ERROR level, anything else at INFO.
    log_level = 'ERROR' if result.get('status') == 'error' else 'INFO'
    # Falls back to the JSON-encoded result when it has no 'message' entry.
    log_message = result.get('message', json.dumps(result))
    DatabaseManager(db).log_system_event(
        component='n8n',
        level=log_level,
        message=log_message,
    )
    return result
@app.post('/init-db')
def initialize_database():
    """Create the database tables and report the outcome.

    Failures are not raised: any exception from ``init_db`` is caught and
    returned as a payload with ``status`` set to ``'error'``.

    Returns:
        A dict with ``message`` and ``status`` (``'success'`` or ``'error'``).
    """
    try:
        database_module.init_db()
    except Exception as exc:
        # Deliberately broad: the caller gets the failure text in the body.
        return {'message': f'Error initializing database: {str(exc)}', 'status': 'error'}
    return {'message': 'Database tables created successfully', 'status': 'success'}
# Hand the FastAPI app to the frontend module at import time (per the module
# docstring this is where the NiceGUI UI gets hosted - confirm in frontend).
frontend.init(app)

if __name__ == '__main__':
    # Running this file directly does not start a server; it only prints a hint.
    print('Please start the app with the "uvicorn" command as shown in the start.sh script')