diff options
author | Keuin <[email protected]> | 2023-10-21 14:42:37 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2023-10-21 14:42:37 +0800 |
commit | bdd01d2a7e8fe4f322dad674966730a8538c4171 (patch) | |
tree | 371f7fd3dac256c5e0deb87f2827a8073f7f0bbf /src/main/java/com | |
parent | ef573d007ac15c80561da6cee7033bda5778f590 (diff) |
write to ClickHouse
Diffstat (limited to 'src/main/java/com')
17 files changed, 258 insertions, 378 deletions
diff --git a/src/main/java/com/keuin/blame/SubmitWorker.java b/src/main/java/com/keuin/blame/SubmitWorker.java index 7b40baf..3c33fd6 100644 --- a/src/main/java/com/keuin/blame/SubmitWorker.java +++ b/src/main/java/com/keuin/blame/SubmitWorker.java @@ -1,31 +1,40 @@ package com.keuin.blame; +import com.clickhouse.client.*; +import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.data.ClickHouseDataStreamFactory; +import com.clickhouse.data.ClickHouseFormat; import com.keuin.blame.data.entry.LogEntry; import com.keuin.blame.util.DatabaseUtil; -import com.mongodb.MongoClientException; -import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoClients; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Logger; + +import static com.clickhouse.client.ClickHouseCredentials.fromUserAndPassword; public class SubmitWorker { public static final SubmitWorker INSTANCE = new SubmitWorker(); - private final Logger logger = Logger.getLogger(SubmitWorker.class.getName()); - + private final Logger logger = LogManager.getLogger(); private final BlockingQueue<LogEntry> queue = new ArrayBlockingQueue<>(1048576); private final Thread thread = new Thread(SubmitWorker.this::run); - private final AtomicBoolean isStopped = new AtomicBoolean(false); + private static final int batchSize = 1024; + private static final int maxWaitMillis = 3000; + private SubmitWorker() { - thread.setUncaughtExceptionHandler((t, e) -> logger.severe(String.format("Exception in thread %s: %s", t.getName(), e))); + thread.setUncaughtExceptionHandler((t, e) -> + logger.error(String.format("Exception in thread %s: %s", t.getName(), e))); thread.start(); } @@ -36,7 +45,7 @@ public class SubmitWorker { if (entry == null) throw new IllegalArgumentException("entry cannot be null"); if (!queue.offer(entry)) { - logger.severe("Write queue is full. Dropping new log entries."); + logger.error("Write queue is full. Dropping new log entries."); } } @@ -45,26 +54,138 @@ public class SubmitWorker { thread.interrupt(); } + /** + * Ensures the buffer is ready to consume from. + * + * @throws InterruptedException The buffer may be empty or non-empty. + */ + private void accumulateBuffer(List<LogEntry> buffer) throws InterruptedException { + while (true) { + if (buffer.size() >= batchSize) { + // buffer is full, flush it + break; + } + var el = queue.poll(); + if (el == null) { + if (buffer.isEmpty()) { + // block until the first entry is read + el = queue.take(); + } else { + // try to read more entries with timeout + el = queue.poll(maxWaitMillis, TimeUnit.MILLISECONDS); + } + } + if (el == null) { + // poll timed out, flush buffer + break; + } + buffer.add(el); + } + } + + /** + * Bulk write all entries in buffer to database. + * Throws exception if error. The buffer is kept intact. No item is written. + */ + private void bulkWrite( + List<LogEntry> buffer, + ClickHouseRequest.Mutation req + ) throws IOException, ClickHouseException { + try (var os = ClickHouseDataStreamFactory.getInstance() + .createPipedOutputStream(req.getConfig(), (Runnable) null); + var resp = req.data(os.getInputStream()).executeAndWait()) { + for (var el : buffer) { + el.write(os); + } + logger.info(String.format("Written %d log entries to ClickHouse. %s", + buffer.size(), resp.getSummary().toString())); + } + buffer.clear(); + } + private void run() { - try (final MongoClient mongoClient = MongoClients.create(DatabaseUtil.CLIENT_SETTINGS)) { - final MongoDatabase db = mongoClient.getDatabase( - DatabaseUtil.MONGO_CONFIG.getDatabaseName() - ); - final MongoCollection<LogEntry> collection = db.getCollection( - DatabaseUtil.MONGO_CONFIG.getLogCollectionName(), LogEntry.class - ); - // TODO: 第一个事件触发导致延迟很大 - while (!isStopped.get() || !queue.isEmpty()) { - try { - LogEntry entry = queue.take(); - collection.insertOne(entry); - } catch (InterruptedException ex) { - isStopped.set(true); + var config = DatabaseUtil.DB_CONFIG; + var server = ClickHouseNode.builder() + .host(config.address()) + .port(ClickHouseProtocol.HTTP, config.port()) + // .port(ClickHouseProtocol.GRPC, Integer.getInteger("chPort", 9100)) + // .port(ClickHouseProtocol.TCP, Integer.getInteger("chPort", 9000)) + .database(config.table()) + .credentials(fromUserAndPassword(config.username(), config.password())) + .build(); + + var batchBuffer = new ArrayList<LogEntry>(batchSize); + boolean writeImmediately = false; + workLoop: + while (true) { + try (var client = ClickHouseClient.newInstance(server.getProtocol())) { + writeLoop: + while (true) { + var result = work(client, server, batchBuffer, writeImmediately); + switch (result) { + case CONTINUE -> { + writeImmediately = false; + } + case RECONNECT -> { + writeImmediately = true; + break writeLoop; + } + case FINISH -> { + break workLoop; + } + } } } - } catch (MongoClientException exception) { - logger.severe("Failed to submit: " + exception + ". Worker is quitting..."); } } + enum WorkResult { + CONTINUE, + RECONNECT, + FINISH, + INSTANT_WRITE + } + + private @NotNull WorkResult work( + ClickHouseClient client, + ClickHouseNode server, + List<LogEntry> buffer, + boolean writeImmediately + ) { + var req = client.write(server).format(ClickHouseFormat.RowBinary) + .option(ClickHouseClientOption.ASYNC, false); + boolean interrupted = false; + // if writeImmediately is set, do not accumulate the buffer, flush it immediately + try { + // accumulate buffer + if (!writeImmediately) { + accumulateBuffer(buffer); + } + } catch (InterruptedException ignored) { + // server is closing, flush the buffer immediately + // decline new write requests + isStopped.set(true); + interrupted = true; + } + try { + bulkWrite(buffer, req); + } catch (IOException ignored) { + // write failed + if (interrupted) { + // do not retry if already interrupted + // the error may be unrecoverable + return WorkResult.FINISH; + } + return WorkResult.RECONNECT; + } catch (ClickHouseException ex) { + logger.error("ClickHouse writer error", ex); + try { + Thread.sleep(3000); + } catch (InterruptedException ignored) { + } + return WorkResult.RECONNECT; + } + return WorkResult.CONTINUE; + } + } diff --git a/src/main/java/com/keuin/blame/command/BlameBlockCommand.java b/src/main/java/com/keuin/blame/command/BlameBlockCommand.java index eb82fd5..dc54487 100644 --- a/src/main/java/com/keuin/blame/command/BlameBlockCommand.java +++ b/src/main/java/com/keuin/blame/command/BlameBlockCommand.java @@ -19,8 +19,6 @@ import net.minecraft.util.Formatting; import net.minecraft.util.Identifier; import net.minecraft.util.math.BlockPos; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -30,13 +28,12 @@ import static com.keuin.blame.command.Commands.SUCCESS; @SuppressWarnings("SameReturnValue") public class BlameBlockCommand { - public static final Map<String, Integer> timeUnitAmplifierMap = Collections - .unmodifiableMap(new HashMap<String, Integer>() {{ - put("second", 1); - put("minute", 60); - put("hour", 3600); - put("day", 86400); - }}); + public static final Map<String, Integer> timeUnitAmplifierMap = Map.of( + "second", 1, + "minute", 60, + "hour", 3600, + "day", 86400 + ); public static final Set<String> timeUnits = timeUnitAmplifierMap.keySet(); public static int blameGivenBlockPos(CommandContext<ServerCommandSource> context) throws CommandSyntaxException { diff --git a/src/main/java/com/keuin/blame/command/BlameStatCommand.java b/src/main/java/com/keuin/blame/command/BlameStatCommand.java index c42fa8f..295ef01 100644 --- a/src/main/java/com/keuin/blame/command/BlameStatCommand.java +++ b/src/main/java/com/keuin/blame/command/BlameStatCommand.java @@ -54,10 +54,10 @@ public class BlameStatCommand { logger.info("Collecting statistics..."); try (final MongoClient mongoClient = MongoClients.create(DatabaseUtil.CLIENT_SETTINGS)) { final MongoDatabase db = mongoClient.getDatabase( - DatabaseUtil.MONGO_CONFIG.getDatabaseName() + DatabaseUtil.DB_CONFIG.getDatabaseName() ); final MongoCollection<LogEntry> collection = db.getCollection( - DatabaseUtil.MONGO_CONFIG.getLogCollectionName(), LogEntry.class + DatabaseUtil.DB_CONFIG.getTableName(), LogEntry.class ); Collection<String> ids = VersionedLogEntryHelper.getLoggedSubjectsId(collection); diff --git a/src/main/java/com/keuin/blame/config/BlameConfig.java b/src/main/java/com/keuin/blame/config/BlameConfig.java index ace8fa3..f0edda7 100644 --- a/src/main/java/com/keuin/blame/config/BlameConfig.java +++ b/src/main/java/com/keuin/blame/config/BlameConfig.java @@ -7,15 +7,15 @@ import java.util.Objects; public class BlameConfig { @SerializedName("database") - private final MongoConfig mongoConfig; + private final DatabaseConfig databaseConfig; - public BlameConfig(MongoConfig mongoConfig) { - this.mongoConfig = mongoConfig; + public BlameConfig(DatabaseConfig databaseConfig) { + this.databaseConfig = databaseConfig; } - public MongoConfig getMongoConfig() { - return mongoConfig; + public DatabaseConfig getMongoConfig() { + return databaseConfig; } @Override @@ -23,18 +23,18 @@ public class BlameConfig { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; BlameConfig that = (BlameConfig) o; - return Objects.equals(mongoConfig, that.mongoConfig); + return Objects.equals(databaseConfig, that.databaseConfig); } @Override public int hashCode() { - return Objects.hash(mongoConfig); + return Objects.hash(databaseConfig); } @Override public String toString() { return "BlameConfig{" + - "mongoConfig=" + mongoConfig + + "databaseConfig=" + databaseConfig + '}'; } } diff --git a/src/main/java/com/keuin/blame/config/DatabaseConfig.java b/src/main/java/com/keuin/blame/config/DatabaseConfig.java new file mode 100644 index 0000000..e7d3189 --- /dev/null +++ b/src/main/java/com/keuin/blame/config/DatabaseConfig.java @@ -0,0 +1,11 @@ +package com.keuin.blame.config; + +public record DatabaseConfig( + String address, + int port, + String database, + String table, + String username, + String password +) { +} diff --git a/src/main/java/com/keuin/blame/config/MongoConfig.java b/src/main/java/com/keuin/blame/config/MongoConfig.java deleted file mode 100644 index 628af51..0000000 --- a/src/main/java/com/keuin/blame/config/MongoConfig.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.keuin.blame.config; - -import com.google.gson.annotations.SerializedName; - -import java.util.Objects; - -public class MongoConfig { - - private final String address; - private final String username; - private final String password; - @SerializedName("database") - private final String databaseName; - @SerializedName("collection") - private final String logCollectionName; - - public MongoConfig(String address, String username, String password, String databaseName, String logCollectionName) { - this.address = address; - this.username = username; - this.password = password; - this.databaseName = databaseName; - this.logCollectionName = logCollectionName; - } - - public String getAddress() { - return address; - } - - public String getUsername() { - return username; - } - - public String getPassword() { - return password; - } - - public String getDatabaseName() { - return databaseName; - } - - public String getLogCollectionName() { - return logCollectionName; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - MongoConfig that = (MongoConfig) o; - return Objects.equals(address, that.address) && - Objects.equals(username, that.username) && - Objects.equals(password, that.password) && - Objects.equals(databaseName, that.databaseName) && - Objects.equals(logCollectionName, that.logCollectionName); - } - - @Override - public int hashCode() { - return Objects.hash(address, username, password, databaseName, logCollectionName); - } - - @Override - public String toString() { - return "MongoConfig{" + - "address='" + address + '\'' + - ", username='" + username + '\'' + - ", password='" + password + '\'' + - ", databaseName='" + databaseName + '\'' + - ", logCollectionName='" + logCollectionName + '\'' + - '}'; - } -} diff --git a/src/main/java/com/keuin/blame/data/entry/LogEntry.java b/src/main/java/com/keuin/blame/data/entry/LogEntry.java index 49ad720..839afdd 100644 --- a/src/main/java/com/keuin/blame/data/entry/LogEntry.java +++ b/src/main/java/com/keuin/blame/data/entry/LogEntry.java @@ -1,79 +1,69 @@ package com.keuin.blame.data.entry; +import com.clickhouse.data.ClickHousePipedOutputStream; +import com.clickhouse.data.format.BinaryStreamUtils; import com.keuin.blame.data.WorldPos; import com.keuin.blame.data.enums.ActionType; import com.keuin.blame.data.enums.ObjectType; -import com.keuin.blame.util.MinecraftUtil; import com.keuin.blame.util.PrettyUtil; import com.keuin.blame.util.UuidUtils; import net.minecraft.MinecraftVersion; -import org.bson.codecs.pojo.annotations.BsonProperty; +import java.io.IOException; +import java.io.OutputStream; import java.util.Objects; import java.util.UUID; -import static com.keuin.blame.data.entry.LogEntryNames.*; public class LogEntry { - // { - // "version": 1, // int - // "subject": { - // "uuid": player_uuid_bytes, // bytes - // "id": player_id, // string - // "pos": { - // "world": world_id, // string - // "x": pos_x, // float - // "y": pos_y, // float - // "z": pos_z, // float - // } - // }, - // "action": BLOCK_BREAK | BLOCK_PLACE | BLOCK_USE | ENTITY_USE | ENTITY_ATTACK | ITEM_USE, // int - // "object": { - // "type": OBJECT_BLOCK | OBJECT_ENTITY, // int - // "id": object_id, // string - // "pos": { - // "world": world_id, // string - // "x": pos_x, // float - // "y": pos_y, // float - // "z": pos_z, // float - // } - // } - //} - - @BsonProperty(VERSION) + /* + { + "version": 1, // int + "subject": { + "uuid": player_uuid_bytes, // bytes + "id": player_id, // string + "pos": { + "world": world_id, // string + "x": pos_x, // float + "y": pos_y, // float + "z": pos_z, // float + } + }, + "action": BLOCK_BREAK | BLOCK_PLACE | BLOCK_USE | ENTITY_USE | ENTITY_ATTACK | ITEM_USE, // int + "object": { + "type": OBJECT_BLOCK | OBJECT_ENTITY, // int + "id": object_id, // string + "pos": { + "world": world_id, // string + "x": pos_x, // float + "y": pos_y, // float + "z": pos_z, // float + } + } + } + */ + public int version = 1; - @BsonProperty(GAME_VERSION) public String gameVersion = MinecraftVersion.field_25319.getName(); - @BsonProperty(TIMESTAMP_MILLIS) - public long timeMillis = 0; + public long timeMillis = 0; // timestamp_millis - @BsonProperty(SUBJECT_ID) public String subjectId = ""; - @BsonProperty(SUBJECT_UUID) public UUID subjectUUID = UuidUtils.UUID_NULL; - @BsonProperty(SUBJECT_POS) public WorldPos subjectPos = WorldPos.NULL_POS; - @BsonProperty(ACTION_TYPE) public ActionType actionType = ActionType.NULL; - @BsonProperty(OBJECT_TYPE) public ObjectType objectType = ObjectType.NULL; - @BsonProperty(OBJECT_ID) public String objectId = ""; - @BsonProperty(OBJECT_POS) public WorldPos objectPos = WorldPos.NULL_POS; - @BsonProperty(RADIUS) - public double radius = 0; - public LogEntry() { } @@ -89,7 +79,6 @@ public class LogEntry { this.objectType = entry.objectType; this.objectId = entry.objectId; this.objectPos = entry.objectPos; - this.radius = entry.radius; } public LogEntry(long timeMillis, String subjectId, UUID subjectUUID, WorldPos subjectPos, ActionType actionType, ObjectType objectType, String objectId, WorldPos objectPos) { @@ -117,9 +106,6 @@ public class LogEntry { this.objectType = objectType; this.objectId = objectId; this.objectPos = objectPos; - - // v2 params - this.radius = MinecraftUtil.getRadius(objectPos); } @Override @@ -160,4 +146,51 @@ public class LogEntry { gameVersion + ")"; } + + public void write(ClickHousePipedOutputStream os) throws IOException { + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeInt8(os, actionType.getValue()); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeFixedString(os, gameVersion, 8); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeString(os, objectId); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeFixedString(os, objectPos.getWorld(), 24); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeInt64(os, (long) (objectPos.getX())); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeInt64(os, (long) (objectPos.getY())); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeInt64(os, (long) (objectPos.getZ())); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeInt32(os, objectType.getValue()); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeString(os, subjectId); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeFixedString(os, subjectPos.getWorld(), 24); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeFloat64(os, subjectPos.getX()); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeFloat64(os, subjectPos.getY()); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeFloat64(os, subjectPos.getZ()); + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeFixedString(os, subjectUUID.toString(), 36); // lowercase + + BinaryStreamUtils.writeNonNull(os); + BinaryStreamUtils.writeInt64(os, timeMillis); + } } diff --git a/src/main/java/com/keuin/blame/data/entry/LogEntryNames.java b/src/main/java/com/keuin/blame/data/entry/LogEntryNames.java deleted file mode 100644 index 180f478..0000000 --- a/src/main/java/com/keuin/blame/data/entry/LogEntryNames.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.keuin.blame.data.entry; - -public class LogEntryNames { - public static final String VERSION = "version"; - public static final String GAME_VERSION = "game_version"; - public static final String TIMESTAMP_MILLIS = "timestamp_millis"; - public static final String SUBJECT_ID = "subject_id"; - public static final String SUBJECT_UUID = "subject_uuid"; - public static final String SUBJECT_POS = "subject_pos"; - public static final String ACTION_TYPE = "action_type"; - public static final String OBJECT_TYPE = "object_type"; - public static final String OBJECT_ID = "object_id"; - public static final String OBJECT_POS = "object_pos"; - public static final String RADIUS = "object_radius"; -} diff --git a/src/main/java/com/keuin/blame/data/enums/codec/AbstractIntegerEnumCodec.java b/src/main/java/com/keuin/blame/data/enums/codec/AbstractIntegerEnumCodec.java deleted file mode 100644 index 98f871a..0000000 --- a/src/main/java/com/keuin/blame/data/enums/codec/AbstractIntegerEnumCodec.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.keuin.blame.data.enums.codec; - -import com.keuin.blame.data.enums.IntegerEnum; -import org.bson.BsonWriter; -import org.bson.codecs.Codec; -import org.bson.codecs.EncoderContext; - -public abstract class AbstractIntegerEnumCodec<T extends IntegerEnum> implements Codec<T> { - @Override - public void encode(BsonWriter writer, T value, EncoderContext encoderContext) { - writer.writeInt32(value.getValue()); - } -} diff --git a/src/main/java/com/keuin/blame/data/enums/codec/ActionTypeCodec.java b/src/main/java/com/keuin/blame/data/enums/codec/ActionTypeCodec.java deleted file mode 100644 index a7fc228..0000000 --- a/src/main/java/com/keuin/blame/data/enums/codec/ActionTypeCodec.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.keuin.blame.data.enums.codec; - -import com.keuin.blame.data.enums.ActionType; -import org.bson.BsonReader; -import org.bson.codecs.DecoderContext; - -public class ActionTypeCodec extends AbstractIntegerEnumCodec<ActionType> { - @Override - public ActionType decode(BsonReader reader, DecoderContext decoderContext) { - return ActionType.parseInt(reader.readInt32()); - } - - @Override - public Class<ActionType> getEncoderClass() { - return ActionType.class; - } -} diff --git a/src/main/java/com/keuin/blame/data/enums/codec/ObjectTypeCodec.java b/src/main/java/com/keuin/blame/data/enums/codec/ObjectTypeCodec.java deleted file mode 100644 index 8b47cc6..0000000 --- a/src/main/java/com/keuin/blame/data/enums/codec/ObjectTypeCodec.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.keuin.blame.data.enums.codec; - -import com.keuin.blame.data.enums.ObjectType; -import org.bson.BsonReader; -import org.bson.codecs.DecoderContext; - -public class ObjectTypeCodec extends AbstractIntegerEnumCodec<ObjectType> { - @Override - public ObjectType decode(BsonReader reader, DecoderContext decoderContext) { - return ObjectType.parseInt(reader.readInt32()); - } - - @Override - public Class<ObjectType> getEncoderClass() { - return ObjectType.class; - } -} diff --git a/src/main/java/com/keuin/blame/data/enums/codec/WorldPosCodec.java b/src/main/java/com/keuin/blame/data/enums/codec/WorldPosCodec.java deleted file mode 100644 index 1f0fa91..0000000 --- a/src/main/java/com/keuin/blame/data/enums/codec/WorldPosCodec.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.keuin.blame.data.enums.codec; - -import com.keuin.blame.data.WorldPos; -import org.bson.BsonReader; -import org.bson.BsonWriter; -import org.bson.Document; -import org.bson.codecs.Codec; -import org.bson.codecs.DecoderContext; -import org.bson.codecs.DocumentCodec; -import org.bson.codecs.EncoderContext; - -import java.util.Optional; - -public class WorldPosCodec implements Codec<WorldPos> { - - private final Codec<Document> documentCodec; - - public WorldPosCodec() { - documentCodec = new DocumentCodec(); - } - - public WorldPosCodec(Codec<Document> documentCodec) { - this.documentCodec = documentCodec; - } - - @Override - public WorldPos decode(BsonReader reader, DecoderContext decoderContext) { - Document document = documentCodec.decode(reader, decoderContext); - return new WorldPos( - document.getString("world"), - document.getDouble("x"), - document.getDouble("y"), - document.getDouble("z") - ); - } - - @Override - public void encode(BsonWriter writer, WorldPos value, EncoderContext encoderContext) { - Document document = new Document(); - Optional.ofNullable(value.getWorld()).ifPresent(world -> document.put("world", world)); - document.put("x", value.getX()); - document.put("y", value.getY()); - document.put("z", value.getZ()); - documentCodec.encode(writer, document, encoderContext); - } - - @Override - public Class<WorldPos> getEncoderClass() { - return WorldPos.class; - } -} diff --git a/src/main/java/com/keuin/blame/data/enums/transformer/ActionTypeTransformer.java b/src/main/java/com/keuin/blame/data/enums/transformer/ActionTypeTransformer.java deleted file mode 100644 index b50118d..0000000 --- a/src/main/java/com/keuin/blame/data/enums/transformer/ActionTypeTransformer.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.keuin.blame.data.enums.transformer; - -import com.keuin.blame.data.enums.ActionType; -import org.bson.Transformer; - -public class ActionTypeTransformer implements Transformer { - @Override - public Object transform(Object objectToTransform) { - ActionType actionType = (ActionType) objectToTransform; - return actionType.getValue(); - } -} diff --git a/src/main/java/com/keuin/blame/data/enums/transformer/ObjectTypeTransformer.java b/src/main/java/com/keuin/blame/data/enums/transformer/ObjectTypeTransformer.java deleted file mode 100644 index e28b1b7..0000000 --- a/src/main/java/com/keuin/blame/data/enums/transformer/ObjectTypeTransformer.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.keuin.blame.data.enums.transformer; - -import com.keuin.blame.data.enums.ObjectType; -import org.bson.Transformer; - -public class ObjectTypeTransformer implements Transformer { - @Override - public Object transform(Object objectToTransform) { - ObjectType objectType = (ObjectType) objectToTransform; - return objectType.getValue(); - } -} diff --git a/src/main/java/com/keuin/blame/data/transformer/LogEntryV1ToV2Transformer.java b/src/main/java/com/keuin/blame/data/transformer/LogEntryV1ToV2Transformer.java deleted file mode 100644 index 5172413..0000000 --- a/src/main/java/com/keuin/blame/data/transformer/LogEntryV1ToV2Transformer.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.keuin.blame.data.transformer; - -import com.keuin.blame.data.entry.LogEntry; -import com.keuin.blame.util.MinecraftUtil; - -public class LogEntryV1ToV2Transformer extends AbstractLogEntryTransformer { - - private static final LogEntryV1ToV2Transformer INSTANCE = new LogEntryV1ToV2Transformer(); - - static { - TransformerManager.setTransformer(1, INSTANCE); - } - - @Override - public LogEntry transform(LogEntry entry) { - LogEntry entryV2 = new LogEntry(); - entryV2.version = 2; - entryV2.gameVersion = entry.gameVersion; - entryV2.timeMillis = entry.timeMillis; - entryV2.subjectId = entry.subjectId; - entryV2.subjectUUID = entry.subjectUUID; - entryV2.subjectPos = entry.subjectPos; - entryV2.actionType = entry.actionType; - entryV2.objectType = entry.objectType; - entryV2.objectId = entry.objectId; - entryV2.objectPos = entry.objectPos; - entryV2.radius = MinecraftUtil.getRadius(entry.objectPos); - return entryV2; - } - -} diff --git a/src/main/java/com/keuin/blame/lookup/LookupWorker.java b/src/main/java/com/keuin/blame/lookup/LookupWorker.java index d9425b6..3ce21b8 100644 --- a/src/main/java/com/keuin/blame/lookup/LookupWorker.java +++ b/src/main/java/com/keuin/blame/lookup/LookupWorker.java @@ -31,10 +31,10 @@ public class LookupWorker extends Thread { public void run() { try (final MongoClient mongoClient = MongoClients.create(CLIENT_SETTINGS)) { final MongoDatabase db = mongoClient.getDatabase( - DatabaseUtil.MONGO_CONFIG.getDatabaseName() + DatabaseUtil.DB_CONFIG.getDatabaseName() ); final MongoCollection<LogEntry> collection = db.getCollection( - DatabaseUtil.MONGO_CONFIG.getLogCollectionName(), LogEntry.class + DatabaseUtil.DB_CONFIG.getTableName(), LogEntry.class ); long time; while (running) { diff --git a/src/main/java/com/keuin/blame/util/DatabaseUtil.java b/src/main/java/com/keuin/blame/util/DatabaseUtil.java index 8d8b2ad..1611a39 100644 --- a/src/main/java/com/keuin/blame/util/DatabaseUtil.java +++ b/src/main/java/com/keuin/blame/util/DatabaseUtil.java @@ -1,51 +1,9 @@ package com.keuin.blame.util; import com.keuin.blame.Blame; -import com.keuin.blame.config.MongoConfig; -import com.keuin.blame.data.enums.codec.ActionTypeCodec; -import com.keuin.blame.data.enums.codec.ObjectTypeCodec; -import com.keuin.blame.data.enums.codec.WorldPosCodec; -import com.mongodb.ConnectionString; -import com.mongodb.MongoClientSettings; -import org.bson.UuidRepresentation; -import org.bson.codecs.configuration.CodecRegistries; -import org.bson.codecs.configuration.CodecRegistry; -import org.bson.codecs.pojo.PojoCodecProvider; - -import java.util.logging.Level; -import java.util.logging.Logger; - -import static org.bson.codecs.configuration.CodecRegistries.fromProviders; +import com.keuin.blame.config.DatabaseConfig; public class DatabaseUtil { - public static final MongoConfig MONGO_CONFIG = Blame.config.getMongoConfig(); - public static final CodecRegistry CODEC_REGISTRY = CodecRegistries.fromRegistries( - MongoClientSettings.getDefaultCodecRegistry(), - CodecRegistries.fromCodecs( - new ActionTypeCodec(), - new ObjectTypeCodec(), - new WorldPosCodec() - ), - fromProviders(PojoCodecProvider.builder().automatic(true).build()) - ); - public static final MongoClientSettings CLIENT_SETTINGS = MongoClientSettings.builder() - .applyConnectionString(new ConnectionString(MONGO_CONFIG.getAddress())) - .codecRegistry(CODEC_REGISTRY) - .uuidRepresentation(UuidRepresentation.JAVA_LEGACY) // for backward-compatible with logs created by older versions - .build(); - - // TODO: Auto create indexes if the collection is empty - // db.log.createIndex({ timestamp_millis: -1 }) - // db.log.createIndex({ timestamp_millis: -1, object_id: "hashed" }) - // db.log.createIndex({ timestamp_millis: -1, subject_id: "hashed" }) - - public static void disableMongoSpamming() { -// ((LoggerContext) LoggerFactory.getILoggerFactory()).getLogger("org.mongodb.driver").setLevel(Level.ERROR); - String[] mongoDrivers = new String[]{"org.mongodb", "org.mongodb.driver", "org.mongodb.driver.cluster", "org.mongodb.driver.connection"}; - for (String driverName : mongoDrivers) { - Logger mongoLogger = Logger.getLogger(driverName); - mongoLogger.setLevel(Level.OFF); // Plz be silent, my boy. - } - } + public static final DatabaseConfig DB_CONFIG = Blame.config.getMongoConfig(); } |