"""FastAPI front end for validating, streaming and downloading YouTube media.

The heavy lifting (metadata extraction, streaming, downloading, progress
tracking) is delegated to the project modules ``YTProcessing`` and
``_config``; this module only wires those helpers to HTTP routes.
"""

import asyncio
import hashlib
import os
import secrets
import shutil
import traceback
import urllib.parse
from typing import Optional

import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from jinja2 import Template
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# NOTE(review): the star imports are kept because this module relies on names
# they export (YTVideoInfo, YTVideoStream, YTVideoDownloader, YTPlaylist,
# set_progress, get_progress, clear_progress, HOME_HTML, PLAYER_HTML,
# CHUNK_SIZE, ...). Which module provides which name is not visible here.
from YTProcessing import *
from _config import *

templates = Jinja2Templates(directory='templates')

load_dotenv()

PORT = int(os.environ.get('PORT', 5000))
HOST = os.environ.get('HOST', '0.0.0.0')
SECRET_KEY = os.environ.get('SECRET_KEY', secrets.token_hex(32))

# One stop event per (client, url) pair; setting it asks the running stream
# for that pair to stop so that a newer request can replace it.
_active_streams: dict[str, asyncio.Event] = {}
_streams_lock = asyncio.Lock()


def _make_stream_key(client_ip: str, url: str) -> str:
    """Return a fixed-length key identifying one client's stream of *url*."""
    raw = f"{client_ip}:{url}"
    # MD5 is used only to derive a compact dictionary key, not for security.
    return hashlib.md5(raw.encode()).hexdigest()


def _attachment_disposition(
    title: str, extension: str, max_len: int, fallback: str
) -> str:
    """Build a ``Content-Disposition: attachment`` header value.

    Args:
        title: Human-readable title to derive the file name from.
        extension: File extension without the leading dot.
        max_len: Maximum number of title characters to keep.
        fallback: Name used when nothing usable remains of *title*.

    Returns:
        A header value carrying an ASCII-only ``filename`` and a
        percent-encoded UTF-8 ``filename*`` for clients that support it.
    """
    ascii_name = "".join(
        c for c in title
        if c.isascii() and (c.isalnum() or c in ' ._-')
    )[:max_len].strip() or fallback
    # safe='' so that spaces become %20: a raw space is not a valid character
    # inside an RFC 5987 ext-value.
    encoded_name = urllib.parse.quote(title[:max_len].strip(), safe='') or fallback
    return (
        f'attachment; '
        f'filename="{ascii_name}.{extension}"; '
        f"filename*=UTF-8''{encoded_name}.{extension}"
    )


def _estimate_stream_size(info: dict, format_id: str) -> Optional[int]:
    """Estimate the byte size of a stream from extractor metadata.

    Adds the size of the format whose ``format_id`` matches to the size of
    the first audio-only ``m4a`` format listed.

    Returns:
        The combined size, or ``None`` when the format is not listed or no
        positive size is known.
    """
    all_formats = info.get('formats') or []
    selected = next(
        (f for f in all_formats if f.get('format_id') == format_id), None
    )
    if selected is None:
        return None

    video_size = selected.get('filesize') or selected.get('filesize_approx') or 0
    audio = next(
        (
            f for f in all_formats
            if f.get('vcodec') == 'none' and f.get('ext') == 'm4a'
        ),
        None,
    )
    audio_size = 0
    if audio is not None:
        audio_size = audio.get('filesize') or audio.get('filesize_approx') or 0

    total = video_size + audio_size
    return total if total > 0 else None


limiter = Limiter(key_func=get_remote_address)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(GZipMiddleware, minimum_size=1000)


@app.get('/', response_class=HTMLResponse)
async def home():
    """Serve the landing page stored at ``HOME_HTML``."""
    with open(HOME_HTML, 'r', encoding='utf-8') as f:
        return HTMLResponse(f.read())


@app.post('/validate')
@limiter.limit('20/minute')
async def validate(request: Request):
    """Validate the ``url`` field of a JSON request body.

    Returns:
        ``{'ok': bool, 'error': ...}``. A missing, malformed or non-object
        body is reported the same way as a missing URL.
    """
    try:
        body = await request.json()
    except ValueError:
        # Malformed JSON (JSONDecodeError) or undecodable bytes.
        body = None

    raw_url = body.get('url') if isinstance(body, dict) else None
    url = raw_url.strip() if isinstance(raw_url, str) else ''
    if not url:
        return JSONResponse({'ok': False, 'error': 'No URL provided.'})

    ok, err = await YTVideoInfo(url).validate()
    return JSONResponse({'ok': ok, 'error': err})


