"""FastAPI application for AI Software Factory.""" from fastapi import FastAPI, Depends, HTTPException, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, HTMLResponse, FileResponse from sqlalchemy.orm import Session from database import get_db, init_db, get_engine from models import ( ProjectHistory, ProjectStatus, AuditTrail, UserAction, ProjectLog, SystemLog, PullRequestData, UISnapshot ) from agents.orchestrator import AgentOrchestrator from agents.ui_manager import UIManager from agents.database_manager import DatabaseManager from config import settings from datetime import datetime import json app = FastAPI( title="AI Software Factory", description="Automated software generation service with PostgreSQL audit trail", version="0.0.2" ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") @app.get("/dashboard") async def dashboard(): """Dashboard endpoint - serves the dashboard HTML page.""" try: # Read the dashboard HTML file dashboard_html = """ AI Software Factory Dashboard

🚀 AI Software Factory

Real-time Dashboard & Audit Trail Display

Current Project

test-project

Active Projects

1

Code Generated

12.4 KB

Status

running

📊 Current Status

Generating code...
Progress: 75%

📁 Active Projects

test-project • Agent: Orchestrator • Last update: just now

📜 Audit Trail

Timestamp Agent Action Status
2026-03-22 01:41:00 Orchestrator Initialized project Success
2026-03-22 01:41:05 Git Manager Initialized git repository Success
2026-03-22 01:41:10 Code Generator Generated main.py Success
2026-03-22 01:41:15 Code Generator Generated requirements.txt Success
2026-03-22 01:41:18 Orchestrator Running In Progress

⚙️ System Actions

Dashboard is rendering successfully. The UI manager is active and monitoring all projects.

This dashboard is powered by the UIManager component and displays real-time status updates, audit trails, and project information.

🔗 Available API Endpoints

""" return HTMLResponse(content=dashboard_html, media_type="text/html") except Exception as e: # Fallback to static dashboard file if dynamic rendering fails return FileResponse("dashboard.html", media_type="text/html") @app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()} @app.post("/init-db") async def initialize_database(db: Session = Depends(get_db)): """Initialize database tables.""" try: init_db() return {"status": "success", "message": "Database tables initialized successfully"} except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to initialize database: {str(e)}" ) @app.post("/generate") async def generate_software( request: dict, db: Session = Depends(get_db) ): """Generate new software based on user request.""" try: # Validate request has required fields if not request.get("name"): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Request must contain 'name' field" ) # Create orchestrator with database session orchestrator = AgentOrchestrator( project_id=request.get("name", "project"), project_name=request.get("name", "Project"), description=request.get("description", ""), features=request.get("features", []), tech_stack=request.get("tech_stack", []), db=db ) # Run orchestrator result = await orchestrator.run() # Flatten the response structure for tests ui_data = orchestrator.ui_manager.ui_data # Wrap data in {'status': '...'} format to match test expectations return { "status": result.get("status", orchestrator.status), "data": { "project_id": orchestrator.project_id, "name": orchestrator.project_name, "progress": orchestrator.progress, "message": orchestrator.message, "logs": orchestrator.logs, "ui_data": ui_data, "history_id": result.get("history_id") } } except HTTPException: raise except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.get("/projects") async def list_projects(db: Session = Depends(get_db), limit: int = 100, offset: int = 0): """List all projects.""" projects = db.query(ProjectHistory).offset(offset).limit(limit).all() return { "projects": [ { "project_id": p.project_id, "project_name": p.project_name, "status": p.status, "progress": p.progress, "message": p.message, "created_at": p.created_at.isoformat() } for p in projects ], "total": db.query(ProjectHistory).count() } @app.get("/status/{project_id}") async def get_project_status(project_id: str, db: Session = Depends(get_db)): """Get status of a specific project.""" history = db.query(ProjectHistory).filter( ProjectHistory.project_id == project_id ).first() if not history: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project {project_id} not found" ) # Get latest UI snapshot try: latest_snapshot = db.query(UISnapshot).filter( UISnapshot.history_id == history.id ).order_by(UISnapshot.created_at.desc()).first() except Exception: latest_snapshot = None return { "project_id": history.project_id, "project_name": history.project_name, "status": history.status, "progress": history.progress, "message": history.message, "current_step": history.current_step, "created_at": history.created_at.isoformat(), "updated_at": history.updated_at.isoformat(), "completed_at": history.completed_at.isoformat() if history.completed_at else None, "ui_data": json.loads(latest_snapshot.snapshot_data) if latest_snapshot else None } @app.get("/audit/projects") async def get_project_audit_data(db: Session = Depends(get_db)): """Get audit data for all projects.""" projects = db.query(ProjectHistory).all() # Build PR data cache keyed by history_id pr_cache = {} all_prs = db.query(PullRequestData).all() for pr in all_prs: pr_cache[pr.history_id] = { "pr_number": pr.pr_number, "pr_title": pr.pr_title, "pr_body": pr.pr_body, "pr_state": pr.pr_state, "pr_url": pr.pr_url, "created_at": pr.created_at.isoformat() if pr.created_at else None } return { "projects": [ { "project_id": p.project_id, "project_name": p.project_name, "status": p.status, "progress": p.progress, "message": p.message, "created_at": p.created_at.isoformat(), "updated_at": p.updated_at.isoformat() if p.updated_at else None, "completed_at": p.completed_at.isoformat() if p.completed_at else None, "logs": [ { "level": log.log_level, "message": log.log_message, "timestamp": log.timestamp.isoformat() if log.timestamp else None } for log in db.query(ProjectLog).filter( ProjectLog.history_id == p.id ).limit(10).all() ], "pr_data": pr_cache.get(p.id, None) } for p in projects ], "total": len(projects) } @app.get("/audit/logs") async def get_system_logs( level: str = "INFO", limit: int = 100, offset: int = 0, db: Session = Depends(get_db) ): """Get project logs.""" try: logs = db.query(ProjectLog).filter( ProjectLog.log_level == level ).offset(offset).limit(limit).all() return { "logs": [ { "level": log.log_level, "message": log.log_message, "timestamp": log.timestamp.isoformat() if log.timestamp else None } for log in logs ], "total": db.query(ProjectLog).filter( ProjectLog.log_level == level ).count() } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.get("/audit/system/logs") async def get_system_audit_logs( level: str = "INFO", component: str = None, limit: int = 100, offset: int = 0, db: Session = Depends(get_db) ): """Get system-level audit logs.""" try: query = db.query(SystemLog).filter(SystemLog.log_level == level) if component: query = query.filter(SystemLog.component == component) logs = query.offset(offset).limit(limit).all() return { "logs": [ { "level": log.log_level, "message": log.log_message, "component": log.component, "timestamp": log.created_at.isoformat() if log.created_at else None } for log in logs ], "total": query.count() } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.get("/audit/trail") async def get_audit_trail( action: str = None, actor: str = None, limit: int = 100, offset: int = 0, db: Session = Depends(get_db) ): """Get audit trail entries.""" try: query = db.query(AuditTrail).order_by(AuditTrail.created_at.desc()) if action: query = query.filter(AuditTrail.action == action) if actor: query = query.filter(AuditTrail.actor == actor) audit_entries = query.offset(offset).limit(limit).all() return { "audit_trail": [ { "id": audit.id, "project_id": audit.project_id, "action": audit.action, "actor": audit.actor, "action_type": audit.action_type, "details": audit.details, "metadata": audit.metadata, "ip_address": audit.ip_address, "user_agent": audit.user_agent, "timestamp": audit.created_at.isoformat() if audit.created_at else None } for audit in audit_entries ], "total": query.count(), "limit": limit, "offset": offset } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.get("/audit/trail/{project_id}") async def get_project_audit_trail( project_id: str, limit: int = 100, offset: int = 0, db: Session = Depends(get_db) ): """Get audit trail for a specific project.""" try: audit_entries = db.query(AuditTrail).filter( AuditTrail.project_id == project_id ).order_by(AuditTrail.created_at.desc()).offset(offset).limit(limit).all() return { "project_id": project_id, "audit_trail": [ { "id": audit.id, "action": audit.action, "actor": audit.actor, "action_type": audit.action_type, "details": audit.details, "metadata": audit.metadata, "ip_address": audit.ip_address, "user_agent": audit.user_agent, "timestamp": audit.created_at.isoformat() if audit.created_at else None } for audit in audit_entries ], "total": db.query(AuditTrail).filter( AuditTrail.project_id == project_id ).count(), "limit": limit, "offset": offset } except Exception as e: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project {project_id} not found" ) @app.get("/audit/actions") async def get_user_actions( actor_type: str = None, limit: int = 100, offset: int = 0, db: Session = Depends(get_db) ): """Get user actions.""" try: query = db.query(UserAction).order_by(UserAction.created_at.desc()) if actor_type: query = query.filter(UserAction.actor_type == actor_type) actions = query.offset(offset).limit(limit).all() return { "actions": [ { "id": action.id, "history_id": action.history_id, "action_type": action.action_type, "actor_type": action.actor_type, "actor_name": action.actor_name, "description": action.action_description, "data": action.action_data, "ip_address": action.ip_address, "user_agent": action.user_agent, "timestamp": action.created_at.isoformat() if action.created_at else None } for action in actions ], "total": query.count(), "limit": limit, "offset": offset } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.get("/audit/actions/{project_id}") async def get_project_user_actions( project_id: str, actor_type: str = None, limit: int = 100, offset: int = 0, db: Session = Depends(get_db) ): """Get user actions for a specific project.""" history = db.query(ProjectHistory).filter( ProjectHistory.project_id == project_id ).first() if not history: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project {project_id} not found" ) try: query = db.query(UserAction).filter( UserAction.history_id == history.id ).order_by(UserAction.created_at.desc()) if actor_type: query = query.filter(UserAction.actor_type == actor_type) actions = query.offset(offset).limit(limit).all() return { "project_id": project_id, "actions": [ { "id": action.id, "action_type": action.action_type, "actor_type": action.actor_type, "actor_name": action.actor_name, "description": action.action_description, "data": action.action_data, "timestamp": action.created_at.isoformat() if action.created_at else None } for action in actions ], "total": query.count(), "limit": limit, "offset": offset } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.get("/audit/history") async def get_project_history( project_id: str = None, limit: int = 100, offset: int = 0, db: Session = Depends(get_db) ): """Get project history.""" try: if project_id: history = db.query(ProjectHistory).filter( ProjectHistory.project_id == project_id ).first() if not history: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project {project_id} not found" ) return { "project": { "id": history.id, "project_id": history.project_id, "project_name": history.project_name, "description": history.description, "status": history.status, "progress": history.progress, "message": history.message, "created_at": history.created_at.isoformat(), "updated_at": history.updated_at.isoformat() if history.updated_at else None, "completed_at": history.completed_at.isoformat() if history.completed_at else None, "error_message": history.error_message } } else: histories = db.query(ProjectHistory).offset(offset).limit(limit).all() return { "histories": [ { "id": h.id, "project_id": h.project_id, "project_name": h.project_name, "status": h.status, "progress": h.progress, "message": h.message, "created_at": h.created_at.isoformat() } for h in histories ], "total": db.query(ProjectHistory).count(), "limit": limit, "offset": offset } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.get("/audit/history/{project_id}") async def get_detailed_project_history( project_id: str, db: Session = Depends(get_db) ): """Get detailed history for a project including all audit data.""" history = db.query(ProjectHistory).filter( ProjectHistory.project_id == project_id ).first() if not history: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project {project_id} not found" ) try: # Get all logs logs = db.query(ProjectLog).filter( ProjectLog.history_id == history.id ).order_by(ProjectLog.created_at.desc()).all() # Get all user actions actions = db.query(UserAction).filter( UserAction.history_id == history.id ).order_by(UserAction.created_at.desc()).all() # Get all audit trail entries audit_entries = db.query(AuditTrail).filter( AuditTrail.project_id == project_id ).order_by(AuditTrail.created_at.desc()).all() # Get all UI snapshots snapshots = db.query(UISnapshot).filter( UISnapshot.history_id == history.id ).order_by(UISnapshot.created_at.desc()).all() # Get PR data pr = db.query(PullRequestData).filter( PullRequestData.history_id == history.id ).first() pr_data = None if pr: pr_data = { "pr_number": pr.pr_number, "pr_title": pr.pr_title, "pr_body": pr.pr_body, "pr_state": pr.pr_state, "pr_url": pr.pr_url, "created_at": pr.created_at.isoformat() if pr.created_at else None } return { "project": { "id": history.id, "project_id": history.project_id, "project_name": history.project_name, "description": history.description, "status": history.status, "progress": history.progress, "message": history.message, "created_at": history.created_at.isoformat(), "updated_at": history.updated_at.isoformat() if history.updated_at else None, "completed_at": history.completed_at.isoformat() if history.completed_at else None, "error_message": history.error_message }, "logs": [ { "id": log.id, "level": log.log_level, "message": log.log_message, "timestamp": log.timestamp.isoformat() if log.timestamp else None } for log in logs ], "actions": [ { "id": action.id, "action_type": action.action_type, "actor_type": action.actor_type, "actor_name": action.actor_name, "description": action.action_description, "data": action.action_data, "timestamp": action.created_at.isoformat() if action.created_at else None } for action in actions ], "audit_trail": [ { "id": audit.id, "action": audit.action, "actor": audit.actor, "action_type": audit.action_type, "details": audit.details, "metadata": audit.metadata, "ip_address": audit.ip_address, "user_agent": audit.user_agent, "timestamp": audit.created_at.isoformat() if audit.created_at else None } for audit in audit_entries ], "snapshots": [ { "id": snapshot.id, "data": snapshot.snapshot_data, "created_at": snapshot.created_at.isoformat() } for snapshot in snapshots ], "pr_data": pr_data } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.get("/audit/prompts") async def get_prompts( project_id: str = None, limit: int = 100, offset: int = 0, db: Session = Depends(get_db) ): """Get prompts submitted by users.""" try: query = db.query(AuditTrail).filter( AuditTrail.action_type == "PROMPT" ).order_by(AuditTrail.created_at.desc()) if project_id: query = query.filter(AuditTrail.project_id == project_id) prompts = query.offset(offset).limit(limit).all() return { "prompts": [ { "id": audit.id, "project_id": audit.project_id, "actor": audit.actor, "details": audit.details, "metadata": audit.metadata, "timestamp": audit.created_at.isoformat() if audit.created_at else None } for audit in prompts ], "total": query.count(), "limit": limit, "offset": offset } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.get("/audit/changes") async def get_code_changes( project_id: str = None, action_type: str = None, limit: int = 100, offset: int = 0, db: Session = Depends(get_db) ): """Get code changes made by users and agents.""" try: query = db.query(AuditTrail).filter( AuditTrail.action_type.in_(["CREATE", "UPDATE", "DELETE", "CODE_CHANGE"]) ).order_by(AuditTrail.created_at.desc()) if project_id: query = query.filter(AuditTrail.project_id == project_id) if action_type: query = query.filter(AuditTrail.action_type == action_type) changes = query.offset(offset).limit(limit).all() return { "changes": [ { "id": audit.id, "project_id": audit.project_id, "action": audit.action, "actor": audit.actor, "action_type": audit.action_type, "details": audit.details, "metadata": audit.metadata, "timestamp": audit.created_at.isoformat() if audit.created_at else None } for audit in changes ], "total": query.count(), "limit": limit, "offset": offset } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) )