Each search/graph/trending task was calling _fetch_and_index_channel inline for up to 10-15 newly discovered channels, each making up to 4 yt-dlp calls (1 channel metadata + 3 individual video fetches for dateless entries). This bypassed the 30-90 s worker gap, producing bursts of 40-60 calls in rapid succession and hammering YouTube.

Changes:
- _fetch_and_index_channel: removed the dateless-video individual fetch loop — one call per channel; videos without published_at are simply skipped at discovery time
- _search_and_store and _fetch_graph_for_channel: queue channel indexing as separate worker tasks (up to 3 and 2 per call, respectively) so the 30-90 s gap applies between every yt-dlp call, including channel indexing
- update_trending_signal and update_graph_signal (old sync path): removed inline _fetch_and_index_channel loops (15 and 10 channels)
- _discovery_task in channels.py: replaced run_full_discovery (old synchronous path) with schedule_discovery so sync-all and follow-by-url go through the queue system

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
986 lines
35 KiB
Python
986 lines
35 KiB
Python
"""Discovery engine — search-based crawl, trending, community signal, category clustering."""
|
||
import json
|
||
import queue as _queue
|
||
import random
|
||
import threading as _threading
|
||
import time as _time
|
||
from datetime import datetime
|
||
from sqlalchemy.orm import Session
|
||
from sqlalchemy import text
|
||
|
||
from ..models import Channel, UserChannel, DiscoveryQueue, Video
|
||
from . import ytdlp
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Background task queue — spaces yt-dlp calls 30-90 s apart and shuffles
|
||
# call types so we don't fire 10 searches in a row.
|
||
# ---------------------------------------------------------------------------
|
||
_task_queue: _queue.Queue = _queue.Queue()
|
||
_progress: dict[int, dict] = {} # user_id -> {total, done, running}
|
||
_progress_lock = _threading.Lock()
|
||
_worker_started = False
|
||
_worker_lock = _threading.Lock()
|
||
|
||
|
||
def _fetch_and_index_channel(db: Session, channel: Channel):
    """Refresh a discovered channel's metadata and store its recent videos.

    Makes exactly one ``ytdlp.fetch_channel_metadata`` call (up to 10 videos,
    polite mode).  Every non-empty metadata value whose key is an existing
    attribute of ``channel`` is copied onto it, ``crawled_at`` is stamped,
    and each returned video that has both an id and a ``published_at`` and
    is not stored yet is inserted.  Videos without a publish date are
    skipped rather than fetched individually.

    Best effort: any failure rolls the session back and is logged; nothing
    is raised to the caller.
    """
    import logging

    try:
        result = ytdlp.fetch_channel_metadata(
            channel.youtube_channel_id, max_videos=10, polite=True,
        )
        if not result:
            return

        for key, value in result.get("channel", {}).items():
            if hasattr(channel, key) and value is not None and value != "":
                setattr(channel, key, value)
        channel.crawled_at = datetime.utcnow()

        # Keep only usable entries, de-duplicated by video id so a repeated
        # entry inside one response cannot be inserted twice.
        fresh: dict[str, dict] = {}
        for vdata in result.get("videos", []):
            yt_id = vdata.get("youtube_video_id")
            if yt_id and vdata.get("published_at") and yt_id not in fresh:
                fresh[yt_id] = vdata

        if fresh:
            # One lookup for all candidate ids instead of one query per video.
            known = {
                row[0]
                for row in db.query(Video.youtube_video_id)
                .filter(Video.youtube_video_id.in_(list(fresh)))
                .all()
            }
            for yt_id, vdata in fresh.items():
                if yt_id in known:
                    continue
                db.add(Video(
                    youtube_video_id=yt_id,
                    channel_id=channel.id,
                    title=vdata.get("title", ""),
                    description=vdata.get("description"),
                    thumbnail_url=vdata.get("thumbnail_url"),
                    duration_seconds=vdata.get("duration_seconds"),
                    published_at=vdata.get("published_at"),
                    tags=vdata.get("tags"),
                    category=vdata.get("category"),
                ))
        db.commit()
    except Exception:
        db.rollback()
        # Previously swallowed silently; keep the best-effort contract but
        # leave a trace so failed indexing runs can be diagnosed.
        logging.getLogger(__name__).exception(
            "Indexing channel %s failed",
            getattr(channel, "youtube_channel_id", None),
        )
|
||
|
||
|
||
def _upsert_channel(db: Session, channel_data: dict) -> Channel | None:
|
||
yt_id = channel_data.get("youtube_channel_id")
|
||
if not yt_id:
|
||
return None
|
||
channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
|
||
if not channel:
|
||
channel = Channel(**channel_data)
|
||
db.add(channel)
|
||
db.flush()
|
||
return channel
|
||
|
||
|
||
# Upper bound for any single discovery-queue score.
_MAX_DISCOVERY_SCORE = 50.0


