The refresh endpoint was passing the request's db session to the background task; that session is closed before the task runs, so every refresh silently did nothing. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
277 lines
9.7 KiB
Python
277 lines
9.7 KiB
Python
import json
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import text
|
|
|
|
from ..auth_utils import get_current_user
|
|
from ..database import get_db
|
|
from ..models import Channel, DiscoveryQueue, User, UserChannel, UserSettings
|
|
from ..services.discovery import run_full_discovery
|
|
|
|
# Router that collects every discovery endpoint declared in this module.
router = APIRouter()
|
|
|
|
|
|
class PreviewVideo(BaseModel):
    """Thumbnail/title pair describing a single preview video of a channel."""

    # URL of the video's thumbnail image.
    thumbnail_url: str
    # Title of the video.
    title: str
|
|
|
|
|
|
class DiscoveryItem(BaseModel):
    """Response schema for one suggested channel in the discovery listing.

    Combines the discovery-queue columns (``id``, ``channel_id``, ``score``,
    ``source``) with the joined channel columns and the per-channel video
    previews assembled by the listing endpoint.
    """

    # Permit building the model from attribute-bearing objects, not only dicts.
    model_config = {"from_attributes": True}

    # Identifier of the discovery-queue row.
    id: int
    # Identifier of the suggested channel.
    channel_id: int
    # YouTube's own identifier for the channel.
    youtube_channel_id: str
    # Channel display name.
    name: str
    # Channel description; must be supplied but may be None.
    description: Optional[str]
    # Channel thumbnail URL; must be supplied but may be None.
    thumbnail_url: Optional[str]
    # Subscriber count, when known.
    subscriber_count: Optional[int] = None
    # Ranking score of the suggestion; the listing sorts on it, highest first.
    score: float
    # Value of the queue row's ``source`` column; must be supplied, may be None.
    source: Optional[str]
    # Titles of the channel's most recently published videos.
    recent_video_titles: list[str] = []
    # Thumbnail/title pairs used to preview the channel.
    preview_videos: list[PreviewVideo] = []
|
|
|
|
|
|
def _parse_preview_json(raw):
    """Decode a stored ``preview_json`` value into a list of preview dicts.

    Anything that is not a JSON list of objects carrying string
    ``thumbnail_url`` and ``title`` values is discarded, so one corrupt queue
    row cannot fail response validation for the whole listing.
    """
    try:
        decoded = json.loads(raw or "[]")
    except (json.JSONDecodeError, TypeError):
        return []
    if not isinstance(decoded, list):
        return []
    return [
        entry
        for entry in decoded
        if isinstance(entry, dict)
        and isinstance(entry.get("thumbnail_url"), str)
        and isinstance(entry.get("title"), str)
    ]


def _count_negative_tag_hits(tag_rows, neg_affinity):
    """Count, per channel id, how many video tags appear in *neg_affinity*."""
    hits: dict[int, int] = {}
    for tag_row in tag_rows:
        try:
            tags = json.loads(tag_row["tags"] or "[]")
        except (json.JSONDecodeError, TypeError):
            continue
        if not isinstance(tags, list):
            # A JSON string/object is not a tag list; do not iterate over it.
            continue
        for tag in tags:
            if isinstance(tag, str) and tag.lower().strip() in neg_affinity:
                channel_id = tag_row["channel_id"]
                hits[channel_id] = hits.get(channel_id, 0) + 1
    return hits


