python异步编程实践 --文件解析

异步编程常犯错误

情况1:异步函数但同步调用

async def simulate_upload_file(id):
    """Simulate uploading one file by sleeping asynchronously.

    Args:
        id: File identifier; it is also used as the sleep duration in seconds.
    """
    print(f"Start uploading file {id}")
    # The sleep stands in for upload I/O and yields control to the event loop.
    await asyncio.sleep(id)
    print(f'finish uploading file {id}')

def process_files1(id_list):
    """Anti-pattern demo: call an async function from sync code without await.

    Args:
        id_list: Iterable of file ids.
    """
    for id in id_list:
        # Calling a coroutine function only builds a coroutine object; it is
        # neither awaited nor scheduled here, so the upload body never runs.
        simulate_upload_file(id)
        print(f'文件{id}已经上传完成')

if __name__ == '__main__':
    # Measure wall-clock time for the whole run.
    start_time = time.time()
    id_list = [1, 2, 3, 2, 1]
    # Plain synchronous call: no event loop is started in this variant.
    process_files1(id_list)
    end_time = time.time()
    print(f'运行时间:{end_time - start_time}')

结果:

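  • simulate_upload_file(id)只是创建了一个协程对象,函数体不会实际执行(Python还会给出"coroutine ... was never awaited"的RuntimeWarning)
  • 所有"文件{id}已经上传完成"的print会立即连续打印,而"Start/finish uploading"不会出现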
  • 实际上没有任何文件被上传

在这里插入图片描述

情况2:正确使用异步循环

async def simulate_upload_file(id):
    """Simulate uploading one file by sleeping asynchronously.

    Args:
        id: File identifier; it is also used as the sleep duration in seconds.
    """
    print(f"Start uploading file {id}")
    # The sleep stands in for upload I/O and yields control to the event loop.
    await asyncio.sleep(id)
    print(f'finish uploading file {id}')

async def process_files2(id_list):
    """Upload the files strictly one after another.

    Every upload is awaited before the next one is started, so the uploads
    never overlap.

    Args:
        id_list: Iterable of file ids.
    """
    for file_id in id_list:
        # Awaiting inline suspends this coroutine until the upload finishes.
        await simulate_upload_file(file_id)
        print(f'文件{file_id}已经上传完成')

if __name__ == '__main__':
    # Measure wall-clock time for the whole run.
    start_time = time.time()
    id_list = [1, 2, 3, 2, 1]
    # asyncio.run starts an event loop and runs the coroutine to completion.
    asyncio.run(process_files2(id_list))
    end_time = time.time()
    print(f'运行时间:{end_time - start_time}')

结果:

  • 顺序执行,等上一个文件上传完成再处理下一个
  • 打印顺序和上传顺序一致
  • 缺点:没有发挥异步并发优势

在这里插入图片描述

情况3:并发执行但仍顺序打印

async def simulate_upload_file(id):
    """Simulate uploading one file by sleeping asynchronously.

    Args:
        id: File identifier; it is also used as the sleep duration in seconds.
    """
    print(f"Start uploading file {id}")
    # The sleep stands in for upload I/O and yields control to the event loop.
    await asyncio.sleep(id)
    print(f'finish uploading file {id}')

async def process_files3(id_list):
    """Start every upload concurrently, then report them in list order.

    Args:
        id_list: Iterable of file ids.
    """
    # create_task schedules each coroutine immediately; nothing is awaited yet.
    scheduled = [
        (file_id, asyncio.create_task(simulate_upload_file(file_id)))
        for file_id in id_list
    ]

    # Tasks are awaited in list order, so a quick upload that sits behind a
    # slow one is only reported after the slow one has finished.
    for file_id, upload in scheduled:
        await upload
        print(f'文件{file_id}已经上传完成')

if __name__ == '__main__':
    # Measure wall-clock time for the whole run.
    start_time = time.time()
    id_list = [1, 2, 3, 2, 1]
    # asyncio.run starts an event loop and runs the coroutine to completion.
    asyncio.run(process_files3(id_list))
    end_time = time.time()
    print(f'运行时间:{end_time - start_time}')

结果:

  • 所有上传并发开始
  • 但打印顺序仍按tasks列表顺序
  • 打印时机取决于该任务以及列表中排在它前面的所有任务的完成时间(因为是按列表顺序依次await)

在这里插入图片描述

情况4:使用asyncio.as_completed()

async def simulate_upload_file(id):
    """Simulate uploading one file by sleeping asynchronously.

    Args:
        id: File identifier; it is also used as the sleep duration in seconds.
    """
    print(f"Start uploading file {id}")
    # The sleep stands in for upload I/O and yields control to the event loop.
    await asyncio.sleep(id)
    print(f'finish uploading file {id}')

async def process_files4(id_list):
    """Run all uploads concurrently and report each one as it completes.

    Args:
        id_list: Iterable of file ids.
    """
    pending = []
    for file_id in id_list:
        pending.append(simulate_upload_file(file_id))

    for finished in asyncio.as_completed(pending):
        # simulate_upload_file returns nothing, so the awaited value is None
        # and there is no way to tell here which id has just finished.
        await finished
        print('一个文件上传完成')

if __name__ == '__main__':
    # Measure wall-clock time for the whole run.
    start_time = time.time()
    id_list = [1, 2, 3, 2, 1]
    # asyncio.run starts an event loop and runs the coroutine to completion.
    asyncio.run(process_files4(id_list))
    end_time = time.time()
    print(f'运行时间:{end_time - start_time}')

在这里插入图片描述

推荐方案:带标识的并发处理

async def simulate_upload_file(id):
    """Simulate uploading one file by sleeping asynchronously.

    Args:
        id: File identifier; it is also used as the sleep duration in seconds.
    """
    print(f"Start uploading file {id}")
    # The sleep stands in for upload I/O and yields control to the event loop.
    await asyncio.sleep(id)
    print(f'finish uploading file {id}')

async def simulate_upload_file_with_id(id):
    """Run the simulated upload and hand back the id it was called with.

    Returning the id lets a caller that awaits results out of order know
    which upload has just finished.
    """
    await simulate_upload_file(id)
    return id

async def process_files5(id_list):
    """Run all uploads concurrently and report each id as it completes.

    Args:
        id_list: Iterable of file ids.
    """
    # Each coroutine returns its own id, so the id survives out-of-order
    # completion.
    uploads = []
    for file_id in id_list:
        uploads.append(simulate_upload_file_with_id(file_id))

    # as_completed yields awaitables in the order the uploads finish.
    for next_done in asyncio.as_completed(uploads):
        finished_id = await next_done
        print(f'文件{finished_id}已上传完成')

if __name__ == '__main__':
    # Measure wall-clock time for the whole run.
    start_time = time.time()
    id_list = [1, 2, 3, 2, 1]
    # asyncio.run starts an event loop and runs the coroutine to completion.
    asyncio.run(process_files5(id_list))
    end_time = time.time()
    print(f'运行时间:{end_time - start_time}')

在这里插入图片描述

耗时操作带return的处理

async def simulate_upload_file_with_return(id):
    """Simulate an upload and return the id together with a computed value.

    Args:
        id: File identifier; it is also used as the sleep duration in seconds.

    Returns:
        A ``(id, id ** 2)`` tuple.
    """
    print(f"Start uploading file {id}")
    # The sleep stands in for the slow operation.
    await asyncio.sleep(id)
    print(f'finish uploading file {id}')
    squared = id ** 2
    return (id, squared)

async def process_files6(id_list):
    """Run all uploads concurrently and print each result as it completes.

    Args:
        id_list: Iterable of file ids.
    """
    uploads = list(map(simulate_upload_file_with_return, id_list))
    # as_completed yields awaitables in the order the uploads finish.
    for next_done in asyncio.as_completed(uploads):
        finished_id, outcome = await next_done
        print(f'文件{finished_id}已上传完成,结果为{outcome}')



if __name__ == '__main__':
    # Measure wall-clock time for the whole run.
    start_time = time.time()
    id_list = [1, 2, 3, 2, 1]
    # asyncio.run starts an event loop and runs the coroutine to completion.
    asyncio.run(process_files6(id_list))
    end_time = time.time()
    print(f'运行时间:{end_time - start_time}')

在这里插入图片描述

asyncio.run()和threading.Thread()的区别

🎯 核心区别概览

特性 asyncio.run() threading.Thread()
并发模型 单线程异步I/O 多线程并发(CPython中受GIL限制,同一时刻只有一个线程执行Python字节码)
CPU核心 1个 纯Python代码通常也只能用满1个(要利用多核需使用multiprocessing)
切换方式 协程切换(无系统开销) 线程切换(有系统开销)
适用场景 I/O密集型 I/O密集型 + 调用同步阻塞库
内存开销​ 小(共享内存) 大(每个线程~8MB栈)
共享数据​ 直接访问(无竞争) 需线程同步(锁)
GIL影响​ 无(单线程) 有(Python GIL限制)

📖 详细对比分析

asyncio.run()- 异步事件循环

import asyncio

async def background_parse():
    """Await the async parser, then report completion."""
    # Async function
    await parse_file_async()  # suspends at the await so the event loop can run other tasks
    print("解析完成")

# Run the async program
asyncio.run(background_parse())

工作过程:

主线程
    ↓
[事件循环启动] ← 单线程
    ↓
[任务1开始] → 遇到I/O → [挂起任务1]
    ↓                     ↓
[执行任务2] ← 事件循环调度 ← 等待I/O完成
    ↓                     ↓
[切换回任务1] ← I/O完成 ←

threading.Thread()- 多线程并行

import threading

def background_parse():
    """Run the synchronous parser, then report completion."""
    # Synchronous function
    parse_file_sync()  # blocks the thread that calls it
    print("解析完成")

# Start a new thread
thread = threading.Thread(target=background_parse, daemon=True)
thread.start()  # creates a new OS thread

工作原理:

主线程        工作线程
    ↓            ↓
[创建线程] → [开始执行]
    ↓            ↓
[继续执行]   [阻塞I/O] ← 线程被操作系统挂起
    ↓            ↓
[做其他事]   [I/O完成]
    ↓            ↓
             [继续执行]

无论在多少个函数/模块中导入threading,在同一个Python进程中,threading模块都是同一个单例对象。

# module_a.py
import threading
import time

def check_threading_in_a():
    """Print identity details of the ``threading`` module as seen by module A.

    Returns:
        The ``threading`` module object itself, so callers can compare it.
    """
    module_id = id(threading)
    print(f"模块A中 threading 的 id: {module_id}")
    print(f"模块A中 threading 的内存地址: {hex(module_id)}")

    # Look up the main thread
    primary = threading.main_thread()
    print(f"模块A中主线程: {primary.name} (id: {primary.ident})")

    return threading

# module_b.py  
import threading
import time

def check_threading_in_b():
    """Print identity details of the ``threading`` module as seen by module B.

    Returns:
        The ``threading`` module object itself, so callers can compare it.
    """
    module_id = id(threading)
    print(f"模块B中 threading 的 id: {module_id}")
    print(f"模块B中 threading 的内存地址: {hex(module_id)}")

    # Count the threads that are currently alive
    alive = threading.active_count()
    print(f"模块B中活动线程数: {alive}")

    return threading

# 测试脚本
import module_a
import module_b
import threading  # 再次导入

print("=== threading模块单例验证 ===")

# Obtain the module object through each helper module
threading_a = module_a.check_threading_in_a()
print()
threading_b = module_b.check_threading_in_b()
print()

# The module object as imported directly by this script
print(f"主模块中 threading 的 id: {id(threading)}")
print(f"主模块中 threading 的内存地址: {hex(id(threading))}")

# Check whether all three references point at the same object
print("\n=== 比较结果 ===")
print(f"threading_a is threading_b: {threading_a is threading_b}")
print(f"threading_a is threading: {threading_a is threading}")
print(f"id相等: {id(threading_a) == id(threading_b) == id(threading)}")

在这里插入图片描述

使用场景

场景:批量下载文件

# 方案A: asyncio (异步I/O)
import asyncio
import aiohttp

async def download_file_async(url):
    """Fetch ``url`` with aiohttp and return what ``response.read()`` yields."""
    # NOTE(review): a new ClientSession is opened on every call; sharing one
    # session across downloads is presumably cheaper — confirm against the
    # aiohttp client documentation.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.read()

async def download_all_async(urls):
    """Download every URL concurrently on the event loop.

    Args:
        urls: Iterable of URLs to fetch.

    Returns:
        The list produced by ``asyncio.gather``, in the same order as ``urls``.
    """
    # One coroutine per URL; gather runs them concurrently in a single thread.
    pending = (download_file_async(link) for link in urls)
    return await asyncio.gather(*pending)

# Usage: start an event loop and run all downloads to completion
asyncio.run(download_all_async(['url1', 'url2', 'url3']))

# ------------------------------------------------------

# 方案B: threading (多线程)
import threading
import requests
from concurrent.futures import ThreadPoolExecutor

def download_file_sync(url, timeout=30):
    """Download ``url`` with a blocking HTTP GET and return the body.

    Args:
        url: Address to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block a worker thread indefinitely.

    Returns:
        The response body (``response.content``).
    """
    response = requests.get(url, timeout=timeout)  # blocking call
    return response.content

def download_all_threads(urls):
    """Download every URL using a pool of five worker threads.

    Args:
        urls: Iterable of URLs to fetch.

    Returns:
        List of download results in the same order as ``urls``.
    """
    with ThreadPoolExecutor(max_workers=5) as pool:
        # map submits every download up front and yields results in input
        # order, re-raising any exception a download raised.
        return list(pool.map(download_file_sync, urls))

# Usage: blocks until every download has finished
download_all_threads(['url1', 'url2', 'url3'])

📊 决策矩阵

条件 推荐方案 原因
90%+ I/O等待时间 asyncio​ 无线程开销,高并发
需要多核CPU并行 multiprocessing 线程受GIL限制,只有多进程才能真正并行
连接数 > 1000 asyncio​ 内存效率高
调用同步阻塞库 threading​ 简单直接
需要精确流量控制 asyncio 更好的控制粒度
简单的后台任务 threading 实现简单
实时Web应用 asyncio 低延迟,高并发

混合使用

# 结合两者优点
import asyncio
import concurrent.futures
import threading

async def hybrid_solution():
    """Fetch JSON asynchronously, then post-process it off the event loop.

    Returns:
        Whatever ``expensive_computation`` returns for the fetched payload.
    """
    # I/O-bound part: stays on the event loop
    async def fetch_data_async():
        # Asynchronous HTTP request
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.example.com/data') as resp:
                return await resp.json()

    # Blocking part: pushed to a worker thread so the event loop stays
    # responsive while it runs.
    # NOTE: under CPython's GIL a thread pool does not run pure-Python CPU
    # work in parallel; a process pool is needed for real multi-core speed-up.
    def process_data_cpu(data):
        return expensive_computation(data)

    # 1. Fetch the data asynchronously
    data = await fetch_data_async()

    # 2. Run the blocking computation in the thread pool.
    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine.
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        processed = await loop.run_in_executor(pool, process_data_cpu, data)

    return processed

# Run the coroutine to completion and keep its return value
result = asyncio.run(hybrid_solution())

🎯 总结

asyncio.run():

  • 单线程 + 协程切换
  • 适合I/O密集型、高并发
  • 内存开销小,性能高
  • 编程模型较复杂(async/await)

threading.Thread():

  • 多线程并发(CPython中受GIL限制,纯Python计算无法真正并行)
  • 适合调用阻塞库的I/O密集型任务
  • 编程简单直观
  • 内存开销大,有GIL限制

选择指南:

  • Web服务、爬虫、高并发I/O → asyncio
  • 调用同步阻塞库、简单后台任务 → threading;CPU密集型的数据处理、计算 → multiprocessing
  • 混合场景 → asyncio主循环 + 线程池处理阻塞调用(CPU密集任务改用进程池)

模拟文件上传过程

📁 项目结构

file_upload_example/
├── sync_api.py      # 同步等待结果的接口
├── async_api.py     # 异步立即返回通知的接口
├── file_parser.py   # 模拟耗时解析器
└── client.py        # 测试客户端

模拟的耗时解析器​ (file_parser.py)

import time
import random
from datetime import datetime
import threading
from queue import Queue
from dataclasses import dataclass
from typing import Optional, Dict

@dataclass
class ParseResult:
    """Outcome of parsing one file."""
    file_id: str
    success: bool
    result_data: Dict
    error_message: Optional[str] = None
    parse_time: float = 0.0
    timestamp: str = ""


class FileParser:
    """Simulated file parser backed by a single background worker thread.

    Requests are queued with :meth:`parse_file`, handled one at a time by a
    daemon thread, and each outcome is stored in ``results`` keyed by file id.
    """

    def __init__(self):
        self.processing_queue = Queue()
        self.results = {}  # file_id -> ParseResult
        self._stop = False
        self._worker_thread = None
        self._start_worker()

    def _simulate_parse(self, file_id, simulate_time):
        """Sleep for ``simulate_time`` seconds and build a random ParseResult."""
        print(f"[解析器] 开始解析文件 {file_id}, 预计耗时 {simulate_time}秒")
        time.sleep(simulate_time)  # stands in for the real parsing work

        success = random.random() > 0.1  # succeeds roughly 90% of the time

        return ParseResult(
            file_id=file_id,
            success=success,
            result_data={
                "file_size": random.randint(1024, 1024 * 1024),
                "pages": random.randint(1, 100),
                "entities_found": random.randint(0, 50)
            } if success else {},
            error_message="解析失败: 格式不支持" if not success else None,
            parse_time=simulate_time,
            timestamp=datetime.now().isoformat()
        )

    def _start_worker(self):
        """Start the daemon thread that drains ``processing_queue``."""
        # Imported locally because the module only imports ``Queue``.
        from queue import Empty

        def worker():
            while not self._stop:
                try:
                    # Wake up every second so a stop request is noticed.
                    task = self.processing_queue.get(timeout=1)
                except Empty:
                    # Nothing queued: an idle timeout is not an error.
                    continue

                try:
                    file_id, simulate_time = task
                    result = self._simulate_parse(file_id, simulate_time)
                    self.results[file_id] = result
                    print(f"[解析器] 文件 {file_id} 解析{'成功' if result.success else '失败'}, 耗时 {simulate_time}秒")
                except Exception as e:
                    # Report the failure instead of silently dropping the task.
                    print(f"[解析器] 错误: {e}")
                finally:
                    # Always balance get() so Queue.join() cannot hang.
                    self.processing_queue.task_done()

        self._worker_thread = threading.Thread(target=worker, daemon=True)
        self._worker_thread.start()
        print("[解析器] 后台解析线程已启动")

    def parse_file(self, file_id: str, simulate_time: float = 5.0) -> str:
        """Queue a file for parsing without waiting for the result.

        Args:
            file_id: Unique id of the file.
            simulate_time: Simulated parsing duration in seconds.

        Returns:
            The same ``file_id``, for use with :meth:`get_result`.
        """
        self.processing_queue.put((file_id, simulate_time))
        print(f"[解析器] 文件 {file_id} 已加入解析队列, 队列长度: {self.processing_queue.qsize()}")
        return file_id

    def get_result(self, file_id: str) -> Optional[ParseResult]:
        """Return the stored result for ``file_id``, or None if there is none yet."""
        return self.results.get(file_id)

    def stop(self):
        """Ask the worker to stop and wait up to two seconds for it to exit."""
        self._stop = True
        if self._worker_thread:
            self._worker_thread.join(timeout=2)

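同步等待结果的接口 (sync_api.py)

注意:下面这段代码与后文的 async_api.py 完全相同(标题为"异步文件上传API",路由为 /upload/async,端口为 8002),应为原文误贴;按 client.py 的调用方式,同步接口应暴露 POST /upload/sync 并监听 8001 端口。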

from fastapi import FastAPI, File, UploadFile, BackgroundTasks, Form
from fastapi.responses import JSONResponse
import time
from file_parser import FileParser
from pydantic import BaseModel
from typing import Optional
import uuid
from datetime import datetime
import asyncio

app = FastAPI(title="异步文件上传API")

# Module-wide parser instance shared by every request
parser = FileParser()

# Maps file_id -> callback URL (simulation only)
callback_store = {}

class AsyncUploadResponse(BaseModel):
    """Response body returned by the asynchronous upload endpoint."""
    success: bool
    file_id: str
    message: str
    status_url: str  # path to poll for the parsing status
    estimated_time: float  # filled with the requested simulate_time
    queue_position: int  # parser queue size when the response was built
    response_time: float  # time the API took to respond

def process_callback(file_id: str, callback_url: Optional[str] = None):
    """Poll the parser until ``file_id`` has a result, then simulate a callback.

    A real project could send a message, call a webhook or update a database
    at this point. Gives up after 300 seconds.

    Args:
        file_id: Id of the file to wait for.
        callback_url: Where the result would be sent; only printed here.
    """
    max_wait = 300
    poll_interval = 0.5
    began = time.time()

    while time.time() - began < max_wait:
        result = parser.get_result(file_id)
        if result is None:
            # Not parsed yet: wait a little and look again.
            time.sleep(poll_interval)
            continue

        # Simulated callback
        outcome = '成功' if result.success else '失败'
        print(f"[回调] 文件 {file_id} 解析完成,执行回调")
        print(f"    结果: {outcome}")
        print(f"    耗时: {result.parse_time}秒")

        # A real project would send an HTTP request to callback_url here.
        if callback_url:
            print(f"    将结果发送到: {callback_url}")
        return

    print(f"[回调] 文件 {file_id} 解析超时")

@app.post("/upload/async", response_model=AsyncUploadResponse)
async def upload_file_async(
    file: UploadFile = File(...),
    callback_url: Optional[str] = None,
    simulate_time: float = Form(1.0),  # simulated parsing duration in seconds
    background_tasks: BackgroundTasks = None
):
    """Asynchronous upload endpoint: respond at once, parse in the background.

    The file is submitted to the parser exactly once. Previously it was queued
    both through ``background_tasks`` and again from the helper thread, so
    every upload was parsed twice.

    Args:
        file: Uploaded file.
        callback_url: Optional URL to notify when parsing completes.
        simulate_time: Simulated parsing duration in seconds.
        background_tasks: Kept for interface compatibility; not used.

    Returns:
        An ``AsyncUploadResponse``, or a 500 ``JSONResponse`` if the upload
        could not be read.
    """
    import threading

    start_time = time.time()

    # Read the upload
    try:
        content = await file.read()
        file_size = len(content)
        print(f"[API-异步] 收到文件: {file.filename}, 大小: {file_size}字节")
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": f"读取文件失败: {str(e)}"}
        )

    # Generate the file id
    file_id = str(uuid.uuid4())

    # Hand over to background parsing (without waiting)
    print(f"[API-异步] 提交文件 {file_id} 到后台解析,模拟时间: {simulate_time}秒")

    # Remember the callback target
    if callback_url:
        callback_store[file_id] = callback_url

    def background_parse(file_id):
        time.sleep(0.1)  # simulated slight delay
        # Single submission point for this upload.
        parser.parse_file(file_id, simulate_time)
        # Watch for the result on a separate thread and fire the callback.
        callback_task = threading.Thread(
            target=process_callback,
            args=(file_id, callback_url),
            daemon=True
        )
        callback_task.start()

    threading.Thread(target=background_parse, args=(file_id,), daemon=True).start()

    response_time = time.time() - start_time
    print(f"[API-异步] 请求处理完成,响应时间: {response_time:.3f}秒")

    return AsyncUploadResponse(
        success=True,
        file_id=file_id,
        message="文件已接收,正在后台解析",
        status_url=f"/status/{file_id}",  # where to poll for status
        estimated_time=simulate_time,
        queue_position=parser.processing_queue.qsize(),
        response_time=response_time
    )

@app.get("/status/{file_id}")
async def get_processing_status(file_id: str):
    """Report whether ``file_id`` has been parsed yet.

    Any id without a stored result is reported as "queued".
    """
    result = parser.get_result(file_id)

    if result is not None:
        succeeded = result.success
        return {
            "file_id": file_id,
            "status": "completed",
            "success": succeeded,
            "data": result.result_data if succeeded else {},
            "error": None if succeeded else result.error_message,
            "parse_time": result.parse_time,
            "completion_time": result.timestamp
        }

    # No result yet: report the file as still waiting.
    backlog = parser.processing_queue.qsize()
    return {
        "file_id": file_id,
        "status": "queued",
        "message": "文件在队列中等待解析",
        "estimated_wait": backlog * 5,  # rough estimate of the wait
        "timestamp": datetime.now().isoformat()
    }

if __name__ == '__main__':

    # Serve the app named "app" from module "async_api" on 127.0.0.1:8002
    # with auto-reload enabled.
    import uvicorn
    uvicorn.run("async_api:app", host="127.0.0.1", port=8002, reload=True)

异步立即返回通知的接口​ (async_api.py)

from fastapi import FastAPI, File, UploadFile, BackgroundTasks, Form
from fastapi.responses import JSONResponse
import time
from file_parser import FileParser
from pydantic import BaseModel
from typing import Optional
import uuid
from datetime import datetime
import asyncio

app = FastAPI(title="异步文件上传API")

# Module-wide parser instance shared by every request
parser = FileParser()

# Maps file_id -> callback URL (simulation only)
callback_store = {}

class AsyncUploadResponse(BaseModel):
    """Response body returned by the asynchronous upload endpoint."""
    success: bool
    file_id: str
    message: str
    status_url: str  # path to poll for the parsing status
    estimated_time: float  # filled with the requested simulate_time
    queue_position: int  # parser queue size when the response was built
    response_time: float  # time the API took to respond

def process_callback(file_id: str, callback_url: Optional[str] = None):
    """Poll the parser until ``file_id`` has a result, then simulate a callback.

    A real project could send a message, call a webhook or update a database
    at this point. Gives up after 300 seconds.

    Args:
        file_id: Id of the file to wait for.
        callback_url: Where the result would be sent; only printed here.
    """
    max_wait = 300
    poll_interval = 0.5
    began = time.time()

    while time.time() - began < max_wait:
        result = parser.get_result(file_id)
        if result is None:
            # Not parsed yet: wait a little and look again.
            time.sleep(poll_interval)
            continue

        # Simulated callback
        outcome = '成功' if result.success else '失败'
        print(f"[回调] 文件 {file_id} 解析完成,执行回调")
        print(f"    结果: {outcome}")
        print(f"    耗时: {result.parse_time}秒")

        # A real project would send an HTTP request to callback_url here.
        if callback_url:
            print(f"    将结果发送到: {callback_url}")
        return

    print(f"[回调] 文件 {file_id} 解析超时")

@app.post("/upload/async", response_model=AsyncUploadResponse)
async def upload_file_async(
    file: UploadFile = File(...),
    callback_url: Optional[str] = None,
    simulate_time: float = Form(1.0),  # simulated parsing duration in seconds
    background_tasks: BackgroundTasks = None
):
    """Asynchronous upload endpoint: respond at once, parse in the background.

    The file is submitted to the parser exactly once. Previously it was queued
    both through ``background_tasks`` and again from the helper thread, so
    every upload was parsed twice.

    Args:
        file: Uploaded file.
        callback_url: Optional URL to notify when parsing completes.
        simulate_time: Simulated parsing duration in seconds.
        background_tasks: Kept for interface compatibility; not used.

    Returns:
        An ``AsyncUploadResponse``, or a 500 ``JSONResponse`` if the upload
        could not be read.
    """
    import threading

    start_time = time.time()

    # Read the upload
    try:
        content = await file.read()
        file_size = len(content)
        print(f"[API-异步] 收到文件: {file.filename}, 大小: {file_size}字节")
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": f"读取文件失败: {str(e)}"}
        )

    # Generate the file id
    file_id = str(uuid.uuid4())

    # Hand over to background parsing (without waiting)
    print(f"[API-异步] 提交文件 {file_id} 到后台解析,模拟时间: {simulate_time}秒")

    # Remember the callback target
    if callback_url:
        callback_store[file_id] = callback_url

    def background_parse(file_id):
        time.sleep(0.1)  # simulated slight delay
        # Single submission point for this upload.
        parser.parse_file(file_id, simulate_time)
        # Watch for the result on a separate thread and fire the callback.
        callback_task = threading.Thread(
            target=process_callback,
            args=(file_id, callback_url),
            daemon=True
        )
        callback_task.start()

    threading.Thread(target=background_parse, args=(file_id,), daemon=True).start()

    response_time = time.time() - start_time
    print(f"[API-异步] 请求处理完成,响应时间: {response_time:.3f}秒")

    return AsyncUploadResponse(
        success=True,
        file_id=file_id,
        message="文件已接收,正在后台解析",
        status_url=f"/status/{file_id}",  # where to poll for status
        estimated_time=simulate_time,
        queue_position=parser.processing_queue.qsize(),
        response_time=response_time
    )

@app.get("/status/{file_id}")
async def get_processing_status(file_id: str):
    """Report whether ``file_id`` has been parsed yet.

    Any id without a stored result is reported as "queued".
    """
    result = parser.get_result(file_id)

    if result is not None:
        succeeded = result.success
        return {
            "file_id": file_id,
            "status": "completed",
            "success": succeeded,
            "data": result.result_data if succeeded else {},
            "error": None if succeeded else result.error_message,
            "parse_time": result.parse_time,
            "completion_time": result.timestamp
        }

    # No result yet: report the file as still waiting.
    backlog = parser.processing_queue.qsize()
    return {
        "file_id": file_id,
        "status": "queued",
        "message": "文件在队列中等待解析",
        "estimated_wait": backlog * 5,  # rough estimate of the wait
        "timestamp": datetime.now().isoformat()
    }

if __name__ == '__main__':

    # Serve the app named "app" from module "async_api" on 127.0.0.1:8002
    # with auto-reload enabled.
    import uvicorn
    uvicorn.run("async_api:app", host="127.0.0.1", port=8002, reload=True)

测试客户端​ (client.py)

import requests
import time
import threading
import queue
from dataclasses import dataclass
from typing import List

@dataclass
class TestResult:
    """Outcome of one simulated client request."""
    client_id: int
    api_type: str  # "sync" or "async"
    success: bool
    response_time: float  # seconds until the API call returned
    total_time: float = 0.0  # 0.0 when no total was recorded
    error: str = None  # error text; None when nothing went wrong

def test_sync_api(client_id: int, results_queue: queue.Queue, simulate_time: float = 3.0):
    """Call the synchronous upload API once and record the outcome.

    Args:
        client_id: Number used to label this client in the log output.
        results_queue: Queue that receives exactly one ``TestResult``.
        simulate_time: Simulated parsing duration passed to the server.
    """
    began = time.time()
    try:
        # Fake file upload
        upload = {'file': ('test.txt', b'fake file content' * 1000, 'text/plain')}
        form = {'simulate_time': simulate_time}

        print(f"[客户端{client_id}] 开始调用同步API...")
        # Generous timeout: the server replies only after parsing finishes.
        response = requests.post(
            'http://localhost:8001/upload/sync',
            files=upload,
            data=form,
            timeout=simulate_time + 30
        )

        resp_time = time.time() - began
        body = response.json()

        print(f"[客户端{client_id}] 同步API响应: {body.get('message')}, "
              f"耗时: {resp_time:.2f}秒")

        outcome = TestResult(
            client_id=client_id,
            api_type="sync",
            success=body.get('success', False),
            response_time=resp_time,
            total_time=body.get('total_time', 0)
        )
        results_queue.put(outcome)

    except Exception as e:
        resp_time = time.time() - began
        print(f"[客户端{client_id}] 同步API错误: {e}")
        failure = TestResult(
            client_id=client_id,
            api_type="sync",
            success=False,
            response_time=resp_time,
            error=str(e)
        )
        results_queue.put(failure)

def test_async_api(client_id: int, results_queue: queue.Queue, simulate_time: float = 3.0):
    """Call the asynchronous upload API, then poll until parsing completes.

    Exactly one ``TestResult`` is put on ``results_queue``.

    Args:
        client_id: Number used to label this client in the log output.
        results_queue: Queue that receives the ``TestResult``.
        simulate_time: Simulated parsing duration passed to the server.
    """
    start = time.time()
    resp_time = None  # set once the upload request itself has returned
    try:
        # Fake file upload
        files = {
            'file': ('test.txt', b'fake file content' * 1000, 'text/plain')
        }
        data = {'simulate_time': simulate_time}

        print(f"[客户端{client_id}] 开始调用异步API...")
        response = requests.post(
            'http://localhost:8002/upload/async',  # note the different port
            files=files,
            data=data,
            timeout=10  # short timeout: the API should answer right away
        )

        resp_time = time.time() - start
        result = response.json()

        print(f"[客户端{client_id}] 异步API立即返回: {result.get('message')}, "
              f"API响应时间: {resp_time:.3f}秒, 文件ID: {result.get('file_id')}")

        file_id = result.get('file_id')
        if not file_id:
            # Nothing to poll for: report the real cause, not a poll timeout.
            results_queue.put(TestResult(
                client_id=client_id,
                api_type="async",
                success=False,
                response_time=resp_time,
                error="响应中缺少file_id"
            ))
            return

        # Poll for the result
        poll_start = time.time()
        max_poll = 300
        while time.time() - poll_start < max_poll:
            # A timeout keeps one stuck status request from hanging the client.
            status_resp = requests.get(
                f'http://localhost:8002/status/{file_id}',
                timeout=10
            )
            status = status_resp.json()

            if status.get('status') == 'completed':
                poll_time = time.time() - poll_start
                total_time = time.time() - start
                print(f"[客户端{client_id}] 文件解析完成, "
                      f"轮询耗时: {poll_time:.2f}秒, 总耗时: {total_time:.2f}秒")

                results_queue.put(TestResult(
                    client_id=client_id,
                    api_type="async",
                    success=status.get('success', False),
                    response_time=resp_time,  # API response time
                    total_time=total_time  # total time including polling
                ))
                return

            time.sleep(1)  # poll once per second

        # Polling timed out
        results_queue.put(TestResult(
            client_id=client_id,
            api_type="async",
            success=False,
            response_time=resp_time,
            error="轮询超时"
        ))

    except Exception as e:
        # Keep the measured API response time if the upload call succeeded
        # and the failure happened later, while polling.
        if resp_time is None:
            resp_time = time.time() - start
        print(f"[客户端{client_id}] 异步API错误: {e}")
        results_queue.put(TestResult(
            client_id=client_id,
            api_type="async",
            success=False,
            response_time=resp_time,
            error=str(e)
        ))

def concurrent_test(num_clients: int = 5, api_type: str = "sync", simulate_time: float = 2.0):
    """Run several simulated clients in parallel threads and print statistics.

    Args:
        num_clients: Number of client threads to start.
        api_type: "sync" uses ``test_sync_api``; anything else uses
            ``test_async_api``.
        simulate_time: Simulated parsing duration passed to every client.

    Returns:
        List of the ``TestResult`` objects the clients produced.
    """
    banner = '=' * 50
    print(f"\n{banner}")
    print(f"开始并发测试: {num_clients}个客户端, API类型: {api_type}")
    print(f"{banner}")

    results_queue = queue.Queue()
    clients = []
    began = time.time()

    # Start one thread per client
    for index in range(num_clients):
        runner = test_sync_api if api_type == "sync" else test_async_api
        client = threading.Thread(
            target=runner,
            args=(index, results_queue, simulate_time)
        )
        client.start()
        clients.append(client)
        time.sleep(0.1)  # stagger the start times slightly

    # Wait for every client to finish
    for client in clients:
        client.join()

    total_test_time = time.time() - began

    # Drain the queue; all producers are done, so an empty queue means the end.
    results = []
    while True:
        try:
            results.append(results_queue.get_nowait())
        except queue.Empty:
            break

    # Print the statistics
    print(f"\n{banner}")
    print(f"测试完成统计:")
    print(f"API类型: {api_type}")
    print(f"客户端数量: {num_clients}")
    print(f"模拟解析时间: {simulate_time}秒")
    print(f"总测试时间: {total_test_time:.2f}秒")

    if results:
        success_count = len([r for r in results if r.success])
        avg_response = sum(r.response_time for r in results) / len(results)
        # Only results that recorded a total time count towards its average.
        timed = [r.total_time for r in results if r.total_time > 0]
        avg_total = sum(timed) / max(1, len(timed))

        print(f"成功请求: {success_count}/{len(results)}")
        print(f"平均API响应时间: {avg_response:.3f}秒")
        if api_type == "async":
            print(f"平均总处理时间(含轮询): {avg_total:.2f}秒")
        print(f"{banner}")

    return results

if __name__ == "__main__":
    # Exercise the synchronous API
    concurrent_test(num_clients=10, api_type="sync", simulate_time=3.0)
    
    # Exercise the asynchronous API
    concurrent_test(num_clients=10, api_type="async", simulate_time=3.0)

在这里插入图片描述

在这里插入图片描述

执行分析:

  • 同步API:

    • 所有客户端同时发送请求,等待API响应
    • FastAPI根据请求到达顺序依次处理
      • 单个请求处理完成后(包括获取解析完成的结果),返回结果给客户端
      • 实际parse解析器队列里只有一个任务
    • 所以处理时间约为 解析时间*客户端数量
  • 异步API:

    • 所有客户端同时发送请求,等待API响应
    • FastAPI根据请求到达顺序依次处理(一旦有等待的任务就挂起,处理下一个)
      • 单个请求处理完成(只包含上传给解析器的操作),返回上传成功结果给客户端
    • 解析器队列此时存着所有客户端上传的文件,解析器依次解析,解析成功的放入结果列表中
    • 客户端接收到上传成功结果后,开始轮询API获取结果(即去解析器结果列表中获取结果)
  • 为什么异步API结果反而比同步API更慢?

    • 这是因为异步API的解析器队列是单线程消费的,同一时间只能处理一个任务(是因为我们在async_api中只实例化了一个解析器对象)。
    • 正常的场景下,我们程序调用后台的一个解析器接口,这个接口是多线程的,可以同时处理多个任务。

我们尝试按照这个逻辑,将解析器队列改为多线程消费,现在假设客户端有10个,解析器有3个线程。

from fastapi import FastAPI, File, UploadFile, BackgroundTasks, Form
from fastapi.responses import JSONResponse
import time
from file_parser import FileParser
from pydantic import BaseModel
from typing import Optional
import uuid
from datetime import datetime
import asyncio
import threading
from queue import Queue
import concurrent.futures

app = FastAPI(title="异步文件上传API")

# Shared queue: filled by the upload endpoint, drained by the worker threads
processing_queue = Queue()
# One FileParser instance per worker
num_parsers = 3
parsers = [FileParser() for _ in range(num_parsers)]

# Maps file_id -> callback URL (simulation only)
callback_store = {}

class AsyncUploadResponse(BaseModel):
    """Response body returned by the asynchronous upload endpoint."""
    success: bool
    file_id: str
    message: str
    status_url: str  # path to poll for the parsing status
    estimated_time: float  # filled with the requested simulate_time
    queue_position: int  # shared queue size when the response was built
    response_time: float  # time the API took to respond

def worker(parser_instance, worker_id):
    """Worker loop: forward tasks from the shared queue to one parser.

    Runs until a ``None`` sentinel is taken from ``processing_queue``.

    Args:
        parser_instance: Object whose ``parse_file(file_id, simulate_time)``
            is called for every task.
        worker_id: Number used to label this worker in the log output.
    """
    # Imported locally because the module only imports ``Queue``.
    from queue import Empty

    print(f"[Worker-{worker_id}] 启动...")
    while True:
        try:
            # Block for up to a second instead of spinning on empty().
            task_info = processing_queue.get(timeout=1)
        except Empty:
            # Idle timeout: nothing was taken, so there is nothing to mark done.
            continue

        try:
            if task_info is None:  # sentinel used to stop the thread
                break

            file_id, simulate_time = task_info
            print(f"[Worker-{worker_id}] 开始处理文件 {file_id}")

            # Hand the file to this worker's parser instance
            parser_instance.parse_file(file_id, simulate_time)

            print(f"[Worker-{worker_id}] 完成处理文件 {file_id}")
        except Exception as e:
            print(f"[Worker-{worker_id}] 处理任务时出错: {e}")
        finally:
            # Exactly one task_done() per successful get(), including the
            # sentinel, so Queue.join() stays balanced.
            processing_queue.task_done()

# 启动多个工作线程
workers = []
# Start one daemon thread per parser instance at import time.
for i in range(num_parsers):
    worker_thread = threading.Thread(
        target=worker, 
        args=(parsers[i], i),
        daemon=True
    )
    worker_thread.start()
    workers.append(worker_thread)

def process_callback(file_id: str, callback_url: Optional[str] = None):
    """Poll every parser until ``file_id`` has a result, then simulate a callback.

    A real project could send a message, call a webhook or update a database
    at this point. Gives up after 300 seconds.

    Args:
        file_id: Id of the file to wait for.
        callback_url: Where the result would be sent; only printed here.
    """
    max_wait = 300
    poll_interval = 0.5
    began = time.time()

    while time.time() - began < max_wait:
        # Take the first non-None result any parser holds for this file.
        lookups = (candidate.get_result(file_id) for candidate in parsers)
        result = next((found for found in lookups if found is not None), None)

        if result is None:
            time.sleep(poll_interval)
            continue

        # Simulated callback
        outcome = '成功' if result.success else '失败'
        print(f"[回调] 文件 {file_id} 解析完成,执行回调")
        print(f"    结果: {outcome}")
        print(f"    耗时: {result.parse_time}秒")

        # A real project would send an HTTP request to callback_url here.
        if callback_url:
            print(f"    将结果发送到: {callback_url}")
        return

    print(f"[回调] 文件 {file_id} 解析超时")

@app.post("/upload/async", response_model=AsyncUploadResponse)
async def upload_file_async(
    file: UploadFile = File(...),
    callback_url: Optional[str] = None,
    simulate_time: float = Form(1.0),  # simulated parsing duration in seconds
    background_tasks: BackgroundTasks = None
):
    """Asynchronous upload endpoint: queue the file and respond at once.

    Returns:
        An ``AsyncUploadResponse``, or a 500 ``JSONResponse`` if the upload
        could not be read.
    """
    received_at = time.time()

    # Read the upload
    try:
        payload = await file.read()
        file_size = len(payload)
        print(f"[API-异步] 收到文件: {file.filename}, 大小: {file_size}字节")
    except Exception as e:
        failure = {"error": f"读取文件失败: {str(e)}"}
        return JSONResponse(status_code=500, content=failure)

    file_id = str(uuid.uuid4())

    # Put the job on the shared queue that the worker threads consume
    print(f"[API-异步] 提交文件 {file_id} 到处理队列,模拟时间: {simulate_time}秒")
    job = (file_id, simulate_time)
    processing_queue.put(job)

    # Remember the callback target
    if callback_url:
        callback_store[file_id] = callback_url

    response_time = time.time() - received_at
    print(f"[API-异步] 请求处理完成,响应时间: {response_time:.3f}秒")

    queued = processing_queue.qsize()  # current queue length
    return AsyncUploadResponse(
        success=True,
        file_id=file_id,
        message="文件已接收,已加入处理队列",
        status_url=f"/status/{file_id}",  # where to poll for status
        estimated_time=simulate_time,
        queue_position=queued,
        response_time=response_time
    )

@app.get("/status/{file_id}")
async def get_processing_status(file_id: str):
    """Report the status of ``file_id``: completed, queued or unknown.

    A file counts as "queued" only while it is still in the shared queue;
    anything else without a stored result is reported as "unknown".
    """
    # Return as soon as any parser holds a result for this file.
    for candidate in parsers:
        result = candidate.get_result(file_id)
        if result is None:
            continue
        succeeded = result.success
        return {
            "file_id": file_id,
            "status": "completed",
            "success": succeeded,
            "data": result.result_data if succeeded else {},
            "error": None if succeeded else result.error_message,
            "parse_time": result.parse_time,
            "completion_time": result.timestamp
        }

    # No result: look for the file in the shared queue while holding its lock.
    with processing_queue.mutex:
        is_in_queue = any(task[0] == file_id for task in processing_queue.queue)

    if not is_in_queue:
        return {
            "file_id": file_id,
            "status": "unknown",
            "message": "文件未找到或尚未提交",
            "timestamp": datetime.now().isoformat()
        }

    return {
        "file_id": file_id,
        "status": "queued",
        "message": "文件在队列中等待解析",
        "estimated_wait": processing_queue.qsize() * 5,  # rough estimate of the wait
        "timestamp": datetime.now().isoformat()
    }

@app.get("/queue/status")
async def get_queue_status():
    """Return the shared queue length and the configured worker count."""
    snapshot = {
        "queue_size": processing_queue.qsize(),
        "active_workers": num_parsers,
        "timestamp": datetime.now().isoformat()
    }
    return snapshot

if __name__ == '__main__':
    # Serve the app named "app" from module "async_api" on 127.0.0.1:8002
    # with auto-reload enabled.
    import uvicorn
    uvicorn.run("async_api:app", host="127.0.0.1", port=8002, reload=True)

在这里插入图片描述

可以看到解析速度明显比之前快了很多。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