# Generated from Templates/Docker_Image
# 237 lines, 8.8 KiB, Python
"""n8n setup agent for automatic webhook configuration."""
|
|
|
|
import json
|
|
from typing import Optional
|
|
from config import settings
|
|
|
|
|
|
class N8NSetupAgent:
    """Configure n8n webhooks and workflows over n8n's HTTP API.

    Every network method opens its own short-lived ``aiohttp`` session and
    reports failures as an ``{"error": ...}`` dict (``list_workflows`` returns
    an empty list instead) rather than raising.

    NOTE(review): ``health_check`` targets ``/api/v1/workflow`` while the
    other methods target ``/workflow`` directly under ``api_url``; n8n's
    public API documents plural ``/api/v1/workflows`` routes. The paths are
    kept as they were -- confirm them against the deployed n8n instance.
    """

    # Name of the workflow that ``setup`` looks up and, if absent, creates.
    TELEGRAM_WORKFLOW_NAME = "Telegram to AI Software Factory"

    def __init__(self, api_url: str, webhook_token: str):
        """Initialize n8n setup agent.

        Args:
            api_url: n8n API URL (e.g., http://n8n.yourserver.com). Trailing
                slashes are stripped so paths can be appended with "/".
            webhook_token: n8n token for API access. It is attached to every
                request by ``get_auth_headers`` instead of Basic Auth.
        """
        self.api_url = api_url.rstrip("/")
        self.webhook_token = webhook_token
        # Kept for backward compatibility; the methods below create their
        # own sessions and never assign this attribute.
        self.session = None

    def get_auth_headers(self) -> dict:
        """Return the headers sent with every n8n API request.

        The token is included as ``X-N8N-API-KEY`` (the header n8n's public
        API documents for API-key authentication). It is omitted when no
        token was configured so an empty credential is never sent.
        """
        headers = {
            "n8n-no-credentials": "true",
            "Content-Type": "application/json",
            "User-Agent": "AI-Software-Factory",
        }
        if self.webhook_token:
            headers["X-N8N-API-KEY"] = self.webhook_token
        return headers

    async def get_workflow(self, workflow_name: str) -> Optional[dict]:
        """Get a workflow by name.

        Returns:
            The decoded JSON body on HTTP 200, ``None`` on HTTP 404, and an
            ``{"error": ...}`` dict for any other status or any exception.
        """
        import aiohttp
        from urllib.parse import quote

        try:
            # Percent-encode the name so spaces, "/", "?" or "#" in it cannot
            # change the shape of the request path.
            encoded_name = quote(workflow_name, safe="")
            url = f"{self.api_url}/workflow/{encoded_name}.json"
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=self.get_auth_headers()) as resp:
                    if resp.status == 200:
                        return await resp.json()
                    if resp.status == 404:
                        return None
                    return {"error": f"Status {resp.status}"}
        except Exception as e:
            # Boundary of a best-effort setup routine: report, don't raise.
            return {"error": str(e)}

    async def create_workflow(self, workflow_json: dict) -> dict:
        """Create or update a workflow by POSTing its JSON definition.

        Returns:
            The decoded JSON body on HTTP 200/201, otherwise an
            ``{"error": ...}`` dict carrying the status and response text
            (or the exception message).
        """
        import aiohttp

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.api_url}/workflow",
                    headers=self.get_auth_headers(),
                    json=workflow_json,
                ) as resp:
                    if resp.status in (200, 201):
                        return await resp.json()
                    return {"error": f"Status {resp.status}: {await resp.text()}"}
        except Exception as e:
            return {"error": str(e)}

    async def enable_workflow(self, workflow_id: str) -> dict:
        """Enable a workflow.

        Returns:
            ``{"success": True, "id": workflow_id}`` on HTTP 200/201,
            otherwise an ``{"error": ...}`` dict.
        """
        import aiohttp
        from urllib.parse import quote

        try:
            # str() tolerates numeric ids; quoting keeps the id a single
            # path segment.
            encoded_id = quote(str(workflow_id), safe="")
            url = f"{self.api_url}/workflow/{encoded_id}/toggle"
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url,
                    headers=self.get_auth_headers(),
                    json={"state": True},
                ) as resp:
                    if resp.status in (200, 201):
                        return {"success": True, "id": workflow_id}
                    return {"error": f"Status {resp.status}: {await resp.text()}"}
        except Exception as e:
            return {"error": str(e)}

    async def list_workflows(self) -> list:
        """List all workflows.

        Returns:
            The decoded JSON body on HTTP 200; an empty list on any other
            status or on any exception. Failures are logged so that an empty
            result can be told apart from a failed request.
        """
        import aiohttp
        import logging

        log = logging.getLogger(__name__)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.api_url}/workflow",
                    headers=self.get_auth_headers(),
                ) as resp:
                    if resp.status == 200:
                        return await resp.json()
                    log.warning("Listing n8n workflows failed: status %s", resp.status)
                    return []
        except Exception:
            # Still best-effort for callers, but no longer silent.
            log.exception("Listing n8n workflows failed")
            return []

    @staticmethod
    def _build_telegram_workflow(webhook_path: str) -> dict:
        """Build the workflow definition posted by ``setup_telegram_workflow``."""
        # NOTE(review): the nodes carry no "type"/"typeVersion"/"position"
        # and the expressions use lowercase `json.stringify`/`json.parse`;
        # confirm this payload against the n8n workflow schema.
        webhook_node = {
            "parameters": {
                "httpMethod": "post",
                "responseMode": "response",
                # An empty path falls back to "telegram".
                "path": webhook_path or "telegram",
                "httpBody": "={{ json.stringify($json) }}",
                "httpAuthType": "headerParam",
                "headerParams": {
                    "x-n8n-internal": "true",
                    "content-type": "application/json",
                },
            },
            "id": "webhook-node",
            "name": "Telegram Webhook",
        }
        parse_node = {
            "parameters": {
                "operation": "editFields",
                "fields": "json",
                "editFieldsValue": "={{ json.parse($json.text) }}",
                "options": {},
            },
            "id": "parse-node",
            "name": "Parse Message",
        }
        api_node = {
            "parameters": {
                "url": "http://localhost:8000/generate",
                "method": "post",
                "sendBody": True,
                "responseMode": "onReceived",
                "ignoreSSL": True,
                "retResponse": True,
                "sendQueryParams": False,
            },
            "id": "api-node",
            "name": "AI Software Factory API",
        }
        response_node = {
            "parameters": {
                "operation": "editResponse",
                "editResponseValue": "={{ $json }}",
            },
            "id": "response-node",
            "name": "Response Builder",
        }
        return {
            "name": "Telegram to AI Software Factory",
            "nodes": [webhook_node, parse_node, api_node, response_node],
            "connections": {
                "Telegram Webhook": {"webhook": ["parse"]},
                "Parse Message": {"API Call": ["POST"]},
                "Response Builder": {"respondToWebhook": ["response"]},
            },
            "settings": {"executionOrder": "v1"},
        }

    async def setup_telegram_workflow(self, webhook_path: str) -> dict:
        """Setup the Telegram webhook workflow in n8n.

        Args:
            webhook_path: The webhook path (e.g., /webhook/telegram). An
                empty value falls back to "telegram".

        Returns:
            The ``create_workflow`` result. When it carries an ``id`` the
            workflow is also enabled and the ``enable_workflow`` result is
            merged into the returned dict.
        """
        result = await self.create_workflow(self._build_telegram_workflow(webhook_path))

        # Only enable when there is an id to enable; a response without one
        # would otherwise produce a request for an empty workflow id.
        workflow_id = result.get("id")
        if workflow_id:
            result.update(await self.enable_workflow(workflow_id))

        return result

    async def health_check(self) -> dict:
        """Check n8n API health.

        Returns:
            ``{"status": "ok"}`` on HTTP 200, otherwise an
            ``{"error": ...}`` dict.
        """
        import aiohttp

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.api_url}/api/v1/workflow",
                    headers=self.get_auth_headers(),
                ) as resp:
                    if resp.status == 200:
                        return {"status": "ok"}
                    return {"error": f"Status {resp.status}"}
        except Exception as e:
            return {"error": str(e)}

    async def setup(self) -> dict:
        """Setup n8n webhooks automatically.

        Verifies that n8n is reachable, then enables the existing Telegram
        workflow if one is found, or creates and enables a new one.

        Returns:
            ``{"status": "error", "message": ...}`` when the health check
            fails or the existing workflow has no id; otherwise the result
            of ``enable_workflow`` or ``setup_telegram_workflow``.
        """
        health = await self.health_check()
        if health.get("error"):
            return {"status": "error", "message": health.get("error")}

        existing = await self.get_workflow(self.TELEGRAM_WORKFLOW_NAME)
        if existing and not existing.get("error"):
            workflow_id = existing.get("id")
            if not workflow_id:
                # Without an id there is nothing to enable; say so instead
                # of sending a request for an empty workflow id.
                return {
                    "status": "error",
                    "message": "Existing workflow response has no id",
                }
            return await self.enable_workflow(workflow_id)

        # No usable existing workflow: create a new one.
        return await self.setup_telegram_workflow("/webhook/telegram")
|