@app.get('/watch', response_class=HTMLResponse)
async def watch(request: Request, url: str = ''):
    """Render the player page for *url*; redirect home when no URL is given."""
    if not url:
        return HTMLResponse('', status_code=302, headers={'Location': '/'})

    info_obj = YTVideoInfo(url)
    info = await info_obj.fetch()
    formats = info_obj.get_formats(info)
    subs = info_obj.get_subtitles(info)
    meta = info_obj.summary(info)

    with open(PLAYER_HTML, 'r', encoding='utf-8') as f:
        # NOTE(review): a bare jinja2.Template does not autoescape, and both
        # `url` and the video metadata are untrusted. Confirm the template
        # escapes them (e.g. `|e` / `|tojson`) or switch to an autoescaping
        # environment once the template has been checked.
        template = Template(f.read())

    return HTMLResponse(template.render(
        meta=meta,
        formats=formats,
        default_format=formats[0]['id'] if formats else 'best',
        subtitles=subs,
        youtube_url=url,
    ))


@app.get('/download')
@limiter.limit('5/minute')
async def download(request: Request, url: str, format_id: str = 'best'):
    """Download a video server-side, then stream the file as an attachment.

    Raises:
        HTTPException: 400 when no URL is given, 504 when the download times
            out, 500 for any other download failure or an empty file.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')

    def progress_callback(pct: float):
        set_progress(url, {'percent': pct, 'speed': '', 'eta': ''})

    try:
        downloader = YTVideoDownloader(url, format_id)
        filepath, tmp_dir = await downloader.download(
            progress_callback=progress_callback
        )
    except TimeoutError:
        clear_progress(url)
        raise HTTPException(status_code=504, detail='Download timed out.')
    except Exception as e:
        clear_progress(url)
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))

    file_size = os.path.getsize(filepath)
    if file_size == 0:
        shutil.rmtree(tmp_dir, ignore_errors=True)
        # Clear progress here too, matching the other failure paths.
        clear_progress(url)
        raise HTTPException(
            status_code=500,
            detail='Downloaded file is empty — download may have failed.'
        )

    # The title only affects the suggested file name, so a metadata failure
    # falls back to a generic name instead of failing the download.
    try:
        info_obj = YTVideoInfo(url)
        info = await info_obj.fetch()
        title = info.get('title') or 'video'
    except Exception:
        title = 'video'

    set_progress(url, {'percent': 100, 'speed': '', 'eta': ''})

    async def file_generator():
        """Yield the file in ``CHUNK_SIZE`` pieces, then clean up."""
        try:
            with open(filepath, 'rb') as f:
                while chunk := f.read(CHUNK_SIZE):
                    yield chunk
        finally:
            # Presumably the pause lets progress pollers observe 100% before
            # the entry is cleared — TODO confirm.
            await asyncio.sleep(2)
            shutil.rmtree(tmp_dir, ignore_errors=True)
            clear_progress(url)

    return StreamingResponse(
        file_generator(),
        media_type='video/mp4',
        headers={
            'Content-Length': str(file_size),
            'Content-Disposition': _attachment_disposition(
                title, 'mp4', 80, 'video'
            ),
            'Accept-Ranges': 'bytes',
            'Cache-Control': 'no-cache',
        }
    )


@app.get('/subtitles')
async def subtitles(url: str, lang: str, auto: str = '0'):
    """Proxy the WebVTT subtitle track for *lang*.

    ``auto == '1'`` is forwarded as a true flag to ``get_subtitle_vtt_url``.
    An empty VTT body is returned when no track URL is found.
    """
    info_obj = YTVideoInfo(url)
    info = await info_obj.fetch()
    vtt_url = info_obj.get_subtitle_vtt_url(info, lang, auto == '1')
    if not vtt_url:
        return Response('', media_type='text/vtt')

    async with aiohttp.ClientSession() as session:
        async with session.get(vtt_url) as r:
            text = await r.text()
    return Response(text, media_type='text/vtt; charset=utf-8')


@app.get('/download/progress')
async def download_progress(url: str):
    """Return the progress record stored for *url*."""
    return JSONResponse(get_progress(url))


@app.get('/info')
async def info(url: str):
    """Return JSON metadata for a video or playlist URL.

    Raises:
        HTTPException: 400 when no URL is given, 500 when fetching fails.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')

    # A URL carrying both `watch?v=` and `list=` is treated as a single video.
    is_playlist = (
        'playlist?list=' in url or
        ('list=' in url and 'watch?v=' not in url)
    )

    try:
        if is_playlist:
            playlist = YTPlaylist(url)
            raw = await playlist.fetch_flat()
            return JSONResponse({
                'type': 'playlist',
                **playlist.summary(raw)
            })

        info_obj = YTVideoInfo(url)
        raw = await info_obj.fetch()
        formats = info_obj.get_formats(raw)
        return JSONResponse({
            'type': 'video',
            'title': raw.get('title'),
            'uploader': raw.get('uploader'),
            'duration': raw.get('duration'),
            'thumbnail_url': raw.get('thumbnail'),
            'formats': formats,
            'subtitles': info_obj.get_subtitles(raw),
        })
    except HTTPException:
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@app.get('/playlist/info')
async def playlist_info(url: str):
    """Return the playlist summary for *url* as JSON.

    Raises:
        HTTPException: 400 when no URL is given, 500 when fetching fails.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')
    try:
        playlist = YTPlaylist(url)
        info = await playlist.fetch_flat()
        summary = playlist.summary(info)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return JSONResponse(summary)


# This is the only `/stream` handler. The file previously registered an
# older, simpler handler on the same path first; routes match in
# registration order, so this one was never reached.
@app.get('/stream')
@limiter.limit('30/minute')
async def stream(
    request: Request,
    url: str,
    format_id: str = 'best',
    client: str = 'browser'
):
    """Stream a video, replacing any stream the same client has for *url*.

    Args:
        request: Incoming request; its client address is part of the stream key.
        url: Video URL to stream.
        format_id: Format identifier passed to ``YTVideoStream``.
        client: ``'mobile'`` disables prebuffering and, when a size estimate
            exists, adds a ``Content-Length`` header.
    """
    # get_remote_address tolerates a missing `request.client` and is the same
    # function the rate limiter keys on.
    stream_key = _make_stream_key(get_remote_address(request), url)

    stop_event = asyncio.Event()
    async with _streams_lock:
        previous = _active_streams.get(stream_key)
        _active_streams[stream_key] = stop_event

    if previous is not None:
        previous.set()
        # Pause for the old stream to wind down. This happens outside the
        # lock so other stream starts and cleanups are not blocked.
        await asyncio.sleep(0.5)

    # The size estimate is optional, so any failure here is deliberately
    # ignored rather than failing the stream.
    estimated_size = None
    try:
        info = await YTVideoInfo(url).fetch()
        estimated_size = _estimate_stream_size(info, format_id)
    except Exception:
        estimated_size = None

    prebuffer = client != 'mobile'
    streamer = YTVideoStream(url, format_id)

    async def stream_with_cleanup():
        """Relay chunks and unregister this stream when it ends."""
        try:
            async for chunk in streamer.generate(
                prebuffer=prebuffer,
                stop_event=stop_event,
            ):
                yield chunk
        finally:
            async with _streams_lock:
                # Only remove the entry if a newer stream has not replaced it.
                if _active_streams.get(stream_key) is stop_event:
                    del _active_streams[stream_key]

    headers = {
        'Cache-Control': 'no-cache',
        'X-Content-Type-Options': 'nosniff',
        'Accept-Ranges': 'none',
    }
    if estimated_size:
        headers['X-Estimated-Size'] = str(estimated_size)
        if client == 'mobile':
            # NOTE(review): this is an estimate; a Content-Length that differs
            # from the bytes actually sent may be rejected by the server or
            # the client — confirm this is intended.
            headers['Content-Length'] = str(estimated_size)

    return StreamingResponse(
        stream_with_cleanup(),
        media_type='video/mp4',
        headers=headers,
    )


@app.get('/playlist/download')
@limiter.limit('2/minute')
async def playlist_download(
    request: Request,
    url: str,
    format_id: str = 'best'
):
    """Stream a ZIP archive produced by ``YTPlaylist.generate_zip``.

    Raises:
        HTTPException: 400 when no URL is given, 500 when the playlist
            metadata cannot be fetched.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')

    try:
        playlist = YTPlaylist(url)
        info = await playlist.fetch_flat()
        title = info.get('title') or 'playlist'
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    def progress_callback(current: int, total: int, video_title: str):
        # Guard against an empty playlist so the callback cannot divide by 0.
        percent = round((current - 1) / total * 100, 1) if total else 0
        set_progress(url, {
            'current': current,
            'total': total,
            'video_title': video_title,
            'percent': percent,
        })

    return StreamingResponse(
        playlist.generate_zip(
            format_id=format_id,
            progress_callback=progress_callback,
        ),
        media_type='application/zip',
        headers={
            'Content-Disposition': _attachment_disposition(
                title, 'zip', 60, 'playlist'
            ),
            'Cache-Control': 'no-cache',
        }
    )


@app.get('/playlist/watch', response_class=HTMLResponse)
async def playlist_watch(request: Request, url: str = ''):
    """Render ``Playlist.html`` for *url*; redirect home when no URL is given."""
    if not url:
        return HTMLResponse('', status_code=302, headers={'Location': '/'})

    playlist = YTPlaylist(url)
    info = await playlist.fetch_flat()
    summary = playlist.summary(info)

    return templates.TemplateResponse('Playlist.html', {
        'request': request,
        'title': summary['title'],
        'uploader': summary['uploader'],
        'video_count': summary['video_count'],
        'entries': summary['entries'],
        'playlist_url': url,
    })


@app.get('/playlist/progress')
async def playlist_progress(url: str):
    """Return the progress record stored for the playlist *url*."""
    return JSONResponse(get_progress(url))


if __name__ == '__main__':
    import uvicorn

    print(f'Local URL : http://localhost:{PORT}')
    uvicorn.run(
        'app:app',
        host=HOST,
        port=PORT,
        log_level='warning',
        reload=False,
    )