Files
youclonedl/backend/routers/search.py
Mattias Thall c223e57463 fix: prevent concurrent yt-dlp sessions that invalidate cookies
Three code paths could fire yt-dlp immediately (polite=False) while a
download was already running, causing YouTube to see two simultaneous
authenticated sessions and invalidate the cookie:

- search.py: live yt-dlp fallback now skipped while any download is active
- downloads.py: _ensure_video uses polite=True so it waits for active
  downloads to finish before fetching metadata for an unknown video
- channels.py: follow_by_url uses polite=True when fetching metadata
  for a brand-new channel

Added is_download_active() helper to ytdlp.py to expose the active
download state without importing private globals.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 03:07:19 +02:00

334 lines
12 KiB
Python

"""Two-tier search: local FTS5 first, yt-dlp live fallback."""
import json
import logging
import re as _re
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session

from ..auth_utils import get_current_user
from ..database import get_db
from ..models import User, Video, Channel, UserVideo, SearchHistory, UserTagAffinity
from ..services import ytdlp
# Router carrying the search endpoints declared in this module ("" and "/history").
router = APIRouter()
# Words that carry no topical signal; _query_affinity_tags drops them so they
# never become affinity tags.
_STOPWORDS = {
    # articles and determiners
    "a", "an", "the", "this", "that", "which",
    # pronouns
    "i", "it", "my", "we", "you", "your", "who",
    # prepositions and conjunctions
    "about", "and", "as", "at", "but", "by", "for", "from",
    "in", "of", "on", "or", "than", "to", "with",
    # common verbs and auxiliaries
    "are", "be", "can", "did", "do", "does", "has", "have",
    "is", "was", "will",
    # question words and fillers
    "how", "more", "not", "what", "when", "why",
}
def _query_affinity_tags(q: str) -> list[str]:
    """Extract the affinity tags contained in a raw search query.

    The query is lower-cased, every character that is neither a word
    character nor whitespace is removed, and the remainder is split on
    whitespace. Tokens shorter than three characters and tokens listed in
    ``_STOPWORDS`` are discarded. Order and duplicates are preserved.
    """
    normalized = _re.sub(r"[^\w\s]", "", q.lower())
    tags: list[str] = []
    for token in normalized.split():
        if len(token) < 3 or token in _STOPWORDS:
            continue
        tags.append(token)
    return tags
def _log_search(db: Session, user_id: int, q: str):
    """Persist search query and bump affinity scores for its meaningful terms.

    A ``SearchHistory`` row is added for the stripped query. For every tag
    returned by ``_query_affinity_tags`` the matching ``UserTagAffinity`` row
    is created with score 0.3, or bumped by 0.3 (capped at 50.0) when it
    already exists. The commit is best-effort: on failure the session is
    rolled back, a warning is logged, and nothing is raised.

    Args:
        db: Session used for the inserts/updates and the final commit.
        user_id: Id of the user who issued the search.
        q: Raw query text as typed by the user.
    """
    db.add(SearchHistory(user_id=user_id, query=q.strip()))

    # Rows already handled in this call, keyed by tag. A word repeated in the
    # query must bump the row created a moment ago instead of adding a second
    # row for the same (user, tag): if the session is configured without
    # autoflush (configuration not visible here) the lookup below would not
    # see the pending insert.
    touched: dict[str, UserTagAffinity] = {}
    for tag in _query_affinity_tags(q):
        row = touched.get(tag)
        if row is None:
            row = db.query(UserTagAffinity).filter_by(user_id=user_id, tag=tag).first()
        if row is None:
            row = UserTagAffinity(user_id=user_id, tag=tag, score=0.3,
                                  updated_at=datetime.utcnow())
            db.add(row)
        else:
            row.score = min(row.score + 0.3, 50.0)
            row.updated_at = datetime.utcnow()
        touched[tag] = row

    try:
        db.commit()
    except Exception:
        # Logging the search must never break the search itself.
        db.rollback()
        logging.getLogger(__name__).warning(
            "Could not persist search history for user %s", user_id, exc_info=True
        )
