summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java131
-rw-r--r--src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java8
-rw-r--r--src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java11
-rw-r--r--src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java22
4 files changed, 165 insertions, 7 deletions
diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
index 0e02606..56fc052 100644
--- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
+++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
@@ -9,6 +9,10 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
/**
* Incremental backup is implemented as git-like file collection.
@@ -19,18 +23,30 @@ import java.util.*;
public class ObjectCollectionFactory<T extends ObjectIdentifier> {
private final FileIdentifierProvider<T> identifierFactory;
+ private final int threads;
+ private Exception exception = null; // fail in async
- public ObjectCollectionFactory(FileIdentifierProvider<T> identifierFactory) {
+ public ObjectCollectionFactory(FileIdentifierProvider<T> identifierFactory, int threads) {
this.identifierFactory = identifierFactory;
+ this.threads = threads;
+ if (threads <= 0)
+ throw new IllegalArgumentException("thread count must be positive.");
}
public ObjectCollection fromDirectory(File directory, Set<String> ignoredFiles) throws IOException {
- final Set<ObjectElement> subFiles = new HashSet<>();
+
+ final int minParallelProcessFileCountThreshold = 0;
+
+ final Set<ObjectElement> subFiles = new ConcurrentSkipListSet<>();
+
final Map<String, ObjectCollection> subCollections = new HashMap<>();
if (!Objects.requireNonNull(directory).isDirectory())
throw new IllegalArgumentException("given file is not a directory");
+
+ // TODO: use putter instead
+ Set<File> files = new HashSet<>();
for (Iterator<Path> iter = Files.walk(directory.toPath(), 1).iterator(); iter.hasNext(); ) {
Path path = iter.next();
if (Files.isSameFile(path, directory.toPath()))
@@ -39,17 +55,124 @@ public class ObjectCollectionFactory<T extends ObjectIdentifier> {
if (file.isDirectory()) {
subCollections.put(file.getName(), fromDirectory(file, ignoredFiles));
} else if (!ignoredFiles.contains(file.getName())) {
- subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file)));
+ files.add(file); // add to the set to be processed
} else {
PrintUtil.info(String.format("Skipping file %s.", file.getName()));
}
}
- return new ObjectCollection(directory.getName(), subFiles, subCollections);
+ // deal with all direct sub files
+ if (threads == 1 || files.size() < minParallelProcessFileCountThreshold) {
+ for (File file : files) {
+ subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file)));
+ }
+ } else {
+ // use ParallelSupplier to process
+ ParallelSupplier<ObjectElement> parallelSupplier = new ParallelSupplier<>(subFiles::add, threads);
+ files.forEach(file -> parallelSupplier.addTask(() -> {
+ try {
+ return new ObjectElement(file.getName(), identifierFactory.fromFile(file));
+ } catch (IOException e) {
+ fail(e);
+ }
+ return null;
+ }));
+ parallelSupplier.process();
+ }
+
+ // check if any exception has been thrown in async workers.
+ synchronized (this) {
+ if (this.exception != null) {
+ if (exception instanceof IOException)
+ throw (IOException) exception;
+ else
+ throw new RuntimeException(exception);
+ }
+ }
+
+ return new ObjectCollection(directory.getName(), new HashSet<>(subFiles), subCollections);
}
public ObjectCollection fromDirectory(File directory) throws IOException {
return fromDirectory(directory, Collections.emptySet());
}
+ private synchronized void fail(IOException e) {
+ this.exception = e;
+ }
+
+ /**
+ * A single-consumer, multiple-producer model.
+ *
+ * @param <Res> type of the resource to be produced and consumed.
+ */
+ private static class ParallelSupplier<Res> {
+
+ private final Consumer<Res> consumer;
+ private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
+ private final int threads;
+ private final Set<ParallelWorker<Res>> workers = new HashSet<>();
+
+ public ParallelSupplier(Consumer<Res> consumer, int threads) {
+ this.consumer = consumer;
+ this.threads = threads;
+ }
+
+ public void addTask(Supplier<Res> valueSupplier) {
+ this.taskList.add(valueSupplier);
+ }
+
+ public void process() {
+ workers.clear();
+ for (int i = 0; i < threads; i++) {
+ ParallelWorker<Res> worker = new ParallelWorker<Res>(taskList, consumer, i);
+ workers.add(worker);
+ worker.start();
+ }
+ join(); // wait for all workers to exit before returning
+ }
+
+ private void join() {
+ while (true) {
+ int aliveCount = 0;
+ for (ParallelWorker<Res> worker : workers) {
+ try {
+ if (worker.isAlive()) {
+ ++aliveCount;
+ worker.join();
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (aliveCount == 0)
+ return;
+ }
+ }
+
+ private static class ParallelWorker<V> extends Thread {
+
+ private final Queue<Supplier<V>> taskProvider;
+ private final Consumer<V> consumer;
+
+ public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) {
+ super("PutterWorker#" + workerId);
+ this.taskProvider = taskProvider;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run() {
+ Supplier<V> supplier;
+ while ((supplier = taskProvider.poll()) != null) {
+ // here we do not let the consumer accept null productions
+ Optional.ofNullable(supplier.get()).ifPresent(consumer);
+ }
+ }
+ }
+ }
+
+// private interface Puttable<K, V> {
+// void put(K key, V value);
+// }
+
}
diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java
index 0bb7873..ee677fa 100644
--- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java
+++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java
@@ -1,6 +1,7 @@
package com.keuin.kbackupfabric.backup.incremental;
import com.keuin.kbackupfabric.backup.incremental.identifier.ObjectIdentifier;
+import org.jetbrains.annotations.NotNull;
import java.io.Serializable;
import java.util.Objects;
@@ -9,7 +10,7 @@ import java.util.Objects;
* Representing a file in a ObjectCollection.
* Immutable.
*/
-public class ObjectElement implements Serializable {
+public class ObjectElement implements Serializable, Comparable<ObjectElement> {
private final String name;
private final ObjectIdentifier identifier;
@@ -57,4 +58,9 @@ public class ObjectElement implements Serializable {
", identifier=" + identifier +
'}';
}
+
+ @Override
+ public int compareTo(@NotNull ObjectElement objectElement) {
+ return name.compareTo(objectElement.name);
+ }
}
diff --git a/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java b/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java
index baa6580..ab0c114 100644
--- a/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java
+++ b/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java
@@ -8,6 +8,7 @@ import com.keuin.kbackupfabric.backup.incremental.manager.IncrementalBackupStora
import com.keuin.kbackupfabric.operation.backup.feedback.IncrementalBackupFeedback;
import com.keuin.kbackupfabric.util.FilesystemUtil;
import com.keuin.kbackupfabric.util.PrintUtil;
+import com.keuin.kbackupfabric.util.ThreadingUtil;
import java.io.File;
import java.io.IOException;
@@ -34,13 +35,18 @@ public class ConfiguredIncrementalBackupMethod implements ConfiguredBackupMethod
@Override
public IncrementalBackupFeedback backup() {
+
+ final int hashFactoryThreads = ThreadingUtil.getRecommendedThreadCount(); // how many threads do we use to generate the hash tree
+
+ LOGGER.info("Threads: " + hashFactoryThreads);
+
IncrementalBackupFeedback feedback;
try {
File levelPathFile = new File(levelPath);
// construct incremental backup index
PrintUtil.info("Hashing files...");
- ObjectCollection collection = new ObjectCollectionFactory<>(Sha256Identifier.getFactory())
+ ObjectCollection collection = new ObjectCollectionFactory<>(Sha256Identifier.getFactory(), hashFactoryThreads)
.fromDirectory(levelPathFile, new HashSet<>(Arrays.asList("session.lock", "kbackup_metadata")));
// update storage
@@ -56,10 +62,12 @@ public class ConfiguredIncrementalBackupMethod implements ConfiguredBackupMethod
PrintUtil.info("Incremental backup finished.");
feedback = new IncrementalBackupFeedback(filesAdded >= 0, filesAdded);
} catch (IOException e) {
+ e.printStackTrace(); // at least we should print it out if we discard the exception... Better than doing nothing.
feedback = new IncrementalBackupFeedback(false, 0);
}
if (!feedback.isSuccess()) {
+ LOGGER.severe("Failed to backup.");
// do clean-up if failed
File backupIndexFile = new File(backupIndexFileSaveDirectory, backupIndexFileName);
if (backupIndexFile.exists()) {
@@ -67,7 +75,6 @@ public class ConfiguredIncrementalBackupMethod implements ConfiguredBackupMethod
LOGGER.warning("Failed to clean up: cannot delete file " + backupIndexFile.getName());
}
}
-
//TODO: do more deep clean for object files
}
diff --git a/src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java b/src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java
new file mode 100644
index 0000000..b270895
--- /dev/null
+++ b/src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java
@@ -0,0 +1,22 @@
+package com.keuin.kbackupfabric.util;
+
+public class ThreadingUtil {
+ public static int getRecommendedThreadCount() {
+ int coreCount = Runtime.getRuntime().availableProcessors();
+
+ // if the cores are too few, we regress to single thread
+ if (coreCount <= 2)
+ return 1;
+
+ // we have multiple cores, but not too many
+ if (coreCount == 3)
+ return 2;
+
+ // if we have multiple core, but not too many, we use a half
+ if (coreCount <= 6) // 4, 5, 6 -> 3, 3, 4
+ return (coreCount + 2) / 2;
+
+ // cores are sufficient, we use almost all of them, except a fixed count remained for the OS
+ return coreCount - 2;
+ }
+}