From a535e9f22a7c536397f71fb164b23a9d52b92edd Mon Sep 17 00:00:00 2001 From: Mattias Thall Date: Wed, 27 May 2026 02:28:35 +0200 Subject: [PATCH] Add queue-based gradual discovery with shuffled call ordering and progress UI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each yt-dlp call is now an independent task (one search query, one trending fetch, one graph channel fetch). Tasks are shuffled together so we don't fire 10 searches in a row, then enqueued with 30-90s random gaps between them — a full sweep of ~17 tasks completes in roughly 10-25 minutes instead of hammering YouTube with 21 calls back-to-back. Fast signals (community, category clusters) still run synchronously at schedule time since they're pure SQL. Progress is tracked per-user (total/done/running) and exposed on GET /api/discovery/status. The Discovery page polls every 10s while running and shows a progress bar + "Finding channels… X / Y" in the header. The auto-discovery daemon skips scheduling if a manual sweep is already running. Co-Authored-By: Claude Sonnet 4.6 --- backend/main.py | 17 +- backend/routers/discovery.py | 30 +-- backend/services/discovery.py | 326 +++++++++++++++++++++++++++++++ frontend/src/pages/Discovery.jsx | 38 ++-- 4 files changed, 367 insertions(+), 44 deletions(-) diff --git a/backend/main.py b/backend/main.py index 7befbf5..4a9fd3b 100644 --- a/backend/main.py +++ b/backend/main.py @@ -175,9 +175,11 @@ def on_startup(): finally: db.close() - # Backfill descriptions for videos that don't have them yet (runs in background) + # Start discovery worker and backfill enrichment import threading from .routers.channels import _enrich_missing_task, _index_channels_batch + from .services.discovery import start_discovery_worker + start_discovery_worker() threading.Thread(target=_enrich_missing_task, args=(50,), daemon=True).start() def _auto_sync_daemon(): @@ -222,7 +224,7 @@ def on_startup(): import time as _time from datetime import datetime as _dt, timedelta as _td from sqlalchemy import text as _text - from .services.discovery import run_full_discovery + from .services.discovery import schedule_discovery, get_discovery_progress # Wait 5 minutes after startup before the first check so the app can # finish initialising and existing enrichment tasks can settle. @@ -244,13 +246,12 @@ def on_startup(): last = row["last_discovery_run"] if last is None or (_dt.utcnow() - _dt.fromisoformat(str(last))) > _td(hours=23): uid = row["user_id"] + # Skip if a manual sweep is already running + prog = get_discovery_progress(uid) + if prog and prog.get("running"): + continue regions = [r.strip().upper() for r in (row["discovery_regions"] or "US,SE").split(",") if r.strip()] - run_full_discovery(db, uid, regions) - db.execute( - _text("UPDATE user_settings SET last_discovery_run = :now WHERE user_id = :uid"), - {"now": _dt.utcnow(), "uid": uid}, - ) - db.commit() + schedule_discovery(uid, regions) finally: db.close() except Exception: diff --git a/backend/routers/discovery.py b/backend/routers/discovery.py index 434efae..28635b4 100644 --- a/backend/routers/discovery.py +++ b/backend/routers/discovery.py @@ -2,7 +2,7 @@ import json import random from typing import Optional -from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.orm import Session from sqlalchemy import text @@ -10,7 +10,7 @@ from sqlalchemy import text from ..auth_utils import get_current_user from ..database import get_db from ..models import Channel, DiscoveryQueue, User, UserChannel, UserSettings -from ..services.discovery import run_full_discovery +from ..services.discovery import schedule_discovery, get_discovery_progress router = APIRouter() @@ -160,34 +160,14 @@ def dismiss_discovery( @router.post("/refresh", status_code=202) def refresh_discovery( - background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): s = db.query(UserSettings).filter_by(user_id=current_user.id).first() regions_str = (s.discovery_regions if s and s.discovery_regions else "US,SE") regions = [r.strip().upper() for r in regions_str.split(",") if r.strip()] - user_id = current_user.id - - def _run_discovery(): - from datetime import datetime - from ..database import SessionLocal - from sqlalchemy import text as _text - fresh_db = SessionLocal() - try: - run_full_discovery(fresh_db, user_id, regions) - fresh_db.execute( - _text("UPDATE user_settings SET last_discovery_run = :now WHERE user_id = :uid"), - {"now": datetime.utcnow(), "uid": user_id}, - ) - fresh_db.commit() - finally: - fresh_db.close() - - background_tasks.add_task(_run_discovery) - from .channels import _enrich_missing_task - background_tasks.add_task(_enrich_missing_task, 20) - return {"detail": "Discovery refresh started"} + schedule_discovery(current_user.id, regions) + return {"detail": "Discovery queued"} @router.get("/videos", response_model=list[dict]) @@ -263,7 +243,6 @@ def discovery_status( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): - from ..models import UserSettings s = db.query(UserSettings).filter_by(user_id=current_user.id).first() pending = db.execute( text("SELECT COUNT(*) AS n FROM discovery_queue WHERE user_id = :uid AND seen = 0"), @@ -272,6 +251,7 @@ def discovery_status( return { "last_run": s.last_discovery_run.isoformat() if s and s.last_discovery_run else None, "pending_count": pending["n"] if pending else 0, + "progress": get_discovery_progress(current_user.id), } diff --git a/backend/services/discovery.py b/backend/services/discovery.py index cc90a57..dc8e42b 100644 --- a/backend/services/discovery.py +++ b/backend/services/discovery.py @@ -1,6 +1,9 @@ """Discovery engine — search-based crawl, trending, community signal, category clustering.""" import json +import queue as _queue import random +import threading as _threading +import time as _time from datetime import datetime from sqlalchemy.orm import Session from sqlalchemy import text @@ -8,6 +11,16 @@ from sqlalchemy import text from ..models import Channel, UserChannel, DiscoveryQueue, Video from . import ytdlp +# --------------------------------------------------------------------------- +# Background task queue — spaces yt-dlp calls 30-90 s apart and shuffles +# call types so we don't fire 10 searches in a row. +# --------------------------------------------------------------------------- +_task_queue: _queue.Queue = _queue.Queue() +_progress: dict[int, dict] = {} # user_id -> {total, done, running} +_progress_lock = _threading.Lock() +_worker_started = False +_worker_lock = _threading.Lock() + def _fetch_and_index_channel(db: Session, channel: Channel): """Fetch full metadata + recent videos for a discovered channel.""" @@ -680,3 +693,316 @@ def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = No # update_watch_signal skipped — tags already included in crawl_by_search update_trending_signal(db, user_id, regions[:1]) # 1 yt-dlp call (first region only) update_graph_signal(db, user_id) # ~6 yt-dlp calls + + +# --------------------------------------------------------------------------- +# Queue-based gradual discovery — each yt-dlp call is its own task, shuffled +# so call types are mixed, with 30-90 s gaps between them. +# --------------------------------------------------------------------------- + +def _get_followed_yt_ids(db: Session, user_id: int) -> set[str]: + return set(db.execute( + text(""" + SELECT c.youtube_channel_id FROM channels c + JOIN user_channels uc ON c.id = uc.channel_id + WHERE uc.user_id = :uid AND uc.status = 'followed' + """), + {"uid": user_id}, + ).scalars().all()) + + +def _get_neg_tags(db: Session, user_id: int) -> frozenset[str]: + return frozenset(db.execute( + text("SELECT tag FROM user_tag_affinity WHERE user_id = :uid AND score < -2"), + {"uid": user_id}, + ).scalars().all()) + + +def _stamp_last_run(user_id: int): + from ..database import SessionLocal + from sqlalchemy import text as _text + db = SessionLocal() + try: + db.execute( + _text("UPDATE user_settings SET last_discovery_run = :now WHERE user_id = :uid"), + {"now": datetime.utcnow(), "uid": user_id}, + ) + db.commit() + except Exception: + db.rollback() + finally: + db.close() + + +def _do_task_search(user_id: int, query: str, source: str, score_multiplier: float): + from ..database import SessionLocal + db = SessionLocal() + try: + followed_yt_ids = _get_followed_yt_ids(db, user_id) + neg_tags = _get_neg_tags(db, user_id) + _search_and_store(db, user_id, [query], followed_yt_ids, score_multiplier, source, neg_tags) + finally: + db.close() + + +def _do_task_trending(user_id: int, region: str): + from ..database import SessionLocal + db = SessionLocal() + try: + update_trending_signal(db, user_id, [region]) + finally: + db.close() + + +def _fetch_graph_for_channel(db: Session, user_id: int, source_yt_id: str): + """Fetch featured channels for one followed channel and add to discovery queue.""" + followed_yt_ids = _get_followed_yt_ids(db, user_id) + dismissed_ids = set(db.execute( + text("SELECT channel_id FROM user_channels WHERE user_id = :uid AND status = 'dismissed'"), + {"uid": user_id}, + ).scalars().all()) + + try: + featured = ytdlp.fetch_featured_channels(source_yt_id) + except Exception: + return + + needs_indexing: list[int] = [] + for yt_id in featured: + if yt_id in followed_yt_ids: + continue + channel = db.query(Channel).filter_by(youtube_channel_id=yt_id).first() + is_new = channel is None + if not channel: + channel = Channel(youtube_channel_id=yt_id, name="", description="", thumbnail_url=None) + db.add(channel) + db.flush() + if channel.id in dismissed_ids: + continue + uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first() + if uc and uc.status in ("followed", "dismissed"): + continue + _add_to_discovery(db, user_id, channel.id, score=8.0, source="graph") + if is_new or not channel.crawled_at: + needs_indexing.append(channel.id) + + db.commit() + + for channel_id in needs_indexing[:3]: + ch = db.query(Channel).filter_by(id=channel_id).first() + if ch: + _fetch_and_index_channel(db, ch) + + +def _do_task_graph(user_id: int, source_yt_id: str): + from ..database import SessionLocal + db = SessionLocal() + try: + _fetch_graph_for_channel(db, user_id, source_yt_id) + finally: + db.close() + + +def _worker_loop(): + while True: + try: + user_id, task = _task_queue.get(timeout=10) + except _queue.Empty: + continue + + try: + task() + except Exception: + pass + + with _progress_lock: + p = _progress.get(user_id) + if p: + p["done"] = min(p["done"] + 1, p["total"]) + if p["done"] >= p["total"] and p["running"]: + p["running"] = False + _threading.Thread(target=_stamp_last_run, args=(user_id,), daemon=True).start() + + _task_queue.task_done() + + # Polite gap — only sleep if more tasks are waiting + if not _task_queue.empty(): + _time.sleep(random.uniform(30, 90)) + + +def start_discovery_worker(): + """Start the singleton background worker thread (idempotent).""" + global _worker_started + with _worker_lock: + if not _worker_started: + _threading.Thread(target=_worker_loop, daemon=True, name="discovery-worker").start() + _worker_started = True + + +def get_discovery_progress(user_id: int) -> dict | None: + with _progress_lock: + p = _progress.get(user_id) + return dict(p) if p is not None else None + + +def _build_search_task_args(db: Session, user_id: int) -> list[tuple[str, str, float]]: + """Compute all search/liked query strings without executing any yt-dlp calls.""" + result: list[tuple[str, str, float]] = [] + + followed_rows = db.execute( + text(""" + SELECT c.name, c.youtube_channel_id + FROM channels c + JOIN user_channels uc ON c.id = uc.channel_id + WHERE uc.user_id = :user_id AND uc.status = 'followed' + """), + {"user_id": user_id}, + ).mappings().all() + + followed_names = [row["name"] for row in followed_rows if row["name"]] + + tag_rows = db.execute( + text(""" + SELECT tags FROM ( + SELECT v.tags FROM videos v + JOIN user_channels uc ON v.channel_id = uc.channel_id + WHERE uc.user_id = :user_id AND uc.status = 'followed' + AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' + LIMIT 300 + ) + UNION ALL + SELECT tags FROM ( + SELECT v.tags FROM user_videos uv + JOIN videos v ON uv.video_id = v.id + WHERE uv.user_id = :user_id AND uv.liked = 1 + AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' + LIMIT 100 + ) + """), + {"user_id": user_id}, + ).mappings().all() + + tag_counts: dict[str, int] = {} + liked_tag_counts: dict[str, int] = {} + for row in tag_rows: + try: + for tag in json.loads(row["tags"]): + if isinstance(tag, str): + t = tag.lower().strip() + if 3 <= len(t) <= 40: + tag_counts[t] = tag_counts.get(t, 0) + 1 + except (json.JSONDecodeError, TypeError): + continue + + cat_rows = db.execute( + text(""" + SELECT v.category, COUNT(*) AS cnt + FROM videos v + JOIN user_channels uc ON v.channel_id = uc.channel_id + WHERE uc.user_id = :user_id AND uc.status = 'followed' + AND v.category IS NOT NULL + GROUP BY v.category + ORDER BY cnt DESC + LIMIT 5 + """), + {"user_id": user_id}, + ).mappings().all() + + top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:5]] + top_cats = [r["category"] for r in cat_rows] + sampled_names = random.sample(followed_names, min(4, len(followed_names))) if followed_names else [] + serendipity = [f"best {top_cats[0]} channels"] if top_cats else [] + search_queries = list(dict.fromkeys(top_tags + sampled_names + serendipity + top_cats[:2]))[:10] + for q in search_queries: + result.append((q, "search", 5.0)) + + # Liked signal queries + liked_rows = db.execute( + text(""" + SELECT v.tags FROM user_videos uv + JOIN videos v ON uv.video_id = v.id + WHERE uv.user_id = :user_id AND uv.liked = 1 + AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' + """), + {"user_id": user_id}, + ).mappings().all() + + for row in liked_rows: + try: + for tag in json.loads(row["tags"]): + if isinstance(tag, str): + t = tag.lower().strip() + if 3 <= len(t) <= 40: + liked_tag_counts[t] = liked_tag_counts.get(t, 0) + 2 + except (json.JSONDecodeError, TypeError): + pass + + for q in [t for t, _ in sorted(liked_tag_counts.items(), key=lambda x: -x[1])[:4]]: + result.append((q, "liked", 10.0)) + + return result + + +def _sample_graph_yt_ids(db: Session, user_id: int) -> list[str]: + rows = db.execute( + text(""" + SELECT c.youtube_channel_id + FROM channels c + JOIN user_channels uc ON c.id = uc.channel_id + WHERE uc.user_id = :user_id AND uc.status = 'followed' + AND c.youtube_channel_id IS NOT NULL + """), + {"user_id": user_id}, + ).scalars().all() + if not rows: + return [] + return random.sample(list(rows), min(6, len(rows))) + + +def schedule_discovery(user_id: int, regions: list[str] | None = None): + """Schedule a full discovery sweep, spreading yt-dlp calls 30-90 s apart + with call types shuffled so searches, graph fetches, and trending are mixed.""" + if regions is None: + regions = ["US", "SE"] + + from ..database import SessionLocal + + # Fast signals (pure SQL, no yt-dlp) run synchronously right now + db = SessionLocal() + try: + db.execute( + text(""" + DELETE FROM discovery_queue + WHERE user_id = :uid AND seen = 0 + AND created_at <= datetime('now', '-14 days') + """), + {"uid": user_id}, + ) + db.commit() + update_community_signal(db, user_id) + update_category_clusters(db, user_id) + + search_args = _build_search_task_args(db, user_id) + graph_yt_ids = _sample_graph_yt_ids(db, user_id) + finally: + db.close() + + # Build one task per yt-dlp call, then shuffle to mix call types + tasks: list[tuple[int, object]] = [] + for query, source, mult in search_args: + tasks.append((user_id, lambda q=query, s=source, m=mult: _do_task_search(user_id, q, s, m))) + for region in regions[:1]: + tasks.append((user_id, lambda r=region: _do_task_trending(user_id, r))) + for yt_id in graph_yt_ids: + tasks.append((user_id, lambda y=yt_id: _do_task_graph(user_id, y))) + + random.shuffle(tasks) + + with _progress_lock: + _progress[user_id] = {"total": len(tasks), "done": 0, "running": bool(tasks)} + + for item in tasks: + _task_queue.put(item) + + if not tasks: + _stamp_last_run(user_id) diff --git a/frontend/src/pages/Discovery.jsx b/frontend/src/pages/Discovery.jsx index 9e0c7be..bcd8d8a 100644 --- a/frontend/src/pages/Discovery.jsx +++ b/frontend/src/pages/Discovery.jsx @@ -217,7 +217,8 @@ export default function DiscoveryPage() { const { data: discStatus } = useQuery({ queryKey: ["discovery-status"], queryFn: () => getDiscoveryStatus().then(r => r.data), - staleTime: 60_000, + staleTime: 10_000, + refetchInterval: (query) => query.state.data?.progress?.running ? 10_000 : 60_000, }); const refreshMut = useMutation({ @@ -247,37 +248,52 @@ export default function DiscoveryPage() {
{/* Header */}
-
+

Discover

{discStatus && (

- {discStatus.pending_count > 0 - ? `${discStatus.pending_count} channel${discStatus.pending_count !== 1 ? "s" : ""} queued` - : "Queue empty"} + {discStatus.progress?.running + ? `Finding channels… ${discStatus.progress.done} / ${discStatus.progress.total}` + : discStatus.pending_count > 0 + ? `${discStatus.pending_count} channel${discStatus.pending_count !== 1 ? "s" : ""} queued` + : "Queue empty"} {discStatus.last_run ? ` · last refreshed ${new Date(discStatus.last_run + "Z").toLocaleDateString(undefined, { month: "short", day: "numeric", hour: "2-digit", minute: "2-digit" })}` : " · never refreshed"}

)} + {discStatus?.progress?.running && ( +
+
+
+ )}
- {refreshMut.isSuccess && ( + {(refreshMut.isSuccess || discStatus?.progress?.running) && (
- Discovery is running in the background — it searches YouTube using your tags and interests and takes a few minutes. New channels will appear when it finishes. It also runs automatically every day. + Discovery is running in the background — searches and channel fetches are spaced out over ~20 minutes to avoid hitting limits. New channels appear as each batch completes. Runs automatically every day.
)}