"""Discovery engine — search-based crawl, trending, community signal, category clustering.""" import json import random from datetime import datetime from sqlalchemy.orm import Session from sqlalchemy import text from ..models import Channel, UserChannel, DiscoveryQueue, Video from . import ytdlp def _fetch_and_index_channel(db: Session, channel: Channel): """Fetch full metadata + recent videos for a discovered channel.""" try: result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id, max_videos=10) if not result: return ch_data = result.get("channel", {}) for k, v in ch_data.items(): if hasattr(channel, k) and v is not None and v != "": setattr(channel, k, v) channel.crawled_at = datetime.utcnow() videos = result.get("videos", []) # For videos missing a date (RSS didn't cover them or flat-playlist had no timestamp), # do individual fetches — capped at 3 to avoid slow-downs. dateless = [v for v in videos if not v.get("published_at")] individual_fetched: dict[str, dict] = {} for vdata in dateless[:3]: yt_id = vdata.get("youtube_video_id") if not yt_id: continue try: meta = ytdlp.fetch_video_metadata(yt_id) if meta and meta.get("published_at"): individual_fetched[yt_id] = meta except Exception: pass for vdata in videos: yt_id = vdata.get("youtube_video_id") if not yt_id: continue # Prefer individually-fetched metadata if we retrieved it if yt_id in individual_fetched: vdata = individual_fetched[yt_id] # Skip videos we still can't date — undated videos break feed ordering if not vdata.get("published_at"): continue if not db.query(Video).filter_by(youtube_video_id=yt_id).first(): db.add(Video( youtube_video_id=yt_id, channel_id=channel.id, title=vdata.get("title", ""), description=vdata.get("description"), thumbnail_url=vdata.get("thumbnail_url"), duration_seconds=vdata.get("duration_seconds"), published_at=vdata.get("published_at"), tags=vdata.get("tags"), category=vdata.get("category"), )) db.commit() except Exception: db.rollback() def _upsert_channel(db: Session, channel_data: dict) -> Channel | None: yt_id = channel_data.get("youtube_channel_id") if not yt_id: return None channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first() if not channel: channel = Channel(**channel_data) db.add(channel) db.flush() return channel def _add_to_discovery( db: Session, user_id: int, channel_id: int, score: float, source: str, preview_json: str | None = None, ): existing = db.query(DiscoveryQueue).filter_by(user_id=user_id, channel_id=channel_id).first() if existing: # Accumulate scores across sources but cap to prevent one dominant signal existing.score = existing.score + score * 0.5 if preview_json and not existing.preview_json: existing.preview_json = preview_json return db.add(DiscoveryQueue( user_id=user_id, channel_id=channel_id, score=score, source=source, preview_json=preview_json, )) def _search_and_store( db: Session, user_id: int, queries: list[str], followed_yt_ids: set[str], score_multiplier: float, source: str, neg_affinity_tags: frozenset[str] = frozenset(), ): """Run YouTube searches for the given queries and add results to discovery.""" discovered: dict[str, dict] = {} for query in queries: try: results = ytdlp.search_youtube(query, max_results=40) for video in results: ch = video.get("channel", {}) yt_id = ch.get("youtube_channel_id") name = (ch.get("name") or "").strip() if yt_id and name and yt_id not in followed_yt_ids: if yt_id not in discovered: discovered[yt_id] = {"name": name, "count": 0, "previews": []} discovered[yt_id]["count"] += 1 previews = discovered[yt_id]["previews"] if len(previews) < 3 and video.get("thumbnail_url") and video.get("title"): previews.append({ "thumbnail_url": video["thumbnail_url"], "title": video["title"], }) except Exception: continue if not discovered: return candidates = sorted(discovered.items(), key=lambda x: -x[1]["count"]) needs_indexing: list[int] = [] for yt_id, info in candidates: channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first() is_new = channel is None if not channel: channel = Channel( youtube_channel_id=yt_id, name=info["name"], description="", thumbnail_url=None, ) db.add(channel) db.flush() uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first() if uc and uc.status in ("followed", "dismissed"): continue # Skip channels whose indexed videos heavily overlap with negatively-rated tags if neg_affinity_tags and not is_new and channel.crawled_at: neg_hit = 0 vtags = db.execute( text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"), {"cid": channel.id}, ).scalars().all() for tags_json in vtags: try: for tag in json.loads(tags_json or "[]"): if isinstance(tag, str) and tag.lower().strip() in neg_affinity_tags: neg_hit += 1 except (json.JSONDecodeError, TypeError): pass if neg_hit >= 3: continue preview_json = json.dumps(info["previews"]) if info["previews"] else None _add_to_discovery( db, user_id, channel.id, score=float(info["count"]) * score_multiplier, source=source, preview_json=preview_json, ) if is_new or not channel.crawled_at: needs_indexing.append(channel.id) db.commit() for channel_id in needs_indexing[:10]: channel = db.query(Channel).filter_by(id=channel_id).first() if channel: _fetch_and_index_channel(db, channel) def crawl_by_search(db: Session, user_id: int): """Discover channels by searching YouTube using tags, categories, and channel names.""" # All followed channels (names + yt_ids) followed_rows = db.execute( text(""" SELECT c.name, c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' """), {"user_id": user_id}, ).mappings().all() followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows} followed_names = [row["name"] for row in followed_rows if row["name"]] # Top tags from followed channels' indexed videos + liked videos # SQLite requires LIMIT inside a subquery when used with UNION ALL tag_rows = db.execute( text(""" SELECT tags FROM ( SELECT v.tags FROM videos v JOIN user_channels uc ON v.channel_id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' LIMIT 300 ) UNION ALL SELECT tags FROM ( SELECT v.tags FROM user_videos uv JOIN videos v ON uv.video_id = v.id WHERE uv.user_id = :user_id AND uv.liked = 1 AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' LIMIT 100 ) """), {"user_id": user_id}, ).mappings().all() tag_counts: dict[str, int] = {} for row in tag_rows: try: tags = json.loads(row["tags"]) for tag in tags: if isinstance(tag, str): t = tag.lower().strip() if 3 <= len(t) <= 40: tag_counts[t] = tag_counts.get(t, 0) + 1 except (json.JSONDecodeError, TypeError): continue # Top categories as fallback cat_rows = db.execute( text(""" SELECT v.category, COUNT(*) AS cnt FROM videos v JOIN user_channels uc ON v.channel_id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' AND v.category IS NOT NULL GROUP BY v.category ORDER BY cnt DESC LIMIT 5 """), {"user_id": user_id}, ).mappings().all() # Build query pool: top tags + random channel names + categories top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:12]] top_cats = [r["category"] for r in cat_rows] # Random sample of followed channel names — diversifies discovery each run sampled_names: list[str] = [] if followed_names: sampled_names = random.sample(followed_names, min(15, len(followed_names))) # Combine: tags (most signal) + channel names (broad reach) + categories (fallback) queries = list(dict.fromkeys(top_tags + sampled_names + top_cats))[:25] if not queries: return neg_tags = frozenset( r["tag"] for r in db.execute( text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"), {"user_id": user_id}, ).mappings().all() ) _search_and_store(db, user_id, queries, followed_yt_ids, score_multiplier=5.0, source="search", neg_affinity_tags=neg_tags) def update_community_signal(db: Session, user_id: int): """Surface channels that other users follow, weighted by follower count.""" rows = db.execute( text(""" SELECT uc.channel_id, COUNT(DISTINCT uc.user_id) AS follower_count FROM user_channels uc WHERE uc.user_id != :user_id AND uc.status = 'followed' AND uc.channel_id NOT IN ( SELECT channel_id FROM user_channels WHERE user_id = :user_id ) GROUP BY uc.channel_id ORDER BY follower_count DESC LIMIT 100 """), {"user_id": user_id}, ).mappings().all() for row in rows: _add_to_discovery( db, user_id, row["channel_id"], score=float(row["follower_count"]) * 5, source="community", ) db.commit() def update_category_clusters(db: Session, user_id: int): """Find channels in categories the user watches heavily.""" rows = db.execute( text(""" SELECT v.category, COUNT(*) AS watch_count FROM user_videos uv JOIN videos v ON uv.video_id = v.id WHERE uv.user_id = :user_id AND uv.watched = 1 AND v.category IS NOT NULL GROUP BY v.category ORDER BY watch_count DESC LIMIT 5 """), {"user_id": user_id}, ).mappings().all() top_categories = [r["category"] for r in rows] if not top_categories: return placeholders = ",".join(f"'{c}'" for c in top_categories) candidate_rows = db.execute( text(f""" SELECT DISTINCT v.channel_id FROM videos v WHERE v.category IN ({placeholders}) AND v.channel_id NOT IN ( SELECT channel_id FROM user_channels WHERE user_id = :user_id ) LIMIT 100 """), {"user_id": user_id}, ).mappings().all() for row in candidate_rows: _add_to_discovery(db, user_id, row["channel_id"], score=3.0, source="category") db.commit() def update_liked_signal(db: Session, user_id: int): """Search YouTube for channels related to topics extracted from liked videos.""" liked_rows = db.execute( text(""" SELECT v.tags FROM user_videos uv JOIN videos v ON uv.video_id = v.id WHERE uv.user_id = :user_id AND uv.liked = 1 AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' """), {"user_id": user_id}, ).mappings().all() if not liked_rows: return tag_counts: dict[str, int] = {} for row in liked_rows: try: tags = json.loads(row["tags"]) for tag in tags: if isinstance(tag, str): t = tag.lower().strip() if 3 <= len(t) <= 40: tag_counts[t] = tag_counts.get(t, 0) + 2 except (json.JSONDecodeError, TypeError): pass if not tag_counts: return followed_yt_ids = set(db.execute( text(""" SELECT c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' """), {"user_id": user_id}, ).scalars().all()) top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:10]] neg_tags = frozenset( r["tag"] for r in db.execute( text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"), {"user_id": user_id}, ).mappings().all() ) _search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=10.0, source="liked", neg_affinity_tags=neg_tags) def update_watch_signal(db: Session, user_id: int): """Discover channels from watched video topics, dampened so a single view has little effect. A tag needs to appear in at least 3 distinct watched videos before it influences discovery. Each qualifying tag contributes a modest score (×3 vs liked ×10), so watching a single Tokyo video won't flood recommendations with Tokyo content. """ rows = db.execute( text(""" SELECT v.tags FROM user_videos uv JOIN videos v ON uv.video_id = v.id WHERE uv.user_id = :user_id AND uv.watched = 1 AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' """), {"user_id": user_id}, ).mappings().all() if not rows: return tag_counts: dict[str, int] = {} for row in rows: try: tags = json.loads(row["tags"]) seen = set() for tag in tags: if isinstance(tag, str): t = tag.lower().strip() if 3 <= len(t) <= 40 and t not in seen: tag_counts[t] = tag_counts.get(t, 0) + 1 seen.add(t) except (json.JSONDecodeError, TypeError): pass # Only use tags that appear across 3+ distinct watched videos qualified = {t: c for t, c in tag_counts.items() if c >= 3} if not qualified: return followed_yt_ids = set(db.execute( text(""" SELECT c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' """), {"user_id": user_id}, ).scalars().all()) top_tags = [t for t, _ in sorted(qualified.items(), key=lambda x: -x[1])[:10]] neg_tags = frozenset( r["tag"] for r in db.execute( text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"), {"user_id": user_id}, ).mappings().all() ) _search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=3.0, source="watched", neg_affinity_tags=neg_tags) def _build_user_tag_profile(db: Session, user_id: int) -> dict[str, float]: """Return tag affinity dict (positive = liked, negative = disliked/dismissed).""" rows = db.execute( text("SELECT tag, score FROM user_tag_affinity WHERE user_id = :user_id"), {"user_id": user_id}, ).mappings().all() return {row["tag"]: row["score"] for row in rows} def _tag_relevance_score(tag_profile: dict[str, float], tags_json: str | None) -> float: """Score a channel's tags against user affinity — positive means relevant, negative means disliked.""" if not tag_profile or not tags_json: return 0.0 try: tags = json.loads(tags_json) except (json.JSONDecodeError, TypeError): return 0.0 score = 0.0 for tag in tags: if isinstance(tag, str): t = tag.lower().strip() score += tag_profile.get(t, 0.0) return max(-100.0, min(score, 50.0)) def update_trending_signal(db: Session, user_id: int, regions: list[str]): """Fetch trending videos per region and score them by tag overlap with user interests.""" if not regions: return tag_profile = _build_user_tag_profile(db, user_id) followed_yt_ids = set(db.execute( text(""" SELECT c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' """), {"user_id": user_id}, ).scalars().all()) dismissed_channel_ids = set(db.execute( text(""" SELECT channel_id FROM user_channels WHERE user_id = :user_id AND status = 'dismissed' """), {"user_id": user_id}, ).scalars().all()) discovered: dict[str, dict] = {} for region in regions: try: videos = ytdlp.fetch_trending(region=region, max_results=50) for video in videos: ch = video.get("channel", {}) yt_id = ch.get("youtube_channel_id") name = (ch.get("name") or "").strip() if not yt_id or not name or yt_id in followed_yt_ids: continue if yt_id not in discovered: discovered[yt_id] = {"name": name, "count": 0, "regions": set(), "previews": []} discovered[yt_id]["count"] += 1 discovered[yt_id]["regions"].add(region) previews = discovered[yt_id]["previews"] if len(previews) < 3 and video.get("thumbnail_url") and video.get("title"): previews.append({ "thumbnail_url": video["thumbnail_url"], "title": video["title"], }) except Exception: continue if not discovered: return needs_indexing: list[int] = [] for yt_id, info in discovered.items(): channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first() is_new = channel is None if not channel: channel = Channel( youtube_channel_id=yt_id, name=info["name"], description="", thumbnail_url=None, ) db.add(channel) db.flush() if channel.id in dismissed_channel_ids: continue uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first() if uc and uc.status in ("followed", "dismissed"): continue base_score = float(info["count"]) * 4.0 * len(info["regions"]) # Tag relevance: positive for liked content, negative for dismissed/disliked. # tag_profile comes from user_tag_affinity which tracks both signals. tag_boost = 0.0 if not is_new and channel.crawled_at: tag_rows = db.execute( text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"), {"cid": channel.id}, ).scalars().all() for tags_json in tag_rows: tag_boost += _tag_relevance_score(tag_profile, tags_json) final_score = base_score + tag_boost if final_score <= 0: continue preview_json = json.dumps(info["previews"]) if info["previews"] else None _add_to_discovery(db, user_id, channel.id, score=final_score, source="trending", preview_json=preview_json) if is_new or not channel.crawled_at: needs_indexing.append(channel.id) db.commit() for channel_id in needs_indexing[:10]: channel = db.query(Channel).filter_by(id=channel_id).first() if channel: _fetch_and_index_channel(db, channel) def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None): if regions is None: regions = ["US", "SE"] crawl_by_search(db, user_id) update_community_signal(db, user_id) update_category_clusters(db, user_id) update_liked_signal(db, user_id) update_watch_signal(db, user_id) update_trending_signal(db, user_id, regions)