package com.keuin.kbackupfabric.backup.incremental;

import com.keuin.kbackupfabric.backup.incremental.identifier.FileIdentifierProvider;
import com.keuin.kbackupfabric.backup.incremental.identifier.ObjectIdentifier;
import com.keuin.kbackupfabric.util.PrintUtil;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

/**
 * Incremental backup is implemented as a git-like file collection.
 * Files are called `objects`; the collection contains all files, distinguished by their
 * identifiers. Usually, an identifier is the combination of a hash and other short information
 * (such as size and another hash).
 * The identifier should use hashes that are strong enough to prevent possible collisions.
 *
 * @param <T> type of the identifier produced for each file.
 */
public class ObjectCollectionFactory<T extends ObjectIdentifier> {

    private final FileIdentifierProvider<T> identifierFactory;
    private final int threads;
    private final int minParallelProcessFileCountThreshold;

    /**
     * @param identifierFactory                    produces the identifier of each file.
     * @param threads                              number of worker threads used when a directory is
     *                                             processed in parallel; must be positive.
     * @param minParallelProcessFileCountThreshold directories with fewer direct files than this are
     *                                             processed on the calling thread; must not be negative.
     * @throws IllegalArgumentException if {@code threads} is not positive or the threshold is negative.
     */
    public ObjectCollectionFactory(FileIdentifierProvider<T> identifierFactory, int threads, int minParallelProcessFileCountThreshold) {
        this.identifierFactory = identifierFactory;
        this.threads = threads;
        this.minParallelProcessFileCountThreshold = minParallelProcessFileCountThreshold;
        if (threads <= 0) {
            throw new IllegalArgumentException("thread count must be positive.");
        }
        if (minParallelProcessFileCountThreshold < 0) {
            throw new IllegalArgumentException("minParallelProcessFileCountThreshold must not be negative.");
        }
    }

    /**
     * Builds the collection for a directory, recursing into every sub-directory.
     *
     * @param directory    the directory to scan; must be an existing directory.
     * @param ignoredFiles names of direct sub files to skip. It is passed unchanged to the recursive
     *                     calls, so it applies at every depth.
     * @return the collection holding the direct sub files and the sub-directory collections.
     * @throws IOException              if listing the directory or identifying a file fails.
     * @throws NullPointerException     if {@code directory} is null.
     * @throws IllegalArgumentException if {@code directory} is not a directory.
     */
    public ObjectCollection2 fromDirectory(File directory, Set<String> ignoredFiles) throws IOException {
        if (!Objects.requireNonNull(directory).isDirectory()) {
            throw new IllegalArgumentException("given file is not a directory");
        }

        final Path root = directory.toPath();
        final Map<String, ObjectCollection2> subCollections = new HashMap<>();
        final Set<File> files = new HashSet<>();

        // Files.walk holds an open directory handle, so the stream must be closed explicitly.
        try (Stream<Path> entries = Files.walk(root, 1)) {
            for (Iterator<Path> iter = entries.iterator(); iter.hasNext(); ) {
                Path path = iter.next();
                if (Files.isSameFile(path, root)) {
                    continue; // the walk also yields the start directory itself
                }
                File file = path.toFile();
                if (file.isDirectory()) {
                    subCollections.put(file.getName(), fromDirectory(file, ignoredFiles));
                } else if (!ignoredFiles.contains(file.getName())) {
                    files.add(file); // add to the set to be processed
                } else {
                    PrintUtil.info(String.format("Skipping file %s.", file.getName()));
                }
            }
        }

        // Concurrent set: in the parallel branch it is filled by the worker threads.
        final Set<ObjectElement> subFiles = ConcurrentHashMap.newKeySet(files.size());

        // deal with all direct sub files
        if (threads == 1 || files.size() < minParallelProcessFileCountThreshold) {
            for (File file : files) {
                subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file)));
            }
        } else {
            // The failure holder is local to this call, so an error in one run
            // cannot leak into later runs of the same factory.
            final AtomicReference<Exception> failure = new AtomicReference<>();
            ParallelSupplier<ObjectElement> parallelSupplier = new ParallelSupplier<>(subFiles::add, threads);
            files.forEach(file -> parallelSupplier.addTask(() -> {
                try {
                    return new ObjectElement(file.getName(), identifierFactory.fromFile(file));
                } catch (IOException | RuntimeException e) {
                    // Keep the first failure; returning null makes the worker skip this result.
                    failure.compareAndSet(null, e);
                    return null;
                }
            }));
            parallelSupplier.process();

            // Surface a worker failure the same way the sequential branch would.
            Exception cause = failure.get();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            if (cause != null) {
                throw (RuntimeException) cause; // only IOException or RuntimeException is ever stored
            }
        }

        return new ObjectCollection2(directory.getName(), subFiles, subCollections);
    }

    /**
     * Builds the collection for a directory without ignoring any file.
     *
     * @param directory the directory to scan; must be an existing directory.
     * @return the collection holding the direct sub files and the sub-directory collections.
     * @throws IOException if listing the directory or identifying a file fails.
     */
    public ObjectCollection2 fromDirectory(File directory) throws IOException {
        return fromDirectory(directory, Collections.emptySet());
    }

    /**
     * Runs queued suppliers on a fixed number of worker threads and hands every
     * non-null result to a single consumer.
     *
     * @param <Res> type of the resource to be produced and consumed.
     */
    private static class ParallelSupplier<Res> {

        private final Consumer<Res> consumer;
        private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
        private final int threads;
        private final Set<ParallelWorker<Res>> workers = new HashSet<>();

        public ParallelSupplier(Consumer<Res> consumer, int threads) {
            this.consumer = consumer;
            this.threads = threads;
        }

        /**
         * Queues a task. Its result is passed to the consumer unless it is null.
         */
        public void addTask(Supplier<Res> valueSupplier) {
            this.taskList.add(valueSupplier);
        }

        /**
         * Starts the workers and blocks until all of them have drained the queue and exited.
         */
        public void process() {
            workers.clear();
            for (int i = 0; i < threads; i++) {
                ParallelWorker<Res> worker = new ParallelWorker<>(taskList, consumer, i);
                workers.add(worker);
                worker.start();
            }
            join(); // wait for all workers to exit before returning
        }

        /**
         * Waits for every worker to terminate. An interrupt does not abort the wait,
         * because the workers still feed the consumer; the interrupt status is
         * restored once all of them are done.
         */
        private void join() {
            boolean interrupted = false;
            for (ParallelWorker<Res> worker : workers) {
                while (true) {
                    try {
                        worker.join();
                        break;
                    } catch (InterruptedException e) {
                        // join() cleared the flag; remember it and keep waiting
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Worker thread: polls tasks until the queue is empty, then exits.
         *
         * @param <V> type of the produced values.
         */
        private static class ParallelWorker<V> extends Thread {

            private final Queue<Supplier<V>> taskProvider;
            private final Consumer<V> consumer;

            public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) {
                super("PutterWorker#" + workerId);
                this.taskProvider = taskProvider;
                this.consumer = consumer;
            }

            @Override
            public void run() {
                Supplier<V> supplier;
                while ((supplier = taskProvider.poll()) != null) {
                    // here we do not let the consumer accept null productions
                    Optional.ofNullable(supplier.get()).ifPresent(consumer);
                }
            }
        }
    }
}