1. 认证与权限基础概念

1.1 认证(Authentication)与授权(Authorization)的区别

认证是验证用户身份的过程,确认用户是谁。授权是确定用户能做什么的过程,即用户拥有的权限。

1.2 FastAPI 安全机制架构

FastAPI 的安全机制基于 Starlette 的安全模块,通过依赖注入系统实现。核心组件包括:

  • 安全方案(Security Schemes):定义认证方式
  • 安全依赖(Security Dependencies):验证用户身份
  • 权限检查(Permission Checks):验证用户权限

1.3 常用认证方案对比

认证方案 适用场景 优点 缺点
Basic Auth 简单API、内部系统 实现简单 安全性低,密码明文传输
JWT 前后端分离、微服务 无状态,易于扩展 令牌泄露风险
OAuth2 第三方登录、API授权 标准协议,安全性高 实现复杂
API Key 服务间通信 实现简单,易于管理 密钥管理复杂
Session 传统Web应用 服务端管理,易于失效 状态管理复杂,不易扩展

1.4 认证方案性能对比

认证方案 认证速度 存储开销 网络开销 扩展性
Basic Auth
JWT
OAuth2
API Key
Session

1.5 权限系统设计原则

  • 最小权限原则:只授予用户完成任务所需的最小权限
  • 权限分离:不同功能的权限应分离管理
  • 权限继承:合理设计权限层级,支持权限继承
  • 审计日志:记录权限变更和使用情况

1.6 密码哈希与验证基础

密码哈希是将密码转换为不可逆字符串的过程,常用的哈希算法包括:

  • bcrypt:推荐使用,自适应哈希函数

1.7 FastAPI 最新安全特性

FastAPI 0.104+版本中新增了一些安全特性:

  • 自动生成文档 :根据安全依赖自动生成 API 文档中的安全部分
  • 改进的 OAuth2 支持:更好的 OAuth2 流程集成
  • 增强的依赖注入系统:更灵活的安全依赖管理
  • 内置的密码强度检查:提供密码强度评估工具

2. 安全模块核心组件与源码分析

2.1 Security 依赖注入系统

FastAPI 的安全机制基于依赖注入系统,通过 Security 函数创建安全依赖:

from fastapi import Depends, Security
from fastapi.security import HTTPBasic, HTTPBasicCredentials

security = HTTPBasic()

@app.get("/items/")
async def read_items(credentials: HTTPBasicCredentials = Security(security)):
    return {"username": credentials.username, "password": credentials.password}

2.2 OAuth2PasswordBearer 源码分析与参数详解

OAuth2PasswordBearer 是 FastAPI 中用于 OAuth2 密码流的核心组件:

from typing import Optional, Dict

class OAuth2PasswordBearer(OAuth2):
    def __init__(
        self,
        tokenUrl: str,
        scheme_name: Optional[str] = None,
        scopes: Optional[Dict[str, str]] = None,
        auto_error: bool = True,
    ):
        # 初始化 OAuth2 密码流
        # tokenUrl: 用于获取令牌的 URL
        # scheme_name: 安全方案名称
        # scopes: 权限范围字典
        # auto_error: 是否自动抛出错误
        pass

2.3 HTTPBasic 与 HTTPBearer 实现原理

  • HTTPBasic:基于 HTTP Basic 认证,使用用户名和密码进行认证
  • HTTPBearer:基于 HTTP Bearer 认证,使用令牌进行认证

2.4 安全依赖的执行流程

  1. 客户端发送请求,包含认证信息
  2. FastAPI 解析认证信息
  3. 调用安全依赖验证认证信息
  4. 验证通过后,将用户信息传递给路由函数
  5. 验证失败则返回 401 或 403 错误

2.5 安全依赖优先级

FastAPI 中的安全依赖可以组合使用,优先级遵循以下规则:

  1. 显式依赖优先:直接在路由函数中声明的依赖优先于其他依赖
  2. 依赖链顺序:依赖链中的依赖按照声明顺序执行
  3. 异常处理:如果一个安全依赖抛出异常,后续的依赖不会执行

