Parallelize discovery searches and add graph signal
Run search queries concurrently (5 workers) instead of sequentially — cuts crawl time dramatically. Add graph signal: fetch featured channels from followed channels' /channels tab in parallel (4 workers), which surfaces creator-curated recommendations as a high-signal, diverse pool that search alone can't reach. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
"""Discovery engine — search-based crawl, trending, community signal, category clustering."""
|
"""Discovery engine — search-based crawl, trending, community signal, category clustering."""
|
||||||
import json
|
import json
|
||||||
import random
|
import random
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
@@ -104,10 +105,17 @@ def _search_and_store(
|
|||||||
):
|
):
|
||||||
"""Run YouTube searches for the given queries and add results to discovery."""
|
"""Run YouTube searches for the given queries and add results to discovery."""
|
||||||
discovered: dict[str, dict] = {}
|
discovered: dict[str, dict] = {}
|
||||||
for query in queries:
|
|
||||||
|
def _do_search(query: str) -> list[dict]:
|
||||||
try:
|
try:
|
||||||
results = ytdlp.search_youtube(query, max_results=40)
|
return ytdlp.search_youtube(query, max_results=40)
|
||||||
for video in results:
|
except Exception:
|
||||||
|
return []
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=5) as pool:
|
||||||
|
futures = {pool.submit(_do_search, q): q for q in queries}
|
||||||
|
for fut in as_completed(futures):
|
||||||
|
for video in fut.result():
|
||||||
ch = video.get("channel", {})
|
ch = video.get("channel", {})
|
||||||
yt_id = ch.get("youtube_channel_id")
|
yt_id = ch.get("youtube_channel_id")
|
||||||
name = (ch.get("name") or "").strip()
|
name = (ch.get("name") or "").strip()
|
||||||
@@ -121,8 +129,6 @@ def _search_and_store(
|
|||||||
"thumbnail_url": video["thumbnail_url"],
|
"thumbnail_url": video["thumbnail_url"],
|
||||||
"title": video["title"],
|
"title": video["title"],
|
||||||
})
|
})
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not discovered:
|
if not discovered:
|
||||||
return
|
return
|
||||||
@@ -577,6 +583,77 @@ def update_trending_signal(db: Session, user_id: int, regions: list[str]):
|
|||||||
_fetch_and_index_channel(db, channel)
|
_fetch_and_index_channel(db, channel)
|
||||||
|
|
||||||
|
|
||||||
|
def update_graph_signal(db: Session, user_id: int) -> None:
    """Discover channels featured on followed channels' /channels tab.

    Channels that creators explicitly recommend are high-signal — they're
    curated by someone whose taste you already follow. Samples up to 12 followed
    channels per run and fetches their featured channels list in parallel
    (4 worker threads).

    Every featured channel that the user neither follows nor has dismissed is
    passed to ``_add_to_discovery`` with ``score=8.0`` and ``source="graph"``.
    Channels that were just created, or that have no ``crawled_at`` value, are
    queued for indexing; at most 15 of them are indexed per run.

    Args:
        db: Session used for all reads and writes; committed once after the
            discovery rows have been added.
        user_id: User whose followed channels seed the crawl.
    """
    followed_rows = db.execute(
        text("""
            SELECT c.youtube_channel_id, c.id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
            AND c.youtube_channel_id IS NOT NULL
        """),
        {"user_id": user_id},
    ).mappings().all()

    if not followed_rows:
        return

    followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows}

    dismissed_ids = set(db.execute(
        text("SELECT channel_id FROM user_channels WHERE user_id = :user_id AND status = 'dismissed'"),
        {"user_id": user_id},
    ).scalars().all())

    sample = random.sample(list(followed_rows), min(12, len(followed_rows)))

    def _fetch(yt_id: str) -> list[str]:
        # Best effort: one failing channel must not abort the whole run.
        try:
            return ytdlp.fetch_featured_channels(yt_id)
        except Exception:
            return []

    featured_map: dict[str, list[str]] = {}
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {pool.submit(_fetch, row["youtube_channel_id"]): row for row in sample}
        for fut in as_completed(futures):
            row = futures[fut]
            featured_map[row["youtube_channel_id"]] = fut.result()

    needs_indexing: list[int] = []
    # A channel featured by several sampled channels shows up once per source.
    # Track what is already queued so it takes a single slot of the indexing
    # budget below and is fetched only once.
    queued: set[int] = set()
    for channel_ids in featured_map.values():
        for yt_id in channel_ids:
            if yt_id in followed_yt_ids:
                continue
            channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
            is_new = channel is None
            if not channel:
                channel = Channel(youtube_channel_id=yt_id, name="", description="", thumbnail_url=None)
                db.add(channel)
                # Flush so the new row gets an id before it is referenced below.
                db.flush()
            if channel.id in dismissed_ids:
                continue
            uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
            if uc and uc.status in ("followed", "dismissed"):
                continue
            _add_to_discovery(db, user_id, channel.id, score=8.0, source="graph")
            if (is_new or not channel.crawled_at) and channel.id not in queued:
                queued.add(channel.id)
                needs_indexing.append(channel.id)

    db.commit()

    # Cap the per-run indexing work at 15 channels.
    for channel_id in needs_indexing[:15]:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if channel:
            _fetch_and_index_channel(db, channel)
