# Files
#
# 501 lines
# 18 KiB
# Python

"""Database manager for audit logging."""
import json
from datetime import datetime, timezone

from sqlalchemy import text
from sqlalchemy.orm import Session

from database import get_db
from models import (
    ProjectHistory, ProjectLog, UISnapshot, PullRequestData, SystemLog,
    UserAction, AuditTrail, PullRequest, ProjectStatus,
)
class DatabaseMigrations:
    """Migration hook bundled with a few session-backed lookups.

    Every method works through the SQLAlchemy session handed to the
    constructor. Besides ``run`` the class offers simple project,
    project-log and system-log queries plus a system-event writer.
    """

    def __init__(self, db: Session):
        """Keep a reference to the session used by all other methods."""
        self.db = db

    def run(self) -> int:
        """Run migrations.

        Returns:
            Always ``0``; no migration work is performed here.
        """
        return 0

    def get_project_by_id(self, project_id: str) -> ProjectHistory | None:
        """Return the first project whose ``project_id`` matches, or ``None``."""
        matching = self.db.query(ProjectHistory).filter(
            ProjectHistory.project_id == project_id
        )
        return matching.first()

    def get_all_projects(self) -> list[ProjectHistory]:
        """Return every ``ProjectHistory`` row."""
        every_project = self.db.query(ProjectHistory)
        return every_project.all()

    def get_project_logs(self, history_id: int, limit: int = 100) -> list[ProjectLog]:
        """Return at most ``limit`` log rows attached to ``history_id``."""
        rows = self.db.query(ProjectLog)
        rows = rows.filter(ProjectLog.history_id == history_id)
        rows = rows.limit(limit)
        return rows.all()

    def get_system_logs(self, limit: int = 100) -> list[SystemLog]:
        """Return at most ``limit`` system log rows (no explicit ordering)."""
        rows = self.db.query(SystemLog).limit(limit)
        return rows.all()

    def log_system_event(self, component: str, level: str, message: str,
                         user_agent: str | None = None,
                         ip_address: str | None = None) -> SystemLog:
        """Persist one system event and return the refreshed row.

        Args:
            component: Stored in the row's ``component`` field.
            level: Stored in the row's ``log_level`` field.
            message: Stored in the row's ``log_message`` field.
            user_agent: Optional value for the ``user_agent`` field.
            ip_address: Optional value for the ``ip_address`` field.

        Returns:
            The ``SystemLog`` instance after commit and refresh.
        """
        fields = {
            "component": component,
            "log_level": level,
            "log_message": message,
            "user_agent": user_agent,
            "ip_address": ip_address,
        }
        entry = SystemLog(**fields)
        session = self.db
        session.add(entry)
        session.commit()
        # Reload the row so database-generated values are populated.
        session.refresh(entry)
        return entry
