From eb0795d9e7f474379d99c6dc4f483cd707b205dc Mon Sep 17 00:00:00 2001 From: Keuin Date: Sat, 21 Oct 2023 16:51:26 +0800 Subject: bugfix: write error --- src/main/java/com/keuin/blame/SubmitWorker.java | 50 ++++++++++++++++------ .../java/com/keuin/blame/data/entry/LogEntry.java | 16 ------- .../keuin/blame/lookup/AbstractLookupFilter.java | 7 --- .../java/com/keuin/blame/test/TestDatabase.java | 21 ++++----- 4 files changed, 45 insertions(+), 49 deletions(-) delete mode 100644 src/main/java/com/keuin/blame/lookup/AbstractLookupFilter.java (limited to 'src/main/java') diff --git a/src/main/java/com/keuin/blame/SubmitWorker.java b/src/main/java/com/keuin/blame/SubmitWorker.java index 3c33fd6..ba4a922 100644 --- a/src/main/java/com/keuin/blame/SubmitWorker.java +++ b/src/main/java/com/keuin/blame/SubmitWorker.java @@ -13,9 +13,7 @@ import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import static com.clickhouse.client.ClickHouseCredentials.fromUserAndPassword; @@ -90,27 +88,43 @@ public class SubmitWorker { private void bulkWrite( List buffer, ClickHouseRequest.Mutation req - ) throws IOException, ClickHouseException { + ) throws IOException, ClickHouseException, ExecutionException, InterruptedException { + if (buffer.isEmpty()) { + logger.error("bulkWrite is called with empty write buffer"); + return; + } + logger.info("bulkWrite size: " + buffer.size()); + CompletableFuture fut; try (var os = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(req.getConfig(), (Runnable) null); - var resp = req.data(os.getInputStream()).executeAndWait()) { + .createPipedOutputStream(req.getConfig())) { + fut = req.data(os.getInputStream()).execute(); for (var el : buffer) { el.write(os); } - logger.info(String.format("Written %d log entries to ClickHouse. %s", - buffer.size(), resp.getSummary().toString())); + } + try (var resp = fut.get()) { + var summary = resp.getSummary(); + var expected = buffer.size(); + var actual = summary.getReadRows(); + logger.info("Write success: " + summary.toString()); + if (expected != actual) { + logger.error(String.format( + "Unexpected write rows, expected %d (buffer), actual %d (write summary)", + expected, actual)); + } } buffer.clear(); } private void run() { var config = DatabaseUtil.DB_CONFIG; + logger.info("Database: " + config.toString()); var server = ClickHouseNode.builder() .host(config.address()) .port(ClickHouseProtocol.HTTP, config.port()) // .port(ClickHouseProtocol.GRPC, Integer.getInteger("chPort", 9100)) // .port(ClickHouseProtocol.TCP, Integer.getInteger("chPort", 9000)) - .database(config.table()) + .database(config.database()) .credentials(fromUserAndPassword(config.username(), config.password())) .build(); @@ -121,7 +135,12 @@ public class SubmitWorker { try (var client = ClickHouseClient.newInstance(server.getProtocol())) { writeLoop: while (true) { - var result = work(client, server, batchBuffer, writeImmediately); + var req = client.read(server).write() + .table(config.table()) + .format(ClickHouseFormat.RowBinary) +// .option(ClickHouseClientOption.ASYNC, false) + .option(ClickHouseClientOption.COMPRESS, false); + var result = work(client, req, batchBuffer, writeImmediately); switch (result) { case CONTINUE -> { writeImmediately = false; @@ -133,6 +152,9 @@ public class SubmitWorker { case FINISH -> { break workLoop; } + case INSTANT_WRITE -> { + writeImmediately = true; + } } } } @@ -148,12 +170,10 @@ public class SubmitWorker { private @NotNull WorkResult work( ClickHouseClient client, - ClickHouseNode server, + ClickHouseRequest.Mutation req, List buffer, boolean writeImmediately ) { - var req = client.write(server).format(ClickHouseFormat.RowBinary) - .option(ClickHouseClientOption.ASYNC, false); boolean interrupted = false; // if writeImmediately is set, do not accumulate the buffer, flush it immediately try { @@ -177,13 +197,15 @@ public class SubmitWorker { return WorkResult.FINISH; } return WorkResult.RECONNECT; - } catch (ClickHouseException ex) { + } catch (ClickHouseException | ExecutionException ex) { logger.error("ClickHouse writer error", ex); try { Thread.sleep(3000); } catch (InterruptedException ignored) { } return WorkResult.RECONNECT; + } catch (InterruptedException e) { + return WorkResult.INSTANT_WRITE; } return WorkResult.CONTINUE; } diff --git a/src/main/java/com/keuin/blame/data/entry/LogEntry.java b/src/main/java/com/keuin/blame/data/entry/LogEntry.java index 839afdd..fba04be 100644 --- a/src/main/java/com/keuin/blame/data/entry/LogEntry.java +++ b/src/main/java/com/keuin/blame/data/entry/LogEntry.java @@ -10,7 +10,6 @@ import com.keuin.blame.util.UuidUtils; import net.minecraft.MinecraftVersion; import java.io.IOException; -import java.io.OutputStream; import java.util.Objects; import java.util.UUID; @@ -148,49 +147,34 @@ public class LogEntry { } public void write(ClickHousePipedOutputStream os) throws IOException { - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeInt8(os, actionType.getValue()); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeFixedString(os, gameVersion, 8); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeString(os, objectId); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeFixedString(os, objectPos.getWorld(), 24); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeInt64(os, (long) (objectPos.getX())); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeInt64(os, (long) (objectPos.getY())); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeInt64(os, (long) (objectPos.getZ())); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeInt32(os, objectType.getValue()); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeString(os, subjectId); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeFixedString(os, subjectPos.getWorld(), 24); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeFloat64(os, subjectPos.getX()); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeFloat64(os, subjectPos.getY()); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeFloat64(os, subjectPos.getZ()); - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeFixedString(os, subjectUUID.toString(), 36); // lowercase - BinaryStreamUtils.writeNonNull(os); BinaryStreamUtils.writeInt64(os, timeMillis); } } diff --git a/src/main/java/com/keuin/blame/lookup/AbstractLookupFilter.java b/src/main/java/com/keuin/blame/lookup/AbstractLookupFilter.java deleted file mode 100644 index f2e45dc..0000000 --- a/src/main/java/com/keuin/blame/lookup/AbstractLookupFilter.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.keuin.blame.lookup; - -import org.bson.conversions.Bson; - -public interface AbstractLookupFilter { - abstract Bson filter(); -} \ No newline at end of file diff --git a/src/main/java/com/keuin/blame/test/TestDatabase.java b/src/main/java/com/keuin/blame/test/TestDatabase.java index 84bf4bc..6d40eb5 100644 --- a/src/main/java/com/keuin/blame/test/TestDatabase.java +++ b/src/main/java/com/keuin/blame/test/TestDatabase.java @@ -6,9 +6,6 @@ import com.keuin.blame.data.WorldPos; import com.keuin.blame.data.entry.LogEntry; import com.keuin.blame.data.enums.ActionType; import com.keuin.blame.data.enums.ObjectType; -import com.keuin.blame.lookup.DummyFilter; -import com.keuin.blame.lookup.LookupCallback; -import com.keuin.blame.lookup.LookupManager; import org.junit.Before; import org.junit.Test; @@ -70,15 +67,15 @@ public class TestDatabase { ); SubmitWorker.INSTANCE.submit(entry); Thread.sleep(2000); - LookupManager.INSTANCE.lookup(new DummyFilter(), new LookupCallback() { - @Override - public void onLookupFinishes(Iterable logEntries) { - for (LogEntry e : logEntries) { - System.out.println(e); - success[0] |= Objects.equals(e, entry); - } - } - }, 100); +// LookupManager.INSTANCE.lookup(new DummyFilter(), new LookupCallback() { +// @Override +// public void onLookupFinishes(Iterable logEntries) { +// for (LogEntry e : logEntries) { +// System.out.println(e); +// success[0] |= Objects.equals(e, entry); +// } +// } +// }, 100); Thread.sleep(2000); assertTrue(success[0]); } catch (Exception e) { -- cgit v1.2.3