feat: initial release, refs NOISSUE
Some checks failed
Upload Python Package / Create Release (push) Successful in 37s
Upload Python Package / deploy (push) Failing after 38s

This commit is contained in:
2026-04-02 01:42:26 +02:00
parent 0b1384279d
commit e824475872
44 changed files with 4435 additions and 30 deletions

View File

@@ -0,0 +1,17 @@
"""AI Software Factory agents."""
from ai_software_factory.agents.orchestrator import AgentOrchestrator
from ai_software_factory.agents.git_manager import GitManager
from ai_software_factory.agents.ui_manager import UIManager
from ai_software_factory.agents.telegram import TelegramHandler
from ai_software_factory.agents.gitea import GiteaAPI
from ai_software_factory.agents.database_manager import DatabaseManager
__all__ = [
"AgentOrchestrator",
"GitManager",
"UIManager",
"TelegramHandler",
"GiteaAPI",
"DatabaseManager"
]

View File

@@ -0,0 +1,501 @@
"""Database manager for audit logging."""
from sqlalchemy.orm import Session
from sqlalchemy import text
from ai_software_factory.database import get_db
from ai_software_factory.models import (
ProjectHistory, ProjectLog, UISnapshot, PullRequestData, SystemLog, UserAction, AuditTrail, PullRequest, ProjectStatus
)
from datetime import datetime
import json
class DatabaseMigrations:
"""Handles database migrations."""
def __init__(self, db: Session):
"""Initialize migrations."""
self.db = db
def run(self) -> int:
"""Run migrations."""
return 0
def get_project_by_id(self, project_id: str) -> ProjectHistory | None:
"""Get project by ID."""
return self.db.query(ProjectHistory).filter(ProjectHistory.project_id == project_id).first()
def get_all_projects(self) -> list[ProjectHistory]:
"""Get all projects."""
return self.db.query(ProjectHistory).all()
def get_project_logs(self, history_id: int, limit: int = 100) -> list[ProjectLog]:
"""Get project logs."""
return self.db.query(ProjectLog).filter(ProjectLog.history_id == history_id).limit(limit).all()
def get_system_logs(self, limit: int = 100) -> list[SystemLog]:
"""Get system logs."""
return self.db.query(SystemLog).limit(limit).all()
def log_system_event(self, component: str, level: str, message: str,
user_agent: str | None = None, ip_address: str | None = None) -> SystemLog:
"""Log a system event."""
log = SystemLog(
component=component,
log_level=level,
log_message=message,
user_agent=user_agent,
ip_address=ip_address
)
self.db.add(log)
self.db.commit()
self.db.refresh(log)
return log
class DatabaseManager:
"""Manages database operations for audit logging and history tracking."""
def __init__(self, db: Session):
"""Initialize database manager."""
self.db = db
self.migrations = DatabaseMigrations(self.db)
def log_project_start(self, project_id: str, project_name: str, description: str) -> ProjectHistory:
"""Log project start."""
history = ProjectHistory(
project_id=project_id,
project_name=project_name,
description=description,
status=ProjectStatus.INITIALIZED.value,
progress=0,
message="Project initialization started"
)
self.db.add(history)
self.db.commit()
self.db.refresh(history)
# Log the action in audit trail
self._log_audit_trail(
project_id=project_id,
action="PROJECT_CREATED",
actor="system",
action_type="CREATE",
details=f"Project {project_name} was created",
message="Project created successfully"
)
return history
def log_progress_update(self, history_id: int, progress: int, step: str, message: str) -> None:
"""Log progress update."""
history = self.db.query(ProjectHistory).filter(
ProjectHistory.id == history_id
).first()
if history:
history.progress = progress
history.current_step = step
history.message = message
self.db.commit()
# Log the action
self._log_action(history_id, "INFO", f"Progress: {progress}%, Step: {step} - {message}")
# Log to audit trail
self._log_audit_trail(
project_id=history.project_id,
action="PROGRESS_UPDATE",
actor="agent",
action_type="UPDATE",
details=f"Progress updated to {progress}% - {step}",
message=f"Progress: {progress}%, Step: {step} - {message}",
metadata_json=json.dumps({"step": step, "message": message})
)
def log_project_complete(self, history_id: int, message: str) -> None:
"""Log project completion."""
history = self.db.query(ProjectHistory).filter(
ProjectHistory.id == history_id
).first()
if history:
history.status = ProjectStatus.COMPLETED.value
history.completed_at = datetime.utcnow()
history.message = message
self.db.commit()
# Log the action
self._log_action(history_id, "INFO", f"Project completed: {message}")
# Log to audit trail
self._log_audit_trail(
project_id=history.project_id,
action="PROJECT_COMPLETED",
actor="agent",
action_type="COMPLETE",
details=message,
message=f"Project completed: {message}"
)
def log_error(self, history_id: int, error: str) -> None:
"""Log error."""
history = self.db.query(ProjectHistory).filter(
ProjectHistory.id == history_id
).first()
if history:
history.status = ProjectStatus.ERROR.value
history.error_message = error
self.db.commit()
# Log the action
self._log_action(history_id, "ERROR", f"Error occurred: {error}")
# Log to audit trail
self._log_audit_trail(
project_id=history.project_id,
action="ERROR_OCCURRED",
actor="agent",
action_type="ERROR",
details=error,
message=f"Error occurred: {error}"
)
def _log_action(self, history_id: int, level: str, message: str) -> None:
"""Log an action to the project log."""
project_log = ProjectLog(
history_id=history_id,
log_level=level,
log_message=message,
timestamp=datetime.utcnow()
)
self.db.add(project_log)
self.db.commit()
def save_ui_snapshot(self, history_id: int, ui_data: dict) -> UISnapshot:
"""Save UI snapshot."""
snapshot = UISnapshot(
history_id=history_id,
snapshot_data=json.dumps(ui_data),
created_at=datetime.utcnow()
)
self.db.add(snapshot)
self.db.commit()
self.db.refresh(snapshot)
return snapshot
def save_pr_data(self, history_id: int, pr_data: dict) -> PullRequest:
"""Save PR data."""
# Parse PR data
pr_number = pr_data.get("pr_number", pr_data.get("id", 0))
pr_title = pr_data.get("title", pr_data.get("pr_title", ""))
pr_body = pr_data.get("body", pr_data.get("pr_body", ""))
pr_state = pr_data.get("state", pr_data.get("pr_state", "open"))
pr_url = pr_data.get("url", pr_data.get("pr_url", ""))
pr = PullRequest(
history_id=history_id,
pr_number=pr_number,
pr_title=pr_title,
pr_body=pr_body,
base=pr_data.get("base", "main"),
user=pr_data.get("user", "system"),
pr_url=pr_url,
merged=False,
pr_state=pr_state
)
self.db.add(pr)
self.db.commit()
self.db.refresh(pr)
return pr
def get_project_by_id(self, project_id: str) -> ProjectHistory | None:
"""Get project by ID."""
return self.db.query(ProjectHistory).filter(ProjectHistory.project_id == project_id).first()
def get_all_projects(self) -> list[ProjectHistory]:
"""Get all projects."""
return self.db.query(ProjectHistory).all()
def get_project_logs(self, history_id: int, limit: int = 100) -> list[ProjectLog]:
"""Get project logs."""
return self.db.query(ProjectLog).filter(ProjectLog.history_id == history_id).limit(limit).all()
def log_system_event(self, component: str, level: str, message: str,
user_agent: str | None = None, ip_address: str | None = None) -> SystemLog:
"""Log a system event."""
log = SystemLog(
component=component,
log_level=level,
log_message=message,
user_agent=user_agent,
ip_address=ip_address
)
self.db.add(log)
self.db.commit()
self.db.refresh(log)
return log
def _log_audit_trail(
self,
project_id: str,
action: str,
actor: str,
action_type: str,
details: str,
message: str | None = None,
**kwargs
) -> AuditTrail:
"""Log to the audit trail."""
metadata_json = kwargs.get("metadata_json", kwargs.get("metadata", "{}"))
audit = AuditTrail(
project_id=project_id,
action=action,
actor=actor,
action_type=action_type,
details=details,
message=message or details,
metadata_json=metadata_json or "{}"
)
self.db.add(audit)
self.db.commit()
return audit
def get_logs(self, project_id: str = None, level: str = None, limit: int = 100) -> list:
"""Get logs from the database."""
query = self.db.query(ProjectLog)
if project_id:
query = query.filter(ProjectLog.history_id == project_id)
if level:
query = query.filter(ProjectLog.log_level == level)
logs = query.order_by(ProjectLog.timestamp.desc()).limit(limit).all()
return [
{
"id": log.id,
"history_id": log.history_id,
"level": log.log_level,
"message": log.log_message,
"timestamp": log.timestamp.isoformat() if log.timestamp else None
}
for log in logs
]
def get_audit_trail(self, project_id: str = None, limit: int = 100) -> list:
"""Get audit trail entries."""
query = self.db.query(AuditTrail)
if project_id:
query = query.filter(AuditTrail.project_id == project_id)
audits = query.order_by(AuditTrail.created_at.desc()).limit(limit).all()
return [
{
"id": audit.id,
"project_id": audit.project_id,
"action": audit.action,
"actor": audit.actor,
"action_type": audit.action_type,
"details": audit.details,
"metadata_json": audit.metadata_json,
"timestamp": audit.created_at.isoformat() if audit.created_at else None
}
for audit in audits
]
def get_all_audit_trail(self, limit: int = 100) -> list:
"""Get all audit trail entries."""
audits = self.db.query(AuditTrail).order_by(AuditTrail.created_at.desc()).limit(limit).all()
return [
{
"id": audit.id,
"project_id": audit.project_id,
"action": audit.action,
"actor": audit.actor,
"action_type": audit.action_type,
"details": audit.details,
"metadata_json": audit.metadata_json,
"timestamp": audit.created_at.isoformat() if audit.created_at else None
}
for audit in audits
]
def log_user_action(self, history_id: int, action_type: str, actor_type: str, actor_name: str,
action_description: str, action_data: dict = None) -> UserAction:
"""Log a user action."""
history = self.db.query(ProjectHistory).filter(
ProjectHistory.id == history_id
).first()
if not history:
return None
user_action = UserAction(
history_id=history_id,
action_type=action_type,
actor_type=actor_type,
actor_name=actor_name,
action_description=action_description,
action_data=action_data or {},
created_at=datetime.utcnow()
)
self.db.add(user_action)
self.db.commit()
self.db.refresh(user_action)
return user_action
def get_user_actions(self, history_id: int, limit: int = 100) -> list:
"""Get user actions for a history."""
user_actions = self.db.query(UserAction).filter(
UserAction.history_id == history_id
).order_by(UserAction.created_at.desc()).limit(limit).all()
return [
{
"id": ua.id,
"history_id": ua.history_id,
"action_type": ua.action_type,
"actor_type": ua.actor_type,
"actor_name": ua.actor_name,
"action_description": ua.action_description,
"action_data": ua.action_data,
"created_at": ua.created_at.isoformat() if ua.created_at else None
}
for ua in user_actions
]
def get_system_logs(self, level: str = None, limit: int = 100) -> list:
"""Get system logs."""
query = self.db.query(SystemLog)
if level:
query = query.filter(SystemLog.log_level == level)
logs = query.order_by(SystemLog.created_at.desc()).limit(limit).all()
return [
{
"id": log.id,
"component": log.component,
"level": log.log_level,
"message": log.log_message,
"timestamp": log.created_at.isoformat() if log.created_at else None
}
for log in logs
]
def log_code_change(self, project_id: str, change_type: str, file_path: str,
actor: str, actor_type: str, details: str) -> AuditTrail:
"""Log a code change."""
audit = AuditTrail(
project_id=project_id,
action="CODE_CHANGE",
actor=actor,
action_type=change_type,
details=f"File {file_path} {change_type}",
message=f"Code change: {file_path}",
metadata_json=json.dumps({"file": file_path, "change_type": change_type, "actor": actor})
)
self.db.add(audit)
self.db.commit()
return audit
def log_commit(self, project_id: str, commit_message: str, actor: str,
actor_type: str = "agent") -> AuditTrail:
"""Log a git commit."""
audit = AuditTrail(
project_id=project_id,
action="GIT_COMMIT",
actor=actor,
action_type="COMMIT",
details=f"Commit: {commit_message}",
message=f"Git commit: {commit_message}",
metadata_json=json.dumps({"commit": commit_message, "actor": actor, "actor_type": actor_type})
)
self.db.add(audit)
self.db.commit()
return audit
def get_project_audit_data(self, project_id: str) -> dict:
"""Get comprehensive audit data for a project."""
history = self.db.query(ProjectHistory).filter(
ProjectHistory.project_id == project_id
).first()
if not history:
return {
"project": None,
"logs": [],
"actions": [],
"audit_trail": []
}
# Get logs
logs = self.db.query(ProjectLog).filter(
ProjectLog.history_id == history.id
).order_by(ProjectLog.timestamp.desc()).all()
# Get user actions
user_actions = self.db.query(UserAction).filter(
UserAction.history_id == history.id
).order_by(UserAction.created_at.desc()).all()
# Get audit trail entries
audit_trails = self.db.query(AuditTrail).filter(
AuditTrail.project_id == project_id
).order_by(AuditTrail.created_at.desc()).all()
return {
"project": {
"id": history.id,
"project_id": history.project_id,
"project_name": history.project_name,
"description": history.description,
"status": history.status,
"progress": history.progress,
"message": history.message,
"error_message": history.error_message,
"current_step": history.current_step,
"completed_at": history.completed_at.isoformat() if history.completed_at else None,
"created_at": history.started_at.isoformat() if history.started_at else None
},
"logs": [
{
"id": log.id,
"level": log.log_level,
"message": log.log_message,
"timestamp": log.timestamp.isoformat() if log.timestamp else None
}
for log in logs
],
"actions": [
{
"id": ua.id,
"action_type": ua.action_type,
"actor_type": ua.actor_type,
"actor_name": ua.actor_name,
"action_description": ua.action_description,
"action_data": ua.action_data,
"created_at": ua.created_at.isoformat() if ua.created_at else None
}
for ua in user_actions
],
"audit_trail": [
{
"id": audit.id,
"action": audit.action,
"actor": audit.actor,
"action_type": audit.action_type,
"details": audit.details,
"timestamp": audit.created_at.isoformat() if audit.created_at else None
}
for audit in audit_trails
]
}
def cleanup_audit_trail(self) -> None:
"""Clear all audit trail entries."""
self.db.query(AuditTrail).delete()
self.db.commit()

