Files
youclonedl/backend/services/discovery.py
Mattias Thall a535e9f22a Add queue-based gradual discovery with shuffled call ordering and progress UI
Each yt-dlp call is now an independent task (one search query, one trending
fetch, one graph channel fetch). Tasks are shuffled together so we don't fire
10 searches in a row, then enqueued with 30-90s random gaps between them —
a full sweep of ~17 tasks completes in roughly 10-25 minutes instead of
hammering YouTube with 21 calls back-to-back.

Fast signals (community, category clusters) still run synchronously at
schedule time since they're pure SQL.

Progress is tracked per-user (total/done/running) and exposed on
GET /api/discovery/status. The Discovery page polls every 10s while
running and shows a progress bar + "Finding channels… X / Y" in the header.
The auto-discovery daemon skips scheduling if a manual sweep is already running.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 02:28:35 +02:00

1009 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Discovery engine — search-based crawl, trending, community signal, category clustering."""
import json
import logging
import queue as _queue
import random
import threading as _threading
import time as _time
from datetime import datetime

from sqlalchemy import text
from sqlalchemy.orm import Session

from ..models import Channel, UserChannel, DiscoveryQueue, Video
from . import ytdlp
# ---------------------------------------------------------------------------
# Background task queue — a single worker thread drains it, spacing yt-dlp
# calls 30-90 s apart; tasks are shuffled when scheduled so call types mix.
# ---------------------------------------------------------------------------

# Items are (user_id, zero-argument callable) pairs consumed by _worker_loop.
_task_queue: _queue.Queue = _queue.Queue()

# Per-user sweep progress: user_id -> {"total", "done", "running"}.
# Shared between the worker thread and schedule_discovery /
# get_discovery_progress, so every access goes through _progress_lock.
_progress_lock = _threading.Lock()
_progress: dict[int, dict] = {}

# Guards the one-time start of the worker thread in start_discovery_worker().
_worker_lock = _threading.Lock()
_worker_started = False
def _fetch_and_index_channel(db: Session, channel: Channel):
    """Fetch full metadata + recent videos for a discovered channel and persist them.

    Calls ``ytdlp.fetch_channel_metadata`` for up to 10 recent videos. Non-empty
    channel fields from the result overwrite matching attributes on *channel*,
    ``crawled_at`` is stamped, and every dated video not already stored is
    inserted. Commits on success.

    Best-effort: any exception is logged and the session is rolled back, so one
    bad channel never aborts the caller's discovery pass.
    """
    yt_channel_id = None
    try:
        yt_channel_id = channel.youtube_channel_id
        result = ytdlp.fetch_channel_metadata(yt_channel_id, max_videos=10, polite=True)
        if not result:
            return
        for key, value in (result.get("channel") or {}).items():
            if hasattr(channel, key) and value is not None and value != "":
                setattr(channel, key, value)
        channel.crawled_at = datetime.utcnow()

        videos = result.get("videos") or []

        # For videos missing a date (RSS didn't cover them or flat-playlist had no
        # timestamp), do individual fetches — capped at 3 to avoid slow-downs.
        individual_fetched: dict[str, dict] = {}
        for vdata in [v for v in videos if not v.get("published_at")][:3]:
            yt_id = vdata.get("youtube_video_id")
            if not yt_id:
                continue
            try:
                meta = ytdlp.fetch_video_metadata(yt_id, polite=True)
            except Exception:
                continue  # best-effort: this video simply stays undated
            if meta and meta.get("published_at"):
                individual_fetched[yt_id] = meta

        # Ids inserted during this pass. If the session does not autoflush, the
        # existence query below cannot see pending rows, so a video listed twice
        # would otherwise be inserted twice.
        added: set[str] = set()
        for vdata in videos:
            yt_id = vdata.get("youtube_video_id")
            if not yt_id or yt_id in added:
                continue
            # Prefer individually-fetched metadata if we retrieved it
            vdata = individual_fetched.get(yt_id, vdata)
            # Skip videos we still can't date — undated videos break feed ordering
            if not vdata.get("published_at"):
                continue
            if db.query(Video).filter_by(youtube_video_id=yt_id).first():
                continue
            db.add(Video(
                youtube_video_id=yt_id,
                channel_id=channel.id,
                title=vdata.get("title", ""),
                description=vdata.get("description"),
                thumbnail_url=vdata.get("thumbnail_url"),
                duration_seconds=vdata.get("duration_seconds"),
                published_at=vdata.get("published_at"),
                tags=vdata.get("tags"),
                category=vdata.get("category"),
            ))
            added.add(yt_id)
        db.commit()
    except Exception:
        db.rollback()
        logging.getLogger(__name__).warning(
            "Indexing discovered channel %s failed", yt_channel_id, exc_info=True,
        )
def _upsert_channel(db: Session, channel_data: dict) -> Channel | None:
    """Get-or-create a Channel keyed by ``channel_data["youtube_channel_id"]``.

    Returns None when the id is missing or empty. An existing row is returned
    as-is — its fields are NOT updated from *channel_data*. A new row is built
    from all of *channel_data* and flushed so its primary key is available.
    """
    yt_id = channel_data.get("youtube_channel_id")
    if not yt_id:
        return None

    existing = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
    if existing:
        return existing

    created = Channel(**channel_data)
    db.add(created)
    db.flush()
    return created
# Ceiling for any single discovery-queue score, new or accumulated.
_MAX_DISCOVERY_SCORE = 50.0


def _add_to_discovery(
    db: Session, user_id: int, channel_id: int, score: float, source: str,
    preview_json: str | None = None,
):
    """Insert or reinforce *user_id*'s discovery-queue entry for *channel_id*.

    A new entry is stored with ``score`` capped at ``_MAX_DISCOVERY_SCORE``.
    An existing entry gains half of the (capped) new score, still capped, and
    takes ``preview_json`` only if it had none; its ``source`` is left as-is.
    Does not commit — the caller owns the transaction.
    """
    score = min(score, _MAX_DISCOVERY_SCORE)
    existing = db.query(DiscoveryQueue).filter_by(user_id=user_id, channel_id=channel_id).first()
    if existing:
        # Accumulate across sources but cap so no single signal dominates forever.
        # A NULL stored score is treated as 0.
        existing.score = min((existing.score or 0.0) + score * 0.5, _MAX_DISCOVERY_SCORE)
        if preview_json and not existing.preview_json:
            existing.preview_json = preview_json
        return
    db.add(DiscoveryQueue(
        user_id=user_id,
        channel_id=channel_id,
        score=score,
        source=source,
        preview_json=preview_json,
    ))
    # Flush so a later call for the same (user, channel) in this transaction
    # finds this row even when the session has autoflush disabled. Otherwise a
    # channel reached twice in one pass (e.g. featured by two followed channels)
    # would be inserted twice.
    db.flush()
def _search_and_store(
    db: Session, user_id: int, queries: list[str],
    followed_yt_ids: set[str], score_multiplier: float, source: str,
    neg_affinity_tags: frozenset[str] = frozenset(),
):
    """Run YouTube searches for the given queries and add result channels to discovery.

    Args:
        db: Open session; committed once candidates are queued.
        user_id: User whose discovery queue is updated.
        queries: Search strings; each is one ``ytdlp.search_youtube`` call
            (40 results). A failing search counts as no results.
        followed_yt_ids: YouTube channel ids to ignore (already followed).
        score_multiplier: Score per search hit; a channel's score is
            ``hits * score_multiplier`` (capped by ``_add_to_discovery``).
        source: Discovery source label stored on new queue entries.
        neg_affinity_tags: Disliked tags, compared against lower-cased video
            tags. An already-crawled channel whose (up to 20) tagged videos hit
            3+ of them is skipped.

    Channels not yet in the DB are created with the name seen in search
    results. Up to three result videos per channel are kept as preview JSON.
    Afterwards, up to 10 never-crawled candidates are indexed via
    ``_fetch_and_index_channel``.
    """
    discovered: dict[str, dict] = {}
    for query in queries:
        try:
            results = ytdlp.search_youtube(query, max_results=40, polite=True) or []
        except Exception:
            results = []
        for video in results:
            # "channel" may be absent or explicitly None; treat both as empty.
            ch = video.get("channel") or {}
            yt_id = ch.get("youtube_channel_id")
            name = (ch.get("name") or "").strip()
            if not yt_id or not name or yt_id in followed_yt_ids:
                continue
            if yt_id not in discovered:
                discovered[yt_id] = {"name": name, "count": 0, "previews": []}
            discovered[yt_id]["count"] += 1
            previews = discovered[yt_id]["previews"]
            if len(previews) < 3 and video.get("thumbnail_url") and video.get("title"):
                previews.append({
                    "thumbnail_url": video["thumbnail_url"],
                    "title": video["title"],
                })
    if not discovered:
        return

    # Channels that appeared in the most results first.
    candidates = sorted(discovered.items(), key=lambda x: -x[1]["count"])
    needs_indexing: list[int] = []
    for yt_id, info in candidates:
        channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
        is_new = channel is None
        if is_new:
            channel = Channel(
                youtube_channel_id=yt_id,
                name=info["name"],
                description="",
                thumbnail_url=None,
            )
            db.add(channel)
            db.flush()
        uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
        if uc and uc.status in ("followed", "dismissed"):
            continue
        # Skip channels whose indexed videos heavily overlap with negatively-rated tags
        if neg_affinity_tags and not is_new and channel.crawled_at:
            neg_hit = 0
            vtags = db.execute(
                text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"),
                {"cid": channel.id},
            ).scalars().all()
            for tags_json in vtags:
                try:
                    for tag in json.loads(tags_json or "[]"):
                        if isinstance(tag, str) and tag.lower().strip() in neg_affinity_tags:
                            neg_hit += 1
                except (json.JSONDecodeError, TypeError):
                    pass
            if neg_hit >= 3:
                continue
        preview_json = json.dumps(info["previews"]) if info["previews"] else None
        _add_to_discovery(
            db, user_id, channel.id,
            score=float(info["count"]) * score_multiplier,
            source=source,
            preview_json=preview_json,
        )
        if is_new or not channel.crawled_at:
            needs_indexing.append(channel.id)
    db.commit()

    for channel_id in needs_indexing[:10]:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if channel:
            _fetch_and_index_channel(db, channel)
def crawl_by_search(db: Session, user_id: int):
    """Discover channels by searching YouTube using tags, categories, and channel names.

    The query list comes from ``_build_search_task_args``, the same builder the
    queued sweep uses, so synchronous and scheduled discovery cannot drift apart.
    Only its ``"search"`` entries are used: at most 10 queries mixing top tags,
    sampled followed-channel names and top categories, run with a ×5 score
    multiplier. Each query is a separate yt-dlp subprocess, so the count stays low.
    """
    queries = [
        query
        for query, source, _multiplier in _build_search_task_args(db, user_id)
        if source == "search"
    ]
    if not queries:
        return
    _search_and_store(
        db, user_id, queries, _get_followed_yt_ids(db, user_id),
        score_multiplier=5.0, source="search",
        neg_affinity_tags=_get_neg_tags(db, user_id),
    )
def update_community_signal(db: Session, user_id: int):
    """Queue channels other users follow, scored by how many of them follow it.

    Only channels *user_id* has no user_channels row for at all are considered
    (top 100 by follower count). Each gets ``followers * 5`` (source
    "community", capped by ``_add_to_discovery``). Commits at the end.
    """
    popular = db.execute(
        text("""
            SELECT uc.channel_id, COUNT(DISTINCT uc.user_id) AS follower_count
            FROM user_channels uc
            WHERE uc.user_id != :user_id
              AND uc.status = 'followed'
              AND uc.channel_id NOT IN (
                  SELECT channel_id FROM user_channels
                  WHERE user_id = :user_id
              )
            GROUP BY uc.channel_id
            ORDER BY follower_count DESC
            LIMIT 100
        """),
        {"user_id": user_id},
    ).mappings().all()

    for entry in popular:
        followers = float(entry["follower_count"])
        _add_to_discovery(
            db, user_id, entry["channel_id"],
            score=followers * 5,
            source="community",
        )
    db.commit()
def update_category_clusters(db: Session, user_id: int):
    """Find channels in categories the user watches heavily.

    Takes the user's 5 most-watched categories. For each category, up to 50
    channels with a video in it are collected, excluding channels the user
    already has any user_channels row for. Every collected channel gets a flat
    5.0 score (source "category"). Commits at the end.
    """
    watched = db.execute(
        text("""
            SELECT v.category, COUNT(*) AS watch_count
            FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.watched = 1 AND v.category IS NOT NULL
            GROUP BY v.category
            ORDER BY watch_count DESC
            LIMIT 5
        """),
        {"user_id": user_id},
    ).mappings().all()
    top_categories = [entry["category"] for entry in watched]
    if not top_categories:
        return

    # One parameterized query per category (rather than a string-built IN list)
    # keeps category values out of the SQL text.
    candidate_channel_ids: set[int] = set()
    for category in top_categories:
        matches = db.execute(
            text("""
                SELECT DISTINCT v.channel_id
                FROM videos v
                WHERE v.category = :cat
                  AND v.channel_id NOT IN (
                      SELECT channel_id FROM user_channels WHERE user_id = :user_id
                  )
                LIMIT 50
            """),
            {"cat": category, "user_id": user_id},
        ).mappings().all()
        candidate_channel_ids.update(match["channel_id"] for match in matches)

    for channel_id in candidate_channel_ids:
        _add_to_discovery(db, user_id, channel_id, score=5.0, source="category")
    db.commit()
def update_liked_signal(db: Session, user_id: int):
    """Search YouTube for channels related to topics extracted from liked videos.

    Tags (3-40 chars, lower-cased) from all of the user's liked videos are
    tallied. The 4 most frequent become search queries run through
    ``_search_and_store`` with a ×10 score multiplier and source "liked".
    Returns early when no liked video has usable tags.
    """
    liked_rows = db.execute(
        text("""
            SELECT v.tags
            FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.liked = 1
              AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
        """),
        {"user_id": user_id},
    ).mappings().all()
    if not liked_rows:
        return

    tag_counts: dict[str, int] = {}
    for row in liked_rows:
        try:
            for tag in json.loads(row["tags"]):
                if isinstance(tag, str):
                    t = tag.lower().strip()
                    if 3 <= len(t) <= 40:
                        tag_counts[t] = tag_counts.get(t, 0) + 2
        except (json.JSONDecodeError, TypeError):
            continue
    if not tag_counts:
        return

    top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:4]]
    # Shared helpers keep the followed/negative-tag lookups identical across signals.
    _search_and_store(
        db, user_id, top_tags, _get_followed_yt_ids(db, user_id),
        score_multiplier=10.0, source="liked",
        neg_affinity_tags=_get_neg_tags(db, user_id),
    )
def update_watch_signal(db: Session, user_id: int):
    """Discover channels from watched video topics, dampened so a single view has little effect.

    A tag needs to appear in at least 3 distinct watched videos before it influences
    discovery (each video counts a tag at most once). The 10 most widespread
    qualifying tags are searched via ``_search_and_store`` with a modest ×3
    multiplier (vs ×10 for liked), so watching a single Tokyo video won't flood
    recommendations with Tokyo content.
    """
    rows = db.execute(
        text("""
            SELECT v.tags
            FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.watched = 1
              AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
        """),
        {"user_id": user_id},
    ).mappings().all()
    if not rows:
        return

    tag_counts: dict[str, int] = {}
    for row in rows:
        try:
            tags = json.loads(row["tags"])
            seen: set[str] = set()
            for tag in tags:
                if isinstance(tag, str):
                    t = tag.lower().strip()
                    if 3 <= len(t) <= 40 and t not in seen:
                        tag_counts[t] = tag_counts.get(t, 0) + 1
                        seen.add(t)
        except (json.JSONDecodeError, TypeError):
            continue

    # Only use tags that appear across 3+ distinct watched videos
    qualified = {t: c for t, c in tag_counts.items() if c >= 3}
    if not qualified:
        return

    top_tags = [t for t, _ in sorted(qualified.items(), key=lambda x: -x[1])[:10]]
    # Shared helpers keep the followed/negative-tag lookups identical across signals.
    _search_and_store(
        db, user_id, top_tags, _get_followed_yt_ids(db, user_id),
        score_multiplier=3.0, source="watched",
        neg_affinity_tags=_get_neg_tags(db, user_id),
    )
def _build_user_tag_profile(db: Session, user_id: int) -> dict[str, float]:
    """Return *user_id*'s tag affinity map from ``user_tag_affinity``.

    Keys are tags exactly as stored; values are their scores (positive = liked,
    negative = disliked/dismissed).
    """
    affinity = db.execute(
        text("SELECT tag, score FROM user_tag_affinity WHERE user_id = :user_id"),
        {"user_id": user_id},
    ).mappings()
    profile: dict[str, float] = {}
    for record in affinity.all():
        profile[record["tag"]] = record["score"]
    return profile
def _tag_relevance_score(tag_profile: dict[str, float], tags_json: str | None) -> float:
"""Score a channel's tags against user affinity — positive means relevant, negative means disliked."""
if not tag_profile or not tags_json:
return 0.0
try:
tags = json.loads(tags_json)
except (json.JSONDecodeError, TypeError):
return 0.0
score = 0.0
for tag in tags:
if isinstance(tag, str):
t = tag.lower().strip()
score += tag_profile.get(t, 0.0)
return max(-100.0, min(score, 50.0))
def update_trending_signal(db: Session, user_id: int, regions: list[str]):
    """Fetch trending videos per region and score their channels by tag overlap with user interests.

    For each region, ``ytdlp.fetch_trending`` (50 videos) is called; a failing
    region is skipped. Per channel, the base score is
    ``min(videos * 4 * regions_seen, 18)``. For already-crawled channels, the summed
    ``_tag_relevance_score`` of up to 20 tagged videos is added. The total is
    capped at 25, and channels scoring <= 0 (disliked topics) are dropped.
    Followed/dismissed channels are skipped. Afterwards, up to 10 never-crawled
    channels are indexed.
    """
    if not regions:
        return
    tag_profile = _build_user_tag_profile(db, user_id)
    followed_yt_ids = _get_followed_yt_ids(db, user_id)
    dismissed_channel_ids = set(db.execute(
        text("""
            SELECT channel_id FROM user_channels
            WHERE user_id = :user_id AND status = 'dismissed'
        """),
        {"user_id": user_id},
    ).scalars().all())

    discovered: dict[str, dict] = {}
    for region in regions:
        try:
            videos = ytdlp.fetch_trending(region=region, max_results=50) or []
            for video in videos:
                # "channel" may be absent or explicitly None; skip such entries
                # instead of aborting the whole region.
                ch = video.get("channel") or {}
                yt_id = ch.get("youtube_channel_id")
                name = (ch.get("name") or "").strip()
                if not yt_id or not name or yt_id in followed_yt_ids:
                    continue
                if yt_id not in discovered:
                    discovered[yt_id] = {"name": name, "count": 0, "regions": set(), "previews": []}
                discovered[yt_id]["count"] += 1
                discovered[yt_id]["regions"].add(region)
                previews = discovered[yt_id]["previews"]
                if len(previews) < 3 and video.get("thumbnail_url") and video.get("title"):
                    previews.append({
                        "thumbnail_url": video["thumbnail_url"],
                        "title": video["title"],
                    })
        except Exception:
            continue
    if not discovered:
        return

    needs_indexing: list[int] = []
    for yt_id, info in discovered.items():
        channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
        is_new = channel is None
        if is_new:
            channel = Channel(
                youtube_channel_id=yt_id,
                name=info["name"],
                description="",
                thumbnail_url=None,
            )
            db.add(channel)
            db.flush()
        if channel.id in dismissed_channel_ids:
            continue
        uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
        if uc and uc.status in ("followed", "dismissed"):
            continue
        # Cap base_score so a viral trending channel can't dominate the whole queue.
        # count × 4.0 × regions can reach 300+ without this cap.
        base_score = min(float(info["count"]) * 4.0 * len(info["regions"]), 18.0)
        # Tag relevance: positive for liked content, negative for dismissed/disliked.
        # tag_profile comes from user_tag_affinity, which tracks both signals.
        tag_boost = 0.0
        if not is_new and channel.crawled_at:
            tag_rows = db.execute(
                text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"),
                {"cid": channel.id},
            ).scalars().all()
            for tags_json in tag_rows:
                tag_boost += _tag_relevance_score(tag_profile, tags_json)
        final_score = min(base_score + tag_boost, 25.0)
        if final_score <= 0:
            continue
        preview_json = json.dumps(info["previews"]) if info["previews"] else None
        _add_to_discovery(db, user_id, channel.id, score=final_score, source="trending", preview_json=preview_json)
        if is_new or not channel.crawled_at:
            needs_indexing.append(channel.id)
    db.commit()

    for channel_id in needs_indexing[:10]:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if channel:
            _fetch_and_index_channel(db, channel)