示例:

from fastapi import Depends, Security, HTTPException
from fastapi.security import HTTPBearer, HTTPBasic

bearer = HTTPBearer()
basic = HTTPBasic()

# 组合使用多种认证方式
@app.get("/protected")
async def protected(
    bearer_creds = Security(bearer, auto_error=False),
    basic_creds = Security(basic, auto_error=False)
):
    if bearer_creds:
        # 使用 Bearer 令牌认证
        return {"authenticated": True, "method": "bearer"}
    elif basic_creds:
        # 使用 Basic 认证
        return {"authenticated": True, "method": "basic"}
    else:
        # 认证失败
        raise HTTPException(
            status_code=401,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer, Basic"},
        )

2.6 安全依赖缓存机制

FastAPI 支持对安全依赖进行缓存,以提高性能。可以使用 functools.lru_cache 或自定义缓存机制:

from fastapi import Depends, Security
from fastapi.security import HTTPBearer
from functools import lru_cache

bearer = HTTPBearer()

# 缓存安全依赖结果
@lru_cache(maxsize=100)
async def get_current_user(token: str = Security(bearer)):
    # 验证令牌
    # 这里可以添加令牌验证逻辑
    return {"user_id": "123", "username": "johndoe"}

@app.get("/protected")
async def protected(current_user = Depends(get_current_user)):
    return {"user": current_user}

缓存机制的最佳实践:

  • 合理设置缓存时间:根据令牌过期时间设置缓存时间
  • 缓存键设计:使用令牌作为缓存键,确保不同用户的缓存相互独立
  • 缓存清理:在令牌过期或用户登出时清理缓存

2.7 Starlette 底层安全实现

FastAPI 的安全模块基于 Starlette 的 authentication 模块,核心实现包括:

  • AuthenticationBackend:认证后端基类
  • BaseUser:用户基类
  • AuthCredentials:认证凭证

3. 基于 Token 的认证实现

3.1 JWT Token 生成与验证

JWT (JSON Web Token) 是一种无状态的认证机制,由三部分组成:

  • Header:包含令牌类型和算法
  • Payload:包含用户信息和声明
  • Signature:用于验证令牌真实性
import jwt
from jose import JWTError
from datetime import datetime, timedelta
from pydantic import BaseModel
from typing import Optional

# 数据模型
class TokenData(BaseModel):
    username: str

# 生成 JWT 令牌
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# 验证 JWT 令牌
def verify_token(token: str, credentials_exception):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    return token_data

3.2 实现步骤与核心代码

  1. 安装依赖:pip install python-jose[cryptography] passlib[bcrypt]
  2. 配置 JWT 密钥和算法
  3. 实现用户认证和令牌生成
  4. 实现令牌验证依赖
  5. 保护需要认证的路由

3.3 Token 过期与刷新机制

  • 令牌过期:设置合理的过期时间,减少令牌泄露风险
  • 令牌刷新:实现刷新令牌机制,避免用户频繁登录

3.4 重要参数详解

  • SECRET_KEY:用于签名 JWT 令牌的密钥,应保密存储
  • ALGORITHM:使用的签名算法,推荐使用 HS256
  • ACCESS_TOKEN_EXPIRE_MINUTES:访问令牌的过期时间

3.5 常见错误与解决方案

  • 令牌过期:实现令牌刷新机制
  • 令牌无效:验证令牌签名和过期时间
  • 密钥泄露:定期轮换密钥

3.6 执行时序图

4. OAuth2 完整实现

4.1 OAuth2 授权码流程详解

OAuth2 授权码流程是最安全的 OAuth2 流程,适用于有后端的应用:

  1. 客户端请求授权码
  2. 用户授权
  3. 服务器返回授权码
  4. 客户端使用授权码获取令牌
  5. 客户端使用令牌访问资源