class VideoResult(BaseModel):
    """A single video entry of a search response.

    Instances are built both from local database rows and from live yt-dlp
    search items; the ``is_local`` / ``local_video_id`` pair tells the two
    apart.
    """

    model_config = {"from_attributes": True}

    # YouTube's identifier of the video.
    youtube_video_id: str
    title: str
    thumbnail_url: Optional[str]
    duration_seconds: Optional[int]
    # Empty string when no channel name is available.
    channel_name: str
    channel_youtube_id: Optional[str]
    published_at: Optional[datetime]
    # True when the video has a row in the local database.
    is_local: bool
    # Per-user flags; False when the user has no record for the video.
    is_downloaded: bool
    is_watched: bool
    # Primary key of the local row, None for videos not stored locally.
    local_video_id: Optional[int]
class ChannelResult(BaseModel):
    """A single channel card of a search response."""

    model_config = {"from_attributes": True}

    # YouTube's identifier of the channel.
    youtube_channel_id: str
    name: str
    thumbnail_url: Optional[str]
    description: Optional[str]
    # True when the current user's link to the channel has status 'followed'.
    is_followed: bool
    # Primary key of the local row, None for channels not stored locally.
    local_channel_id: Optional[int]
    # The two counters are optional and default to None when unknown.
    subscriber_count: Optional[int] = None
    video_count: Optional[int] = None
class SearchResponse(BaseModel):
    """Payload returned by the search endpoint."""

    videos: list[VideoResult]
    channels: list[ChannelResult]
    # Where the results came from: "local" | "live" | "mixed", or "none"
    # when neither the local index nor a live search produced anything.
    source: str
    # The query string exactly as received.
    query: str
def _sanitize_fts(q: str) -> str:
    """Strip FTS5 syntax characters and return a safe multi-word query.

    Every ASCII character that is neither a word character (letter, digit,
    underscore) nor whitespace is replaced by a space, so the remaining
    tokens are plain FTS5 barewords. This covers not only quotes, brackets
    and ``* + ? ! ^ ~ -`` but also ``:``, ``'``, ``/``, ``.``, ``,`` and the
    rest of ASCII punctuation, any of which makes FTS5 reject the MATCH
    expression. Non-ASCII characters are left untouched.

    The words AND / OR / NOT (in any letter case) are dropped so they cannot
    act as boolean operators.

    Args:
        q: Raw user query.

    Returns:
        The surviving words joined by single spaces, or ``'""'`` (an empty
        FTS5 phrase) when nothing survives.
    """
    # The lookahead excludes word and whitespace characters; the class then
    # restricts the match to the ASCII range.
    clean = _re.sub(r"(?![\w\s])[\x00-\x7f]", " ", q)
    words = [w for w in clean.split() if w.upper() not in ("AND", "OR", "NOT")]
    if not words:
        return '""'
    return " ".join(words)
def _local_video_search(db: Session, user_id: int, q: str, limit: int = 100) -> list[dict]:
    """Search the local ``videos_fts`` index and return matching videos.

    Args:
        db: Session used to run the query.
        user_id: User whose ``user_videos`` flags are joined in.
        q: Raw user query; it is passed through ``_sanitize_fts`` first.
        limit: Maximum number of rows returned.

    Returns:
        One dict per match, ordered by ``rank``, with the keys ``id``,
        ``youtube_video_id``, ``title``, ``thumbnail_url``,
        ``duration_seconds``, ``published_at``, ``channel_name``,
        ``youtube_channel_id``, ``is_downloaded`` and ``is_watched`` (the
        last two are 0 when the user has no row for the video). An empty
        list is returned when the query fails for any reason.
    """
    try:
        rows = db.execute(
            text("""
                SELECT
                    v.id, v.youtube_video_id, v.title, v.thumbnail_url,
                    v.duration_seconds, v.published_at,
                    c.name AS channel_name, c.youtube_channel_id,
                    COALESCE(uv.downloaded, 0) AS is_downloaded,
                    COALESCE(uv.watched, 0) AS is_watched
                FROM videos_fts fts
                JOIN videos v ON fts.rowid = v.id
                LEFT JOIN channels c ON v.channel_id = c.id
                LEFT JOIN user_videos uv ON v.id = uv.video_id AND uv.user_id = :user_id
                WHERE videos_fts MATCH :query
                ORDER BY rank
                LIMIT :limit
            """),
            {"user_id": user_id, "query": _sanitize_fts(q), "limit": limit},
        ).mappings().all()
    except Exception:
        # Best-effort: a failing local search degrades to "no local results",
        # but leave a trace so a broken index does not go unnoticed.
        logging.getLogger(__name__).warning(
            "Local video search failed for query %r", q, exc_info=True
        )
        return []
    return [dict(r) for r in rows]
