feat: better history data, refs NOISSUE

This commit is contained in:
2026-04-10 23:52:08 +02:00
parent 032139c14f
commit fd812476cc
11 changed files with 2643 additions and 93 deletions

View File

@@ -0,0 +1,136 @@
"""Generate concise chat-friendly summaries of software generation results."""
from __future__ import annotations
try:
from ..config import settings
except ImportError:
from config import settings
class ChangeSummaryGenerator:
"""Create a readable overview of generated changes for chat responses."""
def __init__(self, ollama_url: str | None = None, model: str | None = None):
self.ollama_url = (ollama_url or settings.ollama_url).rstrip('/')
self.model = model or settings.OLLAMA_MODEL
async def summarize(self, context: dict) -> str:
"""Summarize project changes with Ollama, or fall back to a deterministic overview."""
summary, _trace = await self.summarize_with_trace(context)
return summary
async def summarize_with_trace(self, context: dict) -> tuple[str, dict]:
"""Summarize project changes with Ollama, or fall back to a deterministic overview."""
prompt = self._prompt(context)
system_prompt = (
'You write concise but informative mobile chat summaries of software delivery work. '
'Write 3 to 5 sentences. Mention the application goal, main delivered pieces, '
'technical direction, and what the user should expect next. Avoid markdown bullets.'
)
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(
f'{self.ollama_url}/api/chat',
json={
'model': self.model,
'stream': False,
'messages': [
{
'role': 'system',
'content': system_prompt,
},
{'role': 'user', 'content': prompt},
],
},
) as resp:
payload = await resp.json()
if 200 <= resp.status < 300:
content = payload.get('message', {}).get('content', '').strip()
if content:
return content, {
'stage': 'change_summary',
'provider': 'ollama',
'model': self.model,
'system_prompt': system_prompt,
'user_prompt': prompt,
'assistant_response': content,
'raw_response': payload,
'fallback_used': False,
}
except Exception:
pass
fallback = self._fallback(context)
return fallback, {
'stage': 'change_summary',
'provider': 'fallback',
'model': self.model,
'system_prompt': system_prompt,
'user_prompt': prompt,
'assistant_response': fallback,
'raw_response': {'fallback': 'deterministic'},
'fallback_used': True,
}
def _prompt(self, context: dict) -> str:
features = ', '.join(context.get('features') or []) or 'No explicit features recorded'
tech_stack = ', '.join(context.get('tech_stack') or []) or 'No explicit tech stack recorded'
changed_files = ', '.join(context.get('changed_files') or []) or 'No files recorded'
logs = ' | '.join((context.get('logs') or [])[:4]) or 'No log excerpts'
return (
f"Project name: {context.get('name', 'Unknown project')}\n"
f"Description: {context.get('description', '')}\n"
f"Features: {features}\n"
f"Tech stack: {tech_stack}\n"
f"Changed files: {changed_files}\n"
f"Repository: {context.get('repository_url') or 'No repository URL'}\n"
f"Pull request: {context.get('pull_request_url') or 'No pull request URL'}\n"
f"Pull request state: {context.get('pull_request_state') or 'No pull request state'}\n"
f"Status message: {context.get('message') or ''}\n"
f"Log excerpts: {logs}\n"
"Write a broad but phone-friendly summary of what was done."
)
def _fallback(self, context: dict) -> str:
name = context.get('name', 'The project')
description = context.get('description') or 'a software request'
changed_files = context.get('changed_files') or []
features = context.get('features') or []
tech_stack = context.get('tech_stack') or []
repo_url = context.get('repository_url')
repo_status = context.get('repository_status')
pr_url = context.get('pull_request_url')
pr_state = context.get('pull_request_state')
first_sentence = f"{name} was generated from your request for {description}."
feature_sentence = (
f"The delivery focused on {', '.join(features[:3])}."
if features else
"The delivery focused on turning the request into an initial runnable application skeleton."
)
tech_sentence = (
f"The generated implementation currently targets {', '.join(tech_stack[:3])}."
if tech_stack else
"The implementation was created with the current default stack configured for the factory."
)
file_sentence = (
f"Key artifacts were updated across {len(changed_files)} files, including {', '.join(changed_files[:3])}."
if changed_files else
"The service completed the generation flow, but no changed file list was returned."
)
if repo_url:
repo_sentence = f"The resulting project is tracked at {repo_url}."
elif repo_status in {'pending', 'skipped', 'error'}:
repo_sentence = "Repository provisioning was not confirmed, so review the Gitea status in the dashboard before assuming a remote repo exists."
else:
repo_sentence = "The project is ready for further review in the dashboard."
if pr_url and pr_state == 'open':
pr_sentence = f"An open pull request is ready for review at {pr_url}, and later prompts will continue updating that same PR until it is merged."
elif pr_url:
pr_sentence = f"The latest pull request is available at {pr_url}."
else:
pr_sentence = "No pull request link was recorded for this delivery."
return ' '.join([first_sentence, feature_sentence, tech_sentence, file_sentence, repo_sentence, pr_sentence])

View File

