From 02b4e2d39af6a6dd8e0e0be7bf8a42a270bc2ba8 Mon Sep 17 00:00:00 2001 From: Denis Date: Fri, 25 Jul 2025 00:52:38 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A1=D0=BE=D0=B7=D0=B4=D0=B0=D0=BD=20=D0=BE?= =?UTF-8?q?=D1=82=D0=B4=D0=B5=D0=BB=D1=8C=D0=BD=D1=8B=D0=B9=20=D1=84=D0=B0?= =?UTF-8?q?=D0=B9=D0=BB=20=D0=B4=D0=BB=D1=8F=20MemoryContext?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- maxapi/context/context.py | 93 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 maxapi/context/context.py diff --git a/maxapi/context/context.py b/maxapi/context/context.py new file mode 100644 index 0000000..ef8b6e1 --- /dev/null +++ b/maxapi/context/context.py @@ -0,0 +1,93 @@ +import asyncio + +from typing import Any, Dict, Optional, Union + +from ..context.state_machine import State + + +class MemoryContext: + + """ + Контекст хранения данных пользователя с блокировками. + + Args: + chat_id (int): Идентификатор чата + user_id (int): Идентификатор пользователя + """ + + def __init__(self, chat_id: int, user_id: int): + self.chat_id = chat_id + self.user_id = user_id + self._context: Dict[str, Any] = {} + self._state: State | str | None = None + self._lock = asyncio.Lock() + + async def get_data(self) -> dict[str, Any]: + + """ + Возвращает текущий контекст данных. + + Returns: + Словарь с данными контекста + """ + + async with self._lock: + return self._context + + async def set_data(self, data: dict[str, Any]): + + """ + Полностью заменяет контекст данных. + + Args: + data: Новый словарь контекста + """ + + async with self._lock: + self._context = data + + async def update_data(self, **kwargs): + + """ + Обновляет контекст данных новыми значениями. + + Args: + **kwargs: Пары ключ-значение для обновления + """ + + async with self._lock: + self._context.update(kwargs) + + async def set_state(self, state: Optional[Union[State, str]] = None): + + """ + Устанавливает новое состояние. + + Args: + state: Новое состояние или None для сброса + """ + + async with self._lock: + self._state = state + + async def get_state(self): + + """ + Возвращает текущее состояние. + + Returns: + Текущее состояние или None + """ + + async with self._lock: + return self._state + + async def clear(self): + + """ + Очищает контекст и сбрасывает состояние. + """ + + async with self._lock: + self._state = None + self._context = {}