Files
ai_software_factory/ai_software_factory/agents/gitea.py

434 lines
16 KiB
Python

"""Gitea API integration for repository and pull request operations."""
import json
import os
import urllib.error
import urllib.parse
import urllib.request
import zlib
from urllib.parse import urlparse
def _normalize_base_url(base_url: str) -> str:
"""Normalize host-only service addresses into valid absolute URLs."""
normalized = (base_url or '').strip().rstrip('/')
if not normalized:
return ''
if '://' not in normalized:
normalized = f'https://{normalized}'
parsed = urlparse(normalized)
if not parsed.scheme or not parsed.netloc:
return ''
return normalized
class GiteaAPI:
"""Gitea API client for repository operations."""
def __init__(self, token: str, base_url: str, owner: str | None = None, repo: str | None = None):
self.token = token
self.base_url = _normalize_base_url(base_url)
self.owner = owner
self.repo = repo
self.headers = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
def get_config(self) -> dict:
"""Load configuration from environment."""
base_url = os.getenv("GITEA_URL", "https://gitea.local")
token = os.getenv("GITEA_TOKEN", "")
owner = os.getenv("GITEA_OWNER", "ai-test")
repo = os.getenv("GITEA_REPO", "")
return {
"base_url": _normalize_base_url(base_url),
"token": token,
"owner": owner,
"repo": repo,
"supports_project_repos": not bool(repo),
}
def get_auth_headers(self) -> dict:
"""Get authentication headers."""
return {
"Authorization": f"token {self.token}",
"Content-Type": "application/json",
}
def _api_url(self, path: str) -> str:
"""Build a Gitea API URL from a relative path."""
return f"{self.base_url}/api/v1/{path.lstrip('/')}"
def build_repo_git_url(self, owner: str | None = None, repo: str | None = None) -> str | None:
"""Build the clone URL for a repository."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo:
return None
return f"{self.base_url}/{_owner}/{_repo}.git"
def build_commit_url(self, commit_hash: str, owner: str | None = None, repo: str | None = None) -> str | None:
"""Build a browser URL for a commit."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo or not commit_hash:
return None
return f"{self.base_url}/{_owner}/{_repo}/commit/{commit_hash}"
def build_compare_url(self, base_ref: str, head_ref: str, owner: str | None = None, repo: str | None = None) -> str | None:
"""Build a browser URL for a compare view."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo or not base_ref or not head_ref:
return None
return f"{self.base_url}/{_owner}/{_repo}/compare/{base_ref}...{head_ref}"
def build_pull_request_url(self, pr_number: int, owner: str | None = None, repo: str | None = None) -> str | None:
"""Build a browser URL for a pull request."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo or not pr_number:
return None
return f"{self.base_url}/{_owner}/{_repo}/pulls/{pr_number}"
async def _request(self, method: str, path: str, payload: dict | None = None) -> dict:
"""Perform a Gitea API request and normalize the response."""
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.request(
method,
self._api_url(path),
headers=self.get_auth_headers(),
json=payload,
) as resp:
if resp.status in (200, 201):
return await resp.json()
return {"error": await resp.text(), "status_code": resp.status}
except Exception as e:
return {"error": str(e)}
def _request_sync(self, method: str, path: str, payload: dict | None = None) -> dict:
"""Perform a synchronous Gitea API request."""
try:
if not self.base_url:
return {'error': 'Gitea base URL is not configured or is invalid'}
request = urllib.request.Request(
self._api_url(path),
headers=self.get_auth_headers(),
method=method.upper(),
)
if payload is not None:
request.data = json.dumps(payload).encode('utf-8')
with urllib.request.urlopen(request) as response:
body = response.read().decode('utf-8')
return json.loads(body) if body else {}
except urllib.error.HTTPError as exc:
try:
body = exc.read().decode('utf-8')
except Exception:
body = str(exc)
return {'error': body, 'status_code': exc.code}
except Exception as exc:
return {'error': str(exc)}
def build_project_repo_name(self, project_id: str, project_name: str | None = None) -> str:
"""Build a repository name for a generated project."""
preferred = (project_name or project_id or "project").strip().lower().replace(" ", "-")
sanitized = "".join(ch if ch.isalnum() or ch in {"-", "_"} else "-" for ch in preferred)
while "--" in sanitized:
sanitized = sanitized.replace("--", "-")
return sanitized.strip("-") or project_id
async def create_repo(
self,
repo_name: str,
owner: str | None = None,
description: str | None = None,
private: bool = False,
auto_init: bool = True,
) -> dict:
"""Create a repository inside the configured organization."""
_owner = owner or self.owner
if not _owner:
return {"error": "Owner or organization is required"}
payload = {
"name": repo_name,
"description": description or f"AI-generated project repository for {repo_name}",
"private": private,
"auto_init": auto_init,
"default_branch": "main",
}
result = await self._request("POST", f"orgs/{_owner}/repos", payload)
if result.get("status_code") == 409:
existing = await self.get_repo_info(owner=_owner, repo=repo_name)
if not existing.get("error"):
existing["status"] = "exists"
return existing
if not result.get("error"):
result.setdefault("status", "created")
return result
async def delete_repo(self, owner: str | None = None, repo: str | None = None) -> dict:
"""Delete a repository from the configured organization/user."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo:
return {'error': 'Owner and repository name are required'}
result = await self._request('DELETE', f'repos/{_owner}/{_repo}')
if not result.get('error'):
result.setdefault('status', 'deleted')
return result
def delete_repo_sync(self, owner: str | None = None, repo: str | None = None) -> dict:
"""Synchronously delete a repository from the configured organization/user."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _owner or not _repo:
return {'error': 'Owner and repository name are required'}
result = self._request_sync('DELETE', f'repos/{_owner}/{_repo}')
if not result.get('error'):
result.setdefault('status', 'deleted')
return result
async def get_current_user(self) -> dict:
"""Get the user associated with the configured token."""
return await self._request("GET", "user")
def get_current_user_sync(self) -> dict:
"""Synchronously get the user associated with the configured token."""
return self._request_sync("GET", "user")
async def create_branch(self, branch: str, base: str = "main", owner: str | None = None, repo: str | None = None):
"""Create a new branch."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request(
"POST",
f"repos/{_owner}/{_repo}/branches",
{"new_branch_name": branch, "old_ref_name": base},
)
async def create_pull_request(
self,
title: str,
body: str,
owner: str,
repo: str,
base: str = "main",
head: str | None = None,
) -> dict:
"""Create a pull request."""
_owner = owner or self.owner
_repo = repo or self.repo
payload = {
"title": title,
"body": body,
"base": base,
"head": head or f"{_owner}-{_repo}-ai-gen-{hash(title) % 10000}",
}
return await self._request("POST", f"repos/{_owner}/{_repo}/pulls", payload)
def create_pull_request_sync(
self,
title: str,
body: str,
owner: str,
repo: str,
base: str = "main",
head: str | None = None,
) -> dict:
"""Synchronously create a pull request."""
_owner = owner or self.owner
_repo = repo or self.repo
payload = {
"title": title,
"body": body,
"base": base,
"head": head or f"{_owner}-{_repo}-ai-gen-{hash(title) % 10000}",
}
return self._request_sync("POST", f"repos/{_owner}/{_repo}/pulls", payload)
async def list_pull_requests(
self,
owner: str | None = None,
repo: str | None = None,
state: str = 'open',
) -> dict | list:
"""List pull requests for a repository."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request("GET", f"repos/{_owner}/{_repo}/pulls?state={state}")
def list_pull_requests_sync(
self,
owner: str | None = None,
repo: str | None = None,
state: str = 'open',
) -> dict | list:
"""Synchronously list pull requests for a repository."""
_owner = owner or self.owner
_repo = repo or self.repo
return self._request_sync("GET", f"repos/{_owner}/{_repo}/pulls?state={state}")
async def list_repositories(self, owner: str | None = None) -> dict | list:
"""List repositories within the configured organization."""
_owner = owner or self.owner
return await self._request("GET", f"orgs/{_owner}/repos")
def list_repositories_sync(self, owner: str | None = None) -> dict | list:
"""Synchronously list repositories within the configured organization."""
_owner = owner or self.owner
return self._request_sync("GET", f"orgs/{_owner}/repos")
async def list_branches(self, owner: str | None = None, repo: str | None = None) -> dict | list:
"""List repository branches."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request("GET", f"repos/{_owner}/{_repo}/branches")
def list_branches_sync(self, owner: str | None = None, repo: str | None = None) -> dict | list:
"""Synchronously list repository branches."""
_owner = owner or self.owner
_repo = repo or self.repo
return self._request_sync("GET", f"repos/{_owner}/{_repo}/branches")
async def list_issues(
self,
owner: str | None = None,
repo: str | None = None,
state: str = 'open',
) -> dict | list:
"""List repository issues, excluding pull requests at the consumer layer."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request("GET", f"repos/{_owner}/{_repo}/issues?state={state}")
def list_issues_sync(
self,
owner: str | None = None,
repo: str | None = None,
state: str = 'open',
) -> dict | list:
"""Synchronously list repository issues."""
_owner = owner or self.owner
_repo = repo or self.repo
return self._request_sync("GET", f"repos/{_owner}/{_repo}/issues?state={state}")
async def get_issue(self, issue_number: int, owner: str | None = None, repo: str | None = None) -> dict:
"""Return one repository issue by number."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request("GET", f"repos/{_owner}/{_repo}/issues/{issue_number}")
def get_issue_sync(self, issue_number: int, owner: str | None = None, repo: str | None = None) -> dict:
"""Synchronously return one repository issue by number."""
_owner = owner or self.owner
_repo = repo or self.repo
return self._request_sync("GET", f"repos/{_owner}/{_repo}/issues/{issue_number}")
async def list_repo_commits(
self,
owner: str | None = None,
repo: str | None = None,
limit: int = 25,
branch: str | None = None,
) -> dict | list:
"""List recent commits for a repository."""
_owner = owner or self.owner
_repo = repo or self.repo
branch_query = f"&sha={branch}" if branch else ""
return await self._request("GET", f"repos/{_owner}/{_repo}/commits?limit={limit}{branch_query}")
def list_repo_commits_sync(
self,
owner: str | None = None,
repo: str | None = None,
limit: int = 25,
branch: str | None = None,
) -> dict | list:
"""Synchronously list recent commits for a repository."""
_owner = owner or self.owner
_repo = repo or self.repo
branch_query = f"&sha={branch}" if branch else ""
return self._request_sync("GET", f"repos/{_owner}/{_repo}/commits?limit={limit}{branch_query}")
async def get_commit(
self,
commit_hash: str,
owner: str | None = None,
repo: str | None = None,
) -> dict:
"""Return one commit by hash."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request("GET", f"repos/{_owner}/{_repo}/git/commits/{commit_hash}")
def get_commit_sync(
self,
commit_hash: str,
owner: str | None = None,
repo: str | None = None,
) -> dict:
"""Synchronously return one commit by hash."""
_owner = owner or self.owner
_repo = repo or self.repo
return self._request_sync("GET", f"repos/{_owner}/{_repo}/git/commits/{commit_hash}")
async def get_pull_request(self, pr_number: int, owner: str | None = None, repo: str | None = None) -> dict:
"""Return one pull request by number."""
_owner = owner or self.owner
_repo = repo or self.repo
return await self._request("GET", f"repos/{_owner}/{_repo}/pulls/{pr_number}")
def get_pull_request_sync(self, pr_number: int, owner: str | None = None, repo: str | None = None) -> dict:
"""Synchronously return one pull request by number."""
_owner = owner or self.owner
_repo = repo or self.repo
return self._request_sync("GET", f"repos/{_owner}/{_repo}/pulls/{pr_number}")
async def push_commit(
self,
branch: str,
files: list[dict],
message: str,
owner: str | None = None,
repo: str | None = None,
) -> dict:
"""Push files to a branch.
In production, this would use gitea's API or git push.
For now, this remains simulated.
"""
_owner = owner or self.owner
_repo = repo or self.repo
return {
"status": "simulated",
"branch": branch,
"message": message,
"files": files,
"owner": _owner,
"repo": _repo,
}
async def get_repo_info(self, owner: str | None = None, repo: str | None = None) -> dict:
"""Get repository information."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _repo:
return {"error": "Repository name required for org operations"}
return await self._request("GET", f"repos/{_owner}/{_repo}")
def get_repo_info_sync(self, owner: str | None = None, repo: str | None = None) -> dict:
"""Synchronously get repository information."""
_owner = owner or self.owner
_repo = repo or self.repo
if not _repo:
return {"error": "Repository name required for org operations"}
return self._request_sync("GET", f"repos/{_owner}/{_repo}")