"""FastAPI front-end exposing validate/watch/stream/download endpoints.

The heavy lifting (metadata lookup, streaming, zipping, progress storage) is
delegated to helpers pulled in by the star imports below (``YTVideoInfo``,
``YTVideoStream``, ``YTVideoDownloader``, ``YTPlaylist``, ``set_progress``,
``get_progress``, ``HOME_HTML``, ``PLAYER_HTML``); this module only wires them
to HTTP routes.
"""

import os
import secrets
import urllib.parse

import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from jinja2 import Template
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# NOTE(review): star imports hide where names come from; replace with explicit
# imports once the public surface of these modules is confirmed.
from YTProcessing import *  # noqa: F401,F403
from _config import *  # noqa: F401,F403

templates = Jinja2Templates(directory='templates')

load_dotenv()

PORT = int(os.environ.get('PORT', 5000))
HOST = os.environ.get('HOST', '0.0.0.0')
# Falls back to a fresh random key on every process start when SECRET_KEY is
# not set in the environment. Not referenced elsewhere in this file.
SECRET_KEY = os.environ.get('SECRET_KEY', secrets.token_hex(32))

# Rate limits are keyed on the client's remote address.
limiter = Limiter(key_func=get_remote_address)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(GZipMiddleware, minimum_size=1000)


def _ascii_filename(title, max_len: int, fallback: str) -> str:
    """Return an ASCII-only file name stem derived from *title*.

    Keeps ASCII alphanumerics plus space, dot, underscore and hyphen, trims
    to *max_len* characters, and substitutes *fallback* when nothing is left
    (including when *title* is ``None`` or empty).
    """
    kept = ''.join(
        c for c in (title or '')
        if c.isascii() and (c.isalnum() or c in ' ._-')
    )
    return kept[:max_len].strip() or fallback


def _encoded_title(title, max_len: int, fallback: str) -> str:
    """Return *title* trimmed to *max_len* and fully percent-encoded.

    Every reserved character, including spaces, is encoded so the result is a
    valid RFC 5987 ``ext-value`` and a safe HTTP header value.
    """
    trimmed = (title or '')[:max_len].strip() or fallback
    return urllib.parse.quote(trimmed, safe='')


def _attachment_disposition(
    title, extension: str, *, max_len: int, fallback: str
) -> str:
    """Build a ``Content-Disposition: attachment`` value for *title*.

    Emits both the plain ASCII ``filename`` and the UTF-8 ``filename*`` forms.
    """
    ascii_name = _ascii_filename(title, max_len, fallback)
    encoded_name = _encoded_title(title, max_len, fallback)
    return (
        f'attachment; '
        f'filename="{ascii_name}.{extension}"; '
        f"filename*=UTF-8''{encoded_name}.{extension}"
    )


@app.get('/', response_class=HTMLResponse)
async def home():
    """Serve the landing page stored at ``HOME_HTML``."""
    with open(HOME_HTML, 'r', encoding='utf-8') as f:
        return HTMLResponse(f.read())


@app.post('/validate')
@limiter.limit('20/minute')
async def validate(request: Request):
    """Validate the ``url`` field of a JSON body.

    Always answers with ``{'ok': bool, 'error': ...}``. Malformed bodies are
    reported through that same shape rather than as a server error.
    """
    try:
        body = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and undecodable payloads.
        return JSONResponse({'ok': False, 'error': 'Invalid JSON body.'})

    raw_url = body.get('url') if isinstance(body, dict) else None
    url = raw_url.strip() if isinstance(raw_url, str) else ''
    if not url:
        return JSONResponse({'ok': False, 'error': 'No URL provided.'})

    ok, err = await YTVideoInfo(url).validate()
    return JSONResponse({'ok': ok, 'error': err})


