author    | Keuin <[email protected]> | 2023-01-17 01:33:44 +0800
committer | Keuin <[email protected]> | 2023-01-17 01:33:44 +0800
commit    | 61aa66b558ae8f2987a238216ac01e375f195c1e (patch)
tree      | 4256ad9c7075ed119daa9ce55c5a0cbadf63c359 /src/main/java/com/keuin/kbackupfabric
parent    | 822a5c5f53122763bf63e0baff79c435b1a60dbc (diff)
Make ParallelSupplier a top-level class.
Diffstat (limited to 'src/main/java/com/keuin/kbackupfabric')
-rw-r--r-- | src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java | 74
-rw-r--r-- | src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java | 81
2 files changed, 82 insertions, 73 deletions
diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
index 095108b..e8fd1f0 100644
--- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
+++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
@@ -2,6 +2,7 @@ package com.keuin.kbackupfabric.backup.incremental;
 
 import com.keuin.kbackupfabric.backup.incremental.identifier.FileIdentifierProvider;
 import com.keuin.kbackupfabric.backup.incremental.identifier.ObjectIdentifier;
+import com.keuin.kbackupfabric.util.ParallelSupplier;
 import com.keuin.kbackupfabric.util.PrintUtil;
 
 import java.io.File;
@@ -10,9 +11,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /**
@@ -104,74 +102,4 @@ public class ObjectCollectionFactory<T extends ObjectIdentifier> {
             this.exception = e;
         }
 
-    /**
-     * A single-consumer, multiple-producer model.
-     *
-     * @param <Res> type of the resource to be produced and consumed.
-     */
-    private static class ParallelSupplier<Res> {
-
-        private final Consumer<Res> consumer;
-        private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
-        private final int threads;
-        private final Set<ParallelWorker<Res>> workers = new HashSet<>();
-
-        public ParallelSupplier(Consumer<Res> consumer, int threads) {
-            this.consumer = consumer;
-            this.threads = threads;
-        }
-
-        public void addTask(Supplier<Res> valueSupplier) {
-            this.taskList.add(valueSupplier);
-        }
-
-        public void process() {
-            workers.clear();
-            for (int i = 0; i < threads; i++) {
-                ParallelWorker<Res> worker = new ParallelWorker<>(taskList, consumer, i);
-                workers.add(worker);
-                worker.start();
-            }
-            join(); // wait for all workers to exit before returning
-        }
-
-        private void join() {
-            while (true) {
-                int aliveCount = 0;
-                for (ParallelWorker<Res> worker : workers) {
-                    try {
-                        if (worker.isAlive()) {
-                            ++aliveCount;
-                            worker.join();
-                        }
-                    } catch (InterruptedException ignored) {
-                    }
-                }
-                if (aliveCount == 0)
-                    return;
-            }
-        }
-
-        private static class ParallelWorker<V> extends Thread {
-
-            private final Queue<Supplier<V>> taskProvider;
-            private final Consumer<V> consumer;
-
-            public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) {
-                super("PutterWorker#" + workerId);
-                this.taskProvider = taskProvider;
-                this.consumer = consumer;
-            }
-
-            @Override
-            public void run() {
-                Supplier<V> supplier;
-                while ((supplier = taskProvider.poll()) != null) {
-                    // here we do not let the consumer accept null productions
-                    Optional.ofNullable(supplier.get()).ifPresent(consumer);
-                }
-            }
-        }
-    }
-
 }
diff --git a/src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java b/src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java
new file mode 100644
index 0000000..bd51711
--- /dev/null
+++ b/src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java
@@ -0,0 +1,81 @@
+package com.keuin.kbackupfabric.util;
+
+import com.keuin.kbackupfabric.backup.incremental.ObjectCollectionFactory;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * A single-consumer, multiple-producer model.
+ *
+ * @param <Res> type of the resource to be produced and consumed.
+ */
+public class ParallelSupplier<Res> {
+
+    private final Consumer<Res> consumer;
+    private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
+    private final int threads;
+    private final Set<ParallelWorker<Res>> workers = new HashSet<>();
+
+    public ParallelSupplier(Consumer<Res> consumer, int threads) {
+        this.consumer = consumer;
+        this.threads = threads;
+    }
+
+    public void addTask(Supplier<Res> valueSupplier) {
+        this.taskList.add(valueSupplier);
+    }
+
+    public void process() {
+        workers.clear();
+        for (int i = 0; i < threads; i++) {
+            ParallelWorker<Res> worker = new ParallelWorker<>(taskList, consumer, i);
+            workers.add(worker);
+            worker.start();
+        }
+        join(); // wait for all workers to exit before returning
+    }
+
+    private void join() {
+        while (true) {
+            int aliveCount = 0;
+            for (ParallelWorker<Res> worker : workers) {
+                try {
+                    if (worker.isAlive()) {
+                        ++aliveCount;
+                        worker.join();
+                    }
+                } catch (InterruptedException ignored) {
+                }
+            }
+            if (aliveCount == 0)
+                return;
+        }
+    }
+
+    private static class ParallelWorker<V> extends Thread {
+
+        private final Queue<Supplier<V>> taskProvider;
+        private final Consumer<V> consumer;
+
+        public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) {
+            super("PutterWorker#" + workerId);
+            this.taskProvider = taskProvider;
+            this.consumer = consumer;
+        }
+
+        @Override
+        public void run() {
+            Supplier<V> supplier;
+            while ((supplier = taskProvider.poll()) != null) {
+                // here we do not let the consumer accept null productions
+                Optional.ofNullable(supplier.get()).ifPresent(consumer);
+            }
+        }
+    }
+}
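
For reference, a minimal usage sketch of the relocated ParallelSupplier. This is a hypothetical example, not part of this commit: the class name ParallelSupplierExample and the sample inputs are made up; only the ParallelSupplier API (constructor, addTask, process) comes from the code above.

// Hypothetical usage example (not part of this commit): produce hash codes on
// two worker threads and collect them through a single consumer callback.
import com.keuin.kbackupfabric.util.ParallelSupplier;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ParallelSupplierExample {
    public static void main(String[] args) {
        // The consumer is invoked from the worker threads, so the sink must be thread-safe.
        ConcurrentLinkedQueue<Integer> results = new ConcurrentLinkedQueue<>();
        ParallelSupplier<Integer> supplier = new ParallelSupplier<>(results::add, 2);
        for (String s : List.of("alpha", "beta", "gamma")) {
            supplier.addTask(s::hashCode); // each queued task produces one value
        }
        supplier.process(); // starts the workers and blocks until every task is consumed
        System.out.println(results);
    }
}

Note that process() starts fresh Thread instances on every call rather than reusing a pool, which matches the behaviour of the former nested class.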