summaryrefslogtreecommitdiff
path: root/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
diff options
context:
space:
mode:
authorKeuin <[email protected]>2023-01-17 01:33:44 +0800
committerKeuin <[email protected]>2023-01-17 01:33:44 +0800
commit61aa66b558ae8f2987a238216ac01e375f195c1e (patch)
tree4256ad9c7075ed119daa9ce55c5a0cbadf63c359 /src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
parent822a5c5f53122763bf63e0baff79c435b1a60dbc (diff)
Make ParallelSupplier a top-level class.
Diffstat (limited to 'src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java')
-rw-r--r--src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java74
1 files changed, 1 insertions, 73 deletions
diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
index 095108b..e8fd1f0 100644
--- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
+++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
@@ -2,6 +2,7 @@ package com.keuin.kbackupfabric.backup.incremental;
import com.keuin.kbackupfabric.backup.incremental.identifier.FileIdentifierProvider;
import com.keuin.kbackupfabric.backup.incremental.identifier.ObjectIdentifier;
+import com.keuin.kbackupfabric.util.ParallelSupplier;
import com.keuin.kbackupfabric.util.PrintUtil;
import java.io.File;
@@ -10,9 +11,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
import java.util.stream.Stream;
/**
@@ -104,74 +102,4 @@ public class ObjectCollectionFactory<T extends ObjectIdentifier> {
this.exception = e;
}
- /**
- * A single-consumer, multiple-producer model.
- *
- * @param <Res> type of the resource to be produced and consumed.
- */
- private static class ParallelSupplier<Res> {
-
- private final Consumer<Res> consumer;
- private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
- private final int threads;
- private final Set<ParallelWorker<Res>> workers = new HashSet<>();
-
- public ParallelSupplier(Consumer<Res> consumer, int threads) {
- this.consumer = consumer;
- this.threads = threads;
- }
-
- public void addTask(Supplier<Res> valueSupplier) {
- this.taskList.add(valueSupplier);
- }
-
- public void process() {
- workers.clear();
- for (int i = 0; i < threads; i++) {
- ParallelWorker<Res> worker = new ParallelWorker<>(taskList, consumer, i);
- workers.add(worker);
- worker.start();
- }
- join(); // wait for all workers to exit before returning
- }
-
- private void join() {
- while (true) {
- int aliveCount = 0;
- for (ParallelWorker<Res> worker : workers) {
- try {
- if (worker.isAlive()) {
- ++aliveCount;
- worker.join();
- }
- } catch (InterruptedException ignored) {
- }
- }
- if (aliveCount == 0)
- return;
- }
- }
-
- private static class ParallelWorker<V> extends Thread {
-
- private final Queue<Supplier<V>> taskProvider;
- private final Consumer<V> consumer;
-
- public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) {
- super("PutterWorker#" + workerId);
- this.taskProvider = taskProvider;
- this.consumer = consumer;
- }
-
- @Override
- public void run() {
- Supplier<V> supplier;
- while ((supplier = taskProvider.poll()) != null) {
- // here we do not let the consumer accept null productions
- Optional.ofNullable(supplier.get()).ifPresent(consumer);
- }
- }
- }
- }
-
}