307 lines
11 KiB
Python
307 lines
11 KiB
Python
import asyncio
|
||
|
||
from typing import Any, Callable, Dict, List
|
||
|
||
from fastapi import FastAPI, Request
|
||
from fastapi.responses import JSONResponse
|
||
from magic_filter import MagicFilter
|
||
from uvicorn import Config, Server
|
||
from aiohttp import ClientConnectorError
|
||
|
||
from .filters.middleware import BaseMiddleware
|
||
from .filters.handler import Handler
|
||
|
||
from .context import MemoryContext
|
||
from .types.updates import UpdateUnion
|
||
from .types.errors import Error
|
||
|
||
from .methods.types.getted_updates import process_update_webhook, process_update_request
|
||
|
||
from .filters import filter_attrs
|
||
|
||
from .bot import Bot
|
||
from .enums.update import UpdateType
|
||
from .loggers import logger_dp
|
||
|
||
|
||
webhook_app = FastAPI()
|
||
CONNECTION_RETRY_DELAY = 30
|
||
|
||
|
||
class Dispatcher:
|
||
|
||
"""Основной класс для обработки событий бота.
|
||
|
||
Обеспечивает работу с вебхуком и поллингом, управляет обработчиками событий.
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.event_handlers: List[Handler] = []
|
||
self.contexts: List[MemoryContext] = []
|
||
self.routers: List[Router] = []
|
||
self.filters: List[MagicFilter] = []
|
||
self.middlewares: List[BaseMiddleware] = []
|
||
|
||
self.bot: Bot = None
|
||
self.on_started_func: Callable = None
|
||
|
||
self.message_created = Event(update_type=UpdateType.MESSAGE_CREATED, router=self)
|
||
self.bot_added = Event(update_type=UpdateType.BOT_ADDED, router=self)
|
||
self.bot_removed = Event(update_type=UpdateType.BOT_REMOVED, router=self)
|
||
self.bot_started = Event(update_type=UpdateType.BOT_STARTED, router=self)
|
||
self.chat_title_changed = Event(update_type=UpdateType.CHAT_TITLE_CHANGED, router=self)
|
||
self.message_callback = Event(update_type=UpdateType.MESSAGE_CALLBACK, router=self)
|
||
self.message_chat_created = Event(update_type=UpdateType.MESSAGE_CHAT_CREATED, router=self)
|
||
self.message_edited = Event(update_type=UpdateType.MESSAGE_EDITED, router=self)
|
||
self.message_removed = Event(update_type=UpdateType.MESSAGE_REMOVED, router=self)
|
||
self.user_added = Event(update_type=UpdateType.USER_ADDED, router=self)
|
||
self.user_removed = Event(update_type=UpdateType.USER_REMOVED, router=self)
|
||
self.on_started = Event(update_type=UpdateType.ON_STARTED, router=self)
|
||
|
||
async def check_me(self):
|
||
|
||
"""Проверяет и логирует информацию о боте."""
|
||
|
||
me = await self.bot.get_me()
|
||
logger_dp.info(f'Бот: @{me.username} id={me.user_id}')
|
||
|
||
def include_routers(self, *routers: 'Router'):
|
||
|
||
"""Добавляет обработчики из роутеров.
|
||
|
||
Args:
|
||
*routers: Роутеры для включения
|
||
"""
|
||
|
||
for router in routers:
|
||
self.routers.append(router)
|
||
|
||
async def __ready(self, bot: Bot):
|
||
self.bot = bot
|
||
await self.check_me()
|
||
|
||
self.routers += [self]
|
||
|
||
handlers_count = 0
|
||
for router in self.routers:
|
||
for _ in router.event_handlers:
|
||
handlers_count += 1
|
||
|
||
logger_dp.info(f'{handlers_count} событий на обработку')
|
||
|
||
if self.on_started_func:
|
||
await self.on_started_func()
|
||
|
||
def __get_memory_context(self, chat_id: int, user_id: int):
|
||
|
||
"""Возвращает или создает контекст для чата и пользователя.
|
||
|
||
Args:
|
||
chat_id: ID чата
|
||
user_id: ID пользователя
|
||
|
||
Returns:
|
||
Существующий или новый контекст
|
||
"""
|
||
|
||
for ctx in self.contexts:
|
||
if ctx.chat_id == chat_id and ctx.user_id == user_id:
|
||
return ctx
|
||
|
||
new_ctx = MemoryContext(chat_id, user_id)
|
||
self.contexts.append(new_ctx)
|
||
return new_ctx
|
||
|
||
async def process_middlewares(
|
||
self,
|
||
middlewares: List[BaseMiddleware],
|
||
event_object: UpdateUnion,
|
||
result_data_kwargs: Dict[str, Any]
|
||
):
|
||
|
||
for middleware in middlewares:
|
||
result = await middleware.process_middleware(
|
||
event_object=event_object,
|
||
result_data_kwargs=result_data_kwargs
|
||
)
|
||
|
||
if result == None or result == False:
|
||
return
|
||
|
||
elif result == True:
|
||
result = {}
|
||
|
||
for key, value in result.items():
|
||
result_data_kwargs[key] = value
|
||
|
||
return result_data_kwargs
|
||
|
||
async def handle(self, event_object: UpdateUnion):
|
||
|
||
"""Обрабатывает событие.
|
||
|
||
Args:
|
||
event_object: Объект события для обработки
|
||
"""
|
||
try:
|
||
ids = event_object.get_ids()
|
||
memory_context = self.__get_memory_context(*ids)
|
||
kwargs = {'context': memory_context}
|
||
|
||
is_handled = False
|
||
|
||
for router in self.routers:
|
||
|
||
if is_handled:
|
||
break
|
||
|
||
if router.filters:
|
||
if not filter_attrs(event_object, *router.filters):
|
||
continue
|
||
|
||
kwargs = await self.process_middlewares(
|
||
middlewares=router.middlewares,
|
||
event_object=event_object,
|
||
result_data_kwargs=kwargs
|
||
)
|
||
|
||
for handler in router.event_handlers:
|
||
|
||
if not handler.update_type == event_object.update_type:
|
||
continue
|
||
|
||
if handler.filters:
|
||
if not filter_attrs(event_object, *handler.filters):
|
||
continue
|
||
|
||
if not handler.state == await memory_context.get_state() \
|
||
and handler.state:
|
||
continue
|
||
|
||
func_args = handler.func_event.__annotations__.keys()
|
||
|
||
kwargs = await self.process_middlewares(
|
||
middlewares=handler.middlewares,
|
||
event_object=event_object,
|
||
result_data_kwargs=kwargs
|
||
)
|
||
|
||
if not kwargs:
|
||
continue
|
||
|
||
for key in kwargs.copy().keys():
|
||
if not key in func_args:
|
||
del kwargs[key]
|
||
|
||
await handler.func_event(event_object, **kwargs)
|
||
|
||
logger_dp.info(f'Обработано: {event_object.update_type} | chat_id: {ids[0]}, user_id: {ids[1]}')
|
||
|
||
is_handled = True
|
||
break
|
||
|
||
if not is_handled:
|
||
logger_dp.info(f'Проигнорировано: {event_object.update_type} | chat_id: {ids[0]}, user_id: {ids[1]}')
|
||
|
||
except Exception as e:
|
||
logger_dp.error(f"Ошибка при обработке события: {event_object.update_type} | chat_id: {ids[0]}, user_id: {ids[1]} | {e} ")
|
||
|
||
async def start_polling(self, bot: Bot):
|
||
|
||
"""Запускает поллинг обновлений.
|
||
|
||
Args:
|
||
bot: Экземпляр бота
|
||
"""
|
||
await self.__ready(bot)
|
||
|
||
while True:
|
||
try:
|
||
events = await self.bot.get_updates()
|
||
|
||
if isinstance(events, Error):
|
||
logger_dp.info(f'Ошибка при получении обновлений: {events}')
|
||
continue
|
||
|
||
self.bot.marker_updates = events.get('marker')
|
||
|
||
processed_events = await process_update_request(
|
||
events=events,
|
||
bot=self.bot
|
||
)
|
||
|
||
for event in processed_events:
|
||
await self.handle(event)
|
||
except ClientConnectorError:
|
||
logger_dp.error(f'Ошибка подключения, жду {CONNECTION_RETRY_DELAY} секунд')
|
||
await asyncio.sleep(CONNECTION_RETRY_DELAY)
|
||
except Exception as e:
|
||
logger_dp.error(f'Общая ошибка при обработке событий: {e}')
|
||
|
||
async def handle_webhook(self, bot: Bot, host: str = '0.0.0.0', port: int = 8080):
|
||
|
||
"""Запускает вебхук сервер.
|
||
|
||
Args:
|
||
bot: Экземпляр бота
|
||
host: Хост для сервера
|
||
port: Порт для сервера
|
||
"""
|
||
|
||
await self.__ready(bot)
|
||
|
||
@webhook_app.post('/')
|
||
async def _(request: Request):
|
||
try:
|
||
event_json = await request.json()
|
||
|
||
event_object = await process_update_webhook(
|
||
event_json=event_json,
|
||
bot=self.bot
|
||
)
|
||
|
||
await self.handle(event_object)
|
||
|
||
return JSONResponse(content={'ok': True}, status_code=200)
|
||
except Exception as e:
|
||
logger_dp.error(f"Ошибка при обработке события: {event_json['update_type']}: {e}")
|
||
|
||
config = Config(app=webhook_app, host=host, port=port, log_level="critical")
|
||
server = Server(config)
|
||
|
||
await server.serve()
|
||
|
||
|
||
class Router(Dispatcher):
|
||
|
||
"""Роутер для группировки обработчиков событий."""
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
|
||
|
||
class Event:
|
||
|
||
"""Декоратор для регистрации обработчиков событий."""
|
||
|
||
def __init__(self, update_type: UpdateType, router: Dispatcher | Router):
|
||
self.update_type = update_type
|
||
self.router = router
|
||
|
||
def __call__(self, *args, **kwargs):
|
||
def decorator(func_event: Callable):
|
||
|
||
if self.update_type == UpdateType.ON_STARTED:
|
||
self.router.on_started_func = func_event
|
||
|
||
else:
|
||
self.router.event_handlers.append(
|
||
Handler(
|
||
func_event=func_event,
|
||
update_type=self.update_type,
|
||
*args, **kwargs
|
||
)
|
||
)
|
||
return func_event
|
||
|
||
return decorator |