From 86a6753d9fa4068dcebbc705e661492e280e1622 Mon Sep 17 00:00:00 2001 From: Keuin Date: Mon, 23 Oct 2023 23:38:04 +0800 Subject: reduce stats report frequency --- src/main/java/com/keuin/blame/SubmitWorker.java | 33 +++++++++++++++++++------ 1 file changed, 26 insertions(+), 7 deletions(-) (limited to 'src/main/java/com/keuin') diff --git a/src/main/java/com/keuin/blame/SubmitWorker.java b/src/main/java/com/keuin/blame/SubmitWorker.java index 3582688..0d86c0a 100644 --- a/src/main/java/com/keuin/blame/SubmitWorker.java +++ b/src/main/java/com/keuin/blame/SubmitWorker.java @@ -26,16 +26,16 @@ public class SubmitWorker { private final BlockingQueue queue = new ArrayBlockingQueue<>(1048576); private final Thread thread = new Thread(SubmitWorker.this::run); private final AtomicBoolean isStopped = new AtomicBoolean(false); - private static final int batchSize = 1024; private static final int maxWaitMillis = 3000; - private static final int idleMaxWaitMillis = 600 * 1000; - + private static final int minReportIntervalMillis = 600 * 1000; private final AtomicInteger playerCounter = new AtomicInteger(); private final AtomicBoolean isPlayerPresent = new AtomicBoolean(false); - private final AtomicBoolean playerJoin = new AtomicBoolean(false); + private long lastReportTimestamp = 0; + private long entryWritten = 0; + private long bufferFlushCount = 0; public void playerJoin() { if (isStopped.get()) return; @@ -103,7 +103,24 @@ public class SubmitWorker { } else { // try to read more entries with timeout var duration = getSleepDuration(); - logger.info("Sleep duration: " + duration); + logger.debug("Sleep duration: " + duration); + var ts = System.currentTimeMillis(); + var interval = ts - lastReportTimestamp; + if (interval > minReportIntervalMillis) { + if (lastReportTimestamp <= 0) { + lastReportTimestamp = ts; + } else { + double speed = (double) entryWritten / interval * 1000; + logger.info(String.format( + "Stats in last %.1f seconds: %.3f item/sec written, %d element in write buffer, " + + "buffer interval: %d, buffer flush counter: %s", + interval / 1000.0, speed, queue.size(), getSleepDuration(), bufferFlushCount + )); + entryWritten = 0; + bufferFlushCount = 0; + lastReportTimestamp = ts; + } + } el = queue.poll(duration, TimeUnit.MILLISECONDS); } } @@ -130,7 +147,7 @@ public class SubmitWorker { logger.error("bulkWrite is called with empty write buffer"); return; } - logger.info("bulkWrite size: " + buffer.size()); + logger.debug("bulkWrite size: " + buffer.size()); CompletableFuture fut; try (var os = ClickHouseDataStreamFactory.getInstance() .createPipedOutputStream(req.getConfig())) { @@ -143,13 +160,15 @@ public class SubmitWorker { var summary = resp.getSummary(); var expected = buffer.size(); var actual = summary.getReadRows(); - logger.info("Write success: " + summary.toString()); + logger.debug("Write success: " + summary.toString()); if (expected != actual) { logger.error(String.format( "Unexpected write rows, expected %d (buffer), actual %d (write summary)", expected, actual)); } } + entryWritten += buffer.size(); + bufferFlushCount++; buffer.clear(); } -- cgit v1.2.3