def update_graph_signal(db: Session, user_id: int):
    """Discover channels featured on followed channels' /channels tab.

    Channels that creators explicitly recommend are high-signal — they're
    curated by someone whose taste you already follow. Samples up to 6 followed
    channels per run and fetches their featured-channels lists one after another
    (one yt-dlp call each; a failed fetch counts as an empty list).

    Each featured channel that is not followed or dismissed is scored 8.0
    (source "graph"). A channel featured by several sampled channels is scored
    once per featuring channel. Placeholder Channel rows (empty name) are created
    for unknown ids. Afterwards, up to 15 distinct never-crawled channels are indexed.
    """
    followed_rows = db.execute(
        text("""
            SELECT c.youtube_channel_id, c.id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
              AND c.youtube_channel_id IS NOT NULL
        """),
        {"user_id": user_id},
    ).mappings().all()
    if not followed_rows:
        return
    followed_yt_ids = {row["youtube_channel_id"] for row in followed_rows}
    dismissed_ids = set(db.execute(
        text("SELECT channel_id FROM user_channels WHERE user_id = :user_id AND status = 'dismissed'"),
        {"user_id": user_id},
    ).scalars().all())

    sample = random.sample(list(followed_rows), min(6, len(followed_rows)))
    featured_map: dict[str, list[str]] = {}
    for row in sample:
        source_yt_id = row["youtube_channel_id"]
        try:
            featured_map[source_yt_id] = ytdlp.fetch_featured_channels(source_yt_id) or []
        except Exception:
            featured_map[source_yt_id] = []

    # Insertion-ordered "set": a channel featured by several followed channels
    # must only be indexed once (and only count once toward the cap).
    needs_indexing: dict[int, None] = {}
    for channel_ids in featured_map.values():
        # Drop repeats within one featured list while keeping its order.
        for yt_id in dict.fromkeys(channel_ids):
            if not yt_id or yt_id in followed_yt_ids:
                continue
            channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
            is_new = channel is None
            if is_new:
                channel = Channel(youtube_channel_id=yt_id, name="", description="", thumbnail_url=None)
                db.add(channel)
                db.flush()
            if channel.id in dismissed_ids:
                continue
            uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
            if uc and uc.status in ("followed", "dismissed"):
                continue
            _add_to_discovery(db, user_id, channel.id, score=8.0, source="graph")
            if is_new or not channel.crawled_at:
                needs_indexing[channel.id] = None
    db.commit()

    for channel_id in list(needs_indexing)[:15]:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if channel:
            _fetch_and_index_channel(db, channel)
