summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeuin <[email protected]>2022-04-16 23:41:10 +0800
committerKeuin <[email protected]>2022-04-17 10:45:59 +0800
commit3e7ca8a48e9a788882fa201b14da2eb5b6cf8bf0 (patch)
treee877dd4786684d67c28b06144bc75be83fbbb155
parent9992f29a6222e1822f23329b2f49eef62dd949fe (diff)
New lock-free thread pool. Improve CPU utilization when running a massive number of small tasks.
-rw-r--r--CMakeLists.txt6
-rw-r--r--aa.h74
-rw-r--r--threading.h89
3 files changed, 136 insertions, 33 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index fc3e6f3..bd4d71f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -4,6 +4,8 @@ project(rt)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED True)
+set(CMAKE_EXE_LINKER_FLAGS "-latomic")
+
set(common_compiler_args "-Wall -Werror -Wno-unused -std=c++11 -pthread")
set(CMAKE_CXX_FLAGS_DEBUG "${common_compiler_args} -g -ggdb -fsanitize=address -DDEBUG")
set(CMAKE_CXX_FLAGS_RELEASE "${common_compiler_args} -O2")
@@ -12,9 +14,9 @@ set(CMAKE_VERBOSE_MAKEFILE on)
# main executable
-add_executable(rt main.cpp vec.h bitmap.h ray.h bitfont.h hitlist.h object.h sphere.h viewport.h aa.h material.h material_diffusive.h material_diffusive.cpp material_reflective.h material_reflective.cpp material_dielectric.cpp material_dielectric.h tracelog.h)
+add_executable(rt main.cpp vec.h bitmap.h ray.h bitfont.h hitlist.h object.h sphere.h viewport.h aa.h material.h material_diffusive.h material_diffusive.cpp material_reflective.h material_reflective.cpp material_dielectric.cpp material_dielectric.h tracelog.h threading.h)
add_executable(image_output main_image_output.cpp vec.h bitmap.h bitfont.h hitlist.h object.h sphere.h viewport.h)
-add_executable(simple_scanner main_simple_scanner.cpp vec.h bitmap.h ray.h timer.h bitfont.h hitlist.h object.h sphere.h viewport.h aa.h material.h material_diffusive.h material_diffusive.cpp material_reflective.h material_reflective.cpp material_dielectric.cpp material_dielectric.h tracelog.h)
+add_executable(simple_scanner main_simple_scanner.cpp vec.h bitmap.h ray.h timer.h bitfont.h hitlist.h object.h sphere.h viewport.h aa.h material.h material_diffusive.h material_diffusive.cpp material_reflective.h material_reflective.cpp material_dielectric.cpp material_dielectric.h tracelog.h threading.h)
# googletest
diff --git a/aa.h b/aa.h
index 7e6e722..f2dfd69 100644
--- a/aa.h
+++ b/aa.h
@@ -8,8 +8,8 @@
#include "vec.h"
#include "viewport.h"
#include "hitlist.h"
+#include "threading.h"
#include <vector>
-#include <thread>
#include <algorithm>
#include <random>
@@ -30,38 +30,50 @@ public:
}
virtual bitmap<T> render(const hitlist &world, vec3d viewpoint, uint16_t image_width, uint16_t image_height) {
- const unsigned hwcc = std::thread::hardware_concurrency();
- std::cerr << "Rendering with " << hwcc << " thread(s)." << std::endl;
-
- const auto seed = 123456789012345678ULL;
- std::mt19937_64 seedgen{seed}; // generates seeds for workers
+ // Renders one image per sub-viewport on a thread pool and returns the average of those images.
+ // Fixed master seed: the per-task seeds drawn from it are identical on every run.
+ static constexpr auto seed = 123456789012345678ULL;
+ // hardware_concurrency() may return 0 when the value cannot be determined; keep at least one
+ // worker, otherwise no task would ever run.
+ const unsigned thread_count = std::max(1u, std::min(std::thread::hardware_concurrency(), samples));
+ std::cerr << "Preparing tasks..." << std::endl;
std::vector<bitmap<T>> images{samples, {1, 1}};
- std::thread t;
-
- std::vector<std::thread> workers;
- unsigned remaining = samples; // tasks remaining
- size_t base = 0;
- while (remaining > 0) {
- const unsigned n = std::min(hwcc, remaining); // threads in current batch
- remaining -= n;
- for (unsigned i = 0; i < n; ++i) {
- workers.emplace_back(std::thread{
- [&](int tid, uint64_t seed, uint64_t diffuse_seed, std::vector<basic_viewport<T>> *subs, vec3d viewpoint,
- uint16_t image_width, uint16_t image_height) {
- bias_ctx bc{seed};
- auto image = (*subs)[tid].render(
- world, viewpoint, image_width, image_height, bc, diffuse_seed);
- images[base + tid] = image;
- },
- i, seedgen(), seedgen(), subviews, viewpoint, image_width, image_height
- });
- }
- for (auto &th: workers) {
- th.join();
- }
- workers.clear();
- base += n;
+ std::mt19937_64 seedgen{seed}; // generates seeds for workers
+
+ // State shared by every task. It is declared before the pool and wait() is called before
+ // returning, so the references held by the tasks stay valid while workers run.
+ const struct s_render_shared {
+ std::vector<basic_viewport<T>> &subs;
+ vec3d viewpoint;
+ uint16_t image_width;
+ uint16_t image_height;
+ const hitlist &world;
+ std::vector<bitmap<T>> &images;
+ } s_{.subs=*subviews, .viewpoint = viewpoint,
+ .image_width=image_width, .image_height=image_height,
+ .world=world, .images=images
+ };
+
+ // Per-task payload: the sub-viewport/output slot index and the seeds this task must use.
+ struct s_render_task {
+ uint32_t task_id;
+ uint64_t seed;
+ uint64_t diffuse_seed;
+ const s_render_shared &shared;
+ };
+
+ thread_pool<s_render_task> pool{thread_count};
+
+
+ for (unsigned i = 0; i < samples; ++i) {
+ pool.submit_task([](s_render_task &task) {
+ // Use this task's own seed. The captureless lambda can also name the static constexpr
+ // master `seed`, but that would seed the bias_ctx of every task identically.
+ bias_ctx bc{task.seed};
+ auto image = task.shared.subs[task.task_id].render(
+ task.shared.world, task.shared.viewpoint,
+ task.shared.image_width, task.shared.image_height,
+ bc, task.diffuse_seed);
+ // Each task writes only its own slot, so no locking is needed here.
+ task.shared.images[task.task_id] = image;
+ }, s_render_task{
+ .task_id = i, .seed=seedgen(), .diffuse_seed=seedgen(), .shared=s_
+ });
}
+
+ std::cerr << "Rendering with " << thread_count << " thread(s)." << std::endl;
+ pool.start();
+ pool.wait();
return bitmap<T>::average(images);
}
diff --git a/threading.h b/threading.h
new file mode 100644
index 0000000..0b11d60
--- /dev/null
+++ b/threading.h
@@ -0,0 +1,89 @@
+//
+// Created by Keuin on 2022/4/16.
+//
+
+#ifndef RT_THREADING_H
+#define RT_THREADING_H
+
+#include <vector>
+#include <thread>
+#include <memory>
+#include <deque>
+#include <mutex>
+#include <atomic>
+#include <iostream>
+
+// A simple single-use thread pool and task queue.
+// It uses a lock-free atomic counter to avoid an expensive queue or other synchronization mechanism.
+// Tasks must be added to the queue before starting.
+// Once the task queue is exhausted, the threads quit.
+
+// Internal: one queued unit of work, pairing a plain function pointer with the argument it receives.
+template<typename T>
+struct s_task {
+ typedef void (*task_fn)(T &);
+
+ task_fn f; // entry point, invoked as f(arg)
+ T arg; // stored argument, handed to f by non-const reference
+};
+
+// Single-use pool: queue every task with submit_task(), call start() once, then wait().
+// Workers claim task indices from a shared atomic counter and exit once all indices are taken.
+template<typename T>
+class thread_pool {
+
+ unsigned thread_count; // number of worker threads spawned by start()
+ std::vector<std::thread> workers;
+ std::atomic<size_t> counter{0}; // index to the first available task in queue
+ std::vector<s_task<T>> tasks;
+
+ // Body of every worker thread.
+ void worker_main();
+
+public:
+ // Only records the worker count and reports whether the counter is lock-free; no thread is created here.
+ explicit thread_pool(unsigned thread_count) : thread_count{thread_count} {
+ std::cerr << "Using " << (counter.is_lock_free() ? "lock-free" : "locking") << " dispatcher." << std::endl;
+ }
+
+ // Not copyable: running workers refer to this object through `this`.
+ thread_pool(const thread_pool &) = delete;
+
+ thread_pool &operator=(const thread_pool &) = delete;
+
+ // Join any worker that is still joinable. Destroying a joinable std::thread calls std::terminate,
+ // which would otherwise happen if the pool were destroyed between start() and wait().
+ ~thread_pool() {
+ for (auto &th: workers) {
+ if (th.joinable()) th.join();
+ }
+ }
+
+ // Thread unsafe! Must not be called after start().
+ void submit_task(void (*f)(T &), T &&t);
+
+ // Spawn the worker threads.
+ void start();
+
+ // Wait for the workers to finish, i.e. for the queue to become empty.
+ void wait();
+};
+
+// Spawn thread_count workers, each running worker_main().
+// Calling start() again once workers exist does nothing: restarting a used pool is not implemented.
+template<typename T>
+void thread_pool<T>::start() {
+ if (!workers.empty()) {
+ // TODO
+ return;
+ }
+ workers.reserve(thread_count);
+ for (unsigned i = 0; i < thread_count; ++i) {
+ // Construct the std::thread in place; the lambda keeps a pointer to this pool.
+ workers.emplace_back([this]() { this->worker_main(); });
+ }
+}
+
+// Worker loop: repeatedly claim the next unclaimed task index and run that task,
+// stopping as soon as the claimed index is past the end of the queue.
+template<typename T>
+void thread_pool<T>::worker_main() {
+ const auto total = tasks.size(); // queue length is read once, when the worker starts
+ // we only need atomicity, hence the relaxed ordering
+ for (auto idx = counter.fetch_add(1, std::memory_order_relaxed);
+ idx < total;
+ idx = counter.fetch_add(1, std::memory_order_relaxed)) {
+ s_task<T> &job = tasks[idx];
+ job.f(job.arg);
+ }
+}
+
+// Append a task to the queue; a worker later calls f with the stored argument.
+// Do not submit after starting: workers read the queue size once and index the vector
+// without any locking, so a concurrent push_back is not safe.
+template<typename T>
+void thread_pool<T>::submit_task(void (*f)(T &), T &&t) {
+ // Positional aggregate initialization (members in order f, arg); designated initializers are not C++11.
+ tasks.push_back(s_task<T>{f, std::move(t)});
+}
+
+// Block until every worker thread has exited.
+// Safe to call before start() or more than once: threads that are not joinable are skipped,
+// because join() on an already-joined std::thread throws std::system_error.
+template<typename T>
+void thread_pool<T>::wait() {
+ for (auto &th: workers) {
+ if (th.joinable()) {
+ th.join();
+ }
+ }
+}
+
+
+#endif //RT_THREADING_H