def _add_to_discovery(
    db: Session, user_id: int, channel_id: int, score: float, source: str,
    preview_json: str | None = None,
):
    """Insert or reinforce a (user, channel) entry in the discovery queue.

    The incoming score is clamped to ``_MAX_DISCOVERY_SCORE``.  When the pair
    is already queued, half of the clamped score is added to the stored score
    (again clamped) and ``preview_json`` is filled in only if the entry has
    none; the stored ``source`` is left as it was.  Otherwise a new
    ``DiscoveryQueue`` row is added.  The session is not committed here.
    """
    capped = min(score, _MAX_DISCOVERY_SCORE)
    entry = (
        db.query(DiscoveryQueue)
        .filter_by(user_id=user_id, channel_id=channel_id)
        .first()
    )
    if not entry:
        db.add(DiscoveryQueue(
            user_id=user_id,
            channel_id=channel_id,
            score=capped,
            source=source,
            preview_json=preview_json,
        ))
        return

    # Signals accumulate across sources, but the cap keeps any one channel
    # from dominating indefinitely.
    entry.score = min(entry.score + capped * 0.5, _MAX_DISCOVERY_SCORE)
    if preview_json and not entry.preview_json:
        entry.preview_json = preview_json
|
||
|
||
|
||
def _search_and_store(
|
||
db: Session, user_id: int, queries: list[str],
|
||
followed_yt_ids: set[str], score_multiplier: float, source: str,
|
||
neg_affinity_tags: frozenset[str] = frozenset(),
|
||
):
|
||
"""Run YouTube searches for the given queries and add results to discovery."""
|
||
discovered: dict[str, dict] = {}
|
||
|
||
for query in queries:
|
||
try:
|
||
results = ytdlp.search_youtube(query, max_results=40, polite=True)
|
||
except Exception:
|
||
results = []
|
||
for video in results:
|
||
ch = video.get("channel", {})
|
||
yt_id = ch.get("youtube_channel_id")
|
||
name = (ch.get("name") or "").strip()
|
||
if yt_id and name and yt_id not in followed_yt_ids:
|
||
if yt_id not in discovered:
|
||
discovered[yt_id] = {"name": name, "count": 0, "previews": []}
|
||
discovered[yt_id]["count"] += 1
|
||
previews = discovered[yt_id]["previews"]
|
||
if len(previews) < 3 and video.get("thumbnail_url") and video.get("title"):
|
||
previews.append({
|
||
"thumbnail_url": video["thumbnail_url"],
|
||
"title": video["title"],
|
||
})
|
||
|
||
if not discovered:
|
||
return
|
||
|
||
candidates = sorted(discovered.items(), key=lambda x: -x[1]["count"])
|
||
|
||
needs_indexing: list[int] = []
|
||
for yt_id, info in candidates:
|
||
channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
|
||
is_new = channel is None
|
||
if not channel:
|
||
channel = Channel(
|
||
youtube_channel_id=yt_id,
|
||
name=info["name"],
|
||
description="",
|
||
thumbnail_url=None,
|
||
)
|
||
db.add(channel)
|
||
db.flush()
|
||
|
||
uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
|
||
if uc and uc.status in ("followed", "dismissed"):
|
||
continue
|
||
|
||
# Skip channels whose indexed videos heavily overlap with negatively-rated tags
|
||
if neg_affinity_tags and not is_new and channel.crawled_at:
|
||
neg_hit = 0
|
||
vtags = db.execute(
|
||
text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"),
|
||
{"cid": channel.id},
|
||
).scalars().all()
|
||
for tags_json in vtags:
|
||
try:
|
||
for tag in json.loads(tags_json or "[]"):
|
||
if isinstance(tag, str) and tag.lower().strip() in neg_affinity_tags:
|
||
neg_hit += 1
|
||
except (json.JSONDecodeError, TypeError):
|
||
pass
|
||
if neg_hit >= 3:
|
||
continue
|
||
|
||
preview_json = json.dumps(info["previews"]) if info["previews"] else None
|
||
_add_to_discovery(
|
||
db, user_id, channel.id,
|
||
score=float(info["count"]) * score_multiplier,
|
||
source=source,
|
||
preview_json=preview_json,
|
||
)
|
||
if is_new or not channel.crawled_at:
|
||
needs_indexing.append(channel.id)
|
||
|
||
db.commit()
|
||
|
||
# Queue channel indexing as separate worker tasks (30-90 s gaps apply).
|
||
for channel_id in needs_indexing[:3]:
|
||
_task_queue.put((user_id, lambda cid=channel_id: _do_task_index_channel(user_id, cid)))
|
||
|
||
|
||
def crawl_by_search(db: Session, user_id: int):
    """Discover channels by searching YouTube using tags, categories, and channel names.

    Builds at most 10 de-duplicated queries from: the 5 most frequent tags on
    videos of followed channels and liked videos, up to 4 randomly sampled
    followed-channel names, one "best <top category> channels" query, and the
    top 2 categories.  The queries are then run through ``_search_and_store``
    with ``score_multiplier=5.0`` and ``source="search"``.  Returns ``None``;
    does nothing when no query can be built.
    """
    # All followed channels (names + yt_ids)
    followed_rows = db.execute(
        text("""
            SELECT c.name, c.youtube_channel_id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
        """),
        {"user_id": user_id},
    ).mappings().all()

    followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows}
    followed_names = [row["name"] for row in followed_rows if row["name"]]

    # Top tags from followed channels' indexed videos + liked videos.
    # SQLite requires LIMIT inside a subquery when used with UNION ALL.
    tag_rows = db.execute(
        text("""
            SELECT tags FROM (
                SELECT v.tags
                FROM videos v
                JOIN user_channels uc ON v.channel_id = uc.channel_id
                WHERE uc.user_id = :user_id AND uc.status = 'followed'
                  AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
                LIMIT 300
            )
            UNION ALL
            SELECT tags FROM (
                SELECT v.tags
                FROM user_videos uv
                JOIN videos v ON uv.video_id = v.id
                WHERE uv.user_id = :user_id AND uv.liked = 1
                  AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
                LIMIT 100
            )
        """),
        {"user_id": user_id},
    ).mappings().all()

    tag_counts: dict[str, int] = {}
    for row in tag_rows:
        try:
            for tag in json.loads(row["tags"]):
                if isinstance(tag, str):
                    t = tag.lower().strip()
                    if 3 <= len(t) <= 40:
                        tag_counts[t] = tag_counts.get(t, 0) + 1
        except (json.JSONDecodeError, TypeError):
            # Malformed tag payload: skip the rest of this row.
            continue

    # Top categories as fallback
    cat_rows = db.execute(
        text("""
            SELECT v.category, COUNT(*) AS cnt
            FROM videos v
            JOIN user_channels uc ON v.channel_id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
              AND v.category IS NOT NULL
            GROUP BY v.category
            ORDER BY cnt DESC
            LIMIT 5
        """),
        {"user_id": user_id},
    ).mappings().all()

    # Keep the query count low — each query is a separate yt-dlp subprocess
    # (its own HTTP session). Too many back-to-back sessions look like a bot.
    top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:5]]
    top_cats = [r["category"] for r in cat_rows]

    # A few randomly-sampled channel names — diversifies results each run
    sampled_names: list[str] = []
    if followed_names:
        sampled_names = random.sample(followed_names, min(4, len(followed_names)))

    # One serendipity query to surface content outside the user's direct tag space
    serendipity = [f"best {top_cats[0]} channels"] if top_cats else []

    # Total target: ≤10 queries; dict.fromkeys de-duplicates while keeping order.
    queries = list(dict.fromkeys(top_tags + sampled_names + serendipity + top_cats[:2]))[:10]
    if not queries:
        return

    # Reuse the shared helper rather than repeating the affinity query inline.
    neg_tags = _get_neg_tags(db, user_id)
    _search_and_store(db, user_id, queries, followed_yt_ids, score_multiplier=5.0, source="search",
                      neg_affinity_tags=neg_tags)
