import json
from collections import Counter
from typing import Optional

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import bindparam, text
from sqlalchemy.orm import Session

from ..auth_utils import get_current_user
from ..database import get_db
from ..models import Channel, DiscoveryQueue, User, UserChannel, UserSettings, Video
from ..services.discovery import run_full_discovery

router = APIRouter()

# Tags whose user_tag_affinity score is strictly below this count as "disliked".
_NEG_AFFINITY_THRESHOLD = -2
# A queued channel is hidden once its videos accumulate this many disliked-tag hits.
_NEG_TAG_HIT_LIMIT = 3
# Upper bound on video rows scanned for tags per list_discovery call.
_NEG_TAG_SCAN_LIMIT = 1000
# Number of recent videos used for titles / preview thumbnails per channel.
_RECENT_VIDEOS_PER_CHANNEL = 3


class PreviewVideo(BaseModel):
    """A thumbnail + title pair shown as a channel preview card."""

    thumbnail_url: str
    title: str


class DiscoveryItem(BaseModel):
    """One discovery-queue entry joined with its channel metadata."""

    id: int
    channel_id: int
    youtube_channel_id: str
    name: str
    description: Optional[str]
    thumbnail_url: Optional[str]
    subscriber_count: Optional[int] = None
    score: float
    source: Optional[str]
    # Pydantic copies field defaults per instance, so these list defaults are not shared.
    recent_video_titles: list[str] = []
    preview_videos: list[PreviewVideo] = []

    model_config = {"from_attributes": True}


def _channels_with_negative_tags(db: Session, user_id: int, channel_ids: list[int]) -> set[int]:
    """Return the subset of *channel_ids* whose videos hit too many disliked tags.

    Disliked tags are the user's ``user_tag_affinity`` rows scoring below
    ``_NEG_AFFINITY_THRESHOLD``. Both those tags and the video tags are compared
    lower-cased and stripped. Video ``tags`` values that are not a JSON list are
    skipped (best effort). At most ``_NEG_TAG_SCAN_LIMIT`` video rows are scanned.
    """
    if not channel_ids:
        return set()

    neg_tags = {
        tag.lower().strip()
        for tag in db.execute(
            text(
                "SELECT tag FROM user_tag_affinity "
                "WHERE user_id = :user_id AND score < :threshold"
            ),
            {"user_id": user_id, "threshold": _NEG_AFFINITY_THRESHOLD},
        ).scalars()
        if isinstance(tag, str)
    }
    if not neg_tags:
        return set()

    # Expanding bind parameter rather than interpolating an id list into the SQL text.
    stmt = text(
        "SELECT channel_id, tags FROM videos "
        "WHERE channel_id IN :channel_ids AND tags IS NOT NULL "
        "LIMIT :scan_limit"
    ).bindparams(bindparam("channel_ids", expanding=True))

    hits: Counter = Counter()
    result = db.execute(
        stmt,
        {"channel_ids": sorted(set(channel_ids)), "scan_limit": _NEG_TAG_SCAN_LIMIT},
    ).mappings()
    for vr in result:
        try:
            tags = json.loads(vr["tags"] or "[]")
        except (json.JSONDecodeError, TypeError):
            continue
        if not isinstance(tags, list):
            continue
        hits[vr["channel_id"]] += sum(
            1 for tag in tags if isinstance(tag, str) and tag.lower().strip() in neg_tags
        )
    return {cid for cid, n in hits.items() if n >= _NEG_TAG_HIT_LIMIT}


def _recent_videos_by_channel(db: Session, channel_ids: list[int]) -> dict[int, list[tuple]]:
    """Map channel_id -> up to N most recent ``(title, youtube_video_id)`` pairs, newest first.

    Uses one windowed query instead of one query per channel.
    """
    if not channel_ids:
        return {}
    stmt = text("""
        SELECT channel_id, title, youtube_video_id
        FROM (
            SELECT channel_id, title, youtube_video_id,
                   ROW_NUMBER() OVER (
                       PARTITION BY channel_id ORDER BY published_at DESC
                   ) AS rn
            FROM videos
            WHERE channel_id IN :channel_ids
        ) AS ranked
        WHERE rn <= :per_channel
        ORDER BY channel_id, rn
    """).bindparams(bindparam("channel_ids", expanding=True))
    grouped: dict[int, list[tuple]] = {}
    for channel_id, title, youtube_video_id in db.execute(
        stmt,
        {"channel_ids": sorted(set(channel_ids)), "per_channel": _RECENT_VIDEOS_PER_CHANNEL},
    ):
        grouped.setdefault(channel_id, []).append((title, youtube_video_id))
    return grouped