def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None):
    """Run every discovery signal synchronously, back-to-back, on *db*.

    Unlike the queued sweep, nothing is deferred: all yt-dlp calls happen inside
    this call. Only the first entry of *regions* (default ``["US", "SE"]``) is
    used for trending.
    """
    regions = ["US", "SE"] if regions is None else regions

    # Drop unseen entries older than 14 days so stale high-score channels
    # don't block fresh results forever.
    db.execute(
        text("""
            DELETE FROM discovery_queue
            WHERE user_id = :user_id AND seen = 0
              AND created_at <= datetime('now', '-14 days')
        """),
        {"user_id": user_id},
    )
    db.commit()

    crawl_by_search(db, user_id)                      # up to 10 yt-dlp searches
    update_community_signal(db, user_id)              # SQL only
    update_category_clusters(db, user_id)             # SQL only
    update_liked_signal(db, user_id)                  # up to 4 yt-dlp searches
    # update_watch_signal is deliberately not run — its tags are already
    # covered by crawl_by_search.
    update_trending_signal(db, user_id, regions[:1])  # 1 yt-dlp call (first region)
    update_graph_signal(db, user_id)                  # up to 6 yt-dlp calls
# ---------------------------------------------------------------------------
# Queue-based gradual discovery — each yt-dlp call is its own task, shuffled
# so call types are mixed, with 30-90 s gaps between them.
# ---------------------------------------------------------------------------
def _get_followed_yt_ids(db: Session, user_id: int) -> set[str]:
    """Return the YouTube channel ids of every channel *user_id* follows."""
    followed = db.execute(
        text("""
            SELECT c.youtube_channel_id FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :uid AND uc.status = 'followed'
        """),
        {"uid": user_id},
    ).scalars()
    yt_ids = followed.all()
    return set(yt_ids)
