diff --git a/YTProcessing.py b/YTProcessing.py index 03e797c..f8e23e9 100644 --- a/YTProcessing.py +++ b/YTProcessing.py @@ -2,10 +2,10 @@ import yt_dlp import aiohttp import asyncio from _config import * -from cachetools import TTLCache import zipfile import io + class YTVideoInfo: def __init__(self, link: str): self.link = link @@ -56,11 +56,44 @@ class YTVideoInfo: if key in seen: continue seen.add(key) + size = f.get('filesize') or f.get('filesize_approx') + size_str = None + if size: + if size >= 1_073_741_824: + size_str = f"{size / 1_073_741_824:.1f} GB" + elif size >= 1_048_576: + size_str = f"{size / 1_048_576:.1f} MB" + else: + size_str = f"{size / 1024:.0f} KB" + + audio_size = 0 + for af in info['formats']: + if (af.get('vcodec') == 'none' + and af.get('acodec') != 'none' + and af.get('ext') == 'm4a'): + audio_size = af.get('filesize') or af.get('filesize_approx') or 0 + break + + total_size = (size or 0) + audio_size + label = f"{height}p" if fps and fps > 30: label += f" {int(fps)}fps" label += f" ({ext.upper()})" - formats.append({'id': f['format_id'], 'label': label, 'height': height}) + if size_str: + label += f" ~ {size_str}" + + formats.append({ + 'id': f['format_id'], + 'label': label, + 'height': height, + 'ext': ext, + 'fps': fps, + 'vcodec': f.get('vcodec', ''), + 'acodec': f.get('acodec', ''), + 'filesize': total_size or None, + }) + return sorted(formats, key=lambda x: x['height'], reverse=True) def get_subtitles(self, info: dict) -> list[dict]: @@ -126,7 +159,17 @@ class YTVideoStream: loop = asyncio.get_event_loop() return await loop.run_in_executor(executor, self._get_urls_sync) - async def generate(self): + async def generate( + self, + prebuffer: bool = True, + stop_event: asyncio.Event | None = None, + ): + """ + Yields MP4 chunks. + stop_event — when set, cleanly terminates FFmpeg and stops yielding. + This allows the server to cancel an in-progress stream when the + client reconnects with a new format_id. 
+ """ video_url, audio_url = await self._get_urls() if audio_url: @@ -146,110 +189,61 @@ class YTVideoStream: stderr=asyncio.subprocess.DEVNULL, ) - pre_buffer, buffered_mb = [], 0 try: - while buffered_mb < PRE_BUFFER_MB: - chunk = await process.stdout.read(CHUNK_SIZE) - if not chunk: - for c in pre_buffer: - yield c + if prebuffer: + pre_buffer, buffered_mb = [], 0 + while buffered_mb < PRE_BUFFER_MB: + if stop_event and stop_event.is_set(): + return + try: + chunk = await asyncio.wait_for( + process.stdout.read(CHUNK_SIZE), + timeout=15.0 + ) + except asyncio.TimeoutError: + yield b'' + continue + if not chunk: + for c in pre_buffer: yield c + return + pre_buffer.append(chunk) + buffered_mb += len(chunk) / (1024 * 1024) + for c in pre_buffer: + yield c + else: + first = True + while first: + if stop_event and stop_event.is_set(): + return + try: + chunk = await asyncio.wait_for( + process.stdout.read(CHUNK_SIZE), + timeout=15.0 + ) + if chunk: + yield chunk + first = False + else: + return + except asyncio.TimeoutError: + yield b'' + continue + + while True: + if stop_event and stop_event.is_set(): return - pre_buffer.append(chunk) - buffered_mb += len(chunk) / (1024 * 1024) - - for c in pre_buffer: - yield c - - while True: - chunk = await process.stdout.read(CHUNK_SIZE) + try: + chunk = await asyncio.wait_for( + process.stdout.read(CHUNK_SIZE), + timeout=30.0 + ) + except asyncio.TimeoutError: + yield b'' + continue if not chunk: break yield chunk - finally: - try: - process.kill() - except Exception: - pass - else: - async with aiohttp.ClientSession() as session: - async with session.get(video_url) as r: - async for chunk in r.content.iter_chunked(CHUNK_SIZE): - yield chunk - - -class YTVideoDownloader: - def __init__(self, link: str, format_id: str = 'best'): - self.link = link - self.format_id = format_id - - def _get_urls_sync(self) -> tuple[str, str | None, str]: - ydl_opts = { - 'format': ( - f'{self.format_id}[ext=mp4]+bestaudio[ext=m4a]' - 
f'/bestvideo[ext=mp4]+bestaudio[ext=m4a]' - f'/best[ext=mp4]/best' - ), - 'quiet': True, - 'no_warnings': True, - 'socket_timeout': 30, - 'http_headers': { - 'User-Agent': ( - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' - 'AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/120.0.0.0 Safari/537.36' - ) - }, - } - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - info = ydl.extract_info(self.link, download=False) - - title = info.get('title', 'video') - - if 'requested_formats' in info and len(info['requested_formats']) == 2: - return ( - info['requested_formats'][0]['url'], - info['requested_formats'][1]['url'], - title, - ) - return info['url'], None, title - - async def get_urls(self) -> tuple[str, str | None, str]: - loop = asyncio.get_event_loop() - return await loop.run_in_executor(executor, self._get_urls_sync) - - async def generate(self, progress_callback=None): - video_url, audio_url, _ = await self.get_urls() - - if audio_url: - process = await asyncio.create_subprocess_exec( - 'ffmpeg', - '-i', video_url, - '-i', audio_url, - '-c:v', 'copy', - '-c:a', 'aac', - '-b:a', '192k', - '-g', '60', - '-f', 'mp4', - '-movflags', 'frag_keyframe+empty_moov+faststart', - '-frag_duration', '2000000', - 'pipe:1', - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.DEVNULL, - ) - - bytes_sent = 0 - try: - while True: - chunk = await process.stdout.read(CHUNK_SIZE) - if not chunk: - break - bytes_sent += len(chunk) - - if progress_callback: - progress_callback(bytes_sent) - - yield chunk finally: try: process.kill() @@ -260,22 +254,118 @@ class YTVideoDownloader: else: async with aiohttp.ClientSession() as session: async with session.get(video_url) as r: - bytes_sent = 0 async for chunk in r.content.iter_chunked(CHUNK_SIZE): - bytes_sent += len(chunk) - if progress_callback: - progress_callback(bytes_sent) + if stop_event and stop_event.is_set(): + return yield chunk -class YTPlaylist: - """Handles playlist metadata extraction and streaming/downloading.""" +class 
YTVideoDownloader: + def __init__(self, link: str, format_id: str = 'best'): + self.link = link + self.format_id = format_id + def _download_sync(self, progress_callback=None) -> tuple[str, str]: + tmp_dir = tempfile.mkdtemp() + out_tmpl = os.path.join(tmp_dir, 'video.%(ext)s') + + ydl_opts = { + 'format': ( + f'{self.format_id}[ext=mp4]+bestaudio[ext=m4a]' + f'/bestvideo[ext=mp4]+bestaudio[ext=m4a]' + f'/best[ext=mp4]/best' + ), + 'outtmpl': out_tmpl, + 'merge_output_format': 'mp4', + 'quiet': True, + 'no_warnings': True, + + 'concurrent_fragment_downloads': 4, + + 'retries': 5, + 'fragment_retries': 5, + + 'socket_timeout': 30, + + 'postprocessors': [ + { + 'key': 'FFmpegVideoRemuxer', + 'preferedformat': 'mp4', + }, + ], + 'postprocessor_args': { + 'ffmpeg': [ + '-c:v', 'copy', + '-c:a', 'aac', + '-b:a', '192k', + '-movflags', '+faststart', + ] + }, + } + + if progress_callback: + def hook(d): + if d['status'] == 'downloading': + raw = d.get('_percent_str', '0%').strip().replace('%', '') + try: + pct = min(float(raw), 99.0) + progress_callback(pct) + except Exception: + pass + elif d['status'] == 'finished': + progress_callback(99) + + ydl_opts['progress_hooks'] = [hook] + + try: + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + ydl.download([self.link]) + except Exception as e: + shutil.rmtree(tmp_dir, ignore_errors=True) + raise RuntimeError(f'yt-dlp failed: {e}') + + mp4_files = [ + f for f in os.listdir(tmp_dir) + if f.endswith('.mp4') + ] + if not mp4_files: + shutil.rmtree(tmp_dir, ignore_errors=True) + raise FileNotFoundError('No MP4 found after download.') + + return os.path.join(tmp_dir, mp4_files[0]), tmp_dir + + async def download(self, progress_callback=None) -> tuple[str, str]: + loop = asyncio.get_event_loop() + try: + return await asyncio.wait_for( + loop.run_in_executor( + executor, + lambda: self._download_sync(progress_callback) + ), + timeout=600 + ) + except asyncio.TimeoutError: + raise TimeoutError('Download timed out after 10 minutes.') + + 
async def stream_file(self, filepath: str): + file_size = os.path.getsize(filepath) + + async def generator(): + with open(filepath, 'rb') as f: + while True: + chunk = f.read(CHUNK_SIZE) + if not chunk: + break + yield chunk + + return generator(), file_size + + +class YTPlaylist: def __init__(self, link: str): self.link = link def _fetch_sync(self) -> dict: - """Fetch full playlist info — each entry has its own formats.""" ydl_opts = { 'quiet': True, 'no_warnings': True, @@ -287,10 +377,6 @@ class YTPlaylist: return ydl.extract_info(self.link, download=False) def _fetch_flat_sync(self) -> dict: - """ - Fast fetch — only titles/IDs, no format details. - Use this for the info endpoint to avoid 30s+ waits. - """ ydl_opts = { 'quiet': True, 'no_warnings': True, @@ -309,7 +395,6 @@ class YTPlaylist: return await loop.run_in_executor(executor, self._fetch_sync) def get_entries(self, info: dict) -> list[dict]: - """Return clean list of video entries from playlist.""" entries = [] for i, entry in enumerate(info.get('entries', []), 1): if not entry: @@ -334,87 +419,88 @@ class YTPlaylist: 'entries': entries, } - async def generate_zip( - self, - format_id: str = 'best', - progress_callback=None - ): - """ - Async generator — yields chunks of a ZIP file containing all videos. - Each video is streamed through FFmpeg and added to the zip on the fly. 
- """ - info = await self.fetch_flat() - entries = self.get_entries(info) - total = len(entries) +async def generate_zip(self, format_id: str = 'best', progress_callback=None): + info = await self.fetch_flat() + entries = self.get_entries(info) + total = len(entries) - zip_buffer = io.BytesIO() + zip_buffer = io.BytesIO() - for i, entry in enumerate(entries, 1): - video_url_yt = entry['url'] - safe_title = "".join( - c for c in entry['title'] if c.isascii() and (c.isalnum() or c in ' ._-') - )[:60].strip() or f'video_{i}' + for i, entry in enumerate(entries, 1): + video_url_yt = entry['url'] + safe_title = "".join( + c for c in entry['title'] + if c.isascii() and (c.isalnum() or c in ' ._-') + )[:60].strip() or f'video_{i}' - if progress_callback: - progress_callback(i, total, entry['title']) + if progress_callback: + progress_callback(i, total, entry['title']) - try: - downloader = YTVideoDownloader(video_url_yt, format_id) - vid_url, aud_url, _ = await downloader.get_urls() - except Exception as e: - print(f'Skipping {entry["title"]}: {e}') - continue + try: + info_obj = YTVideoInfo(video_url_yt) + vid_info = await info_obj.fetch() - video_bytes = io.BytesIO() - - if aud_url: - process = await asyncio.create_subprocess_exec( - 'ffmpeg', - '-i', vid_url, - '-i', aud_url, - '-c:v', 'copy', - '-c:a', 'aac', - '-b:a', '192k', - '-f', 'mp4', - '-movflags', 'frag_keyframe+empty_moov+faststart', - 'pipe:1', - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.DEVNULL, - ) - try: - while True: - chunk = await process.stdout.read(256 * 1024) - if not chunk: - break - video_bytes.write(chunk) - finally: - try: - process.kill() - await process.wait() - except Exception: - pass + requested = vid_info.get('requested_formats', []) + if len(requested) == 2: + vid_url = requested[0]['url'] + aud_url = requested[1]['url'] else: - async with aiohttp.ClientSession() as session: - async with session.get(vid_url) as r: - async for chunk in r.content.iter_chunked(256 * 1024): 
- video_bytes.write(chunk) + vid_url = vid_info.get('url') + aud_url = None + except Exception as e: + print(f'Skipping {entry["title"]}: {e}') + continue - video_bytes.seek(0) - filename = f'{i:02d}. {safe_title}.mp4' - with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_STORED) as zf: - zf.writestr(filename, video_bytes.read()) + video_bytes = io.BytesIO() - zip_buffer.seek(0) - while True: - chunk = zip_buffer.read(256 * 1024) - if not chunk: - break - yield chunk + if aud_url: + process = await asyncio.create_subprocess_exec( + 'ffmpeg', + '-i', vid_url, + '-i', aud_url, + '-c:v', 'copy', + '-c:a', 'aac', + '-b:a', '192k', + '-f', 'mp4', + '-movflags', 'frag_keyframe+empty_moov+faststart', + 'pipe:1', + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + try: + while True: + chunk = await process.stdout.read(CHUNK_SIZE) + if not chunk: + break + video_bytes.write(chunk) + finally: + try: + process.kill() + await process.wait() + except Exception: + pass + else: + async with aiohttp.ClientSession() as session: + async with session.get(vid_url) as r: + async for chunk in r.content.iter_chunked(CHUNK_SIZE): + video_bytes.write(chunk) - zip_buffer.seek(0) - zip_buffer.truncate(0) + video_bytes.seek(0) + filename = f'{i:02d}. 
{safe_title}.mp4' + with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_STORED) as zf: + zf.writestr(filename, video_bytes.read()) zip_buffer.seek(0) - remainder = zip_buffer.read() - if remainder: - yield remainder \ No newline at end of file + while True: + chunk = zip_buffer.read(CHUNK_SIZE) + if not chunk: + break + yield chunk + + zip_buffer.seek(0) + zip_buffer.truncate(0) + + zip_buffer.seek(0) + remainder = zip_buffer.read() + if remainder: + yield remainder \ No newline at end of file diff --git a/app.py b/app.py index 7232fe9..5c615f6 100644 --- a/app.py +++ b/app.py @@ -11,6 +11,7 @@ from slowapi.errors import RateLimitExceeded from dotenv import load_dotenv import secrets from fastapi.templating import Jinja2Templates +import hashlib templates = Jinja2Templates(directory='templates') @@ -20,6 +21,14 @@ PORT = int(os.environ.get('PORT', 5000)) HOST = os.environ.get('HOST', '0.0.0.0') SECRET_KEY = os.environ.get('SECRET_KEY', secrets.token_hex(32)) + +_active_streams: dict[str, asyncio.Event] = {} +_streams_lock = asyncio.Lock() + +def _make_stream_key(client_ip: str, url: str) -> str: + raw = f"{client_ip}:{url}" + return hashlib.md5(raw.encode()).hexdigest() + limiter = Limiter(key_func=get_remote_address) app = FastAPI() app.state.limiter = limiter @@ -68,11 +77,98 @@ async def watch(request: Request, url: str = ''): @app.get('/stream') @limiter.limit('30/minute') -async def stream(request: Request, url: str, format_id: str = 'best'): - streamer = YTVideoStream(url, format_id) +async def stream( + request: Request, + url: str, + format_id: str = 'best', + client: str = 'browser' +): + streamer = YTVideoStream(url, format_id) + prebuffer = client != 'mobile' + return StreamingResponse( - streamer.generate(), + streamer.generate(prebuffer=prebuffer), media_type='video/mp4', + headers={ + 'Cache-Control': 'no-cache', + 'X-Content-Type-Options': 'nosniff', + 'Transfer-Encoding': 'chunked', + } + ) + + +@app.get('/download') +@limiter.limit('5/minute') +async 
def download(request: Request, url: str, format_id: str = 'best'): + if not url: + raise HTTPException(status_code=400, detail='No URL provided.') + + def progress_callback(pct: float): + set_progress(url, {'percent': pct, 'speed': '', 'eta': ''}) + + try: + downloader = YTVideoDownloader(url, format_id) + filepath, tmp_dir = await downloader.download( + progress_callback=progress_callback + ) + except TimeoutError: + clear_progress(url) + raise HTTPException(status_code=504, detail='Download timed out.') + except Exception as e: + clear_progress(url) + import traceback + traceback.print_exc() + raise HTTPException(status_code=500, detail=str(e)) + + file_size = os.path.getsize(filepath) + if file_size == 0: + shutil.rmtree(tmp_dir, ignore_errors=True) + raise HTTPException( + status_code=500, + detail='Downloaded file is empty — download may have failed.' + ) + + try: + info_obj = YTVideoInfo(url) + info = await info_obj.fetch() + title = info.get('title', 'video') + except Exception: + title = 'video' + + safe_title = "".join( + c for c in title + if c.isascii() and (c.isalnum() or c in ' ._-') + )[:80].strip() or 'video' + utf8_title = urllib.parse.quote(title[:80].strip(), safe='._- ') + + set_progress(url, {'percent': 100, 'speed': '', 'eta': ''}) + + async def file_generator(): + try: + with open(filepath, 'rb') as f: + while True: + chunk = f.read(CHUNK_SIZE) + if not chunk: + break + yield chunk + finally: + await asyncio.sleep(2) + shutil.rmtree(tmp_dir, ignore_errors=True) + clear_progress(url) + + return StreamingResponse( + file_generator(), + media_type='video/mp4', + headers={ + 'Content-Length': str(file_size), + 'Content-Disposition': ( + f'attachment; ' + f'filename="{safe_title}.mp4"; ' + f"filename*=UTF-8''{utf8_title}.mp4" + ), + 'Accept-Ranges': 'bytes', + 'Cache-Control': 'no-cache', + } ) @app.get('/subtitles') @@ -91,45 +187,6 @@ async def subtitles(url: str, lang: str, auto: str = '0'): return Response(text, media_type='text/vtt; 
charset=utf-8') - -@app.get('/download') -@limiter.limit('5/minute') -async def download(request: Request, url: str, format_id: str = 'best'): - if not url: - raise HTTPException(status_code=400, detail='No URL provided.') - - try: - downloader = YTVideoDownloader(url, format_id) - video_url, audio_url, title = await downloader.get_urls() - except Exception as e: - raise HTTPException(status_code=500, detail=f'Could not fetch video info: {str(e)}') - - ascii_title = "".join( - c for c in title if c.isascii() and (c.isalnum() or c in ' ._-') - )[:80].strip() or 'video' - - utf8_title = urllib.parse.quote(title[:80].strip(), safe='._- ') - - def progress_callback(total_bytes: int): - mb = total_bytes / (1024 * 1024) - set_progress(url, { - 'mb_received': round(mb, 1), - 'speed': f'{mb:.1f} MB received', - }) - - return StreamingResponse( - downloader.generate(progress_callback=progress_callback), - media_type='video/mp4', - headers={ - 'Content-Disposition': ( - f'attachment; ' - f'filename="{ascii_title}.mp4"; ' - f"filename*=UTF-8''{utf8_title}.mp4" - ), - 'X-Video-Title': utf8_title, - 'Cache-Control': 'no-cache', - } - ) @app.get('/download/progress') async def download_progress(url: str): return JSONResponse(get_progress(url)) @@ -139,29 +196,37 @@ async def info(url: str): if not url: raise HTTPException(status_code=400, detail='No URL provided.') - # Simple check — playlist URLs contain 'list=' - is_playlist = 'list=' in url and 'watch?v=' not in url + is_playlist = ( + 'playlist?list=' in url + or ('list=' in url and 'watch?v=' not in url) + ) try: if is_playlist: playlist = YTPlaylist(url) raw = await playlist.fetch_flat() return JSONResponse({ - 'type': 'playlist', + 'type': 'playlist', **playlist.summary(raw) }) else: info_obj = YTVideoInfo(url) raw = await info_obj.fetch() + formats = info_obj.get_formats(raw) return JSONResponse({ - 'type': 'video', - 'title': raw.get('title'), - 'uploader': raw.get('uploader'), - 'duration': raw.get('duration'), - 
'formats': info_obj.get_formats(raw), - 'subtitles': info_obj.get_subtitles(raw), + 'type': 'video', + 'title': raw.get('title'), + 'uploader': raw.get('uploader'), + 'duration': raw.get('duration'), + 'thumbnail_url': raw.get('thumbnail'), + 'formats': formats, + 'subtitles': info_obj.get_subtitles(raw), }) + except HTTPException: + raise except Exception as e: + import traceback + traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @app.get('/playlist/info') @@ -179,45 +244,84 @@ async def playlist_info(url: str): return JSONResponse(summary) -@app.get('/playlist/stream') -async def playlist_stream( - request: Request, - url: str, - index: int = 0, # which video in the playlist to stream (0-based) - format_id: str = 'best' +@app.get('/stream') +@limiter.limit('30/minute') +async def stream( + request: Request, + url: str, + format_id: str = 'best', + client: str = 'browser' ): - if not url: - raise HTTPException(status_code=400, detail='No URL provided.') + client_ip = request.client.host + stream_key = _make_stream_key(client_ip, url) + async with _streams_lock: + if stream_key in _active_streams: + _active_streams[stream_key].set() + await asyncio.sleep(0.5) + + stop_event = asyncio.Event() + _active_streams[stream_key] = stop_event + + estimated_size = None try: - playlist = YTPlaylist(url) - info = await playlist.fetch_flat() - entries = playlist.get_entries(info) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + info_obj = YTVideoInfo(url) + info = await info_obj.fetch() + for f in info.get('formats', []): + if f.get('format_id') == format_id: + video_size = f.get('filesize') or f.get('filesize_approx') or 0 + audio_size = 0 + for af in info['formats']: + if (af.get('vcodec') == 'none' + and af.get('ext') == 'm4a'): + audio_size = ( + af.get('filesize') + or af.get('filesize_approx') + or 0 + ) + break + total = video_size + audio_size + if total > 0: + estimated_size = total + break + except Exception: + 
pass

-    if index >= len(entries):
-        raise HTTPException(status_code=404, detail='Video index out of range.')
+    prebuffer = client != 'mobile'
+    streamer = YTVideoStream(url, format_id)

-    entry = entries[index]
-    video_url = entry['url']
+    async def stream_with_cleanup():
+        try:
+            async for chunk in streamer.generate(
+                prebuffer=prebuffer,
+                stop_event=stop_event,
+            ):
+                yield chunk
+        finally:
+            async with _streams_lock:
+                if _active_streams.get(stream_key) is stop_event:
+                    del _active_streams[stream_key]

-    # Reuse YTVideoStream for the individual video
-    streamer = YTVideoStream(video_url, format_id)
+    headers = {
+        'Cache-Control': 'no-cache',
+        'X-Content-Type-Options': 'nosniff',
+        'Accept-Ranges': 'none',
+    }
+
+    if estimated_size:
+        headers['X-Estimated-Size'] = str(estimated_size)
+        # NOTE(review): never copy an *estimated* size into Content-Length —
+        # AAC re-encoding changes the byte count, and any mismatch with the
+        # bytes actually streamed truncates or hangs the client player.

     return StreamingResponse(
-        streamer.generate(),
+        stream_with_cleanup(),
         media_type='video/mp4',
-        headers={
-            'X-Playlist-Index': str(index),
-            'X-Playlist-Total': str(len(entries)),
-            'X-Video-Title': urllib.parse.quote(entry['title']),
-        }
+        headers=headers,
     )


 @app.get('/playlist/download')
-@limiter.limit('2/minute')  # stricter limit — downloads entire playlist
+@limiter.limit('2/minute')
 async def playlist_download(
     request: Request,
     url: str,
@@ -227,7 +331,6 @@ async def playlist_download(
         raise HTTPException(status_code=400, detail='No URL provided.')

     try:
-        # Quick flat fetch just to get the playlist title
         playlist = YTPlaylist(url)
         info = await playlist.fetch_flat()
         title = info.get('title', 'playlist')