- search_youtube, fetch_trending, fetch_featured_channels now use _meta_run - Replaced ThreadPoolExecutor(4) parallel searches with sequential loop - Replaced ThreadPoolExecutor(3) parallel featured-channel fetches with sequential - _fetch_and_index_channel passes polite=True to fetch_channel/video_metadata Discovery was firing 4+ simultaneous yt-dlp processes, each with cookies, which is what invalidated the session. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
903 lines
32 KiB
Python
903 lines
32 KiB
Python
"""Subprocess wrapper for yt-dlp."""
|
|
import json
|
|
import random
|
|
import re
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from ..config import settings
|
|
|
|
|
|
def _run(args: list[str], timeout: int = 60) -> tuple[str, str, int]:
    """Run a command to completion and return ``(stdout, stderr, returncode)``.

    Both output streams are captured and decoded as text. ``subprocess.run``
    raises ``subprocess.TimeoutExpired`` when the command outlives ``timeout``
    seconds; that exception is not handled here.
    """
    completed = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        timeout=timeout,
    )
    return completed.stdout, completed.stderr, completed.returncode
|
|
|
|
|
|
# Process-wide throttle shared by every metadata fetch, so that concurrent
# tasks cannot hammer YouTube and get the cookies invalidated.
_meta_lock = threading.Lock()
_meta_last_call: float = 0.0
_META_MIN_GAP = 5.0  # seconds between any two metadata requests


def _meta_run(args: list[str], timeout: int = 60) -> tuple[str, str, int]:
    """Rate-limited variant of ``_run``.

    If fewer than ``_META_MIN_GAP`` seconds have passed since the previous
    call, sleeps for the remainder plus a random 0.5-2.5 s jitter before
    running the command. The sleep happens while ``_meta_lock`` is held, so
    callers on other threads queue up behind it.

    Returns the same ``(stdout, stderr, returncode)`` tuple as ``_run``.
    """
    global _meta_last_call
    with _meta_lock:
        elapsed = time.monotonic() - _meta_last_call
        remaining = _META_MIN_GAP - elapsed
        if remaining > 0:
            time.sleep(remaining + random.uniform(0.5, 2.5))
        _meta_last_call = time.monotonic()
    # NOTE(review): indentation was lost in the reviewed copy of this file; the
    # command is assumed to run after the lock is released - confirm.
    return _run(args, timeout=timeout)
|
|
|
|
|
|
def _parse_date(date_str: str | None) -> datetime | None:
|
|
if not date_str:
|
|
return None
|
|
try:
|
|
return datetime.strptime(date_str, "%Y%m%d")
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _parse_published(info: dict) -> datetime | None:
|
|
"""Extract publish date from yt-dlp info dict.
|
|
|
|
Tries upload_date (YYYYMMDD string) first, then timestamp (Unix epoch),
|
|
then release_timestamp. Flat-playlist entries often omit upload_date but
|
|
include timestamp, so the fallback is important.
|
|
"""
|
|
d = _parse_date(info.get("upload_date"))
|
|
if d:
|
|
return d
|
|
for key in ("timestamp", "release_timestamp"):
|
|
ts = info.get(key)
|
|
if ts:
|
|
try:
|
|
return datetime.utcfromtimestamp(float(ts))
|
|
except (ValueError, OSError, OverflowError):
|
|
pass
|
|
return None
|
|
|
|
|
|
def _stable_thumbnail(video_id: str | None) -> str | None:
|
|
if not video_id:
|
|
return None
|
|
return f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
|
|
|
|
|
|
def _normalize_video(info: dict) -> dict:
    """Map a full yt-dlp video info dict onto the app's video record.

    Chapters without a title are dropped; the remaining ones are serialised
    to a JSON string (``None`` when there are none). ``tags`` is always a JSON
    string. ``category`` falls back to the first entry of ``categories``. The
    nested ``channel`` dict carries only the ID and name; its thumbnail is
    left as ``None``.
    """
    video_id = info.get("id")

    chapters = []
    for chapter in info.get("chapters") or []:
        if not chapter.get("title"):
            continue
        chapters.append({
            "start_time": int(chapter.get("start_time") or 0),
            "end_time": int(chapter.get("end_time") or 0),
            "title": chapter.get("title") or "",
        })

    category = info.get("category")
    if not category:
        category = (info.get("categories") or [None])[0]

    channel = {
        "youtube_channel_id": info.get("channel_id"),
        "name": info.get("channel") or info.get("uploader", ""),
        "thumbnail_url": None,
    }

    return {
        "youtube_video_id": video_id,
        "title": info.get("title", ""),
        "description": info.get("description", ""),
        "thumbnail_url": _stable_thumbnail(video_id),
        "duration_seconds": info.get("duration"),
        "published_at": _parse_published(info),
        "tags": json.dumps(info.get("tags") or []),
        "category": category,
        "chapters": json.dumps(chapters) if chapters else None,
        "view_count": info.get("view_count"),
        "like_count": info.get("like_count"),
        "dislike_count": info.get("dislike_count"),
        "channel": channel,
    }