def _local_channel_search(db: Session, user_id: int, q: str, limit: int = 5) -> list[dict]:
    """Search the local ``channels_fts`` index and return matching channels.

    Args:
        db: Session used to run the query.
        user_id: User whose ``user_channels`` status decides ``is_followed``.
        q: Raw user query; it is passed through ``_sanitize_fts`` first.
        limit: Maximum number of rows returned.

    Returns:
        One dict per match, ordered by ``rank``, with the keys ``id``,
        ``youtube_channel_id``, ``name``, ``thumbnail_url``, ``description``,
        ``subscriber_count``, ``is_followed`` (1 when the user's status is
        'followed', else 0) and ``video_count`` (number of ``videos`` rows of
        the channel). An empty list is returned when the query fails for any
        reason.
    """
    try:
        rows = db.execute(
            text("""
                SELECT
                    c.id, c.youtube_channel_id, c.name, c.thumbnail_url, c.description,
                    c.subscriber_count,
                    CASE WHEN uc.status = 'followed' THEN 1 ELSE 0 END AS is_followed,
                    (SELECT COUNT(*) FROM videos WHERE channel_id = c.id) AS video_count
                FROM channels_fts fts
                JOIN channels c ON fts.rowid = c.id
                LEFT JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id
                WHERE channels_fts MATCH :query
                ORDER BY rank
                LIMIT :limit
            """),
            {"user_id": user_id, "query": _sanitize_fts(q), "limit": limit},
        ).mappings().all()
    except Exception:
        # Best-effort: a failing local search degrades to "no local results",
        # but leave a trace so a broken index does not go unnoticed.
        logging.getLogger(__name__).warning(
            "Local channel search failed for query %r", q, exc_info=True
        )
        return []
    return [dict(r) for r in rows]
def _upsert_channel_from_meta(db: Session, ch: dict) -> Channel:
    """Return the Channel for ``ch["youtube_channel_id"]``, inserting it if absent.

    An existing row is returned as-is; none of its fields are refreshed from
    ``ch``. A new row is built from the entries of ``ch`` whose key is an
    attribute of ``Channel``, added to the session and flushed (not
    committed).

    Raises:
        KeyError: if ``ch`` has no ``"youtube_channel_id"`` entry.
    """
    channel = (
        db.query(Channel)
        .filter_by(youtube_channel_id=ch["youtube_channel_id"])
        .first()
    )
    if channel:
        return channel

    known_fields = {key: value for key, value in ch.items() if hasattr(Channel, key)}
    channel = Channel(**known_fields)
    db.add(channel)
    db.flush()
    return channel