def _get_neg_tags(db: Session, user_id: int) -> frozenset[str]:
    """Return the tags *user_id* dislikes (affinity score below -2)."""
    disliked = db.execute(
        text("SELECT tag FROM user_tag_affinity WHERE user_id = :uid AND score < -2"),
        {"uid": user_id},
    ).scalars().all()
    return frozenset(disliked)
def _stamp_last_run(user_id: int):
    """Set ``user_settings.last_discovery_run`` to the current UTC time for *user_id*.

    Opens and closes its own session, so it can run off the request path (the
    worker spawns it on a daemon thread). Best-effort: a failure is rolled back
    and logged rather than raised. NOTE(review): if the user has no
    user_settings row, the UPDATE silently affects nothing.
    """
    from ..database import SessionLocal
    db = SessionLocal()
    try:
        db.execute(
            text("UPDATE user_settings SET last_discovery_run = :now WHERE user_id = :uid"),
            {"now": datetime.utcnow(), "uid": user_id},
        )
        db.commit()
    except Exception:
        db.rollback()
        logging.getLogger(__name__).warning(
            "Could not record last discovery run for user %s", user_id, exc_info=True,
        )
    finally:
        db.close()
def _do_task_search(user_id: int, query: str, source: str, score_multiplier: float):
    """Queue task: run one YouTube search for *user_id* in a fresh DB session.

    Followed channel ids and negative-affinity tags are read when the task runs,
    not when it was scheduled, so follows made in between are respected.
    """
    from ..database import SessionLocal
    session = SessionLocal()
    try:
        _search_and_store(
            session,
            user_id,
            [query],
            _get_followed_yt_ids(session, user_id),
            score_multiplier,
            source,
            _get_neg_tags(session, user_id),
        )
    finally:
        session.close()
