c++线程池
c++线程池实现
·
目录
1. 核心设计思路
基于生产者 - 消费者模型:主线程提交任务到任务队列,工作线程从队列取任务执行;
线程安全:用互斥锁 + 条件变量保护任务队列,避免竞态条件;
优雅退出:支持等待所有任务完成后停止线程池,或强制终止;
任务类型适配:支持任意参数的函数 /lambda 表达式,返回值可通过 std::future 获取;
资源管理:自动管理线程生命周期,避免内存泄漏。
2. 代码实现
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <stdexcept>
#include <atomic>
// 通用线程池类(C++11 及以上)
class ThreadPool {
public:
// 构造函数:指定线程数(默认CPU核心数)
explicit ThreadPool(size_t thread_num = std::thread::hardware_concurrency())
: is_running_(true) {
if (thread_num == 0) {
thread_num = 1; // 至少1个线程
}
// 创建工作线程
for (size_t i = 0; i < thread_num; ++i) {
workers_.emplace_back([this]() {
// 工作线程主循环
while (is_running_) {
std::function<void()> task;
// 加锁取任务
{
std::unique_lock<std::mutex> lock(this->mtx_);
// 等待条件:线程池运行中 且 任务队列为空
this->cv_.wait(lock, [this]() {
return !this->is_running_ || !this->tasks_.empty();
});
// 线程池停止且任务队列为空,退出线程
if (!this->is_running_ && this->tasks_.empty()) {
return;
}
// 取出队列头部任务
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
// 执行任务(解锁后执行,避免阻塞其他线程取任务)
try {
task();
} catch (const std::exception& e) {
// 捕获任务执行异常,避免线程崩溃
// 可扩展:日志记录、异常回调等
fprintf(stderr, "ThreadPool task exception: %s\n", e.what());
} catch (...) {
fprintf(stderr, "ThreadPool task unknown exception\n");
}
}
});
}
}
// 禁用拷贝和移动(线程池不可拷贝)
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
// 析构函数:优雅停止线程池
~ThreadPool() {
if (is_running_) {
stop(false); // 等待所有任务完成后停止
}
}
// 提交任务到线程池,返回future获取结果
// 支持任意参数的可调用对象(函数、lambda、bind表达式等)
template<class F, class... Args>
auto submit(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
if (!is_running_) {
throw std::runtime_error("submit task to stopped ThreadPool");
}
// 封装任务为std::function<void()>,并绑定参数
using ReturnType = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<ReturnType> res = task->get_future();
// 加锁将任务加入队列
{
std::unique_lock<std::mutex> lock(mtx_);
tasks_.emplace([task]() { (*task)(); });
}
// 通知一个等待的工作线程执行任务
cv_.notify_one();
return res;
}
// 停止线程池
// force: true=强制终止(立即停止,未执行的任务丢弃);false=优雅停止(等待所有任务完成)
void stop(bool force = false) {
{
std::unique_lock<std::mutex> lock(mtx_);
is_running_ = false;
if (force) {
// 强制终止:清空任务队列
while (!tasks_.empty()) {
tasks_.pop();
}
}
}
// 唤醒所有工作线程
cv_.notify_all();
// 等待所有线程退出
for (std::thread& worker : workers_) {
if (worker.joinable()) {
worker.join();
}
}
}
// 获取当前线程池的线程数
size_t get_thread_num() const {
return workers_.size();
}
// 获取当前等待执行的任务数
size_t get_pending_task_num() const {
std::unique_lock<std::mutex> lock(mtx_);
return tasks_.size();
}
private:
// 工作线程列表
std::vector<std::thread> workers_;
// 任务队列(存储无参数无返回值的可调用对象)
std::queue<std::function<void()>> tasks_;
// 保护任务队列的互斥锁
mutable std::mutex mtx_;
// 条件变量:通知线程有任务/停止
std::condition_variable cv_;
// 线程池运行状态
std::atomic<bool> is_running_;
};
#endif // THREAD_POOL_H
3. 编译与运行
3.1 编译命令
g++ -std=c++11 main.cpp -o thread_pool_demo -pthread
3.2 运行
./thread_pool_demo
3.3 预期输出
ThreadPool created with 4 threads
Thread 140709280656128: Hello ThreadPool!
Thread 140709272263424: Hello ThreadPool!
Thread 140709263870720: Hello ThreadPool!
Thread 140709255478016: Hello ThreadPool!
Thread 140709280656128: Calculating 0 + 1
Thread 140709272263424: Calculating 1 + 2
Thread 140709263870720: Calculating 2 + 3
Add result: 1
Add result: 3
Add result: 5
Thread 140709255478016: Lambda task in ThreadPool
Lambda task return: 24
Pending tasks: 1
Thread 140709280656128: Hello ThreadPool!
ThreadPool stopped gracefully
4. 核心功能解析
4.1. 任务提交(submit 函数)
模板函数支持任意可调用对象(普通函数、lambda、bind 表达式、成员函数);
通过 std::packaged_task 封装任务,std::future 获取返回值,支持异步获取结果;
参数通过 std::forward 完美转发,避免拷贝开销。
4.2. 线程安全保障
std::mutex 保护任务队列的读写,避免多线程同时操作队列;
std::condition_variable 实现 “无任务时线程休眠,有任务时唤醒”,避免忙等;
std::atomic 标记线程池运行状态,保证多线程下的原子操作。
4.3. 优雅退出机制
析构函数自动调用 stop(false),确保线程池销毁时等待所有任务完成;
stop 函数支持两种模式:
force=false:等待队列中所有任务执行完毕后停止线程;
force=true:立即清空队列并停止线程,适合紧急终止场景。
4.4. 异常处理
任务执行时捕获所有异常,避免单个任务崩溃导致整个线程退出;
提交任务时检查线程池状态,已停止的线程池拒绝提交任务并抛出异常。
更多推荐
所有评论(0)