使用 FastAPI 和 yt-dlp 构建 YouTube 下载器

使用 FastAPI 和 yt-dlp 构建 YouTube 下载器

在这份综合指南中,我们将使用 FastAPI 和 yt-dlp 构建一个现代化的 YouTube 视频下载器。我们的应用程序将具有简洁的 Web 界面、实时下载进度跟踪和强大的错误处理功能,以应对 YouTube 不断发展的反爬虫措施。

我们将构建什么

  • Web 界面:使用 Bootstrap 构建的简洁、响应式 UI
  • 实时进度跟踪:实时下载进度更新
  • 格式选择:从各种视频/音频质量选项中选择
  • 任务管理:查看、监控和管理下载任务
  • 错误处理:处理 YouTube 的限制
  • 文件管理:安全的文件服务和下载

使用的技术

  • 后端:FastAPI (Python)
  • 视频处理:yt-dlp
  • 数据库:SQLite
  • 前端:HTML、CSS、JavaScript、Bootstrap

前置要求

在开始之前,请确保你有:

1
2
3
4
5
# Python 3.8+
python --version

# Required packages
pip install fastapi uvicorn yt-dlp sqlalchemy python-multipart jinja2

项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
yt-dlp-web/
├── app/
│ ├── main.py # FastAPI application
│ ├── models.py # Database models
│ ├── database.py # Database configuration
│ └── youtube_downloader.py # Core download logic
├── static/
│ ├── script.js # Frontend JavaScript
│ └── style.css # Custom styles
├── templates/
│ └── index.html # Main HTML template
├── downloads/ # Downloaded files directory
└── requirements.txt # Python dependencies

数据库模型

让我们从定义数据库模型开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# app/models.py
from sqlalchemy import Column, Integer, String, Float, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
from enum import Enum

# Declarative base class that the ORM models in this module subclass.
Base = declarative_base()

class TaskStatus(str, Enum):
    """Lifecycle states of a download task.

    Members also inherit from ``str``, so each one compares equal to its
    lowercase string value.
    """

    # Task has been created but not started.
    PENDING = "pending"
    # Transfer is in progress.
    DOWNLOADING = "downloading"
    # Task finished successfully.
    COMPLETED = "completed"
    # Task ended with an error.
    FAILED = "failed"

class DownloadTask(Base):
    """ORM model for one row of the ``download_tasks`` table."""

    __tablename__ = "download_tasks"

    # Identity and request data.
    id = Column(Integer, primary_key=True, index=True)
    url = Column(String, nullable=False)
    title = Column(String, nullable=True)

    # Progress tracking; new rows start as PENDING with 0.0 progress.
    status = Column(String, default=TaskStatus.PENDING)
    progress = Column(Float, default=0.0)

    # Outcome fields, all optional.
    file_path = Column(String, nullable=True)
    file_size = Column(Integer, nullable=True)
    error_message = Column(Text, nullable=True)
    format_id = Column(String, nullable=True)

    # Timestamps; ``updated_at`` is refreshed on every UPDATE.
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())

    def to_dict(self):
        """Return the row as a plain dict with ISO-8601 timestamp strings.

        Timestamps that are not set are emitted as ``None``.
        """
        plain_fields = (
            "id",
            "url",
            "title",
            "status",
            "progress",
            "file_path",
            "file_size",
            "error_message",
            "format_id",
        )
        payload = {name: getattr(self, name) for name in plain_fields}

        # Datetimes are converted to strings; key order matches the columns.
        for stamp_name in ("created_at", "updated_at"):
            moment = getattr(self, stamp_name)
            payload[stamp_name] = moment.isoformat() if moment else None

        return payload

YouTube 下载类

我们应用程序的核心是 YouTube 下载器类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# app/youtube_downloader.py
import yt_dlp
import os
import threading
from typing import Dict, Any, Callable, Optional
from models import TaskStatus