class DatabaseManager:
    """Manage database operations for audit logging and history tracking.

    All reads and writes go through the SQLAlchemy session passed to the
    constructor. Write methods commit immediately; when a commit fails
    the session is rolled back and the original exception is re-raised.
    """

    def __init__(self, db: Session):
        """Store the session and create a migrations helper bound to it."""
        self.db = db
        self.migrations = DatabaseMigrations(self.db)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated; this produces the same naive
        UTC value so stored timestamps keep their existing format.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _iso(moment: datetime | None) -> str | None:
        """Return ``moment.isoformat()``, or ``None`` when it is unset."""
        return moment.isoformat() if moment else None

    def _commit(self) -> None:
        """Commit the session, rolling back before re-raising on failure."""
        try:
            self.db.commit()
        except Exception:
            # A failed commit leaves the session unusable until it is
            # rolled back; clean up, then let the caller see the error.
            self.db.rollback()
            raise

    def _get_history(self, history_id: int) -> ProjectHistory | None:
        """Return the ``ProjectHistory`` row with primary key ``history_id``."""
        return self.db.query(ProjectHistory).filter(
            ProjectHistory.id == history_id
        ).first()

    @classmethod
    def _audit_to_dict(cls, audit: AuditTrail) -> dict:
        """Serialize one audit trail row into a plain dictionary."""
        return {
            "id": audit.id,
            "project_id": audit.project_id,
            "action": audit.action,
            "actor": audit.actor,
            "action_type": audit.action_type,
            "details": audit.details,
            "metadata_json": audit.metadata_json,
            "timestamp": cls._iso(audit.created_at),
        }

    def _log_action(self, history_id: int, level: str, message: str) -> None:
        """Append one entry to the project log of ``history_id``."""
        project_log = ProjectLog(
            history_id=history_id,
            log_level=level,
            log_message=message,
            timestamp=self._utcnow(),
        )
        self.db.add(project_log)
        self._commit()

    def _log_audit_trail(
        self,
        project_id: str,
        action: str,
        actor: str,
        action_type: str,
        details: str,
        message: str | None = None,
        **kwargs
    ) -> AuditTrail:
        """Write one audit trail row and return it.

        Args:
            project_id: Project identifier stored on the row.
            action: Action name, e.g. ``"PROJECT_CREATED"``.
            actor: Who performed the action.
            action_type: Category of the action.
            details: Description of what happened.
            message: Optional message; ``details`` is used when empty.
            **kwargs: ``metadata_json`` (or, failing that, ``metadata``)
                is stored as the row's metadata; other keys are ignored.

        Returns:
            The committed ``AuditTrail`` instance.
        """
        metadata_json = kwargs.get("metadata_json", kwargs.get("metadata", "{}"))
        audit = AuditTrail(
            project_id=project_id,
            action=action,
            actor=actor,
            action_type=action_type,
            details=details,
            message=message or details,
            metadata_json=metadata_json or "{}",
        )
        self.db.add(audit)
        self._commit()
        return audit

    # ------------------------------------------------------------------
    # Project lifecycle
    # ------------------------------------------------------------------

    def log_project_start(self, project_id: str, project_name: str, description: str) -> ProjectHistory:
        """Create the history row for a new project and audit its creation.

        Returns:
            The new ``ProjectHistory`` row, refreshed after the insert.
        """
        history = ProjectHistory(
            project_id=project_id,
            project_name=project_name,
            description=description,
            status=ProjectStatus.INITIALIZED.value,
            progress=0,
            message="Project initialization started",
        )
        self.db.add(history)
        self._commit()
        self.db.refresh(history)
        self._log_audit_trail(
            project_id=project_id,
            action="PROJECT_CREATED",
            actor="system",
            action_type="CREATE",
            details=f"Project {project_name} was created",
            message="Project created successfully",
        )
        return history

    def log_progress_update(self, history_id: int, progress: int, step: str, message: str) -> None:
        """Record a progress change on a project.

        Updates the history row, then writes a project log entry and an
        audit trail entry. Does nothing when ``history_id`` is unknown.
        """
        history = self._get_history(history_id)
        if not history:
            return
        history.progress = progress
        history.current_step = step
        history.message = message
        self._commit()
        self._log_action(history_id, "INFO", f"Progress: {progress}%, Step: {step} - {message}")
        self._log_audit_trail(
            project_id=history.project_id,
            action="PROGRESS_UPDATE",
            actor="agent",
            action_type="UPDATE",
            details=f"Progress updated to {progress}% - {step}",
            message=f"Progress: {progress}%, Step: {step} - {message}",
            metadata_json=json.dumps({"step": step, "message": message}),
        )

    def log_project_complete(self, history_id: int, message: str) -> None:
        """Mark a project as completed and log the completion.

        Does nothing when ``history_id`` is unknown.
        """
        history = self._get_history(history_id)
        if not history:
            return
        history.status = ProjectStatus.COMPLETED.value
        history.completed_at = self._utcnow()
        history.message = message
        self._commit()
        self._log_action(history_id, "INFO", f"Project completed: {message}")
        self._log_audit_trail(
            project_id=history.project_id,
            action="PROJECT_COMPLETED",
            actor="agent",
            action_type="COMPLETE",
            details=message,
            message=f"Project completed: {message}",
        )

    def log_error(self, history_id: int, error: str) -> None:
        """Mark a project as failed and log the error.

        Does nothing when ``history_id`` is unknown.
        """
        history = self._get_history(history_id)
        if not history:
            return
        history.status = ProjectStatus.ERROR.value
        history.error_message = error
        self._commit()
        self._log_action(history_id, "ERROR", f"Error occurred: {error}")
        self._log_audit_trail(
            project_id=history.project_id,
            action="ERROR_OCCURRED",
            actor="agent",
            action_type="ERROR",
            details=error,
            message=f"Error occurred: {error}",
        )

    # ------------------------------------------------------------------
    # Snapshots and pull requests
    # ------------------------------------------------------------------

    def save_ui_snapshot(self, history_id: int, ui_data: dict) -> UISnapshot:
        """Store ``ui_data`` as a JSON snapshot and return the new row."""
        snapshot = UISnapshot(
            history_id=history_id,
            snapshot_data=json.dumps(ui_data),
            created_at=self._utcnow(),
        )
        self.db.add(snapshot)
        self._commit()
        self.db.refresh(snapshot)
        return snapshot

    def save_pr_data(self, history_id: int, pr_data: dict) -> PullRequest:
        """Store pull request data and return the new row.

        Each field accepts two alternative keys in ``pr_data``; the first
        one listed wins: ``pr_number``/``id``, ``title``/``pr_title``,
        ``body``/``pr_body``, ``state``/``pr_state``, ``url``/``pr_url``.
        ``base`` defaults to ``"main"`` and ``user`` to ``"system"``.
        The row is always created with ``merged=False``.
        """
        pr_number = pr_data.get("pr_number", pr_data.get("id", 0))
        pr_title = pr_data.get("title", pr_data.get("pr_title", ""))
        pr_body = pr_data.get("body", pr_data.get("pr_body", ""))
        pr_state = pr_data.get("state", pr_data.get("pr_state", "open"))
        pr_url = pr_data.get("url", pr_data.get("pr_url", ""))
        pr = PullRequest(
            history_id=history_id,
            pr_number=pr_number,
            pr_title=pr_title,
            pr_body=pr_body,
            base=pr_data.get("base", "main"),
            user=pr_data.get("user", "system"),
            pr_url=pr_url,
            merged=False,
            pr_state=pr_state,
        )
        self.db.add(pr)
        self._commit()
        self.db.refresh(pr)
        return pr

    # ------------------------------------------------------------------
    # Basic lookups
    # ------------------------------------------------------------------

    def get_project_by_id(self, project_id: str) -> ProjectHistory | None:
        """Return the first project whose ``project_id`` matches, or ``None``."""
        return self.db.query(ProjectHistory).filter(
            ProjectHistory.project_id == project_id
        ).first()

    def get_all_projects(self) -> list[ProjectHistory]:
        """Return every ``ProjectHistory`` row."""
        return self.db.query(ProjectHistory).all()

    def get_project_logs(self, history_id: int, limit: int = 100) -> list[ProjectLog]:
        """Return at most ``limit`` log rows attached to ``history_id``."""
        return self.db.query(ProjectLog).filter(
            ProjectLog.history_id == history_id
        ).limit(limit).all()

    def log_system_event(self, component: str, level: str, message: str,
                         user_agent: str | None = None,
                         ip_address: str | None = None) -> SystemLog:
        """Persist one system event and return the refreshed row."""
        log = SystemLog(
            component=component,
            log_level=level,
            log_message=message,
            user_agent=user_agent,
            ip_address=ip_address,
        )
        self.db.add(log)
        self._commit()
        self.db.refresh(log)
        return log

    # ------------------------------------------------------------------
    # Serialized queries
    # ------------------------------------------------------------------

    def get_logs(self, project_id: str | int | None = None, level: str | None = None,
                 limit: int = 100) -> list[dict]:
        """Return project log entries as dictionaries, newest first.

        Args:
            project_id: Optional filter. A string is matched against
                ``ProjectHistory.project_id`` and the logs of the matching
                history rows are returned. An ``int`` is treated as a
                history id directly, as earlier versions did.
            level: Optional exact match on the log level.
            limit: Maximum number of entries returned.
        """
        query = self.db.query(ProjectLog)
        if project_id:
            if isinstance(project_id, int):
                query = query.filter(ProjectLog.history_id == project_id)
            else:
                # ProjectLog.history_id holds ProjectHistory.id, so a project
                # id has to be resolved through the history table first.
                history_ids = self.db.query(ProjectHistory.id).filter(
                    ProjectHistory.project_id == project_id
                )
                query = query.filter(ProjectLog.history_id.in_(history_ids))
        if level:
            query = query.filter(ProjectLog.log_level == level)
        logs = query.order_by(ProjectLog.timestamp.desc()).limit(limit).all()
        return [
            {
                "id": log.id,
                "history_id": log.history_id,
                "level": log.log_level,
                "message": log.log_message,
                "timestamp": self._iso(log.timestamp),
            }
            for log in logs
        ]

    def get_audit_trail(self, project_id: str | None = None, limit: int = 100) -> list[dict]:
        """Return audit trail entries as dictionaries, newest first.

        Args:
            project_id: Optional project filter; all projects when empty.
            limit: Maximum number of entries returned.
        """
        query = self.db.query(AuditTrail)
        if project_id:
            query = query.filter(AuditTrail.project_id == project_id)
        audits = query.order_by(AuditTrail.created_at.desc()).limit(limit).all()
        return [self._audit_to_dict(audit) for audit in audits]

    def get_all_audit_trail(self, limit: int = 100) -> list[dict]:
        """Return audit trail entries for all projects, newest first."""
        return self.get_audit_trail(limit=limit)

    def log_user_action(self, history_id: int, action_type: str, actor_type: str, actor_name: str,
                        action_description: str, action_data: dict | None = None) -> UserAction | None:
        """Record a user action against a project history.

        Returns:
            The refreshed ``UserAction`` row, or ``None`` when no history
            row with ``history_id`` exists (nothing is written then).
        """
        if not self._get_history(history_id):
            return None
        user_action = UserAction(
            history_id=history_id,
            action_type=action_type,
            actor_type=actor_type,
            actor_name=actor_name,
            action_description=action_description,
            action_data=action_data or {},
            created_at=self._utcnow(),
        )
        self.db.add(user_action)
        self._commit()
        self.db.refresh(user_action)
        return user_action

    def get_user_actions(self, history_id: int, limit: int = 100) -> list[dict]:
        """Return user actions of a history as dictionaries, newest first."""
        user_actions = self.db.query(UserAction).filter(
            UserAction.history_id == history_id
        ).order_by(UserAction.created_at.desc()).limit(limit).all()
        return [
            {
                "id": ua.id,
                "history_id": ua.history_id,
                "action_type": ua.action_type,
                "actor_type": ua.actor_type,
                "actor_name": ua.actor_name,
                "action_description": ua.action_description,
                "action_data": ua.action_data,
                "created_at": self._iso(ua.created_at),
            }
            for ua in user_actions
        ]

    def get_system_logs(self, level: str | None = None, limit: int = 100) -> list[dict]:
        """Return system log entries as dictionaries, newest first.

        Args:
            level: Optional exact match on the log level.
            limit: Maximum number of entries returned.
        """
        query = self.db.query(SystemLog)
        if level:
            query = query.filter(SystemLog.log_level == level)
        logs = query.order_by(SystemLog.created_at.desc()).limit(limit).all()
        return [
            {
                "id": log.id,
                "component": log.component,
                "level": log.log_level,
                "message": log.log_message,
                "timestamp": self._iso(log.created_at),
            }
            for log in logs
        ]

    # ------------------------------------------------------------------
    # Code and commit auditing
    # ------------------------------------------------------------------

    def log_code_change(self, project_id: str, change_type: str, file_path: str,
                        actor: str, actor_type: str, details: str) -> AuditTrail:
        """Write a ``CODE_CHANGE`` audit entry for one file.

        ``change_type`` is stored as the entry's action type. The stored
        details text is built from ``file_path`` and ``change_type``; the
        ``details`` argument itself is not stored. ``actor_type`` is
        recorded in the metadata, matching ``log_commit``.
        """
        return self._log_audit_trail(
            project_id=project_id,
            action="CODE_CHANGE",
            actor=actor,
            action_type=change_type,
            details=f"File {file_path} {change_type}",
            message=f"Code change: {file_path}",
            metadata_json=json.dumps({
                "file": file_path,
                "change_type": change_type,
                "actor": actor,
                "actor_type": actor_type,
            }),
        )

    def log_commit(self, project_id: str, commit_message: str, actor: str,
                   actor_type: str = "agent") -> AuditTrail:
        """Write a ``GIT_COMMIT`` audit entry for a commit message."""
        return self._log_audit_trail(
            project_id=project_id,
            action="GIT_COMMIT",
            actor=actor,
            action_type="COMMIT",
            details=f"Commit: {commit_message}",
            message=f"Git commit: {commit_message}",
            metadata_json=json.dumps({
                "commit": commit_message,
                "actor": actor,
                "actor_type": actor_type,
            }),
        )

    # ------------------------------------------------------------------
    # Aggregated views and maintenance
    # ------------------------------------------------------------------

    def get_project_audit_data(self, project_id: str) -> dict:
        """Return the project, its logs, user actions and audit trail.

        Returns:
            A dictionary with keys ``project``, ``logs``, ``actions`` and
            ``audit_trail``. When the project is unknown, ``project`` is
            ``None`` and the three lists are empty. Lists are ordered
            newest first and are not limited.
        """
        history = self.db.query(ProjectHistory).filter(
            ProjectHistory.project_id == project_id
        ).first()
        if not history:
            return {
                "project": None,
                "logs": [],
                "actions": [],
                "audit_trail": [],
            }
        logs = self.db.query(ProjectLog).filter(
            ProjectLog.history_id == history.id
        ).order_by(ProjectLog.timestamp.desc()).all()
        user_actions = self.db.query(UserAction).filter(
            UserAction.history_id == history.id
        ).order_by(UserAction.created_at.desc()).all()
        audit_trails = self.db.query(AuditTrail).filter(
            AuditTrail.project_id == project_id
        ).order_by(AuditTrail.created_at.desc()).all()
        return {
            "project": {
                "id": history.id,
                "project_id": history.project_id,
                "project_name": history.project_name,
                "description": history.description,
                "status": history.status,
                "progress": history.progress,
                "message": history.message,
                "error_message": history.error_message,
                "current_step": history.current_step,
                "completed_at": self._iso(history.completed_at),
                # The "created_at" key is filled from the started_at column.
                "created_at": self._iso(history.started_at),
            },
            "logs": [
                {
                    "id": log.id,
                    "level": log.log_level,
                    "message": log.log_message,
                    "timestamp": self._iso(log.timestamp),
                }
                for log in logs
            ],
            "actions": [
                {
                    "id": ua.id,
                    "action_type": ua.action_type,
                    "actor_type": ua.actor_type,
                    "actor_name": ua.actor_name,
                    "action_description": ua.action_description,
                    "action_data": ua.action_data,
                    "created_at": self._iso(ua.created_at),
                }
                for ua in user_actions
            ],
            "audit_trail": [
                {
                    "id": audit.id,
                    "action": audit.action,
                    "actor": audit.actor,
                    "action_type": audit.action_type,
                    "details": audit.details,
                    "timestamp": self._iso(audit.created_at),
                }
                for audit in audit_trails
            ],
        }

    def cleanup_audit_trail(self) -> None:
        """Delete every audit trail entry and commit."""
        self.db.query(AuditTrail).delete()
        self._commit()