在构建 Agent 系统(如 RAG、AutoGPT、工具调用 Agent) 时,一个非常常见的问题是:

Agent 需要频繁调用 LLM,如果接口是同步阻塞的,会严重影响系统吞吐量。

例如:

  • 多个 Agent 同时推理

  • RAG 检索 + LLM 生成

  • Tool 调用后再次调用 LLM

  • 多轮推理链

很容易出现 API 服务吞吐量低、线程被阻塞、高并发下性能差等问题。

针对这个问题,本文将使用 FastAPI + aiohttp 构建异步 LLM 调用接口。

项目结构

my_pj
│
├── API_run.py
│
├── tools
│   ├── config.py
│   ├── fetch.py
│   └── chat_req.py

 
调用的LLM接口:https://cloud.siliconflow.cn/i/hQjWsIw3

1、fetch.py代码

这是我之前写爬虫时封装的一套通用异步请求函数,项目里基本上所有的 HTTP 请求都会经过这里。

from tools.config import logging
import aiohttp
from urllib.parse import parse_qs


def get_header():
    """Return a fresh dict of desktop-Chrome style request headers.

    ``fetch`` falls back to these headers when the caller passes none. A new
    dict is built on every call, so callers may mutate the result freely.
    """
    chrome_user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
    )
    header_pairs = (
        ("accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,"
                   "image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"),
        ("accept-language", "zh-CN,zh;q=0.9"),
        ("priority", "u=0, i"),
        ("sec-ch-ua", '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"'),
        ("sec-ch-ua-mobile", "?0"),
        ("sec-ch-ua-platform", '"Windows"'),
        ("sec-fetch-dest", "document"),
        ("sec-fetch-mode", "navigate"),
        ("sec-fetch-site", "none"),
        ("sec-fetch-user", "?1"),
        ("upgrade-insecure-requests", "1"),
        ("user-agent", chrome_user_agent),
    )
    return dict(header_pairs)


# HTTP verbs fetch() will send; anything else is rejected locally with status 400.
_ALLOWED_METHODS = ("GET", "POST", "HEAD", "OPTIONS", "TRACE", "PUT", "DELETE", "PATCH")


async def _build_form_data(data, files):
    """Convert requests-style ``data``/``files`` arguments into an ``aiohttp.FormData``.

    Returns ``None`` when neither is given, so the request carries no form body.
    """
    if not (files or data):
        return None

    form_data = aiohttp.FormData()
    if isinstance(data, dict):
        for key, value in data.items():
            # Form fields do not accept raw int/float values, so send their text form.
            if isinstance(value, (int, float)):
                value = str(value)
            form_data.add_field(key, value)
    elif isinstance(data, (str, bytes)):
        # "a=1&b=2" -> {"a": ["1"], "b": ["2"]}; only the first value of each key is kept.
        text = data.decode("utf-8") if isinstance(data, bytes) else data
        for key, values in parse_qs(text).items():
            form_data.add_field(key, values[0])
    elif data:
        # Previously such data was dropped silently; at least make it visible.
        logging.warning(f"fetch: unsupported data type {type(data).__name__}, ignored")

    if files:
        # `files` is an uploaded-file object received through the HTTP API (async
        # seek/read plus filename/content_type), not a locally opened file.
        # Rewind first so a file that was already read is not sent empty.
        await files.seek(0)
        form_data.add_field(
            "file",  # multipart field name
            await files.read(),
            filename=files.filename,
            content_type=files.content_type,
        )
    return form_data


def _request_summary(params, data, files, json_body):
    """Build the ", params:..., data:..., json:..." suffix appended to request log lines."""
    parts = []
    if params:
        parts.append(f", params:{params}")
    if data:
        parts.append(f", data:{str(data)[:200]}")
    if files:
        parts.append(f", file:{getattr(files, 'filename', files)}")
    if json_body:
        parts.append(f", json:{str(json_body)[:200]}")
    return "".join(parts)


async def fetch(url: str, method='GET', headers=None, params=None, data=None, files=None, json=None, timeout=180,
                logger_print=True):
    """Send one HTTP request with aiohttp and return ``(response_data, response_status)``.

    Args:
        url: Target URL.
        method: HTTP verb, case-insensitive; must be one of ``_ALLOWED_METHODS``.
        headers: Request headers; ``get_header()`` is used when empty or None.
        params: Query-string parameters.
        data: Form fields as a dict, or an urlencoded ``str``/``bytes`` ("a=1&b=2").
        files: Uploaded-file object, sent as the multipart field ``"file"``.
        json: JSON-serializable request body.
        timeout: Total request timeout in seconds.
        logger_print: When False, nothing is logged except a short line on failure.

    Returns:
        ``response_data`` is the decoded JSON body when the response is JSON,
        otherwise the body text. An unsupported ``method`` returns an error
        string with status 400 and no request is sent.

    Raises:
        Connection errors and timeouts raised by aiohttp are not caught here and
        propagate to the caller.
    """
    normalized_method = str(method).upper()
    if normalized_method not in _ALLOWED_METHODS:
        allowed = ", ".join(f'"{m}"' for m in _ALLOWED_METHODS)
        return f"method not in [{allowed}]", 400

    if not headers:
        headers = get_header()

    form_data = await _build_form_data(data, files)
    if logger_print:
        logging.info(f"正在发起请求 - url:{url}")

    # NOTE: a new ClientSession per call is simple but forgoes connection reuse;
    # aiohttp recommends one long-lived session for high-throughput callers.
    client_timeout = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession(timeout=client_timeout) as session:
        async with session.request(normalized_method, url, headers=headers, data=form_data,
                                   json=json, params=params) as response:
            response_status = response.status
            summary = _request_summary(params, data, files, json)

            # Any 2xx status counts as success (not just 200/201/204).
            if 200 <= response_status < 300:
                if logger_print:
                    logging.info(f"请求成功 - url:{url}{summary}")
            else:
                msg_head = f"请求失败!- status:{response_status} - url:{url}"
                logging.error(msg_head + summary if logger_print else msg_head)

            try:
                response_data = await response.json()
            except (aiohttp.ContentTypeError, ValueError):
                # Not a JSON response (or malformed JSON): fall back to the text body,
                # replacing undecodable bytes instead of raising a second time.
                response_data = await response.text(errors="replace")

            return response_data, response_status