|
|
|
|
|
|
def _channel_banner(thumbnails: list | None) -> str | None:
|
|
if not thumbnails:
|
|
return None
|
|
for t in thumbnails:
|
|
if "banner" in str(t.get("id") or "").lower():
|
|
return t.get("url")
|
|
wide = [t for t in thumbnails
|
|
if t.get("width") and t.get("height") and t["width"] > t["height"] * 3]
|
|
if wide:
|
|
return max(wide, key=lambda t: t.get("width") or 0).get("url")
|
|
return None
|
|
|
|
|
|
def _channel_avatar(thumbnails: list | None) -> str | None:
|
|
"""Pick the channel avatar from yt-dlp's thumbnails list.
|
|
|
|
YouTube returns banners and avatars in the same array. Avatars have id
|
|
'avatar_uncropped' or are roughly square (width ≈ height).
|
|
"""
|
|
if not thumbnails:
|
|
return None
|
|
for t in thumbnails:
|
|
if "avatar" in str(t.get("id") or "").lower():
|
|
return t.get("url")
|
|
# Fall back to the most square thumbnail
|
|
square = [t for t in thumbnails
|
|
if t.get("width") and t.get("height")
|
|
and t["width"] <= t["height"] * 1.2
|
|
and t["height"] <= t["width"] * 1.2]
|
|
if square:
|
|
return max(square, key=lambda t: t.get("width") or 0).get("url")
|
|
return None
|
|
|
|
|
|
def _normalize_channel(info: dict) -> dict:
    """Reduce a yt-dlp channel info dict to the app's channel record.

    The ID falls back from ``channel_id`` to ``id``; the name falls back from
    ``channel`` to ``title`` to ``uploader``. An empty name or description
    becomes ``None``. Avatar and banner URLs are both picked out of the same
    ``thumbnails`` list.
    """
    thumbnails = info.get("thumbnails")
    channel_id = info.get("channel_id") or info.get("id")
    display_name = (
        info.get("channel") or info.get("title") or info.get("uploader") or None
    )
    return {
        "youtube_channel_id": channel_id,
        "name": display_name,
        "description": info.get("description") or None,
        "thumbnail_url": _channel_avatar(thumbnails),
        "banner_url": _channel_banner(thumbnails),
        "subscriber_count": info.get("channel_follower_count"),
    }
|
|
|
|
|
|
def search_youtube(query: str, max_results: int = 40) -> list[dict]:
    """Search YouTube through yt-dlp's ``ytsearchN:`` pseudo-URL.

    Runs through the global metadata rate limiter with ``--flat-playlist``, so
    each result carries only the basic fields of a flat entry. Output lines
    that are not valid JSON, and entries without an ``id``, are skipped.

    Returns one video dict per hit; ``category`` and the channel thumbnail
    are always ``None``.
    """
    command = [
        "yt-dlp",
        f"ytsearch{max_results}:{query}",
        "--dump-json",
        "--flat-playlist",
        "--quiet",
        *_cookie_args(),
    ]
    stdout, _stderr, _code = _meta_run(command, timeout=60)

    hits: list[dict] = []
    for raw in stdout.splitlines():
        candidate = raw.strip()
        if not candidate:
            continue
        try:
            info = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        # flat-playlist entries have _type="url" with basic fields
        if info.get("_type") not in ("url", None) or not info.get("id"):
            continue
        video_id = info.get("id")
        hits.append({
            "youtube_video_id": video_id,
            "title": info.get("title", ""),
            "description": info.get("description") or "",
            "thumbnail_url": _stable_thumbnail(video_id),
            "duration_seconds": info.get("duration"),
            "published_at": _parse_published(info),
            "tags": json.dumps(info.get("tags") or []),
            "category": None,
            "channel": {
                "youtube_channel_id": info.get("channel_id"),
                "name": info.get("channel") or info.get("uploader") or "",
                "thumbnail_url": None,
            },
        })
    return hits
|
|
|
|
|
|
def fetch_trending(region: str = "US", max_results: int = 50) -> list[dict]:
    """Fetch "trending" videos for a region via a date-sorted yt-dlp search.

    Queries the YouTube results page for "trending" with the sort-by-upload-
    date filter and ``gl=<region>``, going through the global metadata rate
    limiter.

    Args:
        region: Country code passed as ``gl``; upper-cased before use.
        max_results: Upper bound handed to yt-dlp as ``--playlist-end``.

    Returns:
        One video dict per usable entry. An empty list is returned when
        yt-dlp times out, cannot be started, or yields no parseable output.
    """
    region = region.upper()
    # sp selects "sort by upload date"; gl= sets the region. The filter value
    # is deliberately kept percent-encoded exactly as before (CAI%253D).
    url = f"https://www.youtube.com/results?search_query=trending&sp=CAI%253D&gl={region}"
    try:
        stdout, _, _ = _meta_run([
            "yt-dlp",
            url,
            "--dump-json",
            "--flat-playlist",
            "--quiet",
            "--playlist-end", str(max_results),
            *_cookie_args(),
        ], timeout=60)
    except (subprocess.TimeoutExpired, OSError) as exc:
        # Honour the documented contract: a timeout or a missing yt-dlp binary
        # degrades to "no results" instead of propagating to the caller.
        print(f"[trending] yt-dlp failed for region={region}: {exc}", flush=True)
        return []

    results: list[dict] = []
    for line in stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            info = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(info, dict):
            # Valid JSON that is not an object cannot be a playlist entry.
            continue
        if info.get("_type") in ("url", None) and info.get("id"):
            results.append({
                "youtube_video_id": info.get("id"),
                # Flat entries may carry an explicit null title.
                "title": info.get("title") or "",
                "thumbnail_url": _stable_thumbnail(info.get("id")),
                "duration_seconds": info.get("duration"),
                "published_at": _parse_published(info),
                "tags": json.dumps(info.get("tags") or []),
                "category": None,
                "channel": {
                    "youtube_channel_id": info.get("channel_id"),
                    "name": info.get("channel") or info.get("uploader") or "",
                    "thumbnail_url": None,
                },
            })
    return results
