Files
youclonedl/backend/routers/playlists.py
Mattias Thall 6cfaca382c Fix playlist thumbnails — extract from yt-dlp thumbnails array
_stable_thumbnail expects a video ID but was being passed a playlist ID
(PLxxx), producing a broken URL. Now picks the best thumbnail from
yt-dlp's thumbnails array, falling back to the singular thumbnail field.

Also backfills playlist.thumbnail_url from the first video when indexing
a playlist that still has no thumbnail.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 22:42:30 +02:00

201 lines
7.2 KiB
Python

import json
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import text
from ..auth_utils import get_current_user
from ..database import get_db
from ..models import Channel, Playlist, Video, User
from ..services import ytdlp
router = APIRouter()
class PlaylistOut(BaseModel):
id: int
youtube_playlist_id: str
channel_id: Optional[int]
title: str
description: Optional[str]
thumbnail_url: Optional[str]
video_count: int
indexed_at: Optional[datetime]
model_config = {"from_attributes": True}
class PlaylistVideoOut(BaseModel):
id: int
youtube_video_id: str
title: str
thumbnail_url: Optional[str]
duration_seconds: Optional[int]
published_at: Optional[datetime]
view_count: Optional[int]
is_downloaded: bool = False
is_watched: bool = False
channel_id: Optional[int] = None
channel_name: Optional[str] = None
model_config = {"from_attributes": True}
@router.get("/channel/{channel_id}", response_model=list[PlaylistOut])
def get_channel_playlists(
channel_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
rows = db.query(Playlist).filter_by(channel_id=channel_id).order_by(Playlist.video_count.desc()).all()
return rows
@router.post("/channel/{channel_id}/fetch", status_code=status.HTTP_202_ACCEPTED)
def fetch_channel_playlists(
channel_id: int,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
channel = db.query(Channel).filter_by(id=channel_id).first()
if not channel:
raise HTTPException(status_code=404, detail="Channel not found")
background_tasks.add_task(_fetch_playlists_task, channel_id, channel.youtube_channel_id)
return {"detail": "Fetching playlists"}
def _fetch_playlists_task(channel_id: int, youtube_channel_id: str):
from ..database import SessionLocal
db = SessionLocal()
try:
playlists = ytdlp.fetch_channel_playlists(youtube_channel_id)
for pl in playlists:
pl_id = pl["youtube_playlist_id"]
existing = db.query(Playlist).filter_by(youtube_playlist_id=pl_id).first()
if existing:
existing.title = pl["title"] or existing.title
if pl.get("video_count"):
existing.video_count = pl["video_count"]
if pl.get("thumbnail_url") and not existing.thumbnail_url:
existing.thumbnail_url = pl["thumbnail_url"]
else:
db.add(Playlist(
youtube_playlist_id=pl_id,
channel_id=channel_id,
title=pl["title"],
description=pl.get("description"),
thumbnail_url=pl.get("thumbnail_url"),
video_count=pl.get("video_count") or 0,
))
db.commit()
except Exception:
db.rollback()
finally:
db.close()
@router.get("/{playlist_id}/videos", response_model=list[PlaylistVideoOut])
def get_playlist_videos(
playlist_id: int,
offset: int = 0,
limit: int = 60,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
playlist = db.query(Playlist).filter_by(id=playlist_id).first()
if not playlist:
raise HTTPException(status_code=404, detail="Playlist not found")
if not playlist.indexed_at:
return []
rows = db.execute(
text("""
SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
v.duration_seconds, v.published_at, v.view_count,
c.id AS channel_id, c.name AS channel_name,
COALESCE(uv.downloaded, 0) AS is_downloaded,
COALESCE(uv.watched, 0) AS is_watched
FROM videos v
LEFT JOIN channels c ON v.channel_id = c.id
LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
WHERE v.youtube_video_id IN (
SELECT value FROM json_each((
SELECT video_ids FROM playlists WHERE id = :playlist_id
))
)
ORDER BY v.published_at DESC NULLS LAST
LIMIT :limit OFFSET :offset
"""),
{"user_id": current_user.id, "playlist_id": playlist_id, "limit": limit, "offset": offset},
).mappings().all()
return [PlaylistVideoOut(**dict(r)) for r in rows]
@router.post("/{playlist_id}/index", status_code=status.HTTP_202_ACCEPTED)
def index_playlist(
playlist_id: int,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
playlist = db.query(Playlist).filter_by(id=playlist_id).first()
if not playlist:
raise HTTPException(status_code=404, detail="Playlist not found")
background_tasks.add_task(_index_playlist_task, playlist_id, playlist.youtube_playlist_id, playlist.channel_id)
return {"detail": "Indexing playlist"}
def _index_playlist_task(playlist_id: int, youtube_playlist_id: str, channel_id: Optional[int]):
from ..database import SessionLocal
db = SessionLocal()
try:
videos = ytdlp.fetch_playlist_videos(youtube_playlist_id)
playlist = db.query(Playlist).filter_by(id=playlist_id).first()
if not playlist:
return
video_yt_ids = []
for vdata in videos:
yt_id = vdata.get("youtube_video_id")
if not yt_id:
continue
video_yt_ids.append(yt_id)
existing = db.query(Video).filter_by(youtube_video_id=yt_id).first()
if existing:
if vdata.get("view_count") is not None:
existing.view_count = vdata["view_count"]
else:
ch_id = channel_id
if not ch_id and vdata.get("channel", {}).get("youtube_channel_id"):
ch = db.query(Channel).filter_by(
youtube_channel_id=vdata["channel"]["youtube_channel_id"]
).first()
if ch:
ch_id = ch.id
db.add(Video(
youtube_video_id=yt_id,
channel_id=ch_id,
title=vdata.get("title", ""),
thumbnail_url=vdata.get("thumbnail_url"),
duration_seconds=vdata.get("duration_seconds"),
published_at=vdata.get("published_at"),
view_count=vdata.get("view_count"),
tags="[]",
))
playlist.video_count = len(video_yt_ids)
playlist.indexed_at = datetime.utcnow()
playlist.video_ids = json.dumps(video_yt_ids)
# Backfill thumbnail from first video if playlist has none
if not playlist.thumbnail_url and video_yt_ids:
from ..services.ytdlp import _stable_thumbnail
playlist.thumbnail_url = _stable_thumbnail(video_yt_ids[0])
db.commit()
except Exception:
db.rollback()
finally:
db.close()