|
||
|
||
|
||
def update_community_signal(db: Session, user_id: int):
    """Queue channels followed by other users, weighted by how many follow them.

    Considers the 100 most-followed channels for which this user has no
    ``user_channels`` row of any status, scores each as
    ``follower_count * 5`` with source ``"community"``, and commits.
    """
    popular = db.execute(
        text("""
            SELECT uc.channel_id, COUNT(DISTINCT uc.user_id) AS follower_count
            FROM user_channels uc
            WHERE uc.user_id != :user_id
              AND uc.status = 'followed'
              AND uc.channel_id NOT IN (
                  SELECT channel_id FROM user_channels
                  WHERE user_id = :user_id
              )
            GROUP BY uc.channel_id
            ORDER BY follower_count DESC
            LIMIT 100
        """),
        {"user_id": user_id},
    ).mappings().all()

    for entry in popular:
        weight = float(entry["follower_count"]) * 5
        _add_to_discovery(db, user_id, entry["channel_id"], score=weight, source="community")
    db.commit()
|
||
|
||
|
||
def update_category_clusters(db: Session, user_id: int):
    """Queue channels that publish in the categories the user watches most.

    Takes the user's five most-watched video categories, collects up to 50
    channels per category that the user has no ``user_channels`` row for,
    and adds each distinct channel with a flat score of 5.0 and source
    ``"category"``.  Commits when done; returns early without committing if
    the user has no categorised watched videos.
    """
    watched = db.execute(
        text("""
            SELECT v.category, COUNT(*) AS watch_count
            FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.watched = 1 AND v.category IS NOT NULL
            GROUP BY v.category
            ORDER BY watch_count DESC
            LIMIT 5
        """),
        {"user_id": user_id},
    ).mappings().all()

    categories = [entry["category"] for entry in watched]
    if len(categories) == 0:
        return

    # One bound-parameter query per category keeps the category value out of
    # the SQL text (no string-built IN list).
    per_category = text("""
        SELECT DISTINCT v.channel_id
        FROM videos v
        WHERE v.category = :cat
          AND v.channel_id NOT IN (
              SELECT channel_id FROM user_channels WHERE user_id = :user_id
          )
        LIMIT 50
    """)
    found_ids: set[int] = set()
    for category in categories:
        matches = db.execute(
            per_category, {"cat": category, "user_id": user_id},
        ).mappings().all()
        found_ids.update(match["channel_id"] for match in matches)

    for cid in found_ids:
        _add_to_discovery(db, user_id, cid, score=5.0, source="category")
    db.commit()
