116 lines
4.9 KiB
Python
116 lines
4.9 KiB
Python
# admin_bot/main.py
|
||
import asyncio
|
||
import toml
|
||
import json
|
||
import nats
|
||
from nats.errors import TimeoutError
|
||
from loguru import logger
|
||
import uuid
|
||
from aiogram import Bot, Dispatcher, types
|
||
from aiogram.types import Message
|
||
from aiogram.enums import ParseMode
|
||
from aiogram.fsm.storage.memory import MemoryStorage
|
||
from aiogram.client.default import DefaultBotProperties
|
||
|
||
# Загрузка конфигурации
|
||
config = toml.load("config.toml")
|
||
|
||
BOT_TOKEN = config["telegram"]["bot_token"]
|
||
ADMIN_ID = config["telegram"]["admin_id"]
|
||
NATS_URL = config["nats"]["url"]
|
||
PUBLISH_TOPIC = config["nats"]["publish_topic"]
|
||
RESPONSE_TOPIC = config["nats"]["response_topic"]
|
||
|
||
# Инициализация
|
||
bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
|
||
dp = Dispatcher(storage=MemoryStorage())
|
||
|
||
# Подключение к NATS
|
||
nats_client = None
|
||
pending_requests = {} # Словарь для отслеживания ожидающих ответов запросов
|
||
|
||
|
||
async def response_handler(msg):
|
||
"""Обработчик ответов от сервисов"""
|
||
try:
|
||
data = json.loads(msg.data.decode())
|
||
request_id = data.get("request_id")
|
||
response_text = data.get("response")
|
||
service_name = data.get("service", "unknown")
|
||
|
||
if request_id in pending_requests:
|
||
message_obj = pending_requests.pop(request_id) # Удаляем после обработки
|
||
await message_obj.answer(f"🤖 {service_name}: {response_text}")
|
||
logger.success(f"Ответ от {service_name} доставлен пользователю {message_obj.from_user.id}")
|
||
else:
|
||
logger.warning(f"Получен ответ для неизвестного request_id: {request_id}")
|
||
except Exception as e:
|
||
logger.exception("Ошибка при обработке ответа от сервиса")
|
||
|
||
|
||
async def cleanup_request(request_id: str, timeout: int):
|
||
"""Очистка pending_requests после таймаута"""
|
||
await asyncio.sleep(timeout)
|
||
if request_id in pending_requests:
|
||
message_obj = pending_requests.pop(request_id)
|
||
await message_obj.answer("⌛ Таймаут ожидания ответа от сервисов.")
|
||
logger.warning(f"Таймаут для request_id: {request_id}")
|
||
|
||
|
||
@dp.message()
|
||
async def handle_message(message: Message):
|
||
logger.info(f"Получено сообщение от {message.from_user.id}: {message.text}")
|
||
|
||
if message.from_user.id != ADMIN_ID:
|
||
logger.warning(f"Пользователь {message.from_user.id} не является админом. Доступ запрещён.")
|
||
await message.answer("⛔️ У вас нет прав на использование этого бота.")
|
||
return
|
||
|
||
if nats_client is None:
|
||
logger.error("Попытка отправки в NATS до инициализации клиента.")
|
||
await message.answer("🚫 NATS клиент не инициализирован.")
|
||
return
|
||
|
||
# Генерируем уникальный ID для запроса
|
||
request_id = str(uuid.uuid4())
|
||
pending_requests[request_id] = message
|
||
|
||
payload = {
|
||
"request_id": request_id,
|
||
"user_id": message.from_user.id,
|
||
"text": message.text,
|
||
"timestamp": asyncio.get_event_loop().time()
|
||
}
|
||
|
||
try:
|
||
# Публикуем сообщение в топик для всех заинтересованных сервисов
|
||
await nats_client.publish(PUBLISH_TOPIC, json.dumps(payload).encode())
|
||
await message.answer("📤 Сообщение отправлено сервисам для обработки...")
|
||
logger.info(f"Сообщение опубликовано в {PUBLISH_TOPIC} с request_id: {request_id}")
|
||
|
||
# Устанавливаем таймаут для очистки pending_requests
|
||
asyncio.create_task(cleanup_request(request_id, 60)) # 60 секунд таймаут
|
||
|
||
except Exception as e:
|
||
logger.exception("Ошибка при публикации сообщения")
|
||
await message.answer("❌ Ошибка при отправке сообщения.")
|
||
if request_id in pending_requests:
|
||
del pending_requests[request_id]
|
||
|
||
|
||
async def main():
|
||
global nats_client
|
||
logger.info("Подключение к NATS...")
|
||
nats_client = await nats.connect(servers=[NATS_URL])
|
||
logger.success("Подключение к NATS успешно.")
|
||
|
||
# Подписка на топик ответов
|
||
await nats_client.subscribe(RESPONSE_TOPIC, cb=response_handler)
|
||
logger.info(f"Подписан на топик ответов: {RESPONSE_TOPIC}")
|
||
|
||
logger.info("Запуск polling Telegram-бота...")
|
||
await dp.start_polling(bot)
|
||
|
||
if __name__ == "__main__":
|
||
logger.info("Запуск бота...")
|
||
asyncio.run(main())
|