Files

237 lines
8.8 KiB
Python

"""n8n setup agent for automatic webhook configuration."""
import json
from typing import Optional
from config import settings
class N8NSetupAgent:
"""Automatically configures n8n webhooks and workflows using API token authentication."""
def __init__(self, api_url: str, webhook_token: str):
"""Initialize n8n setup agent.
Args:
api_url: n8n API URL (e.g., http://n8n.yourserver.com)
webhook_token: n8n webhook token for API access (more secure than username/password)
Note: Set the webhook token in n8n via Settings > Credentials > Webhook
This token is used for all API requests instead of Basic Auth
"""
self.api_url = api_url.rstrip("/")
self.webhook_token = webhook_token
self.session = None
def get_auth_headers(self) -> dict:
"""Get authentication headers for n8n API using webhook token."""
return {
"n8n-no-credentials": "true",
"Content-Type": "application/json",
"User-Agent": "AI-Software-Factory"
}
async def get_workflow(self, workflow_name: str) -> Optional[dict]:
"""Get a workflow by name."""
import aiohttp
try:
async with aiohttp.ClientSession() as session:
# Use the webhook URL directly for workflow operations
# n8n supports calling workflows via /webhook/ path with query params
# For API token auth, n8n checks the token against webhook credentials
headers = self.get_auth_headers()
# Try standard workflow endpoint first (for API token setup)
async with session.get(
f"{self.api_url}/workflow/{workflow_name}.json",
headers=headers
) as resp:
if resp.status == 200:
return await resp.json()
elif resp.status == 404:
return None
else:
return {"error": f"Status {resp.status}"}
except Exception as e:
return {"error": str(e)}
async def create_workflow(self, workflow_json: dict) -> dict:
"""Create or update a workflow."""
import aiohttp
try:
async with aiohttp.ClientSession() as session:
# Use POST to create/update workflow
headers = self.get_auth_headers()
async with session.post(
f"{self.api_url}/workflow",
headers=headers,
json=workflow_json
) as resp:
if resp.status == 200 or resp.status == 201:
return await resp.json()
else:
return {"error": f"Status {resp.status}: {await resp.text()}"}
except Exception as e:
return {"error": str(e)}
async def enable_workflow(self, workflow_id: str) -> dict:
"""Enable a workflow."""
import aiohttp
try:
async with aiohttp.ClientSession() as session:
headers = self.get_auth_headers()
async with session.post(
f"{self.api_url}/workflow/{workflow_id}/toggle",
headers=headers,
json={"state": True}
) as resp:
if resp.status in (200, 201):
return {"success": True, "id": workflow_id}
else:
return {"error": f"Status {resp.status}: {await resp.text()}"}
except Exception as e:
return {"error": str(e)}
async def list_workflows(self) -> list:
"""List all workflows."""
import aiohttp
try:
async with aiohttp.ClientSession() as session:
headers = self.get_auth_headers()
async with session.get(
f"{self.api_url}/workflow",
headers=headers
) as resp:
if resp.status == 200:
return await resp.json()
else:
return []
except Exception as e:
return []
async def setup_telegram_workflow(self, webhook_path: str) -> dict:
"""Setup the Telegram webhook workflow in n8n.
Args:
webhook_path: The webhook path (e.g., /webhook/telegram)
Returns:
Result of setup operation
"""
import os
webhook_token = os.getenv("TELEGRAM_BOT_TOKEN", "")
# Define the workflow using n8n's Telegram trigger
workflow = {
"name": "Telegram to AI Software Factory",
"nodes": [
{
"parameters": {
"httpMethod": "post",
"responseMode": "response",
"path": webhook_path or "telegram",
"httpBody": "={{ json.stringify($json) }}",
"httpAuthType": "headerParam",
"headerParams": {
"x-n8n-internal": "true",
"content-type": "application/json"
}
},
"id": "webhook-node",
"name": "Telegram Webhook"
},
{
"parameters": {
"operation": "editFields",
"fields": "json",
"editFieldsValue": "={{ json.parse($json.text) }}",
"options": {}
},
"id": "parse-node",
"name": "Parse Message"
},
{
"parameters": {
"url": "http://localhost:8000/generate",
"method": "post",
"sendBody": True,
"responseMode": "onReceived",
"ignoreSSL": True,
"retResponse": True,
"sendQueryParams": False
},
"id": "api-node",
"name": "AI Software Factory API"
},
{
"parameters": {
"operation": "editResponse",
"editResponseValue": "={{ $json }}"
},
"id": "response-node",
"name": "Response Builder"
}
],
"connections": {
"Telegram Webhook": {
"webhook": ["parse"]
},
"Parse Message": {
"API Call": ["POST"]
},
"Response Builder": {
"respondToWebhook": ["response"]
}
},
"settings": {
"executionOrder": "v1"
}
}
# Create the workflow
result = await self.create_workflow(workflow)
if result.get("success") or result.get("id"):
# Try to enable the workflow
enable_result = await self.enable_workflow(result.get("id", ""))
result.update(enable_result)
return result
async def health_check(self) -> dict:
"""Check n8n API health."""
import aiohttp
try:
async with aiohttp.ClientSession() as session:
headers = self.get_auth_headers()
async with session.get(
f"{self.api_url}/api/v1/workflow",
headers=headers
) as resp:
if resp.status == 200:
return {"status": "ok"}
else:
return {"error": f"Status {resp.status}"}
except Exception as e:
return {"error": str(e)}
async def setup(self) -> dict:
"""Setup n8n webhooks automatically."""
# First, verify n8n is accessible
health = await self.health_check()
if health.get("error"):
return {"status": "error", "message": health.get("error")}
# Try to get existing telegram workflow
existing = await self.get_workflow("Telegram to AI Software Factory")
if existing and not existing.get("error"):
# Enable existing workflow
return await self.enable_workflow(existing.get("id", ""))
# Create new workflow
result = await self.setup_telegram_workflow("/webhook/telegram")
return result