1. 线程池核心设计思路

1.1 核心组件

任务队列:存储待执行的任务(用std::function封装任意可调用对象);
工作线程:固定数量的线程,循环从任务队列取任务执行;
同步机制:std::mutex保护任务队列,std::condition_variable实现线程等待 / 唤醒;
控制标志:标记线程池是否停止,避免线程无意义循环。

1.2 核心逻辑

线程池初始化时创建指定数量的工作线程;
外部通过submit提交任务,任务入队后唤醒一个等待的工作线程;
工作线程循环取任务执行,无任务时等待;
线程池销毁时,唤醒所有工作线程,等待线程退出。

2. 完整线程池实现代码

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <future>

// 线程池类
class ThreadPool {
public:
    // 构造函数:初始化线程池,指定线程数量
    explicit ThreadPool(size_t threadNum = std::thread::hardware_concurrency()) 
        : m_stop(false) {
        // 创建指定数量的工作线程
        for (size_t i = 0; i < threadNum; ++i) {
            m_workers.emplace_back([this]() {
                // 工作线程核心循环
                while (true) {
                    std::function<void()> task; // 存储待执行的任务

                    // 加锁访问任务队列
                    {
                        std::unique_lock<std::mutex> lock(m_mutex);
                        // 等待条件:线程池未停止 且 任务队列为空
                        m_cv.wait(lock, [this]() {
                            return m_stop || !m_tasks.empty();
                        });

                        // 线程池停止且任务队列为空,退出线程
                        if (m_stop && m_tasks.empty()) {
                            return;
                        }

                        // 取出任务队列的第一个任务
                        task = std::move(m_tasks.front());
                        m_tasks.pop();
                    } // 解锁

                    // 执行任务
                    try {
                        task();
                    } catch (const std::exception& e) {
                        // 捕获任务执行中的异常,避免线程崩溃
                        std::cerr << "Task execution error: " << e.what() << std::endl;
                    } catch (...) {
                        std::cerr << "Unknown task execution error" << std::endl;
                    }
                }
            });
        }
    }

    // 禁用拷贝构造和赋值运算符(线程池不可拷贝)
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // 移动构造(可选,按需启用)
    ThreadPool(ThreadPool&&) = default;
    ThreadPool& operator=(ThreadPool&&) = default;

    // 析构函数:停止线程池,等待所有线程退出
    ~ThreadPool() {
        // 标记线程池停止
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_stop = true;
        } // 解锁

        // 唤醒所有等待的工作线程
        m_cv.notify_all();

        // 等待所有工作线程退出
        for (auto& worker : m_workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

    // 提交任务:支持任意参数和返回值的函数,返回future获取结果
    template <typename F, typename... Args>
    auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        // 封装任务为可调用对象(绑定参数)
        using ReturnType = decltype(f(args...));
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        // 获取future,用于外部获取任务执行结果
        std::future<ReturnType> res = task->get_future();

        // 任务入队
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            // 线程池已停止时,禁止提交任务
            if (m_stop) {
                throw std::runtime_error("Submit task to stopped ThreadPool");
            }

            // 将任务放入队列(封装为无参数无返回值的函数)
            m_tasks.emplace([task]() {
                (*task)();
            });
        } // 解锁

        // 唤醒一个等待的工作线程执行任务
        m_cv.notify_one();

        return res;
    }

    // 获取线程池中的线程数量
    size_t getThreadNum() const {
        return m_workers.size();
    }

    // 获取当前等待执行的任务数
    size_t getTaskNum() const {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_tasks.size();
    }

private:
    std::vector<std::thread> m_workers;       // 工作线程列表
    std::queue<std::function<void()>> m_tasks;// 任务队列(存储无参数无返回值的可调用对象)
    mutable std::mutex m_mutex;               // 保护任务队列的互斥锁(mutable允许const函数加锁)
    std::condition_variable m_cv;             // 条件变量:唤醒等待的工作线程
    std::atomic<bool> m_stop;                 // 线程池停止标志(atomic保证原子操作,避免数据竞争)
};

