Initial commit — YT Hub

Self-hosted personal YouTube management app.
FastAPI + SQLite backend, React + Vite + Tailwind frontend.
Dockerfiles and compose included for Portainer deployment.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
inputnoise
2026-05-25 20:09:04 +02:00
commit 1827dd6c4e
63 changed files with 14480 additions and 0 deletions

View File

View File

@@ -0,0 +1,614 @@
"""Discovery engine — search-based crawl, trending, community signal, category clustering."""
import json
import random
from datetime import datetime
from sqlalchemy.orm import Session
from sqlalchemy import text
from ..models import Channel, UserChannel, DiscoveryQueue, Video
from . import ytdlp
def _fetch_and_index_channel(db: Session, channel: Channel):
"""Fetch full metadata + recent videos for a discovered channel."""
try:
result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id, max_videos=10)
if not result:
return
ch_data = result.get("channel", {})
for k, v in ch_data.items():
if hasattr(channel, k) and v is not None and v != "":
setattr(channel, k, v)
channel.crawled_at = datetime.utcnow()
videos = result.get("videos", [])
# For videos missing a date (RSS didn't cover them or flat-playlist had no timestamp),
# do individual fetches — capped at 3 to avoid slow-downs.
dateless = [v for v in videos if not v.get("published_at")]
individual_fetched: dict[str, dict] = {}
for vdata in dateless[:3]:
yt_id = vdata.get("youtube_video_id")
if not yt_id:
continue
try:
meta = ytdlp.fetch_video_metadata(yt_id)
if meta and meta.get("published_at"):
individual_fetched[yt_id] = meta
except Exception:
pass
for vdata in videos:
yt_id = vdata.get("youtube_video_id")
if not yt_id:
continue
# Prefer individually-fetched metadata if we retrieved it
if yt_id in individual_fetched:
vdata = individual_fetched[yt_id]
# Skip videos we still can't date — undated videos break feed ordering
if not vdata.get("published_at"):
continue
if not db.query(Video).filter_by(youtube_video_id=yt_id).first():
db.add(Video(
youtube_video_id=yt_id,
channel_id=channel.id,
title=vdata.get("title", ""),
description=vdata.get("description"),
thumbnail_url=vdata.get("thumbnail_url"),
duration_seconds=vdata.get("duration_seconds"),
published_at=vdata.get("published_at"),
tags=vdata.get("tags"),
category=vdata.get("category"),
))
db.commit()
except Exception:
db.rollback()
def _upsert_channel(db: Session, channel_data: dict) -> Channel | None:
yt_id = channel_data.get("youtube_channel_id")
if not yt_id:
return None
channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
if not channel:
channel = Channel(**channel_data)
db.add(channel)
db.flush()
return channel
def _add_to_discovery(
db: Session, user_id: int, channel_id: int, score: float, source: str,
preview_json: str | None = None,
):
existing = db.query(DiscoveryQueue).filter_by(user_id=user_id, channel_id=channel_id).first()
if existing:
# Accumulate scores across sources but cap to prevent one dominant signal
existing.score = existing.score + score * 0.5
if preview_json and not existing.preview_json:
existing.preview_json = preview_json
return
db.add(DiscoveryQueue(
user_id=user_id,
channel_id=channel_id,
score=score,
source=source,
preview_json=preview_json,
))
def _search_and_store(
db: Session, user_id: int, queries: list[str],
followed_yt_ids: set[str], score_multiplier: float, source: str,
):
"""Run YouTube searches for the given queries and add results to discovery."""
discovered: dict[str, dict] = {}
for query in queries:
try:
results = ytdlp.search_youtube(query, max_results=20)
for video in results:
ch = video.get("channel", {})
yt_id = ch.get("youtube_channel_id")
name = (ch.get("name") or "").strip()
if yt_id and name and yt_id not in followed_yt_ids:
if yt_id not in discovered:
discovered[yt_id] = {"name": name, "count": 0, "previews": []}
discovered[yt_id]["count"] += 1
previews = discovered[yt_id]["previews"]
if len(previews) < 3 and video.get("thumbnail_url") and video.get("title"):
previews.append({
"thumbnail_url": video["thumbnail_url"],
"title": video["title"],
})
except Exception:
continue
if not discovered:
return
candidates = sorted(discovered.items(), key=lambda x: -x[1]["count"])
needs_indexing: list[int] = []
for yt_id, info in candidates:
channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
is_new = channel is None
if not channel:
channel = Channel(
youtube_channel_id=yt_id,
name=info["name"],
description="",
thumbnail_url=None,
)
db.add(channel)
db.flush()
uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
if uc and uc.status in ("followed", "dismissed"):
continue
preview_json = json.dumps(info["previews"]) if info["previews"] else None
_add_to_discovery(
db, user_id, channel.id,
score=float(info["count"]) * score_multiplier,
source=source,
preview_json=preview_json,
)
if is_new or not channel.crawled_at:
needs_indexing.append(channel.id)
db.commit()
for channel_id in needs_indexing[:5]:
channel = db.query(Channel).filter_by(id=channel_id).first()
if channel:
_fetch_and_index_channel(db, channel)
def crawl_by_search(db: Session, user_id: int):
"""Discover channels by searching YouTube using tags, categories, and channel names."""
# All followed channels (names + yt_ids)
followed_rows = db.execute(
text("""
SELECT c.name, c.youtube_channel_id
FROM channels c
JOIN user_channels uc ON c.id = uc.channel_id
WHERE uc.user_id = :user_id AND uc.status = 'followed'
"""),
{"user_id": user_id},
).mappings().all()
followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows}
followed_names = [row["name"] for row in followed_rows if row["name"]]
# Top tags from followed channels' indexed videos + liked videos
# SQLite requires LIMIT inside a subquery when used with UNION ALL
tag_rows = db.execute(
text("""
SELECT tags FROM (
SELECT v.tags
FROM videos v
JOIN user_channels uc ON v.channel_id = uc.channel_id
WHERE uc.user_id = :user_id AND uc.status = 'followed'
AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
LIMIT 300
)
UNION ALL
SELECT tags FROM (
SELECT v.tags
FROM user_videos uv
JOIN videos v ON uv.video_id = v.id
WHERE uv.user_id = :user_id AND uv.liked = 1
AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
LIMIT 100
)
"""),
{"user_id": user_id},
).mappings().all()
tag_counts: dict[str, int] = {}
for row in tag_rows:
try:
tags = json.loads(row["tags"])
for tag in tags:
if isinstance(tag, str):
t = tag.lower().strip()
if 3 <= len(t) <= 40:
tag_counts[t] = tag_counts.get(t, 0) + 1
except (json.JSONDecodeError, TypeError):
continue
# Top categories as fallback
cat_rows = db.execute(
text("""
SELECT v.category, COUNT(*) AS cnt
FROM videos v
JOIN user_channels uc ON v.channel_id = uc.channel_id
WHERE uc.user_id = :user_id AND uc.status = 'followed'
AND v.category IS NOT NULL
GROUP BY v.category
ORDER BY cnt DESC
LIMIT 5
"""),
{"user_id": user_id},
).mappings().all()
# Build query pool: top tags + random channel names + categories
top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:6]]
top_cats = [r["category"] for r in cat_rows]
# Random sample of followed channel names — diversifies discovery each run
sampled_names: list[str] = []
if followed_names:
sampled_names = random.sample(followed_names, min(8, len(followed_names)))
# Combine: tags (most signal) + channel names (broad reach) + categories (fallback)
queries = list(dict.fromkeys(top_tags + sampled_names + top_cats))[:15]
if not queries:
return
_search_and_store(db, user_id, queries, followed_yt_ids, score_multiplier=5.0, source="search")
def update_community_signal(db: Session, user_id: int):
"""Surface channels that other users follow, weighted by follower count."""
rows = db.execute(
text("""
SELECT uc.channel_id, COUNT(DISTINCT uc.user_id) AS follower_count
FROM user_channels uc
WHERE uc.user_id != :user_id
AND uc.status = 'followed'
AND uc.channel_id NOT IN (
SELECT channel_id FROM user_channels
WHERE user_id = :user_id
)
GROUP BY uc.channel_id
ORDER BY follower_count DESC
LIMIT 100
"""),
{"user_id": user_id},
).mappings().all()
for row in rows:
_add_to_discovery(
db, user_id, row["channel_id"],
score=float(row["follower_count"]) * 5,
source="community",
)
db.commit()
def update_category_clusters(db: Session, user_id: int):
"""Find channels in categories the user watches heavily."""
rows = db.execute(
text("""
SELECT v.category, COUNT(*) AS watch_count
FROM user_videos uv
JOIN videos v ON uv.video_id = v.id
WHERE uv.user_id = :user_id AND uv.watched = 1 AND v.category IS NOT NULL
GROUP BY v.category
ORDER BY watch_count DESC
LIMIT 5
"""),
{"user_id": user_id},
).mappings().all()
top_categories = [r["category"] for r in rows]
if not top_categories:
return
placeholders = ",".join(f"'{c}'" for c in top_categories)
candidate_rows = db.execute(
text(f"""
SELECT DISTINCT v.channel_id
FROM videos v
WHERE v.category IN ({placeholders})
AND v.channel_id NOT IN (
SELECT channel_id FROM user_channels WHERE user_id = :user_id
)
LIMIT 100
"""),
{"user_id": user_id},
).mappings().all()
for row in candidate_rows:
_add_to_discovery(db, user_id, row["channel_id"], score=3.0, source="category")
db.commit()
def update_liked_signal(db: Session, user_id: int):
"""Search YouTube for channels related to topics extracted from liked videos."""
liked_rows = db.execute(
text("""
SELECT v.tags
FROM user_videos uv
JOIN videos v ON uv.video_id = v.id
WHERE uv.user_id = :user_id AND uv.liked = 1
AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
"""),
{"user_id": user_id},
).mappings().all()
if not liked_rows:
return
tag_counts: dict[str, int] = {}
for row in liked_rows:
try:
tags = json.loads(row["tags"])
for tag in tags:
if isinstance(tag, str):
t = tag.lower().strip()
if 3 <= len(t) <= 40:
tag_counts[t] = tag_counts.get(t, 0) + 2
except (json.JSONDecodeError, TypeError):
pass
if not tag_counts:
return
followed_yt_ids = set(db.execute(
text("""
SELECT c.youtube_channel_id FROM channels c
JOIN user_channels uc ON c.id = uc.channel_id
WHERE uc.user_id = :user_id AND uc.status = 'followed'
"""),
{"user_id": user_id},
).scalars().all())
top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:6]]
_search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=10.0, source="liked")
def update_watch_signal(db: Session, user_id: int):
"""Discover channels from watched video topics, dampened so a single view has little effect.
A tag needs to appear in at least 3 distinct watched videos before it influences
discovery. Each qualifying tag contributes a modest score (×3 vs liked ×10),
so watching a single Tokyo video won't flood recommendations with Tokyo content.
"""
rows = db.execute(
text("""
SELECT v.tags
FROM user_videos uv
JOIN videos v ON uv.video_id = v.id
WHERE uv.user_id = :user_id AND uv.watched = 1
AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
"""),
{"user_id": user_id},
).mappings().all()
if not rows:
return
tag_counts: dict[str, int] = {}
for row in rows:
try:
tags = json.loads(row["tags"])
seen = set()
for tag in tags:
if isinstance(tag, str):
t = tag.lower().strip()
if 3 <= len(t) <= 40 and t not in seen:
tag_counts[t] = tag_counts.get(t, 0) + 1
seen.add(t)
except (json.JSONDecodeError, TypeError):
pass
# Only use tags that appear across 3+ distinct watched videos
qualified = {t: c for t, c in tag_counts.items() if c >= 3}
if not qualified:
return
followed_yt_ids = set(db.execute(
text("""
SELECT c.youtube_channel_id FROM channels c
JOIN user_channels uc ON c.id = uc.channel_id
WHERE uc.user_id = :user_id AND uc.status = 'followed'
"""),
{"user_id": user_id},
).scalars().all())
top_tags = [t for t, _ in sorted(qualified.items(), key=lambda x: -x[1])[:6]]
_search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=3.0, source="watched")
def _build_user_tag_profile(db: Session, user_id: int) -> dict[str, float]:
"""Return a weighted tag dict from liked (weight 3) + watched (weight 1) videos."""
rows = db.execute(
text("""
SELECT v.tags, MAX(uv.liked) AS liked
FROM user_videos uv
JOIN videos v ON uv.video_id = v.id
WHERE uv.user_id = :user_id AND (uv.liked = 1 OR uv.watched = 1)
AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
GROUP BY v.id
"""),
{"user_id": user_id},
).mappings().all()
profile: dict[str, float] = {}
for row in rows:
weight = 3.0 if row["liked"] else 1.0
try:
for tag in json.loads(row["tags"]):
if isinstance(tag, str):
t = tag.lower().strip()
if 3 <= len(t) <= 40:
profile[t] = profile.get(t, 0.0) + weight
except (json.JSONDecodeError, TypeError):
pass
return profile
def _tag_relevance_score(tag_profile: dict[str, float], tags_json: str | None) -> float:
"""Score a candidate channel's tags against the user's interest profile."""
if not tag_profile or not tags_json:
return 0.0
try:
tags = json.loads(tags_json)
except (json.JSONDecodeError, TypeError):
return 0.0
score = 0.0
for tag in tags:
if isinstance(tag, str):
t = tag.lower().strip()
score += tag_profile.get(t, 0.0)
return min(score, 50.0)
def _dismissed_channel_tags(db: Session, user_id: int) -> set[str]:
"""Collect tags of channels this user explicitly dismissed — used to avoid similar content."""
rows = db.execute(
text("""
SELECT v.tags
FROM user_channels uc
JOIN videos v ON v.channel_id = uc.channel_id
WHERE uc.user_id = :user_id AND uc.status = 'dismissed'
AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
LIMIT 500
"""),
{"user_id": user_id},
).mappings().all()
bad_tags: dict[str, int] = {}
for row in rows:
try:
for tag in json.loads(row["tags"]):
if isinstance(tag, str):
t = tag.lower().strip()
if 3 <= len(t) <= 40:
bad_tags[t] = bad_tags.get(t, 0) + 1
except (json.JSONDecodeError, TypeError):
pass
# Only include tags that appeared in 3+ dismissed-channel videos (strong signal)
return {t for t, c in bad_tags.items() if c >= 3}
def update_trending_signal(db: Session, user_id: int, regions: list[str]):
"""Fetch trending videos per region and score them by tag overlap with user interests."""
if not regions:
return
tag_profile = _build_user_tag_profile(db, user_id)
dismiss_tags = _dismissed_channel_tags(db, user_id)
followed_yt_ids = set(db.execute(
text("""
SELECT c.youtube_channel_id FROM channels c
JOIN user_channels uc ON c.id = uc.channel_id
WHERE uc.user_id = :user_id AND uc.status = 'followed'
"""),
{"user_id": user_id},
).scalars().all())
dismissed_channel_ids = set(db.execute(
text("""
SELECT channel_id FROM user_channels
WHERE user_id = :user_id AND status = 'dismissed'
"""),
{"user_id": user_id},
).scalars().all())
discovered: dict[str, dict] = {}
for region in regions:
try:
videos = ytdlp.fetch_trending(region=region, max_results=50)
for video in videos:
ch = video.get("channel", {})
yt_id = ch.get("youtube_channel_id")
name = (ch.get("name") or "").strip()
if not yt_id or not name or yt_id in followed_yt_ids:
continue
if yt_id not in discovered:
discovered[yt_id] = {"name": name, "count": 0, "regions": set(), "previews": []}
discovered[yt_id]["count"] += 1
discovered[yt_id]["regions"].add(region)
previews = discovered[yt_id]["previews"]
if len(previews) < 3 and video.get("thumbnail_url") and video.get("title"):
previews.append({
"thumbnail_url": video["thumbnail_url"],
"title": video["title"],
})
except Exception:
continue
if not discovered:
return
needs_indexing: list[int] = []
for yt_id, info in discovered.items():
channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
is_new = channel is None
if not channel:
channel = Channel(
youtube_channel_id=yt_id,
name=info["name"],
description="",
thumbnail_url=None,
)
db.add(channel)
db.flush()
if channel.id in dismissed_channel_ids:
continue
uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
if uc and uc.status in ("followed", "dismissed"):
continue
# Score: base ×4 per region × count, boosted by tag relevance, penalised by dismiss-tag overlap
base_score = float(info["count"]) * 4.0 * len(info["regions"])
# Tag relevance boost (requires channel to have indexed videos)
tag_boost = 0.0
if not is_new and channel.crawled_at:
tag_rows = db.execute(
text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"),
{"cid": channel.id},
).scalars().all()
for tags_json in tag_rows:
tag_boost += _tag_relevance_score(tag_profile, tags_json)
tag_boost = min(tag_boost, 30.0)
# Dismiss penalty: if channel's tags overlap heavily with dismissed content, reduce score
dismiss_penalty = 0.0
if dismiss_tags and not is_new:
tag_rows2 = db.execute(
text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"),
{"cid": channel.id},
).scalars().all()
for tags_json in tag_rows2:
try:
for tag in json.loads(tags_json or "[]"):
if isinstance(tag, str) and tag.lower().strip() in dismiss_tags:
dismiss_penalty += 5.0
except (json.JSONDecodeError, TypeError):
pass
dismiss_penalty = min(dismiss_penalty, base_score * 0.8)
final_score = base_score + tag_boost - dismiss_penalty
if final_score <= 0:
continue
preview_json = json.dumps(info["previews"]) if info["previews"] else None
_add_to_discovery(db, user_id, channel.id, score=final_score, source="trending", preview_json=preview_json)
if is_new or not channel.crawled_at:
needs_indexing.append(channel.id)
db.commit()
for channel_id in needs_indexing[:5]:
channel = db.query(Channel).filter_by(id=channel_id).first()
if channel:
_fetch_and_index_channel(db, channel)
def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None):
if regions is None:
regions = ["US", "SE"]
crawl_by_search(db, user_id)
update_community_signal(db, user_id)
update_category_clusters(db, user_id)
update_liked_signal(db, user_id)
update_watch_signal(db, user_id)
update_trending_signal(db, user_id, regions)

