Files
ytplayer/app.py

298 lines
9.5 KiB
Python

import aiohttp
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse
from YTProcessing import *
from _config import *
import urllib.parse
from slowapi import Limiter, _rate_limit_exceeded_handler
from fastapi.middleware.gzip import GZipMiddleware
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from dotenv import load_dotenv
import secrets
from fastapi.templating import Jinja2Templates
# Jinja2 template loader rooted at the local 'templates' directory
# (used by the /playlist/watch page).
templates = Jinja2Templates(directory='templates')
# Load variables from a .env file before the settings below are read.
load_dotenv()
# NOTE(review): `os` is not imported explicitly in this file; it is presumably
# re-exported by one of the star imports above — confirm.
# Listening port and interface, overridable through the environment.
PORT = int(os.environ.get('PORT', 5000))
HOST = os.environ.get('HOST', '0.0.0.0')
# Falls back to a fresh random hex token when SECRET_KEY is not set in the
# environment, so the fallback value differs on every process start.
SECRET_KEY = os.environ.get('SECRET_KEY', secrets.token_hex(32))
# Rate limiter keyed by the client's remote address.
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
# slowapi looks the limiter up on app.state; the handler below converts
# RateLimitExceeded into an HTTP response.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Compress responses of at least 1000 bytes.
app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.get('/', response_class=HTMLResponse)
async def home():
    """Serve the landing page read from the file at ``HOME_HTML``."""
    with open(HOME_HTML, 'r', encoding='utf-8') as page:
        markup = page.read()
    return HTMLResponse(markup)
@app.post('/validate')
@limiter.limit('20/minute')
async def validate(request: Request):
    """Validate a video URL posted as JSON ``{"url": "..."}``.

    Always answers with a JSON object ``{'ok': bool, 'error': ...}``.
    Malformed JSON, a non-object body, or a missing/non-string/blank ``url``
    are reported through that payload instead of surfacing as a server error.
    """
    try:
        body = await request.json()
    except ValueError:
        # JSON decode errors (and invalid UTF-8) are ValueError subclasses.
        return JSONResponse({'ok': False, 'error': 'Invalid JSON body.'})

    # Only a JSON object can carry the 'url' key; lists/scalars have no .get().
    url = body.get('url') if isinstance(body, dict) else None
    if not isinstance(url, str) or not url.strip():
        return JSONResponse({'ok': False, 'error': 'No URL provided.'})

    ok, err = await YTVideoInfo(url.strip()).validate()
    return JSONResponse({'ok': ok, 'error': err})
@app.get('/watch', response_class=HTMLResponse)
async def watch(request: Request, url: str = ''):
    """Render the player page for a single video.

    Redirects to ``/`` when no ``url`` is given. Otherwise fetches the video
    info and renders the template stored at ``PLAYER_HTML`` with the video
    summary, available formats, subtitles and the original URL.
    """
    if not url:
        return HTMLResponse('', status_code=302, headers={'Location': '/'})

    info_obj = YTVideoInfo(url)
    info = await info_obj.fetch()
    formats = info_obj.get_formats(info)
    subs = info_obj.get_subtitles(info)
    meta = info_obj.summary(info)

    from jinja2 import Template
    with open(PLAYER_HTML, 'r', encoding='utf-8') as f:
        # The template receives the caller-supplied URL and remote metadata,
        # so HTML autoescaping is enabled (a bare Template defaults to off).
        # NOTE(review): values interpolated inside <script> blocks should go
        # through the `tojson` filter — confirm against the template.
        template = Template(f.read(), autoescape=True)

    return HTMLResponse(template.render(
        meta=meta,
        formats=formats,
        # Fall back to 'best' when no formats were returned.
        default_format=formats[0]['id'] if formats else 'best',
        subtitles=subs,
        youtube_url=url,
    ))
