import json
import logging
import threading
import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import text

from ..auth_utils import get_current_user
from ..database import get_db
from ..models import (
    Channel,
    ChannelGroup,
    ChannelGroupMember,
    Download,
    User,
    UserChannel,
    UserSettings,
    UserVideo,
    Video,
)
from ..services import ytdlp

logger = logging.getLogger(__name__)

router = APIRouter()


class ChannelOut(BaseModel):
    """A channel as seen by the current user, with per-user video statistics."""

    id: int
    youtube_channel_id: str
    name: str
    description: Optional[str]
    thumbnail_url: Optional[str]
    banner_url: Optional[str]
    crawled_at: Optional[datetime]
    status: Optional[str]  # the user's UserChannel.status, e.g. "followed"
    auto_download: Optional[bool] = None  # None = fall back to the user's global setting
    subscriber_count: Optional[int] = None
    video_count: int = 0
    unwatched_count: int = 0
    watched_count: int = 0
    downloaded_count: int = 0
    last_published_at: Optional[datetime] = None
    new_count: int = 0  # videos indexed since the user last marked channels seen
    latest_video_id: Optional[str] = None
    latest_video_title: Optional[str] = None
    muted_until: Optional[datetime] = None
    upload_frequency_days: Optional[float] = None  # mean days between uploads
    notes: Optional[str] = ""

    model_config = {"from_attributes": True}


class ChannelGroupOut(BaseModel):
    """A user-defined channel group and the ids of its member channels."""

    id: int
    name: str
    channel_ids: list[int] = []  # pydantic copies defaults, so sharing is safe

    model_config = {"from_attributes": True}


class VideoOut(BaseModel):
    """A video row plus the current user's watched/downloaded/queued flags."""

    id: int
    youtube_video_id: str
    title: str
    thumbnail_url: Optional[str]
    duration_seconds: Optional[int]
    published_at: Optional[datetime]
    channel_id: Optional[int] = None
    channel_name: Optional[str] = None
    channel_youtube_id: Optional[str] = None
    is_downloaded: bool = False
    is_watched: bool = False
    queued: bool = False

    model_config = {"from_attributes": True}


def _get_channel_or_404(db: Session, channel_id: int) -> Channel:
    """Return the channel with ``channel_id`` or raise HTTP 404."""
    c = db.query(Channel).filter(Channel.id == channel_id).first()
    if not c:
        raise HTTPException(status_code=404, detail="Channel not found")
    return c


def _get_group_or_404(db: Session, group_id: int, user_id: int) -> ChannelGroup:
    """Return the group ``group_id`` owned by ``user_id`` or raise HTTP 404.

    Groups belonging to other users are reported as "not found" so that their
    existence is not leaked.
    """
    g = db.query(ChannelGroup).filter_by(id=group_id, user_id=user_id).first()
    if not g:
        raise HTTPException(status_code=404, detail="Group not found")
    return g


# ── Background tasks ──────────────────────────────────────────────────────────


def _index_channels_batch(channel_ids: list[int], user_id: int, delay: float = 1.5):
    """Run channel syncs sequentially with a polite delay between requests.

    Args:
        channel_ids: ``Channel.id`` values to index, processed in order.
        user_id: User who triggered the sync; forwarded to
            ``_index_channel_task`` so that user's auto-download settings apply.
        delay: Seconds to sleep between two consecutive channel fetches.
    """
    for i, cid in enumerate(channel_ids):
        if i > 0:
            time.sleep(delay)
        _index_channel_task(cid, user_id)


def _apply_channel_metadata(channel: Channel, ch_data: dict) -> None:
    """Copy non-empty fetched fields onto ``channel`` and stamp ``crawled_at``."""
    for key, value in ch_data.items():
        # Never let fetched data overwrite the primary key.
        if key == "id" or not hasattr(channel, key):
            continue
        if value is not None and value != "":
            setattr(channel, key, value)
    channel.crawled_at = datetime.utcnow()