View File

@@ -0,0 +1,86 @@
"""Surprise Me scoring logic."""
import random
from datetime import datetime, time
from sqlalchemy.orm import Session
from sqlalchemy import text
SURPRISE_SQL = """
WITH candidate_scores AS (
SELECT
v.id AS video_id,
v.youtube_video_id,
v.title,
v.thumbnail_url,
v.duration_seconds,
v.channel_id,
c.name AS channel_name,
c.thumbnail_url AS channel_thumbnail_url,
uv.watched,
uv.watch_progress_seconds,
uv.downloaded,
uv.last_watched_at,
-- Unplayed download bonus
CASE WHEN uv.downloaded = 1 AND (uv.watched IS NULL OR uv.watched = 0) THEN 40 ELSE 0 END
-- Recency penalty
+ CASE
WHEN uv.last_watched_at IS NOT NULL
AND uv.last_watched_at > datetime('now', '-7 days') THEN -50
WHEN uv.last_watched_at IS NOT NULL
AND uv.last_watched_at > datetime('now', '-30 days') THEN -20
ELSE 0
END
-- Late evening duration bonus (applied in Python)
+ :duration_bonus_active * CASE WHEN v.duration_seconds > 2700 THEN 10 ELSE 0 END
-- Random jitter
+ (ABS(RANDOM()) % 11 - 5) AS base_score
FROM videos v
JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
JOIN channels c ON v.channel_id = c.id
WHERE uv.downloaded = 1
)
SELECT * FROM candidate_scores
ORDER BY base_score DESC
LIMIT 50
"""
def get_surprise_videos(db: Session, user_id: int, limit: int = 10) -> list[dict]:
now = datetime.now()
late_evening = now.time() >= time(21, 0)
rows = db.execute(
text(SURPRISE_SQL),
{"user_id": user_id, "duration_bonus_active": 1 if late_evening else 0},
).mappings().all()
# Apply channel diversity penalty in Python
seen_channels: dict[int, int] = {}
results = []
for row in rows:
row = dict(row)
channel_id = row["channel_id"]
penalty = seen_channels.get(channel_id, 0) * 30
row["final_score"] = row["base_score"] - penalty
seen_channels[channel_id] = seen_channels.get(channel_id, 0) + 1
results.append(row)
results.sort(key=lambda r: r["final_score"], reverse=True)
return results[:limit]
def get_discovery_injection(db: Session, user_id: int) -> dict | None:
"""Return one unseen discovery queue item to inject into Surprise Me."""
row = db.execute(
text("""
SELECT dq.id, c.id AS channel_id, c.name, c.thumbnail_url,
dq.source, dq.score
FROM discovery_queue dq
JOIN channels c ON dq.channel_id = c.id
WHERE dq.user_id = :user_id AND dq.seen = 0
ORDER BY dq.score DESC
LIMIT 1
"""),
{"user_id": user_id},
).mappings().first()
return dict(row) if row else None