def _live_search_to_results(
    db: Session, user_id: int, raw: list[dict]
) -> list[VideoResult]:
    """Convert raw live-search items into ``VideoResult`` objects.

    Items without a ``"youtube_video_id"`` are skipped. Every other item is
    matched against the local ``Video`` table (and the user's ``UserVideo``
    row) so the local id and the downloaded / watched flags can be filled in.

    A missing or ``None`` title or channel name is replaced by an empty
    string, so that one incomplete item cannot make the whole batch fail.

    Args:
        db: Session used for the local lookups.
        user_id: User whose downloaded / watched flags are reported.
        raw: Items as returned by the live search; each is read through the
            keys ``youtube_video_id``, ``title``, ``thumbnail_url``,
            ``duration_seconds``, ``published_at`` and ``channel`` (a dict
            with ``name`` and ``youtube_channel_id``).

    Returns:
        The converted results, in the order of ``raw``.
    """
    results = []
    for item in raw:
        yt_id = item.get("youtube_video_id")
        if not yt_id:
            continue

        local = db.query(Video).filter_by(youtube_video_id=yt_id).first()
        uv = None
        if local:
            uv = db.query(UserVideo).filter_by(user_id=user_id, video_id=local.id).first()

        ch = item.get("channel", {}) or {}
        # Prefer the DB date — flat-playlist search results rarely include upload_date
        published_at = (local.published_at if local and local.published_at
                        else item.get("published_at"))

        results.append(VideoResult(
            youtube_video_id=yt_id,
            # `or ""` also covers keys that are present but hold None.
            title=item.get("title") or "",
            thumbnail_url=item.get("thumbnail_url"),
            duration_seconds=item.get("duration_seconds"),
            channel_name=ch.get("name") or "",
            channel_youtube_id=ch.get("youtube_channel_id"),
            published_at=published_at,
            is_local=local is not None,
            is_downloaded=bool(uv and uv.downloaded),
            is_watched=bool(uv and uv.watched),
            local_video_id=local.id if local else None,
        ))
    return results
def _channel_result_from_row(row) -> ChannelResult:
    """Build a ChannelResult from a mapping produced by the local channel queries."""
    return ChannelResult(
        youtube_channel_id=row["youtube_channel_id"],
        name=row["name"],
        thumbnail_url=row["thumbnail_url"],
        description=row["description"],
        is_followed=bool(row["is_followed"]),
        local_channel_id=row["id"],
        subscriber_count=row.get("subscriber_count"),
        video_count=row.get("video_count"),
    )


def _channel_card_from_db(db: Session, user_id: int, yt_ch_id: str) -> Optional[ChannelResult]:
    """Load the channel card for a YouTube channel id, or None if it is not stored locally."""
    row = db.execute(
        text("""
            SELECT c.id, c.youtube_channel_id, c.name, c.thumbnail_url, c.description,
                   c.subscriber_count,
                   CASE WHEN uc.status = 'followed' THEN 1 ELSE 0 END AS is_followed,
                   (SELECT COUNT(*) FROM videos WHERE channel_id = c.id) AS video_count
            FROM channels c
            LEFT JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :user_id
            WHERE c.youtube_channel_id = :yt_ch_id
        """),
        {"user_id": user_id, "yt_ch_id": yt_ch_id},
    ).mappings().first()
    if not row:
        return None
    return _channel_result_from_row(row)


