[大模型架构] LangGraph AI 工作流编排(18)
针对 CPU 密集型任务(如 OCR 识别、文本处理),视频中使用。视频首先实现了单进程内的多线程并发,基于。你的 AI 助手,助力每日工作学习。Electron实例1。共享SQLite数据库。Electron实例2。Electron实例1。共享SQLite数据库。Electron实例2。
·
一、并发控制的核心价值与设计原则
视频首先明确了 “工作流并发控制” 的本质是 “在多线程 / 进程 / 分布式环境下,安全、高效地并行执行 LangGraph 节点,同时保证状态一致性与资源不超限”,核心价值与设计原则如下:
(一)核心价值
- 效率提升:批量任务(如多文件 PDF 提取)并行执行,耗时从 “串行累加” 变为 “单任务耗时”;
- 资源可控:限制并发数,避免 CPU / 内存 / IO 资源耗尽;
- 分布式扩展:Electron 多实例、多设备间可分布式执行工作流,突破单进程资源限制;
- 故障隔离:单个并发任务失败不影响其他任务执行。
(二)核心设计原则
- 隔离性原则:每个并发工作流实例的 State 独立,避免数据互相覆盖;
- 资源限制原则:通过线程池 / 进程池控制最大并发数(如 CPU 核心数 = 4 则并发数≤4);
- 一致性原则:分布式场景下,核心状态(如任务进度、执行结果)需同步至共享存储;
- 可取消原则:支持随时终止单个并发任务,不影响整体工作流。
二、LangGraph 单进程并发执行实现(核心实操)
视频首先实现了单进程内的多线程并发,基于concurrent.futures.ThreadPoolExecutor管控并发节点执行:
(一)线程池并发工作流封装
python
运行
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from langgraph import Graph
from typing import List, Dict, Any
# 复用之前定义的PDFProcessState和DBWorkflowPersistence
from persistence import PDFProcessState, DBWorkflowPersistence
# 初始化数据库持久化工具
db_persistence = DBWorkflowPersistence()
class ConcurrentWorkflowManager:
"""并发工作流管理器"""
def __init__(self, max_workers: int = 4):
"""
:param max_workers: 最大并发数(建议等于CPU核心数)
"""
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers)
# 存储正在执行的任务(task_id: future)
self.running_tasks: Dict[str, Any] = {}
def build_workflow(self) -> Any:
"""构建基础工作流(复用之前的节点)"""
graph = Graph(PDFProcessState)
graph.add_node("pdf_extract", pdf_extract_node)
graph.add_node("text_clean", text_clean_node)
graph.add_edge("pdf_extract", "text_clean")
graph.set_entry_point("pdf_extract")
graph.set_finish_point("text_clean")
return graph.compile()
def run_single_workflow(self, file_path: str, use_ocr: bool = False) -> PDFProcessState:
"""执行单个工作流(独立State)"""
execution_id = str(uuid.uuid4())
# 初始化独立State
state = PDFProcessState(
execution_id=execution_id,
file_path=file_path,
use_ocr=use_ocr,
current_node="start",
workflow_status="running"
)
# 保存初始快照
db_persistence.save_snapshot(state)
# 执行工作流
app = self.build_workflow()
final_state = app.invoke(state)
# 保存最终快照
db_persistence.save_snapshot(final_state)
return final_state
def run_concurrent_workflows(self, task_list: List[Dict[str, Any]]) -> List[PDFProcessState]:
"""
并发执行多个工作流任务
:param task_list: 任务列表,格式:[{"file_path": "...", "use_ocr": ...}, ...]
:return: 所有任务的执行结果
"""
futures = []
results = []
# 提交并发任务
for task in task_list:
task_id = str(uuid.uuid4())
future = self.executor.submit(self.run_single_workflow, task["file_path"], task.get("use_ocr", False))
self.running_tasks[task_id] = future
futures.append((task_id, future))
# 收集执行结果
for task_id, future in futures:
try:
result = future.result() # 获取任务结果
results.append(result)
except Exception as e:
# 记录失败任务状态
error_state = PDFProcessState(
execution_id=task_id,
file_path=task_list[futures.index((task_id, future))]["file_path"],
workflow_status="failed",
error_message=f"并发执行失败:{str(e)}"
)
db_persistence.save_snapshot(error_state)
results.append(error_state)
finally:
del self.running_tasks[task_id]
return results
def cancel_task(self, task_id: str) -> bool:
"""取消单个正在执行的并发任务"""
if task_id in self.running_tasks:
future = self.running_tasks[task_id]
if not future.done():
cancel_success = future.cancel()
if cancel_success:
# 更新任务状态为已取消
cancel_state = PDFProcessState(
execution_id=task_id,
workflow_status="cancelled",
error_message="任务已手动取消"
)
db_persistence.save_snapshot(cancel_state)
del self.running_tasks[task_id]
return cancel_success
return False
(二)并发任务调用示例
python
运行
# 初始化并发管理器(最大并发数4)
concurrent_manager = ConcurrentWorkflowManager(max_workers=4)
# 批量任务列表(5个PDF提取任务)
task_list = [
{"file_path": "C:/docs/1.pdf", "use_ocr": False},
{"file_path": "C:/docs/2.pdf", "use_ocr": True},
{"file_path": "C:/docs/3.pdf", "use_ocr": False},
{"file_path": "C:/docs/4.pdf", "use_ocr": True},
{"file_path": "C:/docs/5.pdf", "use_ocr": False},
]
# 执行并发任务
results = concurrent_manager.run_concurrent_workflows(task_list)
# 打印执行结果
for result in results:
print(f"任务ID:{result.execution_id},文件:{result.file_path},状态:{result.workflow_status}")
三、进程池并发(突破 GIL 限制)
针对 CPU 密集型任务(如 OCR 识别、文本处理),视频中使用multiprocessing.Pool实现进程级并发,规避 Python GIL(全局解释器锁)的限制:
(一)进程池并发实现
python
运行
import multiprocessing
from multiprocessing import Pool
import os
# 注意:进程池执行的函数需可序列化,建议单独定义
def run_worker_task(task: Dict[str, Any]) -> Dict[str, Any]:
"""进程池工作函数(独立进程执行)"""
# 初始化进程内的持久化工具(每个进程独立连接)
worker_persistence = DBWorkflowPersistence()
execution_id = str(uuid.uuid4())
try:
state = PDFProcessState(
execution_id=execution_id,
file_path=task["file_path"],
use_ocr=task.get("use_ocr", False),
workflow_status="running"
)
# 构建工作流并执行
graph = Graph(PDFProcessState)
graph.add_node("pdf_extract", pdf_extract_node)
graph.add_node("text_clean", text_clean_node)
graph.add_edge("pdf_extract", "text_clean")
app = graph.compile()
final_state = app.invoke(state)
# 保存快照(进程内独立保存)
worker_persistence.save_snapshot(final_state)
return {
"success": True,
"execution_id": execution_id,
"state": final_state.dict() # 转换为字典便于进程间传递
}
except Exception as e:
error_state = PDFProcessState(
execution_id=execution_id,
file_path=task["file_path"],
workflow_status="failed",
error_message=str(e)
)
worker_persistence.save_snapshot(error_state)
return {
"success": False,
"execution_id": execution_id,
"error": str(e)
}
class ProcessPoolWorkflowManager:
"""进程池并发工作流管理器"""
def __init__(self, max_processes: int = None):
"""
:param max_processes: 最大进程数(默认CPU核心数)
"""
self.max_processes = max_processes or multiprocessing.cpu_count()
self.pool = Pool(processes=self.max_processes)
def run_concurrent_tasks(self, task_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""进程池执行并发任务"""
# 提交任务到进程池
results = self.pool.map(run_worker_task, task_list)
# 关闭进程池
self.pool.close()
self.pool.join()
return results
# 调用示例
if __name__ == "__main__": # 进程池必须在main函数中调用
process_manager = ProcessPoolWorkflowManager(max_processes=4)
task_list = [
{"file_path": "C:/docs/1.pdf", "use_ocr": True},
{"file_path": "C:/docs/2.pdf", "use_ocr": True},
]
results = process_manager.run_concurrent_tasks(task_list)
print(results)
四、分布式执行与状态同步(跨 Electron 实例)
视频中实现了基于共享 SQLite 数据库 + 文件锁的分布式执行方案,支持多个 Electron 实例 / 设备并行执行工作流,同时保证状态一致性:
(一)分布式架构设计
预览
查看代码
执行任务/更新状态
执行任务/更新状态
状态同步
状态同步
防止并发写入冲突
Electron实例1
共享SQLite数据库
Electron实例2
文件锁
flowchart LR
A[Electron实例1] -->|执行任务/更新状态| C[共享SQLite数据库]
B[Electron实例2] -->|执行任务/更新状态| C
C -->|状态同步| A
C -->|状态同步| B
D[文件锁] -->|防止并发写入冲突| C
执行任务/更新状态
执行任务/更新状态
状态同步
状态同步
防止并发写入冲突
Electron实例1
共享SQLite数据库
Electron实例2
文件锁
豆包
你的 AI 助手,助力每日工作学习
(二)分布式状态同步实现
python
运行
import fcntl
import os
class DistributedWorkflowManager:
"""分布式工作流管理器(跨实例)"""
def __init__(self, db_path: str = "./shared_workflow_db.sqlite", lock_file: str = "./workflow_lock.lock"):
self.db_path = db_path
self.lock_file = lock_file
self.persistence = DBWorkflowPersistence(db_path)
def _acquire_lock(self) -> None:
"""获取文件锁(防止并发写入)"""
self.lock_fd = open(self.lock_file, "w")
fcntl.flock(self.lock_fd, fcntl.LOCK_EX) # 排他锁
def _release_lock(self) -> None:
"""释放文件锁"""
fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
self.lock_fd.close()
def fetch_pending_tasks(self, limit: int = 5) -> List[Dict[str, Any]]:
"""从共享数据库获取待执行任务(分布式任务分发)"""
self._acquire_lock()
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 查询状态为pending的任务,限制数量(避免单实例抢占过多)
cursor.execute('''
SELECT id, file_path, use_ocr
FROM distributed_tasks
WHERE status = 'pending'
LIMIT ?
''', (limit,))
tasks = cursor.fetchall()
# 将任务标记为running(防止其他实例重复执行)
task_ids = [task[0] for task in tasks]
if task_ids:
cursor.execute(f'''
UPDATE distributed_tasks
SET status = 'running', worker_id = ?
WHERE id IN ({",".join(["?"]*len(task_ids))})
''', (os.getpid(), *task_ids))
conn.commit()
conn.close()
# 转换为任务列表格式
return [{"id": t[0], "file_path": t[1], "use_ocr": t[2]} for t in tasks]
finally:
self._release_lock()
def run_distributed_tasks(self) -> None:
"""执行分布式任务"""
# 循环获取待执行任务,直到无任务
while True:
pending_tasks = self.fetch_pending_tasks(limit=2)
if not pending_tasks:
break
# 执行任务
for task in pending_tasks:
execution_id = str(uuid.uuid4())
try:
# 执行工作流
state = PDFProcessState(
execution_id=execution_id,
file_path=task["file_path"],
use_ocr=task["use_ocr"],
workflow_status="running"
)
app = self._build_workflow()
final_state = app.invoke(state)
# 更新任务状态为completed
self._update_task_status(task["id"], "completed", execution_id)
except Exception as e:
# 更新任务状态为failed
self._update_task_status(task["id"], "failed", execution_id, error=str(e))
def _update_task_status(self, task_id: int, status: str, execution_id: str, error: str = "") -> None:
"""更新分布式任务状态"""
self._acquire_lock()
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
UPDATE distributed_tasks
SET status = ?, execution_id = ?, error_msg = ?, update_time = CURRENT_TIMESTAMP
WHERE id = ?
''', (status, execution_id, error, task_id))
conn.commit()
conn.close()
finally:
self._release_lock()
(三)Electron 前端分布式任务管理
tsx
import { useState, useEffect } from 'react';
import { Button, Table, Input, Upload, Space } from 'antd';
const DistributedTaskPanel = () => {
const [taskList, setTaskList] = useState([]);
const [loading, setLoading] = useState(false);
// 加载分布式任务列表
const fetchTasks = async () => {
setLoading(true);
const list = await window.electronAPI.getDistributedTasks();
setTaskList(list);
setLoading(false);
};
// 提交批量任务到共享数据库
const uploadTasks = async (files) => {
const taskList = files.map(file => ({
file_path: file.path,
use_ocr: true,
status: 'pending'
}));
await window.electronAPI.submitDistributedTasks(taskList);
fetchTasks();
};
// 启动当前实例的任务执行
const startWorker = async () => {
await window.electronAPI.startDistributedWorker();
fetchTasks();
};
useEffect(() => {
fetchTasks();
// 定时刷新任务状态
const timer = setInterval(fetchTasks, 5000);
return () => clearInterval(timer);
}, []);
const columns = [
{ title: '任务ID', dataIndex: 'id', key: 'id' },
{ title: '文件路径', dataIndex: 'file_path', key: 'file_path', ellipsis: true },
{ title: '执行状态', dataIndex: 'status', key: 'status' },
{ title: '执行实例ID', dataIndex: 'worker_id', key: 'worker_id' },
{ title: '错误信息', dataIndex: 'error_msg', key: 'error_msg', ellipsis: true },
];
return (
<Space direction="vertical" size="large" style={{ width: '100%' }}>
<Space>
<Upload directory multiple beforeUpload={(file) => false} onChange={({ fileList }) => uploadTasks(fileList)}>
<Button>上传批量PDF文件</Button>
</Upload>
<Button type="primary" onClick={startWorker}>启动本地执行器</Button>
</Space>
<Table columns={columns} dataSource={taskList} loading={loading} rowKey="id" />
</Space>
);
};
五、并发控制常见问题与解决方案
| 问题现象 | 核心原因 | 解决方案 |
|---|---|---|
| 线程并发执行 CPU 密集任务效率低 | Python GIL 限制,多线程无法利用多核 | 1. 改用进程池(multiprocessing);2. 核心计算逻辑用 C 扩展 / 第三方库(如 PyOCR) |
| 分布式任务重复执行 | 多实例同时获取到同一任务,锁机制失效 | 1. 任务状态更新 + 文件锁双重保障;2. 任务分配时生成唯一 worker_id,绑定执行实例;3. 增加任务版本号 |
| 进程池执行时 State 序列化失败 | 进程间传递的对象不可序列化 | 1. 仅传递基础数据类型(字典 / 列表);2. 使用multiprocessing.Manager管理共享数据;3. 避免在进程间传递复杂对象 |
| 并发任务占用内存过高 | 大量任务同时加载大文件 / 模型 | 1. 任务队列限流(如每次仅加载 10 个任务);2. 执行完成后及时释放资源(关闭文件 / 卸载模型);3. 内存监控 + 自动清理 |
| 任务取消失败 | 线程 / 进程已执行到不可中断阶段 | 1. 在节点中添加 “取消检测点”(如每处理 1 页 PDF 检测一次取消标记);2. 使用可中断的执行框架(如asyncio) |
更多推荐
所有评论(0)