看到有些面经说被要求手撕线程池啊,多少有点吓人

所以感觉准备了一下,虽然感觉也背不下来 QAQ

C++17 实现,纯代码 80 行左右,可以实际运行,参考文献在文末。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include <iostream>  
#include <thread>
#include <condition_variable>
#include <mutex>
#include <vector>
#include <future>
#include <functional>
#include <queue>
#include <array>

using namespace std;

// 线程池类
class ThreadPool {
private:
vector<thread> m_threads; // 线程队列
queue<function<void()>> m_tasks; // 任务队列
mutex m_mutex; // 任务队列锁
condition_variable m_cv; // 线程阻塞的条件变量
uint16_t m_threads_num; // 线程数量
bool m_stop; // 停机标记

public:
// 构造函数,初始化线程队列
ThreadPool(int _threadNm) : m_threads_num(_threadNm), m_stop(false) {
cout << "Thread pool constructing..." << endl;
for (size_t i = 0; i < m_threads_num; i++) {
m_threads.emplace_back([this]() {
while (true) {
function<void()> task;
{
// 任务队列加锁
unique_lock<mutex> lock(m_mutex);
// 阻塞,直到满足条件 任务队列非空 或者 停机标记为真
m_cv.wait(lock, [this](){return !m_tasks.empty() || m_stop;});
// 如果无任务且停机,则结束线程
if (m_stop && m_tasks.empty()) {
return;
}
// 获取任务
task = move(m_tasks.front());
m_tasks.pop();
}
// 执行任务
task();
}
});
}
}

// 新增任务
// 返回值是一个 future
template<typename F, typename... Args>
auto addTask(F&& _task, Args&&... _args) -> future<invoke_result_t<F, Args...>> {
// 简化返回类型
// invoke_result_t 用于获取函数返回值类型
using return_type = invoke_result_t<F, Args...>;
// 创建共享指针,指向任务的 packaged_task 对象
// 用 bind 绑定传入的任务和参数
// packaged_task 是 C++ 标准库中的一个类模板,用于将函数(task)与 future 关联起来。
auto task = make_shared<packaged_task<return_type()>>(
bind(forward<F>(_task), forward<Args>(_args)...));
{
// 获取任务队列锁
// 在临界区内创建了 lambda 函数并将其添加到任务队列中,确保线程安全
lock_guard<mutex> lock(m_mutex);
m_tasks.emplace([task]() { (*task)(); });
}
// 从 packaged_task 中获取与之关联的 future 对象
future<return_type> future = task->get_future();
// 通知阻塞线程又新任务加入
m_cv.notify_one();
return future;
}

// 关闭线程池
void stop() {
{
// 设置停机标识
unique_lock<mutex> lock(m_mutex);
m_stop = true;
}
// 通知所有阻塞线程
m_cv.notify_all();
}

// 析构函数
~ThreadPool() {
// 执行线程关闭函数
stop();
// 等待所有线程结束
for (auto &thread : m_threads) {
if (thread.joinable()) {
thread.join();
}
}
cout << "Thread pool destructed." << endl;
}
};

int calculate(const int &n) {
cout << "Task: " << n << " calculating: " << n << " * " << n << endl;
return n * n;
}

int main() {
ThreadPool pool(4);

array<future<int>, 10> futures;
for (int i = 0; i < 10; ++i) {
futures[i] = pool.addTask(calculate, i);
}

for (int i = 0; i < 10; ++i) {
cout << "Calculate result: " << futures[i].get() << endl;
}

return 0;
}

参考:Ray-Bos C++手写简易线程池