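I. Core Value and Design Principles of Concurrency Control

The video first defines "workflow concurrency control" as running LangGraph nodes in parallel, safely and efficiently, across threads, processes, or distributed instances, while keeping state consistent and resource usage within limits. Its core value and design principles are as follows:

(1) Core value
  1. Higher throughput: batch jobs (such as extracting text from many PDF files) run in parallel, so total wall-clock time drops from the sum of all task durations toward the duration of the slowest single task, provided the concurrency limit is at least the number of tasks;
  2. Bounded resource use: capping the number of concurrent tasks keeps CPU, memory, and I/O from being exhausted;
  3. Distributed scaling: multiple Electron instances or devices can share the work, lifting the resource ceiling of a single process;
  4. Fault isolation: one failed task does not affect the execution of the others.
(2) Core design principles
  1. Isolation: each concurrent workflow instance has its own State, so instances cannot overwrite each other's data;
  2. Resource limits: cap the maximum concurrency with a thread pool or process pool (for CPU-bound work on a 4-core machine, keep concurrency ≤ 4);
  3. Consistency: in distributed setups, core state (such as task progress and results) must be synchronized to shared storage;
  4. Cancellability: any single task can be terminated at any time without affecting the rest of the workflow.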
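The isolation, resource-limit, and fault-isolation principles above can be sketched in a few lines. This is only an illustration: `TaskState`, `process`, and `run_all` are hypothetical names that stand in for the real State class and workflow, and the ".broken" suffix is just a way to force one task to fail.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TaskState:
    """Hypothetical stand-in for a per-workflow State object."""
    file_path: str
    status: str = "pending"
    error: Optional[str] = None


def process(file_path: str) -> TaskState:
    """Run one task against its own freshly created state (isolation)."""
    state = TaskState(file_path=file_path, status="running")
    try:
        if file_path.endswith(".broken"):
            raise ValueError("cannot parse file")
        state.status = "completed"
    except Exception as exc:
        # Fault isolation: the failure is recorded in this task's state only.
        state.status = "failed"
        state.error = str(exc)
    return state


def run_all(paths: List[str], max_workers: int = 2) -> List[TaskState]:
    """Run tasks with at most `max_workers` in flight at any time."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process, paths))


results = run_all(["a.pdf", "b.broken", "c.pdf"])
for r in results:
    print(r.file_path, r.status, r.error)
```

Because every call to `process` creates its own `TaskState`, the failure of `b.broken` cannot leak into the other two results.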
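II. Single-Process Concurrent Execution in LangGraph (Hands-On Core)

The video first implements multi-threaded concurrency inside a single process, using concurrent.futures.ThreadPoolExecutor to manage concurrent workflow runs:

(1) Thread-pool workflow wrapper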

python

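import uuid
from concurrent.futures import CancelledError, Future, ThreadPoolExecutor
from typing import Any, Dict, List, Optional

from langgraph.graph import StateGraph

# Reuse PDFProcessState and DBWorkflowPersistence defined earlier
from persistence import PDFProcessState, DBWorkflowPersistence
# pdf_extract_node and text_clean_node are the node functions from the earlier
# sections; import them from wherever they are defined in your project.

# Initialize the database persistence helper
db_persistence = DBWorkflowPersistence()


class ConcurrentWorkflowManager:
    """Concurrent workflow manager."""

    def __init__(self, max_workers: int = 4):
        """
        :param max_workers: maximum concurrency (the CPU core count is a
                            reasonable value for CPU-bound work)
        """
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # Submitted tasks that have not been collected yet (execution_id -> Future)
        self.running_tasks: Dict[str, Future] = {}

    def build_workflow(self) -> Any:
        """Build the base workflow (reusing the earlier nodes)."""
        graph = StateGraph(PDFProcessState)
        graph.add_node("pdf_extract", pdf_extract_node)
        graph.add_node("text_clean", text_clean_node)
        graph.add_edge("pdf_extract", "text_clean")
        graph.set_entry_point("pdf_extract")
        graph.set_finish_point("text_clean")
        return graph.compile()

    def run_single_workflow(self, file_path: str, use_ocr: bool = False,
                            execution_id: Optional[str] = None) -> PDFProcessState:
        """Run one workflow with its own independent State."""
        execution_id = execution_id or str(uuid.uuid4())
        # Create an independent State for this run
        state = PDFProcessState(
            execution_id=execution_id,
            file_path=file_path,
            use_ocr=use_ocr,
            current_node="start",
            workflow_status="running"
        )
        # Save the initial snapshot
        db_persistence.save_snapshot(state)
        # Run the workflow
        app = self.build_workflow()
        output = app.invoke(state)
        # invoke() may return the final state as a dict; rebuild the model if so
        final_state = PDFProcessState(**output) if isinstance(output, dict) else output
        # Save the final snapshot
        db_persistence.save_snapshot(final_state)
        return final_state

    def run_concurrent_workflows(self, task_list: List[Dict[str, Any]]) -> List[PDFProcessState]:
        """
        Run several workflow tasks concurrently.
        :param task_list: list of tasks, e.g. [{"file_path": "...", "use_ocr": ...}, ...]
        :return: results of all tasks, in submission order
        """
        submitted = []  # (execution_id, file_path, future)
        results = []
        # Submit the tasks
        for task in task_list:
            # One ID serves as both the task key and the workflow's execution_id,
            # so cancel_task() and the persisted snapshots refer to the same run
            execution_id = str(uuid.uuid4())
            future = self.executor.submit(
                self.run_single_workflow,
                task["file_path"], task.get("use_ocr", False), execution_id
            )
            self.running_tasks[execution_id] = future
            submitted.append((execution_id, task["file_path"], future))

        # Collect the results
        for execution_id, file_path, future in submitted:
            try:
                results.append(future.result())
            except CancelledError:
                # cancel_task() has already persisted the cancelled snapshot
                results.append(PDFProcessState(
                    execution_id=execution_id,
                    file_path=file_path,
                    workflow_status="cancelled",
                    error_message="Task was cancelled manually"
                ))
            except Exception as e:
                # Record the failed task
                error_state = PDFProcessState(
                    execution_id=execution_id,
                    file_path=file_path,
                    workflow_status="failed",
                    error_message=f"Concurrent execution failed: {e}"
                )
                db_persistence.save_snapshot(error_state)
                results.append(error_state)
            finally:
                # pop() tolerates the entry having been removed by cancel_task()
                self.running_tasks.pop(execution_id, None)

        return results

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a single task that is still waiting in the queue.

        Future.cancel() cannot stop a task that has already started running;
        in that case this method returns False.
        """
        future = self.running_tasks.get(task_id)
        if future is None or future.done():
            return False
        cancel_success = future.cancel()
        if cancel_success:
            # Mark the task as cancelled
            cancel_state = PDFProcessState(
                execution_id=task_id,
                workflow_status="cancelled",
                error_message="Task was cancelled manually"
            )
            db_persistence.save_snapshot(cancel_state)
            self.running_tasks.pop(task_id, None)
        return cancel_success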
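One detail of cancel_task is worth seeing in isolation: Future.cancel() only succeeds for a task that has not started yet. The sketch below uses a single-worker pool and two threading.Event objects (names chosen for this example) to make the behavior deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()     # keeps the first task busy until we release it
started = threading.Event()  # set once the first task is actually running


def blocking_task() -> str:
    started.set()
    gate.wait(timeout=5)
    return "finished"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(blocking_task)
    started.wait(timeout=5)              # the only worker is now occupied
    queued = pool.submit(blocking_task)  # waits in the queue behind it

    cancelled_queued = queued.cancel()    # succeeds: the task never started
    cancelled_running = running.cancel()  # fails: the task is already executing
    gate.set()                            # let the running task finish
    running_result = running.result()

print(cancelled_queued, cancelled_running, running_result)
# True False finished
```

Stopping a task that is already running therefore needs cooperation from the task itself, for example a flag that the node checks between units of work.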
(2) Example: running concurrent tasks

python

# Initialize the concurrency manager (at most 4 concurrent tasks)
concurrent_manager = ConcurrentWorkflowManager(max_workers=4)

# Batch task list (5 PDF extraction tasks)
task_list = [
    {"file_path": "C:/docs/1.pdf", "use_ocr": False},
    {"file_path": "C:/docs/2.pdf", "use_ocr": True},
    {"file_path": "C:/docs/3.pdf", "use_ocr": False},
    {"file_path": "C:/docs/4.pdf", "use_ocr": True},
    {"file_path": "C:/docs/5.pdf", "use_ocr": False},
]

# Run the tasks concurrently
results = concurrent_manager.run_concurrent_workflows(task_list)

# Print the results
for result in results:
    print(f"Task ID: {result.execution_id}, file: {result.file_path}, status: {result.workflow_status}")
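If results should be handled as soon as each task finishes rather than in submission order, concurrent.futures.as_completed is one option. The following is a minimal sketch, not the video's implementation: `fake_workflow` is a hypothetical stand-in for run_single_workflow, and the `delay` field exists only to simulate tasks of different length.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List


def fake_workflow(task: Dict[str, object]) -> str:
    """Hypothetical stand-in for run_single_workflow."""
    time.sleep(float(task["delay"]))
    if task["file_path"] == "bad.pdf":
        raise RuntimeError("extract failed")
    return f"done:{task['file_path']}"


def run_and_report(tasks: List[Dict[str, object]], max_workers: int = 4) -> Dict[str, str]:
    outcome: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its task so failures can be attributed.
        future_to_task = {pool.submit(fake_workflow, t): t for t in tasks}
        for future in as_completed(future_to_task):
            path = str(future_to_task[future]["file_path"])
            try:
                outcome[path] = future.result()
            except Exception as exc:
                outcome[path] = f"failed:{exc}"
    return outcome


report = run_and_report([
    {"file_path": "1.pdf", "delay": 0.05},
    {"file_path": "bad.pdf", "delay": 0.0},
    {"file_path": "2.pdf", "delay": 0.02},
])
print(report)
```

The future-to-task dictionary also avoids searching the submission list to find out which file a failed future belonged to.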
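III. Process-Pool Concurrency (Getting Around the GIL)

For CPU-bound tasks (such as OCR and text processing), the video uses multiprocessing.Pool for process-level concurrency. This sidesteps the Python GIL (Global Interpreter Lock), which lets only one thread execute Python bytecode at a time:

(1) Process-pool implementation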

python

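import multiprocessing
import uuid
from multiprocessing import Pool
from typing import Any, Dict, List, Optional

from langgraph.graph import StateGraph

from persistence import PDFProcessState, DBWorkflowPersistence
# pdf_extract_node and text_clean_node are the node functions from the earlier
# sections; import them from wherever they are defined in your project.


# Note: the worker function must be picklable, so define it at module level
def run_worker_task(task: Dict[str, Any]) -> Dict[str, Any]:
    """Process-pool worker function (runs in a separate process)."""
    # Create the persistence helper inside the process (one connection per process)
    worker_persistence = DBWorkflowPersistence()
    execution_id = str(uuid.uuid4())
    try:
        state = PDFProcessState(
            execution_id=execution_id,
            file_path=task["file_path"],
            use_ocr=task.get("use_ocr", False),
            workflow_status="running"
        )
        # Build and run the workflow
        graph = StateGraph(PDFProcessState)
        graph.add_node("pdf_extract", pdf_extract_node)
        graph.add_node("text_clean", text_clean_node)
        graph.add_edge("pdf_extract", "text_clean")
        graph.set_entry_point("pdf_extract")
        graph.set_finish_point("text_clean")
        app = graph.compile()
        output = app.invoke(state)
        # invoke() may return the final state as a dict; rebuild the model if so
        final_state = PDFProcessState(**output) if isinstance(output, dict) else output
        # Save the snapshot (each process saves independently)
        worker_persistence.save_snapshot(final_state)
        return {
            "success": True,
            "execution_id": execution_id,
            "state": final_state.dict()  # plain dict, easy to pass between processes
        }
    except Exception as e:
        error_state = PDFProcessState(
            execution_id=execution_id,
            file_path=task["file_path"],
            workflow_status="failed",
            error_message=str(e)
        )
        worker_persistence.save_snapshot(error_state)
        return {
            "success": False,
            "execution_id": execution_id,
            "error": str(e)
        }


class ProcessPoolWorkflowManager:
    """Process-pool concurrent workflow manager."""

    def __init__(self, max_processes: Optional[int] = None):
        """
        :param max_processes: maximum number of processes (defaults to the CPU core count)
        """
        self.max_processes = max_processes or multiprocessing.cpu_count()

    def run_concurrent_tasks(self, task_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Run the tasks in a process pool."""
        # A fresh pool per call keeps the manager reusable; the context manager
        # shuts the pool down once map() has returned every result
        with Pool(processes=self.max_processes) as pool:
            return pool.map(run_worker_task, task_list)


# Usage example
# The guard is required wherever child processes are spawned (e.g. on Windows),
# because each child re-imports this module
if __name__ == "__main__":
    process_manager = ProcessPoolWorkflowManager(max_processes=4)
    task_list = [
        {"file_path": "C:/docs/1.pdf", "use_ocr": True},
        {"file_path": "C:/docs/2.pdf", "use_ocr": True},
    ]
    results = process_manager.run_concurrent_tasks(task_list)
    print(results)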
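Everything that crosses a process boundary is pickled, which is why the worker above returns a plain dict. A small pre-flight check can catch unpicklable fields before a task is handed to the pool. This is an illustrative sketch; `is_picklable` and `to_transferable` are hypothetical helpers, and the lock in the sample state only serves as an example of a value that cannot be pickled.

```python
import pickle
import threading
from typing import Any, Dict


def is_picklable(obj: Any) -> bool:
    """Return True if `obj` survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False


def to_transferable(state: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the fields of a state-like dict that can be pickled."""
    return {key: value for key, value in state.items() if is_picklable(value)}


state = {
    "execution_id": "demo-1",
    "file_path": "C:/docs/1.pdf",
    "pages": [1, 2, 3],
    "lock": threading.Lock(),  # locks cannot be pickled
}
clean = to_transferable(state)
print(sorted(clean))
# ['execution_id', 'file_path', 'pages']
```

Silently dropping fields is a design choice that suits a demo; in a real pipeline it may be preferable to raise an error that names the offending field.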
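IV. Distributed Execution and State Synchronization (Across Electron Instances)

The video implements a distributed execution scheme based on a shared SQLite database plus a file lock. It lets multiple Electron instances or devices run workflows in parallel while keeping state consistent. For the multi-device case, the database and lock file must live on storage that every device can reach, and file locking on network file systems is not always reliable, so that setup should be tested carefully:

(1) Distributed architecture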

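flowchart LR
    A[Electron instance 1] -->|Run tasks / update status| C[Shared SQLite database]
    B[Electron instance 2] -->|Run tasks / update status| C
    C -->|State sync| A
    C -->|State sync| B
    D[File lock] -->|Prevents concurrent write conflicts| C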

(2) Distributed state synchronization

python

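import fcntl  # Unix-only; Windows needs another mechanism (e.g. msvcrt.locking)
import os
import socket
import sqlite3
import uuid
from contextlib import closing
from typing import Any, Dict, List

from langgraph.graph import StateGraph

from persistence import PDFProcessState, DBWorkflowPersistence
# pdf_extract_node and text_clean_node are the node functions from the earlier
# sections; import them from wherever they are defined in your project.


class DistributedWorkflowManager:
    """Distributed workflow manager (across instances)."""

    def __init__(self, db_path: str = "./shared_workflow_db.sqlite", lock_file: str = "./workflow_lock.lock"):
        self.db_path = db_path
        self.lock_file = lock_file
        self.persistence = DBWorkflowPersistence(db_path)
        # hostname:pid, because a bare PID is not unique across devices
        self.worker_id = f"{socket.gethostname()}:{os.getpid()}"

    def _build_workflow(self) -> Any:
        """Build the same two-node workflow used in the earlier sections."""
        graph = StateGraph(PDFProcessState)
        graph.add_node("pdf_extract", pdf_extract_node)
        graph.add_node("text_clean", text_clean_node)
        graph.add_edge("pdf_extract", "text_clean")
        graph.set_entry_point("pdf_extract")
        graph.set_finish_point("text_clean")
        return graph.compile()

    def _acquire_lock(self) -> None:
        """Acquire the file lock (prevents concurrent writes)."""
        self.lock_fd = open(self.lock_file, "w")
        fcntl.flock(self.lock_fd, fcntl.LOCK_EX)  # exclusive lock, blocks until free

    def _release_lock(self) -> None:
        """Release the file lock."""
        fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
        self.lock_fd.close()

    def fetch_pending_tasks(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Fetch pending tasks from the shared database (distributed task dispatch)."""
        self._acquire_lock()
        try:
            # closing() guarantees the connection is closed even if a query fails
            with closing(sqlite3.connect(self.db_path)) as conn:
                cursor = conn.cursor()
                # Select pending tasks, capped so one instance cannot grab too many
                cursor.execute('''
                    SELECT id, file_path, use_ocr
                    FROM distributed_tasks
                    WHERE status = 'pending'
                    ORDER BY id
                    LIMIT ?
                ''', (limit,))
                tasks = cursor.fetchall()
                # Mark them as running so other instances do not pick them up again
                task_ids = [task[0] for task in tasks]
                if task_ids:
                    placeholders = ",".join("?" * len(task_ids))
                    cursor.execute(f'''
                        UPDATE distributed_tasks
                        SET status = 'running', worker_id = ?
                        WHERE id IN ({placeholders})
                    ''', (self.worker_id, *task_ids))
                conn.commit()
            # Convert the rows into task dictionaries
            return [{"id": t[0], "file_path": t[1], "use_ocr": bool(t[2])} for t in tasks]
        finally:
            self._release_lock()

    def run_distributed_tasks(self) -> None:
        """Run distributed tasks."""
        # Keep fetching pending tasks until none are left
        while True:
            pending_tasks = self.fetch_pending_tasks(limit=2)
            if not pending_tasks:
                break
            # Run each claimed task
            for task in pending_tasks:
                execution_id = str(uuid.uuid4())
                try:
                    # Run the workflow
                    state = PDFProcessState(
                        execution_id=execution_id,
                        file_path=task["file_path"],
                        use_ocr=task["use_ocr"],
                        workflow_status="running"
                    )
                    app = self._build_workflow()
                    app.invoke(state)
                    # Mark the task as completed
                    self._update_task_status(task["id"], "completed", execution_id)
                except Exception as e:
                    # Mark the task as failed
                    self._update_task_status(task["id"], "failed", execution_id, error=str(e))

    def _update_task_status(self, task_id: int, status: str, execution_id: str, error: str = "") -> None:
        """Update the status of a distributed task."""
        self._acquire_lock()
        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                conn.execute('''
                    UPDATE distributed_tasks
                    SET status = ?, execution_id = ?, error_msg = ?, update_time = CURRENT_TIMESTAMP
                    WHERE id = ?
                ''', (status, execution_id, error, task_id))
                conn.commit()
        finally:
            self._release_lock()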
(3) Distributed task management in the Electron front end

tsx

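import { useState, useEffect } from 'react';
import { Button, Table, Upload, Space } from 'antd';

interface DistributedTask {
  id: number;
  file_path: string;
  status: string;
  worker_id?: string;
  error_msg?: string;
}

// window.electronAPI is assumed to be exposed by the preload script;
// drop this declaration if the project already types it elsewhere
declare global {
  interface Window {
    electronAPI: {
      getDistributedTasks: () => Promise<DistributedTask[]>;
      submitDistributedTasks: (tasks: object[]) => Promise<void>;
      startDistributedWorker: () => Promise<void>;
    };
  }
}

const DistributedTaskPanel = () => {
  const [taskList, setTaskList] = useState<DistributedTask[]>([]);
  const [loading, setLoading] = useState(false);

  // Load the distributed task list
  const fetchTasks = async () => {
    setLoading(true);
    try {
      const list = await window.electronAPI.getDistributedTasks();
      setTaskList(list);
    } finally {
      setLoading(false); // clear the spinner even if the call fails
    }
  };

  // Submit a batch of tasks to the shared database
  const uploadTasks = async (files: File[]) => {
    const tasks = files.map(file => ({
      // `path` is an Electron extension on File; newer Electron versions
      // provide webUtils.getPathForFile instead
      file_path: (file as File & { path?: string }).path,
      use_ocr: true,
      status: 'pending'
    }));
    await window.electronAPI.submitDistributedTasks(tasks);
    fetchTasks();
  };

  // Start task execution on this instance
  const startWorker = async () => {
    await window.electronAPI.startDistributedWorker();
    fetchTasks();
  };

  useEffect(() => {
    fetchTasks();
    // Refresh task status periodically
    const timer = setInterval(fetchTasks, 5000);
    return () => clearInterval(timer);
  }, []);

  const columns = [
    { title: 'Task ID', dataIndex: 'id', key: 'id' },
    { title: 'File path', dataIndex: 'file_path', key: 'file_path', ellipsis: true },
    { title: 'Status', dataIndex: 'status', key: 'status' },
    { title: 'Worker ID', dataIndex: 'worker_id', key: 'worker_id' },
    { title: 'Error message', dataIndex: 'error_msg', key: 'error_msg', ellipsis: true },
  ];

  return (
    <Space direction="vertical" size="large" style={{ width: '100%' }}>
      <Space>
        <Upload
          directory
          multiple
          beforeUpload={(file, fileList) => {
            // beforeUpload runs once per file; submit the batch only on the
            // last one so the same files are not submitted repeatedly
            if (file === fileList[fileList.length - 1]) uploadTasks(fileList);
            return false; // block antd's default HTTP upload
          }}
        >
          <Button>Upload a batch of PDF files</Button>
        </Upload>
        <Button type="primary" onClick={startWorker}>Start local worker</Button>
      </Space>
      <Table columns={columns} dataSource={taskList} loading={loading} rowKey="id" />
    </Space>
  );
};

export default DistributedTaskPanel;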
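V. Common Concurrency Problems and Solutions

| Symptom | Root cause | Solutions |
| --- | --- | --- |
| Threads run CPU-bound tasks slowly | The Python GIL prevents threads from using multiple cores | 1. Switch to a process pool (multiprocessing); 2. Move the core computation into C extensions or third-party libraries (such as PyOCR) |
| Distributed tasks are executed more than once | Several instances fetch the same task at the same time because the locking fails | 1. Combine the task status update with the file lock as a double safeguard; 2. Generate a unique worker_id when a task is assigned and bind it to the executing instance; 3. Add a version number to each task |
| State serialization fails in the process pool | An object passed between processes cannot be serialized | 1. Pass only basic data types (dicts, lists); 2. Manage shared data with multiprocessing.Manager; 3. Avoid passing complex objects between processes |
| Concurrent tasks use too much memory | Many tasks load large files or models at the same time | 1. Throttle the task queue (for example, load only 10 tasks at a time); 2. Release resources promptly after each task (close files, unload models); 3. Monitor memory and clean up automatically |
| Task cancellation fails | The thread or process has reached a stage that cannot be interrupted | 1. Add "cancellation checkpoints" inside nodes (for example, check a cancel flag after every PDF page); 2. Use an execution framework that supports interruption (such as asyncio) |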
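The "cancellation checkpoint" idea from the last row can be sketched with a threading.Event that the worker checks before each page. This is a minimal illustration: `extract_pages` is a hypothetical node body, the sleep stands in for real per-page work, and `first_page_done` exists only so the demo cancels after work has demonstrably started.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def extract_pages(page_count: int, cancel_flag: threading.Event,
                  first_page_done: threading.Event) -> Tuple[str, List[int]]:
    """Process pages one by one, checking the cancel flag before each page."""
    processed: List[int] = []
    for page in range(1, page_count + 1):
        if cancel_flag.is_set():   # cancellation checkpoint
            return "cancelled", processed
        time.sleep(0.01)           # stand-in for real per-page work
        processed.append(page)
        first_page_done.set()
    return "completed", processed


cancel_flag = threading.Event()
first_page_done = threading.Event()
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(extract_pages, 500, cancel_flag, first_page_done)
    first_page_done.wait(timeout=5)  # the task is running by now
    cancel_flag.set()                # request cancellation from the outside
    status, pages = future.result(timeout=10)

print(status, len(pages))
```

The task stops at the next checkpoint rather than immediately, so the interval between checks bounds how long a cancel request can take to be honored.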
