from datetime import datetime, timedelta from typing import Optional import bcrypt from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy.orm import Session from .config import settings from .database import get_db from .models import User oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login") def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode(), hashed.encode()) def hash_password(plain: str) -> str: return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode() def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.access_token_expire_minutes)) to_encode["exp"] = expire return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm) def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) user_id: int = payload.get("sub") if user_id is None: raise credentials_exception except JWTError: raise credentials_exception user = db.query(User).filter(User.id == int(user_id)).first() if user is None: raise credentials_exception return user