Self-hosted personal YouTube management app. FastAPI + SQLite backend, React + Vite + Tailwind frontend. Dockerfiles and compose included for Portainer deployment. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
333 lines
12 KiB
Python
333 lines
12 KiB
Python
"""Two-tier search: local FTS5 first, yt-dlp live fallback."""
|
|
import json
import logging
import re as _re
from collections import Counter
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session

from ..auth_utils import get_current_user
from ..database import get_db
from ..models import User, Video, Channel, UserVideo, SearchHistory, UserTagAffinity
from ..services import ytdlp
|
|
|
|
# Router that the search endpoints in this module ("" and "/history") attach to.
router = APIRouter()
|
|
|
|
# Common English function words that `_query_affinity_tags` discards when it
# extracts terms from a search query.
_STOPWORDS = set(
    "the a an is it in on at to of and or for "
    "with this that are was be by as from has have "
    "will can but not my i you your we how what "
    "why when which who more about than do did does".split()
)
|
|
|
|
def _query_affinity_tags(q: str) -> list[str]:
    """Extract the affinity-worthy terms from a raw search query.

    The query is lower-cased, every character that is neither a word
    character nor whitespace is removed, and the remainder is split on
    whitespace.  Terms shorter than three characters and terms listed in
    ``_STOPWORDS`` are discarded.  Order and duplicates are preserved.
    """
    stripped = _re.sub(r"[^\w\s]", "", q.lower())
    tags = []
    for term in stripped.split():
        if len(term) < 3 or term in _STOPWORDS:
            continue
        tags.append(term)
    return tags
|
|
|
|
|
|
def _log_search(db: Session, user_id: int, q: str):
    """Persist a search query and bump affinity scores for its meaningful terms.

    A ``SearchHistory`` row is added for the stripped query, and every term
    returned by ``_query_affinity_tags`` gains 0.3 affinity per occurrence,
    capped at 50.0.  The commit is best-effort: if it fails the session is
    rolled back and the failure is logged rather than raised.

    Args:
        db: Session used for the lookups, the inserts and the commit.
        user_id: Id of the user who issued the query.
        q: Raw query text as received from the client.
    """
    db.add(SearchHistory(user_id=user_id, query=q.strip()))

    # Collapse repeated terms ("python python") so each tag is looked up and
    # written exactly once.  Without this, a session that does not autoflush
    # cannot see the row added for the first occurrence and would add a second
    # UserTagAffinity row for the same (user, tag) pair.
    for tag, hits in Counter(_query_affinity_tags(q)).items():
        bump = 0.3 * hits
        now = datetime.utcnow()
        existing = db.query(UserTagAffinity).filter_by(user_id=user_id, tag=tag).first()
        if existing:
            existing.score = min(existing.score + bump, 50.0)
            existing.updated_at = now
        else:
            db.add(UserTagAffinity(user_id=user_id, tag=tag,
                                   score=min(bump, 50.0), updated_at=now))

    try:
        db.commit()
    except Exception:
        # History/affinity tracking must never break the search itself, but the
        # failure should still be visible to whoever operates the service.
        db.rollback()
        logging.getLogger(__name__).exception(
            "Could not persist search history for user %s", user_id
        )
|
|
|
|
|
|
class VideoResult(BaseModel):
    """One video entry in a search response (local hit or live result)."""

    model_config = {"from_attributes": True}

    # Identity and display metadata.
    youtube_video_id: str
    title: str
    thumbnail_url: Optional[str]
    duration_seconds: Optional[int]

    # Owning channel; the name is "" when no channel name is known.
    channel_name: str
    channel_youtube_id: Optional[str]

    published_at: Optional[datetime]

    # True when a row for this video exists in the local videos table.
    is_local: bool
    # Per-user flags; both are False when the user has no row for the video.
    is_downloaded: bool
    is_watched: bool
    # Id of the local videos row, or None when the video is not stored locally.
    local_video_id: Optional[int]
|
|
|
|
|
|
class ChannelResult(BaseModel):
    """One channel card in a search response."""

    model_config = {"from_attributes": True}

    # Identity and display metadata.
    youtube_channel_id: str
    name: str
    thumbnail_url: Optional[str]
    description: Optional[str]

    # True when the current user's user_channels status is 'followed'.
    is_followed: bool
    # Id of the local channels row, or None for a card built from live results.
    local_channel_id: Optional[int]

    # Optional statistics; video_count is the number of locally stored videos.
    subscriber_count: Optional[int] = None
    video_count: Optional[int] = None
