"""n8n setup agent for automatic webhook configuration.""" import json from typing import Optional from ai_software_factory.config import settings class N8NSetupAgent: """Automatically configures n8n webhooks and workflows using API token authentication.""" def __init__(self, api_url: str, webhook_token: str): """Initialize n8n setup agent. Args: api_url: n8n API URL (e.g., http://n8n.yourserver.com) webhook_token: n8n webhook token for API access (more secure than username/password) Note: Set the webhook token in n8n via Settings > Credentials > Webhook This token is used for all API requests instead of Basic Auth """ self.api_url = api_url.rstrip("/") self.webhook_token = webhook_token self.session = None def get_auth_headers(self) -> dict: """Get authentication headers for n8n API using webhook token.""" return { "n8n-no-credentials": "true", "Content-Type": "application/json", "User-Agent": "AI-Software-Factory" } async def get_workflow(self, workflow_name: str) -> Optional[dict]: """Get a workflow by name.""" import aiohttp try: async with aiohttp.ClientSession() as session: # Use the webhook URL directly for workflow operations # n8n supports calling workflows via /webhook/ path with query params # For API token auth, n8n checks the token against webhook credentials headers = self.get_auth_headers() # Try standard workflow endpoint first (for API token setup) async with session.get( f"{self.api_url}/workflow/{workflow_name}.json", headers=headers ) as resp: if resp.status == 200: return await resp.json() elif resp.status == 404: return None else: return {"error": f"Status {resp.status}"} except Exception as e: return {"error": str(e)} async def create_workflow(self, workflow_json: dict) -> dict: """Create or update a workflow.""" import aiohttp try: async with aiohttp.ClientSession() as session: # Use POST to create/update workflow headers = self.get_auth_headers() async with session.post( f"{self.api_url}/workflow", headers=headers, json=workflow_json ) as resp: if resp.status == 200 or resp.status == 201: return await resp.json() else: return {"error": f"Status {resp.status}: {await resp.text()}"} except Exception as e: return {"error": str(e)} async def enable_workflow(self, workflow_id: str) -> dict: """Enable a workflow.""" import aiohttp try: async with aiohttp.ClientSession() as session: headers = self.get_auth_headers() async with session.post( f"{self.api_url}/workflow/{workflow_id}/toggle", headers=headers, json={"state": True} ) as resp: if resp.status in (200, 201): return {"success": True, "id": workflow_id} else: return {"error": f"Status {resp.status}: {await resp.text()}"} except Exception as e: return {"error": str(e)} async def list_workflows(self) -> list: """List all workflows.""" import aiohttp try: async with aiohttp.ClientSession() as session: headers = self.get_auth_headers() async with session.get( f"{self.api_url}/workflow", headers=headers ) as resp: if resp.status == 200: return await resp.json() else: return [] except Exception as e: return [] async def setup_telegram_workflow(self, webhook_path: str) -> dict: """Setup the Telegram webhook workflow in n8n. Args: webhook_path: The webhook path (e.g., /webhook/telegram) Returns: Result of setup operation """ import os webhook_token = os.getenv("TELEGRAM_BOT_TOKEN", "") # Define the workflow using n8n's Telegram trigger workflow = { "name": "Telegram to AI Software Factory", "nodes": [ { "parameters": { "httpMethod": "post", "responseMode": "response", "path": webhook_path or "telegram", "httpBody": "={{ json.stringify($json) }}", "httpAuthType": "headerParam", "headerParams": { "x-n8n-internal": "true", "content-type": "application/json" } }, "id": "webhook-node", "name": "Telegram Webhook" }, { "parameters": { "operation": "editFields", "fields": "json", "editFieldsValue": "={{ json.parse($json.text) }}", "options": {} }, "id": "parse-node", "name": "Parse Message" }, { "parameters": { "url": "http://localhost:8000/generate", "method": "post", "sendBody": True, "responseMode": "onReceived", "ignoreSSL": True, "retResponse": True, "sendQueryParams": False }, "id": "api-node", "name": "AI Software Factory API" }, { "parameters": { "operation": "editResponse", "editResponseValue": "={{ $json }}" }, "id": "response-node", "name": "Response Builder" } ], "connections": { "Telegram Webhook": { "webhook": ["parse"] }, "Parse Message": { "API Call": ["POST"] }, "Response Builder": { "respondToWebhook": ["response"] } }, "settings": { "executionOrder": "v1" } } # Create the workflow result = await self.create_workflow(workflow) if result.get("success") or result.get("id"): # Try to enable the workflow enable_result = await self.enable_workflow(result.get("id", "")) result.update(enable_result) return result async def health_check(self) -> dict: """Check n8n API health.""" import aiohttp try: async with aiohttp.ClientSession() as session: headers = self.get_auth_headers() async with session.get( f"{self.api_url}/api/v1/workflow", headers=headers ) as resp: if resp.status == 200: return {"status": "ok"} else: return {"error": f"Status {resp.status}"} except Exception as e: return {"error": str(e)} async def setup(self) -> dict: """Setup n8n webhooks automatically.""" # First, verify n8n is accessible health = await self.health_check() if health.get("error"): return {"status": "error", "message": health.get("error")} # Try to get existing telegram workflow existing = await self.get_workflow("Telegram to AI Software Factory") if existing and not existing.get("error"): # Enable existing workflow return await self.enable_workflow(existing.get("id", "")) # Create new workflow result = await self.setup_telegram_workflow("/webhook/telegram") return result