@@ -101,6 +101,39 @@ class DatabaseManager:
return {"value": metadata}
return {"value": metadata}
@staticmethod
def _make_onboarded_project_id(owner: str, repo_name: str) -> str:
"""Build a stable project id for a repository onboarded from Gitea."""
raw = f"external-{owner}-{repo_name}".lower()
sanitized = ''.join(ch if ch.isalnum() or ch == '-' else '-' for ch in raw)
while '--' in sanitized:
sanitized = sanitized.replace('--', '-')
return sanitized.strip('-') or 'external-project'
def get_project_by_repository(self, owner: str, repo_name: str) -> ProjectHistory | None:
"""Return the project currently associated with a repository."""
normalized_owner = (owner or '').strip().lower()
normalized_repo = (repo_name or '').strip().lower()
if not normalized_owner or not normalized_repo:
return None
for history in self.db.query(ProjectHistory).order_by(ProjectHistory.updated_at.desc(), ProjectHistory.id.desc()).all():
repository = self._get_project_repository(history) or {}
if (repository.get('owner') or '').strip().lower() == normalized_owner and (repository.get('name') or '').strip().lower() == normalized_repo:
return history
return None
@staticmethod
def _classify_branch_scope(branch_name: str | None) -> str | None:
"""Classify a branch into main, pr, or manual scope for filtering."""
normalized = (branch_name or '').strip().lower()
if not normalized:
return None
if normalized in {'main', 'master', 'trunk'}:
return 'main'
if normalized.startswith('ai/') or normalized.startswith('pr/') or normalized.startswith('pull/'):
return 'pr'
return 'manual'
def log_project_start(self, project_id: str, project_name: str, description: str) -> ProjectHistory:
"""Log project start."""
history = ProjectHistory(
@@ -184,6 +217,102 @@ class DatabaseManager:
self.db.refresh(audit)
return audit
def log_llm_trace(
self,
project_id: str,
history_id: int | None,
prompt_id: int | None,
stage: str,
provider: str,
model: str,
system_prompt: str,
user_prompt: str,
assistant_response: str,
raw_response: object | None = None,
fallback_used: bool = False,
) -> AuditTrail:
"""Persist an LLM interaction trace for a prompt."""
audit = AuditTrail(
project_id=project_id,
action='LLM_TRACE',
actor=provider,
action_type=stage,
details=f'LLM trace for {stage}',
message=f'LLM trace: {stage}',
metadata_json={
'history_id': history_id,
'prompt_id': prompt_id,
'stage': stage,
'provider': provider,
'model': model,
'system_prompt': system_prompt,
'user_prompt': user_prompt,
'assistant_response': assistant_response,
'raw_response': raw_response,
'fallback_used': fallback_used,
},
)
self.db.add(audit)
self.db.commit()
self.db.refresh(audit)
return audit
def get_llm_traces(
self,
project_id: str | None = None,
prompt_id: int | None = None,
history_id: int | None = None,
stage: str | None = None,
model: str | None = None,
search_query: str | None = None,
limit: int = 200,
) -> list[dict]:
"""Return persisted LLM traces."""
query = self.db.query(AuditTrail).filter(AuditTrail.action == 'LLM_TRACE')
if project_id:
query = query.filter(AuditTrail.project_id == project_id)
traces = query.order_by(AuditTrail.created_at.desc(), AuditTrail.id.desc()).limit(limit).all()
result = []
for trace in traces:
metadata = self._normalize_metadata(trace.metadata_json)
if prompt_id is not None and metadata.get('prompt_id') != prompt_id:
continue
if history_id is not None and metadata.get('history_id') != history_id:
continue
if stage and metadata.get('stage') != stage:
continue
if model and metadata.get('model') != model:
continue
if search_query:
haystacks = [
str(metadata.get('system_prompt') or ''),
str(metadata.get('user_prompt') or ''),
str(metadata.get('assistant_response') or ''),
str(metadata.get('stage') or ''),
str(metadata.get('model') or ''),
]
lowered = search_query.lower()
if not any(lowered in haystack.lower() for haystack in haystacks):
continue
result.append(
{
'id': trace.id,
'project_id': trace.project_id,
'history_id': metadata.get('history_id'),
'prompt_id': metadata.get('prompt_id'),
'stage': metadata.get('stage'),
'provider': metadata.get('provider'),
'model': metadata.get('model'),
'system_prompt': metadata.get('system_prompt'),
'user_prompt': metadata.get('user_prompt'),
'assistant_response': metadata.get('assistant_response'),
'raw_response': metadata.get('raw_response'),
'fallback_used': bool(metadata.get('fallback_used')),
'timestamp': trace.created_at.isoformat() if trace.created_at else None,
}
)
return result
def log_progress_update(self, history_id: int, progress: int, step: str, message: str) -> None:
"""Log progress update."""
history = self.db.query(ProjectHistory).filter(
@@ -291,8 +420,14 @@ class DatabaseManager:
pr_title = pr_data.get("title", pr_data.get("pr_title", ""))
pr_body = pr_data.get("body", pr_data.get("pr_body", ""))
pr_state = pr_data.get("state", pr_data.get("pr_state", "open"))
pr_url = pr_data.get("url", pr_data.get("pr_url", ""))
pr_url = pr_data.get("html_url", pr_data.get("url", pr_data.get("pr_url", "")))
pr = self.db.query(PullRequest).filter(
PullRequest.history_id == history_id,
PullRequest.pr_number == pr_number,
).first()
created = pr is None
if pr is None:
pr = PullRequest(
history_id=history_id,
pr_number=pr_number,
@@ -301,14 +436,190 @@ class DatabaseManager:
base=pr_data.get("base", "main"),
user=pr_data.get("user", "system"),
pr_url=pr_url,
merged=False,
pr_state=pr_state
merged=pr_data.get("merged", False),
pr_state=pr_state,
)
self.db.add(pr)
else:
pr.pr_title = pr_title
pr.pr_body = pr_body
pr.base = pr_data.get("base", pr.base)
pr.user = pr_data.get("user", pr.user)
pr.pr_url = pr_url or pr.pr_url
pr.pr_state = pr_state
pr.merged = pr_data.get("merged", pr.merged)
pr.merged_at = pr_data.get("merged_at", pr.merged_at)
self.db.commit()
self.db.refresh(pr)
history = self.db.query(ProjectHistory).filter(ProjectHistory.id == history_id).first()
if history is not None:
self._log_audit_trail(
project_id=history.project_id,
action='PULL_REQUEST_TRACKED' if created else 'PULL_REQUEST_UPDATED',
actor=pr.user or 'gitea',
action_type='PULL_REQUEST',
details=f"Pull request #{pr.pr_number} is {pr.pr_state}",
message=f"PR #{pr.pr_number}: {pr.pr_title}",
metadata_json={
'history_id': history_id,
'pr_number': pr.pr_number,
'pr_title': pr.pr_title,
'pr_state': pr.pr_state,
'pr_url': pr.pr_url,
'merged': pr.merged,
'head': pr_data.get('head'),
'branch_scope': self._classify_branch_scope(pr_data.get('head')),
},
)
return pr
def get_pull_requests(
self,
project_id: str | None = None,
history_id: int | None = None,
state: str | None = None,
only_open: bool = False,
) -> list[dict]:
"""Return pull requests for a project or history."""
query = self.db.query(PullRequest)
if history_id is not None:
query = query.filter(PullRequest.history_id == history_id)
elif project_id is not None:
history = self.get_project_by_id(project_id)
if history is None:
return []
query = query.filter(PullRequest.history_id == history.id)
if only_open:
query = query.filter(PullRequest.pr_state == 'open', PullRequest.merged.is_(False))
elif state:
query = query.filter(PullRequest.pr_state == state)
items = query.order_by(PullRequest.created_at.desc(), PullRequest.id.desc()).all()
return [
{
'id': item.id,
'history_id': item.history_id,
'pr_number': item.pr_number,
'pr_title': item.pr_title,
'pr_body': item.pr_body,
'base': item.base,
'user': item.user,
'pr_url': item.pr_url,
'pr_state': item.pr_state,
'merged': item.merged,
'merged_at': item.merged_at.isoformat() if item.merged_at else None,
'created_at': item.created_at.isoformat() if item.created_at else None,
}
for item in items
]
def get_open_pull_request(self, project_id: str | None = None, history_id: int | None = None) -> dict | None:
"""Return the newest open pull request for a project/history."""
prs = self.get_pull_requests(project_id=project_id, history_id=history_id, only_open=True)
return prs[0] if prs else None
def sync_pull_request_states(self, gitea_api, project_id: str | None = None) -> list[dict]:
"""Refresh persisted PR states from Gitea."""
items = self.get_pull_requests(project_id=project_id)
updates: list[dict] = []
for item in items:
history = self.db.query(ProjectHistory).filter(ProjectHistory.id == item['history_id']).first()
if history is None:
continue
repository = self._get_project_repository(history) or {}
owner = repository.get('owner')
repo = repository.get('name')
if not owner or not repo:
continue
remote = gitea_api.get_pull_request_sync(pr_number=item['pr_number'], owner=owner, repo=repo)
if isinstance(remote, dict) and remote.get('error'):
continue
pr = self.db.query(PullRequest).filter(PullRequest.id == item['id']).first()
if pr is None:
continue
previous_state = pr.pr_state
previous_merged = pr.merged
pr.pr_state = remote.get('state', pr.pr_state)
pr.pr_title = remote.get('title', pr.pr_title)
pr.pr_body = remote.get('body', pr.pr_body)
pr.pr_url = remote.get('html_url', pr.pr_url)
pr.merged = bool(remote.get('merged', pr.merged))
if history is not None and (pr.pr_state != previous_state or pr.merged != previous_merged):
self._log_audit_trail(
project_id=history.project_id,
action='PULL_REQUEST_UPDATED',
actor='gitea-sync',
action_type='PULL_REQUEST',
details=f"Pull request #{pr.pr_number} is {pr.pr_state}",
message=f"PR #{pr.pr_number} refreshed from Gitea",
metadata_json={
'history_id': history.id,
'pr_number': pr.pr_number,
'pr_title': pr.pr_title,
'pr_state': pr.pr_state,
'pr_url': pr.pr_url,
'merged': pr.merged,
'head': (remote.get('head') or {}).get('ref') if isinstance(remote.get('head'), dict) else remote.get('head'),
'branch_scope': self._classify_branch_scope((remote.get('head') or {}).get('ref') if isinstance(remote.get('head'), dict) else remote.get('head')),
},
)
updates.append({'id': pr.id, 'pr_state': pr.pr_state, 'merged': pr.merged})
if updates:
self.db.commit()
return updates
def get_latest_project_by_name(self, project_name: str) -> ProjectHistory | None:
"""Return the most recently updated project with the requested name."""
return self.db.query(ProjectHistory).filter(
ProjectHistory.project_name == project_name
).order_by(ProjectHistory.updated_at.desc(), ProjectHistory.id.desc()).first()
def log_prompt_revert(
self,
project_id: str,
prompt_id: int,
reverted_commit_hash: str,
revert_commit_hash: str,
actor: str = 'user',
commit_url: str | None = None,
) -> AuditTrail:
"""Record that a specific prompt's changes were reverted."""
return self._log_audit_trail(
project_id=project_id,
action='PROMPT_REVERTED',
actor=actor,
action_type='UNDO',
details=f'Reverted prompt {prompt_id}',
message=f'Prompt {prompt_id} reverted',
metadata_json={
'prompt_id': prompt_id,
'reverted_commit_hash': reverted_commit_hash,
'revert_commit_hash': revert_commit_hash,
'commit_url': commit_url,
},
)
def get_prompt_reverts(self, project_id: str | None = None) -> dict[int, dict]:
"""Return the latest revert metadata keyed by prompt id."""
query = self.db.query(AuditTrail).filter(AuditTrail.action == 'PROMPT_REVERTED')
if project_id:
query = query.filter(AuditTrail.project_id == project_id)
reverts = query.order_by(AuditTrail.created_at.desc(), AuditTrail.id.desc()).all()
result: dict[int, dict] = {}
for revert in reverts:
metadata = self._normalize_metadata(revert.metadata_json)
prompt_id = metadata.get('prompt_id')
if prompt_id is None or prompt_id in result:
continue
result[prompt_id] = {
'prompt_id': prompt_id,
'reverted_commit_hash': metadata.get('reverted_commit_hash'),
'revert_commit_hash': metadata.get('revert_commit_hash'),
'commit_url': metadata.get('commit_url'),
'timestamp': revert.created_at.isoformat() if revert.created_at else None,
}
return result
def _get_latest_ui_snapshot_data(self, history_id: int) -> dict:
"""Return the latest stored UI snapshot payload for a project."""
snapshot = self.db.query(UISnapshot).filter(
@@ -513,7 +824,7 @@ class DatabaseManager:
def log_code_change(self, project_id: str, change_type: str, file_path: str,
actor: str, actor_type: str, details: str,
history_id: int | None = None, prompt_id: int | None = None,
diff_summary: str | None = None) -> AuditTrail:
diff_summary: str | None = None, diff_text: str | None = None) -> AuditTrail:
"""Log a code change."""
audit = AuditTrail(
project_id=project_id,
@@ -531,6 +842,7 @@ class DatabaseManager:
"prompt_id": prompt_id,
"details": details,
"diff_summary": diff_summary,
"diff_text": diff_text,
}
)
self.db.add(audit)
@@ -580,8 +892,19 @@ class DatabaseManager:
if not links:
return []
prompt_map = {prompt["id"]: {**prompt, "changes": []} for prompt in prompt_events}
prompt_map = {prompt["id"]: {**prompt, "changes": [], "commits": [], "llm_traces": []} for prompt in prompt_events}
change_map = {change["id"]: change for change in self.get_code_changes(project_id=project_id, limit=limit * 10)}
revert_map = self.get_prompt_reverts(project_id=project_id)
commits_by_prompt: dict[int, list[dict]] = {}
traces_by_prompt: dict[int, list[dict]] = {}
for commit in self.get_commits(project_id=project_id, limit=limit * 10):
prompt_id = commit.get("prompt_id")
if prompt_id is not None:
commits_by_prompt.setdefault(prompt_id, []).append(commit)
for trace in self.get_llm_traces(project_id=project_id, limit=limit * 20):
prompt_id = trace.get("prompt_id")
if prompt_id is not None:
traces_by_prompt.setdefault(prompt_id, []).append(trace)
for link in links:
prompt = prompt_map.get(link["prompt_audit_id"])
@@ -595,10 +918,20 @@ class DatabaseManager:
"change_type": link["change_type"] or change["action_type"],
"details": change["details"],
"diff_summary": change["diff_summary"],
"diff_text": change.get("diff_text"),
"timestamp": change["timestamp"],
}
)
for prompt_id, commits in commits_by_prompt.items():
prompt = prompt_map.get(prompt_id)
if prompt is not None:
prompt["commits"] = commits
for prompt_id, traces in traces_by_prompt.items():
prompt = prompt_map.get(prompt_id)
if prompt is not None:
prompt["llm_traces"] = traces
correlations = [
{
"project_id": prompt["project_id"],
@@ -608,6 +941,9 @@ class DatabaseManager:
"tech_stack": prompt["tech_stack"],
"timestamp": prompt["timestamp"],
"changes": prompt["changes"],
"commits": prompt["commits"],
"llm_traces": prompt["llm_traces"],
"revert": revert_map.get(prompt["id"]),
}
for prompt in prompt_map.values()
]
@@ -628,6 +964,7 @@ class DatabaseManager:
grouped.setdefault(event.project_id or "", []).append(event)
correlations: list[dict] = []
revert_map = self.get_prompt_reverts(project_id=project_id)
for grouped_project_id, project_events in grouped.items():
current_prompt: AuditTrail | None = None
current_changes: list[AuditTrail] = []
@@ -649,10 +986,14 @@ class DatabaseManager:
"change_type": change.action_type,
"details": self._normalize_metadata(change.metadata_json).get("details", change.details),
"diff_summary": self._normalize_metadata(change.metadata_json).get("diff_summary"),
"diff_text": self._normalize_metadata(change.metadata_json).get("diff_text"),
"timestamp": change.created_at.isoformat() if change.created_at else None,
}
for change in current_changes
],
"commits": [],
"llm_traces": self.get_llm_traces(project_id=grouped_project_id, prompt_id=current_prompt.id),
"revert": revert_map.get(current_prompt.id),
})
current_prompt = event
current_changes = []
@@ -675,17 +1016,31 @@ class DatabaseManager:
"change_type": change.action_type,
"details": self._normalize_metadata(change.metadata_json).get("details", change.details),
"diff_summary": self._normalize_metadata(change.metadata_json).get("diff_summary"),
"diff_text": self._normalize_metadata(change.metadata_json).get("diff_text"),
"timestamp": change.created_at.isoformat() if change.created_at else None,
}
for change in current_changes
],
"commits": [],
"llm_traces": self.get_llm_traces(project_id=grouped_project_id, prompt_id=current_prompt.id),
"revert": revert_map.get(current_prompt.id),
})
correlations.sort(key=lambda item: item["timestamp"] or "", reverse=True)
return correlations[:limit]
def log_commit(self, project_id: str, commit_message: str, actor: str,
actor_type: str = "agent") -> AuditTrail:
actor_type: str = "agent", history_id: int | None = None,
prompt_id: int | None = None, commit_hash: str | None = None,
changed_files: list[str] | None = None, branch: str | None = None,
commit_url: str | None = None, compare_url: str | None = None,
remote_status: str | None = None, source: str | None = None,
imported_from_remote: bool = False,
repository_owner: str | None = None,
repository_name: str | None = None,
author_name: str | None = None,
author_email: str | None = None,
commit_parents: list[str] | None = None) -> AuditTrail:
"""Log a git commit."""
audit = AuditTrail(
project_id=project_id,
@@ -694,12 +1049,435 @@ class DatabaseManager:
action_type="COMMIT",
details=f"Commit: {commit_message}",
message=f"Git commit: {commit_message}",
metadata_json=json.dumps({"commit": commit_message, "actor": actor, "actor_type": actor_type})
metadata_json=json.dumps({
"commit": commit_message,
"actor": actor,
"actor_type": actor_type,
"history_id": history_id,
"prompt_id": prompt_id,
"commit_hash": commit_hash,
"changed_files": changed_files or [],
"branch": branch,
"commit_url": commit_url,
"compare_url": compare_url,
"remote_status": remote_status,
"source": source or "factory",
"imported_from_remote": imported_from_remote,
"repository_owner": repository_owner,
"repository_name": repository_name,
"author_name": author_name,
"author_email": author_email,
"commit_parents": commit_parents or [],
})
)
self.db.add(audit)
self.db.commit()
self.db.refresh(audit)
return audit
def onboard_repository(
self,
owner: str,
repo_name: str,
repository_data: dict,
actor_name: str = 'dashboard',
sync_result: dict | None = None,
) -> dict:
"""Create or update a tracked project for a manually created repository."""
existing = self.get_project_by_repository(owner=owner, repo_name=repo_name)
created = existing is None
if existing is None:
existing = self.log_project_start(
project_id=self._make_onboarded_project_id(owner, repo_name),
project_name=repository_data.get('name') or repo_name,
description=repository_data.get('description') or f'Onboarded repository {owner}/{repo_name}',
)
existing.status = ProjectStatus.COMPLETED.value
existing.progress = 100
existing.message = 'Repository onboarded from Gitea'
self.db.commit()
else:
existing.project_name = repository_data.get('name') or existing.project_name or repo_name
existing.description = repository_data.get('description') or existing.description
existing.message = 'Repository onboarding refreshed from Gitea'
self.db.commit()
snapshot_payload = {
'repository': {
'owner': owner,
'name': repo_name,
'url': repository_data.get('html_url') or repository_data.get('url') or f"{settings.gitea_url.rstrip('/')}/{owner}/{repo_name}",
'clone_url': repository_data.get('clone_url') or repository_data.get('ssh_url') or repository_data.get('html_url'),
'default_branch': repository_data.get('default_branch') or 'main',
'private': bool(repository_data.get('private', False)),
'mode': 'onboarded',
'status': 'onboarded',
'description': repository_data.get('description') or '',
'api_response': repository_data,
}
}
self.save_ui_snapshot(existing.id, snapshot_payload)
self.log_user_action(
history_id=existing.id,
action_type='REPOSITORY_ONBOARDED',
actor_type='user',
actor_name=actor_name,
action_description=f'Onboarded repository {owner}/{repo_name}',
action_data={'owner': owner, 'repo': repo_name, 'created': created},
)
self._log_audit_trail(
project_id=existing.project_id,
action='REPOSITORY_ONBOARDED',
actor=actor_name,
action_type='SYNC',
details=f'Onboarded repository {owner}/{repo_name}',
message=f'Repository {owner}/{repo_name} onboarded',
metadata_json={
'history_id': existing.id,
'owner': owner,
'repo': repo_name,
'created': created,
'sync_result': sync_result or {},
},
)
return {
'history_id': existing.id,
'project_id': existing.project_id,
'created': created,
'repository': snapshot_payload['repository'],
}
def sync_repository_activity(self, project_id: str, gitea_api, commit_limit: int = 25) -> dict:
"""Import recent remote commits and pull requests for a tracked repository."""
history = self.get_project_by_id(project_id)
if history is None:
return {'status': 'error', 'message': 'Project not found'}
repository = self._get_project_repository(history) or {}
owner = repository.get('owner')
repo_name = repository.get('name')
if not owner or not repo_name:
return {'status': 'error', 'message': 'Repository metadata is missing for this project'}
def _record_sync_event(result: dict) -> dict:
self._log_audit_trail(
project_id=project_id,
action='REPOSITORY_SYNCED',
actor='gitea-sync',
action_type='SYNC',
details=result.get('message') or f'Synchronized repository activity for {owner}/{repo_name}',
message=result.get('message') or f'Repository sync complete for {owner}/{repo_name}',
metadata_json={
'history_id': history.id,
'owner': owner,
'repo': repo_name,
'status': result.get('status', 'success'),
'imported_commit_count': result.get('imported_commit_count', 0),
'observed_pull_requests': result.get('observed_pull_requests', 0),
'error': result.get('error'),
'branches_checked': result.get('branches_checked', []),
},
)
return result
existing_hashes = {
commit.get('commit_hash')
for commit in self.get_commits(project_id=project_id, limit=max(commit_limit * 4, 200))
if commit.get('commit_hash')
}
imported_commits: list[dict] = []
branches = gitea_api.list_branches_sync(owner=owner, repo=repo_name)
if isinstance(branches, dict) and branches.get('error'):
branch_names = [repository.get('default_branch') or 'main']
else:
branch_names = [
branch.get('name')
for branch in branches if isinstance(branch, dict) and branch.get('name')
] or [repository.get('default_branch') or 'main']
seen_branches: set[str] = set()
for branch_name in branch_names:
if not branch_name or branch_name in seen_branches:
continue
seen_branches.add(branch_name)
commits = gitea_api.list_repo_commits_sync(owner=owner, repo=repo_name, limit=commit_limit, branch=branch_name)
if isinstance(commits, dict) and commits.get('error'):
continue
for commit in commits if isinstance(commits, list) else []:
commit_hash = commit.get('sha') or commit.get('id')
if not commit_hash or commit_hash in existing_hashes:
continue
detailed_commit = gitea_api.get_commit_sync(commit_hash=commit_hash, owner=owner, repo=repo_name)
if isinstance(detailed_commit, dict) and detailed_commit.get('error'):
detailed_commit = commit
commit_payload = detailed_commit.get('commit', {}) if isinstance(detailed_commit, dict) else {}
author_payload = commit_payload.get('author', {}) if isinstance(commit_payload, dict) else {}
file_items = detailed_commit.get('files', []) if isinstance(detailed_commit, dict) else []
changed_files = [
item.get('filename') or item.get('file') or item.get('path')
for item in file_items
if isinstance(item, dict) and (item.get('filename') or item.get('file') or item.get('path'))
]
parent_hashes = [
parent.get('sha') or parent.get('id')
for parent in (detailed_commit.get('parents', []) if isinstance(detailed_commit, dict) else [])
if isinstance(parent, dict)
]
commit_message = commit_payload.get('message') or commit.get('commit', {}).get('message') or f'Imported commit {commit_hash[:12]}'
author_name = (
(commit.get('author') or {}).get('login')
or author_payload.get('name')
or (commit.get('commit', {}).get('author') or {}).get('name')
or 'unknown'
)
author_email = author_payload.get('email') or (commit.get('commit', {}).get('author') or {}).get('email')
self.log_commit(
project_id=project_id,
commit_message=commit_message.splitlines()[0],
actor=author_name,
actor_type='user',
history_id=history.id,
prompt_id=None,
commit_hash=commit_hash,
changed_files=changed_files,
branch=branch_name,
commit_url=gitea_api.build_commit_url(commit_hash, owner=owner, repo=repo_name),
compare_url=gitea_api.build_compare_url(parent_hashes[0], commit_hash, owner=owner, repo=repo_name) if parent_hashes else None,
remote_status='imported',
source='gitea_sync',
imported_from_remote=True,
repository_owner=owner,
repository_name=repo_name,
author_name=author_name,
author_email=author_email,
commit_parents=parent_hashes,
)
existing_hashes.add(commit_hash)
imported_commits.append({'commit_hash': commit_hash, 'message': commit_message.splitlines()[0], 'branch': branch_name})
imported_prs = 0
for state in ('open', 'closed'):
pull_requests = gitea_api.list_pull_requests_sync(owner=owner, repo=repo_name, state=state)
if isinstance(pull_requests, dict) and pull_requests.get('error'):
continue
for pull_request in pull_requests if isinstance(pull_requests, list) else []:
self.save_pr_data(history.id, {
'pr_number': pull_request.get('number'),
'pr_title': pull_request.get('title'),
'pr_body': pull_request.get('body'),
'pr_state': pull_request.get('state', state),
'pr_url': pull_request.get('html_url') or gitea_api.build_pull_request_url(pull_request.get('number'), owner=owner, repo=repo_name),
'base': (pull_request.get('base') or {}).get('ref', 'main') if isinstance(pull_request.get('base'), dict) else pull_request.get('base', 'main'),
'head': (pull_request.get('head') or {}).get('ref') if isinstance(pull_request.get('head'), dict) else pull_request.get('head'),
'user': (pull_request.get('user') or {}).get('login', 'gitea') if isinstance(pull_request.get('user'), dict) else pull_request.get('user', 'gitea'),
'merged': bool(pull_request.get('merged', False)),
'merged_at': pull_request.get('merged_at'),
})
imported_prs += 1
sync_summary = {
'status': 'success',
'message': f'Synced repository activity for {owner}/{repo_name}',
'project_id': project_id,
'imported_commit_count': len(imported_commits),
'imported_commits': imported_commits,
'observed_pull_requests': imported_prs,
'branches_checked': sorted(seen_branches),
'repository': repository,
}
return _record_sync_event(sync_summary)
def get_repository_sync_status(self, project_id: str) -> dict | None:
"""Return the most recent repository sync result for a project."""
audit = self.db.query(AuditTrail).filter(
AuditTrail.project_id == project_id,
AuditTrail.action == 'REPOSITORY_SYNCED',
).order_by(AuditTrail.created_at.desc(), AuditTrail.id.desc()).first()
if audit is None:
return None
metadata = self._normalize_metadata(audit.metadata_json)
return {
'status': metadata.get('status', 'success'),
'message': audit.message,
'error': metadata.get('error'),
'imported_commit_count': metadata.get('imported_commit_count', 0),
'observed_pull_requests': metadata.get('observed_pull_requests', 0),
'branches_checked': metadata.get('branches_checked', []),
'timestamp': audit.created_at.isoformat() if audit.created_at else None,
'owner': metadata.get('owner'),
'repo': metadata.get('repo'),
}
def get_commits(self, project_id: str | None = None, limit: int = 100) -> list[dict]:
"""Return git commit events from the audit trail."""
query = self.db.query(AuditTrail).filter(AuditTrail.action == "GIT_COMMIT")
if project_id:
query = query.filter(AuditTrail.project_id == project_id)
commits = query.order_by(AuditTrail.created_at.desc()).limit(limit).all()
return [
{
"id": commit.id,
"project_id": commit.project_id,
"actor": commit.actor,
"commit_message": self._normalize_metadata(commit.metadata_json).get("commit", commit.details),
"commit_hash": self._normalize_metadata(commit.metadata_json).get("commit_hash"),
"history_id": self._normalize_metadata(commit.metadata_json).get("history_id"),
"prompt_id": self._normalize_metadata(commit.metadata_json).get("prompt_id"),
"changed_files": self._normalize_metadata(commit.metadata_json).get("changed_files", []),
"branch": self._normalize_metadata(commit.metadata_json).get("branch"),
"branch_scope": self._classify_branch_scope(self._normalize_metadata(commit.metadata_json).get("branch")),
"commit_url": self._normalize_metadata(commit.metadata_json).get("commit_url"),
"compare_url": self._normalize_metadata(commit.metadata_json).get("compare_url"),
"remote_status": self._normalize_metadata(commit.metadata_json).get("remote_status"),
"source": self._normalize_metadata(commit.metadata_json).get("source", "factory"),
"imported_from_remote": bool(self._normalize_metadata(commit.metadata_json).get("imported_from_remote")),
"repository_owner": self._normalize_metadata(commit.metadata_json).get("repository_owner"),
"repository_name": self._normalize_metadata(commit.metadata_json).get("repository_name"),
"author_name": self._normalize_metadata(commit.metadata_json).get("author_name"),
"author_email": self._normalize_metadata(commit.metadata_json).get("author_email"),
"commit_parents": self._normalize_metadata(commit.metadata_json).get("commit_parents", []),
"timestamp": commit.created_at.isoformat() if commit.created_at else None,
}
for commit in commits
]
def get_project_timeline(self, project_id: str, limit: int = 200, branch_scope: str | None = None) -> list[dict]:
"""Return a chronologically ordered project timeline across prompts, traces, commits, PRs, and sync events."""
history = self.get_project_by_id(project_id)
if history is None:
return []
commits_for_project = self.get_commits(project_id=project_id, limit=max(limit * 4, 200))
prompt_branch_scopes: dict[int, set[str]] = {}
for commit in commits_for_project:
if commit.get('prompt_id') is None or not commit.get('branch_scope'):
continue
prompt_branch_scopes.setdefault(commit['prompt_id'], set()).add(commit['branch_scope'])
interesting_actions = {
'PROMPT_RECEIVED',
'LLM_TRACE',
'GIT_COMMIT',
'PROMPT_REVERTED',
'REPOSITORY_ONBOARDED',
'REPOSITORY_SYNCED',
'PULL_REQUEST_TRACKED',
'PULL_REQUEST_UPDATED',
}
audits = self.db.query(AuditTrail).filter(
AuditTrail.project_id == project_id,
AuditTrail.action.in_(interesting_actions),
).order_by(AuditTrail.created_at.desc(), AuditTrail.id.desc()).limit(limit).all()
timeline = []
for audit in audits:
metadata = self._normalize_metadata(audit.metadata_json)
item_type = (audit.action or '').lower()
title = audit.message or audit.details or audit.action or 'Event'
event_branch_scope = metadata.get('branch_scope')
event_branch = metadata.get('branch') or metadata.get('head')
if audit.action == 'PROMPT_RECEIVED':
item_type = 'prompt'
title = 'Prompt received'
metadata = {**metadata, 'prompt_id': metadata.get('prompt_id') or audit.id}
scopes = sorted(prompt_branch_scopes.get(audit.id, set()))
if scopes:
metadata['branch_scopes'] = scopes
event_branch_scope = scopes[0] if len(scopes) == 1 else 'mixed'
elif audit.action == 'LLM_TRACE':
item_type = 'llm_trace'
title = f"LLM: {metadata.get('stage') or 'trace'}"
prompt_id = metadata.get('prompt_id')
scopes = sorted(prompt_branch_scopes.get(prompt_id, set())) if prompt_id is not None else []
if scopes:
metadata['branch_scopes'] = scopes
event_branch_scope = scopes[0] if len(scopes) == 1 else 'mixed'
elif audit.action == 'GIT_COMMIT':
item_type = 'commit'
title = metadata.get('commit') or 'Commit recorded'
event_branch_scope = self._classify_branch_scope(metadata.get('branch'))
elif audit.action == 'PROMPT_REVERTED':
item_type = 'revert'
title = f"Prompt {metadata.get('prompt_id')} reverted"
scopes = sorted(prompt_branch_scopes.get(metadata.get('prompt_id'), set())) if metadata.get('prompt_id') is not None else []
if scopes:
metadata['branch_scopes'] = scopes
event_branch_scope = scopes[0] if len(scopes) == 1 else 'mixed'
elif audit.action.startswith('PULL_REQUEST_'):
item_type = 'pull_request'
title = f"PR #{metadata.get('pr_number')} {metadata.get('pr_state') or 'updated'}"
event_branch_scope = self._classify_branch_scope(metadata.get('head'))
elif audit.action.startswith('REPOSITORY_'):
item_type = 'repository'
if branch_scope:
scopes = metadata.get('branch_scopes') or []
if event_branch_scope == branch_scope:
pass
elif branch_scope in scopes:
pass
else:
continue
timeline.append(
{
'id': audit.id,
'project_id': project_id,
'type': item_type,
'action': audit.action,
'title': title,
'actor': audit.actor,
'details': audit.details,
'metadata': {**metadata, 'branch_scope': event_branch_scope, 'branch': event_branch},
'timestamp': audit.created_at.isoformat() if audit.created_at else None,
}
)
return timeline
def get_commit_context(self, commit_hash: str, project_id: str | None = None, branch_scope: str | None = None) -> dict | None:
"""Return the recorded context explaining how a commit came to be."""
normalized = (commit_hash or '').strip()
if not normalized:
return None
candidates = self.get_commits(project_id=project_id, limit=1000)
exact_match = next((item for item in candidates if item.get('commit_hash') == normalized), None)
commit = exact_match or next((item for item in candidates if (item.get('commit_hash') or '').startswith(normalized)), None)
if commit is None:
return None
if branch_scope and commit.get('branch_scope') != branch_scope:
return None
project_bundle = self.get_project_audit_data(commit['project_id'])
prompt = None
correlation = None
if commit.get('prompt_id') is not None:
prompt = next((item for item in project_bundle.get('prompts', []) if item.get('id') == commit.get('prompt_id')), None)
correlation = next((item for item in project_bundle.get('prompt_change_correlations', []) if item.get('prompt_id') == commit.get('prompt_id')), None)
timeline = self.get_project_timeline(commit['project_id'], limit=40, branch_scope=branch_scope)
surrounding_events = []
commit_timestamp = commit.get('timestamp')
for item in timeline:
if correlation and item.get('metadata', {}).get('prompt_id') == correlation.get('prompt_id'):
surrounding_events.append(item)
elif item.get('type') == 'commit' and (item.get('metadata', {}).get('commit_hash') or '').startswith(commit.get('commit_hash') or ''):
surrounding_events.append(item)
elif len(surrounding_events) < 8 and item.get('timestamp') and commit_timestamp and item.get('timestamp') <= commit_timestamp:
surrounding_events.append(item)
if prompt is not None:
origin_summary = 'Prompt-linked AI commit with recorded prompt, traces, and resulting code changes.'
elif commit.get('imported_from_remote'):
origin_summary = 'Imported from Gitea with no originating prompt recorded inside the factory.'
else:
origin_summary = 'Commit was recorded without a linked originating prompt.'
return {
'commit': commit,
'project': project_bundle.get('project'),
'repository': project_bundle.get('repository'),
'prompt': prompt,
'correlation': correlation,
'related_changes': (correlation or {}).get('changes', []),
'related_llm_traces': (correlation or {}).get('llm_traces', []),
'pull_requests': project_bundle.get('pull_requests', []),
'timeline': surrounding_events,
'origin_summary': origin_summary,
}
def get_project_audit_data(self, project_id: str) -> dict:
"""Get comprehensive audit data for a project."""
history = self.db.query(ProjectHistory).filter(
@@ -714,7 +1492,11 @@ class DatabaseManager:
"audit_trail": [],
"prompts": [],
"code_changes": [],
"commits": [],
"pull_requests": [],
"llm_traces": [],
"prompt_change_correlations": [],
"timeline": [],
}
# Get logs
@@ -734,8 +1516,13 @@ class DatabaseManager:
prompts = self.get_prompt_events(project_id=project_id)
code_changes = self.get_code_changes(project_id=project_id)
commits = self.get_commits(project_id=project_id)
pull_requests = self.get_pull_requests(project_id=project_id)
llm_traces = self.get_llm_traces(project_id=project_id)
correlations = self.get_prompt_change_correlations(project_id=project_id)
repository = self._get_project_repository(history)
timeline = self.get_project_timeline(project_id=project_id)
repository_sync = self.get_repository_sync_status(project_id=project_id)
return {
"project": {
@@ -749,6 +1536,8 @@ class DatabaseManager:
"error_message": history.error_message,
"current_step": history.current_step,
"repository": repository,
"repository_sync": repository_sync,
"open_pull_requests": len([pr for pr in pull_requests if pr["pr_state"] == "open" and not pr["merged"]]),
"completed_at": history.completed_at.isoformat() if history.completed_at else None,
"created_at": history.started_at.isoformat() if history.started_at else None
},
@@ -787,8 +1576,13 @@ class DatabaseManager:
],
"prompts": prompts,
"code_changes": code_changes,
"commits": commits,
"pull_requests": pull_requests,
"llm_traces": llm_traces,
"prompt_change_correlations": correlations,
"timeline": timeline,
"repository": repository,
"repository_sync": repository_sync,
}
def get_prompt_events(self, project_id: str | None = None, limit: int = 100) -> list[dict]:
@@ -829,6 +1623,7 @@ class DatabaseManager:
"prompt_id": self._normalize_metadata(change.metadata_json).get("prompt_id"),
"history_id": self._normalize_metadata(change.metadata_json).get("history_id"),
"diff_summary": self._normalize_metadata(change.metadata_json).get("diff_summary"),
"diff_text": self._normalize_metadata(change.metadata_json).get("diff_text"),
"timestamp": change.created_at.isoformat() if change.created_at else None,
}
for change in changes
@@ -843,6 +1638,22 @@ class DatabaseManager:
def get_dashboard_snapshot(self, limit: int = 8) -> dict:
"""Return DB-backed dashboard data for the UI."""
if settings.gitea_url and settings.gitea_token:
try:
try:
from .gitea import GiteaAPI
except ImportError:
from agents.gitea import GiteaAPI
self.sync_pull_request_states(
GiteaAPI(
token=settings.GITEA_TOKEN,
base_url=settings.GITEA_URL,
owner=settings.GITEA_OWNER,
repo=settings.GITEA_REPO or '',
)
)
except Exception:
pass
projects = self.db.query(ProjectHistory).order_by(ProjectHistory.updated_at.desc()).limit(limit).all()
system_logs = self.db.query(SystemLog).order_by(SystemLog.created_at.desc()).limit(limit).all()
return {
@@ -853,6 +1664,7 @@ class DatabaseManager:
"error_projects": self.db.query(ProjectHistory).filter(ProjectHistory.status == ProjectStatus.ERROR.value).count(),
"prompt_events": self.db.query(AuditTrail).filter(AuditTrail.action == "PROMPT_RECEIVED").count(),
"code_changes": self.db.query(AuditTrail).filter(AuditTrail.action == "CODE_CHANGE").count(),
"open_pull_requests": self.db.query(PullRequest).filter(PullRequest.pr_state == "open", PullRequest.merged.is_(False)).count(),
},
"projects": [self.get_project_audit_data(project.project_id) for project in projects],
"system_logs": [

View File

@@ -2,6 +2,7 @@
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
@@ -14,10 +15,13 @@ except ImportError:
class GitManager:
"""Manages git operations for the project."""
def __init__(self, project_id: str):
def __init__(self, project_id: str, project_dir: str | None = None):
if not project_id:
raise ValueError("project_id cannot be empty or None")
self.project_id = project_id
if project_dir:
resolved = Path(project_dir).expanduser().resolve()
else:
project_path = Path(project_id)
if project_path.is_absolute() or len(project_path.parts) > 1:
resolved = project_path.expanduser().resolve()
@@ -28,39 +32,113 @@ class GitManager:
resolved = (base_root / project_id).resolve()
self.project_dir = str(resolved)
def _run(self, args: list[str], env: dict | None = None, check: bool = True) -> subprocess.CompletedProcess:
"""Run a git command in the project directory."""
return subprocess.run(
args,
check=check,
capture_output=True,
text=True,
cwd=self.project_dir,
env=env,
)
def has_repo(self) -> bool:
"""Return whether the project directory already contains a git repository."""
return Path(self.project_dir, '.git').exists()
def init_repo(self):
"""Initialize git repository."""
os.makedirs(self.project_dir, exist_ok=True)
os.chdir(self.project_dir)
subprocess.run(["git", "init"], check=True, capture_output=True)
self._run(["git", "init", "-b", "main"])
self._run(["git", "config", "user.name", "AI Software Factory"])
self._run(["git", "config", "user.email", "factory@local.invalid"])
def add_files(self, paths: list[str]):
"""Add files to git staging."""
subprocess.run(["git", "add"] + paths, check=True, capture_output=True)
self._run(["git", "add"] + paths)
def commit(self, message: str):
def checkout_branch(self, branch_name: str, create: bool = False, start_point: str | None = None) -> None:
"""Switch to a branch, optionally creating it from a start point."""
if create:
args = ["git", "checkout", "-B", branch_name]
if start_point:
args.append(start_point)
self._run(args)
return
self._run(["git", "checkout", branch_name])
def branch_exists(self, branch_name: str) -> bool:
"""Return whether a local branch exists."""
result = self._run(["git", "show-ref", "--verify", f"refs/heads/{branch_name}"], check=False)
return result.returncode == 0
def commit(self, message: str) -> str:
"""Create a git commit."""
subprocess.run(
["git", "commit", "-m", message],
check=True,
capture_output=True
)
self._run(["git", "commit", "-m", message])
return self.current_head()
def create_empty_commit(self, message: str) -> str:
"""Create an empty commit."""
self._run(["git", "commit", "--allow-empty", "-m", message])
return self.current_head()
def push(self, remote: str = "origin", branch: str = "main"):
"""Push changes to remote."""
subprocess.run(
["git", "push", "-u", remote, branch],
check=True,
capture_output=True
self._run(["git", "push", "-u", remote, branch])
def ensure_remote(self, remote: str, url: str) -> None:
"""Create or update a remote URL."""
result = self._run(["git", "remote", "get-url", remote], check=False)
if result.returncode == 0:
self._run(["git", "remote", "set-url", remote, url])
else:
self._run(["git", "remote", "add", remote, url])
def push_with_credentials(
self,
remote_url: str,
username: str,
password: str,
remote: str = "origin",
branch: str = "main",
) -> None:
"""Push to a remote over HTTPS using an askpass helper."""
os.makedirs(self.project_dir, exist_ok=True)
self.ensure_remote(remote, remote_url)
helper_contents = "#!/bin/sh\ncase \"$1\" in\n *Username*) printf '%s\\n' \"$GIT_ASKPASS_USERNAME\" ;;\n *) printf '%s\\n' \"$GIT_ASKPASS_PASSWORD\" ;;\nesac\n"
helper_path: str | None = None
try:
with tempfile.NamedTemporaryFile('w', delete=False, dir=self.project_dir, prefix='git-askpass-', suffix='.sh') as helper_file:
helper_file.write(helper_contents)
helper_path = helper_file.name
os.chmod(helper_path, 0o700)
env = os.environ.copy()
env.update(
{
"GIT_TERMINAL_PROMPT": "0",
"GIT_ASKPASS": helper_path,
"GIT_ASKPASS_USERNAME": username,
"GIT_ASKPASS_PASSWORD": password,
}
)
self._run(["git", "push", "-u", remote, branch], env=env)
finally:
if helper_path:
Path(helper_path).unlink(missing_ok=True)
def create_branch(self, branch_name: str):
"""Create and switch to a new branch."""
subprocess.run(
["git", "checkout", "-b", branch_name],
check=True,
capture_output=True
)
self._run(["git", "checkout", "-b", branch_name])
def revert_commit(self, commit_hash: str, no_edit: bool = True) -> str:
"""Revert a commit and return the new HEAD."""
args = ["git", "revert"]
if no_edit:
args.append("--no-edit")
args.append(commit_hash)
self._run(args)
return self.current_head()
def create_pr(
self,
@@ -84,6 +162,18 @@ class GitManager:
result = subprocess.run(
["git", "status", "--porcelain"],
capture_output=True,
text=True
text=True,
cwd=self.project_dir,
)
return result.stdout.strip()
def current_head(self) -> str:
"""Return the current commit hash."""
return self._run(["git", "rev-parse", "HEAD"]).stdout.strip()
def current_head_or_none(self) -> str | None:
"""Return the current commit hash when the repository already has commits."""
result = self._run(["git", "rev-parse", "HEAD"], check=False)
if result.returncode != 0:
return None
return result.stdout.strip() or None

View File

@@ -1,6 +1,9 @@
"""Gitea API integration for repository and pull request operations."""
import os
import urllib.error
import urllib.request
import json
class GiteaAPI:
@@ -41,6 +44,38 @@ class GiteaAPI:
"""Build a Gitea API URL from a relative path."""
return f"{self.base_url}/api/v1/{path.lstrip('/')}"
def build_repo_git_url(self, owner: str | None = None, repo: str | None = None) -> str | None:
"""Build the clone URL for a repository."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo:
return None
return f"{self.base_url}/{_owner}/{_repo}.git"
def build_commit_url(self, commit_hash: str, owner: str | None = None, repo: str | None = None) -> str | None:
"""Build a browser URL for a commit."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo or not commit_hash:
return None
return f"{self.base_url}/{_owner}/{_repo}/commit/{commit_hash}"
def build_compare_url(self, base_ref: str, head_ref: str, owner: str | None = None, repo: str | None = None) -> str | None:
"""Build a browser URL for a compare view."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo or not base_ref or not head_ref:
return None
return f"{self.base_url}/{_owner}/{_repo}/compare/{base_ref}...{head_ref}"
def build_pull_request_url(self, pr_number: int, owner: str | None = None, repo: str | None = None) -> str | None:
"""Build a browser URL for a pull request."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo or not pr_number:
return None
return f"{self.base_url}/{_owner}/{_repo}/pulls/{pr_number}"
async def _request(self, method: str, path: str, payload: dict | None = None) -> dict:
"""Perform a Gitea API request and normalize the response."""
try:
@@ -59,6 +94,30 @@ class GiteaAPI:
except Exception as e:
return {"error": str(e)}
def _request_sync(self, method: str, path: str, payload: dict | None = None) -> dict:
"""Perform a synchronous Gitea API request."""
request = urllib.request.Request(
self._api_url(path),
headers=self.get_auth_headers(),
method=method.upper(),
)
data = None
if payload is not None:
data = json.dumps(payload).encode('utf-8')
request.data = data
try:
with urllib.request.urlopen(request) as response:
body = response.read().decode('utf-8')
return json.loads(body) if body else {}
except urllib.error.HTTPError as exc:
try:
body = exc.read().decode('utf-8')
except Exception:
body = str(exc)
return {'error': body, 'status_code': exc.code}
except Exception as exc:
return {'error': str(exc)}
def build_project_repo_name(self, project_id: str, project_name: str | None = None) -> str:
"""Build a repository name for a generated project."""
preferred = (project_name or project_id or "project").strip().lower().replace(" ", "-")
@@ -97,6 +156,10 @@ class GiteaAPI:
result.setdefault("status", "created")
return result
async def get_current_user(self) -> dict:
"""Get the user associated with the configured token."""
return await self._request("GET", "user")
async def create_branch(self, branch: str, base: str = "main", owner: str | None = None, repo: str | None = None):
"""Create a new branch."""
_owner = owner or self.owner
@@ -127,6 +190,110 @@ class GiteaAPI:
}
return await self._request("POST", f"repos/{_owner}/{_repo}/pulls", payload)
async def list_pull_requests(
self,
owner: str | None = None,
repo: str | None = None,
state: str = 'open',
) -> dict | list:
"""List pull requests for a repository."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request("GET", f"repos/{_owner}/{_repo}/pulls?state={state}")
def list_pull_requests_sync(
self,
owner: str | None = None,
repo: str | None = None,
state: str = 'open',
) -> dict | list:
"""Synchronously list pull requests for a repository."""
_owner = owner or self.owner
_repo = repo or self.repo
return self._request_sync("GET", f"repos/{_owner}/{_repo}/pulls?state={state}")
async def list_repositories(self, owner: str | None = None) -> dict | list:
"""List repositories within the configured organization."""
_owner = owner or self.owner
return await self._request("GET", f"orgs/{_owner}/repos")
def list_repositories_sync(self, owner: str | None = None) -> dict | list:
"""Synchronously list repositories within the configured organization."""
_owner = owner or self.owner
return self._request_sync("GET", f"orgs/{_owner}/repos")
async def list_branches(self, owner: str | None = None, repo: str | None = None) -> dict | list:
"""List repository branches."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request("GET", f"repos/{_owner}/{_repo}/branches")
def list_branches_sync(self, owner: str | None = None, repo: str | None = None) -> dict | list:
"""Synchronously list repository branches."""
_owner = owner or self.owner
_repo = repo or self.repo
return self._request_sync("GET", f"repos/{_owner}/{_repo}/branches")
async def list_repo_commits(
self,
owner: str | None = None,
repo: str | None = None,
limit: int = 25,
branch: str | None = None,
) -> dict | list:
"""List recent commits for a repository."""
_owner = owner or self.owner
_repo = repo or self.repo
branch_query = f"&sha={branch}" if branch else ""
return await self._request("GET", f"repos/{_owner}/{_repo}/commits?limit={limit}{branch_query}")
def list_repo_commits_sync(
self,
owner: str | None = None,
repo: str | None = None,
limit: int = 25,
branch: str | None = None,
) -> dict | list:
"""Synchronously list recent commits for a repository."""
_owner = owner or self.owner
_repo = repo or self.repo
branch_query = f"&sha={branch}" if branch else ""
return self._request_sync("GET", f"repos/{_owner}/{_repo}/commits?limit={limit}{branch_query}")
async def get_commit(
self,
commit_hash: str,
owner: str | None = None,
repo: str | None = None,
) -> dict:
"""Return one commit by hash."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request("GET", f"repos/{_owner}/{_repo}/git/commits/{commit_hash}")
def get_commit_sync(
self,
commit_hash: str,
owner: str | None = None,
repo: str | None = None,
) -> dict:
"""Synchronously return one commit by hash."""
_owner = owner or self.owner
_repo = repo or self.repo
return self._request_sync("GET", f"repos/{_owner}/{_repo}/git/commits/{commit_hash}")
async def get_pull_request(self, pr_number: int, owner: str | None = None, repo: str | None = None) -> dict:
"""Return one pull request by number."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request("GET", f"repos/{_owner}/{_repo}/pulls/{pr_number}")
def get_pull_request_sync(self, pr_number: int, owner: str | None = None, repo: str | None = None) -> dict:
"""Synchronously return one pull request by number."""
_owner = owner or self.owner
_repo = repo or self.repo
return self._request_sync("GET", f"repos/{_owner}/{_repo}/pulls/{pr_number}")
async def push_commit(
self,
branch: str,

View File

@@ -359,7 +359,7 @@ class N8NSetupAgent:
"type": "n8n-nodes-base.telegramTrigger",
"typeVersion": 1,
"position": [-520, 120],
"parameters": {"updates": ["message"]},
"parameters": {"updates": ["message", "channel_post"]},
"credentials": {"telegramApi": {"name": credential_name}},
},
{
@@ -370,7 +370,7 @@ class N8NSetupAgent:
"position": [-180, 120],
"parameters": {
"language": "javaScript",
"jsCode": f"const allowedChatId = {allowed_chat};\nconst message = $json.message ?? $json;\nconst text = String(message.text ?? '').trim();\nconst chatId = String(message.chat?.id ?? '');\nif (!text) return [];\nif (allowedChatId && chatId !== allowedChatId) return [];\nreturn [{{ json: {{ prompt_text: text, source: 'telegram', chat_id: chatId, chat_type: message.chat?.type ?? null }} }}];",
"jsCode": f"const allowedChatId = {allowed_chat};\nconst message = $json.message ?? $json.channel_post ?? $json;\nconst text = String(message.text ?? '').trim();\nconst chatId = String(message.chat?.id ?? '');\nif (!text) return [];\nif (allowedChatId && chatId !== allowedChatId) return [];\nreturn [{{ json: {{ prompt_text: text, source: 'telegram', chat_id: chatId, chat_type: message.chat?.type ?? null }} }}];",
},
},
{
@@ -397,8 +397,8 @@ class N8NSetupAgent:
"parameters": {
"resource": "message",
"operation": "sendMessage",
"chatId": "={{ $('Telegram Trigger').item.json.message.chat.id }}",
"text": "={{ $json.data ? `Generated ${$json.data.name} (${($json.data.changed_files || []).length} files)` : ($json.message || 'Software generation request accepted') }}",
"chatId": "={{ ($('Telegram Trigger').item.json.message ?? $('Telegram Trigger').item.json.channel_post).chat.id }}",
"text": "={{ $json.summary_message || $json.data?.summary_message || $json.message || 'Software generation request accepted' }}",
},
"credentials": {"telegramApi": {"name": credential_name}},
},

View File

@@ -2,7 +2,9 @@
from __future__ import annotations
import difflib
import py_compile
import subprocess
from typing import Optional
from datetime import datetime
@@ -33,6 +35,7 @@ class AgentOrchestrator:
db=None,
prompt_text: str | None = None,
prompt_actor: str = "api",
existing_history=None,
):
"""Initialize orchestrator."""
self.project_id = project_id
@@ -49,6 +52,7 @@ class AgentOrchestrator:
self.db = db
self.prompt_text = prompt_text
self.prompt_actor = prompt_actor
self.existing_history = existing_history
self.changed_files: list[str] = []
self.gitea_api = GiteaAPI(
token=settings.GITEA_TOKEN,
@@ -60,10 +64,13 @@ class AgentOrchestrator:
self.prompt_audit = None
self.repo_name = settings.gitea_repo or self.gitea_api.build_project_repo_name(project_id, project_name)
self.repo_owner = settings.gitea_owner
self.repo_url = self._build_repo_url(self.repo_owner, self.repo_name)
self.repo_url = None
self.branch_name = self._build_pr_branch_name(project_id)
self.active_pull_request = None
self._gitea_username: str | None = None
# Initialize agents
self.git_manager = GitManager(project_id)
self.git_manager = GitManager(project_id, project_dir=str(self.project_root))
self.ui_manager = UIManager(project_id)
# Initialize database manager if db session provided
@@ -71,18 +78,30 @@ class AgentOrchestrator:
self.history = None
if db:
self.db_manager = DatabaseManager(db)
# Log project start to database
if existing_history is not None:
self.history = existing_history
self.project_id = existing_history.project_id
self.project_name = existing_history.project_name or project_name
self.description = existing_history.description or description
else:
self.history = self.db_manager.log_project_start(
project_id=project_id,
project_name=project_name,
description=description
)
# Re-fetch with new history_id
self.db_manager = DatabaseManager(db)
self.active_pull_request = self.db_manager.get_open_pull_request(project_id=self.project_id)
if existing_history is not None and self.history is not None:
latest_ui = self.db_manager._get_latest_ui_snapshot_data(self.history.id)
repository = latest_ui.get('repository') if isinstance(latest_ui, dict) else None
if isinstance(repository, dict) and repository:
self.repo_owner = repository.get('owner') or self.repo_owner
self.repo_name = repository.get('name') or self.repo_name
self.repo_url = repository.get('url') or self.repo_url
if self.prompt_text:
self.prompt_audit = self.db_manager.log_prompt_submission(
history_id=self.history.id,
project_id=project_id,
project_id=self.project_id,
prompt_text=self.prompt_text,
features=self.features,
tech_stack=self.tech_stack,
@@ -95,19 +114,71 @@ class AgentOrchestrator:
self.ui_manager.ui_data["repository"] = {
"owner": self.repo_owner,
"name": self.repo_name,
"url": self.repo_url,
"mode": "project" if settings.use_project_repositories else "shared",
"status": "pending" if settings.use_project_repositories else "shared",
"provider": "gitea",
}
if self.active_pull_request:
self.ui_manager.ui_data["pull_request"] = self.active_pull_request
def _build_pr_branch_name(self, project_id: str) -> str:
"""Build a stable branch name used until the PR is merged."""
return f"ai/{project_id}"
def _build_repo_url(self, owner: str | None, repo: str | None) -> str | None:
if not owner or not repo or not settings.gitea_url:
return None
return f"{settings.gitea_url.rstrip('/')}/{owner}/{repo}"
def _log_generation_plan_trace(self) -> None:
"""Persist the current generation plan as an inspectable trace."""
if not self.db_manager or not self.history or not self.prompt_audit:
return
planned_files = list(self._template_files().keys())
self.db_manager.log_llm_trace(
project_id=self.project_id,
history_id=self.history.id,
prompt_id=self.prompt_audit.id,
stage='generation_plan',
provider='factory-planner',
model='template-generator',
system_prompt='Plan the generated project structure from the structured request and repository state.',
user_prompt=self.prompt_text or self.description,
assistant_response=(
f"Planned files: {', '.join(planned_files)}. "
f"Target branch: {self.branch_name}. "
f"Repository mode: {self.ui_manager.ui_data.get('repository', {}).get('mode', 'unknown')}."
),
raw_response={
'planned_files': planned_files,
'features': list(self.features),
'tech_stack': list(self.tech_stack),
'branch': self.branch_name,
'repository': self.ui_manager.ui_data.get('repository', {}),
},
fallback_used=False,
)
async def _ensure_remote_repository(self) -> None:
if not settings.use_project_repositories:
self.ui_manager.ui_data["repository"]["status"] = "shared"
if settings.gitea_repo:
predicted_url = self._build_repo_url(self.repo_owner, self.repo_name)
if predicted_url:
self.repo_url = predicted_url
self.ui_manager.ui_data["repository"]["url"] = predicted_url
self.ui_manager.ui_data["repository"]["api_response"] = {
"status": "shared",
"detail": "Using the configured shared repository instead of provisioning a per-project repo.",
}
return
if not self.repo_owner or not settings.gitea_token or not settings.gitea_url:
self.ui_manager.ui_data["repository"]["status"] = "skipped"
self.ui_manager.ui_data["repository"]["reason"] = "Missing Gitea owner, URL, or token configuration"
self.ui_manager.ui_data["repository"]["api_response"] = {
"status": "skipped",
"detail": "Missing Gitea owner, URL, or token configuration",
}
return
repo_name = self.repo_name
@@ -115,6 +186,7 @@ class AgentOrchestrator:
repo_name=repo_name,
owner=self.repo_owner,
description=f"AI-generated project for {self.project_name}",
auto_init=False,
)
if result.get("status") == "exists" and repo_name == self.gitea_api.build_project_repo_name(self.project_id, self.project_name):
repo_name = f"{repo_name}-{self.project_id.split('-')[-1]}"
@@ -122,6 +194,7 @@ class AgentOrchestrator:
repo_name=repo_name,
owner=self.repo_owner,
description=f"AI-generated project for {self.project_name}",
auto_init=False,
)
self.repo_name = repo_name
self.ui_manager.ui_data["repository"]["name"] = repo_name
@@ -135,10 +208,172 @@ class AgentOrchestrator:
else f"Prepared repository {self.repo_owner}/{self.repo_name}"
),
)
self.ui_manager.ui_data["repository"]["status"] = result.get("status", "error" if result.get("error") else "ready")
if result.get("html_url"):
repo_status = result.get("status", "error" if result.get("error") else "ready")
self.ui_manager.ui_data["repository"]["status"] = repo_status
self.ui_manager.ui_data["repository"]["api_response"] = {
key: value
for key, value in result.items()
if key not in {"private"}
}
if result.get("status_code") is not None:
self.ui_manager.ui_data["repository"]["api_status_code"] = result.get("status_code")
if result.get("error"):
self.ui_manager.ui_data["repository"]["reason"] = result.get("error")
self.ui_manager.ui_data["repository"].pop("url", None)
elif result.get("html_url"):
self.repo_url = result["html_url"]
self.ui_manager.ui_data["repository"]["url"] = self.repo_url
clone_url = result.get("clone_url") or self.gitea_api.build_repo_git_url(self.repo_owner, self.repo_name)
if clone_url:
self.ui_manager.ui_data["repository"]["clone_url"] = clone_url
self.ui_manager.ui_data["repository"].pop("reason", None)
elif repo_status == "exists":
predicted_url = self._build_repo_url(self.repo_owner, self.repo_name)
if predicted_url:
self.repo_url = predicted_url
self.ui_manager.ui_data["repository"]["url"] = predicted_url
clone_url = result.get("clone_url") or self.gitea_api.build_repo_git_url(self.repo_owner, self.repo_name)
if clone_url:
self.ui_manager.ui_data["repository"]["clone_url"] = clone_url
else:
self.ui_manager.ui_data["repository"].pop("url", None)
async def _resolve_gitea_username(self) -> str:
"""Resolve and cache the Gitea login used for authenticated git operations."""
if self._gitea_username:
return self._gitea_username
user_info = await self.gitea_api.get_current_user()
if user_info.get('error') or not user_info.get('login'):
raise RuntimeError(f"Unable to resolve Gitea user for push: {user_info.get('error', 'missing login')}")
self._gitea_username = user_info['login']
return self._gitea_username
async def _push_branch(self, branch: str) -> dict | None:
"""Push a branch to the configured project repository when available."""
repository = self.ui_manager.ui_data.get('repository') or {}
if repository.get('mode') != 'project':
return None
if repository.get('status') not in {'created', 'exists', 'ready'}:
return None
if not settings.gitea_token or not self.repo_owner or not self.repo_name:
return None
clone_url = repository.get('clone_url') or self.gitea_api.build_repo_git_url(self.repo_owner, self.repo_name)
if not clone_url:
return None
username = await self._resolve_gitea_username()
self.git_manager.push_with_credentials(
remote_url=clone_url,
username=username,
password=settings.gitea_token,
remote='origin',
branch=branch,
)
return {'status': 'pushed', 'remote': clone_url, 'branch': branch}
async def _prepare_git_workspace(self) -> None:
"""Initialize the local repo and ensure the PR branch exists before writing files."""
if not self.git_manager.has_repo():
self.git_manager.init_repo()
if not self.git_manager.current_head_or_none():
self.git_manager.create_empty_commit('Initialize project repository')
try:
await self._push_branch('main')
except (RuntimeError, subprocess.CalledProcessError, FileNotFoundError) as exc:
self.ui_manager.ui_data.setdefault('git', {})['remote_error'] = str(exc)
self._append_log(f'Initial main push skipped: {exc}')
if self.git_manager.branch_exists(self.branch_name):
self.git_manager.checkout_branch(self.branch_name)
else:
self.git_manager.checkout_branch(self.branch_name, create=True, start_point='main')
self.ui_manager.ui_data.setdefault('git', {})['active_branch'] = self.branch_name
async def _ensure_pull_request(self) -> dict | None:
"""Create the project pull request on first delivery and reuse it later."""
if self.active_pull_request:
self.ui_manager.ui_data['pull_request'] = self.active_pull_request
return self.active_pull_request
repository = self.ui_manager.ui_data.get('repository') or {}
if repository.get('mode') != 'project' or repository.get('status') not in {'created', 'exists', 'ready'}:
return None
title = f"AI delivery for {self.project_name}"
body = (
f"Automated software factory changes for {self.project_name}.\n\n"
f"Prompt: {self.prompt_text or self.description}\n\n"
f"Branch: {self.branch_name}"
)
result = await self.gitea_api.create_pull_request(
title=title,
body=body,
owner=self.repo_owner,
repo=self.repo_name,
base='main',
head=self.branch_name,
)
if result.get('error'):
raise RuntimeError(f"Unable to create pull request: {result.get('error')}")
pr_number = result.get('number') or result.get('id') or 0
pr_data = {
'pr_number': pr_number,
'title': result.get('title', title),
'body': result.get('body', body),
'state': result.get('state', 'open'),
'base': result.get('base', {}).get('ref', 'main') if isinstance(result.get('base'), dict) else 'main',
'user': result.get('user', {}).get('login', 'system') if isinstance(result.get('user'), dict) else 'system',
'pr_url': result.get('html_url') or self.gitea_api.build_pull_request_url(pr_number, self.repo_owner, self.repo_name),
'merged': bool(result.get('merged')),
'pr_state': result.get('state', 'open'),
}
if self.db_manager and self.history:
self.db_manager.save_pr_data(self.history.id, pr_data)
self.active_pull_request = self.db_manager.get_open_pull_request(project_id=self.project_id) if self.db_manager else pr_data
self.ui_manager.ui_data['pull_request'] = self.active_pull_request or pr_data
return self.active_pull_request or pr_data
async def _push_remote_commit(self, commit_hash: str, commit_message: str, changed_files: list[str], base_commit: str | None) -> dict | None:
"""Push the local commit to the provisioned Gitea repository and build browser links."""
repository = self.ui_manager.ui_data.get("repository") or {}
if repository.get("mode") != "project":
return None
if repository.get("status") not in {"created", "exists", "ready"}:
return None
push_result = await self._push_branch(self.branch_name)
if push_result is None:
return None
pull_request = await self._ensure_pull_request()
commit_url = self.gitea_api.build_commit_url(commit_hash, owner=self.repo_owner, repo=self.repo_name)
compare_url = self.gitea_api.build_compare_url(base_commit, commit_hash, owner=self.repo_owner, repo=self.repo_name) if base_commit else None
remote_record = {
"status": "pushed",
"remote": push_result.get('remote'),
"branch": self.branch_name,
"commit_url": commit_url,
"compare_url": compare_url,
"changed_files": changed_files,
"pull_request": pull_request,
}
self.ui_manager.ui_data.setdefault("git", {})["remote_push"] = remote_record
repository["last_commit_url"] = commit_url
if compare_url:
repository["last_compare_url"] = compare_url
self._append_log(f"Pushed generated commit to {self.repo_owner}/{self.repo_name}.")
return remote_record
def _build_diff_text(self, relative_path: str, previous_content: str, new_content: str) -> str:
"""Build a unified diff for display in the dashboard."""
previous_lines = previous_content.splitlines(keepends=True)
new_lines = new_content.splitlines(keepends=True)
diff = difflib.unified_diff(
previous_lines,
new_lines,
fromfile=f"a/{relative_path}",
tofile=f"b/{relative_path}",
)
return "".join(diff)
def _append_log(self, message: str) -> None:
timestamped = f"[{datetime.utcnow().isoformat()}] {message}"
@@ -163,6 +398,8 @@ class AgentOrchestrator:
target = self.project_root / relative_path
target.parent.mkdir(parents=True, exist_ok=True)
change_type = "UPDATE" if target.exists() else "CREATE"
previous_content = target.read_text(encoding="utf-8") if target.exists() else ""
diff_text = self._build_diff_text(relative_path, previous_content, content)
target.write_text(content, encoding="utf-8")
self.changed_files.append(relative_path)
if self.db_manager and self.history:
@@ -176,6 +413,7 @@ class AgentOrchestrator:
history_id=self.history.id,
prompt_id=self.prompt_audit.id if self.prompt_audit else None,
diff_summary=f"Wrote {len(content.splitlines())} lines to {relative_path}",
diff_text=diff_text,
)
def _template_files(self) -> dict[str, str]:
@@ -215,6 +453,8 @@ class AgentOrchestrator:
self._append_log("Initializing project.")
await self._ensure_remote_repository()
await self._prepare_git_workspace()
self._log_generation_plan_trace()
# Step 2: Create project structure (skip git operations)
self._update_progress(20, "project-structure", "Creating project files...")
@@ -228,6 +468,10 @@ class AgentOrchestrator:
self._update_progress(80, "validation", "Validating generated code...")
await self._run_tests()
# Step 5: Commit generated artifacts locally for traceability
self._update_progress(90, "git", "Recording generated changes in git...")
await self._commit_to_git()
# Step 7: Complete
self.status = "completed"
self._update_progress(100, "completed", "Software generation complete!")
@@ -253,6 +497,7 @@ class AgentOrchestrator:
"project_root": str(self.project_root),
"changed_files": list(dict.fromkeys(self.changed_files)),
"repository": self.ui_manager.ui_data.get("repository"),
"pull_request": self.ui_manager.ui_data.get("pull_request"),
}
except Exception as e:
@@ -279,6 +524,7 @@ class AgentOrchestrator:
"project_root": str(self.project_root),
"changed_files": list(dict.fromkeys(self.changed_files)),
"repository": self.ui_manager.ui_data.get("repository"),
"pull_request": self.ui_manager.ui_data.get("pull_request"),
}
async def _create_project_structure(self) -> None:
@@ -305,7 +551,63 @@ class AgentOrchestrator:
async def _commit_to_git(self) -> None:
"""Commit changes to git."""
pass # Skip git operations in test environment
unique_files = list(dict.fromkeys(self.changed_files))
if not unique_files:
return
try:
if not self.git_manager.has_repo():
self.git_manager.init_repo()
base_commit = self.git_manager.current_head_or_none()
self.git_manager.add_files(unique_files)
if not self.git_manager.get_status():
return
commit_message = f"AI generation for prompt: {self.project_name}"
commit_hash = self.git_manager.commit(commit_message)
commit_record = {
"hash": commit_hash,
"message": commit_message,
"files": unique_files,
"timestamp": datetime.utcnow().isoformat(),
"scope": "local",
"branch": self.branch_name,
}
remote_record = None
try:
remote_record = await self._push_remote_commit(commit_hash, commit_message, unique_files, base_commit)
except (RuntimeError, subprocess.CalledProcessError, FileNotFoundError) as remote_exc:
self.ui_manager.ui_data.setdefault("git", {})["remote_error"] = str(remote_exc)
self._append_log(f"Remote git push skipped: {remote_exc}")
if remote_record:
commit_record["scope"] = "remote"
commit_record["commit_url"] = remote_record.get("commit_url")
commit_record["compare_url"] = remote_record.get("compare_url")
if remote_record.get('pull_request'):
commit_record['pull_request'] = remote_record['pull_request']
self.ui_manager.ui_data['pull_request'] = remote_record['pull_request']
self.ui_manager.ui_data.setdefault("git", {})["latest_commit"] = commit_record
self.ui_manager.ui_data.setdefault("git", {})["commits"] = [commit_record]
self._append_log(f"Recorded git commit {commit_hash[:12]} for generated files.")
if self.db_manager:
self.db_manager.log_commit(
project_id=self.project_id,
commit_message=commit_message,
actor="orchestrator",
actor_type="agent",
history_id=self.history.id if self.history else None,
prompt_id=self.prompt_audit.id if self.prompt_audit else None,
commit_hash=commit_hash,
changed_files=unique_files,
branch=self.branch_name,
commit_url=remote_record.get("commit_url") if remote_record else None,
compare_url=remote_record.get("compare_url") if remote_record else None,
remote_status=remote_record.get("status") if remote_record else "local-only",
)
except (subprocess.CalledProcessError, FileNotFoundError) as exc:
self.ui_manager.ui_data.setdefault("git", {})["error"] = str(exc)
self._append_log(f"Git commit skipped: {exc}")
async def _create_pr(self) -> None:
"""Create pull request."""