|
|
|
|
|
|
class SearchResponse(BaseModel):
    """Payload returned by the search endpoint."""

    # Matching videos, local hits first, followed by any live-only results.
    videos: list[VideoResult]
    # Channel cards: direct matches plus cards derived from the video results.
    channels: list[ChannelResult]
    # Origin of the results: "local" | "live" | "mixed" | "none".
    source: str
    # The query string exactly as it was received.
    query: str
|
|
|
|
|
|
def _sanitize_fts(q: str) -> str:
    """Reduce a user query to plain terms that are safe to pass to FTS5 MATCH.

    FTS5 only accepts letters, digits, underscores and non-ASCII characters in
    an unquoted term; any other character (``:``, ``.``, ``'``, ``/`` ...) is
    either query syntax or a syntax error.  Every character that is neither a
    word character nor whitespace is therefore replaced by a space, so
    ``node.js`` becomes the two terms ``node js``.  The boolean keywords
    AND / OR / NOT are dropped in any letter case.

    Args:
        q: Raw query text from the client.

    Returns:
        The remaining terms joined by single spaces, or ``'""'`` (an empty
        FTS5 string) when nothing searchable is left.
    """
    clean = _re.sub(r"[^\w\s]", " ", q)
    words = [w for w in clean.split() if w.upper() not in ("AND", "OR", "NOT")]
    if not words:
        return '""'
    return " ".join(words)
|
|
|
|
|
|
def _local_video_search(db: Session, user_id: int, q: str, limit: int = 100) -> list[dict]:
    """Search the local ``videos_fts`` index and return matching videos.

    Each returned dict carries the video columns, the channel name and
    YouTube id from the joined channel, and the requesting user's
    downloaded / watched flags (0 when the user has no ``user_videos`` row).
    Results are ordered by FTS rank.

    Args:
        db: Session used to run the query.
        user_id: User whose download/watch state is joined in.
        q: Raw query text; it is passed through ``_sanitize_fts`` first.
        limit: Maximum number of rows to return.

    Returns:
        A list of plain dicts, or ``[]`` when the query fails.  Failures are
        logged so a broken index does not look like "no results".
    """
    stmt = text("""
        SELECT
            v.id, v.youtube_video_id, v.title, v.thumbnail_url,
            v.duration_seconds, v.published_at,
            c.name AS channel_name, c.youtube_channel_id,
            COALESCE(uv.downloaded, 0) AS is_downloaded,
            COALESCE(uv.watched, 0) AS is_watched
        FROM videos_fts fts
        JOIN videos v ON fts.rowid = v.id
        LEFT JOIN channels c ON v.channel_id = c.id
        LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
        WHERE videos_fts MATCH :query
        ORDER BY rank
        LIMIT :limit
    """)
    try:
        rows = db.execute(
            stmt,
            {"user_id": user_id, "query": _sanitize_fts(q), "limit": limit},
        ).mappings().all()
    except Exception:
        # Local search is best-effort (the caller can fall back to a live
        # search), but the reason for the failure must not be lost.
        logging.getLogger(__name__).exception(
            "Local video search failed for query %r", q
        )
        return []
    return [dict(r) for r in rows]
|
|
|
|
|
|
def _local_channel_search(db: Session, user_id: int, q: str, limit: int = 5) -> list[dict]:
    """Search the local ``channels_fts`` index and return matching channels.

    Each returned dict carries the channel columns, an ``is_followed`` flag
    (1 when the user's ``user_channels`` status is ``'followed'``) and
    ``video_count``, the number of locally stored videos for the channel.
    Results are ordered by FTS rank.

    Args:
        db: Session used to run the query.
        user_id: User whose follow state is joined in.
        q: Raw query text; it is passed through ``_sanitize_fts`` first.
        limit: Maximum number of rows to return.

    Returns:
        A list of plain dicts, or ``[]`` when the query fails.  Failures are
        logged so a broken index does not look like "no results".
    """
    stmt = text("""
        SELECT
            c.id, c.youtube_channel_id, c.name, c.thumbnail_url, c.description,
            c.subscriber_count,
            CASE WHEN uc.status = 'followed' THEN 1 ELSE 0 END AS is_followed,
            (SELECT COUNT(*) FROM videos WHERE channel_id = c.id) AS video_count
        FROM channels_fts fts
        JOIN channels c ON fts.rowid = c.id
        LEFT JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id
        WHERE channels_fts MATCH :query
        ORDER BY rank
        LIMIT :limit
    """)
    try:
        rows = db.execute(
            stmt,
            {"user_id": user_id, "query": _sanitize_fts(q), "limit": limit},
        ).mappings().all()
    except Exception:
        # Best-effort: an unusable index yields no channel matches, but the
        # underlying error is recorded instead of being discarded.
        logging.getLogger(__name__).exception(
            "Local channel search failed for query %r", q
        )
        return []
    return [dict(r) for r in rows]
