JWT Authentication in FastAPI: Securing Your API

(This section is part of Blog Post 2: “Advanced FastAPI: Databases, Auth, and Async”)

What is JWT and How Does It Work?

JWT (JSON Web Token) is an open standard (RFC 7519) for securely transmitting information between parties as a JSON object.

JWT is commonly used for authentication and information exchange. It consists of three parts: Header, Payload, and Signature.

  • Header: Specifies the type (JWT) and the signing algorithm (e.g., HS256).
  • Payload: Contains the claims to be transmitted (such as user ID and expiration time) and can be customized. It is Base64URL-encoded, not encrypted, so it should not carry secrets.
  • Signature: Created by signing the encoded header and payload with a secret key, so that any tampering can be detected.
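
To make the three parts concrete, here is a minimal sketch that builds and checks an HS256 token by hand using only the Python standard library. The helper names (b64url, sign_hs256, verify_hs256) and the demo secret are assumptions made for illustration; in the project itself this work is done by python-jose.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature by hand (illustration only)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: str) -> bool:
    """Recompute the HMAC over the first two segments and compare."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return hmac.compare_digest(b64url(expected), signature)


# Hypothetical subject and secret, used only for this demonstration.
token = sign_hs256({"sub": "alice"}, "demo-secret")
header_b64, payload_b64, signature_b64 = token.split(".")

# The payload is only encoded, not encrypted: anyone can read it.
decoded_payload = json.loads(b64url_decode(payload_b64))

# Swapping in a different payload invalidates the original signature.
forged_payload = b64url(json.dumps({"sub": "admin"}).encode("utf-8"))
forged_token = f"{header_b64}.{forged_payload}.{signature_b64}"
```

Calling verify_hs256(token, "demo-secret") succeeds, while the forged token or a different secret fails the check. This sketch skips expiration and algorithm checks, which is one reason to rely on a maintained library in real code.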

The advantages of JWT are its compact structure, ease of transmission, cross-language support, and stateless authentication: the server verifies the signature instead of looking up a stored session. It is widely used for login authentication in web applications and for API authorization.

Integrating JWT Authentication into Your FastAPI Project

[Figure: JWT authentication flow sequence diagram]

Step 1: Install Required Libraries

JWT requires cryptographic libraries for token signing and password hashing.

pip install "python-jose[cryptography]" "passlib[bcrypt]"

python-jose: Handles JWT creation/validation (supports multiple algorithms like HS256).
passlib: Securely hashes passwords using bcrypt (an industry-standard choice for password storage).
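
Why hash passwords with a salt at all? The sketch below illustrates the idea with the standard library's hashlib.pbkdf2_hmac as a stand-in. It is not bcrypt and not what passlib does internally; the function names, the storage format, and the iteration count are assumptions chosen for the demonstration.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a recommendation


def hash_password(password: str) -> str:
    """Return 'salt$digest' (hex) using a fresh random salt per password."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, ITERATIONS
    )
    return f"{salt.hex()}${digest.hex()}"


def check_password(password: str, stored: str) -> bool:
    """Re-derive the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


# The same password hashes to two different strings because the salts differ.
first = hash_password("s3cret")
second = hash_password("s3cret")
```

Both stored values still verify against "s3cret", yet an attacker who sees them cannot tell that the two users share a password. passlib's bcrypt scheme follows the same salt-plus-slow-hash pattern and embeds the salt and cost in the stored string.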

Step 2: Implementation Steps

The code for this article is located in the GitHub repository named 03_fastapi_jwt.
https://github.com/gzthss/fastapi_tutorial.git

For detailed information about the database section, please refer to my other article:
Advanced FastAPI: Databases…

project/
├── db/
│   ├── app.db           # SQLite database file
│   ├── database.py      # Database connection and session management
│   └── models.py        # Database models and table definitions
│
├── routers/
│   └── auth.py          # Authentication routes
│
├── utils/
│   ├── security.py      # JWT/password utilities
│   ├── schemas.py       # Pydantic data models
│   └── dependencies.py  # Auth dependencies
│
└── main.py              # Main app setup

routers/auth.py

"""
Authentication routes and user management
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from utils.security import get_password_hash, verify_password, create_access_token
from utils.schemas import UserCreate, TokenResponse

from db.database import get_db
from db.models import User

router = APIRouter(prefix="/auth", tags=["Authentication"])


@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register_user(user: UserCreate, db: Session = Depends(get_db)):
    """User registration endpoint."""
    # Check for an existing user with the same username or email
    existing_user = db.query(User).filter(
        (User.user_name == user.username) | (User.email == user.email)
    ).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username or email already registered."
        )

    # Hash the password and save the user; the plain password is never stored
    hashed_password = get_password_hash(user.password)
    new_user = User(
        user_name=user.username,
        email=user.email,
        hashed_password=hashed_password
    )
    db.add(new_user)
    db.commit()
    return {"message": "User registered successfully"}


@router.post("/token", response_model=TokenResponse)
async def login_for_access_token(username: str, password: str, db: Session = Depends(get_db)):
    """Login endpoint returning JWT."""
    # Note: plain str parameters are read from the query string, which can end
    # up in server logs. FastAPI's OAuth2PasswordRequestForm is the form-based
    # alternative for production use.
    user = db.query(User).filter(User.user_name == username).first()
    if not user or not verify_password(password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Generate a JWT whose "sub" (subject) claim identifies the user
    access_token = create_access_token(data={"sub": user.user_name})
    return {"access_token": access_token, "token_type": "bearer"}

utils/schemas.py

# utils/schemas.py
from pydantic import BaseModel


class TokenData(BaseModel):
    """JWT Token payload schema."""
    username: str | None = None


class UserCreate(BaseModel):
    """User registration request schema."""
    username: str
    password: str
    email: str


class TokenResponse(BaseModel):
    """Login response schema."""
    access_token: str
    token_type: str

utils/dependencies.py

# utils/dependencies.py
from fastapi import HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError

from utils.schemas import TokenData
from utils.security import decode_token

# Reads the token from the "Authorization: Bearer <token>" request header
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


def get_current_user(token: str = Depends(oauth2_scheme)) -> TokenData:
    """Dependency to validate JWT."""
    try:
        # decode_token already raises a 401 for an invalid or expired token
        payload = decode_token(token)
        username: str = payload.get("sub")
        if username is None:
            # A valid signature without a "sub" claim is still rejected
            raise JWTError
        return TokenData(username=username)
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
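
Conceptually, OAuth2PasswordBearer pulls the token out of the Authorization request header before get_current_user ever sees it. The following is a simplified stand-in for that step, not FastAPI's actual code; extract_bearer_token is a hypothetical helper written only for illustration.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header value, else None."""
    if not authorization:
        return None
    # Split "Bearer <token>" into the scheme and the credential
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()
```

A request without the header, or with a different scheme such as Basic, yields no token here. In the real dependency that case results in a 401 response before the route function runs.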

utils/security.py

# security.py is used to hash passwords and to create and decode JWT tokens.
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi import HTTPException, status

# -----------------------------
# Password Hashing Config
# -----------------------------

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify plain password against hash."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Generate bcrypt hash from password."""
    return pwd_context.hash(password)


# -----------------------------
# JWT Configuration
# -----------------------------

# Demo value only: in a real deployment, load the secret from an environment
# variable or a secrets manager and never commit it to the repository.
SECRET_KEY = "f072b58f044a20bd1488f6d8d2f002af5715011952d185fa84750421282defba"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Generate JWT with expiration."""
    to_encode = data.copy()
    # Use the caller's lifetime if given, otherwise the default of 30 minutes
    expire = datetime.now(timezone.utc) + (
        expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_token(token: str) -> dict:
    """Validate and decode JWT."""
    try:
        # Verifies the signature and the "exp" claim
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
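
The "exp" claim that create_access_token adds ends up in the token as a number of seconds since the Unix epoch, and a token must be rejected at or after that moment. Here is a rough standard-library sketch of that arithmetic; build_claims and is_expired are hypothetical names, and the library performs the equivalent check during decoding.

```python
from datetime import datetime, timedelta, timezone


def build_claims(subject: str, now: datetime, minutes: int = 30) -> dict:
    """Return claims with 'exp' as whole seconds since the Unix epoch."""
    expire = now + timedelta(minutes=minutes)
    return {"sub": subject, "exp": int(expire.timestamp())}


def is_expired(claims: dict, now: datetime) -> bool:
    """A token is no longer valid at or after its 'exp' time."""
    return now.timestamp() >= claims["exp"]


# Fixed, timezone-aware issue time so the example is reproducible.
issued_at = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
claims = build_claims("alice", issued_at)
```

With the 30-minute default, the token is still accepted 29 minutes after issue_at and rejected 31 minutes after it. Using timezone-aware UTC datetimes avoids the ambiguity of naive local times when computing the timestamp.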

main.py

from fastapi import FastAPI, Depends
from routers import auth
from utils.dependencies import get_current_user
from utils.schemas import TokenData
import uvicorn

app = FastAPI()

# Mount the /auth/register and /auth/token routes
app.include_router(auth.router)


@app.get("/secure-data")
async def get_secure_data(current_user: TokenData = Depends(get_current_user)):
    """Example protected route."""
    # This code only runs if get_current_user accepted the bearer token
    return {
        "message": f"Hello {current_user.username}, this is sensitive data!",
        "status": "Authorized"
    }


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)

Step 3: Testing the Flow With Postman
  • Let’s create a user first.
  • Use the user you just created to log in and get a token.
  • Use the token to call the authorized API endpoint.
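
The same three steps can also be scripted instead of clicked through. The sketch below uses only the standard library and assumes the server from main.py is running on 127.0.0.1:8000 and that the login endpoint reads username and password from the query string, as in routers/auth.py above. The function names are hypothetical helpers, not part of the repository.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # host and port taken from main.py


def register_request(username: str, password: str, email: str) -> urllib.request.Request:
    """Step 1: POST the new user as a JSON body to /auth/register."""
    body = json.dumps({"username": username, "password": password, "email": email})
    return urllib.request.Request(
        f"{BASE_URL}/auth/register",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def login_request(username: str, password: str) -> urllib.request.Request:
    """Step 2: POST the credentials as query parameters to /auth/token."""
    query = urllib.parse.urlencode({"username": username, "password": password})
    return urllib.request.Request(f"{BASE_URL}/auth/token?{query}", method="POST")


def secure_data_request(token: str) -> urllib.request.Request:
    """Step 3: GET /secure-data with the token in the Authorization header."""
    return urllib.request.Request(
        f"{BASE_URL}/secure-data",
        headers={"Authorization": f"Bearer {token}"},
    )


def send(request: urllib.request.Request):
    """Send a request and parse the JSON response (raises HTTPError on 4xx/5xx)."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def run_flow(username: str, password: str, email: str):
    """Register, log in, then call the protected endpoint with the token."""
    send(register_request(username, password, email))
    token = send(login_request(username, password))["access_token"]
    return send(secure_data_request(token))
```

With the server running, run_flow("alice", "s3cret", "alice@example.com") should return the JSON from /secure-data. Registering the same username twice raises an HTTPError because the endpoint answers with 400.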

This is the process of using JWT for authentication and authorization in FastAPI. See you next time!