@app.get('/watch', response_class=HTMLResponse)
async def watch(request: Request, url: str = ''):
    """Render the player page for *url*; redirect home when no URL is given.

    Raises:
        HTTPException: 500 when the video metadata cannot be fetched.
    """
    if not url:
        return HTMLResponse('', status_code=302, headers={'Location': '/'})

    info_obj = YTVideoInfo(url)
    try:
        info = await info_obj.fetch()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    formats = info_obj.get_formats(info)
    subs = info_obj.get_subtitles(info)
    meta = info_obj.summary(info)

    # NOTE(review): a bare jinja2.Template renders with autoescaping disabled,
    # and `meta` carries remote metadata. Confirm the template escapes what it
    # interpolates, or switch to the autoescaping `templates` environment.
    with open(PLAYER_HTML, 'r', encoding='utf-8') as f:
        template = Template(f.read())

    return HTMLResponse(template.render(
        meta=meta,
        formats=formats,
        default_format=formats[0]['id'] if formats else 'best',
        subtitles=subs,
        youtube_url=url,
    ))


@app.get('/stream')
@limiter.limit('30/minute')
async def stream(request: Request, url: str, format_id: str = 'best'):
    """Stream *url* in the requested format as ``video/mp4``."""
    streamer = YTVideoStream(url, format_id)
    return StreamingResponse(
        streamer.generate(),
        media_type='video/mp4',
    )


@app.get('/subtitles')
async def subtitles(url: str, lang: str, auto: str = '0'):
    """Proxy the WebVTT subtitle track for *lang*.

    ``auto == '1'`` is forwarded as the boolean third argument of
    ``get_subtitle_vtt_url``. Returns an empty VTT body when no track URL is
    found.

    Raises:
        HTTPException: 500 when metadata lookup fails, 502 when the subtitle
            file itself cannot be downloaded.
    """
    info_obj = YTVideoInfo(url)
    try:
        info = await info_obj.fetch()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    vtt_url = info_obj.get_subtitle_vtt_url(info, lang, auto == '1')
    if not vtt_url:
        return Response('', media_type='text/vtt')

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(vtt_url) as r:
                # Do not relay an upstream error page as if it were subtitles.
                r.raise_for_status()
                text = await r.text()
    except aiohttp.ClientError as e:
        raise HTTPException(
            status_code=502, detail=f'Could not fetch subtitles: {e}'
        ) from e

    return Response(text, media_type='text/vtt; charset=utf-8')


@app.get('/download')
@limiter.limit('5/minute')
async def download(request: Request, url: str, format_id: str = 'best'):
    """Send *url* as an MP4 attachment, recording progress under *url*.

    Raises:
        HTTPException: 400 for an empty URL, 500 when the download URLs
            cannot be resolved.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')

    try:
        downloader = YTVideoDownloader(url, format_id)
        # Only the title is used here; the call also serves as an early
        # failure check before the response starts streaming.
        video_url, audio_url, title = await downloader.get_urls()
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f'Could not fetch video info: {e}'
        ) from e

    def progress_callback(total_bytes: int):
        mb = total_bytes / (1024 * 1024)
        set_progress(url, {
            'mb_received': round(mb, 1),
            'speed': f'{mb:.1f} MB received',
        })

    return StreamingResponse(
        downloader.generate(progress_callback=progress_callback),
        media_type='video/mp4',
        headers={
            'Content-Disposition': _attachment_disposition(
                title, 'mp4', max_len=80, fallback='video'
            ),
            'X-Video-Title': _encoded_title(title, 80, 'video'),
            'Cache-Control': 'no-cache',
        }
    )


@app.get('/download/progress')
async def download_progress(url: str):
    """Return the progress record stored for *url*."""
    return JSONResponse(get_progress(url))


@app.get('/info')
async def info(url: str):
    """Return JSON metadata for a single video or a playlist.

    Raises:
        HTTPException: 400 for an empty URL, 500 when the lookup fails.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')

    # Simple check — playlist URLs contain 'list='
    is_playlist = 'list=' in url and 'watch?v=' not in url

    try:
        if is_playlist:
            playlist = YTPlaylist(url)
            raw = await playlist.fetch_flat()
            return JSONResponse({
                'type': 'playlist',
                **playlist.summary(raw)
            })

        info_obj = YTVideoInfo(url)
        raw = await info_obj.fetch()
        return JSONResponse({
            'type': 'video',
            'title': raw.get('title'),
            'uploader': raw.get('uploader'),
            'duration': raw.get('duration'),
            'formats': info_obj.get_formats(raw),
            'subtitles': info_obj.get_subtitles(raw),
        })
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get('/playlist/info')
async def playlist_info(url: str):
    """Return the playlist summary for *url* as JSON.

    Raises:
        HTTPException: 400 for an empty URL, 500 when the lookup fails.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')

    try:
        playlist = YTPlaylist(url)
        info = await playlist.fetch_flat()
        summary = playlist.summary(info)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return JSONResponse(summary)


@app.get('/playlist/stream')
async def playlist_stream(
    request: Request,
    url: str,
    index: int = 0,  # which video in the playlist to stream (0-based)
    format_id: str = 'best'
):
    """Stream one playlist entry, selected by its 0-based *index*.

    Raises:
        HTTPException: 400 for an empty URL, 500 when the playlist cannot be
            fetched, 404 when *index* is outside the playlist.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')

    try:
        playlist = YTPlaylist(url)
        info = await playlist.fetch_flat()
        entries = playlist.get_entries(info)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Reject negative indices too: Python would otherwise wrap them around
    # and silently stream an entry counted from the end.
    if not 0 <= index < len(entries):
        raise HTTPException(status_code=404, detail='Video index out of range.')

    entry = entries[index]
    video_url = entry['url']

    # Reuse YTVideoStream for the individual video
    streamer = YTVideoStream(video_url, format_id)

    return StreamingResponse(
        streamer.generate(),
        media_type='video/mp4',
        headers={
            'X-Playlist-Index': str(index),
            'X-Playlist-Total': str(len(entries)),
            # A missing title must not break the response headers.
            'X-Video-Title': urllib.parse.quote(entry['title'] or ''),
        }
    )