4.2 集成第三方认证(Google, GitHub)

以 GitHub 为例:

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2AuthorizationCodeBearer
from httpx import AsyncClient

app = FastAPI()

# 配置 OAuth2
oauth2_scheme = OAuth2AuthorizationCodeBearer(
    authorizationUrl="https://github.com/login/oauth/authorize",
    tokenUrl="https://github.com/login/oauth/access_token",
)

@app.get("/github/login")
async def github_login():
    # 重定向到 GitHub 授权页面
    import urllib.parse
    redirect_uri = "http://localhost:8000/github/callback"
    params = {
        "client_id": CLIENT_ID,
        "redirect_uri": redirect_uri,
        "scope": "user:email",
        "state": "random_state_string"  # 用于防止 CSRF 攻击
    }
    url = "https://github.com/login/oauth/authorize?" + urllib.parse.urlencode(params)
    from fastapi.responses import RedirectResponse
    return RedirectResponse(url)

@app.get("/github/callback")
async def github_callback(code: str):
    # 使用授权码获取令牌
    async with AsyncClient() as client:
        response = await client.post(
            "https://github.com/login/oauth/access_token",
            data={
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "code": code,
            },
            headers={"Accept": "application/json"},
        )
    return response.json()

4.3 权限范围(Scope)管理

OAuth2 权限范围用于限制令牌的访问权限:

from fastapi import Security
from fastapi.security import OAuth2PasswordBearer

scopes = {
    "read": "Read access",
    "write": "Write access",
    "admin": "Admin access",
}

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/token",
    scopes=scopes,
)

@app.get("/items/")
async def read_items(credentials=Security(oauth2_scheme, scopes=["read"])):
    return {"items": ["item1", "item2"]}

4.4 重要参数配置

  • client_id:客户端 ID,由授权服务器分配
  • client_secret:客户端密钥,应保密存储
  • redirect_uri:授权后重定向的 URI
  • scope:请求的权限范围

4.5 安全隐患与防护措施

  • CSRF 攻击:使用 state 参数防止
  • 令牌泄露:使用 HTTPS 传输令牌
  • 授权码拦截:使用 PKCE 扩展(适用于公共客户端)

4.6 PKCE 扩展实现

PKCE (Proof Key for Code Exchange) 是 OAuth2 的扩展,用于增强公共客户端(如移动应用、单页应用)的安全性:

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2AuthorizationCodeBearer
from httpx import AsyncClient
import secrets
import hashlib
import base64

app = FastAPI()

# 生成 code_verifier
def generate_code_verifier():
    return secrets.token_urlsafe(64)

# 生成 code_challenge
def generate_code_challenge(code_verifier):
    code_challenge = hashlib.sha256(code_verifier.encode()).digest()
    code_challenge = base64.urlsafe_b64encode(code_challenge).decode().rstrip('=')
    return code_challenge

# 配置 OAuth2 带 PKCE
oauth2_scheme = OAuth2AuthorizationCodeBearer(
    authorizationUrl="https://github.com/login/oauth/authorize",
    tokenUrl="https://github.com/login/oauth/access_token",
)

@app.get("/github/login")
async def github_login():
    # 生成 code_verifier 和 code_challenge
    code_verifier = generate_code_verifier()
    code_challenge = generate_code_challenge(code_verifier)
    
    # 重定向到 GitHub 授权页面
    import urllib.parse
    redirect_uri = "http://localhost:8000/github/callback"
    params = {
        "client_id": CLIENT_ID,
        "redirect_uri": redirect_uri,
        "scope": "user:email",
        "state": "random_state_string",  # 用于防止 CSRF 攻击
        "code_challenge": code_challenge,
        "code_challenge_method": "S256"  # 使用 SHA-256 哈希方法
    }
    url = "https://github.com/login/oauth/authorize?" + urllib.parse.urlencode(params)
    from fastapi.responses import RedirectResponse
    # 保存 code_verifier 到会话或数据库
    # 这里简化处理,实际应存储到会话中
    return RedirectResponse(url)

