voice-ass6/frontend-tg/main.py
2025-07-16 18:32:26 +05:00

116 lines
4.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# admin_bot/main.py
import asyncio
import toml
import json
import nats
from nats.errors import TimeoutError
from loguru import logger
import uuid
from aiogram import Bot, Dispatcher, types
from aiogram.types import Message
from aiogram.enums import ParseMode
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.client.default import DefaultBotProperties
# Загрузка конфигурации
config = toml.load("config.toml")
BOT_TOKEN = config["telegram"]["bot_token"]
ADMIN_ID = config["telegram"]["admin_id"]
NATS_URL = config["nats"]["url"]
PUBLISH_TOPIC = config["nats"]["publish_topic"]
RESPONSE_TOPIC = config["nats"]["response_topic"]
# Инициализация
bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp = Dispatcher(storage=MemoryStorage())
# Подключение к NATS
nats_client = None
pending_requests = {} # Словарь для отслеживания ожидающих ответов запросов
async def response_handler(msg):
"""Обработчик ответов от сервисов"""
try:
data = json.loads(msg.data.decode())
request_id = data.get("request_id")
response_text = data.get("response")
service_name = data.get("service", "unknown")
if request_id in pending_requests:
message_obj = pending_requests.pop(request_id) # Удаляем после обработки
await message_obj.answer(f"🤖 {service_name}: {response_text}")
logger.success(f"Ответ от {service_name} доставлен пользователю {message_obj.from_user.id}")
else:
logger.warning(f"Получен ответ для неизвестного request_id: {request_id}")
except Exception as e:
logger.exception("Ошибка при обработке ответа от сервиса")
async def cleanup_request(request_id: str, timeout: int):
"""Очистка pending_requests после таймаута"""
await asyncio.sleep(timeout)
if request_id in pending_requests:
message_obj = pending_requests.pop(request_id)
await message_obj.answer("⌛ Таймаут ожидания ответа от сервисов.")
logger.warning(f"Таймаут для request_id: {request_id}")
@dp.message()
async def handle_message(message: Message):
logger.info(f"Получено сообщение от {message.from_user.id}: {message.text}")
if message.from_user.id != ADMIN_ID:
logger.warning(f"Пользователь {message.from_user.id} не является админом. Доступ запрещён.")
await message.answer("⛔️ У вас нет прав на использование этого бота.")
return
if nats_client is None:
logger.error("Попытка отправки в NATS до инициализации клиента.")
await message.answer("🚫 NATS клиент не инициализирован.")
return
# Генерируем уникальный ID для запроса
request_id = str(uuid.uuid4())
pending_requests[request_id] = message
payload = {
"request_id": request_id,
"user_id": message.from_user.id,
"text": message.text,
"timestamp": asyncio.get_event_loop().time()
}
try:
# Публикуем сообщение в топик для всех заинтересованных сервисов
await nats_client.publish(PUBLISH_TOPIC, json.dumps(payload).encode())
await message.answer("📤 Сообщение отправлено сервисам для обработки...")
logger.info(f"Сообщение опубликовано в {PUBLISH_TOPIC} с request_id: {request_id}")
# Устанавливаем таймаут для очистки pending_requests
asyncio.create_task(cleanup_request(request_id, 60)) # 60 секунд таймаут
except Exception as e:
logger.exception("Ошибка при публикации сообщения")
await message.answer("❌ Ошибка при отправке сообщения.")
if request_id in pending_requests:
del pending_requests[request_id]
async def main():
global nats_client
logger.info("Подключение к NATS...")
nats_client = await nats.connect(servers=[NATS_URL])
logger.success("Подключение к NATS успешно.")
# Подписка на топик ответов
await nats_client.subscribe(RESPONSE_TOPIC, cb=response_handler)
logger.info(f"Подписан на топик ответов: {RESPONSE_TOPIC}")
logger.info("Запуск polling Telegram-бота...")
await dp.start_polling(bot)
if __name__ == "__main__":
logger.info("Запуск бота...")
asyncio.run(main())