From 2f370721871812ceb57f07797c77baf405a1bae2 Mon Sep 17 00:00:00 2001 From: Mattias Thall Date: Tue, 26 May 2026 22:36:18 +0200 Subject: [PATCH] Fix popular fetch and improve date/view_count coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Popular fetch now does a two-phase approach: fast flat-playlist to get IDs in popularity order, then parallel full metadata fetch (8 workers) to get real view_count and published_at for each video. Previously flat-playlist mode returned timestamp/view_count as null. Enrich task now also backfills published_at and view_count (not just description). Startup limit 3→50, enrichment sleep 2s→0.5s. Raise all thread pool sizes to match 8-core machine: - Discovery search: 5→8 workers - Graph signal: 4→8 workers - Popular fetch: 5→8 workers - Download semaphore default 3→6, cap 10→16 Co-Authored-By: Claude Sonnet 4.6 --- backend/main.py | 2 +- backend/routers/channels.py | 126 ++++++++++++++++++++------------- backend/services/discovery.py | 4 +- backend/services/ytdlp.py | 4 +- frontend/src/pages/Channel.jsx | 2 +- 5 files changed, 82 insertions(+), 56 deletions(-) diff --git a/backend/main.py b/backend/main.py index f414bd1..cc1330f 100644 --- a/backend/main.py +++ b/backend/main.py @@ -168,7 +168,7 @@ def on_startup(): # Backfill descriptions for videos that don't have them yet (runs in background) import threading from .routers.channels import _enrich_missing_task, _index_channels_batch - threading.Thread(target=_enrich_missing_task, args=(3,), daemon=True).start() + threading.Thread(target=_enrich_missing_task, args=(50,), daemon=True).start() def _auto_sync_daemon(): import time diff --git a/backend/routers/channels.py b/backend/routers/channels.py index 129285d..1fc1475 100644 --- a/backend/routers/channels.py +++ b/backend/routers/channels.py @@ -185,16 +185,16 @@ def _discovery_task(user_id: int): def _enrich_missing_task(limit: int = 20): - """Fetch full metadata for videos that are missing a description.""" + """Fetch full metadata for videos missing description, published_at, or view_count.""" from ..database import SessionLocal + import time db = SessionLocal() try: rows = db.execute( text(""" SELECT v.id, v.youtube_video_id FROM videos v - WHERE v.description IS NULL + WHERE v.description IS NULL OR v.published_at IS NULL OR v.view_count IS NULL ORDER BY - -- prioritise: followed-channel videos first, then discovery queue, then rest (EXISTS (SELECT 1 FROM user_channels uc WHERE uc.channel_id = v.channel_id AND uc.status = 'followed')) DESC, (EXISTS (SELECT 1 FROM discovery_queue dq @@ -206,7 +206,7 @@ def _enrich_missing_task(limit: int = 20): ).mappings().all() for i, row in enumerate(rows): if i > 0: - import time; time.sleep(2) + time.sleep(0.5) try: meta = ytdlp.fetch_video_metadata(row["youtube_video_id"]) if meta: @@ -214,6 +214,10 @@ def _enrich_missing_task(limit: int = 20): if vid: if meta.get("description") is not None: vid.description = meta["description"] or "" + if not vid.published_at and meta.get("published_at"): + vid.published_at = meta["published_at"] + if vid.view_count is None and meta.get("view_count") is not None: + vid.view_count = meta["view_count"] if not vid.tags and meta.get("tags"): vid.tags = meta["tags"] if not vid.category and meta.get("category"): @@ -277,7 +281,7 @@ def sync_all_channels( background_tasks.add_task(_index_channels_batch, ids, current_user.id) background_tasks.add_task(_discovery_task, current_user.id) - background_tasks.add_task(_enrich_missing_task, 5) + background_tasks.add_task(_enrich_missing_task, 30) return {"indexing": len(channels)} @@ -651,59 +655,81 @@ def fetch_popular_videos( def _fetch_popular_task(channel_id: int, youtube_channel_id: str): + """Two-phase popular fetch: get IDs fast via flat-playlist, then enrich with full metadata in parallel.""" from ..database import SessionLocal + from concurrent.futures import ThreadPoolExecutor, as_completed + + if youtube_channel_id.startswith("@"): + url = f"https://www.youtube.com/{youtube_channel_id}/videos?sort=p" + else: + url = f"https://www.youtube.com/channel/{youtube_channel_id}/videos?sort=p" + + # Phase 1: get ordered list of popular video IDs (fast) + stdout, _, _ = ytdlp._run([ + "yt-dlp", url, + "--dump-json", "--flat-playlist", + "--playlist-end", "30", + "--quiet", + *ytdlp._cookie_args(), + ], timeout=60) + + video_ids = [] + for line in stdout.splitlines(): + line = line.strip() + if not line: + continue + try: + info = json.loads(line) + yt_id = info.get("id") + if yt_id: + video_ids.append(yt_id) + except json.JSONDecodeError: + continue + + if not video_ids: + return + + # Phase 2: fetch full metadata in parallel (gets view_count + published_at) + with ThreadPoolExecutor(max_workers=8) as pool: + futures = {pool.submit(ytdlp.fetch_video_metadata, vid): vid for vid in video_ids} + results = {} + for future in as_completed(futures): + vid = futures[future] + try: + results[vid] = future.result() + except Exception: + results[vid] = None + db = SessionLocal() try: - if youtube_channel_id.startswith("@"): - url = f"https://www.youtube.com/{youtube_channel_id}/videos?sort=p" - else: - url = f"https://www.youtube.com/channel/{youtube_channel_id}/videos?sort=p" - - stdout, _, code = ytdlp._run([ - "yt-dlp", url, - "--dump-json", "--flat-playlist", - "--playlist-end", "100", - "--quiet", - *ytdlp._cookie_args(), - ], timeout=120) - channel = db.query(Channel).filter_by(id=channel_id).first() if not channel: return - - for line in stdout.splitlines(): - line = line.strip() - if not line: + for yt_id in video_ids: + meta = results.get(yt_id) + if not meta: continue try: - info = json.loads(line) - except json.JSONDecodeError: - continue - yt_id = info.get("id") - if not yt_id: - continue - existing = db.query(Video).filter_by(youtube_video_id=yt_id).first() - view_count = info.get("view_count") - published_at = ytdlp._parse_published(info) - if existing: - if view_count is not None: - existing.view_count = view_count - if published_at and not existing.published_at: - existing.published_at = published_at - else: - db.add(Video( - youtube_video_id=yt_id, - channel_id=channel.id, - title=info.get("title", ""), - thumbnail_url=ytdlp._stable_thumbnail(yt_id), - duration_seconds=info.get("duration"), - published_at=published_at, - tags=json.dumps(info.get("tags") or []), - view_count=view_count, - )) - db.commit() - except Exception: - db.rollback() + existing = db.query(Video).filter_by(youtube_video_id=yt_id).first() + if existing: + if meta.get("view_count") is not None: + existing.view_count = meta["view_count"] + if meta.get("published_at") and not existing.published_at: + existing.published_at = meta["published_at"] + else: + db.add(Video( + youtube_video_id=yt_id, + channel_id=channel.id, + title=meta.get("title", ""), + thumbnail_url=ytdlp._stable_thumbnail(yt_id), + duration_seconds=meta.get("duration_seconds"), + published_at=meta.get("published_at"), + tags=meta.get("tags") or "[]", + view_count=meta.get("view_count"), + )) + db.commit() + except Exception: + db.rollback() finally: db.close() diff --git a/backend/services/discovery.py b/backend/services/discovery.py index 4aa8beb..88828fb 100644 --- a/backend/services/discovery.py +++ b/backend/services/discovery.py @@ -112,7 +112,7 @@ def _search_and_store( except Exception: return [] - with ThreadPoolExecutor(max_workers=5) as pool: + with ThreadPoolExecutor(max_workers=8) as pool: futures = {pool.submit(_do_search, q): q for q in queries} for fut in as_completed(futures): for video in fut.result(): @@ -620,7 +620,7 @@ def update_graph_signal(db: Session, user_id: int): return [] featured_map: dict[str, list[str]] = {} - with ThreadPoolExecutor(max_workers=4) as pool: + with ThreadPoolExecutor(max_workers=8) as pool: futures = {pool.submit(_fetch, row["youtube_channel_id"]): row for row in sample} for fut in as_completed(futures): row = futures[fut] diff --git a/backend/services/ytdlp.py b/backend/services/ytdlp.py index 940e218..b148ef2 100644 --- a/backend/services/ytdlp.py +++ b/backend/services/ytdlp.py @@ -665,7 +665,7 @@ def predicted_file_path(video_id: str) -> Path: return Path(settings.download_path) / f"{video_id}.mp4" -_SEMAPHORE = threading.Semaphore(3) +_SEMAPHORE = threading.Semaphore(6) _semaphore_lock = threading.Lock() _cookies_browser: str = "" _cookies_file: str = "" @@ -682,7 +682,7 @@ _oauth2_state_lock = threading.Lock() def set_max_concurrent(n: int) -> None: global _SEMAPHORE with _semaphore_lock: - _SEMAPHORE = threading.Semaphore(max(1, min(n, 10))) + _SEMAPHORE = threading.Semaphore(max(1, min(n, 16))) def set_cookies_browser(browser: str) -> None: diff --git a/frontend/src/pages/Channel.jsx b/frontend/src/pages/Channel.jsx index 599d907..13a54ad 100644 --- a/frontend/src/pages/Channel.jsx +++ b/frontend/src/pages/Channel.jsx @@ -117,7 +117,7 @@ export default function ChannelPage() { const popularMut = useMutation({ mutationFn: () => fetchPopularVideos(id), - onSuccess: () => scheduleRefetch(20000), + onSuccess: () => scheduleRefetch(35000), }); const deepSearchMut = useMutation({