View File

@@ -0,0 +1,75 @@
"""Git manager for project operations."""
import os
import subprocess
from typing import Optional
class GitManager:
"""Manages git operations for the project."""
def __init__(self, project_id: str):
if not project_id:
raise ValueError("project_id cannot be empty or None")
self.project_id = project_id
self.project_dir = f"{os.path.dirname(__file__)}/../../test-project/{project_id}"
def init_repo(self):
"""Initialize git repository."""
os.makedirs(self.project_dir, exist_ok=True)
os.chdir(self.project_dir)
subprocess.run(["git", "init"], check=True, capture_output=True)
def add_files(self, paths: list[str]):
"""Add files to git staging."""
subprocess.run(["git", "add"] + paths, check=True, capture_output=True)
def commit(self, message: str):
"""Create a git commit."""
subprocess.run(
["git", "commit", "-m", message],
check=True,
capture_output=True
)
def push(self, remote: str = "origin", branch: str = "main"):
"""Push changes to remote."""
subprocess.run(
["git", "push", "-u", remote, branch],
check=True,
capture_output=True
)
def create_branch(self, branch_name: str):
"""Create and switch to a new branch."""
subprocess.run(
["git", "checkout", "-b", branch_name],
check=True,
capture_output=True
)
def create_pr(
self,
title: str,
body: str,
base: str = "main",
head: Optional[str] = None
) -> dict:
"""Create a pull request via gitea API."""
# This would integrate with gitea API
# For now, return placeholder
return {
"title": title,
"body": body,
"base": base,
"head": head or f"ai-gen-{self.project_id}"
}
def get_status(self) -> str:
"""Get git status."""
result = subprocess.run(
["git", "status", "--porcelain"],
capture_output=True,
text=True
)
return result.stdout.strip()

