summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorKeuin <[email protected]>2023-10-23 23:08:42 +0800
committerKeuin <[email protected]>2023-10-23 23:08:42 +0800
commit1cb1364145aa9925f1b3bba64bf21b17a5dbc31f (patch)
tree9fd64eecafe8c3ead1e04f4e9e01a51b8044f54e /src/main
parentf4296321542128163fa38be56c272c008f3d602f (diff)
increase write delay when no real player is online
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/keuin/blame/Blame.java15
-rw-r--r--src/main/java/com/keuin/blame/SubmitWorker.java45
2 files changed, 54 insertions, 6 deletions
diff --git a/src/main/java/com/keuin/blame/Blame.java b/src/main/java/com/keuin/blame/Blame.java
index 4031e65..1374913 100644
--- a/src/main/java/com/keuin/blame/Blame.java
+++ b/src/main/java/com/keuin/blame/Blame.java
@@ -17,11 +17,13 @@ import net.fabricmc.api.ModInitializer;
import net.fabricmc.fabric.api.command.v1.CommandRegistrationCallback;
import net.fabricmc.fabric.api.event.lifecycle.v1.ServerLifecycleEvents;
import net.fabricmc.fabric.api.event.player.*;
+import net.fabricmc.fabric.api.networking.v1.ServerPlayConnectionEvents;
import net.minecraft.command.CommandSource;
import net.minecraft.command.argument.BlockPosArgumentType;
import net.minecraft.command.argument.DimensionArgumentType;
import net.minecraft.server.command.CommandManager;
import net.minecraft.server.command.ServerCommandSource;
+import net.minecraft.server.network.ServerPlayerEntity;
import java.io.File;
import java.io.IOException;
@@ -82,6 +84,19 @@ public class Blame implements ModInitializer {
SubmitWorker.INSTANCE.stop();
});
+ ServerPlayConnectionEvents.JOIN.register((handler, sender, server) -> {
+ // does not take carpet fake players into account
+ var isNormalPlayer = handler.player.getClass() == ServerPlayerEntity.class;
+ if (!isNormalPlayer) return;
+ SubmitWorker.INSTANCE.playerJoin();
+ });
+
+ ServerPlayConnectionEvents.DISCONNECT.register((handler, server) -> {
+ var isNormalPlayer = handler.player.getClass() == ServerPlayerEntity.class;
+ if (!isNormalPlayer) return;
+ SubmitWorker.INSTANCE.playerQuit();
+ });
+
// hook game events
AttackEntityCallback.EVENT.register(new AttackEntityAdapter(EventHandler.INSTANCE));
PlaceBlockHandler.EVENT.register(EventHandler.INSTANCE);
diff --git a/src/main/java/com/keuin/blame/SubmitWorker.java b/src/main/java/com/keuin/blame/SubmitWorker.java
index 103fc9c..3582688 100644
--- a/src/main/java/com/keuin/blame/SubmitWorker.java
+++ b/src/main/java/com/keuin/blame/SubmitWorker.java
@@ -17,6 +17,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class SubmitWorker {
@@ -29,6 +30,28 @@ public class SubmitWorker {
private static final int batchSize = 1024;
private static final int maxWaitMillis = 3000;
+ private static final int idleMaxWaitMillis = 600 * 1000;
+
+ private final AtomicInteger playerCounter = new AtomicInteger();
+ private final AtomicBoolean isPlayerPresent = new AtomicBoolean(false);
+
+ private final AtomicBoolean playerJoin = new AtomicBoolean(false);
+
+ public void playerJoin() {
+ if (isStopped.get()) return;
+ playerCounter.incrementAndGet();
+ playerJoin.set(true);
+ isPlayerPresent.set(true);
+ thread.interrupt(); // interrupt if sleeping
+ }
+
+ public void playerQuit() {
+ var cnt = playerCounter.decrementAndGet();
+ if (cnt == 0) {
+ isPlayerPresent.set(false);
+ }
+ }
+
private SubmitWorker() {
thread.setUncaughtExceptionHandler((t, e) ->
@@ -53,6 +76,10 @@ public class SubmitWorker {
thread.interrupt();
}
+ private long getSleepDuration() {
+ return isPlayerPresent.get() ? maxWaitMillis : idleMaxWaitMillis;
+ }
+
/**
* Ensures the buffer is ready to consume from.
*
@@ -63,7 +90,7 @@ public class SubmitWorker {
while (true) {
if (
buffer.size() >= batchSize ||
- (accumulateStart > 0 && System.currentTimeMillis() - accumulateStart > maxWaitMillis)
+ (accumulateStart > 0 && System.currentTimeMillis() - accumulateStart > getSleepDuration())
) {
// buffer is full, or max accumulate time reached, flush it
break;
@@ -75,7 +102,9 @@ public class SubmitWorker {
el = queue.take();
} else {
// try to read more entries with timeout
- el = queue.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
+ var duration = getSleepDuration();
+ logger.info("Sleep duration: " + duration);
+ el = queue.poll(duration, TimeUnit.MILLISECONDS);
}
}
if (el == null) {
@@ -193,10 +222,14 @@ public class SubmitWorker {
accumulateBuffer(buffer);
}
} catch (InterruptedException ignored) {
- // server is closing, flush the buffer immediately
- // decline new write requests
- isStopped.set(true);
- interrupted = true;
+ // check if the interruption is triggered by player join
+ // in this case, we don't stop, but try to write the buffer immediately
+ if (!playerJoin.getAndSet(false)) {
+ // server is closing, flush the buffer immediately
+ // decline new write requests
+ isStopped.set(true);
+ interrupted = true;
+ }
}
try {
bulkWrite(buffer, req);