"""Discovery engine — search-based crawl, trending, community signal, category clustering."""
import json
import logging
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

from sqlalchemy.orm import Session
from sqlalchemy import text

from ..models import Channel, UserChannel, DiscoveryQueue, Video
from . import ytdlp

logger = logging.getLogger(__name__)

# Maximum number of sample videos kept per discovered channel; they are
# stored as the discovery entry's ``preview_json``.
_MAX_PREVIEWS = 3


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _followed_youtube_ids(db: Session, user_id: int) -> set[str]:
    """Return the YouTube channel ids of every channel ``user_id`` follows."""
    return set(db.execute(
        text("""
            SELECT c.youtube_channel_id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
        """),
        {"user_id": user_id},
    ).scalars().all())


def _negative_affinity_tags(db: Session, user_id: int) -> frozenset[str]:
    """Return tags whose ``user_tag_affinity`` score for this user is below -2."""
    return frozenset(db.execute(
        text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"),
        {"user_id": user_id},
    ).scalars().all())


def _count_tags(
    tag_json_values,
    *,
    weight: int = 1,
    once_per_video: bool = False,
) -> dict[str, int]:
    """Count normalized tags across JSON-encoded tag lists.

    Each value is expected to be a JSON array. Unparseable values are skipped.
    String tags are lower-cased and stripped, and only tags 3–40 characters
    long are counted.

    Args:
        tag_json_values: Iterable of JSON strings (one per video).
        weight: Amount added to a tag's count per occurrence.
        once_per_video: Count each tag at most once per JSON value.

    Returns:
        Mapping of normalized tag -> accumulated count.
    """
    counts: dict[str, int] = {}
    for raw in tag_json_values:
        try:
            seen: set[str] = set()
            for tag in json.loads(raw):
                if not isinstance(tag, str):
                    continue
                t = tag.lower().strip()
                if not 3 <= len(t) <= 40:
                    continue
                if once_per_video:
                    if t in seen:
                        continue
                    seen.add(t)
                counts[t] = counts.get(t, 0) + weight
        except (json.JSONDecodeError, TypeError):
            continue
    return counts


def _top_tags(tag_counts: dict[str, int], limit: int) -> list[str]:
    """Return up to ``limit`` tags ordered by descending count."""
    return [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:limit]]


def _get_or_create_stub_channel(db: Session, yt_id: str, name: str) -> tuple[Channel, bool]:
    """Look up a channel by YouTube id, creating a bare stub row if missing.

    Returns ``(channel, is_new)``. A new stub is flushed so it gets an ``id``,
    but it is not committed.
    """
    channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
    if channel is not None:
        return channel, False
    channel = Channel(
        youtube_channel_id=yt_id,
        name=name,
        description="",
        thumbnail_url=None,
    )
    db.add(channel)
    db.flush()
    return channel, True


def _is_followed_or_dismissed(db: Session, user_id: int, channel_id: int) -> bool:
    """True if the user already followed or dismissed this channel."""
    uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel_id).first()
    return uc is not None and uc.status in ("followed", "dismissed")


def _sample_channel_tags(db: Session, channel_id: int) -> list:
    """Return up to 20 non-null ``videos.tags`` values for a channel (no ordering applied)."""
    return db.execute(
        text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"),
        {"cid": channel_id},
    ).scalars().all()


def _negative_tag_hits(db: Session, channel_id: int, neg_tags: frozenset[str]) -> int:
    """Count tag occurrences in a channel's sampled videos that are negatively rated."""
    hits = 0
    for tags_json in _sample_channel_tags(db, channel_id):
        try:
            for tag in json.loads(tags_json or "[]"):
                if isinstance(tag, str) and tag.lower().strip() in neg_tags:
                    hits += 1
        except (json.JSONDecodeError, TypeError):
            pass
    return hits


def _record_video_hit(discovered: dict[str, dict], video: dict, followed_yt_ids: set[str]) -> dict | None:
    """Fold one search/trending video into the per-channel ``discovered`` tally.

    Videos without a channel id or name, or from followed channels, are ignored.
    Otherwise the channel's hit count is bumped and up to ``_MAX_PREVIEWS``
    thumbnail/title previews are kept.

    Returns:
        The channel's entry, or ``None`` if the video was ignored.
    """
    ch = video.get("channel", {})
    yt_id = ch.get("youtube_channel_id")
    name = (ch.get("name") or "").strip()
    if not yt_id or not name or yt_id in followed_yt_ids:
        return None
    entry = discovered.setdefault(yt_id, {"name": name, "count": 0, "previews": []})
    entry["count"] += 1
    previews = entry["previews"]
    if len(previews) < _MAX_PREVIEWS and video.get("thumbnail_url") and video.get("title"):
        previews.append({
            "thumbnail_url": video["thumbnail_url"],
            "title": video["title"],
        })
    return entry