// 测试用例
int main() {
    try {
        // 创建线程池,线程数为CPU核心数(默认)
        ThreadPool pool(4);
        std::cout << "ThreadPool created with " << pool.getThreadNum() << " threads" << std::endl;

        // 存储任务的future,用于获取结果
        std::vector<std::future<int>> results;

        // 提交10个任务
        for (int i = 0; i < 10; ++i) {
            results.emplace_back(
                pool.submit([i]() {
                    // 模拟任务执行(耗时操作)
                    std::this_thread::sleep_for(std::chrono::milliseconds(100));
                    std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << std::endl;
                    return i * i; // 任务返回值
                })
            );
        }

        // 获取并打印所有任务的结果
        std::cout << "\nTask results:" << std::endl;
        for (auto& res : results) {
            std::cout << res.get() << " ";
        }
        std::cout << std::endl;

        // 提交一个会抛出异常的任务
        pool.submit([]() {
            throw std::runtime_error("Test exception in task");
        });

    } catch (const std::exception& e) {
        std::cerr << "Main thread error: " << e.what() << std::endl;
    }

    // 线程池析构时自动停止并等待所有线程退出
    std::cout << "\nThreadPool destroyed" << std::endl;
    return 0;
}

3. 代码河西解释

3.1 关键成员变量

m_workers:存储工作线程的容器,线程池初始化时创建,析构时销毁;
m_tasks:任务队列,用std::function<void()>封装任意任务(通过std::bind/std::packaged_task转换);
m_mutex:保护任务队列的互斥锁,确保多线程下任务入队 / 出队的线程安全;
m_cv:条件变量,工作线程无任务时等待,有新任务时唤醒;
m_stop:原子布尔值,标记线程池是否停止(原子操作避免多线程修改时的数据竞争)。

3.2 核心函数

(1)构造函数
创建指定数量的工作线程,每个线程执行一个「无限循环」:
加锁后等待条件(线程池未停止 或 任务队列非空);
满足条件时取出任务,解锁后执行;
线程池停止且任务队列为空时,退出循环(线程结束)。
(2)submit 函数(核心!)
模板函数,支持任意参数和返回值的任务;
用std::packaged_task封装任务,绑定参数,生成future供外部获取结果;
任务入队前检查线程池是否停止,避免向已停止的线程池提交任务;
任务入队后调用notify_one唤醒一个等待的工作线程。
(3)析构函数
标记m_stop = true(原子操作);
调用notify_all唤醒所有等待的工作线程;
遍历工作线程,调用join等待所有线程退出(避免线程成为僵尸线程)。

3.3 线程安全保障

  • 所有对任务队列的操作(入队、出队、获取大小)都加锁保护;
  • m_stop用std::atomic,保证多线程下修改 / 读取的原子性;
  • 任务执行时捕获异常,避免单个任务崩溃导致整个工作线程退出。

4. 关键特性说明

  • 支持任意任务:
    通过std::function+ 模板 +std::packaged_task,支持任意参数、任意返回值的函数 /lambda/ 可调用对象;
    返回std::future,外部可通过get()获取任务执行结果(会阻塞直到任务完成)。
  • 优雅停止:
    析构时自动停止线程池,唤醒所有线程,等待线程退出;
    工作线程退出前会执行完队列中已有的任务(若需立即停止,可修改逻辑为「丢弃未执行任务」)。
  • 异常安全:
    任务执行中的异常被捕获,不会导致工作线程崩溃;
    向已停止的线程池提交任务会抛出异常,避免无效操作。

5. submit函数详解

submit的核心价值就是让外部能提交任意任务,并通过future异步获取结果,而packaged_task是连接「任务执行」和「结果返回」的桥梁

5.1 submit的核心目标

