summaryrefslogtreecommitdiff
path: root/threading.h
blob: 6d79494685ccb0a36b75ebae108c8a0b3a9e039d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//
// Created by Keuin on 2022/4/16.
//

#ifndef RT_THREADING_H
#define RT_THREADING_H

#include <vector>
#include <thread>
#include <memory>
#include <deque>
#include <mutex>
#include <atomic>
#include <iostream>

// A simple single-use thread pool with a fixed task list.
// An atomic counter (lock-free where the platform supports it) hands out task
// indices, avoiding a more expensive locked queue or other synchronization mechanism.
// All tasks must be submitted before the pool is started.
// Once no unclaimed task remains, the worker threads exit.

// Signature of a task callback: a plain function pointer (no captures) that
// returns nothing and receives, in order:
//   - a size_t value,
//   - a mutable reference to the task's own argument,
//   - a const reference to the immutable shared context,
//   - a mutable reference to the mutable shared context.
template<typename Arg, typename ImmuCtx, typename MutCtx>
using task_func_t = void (*)(size_t, Arg &, const ImmuCtx &, MutCtx &);

// Internal helper: one queued unit of work.
// Pairs the callback with the argument that belongs to this particular task.
// Kept as a plain aggregate; the member order (f, then arg) is part of how it
// is initialized, so do not reorder the fields.
template<typename Arg, typename ImmuCtx, typename MutCtx>
struct s_task {
    task_func_t<Arg, ImmuCtx, MutCtx> f; // callback to run for this task
    Arg arg;                             // per-task argument, owned by the task
};

// Single-use pool of worker threads that drains a pre-filled task list.
//
// Template parameters:
//   T - per-task argument type (each task owns one T).
//   U - type of the immutable context shared by all tasks (held by const reference).
//   V - type of the mutable context shared by all tasks (held by reference).
//
// The pool stores references to both contexts, so they must outlive the pool.
// Any synchronization needed for the mutable context is up to the task callbacks;
// the pool itself adds none.
template<typename T, typename U, typename V>
class thread_pool {

    unsigned thread_count; // number of worker threads launched by start()
    std::vector<std::thread> workers;
    std::atomic<size_t> counter{0}; // index to the first available task in queue
    std::vector<s_task<T, U, V>> tasks;
    const U &shared_ctx; // reference to immutable shared context
    V &mut_shared_ctx; // mutable shared context

    // Body of every worker thread: claims task indices until none are left.
    void worker_main();

public:
    // Creates an idle pool; no thread is started until start() is called.
    // Logs to stderr whether the dispatch counter is lock-free on this platform.
    explicit thread_pool(unsigned thread_count, const U &shared_ctx, V &mut_shared_ctx) :
            thread_count{thread_count}, shared_ctx{shared_ctx}, mut_shared_ctx{mut_shared_ctx} {
        std::cerr << "Using " << (counter.is_lock_free() ? "lock-free" : "locking") << " dispatcher." << std::endl;
    }

    // Joins every worker that is still joinable. Without this, destroying a
    // started pool that was never wait()ed on would destroy joinable
    // std::thread objects, which calls std::terminate(). Blocks until the
    // remaining workers have finished their tasks.
    ~thread_pool() {
        for (auto &th: workers) {
            if (th.joinable()) th.join();
        }
    }

    // Workers hold a pointer to this object, and the members (threads, atomic,
    // references) are not copyable anyway; make that explicit.
    thread_pool(const thread_pool &) = delete;

    thread_pool &operator=(const thread_pool &) = delete;

    // Queues a task. Thread unsafe! Must only be called before start().
    void submit_task(task_func_t<T, U, V> f, T &&t);

    // Launches the worker threads.
    void start();

    // Blocks until all worker threads have finished.
    void wait();
};

// Launches `thread_count` worker threads, each running worker_main() on this pool.
// All tasks must already have been submitted: workers read the task list
// without further synchronization.
// Calling start() again once workers exist is currently a no-op.
template<typename T, typename U, typename V>
void thread_pool<T, U, V>::start() {
    if (!workers.empty()) {
        // TODO: restarting an already-started pool is not supported yet.
        return;
    }
    workers.reserve(thread_count);
    // Plain `unsigned` (the declared type of thread_count) instead of the
    // non-standard GNU `typeof`, so the header builds in strict ISO C++ mode.
    for (unsigned i = 0; i < thread_count; ++i) {
        // Construct the thread in place rather than moving a temporary in.
        workers.emplace_back(&thread_pool<T, U, V>::worker_main, this);
    }
}

// Worker thread body. Repeatedly claims the next unclaimed task index from the
// shared atomic counter and runs that task; returns as soon as the claimed
// index falls past the end of the task list.
template<typename T, typename U, typename V>
void thread_pool<T, U, V>::worker_main() {
    // The task list is not modified while workers run, so its size is read once.
    const auto task_total = tasks.size();
    // Relaxed ordering suffices: the counter only hands out unique indices.
    for (auto idx = counter.fetch_add(1, std::memory_order_relaxed);
         idx < task_total;
         idx = counter.fetch_add(1, std::memory_order_relaxed)) {
        auto &current = tasks[idx];
        current.f(idx, current.arg, shared_ctx, mut_shared_ctx);
    }
}

// Appends one task to the pool's task list.
//   f - callback a worker will invoke for this task.
//   t - the task's argument; it is moved into the pool's storage.
// Do not submit after starting: workers read the task list without
// synchronization and capture its size when they begin. Not thread-safe.
template<typename T, typename U, typename V>
void thread_pool<T, U, V>::submit_task(task_func_t<T, U, V> f, T &&t) {
    // Positional aggregate initialization (member order is f, arg) instead of
    // designated initializers, which are only standard from C++20 onwards.
    tasks.push_back(s_task<T, U, V>{f, std::move(t)});
}

// Blocks until every worker thread has finished, i.e. until all tasks have
// been claimed and the callbacks that were running have returned.
// Safe to call more than once, and before start() (then it returns immediately).
template<typename T, typename U, typename V>
void thread_pool<T, U, V>::wait() {
    for (auto &th: workers) {
        // join() on a thread that was already joined throws std::system_error,
        // so skip those to keep repeated wait() calls harmless.
        if (th.joinable()) {
            th.join();
        }
    }
}


#endif //RT_THREADING_H