def _upsert_channel_videos(db: Session, channel_id: int, videos: list[dict]) -> list[tuple[str, int]]:
    """Insert unseen videos and backfill empty fields on already-known ones.

    Returns:
        ``(youtube_video_id, Video.id)`` pairs for the newly inserted rows only.
    """
    new_videos: list[tuple[str, int]] = []
    for vdata in videos:
        yt_id = vdata.get("youtube_video_id")
        if not yt_id:
            continue
        existing = db.query(Video).filter_by(youtube_video_id=yt_id).first()
        if existing is None:
            video = Video(
                youtube_video_id=yt_id,
                channel_id=channel_id,
                title=vdata.get("title", ""),
                description=vdata.get("description"),
                thumbnail_url=vdata.get("thumbnail_url"),
                duration_seconds=vdata.get("duration_seconds"),
                published_at=vdata.get("published_at"),
                tags=vdata.get("tags"),
                category=vdata.get("category"),
            )
            db.add(video)
            db.flush()  # assigns video.id
            new_videos.append((yt_id, video.id))
            continue

        # Backfill missing metadata on existing videos.
        if existing.published_at is None and vdata.get("published_at"):
            existing.published_at = vdata["published_at"]
        for field in ("title", "thumbnail_url", "duration_seconds", "description"):
            if not getattr(existing, field) and vdata.get(field):
                setattr(existing, field, vdata[field])
    return new_videos


def _queue_auto_downloads(
    db: Session, user_id: int, channel_id: int, new_videos: list[tuple[str, int]]
) -> None:
    """Create and start downloads for freshly indexed videos if auto-download is on.

    ``UserChannel.auto_download`` wins when it is not None. Otherwise the
    user's ``UserSettings.auto_download_on_sync`` applies, defaulting to off.
    Videos that already have a pending, downloading or complete download are
    skipped.
    """
    uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel_id).first()
    user_settings = db.query(UserSettings).filter_by(user_id=user_id).first()
    global_auto = user_settings.auto_download_on_sync if user_settings else False
    channel_auto = uc.auto_download if uc and uc.auto_download is not None else global_auto
    if not channel_auto:
        return

    quality = user_settings.preferred_quality if user_settings else "best"
    subtitle_langs = (user_settings.subtitle_langs or "") if user_settings else ""
    from ..routers.downloads import _on_complete, _on_error, _on_progress

    to_start: list[tuple[str, int]] = []
    for yt_id, vid_id in new_videos:
        existing_dl = (
            db.query(Download)
            .filter_by(user_id=user_id, video_id=vid_id)
            .filter(Download.status.in_(["pending", "downloading", "complete"]))
            .first()
        )
        if existing_dl:
            continue
        dl = Download(user_id=user_id, video_id=vid_id, status="pending")
        db.add(dl)
        db.flush()  # assigns dl.id
        to_start.append((yt_id, dl.id))

    # Commit BEFORE starting workers: they run on other threads and presumably
    # look the Download row up via their own session, so it must be visible.
    db.commit()

    for yt_id, dl_id in to_start:
        threading.Thread(
            target=ytdlp.start_download,
            args=(yt_id, dl_id, _on_progress, _on_complete, _on_error, quality, subtitle_langs),
            daemon=True,
        ).start()


def _index_channel_task(channel_id: int, user_id: int):
    """Fetch a channel's metadata and videos from YouTube and store them.

    Runs with its own DB session and is meant to be run as a background task.
    Steps:

    1. Non-empty fetched channel fields overwrite the stored ones, and
       ``crawled_at`` is updated.
    2. Unseen videos are inserted, and known ones get empty fields backfilled.
    3. Downloads may be queued for new videos, per the user's auto-download
       settings.

    Errors are logged and rolled back rather than propagated, because there is
    no request left to report them to.
    """
    from ..database import SessionLocal

    db = SessionLocal()
    try:
        channel = db.query(Channel).filter_by(id=channel_id).first()
        if not channel:
            return
        result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id)
        if not result:
            return

        _apply_channel_metadata(channel, result.get("channel") or {})
        new_videos = _upsert_channel_videos(db, channel.id, result.get("videos") or [])
        db.commit()

        if new_videos and user_id:
            _queue_auto_downloads(db, user_id, channel.id, new_videos)
    except Exception:
        logger.exception("Indexing channel %s failed", channel_id)
        db.rollback()
    finally:
        db.close()


