summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeuin <[email protected]>2021-01-22 18:59:47 +0800
committerkeuin <[email protected]>2021-01-22 18:59:47 +0800
commit1c23fc14be8a0ac9542f1412448c4d896756ba01 (patch)
tree98a89f19f84877d24a99f9a256408e46b4c32f29
parent2f1d2ec7ddaebbbd19cde6314afa873f6fb964f4 (diff)
Speed up the incremental backup by using multiple CPU cores if available (use multiple threads to calculate the hash).
-rw-r--r--gradle.properties2
-rw-r--r--src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java131
-rw-r--r--src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java8
-rw-r--r--src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java11
-rw-r--r--src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java22
-rw-r--r--src/test/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactoryTest.java23
-rw-r--r--src/test/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionSerializerTest.java24
-rw-r--r--src/test/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethodTest.java15
8 files changed, 219 insertions, 17 deletions
diff --git a/gradle.properties b/gradle.properties
index 7878280..6e44a42 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -6,7 +6,7 @@ minecraft_version=1.14.4
yarn_mappings=1.14.4+build.18
loader_version=0.11.0
# Mod Properties
-mod_version=1.4.3
+mod_version=1.4.4
maven_group=com.keuin.kbackupfabric
archives_base_name=kbackup-fabric
# Dependencies
diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
index 0e02606..56fc052 100644
--- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
+++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java
@@ -9,6 +9,10 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
/**
* Incremental backup is implemented as git-like file collection.
@@ -19,18 +23,30 @@ import java.util.*;
public class ObjectCollectionFactory<T extends ObjectIdentifier> {
private final FileIdentifierProvider<T> identifierFactory;
+ private final int threads;
+ private Exception exception = null; // fail in async
- public ObjectCollectionFactory(FileIdentifierProvider<T> identifierFactory) {
+ public ObjectCollectionFactory(FileIdentifierProvider<T> identifierFactory, int threads) {
this.identifierFactory = identifierFactory;
+ this.threads = threads;
+ if (threads <= 0)
+ throw new IllegalArgumentException("thread count must be positive.");
}
public ObjectCollection fromDirectory(File directory, Set<String> ignoredFiles) throws IOException {
- final Set<ObjectElement> subFiles = new HashSet<>();
+
+ final int minParallelProcessFileCountThreshold = 0;
+
+ final Set<ObjectElement> subFiles = new ConcurrentSkipListSet<>();
+
final Map<String, ObjectCollection> subCollections = new HashMap<>();
if (!Objects.requireNonNull(directory).isDirectory())
throw new IllegalArgumentException("given file is not a directory");
+
+ // TODO: use putter instead
+ Set<File> files = new HashSet<>();
for (Iterator<Path> iter = Files.walk(directory.toPath(), 1).iterator(); iter.hasNext(); ) {
Path path = iter.next();
if (Files.isSameFile(path, directory.toPath()))
@@ -39,17 +55,124 @@ public class ObjectCollectionFactory<T extends ObjectIdentifier> {
if (file.isDirectory()) {
subCollections.put(file.getName(), fromDirectory(file, ignoredFiles));
} else if (!ignoredFiles.contains(file.getName())) {
- subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file)));
+ files.add(file); // add to the set to be processed
} else {
PrintUtil.info(String.format("Skipping file %s.", file.getName()));
}
}
- return new ObjectCollection(directory.getName(), subFiles, subCollections);
+ // deal with all direct sub files
+ if (threads == 1 || files.size() < minParallelProcessFileCountThreshold) {
+ for (File file : files) {
+ subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file)));
+ }
+ } else {
+ // use ParallelSupplier to process
+ ParallelSupplier<ObjectElement> parallelSupplier = new ParallelSupplier<>(subFiles::add, threads);
+ files.forEach(file -> parallelSupplier.addTask(() -> {
+ try {
+ return new ObjectElement(file.getName(), identifierFactory.fromFile(file));
+ } catch (IOException e) {
+ fail(e);
+ }
+ return null;
+ }));
+ parallelSupplier.process();
+ }
+
+ // check if any exception has been thrown in async workers.
+ synchronized (this) {
+ if (this.exception != null) {
+ if (exception instanceof IOException)
+ throw (IOException) exception;
+ else
+ throw new RuntimeException(exception);
+ }
+ }
+
+ return new ObjectCollection(directory.getName(), new HashSet<>(subFiles), subCollections);
}
public ObjectCollection fromDirectory(File directory) throws IOException {
return fromDirectory(directory, Collections.emptySet());
}
+ private synchronized void fail(IOException e) {
+ this.exception = e;
+ }
+
+ /**
+ * A single-consumer, multiple-producer model.
+ *
+ * @param <Res> type of the resource to be produced and consumed.
+ */
+ private static class ParallelSupplier<Res> {
+
+ private final Consumer<Res> consumer;
+ private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>();
+ private final int threads;
+ private final Set<ParallelWorker<Res>> workers = new HashSet<>();
+
+ public ParallelSupplier(Consumer<Res> consumer, int threads) {
+ this.consumer = consumer;
+ this.threads = threads;
+ }
+
+ public void addTask(Supplier<Res> valueSupplier) {
+ this.taskList.add(valueSupplier);
+ }
+
+ public void process() {
+ workers.clear();
+ for (int i = 0; i < threads; i++) {
+ ParallelWorker<Res> worker = new ParallelWorker<Res>(taskList, consumer, i);
+ workers.add(worker);
+ worker.start();
+ }
+ join(); // wait for all workers to exit before returning
+ }
+
+ private void join() {
+ while (true) {
+ int aliveCount = 0;
+ for (ParallelWorker<Res> worker : workers) {
+ try {
+ if (worker.isAlive()) {
+ ++aliveCount;
+ worker.join();
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (aliveCount == 0)
+ return;
+ }
+ }
+
+ private static class ParallelWorker<V> extends Thread {
+
+ private final Queue<Supplier<V>> taskProvider;
+ private final Consumer<V> consumer;
+
+ public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) {
+ super("PutterWorker#" + workerId);
+ this.taskProvider = taskProvider;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run() {
+ Supplier<V> supplier;
+ while ((supplier = taskProvider.poll()) != null) {
+ // here we do not let the consumer accept null productions
+ Optional.ofNullable(supplier.get()).ifPresent(consumer);
+ }
+ }
+ }
+ }
+
+// private interface Puttable<K, V> {
+// void put(K key, V value);
+// }
+
}
diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java
index 0bb7873..ee677fa 100644
--- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java
+++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java
@@ -1,6 +1,7 @@
package com.keuin.kbackupfabric.backup.incremental;
import com.keuin.kbackupfabric.backup.incremental.identifier.ObjectIdentifier;
+import org.jetbrains.annotations.NotNull;
import java.io.Serializable;
import java.util.Objects;
@@ -9,7 +10,7 @@ import java.util.Objects;
* Representing a file in a ObjectCollection.
* Immutable.
*/
-public class ObjectElement implements Serializable {
+public class ObjectElement implements Serializable, Comparable<ObjectElement> {
private final String name;
private final ObjectIdentifier identifier;
@@ -57,4 +58,9 @@ public class ObjectElement implements Serializable {
", identifier=" + identifier +
'}';
}
+
+ @Override
+ public int compareTo(@NotNull ObjectElement objectElement) {
+ return name.compareTo(objectElement.name);
+ }
}
diff --git a/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java b/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java
index baa6580..ab0c114 100644
--- a/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java
+++ b/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java
@@ -8,6 +8,7 @@ import com.keuin.kbackupfabric.backup.incremental.manager.IncrementalBackupStora
import com.keuin.kbackupfabric.operation.backup.feedback.IncrementalBackupFeedback;
import com.keuin.kbackupfabric.util.FilesystemUtil;
import com.keuin.kbackupfabric.util.PrintUtil;
+import com.keuin.kbackupfabric.util.ThreadingUtil;
import java.io.File;
import java.io.IOException;
@@ -34,13 +35,18 @@ public class ConfiguredIncrementalBackupMethod implements ConfiguredBackupMethod
@Override
public IncrementalBackupFeedback backup() {
+
+ final int hashFactoryThreads = ThreadingUtil.getRecommendedThreadCount(); // how many threads do we use to generate the hash tree
+
+ LOGGER.info("Threads: " + hashFactoryThreads);
+
IncrementalBackupFeedback feedback;
try {
File levelPathFile = new File(levelPath);
// construct incremental backup index
PrintUtil.info("Hashing files...");
- ObjectCollection collection = new ObjectCollectionFactory<>(Sha256Identifier.getFactory())
+ ObjectCollection collection = new ObjectCollectionFactory<>(Sha256Identifier.getFactory(), hashFactoryThreads)
.fromDirectory(levelPathFile, new HashSet<>(Arrays.asList("session.lock", "kbackup_metadata")));
// update storage
@@ -56,10 +62,12 @@ public class ConfiguredIncrementalBackupMethod implements ConfiguredBackupMethod
PrintUtil.info("Incremental backup finished.");
feedback = new IncrementalBackupFeedback(filesAdded >= 0, filesAdded);
} catch (IOException e) {
+ e.printStackTrace(); // at least we should print it out if we discard the exception... Better than doing nothing.
feedback = new IncrementalBackupFeedback(false, 0);
}
if (!feedback.isSuccess()) {
+ LOGGER.severe("Failed to backup.");
// do clean-up if failed
File backupIndexFile = new File(backupIndexFileSaveDirectory, backupIndexFileName);
if (backupIndexFile.exists()) {
@@ -67,7 +75,6 @@ public class ConfiguredIncrementalBackupMethod implements ConfiguredBackupMethod
LOGGER.warning("Failed to clean up: cannot delete file " + backupIndexFile.getName());
}
}
-
//TODO: do more deep clean for object files
}
diff --git a/src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java b/src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java
new file mode 100644
index 0000000..b270895
--- /dev/null
+++ b/src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java
@@ -0,0 +1,22 @@
+package com.keuin.kbackupfabric.util;
+
+public class ThreadingUtil {
+ public static int getRecommendedThreadCount() {
+ int coreCount = Runtime.getRuntime().availableProcessors();
+
+ // if the cores are too few, we regress to single thread
+ if (coreCount <= 2)
+ return 1;
+
+ // we have multiple cores, but not too many
+ if (coreCount == 3)
+ return 2;
+
+ // if we have multiple core, but not too many, we use a half
+ if (coreCount <= 6) // 4, 5, 6 -> 3, 3, 4
+ return (coreCount + 2) / 2;
+
+ // cores are sufficient, we use almost all of them, except a fixed count remained for the OS
+ return coreCount - 2;
+ }
+}
diff --git a/src/test/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactoryTest.java b/src/test/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactoryTest.java
index 9a2a73c..16fb258 100644
--- a/src/test/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactoryTest.java
+++ b/src/test/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactoryTest.java
@@ -25,10 +25,29 @@ public class ObjectCollectionFactoryTest {
}
@Test
- public void fromDirectory() {
+ public void fromDirectory1() {
+ fromDirectory(1);
+ }
+
+ @Test
+ public void fromDirectory2() {
+ fromDirectory(2);
+ }
+
+ @Test
+ public void fromDirectory4() {
+ fromDirectory(4);
+ }
+
+ @Test
+ public void fromDirectory8() {
+ fromDirectory(8);
+ }
+
+ public void fromDirectory(int threads) {
try {
ObjectCollectionFactory<Sha256Identifier> factory =
- new ObjectCollectionFactory<>(Sha256Identifier.getFactory());
+ new ObjectCollectionFactory<>(Sha256Identifier.getFactory(), threads);
ObjectCollection collection =
factory.fromDirectory(new File("./testfile/ObjectCollectionFactoryTest"));
diff --git a/src/test/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionSerializerTest.java b/src/test/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionSerializerTest.java
index bd25215..33f5a9c 100644
--- a/src/test/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionSerializerTest.java
+++ b/src/test/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionSerializerTest.java
@@ -10,10 +10,30 @@ import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
public class ObjectCollectionSerializerTest {
+
+ @Test
+ public void testSerializationConsistency1() throws IOException {
+ testSerializationConsistency(1);
+ }
+
+ @Test
+ public void testSerializationConsistency2() throws IOException {
+ testSerializationConsistency(2);
+ }
+
+ @Test
+ public void testSerializationConsistency4() throws IOException {
+ testSerializationConsistency(4);
+ }
+
@Test
- public void testSerializationConsistency() throws IOException {
+ public void testSerializationConsistency8() throws IOException {
+ testSerializationConsistency(8);
+ }
+
+ public void testSerializationConsistency(int threads) throws IOException {
ObjectCollectionFactory<Sha256Identifier> factory =
- new ObjectCollectionFactory<>(Sha256Identifier.getFactory());
+ new ObjectCollectionFactory<>(Sha256Identifier.getFactory(), threads);
ObjectCollection collection =
factory.fromDirectory(new File("./testfile/ObjectCollectionFactoryTest"));
File file = new File("./testfile/serialized");
diff --git a/src/test/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethodTest.java b/src/test/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethodTest.java
index 30f2d44..2b4bd94 100644
--- a/src/test/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethodTest.java
+++ b/src/test/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethodTest.java
@@ -1,5 +1,6 @@
package com.keuin.kbackupfabric.operation.backup.method;
+import com.keuin.kbackupfabric.operation.backup.feedback.IncrementalBackupFeedback;
import org.apache.commons.codec.digest.DigestUtils;
import org.junit.Test;
@@ -20,9 +21,9 @@ public class ConfiguredIncrementalBackupMethodTest {
private final String destDirectoryName = "destination";
private final String indexFileName = "index";
- private final double directoryFactor = 0.02;
- private final double fileFactor = 0.05;
- private final int maxRandomFileSizeBytes = 1024 * 1024 * 16;
+ private final double directoryFactor = 0.03;
+ private final double fileFactor = 0.04;
+ private final int maxRandomFileSizeBytes = 1024 * 1024 * 4;
private final Function<Integer, Integer> scaleDecayFunc = (x) -> x - 1;
@Test
@@ -66,11 +67,15 @@ public class ConfiguredIncrementalBackupMethodTest {
testTempPath,
destPath.toString()
);
- method.backup();
+ IncrementalBackupFeedback feedback = method.backup();
+ assertNotNull("Feedback should not be null.", feedback);
+ if (!feedback.isSuccess())
+ fail("Failed to backup. Cannot perform test.");
+ assertTrue("Index file was not generated by backup.", new File(testTempPath, indexFileName).exists());
// delete src
forceDelete(sourcePath.toFile());
- assertFalse(sourcePath.toFile().isDirectory());
+ assertFalse(sourcePath.toFile().exists());
// restore src
if (!method.restore())