@app.get("/github/callback")
async def github_callback(code: str, state: str):
    # 获取之前保存的 code_verifier
    code_verifier = "saved_code_verifier"  # 实际应从会话中获取
    
    # 使用授权码和 code_verifier 获取令牌
    async with AsyncClient() as client:
        response = await client.post(
            "https://github.com/login/oauth/access_token",
            data={
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "code": code,
                "code_verifier": code_verifier
            },
            headers={"Accept": "application/json"},
        )
    return response.json()

4.7 代码实现与示例

完整的 OAuth2 实现示例:

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from pydantic import BaseModel
from typing import Optional

# 配置
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2 密码流
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

# 数据模型
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

# 模拟数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # secret
        "disabled": False,
    }
}

# 密码验证
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

# 获取用户
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

# 验证用户
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

# 创建访问令牌
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# 获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

# 获取活跃用户
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# 路由
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]

4.8 OpenID Connect 集成

OpenID Connect (OIDC) 是建立在 OAuth2 之上的身份认证协议,提供了标准化的身份验证机制:

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OpenIDConnect

app = FastAPI()

# 配置 OpenID Connect
oidc_scheme = OpenIDConnect(
    openIdConnectUrl="https://accounts.google.com/.well-known/openid-configuration"
)

async def get_current_user(token: str = Depends(oidc_scheme)):
    # 验证 OIDC 令牌
    # 这里应该调用 OIDC 提供商的验证接口
    return token

@app.get("/profile")
async def get_profile(current_user: str = Depends(get_current_user)):
    return {"user": current_user}

OpenID Connect 的核心概念:

  • ID Token:包含用户身份信息的 JWT 令牌
  • UserInfo Endpoint:提供用户详细信息的端点
  • Discovery:通过发现文档自动配置 OIDC 客户端
  • Scope:请求的权限范围,如 openidprofileemail

5. 权限系统设计与实现

5.1 基于角色的访问控制(RBAC)

RBAC 是一种基于角色的权限管理模型,核心概念包括:

  • 用户(User):系统使用者
  • 角色(Role):权限的集合
  • 权限(Permission):对资源的操作许可
  • 资源(Resource):被保护的对象

5.2 基于策略的访问控制(ABAC)

ABAC 是一种基于属性的权限管理模型,根据用户、资源和环境的属性来决定权限。

5.3 权限装饰器的设计与使用

from functools import wraps
from fastapi import HTTPException, status

# 权限装饰器
def require_permission(permission):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, current_user=None, **kwargs):
            if not current_user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Not authenticated",
                )
            # 检查用户是否有指定权限
            # 确保用户对象有permissions属性
            if not hasattr(current_user, "permissions"):
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="User object missing permissions attribute",
                )
            if permission not in current_user.permissions:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Not enough permissions",
                )
            return await func(*args, current_user=current_user, **kwargs)
        return wrapper
    return decorator

# 使用权限装饰器
@app.get("/admin/")
@require_permission("admin")
async def admin_only(current_user: User = Depends(get_current_user)):
    return {"message": "Admin only"}

5.4 权限检查的执行流程

  1. 用户认证:验证用户身份
  2. 权限获取:获取用户拥有的权限
  3. 权限检查:检查用户是否有访问资源的权限
  4. 授权决策:根据权限检查结果决定是否授权

5.5 权限继承实现

权限继承是 RBAC 系统中的重要特性,允许角色继承其他角色的权限:

from typing import List, Dict

class Permission:
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