@router.get("", response_model=SearchResponse)
def search(
    q: str = Query(..., min_length=1),
    live: bool = Query(False),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Search videos and channels: local index first, live yt-dlp as fallback.

    Steps:
      1. Log the query (history and tag affinity).
      2. Run the local video and channel searches.
      3. Add channel cards for channels that own a matching video but did
         not match the channel search themselves.
      4. If there are no local video hits, or ``live`` is set, run a live
         yt-dlp search unless a download is currently active. Live videos
         are merged after the local ones, deduplicated by YouTube id.
         Channels seen in the live results get a card from the database
         when stored locally, otherwise a minimal card built from the
         channel name.

    Args:
        q: Search text (at least one character).
        live: Force the live search even when local videos were found.
        db: Database session.
        current_user: Authenticated user issuing the search.

    Returns:
        A ``SearchResponse`` whose ``source`` is "local", "live", "mixed" or
        "none" (nothing found anywhere).
    """
    _log_search(db, current_user.id, q)

    # Always run local search first
    local_videos = _local_video_search(db, current_user.id, q)
    local_channels = _local_channel_search(db, current_user.id, q)

    video_results = [
        VideoResult(
            youtube_video_id=r["youtube_video_id"],
            title=r["title"],
            thumbnail_url=r["thumbnail_url"],
            duration_seconds=r["duration_seconds"],
            channel_name=r["channel_name"] or "",
            channel_youtube_id=r["youtube_channel_id"],
            published_at=r["published_at"],
            is_local=True,
            is_downloaded=bool(r["is_downloaded"]),
            is_watched=bool(r["is_watched"]),
            local_video_id=r["id"],
        )
        for r in local_videos
    ]
    channel_results = [_channel_result_from_row(r) for r in local_channels]

    # Synthesize channel cards from video results for channels not yet in the list
    found_ch_ids = {c.youtube_channel_id for c in channel_results}
    for v in video_results:
        if not v.channel_youtube_id or v.channel_youtube_id in found_ch_ids:
            continue
        found_ch_ids.add(v.channel_youtube_id)
        card = _channel_card_from_db(db, current_user.id, v.channel_youtube_id)
        if card:
            channel_results.append(card)

    source = "local" if (video_results or channel_results) else "none"

    # Fall back to live yt-dlp search if no local results or explicitly requested.
    # Skip if a download is active — concurrent yt-dlp sessions invalidate cookies.
    if (not video_results or live) and not ytdlp.is_download_active():
        try:
            live_raw = ytdlp.search_youtube(q)
            live_results = _live_search_to_results(db, current_user.id, live_raw)
        except Exception:
            # The live search is optional; keep serving the local results but
            # record why the fallback produced nothing.
            logging.getLogger(__name__).warning(
                "Live yt-dlp search failed for query %r", q, exc_info=True
            )
            live_results = []
            live_raw = []

        if live_results:
            # Merge: deduplicate by youtube_video_id, local results take priority.
            # The set is updated while merging so an id repeated inside the
            # live batch is appended only once.
            seen_ids = {v.youtube_video_id for v in video_results}
            for r in live_results:
                if r.youtube_video_id not in seen_ids:
                    seen_ids.add(r.youtube_video_id)
                    video_results.append(r)
            source = "live" if source == "none" else "mixed"

        # Synthesize channel cards from YouTube results for channels not in local DB
        ch_by_yt_id: dict[str, dict] = {}
        for item in live_raw:
            ch = item.get("channel") or {}
            yt_ch_id = ch.get("youtube_channel_id")
            if yt_ch_id and yt_ch_id not in found_ch_ids and yt_ch_id not in ch_by_yt_id:
                ch_by_yt_id[yt_ch_id] = ch

        for yt_ch_id, ch in ch_by_yt_id.items():
            card = _channel_card_from_db(db, current_user.id, yt_ch_id)
            if card:
                channel_results.append(card)
                found_ch_ids.add(yt_ch_id)
            else:
                # Not stored locally: only a name is available for the card.
                name = (ch.get("name") or "").strip()
                if name:
                    channel_results.append(ChannelResult(
                        youtube_channel_id=yt_ch_id,
                        name=name,
                        thumbnail_url=None,
                        description=None,
                        is_followed=False,
                        local_channel_id=None,
                    ))
                    found_ch_ids.add(yt_ch_id)

    return SearchResponse(
        videos=video_results,
        channels=channel_results,
        source=source,
        query=q,
    )
@router.get("/history")
def search_history(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the last 8 unique queries for the current user.

    Queries are grouped by their text and ordered by the most recent
    ``searched_at`` of each group, newest first. The response has the shape
    ``{"queries": [...]}``.
    """
    recent_queries_sql = text("""
        SELECT query FROM search_history
        WHERE user_id = :uid
        GROUP BY query
        ORDER BY MAX(searched_at) DESC
        LIMIT 8
    """)
    result = db.execute(recent_queries_sql, {"uid": current_user.id})
    recent = result.scalars().all()
    return {"queries": [*recent]}