From a3346c6e8707ce8a95108a661037a4b23f10e9d8 Mon Sep 17 00:00:00 2001 From: Mattias Thall Date: Wed, 27 May 2026 03:17:37 +0200 Subject: [PATCH] fix: stop discovery from bursting dozens of yt-dlp calls inside one task MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each search/graph/trending task was calling _fetch_and_index_channel inline for up to 10-15 newly discovered channels, each making up to 4 yt-dlp calls (1 channel metadata + 3 individual video fetches for dateless entries). This bypassed the 30-90 s worker gap, producing bursts of 40-60 calls in rapid succession and hammering YouTube. Changes: - _fetch_and_index_channel: removed the dateless-video individual fetch loop — one call per channel, videos without published_at are simply skipped at discovery time - _search_and_store and _fetch_graph_for_channel: queue channel indexing as separate worker tasks (at most 3 and 2 channels per task, respectively) so the 30-90 s gap applies between every yt-dlp call, including channel indexing - update_trending_signal and update_graph_signal (old sync path): removed inline _fetch_and_index_channel loops (10 and 15 channels, respectively) - _discovery_task in channels.py: replaced run_full_discovery (old synchronous path) with schedule_discovery so sync-all and follow-by-url go through the queue system Co-Authored-By: Claude Sonnet 4.6 --- backend/routers/channels.py | 8 ++--- backend/services/discovery.py | 65 +++++++++++------------------------ 2 files changed, 23 insertions(+), 50 deletions(-) diff --git a/backend/routers/channels.py b/backend/routers/channels.py index 41350ec..4c2b604 100644 --- a/backend/routers/channels.py +++ b/backend/routers/channels.py @@ -173,15 +173,11 @@ def _index_channel_task(channel_id: int, user_id: int, max_videos: int = 30): def _discovery_task(user_id: int): - from ..database import SessionLocal - from ..services.discovery import run_full_discovery - db = SessionLocal() + from ..services.discovery import schedule_discovery try: - 
run_full_discovery(db, user_id) + schedule_discovery(user_id) except Exception: pass - finally: - db.close() def _enrich_missing_task(limit: int = 20): diff --git a/backend/services/discovery.py b/backend/services/discovery.py index dc8e42b..83f0ceb 100644 --- a/backend/services/discovery.py +++ b/backend/services/discovery.py @@ -23,7 +23,7 @@ _worker_lock = _threading.Lock() def _fetch_and_index_channel(db: Session, channel: Channel): - """Fetch full metadata + recent videos for a discovered channel.""" + """Fetch metadata + recent videos for a discovered channel (one yt-dlp call only).""" try: result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id, max_videos=10, polite=True) if not result: @@ -34,32 +34,9 @@ def _fetch_and_index_channel(db: Session, channel: Channel): setattr(channel, k, v) channel.crawled_at = datetime.utcnow() - videos = result.get("videos", []) - - # For videos missing a date (RSS didn't cover them or flat-playlist had no timestamp), - # do individual fetches — capped at 3 to avoid slow-downs. 
- dateless = [v for v in videos if not v.get("published_at")] - individual_fetched: dict[str, dict] = {} - for vdata in dateless[:3]: + for vdata in result.get("videos", []): yt_id = vdata.get("youtube_video_id") - if not yt_id: - continue - try: - meta = ytdlp.fetch_video_metadata(yt_id, polite=True) - if meta and meta.get("published_at"): - individual_fetched[yt_id] = meta - except Exception: - pass - - for vdata in videos: - yt_id = vdata.get("youtube_video_id") - if not yt_id: - continue - # Prefer individually-fetched metadata if we retrieved it - if yt_id in individual_fetched: - vdata = individual_fetched[yt_id] - # Skip videos we still can't date — undated videos break feed ordering - if not vdata.get("published_at"): + if not yt_id or not vdata.get("published_at"): continue if not db.query(Video).filter_by(youtube_video_id=yt_id).first(): db.add(Video( @@ -194,10 +171,9 @@ def _search_and_store( db.commit() - for channel_id in needs_indexing[:10]: - channel = db.query(Channel).filter_by(id=channel_id).first() - if channel: - _fetch_and_index_channel(db, channel) + # Queue channel indexing as separate worker tasks (30-90 s gaps apply). + for channel_id in needs_indexing[:3]: + _task_queue.put((user_id, lambda cid=channel_id: _do_task_index_channel(user_id, cid))) def crawl_by_search(db: Session, user_id: int): @@ -599,11 +575,6 @@ def update_trending_signal(db: Session, user_id: int, regions: list[str]): db.commit() - for channel_id in needs_indexing[:10]: - channel = db.query(Channel).filter_by(id=channel_id).first() - if channel: - _fetch_and_index_channel(db, channel) - def update_graph_signal(db: Session, user_id: int): """Discover channels featured on followed channels' /channels tab. 
@@ -664,11 +635,6 @@ def update_graph_signal(db: Session, user_id: int): db.commit() - for channel_id in needs_indexing[:15]: - channel = db.query(Channel).filter_by(id=channel_id).first() - if channel: - _fetch_and_index_channel(db, channel) - def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None): if regions is None: @@ -788,10 +754,8 @@ def _fetch_graph_for_channel(db: Session, user_id: int, source_yt_id: str): db.commit() - for channel_id in needs_indexing[:3]: - ch = db.query(Channel).filter_by(id=channel_id).first() - if ch: - _fetch_and_index_channel(db, ch) + for channel_id in needs_indexing[:2]: + _task_queue.put((user_id, lambda cid=channel_id: _do_task_index_channel(user_id, cid))) def _do_task_graph(user_id: int, source_yt_id: str): @@ -803,6 +767,19 @@ def _do_task_graph(user_id: int, source_yt_id: str): db.close() +def _do_task_index_channel(user_id: int, channel_id: int): + """Index one newly-discovered channel (one yt-dlp call). Queued as a separate + worker task so the 30-90 s gap applies rather than bursting inline.""" + from ..database import SessionLocal + db = SessionLocal() + try: + channel = db.query(Channel).filter_by(id=channel_id).first() + if channel: + _fetch_and_index_channel(db, channel) + finally: + db.close() + + def _worker_loop(): while True: try: