chat_moderator_bot/utils/db.py
2024-07-17 23:35:45 +03:00

251 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import asyncpg
import redis.asyncio as redis
from config import POSTGRES_NAME, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, REDIS_NAME, \
REDIS_HOST, REDIS_PORT, REDIS_PASSWORD
create_tables_query = """
CREATE TABLE IF NOT EXISTS ban_words
(id SERIAL PRIMARY KEY,
word TEXT NOT NULL);
CREATE TABLE IF NOT EXISTS message
(id SERIAL PRIMARY KEY,
text TEXT NOT NULL,
media TEXT,
buttons TEXT NOT NULL,
included BOOL NOT NULL);
CREATE TABLE IF NOT EXISTS ban_media
(id SERIAL PRIMARY KEY,
video BOOL NOT NULL,
photo BOOL NOT NULL);
"""
exist_query = """
SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)
"""
class Postgres:
def __init__(self):
self.conn = None
async def connect(self):
self.conn = await asyncpg.connect(
database=POSTGRES_NAME,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
async def create_tables(self) -> None:
"""
Создаёт таблицы если их нет
:return:
"""
await self.connect()
try:
async with self.conn.transaction():
message_exist = await self.conn.fetchval(
exist_query, 'message'
)
ban_media_exist = await self.conn.fetchval(
exist_query, 'ban_media'
)
await self.conn.execute(
create_tables_query
)
if not message_exist:
await self.create_row(
table_name='message',
data_to_insert={
'text': 'Сообщение',
'media': '',
'buttons': '',
'included': False
}
)
if not ban_media_exist:
await self.create_row(
table_name='ban_media',
data_to_insert={
'photo': True,
'video': True,
}
)
except Exception as e:
logging.error(f'Ошибка в create_tables {e}')
finally:
await self.conn.close()
async def get_data(self, table_name: str, columns='*', query_filter=None) -> list:
"""
Получить данные нужной таблицы по указанным фильтрам
:param columns: Название колонн с нужными данными
:param query_filter: Фильтры запроса в формате {колонка: её значение}
:param table_name: Название таблицы для запроса
:return: False в случае ошибки, словарь с данными в случае успеха
"""
if query_filter is None:
query_filter = {}
await self.connect()
try:
if isinstance(columns, str):
columns = [columns]
full_query = f"SELECT {','.join(columns)} FROM {table_name}"
if query_filter:
query_filter = ' AND '.join(
[f"{key} = '{value}'" for key, value in query_filter.items()]
)
full_query += f' WHERE {query_filter}'
async with self.conn.transaction():
result = await self.conn.fetch(full_query)
return result
except Exception as e:
logging.error(f'Ошибка в get_data {e}')
finally:
await self.conn.close()
return []
async def update_data(self, new_data: dict, query_filter: dict, table_name: str) -> bool:
"""
Обновляет данные по заданным фильтрам
:param new_data: Новые данные в формате {Колонка: новое значение}
:param query_filter: Фильтры запроса в формате {колонка: её значение}
:param table_name: Название таблицы для запроса
:return: True в случае успеха, False в случае ошибки
"""
await self.connect()
try:
dollar_data = {key: f"${i + 1}" for i, key in enumerate(new_data)}
values = ', '.join(f'{key} = {value}' for key, value in dollar_data.items())
full_query = f"UPDATE {table_name} SET {values}"
if query_filter:
query_filter = ' AND '.join([f"{key} = '{value}'" for key, value in query_filter.items()])
full_query += f' WHERE {query_filter}'
async with self.conn.transaction():
await self.conn.execute(full_query, *new_data.values())
return True
except Exception as e:
logging.error(f'Ошибка в update_data {e}')
return False
finally:
await self.conn.close()
async def create_row(self, data_to_insert: dict, table_name: str) -> bool:
"""
Создаёт новую строку с данными
:param data_to_insert: Список, где ключ - название столбика, значение - значение столбика в новой строчке
:param table_name: Название таблицы, куда вставляем данные
:return: id последней вставленной строки
"""
await self.connect()
try:
dollars = [f"${i + 1}" for i in range(len(data_to_insert))]
full_query = f"INSERT INTO {table_name} ({', '.join(data_to_insert.keys())}) VALUES ({', '.join(dollars)})"
async with self.conn.transaction():
await self.conn.execute(full_query, *data_to_insert.values())
return True
except Exception as e:
logging.error(f'Ошибка в create_row {e}')
return False
finally:
await self.conn.close()
async def query(self, query_text: str):
"""
Прямой запрос к бд
:param query_text: sql запрос
:return: Результат sql запроса
"""
await self.connect()
try:
async with self.conn.transaction():
await self.conn.execute(query_text)
except Exception as e:
logging.error(f'Ошибка в query {e}')
finally:
await self.conn.close()
class Redis:
def __init__(self):
self.conn = None
async def connect(self):
try:
self.conn = await redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_NAME,
password=REDIS_PASSWORD,
decode_responses=True,
encoding='utf-8'
)
except Exception as e:
logging.error('redis connect', e)
async def delete_key(self, *keys: str | int) -> str | int:
await self.connect()
try:
return await self.conn.delete(*keys)
except Exception as e:
logging.error('redis delete_key', e)
finally:
await self.conn.close()
async def update_list(self, key: str | int, *values) -> str | int:
await self.connect()
try:
return await self.conn.rpush(key, *values)
except Exception as e:
logging.error('redis update_data', e)
finally:
await self.conn.close()
async def get_list(self, key: str | int) -> list:
await self.connect()
try:
data = await self.conn.lrange(name=str(key), start=0, end=-1)
return data
except Exception as e:
logging.error('redis get_data', e)
return []
finally:
await self.conn.close()
async def update_dict(self, key: str | int, value: dict) -> str | int:
await self.connect()
try:
return await self.conn.hset(name=str(key), mapping=value)
except Exception as e:
logging.error('redis update', e)
finally:
await self.conn.close()
async def get_dict(self, key: str | int) -> dict:
await self.connect()
try:
data = await self.conn.hgetall(name=str(key))
return data
except Exception as e:
logging.error('redis get', e)
return []
finally:
await self.conn.close()