import json
import logging
import threading as _threading
from datetime import datetime, timedelta
from typing import Optional

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session

from ..auth_utils import get_current_user
from ..database import get_db
from ..models import Channel, ChannelGroup, ChannelGroupMember, Download, User, UserChannel, UserSettings, UserVideo, Video
from ..services import ytdlp
# Router for the channel endpoints defined in this module; the URL prefix is
# applied by whatever includes it (not visible here).
router = APIRouter()
# In-memory registry of long-running background jobs keyed by task id
# (e.g. "popular-<channel_id>"); returned as-is by GET /tasks.
_tasks: dict[str, dict] = {}
# Guards every read and write of _tasks, which background tasks mutate.
_tasks_lock = _threading.Lock()
class ChannelOut(BaseModel):
    """API view of a channel as seen by the current user.

    Combines columns from ``channels`` with the user's ``user_channels`` row
    (status, auto_download, muted_until, notes) and per-user video statistics
    computed by the list/detail endpoints.
    """

    id: int
    youtube_channel_id: str
    name: str
    description: Optional[str]
    thumbnail_url: Optional[str]
    banner_url: Optional[str]
    crawled_at: Optional[datetime]  # when the channel was last indexed
    status: Optional[str]  # user_channels.status, e.g. "followed"
    auto_download: Optional[bool] = None  # None = use the user's global auto-download setting
    subscriber_count: Optional[int] = None
    video_count: int = 0
    unwatched_count: int = 0
    watched_count: int = 0
    downloaded_count: int = 0
    last_published_at: Optional[datetime] = None
    new_count: int = 0  # videos indexed after user_channels.last_seen_at
    latest_video_id: Optional[str] = None
    latest_video_title: Optional[str] = None
    muted_until: Optional[datetime] = None
    upload_frequency_days: Optional[float] = None  # mean days between consecutive uploads
    notes: Optional[str] = ""  # free-form per-user notes
    model_config = {"from_attributes": True}
class ChannelGroupOut(BaseModel):
    """A user-defined channel group together with the IDs of its member channels."""

    id: int
    name: str
    # NOTE(review): relies on Pydantic copying mutable defaults per instance — confirm for the pinned version.
    channel_ids: list[int] = []
    model_config = {"from_attributes": True}
class VideoOut(BaseModel):
    """API view of a video plus the current user's per-video flags."""

    id: int
    youtube_video_id: str
    title: str
    thumbnail_url: Optional[str]
    duration_seconds: Optional[int]
    published_at: Optional[datetime]
    # Channel fields are only filled by queries that join the channels table.
    channel_id: Optional[int] = None
    channel_name: Optional[str] = None
    channel_youtube_id: Optional[str] = None
    # Flags from user_videos; the queries COALESCE a missing row to 0 (False).
    is_downloaded: bool = False
    is_watched: bool = False
    queued: bool = False
    view_count: Optional[int] = None
    model_config = {"from_attributes": True}
def _get_channel_or_404(db: Session, channel_id: int) -> Channel:
    """Return the Channel whose primary key is ``channel_id``.

    Raises:
        HTTPException: 404 when no such channel exists.
    """
    channel = db.query(Channel).filter(Channel.id == channel_id).first()
    if channel is None:
        raise HTTPException(status_code=404, detail="Channel not found")
    return channel
def _index_channels_batch(channel_ids: list[int], user_id: int, delay: float = 1.5):
    """Index several channels one after another, pausing between them.

    Args:
        channel_ids: primary keys of the channels to index, processed in order.
        user_id: user whose auto-download preferences apply to new videos.
        delay: seconds to sleep between consecutive channels so a large sync
            does not fire back-to-back crawls at YouTube. ``0`` (or negative)
            disables the pause. Previously this argument was accepted but
            ignored.
    """
    import time

    for position, cid in enumerate(channel_ids):
        # Sleep only *between* channels: no pause before the first one.
        if position and delay > 0:
            time.sleep(delay)
        _index_channel_task(cid, user_id)
def _index_channel_task(channel_id: int, user_id: int, max_videos: int = 30):
    """Re-crawl one channel from YouTube and store newly seen videos.

    Runs as a background task with its own session. Non-empty channel
    metadata from ``ytdlp.fetch_channel_metadata`` is copied onto matching
    ``Channel`` attributes. New videos are inserted, and missing metadata on
    already-stored videos is backfilled. When the user's per-channel
    auto-download flag (or, if unset, their global ``auto_download_on_sync``)
    is on, downloads are queued for the newly inserted videos.

    Args:
        channel_id: primary key of the channel to index.
        user_id: user whose auto-download preferences apply; a falsy value
            skips auto-download entirely.
        max_videos: forwarded to ``fetch_channel_metadata``. The full-index
            endpoint passes 0, presumably meaning "no limit" — confirm in ytdlp.

    Errors are logged and the transaction rolled back; nothing is raised.
    """
    from ..database import SessionLocal

    db = SessionLocal()
    try:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if not channel:
            return
        result = ytdlp.fetch_channel_metadata(
            channel.youtube_channel_id, max_videos=max_videos, polite=True
        )
        if not result:
            return

        # Copy non-empty scraped fields onto same-named Channel attributes.
        # The primary key is never overwritten from scraped data.
        for key, value in (result.get("channel") or {}).items():
            if key != "id" and hasattr(channel, key) and value is not None and value != "":
                setattr(channel, key, value)
        channel.crawled_at = datetime.utcnow()

        new_videos = []  # (youtube_video_id, videos.id) for rows inserted below
        for vdata in result.get("videos", []):
            yt_id = vdata.get("youtube_video_id")
            if not yt_id:
                continue
            existing = db.query(Video).filter_by(youtube_video_id=yt_id).first()
            if existing is None:
                new_video = Video(
                    youtube_video_id=yt_id,
                    channel_id=channel.id,
                    title=vdata.get("title", ""),
                    description=vdata.get("description"),
                    thumbnail_url=vdata.get("thumbnail_url"),
                    duration_seconds=vdata.get("duration_seconds"),
                    published_at=vdata.get("published_at"),
                    tags=vdata.get("tags"),
                    category=vdata.get("category"),
                )
                db.add(new_video)
                db.flush()  # assigns new_video.id
                new_videos.append((yt_id, new_video.id))
                continue
            # Backfill missing metadata on existing videos
            if existing.published_at is None and vdata.get("published_at"):
                existing.published_at = vdata["published_at"]
            if not existing.title and vdata.get("title"):
                existing.title = vdata["title"]
            if not existing.thumbnail_url and vdata.get("thumbnail_url"):
                existing.thumbnail_url = vdata["thumbnail_url"]
            if not existing.duration_seconds and vdata.get("duration_seconds"):
                existing.duration_seconds = vdata["duration_seconds"]
            if not existing.description and vdata.get("description"):
                existing.description = vdata["description"]
        db.commit()

        if new_videos and user_id:
            _start_auto_downloads_for_new_videos(db, user_id, channel.id, new_videos)
    except Exception:
        db.rollback()
        logging.getLogger(__name__).exception("Indexing channel %s failed", channel_id)
    finally:
        db.close()