def _discovery_task(user_id: int):
    """Run the full discovery pass for ``user_id`` in its own session.

    Best-effort: failures are logged, not raised.
    """
    from ..database import SessionLocal
    from ..services.discovery import run_full_discovery

    db = SessionLocal()
    try:
        run_full_discovery(db, user_id)
    except Exception:
        logger.exception("Discovery run for user %s failed", user_id)
        db.rollback()
    finally:
        db.close()


def _enrich_missing_task(limit: int = 20):
    """Fetch full metadata for videos that are missing a description.

    A NULL ``description`` marks a video as "not yet enriched". After a
    successful fetch, the description is therefore always set (to "" if
    YouTube has none). Otherwise such videos would be re-selected first on
    every run and starve the rest of the queue. Videos whose fetch fails keep
    NULL and are retried later.

    Args:
        limit: Maximum number of videos to enrich in this run.
    """
    from ..database import SessionLocal

    db = SessionLocal()
    try:
        rows = db.execute(
            text("""
                SELECT v.id, v.youtube_video_id
                FROM videos v
                WHERE v.description IS NULL
                ORDER BY
                    -- prioritise: followed-channel videos first, then discovery queue, then rest
                    (EXISTS (SELECT 1 FROM user_channels uc
                             WHERE uc.channel_id = v.channel_id AND uc.status = 'followed')) DESC,
                    (EXISTS (SELECT 1 FROM discovery_queue dq
                             WHERE dq.channel_id = v.channel_id)) DESC,
                    v.id DESC
                LIMIT :limit
            """),
            {"limit": limit},
        ).mappings().all()

        for i, row in enumerate(rows):
            if i > 0:
                time.sleep(2)  # be polite to YouTube between fetches
            try:
                meta = ytdlp.fetch_video_metadata(row["youtube_video_id"])
                if not meta:
                    continue
                vid = db.query(Video).filter_by(id=row["id"]).first()
                if not vid:
                    continue
                vid.description = meta.get("description") or ""
                if not vid.tags and meta.get("tags"):
                    vid.tags = meta["tags"]
                if not vid.category and meta.get("category"):
                    vid.category = meta["category"]
                if not vid.chapters and meta.get("chapters"):
                    vid.chapters = meta["chapters"]
                db.commit()
            except Exception:
                logger.exception("Enriching video %s failed", row["youtube_video_id"])
                db.rollback()
    finally:
        db.close()


# ── Routes ────────────────────────────────────────────────────────────────────


