Files
youclonedl/backend/routers/channels.py
Mattias Thall 2f37072187 Fix popular fetch and improve date/view_count coverage
Popular fetch now does a two-phase approach: fast flat-playlist to get
IDs in popularity order, then parallel full metadata fetch (8 workers)
to get real view_count and published_at for each video. Previously
flat-playlist mode returned timestamp/view_count as null.

Enrich task now also backfills published_at and view_count (not just
description). Startup limit 3→50, enrichment sleep 2s→0.5s.

Raise all thread pool sizes to match an 8-core machine:
- Discovery search: 5→8 workers
- Graph signal: 4→8 workers
- Popular fetch: 5→8 workers
- Download semaphore default 3→6, cap 10→16

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 22:36:18 +02:00

1055 lines
39 KiB
Python

import json
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import text
from ..auth_utils import get_current_user
from ..database import get_db
from ..models import Channel, ChannelGroup, ChannelGroupMember, Download, User, UserChannel, UserSettings, UserVideo, Video
from ..services import ytdlp
# Every endpoint in this module registers on this router; where it is mounted
# (and under which URL prefix) is decided outside this file.
router = APIRouter()
class ChannelOut(BaseModel):
    """A channel as returned to the client, merged with the caller's follow state.

    Built from raw SQL row mappings by the list and detail endpoints.
    ``from_attributes`` additionally allows validating attribute-style objects.
    The counters default to 0 and most derived/per-user fields default to None,
    so rows that do not select them still validate.
    """
    id: int
    youtube_channel_id: str
    name: str
    description: Optional[str]
    thumbnail_url: Optional[str]
    banner_url: Optional[str]
    crawled_at: Optional[datetime]
    status: Optional[str]  # user_channels.status for the caller, e.g. "followed"
    auto_download: Optional[bool] = None  # None = fall back to the user's global setting
    subscriber_count: Optional[int] = None
    video_count: int = 0
    unwatched_count: int = 0
    watched_count: int = 0
    downloaded_count: int = 0
    last_published_at: Optional[datetime] = None
    new_count: int = 0  # videos indexed after the user's last_seen_at
    latest_video_id: Optional[str] = None
    latest_video_title: Optional[str] = None
    muted_until: Optional[datetime] = None
    upload_frequency_days: Optional[float] = None  # average days between uploads
    notes: Optional[str] = ""  # the caller's free-text notes on this channel
    model_config = {"from_attributes": True}
class ChannelGroupOut(BaseModel):
    """A user-defined channel group and the channels it contains."""
    id: int
    name: str
    channel_ids: list[int] = []  # internal Channel ids (ChannelGroupMember.channel_id)
    model_config = {"from_attributes": True}
class VideoOut(BaseModel):
    """A video row as returned by the feed and channel-video listings.

    ``is_downloaded``, ``is_watched`` and ``queued`` are the caller's per-user
    flags. They default to False when a query does not select them.
    """
    id: int
    youtube_video_id: str
    title: str
    thumbnail_url: Optional[str]
    duration_seconds: Optional[int]
    published_at: Optional[datetime]
    channel_id: Optional[int] = None
    channel_name: Optional[str] = None
    channel_youtube_id: Optional[str] = None
    is_downloaded: bool = False
    is_watched: bool = False
    queued: bool = False
    view_count: Optional[int] = None
    model_config = {"from_attributes": True}
def _get_channel_or_404(db: Session, channel_id: int) -> Channel:
    """Load the Channel whose primary key is ``channel_id``.

    Raises:
        HTTPException: 404 "Channel not found" when there is no such row.
    """
    found = db.query(Channel).filter_by(id=channel_id).first()
    if not found:
        raise HTTPException(status_code=404, detail="Channel not found")
    return found
def _index_channels_batch(channel_ids: list[int], user_id: int, delay: float = 1.5):
"""Run channel syncs sequentially with a polite delay between requests."""
import time
for i, cid in enumerate(channel_ids):
if i > 0:
time.sleep(delay)
_index_channel_task(cid, user_id)
def _index_channel_task(channel_id: int, user_id: int, max_videos: int = 30):
    """Background task: re-crawl one channel and store what YouTube returns.

    Steps:
      1. Fetch the channel and up to ``max_videos`` videos with
         ``ytdlp.fetch_channel_metadata``.
      2. Copy non-empty channel fields onto the Channel row and stamp
         ``crawled_at``.
      3. Insert unseen videos. For videos already stored, only fill fields
         that are still empty.
      4. If auto-download applies for ``user_id`` (the per-channel override,
         else the user's ``auto_download_on_sync``), create pending Download
         rows for the new videos and COMMIT them before starting the download
         threads. Those threads run concurrently and presumably look the rows
         up through other sessions, which cannot see uncommitted rows.

    Uses its own session. Any failure is rolled back and logged instead of
    being raised.

    Args:
        channel_id: Primary key of the Channel to crawl.
        user_id: User whose auto-download settings apply; falsy disables
            auto-download.
        max_videos: Passed through to ``ytdlp.fetch_channel_metadata``.
    """
    import logging
    from ..database import SessionLocal

    db = SessionLocal()
    try:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if not channel:
            return
        result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id, max_videos=max_videos)
        if not result:
            return
        for key, value in (result.get("channel") or {}).items():
            if hasattr(channel, key) and value is not None and value != "":
                setattr(channel, key, value)
        channel.crawled_at = datetime.utcnow()
        cid = channel.id
        new_video_ids = _index_store_videos(db, cid, result.get("videos", []))
        db.commit()

        if not (new_video_ids and user_id):
            return
        jobs = _index_queue_auto_downloads(db, user_id, cid, new_video_ids)
        if not jobs:
            return
        db.commit()  # make the Download rows visible before any thread starts
        _index_start_downloads(jobs)
    except Exception:
        db.rollback()
        logging.getLogger(__name__).exception("Indexing channel %s failed", channel_id)
    finally:
        db.close()