def _previews_json(info: dict) -> str | None:
    """Serialize collected previews, or ``None`` when there are none."""
    return json.dumps(info["previews"]) if info["previews"] else None


def _index_pending_channels(db: Session, channel_ids: list[int], limit: int) -> None:
    """Fetch and index at most ``limit`` distinct channels, preserving first-seen order."""
    # Deduplicate: the same channel can be queued more than once (e.g. when
    # several followed creators feature it). Without this, repeats would both
    # re-fetch it and eat into the limit.
    for channel_id in list(dict.fromkeys(channel_ids))[:limit]:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if channel:
            _fetch_and_index_channel(db, channel)


# ---------------------------------------------------------------------------
# Channel indexing and discovery-queue primitives
# ---------------------------------------------------------------------------

def _fetch_missing_dates(videos: list[dict], limit: int = 3) -> dict[str, dict]:
    """Individually fetch metadata for up to ``limit`` videos lacking ``published_at``.

    RSS may not cover a video, and a flat-playlist listing may have no
    timestamp. Per-video fetches are slow, so only the first ``limit`` dateless
    videos are tried.

    Returns:
        ``{youtube_video_id: metadata}`` for fetches that produced a date.
    """
    fetched: dict[str, dict] = {}
    dateless = [v for v in videos if not v.get("published_at")]
    for vdata in dateless[:limit]:
        yt_id = vdata.get("youtube_video_id")
        if not yt_id:
            continue
        try:
            meta = ytdlp.fetch_video_metadata(yt_id)
        except Exception:
            # Best-effort enrichment; the video is skipped later if still undated.
            logger.debug("Date lookup failed for video %s", yt_id, exc_info=True)
            continue
        if meta and meta.get("published_at"):
            fetched[yt_id] = meta
    return fetched


def _fetch_and_index_channel(db: Session, channel: Channel):
    """Refresh a discovered channel's metadata and store its recent, datable videos.

    Calls ``ytdlp.fetch_channel_metadata`` (up to 10 videos) and then:

    - copies every non-empty field of the returned ``"channel"`` dict onto the
      matching ``Channel`` attribute;
    - stamps ``crawled_at``;
    - inserts videos that are not already in the ``videos`` table.

    The session is committed on success.

    Best-effort: any exception rolls the session back and is logged, never
    raised, so one failing channel cannot abort a discovery run. Because of
    that rollback, callers commit their own pending work before calling this.
    """
    yt_channel_id = None
    try:
        yt_channel_id = channel.youtube_channel_id
        result = ytdlp.fetch_channel_metadata(yt_channel_id, max_videos=10)
        if not result:
            return

        for key, value in result.get("channel", {}).items():
            if hasattr(channel, key) and value is not None and value != "":
                setattr(channel, key, value)
        channel.crawled_at = datetime.utcnow()

        videos = result.get("videos", [])
        individual_fetched = _fetch_missing_dates(videos, limit=3)

        for vdata in videos:
            yt_id = vdata.get("youtube_video_id")
            if not yt_id:
                continue
            # Prefer individually-fetched metadata if we retrieved it.
            vdata = individual_fetched.get(yt_id, vdata)
            # Skip videos we still can't date — undated videos break feed ordering.
            if not vdata.get("published_at"):
                continue
            if db.query(Video).filter_by(youtube_video_id=yt_id).first():
                continue
            db.add(Video(
                youtube_video_id=yt_id,
                channel_id=channel.id,
                title=vdata.get("title", ""),
                description=vdata.get("description"),
                thumbnail_url=vdata.get("thumbnail_url"),
                duration_seconds=vdata.get("duration_seconds"),
                published_at=vdata.get("published_at"),
                tags=vdata.get("tags"),
                category=vdata.get("category"),
            ))
        db.commit()
    except Exception:
        db.rollback()
        logger.exception("Failed to index channel %s", yt_channel_id)