|
|
|
|
|
|
def _best_thumbnail(thumbnails: list | None) -> str | None:
|
|
if not thumbnails:
|
|
return None
|
|
# pick the one closest to 480px wide
|
|
best = sorted(thumbnails, key=lambda t: abs((t.get("width") or 0) - 480))
|
|
return best[0].get("url") if best else None
|
|
|
|
|
|
def fetch_video_metadata(video_id: str, polite: bool = False) -> dict | None:
    """Fetch and normalise metadata for a single video by YouTube ID.

    polite=True routes the yt-dlp call through the global rate limiter (for
    background batch tasks); polite=False (default) runs it immediately for
    user-facing requests.

    If the first attempt exits non-zero and authentication arguments were in
    use, the command is retried once without them. Returns the normalised
    dict for the first output line that parses as JSON, or ``None``.
    """
    url = f"https://www.youtube.com/watch?v={video_id}"
    cookie_args = _cookie_args()
    print(f"[fetch_meta] video={video_id} cookie_args={cookie_args!r}", flush=True)

    execute = _meta_run if polite else _run
    base_cmd = ["yt-dlp", url, "--dump-json", "--no-download", "--no-playlist"]

    stdout, stderr, code = execute(base_cmd + cookie_args, timeout=30)
    first_attempt_failed = code != 0
    if first_attempt_failed:
        print(f"[fetch_meta] FAILED code={code} stderr={stderr[:500]!r}", flush=True)
    if first_attempt_failed and cookie_args:
        print("[fetch_meta] retrying without cookie args", flush=True)
        stdout, stderr, code = execute(base_cmd, timeout=30)
        if code != 0:
            print(f"[fetch_meta] retry also FAILED code={code}", flush=True)

    for raw in stdout.splitlines():
        candidate = raw.strip()
        if not candidate:
            continue
        try:
            info = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        return _normalize_video(info)
    return None
|
|
|
|
|
|
def _rss_dates(uc_channel_id: str) -> dict[str, datetime]:
    """Fetch publish dates from a channel's YouTube RSS feed.

    Fast and unauthenticated. Only works for ``UC...`` channel IDs; any other
    value returns ``{}`` without touching the network.

    Returns:
        Mapping of video ID to a naive UTC ``datetime``. Any network, XML or
        parsing failure yields ``{}`` - the feed is a best-effort enrichment.
    """
    if not uc_channel_id or not uc_channel_id.startswith("UC"):
        return {}
    url = f"https://www.youtube.com/feeds/videos.xml?channel_id={uc_channel_id}"
    ns = {
        "atom": "http://www.w3.org/2005/Atom",
        "yt": "http://www.youtube.com/xml/schemas/2015",
    }
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            xml_data = resp.read()
        root = ET.fromstring(xml_data)

        dates: dict[str, datetime] = {}
        for entry in root.findall("atom:entry", ns):
            vid_el = entry.find("yt:videoId", ns)
            pub_el = entry.find("atom:published", ns)
            if vid_el is None or pub_el is None or not vid_el.text or not pub_el.text:
                continue
            try:
                # fromisoformat() only accepts a trailing "Z" from Python 3.11.
                dt = datetime.fromisoformat(pub_el.text.strip().replace("Z", "+00:00"))
            except ValueError:
                continue
            if dt.tzinfo is not None:
                # Convert to UTC *before* dropping tzinfo; a bare replace()
                # would silently keep the wall-clock time of a non-UTC offset.
                dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
            dates[vid_el.text] = dt
        return dates
    except Exception:
        # Deliberately broad: callers fall back to yt-dlp's own dates.
        return {}