@app.get('/stream')
@limiter.limit('30/minute')
async def stream(request: Request, url: str, format_id: str = 'best'):
    """Stream the output of ``YTVideoStream(url, format_id).generate()``.

    The response is sent as ``video/mp4``.

    Raises:
        HTTPException: 400 when ``url`` is empty (e.g. ``?url=``), matching
            the check performed by the download endpoint.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')
    streamer = YTVideoStream(url, format_id)
    return StreamingResponse(
        streamer.generate(),
        media_type='video/mp4',
    )
@app.get('/subtitles')
async def subtitles(url: str, lang: str, auto: str = '0'):
    """Return the subtitle track for ``url`` in language ``lang`` as VTT text.

    ``auto == '1'`` is forwarded as ``True`` to ``get_subtitle_vtt_url``.
    An empty ``text/vtt`` body is returned when no track URL is found.

    Raises:
        HTTPException: 502 when the subtitle file cannot be downloaded
            (network failure or non-success upstream status).
    """
    info_obj = YTVideoInfo(url)
    info = await info_obj.fetch()
    vtt_url = info_obj.get_subtitle_vtt_url(info, lang, auto == '1')
    if not vtt_url:
        return Response('', media_type='text/vtt')

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(vtt_url) as r:
                # Without this check an upstream error page would be served
                # to the player as if it were a subtitle file.
                r.raise_for_status()
                text = await r.text()
    except aiohttp.ClientError as e:
        raise HTTPException(
            status_code=502,
            detail=f'Could not fetch subtitles: {e}',
        ) from e

    return Response(text, media_type='text/vtt; charset=utf-8')
@app.get('/download')
@limiter.limit('5/minute')
async def download(request: Request, url: str, format_id: str = 'best'):
    """Stream a video as an ``.mp4`` attachment.

    The file name is derived from the video title: an ASCII-only fallback in
    ``filename`` and a percent-encoded UTF-8 form in ``filename*``. Progress
    is recorded through ``set_progress`` under the request ``url``.

    Raises:
        HTTPException: 400 when ``url`` is empty; 500 when the media URLs or
            title cannot be resolved.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')
    try:
        downloader = YTVideoDownloader(url, format_id)
        video_url, audio_url, title = await downloader.get_urls()
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f'Could not fetch video info: {str(e)}',
        ) from e

    # A missing title must not crash the name sanitising below.
    title = title or 'video'

    # Header-safe fallback: ASCII alphanumerics plus a few punctuation marks.
    ascii_title = "".join(
        c for c in title if c.isascii() and (c.isalnum() or c in ' ._-')
    )[:80].strip() or 'video'
    utf8_title = urllib.parse.quote(title[:80].strip(), safe='._- ')
    # RFC 5987 ext-value does not allow raw spaces, so encode them for
    # filename*; X-Video-Title keeps its original encoding.
    ext_title = utf8_title.replace(' ', '%20')

    def progress_callback(total_bytes: int):
        """Record how many megabytes have been received so far."""
        mb = total_bytes / (1024 * 1024)
        set_progress(url, {
            'mb_received': round(mb, 1),
            'speed': f'{mb:.1f} MB received',
        })

    return StreamingResponse(
        downloader.generate(progress_callback=progress_callback),
        media_type='video/mp4',
        headers={
            'Content-Disposition': (
                f'attachment; '
                f'filename="{ascii_title}.mp4"; '
                f"filename*=UTF-8''{ext_title}.mp4"
            ),
            'X-Video-Title': utf8_title,
            'Cache-Control': 'no-cache',
        }
    )
@app.get('/download/progress')
async def download_progress(url: str):
    """Return, as JSON, whatever ``get_progress`` reports for ``url``."""
    snapshot = get_progress(url)
    return JSONResponse(snapshot)
