diff options
Diffstat (limited to 'src/main/java/com/keuin/kbackupfabric')
4 files changed, 165 insertions, 7 deletions
diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java index 0e02606..56fc052 100644 --- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java +++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectCollectionFactory.java @@ -9,6 +9,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.function.Consumer; +import java.util.function.Supplier; /** * Incremental backup is implemented as git-like file collection. @@ -19,18 +23,30 @@ import java.util.*; public class ObjectCollectionFactory<T extends ObjectIdentifier> { private final FileIdentifierProvider<T> identifierFactory; + private final int threads; + private Exception exception = null; // fail in async - public ObjectCollectionFactory(FileIdentifierProvider<T> identifierFactory) { + public ObjectCollectionFactory(FileIdentifierProvider<T> identifierFactory, int threads) { this.identifierFactory = identifierFactory; + this.threads = threads; + if (threads <= 0) + throw new IllegalArgumentException("thread count must be positive."); } public ObjectCollection fromDirectory(File directory, Set<String> ignoredFiles) throws IOException { - final Set<ObjectElement> subFiles = new HashSet<>(); + + final int minParallelProcessFileCountThreshold = 0; + + final Set<ObjectElement> subFiles = new ConcurrentSkipListSet<>(); + final Map<String, ObjectCollection> subCollections = new HashMap<>(); if (!Objects.requireNonNull(directory).isDirectory()) throw new IllegalArgumentException("given file is not a directory"); + + // TODO: use putter instead + Set<File> files = new HashSet<>(); for (Iterator<Path> iter = Files.walk(directory.toPath(), 1).iterator(); iter.hasNext(); ) { Path path = iter.next(); if (Files.isSameFile(path, directory.toPath())) @@ -39,17 +55,124 @@ public class ObjectCollectionFactory<T extends ObjectIdentifier> { if (file.isDirectory()) { subCollections.put(file.getName(), fromDirectory(file, ignoredFiles)); } else if (!ignoredFiles.contains(file.getName())) { - subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file))); + files.add(file); // add to the set to be processed } else { PrintUtil.info(String.format("Skipping file %s.", file.getName())); } } - return new ObjectCollection(directory.getName(), subFiles, subCollections); + // deal with all direct sub files + if (threads == 1 || files.size() < minParallelProcessFileCountThreshold) { + for (File file : files) { + subFiles.add(new ObjectElement(file.getName(), identifierFactory.fromFile(file))); + } + } else { + // use ParallelSupplier to process + ParallelSupplier<ObjectElement> parallelSupplier = new ParallelSupplier<>(subFiles::add, threads); + files.forEach(file -> parallelSupplier.addTask(() -> { + try { + return new ObjectElement(file.getName(), identifierFactory.fromFile(file)); + } catch (IOException e) { + fail(e); + } + return null; + })); + parallelSupplier.process(); + } + + // check if any exception has been thrown in async workers. + synchronized (this) { + if (this.exception != null) { + if (exception instanceof IOException) + throw (IOException) exception; + else + throw new RuntimeException(exception); + } + } + + return new ObjectCollection(directory.getName(), new HashSet<>(subFiles), subCollections); } public ObjectCollection fromDirectory(File directory) throws IOException { return fromDirectory(directory, Collections.emptySet()); } + private synchronized void fail(IOException e) { + this.exception = e; + } + + /** + * A single-consumer, multiple-producer model. + * + * @param <Res> type of the resource to be produced and consumed. + */ + private static class ParallelSupplier<Res> { + + private final Consumer<Res> consumer; + private final ConcurrentLinkedQueue<Supplier<Res>> taskList = new ConcurrentLinkedQueue<>(); + private final int threads; + private final Set<ParallelWorker<Res>> workers = new HashSet<>(); + + public ParallelSupplier(Consumer<Res> consumer, int threads) { + this.consumer = consumer; + this.threads = threads; + } + + public void addTask(Supplier<Res> valueSupplier) { + this.taskList.add(valueSupplier); + } + + public void process() { + workers.clear(); + for (int i = 0; i < threads; i++) { + ParallelWorker<Res> worker = new ParallelWorker<Res>(taskList, consumer, i); + workers.add(worker); + worker.start(); + } + join(); // wait for all workers to exit before returning + } + + private void join() { + while (true) { + int aliveCount = 0; + for (ParallelWorker<Res> worker : workers) { + try { + if (worker.isAlive()) { + ++aliveCount; + worker.join(); + } + } catch (InterruptedException ignored) { + } + } + if (aliveCount == 0) + return; + } + } + + private static class ParallelWorker<V> extends Thread { + + private final Queue<Supplier<V>> taskProvider; + private final Consumer<V> consumer; + + public ParallelWorker(Queue<Supplier<V>> taskProvider, Consumer<V> consumer, int workerId) { + super("PutterWorker#" + workerId); + this.taskProvider = taskProvider; + this.consumer = consumer; + } + + @Override + public void run() { + Supplier<V> supplier; + while ((supplier = taskProvider.poll()) != null) { + // here we do not let the consumer accept null productions + Optional.ofNullable(supplier.get()).ifPresent(consumer); + } + } + } + } + +// private interface Puttable<K, V> { +// void put(K key, V value); +// } + } diff --git a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java index 0bb7873..ee677fa 100644 --- a/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java +++ b/src/main/java/com/keuin/kbackupfabric/backup/incremental/ObjectElement.java @@ -1,6 +1,7 @@ package com.keuin.kbackupfabric.backup.incremental; import com.keuin.kbackupfabric.backup.incremental.identifier.ObjectIdentifier; +import org.jetbrains.annotations.NotNull; import java.io.Serializable; import java.util.Objects; @@ -9,7 +10,7 @@ import java.util.Objects; * Representing a file in a ObjectCollection. * Immutable. */ -public class ObjectElement implements Serializable { +public class ObjectElement implements Serializable, Comparable<ObjectElement> { private final String name; private final ObjectIdentifier identifier; @@ -57,4 +58,9 @@ public class ObjectElement implements Serializable { ", identifier=" + identifier + '}'; } + + @Override + public int compareTo(@NotNull ObjectElement objectElement) { + return name.compareTo(objectElement.name); + } } diff --git a/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java b/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java index baa6580..ab0c114 100644 --- a/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java +++ b/src/main/java/com/keuin/kbackupfabric/operation/backup/method/ConfiguredIncrementalBackupMethod.java @@ -8,6 +8,7 @@ import com.keuin.kbackupfabric.backup.incremental.manager.IncrementalBackupStora import com.keuin.kbackupfabric.operation.backup.feedback.IncrementalBackupFeedback; import com.keuin.kbackupfabric.util.FilesystemUtil; import com.keuin.kbackupfabric.util.PrintUtil; +import com.keuin.kbackupfabric.util.ThreadingUtil; import java.io.File; import java.io.IOException; @@ -34,13 +35,18 @@ public class ConfiguredIncrementalBackupMethod implements ConfiguredBackupMethod @Override public IncrementalBackupFeedback backup() { + + final int hashFactoryThreads = ThreadingUtil.getRecommendedThreadCount(); // how many threads do we use to generate the hash tree + + LOGGER.info("Threads: " + hashFactoryThreads); + IncrementalBackupFeedback feedback; try { File levelPathFile = new File(levelPath); // construct incremental backup index PrintUtil.info("Hashing files..."); - ObjectCollection collection = new ObjectCollectionFactory<>(Sha256Identifier.getFactory()) + ObjectCollection collection = new ObjectCollectionFactory<>(Sha256Identifier.getFactory(), hashFactoryThreads) .fromDirectory(levelPathFile, new HashSet<>(Arrays.asList("session.lock", "kbackup_metadata"))); // update storage @@ -56,10 +62,12 @@ public class ConfiguredIncrementalBackupMethod implements ConfiguredBackupMethod PrintUtil.info("Incremental backup finished."); feedback = new IncrementalBackupFeedback(filesAdded >= 0, filesAdded); } catch (IOException e) { + e.printStackTrace(); // at least we should print it out if we discard the exception... Better than doing nothing. feedback = new IncrementalBackupFeedback(false, 0); } if (!feedback.isSuccess()) { + LOGGER.severe("Failed to backup."); // do clean-up if failed File backupIndexFile = new File(backupIndexFileSaveDirectory, backupIndexFileName); if (backupIndexFile.exists()) { @@ -67,7 +75,6 @@ public class ConfiguredIncrementalBackupMethod implements ConfiguredBackupMethod LOGGER.warning("Failed to clean up: cannot delete file " + backupIndexFile.getName()); } } - //TODO: do more deep clean for object files } diff --git a/src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java b/src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java new file mode 100644 index 0000000..b270895 --- /dev/null +++ b/src/main/java/com/keuin/kbackupfabric/util/ThreadingUtil.java @@ -0,0 +1,22 @@ +package com.keuin.kbackupfabric.util; + +public class ThreadingUtil { + public static int getRecommendedThreadCount() { + int coreCount = Runtime.getRuntime().availableProcessors(); + + // if the cores are too few, we regress to single thread + if (coreCount <= 2) + return 1; + + // we have multiple cores, but not too many + if (coreCount == 3) + return 2; + + // if we have multiple core, but not too many, we use a half + if (coreCount <= 6) // 4, 5, 6 -> 3, 3, 4 + return (coreCount + 2) / 2; + + // cores are sufficient, we use almost all of them, except a fixed count remained for the OS + return coreCount - 2; + } +} |