def _upsert_channel(db: Session, channel_data: dict) -> Channel | None:
    """Return the channel for ``channel_data["youtube_channel_id"]``, creating it if absent.

    Existing rows are returned unchanged (not updated). A new row is built from
    all of ``channel_data`` and flushed, but not committed. Returns ``None``
    when no YouTube id is given.
    """
    yt_id = channel_data.get("youtube_channel_id")
    if not yt_id:
        return None
    channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
    if not channel:
        channel = Channel(**channel_data)
        db.add(channel)
        db.flush()
    return channel


def _add_to_discovery(
    db: Session,
    user_id: int,
    channel_id: int,
    score: float,
    source: str,
    preview_json: str | None = None,
):
    """Add a channel to the user's discovery queue or boost its existing entry.

    A new entry is created with ``score`` and ``source``. If an entry already
    exists, half of ``score`` is added to it, and ``preview_json`` fills in a
    missing preview. The entry's ``source`` is left as first recorded. The
    caller is responsible for committing.
    """
    existing = db.query(DiscoveryQueue).filter_by(user_id=user_id, channel_id=channel_id).first()
    if existing:
        # Accumulate scores across sources. Repeat signals count half, which
        # dampens (but does not cap) any single dominant signal.
        existing.score = existing.score + score * 0.5
        if preview_json and not existing.preview_json:
            existing.preview_json = preview_json
        return
    db.add(DiscoveryQueue(
        user_id=user_id,
        channel_id=channel_id,
        score=score,
        source=source,
        preview_json=preview_json,
    ))


def _collect_search_hits(queries: list[str], followed_yt_ids: set[str]) -> dict[str, dict]:
    """Run YouTube searches in parallel and tally hits per unfollowed channel."""
    discovered: dict[str, dict] = {}

    def _do_search(query: str) -> list[dict]:
        try:
            return ytdlp.search_youtube(query, max_results=40)
        except Exception:
            logger.warning("YouTube search failed for %r", query, exc_info=True)
            return []

    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(_do_search, q) for q in queries]
        for fut in as_completed(futures):
            for video in fut.result():
                _record_video_hit(discovered, video, followed_yt_ids)
    return discovered


def _search_and_store(
    db: Session,
    user_id: int,
    queries: list[str],
    followed_yt_ids: set[str],
    score_multiplier: float,
    source: str,
    neg_affinity_tags: frozenset[str] = frozenset(),
):
    """Run YouTube searches for the given queries and add results to discovery.

    Each unfollowed channel appearing in the results is scored as
    ``hit_count * score_multiplier``. Channels the user already followed or
    dismissed are skipped. So are already-crawled channels whose sampled
    videos carry 3+ tag occurrences from ``neg_affinity_tags``. Up to 10 new
    or never-crawled channels are then fetched and indexed.
    """
    discovered = _collect_search_hits(queries, followed_yt_ids)
    if not discovered:
        return

    needs_indexing: list[int] = []
    for yt_id, info in sorted(discovered.items(), key=lambda x: -x[1]["count"]):
        channel, is_new = _get_or_create_stub_channel(db, yt_id, info["name"])
        if _is_followed_or_dismissed(db, user_id, channel.id):
            continue
        # Skip channels whose indexed videos heavily overlap with negatively-rated tags.
        if (
            neg_affinity_tags
            and not is_new
            and channel.crawled_at
            and _negative_tag_hits(db, channel.id, neg_affinity_tags) >= 3
        ):
            continue
        _add_to_discovery(
            db, user_id, channel.id,
            score=float(info["count"]) * score_multiplier,
            source=source,
            preview_json=_previews_json(info),
        )
        if is_new or not channel.crawled_at:
            needs_indexing.append(channel.id)

    db.commit()
    _index_pending_channels(db, needs_indexing, limit=10)


# ---------------------------------------------------------------------------
# Discovery signals
# ---------------------------------------------------------------------------