@app.get('/info')
async def info(url: str):
    """Describe a video or playlist URL as JSON.

    A URL containing ``list=`` but not ``watch?v=`` is treated as a playlist
    and answered with ``{'type': 'playlist', ...summary}``; anything else is
    answered with the video's title, uploader, duration, formats and
    subtitles.

    Raises:
        HTTPException: 400 when ``url`` is empty; 500 on any failure while
            fetching or serialising the data.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')

    # Simple check — playlist URLs contain 'list=' and are not watch links.
    treat_as_playlist = 'watch?v=' not in url and 'list=' in url

    try:
        if treat_as_playlist:
            playlist = YTPlaylist(url)
            raw = await playlist.fetch_flat()
            payload = {'type': 'playlist'}
            payload.update(playlist.summary(raw))
            return JSONResponse(payload)

        info_obj = YTVideoInfo(url)
        raw = await info_obj.fetch()
        payload = {
            'type': 'video',
            'title': raw.get('title'),
            'uploader': raw.get('uploader'),
            'duration': raw.get('duration'),
            'formats': info_obj.get_formats(raw),
            'subtitles': info_obj.get_subtitles(raw),
        }
        return JSONResponse(payload)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get('/playlist/info')
async def playlist_info(url: str):
    """Return the playlist summary for ``url`` as JSON.

    Raises:
        HTTPException: 400 when ``url`` is empty; 500 when fetching or
            summarising the playlist fails.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')

    try:
        playlist = YTPlaylist(url)
        summary = playlist.summary(await playlist.fetch_flat())
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    return JSONResponse(summary)
@app.get('/playlist/stream')
async def playlist_stream(
    request: Request,
    url: str,
    index: int = 0,  # which video in the playlist to stream (0-based)
    format_id: str = 'best'
):
    """Stream one video of a playlist as ``video/mp4``.

    The playlist position, the total number of entries and the
    percent-encoded video title are exposed through ``X-Playlist-Index``,
    ``X-Playlist-Total`` and ``X-Video-Title`` response headers.

    Raises:
        HTTPException: 400 when ``url`` is empty; 500 when the playlist
            cannot be fetched; 404 when ``index`` is negative or past the
            last entry.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')
    try:
        playlist = YTPlaylist(url)
        info = await playlist.fetch_flat()
        entries = playlist.get_entries(info)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    # Reject negative indices too: Python would otherwise count from the end
    # (-1 -> last video) or raise IndexError for large negative values.
    if not 0 <= index < len(entries):
        raise HTTPException(status_code=404, detail='Video index out of range.')

    entry = entries[index]
    video_url = entry['url']

    # Reuse YTVideoStream for the individual video
    streamer = YTVideoStream(video_url, format_id)
    return StreamingResponse(
        streamer.generate(),
        media_type='video/mp4',
        headers={
            'X-Playlist-Index': str(index),
            'X-Playlist-Total': str(len(entries)),
            # quote() rejects None, so an entry without a title maps to ''.
            'X-Video-Title': urllib.parse.quote(entry['title'] or ''),
        }
    )
@app.get('/playlist/download')
@limiter.limit('2/minute')  # stricter limit — downloads entire playlist
async def playlist_download(
    request: Request,
    url: str,
    format_id: str = 'best'
):
    """Stream a whole playlist as a ``.zip`` attachment.

    The archive name is derived from the playlist title: an ASCII-only
    fallback in ``filename`` and a percent-encoded UTF-8 form in
    ``filename*``. Progress is recorded through ``set_progress`` under the
    request ``url``.

    Raises:
        HTTPException: 400 when ``url`` is empty; 500 when the playlist
            cannot be fetched.
    """
    if not url:
        raise HTTPException(status_code=400, detail='No URL provided.')
    try:
        # Quick flat fetch just to get the playlist title
        playlist = YTPlaylist(url)
        info = await playlist.fetch_flat()
        # `or` also covers a 'title' key that is present but None/empty.
        title = info.get('title') or 'playlist'
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Header-safe fallback: ASCII alphanumerics plus a few punctuation marks.
    safe_title = "".join(
        c for c in title if c.isascii() and (c.isalnum() or c in ' ._-')
    )[:60].strip() or 'playlist'
    # RFC 5987 ext-value does not allow raw spaces, so they are encoded too.
    utf8_title = urllib.parse.quote(title[:60].strip(), safe='._-')

    def progress_callback(current: int, total: int, video_title: str):
        """Record which video is being processed and the share completed."""
        # Videos before `current` are done; guard against an empty playlist.
        percent = round((current - 1) / total * 100, 1) if total else 0.0
        set_progress(url, {
            'current': current,
            'total': total,
            'video_title': video_title,
            'percent': percent,
        })

    return StreamingResponse(
        playlist.generate_zip(
            format_id=format_id,
            progress_callback=progress_callback,
        ),
        media_type='application/zip',
        headers={
            'Content-Disposition': (
                f'attachment; '
                f'filename="{safe_title}.zip"; '
                f"filename*=UTF-8''{utf8_title}.zip"
            ),
            'Cache-Control': 'no-cache',
        }
    )
@app.get('/playlist/watch', response_class=HTMLResponse)
async def playlist_watch(request: Request, url: str = ''):
    """Render ``Playlist.html`` for the playlist at ``url``.

    Redirects to ``/`` when no ``url`` is given. The template context holds
    the request, the summary's title, uploader, video count and entries, and
    the original playlist URL.
    """
    if not url:
        return HTMLResponse('', status_code=302, headers={'Location': '/'})

    playlist = YTPlaylist(url)
    summary = playlist.summary(await playlist.fetch_flat())

    context = {'request': request}
    # Copy the summary fields the template expects, in a fixed order.
    for field in ('title', 'uploader', 'video_count', 'entries'):
        context[field] = summary[field]
    context['playlist_url'] = url

    return templates.TemplateResponse('Playlist.html', context)
@app.get('/playlist/progress')
async def playlist_progress(url: str):
    """Return, as JSON, whatever ``get_progress`` reports for ``url``."""
    snapshot = get_progress(url)
    return JSONResponse(snapshot)
if __name__ == '__main__':
    # Script entry point: announce the local address, then start uvicorn
    # with the configured host and port.
    import uvicorn

    banner = f'Local URL : http://localhost:{PORT}'
    print(banner)

    server_options = {
        'host': HOST,
        'port': PORT,
        'log_level': 'warning',
        'reload': False,
    }
    uvicorn.run('app:app', **server_options)