fastapi-批量操作 vs 并发操作------多线程 vs 多进程
并发操作= 多个任务同时执行,而不是按顺序一个接一个执行串行执行(非并发):任务1 ────→ 任务2 ────→ 任务3 ────→0秒 5秒 10秒 15秒总耗时:15秒并发执行:任务1 ────→任务2 ────→任务3 ────→0秒 5秒总耗时:5秒(快3倍!# 当前你的代码(已经很好)try:# ✅ 已经使用了批量操作# ✅ 已经使用了并发操作(ThreadPoolExecutor,
·
📚 批量操作 vs 并发操作
🎯 1. 批量操作(Batch Operations)
什么是批量操作?
批量操作 = 把多个数据库操作打包成一次提交,减少数据库交互次数
单个操作:
插入1条 → 提交 → 插入1条 → 提交 → 插入1条 → 提交
↓ ↓ ↓ ↓ ↓ ↓
数据库 数据库 数据库 数据库 数据库 数据库
(6次交互)
批量操作:
插入1条 + 插入1条 + 插入1条 → 一次性提交
↓
数据库
(1次交互)
性能对比示例
import time
# ❌ 慢方式:逐个插入(单个操作)
start = time.time()
for i in range(1000):
resource = ExpiringResource(
resource_id=f"res_{i}",
resource_name=f"资源{i}",
cloud_provider="huawei"
)
db.add(resource)
db.commit() # ⚠️ 每次都提交,1000次提交
print(f"逐个插入耗时: {time.time() - start:.2f}秒")
# 输出:逐个插入耗时: 15.32秒
# ✅ 快方式1:批量插入(循环+单次提交)
start = time.time()
for i in range(1000):
resource = ExpiringResource(
resource_id=f"res_{i}",
resource_name=f"资源{i}",
cloud_provider="huawei"
)
db.add(resource)
db.commit() # ✅ 只提交一次
print(f"批量插入耗时: {time.time() - start:.2f}秒")
# 输出:批量插入耗时: 2.45秒(快6倍!)
# ✅ 快方式2:bulk_insert_mappings(最快)
start = time.time()
data_list = [
{
"resource_id": f"res_{i}",
"resource_name": f"资源{i}",
"cloud_provider": "huawei"
}
for i in range(1000)
]
db.bulk_insert_mappings(ExpiringResource, data_list)
db.commit()
print(f"bulk_insert耗时: {time.time() - start:.2f}秒")
# 输出:bulk_insert耗时: 0.85秒(快18倍!)
批量操作的方法
1. bulk_insert_mappings - 批量插入
# 准备数据(字典列表)
resources_data = [
{"resource_id": "res_1", "resource_name": "资源1", "cloud_provider": "huawei"},
{"resource_id": "res_2", "resource_name": "资源2", "cloud_provider": "huawei"},
{"resource_id": "res_3", "resource_name": "资源3", "cloud_provider": "huawei"},
# ... 1000条数据
]
# ⭐ 批量插入
db.bulk_insert_mappings(ExpiringResource, resources_data)
db.commit()
# 相当于一次性执行:
# INSERT INTO expiring_resources (resource_id, resource_name, cloud_provider)
# VALUES ('res_1', '资源1', 'huawei'),
# ('res_2', '资源2', 'huawei'),
# ('res_3', '资源3', 'huawei');
2. bulk_update_mappings - 批量更新
# 批量更新
updates = [
{"id": 1, "status": "active", "updated_at": datetime.now()},
{"id": 2, "status": "active", "updated_at": datetime.now()},
{"id": 3, "status": "inactive", "updated_at": datetime.now()},
]
db.bulk_update_mappings(ExpiringResource, updates)
db.commit()
3. 批量删除
# 批量删除
db.query(ExpiringResource).filter(
ExpiringResource.cloud_provider == "huawei",
ExpiringResource.is_active == False
).delete()
db.commit()
# 一次性删除所有符合条件的记录
你的项目中的批量操作
# 在你的 main.py 中(华为云同步)
def background_sync(sync_log_id: int, expire_days: int):
db_thread = SessionLocal()
try:
# 获取华为云资源(假设有1000条)
hw_resources = huawei_client.get_all_resources()
# ⭐ 批量插入(你已经在用了!)
db_thread.bulk_insert_mappings(
ExpiringResource,
hw_resources # 1000条数据一次性插入
)
db_thread.commit()
logger.info(f"✅ 批量插入完成: {len(hw_resources)}条资源")
except Exception as e:
db_thread.rollback()
logger.error(f"批量操作失败: {e}")
finally:
db_thread.close()
🎯 2. 并发操作(Concurrent Operations)
什么是并发操作?
并发操作 = 多个任务同时执行,而不是按顺序一个接一个执行
串行执行(非并发):
任务1 ────→ 任务2 ────→ 任务3 ────→
0秒 5秒 10秒 15秒
总耗时:15秒
并发执行:
任务1 ────→
任务2 ────→
任务3 ────→
0秒 5秒
总耗时:5秒(快3倍!)
并发操作的方式
1. 多线程(Threading) - 适合IO密集型任务
import threading
# ❌ 串行执行(慢)
def sync_serial():
# 查询华为云ECS标签(假设需要5秒)
ecs_tags = fetch_ecs_tags()
# 查询华为云RDS标签(假设需要5秒)
rds_tags = fetch_rds_tags()
# 查询华为云ELB标签(假设需要5秒)
elb_tags = fetch_elb_tags()
# 总耗时:5 + 5 + 5 = 15秒
# ✅ 并发执行(快)
def sync_concurrent():
threads = []
# 创建3个线程,同时查询
t1 = threading.Thread(target=fetch_ecs_tags)
t2 = threading.Thread(target=fetch_rds_tags)
t3 = threading.Thread(target=fetch_elb_tags)
threads = [t1, t2, t3]
# 启动所有线程
for t in threads:
t.start()
# 等待所有线程完成
for t in threads:
t.join()
# 总耗时:约5秒(3个任务同时进行)
sync_concurrent()
2. 线程池(ThreadPoolExecutor) - 更优雅的方式
from concurrent.futures import ThreadPoolExecutor, as_completed
# ✅ 使用线程池
def fetch_tags_concurrent(resource_ids):
results = []
# 创建线程池(5个工作线程)
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交所有任务
future_to_resource = {
executor.submit(fetch_resource_tags, res_id): res_id
for res_id in resource_ids
}
# 获取结果(哪个先完成就先处理哪个)
for future in as_completed(future_to_resource):
resource_id = future_to_resource[future]
try:
tags = future.result()
results.append(tags)
except Exception as e:
logger.error(f"查询失败 {resource_id}: {e}")
return results
# 1000个资源ID
resource_ids = [f"res_{i}" for i in range(1000)]
# 串行:每个5秒 × 1000 = 5000秒(83分钟)
# 并发(5个线程):5秒 × (1000/5) = 1000秒(16分钟)
3. 多进程(Multiprocessing) - 适合CPU密集型任务
from multiprocessing import Pool
# CPU密集型任务(大量计算)
def process_resource_data(data):
# 复杂的数据处理(假设需要大量计算)
result = complex_calculation(data)
return result
# ✅ 使用多进程
if __name__ == '__main__':
data_list = [...] # 1000条数据
# 创建进程池(4个进程,利用4个CPU核心)
with Pool(processes=4) as pool:
results = pool.map(process_resource_data, data_list)
# 4个进程并行处理,速度快4倍
你的项目中的并发操作
# 在你的 huawei_bss_client.py 中
def _batch_query_tags(self, resources: List[Dict]) -> Dict:
"""批量查询资源标签(并发执行)"""
tags_dict = {}
# ⭐ 使用线程池并发查询标签
with ThreadPoolExecutor(max_workers=5) as executor: # 5个线程同时工作
future_to_resource = {
executor.submit(
self._query_single_resource_tags,
resource['id'],
resource['provider'],
resource['type']
): resource
for resource in resources
}
# 处理完成的任务
for future in as_completed(future_to_resource):
resource = future_to_resource[future]
try:
tags = future.result() # 获取查询结果
tags_dict[resource['id']] = tags
except Exception as e:
logger.error(f"查询失败: {resource['id']}")
return tags_dict
🎯 3. 批量操作 vs 并发操作 - 关键区别
| 维度 | 批量操作 | 并发操作 |
|---|---|---|
| 定义 | 多个操作打包成一次提交 | 多个任务同时执行 |
| 目的 | 减少数据库交互次数 | 减少总执行时间 |
| 应用场景 | 数据库插入/更新 | API调用、文件IO |
| 技术手段 | bulk_insert_mappings |
多线程、多进程 |
| 提速原理 | 减少网络开销和事务开销 | 利用等待时间做其他事 |
| 资源消耗 | 单线程,内存占用稍高 | 多线程/进程,CPU/内存都增加 |
类比理解
批量操作:
就像去超市购物
❌ 买一件结账一次(单个操作)
→ 排队10次,很慢
✅ 买完所有东西一起结账(批量操作)
→ 排队1次,很快
并发操作:
就像餐厅做菜
❌ 厨师做完一道菜再做下一道(串行)
→ 炒菜 → 煮汤 → 烤肉(3小时)
✅ 3个厨师同时做(并发)
→ 炒菜 | 煮汤 | 烤肉(1小时)
🎯 4. 两者结合 - 最佳实践
场景:华为云同步1000个资源
def sync_huawei_resources_optimized(expire_days: int):
"""优化后的华为云同步(批量+并发)"""
# 步骤1: 获取资源列表(API调用)
hw_client = HuaweiBSSClient()
resources = hw_client.list_resources() # 1000个资源
logger.info(f"📊 获取到 {len(resources)} 个资源")
# 步骤2: 并发查询标签(多线程)
logger.info("🔄 开始并发查询标签...")
all_tags = {}
with ThreadPoolExecutor(max_workers=5) as executor: # ⭐ 并发操作
future_to_resource = {
executor.submit(hw_client.get_resource_tags, res['id']): res
for res in resources
}
for future in as_completed(future_to_resource):
resource = future_to_resource[future]
try:
tags = future.result()
all_tags[resource['id']] = tags
except Exception as e:
logger.error(f"标签查询失败: {resource['id']}")
logger.info(f"✅ 标签查询完成,共 {len(all_tags)} 个")
# 步骤3: 准备数据
resources_data = []
for res in resources:
resources_data.append({
"resource_id": res['id'],
"resource_name": res['name'],
"cloud_provider": "huawei",
"tags": all_tags.get(res['id'], {}),
"created_at": datetime.now()
})
# 步骤4: 批量插入数据库(批量操作)
logger.info("💾 开始批量保存到数据库...")
db = SessionLocal()
try:
db.bulk_insert_mappings(ExpiringResource, resources_data) # ⭐ 批量操作
db.commit()
logger.info(f"✅ 批量保存完成: {len(resources_data)} 条")
except Exception as e:
db.rollback()
logger.error(f"批量保存失败: {e}")
raise
finally:
db.close()
# 性能对比:
# ❌ 串行 + 单个插入:5秒×1000 + 0.015秒×1000 = 5015秒(83分钟)
# ✅ 并发 + 批量插入:5秒×(1000/5) + 0.85秒 = 1000.85秒(16分钟)
# 提速:5倍!
🎯 5. 实际应用中的注意事项
批量操作的限制
# ⚠️ 批量操作有大小限制
# MySQL的max_allowed_packet通常是64MB
# ❌ 一次插入10万条(可能超限)
db.bulk_insert_mappings(ExpiringResource, huge_data_list) # 可能失败
# ✅ 分批插入(每批1000条)
batch_size = 1000
for i in range(0, len(huge_data_list), batch_size):
batch = huge_data_list[i:i+batch_size]
db.bulk_insert_mappings(ExpiringResource, batch)
db.commit()
logger.info(f"已保存 {i+len(batch)}/{len(huge_data_list)}")
并发操作的限制
# ⚠️ 线程数不是越多越好
# ❌ 线程过多(资源浪费)
with ThreadPoolExecutor(max_workers=1000) as executor: # 太多了!
# 1000个线程竞争资源,反而变慢
# ✅ 合理的线程数
# IO密集型任务:max_workers = CPU核心数 × (1 + IO等待时间/CPU计算时间)
# 经验值:5-20个线程
with ThreadPoolExecutor(max_workers=5) as executor: # 合理
# 5个线程足够,不会浪费资源
数据库连接池与并发
# 你的配置
pool_size=30 # 常驻连接
max_overflow=70 # 临时连接
# 总共100个连接
# 场景分析:
# 1. 正常API请求:10个连接
# 2. 华为云同步线程(5个):每个线程需要1个连接 = 5个连接
# 3. 批量插入:1个连接(因为是在单个Session中)
# 总计:10 + 5 + 1 = 16个连接(远低于30,很安全)
# ⚠️ 如果线程数过多
with ThreadPoolExecutor(max_workers=50) as executor:
# 50个线程同时查询标签
# 每个线程可能需要1个数据库连接
# 50个连接(可能接近pool_size,需要注意)
🎯 6. 性能优化总结
你的项目优化建议
# 当前你的代码(已经很好)
def background_sync(sync_log_id: int, expire_days: int):
db_thread = SessionLocal()
try:
# ✅ 已经使用了批量操作
db_thread.bulk_insert_mappings(ExpiringResource, resources_data)
db_thread.commit()
# ✅ 已经使用了并发操作(ThreadPoolExecutor, max_workers=5)
# 在 huawei_bss_client.py 的 _batch_query_tags 中
finally:
db_thread.close()
# 可以进一步优化的点:
# 1. 分批插入(如果资源超过5000条)
# 2. 添加进度反馈
# 3. 错误重试机制
完整优化示例
def background_sync_optimized(sync_log_id: int, expire_days: int):
"""优化后的后台同步"""
db_thread = SessionLocal()
try:
log = db_thread.query(ResourceSyncLog).filter(
ResourceSyncLog.id == sync_log_id
).first()
# 1. 获取资源(API调用)
hw_client = HuaweiBSSClient()
resources = hw_client.list_resources()
total = len(resources)
logger.info(f"📊 共 {total} 个资源需要同步")
# 2. 并发查询标签
logger.info("🔄 并发查询标签(5个线程)...")
tags_dict = hw_client._batch_query_tags(resources) # ⭐ 并发
# 3. 准备数据
resources_data = prepare_data(resources, tags_dict)
# 4. 分批插入(每批1000条)⭐ 批量
batch_size = 1000
for i in range(0, len(resources_data), batch_size):
batch = resources_data[i:i+batch_size]
# 批量插入
db_thread.bulk_insert_mappings(ExpiringResource, batch)
db_thread.commit()
# 更新进度
progress = (i + len(batch)) / total * 100
logger.info(f"💾 保存进度: {i+len(batch)}/{total} ({progress:.1f}%)")
# 5. 更新同步日志
log.status = SyncStatusEnum.SUCCESS.value
log.total_resources = total
log.end_time = datetime.now()
db_thread.commit()
logger.info(f"✅ 同步完成!共 {total} 个资源")
except Exception as e:
logger.error(f"❌ 同步失败: {e}")
log.status = SyncStatusEnum.FAILED.value
log.error_message = str(e)
db_thread.commit()
raise
finally:
db_thread.close()
📊 总结对照表
| 特性 | 批量操作 | 并发操作 | 组合使用 |
|---|---|---|---|
| 速度提升 | 5-20倍 | 3-10倍 | 15-100倍 |
| 适用场景 | 数据库写入 | API调用、IO操作 | 复杂任务 |
| 代码复杂度 | 简单 | 中等 | 中等 |
| 资源消耗 | 低 | 中-高 | 中-高 |
| 你的项目 | ✅ 已使用 | ✅ 已使用 | ✅ 已完美结合 |
你的华为云同步代码已经同时使用了批量操作和并发操作,这是非常正确的做法!🎉
- 并发操作:在
huawei_bss_client.py的_batch_query_tags中使用ThreadPoolExecutor(max_workers=5) - 批量操作:在
main.py的background_sync中使用bulk_insert_mappings
这就是为什么你的同步速度能够处理1000+资源的原因!👍
太好了!这是并发编程中最重要的概念之一。让我详细对比多线程和多进程的区别。
🔥 多线程 vs 多进程 - 完整对比
🎯 1. 核心概念
什么是进程(Process)?
进程 = 一个正在运行的程序,是操作系统资源分配的基本单位
进程的特点:
├─ 独立的内存空间
├─ 独立的资源(文件句柄、网络连接等)
├─ 至少包含一个线程(主线程)
└─ 进程之间完全隔离
类比: 进程就像独立的房子
- 每个房子有自己的厨房、卧室、客厅
- 房子之间互不干扰
- 邻居家的火灾不会烧到你家
什么是线程(Thread)?
线程 = 进程内的执行单元,是CPU调度的基本单位
线程的特点:
├─ 共享进程的内存空间
├─ 共享进程的资源
├─ 多个线程可以同时存在于一个进程中
└─ 线程之间可以直接通信
类比: 线程就像房子里的房间
- 多个房间共享厨房、客厅
- 房间之间可以方便地交流
- 一个房间着火会影响整个房子
🎯 2. 图形化对比
进程模型
操作系统
├─ 进程A [内存空间A] 🏠
│ ├─ 线程1
│ ├─ 线程2
│ └─ 资源A(独立)
│
├─ 进程B [内存空间B] 🏠
│ ├─ 线程1
│ ├─ 线程2
│ └─ 资源B(独立)
│
└─ 进程C [内存空间C] 🏠
├─ 线程1
└─ 资源C(独立)
⭐ 进程之间完全隔离,内存独立
线程模型
进程A [共享内存空间] 🏠
├─ 线程1 👤(访问共享内存)
├─ 线程2 👤(访问共享内存)
├─ 线程3 👤(访问共享内存)
└─ 共享资源(所有线程都能访问)
├─ 全局变量
├─ 堆内存
└─ 文件句柄
⭐ 线程之间共享内存,可以直接访问相同数据
🎯 3. 详细对比表
| 维度 | 多线程(Threading) | 多进程(Multiprocessing) |
|---|---|---|
| 内存空间 | ✅ 共享(所有线程共用一块内存) | ❌ 独立(每个进程有自己的内存) |
| 创建速度 | ⚡ 快(毫秒级) | 🐌 慢(需要复制内存,秒级) |
| 内存开销 | 💚 小(几MB) | 💔 大(几十到几百MB) |
| 通信方式 | ✅ 简单(直接访问共享变量) | ❌ 复杂(需要IPC:管道、队列) |
| 数据共享 | ✅ 天然共享 | ❌ 需要显式传递 |
| 安全性 | ⚠️ 低(线程崩溃会导致整个进程崩溃) | ✅ 高(进程崩溃不影响其他进程) |
| 适用场景 | 🌐 IO密集型(网络、文件、数据库) | 🔢 CPU密集型(计算、数据处理) |
| Python GIL | ❌ 受限(同一时刻只能一个线程执行Python代码) | ✅ 无限制(每个进程有独立的GIL) |
| 并行性 | ❌ 伪并行(CPU计算不能真正并行) | ✅ 真并行(可以利用多核CPU) |
| 调试难度 | 😰 难(竞态条件、死锁) | 😊 相对容易(进程隔离) |
🎯 4. Python的特殊情况 - GIL(全局解释器锁)
什么是GIL?
GIL = Global Interpreter Lock(全局解释器锁)
Python的GIL规则:
在同一时刻,只允许一个线程执行Python字节码
即使你有8核CPU,多线程也只能用到1个核心(对于CPU密集型任务)
GIL的影响
import time
import threading
from multiprocessing import Process
# CPU密集型任务(大量计算)
def cpu_intensive_task():
"""计算密集型任务"""
total = 0
for i in range(10_000_000):
total += i * i
return total
# 测试1: 单线程
start = time.time()
cpu_intensive_task()
cpu_intensive_task()
print(f"单线程耗时: {time.time() - start:.2f}秒")
# 输出:单线程耗时: 3.50秒
# 测试2: 多线程(2个线程)
start = time.time()
t1 = threading.Thread(target=cpu_intensive_task)
t2 = threading.Thread(target=cpu_intensive_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"多线程耗时: {time.time() - start:.2f}秒")
# 输出:多线程耗时: 3.55秒 ❌ 没有加速!(因为GIL)
# 测试3: 多进程(2个进程)
start = time.time()
p1 = Process(target=cpu_intensive_task)
p2 = Process(target=cpu_intensive_task)
p1.start()
p2.start()
p1.join()
p2.join()
print(f"多进程耗时: {time.time() - start:.2f}秒")
# 输出:多进程耗时: 1.80秒 ✅ 快了近2倍!(绕过GIL)
GIL对不同任务的影响
CPU密集型任务(大量计算):
├─ 多线程:❌ 无效(GIL限制)
│ → 1核CPU → 速度:1x
│
└─ 多进程:✅ 有效(绕过GIL)
→ 4核CPU → 速度:4x
IO密集型任务(网络、文件、数据库):
├─ 多线程:✅ 有效(IO等待时释放GIL)
│ → 线程在等待IO时,其他线程可以执行
│ → 速度:5-10x
│
└─ 多进程:✅ 有效(但开销大,不推荐)
→ 创建进程慢,内存占用大
→ 速度:5-10x(但代价更高)
🎯 5. 实际代码对比
场景1: IO密集型任务(网络请求)
import time
import requests
import threading
from multiprocessing import Process
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# 模拟网络请求
def fetch_url(url):
"""IO密集型:网络请求"""
response = requests.get(url)
return len(response.content)
urls = [
"https://www.baidu.com",
"https://www.taobao.com",
"https://www.jd.com",
"https://www.qq.com",
"https://www.sina.com.cn"
]
# ❌ 串行执行(慢)
start = time.time()
for url in urls:
fetch_url(url)
print(f"串行耗时: {time.time() - start:.2f}秒")
# 输出:串行耗时: 5.20秒
# ✅ 多线程(推荐!)
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
executor.map(fetch_url, urls)
print(f"多线程耗时: {time.time() - start:.2f}秒")
# 输出:多线程耗时: 1.05秒 ⚡ 快5倍!
# ⚠️ 多进程(不推荐,开销大)
start = time.time()
with ProcessPoolExecutor(max_workers=5) as executor:
executor.map(fetch_url, urls)
print(f"多进程耗时: {time.time() - start:.2f}秒")
# 输出:多进程耗时: 2.30秒(包含进程创建开销)
结论: IO密集型任务 → 用多线程
场景2: CPU密集型任务(数据处理)
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# 模拟CPU密集型任务
def process_data(n):
"""CPU密集型:大量计算"""
total = 0
for i in range(n):
total += i * i * i
return total
data_list = [5_000_000] * 8 # 8个计算任务
# ❌ 串行执行
start = time.time()
results = [process_data(n) for n in data_list]
print(f"串行耗时: {time.time() - start:.2f}秒")
# 输出:串行耗时: 8.50秒
# ❌ 多线程(无效!)
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
results = list(executor.map(process_data, data_list))
print(f"多线程耗时: {time.time() - start:.2f}秒")
# 输出:多线程耗时: 8.60秒 ❌ 没有加速(GIL限制)
# ✅ 多进程(推荐!)
start = time.time()
with ProcessPoolExecutor(max_workers=8) as executor:
results = list(executor.map(process_data, data_list))
print(f"多进程耗时: {time.time() - start:.2f}秒")
# 输出:多进程耗时: 1.15秒 ⚡ 快7-8倍!(充分利用8核CPU)
结论: CPU密集型任务 → 用多进程
🎯 6. 数据共享对比
多线程 - 数据天然共享
import threading
# 全局变量(所有线程共享)
shared_counter = 0
shared_data = {"total": 0}
def increment_counter():
global shared_counter
for _ in range(100000):
shared_counter += 1 # ✅ 直接访问共享变量
def update_dict():
for _ in range(100000):
shared_data["total"] += 1 # ✅ 直接访问共享字典
# 创建线程
t1 = threading.Thread(target=increment_counter)
t2 = threading.Thread(target=update_dict)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Counter: {shared_counter}") # 可以直接访问
print(f"Dict: {shared_data}") # 可以直接访问
# ⚠️ 但是有线程安全问题!(需要加锁)
多进程 - 需要显式共享
from multiprocessing import Process, Value, Manager
# ❌ 普通变量无法共享
counter = 0
def increment_normal():
global counter
for _ in range(100000):
counter += 1
p1 = Process(target=increment_normal)
p2 = Process(target=increment_normal)
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Counter: {counter}") # 输出:0 ❌ 没有变化!(进程内存隔离)
# ✅ 使用共享内存(Value)
shared_counter = Value('i', 0) # 'i' = integer
def increment_shared(counter):
for _ in range(100000):
with counter.get_lock(): # 需要加锁
counter.value += 1
p1 = Process(target=increment_shared, args=(shared_counter,))
p2 = Process(target=increment_shared, args=(shared_counter,))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Shared Counter: {shared_counter.value}") # ✅ 正确
# ✅ 使用Manager(支持复杂数据结构)
manager = Manager()
shared_dict = manager.dict()
shared_list = manager.list()
def update_shared(d, l):
d["key"] = "value"
l.append(1)
p = Process(target=update_shared, args=(shared_dict, shared_list))
p.start()
p.join()
print(shared_dict) # {'key': 'value'}
print(shared_list) # [1]
🎯 7. 进程间通信(IPC)
多线程 - 直接通信
import threading
import queue
# 线程安全的队列
q = queue.Queue()
def producer():
for i in range(5):
q.put(f"数据{i}") # 放入队列
print(f"生产: 数据{i}")
def consumer():
while True:
try:
data = q.get(timeout=1) # 从队列取出
print(f"消费: {data}")
except queue.Empty:
break
# 创建线程
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
# ✅ 简单直接
多进程 - 需要特殊机制
from multiprocessing import Process, Queue, Pipe
# 方式1: Queue(队列)
def producer(q):
for i in range(5):
q.put(f"数据{i}")
print(f"生产: 数据{i}")
def consumer(q):
while True:
try:
data = q.get(timeout=1)
print(f"消费: {data}")
except:
break
if __name__ == '__main__':
q = Queue() # 进程安全的队列
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
# 方式2: Pipe(管道)
def send_data(conn):
conn.send("Hello from process")
conn.close()
def receive_data(conn):
msg = conn.recv()
print(f"收到: {msg}")
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p1 = Process(target=send_data, args=(child_conn,))
p2 = Process(target=receive_data, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
🎯 8. 安全性对比
多线程 - 需要加锁
import threading
# ❌ 不安全的代码(竞态条件)
counter = 0
def increment():
global counter
for _ in range(100000):
counter += 1 # 不是原子操作!
threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 期望:500000,实际:可能是 327891(错误!)
# ✅ 安全的代码(加锁)
counter = 0
lock = threading.Lock()
def increment_safe():
global counter
for _ in range(100000):
with lock: # ⭐ 加锁保护
counter += 1
threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 输出:500000 ✅ 正确
多进程 - 天然隔离
from multiprocessing import Process
# ✅ 天然安全(进程隔离)
def process_task():
# 每个进程有自己的内存空间
# 不会相互干扰
local_data = []
for i in range(1000):
local_data.append(i)
print(f"进程 {Process().name} 完成")
processes = [Process(target=process_task) for _ in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
# ✅ 没有竞态条件,不需要加锁
🎯 9. 你的项目应该用哪个?
华为云同步任务分析
# 你的任务类型分析:
华为云同步流程:
├─ 1. 调用华为云API获取资源列表(IO密集型)⭐
├─ 2. 并发查询每个资源的标签(IO密集型)⭐
├─ 3. 处理数据(轻量计算)
└─ 4. 批量写入数据库(IO密集型)⭐
结论:主要是 IO密集型任务
推荐:✅ 多线程
你当前的实现(完全正确!)
# 在 huawei_bss_client.py 中
from concurrent.futures import ThreadPoolExecutor
def _batch_query_tags(self, resources: List[Dict]) -> Dict:
"""批量查询资源标签"""
tags_dict = {}
# ✅ 使用多线程(正确选择!)
with ThreadPoolExecutor(max_workers=5) as executor:
future_to_resource = {
executor.submit(
self._query_single_resource_tags,
resource['id'],
resource['provider'],
resource['type']
): resource
for resource in resources
}
for future in as_completed(future_to_resource):
resource = future_to_resource[future]
try:
tags = future.result()
tags_dict[resource['id']] = tags
except Exception as e:
logger.error(f"查询失败: {resource['id']}")
return tags_dict
# ✅ 为什么用多线程是正确的:
# 1. 查询标签是IO密集型(网络请求)
# 2. 多线程在IO等待时会释放GIL,其他线程可以执行
# 3. 线程创建快,内存开销小
# 4. 数据共享方便(tags_dict 直接访问)
如果是CPU密集型任务,应该改用多进程
# 假设你需要处理大量数据计算(示例)
from concurrent.futures import ProcessPoolExecutor
def process_heavy_calculation(resource_data):
"""CPU密集型:复杂计算"""
# 假设这里有大量计算
result = complex_algorithm(resource_data)
return result
def sync_with_heavy_processing():
resources = get_resources()
# ✅ 对于CPU密集型,用多进程
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(process_heavy_calculation, resources))
return results
🎯 10. 选择指南
快速决策树
你的任务是什么?
├─ IO密集型(网络、文件、数据库)
│ └─ ✅ 使用多线程(ThreadPoolExecutor)
│ ├─ 优点:快速、内存小、数据共享方便
│ └─ max_workers = 5-20(经验值)
│
└─ CPU密集型(计算、数据处理)
└─ ✅ 使用多进程(ProcessPoolExecutor)
├─ 优点:绕过GIL、真并行
└─ max_workers = CPU核心数(通常4-8)
实际场景对应表
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 网络爬虫 | 多线程 | IO密集型(网络请求) |
| 文件读写 | 多线程 | IO密集型(磁盘IO) |
| 数据库查询 | 多线程 | IO密集型(网络+磁盘) |
| 图像处理 | 多进程 | CPU密集型(像素计算) |
| 视频编码 | 多进程 | CPU密集型(编解码) |
| 机器学习训练 | 多进程 | CPU密集型(矩阵运算) |
| API调用 | 多线程 | IO密集型(网络请求) |
| 数据分析 | 多进程 | CPU密集型(统计计算) |
| 你的华为云同步 | ✅ 多线程 | IO密集型(API调用) |
📊 总结对比
| 特性 | 多线程 | 多进程 |
|---|---|---|
| 内存 | 共享 | 独立 |
| 创建速度 | ⚡ 快 | 🐌 慢 |
| GIL影响 | ❌ 受限 | ✅ 无限制 |
| 数据共享 | ✅ 简单 | ❌ 复杂 |
| 安全性 | ⚠️ 需要加锁 | ✅ 天然隔离 |
| 适用任务 | IO密集型 | CPU密集型 |
| 你的项目 | ✅ 正确选择 | 不需要 |
🎓 最佳实践
# ✅ 你的项目(当前实现,完全正确)
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
# IO密集型任务:API调用
results = executor.map(fetch_resource_tags, resource_ids)
# 如果将来需要处理CPU密集型任务
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as executor:
# CPU密集型任务:数据处理
results = executor.map(process_data, data_list)
你的华为云同步使用多线程是完全正确的选择!因为主要是API调用(IO密集型),多线程可以在等待网络响应时执行其他任务,效率最高。🎉
还有什么想了解的吗?比如:
- 如何避免多线程的竞态条件?
- 如何监控线程/进程的执行状态?
- 如何处理线程/进程的异常?
还有什么想了解的吗?比如: - 如何进一步优化线程数?
- 如何处理并发时的数据库死锁?
- 如何监控批量操作的进度?
更多推荐
所有评论(0)