Files
youclonedl/backend/services/ytdlp.py
2026-05-25 21:19:26 +02:00

517 lines
18 KiB
Python

"""Subprocess wrapper for yt-dlp."""
import json
import re
import subprocess
import threading
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from ..config import settings
def _run(args: list[str], timeout: int = 60) -> tuple[str, str, int]:
    """Run a command to completion and return ``(stdout, stderr, returncode)``.

    Output is captured and decoded as text. ``subprocess.TimeoutExpired``
    propagates to the caller when the command exceeds *timeout* seconds.
    """
    completed = subprocess.run(
        args,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return (completed.stdout, completed.stderr, completed.returncode)
def _parse_date(date_str: str | None) -> datetime | None:
if not date_str:
return None
try:
return datetime.strptime(date_str, "%Y%m%d")
except ValueError:
return None
def _parse_published(info: dict) -> datetime | None:
    """Extract the publish date from a yt-dlp info dict as a naive datetime.

    Tries ``upload_date`` (``YYYYMMDD`` string) first, then ``timestamp``
    (Unix epoch), then ``release_timestamp``. Flat-playlist entries often omit
    ``upload_date`` but include ``timestamp``, so the fallback is important.

    Epoch values are interpreted as UTC and returned without tzinfo.
    Returns ``None`` when no field yields a usable date.
    """
    parsed = _parse_date(info.get("upload_date"))
    if parsed:
        return parsed
    for key in ("timestamp", "release_timestamp"):
        ts = info.get(key)
        if not ts:
            continue
        try:
            # Timezone-aware conversion replaces datetime.utcfromtimestamp(),
            # which is deprecated since Python 3.12. tzinfo is dropped
            # afterwards so the result is still a naive UTC datetime.
            aware = datetime.fromtimestamp(float(ts), tz=timezone.utc)
            return aware.replace(tzinfo=None)
        except (TypeError, ValueError, OSError, OverflowError):
            # Wrong type or out-of-range value for this key — try the next one.
            continue
    return None
def _stable_thumbnail(video_id: str | None) -> str | None:
if not video_id:
return None
return f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
def _normalize_video(info: dict) -> dict:
    """Convert a yt-dlp video info dict into this module's video record.

    ``tags`` and ``chapters`` are stored as JSON strings (``chapters`` is
    ``None`` when there are no titled chapters). The nested ``channel`` dict
    carries only the channel ID and name; its thumbnail is always ``None``.
    """
    vid = info.get("id")

    chapter_list = []
    for chapter in info.get("chapters") or []:
        # Chapters without a title are dropped.
        if not chapter.get("title"):
            continue
        chapter_list.append({
            "start_time": int(chapter.get("start_time") or 0),
            "end_time": int(chapter.get("end_time") or 0),
            "title": chapter.get("title") or "",
        })

    # Prefer an explicit "category", else the first entry of "categories".
    first_category = (info.get("categories") or [None])[0]
    category = info.get("category") or first_category

    channel = {
        "youtube_channel_id": info.get("channel_id"),
        "name": info.get("channel") or info.get("uploader", ""),
        "thumbnail_url": None,
    }

    return {
        "youtube_video_id": vid,
        "title": info.get("title", ""),
        "description": info.get("description", ""),
        "thumbnail_url": _stable_thumbnail(vid),
        "duration_seconds": info.get("duration"),
        "published_at": _parse_published(info),
        "tags": json.dumps(info.get("tags") or []),
        "category": category,
        "chapters": json.dumps(chapter_list) if chapter_list else None,
        "channel": channel,
    }
def _channel_avatar(thumbnails: list | None) -> str | None:
"""Pick the channel avatar from yt-dlp's thumbnails list.
YouTube returns banners and avatars in the same array. Avatars have id
'avatar_uncropped' or are roughly square (width ≈ height).
"""
if not thumbnails:
return None
for t in thumbnails:
if "avatar" in str(t.get("id") or "").lower():
return t.get("url")
# Fall back to the most square thumbnail
square = [t for t in thumbnails
if t.get("width") and t.get("height")
and t["width"] <= t["height"] * 1.2
and t["height"] <= t["width"] * 1.2]
if square:
return max(square, key=lambda t: t.get("width") or 0).get("url")
return None
def _normalize_channel(info: dict) -> dict:
    """Convert a yt-dlp channel/playlist info dict into this module's channel record.

    Empty name/description values are normalised to ``None``; ``banner_url``
    is always ``None``.
    """
    channel_id = info.get("channel_id") or info.get("id")
    display_name = info.get("channel") or info.get("title") or info.get("uploader") or None
    avatar_url = _channel_avatar(info.get("thumbnails"))
    return {
        "youtube_channel_id": channel_id,
        "name": display_name,
        "description": info.get("description") or None,
        "thumbnail_url": avatar_url,
        "banner_url": None,
        "subscriber_count": info.get("channel_follower_count"),
    }
def search_youtube(query: str, max_results: int = 40) -> list[dict]:
    """Search YouTube through yt-dlp's ``ytsearchN:`` pseudo-URL.

    ``--flat-playlist`` keeps the search fast; yt-dlp prints one JSON object
    per result line. Lines that are not valid JSON, and entries without an
    ``id`` or with an unexpected ``_type``, are skipped.

    Returns a list of video records (``category`` is always ``None``).
    """
    command = [
        "yt-dlp",
        f"ytsearch{max_results}:{query}",
        "--dump-json",
        "--flat-playlist",
        "--quiet",
        *_cookie_args(),
    ]
    raw_output, _, _ = _run(command, timeout=60)

    found = []
    for raw_line in raw_output.splitlines():
        payload = raw_line.strip()
        if not payload:
            continue
        try:
            entry = json.loads(payload)
        except json.JSONDecodeError:
            continue
        # flat-playlist entries have _type="url" with basic fields
        if entry.get("_type") not in ("url", None) or not entry.get("id"):
            continue
        entry_id = entry.get("id")
        found.append({
            "youtube_video_id": entry_id,
            "title": entry.get("title", ""),
            "description": entry.get("description") or "",
            "thumbnail_url": _stable_thumbnail(entry_id),
            "duration_seconds": entry.get("duration"),
            "published_at": _parse_published(entry),
            "tags": json.dumps(entry.get("tags") or []),
            "category": None,
            "channel": {
                "youtube_channel_id": entry.get("channel_id"),
                "name": entry.get("channel") or entry.get("uploader") or "",
                "thumbnail_url": None,
            },
        })
    return found
def fetch_trending(region: str = "US", max_results: int = 50) -> list[dict]:
    """Fetch "trending" videos for a region via a date-sorted YouTube search.

    Runs yt-dlp against a YouTube results URL for the query "trending",
    sorted by upload date, with ``gl=`` set to the upper-cased *region*.
    At most *max_results* entries are requested.

    Returns a list of video records (without a ``description`` key). Falls
    back to an empty list when yt-dlp times out, cannot be launched, or
    produces no usable output.
    """
    region = region.upper()
    # CAI%3D = sort by upload date; gl= sets the region.
    # (The literal below has the "%" itself percent-encoded as %25.)
    url = f"https://www.youtube.com/results?search_query=trending&sp=CAI%253D&gl={region}"
    try:
        stdout, _, _ = _run([
            "yt-dlp",
            url,
            "--dump-json",
            "--flat-playlist",
            "--quiet",
            "--playlist-end", str(max_results),
            *_cookie_args(),
        ], timeout=60)
    except (subprocess.TimeoutExpired, OSError):
        # A timeout or a missing yt-dlp binary would otherwise propagate and
        # break the "empty list on error" contract documented above.
        return []

    results = []
    for line in stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            info = json.loads(line)
        except json.JSONDecodeError:
            continue
        # Only JSON objects can be entries; skip any stray scalar/array line.
        if not isinstance(info, dict):
            continue
        if info.get("_type") in ("url", None) and info.get("id"):
            results.append({
                "youtube_video_id": info.get("id"),
                "title": info.get("title", ""),
                "thumbnail_url": _stable_thumbnail(info.get("id")),
                "duration_seconds": info.get("duration"),
                "published_at": _parse_published(info),
                "tags": json.dumps(info.get("tags") or []),
                "category": None,
                "channel": {
                    "youtube_channel_id": info.get("channel_id"),
                    "name": info.get("channel") or info.get("uploader") or "",
                    "thumbnail_url": None,
                },
            })
    return results
def _best_thumbnail(thumbnails: list | None) -> str | None:
if not thumbnails:
return None
# pick the one closest to 480px wide
best = sorted(thumbnails, key=lambda t: abs((t.get("width") or 0) - 480))
return best[0].get("url") if best else None
def fetch_video_metadata(video_id: str) -> dict | None:
    """Fetch metadata for one video by YouTube ID.

    Returns the normalised record built from the first output line that
    parses as JSON, or ``None`` if yt-dlp printed no parsable line.
    """
    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    command = [
        "yt-dlp",
        watch_url,
        "--dump-json",
        "--no-download",
        "--no-playlist",
        "--quiet",
        "--extractor-args", "youtube:player_client=ios,web",
        *_cookie_args(),
    ]
    raw_output, _, _ = _run(command, timeout=30)

    for raw_line in raw_output.splitlines():
        payload = raw_line.strip()
        if not payload:
            continue
        try:
            parsed = json.loads(payload)
        except json.JSONDecodeError:
            continue
        return _normalize_video(parsed)
    return None
def _rss_dates(uc_channel_id: str) -> dict[str, datetime]:
    """Fetch publish dates for a channel's recent videos from YouTube's RSS feed.

    Fast and unauthenticated; per the feed's behaviour it covers the 15 most
    recent videos. Only works for ``UC…`` channel IDs — anything else returns
    an empty dict without a network request.

    Returns a mapping of video ID to a naive UTC ``datetime``. Any network or
    parsing failure yields an empty dict (deliberately best-effort).
    """
    if not uc_channel_id or not uc_channel_id.startswith("UC"):
        return {}
    url = f"https://www.youtube.com/feeds/videos.xml?channel_id={uc_channel_id}"
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            xml_data = resp.read()
        root = ET.fromstring(xml_data)
        ns = {
            "atom": "http://www.w3.org/2005/Atom",
            "yt": "http://www.youtube.com/xml/schemas/2015",
        }
        dates: dict[str, datetime] = {}
        for entry in root.findall("atom:entry", ns):
            vid_el = entry.find("yt:videoId", ns)
            pub_el = entry.find("atom:published", ns)
            if vid_el is None or pub_el is None or not vid_el.text or not pub_el.text:
                continue
            try:
                dt = datetime.fromisoformat(pub_el.text.replace("Z", "+00:00"))
            except ValueError:
                # Unparsable timestamp — skip this entry only.
                continue
            if dt.tzinfo is not None:
                # Convert to UTC before dropping tzinfo; simply stripping the
                # offset would skew any timestamp not already expressed in UTC.
                dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
            dates[vid_el.text] = dt
        return dates
    except Exception:
        # Best-effort enrichment: callers fall back to yt-dlp's own dates.
        return {}
def fetch_channel_metadata(channel_id: str, max_videos: int = 30) -> dict | None:
    """Fetch channel info plus its recent videos.

    Uses ``--dump-single-json --flat-playlist`` for speed, then overrides
    video dates with those from YouTube's RSS feed where available.
    *channel_id* may be an ``@handle`` or a channel ID. ``--playlist-end`` is
    only passed when *max_videos* is positive.

    Returns ``{"channel": ..., "videos": [...]}``, or ``None`` when yt-dlp
    yields no output, invalid JSON, or a payload without any ID.
    """
    url = (
        f"https://www.youtube.com/{channel_id}/videos"
        if channel_id.startswith("@")
        else f"https://www.youtube.com/channel/{channel_id}/videos"
    )
    command = [
        "yt-dlp", url,
        "--dump-single-json",
        "--flat-playlist",
        "--quiet",
        *_cookie_args(),
    ]
    if max_videos > 0:
        command.extend(["--playlist-end", str(max_videos)])

    raw_output, _, _ = _run(command, timeout=60)
    payload = raw_output.strip()
    if not payload:
        return None
    try:
        info = json.loads(payload)
    except json.JSONDecodeError:
        return None
    if not (info.get("id") or info.get("channel_id")):
        return None

    channel_info = _normalize_channel(info)
    resolved_id = channel_info.get("youtube_channel_id")
    # Fetch RSS dates — fast single HTTP request, precise dates for ≤15 newest videos
    rss_dates = _rss_dates(resolved_id or "")

    videos = []
    for item in info.get("entries") or []:
        item_id = item.get("id")
        if not item_id:
            continue
        videos.append({
            "youtube_video_id": item_id,
            "title": item.get("title") or "",
            "description": item.get("description") or None,
            "thumbnail_url": _stable_thumbnail(item_id),
            "duration_seconds": item.get("duration"),
            # RSS date wins; otherwise use whatever yt-dlp reported.
            "published_at": rss_dates.get(item_id) or _parse_published(item),
            "tags": json.dumps(item.get("tags") or []),
            "category": (item.get("categories") or [None])[0],
            "channel": {
                "youtube_channel_id": channel_info.get("youtube_channel_id"),
                "name": channel_info.get("name") or "",
                "thumbnail_url": None,
            },
        })
    return {"channel": channel_info, "videos": videos}
def fetch_channel_links(channel_id: str) -> list[str]:
    """Extract linked channels from the description yt-dlp reports for a channel.

    *channel_id* may be an ``@handle`` or a channel ID. The description text
    is scanned for ``youtube.com/channel/UC…`` and ``youtube.com/@handle``
    links.

    Returns a sorted, de-duplicated list of ``UC…`` IDs and ``@handle``
    strings.
    """
    if channel_id.startswith("@"):
        url = f"https://www.youtube.com/{channel_id}/about"
    else:
        url = f"https://www.youtube.com/channel/{channel_id}/about"
    stdout, _, _ = _run([
        "yt-dlp",
        url,
        "--dump-json",
        "--no-download",
        "--flat-playlist",
        "--playlist-end", "1",
        "--quiet",
        *_cookie_args(),
    ], timeout=30)

    channel_ids: set[str] = set()
    for line in stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            info = json.loads(line)
        except json.JSONDecodeError:
            continue
        desc = info.get("description", "") or ""
        for match in re.finditer(r"youtube\.com/channel/(UC[\w-]+)", desc):
            channel_ids.add(match.group(1))
        # Handles may contain periods and middle dots between word segments;
        # requiring a word segment after each separator keeps a sentence-ending
        # "." out of the captured handle.
        for match in re.finditer(r"youtube\.com/@([\w-]+(?:[.·][\w-]+)*)", desc):
            channel_ids.add(f"@{match.group(1)}")
    # Sorted so the result order is deterministic rather than set-dependent.
    return sorted(channel_ids)
# yt-dlp ``-f`` format selector for each supported quality label.
# Each value is a "/"-separated fallback chain; yt-dlp uses the first
# alternative that is available. The chains start with an MP4 video stream
# (restricted to codecs starting with "avc1" where specified) paired with M4A
# audio, then relax the constraints.
# Bare numbers such as 22, 18 and 137+140 are explicit format IDs —
# presumably YouTube itags; verify against `yt-dlp -F` output.
# start_download() falls back to the "best" entry for unknown labels.
QUALITY_FORMATS = {
"best": "bestvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/bestvideo[ext=mp4]+bestaudio[ext=m4a]/22/18/bestvideo+bestaudio/best",
"2160p": "bestvideo[ext=mp4][height<=2160]+bestaudio[ext=m4a]/bestvideo[height<=2160]+bestaudio/best[height<=2160]",
"1440p": "bestvideo[ext=mp4][height<=1440]+bestaudio[ext=m4a]/bestvideo[height<=1440]+bestaudio/best[height<=1440]",
"1080p": "bestvideo[ext=mp4][vcodec^=avc1][height<=1080]+bestaudio[ext=m4a]/bestvideo[ext=mp4][height<=1080]+bestaudio[ext=m4a]/137+140/22/best[height<=1080]",
"720p": "bestvideo[ext=mp4][vcodec^=avc1][height<=720]+bestaudio[ext=m4a]/bestvideo[ext=mp4][height<=720]+bestaudio[ext=m4a]/22/best[height<=720]",
"480p": "bestvideo[ext=mp4][vcodec^=avc1][height<=480]+bestaudio[ext=m4a]/bestvideo[ext=mp4][height<=480]+bestaudio[ext=m4a]/18/best[height<=480]",
"360p": "bestvideo[ext=mp4][height<=360]+bestaudio[ext=m4a]/18/best[height<=360]",
"240p": "bestvideo[ext=mp4][height<=240]+bestaudio[ext=m4a]/best[height<=240]",
"144p": "bestvideo[ext=mp4][height<=144]+bestaudio[ext=m4a]/best[height<=144]",
}
def detect_resolution(file_path: str) -> str | None:
"""Use ffprobe to get the video stream height and return a label like '1080p'."""
try:
result = subprocess.run(
["ffprobe", "-v", "quiet", "-select_streams", "v:0",
"-show_entries", "stream=height", "-of", "csv=p=0", file_path],
capture_output=True, text=True, timeout=15,
)
height = int(result.stdout.strip())
if height >= 1080: return "1080p"
if height >= 720: return "720p"
if height >= 480: return "480p"
if height >= 360: return "360p"
return f"{height}p"
except Exception:
return None
def predicted_file_path(video_id: str) -> Path:
    """Return ``<settings.download_path>/<video_id>.mp4``, the expected download output."""
    download_dir = Path(settings.download_path)
    return download_dir / f"{video_id}.mp4"
# Limits how many download worker threads run yt-dlp at the same time; held by
# the worker in start_download(). Replaced wholesale by set_max_concurrent().
_SEMAPHORE = threading.Semaphore(3)
# Serialises replacement of _SEMAPHORE in set_max_concurrent().
_semaphore_lock = threading.Lock()
# Browser name passed to --cookies-from-browser ("" = unset); see set_cookies_browser().
_cookies_browser: str = ""
# Path of an explicit cookies file passed to --cookies ("" = unset); see set_cookies_file().
_cookies_file: str = ""
# Guards reads and writes of the two cookie settings above.
_cookies_lock = threading.Lock()
# Cookie files that _cookie_args() picks up automatically when they exist.
_AUTO_COOKIES_PATHS = ["/data/cookies.txt"]
def set_max_concurrent(n: int) -> None:
    """Install a new download semaphore with *n* slots, clamped to 1..10.

    A fresh ``Semaphore`` object replaces the old one; workers already
    holding the previous semaphore release that object, not the new one.
    """
    global _SEMAPHORE
    slots = max(1, min(n, 10))
    with _semaphore_lock:
        _SEMAPHORE = threading.Semaphore(slots)
def set_cookies_browser(browser: str) -> None:
    """Store the browser name for ``--cookies-from-browser``, trimmed and lower-cased."""
    global _cookies_browser
    normalized = browser.strip().lower()
    with _cookies_lock:
        _cookies_browser = normalized
def set_cookies_file(path: str) -> None:
    """Store the explicit cookies-file path for ``--cookies``, with whitespace trimmed."""
    global _cookies_file
    trimmed = path.strip()
    with _cookies_lock:
        _cookies_file = trimmed
def _cookie_args() -> list[str]:
    """Build the yt-dlp cookie arguments from the current settings.

    Priority: the explicit cookies file if it exists, then the first existing
    path in ``_AUTO_COOKIES_PATHS``, then the configured browser. Returns an
    empty list when none applies.
    """
    # Snapshot both settings under the lock, then do filesystem checks outside it.
    with _cookies_lock:
        explicit_file, browser = _cookies_file, _cookies_browser

    # Prefer explicit cookies file
    if explicit_file and Path(explicit_file).exists():
        return ["--cookies", explicit_file]

    # Auto-detect cookies.txt in well-known Docker locations
    auto_detected = next(
        (candidate for candidate in _AUTO_COOKIES_PATHS if Path(candidate).exists()),
        None,
    )
    if auto_detected is not None:
        return ["--cookies", auto_detected]

    # Fall back to browser (works in local dev, not in Docker)
    return ["--cookies-from-browser", browser] if browser else []
def start_download(
    video_id: str,
    download_id: int,
    on_progress: Any,
    on_complete: Any,
    on_error: Any,
    quality: str = "best",
) -> None:
    """Start a yt-dlp download of *video_id* in a background daemon thread.

    The format selector comes from ``QUALITY_FORMATS[quality]`` (unknown
    labels use ``"best"``). Most selectors pick separate video and audio
    streams that yt-dlp merges into an MP4. ``--no-part`` writes directly to
    the destination filename instead of a ``.part`` file.

    Concurrency is bounded by the module-level ``_SEMAPHORE``. Callbacks are
    all invoked from the worker thread:

    * ``on_progress(download_id, percent)`` — scaled progress, capped at 95.
    * ``on_complete(download_id, file_path, resolution)`` — yt-dlp exited 0;
      ``file_path`` is ``None`` if no output path could be determined.
    * ``on_error(download_id, message)`` — yt-dlp could not be started or
      exited non-zero; the message carries the last 20 output lines.
    """
    import logging
    from collections import deque

    log = logging.getLogger(__name__)
    url = f"https://www.youtube.com/watch?v={video_id}"
    # Predictable output path — lets the player start before download finishes
    output_template = str(Path(settings.download_path) / f"{video_id}.%(ext)s")
    fmt = QUALITY_FORMATS.get(quality, QUALITY_FORMATS["best"])

    progress_re = re.compile(r"\[download\]\s+([\d.]+)%")
    # Lines from which the output path can be read. Merged downloads announce
    # the final file with "Merging formats into", not "Destination:", and an
    # existing file is reported as "has already been downloaded".
    path_patterns = (
        re.compile(r"\[(?:download|Merger)\] Destination: (.+)"),
        re.compile(r'\[Merger\] Merging formats into "(.+)"'),
        re.compile(r"\[download\] (.+) has already been downloaded"),
    )

    def _run_download():
        with _SEMAPHORE:
            cookie_args = _cookie_args()
            uses_cookie_file = len(cookie_args) == 2 and cookie_args[0] == "--cookies"
            log.info(
                "download cookie_args=%r file_exists=%s",
                cookie_args,
                Path(cookie_args[1]).exists() if uses_cookie_file else "n/a",
            )
            command = [
                "yt-dlp", url,
                "-f", fmt,
                "--merge-output-format", "mp4",
                "--postprocessor-args", "Merger+ffmpeg:-movflags +faststart",
                "--embed-metadata", "--embed-thumbnail",
                "--no-part", "--no-mtime",
                "-o", output_template,
                "--newline", "--progress", "--no-colors",
                "--extractor-args", "youtube:player_client=ios,web",
                # Reuse the args just logged so the log matches what yt-dlp gets.
                *cookie_args,
            ]
            try:
                process = subprocess.Popen(
                    command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                )
            except OSError as exc:
                # e.g. yt-dlp not installed: report it instead of letting the
                # thread die without ever notifying the caller.
                log.error("could not start yt-dlp: %s", exc)
                on_error(download_id, f"could not start yt-dlp: {exc}")
                return

            file_path = None
            stream_index = 0
            # Only the tail is ever reported, so keep memory bounded.
            output_tail: deque[str] = deque(maxlen=20)

            # The context manager closes the pipe and waits for the process,
            # even if a callback raises.
            with process:
                for line in process.stdout:
                    line = line.strip()
                    output_tail.append(line)
                    # Each "Destination:" line marks the start of a new stream.
                    if "[download] Destination:" in line:
                        stream_index += 1
                    m = progress_re.search(line)
                    if m:
                        pct = float(m.group(1))
                        # First stream fills 0–85 %, later streams 85–95 %; the
                        # cap leaves headroom until yt-dlp has actually exited.
                        scaled = pct * 0.85 if stream_index <= 1 else 85.0 + pct * 0.10
                        on_progress(download_id, min(scaled, 95.0))
                    for pattern in path_patterns:
                        m2 = pattern.search(line)
                        if m2:
                            file_path = m2.group(1).strip()
                            break

            if process.returncode != 0:
                tail = "\n".join(output_tail) if output_tail else "(no output)"
                log.error("yt-dlp failed (code %d):\n%s", process.returncode, tail)
                on_error(download_id, f"yt-dlp exited with code {process.returncode}:\n{tail}")
                return

            # If the parsed path is missing or stale (e.g. an intermediate
            # stream file removed after merging), fall back to the predicted
            # output path when that file exists.
            if not (file_path and Path(file_path).exists()):
                expected = predicted_file_path(video_id)
                if expected.exists():
                    file_path = str(expected)

            resolution = detect_resolution(file_path) if file_path else None
            on_complete(download_id, file_path, resolution)

    thread = threading.Thread(target=_run_download, daemon=True)
    thread.start()