C++线程池

本文最后更新于:几秒前

单缓冲队列线程池

在并发数量不高,或者多生产者单消费者的情况下,单个队列足够满足使用要求。常见的线程池设计如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

class ThreadPool {
public:
using Task = std::function<void()>;

explicit ThreadPool(size_t numThreads) : stop_(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers_.emplace_back([this] {
while (true) {
Task task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });

if (stop_ && tasks_.empty()) return; // 线程退出

task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}

template <typename F, typename... Args>
void submit(F&& f, Args&&... args){
auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
{
std::lock_guard<std::mutex> lock(mtx_);
tasks_.emplace(task);
}
cv_.notify_one();
}

~ThreadPool() {
{
std::lock_guard<std::mutex> lock(mtx_);
stop_ = true;
}
cv_.notify_all(); // 唤醒所有等待线程,准备退出
for (auto& worker : workers_)
{
if (worker.joinable()) {
worker.join();
}
}
}

private:
std::vector<std::thread> workers_;
std::queue<Task> tasks_;
std::mutex mtx_;
std::condition_variable cv_;
bool stop_;
};

一般会将目标函数使用std::bind包装成function对象, 但是如果希望在外层还能够获取线程的运行结果或状态,就需要使用future包装。主要需要修改submit函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
template <typename F, typename... Args>
auto submit(F&& f, Args&&... args) {
using ReturnType = std::invoke_result_t<F, Args...>;
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));

std::future<ReturnType> res = task->get_future();
{
std::lock_guard<std::mutex> lock(mtx_);
if (stop_) throw std::runtime_error("ThreadPool is stopped!");
tasks_.emplace([task]() { (*task)(); });
}
cv_.notify_one();
return res;
}

重点在于submit函数,首先使用了可变参数模板,技术点如下:

  • C++11 之前,返回类型必须写在 template 函数的 auto 之前,但是此处submit函数的返回值依赖于 F(Args...) 的推导,而模板参数在 F&& f, Args&&... args 解析之前 并不知道 F(Args...) 的返回类型。所以c++11之前没法这么写

  • C++11 引入 尾置返回类型(trailing return type),允许先解析 F(Args...) 再确定返回值

    1
    2
    3
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
  • 在 C++14 中,引入了 返回类型自动推导(return type deduction),可以直接写:

    1
    2
    3
    4
    5
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) {
    using ReturnType = typename std::result_of<F(Args...)>::type;
    return std::future<ReturnType>{};
    }
  • 在 C++17 及之后,std::invoke_result_t<F, Args...> 取代了 std::result_of,推荐用法:

    1
    2
    3
    4
    5
    6
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) {
    using ReturnType = std::invoke_result_t<F, Args...>;
    std::packaged_task<ReturnType()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    return task.get_future();
    }

此外,执行线程和提交任务时也可以加上异常处理。


综上,完整的线程池示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

class ThreadPool {
public:
using Task = std::function<void()>;

explicit ThreadPool(size_t numThreads) : stop_(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers_.emplace_back([this] {
while (true) {
Task task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });

if (stop_ && tasks_.empty()) return; // 线程退出

task = std::move(tasks_.front());
tasks_.pop();
}
try {
task();
} catch (...) {
// 捕获异常,防止线程崩溃
}
}
});
}
}

template <typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>> {
using ReturnType = std::invoke_result_t<F, Args...>;
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));

std::future<ReturnType> res = task->get_future();
{
std::lock_guard<std::mutex> lock(mtx_);
if (stop_) throw std::runtime_error("ThreadPool is stopped!");
tasks_.emplace([task]() { (*task)(); });
}
cv_.notify_one();
return res;
}

~ThreadPool() {
{
std::lock_guard<std::mutex> lock(mtx_);
stop_ = true;
}
cv_.notify_all(); // 唤醒所有等待线程,准备退出
for (auto& worker : workers_)
{
if (worker.joinable()) {
worker.join();
}
}
}

private:
std::vector<std::thread> workers_;
std::queue<Task> tasks_;
std::mutex mtx_;
std::condition_variable cv_;
bool stop_;
};

双缓冲队列线程池

从上述代码可以看到,使用单缓冲队列时生产者和消费者共用一把锁,如果并发数较高时,容易发生碰撞。因此对于多生产者多消费者的场景,可以增加一个缓冲队列,生产者线程和消费者线程分别从一个队列中操作任务,当消费者队列空时,交换两个队列。并且使用两把锁,只有在交换队列时会同时锁住,减少碰撞。

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <stdexcept>
#include <utility>