|
||
|
||
|
||
def update_liked_signal(db: Session, user_id: int):
    """Search YouTube for channels related to topics extracted from liked videos.

    Tallies normalised tags (lower-cased, stripped, 3-40 characters) across
    the user's liked videos, takes the four highest-scoring tags as search
    queries and stores the results through ``_search_and_store`` with
    ``score_multiplier=10.0`` and ``source="liked"``.  Returns ``None``; does
    nothing when there are no liked videos with usable tags.
    """
    liked_rows = db.execute(
        text("""
            SELECT v.tags
            FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.liked = 1
              AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
        """),
        {"user_id": user_id},
    ).mappings().all()

    if not liked_rows:
        return

    tag_counts: dict[str, int] = {}
    for row in liked_rows:
        try:
            for tag in json.loads(row["tags"]):
                if isinstance(tag, str):
                    t = tag.lower().strip()
                    if 3 <= len(t) <= 40:
                        # Each occurrence adds 2; only the relative order matters below.
                        tag_counts[t] = tag_counts.get(t, 0) + 2
        except (json.JSONDecodeError, TypeError):
            # Malformed tag payload: ignore the remainder of this row.
            pass

    if not tag_counts:
        return

    # Shared helpers replace the followed-id / negative-tag SQL that used to
    # be repeated inline here.
    followed_yt_ids = _get_followed_yt_ids(db, user_id)
    top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:4]]
    neg_tags = _get_neg_tags(db, user_id)
    _search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=10.0, source="liked",
                      neg_affinity_tags=neg_tags)
|
||
|
||
|
||
def update_watch_signal(db: Session, user_id: int):
    """Discover channels from watched video topics, dampened so a single view has little effect.

    A tag needs to appear in at least 3 distinct watched videos before it influences
    discovery. Each qualifying tag contributes a modest score (×3 vs liked ×10),
    so watching a single Tokyo video won't flood recommendations with Tokyo content.

    Up to 10 qualifying tags are used as search queries and stored through
    ``_search_and_store`` with ``source="watched"``.  Returns ``None``.
    """
    rows = db.execute(
        text("""
            SELECT v.tags
            FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.watched = 1
              AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
        """),
        {"user_id": user_id},
    ).mappings().all()

    if not rows:
        return

    tag_counts: dict[str, int] = {}
    for row in rows:
        try:
            tags = json.loads(row["tags"])
            seen = set()  # count each tag at most once per video
            for tag in tags:
                if isinstance(tag, str):
                    t = tag.lower().strip()
                    if 3 <= len(t) <= 40 and t not in seen:
                        tag_counts[t] = tag_counts.get(t, 0) + 1
                        seen.add(t)
        except (json.JSONDecodeError, TypeError):
            # Malformed tag payload: ignore the remainder of this row.
            pass

    # Only use tags that appear across 3+ distinct watched videos
    qualified = {t: c for t, c in tag_counts.items() if c >= 3}
    if not qualified:
        return

    # Shared helpers replace the followed-id / negative-tag SQL that used to
    # be repeated inline here.
    followed_yt_ids = _get_followed_yt_ids(db, user_id)
    top_tags = [t for t, _ in sorted(qualified.items(), key=lambda x: -x[1])[:10]]
    neg_tags = _get_neg_tags(db, user_id)
    _search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=3.0, source="watched",
                      neg_affinity_tags=neg_tags)
|
||
|
||
|
||
def _build_user_tag_profile(db: Session, user_id: int) -> dict[str, float]:
    """Load the user's rows from ``user_tag_affinity`` as a ``{tag: score}`` dict.

    Positive scores mark liked topics, negative ones disliked/dismissed topics.
    """
    affinity = db.execute(
        text("SELECT tag, score FROM user_tag_affinity WHERE user_id = :user_id"),
        {"user_id": user_id},
    ).mappings().all()

    profile = {}
    for entry in affinity:
        profile[entry["tag"]] = entry["score"]
    return profile