View File

@@ -0,0 +1,127 @@
"""Helpers for prompt-level repository workflows such as undoing a prompt."""
from __future__ import annotations
import subprocess
try:
from ..config import settings
from .database_manager import DatabaseManager
from .git_manager import GitManager
from .gitea import GiteaAPI
except ImportError:
from config import settings
from agents.database_manager import DatabaseManager
from agents.git_manager import GitManager
from agents.gitea import GiteaAPI
class PromptWorkflowManager:
"""Coordinate prompt-level repository actions against git and Gitea."""
def __init__(self, db):
self.db_manager = DatabaseManager(db)
self.gitea_api = GiteaAPI(
token=settings.GITEA_TOKEN,
base_url=settings.GITEA_URL,
owner=settings.GITEA_OWNER,
repo=settings.GITEA_REPO or '',
)
async def undo_prompt(self, project_id: str, prompt_id: int) -> dict:
"""Revert the commit associated with a prompt and push the revert to the PR branch."""
history = self.db_manager.get_project_by_id(project_id)
if history is None:
return {'status': 'error', 'message': 'Project not found'}
correlations = self.db_manager.get_prompt_change_correlations(project_id=project_id, limit=500)
correlation = next((item for item in correlations if item.get('prompt_id') == prompt_id), None)
if correlation is None:
return {'status': 'error', 'message': 'Prompt not found for project'}
if correlation.get('revert'):
return {'status': 'ignored', 'message': 'Prompt has already been reverted', 'revert': correlation['revert']}
original_commit = next(
(commit for commit in correlation.get('commits', []) if commit.get('remote_status') != 'reverted' and commit.get('commit_hash')),
None,
)
if original_commit is None:
return {'status': 'error', 'message': 'No reversible commit was recorded for this prompt'}
branch = original_commit.get('branch') or f'ai/{project_id}'
project_root = settings.projects_root / project_id
git_manager = GitManager(project_id, project_dir=str(project_root))
if not git_manager.has_repo():
return {'status': 'error', 'message': 'Local project repository is not available for undo'}
try:
git_manager.checkout_branch(branch)
previous_head = git_manager.current_head_or_none()
revert_commit_hash = git_manager.revert_commit(original_commit['commit_hash'])
except (subprocess.CalledProcessError, FileNotFoundError) as exc:
return {'status': 'error', 'message': f'Unable to revert prompt commit: {exc}'}
repository = self.db_manager.get_project_audit_data(project_id).get('repository') or {}
commit_url = None
compare_url = None
if (
repository.get('mode') == 'project'
and repository.get('status') in {'created', 'exists', 'ready'}
and settings.gitea_token
and repository.get('owner')
and repository.get('name')
):
try:
user_info = await self.gitea_api.get_current_user()
username = user_info.get('login') if isinstance(user_info, dict) else None
if username and not user_info.get('error'):
remote_url = repository.get('clone_url') or self.gitea_api.build_repo_git_url(repository.get('owner'), repository.get('name'))
if remote_url:
git_manager.push_with_credentials(
remote_url=remote_url,
username=username,
password=settings.gitea_token,
branch=branch,
)
commit_url = self.gitea_api.build_commit_url(revert_commit_hash, repository.get('owner'), repository.get('name'))
if previous_head:
compare_url = self.gitea_api.build_compare_url(previous_head, revert_commit_hash, repository.get('owner'), repository.get('name'))
except (RuntimeError, subprocess.CalledProcessError, FileNotFoundError):
pass
self.db_manager.log_commit(
project_id=project_id,
commit_message=f'Revert prompt {prompt_id}',
actor='dashboard',
actor_type='user',
history_id=history.id,
prompt_id=prompt_id,
commit_hash=revert_commit_hash,
changed_files=original_commit.get('changed_files', []),
branch=branch,
commit_url=commit_url,
compare_url=compare_url,
remote_status='reverted',
)
self.db_manager.log_prompt_revert(
project_id=project_id,
prompt_id=prompt_id,
reverted_commit_hash=original_commit['commit_hash'],
revert_commit_hash=revert_commit_hash,
actor='dashboard',
commit_url=commit_url,
)
self.db_manager.log_system_event(
component='git',
level='INFO',
message=f'Reverted prompt {prompt_id} for project {project_id}',
)
return {
'status': 'success',
'project_id': project_id,
'prompt_id': prompt_id,
'reverted_commit_hash': original_commit['commit_hash'],
'revert_commit_hash': revert_commit_hash,
'commit_url': commit_url,
'compare_url': compare_url,
}

