import os from datetime import datetime, timedelta from typing import Optional from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks from pydantic import BaseModel from sqlalchemy.orm import Session from sqlalchemy import text from ..auth_utils import get_current_user from ..config import settings from ..database import get_db, SessionLocal from ..models import Channel, Download, User, UserSettings, UserVideo, Video from ..services import ytdlp router = APIRouter() class DownloadRequest(BaseModel): youtube_video_id: str quality: Optional[str] = None subtitle_langs: Optional[str] = None # overrides user setting when provided TRASH_TTL_DAYS = 7 class DownloadOut(BaseModel): id: int status: str progress_percent: float video_title: Optional[str] video_thumbnail_url: Optional[str] youtube_video_id: Optional[str] file_url: Optional[str] resolution: Optional[str] created_at: datetime completed_at: Optional[datetime] error_message: Optional[str] pending_delete_at: Optional[datetime] = None model_config = {"from_attributes": True} def _on_progress(download_id: int, pct: float): db = SessionLocal() try: dl = db.query(Download).filter(Download.id == download_id).first() if dl: dl.progress_percent = pct dl.status = "downloading" db.commit() finally: db.close() def _on_complete(download_id: int, file_path: Optional[str], resolution: Optional[str] = None): db = SessionLocal() try: dl = db.query(Download).filter(Download.id == download_id).first() if dl: dl.status = "complete" dl.progress_percent = 100.0 dl.completed_at = datetime.utcnow() dl.file_path = file_path dl.resolution = resolution db.commit() uv = db.query(UserVideo).filter_by(user_id=dl.user_id, video_id=dl.video_id).first() if not uv: uv = UserVideo(user_id=dl.user_id, video_id=dl.video_id) db.add(uv) uv.downloaded = True uv.downloaded_at = datetime.utcnow() db.commit() finally: db.close() def _on_error(download_id: int, message: str): db = SessionLocal() try: dl = db.query(Download).filter(Download.id == download_id).first() if dl: dl.status = "failed" dl.error_message = message db.commit() finally: db.close() def _ensure_video(db: Session, youtube_video_id: str) -> Video: video = db.query(Video).filter_by(youtube_video_id=youtube_video_id).first() if video: return video meta = ytdlp.fetch_video_metadata(youtube_video_id) if not meta: raise HTTPException(status_code=404, detail="Video not found on YouTube") ch_data = meta.pop("channel", {}) or {} yt_channel_id = ch_data.get("youtube_channel_id") channel = None if yt_channel_id: channel = db.query(Channel).filter_by(youtube_channel_id=yt_channel_id).first() if not channel: channel = Channel(**{k: v for k, v in ch_data.items() if hasattr(Channel, k)}) db.add(channel) db.flush() video = Video( channel_id=channel.id if channel else None, **{k: v for k, v in meta.items() if hasattr(Video, k)}, ) db.add(video) db.commit() db.refresh(video) return video @router.post("", response_model=DownloadOut, status_code=201) def create_download( body: DownloadRequest, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): video = _ensure_video(db, body.youtube_video_id) user_settings = db.query(UserSettings).filter_by(user_id=current_user.id).first() default_quality = user_settings.preferred_quality if user_settings else "best" quality = body.quality if body.quality in ytdlp.QUALITY_FORMATS else default_quality if body.subtitle_langs is not None: subtitle_langs = body.subtitle_langs.strip() else: subtitle_langs = (user_settings.subtitle_langs or "") if user_settings else "" _DL_SELECT = """ SELECT d.id, d.status, d.progress_percent, d.resolution, d.created_at, d.completed_at, d.error_message, d.pending_delete_at, v.title AS video_title, v.thumbnail_url AS video_thumbnail_url, v.youtube_video_id, '/files/' || v.youtube_video_id || '.mp4' AS file_url FROM downloads d JOIN videos v ON d.video_id = v.id WHERE d.id = :id """ existing = db.query(Download).filter_by( user_id=current_user.id, video_id=video.id, ).filter(Download.status.in_(["pending", "downloading", "complete"])).first() if existing: row = db.execute(text(_DL_SELECT), {"id": existing.id}).mappings().first() return DownloadOut(**dict(row)) dl = Download(user_id=current_user.id, video_id=video.id, status="pending") db.add(dl) db.commit() db.refresh(dl) background_tasks.add_task( ytdlp.start_download, video.youtube_video_id, dl.id, _on_progress, _on_complete, _on_error, quality, subtitle_langs, ) row = db.execute(text(_DL_SELECT), {"id": dl.id}).mappings().first() return DownloadOut(**dict(row)) @router.get("", response_model=list[DownloadOut]) def list_downloads( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): _purge_expired_trash(db) rows = db.execute( text(""" SELECT d.id, d.status, d.progress_percent, d.created_at, d.completed_at, d.error_message, d.pending_delete_at, d.resolution, v.title AS video_title, v.thumbnail_url AS video_thumbnail_url, v.youtube_video_id, '/files/' || v.youtube_video_id || '.mp4' AS file_url FROM downloads d JOIN videos v ON d.video_id = v.id WHERE d.user_id = :user_id ORDER BY d.created_at DESC LIMIT 200 """), {"user_id": current_user.id}, ).mappings().all() return [DownloadOut(**dict(r)) for r in rows] def _get_quality(db, user_id: int) -> str: s = db.query(UserSettings).filter_by(user_id=user_id).first() return s.preferred_quality if s else "best" def _get_subtitle_langs(db, user_id: int) -> str: s = db.query(UserSettings).filter_by(user_id=user_id).first() return (s.subtitle_langs or "") if s else "" @router.post("/channel/{channel_id}", status_code=202) def download_channel_videos( channel_id: int, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): quality = _get_quality(db, current_user.id) subtitle_langs = _get_subtitle_langs(db, current_user.id) rows = db.execute( text(""" SELECT v.id, v.youtube_video_id FROM videos v LEFT JOIN downloads d ON v.id = d.video_id AND d.user_id = :uid AND d.status IN ('pending', 'downloading', 'complete') WHERE v.channel_id = :cid AND d.id IS NULL """), {"uid": current_user.id, "cid": channel_id}, ).mappings().all() count = 0 for row in rows: dl = Download(user_id=current_user.id, video_id=row["id"], status="pending") db.add(dl) db.flush() background_tasks.add_task( ytdlp.start_download, row["youtube_video_id"], dl.id, _on_progress, _on_complete, _on_error, quality, subtitle_langs, ) count += 1 db.commit() return {"queued": count} @router.post("/following", status_code=202) def download_following_videos( background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): quality = _get_quality(db, current_user.id) subtitle_langs = _get_subtitle_langs(db, current_user.id) rows = db.execute( text(""" SELECT v.id, v.youtube_video_id FROM videos v JOIN channels c ON v.channel_id = c.id JOIN user_channels uc ON c.id = uc.channel_id AND uc.user_id = :uid AND uc.status = 'followed' LEFT JOIN downloads d ON v.id = d.video_id AND d.user_id = :uid AND d.status IN ('pending', 'downloading', 'complete') WHERE d.id IS NULL """), {"uid": current_user.id}, ).mappings().all() count = 0 for row in rows: dl = Download(user_id=current_user.id, video_id=row["id"], status="pending") db.add(dl) db.flush() background_tasks.add_task( ytdlp.start_download, row["youtube_video_id"], dl.id, _on_progress, _on_complete, _on_error, quality, ) count += 1 db.commit() return {"queued": count} def _purge_expired_trash(db: Session): expired = db.execute( text("SELECT id, video_id, user_id FROM downloads WHERE pending_delete_at IS NOT NULL AND pending_delete_at <= :now"), {"now": datetime.utcnow()}, ).mappings().all() for row in expired: video = db.query(Video).filter_by(id=row["video_id"]).first() if video: fp = ytdlp.predicted_file_path(video.youtube_video_id) if fp.exists(): try: os.remove(fp) except OSError: pass uv = db.query(UserVideo).filter_by(user_id=row["user_id"], video_id=row["video_id"]).first() if uv: uv.downloaded = False uv.downloaded_at = None db.execute(text("DELETE FROM downloads WHERE id = :id"), {"id": row["id"]}) if expired: db.commit() def _delete_download_record(db: Session, dl: "Download", user_id: int): video = db.query(Video).filter_by(id=dl.video_id).first() if video: fp = ytdlp.predicted_file_path(video.youtube_video_id) if fp.exists(): try: os.remove(fp) except OSError: pass uv = db.query(UserVideo).filter_by(user_id=user_id, video_id=dl.video_id).first() if uv: uv.downloaded = False uv.downloaded_at = None db.delete(dl) @router.delete("/all", status_code=204) def delete_all_downloads( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): dls = db.query(Download).filter( Download.user_id == current_user.id, Download.status.notin_(["pending", "downloading"]), ).all() for dl in dls: _delete_download_record(db, dl, current_user.id) db.commit() @router.post("/{download_id}/restore", status_code=200) def restore_download( download_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): dl = db.query(Download).filter_by(id=download_id, user_id=current_user.id).first() if not dl: raise HTTPException(status_code=404, detail="Download not found") dl.pending_delete_at = None db.commit() return {"ok": True} @router.delete("/{download_id}", status_code=204) def delete_download( download_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Delete a download record and its file from disk. Resets downloaded flag on the video.""" dl = db.query(Download).filter_by(id=download_id, user_id=current_user.id).first() if not dl: raise HTTPException(status_code=404, detail="Download not found") _delete_download_record(db, dl, current_user.id) db.commit() @router.get("/{download_id}", response_model=DownloadOut) def get_download( download_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): row = db.execute( text(""" SELECT d.id, d.status, d.progress_percent, d.resolution, d.created_at, d.completed_at, d.error_message, d.pending_delete_at, v.title AS video_title, v.thumbnail_url AS video_thumbnail_url, v.youtube_video_id, '/files/' || v.youtube_video_id || '.mp4' AS file_url FROM downloads d JOIN videos v ON d.video_id = v.id WHERE d.id = :id AND d.user_id = :user_id """), {"id": download_id, "user_id": current_user.id}, ).mappings().first() if not row: raise HTTPException(status_code=404, detail="Download not found") return DownloadOut(**dict(row))