# admin_bot/main.py import asyncio import toml import json import nats from nats.errors import TimeoutError from loguru import logger import uuid from aiogram import Bot, Dispatcher, types from aiogram.types import Message from aiogram.enums import ParseMode from aiogram.fsm.storage.memory import MemoryStorage from aiogram.client.default import DefaultBotProperties # Загрузка конфигурации config = toml.load("config.toml") BOT_TOKEN = config["telegram"]["bot_token"] ADMIN_ID = config["telegram"]["admin_id"] NATS_URL = config["nats"]["url"] PUBLISH_TOPIC = config["nats"]["publish_topic"] RESPONSE_TOPIC = config["nats"]["response_topic"] # Инициализация bot = Bot(token=BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) dp = Dispatcher(storage=MemoryStorage()) # Подключение к NATS nats_client = None pending_requests = {} # Словарь для отслеживания ожидающих ответов запросов async def response_handler(msg): """Обработчик ответов от сервисов""" try: data = json.loads(msg.data.decode()) request_id = data.get("request_id") response_text = data.get("response") service_name = data.get("service", "unknown") if request_id in pending_requests: message_obj = pending_requests.pop(request_id) # Удаляем после обработки await message_obj.answer(f"🤖 {service_name}: {response_text}") logger.success(f"Ответ от {service_name} доставлен пользователю {message_obj.from_user.id}") else: logger.warning(f"Получен ответ для неизвестного request_id: {request_id}") except Exception as e: logger.exception("Ошибка при обработке ответа от сервиса") async def cleanup_request(request_id: str, timeout: int): """Очистка pending_requests после таймаута""" await asyncio.sleep(timeout) if request_id in pending_requests: message_obj = pending_requests.pop(request_id) await message_obj.answer("⌛ Таймаут ожидания ответа от сервисов.") logger.warning(f"Таймаут для request_id: {request_id}") @dp.message() async def handle_message(message: Message): logger.info(f"Получено сообщение от {message.from_user.id}: {message.text}") if message.from_user.id != ADMIN_ID: logger.warning(f"Пользователь {message.from_user.id} не является админом. Доступ запрещён.") await message.answer("⛔️ У вас нет прав на использование этого бота.") return if nats_client is None: logger.error("Попытка отправки в NATS до инициализации клиента.") await message.answer("🚫 NATS клиент не инициализирован.") return # Генерируем уникальный ID для запроса request_id = str(uuid.uuid4()) pending_requests[request_id] = message payload = { "request_id": request_id, "user_id": message.from_user.id, "text": message.text, "timestamp": asyncio.get_event_loop().time() } try: # Публикуем сообщение в топик для всех заинтересованных сервисов await nats_client.publish(PUBLISH_TOPIC, json.dumps(payload).encode()) await message.answer("📤 Сообщение отправлено сервисам для обработки...") logger.info(f"Сообщение опубликовано в {PUBLISH_TOPIC} с request_id: {request_id}") # Устанавливаем таймаут для очистки pending_requests asyncio.create_task(cleanup_request(request_id, 60)) # 60 секунд таймаут except Exception as e: logger.exception("Ошибка при публикации сообщения") await message.answer("❌ Ошибка при отправке сообщения.") if request_id in pending_requests: del pending_requests[request_id] async def main(): global nats_client logger.info("Подключение к NATS...") nats_client = await nats.connect(servers=[NATS_URL]) logger.success("Подключение к NATS успешно.") # Подписка на топик ответов await nats_client.subscribe(RESPONSE_TOPIC, cb=response_handler) logger.info(f"Подписан на топик ответов: {RESPONSE_TOPIC}") logger.info("Запуск polling Telegram-бота...") await dp.start_polling(bot) if __name__ == "__main__": logger.info("Запуск бота...") asyncio.run(main())