异步编程实验--文件解析
本文介绍了Python异步编程中的常见错误及最佳实践。通过文件上传模拟案例,对比了四种实现方式:1)错误同步调用导致协程未执行;2)顺序执行失去并发优势;3)并发执行但顺序打印;4)使用asyncio.as_completed()无法获取任务ID。推荐方案采用带标识的并发处理,通过返回ID实现任务追踪。最后对比了asyncio.run()和threading.Thread()的核心区别,指出asyncio适合高并发I/O场景,threading适合调用同步阻塞库的场景,而CPU密集型任务应交给多进程。
·
python异步编程实践 --文件解析
文章目录
异步编程常犯错误
情况1:异步函数但同步调用
async def simulate_upload_file(id):
    """Pretend to upload file *id*, sleeping ``id`` seconds to mimic I/O."""
    file_no = id
    print(f"Start uploading file {file_no}")
    # Suspends this coroutine and hands control back to the event loop.
    await asyncio.sleep(file_no)
    print(f'finish uploading file {file_no}')
def process_files1(id_list):
    """Anti-pattern demo: call a coroutine function without awaiting it.

    Each call only builds a coroutine object that is never scheduled, so no
    upload actually runs and the completion messages print immediately.
    """
    for file_no in id_list:
        # Intentional mistake: no ``await`` and no running event loop.
        simulate_upload_file(file_no)
        print(f'文件{file_no}已经上传完成')
if __name__ == '__main__':
    # The snippet used ``time`` without importing it (NameError when run).
    import time

    # perf_counter is the monotonic, high-resolution clock meant for intervals.
    start_time = time.perf_counter()
    id_list = [1, 2, 3, 2, 1]
    process_files1(id_list)
    end_time = time.perf_counter()
    print(f'运行时间:{end_time - start_time}')
结果:
- simulate_upload_file(id)只是创建了一个协程对象,函数体不会实际执行(协程对象被回收时Python还会给出 RuntimeWarning: coroutine ... was never awaited)
- 所有print会立即连续打印
- 实际上没有任何文件被上传

情况2:正确使用异步循环
async def simulate_upload_file(id):
    """Pretend to upload file *id*, sleeping ``id`` seconds to mimic I/O."""
    file_no = id
    print(f"Start uploading file {file_no}")
    # Suspends this coroutine and hands control back to the event loop.
    await asyncio.sleep(file_no)
    print(f'finish uploading file {file_no}')
async def process_files2(id_list):
    """Upload files strictly one after another.

    Each upload is awaited before the next starts, so the total time is the
    sum of all ids.
    """
    for file_no in id_list:
        await simulate_upload_file(file_no)
        print(f'文件{file_no}已经上传完成')
if __name__ == '__main__':
    # The snippet used these modules without importing them (NameError).
    # Imported at module scope, so simulate_upload_file can see asyncio too.
    import asyncio
    import time

    start_time = time.perf_counter()  # monotonic clock for measuring intervals
    id_list = [1, 2, 3, 2, 1]
    asyncio.run(process_files2(id_list))
    end_time = time.perf_counter()
    print(f'运行时间:{end_time - start_time}')
结果:
- 顺序执行,等上一个文件上传完成再处理下一个
- 打印顺序和上传顺序一致
- 缺点:没有发挥异步并发优势

情况3:并发执行但仍顺序打印
async def simulate_upload_file(id):
    """Pretend to upload file *id*, sleeping ``id`` seconds to mimic I/O."""
    file_no = id
    print(f"Start uploading file {file_no}")
    # Suspends this coroutine and hands control back to the event loop.
    await asyncio.sleep(file_no)
    print(f'finish uploading file {file_no}')
async def process_files3(id_list):
    """Start every upload concurrently, then report completion in list order."""
    # create_task schedules each coroutine right away; awaiting happens later.
    pending = [
        (file_no, asyncio.create_task(simulate_upload_file(file_no)))
        for file_no in id_list
    ]
    for file_no, upload in pending:
        # Returns at once if this upload already finished.
        await upload
        print(f'文件{file_no}已经上传完成')
if __name__ == '__main__':
    # The snippet used these modules without importing them (NameError).
    import asyncio
    import time

    start_time = time.perf_counter()  # monotonic clock for measuring intervals
    id_list = [1, 2, 3, 2, 1]
    asyncio.run(process_files3(id_list))
    end_time = time.perf_counter()
    print(f'运行时间:{end_time - start_time}')
结果:
- 所有上传并发开始
- 但打印顺序仍按tasks列表顺序
- 每条“已经上传完成”的打印时机取决于该任务及其之前所有任务中最晚完成的那个(例如id_list=[1, 2, 3, 2, 1]时,总耗时约3秒而不是顺序执行的9秒)

情况4:使用asyncio.as_completed()
async def simulate_upload_file(id):
    """Pretend to upload file *id*, sleeping ``id`` seconds to mimic I/O."""
    file_no = id
    print(f"Start uploading file {file_no}")
    # Suspends this coroutine and hands control back to the event loop.
    await asyncio.sleep(file_no)
    print(f'finish uploading file {file_no}')
async def process_files4(id_list):
    """Concurrent uploads consumed in completion order, but without the id.

    simulate_upload_file returns None, so the awaited value does not say
    which file just finished.
    """
    coros = [simulate_upload_file(file_no) for file_no in id_list]
    for next_done in asyncio.as_completed(coros):
        _ = await next_done  # always None here
        print('一个文件上传完成')
if __name__ == '__main__':
    # The snippet used these modules without importing them (NameError).
    import asyncio
    import time

    start_time = time.perf_counter()  # monotonic clock for measuring intervals
    id_list = [1, 2, 3, 2, 1]
    asyncio.run(process_files4(id_list))
    end_time = time.perf_counter()
    print(f'运行时间:{end_time - start_time}')

推荐方案:带标识的并发处理
async def simulate_upload_file(id):
    """Pretend to upload file *id*, sleeping ``id`` seconds to mimic I/O."""
    file_no = id
    print(f"Start uploading file {file_no}")
    # Suspends this coroutine and hands control back to the event loop.
    await asyncio.sleep(file_no)
    print(f'finish uploading file {file_no}')
async def simulate_upload_file_with_id(id):
    """Run simulate_upload_file(id) and hand back *id* as the result.

    simulate_upload_file itself returns None, so echoing the id lets
    callers tell which upload finished.
    """
    await simulate_upload_file(id)
    finished_id = id
    return finished_id
async def process_files5(id_list):
    """Concurrent uploads, printing each file id in completion order."""
    # Each wrapper coroutine resolves to the id it uploaded.
    wrapped = [simulate_upload_file_with_id(file_no) for file_no in id_list]
    for next_done in asyncio.as_completed(wrapped):
        done_id = await next_done
        print(f'文件{done_id}已上传完成')
if __name__ == '__main__':
    # The snippet used these modules without importing them (NameError).
    import asyncio
    import time

    start_time = time.perf_counter()  # monotonic clock for measuring intervals
    id_list = [1, 2, 3, 2, 1]
    asyncio.run(process_files5(id_list))
    end_time = time.perf_counter()
    print(f'运行时间:{end_time - start_time}')

耗时操作带return的处理
async def simulate_upload_file_with_return(id):
    """Simulated upload of *id* that returns the pair ``(id, id ** 2)``."""
    print(f"Start uploading file {id}")
    await asyncio.sleep(id)
    print(f'finish uploading file {id}')
    square = id ** 2
    return id, square
async def process_files6(id_list):
    """Concurrent uploads; unpack ``(id, result)`` as each one completes."""
    coros = [simulate_upload_file_with_return(file_no) for file_no in id_list]
    for next_done in asyncio.as_completed(coros):
        done_id, payload = await next_done
        print(f'文件{done_id}已上传完成,结果为{payload}')
if __name__ == '__main__':
    # The snippet used these modules without importing them (NameError).
    import asyncio
    import time

    start_time = time.perf_counter()  # monotonic clock for measuring intervals
    id_list = [1, 2, 3, 2, 1]
    asyncio.run(process_files6(id_list))
    end_time = time.perf_counter()
    print(f'运行时间:{end_time - start_time}')

asyncio.run()和threading.Thread()的区别
🎯 核心区别概览
| 特性 | asyncio.run() | threading.Thread() |
|---|---|---|
| 并发模型 | 单线程异步I/O | 多线程并行 |
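| CPU核心 | 1个 | 多个系统线程,但CPython的GIL使同一时刻只有一个线程执行Python字节码(纯Python计算无法多核并行) |
| 切换方式 | 协程切换(无系统开销) | 线程切换(有系统开销) |
| 适用场景 | I/O密集型(需配合异步库) | I/O密集型 + 调用同步阻塞库(CPU密集型应使用多进程) |
| 内存开销 | 小(每个协程只是一个轻量对象) | 较大(每个线程有独立栈,如Linux默认约8MB虚拟地址空间) |
| 共享数据 | 单线程内直接访问,await之间不会被打断(但跨await时状态仍可能被其他协程修改,必要时用asyncio.Lock) | 需线程同步(锁) |
| GIL影响 | 无关(单线程) | 有(阻塞I/O期间会释放GIL,纯Python计算受GIL限制) |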
📖 详细对比分析
asyncio.run()- 异步事件循环
import asyncio


async def background_parse():
    """Await the async parser (defined elsewhere), then report completion."""
    # While parse_file_async is suspended at an await, the event loop is free
    # to run other tasks.
    await parse_file_async()
    print("解析完成")


# Create an event loop, run the coroutine to completion, then close the loop.
asyncio.run(background_parse())
工作过程:
主线程
↓
[事件循环启动] ← 单线程
↓
[任务1开始] → 遇到I/O → [挂起任务1]
↓ ↓
[执行任务2] ← 事件循环调度 ← 等待I/O完成
↓ ↓
[切换回任务1] ← I/O完成 ←
threading.Thread()- 多线程并行
import threading


def background_parse():
    """Run the blocking parser (defined elsewhere), then report completion."""
    # Blocks only the thread this function runs in.
    parse_file_sync()
    print("解析完成")


# daemon=True: this thread will not keep the interpreter alive at exit.
thread = threading.Thread(target=background_parse, daemon=True)
# Spawn a new OS thread that runs background_parse.
thread.start()
工作原理:
主线程 工作线程
↓ ↓
[创建线程] → [开始执行]
↓ ↓
[继续执行] [阻塞I/O] ← 线程被操作系统挂起
↓ ↓
[做其他事] [I/O完成]
↓ ↓
[继续执行]
无论在多少个函数/模块中导入threading,在同一个Python进程中,threading模块都是同一个单例对象。
# module_a.py
import threading
import time


def check_threading_in_a():
    """Print how module A sees the ``threading`` module and return that module."""
    mod_id = id(threading)
    print(f"模块A中 threading 的 id: {mod_id}")
    print(f"模块A中 threading 的内存地址: {hex(mod_id)}")
    # The interpreter's main thread object.
    main = threading.main_thread()
    print(f"模块A中主线程: {main.name} (id: {main.ident})")
    return threading
# module_b.py
import threading
import time


def check_threading_in_b():
    """Print how module B sees the ``threading`` module and return that module."""
    mod_id = id(threading)
    print(f"模块B中 threading 的 id: {mod_id}")
    print(f"模块B中 threading 的内存地址: {hex(mod_id)}")
    # Number of Thread objects currently alive.
    print(f"模块B中活动线程数: {threading.active_count()}")
    return threading
# Test script: every importer should see the very same threading module object.
import module_a
import module_b
import threading  # imported again here on purpose

print("=== threading模块单例验证 ===")

# The module object as seen from each helper module.
threading_a = module_a.check_threading_in_a()
print()
threading_b = module_b.check_threading_in_b()
print()

# Identity of the copy imported directly by this script.
print(f"主模块中 threading 的 id: {id(threading)}")
print(f"主模块中 threading 的内存地址: {hex(id(threading))}")

# All three names should refer to one cached module object.
print("\n=== 比较结果 ===")
for label, same in (
    ("threading_a is threading_b", threading_a is threading_b),
    ("threading_a is threading", threading_a is threading),
):
    print(f"{label}: {same}")
print(f"id相等: {id(threading_a) == id(threading_b) == id(threading)}")

使用场景
场景:批量下载文件
# 方案A: asyncio (异步I/O)
import asyncio
import aiohttp


async def download_file_async(url, session=None):
    """Download *url* and return the response body as bytes.

    Args:
        url: address to fetch.
        session: optional shared ``aiohttp.ClientSession``. If omitted, a
            temporary session is opened just for this request (the original
            behaviour). Pass one in to reuse its connection pool.
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await download_file_async(url, own_session)
    async with session.get(url) as response:
        return await response.read()


async def download_all_async(urls):
    """Download all *urls* concurrently on one thread; results keep input order."""
    # aiohttp recommends one session for many requests rather than one per
    # request, so connections are pooled and reused.
    async with aiohttp.ClientSession() as session:
        tasks = [download_file_async(url, session) for url in urls]
        return await asyncio.gather(*tasks)  # 并发执行,单线程


# 使用 (replace the placeholders with real URLs)
asyncio.run(download_all_async(['url1', 'url2', 'url3']))
# ------------------------------------------------------
# 方案B: threading (多线程)
import threading
import requests
from concurrent.futures import ThreadPoolExecutor


def download_file_sync(url, timeout=30):
    """Blocking download of *url*; returns the body bytes.

    ``timeout`` (seconds) is forwarded to requests, which has no default
    timeout. Without it a stalled server can block a worker thread forever.
    """
    response = requests.get(url, timeout=timeout)  # 阻塞调用
    return response.content


def download_all_threads(urls, max_workers=5):
    """Download *urls* using a pool of threads; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map yields results in input order and re-raises a worker's
        # exception when its result is reached. Downloads overlap because
        # the GIL is released while waiting on the network.
        return list(executor.map(download_file_sync, urls))


# 使用 (replace the placeholders with real URLs)
download_all_threads(['url1', 'url2', 'url3'])
📊 决策矩阵
| 条件 | 推荐方案 | 原因 |
|---|---|---|
| 90%+ I/O等待时间 | asyncio | 无线程开销,高并发 |
| 需要多核CPU并行 | multiprocessing / ProcessPoolExecutor | 每个进程有独立的解释器和GIL,可真正并行(threading无法绕过GIL) |
| 连接数 > 1000 | asyncio | 内存效率高 |
| 调用同步阻塞库 | threading | 简单直接 |
| 需要精确流量控制 | asyncio | 更好的控制粒度 |
| 简单的后台任务 | threading | 实现简单 |
| 实时Web应用 | asyncio | 低延迟,高并发 |
混合使用
# 结合两者优点
import asyncio
import concurrent.futures
import threading

import aiohttp  # used below but missing from the original snippet


async def hybrid_solution():
    """Fetch JSON asynchronously, then run the heavy step off the event loop."""

    async def fetch_data_async():
        # I/O-bound: non-blocking HTTP request on the event loop.
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.example.com/data') as resp:
                return await resp.json()

    def process_data_cpu(data):
        # expensive_computation is a placeholder expected to be defined elsewhere.
        return expensive_computation(data)

    # 1. Fetch the data asynchronously.
    data = await fetch_data_async()

    # 2. Offload the blocking call so the event loop stays responsive.
    #    Inside a coroutine, get_running_loop() is the recommended API.
    #    NOTE: because of the GIL, a thread pool only speeds this up if
    #    expensive_computation releases the GIL (e.g. NumPy / C extensions).
    #    For pure-Python CPU work, use concurrent.futures.ProcessPoolExecutor
    #    with a module-level (picklable) function to get real multi-core use.
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        processed = await loop.run_in_executor(pool, process_data_cpu, data)
    return processed


# 运行
result = asyncio.run(hybrid_solution())
🎯 总结
asyncio.run():
- 单线程 + 协程切换
- 适合I/O密集型、高并发
- 内存开销小,性能高
- 编程模型较复杂(async/await)
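threading.Thread():
- 多线程并发(CPython中受GIL限制,纯Python计算无法多核并行)
- 适合I/O密集型 + 调用同步阻塞库
- 编程简单直观
- 内存开销较大,有GIL限制
选择指南:
- Web服务、爬虫、高并发I/O → asyncio
- 调用同步阻塞库、简单后台任务 → threading
- 数据处理、CPU密集型计算 → multiprocessing / ProcessPoolExecutor
- 混合场景 → asyncio主循环 + 线程池处理阻塞调用(CPU任务交给进程池)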
模拟文件上传过程
📁 项目结构
file_upload_example/
├── sync_api.py # 同步等待结果的接口
├── async_api.py # 异步立即返回通知的接口
├── file_parser.py # 模拟耗时解析器
└── client.py # 测试客户端
模拟的耗时解析器 (file_parser.py)
import time
import random
from datetime import datetime
import threading
from queue import Queue
from dataclasses import dataclass
from typing import Optional, Dict
@dataclass
class ParseResult:
    """Outcome of parsing one file, as produced by FileParser's worker thread."""
    file_id: str  # id the file was submitted under
    success: bool  # False when the simulated parse failed
    result_data: Dict  # parsed metadata; FileParser stores {} on failure
    error_message: Optional[str] = None  # FileParser sets this only on failure
    parse_time: float = 0.0  # simulated parse duration in seconds
    timestamp: str = ""  # completion time; FileParser uses datetime.isoformat()
class FileParser:
    """Simulated file parser backed by a single background worker thread.

    ``parse_file`` only enqueues a job and returns immediately. The daemon
    worker pops jobs one at a time in FIFO order, sleeps ``simulate_time``
    seconds to mimic parsing, and stores a ``ParseResult`` in
    ``self.results`` keyed by ``file_id``. About 10% of jobs are randomly
    marked as failed.
    """

    def __init__(self):
        self.processing_queue = Queue()  # pending (file_id, simulate_time) jobs
        self.results = {}  # file_id -> ParseResult
        self._stop = False
        self._worker_thread = None
        self._start_worker()

    def _start_worker(self):
        """Start the daemon thread that consumes ``processing_queue``."""
        from queue import Empty

        def worker():
            while not self._stop:
                try:
                    # Time out every second so the _stop flag is re-checked.
                    file_id, simulate_time = self.processing_queue.get(timeout=1)
                except Empty:
                    continue
                try:
                    self._parse_one(file_id, simulate_time)
                except Exception as e:
                    # The old handler was inverted: it swallowed errors while
                    # running and only printed them after stop().
                    print(f"[解析器] 错误: {e}")
                finally:
                    # Balance every get(), even on failure, so that
                    # processing_queue.join() cannot hang.
                    self.processing_queue.task_done()

        self._worker_thread = threading.Thread(target=worker, daemon=True)
        self._worker_thread.start()
        print("[解析器] 后台解析线程已启动")

    def _parse_one(self, file_id, simulate_time):
        """Simulate parsing one file and store its ParseResult."""
        print(f"[解析器] 开始解析文件 {file_id}, 预计耗时 {simulate_time}秒")
        time.sleep(simulate_time)  # simulated parse duration
        success = random.random() > 0.1  # ~90% success rate
        self.results[file_id] = ParseResult(
            file_id=file_id,
            success=success,
            result_data={
                "file_size": random.randint(1024, 1024 * 1024),
                "pages": random.randint(1, 100),
                "entities_found": random.randint(0, 50)
            } if success else {},
            error_message="解析失败: 格式不支持" if not success else None,
            parse_time=simulate_time,
            timestamp=datetime.now().isoformat()
        )
        print(f"[解析器] 文件 {file_id} 解析{'成功' if success else '失败'}, 耗时 {simulate_time}秒")

    def parse_file(self, file_id: str, simulate_time: float = 5.0) -> str:
        """Queue *file_id* for parsing and return without waiting.

        Args:
            file_id: unique id (uuid) of the file.
            simulate_time: simulated parse duration in seconds.

        Returns:
            The same ``file_id``. The signature and docstring promised this
            before, but the method used to return None.
        """
        self.processing_queue.put((file_id, simulate_time))
        print(f"[解析器] 文件 {file_id} 已加入解析队列, 队列长度: {self.processing_queue.qsize()}")
        return file_id

    def get_result(self, file_id: str) -> "Optional[ParseResult]":
        """Return the stored ParseResult for *file_id*, or None if not parsed yet."""
        return self.results.get(file_id)

    def stop(self):
        """Ask the worker to exit, then wait up to 2 s for it to finish."""
        self._stop = True
        if self._worker_thread:
            self._worker_thread.join(timeout=2)
同步等待结果的接口 (sync_api.py)
from fastapi import FastAPI, File, UploadFile, BackgroundTasks, Form
from fastapi.responses import JSONResponse
import time
from file_parser import FileParser
from pydantic import BaseModel
from typing import Optional
import uuid
from datetime import datetime
import asyncio
app = FastAPI(title="异步文件上传API")

# One shared parser, i.e. one background worker thread, serves every request.
parser = FileParser()

# In-memory map of file_id -> callback URL (simulation only).
callback_store = dict()
class AsyncUploadResponse(BaseModel):
    """Response body of POST /upload/async."""
    success: bool  # whether the upload was accepted
    file_id: str  # id assigned to this upload (a uuid4 string in upload_file_async)
    message: str  # human-readable status text
    status_url: str  # relative URL to poll: /status/{file_id}
    estimated_time: float  # echo of the requested simulate_time, in seconds
    queue_position: int  # parser queue size when the response was built
    response_time: float  # seconds spent handling the request
def process_callback(file_id: str, callback_url: Optional[str] = None):
    """Wait for *file_id*'s parse result, then simulate a completion callback.

    Blocks, checking every 0.5 s for at most 300 s, so it is meant to run in
    its own thread. A real project could send a message, call a webhook or
    update a database here instead of printing.
    """
    deadline = time.time() + 300
    while time.time() < deadline:
        result = parser.get_result(file_id)
        if result is None:
            time.sleep(0.5)
            continue
        # Simulated callback.
        print(f"[回调] 文件 {file_id} 解析完成,执行回调")
        print(f" 结果: {'成功' if result.success else '失败'}")
        print(f" 耗时: {result.parse_time}秒")
        # A real project would POST the result to callback_url here.
        if callback_url:
            print(f" 将结果发送到: {callback_url}")
        return
    print(f"[回调] 文件 {file_id} 解析超时")
@app.post("/upload/async", response_model=AsyncUploadResponse)
async def upload_file_async(
    file: UploadFile = File(...),
    callback_url: Optional[str] = None,
    simulate_time: float = Form(1.0),  # simulated parse duration (seconds)
    background_tasks: BackgroundTasks = None
):
    """Asynchronous upload: enqueue the file for parsing and return at once.

    The parse job is submitted exactly once. Previously it was submitted
    twice: FastAPI always injects a BackgroundTasks object, so the
    ``if background_tasks:`` branch always ran ``parser.parse_file``, and a
    helper thread called it again. That made the single parser worker do
    every file twice. ``background_tasks`` is kept only for signature
    compatibility.
    """
    import threading

    start_time = time.time()
    # Read the uploaded body.
    try:
        content = await file.read()
        file_size = len(content)
        print(f"[API-异步] 收到文件: {file.filename}, 大小: {file_size}字节")
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": f"读取文件失败: {str(e)}"}
        )

    file_id = str(uuid.uuid4())
    print(f"[API-异步] 提交文件 {file_id} 到后台解析,模拟时间: {simulate_time}秒")
    if callback_url:
        callback_store[file_id] = callback_url

    # parse_file only puts the job on an unbounded queue and returns at once,
    # so calling it inline does not delay the response.
    parser.parse_file(file_id, simulate_time)
    # process_callback polls for up to 300 s, so run it in a daemon thread.
    threading.Thread(
        target=process_callback,
        args=(file_id, callback_url),
        daemon=True
    ).start()

    response_time = time.time() - start_time
    print(f"[API-异步] 请求处理完成,响应时间: {response_time:.3f}秒")
    return AsyncUploadResponse(
        success=True,
        file_id=file_id,
        message="文件已接收,正在后台解析",
        status_url=f"/status/{file_id}",
        estimated_time=simulate_time,
        queue_position=parser.processing_queue.qsize(),
        response_time=response_time
    )


@app.post("/upload/sync")
async def upload_file_sync(
    file: UploadFile = File(...),
    simulate_time: float = Form(1.0),
):
    """Synchronous upload: reply only after the file has been parsed.

    client.py's test_sync_api posts to /upload/sync on port 8001, but this
    module only defined /upload/async. The handler waits with asyncio.sleep,
    so the event loop keeps serving other requests. Each client still waits
    for its own parse, and the single parser worker processes jobs one after
    another.
    """
    start_time = time.time()
    try:
        content = await file.read()
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": f"读取文件失败: {str(e)}"}
        )
    print(f"[API-同步] 收到文件: {file.filename}, 大小: {len(content)}字节")
    file_id = str(uuid.uuid4())
    parser.parse_file(file_id, simulate_time)

    max_wait = 300
    result = parser.get_result(file_id)
    while result is None:
        if time.time() - start_time > max_wait:
            return JSONResponse(
                status_code=504,
                content={"success": False, "file_id": file_id, "message": "解析超时"}
            )
        await asyncio.sleep(0.2)
        result = parser.get_result(file_id)

    total_time = time.time() - start_time
    print(f"[API-同步] 文件 {file_id} 处理完成,总耗时: {total_time:.3f}秒")
    return {
        "success": result.success,
        "file_id": file_id,
        "message": "文件解析完成" if result.success else "文件解析失败",
        "data": result.result_data,
        "error": result.error_message,
        "total_time": total_time,
    }
@app.get("/status/{file_id}")
async def get_processing_status(file_id: str):
    """Return the parse outcome for *file_id*, or a "queued" placeholder.

    Any id without a stored result, including ids never submitted, is
    reported as "queued". The wait estimate assumes 5 s per job currently
    in the parser queue.
    """
    result = parser.get_result(file_id)
    if result is not None:
        ok = result.success
        return {
            "file_id": file_id,
            "status": "completed",
            "success": ok,
            "data": result.result_data if ok else {},
            "error": None if ok else result.error_message,
            "parse_time": result.parse_time,
            "completion_time": result.timestamp
        }
    return {
        "file_id": file_id,
        "status": "queued",
        "message": "文件在队列中等待解析",
        "estimated_wait": parser.processing_queue.qsize() * 5,
        "timestamp": datetime.now().isoformat()
    }
if __name__ == '__main__':
    import uvicorn
    # This listing is sync_api.py: serve *this* module's app on port 8001,
    # where client.py's test_sync_api sends requests. The copied
    # "async_api:app" on port 8002 would have started the other module
    # on the async API's port instead.
    uvicorn.run("sync_api:app", host="127.0.0.1", port=8001, reload=True)
异步立即返回通知的接口 (async_api.py)
from fastapi import FastAPI, File, UploadFile, BackgroundTasks, Form
from fastapi.responses import JSONResponse
import time
from file_parser import FileParser
from pydantic import BaseModel
from typing import Optional
import uuid
from datetime import datetime
import asyncio
app = FastAPI(title="异步文件上传API")

# One shared parser, i.e. one background worker thread, serves every request.
parser = FileParser()

# In-memory map of file_id -> callback URL (simulation only).
callback_store = dict()
class AsyncUploadResponse(BaseModel):
    """Response body of POST /upload/async."""
    success: bool  # whether the upload was accepted
    file_id: str  # id assigned to this upload (a uuid4 string in upload_file_async)
    message: str  # human-readable status text
    status_url: str  # relative URL to poll: /status/{file_id}
    estimated_time: float  # echo of the requested simulate_time, in seconds
    queue_position: int  # parser queue size when the response was built
    response_time: float  # seconds spent handling the request
def process_callback(file_id: str, callback_url: Optional[str] = None):
    """Wait for *file_id*'s parse result, then simulate a completion callback.

    Blocks, checking every 0.5 s for at most 300 s, so it is meant to run in
    its own thread. A real project could send a message, call a webhook or
    update a database here instead of printing.
    """
    deadline = time.time() + 300
    while time.time() < deadline:
        result = parser.get_result(file_id)
        if result is None:
            time.sleep(0.5)
            continue
        # Simulated callback.
        print(f"[回调] 文件 {file_id} 解析完成,执行回调")
        print(f" 结果: {'成功' if result.success else '失败'}")
        print(f" 耗时: {result.parse_time}秒")
        # A real project would POST the result to callback_url here.
        if callback_url:
            print(f" 将结果发送到: {callback_url}")
        return
    print(f"[回调] 文件 {file_id} 解析超时")
@app.post("/upload/async", response_model=AsyncUploadResponse)
async def upload_file_async(
    file: UploadFile = File(...),
    callback_url: Optional[str] = None,
    simulate_time: float = Form(1.0),  # simulated parse duration (seconds)
    background_tasks: BackgroundTasks = None
):
    """Asynchronous upload: enqueue the file for parsing and return at once.

    The parse job is submitted exactly once. Previously it was submitted
    twice: FastAPI always injects a BackgroundTasks object, so the
    ``if background_tasks:`` branch always ran ``parser.parse_file``, and a
    helper thread called it again. That made the single parser worker do
    every file twice. ``background_tasks`` is kept only for signature
    compatibility.
    """
    import threading

    start_time = time.time()
    # Read the uploaded body.
    try:
        content = await file.read()
        file_size = len(content)
        print(f"[API-异步] 收到文件: {file.filename}, 大小: {file_size}字节")
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": f"读取文件失败: {str(e)}"}
        )

    file_id = str(uuid.uuid4())
    print(f"[API-异步] 提交文件 {file_id} 到后台解析,模拟时间: {simulate_time}秒")
    if callback_url:
        callback_store[file_id] = callback_url

    # parse_file only puts the job on an unbounded queue and returns at once,
    # so calling it inline does not delay the response.
    parser.parse_file(file_id, simulate_time)
    # process_callback polls for up to 300 s, so run it in a daemon thread.
    threading.Thread(
        target=process_callback,
        args=(file_id, callback_url),
        daemon=True
    ).start()

    response_time = time.time() - start_time
    print(f"[API-异步] 请求处理完成,响应时间: {response_time:.3f}秒")
    return AsyncUploadResponse(
        success=True,
        file_id=file_id,
        message="文件已接收,正在后台解析",
        status_url=f"/status/{file_id}",
        estimated_time=simulate_time,
        queue_position=parser.processing_queue.qsize(),
        response_time=response_time
    )
@app.get("/status/{file_id}")
async def get_processing_status(file_id: str):
    """Return the parse outcome for *file_id*, or a "queued" placeholder.

    Any id without a stored result, including ids never submitted, is
    reported as "queued". The wait estimate assumes 5 s per job currently
    in the parser queue.
    """
    result = parser.get_result(file_id)
    if result is not None:
        ok = result.success
        return {
            "file_id": file_id,
            "status": "completed",
            "success": ok,
            "data": result.result_data if ok else {},
            "error": None if ok else result.error_message,
            "parse_time": result.parse_time,
            "completion_time": result.timestamp
        }
    return {
        "file_id": file_id,
        "status": "queued",
        "message": "文件在队列中等待解析",
        "estimated_wait": parser.processing_queue.qsize() * 5,
        "timestamp": datetime.now().isoformat()
    }
if __name__ == '__main__':
    import uvicorn

    # Serve this module's app (with auto-reload) on localhost:8002.
    uvicorn.run("async_api:app", reload=True, port=8002, host="127.0.0.1")
测试客户端 (client.py)
import requests
import time
import threading
import queue
from dataclasses import dataclass
from typing import List
@dataclass
class TestResult:
    """Outcome of one simulated client run in concurrent_test."""
    client_id: int  # index of the client thread
    api_type: str  # "sync" or "async"
    success: bool  # success flag reported by the server (False on client errors)
    response_time: float  # seconds until the upload request returned
    total_time: float = 0.0  # sync: server-reported total_time; async: upload + polling
    error: str = None  # error text when the run failed; NOTE(review): should be Optional[str]
def test_sync_api(client_id: int, results_queue: queue.Queue, simulate_time: float = 3.0):
    """Upload one fake file to the sync API and record a TestResult.

    The request blocks until the server answers, so its timeout is the
    simulated parse time plus 30 s of slack. Any exception is recorded as a
    failed TestResult instead of propagating.
    """
    start = time.time()
    try:
        payload = b'fake file content' * 1000
        print(f"[客户端{client_id}] 开始调用同步API...")
        response = requests.post(
            'http://localhost:8001/upload/sync',
            files={'file': ('test.txt', payload, 'text/plain')},
            data={'simulate_time': simulate_time},
            timeout=simulate_time + 30,  # generous: the server waits for the parse
        )
        resp_time = time.time() - start
        body = response.json()
        print(f"[客户端{client_id}] 同步API响应: {body.get('message')}, "
              f"耗时: {resp_time:.2f}秒")
        outcome = TestResult(
            client_id=client_id,
            api_type="sync",
            success=body.get('success', False),
            response_time=resp_time,
            total_time=body.get('total_time', 0),
        )
    except Exception as e:
        resp_time = time.time() - start
        print(f"[客户端{client_id}] 同步API错误: {e}")
        outcome = TestResult(
            client_id=client_id,
            api_type="sync",
            success=False,
            response_time=resp_time,
            error=str(e),
        )
    results_queue.put(outcome)
def test_async_api(client_id: int, results_queue: queue.Queue, simulate_time: float = 3.0):
    """Exercise the async API: upload once, then poll /status until done.

    Exactly one TestResult is put on *results_queue* per call: after a
    completed parse, a polling timeout, a response without ``file_id``, or
    an exception. Every HTTP call carries a timeout, because requests has
    none by default and a stalled server would otherwise hang this thread.
    """
    base_url = 'http://localhost:8002'  # note: different port from the sync API
    start = time.time()
    try:
        files = {
            'file': ('test.txt', b'fake file content' * 1000, 'text/plain')
        }
        data = {'simulate_time': simulate_time}
        print(f"[客户端{client_id}] 开始调用异步API...")
        response = requests.post(
            f'{base_url}/upload/async',
            files=files,
            data=data,
            timeout=10  # short: the endpoint should return immediately
        )
        resp_time = time.time() - start
        result = response.json()
        print(f"[客户端{client_id}] 异步API立即返回: {result.get('message')}, "
              f"API响应时间: {resp_time:.3f}秒, 文件ID: {result.get('file_id')}")

        file_id = result.get('file_id')
        if not file_id:
            # Error bodies (e.g. 422/500) carry no file_id. Previously nothing
            # was recorded and this client silently vanished from the stats.
            results_queue.put(TestResult(
                client_id=client_id,
                api_type="async",
                success=False,
                response_time=resp_time,
                error=f"未返回file_id: {result}"
            ))
            return

        # Poll the status endpoint once per second until completed.
        poll_start = time.time()
        max_poll = 300
        while time.time() - poll_start < max_poll:
            status_resp = requests.get(f'{base_url}/status/{file_id}', timeout=10)
            status = status_resp.json()
            if status.get('status') == 'completed':
                poll_time = time.time() - poll_start
                total_time = time.time() - start
                print(f"[客户端{client_id}] 文件解析完成, "
                      f"轮询耗时: {poll_time:.2f}秒, 总耗时: {total_time:.2f}秒")
                results_queue.put(TestResult(
                    client_id=client_id,
                    api_type="async",
                    success=status.get('success', False),
                    response_time=resp_time,  # upload request only
                    total_time=total_time  # upload + polling
                ))
                return
            time.sleep(1)

        # Polling gave up.
        results_queue.put(TestResult(
            client_id=client_id,
            api_type="async",
            success=False,
            response_time=resp_time,
            error="轮询超时"
        ))
    except Exception as e:
        resp_time = time.time() - start
        print(f"[客户端{client_id}] 异步API错误: {e}")
        results_queue.put(TestResult(
            client_id=client_id,
            api_type="async",
            success=False,
            response_time=resp_time,
            error=str(e)
        ))
def concurrent_test(num_clients: int = 5, api_type: str = "sync", simulate_time: float = 2.0):
    """Run *num_clients* client threads against one API and print statistics.

    Threads start 0.1 s apart and each pushes one TestResult onto a shared
    queue. api_type "sync" uses test_sync_api; any other value uses
    test_async_api. Returns the collected results.
    """
    rule = '=' * 50
    print(f"\n{rule}")
    print(f"开始并发测试: {num_clients}个客户端, API类型: {api_type}")
    print(rule)

    results_queue = queue.Queue()
    threads = []
    start_time = time.time()
    for client_id in range(num_clients):
        client_fn = test_sync_api if api_type == "sync" else test_async_api
        t = threading.Thread(
            target=client_fn,
            args=(client_id, results_queue, simulate_time)
        )
        t.start()
        threads.append(t)
        time.sleep(0.1)  # stagger the start-up slightly
    for t in threads:
        t.join()
    total_test_time = time.time() - start_time

    results = []
    while not results_queue.empty():
        results.append(results_queue.get())

    print(f"\n{rule}")
    print("测试完成统计:")
    print(f"API类型: {api_type}")
    print(f"客户端数量: {num_clients}")
    print(f"模拟解析时间: {simulate_time}秒")
    print(f"总测试时间: {total_test_time:.2f}秒")
    if results:
        success_count = sum(1 for r in results if r.success)
        avg_response = sum(r.response_time for r in results) / len(results)
        # Average only over runs that reported a positive total time.
        timed = [r.total_time for r in results if r.total_time > 0]
        avg_total = sum(timed) / max(1, len(timed))
        print(f"成功请求: {success_count}/{len(results)}")
        print(f"平均API响应时间: {avg_response:.3f}秒")
        if api_type == "async":
            print(f"平均总处理时间(含轮询): {avg_total:.2f}秒")
    print(rule)
    return results
if __name__ == "__main__":
    # Same scenario against both APIs: 10 clients, 3 s simulated parse each.
    for kind in ("sync", "async"):
        concurrent_test(num_clients=10, api_type=kind, simulate_time=3.0)


执行分析:
-
同步API:
- 所有客户端同时发送请求,等待API响应
- FastAPI根据请求到达顺序依次处理
- 单个请求处理完成后(包括获取解析完成的结果),返回结果给客户端
- 实际parse解析器队列里只有一个任务
- 所以处理时间约为 解析时间*客户端数量
-
异步API:
- 所有客户端同时发送请求,等待API响应
- FastAPI根据请求到达顺序依次处理(一旦有等待的任务就挂起,处理下一个)
- 单个请求处理完成(只包含上传给解析器的操作),返回上传成功结果给客户端
- 解析器队列此时存着所有客户端上传的文件,解析器依次解析,解析成功的放入结果列表中
- 客户端接收到上传成功结果后,开始轮询API获取结果(即去解析器结果列表中获取结果)
-
为什么异步API结果反而比同步API更慢?
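- 这是因为异步API的解析器队列是单线程的,同一时间只能处理一个任务(是因为我们在async_api前只实例化了一个解析器对象)。另外要注意:如果同时使用 background_tasks.add_task 和额外的后台线程各调用一次 parser.parse_file,同一个文件会被加入队列两次,单线程解析器要做双倍的工作,总耗时也会随之翻倍。
- 正常的场景下,我们程序调用后台的一个解析器接口,这个接口是多线程的,可以同时处理多个任务。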
我们尝试按照这个逻辑,将解析器队列改为多线程,现在假设客户端有10个,解析器有3个线程。
from fastapi import FastAPI, File, UploadFile, BackgroundTasks, Form
from fastapi.responses import JSONResponse
import time
from file_parser import FileParser
from pydantic import BaseModel
from typing import Optional
import uuid
from datetime import datetime
import asyncio
import threading
from queue import Queue
import concurrent.futures
app = FastAPI(title="异步文件上传API")

# Shared job queue, consumed by the dispatcher threads started further below.
processing_queue = Queue()

# One FileParser per dispatcher; each instance owns its own worker thread.
num_parsers = 3
parsers = [FileParser() for _ in range(num_parsers)]

# In-memory map of file_id -> callback URL (simulation only).
callback_store = dict()
class AsyncUploadResponse(BaseModel):
    """Response body of POST /upload/async."""
    success: bool  # whether the upload was accepted
    file_id: str  # id assigned to this upload (a uuid4 string in upload_file_async)
    message: str  # human-readable status text
    status_url: str  # relative URL to poll: /status/{file_id}
    estimated_time: float  # echo of the requested simulate_time, in seconds
    queue_position: int  # shared queue size right after this job was enqueued
    response_time: float  # seconds spent handling the request
def worker(parser_instance, worker_id):
    """Dispatcher thread: feed jobs from the shared queue to one parser.

    Fixes compared with the original loop:

    * No busy-wait. The ``processing_queue.empty(): continue`` check made
      every dispatcher spin at full CPU. ``get(timeout=1)`` already blocks
      efficiently.
    * ``task_done`` is called exactly once per dequeued item. The old except
      branch could call it again for a stale ``task_info`` from an earlier
      iteration, raising ValueError and killing the thread.
    * ``FileParser.parse_file`` only enqueues into the parser's private queue
      and returns immediately. Waiting for the result before taking the next
      job lets an idle parser pick up work, instead of one dispatcher piling
      several files onto the same parser.

    A ``None`` item stops the thread.
    """
    from queue import Empty

    print(f"[Worker-{worker_id}] 启动...")
    while True:
        try:
            task_info = processing_queue.get(timeout=1)
        except Empty:
            continue
        try:
            if task_info is None:  # stop sentinel
                break
            file_id, simulate_time = task_info
            print(f"[Worker-{worker_id}] 开始处理文件 {file_id}")
            parser_instance.parse_file(file_id, simulate_time)
            # Wait for this parser to finish, with a safety margin so a lost
            # result cannot block the dispatcher forever.
            deadline = time.time() + simulate_time + 60
            while parser_instance.get_result(file_id) is None and time.time() < deadline:
                time.sleep(0.05)
            if parser_instance.get_result(file_id) is None:
                print(f"[Worker-{worker_id}] 等待文件 {file_id} 解析结果超时")
            else:
                print(f"[Worker-{worker_id}] 完成处理文件 {file_id}")
        except Exception as e:
            print(f"[Worker-{worker_id}] 处理任务时出错: {e}")
        finally:
            # Runs for the sentinel too (the break passes through finally),
            # so processing_queue.join() stays balanced.
            processing_queue.task_done()
# Start one daemon dispatcher thread per parser instance.
workers = []
for i in range(num_parsers):
    worker_thread = threading.Thread(
        target=worker, args=(parsers[i], i), daemon=True
    )
    workers.append(worker_thread)
    worker_thread.start()
def process_callback(file_id: str, callback_url: Optional[str] = None):
    """Wait until any parser holds a result for *file_id*, then fake a callback.

    Checks every 0.5 s for at most 300 s, so it is meant to run in its own
    thread. A real project could send a message, call a webhook or update a
    database here instead of printing.
    """
    deadline = time.time() + 300
    while time.time() < deadline:
        # First parser (in list order) that has a result for this id.
        result = next(
            (r for r in (p.get_result(file_id) for p in parsers) if r is not None),
            None,
        )
        if result is None:
            time.sleep(0.5)
            continue
        # Simulated callback.
        print(f"[回调] 文件 {file_id} 解析完成,执行回调")
        print(f" 结果: {'成功' if result.success else '失败'}")
        print(f" 耗时: {result.parse_time}秒")
        # A real project would POST the result to callback_url here.
        if callback_url:
            print(f" 将结果发送到: {callback_url}")
        return
    print(f"[回调] 文件 {file_id} 解析超时")
@app.post("/upload/async", response_model=AsyncUploadResponse)
async def upload_file_async(
    file: UploadFile = File(...),
    callback_url: Optional[str] = None,
    simulate_time: float = Form(1.0),  # simulated parse duration (seconds)
    background_tasks: BackgroundTasks = None
):
    """Accept an upload, push it onto the shared queue and return immediately.

    The dispatcher threads pull jobs from ``processing_queue``; this handler
    only reads the body, assigns a uuid4 file id and enqueues the job.
    ``background_tasks`` is accepted but unused.
    """
    t0 = time.time()
    try:
        raw = await file.read()
        print(f"[API-异步] 收到文件: {file.filename}, 大小: {len(raw)}字节")
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": f"读取文件失败: {str(e)}"}
        )

    file_id = str(uuid.uuid4())
    print(f"[API-异步] 提交文件 {file_id} 到处理队列,模拟时间: {simulate_time}秒")
    processing_queue.put((file_id, simulate_time))
    if callback_url:
        callback_store[file_id] = callback_url

    elapsed = time.time() - t0
    print(f"[API-异步] 请求处理完成,响应时间: {elapsed:.3f}秒")
    return AsyncUploadResponse(
        success=True,
        file_id=file_id,
        message="文件已接收,已加入处理队列",
        status_url=f"/status/{file_id}",
        estimated_time=simulate_time,
        queue_position=processing_queue.qsize(),
        response_time=elapsed
    )
@app.get("/status/{file_id}")
async def get_processing_status(file_id: str):
    """Report a file's state: completed, queued in the shared queue, or unknown.

    "unknown" is also returned while a file has already been taken off the
    shared queue but its parser has not produced a result yet.
    """
    # First parser (in list order) that has a result for this id.
    result = next(
        (r for r in (p.get_result(file_id) for p in parsers) if r is not None),
        None,
    )
    if result is not None:
        ok = result.success
        return {
            "file_id": file_id,
            "status": "completed",
            "success": ok,
            "data": result.result_data if ok else {},
            "error": None if ok else result.error_message,
            "parse_time": result.parse_time,
            "completion_time": result.timestamp
        }

    # Scan the queue's underlying deque while holding its lock.
    with processing_queue.mutex:
        waiting = any(job[0] == file_id for job in list(processing_queue.queue))
    if not waiting:
        return {
            "file_id": file_id,
            "status": "unknown",
            "message": "文件未找到或尚未提交",
            "timestamp": datetime.now().isoformat()
        }
    return {
        "file_id": file_id,
        "status": "queued",
        "message": "文件在队列中等待解析",
        "estimated_wait": processing_queue.qsize() * 5,  # rough: 5 s per queued job
        "timestamp": datetime.now().isoformat()
    }
@app.get("/queue/status")
async def get_queue_status():
    """Snapshot of the shared queue length and the configured dispatcher count."""
    snapshot = {
        "queue_size": processing_queue.qsize(),
        # Configured number of dispatchers; their liveness is not checked.
        "active_workers": num_parsers,
        "timestamp": datetime.now().isoformat()
    }
    return snapshot
if __name__ == '__main__':
    import uvicorn

    # Serve the multi-parser app (with auto-reload) on localhost:8002.
    uvicorn.run("async_api:app", reload=True, port=8002, host="127.0.0.1")

可以看到解析速度明显比之前快了很多。
更多推荐
所有评论(0)