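Introduction

In the previous article, "FastAPI Advanced" (《FastAPI进阶》), we looked closely at the dependency injection system, asynchronous database access, middleware development and testing strategies. With that knowledge you can already build a complete, well-structured web API.

Real production applications demand much more, though. Once you face high concurrency, complex business logic or a microservice architecture, new challenges appear one after another:

  • How do you run long-running tasks without blocking the request/response cycle?
  • How can services communicate efficiently over RPC?
  • How do you build a highly available distributed task queue?
  • How should connection pools and caching be tuned under high concurrency?
  • How do you implement fine-grained access control and audit logging?

This article works through these advanced topics and takes you from "able to build an API" to "able to build a production-grade microservice system". We will focus on:

  1. Asynchronous task processing with Celery - handling long-running work
  2. gRPC for inter-service communication - building an efficient microservice architecture
  3. Distributed locks and caching strategies - resolving concurrency races
  4. WebSocket real-time communication - pushing data to clients in real time
  5. API versioning and compatibility - evolving the system smoothly

These topics matter because they are essential for building enterprise-grade, highly available, high-performance systems, and they are the skills that separate intermediate developers from senior architects.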

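I. Asynchronous Task Processing and Celery Integration

Concepts

In web development, some operations take a long time, for example:

  • Sending email (the SMTP handshake plus the transfer takes several seconds)
  • Generating PDF reports (can take tens of seconds)
  • Exporting large data sets (can take several minutes)
  • Image processing/transcoding (CPU-intensive)

Running these tasks synchronously inside the HTTP request leads to:

  1. A poor user experience (long waits for a response)
  2. Server resources (workers, connections) being tied up for a long time
  3. Requests that may time out and fail

Celery is a powerful distributed task queue. Combined with FastAPI, it provides:

  • Asynchronous execution: respond immediately and process the task in the background
  • Scheduling: periodic and delayed tasks
  • Retries: failed tasks are retried automatically
  • Monitoring: inspect task state and progress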
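The example tasks below retry with a fixed `countdown`. A common refinement is exponential backoff: wait twice as long after each failed attempt, up to a cap, optionally with random "jitter" so that many failing tasks do not retry in lockstep. Celery has built-in task options for this (`autoretry_for`, `retry_backoff`, `retry_backoff_max`, `retry_jitter`).

The following is a minimal standalone sketch of the idea, not Celery's internal code. The function name `backoff_countdown` and its defaults (60 s base, 600 s cap) are assumptions chosen to match the 60-second delay used later.

```python
import random
from typing import Optional


def backoff_countdown(retries: int, factor: int = 60, maximum: int = 600,
                      jitter: bool = False, rng: Optional[random.Random] = None) -> int:
    """Seconds to wait before the next retry.

    retries is the number of retries already made (0 for the first retry);
    the delay is factor * 2**retries, capped at maximum.
    """
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        # "Full jitter": choose uniformly in [0, countdown] to spread retries out
        rng = rng or random.Random()
        countdown = rng.randrange(countdown + 1)
    return countdown


# Inside a bound Celery task one could write (sketch):
#     raise self.retry(exc=exc, countdown=backoff_countdown(self.request.retries))
```

Without jitter the delays for retries 0 to 4 are 60, 120, 240, 480 and then 600 seconds (capped).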

Architecture

┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│   FastAPI   │         │   Redis     │         │   Celery    │
│  (Producer) │────────>│  (Broker)   │────────>│  (Worker)   │
└─────────────┘         └─────────────┘         └─────────────┘
       │                       │                       │
       v                       v                       v
 Returns the task ID      Message queue           Executes tasks
 immediately, answers     Stores results          Accesses resources
 status queries
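To make the Producer → Broker → Worker roles in the diagram concrete, here is a toy in-process analogue that uses only the standard library. `queue.Queue` plays the broker, a background thread plays the worker, and a dict plays the result backend.

This is not how Celery is implemented. Celery uses a real broker such as Redis and separate worker processes. `MiniTaskQueue` and its methods are hypothetical names chosen for illustration.

```python
import queue
import threading
import uuid
from typing import Any, Callable, Dict, Tuple


class MiniTaskQueue:
    """Toy stand-in for broker + worker + result backend (illustration only)."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[Tuple[str, Callable[..., Any], tuple]]" = queue.Queue()  # "broker"
        self._results: Dict[str, Tuple[str, Any]] = {}  # "result backend"
        self._lock = threading.Lock()
        threading.Thread(target=self._run, daemon=True).start()  # "worker"

    def delay(self, func: Callable[..., Any], *args: Any) -> str:
        """Producer side: enqueue the call and return a task ID immediately."""
        task_id = str(uuid.uuid4())
        with self._lock:
            self._results[task_id] = ("PENDING", None)
        self._queue.put((task_id, func, args))
        return task_id

    def status(self, task_id: str) -> Tuple[str, Any]:
        """Like Celery, report PENDING for IDs it knows nothing about."""
        with self._lock:
            return self._results.get(task_id, ("PENDING", None))

    def join(self) -> None:
        """Block until every queued task has been processed."""
        self._queue.join()

    def _run(self) -> None:
        while True:
            task_id, func, args = self._queue.get()
            with self._lock:
                self._results[task_id] = ("STARTED", None)
            try:
                outcome = ("SUCCESS", func(*args))
            except Exception as exc:  # the worker records failures instead of crashing
                outcome = ("FAILURE", repr(exc))
            with self._lock:
                self._results[task_id] = outcome
            self._queue.task_done()
```

`delay()` returns before the work runs. This is exactly the property the FastAPI endpoints rely on when they answer with a task ID and a status URL.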

Full implementation

1. Project structure
fastapi_celery_app/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI application
│   ├── config.py            # Configuration management
│   ├── tasks.py             # Celery task definitions
│   ├── schemas.py           # Pydantic models
│   └── dependencies.py      # Dependency injection
├── worker/
│   └── celery_worker.py     # Celery worker entry point
└── requirements.txt
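2. Configuration management (config.py)
from functools import lru_cache
from urllib.parse import quote
from pydantic import model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application settings (override via environment variables or a .env file)"""
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True)
    # Basic application settings
    APP_NAME: str = "FastAPI + Celery App"
    APP_VERSION: str = "2.0.0"
    # Redis settings (Celery broker)
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379
    REDIS_DB: int = 0
    REDIS_PASSWORD: str = ""
    # Celery settings; if left empty, the URLs are derived from the Redis settings
    CELERY_BROKER_URL: str = ""
    CELERY_RESULT_BACKEND: str = ""
    CELERY_TASK_TRACK_STARTED: bool = True
    CELERY_TASK_TIME_LIMIT: int = 3600  # hard time limit per task: 1 hour
    # File storage settings
    UPLOAD_DIR: str = "./uploads"
    REPORTS_DIR: str = "./reports"
    @model_validator(mode="after")
    def build_celery_urls(self) -> "Settings":
        """Derive the Celery URLs from the Redis settings unless they were set explicitly"""
        # URL-encode the password so characters such as '@' or '/' cannot break the URL;
        # omit the auth part entirely when no password is configured
        auth = f":{quote(self.REDIS_PASSWORD, safe='')}@" if self.REDIS_PASSWORD else ""
        redis_url = f"redis://{auth}{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
        if not self.CELERY_BROKER_URL:
            self.CELERY_BROKER_URL = redis_url
        if not self.CELERY_RESULT_BACKEND:
            self.CELERY_RESULT_BACKEND = redis_url
        return self
@lru_cache()
def get_settings() -> Settings:
    """Return the settings instance (created once per process and cached)"""
    return Settings()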
3. Celery task definitions (tasks.py)
from celery import Celery
from celery.schedules import crontab
from app.config import get_settings
from pathlib import Path
from datetime import datetime
import os
import shutil
import time
# Load settings
settings = get_settings()
# Create the Celery instance
celery_app = Celery(
    "fastapi_tasks",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND
)
# Celery configuration
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',  # crontab schedules below are interpreted in this time zone
    enable_utc=True,
    task_track_started=settings.CELERY_TASK_TRACK_STARTED,  # report STARTED when a worker picks a task up
    task_time_limit=settings.CELERY_TASK_TIME_LIMIT,  # hard limit: the process running the task is killed
    task_soft_time_limit=3300,  # soft limit (55 min): SoftTimeLimitExceeded is raised inside the task
    worker_prefetch_multiplier=4,  # each worker process reserves up to 4 messages ahead (1 suits long tasks)
    worker_max_tasks_per_child=1000,  # replace a pool process after 1000 tasks to contain memory leaks
)
# ==================== Task definitions ====================
@celery_app.task(
    bind=True,  # pass the task instance as `self` (for self.request, update_state, retry)
    max_retries=3,  # retry at most 3 times
    default_retry_delay=60  # default delay of 60 s between retries
)
def send_email_task(self, to_email: str, subject: str, body: str):
    """
    Send an email
    Args:
        self: the task instance (available because bind=True)
        to_email: recipient address
        subject: email subject
        body: email body
    Returns:
        dict: result of the send operation
    """
    try:
        # Simulated sending (use smtplib or a third-party email API in a real project)
        task_id = self.request.id
        print(f"[{task_id}] Sending email to: {to_email}")
        # Report a custom PROGRESS state together with progress metadata
        self.update_state(
            state='PROGRESS',
            meta={'current': 0, 'total': 100, 'status': 'Connecting to the SMTP server...'}
        )
        time.sleep(2)  # simulate connecting
        self.update_state(
            state='PROGRESS',
            meta={'current': 30, 'total': 100, 'status': 'Sending data...'}
        )
        time.sleep(3)  # simulate sending
        self.update_state(
            state='PROGRESS',
            meta={'current': 90, 'total': 100, 'status': 'Finishing...'}
        )
        time.sleep(1)  # simulate finishing
        # The return value is stored in the result backend
        return {
            'status': 'success',
            'message': f'Email sent to {to_email}',
            'task_id': task_id
        }
    except Exception as exc:
        # Log the error and schedule a retry in 60 s; once max_retries is exhausted,
        # Celery re-raises exc and the task ends in the FAILURE state
        print(f"Failed to send email: {exc}")
        raise self.retry(exc=exc, countdown=60)
@celery_app.task(bind=True)
def generate_pdf_report_task(self, report_id: int, data: dict):
    """
    Generate a PDF report
    Args:
        report_id: report ID
        data: report data
    Returns:
        dict: information about the generated report file
    """
    try:
        task_id = self.request.id
        print(f"[{task_id}] Generating report {report_id}")
        # Make sure the output directory exists
        reports_dir = Path(settings.REPORTS_DIR)
        reports_dir.mkdir(parents=True, exist_ok=True)
        # Update task progress
        self.update_state(
            state='PROGRESS',
            meta={'current': 10, 'total': 100, 'status': 'Collecting data...'}
        )
        time.sleep(1)
        # Simulated data processing (query the database in a real project)
        records = data.get('records', [])
        total_records = len(records)
        self.update_state(
            state='PROGRESS',
            meta={'current': 30, 'total': 100, 'status': f'Processing {total_records} records...'}
        )
        # Process each record; the loop body only runs when total_records > 0,
        # so the division below cannot divide by zero
        for i, record in enumerate(records):
            time.sleep(0.1)  # simulate processing time
            records_processed = i + 1
            progress = 30 + int((records_processed / total_records) * 50)
            # Report progress every 10 records
            if records_processed % 10 == 0:
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current': progress,
                        'total': 100,
                        'status': f'Processed {records_processed}/{total_records} records...'
                    }
                )
        # Generate the PDF file (simplified; use reportlab or weasyprint in practice)
        filename = f"report_{report_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
        filepath = reports_dir / filename
        # Simulated PDF generation
        self.update_state(
            state='PROGRESS',
            meta={'current': 85, 'total': 100, 'status': 'Generating PDF...'}
        )
        time.sleep(2)
        # Write placeholder content (plain text, not a real PDF document)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(f"Report ID: {report_id}\n")
            f.write(f"Generated at: {datetime.now()}\n")
            f.write(f"Total records: {total_records}\n")
        self.update_state(
            state='PROGRESS',
            meta={'current': 100, 'total': 100, 'status': 'Done!'}
        )
        return {
            'status': 'success',
            'report_id': report_id,
            'filename': filename,
            'filepath': str(filepath),
            'file_size': os.path.getsize(filepath),
            'task_id': task_id
        }
    except Exception as exc:
        print(f"Report generation failed: {exc}")
        raise self.retry(exc=exc, countdown=120)
@celery_app.task(bind=True)
def process_image_upload_task(self, image_path: str, operations: list):
    """
    Image processing task (CPU-intensive)
    Args:
        image_path: path of the image file
        operations: list of operations, e.g. ['resize_800x600', 'watermark', 'optimize']
    Returns:
        dict: information about the processed image
    """
    try:
        task_id = self.request.id
        print(f"[{task_id}] Processing image: {image_path}")
        self.update_state(
            state='PROGRESS',
            meta={'current': 0, 'total': len(operations), 'status': 'Loading image...'}
        )
        time.sleep(1)
        results = []
        for i, operation in enumerate(operations):
            # Simulated image processing
            print(f"Running operation: {operation}")
            time.sleep(2)
            results.append({
                'operation': operation,
                'status': 'completed',
                'output_path': f"{image_path}.{operation}"
            })
            self.update_state(
                state='PROGRESS',
                meta={
                    'current': i + 1,
                    'total': len(operations),
                    'status': f'Completed: {operation}'
                }
            )
        return {
            'status': 'success',
            'image_path': image_path,
            'operations_performed': results,
            'task_id': task_id
        }
    except Exception as exc:
        print(f"Image processing failed: {exc}")
        raise self.retry(exc=exc, countdown=30)
# ==================== Periodic tasks ====================
@celery_app.task
def cleanup_old_reports():
    """Delete reports older than 30 days"""
    print("Cleaning up old reports...")
    reports_dir = Path(settings.REPORTS_DIR)
    cutoff_time = time.time() - (30 * 24 * 60 * 60)  # 30 days ago
    deleted_count = 0
    for file_path in reports_dir.glob('*.pdf'):
        if file_path.stat().st_mtime < cutoff_time:
            file_path.unlink()
            deleted_count += 1
            print(f"Deleted old report: {file_path.name}")
    return {
        'status': 'success',
        'deleted_count': deleted_count
    }
@celery_app.task
def check_system_status():
    """Periodic health check (placeholder: only reports free disk space for the reports directory)"""
    reports_dir = Path(settings.REPORTS_DIR)
    reports_dir.mkdir(parents=True, exist_ok=True)
    usage = shutil.disk_usage(reports_dir)
    return {
        'status': 'ok',
        'free_disk_mb': usage.free // (1024 * 1024),
        'checked_at': datetime.now().isoformat()
    }
# Periodic schedule; it only runs when a beat process is started:
#   celery -A app.tasks beat --loglevel=info
celery_app.conf.beat_schedule = {
    # Clean up old reports every day at 02:00
    'cleanup-old-reports-daily': {
        'task': cleanup_old_reports.name,  # i.e. 'app.tasks.cleanup_old_reports'
        'schedule': crontab(hour=2, minute=0),
    },
    # Check the system status every 5 minutes
    'check-system-status': {
        'task': check_system_status.name,
        'schedule': crontab(minute='*/5'),
    }
}
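The report task maps record-level progress into the 30-80% window of its overall progress bar. It reports only every 10 records, so with 25 records the last update in that stage shows 20/25.

Here is a small sketch of two helpers that make this explicit and always report the final record. The names `stage_progress` and `should_report` are hypothetical. Integer arithmetic is used instead of the original float expression, which gives the same values for these inputs.

```python
def stage_progress(done: int, total: int, start: int, end: int) -> int:
    """Map `done` out of `total` items onto the percentage window [start, end]."""
    if total <= 0:
        return end  # nothing to process: treat the stage as complete
    done = max(0, min(done, total))  # clamp out-of-range counts
    return start + (done * (end - start)) // total


def should_report(done: int, total: int, every: int = 10) -> bool:
    """Report every `every` items and always on the last one."""
    return done == total or done % every == 0


# Inside the task loop one could write (sketch):
#     if should_report(i + 1, total_records):
#         self.update_state(state='PROGRESS', meta={
#             'current': stage_progress(i + 1, total_records, 30, 80), 'total': 100,
#             'status': f'Processed {i + 1}/{total_records} records...'})
```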
4. FastAPI application (main.py)
from pathlib import Path
from fastapi import FastAPI, HTTPException, status
from fastapi.responses import FileResponse, JSONResponse
from celery.result import AsyncResult
from app.tasks import (
    celery_app,
    send_email_task,
    generate_pdf_report_task,
    process_image_upload_task
)
from app.schemas import (
    EmailRequest,
    ReportRequest,
    ImageProcessingRequest,
    TaskStatusResponse
)
from app.config import get_settings
settings = get_settings()
app = FastAPI(
    title="FastAPI + Celery Async Task Management",
    version=settings.APP_VERSION,
    description="Demonstrates asynchronous task processing with FastAPI and Celery"
)
# ==================== API endpoints ====================
# .delay(), AsyncResult and control.inspect() perform blocking network I/O (Redis, workers),
# so those endpoints are plain `def` functions: FastAPI runs them in a thread pool
# instead of blocking the event loop.
@app.get("/")
async def root():
    """Root endpoint"""
    return {
        "message": "FastAPI + Celery Async Task Management API",
        "version": settings.APP_VERSION
    }
@app.post("/tasks/send-email", response_model=dict)
def send_email(request: EmailRequest):
    """
    Send an email (asynchronous task)
    Returns the task ID immediately; the email is sent in the background
    """
    # Enqueue the Celery task
    task = send_email_task.delay(
        to_email=request.to_email,
        subject=request.subject,
        body=request.body
    )
    return {
        "task_id": task.id,
        "status": "submitted",
        "message": "Email task submitted and being processed in the background",
        "check_status_url": f"/tasks/{task.id}/status"
    }
@app.post("/tasks/generate-report", response_model=dict)
def generate_report(request: ReportRequest):
    """
    Generate a PDF report (asynchronous task)
    Suited to large reports that would otherwise exceed HTTP timeouts
    """
    task = generate_pdf_report_task.delay(
        report_id=request.report_id,
        data=request.data or {}  # data is already a plain dict (or None)
    )
    return {
        "task_id": task.id,
        "status": "submitted",
        "message": "Report generation task submitted",
        "check_status_url": f"/tasks/{task.id}/status"
    }
@app.post("/tasks/process-image", response_model=dict)
def process_image(request: ImageProcessingRequest):
    """
    Process an image (asynchronous task)
    CPU-intensive work is a good fit for background processing
    """
    task = process_image_upload_task.delay(
        image_path=request.image_path,
        operations=request.operations
    )
    return {
        "task_id": task.id,
        "status": "submitted",
        "message": "Image processing task submitted",
        "check_status_url": f"/tasks/{task.id}/status"
    }
@app.get("/tasks/{task_id}/status", response_model=TaskStatusResponse)
def get_task_status(task_id: str):
    """
    Query a task's status
    Clients poll this endpoint to follow the task's progress
    """
    task_result = AsyncResult(task_id, app=celery_app)
    state = task_result.state  # read once: unfinished states are fetched from the backend on each access
    response = {
        "task_id": task_id,
        "status": state,
        "result": None,
        "error": None,
        "progress": None
    }
    if state == 'PENDING':
        # Celery also reports PENDING for task IDs it has never seen
        response["message"] = "Task is waiting in the queue (or the task ID is unknown)"
    elif state == 'STARTED':
        response["message"] = "Task has started"
    elif state == 'PROGRESS':
        # Running: info holds the meta dict passed to update_state
        response["message"] = "Task in progress"
        response["progress"] = task_result.info
    elif state == 'SUCCESS':
        response["message"] = "Task completed"
        response["result"] = task_result.result
    elif state == 'FAILURE':
        response["message"] = "Task failed"
        response["error"] = str(task_result.info)
    else:
        # RETRY, REVOKED or any other state
        response["message"] = f"Task state: {state}"
    return response
@app.get("/tasks/{task_id}/result")
def get_task_result(task_id: str):
    """
    Get a task's final result
    Only returns a result once the task has succeeded
    """
    task_result = AsyncResult(task_id, app=celery_app)
    state = task_result.state
    if state in ('PENDING', 'STARTED', 'PROGRESS', 'RETRY'):
        # Not finished yet: answer 202 Accepted and point the client to the status endpoint
        return JSONResponse(
            status_code=status.HTTP_202_ACCEPTED,
            content={"task_id": task_id, "status": state, "detail": "Task has not finished yet"},
            headers={"Location": f"/tasks/{task_id}/status"}
        )
    if state == 'SUCCESS':
        return {
            "task_id": task_id,
            "status": "success",
            "result": task_result.result
        }
    if state == 'FAILURE':
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Task failed: {task_result.info}"
        )
    # REVOKED or another final state without a result
    raise HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail=f"Task ended in state {state}; no result is available"
    )
@app.get("/reports/download/{filename}")
def download_report(filename: str):
    """
    Download a generated report file
    Args:
        filename: report file name
    """
    reports_dir = Path(settings.REPORTS_DIR).resolve()
    filepath = (reports_dir / filename).resolve()
    # Reject path traversal (e.g. "..\\secret.pdf"): the file must sit directly in REPORTS_DIR
    if filepath.parent != reports_dir:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid file name"
        )
    # Check the file type
    if filepath.suffix != '.pdf':
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only PDF files can be downloaded"
        )
    # Check that the file exists
    if not filepath.is_file():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Report file not found"
        )
    return FileResponse(
        path=filepath,
        filename=filepath.name,
        media_type='application/pdf'
    )
@app.delete("/tasks/{task_id}")
def cancel_task(task_id: str):
    """
    Cancel a task
    Only tasks that are still queued or running can be cancelled
    """
    task_result = AsyncResult(task_id, app=celery_app)
    state = task_result.state
    if state in ['PENDING', 'STARTED', 'PROGRESS', 'RETRY']:
        # Revoke the task: a queued task is skipped; terminate=True also kills the process
        # that is currently executing it (a last-resort operation)
        celery_app.control.revoke(task_id, terminate=True)
        return {
            "task_id": task_id,
            "status": "cancelled",
            "message": "Task cancelled"
        }
    elif state == 'SUCCESS':
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Task has already completed and cannot be cancelled"
        )
    else:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot cancel a task in state {state}"
        )
@app.get("/workers/stats")
def get_workers_stats():
    """
    Get worker statistics
    Each inspect call broadcasts to the workers and waits up to the inspect timeout (1 s by default)
    """
    inspect = celery_app.control.inspect()
    # Tasks registered on each worker (None if no worker replied)
    registered_tasks = inspect.registered()
    # Tasks currently executing
    active_tasks = inspect.active()
    # Tasks scheduled with an ETA/countdown
    scheduled_tasks = inspect.scheduled()
    return {
        "workers_count": len(registered_tasks) if registered_tasks else 0,
        "registered_tasks": registered_tasks,
        "active_tasks": active_tasks,
        "scheduled_tasks": scheduled_tasks
    }
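On the client side, the usual pattern is to call `GET /tasks/{task_id}/status` repeatedly until the task reaches a final state. In Celery, the final states are `SUCCESS`, `FAILURE` and `REVOKED`.

Below is a minimal polling sketch with a growing interval, so that many clients do not hammer the API. The function `poll_until_done` is hypothetical. The HTTP call is injected as `fetch_status`; in practice it could be something like `lambda: httpx.get(status_url).json()`, where `status_url` is an assumption. `sleep` is injectable so the logic can be tested without waiting.

```python
import time
from typing import Any, Callable, Dict

# Celery states after which a task will not change any more
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_until_done(
    fetch_status: Callable[[], Dict[str, Any]],
    interval: float = 0.5,
    max_interval: float = 5.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Call fetch_status() until its "status" is terminal, doubling the wait each round."""
    deadline = time.monotonic() + timeout
    while True:
        payload = fetch_status()
        if payload.get("status") in TERMINAL_STATES:
            return payload
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"task still {payload.get('status')} after {timeout}s")
        sleep(interval)
        interval = min(interval * 2, max_interval)  # back off to reduce load on the API
```

For long-running tasks, pushing updates over WebSocket (as in Scenario 2 below) avoids polling altogether. Polling remains the simplest option that works through any HTTP client.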
5. Data models (schemas.py)
# EmailStr needs the email-validator package: pip install "pydantic[email]"
from pydantic import BaseModel, EmailStr, Field
from typing import List, Dict, Any, Optional
class EmailRequest(BaseModel):
    """Email sending request"""
    to_email: EmailStr = Field(..., description="Recipient email address")
    subject: str = Field(..., min_length=1, max_length=200, description="Email subject")
    body: str = Field(..., min_length=1, description="Email body")
    class Config:
        json_schema_extra = {
            "example": {
                "to_email": "user@example.com",
                "subject": "System notification",
                "body": "Hello, this is a test email."
            }
        }
class ReportRequest(BaseModel):
    """Report generation request"""
    report_id: int = Field(..., description="Report ID")
    data: Optional[Dict[str, Any]] = Field(default=None, description="Report data")
    class Config:
        json_schema_extra = {
            "example": {
                "report_id": 12345,
                "data": {
                    "title": "Monthly sales report",
                    "records": [{"name": "Product A", "sales": 1000}]
                }
            }
        }
class ImageProcessingRequest(BaseModel):
    """Image processing request"""
    image_path: str = Field(..., description="Path of the image file")
    operations: List[str] = Field(
        ...,
        description="List of operations, e.g. ['resize_800x600', 'watermark']"
    )
    class Config:
        json_schema_extra = {
            "example": {
                "image_path": "/uploads/image.jpg",
                "operations": ["resize_800x600", "watermark", "optimize"]
            }
        }
class TaskStatusResponse(BaseModel):
    """Task status response"""
    task_id: str
    status: str
    message: Optional[str] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    progress: Optional[Dict[str, Any]] = None
class ProgressInfo(BaseModel):
    """Progress information"""
    current: int
    total: int
    status: str
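6. Starting the Celery worker (worker/celery_worker.py)
# Equivalent CLI, run from the project root:
#   celery -A app.tasks worker --loglevel=info --concurrency=4
from app.tasks import celery_app
# Start the Celery worker; run from the project root as `python -m worker.celery_worker`
# so that the `app` package is importable
if __name__ == '__main__':
    celery_app.start(
        argv=[
            'worker',
            '--loglevel=info',
            '--pool=prefork',  # default multi-process pool; use 'solo' on Windows or for debugging (it ignores --concurrency)
            '--concurrency=4',  # number of worker child processes
        ]
    )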

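Practical scenarios

Scenario 1: E-commerce order processing

Requirement: after a user places an order, the system must:

  1. Deduct inventory
  2. Create the order record
  3. Send a confirmation email
  4. Generate an electronic invoice
  5. Notify the logistics system

Problem with synchronous processing: the whole flow can take 10-30 seconds, which makes for a poor user experience.

Celery solution

@app.post("/orders")
async def create_order(order_data: OrderCreate):
    # OrderCreate, create_order_record and the four tasks are placeholders assumed to be defined elsewhere
    # Synchronous: deduct inventory and create the order record quickly (<100 ms);
    # inventory must be reserved before responding, otherwise items could be oversold
    order = await create_order_record(order_data)
    # Asynchronous: hand the slow steps to Celery workers
    process_order_payment.delay(order.id)  # payment processing
    send_order_confirmation_email.delay(order.id)  # confirmation email
    generate_invoice.delay(order.id)  # electronic invoice
    notify_logistics.delay(order.id)  # notify the logistics system
    return {"order_id": order.id, "status": "created"}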
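Scenario 2: Data analysis reports

Requirement: operations staff need a monthly sales report that includes:

  • Sales statistics
  • Trend charts
  • PDF export

Celery solution

  • Return immediately after the task is submitted
  • Push progress to the browser over WebSocket
  • Provide a download link once the report is ready
  • Support cancelling and retrying the task

Scenario 3: Batch data processing

Requirement: bulk-import 100,000 product records, which involves:

  • Data validation
  • Uploading images to OSS (object storage)
  • Generating thumbnails
  • Updating database indexes

Celery solution

  • Process batches in parallel with a Celery group
  • Each task handles 1,000 records
  • Report progress in real time
  • Retry failed records individually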
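Splitting 100,000 records into tasks of 1,000 records each means cutting the input into consecutive batches before dispatching them. Here is a minimal standard-library sketch of that batching step.

`chunked` is a hypothetical helper. The Celery dispatch is shown only as a comment: `celery.group` and the `.s()` signature method are real Celery APIs, but `import_batch` is an assumed task name.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items (works with generators too)."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Dispatching the batches in parallel with Celery (sketch; import_batch is hypothetical):
#     from celery import group
#     job = group(import_batch.s(batch) for batch in chunked(rows, 1000))
#     result = job.apply_async()
```

Because `chunked` consumes its input lazily, the 100,000 rows can come from a streaming reader such as a CSV cursor without being loaded into memory at once.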

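II. gRPC Inter-Service Communication

Concepts

In a microservice architecture, services have to talk to each other. Traditional REST APIs have some drawbacks for this:

  • Performance overhead: JSON is verbose and comparatively slow to serialize and parse, and HTTP/1.1 headers add per-request overhead
  • Payload size: binary data must be Base64-encoded, which makes it about 33% larger
  • Type safety: the contract lives in documentation, which easily drifts out of sync with the code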
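The "about 33%" figure comes from how Base64 works: every 3 input bytes become 4 output characters, with `=` padding on the last group. Here is a quick standard-library check that compares the encoder's actual output length with that formula. The helper names are illustrative.

```python
import base64


def base64_len(raw_len: int) -> int:
    """Length of the Base64 text for raw_len bytes, measured with the stdlib encoder."""
    return len(base64.b64encode(bytes(raw_len)))


def base64_len_formula(raw_len: int) -> int:
    """Same value from the formula: 4 characters per started group of 3 bytes."""
    return 4 * ((raw_len + 2) // 3)
```

For example, a 3,000-byte image becomes 4,000 characters before it is even wrapped in JSON quotes. Protocol Buffers' `bytes` fields carry the raw bytes instead.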

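gRPC is a high-performance RPC framework open-sourced by Google. Compared with REST, it offers:

  Feature              REST API                       gRPC
  Protocol             Usually HTTP/1.1               HTTP/2
  Data format          JSON                           Protocol Buffers
  Serialization        Moderate (text)                High (binary)
  Type safety          Weak                           Strong (defined in .proto files)
  Streaming            No native support              Supported (including bidirectional)
  Code generation      Not built in                   Generated for many languages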

Architecture

┌─────────────────┐         ┌─────────────────┐
│   FastAPI       │         │   FastAPI       │
│  (Gateway)      │         │  (Service B)    │
│                 │         │                 │
│  ┌───────────┐  │         │  ┌───────────┐  │
│  │  gRPC     │  │<------->│  │  gRPC     │  │
│  │  Client   │  │         │  │  Server   │  │
│  └───────────┘  │         │  └───────────┘  │
└─────────────────┘         └─────────────────┘
        │                           │
        v                           v
  HTTP requests                  Database

Full implementation

1. Protocol Buffers definition (user_service.proto)
syntax = "proto3";
package user_service;
// User service definition
service UserService {
  // Get a single user
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  // Get several users in one call
  rpc GetUsers(GetUsersRequest) returns (GetUsersResponse);
  // Create a user
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
  // Update a user
  rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
  // Stream the user list (suited to large result sets)
  rpc StreamUsers(StreamUsersRequest) returns (stream User);
}
// User data model
message User {
  int32 id = 1;
  string username = 2;
  string email = 3;
  string full_name = 4;
  bool is_active = 5;
  string created_at = 6;
}
// Request and response messages
message GetUserRequest {
  int32 user_id = 1;
}
message GetUserResponse {
  User user = 1;
}
message GetUsersRequest {
  repeated int32 user_ids = 1;
}
message GetUsersResponse {
  repeated User users = 1;
}
message CreateUserRequest {
  string username = 1;
  string email = 2;
  string password = 3;
  string full_name = 4;
}
message CreateUserResponse {
  User user = 1;
}
message UpdateUserRequest {
  int32 user_id = 1;
  string email = 2;
  string full_name = 3;
}
message UpdateUserResponse {
  User user = 1;
}
message StreamUsersRequest {
  int32 limit = 1;
  int32 offset = 2;
}
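To see why the "High (binary)" serialization claim holds, it helps to look at how the `User` message above is laid out on the wire. Each field is written as a tag followed by a payload. The tag is `(field_number << 3) | wire_type`, encoded as a base-128 varint. Integers and bools use wire type 0 (varint). Strings use wire type 2 (length-delimited). In proto3, fields holding their default value are not written at all.

The hand-written encoder below is for illustration only. Real code should call the generated `SerializeToString()`. The helper names are hypothetical, and only the field types used by `User` are handled.

```python
import json
from typing import Any, Dict, Union


def encode_varint(value: int) -> bytes:
    """Base-128 varint: 7 bits per byte, low bits first, high bit = 'more bytes follow'."""
    if value < 0:
        value += 1 << 64  # negative int32/int64 are encoded as 64-bit two's complement
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def encode_field(field_number: int, value: Union[int, bool, str]) -> bytes:
    """Encode one field as tag + payload."""
    if isinstance(value, int):  # also covers bool, which is a subclass of int
        return encode_varint(field_number << 3 | 0) + encode_varint(int(value))
    data = value.encode("utf-8")
    return encode_varint(field_number << 3 | 2) + encode_varint(len(data)) + data


# Field numbers taken from `message User` in user_service.proto
USER_FIELDS = [(1, "id"), (2, "username"), (3, "email"),
               (4, "full_name"), (5, "is_active"), (6, "created_at")]


def encode_user(user: Dict[str, Any]) -> bytes:
    out = bytearray()
    for number, name in USER_FIELDS:
        value = user.get(name)
        if not value:  # 0, "", False or missing: proto3 omits default values
            continue
        out += encode_field(number, value)
    return bytes(out)


def compare_sizes(user: Dict[str, Any]) -> Dict[str, int]:
    """Byte size of the same user as protobuf wire format versus compact-ish JSON."""
    return {"protobuf": len(encode_user(user)),
            "json": len(json.dumps(user).encode("utf-8"))}
```

Most of the saving comes from field names: JSON repeats every key as text in every message, while the wire format stores only a one-byte tag per field.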
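2. Generate the Python code
# Install dependencies
pip install grpcio grpcio-tools
# Generate user_service_pb2.py (messages) and user_service_pb2_grpc.py (stubs and servicer)
python -m grpc_tools.protoc \
  -I. \
  --python_out=. \
  --grpc_python_out=. \
  user_service.proto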
3. gRPC server implementation (grpc_server.py)
import threading
import time
from concurrent import futures
from datetime import datetime
from typing import Dict
import grpc
import user_service_pb2
import user_service_pb2_grpc
class UserServiceImpl(user_service_pb2_grpc.UserServiceServicer):
    """User service implementation"""
    def __init__(self):
        # In-memory stand-in for a database (use a real database in practice)
        self.users_db: Dict[int, user_service_pb2.User] = {}
        # grpc.server runs handlers on a thread pool, so writes are serialized with a lock
        self._lock = threading.Lock()
        self._init_sample_data()
    def _init_sample_data(self):
        """Populate sample data"""
        users = [
            (1, "alice", "alice@example.com", "Alice Smith", True),
            (2, "bob", "bob@example.com", "Bob Johnson", True),
            (3, "charlie", "charlie@example.com", "Charlie Brown", False),
        ]
        for user_id, username, email, full_name, is_active in users:
            self.users_db[user_id] = user_service_pb2.User(
                id=user_id,
                username=username,
                email=email,
                full_name=full_name,
                is_active=is_active,
                created_at=datetime.now().isoformat()
            )
    def GetUser(self, request, context):
        """
        Get a single user
        Args:
            request: GetUserRequest
            context: gRPC context
        Returns:
            GetUserResponse
        """
        print(f"GetUser request: user_id={request.user_id}")
        user = self.users_db.get(request.user_id)
        if user is None:
            # Unknown user: set a NOT_FOUND status, which the client receives as grpc.RpcError
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {request.user_id} not found")
            return user_service_pb2.GetUserResponse()
        return user_service_pb2.GetUserResponse(user=user)
    def GetUsers(self, request, context):
        """
        Get several users in one call (unknown IDs are skipped)
        Args:
            request: GetUsersRequest
            context: gRPC context
        Returns:
            GetUsersResponse
        """
        print(f"GetUsers request: user_ids={list(request.user_ids)}")
        users = []
        for user_id in request.user_ids:
            user = self.users_db.get(user_id)
            if user is not None:
                users.append(user)
        return user_service_pb2.GetUsersResponse(users=users)
    def CreateUser(self, request, context):
        """
        Create a user
        Args:
            request: CreateUserRequest
            context: gRPC context
        Returns:
            CreateUserResponse
        """
        print(f"CreateUser request: username={request.username}")
        # Note: this demo does not store the password; a real service must persist only a salted hash
        with self._lock:
            # Reject duplicate usernames so the client can report a conflict
            if any(u.username == request.username for u in self.users_db.values()):
                context.set_code(grpc.StatusCode.ALREADY_EXISTS)
                context.set_details(f"User {request.username} already exists")
                return user_service_pb2.CreateUserResponse()
            # Generate a new user ID (simplified)
            new_id = max(self.users_db.keys(), default=0) + 1
            user = user_service_pb2.User(
                id=new_id,
                username=request.username,
                email=request.email,
                full_name=request.full_name,
                is_active=True,
                created_at=datetime.now().isoformat()
            )
            self.users_db[new_id] = user
        return user_service_pb2.CreateUserResponse(user=user)
    def UpdateUser(self, request, context):
        """
        Update a user
        Args:
            request: UpdateUserRequest
            context: gRPC context
        Returns:
            UpdateUserResponse
        """
        print(f"UpdateUser request: user_id={request.user_id}")
        with self._lock:
            user = self.users_db.get(request.user_id)
            if user is None:
                context.set_code(grpc.StatusCode.NOT_FOUND)
                context.set_details(f"User {request.user_id} not found")
                return user_service_pb2.UpdateUserResponse()
            # Update only the fields that were provided (an empty string means "unchanged")
            if request.email:
                user.email = request.email
            if request.full_name:
                user.full_name = request.full_name
        return user_service_pb2.UpdateUserResponse(user=user)
    def StreamUsers(self, request, context):
        """
        Stream the user list
        Suited to large result sets: avoids building one huge response message
        Args:
            request: StreamUsersRequest
            context: gRPC context
        Yields:
            User: one user per message
        """
        print(f"StreamUsers request: limit={request.limit}, offset={request.offset}")
        # Take a snapshot of the users
        with self._lock:
            user_list = list(self.users_db.values())
        # Apply pagination (limit <= 0 means "no limit")
        start = max(request.offset, 0)
        end = start + request.limit if request.limit > 0 else len(user_list)
        # Send the users one message at a time
        for user in user_list[start:end]:
            if not context.is_active():
                break  # the client cancelled or disconnected
            yield user
            time.sleep(0.1)  # simulate processing delay
def serve():
    """Start the gRPC server"""
    # Create the gRPC server
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            ('grpc.max_receive_message_length', 100 * 1024 * 1024),  # 100 MB
            ('grpc.max_send_message_length', 100 * 1024 * 1024),
        ]
    )
    # Register the service
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceImpl(),
        server
    )
    # Bind the port (plaintext; use add_secure_port with TLS credentials in production)
    server.add_insecure_port('[::]:50051')
    # Start the server
    server.start()
    print("gRPC server started, listening on port 50051")
    # Block until the server is stopped
    server.wait_for_termination()
if __name__ == '__main__':
    serve()
4. FastAPI集成gRPC客户端(main.py)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import grpc
import user_service_pb2
import user_service_pb2_grpc
from contextlib import contextmanager
app = FastAPI(title="FastAPI + gRPC集成示例")
# gRPC服务端地址
GRPC_SERVER_ADDRESS = "localhost:50051"
@contextmanager
def get_grpc_channel():
    """获取gRPC通道(上下文管理器)"""
    channel = grpc.insecure_channel(GRPC_SERVER_ADDRESS)
    try:
        yield channel
    finally:
        channel.close()
# ==================== Pydantic模型 ====================
class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    full_name: str
    is_active: bool
    created_at: str
class CreateUserRequest(BaseModel):
    username: str
    email: str
    password: str
    full_name: str
class UpdateUserRequest(BaseModel):
    email: str = None
    full_name: str = None
# ==================== API端点 ====================
@app.get("/")
async def root():
    """根路径"""
    return {"message": "FastAPI + gRPC集成示例"}
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """
    获取用户信息
    通过gRPC调用用户服务
    """
    try:
        with get_grpc_channel() as channel:
            stub = user_service_pb2_grpc.UserServiceStub(channel)
            # 调用gRPC方法
            request = user_service_pb2.GetUserRequest(user_id=user_id)
            response = stub.GetUser(request)
            if not response.user:
                raise HTTPException(status_code=404, detail="用户不存在")
            # 转换为Pydantic模型
            return UserResponse(
                id=response.user.id,
                username=response.user.username,
                email=response.user.email,
                full_name=response.user.full_name,
                is_active=response.user.is_active,
                created_at=response.user.created_at
            )
    except grpc.RpcError as e:
        # 处理gRPC错误
        if e.code() == grpc.StatusCode.NOT_FOUND:
            raise HTTPException(status_code=404, detail="用户不存在")
        else:
            raise HTTPException(
                status_code=500,
                detail=f"gRPC错误: {e.code()}: {e.details()}"
            )
@app.get("/users", response_model=List[UserResponse])
async def get_users(user_ids: str):
    """
    批量获取用户
    Args:
        user_ids: 逗号分隔的用户ID列表,如 "1,2,3"
    """
    try:
        # 解析用户ID列表
        user_id_list = [int(uid.strip()) for uid in user_ids.split(',')]
        with get_grpc_channel() as channel:
            stub = user_service_pb2_grpc.UserServiceStub(channel)
            # 调用gRPC方法
            request = user_service_pb2.GetUsersRequest(user_ids=user_id_list)
            response = stub.GetUsers(request)
            # 转换为Pydantic模型列表
            return [
                UserResponse(
                    id=user.id,
                    username=user.username,
                    email=user.email,
                    full_name=user.full_name,
                    is_active=user.is_active,
                    created_at=user.created_at
                )
                for user in response.users
            ]
    except grpc.RpcError as e:
        raise HTTPException(
            status_code=500,
            detail=f"gRPC错误: {e.code()}: {e.details()}"
        )
@app.post("/users", response_model=UserResponse)
async def create_user(request: CreateUserRequest):
    """
    创建用户
    通过gRPC调用用户服务
    """
    try:
        with get_grpc_channel() as channel:
            stub = user_service_pb2_grpc.UserServiceStub(channel)
            # 调用gRPC方法
            grpc_request = user_service_pb2.CreateUserRequest(
                username=request.username,
                email=request.email,
                password=request.password,
                full_name=request.full_name
            )
            response = stub.CreateUser(grpc_request)
            # 返回创建的用户
            return UserResponse(
                id=response.user.id,
                username=response.user.username,
                email=response.user.email,
                full_name=response.user.full_name,
                is_active=response.user.is_active,
                created_at=response.user.created_at
            )
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.ALREADY_EXISTS:
            raise HTTPException(status_code=400, detail="用户已存在")
        else:
            raise HTTPException(
                status_code=500,
                detail=f"gRPC错误: {e.code()}: {e.details()}"
            )
@app.put("/users/{user_id}", response_model=UserResponse)
def update_user(user_id: int, request: UpdateUserRequest):
    """
    Update a user
    Calls the user service over gRPC.
    Declared with plain `def` because the stub call blocks; FastAPI runs
    sync endpoints in a thread pool, so the event loop is not stalled.
    """
    try:
        with get_grpc_channel() as channel:
            stub = user_service_pb2_grpc.UserServiceStub(channel)
            # Call the gRPC method
            grpc_request = user_service_pb2.UpdateUserRequest(
                user_id=user_id,
                email=request.email,
                full_name=request.full_name
            )
            response = stub.UpdateUser(grpc_request)
            # An unset sub-message field is still truthy, so test presence explicitly
            if not response.HasField("user"):
                raise HTTPException(status_code=404, detail="User not found")
            return UserResponse(
                id=response.user.id,
                username=response.user.username,
                email=response.user.email,
                full_name=response.user.full_name,
                is_active=response.user.is_active,
                created_at=response.user.created_at
            )
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.NOT_FOUND:
            raise HTTPException(status_code=404, detail="User not found")
        raise HTTPException(
            status_code=500,
            detail=f"gRPC error: {e.code()}: {e.details()}"
        )
@app.get("/users/stream")
async def stream_users(limit: int = 10, offset: int = 0):
    """
    Stream the user list
    Returns Server-Sent Events (SSE).
    Note: if a GET /users/{user_id} route is registered earlier, it captures
    /users/stream first (and rejects "stream" as a non-integer with a 422),
    so register this route before it.
    """
    from fastapi.responses import StreamingResponse
    import json
    def generate():
        """Yield SSE frames.
        A plain (sync) generator: StreamingResponse iterates it in a thread
        pool, so the blocking gRPC stream does not stall the event loop."""
        try:
            with get_grpc_channel() as channel:
                stub = user_service_pb2_grpc.UserServiceStub(channel)
                # Call the server-streaming gRPC method
                request = user_service_pb2.StreamUsersRequest(
                    limit=limit,
                    offset=offset
                )
                for user in stub.StreamUsers(request):
                    # Convert the protobuf message to a JSON-friendly dict
                    user_dict = {
                        "id": user.id,
                        "username": user.username,
                        "email": user.email,
                        "full_name": user.full_name,
                        "is_active": user.is_active,
                        "created_at": user.created_at
                    }
                    # SSE frame: "data: <payload>" followed by a blank line
                    yield f"data: {json.dumps(user_dict, ensure_ascii=False)}\n\n"
        except grpc.RpcError as e:
            error_dict = {"error": f"gRPC error: {e.code()}: {e.details()}"}
            yield f"data: {json.dumps(error_dict)}\n\n"
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
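The `data: ...\n\n` strings above follow the Server-Sent Events wire format: an event is a block of `field: value` lines terminated by a blank line, and a payload that contains newlines needs one `data:` line per line. Below is a minimal sketch of a formatter. The helper name `format_sse` is hypothetical and not part of FastAPI. It also shows the optional `id:` and `event:` fields defined by the SSE format.

```python
import json
from typing import Any, Optional


def format_sse(payload: Any, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Serialize one Server-Sent Events message (hypothetical helper)."""
    # Non-string payloads are sent as JSON text
    text = payload if isinstance(payload, str) else json.dumps(payload, ensure_ascii=False)
    lines = []
    if event_id is not None:
        # The browser echoes the last id in a Last-Event-ID header when it reconnects
        lines.append(f"id: {event_id}")
    if event is not None:
        # A named event instead of the default "message" event
        lines.append(f"event: {event}")
    # A multi-line payload must be split across several data: lines
    lines.extend(f"data: {line}" for line in text.split("\n"))
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


print(repr(format_sse({"id": 1, "username": "alice"})))
print(repr(format_sse("line one\nline two", event="note", event_id="7")))
```

On the client, a browser `EventSource` delivers unnamed events to `onmessage`. Named events need `addEventListener(name, ...)`.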

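Practical Application Scenarios

Scenario 1: The order service calls the user service

Requirement: when creating an order, the order service must verify that the user exists and fetch the user's details.

gRPC approach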

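# order_service.py
import asyncio
from typing import List
async def create_order(user_id: int, products: List[Product]):
    # Product and Order are the order service's own models (not shown)
    with get_grpc_channel() as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)
        # Verify the user through the user service; the stub call blocks,
        # so run it in a worker thread to keep the event loop free
        response = await asyncio.to_thread(
            stub.GetUser, user_service_pb2.GetUserRequest(user_id=user_id)
        )
    # An unset sub-message field is still truthy, so test presence explicitly
    if not response.HasField("user"):
        raise ValueError("User not found")
    # Create the order
    order = await Order.create(
        user_id=user_id,
        user_email=response.user.email,
        products=products
    )
    return order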
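Scenario 2: Real-time data synchronization

Requirement: after the user service changes a user's data, the change must be propagated to the search service in real time.

gRPC server-streaming approach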

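# The search service subscribes to user changes
import grpc
async def listen_for_user_changes():
    # Assumes user_service.proto defines a server-streaming RPC
    # StreamUserChanges(Empty) returns (stream UserChange) and an Empty message
    async with grpc.aio.insecure_channel('user-service:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)
        # On a grpc.aio channel the streaming call is an async iterator
        async for change in stub.StreamUserChanges(user_service_pb2.Empty()):
            # Update the search index (update_search_index is application code)
            await update_search_index(change.user)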
Scenario 3: Microservice gateway

Requirement: the API gateway must aggregate data from several microservices.

gRPC approach

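# gateway.py
import asyncio
import grpc
from google.protobuf.json_format import MessageToDict
async def get_order_detail(order_id: int):
    # Each service normally has its own address; these hostnames are placeholders
    with grpc.insecure_channel("order-service:50051") as order_channel, \
         grpc.insecure_channel("user-service:50051") as user_channel, \
         grpc.insecure_channel("product-service:50051") as product_channel:
        order_stub = order_service_pb2_grpc.OrderServiceStub(order_channel)
        user_stub = user_service_pb2_grpc.UserServiceStub(user_channel)
        product_stub = product_service_pb2_grpc.ProductServiceStub(product_channel)
        # The order comes first: the other two calls need its user_id and product_ids
        order = await asyncio.to_thread(
            order_stub.GetOrder,
            order_service_pb2.GetOrderRequest(order_id=order_id)
        )
        # The user and product lookups are independent, so run them concurrently
        user, products = await asyncio.gather(
            asyncio.to_thread(
                user_stub.GetUser,
                user_service_pb2.GetUserRequest(user_id=order.user_id)
            ),
            asyncio.to_thread(
                product_stub.GetProducts,
                product_service_pb2.GetProductsRequest(product_ids=list(order.product_ids))
            ),
        )
    # Protobuf messages are not JSON-serializable; convert them before returning
    return {
        "order": MessageToDict(order),
        "user": MessageToDict(user),
        "products": MessageToDict(products)
    }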
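The self-contained sketch below shows why only the two independent lookups should be gathered. It replaces the three gRPC stubs with hypothetical blocking functions (`fetch_order`, `fetch_user`, `fetch_products`), each sleeping 0.2 s to simulate network latency. The dependent call runs first, and the independent ones then overlap. The total is therefore about 0.4 s, where a fully sequential version would take about 0.6 s.

```python
import asyncio
import time


# Hypothetical blocking calls standing in for synchronous gRPC stubs
def fetch_order(order_id: int) -> dict:
    time.sleep(0.2)
    return {"order_id": order_id, "user_id": 42, "product_ids": [1, 2]}


def fetch_user(user_id: int) -> dict:
    time.sleep(0.2)
    return {"user_id": user_id, "name": "alice"}


def fetch_products(product_ids: list) -> list:
    time.sleep(0.2)
    return [{"product_id": pid} for pid in product_ids]


async def get_order_detail(order_id: int) -> dict:
    # Step 1: the order is needed before anything else can be fetched
    order = await asyncio.to_thread(fetch_order, order_id)
    # Step 2: user and products depend only on the order, so fetch them concurrently
    user, products = await asyncio.gather(
        asyncio.to_thread(fetch_user, order["user_id"]),
        asyncio.to_thread(fetch_products, order["product_ids"]),
    )
    return {"order": order, "user": user, "products": products}


if __name__ == "__main__":
    start = time.perf_counter()
    detail = asyncio.run(get_order_detail(7))
    print(detail["user"]["name"], len(detail["products"]), f"{time.perf_counter() - start:.2f}s")
```

`asyncio.to_thread` requires Python 3.9 or later. For many concurrent requests, an asyncio-native client (`grpc.aio`) avoids tying up a thread per call.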

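III. Distributed Locks and Caching Strategies

Concept Overview

In a distributed system, several processes or servers may access a shared resource at the same time, which causes a race condition. A typical example is a lost update on a stock counter:

T1: Server A reads stock = 100
T2: Server B reads stock = 100
T3: Server A decrements and writes 99
T4: Server B decrements and writes 99

Result: two units were sold, so the stock should be 98, but it is 99. Server B's write overwrote Server A's.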
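The T1 to T4 interleaving above can be reproduced on one machine with two threads. In this sketch, a `threading.Barrier` forces both reads to happen before either write, so the unsafe version loses an update every time. Wrapping the read-modify-write in a `threading.Lock` fixes it. A `threading.Lock` only coordinates threads inside one process, which is exactly the gap a distributed lock fills when the readers are separate servers.

```python
import threading


def sell_two_units(use_lock: bool, initial: int = 100) -> int:
    """Two workers each sell one unit; return the final stock."""
    stock = {"value": initial}
    lock = threading.Lock()
    # Both workers must reach the barrier (finish reading) before either writes
    barrier = threading.Barrier(2)

    def unsafe_worker():
        current = stock["value"]        # T1/T2: both read 100
        barrier.wait()                  # force the interleaving from the timeline
        stock["value"] = current - 1    # T3/T4: both write 99

    def locked_worker():
        with lock:                      # the read-modify-write runs as one unit
            current = stock["value"]
            stock["value"] = current - 1

    target = locked_worker if use_lock else unsafe_worker
    threads = [threading.Thread(target=target) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return stock["value"]


print(sell_two_units(use_lock=False))  # 99: one update was lost
print(sell_two_units(use_lock=True))   # 98: correct
```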

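A distributed lock ensures that only one process at a time can operate on the shared resource, so the read-modify-write sequence runs as a single critical section.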
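A Redis lock needs more than "set the key if it is absent". The key needs an expiry, so that a crashed holder cannot block everyone else. Each holder also needs a unique token, so that a holder whose lock has already expired cannot release someone else's lock. The sketch below replays that failure case. `FakeLockStore` is a hypothetical in-memory model of `SET key value NX PX` and a compare-and-delete release. A manual clock makes expiry deterministic.

```python
import uuid
from typing import Optional


class FakeLockStore:
    """Hypothetical in-memory model of SET ... NX PX and a compare-and-delete release."""

    def __init__(self):
        self.now_ms = 0      # manual clock, advanced by the caller
        self.data = {}       # key -> (value, expires_at_ms)

    def get(self, key: str) -> Optional[str]:
        entry = self.data.get(key)
        if entry and entry[1] > self.now_ms:
            return entry[0]
        self.data.pop(key, None)   # drop expired keys lazily
        return None

    def set_nx_px(self, key: str, value: str, px: int) -> bool:
        # Like SET ... NX PX: succeed only if the key is absent or expired
        if self.get(key) is not None:
            return False
        self.data[key] = (value, self.now_ms + px)
        return True

    def compare_and_delete(self, key: str, value: str) -> int:
        # Like the Lua release script: delete only if the key still holds our token
        if self.get(key) == value:
            del self.data[key]
            return 1
        return 0


store = FakeLockStore()
token_a, token_b = str(uuid.uuid4()), str(uuid.uuid4())
store.set_nx_px("lock:order", token_a, px=1000)           # A acquires
store.now_ms = 1500                                        # A stalls past the TTL
store.set_nx_px("lock:order", token_b, px=1000)           # B acquires after expiry
print(store.compare_and_delete("lock:order", token_a))    # 0: A's late release is refused
print(store.get("lock:order") == token_b)                 # True: B still holds the lock
```

A plain `DEL` in place of the compare-and-delete would have removed B's lock here. That would let a third client in while B is still inside its critical section.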

Caching strategies, in turn, reduce database load and improve response times.

Full Code Implementation

1. Redis distributed lock (redis_lock.py)
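import time
import uuid
from contextlib import contextmanager
from functools import wraps
from typing import Dict, Optional
import redis
from app.config import get_settings
settings = get_settings()
class RedisDistributedLock:
    """
    Redis distributed lock
    Built on the Redis SET NX PX command
    How it works:
    1. SET key token NX PX <milliseconds>
       - NX: set the key only if it does not exist
       - PX: expiry in milliseconds, so a crashed holder cannot block others forever
    2. Release with a Lua script that deletes the key only if it still holds
       our token; Redis runs the script atomically
    This client is synchronous (acquire() sleeps between retries). From async
    code, run it in a worker thread or use the redis.asyncio client.
    """
    RELEASE_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        # Token for each lock name currently held by this instance
        self._tokens: Dict[str, str] = {}
    def acquire(
        self,
        lock_name: str,
        timeout: float = 10.0,
        blocking_timeout: Optional[float] = None
    ) -> bool:
        """
        Acquire the lock
        Args:
            lock_name: lock name
            timeout: lock expiry (seconds)
            blocking_timeout: maximum time to wait (seconds); None means try once without blocking
        Returns:
            bool: whether the lock was acquired
        """
        lock_key = f"lock:{lock_name}"
        token = str(uuid.uuid4())  # unique token, verified on release
        # redis-py rejects a float for ex=, so pass whole milliseconds via px=
        expire_ms = int(timeout * 1000)
        deadline = None if blocking_timeout is None else time.monotonic() + blocking_timeout
        while True:
            # Try to take the lock
            if self.redis.set(lock_key, token, nx=True, px=expire_ms):
                self._tokens[lock_name] = token
                return True
            # Non-blocking mode, or the waiting time is used up
            if deadline is None or time.monotonic() >= deadline:
                return False
            # Back off briefly, then retry
            time.sleep(0.01)
    def release(self, lock_name: str) -> bool:
        """
        Release the lock
        The Lua script checks the token and deletes the key in one atomic step,
        so a lock that expired and was taken by another client is left alone.
        Args:
            lock_name: lock name
        Returns:
            bool: whether this instance still held the lock and released it
        """
        token = self._tokens.pop(lock_name, None)
        if token is None:
            return False
        result = self.redis.eval(self.RELEASE_SCRIPT, 1, f"lock:{lock_name}", token)
        return bool(result)
# Context-manager version
@contextmanager
def distributed_lock(
    redis_client: redis.Redis,
    lock_name: str,
    timeout: float = 10.0,
    blocking_timeout: Optional[float] = None
):
    """
    Distributed lock context manager
    Usage:
        with distributed_lock(redis_client, "my_lock", blocking_timeout=5):
            # work that needs the lock
            pass
    Args:
        redis_client: Redis client
        lock_name: lock name
        timeout: lock expiry (seconds)
        blocking_timeout: maximum time to wait (seconds); None means do not wait
    Raises:
        RuntimeError: if the lock could not be acquired
    """
    lock = RedisDistributedLock(redis_client)
    # Acquire outside try: if acquisition fails there is nothing to release
    if not lock.acquire(lock_name, timeout, blocking_timeout):
        raise RuntimeError(f"Failed to acquire lock: {lock_name}")
    try:
        yield lock
    finally:
        lock.release(lock_name)
# Decorator version
def with_lock(lock_name: str, timeout: float = 10.0, blocking_timeout: Optional[float] = None):
    """
    Distributed lock decorator (for synchronous functions)
    Usage:
        @with_lock("inventory_lock", blocking_timeout=5)
        def update_inventory(product_id, quantity):
            pass
    """
    # Create the client once, at decoration time; redis-py connects lazily
    # and reuses its connection pool across calls
    redis_client = redis.from_url(settings.REDIS_URL)
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with distributed_lock(redis_client, lock_name, timeout, blocking_timeout):
                return func(*args, **kwargs)
        return wrapper
    return decorator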
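# ==================== Advanced ====================
class RedisRedlock:
    """
    Redlock: one lock spread over several independent Redis instances
    Redlock is the multi-instance locking algorithm described in the Redis
    documentation: the lock counts as held only if a majority of nodes grant it
    within the lock's validity time. Its safety under clock jumps and long
    process pauses is disputed, so where correctness is critical, pair it
    with fencing tokens or idempotent writes.
    Use when: a single Redis instance is not reliable enough for the lock.
    """
    # Clock-drift allowance as in the reference implementations: 1% of the TTL + 2 ms
    CLOCK_DRIFT_FACTOR = 0.01
    RELEASE_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    def __init__(self, redis_nodes: list):
        """
        Args:
            redis_nodes: list of node configs, e.g. [
                {'host': 'redis1', 'port': 6379},
                {'host': 'redis2', 'port': 6379},
                {'host': 'redis3', 'port': 6379},
            ]
        """
        # StrictRedis is only an alias of Redis in redis-py 3 and later
        self.redis_nodes = [
            redis.Redis(**node_config)
            for node_config in redis_nodes
        ]
        self.quorum = len(self.redis_nodes) // 2 + 1  # a strict majority of nodes
        self._tokens: dict = {}
    def acquire(self, lock_name: str, timeout: float = 10.0) -> bool:
        """
        Acquire the Redlock
        Succeeds only if a majority of nodes grant the lock and some validity time remains.
        Args:
            lock_name: lock name
            timeout: lock expiry (seconds)
        Returns:
            bool: whether the lock was acquired
        """
        lock_key = f"lock:{lock_name}"
        token = str(uuid.uuid4())
        ttl_ms = int(timeout * 1000)
        success_count = 0
        start = time.monotonic()
        # Try every node; in production, give each node a short socket timeout
        for redis_node in self.redis_nodes:
            try:
                if redis_node.set(lock_key, token, nx=True, px=ttl_ms):
                    success_count += 1
            except redis.RedisError:
                # An unreachable node simply counts as a failed vote
                pass
        # Remaining validity = TTL - time spent acquiring - clock-drift allowance
        elapsed_ms = (time.monotonic() - start) * 1000
        drift_ms = ttl_ms * self.CLOCK_DRIFT_FACTOR + 2
        validity_ms = ttl_ms - elapsed_ms - drift_ms
        if success_count >= self.quorum and validity_ms > 0:
            self._tokens[lock_name] = token
            self._validity_ms = validity_ms
            return True
        # Failed: undo on every node, including those that did grant the lock
        self._release_all(lock_key, token)
        return False
    def release(self, lock_name: str) -> bool:
        """
        Release the Redlock on every node
        """
        token = self._tokens.pop(lock_name, None)
        if token is None:
            return False
        self._release_all(f"lock:{lock_name}", token)
        return True
    def _release_all(self, lock_key: str, token: str) -> None:
        for redis_node in self.redis_nodes:
            try:
                redis_node.eval(self.RELEASE_SCRIPT, 1, lock_key, token)
            except redis.RedisError:
                # On a node we cannot reach, the key expires on its own
                pass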
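The Redlock acquire step comes down to two numbers: the quorum and the remaining validity time. Below is a small sketch of that arithmetic. It uses a clock-drift allowance of 1% of the TTL plus 2 ms, following the Redlock reference implementations. Treat the exact constants as an assumption to tune. The function names are hypothetical.

```python
def redlock_quorum(n_nodes: int) -> int:
    """Strict majority of the nodes."""
    return n_nodes // 2 + 1


def redlock_validity_ms(ttl_ms: int, elapsed_ms: float, granted: int, n_nodes: int,
                        drift_factor: float = 0.01) -> float:
    """
    Time the lock may still be trusted, or 0.0 if acquisition failed.
    validity = ttl - elapsed - (ttl * drift_factor + 2 ms)
    """
    drift_ms = ttl_ms * drift_factor + 2
    validity = ttl_ms - elapsed_ms - drift_ms
    if granted < redlock_quorum(n_nodes) or validity <= 0:
        return 0.0
    return validity


# 5 nodes, 10 s TTL, 50 ms spent acquiring
print(redlock_quorum(5))                       # 3
print(redlock_validity_ms(10_000, 50, 3, 5))   # 9848.0
print(redlock_validity_ms(10_000, 50, 2, 5))   # 0.0: no majority
```

The holder should finish its critical section within the returned validity time. Past that point, another client may legitimately acquire the lock.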
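2. Caching strategies (cache.py)
import hashlib
import inspect
import json
import random
from functools import wraps
from typing import Any, Callable, Optional, Tuple
import redis
from app.config import get_settings
settings = get_settings()
class RedisCache:
    """
    Redis cache wrapper
    Supported strategies:
    - plain caching with a default TTL
    - cache-penetration protection: optionally cache empty (None) results
    - cache-avalanche protection: random TTL jitter, so entries written
      together do not all expire together
    Values are stored as JSON, so cached data must be JSON-serializable.
    """
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600, jitter_ratio: float = 0.1):
        self.redis = redis_client
        self.default_ttl = default_ttl    # default TTL: 1 hour
        self.jitter_ratio = jitter_ratio  # add up to 10% of the TTL at random
    def generate_key(self, prefix: str, *args, **kwargs) -> str:
        """
        Build a cache key of the form "<prefix>:<MD5 of the serialized arguments>".
        Positional and keyword arguments hash differently, so f(1) and
        f(user_id=1) produce different keys.
        Args:
            prefix: key prefix
            *args: positional arguments
            **kwargs: keyword arguments
        Returns:
            str: the cache key
        """
        # Serialize the arguments (default=str tolerates values JSON cannot encode)
        params = {"args": args, "kwargs": kwargs}
        params_str = json.dumps(params, sort_keys=True, default=str)
        # Hash them
        hash_value = hashlib.md5(params_str.encode()).hexdigest()
        return f"{prefix}:{hash_value}"
    def lookup(self, key: str) -> Tuple[bool, Any]:
        """
        Look up a key and return (hit, value).
        Unlike get(), this tells a cached None apart from a miss.
        """
        raw = self.redis.get(key)
        if raw is None:
            return False, None
        return True, json.loads(raw)
    def get(self, key: str) -> Optional[Any]:
        """
        Get a cached value
        Args:
            key: cache key
        Returns:
            the cached value, or None if absent
        """
        return self.lookup(key)[1]
    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """
        Store a value as JSON (None is stored as "null")
        Args:
            key: cache key
            value: value to cache
            ttl: expiry in seconds; None uses the default. Random jitter is added either way.
        Returns:
            bool: whether the write succeeded
        """
        if ttl is None:
            ttl = self.default_ttl
        ttl += random.randint(0, int(ttl * self.jitter_ratio))
        return bool(self.redis.setex(key, ttl, json.dumps(value)))
    def delete(self, key: str) -> bool:
        """Delete a cache entry; returns whether it existed."""
        return bool(self.redis.delete(key))
    def exists(self, key: str) -> bool:
        """Check whether a cache entry exists."""
        return bool(self.redis.exists(key))
    def get_or_set(
        self,
        key: str,
        func: Callable,
        ttl: Optional[int] = None,
        cache_empty: bool = False
    ) -> Any:
        """
        Return the cached value; on a miss, call func() and cache its result
        Args:
            key: cache key
            func: function that loads the data
            ttl: expiry in seconds
            cache_empty: also cache None results (guards against cache penetration)
        Returns:
            Any: the cached value or func()'s result
        """
        # lookup() counts a cached None as a hit, which makes cache_empty effective
        hit, value = self.lookup(key)
        if hit:
            return value
        # Cache miss: load the data
        result = func()
        # Cache the result
        if result is not None or cache_empty:
            self.set(key, result, ttl)
        return result
    def delete_pattern(self, pattern: str) -> int:
        """
        Delete every key matching a glob pattern such as "user:*".
        Uses SCAN instead of KEYS: KEYS walks the whole keyspace in one
        blocking call, which stalls Redis on large datasets.
        Args:
            pattern: match pattern
        Returns:
            int: number of keys deleted
        """
        deleted = 0
        for key in self.redis.scan_iter(match=pattern, count=500):
            deleted += self.redis.delete(key)
        return deleted
# ==================== Cache decorators ====================
# One shared client for the decorators (redis-py connects lazily, on first use)
_cache = RedisCache(redis.from_url(settings.REDIS_URL))
def _build_key(prefix: str, key_generator: Optional[Callable], args, kwargs) -> str:
    if key_generator:
        return key_generator(prefix, *args, **kwargs)
    return _cache.generate_key(prefix, *args, **kwargs)
def cached(
    prefix: str,
    ttl: Optional[int] = None,
    cache_empty: bool = False,
    key_generator: Optional[Callable] = None
):
    """
    Caching decorator for both sync and async functions
    Usage:
        @cached(prefix="user", ttl=600)
        async def get_user(user_id: int):
            return await User.get(user_id)
    Args:
        prefix: cache key prefix
        ttl: expiry (seconds)
        cache_empty: whether to cache None results
        key_generator: custom key-building function
    """
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                key = _build_key(prefix, key_generator, args, kwargs)
                hit, value = _cache.lookup(key)
                if hit:
                    return value
                # Await the coroutine; caching the coroutine object itself would be a bug
                result = await func(*args, **kwargs)
                if result is not None or cache_empty:
                    _cache.set(key, result, ttl)
                return result
            return async_wrapper
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = _build_key(prefix, key_generator, args, kwargs)
            return _cache.get_or_set(key, lambda: func(*args, **kwargs), ttl, cache_empty)
        return wrapper
    return decorator
def cache_evict(prefix: str, key_generator: Optional[Callable] = None):
    """
    Cache-invalidation decorator
    After the wrapped function updates the data, delete the cache entry
    built from the same prefix and the same arguments.
    Usage:
        @cache_evict(prefix="user")
        async def update_user(user_id: int):
            # update logic
            pass
    The key only matches a @cached entry created with identical arguments.
    """
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                result = await func(*args, **kwargs)
                _cache.delete(_build_key(prefix, key_generator, args, kwargs))
                return result
            return async_wrapper
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Run the original function, then clear the cache entry
            result = func(*args, **kwargs)
            _cache.delete(_build_key(prefix, key_generator, args, kwargs))
            return result
        return wrapper
    return decorator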
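# ==================== Cache strategies ====================
class CacheStrategy:
    """
    Cache strategy factory
    Common strategies:
    - Cache Aside: the application reads and invalidates the cache itself (the most common)
    - Read Through: the cache layer loads missing data on the application's behalf
    - Write Through: each write updates the database and the cache synchronously
    - Write Behind: writes go to the cache first and reach the database asynchronously
    Only Cache Aside and Read Through are implemented below.
    """
    @staticmethod
    def cache_aside(cache: RedisCache, key: str, ttl: int):
        """
        Cache Aside strategy
        Read path:
        1. Read the cache
        2. On a hit, return the value
        3. On a miss, read the database
        4. Write the result to the cache
        5. Return the result
        Update path:
        1. Update the database first
        2. Then delete the cache entry (rather than updating it), so the next read reloads fresh data
        """
        class CacheAside:
            def __init__(self, cache, key, ttl):
                self.cache = cache
                self.key = key
                self.ttl = ttl
            def get(self, db_get_func: Callable) -> Any:
                """Read the data"""
                # Try the cache first
                cached = self.cache.get(self.key)
                if cached is not None:
                    return cached
                # Cache miss: read from the database
                data = db_get_func()
                # Populate the cache
                if data is not None:
                    self.cache.set(self.key, data, self.ttl)
                return data
            def update(self, db_update_func: Callable, new_data: Any) -> bool:
                """Update the data"""
                # Update the database first
                success = db_update_func(new_data)
                if success:
                    # Delete the cache entry instead of overwriting it
                    self.cache.delete(self.key)
                return success
        return CacheAside(cache, key, ttl)
    @staticmethod
    def read_through(cache: RedisCache, key: str, ttl: int, db_loader: Callable):
        """
        Read Through strategy
        The cache layer handles all interaction with the database;
        application code only talks to the cache.
        """
        class ReadThrough:
            def __init__(self, cache, key, ttl, db_loader):
                self.cache = cache
                self.key = key
                self.ttl = ttl
                self.db_loader = db_loader
            def get(self) -> Any:
                """Read the data"""
                # On a miss, the cache layer loads the data itself
                return self.cache.get_or_set(
                    self.key,
                    self.db_loader,
                    self.ttl
                )
        return ReadThrough(cache, key, ttl, db_loader)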
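The class docstring lists Write Through but does not implement it. Here is a minimal sketch of the idea, assuming dict-backed stand-ins for the database and the cache. The class name `WriteThroughStore` is hypothetical. Every write updates the database and then the cache in the same call. Reads that follow a successful write therefore hit a cache that already holds the new value.

```python
from typing import Any, Dict, Optional


class WriteThroughStore:
    """Hypothetical write-through layer over dict stand-ins for a DB and a cache."""

    def __init__(self):
        self.db: Dict[str, Any] = {}
        self.cache: Dict[str, Any] = {}
        self.db_reads = 0    # counts reads that fell through to the DB

    def write(self, key: str, value: Any) -> None:
        # Write the system of record first; if this raises, the cache is untouched
        self.db[key] = value
        # Then update (not delete) the cache synchronously, in the same call
        self.cache[key] = value

    def read(self, key: str) -> Optional[Any]:
        if key in self.cache:
            return self.cache[key]
        # Miss (for example after eviction): load from the DB and repopulate
        self.db_reads += 1
        value = self.db.get(key)
        if value is not None:
            self.cache[key] = value
        return value


store = WriteThroughStore()
store.write("product:1", {"stock": 10})
store.write("product:1", {"stock": 9})
print(store.read("product:1"), store.db_reads)   # {'stock': 9} 0
```

Write Through keeps the cache warm after writes. With concurrent writers, however, the database and cache updates of two requests can interleave in different orders and leave a stale entry. That risk is one reason Cache Aside deletes the entry instead of overwriting it.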
3. Usage example (main.py)
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import redis
from app.redis_lock import distributed_lock, with_lock
from app.cache import cached, cache_evict, RedisCache, CacheStrategy
from app.config import get_settings
settings = get_settings()
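app = FastAPI(title="Distributed Lock and Cache Example")
# Redis connection (one client with its own connection pool, shared by all endpoints)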
redis_client = redis.from_url(settings.REDIS_URL)
cache = RedisCache(redis_client)
# ==================== Data models ====================
class ProductUpdate(BaseModel):
    """Product update request"""
    stock: int
    price: float
class ProductResponse(BaseModel):
    """Product response"""
    id: int
    name: str
    stock: int
    price: float
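# ==================== Endpoints ====================
@app.post("/products/{product_id}/decrease-stock")
def decrease_stock(product_id: int, quantity: int = 1):
    """
    Decrement stock (under a distributed lock)
    Scenario: prevent overselling
    Problem: concurrent requests can read the same stock and drive it negative
    Solution: a per-product lock makes read-check-write one atomic step
    Declared with plain `def`: the Redis client and the lock block, and
    FastAPI runs sync endpoints in its thread pool.
    """
    if quantity <= 0:
        raise HTTPException(status_code=400, detail="quantity must be positive")
    product_key = f"product:{product_id}"
    # Same lock name as update_product, so the two endpoints exclude each other
    lock_key = f"product_lock:{product_id}"
    # Wait up to 5 s for the lock; RuntimeError is raised if it stays busy
    with distributed_lock(redis_client, lock_key, timeout=10, blocking_timeout=5):
        # Read the stock from the product hash, the single source of truth
        current_stock = int(redis_client.hget(product_key, "stock") or 0)
        # Check that enough stock is left
        if current_stock < quantity:
            raise HTTPException(
                status_code=400,
                detail=f"Insufficient stock: current {current_stock}, requested {quantity}"
            )
        # Deduct the stock
        new_stock = current_stock - quantity
        redis_client.hset(product_key, "stock", new_stock)
        # Invalidate the cached product and product lists
        cache.delete(cache.generate_key("product", product_id=product_id))
        cache.delete_pattern("products_list:*")
        return {
            "product_id": product_id,
            "old_stock": current_stock,
            "new_stock": new_stock,
            "quantity": quantity
        }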
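@app.post("/products/{product_id}/update")
def update_product(
    product_id: int,
    update_data: ProductUpdate
):
    """
    Update a product (then invalidate its cache)
    Scenario: after product data changes, drop the stale cache entries
    """
    lock_key = f"product_lock:{product_id}"
    with distributed_lock(redis_client, lock_key, timeout=10, blocking_timeout=5):
        # Update the product record (a Redis hash stands in for the database)
        redis_client.hset(
            f"product:{product_id}",
            mapping={
                "stock": update_data.stock,
                "price": update_data.price
            }
        )
        # Invalidate the entry that @cached(prefix="product") wrote for
        # get_product(product_id=...). Do not delete "product:{id}" itself:
        # that key is the product record, not the cache entry.
        cache.delete(cache.generate_key("product", product_id=product_id))
        # Invalidate the cached product lists
        cache.delete_pattern("products_list:*")
        return {"product_id": product_id, "updated": True}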
@app.get("/products/{product_id}")
@cached(prefix="product", ttl=300)
def get_product(product_id: int):
    """
    Get a product (cached)
    Scenario: frequently read product data is cached to reduce database load.
    Declared with plain `def` because the Redis calls block.
    """
    # Simulated database query (use a real database in practice)
    product_data = redis_client.hgetall(f"product:{product_id}")
    if not product_data:
        # The exception propagates through the decorator, so a 404 is never cached
        raise HTTPException(status_code=404, detail="Product not found")
    return {
        "id": product_id,
        "name": product_data.get(b"name", b"").decode(),
        "stock": int(product_data.get(b"stock", 0)),
        "price": float(product_data.get(b"price", 0))
    }
@app.get("/products")
@cached(prefix="products_list", ttl=60)
def list_products(limit: int = 20, offset: int = 0):
    """
    List products (cached)
    Scenario: the product listing page is cached to speed up responses.
    """
    # Simulated database query
    products = []
    for i in range(offset, offset + limit):
        product_key = f"product:{i}"
        if redis_client.exists(product_key):
            product_data = redis_client.hgetall(product_key)
            products.append({
                "id": i,
                "name": product_data.get(b"name", b"").decode(),
                "stock": int(product_data.get(b"stock", 0)),
                "price": float(product_data.get(b"price", 0))
            })
    return {
        "products": products,
        "total": len(products),  # number of products on this page
        "limit": limit,
        "offset": offset
    }
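@app.post("/products/batch-update")
def batch_update_products(product_ids: list[int], updates: dict):
    """
    Batch-update products (under distributed locks)
    Scenario: a batch operation must lock every affected product
    """
    from contextlib import ExitStack
    if not updates:
        raise HTTPException(status_code=400, detail="updates must not be empty")
    # Deduplicate and sort the lock names so every request acquires them in the
    # same global order; this prevents deadlock between concurrent batches
    lock_keys = sorted({f"product_lock:{pid}" for pid in product_ids})
    # ExitStack releases every acquired lock on exit, including when acquiring
    # a later lock fails partway through
    with ExitStack() as stack:
        for lock_key in lock_keys:
            stack.enter_context(
                distributed_lock(redis_client, lock_key, timeout=10, blocking_timeout=5)
            )
        # Run the batch update
        for product_id in product_ids:
            redis_client.hset(
                f"product:{product_id}",
                mapping=updates
            )
            # Invalidate the product's cache entry
            cache.delete(cache.generate_key("product", product_id=product_id))
        # Invalidate the cached product lists
        cache.delete_pattern("products_list:*")
        return {"updated_count": len(set(product_ids))}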
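@app.post("/cache/clear")
def clear_cache(pattern: str):
    """
    Delete cache entries matching a glob pattern
    For operations or testing. The pattern is required: in this demo the
    product records live in the same Redis, so "*" would delete them too.
    """
    deleted_count = cache.delete_pattern(pattern)
    return {
        "pattern": pattern,
        "deleted_count": deleted_count
    }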
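Invalidation only works if it rebuilds exactly the key that `@cached` wrote. This standalone sketch reproduces the default key scheme of `RedisCache.generate_key`, which is the prefix plus an MD5 hash of the JSON-encoded arguments. It shows a pitfall: positional and keyword calls hash differently. FastAPI calls endpoint functions with keyword arguments. Invalidation code for an endpoint's cache entry should therefore use the keyword form, for example `generate_key("product", product_id=...)`.

```python
import hashlib
import json


def generate_key(prefix: str, *args, **kwargs) -> str:
    # Same scheme as RedisCache.generate_key: prefix + MD5 of the JSON-encoded arguments
    params = {"args": args, "kwargs": kwargs}
    params_str = json.dumps(params, sort_keys=True)
    return f"{prefix}:{hashlib.md5(params_str.encode()).hexdigest()}"


positional = generate_key("product", 1)
keyword = generate_key("product", product_id=1)
print(positional == keyword)                              # False: different keys
print(keyword == generate_key("product", product_id=1))   # True: deterministic
```

Because `sort_keys=True` is applied, the order of keyword arguments does not matter. Only the positional-versus-keyword split does.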

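Practical Application Scenarios

Scenario 1: Flash-sale system

Requirement: prevent overselling and keep stock deductions accurate.

Distributed lock approach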

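async def seckill_product(product_id: int, user_id: int):
    # get_stock, check_purchased, decrease_stock and create_order stand for the
    # application's own data-access helpers (decrease_stock here is not the
    # endpoint above). The lock is synchronous; under heavy load, run this body
    # in a worker thread or use an asyncio Redis client.
    lock_key = f"seckill:product:{product_id}"
    with distributed_lock(redis_client, lock_key, timeout=5, blocking_timeout=2):
        # Check the stock
        stock = await get_stock(product_id)
        if stock <= 0:
            raise HTTPException(400, "Sold out")
        # Check whether this user has already bought the item
        if await check_purchased(user_id, product_id):
            raise HTTPException(400, "Already purchased")
        # Deduct the stock
        await decrease_stock(product_id, 1)
        # Create the order
        await create_order(user_id, product_id)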
Scenario 2: Caching hot data

Requirement: the hot products on the home page are requested very frequently.

Caching approach

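# Register this route before /products/{product_id}: routes are matched in
# order, and the path parameter would otherwise capture "hot" (and reject it
# as a non-integer with a 422 error)
@app.get("/products/hot")
@cached(prefix="hot_products", ttl=300)
async def get_hot_products():
    # Cached for 5 minutes to reduce database load; Product.get_hot_products()
    # stands for a data-access call that returns JSON-serializable data
    return await Product.get_hot_products()
# Invalidate the cache when the hot-product list changes
@cache_evict(prefix="hot_products")
async def update_hot_products():
    # Update logic
    pass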
Scenario 3: Cache warm-up

Requirement: load hot data into the cache when the system starts.

Implementation

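async def warm_up_cache():
    """Cache warm-up"""
    hot_products = await Product.get_hot_products()
    for product in hot_products:
        # Use the key @cached(prefix="product") builds for get_product(product_id=...)
        # so the endpoint reads the warmed entry; product.to_dict() stands for
        # whatever converts the model into the same JSON-safe shape get_product returns
        cache.set(
            cache.generate_key("product", product_id=product.id),
            product.to_dict(),
            ttl=3600
        )
    print(f"Warm-up complete: loaded {len(hot_products)} products")
# Run at application startup. on_event is deprecated in recent FastAPI
# versions in favor of the lifespan parameter, but it still works.
@app.on_event("startup")
async def startup_event():
    await warm_up_cache()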
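These scenarios leave one cache risk open: cache penetration. Requests for records that do not exist miss the cache every time and fall through to the database. Caching the empty result, via the `cache_empty` flag of `RedisCache.get_or_set`, only helps if a cached `None` can be told apart from a miss. This dependency-free sketch uses a hypothetical dict-backed `DictCache` in place of Redis, with TTLs omitted. It counts how often the loader runs.

```python
import json
from typing import Any, Callable


class DictCache:
    """Hypothetical dict-backed cache; values are stored as JSON text, like in Redis."""

    def __init__(self):
        self.store = {}

    def get_or_set(self, key: str, loader: Callable[[], Any], cache_empty: bool) -> Any:
        raw = self.store.get(key)
        if raw is not None:              # a stored "null" still counts as a hit
            return json.loads(raw)
        result = loader()
        if result is not None or cache_empty:
            self.store[key] = json.dumps(result)   # None is stored as "null"
        return result


calls = {"n": 0}


def load_missing_user():
    calls["n"] += 1      # each call stands for one database query
    return None          # the record does not exist


cache = DictCache()
for _ in range(3):
    cache.get_or_set("user:999", load_missing_user, cache_empty=True)
print(calls["n"])        # 1: only the first request reached the database
```

With Redis, give such empty entries a short TTL so that a record created later becomes visible quickly.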

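IV. WebSocket Real-Time Communication

Concept Overview

WebSocket is a full-duplex protocol over one long-lived connection. After an HTTP upgrade handshake, either side can send messages at any time, so the server can push messages to the client. Typical uses:

  • Real-time chat: instant messaging, online customer support
  • Real-time notifications: system messages, status updates
  • Real-time collaboration: online editors, whiteboards
  • Real-time data: stock quotes, monitoring metrics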

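HTTP polling vs. WebSocket

| Feature             | HTTP polling                                                      | WebSocket                                                             |
|---------------------|-------------------------------------------------------------------|-----------------------------------------------------------------------|
| Connection model    | A separate request per poll (keep-alive may reuse the TCP connection) | One long-lived connection, established once                       |
| Server push         | Not supported (the client must ask)                               | Supported (the server can send at any time)                           |
| Resource overhead   | High (frequent requests, most returning nothing new)              | Low (one reused connection)                                           |
| Latency             | Bounded by the polling interval                                   | Near-immediate                                                        |
| Per-message headers | Full HTTP headers on every request                                | HTTP headers only during the handshake; each frame adds 2 to 14 bytes |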
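How small is the per-message overhead once the handshake is done? RFC 6455 defines a 2-byte base frame header. An extended length field adds 2 bytes when the payload is 126 to 65,535 bytes, or 8 bytes beyond that. Client-to-server frames also carry a 4-byte masking key. A small sketch of that arithmetic (the function name is hypothetical):

```python
def ws_frame_header_size(payload_len: int, masked: bool) -> int:
    """Header size in bytes of one WebSocket frame (RFC 6455, section 5.2)."""
    size = 2                      # FIN/opcode byte + mask bit and 7-bit length byte
    if payload_len >= 65536:
        size += 8                 # 64-bit extended payload length
    elif payload_len >= 126:
        size += 2                 # 16-bit extended payload length
    if masked:
        size += 4                 # masking key, required on client-to-server frames
    return size


print(ws_frame_header_size(20, masked=False))      # 2: small server-to-client message
print(ws_frame_header_size(20, masked=True))       # 6: the same message from the client
print(ws_frame_header_size(100_000, masked=True))  # 14: the maximum
```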

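Full Code Implementation

1. WebSocket connection management (websocket_manager.py)
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, List, Optional, Set
import json
import asyncio
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class ConnectionManager:
    """
    WebSocket connection manager
    Features:
    1. Track all active connections
    2. Broadcast messages
    3. Send messages to a specific user
    4. Rooms/channels
    All state lives in this process's memory: with several workers or
    servers, each instance only sees its own connections.
    """
    def __init__(self):
        # All active connections
        self.active_connections: List[WebSocket] = []
        # User ID -> connection (one per user; a newer connection replaces the old one)
        self.user_connections: Dict[int, WebSocket] = {}
        # Room ID -> set of connections
        self.rooms: Dict[str, Set[WebSocket]] = {}
        # Connection -> set of room IDs it has joined
        self.connection_rooms: Dict[WebSocket, Set[str]] = {}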
    async def connect(self, websocket: WebSocket, user_id: Optional[int] = None):
        """
        Accept a new connection
        Args:
            websocket: the WebSocket connection
            user_id: user ID (optional)
        """
        await websocket.accept()
        self.active_connections.append(websocket)
        # Compare with None so that a user ID of 0 is still registered
        if user_id is not None:
            self.user_connections[user_id] = websocket
        logger.info(f"New connection; active connections: {len(self.active_connections)}")
    def disconnect(self, websocket: WebSocket):
        """
        Remove a connection from all bookkeeping structures
        Args:
            websocket: the WebSocket connection
        """
        # Remove from the active connections
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        # Remove the user mapping that points at this connection
        user_id = None
        for uid, conn in self.user_connections.items():
            if conn is websocket:
                user_id = uid
                break
        if user_id is not None:
            del self.user_connections[user_id]
        # Leave every room this connection joined
        for room_id in self.connection_rooms.pop(websocket, set()):
            members = self.rooms.get(room_id)
            if members is not None:
                members.discard(websocket)
                # Delete the room once it is empty
                if not members:
                    del self.rooms[room_id]
        logger.info(f"Connection closed; active connections: {len(self.active_connections)}")
    async def send_personal_message(self, message: dict, user_id: int) -> bool:
        """
        Send a message to one user
        Args:
            message: message payload
            user_id: user ID
        Returns:
            bool: whether the message was sent
        """
        websocket = self.user_connections.get(user_id)
        if websocket is None:
            logger.warning(f"User {user_id} is not online")
            return False
        try:
            await websocket.send_json(message)
            return True
        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            self.disconnect(websocket)
            return False
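    async def broadcast(self, message: dict):
        """
        Broadcast a message to every connection
        Args:
            message: message payload
        """
        disconnected = []
        # Iterate over a snapshot: other coroutines may connect or disconnect
        # while this one is suspended in send_json()
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception as e:
                logger.error(f"Broadcast failed: {e}")
                disconnected.append(connection)
        # Drop the connections that failed
        for connection in disconnected:
            self.disconnect(connection)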
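    async def send_to_room(self, room_id: str, message: dict):
        """
        Send a message to every connection in a room
        Args:
            room_id: room ID
            message: message payload
        """
        if room_id not in self.rooms:
            logger.warning(f"Room {room_id} does not exist")
            return
        disconnected = []
        # Iterate over a copy: changing a set while iterating over it raises
        # RuntimeError, and joins/leaves can happen during send_json()
        for connection in list(self.rooms[room_id]):
            try:
                await connection.send_json(message)
            except Exception as e:
                logger.error(f"Failed to send to room: {e}")
                disconnected.append(connection)
        # Drop the connections that failed
        for connection in disconnected:
            self.disconnect(connection)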
    def join_room(self, websocket: WebSocket, room_id: str):
        """
        Join a room
        Args:
            websocket: the WebSocket connection
            room_id: room ID
        """
        # Create the room on first use
        if room_id not in self.rooms:
            self.rooms[room_id] = set()
        # Add the connection to the room
        self.rooms[room_id].add(websocket)
        # Record which rooms this connection belongs to
        if websocket not in self.connection_rooms:
            self.connection_rooms[websocket] = set()
        self.connection_rooms[websocket].add(room_id)
        logger.info(f"Connection joined room {room_id}")
    def leave_room(self, websocket: WebSocket, room_id: str):
        """
        Leave a room
        Args:
            websocket: the WebSocket connection
            room_id: room ID
        """
        if room_id in self.rooms and websocket in self.rooms[room_id]:
            self.rooms[room_id].discard(websocket)
            # Delete the room once it is empty
            if not self.rooms[room_id]:
                del self.rooms[room_id]
        # Remove the room from the connection's room list
        if websocket in self.connection_rooms:
            self.connection_rooms[websocket].discard(room_id)
            if not self.connection_rooms[websocket]:
                del self.connection_rooms[websocket]
        logger.info(f"Connection left room {room_id}")
    def get_room_size(self, room_id: str) -> int:
        """
        Number of connections in a room
        Args:
            room_id: room ID
        Returns:
            int: connection count
        """
        return len(self.rooms.get(room_id, set()))
    def get_connection_count(self) -> int:
        """
        Total number of active connections
        Returns:
            int: connection count
        """
        return len(self.active_connections)
# Global connection manager instance
manager = ConnectionManager()
# ==================== Message types ====================
class MessageType:
    """Message type constants"""
    # System messages
    SYSTEM = "system"
    # Chat messages
    CHAT = "chat"
    # Notifications
    NOTIFICATION = "notification"
    # Status updates
    STATUS = "status"
    # Heartbeat
    PING = "ping"
    PONG = "pong"
    # Room operations
    JOIN_ROOM = "join_room"
    LEAVE_ROOM = "leave_room"
    # Typing indicator
    TYPING = "typing"
    # Errors
    ERROR = "error"
def create_message(message_type: str, data: dict, user_id: Optional[int] = None) -> dict:
    """
    Build a message in the standard envelope format
    Args:
        message_type: message type
        data: message payload
        user_id: sender's user ID
    Returns:
        dict: the standardized message
    """
    return {
        "type": message_type,
        "data": data,
        "user_id": user_id,
        "timestamp": datetime.now().isoformat()
    }
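The PING/PONG message types imply a heartbeat, but nothing above detects a client that disappears silently, for example after a network drop where no close frame arrives. One common approach is sketched here as an assumption rather than as part of the original design. Each receive is bounded with `asyncio.wait_for`, and a silence longer than the heartbeat timeout is treated as a dead connection. `receive` stands in for `websocket.receive_text`. An `asyncio.Queue` plays the client so that the sketch runs without a server.

```python
import asyncio
from typing import Awaitable, Callable, List


async def receive_loop(receive: Callable[[], Awaitable[str]], idle_timeout: float) -> List[str]:
    """Collect messages until the peer stays silent longer than idle_timeout."""
    received = []
    while True:
        try:
            # A live client sends at least a ping within idle_timeout seconds
            message = await asyncio.wait_for(receive(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            # No traffic, not even a ping: treat the connection as dead.
            # A real endpoint would close the socket and call manager.disconnect().
            return received
        received.append(message)


async def demo() -> List[str]:
    inbox: asyncio.Queue = asyncio.Queue()
    for msg in ('{"type": "ping"}', '{"type": "chat"}'):
        inbox.put_nowait(msg)
    # After these two messages the fake client goes silent
    return await receive_loop(inbox.get, idle_timeout=0.1)


print(asyncio.run(demo()))
```

The timeout should be a few times the client's ping interval, so that one delayed ping does not drop a healthy connection.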
2. FastAPI WebSocket application (main.py)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import json
import asyncio
from app.websocket_manager import (
    manager,
    MessageType,
    create_message
)
from datetime import datetime
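app = FastAPI(title="WebSocket Real-Time Communication Example")
# CORS settings for the HTTP endpoints. CORSMiddleware only handles HTTP
# requests: WebSocket handshakes are not subject to CORS, so to restrict
# which sites may connect, check the Origin header in the WebSocket endpoint.
# In production, list explicit origins instead of "*".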
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
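# ==================== WebSocket endpoint ====================
@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    user_id: Optional[int] = Query(None)
):
    """
    WebSocket endpoint
    Supported message types:
    - chat: chat message
    - join_room: join a room
    - leave_room: leave a room
    - ping: heartbeat
    - typing: typing indicator
    Note: user_id is taken from the query string as sent, so a client can
    claim any identity; in production, derive it from an authenticated token.
    """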
    # Accept the connection
    await manager.connect(websocket, user_id)
    # Send a welcome message
    welcome_message = create_message(
        MessageType.SYSTEM,
        {
            "message": "WebSocket connection established",
            "connection_count": manager.get_connection_count()
        }
    )
    await websocket.send_json(welcome_message)
    try:
        while True:
            # Wait for the next text frame from the client
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
                msg_type = message.get("type")
                msg_data = message.get("data", {})
                # Route by message type
                if msg_type == MessageType.CHAT:
                    # Chat message
                    chat_message = create_message(
                        MessageType.CHAT,
                        {
                            "content": msg_data.get("content"),
                            "room_id": msg_data.get("room_id")
                        },
                        user_id
                    )
                    # Send to the given room, or broadcast to everyone if no room is given
                    room_id = msg_data.get("room_id")
                    if room_id:
                        await manager.send_to_room(room_id, chat_message)
                    else:
                        await manager.broadcast(chat_message)
                elif msg_type == MessageType.JOIN_ROOM:
                    # Join a room
                    room_id = msg_data.get("room_id")
                    if room_id:
                        manager.join_room(websocket, room_id)
                        # Notify everyone in the room, including the user who just joined
                        notification = create_message(
                            MessageType.SYSTEM,
                            {
                                "message": f"User {user_id} joined the room",
                                "room_id": room_id,
                                "room_size": manager.get_room_size(room_id)
                            },
                            user_id
                        )
                        await manager.send_to_room(room_id, notification)
                elif msg_type == MessageType.LEAVE_ROOM:
                    # Leave a room
                    room_id = msg_data.get("room_id")
                    if room_id:
                        manager.leave_room(websocket, room_id)
                        # Notify the members who remain
                        notification = create_message(
                            MessageType.SYSTEM,
                            {
                                "message": f"User {user_id} left the room",
                                "room_id": room_id,
                                "room_size": manager.get_room_size(room_id)
                            },
                            user_id
                        )
                        await manager.send_to_room(room_id, notification)
                elif msg_type == MessageType.PING:
                    # Heartbeat reply
                    pong_message = create_message(
                        MessageType.PONG,
                        {"timestamp": datetime.now().isoformat()}
                    )
                    await websocket.send_json(pong_message)
                elif msg_type == MessageType.TYPING:
                    # Typing indicator
                    typing_message = create_message(
                        MessageType.TYPING,
                        {
                            "room_id": msg_data.get("room_id"),
                            "is_typing": msg_data.get("is_typing", True)
                        },
                        user_id
                    )
                    # Typing indicators are only sent within a room
                    room_id = msg_data.get("room_id")
                    if room_id:
                        await manager.send_to_room(room_id, typing_message)
                else:
                    # Unknown message type
                    error_message = create_message(
                        MessageType.ERROR,
                        {
                            "message": f"Unknown message type: {msg_type}"
                        }
                    )
                    await websocket.send_json(error_message)
            except json.JSONDecodeError:
                # Malformed JSON
                error_message = create_message(
                    MessageType.ERROR,
                    {"message": "Invalid JSON"}
                )
                await websocket.send_json(error_message)
            except WebSocketDisconnect:
                # Disconnects go to the outer handler, not the generic one below
                raise
            except Exception:
                # Any other error: log the traceback, but do not leak internals to the client
                logger.exception("Error while handling a WebSocket message")
                error_message = create_message(
                    MessageType.ERROR,
                    {"message": "Failed to process message"}
                )
                await websocket.send_json(error_message)
    except WebSocketDisconnect:
        logger.info(f"User {user_id} disconnected")
    finally:
        # Always unregister the connection, whatever ended the loop
        manager.disconnect(websocket)
@app.websocket("/ws/chat/{room_id}")
async def chat_room_websocket(
    websocket: WebSocket,
    room_id: str,
    user_id: Optional[int] = Query(None)
):
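    """
    Chat-room WebSocket endpoint.
    The connection joins the room given in the path automatically.
    """
    # Accept the connection
    await manager.connect(websocket, user_id)
    # Join the room automatically
    manager.join_room(websocket, room_id)
    # Send a welcome message
    welcome_message = create_message(
        MessageType.SYSTEM,
        {
            "message": f"Joined room {room_id}",
            "room_id": room_id,
            "room_size": manager.get_room_size(room_id)
        }
    )
    await websocket.send_json(welcome_message)
    try:
        while True:
            # Wait for the next text frame
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                # Skip malformed frames instead of letting them end the connection
                continue
            # Only chat messages are relayed; other types (e.g. ping) are ignored
            if isinstance(message, dict) and message.get("type") == MessageType.CHAT:
                chat_message = create_message(
                    MessageType.CHAT,
                    {
                        "content": message.get("data", {}).get("content"),
                        "room_id": room_id
                    },
                    user_id
                )
                # Send to the room
                await manager.send_to_room(room_id, chat_message)
    except WebSocketDisconnect:
        logger.info(f"User {user_id} left room {room_id}")
    finally:
        # Always unregister the connection, whatever ended the loop
        manager.disconnect(websocket)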
# ==================== HTTP endpoints ====================
@app.get("/")
async def root():
    """Root endpoint: lists the available WebSocket routes."""
    return {
        "message": "WebSocket real-time communication example",
        "endpoints": {
            "websocket": "/ws",
            "chat_room": "/ws/chat/{room_id}"
        }
    }
@app.get("/stats")
async def get_stats():
    """
    Return WebSocket connection statistics.
    Intended for operational monitoring.
    """
    return {
        "total_connections": manager.get_connection_count(),
        "user_connections": len(manager.user_connections),
        "rooms": {
            room_id: len(connections)
            for room_id, connections in manager.rooms.items()
        }
    }
@app.post("/broadcast")
async def broadcast_notification(notification: dict):
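    """
    Broadcast a system notification to every connected client.
    Intended for administrators; protect it with an admin-only dependency before exposing it.
    """
    message = create_message(
        MessageType.NOTIFICATION,
        notification,
        user_id=0  # 0 marks a system message
    )
    await manager.broadcast(message)
    return {
        "message": "Notification broadcast",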
        "receivers": manager.get_connection_count()
    }
@app.post("/users/{user_id}/notify")
async def notify_user(user_id: int, notification: dict):
    """
    Send a notification to one user.
    Args:
        user_id: user ID
        notification: notification payload
    """
    message = create_message(
        MessageType.NOTIFICATION,
        notification,
        user_id=0  # 0 marks a system message
    )
    success = await manager.send_personal_message(message, user_id)
    return {
        "success": success,
        "message": "Notification sent" if success else "User is offline"
    }
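As message types are added, the `if/elif` chain in `websocket_endpoint` keeps growing. One common alternative is a dispatch table that maps each `type` string to a handler coroutine. The sketch below shows the idea using only the standard library. The `handles` decorator, the handler names and the reply format are illustrative assumptions; they are not part of `app.websocket_manager`.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, Optional

Message = Dict[str, Any]
# A handler receives the message's "data" payload and returns a reply (or None)
Handler = Callable[[Message], Awaitable[Optional[Message]]]

HANDLERS: Dict[str, Handler] = {}


def handles(msg_type: str) -> Callable[[Handler], Handler]:
    """Register the decorated coroutine as the handler for one message type."""
    def decorator(func: Handler) -> Handler:
        HANDLERS[msg_type] = func
        return func
    return decorator


@handles("ping")
async def on_ping(data: Message) -> Optional[Message]:
    return {"type": "pong", "data": {}}


@handles("chat")
async def on_chat(data: Message) -> Optional[Message]:
    # A real handler would call manager.send_to_room(...) here
    return {"type": "chat", "data": {"content": data.get("content")}}


def error(text: str) -> Message:
    return {"type": "error", "data": {"message": text}}


async def dispatch(raw: str) -> Optional[Message]:
    """Parse one text frame and route it to the registered handler."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return error("Invalid JSON")
    if not isinstance(message, dict):
        return error("Message must be a JSON object")
    msg_type = message.get("type")
    handler = HANDLERS.get(msg_type) if isinstance(msg_type, str) else None
    if handler is None:
        return error(f"Unknown message type: {msg_type}")
    return await handler(message.get("data", {}))
```

Inside the endpoint, the loop body would shrink to `reply = await dispatch(await websocket.receive_text())`, followed by `await websocket.send_json(reply)` when `reply` is not `None`. Adding a message type then means registering one more handler rather than editing a central branch.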
3. Client Example (index.html)
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>WebSocket Chat Example</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
        }
        .messages {
            height: 400px;
            overflow-y: auto;
            border: 1px solid #ddd;
            padding: 10px;
            margin-bottom: 20px;
            background: #f5f5f5;
        }
        .message {
            margin: 10px 0;
            padding: 8px;
            background: white;
            border-radius: 5px;
        }
        .system {
            color: #666;
            font-style: italic;
        }
        .chat {
            border-left: 3px solid #007bff;
            padding-left: 10px;
        }
        .notification {
            background: #fff3cd;
            border-left: 3px solid #ffc107;
            padding-left: 10px;
        }
        .input-group {
            display: flex;
            gap: 10px;
        }
        input[type="text"] {
            flex: 1;
            padding: 10px;
            border: 1px solid #ddd;
            border-radius: 5px;
        }
        button {
            padding: 10px 20px;
            background: #007bff;
            color: white;
            border: none;
            border-radius: 5px;
            cursor: pointer;
        }
        button:hover {
            background: #0056b3;
        }
        .status {
            margin-bottom: 10px;
            padding: 10px;
            border-radius: 5px;
        }
        .connected {
            background: #d4edda;
            color: #155724;
        }
        .disconnected {
            background: #f8d7da;
            color: #721c24;
        }
    </style>
</head>
<body>
    <h1>WebSocket Chat Example</h1>
    <div id="status" class="status disconnected">Disconnected</div>
    <div class="input-group" style="margin-bottom: 20px;">
        <input type="text" id="userId" placeholder="User ID" value="1">
        <input type="text" id="roomId" placeholder="Room ID (optional)">
        <button onclick="connect()">Connect</button>
        <button onclick="disconnect()">Disconnect</button>
    </div>
    <div class="messages" id="messages"></div>
    <div class="input-group">
        <input type="text" id="messageInput" placeholder="Type a message...">
        <button onclick="sendMessage()">Send</button>
    </div>
    <script>
        let ws = null;
        let userId = 1;
        let roomId = null;
        function updateStatus(connected) {
            const statusDiv = document.getElementById('status');
            if (connected) {
                statusDiv.textContent = 'Connected';
                statusDiv.className = 'status connected';
            } else {
                statusDiv.textContent = 'Disconnected';
                statusDiv.className = 'status disconnected';
            }
        }
        function addMessage(message) {
            const messagesDiv = document.getElementById('messages');
            const messageDiv = document.createElement('div');
            let className = 'message';
            let content = '';
            switch (message.type) {
                case 'system':
                    className += ' system';
                    content = `[System] ${message.data.message}`;
                    break;
                case 'chat':
                    className += ' chat';
                    content = `[User ${message.user_id}] ${message.data.content}`;
                    break;
                case 'notification':
                    className += ' notification';
                    content = `[Notice] ${message.data.message}`;
                    break;
                case 'pong':
                    return; // heartbeat replies are not displayed
                default:
                    content = JSON.stringify(message);
            }
            messageDiv.className = className;
            messageDiv.textContent = content;
            messagesDiv.appendChild(messageDiv);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }
        function connect() {
            userId = document.getElementById('userId').value || 1;
            roomId = document.getElementById('roomId').value;
            // Assumes the FastAPI server runs on localhost:8000
            const wsUrl = roomId
                ? `ws://localhost:8000/ws/chat/${encodeURIComponent(roomId)}?user_id=${encodeURIComponent(userId)}`
                : `ws://localhost:8000/ws?user_id=${encodeURIComponent(userId)}`;
            ws = new WebSocket(wsUrl);
            ws.onopen = function() {
                updateStatus(true);
                addMessage({
                    type: 'system',
                    data: { message: 'WebSocket connection established' }
                });
                // Start the heartbeat
                startHeartbeat();
            };
            ws.onmessage = function(event) {
                const message = JSON.parse(event.data);
                addMessage(message);
            };
            ws.onerror = function(error) {
                console.error('WebSocket error:', error);
            };
            ws.onclose = function() {
                updateStatus(false);
                stopHeartbeat();
                addMessage({
                    type: 'system',
                    data: { message: 'WebSocket connection closed' }
                });
            };
        }
        function disconnect() {
            if (ws) {
                ws.close();
                ws = null;
            }
        }
        function sendMessage() {
            const input = document.getElementById('messageInput');
            const content = input.value.trim();
            if (!content || !ws) {
                return;
            }
            const message = {
                type: 'chat',
                data: {
                    content: content,
                    room_id: roomId
                }
            };
            ws.send(JSON.stringify(message));
            input.value = '';
        }
        let heartbeatInterval = null;
        function startHeartbeat() {
            heartbeatInterval = setInterval(function() {
                if (ws && ws.readyState === WebSocket.OPEN) {
                    ws.send(JSON.stringify({ type: 'ping' }));
                }
            }, 30000); // send a heartbeat every 30 seconds
        }
        function stopHeartbeat() {
            if (heartbeatInterval) {
                clearInterval(heartbeatInterval);
                heartbeatInterval = null;
            }
        }
        // Send on Enter (ignored while an IME composition is in progress)
        document.getElementById('messageInput').addEventListener('keydown', function(e) {
            if (e.key === 'Enter' && !e.isComposing) {
                sendMessage();
            }
        });
    </script>
</body>
</html>
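The client above sends a `ping` every 30 seconds, but nothing on the server notices when a client goes silent. A laptop that goes to sleep without closing its socket is a typical case. One way to detect this is to wrap each receive in `asyncio.wait_for` and treat a timeout as a dead connection. The sketch below assumes any incoming frame counts as a sign of life. The function name and the `receive`/`on_message` callables are illustrative; in the FastAPI endpoint, `receive` would be `websocket.receive_text`.

```python
import asyncio
from typing import Awaitable, Callable


async def receive_until_idle(
    receive: Callable[[], Awaitable[str]],
    on_message: Callable[[str], Awaitable[None]],
    idle_timeout: float,
) -> None:
    """Pass frames to on_message until none arrives for idle_timeout seconds.

    Returns normally on timeout. Any exception raised by receive()
    (for example WebSocketDisconnect) propagates to the caller.
    """
    while True:
        try:
            frame = await asyncio.wait_for(receive(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            return  # the peer went silent; the caller should close the socket
        await on_message(frame)
```

A timeout of roughly two heartbeat intervals (say 60 to 90 seconds) tolerates one lost ping. After `receive_until_idle` returns, the endpoint would call `await websocket.close()` and unregister the connection from the manager.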

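Practical Application Scenarios

Scenario 1: Online Customer Service

Requirement: customers and support agents chat in real time, with several agents serving customers at once.

WebSocket approach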

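# Customer side: one room per conversation
@app.websocket("/ws/customer/{chat_id}")
async def customer_websocket(websocket: WebSocket, chat_id: str, customer_id: int):
    await manager.connect(websocket, customer_id)
    manager.join_room(websocket, chat_id)
    # Tell all online agents that a new customer is waiting
    await manager.send_to_room(
        "cs_agents",
        {
            "type": "new_customer",
            "chat_id": chat_id,
            "customer_id": customer_id
        }
    )
    try:
        # Keep the connection open and relay the customer's text to the chat room
        while True:
            content = await websocket.receive_text()
            await manager.send_to_room(
                chat_id,
                create_message(MessageType.CHAT, {"content": content, "room_id": chat_id}, customer_id)
            )
    except WebSocketDisconnect:
        manager.disconnect(websocket)
# Agent side: join the shared agent room plus every chat assigned to this agent
@app.websocket("/ws/agent")
async def agent_websocket(websocket: WebSocket, agent_id: int):
    await manager.connect(websocket, agent_id)
    manager.join_room(websocket, "cs_agents")
    # get_assigned_chats is an application-specific lookup (not shown here)
    assigned_chats = await get_assigned_chats(agent_id)
    for chat_id in assigned_chats:
        manager.join_room(websocket, chat_id)
    try:
        # Agents send {"chat_id": ..., "content": ...}; only assigned chats are accepted
        while True:
            message = json.loads(await websocket.receive_text())
            chat_id = message.get("chat_id")
            if chat_id in assigned_chats:
                await manager.send_to_room(
                    chat_id,
                    create_message(MessageType.CHAT, {"content": message.get("content"), "room_id": chat_id}, agent_id)
                )
    except WebSocketDisconnect:
        manager.disconnect(websocket)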
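The agent endpoint relies on `get_assigned_chats`, which the article leaves to the application. The sketch below shows one possible policy. Each new chat goes to the online agent with the fewest open chats, and a departing agent's chats are handed back for reassignment. `AgentPool` and its methods are hypothetical. A multi-process deployment would keep this state in Redis or the database rather than in memory.

```python
from typing import Dict, List, Optional


class AgentPool:
    """In-memory least-loaded assignment of chats to agents (illustrative only)."""

    def __init__(self) -> None:
        self._load: Dict[int, int] = {}         # agent_id -> number of open chats
        self._assignments: Dict[str, int] = {}  # chat_id -> agent_id

    def agent_online(self, agent_id: int) -> None:
        self._load.setdefault(agent_id, 0)

    def agent_offline(self, agent_id: int) -> List[str]:
        """Remove an agent and return the chats that now need a new agent."""
        self._load.pop(agent_id, None)
        orphaned = self.chats_for(agent_id)
        for chat_id in orphaned:
            del self._assignments[chat_id]
        return orphaned

    def assign(self, chat_id: str) -> Optional[int]:
        """Return the agent for chat_id, picking the least-loaded one if the chat is new."""
        if chat_id in self._assignments:
            return self._assignments[chat_id]
        if not self._load:
            return None  # nobody online: the customer has to wait in a queue
        # Fewest open chats wins; ties go to the lowest agent ID
        agent_id = min(self._load, key=lambda a: (self._load[a], a))
        self._load[agent_id] += 1
        self._assignments[chat_id] = agent_id
        return agent_id

    def close_chat(self, chat_id: str) -> None:
        agent_id = self._assignments.pop(chat_id, None)
        if agent_id in self._load:
            self._load[agent_id] -= 1

    def chats_for(self, agent_id: int) -> List[str]:
        """What get_assigned_chats(agent_id) could return."""
        return [c for c, a in self._assignments.items() if a == agent_id]
```

Least-loaded assignment keeps the work spread evenly without any extra bookkeeping. Skills-based routing or customer priority would replace the `min(...)` key.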
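Scenario 2: Real-Time Monitoring

Requirement: monitor server health and push CPU, memory and network metrics to dashboards in real time.

WebSocket approach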

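async def monitor_broadcast():
    """Broadcast monitoring data periodically."""
    while True:
        try:
            # Collect a sample (get_system_stats is application-specific, e.g. built on psutil)
            stats = await get_system_stats()
            # Push it to every client in the "monitoring" room
            message = {
                "type": "stats",
                "data": stats,
                "timestamp": datetime.now().isoformat()
            }
            await manager.send_to_room("monitoring", message)
        except Exception:
            # One failed sample should not stop the loop
            logger.exception("Failed to broadcast monitoring data")
        await asyncio.sleep(5)  # refresh every 5 seconds
# Start the background task; keep a reference on app.state so the task
# is not garbage-collected (the event loop holds only weak references to tasks)
@app.on_event("startup")
async def startup():
    app.state.monitor_task = asyncio.create_task(monitor_broadcast())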
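The startup hook starts the loop but never stops it, so at shutdown the task is abandoned mid-iteration. Recent FastAPI versions also deprecate `@app.on_event` in favour of a `lifespan` async context manager, which gives one place to both start and cancel background work. The standard-library sketch below packages that pattern:

- `periodic` awaits a coroutine every `interval` seconds and survives a failing sample.
- `run_in_background` starts the task on entry and cancels it cleanly on exit.

Both names are illustrative helpers, not FastAPI APIs, and `broadcast_stats_once` in the comment is hypothetical.

```python
import asyncio
import contextlib
from typing import Any, AsyncIterator, Awaitable, Callable, Coroutine


async def periodic(interval: float, tick: Callable[[], Awaitable[None]]) -> None:
    """Await tick() every `interval` seconds until the task is cancelled."""
    while True:
        try:
            await tick()
        except Exception as exc:  # CancelledError is a BaseException, so it is not caught here
            print(f"periodic tick failed: {exc!r}")
        await asyncio.sleep(interval)


@contextlib.asynccontextmanager
async def run_in_background(coro: Coroutine[Any, Any, None]) -> AsyncIterator[asyncio.Task]:
    """Run `coro` as a task for the duration of the block, then cancel it."""
    task = asyncio.create_task(coro)  # the reference lives as long as the block
    try:
        yield task
    finally:
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task

# With FastAPI this would become (not executed here):
# @contextlib.asynccontextmanager
# async def lifespan(app):
#     async with run_in_background(periodic(5, broadcast_stats_once)):
#         yield
# app = FastAPI(lifespan=lifespan)
```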
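Scenario 3: Collaborative Editor

Requirement: several users edit a document at the same time, and each change is synced to the others in real time.

WebSocket approach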

# Operation message format
{
    "type": "operation",
    "data": {
        "document_id": 123,
        "operation": {
            "type": "insert",  # or "delete" / "retain"
            "position": 10,
            "content": "Hello"
        }
    },
    "user_id": 1
}
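# Handling operations
@app.websocket("/ws/document/{document_id}")
async def document_websocket(websocket: WebSocket, document_id: int, user_id: int):
    await manager.connect(websocket, user_id)
    manager.join_room(websocket, f"doc:{document_id}")
    # Send the current document content to the new collaborator
    # (get_document_content / apply_operation are application-specific helpers)
    content = await get_document_content(document_id)
    await websocket.send_json({
        "type": "init",
        "data": {"content": content}
    })
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            # Apply the operation to the stored document. Concurrent edits would need
            # to be transformed first (e.g. with OT or a CRDT); here they are relayed as-is.
            operation = message["data"]["operation"]
            await apply_operation(document_id, operation)
            # Relay to the other collaborators; exclude=user_id skips the sender,
            # which requires send_to_room to support an exclude argument
            await manager.send_to_room(
                f"doc:{document_id}",
                {
                    "type": "operation",
                    "data": {"operation": operation, "user_id": user_id}
                },
                exclude=user_id
            )
    except WebSocketDisconnect:
        manager.disconnect(websocket)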
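The document endpoint relays each operation unchanged. That is only safe when edits never overlap in time. If two users insert text concurrently, each position was computed against a document version the other user no longer has. Operational transformation (OT) is one standard fix: before applying an operation made concurrently with one already applied, shift its position.

The sketch below covers insert-versus-insert only. The `length` field for `delete` is an assumption, since the message format above shows only `insert`. Versioning, delete transforms and `retain` are left out.

```python
from typing import Any, Dict

Op = Dict[str, Any]


def apply_op(text: str, op: Op) -> str:
    """Apply one operation to a plain-text document."""
    pos = op["position"]
    if op["type"] == "insert":
        return text[:pos] + op["content"] + text[pos:]
    if op["type"] == "delete":
        # Assumed field: "length" = number of characters to remove
        return text[:pos] + text[pos + op["length"]:]
    raise ValueError(f"unsupported operation type: {op['type']}")


def transform_insert(op: Op, applied: Op, op_wins_tie: bool) -> Op:
    """Rewrite insert `op` so it can run after the concurrent insert `applied`.

    If `applied` landed strictly before `op`, shift `op` right by the inserted
    length. On equal positions a fixed tie-break (e.g. lower user ID wins) decides
    whose text ends up first; every site must use the same rule to converge.
    """
    same_spot_loses = applied["position"] == op["position"] and not op_wins_tie
    if applied["position"] < op["position"] or same_spot_loses:
        return {**op, "position": op["position"] + len(applied["content"])}
    return dict(op)
```

The property that matters is convergence. Applying A then the transformed B gives the same text as applying B then the transformed A, whichever order the server and the clients happen to see.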

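V. Comprehensive Case Study: An Enterprise Order Management System

Case Overview

        We will combine several of the techniques covered above into an enterprise order management system, to show how FastAPI best practices fit together in a realistic project.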

System Architecture

┌────────────────────────────────────────────────────────────────────────────────┐
│                                  API Gateway                                   │
│                            (FastAPI + gRPC Client)                             │
└────────────────────────────────────────────────────────────────────────────────┘
          │                             │                             │
          v                             v                             v
┌────────────────────┐        ┌────────────────────┐        ┌────────────────────┐
│ Order Service      │        │ User Service       │        │ Product Service    │
│ (FastAPI)          │<------>│ (gRPC)             │<------>│ (gRPC)             │
│                    │        │                    │        │                    │
│ - Create order     │        │ - Get user         │        │ - Deduct stock     │
│ - Query orders     │        │ - Verify access    │        │ - Update stock     │
└────────────────────┘        └────────────────────┘        └────────────────────┘
          │                             │                             │
          v                             v                             v
┌────────────────────┐        ┌────────────────────┐        ┌────────────────────┐
│ PostgreSQL         │        │ PostgreSQL         │        │ PostgreSQL         │
│ (order data)       │        │ (user data)        │        │ (product data)     │
└────────────────────┘        └────────────────────┘        └────────────────────┘
          │
          v
┌────────────────────┐
│ Redis              │
│                    │
│ - Cache            │
│ - Distributed lock │
│ - Task queue       │
└────────────────────┘
          │
          v
┌────────────────────┐
│ Celery             │
│ (async tasks)      │
│                    │
│ - Send email       │
│ - Generate invoice │
│ - Notify logistics │
└────────────────────┘
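In this architecture, stock lives in Product Service's database while orders live in Order Service's. No single database transaction can therefore cover "deduct stock and create the order". A common pattern here is a saga: perform each step, and if a later step fails, run compensating actions for the earlier ones in reverse order.

The standard-library sketch below applies that idea to the stock-deduction step. `deduct` and `restore` stand in for the gRPC calls (`update_product_stock` / `restore_product_stock` in the order service). They are passed in as plain functions so the logic can be tested on its own; the function and exception names are illustrative.

```python
from typing import Callable, List, Tuple

Item = Tuple[int, int]  # (product_id, quantity)


class OutOfStock(Exception):
    pass


def deduct_all_or_nothing(
    items: List[Item],
    deduct: Callable[[int, int], None],
    restore: Callable[[int, int], None],
) -> None:
    """Deduct stock for every item; if any step fails, restore the ones already done."""
    done: List[Item] = []
    try:
        for product_id, quantity in items:
            deduct(product_id, quantity)
            done.append((product_id, quantity))
    except Exception:
        # Compensating actions, newest first
        for product_id, quantity in reversed(done):
            restore(product_id, quantity)
        raise
```

In production the saga's progress would also need to be persisted, for example in an outbox table or a Celery task. Otherwise a crash between a deduction and its compensation would still lose stock.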

Complete Code Implementation

1. Project Structure
enterprise_order_system/
├── services/
│   ├── gateway/
│   │   ├── main.py              # API gateway
│   │   ├── config.py
│   │   └── grpc_clients.py      # gRPC clients
│   ├── order_service/
│   │   ├── main.py              # Order service
│   │   ├── models.py            # Data models
│   │   ├── schemas.py           # Pydantic models
│   │   ├── tasks.py             # Celery tasks
│   │   └── deps.py              # Dependency injection
│   ├── user_service/
│   │   ├── user_service.proto   # gRPC definition
│   │   ├── server.py            # gRPC server
│   │   └── models.py
│   └── product_service/
│       ├── product_service.proto
│       ├── server.py
│       └── models.py
├── shared/
│   ├── config.py                # Shared configuration
│   ├── redis_client.py          # Redis client
│   └── grpc_common.py           # Common gRPC utilities
├── protos/
│   ├── user_service.proto
│   └── product_service.proto
└── requirements.txt
2. Order Service (services/order_service/main.py)
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from typing import List, Optional
from datetime import datetime
import json
from .models import Order, OrderItem
from .schemas import (
    OrderCreate,
    OrderResponse,
    OrderItemCreate,
    OrderStatusUpdate
)
from .deps import get_db, get_redis, get_current_user
from .tasks import (
    process_order_payment,
    send_order_confirmation,
    generate_invoice,
    notify_logistics
)
from shared.config import settings
from shared.redis_client import get_redis_client
# gRPC client wrappers. The project tree places them in services/gateway/grpc_clients.py;
# adjust this import path to your package layout.
from app.grpc_clients import UserServiceClient, ProductServiceClient
app = FastAPI(
    title="Order Service",
    version="2.0.0",
    description="Enterprise order management service"
)
# CORS configuration (list concrete origins in production instead of "*")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# gRPC clients
user_client = UserServiceClient()
product_client = ProductServiceClient()
# ==================== Permission check ====================
async def verify_order_permission(
    order_id: int,
    current_user_id: int,
    db: AsyncSession
) -> Order:
    """
    Check that the current user may access the order.
    Args:
        order_id: order ID
        current_user_id: ID of the current user
        db: database session
    Returns:
        Order: the order, with its items eagerly loaded
    Raises:
        HTTPException: 404 if the order does not exist, 403 if it belongs to another user
    """
    # Load the items eagerly: implicit lazy loading fails under AsyncSession
    order = await db.get(Order, order_id, options=[selectinload(Order.items)])
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    if order.user_id != current_user_id:
        raise HTTPException(status_code=403, detail="Not allowed to access this order")
    return order
# ==================== API endpoints ====================
@app.post("/orders", response_model=OrderResponse)
async def create_order(
    order_data: OrderCreate,
    background_tasks: BackgroundTasks,  # must precede parameters that have defaults
    current_user_id: int = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """
    Create an order.
    Flow:
    1. Verify that the user exists (gRPC)
    2. Verify that the products exist and check stock (gRPC)
    3. Deduct stock (gRPC, under a distributed lock)
    4. Create the order record
    5. Start the asynchronous tasks (email, invoice, logistics)
    """
    # 1. Verify the user
    user = await user_client.get_user(current_user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    if not user.is_active:
        raise HTTPException(status_code=400, detail="User account is disabled")
    # 2. Verify the products and pre-check stock
    product_ids = [item.product_id for item in order_data.items]
    products = await product_client.batch_get_products(product_ids)
    if len(products) != len(product_ids):
        raise HTTPException(status_code=400, detail="Some products do not exist")
    products_by_id = {product.id: product for product in products}
    for order_item in order_data.items:
        product = products_by_id[order_item.product_id]
        if product.stock < order_item.quantity:
            raise HTTPException(
                status_code=400,
                detail=f"Insufficient stock for product '{product.name}'"
            )
    # 3. Deduct stock under per-product distributed locks, then 4. create the order.
    # If any step fails, the stock already deducted is given back.
    # get_redis_client() is assumed to return a synchronous redis-py client.
    redis = get_redis_client()
    deducted = []  # (product_id, quantity) pairs already taken from Product Service
    try:
        for order_item in order_data.items:
            # redis-py's Lock stores a random token and only deletes a lock it still owns;
            # a plain DEL could remove a lock another request acquired after ours expired
            lock = redis.lock(f"product_stock_lock:{order_item.product_id}", timeout=10)
            if not lock.acquire(blocking=False):
                raise HTTPException(status_code=429, detail="System busy, please retry later")
            try:
                # Re-read stock inside the lock: the pre-check above may already be stale
                current = await product_client.get_product(order_item.product_id)
                if current.stock < order_item.quantity:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Insufficient stock for product '{current.name}'"
                    )
                await product_client.update_product_stock(
                    order_item.product_id,
                    current.stock - order_item.quantity
                )
                deducted.append((order_item.product_id, order_item.quantity))
            finally:
                lock.release()
        # 4. Create the order record
        order = Order(
            user_id=current_user_id,
            status="pending",
            total_amount=calculate_total_amount(products, order_data.items),
            shipping_address=order_data.shipping_address
        )
        db.add(order)
        await db.flush()  # assigns order.id
        # Create the order items
        for item in order_data.items:
            db.add(OrderItem(
                order_id=order.id,
                product_id=item.product_id,
                quantity=item.quantity,
                price=get_product_price(products, item.product_id)
            ))
        await db.commit()
    except Exception:
        # Compensate: roll back the DB work and return the stock already deducted
        await db.rollback()
        for product_id, quantity in deducted:
            await product_client.restore_product_stock(product_id, quantity)
        raise
    await db.refresh(order)
    # 5. Enqueue the Celery tasks after the response has been sent
    background_tasks.add_task(process_order_payment.delay, order.id)
    background_tasks.add_task(send_order_confirmation.delay, order.id)
    background_tasks.add_task(generate_invoice.delay, order.id)
    background_tasks.add_task(notify_logistics.delay, order.id)
    # 6. Invalidate the user's cached order lists
    invalidate_order_caches(redis, current_user_id)
    return order
@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
    order_id: int,
    current_user_id: int = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    redis = Depends(get_redis)
):
    """
    Get order details.
    Results are cached per user to reduce database load.
    """
    cache_key = f"order:{order_id}:user:{current_user_id}"
    # Try the cache first; the key includes the user ID, so a hit needs no extra permission check
    cached_order = redis.get(cache_key)
    if cached_order:
        return json.loads(cached_order)
    # Check permissions (raises 404/403)
    order = await verify_order_permission(order_id, current_user_id, db)
    # Cache for 5 minutes. ORM objects have no .json(), so serialize via the response schema
    # (assumes OrderResponse is a Pydantic v2 model with from_attributes=True)
    redis.setex(cache_key, 300, OrderResponse.model_validate(order).model_dump_json())
    return order
@app.get("/orders", response_model=List[OrderResponse])
async def list_orders(
    status: Optional[str] = None,
    skip: int = 0,
    limit: int = 20,
    current_user_id: int = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    redis = Depends(get_redis)
):
    """
    List the current user's orders.
    Results are cached to reduce database load.
    """
    cache_key = f"user_orders:{current_user_id}:{status}:{skip}:{limit}"
    # Try the cache first
    cached_orders = redis.get(cache_key)
    if cached_orders:
        return json.loads(cached_orders)
    # Build the query (AsyncSession has no .query(); use 2.0-style select())
    stmt = select(Order).where(Order.user_id == current_user_id)
    if status:
        stmt = stmt.where(Order.status == status)
    stmt = (
        stmt.options(selectinload(Order.items))
        .order_by(Order.created_at.desc())
        .offset(skip)
        .limit(limit)
    )
    result = await db.execute(stmt)
    orders = result.scalars().all()
    # Cache for 3 minutes
    payload = [OrderResponse.model_validate(o).model_dump(mode="json") for o in orders]
    redis.setex(cache_key, 180, json.dumps(payload))
    return orders
@app.put("/orders/{order_id}/status", response_model=OrderResponse)
async def update_order_status(
    order_id: int,
    status_update: OrderStatusUpdate,
    current_user_id: int = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    redis = Depends(get_redis)
):
    """
    Update the order status.
    Related caches are invalidated afterwards.
    """
    # Check permissions (only ownership is checked here; add an admin check as needed)
    order = await verify_order_permission(order_id, current_user_id, db)
    # Update the status
    order.status = status_update.status
    order.updated_at = datetime.now()
    await db.commit()
    await db.refresh(order)
    # Invalidate caches
    invalidate_order_caches(redis, current_user_id, order_id)
    # Notify the user over WebSocket
    await send_order_status_notification(order)
    return order
@app.post("/orders/{order_id}/cancel", response_model=OrderResponse)
async def cancel_order(
    order_id: int,
    current_user_id: int = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    redis = Depends(get_redis)
):
    """
    Cancel an order.
    Orders that have shipped or been delivered cannot be cancelled.
    """
    order = await verify_order_permission(order_id, current_user_id, db)
    # Check the current status (values match OrderStatus in models.py)
    if order.status in ("shipped", "delivered"):
        raise HTTPException(
            status_code=400,
            detail="Order has shipped or been delivered and cannot be cancelled"
        )
    if order.status == "cancelled":
        raise HTTPException(status_code=400, detail="Order is already cancelled")
    # Cancel the order
    order.status = "cancelled"
    order.updated_at = datetime.now()
    # Return the stock (order.items was eager-loaded by verify_order_permission)
    for item in order.items:
        await product_client.restore_product_stock(
            item.product_id,
            item.quantity
        )
    await db.commit()
    await db.refresh(order)
    # Invalidate caches
    invalidate_order_caches(redis, current_user_id, order_id)
    return order
# ==================== Helpers ====================
def calculate_total_amount(products: List, items: List[OrderItemCreate]) -> float:
    """Compute the order total (float for simplicity; prefer Decimal for real money)."""
    total = 0.0
    for product in products:
        for item in items:
            if product.id == item.product_id:
                total += product.price * item.quantity
    return total
def get_product_price(products: List, product_id: int) -> float:
    """Look up a product's unit price."""
    for product in products:
        if product.id == product_id:
            return product.price
    return 0.0
def invalidate_order_caches(redis, user_id: int, order_id: Optional[int] = None) -> None:
    """
    Remove cached order data for a user.
    DEL does not expand wildcards, so the list keys are found with SCAN first.
    """
    if order_id is not None:
        redis.delete(f"order:{order_id}:user:{user_id}")
    for key in redis.scan_iter(match=f"user_orders:{user_id}:*"):
        redis.delete(key)
async def send_order_status_notification(order: Order):
    """
    Send an order-status notification over WebSocket.
    The in-memory manager only reaches sockets connected to this same process;
    across processes or services, fan out through a broker such as Redis Pub/Sub.
    """
    from app.websocket_manager import manager
    notification = {
        "type": "order_status_update",
        "data": {
            "order_id": order.id,
            "status": order.status,
            "timestamp": order.updated_at.isoformat()
        }
    }
    await manager.send_personal_message(notification, order.user_id)
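`update_order_status` stores whatever status the client sends, so nothing stops a `delivered` order from going back to `pending`, while `cancel_order` hard-codes its own rule. Keeping the allowed transitions in one table makes these checks consistent. The table below is an assumed business policy built on the `OrderStatus` values from `models.py`; the enum is redefined here so the sketch runs on its own. Adjust the table to your own rules.

```python
from enum import Enum
from typing import Dict, FrozenSet, Union


class OrderStatus(str, Enum):
    # Same values as OrderStatus in models.py
    PENDING = "pending"
    PAID = "paid"
    PROCESSING = "processing"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"
    REFUNDED = "refunded"


S = OrderStatus
# Assumed policy: the statuses each status may move to
ALLOWED_TRANSITIONS: Dict[OrderStatus, FrozenSet[OrderStatus]] = {
    S.PENDING: frozenset({S.PAID, S.CANCELLED}),
    S.PAID: frozenset({S.PROCESSING, S.CANCELLED, S.REFUNDED}),
    S.PROCESSING: frozenset({S.SHIPPED, S.CANCELLED}),
    S.SHIPPED: frozenset({S.DELIVERED}),
    S.DELIVERED: frozenset({S.REFUNDED}),
    S.CANCELLED: frozenset(),
    S.REFUNDED: frozenset(),
}


class InvalidTransition(ValueError):
    pass


def check_transition(
    current: Union[str, OrderStatus], target: Union[str, OrderStatus]
) -> OrderStatus:
    """Return the target status if the move is allowed, otherwise raise InvalidTransition.

    Unknown status strings raise ValueError from the enum constructor.
    """
    cur, tgt = OrderStatus(current), OrderStatus(target)
    if tgt not in ALLOWED_TRANSITIONS[cur]:
        raise InvalidTransition(f"cannot change order status from {cur.value} to {tgt.value}")
    return tgt
```

`update_order_status` would call `check_transition(order.status, status_update.status)` before assigning and map `InvalidTransition` to an HTTP 400 or 409. `cancel_order` could reuse it with `target="cancelled"`, which reproduces its current rule.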
3. Celery Tasks (services/order_service/tasks.py)
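from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.models import Order
from app.grpc_clients import UserServiceClient, ProductServiceClient
from shared.config import settings
from datetime import datetime
import os
import time
import smtplib
from email.mime.text import MIMEText
# Create the Celery app
celery_app = Celery(
    "order_tasks",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND
)
# Celery tasks are synchronous, so the worker uses a regular SQLAlchemy Session rather
# than the service's AsyncSession. SYNC_DATABASE_URL is an assumed setting name,
# e.g. "postgresql+psycopg2://user:password@host/orders".
engine = create_engine(settings.SYNC_DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine)
# gRPC clients. They are called synchronously below, which assumes blocking variants
# of the clients (the FastAPI service awaits async ones).
user_client = UserServiceClient()
product_client = ProductServiceClient()
@celery_app.task(bind=True, max_retries=3)
def process_order_payment(self, order_id: int):
    """
    Process the payment for an order.
    Args:
        order_id: order ID
    """
    print(f"Processing payment for order {order_id}")
    with SessionLocal() as db:
        order = db.get(Order, order_id)
        # A task may be delivered more than once; only charge orders that are still pending
        if order is None or order.status != "pending":
            print(f"Order {order_id} is missing or no longer pending, skipping")
            return
        try:
            # Simulated payment; a real project calls the payment gateway API here
            if not simulate_payment_process(order):
                raise RuntimeError("payment was declined")
            order.status = "paid"
            order.paid_at = datetime.now()
            db.commit()
            print(f"Payment for order {order_id} succeeded")
        except Exception as exc:
            db.rollback()
            print(f"Payment processing failed: {exc}")
            if self.request.retries >= self.max_retries:
                # Out of retries: cancel the order and return its stock
                # (OrderStatus has no "payment_failed" value)
                order.status = "cancelled"
                for item in order.items:
                    product_client.restore_product_stock(item.product_id, item.quantity)
                db.commit()
                raise
            raise self.retry(exc=exc, countdown=60)
@celery_app.task(bind=True)
def send_order_confirmation(self, order_id: int):
    """
    Send the order confirmation email.
    Args:
        order_id: order ID
    """
    print(f"Sending confirmation email for order {order_id}")
    try:
        with SessionLocal() as db:
            order = db.get(Order, order_id)
            # Fetch the user
            user = user_client.get_user(order.user_id)
            subject = f"Order confirmation - #{order.id}"
            # Build the body line by line so it carries no source-code indentation
            lines = [
                f"Dear {user.full_name},",
                "Your order has been placed successfully!",
                f"Order number: {order.id}",
                f"Order time: {order.created_at}",
                f"Order amount: ¥{order.total_amount:.2f}",
                f"Shipping address: {order.shipping_address}",
                "Order details:",
            ]
            for item in order.items:
                product = product_client.get_product(item.product_id)
                lines.append(f"- {product.name} x {item.quantity} = ¥{item.price * item.quantity:.2f}")
            lines.append(f"Total: ¥{order.total_amount:.2f}")
            lines.append("Thank you for your purchase!")
            body = "\n".join(lines)
        # Send the email
        send_email(user.email, subject, body)
        print(f"Confirmation email for order {order_id} sent")
    except Exception as exc:
        print(f"Failed to send email: {exc}")
        raise self.retry(exc=exc, countdown=120)
@celery_app.task(bind=True)
def generate_invoice(self, order_id: int):
    """
    Generate the invoice.
    Args:
        order_id: order ID
    """
    print(f"Generating invoice for order {order_id}")
    try:
        with SessionLocal() as db:
            order = db.get(Order, order_id)
            user = user_client.get_user(order.user_id)
            # Real projects would render a PDF with reportlab or WeasyPrint
            invoice_path = f"./invoices/invoice_{order.id}.pdf"
            generate_pdf_invoice(order, user, invoice_path)
            # Record the invoice path on the order
            order.invoice_path = invoice_path
            db.commit()
        print(f"Invoice for order {order_id} generated: {invoice_path}")
    except Exception as exc:
        print(f"Failed to generate invoice: {exc}")
        raise self.retry(exc=exc, countdown=180)
@celery_app.task(bind=True)
def notify_logistics(self, order_id: int):
    """
    Notify the logistics system.
    Args:
        order_id: order ID
    """
    print(f"Notifying logistics system about order {order_id}")
    try:
        with SessionLocal() as db:
            order = db.get(Order, order_id)
            # Call the logistics system's API
            notify_logistics_api(order)
        print(f"Logistics system notified about order {order_id}")
    except Exception as exc:
        print(f"Failed to notify logistics system: {exc}")
        raise self.retry(exc=exc, countdown=300)
# ==================== Helpers ====================
def simulate_payment_process(order: Order) -> bool:
    """Simulate payment processing."""
    # A real project calls Alipay, WeChat Pay or another payment gateway here
    time.sleep(2)  # simulate network latency
    return True  # assume the payment succeeds
def send_email(to_email: str, subject: str, body: str):
    """Build the email; the actual SMTP send is stubbed out in this example."""
    msg = MIMEText(body, 'plain', 'utf-8')
    msg['Subject'] = subject
    msg['From'] = 'noreply@example.com'
    msg['To'] = to_email
    # Real sending, once an SMTP server is configured:
    # with smtplib.SMTP('smtp.example.com', 587) as smtp:
    #     smtp.starttls()
    #     smtp.send_message(msg)
    print(f"[stub] email to {to_email}: {subject}")
def generate_pdf_invoice(order: Order, user, output_path: str):
    """Write the invoice file (placeholder: plain text, not a real PDF)."""
    # Use reportlab or WeasyPrint to render an actual PDF
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("Invoice\n")
        f.write(f"Order number: {order.id}\n")
        f.write(f"Customer: {user.full_name}\n")
        f.write(f"Amount: ¥{order.total_amount:.2f}\n")
def notify_logistics_api(order: Order):
    """Notify the logistics system's API."""
    # Replace with the real logistics API call, for example:
    # import requests
    # requests.post(
    #     "https://logistics.example.com/api/orders",
    #     json={"order_id": order.id, ...},
    #     timeout=10,
    # )
    pass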
4. Data models (services/order_service/models.py)
from sqlalchemy import Column, Integer, String, Numeric, DateTime, Text, ForeignKey, Enum
from sqlalchemy.orm import declarative_base, relationship
from datetime import datetime
import enum
Base = declarative_base()
class OrderStatus(str, enum.Enum):
    """Order status"""
    PENDING = "pending"
    PAID = "paid"
    PROCESSING = "processing"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"
    REFUNDED = "refunded"
class Order(Base):
    """Order model"""
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True, index=True)
    # Users live in the user service's database, so there is no ForeignKey here;
    # the ID is resolved through the user service's gRPC API
    user_id = Column(Integer, nullable=False, index=True)
    status = Column(Enum(OrderStatus), default=OrderStatus.PENDING, nullable=False, index=True)
    # Fixed-point Numeric avoids binary floating-point rounding errors in money amounts
    total_amount = Column(Numeric(10, 2), nullable=False)
    shipping_address = Column(Text, nullable=False)
    tracking_number = Column(String(100), nullable=True)
    invoice_path = Column(String(255), nullable=True)
    created_at = Column(DateTime, default=datetime.now, nullable=False)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
    paid_at = Column(DateTime, nullable=True)
    shipped_at = Column(DateTime, nullable=True)
    delivered_at = Column(DateTime, nullable=True)
    # Relationships
    items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")
    def __repr__(self):
        return f"<Order(id={self.id}, user_id={self.user_id}, status={self.status})>"
class OrderItem(Base):
    """Order line item"""
    __tablename__ = "order_items"
    id = Column(Integer, primary_key=True, index=True)
    order_id = Column(Integer, ForeignKey("orders.id"), nullable=False)
    # Products belong to the product service, so again no cross-service ForeignKey
    product_id = Column(Integer, nullable=False)
    quantity = Column(Integer, nullable=False)
    price = Column(Numeric(10, 2), nullable=False)  # Unit price at the time of ordering
    # Relationships
    order = relationship("Order", back_populates="items")
    def __repr__(self):
        return f"<OrderItem(id={self.id}, order_id={self.order_id}, product_id={self.product_id})>"
5. gRPC clients (services/order_service/grpc_clients.py)
import grpc
import user_service_pb2
import user_service_pb2_grpc
import product_service_pb2
import product_service_pb2_grpc
from app.config import settings
class UserServiceClient:
    """User service gRPC client (uses grpc.aio, so calls do not block the event loop)"""
    def __init__(self):
        self.address = f"{settings.USER_SERVICE_HOST}:{settings.USER_SERVICE_PORT}"
    def get_channel(self) -> grpc.aio.Channel:
        """Create an async gRPC channel; use it with `async with` so it is closed afterwards"""
        # One channel per call keeps the example simple; under load, reuse a long-lived channel
        return grpc.aio.insecure_channel(self.address)
    async def get_user(self, user_id: int):
        """Get a user"""
        async with self.get_channel() as channel:
            stub = user_service_pb2_grpc.UserServiceStub(channel)
            request = user_service_pb2.GetUserRequest(user_id=user_id)
            response = await stub.GetUser(request)
            # HasField tells an unset message field apart from an empty one
            return response.user if response.HasField("user") else None
    async def batch_get_users(self, user_ids: list[int]):
        """Get several users in one call"""
        async with self.get_channel() as channel:
            stub = user_service_pb2_grpc.UserServiceStub(channel)
            request = user_service_pb2.GetUsersRequest(user_ids=user_ids)
            response = await stub.GetUsers(request)
            return list(response.users)
class ProductServiceClient:
    """Product service gRPC client (grpc.aio)"""
    def __init__(self):
        self.address = f"{settings.PRODUCT_SERVICE_HOST}:{settings.PRODUCT_SERVICE_PORT}"
    def get_channel(self) -> grpc.aio.Channel:
        """Create an async gRPC channel; use it with `async with` so it is closed afterwards"""
        return grpc.aio.insecure_channel(self.address)
    async def get_product(self, product_id: int):
        """Get a product"""
        async with self.get_channel() as channel:
            stub = product_service_pb2_grpc.ProductServiceStub(channel)
            request = product_service_pb2.GetProductRequest(product_id=product_id)
            response = await stub.GetProduct(request)
            return response.product if response.HasField("product") else None
    async def batch_get_products(self, product_ids: list[int]):
        """Get several products in one call"""
        async with self.get_channel() as channel:
            stub = product_service_pb2_grpc.ProductServiceStub(channel)
            request = product_service_pb2.GetProductsRequest(product_ids=product_ids)
            response = await stub.GetProducts(request)
            return list(response.products)
    async def update_product_stock(self, product_id: int, new_stock: int):
        """Set a product's stock level"""
        async with self.get_channel() as channel:
            stub = product_service_pb2_grpc.ProductServiceStub(channel)
            request = product_service_pb2.UpdateProductStockRequest(
                product_id=product_id,
                stock=new_stock
            )
            response = await stub.UpdateProductStock(request)
            return response.product
    async def restore_product_stock(self, product_id: int, quantity: int):
        """Restore product stock (e.g. when an order is cancelled)"""
        async with self.get_channel() as channel:
            stub = product_service_pb2_grpc.ProductServiceStub(channel)
            request = product_service_pb2.RestoreProductStockRequest(
                product_id=product_id,
                quantity=quantity
            )
            response = await stub.RestoreProductStock(request)
            return response.success

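Performance optimization tips

1. Database optimization
# Add indexes
from sqlalchemy import Index, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
# Composite and sorted indexes (these can also be declared inside the model via __table_args__)
Index('idx_order_user_status', Order.user_id, Order.status)
Index('idx_order_created', Order.created_at.desc())
# Use selectinload to eager-load related rows and avoid N+1 queries
async def get_order_with_items(db: AsyncSession, order_id: int):
    query = select(Order).options(
        selectinload(Order.items)
    ).where(Order.id == order_id)
    result = await db.execute(query)
    return result.scalar_one_or_none()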
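To confirm that a composite index such as idx_order_user_status is actually used, ask the database for its query plan. A minimal, self-contained sketch using Python's built-in sqlite3, where SQLite stands in for the production database and the table is a stripped-down, hypothetical version of orders (PostgreSQL and MySQL have their own EXPLAIN output formats):

```python
import sqlite3


def order_query_plan() -> list[str]:
    """Return SQLite's plan for a user_id + status lookup on a minimal orders table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, "
        "status TEXT, created_at TEXT)"
    )
    # Same column order as the composite index in the model: (user_id, status)
    conn.execute("CREATE INDEX idx_order_user_status ON orders (user_id, status)")
    rows = conn.execute(
        "EXPLAIN QUERY PLAN SELECT * FROM orders WHERE user_id = ? AND status = ?",
        (42, "paid"),
    ).fetchall()
    conn.close()
    # The last column of each plan row is the human-readable detail text
    return [row[-1] for row in rows]


for line in order_query_plan():
    print(line)
```

If the detail line mentions the index name, the filter on both columns is served by the index rather than a full table scan.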
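2. Caching strategy
import json
from app.redis_client import get_redis_client
# Cache hot data. Assumes get_redis_client() returns an async (redis.asyncio) client, and that
# `order`/`orders` are Pydantic response models, not ORM objects (cache_order_data is an illustrative name)
async def cache_order_data(order_id: int, user_id: int, order, orders: list):
    redis = get_redis_client()
    # Order detail cache (5 minutes)
    await redis.setex(
        f"order:{order_id}",
        300,
        order.model_dump_json()
    )
    # User order list cache (3 minutes)
    await redis.setex(
        f"user_orders:{user_id}",
        180,
        json.dumps([o.model_dump(mode="json") for o in orders])
    )
# Use a caching decorator (it must wrap with functools.wraps so FastAPI still sees the parameters)
@app.get("/orders/{order_id}")
@cached(prefix="order", ttl=300)
async def get_order(order_id: int):
    # The return value is cached automatically
    pass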
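The `cached` decorator itself is not shown in this section. As a hedged illustration of the behavior it needs, here is a hypothetical in-memory stand-in: a plain dict replaces Redis, and `cached`, `prefix`, and `ttl` mirror the names used in the route above rather than any specific library's API. The detail that matters for FastAPI is `functools.wraps`, which keeps the original signature visible so path and query parameters are still parsed:

```python
import asyncio
import functools
import time


def cached(prefix: str, ttl: float):
    """Cache an async function's result per argument tuple for `ttl` seconds (in memory)."""
    store: dict[str, tuple[float, object]] = {}  # cache key -> (expires_at, value)

    def decorator(func):
        @functools.wraps(func)  # preserves __name__ and the signature FastAPI inspects
        async def wrapper(*args, **kwargs):
            key = f"{prefix}:{args}:{sorted(kwargs.items())}"
            hit = store.get(key)
            if hit is not None and hit[0] > time.monotonic():
                return hit[1]  # fresh entry: skip the function call
            value = await func(*args, **kwargs)
            store[key] = (time.monotonic() + ttl, value)
            return value

        return wrapper

    return decorator


calls = 0


@cached(prefix="order", ttl=300)
async def get_order(order_id: int):
    global calls
    calls += 1
    return {"id": order_id}


async def main():
    await get_order(1)
    await get_order(1)  # served from the cache
    await get_order(2)


asyncio.run(main())
print(calls)  # 2
```

A Redis-backed version would swap the dict for `setex`/`get` and serialize the value, but the key scheme and the `functools.wraps` requirement stay the same.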
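3. Connection pool tuning
import redis.asyncio as aioredis
from sqlalchemy.ext.asyncio import create_async_engine
# Database connection pool
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,              # Connections kept open in the pool
    max_overflow=10,           # Extra connections allowed beyond pool_size under load
    pool_timeout=30,           # Seconds to wait for a free connection before raising
    pool_recycle=3600,         # Replace connections older than this many seconds
    pool_pre_ping=True         # Check each connection before use and replace dead ones
)
# Redis connection pool (async client, for use in async endpoints)
redis_pool = aioredis.ConnectionPool(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    max_connections=20,
    decode_responses=True
)
# Use a distinct name so the client does not shadow the redis module
redis_client = aioredis.Redis(connection_pool=redis_pool)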
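Pool settings apply per process, so the limit that matters is the total across all workers. A small worked example; the worker count of 4 is an assumption for illustration, and 100 is PostgreSQL's default `max_connections`:

```python
def max_db_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on simultaneous DB connections: each process may open pool_size + max_overflow."""
    return processes * (pool_size + max_overflow)


# Hypothetical deployment: 4 worker processes, each with pool_size=20 and max_overflow=10
total = max_db_connections(processes=4, pool_size=20, max_overflow=10)
print(total)  # 120

# Compare against the database server's connection limit
db_limit = 100
print(total <= db_limit)  # False
```

When the product exceeds the server limit, either shrink `pool_size`/`max_overflow` per process or put a connection pooler in front of the database.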
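4. Async optimization
# Call several gRPC services concurrently
import asyncio
async def create_order(order_data: OrderCreate, current_user_id: int):
    # Assumes OrderCreate has an `items` list whose entries carry product_id
    product_ids = [item.product_id for item in order_data.items]
    # Query the user and product services at the same time. The calls only overlap if the
    # client methods are truly non-blocking (e.g. grpc.aio); blocking stubs inside
    # `async def` would still run one after the other.
    user, products = await asyncio.gather(
        user_client.get_user(current_user_id),
        product_client.batch_get_products(product_ids),
    )
    # Continue processing...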
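Whether `asyncio.gather` actually saves time depends on the awaited calls yielding to the event loop. A runnable sketch where `asyncio.sleep` stands in for a non-blocking RPC and `time.sleep` for a blocking one; both are simulations and no gRPC is involved:

```python
import asyncio
import time


async def non_blocking_call(delay: float) -> float:
    await asyncio.sleep(delay)  # yields to the event loop, like an awaited grpc.aio call
    return delay


async def blocking_call(delay: float) -> float:
    time.sleep(delay)  # blocks the whole event loop, like a sync stub inside async def
    return delay


async def timed_gather(call, delay: float) -> float:
    """Run two calls through asyncio.gather and return the elapsed wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(call(delay), call(delay))
    return time.perf_counter() - start


non_blocking = asyncio.run(timed_gather(non_blocking_call, 0.2))
blocking = asyncio.run(timed_gather(blocking_call, 0.2))
print(f"non-blocking: {non_blocking:.2f}s, blocking: {blocking:.2f}s")
```

The non-blocking pair finishes in roughly the time of one call, while the blocking pair takes the sum of both.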

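VI. Common Problems and Solutions

Problem 1: Celery tasks never run

Symptom: After a task is submitted, its state stays PENDING and it never executes. (Celery also reports PENDING for any task ID it has no record of, so the state alone does not prove that a worker received the task.)

Possible causes

  1. The Celery worker is not running or has crashed
  2. The connection to the broker fails
  3. Task routing is misconfigured
  4. The worker does not consume the queue the task was routed to

Solutions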

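# 1. Check that workers are alive and what they are running
# Run from the command line:
celery -A app.tasks inspect ping
celery -A app.tasks inspect active
# 2. Check the broker connection
import redis
r = redis.from_url(settings.CELERY_BROKER_URL)
r.ping()  # Should return True
# 3. Make sure task routing is correct
# Route tasks to queues in the Celery config
celery_app.conf.task_routes = {
    'app.tasks.send_email_task': {'queue': 'email'},
    'app.tasks.generate_pdf_report_task': {'queue': 'reports'},
}
# 4. Start the worker on those queues; include the default "celery" queue for unrouted tasks
celery -A app.tasks worker --loglevel=info --queues=reports,email,celery
# 5. Add error handling
import logging
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=3)
def my_task(self, *args, **kwargs):
    try:
        # Task logic
        pass
    except Exception as exc:
        # Log the error
        logger.error(f"Task failed: {str(exc)}")
        # Retry
        raise self.retry(exc=exc, countdown=60)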
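Causes 3 and 4 interact: a task without a route goes to the default queue (named `celery` unless `task_default_queue` is changed), and a worker started with `--queues=reports,email` never consumes it, so the task stays PENDING. A simplified, hypothetical model of that lookup (exact task-name matches only; real Celery routing also supports glob/regex patterns and router functions, and `app.tasks.other_task` is a made-up name):

```python
def resolve_queue(task_name: str, task_routes: dict[str, dict], default_queue: str = "celery") -> str:
    """Queue a task is sent to under a dict-style task_routes config."""
    return task_routes.get(task_name, {}).get("queue", default_queue)


def will_be_consumed(task_name: str, task_routes: dict[str, dict], worker_queues: list[str]) -> bool:
    """True if a worker listening on worker_queues receives this task."""
    return resolve_queue(task_name, task_routes) in worker_queues


routes = {
    "app.tasks.send_email_task": {"queue": "email"},
    "app.tasks.generate_pdf_report_task": {"queue": "reports"},
}
worker = ["reports", "email"]

print(will_be_consumed("app.tasks.send_email_task", routes, worker))          # True
print(will_be_consumed("app.tasks.other_task", routes, worker))               # False
print(will_be_consumed("app.tasks.other_task", routes, worker + ["celery"]))  # True
```

The unrouted task only gets picked up once the worker also listens on the default queue.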

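Problem 2: gRPC connection timeouts

Symptom: Calls to gRPC services frequently fail with timeout errors.

Possible causes

  1. Network latency or packet loss
  2. Slow server-side processing
  3. Exhausted connection pool
  4. Poorly chosen timeout settings

Solutions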

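# 1. Set a default deadline and automatic retries on the channel
import asyncio
import itertools
import json
import grpc
channel = grpc.insecure_channel(
    server_address,
    options=[
        ('grpc.max_send_message_length', 100 * 1024 * 1024),  # 100MB
        ('grpc.max_receive_message_length', 100 * 1024 * 1024),
        ('grpc.enable_retries', 1),
        ('grpc.service_config', json.dumps({
            "methodConfig": [{
                "name": [{}],      # An empty name entry applies this config to every method
                "timeout": "5s",   # Default deadline per call (it covers all retry attempts)
                "retryPolicy": {
                    "maxAttempts": 5,
                    "initialBackoff": "1s",
                    "maxBackoff": "10s",
                    "backoffMultiplier": 2,
                    "retryableStatusCodes": ["UNAVAILABLE"]
                }
            }]
        }))
    ]
)
# A deadline can also be passed per call, e.g. stub.GetUser(request, timeout=5)
# 2. Spread load over a few long-lived channels
# grpcio has no built-in channel pool class. One channel already multiplexes many concurrent
# RPCs over HTTP/2, so a small hand-rolled round-robin pool is usually enough.
class GrpcChannelPool:
    def __init__(self, address: str, size: int = 4):
        # Create the pool inside the running event loop (e.g. in FastAPI's lifespan handler)
        self._channels = [
            # A local subchannel pool keeps the channels from sharing one connection
            grpc.aio.insecure_channel(address, options=[('grpc.use_local_subchannel_pool', 1)])
            for _ in range(size)
        ]
        self._next = itertools.cycle(self._channels)
    def get_channel(self) -> grpc.aio.Channel:
        return next(self._next)
    async def close(self):
        for ch in self._channels:
            await ch.close()
# 3. Add a circuit breaker (third-party `circuitbreaker` package)
from circuitbreaker import circuit
@circuit(failure_threshold=5, recovery_timeout=30, expected_exception=grpc.RpcError)
async def call_grpc_service():
    # Call the gRPC service
    pass
# 4. Application-level retry on UNAVAILABLE with exponential backoff
# Note: this stacks on top of the channel's retryPolicy, multiplying the total number of attempts
async def call_with_retry(service_func, max_retries=3, delay=1):
    for attempt in range(max_retries):
        try:
            return await service_func()
        except grpc.RpcError as e:
            # Retry only transient "unavailable" errors, and never after the last attempt
            if e.code() == grpc.StatusCode.UNAVAILABLE and attempt < max_retries - 1:
                await asyncio.sleep(delay * 2 ** attempt)
                continue
            raise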
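What `failure_threshold` and `recovery_timeout` mean can be seen in a minimal, hand-rolled breaker. This is an illustrative sketch of the closed/open/half-open pattern, not the `circuitbreaker` package's implementation; `CircuitBreaker`, `CircuitOpenError`, and the injectable `clock` are hypothetical names, and the clock exists only so the behavior can be shown without waiting:

```python
import time
from typing import Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.recovery_timeout:
            return "half-open"  # let a trial call through
        return "open"

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            raise CircuitOpenError("circuit open, failing fast")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            # A failed trial call in half-open, or too many failures, (re)opens the circuit
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the failure counter
        self.failures = 0
        self.opened_at = None
        return result


# Usage with a fake clock
now = 0.0
breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=30, clock=lambda: now)


def flaky_rpc():
    raise ConnectionError("service unavailable")


for _ in range(2):
    try:
        breaker.call(flaky_rpc)
    except ConnectionError:
        pass

print(breaker.state)  # open
now = 31.0
print(breaker.state)  # half-open
print(breaker.call(lambda: "ok"), breaker.state)  # ok closed
```

While open, calls fail immediately instead of piling up behind a struggling service; after `recovery_timeout` one trial call decides whether to close again.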

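Problem 3: WebSocket connections drop frequently

Symptom: WebSocket connections are established but then frequently close on their own.

Possible causes

  1. Unstable network
  2. Nginx timeout settings (idle proxied connections are closed after proxy_read_timeout)
  3. Insufficient server resources
  4. No client heartbeat mechanism

Solutions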

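# 1. Implement a heartbeat
import asyncio
import json
import time
from typing import Optional
from fastapi import WebSocket
class WebSocketConnection:
    def __init__(self, websocket: WebSocket, interval: float = 20, timeout: float = 45):
        self.websocket = websocket
        self.interval = interval        # Seconds between pings
        self.timeout = timeout          # Close if no pong arrives for this many seconds
        self.last_pong = time.monotonic()
        self.heartbeat_task: Optional[asyncio.Task] = None
    async def start_heartbeat(self):
        """Start the heartbeat task"""
        self.heartbeat_task = asyncio.create_task(self._heartbeat_loop())
    async def _heartbeat_loop(self):
        """Send a ping every `interval` seconds; close the socket if pongs stop arriving"""
        try:
            while True:
                await self.websocket.send_json({"type": "ping"})
                await asyncio.sleep(self.interval)
                if time.monotonic() - self.last_pong > self.timeout:
                    # No pong in time: treat the peer as gone
                    await self.websocket.close()
                    break
        except Exception:
            # Sending failed, so the connection is already closed
            pass
    def handle_message(self, data: str) -> Optional[dict]:
        """
        Call from the endpoint's single receive loop for every incoming text message.
        Records pongs and returns None for them; returns other messages parsed.
        Only one coroutine may read the socket, so the heartbeat never calls receive_* itself.
        """
        message = json.loads(data)
        if message.get("type") == "pong":
            self.last_pong = time.monotonic()
            return None
        return message
    async def disconnect(self):
        """Stop the heartbeat and close the connection"""
        if self.heartbeat_task:
            self.heartbeat_task.cancel()
        await self.websocket.close()
# 2. Configure Nginx timeouts
# nginx.conf
location /ws/ {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    # proxy_read_timeout defaults to 60s and closes WebSockets idle for longer,
    # so raise it and keep the heartbeat interval well below it
    proxy_connect_timeout 60s;   # Nginx notes this usually cannot exceed 75s
    proxy_send_timeout 300s;
    proxy_read_timeout 300s;
}
# 3. Graceful shutdown
# (on_event still works but is deprecated in recent FastAPI versions in favor of a lifespan handler)
@app.on_event("shutdown")
async def shutdown_event():
    """Notify clients and close all WebSocket connections when the app shuts down"""
    for connection in manager.active_connections[:]:
        try:
            await connection.send_json({
                "type": "system",
                "data": {"message": "Server is shutting down"}
            })
            await connection.close()
        except Exception:
            # The client may already be gone; keep closing the others
            pass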
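A heartbeat can be exercised without a running server. A minimal sketch of a timestamp-based heartbeat (the receive side records when the last pong arrived; the heartbeat closes the socket once that is too old), assuming a hypothetical `FakeWebSocket` test double that only records sent messages and closure, with intervals shrunk to milliseconds. It checks that a peer which never answers pings gets disconnected:

```python
import asyncio
import time


class FakeWebSocket:
    """Test double standing in for fastapi.WebSocket: records sends and closure."""

    def __init__(self):
        self.sent: list[dict] = []
        self.closed = False

    async def send_json(self, data: dict):
        if self.closed:
            raise RuntimeError("send after close")
        self.sent.append(data)

    async def close(self, code: int = 1000):
        self.closed = True


async def heartbeat(ws, get_last_pong, interval: float, timeout: float):
    """Ping every `interval` seconds; close once no pong has arrived for `timeout` seconds."""
    while True:
        await ws.send_json({"type": "ping"})
        await asyncio.sleep(interval)
        if time.monotonic() - get_last_pong() > timeout:
            await ws.close()
            return


async def silent_peer_demo() -> FakeWebSocket:
    ws = FakeWebSocket()
    started = time.monotonic()  # the peer's last sign of life; it never sends a pong
    await asyncio.wait_for(
        heartbeat(ws, lambda: started, interval=0.01, timeout=0.03), timeout=1
    )
    return ws


ws = asyncio.run(silent_peer_demo())
print(ws.closed, len(ws.sent))
```

In a real endpoint, `get_last_pong` would read a timestamp updated by the single receive loop whenever a `pong` message arrives.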

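Problem 4: Redis cache avalanche

Symptom: Many cache entries expire at the same time, and the resulting burst of cache misses sends database load soaring.

Possible causes

  1. All keys are given the same TTL
  2. Poor cache warm-up (hot keys loaded all at once, so they also expire together)
  3. Poorly designed cache keys

Solutions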

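import asyncio
import random
import time
import uuid
import redis
# Sync client for brevity; in async code use redis.asyncio so calls do not block the event loop
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
# 1. Randomize expiry times
def set_cache_with_jitter(key: str, value: str, base_ttl: int = 3600):
    """
    Set a cache entry with a randomized TTL.
    Args:
        key: cache key
        value: cache value
        base_ttl: base TTL in seconds
    """
    # Add ±10% random jitter
    jitter = random.randint(-int(base_ttl * 0.1), int(base_ttl * 0.1))
    ttl = base_ttl + jitter
    redis_client.setex(key, ttl, value)
# 2. Multi-level cache
class MultiLevelCache:
    """Multi-level cache: L1 (process memory) -> L2 (Redis) -> DB"""
    def __init__(self, l1_ttl: float = 60):
        # key -> (expires_at, value); a short L1 TTL bounds how stale local copies can get
        self.l1_cache: dict[str, tuple[float, str]] = {}
        self.l1_ttl = l1_ttl
    async def get(self, key: str):
        # Try the L1 cache
        entry = self.l1_cache.get(key)
        if entry is not None and entry[0] > time.monotonic():
            return entry[1]
        # Try the L2 cache (Redis)
        value = redis_client.get(key)
        if value is None:
            # Fall back to the database (get_from_db is an application-specific loader)
            value = await get_from_db(key)
            set_cache_with_jitter(key, value)
        # Refresh the L1 entry
        self.l1_cache[key] = (time.monotonic() + self.l1_ttl, value)
        return value
# 3. Cache warm-up
async def warm_up_cache():
    """Preload hot keys, spreading the work out over time"""
    hot_keys = await get_hot_keys()  # application-specific
    for key in hot_keys:
        value = await get_from_db(key)
        set_cache_with_jitter(key, value, 3600)
        # Pause briefly so the database is not hit all at once
        await asyncio.sleep(0.1)
# 4. Rebuild under a mutex so only one process queries the database
# Atomically delete the lock only if it still holds our token
RELEASE_LOCK_LUA = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""
def update_cache_with_lock(key: str, load_value, ttl: int = 3600):
    """
    Rebuild a cache entry while holding a lock,
    so that concurrent misses do not all hit the database.
    """
    lock_key = f"cache_lock:{key}"
    token = str(uuid.uuid4())
    # SET NX EX: take the lock only if it is free; it expires after 10s even if we crash
    if redis_client.set(lock_key, token, nx=True, ex=10):
        try:
            value = load_value()
            set_cache_with_jitter(key, value, ttl)
            return value
        finally:
            redis_client.eval(RELEASE_LOCK_LUA, 1, lock_key, token)
    # Another process is rebuilding: wait briefly, then read the cache again
    time.sleep(0.1)
    # May still be None if the rebuild has not finished; the caller can retry or query the DB
    return redis_client.get(key)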
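Why jitter helps can be checked with a quick simulation: keys written in the same second with a fixed 3600-second TTL all expire in the same second, while ±10% jitter spreads their expirations over 721 distinct seconds. A self-contained sketch (the key count and seed are arbitrary):

```python
import random
from collections import Counter


def expiry_peak(n_keys: int, base_ttl: int, jitter_ratio: float, seed: int = 0) -> int:
    """Largest number of keys expiring in the same second when all are written at t=0."""
    rng = random.Random(seed)
    spread = int(base_ttl * jitter_ratio)
    expirations = Counter(
        base_ttl + rng.randint(-spread, spread) for _ in range(n_keys)
    )
    return max(expirations.values())


print(expiry_peak(1000, 3600, 0.0))  # 1000: every key expires at t=3600
print(expiry_peak(1000, 3600, 0.1))  # far smaller: expirations spread over 721 seconds
```

The peak per second, not the total number of misses, is what overloads the database, and jitter flattens that peak.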

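Problem 5: Distributed lock deadlock

Symptom: Several processes wait on distributed locks held by one another, and the system hangs.

Possible causes

  1. Locks are acquired in inconsistent orders
  2. Locks are held for too long
  3. Locks are not released when an exception occurs
  4. Network partitions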
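The first cause is the classic one: process A holds lock `a` and waits for `b` while process B holds `b` and waits for `a`. A self-contained sketch with in-process `threading.Lock` objects standing in for Redis locks shows that sorting lock names before acquiring them removes the cycle; two workers that request the same pair in opposite orders both finish:

```python
import threading

locks = {"a": threading.Lock(), "b": threading.Lock()}
completed = []


def with_locks_in_order(names: list[str], work) -> None:
    """Acquire the named locks in sorted order, run work(), then release in reverse order."""
    ordered = sorted(names)
    for name in ordered:
        locks[name].acquire()
    try:
        work()
    finally:
        for name in reversed(ordered):
            locks[name].release()


def worker(label: str, names: list[str], rounds: int = 1000) -> None:
    for _ in range(rounds):
        with_locks_in_order(names, lambda: None)
    completed.append(label)


# The two workers ask for the locks in opposite orders
t1 = threading.Thread(target=worker, args=("A", ["a", "b"]))
t2 = threading.Thread(target=worker, args=("B", ["b", "a"]))
t1.start()
t2.start()
t1.join(timeout=5)
t2.join(timeout=5)
print(sorted(completed))  # ['A', 'B']
```

Without the `sorted()` call, the same two workers can each grab one lock and wait forever for the other.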

Solutions

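import asyncio
import logging
import uuid
import redis
logger = logging.getLogger(__name__)
# Sync client for brevity; in async services use redis.asyncio
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
# Atomically delete a lock only if it still holds our token
RELEASE_LOCK_LUA = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""
# 1. Acquire locks in one consistent order
class DeadlockFreeLock:
    """Acquires several locks in a fixed (sorted) order to avoid deadlock"""
    def __init__(self, redis_client):
        self.redis = redis_client
        self.tokens: dict[str, str] = {}  # lock key -> token this holder set
    async def acquire_multiple_locks(self, lock_names: list[str], timeout: int = 10):
        """
        Acquire several locks in sorted order to avoid deadlock.
        Args:
            lock_names: lock names
            timeout: expiry of each lock in seconds
        Returns:
            True if every lock was acquired; otherwise releases the partial set and returns False
        """
        # Sort the names so every process acquires locks in the same order
        sorted_locks = sorted(lock_names)
        acquired_locks = []
        try:
            for lock_name in sorted_locks:
                lock_key = f"lock:{lock_name}"
                token = str(uuid.uuid4())
                # SET NX EX: take the lock only if it is free, with an expiry
                if not self.redis.set(lock_key, token, nx=True, ex=timeout):
                    # Failed: release the locks already held
                    await self.release_locks(acquired_locks)
                    return False
                self.tokens[lock_key] = token
                acquired_locks.append(lock_key)
            return True
        except Exception:
            # On error, release everything acquired so far
            await self.release_locks(acquired_locks)
            raise
    async def release_locks(self, lock_keys: list[str]):
        """Release locks, deleting each one only if we still own it"""
        for lock_key in lock_keys:
            token = self.tokens.pop(lock_key, None)
            if token is not None:
                self.redis.eval(RELEASE_LOCK_LUA, 1, lock_key, token)
# 2. Always give locks an expiry
def get_lock_with_timeout(lock_name: str, timeout: int = 10):
    """
    Acquire a lock with an expiry, so it can never be held forever.
    Returns a LockObject, or None if the lock is taken.
    """
    lock_key = f"lock:{lock_name}"
    lock_value = str(uuid.uuid4())  # Unique token identifying this holder
    # Set the lock with an expiry
    acquired = redis_client.set(lock_key, lock_value, nx=True, ex=timeout)
    if not acquired:
        return None
    # Return a lock object that releases itself when used as a context manager
    return LockObject(lock_key, lock_value, redis_client)
class LockObject:
    """Lock handle with context-manager support"""
    def __init__(self, key: str, value: str, redis_client):
        self.key = key
        self.value = value
        self.redis = redis_client
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
    def release(self):
        """Release the lock only if we still own it (atomic check-and-delete in Lua)"""
        self.redis.eval(RELEASE_LOCK_LUA, 1, self.key, self.value)
# 3. Use the try-lock pattern
async def try_lock_execute(lock_name: str, func, max_retries=3, retry_delay=1):
    """
    Try to acquire the lock and run func.
    If the lock is busy, retry a few times and then give up.
    """
    for attempt in range(max_retries):
        lock = get_lock_with_timeout(lock_name, timeout=10)
        if lock:
            with lock:
                return func()
        # Lock is busy: wait before the next attempt
        if attempt < max_retries - 1:
            await asyncio.sleep(retry_delay)
    # Out of retries: give up
    raise TimeoutError(f"Failed to acquire lock after {max_retries} attempts")
# 4. Detect locks that can never be released
def detect_deadlock():
    """
    Flag locks without an expiry (TTL -1): they are never released automatically,
    so a crashed holder would block every waiter forever.
    """
    # SCAN instead of KEYS so the check does not block Redis
    for lock_key in redis_client.scan_iter(match="lock:*"):
        ttl = redis_client.ttl(lock_key)
        if ttl == -1:
            logger.warning(f"Lock {lock_key} has no expiry, possible deadlock")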
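Why locks store a unique token and are released with a compare-and-delete: if a holder stalls past the expiry, another process may already own the lock, and an unconditional delete would remove that process's lock. A self-contained sketch using a hypothetical `FakeRedis` class (an in-memory stand-in with a manual clock, not a Redis client) and a Python equivalent of the Lua check:

```python
class FakeRedis:
    """In-memory stand-in for SET NX EX / GET / DEL with a manual clock."""

    def __init__(self):
        self.now = 0.0
        self.data: dict[str, tuple[str, float]] = {}  # key -> (value, expires_at)

    def get(self, key):
        item = self.data.get(key)
        if item is None or item[1] <= self.now:
            self.data.pop(key, None)  # expired keys behave as missing
            return None
        return item[0]

    def set_nx_ex(self, key, value, ex):
        if self.get(key) is not None:
            return False
        self.data[key] = (value, self.now + ex)
        return True

    def delete(self, key):
        self.data.pop(key, None)


def release_if_owner(r, key, token) -> bool:
    """Python equivalent of the Lua check-and-delete (atomic in Redis, trivially so here)."""
    if r.get(key) == token:
        r.delete(key)
        return True
    return False


r = FakeRedis()
r.set_nx_ex("lock:order:1", "token-A", ex=10)   # A takes the lock
r.now = 11                                      # A stalls past the expiry, so the lock expires
r.set_nx_ex("lock:order:1", "token-B", ex=10)   # B takes the lock
release_if_owner(r, "lock:order:1", "token-A")  # A's late release is ignored
print(r.get("lock:order:1"))  # token-B
```

Had A called a plain `delete`, B would have lost its lock while still inside its critical section.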


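Tencent Cloud brings together a wealth of high-quality cloud computing usage and development experience for developers, fostering an open cloud computing technology ecosystem.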
