fix: add Telegram helper functions, refs NOISSUE

This commit is contained in:
2026-04-10 21:47:50 +02:00
parent 321bf74aef
commit 80d7716e65
2 changed files with 119 additions and 15 deletions

View File

@@ -1,8 +1,6 @@
"""Telegram bot integration for n8n webhook."""
import asyncio
import json
import re
from typing import Optional
@@ -12,6 +10,62 @@ class TelegramHandler:
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
self.api_url = "https://api.telegram.org/bot"
def build_prompt_guide_message(self, backend_url: str | None = None) -> str:
"""Build a Telegram message explaining the expected prompt format."""
lines = [
"AI Software Factory is listening in this chat.",
"",
"Send prompts in this format:",
"Name: Inventory Portal",
"Description: Internal tool for stock management and purchase tracking",
"Features:",
"- role-based login",
"- inventory dashboard",
"- purchase order workflow",
"Tech Stack:",
"- fastapi",
"- postgresql",
"- nicegui",
]
if backend_url:
lines.extend(["", f"Backend target: {backend_url}"])
return "\n".join(lines)
async def send_message(self, bot_token: str, chat_id: str | int, text: str) -> dict:
"""Send a direct Telegram message using the configured bot."""
if not bot_token:
return {"status": "error", "message": "Telegram bot token is not configured"}
if chat_id in (None, ""):
return {"status": "error", "message": "Telegram chat id is not configured"}
api_endpoint = f"{self.api_url}{bot_token}/sendMessage"
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(
api_endpoint,
json={
"chat_id": str(chat_id),
"text": text,
},
) as resp:
payload = await resp.json()
if 200 <= resp.status < 300 and payload.get("ok"):
return {
"status": "success",
"message": "Telegram prompt guide sent successfully",
"payload": payload,
}
description = payload.get("description") or payload.get("message") or str(payload)
return {
"status": "error",
"message": f"Telegram API returned {resp.status}: {description}",
"payload": payload,
}
except Exception as exc:
return {"status": "error", "message": str(exc)}
async def handle_message(self, message_data: dict) -> dict:
"""Handle incoming Telegram message."""