1. fastapi_jwt_auth 简介

fastapi_jwt_auth 是一个用于 FastAPI 的 JWT 身份验证库。它简化了 JWT 的生成、验证和管理过程,提供了一套易于使用的 API 来处理访问令牌(Access Token)和刷新令牌(Refresh Token)。

主要功能

  • JWT 生成和验证:自动生成和验证 JWT 令牌,包括访问令牌和刷新令牌。
  • 自定义配置:支持多种配置选项,如密钥、令牌过期时间等。
  • 依赖注入:通过 FastAPI 的依赖注入机制,轻松集成到路由和服务中。
  • 角色和权限管理:结合用户角色,实现细粒度的权限控制。

2. 安装

首先,您需要安装 fastapi_jwt_auth 包。可以使用 pip 进行安装:

pip install fastapi-jwt-auth

此外,确保您已经安装了 FastAPI 和 uvicorn 作为 ASGI 服务器:

pip install fastapi uvicorn

3. 配置

在使用 AuthJWT 之前,您需要进行一些配置。通常,这些配置会在 FastAPI 应用的设置文件中进行定义,例如在 settings.py 中。

配置选项

fastapi_jwt_auth 通过 Pydantic 模型 AuthJWTSettings 进行配置。您可以通过环境变量或直接在代码中设置配置参数。

示例配置(使用环境变量)

创建一个配置类,并在应用启动时加载配置:

# settings.py
from pydantic import BaseSettings

class Settings(BaseSettings):
    authjwt_secret_key: str = "your-secret-key"  # 用于签名 JWT 的密钥
    authjwt_access_token_expires: int = 3600    # 访问令牌过期时间(秒)
    authjwt_refresh_token_expires: int = 86400  # 刷新令牌过期时间(秒)

    class Config:
        env_file = ".env"  # 指定环境变量文件路径

settings = Settings()

.env 文件中定义环境变量:

authjwt_secret_key=your-secret-key
authjwt_access_token_expires=3600
authjwt_refresh_token_expires=86400
在 FastAPI 应用中加载配置
# main.py
from fastapi import FastAPI
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
from fastapi import Request, HTTPException
from settings import settings

app = FastAPI()

# 加载配置
@AuthJWT.load_config
def get_config():
    return settings

# 全局异常处理
@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.message}
    )

4. 使用 AuthJWT

初始化和依赖注入

在需要使用 AuthJWT 的路由或服务中,通过依赖注入获取 AuthJWT 的实例。

from fastapi import Depends
from fastapi_jwt_auth import AuthJWT
from pydantic import BaseModel

class UserLogin(BaseModel):
    username: str
    password: str

@app.post('/login')
def login(user: UserLogin, Authorize: AuthJWT = Depends()):
    # 验证用户凭证(此处省略具体验证逻辑)
    access_token = Authorize.create_access_token(subject=user.username)
    refresh_token = Authorize.create_refresh_token(subject=user.username)
    return {"access_token": access_token, "refresh_token": refresh_token}

生成 JWT 令牌

使用 AuthJWT 的实例生成访问令牌和刷新令牌。

access_token = Authorize.create_access_token(subject=user.username)
refresh_token = Authorize.create_refresh_token(subject=user.username)
参数说明
  • subject:通常为用户的唯一标识符,如用户名或用户 ID。
  • expires_time:可选,覆盖默认的过期时间。

保护路由

使用 @Authorize.jwt_required() 装饰器保护需要身份验证的路由。