def _do_task_trending(user_id: int, region: str):
    """Queue task: run the trending signal for a single *region* in its own session."""
    from ..database import SessionLocal
    session = SessionLocal()
    try:
        update_trending_signal(session, user_id, regions=[region])
    finally:
        session.close()
def _fetch_graph_for_channel(db: Session, user_id: int, source_yt_id: str):
    """Fetch featured channels for one followed channel and add them to discovery.

    Makes one ``ytdlp.fetch_featured_channels`` call; if it raises, nothing is
    changed. Each distinct featured channel that is not followed or dismissed is
    queued with score 8.0 (source "graph"), creating a placeholder Channel row
    (empty name) for unknown ids. Afterwards, up to 3 never-crawled channels are
    indexed; each index is further yt-dlp traffic within this single task.
    """
    followed_yt_ids = _get_followed_yt_ids(db, user_id)
    dismissed_ids = set(db.execute(
        text("SELECT channel_id FROM user_channels WHERE user_id = :uid AND status = 'dismissed'"),
        {"uid": user_id},
    ).scalars().all())
    try:
        featured = ytdlp.fetch_featured_channels(source_yt_id) or []
    except Exception:
        return

    needs_indexing: list[int] = []
    # dict.fromkeys drops repeated ids while keeping the returned order, so a
    # channel listed twice is not scored or indexed twice.
    for yt_id in dict.fromkeys(featured):
        if not yt_id or yt_id in followed_yt_ids:
            continue
        channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first()
        is_new = channel is None
        if is_new:
            channel = Channel(youtube_channel_id=yt_id, name="", description="", thumbnail_url=None)
            db.add(channel)
            db.flush()
        if channel.id in dismissed_ids:
            continue
        uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
        if uc and uc.status in ("followed", "dismissed"):
            continue
        _add_to_discovery(db, user_id, channel.id, score=8.0, source="graph")
        if is_new or not channel.crawled_at:
            needs_indexing.append(channel.id)
    db.commit()

    for channel_id in needs_indexing[:3]:
        ch = db.query(Channel).filter_by(id=channel_id).first()
        if ch:
            _fetch_and_index_channel(db, ch)