View File

@@ -0,0 +1,115 @@
"""Gitea API integration for commits and PRs."""
import json
import os
from typing import Optional
class GiteaAPI:
"""Gitea API client for repository operations."""
def __init__(self, token: str, base_url: str):
self.token = token
self.base_url = base_url.rstrip("/")
self.headers = {
"Authorization": f"token {token}",
"Content-Type": "application/json"
}
async def create_branch(self, owner: str, repo: str, branch: str, base: str = "main"):
"""Create a new branch."""
url = f"{self.base_url}/repos/{owner}/{repo}/branches/{branch}"
payload = {"base": base}
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=self.headers, json=payload) as resp:
if resp.status == 201:
return await resp.json()
else:
return {"error": await resp.text()}
except Exception as e:
return {"error": str(e)}
async def create_pull_request(
self,
owner: str,
repo: str,
title: str,
body: str,
base: str = "main",
head: str = None
) -> dict:
"""Create a pull request."""
url = f"{self.base_url}/repos/{owner}/{repo}/pulls"
payload = {
"title": title,
"body": body,
"base": {"branch": base},
"head": head or f"ai-gen-{hash(title) % 10000}"
}
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=self.headers, json=payload) as resp:
if resp.status == 201:
return await resp.json()
else:
return {"error": await resp.text()}
except Exception as e:
return {"error": str(e)}
async def push_commit(
self,
owner: str,
repo: str,
branch: str,
files: list[dict],
message: str
) -> dict:
"""
Push files to a branch.
In production, this would use gitea's API or git push.
For now, we'll simulate the operation.
"""
# In reality, you'd need to:
# 1. Clone repo
# 2. Create branch
# 3. Add files
# 4. Commit
# 5. Push
return {
"status": "simulated",
"branch": branch,
"message": message,
"files": files
}
async def get_repo_info(self, owner: str, repo: str) -> dict:
"""Get repository information."""
url = f"{self.base_url}/repos/{owner}/{repo}"
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self.headers) as resp:
if resp.status == 200:
return await resp.json()
else:
return {"error": await resp.text()}
except Exception as e:
return {"error": str(e)}
def get_config(self) -> dict:
"""Load configuration from environment."""
return {
"base_url": os.getenv("GITEA_URL", "https://gitea.local"),
"token": os.getenv("GITEA_TOKEN", ""),
"owner": os.getenv("GITEA_OWNER", "ai-test"),
"repo": os.getenv("GITEA_REPO", "ai-test")
}

