c++线程池
c++实现线程池
1. 线程池核心设计思路
1.1 核心组件
任务队列:存储待执行的任务(用std::function封装任意可调用对象);
工作线程:固定数量的线程,循环从任务队列取任务执行;
同步机制:std::mutex保护任务队列,std::condition_variable实现线程等待 / 唤醒;
控制标志:标记线程池是否停止,避免线程无意义循环。
1.2 核心逻辑
线程池初始化时创建指定数量的工作线程;
外部通过submit提交任务,任务入队后唤醒一个等待的工作线程;
工作线程循环取任务执行,无任务时等待;
线程池销毁时,唤醒所有工作线程,等待线程退出。
2. 完整线程池实现代码
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <future>
// 线程池类
class ThreadPool {
public:
// 构造函数:初始化线程池,指定线程数量
explicit ThreadPool(size_t threadNum = std::thread::hardware_concurrency())
: m_stop(false) {
// 创建指定数量的工作线程
for (size_t i = 0; i < threadNum; ++i) {
m_workers.emplace_back([this]() {
// 工作线程核心循环
while (true) {
std::function<void()> task; // 存储待执行的任务
// 加锁访问任务队列
{
std::unique_lock<std::mutex> lock(m_mutex);
// 等待条件:线程池未停止 且 任务队列为空
m_cv.wait(lock, [this]() {
return m_stop || !m_tasks.empty();
});
// 线程池停止且任务队列为空,退出线程
if (m_stop && m_tasks.empty()) {
return;
}
// 取出任务队列的第一个任务
task = std::move(m_tasks.front());
m_tasks.pop();
} // 解锁
// 执行任务
try {
task();
} catch (const std::exception& e) {
// 捕获任务执行中的异常,避免线程崩溃
std::cerr << "Task execution error: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Unknown task execution error" << std::endl;
}
}
});
}
}
// 禁用拷贝构造和赋值运算符(线程池不可拷贝)
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
// 移动构造(可选,按需启用)
ThreadPool(ThreadPool&&) = default;
ThreadPool& operator=(ThreadPool&&) = default;
// 析构函数:停止线程池,等待所有线程退出
~ThreadPool() {
// 标记线程池停止
{
std::unique_lock<std::mutex> lock(m_mutex);
m_stop = true;
} // 解锁
// 唤醒所有等待的工作线程
m_cv.notify_all();
// 等待所有工作线程退出
for (auto& worker : m_workers) {
if (worker.joinable()) {
worker.join();
}
}
}
// 提交任务:支持任意参数和返回值的函数,返回future获取结果
template <typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
// 封装任务为可调用对象(绑定参数)
using ReturnType = decltype(f(args...));
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
// 获取future,用于外部获取任务执行结果
std::future<ReturnType> res = task->get_future();
// 任务入队
{
std::unique_lock<std::mutex> lock(m_mutex);
// 线程池已停止时,禁止提交任务
if (m_stop) {
throw std::runtime_error("Submit task to stopped ThreadPool");
}
// 将任务放入队列(封装为无参数无返回值的函数)
m_tasks.emplace([task]() {
(*task)();
});
} // 解锁
// 唤醒一个等待的工作线程执行任务
m_cv.notify_one();
return res;
}
// 获取线程池中的线程数量
size_t getThreadNum() const {
return m_workers.size();
}
// 获取当前等待执行的任务数
size_t getTaskNum() const {
std::unique_lock<std::mutex> lock(m_mutex);
return m_tasks.size();
}
private:
std::vector<std::thread> m_workers; // 工作线程列表
std::queue<std::function<void()>> m_tasks;// 任务队列(存储无参数无返回值的可调用对象)
mutable std::mutex m_mutex; // 保护任务队列的互斥锁(mutable允许const函数加锁)
std::condition_variable m_cv; // 条件变量:唤醒等待的工作线程
std::atomic<bool> m_stop; // 线程池停止标志(atomic保证原子操作,避免数据竞争)
};
// 测试用例
int main() {
try {
// 创建线程池,线程数为CPU核心数(默认)
ThreadPool pool(4);
std::cout << "ThreadPool created with " << pool.getThreadNum() << " threads" << std::endl;
// 存储任务的future,用于获取结果
std::vector<std::future<int>> results;
// 提交10个任务
for (int i = 0; i < 10; ++i) {
results.emplace_back(
pool.submit([i]() {
// 模拟任务执行(耗时操作)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << std::endl;
return i * i; // 任务返回值
})
);
}
// 获取并打印所有任务的结果
std::cout << "\nTask results:" << std::endl;
for (auto& res : results) {
std::cout << res.get() << " ";
}
std::cout << std::endl;
// 提交一个会抛出异常的任务
pool.submit([]() {
throw std::runtime_error("Test exception in task");
});
} catch (const std::exception& e) {
std::cerr << "Main thread error: " << e.what() << std::endl;
}
// 线程池析构时自动停止并等待所有线程退出
std::cout << "\nThreadPool destroyed" << std::endl;
return 0;
}
3. 代码河西解释
3.1 关键成员变量
m_workers:存储工作线程的容器,线程池初始化时创建,析构时销毁;
m_tasks:任务队列,用std::function<void()>封装任意任务(通过std::bind/std::packaged_task转换);
m_mutex:保护任务队列的互斥锁,确保多线程下任务入队 / 出队的线程安全;
m_cv:条件变量,工作线程无任务时等待,有新任务时唤醒;
m_stop:原子布尔值,标记线程池是否停止(原子操作避免多线程修改时的数据竞争)。
3.2 核心函数
(1)构造函数
创建指定数量的工作线程,每个线程执行一个「无限循环」:
加锁后等待条件(线程池未停止 或 任务队列非空);
满足条件时取出任务,解锁后执行;
线程池停止且任务队列为空时,退出循环(线程结束)。
(2)submit 函数(核心!)
模板函数,支持任意参数和返回值的任务;
用std::packaged_task封装任务,绑定参数,生成future供外部获取结果;
任务入队前检查线程池是否停止,避免向已停止的线程池提交任务;
任务入队后调用notify_one唤醒一个等待的工作线程。
(3)析构函数
标记m_stop = true(原子操作);
调用notify_all唤醒所有等待的工作线程;
遍历工作线程,调用join等待所有线程退出(避免线程成为僵尸线程)。
3.3 线程安全保障
- 所有对任务队列的操作(入队、出队、获取大小)都加锁保护;
- m_stop用std::atomic,保证多线程下修改 / 读取的原子性;
- 任务执行时捕获异常,避免单个任务崩溃导致整个工作线程退出。
4. 关键特性说明
- 支持任意任务:
通过std::function+ 模板 +std::packaged_task,支持任意参数、任意返回值的函数 /lambda/ 可调用对象;
返回std::future,外部可通过get()获取任务执行结果(会阻塞直到任务完成)。 - 优雅停止:
析构时自动停止线程池,唤醒所有线程,等待线程退出;
工作线程退出前会执行完队列中已有的任务(若需立即停止,可修改逻辑为「丢弃未执行任务」)。 - 异常安全:
任务执行中的异常被捕获,不会导致工作线程崩溃;
向已停止的线程池提交任务会抛出异常,避免无效操作。
5. submit函数详解
submit的核心价值就是让外部能提交任意任务,并通过future异步获取结果,而packaged_task是连接「任务执行」和「结果返回」的桥梁
5.1 submit的核心目标
线程池的submit函数需要实现两个关键功能:
- 接收任意类型的任务:支持不同参数、不同返回值的函数 /lambda/ 可调用对象;
- 异步返回任务结果:任务在工作线程中执行,外部线程(如主线程)能在需要时获取结果(或异常),且不阻塞提交过程。
而std::packaged_task和std::future正是为解决这两个问题而生的 ——packaged_task封装任务并关联结果,future提供获取结果的接口。
5.2 逐行拆解
// 模板参数F是任务类型,Args是任务参数类型
template <typename F, typename... Args>
// 返回值:std::future<任务返回值类型>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
// 1. 推导任务的返回值类型(ReturnType)
using ReturnType = decltype(f(args...));
// 2. 封装任务为packaged_task:把带参数的任务转为无参数的可调用对象
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
// bind:绑定任务和参数,返回一个无参数的可调用对象
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
// 3. 获取future:通过packaged_task获取关联的future,供外部获取结果
std::future<ReturnType> res = task->get_future();
// 4. 任务入队(加锁保证线程安全)
{
std::unique_lock<std::mutex> lock(m_mutex);
// 禁止向已停止的线程池提交任务
if (m_stop) {
throw std::runtime_error("Submit task to stopped ThreadPool");
}
// 5. 将packaged_task封装为void()类型,放入任务队列
m_tasks.emplace([task]() {
(*task)(); // 执行任务,结果会自动存入packaged_task关联的状态中
});
}
// 6. 唤醒一个工作线程执行任务
m_cv.notify_one();
// 7. 返回future,外部通过它获取结果
return res;
}
关键步骤解释:
- 类型推导(ReturnType):
decltype(f(args…))推导任务f执行后的返回值类型(比如f是 { return 10; },则ReturnType是int)。 - packaged_task 封装任务:
线程池的任务队列是std::queue<std::function<void()>>(只能存无参数、无返回值的任务),但外部提交的任务可能有参数 / 返回值 ——std::bind把带参数的任务转为无参数,packaged_task把「无参数任务 + 返回值」封装起来,同时关联一个「结果存储区」。 - future 获取结果:
packaged_task::get_future()返回一个future对象,它和packaged_task共享同一个「结果存储区」—— 当任务执行完成,结果会存入该存储区,外部通过future::get()就能获取。 - 任务入队:
把packaged_task封装为void()类型的 lambda(因为任务队列要求),放入队列后唤醒工作线程。 - 执行任务:
工作线程取出任务执行(*task)()时,任务的返回值会自动存入packaged_task的结果存储区,future就能感知到。
5.3 std::packaged_task 详解
5.3.1 核心作用
std::packaged_task<Ret(Args…)>是一个「任务包装器」,核心功能:
- 封装一个「返回值为 Ret、参数为 Args…」的可调用对象(函数、lambda、仿函数);
- 将任务的执行结果(或异常)存储到一个「共享状态」中;
- 提供get_future()方法,返回一个std::future,用于获取共享状态中的结果。
简单说:packaged_task = 可调用对象 + 结果存储区 + future 关联接口。
5.3.2 基本用法
#include <iostream>
#include <future>
#include <functional>
int main() {
// 1. 定义一个packaged_task,封装「无参数、返回int」的任务
std::packaged_task<int()> task([]() {
return 100; // 任务返回值
});
// 2. 获取关联的future
std::future<int> fut = task.get_future();
// 3. 执行任务(结果会存入共享状态)
task(); // 等价于调用封装的lambda,返回值100存入共享状态
// 4. 通过future获取结果(阻塞直到任务完成)
std::cout << "Result: " << fut.get() << std::endl; // 输出100
return 0;
}
5.3.3 关键特性
- 只能执行一次:一个packaged_task对象只能调用一次(再次调用会抛出异常),因此线程池中用std::shared_ptr封装(因为任务队列可能拷贝,且工作线程执行时需要保证有效性);
- 异常传递:如果任务执行时抛出异常,异常会被捕获并存储到共享状态中,调用future::get()时会重新抛出该异常;
- 类型匹配:packaged_task的模板参数必须和封装的任务类型严格匹配(比如任务是int(int),则packaged_task需是std::packaged_task<int(int)>)。
5.4 std::future 详解
5.4.1 核心作用
std::future是一个「结果获取器」,核心功能:
- 与packaged_task/std::async/std::promise共享同一个「共享状态」;
- 提供接口获取共享状态中的结果(或异常);
- 支持异步等待(阻塞 / 非阻塞检查任务是否完成)。
简单说:future是外部获取异步任务结果的「唯一入口」。
5.4.2 核心方法
方法 作用
- get()
获取任务结果(阻塞当前线程直到任务完成);
若任务抛异常,get()会重新抛出;
只能调用一次(调用后共享状态失效)。 - wait()
阻塞当前线程直到任务完成(不获取结果)。 - wait_for(dur)
阻塞指定时长,返回状态:
future_status::ready(完成)、
future_status::timeout(超时)、
future_status::deferred(任务未开始)。 - valid()
判断 future 是否关联有效的共享状态(get()后返回 false)。
5.4.3 基本用法
#include <iostream>
#include <future>
#include <thread>
int main() {
// 1. 封装任务
std::packaged_task<int(int)> task([](int x) {
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时
return x * x;
});
// 2. 获取future
std::future<int> fut = task.get_future();
// 3. 启动线程执行任务
std::thread t(std::move(task), 10); // packaged_task不可拷贝,需move
t.detach(); // 分离线程(或join,这里演示异步)
// 4. 主线程做其他事
std::cout << "Waiting for task..." << std::endl;
// 5. 获取结果(阻塞直到任务完成)
std::cout << "Result: " << fut.get() << std::endl; // 输出100
return 0;
}
5.5 为什么线程池中要用 shared_ptr 封装 packaged_task?
这是新手最容易困惑的点,核心原因:
- packaged_task 不可拷贝:std::packaged_task的拷贝构造函数被删除,只能移动(std::move);
- 任务队列的要求:线程池的任务队列是std::queue<std::function<void()>>,std::function要求存储的对象可拷贝;
- 生命周期管理:packaged_task需要存活到工作线程执行任务时(若直接 move 到队列,可能提前析构)。
因此,用std::make_shared创建packaged_task的智能指针,lambda 捕获该指针(拷贝指针,而非packaged_task本身),既满足std::function的拷贝要求,又能保证packaged_task的生命周期直到任务执行完成。
5.6 submit 函数中的异常处理
- 提交时异常:线程池已停止(m_stop=true)时提交任务,抛出runtime_error;
- 任务执行异常:工作线程中执行(*task)()时,若任务抛异常,异常会被packaged_task捕获并存储到共享状态,外部调用future::get()时会重新抛出该异常;
- 示例:
// 提交一个抛异常的任务
auto fut = pool.submit([]() {
throw std::runtime_error("Task failed");
});
// 获取结果时会重新抛出异常
try {
fut.get();
} catch (const std::runtime_error& e) {
std::cerr << "Task error: " << e.what() << std::endl; // 输出"Task failed"
}
5.7 总结
submit函数、packaged_task、future的核心关系:
- submit:作为「任务提交入口」,将任意任务封装为线程池可接受的void()类型,返回future供外部获取结果;
- packaged_task:作为「任务 - 结果桥梁」,封装任务并关联共享状态,任务执行结果自动存入共享状态;
- future:作为「结果获取接口」,外部通过它阻塞 / 非阻塞获取任务结果(或异常)。
关键要点: - packaged_task封装任务,解决「任意任务→线程池统一任务类型」的问题;
- future解耦「任务提交」和「结果获取」,实现异步编程;
- 用shared_ptr封装packaged_task,解决其不可拷贝和生命周期问题;
- future::get()是获取结果的唯一方式,且只能调用一次,会传递任务执行时的异常。
更多推荐
所有评论(0)