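[Python Advanced Path] Lecture 10 | Capstone Project: Building a Highly Available API Service from Scratch
This article is the final lecture of the "Python Advanced Path" series. It builds a highly available e-commerce order API service to tie together the previous 9 lectures. The project uses a multi-layer architecture with a stack that includes FastAPI, PostgreSQL, Redis, and JWT, and implements load balancing, caching, and authentication. The core code covers configuration management, database models, and other key pieces; the system architecture supports Nginx load balancing, multiple API service instances, a database, and monitoring components. The project is clearly structured, includes a complete CI/CD pipeline and monitoring setup, and can serve as a Pyt…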
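1. Before We Start: Time to Pull It All Together
Folks, congratulations on making it this far! This is the final lecture of the "Python Advanced Path: From Engineer to Architect" series.
Over the previous 9 lectures we covered:
- Performance optimization and tuning
- Asynchronous programming in depth
- Introduction to distributed systems
- High-concurrency architecture design
- Secure programming in practice
- Data engineering and ETL
- Advanced design patterns
- Python internals
- DevOps and CI/CD
In this lecture we bring that knowledge together and build a highly available e-commerce order API service from scratch. The project covers:
- FastAPI asynchronous web framework
- PostgreSQL database
- Redis cache
- JWT authentication
- Rate limiting and circuit breaking
- Docker containerization
- Prometheus monitoring
- GitHub Actions CI/CD
2. Project Architecture Design
2.1 System Architecture Diagram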
                   ┌───────────────┐
                   │     Nginx     │
                   │(load balancer)│
                   └───────┬───────┘
                           │
          ┌────────────────┼────────────────┐
          │                │                │
   ┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼──────┐
   │ API service │  │ API service │  │ API service │
   │ (instance 1)│  │ (instance 2)│  │ (instance 3)│
   └──────┬──────┘  └──────┬──────┘  └──────┬──────┘
          │                │                │
          └────────────────┼────────────────┘
                           │
    ┌──────────────────────┼──────────────────────┐
    │                      │                      │
┌───▼───┐            ┌─────▼─────┐         ┌──────▼──────┐
│ Redis │            │PostgreSQL │         │ Prometheus  │
│(cache)│            │(database) │         │ (monitoring)│
└───────┘            └───────────┘         └─────────────┘
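2.2 Technology Stack
| Layer | Technology |
|---|---|
| Web framework | FastAPI |
| Database | PostgreSQL + SQLAlchemy |
| Cache | Redis |
| Authentication | JWT |
| Containerization | Docker + Docker Compose |
| CI/CD | GitHub Actions |
| Monitoring | Prometheus + Grafana |
| Load balancing | Nginx |
2.3 Microservice Architecture Design
This project follows a monolith-first path toward microservices: it is initially deployed as a single application, which keeps development and testing simple, and can later be split into independent services along business domains.
2.3.1 Service Split Plan
| Service | Responsibility | Database | Port |
|---|---|---|---|
| API Gateway | Single entry point, routing, authentication | - | 80 |
| User Service | Registration, login, profile management | PostgreSQL | 8001 |
| Product Service | Product and inventory management | PostgreSQL | 8002 |
| Order Service | Order creation, lookup, cancellation | PostgreSQL | 8003 |
| Notification Service | Email and SMS notifications | - | 8004 |
2.3.2 Inter-Service Communication
- Synchronous: services call each other over HTTP/gRPC on the internal network
- Asynchronous: order events are broadcast through a message queue (Redis Pub/Sub or RabbitMQ)
2.3.3 Project Structure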
ecommerce-api/
├── .github/
│   └── workflows/
│       └── ci.yml
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── database.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── product.py
│   │   └── order.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── product.py
│   │   └── order.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── deps.py
│   │   ├── auth.py
│   │   ├── users.py
│   │   ├── products.py
│   │   └── orders.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── auth_service.py
│   │   ├── user_service.py
│   │   ├── product_service.py
│   │   └── order_service.py
│   ├── middleware/
│   │   ├── __init__.py
│   │   ├── rate_limit.py
│   │   └── circuit_breaker.py
│   └── utils/
│       ├── __init__.py
│       ├── security.py
│       └── cache.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_auth.py
│   ├── test_products.py
│   └── test_orders.py
├── k8s/
│   ├── namespace.yaml
│   ├── configmap.yaml
│   ├── secret.yaml
│   ├── deployment.yaml
│   ├── service.yaml
│   ├── ingress.yaml
│   └── hpa.yaml
├── monitoring/
│   ├── prometheus.yml
│   ├── grafana-dashboard.json
│   └── alertmanager.yml
├── logging/
│   └── fluentd.conf
├── Dockerfile
├── docker-compose.yml
├── nginx.conf
├── requirements.txt
└── README.md
3. Database Design
3.1 ER Diagram
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│    users    │     │   orders    │     │ order_items │
├─────────────┤     ├─────────────┤     ├─────────────┤
│ id (PK)     │──┐  │ id (PK)     │──┐  │ id (PK)     │
│ email       │  └──│ user_id(FK) │  └──│ order_id(FK)│
│ username    │     │ status      │     │ product_id  │
│ password    │     │ total_amount│     │ quantity    │
│ is_active   │     │ created_at  │     │ unit_price  │
│ created_at  │     │ updated_at  │     └──────┬──────┘
└─────────────┘     └─────────────┘            │
                                               │
                                        ┌──────▼──────┐
                                        │  products   │
                                        ├─────────────┤
                                        │ id (PK)     │
                                        │ name        │
                                        │ description │
                                        │ price       │
                                        │ stock       │
                                        │ is_active   │
                                        └─────────────┘
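3.2 Index Design
| Table | Indexed column | Index type | Purpose |
|---|---|---|---|
| users | email | UNIQUE | Email must be unique |
| users | username | UNIQUE | Username must be unique |
| orders | user_id | BTREE | Look up a user's orders |
| orders | created_at | BTREE | Time-range queries |
| order_items | order_id | BTREE | Order detail lookup |
| order_items | product_id | BTREE | Product sales statistics |
| products | name | BTREE | Product search |
| products | is_active | BTREE | Filter active products |
3.3 Database Optimization Strategy
- Connection pool: SQLAlchemy pool size of 10, with up to 20 overflow connections
- Read/write splitting: writes go to the primary, reads go to replicas (a later extension)
- Table sharding: the orders table is split by year (orders_2024, orders_2025)
- Archiving: orders older than 3 months are moved to a history table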
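The sharding and archiving rules above can be sketched as two small helpers. This is only an illustration: the function names are hypothetical, and "3 months" is approximated as 90 days, which is an assumption rather than something the article specifies.

```python
from datetime import date, timedelta

# Assumption: "3 months" is treated as a fixed 90-day window
ARCHIVE_AFTER_DAYS = 90


def orders_table_for(created: date) -> str:
    """Return the yearly shard that holds an order, e.g. orders_2024."""
    return f"orders_{created.year}"


def should_archive(created: date, today: date) -> bool:
    """True when the order is older than the archive window."""
    return today - created > timedelta(days=ARCHIVE_AFTER_DAYS)
```

A scheduled job could call `should_archive` for each candidate order and move matching rows into the history table.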
4. API Design
4.1 RESTful API Conventions
| Method | Path | Description | Auth required |
|---|---|---|---|
| POST | /api/auth/register | Register a user | No |
| POST | /api/auth/login | Log in | No |
| POST | /api/auth/refresh | Refresh the token | No |
| GET | /api/users/me | Get the current user's profile | Yes |
| PUT | /api/users/me | Update the current user's profile | Yes |
| GET | /api/products | List products | Yes |
| GET | /api/products/{id} | Get product details | Yes |
| POST | /api/products | Create a product (admin) | Yes |
| PUT | /api/products/{id} | Update a product (admin) | Yes |
| DELETE | /api/products/{id} | Delete a product (admin) | Yes |
| GET | /api/orders | List the user's orders | Yes |
| GET | /api/orders/{id} | Get order details | Yes |
| POST | /api/orders | Create an order | Yes |
| PUT | /api/orders/{id}/cancel | Cancel an order | Yes |
| GET | /health | Health check | No |
| GET | /metrics | Monitoring metrics | No |
4.2 Request/Response Conventions
Unified response format:
{
  "code": 200,
  "message": "success",
  "data": {},
  "timestamp": "2024-01-01T00:00:00Z"
}
Error response format:
{
  "code": 400,
  "message": "Invalid request parameters",
  "errors": {
    "field": "error detail"
  },
  "timestamp": "2024-01-01T00:00:00Z"
}
4.3 Pagination Conventions
{
  "code": 200,
  "data": {
    "items": [],
    "total": 100,
    "page": 1,
    "page_size": 20,
    "total_pages": 5
  }
}
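The response and pagination formats can be produced by a few small helpers. This is a minimal sketch: the names `success`, `error`, and `paginate` are hypothetical and are not part of the project code shown in this article.

```python
from datetime import datetime, timezone
from math import ceil
from typing import Any, Dict, List, Optional


def _timestamp() -> str:
    # UTC timestamp in the same shape as the examples, e.g. 2024-01-01T00:00:00Z
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def success(data: Any, message: str = "success", code: int = 200) -> Dict[str, Any]:
    """Build the unified success envelope."""
    return {"code": code, "message": message, "data": data, "timestamp": _timestamp()}


def error(code: int, message: str, errors: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Build the error envelope; `errors` maps field names to details."""
    return {"code": code, "message": message, "errors": errors or {}, "timestamp": _timestamp()}


def paginate(items: List[Any], total: int, page: int, page_size: int) -> Dict[str, Any]:
    """Build the pagination payload; total_pages rounds up."""
    return {
        "items": items,
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": ceil(total / page_size),
    }
```

For example, `success(paginate(rows, total=100, page=1, page_size=20))` yields a payload whose `total_pages` is 5, matching the sample above.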
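5. Core Implementation
5.1 Configuration Management
# app/config.py
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Values are read from .env; real environment variables take precedence
    model_config = SettingsConfigDict(env_file=".env")

    APP_NAME: str = "E-Commerce API"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False
    # Required: startup fails if these three are missing
    DATABASE_URL: str
    REDIS_URL: str
    SECRET_KEY: str
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    RATE_LIMIT_REQUESTS: int = 100  # requests allowed per window
    RATE_LIMIT_WINDOW: int = 60  # window length in seconds
    CIRCUIT_BREAKER_THRESHOLD: int = 5  # failures before the circuit opens
    CIRCUIT_BREAKER_TIMEOUT: int = 30  # seconds before a retry is allowed


@lru_cache
def get_settings() -> Settings:
    # Cached so the settings are parsed only once per process
    return Settings()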
5.2 Data Schemas
from datetime import datetime
from enum import Enum
from typing import Any, List, Optional

from pydantic import BaseModel, ConfigDict, EmailStr, Field


# User schemas
class UserBase(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=50)


class UserCreate(UserBase):
    password: str = Field(..., min_length=8)


class UserLogin(BaseModel):
    username: str
    password: str


class UserResponse(UserBase):
    # from_attributes lets the schema be built directly from ORM objects
    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
    created_at: datetime


# Product schemas
class ProductBase(BaseModel):
    name: str = Field(..., min_length=1, max_length=200)
    description: Optional[str] = None
    # float keeps the example short; Decimal is the safer choice for money
    price: float = Field(..., gt=0)
    stock: int = Field(..., ge=0)


class ProductCreate(ProductBase):
    pass


class ProductResponse(ProductBase):
    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
    created_at: datetime


# Order schemas
class OrderStatus(str, Enum):
    PENDING = "pending"
    PAID = "paid"
    SHIPPED = "shipped"
    COMPLETED = "completed"
    CANCELLED = "cancelled"


class OrderItemCreate(BaseModel):
    product_id: int
    quantity: int = Field(..., ge=1)


class OrderItemResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    product_id: int
    quantity: int
    unit_price: float


class OrderCreate(BaseModel):
    # Pydantic v2 uses min_length for list sizes (min_items is deprecated)
    items: List[OrderItemCreate] = Field(..., min_length=1)


class OrderResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    user_id: int
    status: OrderStatus
    total_amount: float
    items: List[OrderItemResponse]
    created_at: datetime


# Token
class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"


# Pagination
class PaginationParams(BaseModel):
    page: int = Field(1, ge=1)
    page_size: int = Field(20, ge=1, le=100)


class PaginatedResponse(BaseModel):
    items: List[Any]
    total: int
    page: int
    page_size: int
    total_pages: int
5.3 Database Models
import enum
from datetime import datetime, timezone

from sqlalchemy import Boolean, Column, DateTime, Enum, Float, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from app.database import Base


def utc_now() -> datetime:
    # Timezone-aware replacement for datetime.utcnow(), which is deprecated since Python 3.12
    return datetime.now(timezone.utc)


class OrderStatus(str, enum.Enum):
    PENDING = "pending"
    PAID = "paid"
    SHIPPED = "shipped"
    COMPLETED = "completed"
    CANCELLED = "cancelled"


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), default=utc_now)
    updated_at = Column(DateTime(timezone=True), default=utc_now, onupdate=utc_now)

    orders = relationship("Order", back_populates="user")


class Product(Base):
    __tablename__ = "products"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True, nullable=False)
    description = Column(String)
    price = Column(Float, nullable=False)
    stock = Column(Integer, default=0)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), default=utc_now)
    updated_at = Column(DateTime(timezone=True), default=utc_now, onupdate=utc_now)

    order_items = relationship("OrderItem", back_populates="product")


class Order(Base):
    __tablename__ = "orders"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    status = Column(Enum(OrderStatus), default=OrderStatus.PENDING)
    total_amount = Column(Float, default=0)
    created_at = Column(DateTime(timezone=True), default=utc_now)
    updated_at = Column(DateTime(timezone=True), default=utc_now, onupdate=utc_now)

    user = relationship("User", back_populates="orders")
    # Deleting an order also deletes its line items
    items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")


class OrderItem(Base):
    __tablename__ = "order_items"

    id = Column(Integer, primary_key=True, index=True)
    order_id = Column(Integer, ForeignKey("orders.id"), nullable=False)
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
    quantity = Column(Integer, nullable=False)
    # Price at purchase time, so later price changes do not alter old orders
    unit_price = Column(Float, nullable=False)

    order = relationship("Order", back_populates="items")
    product = relationship("Product", back_populates="order_items")
5.4 Database Connection
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base

from app.config import get_settings

settings = get_settings()

engine = create_async_engine(
    # Switch the URL to the asyncpg driver
    settings.DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://", 1),
    echo=settings.DEBUG,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,  # validate connections before handing them out
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # keep loaded attributes usable after commit
    autocommit=False,
    autoflush=False,
)

Base = declarative_base()


async def get_db():
    # FastAPI dependency: one session per request, closed when the request ends
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()


async def init_db():
    # Create tables on startup; a migration tool is the usual choice in production
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
5.5 Redis Cache
import json
from typing import Any, Optional

import redis.asyncio as redis

from app.config import get_settings

settings = get_settings()


class CacheManager:
    """Thin JSON wrapper around Redis; every method is a no-op until connect() runs."""

    def __init__(self):
        self.client: Optional[redis.Redis] = None

    async def connect(self):
        self.client = redis.from_url(
            settings.REDIS_URL,
            encoding="utf-8",
            decode_responses=True,
        )

    async def disconnect(self):
        if self.client:
            await self.client.close()

    async def get(self, key: str) -> Optional[Any]:
        if not self.client:
            return None
        value = await self.client.get(key)
        # Compare with None so that cached falsy values are still decoded
        if value is not None:
            return json.loads(value)
        return None

    async def set(self, key: str, value: Any, expire: int = 300):
        # value must be JSON-serializable (dicts, lists, numbers, strings)
        if not self.client:
            return
        await self.client.set(key, json.dumps(value), ex=expire)

    async def delete(self, key: str):
        if not self.client:
            return
        await self.client.delete(key)

    async def incr(self, key: str) -> int:
        if not self.client:
            return 0
        return await self.client.incr(key)

    async def expire(self, key: str, seconds: int):
        if not self.client:
            return
        await self.client.expire(key, seconds)


cache = CacheManager()
5.6 Security Utilities
from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
import jwt

from app.config import get_settings

settings = get_settings()


def hash_password(password: str) -> str:
    salt = bcrypt.gensalt(rounds=12)
    return bcrypt.hashpw(password.encode(), salt).decode()


def verify_password(password: str, hashed: str) -> bool:
    return bcrypt.checkpw(password.encode(), hashed.encode())


def create_access_token(user_id: int, username: str) -> str:
    # Timezone-aware UTC time; datetime.utcnow() is deprecated since Python 3.12
    expire = datetime.now(timezone.utc) + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    payload = {
        "sub": str(user_id),
        "username": username,
        "type": "access",
        "exp": expire,
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)


def create_refresh_token(user_id: int) -> str:
    expire = datetime.now(timezone.utc) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    payload = {
        "sub": str(user_id),
        "type": "refresh",
        "exp": expire,
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)


def decode_token(token: str) -> Optional[dict]:
    # Returns None for any invalid token, including expired ones
    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except jwt.InvalidTokenError:
        return None
5.7 Circuit Breaker Middleware
import time
from enum import Enum
from typing import Dict

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

from app.config import get_settings

settings = get_settings()


class CircuitState(Enum):
    CLOSED = "closed"  # normal operation
    OPEN = "open"  # tripped: requests are rejected
    HALF_OPEN = "half_open"  # probing whether the service has recovered


class CircuitBreaker:
    """Per-key breaker. State lives in process memory, so each API instance tracks its own."""

    def __init__(self, threshold: int = 5, timeout: int = 30):
        self.threshold = threshold
        self.timeout = timeout
        self.failures: Dict[str, int] = {}
        self.last_failure_time: Dict[str, float] = {}
        self.state: Dict[str, CircuitState] = {}

    def get_state(self, key: str) -> CircuitState:
        return self.state.get(key, CircuitState.CLOSED)

    def record_success(self, key: str):
        # Any success resets the failure count and closes the circuit
        self.failures.pop(key, None)
        self.last_failure_time.pop(key, None)
        self.state[key] = CircuitState.CLOSED

    def record_failure(self, key: str):
        self.failures[key] = self.failures.get(key, 0) + 1
        self.last_failure_time[key] = time.time()
        if self.failures[key] >= self.threshold:
            self.state[key] = CircuitState.OPEN

    def can_execute(self, key: str) -> bool:
        state = self.get_state(key)
        if state == CircuitState.CLOSED:
            return True
        if state == CircuitState.OPEN:
            last_fail = self.last_failure_time.get(key, 0)
            # After the timeout, let traffic through again to probe the service
            if time.time() - last_fail > self.timeout:
                self.state[key] = CircuitState.HALF_OPEN
                return True
            return False
        return True  # HALF_OPEN


circuit_breaker = CircuitBreaker(
    threshold=settings.CIRCUIT_BREAKER_THRESHOLD,
    timeout=settings.CIRCUIT_BREAKER_TIMEOUT,
)


class CircuitBreakerMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Only API paths are protected by the breaker
        if not request.url.path.startswith("/api/"):
            return await call_next(request)
        service_key = request.url.path.split("/")[2]  # /api/<service>/...
        if not circuit_breaker.can_execute(service_key):
            # Return the response directly: an HTTPException raised inside
            # BaseHTTPMiddleware is not handled by FastAPI's exception handlers
            return JSONResponse(
                status_code=503,
                content={"detail": "Service temporarily unavailable, please retry later"},
            )
        try:
            response = await call_next(request)
        except Exception:
            circuit_breaker.record_failure(service_key)
            raise
        # 5xx responses count as service failures
        if response.status_code >= 500:
            circuit_breaker.record_failure(service_key)
        else:
            circuit_breaker.record_success(service_key)
        return response
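The CLOSED, OPEN, and HALF_OPEN transitions are easier to follow in isolation. Below is a simplified single-key sketch with an injectable clock so the timeout can be exercised without sleeping. It is an illustration under stated assumptions, not the middleware's class: the `Breaker` name, the `clock` parameter, and the rule that a failure in HALF_OPEN reopens the circuit immediately are choices made for this example.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class Breaker:
    def __init__(self, threshold: int = 3, timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = threshold
        self.timeout = timeout
        self.clock = clock  # injectable so tests can control time
        self.failures = 0
        self.opened_at = 0.0
        self.state = State.CLOSED

    def allow(self) -> bool:
        # Once the timeout has elapsed, an open circuit starts probing
        if self.state is State.OPEN and self.clock() - self.opened_at >= self.timeout:
            self.state = State.HALF_OPEN
        return self.state is not State.OPEN

    def on_success(self) -> None:
        self.failures = 0
        self.state = State.CLOSED

    def on_failure(self) -> None:
        self.failures += 1
        # A failed probe reopens immediately; otherwise wait for the threshold
        if self.state is State.HALF_OPEN or self.failures >= self.threshold:
            self.state = State.OPEN
            self.opened_at = self.clock()
```

Passing a fake clock such as `clock=lambda: now[0]` lets a test advance time by mutating `now[0]`.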
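5.8 Rate Limiting Middleware
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

from app.config import get_settings
from app.utils.cache import cache

settings = get_settings()

# Paths that are never rate limited
EXEMPT_PATHS = {"/health", "/metrics", "/docs", "/openapi.json"}


class RateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        if request.url.path in EXEMPT_PATHS:
            return await call_next(request)
        # Behind Nginx the socket peer is the proxy, so prefer the X-Real-IP header
        # it sets. Only trust this header when the app is not directly exposed.
        client_ip = request.headers.get("X-Real-IP") or (
            request.client.host if request.client else "unknown"
        )
        key = f"rate_limit:{client_ip}"
        # Fixed-window counter: the first hit in a window starts the expiry timer
        current = await cache.incr(key)
        if current == 1:
            await cache.expire(key, settings.RATE_LIMIT_WINDOW)
        headers = {
            "X-RateLimit-Limit": str(settings.RATE_LIMIT_REQUESTS),
            "X-RateLimit-Remaining": str(max(0, settings.RATE_LIMIT_REQUESTS - current)),
        }
        if current > settings.RATE_LIMIT_REQUESTS:
            # Return the response directly: an HTTPException raised inside
            # BaseHTTPMiddleware is not handled by FastAPI's exception handlers
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many requests, please retry later"},
                headers=headers,
            )
        response = await call_next(request)
        for name, value in headers.items():
            response.headers[name] = value
        return response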
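The counting scheme used by the middleware is a fixed window: count hits per key and reset when the window expires. A minimal in-memory sketch of the same idea follows, with a dictionary standing in for Redis. The class name `FixedWindowLimiter` and the injectable `clock` are assumptions made for this example.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    def __init__(self, limit: int, window: float,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        # key -> (hit count, time at which the current window expires)
        self._counters: Dict[str, Tuple[int, float]] = {}

    def hit(self, key: str) -> Tuple[bool, int]:
        """Record one request; return (allowed, remaining)."""
        now = self.clock()
        count, expires_at = self._counters.get(key, (0, 0.0))
        if now >= expires_at:
            # Window over: equivalent to the Redis key having expired
            count, expires_at = 0, now + self.window
        count += 1
        self._counters[key] = (count, expires_at)
        return count <= self.limit, max(0, self.limit - count)
```

One known trade-off of fixed windows is that a client can send up to twice the limit across a window boundary; sliding-window or token-bucket schemes smooth that out at the cost of more bookkeeping.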
5.9 Dependency Injection and Authentication
# app/api/deps.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.models.user import User
from app.utils.security import decode_token

security = HTTPBearer()


def _invalid_credentials() -> HTTPException:
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    payload = decode_token(credentials.credentials)
    # Refresh tokens must not be accepted as access tokens
    if not payload or payload.get("type") != "access":
        raise _invalid_credentials()
    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        # Missing or non-numeric subject claim
        raise _invalid_credentials()
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User does not exist",
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account has been disabled",
        )
    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    # get_current_user already rejects disabled accounts
    return current_user


async def get_current_superuser(
    current_user: User = Depends(get_current_user),
) -> User:
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Administrator privileges required",
        )
    return current_user
5.10 Authentication Service
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.user import User
from app.schemas.user import Token, UserCreate, UserLogin
from app.utils.security import (
    create_access_token,
    create_refresh_token,
    hash_password,
    verify_password,
)


class AuthService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def register(self, user_data: UserCreate) -> User:
        # Friendly pre-checks; the UNIQUE constraints remain the final guard
        # against two concurrent registrations
        result = await self.db.execute(
            select(User).where(User.email == user_data.email)
        )
        if result.scalar_one_or_none():
            raise HTTPException(status_code=400, detail="Email is already registered")
        result = await self.db.execute(
            select(User).where(User.username == user_data.username)
        )
        if result.scalar_one_or_none():
            raise HTTPException(status_code=400, detail="Username is already taken")
        user = User(
            email=user_data.email,
            username=user_data.username,
            hashed_password=hash_password(user_data.password),
        )
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)
        return user

    async def login(self, credentials: UserLogin) -> Token:
        result = await self.db.execute(
            select(User).where(User.username == credentials.username)
        )
        user = result.scalar_one_or_none()
        # Same message for unknown user and wrong password, so usernames cannot be probed
        if not user or not verify_password(credentials.password, user.hashed_password):
            raise HTTPException(status_code=401, detail="Incorrect username or password")
        if not user.is_active:
            # 403 matches the status used for disabled accounts in the auth dependency
            raise HTTPException(status_code=403, detail="Account has been disabled")
        return Token(
            access_token=create_access_token(user.id, user.username),
            refresh_token=create_refresh_token(user.id),
            token_type="bearer",
        )
5.11 Product Service
from datetime import datetime
from typing import List, Optional

from fastapi import HTTPException
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.product import Product
from app.schemas.product import ProductCreate
from app.utils.cache import cache

# List cache keys embed this counter; bumping it invalidates every cached page at once
LIST_VERSION_KEY = "products:list:version"


def _to_cache(product: Product) -> dict:
    """Convert an ORM row into a JSON-serializable dict."""
    return {
        "id": product.id,
        "name": product.name,
        "description": product.description,
        "price": product.price,
        "stock": product.stock,
        "is_active": product.is_active,
        "created_at": product.created_at.isoformat() if product.created_at else None,
    }


def _from_cache(data: dict) -> Product:
    """Rebuild a detached, read-only Product from cached data."""
    data = dict(data)
    if data.get("created_at"):
        data["created_at"] = datetime.fromisoformat(data["created_at"])
    return Product(**data)


class ProductService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def _invalidate_lists(self) -> None:
        await cache.incr(LIST_VERSION_KEY)

    async def create_product(self, product_data: ProductCreate) -> Product:
        product = Product(**product_data.model_dump())
        self.db.add(product)
        await self.db.commit()
        await self.db.refresh(product)
        await self._invalidate_lists()
        return product

    async def get_product(self, product_id: int) -> Optional[Product]:
        cache_key = f"product:{product_id}"
        cached = await cache.get(cache_key)
        if cached:
            return _from_cache(cached)
        result = await self.db.execute(
            select(Product).where(Product.id == product_id, Product.is_active == True)  # noqa: E712
        )
        product = result.scalar_one_or_none()
        if product:
            await cache.set(cache_key, _to_cache(product), expire=300)
        return product

    async def get_products(
        self,
        skip: int = 0,
        limit: int = 20,
        search: Optional[str] = None,
    ) -> tuple[List[Product], int]:
        version = await cache.get(LIST_VERSION_KEY) or 0
        cache_key = f"products:list:v{version}:{skip}:{limit}:{search}"
        cached = await cache.get(cache_key)
        if cached:
            return [_from_cache(item) for item in cached["items"]], cached["total"]
        query = select(Product).where(Product.is_active == True)  # noqa: E712
        count_query = select(func.count()).select_from(Product).where(Product.is_active == True)  # noqa: E712
        if search:
            query = query.where(Product.name.ilike(f"%{search}%"))
            count_query = count_query.where(Product.name.ilike(f"%{search}%"))
        # A stable ordering keeps pages consistent between requests
        query = query.order_by(Product.id).offset(skip).limit(limit)
        result = await self.db.execute(query)
        products = result.scalars().all()
        count_result = await self.db.execute(count_query)
        total = count_result.scalar()
        # ORM objects are not JSON-serializable, so cache plain dicts
        await cache.set(
            cache_key,
            {"items": [_to_cache(p) for p in products], "total": total},
            expire=60,
        )
        return products, total

    async def update_product(self, product_id: int, product_data: ProductCreate) -> Product:
        result = await self.db.execute(select(Product).where(Product.id == product_id))
        product = result.scalar_one_or_none()
        if not product:
            raise HTTPException(status_code=404, detail="Product not found")
        for key, value in product_data.model_dump().items():
            setattr(product, key, value)
        await self.db.commit()
        await self.db.refresh(product)
        # Invalidate both the detail entry and every cached list page
        await cache.delete(f"product:{product_id}")
        await self._invalidate_lists()
        return product

    async def delete_product(self, product_id: int) -> None:
        result = await self.db.execute(select(Product).where(Product.id == product_id))
        product = result.scalar_one_or_none()
        if not product:
            raise HTTPException(status_code=404, detail="Product not found")
        # Soft delete: keep the row so existing orders still reference it
        product.is_active = False
        await self.db.commit()
        await cache.delete(f"product:{product_id}")
        await self._invalidate_lists()
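Product lists are cached per combination of skip, limit, and search, so invalidation has to cover many keys at once. One common way to do that is to embed a version counter in every list key and bump it on writes. The sketch below shows the idea with an in-memory stand-in for Redis; `DictCache`, `list_key`, and `invalidate_lists` are hypothetical names used only for this illustration.

```python
from typing import Any, Dict, Optional

VERSION_KEY = "products:list:version"


class DictCache:
    """In-memory stand-in for Redis, just enough for this sketch."""

    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        return self.data.get(key)

    def set(self, key: str, value: Any) -> None:
        self.data[key] = value

    def incr(self, key: str) -> int:
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]


def list_key(cache: DictCache, skip: int, limit: int, search: Optional[str]) -> str:
    # The current version is part of the key, so old pages become unreachable
    version = cache.get(VERSION_KEY) or 0
    return f"products:list:v{version}:{skip}:{limit}:{search}"


def invalidate_lists(cache: DictCache) -> None:
    # One increment invalidates every cached page, whatever its parameters
    cache.incr(VERSION_KEY)
```

Stale pages are never deleted explicitly; with Redis they simply age out through their TTL.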
5.12 Order Service
from typing import List

from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models.order import Order, OrderItem, OrderStatus
from app.models.product import Product
from app.schemas.order import OrderCreate
from app.utils.cache import cache


class OrderService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_order(self, user_id: int, order_data: OrderCreate) -> Order:
        total_amount = 0.0
        items = []
        # Lock products in a stable order so concurrent orders cannot deadlock
        for item in sorted(order_data.items, key=lambda i: i.product_id):
            result = await self.db.execute(
                select(Product)
                .where(Product.id == item.product_id, Product.is_active == True)  # noqa: E712
                .with_for_update()  # row lock: two orders cannot both take the last unit
            )
            product = result.scalar_one_or_none()
            if not product:
                raise HTTPException(status_code=404, detail=f"Product {item.product_id} not found")
            if product.stock < item.quantity:
                raise HTTPException(status_code=400, detail=f"Insufficient stock for product {product.name}")
            product.stock -= item.quantity
            total_amount += product.price * item.quantity
            items.append(OrderItem(
                product_id=product.id,
                quantity=item.quantity,
                unit_price=product.price,  # snapshot of the price at purchase time
            ))
        order = Order(
            user_id=user_id,
            status=OrderStatus.PENDING,
            total_amount=total_amount,
        )
        order.items = items
        self.db.add(order)
        # Order, items, and stock changes commit in one transaction.
        # No refresh afterwards: expire_on_commit=False keeps the attributes loaded,
        # and a refresh would expire `items`, forcing a lazy load that fails under asyncio.
        await self.db.commit()
        await cache.delete(f"user_orders:{user_id}")
        for order_item in items:
            # Cached product details now show stale stock
            await cache.delete(f"product:{order_item.product_id}")
        return order

    async def get_user_orders(self, user_id: int) -> List[dict]:
        cache_key = f"user_orders:{user_id}"
        cached = await cache.get(cache_key)
        if cached:
            return cached
        result = await self.db.execute(
            select(Order)
            .options(selectinload(Order.items))  # load items eagerly, avoiding N+1 queries
            .where(Order.user_id == user_id)
            .order_by(Order.created_at.desc())
        )
        orders = result.scalars().all()
        orders_data = [
            {
                "id": o.id,
                "user_id": o.user_id,
                "status": o.status.value,
                "total_amount": o.total_amount,
                "items": [
                    {"product_id": i.product_id, "quantity": i.quantity, "unit_price": i.unit_price}
                    for i in o.items
                ],
                "created_at": o.created_at.isoformat(),
            }
            for o in orders
        ]
        await cache.set(cache_key, orders_data, expire=60)
        # Return the same shape on cache hits and misses
        return orders_data

    async def cancel_order(self, user_id: int, order_id: int) -> Order:
        result = await self.db.execute(
            select(Order)
            .options(selectinload(Order.items))
            # Filtering by user_id prevents cancelling someone else's order
            .where(Order.id == order_id, Order.user_id == user_id)
        )
        order = result.scalar_one_or_none()
        if not order:
            raise HTTPException(status_code=404, detail="Order not found")
        if order.status not in [OrderStatus.PENDING, OrderStatus.PAID]:
            raise HTTPException(status_code=400, detail="Order cannot be cancelled in its current status")
        # Return the reserved stock, locking each product row while it is updated
        for item in sorted(order.items, key=lambda i: i.product_id):
            result = await self.db.execute(
                select(Product).where(Product.id == item.product_id).with_for_update()
            )
            product = result.scalar_one_or_none()
            if product:
                product.stock += item.quantity
        order.status = OrderStatus.CANCELLED
        await self.db.commit()
        await cache.delete(f"user_orders:{user_id}")
        for item in order.items:
            await cache.delete(f"product:{item.product_id}")
        return order
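Stock deduction is the step where concurrent orders can oversell, because a separate "check, then subtract" leaves a gap between the two operations. One alternative to row locking is a conditional UPDATE that checks and deducts in a single statement. The sketch below demonstrates it with the standard-library `sqlite3` module so it runs anywhere; the `reserve_stock` helper and the two-column table are assumptions for illustration, and the same SQL pattern applies to PostgreSQL.

```python
import sqlite3


def reserve_stock(conn: sqlite3.Connection, product_id: int, quantity: int) -> bool:
    """Deduct stock only if enough is left; True when the reservation succeeded."""
    cur = conn.execute(
        "UPDATE products SET stock = stock - ? WHERE id = ? AND stock >= ?",
        (quantity, product_id, quantity),
    )
    # rowcount is 0 when the product is missing or the stock is insufficient
    return cur.rowcount == 1


# Tiny in-memory table for demonstration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, stock INTEGER NOT NULL)")
conn.execute("INSERT INTO products (id, stock) VALUES (1, 5)")
```

Because the check and the write happen in one statement, the database serializes competing updates on the same row and the stock can never go negative.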
5.13 API Routes
# app/api/auth.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.schemas.user import Token, UserCreate, UserLogin, UserResponse
from app.services.auth_service import AuthService

router = APIRouter()


@router.post("/register", response_model=UserResponse)
async def register(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    auth_service = AuthService(db)
    return await auth_service.register(user_data)


@router.post("/login", response_model=Token)
async def login(
    credentials: UserLogin,
    db: AsyncSession = Depends(get_db),
):
    auth_service = AuthService(db)
    return await auth_service.login(credentials)


# app/api/products.py
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_superuser, get_current_user
from app.database import get_db
from app.models.user import User
from app.schemas.product import ProductCreate, ProductResponse
from app.services.product_service import ProductService

router = APIRouter()


@router.get("/", response_model=dict)
async def get_products(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    search: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    product_service = ProductService(db)
    skip = (page - 1) * page_size
    products, total = await product_service.get_products(
        skip=skip, limit=page_size, search=search
    )
    total_pages = (total + page_size - 1) // page_size  # ceiling division
    return {
        "code": 200,
        "data": {
            # ORM objects cannot be serialized inside a plain dict, so convert them first
            "items": [ProductResponse.model_validate(p) for p in products],
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": total_pages,
        },
    }


@router.get("/{product_id}", response_model=dict)
async def get_product(
    product_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    product_service = ProductService(db)
    product = await product_service.get_product(product_id)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    return {"code": 200, "data": ProductResponse.model_validate(product)}


@router.post("/", response_model=dict)
async def create_product(
    product_data: ProductCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_superuser),  # admin only
):
    product_service = ProductService(db)
    product = await product_service.create_product(product_data)
    return {"code": 200, "data": ProductResponse.model_validate(product)}


# app/api/orders.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_user
from app.database import get_db
from app.models.user import User
from app.schemas.order import OrderCreate, OrderResponse
from app.services.order_service import OrderService

router = APIRouter()


@router.get("/")
async def get_orders(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    order_service = OrderService(db)
    orders = await order_service.get_user_orders(current_user.id)
    return {"code": 200, "data": orders}


@router.post("/")
async def create_order(
    order_data: OrderCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    order_service = OrderService(db)
    order = await order_service.create_order(current_user.id, order_data)
    return {"code": 200, "data": OrderResponse.model_validate(order)}


@router.put("/{order_id}/cancel")
async def cancel_order(
    order_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    order_service = OrderService(db)
    order = await order_service.cancel_order(current_user.id, order_id)
    return {"code": 200, "data": OrderResponse.model_validate(order)}
5.14 Main Application
import time
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest

from app.api import auth, orders, products, users
from app.config import get_settings
from app.database import init_db
from app.middleware.circuit_breaker import CircuitBreakerMiddleware
from app.middleware.rate_limit import RateLimitMiddleware
from app.utils.cache import cache

settings = get_settings()

REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint']
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create tables and connect to Redis
    await init_db()
    await cache.connect()
    yield
    # Shutdown: release the Redis connection
    await cache.disconnect()


app = FastAPI(
    title=settings.APP_NAME,
    version=settings.APP_VERSION,
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    # Wildcard origins are for local development; list trusted origins in production
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(RateLimitMiddleware)
app.add_middleware(CircuitBreakerMiddleware)


@app.middleware("http")
async def add_metrics(request, call_next):
    start_time = time.time()
    response = await call_next(request)
    latency = time.time() - start_time
    # Note: raw paths such as /api/products/123 produce one label value per ID;
    # using the route template instead keeps label cardinality bounded
    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(latency)
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    return response


app.include_router(auth.router, prefix="/api/auth", tags=["Auth"])
app.include_router(users.router, prefix="/api/users", tags=["Users"])
app.include_router(products.router, prefix="/api/products", tags=["Products"])
app.include_router(orders.router, prefix="/api/orders", tags=["Orders"])


@app.get("/health")
async def health():
    return {"status": "ok", "version": settings.APP_VERSION}


@app.get("/metrics")
async def metrics():
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST  # Prometheus text exposition content type
    )


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
5.15 Dependencies
# requirements.txt
fastapi==0.109.0
uvicorn[standard]==0.27.0
pydantic==2.5.0
pydantic-settings==2.1.0
sqlalchemy[asyncio]==2.0.25
asyncpg==0.29.0
redis==5.0.1
bcrypt==4.1.2
pyjwt==2.8.0
python-multipart==0.0.6
email-validator==2.1.0
prometheus-client==0.19.0
httpx==0.26.0
pytest==7.4.4
pytest-asyncio==0.23.3
pytest-cov==4.1.0
5.16 Environment Variables
# .env example
APP_NAME=E-Commerce API
APP_VERSION=1.0.0
DEBUG=false
DATABASE_URL=postgresql://postgres:password@localhost:5432/ecommerce
REDIS_URL=redis://localhost:6379/0
SECRET_KEY=your-secret-key-here-min-32-characters-long
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
RATE_LIMIT_REQUESTS=100
RATE_LIMIT_WINDOW=60
CIRCUIT_BREAKER_THRESHOLD=5
CIRCUIT_BREAKER_TIMEOUT=30
6. Docker Configuration
6.1 Dockerfile
# Stage 1: build wheels with the compiler toolchain
FROM python:3.12-slim AS builder
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt

# Stage 2: slim runtime image without build tools
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
    libpq5 \
    && rm -rf /var/lib/apt/lists/*
# Run as a non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
COPY --from=builder /app/wheels /wheels
COPY --from=builder /app/requirements.txt .
RUN pip install --no-cache-dir /wheels/*
COPY --chown=appuser:appuser . .
USER appuser
EXPOSE 8000
# The slim image does not ship curl, so the check uses Python's standard library
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)" || exit 1
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
6.2 Nginx Configuration
# nginx.conf (these directives belong in the http context, for example as /etc/nginx/conf.d/default.conf)

# Rate-limit zone: 10 requests per second per client IP, 10 MB of shared state
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

upstream api_servers {
    least_conn;
    server api:8000 weight=5;
}

server {
    listen 80;
    server_name localhost;

    # Logging
    access_log /var/log/nginx/access.log;
    error_log /var/log/nginx/error.log;

    # Health check
    location /health {
        proxy_pass http://api_servers/health;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;
    }

    # API routes
    location /api/ {
        # No URI part after the upstream name, so the /api/ prefix reaches the app unchanged
        # (the application routes are mounted under /api, as the tests in section 11 show)
        proxy_pass http://api_servers;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;

        # Buffering
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;

        # Rate limiting (10 requests per second, bursts of up to 20)
        limit_req zone=api_limit burst=20 nodelay;
    }

    # Metrics (private networks only)
    location /metrics {
        proxy_pass http://api_servers/metrics;
        allow 10.0.0.0/8;
        allow 172.16.0.0/12;
        allow 192.168.0.0/16;
        deny all;
    }

    # Static files
    location /static/ {
        alias /app/static/;
        expires 30d;
        add_header Cache-Control "public, immutable";
    }
}
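To get a feel for what `rate=10r/s` combined with `burst=20 nodelay` does to incoming traffic, here is a small Python model of a leaky-bucket limiter. It is a simplified sketch of the idea rather than nginx's actual implementation, and the names `LeakyBucket` and `count_accepted` are made up for this illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class LeakyBucket:
    """Simplified leaky-bucket model (hypothetical helper, not nginx code)."""

    rate: float                   # requests drained per second (rate=10r/s)
    burst: int                    # extra requests tolerated above the rate (burst=20)
    excess: float = 0.0           # requests currently counted above the rate
    last: Optional[float] = None  # arrival time of the last accepted request

    def allow(self, now: float) -> bool:
        """Return True if a request arriving at time `now` (seconds) is accepted."""
        if self.last is None:
            # First request from this client is always accepted.
            self.last = now
            return True
        # Drain the bucket for the elapsed time, then count this request.
        excess = max(self.excess - self.rate * (now - self.last), 0.0) + 1.0
        if excess > self.burst:
            return False  # nginx answers rejected requests with 503 by default
        self.excess = excess
        self.last = now
        return True


def count_accepted(arrivals: List[float], rate: float = 10, burst: int = 20) -> int:
    """Feed a list of arrival times to a fresh bucket and count accepted requests."""
    bucket = LeakyBucket(rate=rate, burst=burst)
    return sum(bucket.allow(t) for t in arrivals)


if __name__ == "__main__":
    print(count_accepted([0.0] * 30))                   # 30 requests at the same instant
    print(count_accepted([i / 10 for i in range(50)]))  # a steady 10 requests per second
```

In this model a client that stays at or below 10 requests per second is never rejected, while an instantaneous flood only gets through until the burst allowance is used up. With `nodelay`, accepted burst requests are forwarded immediately instead of being paced out.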
6.3 docker-compose.yml
version: '3.8'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/ecommerce
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    networks:
      - app-network
    restart: unless-stopped

  db:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=ecommerce
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d ecommerce"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - app-network
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    networks:
      - app-network
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      # The file has no events/http wrapper, so mount it as a conf.d include
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
    depends_on:
      - api
    networks:
      - app-network
    restart: unless-stopped

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks:
      - app-network
    restart: unless-stopped

networks:
  app-network:
    driver: bridge

volumes:
  postgres_data:
  redis_data:
7. Kubernetes Deployment Configuration
7.1 Namespace
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: ecommerce
  labels:
    name: ecommerce
    environment: production
7.2 ConfigMap
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: ecommerce-config
  namespace: ecommerce
data:
  APP_NAME: "E-Commerce API"
  APP_VERSION: "1.0.0"
  DEBUG: "false"
  DATABASE_URL: "postgresql://postgres:password@postgres:5432/ecommerce"
  REDIS_URL: "redis://redis:6379/0"
  ALGORITHM: "HS256"
  ACCESS_TOKEN_EXPIRE_MINUTES: "30"
  REFRESH_TOKEN_EXPIRE_DAYS: "7"
  RATE_LIMIT_REQUESTS: "100"
  RATE_LIMIT_WINDOW: "60"
  CIRCUIT_BREAKER_THRESHOLD: "5"
  CIRCUIT_BREAKER_TIMEOUT: "30"
7.3 Secret
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: ecommerce-secret
  namespace: ecommerce
type: Opaque
data:
  # echo -n 'your-secret-key-here-min-32-characters-long' | base64
  SECRET_KEY: eW91ci1zZWNyZXQta2V5LWhlcmUtbWluLTMyLWNoYXJhY3RlcnMtbG9uZw==
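The value under `data:` is only base64-encoded, not encrypted, and the placeholder must be replaced before deploying. As a rough sketch, the following Python generates a random key and encodes it the same way `echo -n ... | base64` does. The helper names (`generate_secret_key`, `encode_for_k8s`, `decode_from_k8s`) are hypothetical and not part of the project.

```python
import base64
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a random URL-safe key; 32 random bytes yield a 43-character string."""
    return secrets.token_urlsafe(num_bytes)


def encode_for_k8s(value: str) -> str:
    """Base64-encode a value for the `data:` section of a Secret manifest."""
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


def decode_from_k8s(encoded: str) -> str:
    """Reverse of encode_for_k8s, handy for checking what a manifest contains."""
    return base64.b64decode(encoded).decode("utf-8")


if __name__ == "__main__":
    key = generate_secret_key()
    print("SECRET_KEY:", encode_for_k8s(key))
```

Decoding the value in the manifest above with `decode_from_k8s` gives back the placeholder string from the `.env` example, which is a quick way to confirm that a manifest still carries the sample key.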
7.4 Deployments
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ecommerce-api
  namespace: ecommerce
  labels:
    app: ecommerce-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ecommerce-api
  template:
    metadata:
      labels:
        app: ecommerce-api
    spec:
      containers:
        - name: api
          image: ecommerce-api:latest
          ports:
            - containerPort: 8000
          envFrom:
            - configMapRef:
                name: ecommerce-config
          env:
            - name: SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: ecommerce-secret
                  key: SECRET_KEY
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
  namespace: ecommerce
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: postgres:15-alpine
          ports:
            - containerPort: 5432
          env:
            - name: POSTGRES_USER
              value: postgres
            - name: POSTGRES_PASSWORD
              value: password
            - name: POSTGRES_DB
              value: ecommerce
          volumeMounts:
            - name: postgres-storage
              mountPath: /var/lib/postgresql/data
      volumes:
        - name: postgres-storage
          persistentVolumeClaim:
            claimName: postgres-pvc
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  namespace: ecommerce
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
        - name: redis
          image: redis:7-alpine
          ports:
            - containerPort: 6379
          command:
            - redis-server
            - --appendonly
            - "yes"
          volumeMounts:
            - name: redis-storage
              mountPath: /data
      volumes:
        - name: redis-storage
          persistentVolumeClaim:
            claimName: redis-pvc
7.5 Services
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: ecommerce-api
  namespace: ecommerce
spec:
  selector:
    app: ecommerce-api
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
  name: postgres
  namespace: ecommerce
spec:
  selector:
    app: postgres
  ports:
    - port: 5432
      targetPort: 5432
  type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
  name: redis
  namespace: ecommerce
spec:
  selector:
    app: redis
  ports:
    - port: 6379
      targetPort: 6379
  type: ClusterIP
7.6 Ingress
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ecommerce-ingress
  namespace: ecommerce
  annotations:
    # No rewrite-target: with path "/" it would rewrite every request to "/"
    # Rate limit: 100 requests per minute per client IP
    nginx.ingress.kubernetes.io/limit-rpm: "100"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
  ingressClassName: nginx
  tls:
    - hosts:
        - api.ecommerce.com
      secretName: ecommerce-tls
  rules:
    - host: api.ecommerce.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: ecommerce-api
                port:
                  number: 80
7.7 Autoscaling
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ecommerce-api-hpa
  namespace: ecommerce
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ecommerce-api
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
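To make the numbers in this manifest concrete, the sketch below reproduces the basic scaling formula from the Kubernetes documentation, `desired = ceil(current * currentMetric / targetMetric)`, applied per metric and clamped to `minReplicas`/`maxReplicas`. It assumes the default 10% tolerance and deliberately ignores the `behavior` policies and stabilization windows; the function names are hypothetical.

```python
import math
from typing import List, Tuple


def desired_replicas(current_replicas: int, current_value: float,
                     target_value: float, tolerance: float = 0.1) -> int:
    """Replica count proposed by one metric; no change inside the tolerance band."""
    ratio = current_value / target_value
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    return math.ceil(current_replicas * ratio)


def hpa_decision(current_replicas: int, metrics: List[Tuple[float, float]],
                 min_replicas: int = 3, max_replicas: int = 10) -> int:
    """Take the largest per-metric proposal, then clamp it to the configured bounds.

    `metrics` is a list of (current utilization %, target utilization %) pairs.
    """
    proposal = max(desired_replicas(current_replicas, cur, tgt) for cur, tgt in metrics)
    return max(min_replicas, min(max_replicas, proposal))


if __name__ == "__main__":
    # 3 pods, CPU at 90% against a 70% target, memory at 50% against an 80% target
    print(hpa_decision(3, [(90, 70), (50, 80)]))
```

With several metrics configured, the largest proposal wins, so high CPU triggers a scale-up even while memory usage is low.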
7.8 Persistent Storage
# k8s/pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pvc
  namespace: ecommerce
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: standard
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: redis-pvc
  namespace: ecommerce
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard
8. Monitoring and Alerting
8.1 Prometheus Configuration
# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

rule_files:
  - /etc/prometheus/rules/*.yml

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'ecommerce-api'
    static_configs:
      # Docker Compose service name; in Kubernetes use ecommerce-api:80 (the Service port)
      - targets: ['api:8000']
    metrics_path: /metrics
    scrape_interval: 5s

  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']

  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

  - job_name: 'node'
    static_configs:
      - targets: ['node-exporter:9100']
8.2 Alert Rules
# monitoring/alerts.yml
groups:
  - name: ecommerce-api
    rules:
      - alert: HighErrorRate
        # Share of 5xx responses among all requests, not the raw 5xx request rate
        expr: sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is above 5% for 5 minutes"

      - alert: HighLatency
        expr: histogram_quantile(0.95, sum by (le) (rate(http_request_duration_seconds_bucket[5m]))) > 0.5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "95th percentile latency is above 500ms"

      - alert: ServiceDown
        expr: up{job="ecommerce-api"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service is down"
          description: "E-Commerce API service is not responding"

      - alert: HighCPUUsage
        expr: 100 - (avg by(instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High CPU usage"
          description: "CPU usage is above 80% for 10 minutes"

      - alert: HighMemoryUsage
        expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes * 100 > 85
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is above 85% for 10 minutes"
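A threshold described as "error rate above 5%" has to compare a ratio, not a raw per-second rate of 5xx responses. The following Python sketch shows the difference with plain counter readings. It is a simplified model (no counter resets, no extrapolation as Prometheus's `rate()` performs), and the function names are invented for this example.

```python
def per_second_rate(first: float, last: float, window_seconds: float) -> float:
    """Average per-second increase of a counter across the window."""
    return (last - first) / window_seconds


def error_ratio(errors_first: float, errors_last: float,
                total_first: float, total_last: float,
                window_seconds: float = 300.0) -> float:
    """Fraction of requests that failed during the window (0.0 when there was no traffic)."""
    total_rate = per_second_rate(total_first, total_last, window_seconds)
    if total_rate == 0:
        return 0.0
    return per_second_rate(errors_first, errors_last, window_seconds) / total_rate


if __name__ == "__main__":
    # Busy service: 300,000 requests in 5 minutes, 300 of them failed
    print(per_second_rate(0, 300, 300), error_ratio(0, 300, 0, 300_000))
    # Quiet service: 30 requests in 5 minutes, 15 of them failed
    print(per_second_rate(0, 15, 300), error_ratio(0, 15, 0, 30))
```

The busy service produces one failed request per second, which exceeds a raw-rate threshold of 0.05 even though only 0.1% of its requests fail. The quiet service fails half of its requests but sits at exactly 0.05 errors per second and would not fire. Comparing the ratio against 0.05 handles both cases as the alert description intends.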
8.3 Alertmanager Configuration
# monitoring/alertmanager.yml
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alerts@ecommerce.com'
  smtp_auth_username: 'alerts@ecommerce.com'
  smtp_auth_password: 'your-email-password'

route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'default'
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty'
      continue: true
    - match:
        severity: warning
      receiver: 'email'

receivers:
  - name: 'default'
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK'
        channel: '#alerts'
        title: 'E-Commerce API Alert'
        text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'

  - name: 'email'
    email_configs:
      - to: 'ops@ecommerce.com'
        subject: 'E-Commerce API Alert'
        body: '{{ range .Alerts }}{{ .Annotations.description }}{{ end }}'

  - name: 'pagerduty'
    pagerduty_configs:
      - service_key: 'your-pagerduty-key'
        description: '{{ .GroupLabels.alertname }}'
8.4 Grafana Dashboard
{
  "dashboard": {
    "title": "E-Commerce API Dashboard",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(http_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Response Time (95th percentile)",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "singlestat",
        "targets": [
          {
            "expr": "sum(rate(http_requests_total{status=~\"5..\"}[5m])) / sum(rate(http_requests_total[5m]))"
          }
        ]
      },
      {
        "title": "Service Up",
        "type": "singlestat",
        "targets": [
          {
            "expr": "up{job=\"ecommerce-api\"}"
          }
        ]
      }
    ]
  }
}
9. Log Collection
9.1 Fluentd Configuration
# logging/fluentd.conf
<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-docker.pos
  tag kubernetes.*
  <parse>
    @type json
    time_key time
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

<filter kubernetes.**>
  @type kubernetes_metadata
</filter>

# Keep only records from pods labelled app=ecommerce-api
# (the container itself is named "api" in the Deployment)
<filter kubernetes.**>
  @type grep
  <regexp>
    key $.kubernetes.labels.app
    pattern ^ecommerce-api$
  </regexp>
</filter>

<match kubernetes.**>
  @type elasticsearch
  host elasticsearch
  port 9200
  logstash_format true
  logstash_prefix ecommerce-api
  flush_interval 10s
</match>
9.2 Application Logging
# app/utils/logger.py
import logging
import sys

from pythonjsonlogger import jsonlogger


class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """Adds the level, logger name, and timestamp to every JSON record."""

    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        log_record['level'] = record.levelname
        log_record['logger'] = record.name
        log_record['timestamp'] = self.formatTime(record)


def setup_logging():
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    # JSON handler writing to stdout
    log_handler = logging.StreamHandler(sys.stdout)
    formatter = CustomJsonFormatter('%(timestamp)s %(level)s %(name)s %(message)s')
    log_handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    return logger


logger = setup_logging()
9.3 Logging Usage Example
from app.utils.logger import logger

# request, response, duration, user_id, e, and retry_count come from the
# surrounding handler or middleware.

# Log a processed request
logger.info("Request processed", extra={
    "method": request.method,
    "path": request.url.path,
    "status_code": response.status_code,
    "duration_ms": duration * 1000,
    "user_id": user_id
})

# Log an error
logger.error("Database connection failed", extra={
    "error": str(e),
    "retry_count": retry_count
})
10. CI/CD Configuration
# .github/workflows/ci.yml
name: CI/CD Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
          POSTGRES_DB: test_db
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
      redis:
        image: redis:7
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
      - name: Run tests
        env:
          DATABASE_URL: postgresql://test:test@localhost:5432/test_db
          REDIS_URL: redis://localhost:6379/0
          SECRET_KEY: test-secret-key
        run: |
          pytest tests/ -v --cov=app --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    steps:
      - uses: actions/checkout@v4
      - name: Build Docker image
        # Tag with the registry namespace so the push below has a valid target
        run: docker build -t ${{ secrets.DOCKER_USERNAME }}/ecommerce-api:${{ github.sha }} .
      - name: Push to registry
        run: |
          echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
          docker push ${{ secrets.DOCKER_USERNAME }}/ecommerce-api:${{ github.sha }}

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to production
        uses: appleboy/ssh-action@v1.0.0
        with:
          host: ${{ secrets.PRODUCTION_HOST }}
          username: ${{ secrets.PRODUCTION_USER }}
          key: ${{ secrets.SSH_PRIVATE_KEY }}
          script: |
            cd /opt/ecommerce-api
            docker-compose pull
            docker-compose up -d
            docker image prune -f
11. Test Code
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

from app.main import app
from app.database import AsyncSessionLocal
from app.models.user import User
from app.utils.security import hash_password


# pytest-asyncio runs in strict mode by default, so async fixtures
# need the pytest_asyncio.fixture decorator.
@pytest_asyncio.fixture
async def db_session():
    async with AsyncSessionLocal() as session:
        yield session


@pytest_asyncio.fixture
async def client():
    # Call the ASGI app in-process instead of going over the network
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


@pytest_asyncio.fixture
async def test_user(db_session: AsyncSession):
    user = User(
        email="test@example.com",
        username="testuser",
        hashed_password=hash_password("password123")
    )
    db_session.add(user)
    await db_session.commit()
    await db_session.refresh(user)
    return user


@pytest_asyncio.fixture
async def auth_token(client: AsyncClient, test_user: User):
    response = await client.post(
        "/api/auth/login",
        json={"username": "testuser", "password": "password123"}
    )
    return response.json()["access_token"]


@pytest.mark.asyncio
async def test_register(client: AsyncClient):
    response = await client.post(
        "/api/auth/register",
        json={
            "email": "newuser@example.com",
            "username": "newuser",
            "password": "password123"
        }
    )
    assert response.status_code == 200
    data = response.json()
    assert data["email"] == "newuser@example.com"
    assert data["username"] == "newuser"


@pytest.mark.asyncio
async def test_login(client: AsyncClient, test_user: User):
    response = await client.post(
        "/api/auth/login",
        json={"username": "testuser", "password": "password123"}
    )
    assert response.status_code == 200
    data = response.json()
    assert "access_token" in data
    assert "refresh_token" in data


@pytest.mark.asyncio
async def test_get_products(client: AsyncClient, auth_token: str):
    response = await client.get(
        "/api/products/",
        headers={"Authorization": f"Bearer {auth_token}"}
    )
    assert response.status_code == 200


@pytest.mark.asyncio
async def test_create_order(client: AsyncClient, auth_token: str, db_session: AsyncSession):
    from app.models.product import Product

    product = Product(
        name="Test product",
        price=99.9,
        stock=100
    )
    db_session.add(product)
    await db_session.commit()
    await db_session.refresh(product)

    response = await client.post(
        "/api/orders/",
        headers={"Authorization": f"Bearer {auth_token}"},
        json={
            "items": [
                {"product_id": product.id, "quantity": 2}
            ]
        }
    )
    assert response.status_code == 200
    data = response.json()
    # Compare with a tolerance to stay robust against float rounding
    assert data["total_amount"] == pytest.approx(199.8)
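12. Project Summary and Extension Suggestions
12.1 Project Recap
This project is a complete e-commerce order API service that covers the whole path from development to deployment. The core modules are:
| Module | What is implemented | Key techniques |
|---|---|---|
| User authentication | Registration, login, JWT tokens | bcrypt hashing, token refresh |
| Product management | CRUD, search, pagination | Async SQLAlchemy queries, caching |
| Order system | Create, query, cancel | Transactions, stock deduction, cache invalidation |
| Rate limiting | Per-IP limits | Redis counters, sliding window |
| Circuit breaking | Circuit breaking and degradation | State machine, failure recovery |
| Monitoring and alerting | Metrics collection, alert notifications | Prometheus, Grafana, Alertmanager |
| Log collection | Structured logs | JSON format, Fluentd, Elasticsearch |
| Containerization | Docker and Kubernetes deployment | Multi-stage builds, health checks, HPA |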
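12.2 Architecture Highlights
- Clear layering
  - API layer: handles HTTP requests and responses
  - Service layer: business logic
  - Model layer: data model definitions
  - Separated responsibilities make unit testing easier
- Async end to end
  - Database access through asyncpg
  - Redis access through redis.asyncio (the successor to aioredis, shipped with redis-py)
  - HTTP served by Uvicorn + FastAPI
  - Keeps I/O-bound work concurrent
- Caching strategy
  - Read-heavy data: cached for 5 minutes
  - List data: cached for 1 minute
  - Caches are invalidated explicitly after writes
  - Guarding against cache penetration and cache avalanche additionally calls for caching misses and staggering expiry times
- Security
  - JWT token authentication
  - Passwords hashed with bcrypt
  - Rate limiting against abuse
  - Circuit breaking to protect downstream services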
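The caching bullets above mention cache penetration and cache avalanche. Two common mitigations are caching "not found" results for a short time and adding random jitter to TTLs so that keys written together do not all expire together. The sketch below shows only that bookkeeping, independent of any Redis client; `NULL_MARKER`, `jittered_ttl`, `to_cache_entry`, and `from_cache_entry` are hypothetical names, and the TTL values are assumptions.

```python
import json
import random
from typing import Optional, Tuple

NULL_MARKER = "__null__"  # hypothetical sentinel stored for "row does not exist"


def jittered_ttl(base_seconds: int, spread: float = 0.1) -> int:
    """Pick a TTL within base ± spread so simultaneous writes expire at different times."""
    delta = base_seconds * spread
    return max(1, round(random.uniform(base_seconds - delta, base_seconds + delta)))


def to_cache_entry(row: Optional[dict], base_ttl: int = 300,
                   null_ttl: int = 30) -> Tuple[str, int]:
    """Return the (payload, ttl_seconds) pair to store for a database lookup result."""
    if row is None:
        # Cache the miss briefly so repeated lookups of a nonexistent ID skip the database.
        return NULL_MARKER, null_ttl
    return json.dumps(row), jittered_ttl(base_ttl)


def from_cache_entry(payload: str) -> Optional[dict]:
    """Decode a cached payload; a cached miss comes back as None."""
    return None if payload == NULL_MARKER else json.loads(payload)


if __name__ == "__main__":
    print(to_cache_entry({"id": 1, "name": "demo"}))
    print(to_cache_entry(None))
```

The returned pair would be passed to something like `redis.set(key, payload, ex=ttl)`. The short TTL on the null marker keeps a newly created row from staying invisible for long.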
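12.3 Performance Optimization Suggestions
Database optimization
| Item | Measure | Expected effect |
|---|---|---|
| Connection pool | Tune pool_size and max_overflow | Less connection setup overhead |
| Indexes | Add composite indexes | Faster queries on the indexed filter and sort columns |
| Read/write splitting | Write to the primary, read from replicas | Supports higher concurrency |
| Sharding | Shard the orders table by user ID | Keeps per-table data volume bounded |
| Query tuning | Avoid N+1 queries | Fewer database round trips |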
Cache optimization
# Multi-level cache example
# local_cache, redis, and db stand for objects created elsewhere in the application
import json


async def get_product_with_cache(product_id: int):
    # L1: in-process cache (for example cachetools)
    if product_id in local_cache:
        return local_cache[product_id]
    # L2: Redis cache
    cache_key = f"product:{product_id}"
    cached = await redis.get(cache_key)
    if cached is not None:
        product = json.loads(cached)
        local_cache[product_id] = product
        return product
    # L3: database
    product = await db.get(product_id)
    await redis.set(cache_key, json.dumps(product), ex=300)
    local_cache[product_id] = product
    return product
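Async optimization
# Parallel query example
import asyncio


async def get_order_details(order_id: int):
    # Each helper should use its own database session: a single SQLAlchemy
    # AsyncSession must not be shared by concurrently running tasks.
    order, items, user = await asyncio.gather(
        get_order(order_id),
        get_order_items(order_id),
        get_order_user(order_id)
    )
    return {"order": order, "items": items, "user": user}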
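To see why `asyncio.gather` helps here, the following self-contained sketch replaces the three queries with `asyncio.sleep` calls and times both approaches. `fake_query` and the 0.1 second delays are stand-ins chosen for the demonstration, not part of the project.

```python
import asyncio
import time


async def fake_query(name: str, delay: float = 0.1) -> str:
    """Stand-in for an I/O-bound query: waits without blocking the event loop."""
    await asyncio.sleep(delay)
    return name


async def run_sequential() -> float:
    """Await the three queries one after another and return the elapsed seconds."""
    start = time.perf_counter()
    await fake_query("order")
    await fake_query("items")
    await fake_query("user")
    return time.perf_counter() - start


async def run_concurrent() -> float:
    """Start the three queries together and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(fake_query("order"), fake_query("items"), fake_query("user"))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(run_sequential()):.2f}s")
    print(f"concurrent: {asyncio.run(run_concurrent()):.2f}s")
```

The sequential version takes roughly the sum of the delays, the concurrent one roughly the longest single delay. The gain only materializes when the awaited calls are independent and each has its own connection or session.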
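12.4 Directions for Extension
Short term (1-2 weeks)
| Feature | Description | Technical approach |
|---|---|---|
| Payment integration | Integrate Alipay / WeChat Pay | Async callbacks, idempotent handling |
| Order timeout | Cancel unpaid orders automatically | Redis key-expiry notifications / Celery scheduled tasks |
| Stock deduction | Prevent overselling | Redis distributed lock + database optimistic locking |
| Email notifications | Notify on order status changes | Celery + SMTP |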
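The optimistic-locking half of the stock deduction row can be sketched with the standard library alone. The example uses SQLite so that it runs anywhere; the `products` table with a `version` column and the function names are assumptions made for the illustration, and the Redis distributed lock is not shown.

```python
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    """Create a minimal products table with a version column for optimistic locking."""
    conn.execute(
        "CREATE TABLE products ("
        "id INTEGER PRIMARY KEY, stock INTEGER NOT NULL, version INTEGER NOT NULL)"
    )


def deduct_stock(conn: sqlite3.Connection, product_id: int, quantity: int,
                 max_retries: int = 3) -> bool:
    """Deduct stock only if the row is unchanged since it was read; retry on conflict."""
    for _ in range(max_retries):
        row = conn.execute(
            "SELECT stock, version FROM products WHERE id = ?", (product_id,)
        ).fetchone()
        if row is None or row[0] < quantity:
            return False  # unknown product or not enough stock
        _, version = row
        cursor = conn.execute(
            "UPDATE products SET stock = stock - ?, version = version + 1 "
            "WHERE id = ? AND version = ? AND stock >= ?",
            (quantity, product_id, version, quantity),
        )
        conn.commit()
        if cursor.rowcount == 1:
            return True  # nobody modified the row in between
    return False  # gave up after repeated conflicts


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_schema(conn)
    conn.execute("INSERT INTO products (id, stock, version) VALUES (1, 5, 0)")
    print(deduct_stock(conn, 1, 3), deduct_stock(conn, 1, 3))
```

The `WHERE ... version = ?` clause is what makes the update safe: if another transaction changed the row after it was read, zero rows match and the loop reads the fresh state again. The extra `stock >= ?` condition prevents the stock from ever going negative.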
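Medium term (1-2 months)
| Feature | Description | Technical approach |
|---|---|---|
| Message queue | Process order events asynchronously | RabbitMQ / Kafka |
| Search service | Full-text product search | Elasticsearch |
| Recommendations | Personalized product recommendations | Collaborative filtering / deep learning |
| Flash sales | High-concurrency flash-sale scenarios | Pre-decrement stock in Redis, smooth peaks with a message queue |
Long term (3-6 months)
| Feature | Description | Technical approach |
|---|---|---|
| Microservice split | Deploy each service independently | gRPC, service registration and discovery |
| Distributed transactions | Data consistency across services | Saga pattern, TCC |
| Distributed tracing | End-to-end performance analysis | Jaeger / Zipkin |
| Multi-tenancy | SaaS support | Data isolation, configuration isolation |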
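12.5 Production Checklist
Check the following before every deployment:
- [ ] Environment variables are set correctly (SECRET_KEY must be changed)
- [ ] Database connections use SSL
- [ ] Redis requires password authentication
- [ ] Log level is set to INFO or WARN
- [ ] DEBUG mode is off
- [ ] Resource limits (CPU/memory) are set appropriately
- [ ] The health check endpoint is reachable
- [ ] Monitoring and alerting are configured
- [ ] A backup strategy is in place
- [ ] A rollback plan is ready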
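A few of these items can be checked automatically before a rollout. The sketch below inspects a dictionary of environment variables and reports obvious problems. It is only a starting point: the function name is hypothetical, and treating an `ssl` or `sslmode` query parameter as the sign that SSL is requested is an assumption that depends on the database driver in use.

```python
from typing import Dict, List
from urllib.parse import parse_qs, urlsplit

# Placeholder value from the .env example; it must never reach production.
PLACEHOLDER_SECRET = "your-secret-key-here-min-32-characters-long"


def preflight_problems(env: Dict[str, str]) -> List[str]:
    """Return human-readable problems found in the given environment settings."""
    problems = []

    secret = env.get("SECRET_KEY", "")
    if len(secret) < 32 or secret == PLACEHOLDER_SECRET:
        problems.append("SECRET_KEY is missing, shorter than 32 characters, or still the placeholder")

    if env.get("DEBUG", "false").strip().lower() in {"1", "true", "yes"}:
        problems.append("DEBUG is enabled")

    # Assumption: SSL is requested through an ssl= or sslmode= query parameter.
    db_params = parse_qs(urlsplit(env.get("DATABASE_URL", "")).query)
    if not (db_params.keys() & {"ssl", "sslmode"}):
        problems.append("DATABASE_URL does not request SSL")

    if not urlsplit(env.get("REDIS_URL", "")).password:
        problems.append("REDIS_URL has no password")

    return problems


if __name__ == "__main__":
    import os

    for problem in preflight_problems(dict(os.environ)):
        print("FAIL:", problem)
```

Running it as a step in the deploy job and failing the job on a non-empty result turns part of the checklist into an enforced gate.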
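13. Series Wrap-up
13.1 Key Topics Recap
| Chapter | Key topics |
|---|---|
| Performance optimization | Database indexes, caching strategies, asynchronous processing |
| Async programming | asyncio, aiohttp, async database access |
| Distributed systems | Microservices, message queues, distributed locks |
| High-concurrency architecture | Rate limiting, circuit breaking, load balancing |
| Secure programming | JWT authentication, password hashing, input validation |
| DevOps | Docker, CI/CD, monitoring and alerting |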
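13.2 Architecture Design Essentials
- Layered architecture: API layer → service layer → data layer, each with a clear responsibility.
- Async first: all I/O is asynchronous, which raises concurrency.
- Caching: hot data is cached to take load off the database.
- Security: rate limiting, authentication, and input validation guard each layer.
- Observability: logs, metrics, and tracing make problems traceable.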
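13.3 Suggested Learning Path
If you have followed the series all the way through, a good way to go deeper is:
Fundamentals        →  Source reading   →  Hands-on projects  →  Specialization
     ↓                      ↓                     ↓                    ↓
Python core            FastAPI source      Open source work      Cloud native
Algorithms and DS      SQLAlchemy          Personal projects     Distributed systems
Network protocols      asyncio             Technical blogging    Architecture design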
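13.4 Recommended Resources
| Type | Resource | Notes |
|---|---|---|
| Book | Fluent Python | Essential reading for advanced Python |
| Book | Designing Data-Intensive Applications | A classic on distributed systems |
| Docs | FastAPI official documentation | The most authoritative learning material |
| Source code | Celery / redis-py | Examples of well-designed code |
| Community | Python Weekly | A technical newsletter worth subscribing to |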
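14. Final Words
Congratulations on finishing the series "Python 进阶之路: From Engineer to Architect"!
Series index
- Part 1 | Performance Optimization and Tuning in Practice
- Part 2 | Async Programming in Depth
- Part 3 | Introduction to Distributed Systems
- Part 4 | High-Concurrency Architecture Design
- Part 5 | Secure Programming in Practice
- Part 6 | Data Engineering and ETL
- Part 7 | Advanced Design Patterns
- Part 8 | Python Internals
- Part 9 | DevOps and CI/CD
- Part 10 | Capstone Project
A parting note:
Folks, programming is a marathon, not a sprint. This series is over, but your learning has only just begun. Keep three principles in mind:
- Practice makes knowledge real: reading a tutorial ten times is worth less than building one project yourself.
- Keep learning: technology keeps evolving, so stay curious and keep your ability to learn sharp.
- Share and discuss: teaching is the best way to learn, so pass your knowledge on.
If this series helped you, please like, bookmark, and follow! Your support is what keeps me producing quality content.
See you out there, and see you in the code!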
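Document Summary
This document walks through building a highly available e-commerce order API service from scratch. It covers:
I. Architecture design
- A plan for evolving toward microservices
- Database ER diagram and index design
- RESTful API conventions
II. Core implementation
- FastAPI as the async web framework
- Async database access with SQLAlchemy
- Redis caching strategy
- JWT authentication and access control
- Rate-limiting and circuit-breaking middleware
III. Deployment and operations
- Docker multi-stage builds
- A complete Kubernetes deployment configuration
- Monitoring with Prometheus + Grafana
- Log collection with Fluentd + Elasticsearch
- A GitHub Actions CI/CD pipeline
IV. Extending the project
- Performance optimization suggestions (database, cache, async)
- A feature roadmap (short, medium, and long term)
- A production checklist
Working through this project gives you the full skill set for building enterprise-grade Python web services and a solid foundation for growing into an architect.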