📚 批量操作 vs 并发操作

🎯 1. 批量操作(Batch Operations)

什么是批量操作?

批量操作 = 把多个数据库操作打包成一次提交,减少数据库交互次数

单个操作:
插入1条 → 提交 → 插入1条 → 提交 → 插入1条 → 提交
  ↓        ↓        ↓        ↓        ↓        ↓
数据库    数据库    数据库    数据库    数据库    数据库
(6次交互)

批量操作:
插入1条 + 插入1条 + 插入1条 → 一次性提交
  ↓
数据库
(1次交互)

性能对比示例

import time

# ❌ 慢方式:逐个插入(单个操作)
start = time.time()

for i in range(1000):
    resource = ExpiringResource(
        resource_id=f"res_{i}",
        resource_name=f"资源{i}",
        cloud_provider="huawei"
    )
    db.add(resource)
    db.commit()  # ⚠️ 每次都提交,1000次提交

print(f"逐个插入耗时: {time.time() - start:.2f}秒")
# 输出:逐个插入耗时: 15.32秒


# ✅ 快方式1:批量插入(循环+单次提交)
start = time.time()

for i in range(1000):
    resource = ExpiringResource(
        resource_id=f"res_{i}",
        resource_name=f"资源{i}",
        cloud_provider="huawei"
    )
    db.add(resource)

db.commit()  # ✅ 只提交一次

print(f"批量插入耗时: {time.time() - start:.2f}秒")
# 输出:批量插入耗时: 2.45秒(快6倍!)


# ✅ 快方式2:bulk_insert_mappings(最快)
start = time.time()

data_list = [
    {
        "resource_id": f"res_{i}",
        "resource_name": f"资源{i}",
        "cloud_provider": "huawei"
    }
    for i in range(1000)
]

db.bulk_insert_mappings(ExpiringResource, data_list)
db.commit()

print(f"bulk_insert耗时: {time.time() - start:.2f}秒")
# 输出:bulk_insert耗时: 0.85秒(快18倍!)

批量操作的方法

1. bulk_insert_mappings - 批量插入
# 准备数据(字典列表)
resources_data = [
    {"resource_id": "res_1", "resource_name": "资源1", "cloud_provider": "huawei"},
    {"resource_id": "res_2", "resource_name": "资源2", "cloud_provider": "huawei"},
    {"resource_id": "res_3", "resource_name": "资源3", "cloud_provider": "huawei"},
    # ... 1000条数据
]

# ⭐ 批量插入
db.bulk_insert_mappings(ExpiringResource, resources_data)
db.commit()

# 相当于一次性执行:
# INSERT INTO expiring_resources (resource_id, resource_name, cloud_provider) 
# VALUES ('res_1', '资源1', 'huawei'),
#        ('res_2', '资源2', 'huawei'),
#        ('res_3', '资源3', 'huawei');
2. bulk_update_mappings - 批量更新
# 批量更新
updates = [
    {"id": 1, "status": "active", "updated_at": datetime.now()},
    {"id": 2, "status": "active", "updated_at": datetime.now()},
    {"id": 3, "status": "inactive", "updated_at": datetime.now()},
]

db.bulk_update_mappings(ExpiringResource, updates)
db.commit()
3. 批量删除
# 批量删除
db.query(ExpiringResource).filter(
    ExpiringResource.cloud_provider == "huawei",
    ExpiringResource.is_active == False
).delete()
db.commit()

# 一次性删除所有符合条件的记录

你的项目中的批量操作

# 在你的 main.py 中(华为云同步)
def background_sync(sync_log_id: int, expire_days: int):
    db_thread = SessionLocal()
    try:
        # 获取华为云资源(假设有1000条)
        hw_resources = huawei_client.get_all_resources()
        
        # ⭐ 批量插入(你已经在用了!)
        db_thread.bulk_insert_mappings(
            ExpiringResource,
            hw_resources  # 1000条数据一次性插入
        )
        db_thread.commit()
        
        logger.info(f"✅ 批量插入完成: {len(hw_resources)}条资源")
        
    except Exception as e:
        db_thread.rollback()
        logger.error(f"批量操作失败: {e}")
    finally:
        db_thread.close()