class Role:
    def __init__(self, name: str, permissions: List[Permission] = None, parent_roles: List['Role'] = None):
        self.name = name
        self.permissions = permissions or []
        self.parent_roles = parent_roles or []
    
    def add_permission(self, permission: Permission):
        if permission not in self.permissions:
            self.permissions.append(permission)
    
    def remove_permission(self, permission: Permission):
        if permission in self.permissions:
            self.permissions.remove(permission)
    
    def add_parent_role(self, role: 'Role'):
        if role not in self.parent_roles:
            self.parent_roles.append(role)
    
    def get_all_permissions(self) -> List[Permission]:
        """获取角色及其父角色的所有权限"""
        all_permissions = set(self.permissions)
        for parent_role in self.parent_roles:
            parent_permissions = parent_role.get_all_permissions()
            all_permissions.update(parent_permissions)
        return list(all_permissions)

class User:
    def __init__(self, username: str, roles: List[Role] = None):
        self.username = username
        self.roles = roles or []
    
    def add_role(self, role: Role):
        if role not in self.roles:
            self.roles.append(role)
    
    def remove_role(self, role: Role):
        if role in self.roles:
            self.roles.remove(role)
    
    def get_all_permissions(self) -> List[Permission]:
        """获取用户所有角色的权限"""
        all_permissions = set()
        for role in self.roles:
            role_permissions = role.get_all_permissions()
            all_permissions.update(role_permissions)
        return list(all_permissions)
    
    def has_permission(self, permission_name: str) -> bool:
        """检查用户是否拥有指定权限"""
        all_permissions = self.get_all_permissions()
        return any(perm.name == permission_name for perm in all_permissions)

# 示例使用
# 创建权限
read_perm = Permission("read", "Read access")
write_perm = Permission("write", "Write access")
admin_perm = Permission("admin", "Admin access")

# 创建角色
user_role = Role("user", [read_perm])
editor_role = Role("editor", [write_perm], [user_role])  # 继承 user 角色的权限
admin_role = Role("admin", [admin_perm], [editor_role])  # 继承 editor 角色的权限

# 创建用户
user = User("johndoe", [editor_role])

# 检查权限
print(user.has_permission("read"))  # True,从 user_role 继承
print(user.has_permission("write"))  # True,从 editor_role 直接获取
print(user.has_permission("admin"))  # False,没有 admin 角色

5.6 类图与架构设计

5.7 错误处理与异常机制

from fastapi import HTTPException, status
from passlib.context import CryptContext

# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 模拟数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # secret
        "disabled": False,
        "permissions": ["read", "write"],
    }
}

# 数据模型
class User:
    def __init__(self, username, hashed_password, disabled=False, permissions=None):
        self.username = username
        self.hashed_password = hashed_password
        self.disabled = disabled
        self.permissions = permissions or []

# 获取用户
def get_user(username: str):
    if username in fake_users_db:
        user_dict = fake_users_db[username]
        return User(
            username=user_dict["username"],
            hashed_password=user_dict["hashed_password"],
            disabled=user_dict["disabled"],
            permissions=user_dict.get("permissions", [])
        )
    return None

# 密码验证
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

# 认证异常
def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not verify_password(password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

# 权限异常
def check_permission(user: User, permission: str):
    if not hasattr(user, "permissions"):
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="User object missing permissions attribute",
        )
    if permission not in user.permissions:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
        )

6. 高级认证场景

6.1 多因素认证(MFA)实现

多因素认证结合了多种认证因素,提高安全性:

  • 知识因素:密码、PIN 码
  • 拥有因素:手机、安全令牌
  • 生物因素:指纹、面部识别
from pyotp import TOTP

# 生成 TOTP 密钥
def generate_totp_secret():
    return TOTP.random_base32()

# 验证 TOTP 码
def verify_totp(secret: str, code: str):
    totp = TOTP(secret)
    return totp.verify(code)

# MFA 认证流程
@app.post("/mfa/verify")
async def verify_mfa(code: str, current_user: User = Depends(get_current_user)):
    # 确保用户对象有totp_secret属性
    if not hasattr(current_user, "totp_secret"):
        raise HTTPException(status_code=400, detail="User has not set up MFA")
    if not verify_totp(current_user.totp_secret, code):
        raise HTTPException(status_code=401, detail="Invalid MFA code")
    # 生成带有 MFA 标记的令牌
    access_token = create_access_token(
        data={"sub": current_user.username, "mfa_verified": True}
    )
    return {"access_token": access_token, "token_type": "bearer"}