View File

@@ -19,11 +19,24 @@ class RequestInterpreter:
self.model = model or settings.OLLAMA_MODEL
async def interpret(self, prompt_text: str) -> dict:
"""Interpret free-form text into the request shape expected by the orchestrator."""
interpreted, _trace = await self.interpret_with_trace(prompt_text)
return interpreted
async def interpret_with_trace(self, prompt_text: str) -> tuple[dict, dict]:
"""Interpret free-form text into the request shape expected by the orchestrator."""
normalized = prompt_text.strip()
if not normalized:
raise ValueError('Prompt text cannot be empty')
system_prompt = (
'You extract structured software requests. '
'Return only JSON with keys name, description, features, tech_stack. '
'name and description must be concise strings. '
'features and tech_stack must be arrays of strings. '
'Infer missing details from the user request instead of leaving arrays empty when possible.'
)
try:
import aiohttp
@@ -37,13 +50,7 @@ class RequestInterpreter:
'messages': [
{
'role': 'system',
'content': (
'You extract structured software requests. '
'Return only JSON with keys name, description, features, tech_stack. '
'name and description must be concise strings. '
'features and tech_stack must be arrays of strings. '
'Infer missing details from the user request instead of leaving arrays empty when possible.'
),
'content': system_prompt,
},
{'role': 'user', 'content': normalized},
],
@@ -53,11 +60,31 @@ class RequestInterpreter:
if 200 <= resp.status < 300:
content = payload.get('message', {}).get('content', '')
if content:
return self._normalize_interpreted_request(json.loads(content), normalized)
interpreted = self._normalize_interpreted_request(json.loads(content), normalized)
return interpreted, {
'stage': 'request_interpretation',
'provider': 'ollama',
'model': self.model,
'system_prompt': system_prompt,
'user_prompt': normalized,
'assistant_response': content,
'raw_response': payload,
'fallback_used': False,
}
except Exception:
pass
return self._heuristic_fallback(normalized)
interpreted = self._heuristic_fallback(normalized)
return interpreted, {
'stage': 'request_interpretation',
'provider': 'heuristic',
'model': self.model,
'system_prompt': system_prompt,
'user_prompt': normalized,
'assistant_response': json.dumps(interpreted),
'raw_response': {'fallback': 'heuristic'},
'fallback_used': True,
}
def _normalize_interpreted_request(self, interpreted: dict, original_prompt: str) -> dict:
"""Normalize LLM output into the required request shape."""

