- Offline banner in nav when backend is unreachable (network error, not 4xx)
- GET /channels/{id}/random — picks random unwatched video, navigates to watch
- GET /channels/{id}/in-progress — videos with >30s progress, not yet watched
- Channel page: 'Surprise me' button (desktop + mobile) navigates to random video
- Channel page: 'Continue watching' row above video list when in-progress videos exist
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1190 lines
44 KiB
Python
1190 lines
44 KiB
Python
import json
|
|
import threading as _threading
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import text
|
|
|
|
from ..auth_utils import get_current_user
|
|
from ..database import get_db
|
|
from ..models import Channel, ChannelGroup, ChannelGroupMember, Download, User, UserChannel, UserSettings, UserVideo, Video
|
|
from ..services import ytdlp
|
|
|
|
router = APIRouter()
|
|
|
|
_tasks: dict = {}
|
|
_tasks_lock = _threading.Lock()
|
|
|
|
|
|
class ChannelOut(BaseModel):
|
|
id: int
|
|
youtube_channel_id: str
|
|
name: str
|
|
description: Optional[str]
|
|
thumbnail_url: Optional[str]
|
|
banner_url: Optional[str]
|
|
crawled_at: Optional[datetime]
|
|
status: Optional[str]
|
|
auto_download: Optional[bool] = None
|
|
subscriber_count: Optional[int] = None
|
|
video_count: int = 0
|
|
unwatched_count: int = 0
|
|
watched_count: int = 0
|
|
downloaded_count: int = 0
|
|
last_published_at: Optional[datetime] = None
|
|
new_count: int = 0
|
|
latest_video_id: Optional[str] = None
|
|
latest_video_title: Optional[str] = None
|
|
muted_until: Optional[datetime] = None
|
|
upload_frequency_days: Optional[float] = None
|
|
notes: Optional[str] = ""
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class ChannelGroupOut(BaseModel):
|
|
id: int
|
|
name: str
|
|
channel_ids: list[int] = []
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class VideoOut(BaseModel):
|
|
id: int
|
|
youtube_video_id: str
|
|
title: str
|
|
thumbnail_url: Optional[str]
|
|
duration_seconds: Optional[int]
|
|
published_at: Optional[datetime]
|
|
channel_id: Optional[int] = None
|
|
channel_name: Optional[str] = None
|
|
channel_youtube_id: Optional[str] = None
|
|
is_downloaded: bool = False
|
|
is_watched: bool = False
|
|
queued: bool = False
|
|
view_count: Optional[int] = None
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
|
|
def _get_channel_or_404(db: Session, channel_id: int) -> Channel:
|
|
c = db.query(Channel).filter(Channel.id == channel_id).first()
|
|
if not c:
|
|
raise HTTPException(status_code=404, detail="Channel not found")
|
|
return c
|
|
|
|
|
|
def _index_channels_batch(channel_ids: list[int], user_id: int, delay: float = 1.5):
|
|
"""Run channel syncs sequentially with a polite delay between requests."""
|
|
import time
|
|
for i, cid in enumerate(channel_ids):
|
|
if i > 0:
|
|
time.sleep(delay)
|
|
_index_channel_task(cid, user_id)
|
|
|
|
|
|
def _index_channel_task(channel_id: int, user_id: int, max_videos: int = 30):
|
|
from ..database import SessionLocal
|
|
db = SessionLocal()
|
|
try:
|
|
channel = db.query(Channel).filter_by(id=channel_id).first()
|
|
if not channel:
|
|
return
|
|
|
|
result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id, max_videos=max_videos)
|
|
if not result:
|
|
return
|
|
|
|
ch_data = result.get("channel", {})
|
|
if ch_data:
|
|
for k, v in ch_data.items():
|
|
if hasattr(channel, k) and v is not None and v != "":
|
|
setattr(channel, k, v)
|
|
channel.crawled_at = datetime.utcnow()
|
|
db.merge(channel)
|
|
|
|
new_video_ids = []
|
|
for vdata in result.get("videos", []):
|
|
yt_id = vdata.get("youtube_video_id")
|
|
if not yt_id:
|
|
continue
|
|
existing = db.query(Video).filter_by(youtube_video_id=yt_id).first()
|
|
if not existing:
|
|
new_video = Video(
|
|
youtube_video_id=yt_id,
|
|
channel_id=channel.id,
|
|
title=vdata.get("title", ""),
|
|
description=vdata.get("description"),
|
|
thumbnail_url=vdata.get("thumbnail_url"),
|
|
duration_seconds=vdata.get("duration_seconds"),
|
|
published_at=vdata.get("published_at"),
|
|
tags=vdata.get("tags"),
|
|
category=vdata.get("category"),
|
|
)
|
|
db.add(new_video)
|
|
db.flush()
|
|
new_video_ids.append((yt_id, new_video.id))
|
|
else:
|
|
# Backfill missing metadata on existing videos
|
|
if existing.published_at is None and vdata.get("published_at"):
|
|
existing.published_at = vdata["published_at"]
|
|
if not existing.title and vdata.get("title"):
|
|
existing.title = vdata["title"]
|
|
if not existing.thumbnail_url and vdata.get("thumbnail_url"):
|
|
existing.thumbnail_url = vdata["thumbnail_url"]
|
|
if not existing.duration_seconds and vdata.get("duration_seconds"):
|
|
existing.duration_seconds = vdata["duration_seconds"]
|
|
if not existing.description and vdata.get("description"):
|
|
existing.description = vdata["description"]
|
|
db.commit()
|
|
|
|
# Auto-download new videos if setting says to
|
|
if new_video_ids and user_id:
|
|
uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
|
|
user_settings = db.query(UserSettings).filter_by(user_id=user_id).first()
|
|
global_auto = user_settings.auto_download_on_sync if user_settings else False
|
|
channel_auto = uc.auto_download if uc and uc.auto_download is not None else global_auto
|
|
|
|
if channel_auto:
|
|
quality = user_settings.preferred_quality if user_settings else "best"
|
|
subtitle_langs = (user_settings.subtitle_langs or "") if user_settings else ""
|
|
from ..routers.downloads import _on_progress, _on_complete, _on_error
|
|
for yt_id, vid_id in new_video_ids:
|
|
existing_dl = db.query(Download).filter_by(
|
|
user_id=user_id, video_id=vid_id
|
|
).filter(Download.status.in_(["pending", "downloading", "complete"])).first()
|
|
if not existing_dl:
|
|
dl = Download(user_id=user_id, video_id=vid_id, status="pending")
|
|
db.add(dl)
|
|
db.flush()
|
|
import threading
|
|
t = threading.Thread(
|
|
target=ytdlp.start_download,
|
|
args=(yt_id, dl.id, _on_progress, _on_complete, _on_error, quality, subtitle_langs),
|
|
daemon=True,
|
|
)
|
|
t.start()
|
|
db.commit()
|
|
except Exception:
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def _discovery_task(user_id: int):
|
|
from ..database import SessionLocal
|
|
from ..services.discovery import run_full_discovery
|
|
db = SessionLocal()
|
|
try:
|
|
run_full_discovery(db, user_id)
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def _enrich_missing_task(limit: int = 20):
|
|
"""Fetch full metadata for videos missing description, published_at, or view_count."""
|
|
from ..database import SessionLocal
|
|
import time
|
|
db = SessionLocal()
|
|
try:
|
|
rows = db.execute(
|
|
text("""
|
|
SELECT v.id, v.youtube_video_id FROM videos v
|
|
WHERE v.description IS NULL OR v.published_at IS NULL OR v.view_count IS NULL
|
|
ORDER BY
|
|
(EXISTS (SELECT 1 FROM user_channels uc
|
|
WHERE uc.channel_id = v.channel_id AND uc.status = 'followed')) DESC,
|
|
(EXISTS (SELECT 1 FROM discovery_queue dq
|
|
WHERE dq.channel_id = v.channel_id)) DESC,
|
|
v.id DESC
|
|
LIMIT :limit
|
|
"""),
|
|
{"limit": limit},
|
|
).mappings().all()
|
|
for i, row in enumerate(rows):
|
|
if i > 0:
|
|
time.sleep(0.5)
|
|
try:
|
|
meta = ytdlp.fetch_video_metadata(row["youtube_video_id"])
|
|
if meta:
|
|
vid = db.query(Video).filter_by(id=row["id"]).first()
|
|
if vid:
|
|
if meta.get("description") is not None:
|
|
vid.description = meta["description"] or ""
|
|
if not vid.published_at and meta.get("published_at"):
|
|
vid.published_at = meta["published_at"]
|
|
if vid.view_count is None and meta.get("view_count") is not None:
|
|
vid.view_count = meta["view_count"]
|
|
if not vid.tags and meta.get("tags"):
|
|
vid.tags = meta["tags"]
|
|
if not vid.category and meta.get("category"):
|
|
vid.category = meta["category"]
|
|
if not vid.chapters and meta.get("chapters"):
|
|
vid.chapters = meta["chapters"]
|
|
db.commit()
|
|
except Exception:
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@router.get("/feed", response_model=list[VideoOut])
|
|
def channel_feed(
|
|
limit: int = 24,
|
|
offset: int = 0,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
rows = db.execute(
|
|
text("""
|
|
SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
|
|
v.duration_seconds, v.published_at,
|
|
c.id AS channel_id, c.name AS channel_name, c.youtube_channel_id AS channel_youtube_id,
|
|
COALESCE(uv.downloaded, 0) AS is_downloaded,
|
|
COALESCE(uv.watched, 0) AS is_watched,
|
|
COALESCE(uv.queued, 0) AS queued
|
|
FROM videos v
|
|
JOIN channels c ON v.channel_id = c.id
|
|
JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id AND uc.status = 'followed'
|
|
LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
|
|
ORDER BY v.published_at DESC
|
|
LIMIT :limit OFFSET :offset
|
|
"""),
|
|
{"user_id": current_user.id, "limit": limit, "offset": offset},
|
|
).mappings().all()
|
|
return [VideoOut(**dict(r)) for r in rows]
|
|
|
|
|
|
@router.post("/sync-all", status_code=202)
|
|
def sync_all_channels(
|
|
background_tasks: BackgroundTasks,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
# Only sync channels not touched in the last 6 hours to avoid hammering YouTube
|
|
channels = db.execute(
|
|
text("""
|
|
SELECT c.id FROM channels c
|
|
JOIN user_channels uc ON c.id = uc.channel_id
|
|
WHERE uc.user_id = :uid AND uc.status = 'followed'
|
|
AND (c.crawled_at IS NULL OR c.crawled_at < datetime('now', '-6 hours'))
|
|
ORDER BY COALESCE(c.crawled_at, '1970-01-01') ASC
|
|
"""),
|
|
{"uid": current_user.id},
|
|
).mappings().all()
|
|
|
|
if channels:
|
|
ids = [row["id"] for row in channels]
|
|
background_tasks.add_task(_index_channels_batch, ids, current_user.id)
|
|
background_tasks.add_task(_discovery_task, current_user.id)
|
|
|
|
background_tasks.add_task(_enrich_missing_task, 30)
|
|
|
|
return {"indexing": len(channels)}
|
|
|
|
|
|
@router.get("/tasks")
|
|
def get_active_tasks(current_user: User = Depends(get_current_user)):
|
|
with _tasks_lock:
|
|
return list(_tasks.values())
|
|
|
|
|
|
@router.post("/mark-seen", status_code=204)
|
|
def mark_channels_seen(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
db.execute(
|
|
text("UPDATE user_channels SET last_seen_at = :now WHERE user_id = :uid AND status = 'followed'"),
|
|
{"now": datetime.utcnow(), "uid": current_user.id},
|
|
)
|
|
db.commit()
|
|
|
|
|
|
@router.get("", response_model=list[ChannelOut])
|
|
def list_channels(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
uid = current_user.id
|
|
|
|
# Step 1 — channel rows + user_channel metadata (fast, no video stats)
|
|
ch_rows = db.execute(
|
|
text("""
|
|
SELECT c.id, c.youtube_channel_id, c.name, c.description,
|
|
c.thumbnail_url, c.banner_url, c.subscriber_count, c.crawled_at,
|
|
uc.status, uc.auto_download, uc.muted_until, uc.notes, uc.last_seen_at
|
|
FROM channels c
|
|
JOIN user_channels uc ON c.id = uc.channel_id
|
|
WHERE uc.user_id = :uid AND uc.status = 'followed'
|
|
"""),
|
|
{"uid": uid},
|
|
).mappings().all()
|
|
|
|
if not ch_rows:
|
|
return []
|
|
|
|
id_csv = ",".join(str(r["id"]) for r in ch_rows)
|
|
last_seen = {r["id"]: r["last_seen_at"] for r in ch_rows}
|
|
|
|
# Step 2 — aggregated video stats for all channels in one query
|
|
vstats = {
|
|
r["channel_id"]: r
|
|
for r in db.execute(
|
|
text(f"""
|
|
SELECT v.channel_id,
|
|
COUNT(*) AS video_count,
|
|
MAX(v.published_at) AS last_published_at,
|
|
julianday(MAX(v.published_at)) - julianday(MIN(v.published_at)) AS date_span_days,
|
|
SUM(CASE WHEN COALESCE(uv.watched, 0) = 0 THEN 1 ELSE 0 END) AS unwatched_count,
|
|
SUM(CASE WHEN uv.watched = 1 THEN 1 ELSE 0 END) AS watched_count,
|
|
SUM(CASE WHEN uv.downloaded = 1 THEN 1 ELSE 0 END) AS downloaded_count
|
|
FROM videos v
|
|
LEFT JOIN user_videos uv ON uv.video_id = v.id AND uv.user_id = :uid
|
|
WHERE v.channel_id IN ({id_csv})
|
|
GROUP BY v.channel_id
|
|
"""),
|
|
{"uid": uid},
|
|
).mappings().all()
|
|
}
|
|
|
|
# Step 3 — new-video count per channel (videos indexed after last_seen_at)
|
|
new_counts = {
|
|
r["channel_id"]: r["new_count"]
|
|
for r in db.execute(
|
|
text(f"""
|
|
SELECT v.channel_id, COUNT(*) AS new_count
|
|
FROM videos v
|
|
JOIN user_channels uc
|
|
ON uc.channel_id = v.channel_id
|
|
AND uc.user_id = :uid
|
|
WHERE v.channel_id IN ({id_csv})
|
|
AND (uc.last_seen_at IS NULL OR v.indexed_at > uc.last_seen_at)
|
|
GROUP BY v.channel_id
|
|
"""),
|
|
{"uid": uid},
|
|
).mappings().all()
|
|
}
|
|
|
|
# Step 4 — latest video id + title per channel (derived-table join, no correlated subquery)
|
|
latest = {
|
|
r["channel_id"]: r
|
|
for r in db.execute(
|
|
text(f"""
|
|
SELECT v.channel_id,
|
|
v.youtube_video_id AS latest_video_id,
|
|
v.title AS latest_video_title
|
|
FROM videos v
|
|
JOIN (
|
|
SELECT channel_id, MAX(published_at) AS max_pub
|
|
FROM videos
|
|
WHERE channel_id IN ({id_csv})
|
|
GROUP BY channel_id
|
|
) m ON v.channel_id = m.channel_id AND v.published_at = m.max_pub
|
|
GROUP BY v.channel_id
|
|
"""),
|
|
).mappings().all()
|
|
}
|
|
|
|
# Merge and build response
|
|
result = []
|
|
for r in ch_rows:
|
|
cid = r["id"]
|
|
vs = vstats.get(cid) or {}
|
|
vc = vs.get("video_count") or 0
|
|
newest = vs.get("last_published_at")
|
|
span = vs.get("date_span_days")
|
|
freq = (span / (vc - 1.0)) if (vc >= 2 and span is not None) else None
|
|
|
|
result.append(ChannelOut(
|
|
id=cid,
|
|
youtube_channel_id=r["youtube_channel_id"],
|
|
name=r["name"],
|
|
description=r["description"],
|
|
thumbnail_url=r["thumbnail_url"],
|
|
banner_url=r.get("banner_url"),
|
|
subscriber_count=r.get("subscriber_count"),
|
|
crawled_at=r.get("crawled_at"),
|
|
status=r["status"],
|
|
auto_download=r.get("auto_download"),
|
|
muted_until=r.get("muted_until"),
|
|
notes=r.get("notes") or "",
|
|
video_count=vc,
|
|
last_published_at=newest,
|
|
unwatched_count=vs.get("unwatched_count") or 0,
|
|
watched_count=vs.get("watched_count") or 0,
|
|
downloaded_count=vs.get("downloaded_count") or 0,
|
|
new_count=new_counts.get(cid, 0),
|
|
latest_video_id=latest.get(cid, {}).get("latest_video_id"),
|
|
latest_video_title=latest.get(cid, {}).get("latest_video_title"),
|
|
upload_frequency_days=freq,
|
|
))
|
|
|
|
result.sort(key=lambda c: c.last_published_at or datetime.min, reverse=True)
|
|
return result
|
|
|
|
|
|
# ── Channel Groups (must be before /{channel_id} to avoid route shadowing) ───
|
|
|
|
@router.get("/groups", response_model=list[ChannelGroupOut])
|
|
def list_groups(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
groups = db.query(ChannelGroup).filter_by(user_id=current_user.id).all()
|
|
result = []
|
|
for g in groups:
|
|
members = db.query(ChannelGroupMember).filter_by(group_id=g.id).all()
|
|
result.append(ChannelGroupOut(id=g.id, name=g.name, channel_ids=[m.channel_id for m in members]))
|
|
return result
|
|
|
|
|
|
@router.post("/groups", response_model=ChannelGroupOut, status_code=201)
|
|
def create_group(
|
|
body: dict,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
name = (body.get("name") or "").strip()
|
|
if not name:
|
|
raise HTTPException(status_code=400, detail="name required")
|
|
g = ChannelGroup(user_id=current_user.id, name=name)
|
|
db.add(g)
|
|
db.commit()
|
|
db.refresh(g)
|
|
return ChannelGroupOut(id=g.id, name=g.name, channel_ids=[])
|
|
|
|
|
|
@router.delete("/groups/{group_id}", status_code=204)
|
|
def delete_group(
|
|
group_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
|
|
if not g:
|
|
raise HTTPException(status_code=404, detail="Group not found")
|
|
db.delete(g)
|
|
db.commit()
|
|
|
|
|
|
@router.patch("/groups/{group_id}", response_model=ChannelGroupOut)
|
|
def rename_group(
|
|
group_id: int,
|
|
body: dict,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
|
|
if not g:
|
|
raise HTTPException(status_code=404, detail="Group not found")
|
|
name = (body.get("name") or "").strip()
|
|
if name:
|
|
g.name = name
|
|
db.commit()
|
|
members = db.query(ChannelGroupMember).filter_by(group_id=g.id).all()
|
|
return ChannelGroupOut(id=g.id, name=g.name, channel_ids=[m.channel_id for m in members])
|
|
|
|
|
|
@router.post("/groups/{group_id}/channels/{channel_id}", status_code=204)
|
|
def add_channel_to_group(
|
|
group_id: int,
|
|
channel_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
g = db.query(ChannelGroup).filter_by(id=group_id, user_id=current_user.id).first()
|
|
if not g:
|
|
raise HTTPException(status_code=404, detail="Group not found")
|
|
existing = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first()
|
|
if not existing:
|
|
db.add(ChannelGroupMember(group_id=group_id, channel_id=channel_id))
|
|
db.commit()
|
|
|
|
|
|
@router.delete("/groups/{group_id}/channels/{channel_id}", status_code=204)
|
|
def remove_channel_from_group_route(
|
|
group_id: int,
|
|
channel_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
m = db.query(ChannelGroupMember).filter_by(group_id=group_id, channel_id=channel_id).first()
|
|
if m:
|
|
db.delete(m)
|
|
db.commit()
|
|
|
|
|
|
class BulkChannelBody(BaseModel):
|
|
channel_ids: list[int]
|
|
action: str # "mute" | "unmute" | "unfollow"
|
|
|
|
|
|
@router.post("/bulk-action", status_code=200)
|
|
def bulk_channel_action(
|
|
body: BulkChannelBody,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
if not body.channel_ids:
|
|
return {"ok": True}
|
|
placeholders = ",".join(str(int(i)) for i in body.channel_ids)
|
|
if body.action == "mute":
|
|
db.execute(
|
|
text(f"""
|
|
UPDATE user_channels SET muted_until = :until
|
|
WHERE user_id = :user_id AND channel_id IN ({placeholders})
|
|
"""),
|
|
{"until": datetime.utcnow() + timedelta(days=30), "user_id": current_user.id},
|
|
)
|
|
elif body.action == "unmute":
|
|
db.execute(
|
|
text(f"UPDATE user_channels SET muted_until = NULL WHERE user_id = :user_id AND channel_id IN ({placeholders})"),
|
|
{"user_id": current_user.id},
|
|
)
|
|
elif body.action == "unfollow":
|
|
db.execute(
|
|
text(f"DELETE FROM user_channels WHERE user_id = :user_id AND channel_id IN ({placeholders})"),
|
|
{"user_id": current_user.id},
|
|
)
|
|
db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@router.get("/{channel_id}", response_model=ChannelOut)
|
|
def get_channel(
|
|
channel_id: int,
|
|
background_tasks: BackgroundTasks,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
row = db.execute(
|
|
text("""
|
|
SELECT c.*, uc.status, uc.auto_download, uc.muted_until,
|
|
(SELECT COUNT(*) FROM videos WHERE channel_id = c.id) AS video_count,
|
|
(SELECT MAX(v.published_at) FROM videos v WHERE v.channel_id = c.id) AS last_published_at,
|
|
(SELECT COUNT(*) FROM videos v
|
|
LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
|
|
WHERE v.channel_id = c.id AND COALESCE(uv.watched, 0) = 0) AS unwatched_count,
|
|
(SELECT COUNT(*) FROM videos v
|
|
JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
|
|
WHERE v.channel_id = c.id AND uv.watched = 1) AS watched_count,
|
|
(SELECT COUNT(*) FROM videos v
|
|
JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
|
|
WHERE v.channel_id = c.id AND uv.downloaded = 1) AS downloaded_count,
|
|
0 AS new_count,
|
|
(SELECT v.youtube_video_id FROM videos v
|
|
WHERE v.channel_id = c.id ORDER BY v.published_at DESC LIMIT 1) AS latest_video_id,
|
|
(SELECT v.title FROM videos v
|
|
WHERE v.channel_id = c.id ORDER BY v.published_at DESC LIMIT 1) AS latest_video_title
|
|
FROM channels c
|
|
LEFT JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id
|
|
WHERE c.id = :channel_id
|
|
"""),
|
|
{"user_id": current_user.id, "channel_id": channel_id},
|
|
).mappings().first()
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="Channel not found")
|
|
|
|
# Re-index in the background if stale (not crawled in the last hour)
|
|
stale = True
|
|
try:
|
|
crawled_at_raw = row.get("crawled_at")
|
|
if crawled_at_raw:
|
|
crawled_at = (
|
|
crawled_at_raw if isinstance(crawled_at_raw, datetime)
|
|
else datetime.fromisoformat(str(crawled_at_raw))
|
|
)
|
|
stale = (datetime.utcnow() - crawled_at).total_seconds() > 3600
|
|
except Exception:
|
|
pass
|
|
if stale:
|
|
background_tasks.add_task(_index_channel_task, channel_id, current_user.id)
|
|
|
|
return ChannelOut(**dict(row))
|
|
|
|
|
|
@router.get("/{channel_id}/videos", response_model=list[VideoOut])
|
|
def get_channel_videos(
|
|
channel_id: int,
|
|
sort: str = "newest",
|
|
offset: int = 0,
|
|
limit: int = 60,
|
|
q: str = "",
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
_get_channel_or_404(db, channel_id)
|
|
params: dict = {"user_id": current_user.id, "channel_id": channel_id, "limit": limit, "offset": offset}
|
|
q_clause = ""
|
|
if q.strip():
|
|
q_clause = "AND (v.title LIKE :q OR v.description LIKE :q)"
|
|
params["q"] = f"%{q.strip()}%"
|
|
|
|
order = {
|
|
"newest": "v.published_at DESC NULLS LAST",
|
|
"oldest": "v.published_at ASC NULLS LAST",
|
|
"title": "v.title ASC",
|
|
"unwatched":"COALESCE(uv.watched, 0) ASC, v.published_at DESC NULLS LAST",
|
|
"popular": "v.view_count DESC NULLS LAST",
|
|
}.get(sort, "v.published_at DESC NULLS LAST")
|
|
view_count_clause = "AND v.view_count IS NOT NULL" if sort == "popular" else ""
|
|
rows = db.execute(
|
|
text(f"""
|
|
SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
|
|
v.duration_seconds, v.published_at, v.view_count,
|
|
COALESCE(uv.downloaded, 0) AS is_downloaded,
|
|
COALESCE(uv.watched, 0) AS is_watched
|
|
FROM videos v
|
|
LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
|
|
WHERE v.channel_id = :channel_id {view_count_clause} {q_clause}
|
|
ORDER BY {order}
|
|
LIMIT :limit OFFSET :offset
|
|
"""),
|
|
params,
|
|
).mappings().all()
|
|
return [VideoOut(**dict(r)) for r in rows]
|
|
|
|
|
|
@router.post("/{channel_id}/fetch-popular", status_code=status.HTTP_202_ACCEPTED)
|
|
def fetch_popular_videos(
|
|
channel_id: int,
|
|
background_tasks: BackgroundTasks,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Fetch the channel's most popular videos from YouTube and index them."""
|
|
channel = _get_channel_or_404(db, channel_id)
|
|
background_tasks.add_task(_fetch_popular_task, channel_id, channel.youtube_channel_id, channel.name or "")
|
|
return {"detail": "Fetching popular videos"}
|
|
|
|
|
|
def _fetch_popular_task(channel_id: int, youtube_channel_id: str, channel_name: str = ""):
|
|
"""Half-and-half popular fetch.
|
|
|
|
Phase 1 (fast): flat-playlist crawl of the full channel → store any
|
|
new videos in DB (title, duration, thumbnail). No individual requests.
|
|
|
|
Phase 2 (parallel): for every video now in DB for this channel,
|
|
fetch its watch page to get real view_count + published_at.
|
|
Prioritises videos missing view_count; caps at 200 per run.
|
|
Popular tab then sorts by view_count DESC.
|
|
"""
|
|
from ..database import SessionLocal
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
task_id = f"popular-{channel_id}"
|
|
label = f"Popular fetch — {channel_name}" if channel_name else "Popular fetch"
|
|
|
|
# Phase 1 — flat-playlist: crawl all channel videos quickly
|
|
with _tasks_lock:
|
|
_tasks[task_id] = {
|
|
"id": task_id,
|
|
"label": label,
|
|
"phase": "Crawling channel…",
|
|
"total": 0,
|
|
"done": 0,
|
|
"started_at": datetime.utcnow().isoformat(),
|
|
}
|
|
|
|
if youtube_channel_id.startswith("@"):
|
|
url = f"https://www.youtube.com/{youtube_channel_id}/videos"
|
|
else:
|
|
url = f"https://www.youtube.com/channel/{youtube_channel_id}/videos"
|
|
|
|
stdout, _, _ = ytdlp._run([
|
|
"yt-dlp", url,
|
|
"--dump-json", "--flat-playlist",
|
|
"--quiet",
|
|
*ytdlp._cookie_args(),
|
|
], timeout=120)
|
|
|
|
flat_entries = []
|
|
for line in stdout.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
info = json.loads(line)
|
|
yt_id = info.get("id")
|
|
if yt_id:
|
|
flat_entries.append({
|
|
"id": yt_id,
|
|
"title": info.get("title", ""),
|
|
"duration": info.get("duration"),
|
|
})
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
# Store any new videos from the flat crawl
|
|
if flat_entries:
|
|
db = SessionLocal()
|
|
try:
|
|
channel = db.query(Channel).filter_by(id=channel_id).first()
|
|
if channel:
|
|
for entry in flat_entries:
|
|
if not db.query(Video).filter_by(youtube_video_id=entry["id"]).first():
|
|
try:
|
|
db.add(Video(
|
|
youtube_video_id=entry["id"],
|
|
channel_id=channel_id,
|
|
title=entry["title"],
|
|
thumbnail_url=ytdlp._stable_thumbnail(entry["id"]),
|
|
duration_seconds=entry["duration"],
|
|
tags="[]",
|
|
))
|
|
db.commit()
|
|
except Exception:
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
# Phase 2 — individual fetches for view_count, prioritising missing ones
|
|
db = SessionLocal()
|
|
try:
|
|
rows = db.execute(
|
|
text("""
|
|
SELECT youtube_video_id FROM videos
|
|
WHERE channel_id = :cid
|
|
ORDER BY (view_count IS NULL) DESC, RANDOM()
|
|
LIMIT 200
|
|
"""),
|
|
{"cid": channel_id},
|
|
).mappings().all()
|
|
video_ids = [r["youtube_video_id"] for r in rows]
|
|
finally:
|
|
db.close()
|
|
|
|
if not video_ids:
|
|
with _tasks_lock:
|
|
_tasks.pop(task_id, None)
|
|
return
|
|
|
|
with _tasks_lock:
|
|
if task_id in _tasks:
|
|
_tasks[task_id]["phase"] = "Enriching view counts…"
|
|
_tasks[task_id]["total"] = len(video_ids)
|
|
_tasks[task_id]["done"] = 0
|
|
|
|
results = {}
|
|
try:
|
|
with ThreadPoolExecutor(max_workers=3) as pool:
|
|
futures = {pool.submit(ytdlp.fetch_video_metadata, vid): vid for vid in video_ids}
|
|
for future in as_completed(futures):
|
|
vid = futures[future]
|
|
try:
|
|
results[vid] = future.result()
|
|
except Exception:
|
|
pass
|
|
with _tasks_lock:
|
|
if task_id in _tasks:
|
|
_tasks[task_id]["done"] += 1
|
|
finally:
|
|
with _tasks_lock:
|
|
_tasks.pop(task_id, None)
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
for yt_id, meta in results.items():
|
|
if not meta:
|
|
continue
|
|
try:
|
|
vid = db.query(Video).filter_by(youtube_video_id=yt_id).first()
|
|
if vid:
|
|
if meta.get("view_count") is not None:
|
|
vid.view_count = meta["view_count"]
|
|
if not vid.published_at and meta.get("published_at"):
|
|
vid.published_at = meta["published_at"]
|
|
db.commit()
|
|
except Exception:
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@router.post("/{channel_id}/search", status_code=status.HTTP_202_ACCEPTED)
|
|
def search_channel_youtube(
|
|
channel_id: int,
|
|
q: str,
|
|
background_tasks: BackgroundTasks,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Search YouTube within this channel and index matching videos."""
|
|
channel = _get_channel_or_404(db, channel_id)
|
|
background_tasks.add_task(_search_channel_task, channel_id, channel.youtube_channel_id, q, current_user.id)
|
|
return {"detail": "Search started"}
|
|
|
|
|
|
def _search_channel_task(channel_id: int, youtube_channel_id: str, q: str, user_id: int):
|
|
"""Fetch videos matching q from YouTube for this channel and index them."""
|
|
from ..database import SessionLocal
|
|
from urllib.parse import quote
|
|
db = SessionLocal()
|
|
try:
|
|
url = f"https://www.youtube.com/channel/{youtube_channel_id}/search?query={quote(q)}"
|
|
result = ytdlp.fetch_channel_metadata(youtube_channel_id, max_videos=100)
|
|
if not result:
|
|
return
|
|
# Filter results by query match (yt-dlp fetches all; we filter titles locally)
|
|
q_lower = q.lower()
|
|
matched = [v for v in result.get("videos", []) if q_lower in (v.get("title") or "").lower()]
|
|
if not matched:
|
|
matched = result.get("videos", [])[:30]
|
|
channel = db.query(Channel).filter_by(id=channel_id).first()
|
|
if not channel:
|
|
return
|
|
for vdata in matched:
|
|
yt_id = vdata.get("youtube_video_id")
|
|
if not yt_id:
|
|
continue
|
|
existing = db.query(Video).filter_by(youtube_video_id=yt_id).first()
|
|
if not existing:
|
|
db.add(Video(
|
|
youtube_video_id=yt_id,
|
|
channel_id=channel.id,
|
|
title=vdata.get("title", ""),
|
|
description=vdata.get("description"),
|
|
thumbnail_url=vdata.get("thumbnail_url"),
|
|
duration_seconds=vdata.get("duration_seconds"),
|
|
published_at=vdata.get("published_at"),
|
|
tags=vdata.get("tags"),
|
|
category=vdata.get("category"),
|
|
))
|
|
db.commit()
|
|
except Exception:
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@router.get("/{channel_id}/random", response_model=VideoOut)
|
|
def get_random_channel_video(
|
|
channel_id: int,
|
|
unwatched_only: bool = True,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
_get_channel_or_404(db, channel_id)
|
|
unwatched_clause = "AND COALESCE(uv.watched, 0) = 0" if unwatched_only else ""
|
|
row = db.execute(
|
|
text(f"""
|
|
SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
|
|
v.duration_seconds, v.published_at, v.view_count,
|
|
c.id AS channel_id, c.name AS channel_name, c.youtube_channel_id AS channel_youtube_id,
|
|
COALESCE(uv.downloaded, 0) AS is_downloaded,
|
|
COALESCE(uv.watched, 0) AS is_watched,
|
|
COALESCE(uv.queued, 0) AS queued
|
|
FROM videos v
|
|
JOIN channels c ON v.channel_id = c.id
|
|
LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
|
|
WHERE v.channel_id = :channel_id {unwatched_clause}
|
|
ORDER BY RANDOM()
|
|
LIMIT 1
|
|
"""),
|
|
{"user_id": current_user.id, "channel_id": channel_id},
|
|
).mappings().first()
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="No videos found")
|
|
return VideoOut(**dict(row))
|
|
|
|
|
|
@router.get("/{channel_id}/in-progress", response_model=list[VideoOut])
|
|
def get_channel_in_progress(
|
|
channel_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
_get_channel_or_404(db, channel_id)
|
|
rows = db.execute(
|
|
text("""
|
|
SELECT v.id, v.youtube_video_id, v.title, v.thumbnail_url,
|
|
v.duration_seconds, v.published_at, v.view_count,
|
|
c.id AS channel_id, c.name AS channel_name, c.youtube_channel_id AS channel_youtube_id,
|
|
COALESCE(uv.downloaded, 0) AS is_downloaded,
|
|
COALESCE(uv.watched, 0) AS is_watched,
|
|
COALESCE(uv.queued, 0) AS queued
|
|
FROM videos v
|
|
JOIN channels c ON v.channel_id = c.id
|
|
JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
|
|
WHERE v.channel_id = :channel_id
|
|
AND uv.watch_progress_seconds > 30
|
|
AND COALESCE(uv.watched, 0) = 0
|
|
ORDER BY uv.watch_progress_seconds DESC
|
|
LIMIT 6
|
|
"""),
|
|
{"user_id": current_user.id, "channel_id": channel_id},
|
|
).mappings().all()
|
|
return [VideoOut(**dict(r)) for r in rows]
|
|
|
|
|
|
@router.post("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT)
|
|
def follow_channel(
|
|
channel_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
_get_channel_or_404(db, channel_id)
|
|
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
|
|
if uc:
|
|
uc.status = "followed"
|
|
else:
|
|
db.add(UserChannel(user_id=current_user.id, channel_id=channel_id, status="followed"))
|
|
db.commit()
|
|
|
|
|
|
@router.delete("/{channel_id}/follow", status_code=status.HTTP_204_NO_CONTENT)
|
|
def unfollow_channel(
|
|
channel_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
|
|
if uc:
|
|
db.delete(uc)
|
|
db.commit()
|
|
|
|
|
|
@router.patch("/{channel_id}/auto-download", status_code=200)
|
|
def set_channel_auto_download(
|
|
channel_id: int,
|
|
body: dict,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
|
|
if not uc:
|
|
raise HTTPException(status_code=404, detail="Not following this channel")
|
|
value = body.get("auto_download") # True / False / None
|
|
uc.auto_download = value
|
|
db.commit()
|
|
return {"auto_download": uc.auto_download}
|
|
|
|
|
|
@router.post("/{channel_id}/index", status_code=status.HTTP_202_ACCEPTED)
|
|
def index_channel(
|
|
channel_id: int,
|
|
background_tasks: BackgroundTasks,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
_get_channel_or_404(db, channel_id)
|
|
background_tasks.add_task(_index_channel_task, channel_id, current_user.id, 100)
|
|
return {"detail": "Indexing started"}
|
|
|
|
|
|
@router.post("/{channel_id}/index-full", status_code=status.HTTP_202_ACCEPTED)
|
|
def index_channel_full(
|
|
channel_id: int,
|
|
background_tasks: BackgroundTasks,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
_get_channel_or_404(db, channel_id)
|
|
background_tasks.add_task(_index_channel_task, channel_id, current_user.id, 0)
|
|
return {"detail": "Full index started"}
|
|
|
|
|
|
@router.post("/{channel_id}/explore", status_code=status.HTTP_202_ACCEPTED)
|
|
def explore_channel_older(
|
|
channel_id: int,
|
|
page: int = 2,
|
|
background_tasks: BackgroundTasks = None,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Fetch a page of older videos from this channel (page 1 = newest 30, page 2 = next 100, etc.)."""
|
|
_get_channel_or_404(db, channel_id)
|
|
start = 1 if page <= 1 else (30 + (page - 2) * 100 + 1)
|
|
background_tasks.add_task(_index_channel_explore_task, channel_id, current_user.id, start, 100)
|
|
return {"detail": f"Fetching older videos (page {page})", "start": start}
|
|
|
|
|
|
def _index_channel_explore_task(channel_id: int, user_id: int, start_video: int, count: int):
|
|
from ..database import SessionLocal
|
|
db = SessionLocal()
|
|
try:
|
|
channel = db.query(Channel).filter_by(id=channel_id).first()
|
|
if not channel:
|
|
return
|
|
result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id, max_videos=count, start_video=start_video)
|
|
if not result:
|
|
return
|
|
for vdata in result.get("videos", []):
|
|
yt_id = vdata.get("youtube_video_id")
|
|
if not yt_id:
|
|
continue
|
|
if not db.query(Video).filter_by(youtube_video_id=yt_id).first():
|
|
db.add(Video(
|
|
youtube_video_id=yt_id,
|
|
channel_id=channel.id,
|
|
title=vdata.get("title", ""),
|
|
description=vdata.get("description"),
|
|
thumbnail_url=vdata.get("thumbnail_url"),
|
|
duration_seconds=vdata.get("duration_seconds"),
|
|
published_at=vdata.get("published_at"),
|
|
tags=vdata.get("tags"),
|
|
category=vdata.get("category"),
|
|
))
|
|
db.commit()
|
|
except Exception:
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@router.post("/follow-bulk", status_code=200)
|
|
def follow_bulk(
|
|
body: dict,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Follow a large list of channel handles/IDs without hitting YouTube.
|
|
|
|
Creates stub Channel records for unknowns and UserChannel rows immediately.
|
|
Metadata (name, thumbnail, videos) fills in when the user hits Sync All.
|
|
"""
|
|
handles = body.get("handles", [])
|
|
if not handles or not isinstance(handles, list):
|
|
raise HTTPException(status_code=400, detail="handles list required")
|
|
|
|
followed = 0
|
|
already = 0
|
|
created = 0
|
|
|
|
for handle in handles:
|
|
handle = str(handle).strip()
|
|
if not handle:
|
|
continue
|
|
|
|
channel = db.query(Channel).filter_by(youtube_channel_id=handle).first()
|
|
if not channel:
|
|
# Stub — name defaults to handle, filled in on next index
|
|
channel = Channel(
|
|
youtube_channel_id=handle,
|
|
name=handle.lstrip("@"),
|
|
)
|
|
db.add(channel)
|
|
db.flush()
|
|
created += 1
|
|
|
|
uc = db.query(UserChannel).filter_by(
|
|
user_id=current_user.id, channel_id=channel.id
|
|
).first()
|
|
if uc:
|
|
if uc.status != "followed":
|
|
uc.status = "followed"
|
|
followed += 1
|
|
else:
|
|
already += 1
|
|
else:
|
|
db.add(UserChannel(
|
|
user_id=current_user.id,
|
|
channel_id=channel.id,
|
|
status="followed",
|
|
))
|
|
followed += 1
|
|
|
|
db.commit()
|
|
return {"followed": followed, "already_following": already, "new_channels": created}
|
|
|
|
|
|
@router.patch("/{channel_id}/notes", status_code=200)
|
|
def update_channel_notes(
|
|
channel_id: int,
|
|
body: dict,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
|
|
if not uc:
|
|
raise HTTPException(status_code=404, detail="Not following this channel")
|
|
uc.notes = body.get("notes", "") or ""
|
|
db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@router.post("/{channel_id}/mute", status_code=204)
|
|
def mute_channel(
|
|
channel_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
|
|
if not uc:
|
|
raise HTTPException(status_code=404, detail="Not following this channel")
|
|
uc.muted_until = datetime.utcnow() + timedelta(days=30)
|
|
db.commit()
|
|
|
|
|
|
@router.delete("/{channel_id}/mute", status_code=204)
|
|
def unmute_channel(
|
|
channel_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel_id).first()
|
|
if uc:
|
|
uc.muted_until = None
|
|
db.commit()
|
|
|
|
|
|
@router.post("/follow-by-url", status_code=status.HTTP_201_CREATED)
|
|
def follow_by_url(
|
|
body: dict,
|
|
background_tasks: BackgroundTasks,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
yt_channel_id = body.get("youtube_channel_id") or body.get("channel_id")
|
|
if not yt_channel_id:
|
|
raise HTTPException(status_code=400, detail="youtube_channel_id required")
|
|
|
|
channel = db.query(Channel).filter_by(youtube_channel_id=yt_channel_id).first()
|
|
if not channel:
|
|
meta = ytdlp.fetch_channel_metadata(yt_channel_id, max_videos=30)
|
|
if not meta or not meta.get("channel"):
|
|
raise HTTPException(status_code=404, detail="Channel not found on YouTube")
|
|
ch_data = meta["channel"]
|
|
channel = Channel(**{k: v for k, v in ch_data.items() if hasattr(Channel, k)})
|
|
channel.crawled_at = datetime.utcnow()
|
|
db.add(channel)
|
|
db.flush()
|
|
|
|
for vdata in meta.get("videos", []):
|
|
yt_id = vdata.get("youtube_video_id")
|
|
if not yt_id:
|
|
continue
|
|
if not db.query(Video).filter_by(youtube_video_id=yt_id).first():
|
|
db.add(Video(
|
|
youtube_video_id=yt_id,
|
|
channel_id=channel.id,
|
|
title=vdata.get("title", ""),
|
|
description=vdata.get("description"),
|
|
thumbnail_url=vdata.get("thumbnail_url"),
|
|
duration_seconds=vdata.get("duration_seconds"),
|
|
published_at=vdata.get("published_at"),
|
|
tags=vdata.get("tags"),
|
|
category=vdata.get("category"),
|
|
))
|
|
db.commit()
|
|
db.refresh(channel)
|
|
|
|
uc = db.query(UserChannel).filter_by(user_id=current_user.id, channel_id=channel.id).first()
|
|
if uc:
|
|
uc.status = "followed"
|
|
else:
|
|
db.add(UserChannel(user_id=current_user.id, channel_id=channel.id, status="followed"))
|
|
db.commit()
|
|
|
|
background_tasks.add_task(_discovery_task, current_user.id)
|
|
|
|
return {"channel_id": channel.id, "name": channel.name}
|