|
||
|
||
|
||
def _tag_relevance_score(tag_profile: dict[str, float], tags_json: str | None) -> float:
|
||
"""Score a channel's tags against user affinity — positive means relevant, negative means disliked."""
|
||
if not tag_profile or not tags_json:
|
||
return 0.0
|
||
try:
|
||
tags = json.loads(tags_json)
|
||
except (json.JSONDecodeError, TypeError):
|
||
return 0.0
|
||
score = 0.0
|
||
for tag in tags:
|
||
if isinstance(tag, str):
|
||
t = tag.lower().strip()
|
||
score += tag_profile.get(t, 0.0)
|
||
return max(-100.0, min(score, 50.0))
|
||
|
||
|
||
def update_trending_signal(db: Session, user_id: int, regions: list[str]):
    """Fetch trending videos per region and score them by tag overlap with user interests.

    For every region one ``ytdlp.fetch_trending`` call is made (a region that
    fails part-way keeps what was tallied so far).  Channels the user does
    not follow are tallied by number of trending videos and regions seen.
    Each candidate gets ``min(count * 4 * regions, 18)`` as a base score plus
    a tag-affinity boost computed from up to 20 stored videos (only for
    channels that were already crawled); the total is capped at 25 and
    candidates scoring ``<= 0`` are dropped.  Missing ``Channel`` rows are
    created along the way, and the session is committed at the end.

    Returns ``None``; returns immediately when ``regions`` is empty.
    """
    if not regions:
        return

    tag_profile = _build_user_tag_profile(db, user_id)
    # Shared helper instead of repeating the followed-channel SQL inline.
    followed_yt_ids = _get_followed_yt_ids(db, user_id)

    dismissed_channel_ids = set(db.execute(
        text("""
            SELECT channel_id FROM user_channels
            WHERE user_id = :user_id AND status = 'dismissed'
        """),
        {"user_id": user_id},
    ).scalars().all())

    discovered: dict[str, dict] = {}
    for region in regions:
        try:
            videos = ytdlp.fetch_trending(region=region, max_results=50)
            for video in videos:
                ch = video.get("channel", {})
                yt_id = ch.get("youtube_channel_id")
                name = (ch.get("name") or "").strip()
                if not yt_id or not name or yt_id in followed_yt_ids:
                    continue
                entry = discovered.setdefault(
                    yt_id, {"name": name, "count": 0, "regions": set(), "previews": []},
                )
                entry["count"] += 1
                entry["regions"].add(region)
                previews = entry["previews"]
                if len(previews) < 3 and video.get("thumbnail_url") and video.get("title"):
                    previews.append({
                        "thumbnail_url": video["thumbnail_url"],
                        "title": video["title"],
                    })
        except Exception:
            # Best effort: move on to the next region.
            continue

    if not discovered:
        return

    for yt_id, info in discovered.items():
        channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
        is_new = channel is None
        if not channel:
            channel = Channel(
                youtube_channel_id=yt_id,
                name=info["name"],
                description="",
                thumbnail_url=None,
            )
            db.add(channel)
            db.flush()  # flushed so channel.id can be used below

        if channel.id in dismissed_channel_ids:
            continue

        uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
        if uc and uc.status in ("followed", "dismissed"):
            continue

        # Cap base_score so a viral trending channel can't dominate the whole queue.
        # count × 4.0 × regions can reach 300+ without this cap.
        base_score = min(float(info["count"]) * 4.0 * len(info["regions"]), 18.0)

        # Tag relevance: positive for liked content, negative for dismissed/disliked.
        # tag_profile comes from user_tag_affinity which tracks both signals.
        tag_boost = 0.0
        if not is_new and channel.crawled_at:
            tag_rows = db.execute(
                text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"),
                {"cid": channel.id},
            ).scalars().all()
            for tags_json in tag_rows:
                tag_boost += _tag_relevance_score(tag_profile, tags_json)

        final_score = min(base_score + tag_boost, 25.0)
        if final_score <= 0:
            continue

        preview_json = json.dumps(info["previews"]) if info["previews"] else None
        _add_to_discovery(db, user_id, channel.id, score=final_score, source="trending",
                          preview_json=preview_json)

    # The former `needs_indexing` list was collected but never consumed
    # here, so it has been dropped; this function queues no index tasks.
    db.commit()
|
||
|
||
|
||
def update_graph_signal(db: Session, user_id: int):
    """Discover channels featured on followed channels' /channels tab.

    Channels that creators explicitly recommend are high-signal — they're
    curated by someone whose taste you already follow. Samples up to 6
    followed channels per run and fetches their featured-channel lists one
    after another (one ``ytdlp.fetch_featured_channels`` call each; a failed
    fetch counts as an empty list).

    Every featured channel the user neither follows nor dismissed is added
    to the discovery queue with score 8.0 and source ``"graph"``; missing
    ``Channel`` rows are created with an empty name.  Commits at the end.
    Returns ``None``; does nothing when the user follows no channels.
    """
    followed_rows = db.execute(
        text("""
            SELECT c.youtube_channel_id, c.id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
              AND c.youtube_channel_id IS NOT NULL
        """),
        {"user_id": user_id},
    ).mappings().all()

    if not followed_rows:
        return

    followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows}

    dismissed_ids = set(db.execute(
        text("SELECT channel_id FROM user_channels WHERE user_id = :user_id AND status = 'dismissed'"),
        {"user_id": user_id},
    ).scalars().all())

    sample = random.sample(list(followed_rows), min(6, len(followed_rows)))

    featured_map: dict[str, list[str]] = {}
    for row in sample:
        source_yt_id = row["youtube_channel_id"]
        try:
            featured_map[source_yt_id] = ytdlp.fetch_featured_channels(source_yt_id)
        except Exception:
            # Best effort: treat a failed fetch as "no featured channels".
            featured_map[source_yt_id] = []

    for featured_ids in featured_map.values():
        for yt_id in featured_ids:
            if yt_id in followed_yt_ids:
                continue
            channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
            if not channel:
                channel = Channel(youtube_channel_id=yt_id, name="", description="", thumbnail_url=None)
                db.add(channel)
                db.flush()  # flushed so channel.id can be used below
            if channel.id in dismissed_ids:
                continue
            uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
            if uc and uc.status in ("followed", "dismissed"):
                continue
            _add_to_discovery(db, user_id, channel.id, score=8.0, source="graph")

    # The former `needs_indexing` / `is_new` bookkeeping was never consumed
    # here, so it has been dropped; this function queues no index tasks.
    db.commit()
