generated from Templates/Docker_Image
501 lines
18 KiB
Python
501 lines
18 KiB
Python
"""Database manager for audit logging."""
|
|
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import text
|
|
from ai_software_factory.database import get_db
|
|
from ai_software_factory.models import (
|
|
ProjectHistory, ProjectLog, UISnapshot, PullRequestData, SystemLog, UserAction, AuditTrail, PullRequest, ProjectStatus
|
|
)
|
|
from datetime import datetime
|
|
import json
|
|
|
|
|
|
class DatabaseMigrations:
|
|
"""Handles database migrations."""
|
|
|
|
def __init__(self, db: Session):
|
|
"""Initialize migrations."""
|
|
self.db = db
|
|
|
|
def run(self) -> int:
|
|
"""Run migrations."""
|
|
return 0
|
|
|
|
def get_project_by_id(self, project_id: str) -> ProjectHistory | None:
|
|
"""Get project by ID."""
|
|
return self.db.query(ProjectHistory).filter(ProjectHistory.project_id == project_id).first()
|
|
|
|
def get_all_projects(self) -> list[ProjectHistory]:
|
|
"""Get all projects."""
|
|
return self.db.query(ProjectHistory).all()
|
|
|
|
def get_project_logs(self, history_id: int, limit: int = 100) -> list[ProjectLog]:
|
|
"""Get project logs."""
|
|
return self.db.query(ProjectLog).filter(ProjectLog.history_id == history_id).limit(limit).all()
|
|
|
|
def get_system_logs(self, limit: int = 100) -> list[SystemLog]:
|
|
"""Get system logs."""
|
|
return self.db.query(SystemLog).limit(limit).all()
|
|
|
|
def log_system_event(self, component: str, level: str, message: str,
|
|
user_agent: str | None = None, ip_address: str | None = None) -> SystemLog:
|
|
"""Log a system event."""
|
|
log = SystemLog(
|
|
component=component,
|
|
log_level=level,
|
|
log_message=message,
|
|
user_agent=user_agent,
|
|
ip_address=ip_address
|
|
)
|
|
self.db.add(log)
|
|
self.db.commit()
|
|
self.db.refresh(log)
|
|
return log
|
|
|
|
|
|
class DatabaseManager:
|
|
"""Manages database operations for audit logging and history tracking."""
|
|
|
|
def __init__(self, db: Session):
|
|
"""Initialize database manager."""
|
|
self.db = db
|
|
self.migrations = DatabaseMigrations(self.db)
|
|
|
|
def log_project_start(self, project_id: str, project_name: str, description: str) -> ProjectHistory:
|
|
"""Log project start."""
|
|
history = ProjectHistory(
|
|
project_id=project_id,
|
|
project_name=project_name,
|
|
description=description,
|
|
status=ProjectStatus.INITIALIZED.value,
|
|
progress=0,
|
|
message="Project initialization started"
|
|
)
|
|
self.db.add(history)
|
|
self.db.commit()
|
|
self.db.refresh(history)
|
|
|
|
# Log the action in audit trail
|
|
self._log_audit_trail(
|
|
project_id=project_id,
|
|
action="PROJECT_CREATED",
|
|
actor="system",
|
|
action_type="CREATE",
|
|
details=f"Project {project_name} was created",
|
|
message="Project created successfully"
|
|
)
|
|
|
|
return history
|
|
|
|
def log_progress_update(self, history_id: int, progress: int, step: str, message: str) -> None:
|
|
"""Log progress update."""
|
|
history = self.db.query(ProjectHistory).filter(
|
|
ProjectHistory.id == history_id
|
|
).first()
|
|
|
|
if history:
|
|
history.progress = progress
|
|
history.current_step = step
|
|
history.message = message
|
|
self.db.commit()
|
|
|
|
# Log the action
|
|
self._log_action(history_id, "INFO", f"Progress: {progress}%, Step: {step} - {message}")
|
|
|
|
# Log to audit trail
|
|
self._log_audit_trail(
|
|
project_id=history.project_id,
|
|
action="PROGRESS_UPDATE",
|
|
actor="agent",
|
|
action_type="UPDATE",
|
|
details=f"Progress updated to {progress}% - {step}",
|
|
message=f"Progress: {progress}%, Step: {step} - {message}",
|
|
metadata_json=json.dumps({"step": step, "message": message})
|
|
)
|
|
|
|
def log_project_complete(self, history_id: int, message: str) -> None:
|
|
"""Log project completion."""
|
|
history = self.db.query(ProjectHistory).filter(
|
|
ProjectHistory.id == history_id
|
|
).first()
|
|
|
|
if history:
|
|
history.status = ProjectStatus.COMPLETED.value
|
|
history.completed_at = datetime.utcnow()
|
|
history.message = message
|
|
self.db.commit()
|
|
|
|
# Log the action
|
|
self._log_action(history_id, "INFO", f"Project completed: {message}")
|
|
|
|
# Log to audit trail
|
|
self._log_audit_trail(
|
|
project_id=history.project_id,
|
|
action="PROJECT_COMPLETED",
|
|
actor="agent",
|
|
action_type="COMPLETE",
|
|
details=message,
|
|
message=f"Project completed: {message}"
|
|
)
|
|
|
|
def log_error(self, history_id: int, error: str) -> None:
|
|
"""Log error."""
|
|
history = self.db.query(ProjectHistory).filter(
|
|
ProjectHistory.id == history_id
|
|
).first()
|
|
|
|
if history:
|
|
history.status = ProjectStatus.ERROR.value
|
|
history.error_message = error
|
|
self.db.commit()
|
|
|
|
# Log the action
|
|
self._log_action(history_id, "ERROR", f"Error occurred: {error}")
|
|
|
|
# Log to audit trail
|
|
self._log_audit_trail(
|
|
project_id=history.project_id,
|
|
action="ERROR_OCCURRED",
|
|
actor="agent",
|
|
action_type="ERROR",
|
|
details=error,
|
|
message=f"Error occurred: {error}"
|
|
)
|
|
|
|
def _log_action(self, history_id: int, level: str, message: str) -> None:
|
|
"""Log an action to the project log."""
|
|
project_log = ProjectLog(
|
|
history_id=history_id,
|
|
log_level=level,
|
|
log_message=message,
|
|
timestamp=datetime.utcnow()
|
|
)
|
|
self.db.add(project_log)
|
|
self.db.commit()
|
|
|
|
def save_ui_snapshot(self, history_id: int, ui_data: dict) -> UISnapshot:
|
|
"""Save UI snapshot."""
|
|
snapshot = UISnapshot(
|
|
history_id=history_id,
|
|
snapshot_data=json.dumps(ui_data),
|
|
created_at=datetime.utcnow()
|
|
)
|
|
self.db.add(snapshot)
|
|
self.db.commit()
|
|
self.db.refresh(snapshot)
|
|
return snapshot
|
|
|
|
def save_pr_data(self, history_id: int, pr_data: dict) -> PullRequest:
|
|
"""Save PR data."""
|
|
# Parse PR data
|
|
pr_number = pr_data.get("pr_number", pr_data.get("id", 0))
|
|
pr_title = pr_data.get("title", pr_data.get("pr_title", ""))
|
|
pr_body = pr_data.get("body", pr_data.get("pr_body", ""))
|
|
pr_state = pr_data.get("state", pr_data.get("pr_state", "open"))
|
|
pr_url = pr_data.get("url", pr_data.get("pr_url", ""))
|
|
|
|
pr = PullRequest(
|
|
history_id=history_id,
|
|
pr_number=pr_number,
|
|
pr_title=pr_title,
|
|
pr_body=pr_body,
|
|
base=pr_data.get("base", "main"),
|
|
user=pr_data.get("user", "system"),
|
|
pr_url=pr_url,
|
|
merged=False,
|
|
pr_state=pr_state
|
|
)
|
|
self.db.add(pr)
|
|
self.db.commit()
|
|
self.db.refresh(pr)
|
|
return pr
|
|
|
|
def get_project_by_id(self, project_id: str) -> ProjectHistory | None:
|
|
"""Get project by ID."""
|
|
return self.db.query(ProjectHistory).filter(ProjectHistory.project_id == project_id).first()
|
|
|
|
def get_all_projects(self) -> list[ProjectHistory]:
|
|
"""Get all projects."""
|
|
return self.db.query(ProjectHistory).all()
|
|
|
|
def get_project_logs(self, history_id: int, limit: int = 100) -> list[ProjectLog]:
|
|
"""Get project logs."""
|
|
return self.db.query(ProjectLog).filter(ProjectLog.history_id == history_id).limit(limit).all()
|
|
|
|
def log_system_event(self, component: str, level: str, message: str,
|
|
user_agent: str | None = None, ip_address: str | None = None) -> SystemLog:
|
|
"""Log a system event."""
|
|
log = SystemLog(
|
|
component=component,
|
|
log_level=level,
|
|
log_message=message,
|
|
user_agent=user_agent,
|
|
ip_address=ip_address
|
|
)
|
|
self.db.add(log)
|
|
self.db.commit()
|
|
self.db.refresh(log)
|
|
return log
|
|
|
|
def _log_audit_trail(
|
|
self,
|
|
project_id: str,
|
|
action: str,
|
|
actor: str,
|
|
action_type: str,
|
|
details: str,
|
|
message: str | None = None,
|
|
**kwargs
|
|
) -> AuditTrail:
|
|
"""Log to the audit trail."""
|
|
metadata_json = kwargs.get("metadata_json", kwargs.get("metadata", "{}"))
|
|
audit = AuditTrail(
|
|
project_id=project_id,
|
|
action=action,
|
|
actor=actor,
|
|
action_type=action_type,
|
|
details=details,
|
|
message=message or details,
|
|
metadata_json=metadata_json or "{}"
|
|
)
|
|
self.db.add(audit)
|
|
self.db.commit()
|
|
return audit
|
|
|
|
def get_logs(self, project_id: str = None, level: str = None, limit: int = 100) -> list:
|
|
"""Get logs from the database."""
|
|
query = self.db.query(ProjectLog)
|
|
|
|
if project_id:
|
|
query = query.filter(ProjectLog.history_id == project_id)
|
|
|
|
if level:
|
|
query = query.filter(ProjectLog.log_level == level)
|
|
|
|
logs = query.order_by(ProjectLog.timestamp.desc()).limit(limit).all()
|
|
return [
|
|
{
|
|
"id": log.id,
|
|
"history_id": log.history_id,
|
|
"level": log.log_level,
|
|
"message": log.log_message,
|
|
"timestamp": log.timestamp.isoformat() if log.timestamp else None
|
|
}
|
|
for log in logs
|
|
]
|
|
|
|
def get_audit_trail(self, project_id: str = None, limit: int = 100) -> list:
|
|
"""Get audit trail entries."""
|
|
query = self.db.query(AuditTrail)
|
|
|
|
if project_id:
|
|
query = query.filter(AuditTrail.project_id == project_id)
|
|
|
|
audits = query.order_by(AuditTrail.created_at.desc()).limit(limit).all()
|
|
return [
|
|
{
|
|
"id": audit.id,
|
|
"project_id": audit.project_id,
|
|
"action": audit.action,
|
|
"actor": audit.actor,
|
|
"action_type": audit.action_type,
|
|
"details": audit.details,
|
|
"metadata_json": audit.metadata_json,
|
|
"timestamp": audit.created_at.isoformat() if audit.created_at else None
|
|
}
|
|
for audit in audits
|
|
]
|
|
|
|
def get_all_audit_trail(self, limit: int = 100) -> list:
|
|
"""Get all audit trail entries."""
|
|
audits = self.db.query(AuditTrail).order_by(AuditTrail.created_at.desc()).limit(limit).all()
|
|
return [
|
|
{
|
|
"id": audit.id,
|
|
"project_id": audit.project_id,
|
|
"action": audit.action,
|
|
"actor": audit.actor,
|
|
"action_type": audit.action_type,
|
|
"details": audit.details,
|
|
"metadata_json": audit.metadata_json,
|
|
"timestamp": audit.created_at.isoformat() if audit.created_at else None
|
|
}
|
|
for audit in audits
|
|
]
|
|
|
|
def log_user_action(self, history_id: int, action_type: str, actor_type: str, actor_name: str,
|
|
action_description: str, action_data: dict = None) -> UserAction:
|
|
"""Log a user action."""
|
|
history = self.db.query(ProjectHistory).filter(
|
|
ProjectHistory.id == history_id
|
|
).first()
|
|
|
|
if not history:
|
|
return None
|
|
|
|
user_action = UserAction(
|
|
history_id=history_id,
|
|
action_type=action_type,
|
|
actor_type=actor_type,
|
|
actor_name=actor_name,
|
|
action_description=action_description,
|
|
action_data=action_data or {},
|
|
created_at=datetime.utcnow()
|
|
)
|
|
self.db.add(user_action)
|
|
self.db.commit()
|
|
self.db.refresh(user_action)
|
|
return user_action
|
|
|
|
def get_user_actions(self, history_id: int, limit: int = 100) -> list:
|
|
"""Get user actions for a history."""
|
|
user_actions = self.db.query(UserAction).filter(
|
|
UserAction.history_id == history_id
|
|
).order_by(UserAction.created_at.desc()).limit(limit).all()
|
|
|
|
return [
|
|
{
|
|
"id": ua.id,
|
|
"history_id": ua.history_id,
|
|
"action_type": ua.action_type,
|
|
"actor_type": ua.actor_type,
|
|
"actor_name": ua.actor_name,
|
|
"action_description": ua.action_description,
|
|
"action_data": ua.action_data,
|
|
"created_at": ua.created_at.isoformat() if ua.created_at else None
|
|
}
|
|
for ua in user_actions
|
|
]
|
|
|
|
def get_system_logs(self, level: str = None, limit: int = 100) -> list:
|
|
"""Get system logs."""
|
|
query = self.db.query(SystemLog)
|
|
|
|
if level:
|
|
query = query.filter(SystemLog.log_level == level)
|
|
|
|
logs = query.order_by(SystemLog.created_at.desc()).limit(limit).all()
|
|
return [
|
|
{
|
|
"id": log.id,
|
|
"component": log.component,
|
|
"level": log.log_level,
|
|
"message": log.log_message,
|
|
"timestamp": log.created_at.isoformat() if log.created_at else None
|
|
}
|
|
for log in logs
|
|
]
|
|
|
|
def log_code_change(self, project_id: str, change_type: str, file_path: str,
|
|
actor: str, actor_type: str, details: str) -> AuditTrail:
|
|
"""Log a code change."""
|
|
audit = AuditTrail(
|
|
project_id=project_id,
|
|
action="CODE_CHANGE",
|
|
actor=actor,
|
|
action_type=change_type,
|
|
details=f"File {file_path} {change_type}",
|
|
message=f"Code change: {file_path}",
|
|
metadata_json=json.dumps({"file": file_path, "change_type": change_type, "actor": actor})
|
|
)
|
|
self.db.add(audit)
|
|
self.db.commit()
|
|
return audit
|
|
|
|
def log_commit(self, project_id: str, commit_message: str, actor: str,
|
|
actor_type: str = "agent") -> AuditTrail:
|
|
"""Log a git commit."""
|
|
audit = AuditTrail(
|
|
project_id=project_id,
|
|
action="GIT_COMMIT",
|
|
actor=actor,
|
|
action_type="COMMIT",
|
|
details=f"Commit: {commit_message}",
|
|
message=f"Git commit: {commit_message}",
|
|
metadata_json=json.dumps({"commit": commit_message, "actor": actor, "actor_type": actor_type})
|
|
)
|
|
self.db.add(audit)
|
|
self.db.commit()
|
|
return audit
|
|
|
|
def get_project_audit_data(self, project_id: str) -> dict:
|
|
"""Get comprehensive audit data for a project."""
|
|
history = self.db.query(ProjectHistory).filter(
|
|
ProjectHistory.project_id == project_id
|
|
).first()
|
|
|
|
if not history:
|
|
return {
|
|
"project": None,
|
|
"logs": [],
|
|
"actions": [],
|
|
"audit_trail": []
|
|
}
|
|
|
|
# Get logs
|
|
logs = self.db.query(ProjectLog).filter(
|
|
ProjectLog.history_id == history.id
|
|
).order_by(ProjectLog.timestamp.desc()).all()
|
|
|
|
# Get user actions
|
|
user_actions = self.db.query(UserAction).filter(
|
|
UserAction.history_id == history.id
|
|
).order_by(UserAction.created_at.desc()).all()
|
|
|
|
# Get audit trail entries
|
|
audit_trails = self.db.query(AuditTrail).filter(
|
|
AuditTrail.project_id == project_id
|
|
).order_by(AuditTrail.created_at.desc()).all()
|
|
|
|
return {
|
|
"project": {
|
|
"id": history.id,
|
|
"project_id": history.project_id,
|
|
"project_name": history.project_name,
|
|
"description": history.description,
|
|
"status": history.status,
|
|
"progress": history.progress,
|
|
"message": history.message,
|
|
"error_message": history.error_message,
|
|
"current_step": history.current_step,
|
|
"completed_at": history.completed_at.isoformat() if history.completed_at else None,
|
|
"created_at": history.started_at.isoformat() if history.started_at else None
|
|
},
|
|
"logs": [
|
|
{
|
|
"id": log.id,
|
|
"level": log.log_level,
|
|
"message": log.log_message,
|
|
"timestamp": log.timestamp.isoformat() if log.timestamp else None
|
|
}
|
|
for log in logs
|
|
],
|
|
"actions": [
|
|
{
|
|
"id": ua.id,
|
|
"action_type": ua.action_type,
|
|
"actor_type": ua.actor_type,
|
|
"actor_name": ua.actor_name,
|
|
"action_description": ua.action_description,
|
|
"action_data": ua.action_data,
|
|
"created_at": ua.created_at.isoformat() if ua.created_at else None
|
|
}
|
|
for ua in user_actions
|
|
],
|
|
"audit_trail": [
|
|
{
|
|
"id": audit.id,
|
|
"action": audit.action,
|
|
"actor": audit.actor,
|
|
"action_type": audit.action_type,
|
|
"details": audit.details,
|
|
"timestamp": audit.created_at.isoformat() if audit.created_at else None
|
|
}
|
|
for audit in audit_trails
|
|
]
|
|
}
|
|
|
|
def cleanup_audit_trail(self) -> None:
|
|
"""Clear all audit trail entries."""
|
|
self.db.query(AuditTrail).delete()
|
|
self.db.commit() |