def _start_auto_downloads_for_new_videos(
    db: Session, user_id: int, channel_id: int, new_videos: list[tuple[str, int]]
) -> None:
    """Queue and start downloads for freshly indexed videos if auto-download is on.

    ``new_videos`` holds (youtube_video_id, videos.id) pairs. All Download
    rows are committed *before* any worker thread starts. This lets the
    download callbacks (which presumably use their own sessions) see the rows,
    and ensures a failed commit never leaves threads running for rows that do
    not exist.
    """
    uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel_id).first()
    user_settings = db.query(UserSettings).filter_by(user_id=user_id).first()
    global_auto = user_settings.auto_download_on_sync if user_settings else False
    # A per-channel value of None means "inherit the global setting".
    channel_auto = uc.auto_download if uc and uc.auto_download is not None else global_auto
    if not channel_auto:
        return
    quality = user_settings.preferred_quality if user_settings else "best"
    subtitle_langs = (user_settings.subtitle_langs or "") if user_settings else ""
    from ..routers.downloads import _on_progress, _on_complete, _on_error

    to_start = []  # (youtube_video_id, downloads.id)
    for yt_id, vid_id in new_videos:
        existing_dl = db.query(Download).filter_by(
            user_id=user_id, video_id=vid_id
        ).filter(Download.status.in_(["pending", "downloading", "complete"])).first()
        if existing_dl:
            continue
        dl = Download(user_id=user_id, video_id=vid_id, status="pending")
        db.add(dl)
        db.flush()  # assigns dl.id
        to_start.append((yt_id, dl.id))
    db.commit()

    for yt_id, dl_id in to_start:
        _threading.Thread(
            target=ytdlp.start_download,
            args=(yt_id, dl_id, _on_progress, _on_complete, _on_error, quality, subtitle_langs),
            daemon=True,
        ).start()
def _discovery_task(user_id: int):
    """Run the full discovery pass for ``user_id`` using a dedicated session.

    Best-effort: any failure is logged, not re-raised, so one broken
    discovery run does not fail the background task. Previously errors were
    discarded silently.
    """
    from ..database import SessionLocal
    from ..services.discovery import run_full_discovery

    db = SessionLocal()
    try:
        run_full_discovery(db, user_id)
    except Exception:
        logging.getLogger(__name__).exception("Discovery failed for user %s", user_id)
    finally:
        db.close()
def _enrich_missing_task(limit: int = 20):
    """Fetch full metadata for videos missing description, published_at, or view_count.

    Candidates are ordered as follows:
    1. videos whose channel is followed by any user;
    2. then channels present in ``discovery_queue``;
    3. then newest row id first.

    At most ``limit`` videos are processed. Each video is fetched and
    committed on its own, so one failure (logged, then rolled back) does not
    discard the others.
    """
    from ..database import SessionLocal

    db = SessionLocal()
    try:
        rows = db.execute(
            text("""
                SELECT v.id, v.youtube_video_id FROM videos v
                WHERE v.description IS NULL OR v.published_at IS NULL OR v.view_count IS NULL
                ORDER BY
                    (EXISTS (SELECT 1 FROM user_channels uc
                             WHERE uc.channel_id = v.channel_id AND uc.status = 'followed')) DESC,
                    (EXISTS (SELECT 1 FROM discovery_queue dq
                             WHERE dq.channel_id = v.channel_id)) DESC,
                    v.id DESC
                LIMIT :limit
            """),
            {"limit": limit},
        ).mappings().all()
        for row in rows:
            try:
                meta = ytdlp.fetch_video_metadata(row["youtube_video_id"], polite=True)
                if not meta:
                    continue
                vid = db.query(Video).filter_by(id=row["id"]).first()
                if not vid:
                    continue
                # An empty description is stored as "" so the row stops matching IS NULL.
                if meta.get("description") is not None:
                    vid.description = meta["description"] or ""
                if not vid.published_at and meta.get("published_at"):
                    vid.published_at = meta["published_at"]
                if vid.view_count is None and meta.get("view_count") is not None:
                    vid.view_count = meta["view_count"]
                if not vid.tags and meta.get("tags"):
                    vid.tags = meta["tags"]
                if not vid.category and meta.get("category"):
                    vid.category = meta["category"]
                if not vid.chapters and meta.get("chapters"):
                    vid.chapters = meta["chapters"]
                db.commit()
            except Exception:
                db.rollback()
                logging.getLogger(__name__).exception(
                    "Enriching video %s failed", row["youtube_video_id"]
                )
    finally:
        db.close()
