From dfa163e5a3cffa46f0241210f2c8be1f8e298d7a Mon Sep 17 00:00:00 2001 From: Keuin Date: Fri, 4 Feb 2022 22:54:22 +0800 Subject: Add psmb support. --- README_zh_CN.md | 208 ++++++++++++--------- build.gradle | 2 + .../config/remote/PsmbEndpointFactory.java | 64 +++++++ .../config/remote/RemoteEndpointFactory.java | 4 +- .../endpoint/local/BungeeServerChatEndpoint.java | 1 + .../endpoint/remote/psmb/PsmbEndpoint.java | 187 ++++++++++++++++++ .../remote/psmb/PsmbMessageSerializer.java | 179 ++++++++++++++++++ src/main/java/com/keuin/psmb4j/BaseClient.java | 126 +++++++++++++ src/main/java/com/keuin/psmb4j/PublishClient.java | 59 ++++++ .../java/com/keuin/psmb4j/SubscribeClient.java | 121 ++++++++++++ .../keuin/psmb4j/error/BadProtocolException.java | 13 ++ .../psmb4j/error/CommandFailureException.java | 10 + .../psmb4j/error/IllegalParameterException.java | 10 + .../psmb4j/error/ProtocolFailureException.java | 13 ++ .../psmb4j/error/ServerMisbehaveException.java | 10 + .../psmb4j/error/UnsupportedProtocolException.java | 7 + .../com/keuin/psmb4j/util/InputStreamUtils.java | 56 ++++++ .../java/com/keuin/psmb4j/util/StringUtils.java | 14 ++ .../psmb4j/util/error/SocketClosedException.java | 15 ++ .../util/error/StringLengthExceededException.java | 9 + 20 files changed, 1016 insertions(+), 92 deletions(-) create mode 100644 src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java create mode 100644 src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java create mode 100644 src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java create mode 100644 src/main/java/com/keuin/psmb4j/BaseClient.java create mode 100644 src/main/java/com/keuin/psmb4j/PublishClient.java create mode 100644 src/main/java/com/keuin/psmb4j/SubscribeClient.java create mode 100644 src/main/java/com/keuin/psmb4j/error/BadProtocolException.java create mode 100644 src/main/java/com/keuin/psmb4j/error/CommandFailureException.java create mode 100644 src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java create mode 100644 src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java create mode 100644 src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java create mode 100644 src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java create mode 100644 src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java create mode 100644 src/main/java/com/keuin/psmb4j/util/StringUtils.java create mode 100644 src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java create mode 100644 src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java diff --git a/README_zh_CN.md b/README_zh_CN.md index dc756c4..dcfb90c 100644 --- a/README_zh_CN.md +++ b/README_zh_CN.md @@ -2,16 +2,11 @@ CrossLink 的目的是连接Minecraft群组服务器和即时通讯软件。 -CrossLink 将每个子服务器的聊天窗口视为独立的端点,将每个即时通讯软件的群聊也视为独立的端点。 -服务器管理员编写配置文件,描述消息将如何在这些端点之间互相转发,从而让玩家无论在哪个子服务器 -或者哪个即时通讯软件里,都可以在一起聊天。 - -路由策略可根据需求灵活配置。由于 CrossLink 的代码完全不关心消息该从哪出现、又要被发送到哪里, -因此转发操作可被用户完全控制。用户将`按来源过滤`、`按内容过滤`、`字符串替换`、`丢弃`、`转发` -这些**基本操作(action)** 组合成为**路由规则(rule)**,有限个路由规则的有序排列形成**路由表**。 -CrossLink 按照路由表转发消息,基本操作的组合可实现自由而强大的消息转发,从而 -提供高度定制化的 Minecraft 群组服务器消息互联方案。 +CrossLink 将每个子服务器的聊天窗口视为独立的端点,将每个即时通讯软件的群聊也视为独立的端点。 服务器管理员编写配置文件,描述消息将如何在这些端点之间互相转发,从而让玩家无论在哪个子服务器 或者哪个即时通讯软件里,都可以在一起聊天。 +路由策略可根据需求灵活配置。由于 CrossLink 的代码完全不关心消息该从哪出现、又要被发送到哪里, 因此转发操作可被用户完全控制。用户将`按来源过滤`、`按内容过滤`、`字符串替换`、`丢弃`、`转发` +这些**基本操作(action)** 组合成为**路由规则(rule)**,有限个路由规则的有序排列形成**路由表**。 CrossLink 按照路由表转发消息,基本操作的组合可实现自由而强大的消息转发,从而 提供高度定制化的 +Minecraft 群组服务器消息互联方案。 # 运行环境 @@ -35,95 +30,125 @@ CrossLink 按照路由表转发消息,基本操作的组合可实现自由而 ```json5 { - "remotes": [ + "remotes": [ + { + "type": "telegram", + // this endpoint is identified with "remote:Telegram" + "id": "Telegram", + // default: true, if set to false, this remote will be ignored + "enabled": true, + // Telegram Bot token + "token": "======SECRET======", + // repeat to and from this chat + "chat_id": 123456789, + // connect to Telegram API using this proxy + "proxy": "socks://127.0.0.1:10809", + // url to custom Telegram API + "api": "https://my-telegram-api.com" + }, + { + "type": "psmb", + // this endpoint is identified as "remote:mypsmb" + "id": "mypsmb", + // but it creates zero or one or more than one sub "virtual" endpoints + "enabled": true, + "host": "1.onesmp.org", + "port": 3456, + // messages sent to this endpoint will be published to the psmb topic whose id is 'subscribe_to' + "publish_to": "chat_mc", + // messages from topics matched by this pattern will be present on this endpoint + "subscribe_from": "chat_im*", + // the unique subscription client id required by psmb protocol + "subscriber_id": 1314, + // send keep alive packet in every 20 seconds + // if the value is ignored or not positive, keepalive will be disabled + "keepalive": 20000 + }, + { + // not implemented yet, may be added in future version + "type": "json-rpc", + "id": "rpc", + "enabled": true, + "listen": ["127.0.0.1", 8008], + "methods": { + "get": "getMessage", + "put": "sendMessage" + } + } + ], + "routing": [ + // all rules are processed sequentially + // a message may match multiple rules and thus may be duplicate in your case + // if the message is dropped in an action in one rule, + // (the action type is just "drop" and it does not have any argument) + // all subsequent rules will NOT see this message + { + // inbound chat messages (remote -> all servers) + "object": "chat_message", + // match chat messages + "from": "remote:.*", + // regexp matching source, + // only messages with matched source will be + // processed by this rule, otherwise this rule is skipped + "actions": [ + // actions run sequentially { - "type": "telegram", - "id": "Telegram", // this endpoint is identified with "remote:Telegram" - "enabled": true, // default: true, if set to false, this remote will be ignored - "token": "======SECRET======", // Telegram Bot token - "chat_id": 123456789, // repeat to and from this chat - "proxy": "socks://127.0.0.1:10809", // connect to Telegram API using this proxy - "api": "https://my-telegram-api.com" // url to custom Telegram API + "type": "format", + "color": "green" }, - { - "type": "psmb", // psmb is a special case, since it is not an endpoint - "id": "mypsmb", // this stub endpoint is identified with "remote:mypsmb" - "enabled": true, // but it creates zero or one or more than one sub "virtual" endpoints - "host": "1.onesmp.org", - "port": 3456, - "subscribe_to": "chat_.+", // psmb subscription pattern - // for example: if you have topic chat_1 and chat_2 - // then they are identified with "remote:mypsmb:chat_1" and "remote:mypsmb:chat_2" - // dispatching messages from "remote:mypsmb" to virtual endpoints - // such as "remote:mypsmb:chat_1", is done by psmb stub endpoint - "topics": [ // all topics this endpoint can actually "see" - "chat_qq", // regexp in "subscribe_to" and action route "to" - "chat_wechat" // will only match endpoints declared in this list - ] - }, - { - "type": "json-rpc", - "id": "rpc", - "enabled": true, - "listen": ["127.0.0.1", 8008], - "methods": { - "get": "getMessage", - "put": "sendMessage" - } - } - ], - "routing": [ - // all rules are processed sequentially - // a message may match multiple rules and thus may be duplicate in your case - // if the message is dropped in an action in one rule, - // (the action type is just "drop" and it does not have any argument) - // all subsequent rules will NOT see this message { - // inbound chat messages (remote -> all servers) - "object": "chat_message", // match chat messages - "from": "remote:.*", // regexp matching source, - // only messages with matched source will be - // processed by this rule, otherwise this rule is skipped - "actions": [{ - "type": "format", - "color": "green" - }, { // actions run sequentially - "type": "route", // route this message to matched destinations - "to": "server:.*" // regexp matching destination - }] + // route this message to matched destinations + "type": "route", + // the regexp matching destinations + "to": "server:.*" + } + ] + }, + { + // outbound messages (starting with '#', server -> all remotes) + "object": "chat_message", + "from": "server:.*", + "actions": [ + { + // filter the message using given regexp + // if the message does not match given pattern, + // it won't be passed into subsequent actions + "type": "filter", + // match all messages starts with char '#' + "pattern": "#.+" }, { - // outbound messages (starting with '#', server -> all remotes) - "object": "chat_message", - "from": "server:.*", - "actions": [{ - "type": "filter", // filter the message using given regexp - // if the message does not match given pattern, - // it won't be passed into subsequent actions - "pattern": "#.+" // match all messages starts with char '#' - }, { - "type": "replace", // replace the message, removing heading '#' - "from": "^#(.*)", // capture all chars after the heading '#' - "to": "$1" // and make them as the output - }, { - "type": "route", // send the message to all remotes - "to": "remote:.*" - }] + // replace the message + "type": "replace", + // removing heading '#' capture all chars after the heading '#' + // and make them as the output + "from": "^#(.*)", + "to": "$1" }, { - // cross-server messages (server -> all other servers) - "object": "chat_message", - "from": "server:.*", - "actions": [{ - "type": "route", - "to": "server:.*", - "backflow": false // do not repeat to sender, true by default - // since the destination pattern will match the source, - // we have to set backflow to false to prevent - // players from seeing duplicate messages - }] + // send the message to all remotes + "type": "route", + "to": "remote:.*" + } + ] + }, + { + // cross-server messages (server -> all other servers) + "object": "chat_message", + "from": "server:.*", + "actions": [ + { + "type": "route", + "to": "server:.*", + // do not repeat to sender, true by default + // since the destination pattern will match the source, + // setting backflow to false will prevent + // players from seeing duplicate messages + "backflow": false } - ] + ] + } + ] } ``` @@ -152,4 +177,5 @@ CrossLink 基于许多开源组件,这些开源组件的许可协议和声明 - [adventure](https://github.com/KyoriPowered/adventure) - [jackson-core](https://github.com/FasterXML/jackson-core) - [gson](https://github.com/google/gson) -- [java-telegram-bot-api](https://github.com/pengrad/java-telegram-bot-api) \ No newline at end of file +- [java-telegram-bot-api](https://github.com/pengrad/java-telegram-bot-api) +- [bson](https://mvnrepository.com/artifact/org.mongodb/bson) \ No newline at end of file diff --git a/build.gradle b/build.gradle index b6032ec..fd9c603 100644 --- a/build.gradle +++ b/build.gradle @@ -73,6 +73,8 @@ dependencies { shadow 'com.google.code.gson:gson:2.8.9' // https://mvnrepository.com/artifact/com.github.pengrad/java-telegram-bot-api shadow 'com.github.pengrad:java-telegram-bot-api:5.5.0' + // https://mvnrepository.com/artifact/org.mongodb/bson + shadow 'org.mongodb:bson:4.4.1' } shadowJar { diff --git a/src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java b/src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java new file mode 100644 index 0000000..3ee2e2b --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java @@ -0,0 +1,64 @@ +package com.keuin.crosslink.messaging.config.remote; + +import com.fasterxml.jackson.databind.JsonNode; +import com.keuin.crosslink.messaging.endpoint.IEndpoint; +import com.keuin.crosslink.messaging.endpoint.remote.psmb.PsmbEndpoint; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.Optional; + +public class PsmbEndpointFactory implements IRemoteEndpointFactory { + @Override + public @NotNull String type() { + return "psmb"; + } + + @Override + public @Nullable IEndpoint create(@NotNull JsonNode config) throws InvalidEndpointConfigurationException { + JsonNode node; + // read id, not optional + var id = Optional.ofNullable(config.get("id")).map(JsonNode::textValue).orElse(null); + if (id == null || id.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"id\""); + + // read enabled, optional (true by default) + if (!config.get("enabled").isBoolean()) throw new InvalidEndpointConfigurationException("Invalid \"enabled\""); + var enabled = Optional.ofNullable(config.get("enabled")).map(JsonNode::booleanValue).orElse(true); + + // read host, not optional + var host = Optional.ofNullable(config.get("host")).map(JsonNode::textValue).orElse(null); + if (host == null || host.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"host\""); + + // read port, not optional + node = config.get("port"); + if (node == null || !node.isIntegralNumber()) throw new InvalidEndpointConfigurationException("Invalid \"port\""); + final int port = node.intValue(); + if (port <= 0 || port >= 65536) throw new InvalidEndpointConfigurationException("Invalid \"port\""); + + // read subscribe_from, not optional + var pubTopic = Optional.ofNullable(config.get("publish_to")).map(JsonNode::textValue).orElse(null); + if (pubTopic == null || pubTopic.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"publish_to\""); + + // read subscribe_from, not optional + var subPattern = Optional.ofNullable(config.get("subscribe_from")).map(JsonNode::textValue).orElse(null); + if (subPattern == null || subPattern.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"subscribe_from\""); + + // read subscriber_id, not optional + node = config.get("subscriber_id"); + if (node == null || !node.isIntegralNumber()) { + throw new InvalidEndpointConfigurationException("Invalid \"subscriber_id\""); + } + final var subId = node.longValue(); + + // read keepalive, optional + node = config.get("keepalive"); + if (node != null && !node.isIntegralNumber()) { + throw new InvalidEndpointConfigurationException("Invalid \"keepalive\""); + } + final var keepalive = Optional.ofNullable(node).map(JsonNode::intValue).orElse(0); + + if (!enabled) return null; // not enabled, give a null value + + return new PsmbEndpoint(id, host, port, pubTopic, subPattern, subId, keepalive); + } +} diff --git a/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java b/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java index b73fba5..844bae4 100644 --- a/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java +++ b/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java @@ -17,7 +17,9 @@ import java.util.Optional; public class RemoteEndpointFactory { private static final Logger logger = LoggerFactory.getLogger(LoggerNaming.name().of("config").of("remotes").toString()); - private static final IRemoteEndpointFactory[] factories = new IRemoteEndpointFactory[]{new TelegramEndpointFactory()}; + private static final IRemoteEndpointFactory[] factories = new IRemoteEndpointFactory[]{ + new TelegramEndpointFactory(), new PsmbEndpointFactory() + }; /** * Create an {@link IEndpoint} instance based on given JSON config node. diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java index a221f02..813eee0 100644 --- a/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java +++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java @@ -88,6 +88,7 @@ public class BungeeServerChatEndpoint implements IEndpoint { } private void onMessage(@NotNull IMessage message) { + // TODO make message routing async, probably utilizing a producer-consumer model var rt = router; if (rt == null) { throw new IllegalStateException("Current endpoint hasn't bound to any router"); diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java new file mode 100644 index 0000000..48df6a8 --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java @@ -0,0 +1,187 @@ +package com.keuin.crosslink.messaging.endpoint.remote.psmb; + +import com.keuin.crosslink.messaging.endpoint.EndpointNamespace; +import com.keuin.crosslink.messaging.endpoint.IEndpoint; +import com.keuin.crosslink.messaging.message.IMessage; +import com.keuin.crosslink.messaging.router.IRouter; +import com.keuin.crosslink.util.LoggerNaming; +import com.keuin.psmb4j.PublishClient; +import com.keuin.psmb4j.SubscribeClient; +import com.keuin.psmb4j.error.CommandFailureException; +import org.jetbrains.annotations.NotNull; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +public class PsmbEndpoint implements IEndpoint { + public static final int RETRY_INTERVAL_MILLIS = 10 * 1000; + private IRouter router = null; + private final String id; + + private final String host; + private final int port; + private final String pubTopic; + private final String subPattern; + private final long subId; // subscriber id + private final int keepAliveInterval; + + /* + PublishClient and SubscribeClient are networking components. + When IO error occurred, they will be invalidated, + thus we need to create a new instance for retry. + As a result, they are not final. + */ + + // pub queue and its read-write lock + private final BlockingQueue pubQueue = new ArrayBlockingQueue<>(32); + private final Object pubLock = new Object(); + + private final Thread pubThread = new Thread(this::publish); + private final Thread subThread = new Thread(this::subscribe); + + public PsmbEndpoint(String id, + String host, + int port, + String publicTopic, + String subscribePattern, + long subscriberId, + int keepAliveInterval) { + this.id = id; + this.host = host; + this.port = port; + this.pubTopic = publicTopic; + this.subPattern = subscribePattern; + this.subId = subscriberId; + this.keepAliveInterval = keepAliveInterval; + // start pubsub threads + pubThread.start(); + subThread.start(); + } + + private void publish() { + final var pub = new PublishClient(host, port); + final var logger = LoggerFactory.getLogger(LoggerNaming.name().of("endpoint").of("psmb") + .of(String.format("%s,%s", host, pubTopic)).of("pub").toString()); + try { + // reconnect loop + while (true) { + // try to connect + try { + pub.connect(); + pub.setPublish(pubTopic); + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot connect to server", ex); + //noinspection BusyWait + Thread.sleep(RETRY_INTERVAL_MILLIS); + continue; + } + // connected successfully, send messages + try { + // publish loop + long lastBeat = -1; + while (true) { + if (Math.abs(System.currentTimeMillis() - lastBeat) >= keepAliveInterval) { + pub.keepAlive(); + lastBeat = System.currentTimeMillis(); + } + var message = pubQueue.poll(Math.max(keepAliveInterval, 0), TimeUnit.MILLISECONDS); + if (message == null) continue; + pub.publish(PsmbMessageSerializer.serialize(message)); + } + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot publish message", ex); + pub.disconnect(); // reconnect in the outer loop + } + } + } catch (InterruptedException ignored) { + logger.info("Thread is interrupted."); + } finally { + logger.info("Thread is stopping."); + } + } + + private void subscribe() { + final var sub = new SubscribeClient(host, port, subPattern, keepAliveInterval, subId); + final var logger = LoggerFactory.getLogger(LoggerNaming.name().of("endpoint").of("psmb") + .of(String.format("%s,%d,%s", host, subId, subPattern)).of("sub").toString()); + try { + // reconnect loop + while (true) { + // try to connect + try { + sub.connect(); + sub.setSubscribe(subPattern, subId); + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot connect to server", ex); + //noinspection BusyWait + Thread.sleep(RETRY_INTERVAL_MILLIS); + continue; + } + // connected successfully, receive messages + try { + // subscribe loop + sub.subscribe(raw -> { + try { + onMessage(PsmbMessageSerializer.deserialize(raw, this)); + } catch (PsmbMessageSerializer.IllegalPackedMessageException ex) { + logger.error("Cannot decode message", ex); + } + }); + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot receive message", ex); + sub.disconnect(); // reconnect in the outer loop + } + } + } catch (InterruptedException ignored) { + logger.info("Thread is interrupted."); + } finally { + logger.info("Thread is stopping."); + } + } + + + private void onMessage(@NotNull IMessage message) { + Objects.requireNonNull(message); + this.router.sendMessage(message); + } + + @Override + public void sendMessage(IMessage message) { + synchronized (pubLock) { + pubQueue.add(message); + } + } + + @Override + public void setRouter(IRouter router) { + this.router = router; + } + + @Override + public void close() { + pubThread.interrupt(); + subThread.interrupt(); + } + + @Override + public @NotNull String id() { + return id; + } + + @Override + public @NotNull EndpointNamespace namespace() { + return EndpointNamespace.REMOTE; + } + + @Override + public String toString() { + return "PsmbEndpoint{" + + "router=" + router + + ", id='" + id + '\'' + + '}'; + } +} diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java new file mode 100644 index 0000000..cc8b220 --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java @@ -0,0 +1,179 @@ +package com.keuin.crosslink.messaging.endpoint.remote.psmb; + +import com.keuin.crosslink.messaging.endpoint.EndpointNamespace; +import com.keuin.crosslink.messaging.endpoint.IEndpoint; +import com.keuin.crosslink.messaging.message.IMessage; +import com.keuin.crosslink.messaging.router.IRouter; +import com.keuin.crosslink.messaging.sender.ISender; +import net.kyori.adventure.text.Component; +import net.kyori.adventure.text.format.TextColor; +import net.kyori.adventure.text.format.TextDecoration; +import org.bson.BsonBinaryReader; +import org.bson.BsonType; +import org.jetbrains.annotations.NotNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.UUID; +import java.util.zip.GZIPInputStream; + +public class PsmbMessageSerializer { + + // psmb BSON message decoder + // copied from BungeeCross + + public static class IllegalPackedMessageException extends Exception { + public IllegalPackedMessageException() { + super(); + } + + public IllegalPackedMessageException(String missingPropertyName) { + super(String.format("missing BSON property `%s`.", missingPropertyName)); + } + + public IllegalPackedMessageException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Deserialize a message from BSON data. + * + * @param type the type. + * @param data the bytes. + * @return the message. + * @throws IOException when failed to decompress gzip data. + */ + private static Component fromSerializedMessage(int type, byte[] data) throws IOException { + if (type == MessageType.TEXT) { + return Component.text(new String(data, StandardCharsets.UTF_8)); + } else if (type == MessageType.GZIP_TEXT) { + // decompress gzip bytes, then decode to string + var os = new GZIPInputStream(new ByteArrayInputStream(data)); + return Component.text(new String(os.readAllBytes(), StandardCharsets.UTF_8)); + } else if (type == MessageType.IMAGE) { + return Component.text("[图片]", TextColor.color(0xFFAA00), TextDecoration.BOLD); + } else { + return Component.text("[未知消息]", TextColor.color(0xAAAAAA), TextDecoration.BOLD); + } + } + + private static class MessageType { + public static final int TEXT = 0; + public static final int IMAGE = 1; + public static final int GZIP_TEXT = 2; + } + + public static byte[] serialize(@NotNull IMessage message) { + // TODO + Objects.requireNonNull(message); + throw new RuntimeException("not implemented"); + } + + public static @NotNull IMessage deserialize(@NotNull ByteBuffer buffer, @NotNull IEndpoint source) throws IllegalPackedMessageException { + Objects.requireNonNull(buffer); + Objects.requireNonNull(source); + try { + var reader = new BsonBinaryReader(buffer); + + reader.readStartDocument(); + + if (isBsonKeyInvalid(reader, "endpoint")) + throw new IllegalPackedMessageException("endpoint"); + var endpoint = reader.readString(); + + if (isBsonKeyInvalid(reader, "sender")) + throw new IllegalPackedMessageException("sender"); + var sender = reader.readString(); + + // read message array + var msgBuilder = Component.text(); + + if (isBsonKeyInvalid(reader, "msg")) + throw new IllegalPackedMessageException("msg"); + reader.readStartArray(); + while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { + // read sub array + reader.readStartArray(); +// var i = reader.readInt32(); +// System.out.println("Index: " + i); + // we only deal with text messages + var messageType = reader.readInt32(); + var data = reader.readBinaryData().getData(); + try { + msgBuilder.append(fromSerializedMessage(messageType, data)); + } catch (IOException e) { + throw new IllegalPackedMessageException("Unsupported message block type: " + messageType, e); + } + reader.readEndArray(); + } + reader.readEndArray(); + + if (isBsonKeyInvalid(reader, "time")) + throw new IllegalPackedMessageException("time"); + var createTime = reader.readInt64(); + + reader.readEndDocument(); + + // TODO: refactor, create a new class `UnpackedRedisMessage`, + // which has a more powerful internal representation + var senderObj = ISender.create(sender, UUID.nameUUIDFromBytes(sender.getBytes(StandardCharsets.UTF_8))); + // wrap the original source to rename the 'seen' endpoint to remote endpoint name + var wrappedSource = new IEndpoint() { + // delegate all methods to "source" + @Override + public void sendMessage(IMessage message) { + source.sendMessage(message); + } + + @Override + public void setRouter(IRouter router) { + source.setRouter(router); + } + + @Override + public void close() { + source.close(); + } + + @Override + public @NotNull String id() { + return source.id(); + } + + @Override + public @NotNull EndpointNamespace namespace() { + return source.namespace(); + } + + @Override + public @NotNull String friendlyName() { + return endpoint; + } + + @Override + public String namespacedId() { + return source.namespacedId(); + } + }; + return IMessage.create(wrappedSource, senderObj, msgBuilder.asComponent()); + } catch (Exception e) { + throw new IllegalPackedMessageException("invalid packed message data", e); + } + } + + /** + * Read in one BSON key and check if it is invalid. + * + * @param reader the BSON reader. + * @param keyName expected key. + * @return if the key name equals to what is expected. + */ + private static boolean isBsonKeyInvalid(BsonBinaryReader reader, String keyName) { + var name = reader.readName(); + return !keyName.equals(name); + } +} diff --git a/src/main/java/com/keuin/psmb4j/BaseClient.java b/src/main/java/com/keuin/psmb4j/BaseClient.java new file mode 100644 index 0000000..3c5cf08 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/BaseClient.java @@ -0,0 +1,126 @@ +package com.keuin.psmb4j; + +import com.keuin.psmb4j.error.IllegalParameterException; +import com.keuin.psmb4j.error.UnsupportedProtocolException; +import com.keuin.psmb4j.util.InputStreamUtils; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +public abstract class BaseClient implements AutoCloseable { + + protected final int protocolVersion = 1; + protected final int MAX_CSTRING_LENGTH = 1024; + protected final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 0; + + private final String host; + private final int port; + + private Socket socket; + protected DataInputStream is; + protected DataOutputStream os; + + protected final Object socketWriteLock = new Object(); + protected final Object socketReadLock = new Object(); + + public BaseClient(String host, int port) { + this.host = host; + this.port = port; + } + + /** + * Connect to the server. + * This method must be called before sending any other messages, + * and should be called only once. + * If an IO error occurred when doing some operation, + * this client must be reconnected before next operations. + * @throws IOException if a network error occurred + */ + public void connect() throws IOException { + try { + if (this.socket != null) { + throw new IllegalStateException("already connected"); + } + this.socket = new Socket(host, port); + this.socket.setSoTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS); + this.is = new DataInputStream(this.socket.getInputStream()); + this.os = new DataOutputStream(this.socket.getOutputStream()); + + os.writeBytes("PSMB"); + os.writeInt(protocolVersion); + os.writeInt(0); // options + os.flush(); + var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + if (response.equals("UNSUPPORTED PROTOCOL")) { + throw new UnsupportedProtocolException(); + } else if (response.equals("OK")) { + var serverOptions = is.readInt(); + if (serverOptions != 0) { + throw new IllegalParameterException("Illegal server options: " + serverOptions); + } + } + } catch (IOException ex) { + // failed to connect, reset to initial state + close(); + throw ex; + } + } + + public void keepAlive() throws IOException { + final var nop = new byte[]{'N', 'O', 'P'}; + final var nil = new byte[]{'N', 'I', 'L'}; + synchronized (socketReadLock) { + synchronized (socketWriteLock) { + // lock the whole bidirectional communication + os.write(nop); + os.flush(); + // wait for a response NIL + var response = InputStreamUtils.readBytes(is, 3); + if (!Arrays.equals(response, nil)) { + throw new RuntimeException("illegal command from server: " + + new String(response, StandardCharsets.US_ASCII)); + } + } + } + } + + public void disconnect() { + if (os != null) { + try { + os.writeBytes("BYE"); + os.flush(); + os.close(); + } catch (IOException ignored) { + os = null; + } + } + if (is != null) { + try { + is.close(); + } catch (IOException ignored) { + is = null; + } + } + if (socket != null) { + try { + socket.close(); + } catch (IOException ignored) { + socket = null; + } + } + } + + protected void setSocketTimeout(int t) throws SocketException { + this.socket.setSoTimeout(t); + } + + @Override + public void close() { + disconnect(); + } +} diff --git a/src/main/java/com/keuin/psmb4j/PublishClient.java b/src/main/java/com/keuin/psmb4j/PublishClient.java new file mode 100644 index 0000000..54cf4ff --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/PublishClient.java @@ -0,0 +1,59 @@ +package com.keuin.psmb4j; + +import com.keuin.psmb4j.error.CommandFailureException; +import com.keuin.psmb4j.error.ServerMisbehaveException; +import com.keuin.psmb4j.util.InputStreamUtils; +import com.keuin.psmb4j.util.StringUtils; + +import java.io.IOException; + +public class PublishClient extends BaseClient { + + /** + * Create a client in PUBLISH mode. + * @param host server host. + * @param port server port. + */ + public PublishClient(String host, int port) { + super(host, port); + } + + public void setPublish(String topicId) throws IOException, CommandFailureException { + if (!StringUtils.isPureAscii(topicId)) { + throw new IllegalArgumentException("topicId cannot be encoded with ASCII"); + } + setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS); + synchronized (socketWriteLock) { + os.writeBytes("PUB"); + os.writeBytes(topicId); + os.writeByte('\0'); + os.flush(); + } + + synchronized (socketReadLock) { + var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + if (response.equals("FAILED")) { + var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + throw new CommandFailureException("Publish failed: " + errorMessage); + } else if (!response.equals("OK")) { + throw new ServerMisbehaveException("Unexpected response: " + response); + } + } + } + + /** + * Publish a message. + * Note that this method is not thread-safe. + * @param message the message to publish. + * @throws CommandFailureException If a command was rejected by the server. + * @throws IOException if an IO error occurred. + */ + public void publish(byte[] message) throws CommandFailureException, IOException { + synchronized (this.socketWriteLock) { + os.writeBytes("MSG"); + os.writeLong(message.length); + os.write(message); + os.flush(); + } + } +} diff --git a/src/main/java/com/keuin/psmb4j/SubscribeClient.java b/src/main/java/com/keuin/psmb4j/SubscribeClient.java new file mode 100644 index 0000000..2c0083d --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/SubscribeClient.java @@ -0,0 +1,121 @@ +package com.keuin.psmb4j; + +import com.keuin.psmb4j.error.CommandFailureException; +import com.keuin.psmb4j.error.ServerMisbehaveException; +import com.keuin.psmb4j.util.InputStreamUtils; +import com.keuin.psmb4j.util.StringUtils; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.function.Consumer; + +public class SubscribeClient extends BaseClient { + + private final String pattern; + private final long subscriberId; + private final int keepAliveIntervalMillis; + + private volatile boolean isRunning = false; + + /** + * Create a client in SUBSCRIBE mode. + * @param host the host to connect to. + * @param port the port to connect to. + * @param pattern the pattern to subscribe + * @param keepAliveIntervalMillis interval between sending keep-alive messages. If <=0, keep-alive is disabled. + * @param subscriberId an integer identifying a subscriber. + */ + public SubscribeClient(String host, int port, String pattern, int keepAliveIntervalMillis, long subscriberId) { + super(host, port); + this.pattern = pattern; + this.subscriberId = subscriberId; + this.keepAliveIntervalMillis = keepAliveIntervalMillis; + if (keepAliveIntervalMillis < 3000) { + throw new IllegalArgumentException("Keep alive interval is too small!"); + } + } + + public void setSubscribe(String pattern, long subscriberId) throws IOException, CommandFailureException { + if (!StringUtils.isPureAscii(pattern)) { + throw new IllegalArgumentException("pattern cannot be encoded in ASCII"); + } + os.writeBytes("SUB"); + os.writeInt(1); // options + os.writeBytes(pattern); + os.writeByte('\0'); + os.writeLong(subscriberId); + os.flush(); + + var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + if (response.equals("FAILED")) { + var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + throw new CommandFailureException("Subscribe failed: " + errorMessage); + } else if (!response.equals("OK")) { + throw new ServerMisbehaveException("Unexpected response: " + response); + } + } + + /** + * Start subscribing. + * This method is blocking, the callback will be called in the same thread. + * This method cannot run simultaneously by more than one thread, + * or an {@link IllegalStateException} will be thrown. + * @param callback the callback which accepts message from server. + * @throws CommandFailureException If a command was rejected by the server. + * @throws IOException if an IO error occurred. In this case, + * it is usually unsafe to retry this function, since the internal socket is probably broken. + * You should use another new instance in order to reconnect. + */ + public void subscribe(@NotNull Consumer callback) throws CommandFailureException, IOException { + Objects.requireNonNull(callback); + if (isRunning) { + throw new IllegalStateException(); + } + try { + while (true) { + try { + // only timeout when reading the command + // in other reading, we use default timeout + if (keepAliveIntervalMillis > 0) { + setSocketTimeout(keepAliveIntervalMillis); + } + var command = new String(InputStreamUtils.readBytes(is, 3), StandardCharsets.US_ASCII); + if (keepAliveIntervalMillis > 0) { + setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS); + } + if (command.equals("MSG")) { + var length = is.readLong(); + if ((length & 0xffffffff00000000L) != 0) { + throw new RuntimeException(String.format("Client implementation does not support " + + "such long payload (%s Bytes)", Long.toUnsignedString(length))); + } + var message = InputStreamUtils.readBytes(is, (int) length); + callback.accept(ByteBuffer.wrap(message)); + } else if (command.equals("NOP")) { + os.writeBytes("NIL"); + os.flush(); + } else if (command.equals("BYE")) { + break; + } else if (!command.equals("NIL")) { + throw new ServerMisbehaveException("Illegal command from server: " + command); + } + } catch (SocketTimeoutException e) { + // lock is unnecessary, since no other readers are present + super.keepAlive(); + } + } + } finally { + isRunning = false; + disconnect(); + } + } + + @Override + public void keepAlive() { + throw new RuntimeException("Manual keepalive in this class is not allowed."); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java b/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java new file mode 100644 index 0000000..893b8c3 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java @@ -0,0 +1,13 @@ +package com.keuin.psmb4j.error; + +/** + * The server speaks a bad protocol which is not compatible to the client. + */ +public class BadProtocolException extends RuntimeException { + public BadProtocolException() { + } + + public BadProtocolException(String message) { + super(message); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java b/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java new file mode 100644 index 0000000..0ac6f07 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java @@ -0,0 +1,10 @@ +package com.keuin.psmb4j.error; + +/** + * The previous command was rejected by the server for certain reasons. + */ +public class CommandFailureException extends ProtocolFailureException { + public CommandFailureException(String message) { + super(message); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java b/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java new file mode 100644 index 0000000..0fe960b --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java @@ -0,0 +1,10 @@ +package com.keuin.psmb4j.error; + +/** + * Some parameters are illegal according to current version's protocol definition. + */ +public class IllegalParameterException extends ServerMisbehaveException { + public IllegalParameterException(String message) { + super(message); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java b/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java new file mode 100644 index 0000000..55e6e30 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java @@ -0,0 +1,13 @@ +package com.keuin.psmb4j.error; + +/** + * Expected protocol error. + */ +public class ProtocolFailureException extends Exception { + public ProtocolFailureException() { + } + + public ProtocolFailureException(String message) { + super(message); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java b/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java new file mode 100644 index 0000000..8aa1449 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java @@ -0,0 +1,10 @@ +package com.keuin.psmb4j.error; + +/** + * The server made an illegal response, which conflicts to the protocol definition. + */ +public class ServerMisbehaveException extends BadProtocolException { + public ServerMisbehaveException(String message) { + super(message); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java b/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java new file mode 100644 index 0000000..a6cc935 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java @@ -0,0 +1,7 @@ +package com.keuin.psmb4j.error; + +/** + * The PSMB server does not support our protocol version. + */ +public class UnsupportedProtocolException extends BadProtocolException { +} diff --git a/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java b/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java new file mode 100644 index 0000000..7b3fd9c --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java @@ -0,0 +1,56 @@ +package com.keuin.psmb4j.util; + +import com.keuin.psmb4j.util.error.SocketClosedException; +import com.keuin.psmb4j.util.error.StringLengthExceededException; + +import java.io.IOException; +import java.io.InputStream; + +public class InputStreamUtils { + /** + * Read until '\0', the trailing '\0' is dropped. + */ + public static String readCString(InputStream stream) throws IOException { + var sb = new StringBuilder(); + int c; + while ((c = stream.read()) > 0) { + sb.append((char) c); + } + return sb.toString(); + } + + /** + * Read a C style string, with a length limit. + * If the string is longer than given limit, + * a {@link StringLengthExceededException} will be thrown. + */ + public static String readCString(InputStream stream, long maxLength) throws IOException { + var sb = new StringBuilder(); + int c; + long length = 0; + while ((c = stream.read()) > 0) { + sb.append((char) c); + if (++length > maxLength) { + throw new StringLengthExceededException(maxLength); + } + } + return sb.toString(); + } + + /** + * Read fixed length bytes from stream. + * If not enough, a {@link SocketClosedException} will be thrown. + */ + public static byte[] readBytes(InputStream stream, int length) throws IOException { + var buffer = new byte[length]; + int c; + for (int i = 0; i < length; i++) { + if ((c = stream.read()) >= 0) { + buffer[i] = (byte) c; + } else { + throw new SocketClosedException(length, i); + } + } + return buffer; + } +} diff --git a/src/main/java/com/keuin/psmb4j/util/StringUtils.java b/src/main/java/com/keuin/psmb4j/util/StringUtils.java new file mode 100644 index 0000000..036a560 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/util/StringUtils.java @@ -0,0 +1,14 @@ +package com.keuin.psmb4j.util; + +import java.nio.charset.StandardCharsets; + +public class StringUtils { + /** + * If the string can be encoded into binary using ASCII. + */ + @SuppressWarnings("BooleanMethodIsAlwaysInverted") + public static boolean isPureAscii(String v) { + return StandardCharsets.US_ASCII.newEncoder().canEncode(v); + // or "ISO-8859-1" for ISO Latin 1 + } +} diff --git a/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java b/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java new file mode 100644 index 0000000..e7c9b2d --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java @@ -0,0 +1,15 @@ +package com.keuin.psmb4j.util.error; + +import java.io.IOException; + +/** + * The socket closed before enough bytes have been read out. + */ +public class SocketClosedException extends IOException { + public SocketClosedException() { + } + + public SocketClosedException(long expected, long actual) { + super(String.format("expected %d bytes, EOF after reading %d bytes", expected, actual)); + } +} diff --git a/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java b/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java new file mode 100644 index 0000000..afb08b6 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java @@ -0,0 +1,9 @@ +package com.keuin.psmb4j.util.error; + +import java.io.IOException; + +public class StringLengthExceededException extends IOException { + public StringLengthExceededException(long length) { + super(String.format("String is too long (%d Bytes)", length)); + } +} -- cgit v1.2.3