🎯 2. 并发操作(Concurrent Operations)

什么是并发操作?

并发操作 = 多个任务同时执行,而不是按顺序一个接一个执行

串行执行(非并发):
任务1 ────→ 任务2 ────→ 任务3 ────→
0秒       5秒       10秒      15秒
总耗时:15秒

并发执行:
任务1 ────→
任务2 ────→
任务3 ────→
0秒       5秒
总耗时:5秒(快3倍!)

并发操作的方式

1. 多线程(Threading) - 适合IO密集型任务
import threading

# ❌ 串行执行(慢)
def sync_serial():
    # 查询华为云ECS标签(假设需要5秒)
    ecs_tags = fetch_ecs_tags()
    
    # 查询华为云RDS标签(假设需要5秒)
    rds_tags = fetch_rds_tags()
    
    # 查询华为云ELB标签(假设需要5秒)
    elb_tags = fetch_elb_tags()
    
    # 总耗时:5 + 5 + 5 = 15秒

# ✅ 并发执行(快)
def sync_concurrent():
    threads = []
    
    # 创建3个线程,同时查询
    t1 = threading.Thread(target=fetch_ecs_tags)
    t2 = threading.Thread(target=fetch_rds_tags)
    t3 = threading.Thread(target=fetch_elb_tags)
    
    threads = [t1, t2, t3]
    
    # 启动所有线程
    for t in threads:
        t.start()
    
    # 等待所有线程完成
    for t in threads:
        t.join()
    
    # 总耗时:约5秒(3个任务同时进行)

sync_concurrent()
2. 线程池(ThreadPoolExecutor) - 更优雅的方式
from concurrent.futures import ThreadPoolExecutor, as_completed

# ✅ 使用线程池
def fetch_tags_concurrent(resource_ids):
    results = []
    
    # 创建线程池(5个工作线程)
    with ThreadPoolExecutor(max_workers=5) as executor:
        # 提交所有任务
        future_to_resource = {
            executor.submit(fetch_resource_tags, res_id): res_id
            for res_id in resource_ids
        }
        
        # 获取结果(哪个先完成就先处理哪个)
        for future in as_completed(future_to_resource):
            resource_id = future_to_resource[future]
            try:
                tags = future.result()
                results.append(tags)
            except Exception as e:
                logger.error(f"查询失败 {resource_id}: {e}")
    
    return results

# 1000个资源ID
resource_ids = [f"res_{i}" for i in range(1000)]

# 串行:每个5秒 × 1000 = 5000秒(83分钟)
# 并发(5个线程):5秒 × (1000/5) = 1000秒(16分钟)
3. 多进程(Multiprocessing) - 适合CPU密集型任务
from multiprocessing import Pool

# CPU密集型任务(大量计算)
def process_resource_data(data):
    # 复杂的数据处理(假设需要大量计算)
    result = complex_calculation(data)
    return result

# ✅ 使用多进程
if __name__ == '__main__':
    data_list = [...]  # 1000条数据
    
    # 创建进程池(4个进程,利用4个CPU核心)
    with Pool(processes=4) as pool:
        results = pool.map(process_resource_data, data_list)
    
    # 4个进程并行处理,速度快4倍

你的项目中的并发操作

# 在你的 huawei_bss_client.py 中
def _batch_query_tags(self, resources: List[Dict]) -> Dict:
    """批量查询资源标签(并发执行)"""
    tags_dict = {}
    
    # ⭐ 使用线程池并发查询标签
    with ThreadPoolExecutor(max_workers=5) as executor:  # 5个线程同时工作
        future_to_resource = {
            executor.submit(
                self._query_single_resource_tags,
                resource['id'],
                resource['provider'],
                resource['type']
            ): resource
            for resource in resources
        }
        
        # 处理完成的任务
        for future in as_completed(future_to_resource):
            resource = future_to_resource[future]
            try:
                tags = future.result()  # 获取查询结果
                tags_dict[resource['id']] = tags
            except Exception as e:
                logger.error(f"查询失败: {resource['id']}")
    
    return tags_dict

