diff options
author | Keuin <[email protected]> | 2023-10-23 02:47:01 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2023-10-23 02:47:01 +0800 |
commit | 311778a284d1eba2891d4477af534f3e1219c2b1 (patch) | |
tree | a0315d42f7aa1610da6b585c27a5c2c5ac42466a | |
parent | d468e2f524022b1b1e5fe0e1d58a4efcc6583545 (diff) |
bugfix: batch write collect time is unbounded
-rw-r--r-- | src/main/java/com/keuin/blame/SubmitWorker.java | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/src/main/java/com/keuin/blame/SubmitWorker.java b/src/main/java/com/keuin/blame/SubmitWorker.java index 8eba940..a704c7b 100644 --- a/src/main/java/com/keuin/blame/SubmitWorker.java +++ b/src/main/java/com/keuin/blame/SubmitWorker.java @@ -59,9 +59,13 @@ public class SubmitWorker { * @throws InterruptedException The buffer may be empty or non-empty. */ private void accumulateBuffer(List<LogEntry> buffer) throws InterruptedException { + long accumulateStart = -1; while (true) { - if (buffer.size() >= batchSize) { - // buffer is full, flush it + if ( + buffer.size() >= batchSize || + (accumulateStart > 0 && System.currentTimeMillis() - accumulateStart > maxWaitMillis) + ) { + // buffer is full, or max accumulate time reached, flush it break; } var el = queue.poll(); @@ -79,6 +83,9 @@ public class SubmitWorker { break; } buffer.add(el); + if (accumulateStart < 0) { + accumulateStart = System.currentTimeMillis(); + } } } |