1. 核心设计思路

基于生产者 - 消费者模型:主线程提交任务到任务队列,工作线程从队列取任务执行;
线程安全:用互斥锁 + 条件变量保护任务队列,避免竞态条件;
优雅退出:支持等待所有任务完成后停止线程池,或强制终止;
任务类型适配:支持任意参数的函数 /lambda 表达式,返回值可通过 std::future 获取;
资源管理:自动管理线程生命周期,避免内存泄漏。

2. 代码实现

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <stdexcept>
#include <atomic>

// 通用线程池类(C++11 及以上)
class ThreadPool {
public:
    // 构造函数:指定线程数(默认CPU核心数)
    explicit ThreadPool(size_t thread_num = std::thread::hardware_concurrency()) 
        : is_running_(true) {
        if (thread_num == 0) {
            thread_num = 1; // 至少1个线程
        }
        // 创建工作线程
        for (size_t i = 0; i < thread_num; ++i) {
            workers_.emplace_back([this]() {
                // 工作线程主循环
                while (is_running_) {
                    std::function<void()> task;
                    // 加锁取任务
                    {
                        std::unique_lock<std::mutex> lock(this->mtx_);
                        // 等待条件:线程池运行中 且 任务队列为空
                        this->cv_.wait(lock, [this]() {
                            return !this->is_running_ || !this->tasks_.empty();
                        });
                        // 线程池停止且任务队列为空,退出线程
                        if (!this->is_running_ && this->tasks_.empty()) {
                            return;
                        }
                        // 取出队列头部任务
                        task = std::move(this->tasks_.front());
                        this->tasks_.pop();
                    }
                    // 执行任务(解锁后执行,避免阻塞其他线程取任务)
                    try {
                        task();
                    } catch (const std::exception& e) {
                        // 捕获任务执行异常,避免线程崩溃
                        // 可扩展:日志记录、异常回调等
                        fprintf(stderr, "ThreadPool task exception: %s\n", e.what());
                    } catch (...) {
                        fprintf(stderr, "ThreadPool task unknown exception\n");
                    }
                }
            });
        }
    }

    // 禁用拷贝和移动(线程池不可拷贝)
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;

    // 析构函数:优雅停止线程池
    ~ThreadPool() {
        if (is_running_) {
            stop(false); // 等待所有任务完成后停止
        }
    }

    // 提交任务到线程池,返回future获取结果
    // 支持任意参数的可调用对象(函数、lambda、bind表达式等)
    template<class F, class... Args>
    auto submit(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
        if (!is_running_) {
            throw std::runtime_error("submit task to stopped ThreadPool");
        }

        // 封装任务为std::function<void()>,并绑定参数
        using ReturnType = typename std::result_of<F(Args...)>::type;
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<ReturnType> res = task->get_future();
        // 加锁将任务加入队列
        {
            std::unique_lock<std::mutex> lock(mtx_);
            tasks_.emplace([task]() { (*task)(); });
        }
        // 通知一个等待的工作线程执行任务
        cv_.notify_one();
        return res;
    }

    // 停止线程池
    // force: true=强制终止(立即停止,未执行的任务丢弃);false=优雅停止(等待所有任务完成)
    void stop(bool force = false) {
        {
            std::unique_lock<std::mutex> lock(mtx_);
            is_running_ = false;
            if (force) {
                // 强制终止:清空任务队列
                while (!tasks_.empty()) {
                    tasks_.pop();
                }
            }
        }
        // 唤醒所有工作线程
        cv_.notify_all();
        // 等待所有线程退出
        for (std::thread& worker : workers_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

    // 获取当前线程池的线程数
    size_t get_thread_num() const {
        return workers_.size();
    }

    // 获取当前等待执行的任务数
    size_t get_pending_task_num() const {
        std::unique_lock<std::mutex> lock(mtx_);
        return tasks_.size();
    }

private:
    // 工作线程列表
    std::vector<std::thread> workers_;
    // 任务队列(存储无参数无返回值的可调用对象)
    std::queue<std::function<void()>> tasks_;
    // 保护任务队列的互斥锁
    mutable std::mutex mtx_;
    // 条件变量:通知线程有任务/停止
    std::condition_variable cv_;
    // 线程池运行状态
    std::atomic<bool> is_running_;
};

#endif // THREAD_POOL_H

3. 编译与运行

3.1 编译命令

g++ -std=c++11 main.cpp -o thread_pool_demo -pthread

3.2 运行

./thread_pool_demo

3.3 预期输出

ThreadPool created with 4 threads
Thread 140709280656128: Hello ThreadPool!
Thread 140709272263424: Hello ThreadPool!
Thread 140709263870720: Hello ThreadPool!
Thread 140709255478016: Hello ThreadPool!
Thread 140709280656128: Calculating 0 + 1
Thread 140709272263424: Calculating 1 + 2
Thread 140709263870720: Calculating 2 + 3
Add result: 1
Add result: 3
Add result: 5
Thread 140709255478016: Lambda task in ThreadPool
Lambda task return: 24
Pending tasks: 1
Thread 140709280656128: Hello ThreadPool!
ThreadPool stopped gracefully

4. 核心功能解析

4.1. 任务提交(submit 函数)

模板函数支持任意可调用对象(普通函数、lambda、bind 表达式、成员函数);
通过 std::packaged_task 封装任务,std::future 获取返回值,支持异步获取结果;
参数通过 std::forward 完美转发,避免拷贝开销。

4.2. 线程安全保障

std::mutex 保护任务队列的读写,避免多线程同时操作队列;
std::condition_variable 实现 “无任务时线程休眠,有任务时唤醒”,避免忙等;
std::atomic 标记线程池运行状态,保证多线程下的原子操作。

4.3. 优雅退出机制

析构函数自动调用 stop(false),确保线程池销毁时等待所有任务完成;
stop 函数支持两种模式:
force=false:等待队列中所有任务执行完毕后停止线程;
force=true:立即清空队列并停止线程,适合紧急终止场景。

4.4. 异常处理

任务执行时捕获所有异常,避免单个任务崩溃导致整个线程退出;
提交任务时检查线程池状态,已停止的线程池拒绝提交任务并抛出异常。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