🎯 3. 批量操作 vs 并发操作 - 关键区别

维度 批量操作 并发操作
定义 多个操作打包成一次提交 多个任务同时执行
目的 减少数据库交互次数 减少总执行时间
应用场景 数据库插入/更新 API调用、文件IO
技术手段 bulk_insert_mappings 多线程、多进程
提速原理 减少网络开销和事务开销 利用等待时间做其他事
资源消耗 单线程,内存占用稍高 多线程/进程,CPU/内存都增加

类比理解

批量操作:
就像去超市购物
❌ 买一件结账一次(单个操作)
   → 排队10次,很慢
✅ 买完所有东西一起结账(批量操作)
   → 排队1次,很快

并发操作:
就像餐厅做菜
❌ 厨师做完一道菜再做下一道(串行)
   → 炒菜 → 煮汤 → 烤肉(3小时)
✅ 3个厨师同时做(并发)
   → 炒菜 | 煮汤 | 烤肉(1小时)

🎯 4. 两者结合 - 最佳实践

场景:华为云同步1000个资源

def sync_huawei_resources_optimized(expire_days: int):
    """优化后的华为云同步(批量+并发)"""
    
    # 步骤1: 获取资源列表(API调用)
    hw_client = HuaweiBSSClient()
    resources = hw_client.list_resources()  # 1000个资源
    
    logger.info(f"📊 获取到 {len(resources)} 个资源")
    
    # 步骤2: 并发查询标签(多线程)
    logger.info("🔄 开始并发查询标签...")
    
    all_tags = {}
    with ThreadPoolExecutor(max_workers=5) as executor:  # ⭐ 并发操作
        future_to_resource = {
            executor.submit(hw_client.get_resource_tags, res['id']): res
            for res in resources
        }
        
        for future in as_completed(future_to_resource):
            resource = future_to_resource[future]
            try:
                tags = future.result()
                all_tags[resource['id']] = tags
            except Exception as e:
                logger.error(f"标签查询失败: {resource['id']}")
    
    logger.info(f"✅ 标签查询完成,共 {len(all_tags)} 个")
    
    # 步骤3: 准备数据
    resources_data = []
    for res in resources:
        resources_data.append({
            "resource_id": res['id'],
            "resource_name": res['name'],
            "cloud_provider": "huawei",
            "tags": all_tags.get(res['id'], {}),
            "created_at": datetime.now()
        })
    
    # 步骤4: 批量插入数据库(批量操作)
    logger.info("💾 开始批量保存到数据库...")
    
    db = SessionLocal()
    try:
        db.bulk_insert_mappings(ExpiringResource, resources_data)  # ⭐ 批量操作
        db.commit()
        logger.info(f"✅ 批量保存完成: {len(resources_data)} 条")
    except Exception as e:
        db.rollback()
        logger.error(f"批量保存失败: {e}")
        raise
    finally:
        db.close()

# 性能对比:
# ❌ 串行 + 单个插入:5秒×1000 + 0.015秒×1000 = 5015秒(83分钟)
# ✅ 并发 + 批量插入:5秒×(1000/5) + 0.85秒 = 1000.85秒(16分钟)
# 提速:5倍!

🎯 5. 实际应用中的注意事项

批量操作的限制

# ⚠️ 批量操作有大小限制
# MySQL的max_allowed_packet通常是64MB

# ❌ 一次插入10万条(可能超限)
db.bulk_insert_mappings(ExpiringResource, huge_data_list)  # 可能失败

# ✅ 分批插入(每批1000条)
batch_size = 1000
for i in range(0, len(huge_data_list), batch_size):
    batch = huge_data_list[i:i+batch_size]
    db.bulk_insert_mappings(ExpiringResource, batch)
    db.commit()
    logger.info(f"已保存 {i+len(batch)}/{len(huge_data_list)}")

并发操作的限制

# ⚠️ 线程数不是越多越好

# ❌ 线程过多(资源浪费)
with ThreadPoolExecutor(max_workers=1000) as executor:  # 太多了!
    # 1000个线程竞争资源,反而变慢