线程池的submit函数需要实现两个关键功能:

  • 接收任意类型的任务:支持不同参数、不同返回值的函数 /lambda/ 可调用对象;
  • 异步返回任务结果:任务在工作线程中执行,外部线程(如主线程)能在需要时获取结果(或异常),且不阻塞提交过程。
    而std::packaged_task和std::future正是为解决这两个问题而生的 ——packaged_task封装任务并关联结果,future提供获取结果的接口。

5.2 逐行拆解

// 模板参数F是任务类型,Args是任务参数类型
template <typename F, typename... Args>
// 返回值:std::future<任务返回值类型>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
    // 1. 推导任务的返回值类型(ReturnType)
    using ReturnType = decltype(f(args...));

    // 2. 封装任务为packaged_task:把带参数的任务转为无参数的可调用对象
    auto task = std::make_shared<std::packaged_task<ReturnType()>>(
        // bind:绑定任务和参数,返回一个无参数的可调用对象
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    // 3. 获取future:通过packaged_task获取关联的future,供外部获取结果
    std::future<ReturnType> res = task->get_future();

    // 4. 任务入队(加锁保证线程安全)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // 禁止向已停止的线程池提交任务
        if (m_stop) {
            throw std::runtime_error("Submit task to stopped ThreadPool");
        }

        // 5. 将packaged_task封装为void()类型,放入任务队列
        m_tasks.emplace([task]() {
            (*task)(); // 执行任务,结果会自动存入packaged_task关联的状态中
        });
    }

    // 6. 唤醒一个工作线程执行任务
    m_cv.notify_one();

    // 7. 返回future,外部通过它获取结果
    return res;
}

关键步骤解释:

  • 类型推导(ReturnType):
    decltype(f(args…))推导任务f执行后的返回值类型(比如f是 { return 10; },则ReturnType是int)。
  • packaged_task 封装任务:
    线程池的任务队列是std::queue<std::function<void()>>(只能存无参数、无返回值的任务),但外部提交的任务可能有参数 / 返回值 ——std::bind把带参数的任务转为无参数,packaged_task把「无参数任务 + 返回值」封装起来,同时关联一个「结果存储区」。
  • future 获取结果:
    packaged_task::get_future()返回一个future对象,它和packaged_task共享同一个「结果存储区」—— 当任务执行完成,结果会存入该存储区,外部通过future::get()就能获取。
  • 任务入队:
    把packaged_task封装为void()类型的 lambda(因为任务队列要求),放入队列后唤醒工作线程。
  • 执行任务:
    工作线程取出任务执行(*task)()时,任务的返回值会自动存入packaged_task的结果存储区,future就能感知到。

5.3 std::packaged_task 详解

5.3.1 核心作用

std::packaged_task<Ret(Args…)>是一个「任务包装器」,核心功能:

  • 封装一个「返回值为 Ret、参数为 Args…」的可调用对象(函数、lambda、仿函数);
  • 将任务的执行结果(或异常)存储到一个「共享状态」中;
  • 提供get_future()方法,返回一个std::future,用于获取共享状态中的结果。
    简单说:packaged_task = 可调用对象 + 结果存储区 + future 关联接口。

5.3.2 基本用法

#include <iostream>
#include <future>
#include <functional>

int main() {
    // 1. 定义一个packaged_task,封装「无参数、返回int」的任务
    std::packaged_task<int()> task([]() {
        return 100; // 任务返回值
    });

    // 2. 获取关联的future
    std::future<int> fut = task.get_future();

    // 3. 执行任务(结果会存入共享状态)
    task(); // 等价于调用封装的lambda,返回值100存入共享状态

    // 4. 通过future获取结果(阻塞直到任务完成)
    std::cout << "Result: " << fut.get() << std::endl; // 输出100

    return 0;
}

5.3.3 关键特性

  • 只能执行一次:一个packaged_task对象只能调用一次(再次调用会抛出异常),因此线程池中用std::shared_ptr封装(因为任务队列可能拷贝,且工作线程执行时需要保证有效性);
  • 异常传递:如果任务执行时抛出异常,异常会被捕获并存储到共享状态中,调用future::get()时会重新抛出该异常;
  • 类型匹配:packaged_task的模板参数必须和封装的任务类型严格匹配(比如任务是int(int),则packaged_task需是std::packaged_task<int(int)>)。