|
|
|
|
|
|
def fetch_channel_metadata(channel_id: str, max_videos: int = 30, start_video: int = 1, polite: bool = False) -> dict | None:
    """Fetch channel info plus a window of its videos.

    Uses ``--dump-single-json --flat-playlist`` for speed, then prefers
    publish dates from the channel's RSS feed over the ones in the flat
    entries whenever the feed knows the video.

    Args:
        channel_id: ``@handle`` or a channel ID.
        max_videos: Window size; values <= 0 add no ``--playlist-end``.
        start_video: 1-based index of the first video to include.
        polite: Route the yt-dlp call through the global rate limiter.

    Returns:
        ``{"channel": ..., "videos": [...]}``, or ``None`` when yt-dlp
        produced no output, invalid JSON, or an object without any ID.
    """
    if channel_id.startswith("@"):
        url = f"https://www.youtube.com/{channel_id}/videos"
    else:
        url = f"https://www.youtube.com/channel/{channel_id}/videos"

    command = [
        "yt-dlp", url,
        "--dump-single-json",
        "--flat-playlist",
        "--quiet",
        *_cookie_args(),
    ]
    if start_video > 1:
        command.extend(["--playlist-start", str(start_video)])
    if max_videos > 0:
        if start_video > 1:
            last_index = start_video - 1 + max_videos
        else:
            last_index = max_videos
        command.extend(["--playlist-end", str(last_index)])

    execute = _meta_run if polite else _run
    stdout, _stderr, _code = execute(command, timeout=60)
    payload = stdout.strip()
    if not payload:
        return None

    try:
        info = json.loads(payload)
    except json.JSONDecodeError:
        return None

    if not (info.get("id") or info.get("channel_id")):
        return None

    channel_info = _normalize_channel(info)

    # One extra HTTP request to the RSS feed for the channel's video dates.
    rss_dates = _rss_dates(channel_info.get("youtube_channel_id") or "")

    videos = []
    for entry in info.get("entries") or []:
        video_id = entry.get("id")
        if not video_id:
            continue
        videos.append({
            "youtube_video_id": video_id,
            "title": entry.get("title") or "",
            "description": entry.get("description") or None,
            "thumbnail_url": _stable_thumbnail(video_id),
            "duration_seconds": entry.get("duration"),
            "published_at": rss_dates.get(video_id) or _parse_published(entry),
            "tags": json.dumps(entry.get("tags") or []),
            "category": (entry.get("categories") or [None])[0],
            "channel": {
                "youtube_channel_id": channel_info.get("youtube_channel_id"),
                "name": channel_info.get("name") or "",
                "thumbnail_url": None,
            },
        })

    return {"channel": channel_info, "videos": videos}
|
|
|
|
|
|
def fetch_channel_playlists(channel_id: str, max_results: int = 100) -> list[dict]:
    """List the playlists shown on a channel's /playlists tab.

    Runs through the global metadata rate limiter. Entries without an ID or
    title, and an entry whose ID equals ``channel_id``, are skipped.

    Returns dicts with ``youtube_playlist_id``, ``title``, ``description``,
    ``thumbnail_url`` and ``video_count`` (0 when unknown).
    """
    if channel_id.startswith("@"):
        url = f"https://www.youtube.com/{channel_id}/playlists"
    else:
        url = f"https://www.youtube.com/channel/{channel_id}/playlists"

    command = [
        "yt-dlp", url,
        "--dump-json", "--flat-playlist",
        "--playlist-end", str(max_results),
        "--quiet",
        *_cookie_args(),
    ]
    stdout, _stderr, _code = _meta_run(command, timeout=60)

    def _area(thumb: dict) -> int:
        return (thumb.get("width") or 0) * (thumb.get("height") or 0)

    found = []
    for raw in stdout.splitlines():
        candidate = raw.strip()
        if not candidate:
            continue
        try:
            info = json.loads(candidate)
        except json.JSONDecodeError:
            continue

        playlist_id = info.get("id") or info.get("playlist_id")
        title = info.get("title") or info.get("playlist_title") or ""
        if not playlist_id or not title or playlist_id == channel_id:
            continue

        # Prefer the largest image of the thumbnails array and fall back to the
        # singular thumbnail field. _stable_thumbnail must not be used here:
        # the id is a playlist ID, not a video ID.
        thumb_url = info.get("thumbnail")
        thumbs = info.get("thumbnails") or []
        if thumbs:
            largest = max(thumbs, key=_area, default=None)
            if largest:
                thumb_url = largest.get("url") or thumb_url

        found.append({
            "youtube_playlist_id": playlist_id,
            "title": title,
            "description": info.get("description"),
            "thumbnail_url": thumb_url,
            "video_count": info.get("playlist_count") or info.get("n_entries") or 0,
        })
    return found
|
|
|
|
|
|
def fetch_playlist_videos(playlist_id: str, max_videos: int = 200) -> list[dict]:
    """Fetch the videos of a YouTube playlist by playlist ID.

    Uses a flat-playlist dump and calls yt-dlp directly (not through the
    metadata rate limiter) with a 120 s timeout. ``max_videos`` <= 0 means no
    ``--playlist-end`` cap. Entries without an ``id`` are skipped.
    """
    url = f"https://www.youtube.com/playlist?list={playlist_id}"
    command = [
        "yt-dlp", url,
        "--dump-json", "--flat-playlist",
        "--quiet",
        *_cookie_args(),
    ]
    if max_videos > 0:
        command.extend(["--playlist-end", str(max_videos)])
    stdout, _stderr, _code = _run(command, timeout=120)

    collected = []
    for raw in stdout.splitlines():
        candidate = raw.strip()
        if not candidate:
            continue
        try:
            info = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        video_id = info.get("id")
        if not video_id:
            continue
        collected.append({
            "youtube_video_id": video_id,
            "title": info.get("title", ""),
            "thumbnail_url": _stable_thumbnail(video_id),
            "duration_seconds": info.get("duration"),
            "published_at": _parse_published(info),
            "view_count": info.get("view_count"),
            "channel": {
                "youtube_channel_id": info.get("channel_id"),
                "name": info.get("channel") or info.get("uploader") or "",
            },
        })
    return collected