def _do_task_graph(user_id: int, source_yt_id: str):
    """Queue task: process one followed channel's featured list in its own session."""
    from ..database import SessionLocal
    session = SessionLocal()
    try:
        _fetch_graph_for_channel(session, user_id, source_yt_id=source_yt_id)
    finally:
        session.close()
def _worker_loop():
    """Drain ``_task_queue`` forever, one task at a time, with polite gaps.

    Each item is ``(user_id, task)`` where ``task`` is a zero-argument callable.
    A task that raises is logged and still counted as done, so the user's
    progress always reaches its total. When ``done`` reaches ``total``, the sweep
    is marked not running and ``_stamp_last_run`` runs on a separate daemon
    thread. After a task, the worker sleeps 30-90 s, but only if more tasks are
    already waiting.
    """
    logger = logging.getLogger(__name__)
    while True:
        try:
            user_id, task = _task_queue.get(timeout=10)
        except _queue.Empty:
            continue
        try:
            task()
        except Exception:
            # Best-effort: a failing yt-dlp/DB task must not kill the worker,
            # but the failure should not vanish silently either.
            logger.exception("Discovery task failed for user %s", user_id)
        with _progress_lock:
            p = _progress.get(user_id)
            if p:
                p["done"] = min(p["done"] + 1, p["total"])
                if p["done"] >= p["total"] and p["running"]:
                    p["running"] = False
                    _threading.Thread(target=_stamp_last_run, args=(user_id,), daemon=True).start()
        _task_queue.task_done()
        # Polite gap — only sleep if more tasks are waiting
        if not _task_queue.empty():
            _time.sleep(random.uniform(30, 90))