5.4 std::future 详解

5.4.1 核心作用

std::future是一个「结果获取器」,核心功能:

  • 与packaged_task/std::async/std::promise共享同一个「共享状态」;
  • 提供接口获取共享状态中的结果(或异常);
  • 支持异步等待(阻塞 / 非阻塞检查任务是否完成)。
    简单说:future是外部获取异步任务结果的「唯一入口」。

5.4.2 核心方法

方法 作用

  • get()
    获取任务结果(阻塞当前线程直到任务完成);
    若任务抛异常,get()会重新抛出;
    只能调用一次(调用后共享状态失效)。
  • wait()
    阻塞当前线程直到任务完成(不获取结果)。
  • wait_for(dur)
    阻塞指定时长,返回状态:
    future_status::ready(完成)、
    future_status::timeout(超时)、
    future_status::deferred(任务未开始)。
  • valid()
    判断 future 是否关联有效的共享状态(get()后返回 false)。

5.4.3 基本用法

#include <iostream>
#include <future>
#include <thread>

int main() {
    // 1. 封装任务
    std::packaged_task<int(int)> task([](int x) {
        std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时
        return x * x;
    });

    // 2. 获取future
    std::future<int> fut = task.get_future();

    // 3. 启动线程执行任务
    std::thread t(std::move(task), 10); // packaged_task不可拷贝,需move
    t.detach(); // 分离线程(或join,这里演示异步)

    // 4. 主线程做其他事
    std::cout << "Waiting for task..." << std::endl;

    // 5. 获取结果(阻塞直到任务完成)
    std::cout << "Result: " << fut.get() << std::endl; // 输出100

    return 0;
}

5.5 为什么线程池中要用 shared_ptr 封装 packaged_task?

这是新手最容易困惑的点,核心原因:

  • packaged_task 不可拷贝:std::packaged_task的拷贝构造函数被删除,只能移动(std::move);
  • 任务队列的要求:线程池的任务队列是std::queue<std::function<void()>>,std::function要求存储的对象可拷贝;
  • 生命周期管理:packaged_task需要存活到工作线程执行任务时(若直接 move 到队列,可能提前析构)。
    因此,用std::make_shared创建packaged_task的智能指针,lambda 捕获该指针(拷贝指针,而非packaged_task本身),既满足std::function的拷贝要求,又能保证packaged_task的生命周期直到任务执行完成。

5.6 submit 函数中的异常处理

  • 提交时异常:线程池已停止(m_stop=true)时提交任务,抛出runtime_error;
  • 任务执行异常:工作线程中执行(*task)()时,若任务抛异常,异常会被packaged_task捕获并存储到共享状态,外部调用future::get()时会重新抛出该异常;
  • 示例:
// 提交一个抛异常的任务
auto fut = pool.submit([]() {
    throw std::runtime_error("Task failed");
});

// 获取结果时会重新抛出异常
try {
    fut.get();
} catch (const std::runtime_error& e) {
    std::cerr << "Task error: " << e.what() << std::endl; // 输出"Task failed"
}

5.7 总结

submit函数、packaged_task、future的核心关系:

  • submit:作为「任务提交入口」,将任意任务封装为线程池可接受的void()类型,返回future供外部获取结果;
  • packaged_task:作为「任务 - 结果桥梁」,封装任务并关联共享状态,任务执行结果自动存入共享状态;
  • future:作为「结果获取接口」,外部通过它阻塞 / 非阻塞获取任务结果(或异常)。
    关键要点:
  • packaged_task封装任务,解决「任意任务→线程池统一任务类型」的问题;
  • future解耦「任务提交」和「结果获取」,实现异步编程;
  • 用shared_ptr封装packaged_task,解决其不可拷贝和生命周期问题;
  • future::get()是获取结果的唯一方式,且只能调用一次,会传递任务执行时的异常。
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