View File

@@ -4,18 +4,28 @@ from __future__ import annotations
from contextlib import closing
from html import escape
import json
import time
from nicegui import app, ui
AUTO_SYNC_INTERVAL_SECONDS = 60
_last_background_repo_sync_at = 0.0
try:
from .agents.database_manager import DatabaseManager
from .agents.gitea import GiteaAPI
from .agents.n8n_setup import N8NSetupAgent
from .agents.prompt_workflow import PromptWorkflowManager
from .agents.telegram import TelegramHandler
from .config import settings
from .database import get_database_runtime_summary, get_db_sync, init_db
except ImportError:
from agents.database_manager import DatabaseManager
from agents.gitea import GiteaAPI
from agents.n8n_setup import N8NSetupAgent
from agents.prompt_workflow import PromptWorkflowManager
from agents.telegram import TelegramHandler
from config import settings
from database import get_database_runtime_summary, get_db_sync, init_db
@@ -41,6 +51,9 @@ def _render_repository_block(repository: dict | None) -> None:
mode = repository.get('mode') or 'project'
status = repository.get('status')
repo_url = repository.get('url')
reason = repository.get('reason')
api_status_code = repository.get('api_status_code')
api_response = repository.get('api_response')
with ui.column().classes('gap-1'):
with ui.row().classes('items-center gap-2'):
@@ -52,6 +65,305 @@ def _render_repository_block(repository: dict | None) -> None:
ui.link(repo_url, repo_url, new_tab=True).classes('factory-code')
else:
ui.label('Repository URL not available yet.').classes('factory-muted')
if reason:
ui.label(f'Reason: {reason}').classes('factory-muted')
if api_status_code is not None:
ui.label(f'Gitea status code: {api_status_code}').classes('factory-muted')
if api_response:
with ui.expansion('Gitea API response').classes('w-full q-mt-sm'):
ui.label(json.dumps(api_response, indent=2, sort_keys=True)).classes('factory-code')
def _render_pull_request_block(pull_request: dict | None) -> None:
"""Render tracked pull request details."""
if not pull_request:
ui.label('No pull request recorded yet.').classes('factory-muted')
return
with ui.column().classes('gap-1'):
with ui.row().classes('items-center gap-2'):
ui.label(pull_request.get('pr_title') or 'Untitled PR').style('font-weight: 700; color: #2f241d;')
ui.label(pull_request.get('pr_state') or 'unknown').classes('factory-chip')
if pull_request.get('merged'):
ui.label('merged').classes('factory-chip')
if pull_request.get('pr_url'):
ui.link('Open pull request', pull_request['pr_url'], new_tab=True).classes('factory-code')
if pull_request.get('pr_body'):
ui.label(pull_request['pr_body']).classes('factory-muted')
def _render_repository_sync_block(repository_sync: dict | None) -> None:
"""Render latest repository sync health and import counts."""
if not repository_sync:
ui.label('No repository sync recorded yet.').classes('factory-muted')
return
with ui.column().classes('gap-1'):
with ui.row().classes('items-center gap-2'):
ui.label(repository_sync.get('status') or 'unknown').classes('factory-chip')
if repository_sync.get('timestamp'):
ui.label(repository_sync['timestamp']).classes('factory-muted')
ui.label(repository_sync.get('message') or 'No sync message recorded.').classes('factory-muted')
with ui.row().classes('items-center gap-2'):
ui.label(f"Commits imported: {repository_sync.get('imported_commit_count', 0)}").classes('factory-chip')
ui.label(f"PRs observed: {repository_sync.get('observed_pull_requests', 0)}").classes('factory-chip')
if repository_sync.get('branches_checked'):
ui.label(', '.join(repository_sync['branches_checked'])).classes('factory-muted')
if repository_sync.get('error'):
ui.label(str(repository_sync['error'])).classes('factory-code')
def _render_commit_list(commits: list[dict]) -> None:
"""Render prompt- or project-level git commits."""
if not commits:
ui.label('No git commits recorded for this scope yet.').classes('factory-muted')
return
for commit in commits:
commit_hash = commit.get('commit_hash') or commit.get('hash') or 'unknown'
commit_message = commit.get('commit_message') or commit.get('message') or 'No message recorded'
changed_files = commit.get('changed_files') or commit.get('files') or []
commit_url = commit.get('commit_url')
compare_url = commit.get('compare_url')
remote_status = commit.get('remote_status') or commit.get('scope') or 'local'
source = commit.get('source') or 'factory'
with ui.card().classes('q-pa-sm q-mt-sm'):
with ui.row().classes('items-center justify-between w-full'):
ui.label(commit_message).style('font-weight: 700; color: #2f241d;')
ui.label(commit_hash[:12]).classes('factory-chip')
ui.label(commit.get('timestamp') or 'Timestamp unavailable').classes('factory-muted')
ui.label(f'Status: {remote_status}').classes('factory-muted')
with ui.row().classes('items-center gap-2 q-mt-sm'):
ui.label(source).classes('factory-chip')
if commit.get('branch_scope'):
ui.label(commit['branch_scope']).classes('factory-chip')
if commit.get('branch'):
ui.label(commit['branch']).classes('factory-chip')
if commit.get('imported_from_remote'):
ui.label('imported').classes('factory-chip')
if commit.get('prompt_id') is not None:
ui.label(f"prompt {commit['prompt_id']}").classes('factory-chip')
if changed_files:
ui.label(', '.join(changed_files)).classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-sm'):
if commit_url:
ui.link('Open commit in Gitea', commit_url, new_tab=True)
if compare_url:
ui.link('Open compare view', compare_url, new_tab=True)
def _render_timeline(events: list[dict]) -> None:
"""Render a mixed project timeline."""
if not events:
ui.label('No timeline events recorded yet.').classes('factory-muted')
return
for event in events:
metadata = event.get('metadata') or {}
with ui.card().classes('q-pa-sm q-mt-sm'):
with ui.row().classes('items-center justify-between w-full'):
ui.label(event.get('title') or event.get('action') or 'Event').style('font-weight: 700; color: #2f241d;')
ui.label(event.get('type') or 'event').classes('factory-chip')
ui.label(event.get('timestamp') or 'Timestamp unavailable').classes('factory-muted')
if event.get('details'):
ui.label(event['details']).classes('factory-muted')
if metadata.get('commit_hash'):
ui.label(f"Commit: {metadata['commit_hash'][:12]}").classes('factory-chip')
if metadata.get('branch_scope'):
ui.label(str(metadata['branch_scope'])).classes('factory-chip')
if metadata.get('branch'):
ui.label(str(metadata['branch'])).classes('factory-chip')
if metadata.get('pr_number'):
ui.label(f"PR #{metadata['pr_number']}").classes('factory-chip')
if metadata.get('prompt_id'):
ui.label(f"Prompt {metadata['prompt_id']}").classes('factory-chip')
def _render_commit_context(context: dict | None) -> None:
"""Render a commit provenance lookup result."""
if not context:
ui.label('No commit context loaded.').classes('factory-muted')
return
commit = context.get('commit') or {}
project = context.get('project') or {}
prompt = context.get('prompt')
with ui.card().classes('factory-panel q-pa-lg q-mt-md'):
with ui.row().classes('items-center justify-between w-full'):
ui.label(commit.get('commit_message') or 'Commit').style('font-size: 1.1rem; font-weight: 700; color: #2f241d;')
ui.label((commit.get('commit_hash') or 'unknown')[:12]).classes('factory-chip')
ui.label(context.get('origin_summary') or 'No origin summary available.').classes('factory-muted')
with ui.row().classes('items-center gap-2 q-mt-sm'):
if project.get('project_name'):
ui.label(project['project_name']).classes('factory-chip')
if commit.get('source'):
ui.label(commit['source']).classes('factory-chip')
if commit.get('remote_status'):
ui.label(commit['remote_status']).classes('factory-chip')
if context.get('repository'):
ui.label('Repository').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_repository_block(context.get('repository'))
ui.label('Commit').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_commit_list([commit])
if prompt:
ui.label('Originating Prompt').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
ui.label(prompt.get('prompt_text') or 'Prompt text unavailable').classes('factory-code')
if context.get('related_llm_traces'):
ui.label('Related LLM Trace').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_llm_traces(context.get('related_llm_traces', []))
if context.get('related_changes'):
ui.label('Related Code Changes').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_change_list(context.get('related_changes', []))
if context.get('timeline'):
ui.label('Surrounding Timeline').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_timeline(context.get('timeline', []))
def _filter_timeline_events(events: list[dict], branch_scope: str) -> list[dict]:
"""Apply branch-scope filtering to timeline events."""
if not branch_scope:
return events
filtered = []
for event in events:
metadata = event.get('metadata') or {}
event_scope = metadata.get('branch_scope')
event_scopes = metadata.get('branch_scopes') or []
if event_scope == branch_scope or branch_scope in event_scopes:
filtered.append(event)
return filtered
def _load_commit_context(commit_hash: str, branch_scope: str = '') -> dict | None:
"""Load one commit provenance result from the database."""
if not commit_hash.strip():
return None
db = get_db_sync()
if db is None:
return None
with closing(db):
manager = DatabaseManager(db)
return manager.get_commit_context(commit_hash.strip(), branch_scope=branch_scope or None)
def _run_background_repository_sync() -> None:
"""Refresh remote-backed repositories on a throttled timer."""
global _last_background_repo_sync_at
if not settings.gitea_url or not settings.gitea_token:
return
now = time.monotonic()
if now - _last_background_repo_sync_at < AUTO_SYNC_INTERVAL_SECONDS:
return
db = get_db_sync()
if db is None:
return
synced_any = False
try:
with closing(db):
manager = DatabaseManager(db)
gitea_api = GiteaAPI(
token=settings.GITEA_TOKEN,
base_url=settings.GITEA_URL,
owner=settings.GITEA_OWNER,
repo=settings.GITEA_REPO or '',
)
for history in manager.get_all_projects():
repository = manager._get_project_repository(history) or {}
if not repository.get('owner') or not repository.get('name'):
continue
manager.sync_repository_activity(project_id=history.project_id, gitea_api=gitea_api, commit_limit=20)
synced_any = True
if synced_any:
manager.log_system_event(component='gitea', level='INFO', message='Background repository sync completed')
finally:
_last_background_repo_sync_at = now
def _render_change_list(changes: list[dict]) -> None:
"""Render code changes with expandable actual diffs."""
if not changes:
ui.label('No code changes recorded.').classes('factory-muted')
return
for change in changes:
with ui.card().classes('q-pa-sm q-mt-sm'):
with ui.row().classes('justify-between items-start w-full'):
ui.label(change.get('file_path') or 'unknown file').style('font-weight: 600; color: #2f241d;')
ui.label(change.get('change_type') or change.get('action_type') or 'CHANGE').classes('factory-chip')
ui.label(change.get('diff_summary') or change.get('details') or 'No diff summary recorded').classes('factory-muted')
if change.get('diff_text'):
with ui.expansion('Show diff').classes('w-full q-mt-sm'):
ui.label(change['diff_text']).classes('factory-code')
def _render_llm_traces(traces: list[dict]) -> None:
"""Render persisted LLM request/response traces for a prompt."""
if not traces:
ui.label('No LLM traces recorded for this prompt.').classes('factory-muted')
return
for trace in traces:
with ui.card().classes('q-pa-sm q-mt-sm'):
with ui.row().classes('items-center justify-between w-full'):
ui.label(trace.get('stage') or 'llm').style('font-weight: 700; color: #2f241d;')
provider = trace.get('provider') or 'unknown'
model = trace.get('model') or 'unknown'
ui.label(f'{provider}:{model}').classes('factory-chip')
if trace.get('fallback_used'):
ui.label('Fallback path used').classes('factory-chip')
with ui.expansion('System prompt').classes('w-full q-mt-sm'):
ui.label(trace.get('system_prompt') or 'No system prompt recorded').classes('factory-code')
with ui.expansion('User prompt').classes('w-full q-mt-sm'):
ui.label(trace.get('user_prompt') or 'No user prompt recorded').classes('factory-code')
with ui.expansion('Assistant response').classes('w-full q-mt-sm'):
ui.label(trace.get('assistant_response') or 'No assistant response recorded').classes('factory-code')
def _filter_llm_traces(traces: list[dict], stage: str, model: str, search_query: str) -> list[dict]:
"""Apply UI-selected LLM trace filters."""
filtered = []
lowered_query = search_query.strip().lower()
for trace in traces:
if stage and trace.get('stage') != stage:
continue
if model and trace.get('model') != model:
continue
if lowered_query:
haystacks = [
str(trace.get('stage') or ''),
str(trace.get('model') or ''),
str(trace.get('provider') or ''),
str(trace.get('user_prompt') or ''),
str(trace.get('assistant_response') or ''),
]
if not any(lowered_query in haystack.lower() for haystack in haystacks):
continue
filtered.append(trace)
return filtered
def _render_prompt_compare(correlation: dict) -> None:
"""Render a consolidated compare view for a single prompt."""
changes = correlation.get('changes', [])
commits = correlation.get('commits', [])
changed_files = [change.get('file_path') or 'unknown file' for change in changes]
with ui.card().classes('factory-panel q-pa-lg q-mt-md'):
with ui.row().classes('items-start justify-between w-full'):
with ui.column().classes('gap-1'):
ui.label(correlation.get('prompt_text') or 'Prompt text unavailable').classes('factory-code')
ui.label(correlation.get('timestamp') or 'Timestamp unavailable').classes('factory-muted')
with ui.column().classes('items-end gap-1'):
ui.label(f"{len(commits)} commit(s)").classes('factory-chip')
ui.label(f"{len(changes)} file change(s)").classes('factory-chip')
if changed_files:
ui.label('Files in this prompt change set').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
ui.label(', '.join(changed_files)).classes('factory-muted')
ui.label('Commits').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_commit_list(commits)
ui.label('LLM Trace').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_llm_traces(correlation.get('llm_traces', []))
ui.label('Combined review').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_change_list(changes)
def _load_dashboard_snapshot() -> dict:
@@ -208,6 +520,14 @@ def create_dashboard():
"""Create the main NiceGUI dashboard."""
_add_dashboard_styles()
active_tab_key = 'dashboard.active_tab'
llm_stage_filter_key = 'dashboard.llm_stage_filter'
llm_model_filter_key = 'dashboard.llm_model_filter'
llm_search_filter_key = 'dashboard.llm_search_filter'
commit_lookup_key = 'dashboard.commit_lookup'
branch_scope_filter_key = 'dashboard.branch_scope_filter'
repo_discovery_key = 'dashboard.repo_discovery'
repo_owner_key = 'dashboard.repo_owner'
repo_name_key = 'dashboard.repo_name'
def _selected_tab_name() -> str:
"""Return the persisted active dashboard tab."""
@@ -217,6 +537,141 @@ def create_dashboard():
"""Persist the active dashboard tab across refreshes."""
app.storage.user[active_tab_key] = event.value or 'overview'
def _selected_llm_stage() -> str:
return app.storage.user.get(llm_stage_filter_key, '')
def _selected_llm_model() -> str:
return app.storage.user.get(llm_model_filter_key, '')
def _selected_llm_search() -> str:
return app.storage.user.get(llm_search_filter_key, '')
def _store_llm_stage(event) -> None:
app.storage.user[llm_stage_filter_key] = event.value or ''
dashboard_body.refresh()
def _store_llm_model(event) -> None:
app.storage.user[llm_model_filter_key] = event.value or ''
dashboard_body.refresh()
def _store_llm_search(event) -> None:
app.storage.user[llm_search_filter_key] = event.value or ''
dashboard_body.refresh()
def _selected_commit_lookup() -> str:
return app.storage.user.get(commit_lookup_key, '')
def _store_commit_lookup(event) -> None:
app.storage.user[commit_lookup_key] = event.value or ''
def _selected_branch_scope() -> str:
return app.storage.user.get(branch_scope_filter_key, '')
def _store_branch_scope(event) -> None:
app.storage.user[branch_scope_filter_key] = event.value or ''
dashboard_body.refresh()
def _selected_repo_owner() -> str:
return app.storage.user.get(repo_owner_key, settings.gitea_owner or '')
def _selected_repo_name() -> str:
return app.storage.user.get(repo_name_key, '')
def _store_repo_owner(event) -> None:
app.storage.user[repo_owner_key] = event.value or ''
def _store_repo_name(event) -> None:
app.storage.user[repo_name_key] = event.value or ''
def _set_discovered_repositories(repositories: list[dict]) -> None:
app.storage.user[repo_discovery_key] = repositories
def _get_discovered_repositories() -> list[dict]:
return app.storage.user.get(repo_discovery_key, [])
async def discover_gitea_repositories_action() -> None:
if not settings.gitea_url or not settings.gitea_token:
ui.notify('Configure GITEA_URL and GITEA_TOKEN first', color='negative')
return
owner = _selected_repo_owner() or settings.gitea_owner
gitea_api = GiteaAPI(token=settings.GITEA_TOKEN, base_url=settings.GITEA_URL, owner=owner, repo=settings.GITEA_REPO or '')
repositories = gitea_api.list_repositories_sync(owner=owner)
if isinstance(repositories, dict) and repositories.get('error'):
ui.notify(repositories.get('error', 'Unable to discover repositories'), color='negative')
return
db = get_db_sync()
resolved = []
if db is not None:
with closing(db):
manager = DatabaseManager(db)
for repo in repositories if isinstance(repositories, list) else []:
tracked_project = manager.get_project_by_repository(owner, repo.get('name', ''))
resolved.append(
{
'name': repo.get('name'),
'description': repo.get('description'),
'html_url': repo.get('html_url'),
'default_branch': repo.get('default_branch'),
'private': bool(repo.get('private', False)),
'onboarded': tracked_project is not None,
'project_id': tracked_project.project_id if tracked_project is not None else None,
}
)
_set_discovered_repositories(resolved)
ui.notify(f'Discovered {len(resolved)} repositories in {owner}', color='positive')
dashboard_body.refresh()
async def onboard_repository_action(owner: str, repo_name: str) -> None:
if not settings.gitea_url or not settings.gitea_token:
ui.notify('Configure GITEA_URL and GITEA_TOKEN first', color='negative')
return
if not owner or not repo_name:
ui.notify('Owner and repository name are required', color='negative')
return
gitea_api = GiteaAPI(token=settings.GITEA_TOKEN, base_url=settings.GITEA_URL, owner=owner, repo=settings.GITEA_REPO or '')
repo_info = await gitea_api.get_repo_info(owner=owner, repo=repo_name)
if isinstance(repo_info, dict) and repo_info.get('error'):
ui.notify(repo_info.get('error', 'Repository lookup failed'), color='negative')
return
db = get_db_sync()
if db is None:
ui.notify('Database session could not be created', color='negative')
return
with closing(db):
manager = DatabaseManager(db)
onboarded = manager.onboard_repository(owner=owner, repo_name=repo_name, repository_data=repo_info)
sync_result = manager.sync_repository_activity(project_id=onboarded['project_id'], gitea_api=gitea_api, commit_limit=25)
manager.log_system_event(
component='gitea',
level='INFO' if sync_result.get('status') == 'success' else 'ERROR',
message=sync_result.get('message', f'Onboarded {owner}/{repo_name}'),
)
await discover_gitea_repositories_action()
ui.notify(f'Onboarded {owner}/{repo_name}', color='positive')
dashboard_body.refresh()
def sync_project_repository_action(project_id: str) -> None:
if not settings.gitea_url or not settings.gitea_token:
ui.notify('Configure GITEA_URL and GITEA_TOKEN first', color='negative')
return
db = get_db_sync()
if db is None:
ui.notify('Database session could not be created', color='negative')
return
with closing(db):
result = DatabaseManager(db).sync_repository_activity(
project_id=project_id,
gitea_api=GiteaAPI(
token=settings.GITEA_TOKEN,
base_url=settings.GITEA_URL,
owner=settings.GITEA_OWNER,
repo=settings.GITEA_REPO or '',
),
commit_limit=25,
)
ui.notify(result.get('message', 'Repository sync finished'), color='positive' if result.get('status') == 'success' else 'negative')
dashboard_body.refresh()
async def setup_n8n_workflow_action() -> None:
api_url = _resolve_n8n_api_url()
if not api_url:
@@ -277,6 +732,16 @@ def create_dashboard():
ui.notify(result.get('message', 'Database initialized'), color='positive' if result.get('status') == 'success' else 'negative')
dashboard_body.refresh()
async def undo_prompt_action(project_id: str, prompt_id: int) -> None:
db = get_db_sync()
if db is None:
ui.notify('Database session could not be created', color='negative')
return
with closing(db):
result = await PromptWorkflowManager(db).undo_prompt(project_id=project_id, prompt_id=prompt_id)
ui.notify(result.get('message', 'Prompt reverted') if result.get('status') != 'success' else 'Prompt changes reverted', color='positive' if result.get('status') == 'success' else 'negative')
dashboard_body.refresh()
@ui.refreshable
def dashboard_body() -> None:
snapshot = _load_dashboard_snapshot()
@@ -291,10 +756,22 @@ def create_dashboard():
projects = snapshot['projects']
correlations = snapshot['correlations']
system_logs = snapshot['system_logs']
llm_stage_filter = _selected_llm_stage()
llm_model_filter = _selected_llm_model()
llm_search_filter = _selected_llm_search()
branch_scope_filter = _selected_branch_scope()
commit_lookup_query = _selected_commit_lookup()
commit_context = _load_commit_context(commit_lookup_query, branch_scope_filter) if commit_lookup_query else None
discovered_repositories = _get_discovered_repositories()
all_llm_traces = [trace for project_bundle in projects for trace in project_bundle.get('llm_traces', [])]
llm_stage_options = [''] + sorted({trace.get('stage') for trace in all_llm_traces if trace.get('stage')})
llm_model_options = [''] + sorted({trace.get('model') for trace in all_llm_traces if trace.get('model')})
project_repository_map = {
project_bundle['project']['project_id']: {
'project_name': project_bundle['project']['project_name'],
'repository': project_bundle.get('repository') or project_bundle['project'].get('repository'),
'repository_sync': project_bundle.get('repository_sync') or project_bundle['project'].get('repository_sync'),
'pull_request': next((pr for pr in project_bundle.get('pull_requests', []) if pr.get('pr_state') == 'open' and not pr.get('merged')), None),
}
for project_bundle in projects
if project_bundle.get('project')
@@ -317,7 +794,7 @@ def create_dashboard():
('Projects', summary['total_projects'], 'Tracked generation requests'),
('Completed', summary['completed_projects'], 'Finished project runs'),
('Prompts', summary['prompt_events'], 'Recorded originating prompts'),
('Code Changes', summary['code_changes'], 'Audited generated file writes'),
('Open PRs', summary['open_pull_requests'], 'Unmerged review branches'),
]
for title, value, subtitle in metrics:
with ui.card().classes('factory-kpi'):
@@ -330,6 +807,8 @@ def create_dashboard():
ui.tab('Overview').props('name=overview')
ui.tab('Projects').props('name=projects')
ui.tab('Prompt Trace').props('name=trace')
ui.tab('Compare').props('name=compare')
ui.tab('Timeline').props('name=timeline')
ui.tab('System').props('name=system')
ui.tab('Health').props('name=health')
@@ -375,6 +854,19 @@ def create_dashboard():
with ui.card().classes('q-pa-md'):
ui.label('Repository').style('font-weight: 700; color: #3a281a;')
_render_repository_block(project_bundle.get('repository') or project.get('repository'))
ui.button(
'Sync Repo Activity',
on_click=lambda _=None, project_id=project['project_id']: sync_project_repository_action(project_id),
).props('outline color=secondary').classes('q-mt-md')
with ui.card().classes('q-pa-md'):
ui.label('Repository Sync').style('font-weight: 700; color: #3a281a;')
_render_repository_sync_block(project_bundle.get('repository_sync') or project.get('repository_sync'))
with ui.card().classes('q-pa-md'):
ui.label('Pull Request').style('font-weight: 700; color: #3a281a;')
open_pr = next((pr for pr in project_bundle.get('pull_requests', []) if pr.get('pr_state') == 'open' and not pr.get('merged')), None)
_render_pull_request_block(open_pr)
with ui.card().classes('q-pa-md'):
ui.label('Prompt').style('font-weight: 700; color: #3a281a;')
@@ -387,19 +879,20 @@ def create_dashboard():
else:
ui.label('No prompt recorded.').classes('factory-muted')
with ui.grid(columns=1).classes('w-full gap-4 q-pa-md'):
with ui.card().classes('q-pa-md'):
ui.label('Generated Changes').style('font-weight: 700; color: #3a281a;')
changes = project_bundle.get('code_changes', [])
if changes:
for change in changes:
with ui.row().classes('justify-between items-start w-full q-mt-sm'):
ui.label(change['file_path'] or 'unknown file').style('font-weight: 600; color: #2f241d;')
ui.label(change['action_type']).classes('factory-chip')
ui.label(change['diff_summary'] or change['details']).classes('factory-muted')
else:
ui.label('No code changes recorded.').classes('factory-muted')
_render_change_list(project_bundle.get('code_changes', []))
with ui.grid(columns=2).classes('w-full gap-4 q-pa-md'):
with ui.card().classes('q-pa-md'):
ui.label('Git Commits').style('font-weight: 700; color: #3a281a;')
_render_commit_list(project_bundle.get('commits', []))
with ui.card().classes('q-pa-md'):
ui.label('LLM Trace').style('font-weight: 700; color: #3a281a;')
_render_llm_traces(_filter_llm_traces(project_bundle.get('llm_traces', []), llm_stage_filter, llm_model_filter, llm_search_filter))
with ui.card().classes('q-pa-md'):
ui.label('Recent Logs').style('font-weight: 700; color: #3a281a;')
logs = project_bundle.get('logs', [])[:6]
@@ -409,6 +902,7 @@ def create_dashboard():
else:
ui.label('No project logs yet.').classes('factory-muted')
with ui.grid(columns=1).classes('w-full gap-4 q-pa-md'):
with ui.card().classes('q-pa-md'):
ui.label('Audit Trail').style('font-weight: 700; color: #3a281a;')
audits = project_bundle.get('audit_trail', [])[:6]
@@ -422,23 +916,130 @@ def create_dashboard():
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('Prompt to Code Correlation').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Each prompt entry is linked to the generated files recorded after that prompt for the same project.').classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-md w-full'):
ui.select(
options=llm_stage_options,
value=llm_stage_filter,
on_change=_store_llm_stage,
label='LLM stage',
).classes('min-w-[12rem]')
ui.select(
options=llm_model_options,
value=llm_model_filter,
on_change=_store_llm_model,
label='LLM model',
).classes('min-w-[12rem]')
ui.input(
label='Search trace text',
value=llm_search_filter,
on_change=_store_llm_search,
).classes('min-w-[18rem]')
if correlations:
for correlation in correlations:
correlation_project = project_repository_map.get(correlation['project_id'], {})
filtered_traces = _filter_llm_traces(correlation.get('llm_traces', []), llm_stage_filter, llm_model_filter, llm_search_filter)
with ui.card().classes('q-pa-md q-mt-md'):
ui.label(correlation_project.get('project_name') or correlation['project_id']).style('font-size: 1rem; font-weight: 700; color: #2f241d;')
_render_repository_block(correlation_project.get('repository'))
_render_pull_request_block(correlation_project.get('pull_request'))
ui.label(correlation['prompt_text']).classes('factory-code q-mt-sm')
if correlation['changes']:
for change in correlation['changes']:
ui.markdown(
f"- **{change['file_path'] or 'unknown'}** · {change['change_type']} · {change['diff_summary'] or change['details']}"
)
else:
ui.label('No code changes correlated to this prompt yet.').classes('factory-muted')
if correlation.get('revert'):
ui.label(f"Reverted by commit {correlation['revert'].get('revert_commit_hash', 'unknown')[:12]}").classes('factory-chip')
ui.label('Commits').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_commit_list(correlation.get('commits', []))
ui.label('LLM Trace').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_llm_traces(filtered_traces)
ui.label('File Diffs').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
_render_change_list(correlation['changes'])
else:
ui.label('No prompt traces recorded yet.').classes('factory-muted')
with ui.tab_panel('compare'):
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('Prompt Compare View').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Review one prompt at a time as a complete change set: repo diagnostics, commit links, and file-level diffs in one place.').classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-md w-full'):
ui.select(
options=llm_stage_options,
value=llm_stage_filter,
on_change=_store_llm_stage,
label='LLM stage',
).classes('min-w-[12rem]')
ui.select(
options=llm_model_options,
value=llm_model_filter,
on_change=_store_llm_model,
label='LLM model',
).classes('min-w-[12rem]')
ui.input(
label='Search trace text',
value=llm_search_filter,
on_change=_store_llm_search,
).classes('min-w-[18rem]')
if correlations:
for correlation in correlations:
correlation_project = project_repository_map.get(correlation['project_id'], {})
correlation = {
**correlation,
'llm_traces': _filter_llm_traces(correlation.get('llm_traces', []), llm_stage_filter, llm_model_filter, llm_search_filter),
}
with ui.card().classes('q-pa-md q-mt-md'):
ui.label(correlation_project.get('project_name') or correlation['project_id']).style('font-size: 1rem; font-weight: 700; color: #2f241d;')
_render_repository_block(correlation_project.get('repository'))
_render_pull_request_block(correlation_project.get('pull_request'))
with ui.row().classes('items-center gap-2 q-mt-sm'):
if correlation.get('revert'):
ui.label('Prompt has already been reverted').classes('factory-chip')
else:
ui.button(
'Undo This Prompt',
on_click=lambda _=None, project_id=correlation['project_id'], prompt_id=correlation['prompt_id']: undo_prompt_action(project_id, prompt_id),
).props('outline color=negative')
_render_prompt_compare(correlation)
else:
ui.label('No prompt compare data recorded yet.').classes('factory-muted')
with ui.tab_panel('timeline'):
with ui.card().classes('factory-panel q-pa-lg q-mb-md'):
ui.label('Commit Lookup').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Submit a commit id to reconstruct the prompt, traces, repository state, and surrounding timeline that produced it.').classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-md w-full'):
ui.select(
options=['', 'main', 'pr', 'manual'],
value=branch_scope_filter,
on_change=_store_branch_scope,
label='Branch scope',
).classes('min-w-[10rem]')
ui.input(
label='Commit hash',
value=commit_lookup_query,
on_change=_store_commit_lookup,
placeholder='deadbeef',
).classes('min-w-[18rem]')
ui.button('Lookup', on_click=dashboard_body.refresh).props('unelevated color=dark')
if commit_lookup_query and commit_context is None:
ui.label('No recorded context found for that commit hash.').classes('factory-muted q-mt-md')
elif commit_context is not None:
_render_commit_context(commit_context)
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('Project Timelines').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Chronological view of prompts, LLM traces, commits, PR updates, repository sync events, and prompt reverts.').classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-md w-full'):
ui.select(
options=['', 'main', 'pr', 'manual'],
value=branch_scope_filter,
on_change=_store_branch_scope,
label='Branch scope',
).classes('min-w-[10rem]')
if projects:
for project_bundle in projects:
project = project_bundle['project']
with ui.expansion(f"{project['project_name']} · {project['project_id']}", icon='schedule').classes('q-mt-md w-full'):
_render_timeline(_filter_timeline_events(project_bundle.get('timeline', []), branch_scope_filter))
else:
ui.label('No project timelines recorded yet.').classes('factory-muted')
with ui.tab_panel('system'):
with ui.grid(columns=2).classes('w-full gap-4'):
with ui.card().classes('factory-panel q-pa-lg'):
@@ -449,6 +1050,47 @@ def create_dashboard():
else:
ui.label('No system logs yet.').classes('factory-muted')
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('Repository Onboarding').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Discover repositories in the Gitea organization, onboard manually created repos, and import their recent commits into the dashboard.').classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-md w-full'):
ui.input(
label='Owner / org',
value=_selected_repo_owner(),
on_change=_store_repo_owner,
).classes('min-w-[12rem]')
ui.input(
label='Repository name',
value=_selected_repo_name(),
on_change=_store_repo_name,
).classes('min-w-[14rem]')
ui.button('Discover Repos', on_click=discover_gitea_repositories_action).props('outline color=secondary')
ui.button(
'Onboard Repo',
on_click=lambda: onboard_repository_action(_selected_repo_owner(), _selected_repo_name()),
).props('unelevated color=dark')
if discovered_repositories:
for repo in discovered_repositories:
with ui.card().classes('q-pa-sm q-mt-md'):
with ui.row().classes('items-center justify-between w-full'):
with ui.column().classes('gap-1'):
ui.label(repo.get('name') or 'unknown').style('font-weight: 700; color: #2f241d;')
ui.label(repo.get('description') or 'No description').classes('factory-muted')
with ui.row().classes('items-center gap-2'):
if repo.get('onboarded'):
ui.label('onboarded').classes('factory-chip')
if repo.get('project_id'):
ui.label(repo['project_id']).classes('factory-chip')
else:
ui.button(
'Onboard',
on_click=lambda _=None, owner=_selected_repo_owner(), repo_name=repo.get('name'): onboard_repository_action(owner, repo_name),
).props('outline color=secondary')
if repo.get('html_url'):
ui.link(repo['html_url'], repo['html_url'], new_tab=True).classes('factory-code')
else:
ui.label('No discovered repositories loaded yet.').classes('factory-muted q-mt-md')
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('Important Endpoints').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
endpoints = [
@@ -458,7 +1100,13 @@ def create_dashboard():
'/audit/projects',
'/audit/prompts',
'/audit/changes',
'/audit/commit-context',
'/audit/timeline',
'/audit/llm-traces',
'/audit/correlations',
'/projects/{project_id}/sync-repository',
'/gitea/repos',
'/gitea/repos/onboard',
'/n8n/health',
'/n8n/setup',
]
@@ -481,6 +1129,7 @@ def create_dashboard():
_render_health_panels()
dashboard_body()
ui.timer(15.0, _run_background_repository_sync)
ui.timer(10.0, dashboard_body.refresh)