class ThreadPool {
public:
using Task = std::function<void()>;

// 构造时指定线程数
explicit ThreadPool(size_t numThreads)
: stop_(false)
{
for (size_t i = 0; i < numThreads; ++i) {
workers_.emplace_back([this] {
while (true) {
Task task;
{
// 先锁定消费者队列
std::unique_lock<std::mutex> cons_lock(cons_mtx_);
// 等待条件:停止标志为 true 或者消费队列不空,或者生产队列有任务
cv_.wait(cons_lock, [this] {
return stop_ || !consQueue_.empty() || !prodQueue_.empty();
});

// 如果消费队列为空,尝试从生产队列中交换任务到消费队列
if (consQueue_.empty()) {
// 锁定生产者队列进行交换
std::lock_guard<std::mutex> prod_lock(prod_mtx_);
if (!prodQueue_.empty()) {
std::swap(prodQueue_, consQueue_);
}
}

// 如果停止且消费队列为空,则退出线程
if (stop_ && consQueue_.empty())
return;

// 如果此时消费队列仍然为空,则继续等待
if (consQueue_.empty())
continue;

// 从消费队列中取出任务
task = std::move(consQueue_.front());
consQueue_.pop();
} // 解锁 cons_mtx_

// 执行任务
try {
task();
} catch (...) {
// 捕获异常,防止线程因任务异常退出
}
}
});
}
}

// 提交任务,返回一个 std::future 用于获取结果
template <typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>> {
using ReturnType = std::invoke_result_t<F, Args...>;
// 使用 std::packaged_task 包装函数及其参数
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<ReturnType> res = task->get_future();
{
// 生产者只锁定生产者队列
std::lock_guard<std::mutex> lock(prod_mtx_);
if (stop_)
throw std::runtime_error("ThreadPool is stopped!");

// 将任务推入生产队列
prodQueue_.push([task]() { (*task)(); });
}
cv_.notify_one(); // 通知等待的消费者线程
return res;
}

~ThreadPool() {
{
// 锁定消费者锁设置停止标志
std::lock_guard<std::mutex> lock(cons_mtx_);
stop_ = true;
}
cv_.notify_all(); // 唤醒所有线程,使它们退出
for (auto& worker : workers_)
worker.join();
}

private:
std::vector<std::thread> workers_;
// 两个独立的任务队列:
// prodQueue_:生产者写入的队列;
// consQueue_:消费者读取的队列。
std::queue<Task> prodQueue_;
std::queue<Task> consQueue_;

// 两把锁分别保护生产者队列和消费者队列
std::mutex prod_mtx_;
std::mutex cons_mtx_;
std::condition_variable cv_;
bool stop_;
};

测试代码:

交带参数的任务,并接受返回值

1
2
3
4
5
6
7
8
9
10
11
12
int main() {
ThreadPool pool(4);

// 任务函数带参数
auto future = pool.submit([](int a, int b) {
return a + b;
}, 10, 20);

std::cout << "Sum: " << future.get() << std::endl; // 输出 30

return 0;
}

输出

1
Sum: 30

多个任务并行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main() {
ThreadPool pool(4);

std::vector<std::future<int>> results;
for (int i = 0; i < 10; ++i) {
results.emplace_back(pool.submit([i] {
return i * i;
}));
}

for (auto& f : results) {
std::cout << f.get() << " "; // 输出 0 1 4 9 16 25 36 49 64 81
}
std::cout << std::endl;

return 0;
}

输出

1
0 1 4 9 16 25 36 49 64 81

在线程池析构前等待任务完成

线程池会在析构时自动等待所有线程完成,因此 main() 结束时不需要额外等待。但如果你想手动等待任务完成,可以使用 std::future::get()。

线程池安全退出

如果线程池被销毁,submit() 不能再调用:

1
2
3
4
5
{
ThreadPool pool(2);
auto future = pool.submit([] { return 123; });
std::cout << future.get() << std::endl;
} // 线程池在这里析构

如果你在线程池析构后再调用 submit(),会抛出异常:

1
2
3
ThreadPool* pool = new ThreadPool(2);
delete pool; // 线程池析构
pool->submit([] { return 456; }); // 这里会抛异常!