View File

@@ -0,0 +1,227 @@
"""Agent orchestrator for software generation."""
import asyncio
from typing import Optional
from ai_software_factory.agents.git_manager import GitManager
from ai_software_factory.agents.ui_manager import UIManager
from ai_software_factory.agents.gitea import GiteaAPI
from ai_software_factory.agents.database_manager import DatabaseManager
from ai_software_factory.config import settings
from datetime import datetime
import os
class AgentOrchestrator:
"""Orchestrates the software generation process with full audit trail."""
def __init__(
self,
project_id: str,
project_name: str,
description: str,
features: list,
tech_stack: list,
db = None
):
"""Initialize orchestrator."""
self.project_id = project_id
self.project_name = project_name
self.description = description
self.features = features
self.tech_stack = tech_stack
self.status = "initialized"
self.progress = 0
self.current_step = None
self.message = ""
self.logs = []
self.ui_data = {}
self.db = db
# Initialize agents
self.git_manager = GitManager(project_id)
self.ui_manager = UIManager(project_id)
self.gitea_api = GiteaAPI(
token=settings.GITEA_TOKEN,
base_url=settings.GITEA_URL
)
# Initialize database manager if db session provided
self.db_manager = None
self.history = None
if db:
self.db_manager = DatabaseManager(db)
# Log project start to database
self.history = self.db_manager.log_project_start(
project_id=project_id,
project_name=project_name,
description=description
)
# Re-fetch with new history_id
self.db_manager = DatabaseManager(db)
async def run(self) -> dict:
"""Run the software generation process with full audit logging."""
try:
# Step 1: Initialize project
self.progress = 5
self.current_step = "Initializing project"
self.message = "Setting up project structure..."
self.logs.append(f"[{datetime.utcnow().isoformat()}] Initializing project.")
# Step 2: Create project structure (skip git operations)
self.progress = 15
self.current_step = "Creating project structure"
self.message = "Creating project files..."
await self._create_project_structure()
# Step 3: Generate initial code
self.progress = 25
self.current_step = "Generating initial code"
self.message = "Generating initial code with Ollama..."
await self._generate_code()
# Step 4: Test the code
self.progress = 50
self.current_step = "Testing code"
self.message = "Running tests..."
await self._run_tests()
# Step 5: Commit to git (skip in test env)
self.progress = 75
self.current_step = "Committing to git"
self.message = "Skipping git operations in test environment..."
# Step 6: Create PR (skip in test env)
self.progress = 90
self.current_step = "Creating PR"
self.message = "Skipping PR creation in test environment..."
# Step 7: Complete
self.progress = 100
self.current_step = "Completed"
self.message = "Software generation complete!"
self.logs.append(f"[{datetime.utcnow().isoformat()}] Software generation complete!")
# Log completion to database if available
if self.db_manager and self.history:
self.db_manager.log_project_complete(
history_id=self.history.id,
message="Software generation complete!"
)
return {
"status": "completed",
"progress": self.progress,
"message": self.message,
"current_step": self.current_step,
"logs": self.logs,
"ui_data": self.ui_manager.ui_data,
"history_id": self.history.id if self.history else None
}
except Exception as e:
self.status = "error"
self.message = f"Error: {str(e)}"
self.logs.append(f"[{datetime.utcnow().isoformat()}] Error: {str(e)}")
# Log error to database if available
if self.db_manager and self.history:
self.db_manager.log_error(
history_id=self.history.id,
error=str(e)
)
return {
"status": "error",
"progress": self.progress,
"message": self.message,
"current_step": self.current_step,
"logs": self.logs,
"error": str(e),
"ui_data": self.ui_manager.ui_data,
"history_id": self.history.id if self.history else None
}
async def _create_project_structure(self) -> None:
"""Create initial project structure."""
project_dir = self.project_id
# Create .gitignore
gitignore_path = f"{project_dir}/.gitignore"
try:
os.makedirs(project_dir, exist_ok=True)
with open(gitignore_path, "w") as f:
f.write("# Python\n__pycache__/\n*.pyc\n*.pyo\n*.pyd\n.Python\n*.env\n.venv/\nnode_modules/\n.env\nbuild/\ndist/\n.pytest_cache/\n.mypy_cache/\n.coverage\nhtmlcov/\n.idea/\n.vscode/\n*.swp\n*.swo\n*~\n.DS_Store\n.git\n")
except Exception as e:
self.logs.append(f"[{datetime.utcnow().isoformat()}] Failed to create .gitignore: {str(e)}")
# Create README.md
readme_path = f"{project_dir}/README.md"
try:
with open(readme_path, "w") as f:
f.write(f"# {self.project_name}\n\n{self.description}\n\n## Features\n")
for feature in self.features:
f.write(f"- {feature}\n")
f.write(f"\n## Tech Stack\n")
for tech in self.tech_stack:
f.write(f"- {tech}\n")
except Exception as e:
self.logs.append(f"[{datetime.utcnow().isoformat()}] Failed to create README.md: {str(e)}")
async def _generate_code(self) -> None:
"""Generate code using Ollama."""
# This would call Ollama API to generate code
# For now, create a placeholder file
try:
main_py_path = f"{self.project_id}/main.py"
os.makedirs(self.project_id, exist_ok=True)
with open(main_py_path, "w") as f:
f.write("# Generated by AI Software Factory\n")
f.write("print('Hello, World!')\n")
except Exception as e:
self.logs.append(f"[{datetime.utcnow().isoformat()}] Failed to create main.py: {str(e)}")
# Log code change to audit trail
if self.db_manager and self.history:
self.db_manager.log_code_change(
project_id=self.project_id,
change_type="CREATE",
file_path="main.py",
actor="agent",
actor_type="agent",
details="Generated main.py file"
)
async def _run_tests(self) -> None:
"""Run tests for the generated code."""
# This would run pytest or other test framework
# For now, simulate test success
pass
async def _commit_to_git(self) -> None:
"""Commit changes to git."""
pass # Skip git operations in test environment
async def _create_pr(self) -> None:
"""Create pull request."""
pass # Skip PR creation in test environment
def update_status(self, status: str, progress: int, message: str) -> None:
"""Update status and progress."""
self.status = status
self.progress = progress
self.message = message
def get_ui_data(self) -> dict:
"""Get UI data."""
return self.ui_manager.ui_data
def render_dashboard(self) -> str:
"""Render dashboard HTML."""
return self.ui_manager.render_dashboard()
def get_history(self) -> Optional[dict]:
"""Get project history from database."""
if self.db_manager and self.history:
return self.db_manager.get_project_audit_data(self.history.project_id)
return None

