Node.js is required by yt-dlp to solve YouTube's n-challenge (format URL deobfuscation). Without it the web client returns no video formats. The tv and ios player clients were removed — both require GVS PO tokens that we don't have, so they only produce warnings and block every request. The web client with Node.js installed gives 30+ formats and works cleanly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
607 lines
22 KiB
Python
607 lines
22 KiB
Python
"""Subprocess wrapper for yt-dlp."""
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import threading
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from ..config import settings
|
|
|
|
|
|
def _run(args: list[str], timeout: int = 60) -> tuple[str, str, int]:
|
|
result = subprocess.run(args, capture_output=True, text=True, timeout=timeout)
|
|
return result.stdout, result.stderr, result.returncode
|
|
|
|
|
|
def _parse_date(date_str: str | None) -> datetime | None:
|
|
if not date_str:
|
|
return None
|
|
try:
|
|
return datetime.strptime(date_str, "%Y%m%d")
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _parse_published(info: dict) -> datetime | None:
|
|
"""Extract publish date from yt-dlp info dict.
|
|
|
|
Tries upload_date (YYYYMMDD string) first, then timestamp (Unix epoch),
|
|
then release_timestamp. Flat-playlist entries often omit upload_date but
|
|
include timestamp, so the fallback is important.
|
|
"""
|
|
d = _parse_date(info.get("upload_date"))
|
|
if d:
|
|
return d
|
|
for key in ("timestamp", "release_timestamp"):
|
|
ts = info.get(key)
|
|
if ts:
|
|
try:
|
|
return datetime.utcfromtimestamp(float(ts))
|
|
except (ValueError, OSError, OverflowError):
|
|
pass
|
|
return None
|
|
|
|
|
|
def _stable_thumbnail(video_id: str | None) -> str | None:
|
|
if not video_id:
|
|
return None
|
|
return f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
|
|
|
|
|
|
def _normalize_video(info: dict) -> dict:
|
|
video_id = info.get("id")
|
|
raw_chapters = info.get("chapters") or []
|
|
chapters = [
|
|
{
|
|
"start_time": int(ch.get("start_time") or 0),
|
|
"end_time": int(ch.get("end_time") or 0),
|
|
"title": ch.get("title") or "",
|
|
}
|
|
for ch in raw_chapters
|
|
if ch.get("title")
|
|
]
|
|
return {
|
|
"youtube_video_id": video_id,
|
|
"title": info.get("title", ""),
|
|
"description": info.get("description", ""),
|
|
"thumbnail_url": _stable_thumbnail(video_id),
|
|
"duration_seconds": info.get("duration"),
|
|
"published_at": _parse_published(info),
|
|
"tags": json.dumps(info.get("tags") or []),
|
|
"category": info.get("category") or (info.get("categories") or [None])[0],
|
|
"chapters": json.dumps(chapters) if chapters else None,
|
|
"channel": {
|
|
"youtube_channel_id": info.get("channel_id"),
|
|
"name": info.get("channel") or info.get("uploader", ""),
|
|
"thumbnail_url": None,
|
|
},
|
|
}
|
|
|
|
|
|
def _channel_avatar(thumbnails: list | None) -> str | None:
|
|
"""Pick the channel avatar from yt-dlp's thumbnails list.
|
|
|
|
YouTube returns banners and avatars in the same array. Avatars have id
|
|
'avatar_uncropped' or are roughly square (width ≈ height).
|
|
"""
|
|
if not thumbnails:
|
|
return None
|
|
for t in thumbnails:
|
|
if "avatar" in str(t.get("id") or "").lower():
|
|
return t.get("url")
|
|
# Fall back to the most square thumbnail
|
|
square = [t for t in thumbnails
|
|
if t.get("width") and t.get("height")
|
|
and t["width"] <= t["height"] * 1.2
|
|
and t["height"] <= t["width"] * 1.2]
|
|
if square:
|
|
return max(square, key=lambda t: t.get("width") or 0).get("url")
|
|
return None
|
|
|
|
|
|
def _normalize_channel(info: dict) -> dict:
|
|
return {
|
|
"youtube_channel_id": info.get("channel_id") or info.get("id"),
|
|
"name": info.get("channel") or info.get("title") or info.get("uploader") or None,
|
|
"description": info.get("description") or None,
|
|
"thumbnail_url": _channel_avatar(info.get("thumbnails")),
|
|
"banner_url": None,
|
|
"subscriber_count": info.get("channel_follower_count"),
|
|
}
|
|
|
|
|
|
def search_youtube(query: str, max_results: int = 40) -> list[dict]:
|
|
"""Search YouTube via yt-dlp. Uses --flat-playlist for fast results."""
|
|
stdout, _, code = _run([
|
|
"yt-dlp",
|
|
f"ytsearch{max_results}:{query}",
|
|
"--dump-json",
|
|
"--flat-playlist",
|
|
"--quiet",
|
|
*_cookie_args(),
|
|
], timeout=60)
|
|
|
|
results = []
|
|
for line in stdout.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
info = json.loads(line)
|
|
# flat-playlist entries have _type="url" with basic fields
|
|
if info.get("_type") in ("url", None) and info.get("id"):
|
|
results.append({
|
|
"youtube_video_id": info.get("id"),
|
|
"title": info.get("title", ""),
|
|
"description": info.get("description") or "",
|
|
"thumbnail_url": _stable_thumbnail(info.get("id")),
|
|
"duration_seconds": info.get("duration"),
|
|
"published_at": _parse_published(info),
|
|
"tags": json.dumps(info.get("tags") or []),
|
|
"category": None,
|
|
"channel": {
|
|
"youtube_channel_id": info.get("channel_id"),
|
|
"name": info.get("channel") or info.get("uploader") or "",
|
|
"thumbnail_url": None,
|
|
},
|
|
})
|
|
except json.JSONDecodeError:
|
|
continue
|
|
return results
|
|
|
|
|
|
def fetch_trending(region: str = "US", max_results: int = 50) -> list[dict]:
|
|
"""Fetch trending videos for a region via yt-dlp search with date-sort filter.
|
|
|
|
Uses the YouTube search sort-by-upload-date URL that reliably returns regional
|
|
results. Falls back gracefully to an empty list on error.
|
|
"""
|
|
region = region.upper()
|
|
# CAI%3D = sort by upload date; gl= sets the region
|
|
url = f"https://www.youtube.com/results?search_query=trending&sp=CAI%253D&gl={region}"
|
|
stdout, _, code = _run([
|
|
"yt-dlp",
|
|
url,
|
|
"--dump-json",
|
|
"--flat-playlist",
|
|
"--quiet",
|
|
"--playlist-end", str(max_results),
|
|
*_cookie_args(),
|
|
], timeout=60)
|
|
|
|
results = []
|
|
for line in stdout.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
info = json.loads(line)
|
|
if info.get("_type") in ("url", None) and info.get("id"):
|
|
results.append({
|
|
"youtube_video_id": info.get("id"),
|
|
"title": info.get("title", ""),
|
|
"thumbnail_url": _stable_thumbnail(info.get("id")),
|
|
"duration_seconds": info.get("duration"),
|
|
"published_at": _parse_published(info),
|
|
"tags": json.dumps(info.get("tags") or []),
|
|
"category": None,
|
|
"channel": {
|
|
"youtube_channel_id": info.get("channel_id"),
|
|
"name": info.get("channel") or info.get("uploader") or "",
|
|
"thumbnail_url": None,
|
|
},
|
|
})
|
|
except json.JSONDecodeError:
|
|
continue
|
|
return results
|
|
|
|
|
|
def _best_thumbnail(thumbnails: list | None) -> str | None:
|
|
if not thumbnails:
|
|
return None
|
|
# pick the one closest to 480px wide
|
|
best = sorted(thumbnails, key=lambda t: abs((t.get("width") or 0) - 480))
|
|
return best[0].get("url") if best else None
|
|
|
|
|
|
def fetch_video_metadata(video_id: str) -> dict | None:
|
|
"""Fetch metadata for a single video by YouTube ID."""
|
|
url = f"https://www.youtube.com/watch?v={video_id}"
|
|
cookie_args = _cookie_args()
|
|
print(f"[fetch_meta] video={video_id} cookie_args={cookie_args!r}", flush=True)
|
|
base_cmd = [
|
|
"yt-dlp", url,
|
|
"--dump-json", "--no-download", "--no-playlist",
|
|
"--extractor-args", "youtube:player_client=web",
|
|
]
|
|
stdout, stderr, code = _run([*base_cmd, *cookie_args], timeout=30)
|
|
if code != 0:
|
|
print(f"[fetch_meta] FAILED code={code} stderr={stderr[:500]!r}", flush=True)
|
|
# Retry without auth args — broken cookie config shouldn't block public videos
|
|
if cookie_args:
|
|
print(f"[fetch_meta] retrying without cookie args", flush=True)
|
|
stdout, stderr, code = _run(base_cmd, timeout=30)
|
|
if code != 0:
|
|
print(f"[fetch_meta] retry also FAILED code={code}", flush=True)
|
|
|
|
for line in stdout.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
info = json.loads(line)
|
|
return _normalize_video(info)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
return None
|
|
|
|
|
|
def _rss_dates(uc_channel_id: str) -> dict[str, datetime]:
|
|
"""Fetch publish dates for the 15 most recent videos from YouTube's RSS feed.
|
|
|
|
Fast, unauthenticated, and returns precise dates. Only works for UC… IDs.
|
|
"""
|
|
if not uc_channel_id or not uc_channel_id.startswith("UC"):
|
|
return {}
|
|
url = f"https://www.youtube.com/feeds/videos.xml?channel_id={uc_channel_id}"
|
|
try:
|
|
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
|
|
with urllib.request.urlopen(req, timeout=10) as resp:
|
|
xml_data = resp.read()
|
|
root = ET.fromstring(xml_data)
|
|
ns = {
|
|
"atom": "http://www.w3.org/2005/Atom",
|
|
"yt": "http://www.youtube.com/xml/schemas/2015",
|
|
}
|
|
dates: dict[str, datetime] = {}
|
|
for entry in root.findall("atom:entry", ns):
|
|
vid_el = entry.find("yt:videoId", ns)
|
|
pub_el = entry.find("atom:published", ns)
|
|
if vid_el is not None and pub_el is not None and vid_el.text and pub_el.text:
|
|
try:
|
|
dt = datetime.fromisoformat(pub_el.text.replace("Z", "+00:00"))
|
|
dates[vid_el.text] = dt.replace(tzinfo=None)
|
|
except ValueError:
|
|
pass
|
|
return dates
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def fetch_channel_metadata(channel_id: str, max_videos: int = 30) -> dict | None:
|
|
"""Fetch channel info + recent videos.
|
|
|
|
Uses --dump-single-json --flat-playlist for speed, then enriches video dates
|
|
from YouTube's RSS feed (gives precise dates for the 15 most recent videos).
|
|
"""
|
|
if channel_id.startswith("@"):
|
|
url = f"https://www.youtube.com/{channel_id}/videos"
|
|
else:
|
|
url = f"https://www.youtube.com/channel/{channel_id}/videos"
|
|
args = [
|
|
"yt-dlp", url,
|
|
"--dump-single-json",
|
|
"--flat-playlist",
|
|
"--quiet",
|
|
*_cookie_args(),
|
|
]
|
|
if max_videos > 0:
|
|
args += ["--playlist-end", str(max_videos)]
|
|
|
|
stdout, _, code = _run(args, timeout=60)
|
|
if not stdout.strip():
|
|
return None
|
|
|
|
try:
|
|
info = json.loads(stdout.strip())
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
if not info.get("id") and not info.get("channel_id"):
|
|
return None
|
|
|
|
channel_info = _normalize_channel(info)
|
|
|
|
# Fetch RSS dates — fast single HTTP request, precise dates for ≤15 newest videos
|
|
uc_id = channel_info.get("youtube_channel_id") or ""
|
|
rss = _rss_dates(uc_id)
|
|
|
|
videos = []
|
|
for entry in info.get("entries") or []:
|
|
vid_id = entry.get("id")
|
|
if not vid_id:
|
|
continue
|
|
published_at = rss.get(vid_id) or _parse_published(entry)
|
|
videos.append({
|
|
"youtube_video_id": vid_id,
|
|
"title": entry.get("title") or "",
|
|
"description": entry.get("description") or None,
|
|
"thumbnail_url": _stable_thumbnail(vid_id),
|
|
"duration_seconds": entry.get("duration"),
|
|
"published_at": published_at,
|
|
"tags": json.dumps(entry.get("tags") or []),
|
|
"category": (entry.get("categories") or [None])[0],
|
|
"channel": {
|
|
"youtube_channel_id": channel_info.get("youtube_channel_id"),
|
|
"name": channel_info.get("name") or "",
|
|
"thumbnail_url": None,
|
|
},
|
|
})
|
|
|
|
return {"channel": channel_info, "videos": videos}
|
|
|
|
|
|
def fetch_channel_links(channel_id: str) -> list[str]:
|
|
"""Extract linked channel IDs from a channel's about/description."""
|
|
if channel_id.startswith("@"):
|
|
url = f"https://www.youtube.com/{channel_id}/about"
|
|
else:
|
|
url = f"https://www.youtube.com/channel/{channel_id}/about"
|
|
stdout, _, code = _run([
|
|
"yt-dlp",
|
|
url,
|
|
"--dump-json",
|
|
"--no-download",
|
|
"--flat-playlist",
|
|
"--playlist-end", "1",
|
|
"--quiet",
|
|
*_cookie_args(),
|
|
], timeout=30)
|
|
|
|
channel_ids = set()
|
|
for line in stdout.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
info = json.loads(line)
|
|
desc = info.get("description", "") or ""
|
|
for match in re.finditer(r"youtube\.com/channel/(UC[\w-]+)", desc):
|
|
channel_ids.add(match.group(1))
|
|
for match in re.finditer(r"youtube\.com/@([\w-]+)", desc):
|
|
channel_ids.add(f"@{match.group(1)}")
|
|
except json.JSONDecodeError:
|
|
continue
|
|
return list(channel_ids)
|
|
|
|
|
|
QUALITY_FORMATS = {
|
|
"best": "bestvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/bestvideo[ext=mp4]+bestaudio[ext=m4a]/22/18/bestvideo+bestaudio/best",
|
|
"2160p": "bestvideo[ext=mp4][height<=2160]+bestaudio[ext=m4a]/bestvideo[height<=2160]+bestaudio/best[height<=2160]",
|
|
"1440p": "bestvideo[ext=mp4][height<=1440]+bestaudio[ext=m4a]/bestvideo[height<=1440]+bestaudio/best[height<=1440]",
|
|
"1080p": "bestvideo[ext=mp4][vcodec^=avc1][height<=1080]+bestaudio[ext=m4a]/bestvideo[ext=mp4][height<=1080]+bestaudio[ext=m4a]/137+140/22/best[height<=1080]",
|
|
"720p": "bestvideo[ext=mp4][vcodec^=avc1][height<=720]+bestaudio[ext=m4a]/bestvideo[ext=mp4][height<=720]+bestaudio[ext=m4a]/22/best[height<=720]",
|
|
"480p": "bestvideo[ext=mp4][vcodec^=avc1][height<=480]+bestaudio[ext=m4a]/bestvideo[ext=mp4][height<=480]+bestaudio[ext=m4a]/18/best[height<=480]",
|
|
"360p": "bestvideo[ext=mp4][height<=360]+bestaudio[ext=m4a]/18/best[height<=360]",
|
|
"240p": "bestvideo[ext=mp4][height<=240]+bestaudio[ext=m4a]/best[height<=240]",
|
|
"144p": "bestvideo[ext=mp4][height<=144]+bestaudio[ext=m4a]/best[height<=144]",
|
|
}
|
|
|
|
|
|
def detect_resolution(file_path: str) -> str | None:
|
|
"""Use ffprobe to get the video stream height and return a label like '1080p'."""
|
|
try:
|
|
result = subprocess.run(
|
|
["ffprobe", "-v", "quiet", "-select_streams", "v:0",
|
|
"-show_entries", "stream=height", "-of", "csv=p=0", file_path],
|
|
capture_output=True, text=True, timeout=15,
|
|
)
|
|
height = int(result.stdout.strip())
|
|
if height >= 1080: return "1080p"
|
|
if height >= 720: return "720p"
|
|
if height >= 480: return "480p"
|
|
if height >= 360: return "360p"
|
|
return f"{height}p"
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def predicted_file_path(video_id: str) -> Path:
|
|
"""Return the expected output path for a video download."""
|
|
return Path(settings.download_path) / f"{video_id}.mp4"
|
|
|
|
|
|
_SEMAPHORE = threading.Semaphore(3)
|
|
_semaphore_lock = threading.Lock()
|
|
_cookies_browser: str = ""
|
|
_cookies_file: str = ""
|
|
_use_oauth2: bool = False
|
|
_cookies_lock = threading.Lock()
|
|
|
|
_AUTO_COOKIES_PATHS = ["/data/cookies.txt"]
|
|
|
|
# OAuth2 device-auth flow state (shared across threads)
|
|
_oauth2_state: dict = {"status": "idle", "device_url": None, "code": None, "error": None}
|
|
_oauth2_state_lock = threading.Lock()
|
|
|
|
|
|
def set_max_concurrent(n: int) -> None:
|
|
global _SEMAPHORE
|
|
with _semaphore_lock:
|
|
_SEMAPHORE = threading.Semaphore(max(1, min(n, 10)))
|
|
|
|
|
|
def set_cookies_browser(browser: str) -> None:
|
|
global _cookies_browser
|
|
with _cookies_lock:
|
|
_cookies_browser = browser.strip().lower()
|
|
|
|
|
|
def set_cookies_file(path: str) -> None:
|
|
global _cookies_file
|
|
with _cookies_lock:
|
|
_cookies_file = path.strip()
|
|
|
|
|
|
def set_oauth2(enabled: bool) -> None:
|
|
global _use_oauth2
|
|
with _cookies_lock:
|
|
_use_oauth2 = bool(enabled)
|
|
|
|
|
|
def _cookie_args() -> list[str]:
|
|
with _cookies_lock:
|
|
cf = _cookies_file
|
|
b = _cookies_browser
|
|
oauth2 = _use_oauth2
|
|
# OAuth2 token auth — IP-independent, works on datacenter servers
|
|
if oauth2:
|
|
return ["--username", "oauth2", "--password", ""]
|
|
# Explicit cookies file
|
|
if cf and Path(cf).exists():
|
|
return ["--cookies", cf]
|
|
# Auto-detect cookies.txt in well-known Docker locations
|
|
for candidate in _AUTO_COOKIES_PATHS:
|
|
if Path(candidate).exists():
|
|
return ["--cookies", candidate]
|
|
# Browser cookies — only when no file path was ever configured.
|
|
# If cookies_file is set but missing, the user intended file auth; falling
|
|
# through to a browser that isn't installed in Docker would silently break
|
|
# all yt-dlp calls with an empty-stdout failure.
|
|
if b and not cf:
|
|
return ["--cookies-from-browser", b]
|
|
return []
|
|
|
|
|
|
def get_oauth2_status() -> dict:
|
|
with _oauth2_state_lock:
|
|
return dict(_oauth2_state)
|
|
|
|
|
|
def start_oauth2_flow() -> dict:
|
|
"""Start yt-dlp OAuth2 device-auth flow in a background thread.
|
|
|
|
yt-dlp prints a Google device URL + code to stderr, then polls until the user
|
|
completes sign-in on their phone/browser. Token is cached to /data/yt-dlp-cache
|
|
(set globally via /etc/yt-dlp.conf) and reused on every subsequent call that
|
|
passes --username oauth2 --password "".
|
|
"""
|
|
import time as _time
|
|
|
|
with _oauth2_state_lock:
|
|
if _oauth2_state["status"] == "pending":
|
|
return dict(_oauth2_state)
|
|
_oauth2_state.update({"status": "pending", "device_url": None, "code": None, "error": None})
|
|
|
|
def _run_flow():
|
|
try:
|
|
process = subprocess.Popen(
|
|
[
|
|
"yt-dlp",
|
|
"--username", "oauth2", "--password", "",
|
|
"https://www.youtube.com/",
|
|
],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
for line in process.stderr:
|
|
line = line.strip()
|
|
print(f"[oauth2] {line}", flush=True)
|
|
if "google.com/device" in line or "youtube.com/device" in line:
|
|
url_m = re.search(r"(https://[^\s]+)", line)
|
|
code_m = re.search(r"code[:\s]+([A-Z0-9]{4}-[A-Z0-9]{4}|[A-Z0-9-]{6,})", line, re.IGNORECASE)
|
|
with _oauth2_state_lock:
|
|
_oauth2_state["device_url"] = (url_m.group(1) if url_m else "https://www.google.com/device")
|
|
_oauth2_state["code"] = code_m.group(1) if code_m else None
|
|
process.wait()
|
|
with _oauth2_state_lock:
|
|
if process.returncode == 0:
|
|
_oauth2_state["status"] = "complete"
|
|
else:
|
|
_oauth2_state["status"] = "error"
|
|
_oauth2_state["error"] = f"yt-dlp exited with code {process.returncode}"
|
|
except Exception as exc:
|
|
with _oauth2_state_lock:
|
|
_oauth2_state["status"] = "error"
|
|
_oauth2_state["error"] = str(exc)
|
|
|
|
threading.Thread(target=_run_flow, daemon=True).start()
|
|
|
|
# Wait up to 10 s for the device URL to appear in stderr
|
|
import time as _time
|
|
for _ in range(100):
|
|
with _oauth2_state_lock:
|
|
if _oauth2_state["device_url"] or _oauth2_state["status"] in ("complete", "error"):
|
|
break
|
|
_time.sleep(0.1)
|
|
|
|
with _oauth2_state_lock:
|
|
return dict(_oauth2_state)
|
|
|
|
|
|
def start_download(
|
|
video_id: str,
|
|
download_id: int,
|
|
on_progress: Any,
|
|
on_complete: Any,
|
|
on_error: Any,
|
|
quality: str = "best",
|
|
) -> None:
|
|
"""Start yt-dlp download in a background thread.
|
|
|
|
Uses a single progressive MP4 format so the file is playable as it downloads.
|
|
--no-part writes directly to the final filename (no .part rename at the end).
|
|
"""
|
|
url = f"https://www.youtube.com/watch?v={video_id}"
|
|
# Predictable output path — lets the player start before download finishes
|
|
output_template = str(Path(settings.download_path) / f"{video_id}.%(ext)s")
|
|
|
|
fmt = QUALITY_FORMATS.get(quality, QUALITY_FORMATS["best"])
|
|
|
|
def _run_download():
|
|
with _SEMAPHORE:
|
|
cookie_args = _cookie_args()
|
|
print(f"[ytdlp] cookie_args={cookie_args!r} file_exists={Path(cookie_args[1]).exists() if len(cookie_args) == 2 and cookie_args[0] == '--cookies' else 'n/a'}", flush=True)
|
|
process = subprocess.Popen(
|
|
[
|
|
"yt-dlp", url,
|
|
"-f", fmt,
|
|
"--merge-output-format", "mp4",
|
|
"--postprocessor-args", "Merger+ffmpeg:-movflags +faststart",
|
|
"--embed-metadata", "--embed-thumbnail",
|
|
"--no-part", "--no-mtime",
|
|
"-o", output_template,
|
|
"--newline", "--progress", "--no-colors",
|
|
"--extractor-args", "youtube:player_client=web",
|
|
*_cookie_args(),
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
)
|
|
|
|
file_path = None
|
|
stream_index = 0
|
|
output_lines: list[str] = []
|
|
for line in process.stdout:
|
|
line = line.strip()
|
|
output_lines.append(line)
|
|
if re.search(r"\[download\] Destination:", line):
|
|
stream_index += 1
|
|
m = re.search(r"\[download\]\s+([\d.]+)%", line)
|
|
if m:
|
|
pct = float(m.group(1))
|
|
scaled = pct * 0.85 if stream_index <= 1 else 85.0 + pct * 0.10
|
|
on_progress(download_id, min(scaled, 95.0))
|
|
m2 = re.search(r"\[(?:download|Merger)\] Destination: (.+)", line)
|
|
if m2:
|
|
file_path = m2.group(1).strip()
|
|
|
|
process.wait()
|
|
if process.returncode == 0:
|
|
resolution = detect_resolution(file_path) if file_path else None
|
|
on_complete(download_id, file_path, resolution)
|
|
else:
|
|
tail = "\n".join(output_lines[-20:]) if output_lines else "(no output)"
|
|
import logging
|
|
logging.getLogger(__name__).error("yt-dlp failed (code %d):\n%s", process.returncode, tail)
|
|
on_error(download_id, f"yt-dlp exited with code {process.returncode}:\n{tail}")
|
|
|
|
thread = threading.Thread(target=_run_download, daemon=True)
|
|
thread.start()
|