@app.get('/playlist/download')
@limiter.limit('2/minute')  # stricter limit — downloads entire playlist
async def playlist_download(
    request: Request,
    url: str,
    format_id: str = 'best'
):
    """Send the whole playlist as a ZIP attachment, recording progress.

    Raises:
        HTTPException: 400 for an empty URL, 500 when the playlist cannot be
            fetched.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')

    try:
        # Quick flat fetch just to get the playlist title
        playlist = YTPlaylist(url)
        info = await playlist.fetch_flat()
        # `or` also covers a title that is present but None/empty.
        title = info.get('title') or 'playlist'
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    def progress_callback(current: int, total: int, video_title: str):
        # Percent counts fully finished videos, hence `current - 1`.
        percent = round((current - 1) / total * 100, 1) if total else 0.0
        set_progress(url, {
            'current': current,
            'total': total,
            'video_title': video_title,
            'percent': percent,
        })

    return StreamingResponse(
        playlist.generate_zip(
            format_id=format_id,
            progress_callback=progress_callback,
        ),
        media_type='application/zip',
        headers={
            'Content-Disposition': _attachment_disposition(
                title, 'zip', max_len=60, fallback='playlist'
            ),
            'Cache-Control': 'no-cache',
        }
    )


@app.get('/playlist/watch', response_class=HTMLResponse)
async def playlist_watch(request: Request, url: str = ''):
    """Render ``Playlist.html`` for *url*; redirect home when no URL is given.

    Raises:
        HTTPException: 500 when the playlist cannot be fetched.
    """
    if not url:
        return HTMLResponse('', status_code=302, headers={'Location': '/'})

    try:
        playlist = YTPlaylist(url)
        info = await playlist.fetch_flat()
        summary = playlist.summary(info)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return templates.TemplateResponse('Playlist.html', {
        'request': request,
        'title': summary['title'],
        'uploader': summary['uploader'],
        'video_count': summary['video_count'],
        'entries': summary['entries'],
        'playlist_url': url,
    })


@app.get('/playlist/progress')
async def playlist_progress(url: str):
    """Return the progress record stored for *url*."""
    return JSONResponse(get_progress(url))


if __name__ == '__main__':
    import uvicorn

    print(f'Local URL : http://localhost:{PORT}')
    # Pass the app object rather than the 'app:app' import string: with
    # reload disabled this avoids importing the module a second time (and
    # building a second app) and no longer depends on the file's name.
    uvicorn.run(
        app,
        host=HOST,
        port=PORT,
        log_level='warning',
        reload=False,
    )