def _parse_preview_json(raw: Optional[str]) -> list[dict]:
    """Decode a cached ``preview_json`` value into preview dicts.

    Returns ``[]`` for missing or invalid JSON, and keeps only the entries whose
    ``thumbnail_url`` and ``title`` are strings. Malformed cache data therefore
    cannot make response validation fail.
    """
    try:
        parsed = json.loads(raw or "[]")
    except (json.JSONDecodeError, TypeError):
        return []
    if not isinstance(parsed, list):
        return []
    return [
        p for p in parsed
        if isinstance(p, dict)
        and isinstance(p.get("thumbnail_url"), str)
        and isinstance(p.get("title"), str)
    ]


def _get_channel_or_404(db: Session, channel_id: int) -> Channel:
    """Load a Channel by primary key or raise HTTP 404."""
    channel = db.query(Channel).filter(Channel.id == channel_id).first()
    if not channel:
        raise HTTPException(status_code=404, detail="Channel not found")
    return channel


def _set_user_channel_status(db: Session, user_id: int, channel_id: int, status: str) -> None:
    """Upsert the user's UserChannel status and mark the discovery-queue entry seen.

    Does not commit: callers commit, so that further changes land in the same
    transaction.
    """
    uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel_id).first()
    if uc:
        uc.status = status
    else:
        db.add(UserChannel(user_id=user_id, channel_id=channel_id, status=status))
    dq = db.query(DiscoveryQueue).filter_by(user_id=user_id, channel_id=channel_id).first()
    if dq:
        dq.seen = True


