chat_moderator_bot/utils/db.py

251 lines
8.7 KiB
Python
Raw Normal View History

2024-07-17 20:35:45 +00:00
import logging
import asyncpg
import redis.asyncio as redis
from config import POSTGRES_NAME, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, REDIS_NAME, \
REDIS_HOST, REDIS_PORT, REDIS_PASSWORD
create_tables_query = """
CREATE TABLE IF NOT EXISTS ban_words
(id SERIAL PRIMARY KEY,
word TEXT NOT NULL);
CREATE TABLE IF NOT EXISTS message
(id SERIAL PRIMARY KEY,
text TEXT NOT NULL,
media TEXT,
buttons TEXT NOT NULL,
included BOOL NOT NULL);
CREATE TABLE IF NOT EXISTS ban_media
(id SERIAL PRIMARY KEY,
video BOOL NOT NULL,
photo BOOL NOT NULL);
"""
exist_query = """
SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)
"""
class Postgres:
def __init__(self):
self.conn = None
async def connect(self):
self.conn = await asyncpg.connect(
database=POSTGRES_NAME,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
async def create_tables(self) -> None:
"""
Создаёт таблицы если их нет
:return:
"""
await self.connect()
try:
async with self.conn.transaction():
message_exist = await self.conn.fetchval(
exist_query, 'message'
)
ban_media_exist = await self.conn.fetchval(
exist_query, 'ban_media'
)
await self.conn.execute(
create_tables_query
)
if not message_exist:
await self.create_row(
table_name='message',
data_to_insert={
'text': 'Сообщение',
'media': '',
'buttons': '',
'included': False
}
)
if not ban_media_exist:
await self.create_row(
table_name='ban_media',
data_to_insert={
'photo': True,
'video': True,
}
)
except Exception as e:
logging.error(f'Ошибка в create_tables {e}')
finally:
await self.conn.close()
async def get_data(self, table_name: str, columns='*', query_filter=None) -> list:
"""
Получить данные нужной таблицы по указанным фильтрам
:param columns: Название колонн с нужными данными
:param query_filter: Фильтры запроса в формате {колонка: её значение}
:param table_name: Название таблицы для запроса
:return: False в случае ошибки, словарь с данными в случае успеха
"""
if query_filter is None:
query_filter = {}
await self.connect()
try:
if isinstance(columns, str):
columns = [columns]
full_query = f"SELECT {','.join(columns)} FROM {table_name}"
if query_filter:
query_filter = ' AND '.join(
[f"{key} = '{value}'" for key, value in query_filter.items()]
)
full_query += f' WHERE {query_filter}'
async with self.conn.transaction():
result = await self.conn.fetch(full_query)
return result
except Exception as e:
logging.error(f'Ошибка в get_data {e}')
finally:
await self.conn.close()
return []
async def update_data(self, new_data: dict, query_filter: dict, table_name: str) -> bool:
"""
Обновляет данные по заданным фильтрам
:param new_data: Новые данные в формате {Колонка: новое значение}
:param query_filter: Фильтры запроса в формате {колонка: её значение}
:param table_name: Название таблицы для запроса
:return: True в случае успеха, False в случае ошибки
"""
await self.connect()
try:
dollar_data = {key: f"${i + 1}" for i, key in enumerate(new_data)}
values = ', '.join(f'{key} = {value}' for key, value in dollar_data.items())
full_query = f"UPDATE {table_name} SET {values}"
if query_filter:
query_filter = ' AND '.join([f"{key} = '{value}'" for key, value in query_filter.items()])
full_query += f' WHERE {query_filter}'
async with self.conn.transaction():
await self.conn.execute(full_query, *new_data.values())
return True
except Exception as e:
logging.error(f'Ошибка в update_data {e}')
return False
finally:
await self.conn.close()
async def create_row(self, data_to_insert: dict, table_name: str) -> bool:
"""
Создаёт новую строку с данными
:param data_to_insert: Список, где ключ - название столбика, значение - значение столбика в новой строчке
:param table_name: Название таблицы, куда вставляем данные
:return: id последней вставленной строки
"""
await self.connect()
try:
dollars = [f"${i + 1}" for i in range(len(data_to_insert))]
full_query = f"INSERT INTO {table_name} ({', '.join(data_to_insert.keys())}) VALUES ({', '.join(dollars)})"
async with self.conn.transaction():
await self.conn.execute(full_query, *data_to_insert.values())
return True
except Exception as e:
logging.error(f'Ошибка в create_row {e}')
return False
finally:
await self.conn.close()
async def query(self, query_text: str):
"""
Прямой запрос к бд
:param query_text: sql запрос
:return: Результат sql запроса
"""
await self.connect()
try:
async with self.conn.transaction():
await self.conn.execute(query_text)
except Exception as e:
logging.error(f'Ошибка в query {e}')
finally:
await self.conn.close()
class Redis:
def __init__(self):
self.conn = None
async def connect(self):
try:
self.conn = await redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_NAME,
password=REDIS_PASSWORD,
decode_responses=True,
encoding='utf-8'
)
except Exception as e:
logging.error('redis connect', e)
async def delete_key(self, *keys: str | int) -> str | int:
await self.connect()
try:
return await self.conn.delete(*keys)
except Exception as e:
logging.error('redis delete_key', e)
finally:
await self.conn.close()
async def update_list(self, key: str | int, *values) -> str | int:
await self.connect()
try:
return await self.conn.rpush(key, *values)
except Exception as e:
logging.error('redis update_data', e)
finally:
await self.conn.close()
async def get_list(self, key: str | int) -> list:
await self.connect()
try:
data = await self.conn.lrange(name=str(key), start=0, end=-1)
return data
except Exception as e:
logging.error('redis get_data', e)
return []
finally:
await self.conn.close()
async def update_dict(self, key: str | int, value: dict) -> str | int:
await self.connect()
try:
return await self.conn.hset(name=str(key), mapping=value)
except Exception as e:
logging.error('redis update', e)
finally:
await self.conn.close()
async def get_dict(self, key: str | int) -> dict:
await self.connect()
try:
data = await self.conn.hgetall(name=str(key))
return data
except Exception as e:
logging.error('redis get', e)
return []
finally:
await self.conn.close()