From 871f668525e9f8ec6d60c636eb318ffcc952554a Mon Sep 17 00:00:00 2001 From: Mattias Thall Date: Tue, 26 May 2026 21:59:23 +0200 Subject: [PATCH] Parallelize discovery searches and add graph signal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Run search queries concurrently (5 workers) instead of sequentially — cuts crawl time dramatically. Add graph signal: fetch featured channels from followed channels' /channels tab in parallel (4 workers), which surfaces creator-curated recommendations as a high-signal, diverse pool that search alone can't reach. Co-Authored-By: Claude Sonnet 4.6 --- backend/services/discovery.py | 88 +++++++++++++++++++++++++++++++++-- backend/services/ytdlp.py | 33 +++++++++++++ 2 files changed, 116 insertions(+), 5 deletions(-) diff --git a/backend/services/discovery.py b/backend/services/discovery.py index 40f4a0f..4aa8beb 100644 --- a/backend/services/discovery.py +++ b/backend/services/discovery.py @@ -1,6 +1,7 @@ """Discovery engine — search-based crawl, trending, community signal, category clustering.""" import json import random +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from sqlalchemy.orm import Session from sqlalchemy import text @@ -104,10 +105,17 @@ def _search_and_store( ): """Run YouTube searches for the given queries and add results to discovery.""" discovered: dict[str, dict] = {} - for query in queries: + + def _do_search(query: str) -> list[dict]: try: - results = ytdlp.search_youtube(query, max_results=40) - for video in results: + return ytdlp.search_youtube(query, max_results=40) + except Exception: + return [] + + with ThreadPoolExecutor(max_workers=5) as pool: + futures = {pool.submit(_do_search, q): q for q in queries} + for fut in as_completed(futures): + for video in fut.result(): ch = video.get("channel", {}) yt_id = ch.get("youtube_channel_id") name = (ch.get("name") or "").strip() @@ -121,8 +129,6 @@ def _search_and_store( "thumbnail_url": video["thumbnail_url"], "title": video["title"], }) - except Exception: - continue if not discovered: return @@ -577,6 +583,77 @@ def update_trending_signal(db: Session, user_id: int, regions: list[str]): _fetch_and_index_channel(db, channel) +def update_graph_signal(db: Session, user_id: int): + """Discover channels featured on followed channels' /channels tab. + + Channels that creators explicitly recommend are high-signal — they're + curated by someone whose taste you already follow. Samples up to 12 followed + channels per run and fetches their featured channels list in parallel. + """ + followed_rows = db.execute( + text(""" + SELECT c.youtube_channel_id, c.id + FROM channels c + JOIN user_channels uc ON c.id = uc.channel_id + WHERE uc.user_id = :user_id AND uc.status = 'followed' + AND c.youtube_channel_id IS NOT NULL + """), + {"user_id": user_id}, + ).mappings().all() + + if not followed_rows: + return + + followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows} + + dismissed_ids = set(db.execute( + text("SELECT channel_id FROM user_channels WHERE user_id = :user_id AND status = 'dismissed'"), + {"user_id": user_id}, + ).scalars().all()) + + sample = random.sample(list(followed_rows), min(12, len(followed_rows))) + + def _fetch(yt_id: str) -> list[str]: + try: + return ytdlp.fetch_featured_channels(yt_id) + except Exception: + return [] + + featured_map: dict[str, list[str]] = {} + with ThreadPoolExecutor(max_workers=4) as pool: + futures = {pool.submit(_fetch, row["youtube_channel_id"]): row for row in sample} + for fut in as_completed(futures): + row = futures[fut] + featured_map[row["youtube_channel_id"]] = fut.result() + + needs_indexing: list[int] = [] + for source_yt_id, channel_ids in featured_map.items(): + for yt_id in channel_ids: + if yt_id in followed_yt_ids: + continue + channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first() + is_new = channel is None + if not channel: + channel = Channel(youtube_channel_id=yt_id, name="", description="", thumbnail_url=None) + db.add(channel) + db.flush() + if channel.id in dismissed_ids: + continue + uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first() + if uc and uc.status in ("followed", "dismissed"): + continue + _add_to_discovery(db, user_id, channel.id, score=8.0, source="graph") + if is_new or not channel.crawled_at: + needs_indexing.append(channel.id) + + db.commit() + + for channel_id in needs_indexing[:15]: + channel = db.query(Channel).filter_by(id=channel_id).first() + if channel: + _fetch_and_index_channel(db, channel) + + def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None): if regions is None: regions = ["US", "SE"] @@ -586,3 +663,4 @@ def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = No update_liked_signal(db, user_id) update_watch_signal(db, user_id) update_trending_signal(db, user_id, regions) + update_graph_signal(db, user_id) diff --git a/backend/services/ytdlp.py b/backend/services/ytdlp.py index 99f676c..4f1d550 100644 --- a/backend/services/ytdlp.py +++ b/backend/services/ytdlp.py @@ -350,6 +350,39 @@ def fetch_channel_metadata(channel_id: str, max_videos: int = 30) -> dict | None return {"channel": channel_info, "videos": videos} +def fetch_featured_channels(channel_id: str) -> list[str]: + """Fetch channel IDs from the /channels tab of a YouTube channel. + + The /channels tab lists channels the creator explicitly recommends — a very + high-signal source for discovery. Returns UC... channel IDs. + """ + if channel_id.startswith("@"): + url = f"https://www.youtube.com/{channel_id}/channels" + else: + url = f"https://www.youtube.com/channel/{channel_id}/channels" + stdout, _, code = _run([ + "yt-dlp", url, + "--dump-json", + "--flat-playlist", + "--quiet", + *_cookie_args(), + ], timeout=30) + + channel_ids: list[str] = [] + for line in stdout.splitlines(): + line = line.strip() + if not line: + continue + try: + info = json.loads(line) + ch_id = info.get("channel_id") or info.get("id") + if ch_id and ch_id.startswith("UC"): + channel_ids.append(ch_id) + except json.JSONDecodeError: + continue + return channel_ids + + def fetch_channel_links(channel_id: str) -> list[str]: """Extract linked channel IDs from a channel's about/description.""" if channel_id.startswith("@"):