"""Authentication: password hashing, JWT tokens, and FastAPI dependencies.""" import os from datetime import datetime, timedelta, timezone from typing import Optional import bcrypt from fastapi import Depends, HTTPException, Request, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import JWTError, jwt from sqlalchemy.orm import Session from database import get_db from models import User # Secret key for JWT signing (generate a random one if not set) SECRET_KEY = os.environ.get("JWT_SECRET", "bestof-manager-secret-key-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_HOURS = 24 security = HTTPBearer(auto_error=False) def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verify_password(plain_password: str, hashed_password: str) -> bool: return bcrypt.checkpw(plain_password.encode("utf-8"), hashed_password.encode("utf-8")) def create_access_token(user_id: int, role: str) -> str: expire = datetime.now(timezone.utc) + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS) payload = { "sub": str(user_id), "role": role, "exp": expire, } return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) def _decode_token(token: str) -> Optional[dict]: """Try to decode a JWT token, return payload or None.""" try: return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) except (JWTError, ValueError, TypeError): return None def _token_to_user(token: str, db: Session) -> Optional[User]: """Given a token string, return User or None.""" payload = _decode_token(token) if payload is None: return None try: user_id = int(payload.get("sub")) except (ValueError, TypeError): return None return db.query(User).filter(User.id == user_id).first() def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db), ) -> User: """FastAPI dependency: validates JWT Bearer token and returns the current User.""" if credentials is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated", ) user = _token_to_user(credentials.credentials, db) if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", ) return user def get_page_user( request: Request, db: Session = Depends(get_db), ) -> Optional[User]: """FastAPI dependency for page routes: checks cookie first, then Bearer header. Returns None (not 401) if not authenticated — caller should redirect to /login. """ token = request.cookies.get("token") if token: user = _token_to_user(token, db) if user: return user # Fallback: check Authorization header (for AJAX page loads) auth = request.headers.get("Authorization") if auth and auth.startswith("Bearer "): user = _token_to_user(auth[7:], db) if user: return user return None def require_admin(current_user: User = Depends(get_current_user)) -> User: """FastAPI dependency: requires admin role.""" if current_user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required", ) return current_user