|
|
|
|
|
|
def fetch_featured_channels(channel_id: str) -> list[str]:
    """Collect channel IDs from the /channels tab of a YouTube channel.

    The /channels tab lists channels the creator explicitly recommends, which
    makes it a high-signal source for discovery. The request goes through the
    global metadata rate limiter.

    Returns the ``UC...`` IDs in output order; other IDs are ignored.
    """
    if channel_id.startswith("@"):
        url = f"https://www.youtube.com/{channel_id}/channels"
    else:
        url = f"https://www.youtube.com/channel/{channel_id}/channels"

    command = [
        "yt-dlp", url,
        "--dump-json",
        "--flat-playlist",
        "--quiet",
        *_cookie_args(),
    ]
    stdout, _stderr, _code = _meta_run(command, timeout=30)

    featured: list[str] = []
    for raw in stdout.splitlines():
        candidate = raw.strip()
        if not candidate:
            continue
        try:
            info = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        featured_id = info.get("channel_id") or info.get("id")
        if featured_id and featured_id.startswith("UC"):
            featured.append(featured_id)
    return featured
|
|
|
|
|
|
def fetch_channel_links(channel_id: str) -> list[str]:
    """Extract channel references from a channel's about/description text.

    Scans the ``description`` of the yt-dlp output for
    ``youtube.com/channel/UC...`` and ``youtube.com/@handle`` links.

    Returns:
        A sorted, de-duplicated list containing ``UC...`` IDs and
        ``@handle`` strings.
    """
    if channel_id.startswith("@"):
        url = f"https://www.youtube.com/{channel_id}/about"
    else:
        url = f"https://www.youtube.com/channel/{channel_id}/about"
    stdout, _, _ = _run([
        "yt-dlp",
        url,
        "--dump-json",
        "--no-download",
        "--flat-playlist",
        "--playlist-end", "1",
        "--quiet",
        *_cookie_args(),
    ], timeout=30)

    channel_ids: set[str] = set()
    for line in stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            info = json.loads(line)
        except json.JSONDecodeError:
            continue
        desc = info.get("description", "") or ""
        for match in re.finditer(r"youtube\.com/channel/(UC[\w-]+)", desc):
            channel_ids.add(match.group(1))
        # Handles may contain periods, so "." must be part of the character
        # class; otherwise "@john.doe" is cut down to "@john".
        for match in re.finditer(r"youtube\.com/@([\w.-]+)", desc):
            # A trailing "." is assumed to be sentence punctuation rather than
            # part of the handle.
            handle = match.group(1).rstrip(".")
            if handle:
                channel_ids.add(f"@{handle}")
    # Sorted so repeated calls return the links in a stable order.
    return sorted(channel_ids)
|
|
|
|
|
|
def _strip_vtt_cue_settings(video_id: str) -> None:
    """Strip cue settings from the VTT subtitle files of a video.

    yt-dlp embeds 'align:start position:0%' in every cue header, which pins
    subtitles to the bottom-left. Everything after the timing on a cue line is
    removed so CSS ::cue can centre them. Each ``<video_id>.*.vtt`` file in
    the download directory is rewritten in place; a failure on one file is
    ignored.
    """
    cue_timing = re.compile(
        r'(\d{1,2}:\d{2}:\d{2}\.\d{3} --> \d{1,2}:\d{2}:\d{2}\.\d{3})[^\n]*'
    )
    download_dir = Path(settings.download_path)
    for vtt_path in download_dir.glob(f"{video_id}.*.vtt"):
        try:
            original = vtt_path.read_text(encoding="utf-8", errors="replace")
            vtt_path.write_text(cue_timing.sub(r'\1', original), encoding="utf-8")
        except Exception:
            pass
|
|
|
|
|
|
def download_subs_only(video_id: str, subtitle_langs: str) -> bool:
    """Download only the subtitle files (no video) for a video.

    Manual and automatic subtitles for ``subtitle_langs`` are requested,
    converted to VTT and written next to the video as ``<video_id>.<ext>``.
    On success the cue settings are stripped from the resulting files.

    Returns ``True`` when yt-dlp exited with code 0.
    """
    url = f"https://www.youtube.com/watch?v={video_id}"
    output_template = str(Path(settings.download_path) / f"{video_id}.%(ext)s")
    command = [
        "yt-dlp", url,
        "--skip-download", "--no-playlist",
        "--write-subs", "--write-auto-subs",
        "--sub-langs", subtitle_langs,
        "--convert-subs", "vtt",
        "-o", output_template,
        *_cookie_args(),
    ]
    _stdout, _stderr, code = _run(command, timeout=60)
    succeeded = code == 0
    if succeeded:
        _strip_vtt_cue_settings(video_id)
    return succeeded
|
|
|
|
|
|
def fetch_available_subs(video_id: str) -> dict:
    """Report which subtitle languages YouTube offers for a video.

    Returns {"manual": [...], "auto": [...]}, both sorted lists of language
    codes. Manual = keys of ``subtitles``; auto = keys of
    ``automatic_captions`` except those ending in "-orig". Both lists are
    empty when no output line parses as JSON.

    The call is rate limited; if it fails while authentication arguments were
    in use, it is retried once without them.
    """
    url = f"https://www.youtube.com/watch?v={video_id}"
    base_cmd = ["yt-dlp", url, "--dump-json", "--no-download", "--no-playlist"]
    cookie_args = _cookie_args()

    stdout, _stderr, code = _meta_run(base_cmd + cookie_args, timeout=30)
    if cookie_args and code != 0:
        stdout, _stderr, code = _meta_run(base_cmd, timeout=30)

    for raw in stdout.splitlines():
        candidate = raw.strip()
        if not candidate:
            continue
        try:
            info = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        manual_langs = sorted(info.get("subtitles") or {})
        captions = info.get("automatic_captions") or {}
        auto_langs = sorted({lang for lang in captions if not lang.endswith("-orig")})
        return {"manual": manual_langs, "auto": auto_langs}
    return {"manual": [], "auto": []}