|
||||||
|
|
||||||
|
|
||||||
def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None):
|
def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None):
|
||||||
if regions is None:
|
if regions is None:
|
||||||
regions = ["US", "SE"]
|
regions = ["US", "SE"]
|
||||||
@@ -586,3 +663,4 @@ def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = No
|
|||||||
update_liked_signal(db, user_id)
|
update_liked_signal(db, user_id)
|
||||||
update_watch_signal(db, user_id)
|
update_watch_signal(db, user_id)
|
||||||
update_trending_signal(db, user_id, regions)
|
update_trending_signal(db, user_id, regions)
|
||||||
|
update_graph_signal(db, user_id)
|
||||||
|
|||||||
@@ -350,6 +350,39 @@ def fetch_channel_metadata(channel_id: str, max_videos: int = 30) -> dict | None
|
|||||||
return {"channel": channel_info, "videos": videos}
|
return {"channel": channel_info, "videos": videos}
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_featured_channels(channel_id: str) -> list[str]:
    """Fetch channel IDs from the /channels tab of a YouTube channel.

    The /channels tab lists channels the creator explicitly recommends — a very
    high-signal source for discovery. Returns UC... channel IDs.

    Args:
        channel_id: Either an ``@handle`` (addressed as
            ``youtube.com/@handle/channels``) or a channel ID (addressed as
            ``youtube.com/channel/<id>/channels``).

    Returns:
        The IDs starting with ``"UC"``, in the order yt-dlp printed them.
        Lines that are blank, are not valid JSON, are not JSON objects, or carry
        no string ID are skipped.
    """
    if channel_id.startswith("@"):
        url = f"https://www.youtube.com/{channel_id}/channels"
    else:
        url = f"https://www.youtube.com/channel/{channel_id}/channels"

    # Only stdout is used; the exit status is not checked, so any entries that
    # were printed are still parsed.
    stdout, _, _ = _run([
        "yt-dlp", url,
        "--dump-json",
        "--flat-playlist",
        "--quiet",
        *_cookie_args(),
    ], timeout=30)

    channel_ids: list[str] = []
    for line in stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            info = json.loads(line)
        except json.JSONDecodeError:
            continue
        # Valid JSON is not necessarily an object; without this guard a stray
        # list/string/number line would raise AttributeError and lose every ID
        # collected so far.
        if not isinstance(info, dict):
            continue
        ch_id = info.get("channel_id") or info.get("id")
        if isinstance(ch_id, str) and ch_id.startswith("UC"):
            channel_ids.append(ch_id)
    return channel_ids
|
||||||
|
|
||||||
|
|
||||||
def fetch_channel_links(channel_id: str) -> list[str]:
|
def fetch_channel_links(channel_id: str) -> list[str]:
|
||||||
"""Extract linked channel IDs from a channel's about/description."""
|
"""Extract linked channel IDs from a channel's about/description."""
|
||||||
if channel_id.startswith("@"):
|
if channel_id.startswith("@"):
|
||||||
|
|||||||
Reference in New Issue
Block a user