# ✅ 合理的线程数
# IO密集型任务:max_workers = CPU核心数 × (1 + IO等待时间/CPU计算时间)
# 经验值:5-20个线程

with ThreadPoolExecutor(max_workers=5) as executor:  # 合理
    # 5个线程足够,不会浪费资源

数据库连接池与并发

# 你的配置
pool_size=30        # 常驻连接
max_overflow=70     # 临时连接
# 总共100个连接

# 场景分析:
# 1. 正常API请求:10个连接
# 2. 华为云同步线程(5个):每个线程需要1个连接 = 5个连接
# 3. 批量插入:1个连接(因为是在单个Session中)
# 总计:10 + 5 + 1 = 16个连接(远低于30,很安全)

# ⚠️ 如果线程数过多
with ThreadPoolExecutor(max_workers=50) as executor:
    # 50个线程同时查询标签
    # 每个线程可能需要1个数据库连接
    # 50个连接(可能接近pool_size,需要注意)

🎯 6. 性能优化总结

你的项目优化建议

# 当前你的代码(已经很好)
def background_sync(sync_log_id: int, expire_days: int):
    db_thread = SessionLocal()
    try:
        # ✅ 已经使用了批量操作
        db_thread.bulk_insert_mappings(ExpiringResource, resources_data)
        db_thread.commit()
        
        # ✅ 已经使用了并发操作(ThreadPoolExecutor, max_workers=5)
        # 在 huawei_bss_client.py 的 _batch_query_tags 中
        
    finally:
        db_thread.close()

# 可以进一步优化的点:
# 1. 分批插入(如果资源超过5000条)
# 2. 添加进度反馈
# 3. 错误重试机制

完整优化示例

def background_sync_optimized(sync_log_id: int, expire_days: int):
    """优化后的后台同步"""
    db_thread = SessionLocal()
    
    try:
        log = db_thread.query(ResourceSyncLog).filter(
            ResourceSyncLog.id == sync_log_id
        ).first()
        
        # 1. 获取资源(API调用)
        hw_client = HuaweiBSSClient()
        resources = hw_client.list_resources()
        total = len(resources)
        
        logger.info(f"📊 共 {total} 个资源需要同步")
        
        # 2. 并发查询标签
        logger.info("🔄 并发查询标签(5个线程)...")
        tags_dict = hw_client._batch_query_tags(resources)  # ⭐ 并发
        
        # 3. 准备数据
        resources_data = prepare_data(resources, tags_dict)
        
        # 4. 分批插入(每批1000条)⭐ 批量
        batch_size = 1000
        for i in range(0, len(resources_data), batch_size):
            batch = resources_data[i:i+batch_size]
            
            # 批量插入
            db_thread.bulk_insert_mappings(ExpiringResource, batch)
            db_thread.commit()
            
            # 更新进度
            progress = (i + len(batch)) / total * 100
            logger.info(f"💾 保存进度: {i+len(batch)}/{total} ({progress:.1f}%)")
        
        # 5. 更新同步日志
        log.status = SyncStatusEnum.SUCCESS.value
        log.total_resources = total
        log.end_time = datetime.now()
        db_thread.commit()
        
        logger.info(f"✅ 同步完成!共 {total} 个资源")
        
    except Exception as e:
        logger.error(f"❌ 同步失败: {e}")
        log.status = SyncStatusEnum.FAILED.value
        log.error_message = str(e)
        db_thread.commit()
        raise
    finally:
        db_thread.close()

📊 总结对照表

特性 批量操作 并发操作 组合使用
速度提升 5-20倍 3-10倍 15-100倍
适用场景 数据库写入 API调用、IO操作 复杂任务
代码复杂度 简单 中等 中等
资源消耗 中-高 中-高
你的项目 ✅ 已使用 ✅ 已使用 ✅ 已完美结合

你的华为云同步代码已经同时使用了批量操作和并发操作,这是非常正确的做法!🎉

  • 并发操作:在 huawei_bss_client.py_batch_query_tags 中使用 ThreadPoolExecutor(max_workers=5)
  • 批量操作:在 main.pybackground_sync 中使用 bulk_insert_mappings