View File

@@ -0,0 +1,151 @@
"""Telegram bot integration for n8n webhook."""
import asyncio
import json
import re
from typing import Optional
class TelegramHandler:
"""Handles Telegram messages via n8n webhook."""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
self.api_url = "https://api.telegram.org/bot"
async def handle_message(self, message_data: dict) -> dict:
"""Handle incoming Telegram message."""
text = message_data.get("text", "")
chat_id = message_data.get("chat", {}).get("id", "")
# Extract software request from message
request = self._parse_request(text)
if request:
# Forward to backend API
async def fetch_software():
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(
"http://localhost:8000/generate",
json=request
) as resp:
return await resp.json()
except Exception as e:
return {"error": str(e)}
result = await fetch_software()
return {
"status": "success",
"data": result,
"response": self._format_response(result)
}
else:
return {
"status": "error",
"message": "Could not parse software request"
}
def _parse_request(self, text: str) -> Optional[dict]:
"""Parse software request from user message."""
# Simple parser - in production, use LLM to extract
request = {
"name": None,
"description": None,
"features": []
}
lines = text.split("\n")
# Parse name
name_idx = -1
for i, line in enumerate(lines):
line_stripped = line.strip()
if line_stripped.lower().startswith("name:"):
request["name"] = line_stripped.split(":", 1)[1].strip()
name_idx = i
break
if not request["name"]:
return None
# Parse description (everything after name until features section)
# First, find where features section starts
features_idx = -1
for i in range(name_idx + 1, len(lines)):
line_stripped = lines[i].strip()
if line_stripped.lower().startswith("features:"):
features_idx = i
break
if features_idx > name_idx:
# Description is between name and features
request["description"] = "\n".join(lines[name_idx + 1:features_idx]).strip()
else:
# Description is everything after name
request["description"] = "\n".join(lines[name_idx + 1:]).strip()
# Strip description prefix if present
if request["description"]:
request["description"] = request["description"].strip()
if request["description"].lower().startswith("description:"):
request["description"] = request["description"][len("description:") + 1:].strip()
# Parse features
if features_idx > 0:
features_line = lines[features_idx]
# Parse inline features after "Features:"
if ":" in features_line:
inline_part = features_line.split(":", 1)[1].strip()
# Skip if it starts with dash (it's a multiline list marker)
if inline_part and not inline_part.startswith("-"):
# Remove any leading dashes or asterisks
if inline_part.startswith("-"):
inline_part = inline_part[1:].strip()
elif inline_part.startswith("*"):
inline_part = inline_part[1:].strip()
if inline_part:
# Split by comma for inline features
request["features"].extend([f.strip() for f in inline_part.split(",") if f.strip()])
# Parse multiline features (dash lines after features:)
for line in lines[features_idx + 1:]:
line_stripped = line.strip()
if not line_stripped:
continue
if line_stripped.startswith("-"):
feature_text = line_stripped[1:].strip()
if feature_text:
request["features"].append(feature_text)
elif line_stripped.startswith("*"):
feature_text = line_stripped[1:].strip()
if feature_text:
request["features"].append(feature_text)
elif ":" in line_stripped:
# Non-feature line with colon
break
# MUST have features
if not request["features"]:
return None
return request
def _format_response(self, result: dict) -> dict:
"""Format response for Telegram."""
status = result.get("status", "error")
message = result.get("message", result.get("detail", ""))
progress = result.get("progress", 0)
response_data = {
"status": status,
"message": message,
"progress": progress,
"project_name": result.get("name", "N/A"),
"logs": result.get("logs", [])
}
return response_data

