package com.keuin.kbackupfabric.util;

import com.keuin.kbackupfabric.backup.incremental.ObjectCollectionFactory;

import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * A single-consumer, multiple-producer model.
 * Worker threads pull supplier tasks from a shared queue and feed their results
 * to the consumer; since several workers may call it at once, the consumer must be thread-safe.
 *
 * @param <Res> type of the resource to be produced and consumed.
 */
public class ParallelSupplier<Res> {

    private final Consumer<Res> consumer;
    private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
    private final int threads;
    private final Set<ParallelWorker<Res>> workers = new HashSet<>();

    public ParallelSupplier(Consumer<Res> consumer, int threads) {
        this.consumer = consumer;
        this.threads = threads;
    }

    public void addTask(Supplier<Res> valueSupplier) {
        this.taskList.add(valueSupplier);
    }

    public void process() {
        workers.clear();
        for (int i = 0; i < threads; i++) {
            ParallelWorker<Res> worker = new ParallelWorker<>(taskList, consumer, i);
            workers.add(worker);
            worker.start();
        }
        join(); // wait for all workers to exit before returning
    }

    private void join() {
        // Repeatedly join the workers until none of them is alive.
        while (true) {
            int aliveCount = 0;
            for (ParallelWorker<Res> worker : workers) {
                try {
                    if (worker.isAlive()) {
                        ++aliveCount;
                        worker.join();
                    }
                } catch (InterruptedException ignored) {
                }
            }
            if (aliveCount == 0)
                return;
        }
    }

    private static class ParallelWorker<Res> extends Thread {

        private final Queue<Supplier<Res>> taskProvider;
        private final Consumer<Res> consumer;

        public ParallelWorker(Queue<Supplier<Res>> taskProvider, Consumer<Res> consumer, int workerId) {
            super("PutterWorker#" + workerId);
            this.taskProvider = taskProvider;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            Supplier<Res> supplier;
            while ((supplier = taskProvider.poll()) != null) {
                // here we do not let the consumer accept null productions
                Optional.ofNullable(supplier.get()).ifPresent(consumer);
            }
        }
    }
}
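// A minimal usage sketch (illustrative only; the element type, consumer, thread count,
// and task bodies below are assumptions for demonstration, not taken from the mod's code):
//
//     ParallelSupplier<String> parallelSupplier =
//             new ParallelSupplier<>(System.out::println, 4);
//     parallelSupplier.addTask(() -> "value produced on a worker thread");
//     parallelSupplier.addTask(() -> null); // null results are skipped, never passed to the consumer
//     parallelSupplier.process();           // blocks until every queued task has been consumed
//
// Note that process() starts a fresh set of worker threads on every call, so tasks added
// after process() returns require another call to process() to be executed.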