298 lines
9.5 KiB
Python
298 lines
9.5 KiB
Python
import aiohttp
|
|
from fastapi import FastAPI, Request, Response, HTTPException
|
|
from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse
|
|
from YTProcessing import *
|
|
from _config import *
|
|
import urllib.parse
|
|
from slowapi import Limiter, _rate_limit_exceeded_handler
|
|
from fastapi.middleware.gzip import GZipMiddleware
|
|
from slowapi.util import get_remote_address
|
|
from slowapi.errors import RateLimitExceeded
|
|
from dotenv import load_dotenv
|
|
import secrets
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
templates = Jinja2Templates(directory='templates')
|
|
|
|
load_dotenv()
|
|
|
|
PORT = int(os.environ.get('PORT', 5000))
|
|
HOST = os.environ.get('HOST', '0.0.0.0')
|
|
SECRET_KEY = os.environ.get('SECRET_KEY', secrets.token_hex(32))
|
|
|
|
limiter = Limiter(key_func=get_remote_address)
|
|
app = FastAPI()
|
|
app.state.limiter = limiter
|
|
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
|
|
app.add_middleware(GZipMiddleware, minimum_size=1000)
|
|
|
|
@app.get('/', response_class=HTMLResponse)
|
|
async def home():
|
|
with open(HOME_HTML, 'r', encoding='utf-8') as f:
|
|
return HTMLResponse(f.read())
|
|
|
|
@app.post('/validate')
|
|
@limiter.limit('20/minute')
|
|
async def validate(request: Request):
|
|
body = await request.json()
|
|
url = (body or {}).get('url', '').strip()
|
|
if not url:
|
|
return JSONResponse({'ok': False, 'error': 'No URL provided.'})
|
|
ok, err = await YTVideoInfo(url).validate()
|
|
return JSONResponse({'ok': ok, 'error': err})
|
|
|
|
|
|
@app.get('/watch', response_class=HTMLResponse)
|
|
async def watch(request: Request, url: str = ''):
|
|
if not url:
|
|
return HTMLResponse('', status_code=302, headers={'Location': '/'})
|
|
|
|
info_obj = YTVideoInfo(url)
|
|
info = await info_obj.fetch()
|
|
formats = info_obj.get_formats(info)
|
|
subs = info_obj.get_subtitles(info)
|
|
meta = info_obj.summary(info)
|
|
|
|
from jinja2 import Template
|
|
|
|
with open(PLAYER_HTML, 'r', encoding='utf-8') as f:
|
|
template = Template(f.read())
|
|
|
|
return HTMLResponse(template.render(
|
|
meta=meta,
|
|
formats=formats,
|
|
default_format=formats[0]['id'] if formats else 'best',
|
|
subtitles=subs,
|
|
youtube_url=url,
|
|
))
|
|
|
|
@app.get('/stream')
|
|
@limiter.limit('30/minute')
|
|
async def stream(request: Request, url: str, format_id: str = 'best'):
|
|
streamer = YTVideoStream(url, format_id)
|
|
return StreamingResponse(
|
|
streamer.generate(),
|
|
media_type='video/mp4',
|
|
)
|
|
|
|
@app.get('/subtitles')
|
|
async def subtitles(url: str, lang: str, auto: str = '0'):
|
|
info_obj = YTVideoInfo(url)
|
|
info = await info_obj.fetch()
|
|
vtt_url = info_obj.get_subtitle_vtt_url(info, lang, auto == '1')
|
|
|
|
if not vtt_url:
|
|
return Response('', media_type='text/vtt')
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(vtt_url) as r:
|
|
text = await r.text()
|
|
|
|
return Response(text, media_type='text/vtt; charset=utf-8')
|
|
|
|
|
|
|
|
@app.get('/download')
|
|
@limiter.limit('5/minute')
|
|
async def download(request: Request, url: str, format_id: str = 'best'):
|
|
if not url:
|
|
raise HTTPException(status_code=400, detail='No URL provided.')
|
|
|
|
try:
|
|
downloader = YTVideoDownloader(url, format_id)
|
|
video_url, audio_url, title = await downloader.get_urls()
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f'Could not fetch video info: {str(e)}')
|
|
|
|
ascii_title = "".join(
|
|
c for c in title if c.isascii() and (c.isalnum() or c in ' ._-')
|
|
)[:80].strip() or 'video'
|
|
|
|
utf8_title = urllib.parse.quote(title[:80].strip(), safe='._- ')
|
|
|
|
def progress_callback(total_bytes: int):
|
|
mb = total_bytes / (1024 * 1024)
|
|
set_progress(url, {
|
|
'mb_received': round(mb, 1),
|
|
'speed': f'{mb:.1f} MB received',
|
|
})
|
|
|
|
return StreamingResponse(
|
|
downloader.generate(progress_callback=progress_callback),
|
|
media_type='video/mp4',
|
|
headers={
|
|
'Content-Disposition': (
|
|
f'attachment; '
|
|
f'filename="{ascii_title}.mp4"; '
|
|
f"filename*=UTF-8''{utf8_title}.mp4"
|
|
),
|
|
'X-Video-Title': utf8_title,
|
|
'Cache-Control': 'no-cache',
|
|
}
|
|
)
|
|
@app.get('/download/progress')
|
|
async def download_progress(url: str):
|
|
return JSONResponse(get_progress(url))
|
|
|
|
@app.get('/info')
|
|
async def info(url: str):
|
|
if not url:
|
|
raise HTTPException(status_code=400, detail='No URL provided.')
|
|
|
|
# Simple check — playlist URLs contain 'list='
|
|
is_playlist = 'list=' in url and 'watch?v=' not in url
|
|
|
|
try:
|
|
if is_playlist:
|
|
playlist = YTPlaylist(url)
|
|
raw = await playlist.fetch_flat()
|
|
return JSONResponse({
|
|
'type': 'playlist',
|
|
**playlist.summary(raw)
|
|
})
|
|
else:
|
|
info_obj = YTVideoInfo(url)
|
|
raw = await info_obj.fetch()
|
|
return JSONResponse({
|
|
'type': 'video',
|
|
'title': raw.get('title'),
|
|
'uploader': raw.get('uploader'),
|
|
'duration': raw.get('duration'),
|
|
'formats': info_obj.get_formats(raw),
|
|
'subtitles': info_obj.get_subtitles(raw),
|
|
})
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.get('/playlist/info')
|
|
async def playlist_info(url: str):
|
|
if not url:
|
|
raise HTTPException(status_code=400, detail='No URL provided.')
|
|
|
|
try:
|
|
playlist = YTPlaylist(url)
|
|
info = await playlist.fetch_flat()
|
|
summary = playlist.summary(info)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
return JSONResponse(summary)
|
|
|
|
|
|
@app.get('/playlist/stream')
|
|
async def playlist_stream(
|
|
request: Request,
|
|
url: str,
|
|
index: int = 0, # which video in the playlist to stream (0-based)
|
|
format_id: str = 'best'
|
|
):
|
|
if not url:
|
|
raise HTTPException(status_code=400, detail='No URL provided.')
|
|
|
|
try:
|
|
playlist = YTPlaylist(url)
|
|
info = await playlist.fetch_flat()
|
|
entries = playlist.get_entries(info)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
if index >= len(entries):
|
|
raise HTTPException(status_code=404, detail='Video index out of range.')
|
|
|
|
entry = entries[index]
|
|
video_url = entry['url']
|
|
|
|
# Reuse YTVideoStream for the individual video
|
|
streamer = YTVideoStream(video_url, format_id)
|
|
|
|
return StreamingResponse(
|
|
streamer.generate(),
|
|
media_type='video/mp4',
|
|
headers={
|
|
'X-Playlist-Index': str(index),
|
|
'X-Playlist-Total': str(len(entries)),
|
|
'X-Video-Title': urllib.parse.quote(entry['title']),
|
|
}
|
|
)
|
|
|
|
|
|
@app.get('/playlist/download')
|
|
@limiter.limit('2/minute') # stricter limit — downloads entire playlist
|
|
async def playlist_download(
|
|
request: Request,
|
|
url: str,
|
|
format_id: str = 'best'
|
|
):
|
|
if not url:
|
|
raise HTTPException(status_code=400, detail='No URL provided.')
|
|
|
|
try:
|
|
# Quick flat fetch just to get the playlist title
|
|
playlist = YTPlaylist(url)
|
|
info = await playlist.fetch_flat()
|
|
title = info.get('title', 'playlist')
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
safe_title = "".join(
|
|
c for c in title if c.isascii() and (c.isalnum() or c in ' ._-')
|
|
)[:60].strip() or 'playlist'
|
|
|
|
utf8_title = urllib.parse.quote(title[:60].strip(), safe='._- ')
|
|
|
|
def progress_callback(current: int, total: int, video_title: str):
|
|
set_progress(url, {
|
|
'current': current,
|
|
'total': total,
|
|
'video_title': video_title,
|
|
'percent': round((current - 1) / total * 100, 1),
|
|
})
|
|
|
|
return StreamingResponse(
|
|
playlist.generate_zip(
|
|
format_id=format_id,
|
|
progress_callback=progress_callback,
|
|
),
|
|
media_type='application/zip',
|
|
headers={
|
|
'Content-Disposition': (
|
|
f'attachment; '
|
|
f'filename="{safe_title}.zip"; '
|
|
f"filename*=UTF-8''{utf8_title}.zip"
|
|
),
|
|
'Cache-Control': 'no-cache',
|
|
}
|
|
)
|
|
|
|
@app.get('/playlist/watch', response_class=HTMLResponse)
|
|
async def playlist_watch(request: Request, url: str = ''):
|
|
if not url:
|
|
return HTMLResponse('', status_code=302, headers={'Location': '/'})
|
|
|
|
playlist = YTPlaylist(url)
|
|
info = await playlist.fetch_flat()
|
|
summary = playlist.summary(info)
|
|
|
|
return templates.TemplateResponse('Playlist.html', {
|
|
'request': request,
|
|
'title': summary['title'],
|
|
'uploader': summary['uploader'],
|
|
'video_count': summary['video_count'],
|
|
'entries': summary['entries'],
|
|
'playlist_url': url,
|
|
})
|
|
|
|
@app.get('/playlist/progress')
|
|
async def playlist_progress(url: str):
|
|
return JSONResponse(get_progress(url))
|
|
|
|
if __name__ == '__main__':
|
|
import uvicorn
|
|
print(f'Local URL : http://localhost:{PORT}')
|
|
uvicorn.run(
|
|
'app:app',
|
|
host=HOST,
|
|
port=PORT,
|
|
log_level='warning',
|
|
reload=False,
|
|
) |