def _index_store_videos(db, channel_id, videos):
    """Insert unseen videos and backfill empty fields on already-known ones.

    ``db`` is an open Session. Returns ``(youtube_video_id, Video.id)`` pairs
    for the rows that were newly inserted.
    """
    new_video_ids = []
    for vdata in videos:
        yt_id = vdata.get("youtube_video_id")
        if not yt_id:
            continue
        existing = db.query(Video).filter_by(youtube_video_id=yt_id).first()
        if not existing:
            new_video = Video(
                youtube_video_id=yt_id,
                channel_id=channel_id,
                title=vdata.get("title", ""),
                description=vdata.get("description"),
                thumbnail_url=vdata.get("thumbnail_url"),
                duration_seconds=vdata.get("duration_seconds"),
                published_at=vdata.get("published_at"),
                tags=vdata.get("tags"),
                category=vdata.get("category"),
            )
            db.add(new_video)
            db.flush()  # assigns new_video.id
            new_video_ids.append((yt_id, new_video.id))
            continue
        # Only fill gaps; never overwrite data that is already stored.
        if existing.published_at is None and vdata.get("published_at"):
            existing.published_at = vdata["published_at"]
        if not existing.title and vdata.get("title"):
            existing.title = vdata["title"]
        if not existing.thumbnail_url and vdata.get("thumbnail_url"):
            existing.thumbnail_url = vdata["thumbnail_url"]
        if not existing.duration_seconds and vdata.get("duration_seconds"):
            existing.duration_seconds = vdata["duration_seconds"]
        if not existing.description and vdata.get("description"):
            existing.description = vdata["description"]
    return new_video_ids


def _index_queue_auto_downloads(db, user_id, channel_id, new_video_ids):
    """Create pending Download rows for new videos when auto-download applies.

    The channel's ``auto_download`` flag wins when it is not None. Otherwise
    the user's ``auto_download_on_sync`` setting decides (False without
    settings). Videos that already have a pending, downloading or complete
    download are skipped. Rows are flushed, not committed.

    Returns ``(youtube_video_id, download_id, quality, subtitle_langs)``
    tuples for the caller to start after committing.
    """
    uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel_id).first()
    user_settings = db.query(UserSettings).filter_by(user_id=user_id).first()
    global_auto = user_settings.auto_download_on_sync if user_settings else False
    channel_auto = uc.auto_download if uc and uc.auto_download is not None else global_auto
    if not channel_auto:
        return []
    quality = user_settings.preferred_quality if user_settings else "best"
    subtitle_langs = (user_settings.subtitle_langs or "") if user_settings else ""
    jobs = []
    for yt_id, vid_id in new_video_ids:
        existing_dl = (
            db.query(Download)
            .filter_by(user_id=user_id, video_id=vid_id)
            .filter(Download.status.in_(["pending", "downloading", "complete"]))
            .first()
        )
        if existing_dl:
            continue
        dl = Download(user_id=user_id, video_id=vid_id, status="pending")
        db.add(dl)
        db.flush()  # assigns dl.id
        jobs.append((yt_id, dl.id, quality, subtitle_langs))
    return jobs


def _index_start_downloads(jobs):
    """Start one daemon thread running ``ytdlp.start_download`` per queued job."""
    if not jobs:
        return
    import threading
    from ..routers.downloads import _on_progress, _on_complete, _on_error

    for yt_id, dl_id, quality, subtitle_langs in jobs:
        threading.Thread(
            target=ytdlp.start_download,
            args=(yt_id, dl_id, _on_progress, _on_complete, _on_error, quality, subtitle_langs),
            daemon=True,
        ).start()
def _discovery_task(user_id: int):
    """Background task: run the full discovery pipeline for ``user_id``.

    Uses its own session. A failure is rolled back and logged rather than
    silently discarded, so a broken discovery run shows up in the logs
    without affecting the request that scheduled it.
    """
    import logging
    from ..database import SessionLocal
    from ..services.discovery import run_full_discovery

    db = SessionLocal()
    try:
        run_full_discovery(db, user_id)
    except Exception:
        db.rollback()
        logging.getLogger(__name__).exception("Discovery run failed for user %s", user_id)
    finally:
        db.close()
def _enrich_missing_task(limit: int = 20, delay: float = 0.5):
    """Fetch full metadata for videos missing description, published_at, or view_count.

    Selects up to ``limit`` such videos. Videos from channels that any user
    follows come first, then channels in the discovery queue, then the
    newest rows. Each video is fetched with ``ytdlp.fetch_video_metadata``,
    with a ``delay``-second pause between fetches.

    ``description`` is set whenever the fetch returns one. published_at,
    view_count, tags, category and chapters are only filled when empty.
    Each video is committed on its own, so one failure does not discard the
    others; failures are rolled back and logged.
    """
    import logging
    import time
    from ..database import SessionLocal

    log = logging.getLogger(__name__)
    db = SessionLocal()
    try:
        # NOTE(review): rows whose fetch keeps failing (or for which YouTube
        # never reports a view_count) keep matching this WHERE clause with the
        # same priority, so they are likely re-selected on every run ahead of
        # other rows; rotating past them would need a "last attempted" column.
        rows = db.execute(
            text("""
                SELECT v.id, v.youtube_video_id FROM videos v
                WHERE v.description IS NULL OR v.published_at IS NULL OR v.view_count IS NULL
                ORDER BY
                    (EXISTS (SELECT 1 FROM user_channels uc
                             WHERE uc.channel_id = v.channel_id AND uc.status = 'followed')) DESC,
                    (EXISTS (SELECT 1 FROM discovery_queue dq
                             WHERE dq.channel_id = v.channel_id)) DESC,
                    v.id DESC
                LIMIT :limit
            """),
            {"limit": limit},
        ).mappings().all()
        for i, row in enumerate(rows):
            if i > 0:
                time.sleep(delay)  # space out requests to YouTube
            try:
                meta = ytdlp.fetch_video_metadata(row["youtube_video_id"])
                if not meta:
                    continue
                vid = db.query(Video).filter_by(id=row["id"]).first()
                if not vid:
                    continue
                if meta.get("description") is not None:
                    vid.description = meta["description"] or ""
                if not vid.published_at and meta.get("published_at"):
                    vid.published_at = meta["published_at"]
                if vid.view_count is None and meta.get("view_count") is not None:
                    vid.view_count = meta["view_count"]
                if not vid.tags and meta.get("tags"):
                    vid.tags = meta["tags"]
                if not vid.category and meta.get("category"):
                    vid.category = meta["category"]
                if not vid.chapters and meta.get("chapters"):
                    vid.chapters = meta["chapters"]
                db.commit()
            except Exception:
                db.rollback()
                log.exception("Enriching video %s failed", row["youtube_video_id"])
    finally:
        db.close()