这就是为什么你的同步速度能够处理1000+资源的原因!👍
太好了!这是并发编程中最重要的概念之一。让我详细对比多线程和多进程的区别。


🔥 多线程 vs 多进程 - 完整对比

🎯 1. 核心概念

什么是进程(Process)?

进程 = 一个正在运行的程序,是操作系统资源分配的基本单位

进程的特点:
├─ 独立的内存空间
├─ 独立的资源(文件句柄、网络连接等)
├─ 至少包含一个线程(主线程)
└─ 进程之间完全隔离

类比: 进程就像独立的房子

  • 每个房子有自己的厨房、卧室、客厅
  • 房子之间互不干扰
  • 邻居家的火灾不会烧到你家

什么是线程(Thread)?

线程 = 进程内的执行单元,是CPU调度的基本单位

线程的特点:
├─ 共享进程的内存空间
├─ 共享进程的资源
├─ 多个线程可以同时存在于一个进程中
└─ 线程之间可以直接通信

类比: 线程就像房子里的房间

  • 多个房间共享厨房、客厅
  • 房间之间可以方便地交流
  • 一个房间着火会影响整个房子

🎯 2. 图形化对比

进程模型

操作系统
├─ 进程A [内存空间A] 🏠
│   ├─ 线程1
│   ├─ 线程2
│   └─ 资源A(独立)
│
├─ 进程B [内存空间B] 🏠
│   ├─ 线程1
│   ├─ 线程2
│   └─ 资源B(独立)
│
└─ 进程C [内存空间C] 🏠
    ├─ 线程1
    └─ 资源C(独立)

⭐ 进程之间完全隔离,内存独立

线程模型

进程A [共享内存空间] 🏠
├─ 线程1 👤(访问共享内存)
├─ 线程2 👤(访问共享内存)
├─ 线程3 👤(访问共享内存)
└─ 共享资源(所有线程都能访问)
    ├─ 全局变量
    ├─ 堆内存
    └─ 文件句柄

⭐ 线程之间共享内存,可以直接访问相同数据

🎯 3. 详细对比表

维度 多线程(Threading) 多进程(Multiprocessing)
内存空间 ✅ 共享(所有线程共用一块内存) ❌ 独立(每个进程有自己的内存)
创建速度 ⚡ 快(毫秒级) 🐌 慢(需要复制内存,秒级)
内存开销 💚 小(几MB) 💔 大(几十到几百MB)
通信方式 ✅ 简单(直接访问共享变量) ❌ 复杂(需要IPC:管道、队列)
数据共享 ✅ 天然共享 ❌ 需要显式传递
安全性 ⚠️ 低(线程崩溃会导致整个进程崩溃) ✅ 高(进程崩溃不影响其他进程)
适用场景 🌐 IO密集型(网络、文件、数据库) 🔢 CPU密集型(计算、数据处理)
Python GIL ❌ 受限(同一时刻只能一个线程执行Python代码) ✅ 无限制(每个进程有独立的GIL)
并行性 ❌ 伪并行(CPU计算不能真正并行) ✅ 真并行(可以利用多核CPU)
调试难度 😰 难(竞态条件、死锁) 😊 相对容易(进程隔离)

🎯 4. Python的特殊情况 - GIL(全局解释器锁)

什么是GIL?

GIL = Global Interpreter Lock(全局解释器锁)

Python的GIL规则:
在同一时刻,只允许一个线程执行Python字节码

即使你有8核CPU,多线程也只能用到1个核心(对于CPU密集型任务)

GIL的影响

import time
import threading
from multiprocessing import Process

# CPU密集型任务(大量计算)
def cpu_intensive_task():
    """计算密集型任务"""
    total = 0
    for i in range(10_000_000):
        total += i * i
    return total


# 测试1: 单线程
start = time.time()
cpu_intensive_task()
cpu_intensive_task()
print(f"单线程耗时: {time.time() - start:.2f}秒")
# 输出:单线程耗时: 3.50秒


# 测试2: 多线程(2个线程)
start = time.time()
t1 = threading.Thread(target=cpu_intensive_task)
t2 = threading.Thread(target=cpu_intensive_task)

