summaryrefslogtreecommitdiff
path: root/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
diff options
context:
space:
mode:
authorKeuin <[email protected]>2021-01-22 18:59:47 +0800
committerkeuin <[email protected]>2021-01-22 18:59:47 +0800
commit1c23fc14be8a0ac9542f1412448c4d896756ba01 (patch)
tree98a89f19f84877d24a99f9a256408e46b4c32f29 /src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
parent2f1d2ec7ddaebbbd19cde6314afa873f6fb964f4 (diff)
Speed up the incremental backup by using multiple CPU cores if available (use multiple threads to calculate the hash).
Diffstat (limited to 'src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java')
-rw-r--r--src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java131
1 files changed, 127 insertions, 4 deletions
diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
index 0e02606..56fc052 100644
--- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
+++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
@@ -9,6 +9,10 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
/**
* Incremental backup is implemented as git-like file collection.
@@ -19,18 +23,30 @@ import java.util.*;
public class ObjectCollectionFactory<T extends ObjectIdentifier> {
private final FileIdentifierProvider<T> identifierFactory;
+ private final int threads;
+ private Exception exception = null; // fail in async
- public ObjectCollectionFactory(FileIdentifierProvider<T> identifierFactory) {
+ public ObjectCollectionFactory(FileIdentifierProvider<T> identifierFactory, int threads) {
this.identifierFactory = identifierFactory;
+ this.threads = threads;
+ if (threads <= 0)
+ throw new IllegalArgumentException("thread count must be positive.");
}
public ObjectCollection fromDirectory(File directory, Set<String> ignoredFiles) throws IOException {
- final Set<ObjectElement> subFiles = new HashSet<>();
+
+ final int minParallelProcessFileCountThreshold = 0;
+
+ final Set<ObjectElement> subFiles = new ConcurrentSkipListSet<>();
+
final Map<String, ObjectCollection> subCollections = new HashMap<>();
if (!Objects.requireNonNull(directory).isDirectory())
throw new IllegalArgumentException("given file is not a directory");
+
+ // TODO: use putter instead
+ Set<File> files = new HashSet<>();
for (Iterator<Path> iter = Files.walk(directory.toPath(), 1).iterator(); iter.hasNext(); ) {
Path path = iter.next();
if (Files.isSameFile(path, directory.toPath()))
@@ -39,17 +55,124 @@ public class ObjectCollectionFactory<T extends ObjectIdentifier> {
if (file.isDirectory()) {
subCollections.put(file.getName(), fromDirectory(file, ignoredFiles));
} else if (!ignoredFiles.contains(file.getName())) {
- subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file)));
+ files.add(file); // add to the set to be processed
} else {
PrintUtil.info(String.format("Skipping file %s.", file.getName()));
}
}
- return new ObjectCollection(directory.getName(), subFiles, subCollections);
+ // deal with all direct sub files
+ if (threads == 1 || files.size() < minParallelProcessFileCountThreshold) {
+ for (File file : files) {
+ subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file)));
+ }
+ } else {
+ // use ParallelSupplier to process
+ ParallelSupplier<ObjectElement> parallelSupplier = new ParallelSupplier<>(subFiles::add, threads);
+ files.forEach(file -> parallelSupplier.addTask(() -> {
+ try {
+ return new ObjectElement(file.getName(), identifierFactory.fromFile(file));
+ } catch (IOException e) {
+ fail(e);
+ }
+ return null;
+ }));
+ parallelSupplier.process();
+ }
+
+ // check if any exception has been thrown in async workers.
+ synchronized (this) {
+ if (this.exception != null) {
+ if (exception instanceof IOException)
+ throw (IOException) exception;
+ else
+ throw new RuntimeException(exception);
+ }
+ }
+
+ return new ObjectCollection(directory.getName(), new HashSet<>(subFiles), subCollections);
}
public ObjectCollection fromDirectory(File directory) throws IOException {
return fromDirectory(directory, Collections.emptySet());
}
+ private synchronized void fail(IOException e) {
+ this.exception = e;
+ }
+
+ /**
+ * A single-consumer, multiple-producer model.
+ *
+ * @param <Res> type of the resource to be produced and consumed.
+ */
+ private static class ParallelSupplier<Res> {
+
+ private final Consumer<Res> consumer;
+ private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
+ private final int threads;
+ private final Set<ParallelWorker<Res>> workers = new HashSet<>();
+
+ public ParallelSupplier(Consumer<Res> consumer, int threads) {
+ this.consumer = consumer;
+ this.threads = threads;
+ }
+
+ public void addTask(Supplier<Res> valueSupplier) {
+ this.taskList.add(valueSupplier);
+ }
+
+ public void process() {
+ workers.clear();
+ for (int i = 0; i < threads; i++) {
+ ParallelWorker<Res> worker = new ParallelWorker<Res>(taskList, consumer, i);
+ workers.add(worker);
+ worker.start();
+ }
+ join(); // wait for all workers to exit before returning
+ }
+
+ private void join() {
+ while (true) {
+ int aliveCount = 0;
+ for (ParallelWorker<Res> worker : workers) {
+ try {
+ if (worker.isAlive()) {
+ ++aliveCount;
+ worker.join();
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (aliveCount == 0)
+ return;
+ }
+ }
+
+ private static class ParallelWorker<V> extends Thread {
+
+ private final Queue<Supplier<V>> taskProvider;
+ private final Consumer<V> consumer;
+
+ public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) {
+ super("PutterWorker#" + workerId);
+ this.taskProvider = taskProvider;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run() {
+ Supplier<V> supplier;
+ while ((supplier = taskProvider.poll()) != null) {
+ // here we do not let the consumer accept null productions
+ Optional.ofNullable(supplier.get()).ifPresent(consumer);
+ }
+ }
+ }
+ }
+
+// private interface Puttable<K, V> {
+// void put(K key, V value);
+// }
+
}