|
||
|
||
|
||
def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None):
    """Run every discovery signal for one user synchronously, back to back.

    Unseen queue entries older than 14 days are purged first, then the
    search, community, category, liked, trending (first region only) and
    graph signals run in that order.  ``regions`` defaults to
    ``["US", "SE"]``.
    """
    region_list = ["US", "SE"] if regions is None else regions

    # Drop stale unseen entries (14+ days old) so old high scorers do not
    # crowd out fresh results indefinitely.
    purge = text("""
        DELETE FROM discovery_queue
        WHERE user_id = :user_id AND seen = 0
          AND created_at <= datetime('now', '-14 days')
    """)
    db.execute(purge, {"user_id": user_id})
    db.commit()

    # yt-dlp call estimates per step are noted on the right.
    crawl_by_search(db, user_id)                          # ~10 calls
    update_community_signal(db, user_id)                  # none
    update_category_clusters(db, user_id)                 # none
    update_liked_signal(db, user_id)                      # ~4 calls
    # The watch signal is intentionally not run: its tags are already
    # covered by crawl_by_search.
    update_trending_signal(db, user_id, region_list[:1])  # 1 call
    update_graph_signal(db, user_id)                      # ~6 calls
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Queue-based gradual discovery — each yt-dlp call is its own task, shuffled
|
||
# so call types are mixed, with 30-90 s gaps between them.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _get_followed_yt_ids(db: Session, user_id: int) -> set[str]:
    """Return the YouTube channel ids of every channel the user follows."""
    followed = text("""
        SELECT c.youtube_channel_id FROM channels c
        JOIN user_channels uc ON c.id = uc.channel_id
        WHERE uc.user_id = :uid AND uc.status = 'followed'
    """)
    yt_ids = db.execute(followed, {"uid": user_id}).scalars().all()
    return set(yt_ids)
|
||
|
||
|
||
def _get_neg_tags(db: Session, user_id: int) -> frozenset[str]:
    """Return the user's tags whose affinity score is below -2."""
    disliked = text("SELECT tag FROM user_tag_affinity WHERE user_id = :uid AND score < -2")
    tags = db.execute(disliked, {"uid": user_id}).scalars().all()
    return frozenset(tags)
|
||
|
||
|
||
def _stamp_last_run(user_id: int):
    """Record the current UTC time as the user's ``last_discovery_run``.

    Opens its own session; on any error the update is rolled back and the
    error is not propagated.  The session is always closed.
    """
    from ..database import SessionLocal

    session = SessionLocal()
    try:
        stamp = text("UPDATE user_settings SET last_discovery_run = :now WHERE user_id = :uid")
        session.execute(stamp, {"now": datetime.utcnow(), "uid": user_id})
        session.commit()
    except Exception:
        session.rollback()
    finally:
        session.close()
|
||
|
||
|
||
def _do_task_search(user_id: int, query: str, source: str, score_multiplier: float):
    """Worker task: run one search query for the user in a dedicated session."""
    from ..database import SessionLocal

    session = SessionLocal()
    try:
        followed = _get_followed_yt_ids(session, user_id)
        disliked = _get_neg_tags(session, user_id)
        _search_and_store(
            session, user_id, [query], followed, score_multiplier, source, disliked,
        )
    finally:
        session.close()
|
||
|
||
|
||
def _do_task_trending(user_id: int, region: str):
    """Worker task: refresh the trending signal for a single region."""
    from ..database import SessionLocal

    session = SessionLocal()
    try:
        update_trending_signal(session, user_id, [region])
    finally:
        session.close()