class YouTubeDownloader:
    """Wrap yt-dlp to look up video metadata and download in background threads.

    Args:
        download_dir: Directory that downloaded files are written into.
    """

    def __init__(self, download_dir: str = "./downloads"):
        self.download_dir = download_dir

    def get_video_info(self, url: str) -> Dict[str, Any]:
        """Extract video metadata without downloading anything.

        Args:
            url: Address of the video page.

        Returns:
            A dict with ``title``, ``duration``, ``uploader``, ``view_count``,
            ``thumbnail``, ``webpage_url`` and ``formats`` (the output of
            ``_process_formats``).

        Raises:
            ValueError: If yt-dlp returns no information for ``url``.
        """
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'extractor_args': {
                'youtube': {
                    'player_client': ['tv', 'mweb', 'web'],
                }
            },
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            },
            'ignoreerrors': True
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        # With 'ignoreerrors' enabled a failed extraction yields None instead
        # of raising, so fail explicitly rather than with an AttributeError.
        if not info:
            raise ValueError(f"Could not extract video information for: {url}")

        # Process and categorize available formats
        formats = self._process_formats(info.get('formats') or [])

        return {
            'title': info.get('title', 'Unknown'),
            'duration': info.get('duration', 0),
            'uploader': info.get('uploader', 'Unknown'),
            'view_count': info.get('view_count', 0),
            'thumbnail': info.get('thumbnail', ''),
            'webpage_url': info.get('webpage_url', url),
            'formats': formats
        }

    def download_video(self, task_id: int, url: str, format_id: str = 'best',
                       progress_callback: Optional[Callable] = None):
        """Download a video in a separate daemon thread.

        Args:
            task_id: Identifier passed back to ``progress_callback``.
            url: Address of the video page.
            format_id: Requested format; resolved by ``_get_format_selector``.
            progress_callback: Optional callable invoked as
                ``callback(task_id, progress, status)`` while downloading,
                ``callback(task_id, 100.0, COMPLETED, file_path)`` once the
                whole job (including post-processing) is done, and
                ``callback(task_id, 0, FAILED, None, message)`` on error.

        Returns:
            The started ``threading.Thread``.
        """
        def download_thread():
            try:
                # exist_ok avoids the check-then-create race of exists()+makedirs()
                os.makedirs(self.download_dir, exist_ok=True)

                # Configure yt-dlp options
                ydl_opts = {
                    'format': self._get_format_selector(format_id),
                    'outtmpl': os.path.join(self.download_dir, "%(title)s-%(id)s.%(ext)s"),
                    'writeinfojson': True,
                    'merge_output_format': 'mp4',
                    'prefer_ffmpeg': True,
                    'extractor_args': {
                        'youtube': {
                            'player_client': ['tv', 'mweb', 'web'],
                        }
                    },
                    'retries': 3,
                    'fragment_retries': 3,
                    'socket_timeout': 30,
                }

                # Fallback record of files the downloader reported as finished.
                finished_files = []

                def remember_finished(d):
                    if d.get('status') == 'finished' and d.get('filename'):
                        finished_files.append(d['filename'])

                if progress_callback:
                    ydl_opts['progress_hooks'] = [
                        remember_finished,
                        self._progress_hook(task_id, progress_callback),
                    ]

                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(url, download=True)

                if progress_callback:
                    # Report completion only now: the 'finished' hook fires per
                    # downloaded stream, before merging/post-processing, and
                    # names an intermediate file in that case.
                    final_path = self._final_filepath(info)
                    if final_path is None and finished_files:
                        final_path = finished_files[-1]
                    progress_callback(task_id, 100.0, TaskStatus.COMPLETED, final_path)

            except Exception as e:
                # Thread boundary: surface any failure through the callback.
                if progress_callback:
                    progress_callback(task_id, 0, TaskStatus.FAILED, None, str(e))

        thread = threading.Thread(target=download_thread)
        thread.daemon = True
        thread.start()
        return thread

    def _progress_hook(self, task_id: int, progress_callback: Callable):
        """Build a yt-dlp progress hook that forwards progress to the callback.

        The hook reports every update with ``TaskStatus.DOWNLOADING``; the
        final ``COMPLETED`` notification is sent by ``download_video`` after
        yt-dlp has returned.
        """
        def hook(d):
            status = d.get('status')
            if status == 'downloading':
                progress_callback(task_id, self._percent_done(d), TaskStatus.DOWNLOADING)
            elif status == 'finished':
                # One stream is fully transferred; post-processing may follow.
                progress_callback(task_id, 100.0, TaskStatus.DOWNLOADING)

        return hook

    @staticmethod
    def _percent_done(d) -> float:
        """Return download progress in percent (0-100) from a hook dict.

        Size keys may be missing, ``None`` or zero; all of those count as
        "unknown" and yield 0 instead of raising.
        """
        total = d.get('total_bytes') or d.get('total_bytes_estimate')
        if not total:
            return 0
        done = d.get('downloaded_bytes') or 0
        # An estimate can be smaller than the bytes already received.
        return min(done / total * 100, 100.0)

    @staticmethod
    def _final_filepath(info) -> Optional[str]:
        """Return the first ``filepath`` listed in ``info['requested_downloads']``.

        Returns ``None`` when ``info`` is empty or carries no such entry.
        """
        for item in (info or {}).get('requested_downloads') or []:
            path = item.get('filepath')
            if path:
                return path
        return None

    def _get_format_selector(self, format_id: Optional[str]) -> str:
        """Translate a requested format id into a yt-dlp format selector.

        The id is passed through unchanged; an empty or missing id selects
        ``'best'``.
        """
        # NOTE(review): minimal implementation — the listing called this
        # method without defining it. Extend if video-only ids should be
        # combined with an audio stream.
        return format_id or 'best'

    def _process_formats(self, formats: list) -> list:
        """Reduce yt-dlp format dicts to a compact, categorized list.

        Each returned dict has ``format_id``, ``ext``, ``type`` (one of
        ``'video+audio'``, ``'video'``, ``'audio'``), ``resolution``,
        ``height``, ``fps``, ``filesize``, ``format_note``, ``vcodec`` and
        ``acodec``. Entries without a ``format_id`` or without any audio or
        video stream are skipped.
        """
        # NOTE(review): the listing called this method without defining it;
        # the output shape is an assumption — align it with the frontend.
        processed = []
        for fmt in formats:
            format_id = fmt.get('format_id')
            if not format_id:
                continue

            # A codec value of 'none' marks a stream that is absent.
            has_video = fmt.get('vcodec') != 'none'
            has_audio = fmt.get('acodec') != 'none'
            if has_video and has_audio:
                kind = 'video+audio'
            elif has_video:
                kind = 'video'
            elif has_audio:
                kind = 'audio'
            else:
                continue

            processed.append({
                'format_id': format_id,
                'ext': fmt.get('ext'),
                'type': kind,
                'resolution': fmt.get('resolution'),
                'height': fmt.get('height'),
                'fps': fmt.get('fps'),
                'filesize': fmt.get('filesize') or fmt.get('filesize_approx'),
                'format_note': fmt.get('format_note'),
                'vcodec': fmt.get('vcodec'),
                'acodec': fmt.get('acodec'),
            })
        return processed

