Добавлена загрузка файлов из буфера, InputMediaBuffer
This commit is contained in:
parent
b20a46de24
commit
dd1bdb5e37
@ -1,10 +1,13 @@
|
|||||||
import os
|
import os
|
||||||
|
import mimetypes
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
import aiofiles
|
import aiofiles
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
|
||||||
|
import puremagic
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from ..exceptions.invalid_token import InvalidToken
|
from ..exceptions.invalid_token import InvalidToken
|
||||||
@ -135,6 +138,51 @@ class BaseConnection:
|
|||||||
|
|
||||||
return await response.text()
|
return await response.text()
|
||||||
|
|
||||||
|
async def upload_file_buffer(
|
||||||
|
self,
|
||||||
|
url: str,
|
||||||
|
buffer: bytes,
|
||||||
|
type: UploadType
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Загружает файл из буфера.
|
||||||
|
|
||||||
|
:param url: Конечная точка загрузки файла
|
||||||
|
:param buffer: Буфер (bytes)
|
||||||
|
:param type: Тип файла (video, image, audio, file)
|
||||||
|
|
||||||
|
:return: Сырой .text() ответ от сервера после загрузки файла
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
matches = puremagic.magic_string(buffer[:4096])
|
||||||
|
if matches:
|
||||||
|
mime_type = matches[0][1]
|
||||||
|
ext = mimetypes.guess_extension(mime_type) or ''
|
||||||
|
else:
|
||||||
|
mime_type = f"{type.value}/*"
|
||||||
|
ext = ''
|
||||||
|
except Exception:
|
||||||
|
mime_type = f"{type.value}/*"
|
||||||
|
ext = ''
|
||||||
|
|
||||||
|
basename = f'{uuid4()}{ext}'
|
||||||
|
|
||||||
|
form = aiohttp.FormData()
|
||||||
|
form.add_field(
|
||||||
|
name='data',
|
||||||
|
value=buffer,
|
||||||
|
filename=basename,
|
||||||
|
content_type=mime_type
|
||||||
|
)
|
||||||
|
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
response = await session.post(
|
||||||
|
url=url,
|
||||||
|
data=form
|
||||||
|
)
|
||||||
|
return await response.text()
|
||||||
|
|
||||||
async def download_file(
|
async def download_file(
|
||||||
self,
|
self,
|
||||||
path: str,
|
path: str,
|
||||||
|
@ -9,7 +9,7 @@ from .types.sended_message import SendedMessage
|
|||||||
from ..types.attachments.upload import AttachmentPayload, AttachmentUpload
|
from ..types.attachments.upload import AttachmentPayload, AttachmentUpload
|
||||||
from ..types.errors import Error
|
from ..types.errors import Error
|
||||||
from ..types.message import NewMessageLink
|
from ..types.message import NewMessageLink
|
||||||
from ..types.input_media import InputMedia
|
from ..types.input_media import InputMedia, InputMediaBuffer
|
||||||
from ..types.attachments.attachment import Attachment
|
from ..types.attachments.attachment import Attachment
|
||||||
|
|
||||||
from ..enums.upload_type import UploadType
|
from ..enums.upload_type import UploadType
|
||||||
@ -67,7 +67,7 @@ class SendMessage(BaseConnection):
|
|||||||
|
|
||||||
async def __process_input_media(
|
async def __process_input_media(
|
||||||
self,
|
self,
|
||||||
att: InputMedia
|
att: InputMedia | InputMediaBuffer
|
||||||
):
|
):
|
||||||
|
|
||||||
# очень нестабильный метод независящий от модуля
|
# очень нестабильный метод независящий от модуля
|
||||||
@ -85,11 +85,18 @@ class SendMessage(BaseConnection):
|
|||||||
|
|
||||||
upload = await self.bot.get_upload_url(att.type)
|
upload = await self.bot.get_upload_url(att.type)
|
||||||
|
|
||||||
upload_file_response = await self.upload_file(
|
if isinstance(att, InputMedia):
|
||||||
url=upload.url,
|
upload_file_response = await self.upload_file(
|
||||||
path=att.path,
|
url=upload.url,
|
||||||
type=att.type
|
path=att.path,
|
||||||
)
|
type=att.type,
|
||||||
|
)
|
||||||
|
elif isinstance(att, InputMediaBuffer):
|
||||||
|
upload_file_response = await self.upload_file_buffer(
|
||||||
|
url=upload.url,
|
||||||
|
buffer=att.buffer,
|
||||||
|
type=att.type,
|
||||||
|
)
|
||||||
|
|
||||||
if att.type in (UploadType.VIDEO, UploadType.AUDIO):
|
if att.type in (UploadType.VIDEO, UploadType.AUDIO):
|
||||||
token = upload.token
|
token = upload.token
|
||||||
@ -134,7 +141,7 @@ class SendMessage(BaseConnection):
|
|||||||
|
|
||||||
for att in self.attachments:
|
for att in self.attachments:
|
||||||
|
|
||||||
if isinstance(att, InputMedia):
|
if isinstance(att, InputMedia) or isinstance(att, InputMediaBuffer):
|
||||||
input_media = await self.__process_input_media(att)
|
input_media = await self.__process_input_media(att)
|
||||||
json['attachments'].append(
|
json['attachments'].append(
|
||||||
input_media.model_dump()
|
input_media.model_dump()
|
||||||
|
@ -26,6 +26,7 @@ from ..types.message import Message
|
|||||||
from ..types.command import Command, BotCommand
|
from ..types.command import Command, BotCommand
|
||||||
|
|
||||||
from .input_media import InputMedia
|
from .input_media import InputMedia
|
||||||
|
from .input_media import InputMediaBuffer
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
UpdateUnion,
|
UpdateUnion,
|
||||||
|
@ -1,34 +1,38 @@
|
|||||||
import mimetypes
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
import puremagic
|
||||||
|
|
||||||
from ..enums.upload_type import UploadType
|
from ..enums.upload_type import UploadType
|
||||||
|
|
||||||
|
|
||||||
class InputMedia:
|
if TYPE_CHECKING:
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
|
||||||
|
class InputMedia:
|
||||||
"""
|
"""
|
||||||
Класс для представления медиафайла.
|
Класс для представления медиафайла.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
path (str): Путь к файлу.
|
path (str): Путь к файлу.
|
||||||
type (UploadType): Тип файла, определенный на основе MIME-типа.
|
type (UploadType): Тип файла, определенный на основе содержимого (MIME-типа).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, path: str):
|
def __init__(self, path: str):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Инициализирует объект медиафайла.
|
Инициализирует объект медиафайла.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
path (str): Путь к файлу.
|
path (str): Путь к файлу.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self.path = path
|
self.path = path
|
||||||
self.type = self.__detect_file_type(path)
|
self.type = self.__detect_file_type(path)
|
||||||
|
|
||||||
def __detect_file_type(self, path: str) -> UploadType:
|
def __detect_file_type(self, path: str) -> UploadType:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Определяет тип файла на основе его MIME-типа.
|
Определяет тип файла на основе его содержимого (MIME-типа).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
path (str): Путь к файлу.
|
path (str): Путь к файлу.
|
||||||
@ -36,8 +40,17 @@ class InputMedia:
|
|||||||
Returns:
|
Returns:
|
||||||
UploadType: Тип файла (VIDEO, IMAGE, AUDIO или FILE).
|
UploadType: Тип файла (VIDEO, IMAGE, AUDIO или FILE).
|
||||||
"""
|
"""
|
||||||
|
with open(path, 'rb') as f:
|
||||||
|
sample = f.read(4096)
|
||||||
|
|
||||||
mime_type, _ = mimetypes.guess_type(path)
|
try:
|
||||||
|
matches = puremagic.magic_string(sample)
|
||||||
|
if matches:
|
||||||
|
mime_type = matches[0].mime_type
|
||||||
|
else:
|
||||||
|
mime_type = None
|
||||||
|
except Exception:
|
||||||
|
mime_type = None
|
||||||
|
|
||||||
if mime_type is None:
|
if mime_type is None:
|
||||||
return UploadType.FILE
|
return UploadType.FILE
|
||||||
@ -50,3 +63,44 @@ class InputMedia:
|
|||||||
return UploadType.AUDIO
|
return UploadType.AUDIO
|
||||||
else:
|
else:
|
||||||
return UploadType.FILE
|
return UploadType.FILE
|
||||||
|
|
||||||
|
|
||||||
|
class InputMediaBuffer:
|
||||||
|
"""
|
||||||
|
Класс для представления медиафайла из буфера.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
buffer (BytesIO): Буфер с содержимым файла.
|
||||||
|
type (UploadType): Тип файла, определенный по содержимому.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, buffer: BytesIO):
|
||||||
|
"""
|
||||||
|
Инициализирует объект медиафайла из буфера.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
buffer (IO): Буфер с содержимым файла.
|
||||||
|
"""
|
||||||
|
self.buffer = buffer
|
||||||
|
self.type = self.__detect_file_type(buffer)
|
||||||
|
|
||||||
|
def __detect_file_type(self, buffer: BytesIO) -> UploadType:
|
||||||
|
try:
|
||||||
|
matches = puremagic.magic_string(buffer)
|
||||||
|
if matches:
|
||||||
|
mime_type = matches[0].mime_type
|
||||||
|
else:
|
||||||
|
mime_type = None
|
||||||
|
except Exception:
|
||||||
|
mime_type = None
|
||||||
|
|
||||||
|
if mime_type is None:
|
||||||
|
return UploadType.FILE
|
||||||
|
if mime_type.startswith('video/'):
|
||||||
|
return UploadType.VIDEO
|
||||||
|
elif mime_type.startswith('image/'):
|
||||||
|
return UploadType.IMAGE
|
||||||
|
elif mime_type.startswith('audio/'):
|
||||||
|
return UploadType.AUDIO
|
||||||
|
else:
|
||||||
|
return UploadType.FILE
|
Loading…
x
Reference in New Issue
Block a user