def crawl_by_search(db: Session, user_id: int):
    """Discover channels by searching YouTube using tags, categories, and channel names.

    The query pool is deduplicated and capped at 25 queries. It combines:

    - the 12 most frequent tags on followed channels' videos and liked videos;
    - a random sample of up to 15 followed channel names, so each run
      explores differently;
    - the top 5 categories of followed channels' videos, as a fallback.

    Results are scored ×5 under source ``"search"``.
    """
    followed_rows = db.execute(
        text("""
            SELECT c.name, c.youtube_channel_id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
        """),
        {"user_id": user_id},
    ).mappings().all()
    followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows}
    followed_names = [row["name"] for row in followed_rows if row["name"]]

    # Top tags from followed channels' indexed videos + liked videos.
    # SQLite requires LIMIT inside a subquery when used with UNION ALL.
    tag_values = db.execute(
        text("""
            SELECT tags FROM (
                SELECT v.tags
                FROM videos v
                JOIN user_channels uc ON v.channel_id = uc.channel_id
                WHERE uc.user_id = :user_id AND uc.status = 'followed'
                  AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
                LIMIT 300
            )
            UNION ALL
            SELECT tags FROM (
                SELECT v.tags
                FROM user_videos uv
                JOIN videos v ON uv.video_id = v.id
                WHERE uv.user_id = :user_id AND uv.liked = 1
                  AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
                LIMIT 100
            )
        """),
        {"user_id": user_id},
    ).scalars().all()
    top_tags = _top_tags(_count_tags(tag_values), 12)

    # Top categories as fallback.
    cat_rows = db.execute(
        text("""
            SELECT v.category, COUNT(*) AS cnt
            FROM videos v
            JOIN user_channels uc ON v.channel_id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
              AND v.category IS NOT NULL
            GROUP BY v.category
            ORDER BY cnt DESC
            LIMIT 5
        """),
        {"user_id": user_id},
    ).mappings().all()
    top_cats = [r["category"] for r in cat_rows]

    # Random sample of followed channel names — diversifies discovery each run.
    sampled_names: list[str] = []
    if followed_names:
        sampled_names = random.sample(followed_names, min(15, len(followed_names)))

    # Combine: tags (most signal) + channel names (broad reach) + categories (fallback).
    queries = list(dict.fromkeys(top_tags + sampled_names + top_cats))[:25]
    if not queries:
        return

    _search_and_store(
        db, user_id, queries, followed_yt_ids,
        score_multiplier=5.0, source="search",
        neg_affinity_tags=_negative_affinity_tags(db, user_id),
    )


def update_community_signal(db: Session, user_id: int):
    """Surface channels that other users follow, weighted by follower count.

    Considers the 100 channels with the most other-user followers among
    channels this user has no ``user_channels`` row for. Each is scored
    ``follower_count * 5`` under source ``"community"``.
    """
    rows = db.execute(
        text("""
            SELECT uc.channel_id, COUNT(DISTINCT uc.user_id) AS follower_count
            FROM user_channels uc
            WHERE uc.user_id != :user_id
              AND uc.status = 'followed'
              AND uc.channel_id NOT IN (
                  SELECT channel_id FROM user_channels WHERE user_id = :user_id
              )
            GROUP BY uc.channel_id
            ORDER BY follower_count DESC
            LIMIT 100
        """),
        {"user_id": user_id},
    ).mappings().all()

    for row in rows:
        _add_to_discovery(
            db, user_id, row["channel_id"],
            score=float(row["follower_count"]) * 5,
            source="community",
        )
    db.commit()


def update_category_clusters(db: Session, user_id: int):
    """Find channels in categories the user watches heavily.

    Takes the user's 5 most-watched video categories and queues up to 100
    channels with videos in those categories. Channels the user already has a
    ``user_channels`` row for are excluded. Each gets score 3.0 under source
    ``"category"``.
    """
    rows = db.execute(
        text("""
            SELECT v.category, COUNT(*) AS watch_count
            FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.watched = 1
              AND v.category IS NOT NULL
            GROUP BY v.category
            ORDER BY watch_count DESC
            LIMIT 5
        """),
        {"user_id": user_id},
    ).mappings().all()

    top_categories = [r["category"] for r in rows]
    if not top_categories:
        return

    # Bind each category as a parameter. Interpolating the values into the SQL
    # breaks on quotes (and on ``:name`` sequences, which text() treats as
    # binds) and allows injection. Only the generated placeholder names are
    # interpolated here.
    params: dict[str, object] = {"user_id": user_id}
    placeholders: list[str] = []
    for i, category in enumerate(top_categories):
        key = f"cat{i}"
        params[key] = category
        placeholders.append(f":{key}")

    candidate_rows = db.execute(
        text(f"""
            SELECT DISTINCT v.channel_id
            FROM videos v
            WHERE v.category IN ({", ".join(placeholders)})
              AND v.channel_id NOT IN (
                  SELECT channel_id FROM user_channels WHERE user_id = :user_id
              )
            LIMIT 100
        """),
        params,
    ).mappings().all()

    for row in candidate_rows:
        _add_to_discovery(db, user_id, row["channel_id"], score=3.0, source="category")
    db.commit()


