summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeuin <[email protected]>2023-10-23 02:47:01 +0800
committerKeuin <[email protected]>2023-10-23 02:47:01 +0800
commit311778a284d1eba2891d4477af534f3e1219c2b1 (patch)
treea0315d42f7aa1610da6b585c27a5c2c5ac42466a
parentd468e2f524022b1b1e5fe0e1d58a4efcc6583545 (diff)
bugfix: batch write collect time is unbounded
-rw-r--r--src/main/java/com/keuin/blame/SubmitWorker.java11
1 files changed, 9 insertions, 2 deletions
diff --git a/src/main/java/com/keuin/blame/SubmitWorker.java b/src/main/java/com/keuin/blame/SubmitWorker.java
index 8eba940..a704c7b 100644
--- a/src/main/java/com/keuin/blame/SubmitWorker.java
+++ b/src/main/java/com/keuin/blame/SubmitWorker.java
@@ -59,9 +59,13 @@ public class SubmitWorker {
* @throws InterruptedException The buffer may be empty or non-empty.
*/
private void accumulateBuffer(List<LogEntry> buffer) throws InterruptedException {
+ long accumulateStart = -1;
while (true) {
- if (buffer.size() >= batchSize) {
- // buffer is full, flush it
+ if (
+ buffer.size() >= batchSize ||
+ (accumulateStart > 0 && System.currentTimeMillis() - accumulateStart > maxWaitMillis)
+ ) {
+ // buffer is full, or max accumulate time reached, flush it
break;
}
var el = queue.poll();
@@ -79,6 +83,9 @@ public class SubmitWorker {
break;
}
buffer.add(el);
+ if (accumulateStart < 0) {
+ accumulateStart = System.currentTimeMillis();
+ }
}
}