GLM-4.7-Flash Code Walkthrough: Wrapping the API with FastAPI for Enterprise Access, with JWT Authentication and QPS Rate Limiting

So you've deployed GLM-4.7-Flash and had fun with the web UI, but the thought of integrating it into your company's business systems gives you a headache?

Calling the raw OpenAI-compatible API directly always feels like something is missing: no user authentication, so anyone can call it; no rate limiting, so what happens if it gets hammered; no unified error handling, so troubleshooting feels like searching for a needle in a haystack.

Today I'll walk you through building an enterprise-grade API service layer, step by step. We'll wrap GLM-4.7-Flash with FastAPI, add JWT authentication so only legitimate users get access, add QPS rate limiting to prevent abuse, and top it off with unified logging and error handling. By the end, you'll have a production-grade AI service endpoint that works out of the box and is secure, reliable, and easy to manage.

1. Project Goals and Core Value

Before writing any code, let's be clear about why we're doing this and what we gain from it.

1.1 What problems are we solving?

Using the image's http://127.0.0.1:8000/v1/chat/completions endpoint directly is simple, but in an enterprise environment it has several obvious shortcomings:

  1. No authentication: anyone who knows the address can call it, with no way to distinguish users or applications
  2. No access control: there is no way to limit per-user call frequency, leaving it open to abuse
  3. Inconsistent error handling: error messages from the underlying model service may be unfriendly and hard to monitor
  4. No business logic hooks: there is no place to add auditing, billing, content filtering, etc. around each call
  5. Interface mismatch: it may need adapting to your company's existing API conventions

1.2 What should an enterprise-grade API service provide?

A production-ready API service should include at least these capabilities:

  • Authentication: ensure only authorized users/applications can access it
  • Rate limiting: prevent overuse by any single user and protect the backend service
  • Request auditing: record who called what and when, for traceability
  • Unified error handling: return consistently formatted error messages to clients
  • Health checks: monitor both the API service itself and the backend model service
  • Configuration management: every parameter is adjustable via config files, with no code changes

1.3 Technology choice: why FastAPI?

FastAPI is the de facto standard for building API services in Python today, and for good reason:

  • Excellent performance: built on Starlette and Pydantic, with speed comparable to NodeJS and Go
  • Developer productivity: auto-generated interactive API docs, plus type hints that reduce bugs
  • Rich ecosystem: mature middleware and tooling for JWT, rate limiting, logging, and more
  • Gentle learning curve: if you've used Flask, moving to FastAPI costs almost nothing

Next, we'll build this enterprise-grade API service from scratch, step by step.

2. Environment Setup and Project Structure

First we set up the development environment and plan a clean project structure. Well begun is half done.

2.1 Requirements and Dependency Installation

Make sure you have deployed the GLM-4.7-Flash image and can call http://127.0.0.1:8000/v1/chat/completions successfully.

For the API service project, create a fresh Python environment and install the required dependencies:

# Create the project directory
mkdir glm-enterprise-api
cd glm-enterprise-api

# Create a virtual environment (recommended)
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate  # Windows

# Install core dependencies
pip install fastapi uvicorn httpx python-jose[cryptography] passlib[bcrypt] python-multipart

# Install rate limiting, configuration, and monitoring helpers
pip install slowapi python-dotenv pydantic-settings redis psutil

# Install development tools (optional but recommended)
pip install black isort pylint

Key dependencies:

  • fastapi + uvicorn: web framework and ASGI server
  • httpx: async HTTP client, used to call the backend GLM service
  • python-jose: JWT token generation and verification
  • passlib: password hashing (for the mock user system)
  • slowapi: rate-limiting middleware
  • python-dotenv + pydantic-settings: environment variable and settings management
  • redis: storage backend for the rate limiter (if you need distributed rate limiting)
  • psutil: system metrics for the health-check endpoint

2.2 Project Structure

A clear project structure makes later development and maintenance far easier:

glm-enterprise-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI application entry point
│   ├── config.py            # Configuration management
│   ├── dependencies.py      # Dependency injection (auth, rate limiting, etc.)
│   ├── models.py            # Pydantic data models
│   ├── routers/             # Route modules
│   │   ├── __init__.py
│   │   ├── auth.py          # Authentication routes
│   │   ├── chat.py          # Chat completion routes
│   │   └── health.py        # Health-check routes
│   ├── services/            # Business logic layer
│   │   ├── __init__.py
│   │   ├── auth_service.py  # Authentication service
│   │   ├── glm_service.py   # GLM model invocation service
│   │   └── rate_limit.py    # Rate-limiting service
│   ├── utils/               # Utilities
│   │   ├── __init__.py
│   │   ├── logger.py        # Logging configuration
│   │   └── exceptions.py    # Custom exceptions
│   └── middleware/          # Middleware
│       ├── __init__.py
│       └── logging_middleware.py  # Logging middleware
├── tests/                   # Tests
├── .env.example             # Example environment variables
├── requirements.txt         # Dependency list
├── docker-compose.yml       # Docker orchestration (optional)
└── README.md                # Project documentation

This structure follows separation of concerns: code for different responsibilities lives in different directories, which keeps everything easy to manage and maintain.

3. Core Implementation: From Configuration to Authentication

Now we start on the core code, beginning with configuration management, the foundation everything else builds on.

3.1 Configuration Management: Make Everything Configurable

Create app/config.py to centralize all configuration:

import os
from typing import Optional
from pydantic_settings import BaseSettings
from dotenv import load_dotenv

# Load the .env file
load_dotenv()

class Settings(BaseSettings):
    """Application settings"""
    
    # Basic application settings
    APP_NAME: str = "GLM-Enterprise-API"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = os.getenv("DEBUG", "False").lower() == "true"
    
    # GLM backend service settings
    GLM_BASE_URL: str = os.getenv("GLM_BASE_URL", "http://127.0.0.1:8000")
    GLM_API_PATH: str = os.getenv("GLM_API_PATH", "/v1/chat/completions")
    GLM_MODEL_NAME: str = os.getenv("GLM_MODEL_NAME", "/root/.cache/huggingface/ZhipuAI/GLM-4.7-Flash")
    GLM_TIMEOUT: int = int(os.getenv("GLM_TIMEOUT", "30"))
    
    # JWT authentication settings
    SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
    ALGORITHM: str = os.getenv("ALGORITHM", "HS256")
    ACCESS_TOKEN_EXPIRE_MINUTES: int = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
    
    # Rate limiting settings
    RATE_LIMIT_ENABLED: bool = os.getenv("RATE_LIMIT_ENABLED", "True").lower() == "true"
    DEFAULT_RATE_LIMIT: str = os.getenv("DEFAULT_RATE_LIMIT", "10/minute")
    PREMIUM_RATE_LIMIT: str = os.getenv("PREMIUM_RATE_LIMIT", "50/minute")
    
    # Redis settings (for distributed rate limiting)
    REDIS_ENABLED: bool = os.getenv("REDIS_ENABLED", "False").lower() == "true"
    REDIS_HOST: str = os.getenv("REDIS_HOST", "localhost")
    REDIS_PORT: int = int(os.getenv("REDIS_PORT", "6379"))
    REDIS_PASSWORD: Optional[str] = os.getenv("REDIS_PASSWORD")
    
    # Logging settings
    LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
    LOG_FILE: Optional[str] = os.getenv("LOG_FILE")
    
    class Config:
        env_file = ".env"

# Create the global settings instance
settings = Settings()
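Before moving on, it helps to see the override order in action. The os.getenv pattern above means an environment variable (or a value loaded from .env) wins over the hard-coded default. A minimal stdlib-only sketch (the variable names here just mirror the settings above for illustration):

```python
import os

# Simulate what load_dotenv() + os.getenv() do in config.py:
# a set environment variable overrides the in-code default.
os.environ.pop("GLM_BASE_URL", None)   # ensure no override exists for this one
os.environ["GLM_TIMEOUT"] = "60"       # simulate a .env / shell override

GLM_BASE_URL = os.getenv("GLM_BASE_URL", "http://127.0.0.1:8000")
GLM_TIMEOUT = int(os.getenv("GLM_TIMEOUT", "30"))

print(GLM_BASE_URL)  # http://127.0.0.1:8000 (default, nothing set)
print(GLM_TIMEOUT)   # 60 (overridden by the environment)
```

This is exactly why .env values "win" without any code changes: the defaults in Settings only apply when nothing is set in the environment.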

Also create a .env.example file for users to reference:

# GLM backend service settings
GLM_BASE_URL=http://127.0.0.1:8000
GLM_API_PATH=/v1/chat/completions
GLM_MODEL_NAME=/root/.cache/huggingface/ZhipuAI/GLM-4.7-Flash
GLM_TIMEOUT=30

# JWT authentication settings
SECRET_KEY=your-super-secret-jwt-key-change-this-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30

# Rate limiting settings
RATE_LIMIT_ENABLED=True
DEFAULT_RATE_LIMIT=10/minute
PREMIUM_RATE_LIMIT=50/minute

# Redis settings
REDIS_ENABLED=False
REDIS_HOST=localhost
REDIS_PORT=6379
# REDIS_PASSWORD=your-redis-password

# Application settings
DEBUG=False
LOG_LEVEL=INFO
# LOG_FILE=/var/log/glm-api.log

3.2 User Authentication: Implementing JWT Tokens

Authentication is the cornerstone of an enterprise API. We'll implement a complete JWT authentication flow.

First, define the user-related data models in app/models.py:

from pydantic import BaseModel, EmailStr
from typing import Optional, List
from datetime import datetime
from enum import Enum

class UserRole(str, Enum):
    """User role enumeration"""
    FREE = "free"       # Free tier
    BASIC = "basic"     # Basic tier
    PREMIUM = "premium" # Premium tier
    ADMIN = "admin"     # Administrator

class UserBase(BaseModel):
    """Base user information"""
    username: str
    email: Optional[EmailStr] = None
    full_name: Optional[str] = None
    disabled: bool = False
    role: UserRole = UserRole.FREE

class UserCreate(UserBase):
    """Payload for creating a user"""
    password: str

class UserInDB(UserBase):
    """User model as stored in the database"""
    id: str
    hashed_password: str
    created_at: datetime
    updated_at: datetime
    
    class Config:
        from_attributes = True

class Token(BaseModel):
    """Token response model"""
    access_token: str
    token_type: str = "bearer"
    expires_in: int  # Expiry time in seconds

class TokenData(BaseModel):
    """Data carried inside the token"""
    username: Optional[str] = None
    user_id: Optional[str] = None
    role: Optional[UserRole] = None
    scopes: List[str] = []

class ChatMessage(BaseModel):
    """Chat message model"""
    role: str  # "user" or "assistant"
    content: str

class ChatRequest(BaseModel):
    """Chat request model"""
    messages: List[ChatMessage]
    temperature: float = 0.7
    max_tokens: int = 2048
    stream: bool = False
    model: Optional[str] = None  # Falls back to the configured default model if unset

class ChatResponse(BaseModel):
    """Chat response model"""
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[dict]
    usage: Optional[dict] = None
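To make concrete what these models enforce on the wire, here is a tiny stdlib-only validator sketch for the request shape. This is purely illustrative (FastAPI and Pydantic do all of this for you automatically); the function name validate_chat_request is hypothetical, not part of the project:

```python
def validate_chat_request(body: dict) -> list[str]:
    """Return a list of validation errors (empty list = valid)."""
    errors = []
    messages = body.get("messages")
    if not isinstance(messages, list) or not messages:
        errors.append("messages must be a non-empty list")
    else:
        for i, msg in enumerate(messages):
            # Same role set the OpenAI-compatible endpoint accepts
            if msg.get("role") not in ("system", "user", "assistant"):
                errors.append(f"messages[{i}].role is invalid")
            if not isinstance(msg.get("content"), str):
                errors.append(f"messages[{i}].content must be a string")
    if not 0.0 <= body.get("temperature", 0.7) <= 2.0:
        errors.append("temperature must be between 0 and 2")
    return errors

ok = {"messages": [{"role": "user", "content": "hello"}], "temperature": 0.7}
bad = {"messages": [], "temperature": 3.0}
assert validate_chat_request(ok) == []
assert len(validate_chat_request(bad)) == 2
```

With Pydantic, a request failing these checks is rejected with a 422 before your handler ever runs, which is one of the big wins of declaring the models up front.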

Next, implement the authentication service in app/services/auth_service.py:

from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import jwt, JWTError  # python-jose, installed earlier
from passlib.context import CryptContext

from app.config import settings
from app.models import TokenData, UserRole

# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Mock user database (use a real database in production)
fake_users_db = {
    "alice": {
        "id": "user_001",
        "username": "alice",
        "email": "alice@example.com",
        "full_name": "Alice Smith",
        "hashed_password": pwd_context.hash("alicepassword"),
        "disabled": False,
        "role": UserRole.PREMIUM,
        "created_at": datetime(2024, 1, 1),
        "updated_at": datetime(2024, 1, 1)
    },
    "bob": {
        "id": "user_002",
        "username": "bob",
        "email": "bob@example.com",
        "full_name": "Bob Johnson",
        "hashed_password": pwd_context.hash("bobpassword"),
        "disabled": False,
        "role": UserRole.BASIC,
        "created_at": datetime(2024, 1, 2),
        "updated_at": datetime(2024, 1, 2)
    }
}

class AuthService:
    """Authentication service"""
    
    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """Verify a password against its hash"""
        return pwd_context.verify(plain_password, hashed_password)
    
    @staticmethod
    def get_password_hash(password: str) -> str:
        """Hash a password"""
        return pwd_context.hash(password)
    
    @staticmethod
    def get_user(username: str):
        """Look up a user"""
        if username in fake_users_db:
            user_dict = fake_users_db[username]
            return user_dict
        return None
    
    @staticmethod
    def authenticate_user(username: str, password: str):
        """Authenticate a user"""
        user = AuthService.get_user(username)
        if not user:
            return False
        if not AuthService.verify_password(password, user["hashed_password"]):
            return False
        return user
    
    @staticmethod
    def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
        """Create a JWT access token"""
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
        return encoded_jwt
    
    @staticmethod
    def verify_token(token: str) -> Optional[TokenData]:
        """Verify a JWT token"""
        try:
            payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
            username: str = payload.get("sub")
            user_id: str = payload.get("user_id")
            role: str = payload.get("role")
            
            if username is None:
                return None
            
            token_data = TokenData(
                username=username,
                user_id=user_id,
                role=UserRole(role) if role else UserRole.FREE
            )
            return token_data
        except JWTError:
            return None
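If you're curious what jwt.encode and jwt.decode actually produce, here is a stdlib-only sketch of HS256 signing. This is for illustration only (no expiry handling, no header validation); in the project we rely on python-jose, which handles all of that:

```python
import base64
import hashlib
import hmac
import json
from typing import Optional

def b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without padding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def hs256_encode(payload: dict, secret: str) -> str:
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url(json.dumps(payload).encode())
    signing_input = f"{header}.{body}".encode()
    sig = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{body}.{b64url(sig)}"

def hs256_verify(token: str, secret: str) -> Optional[dict]:
    header, body, sig = token.split(".")
    signing_input = f"{header}.{body}".encode()
    expected = b64url(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest())
    if not hmac.compare_digest(sig, expected):
        return None  # signature mismatch: tampered token or wrong key
    return json.loads(base64.urlsafe_b64decode(body + "=" * (-len(body) % 4)))

token = hs256_encode({"sub": "alice", "role": "premium"}, "demo-secret")
assert hs256_verify(token, "demo-secret")["sub"] == "alice"
assert hs256_verify(token, "wrong-secret") is None
```

The key takeaway: the payload is only base64-encoded, not encrypted (anyone can read it), but it cannot be altered without knowing SECRET_KEY. That is why the secret must be changed in production.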

3.3 Dependency Injection: Using Authentication in Routes

Create app/dependencies.py to wire up FastAPI's dependency injection:

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from typing import Optional

from app.config import settings
from app.services.auth_service import AuthService
from app.models import TokenData, UserRole

# OAuth2 password bearer token scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")

async def get_current_user(token: str = Depends(oauth2_scheme)) -> TokenData:
    """Resolve the current user (dependency injection)"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    
    token_data = AuthService.verify_token(token)
    if token_data is None:
        raise credentials_exception
    
    # Check whether the user has been disabled
    user = AuthService.get_user(token_data.username)
    if user is None or user.get("disabled"):
        raise credentials_exception
    
    return token_data

async def get_current_active_user(
    current_user: TokenData = Depends(get_current_user)
) -> TokenData:
    """Resolve the current active user"""
    if current_user.username is None:
        raise HTTPException(status_code=400, detail="User is not authenticated")
    return current_user

def require_role(required_role: UserRole):
    """Role-based permission check (dependency factory)"""
    def role_checker(current_user: TokenData = Depends(get_current_active_user)):
        user_role = current_user.role
        
        # Role hierarchy: ADMIN > PREMIUM > BASIC > FREE
        role_hierarchy = {
            UserRole.ADMIN: 4,
            UserRole.PREMIUM: 3,
            UserRole.BASIC: 2,
            UserRole.FREE: 1
        }
        
        if role_hierarchy.get(user_role, 0) < role_hierarchy.get(required_role, 0):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Requires {required_role.value} privileges or higher"
            )
        return current_user
    return role_checker
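The role comparison inside require_role boils down to a numeric hierarchy check. Isolated from FastAPI, it works like this (a self-contained sketch that mirrors the mapping above; has_permission is an illustrative name):

```python
from enum import Enum

class UserRole(str, Enum):
    FREE = "free"
    BASIC = "basic"
    PREMIUM = "premium"
    ADMIN = "admin"

# Higher number = more privileges: ADMIN > PREMIUM > BASIC > FREE
ROLE_HIERARCHY = {UserRole.ADMIN: 4, UserRole.PREMIUM: 3, UserRole.BASIC: 2, UserRole.FREE: 1}

def has_permission(user_role: UserRole, required_role: UserRole) -> bool:
    # Unknown roles default to 0, i.e. no privileges at all
    return ROLE_HIERARCHY.get(user_role, 0) >= ROLE_HIERARCHY.get(required_role, 0)

assert has_permission(UserRole.ADMIN, UserRole.PREMIUM)    # admin passes any check
assert not has_permission(UserRole.FREE, UserRole.BASIC)   # free cannot access basic-only routes
assert has_permission(UserRole.BASIC, UserRole.BASIC)      # exact match passes
```

Encoding the hierarchy as numbers keeps the check a single comparison, so adding a new tier only means adding one entry to the mapping.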

4. Rate Limiting and the GLM Service Wrapper

With authentication in place, we move on to rate limiting and wrapping the GLM model service.

4.1 Smart Rate Limiting: Protecting the Backend

Create app/services/rate_limit.py with a flexible rate-limiting strategy:

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from typing import Optional, Callable
import redis
from functools import wraps

from app.config import settings
from app.models import UserRole

# Initialize the rate limiter
limiter = Limiter(key_func=get_remote_address)

# Redis connection (for distributed rate limiting)
redis_client = None
if settings.REDIS_ENABLED:
    try:
        redis_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            password=settings.REDIS_PASSWORD,
            decode_responses=True
        )
        # Test the connection
        redis_client.ping()
        print("Redis connected, distributed rate limiting enabled")
    except Exception as e:
        print(f"Redis connection failed, falling back to in-memory rate limiting: {e}")
        redis_client = None

def get_rate_limit_key(user_role: Optional[UserRole] = None) -> str:
    """Return the rate limit string for a given user role"""
    if not settings.RATE_LIMIT_ENABLED:
        return None
    
    if user_role == UserRole.ADMIN:
        return "1000/minute"  # Effectively unlimited for admins
    elif user_role == UserRole.PREMIUM:
        return settings.PREMIUM_RATE_LIMIT
    elif user_role == UserRole.BASIC:
        return "20/minute"
    else:  # FREE or None
        return settings.DEFAULT_RATE_LIMIT

def rate_limit_by_user(limiter: Limiter):
    """Decorator for role-aware dynamic rate limiting"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # The user would have to be extracted from the request here;
            # in practice this decorator should be combined with the auth dependency
            return await func(*args, **kwargs)
        return wrapper
    return decorator

class RateLimitService:
    """Rate-limiting service"""
    
    @staticmethod
    def check_rate_limit(user_id: str, endpoint: str) -> bool:
        """Check whether the user is over the limit (manual check)"""
        if not settings.RATE_LIMIT_ENABLED:
            return True
        
        if redis_client:
            # Distributed rate limiting via Redis
            key = f"rate_limit:{user_id}:{endpoint}"
            current = redis_client.get(key)
            
            if current is None:
                # First call in this window: set the key with an expiry
                redis_client.setex(key, 60, 1)
                return True
            elif int(current) < 10:  # Example: limit of 10 requests/minute
                redis_client.incr(key)
                return True
            else:
                return False
        else:
            # In-memory rate limiting (single instance)
            # Simplified here; a real implementation needs a proper data structure
            return True
    
    @staticmethod
    def get_user_quota(user_role: UserRole) -> dict:
        """Return the quota for a user role"""
        quotas = {
            UserRole.FREE: {
                "requests_per_minute": 10,
                "max_tokens_per_request": 1024,
                "concurrent_requests": 1
            },
            UserRole.BASIC: {
                "requests_per_minute": 20,
                "max_tokens_per_request": 2048,
                "concurrent_requests": 3
            },
            UserRole.PREMIUM: {
                "requests_per_minute": 50,
                "max_tokens_per_request": 4096,
                "concurrent_requests": 10
            },
            UserRole.ADMIN: {
                "requests_per_minute": 1000,
                "max_tokens_per_request": 8192,
                "concurrent_requests": 50
            }
        }
        return quotas.get(user_role, quotas[UserRole.FREE])
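The Redis logic above is a fixed-window counter: the first request in a window creates a key with a 60-second TTL, and subsequent requests increment it until the limit is hit. Here is the same algorithm in pure Python, as a single-process sketch (the class name and the limit/window values are illustrative; an injectable `now` makes it testable without sleeping):

```python
import time
from collections import defaultdict
from typing import Optional

class FixedWindowLimiter:
    """Allow at most `limit` calls per `window` seconds, per key."""

    def __init__(self, limit: int, window: float = 60.0):
        self.limit = limit
        self.window = window
        self.counters = defaultdict(lambda: [0.0, 0])  # key -> [window_start, count]

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        start, count = self.counters[key]
        if now - start >= self.window:        # window expired: start a fresh one
            self.counters[key] = [now, 1]
            return True
        if count < self.limit:
            self.counters[key][1] += 1
            return True
        return False                          # over the limit within this window

limiter = FixedWindowLimiter(limit=3, window=60.0)
assert all(limiter.allow("user_001", now=t) for t in (0, 1, 2))  # first 3 pass
assert not limiter.allow("user_001", now=3)                      # 4th is rejected
assert limiter.allow("user_001", now=61)                         # new window resets the count
```

Note one known weakness of fixed windows: a client can burst up to 2x the limit across a window boundary. Sliding-window or token-bucket algorithms (which slowapi also supports) smooth this out.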

4.2 GLM Service Wrapper: A Unified Calling Interface

Create app/services/glm_service.py to wrap calls to GLM-4.7-Flash:

import httpx
import json
import uuid
from typing import AsyncGenerator, Dict, Any, Optional
import logging

from app.config import settings
from app.models import ChatRequest, ChatResponse
from app.utils.exceptions import GLMServiceError

logger = logging.getLogger(__name__)

class GLMService:
    """Wrapper around the GLM model service"""
    
    def __init__(self):
        self.base_url = settings.GLM_BASE_URL
        self.api_url = f"{self.base_url}{settings.GLM_API_PATH}"
        self.default_model = settings.GLM_MODEL_NAME
        self.timeout = settings.GLM_TIMEOUT
        
        # Create the async HTTP client
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(self.timeout),
            limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
        )
    
    async def close(self):
        """Close the HTTP client"""
        await self.client.aclose()
    
    async def chat_completion(
        self, 
        chat_request: ChatRequest,
        user_id: Optional[str] = None,
        request_id: Optional[str] = None
    ) -> ChatResponse:
        """Chat completion (non-streaming)"""
        
        if request_id is None:
            request_id = str(uuid.uuid4())
        
        # Build the request payload
        request_data = {
            "model": chat_request.model or self.default_model,
            "messages": [msg.model_dump() for msg in chat_request.messages],  # Pydantic v2 API
            "temperature": chat_request.temperature,
            "max_tokens": chat_request.max_tokens,
            "stream": False
        }
        
        logger.info(f"GLM request [{request_id}]: {json.dumps(request_data, ensure_ascii=False)}")
        
        try:
            # Send the request
            response = await self.client.post(
                self.api_url,
                json=request_data,
                headers={"Content-Type": "application/json"}
            )
            
            # Check the response status
            if response.status_code != 200:
                error_msg = f"GLM service error: {response.status_code} - {response.text}"
                logger.error(f"[{request_id}] {error_msg}")
                raise GLMServiceError(error_msg)
            
            # Parse the response
            result = response.json()
            
            # Attach the request ID and user ID to the response
            result["request_id"] = request_id
            if user_id:
                result["user_id"] = user_id
            
            logger.info(f"GLM response [{request_id}]: generated {result.get('usage', {}).get('total_tokens', 0)} tokens")
            
            return ChatResponse(**result)
            
        except httpx.TimeoutException:
            error_msg = f"GLM service request timed out ({self.timeout}s)"
            logger.error(f"[{request_id}] {error_msg}")
            raise GLMServiceError(error_msg)
        except httpx.RequestError as e:
            error_msg = f"GLM service request failed: {str(e)}"
            logger.error(f"[{request_id}] {error_msg}")
            raise GLMServiceError(error_msg)
        except json.JSONDecodeError as e:
            error_msg = f"Failed to parse GLM response JSON: {str(e)}"
            logger.error(f"[{request_id}] {error_msg}")
            raise GLMServiceError(error_msg)
    
    async def chat_completion_stream(
        self, 
        chat_request: ChatRequest,
        user_id: Optional[str] = None,
        request_id: Optional[str] = None
    ) -> AsyncGenerator[str, None]:
        """Chat completion (streaming output)"""
        
        if request_id is None:
            request_id = str(uuid.uuid4())
        
        # Build the request payload
        request_data = {
            "model": chat_request.model or self.default_model,
            "messages": [msg.model_dump() for msg in chat_request.messages],  # Pydantic v2 API
            "temperature": chat_request.temperature,
            "max_tokens": chat_request.max_tokens,
            "stream": True
        }
        
        logger.info(f"GLM stream request [{request_id}]: starting streaming generation")
        
        try:
            # Send the streaming request
            async with self.client.stream(
                "POST",
                self.api_url,
                json=request_data,
                headers={"Content-Type": "application/json"}
            ) as response:
                
                # Check the response status
                if response.status_code != 200:
                    error_text = await response.aread()
                    error_msg = f"GLM service error: {response.status_code} - {error_text.decode()}"
                    logger.error(f"[{request_id}] {error_msg}")
                    raise GLMServiceError(error_msg)
                
                # Read the response as a stream
                async for chunk in response.aiter_lines():
                    if chunk:
                        # SSE format: data: {...}
                        if chunk.startswith("data: "):
                            data = chunk[6:]  # Strip the "data: " prefix
                            if data.strip() == "[DONE]":
                                break
                            
                            try:
                                chunk_data = json.loads(data)
                                # Attach the request ID to every chunk
                                chunk_data["request_id"] = request_id
                                if user_id:
                                    chunk_data["user_id"] = user_id
                                
                                yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
                            except json.JSONDecodeError:
                                logger.warning(f"[{request_id}] Could not parse stream chunk: {data}")
                                continue
                
                logger.info(f"GLM stream response [{request_id}]: streaming finished")
                
        except httpx.TimeoutException:
            error_msg = f"GLM stream request timed out ({self.timeout}s)"
            logger.error(f"[{request_id}] {error_msg}")
            raise GLMServiceError(error_msg)
        except Exception as e:
            error_msg = f"GLM stream request failed: {str(e)}"
            logger.error(f"[{request_id}] {error_msg}")
            raise GLMServiceError(error_msg)
    
    async def health_check(self) -> Dict[str, Any]:
        """Health check"""
        try:
            # Try the GLM service's health endpoint (or a simple request)
            health_url = f"{self.base_url}/health"
            response = await self.client.get(health_url, timeout=5)
            
            if response.status_code == 200:
                return {
                    "status": "healthy",
                    "glm_service": "available",
                    "response_time": response.elapsed.total_seconds()
                }
            else:
                return {
                    "status": "unhealthy",
                    "glm_service": "unavailable",
                    "error": f"HTTP {response.status_code}"
                }
        except Exception as e:
            return {
                "status": "unhealthy",
                "glm_service": "unavailable",
                "error": str(e)
            }

# Create the global GLM service instance
glm_service = GLMService()
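The stream parsing above follows the SSE convention of OpenAI-compatible endpoints: each event is a line of the form `data: {json}`, and the stream ends with `data: [DONE]`. Pulled out as a plain function with synthetic sample chunks (the function name and sample payloads are illustrative), the logic looks like this:

```python
import json
from typing import Iterable, Iterator

def parse_sse_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Yield parsed JSON chunks from an SSE stream, stopping at [DONE]."""
    for line in lines:
        if not line.startswith("data: "):
            continue                      # skip blank keep-alive lines and comments
        data = line[6:]
        if data.strip() == "[DONE]":
            break                         # end-of-stream sentinel
        try:
            yield json.loads(data)
        except json.JSONDecodeError:
            continue                      # tolerate malformed chunks, as the service does

sample = [
    'data: {"choices": [{"delta": {"content": "Hel"}}]}',
    'data: {"choices": [{"delta": {"content": "lo"}}]}',
    "data: [DONE]",
]
pieces = [c["choices"][0]["delta"]["content"] for c in parse_sse_lines(sample)]
print("".join(pieces))  # Hello
```

Treating each `delta.content` fragment as a token to append is exactly what a front end consuming our /chat/completions/stream endpoint would do.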

5. Routes and API Endpoints

Now we combine all the components into complete API routes.

5.1 Authentication Routes

Create app/routers/auth.py:

from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm

from app.config import settings
from app.models import Token, UserCreate
from app.services.auth_service import AuthService
from app.dependencies import get_current_user

router = APIRouter(prefix="/auth", tags=["auth"])

@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Issue an access token"""
    user = AuthService.authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # Create the access token
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = AuthService.create_access_token(
        data={
            "sub": user["username"],
            "user_id": user["id"],
            "role": user["role"].value if hasattr(user["role"], "value") else user["role"]
        },
        expires_delta=access_token_expires
    )
    
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "expires_in": int(access_token_expires.total_seconds())
    }

@router.get("/me")
async def read_users_me(current_user: dict = Depends(get_current_user)):
    """Return the current user's profile"""
    user = AuthService.get_user(current_user.username)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    # Strip sensitive fields (copy first so we don't mutate the mock database)
    user = {k: v for k, v in user.items() if k != "hashed_password"}
    return user

@router.post("/register")
async def register_user(user_data: UserCreate):
    """Register a new user (example)"""
    # Check whether the user already exists
    if AuthService.get_user(user_data.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already exists"
        )
    
    # In a real project the user would be persisted to a database here;
    # this example just returns a success message
    return {
        "message": "User registered (example)",
        "username": user_data.username,
        "note": "In a real project this would actually create the user in a database"
    }

5.2 Chat Routes (The Core)

Create app/routers/chat.py:

import json
import uuid
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse

from app.config import settings
from app.models import ChatRequest, ChatResponse, TokenData
from app.dependencies import get_current_active_user
from app.services.glm_service import glm_service
# Reuse the shared limiter instance so limits are enforced consistently
from app.services.rate_limit import limiter, RateLimitService, get_rate_limit_key

router = APIRouter(prefix="/chat", tags=["chat"])

@router.post(
    "/completions",
    response_model=ChatResponse,
    summary="Chat completion (non-streaming)",
    description="Call GLM-4.7-Flash for a chat completion and return the full response"
)
@limiter.limit(lambda: get_rate_limit_key())
async def chat_completion(
    request: Request,
    chat_request: ChatRequest,
    current_user: TokenData = Depends(get_current_active_user)
):
    """Chat completion endpoint"""
    
    # Look up the user's quota
    quota = RateLimitService.get_user_quota(current_user.role)
    
    # Validate request parameters
    if chat_request.max_tokens > quota["max_tokens_per_request"]:
        raise HTTPException(
            status_code=400,
            detail=f"max_tokens may not exceed {quota['max_tokens_per_request']} for your current role"
        )
    
    # Generate a request ID
    request_id = str(uuid.uuid4())
    
    # Call the GLM service
    try:
        response = await glm_service.chat_completion(
            chat_request=chat_request,
            user_id=current_user.user_id,
            request_id=request_id
        )
        
        # Audit log (example)
        print(f"[AUDIT] User {current_user.username} called the chat endpoint, request ID: {request_id}")
        
        return response
        
    except Exception as e:
        # Unified error handling
        raise HTTPException(
            status_code=500,
            detail=f"Request processing failed: {str(e)}"
        )

@router.post(
    "/completions/stream",
    summary="Chat completion (streaming)",
    description="Call GLM-4.7-Flash for a streaming chat completion, returned as an SSE stream"
)
@limiter.limit(lambda: get_rate_limit_key())
async def chat_completion_stream(
    request: Request,
    chat_request: ChatRequest,
    current_user: TokenData = Depends(get_current_active_user)
):
    """Streaming chat completion endpoint"""
    
    # Look up the user's quota
    quota = RateLimitService.get_user_quota(current_user.role)
    
    # Validate request parameters
    if chat_request.max_tokens > quota["max_tokens_per_request"]:
        raise HTTPException(
            status_code=400,
            detail=f"max_tokens may not exceed {quota['max_tokens_per_request']} for your current role"
        )
    
    # Generate a request ID
    request_id = str(uuid.uuid4())
    
    # Set up the streaming response
    async def stream_generator():
        try:
            async for chunk in glm_service.chat_completion_stream(
                chat_request=chat_request,
                user_id=current_user.user_id,
                request_id=request_id
            ):
                yield chunk
        except Exception as e:
            # Error handling inside the stream
            error_data = {
                "error": {
                    "message": f"Streaming generation failed: {str(e)}",
                    "type": "stream_error",
                    "request_id": request_id
                }
            }
            yield f"data: {json.dumps(error_data)}\n\n"
    
    return StreamingResponse(
        stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Request-ID": request_id,
            "X-User-ID": current_user.user_id or ""
        }
    )

@router.get("/models")
async def list_models(current_user: TokenData = Depends(get_current_active_user)):
    """List the available models"""
    return {
        "models": [
            {
                "id": settings.GLM_MODEL_NAME,
                "name": "GLM-4.7-Flash",
                "description": "ZhipuAI's latest-generation large language model, a 30B-parameter MoE architecture",
                "max_tokens": 4096,
                "supports_streaming": True
            }
        ],
        "default_model": settings.GLM_MODEL_NAME
    }

5.3 Health-Check Routes

Create app/routers/health.py:

from fastapi import APIRouter, HTTPException
from datetime import datetime
import psutil
import os

from app.config import settings
from app.services.glm_service import glm_service

router = APIRouter(prefix="/health", tags=["health"])

@router.get("")
async def health_check():
    """Health-check endpoint"""
    
    # Check the GLM backend's health
    glm_health = await glm_service.health_check()
    
    # System information
    system_info = {
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "service": settings.APP_NAME,
        "version": settings.APP_VERSION,
        "status": "healthy" if glm_health.get("status") == "healthy" else "degraded",
        "system": {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage("/").percent,
            "process_id": os.getpid()
        },
        "dependencies": {
            "glm_service": glm_health
        }
    }
    
    # Return 503 if the GLM service is unavailable
    if glm_health.get("status") != "healthy":
        raise HTTPException(
            status_code=503,
            detail=system_info
        )
    
    return system_info

@router.get("/readiness")
async def readiness_probe():
    """Readiness probe"""
    glm_health = await glm_service.health_check()
    
    if glm_health.get("status") == "healthy":
        return {"status": "ready"}
    else:
        raise HTTPException(
            status_code=503,
            detail={"status": "not_ready", "reason": "GLM service unavailable"}
        )

@router.get("/liveness")
async def liveness_probe():
    """Liveness probe"""
    return {"status": "alive"}

6. Application Entry Point and Middleware

Finally, we assemble all the pieces into the complete FastAPI application.

6.1 Application Entry Point

Create app/main.py:

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
import logging
import time

from app.config import settings
from app.routers import auth, chat, health
from app.utils.logger import setup_logging
from app.utils.exceptions import GLMServiceError
from app.services.rate_limit_service import limiter

# 设置日志
setup_logging()
logger = logging.getLogger(__name__)

# 创建FastAPI应用
app = FastAPI(
    title=settings.APP_NAME,
    description="GLM-4.7-Flash企业级API服务,支持JWT鉴权与QPS限流",
    version=settings.APP_VERSION,
    docs_url="/docs" if settings.DEBUG else None,
    redoc_url="/redoc" if settings.DEBUG else None
)

# CORS中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 生产环境应该限制具体域名
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 注册限流器
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# 注册路由
app.include_router(auth.router)
app.include_router(chat.router)
app.include_router(health.router)

# Custom middleware: request logging
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log every request with timing and a request ID"""
    request_id = request.headers.get("X-Request-ID") or str(int(time.time() * 1000))

    # Record the start time
    start_time = time.time()

    logger.info(f"Request started [{request_id}] {request.method} {request.url.path}")

    # Process the request
    try:
        response = await call_next(request)

        # Log completion
        process_time = time.time() - start_time
        response.headers["X-Process-Time"] = str(process_time)
        response.headers["X-Request-ID"] = request_id

        logger.info(f"Request completed [{request_id}] status={response.status_code} elapsed={process_time:.3f}s")

        return response

    except Exception as e:
        # Log the failure
        process_time = time.time() - start_time
        logger.error(f"Request failed [{request_id}] error={str(e)} elapsed={process_time:.3f}s")
        raise

# Global exception handlers
@app.exception_handler(GLMServiceError)
async def glm_service_exception_handler(request: Request, exc: GLMServiceError):
    """Handle GLM service errors"""
    logger.error(f"GLM service error: {str(exc)}")
    return JSONResponse(
        status_code=503,
        content={
            "error": {
                "message": "AI service temporarily unavailable",
                "type": "service_unavailable",
                "detail": str(exc)
            }
        }
    )

@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Handle uncaught exceptions"""
    logger.error(f"Unhandled exception: {str(exc)}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={
            "error": {
                "message": "Internal server error",
                "type": "internal_error"
            }
        }
    )

# Root path
@app.get("/")
async def root():
    """Service root"""
    return {
        "service": settings.APP_NAME,
        "version": settings.APP_VERSION,
        "status": "running",
        "docs": "/docs" if settings.DEBUG else None,
        "endpoints": {
            "auth": "/auth/token",
            "chat": "/chat/completions",
            "health": "/health"
        }
    }

# Startup event
@app.on_event("startup")
async def startup_event():
    """Application startup"""
    logger.info(f"{settings.APP_NAME} v{settings.APP_VERSION} starting...")
    logger.info(f"GLM service URL: {settings.GLM_BASE_URL}")
    logger.info(f"JWT secret: {'set' if settings.SECRET_KEY != 'your-secret-key-change-in-production' else 'using the default value - change it in production!'}")
    logger.info(f"Rate limiting enabled: {settings.RATE_LIMIT_ENABLED}")

@app.on_event("shutdown")
async def shutdown_event():
    """Application shutdown"""
    logger.info(f"{settings.APP_NAME} shutting down...")
    await glm_service.close()

# Main entry point
if __name__ == "__main__":
    import uvicorn
    
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8001,  # use port 8001 to avoid clashing with the GLM service on 8000
        reload=settings.DEBUG,
        log_level=settings.LOG_LEVEL.lower()
    )

6.2 Utilities: Logging and Exceptions

Create app/utils/logger.py

import logging
import sys
from logging.handlers import RotatingFileHandler
from app.config import settings

def setup_logging():
    """Configure application-wide logging"""

    # Log format
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Root logger
    logger = logging.getLogger()
    logger.setLevel(getattr(logging, settings.LOG_LEVEL.upper()))

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # File handler (if a log file is configured)
    if settings.LOG_FILE:
        file_handler = RotatingFileHandler(
            settings.LOG_FILE,
            maxBytes=10 * 1024 * 1024,  # 10MB
            backupCount=5
        )
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    # Quiet down noisy third-party loggers
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("httpcore").setLevel(logging.WARNING)

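To see what the RotatingFileHandler configured above actually does, here is a self-contained sketch: once the log file exceeds maxBytes, it is rolled over and at most backupCount old copies are kept. The file names and the tiny maxBytes below are illustrative only, not the project's real configuration.

```python
# Demonstrate log rotation: a small maxBytes forces rollovers quickly.
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler

log_dir = tempfile.mkdtemp()
log_path = os.path.join(log_dir, "app.log")

# Roll over after ~200 bytes, keep at most 2 old copies
handler = RotatingFileHandler(log_path, maxBytes=200, backupCount=2)
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))

logger = logging.getLogger("rotation-demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Write enough records to force several rollovers
for i in range(20):
    logger.info("message number %d", i)

handler.close()
files = sorted(os.listdir(log_dir))
print(files)  # e.g. ['app.log', 'app.log.1', 'app.log.2']
```

With backupCount=2 the oldest data is silently discarded after two rollovers, which is exactly the bounded-disk-usage behavior we want for long-running services.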
Create app/utils/exceptions.py

class GLMServiceError(Exception):
    """Raised when the backend GLM service fails"""
    pass

class RateLimitExceededError(Exception):
    """Raised when a client exceeds its rate limit"""
    pass

class AuthenticationError(Exception):
    """Raised when authentication fails"""
    pass

class AuthorizationError(Exception):
    """Raised when the user lacks permission"""
    pass

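These exception classes pair naturally with the global handlers in app/main.py. As a sketch (the mapping helper below is hypothetical, not part of the project code), each class can be mapped to an HTTP status and the same unified error envelope the handlers return:

```python
# Hypothetical helper: map custom exception classes to (status, error-type)
# pairs, falling back to 500/internal_error for anything unrecognized.
class GLMServiceError(Exception): pass
class RateLimitExceededError(Exception): pass
class AuthenticationError(Exception): pass
class AuthorizationError(Exception): pass

EXCEPTION_STATUS = {
    GLMServiceError: (503, "service_unavailable"),
    RateLimitExceededError: (429, "rate_limit_exceeded"),
    AuthenticationError: (401, "authentication_failed"),
    AuthorizationError: (403, "authorization_failed"),
}

def error_body(exc):
    """Build (status_code, body) in the same shape as the global handlers."""
    status, err_type = EXCEPTION_STATUS.get(type(exc), (500, "internal_error"))
    return status, {"error": {"message": str(exc), "type": err_type}}

status, body = error_body(AuthenticationError("invalid token"))
print(status, body["error"]["type"])  # 401 authentication_failed
```

Centralizing this mapping keeps the status codes consistent if you later add more exception types.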
7. Deployment and Usage Guide

Our enterprise API service is now complete. Let's look at how to deploy and use it.

7.1 Starting the Service

First, make sure the GLM-4.7-Flash service is running (port 8000), then start our API service:

# Activate the virtual environment
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate  # Windows

# Install dependencies
pip install -r requirements.txt

# Copy the environment file and edit it
cp .env.example .env
# Edit .env and set your configuration

# Start the service
python -m app.main

Once the service is up, open http://localhost:8001/docs to see the auto-generated API documentation.
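
The requirements.txt referenced in the commands above is not listed elsewhere in this article; one can be assembled from the packages installed in section 2 (pin versions with `pip freeze` for reproducible builds):

```text
fastapi
uvicorn
httpx
python-jose[cryptography]
passlib[bcrypt]
python-multipart
slowapi
python-dotenv
redis
```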

7.2 Usage Examples

Obtain an access token
import requests

# Request a JWT token
response = requests.post(
    "http://localhost:8001/auth/token",
    data={
        "username": "alice",
        "password": "alicepassword"
    }
)

token_data = response.json()
access_token = token_data["access_token"]
print(f"访问令牌: {access_token}")
Call the chat endpoint
import requests
import json

# Set request headers
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

# Build the chat request
chat_request = {
    "messages": [
        {"role": "user", "content": "Hello, please introduce yourself"}
    ],
    "temperature": 0.7,
    "max_tokens": 500,
    "stream": False
}

# Send the request
response = requests.post(
    "http://localhost:8001/chat/completions",
    headers=headers,
    json=chat_request
)

result = response.json()
print(f"AI reply: {result['choices'][0]['message']['content']}")
Streaming call
import requests
import json

# Streaming request
chat_request = {
    "messages": [
        {"role": "user", "content": "Write a short story about AI"}
    ],
    "temperature": 0.8,
    "max_tokens": 1000,
    "stream": True
}

response = requests.post(
    "http://localhost:8001/chat/completions/stream",
    headers=headers,
    json=chat_request,
    stream=True
)

# Process the streaming response
for line in response.iter_lines():
    if line:
        line = line.decode('utf-8')
        if line.startswith('data: '):
            data = line[6:]  # strip the "data: " prefix
            if data.strip() != '[DONE]':
                try:
                    chunk = json.loads(data)
                    if 'choices' in chunk and chunk['choices']:
                        delta = chunk['choices'][0].get('delta', {})
                        if 'content' in delta:
                            print(delta['content'], end='', flush=True)
                except json.JSONDecodeError:
                    continue
print()  # final newline

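The line-by-line parsing in the loop above can be factored into a small reusable helper (a sketch, not part of the project code) that returns the text delta carried by one SSE line, or None for keep-alives, [DONE] markers, and malformed chunks:

```python
# Parse a single Server-Sent Events line from an OpenAI-style stream and
# return the text delta it carries, or None if there is nothing to print.
import json
from typing import Optional

def extract_delta(line: str) -> Optional[str]:
    if not line.startswith("data: "):
        return None  # comment/keep-alive lines start with ":" or are empty
    data = line[len("data: "):].strip()
    if data == "[DONE]":
        return None  # end-of-stream marker
    try:
        chunk = json.loads(data)
    except json.JSONDecodeError:
        return None  # skip malformed chunks rather than crashing the stream
    choices = chunk.get("choices") or []
    if not choices:
        return None
    return choices[0].get("delta", {}).get("content")

print(extract_delta('data: {"choices":[{"delta":{"content":"hi"}}]}'))  # hi
print(extract_delta("data: [DONE]"))  # None
```

Isolating the parsing also makes it trivial to unit-test the stream handling without a live server.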
7.3 Docker Deployment (Optional)

Create Dockerfile

FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency list
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY app ./app

# Create a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose the port
EXPOSE 8001

# Startup command
CMD ["python", "-m", "app.main"]

Create docker-compose.yml

version: '3.8'

services:
  glm-api:
    build: .
    ports:
      - "8001:8001"
    environment:
      - GLM_BASE_URL=http://glm-service:8000
      - SECRET_KEY=your-production-secret-key-change-this
      - DEBUG=False
    depends_on:
      - glm-service
    restart: unless-stopped
    volumes:
      - ./logs:/app/logs

  glm-service:
    # Assumes the GLM service also runs in Docker
    image: glm-4.7-flash-image  # replace with your GLM image
    ports:
      - "8000:8000"
    restart: unless-stopped
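
Optionally, a healthcheck can be added to the glm-api service so Docker restarts it when it stops responding. This is a sketch; the /health path is an assumption based on the endpoint listing in app/main.py:

```yaml
    # Add under the glm-api service definition
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8001/health')"]
      interval: 30s
      timeout: 5s
      retries: 3
      start_period: 10s
```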

8. Summary

With this complete project, we have wrapped GLM-4.7-Flash in an enterprise-grade API service. Let's recap the core features we implemented:

8.1 Core Value Delivered

  1. Secure authentication: a complete JWT-based auth flow with user roles and permission management
  2. Smart rate limiting: limits adjusted dynamically per user role to protect the backend service
  3. Unified interface: a standardized, well-documented RESTful API
  4. Error handling: a consistent error response format and exception-handling mechanism
  5. Monitoring and auditing: full request logging, performance monitoring, and audit trails
  6. Configuration management: every parameter configurable via environment variables for easy multi-environment deployment

8.2 Extension Ideas

This foundation can be extended to match your actual needs:

  1. Database integration: move user data from in-memory storage to MySQL/PostgreSQL
  2. Redis caching: cache responses to reduce repeated load on the GLM service
  3. Message queues: use RabbitMQ or Kafka for asynchronous request processing
  4. API gateway: put a gateway in front for more sophisticated traffic management
  5. Monitoring and alerting: integrate Prometheus and Grafana
  6. Distributed deployment: support multiple instances behind a load balancer

8.3 Production Checklist

Before deploying to a real production environment, also consider:

  1. Secret management: use a dedicated secret management service (e.g., HashiCorp Vault)
  2. HTTPS encryption: configure SSL certificates and enable HTTPS
  3. Firewall rules: restrict admin endpoints to specific IPs
  4. Backup strategy: back up configuration and logs regularly
  5. Disaster recovery: prepare a contingency plan for service outages

This project gives you a solid foundation to customize and extend for your business needs. Most importantly, you can now integrate the power of GLM-4.7-Flash into your enterprise applications safely and controllably, without worrying about security, rate limiting, or maintainability.


Get More AI Images

Want to explore more AI images and application scenarios? Visit the CSDN星图镜像广场, which offers a rich catalog of prebuilt images covering LLM inference, image generation, video generation, model fine-tuning, and more, all with one-click deployment.
