summaryrefslogtreecommitdiff
path: root/src/main/java/com/keuin/blame
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/keuin/blame')
-rw-r--r--src/main/java/com/keuin/blame/SubmitWorker.java33
1 files changed, 26 insertions, 7 deletions
diff --git a/src/main/java/com/keuin/blame/SubmitWorker.java b/src/main/java/com/keuin/blame/SubmitWorker.java
index 3582688..0d86c0a 100644
--- a/src/main/java/com/keuin/blame/SubmitWorker.java
+++ b/src/main/java/com/keuin/blame/SubmitWorker.java
@@ -26,16 +26,16 @@ public class SubmitWorker {
private final BlockingQueue<LogEntry> queue = new ArrayBlockingQueue<>(1048576);
private final Thread thread = new Thread(SubmitWorker.this::run);
private final AtomicBoolean isStopped = new AtomicBoolean(false);
-
private static final int batchSize = 1024;
private static final int maxWaitMillis = 3000;
-
private static final int idleMaxWaitMillis = 600 * 1000;
-
+ private static final int minReportIntervalMillis = 600 * 1000;
private final AtomicInteger playerCounter = new AtomicInteger();
private final AtomicBoolean isPlayerPresent = new AtomicBoolean(false);
-
private final AtomicBoolean playerJoin = new AtomicBoolean(false);
+ private long lastReportTimestamp = 0;
+ private long entryWritten = 0;
+ private long bufferFlushCount = 0;
public void playerJoin() {
if (isStopped.get()) return;
@@ -103,7 +103,24 @@ public class SubmitWorker {
} else {
// try to read more entries with timeout
var duration = getSleepDuration();
- logger.info("Sleep duration: " + duration);
+ logger.debug("Sleep duration: " + duration);
+ var ts = System.currentTimeMillis();
+ var interval = ts - lastReportTimestamp;
+ if (interval > minReportIntervalMillis) {
+ if (lastReportTimestamp <= 0) {
+ lastReportTimestamp = ts;
+ } else {
+ double speed = (double) entryWritten / interval * 1000;
+ logger.info(String.format(
+ "Stats in last %.1f seconds: %.3f item/sec written, %d element in write buffer, " +
+ "buffer interval: %d, buffer flush counter: %s",
+ interval / 1000.0, speed, queue.size(), getSleepDuration(), bufferFlushCount
+ ));
+ entryWritten = 0;
+ bufferFlushCount = 0;
+ lastReportTimestamp = ts;
+ }
+ }
el = queue.poll(duration, TimeUnit.MILLISECONDS);
}
}
@@ -130,7 +147,7 @@ public class SubmitWorker {
logger.error("bulkWrite is called with empty write buffer");
return;
}
- logger.info("bulkWrite size: " + buffer.size());
+ logger.debug("bulkWrite size: " + buffer.size());
CompletableFuture<ClickHouseResponse> fut;
try (var os = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(req.getConfig())) {
@@ -143,13 +160,15 @@ public class SubmitWorker {
var summary = resp.getSummary();
var expected = buffer.size();
var actual = summary.getReadRows();
- logger.info("Write success: " + summary.toString());
+ logger.debug("Write success: " + summary.toString());
if (expected != actual) {
logger.error(String.format(
"Unexpected write rows, expected %d (buffer), actual %d (write summary)",
expected, actual));
}
}
+ entryWritten += buffer.size();
+ bufferFlushCount++;
buffer.clear();
}