|
|
|
|
|
|
def fetch_video_comments(youtube_video_id: str, max_comments: int = 20) -> list[dict]:
    """Fetch top-level comments for a video via the yt-dlp CLI.

    yt-dlp is asked to write an ``.info.json`` (including comments) into a
    temporary directory, which is then read back. Replies are dropped; pinned
    comments sort first, then by like count descending.

    Returns:
        At most ``max_comments`` dicts with ``youtube_comment_id``,
        ``author``, ``text``, ``likes``, ``is_pinned`` and ``published_at``
        (naive UTC ``datetime`` or ``None``). ``[]`` on any failure,
        including a yt-dlp timeout or a missing/unreadable info file.
    """
    import tempfile

    url = f"https://www.youtube.com/watch?v={youtube_video_id}"

    info = None
    with tempfile.TemporaryDirectory() as tmpdir:
        out_tmpl = str(Path(tmpdir) / "%(id)s.%(ext)s")
        args = [
            "yt-dlp", url,
            "--write-info-json",
            "--write-comments",
            # Format: thread_count,total_count,replies_per_thread,reply_pages
            "--extractor-args", f"youtube:max_comments={max_comments},{max_comments},0,0;comment_sort=top",
            "--skip-download",
            "--no-playlist",
            "--output", out_tmpl,
            *_cookie_args(),
        ]
        try:
            _run(args, timeout=90)
        except (subprocess.TimeoutExpired, OSError):
            # Keep the documented "empty list on failure" contract.
            return []

        info_file = next(Path(tmpdir).glob("*.info.json"), None)
        if info_file is not None:
            try:
                # Read as UTF-8 explicitly; the platform default encoding can
                # mangle or reject non-ASCII comment text.
                with open(info_file, encoding="utf-8") as f:
                    info = json.load(f)
            except (OSError, ValueError):
                info = None

    if not isinstance(info, dict):
        return []

    result = []
    for c in info.get("comments") or []:
        if c.get("parent") not in (None, "root"):
            continue  # skip replies
        published_at = None
        ts = c.get("timestamp")
        if ts:
            try:
                # utcfromtimestamp() is deprecated since Python 3.12.
                published_at = datetime.fromtimestamp(
                    float(ts), tz=timezone.utc
                ).replace(tzinfo=None)
            except (TypeError, ValueError, OSError, OverflowError):
                published_at = None
        result.append({
            "youtube_comment_id": c.get("id"),
            "author": c.get("author"),
            "text": c.get("text"),
            "likes": c.get("like_count") or 0,
            "is_pinned": bool(c.get("is_pinned")),
            "published_at": published_at,
        })
    result.sort(key=lambda c: (not c["is_pinned"], -(c["likes"] or 0)))
    return result[:max_comments]
|
|
|
|
|
|
def fetch_dislike_count(youtube_video_id: str) -> int | None:
    """Look up the crowdsourced dislike count from returnyoutubedislikeapi.com.

    Returns the ``dislikes`` field of the JSON response, or ``None`` when the
    request or parsing fails for any reason (5 s timeout).
    """
    try:
        endpoint = f"https://returnyoutubedislikeapi.com/votes?videoId={youtube_video_id}"
        with urllib.request.urlopen(endpoint, timeout=5) as response:
            payload = json.loads(response.read())
        return payload.get("dislikes")
    except Exception:
        return None
|
|
|
|
|
|
# yt-dlp format selectors keyed by quality label. Every capped entry first
# tries video at or below the given height, then falls back to the best
# available video+audio pair, then to the best single file.
QUALITY_FORMATS = {
    "best": "bestvideo+bestaudio/best",
    **{
        f"{height}p": f"bestvideo[height<={height}]+bestaudio/bestvideo+bestaudio/best"
        for height in (2160, 1440, 1080, 720, 480, 360, 240, 144)
    },
}
|
|
|
|
|
|
def detect_resolution(file_path: str) -> str | None:
|
|
"""Use ffprobe to get the video stream height and return a label like '1080p'."""
|
|
try:
|
|
result = subprocess.run(
|
|
["ffprobe", "-v", "quiet", "-select_streams", "v:0",
|
|
"-show_entries", "stream=height", "-of", "csv=p=0", file_path],
|
|
capture_output=True, text=True, timeout=15,
|
|
)
|
|
height = int(result.stdout.strip())
|
|
if height >= 2160: return "2160p"
|
|
if height >= 1440: return "1440p"
|
|
if height >= 1080: return "1080p"
|
|
if height >= 720: return "720p"
|
|
if height >= 480: return "480p"
|
|
if height >= 360: return "360p"
|
|
return f"{height}p"
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def predicted_file_path(video_id: str) -> Path:
    """Return ``<download_path>/<video_id>.mp4``, the expected download output path."""
    download_dir = Path(settings.download_path)
    return download_dir.joinpath(f"{video_id}.mp4")
