From 3e7ca8a48e9a788882fa201b14da2eb5b6cf8bf0 Mon Sep 17 00:00:00 2001 From: Keuin Date: Sat, 16 Apr 2022 23:41:10 +0800 Subject: New lock-free thread pool. Improve CPU utility with massive small tasks. --- threading.h | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 threading.h (limited to 'threading.h') diff --git a/threading.h b/threading.h new file mode 100644 index 0000000..0b11d60 --- /dev/null +++ b/threading.h @@ -0,0 +1,89 @@ +// +// Created by Keuin on 2022/4/16. +// + +#ifndef RT_THREADING_H +#define RT_THREADING_H + +#include +#include +#include +#include +#include +#include +#include + +// A simple once-usage thread pool and task queue. +// Using lock-free atomic counter to avoid expensive queue or synchronization mechanism. +// Tasks should be added into the queue before starting. +// Once the task queue is empty, threads quit. + +// internal usage +template +struct s_task { + void (*f)(T &); + + T arg; +}; + +template +class thread_pool { + + unsigned thread_count; + std::vector workers; + std::atomic counter{0}; // index to the first available task in queue + std::vector> tasks; + + void worker_main(); + +public: + explicit thread_pool(unsigned thread_count) : thread_count{thread_count} { + std::cerr << "Using " << (counter.is_lock_free() ? "lock-free" : "locking") << " dispatcher." << std::endl; + } + + // Thread unsafe! + void submit_task(void (*f)(T &), T &&t); + + void start(); + + // Wait the queue to become empty + void wait(); +}; + +template +void thread_pool::start() { + if (workers.empty()) { + for (typeof(thread_count) i = 0; i < thread_count; ++i) { + workers.emplace_back(std::thread{[this]() { this->worker_main(); }}); + } + } else { + // TODO + } +} + +template +void thread_pool::worker_main() { + const auto max_cnt = tasks.size(); + while (true) { + const auto i = counter.fetch_add(1, std::memory_order_relaxed); // we only need atomicity + if (i >= max_cnt) break; // all tasks are done + auto &task = tasks[i]; + task.f(task.arg); + } +} + +// Do not submit after starting. +template +void thread_pool::submit_task(void (*f)(T &), T &&t) { + tasks.push_back(s_task{.f=f, .arg=std::move(t)}); +} + +template +void thread_pool::wait() { + for (auto &th: workers) { + th.join(); + } +} + + +#endif //RT_THREADING_H -- cgit v1.2.3