import yt_dlp
import aiohttp
import asyncio
from _config import *
from cachetools import TTLCache
import zipfile
import io


class YTVideoInfo:
    """Fetches and validates metadata for a single YouTube video.

    Blocking yt-dlp calls run in `executor` (from _config) so the event loop
    stays responsive; results are memoized in the shared `info_cache` guarded
    by `cache_lock` (both presumably defined in _config — TODO confirm).
    """

    def __init__(self, link: str):
        self.link = link

    def _fetch_sync(self) -> dict:
        """Blocking fetch of full video info, with cache read/write.

        The cache is checked and updated under `cache_lock`; the (slow)
        extraction itself runs outside the lock so concurrent fetches of
        different links do not serialize.
        """
        with cache_lock:
            if self.link in info_cache:
                return info_cache[self.link]
        with yt_dlp.YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl:
            info = ydl.extract_info(self.link, download=False)
        with cache_lock:
            info_cache[self.link] = info
        return info

    async def fetch(self) -> dict:
        """Async wrapper: run the blocking fetch in the shared executor."""
        # get_running_loop() — get_event_loop() is deprecated inside coroutines.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, self._fetch_sync)

    def _validate_sync(self) -> tuple[bool, str]:
        """Blocking validation; returns (ok, user-facing error message).

        Maps common yt-dlp DownloadError messages to friendly strings.
        """
        try:
            info = self._fetch_sync()
            if not info:
                return False, 'Could not fetch video info.'
            return True, ''
        except yt_dlp.utils.DownloadError as e:
            msg = str(e)
            if 'Private video' in msg:
                return False, 'This video is private.'
            if 'age' in msg.lower():
                return False, 'This video is age-restricted.'
            if 'unavailable' in msg.lower():
                return False, 'This video is unavailable.'
            return False, 'Could not load this video.'