def start_discovery_worker():
    """Start the singleton ``discovery-worker`` daemon thread; later calls do nothing."""
    global _worker_started
    with _worker_lock:
        if _worker_started:
            return
        worker = _threading.Thread(target=_worker_loop, daemon=True, name="discovery-worker")
        worker.start()
        _worker_started = True
def get_discovery_progress(user_id: int) -> dict | None:
    """Return a copy of *user_id*'s sweep progress (total/done/running), or None.

    A copy is returned so callers never see the dict mutate under them.
    """
    with _progress_lock:
        snapshot = _progress.get(user_id)
        if snapshot is None:
            return None
        return dict(snapshot)
def _build_search_task_args(db: Session, user_id: int) -> list[tuple[str, str, float]]:
    """Compute all search/liked query strings without executing any yt-dlp calls.

    Returns ``(query, source, score_multiplier)`` tuples in this order:

    * up to 10 ``"search"`` queries (×5.0), de-duplicated with order kept, drawn from:
      - the top 5 tags of followed channels' videos and liked videos,
      - up to 4 randomly sampled followed-channel names,
      - one ``"best <top category> channels"`` query,
      - the top 2 categories;
    * then up to 4 ``"liked"`` queries (×10.0) from the most frequent tags of
      liked videos.
    """
    params = {"user_id": user_id}

    def tally(rows, weight: int) -> dict[str, int]:
        # Count 3-40 char lower-cased string tags; rows whose tags don't decode are skipped.
        counts: dict[str, int] = {}
        for row in rows:
            try:
                for raw_tag in json.loads(row["tags"]):
                    if not isinstance(raw_tag, str):
                        continue
                    tag = raw_tag.lower().strip()
                    if 3 <= len(tag) <= 40:
                        counts[tag] = counts.get(tag, 0) + weight
            except (json.JSONDecodeError, TypeError):
                continue
        return counts

    def most_frequent(counts: dict[str, int], limit: int) -> list[str]:
        # reverse=True keeps sort stability, so ties stay in first-seen order.
        return sorted(counts, key=counts.get, reverse=True)[:limit]

    followed_rows = db.execute(
        text("""
            SELECT c.name, c.youtube_channel_id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
        """),
        params,
    ).mappings().all()
    followed_names = [row["name"] for row in followed_rows if row["name"]]

    # SQLite requires LIMIT inside a subquery when used with UNION ALL
    interest_rows = db.execute(
        text("""
            SELECT tags FROM (
                SELECT v.tags FROM videos v
                JOIN user_channels uc ON v.channel_id = uc.channel_id
                WHERE uc.user_id = :user_id AND uc.status = 'followed'
                  AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
                LIMIT 300
            )
            UNION ALL
            SELECT tags FROM (
                SELECT v.tags FROM user_videos uv
                JOIN videos v ON uv.video_id = v.id
                WHERE uv.user_id = :user_id AND uv.liked = 1
                  AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
                LIMIT 100
            )
        """),
        params,
    ).mappings().all()

    category_rows = db.execute(
        text("""
            SELECT v.category, COUNT(*) AS cnt
            FROM videos v
            JOIN user_channels uc ON v.channel_id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
              AND v.category IS NOT NULL
            GROUP BY v.category
            ORDER BY cnt DESC
            LIMIT 5
        """),
        params,
    ).mappings().all()

    top_tags = most_frequent(tally(interest_rows, 1), 5)
    top_cats = [row["category"] for row in category_rows]
    sampled_names = (
        random.sample(followed_names, min(4, len(followed_names))) if followed_names else []
    )
    serendipity = [f"best {top_cats[0]} channels"] if top_cats else []
    search_queries = list(dict.fromkeys(top_tags + sampled_names + serendipity + top_cats[:2]))[:10]

    # Liked signal queries
    liked_rows = db.execute(
        text("""
            SELECT v.tags FROM user_videos uv
            JOIN videos v ON uv.video_id = v.id
            WHERE uv.user_id = :user_id AND uv.liked = 1
              AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
        """),
        params,
    ).mappings().all()
    liked_queries = most_frequent(tally(liked_rows, 2), 4)

    return (
        [(query, "search", 5.0) for query in search_queries]
        + [(query, "liked", 10.0) for query in liked_queries]
    )
