Files
youclonedl/backend/routers/channels.py
Mattias Tall 74e9a52096 Fix Following page: replace 9-subquery-per-channel stats with 2 CTEs + indexes
The old _CHANNEL_STATS_SELECT ran 9 correlated subqueries for each
channel row. With 1266 channels that was ~11000 sub-executions per
GET /channels request, causing multi-second (or timeout) delays.

New approach: 2 CTEs (vinfo for counts/sums, nc for new_count) each do
a single aggregated pass over all followed-channel videos, joined back
to channels. Only 2 correlated LIMIT-1 subqueries remain for
latest_video_id/title (fast with the new index).

Also adds 4 indexes on startup (IF NOT EXISTS — safe to deploy):
- videos(channel_id, published_at DESC)  — latest video lookups
- videos(channel_id, indexed_at)         — new_count filter
- user_videos(video_id, user_id)         — watch/download aggregation
- user_channels(user_id, status)         — followed channel filter

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 16:04:41 +02:00

743 lines
27 KiB
Python

import json
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import text
from ..auth_utils import get_current_user
from ..database import get_db
from ..models import Channel, ChannelGroup, ChannelGroupMember, Download, User, UserChannel, UserSettings, UserVideo, Video
from ..services import ytdlp
router = APIRouter()
class ChannelOut(BaseModel):
id: int
youtube_channel_id: str
name: str
description: Optional[str]
thumbnail_url: Optional[str]
banner_url: Optional[str]
crawled_at: Optional[datetime]
status: Optional[str]
auto_download: Optional[bool] = None
subscriber_count: Optional[int] = None
video_count: int = 0
unwatched_count: int = 0
watched_count: int = 0
downloaded_count: int = 0
last_published_at: Optional[datetime] = None
new_count: int = 0
latest_video_id: Optional[str] = None
latest_video_title: Optional[str] = None
muted_until: Optional[datetime] = None
upload_frequency_days: Optional[float] = None
notes: Optional[str] = ""
model_config = {"from_attributes": True}
class ChannelGroupOut(BaseModel):
id: int
name: str
channel_ids: list[int] = []
model_config = {"from_attributes": True}
class VideoOut(BaseModel):
id: int
youtube_video_id: str
title: str
thumbnail_url: Optional[str]
duration_seconds: Optional[int]
published_at: Optional[datetime]
channel_id: Optional[int] = None
channel_name: Optional[str] = None
channel_youtube_id: Optional[str] = None
is_downloaded: bool = False
is_watched: bool = False
queued: bool = False
model_config = {"from_attributes": True}
_CHANNEL_STATS_SELECT = """
WITH followed AS (
SELECT channel_id, last_seen_at
FROM user_channels
WHERE user_id = :user_id AND status = 'followed'
),
vinfo AS (
SELECT
v.channel_id,
COUNT(*) AS video_count,
MIN(v.published_at) AS oldest_published,
MAX(v.published_at) AS last_published_at,
SUM(CASE WHEN COALESCE(uv.watched, 0) = 0 THEN 1 ELSE 0 END) AS unwatched_count,
SUM(CASE WHEN uv.watched = 1 THEN 1 ELSE 0 END) AS watched_count,
SUM(CASE WHEN uv.downloaded = 1 THEN 1 ELSE 0 END) AS downloaded_count
FROM videos v
JOIN followed f ON f.channel_id = v.channel_id
LEFT JOIN user_videos uv ON uv.video_id = v.id AND uv.user_id = :user_id
GROUP BY v.channel_id
),
nc AS (
SELECT v.channel_id, COUNT(*) AS new_count
FROM videos v
JOIN followed f ON f.channel_id = v.channel_id
WHERE f.last_seen_at IS NULL OR v.indexed_at > f.last_seen_at
GROUP BY v.channel_id
)
SELECT
c.*, uc.status, uc.auto_download, uc.muted_until, uc.notes,
COALESCE(vi.video_count, 0) AS video_count,
vi.last_published_at,
COALESCE(vi.unwatched_count, 0) AS unwatched_count,
COALESCE(vi.watched_count, 0) AS watched_count,
COALESCE(vi.downloaded_count, 0) AS downloaded_count,
COALESCE(nc.new_count, 0) AS new_count,
CASE WHEN COALESCE(vi.video_count, 0) < 2 THEN NULL
ELSE (julianday(vi.last_published_at) - julianday(vi.oldest_published))
/ (vi.video_count - 1.0)
END AS upload_frequency_days,
(SELECT v2.youtube_video_id FROM videos v2
WHERE v2.channel_id = c.id ORDER BY v2.published_at DESC LIMIT 1) AS latest_video_id,
(SELECT v2.title FROM videos v2
WHERE v2.channel_id = c.id ORDER BY v2.published_at DESC LIMIT 1) AS latest_video_title
FROM channels c
JOIN user_channels uc ON c.id = uc.channel_id
WHERE uc.user_id = :user_id AND uc.status = 'followed'
"""
def _get_channel_or_404(db: Session, channel_id: int) -> Channel:
c = db.query(Channel).filter(Channel.id == channel_id).first()
if not c:
raise HTTPException(status_code=404, detail="Channel not found")
return c
def _index_channels_batch(channel_ids: list[int], user_id: int, delay: float = 1.5):
"""Run channel syncs sequentially with a polite delay between requests."""
import time
for i, cid in enumerate(channel_ids):
if i > 0:
time.sleep(delay)
_index_channel_task(cid, user_id)
def _index_channel_task(channel_id: int, user_id: int):
from ..database import SessionLocal
db = SessionLocal()
try:
channel = db.query(Channel).filter_by(id=channel_id).first()
if not channel:
return
result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id)
if not result:
return
ch_data = result.get("channel", {})
if ch_data:
for k, v in ch_data.items():
if hasattr(channel, k) and v is not None and v != "":
setattr(channel, k, v)
channel.crawled_at = datetime.utcnow()
db.merge(channel)
new_video_ids = []
for vdata in result.get("videos", []):
yt_id = vdata.get("youtube_video_id")
if not yt_id:
continue
existing = db.query(Video).filter_by(youtube_video_id=yt_id).first()
if not existing:
new_video = Video(
youtube_video_id=yt_id,
channel_id=channel.id,
title=vdata.get("title", ""),
description=vdata.get("description"),
thumbnail_url=vdata.get("thumbnail_url"),
duration_seconds=vdata.get("duration_seconds"),
published_at=vdata.get("published_at"),
tags=vdata.get("tags"),
category=vdata.get("category"),
)
db.add(new_video)
db.flush()
new_video_ids.append((yt_id, new_video.id))
else:
# Backfill missing metadata on existing videos
if existing.published_at is None and vdata.get("published_at"):
existing.published_at = vdata["published_at"]
if not existing.title and vdata.get("title"):
existing.title = vdata["title"]
if not existing.thumbnail_url and vdata.get("thumbnail_url"):
existing.thumbnail_url = vdata["thumbnail_url"]
if not existing.duration_seconds and vdata.get("duration_seconds"):
existing.duration_seconds = vdata["duration_seconds"]
if not existing.description and vdata.get("description"):
existing.description = vdata["description"]
db.commit()
# Auto-download new videos if setting says to
if new_video_ids and user_id:
uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
user_settings = db.query(UserSettings).filter_by(user_id=user_id).first()
global_auto = user_settings.auto_download_on_sync if user_settings else False
channel_auto = uc.auto_download if uc and uc.auto_download is not None else global_auto
if channel_auto:
quality = user_settings.preferred_quality if user_settings else "best"
from ..routers.downloads import _on_progress, _on_complete, _on_error
for yt_id, vid_id in new_video_ids:
existing_dl = db.query(Download).filter_by(
user_id=user_id, video_id=vid_id
).filter(Download.status.in_(["pending", "downloading", "complete"])).first()
if not existing_dl:
dl = Download(user_id=user_id, video_id=vid_id, status="pending")
db.add(dl)
db.flush()
import threading
t = threading.Thread(
target=ytdlp.start_download,
args=(yt_id, dl.id, _on_progress, _on_complete, _on_error, quality),
daemon=True,
)
t.start()
db.commit()
except Exception:
db.rollback()
finally:
db.close()
def _discovery_task(user_id: int):
from ..database import SessionLocal
from ..services.discovery import run_full_discovery
db = SessionLocal()
try:
run_full_discovery(db, user_id)
except Exception:
pass
finally:
db.close()
def _enrich_missing_task(limit: int = 20):
"""Fetch full metadata for videos that are missing a description."""
from ..database import SessionLocal
db = SessionLocal()
try:
rows = db.execute(
text("""
SELECT v.id, v.youtube_video_id FROM videos v
WHERE v.description IS NULL
ORDER BY
-- prioritise: followed-channel videos first, then discovery queue, then rest
(EXISTS (SELECT 1 FROM user_channels uc
WHERE uc.channel_id = v.channel_id AND uc.status = 'followed')) DESC,
(EXISTS (SELECT 1 FROM discovery_queue dq
WHERE dq.channel_id = v.channel_id)) DESC,
v.id DESC
LIMIT :limit
"""),
{"limit": limit},
).mappings().all()
for i, row in enumerate(rows):
if i > 0:
import time; time.sleep(2)
try:
meta = ytdlp.fetch_video_metadata(row["youtube_video_id"])
if meta:
vid = db.query(Video).filter_by(id=row["id"]).first()
if vid:
if meta.get("description") is not None:
vid.description = meta["description"] or ""
if not vid.tags and meta.get("tags"):
vid.tags = meta["tags"]
if not vid.category and meta.get("category"):
vid.category = meta["category"]
if not vid.chapters and meta.get("chapters"):
vid.chapters = meta["chapters"]
db.commit()
except Exception:
db.rollback()
finally:
db.close()
@router.get("/feed", response_model=list[VideoOut])
def channel_feed(
limit: int = 24,
offset: int = 0,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
rows = db.execute(
text("""
SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
v.duration_seconds, v.published_at,
c.id AS channel_id, c.name AS channel_name, c.youtube_channel_id AS channel_youtube_id,
COALESCE(uv.downloaded, 0) AS is_downloaded,
COALESCE(uv.watched, 0) AS is_watched,
COALESCE(uv.queued, 0) AS queued
FROM videos v
JOIN channels c ON v.channel_id = c.id
JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id AND uc.status = 'followed'
LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
ORDER BY v.published_at DESC
LIMIT :limit OFFSET :offset
"""),
{"user_id": current_user.id, "limit": limit, "offset": offset},
).mappings().all()
return [VideoOut(**dict(r)) for r in rows]
@router.post("/sync-all", status_code=202)
def sync_all_channels(
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
# Only sync channels not touched in the last 6 hours to avoid hammering YouTube
channels = db.execute(
text("""
SELECT c.id FROM channels c
JOIN user_channels uc ON c.id = uc.channel_id
WHERE uc.user_id = :uid AND uc.status = 'followed'
AND (c.crawled_at IS NULL OR c.crawled_at < datetime('now', '-6 hours'))
ORDER BY COALESCE(c.crawled_at, '1970-01-01') ASC
"""),
{"uid": current_user.id},
).mappings().all()
if channels:
ids = [row["id"] for row in channels]
background_tasks.add_task(_index_channels_batch, ids, current_user.id)
background_tasks.add_task(_discovery_task, current_user.id)
background_tasks.add_task(_enrich_missing_task, 5)
return {"indexing": len(channels)}
@router.post("/mark-seen", status_code=204)
def mark_channels_seen(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
db.execute(
text("UPDATE user_channels SET last_seen_at = :now WHERE user_id = :uid AND status = 'followed'"),
{"now": datetime.utcnow(), "uid": current_user.id},
)
db.commit()
@router.get("", response_model=list[ChannelOut])
def list_channels(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
rows = db.execute(
text(_CHANNEL_STATS_SELECT + "ORDER BY last_published_at DESC"),
{"user_id": current_user.id},
).mappings().all()
return [ChannelOut(**dict(r)) for r in rows]
# ── Channel Groups (must be before /{channel_id} to avoid route shadowing) ───
@router.get("/groups", response_model=list[ChannelGroupOut])
def list_groups(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
groups = db.query(ChannelGroup).filter_by(user_id=current_user.id).all()
result = []
for g in groups:
members = db.query(ChannelGroupMember).filter_by(group_id=g.id).all()
result.append(ChannelGroupOut(id=g.id, name=g.name, channel_ids=[m.channel_id for m in members]))
return result
@router.post("/groups", response_model=ChannelGroupOut, status_code=201)
def create_group(
body: dict,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
name = (body.get("name") or "").strip()
if not name:
raise HTTPException(status_code=400, detail="name required")
g = ChannelGroup(user_id=current_user.id, name=name)
db.add(g)
db.commit()
db.refresh(g)
return ChannelGroupOut(id=g.id, name=g.name, channel_ids=[])
@router.delete("/groups/{group_id}", status_code=204)
def delete_group(
group_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
if not g:
raise HTTPException(status_code=404, detail="Group not found")
db.delete(g)
db.commit()
@router.patch("/groups/{group_id}", response_model=ChannelGroupOut)
def rename_group(
group_id: int,
body: dict,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
if not g:
raise HTTPException(status_code=404, detail="Group not found")
name = (body.get("name") or "").strip()
if name:
g.name = name
db.commit()
members = db.query(ChannelGroupMember).filter_by(group_id=g.id).all()
return ChannelGroupOut(id=g.id, name=g.name, channel_ids=[m.channel_id for m in members])
@router.post("/groups/{group_id}/channels/{channel_id}", status_code=204)
def add_channel_to_group(
group_id: int,
channel_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
if not g:
raise HTTPException(status_code=404, detail="Group not found")
existing = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first()
if not existing:
db.add(ChannelGroupMember(group_id=group_id, channel_id=channel_id))
db.commit()
@router.delete("/groups/{group_id}/channels/{channel_id}", status_code=204)
def remove_channel_from_group_route(
group_id: int,
channel_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
m = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first()
if m:
db.delete(m)
db.commit()
class BulkChannelBody(BaseModel):
channel_ids: list[int]
action: str # "mute" | "unmute" | "unfollow"
@router.post("/bulk-action", status_code=200)
def bulk_channel_action(
body: BulkChannelBody,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if not body.channel_ids:
return {"ok": True}
placeholders = ",".join(str(int(i)) for i in body.channel_ids)
if body.action == "mute":
db.execute(
text(f"""
UPDATE user_channels SET muted_until = :until
WHERE user_id = :user_id AND channel_id IN ({placeholders})
"""),
{"until": datetime.utcnow() + timedelta(days=30), "user_id": current_user.id},
)
elif body.action == "unmute":
db.execute(
text(f"UPDATE user_channels SET muted_until = NULL WHERE user_id = :user_id AND channel_id IN ({placeholders})"),
{"user_id": current_user.id},
)
elif body.action == "unfollow":
db.execute(
text(f"DELETE FROM user_channels WHERE user_id = :user_id AND channel_id IN ({placeholders})"),
{"user_id": current_user.id},
)
db.commit()
return {"ok": True}
@router.get("/{channel_id}", response_model=ChannelOut)
def get_channel(
channel_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
row = db.execute(
text("""
SELECT c.*, uc.status, uc.auto_download, uc.muted_until,
(SELECT COUNT(*) FROM videos WHERE channel_id = c.id) AS video_count,
(SELECT MAX(v.published_at) FROM videos v WHERE v.channel_id = c.id) AS last_published_at,
(SELECT COUNT(*) FROM videos v
LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
WHERE v.channel_id = c.id AND COALESCE(uv.watched, 0) = 0) AS unwatched_count,
(SELECT COUNT(*) FROM videos v
JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
WHERE v.channel_id = c.id AND uv.watched = 1) AS watched_count,
(SELECT COUNT(*) FROM videos v
JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
WHERE v.channel_id = c.id AND uv.downloaded = 1) AS downloaded_count,
0 AS new_count,
(SELECT v.youtube_video_id FROM videos v
WHERE v.channel_id = c.id ORDER BY v.published_at DESC LIMIT 1) AS latest_video_id,
(SELECT v.title FROM videos v
WHERE v.channel_id = c.id ORDER BY v.published_at DESC LIMIT 1) AS latest_video_title
FROM channels c
LEFT JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id
WHERE c.id = :channel_id
"""),
{"user_id": current_user.id, "channel_id": channel_id},
).mappings().first()
if not row:
raise HTTPException(status_code=404, detail="Channel not found")
return ChannelOut(**dict(row))
@router.get("/{channel_id}/videos", response_model=list[VideoOut])
def get_channel_videos(
channel_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
_get_channel_or_404(db, channel_id)
rows = db.execute(
text("""
SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
v.duration_seconds, v.published_at,
COALESCE(uv.downloaded, 0) AS is_downloaded,
COALESCE(uv.watched, 0) AS is_watched
FROM videos v
LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
WHERE v.channel_id = :channel_id
ORDER BY v.published_at DESC
"""),
{"user_id": current_user.id, "channel_id": channel_id},
).mappings().all()
return [VideoOut(**dict(r)) for r in rows]
@router.post("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT)
def follow_channel(
channel_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
_get_channel_or_404(db, channel_id)
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
if uc:
uc.status = "followed"
else:
db.add(UserChannel(user_id=current_user.id, channel_id=channel_id, status="followed"))
db.commit()
@router.delete("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT)
def unfollow_channel(
channel_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
if uc:
db.delete(uc)
db.commit()
@router.patch("/{channel_id}/auto-download", status_code=200)
def set_channel_auto_download(
channel_id: int,
body: dict,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
if not uc:
raise HTTPException(status_code=404, detail="Not following this channel")
value = body.get("auto_download") # True / False / None
uc.auto_download = value
db.commit()
return {"auto_download": uc.auto_download}
@router.post("/{channel_id}/index", status_code=status.HTTP_202_ACCEPTED)
def index_channel(
channel_id: int,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
_get_channel_or_404(db, channel_id)
background_tasks.add_task(_index_channel_task, channel_id, current_user.id)
return {"detail": "Indexing started"}
@router.post("/follow-bulk", status_code=200)
def follow_bulk(
body: dict,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Follow a large list of channel handles/IDs without hitting YouTube.
Creates stub Channel records for unknowns and UserChannel rows immediately.
Metadata (name, thumbnail, videos) fills in when the user hits Sync All.
"""
handles = body.get("handles", [])
if not handles or not isinstance(handles, list):
raise HTTPException(status_code=400, detail="handles list required")
followed = 0
already = 0
created = 0
for handle in handles:
handle = str(handle).strip()
if not handle:
continue
channel = db.query(Channel).filter_by(youtube_channel_id=handle).first()
if not channel:
# Stub — name defaults to handle, filled in on next index
channel = Channel(
youtube_channel_id=handle,
name=handle.lstrip("@"),
)
db.add(channel)
db.flush()
created += 1
uc = db.query(UserChannel).filter_by(
user_id=current_user.id, channel_id=channel.id
).first()
if uc:
if uc.status != "followed":
uc.status = "followed"
followed += 1
else:
already += 1
else:
db.add(UserChannel(
user_id=current_user.id,
channel_id=channel.id,
status="followed",
))
followed += 1
db.commit()
return {"followed": followed, "already_following": already, "new_channels": created}
@router.patch("/{channel_id}/notes", status_code=200)
def update_channel_notes(
channel_id: int,
body: dict,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
if not uc:
raise HTTPException(status_code=404, detail="Not following this channel")
uc.notes = body.get("notes", "") or ""
db.commit()
return {"ok": True}
@router.post("/{channel_id}/mute", status_code=204)
def mute_channel(
channel_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
if not uc:
raise HTTPException(status_code=404, detail="Not following this channel")
uc.muted_until = datetime.utcnow() + timedelta(days=30)
db.commit()
@router.delete("/{channel_id}/mute", status_code=204)
def unmute_channel(
channel_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
if uc:
uc.muted_until = None
db.commit()
@router.post("/follow-by-url", status_code=status.HTTP_201_CREATED)
def follow_by_url(
body: dict,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
yt_channel_id = body.get("youtube_channel_id") or body.get("channel_id")
if not yt_channel_id:
raise HTTPException(status_code=400, detail="youtube_channel_id required")
channel = db.query(Channel).filter_by(youtube_channel_id=yt_channel_id).first()
if not channel:
meta = ytdlp.fetch_channel_metadata(yt_channel_id, max_videos=30)
if not meta or not meta.get("channel"):
raise HTTPException(status_code=404, detail="Channel not found on YouTube")
ch_data = meta["channel"]
channel = Channel(**{k: v for k, v in ch_data.items() if hasattr(Channel, k)})
channel.crawled_at = datetime.utcnow()
db.add(channel)
db.flush()
for vdata in meta.get("videos", []):
yt_id = vdata.get("youtube_video_id")
if not yt_id:
continue
if not db.query(Video).filter_by(youtube_video_id=yt_id).first():
db.add(Video(
youtube_video_id=yt_id,
channel_id=channel.id,
title=vdata.get("title", ""),
description=vdata.get("description"),
thumbnail_url=vdata.get("thumbnail_url"),
duration_seconds=vdata.get("duration_seconds"),
published_at=vdata.get("published_at"),
tags=vdata.get("tags"),
category=vdata.get("category"),
))
db.commit()
db.refresh(channel)
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel.id).first()
if uc:
uc.status = "followed"
else:
db.add(UserChannel(user_id=current_user.id, channel_id=channel.id, status="followed"))
db.commit()
background_tasks.add_task(_discovery_task, current_user.id)
return {"channel_id": channel.id, "name": channel.name}