t1.start()
t2.start()
t1.join()
t2.join()

print(f"多线程耗时: {time.time() - start:.2f}秒")
# 输出:多线程耗时: 3.55秒 ❌ 没有加速!(因为GIL)


# 测试3: 多进程(2个进程)
start = time.time()
p1 = Process(target=cpu_intensive_task)
p2 = Process(target=cpu_intensive_task)

p1.start()
p2.start()
p1.join()
p2.join()

print(f"多进程耗时: {time.time() - start:.2f}秒")
# 输出:多进程耗时: 1.80秒 ✅ 快了近2倍!(绕过GIL)

GIL对不同任务的影响

CPU密集型任务(大量计算):
├─ 多线程:❌ 无效(GIL限制)
│   → 1核CPU → 速度:1x
│
└─ 多进程:✅ 有效(绕过GIL)
    → 4核CPU → 速度:4x

IO密集型任务(网络、文件、数据库):
├─ 多线程:✅ 有效(IO等待时释放GIL)
│   → 线程在等待IO时,其他线程可以执行
│   → 速度:5-10x
│
└─ 多进程:✅ 有效(但开销大,不推荐)
    → 创建进程慢,内存占用大
    → 速度:5-10x(但代价更高)

🎯 5. 实际代码对比

场景1: IO密集型任务(网络请求)

import time
import requests
import threading
from multiprocessing import Process
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# 模拟网络请求
def fetch_url(url):
    """IO密集型:网络请求"""
    response = requests.get(url)
    return len(response.content)

urls = [
    "https://www.baidu.com",
    "https://www.taobao.com",
    "https://www.jd.com",
    "https://www.qq.com",
    "https://www.sina.com.cn"
]


# ❌ 串行执行(慢)
start = time.time()
for url in urls:
    fetch_url(url)
print(f"串行耗时: {time.time() - start:.2f}秒")
# 输出:串行耗时: 5.20秒


# ✅ 多线程(推荐!)
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(fetch_url, urls)
print(f"多线程耗时: {time.time() - start:.2f}秒")
# 输出:多线程耗时: 1.05秒 ⚡ 快5倍!


# ⚠️ 多进程(不推荐,开销大)
start = time.time()
with ProcessPoolExecutor(max_workers=5) as executor:
    executor.map(fetch_url, urls)
print(f"多进程耗时: {time.time() - start:.2f}秒")
# 输出:多进程耗时: 2.30秒(包含进程创建开销)

结论: IO密集型任务 → 用多线程


场景2: CPU密集型任务(数据处理)

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# 模拟CPU密集型任务
def process_data(n):
    """CPU密集型:大量计算"""
    total = 0
    for i in range(n):
        total += i * i * i
    return total

data_list = [5_000_000] * 8  # 8个计算任务


# ❌ 串行执行
start = time.time()
results = [process_data(n) for n in data_list]
print(f"串行耗时: {time.time() - start:.2f}秒")
# 输出:串行耗时: 8.50秒


# ❌ 多线程(无效!)
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(process_data, data_list))
print(f"多线程耗时: {time.time() - start:.2f}秒")
# 输出:多线程耗时: 8.60秒 ❌ 没有加速(GIL限制)


# ✅ 多进程(推荐!)
start = time.time()
with ProcessPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(process_data, data_list))
print(f"多进程耗时: {time.time() - start:.2f}秒")
# 输出:多进程耗时: 1.15秒 ⚡ 快7-8倍!(充分利用8核CPU)

结论: CPU密集型任务 → 用多进程


🎯 6. 数据共享对比

多线程 - 数据天然共享

import threading

# 全局变量(所有线程共享)
shared_counter = 0
shared_data = {"total": 0}

def increment_counter():
    global shared_counter
    for _ in range(100000):
        shared_counter += 1  # ✅ 直接访问共享变量

def update_dict():
    for _ in range(100000):
        shared_data["total"] += 1  # ✅ 直接访问共享字典

# 创建线程
t1 = threading.Thread(target=increment_counter)
t2 = threading.Thread(target=update_dict)

t1.start()
t2.start()
t1.join()
t2.join()