2、config.py代码

这个文件可要可不要,主要作用是给运行时的日志输出加上颜色,让 FastAPI 控制台里的日志更容易阅读。

import logging
from colorama import init, Fore, Style

# Required: initialise colorama; autoreset=True resets the styling after each write.
init(autoreset=True)

# Root logger: INFO and above, one console handler, short HH:MM:SS timestamps.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    datefmt='%H:%M:%S',
    format=f'{Style.BRIGHT}%(asctime)s [%(levelname)s] - %(message)s',
    level=logging.INFO,
)

# Color the level names (compact version): INFO cyan, WARNING yellow, ERROR red.
for _level, _color in (
    (logging.INFO, Fore.CYAN),
    (logging.WARNING, Fore.YELLOW),
    (logging.ERROR, Fore.RED),
):
    logging.addLevelName(_level, f"{_color}{Style.BRIGHT}{logging.getLevelName(_level)}")
del _level, _color

加了之后,日志会按级别着色显示(INFO 为青色、WARNING 为黄色、ERROR 为红色),效果如下:

3、chat_req.py代码

我这里使用的是硅基流动(SiliconFlow)的 API。

注册地址:https://cloud.siliconflow.cn/i/hQjWsIw3


import os
import time

from fastapi import APIRouter
from pydantic import BaseModel

from tools.config import logging
from tools.fetch import fetch


# Root URL of the SiliconFlow OpenAI-compatible API.
base_url = "https://api.siliconflow.cn/v1"

# Read the key from the environment so it never has to be committed with the code;
# when the variable is unset this is "", exactly like the original placeholder.
API_KEY = os.getenv("SILICONFLOW_API_KEY", "")

# Model id sent in every chat-completions payload.
MODEL = "deepseek-ai/DeepSeek-V3.2"

class ChatRequest(BaseModel):
    """Request body for the chat endpoint: one system prompt plus one user message."""

    # System prompt; defaults to "You are a helpful assistant" when the client omits it.
    system: str = "你是一个有用的助手"
    # The user's message; required (no default).
    content: str


router = APIRouter(tags=["Knowledge_RAG"])


@router.post("/chat_ai")
async def chat_ai(req: ChatRequest):
    """Send one system/user message pair to the chat-completions API and return the reply.

    Returns:
        ``{"reply": <assistant text>}`` on success, or ``{"error": <upstream body>}``
        when the upstream status is not 200 or the body lacks
        ``choices[0].message.content``.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    messages = [
        {"role": "system", "content": req.system},
        {"role": "user", "content": req.content}
    ]
    payload = {
        "model": MODEL,
        "messages": messages
    }

    # perf_counter is monotonic, so the measured latency is immune to clock changes.
    start = time.perf_counter()
    ai_url = base_url + "/chat/completions"
    response_data, status = await fetch(
        url=ai_url,
        method="POST",
        headers=headers,
        json=payload
    )
    cost = time.perf_counter() - start
    logging.info(f"AI请求耗时: {cost:.3f} 秒")

    if status != 200:
        res = {"error": response_data}
    else:
        try:
            res = {"reply": response_data["choices"][0]["message"]["content"]}
        except (KeyError, IndexError, TypeError):
            # A 200 with an unexpected body (e.g. plain text) is reported, not turned into a 500.
            res = {"error": response_data}
    logging.info(res)
    # Without this return, FastAPI would answer every call with `null`.
    return res
4、API_run.py对应代码
import fastapi_cdn_host
import uvicorn
from fastapi import FastAPI
from tools import chat_req

# Swagger UI at /docs, ReDoc at /redoc. Per the original note, patch_docs lets
# /docs open offline (presumably by re-pointing the docs' static assets; confirm
# against the fastapi_cdn_host documentation).
app = FastAPI(redoc_url="/redoc", docs_url="/docs")
fastapi_cdn_host.patch_docs(app)
# Every chat_req endpoint is mounted under /api_pj (e.g. POST /api_pj/chat_ai).
app.include_router(chat_req.router, prefix="/api_pj")


def _serve():
    """Run uvicorn on all interfaces at port 6011, loading ``app`` from API_run."""
    uvicorn.run("API_run:app", port=6011, host="0.0.0.0")


if __name__ == "__main__":
    _serve()

FastAPI 运行

最后我们调试一下:运行 API_run.py 后,在浏览器中打开 http://127.0.0.1:6011/docs。端口要与 uvicorn.run 中的 port=6011 保持一致;0.0.0.0 只是服务的监听地址,浏览器访问时应使用 127.0.0.1 或本机 IP。

在 /docs 页面中找到 POST /api_pj/chat_ai 接口,点击 “Try it out”,填写请求体后执行。请求成功即说明整条链路已经打通,后续就可以在代码中直接调用这个接口了。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