Files
vela-platform/backend/auth.py
oimwiodev d1160673a7 🎉 v1.0.0 — Vela Platform launch
Self-hosted P&L tracking app with component-level pricing.
Offers: Atlas/Atlas+/Rif/Rif+ with granular cost breakdown.
API + MCP + multi-user auth.
2026-06-15 23:05:59 +01:00

112 lines
3.4 KiB
Python

"""Authentication: password hashing, JWT tokens, and FastAPI dependencies."""
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from database import get_db
from models import User
# Secret key for JWT signing. Taken from the JWT_SECRET environment variable;
# when that is unset or empty, a random key is generated for this process
# instead of falling back to a well-known literal that would let anyone who
# has read the source forge valid tokens.
# NOTE: the generated fallback differs on every start and between worker
# processes, so tokens signed with it stop validating after a restart or on
# another worker. Set JWT_SECRET for any deployment that needs stable sessions.
SECRET_KEY = os.environ.get("JWT_SECRET") or secrets.token_urlsafe(64)
# JWT signing algorithm (HMAC with SHA-256); passed to both jwt.encode and
# jwt.decode in this module.
ALGORITHM = "HS256"
# Access-token lifetime in hours; added to the current UTC time to form the
# "exp" claim in create_access_token.
ACCESS_TOKEN_EXPIRE_HOURS = 24
# Bearer-credentials dependency used by get_current_user. auto_error=False is
# expected to yield None for a missing header instead of raising, which is why
# get_current_user checks for None itself.
security = HTTPBearer(auto_error=False)
def hash_password(password: str) -> str:
    """Return a bcrypt hash of *password* as text.

    The password is UTF-8 encoded, hashed with a freshly generated salt, and
    the resulting hash bytes are decoded back to ``str``.
    """
    raw_password = password.encode("utf-8")
    fresh_salt = bcrypt.gensalt()
    hashed_bytes = bcrypt.hashpw(raw_password, fresh_salt)
    return hashed_bytes.decode("utf-8")
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain_password: Candidate password supplied by the caller.
        hashed_password: Stored bcrypt hash, as text.

    Returns:
        True only when the password matches the hash. A missing, empty or
        unparseable hash counts as a failed verification rather than an error,
        so a bad stored value cannot turn a login attempt into a server error.
    """
    if not hashed_password:
        # No usable hash on record: nothing can match it.
        return False
    try:
        return bcrypt.checkpw(
            plain_password.encode("utf-8"),
            hashed_password.encode("utf-8"),
        )
    except ValueError:
        # bcrypt rejects input it cannot process (e.g. "Invalid salt" for a
        # malformed hash) with ValueError; report that as "does not match".
        return False
def create_access_token(user_id: int, role: str) -> str:
    """Build and sign a JWT access token for a user.

    The token carries the user id (as a string) in "sub", the role in "role",
    and an "exp" claim set ACCESS_TOKEN_EXPIRE_HOURS from the current UTC time.
    It is signed with SECRET_KEY using ALGORITHM.
    """
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
    claims = {"sub": str(user_id), "role": role}
    claims["exp"] = issued_at + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def _decode_token(token: str) -> Optional[dict]:
    """Decode a JWT with SECRET_KEY and ALGORITHM.

    Returns the decoded payload, or None when decoding raises JWTError,
    ValueError or TypeError.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except (JWTError, ValueError, TypeError):
        claims = None
    return claims
def _token_to_user(token: str, db: Session) -> Optional[User]:
    """Resolve a token string to the User it names.

    Returns None when the token cannot be decoded, when its "sub" claim is
    not convertible to an integer, or when no User row has that id.
    """
    claims = _decode_token(token)
    if claims is None:
        return None

    try:
        wanted_id = int(claims.get("sub"))
    except (ValueError, TypeError):
        # "sub" is absent or not numeric.
        return None

    matching_users = db.query(User).filter(User.id == wanted_id)
    return matching_users.first()
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    """FastAPI dependency: validate the JWT Bearer token and return its User.

    Args:
        credentials: Bearer credentials from the ``security`` dependency;
            None when no credentials were supplied.
        db: Database session used to look the user up.

    Returns:
        The User identified by the token.

    Raises:
        HTTPException: 401 "Not authenticated" when no credentials were sent;
            401 "Invalid or expired token" when the token does not resolve to
            a user. Both carry a ``WWW-Authenticate: Bearer`` challenge.
    """
    # A 401 response should tell the client which authentication scheme to
    # use; without this header the response is not a well-formed challenge.
    bearer_challenge = {"WWW-Authenticate": "Bearer"}

    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers=bearer_challenge,
        )

    user = _token_to_user(credentials.credentials, db)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers=bearer_challenge,
        )
    return user
def get_page_user(
    request: Request,
    db: Session = Depends(get_db),
) -> Optional[User]:
    """FastAPI dependency for page routes.

    Tries the "token" cookie first, then an ``Authorization: Bearer ...``
    header (used by AJAX page loads). Returns the first User either token
    resolves to, or None when neither does — no 401 is raised here, so the
    caller is expected to redirect to /login.
    """
    # Collect candidate tokens in priority order: cookie, then header.
    candidate_tokens = []

    cookie_token = request.cookies.get("token")
    if cookie_token:
        candidate_tokens.append(cookie_token)

    header_value = request.headers.get("Authorization")
    if header_value and header_value.startswith("Bearer "):
        candidate_tokens.append(header_value[7:])

    for candidate in candidate_tokens:
        resolved = _token_to_user(candidate, db)
        if resolved:
            return resolved
    return None
def require_admin(current_user: User = Depends(get_current_user)) -> User:
    """FastAPI dependency: let the request through only for admin users.

    Returns the authenticated user when their role is "admin"; otherwise
    raises HTTPException with status 403 and detail "Admin access required".
    """
    if current_user.role == "admin":
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin access required",
    )