summaryrefslogtreecommitdiff
path: root/src/main/java/com/keuin
diff options
context:
space:
mode:
authorKeuin <[email protected]>2023-10-21 16:51:26 +0800
committerKeuin <[email protected]>2023-10-21 16:51:26 +0800
commiteb0795d9e7f474379d99c6dc4f483cd707b205dc (patch)
tree3b2448d63cb585ce86b235995dafe82f83d2811b /src/main/java/com/keuin
parentbdd01d2a7e8fe4f322dad674966730a8538c4171 (diff)
bugfix: write error
Diffstat (limited to 'src/main/java/com/keuin')
-rw-r--r--src/main/java/com/keuin/blame/SubmitWorker.java50
-rw-r--r--src/main/java/com/keuin/blame/data/entry/LogEntry.java16
-rw-r--r--src/main/java/com/keuin/blame/lookup/AbstractLookupFilter.java7
-rw-r--r--src/main/java/com/keuin/blame/test/TestDatabase.java21
4 files changed, 45 insertions, 49 deletions
diff --git a/src/main/java/com/keuin/blame/SubmitWorker.java b/src/main/java/com/keuin/blame/SubmitWorker.java
index 3c33fd6..ba4a922 100644
--- a/src/main/java/com/keuin/blame/SubmitWorker.java
+++ b/src/main/java/com/keuin/blame/SubmitWorker.java
@@ -13,9 +13,7 @@ import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.clickhouse.client.ClickHouseCredentials.fromUserAndPassword;
@@ -90,27 +88,43 @@ public class SubmitWorker {
private void bulkWrite(
List<LogEntry> buffer,
ClickHouseRequest.Mutation req
- ) throws IOException, ClickHouseException {
+ ) throws IOException, ClickHouseException, ExecutionException, InterruptedException {
+ if (buffer.isEmpty()) {
+ logger.error("bulkWrite is called with empty write buffer");
+ return;
+ }
+ logger.info("bulkWrite size: " + buffer.size());
+ CompletableFuture<ClickHouseResponse> fut;
try (var os = ClickHouseDataStreamFactory.getInstance()
- .createPipedOutputStream(req.getConfig(), (Runnable) null);
- var resp = req.data(os.getInputStream()).executeAndWait()) {
+ .createPipedOutputStream(req.getConfig())) {
+ fut = req.data(os.getInputStream()).execute();
for (var el : buffer) {
el.write(os);
}
- logger.info(String.format("Written %d log entries to ClickHouse. %s",
- buffer.size(), resp.getSummary().toString()));
+ }
+ try (var resp = fut.get()) {
+ var summary = resp.getSummary();
+ var expected = buffer.size();
+ var actual = summary.getReadRows();
+ logger.info("Write success: " + summary.toString());
+ if (expected != actual) {
+ logger.error(String.format(
+ "Unexpected write rows, expected %d (buffer), actual %d (write summary)",
+ expected, actual));
+ }
}
buffer.clear();
}
private void run() {
var config = DatabaseUtil.DB_CONFIG;
+ logger.info("Database: " + config.toString());
var server = ClickHouseNode.builder()
.host(config.address())
.port(ClickHouseProtocol.HTTP, config.port())
// .port(ClickHouseProtocol.GRPC, Integer.getInteger("chPort", 9100))
// .port(ClickHouseProtocol.TCP, Integer.getInteger("chPort", 9000))
- .database(config.table())
+ .database(config.database())
.credentials(fromUserAndPassword(config.username(), config.password()))
.build();
@@ -121,7 +135,12 @@ public class SubmitWorker {
try (var client = ClickHouseClient.newInstance(server.getProtocol())) {
writeLoop:
while (true) {
- var result = work(client, server, batchBuffer, writeImmediately);
+ var req = client.read(server).write()
+ .table(config.table())
+ .format(ClickHouseFormat.RowBinary)
+// .option(ClickHouseClientOption.ASYNC, false)
+ .option(ClickHouseClientOption.COMPRESS, false);
+ var result = work(client, req, batchBuffer, writeImmediately);
switch (result) {
case CONTINUE -> {
writeImmediately = false;
@@ -133,6 +152,9 @@ public class SubmitWorker {
case FINISH -> {
break workLoop;
}
+ case INSTANT_WRITE -> {
+ writeImmediately = true;
+ }
}
}
}
@@ -148,12 +170,10 @@ public class SubmitWorker {
private @NotNull WorkResult work(
ClickHouseClient client,
- ClickHouseNode server,
+ ClickHouseRequest.Mutation req,
List<LogEntry> buffer,
boolean writeImmediately
) {
- var req = client.write(server).format(ClickHouseFormat.RowBinary)
- .option(ClickHouseClientOption.ASYNC, false);
boolean interrupted = false;
// if writeImmediately is set, do not accumulate the buffer, flush it immediately
try {
@@ -177,13 +197,15 @@ public class SubmitWorker {
return WorkResult.FINISH;
}
return WorkResult.RECONNECT;
- } catch (ClickHouseException ex) {
+ } catch (ClickHouseException | ExecutionException ex) {
logger.error("ClickHouse writer error", ex);
try {
Thread.sleep(3000);
} catch (InterruptedException ignored) {
}
return WorkResult.RECONNECT;
+ } catch (InterruptedException e) {
+ return WorkResult.INSTANT_WRITE;
}
return WorkResult.CONTINUE;
}
diff --git a/src/main/java/com/keuin/blame/data/entry/LogEntry.java b/src/main/java/com/keuin/blame/data/entry/LogEntry.java
index 839afdd..fba04be 100644
--- a/src/main/java/com/keuin/blame/data/entry/LogEntry.java
+++ b/src/main/java/com/keuin/blame/data/entry/LogEntry.java
@@ -10,7 +10,6 @@ import com.keuin.blame.util.UuidUtils;
import net.minecraft.MinecraftVersion;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.Objects;
import java.util.UUID;
@@ -148,49 +147,34 @@ public class LogEntry {
}
public void write(ClickHousePipedOutputStream os) throws IOException {
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeInt8(os, actionType.getValue());
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeFixedString(os, gameVersion, 8);
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeString(os, objectId);
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeFixedString(os, objectPos.getWorld(), 24);
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeInt64(os, (long) (objectPos.getX()));
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeInt64(os, (long) (objectPos.getY()));
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeInt64(os, (long) (objectPos.getZ()));
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeInt32(os, objectType.getValue());
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeString(os, subjectId);
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeFixedString(os, subjectPos.getWorld(), 24);
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeFloat64(os, subjectPos.getX());
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeFloat64(os, subjectPos.getY());
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeFloat64(os, subjectPos.getZ());
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeFixedString(os, subjectUUID.toString(), 36); // lowercase
- BinaryStreamUtils.writeNonNull(os);
BinaryStreamUtils.writeInt64(os, timeMillis);
}
}
diff --git a/src/main/java/com/keuin/blame/lookup/AbstractLookupFilter.java b/src/main/java/com/keuin/blame/lookup/AbstractLookupFilter.java
deleted file mode 100644
index f2e45dc..0000000
--- a/src/main/java/com/keuin/blame/lookup/AbstractLookupFilter.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.keuin.blame.lookup;
-
-import org.bson.conversions.Bson;
-
-public interface AbstractLookupFilter {
- abstract Bson filter();
-} \ No newline at end of file
diff --git a/src/main/java/com/keuin/blame/test/TestDatabase.java b/src/main/java/com/keuin/blame/test/TestDatabase.java
index 84bf4bc..6d40eb5 100644
--- a/src/main/java/com/keuin/blame/test/TestDatabase.java
+++ b/src/main/java/com/keuin/blame/test/TestDatabase.java
@@ -6,9 +6,6 @@ import com.keuin.blame.data.WorldPos;
import com.keuin.blame.data.entry.LogEntry;
import com.keuin.blame.data.enums.ActionType;
import com.keuin.blame.data.enums.ObjectType;
-import com.keuin.blame.lookup.DummyFilter;
-import com.keuin.blame.lookup.LookupCallback;
-import com.keuin.blame.lookup.LookupManager;
import org.junit.Before;
import org.junit.Test;
@@ -70,15 +67,15 @@ public class TestDatabase {
);
SubmitWorker.INSTANCE.submit(entry);
Thread.sleep(2000);
- LookupManager.INSTANCE.lookup(new DummyFilter(), new LookupCallback() {
- @Override
- public void onLookupFinishes(Iterable<LogEntry> logEntries) {
- for (LogEntry e : logEntries) {
- System.out.println(e);
- success[0] |= Objects.equals(e, entry);
- }
- }
- }, 100);
+// LookupManager.INSTANCE.lookup(new DummyFilter(), new LookupCallback() {
+// @Override
+// public void onLookupFinishes(Iterable<LogEntry> logEntries) {
+// for (LogEntry e : logEntries) {
+// System.out.println(e);
+// success[0] |= Objects.equals(e, entry);
+// }
+// }
+// }, 100);
Thread.sleep(2000);
assertTrue(success[0]);
} catch (Exception e) {