View File

@@ -24,6 +24,7 @@ def init(fastapi_app: FastAPI, storage_secret: str = 'Secr2t!') -> None:
"""
def render_dashboard_page() -> None:
ui.page_title('AI Software Factory')
create_dashboard()
# NOTE dark mode will be persistent for each user across tabs and server restarts

View File

@@ -27,19 +27,23 @@ from sqlalchemy.orm import Session
try:
from . import __version__, frontend
from . import database as database_module
from .agents.change_summary import ChangeSummaryGenerator
from .agents.database_manager import DatabaseManager
from .agents.request_interpreter import RequestInterpreter
from .agents.orchestrator import AgentOrchestrator
from .agents.n8n_setup import N8NSetupAgent
from .agents.prompt_workflow import PromptWorkflowManager
from .agents.ui_manager import UIManager
from .models import ProjectHistory, ProjectLog, SystemLog
except ImportError:
import frontend
import database as database_module
from agents.change_summary import ChangeSummaryGenerator
from agents.database_manager import DatabaseManager
from agents.request_interpreter import RequestInterpreter
from agents.orchestrator import AgentOrchestrator
from agents.n8n_setup import N8NSetupAgent
from agents.prompt_workflow import PromptWorkflowManager
from agents.ui_manager import UIManager
from models import ProjectHistory, ProjectLog, SystemLog
@@ -90,6 +94,15 @@ class FreeformSoftwareRequest(BaseModel):
chat_type: str | None = None
class GiteaRepositoryOnboardRequest(BaseModel):
"""Request body for onboarding a manually created Gitea repository."""
repo_name: str = Field(min_length=1, max_length=255)
owner: str | None = None
sync_commits: bool = True
commit_limit: int = Field(default=25, ge=1, le=200)
def _build_project_id(name: str) -> str:
"""Create a stable project id from the requested name."""
slug = PROJECT_ID_PATTERN.sub("-", name.strip().lower()).strip("-") or "project"
@@ -164,7 +177,27 @@ async def _run_generation(
"""Run the shared generation pipeline for a structured request."""
database_module.init_db()
manager = DatabaseManager(db)
reusable_history = manager.get_latest_project_by_name(request.name)
if reusable_history and database_module.settings.gitea_url and database_module.settings.gitea_token:
try:
from .agents.gitea import GiteaAPI
except ImportError:
from agents.gitea import GiteaAPI
manager.sync_pull_request_states(
GiteaAPI(
token=database_module.settings.GITEA_TOKEN,
base_url=database_module.settings.GITEA_URL,
owner=database_module.settings.GITEA_OWNER,
repo=database_module.settings.GITEA_REPO or '',
),
project_id=reusable_history.project_id,
)
if reusable_history and manager.get_open_pull_request(project_id=reusable_history.project_id):
project_id = reusable_history.project_id
else:
project_id = _build_project_id(request.name)
reusable_history = None
resolved_prompt_text = prompt_text or _compose_prompt_text(request)
orchestrator = AgentOrchestrator(
project_id=project_id,
@@ -175,6 +208,7 @@ async def _run_generation(
db=db,
prompt_text=resolved_prompt_text,
prompt_actor=prompt_actor,
existing_history=reusable_history,
)
result = await orchestrator.run()
@@ -195,7 +229,43 @@ async def _run_generation(
response_data['project_root'] = result.get('project_root', str(_project_root(project_id)))
response_data['changed_files'] = result.get('changed_files', [])
response_data['repository'] = result.get('repository')
return {'status': result['status'], 'data': response_data}
response_data['pull_request'] = result.get('pull_request') or manager.get_open_pull_request(project_id=project_id)
summary_context = {
'name': response_data['name'],
'description': response_data['description'],
'features': response_data['features'],
'tech_stack': response_data['tech_stack'],
'changed_files': response_data['changed_files'],
'repository_url': (
(response_data.get('repository') or {}).get('url')
if isinstance(response_data.get('repository'), dict)
and (response_data.get('repository') or {}).get('status') in {'created', 'exists', 'ready', 'shared'}
else None
),
'repository_status': (response_data.get('repository') or {}).get('status') if isinstance(response_data.get('repository'), dict) else None,
'pull_request_url': (response_data.get('pull_request') or {}).get('pr_url') if isinstance(response_data.get('pull_request'), dict) else None,
'pull_request_state': (response_data.get('pull_request') or {}).get('pr_state') if isinstance(response_data.get('pull_request'), dict) else None,
'message': response_data.get('message'),
'logs': [log.get('message', '') for log in response_data.get('logs', []) if isinstance(log, dict)],
}
summary_message, summary_trace = await ChangeSummaryGenerator().summarize_with_trace(summary_context)
if orchestrator.db_manager and orchestrator.history and orchestrator.prompt_audit:
orchestrator.db_manager.log_llm_trace(
project_id=project_id,
history_id=orchestrator.history.id,
prompt_id=orchestrator.prompt_audit.id,
stage=summary_trace['stage'],
provider=summary_trace['provider'],
model=summary_trace['model'],
system_prompt=summary_trace['system_prompt'],
user_prompt=summary_trace['user_prompt'],
assistant_response=summary_trace['assistant_response'],
raw_response=summary_trace.get('raw_response'),
fallback_used=summary_trace.get('fallback_used', False),
)
response_data['summary_message'] = summary_message
response_data['pull_request'] = result.get('pull_request') or manager.get_open_pull_request(project_id=project_id)
return {'status': result['status'], 'data': response_data, 'summary_message': summary_message}
def _project_root(project_id: str) -> Path:
@@ -203,6 +273,22 @@ def _project_root(project_id: str) -> Path:
return database_module.settings.projects_root / project_id
def _create_gitea_api():
"""Create a configured Gitea client or raise an HTTP error if unavailable."""
if not database_module.settings.gitea_url or not database_module.settings.gitea_token:
raise HTTPException(status_code=400, detail='Gitea integration is not configured')
try:
from .agents.gitea import GiteaAPI
except ImportError:
from agents.gitea import GiteaAPI
return GiteaAPI(
token=database_module.settings.GITEA_TOKEN,
base_url=database_module.settings.GITEA_URL,
owner=database_module.settings.GITEA_OWNER,
repo=database_module.settings.GITEA_REPO or '',
)
def _resolve_n8n_api_url(explicit_url: str | None = None) -> str:
"""Resolve the effective n8n API URL from explicit input or settings."""
if explicit_url and explicit_url.strip():
@@ -234,8 +320,16 @@ def read_api_info():
'/audit/system/logs',
'/audit/prompts',
'/audit/changes',
'/audit/commit-context',
'/audit/timeline',
'/audit/llm-traces',
'/audit/pull-requests',
'/audit/lineage',
'/audit/correlations',
'/projects/{project_id}/prompts/{prompt_id}/undo',
'/projects/{project_id}/sync-repository',
'/gitea/repos',
'/gitea/repos/onboard',
'/n8n/health',
'/n8n/setup',
],
@@ -279,7 +373,7 @@ async def generate_software_from_text(request: FreeformSoftwareRequest, db: DbSe
},
}
interpreted = await RequestInterpreter().interpret(request.prompt_text)
interpreted, interpretation_trace = await RequestInterpreter().interpret_with_trace(request.prompt_text)
structured_request = SoftwareRequest(**interpreted)
response = await _run_generation(
structured_request,
@@ -287,7 +381,26 @@ async def generate_software_from_text(request: FreeformSoftwareRequest, db: DbSe
prompt_text=request.prompt_text,
prompt_actor=request.source,
)
project_data = response.get('data', {})
if project_data.get('history_id') is not None:
manager = DatabaseManager(db)
prompts = manager.get_prompt_events(project_id=project_data.get('project_id'))
prompt_id = prompts[0]['id'] if prompts else None
manager.log_llm_trace(
project_id=project_data.get('project_id'),
history_id=project_data.get('history_id'),
prompt_id=prompt_id,
stage=interpretation_trace['stage'],
provider=interpretation_trace['provider'],
model=interpretation_trace['model'],
system_prompt=interpretation_trace['system_prompt'],
user_prompt=interpretation_trace['user_prompt'],
assistant_response=interpretation_trace['assistant_response'],
raw_response=interpretation_trace.get('raw_response'),
fallback_used=interpretation_trace.get('fallback_used', False),
)
response['interpreted_request'] = interpreted
response['llm_trace'] = interpretation_trace
response['source'] = {
'type': request.source,
'chat_id': request.chat_id,
@@ -343,6 +456,54 @@ def get_code_change_audit(db: DbSession, project_id: str | None = Query(default=
return {'changes': [_serialize_audit_item(item) for item in manager.get_code_changes(project_id=project_id)]}
@app.get('/audit/commit-context')
def get_commit_context_audit(
db: DbSession,
commit_hash: str = Query(min_length=4),
project_id: str | None = Query(default=None),
branch_scope: str | None = Query(default=None, pattern='^(main|pr|manual)?$'),
):
"""Return the recorded context explaining how a commit came to be."""
manager = DatabaseManager(db)
context = manager.get_commit_context(commit_hash=commit_hash, project_id=project_id, branch_scope=branch_scope)
if context is None:
raise HTTPException(status_code=404, detail='Commit context not found')
return context
@app.get('/audit/timeline')
def get_project_timeline_audit(
db: DbSession,
project_id: str = Query(min_length=1),
branch_scope: str | None = Query(default=None, pattern='^(main|pr|manual)?$'),
):
"""Return the mixed audit timeline for one project."""
manager = DatabaseManager(db)
return {'timeline': manager.get_project_timeline(project_id=project_id, branch_scope=branch_scope)}
@app.get('/audit/llm-traces')
def get_llm_trace_audit(
db: DbSession,
project_id: str | None = Query(default=None),
prompt_id: int | None = Query(default=None),
stage: str | None = Query(default=None),
model: str | None = Query(default=None),
search: str | None = Query(default=None),
):
"""Return persisted LLM traces."""
manager = DatabaseManager(db)
return {
'llm_traces': manager.get_llm_traces(
project_id=project_id,
prompt_id=prompt_id,
stage=stage,
model=model,
search_query=search,
)
}
@app.get('/audit/lineage')
def get_prompt_change_lineage(db: DbSession, project_id: str | None = Query(default=None)):
"""Return explicit prompt-to-code lineage rows."""
@@ -357,6 +518,84 @@ def get_prompt_change_correlations(db: DbSession, project_id: str | None = Query
return {'correlations': manager.get_prompt_change_correlations(project_id=project_id)}
@app.get('/audit/pull-requests')
def get_pull_request_audit(db: DbSession, project_id: str | None = Query(default=None), open_only: bool = Query(default=False)):
"""Return tracked pull requests for generated projects."""
manager = DatabaseManager(db)
return {'pull_requests': manager.get_pull_requests(project_id=project_id, only_open=open_only)}
@app.post('/projects/{project_id}/prompts/{prompt_id}/undo')
async def undo_prompt_changes(project_id: str, prompt_id: int, db: DbSession):
"""Undo all changes associated with a specific prompt."""
result = await PromptWorkflowManager(db).undo_prompt(project_id=project_id, prompt_id=prompt_id)
if result.get('status') == 'error':
raise HTTPException(status_code=400, detail=result.get('message', 'Undo failed'))
return result
@app.post('/projects/{project_id}/sync-repository')
def sync_project_repository(project_id: str, db: DbSession, commit_limit: int = Query(default=25, ge=1, le=200)):
"""Import recent repository activity from Gitea for a tracked project."""
manager = DatabaseManager(db)
result = manager.sync_repository_activity(project_id=project_id, gitea_api=_create_gitea_api(), commit_limit=commit_limit)
if result.get('status') == 'error':
raise HTTPException(status_code=400, detail=result.get('message', 'Repository sync failed'))
return result
@app.get('/gitea/repos')
def list_gitea_repositories(db: DbSession, owner: str | None = Query(default=None)):
"""List repositories in the configured Gitea organization and whether they are already onboarded."""
gitea_api = _create_gitea_api()
resolved_owner = owner or database_module.settings.gitea_owner
repos = gitea_api.list_repositories_sync(owner=resolved_owner)
if isinstance(repos, dict) and repos.get('error'):
raise HTTPException(status_code=502, detail=repos.get('error'))
manager = DatabaseManager(db)
items = []
for repo in repos if isinstance(repos, list) else []:
tracked_project = manager.get_project_by_repository(resolved_owner, repo.get('name', ''))
items.append(
{
'name': repo.get('name'),
'full_name': repo.get('full_name') or f"{resolved_owner}/{repo.get('name')}",
'description': repo.get('description'),
'html_url': repo.get('html_url'),
'clone_url': repo.get('clone_url'),
'default_branch': repo.get('default_branch'),
'private': bool(repo.get('private', False)),
'onboarded': tracked_project is not None,
'project_id': tracked_project.project_id if tracked_project is not None else None,
}
)
return {'repositories': items}
@app.post('/gitea/repos/onboard')
async def onboard_gitea_repository(request: GiteaRepositoryOnboardRequest, db: DbSession):
"""Onboard a manually created Gitea repository into the factory dashboard."""
gitea_api = _create_gitea_api()
owner = request.owner or database_module.settings.gitea_owner
repo = await gitea_api.get_repo_info(owner=owner, repo=request.repo_name)
if isinstance(repo, dict) and repo.get('error'):
raise HTTPException(status_code=404, detail=repo.get('error'))
manager = DatabaseManager(db)
onboarded = manager.onboard_repository(owner=owner, repo_name=request.repo_name, repository_data=repo)
sync_result = None
if request.sync_commits:
sync_result = manager.sync_repository_activity(
project_id=onboarded['project_id'],
gitea_api=gitea_api,
commit_limit=request.commit_limit,
)
return {
'status': 'success',
'onboarded': onboarded,
'sync_result': sync_result,
}
@app.get('/audit/logs')
def get_audit_logs(db: DbSession):
"""Return all project logs ordered newest first."""