"""Database manager for audit logging.""" from sqlalchemy.orm import Session from sqlalchemy import text from database import get_db from models import ( ProjectHistory, ProjectLog, UISnapshot, PullRequestData, SystemLog, UserAction, AuditTrail, PullRequest, ProjectStatus ) from datetime import datetime import json class DatabaseMigrations: """Handles database migrations.""" def __init__(self, db: Session): """Initialize migrations.""" self.db = db def run(self) -> int: """Run migrations.""" return 0 def get_project_by_id(self, project_id: str) -> ProjectHistory | None: """Get project by ID.""" return self.db.query(ProjectHistory).filter(ProjectHistory.project_id == project_id).first() def get_all_projects(self) -> list[ProjectHistory]: """Get all projects.""" return self.db.query(ProjectHistory).all() def get_project_logs(self, history_id: int, limit: int = 100) -> list[ProjectLog]: """Get project logs.""" return self.db.query(ProjectLog).filter(ProjectLog.history_id == history_id).limit(limit).all() def get_system_logs(self, limit: int = 100) -> list[SystemLog]: """Get system logs.""" return self.db.query(SystemLog).limit(limit).all() def log_system_event(self, component: str, level: str, message: str, user_agent: str | None = None, ip_address: str | None = None) -> SystemLog: """Log a system event.""" log = SystemLog( component=component, log_level=level, log_message=message, user_agent=user_agent, ip_address=ip_address ) self.db.add(log) self.db.commit() self.db.refresh(log) return log class DatabaseManager: """Manages database operations for audit logging and history tracking.""" def __init__(self, db: Session): """Initialize database manager.""" self.db = db self.migrations = DatabaseMigrations(self.db) def log_project_start(self, project_id: str, project_name: str, description: str) -> ProjectHistory: """Log project start.""" history = ProjectHistory( project_id=project_id, project_name=project_name, description=description, status=ProjectStatus.INITIALIZED.value, progress=0, message="Project initialization started" ) self.db.add(history) self.db.commit() self.db.refresh(history) # Log the action in audit trail self._log_audit_trail( project_id=project_id, action="PROJECT_CREATED", actor="system", action_type="CREATE", details=f"Project {project_name} was created", message="Project created successfully" ) return history def log_progress_update(self, history_id: int, progress: int, step: str, message: str) -> None: """Log progress update.""" history = self.db.query(ProjectHistory).filter( ProjectHistory.id == history_id ).first() if history: history.progress = progress history.current_step = step history.message = message self.db.commit() # Log the action self._log_action(history_id, "INFO", f"Progress: {progress}%, Step: {step} - {message}") # Log to audit trail self._log_audit_trail( project_id=history.project_id, action="PROGRESS_UPDATE", actor="agent", action_type="UPDATE", details=f"Progress updated to {progress}% - {step}", message=f"Progress: {progress}%, Step: {step} - {message}", metadata_json=json.dumps({"step": step, "message": message}) ) def log_project_complete(self, history_id: int, message: str) -> None: """Log project completion.""" history = self.db.query(ProjectHistory).filter( ProjectHistory.id == history_id ).first() if history: history.status = ProjectStatus.COMPLETED.value history.completed_at = datetime.utcnow() history.message = message self.db.commit() # Log the action self._log_action(history_id, "INFO", f"Project completed: {message}") # Log to audit trail self._log_audit_trail( project_id=history.project_id, action="PROJECT_COMPLETED", actor="agent", action_type="COMPLETE", details=message, message=f"Project completed: {message}" ) def log_error(self, history_id: int, error: str) -> None: """Log error.""" history = self.db.query(ProjectHistory).filter( ProjectHistory.id == history_id ).first() if history: history.status = ProjectStatus.ERROR.value history.error_message = error self.db.commit() # Log the action self._log_action(history_id, "ERROR", f"Error occurred: {error}") # Log to audit trail self._log_audit_trail( project_id=history.project_id, action="ERROR_OCCURRED", actor="agent", action_type="ERROR", details=error, message=f"Error occurred: {error}" ) def _log_action(self, history_id: int, level: str, message: str) -> None: """Log an action to the project log.""" project_log = ProjectLog( history_id=history_id, log_level=level, log_message=message, timestamp=datetime.utcnow() ) self.db.add(project_log) self.db.commit() def save_ui_snapshot(self, history_id: int, ui_data: dict) -> UISnapshot: """Save UI snapshot.""" snapshot = UISnapshot( history_id=history_id, snapshot_data=json.dumps(ui_data), created_at=datetime.utcnow() ) self.db.add(snapshot) self.db.commit() self.db.refresh(snapshot) return snapshot def save_pr_data(self, history_id: int, pr_data: dict) -> PullRequest: """Save PR data.""" # Parse PR data pr_number = pr_data.get("pr_number", pr_data.get("id", 0)) pr_title = pr_data.get("title", pr_data.get("pr_title", "")) pr_body = pr_data.get("body", pr_data.get("pr_body", "")) pr_state = pr_data.get("state", pr_data.get("pr_state", "open")) pr_url = pr_data.get("url", pr_data.get("pr_url", "")) pr = PullRequest( history_id=history_id, pr_number=pr_number, pr_title=pr_title, pr_body=pr_body, base=pr_data.get("base", "main"), user=pr_data.get("user", "system"), pr_url=pr_url, merged=False, pr_state=pr_state ) self.db.add(pr) self.db.commit() self.db.refresh(pr) return pr def get_project_by_id(self, project_id: str) -> ProjectHistory | None: """Get project by ID.""" return self.db.query(ProjectHistory).filter(ProjectHistory.project_id == project_id).first() def get_all_projects(self) -> list[ProjectHistory]: """Get all projects.""" return self.db.query(ProjectHistory).all() def get_project_logs(self, history_id: int, limit: int = 100) -> list[ProjectLog]: """Get project logs.""" return self.db.query(ProjectLog).filter(ProjectLog.history_id == history_id).limit(limit).all() def log_system_event(self, component: str, level: str, message: str, user_agent: str | None = None, ip_address: str | None = None) -> SystemLog: """Log a system event.""" log = SystemLog( component=component, log_level=level, log_message=message, user_agent=user_agent, ip_address=ip_address ) self.db.add(log) self.db.commit() self.db.refresh(log) return log def _log_audit_trail( self, project_id: str, action: str, actor: str, action_type: str, details: str, message: str | None = None, **kwargs ) -> AuditTrail: """Log to the audit trail.""" metadata_json = kwargs.get("metadata_json", kwargs.get("metadata", "{}")) audit = AuditTrail( project_id=project_id, action=action, actor=actor, action_type=action_type, details=details, message=message or details, metadata_json=metadata_json or "{}" ) self.db.add(audit) self.db.commit() return audit def get_logs(self, project_id: str = None, level: str = None, limit: int = 100) -> list: """Get logs from the database.""" query = self.db.query(ProjectLog) if project_id: query = query.filter(ProjectLog.history_id == project_id) if level: query = query.filter(ProjectLog.log_level == level) logs = query.order_by(ProjectLog.timestamp.desc()).limit(limit).all() return [ { "id": log.id, "history_id": log.history_id, "level": log.log_level, "message": log.log_message, "timestamp": log.timestamp.isoformat() if log.timestamp else None } for log in logs ] def get_audit_trail(self, project_id: str = None, limit: int = 100) -> list: """Get audit trail entries.""" query = self.db.query(AuditTrail) if project_id: query = query.filter(AuditTrail.project_id == project_id) audits = query.order_by(AuditTrail.created_at.desc()).limit(limit).all() return [ { "id": audit.id, "project_id": audit.project_id, "action": audit.action, "actor": audit.actor, "action_type": audit.action_type, "details": audit.details, "metadata_json": audit.metadata_json, "timestamp": audit.created_at.isoformat() if audit.created_at else None } for audit in audits ] def get_all_audit_trail(self, limit: int = 100) -> list: """Get all audit trail entries.""" audits = self.db.query(AuditTrail).order_by(AuditTrail.created_at.desc()).limit(limit).all() return [ { "id": audit.id, "project_id": audit.project_id, "action": audit.action, "actor": audit.actor, "action_type": audit.action_type, "details": audit.details, "metadata_json": audit.metadata_json, "timestamp": audit.created_at.isoformat() if audit.created_at else None } for audit in audits ] def log_user_action(self, history_id: int, action_type: str, actor_type: str, actor_name: str, action_description: str, action_data: dict = None) -> UserAction: """Log a user action.""" history = self.db.query(ProjectHistory).filter( ProjectHistory.id == history_id ).first() if not history: return None user_action = UserAction( history_id=history_id, action_type=action_type, actor_type=actor_type, actor_name=actor_name, action_description=action_description, action_data=action_data or {}, created_at=datetime.utcnow() ) self.db.add(user_action) self.db.commit() self.db.refresh(user_action) return user_action def get_user_actions(self, history_id: int, limit: int = 100) -> list: """Get user actions for a history.""" user_actions = self.db.query(UserAction).filter( UserAction.history_id == history_id ).order_by(UserAction.created_at.desc()).limit(limit).all() return [ { "id": ua.id, "history_id": ua.history_id, "action_type": ua.action_type, "actor_type": ua.actor_type, "actor_name": ua.actor_name, "action_description": ua.action_description, "action_data": ua.action_data, "created_at": ua.created_at.isoformat() if ua.created_at else None } for ua in user_actions ] def get_system_logs(self, level: str = None, limit: int = 100) -> list: """Get system logs.""" query = self.db.query(SystemLog) if level: query = query.filter(SystemLog.log_level == level) logs = query.order_by(SystemLog.created_at.desc()).limit(limit).all() return [ { "id": log.id, "component": log.component, "level": log.log_level, "message": log.log_message, "timestamp": log.created_at.isoformat() if log.created_at else None } for log in logs ] def log_code_change(self, project_id: str, change_type: str, file_path: str, actor: str, actor_type: str, details: str) -> AuditTrail: """Log a code change.""" audit = AuditTrail( project_id=project_id, action="CODE_CHANGE", actor=actor, action_type=change_type, details=f"File {file_path} {change_type}", message=f"Code change: {file_path}", metadata_json=json.dumps({"file": file_path, "change_type": change_type, "actor": actor}) ) self.db.add(audit) self.db.commit() return audit def log_commit(self, project_id: str, commit_message: str, actor: str, actor_type: str = "agent") -> AuditTrail: """Log a git commit.""" audit = AuditTrail( project_id=project_id, action="GIT_COMMIT", actor=actor, action_type="COMMIT", details=f"Commit: {commit_message}", message=f"Git commit: {commit_message}", metadata_json=json.dumps({"commit": commit_message, "actor": actor, "actor_type": actor_type}) ) self.db.add(audit) self.db.commit() return audit def get_project_audit_data(self, project_id: str) -> dict: """Get comprehensive audit data for a project.""" history = self.db.query(ProjectHistory).filter( ProjectHistory.project_id == project_id ).first() if not history: return { "project": None, "logs": [], "actions": [], "audit_trail": [] } # Get logs logs = self.db.query(ProjectLog).filter( ProjectLog.history_id == history.id ).order_by(ProjectLog.timestamp.desc()).all() # Get user actions user_actions = self.db.query(UserAction).filter( UserAction.history_id == history.id ).order_by(UserAction.created_at.desc()).all() # Get audit trail entries audit_trails = self.db.query(AuditTrail).filter( AuditTrail.project_id == project_id ).order_by(AuditTrail.created_at.desc()).all() return { "project": { "id": history.id, "project_id": history.project_id, "project_name": history.project_name, "description": history.description, "status": history.status, "progress": history.progress, "message": history.message, "error_message": history.error_message, "current_step": history.current_step, "completed_at": history.completed_at.isoformat() if history.completed_at else None, "created_at": history.started_at.isoformat() if history.started_at else None }, "logs": [ { "id": log.id, "level": log.log_level, "message": log.log_message, "timestamp": log.timestamp.isoformat() if log.timestamp else None } for log in logs ], "actions": [ { "id": ua.id, "action_type": ua.action_type, "actor_type": ua.actor_type, "actor_name": ua.actor_name, "action_description": ua.action_description, "action_data": ua.action_data, "created_at": ua.created_at.isoformat() if ua.created_at else None } for ua in user_actions ], "audit_trail": [ { "id": audit.id, "action": audit.action, "actor": audit.actor, "action_type": audit.action_type, "details": audit.details, "timestamp": audit.created_at.isoformat() if audit.created_at else None } for audit in audit_trails ] } def cleanup_audit_trail(self) -> None: """Clear all audit trail entries.""" self.db.query(AuditTrail).delete() self.db.commit()