|
|
|
|
|
|
def _upsert_channel_from_meta(db: Session, ch: dict) -> Channel:
    """Return the ``Channel`` for ``ch["youtube_channel_id"]``, creating it if needed.

    An existing row is returned unchanged.  Otherwise a new ``Channel`` is
    built from the entries of ``ch`` whose keys are attributes of ``Channel``,
    added to the session and flushed (not committed) before being returned.
    """
    found = db.query(Channel).filter_by(youtube_channel_id=ch["youtube_channel_id"]).first()
    if found:
        return found

    # Ignore metadata keys the model has no attribute for.
    attrs = {}
    for key, value in ch.items():
        if hasattr(Channel, key):
            attrs[key] = value

    created = Channel(**attrs)
    db.add(created)
    db.flush()
    return created
|
|
|
|
|
|
def _live_search_to_results(
    db: Session, user_id: int, raw: list[dict]
) -> list[VideoResult]:
    """Convert raw live-search items into ``VideoResult`` objects.

    Each item is matched against the local ``Video`` table by YouTube id so
    the result can report whether it is stored locally and, via ``UserVideo``,
    whether this user has downloaded or watched it.

    Args:
        db: Session used for the local lookups.
        user_id: User whose download/watch state is reported.
        raw: Items as returned by the live search; each is read with the keys
            ``youtube_video_id``, ``title``, ``thumbnail_url``,
            ``duration_seconds``, ``published_at`` and ``channel``.

    Returns:
        One ``VideoResult`` per usable item, in input order.  Items without a
        video id or without a title are skipped.
    """
    results = []
    for item in raw:
        yt_id = item.get("youtube_video_id")
        title = item.get("title")
        # A single malformed entry must not raise: the caller treats any
        # exception from this function as "live search failed" and drops the
        # whole batch.
        if not yt_id or title is None:
            continue

        local = db.query(Video).filter_by(youtube_video_id=yt_id).first()
        uv = None
        if local:
            uv = db.query(UserVideo).filter_by(user_id=user_id, video_id=local.id).first()

        ch = item.get("channel") or {}
        # Prefer the DB date — flat-playlist search results rarely include upload_date
        published_at = (local.published_at if local and local.published_at
                        else item.get("published_at"))
        results.append(VideoResult(
            youtube_video_id=yt_id,
            title=title,
            thumbnail_url=item.get("thumbnail_url"),
            duration_seconds=item.get("duration_seconds"),
            # `or ""` also covers a "name" key that is present but None, which
            # `.get("name", "")` would pass through and fail validation.
            channel_name=ch.get("name") or "",
            channel_youtube_id=ch.get("youtube_channel_id"),
            published_at=published_at,
            is_local=local is not None,
            is_downloaded=bool(uv and uv.downloaded),
            is_watched=bool(uv and uv.watched),
            local_video_id=local.id if local else None,
        ))
    return results
|
|
|
|
|
|
def _channel_result_from_row(row) -> ChannelResult:
    """Build a ``ChannelResult`` from a channel row mapping (dict or RowMapping)."""
    return ChannelResult(
        youtube_channel_id=row["youtube_channel_id"],
        name=row["name"],
        thumbnail_url=row["thumbnail_url"],
        description=row["description"],
        is_followed=bool(row["is_followed"]),
        local_channel_id=row["id"],
        subscriber_count=row.get("subscriber_count"),
        video_count=row.get("video_count"),
    )