def update_liked_signal(db: Session, user_id: int):
    """Search YouTube for channels related to topics extracted from liked videos.

    Uses the 10 most frequent tags across liked videos as search queries.
    Hits are scored ×10 under source ``"liked"``.
    """
    liked_tags = db.execute(
        text("""
            SELECT v.tags
            FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.liked = 1
              AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
        """),
        {"user_id": user_id},
    ).scalars().all()
    if not liked_tags:
        return

    tag_counts = _count_tags(liked_tags, weight=2)
    if not tag_counts:
        return

    _search_and_store(
        db, user_id, _top_tags(tag_counts, 10), _followed_youtube_ids(db, user_id),
        score_multiplier=10.0, source="liked",
        neg_affinity_tags=_negative_affinity_tags(db, user_id),
    )


def update_watch_signal(db: Session, user_id: int):
    """Discover channels from watched video topics, dampened so a single view has little effect.

    A tag needs to appear in at least 3 distinct watched videos before it influences
    discovery. Each qualifying tag contributes a modest score (×3 vs liked ×10), so
    watching a single Tokyo video won't flood recommendations with Tokyo content.
    """
    watched_tags = db.execute(
        text("""
            SELECT v.tags
            FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.watched = 1
              AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
        """),
        {"user_id": user_id},
    ).scalars().all()
    if not watched_tags:
        return

    # Count each tag at most once per video so counts mean "distinct videos".
    tag_counts = _count_tags(watched_tags, once_per_video=True)

    # Only use tags that appear across 3+ distinct watched videos.
    qualified = {t: c for t, c in tag_counts.items() if c >= 3}
    if not qualified:
        return

    _search_and_store(
        db, user_id, _top_tags(qualified, 10), _followed_youtube_ids(db, user_id),
        score_multiplier=3.0, source="watched",
        neg_affinity_tags=_negative_affinity_tags(db, user_id),
    )


def _build_user_tag_profile(db: Session, user_id: int) -> dict[str, float]:
    """Return tag affinity dict (positive = liked, negative = disliked/dismissed)."""
    rows = db.execute(
        text("SELECT tag, score FROM user_tag_affinity WHERE user_id = :user_id"),
        {"user_id": user_id},
    ).mappings().all()
    return {row["tag"]: row["score"] for row in rows}


def _tag_relevance_score(tag_profile: dict[str, float], tags_json: str | None) -> float:
    """Score a channel's tags against user affinity — positive means relevant, negative means disliked.

    Sums the profile score of every string tag (lower-cased, stripped) in the
    JSON list and clamps the result to [-100, 50]. Returns 0.0 for an empty
    profile, missing tags, or unparseable JSON.
    """
    if not tag_profile or not tags_json:
        return 0.0
    try:
        tags = json.loads(tags_json)
    except (json.JSONDecodeError, TypeError):
        return 0.0
    score = 0.0
    for tag in tags:
        if isinstance(tag, str):
            t = tag.lower().strip()
            score += tag_profile.get(t, 0.0)
    return max(-100.0, min(score, 50.0))