@app.get('/protected')
def protected_route(Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()
    current_user = Authorize.get_jwt_subject()
    return {"message": f"Hello, {current_user}"}

刷新令牌

使用刷新令牌获取新的访问令牌。

@app.post('/refresh')
def refresh_token(Authorize: AuthJWT = Depends()):
    Authorize.jwt_refresh_token_required()
    current_user = Authorize.get_jwt_subject()
    new_access_token = Authorize.create_access_token(subject=current_user)
    return {"access_token": new_access_token}

注销和删除令牌

通过设置黑名单或删除令牌来实现注销功能(需要额外配置黑名单功能)。

@app.post('/logout')
def logout(Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()
    Authorize.unset_jwt_cookies()
    return {"msg": "Successfully logged out"}

5. 高级功能

角色和权限管理

您可以在 JWT 令牌的负载中包含用户的角色信息,以实现基于角色的访问控制。

在创建令牌时包含角色
access_token = Authorize.create_access_token(subject=user.username, user_claims={"role": user.role})
在受保护路由中检查角色
@app.get('/admin')
def admin_route(Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()
    claims = Authorize.get_raw_jwt()
    if claims.get("role") != "admin":
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    return {"message": "Welcome, admin"}

自定义验证和异常处理

您可以根据需求自定义验证逻辑和异常处理。

@app.post('/custom-protected')
def custom_protected_route(Authorize: AuthJWT = Depends()):
    try:
        Authorize.jwt_required()
    except AuthJWTException as e:
        raise HTTPException(status_code=e.status_code, detail=e.message)
    return {"message": "This is a custom protected route"}

配置黑名单功能

fastapi_jwt_auth 支持黑名单功能,用于实现令牌的注销和撤销。

配置黑名单

在配置类中启用黑名单功能,并指定存储黑名单的数据库或缓存。

class Settings(BaseSettings):
    authjwt_secret_key: str = "your-secret-key"
    authjwt_access_token_expires: int = 3600
    authjwt_refresh_token_expires: int = 86400
    authjwt_blacklist_enabled: bool = True
    authjwt_blacklist_token_checks: set = {"access", "refresh"}

    class Config:
        env_file = ".env"
实现黑名单存储

您需要实现一个黑名单存储接口,通常使用数据库或缓存(如 Redis)来存储已撤销的令牌。

from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException

# 假设使用 Redis 作为黑名单存储
import redis
from datetime import datetime, timedelta

redis_client = redis.Redis(host='localhost', port=6379, db=0)

class Settings(BaseSettings):
    # 其他配置
    authjwt_blacklist_enabled: bool = True
    authjwt_blacklist_token_checks: set = {"access", "refresh"}

    class Config:
        env_file = ".env"

settings = Settings()

@AuthJWT.token_in_blacklist_loader
def check_if_token_in_blacklist(decrypted_token):
    jti = decrypted_token['jti']
    entry = redis_client.get(jti)
    return entry is not None

@app.post('/logout')
def logout(Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()
    jti = Authorize.get_raw_jwt()['jti']
    expire = Authorize.get_raw_jwt()['exp']
    redis_client.setex(jti, timedelta(seconds=expire - datetime.utcnow().timestamp()), 'true')
    Authorize.unset_jwt_cookies()
    return {"msg": "Successfully logged out"}

在上述示例中:

  • check_if_token_in_blacklist 函数用于检查令牌是否在黑名单中。
  • 在用户注销时,将令牌的 jti(JWT ID)存储到 Redis 中,并设置过期时间与令牌一致。

6. 与您项目的集成

结合您之前提供的 user_service.py 代码,AuthJWT 在用户认证和授权中的应用如下:

创建和验证 JWT 令牌

在用户登录时,生成访问令牌和刷新令牌:

def gen_user_jwt(db_user: User):
    if db_user.delete == 1:
        raise HTTPException(status_code=500, detail='该账号已被禁用,请联系管理员')
    role, web_menu = gen_user_role(db_user)
    payload = {'user_name': db_user.user_name, 'user_id': db_user.user_id, 'role': role}
    access_token = AuthJWT().create_access_token(subject=json.dumps(payload), expires_time=ACCESS_TOKEN_EXPIRE_TIME)
    refresh_token = AuthJWT().create_refresh_token(subject=db_user.user_name)
    return access_token, refresh_token, role, web_menu

保护路由

在需要保护的路由中,使用依赖注入获取 UserPayload

async def get_login_user(authorize: AuthJWT = Depends()) -> UserPayload:
    authorize.jwt_required()
    current_user = json.loads(authorize.get_jwt_subject())
    user = UserPayload(**current_user)
    # 多点登录逻辑
    return user

async def get_admin_user(authorize: AuthJWT = Depends()) -> UserPayload:
    login_user = await get_login_user(authorize)
    if not login_user.is_admin():
        raise UnAuthorizedError.http_exception()
    return login_user

在路由中使用这些依赖:

@app.get('/admin/dashboard')
def admin_dashboard(current_user: UserPayload = Depends(get_admin_user)):
    return {"message": f"Welcome, admin {current_user.user_name}"}

处理多点登录

get_login_user 函数中,通过 Redis 检查当前令牌是否与存储的会话一致,防止用户被迫下线:

async def get_login_user(authorize: AuthJWT = Depends()) -> UserPayload:
    authorize.jwt_required()
    current_user = json.loads(authorize.get_jwt_subject())
    user = UserPayload(**current_user)
    if not settings.get_system_login_method().allow_multi_login:
        current_token = redis_client.get(USER_CURRENT_SESSION.format(user.user_id))
        if current_token != authorize._token:
            raise UserLoginOfflineError.http_exception()
    return user

7. 示例代码

以下是一个完整的 FastAPI 应用示例,展示了如何集成和使用 fastapi_jwt_auth

# main.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
from pydantic import BaseModel
from settings import settings  # 前面定义的配置类
from fastapi.responses import JSONResponse

app = FastAPI()

# 加载配置
@AuthJWT.load_config
def get_config():
    return settings

# 全局异常处理
@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request, exc):
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.message}
    )

# 用户登录请求模型
class UserLogin(BaseModel):
    username: str
    password: str

# 用户登录路由
@app.post('/login')
def login(user: UserLogin, Authorize: AuthJWT = Depends()):
    # 验证用户凭证(此处假设验证成功)
    if user.username != "testuser" or user.password != "testpassword":
        raise HTTPException(status_code=401, detail="Bad username or password")
    access_token = Authorize.create_access_token(subject=user.username)
    refresh_token = Authorize.create_refresh_token(subject=user.username)
    return {"access_token": access_token, "refresh_token": refresh_token}

# 受保护的路由
@app.get('/protected')
def protected(Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()
    current_user = Authorize.get_jwt_subject()
    return {"message": f"Hello, {current_user}"}

# 刷新令牌
@app.post('/refresh')
def refresh(Authorize: AuthJWT = Depends()):
    Authorize.jwt_refresh_token_required()
    current_user = Authorize.get_jwt_subject()
    new_access_token = Authorize.create_access_token(subject=current_user)
    return {"access_token": new_access_token}

# 注销路由
@app.post('/logout')
def logout(Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()
    Authorize.unset_jwt_cookies()
    return {"msg": "Successfully logged out"}

8. 注意事项

1. 安全性

  • 密钥管理:确保 authjwt_secret_key 的安全性,不要将其暴露在公共代码库中。建议通过环境变量或安全的配置管理系统来管理密钥。
  • 令牌过期:合理设置访问令牌和刷新令牌的过期时间,平衡安全性和用户体验。
  • HTTPS:在生产环境中,确保所有的通信通过 HTTPS 进行,防止令牌在传输过程中被窃取。

2. 性能优化

  • 数据库连接:优化数据库连接和会话管理,避免因频繁打开和关闭会话导致性能问题。
  • 缓存:结合缓存机制(如 Redis)来加速频繁访问的数据,提升响应速度。

3. 扩展功能

  • 多因素认证(MFA):在现有基础上,添加多因素认证机制,进一步提升安全性。
  • 角色和权限细化:结合项目需求,细化用户角色和权限,实现更复杂的访问控制。
Logo

加入社区!打开量化的大门,首批课程上线啦!

更多推荐