1. Before We Begin: Time to Put It All Together

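Folks, congratulations on making it this far! This is the final installment of the series "The Road to Advanced Python: From Engineer to Architect".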

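In the previous 9 installments we covered:

  • Performance optimization and tuning
  • Asynchronous programming in depth
  • An introduction to distributed systems
  • High-concurrency architecture design
  • Secure programming in practice
  • Data engineering and ETL
  • Advanced design patterns
  • Python internals
  • DevOps and CI/CD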

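In this installment we pull all of that together and build a highly available e-commerce order API service from scratch. The project covers:

  • FastAPI (asynchronous web framework)
  • PostgreSQL (database)
  • Redis (caching)
  • JWT authentication
  • Rate limiting and circuit breaking
  • Docker containerization
  • Prometheus monitoring
  • GitHub Actions CI/CD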

2. Project Architecture Design

2.1 System Architecture Diagram

                  ┌─────────────────┐
                  │      Nginx      │
                  │ (load balancer) │
                  └────────┬────────┘
                           │
         ┌─────────────────┼─────────────────┐
         │                 │                 │
  ┌──────▼──────┐   ┌──────▼──────┐   ┌──────▼──────┐
  │ API service │   │ API service │   │ API service │
  │ instance 1  │   │ instance 2  │   │ instance 3  │
  └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
         │                 │                 │
         └─────────────────┼─────────────────┘
                           │
         ┌─────────────────┼─────────────────┐
         │                 │                 │
  ┌──────▼──────┐   ┌──────▼──────┐   ┌──────▼──────┐
  │    Redis    │   │ PostgreSQL  │   │ Prometheus  │
  │   (cache)   │   │ (database)  │   │(monitoring) │
  └─────────────┘   └─────────────┘   └─────────────┘

2.2 Tech Stack

Layer             Technology
Web framework     FastAPI
Database          PostgreSQL + SQLAlchemy
Cache             Redis
Authentication    JWT
Containerization  Docker + Docker Compose
CI/CD             GitHub Actions
Monitoring        Prometheus + Grafana
Load balancing    Nginx

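2.3 Microservice Architecture Design

The project takes a monolith-first path toward microservices: it starts out deployed as a single application, which keeps development and testing simple, and can later be split into independent services along business-domain boundaries.

2.3.1 Service Decomposition Plan

Service               Responsibility                      Database    Port
API Gateway           Single entry point, routing, auth   -           80
User Service          Registration, login, user profiles  PostgreSQL  8001
Product Service       Products and inventory              PostgreSQL  8002
Order Service         Create, query, cancel orders        PostgreSQL  8003
Notification Service  Email and SMS notifications         -           8004

2.3.2 Inter-Service Communication
  • Synchronous: services call each other over HTTP/gRPC on the internal network
  • Asynchronous: order events are broadcast through a message broker (Redis Pub/Sub or RabbitMQ)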
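To make the asynchronous path concrete, here is a minimal sketch assuming an in-process stand-in for Redis Pub/Sub; `InMemoryEventBus` and the `order.created` channel name are hypothetical. Like Redis Pub/Sub it fans every message out to all current subscribers and is fire-and-forget: a consumer that is not listening at publish time never sees the message, which is why RabbitMQ (or Redis Streams) is the better fit when delivery must be durable.

```python
import asyncio
from collections import defaultdict
from typing import Dict, List


class InMemoryEventBus:
    """Hypothetical stand-in for Redis Pub/Sub: fan-out, fire-and-forget, single process."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        # Each subscriber gets its own inbox, like a separate SUBSCRIBE connection
        inbox: asyncio.Queue = asyncio.Queue()
        self._subscribers[channel].append(inbox)
        return inbox

    async def publish(self, channel: str, message: dict) -> int:
        # Like Redis PUBLISH, report how many subscribers received the message;
        # with nobody listening, the message is simply dropped
        inboxes = self._subscribers.get(channel, [])
        for inbox in inboxes:
            await inbox.put(message)
        return len(inboxes)


async def demo() -> List[str]:
    bus = InMemoryEventBus()
    notifications = bus.subscribe("order.created")  # e.g. the Notification Service
    analytics = bus.subscribe("order.created")      # a hypothetical second consumer
    delivered = await bus.publish("order.created", {"order_id": 42, "user_id": 7})
    first = await notifications.get()
    second = await analytics.get()
    return [
        f"delivered={delivered}",
        f"notification:{first['order_id']}",
        f"analytics:{second['order_id']}",
    ]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The Order Service only publishes; adding a new consumer means subscribing to the channel, with no change to the publisher.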
2.3.3 Project Structure

ecommerce-api/
├── .github/
│   └── workflows/
│       └── ci.yml
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── database.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── product.py
│   │   └── order.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── product.py
│   │   └── order.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── auth.py
│   │   ├── deps.py
│   │   ├── users.py
│   │   ├── products.py
│   │   └── orders.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── auth_service.py
│   │   ├── user_service.py
│   │   ├── product_service.py
│   │   └── order_service.py
│   ├── middleware/
│   │   ├── __init__.py
│   │   ├── rate_limit.py
│   │   └── circuit_breaker.py
│   └── utils/
│       ├── __init__.py
│       ├── security.py
│       └── cache.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_auth.py
│   ├── test_products.py
│   └── test_orders.py
├── k8s/
│   ├── namespace.yaml
│   ├── configmap.yaml
│   ├── secret.yaml
│   ├── deployment.yaml
│   ├── service.yaml
│   ├── ingress.yaml
│   └── hpa.yaml
├── monitoring/
│   ├── prometheus.yml
│   ├── grafana-dashboard.json
│   └── alertmanager.yml
├── logging/
│   └── fluentd.conf
├── Dockerfile
├── docker-compose.yml
├── nginx.conf
├── requirements.txt
└── README.md

3. Database Design

3.1 ER Diagram

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│ users           │     │ orders          │     │ order_items     │
├─────────────────┤     ├─────────────────┤     ├─────────────────┤
│ id (PK)         │──┐  │ id (PK)         │──┐  │ id (PK)         │
│ email           │  └─>│ user_id (FK)    │  └─>│ order_id (FK)   │
│ username        │     │ status          │     │ product_id (FK) │
│ hashed_password │     │ total_amount    │     │ quantity        │
│ is_active       │     │ created_at      │     │ unit_price      │
│ is_superuser    │     │ updated_at      │     └────────▲────────┘
│ created_at      │     └─────────────────┘              │
│ updated_at      │                                      │
└─────────────────┘                                      │
                                                ┌────────┴────────┐
                                                │ products        │
                                                ├─────────────────┤
                                                │ id (PK)         │
                                                │ name            │
                                                │ description     │
                                                │ price           │
                                                │ stock           │
                                                │ is_active       │
                                                │ created_at      │
                                                │ updated_at      │
                                                └─────────────────┘

Arrows run from a primary key to the foreign key that references it (one-to-many): users → orders, orders → order_items, products → order_items.

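3.2 Index Design

Table        Column      Index type  Purpose
users        email       UNIQUE      Unique email addresses
users        username    UNIQUE      Unique usernames
orders       user_id     BTREE       A user's order list
orders       created_at  BTREE       Time-range queries
order_items  order_id    BTREE       Order detail lookups
order_items  product_id  BTREE       Per-product sales statistics
products     name        BTREE       Product lookup by name
products     is_active   BTREE       Filter active products

Note that a plain B-tree index on name does not help a substring search such as ILIKE '%keyword%', which is what the product service (5.11) runs; PostgreSQL's pg_trgm extension with a GIN trigram index is the usual choice for that.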
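A quick way to check that a query actually uses one of these indexes is to ask the planner. The sketch below uses SQLite from the standard library as a stand-in for PostgreSQL (in PostgreSQL you would run `EXPLAIN` and look for `Index Scan` vs `Seq Scan`); the trimmed `orders` schema and the index names are illustrative assumptions.

```python
import sqlite3

# Stand-in schema: SQLite instead of PostgreSQL, trimmed to the columns the
# queries below touch. Index names are illustrative.
SCHEMA = """
CREATE TABLE orders (
    id INTEGER PRIMARY KEY,
    user_id INTEGER NOT NULL,
    status TEXT NOT NULL,
    created_at TEXT NOT NULL
);
CREATE INDEX idx_orders_user_id ON orders (user_id);
CREATE INDEX idx_orders_created_at ON orders (created_at);
"""


def query_plan(conn: sqlite3.Connection, sql: str, params: tuple = ()) -> str:
    # The last column of each EXPLAIN QUERY PLAN row is the human-readable step
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return " | ".join(str(row[-1]) for row in rows)


def main() -> dict:
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    plans = {
        "by_user": query_plan(conn, "SELECT * FROM orders WHERE user_id = ?", (7,)),
        "by_time": query_plan(
            conn,
            "SELECT * FROM orders WHERE created_at >= ? AND created_at < ?",
            ("2024-01-01", "2024-02-01"),
        ),
        # status has no index, so this one has to scan the whole table
        "by_status": query_plan(conn, "SELECT * FROM orders WHERE status = ?", ("paid",)),
    }
    conn.close()
    return plans


if __name__ == "__main__":
    for name, plan in main().items():
        print(f"{name}: {plan}")
```

Expect a `SEARCH ... USING INDEX idx_orders_user_id` step for the user lookup and a `SCAN` step for the unindexed status filter.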

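3.3 Database Optimization Strategies

  1. Connection pool: a SQLAlchemy pool size of 10 with a max overflow of 20
  2. Read/write splitting: writes go to the primary, reads to replicas (a later extension)
  3. Partitioning: split the orders table by year (orders_2024, orders_2025)
  4. Archiving: move orders older than 3 months into a history table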
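Strategies 3 and 4 boil down to a bit of date arithmetic. Below is a minimal sketch assuming PostgreSQL declarative partitioning, where the parent `orders` table is created with `PARTITION BY RANGE (created_at)`; in PostgreSQL a primary key on a partitioned table must include the partition column, so the key would become `(id, created_at)`. The helper names are hypothetical, and table names are interpolated into SQL, so they must come from trusted code, never from user input.

```python
from datetime import date


def yearly_partition_ddl(parent: str, year: int) -> str:
    """PostgreSQL DDL for one calendar-year partition of `parent`.

    Assumes the parent was created with PARTITION BY RANGE (created_at).
    The upper bound is exclusive, so consecutive years never overlap.
    """
    start, end = date(year, 1, 1), date(year + 1, 1, 1)
    return (
        f"CREATE TABLE {parent}_{year} PARTITION OF {parent} "
        f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )


def archive_cutoff(today: date, months: int = 3) -> date:
    """First day of the month `months` months before `today`.

    Orders created before this date are candidates for the history table.
    """
    # Count months from year 0 so January/February roll back into last year cleanly
    month_index = today.year * 12 + (today.month - 1) - months
    return date(month_index // 12, month_index % 12 + 1, 1)


if __name__ == "__main__":
    for year in (2024, 2025):
        print(yearly_partition_ddl("orders", year))
    print("archive orders created before", archive_cutoff(date(2025, 5, 15)))
```

Rounding the cutoff down to the first of the month keeps slightly more than three months online, which makes a monthly archiving job idempotent.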

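4. API Design

4.1 RESTful API Conventions

Method  Path                      Description                   Auth
POST    /api/auth/register        Register a user               No
POST    /api/auth/login           Log in                        No
POST    /api/auth/refresh         Refresh the access token      Refresh token
GET     /api/users/me             Get the current user          Yes
PUT     /api/users/me             Update the current user       Yes
GET     /api/products             List products                 Yes
GET     /api/products/{id}        Get product details           Yes
POST    /api/products             Create a product              Admin
PUT     /api/products/{id}        Update a product              Admin
DELETE  /api/products/{id}        Delete a product              Admin
GET     /api/orders               List the user's orders        Yes
GET     /api/orders/{id}          Get order details             Yes
POST    /api/orders               Create an order               Yes
PUT     /api/orders/{id}/cancel   Cancel an order               Yes
GET     /health                   Health check                  No
GET     /metrics                  Monitoring metrics            No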

4.2 Request/Response Conventions

Unified response format

{
    "code": 200,
    "message": "success",
    "data": {},
    "timestamp": "2024-01-01T00:00:00Z"
}

Error response format

{
    "code": 400,
    "message": "Invalid request parameters",
    "errors": {
        "field": "error details"
    },
    "timestamp": "2024-01-01T00:00:00Z"
}
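Keeping these two envelopes consistent is easiest with a pair of helpers. This is a minimal sketch; `success_response`, `error_response` and `utc_timestamp` are hypothetical names, not part of FastAPI. In the app you could return the dicts from routes or wrap them in a `JSONResponse` inside an exception handler, keeping the HTTP status equal to `code`.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def utc_timestamp() -> str:
    # ISO 8601 in UTC with a trailing "Z", as in the examples above
    now = datetime.now(timezone.utc).replace(microsecond=0)
    return now.isoformat().replace("+00:00", "Z")


def success_response(data: Any = None, message: str = "success") -> dict:
    return {
        "code": 200,
        "message": message,
        "data": {} if data is None else data,
        "timestamp": utc_timestamp(),
    }


def error_response(code: int, message: str, errors: Optional[dict] = None) -> dict:
    # Field-level problems go under "errors"; there is no "data" key on failure
    return {
        "code": code,
        "message": message,
        "errors": errors or {},
        "timestamp": utc_timestamp(),
    }


if __name__ == "__main__":
    print(success_response({"id": 1}))
    print(error_response(400, "Invalid request parameters", {"email": "not a valid email address"}))
```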

4.3 Pagination Convention

{
    "code": 200,
    "data": {
        "items": [],
        "total": 100,
        "page": 1,
        "page_size": 20,
        "total_pages": 5
    }
}
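The pagination fields follow from two small formulas: a 1-based page maps to `OFFSET (page - 1) * page_size`, and `total_pages` is a ceiling division. A minimal sketch with hypothetical helper names:

```python
from typing import Any, List, Tuple


def page_to_offset(page: int, page_size: int) -> Tuple[int, int]:
    """Map a 1-based page number to SQL (OFFSET, LIMIT)."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    return (page - 1) * page_size, page_size


def build_page(items: List[Any], total: int, page: int, page_size: int) -> dict:
    # Integer ceiling division: 100 items / 20 per page -> 5 pages, 101 -> 6, 0 -> 0
    total_pages = (total + page_size - 1) // page_size
    return {
        "items": items,
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
    }
```

Integer ceiling division avoids float rounding and naturally yields 0 pages for an empty result.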

5. Core Code Implementation

5.1 Configuration Management

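from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Values come from environment variables, falling back to a local .env file
    model_config = SettingsConfigDict(env_file=".env")

    APP_NAME: str = "E-Commerce API"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False

    # Required (no default): creating Settings raises a validation error if missing
    DATABASE_URL: str
    REDIS_URL: str

    SECRET_KEY: str
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7

    # Rate limiting: at most RATE_LIMIT_REQUESTS requests per RATE_LIMIT_WINDOW seconds
    RATE_LIMIT_REQUESTS: int = 100
    RATE_LIMIT_WINDOW: int = 60

    # Circuit breaker: open after THRESHOLD failures, retry after TIMEOUT seconds
    CIRCUIT_BREAKER_THRESHOLD: int = 5
    CIRCUIT_BREAKER_TIMEOUT: int = 30

@lru_cache()
def get_settings() -> Settings:
    # Parse settings once per process and reuse the same instance everywhere
    return Settings()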

5.2 Data Model Definitions (Schemas)

from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List
from datetime import datetime
from enum import Enum

# Users
class UserBase(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=50)

class UserCreate(UserBase):
    password: str = Field(..., min_length=8)

class UserLogin(BaseModel):
    username: str
    password: str

class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime
  
    class Config:
        from_attributes = True

# Products
class ProductBase(BaseModel):
    name: str = Field(..., min_length=1, max_length=200)
    description: Optional[str] = None
    price: float = Field(..., gt=0)
    stock: int = Field(..., ge=0)

class ProductCreate(ProductBase):
    pass

class ProductResponse(ProductBase):
    id: int
    is_active: bool
    created_at: datetime
  
    class Config:
        from_attributes = True

# Orders
class OrderStatus(str, Enum):
    PENDING = "pending"
    PAID = "paid"
    SHIPPED = "shipped"
    COMPLETED = "completed"
    CANCELLED = "cancelled"

class OrderItemCreate(BaseModel):
    product_id: int
    quantity: int = Field(..., ge=1)

class OrderItemResponse(BaseModel):
    product_id: int
    quantity: int
    unit_price: float
  
    class Config:
        from_attributes = True

class OrderCreate(BaseModel):
    items: List[OrderItemCreate] = Field(..., min_length=1)  # Pydantic v2 name for min_items

class OrderResponse(BaseModel):
    id: int
    user_id: int
    status: OrderStatus
    total_amount: float
    items: List[OrderItemResponse]
    created_at: datetime
  
    class Config:
        from_attributes = True

# Token
class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

# Pagination
class PaginationParams(BaseModel):
    page: int = Field(1, ge=1)
    page_size: int = Field(20, ge=1, le=100)

class PaginatedResponse(BaseModel):
    items: List
    total: int
    page: int
    page_size: int
    total_pages: int

5.3 Database Models

from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, ForeignKey, Enum
from sqlalchemy.orm import relationship
from datetime import datetime
import enum
from app.database import Base

class OrderStatus(str, enum.Enum):
    PENDING = "pending"
    PAID = "paid"
    SHIPPED = "shipped"
    COMPLETED = "completed"
    CANCELLED = "cancelled"

class User(Base):
    __tablename__ = "users"
  
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
  
    orders = relationship("Order", back_populates="user")

class Product(Base):
    __tablename__ = "products"
  
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True, nullable=False)
    description = Column(String)
    price = Column(Float, nullable=False)
    stock = Column(Integer, default=0)
    is_active = Column(Boolean, default=True, index=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
  
    order_items = relationship("OrderItem", back_populates="product")

class Order(Base):
    __tablename__ = "orders"
  
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
    status = Column(Enum(OrderStatus), default=OrderStatus.PENDING)
    total_amount = Column(Float, default=0)
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
  
    user = relationship("User", back_populates="orders")
    items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")

class OrderItem(Base):
    __tablename__ = "order_items"
  
    id = Column(Integer, primary_key=True, index=True)
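    # PostgreSQL does not index foreign keys automatically; these match the index design in 3.2
    order_id = Column(Integer, ForeignKey("orders.id"), nullable=False, index=True)
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False, index=True)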
    quantity = Column(Integer, nullable=False)
    unit_price = Column(Float, nullable=False)
  
    order = relationship("Order", back_populates="items")
    product = relationship("Product", back_populates="order_items")

5.4 Database Connection

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from app.config import get_settings

settings = get_settings()

engine = create_async_engine(
    settings.DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://"),
    echo=settings.DEBUG,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False
)

Base = declarative_base()

async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

async def init_db():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
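The pool settings above are per engine, and every worker process builds its own engine, so the connection count multiplies across the deployment. A small worked calculation, assuming the three API instances from 2.1 and a configurable number of worker processes per instance (e.g. Uvicorn's `--workers`); the function name is hypothetical:

```python
def peak_db_connections(instances: int, workers_per_instance: int,
                        pool_size: int = 10, max_overflow: int = 20) -> int:
    """Upper bound on PostgreSQL connections opened by the API tier.

    Each worker process owns its own engine, so each one can hold up to
    pool_size + max_overflow connections at peak.
    """
    return instances * workers_per_instance * (pool_size + max_overflow)


POSTGRES_DEFAULT_MAX_CONNECTIONS = 100  # PostgreSQL's stock max_connections setting

if __name__ == "__main__":
    for workers in (1, 4):
        peak = peak_db_connections(instances=3, workers_per_instance=workers)
        verdict = "fits" if peak <= POSTGRES_DEFAULT_MAX_CONNECTIONS else "exceeds the default"
        print(f"3 instances x {workers} worker(s): up to {peak} connections ({verdict})")
```

One worker per instance peaks at 90 connections, which already leaves little headroom under the default of 100 (PostgreSQL also reserves a few slots for superusers); four workers per instance would need 360. Either shrink the per-process pool or put a pooler such as PgBouncer in front of the database.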

5.5 Redis Cache

import redis.asyncio as redis
from typing import Optional, Any
import json
from app.config import get_settings

settings = get_settings()

class CacheManager:
    def __init__(self):
        self.client: Optional[redis.Redis] = None
  
    async def connect(self):
        self.client = redis.from_url(
            settings.REDIS_URL,
            encoding="utf-8",
            decode_responses=True
        )
  
    async def disconnect(self):
        if self.client:
            await self.client.close()
  
    async def get(self, key: str) -> Optional[Any]:
        if not self.client:
            return None
        value = await self.client.get(key)
        if value:
            return json.loads(value)
        return None
  
    async def set(self, key: str, value: Any, expire: int = 300):
        if not self.client:
            return
        await self.client.set(key, json.dumps(value), ex=expire)
  
    async def delete(self, key: str):
        if not self.client:
            return
        await self.client.delete(key)
  
    async def incr(self, key: str) -> int:
        if not self.client:
            return 0
        return await self.client.incr(key)
  
    async def expire(self, key: str, seconds: int):
        if not self.client:
            return
        await self.client.expire(key, seconds)

cache = CacheManager()
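The services below use this manager in the cache-aside pattern: read the cache, fall back to the database on a miss, then populate the cache with a TTL. Here is a self-contained sketch of that flow, assuming an in-memory stand-in for Redis with an injectable clock so expiry can be tested without waiting; `InMemoryTTLCache` and `get_or_load` are hypothetical names. It stores JSON strings just like `CacheManager`, so, as with Redis, only JSON-serializable values (plain dicts, not ORM objects) can be cached.

```python
import asyncio
import json
import time
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple


class InMemoryTTLCache:
    """Hypothetical stand-in for CacheManager: JSON values with an expiry deadline,
    mimicking Redis SET key value EX seconds."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._data: Dict[str, Tuple[str, float]] = {}

    async def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        raw, expires_at = entry
        if self._clock() >= expires_at:
            del self._data[key]  # lazily drop expired keys on read
            return None
        return json.loads(raw)

    async def set(self, key: str, value: Any, expire: int = 300) -> None:
        # json.dumps raises TypeError for ORM objects: cache plain dicts only
        self._data[key] = (json.dumps(value), self._clock() + expire)


async def get_or_load(cache: InMemoryTTLCache, key: str,
                      loader: Callable[[], Awaitable[Any]], expire: int = 300) -> Any:
    """Cache-aside read: try the cache, fall back to the loader, then populate."""
    cached = await cache.get(key)
    if cached is not None:  # `is not None`, so falsy values such as 0 or [] still count as hits
        return cached
    value = await loader()
    if value is not None:
        await cache.set(key, value, expire=expire)
    return value


async def demo() -> int:
    now = [0.0]
    cache = InMemoryTTLCache(clock=lambda: now[0])
    db_reads = 0

    async def load_product() -> dict:
        nonlocal db_reads
        db_reads += 1
        return {"id": 1, "name": "Keyboard", "price": 199.0}

    await get_or_load(cache, "product:1", load_product)  # miss -> database
    await get_or_load(cache, "product:1", load_product)  # hit
    now[0] = 301.0                                        # the 300 s TTL has passed
    await get_or_load(cache, "product:1", load_product)  # miss again -> database
    return db_reads


if __name__ == "__main__":
    print("database reads:", asyncio.run(demo()))
```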

5.6 Security Utilities

import bcrypt
import jwt
from datetime import datetime, timedelta
from typing import Optional
from app.config import get_settings

settings = get_settings()

def hash_password(password: str) -> str:
    salt = bcrypt.gensalt(rounds=12)
    return bcrypt.hashpw(password.encode(), salt).decode()

def verify_password(password: str, hashed: str) -> bool:
    return bcrypt.checkpw(password.encode(), hashed.encode())

def create_access_token(user_id: int, username: str) -> str:
    expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    payload = {
        "sub": str(user_id),
        "username": username,
        "type": "access",
        "exp": expire
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)

def create_refresh_token(user_id: int) -> str:
    expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    payload = {
        "sub": str(user_id),
        "type": "refresh",
        "exp": expire
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)

def decode_token(token: str) -> Optional[dict]:
    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except jwt.InvalidTokenError:
        return None

5.7 Circuit Breaker Middleware

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from enum import Enum
import time
from typing import Dict
from app.config import get_settings

settings = get_settings()

class CircuitState(Enum):
    CLOSED = "closed"        # normal: requests pass through
    OPEN = "open"            # tripped: requests are rejected immediately
    HALF_OPEN = "half_open"  # after the timeout: requests pass to test recovery

class CircuitBreaker:
    """Per-key breaker. State lives in process memory, so every API instance
    (and every worker process) trips independently."""

    def __init__(self, threshold: int = 5, timeout: int = 30):
        self.threshold = threshold
        self.timeout = timeout
        self.failures: Dict[str, int] = {}
        self.last_failure_time: Dict[str, float] = {}
        self.state: Dict[str, CircuitState] = {}

    def get_state(self, key: str) -> CircuitState:
        return self.state.get(key, CircuitState.CLOSED)

    def record_success(self, key: str):
        # Any success resets the failure count and closes the circuit
        self.failures.pop(key, None)
        self.last_failure_time.pop(key, None)
        self.state[key] = CircuitState.CLOSED

    def record_failure(self, key: str):
        self.failures[key] = self.failures.get(key, 0) + 1
        self.last_failure_time[key] = time.time()

        if self.failures[key] >= self.threshold:
            self.state[key] = CircuitState.OPEN

    def can_execute(self, key: str) -> bool:
        state = self.get_state(key)

        if state == CircuitState.CLOSED:
            return True

        if state == CircuitState.OPEN:
            # After `timeout` seconds without a new failure, let traffic through again
            last_fail = self.last_failure_time.get(key, 0)
            if time.time() - last_fail > self.timeout:
                self.state[key] = CircuitState.HALF_OPEN
                return True
            return False

        # HALF_OPEN: requests pass; one failure reopens the circuit because the
        # failure count is still at or above the threshold
        return True

circuit_breaker = CircuitBreaker(
    threshold=settings.CIRCUIT_BREAKER_THRESHOLD,
    timeout=settings.CIRCUIT_BREAKER_TIMEOUT
)

class CircuitBreakerMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Only API paths are protected by the breaker
        if not request.url.path.startswith("/api/"):
            return await call_next(request)

        # One breaker per resource: /api/<service>/... -> "<service>"
        service_key = request.url.path.split("/")[2]

        if not circuit_breaker.can_execute(service_key):
            # Return a response instead of raising HTTPException: exceptions raised in
            # middleware bypass FastAPI's exception handlers and surface as a 500
            return JSONResponse(
                status_code=503,
                content={"detail": "Service temporarily unavailable, please retry later"}
            )

        try:
            response = await call_next(request)
        except Exception:
            circuit_breaker.record_failure(service_key)
            raise

        # 5xx responses count as service failures
        if response.status_code >= 500:
            circuit_breaker.record_failure(service_key)
        else:
            circuit_breaker.record_success(service_key)

        return response
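Because the breaker above reads `time.time()` directly, its state transitions are awkward to unit-test. Below is a standalone variant with an injectable clock, offered as a sketch rather than a drop-in replacement; the names `Breaker` and `State` are hypothetical. One deliberate difference: it admits only one probe request at a time while HALF_OPEN, whereas `CircuitBreaker.can_execute` lets every request through in that state.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class Breaker:
    def __init__(self, threshold: int = 5, timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.timeout = timeout
        self.clock = clock  # injectable so tests can move time forward instantly
        self.state = State.CLOSED
        self.failures = 0
        self.opened_at = 0.0
        self.probe_in_flight = False

    def allow(self) -> bool:
        if self.state is State.OPEN:
            if self.clock() - self.opened_at < self.timeout:
                return False              # still cooling down: fail fast
            self.state = State.HALF_OPEN  # cool-down over: allow a trial request
        if self.state is State.HALF_OPEN:
            if self.probe_in_flight:
                return False              # only one probe at a time
            self.probe_in_flight = True
        return True

    def on_success(self) -> None:
        self.state = State.CLOSED
        self.failures = 0
        self.probe_in_flight = False

    def on_failure(self) -> None:
        self.failures += 1
        self.probe_in_flight = False
        # A failed probe reopens immediately; otherwise trip at the threshold
        if self.state is State.HALF_OPEN or self.failures >= self.threshold:
            self.state = State.OPEN
            self.opened_at = self.clock()
```

In a test, pass `clock=lambda: now[0]` backed by a mutable list and advance `now[0]` to walk the breaker through CLOSED → OPEN → HALF_OPEN → CLOSED without sleeping.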

5.8 Rate Limiting Middleware

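from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from app.utils.cache import cache
from app.config import get_settings

settings = get_settings()

EXEMPT_PATHS = {"/health", "/metrics", "/docs", "/openapi.json"}

class RateLimitMiddleware(BaseHTTPMiddleware):
    """Fixed window: at most RATE_LIMIT_REQUESTS per client IP per RATE_LIMIT_WINDOW seconds."""

    async def dispatch(self, request: Request, call_next):
        if request.url.path in EXEMPT_PATHS:
            return await call_next(request)

        # Behind Nginx, request.client is the proxy unless Uvicorn runs with
        # --proxy-headers and a trusted --forwarded-allow-ips; otherwise every
        # user shares a single bucket
        client_ip = request.client.host if request.client else "unknown"
        key = f"rate_limit:{client_ip}"

        # INCR creates the key at 1, and the first request of a window sets the TTL.
        # If Redis is not connected, cache.incr returns 0 and the limiter fails open.
        current = await cache.incr(key)
        if current == 1:
            await cache.expire(key, settings.RATE_LIMIT_WINDOW)

        if current > settings.RATE_LIMIT_REQUESTS:
            # Return a response: an HTTPException raised in middleware becomes a 500
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many requests, please retry later"},
                headers={"Retry-After": str(settings.RATE_LIMIT_WINDOW)}
            )

        response = await call_next(request)
        response.headers["X-RateLimit-Limit"] = str(settings.RATE_LIMIT_REQUESTS)
        response.headers["X-RateLimit-Remaining"] = str(max(0, settings.RATE_LIMIT_REQUESTS - current))

        return response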
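A fixed window has a known weakness: a client can spend its whole quota at the end of one window and again at the start of the next. The sketch below mirrors the INCR + EXPIRE logic in memory and compares it with a sliding-window log; both classes and `boundary_burst` are hypothetical, single-process illustrations with an injectable clock. In Redis, one common way to build the sliding log is a sorted set per client (ZREMRANGEBYSCORE to drop old entries, ZCARD to count, ZADD to record), run atomically in a transaction or Lua script.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, List, Tuple


class FixedWindowLimiter:
    """In-memory mirror of INCR + EXPIRE: the window starts at a client's first
    request, and every request (even a rejected one) increments the counter."""

    def __init__(self, limit: int, window: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit, self.window, self.clock = limit, window, clock
        self._windows: Dict[str, Tuple[float, int]] = {}

    def hit(self, key: str) -> bool:
        now = self.clock()
        started, count = self._windows.get(key, (now, 0))
        if now - started >= self.window:  # key "expired": open a fresh window
            started, count = now, 0
        count += 1
        self._windows[key] = (started, count)
        return count <= self.limit


class SlidingWindowLogLimiter:
    """Keeps one timestamp per accepted request and counts only the last `window` seconds."""

    def __init__(self, limit: int, window: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit, self.window, self.clock = limit, window, clock
        self._log: Dict[str, Deque[float]] = defaultdict(deque)

    def hit(self, key: str) -> bool:
        now = self.clock()
        log = self._log[key]
        while log and now - log[0] >= self.window:
            log.popleft()  # forget requests older than the window
        if len(log) >= self.limit:
            return False
        log.append(now)
        return True


def boundary_burst(limiter_cls) -> List[bool]:
    """1 request at t=0, 2 at t=59, 3 at t=60, with a limit of 3 per 60 s."""
    now = [0.0]
    limiter = limiter_cls(limit=3, window=60, clock=lambda: now[0])
    results = [limiter.hit("203.0.113.7")]
    now[0] = 59.0
    results += [limiter.hit("203.0.113.7") for _ in range(2)]
    now[0] = 60.0
    results += [limiter.hit("203.0.113.7") for _ in range(3)]
    return results


if __name__ == "__main__":
    print("fixed window:  ", boundary_burst(FixedWindowLimiter))
    print("sliding window:", boundary_burst(SlidingWindowLogLimiter))
```

With a limit of 3, the fixed window accepts all 5 requests sent between t=59 and t=60, while the sliding log accepts only 3 of them; the price is one stored timestamp per request instead of one counter.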

5.9 Dependency Injection and Authentication

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.utils.security import decode_token
from app.models.user import User
from sqlalchemy import select

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db)
) -> User:
    token = credentials.credentials
    payload = decode_token(token)
  
    if not payload or payload.get("type") != "access":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"}
        )
  
    user_id = int(payload.get("sub"))
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
  
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found"
        )
  
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account is disabled"
        )
  
    return user

async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    return current_user

async def get_current_superuser(
    current_user: User = Depends(get_current_user)
) -> User:
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Administrator privileges required"
        )
    return current_user

5.10 Authentication Service

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.user import User
from app.utils.security import hash_password, verify_password, create_access_token, create_refresh_token
from app.schemas.user import UserCreate, UserLogin, Token
from fastapi import HTTPException

class AuthService:
    def __init__(self, db: AsyncSession):
        self.db = db
  
    async def register(self, user_data: UserCreate) -> User:
        result = await self.db.execute(
            select(User).where(User.email == user_data.email)
        )
        if result.scalar_one_or_none():
            raise HTTPException(status_code=400, detail="Email is already registered")
      
        result = await self.db.execute(
            select(User).where(User.username == user_data.username)
        )
        if result.scalar_one_or_none():
            raise HTTPException(status_code=400, detail="Username is already taken")
      
        user = User(
            email=user_data.email,
            username=user_data.username,
            hashed_password=hash_password(user_data.password)
        )
      
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)
      
        return user
  
    async def login(self, credentials: UserLogin) -> Token:
        result = await self.db.execute(
            select(User).where(User.username == credentials.username)
        )
        user = result.scalar_one_or_none()
      
        if not user or not verify_password(credentials.password, user.hashed_password):
            raise HTTPException(status_code=401, detail="Incorrect username or password")
      
        if not user.is_active:
            raise HTTPException(status_code=401, detail="Account is disabled")
      
        return Token(
            access_token=create_access_token(user.id, user.username),
            refresh_token=create_refresh_token(user.id),
            token_type="bearer"
        )

5.11 Product Service

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.models.product import Product
from app.schemas.product import ProductCreate, ProductResponse
from app.utils.cache import cache
from fastapi import HTTPException
from typing import List, Optional, Tuple

# List pages are cached under keys that embed skip/limit/search, so deleting a
# single "products:list" key would never match them. Instead, every write bumps
# this version counter: new reads use new keys and stale pages expire via their TTL.
LIST_VERSION_KEY = "products:list:version"


def product_to_dict(product: Product) -> dict:
    # Cache JSON-safe dicts, never ORM objects (json.dumps cannot encode them)
    return ProductResponse.model_validate(product).model_dump(mode="json")


class ProductService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def _invalidate(self, product_id: Optional[int] = None) -> None:
        if product_id is not None:
            await cache.delete(f"product:{product_id}")
        await cache.incr(LIST_VERSION_KEY)

    async def create_product(self, product_data: ProductCreate) -> Product:
        product = Product(**product_data.model_dump())
        self.db.add(product)
        await self.db.commit()
        await self.db.refresh(product)

        await self._invalidate()
        return product

    async def get_product(self, product_id: int) -> Optional[dict]:
        cache_key = f"product:{product_id}"
        cached = await cache.get(cache_key)
        if cached is not None:
            return cached

        result = await self.db.execute(
            select(Product).where(Product.id == product_id, Product.is_active.is_(True))
        )
        product = result.scalar_one_or_none()
        if product is None:
            return None

        data = product_to_dict(product)
        await cache.set(cache_key, data, expire=300)
        return data

    async def get_products(
        self,
        skip: int = 0,
        limit: int = 20,
        search: Optional[str] = None
    ) -> Tuple[List[dict], int]:
        version = await cache.get(LIST_VERSION_KEY) or 0
        cache_key = f"products:list:v{version}:{skip}:{limit}:{search}"
        cached = await cache.get(cache_key)
        if cached is not None:
            return cached["items"], cached["total"]

        filters = [Product.is_active.is_(True)]
        if search:
            filters.append(Product.name.ilike(f"%{search}%"))

        # ORDER BY keeps OFFSET/LIMIT pages stable between requests
        query = select(Product).where(*filters).order_by(Product.id).offset(skip).limit(limit)
        count_query = select(func.count()).select_from(Product).where(*filters)

        products = (await self.db.execute(query)).scalars().all()
        total = (await self.db.execute(count_query)).scalar_one()

        items = [product_to_dict(p) for p in products]
        await cache.set(cache_key, {"items": items, "total": total}, expire=60)
        return items, total

    async def update_product(
        self,
        product_id: int,
        product_data: ProductCreate
    ) -> Product:
        result = await self.db.execute(
            select(Product).where(Product.id == product_id)
        )
        product = result.scalar_one_or_none()

        if not product:
            raise HTTPException(status_code=404, detail="Product not found")

        for key, value in product_data.model_dump().items():
            setattr(product, key, value)

        await self.db.commit()
        await self.db.refresh(product)

        await self._invalidate(product_id)
        return product

    async def delete_product(self, product_id: int) -> None:
        result = await self.db.execute(
            select(Product).where(Product.id == product_id)
        )
        product = result.scalar_one_or_none()

        if not product:
            raise HTTPException(status_code=404, detail="Product not found")

        # Soft delete: keep the row so existing order items still reference it
        product.is_active = False
        await self.db.commit()

        await self._invalidate(product_id)

5.12 Order Service

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.models.order import Order, OrderItem, OrderStatus
from app.models.product import Product
from app.schemas.order import OrderCreate, OrderResponse
from app.utils.cache import cache
from fastapi import HTTPException
from typing import List

class OrderService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def _load_order(self, order_id: int) -> Order:
        # Eager-load items: implicit lazy loading raises an error under AsyncSession
        result = await self.db.execute(
            select(Order).options(selectinload(Order.items)).where(Order.id == order_id)
        )
        return result.scalar_one()

    async def create_order(self, user_id: int, order_data: OrderCreate) -> Order:
        total_amount = 0.0
        items = []

        # Lock product rows in a fixed (id) order so two orders touching the
        # same products cannot deadlock each other
        for item in sorted(order_data.items, key=lambda i: i.product_id):
            # SELECT ... FOR UPDATE holds the row lock until commit, so concurrent
            # orders cannot both pass the stock check and oversell
            result = await self.db.execute(
                select(Product)
                .where(Product.id == item.product_id, Product.is_active.is_(True))
                .with_for_update()
            )
            product = result.scalar_one_or_none()

            # Raising abandons the transaction: when get_db closes the session it
            # is rolled back and the row locks are released
            if not product:
                raise HTTPException(status_code=404, detail=f"Product {item.product_id} not found")

            if product.stock < item.quantity:
                raise HTTPException(status_code=400, detail=f"Insufficient stock for {product.name}")

            product.stock -= item.quantity
            total_amount += product.price * item.quantity

            items.append(OrderItem(
                product_id=product.id,
                quantity=item.quantity,
                unit_price=product.price
            ))

        order = Order(
            user_id=user_id,
            status=OrderStatus.PENDING,
            total_amount=total_amount
        )
        order.items = items

        self.db.add(order)
        await self.db.commit()

        await cache.delete(f"user_orders:{user_id}")

        return await self._load_order(order.id)

    async def get_user_orders(self, user_id: int) -> List[dict]:
        cache_key = f"user_orders:{user_id}"
        cached = await cache.get(cache_key)

        if cached is not None:
            return cached

        result = await self.db.execute(
            select(Order)
            .options(selectinload(Order.items))
            .where(Order.user_id == user_id)
            .order_by(Order.created_at.desc())
        )
        orders = result.scalars().all()

        # Return the same JSON-safe shape whether the data came from Redis or the database
        orders_data = [OrderResponse.model_validate(o).model_dump(mode="json") for o in orders]

        await cache.set(cache_key, orders_data, expire=60)

        return orders_data

    async def cancel_order(self, user_id: int, order_id: int) -> Order:
        # Lock the order row so a concurrent cancel or payment cannot race this one
        result = await self.db.execute(
            select(Order)
            .options(selectinload(Order.items))
            .where(Order.id == order_id, Order.user_id == user_id)
            .with_for_update()
        )
        order = result.scalar_one_or_none()

        if not order:
            raise HTTPException(status_code=404, detail="Order not found")

        if order.status not in (OrderStatus.PENDING, OrderStatus.PAID):
            raise HTTPException(status_code=400, detail="Order cannot be cancelled in its current status")

        # Return the reserved stock, locking product rows in the same order as create_order
        for item in sorted(order.items, key=lambda i: i.product_id):
            result = await self.db.execute(
                select(Product).where(Product.id == item.product_id).with_for_update()
            )
            product = result.scalar_one_or_none()
            if product:
                product.stock += item.quantity

        order.status = OrderStatus.CANCELLED
        # expire_on_commit=False keeps order and order.items loaded after the commit
        await self.db.commit()

        await cache.delete(f"user_orders:{user_id}")

        return order
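Why the row locks matter: the original check-then-decrement has an `await` between reading the stock and writing it, and another request can run in that gap. The sketch below reproduces the race with plain asyncio; `Inventory`, `reserve_unsafe`, `reserve_locked` and `race` are hypothetical names, and `asyncio.Lock` only stands in for the database row lock. It serializes coroutines inside one process, so with three API instances the lock has to live in PostgreSQL: the `FOR UPDATE` row lock, or a single conditional statement such as `UPDATE products SET stock = stock - :qty WHERE id = :id AND stock >= :qty` followed by a check that exactly one row was updated.

```python
import asyncio
from typing import Awaitable, Callable, Tuple


class Inventory:
    def __init__(self, stock: int) -> None:
        self.stock = stock
        # Stands in for the row lock SELECT ... FOR UPDATE takes in PostgreSQL
        self.lock = asyncio.Lock()


async def reserve_unsafe(inventory: Inventory, quantity: int) -> bool:
    if inventory.stock < quantity:  # check ...
        return False
    await asyncio.sleep(0)          # ... other requests run here (e.g. while awaiting the DB) ...
    inventory.stock -= quantity     # ... then act on a stale check
    return True


async def reserve_locked(inventory: Inventory, quantity: int) -> bool:
    async with inventory.lock:      # check and decrement become one critical section
        return await reserve_unsafe(inventory, quantity)


async def race(reserve: Callable[[Inventory, int], Awaitable[bool]]) -> Tuple[int, int]:
    """Three buyers race for the last unit; returns (orders accepted, final stock)."""
    inventory = Inventory(stock=1)
    results = await asyncio.gather(*(reserve(inventory, 1) for _ in range(3)))
    return sum(results), inventory.stock


if __name__ == "__main__":
    print("unsafe:", asyncio.run(race(reserve_unsafe)))
    print("locked:", asyncio.run(race(reserve_locked)))
```

Without the lock all three buyers succeed and stock ends at -2; with it exactly one order is accepted and stock ends at 0.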

5.13 API Routes

# app/api/auth.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.schemas.user import UserCreate, UserLogin, Token, UserResponse
from app.services.auth_service import AuthService

router = APIRouter()

@router.post("/register", response_model=UserResponse)
async def register(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    auth_service = AuthService(db)
    return await auth_service.register(user_data)

@router.post("/login", response_model=Token)
async def login(
    credentials: UserLogin,
    db: AsyncSession = Depends(get_db)
):
    auth_service = AuthService(db)
    return await auth_service.login(credentials)
# app/api/products.py
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional
from app.database import get_db
from app.schemas.product import ProductCreate, ProductResponse
from app.schemas.user import UserResponse
from app.services.product_service import ProductService
from app.api.deps import get_current_user, get_current_superuser
from app.models.user import User

router = APIRouter()

@router.get("/", response_model=dict)
async def get_products(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    search: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    product_service = ProductService(db)
    skip = (page - 1) * page_size
    products, total = await product_service.get_products(
        skip=skip, limit=page_size, search=search
    )
    total_pages = (total + page_size - 1) // page_size
  
    return {
        "code": 200,
        "data": {
            "items": products,
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": total_pages
        }
    }

@router.get("/{product_id}", response_model=dict)
async def get_product(
    product_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    product_service = ProductService(db)
    product = await product_service.get_product(product_id)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    return {"code": 200, "data": product}

@router.post("/", response_model=dict)
async def create_product(
    product_data: ProductCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_superuser)
):
    product_service = ProductService(db)
    product = await product_service.create_product(product_data)
    return {"code": 200, "data": product}
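The paging arithmetic in `get_products` (an offset derived from `page`, plus a ceiling division for `total_pages`) is easy to get off by one, so it can help to pull it into a pure function that is trivial to unit-test. A minimal sketch follows; `paginate` and `Page` are hypothetical names that do not exist in the project.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Page:
    skip: int         # OFFSET passed to the query
    limit: int        # LIMIT passed to the query
    total_pages: int  # ceil(total / page_size); 0 when there are no rows


def paginate(page: int, page_size: int, total: int) -> Page:
    """Hypothetical helper mirroring the arithmetic in get_products."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    skip = (page - 1) * page_size
    # Integer ceiling division without floats: ceil(a / b) == (a + b - 1) // b
    total_pages = (total + page_size - 1) // page_size
    return Page(skip=skip, limit=page_size, total_pages=total_pages)


print(paginate(page=3, page_size=20, total=45))
```

Here page 3 with 20 items per page starts at offset 40, and 45 rows make 3 pages. In the route itself `skip` is computed before the query and `total_pages` after it, because `total` comes back from the service.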
# app/api/orders.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.schemas.order import OrderCreate, OrderResponse
from app.services.order_service import OrderService
from app.api.deps import get_current_user
from app.models.user import User

router = APIRouter()

@router.get("/")
async def get_orders(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    order_service = OrderService(db)
    orders = await order_service.get_user_orders(current_user.id)
    return {"code": 200, "data": orders}

@router.post("/")
async def create_order(
    order_data: OrderCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    order_service = OrderService(db)
    order = await order_service.create_order(current_user.id, order_data)
    return {"code": 200, "data": order}

@router.put("/{order_id}/cancel")
async def cancel_order(
    order_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    order_service = OrderService(db)
    order = await order_service.cancel_order(current_user.id, order_id)
    return {"code": 200, "data": order}

5.14 Main Application

# app/main.py
import time
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
from starlette.routing import Match

from app.config import get_settings
from app.database import init_db
from app.utils.cache import cache
from app.middleware.rate_limit import RateLimitMiddleware
from app.middleware.circuit_breaker import CircuitBreakerMiddleware
from app.api import auth, users, products, orders

settings = get_settings()

REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint']
)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: initialize the database and connect to Redis
    await init_db()
    await cache.connect()
    yield
    # Shutdown: release the Redis connection
    await cache.disconnect()

app = FastAPI(
    title=settings.APP_NAME,
    version=settings.APP_VERSION,
    lifespan=lifespan
)

# Wildcard origins are for local development only; list explicit origins in production,
# especially because allow_credentials=True is enabled
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.add_middleware(RateLimitMiddleware)
app.add_middleware(CircuitBreakerMiddleware)

def route_template(request: Request) -> str:
    """Return the matched route template, e.g. /api/products/{product_id}.

    Using the raw URL path as a label would create one time series per product/order ID.
    """
    for route in request.app.routes:
        match, _ = route.matches(request.scope)
        if match == Match.FULL:
            return route.path
    return "unmatched"

@app.middleware("http")
async def add_metrics(request: Request, call_next):
    start_time = time.perf_counter()

    response = await call_next(request)

    latency = time.perf_counter() - start_time
    endpoint = route_template(request)
    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=endpoint
    ).observe(latency)

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=endpoint,
        status=response.status_code
    ).inc()

    return response

app.include_router(auth.router, prefix="/api/auth", tags=["Auth"])
app.include_router(users.router, prefix="/api/users", tags=["Users"])
app.include_router(products.router, prefix="/api/products", tags=["Products"])
app.include_router(orders.router, prefix="/api/orders", tags=["Orders"])

@app.get("/health")
async def health():
    return {"status": "ok", "version": settings.APP_VERSION}

@app.get("/metrics")
async def metrics():
    # CONTENT_TYPE_LATEST is the Prometheus text exposition content type
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

5.15 Dependencies

# requirements.txt
fastapi==0.109.0
uvicorn[standard]==0.27.0
pydantic==2.5.0
pydantic-settings==2.1.0
sqlalchemy[asyncio]==2.0.25
asyncpg==0.29.0
redis==5.0.1
bcrypt==4.1.2
pyjwt==2.8.0
python-multipart==0.0.6
email-validator==2.1.0
prometheus-client==0.19.0
python-json-logger==2.0.7
httpx==0.26.0
pytest==7.4.4
pytest-asyncio==0.23.3
pytest-cov==4.1.0

5.16 Environment Variables

# Example .env
APP_NAME=E-Commerce API
APP_VERSION=1.0.0
DEBUG=false

DATABASE_URL=postgresql://postgres:password@localhost:5432/ecommerce
REDIS_URL=redis://localhost:6379/0

SECRET_KEY=your-secret-key-here-min-32-characters-long
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7

RATE_LIMIT_REQUESTS=100
RATE_LIMIT_WINDOW=60

CIRCUIT_BREAKER_THRESHOLD=5
CIRCUIT_BREAKER_TIMEOUT=30
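SQLAlchemy's async engine needs a URL that names an async driver (for asyncpg that is `postgresql+asyncpg://`); a plain `postgresql://` URL selects the synchronous psycopg2 dialect instead. `config.py` is not shown in this section, so if it does not already convert the URL, a small helper like the one below could be applied before calling `create_async_engine`. `to_asyncpg_url` is a hypothetical name.

```python
from urllib.parse import urlsplit, urlunsplit


def to_asyncpg_url(url: str) -> str:
    """Hypothetical helper: switch a plain PostgreSQL URL to SQLAlchemy's asyncpg dialect.

    URLs that already name a driver (e.g. postgresql+asyncpg://) are returned unchanged.
    """
    parts = urlsplit(url)
    if parts.scheme in ("postgresql", "postgres"):
        parts = parts._replace(scheme="postgresql+asyncpg")
    return urlunsplit(parts)


print(to_asyncpg_url("postgresql://postgres:password@localhost:5432/ecommerce"))
```

Only the scheme changes; credentials, host, port, database name and any query string pass through untouched.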

6. Docker Configuration

6.1 Dockerfile

FROM python:3.12-slim AS builder

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
# Build wheels for every dependency, including transitive ones, so the runtime stage installs offline
RUN pip wheel --no-cache-dir --wheel-dir /app/wheels -r requirements.txt

FROM python:3.12-slim

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    libpq5 \
    && rm -rf /var/lib/apt/lists/*

RUN groupadd -r appuser && useradd -r -g appuser appuser

COPY --from=builder /app/wheels /wheels
COPY --from=builder /app/requirements.txt .
RUN pip install --no-cache-dir --no-index --find-links=/wheels -r requirements.txt

COPY --chown=appuser:appuser . .

USER appuser

EXPOSE 8000

# The slim image ships without curl, so probe /health with the Python standard library
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)" || exit 1

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

6.2 Nginx Configuration

# nginx.conf: server-level configuration; it must be loaded inside the http {} context,
# e.g. mounted as /etc/nginx/conf.d/default.conf in the official nginx image

# Rate-limit zone: 10 MB of shared state keyed by client IP, 10 requests per second
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

upstream api_servers {
    least_conn;
    server api:8000 weight=5;
}

server {
    listen 80;
    server_name localhost;

    # Logging
    access_log /var/log/nginx/access.log;
    error_log /var/log/nginx/error.log;

    # Health check
    location /health {
        proxy_pass http://api_servers/health;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;
    }

    # API routes
    location /api/ {
        # No URI after the upstream name: the original /api/... path is forwarded unchanged,
        # which FastAPI expects because its routers are mounted under /api
        proxy_pass http://api_servers;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;

        # Buffering
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;

        # Rate limiting: 10 requests/second per IP, bursts of up to 20 served without delay
        limit_req zone=api_limit burst=20 nodelay;
    }

    # Metrics: private networks only
    location /metrics {
        proxy_pass http://api_servers/metrics;
        allow 10.0.0.0/8;
        allow 172.16.0.0/12;
        allow 192.168.0.0/16;
        deny all;
    }

    # Static files
    location /static/ {
        alias /app/static/;
        expires 30d;
        add_header Cache-Control "public, immutable";
    }
}
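`limit_req` behaves like a leaky bucket: each client IP drains at `rate` requests per second, up to `burst` excess requests are tolerated, `nodelay` serves those excess requests immediately instead of queueing them, and anything beyond the burst is rejected. The Python model below is a simplified sketch of that accounting (it follows the documented behavior rather than nginx's C source and uses floats instead of nginx's millisecond integers); it shows why a sudden spike of 30 requests from one IP lets 21 through.

```python
from typing import Optional


class LeakyBucket:
    """Simplified model of nginx `limit_req ... nodelay` for a single client key."""

    def __init__(self, rate: float, burst: int) -> None:
        self.rate = rate        # requests drained per second (rate=10r/s)
        self.burst = burst      # excess requests tolerated (burst=20)
        self.excess = 0.0       # how far this client is ahead of the steady rate
        self.last: Optional[float] = None

    def allow(self, now: float) -> bool:
        if self.last is None:   # first request seen for this key
            self.last = now
            return True
        # Drain by elapsed time, count this request, never go below zero
        excess = max(self.excess - self.rate * (now - self.last) + 1, 0.0)
        if excess > self.burst:
            return False        # rejected (503 unless limit_req_status says otherwise)
        self.excess, self.last = excess, now
        return True


bucket = LeakyBucket(rate=10, burst=20)
accepted = sum(bucket.allow(now=0.0) for _ in range(30))
print(accepted)  # 21: the first request plus a burst of 20
```

One second later the bucket has drained 10 slots, so roughly 10 more requests from the same IP are accepted before rejections resume.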

6.3 docker-compose.yml

version: '3.8'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/ecommerce
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    networks:
      - app-network
    restart: unless-stopped
  
  db:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=ecommerce
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d ecommerce"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - app-network
    restart: unless-stopped
  
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    networks:
      - app-network
    restart: unless-stopped
  
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
    depends_on:
      - api
    networks:
      - app-network
    restart: unless-stopped
  
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks:
      - app-network
    restart: unless-stopped

networks:
  app-network:
    driver: bridge

volumes:
  postgres_data:
  redis_data:

7. Kubernetes Deployment

7.1 Namespace

# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: ecommerce
  labels:
    name: ecommerce
    environment: production

7.2 ConfigMap

# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: ecommerce-config
  namespace: ecommerce
data:
  APP_NAME: "E-Commerce API"
  APP_VERSION: "1.0.0"
  DEBUG: "false"
  # This URL embeds the database password; in production move it into the Secret below
  DATABASE_URL: "postgresql://postgres:password@postgres:5432/ecommerce"
  REDIS_URL: "redis://redis:6379/0"
  ALGORITHM: "HS256"
  ACCESS_TOKEN_EXPIRE_MINUTES: "30"
  REFRESH_TOKEN_EXPIRE_DAYS: "7"
  RATE_LIMIT_REQUESTS: "100"
  RATE_LIMIT_WINDOW: "60"
  CIRCUIT_BREAKER_THRESHOLD: "5"
  CIRCUIT_BREAKER_TIMEOUT: "30"

7.3 Secret

# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: ecommerce-secret
  namespace: ecommerce
type: Opaque
data:
  # echo -n 'your-secret-key-here-min-32-characters-long' | base64
  SECRET_KEY: eW91ci1zZWNyZXQta2V5LWhlcmUtbWluLTMyLWNoYXJhY3RlcnMtbG9uZw==
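Values under `data:` in a Secret are only base64-encoded, not encrypted. The snippet below reproduces the encoded value above and shows one way to generate a stronger random key with the standard library; alternatively, a `stringData:` field lets Kubernetes accept the plain value and encode it for you.

```python
import base64
import secrets

# Reproduce the value stored under data.SECRET_KEY (same as `echo -n '...' | base64`)
placeholder = "your-secret-key-here-min-32-characters-long"
encoded = base64.b64encode(placeholder.encode()).decode()
print(encoded)

# For a real deployment, generate a random key instead of the placeholder:
# token_urlsafe(32) draws 32 random bytes and returns 43 URL-safe characters
real_key = secrets.token_urlsafe(32)
print(base64.b64encode(real_key.encode()).decode())
```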

7.4 Deployments

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ecommerce-api
  namespace: ecommerce
  labels:
    app: ecommerce-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ecommerce-api
  template:
    metadata:
      labels:
        app: ecommerce-api
    spec:
      containers:
      - name: api
        image: ecommerce-api:latest
        ports:
        - containerPort: 8000
        envFrom:
        - configMapRef:
            name: ecommerce-config
        env:
        - name: SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: ecommerce-secret
              key: SECRET_KEY
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
  namespace: ecommerce
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:15-alpine
        ports:
        - containerPort: 5432
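        env:
        - name: POSTGRES_USER
          value: postgres
        - name: POSTGRES_PASSWORD
          value: password  # demo value; read it from a Secret in production
        - name: POSTGRES_DB
          value: ecommerce
        # Keep data in a subdirectory: initdb refuses a non-empty mount root (e.g. lost+found on some volumes)
        - name: PGDATA
          value: /var/lib/postgresql/data/pgdata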
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
      volumes:
      - name: postgres-storage
        persistentVolumeClaim:
          claimName: postgres-pvc
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  namespace: ecommerce
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
      - name: redis
        image: redis:7-alpine
        ports:
        - containerPort: 6379
        command:
        - redis-server
        - --appendonly
        - "yes"
        volumeMounts:
        - name: redis-storage
          mountPath: /data
      volumes:
      - name: redis-storage
        persistentVolumeClaim:
          claimName: redis-pvc

7.5 Services

# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: ecommerce-api
  namespace: ecommerce
spec:
  selector:
    app: ecommerce-api
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
  name: postgres
  namespace: ecommerce
spec:
  selector:
    app: postgres
  ports:
  - port: 5432
    targetPort: 5432
  type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
  name: redis
  namespace: ecommerce
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379
  type: ClusterIP

7.6 Ingress

# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ecommerce-ingress
  namespace: ecommerce
  annotations:
    # Paths are forwarded unchanged (no rewrite-target), since the app serves its routes under /api
    # ingress-nginx rate limit: at most 100 requests per minute per client IP
    nginx.ingress.kubernetes.io/limit-rpm: "100"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
  ingressClassName: nginx
  tls:
  - hosts:
    - api.ecommerce.com
    secretName: ecommerce-tls
  rules:
  - host: api.ecommerce.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: ecommerce-api
            port:
              number: 80

7.7 Autoscaling (HPA)

# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ecommerce-api-hpa
  namespace: ecommerce
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ecommerce-api
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60
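The HPA controller's core rule, as described in the Kubernetes documentation, is `desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue)`. It is evaluated per metric, the largest result wins, no change is made while the ratio stays within a tolerance (0.1 by default), and the result is clamped to `minReplicas`/`maxReplicas`. The sketch below models only that rule; it ignores the `behavior` rate limits above, pod readiness and missing metrics, so treat it as an approximation.

```python
import math


def desired_replicas(
    current: int,
    metrics: dict[str, tuple[float, float]],  # name -> (current utilization %, target %)
    min_replicas: int,
    max_replicas: int,
    tolerance: float = 0.1,
) -> int:
    proposals = []
    for current_util, target_util in metrics.values():
        ratio = current_util / target_util
        if abs(ratio - 1.0) <= tolerance:
            proposals.append(current)  # close enough to target: no change for this metric
        else:
            proposals.append(math.ceil(current * ratio))
    # With several metrics the HPA acts on the largest proposal
    return max(min_replicas, min(max_replicas, max(proposals)))


# 3 Pods at 140% CPU (target 70%) and 40% memory (target 80%)
print(desired_replicas(3, {"cpu": (140, 70), "memory": (40, 80)}, 3, 10))  # 6
```

Memory alone would suggest 2 replicas here, but CPU asks for 6 and the larger proposal wins; the `scaleUp` policy of 100% per 60 s happens to allow exactly that doubling.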

7.8 Persistent Storage

# k8s/pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pvc
  namespace: ecommerce
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: standard
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: redis-pvc
  namespace: ecommerce
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard

8. Monitoring and Alerting

8.1 Prometheus Configuration

# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
  - static_configs:
    - targets: ['alertmanager:9093']

rule_files:
  - /etc/prometheus/rules/*.yml

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
    - targets: ['localhost:9090']

  - job_name: 'ecommerce-api'
    static_configs:
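    # "api" is the docker-compose service name; in Kubernetes, kubernetes_sd_configs is preferable
    # so that every Pod is scraped, not just whichever Pod the Service routes to
    - targets: ['api:8000']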
    metrics_path: /metrics
    scrape_interval: 5s

  - job_name: 'postgres'
    static_configs:
    - targets: ['postgres-exporter:9187']

  - job_name: 'redis'
    static_configs:
    - targets: ['redis-exporter:9121']

  - job_name: 'node'
    static_configs:
    - targets: ['node-exporter:9100']

8.2 Alert Rules

# monitoring/alerts.yml
groups:
- name: ecommerce-api
  rules:
  - alert: HighErrorRate
    expr: sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "High error rate detected"
      description: "Error rate is above 5% for 5 minutes"

  - alert: HighLatency
    expr: histogram_quantile(0.95, sum by (le) (rate(http_request_duration_seconds_bucket[5m]))) > 0.5
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High latency detected"
      description: "95th percentile latency is above 500ms"

  - alert: ServiceDown
    expr: up{job="ecommerce-api"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Service is down"
      description: "E-Commerce API service is not responding"

  - alert: HighCPUUsage
    expr: 100 - (avg by(instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "High CPU usage"
      description: "CPU usage is above 80% for 10 minutes"

  - alert: HighMemoryUsage
    expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes * 100 > 85
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "High memory usage"
      description: "Memory usage is above 85% for 10 minutes"
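The HighLatency rule relies on `histogram_quantile`, which never sees individual requests: it estimates the quantile from the cumulative `_bucket` counters by finding the bucket that contains the target rank and interpolating linearly inside it. The simplified pure-Python version below follows that approach (it omits Prometheus's handling of NaN, negative bounds and non-monotonic buckets), and the bucket counts are made-up numbers. It illustrates that the estimate is only as precise as the bucket layout: anything between two bucket bounds is interpolated, not measured.

```python
import math


def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """Estimate quantile q from cumulative (upper_bound, count) buckets sorted by bound.

    The last bucket must have bound math.inf, like Prometheus's le="+Inf" bucket.
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    # Index of the first bucket whose cumulative count reaches the rank
    idx = next(i for i, (_, count) in enumerate(buckets) if count >= rank)
    if idx == len(buckets) - 1:
        return buckets[-2][0]  # rank lies in the +Inf bucket: report the highest finite bound
    upper, count = buckets[idx]
    lower, below = (0.0, 0.0) if idx == 0 else buckets[idx - 1]
    # Assume observations are spread evenly inside the bucket and interpolate linearly
    return lower + (upper - lower) * (rank - below) / (count - below)


# Hypothetical cumulative counts for le = 0.1, 0.25, 0.5, 1.0, +Inf
buckets = [(0.1, 50), (0.25, 80), (0.5, 95), (1.0, 99), (math.inf, 100)]
print(histogram_quantile(0.95, buckets))  # 0.5
print(round(histogram_quantile(0.90, buckets), 4))  # 0.4167
```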

8.3 Alertmanager Configuration

# monitoring/alertmanager.yml
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alerts@ecommerce.com'
  smtp_auth_username: 'alerts@ecommerce.com'
  smtp_auth_password: 'your-email-password'

route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'default'
  routes:
  - match:
      severity: critical
    receiver: 'pagerduty'
    continue: true
  - match:
      severity: warning
    receiver: 'email'

receivers:
- name: 'default'
  slack_configs:
  - api_url: 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK'
    channel: '#alerts'
    title: 'E-Commerce API Alert'
    text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'

- name: 'email'
  email_configs:
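  - to: 'ops@ecommerce.com'
    # email_configs has no subject/body fields: the subject is a header, the body goes in text (or html)
    headers:
      Subject: 'E-Commerce API Alert'
    text: '{{ range .Alerts }}{{ .Annotations.description }}{{ end }}'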

- name: 'pagerduty'
  pagerduty_configs:
  - service_key: 'your-pagerduty-key'
    description: '{{ .GroupLabels.alertname }}'
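Alertmanager walks the child `routes` in order: the first matching route receives the alert, `continue: true` keeps evaluating later sibling routes, and the parent's `receiver` is used only when no child matches. The sketch below is a deliberately simplified one-level model (equality matching only, no grouping or inhibition). It makes one consequence of the config above visible: a critical alert goes to `pagerduty` only, not also to the default Slack receiver, because `continue: true` only continues to later siblings.

```python
def route_alert(labels: dict[str, str], routes: list[dict], default: str) -> list[str]:
    """Receivers a simplified, one-level Alertmanager route tree would notify."""
    receivers = []
    for route in routes:
        if all(labels.get(k) == v for k, v in route["match"].items()):
            receivers.append(route["receiver"])
            if not route.get("continue", False):
                break  # first match wins unless `continue: true`
    # The parent route's receiver only handles alerts that matched no child route
    return receivers or [default]


ROUTES = [
    {"match": {"severity": "critical"}, "receiver": "pagerduty", "continue": True},
    {"match": {"severity": "warning"}, "receiver": "email"},
]

print(route_alert({"alertname": "ServiceDown", "severity": "critical"}, ROUTES, "default"))
```

If critical alerts should also reach Slack, one option is a second child route that matches them after the PagerDuty route, rather than relying on the parent receiver.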

8.4 Grafana Dashboard

{
  "dashboard": {
    "title": "E-Commerce API Dashboard",
    "panels": [
      {
        "title": "Request Rate",
        "type": "timeseries",
        "targets": [
          {
            "expr": "sum by (method, endpoint) (rate(http_requests_total[5m]))",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Response Time (95th percentile)",
        "type": "timeseries",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum by (le, method, endpoint) (rate(http_request_duration_seconds_bucket[5m])))",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(rate(http_requests_total{status=~\"5..\"}[5m])) / sum(rate(http_requests_total[5m]))"
          }
        ]
      },
      {
        "title": "Healthy Instances",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(up{job=\"ecommerce-api\"})"
          }
        ]
      }
    ]
  }
}
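A 5% error threshold only makes sense as a ratio: `rate()` on a counter is roughly the increase over the window divided by its length, so 5xx traffic has to be divided by all traffic to become a percentage. The helper below does that division on two snapshots of `http_requests_total`; the snapshot dictionaries are hypothetical stand-ins for scraped samples, and the real `rate()` additionally extrapolates to the window edges and handles counter resets.

```python
def error_ratio(before: dict[str, float], after: dict[str, float]) -> float:
    """Share of 5xx responses between two counter snapshots keyed by status code."""
    increase = {status: after[status] - before.get(status, 0.0) for status in after}
    total = sum(increase.values())
    if total == 0:
        return 0.0
    errors = sum(v for status, v in increase.items() if status.startswith("5"))
    return errors / total


before = {"200": 9_000, "404": 100, "500": 10}
after = {"200": 9_900, "404": 120, "500": 60, "503": 20}
print(f"{error_ratio(before, after):.1%}")  # 7.1%
```

In this made-up window, 70 of 990 requests failed with 5xx, which is above the 5% threshold used by the HighErrorRate rule.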

9. Log Collection

9.1 Fluentd Configuration

# logging/fluentd.conf
<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-docker.pos
  tag kubernetes.*
  <parse>
    @type json
    time_key time
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

<filter kubernetes.**>
  @type kubernetes_metadata
</filter>

<filter kubernetes.**>
  @type grep
  <regexp>
    # Match on the Pod label: the container inside the ecommerce-api Deployment is named "api"
    key $.kubernetes.labels.app
    pattern /^ecommerce-api$/
  </regexp>
</filter>

<match kubernetes.**>
  @type elasticsearch
  host elasticsearch
  port 9200
  logstash_format true
  logstash_prefix ecommerce-api
  flush_interval 10s
</match>

9.2 Application Logging

# app/utils/logger.py
import logging
import sys
from pythonjsonlogger import jsonlogger

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        log_record['level'] = record.levelname
        log_record['logger'] = record.name
        log_record['timestamp'] = self.formatTime(record)

def setup_logging():
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Add the handler only once, so repeated calls do not duplicate every log line
    if not any(isinstance(h.formatter, CustomJsonFormatter) for h in logger.handlers):
        # JSON handler: one JSON object per line on stdout, for Fluentd to collect
        log_handler = logging.StreamHandler(sys.stdout)
        formatter = CustomJsonFormatter('%(timestamp)s %(level)s %(name)s %(message)s')
        log_handler.setFormatter(formatter)
        logger.addHandler(log_handler)

    return logger

logger = setup_logging()
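Each line this formatter writes is a single JSON object, which is what lets Fluentd's `@type json` parser and Elasticsearch index fields such as `level` or `duration_ms` directly. To see the shape of such a line without installing python-json-logger, here is a rough stdlib-only equivalent; `StdlibJsonFormatter` is a hypothetical name, and, like python-json-logger, it treats every non-standard record attribute as a field passed through `extra=`.

```python
import io
import json
import logging

# Attribute names every LogRecord has; anything else on a record came from `extra=`
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message", "asctime"}


class StdlibJsonFormatter(logging.Formatter):
    """Hypothetical stdlib-only stand-in for the python-json-logger formatter above."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy the fields passed through `extra=` (method, path, duration_ms, ...)
        payload.update({k: v for k, v in vars(record).items() if k not in _STANDARD_ATTRS})
        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(payload, ensure_ascii=False, default=str)


# Usage: write to an in-memory stream to inspect a single line
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(StdlibJsonFormatter())
demo = logging.getLogger("demo")
demo.setLevel(logging.INFO)
demo.propagate = False
demo.addHandler(handler)
demo.info("Request processed", extra={"method": "GET", "status_code": 200})
print(stream.getvalue().strip())
```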

9.3 Logging Examples

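import time

from fastapi import Request

from app.utils.logger import logger

# Request logging, e.g. registered with @app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start
    logger.info("Request processed", extra={
        "method": request.method,
        "path": request.url.path,
        "status_code": response.status_code,
        "duration_ms": round(duration * 1000, 2),
        # Assumption: the auth dependency stores the user ID on request.state
        "user_id": getattr(request.state, "user_id", None),
    })
    return response

# Error logging with context
def log_db_failure(e: Exception, retry_count: int) -> None:
    logger.error("Database connection failed", extra={
        "error": str(e),
        "retry_count": retry_count,
    })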

10. CI/CD Configuration

# .github/workflows/ci.yml
name: CI/CD Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
  
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
          POSTGRES_DB: test_db
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    
      redis:
        image: redis:7
        ports:
          - 6379:6379
  
    steps:
    - uses: actions/checkout@v4
  
    - uses: actions/setup-python@v5
      with:
        python-version: '3.12'
  
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install pytest pytest-asyncio pytest-cov httpx
  
    - name: Run tests
      env:
        DATABASE_URL: postgresql://test:test@localhost:5432/test_db
        REDIS_URL: redis://localhost:6379/0
        SECRET_KEY: test-secret-key
      run: |
        pytest tests/ -v --cov=app --cov-report=xml
  
    - name: Upload coverage
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
  
    steps:
    - uses: actions/checkout@v4
  
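    - name: Build Docker image
      run: docker build -t ${{ secrets.DOCKER_USERNAME }}/ecommerce-api:${{ github.sha }} .

    - name: Push to registry
      run: |
        # Docker Hub only accepts pushes to <namespace>/<repo>, so the image is tagged under the account name
        echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
        docker push ${{ secrets.DOCKER_USERNAME }}/ecommerce-api:${{ github.sha }}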

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
  
    steps:
    - name: Deploy to production
      uses: appleboy/ssh-action@v1.0.0
      with:
        host: ${{ secrets.PRODUCTION_HOST }}
        username: ${{ secrets.PRODUCTION_USER }}
        key: ${{ secrets.SSH_PRIVATE_KEY }}
        script: |
          cd /opt/ecommerce-api
          docker-compose pull
          docker-compose up -d
          docker image prune -f

11. Tests

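import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.main import app
from app.database import AsyncSessionLocal
from app.models.user import User
from app.utils.security import hash_password

# In pytest-asyncio's default strict mode, async fixtures must use @pytest_asyncio.fixture
@pytest_asyncio.fixture
async def db_session():
    async with AsyncSessionLocal() as session:
        yield session

@pytest_asyncio.fixture
async def client():
    # ASGITransport calls the app in-process (the app= shortcut is deprecated in newer httpx).
    # It does not run the lifespan hooks, so init_db()/cache.connect() are not triggered here.
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
        yield ac

@pytest_asyncio.fixture
async def test_user(db_session: AsyncSession):
    user = User(
        email="test@example.com",
        username="testuser",
        hashed_password=hash_password("password123")
    )
    db_session.add(user)
    await db_session.commit()
    await db_session.refresh(user)
    return user

@pytest_asyncio.fixture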
async def auth_token(client: AsyncClient, test_user: User):
    response = await client.post(
        "/api/auth/login",
        json={"username": "testuser", "password": "password123"}
    )
    return response.json()["access_token"]

@pytest.mark.asyncio
async def test_register(client: AsyncClient):
    response = await client.post(
        "/api/auth/register",
        json={
            "email": "newuser@example.com",
            "username": "newuser",
            "password": "password123"
        }
    )
    assert response.status_code == 200
    data = response.json()
    assert data["email"] == "newuser@example.com"
    assert data["username"] == "newuser"

@pytest.mark.asyncio
async def test_login(client: AsyncClient, test_user: User):
    response = await client.post(
        "/api/auth/login",
        json={"username": "testuser", "password": "password123"}
    )
    assert response.status_code == 200
    data = response.json()
    assert "access_token" in data
    assert "refresh_token" in data

@pytest.mark.asyncio
async def test_get_products(client: AsyncClient, auth_token: str):
    response = await client.get(
        "/api/products/",
        headers={"Authorization": f"Bearer {auth_token}"}
    )
    assert response.status_code == 200

@pytest.mark.asyncio
async def test_create_order(client: AsyncClient, auth_token: str, db_session: AsyncSession):
    from app.models.product import Product
  
    product = Product(
        name="Test Product",
        price=99.9,
        stock=100
    )
    db_session.add(product)
    await db_session.commit()
    await db_session.refresh(product)
  
    response = await client.post(
        "/api/orders/",
        headers={"Authorization": f"Bearer {auth_token}"},
        json={
            "items": [
                {"product_id": product.id, "quantity": 2}
            ]
        }
    )
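    assert response.status_code == 200
    # The orders endpoint wraps its payload as {"code": 200, "data": order}
    data = response.json()["data"]
    assert float(data["total_amount"]) == pytest.approx(199.8)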

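12. Project Summary and Extension Ideas

12.1 Project Recap

This project is a complete e-commerce order API service that covers the whole workflow from development to deployment. The core modules are: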

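| Module | What is implemented | Key techniques |
| --- | --- | --- |
| User authentication | Registration, login, JWT tokens | bcrypt password hashing, token refresh |
| Product management | CRUD, search, pagination | SQLAlchemy async queries, caching strategy |
| Order system | Create, query, cancel | Transactions, stock deduction, cache invalidation |
| Rate limiting | Per-IP limits | Redis counters, sliding window |
| Circuit breaking | Circuit breaking and degradation | State machine pattern, failure recovery |
| Monitoring and alerting | Metrics collection, alert notifications | Prometheus, Grafana, Alertmanager |
| Log collection | Structured logging | JSON format, Fluentd, Elasticsearch |
| Containerization | Docker, K8s deployment | Multi-stage builds, health checks, HPA |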
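The circuit-breaking row refers to a state machine. The project's `CircuitBreakerMiddleware` is imported in main.py but its code is not part of this section, so here is an independent, framework-free sketch of the classic three-state pattern: CLOSED opens after `threshold` consecutive failures, OPEN moves to HALF_OPEN once `timeout` seconds have passed, and HALF_OPEN closes again on a success or reopens on a failure. The defaults mirror `CIRCUIT_BREAKER_THRESHOLD=5` and `CIRCUIT_BREAKER_TIMEOUT=30`; the injectable `clock` exists only so the transitions can be tested.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"        # normal operation, failures are counted
    OPEN = "open"            # fail fast without calling the protected code
    HALF_OPEN = "half_open"  # trial requests allowed; the next result decides


class CircuitBreaker:
    def __init__(self, threshold: int = 5, timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.timeout = timeout
        self.clock = clock
        self.state = State.CLOSED
        self.failures = 0
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state is State.OPEN and self.clock() - self.opened_at >= self.timeout:
            self.state = State.HALF_OPEN  # cool-down elapsed: probe the downstream again
        return self.state is not State.OPEN

    def record_success(self) -> None:
        self.state = State.CLOSED
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        if self.state is State.HALF_OPEN or self.failures >= self.threshold:
            self.state = State.OPEN
            self.opened_at = self.clock()
```

A caller checks `allow_request()` before the protected call, returns a fallback response when it is `False`, and reports the outcome with `record_success()` or `record_failure()`.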

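12.2 Architecture Highlights

  1. Clear layered architecture

    • API layer: handles HTTP requests and responses
    • Service layer: business logic
    • Model layer: data model definitions
    • Separation of concerns makes unit testing easier
  2. Async end to end

    • Database access through asyncpg
    • Redis access through redis-py's asyncio client (redis.asyncio; the standalone aioredis package has been merged into redis-py)
    • HTTP served by Uvicorn + FastAPI
    • Maximizes I/O concurrency
  3. Thorough caching strategy

    • Read-heavy, rarely written data: 5-minute cache
    • List data: 1-minute cache
    • Caches are invalidated proactively after writes
    • Aims to avoid cache penetration and cache avalanche
  4. Solid security measures

    • JWT authentication
    • Passwords hashed with bcrypt
    • Rate limiting against abuse
    • Circuit breakers protect downstream services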
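For the rate-limiting item, a common Redis design is a sliding-window log: store each request timestamp in a per-client sorted set, trim entries older than the window, and reject once the count reaches the limit. The in-memory sketch below shows that logic with a deque standing in for the sorted set (with Redis the trim, count and add steps would typically be `ZREMRANGEBYSCORE`, `ZCARD` and `ZADD`, ideally inside MULTI/EXEC or a Lua script for atomicity). The defaults mirror `RATE_LIMIT_REQUESTS=100` and `RATE_LIMIT_WINDOW=60`; whether the project's `RateLimitMiddleware` works exactly this way is not shown in this section.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """In-memory sliding-window log; a deque per client stands in for a Redis sorted set."""

    def __init__(self, limit: int = 100, window: float = 60.0) -> None:
        self.limit = limit
        self.window = window
        self.hits: dict[str, deque] = defaultdict(deque)

    def allow(self, client: str, now: float) -> bool:
        hits = self.hits[client]
        # Drop timestamps that have left the window (ZREMRANGEBYSCORE)
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        if len(hits) >= self.limit:  # count what is left (ZCARD)
            return False
        hits.append(now)             # record this request (ZADD)
        return True


limiter = SlidingWindowLimiter(limit=3, window=60)
print([limiter.allow("1.2.3.4", t) for t in (0, 1, 2, 3)])
```

Unlike a fixed-window counter, the log never lets a client double its quota by bunching requests around a window boundary; the trade-off is storing one entry per request.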

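12.3 Performance Optimization Suggestions

Database optimization

| Item | Concrete measure | Expected effect |
| --- | --- | --- |
| Connection pool | Tune pool_size and max_overflow | Less connection setup overhead |
| Indexes | Add composite indexes | Much faster queries that match the index (the gain depends on data and query) |
| Read/write splitting | Writes to the primary, reads from replicas | Supports higher concurrency |
| Sharding | Shard the orders table by user ID | Keeps per-table row counts bounded |
| Query optimization | Avoid N+1 queries | Fewer database round trips |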
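To make the N+1 item concrete: loading N orders and then querying the items of each order separately costs N+1 round trips, while a single JOIN (or an eager-loading option in SQLAlchemy such as `selectinload`) costs a constant number. This stdlib `sqlite3` sketch counts the statements each approach executes; the two-table schema is a simplified stand-in, not the project's actual models.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER);
    CREATE TABLE order_items (id INTEGER PRIMARY KEY, order_id INTEGER, product_id INTEGER, quantity INTEGER);
""")
conn.executemany("INSERT INTO orders (id, user_id) VALUES (?, 1)", [(i,) for i in range(1, 11)])
conn.executemany(
    "INSERT INTO order_items (order_id, product_id, quantity) VALUES (?, ?, 1)",
    [(i, i * 10) for i in range(1, 11)],
)

statements: list[str] = []
conn.set_trace_callback(statements.append)  # record every SQL statement sqlite executes

# N+1: one query for the orders, then one query per order for its items
statements.clear()
orders = conn.execute("SELECT id FROM orders WHERE user_id = 1").fetchall()
for (order_id,) in orders:
    conn.execute("SELECT product_id, quantity FROM order_items WHERE order_id = ?", (order_id,)).fetchall()
n_plus_one = len(statements)

# A single JOIN returns the same data in one statement
statements.clear()
conn.execute(
    "SELECT o.id, i.product_id, i.quantity FROM orders o "
    "JOIN order_items i ON i.order_id = o.id WHERE o.user_id = 1"
).fetchall()
joined = len(statements)

print(n_plus_one, joined)  # 11 1
```

Against a networked PostgreSQL server each of those statements is a separate round trip, which is why the difference grows with latency and with the number of orders.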
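Cache optimization

# Multi-level cache example (assumptions: cachetools is installed, redis is a redis.asyncio.Redis
# client, db is an AsyncSession, and Product is the project's ORM model)
import json

from cachetools import TTLCache
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.product import Product

# L1: in-process cache, bounded in size and short-lived so instances do not drift far apart
local_cache: TTLCache = TTLCache(maxsize=1024, ttl=60)

async def get_product_with_cache(product_id: int, redis: Redis, db: AsyncSession):
    # L1: local cache
    if product_id in local_cache:
        return local_cache[product_id]

    # L2: Redis cache
    cache_key = f"product:{product_id}"
    cached = await redis.get(cache_key)
    if cached:
        product = json.loads(cached)
        local_cache[product_id] = product
        return product

    # L3: database (AsyncSession.get takes the model class and the primary key)
    obj = await db.get(Product, product_id)
    if obj is None:
        return None
    # ORM objects are not JSON-serializable, so cache a plain dict
    product = {"id": obj.id, "name": obj.name, "price": float(obj.price), "stock": obj.stock}
    await redis.set(cache_key, json.dumps(product), ex=300)
    local_cache[product_id] = product
    return product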
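Async optimization

# Parallel query example (get_order, get_order_items and get_order_user are hypothetical async
# data-access helpers; each must use its own AsyncSession, since one session cannot run
# concurrent operations)
import asyncio

async def get_order_details(order_id: int):
    order, items, user = await asyncio.gather(
        get_order(order_id),
        get_order_items(order_id),
        get_order_user(order_id)
    )
    return {"order": order, "items": items, "user": user}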
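`asyncio.gather` pays off because the awaited calls overlap instead of running back to back, so the total latency approaches that of the slowest call rather than the sum. In the sketch below, `asyncio.sleep` stands in for three independent I/O calls; `fake_io` is a hypothetical coroutine and the delays are arbitrary.

```python
import asyncio
import time


async def fake_io(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for a network or database round trip
    return name


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # gather returns results in argument order, regardless of completion order
    results = await asyncio.gather(
        fake_io("order", 0.2),
        fake_io("items", 0.1),
        fake_io("user", 0.15),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")  # roughly 0.2s instead of 0.45s
```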

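12.4 Feature Roadmap

Short term (1-2 weeks)

| Feature | Description | Technical approach |
| --- | --- | --- |
| Payment integration | Integrate Alipay / WeChat Pay | Async callbacks, idempotent handling |
| Order timeout | Auto-cancel unpaid orders | Redis key-expiry notifications / Celery scheduled tasks |
| Stock deduction hardening | Prevent overselling | Redis distributed lock + database optimistic locking |
| Email notifications | Notify on order status changes | Celery + SMTP |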
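The optimistic-locking half of the oversell-prevention row usually comes down to a conditional UPDATE: decrement the stock only if enough stock remains and the row's version is still the one that was read, then check how many rows changed. A minimal `sqlite3` sketch of that idea follows; the `products` table with a `version` column is an assumption, `deduct_stock` is a hypothetical name, and the Redis distributed-lock half is not shown.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, stock INTEGER NOT NULL, version INTEGER NOT NULL)")
conn.execute("INSERT INTO products (id, stock, version) VALUES (1, 3, 0)")
conn.commit()


def deduct_stock(product_id: int, quantity: int, max_retries: int = 3) -> bool:
    for _ in range(max_retries):
        row = conn.execute("SELECT stock, version FROM products WHERE id = ?", (product_id,)).fetchone()
        if row is None:
            return False  # unknown product
        stock, version = row
        if stock < quantity:
            return False  # not enough stock: refuse instead of overselling
        cur = conn.execute(
            "UPDATE products SET stock = stock - ?, version = version + 1 "
            "WHERE id = ? AND version = ? AND stock >= ?",
            (quantity, product_id, version, quantity),
        )
        conn.commit()
        if cur.rowcount == 1:
            return True  # our version was still current: the deduction applied
        # rowcount 0: another writer changed the row first; re-read and retry
    return False


print(deduct_stock(1, 2), deduct_stock(1, 2))  # True False
```

No row lock is held between the read and the write; a concurrent update simply makes the `version` check fail, and the loop retries with fresh data.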
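Medium term (1-2 months)

| Feature | Description | Technical approach |
| --- | --- | --- |
| Message queue | Asynchronous processing of order events | RabbitMQ / Kafka |
| Search service | Full-text product search | Elasticsearch |
| Recommendation system | Personalized product recommendations | Collaborative filtering / deep learning |
| Flash sales | High-concurrency flash-sale scenarios | Pre-decrement stock in Redis, queue-based load leveling |

Long term (3-6 months)

| Feature | Description | Technical approach |
| --- | --- | --- |
| Microservice split | Deploy each service independently | gRPC, service registration and discovery |
| Distributed transactions | Cross-service data consistency | Saga pattern, TCC |
| Distributed tracing | End-to-end performance analysis | Jaeger / Zipkin |
| Multi-tenancy | SaaS support | Data isolation, configuration isolation |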

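12.5 Production Readiness Checklist

Check the following before every deployment:

[ ] Environment variables set correctly (SECRET_KEY must be changed)
[ ] Database connections use SSL
[ ] Redis requires password authentication
[ ] Log level set to INFO or WARNING
[ ] DEBUG mode disabled
[ ] Appropriate resource limits configured (CPU/memory)
[ ] Health-check endpoint reachable
[ ] Monitoring and alerting configured
[ ] Backup strategy in place
[ ] Rollback plan prepared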

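13. Series Wrap-Up

13.1 Key Topics Recap

| Topic | Key points |
| --- | --- |
| Performance optimization | Database indexes, caching strategy, async processing |
| Async programming | asyncio, aiohttp, async database access |
| Distributed systems | Microservices, message queues, distributed locks |
| High-concurrency architecture | Rate limiting, circuit breaking, load balancing |
| Secure programming | JWT authentication, password hashing, input validation |
| DevOps | Docker, CI/CD, monitoring and alerting |

13.2 Architecture Design Principles

  1. Layered architecture: API layer → service layer → data layer, with clear responsibilities.
  2. Async first: all I/O is asynchronous to raise concurrency.
  3. Caching: hot data is cached to reduce database load.
  4. Defense in depth: rate limiting, authentication and input validation at every layer.
  5. Observability: logs, metrics and distributed tracing make problems traceable.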

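13.3 Suggested Learning Path

If you have followed this series from the start, here is a suggested path for going deeper:

Fundamentals        →   Source code reading  →   Hands-on projects        →   Going deeper
    ↓                       ↓                        ↓                            ↓
Python core             FastAPI source           Open-source contributions    Cloud native
DS & algorithms         SQLAlchemy               Personal projects            Distributed systems
Network protocols       asyncio                  Tech blogging                Architecture design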

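13.4 Recommended Resources

| Type | Resource | Notes |
| --- | --- | --- |
| Book | Fluent Python | Essential reading for advancing in Python |
| Book | Designing Data-Intensive Applications | A classic on distributed systems |
| Docs | FastAPI official documentation | The most authoritative learning material |
| Source code | Celery / redis-py | Learn from well-designed code |
| Community | Python Weekly | Subscribe to the weekly newsletter |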

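14. Closing Thoughts

Congratulations on completing the series "Python Advanced Path: From Engineer to Architect" (《Python 进阶之路:从工程师到架构师》)!

Series Index

  1. Part 1 | Performance Optimization and Tuning in Practice
  2. Part 2 | Async Programming Deep Dive
  3. Part 3 | Introduction to Distributed Systems
  4. Part 4 | High-Concurrency Architecture Design
  5. Part 5 | Secure Programming in Practice
  6. Part 6 | Data Engineering and ETL
  7. Part 7 | Advanced Design Patterns
  8. Part 8 | Python Internals
  9. Part 9 | DevOps and CI/CD
  10. Part 10 | Hands-on Capstone Project

Final Words
Folks, programming is a marathon, not a sprint. This series is over, but your learning journey is just beginning.

Remember three principles:

  1. Learn by doing: building one project yourself beats reading ten tutorials.
  2. Keep learning: technology keeps evolving, so stay curious and keep your ability to learn.
  3. Share and discuss: teaching is the best way to learn, so share what you know.

If this series helped you, please like, bookmark and follow! Your support keeps me producing high-quality content.

Until we meet again, see you in the code!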


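Document Summary

This document walks through building a highly available e-commerce order API service from scratch. The main topics are:

I. Architecture Design

  • Plan for evolving toward microservices
  • Database ER diagram and index design
  • RESTful API conventions

II. Core Implementation

  • FastAPI async web framework
  • Async database access with SQLAlchemy
  • Redis caching strategy
  • JWT authentication and authorization
  • Rate-limiting and circuit-breaker middleware

III. Deployment and Operations

  • Docker multi-stage builds
  • Complete Kubernetes deployment configuration
  • Prometheus + Grafana monitoring stack
  • Log collection with Fluentd + Elasticsearch
  • GitHub Actions CI/CD pipeline

IV. Project Extensions

  • Performance optimization suggestions (database, cache, async)
  • Feature roadmap (short, medium and long term)
  • Production readiness checklist

By working through this project, you will have the full skill set for building enterprise-grade Python web services and a solid foundation for growing into an architect.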