@router.get("", response_model=list[DiscoveryItem])
def list_discovery(
    offset: int = Query(0, ge=0),
    limit: int = Query(50, ge=1),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List unseen discovery-queue channels for the current user, best score first.

    Skips channels the user already followed or dismissed, and channels whose
    videos carry too many disliked tags. Each item includes up to three recent
    video titles and preview thumbnails. If no indexed videos exist, the cached
    ``preview_json`` is used instead.
    """
    # Over-fetch 3x so that the tag filter below can still fill a full page.
    # NOTE(review): `offset` is applied to the raw (pre-filter) rows, so consecutive
    # pages can overlap or skip entries once the tag filter removes rows.
    rows = db.execute(
        text("""
            SELECT dq.id, dq.channel_id, dq.score, dq.source, dq.preview_json,
                   c.youtube_channel_id, c.name, c.description, c.thumbnail_url,
                   c.subscriber_count
            FROM discovery_queue dq
            JOIN channels c ON dq.channel_id = c.id
            WHERE dq.user_id = :user_id
              AND dq.seen = 0
              AND dq.channel_id NOT IN (
                  SELECT channel_id FROM user_channels
                  WHERE user_id = :user_id AND status IN ('followed', 'dismissed')
              )
            ORDER BY dq.score DESC
            LIMIT :limit OFFSET :offset
        """),
        {"user_id": current_user.id, "limit": limit * 3, "offset": offset},
    ).mappings().all()

    blocked = _channels_with_negative_tags(
        db, current_user.id, [r["channel_id"] for r in rows]
    )
    rows = [r for r in rows if r["channel_id"] not in blocked][:limit]

    recent = _recent_videos_by_channel(db, [r["channel_id"] for r in rows])

    items = []
    for row in rows:
        data = dict(row)
        videos = recent.get(data["channel_id"], [])
        data["recent_video_titles"] = [title for title, _ in videos]
        if videos:
            data["preview_videos"] = [
                {
                    "thumbnail_url": f"https://i.ytimg.com/vi/{youtube_video_id}/hqdefault.jpg",
                    "title": title,
                }
                for title, youtube_video_id in videos
            ]
        else:
            data["preview_videos"] = _parse_preview_json(data.get("preview_json"))
        items.append(DiscoveryItem(**data))
    return items


@router.post("/{channel_id}/follow", status_code=204)
def follow_discovery(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Follow a channel and mark its discovery-queue entry seen. 404 if unknown."""
    _get_channel_or_404(db, channel_id)
    _set_user_channel_status(db, current_user.id, channel_id, "followed")
    db.commit()


@router.post("/{channel_id}/dismiss", status_code=204)
def dismiss_discovery(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Dismiss a channel and mark its discovery-queue entry seen. 404 if unknown.

    The existence check matches follow_discovery and prevents orphan
    ``user_channels`` rows for non-existent channel ids.
    """
    _get_channel_or_404(db, channel_id)
    _set_user_channel_status(db, current_user.id, channel_id, "dismissed")
    db.commit()


@router.post("/refresh", status_code=202)
def refresh_discovery(
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Schedule a full discovery run and channel enrichment as background tasks.

    Regions come from the user's comma-separated ``discovery_regions`` setting
    (upper-cased, blanks dropped) and default to ``"US,SE"``.
    """
    settings = db.query(UserSettings).filter_by(user_id=current_user.id).first()
    regions_str = (
        settings.discovery_regions if settings and settings.discovery_regions else "US,SE"
    )
    regions = [r.strip().upper() for r in regions_str.split(",") if r.strip()]
    # NOTE(review): `db` is the request-scoped session from get_db. In recent FastAPI
    # versions (0.106+), yield-dependency teardown runs before background tasks, so this
    # session may already be closed when run_full_discovery executes. Consider letting
    # the task open its own session — TODO confirm against get_db / FastAPI version.
    background_tasks.add_task(run_full_discovery, db, current_user.id, regions)
    # Local import, presumably to avoid a circular import between router modules.
    from .channels import _enrich_missing_task
    background_tasks.add_task(_enrich_missing_task, 20)
    return {"detail": "Discovery refresh started"}


@router.get("/videos", response_model=list[dict])
def discovery_videos(
    offset: int = Query(0, ge=0),
    limit: int = Query(50, ge=1),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """2 recent videos from every channel in the discovery queue that has indexed content.

    Ordered by channel score so the best-matched channels surface first.
    Channels fall out naturally when dismissed or followed.

    Ties are broken deterministically (recency rank, then channel id), so
    OFFSET pagination is stable. A random tiebreaker would let pages overlap or
    skip rows between requests.
    """
    rows = db.execute(
        text("""
            SELECT * FROM (
                SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
                       v.duration_seconds, v.published_at,
                       c.id AS channel_id, c.name AS channel_name,
                       c.youtube_channel_id AS channel_youtube_id,
                       dq.score,
                       ROW_NUMBER() OVER (
                           PARTITION BY c.id ORDER BY v.published_at DESC NULLS LAST
                       ) AS rn
                FROM videos v
                JOIN channels c ON v.channel_id = c.id
                JOIN discovery_queue dq ON c.id = dq.channel_id
                WHERE dq.user_id = :user_id
                  AND dq.seen = 0
                  AND dq.channel_id NOT IN (
                      SELECT channel_id FROM user_channels
                      WHERE user_id = :user_id AND status IN ('followed', 'dismissed')
                  )
            ) AS ranked
            WHERE rn <= 2
            ORDER BY score DESC, rn ASC, channel_id ASC
            LIMIT :limit OFFSET :offset
        """),
        {"user_id": current_user.id, "limit": limit, "offset": offset},
    ).mappings().all()
    return [dict(r) for r in rows]


@router.post("/videos/{youtube_video_id}/dismiss", status_code=204)
def dismiss_discovery_video(
    youtube_video_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Dismiss all discovery for the channel that owns this video.

    Also lowers the user's tag affinity for the video (weight -3.0). Returns
    404 if the video is unknown or has no channel.
    """
    video = db.query(Video).filter_by(youtube_video_id=youtube_video_id).first()
    if not video or not video.channel_id:
        raise HTTPException(status_code=404, detail="Video not found")

    _set_user_channel_status(db, current_user.id, video.channel_id, "dismissed")

    # Local import, presumably to avoid a circular import between router modules.
    from ..routers.videos import _update_affinity
    _update_affinity(db, current_user.id, video, -3.0)
    db.commit()


@router.get("/community", response_model=list[dict])
def community_shelf(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Channels downloaded by other users, weighted by count.

    Counts distinct other users with a completed download per channel. Videos
    the current user has already watched or downloaded are ignored. For each
    channel, ``latest_title`` / ``latest_thumbnail`` come from the most
    recently published eligible video. A plain GROUP BY with bare columns
    would return an arbitrary row in SQLite.
    """
    rows = db.execute(
        text("""
            WITH eligible AS (
                SELECT d.user_id AS downloader_id,
                       v.id AS video_id,
                       v.channel_id,
                       v.title,
                       v.thumbnail_url,
                       v.published_at
                FROM downloads d
                JOIN videos v ON d.video_id = v.id
                WHERE d.user_id != :user_id
                  AND d.status = 'complete'
                  AND v.id NOT IN (
                      SELECT uv.video_id FROM user_videos uv
                      WHERE uv.user_id = :user_id
                        AND (uv.watched = 1 OR uv.downloaded = 1)
                  )
            ),
            per_channel AS (
                SELECT channel_id, COUNT(DISTINCT downloader_id) AS downloader_count
                FROM eligible
                GROUP BY channel_id
            ),
            latest AS (
                SELECT channel_id, title, thumbnail_url,
                       ROW_NUMBER() OVER (
                           PARTITION BY channel_id
                           ORDER BY published_at DESC NULLS LAST, video_id DESC
                       ) AS rn
                FROM eligible
            )
            SELECT c.id, c.youtube_channel_id, c.name, c.thumbnail_url,
                   pc.downloader_count,
                   l.title AS latest_title,
                   l.thumbnail_url AS latest_thumbnail
            FROM per_channel pc
            JOIN channels c ON c.id = pc.channel_id
            JOIN latest l ON l.channel_id = pc.channel_id AND l.rn = 1
            ORDER BY pc.downloader_count DESC, c.id
            LIMIT 20
        """),
        {"user_id": current_user.id},
    ).mappings().all()
    return [dict(r) for r in rows]