|
||
|
||
|
||
def _fetch_graph_for_channel(db: Session, user_id: int, source_yt_id: str):
    """Queue the channels featured by one followed channel.

    Makes a single ``ytdlp.fetch_featured_channels`` call; if it raises, the
    function returns without changes.  Featured channels the user neither
    follows nor dismissed are added to discovery (score 8.0, source
    ``"graph"``), creating ``Channel`` rows with an empty name where needed.
    After committing, up to two never-crawled channels are queued on
    ``_task_queue`` as separate index tasks.
    """
    followed = _get_followed_yt_ids(db, user_id)
    dismissed = set(db.execute(
        text("SELECT channel_id FROM user_channels WHERE user_id = :uid AND status = 'dismissed'"),
        {"uid": user_id},
    ).scalars().all())

    try:
        featured = ytdlp.fetch_featured_channels(source_yt_id)
    except Exception:
        return

    to_index: list[int] = []
    for featured_id in featured:
        if featured_id in followed:
            continue

        channel = db.query(Channel).filter_by(youtube_channel_id=featured_id).first()
        created = channel is None
        if not channel:
            channel = Channel(youtube_channel_id=featured_id, name="", description="", thumbnail_url=None)
            db.add(channel)
            db.flush()

        if channel.id in dismissed:
            continue
        link = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
        if link and link.status in ("followed", "dismissed"):
            continue

        _add_to_discovery(db, user_id, channel.id, score=8.0, source="graph")
        if created or not channel.crawled_at:
            to_index.append(channel.id)

    db.commit()

    # Indexing runs as its own worker tasks so the polite gap applies to it.
    for pending_id in to_index[:2]:
        _task_queue.put((user_id, lambda cid=pending_id: _do_task_index_channel(user_id, cid)))
|
||
|
||
|
||
def _do_task_graph(user_id: int, source_yt_id: str):
    """Worker task: process one followed channel's featured list in a dedicated session."""
    from ..database import SessionLocal

    session = SessionLocal()
    try:
        _fetch_graph_for_channel(session, user_id, source_yt_id)
    finally:
        session.close()
|
||
|
||
|
||
def _do_task_index_channel(user_id: int, channel_id: int):
    """Worker task: index a single newly-discovered channel (one yt-dlp call).

    Runs as its own queue entry so the worker's 30-90 s gap separates it from
    other calls instead of bursting inline.  A channel id that no longer
    exists is ignored.  ``user_id`` is accepted but not used in the body.
    """
    from ..database import SessionLocal

    session = SessionLocal()
    try:
        target = session.query(Channel).filter_by(id=channel_id).first()
        if not target:
            return
        _fetch_and_index_channel(session, target)
    finally:
        session.close()
|
||
|
||
|
||
def _worker_loop():
    """Run queued discovery tasks forever, one at a time, with polite gaps.

    Each queue item is a ``(user_id, callable)`` pair.  After a task runs
    (whether it succeeded or raised) the user's progress record, if any, is
    advanced: ``done`` is incremented but never beyond ``total``, and once
    it reaches ``total`` while the sweep is still marked running, the sweep
    is marked finished and ``_stamp_last_run`` is started on a daemon thread.

    A task that raises is logged and otherwise ignored so one failure cannot
    stop the worker.  When more tasks are waiting the loop sleeps a random
    30-90 s before taking the next one.  Never returns.
    """
    import logging

    log = logging.getLogger(__name__)
    while True:
        try:
            user_id, task = _task_queue.get(timeout=10)
        except _queue.Empty:
            continue

        try:
            task()
        except Exception:
            # Previously swallowed silently; keep going but leave a trace.
            log.exception("Discovery task for user %s failed", user_id)

        with _progress_lock:
            p = _progress.get(user_id)
            if p:
                # Clamp: follow-up tasks queued by other tasks are not part
                # of "total".
                p["done"] = min(p["done"] + 1, p["total"])
                if p["done"] >= p["total"] and p["running"]:
                    p["running"] = False
                    # Stamp on a separate thread so the DB write does not
                    # happen while the progress lock is held.
                    _threading.Thread(target=_stamp_last_run, args=(user_id,), daemon=True).start()

        _task_queue.task_done()

        # Polite gap — only sleep if more tasks are waiting
        if not _task_queue.empty():
            _time.sleep(random.uniform(30, 90))
|
||
|
||
|
||
def start_discovery_worker():
    """Launch the background worker thread once; later calls do nothing."""
    global _worker_started
    with _worker_lock:
        if _worker_started:
            return
        worker = _threading.Thread(target=_worker_loop, daemon=True, name="discovery-worker")
        worker.start()
        _worker_started = True
|
||
|
||
|
||
def get_discovery_progress(user_id: int) -> dict | None:
    """Return a copy of the user's progress record, or ``None`` if there is none."""
    with _progress_lock:
        snapshot = _progress.get(user_id)
        if snapshot is None:
            return None
        return dict(snapshot)