def _sample_graph_yt_ids(db: Session, user_id: int) -> list[str]:
    """Return up to 6 randomly chosen YouTube ids of channels *user_id* follows."""
    followed = db.execute(
        text("""
            SELECT c.youtube_channel_id
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :user_id AND uc.status = 'followed'
              AND c.youtube_channel_id IS NOT NULL
        """),
        {"user_id": user_id},
    ).scalars().all()
    candidates = list(followed)
    if not candidates:
        return []
    return random.sample(candidates, min(6, len(candidates)))
def schedule_discovery(user_id: int, regions: list[str] | None = None):
    """Schedule a full discovery sweep, spreading yt-dlp calls 30-90 s apart
    with call types shuffled so searches, graph fetches, and trending are mixed.

    Pure-SQL work runs synchronously before this returns:
    - expiring unseen queue entries older than 14 days,
    - the community signal,
    - the category-cluster signal.

    Each yt-dlp call then becomes one queued task:
    - every query from ``_build_search_task_args``,
    - one trending fetch for the first region only (default ``["US", "SE"]``),
    - one featured-channels fetch per sampled followed channel.

    If a sweep for *user_id* is still in flight, the new tasks are added to its
    progress counters instead of resetting them. Resetting would let the old
    sweep's remaining tasks complete the new counters early and stamp
    ``last_discovery_run`` while new tasks are still queued.
    """
    if regions is None:
        regions = ["US", "SE"]
    from ..database import SessionLocal

    # Fast signals (pure SQL, no yt-dlp) run synchronously right now
    db = SessionLocal()
    try:
        db.execute(
            text("""
                DELETE FROM discovery_queue
                WHERE user_id = :uid AND seen = 0
                  AND created_at <= datetime('now', '-14 days')
            """),
            {"uid": user_id},
        )
        db.commit()
        update_community_signal(db, user_id)
        update_category_clusters(db, user_id)
        search_args = _build_search_task_args(db, user_id)
        graph_yt_ids = _sample_graph_yt_ids(db, user_id)
    finally:
        db.close()

    # Build one task per yt-dlp call, then shuffle to mix call types.
    # Default arguments bind each loop value at lambda creation time.
    tasks: list[tuple[int, object]] = []
    for query, source, mult in search_args:
        tasks.append((user_id, lambda q=query, s=source, m=mult: _do_task_search(user_id, q, s, m)))
    for region in regions[:1]:
        tasks.append((user_id, lambda r=region: _do_task_trending(user_id, r)))
    for yt_id in graph_yt_ids:
        tasks.append((user_id, lambda y=yt_id: _do_task_graph(user_id, y)))
    random.shuffle(tasks)

    with _progress_lock:
        current = _progress.get(user_id)
        sweep_in_flight = bool(current and current["running"])
        if sweep_in_flight:
            current["total"] += len(tasks)
        else:
            _progress[user_id] = {"total": len(tasks), "done": 0, "running": bool(tasks)}
    for item in tasks:
        _task_queue.put(item)

    # Idempotent; without a running worker the queued tasks would never execute
    # and progress would report "running" forever.
    start_discovery_worker()

    if not tasks and not sweep_in_flight:
        _stamp_last_run(user_id)