FastAPI 应用

现在让我们构建 FastAPI 应用程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# app/main.py
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List, Optional
import os

from database import get_db, create_tables, DOWNLOAD_DIR
from models import DownloadTask, TaskStatus
from youtube_downloader import YouTubeDownloader
from pydantic import BaseModel

# Initialize FastAPI app
app = FastAPI(title="YouTube Downloader", description="Modern YouTube video downloader")

# Make sure the database tables exist before any handler queries them.
# NOTE(review): create_tables is imported from database but was never called
# in this module; assumed to take no arguments and to be safe to call
# repeatedly — confirm against database.py.
create_tables()

# Add CORS middleware
# Credentials stay disabled: wildcard origins/methods/headers must not be
# combined with allow_credentials=True, and this API uses no cookies.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components
downloader = YouTubeDownloader(DOWNLOAD_DIR)
templates = Jinja2Templates(directory="templates")

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# Pydantic models for API
class VideoInfoRequest(BaseModel):
    """Request body of ``POST /api/video-info``."""

    # Address of the video to inspect.
    url: str

class DownloadRequest(BaseModel):
    """Request body of ``POST /api/download``."""

    # Address of the video to download.
    url: str
    # Requested format; "best" is used when the field is omitted.
    format_id: Optional[str] = "best"

# Progress callback function
def update_task_progress(task_id: int, progress: float, status: TaskStatus,
                         file_path: Optional[str] = None, error_message: Optional[str] = None):
    """Persist the progress of a download task.

    Opens its own database session, so it can be called from a worker thread.

    Args:
        task_id: Primary key of the ``DownloadTask`` row to update.
        progress: Progress value to store.
        status: New task status.
        file_path: Path of the downloaded file, if known. Stored relative to
            ``DOWNLOAD_DIR`` whenever the file lies inside that directory.
        error_message: Error text to store, if any.
    """
    db = next(get_db())
    try:
        task = db.query(DownloadTask).filter(DownloadTask.id == task_id).first()
        if task:
            task.progress = progress
            task.status = status
            if file_path:
                task.file_path = _path_within_download_dir(file_path)

                # Record the size only when the file is actually on disk.
                full_path = os.path.join(DOWNLOAD_DIR, task.file_path)
                if os.path.isfile(full_path):
                    task.file_size = os.path.getsize(full_path)

            if error_message:
                task.error_message = error_message
            db.commit()
    finally:
        db.close()


def _path_within_download_dir(file_path: str) -> str:
    """Express ``file_path`` relative to ``DOWNLOAD_DIR`` when it lies inside it.

    Both paths are made absolute before comparing, so a working-directory
    relative path such as ``./downloads/video.mp4`` is reduced to
    ``video.mp4`` instead of being re-joined with ``DOWNLOAD_DIR`` into a
    path that does not exist. Paths outside the directory are returned
    unchanged.
    """
    base_dir = os.path.abspath(DOWNLOAD_DIR)
    candidate = os.path.abspath(file_path)
    try:
        inside = os.path.commonpath([base_dir, candidate]) == base_dir
    except ValueError:
        # commonpath rejects paths it cannot compare (e.g. different drives).
        inside = False
    return os.path.relpath(candidate, base_dir) if inside else file_path