print(f"Counter: {shared_counter}")  # 可以直接访问
print(f"Dict: {shared_data}")         # 可以直接访问

# ⚠️ 但是有线程安全问题!(需要加锁)

多进程 - 需要显式共享

from multiprocessing import Process, Value, Manager

# ❌ 普通变量无法共享
counter = 0

def increment_normal():
    global counter
    for _ in range(100000):
        counter += 1

p1 = Process(target=increment_normal)
p2 = Process(target=increment_normal)
p1.start()
p2.start()
p1.join()
p2.join()

print(f"Counter: {counter}")  # 输出:0 ❌ 没有变化!(进程内存隔离)


# ✅ 使用共享内存(Value)
shared_counter = Value('i', 0)  # 'i' = integer

def increment_shared(counter):
    for _ in range(100000):
        with counter.get_lock():  # 需要加锁
            counter.value += 1

p1 = Process(target=increment_shared, args=(shared_counter,))
p2 = Process(target=increment_shared, args=(shared_counter,))
p1.start()
p2.start()
p1.join()
p2.join()

print(f"Shared Counter: {shared_counter.value}")  # ✅ 正确


# ✅ 使用Manager(支持复杂数据结构)
manager = Manager()
shared_dict = manager.dict()
shared_list = manager.list()

def update_shared(d, l):
    d["key"] = "value"
    l.append(1)

p = Process(target=update_shared, args=(shared_dict, shared_list))
p.start()
p.join()

print(shared_dict)  # {'key': 'value'}
print(shared_list)  # [1]

🎯 7. 进程间通信(IPC)

多线程 - 直接通信

import threading
import queue

# 线程安全的队列
q = queue.Queue()

def producer():
    for i in range(5):
        q.put(f"数据{i}")  # 放入队列
        print(f"生产: 数据{i}")

def consumer():
    while True:
        try:
            data = q.get(timeout=1)  # 从队列取出
            print(f"消费: {data}")
        except queue.Empty:
            break

# 创建线程
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

t1.start()
t2.start()
t1.join()
t2.join()

# ✅ 简单直接

多进程 - 需要特殊机制

from multiprocessing import Process, Queue, Pipe

# 方式1: Queue(队列)
def producer(q):
    for i in range(5):
        q.put(f"数据{i}")
        print(f"生产: 数据{i}")

def consumer(q):
    while True:
        try:
            data = q.get(timeout=1)
            print(f"消费: {data}")
        except:
            break

if __name__ == '__main__':
    q = Queue()  # 进程安全的队列
    
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()


# 方式2: Pipe(管道)
def send_data(conn):
    conn.send("Hello from process")
    conn.close()