@router.get("/feed", response_model=list[VideoOut])
def channel_feed(
    limit: int = 24,
    offset: int = 0,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of videos from every channel the user follows, newest first.

    Each row carries its channel, view count and the caller's
    downloaded/watched/queued flags (0 when the user has no user_videos row).
    ``v.id`` breaks ties between equal ``published_at`` values, so consecutive
    LIMIT/OFFSET pages neither repeat nor skip videos.
    """
    rows = db.execute(
        text("""
            SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
                   v.duration_seconds, v.published_at, v.view_count,
                   c.id AS channel_id, c.name AS channel_name, c.youtube_channel_id AS channel_youtube_id,
                   COALESCE(uv.downloaded, 0) AS is_downloaded,
                   COALESCE(uv.watched, 0) AS is_watched,
                   COALESCE(uv.queued, 0) AS queued
            FROM videos v
            JOIN channels c ON v.channel_id = c.id
            JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id AND uc.status = 'followed'
            LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
            ORDER BY v.published_at DESC, v.id DESC
            LIMIT :limit OFFSET :offset
        """),
        {"user_id": current_user.id, "limit": limit, "offset": offset},
    ).mappings().all()
    return [VideoOut(**dict(r)) for r in rows]
@router.post("/sync-all", status_code=202)
def sync_all_channels(
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Schedule a re-crawl of the user's stale followed channels.

    A channel counts as stale when it was never crawled or was last crawled
    more than 6 hours ago; this limits how often YouTube is hit. The least
    recently crawled channels go first. A discovery run and enrichment of 30
    videos are scheduled on every call, even when nothing is stale.

    Returns ``{"indexing": <number of channels scheduled>}``.
    """
    stale_query = text("""
        SELECT c.id FROM channels c
        JOIN user_channels uc ON c.id = uc.channel_id
        WHERE uc.user_id = :uid AND uc.status = 'followed'
        AND (c.crawled_at IS NULL OR c.crawled_at < datetime('now', '-6 hours'))
        ORDER BY COALESCE(c.crawled_at, '1970-01-01') ASC
    """)
    stale_ids = [
        row["id"]
        for row in db.execute(stale_query, {"uid": current_user.id}).mappings().all()
    ]
    if stale_ids:
        background_tasks.add_task(_index_channels_batch, stale_ids, current_user.id)
    background_tasks.add_task(_discovery_task, current_user.id)
    background_tasks.add_task(_enrich_missing_task, 30)
    return {"indexing": len(stale_ids)}
@router.post("/mark-seen", status_code=204)
def mark_channels_seen(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Stamp all of the user's followed channels as seen right now.

    Afterwards only videos indexed later count toward ``new_count``.
    """
    statement = text("UPDATE user_channels SET last_seen_at = :now WHERE user_id = :uid AND status = 'followed'")
    bind_values = {"now": datetime.utcnow(), "uid": current_user.id}
    db.execute(statement, bind_values)
    db.commit()
@router.get("", response_model=list[ChannelOut])
def list_channels(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return every channel the user follows, most recent upload first.

    Runs four set-based queries (channel rows, per-channel video stats,
    new-video counts, latest video) and merges them in Python, instead of one
    query per channel. ``upload_frequency_days`` is the average gap between
    dated uploads, (newest - oldest) / (number of dated videos - 1). It is
    None when fewer than two videos have a publish date.
    """
    uid = current_user.id

    # Step 1 — channel rows + user_channel metadata (fast, no video stats)
    ch_rows = db.execute(
        text("""
            SELECT c.id, c.youtube_channel_id, c.name, c.description,
                   c.thumbnail_url, c.banner_url, c.subscriber_count, c.crawled_at,
                   uc.status, uc.auto_download, uc.muted_until, uc.notes, uc.last_seen_at
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :uid AND uc.status = 'followed'
        """),
        {"uid": uid},
    ).mappings().all()
    if not ch_rows:
        return []
    # The ids are integer primary keys from the query above, so inlining them
    # into the IN (...) lists below cannot inject SQL.
    id_csv = ",".join(str(int(r["id"])) for r in ch_rows)

    # Step 2 — aggregated video stats for all channels in one query.
    # COUNT(v.published_at) counts only dated videos. It is the right
    # denominator for the upload frequency because MIN/MAX ignore NULL dates.
    vstats = {
        r["channel_id"]: r
        for r in db.execute(
            text(f"""
                SELECT v.channel_id,
                       COUNT(*) AS video_count,
                       COUNT(v.published_at) AS dated_count,
                       MAX(v.published_at) AS last_published_at,
                       julianday(MAX(v.published_at)) - julianday(MIN(v.published_at)) AS date_span_days,
                       SUM(CASE WHEN COALESCE(uv.watched, 0) = 0 THEN 1 ELSE 0 END) AS unwatched_count,
                       SUM(CASE WHEN uv.watched = 1 THEN 1 ELSE 0 END) AS watched_count,
                       SUM(CASE WHEN uv.downloaded = 1 THEN 1 ELSE 0 END) AS downloaded_count
                FROM videos v
                LEFT JOIN user_videos uv ON uv.video_id = v.id AND uv.user_id = :uid
                WHERE v.channel_id IN ({id_csv})
                GROUP BY v.channel_id
            """),
            {"uid": uid},
        ).mappings().all()
    }

    # Step 3 — new-video count per channel (videos indexed after last_seen_at)
    new_counts = {
        r["channel_id"]: r["new_count"]
        for r in db.execute(
            text(f"""
                SELECT v.channel_id, COUNT(*) AS new_count
                FROM videos v
                JOIN user_channels uc
                  ON uc.channel_id = v.channel_id
                 AND uc.user_id = :uid
                WHERE v.channel_id IN ({id_csv})
                  AND (uc.last_seen_at IS NULL OR v.indexed_at > uc.last_seen_at)
                GROUP BY v.channel_id
            """),
            {"uid": uid},
        ).mappings().all()
    }

    # Step 4 — latest video id + title per channel (derived-table join, no correlated subquery)
    latest = {
        r["channel_id"]: r
        for r in db.execute(
            text(f"""
                SELECT v.channel_id,
                       v.youtube_video_id AS latest_video_id,
                       v.title AS latest_video_title
                FROM videos v
                JOIN (
                    SELECT channel_id, MAX(published_at) AS max_pub
                    FROM videos
                    WHERE channel_id IN ({id_csv})
                    GROUP BY channel_id
                ) m ON v.channel_id = m.channel_id AND v.published_at = m.max_pub
                GROUP BY v.channel_id
            """),
        ).mappings().all()
    }

    # Merge and build response
    result = []
    for r in ch_rows:
        cid = r["id"]
        vs = vstats.get(cid) or {}
        lv = latest.get(cid) or {}
        span = vs.get("date_span_days")
        dated = vs.get("dated_count") or 0
        freq = span / (dated - 1.0) if dated >= 2 and span is not None else None
        result.append(ChannelOut(
            id=cid,
            youtube_channel_id=r["youtube_channel_id"],
            name=r["name"],
            description=r["description"],
            thumbnail_url=r["thumbnail_url"],
            banner_url=r.get("banner_url"),
            subscriber_count=r.get("subscriber_count"),
            crawled_at=r.get("crawled_at"),
            status=r["status"],
            auto_download=r.get("auto_download"),
            muted_until=r.get("muted_until"),
            notes=r.get("notes") or "",
            video_count=vs.get("video_count") or 0,
            last_published_at=vs.get("last_published_at"),
            unwatched_count=vs.get("unwatched_count") or 0,
            watched_count=vs.get("watched_count") or 0,
            downloaded_count=vs.get("downloaded_count") or 0,
            new_count=new_counts.get(cid, 0),
            latest_video_id=lv.get("latest_video_id"),
            latest_video_title=lv.get("latest_video_title"),
            upload_frequency_days=freq,
        ))
    result.sort(key=lambda c: c.last_published_at or datetime.min, reverse=True)
    return result
# ── Channel Groups (must be before /{channel_id} to avoid route shadowing) ───
@router.get("/groups", response_model=list[ChannelGroupOut])
def list_groups(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the user's channel groups, each with its member channel ids.

    Members of all groups are loaded in a single query instead of one query
    per group.
    """
    from collections import defaultdict

    groups = db.query(ChannelGroup).filter_by(user_id=current_user.id).all()
    if not groups:
        return []
    members_by_group = defaultdict(list)
    members = (
        db.query(ChannelGroupMember)
        .filter(ChannelGroupMember.group_id.in_([g.id for g in groups]))
        .all()
    )
    for m in members:
        members_by_group[m.group_id].append(m.channel_id)
    return [
        ChannelGroupOut(id=g.id, name=g.name, channel_ids=members_by_group[g.id])
        for g in groups
    ]
@router.post("/groups", response_model=ChannelGroupOut, status_code=201)
def create_group(
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create an empty channel group named ``body["name"]`` (whitespace-stripped).

    Raises:
        HTTPException: 400 if ``name`` is not a string, or is missing/blank.
    """
    raw_name = body.get("name")
    # A non-string name used to crash on .strip() with a 500.
    if raw_name is not None and not isinstance(raw_name, str):
        raise HTTPException(status_code=400, detail="name must be a string")
    name = (raw_name or "").strip()
    if not name:
        raise HTTPException(status_code=400, detail="name required")
    g = ChannelGroup(user_id=current_user.id, name=name)
    db.add(g)
    db.commit()
    db.refresh(g)
    return ChannelGroupOut(id=g.id, name=g.name, channel_ids=[])
@router.delete("/groups/{group_id}", status_code=204)
def delete_group(
    group_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete one of the user's channel groups together with its memberships.

    Membership rows are deleted explicitly rather than relying on a cascade
    this file cannot see. SQLite may reuse a freed integer id, so orphaned
    rows could otherwise attach themselves to a later group.

    Raises:
        HTTPException: 404 if the group does not exist or belongs to someone else.
    """
    g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
    if not g:
        raise HTTPException(status_code=404, detail="Group not found")
    db.query(ChannelGroupMember).filter_by(group_id=g.id).delete()
    db.delete(g)
    db.commit()
@router.patch("/groups/{group_id}", response_model=ChannelGroupOut)
def rename_group(
    group_id: int,
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Rename a group to ``body["name"]``; a missing or blank name leaves it unchanged.

    Returns the group with its current member channel ids.

    Raises:
        HTTPException: 404 if the group is not the user's; 400 if ``name`` is
            present but not a string.
    """
    g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
    if not g:
        raise HTTPException(status_code=404, detail="Group not found")
    raw_name = body.get("name")
    # A non-string name used to crash on .strip() with a 500.
    if raw_name is not None and not isinstance(raw_name, str):
        raise HTTPException(status_code=400, detail="name must be a string")
    name = (raw_name or "").strip()
    if name:
        g.name = name
        db.commit()
    members = db.query(ChannelGroupMember).filter_by(group_id=g.id).all()
    return ChannelGroupOut(id=g.id, name=g.name, channel_ids=[m.channel_id for m in members])
@router.post("/groups/{group_id}/channels/{channel_id}", status_code=204)
def add_channel_to_group(
    group_id: int,
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Add a channel to one of the user's groups; adding it twice is a no-op.

    Raises:
        HTTPException: 404 if the group is not the user's, or if the channel
            does not exist (previously a membership to a missing channel was
            stored).
    """
    g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
    if not g:
        raise HTTPException(status_code=404, detail="Group not found")
    _get_channel_or_404(db, channel_id)
    existing = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first()
    if not existing:
        db.add(ChannelGroupMember(group_id=group_id, channel_id=channel_id))
        db.commit()
@router.delete("/groups/{group_id}/channels/{channel_id}", status_code=204)
def remove_channel_from_group_route(
    group_id: int,
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove a channel from one of the user's groups (no-op if not a member).

    The group's ownership is checked first. Without that check any
    authenticated user could remove channels from another user's group.

    Raises:
        HTTPException: 404 if the group does not exist or belongs to someone else.
    """
    g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
    if not g:
        raise HTTPException(status_code=404, detail="Group not found")
    m = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first()
    if m:
        db.delete(m)
        db.commit()
class BulkChannelBody(BaseModel):
    """Request body for the ``/bulk-action`` endpoint."""
    channel_ids: list[int]  # internal Channel ids to act on
    action: str  # "mute" | "unmute" | "unfollow"
@router.post("/bulk-action", status_code=200)
def bulk_channel_action(
    body: BulkChannelBody,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply one action to many of the user's channels at once.

    ``mute`` sets muted_until to 30 days from now, ``unmute`` clears it, and
    ``unfollow`` deletes the user_channels rows. Only the caller's rows are
    touched.

    Raises:
        HTTPException: 400 for an unknown action. Previously that was
            silently ignored and still reported ``{"ok": True}``.
    """
    if body.action not in ("mute", "unmute", "unfollow"):
        raise HTTPException(status_code=400, detail=f"Unknown action {body.action!r}")
    if not body.channel_ids:
        return {"ok": True}
    # int() re-validates every id before it is inlined into the SQL.
    placeholders = ",".join(str(int(i)) for i in body.channel_ids)
    params = {"user_id": current_user.id}
    if body.action == "mute":
        sql = f"UPDATE user_channels SET muted_until = :until WHERE user_id = :user_id AND channel_id IN ({placeholders})"
        params["until"] = datetime.utcnow() + timedelta(days=30)
    elif body.action == "unmute":
        sql = f"UPDATE user_channels SET muted_until = NULL WHERE user_id = :user_id AND channel_id IN ({placeholders})"
    else:  # "unfollow"
        sql = f"DELETE FROM user_channels WHERE user_id = :user_id AND channel_id IN ({placeholders})"
    db.execute(text(sql), params)
    db.commit()
    return {"ok": True}
@router.get("/{channel_id}", response_model=ChannelOut)
def get_channel(
    channel_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one channel with the caller's follow state, notes and video stats.

    Also works for channels the user does not follow, because user_channels
    is LEFT JOINed. ``new_count`` is always 0 here. ``upload_frequency_days``
    is computed the same way as in the channel list: the span between the
    oldest and newest dated video divided by (dated videos - 1). A re-index
    is scheduled in the background when the channel was never crawled or was
    last crawled more than an hour ago.

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    row = db.execute(
        text("""
            SELECT c.*, uc.status, uc.auto_download, uc.muted_until,
                   COALESCE(uc.notes, '') AS notes,
                   (SELECT COUNT(*) FROM videos WHERE channel_id = c.id) AS video_count,
                   (SELECT MAX(v.published_at) FROM videos v WHERE v.channel_id = c.id) AS last_published_at,
                   (SELECT COUNT(*) FROM videos v
                      LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
                     WHERE v.channel_id = c.id AND COALESCE(uv.watched, 0) = 0) AS unwatched_count,
                   (SELECT COUNT(*) FROM videos v
                      JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
                     WHERE v.channel_id = c.id AND uv.watched = 1) AS watched_count,
                   (SELECT COUNT(*) FROM videos v
                      JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
                     WHERE v.channel_id = c.id AND uv.downloaded = 1) AS downloaded_count,
                   0 AS new_count,
                   (SELECT v.youtube_video_id FROM videos v
                     WHERE v.channel_id = c.id ORDER BY v.published_at DESC LIMIT 1) AS latest_video_id,
                   (SELECT v.title FROM videos v
                     WHERE v.channel_id = c.id ORDER BY v.published_at DESC LIMIT 1) AS latest_video_title,
                   (SELECT CASE WHEN COUNT(v.published_at) >= 2
                                THEN (julianday(MAX(v.published_at)) - julianday(MIN(v.published_at)))
                                     / (COUNT(v.published_at) - 1.0)
                           END
                      FROM videos v WHERE v.channel_id = c.id) AS upload_frequency_days
            FROM channels c
            LEFT JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id
            WHERE c.id = :channel_id
        """),
        {"user_id": current_user.id, "channel_id": channel_id},
    ).mappings().first()
    if not row:
        raise HTTPException(status_code=404, detail="Channel not found")

    # Re-index in the background if stale (not crawled in the last hour)
    stale = True
    crawled_at_raw = row.get("crawled_at")
    if crawled_at_raw:
        try:
            crawled_at = (
                crawled_at_raw if isinstance(crawled_at_raw, datetime)
                else datetime.fromisoformat(str(crawled_at_raw))
            )
            stale = (datetime.utcnow() - crawled_at).total_seconds() > 3600
        except (TypeError, ValueError):
            # Unparseable or timezone-aware timestamp: treat the channel as stale.
            pass
    if stale:
        background_tasks.add_task(_index_channel_task, channel_id, current_user.id)
    return ChannelOut(**dict(row))
@router.get("/{channel_id}/videos", response_model=list[VideoOut])
def get_channel_videos(
    channel_id: int,
    sort: str = "newest",
    offset: int = 0,
    limit: int = 60,
    q: str = "",
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of a channel's stored videos with the caller's flags.

    ``sort`` is one of newest/oldest/title/unwatched/popular; anything else
    falls back to newest. A non-blank ``q`` keeps videos whose title or
    description contains it. LIKE wildcards (``%``, ``_``) in ``q`` are
    escaped, so they match literally.

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    # Each ordering ends with v.id so rows with equal sort keys keep a stable
    # order across LIMIT/OFFSET pages. Only these whitelisted fragments are
    # interpolated into the SQL.
    order = {
        "newest": "v.published_at DESC NULLS LAST, v.id DESC",
        "oldest": "v.published_at ASC NULLS LAST, v.id ASC",
        "title": "v.title ASC, v.id ASC",
        "unwatched": "COALESCE(uv.watched, 0) ASC, v.published_at DESC NULLS LAST, v.id DESC",
        "popular": "v.view_count DESC NULLS LAST, v.id DESC",
    }.get(sort, "v.published_at DESC NULLS LAST, v.id DESC")
    params: dict = {"user_id": current_user.id, "channel_id": channel_id, "limit": limit, "offset": offset}
    q_clause = ""
    term = q.strip()
    if term:
        escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        q_clause = "AND (v.title LIKE :q ESCAPE '\\' OR v.description LIKE :q ESCAPE '\\')"
        params["q"] = f"%{escaped}%"
    rows = db.execute(
        text(f"""
            SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
                   v.duration_seconds, v.published_at, v.view_count,
                   COALESCE(uv.downloaded, 0) AS is_downloaded,
                   COALESCE(uv.watched, 0) AS is_watched,
                   COALESCE(uv.queued, 0) AS queued
            FROM videos v
            LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
            WHERE v.channel_id = :channel_id {q_clause}
            ORDER BY {order}
            LIMIT :limit OFFSET :offset
        """),
        params,
    ).mappings().all()
    return [VideoOut(**dict(r)) for r in rows]
@router.post("/{channel_id}/fetch-popular", status_code=status.HTTP_202_ACCEPTED)
def fetch_popular_videos(
    channel_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Fetch the channel's most popular videos from YouTube and index them.

    The work runs in the background; the response only acknowledges it.
    404 if the channel does not exist.
    """
    yt_channel_id = _get_channel_or_404(db, channel_id).youtube_channel_id
    background_tasks.add_task(_fetch_popular_task, channel_id, yt_channel_id)
    return {"detail": "Fetching popular videos"}
def _fetch_popular_task(channel_id: int, youtube_channel_id: str):
    """Two-phase popular fetch: get IDs fast via flat-playlist, then enrich with full metadata in parallel.

    Phase 1 lists up to 30 video ids from the channel's popularity-sorted
    /videos page. Phase 2 fetches each video's full metadata concurrently,
    which supplies view_count and published_at. Known videos get their
    view_count refreshed and a missing published_at filled in. Unknown
    videos are inserted. Videos whose metadata fetch failed are skipped.
    Each video is committed on its own; failures are rolled back and logged.
    """
    import logging
    from ..database import SessionLocal

    log = logging.getLogger(__name__)
    video_ids = _list_popular_video_ids(_popular_videos_url(youtube_channel_id))
    if not video_ids:
        return
    results = _fetch_video_metadata_parallel(video_ids)

    db = SessionLocal()
    try:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if not channel:
            return
        cid = channel.id
        for yt_id in video_ids:
            meta = results.get(yt_id)
            if not meta:
                continue
            try:
                existing = db.query(Video).filter_by(youtube_video_id=yt_id).first()
                if existing:
                    # View counts change over time, so always take the fresh one.
                    if meta.get("view_count") is not None:
                        existing.view_count = meta["view_count"]
                    if meta.get("published_at") and not existing.published_at:
                        existing.published_at = meta["published_at"]
                else:
                    db.add(Video(
                        youtube_video_id=yt_id,
                        channel_id=cid,
                        title=meta.get("title", ""),
                        thumbnail_url=ytdlp._stable_thumbnail(yt_id),
                        duration_seconds=meta.get("duration_seconds"),
                        published_at=meta.get("published_at"),
                        tags=meta.get("tags") or "[]",
                        view_count=meta.get("view_count"),
                    ))
                db.commit()
            except Exception:
                db.rollback()
                log.exception("Storing popular video %s failed", yt_id)
    finally:
        db.close()


def _popular_videos_url(youtube_channel_id: str) -> str:
    """Return the channel's /videos URL sorted by popularity (``?sort=p``).

    Handles (``@name``) use the short form; any other id uses ``/channel/<id>``.
    """
    if youtube_channel_id.startswith("@"):
        return f"https://www.youtube.com/{youtube_channel_id}/videos?sort=p"
    return f"https://www.youtube.com/channel/{youtube_channel_id}/videos?sort=p"


def _list_popular_video_ids(url: str, limit: int = 30) -> list[str]:
    """Phase 1: list up to ``limit`` video ids from ``url`` in page order.

    Uses yt-dlp's ``--flat-playlist`` mode (fast, lightweight entries only).
    Lines that are not JSON objects, or that have no ``id``, are skipped.
    """
    import json

    stdout, _, _ = ytdlp._run([
        "yt-dlp", url,
        "--dump-json", "--flat-playlist",
        "--playlist-end", str(limit),
        "--quiet",
        *ytdlp._cookie_args(),
    ], timeout=60)
    video_ids = []
    for line in stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            info = json.loads(line)
        except json.JSONDecodeError:
            continue
        yt_id = info.get("id") if isinstance(info, dict) else None
        if yt_id:
            video_ids.append(yt_id)
    return video_ids


def _fetch_video_metadata_parallel(video_ids: list[str], max_workers: int = 8) -> dict:
    """Phase 2: call ``ytdlp.fetch_video_metadata`` for every id concurrently.

    Returns ``{video_id: metadata}``. A fetch that raises maps to None.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(ytdlp.fetch_video_metadata, vid): vid for vid in video_ids}
        for future in as_completed(futures):
            vid = futures[future]
            try:
                results[vid] = future.result()
            except Exception:
                results[vid] = None
    return results
@router.post("/{channel_id}/search", status_code=status.HTTP_202_ACCEPTED)
def search_channel_youtube(
    channel_id: int,
    q: str,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Search YouTube within this channel and index matching videos.

    The lookup runs in the background; the response only acknowledges it.
    404 if the channel does not exist.
    """
    yt_channel_id = _get_channel_or_404(db, channel_id).youtube_channel_id
    background_tasks.add_task(_search_channel_task, channel_id, yt_channel_id, q, current_user.id)
    return {"detail": "Search started"}
def _search_channel_task(channel_id: int, youtube_channel_id: str, q: str, user_id: int):
    """Index the channel's fetched videos whose titles contain ``q``.

    This does not use YouTube's channel search. It asks
    ``ytdlp.fetch_channel_metadata`` for up to 100 videos and keeps those
    whose title contains ``q`` (case-insensitive). When nothing matches, the
    first 30 fetched videos are indexed instead. Only videos not yet stored
    are inserted. ``user_id`` is accepted for interface compatibility and is
    unused. Failures are rolled back and logged.
    """
    import logging
    from ..database import SessionLocal

    db = SessionLocal()
    try:
        result = ytdlp.fetch_channel_metadata(youtube_channel_id, max_videos=100)
        if not result:
            return
        videos = result.get("videos", [])
        needle = q.lower()
        matched = [v for v in videos if needle in (v.get("title") or "").lower()]
        if not matched:
            matched = videos[:30]
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if not channel:
            return
        for vdata in matched:
            yt_id = vdata.get("youtube_video_id")
            if not yt_id:
                continue
            if not db.query(Video).filter_by(youtube_video_id=yt_id).first():
                db.add(Video(
                    youtube_video_id=yt_id,
                    channel_id=channel.id,
                    title=vdata.get("title", ""),
                    description=vdata.get("description"),
                    thumbnail_url=vdata.get("thumbnail_url"),
                    duration_seconds=vdata.get("duration_seconds"),
                    published_at=vdata.get("published_at"),
                    tags=vdata.get("tags"),
                    category=vdata.get("category"),
                ))
        db.commit()
    except Exception:
        db.rollback()
        logging.getLogger(__name__).exception(
            "Channel search for %r on channel %s failed", q, channel_id
        )
    finally:
        db.close()
@router.post("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT)
def follow_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mark the channel as followed, creating the user_channels row if needed.

    404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    link = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if link is None:
        db.add(UserChannel(user_id=current_user.id, channel_id=channel_id, status="followed"))
    else:
        link.status = "followed"
    db.commit()
@router.delete("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT)
def unfollow_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Stop following a channel by deleting the user's user_channels row.

    Succeeds silently when the user was not following it.
    """
    link = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if link is not None:
        db.delete(link)
        db.commit()
@router.patch("/{channel_id}/auto-download", status_code=200)
def set_channel_auto_download(
    channel_id: int,
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Set the per-channel auto-download override from ``body["auto_download"]``.

    True/False force auto-download on/off for this channel. None (or a
    missing key) clears the override, so the user's global setting applies
    again. Returns the stored value.

    Raises:
        HTTPException: 404 if the user does not follow the channel; 400 if the
            value is not true/false/null.
    """
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if not uc:
        raise HTTPException(status_code=404, detail="Not following this channel")
    value = body.get("auto_download")  # True / False / None
    # Accept the same values a strict Boolean column accepts (0/1 compare
    # equal to False/True). Anything else, e.g. the string "yes", would
    # otherwise presumably fail at flush time as a 500 — confirm column type.
    if value not in (None, True, False):
        raise HTTPException(status_code=400, detail="auto_download must be true, false or null")
    uc.auto_download = None if value is None else bool(value)
    db.commit()
    return {"auto_download": uc.auto_download}
@router.post("/{channel_id}/index", status_code=status.HTTP_202_ACCEPTED)
def index_channel(
    channel_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Queue a background re-crawl of the channel with ``max_videos=100``.

    404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    background_tasks.add_task(
        _index_channel_task, channel_id, current_user.id, max_videos=100
    )
    return {"detail": "Indexing started"}
@router.post("/{channel_id}/index-full", status_code=status.HTTP_202_ACCEPTED)
def index_channel_full(
    channel_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Queue a background re-crawl of the channel with ``max_videos=0``.

    0 presumably means "no limit" in ``ytdlp.fetch_channel_metadata``
    (hence "full"); confirm there. 404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    background_tasks.add_task(
        _index_channel_task, channel_id, current_user.id, max_videos=0
    )
    return {"detail": "Full index started"}
@router.post("/{channel_id}/explore", status_code=status.HTTP_202_ACCEPTED)
def explore_channel_older(
    channel_id: int,
    page: int = 2,
    background_tasks: BackgroundTasks = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Fetch a page of older videos from this channel (page 1 = newest 30, page 2 = next 100, etc.).

    Page 1 is videos 1-30 (the range a default sync covers), page 2 is
    videos 31-130, and each later page is the next 100. Pages below 1 are
    treated as page 1. Previously page 1 fetched 100 videos, overlapping
    page 2. Returns the 1-based ``start`` position that will be fetched.

    ``background_tasks`` is injected by FastAPI based on its type. The None
    default only exists because it follows a parameter with a default.

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    if page <= 1:
        start, count = 1, 30
    else:
        start, count = 30 + (page - 2) * 100 + 1, 100
    background_tasks.add_task(_index_channel_explore_task, channel_id, current_user.id, start, count)
    return {"detail": f"Fetching older videos (page {page})", "start": start}
def _index_channel_explore_task(channel_id: int, user_id: int, start_video: int, count: int):
    """Background task: store ``count`` channel videos starting at ``start_video``.

    Only videos not yet in the database are inserted. Existing rows are left
    untouched and no downloads are queued. ``user_id`` is accepted for
    interface symmetry and is unused. Failures are rolled back and logged.
    """
    import logging
    from ..database import SessionLocal

    db = SessionLocal()
    try:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if not channel:
            return
        result = ytdlp.fetch_channel_metadata(
            channel.youtube_channel_id, max_videos=count, start_video=start_video
        )
        if not result:
            return
        for vdata in result.get("videos", []):
            yt_id = vdata.get("youtube_video_id")
            if not yt_id:
                continue
            if not db.query(Video).filter_by(youtube_video_id=yt_id).first():
                db.add(Video(
                    youtube_video_id=yt_id,
                    channel_id=channel.id,
                    title=vdata.get("title", ""),
                    description=vdata.get("description"),
                    thumbnail_url=vdata.get("thumbnail_url"),
                    duration_seconds=vdata.get("duration_seconds"),
                    published_at=vdata.get("published_at"),
                    tags=vdata.get("tags"),
                    category=vdata.get("category"),
                ))
        db.commit()
    except Exception:
        db.rollback()
        logging.getLogger(__name__).exception(
            "Exploring channel %s from video %s failed", channel_id, start_video
        )
    finally:
        db.close()
@router.post("/follow-bulk", status_code=200)
def follow_bulk(
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Follow many channel handles/ids at once without contacting YouTube.

    Unknown handles get a stub Channel whose name is the handle without a
    leading "@". Name, thumbnail and videos are filled in by a later sync.
    Blank entries are skipped.

    Returns counts of newly followed channels, channels already followed,
    and stub channels created. 400 if ``handles`` is not a non-empty list.
    """
    handles = body.get("handles", [])
    if not handles or not isinstance(handles, list):
        raise HTTPException(status_code=400, detail="handles list required")

    tally = {"followed": 0, "already": 0, "created": 0}
    for entry in handles:
        handle = str(entry).strip()
        if not handle:
            continue

        channel = db.query(Channel).filter_by(youtube_channel_id=handle).first()
        if channel is None:
            # Stub — name defaults to handle, filled in on next index
            channel = Channel(youtube_channel_id=handle, name=handle.lstrip("@"))
            db.add(channel)
            db.flush()
            tally["created"] += 1

        link = db.query(UserChannel).filter_by(
            user_id=current_user.id, channel_id=channel.id
        ).first()
        if link is None:
            db.add(UserChannel(user_id=current_user.id, channel_id=channel.id, status="followed"))
            tally["followed"] += 1
        elif link.status == "followed":
            tally["already"] += 1
        else:
            link.status = "followed"
            tally["followed"] += 1

    db.commit()
    return {
        "followed": tally["followed"],
        "already_following": tally["already"],
        "new_channels": tally["created"],
    }
@router.patch("/{channel_id}/notes", status_code=200)
def update_channel_notes(
    channel_id: int,
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Replace the user's notes for a followed channel; missing/empty/null clears them.

    404 if the user does not follow the channel.
    """
    link = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if link is None:
        raise HTTPException(status_code=404, detail="Not following this channel")
    new_notes = body.get("notes", "")
    link.notes = new_notes if new_notes else ""
    db.commit()
    return {"ok": True}
@router.post("/{channel_id}/mute", status_code=204)
def mute_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mute a followed channel for 30 days from now.

    404 if the user does not follow the channel.
    """
    link = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if link is None:
        raise HTTPException(status_code=404, detail="Not following this channel")
    mute_for = timedelta(days=30)
    link.muted_until = datetime.utcnow() + mute_for
    db.commit()
@router.delete("/{channel_id}/mute", status_code=204)
def unmute_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Clear a channel's mute; does nothing if the user does not follow it."""
    link = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if link is not None:
        link.muted_until = None
        db.commit()
@router.post("/follow-by-url", status_code=status.HTTP_201_CREATED)
def follow_by_url(
    body: dict,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Follow a channel by YouTube id or handle, fetching it from YouTube if unknown.

    Takes ``youtube_channel_id`` (or ``channel_id``) from the body. If no
    stored channel matches, its metadata and up to 30 videos are fetched
    synchronously. The id YouTube reports for the channel is looked up too,
    so following by handle does not insert a duplicate of a channel already
    stored under its canonical id. A new channel without a reported id is
    stored under the id given in the request. A discovery run is scheduled
    afterwards.

    Returns ``{"channel_id": ..., "name": ...}``.

    Raises:
        HTTPException: 400 if no id is given; 404 if YouTube has no such channel.
    """
    yt_channel_id = body.get("youtube_channel_id") or body.get("channel_id")
    if not yt_channel_id:
        raise HTTPException(status_code=400, detail="youtube_channel_id required")
    channel = db.query(Channel).filter_by(youtube_channel_id=yt_channel_id).first()
    if not channel:
        meta = ytdlp.fetch_channel_metadata(yt_channel_id, max_videos=30)
        if not meta or not meta.get("channel"):
            raise HTTPException(status_code=404, detail="Channel not found on YouTube")
        fields = {k: v for k, v in meta["channel"].items() if hasattr(Channel, k)}
        canonical_id = fields.get("youtube_channel_id") or yt_channel_id
        # The request may have used a handle while the channel is already
        # stored under the id YouTube reports; reuse that row.
        channel = db.query(Channel).filter_by(youtube_channel_id=canonical_id).first()
        if not channel:
            fields["youtube_channel_id"] = canonical_id
            channel = Channel(**fields)
            channel.crawled_at = datetime.utcnow()
            db.add(channel)
            db.flush()
            for vdata in meta.get("videos", []):
                yt_id = vdata.get("youtube_video_id")
                if not yt_id:
                    continue
                if not db.query(Video).filter_by(youtube_video_id=yt_id).first():
                    db.add(Video(
                        youtube_video_id=yt_id,
                        channel_id=channel.id,
                        title=vdata.get("title", ""),
                        description=vdata.get("description"),
                        thumbnail_url=vdata.get("thumbnail_url"),
                        duration_seconds=vdata.get("duration_seconds"),
                        published_at=vdata.get("published_at"),
                        tags=vdata.get("tags"),
                        category=vdata.get("category"),
                    ))
            db.commit()
            db.refresh(channel)
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel.id).first()
    if uc:
        uc.status = "followed"
    else:
        db.add(UserChannel(user_id=current_user.id, channel_id=channel.id, status="followed"))
    db.commit()
    background_tasks.add_task(_discovery_task, current_user.id)
    return {"channel_id": channel.id, "name": channel.name}