summaryrefslogtreecommitdiff
path: root/threading.h
blob: 73a174171c6076e286339b5b81c50dffeddf3436 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
//
// Created by Keuin on 2022/4/16.
//

#ifndef RT_THREADING_H
#define RT_THREADING_H

#include <vector>
#include <thread>
#include <memory>
#include <deque>
#include <mutex>
#include <atomic>
#include <iostream>

// A simple once-usage thread pool and task queue.
// Using lock-free atomic counter to avoid expensive queue or synchronization mechanism.
// Tasks should be added into the queue before starting.
// Once the task queue is empty, threads quit.

// internal usage
template<typename T>
struct s_task {
    void (*f)(T &);

    T arg;
};

template<typename T>
class thread_pool {

    unsigned thread_count;
    std::vector<std::thread> workers;
    std::atomic<size_t> counter{0}; // index to the first available task in queue
    std::vector<s_task<T>> tasks;

    void worker_main();

public:
    explicit thread_pool(unsigned thread_count) : thread_count{thread_count} {
        std::cerr << "Using " << (counter.is_lock_free() ? "lock-free" : "locking") << " dispatcher." << std::endl;
    }

    // Thread unsafe!
    void submit_task(void (*f)(T &), T &&t);

    void start();

    // Wait the queue to become empty
    void wait();
};

template<typename T>
void thread_pool<T>::start() {
    if (workers.empty()) {
        for (typeof(thread_count) i = 0; i < thread_count; ++i) {
            workers.emplace_back(std::thread{&thread_pool<T>::worker_main, this});
        }
    } else {
        // TODO
    }
}

template<typename T>
void thread_pool<T>::worker_main() {
    const auto max_cnt = tasks.size();
    while (true) {
        const auto i = counter.fetch_add(1, std::memory_order_relaxed); // we only need atomicity
        if (i >= max_cnt) break; // all tasks are done
        auto &task = tasks[i];
        task.f(task.arg);
    }
}

// Do not submit after starting.
template<typename T>
void thread_pool<T>::submit_task(void (*f)(T &), T &&t) {
    tasks.push_back(s_task<T>{.f=f, .arg=std::move(t)});
}

template<typename T>
void thread_pool<T>::wait() {
    for (auto &th: workers) {
        th.join();
    }
}


#endif //RT_THREADING_H