6.2 API 密钥认证

API 密钥认证适用于服务间通信:

from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")

async def get_api_key(api_key: str = Depends(api_key_header)):
    if api_key != API_KEY:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
        )
    return api_key

@app.get("/api/data")
async def get_api_data(api_key: str = Depends(get_api_key)):
    return {"data": "Protected data"}

6.3 Session 认证

Session 认证使用服务器端存储的会话。FastAPI 本身不直接提供 Session 功能,需要使用 Starlette 的会话中间件:

from fastapi import FastAPI, Depends, HTTPException
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request

app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="your-secret-key")

# 自定义 get_session 依赖
async def get_session(request: Request):
    return request.session

@app.post("/login")
async def login(username: str, password: str, session: dict = Depends(get_session)):
    # 验证用户
    user = authenticate_user(username, password)
    if not user:
        raise HTTPException(status_code=401, detail="Incorrect username or password")
    # 确保用户对象有id属性
    if not hasattr(user, "id"):
        # 如果用户对象没有id属性,使用username作为唯一标识
        session["user_id"] = user.username
    else:
        session["user_id"] = user.id
    return {"message": "Login successful"}

@app.get("/protected")
async def protected(session: dict = Depends(get_session)):
    if "user_id" not in session:
        raise HTTPException(status_code=401, detail="Not authenticated")
    return {"message": "Protected resource"}

6.4 联合认证(SSO)集成

联合认证允许用户使用一个账号登录多个系统:

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2AuthorizationCodeBearer

app = FastAPI()

# 配置 SSO
sso_scheme = OAuth2AuthorizationCodeBearer(
    authorizationUrl="https://sso.example.com/auth",
    tokenUrl="https://sso.example.com/token",
)

async def verify_sso_token(token: str):
    # 验证 SSO 令牌
    # 这里应该调用 SSO 提供商的验证接口
    # 示例实现,实际应根据具体 SSO 提供商进行调整
    import httpx
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://sso.example.com/userinfo",
                headers={"Authorization": f"Bearer {token}"}
            )
            if response.status_code == 200:
                return response.json()
            return None
    except Exception:
        return None

async def get_current_user(token: str = Depends(sso_scheme)):
    # 验证 SSO 令牌
    user_info = await verify_sso_token(token)
    if not user_info:
        raise HTTPException(status_code=401, detail="Invalid SSO token")
    return user_info

@app.get("/profile")
async def get_profile(current_user: dict = Depends(get_current_user)):
    return current_user

6.5 JWT 令牌撤销机制

JWT 令牌默认是无状态的,一旦颁发就无法直接撤销。实现令牌撤销机制的常用方法:

import redis
from datetime import datetime, timedelta
from jose import JWTError, jwt
from pydantic import BaseModel
from typing import Optional
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer

# 数据模型
class TokenData(BaseModel):
    username: Optional[str] = None

# Redis 连接
redis_client = redis.Redis(host="localhost", port=6379, db=0)

# 配置
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# OAuth2 密码流
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

