5 Commits
0.7.0 ... 0.8.0

Author SHA1 Message Date
798bb218f8 release: version 0.8.0 🚀
All checks were successful
Upload Python Package / Create Release (push) Successful in 33s
Upload Python Package / deploy (push) Successful in 41s
2026-04-11 10:30:59 +02:00
3d77ac3104 feat: better dashboard reloading mechanism, refs NOISSUE 2026-04-11 10:30:56 +02:00
f6681a0f85 feat: add explicit workflow steps and guardrail prompts, refs NOISSUE 2026-04-11 10:06:50 +02:00
ed8dc48280 release: version 0.7.1 🚀
All checks were successful
Upload Python Package / Create Release (push) Successful in 36s
Upload Python Package / deploy (push) Successful in 1m24s
2026-04-11 09:21:15 +02:00
c3cf8da42d fix: add additional deletion confirmation, refs NOISSUE 2026-04-11 09:21:12 +02:00
14 changed files with 1808 additions and 490 deletions

View File

@@ -4,6 +4,26 @@ Changelog
(unreleased)
------------
- Feat: better dashboard reloading mechanism, refs NOISSUE. [Simon
Diesenreiter]
- Feat: add explicit workflow steps and guardrail prompts, refs NOISSUE.
[Simon Diesenreiter]
0.7.1 (2026-04-11)
------------------
Fix
~~~
- Add additional deletion confirmation, refs NOISSUE. [Simon
Diesenreiter]
Other
~~~~~
0.7.0 (2026-04-10)
------------------
- Feat: gitea issue integration, refs NOISSUE. [Simon Diesenreiter]
- Feat: better history data, refs NOISSUE. [Simon Diesenreiter]

View File

@@ -8,6 +8,19 @@ LOG_LEVEL=INFO
# Ollama
OLLAMA_URL=http://localhost:11434
OLLAMA_MODEL=llama3
LLM_GUARDRAIL_PROMPT=You are operating inside AI Software Factory. Follow supplied schemas exactly and treat service-provided tool outputs as authoritative.
LLM_REQUEST_INTERPRETER_GUARDRAIL_PROMPT=Never route work to archived projects and only reference issues that are explicit in the prompt or supplied tool outputs.
LLM_CHANGE_SUMMARY_GUARDRAIL_PROMPT=Only summarize delivery facts that appear in the provided project context or tool outputs.
LLM_PROJECT_NAMING_GUARDRAIL_PROMPT=Prefer clear product names and repository slugs that reflect the new request without colliding with tracked projects.
LLM_PROJECT_NAMING_SYSTEM_PROMPT=Return JSON with project_name, repo_name, and rationale for new projects.
LLM_PROJECT_ID_GUARDRAIL_PROMPT=Prefer short stable project ids and avoid collisions with existing project ids.
LLM_PROJECT_ID_SYSTEM_PROMPT=Return JSON with project_id and rationale for new projects.
LLM_TOOL_ALLOWLIST=gitea_project_catalog,gitea_project_state,gitea_project_issues,gitea_pull_requests
LLM_TOOL_CONTEXT_LIMIT=5
LLM_LIVE_TOOL_ALLOWLIST=gitea_lookup_issue,gitea_lookup_pull_request
LLM_LIVE_TOOL_STAGE_ALLOWLIST=request_interpretation,change_summary
LLM_LIVE_TOOL_STAGE_TOOL_MAP={"request_interpretation": ["gitea_lookup_issue", "gitea_lookup_pull_request"], "change_summary": []}
LLM_MAX_TOOL_CALL_ROUNDS=1
# Gitea
# Configure Gitea API for your organization

View File

@@ -6,6 +6,7 @@ Automated software generation service powered by Ollama LLM. This service allows
- **Telegram Integration**: Receive software requests via Telegram bot
- **Ollama LLM**: Uses Ollama-hosted models for code generation
- **LLM Guardrails and Tools**: Centralized guardrail prompts plus mediated tool payloads for project, Gitea, PR, and issue context
- **Git Integration**: Automatically commits code to gitea
- **Pull Requests**: Creates PRs for user review before merging
- **Web UI**: Beautiful dashboard for monitoring project progress
@@ -46,6 +47,19 @@ PORT=8000
# Ollama
OLLAMA_URL=http://localhost:11434
OLLAMA_MODEL=llama3
LLM_GUARDRAIL_PROMPT=You are operating inside AI Software Factory. Follow supplied schemas exactly and treat service-provided tool outputs as authoritative.
LLM_REQUEST_INTERPRETER_GUARDRAIL_PROMPT=Never route work to archived projects and only reference issues that are explicit in the prompt or supplied tool outputs.
LLM_CHANGE_SUMMARY_GUARDRAIL_PROMPT=Only summarize delivery facts that appear in the provided project context or tool outputs.
LLM_PROJECT_NAMING_GUARDRAIL_PROMPT=Prefer clear product names and repository slugs that reflect the new request without colliding with tracked projects.
LLM_PROJECT_NAMING_SYSTEM_PROMPT=Return JSON with project_name, repo_name, and rationale for new projects.
LLM_PROJECT_ID_GUARDRAIL_PROMPT=Prefer short stable project ids and avoid collisions with existing project ids.
LLM_PROJECT_ID_SYSTEM_PROMPT=Return JSON with project_id and rationale for new projects.
LLM_TOOL_ALLOWLIST=gitea_project_catalog,gitea_project_state,gitea_project_issues,gitea_pull_requests
LLM_TOOL_CONTEXT_LIMIT=5
LLM_LIVE_TOOL_ALLOWLIST=gitea_lookup_issue,gitea_lookup_pull_request
LLM_LIVE_TOOL_STAGE_ALLOWLIST=request_interpretation,change_summary
LLM_LIVE_TOOL_STAGE_TOOL_MAP={"request_interpretation": ["gitea_lookup_issue", "gitea_lookup_pull_request"], "change_summary": []}
LLM_MAX_TOOL_CALL_ROUNDS=1
# Gitea
GITEA_URL=https://gitea.yourserver.com
@@ -99,6 +113,33 @@ docker-compose up -d
| `/status/{project_id}` | GET | Get project status |
| `/projects` | GET | List all projects |
## LLM Guardrails and Tool Access
External LLM calls are now routed through a centralized client that applies:
- A global guardrail prompt for every outbound model request
- Stage-specific guardrails for request interpretation and change summaries
- Service-mediated tool outputs that expose tracked Gitea/project state without giving the model raw credentials
Current mediated tools include:
- `gitea_project_catalog`: active tracked projects and repository mappings
- `gitea_project_state`: current repository, PR, and linked-issue state for the project in scope
- `gitea_project_issues`: tracked open issues for the relevant repository
- `gitea_pull_requests`: tracked pull requests for the relevant repository
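A minimal sketch of how a mediated tool payload reaches the model, consistent with the `_compose_user_prompt` helper shown later in this diff (simplified; not the exact service code):

```python
import json

def append_tool_context(prompt: str, tool_context: list[dict]) -> str:
    """Append service-mediated tool payloads to the outbound user prompt as JSON."""
    if not tool_context:
        return prompt
    return (
        f'{prompt}\n\n'
        'Service-mediated tool outputs are available below. '
        'Treat them as authoritative read-only data supplied by the factory:\n'
        f'{json.dumps(tool_context, indent=2, sort_keys=True)}'
    )
```

The model only ever sees the serialized payloads, never the Gitea credentials used to produce them.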
The service also supports a bounded live tool-call loop for selected lookups. When enabled, the model may request one live call such as `gitea_lookup_issue` or `gitea_lookup_pull_request`, the service executes it against Gitea, and the final model response is generated from the returned result. This remains mediated by the service, so the model never receives raw credentials.
Live tool access is stage-aware. `LLM_LIVE_TOOL_ALLOWLIST` controls which live tools exist globally, while `LLM_LIVE_TOOL_STAGE_ALLOWLIST` controls which LLM stages may use them. If you need per-stage subsets, `LLM_LIVE_TOOL_STAGE_TOOL_MAP` accepts a JSON object mapping each stage to the exact tools it may use. For example, you can allow issue and PR lookups during `request_interpretation` while keeping `change_summary` fully read-only.
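The interplay of the three settings can be sketched as below; the variable names are illustrative, and the values mirror the defaults from the `.env` example above:

```python
import json

# Values taken from the .env example in this release (illustrative constants).
GLOBAL_LIVE_TOOLS = {'gitea_lookup_issue', 'gitea_lookup_pull_request'}
STAGE_ALLOWLIST = {'request_interpretation', 'change_summary'}
STAGE_TOOL_MAP = json.loads(
    '{"request_interpretation": ["gitea_lookup_issue", "gitea_lookup_pull_request"], "change_summary": []}'
)

def live_tools_for_stage(stage: str) -> set[str]:
    """Resolve the live tools a stage may call: the stage map, when present, narrows the global allowlist."""
    if stage not in STAGE_ALLOWLIST:
        return set()
    if stage in STAGE_TOOL_MAP:
        return GLOBAL_LIVE_TOOLS & set(STAGE_TOOL_MAP[stage])
    return set(GLOBAL_LIVE_TOOLS)
```

With this configuration, `request_interpretation` may perform issue and PR lookups while `change_summary` stays fully read-only, exactly as the example in the text suggests.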
When the interpreter decides a prompt starts a new project, the service can run a dedicated `project_naming` LLM stage before generation. `LLM_PROJECT_NAMING_SYSTEM_PROMPT` and `LLM_PROJECT_NAMING_GUARDRAIL_PROMPT` let you steer how project titles and repository slugs are chosen. The interpreter checks tracked project repositories plus live Gitea repository names when available, so if the model suggests a colliding repo slug, the service automatically moves to the next available slug.

New project creation can also run a dedicated `project_id_naming` stage. `LLM_PROJECT_ID_SYSTEM_PROMPT` and `LLM_PROJECT_ID_GUARDRAIL_PROMPT` control how stable project ids are chosen, and the service will append deterministic numeric suffixes when an id is already taken instead of always falling back to a random UUID-based id.
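The deterministic suffixing behavior can be illustrated like this; the exact suffix format (`-2`, `-3`, ...) is an assumption for the sketch, not necessarily the service's literal scheme:

```python
def next_available_id(candidate: str, taken: set[str]) -> str:
    """Append a deterministic numeric suffix when the candidate id is already taken."""
    if candidate not in taken:
        return candidate
    suffix = 2  # start counting collisions at -2 (assumed convention)
    while f'{candidate}-{suffix}' in taken:
        suffix += 1
    return f'{candidate}-{suffix}'
```

Because the suffix is derived from the existing ids rather than generated randomly, re-running the stage with the same state yields the same id.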
Runtime visibility for the active guardrails, mediated tools, live tools, and model configuration is available at `/llm/runtime` and in the dashboard System tab.
These tool payloads are appended to the model prompt as authoritative JSON generated by the service, so the LLM can reason over live project and Gitea context while remaining constrained by the configured guardrails.
## Development
### Makefile Targets

View File

@@ -1 +1 @@
0.7.0
0.8.0

View File

@@ -4,8 +4,10 @@ from __future__ import annotations
try:
from ..config import settings
from .llm_service import LLMServiceClient
except ImportError:
from config import settings
from agents.llm_service import LLMServiceClient
class ChangeSummaryGenerator:
@@ -14,6 +16,7 @@ class ChangeSummaryGenerator:
def __init__(self, ollama_url: str | None = None, model: str | None = None):
self.ollama_url = (ollama_url or settings.ollama_url).rstrip('/')
self.model = model or settings.OLLAMA_MODEL
self.llm_client = LLMServiceClient(ollama_url=self.ollama_url, model=self.model)
async def summarize(self, context: dict) -> str:
"""Summarize project changes with Ollama, or fall back to a deterministic overview."""
@@ -28,40 +31,24 @@ class ChangeSummaryGenerator:
'Write 3 to 5 sentences. Mention the application goal, main delivered pieces, '
'technical direction, and what the user should expect next. Avoid markdown bullets.'
)
content, trace = await self.llm_client.chat_with_trace(
stage='change_summary',
system_prompt=system_prompt,
user_prompt=prompt,
tool_context_input={
'project_id': context.get('project_id'),
'project_name': context.get('name'),
'repository': context.get('repository'),
'repository_url': context.get('repository_url'),
'pull_request': context.get('pull_request'),
'pull_request_url': context.get('pull_request_url'),
'pull_request_state': context.get('pull_request_state'),
'related_issue': context.get('related_issue'),
'issues': [context.get('related_issue')] if context.get('related_issue') else [],
},
)
if content:
return content.strip(), trace
fallback = self._fallback(context)
return fallback, {
@@ -71,7 +58,9 @@ class ChangeSummaryGenerator:
'system_prompt': system_prompt,
'user_prompt': prompt,
'assistant_response': fallback,
'raw_response': {'fallback': 'deterministic'},
'raw_response': {'fallback': 'deterministic', 'llm_trace': trace.get('raw_response') if isinstance(trace, dict) else None},
'guardrails': trace.get('guardrails') if isinstance(trace, dict) else [],
'tool_context': trace.get('tool_context') if isinstance(trace, dict) else [],
'fallback_used': True,
}

View File

@@ -34,6 +34,7 @@ except ImportError:
from datetime import datetime
import json
import re
import shutil
class DatabaseMigrations:
@@ -87,6 +88,11 @@ class DatabaseManager:
self.db = db
self.migrations = DatabaseMigrations(self.db)
@staticmethod
def _is_archived_status(status: str | None) -> bool:
"""Return whether a project status represents an archived project."""
return (status or '').strip().lower() == 'archived'
@staticmethod
def _normalize_metadata(metadata: object) -> dict:
"""Normalize JSON-like metadata stored in audit columns."""
@@ -111,13 +117,15 @@ class DatabaseManager:
sanitized = sanitized.replace('--', '-')
return sanitized.strip('-') or 'external-project'
def get_project_by_repository(self, owner: str, repo_name: str) -> ProjectHistory | None:
def get_project_by_repository(self, owner: str, repo_name: str, include_archived: bool = False) -> ProjectHistory | None:
"""Return the project currently associated with a repository."""
normalized_owner = (owner or '').strip().lower()
normalized_repo = (repo_name or '').strip().lower()
if not normalized_owner or not normalized_repo:
return None
for history in self.db.query(ProjectHistory).order_by(ProjectHistory.updated_at.desc(), ProjectHistory.id.desc()).all():
if not include_archived and self._is_archived_status(history.status):
continue
repository = self._get_project_repository(history) or {}
if (repository.get('owner') or '').strip().lower() == normalized_owner and (repository.get('name') or '').strip().lower() == normalized_repo:
return history
@@ -736,12 +744,6 @@ class DatabaseManager:
self.db.commit()
return updates
def get_latest_project_by_name(self, project_name: str) -> ProjectHistory | None:
"""Return the most recently updated project with the requested name."""
return self.db.query(ProjectHistory).filter(
ProjectHistory.project_name == project_name
).order_by(ProjectHistory.updated_at.desc(), ProjectHistory.id.desc()).first()
def log_prompt_revert(
self,
project_id: str,
@@ -813,9 +815,14 @@ class DatabaseManager:
}
return None
def get_project_by_id(self, project_id: str) -> ProjectHistory | None:
def get_project_by_id(self, project_id: str, include_archived: bool = True) -> ProjectHistory | None:
"""Get project by ID."""
return self.db.query(ProjectHistory).filter(ProjectHistory.project_id == project_id).first()
history = self.db.query(ProjectHistory).filter(ProjectHistory.project_id == project_id).first()
if history is None:
return None
if not include_archived and self._is_archived_status(history.status):
return None
return history
def get_recent_chat_history(self, chat_id: str, source: str = 'telegram', limit: int = 12) -> list[dict]:
"""Return recent prompt events for one chat/source conversation."""
@@ -832,6 +839,9 @@ class DatabaseManager:
continue
if str(source_context.get('chat_id') or '') != str(chat_id):
continue
history = self.get_project_by_id(prompt.project_id)
if history is None or self._is_archived_status(history.status):
continue
result.append(
{
'prompt_id': prompt.id,
@@ -875,9 +885,96 @@ class DatabaseManager:
'projects': projects,
}
def get_all_projects(self) -> list[ProjectHistory]:
"""Get all projects."""
return self.db.query(ProjectHistory).all()
def get_all_projects(self, include_archived: bool = False, archived_only: bool = False) -> list[ProjectHistory]:
"""Get tracked projects with optional archive filtering."""
projects = self.db.query(ProjectHistory).order_by(ProjectHistory.updated_at.desc(), ProjectHistory.id.desc()).all()
if archived_only:
return [project for project in projects if self._is_archived_status(project.status)]
if include_archived:
return projects
return [project for project in projects if not self._is_archived_status(project.status)]
def get_latest_project_by_name(self, project_name: str, include_archived: bool = False) -> ProjectHistory | None:
"""Return the latest project matching a human-readable project name."""
if not project_name:
return None
query = self.db.query(ProjectHistory).filter(ProjectHistory.project_name == project_name).order_by(
ProjectHistory.updated_at.desc(), ProjectHistory.id.desc()
)
for history in query.all():
if include_archived or not self._is_archived_status(history.status):
return history
return None
def archive_project(self, project_id: str) -> dict:
"""Archive a project so it no longer participates in active automation."""
history = self.get_project_by_id(project_id)
if history is None:
return {'status': 'error', 'message': 'Project not found'}
if self._is_archived_status(history.status):
return {'status': 'success', 'message': 'Project already archived', 'project_id': project_id}
history.status = 'archived'
history.message = 'Project archived'
history.current_step = 'archived'
history.updated_at = datetime.utcnow()
self.db.commit()
self._log_audit_trail(
project_id=project_id,
action='PROJECT_ARCHIVED',
actor='user',
action_type='ARCHIVE',
details=f'Project {project_id} archived',
message='Project archived',
)
return {'status': 'success', 'message': 'Project archived', 'project_id': project_id}
def unarchive_project(self, project_id: str) -> dict:
"""Restore an archived project to the active automation set."""
history = self.get_project_by_id(project_id)
if history is None:
return {'status': 'error', 'message': 'Project not found'}
if not self._is_archived_status(history.status):
return {'status': 'success', 'message': 'Project is already active', 'project_id': project_id}
history.status = ProjectStatus.COMPLETED.value if history.completed_at else ProjectStatus.STARTED.value
history.message = 'Project restored from archive'
history.current_step = 'restored'
history.updated_at = datetime.utcnow()
self.db.commit()
self._log_audit_trail(
project_id=project_id,
action='PROJECT_UNARCHIVED',
actor='user',
action_type='RESTORE',
details=f'Project {project_id} restored from archive',
message='Project restored from archive',
)
return {'status': 'success', 'message': 'Project restored from archive', 'project_id': project_id}
def delete_project(self, project_id: str, delete_project_root: bool = True) -> dict:
"""Delete a project and all project-scoped traces from the database."""
history = self.get_project_by_id(project_id)
if history is None:
return {'status': 'error', 'message': 'Project not found'}
snapshot_data = self._get_latest_ui_snapshot_data(history.id)
project_root = snapshot_data.get('project_root') or str(settings.projects_root / project_id)
self.db.query(PromptCodeLink).filter(PromptCodeLink.history_id == history.id).delete()
self.db.query(PullRequest).filter(PullRequest.history_id == history.id).delete()
self.db.query(PullRequestData).filter(PullRequestData.history_id == history.id).delete()
self.db.query(UISnapshot).filter(UISnapshot.history_id == history.id).delete()
self.db.query(UserAction).filter(UserAction.history_id == history.id).delete()
self.db.query(ProjectLog).filter(ProjectLog.history_id == history.id).delete()
self.db.query(AuditTrail).filter(AuditTrail.project_id == project_id).delete()
self.db.delete(history)
self.db.commit()
if delete_project_root and project_root:
shutil.rmtree(project_root, ignore_errors=True)
return {
'status': 'success',
'message': 'Project deleted',
'project_id': project_id,
'project_root_deleted': bool(delete_project_root and project_root),
'project_root': project_root,
}
def get_project_logs(self, history_id: int, limit: int = 100) -> list[ProjectLog]:
"""Get project logs."""
@@ -1906,14 +2003,17 @@ class DatabaseManager:
)
except Exception:
pass
projects = self.db.query(ProjectHistory).order_by(ProjectHistory.updated_at.desc()).limit(limit).all()
active_projects = self.get_all_projects()
archived_projects = self.get_all_projects(archived_only=True)
projects = active_projects[:limit]
system_logs = self.db.query(SystemLog).order_by(SystemLog.created_at.desc()).limit(limit).all()
return {
"summary": {
"total_projects": self.db.query(ProjectHistory).count(),
"running_projects": self.db.query(ProjectHistory).filter(ProjectHistory.status == ProjectStatus.RUNNING.value).count(),
"completed_projects": self.db.query(ProjectHistory).filter(ProjectHistory.status == ProjectStatus.COMPLETED.value).count(),
"error_projects": self.db.query(ProjectHistory).filter(ProjectHistory.status == ProjectStatus.ERROR.value).count(),
"total_projects": len(active_projects),
"archived_projects": len(archived_projects),
"running_projects": len([project for project in active_projects if project.status == ProjectStatus.RUNNING.value]),
"completed_projects": len([project for project in active_projects if project.status == ProjectStatus.COMPLETED.value]),
"error_projects": len([project for project in active_projects if project.status == ProjectStatus.ERROR.value]),
"prompt_events": self.db.query(AuditTrail).filter(AuditTrail.action == "PROMPT_RECEIVED").count(),
"code_changes": self.db.query(AuditTrail).filter(AuditTrail.action == "CODE_CHANGE").count(),
"open_pull_requests": self.db.query(PullRequest).filter(PullRequest.pr_state == "open", PullRequest.merged.is_(False)).count(),
@@ -1921,6 +2021,7 @@ class DatabaseManager:
"issue_work_events": self.db.query(AuditTrail).filter(AuditTrail.action == "ISSUE_WORKED").count(),
},
"projects": [self.get_project_audit_data(project.project_id) for project in projects],
"archived_projects": [self.get_project_audit_data(project.project_id) for project in archived_projects[:limit]],
"system_logs": [
{
"id": log.id,

View File

@@ -1,6 +1,7 @@
"""Git manager for project operations."""
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
@@ -32,8 +33,18 @@ class GitManager:
resolved = (base_root / project_id).resolve()
self.project_dir = str(resolved)
def is_git_available(self) -> bool:
"""Return whether the git executable is available in the current environment."""
return shutil.which('git') is not None
def _ensure_git_available(self) -> None:
"""Raise a clear error when git is not installed in the runtime environment."""
if not self.is_git_available():
raise RuntimeError('git executable is not available in PATH')
def _run(self, args: list[str], env: dict | None = None, check: bool = True) -> subprocess.CompletedProcess:
"""Run a git command in the project directory."""
self._ensure_git_available()
return subprocess.run(
args,
check=check,

View File

@@ -156,6 +156,28 @@ class GiteaAPI:
result.setdefault("status", "created")
return result
async def delete_repo(self, owner: str | None = None, repo: str | None = None) -> dict:
"""Delete a repository from the configured organization/user."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo:
return {'error': 'Owner and repository name are required'}
result = await self._request('DELETE', f'repos/{_owner}/{_repo}')
if not result.get('error'):
result.setdefault('status', 'deleted')
return result
def delete_repo_sync(self, owner: str | None = None, repo: str | None = None) -> dict:
"""Synchronously delete a repository from the configured organization/user."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo:
return {'error': 'Owner and repository name are required'}
result = self._request_sync('DELETE', f'repos/{_owner}/{_repo}')
if not result.get('error'):
result.setdefault('status', 'deleted')
return result
async def get_current_user(self) -> dict:
"""Get the user associated with the configured token."""
return await self._request("GET", "user")

View File

@@ -0,0 +1,394 @@
"""Centralized LLM client with guardrails and mediated tool context."""
from __future__ import annotations
import json
try:
from .gitea import GiteaAPI
except ImportError:
from gitea import GiteaAPI
try:
from ..config import settings
except ImportError:
from config import settings
class LLMToolbox:
"""Build named tool payloads that can be shared with external LLM providers."""
SUPPORTED_LIVE_TOOL_STAGES = ('request_interpretation', 'change_summary', 'generation_plan', 'project_naming', 'project_id_naming')
def build_tool_context(self, stage: str, context: dict | None = None) -> list[dict]:
"""Return the mediated tool payloads allowed for this LLM request."""
context = context or {}
allowed = set(settings.llm_tool_allowlist)
limit = settings.llm_tool_context_limit
tool_context: list[dict] = []
if 'gitea_project_catalog' in allowed:
projects = context.get('projects') or []
if projects:
tool_context.append(
{
'name': 'gitea_project_catalog',
'description': 'Tracked active projects and their repository mappings inside the factory.',
'payload': projects[:limit],
}
)
if 'gitea_project_state' in allowed:
state_payload = {
'project_id': context.get('project_id'),
'project_name': context.get('project_name') or context.get('name'),
'repository': context.get('repository'),
'repository_url': context.get('repository_url'),
'pull_request': context.get('pull_request'),
'pull_request_url': context.get('pull_request_url'),
'pull_request_state': context.get('pull_request_state'),
'related_issue': context.get('related_issue'),
}
if any(value for value in state_payload.values()):
tool_context.append(
{
'name': 'gitea_project_state',
'description': 'Current repository and pull-request state for the project being discussed.',
'payload': state_payload,
}
)
if 'gitea_project_issues' in allowed:
issues = context.get('open_issues') or context.get('issues') or []
if issues:
tool_context.append(
{
'name': 'gitea_project_issues',
'description': 'Open tracked Gitea issues for the relevant project repository.',
'payload': issues[:limit],
}
)
if 'gitea_pull_requests' in allowed:
pull_requests = context.get('pull_requests') or []
if pull_requests:
tool_context.append(
{
'name': 'gitea_pull_requests',
'description': 'Tracked pull requests associated with the relevant project repository.',
'payload': pull_requests[:limit],
}
)
return tool_context
def build_live_tool_specs(self, stage: str, context: dict | None = None) -> list[dict]:
"""Return live tool-call specs that the model may request explicitly."""
_context = context or {}
specs = []
allowed = set(settings.llm_live_tools_for_stage(stage))
if 'gitea_lookup_issue' in allowed:
specs.append(
{
'name': 'gitea_lookup_issue',
'description': 'Fetch one live Gitea issue by issue number for a tracked repository.',
'arguments': {
'project_id': 'optional tracked project id',
'owner': 'optional repository owner override',
'repo': 'optional repository name override',
'issue_number': 'required integer issue number',
},
}
)
if 'gitea_lookup_pull_request' in allowed:
specs.append(
{
'name': 'gitea_lookup_pull_request',
'description': 'Fetch one live Gitea pull request by PR number for a tracked repository.',
'arguments': {
'project_id': 'optional tracked project id',
'owner': 'optional repository owner override',
'repo': 'optional repository name override',
'pr_number': 'required integer pull request number',
},
}
)
return specs
class LLMLiveToolExecutor:
"""Resolve bounded live tool requests on behalf of the model."""
def __init__(self):
self.gitea_api = None
if settings.gitea_url and settings.gitea_token:
self.gitea_api = GiteaAPI(
token=settings.GITEA_TOKEN,
base_url=settings.GITEA_URL,
owner=settings.GITEA_OWNER,
repo=settings.GITEA_REPO or '',
)
async def execute(self, tool_name: str, arguments: dict, context: dict | None = None) -> dict:
"""Execute one live tool request and normalize the result."""
if tool_name not in set(settings.llm_live_tool_allowlist):
return {'error': f'Tool {tool_name} is not enabled'}
if self.gitea_api is None:
return {'error': 'Gitea live tool execution is not configured'}
resolved = self._resolve_repository(arguments=arguments, context=context or {})
if resolved.get('error'):
return resolved
owner = resolved['owner']
repo = resolved['repo']
if tool_name == 'gitea_lookup_issue':
issue_number = arguments.get('issue_number')
if issue_number is None:
return {'error': 'issue_number is required'}
return await self.gitea_api.get_issue(issue_number=int(issue_number), owner=owner, repo=repo)
if tool_name == 'gitea_lookup_pull_request':
pr_number = arguments.get('pr_number')
if pr_number is None:
return {'error': 'pr_number is required'}
return await self.gitea_api.get_pull_request(pr_number=int(pr_number), owner=owner, repo=repo)
return {'error': f'Unsupported tool {tool_name}'}
def _resolve_repository(self, arguments: dict, context: dict) -> dict:
"""Resolve repository owner/name from explicit args or tracked project context."""
owner = arguments.get('owner')
repo = arguments.get('repo')
if owner and repo:
return {'owner': owner, 'repo': repo}
project_id = arguments.get('project_id')
if project_id:
for project in context.get('projects', []):
if project.get('project_id') == project_id:
repository = project.get('repository') or {}
if repository.get('owner') and repository.get('name'):
return {'owner': repository['owner'], 'repo': repository['name']}
state = context.get('repository') or {}
if context.get('project_id') == project_id and state.get('owner') and state.get('name'):
return {'owner': state['owner'], 'repo': state['name']}
repository = context.get('repository') or {}
if repository.get('owner') and repository.get('name'):
return {'owner': repository['owner'], 'repo': repository['name']}
return {'error': 'Could not resolve repository for tool request'}
class LLMServiceClient:
"""Call the configured LLM provider with consistent guardrails and tool payloads."""
def __init__(self, ollama_url: str | None = None, model: str | None = None):
self.ollama_url = (ollama_url or settings.ollama_url).rstrip('/')
self.model = model or settings.OLLAMA_MODEL
self.toolbox = LLMToolbox()
self.live_tool_executor = LLMLiveToolExecutor()
async def chat_with_trace(
self,
*,
stage: str,
system_prompt: str,
user_prompt: str,
tool_context_input: dict | None = None,
expect_json: bool = False,
) -> tuple[str | None, dict]:
"""Invoke the configured LLM and return both content and a structured trace."""
effective_system_prompt = self._compose_system_prompt(stage, system_prompt)
tool_context = self.toolbox.build_tool_context(stage=stage, context=tool_context_input)
live_tool_specs = self.toolbox.build_live_tool_specs(stage=stage, context=tool_context_input)
effective_user_prompt = self._compose_user_prompt(user_prompt, tool_context, live_tool_specs)
raw_responses: list[dict] = []
executed_tool_calls: list[dict] = []
current_user_prompt = effective_user_prompt
max_rounds = settings.llm_max_tool_call_rounds
for round_index in range(max_rounds + 1):
content, payload, error = await self._send_chat_request(
system_prompt=effective_system_prompt,
user_prompt=current_user_prompt,
expect_json=expect_json,
)
raw_responses.append(payload)
if content:
tool_request = self._extract_tool_request(content)
if tool_request and round_index < max_rounds:
tool_name = tool_request.get('name')
tool_arguments = tool_request.get('arguments') or {}
tool_result = await self.live_tool_executor.execute(tool_name, tool_arguments, tool_context_input)
executed_tool_calls.append(
{
'name': tool_name,
'arguments': tool_arguments,
'result': tool_result,
}
)
current_user_prompt = self._compose_follow_up_prompt(user_prompt, tool_context, live_tool_specs, executed_tool_calls)
continue
return content, {
'stage': stage,
'provider': 'ollama',
'model': self.model,
'system_prompt': effective_system_prompt,
'user_prompt': current_user_prompt,
'assistant_response': content,
'raw_response': {
'provider_response': raw_responses[-1],
'provider_responses': raw_responses,
'tool_context': tool_context,
'live_tool_specs': live_tool_specs,
'executed_tool_calls': executed_tool_calls,
},
'raw_responses': raw_responses,
'fallback_used': False,
'guardrails': self._guardrail_sections(stage),
'tool_context': tool_context,
'live_tool_specs': live_tool_specs,
'executed_tool_calls': executed_tool_calls,
}
if error:
break
return None, {
'stage': stage,
'provider': 'ollama',
'model': self.model,
'system_prompt': effective_system_prompt,
'user_prompt': current_user_prompt,
'assistant_response': '',
'raw_response': {
'provider_response': raw_responses[-1] if raw_responses else {'error': 'No response'},
'provider_responses': raw_responses,
'tool_context': tool_context,
'live_tool_specs': live_tool_specs,
'executed_tool_calls': executed_tool_calls,
},
'raw_responses': raw_responses,
'fallback_used': True,
'guardrails': self._guardrail_sections(stage),
'tool_context': tool_context,
'live_tool_specs': live_tool_specs,
'executed_tool_calls': executed_tool_calls,
}
async def _send_chat_request(self, *, system_prompt: str, user_prompt: str, expect_json: bool) -> tuple[str | None, dict, str | None]:
"""Send one outbound chat request to the configured model provider."""
request_payload = {
'model': self.model,
'stream': False,
'messages': [
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': user_prompt},
],
}
if expect_json:
request_payload['format'] = 'json'
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(f'{self.ollama_url}/api/chat', json=request_payload) as resp:
payload = await resp.json()
if 200 <= resp.status < 300:
return (payload.get('message') or {}).get('content', ''), payload, None
return None, payload, str(payload.get('error') or payload)
except Exception as exc:
return None, {'error': str(exc)}, str(exc)
def _compose_system_prompt(self, stage: str, stage_prompt: str) -> str:
"""Merge the stage prompt with configured guardrails."""
sections = [stage_prompt.strip()] + self._guardrail_sections(stage)
return '\n\n'.join(section for section in sections if section)
def _guardrail_sections(self, stage: str) -> list[str]:
"""Return all configured guardrail sections for one LLM stage."""
sections = []
if settings.llm_guardrail_prompt:
sections.append(f'Global guardrails:\n{settings.llm_guardrail_prompt}')
stage_specific = {
'request_interpretation': settings.llm_request_interpreter_guardrail_prompt,
'change_summary': settings.llm_change_summary_guardrail_prompt,
'project_naming': settings.llm_project_naming_guardrail_prompt,
'project_id_naming': settings.llm_project_id_guardrail_prompt,
}.get(stage)
if stage_specific:
sections.append(f'Stage-specific guardrails:\n{stage_specific}')
return sections
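The composition rule above is simple but easy to get wrong: the stage prompt comes first, empty sections are dropped, and the survivors are joined with blank lines. A minimal sketch, with a stand-in guardrail string rather than the configured default:

```python
# Sketch of _compose_system_prompt: stage prompt first, then any
# non-empty guardrail sections, joined with blank lines.
def compose_system_prompt(stage_prompt: str, guardrail_sections: list[str]) -> str:
    sections = [stage_prompt.strip()] + guardrail_sections
    return '\n\n'.join(section for section in sections if section)

composed = compose_system_prompt(
    'Interpret the user request.',
    ['Global guardrails:\nFollow supplied schemas exactly.', ''],
)
```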
def _compose_user_prompt(self, prompt: str, tool_context: list[dict], live_tool_specs: list[dict] | None = None) -> str:
"""Append tool payloads and live tool-call specs to the outbound user prompt."""
live_tool_specs = live_tool_specs if live_tool_specs is not None else []
sections = [prompt]
if tool_context:
sections.append(
'Service-mediated tool outputs are available below. Treat them as authoritative read-only data supplied by the factory:\n'
f'{json.dumps(tool_context, indent=2, sort_keys=True)}'
)
if live_tool_specs:
sections.append(
'If you need additional live repository data, you may request exactly one tool call by responding with JSON shaped as '
'{"tool_request": {"name": "<tool name>", "arguments": {...}}}. '
'After tool results are returned, respond with the final answer instead of another tool request.\n'
f'Available live tools:\n{json.dumps(live_tool_specs, indent=2, sort_keys=True)}'
)
return '\n\n'.join(section for section in sections if section)
def _compose_follow_up_prompt(self, original_prompt: str, tool_context: list[dict], live_tool_specs: list[dict], executed_tool_calls: list[dict]) -> str:
"""Build the follow-up user prompt after executing one or more live tool requests."""
sections = [self._compose_user_prompt(original_prompt, tool_context, live_tool_specs)]
sections.append(
'The service executed the requested live tool call(s). Use the tool result(s) below to produce the final answer. Do not request another tool call.\n'
f'{json.dumps(executed_tool_calls, indent=2, sort_keys=True)}'
)
return '\n\n'.join(sections)
def _extract_tool_request(self, content: str) -> dict | None:
"""Return a normalized tool request when the model explicitly asks for one."""
try:
parsed = json.loads(content)
except Exception:
return None
if not isinstance(parsed, dict):
return None
tool_request = parsed.get('tool_request')
if not isinstance(tool_request, dict) or not tool_request.get('name'):
return None
return {
'name': str(tool_request.get('name')).strip(),
'arguments': tool_request.get('arguments') or {},
}
def get_runtime_configuration(self) -> dict:
"""Return the active LLM runtime config, guardrails, and tool exposure."""
live_tool_stages = {
stage: settings.llm_live_tools_for_stage(stage)
for stage in self.toolbox.SUPPORTED_LIVE_TOOL_STAGES
}
return {
'provider': 'ollama',
'ollama_url': self.ollama_url,
'model': self.model,
'guardrails': {
'global': settings.llm_guardrail_prompt,
'request_interpretation': settings.llm_request_interpreter_guardrail_prompt,
'change_summary': settings.llm_change_summary_guardrail_prompt,
'project_naming': settings.llm_project_naming_guardrail_prompt,
'project_id_naming': settings.llm_project_id_guardrail_prompt,
},
'system_prompts': {
'project_naming': settings.llm_project_naming_system_prompt,
'project_id_naming': settings.llm_project_id_system_prompt,
},
'mediated_tools': settings.llm_tool_allowlist,
'live_tools': settings.llm_live_tool_allowlist,
'live_tool_stage_allowlist': settings.llm_live_tool_stage_allowlist,
'live_tool_stage_tool_map': settings.llm_live_tool_stage_tool_map,
'live_tools_by_stage': live_tool_stages,
'tool_context_limit': settings.llm_tool_context_limit,
'max_tool_call_rounds': settings.llm_max_tool_call_rounds,
'gitea_live_tools_configured': bool(settings.gitea_url and settings.gitea_token),
}

View File

@@ -39,6 +39,7 @@ class AgentOrchestrator:
existing_history=None,
prompt_source_context: dict | None = None,
prompt_routing: dict | None = None,
repo_name_override: str | None = None,
related_issue_hint: dict | None = None,
):
"""Initialize orchestrator."""
@@ -58,6 +59,7 @@ class AgentOrchestrator:
self.prompt_actor = prompt_actor
self.prompt_source_context = prompt_source_context or {}
self.prompt_routing = prompt_routing or {}
self.repo_name_override = repo_name_override
self.existing_history = existing_history
self.changed_files: list[str] = []
self.gitea_api = GiteaAPI(
@@ -68,7 +70,7 @@ class AgentOrchestrator:
)
self.project_root = settings.projects_root / project_id
self.prompt_audit = None
self.repo_name = settings.gitea_repo or self.gitea_api.build_project_repo_name(project_id, project_name)
self.repo_name = settings.gitea_repo or self.gitea_api.build_project_repo_name(project_id, repo_name_override or project_name)
self.repo_owner = settings.gitea_owner
self.repo_url = None
self.branch_name = self._build_pr_branch_name(project_id)
@@ -322,6 +324,10 @@ class AgentOrchestrator:
async def _prepare_git_workspace(self) -> None:
"""Initialize the local repo and ensure the PR branch exists before writing files."""
if not self.git_manager.is_git_available():
self.ui_manager.ui_data.setdefault('git', {})['error'] = 'git executable is not available in PATH'
self._append_log('Local git workspace skipped: git executable is not available in PATH')
return
if not self.git_manager.has_repo():
self.git_manager.init_repo()
@@ -606,6 +612,10 @@ class AgentOrchestrator:
unique_files = list(dict.fromkeys(self.changed_files))
if not unique_files:
return
if not self.git_manager.is_git_available():
self.ui_manager.ui_data.setdefault('git', {})['error'] = 'git executable is not available in PATH'
self._append_log('Git commit skipped: git executable is not available in PATH')
return
try:
if not self.git_manager.has_repo():
@@ -668,7 +678,7 @@ class AgentOrchestrator:
commit_hash=commit_hash,
commit_url=remote_record.get('commit_url') if remote_record else None,
)
except (subprocess.CalledProcessError, FileNotFoundError) as exc:
except (RuntimeError, subprocess.CalledProcessError, FileNotFoundError) as exc:
self.ui_manager.ui_data.setdefault("git", {})["error"] = str(exc)
self._append_log(f"Git commit skipped: {exc}")

View File

@@ -7,8 +7,12 @@ import re
try:
from ..config import settings
from .gitea import GiteaAPI
from .llm_service import LLMServiceClient
except ImportError:
from config import settings
from agents.gitea import GiteaAPI
from agents.llm_service import LLMServiceClient
class RequestInterpreter:
@@ -17,6 +21,15 @@ class RequestInterpreter:
def __init__(self, ollama_url: str | None = None, model: str | None = None):
self.ollama_url = (ollama_url or settings.ollama_url).rstrip('/')
self.model = model or settings.OLLAMA_MODEL
self.llm_client = LLMServiceClient(ollama_url=self.ollama_url, model=self.model)
self.gitea_api = None
if settings.gitea_url and settings.gitea_token:
self.gitea_api = GiteaAPI(
token=settings.GITEA_TOKEN,
base_url=settings.GITEA_URL,
owner=settings.GITEA_OWNER,
repo=settings.GITEA_REPO or '',
)
async def interpret(self, prompt_text: str, context: dict | None = None) -> dict:
"""Interpret free-form text into the request shape expected by the orchestrator."""
@@ -49,48 +62,46 @@ class RequestInterpreter:
f"User prompt:\n{normalized}"
)
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(
f'{self.ollama_url}/api/chat',
json={
'model': self.model,
'stream': False,
'format': 'json',
'messages': [
{
'role': 'system',
'content': system_prompt,
},
{'role': 'user', 'content': user_prompt},
content, trace = await self.llm_client.chat_with_trace(
stage='request_interpretation',
system_prompt=system_prompt,
user_prompt=user_prompt,
tool_context_input={
'projects': compact_context.get('projects', []),
'open_issues': [
issue
for project in compact_context.get('projects', [])
for issue in project.get('open_issues', [])
],
'recent_chat_history': compact_context.get('recent_chat_history', []),
},
) as resp:
payload = await resp.json()
if 200 <= resp.status < 300:
content = payload.get('message', {}).get('content', '')
expect_json=True,
)
if content:
try:
parsed = json.loads(content)
interpreted = self._normalize_interpreted_request(parsed, normalized)
routing = self._normalize_routing(parsed.get('routing'), interpreted, compact_context)
return interpreted, {
'stage': 'request_interpretation',
'provider': 'ollama',
'model': self.model,
'system_prompt': system_prompt,
'user_prompt': user_prompt,
'assistant_response': content,
'raw_response': payload,
'routing': routing,
'context_excerpt': compact_context,
'fallback_used': False,
}
naming_trace = None
if routing.get('intent') == 'new_project':
interpreted, routing, naming_trace = await self._refine_new_project_identity(
prompt_text=normalized,
interpreted=interpreted,
routing=routing,
context=compact_context,
)
trace['routing'] = routing
trace['context_excerpt'] = compact_context
if naming_trace is not None:
trace['project_naming'] = naming_trace
return interpreted, trace
except Exception:
pass
interpreted, routing = self._heuristic_fallback(normalized, compact_context)
if routing.get('intent') == 'new_project':
constraints = await self._collect_project_identity_constraints(compact_context)
routing['repo_name'] = self._ensure_unique_repo_name(routing.get('repo_name') or interpreted.get('name') or 'project', constraints['repo_names'])
return interpreted, {
'stage': 'request_interpretation',
'provider': 'heuristic',
@@ -98,12 +109,87 @@ class RequestInterpreter:
'system_prompt': system_prompt,
'user_prompt': user_prompt,
'assistant_response': json.dumps({'request': interpreted, 'routing': routing}),
'raw_response': {'fallback': 'heuristic'},
'raw_response': {'fallback': 'heuristic', 'llm_trace': trace.get('raw_response') if isinstance(trace, dict) else None},
'routing': routing,
'context_excerpt': compact_context,
'guardrails': trace.get('guardrails') if isinstance(trace, dict) else [],
'tool_context': trace.get('tool_context') if isinstance(trace, dict) else [],
'fallback_used': True,
}
async def _refine_new_project_identity(
self,
*,
prompt_text: str,
interpreted: dict,
routing: dict,
context: dict,
) -> tuple[dict, dict, dict | None]:
"""Refine project and repository naming for genuinely new work."""
constraints = await self._collect_project_identity_constraints(context)
user_prompt = (
f"Original user prompt:\n{prompt_text}\n\n"
f"Draft structured request:\n{json.dumps(interpreted, indent=2)}\n\n"
f"Tracked project names to avoid reusing unless the user clearly wants them:\n{json.dumps(sorted(constraints['project_names']))}\n\n"
f"Repository slugs already reserved in tracked projects or Gitea:\n{json.dumps(sorted(constraints['repo_names']))}\n\n"
"Suggest the best project display name and repository slug for this new project."
)
content, trace = await self.llm_client.chat_with_trace(
stage='project_naming',
system_prompt=settings.llm_project_naming_system_prompt,
user_prompt=user_prompt,
tool_context_input={
'projects': context.get('projects', []),
},
expect_json=True,
)
if content:
try:
parsed = json.loads(content)
project_name, repo_name = self._normalize_project_identity(
parsed,
fallback_name=interpreted.get('name') or self._derive_name(prompt_text),
)
repo_name = self._ensure_unique_repo_name(repo_name, constraints['repo_names'])
interpreted['name'] = project_name
routing['project_name'] = project_name
routing['repo_name'] = repo_name
return interpreted, routing, trace
except Exception:
pass
fallback_name = interpreted.get('name') or self._derive_name(prompt_text)
routing['project_name'] = fallback_name
routing['repo_name'] = self._ensure_unique_repo_name(self._derive_repo_name(fallback_name), constraints['repo_names'])
return interpreted, routing, trace
async def _collect_project_identity_constraints(self, context: dict) -> dict[str, set[str]]:
"""Collect reserved project names and repository slugs from tracked state and Gitea."""
project_names: set[str] = set()
repo_names: set[str] = set()
for project in context.get('projects', []):
if project.get('name'):
project_names.add(str(project.get('name')).strip())
repository = project.get('repository') or {}
if repository.get('name'):
repo_names.add(str(repository.get('name')).strip())
repo_names.update(await self._load_remote_repo_names())
return {
'project_names': project_names,
'repo_names': repo_names,
}
async def _load_remote_repo_names(self) -> set[str]:
"""Load current Gitea repository names when live credentials are available."""
if settings.gitea_repo:
return {settings.gitea_repo}
if self.gitea_api is None or not settings.gitea_owner:
return set()
repos = await self.gitea_api.list_repositories(owner=settings.gitea_owner)
if not isinstance(repos, list):
return set()
return {str(repo.get('name')).strip() for repo in repos if repo.get('name')}
def _normalize_interpreted_request(self, interpreted: dict, original_prompt: str) -> dict:
"""Normalize LLM output into the required request shape."""
request_payload = interpreted.get('request') if isinstance(interpreted.get('request'), dict) else interpreted
@@ -164,14 +250,18 @@ class RequestInterpreter:
matched_project = project
break
intent = str(routing.get('intent') or '').strip() or ('continue_project' if matched_project else 'new_project')
return {
normalized = {
'intent': intent,
'project_id': matched_project.get('project_id') if matched_project else project_id,
'project_name': matched_project.get('name') if matched_project else (project_name or interpreted.get('name')),
'repo_name': routing.get('repo_name') if intent == 'new_project' else None,
'issue_number': issue_number,
'confidence': routing.get('confidence') or ('medium' if matched_project else 'low'),
'reasoning_summary': routing.get('reasoning_summary') or ('Matched prior project context' if matched_project else 'No strong prior project match found'),
}
if normalized['intent'] == 'new_project' and not normalized['repo_name']:
normalized['repo_name'] = self._derive_repo_name(normalized['project_name'] or interpreted.get('name') or 'Generated Project')
return normalized
def _normalize_list(self, value) -> list[str]:
if isinstance(value, list):
@@ -183,10 +273,65 @@ class RequestInterpreter:
def _derive_name(self, prompt_text: str) -> str:
"""Derive a stable project name when the LLM does not provide one."""
first_line = prompt_text.splitlines()[0].strip()
quoted = re.search(r'["\']([^"\']{3,80})["\']', first_line)
if quoted:
return self._humanize_name(quoted.group(1))
noun_phrase = re.search(
r'(?:build|create|start|make|develop|generate|design|need|want)\s+'
r'(?:me\s+|us\s+|an?\s+|the\s+|new\s+|internal\s+|simple\s+|lightweight\s+|modern\s+|web\s+|mobile\s+)*'
r'([a-z0-9][a-z0-9\s-]{2,80}?(?:portal|dashboard|app|application|service|tool|system|platform|api|bot|assistant|website|site|workspace|tracker|manager))\b',
first_line,
flags=re.IGNORECASE,
)
if noun_phrase:
return self._humanize_name(noun_phrase.group(1))
cleaned = re.sub(r'[^A-Za-z0-9 ]+', ' ', first_line)
words = [word.capitalize() for word in cleaned.split()[:4]]
stopwords = {
'build', 'create', 'start', 'make', 'develop', 'generate', 'design', 'need', 'want', 'please', 'for', 'our', 'with', 'that', 'this',
'new', 'internal', 'simple', 'modern', 'web', 'mobile', 'app', 'application', 'tool', 'system',
}
tokens = [word for word in cleaned.split() if word and word.lower() not in stopwords]
if tokens:
return self._humanize_name(' '.join(tokens[:4]))
return 'Generated Project'
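A trimmed sketch of the derivation above, keeping only the quoted-title and verb-phrase branches (the stopword fallback is omitted, and `str.title()` stands in for the `_humanize_name` helper):

```python
import re

# Trimmed sketch of _derive_name: prefer a quoted title, then a
# verb-led noun phrase ending in a product-type word.
def derive_name(prompt_text: str) -> str:
    first_line = prompt_text.splitlines()[0].strip()
    quoted = re.search(r'["\']([^"\']{3,80})["\']', first_line)
    if quoted:
        return quoted.group(1).title()
    noun_phrase = re.search(
        r'(?:build|create|make)\s+(?:me\s+|an?\s+|the\s+|new\s+)*'
        r'([a-z0-9][a-z0-9\s-]{2,80}?(?:portal|dashboard|app|tool|tracker))\b',
        first_line,
        flags=re.IGNORECASE,
    )
    if noun_phrase:
        return noun_phrase.group(1).title()
    return 'Generated Project'

assert derive_name('Please build me a new expense tracker for the team') == 'Expense Tracker'
```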
def _humanize_name(self, raw_name: str) -> str:
"""Normalize a candidate project name into a readable title."""
cleaned = re.sub(r'[^A-Za-z0-9\s-]+', ' ', raw_name).strip(' -')
cleaned = re.sub(r'\s+', ' ', cleaned)
special_upper = {'api', 'crm', 'erp', 'cms', 'hr', 'it', 'ui', 'qa'}
words = []
for word in cleaned.split()[:6]:
lowered = word.lower()
words.append(lowered.upper() if lowered in special_upper else lowered.capitalize())
return ' '.join(words) or 'Generated Project'
def _derive_repo_name(self, project_name: str) -> str:
"""Derive a repository slug from a human-readable project name."""
preferred = (project_name or 'project').strip().lower().replace(' ', '-')
sanitized = ''.join(ch if ch.isalnum() or ch in {'-', '_'} else '-' for ch in preferred)
while '--' in sanitized:
sanitized = sanitized.replace('--', '-')
return sanitized.strip('-') or 'project'
def _ensure_unique_repo_name(self, repo_name: str, reserved_names: set[str]) -> str:
"""Choose a repository slug that does not collide with tracked or remote repositories."""
base_name = self._derive_repo_name(repo_name)
if base_name not in reserved_names:
return base_name
suffix = 2
while f'{base_name}-{suffix}' in reserved_names:
suffix += 1
return f'{base_name}-{suffix}'
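The two helpers above chain together: slugify the display name, then suffix it until it no longer collides with a reserved repository name. A combined sketch:

```python
# Sketch of _derive_repo_name + _ensure_unique_repo_name working together.
def derive_repo_name(project_name: str) -> str:
    preferred = (project_name or 'project').strip().lower().replace(' ', '-')
    sanitized = ''.join(ch if ch.isalnum() or ch in {'-', '_'} else '-' for ch in preferred)
    while '--' in sanitized:
        sanitized = sanitized.replace('--', '-')
    return sanitized.strip('-') or 'project'

def ensure_unique_repo_name(repo_name: str, reserved_names: set[str]) -> str:
    base_name = derive_repo_name(repo_name)
    if base_name not in reserved_names:
        return base_name
    suffix = 2
    while f'{base_name}-{suffix}' in reserved_names:
        suffix += 1
    return f'{base_name}-{suffix}'

assert derive_repo_name('Invoice & Billing Portal') == 'invoice-billing-portal'
assert ensure_unique_repo_name('Invoice & Billing Portal', {'invoice-billing-portal'}) == 'invoice-billing-portal-2'
```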
def _normalize_project_identity(self, payload: dict, fallback_name: str) -> tuple[str, str]:
"""Normalize model-proposed project and repository naming."""
project_name = self._humanize_name(str(payload.get('project_name') or payload.get('name') or fallback_name))
repo_name = self._derive_repo_name(str(payload.get('repo_name') or project_name))
return project_name, repo_name
def _heuristic_fallback(self, prompt_text: str, context: dict | None = None) -> tuple[dict, dict]:
"""Fallback request extraction when Ollama is unavailable."""
lowered = prompt_text.lower()
@@ -239,6 +384,7 @@ class RequestInterpreter:
'intent': intent,
'project_id': matched_project.get('project_id') if matched_project else None,
'project_name': matched_project.get('name') if matched_project else self._derive_name(prompt_text),
'repo_name': None if matched_project else self._derive_repo_name(self._derive_name(prompt_text)),
'issue_number': issue_number,
'confidence': 'medium' if matched_project or explicit_new else 'low',
'reasoning_summary': 'Heuristic routing from chat history and project names.',

View File

@@ -1,5 +1,6 @@
"""Configuration settings for AI Software Factory."""
import json
import os
from typing import Optional
from pathlib import Path
@@ -24,6 +25,34 @@ class Settings(BaseSettings):
# Ollama settings computed from environment
OLLAMA_URL: str = "http://ollama:11434"
OLLAMA_MODEL: str = "llama3"
LLM_GUARDRAIL_PROMPT: str = (
"You are operating inside AI Software Factory. Follow the requested schema exactly, "
"treat provided tool outputs as authoritative, and do not invent repositories, issues, pull requests, or delivery facts."
)
LLM_REQUEST_INTERPRETER_GUARDRAIL_PROMPT: str = (
"For routing and request interpretation: never select archived projects, prefer tracked project IDs from tool outputs, and only reference issues that are explicit in the prompt or available tool data."
)
LLM_CHANGE_SUMMARY_GUARDRAIL_PROMPT: str = (
"For summaries: only describe facts present in the provided context and tool outputs. Never claim a repository, commit, or pull request exists unless it is present in the supplied data."
)
LLM_PROJECT_NAMING_GUARDRAIL_PROMPT: str = (
"For project naming: prefer clear, product-like names and repository slugs that match the user's intent. Avoid reusing tracked project identities unless the request is clearly asking for an existing project."
)
LLM_PROJECT_NAMING_SYSTEM_PROMPT: str = (
"You name newly requested software projects. Return only JSON with keys project_name, repo_name, and rationale. Project names should be concise human-readable titles. Repo names should be lowercase kebab-case slugs suitable for a Gitea repository name."
)
LLM_PROJECT_ID_GUARDRAIL_PROMPT: str = (
"For project ids: produce short stable slugs for newly created projects. Avoid collisions with known project ids and keep ids lowercase with hyphens."
)
LLM_PROJECT_ID_SYSTEM_PROMPT: str = (
"You derive stable project ids for new projects. Return only JSON with keys project_id and rationale. project_id must be a short lowercase kebab-case slug without spaces."
)
LLM_TOOL_ALLOWLIST: str = "gitea_project_catalog,gitea_project_state,gitea_project_issues,gitea_pull_requests"
LLM_TOOL_CONTEXT_LIMIT: int = 5
LLM_LIVE_TOOL_ALLOWLIST: str = "gitea_lookup_issue,gitea_lookup_pull_request"
LLM_LIVE_TOOL_STAGE_ALLOWLIST: str = "request_interpretation,change_summary"
LLM_LIVE_TOOL_STAGE_TOOL_MAP: str = ""
LLM_MAX_TOOL_CALL_ROUNDS: int = 1
# Gitea settings
GITEA_URL: str = "https://gitea.yourserver.com"
@@ -131,6 +160,97 @@ class Settings(BaseSettings):
"""Get Ollama URL with trimmed whitespace."""
return self.OLLAMA_URL.strip()
@property
def llm_guardrail_prompt(self) -> str:
"""Get the global guardrail prompt used for all external LLM calls."""
return self.LLM_GUARDRAIL_PROMPT.strip()
@property
def llm_request_interpreter_guardrail_prompt(self) -> str:
"""Get the request-interpretation specific guardrail prompt."""
return self.LLM_REQUEST_INTERPRETER_GUARDRAIL_PROMPT.strip()
@property
def llm_change_summary_guardrail_prompt(self) -> str:
"""Get the change-summary specific guardrail prompt."""
return self.LLM_CHANGE_SUMMARY_GUARDRAIL_PROMPT.strip()
@property
def llm_project_naming_guardrail_prompt(self) -> str:
"""Get the project-naming specific guardrail prompt."""
return self.LLM_PROJECT_NAMING_GUARDRAIL_PROMPT.strip()
@property
def llm_project_naming_system_prompt(self) -> str:
"""Get the project-naming system prompt."""
return self.LLM_PROJECT_NAMING_SYSTEM_PROMPT.strip()
@property
def llm_project_id_guardrail_prompt(self) -> str:
"""Get the project-id naming specific guardrail prompt."""
return self.LLM_PROJECT_ID_GUARDRAIL_PROMPT.strip()
@property
def llm_project_id_system_prompt(self) -> str:
"""Get the project-id naming system prompt."""
return self.LLM_PROJECT_ID_SYSTEM_PROMPT.strip()
@property
def llm_tool_allowlist(self) -> list[str]:
"""Get the allowed LLM tool names as a normalized list."""
return [item.strip() for item in self.LLM_TOOL_ALLOWLIST.split(',') if item.strip()]
@property
def llm_tool_context_limit(self) -> int:
"""Get the number of items to expose per mediated tool payload."""
return max(int(self.LLM_TOOL_CONTEXT_LIMIT), 1)
@property
def llm_live_tool_allowlist(self) -> list[str]:
"""Get the allowed live tool-call names for model-driven lookup requests."""
return [item.strip() for item in self.LLM_LIVE_TOOL_ALLOWLIST.split(',') if item.strip()]
@property
def llm_live_tool_stage_allowlist(self) -> list[str]:
"""Get the LLM stages where live tool requests are enabled."""
return [item.strip() for item in self.LLM_LIVE_TOOL_STAGE_ALLOWLIST.split(',') if item.strip()]
@property
def llm_live_tool_stage_tool_map(self) -> dict[str, list[str]]:
"""Get an optional per-stage live tool map that overrides the simple stage allowlist."""
raw = (self.LLM_LIVE_TOOL_STAGE_TOOL_MAP or '').strip()
if not raw:
return {}
try:
parsed = json.loads(raw)
except Exception:
return {}
if not isinstance(parsed, dict):
return {}
allowed_tools = set(self.llm_live_tool_allowlist)
normalized: dict[str, list[str]] = {}
for stage, tools in parsed.items():
if not isinstance(stage, str):
continue
if not isinstance(tools, list):
continue
normalized[stage.strip()] = [str(tool).strip() for tool in tools if str(tool).strip() in allowed_tools]
return normalized
def llm_live_tools_for_stage(self, stage: str) -> list[str]:
"""Return live tools enabled for a specific LLM stage."""
stage_map = self.llm_live_tool_stage_tool_map
if stage_map:
return stage_map.get(stage, [])
if stage not in set(self.llm_live_tool_stage_allowlist):
return []
return self.llm_live_tool_allowlist
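The resolution order above is worth pinning down: an explicit per-stage map wins outright; otherwise the stage allowlist gates the flat live-tool allowlist. A sketch using the default stage and tool names shown in the settings:

```python
# Sketch of llm_live_tools_for_stage's resolution order: a non-empty
# stage map overrides everything; otherwise the stage allowlist gates
# the flat live-tool allowlist.
def live_tools_for_stage(stage: str, stage_map: dict[str, list[str]],
                         stage_allowlist: list[str], live_tools: list[str]) -> list[str]:
    if stage_map:
        return stage_map.get(stage, [])
    if stage not in set(stage_allowlist):
        return []
    return live_tools

live_tools = ['gitea_lookup_issue', 'gitea_lookup_pull_request']
stages = ['request_interpretation', 'change_summary']
assert live_tools_for_stage('change_summary', {}, stages, live_tools) == live_tools
assert live_tools_for_stage('project_naming', {}, stages, live_tools) == []
assert live_tools_for_stage('change_summary', {'change_summary': ['gitea_lookup_pull_request']}, stages, live_tools) == ['gitea_lookup_pull_request']
```

Note that once a stage map is configured, stages absent from it get no live tools at all, even when they appear in the stage allowlist.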
@property
def llm_max_tool_call_rounds(self) -> int:
"""Get the maximum number of model-driven live tool-call rounds per LLM request."""
return max(int(self.LLM_MAX_TOOL_CALL_ROUNDS), 0)
@property
def gitea_url(self) -> str:
"""Get Gitea URL with trimmed whitespace."""

View File

@@ -16,6 +16,7 @@ _last_background_repo_sync_at = 0.0
try:
from .agents.database_manager import DatabaseManager
from .agents.gitea import GiteaAPI
from .agents.llm_service import LLMServiceClient
from .agents.n8n_setup import N8NSetupAgent
from .agents.prompt_workflow import PromptWorkflowManager
from .agents.telegram import TelegramHandler
@@ -24,6 +25,7 @@ try:
except ImportError:
from agents.database_manager import DatabaseManager
from agents.gitea import GiteaAPI
from agents.llm_service import LLMServiceClient
from agents.n8n_setup import N8NSetupAgent
from agents.prompt_workflow import PromptWorkflowManager
from agents.telegram import TelegramHandler
@@ -510,6 +512,22 @@ def _render_n8n_error_dialog(result: dict) -> None:
dialog.open()
def _render_confirmation_dialog(title: str, message: str, confirm_label: str, on_confirm, color: str = 'negative') -> None:
"""Render a reusable confirmation dialog for destructive or stateful actions."""
with ui.dialog() as dialog, ui.card().classes('factory-panel q-pa-lg').style('max-width: 640px; width: min(92vw, 640px);'):
ui.label(title).style('font-size: 1.2rem; font-weight: 800; color: #5c2d1f;')
ui.label(message).classes('factory-muted')
def _confirm() -> None:
dialog.close()
on_confirm()
with ui.row().classes('justify-end w-full q-mt-md gap-2'):
ui.button('Cancel', on_click=dialog.close).props('outline color=dark')
ui.button(confirm_label, on_click=_confirm).props(f'unelevated color={color}')
dialog.open()
def _render_health_panels() -> None:
"""Render application and n8n health panels."""
runtime = get_database_runtime_summary()
@@ -609,15 +627,15 @@ def create_dashboard():
def _store_llm_stage(event) -> None:
app.storage.user[llm_stage_filter_key] = event.value or ''
dashboard_body.refresh()
_refresh_llm_filtered_sections()
def _store_llm_model(event) -> None:
app.storage.user[llm_model_filter_key] = event.value or ''
dashboard_body.refresh()
_refresh_llm_filtered_sections()
def _store_llm_search(event) -> None:
app.storage.user[llm_search_filter_key] = event.value or ''
dashboard_body.refresh()
_refresh_llm_filtered_sections()
def _selected_commit_lookup() -> str:
return app.storage.user.get(commit_lookup_key, '')
@@ -630,7 +648,7 @@ def create_dashboard():
def _store_branch_scope(event) -> None:
app.storage.user[branch_scope_filter_key] = event.value or ''
dashboard_body.refresh()
_refresh_timeline_sections()
def _selected_repo_owner() -> str:
return app.storage.user.get(repo_owner_key, settings.gitea_owner or '')
@@ -680,7 +698,7 @@ def create_dashboard():
)
_set_discovered_repositories(resolved)
ui.notify(f'Discovered {len(resolved)} repositories in {owner}', color='positive')
dashboard_body.refresh()
_refresh_system_sections()
async def onboard_repository_action(owner: str, repo_name: str) -> None:
if not settings.gitea_url or not settings.gitea_token:
@@ -710,7 +728,7 @@ def create_dashboard():
)
await discover_gitea_repositories_action()
ui.notify(f'Onboarded {owner}/{repo_name}', color='positive')
dashboard_body.refresh()
_refresh_all_dashboard_sections()
def sync_project_repository_action(project_id: str) -> None:
if not settings.gitea_url or not settings.gitea_token:
@@ -722,6 +740,13 @@ def create_dashboard():
return
with closing(db):
manager = DatabaseManager(db)
history = manager.get_project_by_id(project_id)
if history is None:
ui.notify('Project not found', color='negative')
return
if history.status == 'archived':
ui.notify('Archived projects cannot be synced', color='negative')
return
gitea_api = GiteaAPI(
token=settings.GITEA_TOKEN,
base_url=settings.GITEA_URL,
@@ -735,7 +760,7 @@ def create_dashboard():
)
manager.sync_repository_issues(project_id=project_id, gitea_api=gitea_api, state='open')
ui.notify(result.get('message', 'Repository sync finished'), color='positive' if result.get('status') == 'success' else 'negative')
dashboard_body.refresh()
_refresh_all_dashboard_sections()
async def setup_n8n_workflow_action() -> None:
api_url = _resolve_n8n_api_url()
@@ -762,7 +787,7 @@ def create_dashboard():
if result.get('status') == 'error':
_render_n8n_error_dialog(result)
ui.notify(result.get('message', 'n8n setup finished'), color='positive' if result.get('status') == 'success' else 'negative')
dashboard_body.refresh()
_refresh_all_dashboard_sections()
async def send_telegram_prompt_guide_action() -> None:
if not settings.telegram_bot_token:
@@ -790,12 +815,12 @@ def create_dashboard():
)
ui.notify(result.get('message', 'Telegram message sent'), color='positive' if result.get('status') == 'success' else 'negative')
dashboard_body.refresh()
_refresh_health_sections()
def init_db_action() -> None:
result = init_db()
ui.notify(result.get('message', 'Database initialized'), color='positive' if result.get('status') == 'success' else 'negative')
dashboard_body.refresh()
_refresh_all_dashboard_sections()
async def undo_prompt_action(project_id: str, prompt_id: int) -> None:
db = get_db_sync()
@@ -805,33 +830,96 @@ def create_dashboard():
with closing(db):
result = await PromptWorkflowManager(db).undo_prompt(project_id=project_id, prompt_id=prompt_id)
ui.notify(result.get('message', 'Prompt reverted') if result.get('status') != 'success' else 'Prompt changes reverted', color='positive' if result.get('status') == 'success' else 'negative')
dashboard_body.refresh()
_refresh_all_dashboard_sections()
@ui.refreshable
def dashboard_body() -> None:
snapshot = _load_dashboard_snapshot()
if snapshot.get('error'):
with ui.card().classes('factory-panel w-full max-w-4xl mx-auto q-pa-xl'):
ui.label('Dashboard unavailable').style('font-size: 1.5rem; font-weight: 700; color: #5c2d1f;')
ui.label(snapshot['error']).classes('factory-muted')
ui.button('Initialize Database', on_click=init_db_action).props('unelevated')
def archive_project_action(project_id: str) -> None:
db = get_db_sync()
if db is None:
ui.notify('Database session could not be created', color='negative')
return
with closing(db):
result = DatabaseManager(db).archive_project(project_id)
ui.notify(result.get('message', 'Project archived'), color='positive' if result.get('status') == 'success' else 'negative')
_refresh_all_dashboard_sections()
summary = snapshot['summary']
projects = snapshot['projects']
correlations = snapshot['correlations']
system_logs = snapshot['system_logs']
def unarchive_project_action(project_id: str) -> None:
db = get_db_sync()
if db is None:
ui.notify('Database session could not be created', color='negative')
return
with closing(db):
result = DatabaseManager(db).unarchive_project(project_id)
ui.notify(result.get('message', 'Project restored'), color='positive' if result.get('status') == 'success' else 'negative')
_refresh_all_dashboard_sections()
def delete_project_action(project_id: str) -> None:
db = get_db_sync()
if db is None:
ui.notify('Database session could not be created', color='negative')
return
with closing(db):
manager = DatabaseManager(db)
audit_data = manager.get_project_audit_data(project_id)
if audit_data.get('project') is None:
ui.notify('Project not found', color='negative')
return
repository = audit_data.get('repository') or audit_data['project'].get('repository') or {}
remote_delete = None
if repository and repository.get('mode') != 'shared' and repository.get('owner') and repository.get('name') and settings.gitea_url and settings.gitea_token:
gitea_api = GiteaAPI(token=settings.GITEA_TOKEN, base_url=settings.GITEA_URL, owner=settings.GITEA_OWNER, repo=settings.GITEA_REPO or '')
remote_delete = gitea_api.delete_repo_sync(owner=repository.get('owner'), repo=repository.get('name'))
if remote_delete.get('error') and remote_delete.get('status_code') not in {404, None}:
ui.notify(remote_delete.get('error', 'Remote repository deletion failed'), color='negative')
return
result = manager.delete_project(project_id)
message = result.get('message', 'Project deleted')
if remote_delete and not remote_delete.get('error'):
message = f"{message}; remote repository deleted"
ui.notify(message, color='positive' if result.get('status') == 'success' else 'negative')
_refresh_all_dashboard_sections()
dashboard_state: dict = {}
def _load_dashboard_view_model() -> dict:
snapshot = _load_dashboard_snapshot()
llm_runtime = LLMServiceClient().get_runtime_configuration()
llm_stage_filter = _selected_llm_stage()
llm_model_filter = _selected_llm_model()
llm_search_filter = _selected_llm_search()
branch_scope_filter = _selected_branch_scope()
commit_lookup_query = _selected_commit_lookup()
commit_context = _load_commit_context(commit_lookup_query, branch_scope_filter) if commit_lookup_query else None
discovered_repositories = _get_discovered_repositories()
if snapshot.get('error'):
return {
'error': snapshot['error'],
'llm_runtime': llm_runtime,
'llm_stage_filter': llm_stage_filter,
'llm_model_filter': llm_model_filter,
'llm_search_filter': llm_search_filter,
'branch_scope_filter': branch_scope_filter,
'commit_lookup_query': commit_lookup_query,
'discovered_repositories': discovered_repositories,
}
projects = snapshot['projects']
all_llm_traces = [trace for project_bundle in projects for trace in project_bundle.get('llm_traces', [])]
llm_stage_options = [''] + sorted({trace.get('stage') for trace in all_llm_traces if trace.get('stage')})
llm_model_options = [''] + sorted({trace.get('model') for trace in all_llm_traces if trace.get('model')})
return {
'snapshot': snapshot,
'summary': snapshot['summary'],
'projects': projects,
'archived_projects': snapshot.get('archived_projects', []),
'correlations': snapshot['correlations'],
'system_logs': snapshot['system_logs'],
'llm_runtime': llm_runtime,
'llm_stage_filter': llm_stage_filter,
'llm_model_filter': llm_model_filter,
'llm_search_filter': llm_search_filter,
'branch_scope_filter': branch_scope_filter,
'commit_lookup_query': commit_lookup_query,
'commit_context': _load_commit_context(commit_lookup_query, branch_scope_filter) if commit_lookup_query else None,
'discovered_repositories': discovered_repositories,
'llm_stage_options': llm_stage_options,
'llm_model_options': llm_model_options,
'project_repository_map': {
project_bundle['project']['project_id']: {
'project_name': project_bundle['project']['project_name'],
'repository': project_bundle.get('repository') or project_bundle['project'].get('repository'),
@@ -840,23 +928,48 @@ def create_dashboard():
}
for project_bundle in projects
if project_bundle.get('project')
},
}
def _update_dashboard_state() -> None:
dashboard_state.clear()
dashboard_state.update(_load_dashboard_view_model())
def _view_model() -> dict:
if not dashboard_state:
_update_dashboard_state()
return dashboard_state
def _render_dashboard_unavailable(message: str) -> None:
with ui.card().classes('factory-panel w-full max-w-4xl mx-auto q-pa-xl'):
ui.label('Dashboard unavailable').style('font-size: 1.5rem; font-weight: 700; color: #5c2d1f;')
ui.label(message).classes('factory-muted')
ui.button('Initialize Database', on_click=init_db_action).props('unelevated')
@ui.refreshable
def render_header() -> None:
with ui.card().classes('factory-panel w-full q-pa-lg'):
with ui.row().classes('items-center justify-between w-full'):
with ui.column().classes('gap-1'):
ui.label('AI Software Factory').style('font-size: 2.3rem; font-weight: 800; color: #302116;')
ui.label('Operational dashboard with project audit, prompt traces, and n8n controls.').classes('factory-muted')
with ui.row().classes('items-center gap-2'):
ui.button('Refresh', on_click=_refresh_current_dashboard_sections).props('outline')
ui.button('Initialize DB', on_click=init_db_action).props('unelevated color=dark')
ui.button('Provision n8n Workflow', on_click=setup_n8n_workflow_action).props('unelevated color=accent')
ui.button('Message Prompt Channel', on_click=send_telegram_prompt_guide_action).props('outline color=secondary')
@ui.refreshable
def render_metrics() -> None:
view_model = _view_model()
if view_model.get('error'):
_render_dashboard_unavailable(view_model['error'])
return
summary = view_model['summary']
with ui.grid(columns=4).classes('w-full gap-4'):
metrics = [
('Projects', summary['total_projects'], 'Tracked generation requests'),
('Archived', summary.get('archived_projects', 0), 'Excluded from active automation'),
('Completed', summary['completed_projects'], 'Finished project runs'),
('Prompts', summary['prompt_events'], 'Recorded originating prompts'),
('Open PRs', summary['open_pull_requests'], 'Unmerged review branches'),
@@ -867,18 +980,14 @@ def create_dashboard():
ui.label(str(value)).style('font-size: 2.1rem; font-weight: 800; margin-top: 6px;')
ui.label(subtitle).style('font-size: 0.9rem; opacity: 0.78; margin-top: 8px;')
@ui.refreshable
def render_overview_panel() -> None:
view_model = _view_model()
if view_model.get('error'):
_render_dashboard_unavailable(view_model['error'])
return
projects = view_model['projects']
summary = view_model['summary']
with ui.grid(columns=2).classes('w-full gap-4'):
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('Project Pipeline').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
@@ -908,13 +1017,39 @@ def create_dashboard():
ui.label(label).classes('factory-muted')
ui.label(value).style('font-weight: 600; color: #3a281a;')
@ui.refreshable
def render_projects_panel() -> None:
view_model = _view_model()
if view_model.get('error'):
_render_dashboard_unavailable(view_model['error'])
return
projects = view_model['projects']
if not projects:
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('No project data available yet.').classes('factory-muted')
for project_bundle in projects:
project = project_bundle['project']
with ui.expansion(f"{project['project_name']} · {project['status']}", icon='folder').classes('factory-panel w-full q-mb-md'):
with ui.row().classes('items-center gap-2 q-pa-md'):
ui.button(
'Archive',
on_click=lambda _=None, project_id=project['project_id'], project_name=project['project_name']: _render_confirmation_dialog(
'Archive project?',
f'Archive {project_name}? Archived projects remain visible in the dashboard but are excluded from automation, Telegram routing, sync, and undo actions.',
'Archive',
lambda: archive_project_action(project_id),
color='warning',
),
).props('outline color=warning')
ui.button(
'Delete',
on_click=lambda _=None, project_id=project['project_id'], project_name=project['project_name']: _render_confirmation_dialog(
'Delete project permanently?',
f'Delete {project_name}? This removes the local project directory, project traces from the database, and any project-owned remote repository.',
'Delete Permanently',
lambda: delete_project_action(project_id),
),
).props('outline color=negative')
with ui.grid(columns=2).classes('w-full gap-4 q-pa-md'):
with ui.card().classes('q-pa-md'):
ui.label('Repository').style('font-weight: 700; color: #3a281a;')
@@ -924,15 +1059,67 @@ def create_dashboard():
on_click=lambda _=None, project_id=project['project_id']: sync_project_repository_action(project_id),
).props('outline color=secondary').classes('q-mt-md')
@ui.refreshable
def render_archived_panel() -> None:
view_model = _view_model()
if view_model.get('error'):
_render_dashboard_unavailable(view_model['error'])
return
archived_projects = view_model['archived_projects']
llm_stage_filter = view_model['llm_stage_filter']
llm_model_filter = view_model['llm_model_filter']
llm_search_filter = view_model['llm_search_filter']
if not archived_projects:
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('No archived projects yet.').classes('factory-muted')
for project_bundle in archived_projects:
project = project_bundle['project']
with ui.expansion(f"{project['project_name']} · archived", icon='archive').classes('factory-panel w-full q-mb-md'):
with ui.row().classes('items-center gap-2 q-pa-md'):
ui.button(
'Restore',
on_click=lambda _=None, project_id=project['project_id'], project_name=project['project_name']: _render_confirmation_dialog(
'Restore archived project?',
f'Restore {project_name} to the active project set so the factory can work on it again?',
'Restore Project',
lambda: unarchive_project_action(project_id),
color='positive',
),
).props('outline color=positive')
ui.button(
'Delete Permanently',
on_click=lambda _=None, project_id=project['project_id'], project_name=project['project_name']: _render_confirmation_dialog(
'Delete archived project permanently?',
f'Delete {project_name}? This removes the archived project from both the database and filesystem, and deletes any project-owned remote repository.',
'Delete Permanently',
lambda: delete_project_action(project_id),
),
).props('outline color=negative')
with ui.grid(columns=2).classes('w-full gap-4 q-pa-md'):
with ui.card().classes('q-pa-md'):
ui.label('Repository').style('font-weight: 700; color: #3a281a;')
_render_repository_block(project_bundle.get('repository') or project.get('repository'))
with ui.card().classes('q-pa-md'):
ui.label('Prompt').style('font-weight: 700; color: #3a281a;')
prompts = project_bundle.get('prompts', [])
if prompts:
ui.label(prompts[0]['prompt_text']).classes('factory-code')
else:
ui.label('No prompt recorded.').classes('factory-muted')
with ui.grid(columns=2).classes('w-full gap-4 q-pa-md'):
with ui.card().classes('q-pa-md'):
ui.label('Git Commits').style('font-weight: 700; color: #3a281a;')
_render_commit_list(project_bundle.get('commits', []))
with ui.card().classes('q-pa-md'):
ui.label('Tracked Issues').style('font-weight: 700; color: #3a281a;')
_render_issue_list(project_bundle.get('issues', []))
with ui.card().classes('q-pa-md'):
ui.label('Repository Sync').style('font-weight: 700; color: #3a281a;')
_render_repository_sync_block(project_bundle.get('repository_sync') or project.get('repository_sync'))
with ui.card().classes('q-pa-md'):
ui.label('Pull Request').style('font-weight: 700; color: #3a281a;')
open_pr = next((pr for pr in project_bundle.get('pull_requests', []) if pr.get('pr_state') == 'open' and not pr.get('merged')), None)
_render_pull_request_block(open_pr)
with ui.card().classes('q-pa-md'):
ui.label('Prompt').style('font-weight: 700; color: #3a281a;')
prompts = project_bundle.get('prompts', [])
@@ -945,25 +1132,20 @@ def create_dashboard():
ui.label(prompt['prompt_text']).classes('factory-code')
else:
ui.label('No prompt recorded.').classes('factory-muted')
with ui.grid(columns=1).classes('w-full gap-4 q-pa-md'):
with ui.card().classes('q-pa-md'):
ui.label('Generated Changes').style('font-weight: 700; color: #3a281a;')
_render_change_list(project_bundle.get('code_changes', []))
with ui.card().classes('q-pa-md'):
ui.label('Tracked Issues').style('font-weight: 700; color: #3a281a;')
_render_issue_list(project_bundle.get('issues', []))
with ui.grid(columns=2).classes('w-full gap-4 q-pa-md'):
with ui.card().classes('q-pa-md'):
ui.label('Git Commits').style('font-weight: 700; color: #3a281a;')
_render_commit_list(project_bundle.get('commits', []))
with ui.card().classes('q-pa-md'):
ui.label('LLM Trace').style('font-weight: 700; color: #3a281a;')
_render_llm_traces(_filter_llm_traces(project_bundle.get('llm_traces', []), llm_stage_filter, llm_model_filter, llm_search_filter))
with ui.card().classes('q-pa-md'):
ui.label('Recent Logs').style('font-weight: 700; color: #3a281a;')
logs = project_bundle.get('logs', [])[:6]
@@ -972,12 +1154,10 @@ def create_dashboard():
ui.markdown(f"- {log['timestamp'] or 'n/a'} · {log['level']} · {log['message']}")
else:
ui.label('No project logs yet.').classes('factory-muted')
with ui.grid(columns=1).classes('w-full gap-4 q-pa-md'):
with ui.card().classes('q-pa-md'):
ui.label('Issue Work').style('font-weight: 700; color: #3a281a;')
_render_issue_work_events(project_bundle.get('issue_work', []))
with ui.card().classes('q-pa-md'):
ui.label('Audit Trail').style('font-weight: 700; color: #3a281a;')
audits = project_bundle.get('audit_trail', [])[:6]
@@ -987,28 +1167,26 @@ def create_dashboard():
else:
ui.label('No audit events yet.').classes('factory-muted')
@ui.refreshable
def render_trace_panel() -> None:
view_model = _view_model()
if view_model.get('error'):
_render_dashboard_unavailable(view_model['error'])
return
correlations = view_model['correlations']
project_repository_map = view_model['project_repository_map']
llm_stage_options = view_model['llm_stage_options']
llm_model_options = view_model['llm_model_options']
llm_stage_filter = view_model['llm_stage_filter']
llm_model_filter = view_model['llm_model_filter']
llm_search_filter = view_model['llm_search_filter']
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('Prompt to Code Correlation').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Each prompt entry is linked to the generated files recorded after that prompt for the same project.').classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-md w-full'):
ui.select(options=llm_stage_options, value=llm_stage_filter, on_change=_store_llm_stage, label='LLM stage').classes('min-w-[12rem]')
ui.select(options=llm_model_options, value=llm_model_filter, on_change=_store_llm_model, label='LLM model').classes('min-w-[12rem]')
ui.input(label='Search trace text', value=llm_search_filter, on_change=_store_llm_search).classes('min-w-[18rem]')
if correlations:
for correlation in correlations:
correlation_project = project_repository_map.get(correlation['project_id'], {})
@@ -1031,35 +1209,30 @@ def create_dashboard():
else:
ui.label('No prompt traces recorded yet.').classes('factory-muted')
@ui.refreshable
def render_compare_panel() -> None:
view_model = _view_model()
if view_model.get('error'):
_render_dashboard_unavailable(view_model['error'])
return
correlations = view_model['correlations']
project_repository_map = view_model['project_repository_map']
llm_stage_options = view_model['llm_stage_options']
llm_model_options = view_model['llm_model_options']
llm_stage_filter = view_model['llm_stage_filter']
llm_model_filter = view_model['llm_model_filter']
llm_search_filter = view_model['llm_search_filter']
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('Prompt Compare View').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Review one prompt at a time as a complete change set: repo diagnostics, commit links, and file-level diffs in one place.').classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-md w-full'):
ui.select(options=llm_stage_options, value=llm_stage_filter, on_change=_store_llm_stage, label='LLM stage').classes('min-w-[12rem]')
ui.select(options=llm_model_options, value=llm_model_filter, on_change=_store_llm_model, label='LLM model').classes('min-w-[12rem]')
ui.input(label='Search trace text', value=llm_search_filter, on_change=_store_llm_search).classes('min-w-[18rem]')
if correlations:
for correlation in correlations:
correlation_project = project_repository_map.get(correlation['project_id'], {})
filtered_correlation = {**correlation, 'llm_traces': _filter_llm_traces(correlation.get('llm_traces', []), llm_stage_filter, llm_model_filter, llm_search_filter)}
with ui.card().classes('q-pa-md q-mt-md'):
ui.label(correlation_project.get('project_name') or correlation['project_id']).style('font-size: 1rem; font-weight: 700; color: #2f241d;')
_render_repository_block(correlation_project.get('repository'))
@@ -1074,28 +1247,27 @@ def create_dashboard():
'Undo This Prompt',
on_click=lambda _=None, project_id=correlation['project_id'], prompt_id=correlation['prompt_id']: undo_prompt_action(project_id, prompt_id),
).props('outline color=negative')
_render_prompt_compare(filtered_correlation)
else:
ui.label('No prompt compare data recorded yet.').classes('factory-muted')
@ui.refreshable
def render_timeline_panel() -> None:
view_model = _view_model()
if view_model.get('error'):
_render_dashboard_unavailable(view_model['error'])
return
projects = view_model['projects']
branch_scope_filter = view_model['branch_scope_filter']
commit_lookup_query = view_model['commit_lookup_query']
commit_context = view_model['commit_context']
with ui.card().classes('factory-panel q-pa-lg q-mb-md'):
ui.label('Commit Lookup').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Submit a commit id to reconstruct the prompt, traces, repository state, and surrounding timeline that produced it.').classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-md w-full'):
ui.select(options=['', 'main', 'pr', 'manual'], value=branch_scope_filter, on_change=_store_branch_scope, label='Branch scope').classes('min-w-[10rem]')
ui.input(label='Commit hash', value=commit_lookup_query, on_change=_store_commit_lookup, placeholder='deadbeef').classes('min-w-[18rem]')
ui.button('Lookup', on_click=_refresh_timeline_sections).props('unelevated color=dark')
if commit_lookup_query and commit_context is None:
ui.label('No recorded context found for that commit hash.').classes('factory-muted q-mt-md')
elif commit_context is not None:
@@ -1105,12 +1277,7 @@ def create_dashboard():
ui.label('Project Timelines').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Chronological view of prompts, LLM traces, commits, PR updates, repository sync events, and prompt reverts.').classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-md w-full'):
ui.select(options=['', 'main', 'pr', 'manual'], value=branch_scope_filter, on_change=_store_branch_scope, label='Branch scope').classes('min-w-[10rem]')
if projects:
for project_bundle in projects:
project = project_bundle['project']
@@ -1119,7 +1286,15 @@ def create_dashboard():
else:
ui.label('No project timelines recorded yet.').classes('factory-muted')
@ui.refreshable
def render_system_panel() -> None:
view_model = _view_model()
if view_model.get('error'):
_render_dashboard_unavailable(view_model['error'])
return
system_logs = view_model['system_logs']
llm_runtime = view_model['llm_runtime']
discovered_repositories = view_model['discovered_repositories']
with ui.grid(columns=2).classes('w-full gap-4'):
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('System Logs').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
@@ -1128,26 +1303,56 @@ def create_dashboard():
ui.markdown(f"- {log['timestamp'] or 'n/a'} · **{log['component']}** · {log['level']} · {log['message']}")
else:
ui.label('No system logs yet.').classes('factory-muted')
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('LLM Runtime').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
rows = [
('Provider', llm_runtime.get('provider')),
('Model', llm_runtime.get('model')),
('Ollama URL', llm_runtime.get('ollama_url')),
('Tool Context Limit', str(llm_runtime.get('tool_context_limit'))),
('Max Tool Call Rounds', str(llm_runtime.get('max_tool_call_rounds'))),
('Live Gitea Tools Configured', 'yes' if llm_runtime.get('gitea_live_tools_configured') else 'no'),
]
for label, value in rows:
with ui.row().classes('justify-between w-full q-mt-sm'):
ui.label(label).classes('factory-muted')
ui.label(value or 'n/a').style('font-weight: 600; color: #3a281a;')
ui.label('Mediated Tools').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
for tool_name in llm_runtime.get('mediated_tools', []):
ui.label(tool_name).classes('factory-chip q-mt-sm')
ui.label('Live Tools').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
for tool_name in llm_runtime.get('live_tools', []):
ui.label(tool_name).classes('factory-chip q-mt-sm')
ui.label('Live Tool Stages').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
live_tools_by_stage = llm_runtime.get('live_tools_by_stage', {})
for stage_name, stage_tools in live_tools_by_stage.items():
ui.label(stage_name.replace('_', ' ').title()).classes('factory-muted q-mt-sm')
if stage_tools:
for tool_name in stage_tools:
ui.label(tool_name).classes('factory-chip q-mt-sm')
else:
ui.label('disabled').classes('factory-code q-mt-sm')
if llm_runtime.get('live_tool_stage_tool_map'):
ui.label('Stage Tool Overrides').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
ui.label(json.dumps(llm_runtime.get('live_tool_stage_tool_map'), indent=2, sort_keys=True)).classes('factory-code q-mt-sm')
ui.label('Guardrails').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
for label, text in (llm_runtime.get('guardrails') or {}).items():
ui.label(label.replace('_', ' ').title()).classes('factory-muted q-mt-sm')
ui.label(text or 'Not configured').classes('factory-code')
system_prompts = llm_runtime.get('system_prompts', {})
if system_prompts:
ui.label('System Prompts').style('font-weight: 700; color: #3a281a; margin-top: 12px;')
for label, text in system_prompts.items():
ui.label(label.replace('_', ' ').title()).classes('factory-muted q-mt-sm')
ui.label(text or 'Not configured').classes('factory-code')
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('Repository Onboarding').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Discover repositories in the Gitea organization, onboard manually created repos, and import their recent commits into the dashboard.').classes('factory-muted')
with ui.row().classes('items-center gap-3 q-mt-md w-full'):
ui.input(label='Owner / org', value=_selected_repo_owner(), on_change=_store_repo_owner).classes('min-w-[12rem]')
ui.input(label='Repository name', value=_selected_repo_name(), on_change=_store_repo_name).classes('min-w-[14rem]')
ui.button('Discover Repos', on_click=discover_gitea_repositories_action).props('outline color=secondary')
ui.button('Onboard Repo', on_click=lambda: onboard_repository_action(_selected_repo_owner(), _selected_repo_name())).props('unelevated color=dark')
if discovered_repositories:
for repo in discovered_repositories:
with ui.card().classes('q-pa-sm q-mt-md'):
@@ -1169,31 +1374,18 @@ def create_dashboard():
ui.link(repo['html_url'], repo['html_url'], new_tab=True).classes('factory-code')
else:
ui.label('No discovered repositories loaded yet.').classes('factory-muted q-mt-md')
with ui.card().classes('factory-panel q-pa-lg'):
ui.label('Important Endpoints').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
endpoints = [
'/health', '/llm/runtime', '/generate', '/projects', '/audit/projects', '/audit/prompts', '/audit/changes', '/audit/issues',
'/audit/commit-context', '/audit/timeline', '/audit/llm-traces', '/audit/correlations', '/projects/{project_id}/sync-repository',
'/gitea/repos', '/gitea/repos/onboard', '/n8n/health', '/n8n/setup',
]
for endpoint in endpoints:
ui.label(endpoint).classes('factory-code q-mt-sm')
@ui.refreshable
def render_health_panel() -> None:
with ui.card().classes('factory-panel q-pa-lg q-mb-md'):
ui.label('Health and Diagnostics').style('font-size: 1.25rem; font-weight: 700; color: #3a281a;')
ui.label('Use this page to verify runtime configuration, n8n API connectivity, and likely causes of provisioning failures.').classes('factory-muted')
@@ -1208,9 +1400,87 @@ def create_dashboard():
ui.button('Send Prompt Guide', on_click=send_telegram_prompt_guide_action).props('unelevated color=secondary')
_render_health_panels()
panel_refreshers: dict[str, callable] = {}
def _refresh_current_dashboard_sections() -> None:
_update_dashboard_state()
panel_refreshers['metrics']()
active_tab = _selected_tab_name()
if active_tab in panel_refreshers:
panel_refreshers[active_tab]()
def _refresh_all_dashboard_sections() -> None:
_update_dashboard_state()
panel_refreshers['metrics']()
for name in ('overview', 'projects', 'archived', 'trace', 'compare', 'timeline', 'system', 'health'):
panel_refreshers[name]()
def _refresh_llm_filtered_sections() -> None:
_update_dashboard_state()
for name in ('archived', 'trace', 'compare'):
panel_refreshers[name]()
def _refresh_timeline_sections() -> None:
_update_dashboard_state()
panel_refreshers['timeline']()
def _refresh_system_sections() -> None:
_update_dashboard_state()
panel_refreshers['system']()
def _refresh_health_sections() -> None:
panel_refreshers['health']()
_update_dashboard_state()
with ui.column().classes('factory-shell w-full gap-4 q-pa-lg'):
render_header()
render_metrics()
selected_tab = _selected_tab_name()
with ui.tabs(value=selected_tab, on_change=_store_selected_tab).classes('w-full') as tabs:
ui.tab('Overview').props('name=overview')
ui.tab('Projects').props('name=projects')
ui.tab('Archived').props('name=archived')
ui.tab('Prompt Trace').props('name=trace')
ui.tab('Compare').props('name=compare')
ui.tab('Timeline').props('name=timeline')
ui.tab('System').props('name=system')
ui.tab('Health').props('name=health')
with ui.tab_panels(tabs, value=selected_tab).classes('w-full'):
with ui.tab_panel('overview'):
render_overview_panel()
with ui.tab_panel('projects'):
render_projects_panel()
with ui.tab_panel('archived'):
render_archived_panel()
with ui.tab_panel('trace'):
render_trace_panel()
with ui.tab_panel('compare'):
render_compare_panel()
with ui.tab_panel('timeline'):
render_timeline_panel()
with ui.tab_panel('system'):
render_system_panel()
with ui.tab_panel('health'):
render_health_panel()
panel_refreshers.update({
'header': render_header.refresh,
'metrics': render_metrics.refresh,
'overview': render_overview_panel.refresh,
'projects': render_projects_panel.refresh,
'archived': render_archived_panel.refresh,
'trace': render_trace_panel.refresh,
'compare': render_compare_panel.refresh,
'timeline': render_timeline_panel.refresh,
'system': render_system_panel.refresh,
'health': render_health_panel.refresh,
})
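The reloading mechanism above registers each panel's `@ui.refreshable` refresh callable under a name and re-renders only the sections an action affects, instead of rebuilding the whole dashboard body. Stripped of NiceGUI, the dispatch pattern looks roughly like this minimal sketch (the `refresh_log` list stands in for actual re-rendering; all names here are illustrative):

```python
from typing import Callable

panel_refreshers: dict[str, Callable[[], None]] = {}
refresh_log: list[str] = []


def register(name: str) -> None:
    # In the dashboard this would be render_<name>.refresh from @ui.refreshable.
    panel_refreshers[name] = lambda: refresh_log.append(name)


for panel in ("metrics", "overview", "projects", "timeline"):
    register(panel)


def refresh_current(active_tab: str) -> None:
    """Refresh the always-visible metrics plus only the active tab's panel."""
    panel_refreshers["metrics"]()
    if active_tab in panel_refreshers:
        panel_refreshers[active_tab]()


refresh_current("timeline")
print(refresh_log)  # ['metrics', 'timeline']
```

Unknown tab names degrade gracefully to a metrics-only refresh, which mirrors the `if active_tab in panel_refreshers` guard in `_refresh_current_dashboard_sections`.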
ui.timer(15.0, _run_background_repository_sync)
ui.timer(10.0, _refresh_current_dashboard_sections)
def run_app(port=None, reload=False, browser=True, storage_secret=None):


@@ -30,6 +30,7 @@ try:
from .agents.change_summary import ChangeSummaryGenerator
from .agents.database_manager import DatabaseManager
from .agents.request_interpreter import RequestInterpreter
from .agents.llm_service import LLMServiceClient
from .agents.orchestrator import AgentOrchestrator
from .agents.n8n_setup import N8NSetupAgent
from .agents.prompt_workflow import PromptWorkflowManager
@@ -41,6 +42,7 @@ except ImportError:
from agents.change_summary import ChangeSummaryGenerator
from agents.database_manager import DatabaseManager
from agents.request_interpreter import RequestInterpreter
from agents.llm_service import LLMServiceClient
from agents.orchestrator import AgentOrchestrator
from agents.n8n_setup import N8NSetupAgent
from agents.prompt_workflow import PromptWorkflowManager
@@ -109,6 +111,75 @@ def _build_project_id(name: str) -> str:
return f"{slug}-{uuid4().hex[:8]}"
def _build_project_slug(name: str) -> str:
"""Normalize a project name into a kebab-case identifier slug."""
return PROJECT_ID_PATTERN.sub("-", name.strip().lower()).strip("-") or "project"
def _ensure_unique_identifier(base_slug: str, reserved_ids: set[str]) -> str:
"""Return a unique identifier using deterministic numeric suffixes when needed."""
normalized = _build_project_slug(base_slug)
if normalized not in reserved_ids:
return normalized
suffix = 2
while f"{normalized}-{suffix}" in reserved_ids:
suffix += 1
return f"{normalized}-{suffix}"
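The slug and suffix helpers above can be exercised in isolation. A minimal standalone sketch (the `PROJECT_ID_PATTERN` regex is re-declared here for illustration; the real module defines its own, which may differ):

```python
import re

# Illustrative stand-in for the module's PROJECT_ID_PATTERN.
PROJECT_ID_PATTERN = re.compile(r"[^a-z0-9]+")


def build_project_slug(name: str) -> str:
    """Normalize a project name into a kebab-case identifier slug."""
    return PROJECT_ID_PATTERN.sub("-", name.strip().lower()).strip("-") or "project"


def ensure_unique_identifier(base_slug: str, reserved_ids: set[str]) -> str:
    """Return a unique identifier using deterministic numeric suffixes when needed."""
    normalized = build_project_slug(base_slug)
    if normalized not in reserved_ids:
        return normalized
    suffix = 2
    while f"{normalized}-{suffix}" in reserved_ids:
        suffix += 1
    return f"{normalized}-{suffix}"


print(ensure_unique_identifier("My App!", {"my-app", "my-app-2"}))  # my-app-3
```

Because the suffix search is deterministic, retrying the same request against the same reserved set always yields the same identifier, unlike the random `uuid4` suffix used by `_build_project_id`.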
def _build_project_identity_context(manager: DatabaseManager) -> list[dict]:
"""Build a compact project catalog for naming stages."""
projects = []
for history in manager.get_all_projects(include_archived=True):
repository = manager._get_project_repository(history) or {}
projects.append(
{
'project_id': history.project_id,
'name': history.project_name,
'description': history.description,
'repository': {
'owner': repository.get('owner'),
'name': repository.get('name'),
},
}
)
return projects
async def _derive_project_id_for_request(
request: SoftwareRequest,
*,
prompt_text: str,
prompt_routing: dict | None,
existing_projects: list[dict],
) -> tuple[str, dict | None]:
"""Derive a stable project id for a newly created project."""
reserved_ids = {str(project.get('project_id')).strip() for project in existing_projects if project.get('project_id')}
fallback_id = _ensure_unique_identifier((prompt_routing or {}).get('project_name') or request.name, reserved_ids)
user_prompt = (
f"Original user prompt:\n{prompt_text}\n\n"
f"Structured request:\n{json.dumps({'name': request.name, 'description': request.description, 'features': request.features, 'tech_stack': request.tech_stack}, indent=2)}\n\n"
f"Naming context:\n{json.dumps(prompt_routing or {}, indent=2)}\n\n"
f"Reserved project ids:\n{json.dumps(sorted(reserved_ids))}\n\n"
"Suggest the best stable project id for this new project."
)
content, trace = await LLMServiceClient().chat_with_trace(
stage='project_id_naming',
system_prompt=database_module.settings.llm_project_id_system_prompt,
user_prompt=user_prompt,
tool_context_input={'projects': existing_projects},
expect_json=True,
)
if content:
try:
parsed = json.loads(content)
candidate = parsed.get('project_id') or parsed.get('slug') or request.name
return _ensure_unique_identifier(str(candidate), reserved_ids), trace
except Exception:
pass
return fallback_id, trace
def _serialize_project(history: ProjectHistory) -> dict:
"""Serialize a project history row for API responses."""
return {
@@ -176,13 +247,15 @@ async def _run_generation(
prompt_source_context: dict | None = None,
prompt_routing: dict | None = None,
preferred_project_id: str | None = None,
repo_name_override: str | None = None,
related_issue: dict | None = None,
) -> dict:
"""Run the shared generation pipeline for a structured request."""
database_module.init_db()
manager = DatabaseManager(db)
-reusable_history = manager.get_project_by_id(preferred_project_id) if preferred_project_id else manager.get_latest_project_by_name(request.name)
+is_explicit_new_project = (prompt_routing or {}).get('intent') == 'new_project'
+reusable_history = manager.get_project_by_id(preferred_project_id, include_archived=False) if preferred_project_id else (None if is_explicit_new_project else manager.get_latest_project_by_name(request.name))
if reusable_history and database_module.settings.gitea_url and database_module.settings.gitea_token:
try:
from .agents.gitea import GiteaAPI
@@ -197,14 +270,23 @@ async def _run_generation(
),
project_id=reusable_history.project_id,
)
project_id_trace = None
resolved_prompt_text = prompt_text or _compose_prompt_text(request)
if preferred_project_id and reusable_history is not None:
project_id = reusable_history.project_id
-elif reusable_history and manager.get_open_pull_request(project_id=reusable_history.project_id):
+elif reusable_history and not is_explicit_new_project and manager.get_open_pull_request(project_id=reusable_history.project_id):
project_id = reusable_history.project_id
else:
if is_explicit_new_project or prompt_text:
project_id, project_id_trace = await _derive_project_id_for_request(
request,
prompt_text=resolved_prompt_text,
prompt_routing=prompt_routing,
existing_projects=_build_project_identity_context(manager),
)
else:
project_id = _build_project_id(request.name)
reusable_history = None
-resolved_prompt_text = prompt_text or _compose_prompt_text(request)
orchestrator = AgentOrchestrator(
project_id=project_id,
project_name=request.name,
@@ -217,6 +299,7 @@ async def _run_generation(
existing_history=reusable_history,
prompt_source_context=prompt_source_context,
prompt_routing=prompt_routing,
repo_name_override=repo_name_override,
related_issue_hint=related_issue,
)
result = await orchestrator.run()
@@ -240,6 +323,20 @@ async def _run_generation(
response_data['repository'] = result.get('repository')
response_data['related_issue'] = result.get('related_issue') or (result.get('ui_data') or {}).get('related_issue')
response_data['pull_request'] = result.get('pull_request') or manager.get_open_pull_request(project_id=project_id)
if project_id_trace:
manager.log_llm_trace(
project_id=project_id,
history_id=history.id if history else None,
prompt_id=orchestrator.prompt_audit.id if orchestrator.prompt_audit else None,
stage=project_id_trace['stage'],
provider=project_id_trace['provider'],
model=project_id_trace['model'],
system_prompt=project_id_trace['system_prompt'],
user_prompt=project_id_trace['user_prompt'],
assistant_response=project_id_trace['assistant_response'],
raw_response=project_id_trace.get('raw_response'),
fallback_used=project_id_trace.get('fallback_used', False),
)
summary_context = {
'name': response_data['name'],
'description': response_data['description'],
@@ -322,6 +419,7 @@ def read_api_info():
'/',
'/api',
'/health',
'/llm/runtime',
'/generate',
'/generate/text',
'/projects',
@@ -338,6 +436,9 @@ def read_api_info():
'/audit/pull-requests',
'/audit/lineage',
'/audit/correlations',
'/projects/{project_id}/archive',
'/projects/{project_id}/unarchive',
'/projects/{project_id}',
'/projects/{project_id}/prompts/{prompt_id}/undo',
'/projects/{project_id}/sync-repository',
'/gitea/repos',
@@ -360,6 +461,12 @@ def health_check():
}
@app.get('/llm/runtime')
def get_llm_runtime():
"""Return the active external LLM runtime, guardrail, and tool configuration."""
return LLMServiceClient().get_runtime_configuration()
@app.post('/generate')
async def generate_software(request: SoftwareRequest, db: DbSession):
"""Create and record a software-generation request."""
@@ -392,7 +499,7 @@ async def generate_software_from_text(request: FreeformSoftwareRequest, db: DbSe
context=interpreter_context,
)
routing = interpretation_trace.get('routing') or {}
-selected_history = manager.get_project_by_id(routing.get('project_id')) if routing.get('project_id') else None
+selected_history = manager.get_project_by_id(routing.get('project_id'), include_archived=False) if routing.get('project_id') else None
if selected_history is not None and routing.get('intent') != 'new_project':
interpreted['name'] = selected_history.project_name
interpreted['description'] = selected_history.description or interpreted['description']
@@ -408,6 +515,7 @@ async def generate_software_from_text(request: FreeformSoftwareRequest, db: DbSe
},
prompt_routing=routing,
preferred_project_id=routing.get('project_id') if routing.get('intent') != 'new_project' else None,
repo_name_override=routing.get('repo_name') if routing.get('intent') == 'new_project' else None,
related_issue={'number': routing.get('issue_number')} if routing.get('issue_number') is not None else None,
)
project_data = response.get('data', {})
@@ -428,6 +536,21 @@ async def generate_software_from_text(request: FreeformSoftwareRequest, db: DbSe
raw_response=interpretation_trace.get('raw_response'),
fallback_used=interpretation_trace.get('fallback_used', False),
)
naming_trace = interpretation_trace.get('project_naming')
if naming_trace:
manager.log_llm_trace(
project_id=project_data.get('project_id'),
history_id=project_data.get('history_id'),
prompt_id=prompt_id,
stage=naming_trace['stage'],
provider=naming_trace['provider'],
model=naming_trace['model'],
system_prompt=naming_trace['system_prompt'],
user_prompt=naming_trace['user_prompt'],
assistant_response=naming_trace['assistant_response'],
raw_response=naming_trace.get('raw_response'),
fallback_used=naming_trace.get('fallback_used', False),
)
response['interpreted_request'] = interpreted
response['routing'] = routing
response['llm_trace'] = interpretation_trace
@@ -440,10 +563,14 @@ async def generate_software_from_text(request: FreeformSoftwareRequest, db: DbSe
@app.get('/projects')
-def list_projects(db: DbSession):
+def list_projects(
+    db: DbSession,
+    include_archived: bool = Query(default=False),
+    archived_only: bool = Query(default=False),
+):
"""List recorded projects."""
manager = DatabaseManager(db)
-projects = manager.get_all_projects()
+projects = manager.get_all_projects(include_archived=include_archived, archived_only=archived_only)
return {'projects': [_serialize_project(project) for project in projects]}
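The two query flags presumably map onto status filtering inside `get_all_projects`; a standalone sketch of that filtering contract (an assumption, since the manager implementation is not part of this diff):

```python
def filter_projects(
    projects: list[dict],
    *,
    include_archived: bool = False,
    archived_only: bool = False,
) -> list[dict]:
    # archived_only wins over include_archived, mirroring the endpoint flags.
    if archived_only:
        return [p for p in projects if p.get('status') == 'archived']
    if include_archived:
        return list(projects)
    return [p for p in projects if p.get('status') != 'archived']
```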
@@ -572,16 +699,70 @@ def get_pull_request_audit(db: DbSession, project_id: str | None = Query(default
@app.post('/projects/{project_id}/prompts/{prompt_id}/undo')
async def undo_prompt_changes(project_id: str, prompt_id: int, db: DbSession):
"""Undo all changes associated with a specific prompt."""
manager = DatabaseManager(db)
history = manager.get_project_by_id(project_id)
if history is None:
raise HTTPException(status_code=404, detail='Project not found')
if history.status == 'archived':
raise HTTPException(status_code=400, detail='Archived projects cannot be modified')
result = await PromptWorkflowManager(db).undo_prompt(project_id=project_id, prompt_id=prompt_id)
if result.get('status') == 'error':
raise HTTPException(status_code=400, detail=result.get('message', 'Undo failed'))
return result
@app.post('/projects/{project_id}/archive')
def archive_project(project_id: str, db: DbSession):
"""Archive a project so it no longer participates in active automation."""
manager = DatabaseManager(db)
result = manager.archive_project(project_id)
if result.get('status') == 'error':
raise HTTPException(status_code=404, detail=result.get('message', 'Archive failed'))
return result
@app.post('/projects/{project_id}/unarchive')
def unarchive_project(project_id: str, db: DbSession):
"""Restore an archived project back into the active automation set."""
manager = DatabaseManager(db)
result = manager.unarchive_project(project_id)
if result.get('status') == 'error':
raise HTTPException(status_code=404, detail=result.get('message', 'Restore failed'))
return result
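Both endpoints above lean on the manager returning a result dict whose `status` key signals failure; an in-memory sketch of that contract (hypothetical, the real `DatabaseManager` methods are not shown in this hunk):

```python
def archive_project(projects: dict[str, dict], project_id: str) -> dict:
    # Flip the project into the archived state, or report an error dict
    # the endpoint can translate into a 404.
    project = projects.get(project_id)
    if project is None:
        return {'status': 'error', 'message': 'Project not found'}
    project['status'] = 'archived'
    return {'status': 'archived', 'project_id': project_id}
```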
@app.delete('/projects/{project_id}')
def delete_project(project_id: str, db: DbSession):
"""Delete a project, its local project directory, and project-scoped DB traces."""
manager = DatabaseManager(db)
audit_data = manager.get_project_audit_data(project_id)
if audit_data.get('project') is None:
raise HTTPException(status_code=404, detail='Project not found')
repository = audit_data.get('repository') or audit_data['project'].get('repository') or {}
remote_delete = None
if repository and repository.get('mode') != 'shared' and repository.get('owner') and repository.get('name') and database_module.settings.gitea_url and database_module.settings.gitea_token:
remote_delete = _create_gitea_api().delete_repo_sync(owner=repository.get('owner'), repo=repository.get('name'))
if remote_delete.get('error') and remote_delete.get('status_code') not in {404, None}:
raise HTTPException(status_code=502, detail=remote_delete.get('error'))
result = manager.delete_project(project_id)
if result.get('status') == 'error':
raise HTTPException(status_code=400, detail=result.get('message', 'Project deletion failed'))
result['remote_repository_deleted'] = bool(remote_delete and not remote_delete.get('error'))
result['remote_repository'] = repository if repository else None
return result
@app.post('/projects/{project_id}/sync-repository')
def sync_project_repository(project_id: str, db: DbSession, commit_limit: int = Query(default=25, ge=1, le=200)):
"""Import recent repository activity from Gitea for a tracked project."""
manager = DatabaseManager(db)
history = manager.get_project_by_id(project_id)
if history is None:
raise HTTPException(status_code=404, detail='Project not found')
if history.status == 'archived':
raise HTTPException(status_code=400, detail='Archived projects cannot be synced')
gitea_api = _create_gitea_api()
result = manager.sync_repository_activity(project_id=project_id, gitea_api=gitea_api, commit_limit=commit_limit)
if result.get('status') == 'error':