async def validate(self) -> tuple[bool, str]: loop = asyncio.get_event_loop() return await loop.run_in_executor(executor, self._validate_sync) def get_formats(self, info: dict) -> list[dict]: seen, formats = set(), [] for f in reversed(info['formats']): height = f.get('height') fps = f.get('fps') ext = f.get('ext', '') if not height or f.get('vcodec', 'none') == 'none': continue key = (height, fps) if key in seen: continue seen.add(key) label = f"{height}p" if fps and fps > 30: label += f" {int(fps)}fps" label += f" ({ext.upper()})" formats.append({'id': f['format_id'], 'label': label, 'height': height}) return sorted(formats, key=lambda x: x['height'], reverse=True) def get_subtitles(self, info: dict) -> list[dict]: subtitles = [] for lang, tracks in info.get('subtitles', {}).items(): if any(t.get('ext') == 'vtt' for t in tracks): subtitles.append({'lang': lang, 'label': f"{lang} (manual)", 'auto': '0'}) for lang, tracks in info.get('automatic_captions', {}).items(): if any(t.get('ext') == 'vtt' for t in tracks): subtitles.append({'lang': lang, 'label': f"{lang} (auto)", 'auto': '1'}) return subtitles def get_subtitle_vtt_url(self, info: dict, lang: str, auto: bool) -> str | None: source = info.get('automatic_captions' if auto else 'subtitles', {}) tracks = source.get(lang, []) return next((t['url'] for t in tracks if t.get('ext') == 'vtt'), None) def summary(self, info: dict) -> dict: raw_date = info.get('upload_date', '') n = info.get('view_count', 0) def fmt_views(n): if not n: return 'N/A' if n >= 1_000_000: return f"{n/1_000_000:.1f}M" if n >= 1_000: return f"{n/1_000:.1f}K" return str(n) def fmt_duration(s): if not s: return 'N/A' h, m, sec = s // 3600, (s % 3600) // 60, s % 60 return f"{h}:{m:02}:{sec:02}" if h else f"{m}:{sec:02}" return { 'title': info.get('title', 'Video'), 'uploader': info.get('uploader', 'N/A'), 'views': fmt_views(n), 'duration': fmt_duration(info.get('duration')), 'upload_date': f"{raw_date[:4]}-{raw_date[4:6]}-{raw_date[6:]}" if 
raw_date else 'N/A', 'description': info.get('description', 'No description.'), } class YTVideoStream: def __init__(self, link: str, format_id: str = 'best'): self.link = link self.format_id = format_id def _get_urls_sync(self) -> tuple[str, str | None]: ydl_opts = { 'format': f'{self.format_id}+bestaudio[ext=m4a]/bestvideo+bestaudio', 'quiet': True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(self.link, download=False) if 'requested_formats' in info and len(info['requested_formats']) == 2: return (info['requested_formats'][0]['url'], info['requested_formats'][1]['url']) return info['url'], None async def _get_urls(self) -> tuple[str, str | None]: loop = asyncio.get_event_loop() return await loop.run_in_executor(executor, self._get_urls_sync) async def generate(self): video_url, audio_url = await self._get_urls() if audio_url: process = await asyncio.create_subprocess_exec( 'ffmpeg', '-i', video_url, '-i', audio_url, '-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k', '-g', '60', '-f', 'mp4', '-movflags', 'frag_keyframe+empty_moov+faststart', '-frag_duration', '2000000', 'pipe:1', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) pre_buffer, buffered_mb = [], 0 try: while buffered_mb < PRE_BUFFER_MB: chunk = await process.stdout.read(CHUNK_SIZE) if not chunk: for c in pre_buffer: yield c return pre_buffer.append(chunk) buffered_mb += len(chunk) / (1024 * 1024) for c in pre_buffer: yield c while True: chunk = await process.stdout.read(CHUNK_SIZE) if not chunk: break yield chunk finally: try: process.kill() except Exception: pass else: async with aiohttp.ClientSession() as session: async with session.get(video_url) as r: async for chunk in r.content.iter_chunked(CHUNK_SIZE): yield chunk class YTVideoDownloader: def __init__(self, link: str, format_id: str = 'best'): self.link = link self.format_id = format_id def _get_urls_sync(self) -> tuple[str, str | None, str]: ydl_opts = { 'format': ( 
f'{self.format_id}[ext=mp4]+bestaudio[ext=m4a]' f'/bestvideo[ext=mp4]+bestaudio[ext=m4a]' f'/best[ext=mp4]/best' ), 'quiet': True, 'no_warnings': True, 'socket_timeout': 30, 'http_headers': { 'User-Agent': ( 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' 'AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/120.0.0.0 Safari/537.36' ) }, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(self.link, download=False) title = info.get('title', 'video') if 'requested_formats' in info and len(info['requested_formats']) == 2: return ( info['requested_formats'][0]['url'], info['requested_formats'][1]['url'], title, ) return info['url'], None, title async def get_urls(self) -> tuple[str, str | None, str]: loop = asyncio.get_event_loop() return await loop.run_in_executor(executor, self._get_urls_sync) async def generate(self, progress_callback=None): video_url, audio_url, _ = await self.get_urls() if audio_url: process = await asyncio.create_subprocess_exec( 'ffmpeg', '-i', video_url, '-i', audio_url, '-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k', '-g', '60', '-f', 'mp4', '-movflags', 'frag_keyframe+empty_moov+faststart', '-frag_duration', '2000000', 'pipe:1', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) bytes_sent = 0 try: while True: chunk = await process.stdout.read(CHUNK_SIZE) if not chunk: break bytes_sent += len(chunk) if progress_callback: progress_callback(bytes_sent) yield chunk finally: try: process.kill() await process.wait() except Exception: pass else: async with aiohttp.ClientSession() as session: async with session.get(video_url) as r: bytes_sent = 0 async for chunk in r.content.iter_chunked(CHUNK_SIZE): bytes_sent += len(chunk) if progress_callback: progress_callback(bytes_sent) yield chunk class YTPlaylist: """Handles playlist metadata extraction and streaming/downloading.""" def __init__(self, link: str): self.link = link def _fetch_sync(self) -> dict: """Fetch full playlist info — each entry has its own formats.""" ydl_opts 
= { 'quiet': True, 'no_warnings': True, 'extract_flat': False, 'ignoreerrors': True, 'socket_timeout': 30, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: return ydl.extract_info(self.link, download=False) def _fetch_flat_sync(self) -> dict: """ Fast fetch — only titles/IDs, no format details. Use this for the info endpoint to avoid 30s+ waits. """ ydl_opts = { 'quiet': True, 'no_warnings': True, 'extract_flat': True, 'ignoreerrors': True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: return ydl.extract_info(self.link, download=False) async def fetch_flat(self) -> dict: loop = asyncio.get_event_loop() return await loop.run_in_executor(executor, self._fetch_flat_sync) async def fetch(self) -> dict: loop = asyncio.get_event_loop() return await loop.run_in_executor(executor, self._fetch_sync) def get_entries(self, info: dict) -> list[dict]: """Return clean list of video entries from playlist.""" entries = [] for i, entry in enumerate(info.get('entries', []), 1): if not entry: continue entries.append({ 'index': i, 'id': entry.get('id', ''), 'title': entry.get('title', f'Video {i}'), 'url': entry.get('url') or f"https://www.youtube.com/watch?v={entry.get('id')}", 'duration': entry.get('duration'), 'uploader': entry.get('uploader', ''), 'thumbnail': entry.get('thumbnail', ''), }) return entries def summary(self, info: dict) -> dict: entries = self.get_entries(info) return { 'title': info.get('title', 'Playlist'), 'uploader': info.get('uploader') or info.get('channel', 'N/A'), 'video_count': len(entries), 'entries': entries, } async def generate_zip( self, format_id: str = 'best', progress_callback=None ): """ Async generator — yields chunks of a ZIP file containing all videos. Each video is streamed through FFmpeg and added to the zip on the fly. 
""" info = await self.fetch_flat() entries = self.get_entries(info) total = len(entries) zip_buffer = io.BytesIO() for i, entry in enumerate(entries, 1): video_url_yt = entry['url'] safe_title = "".join( c for c in entry['title'] if c.isascii() and (c.isalnum() or c in ' ._-') )[:60].strip() or f'video_{i}' if progress_callback: progress_callback(i, total, entry['title']) try: downloader = YTVideoDownloader(video_url_yt, format_id) vid_url, aud_url, _ = await downloader.get_urls() except Exception as e: print(f'Skipping {entry["title"]}: {e}') continue video_bytes = io.BytesIO() if aud_url: process = await asyncio.create_subprocess_exec( 'ffmpeg', '-i', vid_url, '-i', aud_url, '-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k', '-f', 'mp4', '-movflags', 'frag_keyframe+empty_moov+faststart', 'pipe:1', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) try: while True: chunk = await process.stdout.read(256 * 1024) if not chunk: break video_bytes.write(chunk) finally: try: process.kill() await process.wait() except Exception: pass else: async with aiohttp.ClientSession() as session: async with session.get(vid_url) as r: async for chunk in r.content.iter_chunked(256 * 1024): video_bytes.write(chunk) video_bytes.seek(0) filename = f'{i:02d}. {safe_title}.mp4' with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_STORED) as zf: zf.writestr(filename, video_bytes.read()) zip_buffer.seek(0) while True: chunk = zip_buffer.read(256 * 1024) if not chunk: break yield chunk zip_buffer.seek(0) zip_buffer.truncate(0) zip_buffer.seek(0) remainder = zip_buffer.read() if remainder: yield remainder