@router.get("/feed", response_model=list[VideoOut])
def channel_feed(
    limit: int = 24,
    offset: int = 0,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Paginated cross-channel feed: newest videos from every channel the user follows.

    The per-user flags (downloaded / watched / queued) read as 0 when the user
    has no ``user_videos`` row for a video.
    """
    feed_query = text("""
        SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
               v.duration_seconds, v.published_at,
               c.id AS channel_id, c.name AS channel_name, c.youtube_channel_id AS channel_youtube_id,
               COALESCE(uv.downloaded, 0) AS is_downloaded,
               COALESCE(uv.watched, 0) AS is_watched,
               COALESCE(uv.queued, 0) AS queued
        FROM videos v
        JOIN channels c ON v.channel_id = c.id
        JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id AND uc.status = 'followed'
        LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
        ORDER BY v.published_at DESC
        LIMIT :limit OFFSET :offset
    """)
    bind = {"user_id": current_user.id, "limit": limit, "offset": offset}
    records = db.execute(feed_query, bind).mappings().all()
    return [VideoOut(**dict(record)) for record in records]
@router.post("/sync-all", status_code=202)
def sync_all_channels(
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Queue background re-indexing of the user's stale followed channels.

    A followed channel is stale if it was never crawled, or was last crawled
    more than six hours ago. Stale channels are queued oldest crawl first.
    Discovery for the user and metadata enrichment (up to 30 videos) are also
    queued.

    Returns:
        ``{"indexing": n}`` where ``n`` is the number of stale channels queued.
    """
    # Only sync channels not touched in the last 6 hours to avoid hammering YouTube
    channels = db.execute(
        text("""
            SELECT c.id FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :uid AND uc.status = 'followed'
              AND (c.crawled_at IS NULL OR c.crawled_at < datetime('now', '-6 hours'))
            ORDER BY COALESCE(c.crawled_at, '1970-01-01') ASC
        """),
        {"uid": current_user.id},
    ).mappings().all()
    if channels:
        ids = [row["id"] for row in channels]
        background_tasks.add_task(_index_channels_batch, ids, current_user.id)
    # NOTE(review): discovery and enrichment are queued on every call, even when
    # no channel was stale — confirm this is intended.
    background_tasks.add_task(_discovery_task, current_user.id)
    background_tasks.add_task(_enrich_missing_task, 30)
    return {"indexing": len(channels)}
@router.get("/rss")
def rss_feed(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Serve an RSS 2.0 feed of the 100 newest dated videos from followed channels.

    Every text node is XML-escaped, so titles or descriptions containing
    ``&``, ``<``, ``>`` or ``"`` cannot break the document. ``published_at``
    may arrive from the raw ``text()`` query as either a ``datetime`` or an
    ISO-8601 string, so both are accepted. Naive values are treated as UTC
    and rendered as locale-independent RFC 822 dates.
    """
    from datetime import timezone
    from email.utils import format_datetime
    from xml.sax.saxutils import escape

    from fastapi.responses import Response

    rows = db.execute(
        text("""
            SELECT v.youtube_video_id, v.title, v.description, v.published_at,
                   c.name AS channel_name, c.youtube_channel_id
            FROM videos v
            JOIN channels c ON v.channel_id = c.id
            JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :uid AND uc.status = 'followed'
            WHERE v.published_at IS NOT NULL
            ORDER BY v.published_at DESC
            LIMIT 100
        """),
        {"uid": current_user.id},
    ).mappings().all()

    def esc(value) -> str:
        """XML-escape ``value`` including double quotes; falsy values become ""."""
        if not value:
            return ""
        return escape(str(value), {'"': "&quot;"})

    def rfc822(value) -> str:
        """Format a datetime or ISO-8601 string as an RFC 822 date ("" if unusable)."""
        if not value:
            return ""
        if not isinstance(value, datetime):
            try:
                value = datetime.fromisoformat(str(value))
            except ValueError:
                return ""
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return format_datetime(value.astimezone(timezone.utc))

    items = []
    for r in rows:
        link = esc(f"https://www.youtube.com/watch?v={r['youtube_video_id']}")
        items.append(
            "    <item>"
            f"<title>{esc(r['title'])}</title>"
            f"<link>{link}</link>"
            f"<description>{esc(r['description'])}</description>"
            f"<dc:creator>{esc(r['channel_name'])}</dc:creator>"
            f"<pubDate>{rfc822(r['published_at'])}</pubDate>"
            f'<guid isPermaLink="true">{link}</guid>'
            "</item>"
        )
    xml = "\n".join([
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<rss version="2.0" xmlns:dc="http://purl.org/dc/elements/1.1/">',
        "  <channel>",
        "    <title>YTContinue — Following</title>",
        "    <link>https://www.youtube.com/</link>",
        "    <description>Latest videos from your followed channels</description>",
        *items,
        "  </channel>",
        "</rss>",
        "",
    ])
    return Response(content=xml, media_type="application/rss+xml; charset=utf-8")
@router.get("/tasks")
def get_active_tasks(current_user: User = Depends(get_current_user)):
    """Return the currently registered background task entries.

    The list is copied while holding ``_tasks_lock`` so concurrent inserts
    and removals cannot change it mid-iteration.
    """
    with _tasks_lock:
        snapshot = [*_tasks.values()]
    return snapshot
@router.post("/mark-seen", status_code=204)
def mark_channels_seen(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Stamp ``last_seen_at = now`` on all of the user's followed channels.

    ``new_count`` in the channel listing counts videos indexed after this stamp.
    """
    stamp_sql = text(
        "UPDATE user_channels SET last_seen_at = :now WHERE user_id = :uid AND status = 'followed'"
    )
    db.execute(stamp_sql, {"uid": current_user.id, "now": datetime.utcnow()})
    db.commit()
@router.get("", response_model=list[ChannelOut])
def list_channels(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the user's followed channels with per-user video statistics.

    The data is assembled from four set-based queries, merged in Python
    rather than one subquery per channel:
    1. channel rows;
    2. aggregate stats;
    3. new-video counts;
    4. latest video per channel.

    Channels are ordered by most recent upload, newest first; channels without
    dated videos come last.
    """
    uid = current_user.id

    # Step 1 — channel rows + user_channel metadata (fast, no video stats)
    ch_rows = db.execute(
        text("""
            SELECT c.id, c.youtube_channel_id, c.name, c.description,
                   c.thumbnail_url, c.banner_url, c.subscriber_count, c.crawled_at,
                   uc.status, uc.auto_download, uc.muted_until, uc.notes, uc.last_seen_at
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :uid AND uc.status = 'followed'
        """),
        {"uid": uid},
    ).mappings().all()
    if not ch_rows:
        return []
    # Integer primary keys only, so inlining them into IN (...) cannot inject SQL.
    id_csv = ",".join(str(int(r["id"])) for r in ch_rows)

    # Step 2 — aggregated video stats for all channels in one query.
    # dated_count counts only videos that have a published_at, i.e. the same
    # set the MIN/MAX date span is computed over.
    vstats = {
        r["channel_id"]: r
        for r in db.execute(
            text(f"""
                SELECT v.channel_id,
                       COUNT(*) AS video_count,
                       COUNT(v.published_at) AS dated_count,
                       MAX(v.published_at) AS last_published_at,
                       julianday(MAX(v.published_at)) - julianday(MIN(v.published_at)) AS date_span_days,
                       SUM(CASE WHEN COALESCE(uv.watched, 0) = 0 THEN 1 ELSE 0 END) AS unwatched_count,
                       SUM(CASE WHEN uv.watched = 1 THEN 1 ELSE 0 END) AS watched_count,
                       SUM(CASE WHEN uv.downloaded = 1 THEN 1 ELSE 0 END) AS downloaded_count
                FROM videos v
                LEFT JOIN user_videos uv ON uv.video_id = v.id AND uv.user_id = :uid
                WHERE v.channel_id IN ({id_csv})
                GROUP BY v.channel_id
            """),
            {"uid": uid},
        ).mappings().all()
    }

    # Step 3 — new-video count per channel (videos indexed after last_seen_at)
    new_counts = {
        r["channel_id"]: r["new_count"]
        for r in db.execute(
            text(f"""
                SELECT v.channel_id, COUNT(*) AS new_count
                FROM videos v
                JOIN user_channels uc
                  ON uc.channel_id = v.channel_id
                 AND uc.user_id = :uid
                WHERE v.channel_id IN ({id_csv})
                  AND (uc.last_seen_at IS NULL OR v.indexed_at > uc.last_seen_at)
                GROUP BY v.channel_id
            """),
            {"uid": uid},
        ).mappings().all()
    }

    # Step 4 — latest video id + title per channel (derived-table join, no
    # correlated subquery). On published_at ties SQLite keeps an arbitrary row.
    latest = {
        r["channel_id"]: r
        for r in db.execute(
            text(f"""
                SELECT v.channel_id,
                       v.youtube_video_id AS latest_video_id,
                       v.title AS latest_video_title
                FROM videos v
                JOIN (
                    SELECT channel_id, MAX(published_at) AS max_pub
                    FROM videos
                    WHERE channel_id IN ({id_csv})
                    GROUP BY channel_id
                ) m ON v.channel_id = m.channel_id AND v.published_at = m.max_pub
                GROUP BY v.channel_id
            """),
        ).mappings().all()
    }

    # Merge and build response
    result = []
    for r in ch_rows:
        cid = r["id"]
        vs = vstats.get(cid) or {}
        newest_video = latest.get(cid) or {}
        dated = vs.get("dated_count") or 0
        span = vs.get("date_span_days")
        # N dated uploads span N-1 gaps; the mean gap is the upload frequency.
        freq = (span / (dated - 1.0)) if (dated >= 2 and span is not None) else None
        result.append(ChannelOut(
            id=cid,
            youtube_channel_id=r["youtube_channel_id"],
            name=r["name"],
            description=r["description"],
            thumbnail_url=r["thumbnail_url"],
            banner_url=r.get("banner_url"),
            subscriber_count=r.get("subscriber_count"),
            crawled_at=r.get("crawled_at"),
            status=r["status"],
            auto_download=r.get("auto_download"),
            muted_until=r.get("muted_until"),
            notes=r.get("notes") or "",
            video_count=vs.get("video_count") or 0,
            last_published_at=vs.get("last_published_at"),
            unwatched_count=vs.get("unwatched_count") or 0,
            watched_count=vs.get("watched_count") or 0,
            downloaded_count=vs.get("downloaded_count") or 0,
            new_count=new_counts.get(cid, 0),
            latest_video_id=newest_video.get("latest_video_id"),
            latest_video_title=newest_video.get("latest_video_title"),
            upload_frequency_days=freq,
        ))
    result.sort(key=lambda c: c.last_published_at or datetime.min, reverse=True)
    return result
# ── Channel Groups (must be before /{channel_id} to avoid route shadowing) ───
@router.get("/groups", response_model=list[ChannelGroupOut])
def list_groups(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the user's channel groups, each with its member channel IDs.

    Memberships for all groups are loaded with a single query instead of one
    query per group.
    """
    from collections import defaultdict

    groups = db.query(ChannelGroup).filter_by(user_id=current_user.id).all()
    if not groups:
        return []
    members_by_group: dict[int, list[int]] = defaultdict(list)
    members = (
        db.query(ChannelGroupMember)
        .filter(ChannelGroupMember.group_id.in_([g.id for g in groups]))
        .all()
    )
    for m in members:
        members_by_group[m.group_id].append(m.channel_id)
    return [
        ChannelGroupOut(id=g.id, name=g.name, channel_ids=members_by_group[g.id])
        for g in groups
    ]
@router.post("/groups", response_model=ChannelGroupOut, status_code=201)
def create_group(
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create an empty channel group for the user.

    Raises:
        HTTPException: 400 if ``body["name"]`` is missing or blank.
    """
    raw_name = body.get("name") or ""
    name = raw_name.strip()
    if name == "":
        raise HTTPException(status_code=400, detail="name required")
    group = ChannelGroup(user_id=current_user.id, name=name)
    db.add(group)
    db.commit()
    db.refresh(group)  # load the generated primary key
    return ChannelGroupOut(id=group.id, name=group.name, channel_ids=[])
@router.delete("/groups/{group_id}", status_code=204)
def delete_group(
    group_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete one of the user's channel groups together with its memberships.

    Raises:
        HTTPException: 404 if the group does not exist or belongs to another user.
    """
    g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
    if not g:
        raise HTTPException(status_code=404, detail="Group not found")
    # Remove memberships explicitly. A cascade is not visible from here, and
    # SQLite only enforces ON DELETE CASCADE when foreign keys are enabled, so
    # relying on it could leave orphaned ChannelGroupMember rows.
    db.query(ChannelGroupMember).filter_by(group_id=g.id).delete()
    db.delete(g)
    db.commit()
@router.patch("/groups/{group_id}", response_model=ChannelGroupOut)
def rename_group(
    group_id: int,
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Rename one of the user's groups; a blank or missing name leaves it unchanged.

    Raises:
        HTTPException: 404 if the group does not exist or belongs to another user.
    """
    group = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
    if group is None:
        raise HTTPException(status_code=404, detail="Group not found")
    new_name = (body.get("name") or "").strip()
    if new_name:
        group.name = new_name
    db.commit()
    member_rows = db.query(ChannelGroupMember).filter_by(group_id=group.id).all()
    member_ids = [row.channel_id for row in member_rows]
    return ChannelGroupOut(id=group.id, name=group.name, channel_ids=member_ids)
@router.post("/groups/{group_id}/channels/{channel_id}", status_code=204)
def add_channel_to_group(
    group_id: int,
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Add a channel to one of the user's groups (a no-op if already a member).

    Raises:
        HTTPException: 404 if the group is not the user's, or the channel
            does not exist.
    """
    g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
    if not g:
        raise HTTPException(status_code=404, detail="Group not found")
    # Reject unknown channels up front instead of storing a dangling
    # membership (or failing with an IntegrityError if the FK is enforced).
    _get_channel_or_404(db, channel_id)
    existing = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first()
    if not existing:
        db.add(ChannelGroupMember(group_id=group_id, channel_id=channel_id))
        db.commit()
@router.delete("/groups/{group_id}/channels/{channel_id}", status_code=204)
def remove_channel_from_group_route(
    group_id: int,
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove a channel from one of the user's groups (a no-op if not a member).

    The group's ownership is verified first. Previously any authenticated user
    could remove channels from any other user's group.

    Raises:
        HTTPException: 404 if the group does not exist or belongs to another user.
    """
    g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
    if not g:
        raise HTTPException(status_code=404, detail="Group not found")
    m = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first()
    if m:
        db.delete(m)
        db.commit()
class BulkChannelBody(BaseModel):
    """Request body for POST /bulk-action."""

    channel_ids: list[int]  # channels.id values to act on
    action: str # "mute" | "unmute" | "unfollow"
@router.post("/bulk-action", status_code=200)
def bulk_channel_action(
    body: BulkChannelBody,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply one action to several of the user's channels at once.

    Supported actions:
    - ``mute``: sets ``muted_until`` to 30 days from now;
    - ``unmute``: clears ``muted_until``;
    - ``unfollow``: deletes the user_channels rows.

    An empty ``channel_ids`` list is a no-op.

    Raises:
        HTTPException: 400 for any other ``action`` (previously ignored while
            still reporting success).
    """
    if not body.channel_ids:
        return {"ok": True}
    if body.action not in ("mute", "unmute", "unfollow"):
        raise HTTPException(status_code=400, detail=f"Unknown action: {body.action}")
    # Values are forced through int(), so inlining them into IN (...) cannot inject SQL.
    placeholders = ",".join(str(int(i)) for i in body.channel_ids)
    params: dict = {"user_id": current_user.id}
    if body.action == "mute":
        sql = f"""
            UPDATE user_channels SET muted_until = :until
            WHERE user_id = :user_id AND channel_id IN ({placeholders})
        """
        params["until"] = datetime.utcnow() + timedelta(days=30)
    elif body.action == "unmute":
        sql = f"UPDATE user_channels SET muted_until = NULL WHERE user_id = :user_id AND channel_id IN ({placeholders})"
    else:  # "unfollow"
        sql = f"DELETE FROM user_channels WHERE user_id = :user_id AND channel_id IN ({placeholders})"
    db.execute(text(sql), params)
    db.commit()
    return {"ok": True}
@router.get("/{channel_id}", response_model=ChannelOut)
def get_channel(
    channel_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one channel with the current user's relationship and video stats.

    Includes the user's notes for the channel, which were previously never
    selected, so the detail view always showed an empty string. If the
    channel was not crawled within the last hour, or its ``crawled_at`` cannot
    be interpreted, a background re-index is queued.

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    row = db.execute(
        text("""
            SELECT c.*, uc.status, uc.auto_download, uc.muted_until,
                   COALESCE(uc.notes, '') AS notes,
                   (SELECT COUNT(*) FROM videos WHERE channel_id = c.id) AS video_count,
                   (SELECT MAX(v.published_at) FROM videos v WHERE v.channel_id = c.id) AS last_published_at,
                   (SELECT COUNT(*) FROM videos v
                        LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
                        WHERE v.channel_id = c.id AND COALESCE(uv.watched, 0) = 0) AS unwatched_count,
                   (SELECT COUNT(*) FROM videos v
                        JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
                        WHERE v.channel_id = c.id AND uv.watched = 1) AS watched_count,
                   (SELECT COUNT(*) FROM videos v
                        JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
                        WHERE v.channel_id = c.id AND uv.downloaded = 1) AS downloaded_count,
                   0 AS new_count,
                   (SELECT v.youtube_video_id FROM videos v
                        WHERE v.channel_id = c.id ORDER BY v.published_at DESC LIMIT 1) AS latest_video_id,
                   (SELECT v.title FROM videos v
                        WHERE v.channel_id = c.id ORDER BY v.published_at DESC LIMIT 1) AS latest_video_title
            FROM channels c
            LEFT JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id
            WHERE c.id = :channel_id
        """),
        {"user_id": current_user.id, "channel_id": channel_id},
    ).mappings().first()
    if not row:
        raise HTTPException(status_code=404, detail="Channel not found")

    # Re-index in the background if stale (not crawled in the last hour).
    # crawled_at may arrive as a datetime or as ISO text from the raw query.
    stale = True
    crawled_at_raw = row.get("crawled_at")
    if crawled_at_raw:
        try:
            crawled_at = (
                crawled_at_raw if isinstance(crawled_at_raw, datetime)
                else datetime.fromisoformat(str(crawled_at_raw))
            )
            stale = (datetime.utcnow() - crawled_at).total_seconds() > 3600
        except (TypeError, ValueError):
            # Unparseable, or timezone-aware vs. naive: treat as stale and re-index.
            pass
    if stale:
        background_tasks.add_task(_index_channel_task, channel_id, current_user.id)
    return ChannelOut(**dict(row))
@router.get("/{channel_id}/videos", response_model=list[VideoOut])
def get_channel_videos(
    channel_id: int,
    sort: str = "newest",
    offset: int = 0,
    limit: int = 60,
    q: str = "",
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Page through one channel's stored videos with optional sorting and search.

    Args:
        sort: one of "newest" (default), "oldest", "title", "unwatched",
            "popular". Unknown values fall back to newest. "popular" also
            hides videos whose view_count is unknown.
        q: substring matched with SQL LIKE against title and description.
            ``%``, ``_`` and ``\\`` in ``q`` are matched literally rather than
            acting as wildcards.

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    params: dict = {"user_id": current_user.id, "channel_id": channel_id, "limit": limit, "offset": offset}
    q_clause = ""
    needle = q.strip()
    if needle:
        # Escape LIKE metacharacters (backslash first) so user text matches literally.
        escaped = needle.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        q_clause = "AND (v.title LIKE :q ESCAPE '\\' OR v.description LIKE :q ESCAPE '\\')"
        params["q"] = f"%{escaped}%"
    # Whitelisted ORDER BY fragments; the user never supplies raw SQL here.
    order = {
        "newest":   "v.published_at DESC NULLS LAST",
        "oldest":   "v.published_at ASC NULLS LAST",
        "title":    "v.title ASC",
        "unwatched": "COALESCE(uv.watched, 0) ASC, v.published_at DESC NULLS LAST",
        "popular":  "v.view_count DESC NULLS LAST",
    }.get(sort, "v.published_at DESC NULLS LAST")
    view_count_clause = "AND v.view_count IS NOT NULL" if sort == "popular" else ""
    rows = db.execute(
        text(f"""
            SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
                   v.duration_seconds, v.published_at, v.view_count,
                   COALESCE(uv.downloaded, 0) AS is_downloaded,
                   COALESCE(uv.watched, 0) AS is_watched
            FROM videos v
            LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
            WHERE v.channel_id = :channel_id {view_count_clause} {q_clause}
            ORDER BY {order}
            LIMIT :limit OFFSET :offset
        """),
        params,
    ).mappings().all()
    return [VideoOut(**dict(r)) for r in rows]
@router.post("/{channel_id}/fetch-popular", status_code=status.HTTP_202_ACCEPTED)
def fetch_popular_videos(
    channel_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Fetch the channel's most popular videos from YouTube and index them.

    The crawl runs as a background task; this endpoint answers 202 right away
    and raises 404 if the channel is unknown.
    """
    channel = _get_channel_or_404(db, channel_id)
    display_name = channel.name or ""
    background_tasks.add_task(
        _fetch_popular_task, channel_id, channel.youtube_channel_id, display_name
    )
    return {"detail": "Fetching popular videos"}
def _fetch_popular_task(channel_id: int, youtube_channel_id: str, channel_name: str = ""):
    """Half-and-half popular fetch.

    Phase 1 (fast): flat-playlist crawl of the full channel. Any new videos
    are stored in the DB (title, duration, thumbnail) without individual
    per-video requests.

    Phase 2 (sequential): fetch each video's metadata one at a time via
    ``ytdlp.fetch_video_metadata(..., polite=True)`` to update view counts
    and missing publish dates. Pacing between requests is left to that
    helper. Videos missing view_count are prioritised, capped at 100 per run.

    Progress is published in ``_tasks`` under ``popular-<channel_id>``. The
    entry is removed in a ``finally``, so it can no longer be left behind when
    phase 1 raises (previously only phase 2 was covered). Unexpected errors
    are logged rather than propagated.
    """
    from ..database import SessionLocal

    log = logging.getLogger(__name__)
    task_id = f"popular-{channel_id}"
    label = f"Popular fetch — {channel_name}" if channel_name else "Popular fetch"

    with _tasks_lock:
        _tasks[task_id] = {
            "id": task_id,
            "label": label,
            "phase": "Crawling channel…",
            "total": 0,
            "done": 0,
            "started_at": datetime.utcnow().isoformat(),
        }
    try:
        # Phase 1 — flat-playlist: crawl all channel videos quickly
        if youtube_channel_id.startswith("@"):
            url = f"https://www.youtube.com/{youtube_channel_id}/videos"
        else:
            url = f"https://www.youtube.com/channel/{youtube_channel_id}/videos"
        stdout, _, _ = ytdlp._run([
            "yt-dlp", url,
            "--dump-json", "--flat-playlist",
            "--quiet",
            *ytdlp._cookie_args(),
        ], timeout=120)

        flat_entries = []
        for line in (stdout or "").splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                info = json.loads(line)
            except json.JSONDecodeError:
                continue
            yt_id = info.get("id")
            if yt_id:
                flat_entries.append({
                    "id": yt_id,
                    "title": info.get("title", ""),
                    "duration": info.get("duration"),
                })

        # Store any new videos from the flat crawl, committing one at a time so
        # a single bad insert does not discard the rest.
        if flat_entries:
            db = SessionLocal()
            try:
                if db.query(Channel).filter_by(id=channel_id).first():
                    for entry in flat_entries:
                        if db.query(Video).filter_by(youtube_video_id=entry["id"]).first():
                            continue
                        try:
                            db.add(Video(
                                youtube_video_id=entry["id"],
                                channel_id=channel_id,
                                title=entry["title"],
                                thumbnail_url=ytdlp._stable_thumbnail(entry["id"]),
                                duration_seconds=entry["duration"],
                                tags="[]",
                            ))
                            db.commit()
                        except Exception:
                            # e.g. the same video inserted concurrently; skip it.
                            db.rollback()
            finally:
                db.close()

        # Phase 2 — sequential per-video fetches, unknown view counts first
        db = SessionLocal()
        try:
            rows = db.execute(
                text("""
                    SELECT youtube_video_id FROM videos
                    WHERE channel_id = :cid
                    ORDER BY (view_count IS NULL) DESC, RANDOM()
                    LIMIT 100
                """),
                {"cid": channel_id},
            ).mappings().all()
            video_ids = [r["youtube_video_id"] for r in rows]
        finally:
            db.close()
        if not video_ids:
            return  # the finally below clears the progress entry

        with _tasks_lock:
            if task_id in _tasks:
                _tasks[task_id]["phase"] = "Enriching view counts…"
                _tasks[task_id]["total"] = len(video_ids)
                _tasks[task_id]["done"] = 0

        for yt_id in video_ids:
            try:
                meta = ytdlp.fetch_video_metadata(yt_id, polite=True)
                if meta:
                    db = SessionLocal()
                    try:
                        vid = db.query(Video).filter_by(youtube_video_id=yt_id).first()
                        if vid:
                            if meta.get("view_count") is not None:
                                vid.view_count = meta["view_count"]
                            if not vid.published_at and meta.get("published_at"):
                                vid.published_at = meta["published_at"]
                            db.commit()
                    except Exception:
                        db.rollback()
                        log.warning("Saving metadata for %s failed", yt_id, exc_info=True)
                    finally:
                        db.close()
            except Exception:
                log.warning("Fetching metadata for %s failed", yt_id, exc_info=True)
            # Count every attempt, successful or not, so progress reaches total.
            with _tasks_lock:
                if task_id in _tasks:
                    _tasks[task_id]["done"] += 1
    except Exception:
        log.exception("Popular fetch for channel %s failed", channel_id)
    finally:
        with _tasks_lock:
            _tasks.pop(task_id, None)
@router.post("/{channel_id}/search", status_code=status.HTTP_202_ACCEPTED)
def search_channel_youtube(
    channel_id: int,
    q: str,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Search YouTube within this channel and index matching videos.

    The work happens in a background task; answers 202 immediately and raises
    404 if the channel is unknown.
    """
    channel = _get_channel_or_404(db, channel_id)
    task_args = (channel_id, channel.youtube_channel_id, q, current_user.id)
    background_tasks.add_task(_search_channel_task, *task_args)
    return {"detail": "Search started"}
def _search_channel_task(channel_id: int, youtube_channel_id: str, q: str, user_id: int):
    """Index channel videos whose title contains ``q``.

    YouTube's own search is not used. Up to 100 videos are fetched with
    ``ytdlp.fetch_channel_metadata`` and filtered locally by a
    case-insensitive substring match on the title. If nothing matches, the
    first 30 fetched videos are indexed instead. Only videos not already
    stored are inserted.

    ``user_id`` is kept for interface compatibility but is not used. Errors
    are logged and rolled back.
    """
    from ..database import SessionLocal

    db = SessionLocal()
    try:
        result = ytdlp.fetch_channel_metadata(youtube_channel_id, max_videos=100, polite=True)
        if not result:
            return
        videos = result.get("videos", [])
        needle = q.lower()
        matched = [v for v in videos if needle in (v.get("title") or "").lower()] or videos[:30]

        channel = db.query(Channel).filter_by(id=channel_id).first()
        if not channel:
            return
        for vdata in matched:
            yt_id = vdata.get("youtube_video_id")
            if not yt_id:
                continue
            if db.query(Video).filter_by(youtube_video_id=yt_id).first():
                continue
            db.add(Video(
                youtube_video_id=yt_id,
                channel_id=channel.id,
                title=vdata.get("title", ""),
                description=vdata.get("description"),
                thumbnail_url=vdata.get("thumbnail_url"),
                duration_seconds=vdata.get("duration_seconds"),
                published_at=vdata.get("published_at"),
                tags=vdata.get("tags"),
                category=vdata.get("category"),
            ))
        db.commit()
    except Exception:
        db.rollback()
        logging.getLogger(__name__).exception("Channel search for %s failed", channel_id)
    finally:
        db.close()
@router.get("/{channel_id}/random", response_model=VideoOut)
def get_random_channel_video(
    channel_id: int,
    unwatched_only: bool = True,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Pick one random stored video from the channel.

    With ``unwatched_only`` (the default) videos the user has watched are
    excluded.

    Raises:
        HTTPException: 404 if the channel does not exist or no video qualifies.
    """
    _get_channel_or_404(db, channel_id)
    watched_filter = "AND COALESCE(uv.watched, 0) = 0" if unwatched_only else ""
    pick_sql = text(f"""
        SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
               v.duration_seconds, v.published_at, v.view_count,
               c.id AS channel_id, c.name AS channel_name, c.youtube_channel_id AS channel_youtube_id,
               COALESCE(uv.downloaded, 0) AS is_downloaded,
               COALESCE(uv.watched, 0) AS is_watched,
               COALESCE(uv.queued, 0) AS queued
        FROM videos v
        JOIN channels c ON v.channel_id = c.id
        LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
        WHERE v.channel_id = :channel_id {watched_filter}
        ORDER BY RANDOM()
        LIMIT 1
    """)
    picked = db.execute(
        pick_sql, {"user_id": current_user.id, "channel_id": channel_id}
    ).mappings().first()
    if picked is None:
        raise HTTPException(status_code=404, detail="No videos found")
    return VideoOut(**dict(picked))
@router.get("/{channel_id}/in-progress", response_model=list[VideoOut])
def get_channel_in_progress(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List up to six partially watched, unfinished videos from the channel.

    A video qualifies when the user has more than 30 seconds of progress and
    it is not marked watched. The furthest-along videos come first.

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    in_progress_sql = text("""
        SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
               v.duration_seconds, v.published_at, v.view_count,
               c.id AS channel_id, c.name AS channel_name, c.youtube_channel_id AS channel_youtube_id,
               COALESCE(uv.downloaded, 0) AS is_downloaded,
               COALESCE(uv.watched, 0) AS is_watched,
               COALESCE(uv.queued, 0) AS queued
        FROM videos v
        JOIN channels c ON v.channel_id = c.id
        JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
        WHERE v.channel_id = :channel_id
          AND uv.watch_progress_seconds > 30
          AND COALESCE(uv.watched, 0) = 0
        ORDER BY uv.watch_progress_seconds DESC
        LIMIT 6
    """)
    found = db.execute(
        in_progress_sql, {"user_id": current_user.id, "channel_id": channel_id}
    ).mappings().all()
    return [VideoOut(**dict(item)) for item in found]
@router.post("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT)
def follow_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mark the channel as followed by the user.

    Creates the user_channels row if it does not exist yet; otherwise its
    status is reset to "followed".

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    link = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if link is None:
        db.add(UserChannel(user_id=current_user.id, channel_id=channel_id, status="followed"))
    else:
        link.status = "followed"
    db.commit()
@router.delete("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT)
def unfollow_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Stop following the channel by deleting the user's user_channels row.

    Silently succeeds when the user was not following it.
    """
    link = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if link is not None:
        db.delete(link)
        db.commit()
@router.patch("/{channel_id}/auto-download", status_code=200)
def set_channel_auto_download(
    channel_id: int,
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Set the per-channel auto-download override.

    ``body["auto_download"]`` may be ``true``, ``false`` or ``null``/absent;
    null means "use the user's global setting".

    Raises:
        HTTPException: 404 if the user does not follow the channel.
        HTTPException: 400 if the value is not a boolean or null. Previously
            any JSON value was written straight to the column.
    """
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if not uc:
        raise HTTPException(status_code=404, detail="Not following this channel")
    value = body.get("auto_download")  # True / False / None
    if value is not None and not isinstance(value, bool):
        raise HTTPException(status_code=400, detail="auto_download must be true, false or null")
    uc.auto_download = value
    db.commit()
    return {"auto_download": uc.auto_download}
@router.post("/{channel_id}/index", status_code=status.HTTP_202_ACCEPTED)
def index_channel(
    channel_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Queue a background re-index of the channel (max_videos=100).

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    background_tasks.add_task(
        _index_channel_task, channel_id, current_user.id, max_videos=100
    )
    return {"detail": "Indexing started"}
@router.post("/{channel_id}/index-full", status_code=status.HTTP_202_ACCEPTED)
def index_channel_full(
    channel_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Queue a background re-index of the channel with max_videos=0.

    Presumably 0 means "every upload" in ytdlp — confirm there.

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    background_tasks.add_task(
        _index_channel_task, channel_id, current_user.id, max_videos=0
    )
    return {"detail": "Full index started"}
@router.post("/{channel_id}/explore", status_code=status.HTTP_202_ACCEPTED)
def explore_channel_older(
    channel_id: int,
    page: int = 2,
    background_tasks: BackgroundTasks = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Fetch a page of older videos from this channel (page 1 = newest 30, page 2 = next 100, etc.).

    Each page requests 100 videos starting at a computed 1-based position:
    - pages <= 1 start at video 1;
    - page 2 starts at 31;
    - each later page starts 100 further on.
    """
    _get_channel_or_404(db, channel_id)
    start = 1 if page <= 1 else 31 + 100 * (page - 2)
    background_tasks.add_task(
        _index_channel_explore_task, channel_id, current_user.id, start, 100
    )
    return {"detail": f"Fetching older videos (page {page})", "start": start}
def _index_channel_explore_task(channel_id: int, user_id: int, start_video: int, count: int):
    """Index up to ``count`` channel videos beginning at position ``start_video``.

    Only videos not already stored are inserted; existing rows are untouched.
    ``user_id`` is accepted for parity with the other index tasks but is not
    used. Failures are logged and rolled back instead of being discarded
    silently.
    """
    from ..database import SessionLocal

    db = SessionLocal()
    try:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if not channel:
            return
        result = ytdlp.fetch_channel_metadata(
            channel.youtube_channel_id, max_videos=count, start_video=start_video, polite=True
        )
        if not result:
            return
        for vdata in result.get("videos", []):
            yt_id = vdata.get("youtube_video_id")
            if not yt_id:
                continue
            if db.query(Video).filter_by(youtube_video_id=yt_id).first():
                continue
            db.add(Video(
                youtube_video_id=yt_id,
                channel_id=channel.id,
                title=vdata.get("title", ""),
                description=vdata.get("description"),
                thumbnail_url=vdata.get("thumbnail_url"),
                duration_seconds=vdata.get("duration_seconds"),
                published_at=vdata.get("published_at"),
                tags=vdata.get("tags"),
                category=vdata.get("category"),
            ))
        db.commit()
    except Exception:
        db.rollback()
        logging.getLogger(__name__).exception(
            "Exploring channel %s from video %s failed", channel_id, start_video
        )
    finally:
        db.close()
@router.post("/follow-bulk", status_code=200)
def follow_bulk(
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Follow a large list of channel handles/IDs without hitting YouTube.

    Stub Channel records are created for unknown handles, with the name set
    to the handle minus any leading "@". UserChannel rows are created or
    re-activated immediately. Metadata (name, thumbnail, videos) fills in on
    a later index such as Sync All.

    Returns:
        Counts of newly followed channels, channels that were already
        followed, and stub channels created.

    Raises:
        HTTPException: 400 if ``handles`` is missing, empty or not a list.
    """
    handles = body.get("handles", [])
    if not handles or not isinstance(handles, list):
        raise HTTPException(status_code=400, detail="handles list required")
    uid = current_user.id
    tally = {"followed": 0, "already": 0, "created": 0}
    for raw in handles:
        handle = str(raw).strip()
        if not handle:
            continue
        channel = db.query(Channel).filter_by(youtube_channel_id=handle).first()
        if channel is None:
            channel = Channel(youtube_channel_id=handle, name=handle.lstrip("@"))
            db.add(channel)
            db.flush()  # need channel.id for the UserChannel row below
            tally["created"] += 1
        link = db.query(UserChannel).filter_by(user_id=uid, channel_id=channel.id).first()
        if link is None:
            db.add(UserChannel(user_id=uid, channel_id=channel.id, status="followed"))
            tally["followed"] += 1
        elif link.status == "followed":
            tally["already"] += 1
        else:
            link.status = "followed"
            tally["followed"] += 1
    db.commit()
    return {
        "followed": tally["followed"],
        "already_following": tally["already"],
        "new_channels": tally["created"],
    }
@router.patch("/{channel_id}/notes", status_code=200)
def update_channel_notes(
    channel_id: int,
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Replace the current user's private notes for a followed channel.

    Expects ``{"notes": "<text>"}``. A missing, null, or otherwise falsy value
    clears the notes to ``""``.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 400 when ``notes`` is present but not a string;
            404 when the user has no UserChannel row for ``channel_id``.
    """
    notes = body.get("notes", "") or ""
    # Reject non-string payloads up front instead of storing e.g. a dict or
    # int in a text field (which fails at commit or when serialized as str).
    if not isinstance(notes, str):
        raise HTTPException(status_code=400, detail="notes must be a string")
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if not uc:
        raise HTTPException(status_code=404, detail="Not following this channel")
    uc.notes = notes
    db.commit()
    return {"ok": True}
@router.post("/{channel_id}/mute", status_code=204)
def mute_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mute a channel for the current user for 30 days from now.

    ``muted_until`` is computed from ``datetime.utcnow()`` (a naive UTC timestamp).

    Raises:
        HTTPException: 404 when the user has no UserChannel row for ``channel_id``.
    """
    link = (
        db.query(UserChannel)
        .filter_by(user_id=current_user.id, channel_id=channel_id)
        .first()
    )
    if link is None:
        raise HTTPException(status_code=404, detail="Not following this channel")
    mute_window = timedelta(days=30)
    link.muted_until = datetime.utcnow() + mute_window
    db.commit()
@router.delete("/{channel_id}/mute", status_code=204)
def unmute_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Clear any mute on a channel for the current user.

    Idempotent: when the user has no UserChannel row for ``channel_id`` this
    does nothing and still answers 204.
    """
    link = (
        db.query(UserChannel)
        .filter_by(user_id=current_user.id, channel_id=channel_id)
        .first()
    )
    if link is None:
        return
    link.muted_until = None
    db.commit()
@router.post("/follow-by-url", status_code=status.HTTP_201_CREATED)
def follow_by_url(
    body: dict,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Follow a channel by YouTube id/handle, fetching it from YouTube if unknown.

    Reads ``youtube_channel_id`` (or, as a fallback, ``channel_id``) from the
    body and strips surrounding whitespace. When no local Channel matches, the
    channel and up to 30 of its videos are fetched via ytdlp and stored. The
    user's UserChannel link is then created or set to "followed", and a
    discovery task is scheduled for the user.

    Returns:
        ``{"channel_id": <local id>, "name": <channel name>}``.

    Raises:
        HTTPException: 400 when no usable id is given; 404 when ytdlp returns
            no channel metadata.
    """
    raw_id = body.get("youtube_channel_id") or body.get("channel_id")
    yt_channel_id = raw_id.strip() if isinstance(raw_id, str) else ""
    if not yt_channel_id:
        raise HTTPException(status_code=400, detail="youtube_channel_id required")
    channel = db.query(Channel).filter_by(youtube_channel_id=yt_channel_id).first()
    if not channel:
        meta = ytdlp.fetch_channel_metadata(yt_channel_id, max_videos=30, polite=True)
        if not meta or not meta.get("channel"):
            raise HTTPException(status_code=404, detail="Channel not found on YouTube")
        ch_data = meta["channel"]
        # The input may be a handle that YouTube resolves to a canonical id we
        # already store; reuse that row instead of inserting a duplicate.
        canonical_id = ch_data.get("youtube_channel_id")
        if canonical_id and canonical_id != yt_channel_id:
            channel = db.query(Channel).filter_by(youtube_channel_id=canonical_id).first()
        if not channel:
            # Only mapped columns may be passed to the constructor: hasattr()
            # also matched non-column attributes (metadata, relationships,
            # query helpers). The primary key is left for the database.
            column_keys = {attr.key for attr in Channel.__mapper__.column_attrs} - {"id"}
            channel = Channel(**{k: v for k, v in ch_data.items() if k in column_keys})
            if not channel.youtube_channel_id:
                # Without an id the row could never be found again by lookup.
                channel.youtube_channel_id = yt_channel_id
            channel.crawled_at = datetime.utcnow()
            db.add(channel)
            db.flush()  # assign channel.id for the Video rows below
            for vdata in meta.get("videos") or []:
                yt_id = vdata.get("youtube_video_id")
                if not yt_id:
                    continue
                if not db.query(Video).filter_by(youtube_video_id=yt_id).first():
                    db.add(Video(
                        youtube_video_id=yt_id,
                        channel_id=channel.id,
                        title=vdata.get("title", ""),
                        description=vdata.get("description"),
                        thumbnail_url=vdata.get("thumbnail_url"),
                        duration_seconds=vdata.get("duration_seconds"),
                        published_at=vdata.get("published_at"),
                        tags=vdata.get("tags"),
                        category=vdata.get("category"),
                    ))
            db.commit()
            db.refresh(channel)
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel.id).first()
    if uc:
        uc.status = "followed"
    else:
        db.add(UserChannel(user_id=current_user.id, channel_id=channel.id, status="followed"))
    db.commit()
    background_tasks.add_task(_discovery_task, current_user.id)
    return {"channel_id": channel.id, "name": channel.name}