@router.get("/feed", response_model=list[VideoOut])
def channel_feed(
    limit: int = 24,
    offset: int = 0,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the newest videos across all channels the user follows.

    ``v.id`` breaks ties in ``published_at`` so that pages are stable.
    """
    rows = db.execute(
        text("""
            SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
                   v.duration_seconds, v.published_at,
                   c.id AS channel_id, c.name AS channel_name,
                   c.youtube_channel_id AS channel_youtube_id,
                   COALESCE(uv.downloaded, 0) AS is_downloaded,
                   COALESCE(uv.watched, 0) AS is_watched,
                   COALESCE(uv.queued, 0) AS queued
            FROM videos v
            JOIN channels c ON v.channel_id = c.id
            JOIN user_channels uc ON c.id = uc.channel_id
                 AND uc.user_id = :user_id AND uc.status = 'followed'
            LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
            ORDER BY v.published_at DESC, v.id DESC
            LIMIT :limit OFFSET :offset
        """),
        {"user_id": current_user.id, "limit": limit, "offset": offset},
    ).mappings().all()
    return [VideoOut(**dict(r)) for r in rows]


@router.post("/sync-all", status_code=202)
def sync_all_channels(
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Queue re-indexing of stale followed channels, plus discovery and enrichment.

    Returns the number of channels queued for indexing. Discovery and a small
    metadata-enrichment batch are queued regardless.
    """
    # Only sync channels not touched in the last 6 hours to avoid hammering YouTube
    channels = db.execute(
        text("""
            SELECT c.id FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :uid AND uc.status = 'followed'
              AND (c.crawled_at IS NULL OR c.crawled_at < datetime('now', '-6 hours'))
            ORDER BY COALESCE(c.crawled_at, '1970-01-01') ASC
        """),
        {"uid": current_user.id},
    ).mappings().all()
    if channels:
        ids = [row["id"] for row in channels]
        background_tasks.add_task(_index_channels_batch, ids, current_user.id)
    background_tasks.add_task(_discovery_task, current_user.id)
    background_tasks.add_task(_enrich_missing_task, 5)
    return {"indexing": len(channels)}


@router.post("/mark-seen", status_code=204)
def mark_channels_seen(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Stamp ``last_seen_at`` on all followed channels, resetting their new counts."""
    db.execute(
        text("UPDATE user_channels SET last_seen_at = :now WHERE user_id = :uid AND status = 'followed'"),
        {"now": datetime.utcnow(), "uid": current_user.id},
    )
    db.commit()


@router.get("", response_model=list[ChannelOut])
def list_channels(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List followed channels with per-user stats, most recently active first.

    Uses four set-based queries (channels, aggregate stats, new counts, latest
    video) instead of per-channel correlated subqueries, then merges them in
    Python.
    """
    uid = current_user.id

    # Step 1 — channel rows + user_channel metadata (fast, no video stats)
    ch_rows = db.execute(
        text("""
            SELECT c.id, c.youtube_channel_id, c.name, c.description, c.thumbnail_url,
                   c.banner_url, c.subscriber_count, c.crawled_at,
                   uc.status, uc.auto_download, uc.muted_until, uc.notes, uc.last_seen_at
            FROM channels c
            JOIN user_channels uc ON c.id = uc.channel_id
            WHERE uc.user_id = :uid AND uc.status = 'followed'
        """),
        {"uid": uid},
    ).mappings().all()
    if not ch_rows:
        return []

    # The ids come from the DB and are forced to int, so inlining them is safe.
    id_csv = ",".join(str(int(r["id"])) for r in ch_rows)

    # Step 2 — aggregated video stats for all channels in one query
    vstats = {
        r["channel_id"]: r
        for r in db.execute(
            text(f"""
                SELECT v.channel_id,
                       COUNT(*) AS video_count,
                       MAX(v.published_at) AS last_published_at,
                       julianday(MAX(v.published_at)) - julianday(MIN(v.published_at)) AS date_span_days,
                       SUM(CASE WHEN COALESCE(uv.watched, 0) = 0 THEN 1 ELSE 0 END) AS unwatched_count,
                       SUM(CASE WHEN uv.watched = 1 THEN 1 ELSE 0 END) AS watched_count,
                       SUM(CASE WHEN uv.downloaded = 1 THEN 1 ELSE 0 END) AS downloaded_count
                FROM videos v
                LEFT JOIN user_videos uv ON uv.video_id = v.id AND uv.user_id = :uid
                WHERE v.channel_id IN ({id_csv})
                GROUP BY v.channel_id
            """),
            {"uid": uid},
        ).mappings().all()
    }

    # Step 3 — new-video count per channel (videos indexed after last_seen_at)
    new_counts = {
        r["channel_id"]: r["new_count"]
        for r in db.execute(
            text(f"""
                SELECT v.channel_id, COUNT(*) AS new_count
                FROM videos v
                JOIN user_channels uc ON uc.channel_id = v.channel_id AND uc.user_id = :uid
                WHERE v.channel_id IN ({id_csv})
                  AND (uc.last_seen_at IS NULL OR v.indexed_at > uc.last_seen_at)
                GROUP BY v.channel_id
            """),
            {"uid": uid},
        ).mappings().all()
    }

    # Step 4 — latest video id + title per channel (derived-table join, no correlated subquery).
    # On a published_at tie SQLite's bare-column GROUP BY picks an arbitrary row.
    latest = {
        r["channel_id"]: r
        for r in db.execute(
            text(f"""
                SELECT v.channel_id, v.youtube_video_id AS latest_video_id,
                       v.title AS latest_video_title
                FROM videos v
                JOIN (
                    SELECT channel_id, MAX(published_at) AS max_pub
                    FROM videos
                    WHERE channel_id IN ({id_csv})
                    GROUP BY channel_id
                ) m ON v.channel_id = m.channel_id AND v.published_at = m.max_pub
                GROUP BY v.channel_id
            """),
        ).mappings().all()
    }

    # Merge and build response
    result = []
    for r in ch_rows:
        cid = r["id"]
        vs = vstats.get(cid) or {}
        vc = vs.get("video_count") or 0
        span = vs.get("date_span_days")
        # Mean gap between uploads: the date span divided by the number of gaps.
        freq = (span / (vc - 1.0)) if (vc >= 2 and span is not None) else None
        latest_row = latest.get(cid) or {}
        result.append(ChannelOut(
            id=cid,
            youtube_channel_id=r["youtube_channel_id"],
            name=r["name"],
            description=r["description"],
            thumbnail_url=r["thumbnail_url"],
            banner_url=r.get("banner_url"),
            subscriber_count=r.get("subscriber_count"),
            crawled_at=r.get("crawled_at"),
            status=r["status"],
            auto_download=r.get("auto_download"),
            muted_until=r.get("muted_until"),
            notes=r.get("notes") or "",
            video_count=vc,
            last_published_at=vs.get("last_published_at"),
            unwatched_count=vs.get("unwatched_count") or 0,
            watched_count=vs.get("watched_count") or 0,
            downloaded_count=vs.get("downloaded_count") or 0,
            new_count=new_counts.get(cid, 0),
            latest_video_id=latest_row.get("latest_video_id"),
            latest_video_title=latest_row.get("latest_video_title"),
            upload_frequency_days=freq,
        ))
    result.sort(key=lambda c: c.last_published_at or datetime.min, reverse=True)
    return result


# ── Channel Groups (must be before /{channel_id} to avoid route shadowing) ───


@router.get("/groups", response_model=list[ChannelGroupOut])
def list_groups(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the user's channel groups with member ids (two queries total)."""
    groups = db.query(ChannelGroup).filter_by(user_id=current_user.id).all()
    if not groups:
        return []
    members_by_group: dict[int, list[int]] = defaultdict(list)
    members = (
        db.query(ChannelGroupMember)
        .filter(ChannelGroupMember.group_id.in_([g.id for g in groups]))
        .all()
    )
    for m in members:
        members_by_group[m.group_id].append(m.channel_id)
    return [
        ChannelGroupOut(id=g.id, name=g.name, channel_ids=members_by_group[g.id])
        for g in groups
    ]


@router.post("/groups", response_model=ChannelGroupOut, status_code=201)
def create_group(
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create an empty group named ``body["name"]`` (400 if blank)."""
    name = (body.get("name") or "").strip()
    if not name:
        raise HTTPException(status_code=400, detail="name required")
    g = ChannelGroup(user_id=current_user.id, name=name)
    db.add(g)
    db.commit()
    db.refresh(g)
    return ChannelGroupOut(id=g.id, name=g.name, channel_ids=[])


@router.delete("/groups/{group_id}", status_code=204)
def delete_group(
    group_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete one of the user's groups (404 if it is not theirs)."""
    g = _get_group_or_404(db, group_id, current_user.id)
    db.delete(g)
    db.commit()


@router.patch("/groups/{group_id}", response_model=ChannelGroupOut)
def rename_group(
    group_id: int,
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Rename a group; a blank ``name`` leaves it unchanged."""
    g = _get_group_or_404(db, group_id, current_user.id)
    name = (body.get("name") or "").strip()
    if name:
        g.name = name
    db.commit()
    members = db.query(ChannelGroupMember).filter_by(group_id=g.id).all()
    return ChannelGroupOut(id=g.id, name=g.name, channel_ids=[m.channel_id for m in members])


@router.post("/groups/{group_id}/channels/{channel_id}", status_code=204)
def add_channel_to_group(
    group_id: int,
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Add an existing channel to one of the user's groups (idempotent)."""
    _get_group_or_404(db, group_id, current_user.id)
    _get_channel_or_404(db, channel_id)
    existing = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first()
    if not existing:
        db.add(ChannelGroupMember(group_id=group_id, channel_id=channel_id))
        db.commit()


@router.delete("/groups/{group_id}/channels/{channel_id}", status_code=204)
def remove_channel_from_group_route(
    group_id: int,
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove a channel from one of the user's groups (idempotent).

    The group's ownership is checked first. Without that check, any
    authenticated user could edit other users' groups.
    """
    _get_group_or_404(db, group_id, current_user.id)
    m = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first()
    if m:
        db.delete(m)
        db.commit()


class BulkChannelBody(BaseModel):
    """Request body for ``/bulk-action``."""

    channel_ids: list[int]
    action: str  # "mute" | "unmute" | "unfollow"


_BULK_ACTIONS = ("mute", "unmute", "unfollow")


@router.post("/bulk-action", status_code=200)
def bulk_channel_action(
    body: BulkChannelBody,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply one action to many of the user's channel follows at once.

    Actions:
        * ``mute`` sets ``muted_until`` 30 days from now.
        * ``unmute`` clears ``muted_until``.
        * ``unfollow`` deletes the ``UserChannel`` rows.

    Raises:
        HTTPException: 400 if ``action`` is not one of the above.
    """
    if body.action not in _BULK_ACTIONS:
        raise HTTPException(status_code=400, detail=f"Unknown action: {body.action}")
    if not body.channel_ids:
        return {"ok": True}

    query = db.query(UserChannel).filter(
        UserChannel.user_id == current_user.id,
        UserChannel.channel_id.in_(body.channel_ids),
    )
    if body.action == "mute":
        query.update(
            {UserChannel.muted_until: datetime.utcnow() + timedelta(days=30)},
            synchronize_session=False,
        )
    elif body.action == "unmute":
        query.update({UserChannel.muted_until: None}, synchronize_session=False)
    else:  # "unfollow"
        query.delete(synchronize_session=False)
    db.commit()
    return {"ok": True}
@router.get("/{channel_id}", response_model=ChannelOut)
def get_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one channel with the current user's stats (works even if not followed).

    Fills ``notes`` and ``upload_frequency_days`` the same way ``GET ""`` does.
    ``new_count`` is always 0 here.

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    row = db.execute(
        text("""
            SELECT c.*, uc.status, uc.auto_download, uc.muted_until, uc.notes,
                (SELECT COUNT(*) FROM videos WHERE channel_id = c.id) AS video_count,
                (SELECT MAX(v.published_at) FROM videos v WHERE v.channel_id = c.id) AS last_published_at,
                (SELECT julianday(MAX(v.published_at)) - julianday(MIN(v.published_at))
                   FROM videos v WHERE v.channel_id = c.id) AS date_span_days,
                (SELECT COUNT(*) FROM videos v
                   LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
                  WHERE v.channel_id = c.id AND COALESCE(uv.watched, 0) = 0) AS unwatched_count,
                (SELECT COUNT(*) FROM videos v
                   JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
                  WHERE v.channel_id = c.id AND uv.watched = 1) AS watched_count,
                (SELECT COUNT(*) FROM videos v
                   JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
                  WHERE v.channel_id = c.id AND uv.downloaded = 1) AS downloaded_count,
                0 AS new_count,
                (SELECT v.youtube_video_id FROM videos v WHERE v.channel_id = c.id
                  ORDER BY v.published_at DESC LIMIT 1) AS latest_video_id,
                (SELECT v.title FROM videos v WHERE v.channel_id = c.id
                  ORDER BY v.published_at DESC LIMIT 1) AS latest_video_title
            FROM channels c
            LEFT JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id
            WHERE c.id = :channel_id
        """),
        {"user_id": current_user.id, "channel_id": channel_id},
    ).mappings().first()
    if not row:
        raise HTTPException(status_code=404, detail="Channel not found")

    data = dict(row)
    span = data.pop("date_span_days", None)
    video_count = data.get("video_count") or 0
    # Mean gap between uploads: the date span divided by the number of gaps.
    data["upload_frequency_days"] = (
        span / (video_count - 1.0) if video_count >= 2 and span is not None else None
    )
    data["notes"] = data.get("notes") or ""
    return ChannelOut(**data)


@router.get("/{channel_id}/videos", response_model=list[VideoOut])
def get_channel_videos(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List all videos of a channel, newest first, with the user's flags.

    Returns the same per-user fields as ``/feed`` (including ``queued``).

    Raises:
        HTTPException: 404 if the channel does not exist.
    """
    _get_channel_or_404(db, channel_id)
    rows = db.execute(
        text("""
            SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
                   v.duration_seconds, v.published_at,
                   c.id AS channel_id, c.name AS channel_name,
                   c.youtube_channel_id AS channel_youtube_id,
                   COALESCE(uv.downloaded, 0) AS is_downloaded,
                   COALESCE(uv.watched, 0) AS is_watched,
                   COALESCE(uv.queued, 0) AS queued
            FROM videos v
            JOIN channels c ON c.id = v.channel_id
            LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
            WHERE v.channel_id = :channel_id
            ORDER BY v.published_at DESC, v.id DESC
        """),
        {"user_id": current_user.id, "channel_id": channel_id},
    ).mappings().all()
    return [VideoOut(**dict(r)) for r in rows]


@router.post("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT)
def follow_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Follow a known channel, reusing any existing UserChannel row."""
    _get_channel_or_404(db, channel_id)
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if uc:
        uc.status = "followed"
    else:
        db.add(UserChannel(user_id=current_user.id, channel_id=channel_id, status="followed"))
    db.commit()


@router.delete("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT)
def unfollow_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Unfollow a channel by deleting the UserChannel row (idempotent)."""
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if uc:
        db.delete(uc)
        db.commit()


@router.patch("/{channel_id}/auto-download", status_code=200)
def set_channel_auto_download(
    channel_id: int,
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Set the per-channel auto-download override.

    ``body["auto_download"]`` takes one of:

    * ``true`` / ``false``: force the behaviour for this channel.
    * ``null`` (or absent): fall back to the user's global setting.

    ``0`` and ``1`` are accepted as booleans.

    Raises:
        HTTPException: 404 if the user does not follow the channel; 400 for
            any other value. Other values used to reach the Boolean column
            and fail with a 500.
    """
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if not uc:
        raise HTTPException(status_code=404, detail="Not following this channel")
    value = body.get("auto_download")  # True / False / None
    if value is not None and not isinstance(value, bool):
        if isinstance(value, int) and value in (0, 1):
            value = bool(value)
        else:
            raise HTTPException(status_code=400, detail="auto_download must be true, false or null")
    uc.auto_download = value
    db.commit()
    return {"auto_download": uc.auto_download}


@router.post("/{channel_id}/index", status_code=status.HTTP_202_ACCEPTED)
def index_channel(
    channel_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Queue a background re-index of one channel."""
    _get_channel_or_404(db, channel_id)
    background_tasks.add_task(_index_channel_task, channel_id, current_user.id)
    return {"detail": "Indexing started"}


@router.post("/follow-bulk", status_code=200)
def follow_bulk(
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Follow a large list of channel handles/IDs without hitting YouTube.

    Creates stub Channel records for unknowns and UserChannel rows immediately.
    Metadata (name, thumbnail, videos) fills in when the user hits Sync All.

    Returns:
        Counts of ``followed``, ``already_following`` and ``new_channels``.

    Raises:
        HTTPException: 400 if ``handles`` is missing, empty or not a list.
    """
    handles = body.get("handles", [])
    if not handles or not isinstance(handles, list):
        raise HTTPException(status_code=400, detail="handles list required")

    followed = 0
    already = 0
    created = 0
    for raw in handles:
        handle = str(raw).strip()
        if not handle:
            continue

        channel = db.query(Channel).filter_by(youtube_channel_id=handle).first()
        if not channel:
            # Stub — name defaults to handle, filled in on next index
            channel = Channel(youtube_channel_id=handle, name=handle.lstrip("@"))
            db.add(channel)
            db.flush()
            created += 1

        uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel.id).first()
        if uc is None:
            db.add(UserChannel(user_id=current_user.id, channel_id=channel.id, status="followed"))
            followed += 1
        elif uc.status != "followed":
            uc.status = "followed"
            followed += 1
        else:
            already += 1

    db.commit()
    return {"followed": followed, "already_following": already, "new_channels": created}


@router.patch("/{channel_id}/notes", status_code=200)
def update_channel_notes(
    channel_id: int,
    body: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Replace the user's private notes for a followed channel ("" clears them)."""
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if not uc:
        raise HTTPException(status_code=404, detail="Not following this channel")
    uc.notes = body.get("notes", "") or ""
    db.commit()
    return {"ok": True}


@router.post("/{channel_id}/mute", status_code=204)
def mute_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mute a followed channel for 30 days."""
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if not uc:
        raise HTTPException(status_code=404, detail="Not following this channel")
    uc.muted_until = datetime.utcnow() + timedelta(days=30)
    db.commit()


@router.delete("/{channel_id}/mute", status_code=204)
def unmute_channel(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Clear a channel's mute (idempotent)."""
    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if uc:
        uc.muted_until = None
        db.commit()


@router.post("/follow-by-url", status_code=status.HTTP_201_CREATED)
def follow_by_url(
    body: dict,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Follow a channel by YouTube id, fetching it from YouTube if unknown.

    An unknown channel is fetched synchronously, together with up to 30
    videos, and stored before it is followed. A discovery pass is queued
    afterwards.

    Raises:
        HTTPException: 400 if no id is given; 404 if YouTube has no such channel.
    """
    raw_id = body.get("youtube_channel_id") or body.get("channel_id")
    yt_channel_id = str(raw_id).strip() if raw_id else ""
    if not yt_channel_id:
        raise HTTPException(status_code=400, detail="youtube_channel_id required")

    channel = db.query(Channel).filter_by(youtube_channel_id=yt_channel_id).first()
    if not channel:
        meta = ytdlp.fetch_channel_metadata(yt_channel_id, max_videos=30)
        if not meta or not meta.get("channel"):
            raise HTTPException(status_code=404, detail="Channel not found on YouTube")
        ch_data = meta["channel"]

        # The fetched id may be the canonical form of what the user typed
        # (e.g. a handle). Reuse an existing row instead of creating a duplicate.
        canonical_id = ch_data.get("youtube_channel_id")
        if canonical_id and canonical_id != yt_channel_id:
            channel = db.query(Channel).filter_by(youtube_channel_id=canonical_id).first()

        if not channel:
            # Never copy a fetched "id" onto the primary key.
            fields = {k: v for k, v in ch_data.items() if k != "id" and hasattr(Channel, k)}
            if not fields.get("youtube_channel_id"):
                fields["youtube_channel_id"] = yt_channel_id
            channel = Channel(**fields)
            channel.crawled_at = datetime.utcnow()
            db.add(channel)
            db.flush()  # assigns channel.id for the video rows below
            for vdata in meta.get("videos") or []:
                yt_id = vdata.get("youtube_video_id")
                if not yt_id:
                    continue
                if not db.query(Video).filter_by(youtube_video_id=yt_id).first():
                    db.add(Video(
                        youtube_video_id=yt_id,
                        channel_id=channel.id,
                        title=vdata.get("title", ""),
                        description=vdata.get("description"),
                        thumbnail_url=vdata.get("thumbnail_url"),
                        duration_seconds=vdata.get("duration_seconds"),
                        published_at=vdata.get("published_at"),
                        tags=vdata.get("tags"),
                        category=vdata.get("category"),
                    ))
            db.commit()
            db.refresh(channel)

    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel.id).first()
    if uc:
        uc.status = "followed"
    else:
        db.add(UserChannel(user_id=current_user.id, channel_id=channel.id, status="followed"))
    db.commit()
    background_tasks.add_task(_discovery_task, current_user.id)
    return {"channel_id": channel.id, "name": channel.name}