blob: bd51711fd387cf7de8c8d2776dd521e3102b0486 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
package com.keuin.kbackupfabric.util;
import com.keuin.kbackupfabric.backup.incremental.ObjectCollectionFactory;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* A single-consumer, multiple-producer model.
*
* @param <Res> type of the resource to be produced and consumed.
*/
public class ParallelSupplier<Res> {
private final Consumer<Res> consumer;
private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
private final int threads;
private final Set<ParallelWorker<Res>> workers = new HashSet<>();
public ParallelSupplier(Consumer<Res> consumer, int threads) {
this.consumer = consumer;
this.threads = threads;
}
public void addTask(Supplier<Res> valueSupplier) {
this.taskList.add(valueSupplier);
}
public void process() {
workers.clear();
for (int i = 0; i < threads; i++) {
ParallelWorker<Res> worker = new ParallelWorker<>(taskList, consumer, i);
workers.add(worker);
worker.start();
}
join(); // wait for all workers to exit before returning
}
private void join() {
while (true) {
int aliveCount = 0;
for (ParallelWorker<Res> worker : workers) {
try {
if (worker.isAlive()) {
++aliveCount;
worker.join();
}
} catch (InterruptedException ignored) {
}
}
if (aliveCount == 0)
return;
}
}
private static class ParallelWorker<V> extends Thread {
private final Queue<Supplier<V>> taskProvider;
private final Consumer<V> consumer;
public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) {
super("PutterWorker#" + workerId);
this.taskProvider = taskProvider;
this.consumer = consumer;
}
@Override
public void run() {
Supplier<V> supplier;
while ((supplier = taskProvider.poll()) != null) {
// here we do not let the consumer accept null productions
Optional.ofNullable(supplier.get()).ifPresent(consumer);
}
}
}
}
|