420 lines
15 KiB
Python
420 lines
15 KiB
Python
import yt_dlp
|
|
import aiohttp
|
|
import asyncio
|
|
from _config import *
|
|
from cachetools import TTLCache
|
|
import zipfile
|
|
import io
|
|
|
|
class YTVideoInfo:
|
|
def __init__(self, link: str):
|
|
self.link = link
|
|
|
|
def _fetch_sync(self) -> dict:
|
|
with cache_lock:
|
|
if self.link in info_cache:
|
|
return info_cache[self.link]
|
|
|
|
with yt_dlp.YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl:
|
|
info = ydl.extract_info(self.link, download=False)
|
|
|
|
with cache_lock:
|
|
info_cache[self.link] = info
|
|
|
|
return info
|
|
|
|
async def fetch(self) -> dict:
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(executor, self._fetch_sync)
|
|
|
|
def _validate_sync(self) -> tuple[bool, str]:
|
|
try:
|
|
info = self._fetch_sync()
|
|
if not info:
|
|
return False, 'Could not fetch video info.'
|
|
return True, ''
|
|
except yt_dlp.utils.DownloadError as e:
|
|
msg = str(e)
|
|
if 'Private video' in msg: return False, 'This video is private.'
|
|
if 'age' in msg.lower(): return False, 'This video is age-restricted.'
|
|
if 'unavailable' in msg.lower(): return False, 'This video is unavailable.'
|
|
return False, 'Could not load this video.'
|
|
|
|
async def validate(self) -> tuple[bool, str]:
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(executor, self._validate_sync)
|
|
|
|
def get_formats(self, info: dict) -> list[dict]:
|
|
seen, formats = set(), []
|
|
for f in reversed(info['formats']):
|
|
height = f.get('height')
|
|
fps = f.get('fps')
|
|
ext = f.get('ext', '')
|
|
if not height or f.get('vcodec', 'none') == 'none':
|
|
continue
|
|
key = (height, fps)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
label = f"{height}p"
|
|
if fps and fps > 30:
|
|
label += f" {int(fps)}fps"
|
|
label += f" ({ext.upper()})"
|
|
formats.append({'id': f['format_id'], 'label': label, 'height': height})
|
|
return sorted(formats, key=lambda x: x['height'], reverse=True)
|
|
|
|
def get_subtitles(self, info: dict) -> list[dict]:
|
|
subtitles = []
|
|
for lang, tracks in info.get('subtitles', {}).items():
|
|
if any(t.get('ext') == 'vtt' for t in tracks):
|
|
subtitles.append({'lang': lang, 'label': f"{lang} (manual)", 'auto': '0'})
|
|
for lang, tracks in info.get('automatic_captions', {}).items():
|
|
if any(t.get('ext') == 'vtt' for t in tracks):
|
|
subtitles.append({'lang': lang, 'label': f"{lang} (auto)", 'auto': '1'})
|
|
return subtitles
|
|
|
|
def get_subtitle_vtt_url(self, info: dict, lang: str, auto: bool) -> str | None:
|
|
source = info.get('automatic_captions' if auto else 'subtitles', {})
|
|
tracks = source.get(lang, [])
|
|
return next((t['url'] for t in tracks if t.get('ext') == 'vtt'), None)
|
|
|
|
def summary(self, info: dict) -> dict:
|
|
raw_date = info.get('upload_date', '')
|
|
n = info.get('view_count', 0)
|
|
|
|
def fmt_views(n):
|
|
if not n: return 'N/A'
|
|
if n >= 1_000_000: return f"{n/1_000_000:.1f}M"
|
|
if n >= 1_000: return f"{n/1_000:.1f}K"
|
|
return str(n)
|
|
|
|
def fmt_duration(s):
|
|
if not s: return 'N/A'
|
|
h, m, sec = s // 3600, (s % 3600) // 60, s % 60
|
|
return f"{h}:{m:02}:{sec:02}" if h else f"{m}:{sec:02}"
|
|
|
|
return {
|
|
'title': info.get('title', 'Video'),
|
|
'uploader': info.get('uploader', 'N/A'),
|
|
'views': fmt_views(n),
|
|
'duration': fmt_duration(info.get('duration')),
|
|
'upload_date': f"{raw_date[:4]}-{raw_date[4:6]}-{raw_date[6:]}" if raw_date else 'N/A',
|
|
'description': info.get('description', 'No description.'),
|
|
}
|
|
|
|
|
|
|
|
class YTVideoStream:
|
|
def __init__(self, link: str, format_id: str = 'best'):
|
|
self.link = link
|
|
self.format_id = format_id
|
|
|
|
def _get_urls_sync(self) -> tuple[str, str | None]:
|
|
ydl_opts = {
|
|
'format': f'{self.format_id}+bestaudio[ext=m4a]/bestvideo+bestaudio',
|
|
'quiet': True,
|
|
}
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
|
info = ydl.extract_info(self.link, download=False)
|
|
|
|
if 'requested_formats' in info and len(info['requested_formats']) == 2:
|
|
return (info['requested_formats'][0]['url'],
|
|
info['requested_formats'][1]['url'])
|
|
return info['url'], None
|
|
|
|
async def _get_urls(self) -> tuple[str, str | None]:
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(executor, self._get_urls_sync)
|
|
|
|
async def generate(self):
|
|
video_url, audio_url = await self._get_urls()
|
|
|
|
if audio_url:
|
|
process = await asyncio.create_subprocess_exec(
|
|
'ffmpeg',
|
|
'-i', video_url,
|
|
'-i', audio_url,
|
|
'-c:v', 'copy',
|
|
'-c:a', 'aac',
|
|
'-b:a', '192k',
|
|
'-g', '60',
|
|
'-f', 'mp4',
|
|
'-movflags', 'frag_keyframe+empty_moov+faststart',
|
|
'-frag_duration', '2000000',
|
|
'pipe:1',
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.DEVNULL,
|
|
)
|
|
|
|
pre_buffer, buffered_mb = [], 0
|
|
try:
|
|
while buffered_mb < PRE_BUFFER_MB:
|
|
chunk = await process.stdout.read(CHUNK_SIZE)
|
|
if not chunk:
|
|
for c in pre_buffer:
|
|
yield c
|
|
return
|
|
pre_buffer.append(chunk)
|
|
buffered_mb += len(chunk) / (1024 * 1024)
|
|
|
|
for c in pre_buffer:
|
|
yield c
|
|
|
|
while True:
|
|
chunk = await process.stdout.read(CHUNK_SIZE)
|
|
if not chunk:
|
|
break
|
|
yield chunk
|
|
finally:
|
|
try:
|
|
process.kill()
|
|
except Exception:
|
|
pass
|
|
|
|
else:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(video_url) as r:
|
|
async for chunk in r.content.iter_chunked(CHUNK_SIZE):
|
|
yield chunk
|
|
|
|
|
|
class YTVideoDownloader:
|
|
def __init__(self, link: str, format_id: str = 'best'):
|
|
self.link = link
|
|
self.format_id = format_id
|
|
|
|
def _get_urls_sync(self) -> tuple[str, str | None, str]:
|
|
ydl_opts = {
|
|
'format': (
|
|
f'{self.format_id}[ext=mp4]+bestaudio[ext=m4a]'
|
|
f'/bestvideo[ext=mp4]+bestaudio[ext=m4a]'
|
|
f'/best[ext=mp4]/best'
|
|
),
|
|
'quiet': True,
|
|
'no_warnings': True,
|
|
'socket_timeout': 30,
|
|
'http_headers': {
|
|
'User-Agent': (
|
|
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
|
|
'AppleWebKit/537.36 (KHTML, like Gecko) '
|
|
'Chrome/120.0.0.0 Safari/537.36'
|
|
)
|
|
},
|
|
}
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
|
info = ydl.extract_info(self.link, download=False)
|
|
|
|
title = info.get('title', 'video')
|
|
|
|
if 'requested_formats' in info and len(info['requested_formats']) == 2:
|
|
return (
|
|
info['requested_formats'][0]['url'],
|
|
info['requested_formats'][1]['url'],
|
|
title,
|
|
)
|
|
return info['url'], None, title
|
|
|
|
async def get_urls(self) -> tuple[str, str | None, str]:
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(executor, self._get_urls_sync)
|
|
|
|
async def generate(self, progress_callback=None):
|
|
video_url, audio_url, _ = await self.get_urls()
|
|
|
|
if audio_url:
|
|
process = await asyncio.create_subprocess_exec(
|
|
'ffmpeg',
|
|
'-i', video_url,
|
|
'-i', audio_url,
|
|
'-c:v', 'copy',
|
|
'-c:a', 'aac',
|
|
'-b:a', '192k',
|
|
'-g', '60',
|
|
'-f', 'mp4',
|
|
'-movflags', 'frag_keyframe+empty_moov+faststart',
|
|
'-frag_duration', '2000000',
|
|
'pipe:1',
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.DEVNULL,
|
|
)
|
|
|
|
bytes_sent = 0
|
|
try:
|
|
while True:
|
|
chunk = await process.stdout.read(CHUNK_SIZE)
|
|
if not chunk:
|
|
break
|
|
bytes_sent += len(chunk)
|
|
|
|
if progress_callback:
|
|
progress_callback(bytes_sent)
|
|
|
|
yield chunk
|
|
finally:
|
|
try:
|
|
process.kill()
|
|
await process.wait()
|
|
except Exception:
|
|
pass
|
|
|
|
else:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(video_url) as r:
|
|
bytes_sent = 0
|
|
async for chunk in r.content.iter_chunked(CHUNK_SIZE):
|
|
bytes_sent += len(chunk)
|
|
if progress_callback:
|
|
progress_callback(bytes_sent)
|
|
yield chunk
|
|
|
|
|
|
class YTPlaylist:
    """Handles playlist metadata extraction and streaming/downloading."""

    def __init__(self, link: str):
        self.link = link

    class _ZipStreamWriter:
        """Write-only, unseekable sink for streaming a single ZIP archive.

        zipfile falls back to data-descriptor entries on non-seekable
        files, so members can be appended one at a time while already
        emitted bytes are drained away; ``tell`` keeps reporting absolute
        offsets so the central directory written at close stays correct.
        """

        def __init__(self):
            self._chunks: list[bytes] = []
            self._offset = 0  # absolute number of bytes ever written

        def write(self, data) -> int:
            b = bytes(data)
            self._chunks.append(b)
            self._offset += len(b)
            return len(b)

        def tell(self) -> int:
            return self._offset

        def seekable(self) -> bool:
            return False

        def flush(self) -> None:
            pass

        def drain(self) -> bytes:
            """Return and clear all bytes written since the last drain."""
            out = b''.join(self._chunks)
            self._chunks.clear()
            return out

    def _fetch_sync(self) -> dict:
        """Fetch full playlist info — each entry has its own formats."""
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'ignoreerrors': True,  # skip broken entries instead of aborting
            'socket_timeout': 30,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info(self.link, download=False)

    def _fetch_flat_sync(self) -> dict:
        """
        Fast fetch — only titles/IDs, no format details.
        Use this for the info endpoint to avoid 30s+ waits.
        """
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,
            'ignoreerrors': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info(self.link, download=False)

    async def fetch_flat(self) -> dict:
        """Async wrapper for _fetch_flat_sync."""
        # get_running_loop() replaces the deprecated get_event_loop().
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, self._fetch_flat_sync)

    async def fetch(self) -> dict:
        """Async wrapper for _fetch_sync."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, self._fetch_sync)

    def get_entries(self, info: dict) -> list[dict]:
        """Return clean list of video entries from playlist.

        Index numbering follows playlist position, so entries yt-dlp
        nulled out (e.g. deleted videos) leave a gap in 'index'.
        """
        entries = []
        for i, entry in enumerate(info.get('entries', []), 1):
            if not entry:
                continue
            entries.append({
                'index': i,
                'id': entry.get('id', ''),
                'title': entry.get('title', f'Video {i}'),
                'url': entry.get('url') or f"https://www.youtube.com/watch?v={entry.get('id')}",
                'duration': entry.get('duration'),
                'uploader': entry.get('uploader', ''),
                'thumbnail': entry.get('thumbnail', ''),
            })
        return entries

    def summary(self, info: dict) -> dict:
        """Build a display-ready summary of the playlist."""
        entries = self.get_entries(info)
        return {
            'title': info.get('title', 'Playlist'),
            'uploader': info.get('uploader') or info.get('channel', 'N/A'),
            'video_count': len(entries),
            'entries': entries,
        }

    async def generate_zip(
        self,
        format_id: str = 'best',
        progress_callback=None
    ):
        """
        Async generator — yields chunks of ONE valid ZIP containing all videos.

        Each video is muxed (or proxied) into memory, appended to a single
        open ZipFile over an unseekable sink, and the newly produced archive
        bytes are drained and yielded immediately. (Re-opening the ZipFile
        per video and truncating the buffer — the previous approach — emits
        a concatenation of standalone ZIPs, and readers that locate the
        end-of-central-directory record only see the last entry.)
        """
        info = await self.fetch_flat()
        entries = self.get_entries(info)
        total = len(entries)

        sink = self._ZipStreamWriter()
        # ZIP_STORED: content is already compressed video; don't recompress.
        archive = zipfile.ZipFile(sink, 'w', zipfile.ZIP_STORED)

        for i, entry in enumerate(entries, 1):
            video_url_yt = entry['url']
            # Keep only filesystem-safe ASCII characters in the member name.
            safe_title = "".join(
                c for c in entry['title'] if c.isascii() and (c.isalnum() or c in ' ._-')
            )[:60].strip() or f'video_{i}'

            if progress_callback:
                progress_callback(i, total, entry['title'])

            try:
                downloader = YTVideoDownloader(video_url_yt, format_id)
                vid_url, aud_url, _ = await downloader.get_urls()
            except Exception as e:
                # Best-effort: one broken entry should not abort the zip.
                print(f'Skipping {entry["title"]}: {e}')
                continue

            video_bytes = io.BytesIO()

            if aud_url:
                process = await asyncio.create_subprocess_exec(
                    'ffmpeg',
                    '-i', vid_url,
                    '-i', aud_url,
                    '-c:v', 'copy',
                    '-c:a', 'aac',
                    '-b:a', '192k',
                    '-f', 'mp4',
                    '-movflags', 'frag_keyframe+empty_moov+faststart',
                    'pipe:1',
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.DEVNULL,
                )
                try:
                    while True:
                        chunk = await process.stdout.read(256 * 1024)
                        if not chunk:
                            break
                        video_bytes.write(chunk)
                finally:
                    # kill + wait so the child is reaped on any exit path.
                    try:
                        process.kill()
                        await process.wait()
                    except Exception:
                        pass
            else:
                async with aiohttp.ClientSession() as session:
                    async with session.get(vid_url) as r:
                        async for chunk in r.content.iter_chunked(256 * 1024):
                            video_bytes.write(chunk)

            archive.writestr(f'{i:02d}. {safe_title}.mp4', video_bytes.getvalue())
            del video_bytes  # release the per-video buffer promptly

            payload = sink.drain()
            if payload:
                yield payload

        archive.close()  # appends the single central directory
        tail = sink.drain()
        if tail:
            yield tail