PYTHON
auth.py🐍python
"""
JWT Authentication utilities.
"""
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from .config import settings
from .database import get_db
from .models import User
from .schemas import TokenData
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 scheme for token extraction
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/users/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a plain password against a hashed password."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password using bcrypt."""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""
Create a JWT access token.
Args:
data: Payload data (usually {"sub": username})
expires_delta: Optional custom expiration time
Returns:
Encoded JWT token
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
"""
Authenticate a user by username and password.
Args:
db: Database session
username: User's username
password: Plain text password
Returns:
User object if authenticated, None otherwise
"""
user = db.query(User).filter(User.username == username).first()
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
"""
Dependency to get the current authenticated user from JWT token.
Usage:
@app.get("/protected")
def protected_route(user: User = Depends(get_current_user)):
return {"user": user.username}
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.username == token_data.username).first()
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""Dependency to ensure the current user is active."""
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user