@router.get("", response_model=SearchResponse)
def search(
    q: str = Query(..., min_length=1),
    live: bool = Query(False),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Search videos and channels: local FTS index first, live yt-dlp fallback.

    The query is logged, then the local video and channel indexes are
    searched.  Channels owning a matching video are added as cards even when
    the channel itself did not match.  A live search runs when there are no
    local video hits or when ``live`` is true; its videos are appended after
    the local ones (duplicates dropped) and unseen channels become cards.

    Args:
        q: Search text (at least one character).
        live: Force a live search even when local videos were found.
        db: Request-scoped database session.
        current_user: Authenticated user issuing the search.

    Returns:
        A ``SearchResponse`` whose ``source`` is ``"local"``, ``"live"``,
        ``"mixed"`` or ``"none"``.
    """
    _log_search(db, current_user.id, q)

    # Always run local search first
    local_videos = _local_video_search(db, current_user.id, q)
    local_channels = _local_channel_search(db, current_user.id, q)

    video_results = [
        VideoResult(
            youtube_video_id=r["youtube_video_id"],
            title=r["title"],
            thumbnail_url=r["thumbnail_url"],
            duration_seconds=r["duration_seconds"],
            channel_name=r["channel_name"] or "",
            channel_youtube_id=r["youtube_channel_id"],
            published_at=r["published_at"],
            is_local=True,
            is_downloaded=bool(r["is_downloaded"]),
            is_watched=bool(r["is_watched"]),
            local_video_id=r["id"],
        )
        for r in local_videos
    ]
    channel_results = [_channel_result_from_row(r) for r in local_channels]

    # Synthesize channel cards from video results for channels not yet in the list
    found_ch_ids = {c.youtube_channel_id for c in channel_results}

    def _channel_card_from_db(yt_ch_id: str) -> Optional[ChannelResult]:
        """Load the card for a locally stored channel, or None if it is unknown."""
        row = db.execute(
            text("""
                SELECT c.id, c.youtube_channel_id, c.name, c.thumbnail_url, c.description,
                       c.subscriber_count,
                       CASE WHEN uc.status = 'followed' THEN 1 ELSE 0 END AS is_followed,
                       (SELECT COUNT(*) FROM videos WHERE channel_id = c.id) AS video_count
                FROM channels c
                LEFT JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id
                WHERE c.youtube_channel_id = :yt_ch_id
            """),
            {"user_id": current_user.id, "yt_ch_id": yt_ch_id},
        ).mappings().first()
        return _channel_result_from_row(row) if row else None

    for v in video_results:
        if not v.channel_youtube_id or v.channel_youtube_id in found_ch_ids:
            continue
        found_ch_ids.add(v.channel_youtube_id)
        card = _channel_card_from_db(v.channel_youtube_id)
        if card:
            channel_results.append(card)

    source = "local" if (video_results or channel_results) else "none"

    # Fall back to live yt-dlp search if no local results or explicitly requested
    if not video_results or live:
        try:
            live_raw = ytdlp.search_youtube(q)
            live_results = _live_search_to_results(db, current_user.id, live_raw)
        except Exception:
            # Live search is optional; keep serving local results, but record
            # why the fallback produced nothing.
            logging.getLogger(__name__).exception("Live search failed for query %r", q)
            live_results = []
            live_raw = []

        if live_results:
            # Merge: deduplicate by youtube_video_id, local results take priority.
            # Ids are recorded as they are appended so that a video returned
            # twice by the live search is also listed only once.
            seen_ids = {v.youtube_video_id for v in video_results}
            for r in live_results:
                if r.youtube_video_id in seen_ids:
                    continue
                seen_ids.add(r.youtube_video_id)
                video_results.append(r)
            source = "live" if source == "none" else "mixed"

        # Synthesize channel cards from YouTube results for channels not in local DB.
        # Only the first occurrence of each channel id is considered.
        for item in live_raw:
            ch = item.get("channel") or {}
            yt_ch_id = ch.get("youtube_channel_id")
            if not yt_ch_id or yt_ch_id in found_ch_ids:
                continue
            found_ch_ids.add(yt_ch_id)

            card = _channel_card_from_db(yt_ch_id)
            if card:
                channel_results.append(card)
                continue

            # Unknown locally: build a minimal card, but only if it has a name.
            name = (ch.get("name") or "").strip()
            if name:
                channel_results.append(ChannelResult(
                    youtube_channel_id=yt_ch_id,
                    name=name,
                    thumbnail_url=None,
                    description=None,
                    is_followed=False,
                    local_channel_id=None,
                ))

    return SearchResponse(
        videos=video_results,
        channels=channel_results,
        source=source,
        query=q,
    )
|
|
|
|
|
|
@router.get("/history")
def search_history(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return up to eight distinct queries of the current user, most recent first."""
    # GROUP BY collapses repeated queries; each group is ordered by its latest use.
    recent_queries_sql = text("""
        SELECT query FROM search_history
        WHERE user_id = :uid
        GROUP BY query
        ORDER BY MAX(searched_at) DESC
        LIMIT 8
    """)
    result = db.execute(recent_queries_sql, {"uid": current_user.id})
    queries = result.scalars().all()
    return {"queries": list(queries)}
|