summaryrefslogtreecommitdiff
path: root/threading.h
diff options
context:
space:
mode:
authorKeuin <[email protected]>2022-04-16 23:41:10 +0800
committerKeuin <[email protected]>2022-04-17 10:45:59 +0800
commit3e7ca8a48e9a788882fa201b14da2eb5b6cf8bf0 (patch)
treee877dd4786684d67c28b06144bc75be83fbbb155 /threading.h
parent9992f29a6222e1822f23329b2f49eef62dd949fe (diff)
New lock-free thread pool. Improve CPU utility with massive small tasks.
Diffstat (limited to 'threading.h')
-rw-r--r--threading.h89
1 files changed, 89 insertions, 0 deletions
diff --git a/threading.h b/threading.h
new file mode 100644
index 0000000..0b11d60
--- /dev/null
+++ b/threading.h
@@ -0,0 +1,89 @@
+//
+// Created by Keuin on 2022/4/16.
+//
+
+#ifndef RT_THREADING_H
+#define RT_THREADING_H
+
+#include <vector>
+#include <thread>
+#include <memory>
+#include <deque>
+#include <mutex>
+#include <atomic>
+#include <iostream>
+
+// A simple once-usage thread pool and task queue.
+// Using lock-free atomic counter to avoid expensive queue or synchronization mechanism.
+// Tasks should be added into the queue before starting.
+// Once the task queue is empty, threads quit.
+
+// internal usage
+template<typename T>
+struct s_task {
+ void (*f)(T &);
+
+ T arg;
+};
+
+template<typename T>
+class thread_pool {
+
+ unsigned thread_count;
+ std::vector<std::thread> workers;
+ std::atomic<size_t> counter{0}; // index to the first available task in queue
+ std::vector<s_task<T>> tasks;
+
+ void worker_main();
+
+public:
+ explicit thread_pool(unsigned thread_count) : thread_count{thread_count} {
+ std::cerr << "Using " << (counter.is_lock_free() ? "lock-free" : "locking") << " dispatcher." << std::endl;
+ }
+
+ // Thread unsafe!
+ void submit_task(void (*f)(T &), T &&t);
+
+ void start();
+
+ // Wait the queue to become empty
+ void wait();
+};
+
+template<typename T>
+void thread_pool<T>::start() {
+ if (workers.empty()) {
+ for (typeof(thread_count) i = 0; i < thread_count; ++i) {
+ workers.emplace_back(std::thread{[this]() { this->worker_main(); }});
+ }
+ } else {
+ // TODO
+ }
+}
+
+template<typename T>
+void thread_pool<T>::worker_main() {
+ const auto max_cnt = tasks.size();
+ while (true) {
+ const auto i = counter.fetch_add(1, std::memory_order_relaxed); // we only need atomicity
+ if (i >= max_cnt) break; // all tasks are done
+ auto &task = tasks[i];
+ task.f(task.arg);
+ }
+}
+
+// Do not submit after starting.
+template<typename T>
+void thread_pool<T>::submit_task(void (*f)(T &), T &&t) {
+ tasks.push_back(s_task<T>{.f=f, .arg=std::move(t)});
+}
+
+template<typename T>
+void thread_pool<T>::wait() {
+ for (auto &th: workers) {
+ th.join();
+ }
+}
+
+
+#endif //RT_THREADING_H