blob: 0b11d6024a1db91b62ff33db9f8e9cbfebd3dd36 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
//
// Created by Keuin on 2022/4/16.
//
#ifndef RT_THREADING_H
#define RT_THREADING_H
#include <vector>
#include <thread>
#include <memory>
#include <deque>
#include <mutex>
#include <atomic>
#include <iostream>
// A simple once-usage thread pool and task queue.
// Using lock-free atomic counter to avoid expensive queue or synchronization mechanism.
// Tasks should be added into the queue before starting.
// Once the task queue is empty, threads quit.
// internal usage
template<typename T>
struct s_task {
void (*f)(T &);
T arg;
};
template<typename T>
class thread_pool {
unsigned thread_count;
std::vector<std::thread> workers;
std::atomic<size_t> counter{0}; // index to the first available task in queue
std::vector<s_task<T>> tasks;
void worker_main();
public:
explicit thread_pool(unsigned thread_count) : thread_count{thread_count} {
std::cerr << "Using " << (counter.is_lock_free() ? "lock-free" : "locking") << " dispatcher." << std::endl;
}
// Thread unsafe!
void submit_task(void (*f)(T &), T &&t);
void start();
// Wait the queue to become empty
void wait();
};
template<typename T>
void thread_pool<T>::start() {
if (workers.empty()) {
for (typeof(thread_count) i = 0; i < thread_count; ++i) {
workers.emplace_back(std::thread{[this]() { this->worker_main(); }});
}
} else {
// TODO
}
}
template<typename T>
void thread_pool<T>::worker_main() {
const auto max_cnt = tasks.size();
while (true) {
const auto i = counter.fetch_add(1, std::memory_order_relaxed); // we only need atomicity
if (i >= max_cnt) break; // all tasks are done
auto &task = tasks[i];
task.f(task.arg);
}
}
// Do not submit after starting.
template<typename T>
void thread_pool<T>::submit_task(void (*f)(T &), T &&t) {
tasks.push_back(s_task<T>{.f=f, .arg=std::move(t)});
}
template<typename T>
void thread_pool<T>::wait() {
for (auto &th: workers) {
th.join();
}
}
#endif //RT_THREADING_H
|