//
// Created by Keuin on 2022/4/16.
//

#ifndef RT_THREADING_H
#define RT_THREADING_H

#include <atomic>
#include <cstddef>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>

// A simple single-use thread pool and task queue.
// A lock-free atomic counter is used to dispatch tasks, avoiding an expensive
// queue or heavier synchronization mechanisms.
// All tasks must be submitted before the pool is started.
// Once the task queue is exhausted, the worker threads exit.

template<typename T_Args, typename T_ImmuCtx, typename T_MutCtx>
using task_func_t = void (*)(size_t, T_Args &, const T_ImmuCtx &, T_MutCtx &);

// internal usage
template<typename T, typename U, typename V>
struct s_task {
    task_func_t<T, U, V> f;
    T arg;
};

template<typename T, typename U, typename V>
class thread_pool {
    unsigned thread_count;
    std::vector<std::thread> workers;
    std::atomic<size_t> counter{0}; // index of the next unclaimed task in the queue
    std::vector<s_task<T, U, V>> tasks;
    const U &shared_ctx; // reference to immutable shared context
    V &mut_shared_ctx;   // mutable shared context

    void worker_main();

public:
    explicit thread_pool(unsigned thread_count, const U &shared_ctx, V &mut_shared_ctx) :
            thread_count{thread_count}, shared_ctx{shared_ctx}, mut_shared_ctx{mut_shared_ctx} {
        std::cerr << "Using " << (counter.is_lock_free() ? "lock-free" : "locking")
                  << " dispatcher." << std::endl;
    }

    // Not thread-safe! Submit only before calling start().
    void submit_task(task_func_t<T, U, V> f, T &&t);

    void start();

    // Block until all tasks are finished, joining every worker thread.
    void wait();
};

template<typename T, typename U, typename V>
void thread_pool<T, U, V>::start() {
    if (workers.empty()) {
        for (decltype(thread_count) i = 0; i < thread_count; ++i) {
            workers.emplace_back(&thread_pool::worker_main, this);
        }
    } else {
        // TODO
    }
}

template<typename T, typename U, typename V>
void thread_pool<T, U, V>::worker_main() {
    const auto max_cnt = tasks.size();
    while (true) {
        // Claim the next task index; we only need atomicity, not ordering.
        const auto i = counter.fetch_add(1, std::memory_order_relaxed);
        if (i >= max_cnt) break; // all tasks are done
        auto &task = tasks[i];
        task.f(i, task.arg, shared_ctx, mut_shared_ctx);
    }
}

// Do not submit after starting.
template<typename T, typename U, typename V>
void thread_pool<T, U, V>::submit_task(task_func_t<T, U, V> f, T &&t) {
    tasks.push_back(s_task<T, U, V>{.f=f, .arg=std::move(t)});
}

template<typename T, typename U, typename V>
void thread_pool<T, U, V>::wait() {
    for (auto &th: workers) {
        th.join();
    }
}

#endif //RT_THREADING_H
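
// Usage sketch: a minimal, hypothetical caller (the `job`, `render_ctx`,
// `stats`, and `work` names below are illustrative only and not part of this
// header). It shows the intended lifecycle: submit every task first, then
// start(), then wait().
/*
    #include "threading.h"

    struct job { int row; };                    // per-task argument (T)
    struct render_ctx { int width; };           // immutable shared context (U)
    struct stats { std::atomic<int> done{0}; }; // mutable shared context (V)

    void work(size_t task_id, job &j, const render_ctx &c, stats &s) {
        // ... process row j.row of a c.width-wide image ...
        s.done.fetch_add(1, std::memory_order_relaxed);
    }

    int main() {
        render_ctx ctx{1920};
        stats st;
        thread_pool<job, render_ctx, stats> pool{4, ctx, st};
        for (int r = 0; r < 1080; ++r)
            pool.submit_task(work, job{r}); // still single-threaded here
        pool.start();
        pool.wait(); // joins all workers; every task has run exactly once
        return 0;
    }
*/
// Note: the pool itself does not synchronize access to the mutable shared
// context; callers must make it safe for concurrent use (hence the atomic
// counter in the hypothetical `stats` above).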