From 3e7ca8a48e9a788882fa201b14da2eb5b6cf8bf0 Mon Sep 17 00:00:00 2001 From: Keuin Date: Sat, 16 Apr 2022 23:41:10 +0800 Subject: New lock-free thread pool. Improve CPU utility with massive small tasks. --- CMakeLists.txt | 6 ++-- aa.h | 74 ++++++++++++++++++++++++++++-------------------- threading.h | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 33 deletions(-) create mode 100644 threading.h diff --git a/CMakeLists.txt b/CMakeLists.txt index fc3e6f3..bd4d71f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,6 +4,8 @@ project(rt) set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD_REQUIRED True) +set(CMAKE_EXE_LINKER_FLAGS "-latomic") + set(common_compiler_args "-Wall -Werror -Wno-unused -std=c++11 -pthread") set(CMAKE_CXX_FLAGS_DEBUG "${common_compiler_args} -g -ggdb -fsanitize=address -DDEBUG") set(CMAKE_CXX_FLAGS_RELEASE "${common_compiler_args} -O2") @@ -12,9 +14,9 @@ set(CMAKE_VERBOSE_MAKEFILE on) # main executable -add_executable(rt main.cpp vec.h bitmap.h ray.h bitfont.h hitlist.h object.h sphere.h viewport.h aa.h material.h material_diffusive.h material_diffusive.cpp material_reflective.h material_reflective.cpp material_dielectric.cpp material_dielectric.h tracelog.h) +add_executable(rt main.cpp vec.h bitmap.h ray.h bitfont.h hitlist.h object.h sphere.h viewport.h aa.h material.h material_diffusive.h material_diffusive.cpp material_reflective.h material_reflective.cpp material_dielectric.cpp material_dielectric.h tracelog.h threading.h) add_executable(image_output main_image_output.cpp vec.h bitmap.h bitfont.h hitlist.h object.h sphere.h viewport.h) -add_executable(simple_scanner main_simple_scanner.cpp vec.h bitmap.h ray.h timer.h bitfont.h hitlist.h object.h sphere.h viewport.h aa.h material.h material_diffusive.h material_diffusive.cpp material_reflective.h material_reflective.cpp material_dielectric.cpp material_dielectric.h tracelog.h) +add_executable(simple_scanner main_simple_scanner.cpp vec.h bitmap.h ray.h timer.h bitfont.h hitlist.h object.h sphere.h viewport.h aa.h material.h material_diffusive.h material_diffusive.cpp material_reflective.h material_reflective.cpp material_dielectric.cpp material_dielectric.h tracelog.h threading.h) # googletest diff --git a/aa.h b/aa.h index 7e6e722..f2dfd69 100644 --- a/aa.h +++ b/aa.h @@ -8,8 +8,8 @@ #include "vec.h" #include "viewport.h" #include "hitlist.h" +#include "threading.h" #include -#include #include #include @@ -30,38 +30,50 @@ public: } virtual bitmap render(const hitlist &world, vec3d viewpoint, uint16_t image_width, uint16_t image_height) { - const unsigned hwcc = std::thread::hardware_concurrency(); - std::cerr << "Rendering with " << hwcc << " thread(s)." << std::endl; - - const auto seed = 123456789012345678ULL; - std::mt19937_64 seedgen{seed}; // generates seeds for workers + static constexpr auto seed = 123456789012345678ULL; + const unsigned thread_count = std::min(std::thread::hardware_concurrency(), samples); + std::cerr << "Preparing tasks..." << std::endl; std::vector> images{samples, {1, 1}}; - std::thread t; - - std::vector workers; - unsigned remaining = samples; // tasks remaining - size_t base = 0; - while (remaining > 0) { - const unsigned n = std::min(hwcc, remaining); // threads in current batch - remaining -= n; - for (unsigned i = 0; i < n; ++i) { - workers.emplace_back(std::thread{ - [&](int tid, uint64_t seed, uint64_t diffuse_seed, std::vector> *subs, vec3d viewpoint, - uint16_t image_width, uint16_t image_height) { - bias_ctx bc{seed}; - auto image = (*subs)[tid].render( - world, viewpoint, image_width, image_height, bc, diffuse_seed); - images[base + tid] = image; - }, - i, seedgen(), seedgen(), subviews, viewpoint, image_width, image_height - }); - } - for (auto &th: workers) { - th.join(); - } - workers.clear(); - base += n; + std::mt19937_64 seedgen{seed}; // generates seeds for workers + + const struct s_render_shared { + std::vector> &subs; + vec3d viewpoint; + uint16_t image_width; + uint16_t image_height; + const hitlist &world; + std::vector> &images; + } s_{.subs=*subviews, .viewpoint = viewpoint, + .image_width=image_width, .image_height=image_height, + .world=world, .images=images + }; + + struct s_render_task { + uint32_t task_id; + uint64_t seed; + uint64_t diffuse_seed; + const s_render_shared &shared; + }; + + thread_pool pool{thread_count}; + + + for (typeof(samples) i = 0; i < samples; ++i) { + pool.submit_task([](s_render_task &task) { + bias_ctx bc{seed}; + auto image = task.shared.subs[task.task_id].render( + task.shared.world, task.shared.viewpoint, + task.shared.image_width, task.shared.image_height, + bc, task.diffuse_seed); + task.shared.images[task.task_id] = image; + }, s_render_task{ + .task_id = i, .seed=seedgen(), .diffuse_seed=seedgen(), .shared=s_ + }); } + + std::cerr << "Rendering with " << thread_count << " thread(s)." << std::endl; + pool.start(); + pool.wait(); return bitmap::average(images); } diff --git a/threading.h b/threading.h new file mode 100644 index 0000000..0b11d60 --- /dev/null +++ b/threading.h @@ -0,0 +1,89 @@ +// +// Created by Keuin on 2022/4/16. +// + +#ifndef RT_THREADING_H +#define RT_THREADING_H + +#include +#include +#include +#include +#include +#include +#include + +// A simple once-usage thread pool and task queue. +// Using lock-free atomic counter to avoid expensive queue or synchronization mechanism. +// Tasks should be added into the queue before starting. +// Once the task queue is empty, threads quit. + +// internal usage +template +struct s_task { + void (*f)(T &); + + T arg; +}; + +template +class thread_pool { + + unsigned thread_count; + std::vector workers; + std::atomic counter{0}; // index to the first available task in queue + std::vector> tasks; + + void worker_main(); + +public: + explicit thread_pool(unsigned thread_count) : thread_count{thread_count} { + std::cerr << "Using " << (counter.is_lock_free() ? "lock-free" : "locking") << " dispatcher." << std::endl; + } + + // Thread unsafe! + void submit_task(void (*f)(T &), T &&t); + + void start(); + + // Wait the queue to become empty + void wait(); +}; + +template +void thread_pool::start() { + if (workers.empty()) { + for (typeof(thread_count) i = 0; i < thread_count; ++i) { + workers.emplace_back(std::thread{[this]() { this->worker_main(); }}); + } + } else { + // TODO + } +} + +template +void thread_pool::worker_main() { + const auto max_cnt = tasks.size(); + while (true) { + const auto i = counter.fetch_add(1, std::memory_order_relaxed); // we only need atomicity + if (i >= max_cnt) break; // all tasks are done + auto &task = tasks[i]; + task.f(task.arg); + } +} + +// Do not submit after starting. +template +void thread_pool::submit_task(void (*f)(T &), T &&t) { + tasks.push_back(s_task{.f=f, .arg=std::move(t)}); +} + +template +void thread_pool::wait() { + for (auto &th: workers) { + th.join(); + } +} + + +#endif //RT_THREADING_H -- cgit v1.2.3