def update_trending_signal(db: Session, user_id: int, regions: list[str]):
    """Fetch trending videos per region and score them by tag overlap with user interests.

    Base score is ``hits * 4 * number_of_regions_trended_in``. For
    already-crawled channels, the clamped tag relevance of each sampled video
    is added. Channels whose final score is <= 0 are skipped. Up to 10 new or
    never-crawled channels are then fetched and indexed.
    """
    if not regions:
        return

    tag_profile = _build_user_tag_profile(db, user_id)
    followed_yt_ids = _followed_youtube_ids(db, user_id)
    dismissed_channel_ids = set(db.execute(
        text("""
            SELECT channel_id FROM user_channels
            WHERE user_id = :user_id AND status = 'dismissed'
        """),
        {"user_id": user_id},
    ).scalars().all())

    discovered: dict[str, dict] = {}
    for region in regions:
        try:
            for video in ytdlp.fetch_trending(region=region, max_results=50):
                entry = _record_video_hit(discovered, video, followed_yt_ids)
                if entry is not None:
                    entry.setdefault("regions", set()).add(region)
        except Exception:
            # One failing region should not stop the others.
            logger.warning("Trending fetch failed for region %s", region, exc_info=True)
            continue

    if not discovered:
        return

    needs_indexing: list[int] = []
    for yt_id, info in discovered.items():
        channel, is_new = _get_or_create_stub_channel(db, yt_id, info["name"])
        if channel.id in dismissed_channel_ids:
            continue
        if _is_followed_or_dismissed(db, user_id, channel.id):
            continue

        base_score = float(info["count"]) * 4.0 * len(info["regions"])

        # Tag relevance: positive for liked content, negative for dismissed/disliked.
        # tag_profile comes from user_tag_affinity which tracks both signals.
        tag_boost = 0.0
        if not is_new and channel.crawled_at:
            tag_boost = sum(
                _tag_relevance_score(tag_profile, tags_json)
                for tags_json in _sample_channel_tags(db, channel.id)
            )

        final_score = base_score + tag_boost
        if final_score <= 0:
            continue

        _add_to_discovery(
            db, user_id, channel.id,
            score=final_score, source="trending",
            preview_json=_previews_json(info),
        )
        if is_new or not channel.crawled_at:
            needs_indexing.append(channel.id)

    db.commit()
    _index_pending_channels(db, needs_indexing, limit=10)


def update_graph_signal(db: Session, user_id: int):
    """Discover channels featured on followed channels' /channels tab.

    Channels that creators explicitly recommend are high-signal — they're curated by
    someone whose taste you already follow. Samples up to 12 followed channels per run
    and fetches their featured channels list in parallel.

    Each featured (unfollowed, undismissed) channel scores 8.0 per featuring
    creator under source ``"graph"``. Repeat features accumulate via
    ``_add_to_discovery``. Up to 15 distinct new or never-crawled channels are
    then fetched and indexed.
    """
    followed_rows = db.execute(
        text("""
            SELECT c.youtube_channel_id, c.id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
              AND c.youtube_channel_id IS NOT NULL
        """),
        {"user_id": user_id},
    ).mappings().all()
    if not followed_rows:
        return

    followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows}
    dismissed_ids = set(db.execute(
        text("SELECT channel_id FROM user_channels WHERE user_id = :user_id AND status = 'dismissed'"),
        {"user_id": user_id},
    ).scalars().all())

    sample = random.sample(list(followed_rows), min(12, len(followed_rows)))

    def _fetch(yt_id: str) -> list[str]:
        try:
            return ytdlp.fetch_featured_channels(yt_id)
        except Exception:
            logger.warning("Featured-channels fetch failed for %s", yt_id, exc_info=True)
            return []

    featured_map: dict[str, list[str]] = {}
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {pool.submit(_fetch, row["youtube_channel_id"]): row for row in sample}
        for fut in as_completed(futures):
            row = futures[fut]
            featured_map[row["youtube_channel_id"]] = fut.result()

    needs_indexing: list[int] = []
    for featured_ids in featured_map.values():
        for yt_id in featured_ids:
            if not yt_id or yt_id in followed_yt_ids:
                continue
            # Name is left empty; indexing fills it from fetched metadata.
            channel, is_new = _get_or_create_stub_channel(db, yt_id, "")
            if channel.id in dismissed_ids:
                continue
            if _is_followed_or_dismissed(db, user_id, channel.id):
                continue
            _add_to_discovery(db, user_id, channel.id, score=8.0, source="graph")
            if is_new or not channel.crawled_at:
                needs_indexing.append(channel.id)

    db.commit()
    # The same channel may be queued by several featuring creators; the helper dedupes.
    _index_pending_channels(db, needs_indexing, limit=15)


def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None):
    """Run every discovery signal for ``user_id`` in sequence.

    Args:
        db: Active session; each signal commits its own work.
        user_id: User whose discovery queue is refreshed.
        regions: Trending regions to sample; defaults to ``["US", "SE"]``.
    """
    if regions is None:
        regions = ["US", "SE"]
    crawl_by_search(db, user_id)
    update_community_signal(db, user_id)
    update_category_clusters(db, user_id)
    update_liked_signal(db, user_id)
    update_watch_signal(db, user_id)
    update_trending_signal(db, user_id, regions)
    update_graph_signal(db, user_id)