Popular: write Phase 1 immediately, enrich view_count in background
Previously the task waited for all 30 parallel metadata fetches before writing anything to the DB (~30s). Now Phase 1 (flat-playlist IDs + basic info) commits to channel_popular_videos immediately (~5s), so the tab populates fast. Phase 2 (view_count + dates) runs in a daemon thread while the user is already browsing. Also: catch table-not-found errors in the sort=popular query so a cold server returns [] instead of 500. Frontend refetch wait 35s→8s to match the faster Phase 1 commit time. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -619,21 +619,25 @@ def get_channel_videos(
|
|||||||
params["q"] = f"%{q.strip()}%"
|
params["q"] = f"%{q.strip()}%"
|
||||||
|
|
||||||
if sort == "popular":
|
if sort == "popular":
|
||||||
rows = db.execute(
|
try:
|
||||||
text(f"""
|
rows = db.execute(
|
||||||
SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
|
text(f"""
|
||||||
v.duration_seconds, v.published_at, v.view_count,
|
SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
|
||||||
COALESCE(uv.downloaded, 0) AS is_downloaded,
|
v.duration_seconds, v.published_at, v.view_count,
|
||||||
COALESCE(uv.watched, 0) AS is_watched
|
COALESCE(uv.downloaded, 0) AS is_downloaded,
|
||||||
FROM channel_popular_videos cpv
|
COALESCE(uv.watched, 0) AS is_watched
|
||||||
JOIN videos v ON cpv.video_id = v.id
|
FROM channel_popular_videos cpv
|
||||||
LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
|
JOIN videos v ON cpv.video_id = v.id
|
||||||
WHERE cpv.channel_id = :channel_id {q_clause}
|
LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
|
||||||
ORDER BY cpv.rank ASC
|
WHERE cpv.channel_id = :channel_id {q_clause}
|
||||||
LIMIT :limit OFFSET :offset
|
ORDER BY cpv.rank ASC
|
||||||
"""),
|
LIMIT :limit OFFSET :offset
|
||||||
params,
|
"""),
|
||||||
).mappings().all()
|
params,
|
||||||
|
).mappings().all()
|
||||||
|
except Exception:
|
||||||
|
rows = []
|
||||||
|
return [VideoOut(**dict(r)) for r in rows]
|
||||||
else:
|
else:
|
||||||
order = {
|
order = {
|
||||||
"newest": "v.published_at DESC NULLS LAST",
|
"newest": "v.published_at DESC NULLS LAST",
|
||||||
@@ -672,16 +676,22 @@ def fetch_popular_videos(
|
|||||||
|
|
||||||
|
|
||||||
def _fetch_popular_task(channel_id: int, youtube_channel_id: str):
|
def _fetch_popular_task(channel_id: int, youtube_channel_id: str):
|
||||||
"""Two-phase popular fetch: get IDs fast via flat-playlist, then enrich with full metadata in parallel."""
|
"""Fetch popular videos in two phases.
|
||||||
|
|
||||||
|
Phase 1 (fast): flat-playlist to get IDs + basic info, write to DB
|
||||||
|
immediately so the Popular tab populates within seconds.
|
||||||
|
Phase 2 (background thread): enrich each video with view_count and
|
||||||
|
published_at via individual fetches — runs while the user is already
|
||||||
|
browsing.
|
||||||
|
"""
|
||||||
|
import threading
|
||||||
from ..database import SessionLocal
|
from ..database import SessionLocal
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
||||||
|
|
||||||
if youtube_channel_id.startswith("@"):
|
if youtube_channel_id.startswith("@"):
|
||||||
url = f"https://www.youtube.com/{youtube_channel_id}/videos?sort=p"
|
url = f"https://www.youtube.com/{youtube_channel_id}/videos?sort=p"
|
||||||
else:
|
else:
|
||||||
url = f"https://www.youtube.com/channel/{youtube_channel_id}/videos?sort=p"
|
url = f"https://www.youtube.com/channel/{youtube_channel_id}/videos?sort=p"
|
||||||
|
|
||||||
# Phase 1: get ordered list of popular video IDs (fast)
|
|
||||||
stdout, _, _ = ytdlp._run([
|
stdout, _, _ = ytdlp._run([
|
||||||
"yt-dlp", url,
|
"yt-dlp", url,
|
||||||
"--dump-json", "--flat-playlist",
|
"--dump-json", "--flat-playlist",
|
||||||
@@ -690,7 +700,7 @@ def _fetch_popular_task(channel_id: int, youtube_channel_id: str):
|
|||||||
*ytdlp._cookie_args(),
|
*ytdlp._cookie_args(),
|
||||||
], timeout=60)
|
], timeout=60)
|
||||||
|
|
||||||
video_ids = []
|
entries = []
|
||||||
for line in stdout.splitlines():
|
for line in stdout.splitlines():
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
if not line:
|
if not line:
|
||||||
@@ -699,56 +709,36 @@ def _fetch_popular_task(channel_id: int, youtube_channel_id: str):
|
|||||||
info = json.loads(line)
|
info = json.loads(line)
|
||||||
yt_id = info.get("id")
|
yt_id = info.get("id")
|
||||||
if yt_id:
|
if yt_id:
|
||||||
video_ids.append(yt_id)
|
entries.append({"id": yt_id, "title": info.get("title", ""), "duration": info.get("duration")})
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not video_ids:
|
if not entries:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Phase 2: fetch full metadata in parallel (gets view_count + published_at)
|
# Phase 1: store with basic info and write popular ranks immediately
|
||||||
with ThreadPoolExecutor(max_workers=8) as pool:
|
|
||||||
futures = {pool.submit(ytdlp.fetch_video_metadata, vid): vid for vid in video_ids}
|
|
||||||
results = {}
|
|
||||||
for future in as_completed(futures):
|
|
||||||
vid = futures[future]
|
|
||||||
try:
|
|
||||||
results[vid] = future.result()
|
|
||||||
except Exception:
|
|
||||||
results[vid] = None
|
|
||||||
|
|
||||||
db = SessionLocal()
|
db = SessionLocal()
|
||||||
try:
|
try:
|
||||||
channel = db.query(Channel).filter_by(id=channel_id).first()
|
channel = db.query(Channel).filter_by(id=channel_id).first()
|
||||||
if not channel:
|
if not channel:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Clear previous popular list for this channel
|
|
||||||
db.execute(text("DELETE FROM channel_popular_videos WHERE channel_id = :cid"), {"cid": channel_id})
|
db.execute(text("DELETE FROM channel_popular_videos WHERE channel_id = :cid"), {"cid": channel_id})
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
for rank, yt_id in enumerate(video_ids, start=1):
|
for rank, entry in enumerate(entries, start=1):
|
||||||
meta = results.get(yt_id)
|
yt_id = entry["id"]
|
||||||
if not meta:
|
|
||||||
continue
|
|
||||||
try:
|
try:
|
||||||
existing = db.query(Video).filter_by(youtube_video_id=yt_id).first()
|
existing = db.query(Video).filter_by(youtube_video_id=yt_id).first()
|
||||||
if existing:
|
if existing:
|
||||||
if meta.get("view_count") is not None:
|
|
||||||
existing.view_count = meta["view_count"]
|
|
||||||
if meta.get("published_at") and not existing.published_at:
|
|
||||||
existing.published_at = meta["published_at"]
|
|
||||||
video_id = existing.id
|
video_id = existing.id
|
||||||
else:
|
else:
|
||||||
v = Video(
|
v = Video(
|
||||||
youtube_video_id=yt_id,
|
youtube_video_id=yt_id,
|
||||||
channel_id=channel.id,
|
channel_id=channel.id,
|
||||||
title=meta.get("title", ""),
|
title=entry["title"],
|
||||||
thumbnail_url=ytdlp._stable_thumbnail(yt_id),
|
thumbnail_url=ytdlp._stable_thumbnail(yt_id),
|
||||||
duration_seconds=meta.get("duration_seconds"),
|
duration_seconds=entry["duration"],
|
||||||
published_at=meta.get("published_at"),
|
tags="[]",
|
||||||
tags=meta.get("tags") or "[]",
|
|
||||||
view_count=meta.get("view_count"),
|
|
||||||
)
|
)
|
||||||
db.add(v)
|
db.add(v)
|
||||||
db.flush()
|
db.flush()
|
||||||
@@ -767,6 +757,37 @@ def _fetch_popular_task(channel_id: int, youtube_channel_id: str):
|
|||||||
finally:
|
finally:
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
|
# Phase 2: enrich view_count + dates in a daemon thread (non-blocking)
|
||||||
|
video_ids = [e["id"] for e in entries]
|
||||||
|
threading.Thread(target=_enrich_popular_videos, args=(video_ids,), daemon=True).start()
|
||||||
|
|
||||||
|
|
||||||
|
def _enrich_popular_videos(video_ids: list):
|
||||||
|
from ..database import SessionLocal
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=8) as pool:
|
||||||
|
futures = {pool.submit(ytdlp.fetch_video_metadata, vid): vid for vid in video_ids}
|
||||||
|
results = {futures[f]: f.result() for f in as_completed(futures) if not f.exception()}
|
||||||
|
|
||||||
|
db = SessionLocal()
|
||||||
|
try:
|
||||||
|
for yt_id, meta in results.items():
|
||||||
|
if not meta:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
vid = db.query(Video).filter_by(youtube_video_id=yt_id).first()
|
||||||
|
if vid:
|
||||||
|
if meta.get("view_count") is not None:
|
||||||
|
vid.view_count = meta["view_count"]
|
||||||
|
if not vid.published_at and meta.get("published_at"):
|
||||||
|
vid.published_at = meta["published_at"]
|
||||||
|
db.commit()
|
||||||
|
except Exception:
|
||||||
|
db.rollback()
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
@router.post("/{channel_id}/search", status_code=status.HTTP_202_ACCEPTED)
|
@router.post("/{channel_id}/search", status_code=status.HTTP_202_ACCEPTED)
|
||||||
def search_channel_youtube(
|
def search_channel_youtube(
|
||||||
|
|||||||
@@ -117,7 +117,7 @@ export default function ChannelPage() {
|
|||||||
|
|
||||||
const popularMut = useMutation({
|
const popularMut = useMutation({
|
||||||
mutationFn: () => fetchPopularVideos(id),
|
mutationFn: () => fetchPopularVideos(id),
|
||||||
onSuccess: () => scheduleRefetch(35000),
|
onSuccess: () => scheduleRefetch(8000),
|
||||||
});
|
});
|
||||||
|
|
||||||
const deepSearchMut = useMutation({
|
const deepSearchMut = useMutation({
|
||||||
|
|||||||
Reference in New Issue
Block a user