From 1c23fc14be8a0ac9542f1412448c4d896756ba01 Mon Sep 17 00:00:00 2001 From: Keuin Date: Fri, 22 Jan 2021 18:59:47 +0800 Subject: Speed up the incremental backup by using multiple CPU cores if available (use multiple threads to calculate the hash). --- .../incremental/ObjectCollectionFactory.java | 131 ++++++++++++++++++++- .../backup/incremental/ObjectElement.java | 8 +- 2 files changed, 134 insertions(+), 5 deletions(-) (limited to 'src/main/java/com/keuin/kbackupfabric/backup/incremental') diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java index 0e02606..56fc052 100644 --- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java +++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java @@ -9,6 +9,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.function.Consumer; +import java.util.function.Supplier; /** * Incremental backup is implemented as git-like file collection. @@ -19,18 +23,30 @@ import java.util.*; public class ObjectCollectionFactory { private final FileIdentifierProvider identifierFactory; + private final int threads; + private Exception exception = null; // fail in async - public ObjectCollectionFactory(FileIdentifierProvider identifierFactory) { + public ObjectCollectionFactory(FileIdentifierProvider identifierFactory, int threads) { this.identifierFactory = identifierFactory; + this.threads = threads; + if (threads <= 0) + throw new IllegalArgumentException("thread count must be positive."); } public ObjectCollection fromDirectory(File directory, Set ignoredFiles) throws IOException { - final Set subFiles = new HashSet<>(); + + final int minParallelProcessFileCountThreshold = 0; + + final Set subFiles = new ConcurrentSkipListSet<>(); + final Map subCollections = new HashMap<>(); if (!Objects.requireNonNull(directory).isDirectory()) throw new IllegalArgumentException("given file is not a directory"); + + // TODO: use putter instead + Set files = new HashSet<>(); for (Iterator iter = Files.walk(directory.toPath(), 1).iterator(); iter.hasNext(); ) { Path path = iter.next(); if (Files.isSameFile(path, directory.toPath())) @@ -39,17 +55,124 @@ public class ObjectCollectionFactory { if (file.isDirectory()) { subCollections.put(file.getName(), fromDirectory(file, ignoredFiles)); } else if (!ignoredFiles.contains(file.getName())) { - subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file))); + files.add(file); // add to the set to be processed } else { PrintUtil.info(String.format("Skipping file %s.", file.getName())); } } - return new ObjectCollection(directory.getName(), subFiles, subCollections); + // deal with all direct sub files + if (threads == 1 || files.size() < minParallelProcessFileCountThreshold) { + for (File file : files) { + subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file))); + } + } else { + // use ParallelSupplier to process + ParallelSupplier parallelSupplier = new ParallelSupplier<>(subFiles::add, threads); + files.forEach(file -> parallelSupplier.addTask(() -> { + try { + return new ObjectElement(file.getName(), identifierFactory.fromFile(file)); + } catch (IOException e) { + fail(e); + } + return null; + })); + parallelSupplier.process(); + } + + // check if any exception has been thrown in async workers. + synchronized (this) { + if (this.exception != null) { + if (exception instanceof IOException) + throw (IOException) exception; + else + throw new RuntimeException(exception); + } + } + + return new ObjectCollection(directory.getName(), new HashSet<>(subFiles), subCollections); } public ObjectCollection fromDirectory(File directory) throws IOException { return fromDirectory(directory, Collections.emptySet()); } + private synchronized void fail(IOException e) { + this.exception = e; + } + + /** + * A single-consumer, multiple-producer model. + * + * @param type of the resource to be produced and consumed. + */ + private static class ParallelSupplier { + + private final Consumer consumer; + private final ConcurrentLinkedQueue> taskList = new ConcurrentLinkedQueue<>(); + private final int threads; + private final Set> workers = new HashSet<>(); + + public ParallelSupplier(Consumer consumer, int threads) { + this.consumer = consumer; + this.threads = threads; + } + + public void addTask(Supplier valueSupplier) { + this.taskList.add(valueSupplier); + } + + public void process() { + workers.clear(); + for (int i = 0; i < threads; i++) { + ParallelWorker worker = new ParallelWorker(taskList, consumer, i); + workers.add(worker); + worker.start(); + } + join(); // wait for all workers to exit before returning + } + + private void join() { + while (true) { + int aliveCount = 0; + for (ParallelWorker worker : workers) { + try { + if (worker.isAlive()) { + ++aliveCount; + worker.join(); + } + } catch (InterruptedException ignored) { + } + } + if (aliveCount == 0) + return; + } + } + + private static class ParallelWorker extends Thread { + + private final Queue> taskProvider; + private final Consumer consumer; + + public ParallelWorker(Queue> taskProvider, Consumer consumer, int workerId) { + super("PutterWorker#" + workerId); + this.taskProvider = taskProvider; + this.consumer = consumer; + } + + @Override + public void run() { + Supplier supplier; + while ((supplier = taskProvider.poll()) != null) { + // here we do not let the consumer accept null productions + Optional.ofNullable(supplier.get()).ifPresent(consumer); + } + } + } + } + +// private interface Puttable { +// void put(K key, V value); +// } + } diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java index 0bb7873..ee677fa 100644 --- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java +++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java @@ -1,6 +1,7 @@ package com.keuin.kbackupfabric.backup.incremental; import com.keuin.kbackupfabric.backup.incremental.identifier.ObjectIdentifier; +import org.jetbrains.annotations.NotNull; import java.io.Serializable; import java.util.Objects; @@ -9,7 +10,7 @@ import java.util.Objects; * Representing a file in a ObjectCollection. * Immutable. */ -public class ObjectElement implements Serializable { +public class ObjectElement implements Serializable, Comparable { private final String name; private final ObjectIdentifier identifier; @@ -57,4 +58,9 @@ public class ObjectElement implements Serializable { ", identifier=" + identifier + '}'; } + + @Override + public int compareTo(@NotNull ObjectElement objectElement) { + return name.compareTo(objectElement.name); + } } -- cgit v1.2.3