summaryrefslogtreecommitdiff
path: root/src/main/java/com/keuin
diff options
context:
space:
mode:
authorKeuin <[email protected]>2023-01-17 01:33:44 +0800
committerKeuin <[email protected]>2023-01-17 01:33:44 +0800
commit61aa66b558ae8f2987a238216ac01e375f195c1e (patch)
tree4256ad9c7075ed119daa9ce55c5a0cbadf63c359 /src/main/java/com/keuin
parent822a5c5f53122763bf63e0baff79c435b1a60dbc (diff)
Make ParallelSupplier a top-level class.
Diffstat (limited to 'src/main/java/com/keuin')
-rw-r--r--src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java74
-rw-r--r--src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java81
2 files changed, 82 insertions, 73 deletions
diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
index 095108b..e8fd1f0 100644
--- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
+++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
@@ -2,6 +2,7 @@ package com.keuin.kbackupfabric.backup.incremental;
import com.keuin.kbackupfabric.backup.incremental.identifier.FileIdentifierProvider;
import com.keuin.kbackupfabric.backup.incremental.identifier.ObjectIdentifier;
+import com.keuin.kbackupfabric.util.ParallelSupplier;
import com.keuin.kbackupfabric.util.PrintUtil;
import java.io.File;
@@ -10,9 +11,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
import java.util.stream.Stream;
/**
@@ -104,74 +102,4 @@ public class ObjectCollectionFactory<T extends ObjectIdentifier> {
this.exception = e;
}
- /**
- * A single-consumer, multiple-producer model.
- *
- * @param <Res> type of the resource to be produced and consumed.
- */
- private static class ParallelSupplier<Res> {
-
- private final Consumer<Res> consumer;
- private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
- private final int threads;
- private final Set<ParallelWorker<Res>> workers = new HashSet<>();
-
- public ParallelSupplier(Consumer<Res> consumer, int threads) {
- this.consumer = consumer;
- this.threads = threads;
- }
-
- public void addTask(Supplier<Res> valueSupplier) {
- this.taskList.add(valueSupplier);
- }
-
- public void process() {
- workers.clear();
- for (int i = 0; i < threads; i++) {
- ParallelWorker<Res> worker = new ParallelWorker<>(taskList, consumer, i);
- workers.add(worker);
- worker.start();
- }
- join(); // wait for all workers to exit before returning
- }
-
- private void join() {
- while (true) {
- int aliveCount = 0;
- for (ParallelWorker<Res> worker : workers) {
- try {
- if (worker.isAlive()) {
- ++aliveCount;
- worker.join();
- }
- } catch (InterruptedException ignored) {
- }
- }
- if (aliveCount == 0)
- return;
- }
- }
-
- private static class ParallelWorker<V> extends Thread {
-
- private final Queue<Supplier<V>> taskProvider;
- private final Consumer<V> consumer;
-
- public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) {
- super("PutterWorker#" + workerId);
- this.taskProvider = taskProvider;
- this.consumer = consumer;
- }
-
- @Override
- public void run() {
- Supplier<V> supplier;
- while ((supplier = taskProvider.poll()) != null) {
- // here we do not let the consumer accept null productions
- Optional.ofNullable(supplier.get()).ifPresent(consumer);
- }
- }
- }
- }
-
}
diff --git a/src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java b/src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java
new file mode 100644
index 0000000..bd51711
--- /dev/null
+++ b/src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java
@@ -0,0 +1,81 @@
+package com.keuin.kbackupfabric.util;
+
+import com.keuin.kbackupfabric.backup.incremental.ObjectCollectionFactory;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * A single-consumer, multiple-producer model.
+ *
+ * @param <Res> type of the resource to be produced and consumed.
+ */
+public class ParallelSupplier<Res> {
+
+ private final Consumer<Res> consumer;
+ private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
+ private final int threads;
+ private final Set<ParallelWorker<Res>> workers = new HashSet<>();
+
+ public ParallelSupplier(Consumer<Res> consumer, int threads) {
+ this.consumer = consumer;
+ this.threads = threads;
+ }
+
+ public void addTask(Supplier<Res> valueSupplier) {
+ this.taskList.add(valueSupplier);
+ }
+
+ public void process() {
+ workers.clear();
+ for (int i = 0; i < threads; i++) {
+ ParallelWorker<Res> worker = new ParallelWorker<>(taskList, consumer, i);
+ workers.add(worker);
+ worker.start();
+ }
+ join(); // wait for all workers to exit before returning
+ }
+
+ private void join() {
+ while (true) {
+ int aliveCount = 0;
+ for (ParallelWorker<Res> worker : workers) {
+ try {
+ if (worker.isAlive()) {
+ ++aliveCount;
+ worker.join();
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (aliveCount == 0)
+ return;
+ }
+ }
+
+ private static class ParallelWorker<V> extends Thread {
+
+ private final Queue<Supplier<V>> taskProvider;
+ private final Consumer<V> consumer;
+
+ public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) {
+ super("PutterWorker#" + workerId);
+ this.taskProvider = taskProvider;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run() {
+ Supplier<V> supplier;
+ while ((supplier = taskProvider.poll()) != null) {
+ // here we do not let the consumer accept null productions
+ Optional.ofNullable(supplier.get()).ifPresent(consumer);
+ }
+ }
+ }
+}