486
backend/services/ytdlp.py Normal file
View File

@@ -0,0 +1,486 @@
"""Subprocess wrapper for yt-dlp."""
import json
import re
import subprocess
import threading
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from ..config import settings
def _run(args: list[str], timeout: int = 60) -> tuple[str, str, int]:
result = subprocess.run(args, capture_output=True, text=True, timeout=timeout)
return result.stdout, result.stderr, result.returncode
def _parse_date(date_str: str | None) -> datetime | None:
if not date_str:
return None
try:
return datetime.strptime(date_str, "%Y%m%d")
except ValueError:
return None
def _parse_published(info: dict) -> datetime | None:
"""Extract publish date from yt-dlp info dict.
Tries upload_date (YYYYMMDD string) first, then timestamp (Unix epoch),
then release_timestamp. Flat-playlist entries often omit upload_date but
include timestamp, so the fallback is important.
"""
d = _parse_date(info.get("upload_date"))
if d:
return d
for key in ("timestamp", "release_timestamp"):
ts = info.get(key)
if ts:
try:
return datetime.utcfromtimestamp(float(ts))
except (ValueError, OSError, OverflowError):
pass
return None
def _stable_thumbnail(video_id: str | None) -> str | None:
if not video_id:
return None
return f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
def _normalize_video(info: dict) -> dict:
video_id = info.get("id")
raw_chapters = info.get("chapters") or []
chapters = [
{
"start_time": int(ch.get("start_time") or 0),
"end_time": int(ch.get("end_time") or 0),
"title": ch.get("title") or "",
}
for ch in raw_chapters
if ch.get("title")
]
return {
"youtube_video_id": video_id,
"title": info.get("title", ""),
"description": info.get("description", ""),
"thumbnail_url": _stable_thumbnail(video_id),
"duration_seconds": info.get("duration"),
"published_at": _parse_published(info),
"tags": json.dumps(info.get("tags") or []),
"category": info.get("category") or (info.get("categories") or [None])[0],
"chapters": json.dumps(chapters) if chapters else None,
"channel": {
"youtube_channel_id": info.get("channel_id"),
"name": info.get("channel") or info.get("uploader", ""),
"thumbnail_url": None,
},
}
def _channel_avatar(thumbnails: list | None) -> str | None:
"""Pick the channel avatar from yt-dlp's thumbnails list.
YouTube returns banners and avatars in the same array. Avatars have id
'avatar_uncropped' or are roughly square (width ≈ height).
"""
if not thumbnails:
return None
for t in thumbnails:
if "avatar" in str(t.get("id") or "").lower():
return t.get("url")
# Fall back to the most square thumbnail
square = [t for t in thumbnails
if t.get("width") and t.get("height")
and t["width"] <= t["height"] * 1.2
and t["height"] <= t["width"] * 1.2]
if square:
return max(square, key=lambda t: t.get("width") or 0).get("url")
return None
def _normalize_channel(info: dict) -> dict:
return {
"youtube_channel_id": info.get("channel_id") or info.get("id"),
"name": info.get("channel") or info.get("title") or info.get("uploader") or None,
"description": info.get("description") or None,
"thumbnail_url": _channel_avatar(info.get("thumbnails")),
"banner_url": None,
"subscriber_count": info.get("channel_follower_count"),
}
def search_youtube(query: str, max_results: int = 40) -> list[dict]:
"""Search YouTube via yt-dlp. Uses --flat-playlist for fast results."""
stdout, _, code = _run([
"yt-dlp",
f"ytsearch{max_results}:{query}",
"--dump-json",
"--flat-playlist",
"--quiet",
*_cookie_args(),
], timeout=60)
results = []
for line in stdout.splitlines():
line = line.strip()
if not line:
continue
try:
info = json.loads(line)
# flat-playlist entries have _type="url" with basic fields
if info.get("_type") in ("url", None) and info.get("id"):
results.append({
"youtube_video_id": info.get("id"),
"title": info.get("title", ""),
"description": info.get("description") or "",
"thumbnail_url": _stable_thumbnail(info.get("id")),
"duration_seconds": info.get("duration"),
"published_at": _parse_published(info),
"tags": json.dumps(info.get("tags") or []),
"category": None,
"channel": {
"youtube_channel_id": info.get("channel_id"),
"name": info.get("channel") or info.get("uploader") or "",
"thumbnail_url": None,
},
})
except json.JSONDecodeError:
continue
return results
def fetch_trending(region: str = "US", max_results: int = 50) -> list[dict]:
"""Fetch trending videos for a region via yt-dlp search with date-sort filter.
Uses the YouTube search sort-by-upload-date URL that reliably returns regional
results. Falls back gracefully to an empty list on error.
"""
region = region.upper()
# CAI%3D = sort by upload date; gl= sets the region
url = f"https://www.youtube.com/results?search_query=trending&sp=CAI%253D&gl={region}"
stdout, _, code = _run([
"yt-dlp",
url,
"--dump-json",
"--flat-playlist",
"--quiet",
"--playlist-end", str(max_results),
*_cookie_args(),
], timeout=60)
results = []
for line in stdout.splitlines():
line = line.strip()
if not line:
continue
try:
info = json.loads(line)
if info.get("_type") in ("url", None) and info.get("id"):
results.append({
"youtube_video_id": info.get("id"),
"title": info.get("title", ""),
"thumbnail_url": _stable_thumbnail(info.get("id")),
"duration_seconds": info.get("duration"),
"published_at": _parse_published(info),
"tags": json.dumps(info.get("tags") or []),
"category": None,
"channel": {
"youtube_channel_id": info.get("channel_id"),
"name": info.get("channel") or info.get("uploader") or "",
"thumbnail_url": None,
},
})
except json.JSONDecodeError:
continue
return results
def _best_thumbnail(thumbnails: list | None) -> str | None:
if not thumbnails:
return None
# pick the one closest to 480px wide
best = sorted(thumbnails, key=lambda t: abs((t.get("width") or 0) - 480))
return best[0].get("url") if best else None
def fetch_video_metadata(video_id: str) -> dict | None:
"""Fetch metadata for a single video by YouTube ID."""
url = f"https://www.youtube.com/watch?v={video_id}"
stdout, _, code = _run([
"yt-dlp",
url,
"--dump-json",
"--no-download",
"--no-playlist",
"--quiet",
*_cookie_args(),
], timeout=30)
for line in stdout.splitlines():
line = line.strip()
if not line:
continue
try:
info = json.loads(line)
return _normalize_video(info)
except json.JSONDecodeError:
continue
return None
def _rss_dates(uc_channel_id: str) -> dict[str, datetime]:
"""Fetch publish dates for the 15 most recent videos from YouTube's RSS feed.
Fast, unauthenticated, and returns precise dates. Only works for UC… IDs.
"""
if not uc_channel_id or not uc_channel_id.startswith("UC"):
return {}
url = f"https://www.youtube.com/feeds/videos.xml?channel_id={uc_channel_id}"
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=10) as resp:
xml_data = resp.read()
root = ET.fromstring(xml_data)
ns = {
"atom": "http://www.w3.org/2005/Atom",
"yt": "http://www.youtube.com/xml/schemas/2015",
}
dates: dict[str, datetime] = {}
for entry in root.findall("atom:entry", ns):
vid_el = entry.find("yt:videoId", ns)
pub_el = entry.find("atom:published", ns)
if vid_el is not None and pub_el is not None and vid_el.text and pub_el.text:
try:
dt = datetime.fromisoformat(pub_el.text.replace("Z", "+00:00"))
dates[vid_el.text] = dt.replace(tzinfo=None)
except ValueError:
pass
return dates
except Exception:
return {}
def fetch_channel_metadata(channel_id: str, max_videos: int = 30) -> dict | None:
"""Fetch channel info + recent videos.
Uses --dump-single-json --flat-playlist for speed, then enriches video dates
from YouTube's RSS feed (gives precise dates for the 15 most recent videos).
"""
if channel_id.startswith("@"):
url = f"https://www.youtube.com/{channel_id}/videos"
else:
url = f"https://www.youtube.com/channel/{channel_id}/videos"
args = [
"yt-dlp", url,
"--dump-single-json",
"--flat-playlist",
"--quiet",
*_cookie_args(),
]
if max_videos > 0:
args += ["--playlist-end", str(max_videos)]
stdout, _, code = _run(args, timeout=60)
if not stdout.strip():
return None
try:
info = json.loads(stdout.strip())
except json.JSONDecodeError:
return None
if not info.get("id") and not info.get("channel_id"):
return None
channel_info = _normalize_channel(info)
# Fetch RSS dates — fast single HTTP request, precise dates for ≤15 newest videos
uc_id = channel_info.get("youtube_channel_id") or ""
rss = _rss_dates(uc_id)
videos = []
for entry in info.get("entries") or []:
vid_id = entry.get("id")
if not vid_id:
continue
published_at = rss.get(vid_id) or _parse_published(entry)
videos.append({
"youtube_video_id": vid_id,
"title": entry.get("title") or "",
"description": entry.get("description") or None,
"thumbnail_url": _stable_thumbnail(vid_id),
"duration_seconds": entry.get("duration"),
"published_at": published_at,
"tags": json.dumps(entry.get("tags") or []),
"category": (entry.get("categories") or [None])[0],
"channel": {
"youtube_channel_id": channel_info.get("youtube_channel_id"),
"name": channel_info.get("name") or "",
"thumbnail_url": None,
},
})
return {"channel": channel_info, "videos": videos}
def fetch_channel_links(channel_id: str) -> list[str]:
"""Extract linked channel IDs from a channel's about/description."""
if channel_id.startswith("@"):
url = f"https://www.youtube.com/{channel_id}/about"
else:
url = f"https://www.youtube.com/channel/{channel_id}/about"
stdout, _, code = _run([
"yt-dlp",
url,
"--dump-json",
"--no-download",
"--flat-playlist",
"--playlist-end", "1",
"--quiet",
*_cookie_args(),
], timeout=30)
channel_ids = set()
for line in stdout.splitlines():
line = line.strip()
if not line:
continue
try:
info = json.loads(line)
desc = info.get("description", "") or ""
for match in re.finditer(r"youtube\.com/channel/(UC[\w-]+)", desc):
channel_ids.add(match.group(1))
for match in re.finditer(r"youtube\.com/@([\w-]+)", desc):
channel_ids.add(f"@{match.group(1)}")
except json.JSONDecodeError:
continue
return list(channel_ids)
QUALITY_FORMATS = {
"best": "bestvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/bestvideo[ext=mp4]+bestaudio[ext=m4a]/22/18/bestvideo+bestaudio/best",
"2160p": "bestvideo[ext=mp4][height<=2160]+bestaudio[ext=m4a]/bestvideo[height<=2160]+bestaudio/best[height<=2160]",
"1440p": "bestvideo[ext=mp4][height<=1440]+bestaudio[ext=m4a]/bestvideo[height<=1440]+bestaudio/best[height<=1440]",
"1080p": "bestvideo[ext=mp4][vcodec^=avc1][height<=1080]+bestaudio[ext=m4a]/bestvideo[ext=mp4][height<=1080]+bestaudio[ext=m4a]/137+140/22/best[height<=1080]",
"720p": "bestvideo[ext=mp4][vcodec^=avc1][height<=720]+bestaudio[ext=m4a]/bestvideo[ext=mp4][height<=720]+bestaudio[ext=m4a]/22/best[height<=720]",
"480p": "bestvideo[ext=mp4][vcodec^=avc1][height<=480]+bestaudio[ext=m4a]/bestvideo[ext=mp4][height<=480]+bestaudio[ext=m4a]/18/best[height<=480]",
"360p": "bestvideo[ext=mp4][height<=360]+bestaudio[ext=m4a]/18/best[height<=360]",
"240p": "bestvideo[ext=mp4][height<=240]+bestaudio[ext=m4a]/best[height<=240]",
"144p": "bestvideo[ext=mp4][height<=144]+bestaudio[ext=m4a]/best[height<=144]",
}
def detect_resolution(file_path: str) -> str | None:
"""Use ffprobe to get the video stream height and return a label like '1080p'."""
try:
result = subprocess.run(
["ffprobe", "-v", "quiet", "-select_streams", "v:0",
"-show_entries", "stream=height", "-of", "csv=p=0", file_path],
capture_output=True, text=True, timeout=15,
)
height = int(result.stdout.strip())
if height >= 1080: return "1080p"
if height >= 720: return "720p"
if height >= 480: return "480p"
if height >= 360: return "360p"
return f"{height}p"
except Exception:
return None
def predicted_file_path(video_id: str) -> Path:
"""Return the expected output path for a video download."""
return Path(settings.download_path) / f"{video_id}.mp4"
_SEMAPHORE = threading.Semaphore(3)
_semaphore_lock = threading.Lock()
_cookies_browser: str = ""
_cookies_lock = threading.Lock()
def set_max_concurrent(n: int) -> None:
global _SEMAPHORE
with _semaphore_lock:
_SEMAPHORE = threading.Semaphore(max(1, min(n, 10)))
def set_cookies_browser(browser: str) -> None:
global _cookies_browser
with _cookies_lock:
_cookies_browser = browser.strip().lower()
def _cookie_args() -> list[str]:
with _cookies_lock:
b = _cookies_browser
return ["--cookies-from-browser", b] if b else []
def start_download(
video_id: str,
download_id: int,
on_progress: Any,
on_complete: Any,
on_error: Any,
quality: str = "best",
) -> None:
"""Start yt-dlp download in a background thread.
Uses a single progressive MP4 format so the file is playable as it downloads.
--no-part writes directly to the final filename (no .part rename at the end).
"""
url = f"https://www.youtube.com/watch?v={video_id}"
# Predictable output path — lets the player start before download finishes
output_template = str(Path(settings.download_path) / f"{video_id}.%(ext)s")
fmt = QUALITY_FORMATS.get(quality, QUALITY_FORMATS["best"])
def _run_download():
with _SEMAPHORE:
process = subprocess.Popen(
[
"yt-dlp", url,
"-f", fmt,
"--merge-output-format", "mp4",
"--postprocessor-args", "Merger+ffmpeg:-movflags +faststart",
"--embed-metadata", "--embed-thumbnail",
"--no-part", "--no-mtime",
"-o", output_template,
"--newline", "--progress", "--no-colors",
*_cookie_args(),
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
file_path = None
stream_index = 0
for line in process.stdout:
line = line.strip()
if re.search(r"\[download\] Destination:", line):
stream_index += 1
m = re.search(r"\[download\]\s+([\d.]+)%", line)
if m:
pct = float(m.group(1))
scaled = pct * 0.85 if stream_index <= 1 else 85.0 + pct * 0.10
on_progress(download_id, min(scaled, 95.0))
m2 = re.search(r"\[(?:download|Merger)\] Destination: (.+)", line)
if m2:
file_path = m2.group(1).strip()
process.wait()
if process.returncode == 0:
resolution = detect_resolution(file_path) if file_path else None
on_complete(download_id, file_path, resolution)
else:
on_error(download_id, f"yt-dlp exited with code {process.returncode}")
thread = threading.Thread(target=_run_download, daemon=True)
thread.start()