"""Discovery engine — search-based crawl, trending, community signal, category clustering.""" import json import queue as _queue import random import threading as _threading import time as _time from datetime import datetime from sqlalchemy.orm import Session from sqlalchemy import text from ..models import Channel, UserChannel, DiscoveryQueue, Video from . import ytdlp # --------------------------------------------------------------------------- # Background task queue — spaces yt-dlp calls 30-90 s apart and shuffles # call types so we don't fire 10 searches in a row. # --------------------------------------------------------------------------- _task_queue: _queue.Queue = _queue.Queue() _progress: dict[int, dict] = {} # user_id -> {total, done, running} _progress_lock = _threading.Lock() _worker_started = False _worker_lock = _threading.Lock() def _fetch_and_index_channel(db: Session, channel: Channel): """Fetch metadata + recent videos for a discovered channel (one yt-dlp call only).""" try: result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id, max_videos=10, polite=True) if not result: return ch_data = result.get("channel", {}) for k, v in ch_data.items(): if hasattr(channel, k) and v is not None and v != "": setattr(channel, k, v) channel.crawled_at = datetime.utcnow() for vdata in result.get("videos", []): yt_id = vdata.get("youtube_video_id") if not yt_id or not vdata.get("published_at"): continue if not db.query(Video).filter_by(youtube_video_id=yt_id).first(): db.add(Video( youtube_video_id=yt_id, channel_id=channel.id, title=vdata.get("title", ""), description=vdata.get("description"), thumbnail_url=vdata.get("thumbnail_url"), duration_seconds=vdata.get("duration_seconds"), published_at=vdata.get("published_at"), tags=vdata.get("tags"), category=vdata.get("category"), )) db.commit() except Exception: db.rollback() def _upsert_channel(db: Session, channel_data: dict) -> Channel | None: yt_id = channel_data.get("youtube_channel_id") if not yt_id: return None channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first() if not channel: channel = Channel(**channel_data) db.add(channel) db.flush() return channel _MAX_DISCOVERY_SCORE = 50.0 def _add_to_discovery( db: Session, user_id: int, channel_id: int, score: float, source: str, preview_json: str | None = None, ): score = min(score, _MAX_DISCOVERY_SCORE) existing = db.query(DiscoveryQueue).filter_by(user_id=user_id, channel_id=channel_id).first() if existing: # Accumulate across sources but cap so no single signal dominates forever existing.score = min(existing.score + score * 0.5, _MAX_DISCOVERY_SCORE) if preview_json and not existing.preview_json: existing.preview_json = preview_json return db.add(DiscoveryQueue( user_id=user_id, channel_id=channel_id, score=score, source=source, preview_json=preview_json, )) def _search_and_store( db: Session, user_id: int, queries: list[str], followed_yt_ids: set[str], score_multiplier: float, source: str, neg_affinity_tags: frozenset[str] = frozenset(), ): """Run YouTube searches for the given queries and add results to discovery.""" discovered: dict[str, dict] = {} for query in queries: try: results = ytdlp.search_youtube(query, max_results=40, polite=True) except Exception: results = [] for video in results: ch = video.get("channel", {}) yt_id = ch.get("youtube_channel_id") name = (ch.get("name") or "").strip() if yt_id and name and yt_id not in followed_yt_ids: if yt_id not in discovered: discovered[yt_id] = {"name": name, "count": 0, "previews": []} discovered[yt_id]["count"] += 1 previews = discovered[yt_id]["previews"] if len(previews) < 3 and video.get("thumbnail_url") and video.get("title"): previews.append({ "thumbnail_url": video["thumbnail_url"], "title": video["title"], }) if not discovered: return candidates = sorted(discovered.items(), key=lambda x: -x[1]["count"]) needs_indexing: list[int] = [] for yt_id, info in candidates: channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first() is_new = channel is None if not channel: channel = Channel( youtube_channel_id=yt_id, name=info["name"], description="", thumbnail_url=None, ) db.add(channel) db.flush() uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first() if uc and uc.status in ("followed", "dismissed"): continue # Skip channels whose indexed videos heavily overlap with negatively-rated tags if neg_affinity_tags and not is_new and channel.crawled_at: neg_hit = 0 vtags = db.execute( text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"), {"cid": channel.id}, ).scalars().all() for tags_json in vtags: try: for tag in json.loads(tags_json or "[]"): if isinstance(tag, str) and tag.lower().strip() in neg_affinity_tags: neg_hit += 1 except (json.JSONDecodeError, TypeError): pass if neg_hit >= 3: continue preview_json = json.dumps(info["previews"]) if info["previews"] else None _add_to_discovery( db, user_id, channel.id, score=float(info["count"]) * score_multiplier, source=source, preview_json=preview_json, ) if is_new or not channel.crawled_at: needs_indexing.append(channel.id) db.commit() # Queue channel indexing as separate worker tasks (30-90 s gaps apply). for channel_id in needs_indexing[:3]: _task_queue.put((user_id, lambda cid=channel_id: _do_task_index_channel(user_id, cid))) def crawl_by_search(db: Session, user_id: int): """Discover channels by searching YouTube using tags, categories, and channel names.""" # All followed channels (names + yt_ids) followed_rows = db.execute( text(""" SELECT c.name, c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' """), {"user_id": user_id}, ).mappings().all() followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows} followed_names = [row["name"] for row in followed_rows if row["name"]] # Top tags from followed channels' indexed videos + liked videos # SQLite requires LIMIT inside a subquery when used with UNION ALL tag_rows = db.execute( text(""" SELECT tags FROM ( SELECT v.tags FROM videos v JOIN user_channels uc ON v.channel_id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' LIMIT 300 ) UNION ALL SELECT tags FROM ( SELECT v.tags FROM user_videos uv JOIN videos v ON uv.video_id = v.id WHERE uv.user_id = :user_id AND uv.liked = 1 AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' LIMIT 100 ) """), {"user_id": user_id}, ).mappings().all() tag_counts: dict[str, int] = {} for row in tag_rows: try: tags = json.loads(row["tags"]) for tag in tags: if isinstance(tag, str): t = tag.lower().strip() if 3 <= len(t) <= 40: tag_counts[t] = tag_counts.get(t, 0) + 1 except (json.JSONDecodeError, TypeError): continue # Top categories as fallback cat_rows = db.execute( text(""" SELECT v.category, COUNT(*) AS cnt FROM videos v JOIN user_channels uc ON v.channel_id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' AND v.category IS NOT NULL GROUP BY v.category ORDER BY cnt DESC LIMIT 5 """), {"user_id": user_id}, ).mappings().all() # Keep the query count low — each query is a separate yt-dlp subprocess # (its own HTTP session). Too many back-to-back sessions look like a bot. top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:5]] top_cats = [r["category"] for r in cat_rows] # A few randomly-sampled channel names — diversifies results each run sampled_names: list[str] = [] if followed_names: sampled_names = random.sample(followed_names, min(4, len(followed_names))) # One serendipity query to surface content outside the user's direct tag space serendipity = [f"best {top_cats[0]} channels"] if top_cats else [] # Total target: ≤10 queries queries = list(dict.fromkeys(top_tags + sampled_names + serendipity + top_cats[:2]))[:10] if not queries: return neg_tags = frozenset( r["tag"] for r in db.execute( text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"), {"user_id": user_id}, ).mappings().all() ) _search_and_store(db, user_id, queries, followed_yt_ids, score_multiplier=5.0, source="search", neg_affinity_tags=neg_tags) def update_community_signal(db: Session, user_id: int): """Surface channels that other users follow, weighted by follower count.""" rows = db.execute( text(""" SELECT uc.channel_id, COUNT(DISTINCT uc.user_id) AS follower_count FROM user_channels uc WHERE uc.user_id != :user_id AND uc.status = 'followed' AND uc.channel_id NOT IN ( SELECT channel_id FROM user_channels WHERE user_id = :user_id ) GROUP BY uc.channel_id ORDER BY follower_count DESC LIMIT 100 """), {"user_id": user_id}, ).mappings().all() for row in rows: _add_to_discovery( db, user_id, row["channel_id"], score=float(row["follower_count"]) * 5, source="community", ) db.commit() def update_category_clusters(db: Session, user_id: int): """Find channels in categories the user watches heavily.""" rows = db.execute( text(""" SELECT v.category, COUNT(*) AS watch_count FROM user_videos uv JOIN videos v ON uv.video_id = v.id WHERE uv.user_id = :user_id AND uv.watched = 1 AND v.category IS NOT NULL GROUP BY v.category ORDER BY watch_count DESC LIMIT 5 """), {"user_id": user_id}, ).mappings().all() top_categories = [r["category"] for r in rows] if not top_categories: return # Use JSON_EACH / parameterized IN via repeated queries to avoid SQL injection candidate_channel_ids: set[int] = set() for cat in top_categories: cat_rows = db.execute( text(""" SELECT DISTINCT v.channel_id FROM videos v WHERE v.category = :cat AND v.channel_id NOT IN ( SELECT channel_id FROM user_channels WHERE user_id = :user_id ) LIMIT 50 """), {"cat": cat, "user_id": user_id}, ).mappings().all() for row in cat_rows: candidate_channel_ids.add(row["channel_id"]) for channel_id in candidate_channel_ids: _add_to_discovery(db, user_id, channel_id, score=5.0, source="category") db.commit() def update_liked_signal(db: Session, user_id: int): """Search YouTube for channels related to topics extracted from liked videos.""" liked_rows = db.execute( text(""" SELECT v.tags FROM user_videos uv JOIN videos v ON uv.video_id = v.id WHERE uv.user_id = :user_id AND uv.liked = 1 AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' """), {"user_id": user_id}, ).mappings().all() if not liked_rows: return tag_counts: dict[str, int] = {} for row in liked_rows: try: tags = json.loads(row["tags"]) for tag in tags: if isinstance(tag, str): t = tag.lower().strip() if 3 <= len(t) <= 40: tag_counts[t] = tag_counts.get(t, 0) + 2 except (json.JSONDecodeError, TypeError): pass if not tag_counts: return followed_yt_ids = set(db.execute( text(""" SELECT c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' """), {"user_id": user_id}, ).scalars().all()) top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:4]] neg_tags = frozenset( r["tag"] for r in db.execute( text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"), {"user_id": user_id}, ).mappings().all() ) _search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=10.0, source="liked", neg_affinity_tags=neg_tags) def update_watch_signal(db: Session, user_id: int): """Discover channels from watched video topics, dampened so a single view has little effect. A tag needs to appear in at least 3 distinct watched videos before it influences discovery. Each qualifying tag contributes a modest score (×3 vs liked ×10), so watching a single Tokyo video won't flood recommendations with Tokyo content. """ rows = db.execute( text(""" SELECT v.tags FROM user_videos uv JOIN videos v ON uv.video_id = v.id WHERE uv.user_id = :user_id AND uv.watched = 1 AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' """), {"user_id": user_id}, ).mappings().all() if not rows: return tag_counts: dict[str, int] = {} for row in rows: try: tags = json.loads(row["tags"]) seen = set() for tag in tags: if isinstance(tag, str): t = tag.lower().strip() if 3 <= len(t) <= 40 and t not in seen: tag_counts[t] = tag_counts.get(t, 0) + 1 seen.add(t) except (json.JSONDecodeError, TypeError): pass # Only use tags that appear across 3+ distinct watched videos qualified = {t: c for t, c in tag_counts.items() if c >= 3} if not qualified: return followed_yt_ids = set(db.execute( text(""" SELECT c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' """), {"user_id": user_id}, ).scalars().all()) top_tags = [t for t, _ in sorted(qualified.items(), key=lambda x: -x[1])[:10]] neg_tags = frozenset( r["tag"] for r in db.execute( text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"), {"user_id": user_id}, ).mappings().all() ) _search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=3.0, source="watched", neg_affinity_tags=neg_tags) def _build_user_tag_profile(db: Session, user_id: int) -> dict[str, float]: """Return tag affinity dict (positive = liked, negative = disliked/dismissed).""" rows = db.execute( text("SELECT tag, score FROM user_tag_affinity WHERE user_id = :user_id"), {"user_id": user_id}, ).mappings().all() return {row["tag"]: row["score"] for row in rows} def _tag_relevance_score(tag_profile: dict[str, float], tags_json: str | None) -> float: """Score a channel's tags against user affinity — positive means relevant, negative means disliked.""" if not tag_profile or not tags_json: return 0.0 try: tags = json.loads(tags_json) except (json.JSONDecodeError, TypeError): return 0.0 score = 0.0 for tag in tags: if isinstance(tag, str): t = tag.lower().strip() score += tag_profile.get(t, 0.0) return max(-100.0, min(score, 50.0)) def update_trending_signal(db: Session, user_id: int, regions: list[str]): """Fetch trending videos per region and score them by tag overlap with user interests.""" if not regions: return tag_profile = _build_user_tag_profile(db, user_id) followed_yt_ids = set(db.execute( text(""" SELECT c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' """), {"user_id": user_id}, ).scalars().all()) dismissed_channel_ids = set(db.execute( text(""" SELECT channel_id FROM user_channels WHERE user_id = :user_id AND status = 'dismissed' """), {"user_id": user_id}, ).scalars().all()) discovered: dict[str, dict] = {} for region in regions: try: videos = ytdlp.fetch_trending(region=region, max_results=50) for video in videos: ch = video.get("channel", {}) yt_id = ch.get("youtube_channel_id") name = (ch.get("name") or "").strip() if not yt_id or not name or yt_id in followed_yt_ids: continue if yt_id not in discovered: discovered[yt_id] = {"name": name, "count": 0, "regions": set(), "previews": []} discovered[yt_id]["count"] += 1 discovered[yt_id]["regions"].add(region) previews = discovered[yt_id]["previews"] if len(previews) < 3 and video.get("thumbnail_url") and video.get("title"): previews.append({ "thumbnail_url": video["thumbnail_url"], "title": video["title"], }) except Exception: continue if not discovered: return needs_indexing: list[int] = [] for yt_id, info in discovered.items(): channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first() is_new = channel is None if not channel: channel = Channel( youtube_channel_id=yt_id, name=info["name"], description="", thumbnail_url=None, ) db.add(channel) db.flush() if channel.id in dismissed_channel_ids: continue uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first() if uc and uc.status in ("followed", "dismissed"): continue # Cap base_score so a viral trending channel can't dominate the whole queue. # count × 4.0 × regions can reach 300+ without this cap. base_score = min(float(info["count"]) * 4.0 * len(info["regions"]), 18.0) # Tag relevance: positive for liked content, negative for dismissed/disliked. # tag_profile comes from user_tag_affinity which tracks both signals. tag_boost = 0.0 if not is_new and channel.crawled_at: tag_rows = db.execute( text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"), {"cid": channel.id}, ).scalars().all() for tags_json in tag_rows: tag_boost += _tag_relevance_score(tag_profile, tags_json) final_score = min(base_score + tag_boost, 25.0) if final_score <= 0: continue preview_json = json.dumps(info["previews"]) if info["previews"] else None _add_to_discovery(db, user_id, channel.id, score=final_score, source="trending", preview_json=preview_json) if is_new or not channel.crawled_at: needs_indexing.append(channel.id) db.commit() def update_graph_signal(db: Session, user_id: int): """Discover channels featured on followed channels' /channels tab. Channels that creators explicitly recommend are high-signal — they're curated by someone whose taste you already follow. Samples up to 12 followed channels per run and fetches their featured channels list in parallel. """ followed_rows = db.execute( text(""" SELECT c.youtube_channel_id, c.id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' AND c.youtube_channel_id IS NOT NULL """), {"user_id": user_id}, ).mappings().all() if not followed_rows: return followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows} dismissed_ids = set(db.execute( text("SELECT channel_id FROM user_channels WHERE user_id = :user_id AND status = 'dismissed'"), {"user_id": user_id}, ).scalars().all()) sample = random.sample(list(followed_rows), min(6, len(followed_rows))) featured_map: dict[str, list[str]] = {} for row in sample: try: featured_map[row["youtube_channel_id"]] = ytdlp.fetch_featured_channels(row["youtube_channel_id"]) except Exception: featured_map[row["youtube_channel_id"]] = [] needs_indexing: list[int] = [] for source_yt_id, channel_ids in featured_map.items(): for yt_id in channel_ids: if yt_id in followed_yt_ids: continue channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first() is_new = channel is None if not channel: channel = Channel(youtube_channel_id=yt_id, name="", description="", thumbnail_url=None) db.add(channel) db.flush() if channel.id in dismissed_ids: continue uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first() if uc and uc.status in ("followed", "dismissed"): continue _add_to_discovery(db, user_id, channel.id, score=8.0, source="graph") if is_new or not channel.crawled_at: needs_indexing.append(channel.id) db.commit() def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None): if regions is None: regions = ["US", "SE"] # Expire unseen entries older than 14 days so stale high-score channels # don't block fresh results forever. db.execute( text(""" DELETE FROM discovery_queue WHERE user_id = :user_id AND seen = 0 AND created_at <= datetime('now', '-14 days') """), {"user_id": user_id}, ) db.commit() crawl_by_search(db, user_id) # ~10 yt-dlp calls update_community_signal(db, user_id) # no yt-dlp update_category_clusters(db, user_id) # no yt-dlp update_liked_signal(db, user_id) # ~4 yt-dlp calls # update_watch_signal skipped — tags already included in crawl_by_search update_trending_signal(db, user_id, regions[:1]) # 1 yt-dlp call (first region only) update_graph_signal(db, user_id) # ~6 yt-dlp calls # --------------------------------------------------------------------------- # Queue-based gradual discovery — each yt-dlp call is its own task, shuffled # so call types are mixed, with 30-90 s gaps between them. # --------------------------------------------------------------------------- def _get_followed_yt_ids(db: Session, user_id: int) -> set[str]: return set(db.execute( text(""" SELECT c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :uid AND uc.status = 'followed' """), {"uid": user_id}, ).scalars().all()) def _get_neg_tags(db: Session, user_id: int) -> frozenset[str]: return frozenset(db.execute( text("SELECT tag FROM user_tag_affinity WHERE user_id = :uid AND score < -2"), {"uid": user_id}, ).scalars().all()) def _stamp_last_run(user_id: int): from ..database import SessionLocal from sqlalchemy import text as _text db = SessionLocal() try: db.execute( _text("UPDATE user_settings SET last_discovery_run = :now WHERE user_id = :uid"), {"now": datetime.utcnow(), "uid": user_id}, ) db.commit() except Exception: db.rollback() finally: db.close() def _do_task_search(user_id: int, query: str, source: str, score_multiplier: float): from ..database import SessionLocal db = SessionLocal() try: followed_yt_ids = _get_followed_yt_ids(db, user_id) neg_tags = _get_neg_tags(db, user_id) _search_and_store(db, user_id, [query], followed_yt_ids, score_multiplier, source, neg_tags) finally: db.close() def _do_task_trending(user_id: int, region: str): from ..database import SessionLocal db = SessionLocal() try: update_trending_signal(db, user_id, [region]) finally: db.close() def _fetch_graph_for_channel(db: Session, user_id: int, source_yt_id: str): """Fetch featured channels for one followed channel and add to discovery queue.""" followed_yt_ids = _get_followed_yt_ids(db, user_id) dismissed_ids = set(db.execute( text("SELECT channel_id FROM user_channels WHERE user_id = :uid AND status = 'dismissed'"), {"uid": user_id}, ).scalars().all()) try: featured = ytdlp.fetch_featured_channels(source_yt_id) except Exception: return needs_indexing: list[int] = [] for yt_id in featured: if yt_id in followed_yt_ids: continue channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first() is_new = channel is None if not channel: channel = Channel(youtube_channel_id=yt_id, name="", description="", thumbnail_url=None) db.add(channel) db.flush() if channel.id in dismissed_ids: continue uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first() if uc and uc.status in ("followed", "dismissed"): continue _add_to_discovery(db, user_id, channel.id, score=8.0, source="graph") if is_new or not channel.crawled_at: needs_indexing.append(channel.id) db.commit() for channel_id in needs_indexing[:2]: _task_queue.put((user_id, lambda cid=channel_id: _do_task_index_channel(user_id, cid))) def _do_task_graph(user_id: int, source_yt_id: str): from ..database import SessionLocal db = SessionLocal() try: _fetch_graph_for_channel(db, user_id, source_yt_id) finally: db.close() def _do_task_index_channel(user_id: int, channel_id: int): """Index one newly-discovered channel (one yt-dlp call). Queued as a separate worker task so the 30-90 s gap applies rather than bursting inline.""" from ..database import SessionLocal db = SessionLocal() try: channel = db.query(Channel).filter_by(id=channel_id).first() if channel: _fetch_and_index_channel(db, channel) finally: db.close() def _worker_loop(): while True: try: user_id, task = _task_queue.get(timeout=10) except _queue.Empty: continue try: task() except Exception: pass with _progress_lock: p = _progress.get(user_id) if p: p["done"] = min(p["done"] + 1, p["total"]) if p["done"] >= p["total"] and p["running"]: p["running"] = False _threading.Thread(target=_stamp_last_run, args=(user_id,), daemon=True).start() _task_queue.task_done() # Polite gap — only sleep if more tasks are waiting if not _task_queue.empty(): _time.sleep(random.uniform(30, 90)) def start_discovery_worker(): """Start the singleton background worker thread (idempotent).""" global _worker_started with _worker_lock: if not _worker_started: _threading.Thread(target=_worker_loop, daemon=True, name="discovery-worker").start() _worker_started = True def get_discovery_progress(user_id: int) -> dict | None: with _progress_lock: p = _progress.get(user_id) return dict(p) if p is not None else None def _build_search_task_args(db: Session, user_id: int) -> list[tuple[str, str, float]]: """Compute all search/liked query strings without executing any yt-dlp calls.""" result: list[tuple[str, str, float]] = [] followed_rows = db.execute( text(""" SELECT c.name, c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' """), {"user_id": user_id}, ).mappings().all() followed_names = [row["name"] for row in followed_rows if row["name"]] tag_rows = db.execute( text(""" SELECT tags FROM ( SELECT v.tags FROM videos v JOIN user_channels uc ON v.channel_id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' LIMIT 300 ) UNION ALL SELECT tags FROM ( SELECT v.tags FROM user_videos uv JOIN videos v ON uv.video_id = v.id WHERE uv.user_id = :user_id AND uv.liked = 1 AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' LIMIT 100 ) """), {"user_id": user_id}, ).mappings().all() tag_counts: dict[str, int] = {} liked_tag_counts: dict[str, int] = {} for row in tag_rows: try: for tag in json.loads(row["tags"]): if isinstance(tag, str): t = tag.lower().strip() if 3 <= len(t) <= 40: tag_counts[t] = tag_counts.get(t, 0) + 1 except (json.JSONDecodeError, TypeError): continue cat_rows = db.execute( text(""" SELECT v.category, COUNT(*) AS cnt FROM videos v JOIN user_channels uc ON v.channel_id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' AND v.category IS NOT NULL GROUP BY v.category ORDER BY cnt DESC LIMIT 5 """), {"user_id": user_id}, ).mappings().all() top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:5]] top_cats = [r["category"] for r in cat_rows] sampled_names = random.sample(followed_names, min(4, len(followed_names))) if followed_names else [] serendipity = [f"best {top_cats[0]} channels"] if top_cats else [] search_queries = list(dict.fromkeys(top_tags + sampled_names + serendipity + top_cats[:2]))[:10] for q in search_queries: result.append((q, "search", 5.0)) # Liked signal queries liked_rows = db.execute( text(""" SELECT v.tags FROM user_videos uv JOIN videos v ON uv.video_id = v.id WHERE uv.user_id = :user_id AND uv.liked = 1 AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' """), {"user_id": user_id}, ).mappings().all() for row in liked_rows: try: for tag in json.loads(row["tags"]): if isinstance(tag, str): t = tag.lower().strip() if 3 <= len(t) <= 40: liked_tag_counts[t] = liked_tag_counts.get(t, 0) + 2 except (json.JSONDecodeError, TypeError): pass for q in [t for t, _ in sorted(liked_tag_counts.items(), key=lambda x: -x[1])[:4]]: result.append((q, "liked", 10.0)) return result def _sample_graph_yt_ids(db: Session, user_id: int) -> list[str]: rows = db.execute( text(""" SELECT c.youtube_channel_id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :user_id AND uc.status = 'followed' AND c.youtube_channel_id IS NOT NULL """), {"user_id": user_id}, ).scalars().all() if not rows: return [] return random.sample(list(rows), min(6, len(rows))) def schedule_discovery(user_id: int, regions: list[str] | None = None): """Schedule a full discovery sweep, spreading yt-dlp calls 30-90 s apart with call types shuffled so searches, graph fetches, and trending are mixed.""" if regions is None: regions = ["US", "SE"] from ..database import SessionLocal # Fast signals (pure SQL, no yt-dlp) run synchronously right now db = SessionLocal() try: db.execute( text(""" DELETE FROM discovery_queue WHERE user_id = :uid AND seen = 0 AND created_at <= datetime('now', '-14 days') """), {"uid": user_id}, ) db.commit() update_community_signal(db, user_id) update_category_clusters(db, user_id) search_args = _build_search_task_args(db, user_id) graph_yt_ids = _sample_graph_yt_ids(db, user_id) finally: db.close() # Build one task per yt-dlp call, then shuffle to mix call types tasks: list[tuple[int, object]] = [] for query, source, mult in search_args: tasks.append((user_id, lambda q=query, s=source, m=mult: _do_task_search(user_id, q, s, m))) for region in regions[:1]: tasks.append((user_id, lambda r=region: _do_task_trending(user_id, r))) for yt_id in graph_yt_ids: tasks.append((user_id, lambda y=yt_id: _do_task_graph(user_id, y))) random.shuffle(tasks) with _progress_lock: _progress[user_id] = {"total": len(tasks), "done": 0, "running": bool(tasks)} for item in tasks: _task_queue.put(item) if not tasks: _stamp_last_run(user_id)