@router.get("", response_model=list[DiscoveryItem])
def list_discovery(
    offset: int = 0,
    limit: int = 50,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the current user's unseen discovery suggestions, best score first.

    Queue rows whose channel the user has already followed or dismissed are
    excluded in SQL. Channels whose sampled video tags match the user's
    strongly negative tags (affinity score below -2) three or more times are
    then dropped in Python. Each remaining item is decorated with the titles
    and thumbnails of the channel's three most recently published videos,
    falling back to the queue row's stored ``preview_json`` when the channel
    has no indexed videos.

    Args:
        offset: Number of queue rows to skip; negative values are treated as 0.
        limit: Maximum number of items to return; negative values are
            treated as 0.
        db: Request-scoped database session.
        current_user: Authenticated user whose queue is listed.

    Returns:
        A list of at most ``limit`` ``DiscoveryItem`` objects.
    """
    # Some engines (e.g. SQLite) treat a negative LIMIT as "no limit", and
    # ``rows[:limit]`` with a negative limit would drop rows from the end.
    limit = max(limit, 0)
    offset = max(offset, 0)
    if limit == 0:
        return []

    # NOTE(review): three times ``limit`` rows are fetched so that the
    # negative-tag filter below still leaves enough candidates. Because the
    # filter runs after OFFSET is applied, consecutive pages can overlap.
    rows = db.execute(
        text("""
            SELECT dq.id, dq.channel_id, dq.score, dq.source, dq.preview_json,
                   c.youtube_channel_id, c.name, c.description, c.thumbnail_url, c.subscriber_count
            FROM discovery_queue dq
            JOIN channels c ON dq.channel_id = c.id
            WHERE dq.user_id = :user_id AND dq.seen = 0
              AND dq.channel_id NOT IN (
                  SELECT channel_id FROM user_channels
                  WHERE user_id = :user_id AND status IN ('followed', 'dismissed')
              )
            ORDER BY dq.score DESC
            LIMIT :limit OFFSET :offset
        """),
        {"user_id": current_user.id, "limit": limit * 3, "offset": offset},
    ).mappings().all()

    # Load negative affinity tags and use them to filter channels already in the queue
    neg_affinity = {
        r["tag"]
        for r in db.execute(
            text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"),
            {"user_id": current_user.id},
        ).mappings().all()
    }
    if neg_affinity and rows:
        # The ids are inlined into the statement, so force them through int()
        # to guarantee that nothing but digits can reach the SQL text.
        unique_ids = sorted({int(r["channel_id"]) for r in rows})
        channel_ids_csv = ",".join(str(cid) for cid in unique_ids)
        vtag_rows = db.execute(
            text(f"SELECT channel_id, tags FROM videos WHERE channel_id IN ({channel_ids_csv}) AND tags IS NOT NULL LIMIT 1000")
        ).mappings().all()
        neg_hit = _count_negative_tag_hits(vtag_rows, neg_affinity)
        rows = [r for r in rows if neg_hit.get(r["channel_id"], 0) < 3]

    items = []
    for row in rows[:limit]:
        item = dict(row)
        # ``preview_json`` is only a fallback source, not a response field.
        stored_preview = item.pop("preview_json", None)

        # NOTE(review): one query per channel; acceptable for a page of
        # ``limit`` items but a candidate for batching.
        video_rows = db.execute(
            text("""
                SELECT title, youtube_video_id FROM videos
                WHERE channel_id = :channel_id
                ORDER BY published_at DESC
                LIMIT 3
            """),
            {"channel_id": item["channel_id"]},
        ).fetchall()
        item["recent_video_titles"] = [video[0] for video in video_rows]

        if video_rows:
            item["preview_videos"] = [
                {
                    "thumbnail_url": f"https://i.ytimg.com/vi/{video[1]}/hqdefault.jpg",
                    "title": video[0],
                }
                for video in video_rows
            ]
        else:
            item["preview_videos"] = _parse_preview_json(stored_preview)

        items.append(DiscoveryItem(**item))
    return items
|
|
|
|
|
|
@router.post("/{channel_id}/follow", status_code=204)
def follow_discovery(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Record that the current user follows a channel and mark its queue row seen.

    An existing ``UserChannel`` row has its status switched to ``"followed"``;
    otherwise a new row is created. The user's discovery-queue row for the
    channel, if there is one, is flagged as seen. Everything is committed in
    a single transaction.

    Raises:
        HTTPException: 404 when no channel with ``channel_id`` exists.
    """
    if db.query(Channel).filter(Channel.id == channel_id).first() is None:
        raise HTTPException(status_code=404, detail="Channel not found")

    owner_id = current_user.id

    membership = (
        db.query(UserChannel)
        .filter_by(user_id=owner_id, channel_id=channel_id)
        .first()
    )
    if membership is None:
        db.add(UserChannel(user_id=owner_id, channel_id=channel_id, status="followed"))
    else:
        membership.status = "followed"

    queue_entry = (
        db.query(DiscoveryQueue)
        .filter_by(user_id=owner_id, channel_id=channel_id)
        .first()
    )
    if queue_entry is not None:
        queue_entry.seen = True

    db.commit()
|
|
|
|
|
|
@router.post("/{channel_id}/dismiss", status_code=204)
def dismiss_discovery(
    channel_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Record that the current user dismissed a channel and mark its queue row seen.

    An existing ``UserChannel`` row has its status switched to
    ``"dismissed"``; otherwise a new row is created. The user's
    discovery-queue row for the channel, if there is one, is flagged as seen.

    Raises:
        HTTPException: 404 when no channel with ``channel_id`` exists.
    """
    # Reject unknown channels up front, as the follow endpoint does, so that
    # no UserChannel row is ever created for a channel that does not exist.
    channel = db.query(Channel).filter(Channel.id == channel_id).first()
    if not channel:
        raise HTTPException(status_code=404, detail="Channel not found")

    uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if uc:
        uc.status = "dismissed"
    else:
        db.add(UserChannel(user_id=current_user.id, channel_id=channel_id, status="dismissed"))

    dq = db.query(DiscoveryQueue).filter_by(user_id=current_user.id, channel_id=channel_id).first()
    if dq:
        dq.seen = True
    db.commit()
|
|
|
|
|
|
@router.post("/refresh", status_code=202)
def refresh_discovery(
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Queue a discovery run for the current user and return immediately.

    The region list is read from the user's settings (comma separated,
    defaulting to ``"US,SE"``), trimmed and upper-cased. Two background tasks
    are queued: the discovery run itself and ``_enrich_missing_task`` with
    the argument 20.

    Returns:
        A dict with a ``detail`` message confirming the refresh was started.
    """
    # Imported here rather than at module level, as in the original code.
    from .channels import _enrich_missing_task

    settings = db.query(UserSettings).filter_by(user_id=current_user.id).first()
    configured = settings.discovery_regions if settings else None
    raw_regions = configured or "US,SE"
    regions = [
        code.upper()
        for code in (part.strip() for part in raw_regions.split(","))
        if code
    ]

    # Capture a plain id so the task does not depend on request-bound objects.
    user_id = current_user.id

    def _run_discovery():
        # The request's session is closed by the time the task runs, so the
        # task opens a session of its own and always closes it afterwards.
        from ..database import SessionLocal

        task_session = SessionLocal()
        try:
            run_full_discovery(task_session, user_id, regions)
        finally:
            task_session.close()

    background_tasks.add_task(_run_discovery)
    background_tasks.add_task(_enrich_missing_task, 20)
    return {"detail": "Discovery refresh started"}
|
|
|
|
|
|
@router.get("/videos", response_model=list[dict])
def discovery_videos(
    offset: int = 0,
    limit: int = 50,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return up to two recent videos per channel in the user's discovery queue.

    Only unseen queue rows are considered, and channels the user has followed
    or dismissed are excluded. Results are ordered by the channel's queue
    score (highest first), then by recency rank within the channel.

    NOTE(review): ties are broken with ``RANDOM()``, so paging through the
    results with ``offset`` is not guaranteed to be repeatable across
    requests. Confirm this is intended.
    """
    statement = text("""
        SELECT * FROM (
            SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
                   v.duration_seconds, v.published_at,
                   c.id AS channel_id, c.name AS channel_name,
                   c.youtube_channel_id AS channel_youtube_id,
                   dq.score,
                   ROW_NUMBER() OVER (
                       PARTITION BY c.id ORDER BY v.published_at DESC NULLS LAST
                   ) AS rn
            FROM videos v
            JOIN channels c ON v.channel_id = c.id
            JOIN discovery_queue dq ON c.id = dq.channel_id
            WHERE dq.user_id = :user_id AND dq.seen = 0
              AND dq.channel_id NOT IN (
                  SELECT channel_id FROM user_channels
                  WHERE user_id = :user_id AND status IN ('followed', 'dismissed')
              )
        )
        WHERE rn <= 2
        ORDER BY score DESC, rn ASC, RANDOM()
        LIMIT :limit OFFSET :offset
    """)
    params = {"user_id": current_user.id, "limit": limit, "offset": offset}

    records = db.execute(statement, params).mappings().all()
    return [dict(record) for record in records]
|
|
|
|
|
|
@router.post("/videos/{youtube_video_id}/dismiss", status_code=204)
def dismiss_discovery_video(
    youtube_video_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Dismiss the channel that owns the given video for the current user.

    The owning channel is marked ``"dismissed"`` in ``UserChannel`` (creating
    the row when needed), the user's discovery-queue row for that channel is
    flagged as seen, and ``_update_affinity`` is called for the video with a
    delta of -3.0 before the transaction is committed.

    Raises:
        HTTPException: 404 when the video is unknown or has no channel.
    """
    # Function-scope imports, as in the original code.
    from ..models import Video
    from ..routers.videos import _update_affinity

    video = db.query(Video).filter_by(youtube_video_id=youtube_video_id).first()
    if video is None or not video.channel_id:
        raise HTTPException(status_code=404, detail="Video not found")

    owner_id = current_user.id
    channel_id = video.channel_id

    membership = (
        db.query(UserChannel)
        .filter_by(user_id=owner_id, channel_id=channel_id)
        .first()
    )
    if membership is None:
        db.add(UserChannel(user_id=owner_id, channel_id=channel_id, status="dismissed"))
    else:
        membership.status = "dismissed"

    queue_entry = (
        db.query(DiscoveryQueue)
        .filter_by(user_id=owner_id, channel_id=channel_id)
        .first()
    )
    if queue_entry is not None:
        queue_entry.seen = True

    _update_affinity(db, owner_id, video, -3.0)

    db.commit()
|
|
|
|
|
|
@router.get("/community", response_model=list[dict])
def community_shelf(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List up to 20 channels that other users downloaded from, most downloaders first.

    Only completed downloads by users other than the caller are counted, and
    videos the caller has already watched or downloaded are ignored. Each
    entry carries the number of distinct downloaders for the channel.

    NOTE(review): ``latest_title`` and ``latest_thumbnail`` are plain columns
    in a query grouped by channel, so which video they come from is chosen by
    the database. Nothing here guarantees it is the newest one.
    """
    statement = text("""
        SELECT c.id, c.youtube_channel_id, c.name, c.thumbnail_url,
               COUNT(DISTINCT d.user_id) AS downloader_count,
               v.title AS latest_title, v.thumbnail_url AS latest_thumbnail
        FROM downloads d
        JOIN videos v ON d.video_id = v.id
        JOIN channels c ON v.channel_id = c.id
        WHERE d.user_id != :user_id
          AND d.status = 'complete'
          AND v.id NOT IN (
              SELECT uv.video_id FROM user_videos uv
              WHERE uv.user_id = :user_id AND (uv.watched = 1 OR uv.downloaded = 1)
          )
        GROUP BY c.id
        ORDER BY downloader_count DESC
        LIMIT 20
    """)

    records = db.execute(statement, {"user_id": current_user.id}).mappings().all()
    return [dict(record) for record in records]
|