"""Subprocess wrapper for yt-dlp.""" import json import random import re import subprocess import threading import time import urllib.request import xml.etree.ElementTree as ET from datetime import datetime, timezone from pathlib import Path from typing import Any from ..config import settings def _run(args: list[str], timeout: int = 60) -> tuple[str, str, int]: result = subprocess.run(args, capture_output=True, text=True, timeout=timeout) return result.stdout, result.stderr, result.returncode # Global rate limiter for all metadata fetches — prevents concurrent tasks from # hammering YouTube and invalidating cookies. _meta_lock = threading.Lock() _meta_last_call: float = 0.0 _META_MIN_GAP = 5.0 # seconds between any two metadata requests def _meta_run(args: list[str], timeout: int = 60) -> tuple[str, str, int]: global _meta_last_call with _meta_lock: now = time.monotonic() wait = _META_MIN_GAP - (now - _meta_last_call) if wait > 0: time.sleep(wait + random.uniform(0.5, 2.5)) _meta_last_call = time.monotonic() return _run(args, timeout=timeout) def _parse_date(date_str: str | None) -> datetime | None: if not date_str: return None try: return datetime.strptime(date_str, "%Y%m%d") except ValueError: return None def _parse_published(info: dict) -> datetime | None: """Extract publish date from yt-dlp info dict. Tries upload_date (YYYYMMDD string) first, then timestamp (Unix epoch), then release_timestamp. Flat-playlist entries often omit upload_date but include timestamp, so the fallback is important. """ d = _parse_date(info.get("upload_date")) if d: return d for key in ("timestamp", "release_timestamp"): ts = info.get(key) if ts: try: return datetime.utcfromtimestamp(float(ts)) except (ValueError, OSError, OverflowError): pass return None def _stable_thumbnail(video_id: str | None) -> str | None: if not video_id: return None return f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg" def _normalize_video(info: dict) -> dict: video_id = info.get("id") raw_chapters = info.get("chapters") or [] chapters = [ { "start_time": int(ch.get("start_time") or 0), "end_time": int(ch.get("end_time") or 0), "title": ch.get("title") or "", } for ch in raw_chapters if ch.get("title") ] return { "youtube_video_id": video_id, "title": info.get("title", ""), "description": info.get("description", ""), "thumbnail_url": _stable_thumbnail(video_id), "duration_seconds": info.get("duration"), "published_at": _parse_published(info), "tags": json.dumps(info.get("tags") or []), "category": info.get("category") or (info.get("categories") or [None])[0], "chapters": json.dumps(chapters) if chapters else None, "view_count": info.get("view_count"), "like_count": info.get("like_count"), "dislike_count": info.get("dislike_count"), "channel": { "youtube_channel_id": info.get("channel_id"), "name": info.get("channel") or info.get("uploader", ""), "thumbnail_url": None, }, } def _channel_banner(thumbnails: list | None) -> str | None: if not thumbnails: return None for t in thumbnails: if "banner" in str(t.get("id") or "").lower(): return t.get("url") wide = [t for t in thumbnails if t.get("width") and t.get("height") and t["width"] > t["height"] * 3] if wide: return max(wide, key=lambda t: t.get("width") or 0).get("url") return None def _channel_avatar(thumbnails: list | None) -> str | None: """Pick the channel avatar from yt-dlp's thumbnails list. YouTube returns banners and avatars in the same array. Avatars have id 'avatar_uncropped' or are roughly square (width ≈ height). """ if not thumbnails: return None for t in thumbnails: if "avatar" in str(t.get("id") or "").lower(): return t.get("url") # Fall back to the most square thumbnail square = [t for t in thumbnails if t.get("width") and t.get("height") and t["width"] <= t["height"] * 1.2 and t["height"] <= t["width"] * 1.2] if square: return max(square, key=lambda t: t.get("width") or 0).get("url") return None def _normalize_channel(info: dict) -> dict: return { "youtube_channel_id": info.get("channel_id") or info.get("id"), "name": info.get("channel") or info.get("title") or info.get("uploader") or None, "description": info.get("description") or None, "thumbnail_url": _channel_avatar(info.get("thumbnails")), "banner_url": _channel_banner(info.get("thumbnails")), "subscriber_count": info.get("channel_follower_count"), } def search_youtube(query: str, max_results: int = 40) -> list[dict]: """Search YouTube via yt-dlp. Uses --flat-playlist for fast results.""" stdout, _, code = _run([ "yt-dlp", f"ytsearch{max_results}:{query}", "--dump-json", "--flat-playlist", "--quiet", *_cookie_args(), ], timeout=60) results = [] for line in stdout.splitlines(): line = line.strip() if not line: continue try: info = json.loads(line) # flat-playlist entries have _type="url" with basic fields if info.get("_type") in ("url", None) and info.get("id"): results.append({ "youtube_video_id": info.get("id"), "title": info.get("title", ""), "description": info.get("description") or "", "thumbnail_url": _stable_thumbnail(info.get("id")), "duration_seconds": info.get("duration"), "published_at": _parse_published(info), "tags": json.dumps(info.get("tags") or []), "category": None, "channel": { "youtube_channel_id": info.get("channel_id"), "name": info.get("channel") or info.get("uploader") or "", "thumbnail_url": None, }, }) except json.JSONDecodeError: continue return results def fetch_trending(region: str = "US", max_results: int = 50) -> list[dict]: """Fetch trending videos for a region via yt-dlp search with date-sort filter. Uses the YouTube search sort-by-upload-date URL that reliably returns regional results. Falls back gracefully to an empty list on error. """ region = region.upper() # CAI%3D = sort by upload date; gl= sets the region url = f"https://www.youtube.com/results?search_query=trending&sp=CAI%253D&gl={region}" stdout, _, code = _run([ "yt-dlp", url, "--dump-json", "--flat-playlist", "--quiet", "--playlist-end", str(max_results), *_cookie_args(), ], timeout=60) results = [] for line in stdout.splitlines(): line = line.strip() if not line: continue try: info = json.loads(line) if info.get("_type") in ("url", None) and info.get("id"): results.append({ "youtube_video_id": info.get("id"), "title": info.get("title", ""), "thumbnail_url": _stable_thumbnail(info.get("id")), "duration_seconds": info.get("duration"), "published_at": _parse_published(info), "tags": json.dumps(info.get("tags") or []), "category": None, "channel": { "youtube_channel_id": info.get("channel_id"), "name": info.get("channel") or info.get("uploader") or "", "thumbnail_url": None, }, }) except json.JSONDecodeError: continue return results def _best_thumbnail(thumbnails: list | None) -> str | None: if not thumbnails: return None # pick the one closest to 480px wide best = sorted(thumbnails, key=lambda t: abs((t.get("width") or 0) - 480)) return best[0].get("url") if best else None def fetch_video_metadata(video_id: str) -> dict | None: """Fetch metadata for a single video by YouTube ID.""" url = f"https://www.youtube.com/watch?v={video_id}" cookie_args = _cookie_args() print(f"[fetch_meta] video={video_id} cookie_args={cookie_args!r}", flush=True) base_cmd = [ "yt-dlp", url, "--dump-json", "--no-download", "--no-playlist", ] stdout, stderr, code = _meta_run([*base_cmd, *cookie_args], timeout=30) if code != 0: print(f"[fetch_meta] FAILED code={code} stderr={stderr[:500]!r}", flush=True) # Retry without auth args — broken cookie config shouldn't block public videos if cookie_args: print(f"[fetch_meta] retrying without cookie args", flush=True) stdout, stderr, code = _meta_run(base_cmd, timeout=30) if code != 0: print(f"[fetch_meta] retry also FAILED code={code}", flush=True) for line in stdout.splitlines(): line = line.strip() if not line: continue try: info = json.loads(line) return _normalize_video(info) except json.JSONDecodeError: continue return None def _rss_dates(uc_channel_id: str) -> dict[str, datetime]: """Fetch publish dates for the 15 most recent videos from YouTube's RSS feed. Fast, unauthenticated, and returns precise dates. Only works for UC… IDs. """ if not uc_channel_id or not uc_channel_id.startswith("UC"): return {} url = f"https://www.youtube.com/feeds/videos.xml?channel_id={uc_channel_id}" try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=10) as resp: xml_data = resp.read() root = ET.fromstring(xml_data) ns = { "atom": "http://www.w3.org/2005/Atom", "yt": "http://www.youtube.com/xml/schemas/2015", } dates: dict[str, datetime] = {} for entry in root.findall("atom:entry", ns): vid_el = entry.find("yt:videoId", ns) pub_el = entry.find("atom:published", ns) if vid_el is not None and pub_el is not None and vid_el.text and pub_el.text: try: dt = datetime.fromisoformat(pub_el.text.replace("Z", "+00:00")) dates[vid_el.text] = dt.replace(tzinfo=None) except ValueError: pass return dates except Exception: return {} def fetch_channel_metadata(channel_id: str, max_videos: int = 30, start_video: int = 1) -> dict | None: """Fetch channel info + recent videos. Uses --dump-single-json --flat-playlist for speed, then enriches video dates from YouTube's RSS feed (gives precise dates for the 15 most recent videos). """ if channel_id.startswith("@"): url = f"https://www.youtube.com/{channel_id}/videos" else: url = f"https://www.youtube.com/channel/{channel_id}/videos" args = [ "yt-dlp", url, "--dump-single-json", "--flat-playlist", "--quiet", *_cookie_args(), ] if start_video > 1: args += ["--playlist-start", str(start_video)] if max_videos > 0: end = (start_video - 1 + max_videos) if start_video > 1 else max_videos args += ["--playlist-end", str(end)] stdout, _, code = _meta_run(args, timeout=60) if not stdout.strip(): return None try: info = json.loads(stdout.strip()) except json.JSONDecodeError: return None if not info.get("id") and not info.get("channel_id"): return None channel_info = _normalize_channel(info) # Fetch RSS dates — fast single HTTP request, precise dates for ≤15 newest videos uc_id = channel_info.get("youtube_channel_id") or "" rss = _rss_dates(uc_id) videos = [] for entry in info.get("entries") or []: vid_id = entry.get("id") if not vid_id: continue published_at = rss.get(vid_id) or _parse_published(entry) videos.append({ "youtube_video_id": vid_id, "title": entry.get("title") or "", "description": entry.get("description") or None, "thumbnail_url": _stable_thumbnail(vid_id), "duration_seconds": entry.get("duration"), "published_at": published_at, "tags": json.dumps(entry.get("tags") or []), "category": (entry.get("categories") or [None])[0], "channel": { "youtube_channel_id": channel_info.get("youtube_channel_id"), "name": channel_info.get("name") or "", "thumbnail_url": None, }, }) return {"channel": channel_info, "videos": videos} def fetch_channel_playlists(channel_id: str, max_results: int = 100) -> list[dict]: """Fetch the playlists listed on a channel's /playlists tab.""" if channel_id.startswith("@"): url = f"https://www.youtube.com/{channel_id}/playlists" else: url = f"https://www.youtube.com/channel/{channel_id}/playlists" stdout, _, code = _meta_run([ "yt-dlp", url, "--dump-json", "--flat-playlist", "--playlist-end", str(max_results), "--quiet", *_cookie_args(), ], timeout=60) playlists = [] for line in stdout.splitlines(): line = line.strip() if not line: continue try: info = json.loads(line) pl_id = info.get("id") or info.get("playlist_id") title = info.get("title") or info.get("playlist_title") or "" if not pl_id or not title or pl_id == channel_id: continue # Thumbnail: yt-dlp gives a thumbnails array for playlist entries; # fall back to singular thumbnail field. Never use _stable_thumbnail # here because the id is a playlist ID, not a video ID. thumbs = info.get("thumbnails") or [] thumb_url = info.get("thumbnail") if thumbs: best = max(thumbs, key=lambda t: (t.get("width") or 0) * (t.get("height") or 0), default=None) if best: thumb_url = best.get("url") or thumb_url playlists.append({ "youtube_playlist_id": pl_id, "title": title, "description": info.get("description"), "thumbnail_url": thumb_url, "video_count": info.get("playlist_count") or info.get("n_entries") or 0, }) except json.JSONDecodeError: continue return playlists def fetch_playlist_videos(playlist_id: str, max_videos: int = 200) -> list[dict]: """Fetch videos from a YouTube playlist by playlist ID.""" url = f"https://www.youtube.com/playlist?list={playlist_id}" args = [ "yt-dlp", url, "--dump-json", "--flat-playlist", "--quiet", *_cookie_args(), ] if max_videos > 0: args += ["--playlist-end", str(max_videos)] stdout, _, code = _run(args, timeout=120) videos = [] for line in stdout.splitlines(): line = line.strip() if not line: continue try: info = json.loads(line) vid_id = info.get("id") if not vid_id: continue videos.append({ "youtube_video_id": vid_id, "title": info.get("title", ""), "thumbnail_url": _stable_thumbnail(vid_id), "duration_seconds": info.get("duration"), "published_at": _parse_published(info), "view_count": info.get("view_count"), "channel": { "youtube_channel_id": info.get("channel_id"), "name": info.get("channel") or info.get("uploader") or "", }, }) except json.JSONDecodeError: continue return videos def fetch_featured_channels(channel_id: str) -> list[str]: """Fetch channel IDs from the /channels tab of a YouTube channel. The /channels tab lists channels the creator explicitly recommends — a very high-signal source for discovery. Returns UC... channel IDs. """ if channel_id.startswith("@"): url = f"https://www.youtube.com/{channel_id}/channels" else: url = f"https://www.youtube.com/channel/{channel_id}/channels" stdout, _, code = _run([ "yt-dlp", url, "--dump-json", "--flat-playlist", "--quiet", *_cookie_args(), ], timeout=30) channel_ids: list[str] = [] for line in stdout.splitlines(): line = line.strip() if not line: continue try: info = json.loads(line) ch_id = info.get("channel_id") or info.get("id") if ch_id and ch_id.startswith("UC"): channel_ids.append(ch_id) except json.JSONDecodeError: continue return channel_ids def fetch_channel_links(channel_id: str) -> list[str]: """Extract linked channel IDs from a channel's about/description.""" if channel_id.startswith("@"): url = f"https://www.youtube.com/{channel_id}/about" else: url = f"https://www.youtube.com/channel/{channel_id}/about" stdout, _, code = _run([ "yt-dlp", url, "--dump-json", "--no-download", "--flat-playlist", "--playlist-end", "1", "--quiet", *_cookie_args(), ], timeout=30) channel_ids = set() for line in stdout.splitlines(): line = line.strip() if not line: continue try: info = json.loads(line) desc = info.get("description", "") or "" for match in re.finditer(r"youtube\.com/channel/(UC[\w-]+)", desc): channel_ids.add(match.group(1)) for match in re.finditer(r"youtube\.com/@([\w-]+)", desc): channel_ids.add(f"@{match.group(1)}") except json.JSONDecodeError: continue return list(channel_ids) def _strip_vtt_cue_settings(video_id: str) -> None: """Remove position/align/line cue settings from yt-dlp VTT files. yt-dlp embeds 'align:start position:0%' in every cue header which pins subtitles to the bottom-left. Stripping them lets CSS ::cue center them. """ for vtt in Path(settings.download_path).glob(f"{video_id}.*.vtt"): try: text = vtt.read_text(encoding="utf-8", errors="replace") cleaned = re.sub( r'(\d{1,2}:\d{2}:\d{2}\.\d{3} --> \d{1,2}:\d{2}:\d{2}\.\d{3})[^\n]*', r'\1', text, ) vtt.write_text(cleaned, encoding="utf-8") except Exception: pass def download_subs_only(video_id: str, subtitle_langs: str) -> bool: """Download subtitle files only (no video) for an already-downloaded video.""" url = f"https://www.youtube.com/watch?v={video_id}" output_template = str(Path(settings.download_path) / f"{video_id}.%(ext)s") _, _, code = _run([ "yt-dlp", url, "--skip-download", "--no-playlist", "--write-subs", "--write-auto-subs", "--sub-langs", subtitle_langs, "--convert-subs", "vtt", "-o", output_template, *_cookie_args(), ], timeout=60) if code == 0: _strip_vtt_cue_settings(video_id) return code == 0 def fetch_available_subs(video_id: str) -> dict: """Return subtitle languages available on YouTube for a video. Returns {"manual": [...], "auto": [...]} where both are sorted lists of BCP-47 lang codes. Manual = human-made; auto = auto-generated captions. """ url = f"https://www.youtube.com/watch?v={video_id}" base_cmd = ["yt-dlp", url, "--dump-json", "--no-download", "--no-playlist"] cookie_args = _cookie_args() stdout, _, code = _meta_run([*base_cmd, *cookie_args], timeout=30) if code != 0 and cookie_args: stdout, _, code = _meta_run(base_cmd, timeout=30) for line in stdout.splitlines(): line = line.strip() if not line: continue try: info = json.loads(line) manual = sorted(info.get("subtitles") or {}) auto = sorted(set( lang for lang in (info.get("automatic_captions") or {}) if not lang.endswith("-orig") )) return {"manual": manual, "auto": auto} except json.JSONDecodeError: continue return {"manual": [], "auto": []} def fetch_video_comments(youtube_video_id: str, max_comments: int = 20) -> list[dict]: """Fetch top comments via yt-dlp CLI writing to a temp file. Returns empty list on failure.""" import os import tempfile url = f"https://www.youtube.com/watch?v={youtube_video_id}" with tempfile.TemporaryDirectory() as tmpdir: out_tmpl = os.path.join(tmpdir, "%(id)s.%(ext)s") args = [ "yt-dlp", url, "--write-info-json", "--write-comments", # Format: thread_count,total_count,replies_per_thread,reply_pages "--extractor-args", f"youtube:max_comments={max_comments},{max_comments},0,0;comment_sort=top", "--skip-download", "--no-playlist", "--output", out_tmpl, *_cookie_args(), ] _run(args, timeout=90) info = None for fname in os.listdir(tmpdir): if fname.endswith(".info.json"): try: with open(os.path.join(tmpdir, fname)) as f: info = json.load(f) except Exception: pass break if not info: return [] result = [] for c in (info.get("comments") or []): if c.get("parent") not in (None, "root"): continue # skip replies ts = c.get("timestamp") result.append({ "youtube_comment_id": c.get("id"), "author": c.get("author"), "text": c.get("text"), "likes": c.get("like_count") or 0, "is_pinned": bool(c.get("is_pinned")), "published_at": datetime.utcfromtimestamp(ts) if ts else None, }) result.sort(key=lambda c: (not c["is_pinned"], -(c["likes"] or 0))) return result[:max_comments] def fetch_dislike_count(youtube_video_id: str) -> int | None: """Fetch dislike count from returnyoutubedislike.com (crowdsourced).""" try: url = f"https://returnyoutubedislikeapi.com/votes?videoId={youtube_video_id}" with urllib.request.urlopen(url, timeout=5) as resp: data = json.loads(resp.read()) return data.get("dislikes") except Exception: return None QUALITY_FORMATS = { "best": "bestvideo+bestaudio/best", "2160p": "bestvideo[height<=2160]+bestaudio/bestvideo+bestaudio/best", "1440p": "bestvideo[height<=1440]+bestaudio/bestvideo+bestaudio/best", "1080p": "bestvideo[height<=1080]+bestaudio/bestvideo+bestaudio/best", "720p": "bestvideo[height<=720]+bestaudio/bestvideo+bestaudio/best", "480p": "bestvideo[height<=480]+bestaudio/bestvideo+bestaudio/best", "360p": "bestvideo[height<=360]+bestaudio/bestvideo+bestaudio/best", "240p": "bestvideo[height<=240]+bestaudio/bestvideo+bestaudio/best", "144p": "bestvideo[height<=144]+bestaudio/bestvideo+bestaudio/best", } def detect_resolution(file_path: str) -> str | None: """Use ffprobe to get the video stream height and return a label like '1080p'.""" try: result = subprocess.run( ["ffprobe", "-v", "quiet", "-select_streams", "v:0", "-show_entries", "stream=height", "-of", "csv=p=0", file_path], capture_output=True, text=True, timeout=15, ) height = int(result.stdout.strip()) if height >= 2160: return "2160p" if height >= 1440: return "1440p" if height >= 1080: return "1080p" if height >= 720: return "720p" if height >= 480: return "480p" if height >= 360: return "360p" return f"{height}p" except Exception: return None def predicted_file_path(video_id: str) -> Path: """Return the expected output path for a video download.""" return Path(settings.download_path) / f"{video_id}.mp4" _SEMAPHORE = threading.Semaphore(6) _semaphore_lock = threading.Lock() _cookies_browser: str = "" _cookies_file: str = "" _use_oauth2: bool = False _cookies_lock = threading.Lock() _AUTO_COOKIES_PATHS = ["/data/cookies.txt"] # OAuth2 device-auth flow state (shared across threads) _oauth2_state: dict = {"status": "idle", "device_url": None, "code": None, "error": None} _oauth2_state_lock = threading.Lock() def set_max_concurrent(n: int) -> None: global _SEMAPHORE with _semaphore_lock: _SEMAPHORE = threading.Semaphore(max(1, min(n, 16))) def set_cookies_browser(browser: str) -> None: global _cookies_browser with _cookies_lock: _cookies_browser = browser.strip().lower() def set_cookies_file(path: str) -> None: global _cookies_file with _cookies_lock: _cookies_file = path.strip() def set_oauth2(enabled: bool) -> None: global _use_oauth2 with _cookies_lock: _use_oauth2 = bool(enabled) def _cookie_args() -> list[str]: with _cookies_lock: cf = _cookies_file b = _cookies_browser oauth2 = _use_oauth2 # OAuth2 token auth — IP-independent, works on datacenter servers if oauth2: return ["--username", "oauth2", "--password", ""] # Explicit cookies file if cf and Path(cf).exists(): return ["--cookies", cf] # Auto-detect cookies.txt in well-known Docker locations for candidate in _AUTO_COOKIES_PATHS: if Path(candidate).exists(): return ["--cookies", candidate] # Browser cookies — only when no file path was ever configured. # If cookies_file is set but missing, the user intended file auth; falling # through to a browser that isn't installed in Docker would silently break # all yt-dlp calls with an empty-stdout failure. if b and not cf: return ["--cookies-from-browser", b] return [] def get_oauth2_status() -> dict: with _oauth2_state_lock: return dict(_oauth2_state) def start_oauth2_flow() -> dict: """Start yt-dlp OAuth2 device-auth flow in a background thread. yt-dlp prints a Google device URL + code to stderr, then polls until the user completes sign-in on their phone/browser. Token is cached to /data/yt-dlp-cache (set globally via /etc/yt-dlp.conf) and reused on every subsequent call that passes --username oauth2 --password "". """ import time as _time with _oauth2_state_lock: if _oauth2_state["status"] == "pending": return dict(_oauth2_state) _oauth2_state.update({"status": "pending", "device_url": None, "code": None, "error": None}) def _run_flow(): try: process = subprocess.Popen( [ "yt-dlp", "--username", "oauth2", "--password", "", "https://www.youtube.com/", ], stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, ) for line in process.stderr: line = line.strip() print(f"[oauth2] {line}", flush=True) if "google.com/device" in line or "youtube.com/device" in line: url_m = re.search(r"(https://[^\s]+)", line) code_m = re.search(r"code[:\s]+([A-Z0-9]{4}-[A-Z0-9]{4}|[A-Z0-9-]{6,})", line, re.IGNORECASE) with _oauth2_state_lock: _oauth2_state["device_url"] = (url_m.group(1) if url_m else "https://www.google.com/device") _oauth2_state["code"] = code_m.group(1) if code_m else None process.wait() with _oauth2_state_lock: if process.returncode == 0: _oauth2_state["status"] = "complete" else: _oauth2_state["status"] = "error" _oauth2_state["error"] = f"yt-dlp exited with code {process.returncode}" except Exception as exc: with _oauth2_state_lock: _oauth2_state["status"] = "error" _oauth2_state["error"] = str(exc) threading.Thread(target=_run_flow, daemon=True).start() # Wait up to 10 s for the device URL to appear in stderr import time as _time for _ in range(100): with _oauth2_state_lock: if _oauth2_state["device_url"] or _oauth2_state["status"] in ("complete", "error"): break _time.sleep(0.1) with _oauth2_state_lock: return dict(_oauth2_state) def start_download( video_id: str, download_id: int, on_progress: Any, on_complete: Any, on_error: Any, quality: str = "best", subtitle_langs: str = "", ) -> None: """Start yt-dlp download in a background thread. --no-part writes directly to the final filename (no .part rename at the end). """ url = f"https://www.youtube.com/watch?v={video_id}" output_template = str(Path(settings.download_path) / f"{video_id}.%(ext)s") fmt = QUALITY_FORMATS.get(quality, QUALITY_FORMATS["best"]) subtitle_args = ( ["--write-subs", "--write-auto-subs", "--sub-langs", subtitle_langs, "--convert-subs", "vtt"] if subtitle_langs else [] ) def _run_download(): with _SEMAPHORE: cookie_args = _cookie_args() print(f"[ytdlp] cookie_args={cookie_args!r}", flush=True) process = subprocess.Popen( [ "yt-dlp", url, "-f", fmt, "--merge-output-format", "mp4", "--no-part", "--no-mtime", "-o", output_template, "--newline", "--progress", "--no-colors", *subtitle_args, *cookie_args, ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) file_path = None stream_index = 0 output_lines: list[str] = [] for line in process.stdout: line = line.strip() output_lines.append(line) if re.search(r"\[download\] Destination:", line): stream_index += 1 m = re.search(r"\[download\]\s+([\d.]+)%", line) if m: pct = float(m.group(1)) scaled = pct * 0.85 if stream_index <= 1 else 85.0 + pct * 0.10 on_progress(download_id, min(scaled, 95.0)) m2 = re.search(r"\[(?:download|Merger)\] Destination: (.+)", line) if m2: file_path = m2.group(1).strip() process.wait() if process.returncode == 0: _strip_vtt_cue_settings(video_id) resolution = detect_resolution(file_path) if file_path else None on_complete(download_id, file_path, resolution) else: tail = "\n".join(output_lines[-20:]) if output_lines else "(no output)" import logging logging.getLogger(__name__).error("yt-dlp failed (code %d):\n%s", process.returncode, tail) on_error(download_id, f"yt-dlp exited with code {process.returncode}:\n{tail}") thread = threading.Thread(target=_run_download, daemon=True) thread.start()