|
|
|
|
|
|
# Limits how many download workers run yt-dlp at once (see start_download);
# set_max_concurrent() replaces this object while holding _semaphore_lock.
_SEMAPHORE = threading.Semaphore(6)
_semaphore_lock = threading.Lock()
# Authentication settings read by _cookie_args() and written by the
# set_cookies_browser / set_cookies_file / set_oauth2 functions, all under
# _cookies_lock.
_cookies_browser: str = ""
_cookies_file: str = ""
_use_oauth2: bool = False
_cookies_lock = threading.Lock()

# cookies.txt locations probed by _cookie_args() when no explicit cookies
# file is usable.
_AUTO_COOKIES_PATHS = ["/data/cookies.txt"]

# OAuth2 device-auth flow state (shared across threads); guarded by
# _oauth2_state_lock. "status" takes the values "idle", "pending", "complete"
# and "error" in this module.
_oauth2_state: dict = {"status": "idle", "device_url": None, "code": None, "error": None}
_oauth2_state_lock = threading.Lock()
|
|
|
|
|
|
def set_max_concurrent(n: int) -> None:
    """Replace the download semaphore with one sized ``n``, clamped to 1..16."""
    global _SEMAPHORE
    clamped = max(1, min(n, 16))
    with _semaphore_lock:
        _SEMAPHORE = threading.Semaphore(clamped)
|
|
|
|
|
|
def set_cookies_browser(browser: str) -> None:
    """Store the browser name for ``--cookies-from-browser``, trimmed and lower-cased."""
    global _cookies_browser
    normalized = browser.strip().lower()
    with _cookies_lock:
        _cookies_browser = normalized
|
|
|
|
|
|
def set_cookies_file(path: str) -> None:
    """Store the cookies.txt path for ``--cookies``, with surrounding whitespace removed."""
    global _cookies_file
    trimmed = path.strip()
    with _cookies_lock:
        _cookies_file = trimmed
|
|
|
|
|
|
def set_oauth2(enabled: bool) -> None:
    """Turn OAuth2 authentication for yt-dlp calls on or off."""
    global _use_oauth2
    flag = bool(enabled)
    with _cookies_lock:
        _use_oauth2 = flag
|
|
|
|
|
|
def _cookie_args() -> list[str]:
    """Build the yt-dlp authentication arguments from the current settings.

    Precedence: OAuth2, then the configured cookies file (if it exists), then
    the first existing auto-detected cookies.txt, then browser cookies.
    Returns ``[]`` when nothing applies.
    """
    with _cookies_lock:
        file_path, browser, oauth2_enabled = _cookies_file, _cookies_browser, _use_oauth2

    # OAuth2 token auth - IP-independent, works on datacenter servers
    if oauth2_enabled:
        return ["--username", "oauth2", "--password", ""]

    # Explicit cookies file
    if file_path and Path(file_path).exists():
        return ["--cookies", file_path]

    # Auto-detect cookies.txt in well-known Docker locations
    auto_detected = next(
        (candidate for candidate in _AUTO_COOKIES_PATHS if Path(candidate).exists()),
        None,
    )
    if auto_detected is not None:
        return ["--cookies", auto_detected]

    # Browser cookies - only when no file path was ever configured. If a
    # cookies file is set but missing, the user intended file auth; falling
    # through to a browser that isn't installed in Docker would silently
    # break all yt-dlp calls with an empty-stdout failure.
    if browser and not file_path:
        return ["--cookies-from-browser", browser]
    return []
|
|
|
|
|
|
def get_oauth2_status() -> dict:
    """Return a shallow copy of the OAuth2 flow state, taken under its lock."""
    with _oauth2_state_lock:
        snapshot = _oauth2_state.copy()
    return snapshot
