From 61aa66b558ae8f2987a238216ac01e375f195c1e Mon Sep 17 00:00:00 2001 From: Keuin Date: Tue, 17 Jan 2023 01:33:44 +0800 Subject: Make ParallelSupplier a top-level class. --- .../keuin/kbackupfabric/util/ParallelSupplier.java | 81 ++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java (limited to 'src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java') diff --git a/src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java b/src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java new file mode 100644 index 0000000..bd51711 --- /dev/null +++ b/src/main/java/com/keuin/kbackupfabric/util/ParallelSupplier.java @@ -0,0 +1,81 @@ +package com.keuin.kbackupfabric.util; + +import com.keuin.kbackupfabric.backup.incremental.ObjectCollectionFactory; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * A single-consumer, multiple-producer model. + * + * @param type of the resource to be produced and consumed. + */ +public class ParallelSupplier { + + private final Consumer consumer; + private final ConcurrentLinkedQueue> taskList = new ConcurrentLinkedQueue<>(); + private final int threads; + private final Set> workers = new HashSet<>(); + + public ParallelSupplier(Consumer consumer, int threads) { + this.consumer = consumer; + this.threads = threads; + } + + public void addTask(Supplier valueSupplier) { + this.taskList.add(valueSupplier); + } + + public void process() { + workers.clear(); + for (int i = 0; i < threads; i++) { + ParallelWorker worker = new ParallelWorker<>(taskList, consumer, i); + workers.add(worker); + worker.start(); + } + join(); // wait for all workers to exit before returning + } + + private void join() { + while (true) { + int aliveCount = 0; + for (ParallelWorker worker : workers) { + try { + if (worker.isAlive()) { + ++aliveCount; + worker.join(); + } + } catch (InterruptedException ignored) { + } + } + if (aliveCount == 0) + return; + } + } + + private static class ParallelWorker extends Thread { + + private final Queue> taskProvider; + private final Consumer consumer; + + public ParallelWorker(Queue> taskProvider, Consumer consumer, int workerId) { + super("PutterWorker#" + workerId); + this.taskProvider = taskProvider; + this.consumer = consumer; + } + + @Override + public void run() { + Supplier supplier; + while ((supplier = taskProvider.poll()) != null) { + // here we do not let the consumer accept null productions + Optional.ofNullable(supplier.get()).ifPresent(consumer); + } + } + } +} -- cgit v1.2.3