From 1cb1364145aa9925f1b3bba64bf21b17a5dbc31f Mon Sep 17 00:00:00 2001 From: Keuin Date: Mon, 23 Oct 2023 23:08:42 +0800 Subject: increase write delay when no real player is online --- src/main/java/com/keuin/blame/Blame.java | 15 +++++++++ src/main/java/com/keuin/blame/SubmitWorker.java | 45 +++++++++++++++++++++---- 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/keuin/blame/Blame.java b/src/main/java/com/keuin/blame/Blame.java index 4031e65..1374913 100644 --- a/src/main/java/com/keuin/blame/Blame.java +++ b/src/main/java/com/keuin/blame/Blame.java @@ -17,11 +17,13 @@ import net.fabricmc.api.ModInitializer; import net.fabricmc.fabric.api.command.v1.CommandRegistrationCallback; import net.fabricmc.fabric.api.event.lifecycle.v1.ServerLifecycleEvents; import net.fabricmc.fabric.api.event.player.*; +import net.fabricmc.fabric.api.networking.v1.ServerPlayConnectionEvents; import net.minecraft.command.CommandSource; import net.minecraft.command.argument.BlockPosArgumentType; import net.minecraft.command.argument.DimensionArgumentType; import net.minecraft.server.command.CommandManager; import net.minecraft.server.command.ServerCommandSource; +import net.minecraft.server.network.ServerPlayerEntity; import java.io.File; import java.io.IOException; @@ -82,6 +84,19 @@ public class Blame implements ModInitializer { SubmitWorker.INSTANCE.stop(); }); + ServerPlayConnectionEvents.JOIN.register((handler, sender, server) -> { + // does not take carpet fake players into account + var isNormalPlayer = handler.player.getClass() == ServerPlayerEntity.class; + if (!isNormalPlayer) return; + SubmitWorker.INSTANCE.playerJoin(); + }); + + ServerPlayConnectionEvents.DISCONNECT.register((handler, server) -> { + var isNormalPlayer = handler.player.getClass() == ServerPlayerEntity.class; + if (!isNormalPlayer) return; + SubmitWorker.INSTANCE.playerQuit(); + }); + // hook game events AttackEntityCallback.EVENT.register(new AttackEntityAdapter(EventHandler.INSTANCE)); PlaceBlockHandler.EVENT.register(EventHandler.INSTANCE); diff --git a/src/main/java/com/keuin/blame/SubmitWorker.java b/src/main/java/com/keuin/blame/SubmitWorker.java index 103fc9c..3582688 100644 --- a/src/main/java/com/keuin/blame/SubmitWorker.java +++ b/src/main/java/com/keuin/blame/SubmitWorker.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class SubmitWorker { @@ -29,6 +30,28 @@ public class SubmitWorker { private static final int batchSize = 1024; private static final int maxWaitMillis = 3000; + private static final int idleMaxWaitMillis = 600 * 1000; + + private final AtomicInteger playerCounter = new AtomicInteger(); + private final AtomicBoolean isPlayerPresent = new AtomicBoolean(false); + + private final AtomicBoolean playerJoin = new AtomicBoolean(false); + + public void playerJoin() { + if (isStopped.get()) return; + playerCounter.incrementAndGet(); + playerJoin.set(true); + isPlayerPresent.set(true); + thread.interrupt(); // interrupt if sleeping + } + + public void playerQuit() { + var cnt = playerCounter.decrementAndGet(); + if (cnt == 0) { + isPlayerPresent.set(false); + } + } + private SubmitWorker() { thread.setUncaughtExceptionHandler((t, e) -> @@ -53,6 +76,10 @@ public class SubmitWorker { thread.interrupt(); } + private long getSleepDuration() { + return isPlayerPresent.get() ? maxWaitMillis : idleMaxWaitMillis; + } + /** * Ensures the buffer is ready to consume from. * @@ -63,7 +90,7 @@ public class SubmitWorker { while (true) { if ( buffer.size() >= batchSize || - (accumulateStart > 0 && System.currentTimeMillis() - accumulateStart > maxWaitMillis) + (accumulateStart > 0 && System.currentTimeMillis() - accumulateStart > getSleepDuration()) ) { // buffer is full, or max accumulate time reached, flush it break; @@ -75,7 +102,9 @@ public class SubmitWorker { el = queue.take(); } else { // try to read more entries with timeout - el = queue.poll(maxWaitMillis, TimeUnit.MILLISECONDS); + var duration = getSleepDuration(); + logger.info("Sleep duration: " + duration); + el = queue.poll(duration, TimeUnit.MILLISECONDS); } } if (el == null) { @@ -193,10 +222,14 @@ public class SubmitWorker { accumulateBuffer(buffer); } } catch (InterruptedException ignored) { - // server is closing, flush the buffer immediately - // decline new write requests - isStopped.set(true); - interrupted = true; + // check if the interruption is triggered by player join + // in this case, we don't stop, but try to write the buffer immediately + if (!playerJoin.getAndSet(false)) { + // server is closing, flush the buffer immediately + // decline new write requests + isStopped.set(true); + interrupted = true; + } } try { bulkWrite(buffer, req); -- cgit v1.2.3