|
|
|
|
|
|
def start_oauth2_flow() -> dict:
    """Start yt-dlp's OAuth2 device-auth flow in a background thread.

    yt-dlp prints a Google device URL + code to stderr, then polls until the
    user completes sign-in on their phone/browser. Token is cached to
    /data/yt-dlp-cache (set globally via /etc/yt-dlp.conf) and reused on every
    subsequent call that passes --username oauth2 --password "".

    If a flow is already pending, its current state is returned and no new
    process is started. Otherwise this call blocks for up to ~10 s waiting
    for the device URL (or a terminal status) and returns a copy of the state
    dict: ``status``, ``device_url``, ``code``, ``error``.
    """
    with _oauth2_state_lock:
        if _oauth2_state["status"] == "pending":
            return dict(_oauth2_state)
        _oauth2_state.update({"status": "pending", "device_url": None, "code": None, "error": None})

    def _run_flow() -> None:
        try:
            # The context manager closes the stderr pipe and waits for the
            # process, so the file descriptor is not leaked.
            with subprocess.Popen(
                [
                    "yt-dlp",
                    "--username", "oauth2", "--password", "",
                    "https://www.youtube.com/",
                ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                text=True,
            ) as process:
                for line in process.stderr:
                    line = line.strip()
                    print(f"[oauth2] {line}", flush=True)
                    if "google.com/device" in line or "youtube.com/device" in line:
                        url_m = re.search(r"(https://[^\s]+)", line)
                        code_m = re.search(r"code[:\s]+([A-Z0-9]{4}-[A-Z0-9]{4}|[A-Z0-9-]{6,})", line, re.IGNORECASE)
                        with _oauth2_state_lock:
                            _oauth2_state["device_url"] = (url_m.group(1) if url_m else "https://www.google.com/device")
                            _oauth2_state["code"] = code_m.group(1) if code_m else None
            with _oauth2_state_lock:
                if process.returncode == 0:
                    _oauth2_state["status"] = "complete"
                else:
                    _oauth2_state["status"] = "error"
                    _oauth2_state["error"] = f"yt-dlp exited with code {process.returncode}"
        except Exception as exc:
            # Includes failure to start yt-dlp at all (e.g. binary missing).
            with _oauth2_state_lock:
                _oauth2_state["status"] = "error"
                _oauth2_state["error"] = str(exc)

    threading.Thread(target=_run_flow, daemon=True).start()

    # Wait up to 10 s for the device URL to appear in stderr
    for _ in range(100):
        with _oauth2_state_lock:
            if _oauth2_state["device_url"] or _oauth2_state["status"] in ("complete", "error"):
                break
        time.sleep(0.1)

    with _oauth2_state_lock:
        return dict(_oauth2_state)
|
|
|
|
|
|
def start_download(
    video_id: str,
    download_id: int,
    on_progress: Any,
    on_complete: Any,
    on_error: Any,
    quality: str = "best",
    subtitle_langs: str = "",
) -> None:
    """Start a yt-dlp download in a background daemon thread.

    --no-part writes directly to the final filename (no .part rename at the
    end). The worker waits on the module-wide download semaphore before
    launching yt-dlp.

    Args:
        video_id: YouTube video ID; also the output file stem.
        download_id: Opaque ID passed back to every callback.
        on_progress: Called as ``on_progress(download_id, percent)``. The
            first stream maps to 0-85 %, later streams to 85-95 %.
        on_complete: Called as ``on_complete(download_id, file_path,
            resolution)`` when yt-dlp exits with code 0.
        on_error: Called as ``on_error(download_id, message)`` when yt-dlp
            exits non-zero or cannot be run at all.
        quality: Key of ``QUALITY_FORMATS``; unknown keys fall back to "best".
        subtitle_langs: yt-dlp ``--sub-langs`` value; empty disables subtitles.
    """
    url = f"https://www.youtube.com/watch?v={video_id}"
    output_template = str(Path(settings.download_path) / f"{video_id}.%(ext)s")

    fmt = QUALITY_FORMATS.get(quality, QUALITY_FORMATS["best"])
    subtitle_args = (
        ["--write-subs", "--write-auto-subs", "--sub-langs", subtitle_langs, "--convert-subs", "vtt"]
        if subtitle_langs else []
    )

    def _run_download() -> None:
        import logging

        log = logging.getLogger(__name__)
        with _SEMAPHORE:
            file_path = None
            output_lines: list[str] = []
            try:
                cookie_args = _cookie_args()
                print(f"[ytdlp] cookie_args={cookie_args!r}", flush=True)
                command = [
                    "yt-dlp", url,
                    "-f", fmt,
                    "--merge-output-format", "mp4",
                    "--no-part", "--no-mtime",
                    "-o", output_template,
                    "--newline", "--progress", "--no-colors",
                    *subtitle_args,
                    *cookie_args,
                ]
                # The context manager closes the stdout pipe and reaps the
                # child process even when reading its output fails.
                with subprocess.Popen(
                    command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                ) as process:
                    try:
                        stream_index = 0
                        for line in process.stdout:
                            line = line.strip()
                            output_lines.append(line)
                            if re.search(r"\[download\] Destination:", line):
                                stream_index += 1
                            m = re.search(r"\[download\]\s+([\d.]+)%", line)
                            if m:
                                pct = float(m.group(1))
                                scaled = pct * 0.85 if stream_index <= 1 else 85.0 + pct * 0.10
                                on_progress(download_id, min(scaled, 95.0))
                            m2 = re.search(r"\[(?:download|Merger)\] Destination: (.+)", line)
                            if m2:
                                file_path = m2.group(1).strip()
                            # yt-dlp announces the merged file on a different
                            # line; the per-stream files named by the
                            # "Destination:" lines are the merge inputs.
                            merged = re.search(r'\[Merger\] Merging formats into "(.+)"', line)
                            if merged:
                                file_path = merged.group(1).strip()
                    except BaseException:
                        # Do not leave yt-dlp running if a callback blew up.
                        process.kill()
                        raise
                returncode = process.returncode
            except Exception as exc:
                # Without this the worker thread would die silently and the
                # download would never be reported as failed.
                log.exception("yt-dlp download for %s could not be run", video_id)
                on_error(download_id, f"yt-dlp could not be run: {exc}")
                return

            if returncode == 0:
                _strip_vtt_cue_settings(video_id)
                resolution = detect_resolution(file_path) if file_path else None
                on_complete(download_id, file_path, resolution)
            else:
                tail = "\n".join(output_lines[-20:]) if output_lines else "(no output)"
                log.error("yt-dlp failed (code %d):\n%s", returncode, tail)
                on_error(download_id, f"yt-dlp exited with code {returncode}:\n{tail}")

    thread = threading.Thread(target=_run_download, daemon=True)
    thread.start()
|