|
||
|
||
|
||
def _build_search_task_args(db: Session, user_id: int) -> list[tuple[str, str, float]]:
    """Build ``(query, source, score_multiplier)`` tuples without calling yt-dlp.

    First come up to 10 ``("...", "search", 5.0)`` entries drawn from the top
    5 tags, up to 4 randomly sampled followed-channel names, one
    "best <top category> channels" query and the top 2 categories
    (de-duplicated, order preserved).  Then up to 4 ``("...", "liked", 10.0)``
    entries for the most frequent tags on liked videos.
    """

    def tally(rows, weight: int) -> dict[str, int]:
        # Count normalised tags (lower-cased, stripped, 3-40 chars); a row
        # with an unusable payload is abandoned at the point of failure.
        counts: dict[str, int] = {}
        for entry in rows:
            try:
                for raw in json.loads(entry["tags"]):
                    if not isinstance(raw, str):
                        continue
                    norm = raw.lower().strip()
                    if 3 <= len(norm) <= 40:
                        counts[norm] = counts.get(norm, 0) + weight
            except (json.JSONDecodeError, TypeError):
                continue
        return counts

    def top(counts: dict[str, int], limit: int) -> list[str]:
        ranked = sorted(counts.items(), key=lambda pair: -pair[1])
        return [name for name, _ in ranked[:limit]]

    params = {"user_id": user_id}

    followed = db.execute(
        text("""
            SELECT c.name, c.youtube_channel_id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
        """),
        params,
    ).mappings().all()
    names = [entry["name"] for entry in followed if entry["name"]]

    mixed_tag_rows = db.execute(
        text("""
            SELECT tags FROM (
                SELECT v.tags FROM videos v
                JOIN user_channels uc ON v.channel_id = uc.channel_id
                WHERE uc.user_id = :user_id AND uc.status = 'followed'
                  AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
                LIMIT 300
            )
            UNION ALL
            SELECT tags FROM (
                SELECT v.tags FROM user_videos uv
                JOIN videos v ON uv.video_id = v.id
                WHERE uv.user_id = :user_id AND uv.liked = 1
                  AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
                LIMIT 100
            )
        """),
        params,
    ).mappings().all()
    search_counts = tally(mixed_tag_rows, 1)

    category_rows = db.execute(
        text("""
            SELECT v.category, COUNT(*) AS cnt
            FROM videos v
            JOIN user_channels uc ON v.channel_id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
              AND v.category IS NOT NULL
            GROUP BY v.category
            ORDER BY cnt DESC
            LIMIT 5
        """),
        params,
    ).mappings().all()

    best_tags = top(search_counts, 5)
    categories = [entry["category"] for entry in category_rows]
    picked_names = random.sample(names, min(4, len(names))) if names else []
    wildcard = [f"best {categories[0]} channels"] if categories else []

    # dict.fromkeys removes duplicates while keeping first-seen order.
    unique_queries = list(dict.fromkeys(best_tags + picked_names + wildcard + categories[:2]))
    result: list[tuple[str, str, float]] = [
        (query, "search", 5.0) for query in unique_queries[:10]
    ]

    # Liked signal queries
    liked = db.execute(
        text("""
            SELECT v.tags FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.liked = 1
              AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
        """),
        params,
    ).mappings().all()
    result.extend((query, "liked", 10.0) for query in top(tally(liked, 2), 4))

    return result
|
||
|
||
|
||
def _sample_graph_yt_ids(db: Session, user_id: int) -> list[str]:
    """Pick up to 6 random YouTube ids among the channels the user follows."""
    followed = db.execute(
        text("""
            SELECT c.youtube_channel_id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
              AND c.youtube_channel_id IS NOT NULL
        """),
        {"user_id": user_id},
    ).scalars().all()

    if not followed:
        return []
    pool = list(followed)
    return random.sample(pool, min(6, len(pool)))
|
||
|
||
|
||
def schedule_discovery(user_id: int, regions: list[str] | None = None):
    """Queue a full discovery sweep for the user.

    The SQL-only steps (purging unseen entries older than 14 days, the
    community signal, the category clusters) run immediately in a
    short-lived session.  Every yt-dlp call — one per search query, one
    trending fetch for the first region, one per sampled followed channel —
    becomes its own task; the tasks are shuffled to mix call types and put
    on ``_task_queue``, where the worker spaces them 30-90 s apart.

    The user's progress record is reset to the new task count.  If nothing
    needs queuing, the last-run timestamp is written right away.
    ``regions`` defaults to ``["US", "SE"]``.
    """
    region_list = ["US", "SE"] if regions is None else regions

    from ..database import SessionLocal

    # Fast signals (no yt-dlp involved) run synchronously here.
    session = SessionLocal()
    try:
        purge = text("""
            DELETE FROM discovery_queue
            WHERE user_id = :uid AND seen = 0
              AND created_at <= datetime('now', '-14 days')
        """)
        session.execute(purge, {"uid": user_id})
        session.commit()
        update_community_signal(session, user_id)
        update_category_clusters(session, user_id)

        search_args = _build_search_task_args(session, user_id)
        graph_yt_ids = _sample_graph_yt_ids(session, user_id)
    finally:
        session.close()

    # One job per yt-dlp call. Loop values are bound through lambda defaults
    # so each job keeps its own arguments.
    jobs: list[tuple[int, object]] = [
        (user_id, lambda q=query, s=source, m=mult: _do_task_search(user_id, q, s, m))
        for query, source, mult in search_args
    ]
    jobs.extend(
        (user_id, lambda r=region: _do_task_trending(user_id, r))
        for region in region_list[:1]
    )
    jobs.extend(
        (user_id, lambda y=yt_id: _do_task_graph(user_id, y))
        for yt_id in graph_yt_ids
    )

    random.shuffle(jobs)

    with _progress_lock:
        _progress[user_id] = {"total": len(jobs), "done": 0, "running": bool(jobs)}

    for job in jobs:
        _task_queue.put(job)

    if not jobs:
        _stamp_last_run(user_id)
|