# 生成 JWT 令牌
def create_access_token(data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# 撤销令牌(添加到黑名单)
def revoke_token(token: str, expires_at: datetime):
    # 计算令牌剩余有效期
    remaining_seconds = int((expires_at - datetime.utcnow()).total_seconds())
    if remaining_seconds > 0:
        # 将令牌添加到 Redis 黑名单,过期时间设置为令牌剩余有效期
        redis_client.setex(f"blacklist:{token}", remaining_seconds, "1")

# 检查令牌是否已撤销
def is_token_revoked(token: str):
    return redis_client.exists(f"blacklist:{token}")

# 验证令牌
def verify_token(token: str, credentials_exception):
    # 检查令牌是否已撤销
    if is_token_revoked(token):
        raise credentials_exception
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    return token_data

# 登出接口
@app.post("/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    # 解码令牌获取过期时间
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        exp = payload.get("exp")
        if exp:
            expires_at = datetime.fromtimestamp(exp)
            revoke_token(token, expires_at)
            return {"message": "Successfully logged out"}
        else:
            raise HTTPException(status_code=400, detail="Invalid token")
    except JWTError:
        raise HTTPException(status_code=400, detail="Invalid token")

6.7 认证方案组合使用

在实际项目中,常常需要组合使用多种认证方案,以满足不同场景的需求:

6.7.1 API 网关 + 微服务认证
# API 网关层面的认证
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

# 网关认证
gateway_auth = OAuth2PasswordBearer(tokenUrl="/gateway/token")

# 微服务认证
microservice_auth = OAuth2PasswordBearer(tokenUrl="/microservice/token")

# 组合认证
def get_current_user(
    gateway_token: str = Depends(gateway_auth),
    microservice_token: str = Depends(microservice_auth)
):
    # 验证网关令牌
    # 验证微服务令牌
    return {"gateway_token": gateway_token, "microservice_token": microservice_token}

@app.get("/protected")
async def protected(current_user = Depends(get_current_user)):
    return {"message": "Protected resource", "user": current_user}
6.7.2 内部系统 + 外部用户认证
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security import HTTPBasic, HTTPBearer

app = FastAPI()

# 内部系统认证
basic_auth = HTTPBasic()

# 外部用户认证
bearer_auth = HTTPBearer()

# 组合认证
async def get_user(
    basic_creds = Security(basic_auth, auto_error=False),
    bearer_creds = Security(bearer_auth, auto_error=False)
):
    # 优先使用内部系统认证
    if basic_creds and basic_creds.username == "admin" and basic_creds.password == "secret":
        return {"type": "internal", "username": basic_creds.username}
    # 其次使用外部用户认证
    elif bearer_creds:
        # 验证 bearer 令牌
        return {"type": "external", "token": bearer_creds.credentials}
    else:
        # 认证失败
        raise HTTPException(
            status_code=401,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Basic, Bearer"},
        )

@app.get("/api")
async def api(user = Depends(get_user)):
    return {"message": "API accessed", "user": user}

6.8 前后端认证交互流程

7. 开发与生产最佳实践

7.1 密码存储安全最佳实践

  • 使用 bcrypt 等强哈希算法
  • 添加随机盐值
  • 定期提醒用户更改密码
  • 实现密码强度检查

7.2 Token 泄露与防护

  • 使用 HTTPS 传输令牌
  • 设置合理的令牌过期时间
  • 实现令牌刷新机制
  • 监控异常令牌使用

7.3 CORS 配置与安全参数

from fastapi.middleware.cors import CORSMiddleware
import os

# 根据环境变量配置 CORS
if os.getenv("ENVIRONMENT") == "production":
    # 生产环境配置
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["https://example.com", "https://www.example.com"],  # 生产环境应设置具体域名
        allow_credentials=True,
        allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],  # 只允许必要的方法
        allow_headers=["Authorization", "Content-Type"],  # 只允许必要的头部
    )
else:
    # 开发环境配置
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],  # 开发环境可以使用通配符
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

7.4 速率限制与防暴力破解

  • 实现登录尝试限制
  • 使用验证码防止自动化攻击
  • 监控异常登录行为

7.5 日志记录与审计

  • 记录认证和授权事件
  • 监控异常访问模式
  • 定期审计权限配置

7.6 会话管理与安全

  • 设置合理的会话过期时间
  • 实现会话失效机制
  • 防止会话固定攻击

7.7 密钥管理与轮换

  • 使用环境变量存储密钥
  • 定期轮换密钥
  • 使用密钥管理服务