# API Routes
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render ``index.html`` as the landing page."""
    # The template context carries the current request object.
    page_context = {"request": request}
    return templates.TemplateResponse("index.html", page_context)

@app.post("/api/video-info")
async def get_video_info(request: VideoInfoRequest):
    """Return metadata for the requested video URL.

    Returns:
        ``{"success": True, "data": <info>}`` on success.

    Raises:
        HTTPException: 400 with the error text when the lookup fails.
    """
    # Function-scope import keeps this handler self-contained.
    from fastapi.concurrency import run_in_threadpool

    try:
        # downloader.get_video_info is a synchronous call; running it in the
        # threadpool keeps the event loop free for other requests.
        info = await run_in_threadpool(downloader.get_video_info, request.url)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {"success": True, "data": info}

@app.post("/api/download")
async def start_download(request: DownloadRequest, db: Session = Depends(get_db)):
    """Create a download task and start the download in the background.

    Returns:
        ``{"success": True, "task_id": <id>, "message": "Download started"}``.

    Raises:
        HTTPException: 400 with the error text when any step fails.
    """
    # Function-scope import keeps this handler self-contained.
    from fastapi.concurrency import run_in_threadpool

    # The request model allows an explicit null; fall back to its default.
    format_id = request.format_id or "best"

    try:
        # Get video info first. The call is synchronous, so it runs in the
        # threadpool to keep the event loop free for other requests.
        video_info = await run_in_threadpool(downloader.get_video_info, request.url)

        # Create task record
        task = DownloadTask(
            url=request.url,
            title=video_info.get('title'),
            format_id=format_id,
            status=TaskStatus.PENDING
        )
        db.add(task)
        db.commit()
        db.refresh(task)

        # Start download; progress is reported through update_task_progress.
        downloader.download_video(
            task.id,
            request.url,
            format_id,
            update_task_progress
        )

        return {"success": True, "task_id": task.id, "message": "Download started"}

    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

@app.get("/api/tasks")
async def get_tasks(db: Session = Depends(get_db)) -> List[dict]:
    """List every download task, newest first, as plain dicts."""
    newest_first = DownloadTask.created_at.desc()
    rows = db.query(DownloadTask).order_by(newest_first).all()

    serialized = []
    for row in rows:
        serialized.append(row.to_dict())
    return serialized

@app.get("/api/download-file/{task_id}")
async def download_file(task_id: int, db: Session = Depends(get_db)):
    """Send the file that belongs to a task as an attachment.

    Raises:
        HTTPException: 404 when the task is unknown, has no stored file path,
            or the path does not exist on disk.
    """
    record = db.query(DownloadTask).filter(DownloadTask.id == task_id).first()
    stored_path = record.file_path if record else None
    if not stored_path:
        raise HTTPException(status_code=404, detail="File not found")

    # Stored paths are joined onto the download directory.
    absolute_target = os.path.join(DOWNLOAD_DIR, stored_path)
    if os.path.exists(absolute_target):
        return FileResponse(
            absolute_target,
            filename=os.path.basename(absolute_target),
            media_type='application/octet-stream'
        )

    raise HTTPException(status_code=404, detail="File not found")

if __name__ == "__main__":
    # Start the server only when this module is executed as a script.
    import uvicorn

    server_options = {"host": "127.0.0.1", "port": 8000, "reload": True}
    uvicorn.run("main:app", **server_options)

4. 下载 Web 预览

运行脚本时,你应该看到如下输出:

分析视频 URL

选择你喜欢的格式

检查下载进度

5. 测试

在下载过程中监控服务器性能:

1
2
3
4
5
6
7
8
9
10
11
# Check server logs for performance metrics
INFO: "POST /api/video-info HTTP/1.1" 200 OK
INFO: "POST /api/download HTTP/1.1" 200 OK
INFO: "GET /api/tasks/1 HTTP/1.1" 200 OK
📥 Starting download for task 1
🔗 URL: https://www.youtube.com/watch?v=dQw4w9WgXcQ
📋 Format: worst
📂 Output template: ./downloads/%(title)s-%(id)s.%(ext)s
🚀 Starting yt-dlp download...
✅ File saved: ./downloads/Rick Astley - Never Gonna Give You Up-dQw4w9WgXcQ.mp4 (Size: 3847291 bytes)
✅ Download completed for task 1

未来增强功能

  • 播放列表支持:下载整个播放列表
  • 用户认证:多用户支持
  • 下载调度:队列和计划下载
  • 云存储:与云存储服务集成
  • 移动应用:React Native 或 Flutter 移动应用
  • Docker 支持:容器化部署

资源


Coding


使用 FastAPI 和 yt-dlp 构建 YouTube 下载器
https://gzthss.com/2025/06/07/Building-a-YouTube-Downloader-with-FastAPI/
Author
GZTHSS
Posted on
June 7, 2025
Licensed under