import json from datetime import datetime, timedelta from typing import Optional from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status from pydantic import BaseModel from sqlalchemy.orm import Session from sqlalchemy import text from ..auth_utils import get_current_user from ..database import get_db from ..models import Channel, ChannelGroup, ChannelGroupMember, Download, User, UserChannel, UserSettings, UserVideo, Video from ..services import ytdlp router = APIRouter() class ChannelOut(BaseModel): id: int youtube_channel_id: str name: str description: Optional[str] thumbnail_url: Optional[str] banner_url: Optional[str] crawled_at: Optional[datetime] status: Optional[str] auto_download: Optional[bool] = None subscriber_count: Optional[int] = None video_count: int = 0 unwatched_count: int = 0 watched_count: int = 0 downloaded_count: int = 0 last_published_at: Optional[datetime] = None new_count: int = 0 latest_video_id: Optional[str] = None latest_video_title: Optional[str] = None muted_until: Optional[datetime] = None upload_frequency_days: Optional[float] = None notes: Optional[str] = "" model_config = {"from_attributes": True} class ChannelGroupOut(BaseModel): id: int name: str channel_ids: list[int] = [] model_config = {"from_attributes": True} class VideoOut(BaseModel): id: int youtube_video_id: str title: str thumbnail_url: Optional[str] duration_seconds: Optional[int] published_at: Optional[datetime] channel_id: Optional[int] = None channel_name: Optional[str] = None channel_youtube_id: Optional[str] = None is_downloaded: bool = False is_watched: bool = False queued: bool = False view_count: Optional[int] = None model_config = {"from_attributes": True} def _get_channel_or_404(db: Session, channel_id: int) -> Channel: c = db.query(Channel).filter(Channel.id == channel_id).first() if not c: raise HTTPException(status_code=404, detail="Channel not found") return c def _index_channels_batch(channel_ids: list[int], user_id: int, delay: float = 1.5): """Run channel syncs sequentially with a polite delay between requests.""" import time for i, cid in enumerate(channel_ids): if i > 0: time.sleep(delay) _index_channel_task(cid, user_id) def _index_channel_task(channel_id: int, user_id: int, max_videos: int = 30): from ..database import SessionLocal db = SessionLocal() try: channel = db.query(Channel).filter_by(id=channel_id).first() if not channel: return result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id, max_videos=max_videos) if not result: return ch_data = result.get("channel", {}) if ch_data: for k, v in ch_data.items(): if hasattr(channel, k) and v is not None and v != "": setattr(channel, k, v) channel.crawled_at = datetime.utcnow() db.merge(channel) new_video_ids = [] for vdata in result.get("videos", []): yt_id = vdata.get("youtube_video_id") if not yt_id: continue existing = db.query(Video).filter_by(youtube_video_id=yt_id).first() if not existing: new_video = Video( youtube_video_id=yt_id, channel_id=channel.id, title=vdata.get("title", ""), description=vdata.get("description"), thumbnail_url=vdata.get("thumbnail_url"), duration_seconds=vdata.get("duration_seconds"), published_at=vdata.get("published_at"), tags=vdata.get("tags"), category=vdata.get("category"), ) db.add(new_video) db.flush() new_video_ids.append((yt_id, new_video.id)) else: # Backfill missing metadata on existing videos if existing.published_at is None and vdata.get("published_at"): existing.published_at = vdata["published_at"] if not existing.title and vdata.get("title"): existing.title = vdata["title"] if not existing.thumbnail_url and vdata.get("thumbnail_url"): existing.thumbnail_url = vdata["thumbnail_url"] if not existing.duration_seconds and vdata.get("duration_seconds"): existing.duration_seconds = vdata["duration_seconds"] if not existing.description and vdata.get("description"): existing.description = vdata["description"] db.commit() # Auto-download new videos if setting says to if new_video_ids and user_id: uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first() user_settings = db.query(UserSettings).filter_by(user_id=user_id).first() global_auto = user_settings.auto_download_on_sync if user_settings else False channel_auto = uc.auto_download if uc and uc.auto_download is not None else global_auto if channel_auto: quality = user_settings.preferred_quality if user_settings else "best" subtitle_langs = (user_settings.subtitle_langs or "") if user_settings else "" from ..routers.downloads import _on_progress, _on_complete, _on_error for yt_id, vid_id in new_video_ids: existing_dl = db.query(Download).filter_by( user_id=user_id, video_id=vid_id ).filter(Download.status.in_(["pending", "downloading", "complete"])).first() if not existing_dl: dl = Download(user_id=user_id, video_id=vid_id, status="pending") db.add(dl) db.flush() import threading t = threading.Thread( target=ytdlp.start_download, args=(yt_id, dl.id, _on_progress, _on_complete, _on_error, quality, subtitle_langs), daemon=True, ) t.start() db.commit() except Exception: db.rollback() finally: db.close() def _discovery_task(user_id: int): from ..database import SessionLocal from ..services.discovery import run_full_discovery db = SessionLocal() try: run_full_discovery(db, user_id) except Exception: pass finally: db.close() def _enrich_missing_task(limit: int = 20): """Fetch full metadata for videos missing description, published_at, or view_count.""" from ..database import SessionLocal import time db = SessionLocal() try: rows = db.execute( text(""" SELECT v.id, v.youtube_video_id FROM videos v WHERE v.description IS NULL OR v.published_at IS NULL OR v.view_count IS NULL ORDER BY (EXISTS (SELECT 1 FROM user_channels uc WHERE uc.channel_id = v.channel_id AND uc.status = 'followed')) DESC, (EXISTS (SELECT 1 FROM discovery_queue dq WHERE dq.channel_id = v.channel_id)) DESC, v.id DESC LIMIT :limit """), {"limit": limit}, ).mappings().all() for i, row in enumerate(rows): if i > 0: time.sleep(0.5) try: meta = ytdlp.fetch_video_metadata(row["youtube_video_id"]) if meta: vid = db.query(Video).filter_by(id=row["id"]).first() if vid: if meta.get("description") is not None: vid.description = meta["description"] or "" if not vid.published_at and meta.get("published_at"): vid.published_at = meta["published_at"] if vid.view_count is None and meta.get("view_count") is not None: vid.view_count = meta["view_count"] if not vid.tags and meta.get("tags"): vid.tags = meta["tags"] if not vid.category and meta.get("category"): vid.category = meta["category"] if not vid.chapters and meta.get("chapters"): vid.chapters = meta["chapters"] db.commit() except Exception: db.rollback() finally: db.close() @router.get("/feed", response_model=list[VideoOut]) def channel_feed( limit: int = 24, offset: int = 0, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): rows = db.execute( text(""" SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url, v.duration_seconds, v.published_at, c.id AS channel_id, c.name AS channel_name, c.youtube_channel_id AS channel_youtube_id, COALESCE(uv.downloaded, 0) AS is_downloaded, COALESCE(uv.watched, 0) AS is_watched, COALESCE(uv.queued, 0) AS queued FROM videos v JOIN channels c ON v.channel_id = c.id JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id AND uc.status = 'followed' LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id ORDER BY v.published_at DESC LIMIT :limit OFFSET :offset """), {"user_id": current_user.id, "limit": limit, "offset": offset}, ).mappings().all() return [VideoOut(**dict(r)) for r in rows] @router.post("/sync-all", status_code=202) def sync_all_channels( background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): # Only sync channels not touched in the last 6 hours to avoid hammering YouTube channels = db.execute( text(""" SELECT c.id FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :uid AND uc.status = 'followed' AND (c.crawled_at IS NULL OR c.crawled_at < datetime('now', '-6 hours')) ORDER BY COALESCE(c.crawled_at, '1970-01-01') ASC """), {"uid": current_user.id}, ).mappings().all() if channels: ids = [row["id"] for row in channels] background_tasks.add_task(_index_channels_batch, ids, current_user.id) background_tasks.add_task(_discovery_task, current_user.id) background_tasks.add_task(_enrich_missing_task, 30) return {"indexing": len(channels)} @router.post("/mark-seen", status_code=204) def mark_channels_seen( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): db.execute( text("UPDATE user_channels SET last_seen_at = :now WHERE user_id = :uid AND status = 'followed'"), {"now": datetime.utcnow(), "uid": current_user.id}, ) db.commit() @router.get("", response_model=list[ChannelOut]) def list_channels( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): uid = current_user.id # Step 1 — channel rows + user_channel metadata (fast, no video stats) ch_rows = db.execute( text(""" SELECT c.id, c.youtube_channel_id, c.name, c.description, c.thumbnail_url, c.banner_url, c.subscriber_count, c.crawled_at, uc.status, uc.auto_download, uc.muted_until, uc.notes, uc.last_seen_at FROM channels c JOIN user_channels uc ON c.id = uc.channel_id WHERE uc.user_id = :uid AND uc.status = 'followed' """), {"uid": uid}, ).mappings().all() if not ch_rows: return [] id_csv = ",".join(str(r["id"]) for r in ch_rows) last_seen = {r["id"]: r["last_seen_at"] for r in ch_rows} # Step 2 — aggregated video stats for all channels in one query vstats = { r["channel_id"]: r for r in db.execute( text(f""" SELECT v.channel_id, COUNT(*) AS video_count, MAX(v.published_at) AS last_published_at, julianday(MAX(v.published_at)) - julianday(MIN(v.published_at)) AS date_span_days, SUM(CASE WHEN COALESCE(uv.watched, 0) = 0 THEN 1 ELSE 0 END) AS unwatched_count, SUM(CASE WHEN uv.watched = 1 THEN 1 ELSE 0 END) AS watched_count, SUM(CASE WHEN uv.downloaded = 1 THEN 1 ELSE 0 END) AS downloaded_count FROM videos v LEFT JOIN user_videos uv ON uv.video_id = v.id AND uv.user_id = :uid WHERE v.channel_id IN ({id_csv}) GROUP BY v.channel_id """), {"uid": uid}, ).mappings().all() } # Step 3 — new-video count per channel (videos indexed after last_seen_at) new_counts = { r["channel_id"]: r["new_count"] for r in db.execute( text(f""" SELECT v.channel_id, COUNT(*) AS new_count FROM videos v JOIN user_channels uc ON uc.channel_id = v.channel_id AND uc.user_id = :uid WHERE v.channel_id IN ({id_csv}) AND (uc.last_seen_at IS NULL OR v.indexed_at > uc.last_seen_at) GROUP BY v.channel_id """), {"uid": uid}, ).mappings().all() } # Step 4 — latest video id + title per channel (derived-table join, no correlated subquery) latest = { r["channel_id"]: r for r in db.execute( text(f""" SELECT v.channel_id, v.youtube_video_id AS latest_video_id, v.title AS latest_video_title FROM videos v JOIN ( SELECT channel_id, MAX(published_at) AS max_pub FROM videos WHERE channel_id IN ({id_csv}) GROUP BY channel_id ) m ON v.channel_id = m.channel_id AND v.published_at = m.max_pub GROUP BY v.channel_id """), ).mappings().all() } # Merge and build response result = [] for r in ch_rows: cid = r["id"] vs = vstats.get(cid) or {} vc = vs.get("video_count") or 0 newest = vs.get("last_published_at") span = vs.get("date_span_days") freq = (span / (vc - 1.0)) if (vc >= 2 and span is not None) else None result.append(ChannelOut( id=cid, youtube_channel_id=r["youtube_channel_id"], name=r["name"], description=r["description"], thumbnail_url=r["thumbnail_url"], banner_url=r.get("banner_url"), subscriber_count=r.get("subscriber_count"), crawled_at=r.get("crawled_at"), status=r["status"], auto_download=r.get("auto_download"), muted_until=r.get("muted_until"), notes=r.get("notes") or "", video_count=vc, last_published_at=newest, unwatched_count=vs.get("unwatched_count") or 0, watched_count=vs.get("watched_count") or 0, downloaded_count=vs.get("downloaded_count") or 0, new_count=new_counts.get(cid, 0), latest_video_id=latest.get(cid, {}).get("latest_video_id"), latest_video_title=latest.get(cid, {}).get("latest_video_title"), upload_frequency_days=freq, )) result.sort(key=lambda c: c.last_published_at or datetime.min, reverse=True) return result # ── Channel Groups (must be before /{channel_id} to avoid route shadowing) ─── @router.get("/groups", response_model=list[ChannelGroupOut]) def list_groups( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): groups = db.query(ChannelGroup).filter_by(user_id=current_user.id).all() result = [] for g in groups: members = db.query(ChannelGroupMember).filter_by(group_id=g.id).all() result.append(ChannelGroupOut(id=g.id, name=g.name, channel_ids=[m.channel_id for m in members])) return result @router.post("/groups", response_model=ChannelGroupOut, status_code=201) def create_group( body: dict, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): name = (body.get("name") or "").strip() if not name: raise HTTPException(status_code=400, detail="name required") g = ChannelGroup(user_id=current_user.id, name=name) db.add(g) db.commit() db.refresh(g) return ChannelGroupOut(id=g.id, name=g.name, channel_ids=[]) @router.delete("/groups/{group_id}", status_code=204) def delete_group( group_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first() if not g: raise HTTPException(status_code=404, detail="Group not found") db.delete(g) db.commit() @router.patch("/groups/{group_id}", response_model=ChannelGroupOut) def rename_group( group_id: int, body: dict, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first() if not g: raise HTTPException(status_code=404, detail="Group not found") name = (body.get("name") or "").strip() if name: g.name = name db.commit() members = db.query(ChannelGroupMember).filter_by(group_id=g.id).all() return ChannelGroupOut(id=g.id, name=g.name, channel_ids=[m.channel_id for m in members]) @router.post("/groups/{group_id}/channels/{channel_id}", status_code=204) def add_channel_to_group( group_id: int, channel_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first() if not g: raise HTTPException(status_code=404, detail="Group not found") existing = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first() if not existing: db.add(ChannelGroupMember(group_id=group_id, channel_id=channel_id)) db.commit() @router.delete("/groups/{group_id}/channels/{channel_id}", status_code=204) def remove_channel_from_group_route( group_id: int, channel_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): m = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first() if m: db.delete(m) db.commit() class BulkChannelBody(BaseModel): channel_ids: list[int] action: str # "mute" | "unmute" | "unfollow" @router.post("/bulk-action", status_code=200) def bulk_channel_action( body: BulkChannelBody, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): if not body.channel_ids: return {"ok": True} placeholders = ",".join(str(int(i)) for i in body.channel_ids) if body.action == "mute": db.execute( text(f""" UPDATE user_channels SET muted_until = :until WHERE user_id = :user_id AND channel_id IN ({placeholders}) """), {"until": datetime.utcnow() + timedelta(days=30), "user_id": current_user.id}, ) elif body.action == "unmute": db.execute( text(f"UPDATE user_channels SET muted_until = NULL WHERE user_id = :user_id AND channel_id IN ({placeholders})"), {"user_id": current_user.id}, ) elif body.action == "unfollow": db.execute( text(f"DELETE FROM user_channels WHERE user_id = :user_id AND channel_id IN ({placeholders})"), {"user_id": current_user.id}, ) db.commit() return {"ok": True} @router.get("/{channel_id}", response_model=ChannelOut) def get_channel( channel_id: int, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): row = db.execute( text(""" SELECT c.*, uc.status, uc.auto_download, uc.muted_until, (SELECT COUNT(*) FROM videos WHERE channel_id = c.id) AS video_count, (SELECT MAX(v.published_at) FROM videos v WHERE v.channel_id = c.id) AS last_published_at, (SELECT COUNT(*) FROM videos v LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id WHERE v.channel_id = c.id AND COALESCE(uv.watched, 0) = 0) AS unwatched_count, (SELECT COUNT(*) FROM videos v JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id WHERE v.channel_id = c.id AND uv.watched = 1) AS watched_count, (SELECT COUNT(*) FROM videos v JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id WHERE v.channel_id = c.id AND uv.downloaded = 1) AS downloaded_count, 0 AS new_count, (SELECT v.youtube_video_id FROM videos v WHERE v.channel_id = c.id ORDER BY v.published_at DESC LIMIT 1) AS latest_video_id, (SELECT v.title FROM videos v WHERE v.channel_id = c.id ORDER BY v.published_at DESC LIMIT 1) AS latest_video_title FROM channels c LEFT JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id WHERE c.id = :channel_id """), {"user_id": current_user.id, "channel_id": channel_id}, ).mappings().first() if not row: raise HTTPException(status_code=404, detail="Channel not found") # Re-index in the background if stale (not crawled in the last hour) stale = True try: crawled_at_raw = row.get("crawled_at") if crawled_at_raw: crawled_at = ( crawled_at_raw if isinstance(crawled_at_raw, datetime) else datetime.fromisoformat(str(crawled_at_raw)) ) stale = (datetime.utcnow() - crawled_at).total_seconds() > 3600 except Exception: pass if stale: background_tasks.add_task(_index_channel_task, channel_id, current_user.id) return ChannelOut(**dict(row)) @router.get("/{channel_id}/videos", response_model=list[VideoOut]) def get_channel_videos( channel_id: int, sort: str = "newest", offset: int = 0, limit: int = 60, q: str = "", db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): _get_channel_or_404(db, channel_id) params: dict = {"user_id": current_user.id, "channel_id": channel_id, "limit": limit, "offset": offset} q_clause = "" if q.strip(): q_clause = "AND (v.title LIKE :q OR v.description LIKE :q)" params["q"] = f"%{q.strip()}%" order = { "newest": "v.published_at DESC NULLS LAST", "oldest": "v.published_at ASC NULLS LAST", "title": "v.title ASC", "unwatched":"COALESCE(uv.watched, 0) ASC, v.published_at DESC NULLS LAST", "popular": "v.view_count DESC NULLS LAST", }.get(sort, "v.published_at DESC NULLS LAST") view_count_clause = "AND v.view_count IS NOT NULL" if sort == "popular" else "" rows = db.execute( text(f""" SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url, v.duration_seconds, v.published_at, v.view_count, COALESCE(uv.downloaded, 0) AS is_downloaded, COALESCE(uv.watched, 0) AS is_watched FROM videos v LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id WHERE v.channel_id = :channel_id {view_count_clause} {q_clause} ORDER BY {order} LIMIT :limit OFFSET :offset """), params, ).mappings().all() return [VideoOut(**dict(r)) for r in rows] @router.post("/{channel_id}/fetch-popular", status_code=status.HTTP_202_ACCEPTED) def fetch_popular_videos( channel_id: int, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Fetch the channel's most popular videos from YouTube and index them.""" channel = _get_channel_or_404(db, channel_id) background_tasks.add_task(_fetch_popular_task, channel_id, channel.youtube_channel_id) return {"detail": "Fetching popular videos"} def _fetch_popular_task(channel_id: int, youtube_channel_id: str): """Half-and-half popular fetch. Phase 1 (fast): flat-playlist crawl of the full channel → store any new videos in DB (title, duration, thumbnail). No individual requests. Phase 2 (parallel): for every video now in DB for this channel, fetch its watch page to get real view_count + published_at. Prioritises videos missing view_count; caps at 200 per run. Popular tab then sorts by view_count DESC. """ from ..database import SessionLocal from concurrent.futures import ThreadPoolExecutor, as_completed # Phase 1 — flat-playlist: crawl all channel videos quickly if youtube_channel_id.startswith("@"): url = f"https://www.youtube.com/{youtube_channel_id}/videos" else: url = f"https://www.youtube.com/channel/{youtube_channel_id}/videos" stdout, _, _ = ytdlp._run([ "yt-dlp", url, "--dump-json", "--flat-playlist", "--quiet", *ytdlp._cookie_args(), ], timeout=120) flat_entries = [] for line in stdout.splitlines(): line = line.strip() if not line: continue try: info = json.loads(line) yt_id = info.get("id") if yt_id: flat_entries.append({ "id": yt_id, "title": info.get("title", ""), "duration": info.get("duration"), }) except json.JSONDecodeError: continue # Store any new videos from the flat crawl if flat_entries: db = SessionLocal() try: channel = db.query(Channel).filter_by(id=channel_id).first() if channel: for entry in flat_entries: if not db.query(Video).filter_by(youtube_video_id=entry["id"]).first(): try: db.add(Video( youtube_video_id=entry["id"], channel_id=channel_id, title=entry["title"], thumbnail_url=ytdlp._stable_thumbnail(entry["id"]), duration_seconds=entry["duration"], tags="[]", )) db.commit() except Exception: db.rollback() finally: db.close() # Phase 2 — individual fetches for view_count, prioritising missing ones db = SessionLocal() try: rows = db.execute( text(""" SELECT youtube_video_id FROM videos WHERE channel_id = :cid ORDER BY (view_count IS NULL) DESC, published_at DESC NULLS LAST LIMIT 200 """), {"cid": channel_id}, ).mappings().all() video_ids = [r["youtube_video_id"] for r in rows] finally: db.close() if not video_ids: return with ThreadPoolExecutor(max_workers=8) as pool: futures = {pool.submit(ytdlp.fetch_video_metadata, vid): vid for vid in video_ids} results = {} for future in as_completed(futures): vid = futures[future] try: results[vid] = future.result() except Exception: pass db = SessionLocal() try: for yt_id, meta in results.items(): if not meta: continue try: vid = db.query(Video).filter_by(youtube_video_id=yt_id).first() if vid: if meta.get("view_count") is not None: vid.view_count = meta["view_count"] if not vid.published_at and meta.get("published_at"): vid.published_at = meta["published_at"] db.commit() except Exception: db.rollback() finally: db.close() @router.post("/{channel_id}/search", status_code=status.HTTP_202_ACCEPTED) def search_channel_youtube( channel_id: int, q: str, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Search YouTube within this channel and index matching videos.""" channel = _get_channel_or_404(db, channel_id) background_tasks.add_task(_search_channel_task, channel_id, channel.youtube_channel_id, q, current_user.id) return {"detail": "Search started"} def _search_channel_task(channel_id: int, youtube_channel_id: str, q: str, user_id: int): """Fetch videos matching q from YouTube for this channel and index them.""" from ..database import SessionLocal from urllib.parse import quote db = SessionLocal() try: url = f"https://www.youtube.com/channel/{youtube_channel_id}/search?query={quote(q)}" result = ytdlp.fetch_channel_metadata(youtube_channel_id, max_videos=100) if not result: return # Filter results by query match (yt-dlp fetches all; we filter titles locally) q_lower = q.lower() matched = [v for v in result.get("videos", []) if q_lower in (v.get("title") or "").lower()] if not matched: matched = result.get("videos", [])[:30] channel = db.query(Channel).filter_by(id=channel_id).first() if not channel: return for vdata in matched: yt_id = vdata.get("youtube_video_id") if not yt_id: continue existing = db.query(Video).filter_by(youtube_video_id=yt_id).first() if not existing: db.add(Video( youtube_video_id=yt_id, channel_id=channel.id, title=vdata.get("title", ""), description=vdata.get("description"), thumbnail_url=vdata.get("thumbnail_url"), duration_seconds=vdata.get("duration_seconds"), published_at=vdata.get("published_at"), tags=vdata.get("tags"), category=vdata.get("category"), )) db.commit() except Exception: db.rollback() finally: db.close() @router.post("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT) def follow_channel( channel_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): _get_channel_or_404(db, channel_id) uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first() if uc: uc.status = "followed" else: db.add(UserChannel(user_id=current_user.id, channel_id=channel_id, status="followed")) db.commit() @router.delete("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT) def unfollow_channel( channel_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first() if uc: db.delete(uc) db.commit() @router.patch("/{channel_id}/auto-download", status_code=200) def set_channel_auto_download( channel_id: int, body: dict, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first() if not uc: raise HTTPException(status_code=404, detail="Not following this channel") value = body.get("auto_download") # True / False / None uc.auto_download = value db.commit() return {"auto_download": uc.auto_download} @router.post("/{channel_id}/index", status_code=status.HTTP_202_ACCEPTED) def index_channel( channel_id: int, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): _get_channel_or_404(db, channel_id) background_tasks.add_task(_index_channel_task, channel_id, current_user.id, 100) return {"detail": "Indexing started"} @router.post("/{channel_id}/index-full", status_code=status.HTTP_202_ACCEPTED) def index_channel_full( channel_id: int, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): _get_channel_or_404(db, channel_id) background_tasks.add_task(_index_channel_task, channel_id, current_user.id, 0) return {"detail": "Full index started"} @router.post("/{channel_id}/explore", status_code=status.HTTP_202_ACCEPTED) def explore_channel_older( channel_id: int, page: int = 2, background_tasks: BackgroundTasks = None, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Fetch a page of older videos from this channel (page 1 = newest 30, page 2 = next 100, etc.).""" _get_channel_or_404(db, channel_id) start = 1 if page <= 1 else (30 + (page - 2) * 100 + 1) background_tasks.add_task(_index_channel_explore_task, channel_id, current_user.id, start, 100) return {"detail": f"Fetching older videos (page {page})", "start": start} def _index_channel_explore_task(channel_id: int, user_id: int, start_video: int, count: int): from ..database import SessionLocal db = SessionLocal() try: channel = db.query(Channel).filter_by(id=channel_id).first() if not channel: return result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id, max_videos=count, start_video=start_video) if not result: return for vdata in result.get("videos", []): yt_id = vdata.get("youtube_video_id") if not yt_id: continue if not db.query(Video).filter_by(youtube_video_id=yt_id).first(): db.add(Video( youtube_video_id=yt_id, channel_id=channel.id, title=vdata.get("title", ""), description=vdata.get("description"), thumbnail_url=vdata.get("thumbnail_url"), duration_seconds=vdata.get("duration_seconds"), published_at=vdata.get("published_at"), tags=vdata.get("tags"), category=vdata.get("category"), )) db.commit() except Exception: db.rollback() finally: db.close() @router.post("/follow-bulk", status_code=200) def follow_bulk( body: dict, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Follow a large list of channel handles/IDs without hitting YouTube. Creates stub Channel records for unknowns and UserChannel rows immediately. Metadata (name, thumbnail, videos) fills in when the user hits Sync All. """ handles = body.get("handles", []) if not handles or not isinstance(handles, list): raise HTTPException(status_code=400, detail="handles list required") followed = 0 already = 0 created = 0 for handle in handles: handle = str(handle).strip() if not handle: continue channel = db.query(Channel).filter_by(youtube_channel_id=handle).first() if not channel: # Stub — name defaults to handle, filled in on next index channel = Channel( youtube_channel_id=handle, name=handle.lstrip("@"), ) db.add(channel) db.flush() created += 1 uc = db.query(UserChannel).filter_by( user_id=current_user.id, channel_id=channel.id ).first() if uc: if uc.status != "followed": uc.status = "followed" followed += 1 else: already += 1 else: db.add(UserChannel( user_id=current_user.id, channel_id=channel.id, status="followed", )) followed += 1 db.commit() return {"followed": followed, "already_following": already, "new_channels": created} @router.patch("/{channel_id}/notes", status_code=200) def update_channel_notes( channel_id: int, body: dict, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first() if not uc: raise HTTPException(status_code=404, detail="Not following this channel") uc.notes = body.get("notes", "") or "" db.commit() return {"ok": True} @router.post("/{channel_id}/mute", status_code=204) def mute_channel( channel_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first() if not uc: raise HTTPException(status_code=404, detail="Not following this channel") uc.muted_until = datetime.utcnow() + timedelta(days=30) db.commit() @router.delete("/{channel_id}/mute", status_code=204) def unmute_channel( channel_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first() if uc: uc.muted_until = None db.commit() @router.post("/follow-by-url", status_code=status.HTTP_201_CREATED) def follow_by_url( body: dict, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): yt_channel_id = body.get("youtube_channel_id") or body.get("channel_id") if not yt_channel_id: raise HTTPException(status_code=400, detail="youtube_channel_id required") channel = db.query(Channel).filter_by(youtube_channel_id=yt_channel_id).first() if not channel: meta = ytdlp.fetch_channel_metadata(yt_channel_id, max_videos=30) if not meta or not meta.get("channel"): raise HTTPException(status_code=404, detail="Channel not found on YouTube") ch_data = meta["channel"] channel = Channel(**{k: v for k, v in ch_data.items() if hasattr(Channel, k)}) channel.crawled_at = datetime.utcnow() db.add(channel) db.flush() for vdata in meta.get("videos", []): yt_id = vdata.get("youtube_video_id") if not yt_id: continue if not db.query(Video).filter_by(youtube_video_id=yt_id).first(): db.add(Video( youtube_video_id=yt_id, channel_id=channel.id, title=vdata.get("title", ""), description=vdata.get("description"), thumbnail_url=vdata.get("thumbnail_url"), duration_seconds=vdata.get("duration_seconds"), published_at=vdata.get("published_at"), tags=vdata.get("tags"), category=vdata.get("category"), )) db.commit() db.refresh(channel) uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel.id).first() if uc: uc.status = "followed" else: db.add(UserChannel(user_id=current_user.id, channel_id=channel.id, status="followed")) db.commit() background_tasks.add_task(_discovery_task, current_user.id) return {"channel_id": channel.id, "name": channel.name}