7.8 证书配置与 HTTPS

  • 使用有效的 SSL/TLS 证书
  • 配置安全的 TLS 版本和密码套件
  • 实现 HSTS 头部

7.9 安全头部设置

from fastapi import FastAPI, Response

@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    return response

7.10 监控与告警

  • 监控认证失败率
  • 监控异常访问模式
  • 实现安全事件告警

7.11 部署与扩展策略

  • 使用负载均衡器
  • 实现水平扩展
  • 配置健康检查

7.12 性能优化与缓存

  • 缓存用户信息和权限
  • 使用 Redis 存储会话和令牌
  • 优化数据库查询

8. 实际项目案例分析

8.1 企业级应用认证架构

企业级应用通常需要复杂的认证架构:

  • 单点登录(SSO)系统
  • 多因素认证
  • 权限管理系统
  • 审计日志

8.2 微服务架构中的认证方案

微服务架构中的认证方案:

  • 集中式认证服务
  • JWT 令牌在服务间传递
  • 服务间通信使用 API 密钥

8.3 前后端分离项目的认证实现

前后端分离项目的认证实现:

  • 前端存储 JWT 令牌
  • 拦截器自动添加认证头
  • 处理令牌过期和刷新

8.4 常见架构模式与最佳实践

  • 认证服务模式:独立的认证服务
  • API 网关模式:网关处理认证
  • 服务网格模式:服务间认证

8.5 性能与安全的平衡策略

  • 令牌缓存
  • 会话管理
  • 安全与性能的权衡

8.6 不同部署环境下的认证性能对比

不同认证方案在不同部署环境下的性能表现:

认证方案 容器环境 云服务环境 本地开发环境 高并发场景
JWT
OAuth2
API Key
Session
Basic Auth

部署环境优化建议:

  • 容器环境:推荐使用 JWT 或 API Key 认证,避免使用 Session
  • 云服务环境:使用 JWT 认证,配合云服务的密钥管理服务
  • 本地开发环境:可以使用任何认证方案,根据开发需求选择
  • 高并发场景:推荐使用 JWT 或 API Key 认证,配合缓存机制

8.7 实战案例:企业级认证系统

8.7.1 系统架构
flowchart TD
    A[客户端] --> B[API 网关]
    B --> C[认证服务]
    C --> D[用户数据库]
    C --> E[令牌存储]
    B --> F[业务服务]
    F --> C
8.7.2 实现要点
  1. 集中式认证服务:独立的认证服务,处理所有认证请求
  2. 多因素认证:支持密码 + 短信/邮件验证码 + TOTP
  3. 令牌管理:使用 Redis 存储令牌,支持令牌过期和刷新
  4. 权限管理:基于 RBAC 的权限系统,支持细粒度权限控制
  5. 审计日志:记录所有认证和授权事件
8.7.3 代码示例
# 认证服务核心代码
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from pydantic import BaseModel
from typing import Optional
import redis

app = FastAPI()

# Redis 连接
redis_client = redis.Redis(host="localhost", port=6379, db=0)

# 配置
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2 密码流
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

# 数据模型
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

# 模拟数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "<!--MATH_PH_1-->12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # secret
        "disabled": False,
    }
}

# 密码验证
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

# 获取用户
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None

# 验证用户
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

# 创建访问令牌
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    # 存储令牌到 Redis
    redis_client.setex(f"token:{encoded_jwt}", int(expires_delta.total_seconds()) if expires_delta else 900, "1")
    return encoded_jwt

# 验证令牌
def verify_token(token: str, credentials_exception):
    # 检查令牌是否在 Redis 中
    if not redis_client.exists(f"token:{token}"):
        raise credentials_exception
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    return token_data

# 获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    token_data = verify_token(token, credentials_exception)
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

# 路由
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.post("/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    # 从 Redis 中删除令牌
    redis_client.delete(f"token:{token}")
    return {"message": "Successfully logged out"}
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