def receive_data(conn):
    msg = conn.recv()
    print(f"收到: {msg}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    
    p1 = Process(target=send_data, args=(child_conn,))
    p2 = Process(target=receive_data, args=(parent_conn,))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()

🎯 8. 安全性对比

多线程 - 需要加锁

import threading

# ❌ 不安全的代码(竞态条件)
counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1  # 不是原子操作!

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 期望:500000,实际:可能是 327891(错误!)


# ✅ 安全的代码(加锁)
counter = 0
lock = threading.Lock()

def increment_safe():
    global counter
    for _ in range(100000):
        with lock:  # ⭐ 加锁保护
            counter += 1

threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 输出:500000 ✅ 正确

多进程 - 天然隔离

from multiprocessing import Process

# ✅ 天然安全(进程隔离)
def process_task():
    # 每个进程有自己的内存空间
    # 不会相互干扰
    local_data = []
    for i in range(1000):
        local_data.append(i)
    print(f"进程 {Process().name} 完成")

processes = [Process(target=process_task) for _ in range(5)]
for p in processes:
    p.start()
for p in processes:
    p.join()

# ✅ 没有竞态条件,不需要加锁

🎯 9. 你的项目应该用哪个?

华为云同步任务分析

# 你的任务类型分析:
华为云同步流程:
├─ 1. 调用华为云API获取资源列表(IO密集型)⭐
├─ 2. 并发查询每个资源的标签(IO密集型)⭐
├─ 3. 处理数据(轻量计算)
└─ 4. 批量写入数据库(IO密集型)⭐

结论:主要是 IO密集型任务
推荐:✅ 多线程

你当前的实现(完全正确!)

# 在 huawei_bss_client.py 中
from concurrent.futures import ThreadPoolExecutor

def _batch_query_tags(self, resources: List[Dict]) -> Dict:
    """批量查询资源标签"""
    tags_dict = {}
    
    # ✅ 使用多线程(正确选择!)
    with ThreadPoolExecutor(max_workers=5) as executor:
        future_to_resource = {
            executor.submit(
                self._query_single_resource_tags,
                resource['id'],
                resource['provider'],
                resource['type']
            ): resource
            for resource in resources
        }
        
        for future in as_completed(future_to_resource):
            resource = future_to_resource[future]
            try:
                tags = future.result()
                tags_dict[resource['id']] = tags
            except Exception as e:
                logger.error(f"查询失败: {resource['id']}")
    
    return tags_dict

# ✅ 为什么用多线程是正确的:
# 1. 查询标签是IO密集型(网络请求)
# 2. 多线程在IO等待时会释放GIL,其他线程可以执行
# 3. 线程创建快,内存开销小
# 4. 数据共享方便(tags_dict 直接访问)

如果是CPU密集型任务,应该改用多进程

# 假设你需要处理大量数据计算(示例)
from concurrent.futures import ProcessPoolExecutor

def process_heavy_calculation(resource_data):
    """CPU密集型:复杂计算"""
    # 假设这里有大量计算
    result = complex_algorithm(resource_data)
    return result

def sync_with_heavy_processing():
    resources = get_resources()
    
    # ✅ 对于CPU密集型,用多进程
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(process_heavy_calculation, resources))
    
    return results

🎯 10. 选择指南

快速决策树

你的任务是什么?
├─ IO密集型(网络、文件、数据库)
│   └─ ✅ 使用多线程(ThreadPoolExecutor)
│       ├─ 优点:快速、内存小、数据共享方便
│       └─ max_workers = 5-20(经验值)
│
└─ CPU密集型(计算、数据处理)
    └─ ✅ 使用多进程(ProcessPoolExecutor)
        ├─ 优点:绕过GIL、真并行
        └─ max_workers = CPU核心数(通常4-8)

实际场景对应表

场景 推荐方案 理由
网络爬虫 多线程 IO密集型(网络请求)
文件读写 多线程 IO密集型(磁盘IO)
数据库查询 多线程 IO密集型(网络+磁盘)
图像处理 多进程 CPU密集型(像素计算)
视频编码 多进程 CPU密集型(编解码)
机器学习训练 多进程 CPU密集型(矩阵运算)
API调用 多线程 IO密集型(网络请求)
数据分析 多进程 CPU密集型(统计计算)
你的华为云同步 ✅ 多线程 IO密集型(API调用)

📊 总结对比

特性 多线程 多进程
内存 共享 独立
创建速度 ⚡ 快 🐌 慢
GIL影响 ❌ 受限 ✅ 无限制
数据共享 ✅ 简单 ❌ 复杂
安全性 ⚠️ 需要加锁 ✅ 天然隔离
适用任务 IO密集型 CPU密集型
你的项目 ✅ 正确选择 不需要

🎓 最佳实践

# ✅ 你的项目(当前实现,完全正确)
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=5) as executor:
    # IO密集型任务:API调用
    results = executor.map(fetch_resource_tags, resource_ids)

# 如果将来需要处理CPU密集型任务
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor(max_workers=4) as executor:
    # CPU密集型任务:数据处理
    results = executor.map(process_data, data_list)

你的华为云同步使用多线程是完全正确的选择!因为主要是API调用(IO密集型),多线程可以在等待网络响应时执行其他任务,效率最高。🎉

还有什么想了解的吗?比如:

  • 如何避免多线程的竞态条件?
  • 如何监控线程/进程的执行状态?
  • 如何处理线程/进程的异常?
    还有什么想了解的吗?比如:
  • 如何进一步优化线程数?
  • 如何处理并发时的数据库死锁?
  • 如何监控批量操作的进度?
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