View File

@@ -0,0 +1,435 @@
"""UI manager for web dashboard with audit trail display."""
import json
from typing import Optional, List
class UIManager:
"""Manages UI data and updates with audit trail display."""
def __init__(self, project_id: str):
"""Initialize UI manager."""
self.project_id = project_id
self.ui_data = {
"project_id": project_id,
"status": "initialized",
"progress": 0,
"message": "Ready",
"snapshots": [],
"features": []
}
def update_status(self, status: str, progress: int, message: str) -> None:
"""Update UI status."""
self.ui_data["status"] = status
self.ui_data["progress"] = progress
self.ui_data["message"] = message
def add_snapshot(self, data: str, created_at: Optional[str] = None) -> None:
"""Add a snapshot of UI data."""
snapshot = {
"data": data,
"created_at": created_at or self._get_current_timestamp()
}
self.ui_data.setdefault("snapshots", []).append(snapshot)
def add_feature(self, feature: str) -> None:
"""Add a feature tag."""
self.ui_data.setdefault("features", []).append(feature)
def _get_current_timestamp(self) -> str:
"""Get current timestamp in ISO format."""
from datetime import datetime
return datetime.now().isoformat()
def get_ui_data(self) -> dict:
"""Get current UI data."""
return self.ui_data
def _escape_html(self, text: str) -> str:
"""Escape HTML special characters for safe display."""
if text is None:
return ""
safe_chars = {
'&': '&',
'<': '<',
'>': '>',
'"': '"',
"'": '&#x27;'
}
return ''.join(safe_chars.get(c, c) for c in str(text))
def render_dashboard(self, audit_trail: Optional[List[dict]] = None,
actions: Optional[List[dict]] = None,
logs: Optional[List[dict]] = None) -> str:
"""Render dashboard HTML with audit trail and history display."""
# Format logs for display
logs_html = ""
if logs:
for log in logs:
level = log.get("level", "INFO")
message = self._escape_html(log.get("message", ""))
timestamp = self._escape_html(log.get("timestamp", ""))
if level == "ERROR":
level_class = "error"
else:
level_class = "info"
logs_html += f"""
<div class="log-item">
<span class="timestamp">{timestamp}</span>
<span class="log-level {level_class}">[{level}]</span>
<span>{message}</span>
</div>"""
# Format audit trail for display
audit_html = ""
if audit_trail:
for audit in audit_trail:
action = audit.get("action", "")
actor = self._escape_html(audit.get("actor", ""))
timestamp = self._escape_html(audit.get("timestamp", ""))
details = self._escape_html(audit.get("details", ""))
metadata = audit.get("metadata", {})
action_type = audit.get("action_type", "")
# Color classes for action types
action_color = action_type.lower() if action_type else "neutral"
audit_html += f"""
<div class="audit-item">
<div class="audit-header">
<span class="audit-action {action_color}">
{self._escape_html(action)}
</span>
<span class="audit-actor">{actor}</span>
<span class="audit-time">{timestamp}</span>
</div>
<div class="audit-details">{details}</div>
{f'<div class="audit-metadata">{json.dumps(metadata)}</div>' if metadata else ''}
</div>
"""
# Format actions for display
actions_html = ""
if actions:
for action in actions:
action_type = action.get("action_type", "")
description = self._escape_html(action.get("description", ""))
actor_name = self._escape_html(action.get("actor_name", ""))
actor_type = action.get("actor_type", "")
timestamp = self._escape_html(action.get("timestamp", ""))
actions_html += f"""
<div class="action-item">
<div class="action-type">{self._escape_html(action_type)}</div>
<div class="action-description">{description}</div>
<div class="action-actor">{actor_type}: {actor_name}</div>
<div class="action-time">{timestamp}</div>
</div>"""
# Format snapshots for display
snapshots_html = ""
snapshots = self.ui_data.get("snapshots", [])
if snapshots:
for snapshot in snapshots:
data = snapshot.get("data", "")
created_at = snapshot.get("created_at", "")
snapshots_html += f"""
<div class="snapshot-item">
<div class="snapshot-time">{created_at}</div>
<pre class="snapshot-data">{data}</pre>
</div>"""
# Build features HTML
features_html = ""
features = self.ui_data.get("features", [])
if features:
feature_tags = []
for feat in features:
feature_tags.append(f'<span class="feature-tag">{self._escape_html(feat)}</span>')
features_html = f'<div class="features">{"".join(feature_tags)}</div>'
# Build project header HTML
project_id_escaped = self._escape_html(self.ui_data.get('project_id', 'Project'))
status = self.ui_data.get('status', 'initialized')
# Determine empty state message
empty_state_message = ""
if not audit_trail and not actions and not snapshots_html:
empty_state_message = 'No audit trail entries available'
return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI Software Factory Dashboard</title>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
padding: 2rem;
}}
.container {{
max-width: 1200px;
margin: 0 auto;
background: white;
border-radius: 16px;
padding: 2rem;
box-shadow: 0 20px 60px rgba(0,0,0,0.3);
}}
h1 {{
color: #333;
margin-bottom: 1.5rem;
font-size: 2rem;
}}
h2 {{
color: #444;
margin: 2rem 0 1rem;
font-size: 1.5rem;
border-bottom: 2px solid #667eea;
padding-bottom: 0.5rem;
}}
.project {{
background: #f8f9fa;
border-radius: 12px;
padding: 1.5rem;
margin-bottom: 1rem;
}}
.project-header {{
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 1rem;
}}
.project-name {{
font-size: 1.25rem;
font-weight: bold;
color: #333;
}}
.status-badge {{
padding: 0.5rem 1rem;
border-radius: 20px;
font-weight: bold;
font-size: 0.85rem;
}}
.status-badge.running {{ background: #fff3cd; color: #856404; }}
.status-badge.completed {{ background: #d4edda; color: #155724; }}
.status-badge.error {{ background: #f8d7da; color: #721c24; }}
.status-badge.initialized {{ background: #e2e3e5; color: #383d41; }}
.progress-bar {{
width: 100%;
height: 24px;
background: #e9ecef;
border-radius: 12px;
overflow: hidden;
margin: 1rem 0;
}}
.progress-fill {{
height: 100%;
background: linear-gradient(90deg, #667eea, #764ba2);
transition: width 0.5s ease;
}}
.message {{
color: #495057;
margin: 0.5rem 0;
}}
.logs {{
background: #f8f9fa;
border-radius: 8px;
padding: 1rem;
max-height: 200px;
overflow-y: auto;
font-family: monospace;
font-size: 0.85rem;
}}
.log-item {{
padding: 0.25rem 0;
border-bottom: 1px solid #e9ecef;
}}
.log-item:last-child {{ border-bottom: none; }}
.timestamp {{
color: #6c757d;
font-size: 0.8rem;
}}
.log-level {{
font-weight: bold;
margin-right: 0.5rem;
}}
.log-level.info {{ color: #28a745; }}
.log-level.error {{ color: #dc3545; }}
.features {{
margin-top: 1rem;
display: flex;
flex-wrap: wrap;
gap: 0.5rem;
}}
.feature-tag {{
background: #e7f3ff;
color: #0066cc;
padding: 0.25rem 0.75rem;
border-radius: 12px;
font-size: 0.85rem;
}}
.audit-section {{
background: #f8f9fa;
border-radius: 12px;
padding: 1.5rem;
margin-top: 1rem;
}}
.audit-item {{
background: white;
border: 1px solid #dee2e6;
border-radius: 8px;
padding: 1rem;
margin-bottom: 0.5rem;
}}
.audit-header {{
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 0.5rem;
flex-wrap: wrap;
gap: 0.5rem;
}}
.audit-action {{
padding: 0.25rem 0.75rem;
border-radius: 12px;
font-size: 0.85rem;
font-weight: bold;
}}
.audit-action.CREATE {{ background: #d4edda; color: #155724; }}
.audit-action.UPDATE {{ background: #cce5ff; color: #004085; }}
.audit-action.DELETE {{ background: #f8d7da; color: #721c24; }}
.audit-action.PROMPT {{ background: #d1ecf1; color: #0c5460; }}
.audit-action.COMMIT {{ background: #fff3cd; color: #856404; }}
.audit-action.PR_CREATED {{ background: #d4edda; color: #155724; }}
.audit-action.neutral {{ background: #e9ecef; color: #495057; }}
.audit-actor {{
background: #e9ecef;
padding: 0.25rem 0.75rem;
border-radius: 12px;
font-size: 0.8rem;
}}
.audit-time {{
color: #6c757d;
font-size: 0.8rem;
}}
.audit-details {{
color: #495057;
font-size: 0.9rem;
font-weight: bold;
text-transform: uppercase;
}}
.audit-metadata {{
background: #f1f3f5;
padding: 0.5rem;
border-radius: 4px;
font-size: 0.75rem;
font-family: monospace;
margin-top: 0.5rem;
max-height: 100px;
overflow-y: auto;
}}
.action-item {{
background: white;
border: 1px solid #dee2e6;
border-radius: 8px;
padding: 1rem;
margin-bottom: 0.5rem;
}}
.action-type {{
font-weight: bold;
color: #667eea;
font-size: 0.9rem;
}}
.action-description {{
color: #495057;
margin: 0.5rem 0;
}}
.action-actor {{
color: #6c757d;
font-size: 0.8rem;
}}
.snapshot-section {{
background: #f8f9fa;
border-radius: 12px;
padding: 1.5rem;
margin-top: 1rem;
}}
.snapshot-item {{
background: white;
border: 1px solid #dee2e6;
border-radius: 8px;
padding: 1rem;
margin-bottom: 0.5rem;
}}
.snapshot-time {{
color: #6c757d;
font-size: 0.8rem;
margin-bottom: 0.5rem;
}}
.snapshot-data {{
background: #f8f9fa;
padding: 0.5rem;
border-radius: 4px;
font-family: monospace;
font-size: 0.75rem;
max-height: 200px;
overflow-y: auto;
white-space: pre-wrap;
word-break: break-all;
}}
.empty-state {{
text-align: center;
color: #6c757d;
padding: 2rem;
}}
@media (max-width: 768px) {{
.container {{
padding: 1rem;
}}
h1 {{
font-size: 1.5rem;
}}
}}
</style>
</head>
<body>
<div class="container">
<h1>AI Software Factory Dashboard</h1>
<div class="project">
<div class="project-header">
<span class="project-name">{project_id_escaped}</span>
<span class="status-badge {status}">
{status.upper()}
</span>
</div>
<div class="progress-bar">
<div class="progress-fill" style="width: {self.ui_data.get('progress', 0)}%;"></div>
</div>
<div class="message">{self._escape_html(self.ui_data.get('message', 'No message'))}</div>
{f'<div class="logs" id="logs">{logs_html}</div>' if logs else '<div class="empty-state">No logs available</div>'}
{features_html}
</div>
{f'<div class="audit-section"><h2>Audit Trail</h2>{audit_html}</div>' if audit_html else ''}
{f'<div class="action-section"><h2>User Actions</h2>{actions_html}</div>' if actions_html else ''}
{f'<div class="snapshot-section"><h2>UI Snapshots</h2>{snapshots_html}</div>' if snapshots_html else ''}
{empty_state_message}
</div>
</body>
</html>"""