summaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java64
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java4
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java1
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java187
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java179
-rw-r--r--src/main/java/com/keuin/psmb4j/BaseClient.java126
-rw-r--r--src/main/java/com/keuin/psmb4j/PublishClient.java59
-rw-r--r--src/main/java/com/keuin/psmb4j/SubscribeClient.java121
-rw-r--r--src/main/java/com/keuin/psmb4j/error/BadProtocolException.java13
-rw-r--r--src/main/java/com/keuin/psmb4j/error/CommandFailureException.java10
-rw-r--r--src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java10
-rw-r--r--src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java13
-rw-r--r--src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java10
-rw-r--r--src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java7
-rw-r--r--src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java56
-rw-r--r--src/main/java/com/keuin/psmb4j/util/StringUtils.java14
-rw-r--r--src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java15
-rw-r--r--src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java9
18 files changed, 897 insertions, 1 deletions
diff --git a/src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java b/src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java
new file mode 100644
index 0000000..3ee2e2b
--- /dev/null
+++ b/src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java
@@ -0,0 +1,64 @@
+package com.keuin.crosslink.messaging.config.remote;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.keuin.crosslink.messaging.endpoint.IEndpoint;
+import com.keuin.crosslink.messaging.endpoint.remote.psmb.PsmbEndpoint;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Optional;
+
+public class PsmbEndpointFactory implements IRemoteEndpointFactory {
+ @Override
+ public @NotNull String type() {
+ return "psmb";
+ }
+
+ @Override
+ public @Nullable IEndpoint create(@NotNull JsonNode config) throws InvalidEndpointConfigurationException {
+ JsonNode node;
+ // read id, not optional
+ var id = Optional.ofNullable(config.get("id")).map(JsonNode::textValue).orElse(null);
+ if (id == null || id.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"id\"");
+
+ // read enabled, optional (true by default)
+ if (!config.get("enabled").isBoolean()) throw new InvalidEndpointConfigurationException("Invalid \"enabled\"");
+ var enabled = Optional.ofNullable(config.get("enabled")).map(JsonNode::booleanValue).orElse(true);
+
+ // read host, not optional
+ var host = Optional.ofNullable(config.get("host")).map(JsonNode::textValue).orElse(null);
+ if (host == null || host.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"host\"");
+
+ // read port, not optional
+ node = config.get("port");
+ if (node == null || !node.isIntegralNumber()) throw new InvalidEndpointConfigurationException("Invalid \"port\"");
+ final int port = node.intValue();
+ if (port <= 0 || port >= 65536) throw new InvalidEndpointConfigurationException("Invalid \"port\"");
+
+ // read subscribe_from, not optional
+ var pubTopic = Optional.ofNullable(config.get("publish_to")).map(JsonNode::textValue).orElse(null);
+ if (pubTopic == null || pubTopic.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"publish_to\"");
+
+ // read subscribe_from, not optional
+ var subPattern = Optional.ofNullable(config.get("subscribe_from")).map(JsonNode::textValue).orElse(null);
+ if (subPattern == null || subPattern.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"subscribe_from\"");
+
+ // read subscriber_id, not optional
+ node = config.get("subscriber_id");
+ if (node == null || !node.isIntegralNumber()) {
+ throw new InvalidEndpointConfigurationException("Invalid \"subscriber_id\"");
+ }
+ final var subId = node.longValue();
+
+ // read keepalive, optional
+ node = config.get("keepalive");
+ if (node != null && !node.isIntegralNumber()) {
+ throw new InvalidEndpointConfigurationException("Invalid \"keepalive\"");
+ }
+ final var keepalive = Optional.ofNullable(node).map(JsonNode::intValue).orElse(0);
+
+ if (!enabled) return null; // not enabled, give a null value
+
+ return new PsmbEndpoint(id, host, port, pubTopic, subPattern, subId, keepalive);
+ }
+}
diff --git a/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java b/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java
index b73fba5..844bae4 100644
--- a/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java
+++ b/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java
@@ -17,7 +17,9 @@ import java.util.Optional;
public class RemoteEndpointFactory {
private static final Logger logger =
LoggerFactory.getLogger(LoggerNaming.name().of("config").of("remotes").toString());
- private static final IRemoteEndpointFactory[] factories = new IRemoteEndpointFactory[]{new TelegramEndpointFactory()};
+ private static final IRemoteEndpointFactory[] factories = new IRemoteEndpointFactory[]{
+ new TelegramEndpointFactory(), new PsmbEndpointFactory()
+ };
/**
* Create an {@link IEndpoint} instance based on given JSON config node.
diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java
index a221f02..813eee0 100644
--- a/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java
+++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java
@@ -88,6 +88,7 @@ public class BungeeServerChatEndpoint implements IEndpoint {
}
private void onMessage(@NotNull IMessage message) {
+ // TODO make message routing async, probably utilizing a producer-consumer model
var rt = router;
if (rt == null) {
throw new IllegalStateException("Current endpoint hasn't bound to any router");
diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java
new file mode 100644
index 0000000..48df6a8
--- /dev/null
+++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java
@@ -0,0 +1,187 @@
+package com.keuin.crosslink.messaging.endpoint.remote.psmb;
+
+import com.keuin.crosslink.messaging.endpoint.EndpointNamespace;
+import com.keuin.crosslink.messaging.endpoint.IEndpoint;
+import com.keuin.crosslink.messaging.message.IMessage;
+import com.keuin.crosslink.messaging.router.IRouter;
+import com.keuin.crosslink.util.LoggerNaming;
+import com.keuin.psmb4j.PublishClient;
+import com.keuin.psmb4j.SubscribeClient;
+import com.keuin.psmb4j.error.CommandFailureException;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class PsmbEndpoint implements IEndpoint {
+ public static final int RETRY_INTERVAL_MILLIS = 10 * 1000;
+ private IRouter router = null;
+ private final String id;
+
+ private final String host;
+ private final int port;
+ private final String pubTopic;
+ private final String subPattern;
+ private final long subId; // subscriber id
+ private final int keepAliveInterval;
+
+ /*
+ PublishClient and SubscribeClient are networking components.
+ When IO error occurred, they will be invalidated,
+ thus we need to create a new instance for retry.
+ As a result, they are not final.
+ */
+
+ // pub queue and its read-write lock
+ private final BlockingQueue<IMessage> pubQueue = new ArrayBlockingQueue<>(32);
+ private final Object pubLock = new Object();
+
+ private final Thread pubThread = new Thread(this::publish);
+ private final Thread subThread = new Thread(this::subscribe);
+
+ public PsmbEndpoint(String id,
+ String host,
+ int port,
+ String publicTopic,
+ String subscribePattern,
+ long subscriberId,
+ int keepAliveInterval) {
+ this.id = id;
+ this.host = host;
+ this.port = port;
+ this.pubTopic = publicTopic;
+ this.subPattern = subscribePattern;
+ this.subId = subscriberId;
+ this.keepAliveInterval = keepAliveInterval;
+ // start pubsub threads
+ pubThread.start();
+ subThread.start();
+ }
+
+ private void publish() {
+ final var pub = new PublishClient(host, port);
+ final var logger = LoggerFactory.getLogger(LoggerNaming.name().of("endpoint").of("psmb")
+ .of(String.format("%s,%s", host, pubTopic)).of("pub").toString());
+ try {
+ // reconnect loop
+ while (true) {
+ // try to connect
+ try {
+ pub.connect();
+ pub.setPublish(pubTopic);
+ } catch (IOException | CommandFailureException ex) {
+ logger.error("Cannot connect to server", ex);
+ //noinspection BusyWait
+ Thread.sleep(RETRY_INTERVAL_MILLIS);
+ continue;
+ }
+ // connected successfully, send messages
+ try {
+ // publish loop
+ long lastBeat = -1;
+ while (true) {
+ if (Math.abs(System.currentTimeMillis() - lastBeat) >= keepAliveInterval) {
+ pub.keepAlive();
+ lastBeat = System.currentTimeMillis();
+ }
+ var message = pubQueue.poll(Math.max(keepAliveInterval, 0), TimeUnit.MILLISECONDS);
+ if (message == null) continue;
+ pub.publish(PsmbMessageSerializer.serialize(message));
+ }
+ } catch (IOException | CommandFailureException ex) {
+ logger.error("Cannot publish message", ex);
+ pub.disconnect(); // reconnect in the outer loop
+ }
+ }
+ } catch (InterruptedException ignored) {
+ logger.info("Thread is interrupted.");
+ } finally {
+ logger.info("Thread is stopping.");
+ }
+ }
+
+ private void subscribe() {
+ final var sub = new SubscribeClient(host, port, subPattern, keepAliveInterval, subId);
+ final var logger = LoggerFactory.getLogger(LoggerNaming.name().of("endpoint").of("psmb")
+ .of(String.format("%s,%d,%s", host, subId, subPattern)).of("sub").toString());
+ try {
+ // reconnect loop
+ while (true) {
+ // try to connect
+ try {
+ sub.connect();
+ sub.setSubscribe(subPattern, subId);
+ } catch (IOException | CommandFailureException ex) {
+ logger.error("Cannot connect to server", ex);
+ //noinspection BusyWait
+ Thread.sleep(RETRY_INTERVAL_MILLIS);
+ continue;
+ }
+ // connected successfully, receive messages
+ try {
+ // subscribe loop
+ sub.subscribe(raw -> {
+ try {
+ onMessage(PsmbMessageSerializer.deserialize(raw, this));
+ } catch (PsmbMessageSerializer.IllegalPackedMessageException ex) {
+ logger.error("Cannot decode message", ex);
+ }
+ });
+ } catch (IOException | CommandFailureException ex) {
+ logger.error("Cannot receive message", ex);
+ sub.disconnect(); // reconnect in the outer loop
+ }
+ }
+ } catch (InterruptedException ignored) {
+ logger.info("Thread is interrupted.");
+ } finally {
+ logger.info("Thread is stopping.");
+ }
+ }
+
+
+ private void onMessage(@NotNull IMessage message) {
+ Objects.requireNonNull(message);
+ this.router.sendMessage(message);
+ }
+
+ @Override
+ public void sendMessage(IMessage message) {
+ synchronized (pubLock) {
+ pubQueue.add(message);
+ }
+ }
+
+ @Override
+ public void setRouter(IRouter router) {
+ this.router = router;
+ }
+
+ @Override
+ public void close() {
+ pubThread.interrupt();
+ subThread.interrupt();
+ }
+
+ @Override
+ public @NotNull String id() {
+ return id;
+ }
+
+ @Override
+ public @NotNull EndpointNamespace namespace() {
+ return EndpointNamespace.REMOTE;
+ }
+
+ @Override
+ public String toString() {
+ return "PsmbEndpoint{" +
+ "router=" + router +
+ ", id='" + id + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java
new file mode 100644
index 0000000..cc8b220
--- /dev/null
+++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java
@@ -0,0 +1,179 @@
+package com.keuin.crosslink.messaging.endpoint.remote.psmb;
+
+import com.keuin.crosslink.messaging.endpoint.EndpointNamespace;
+import com.keuin.crosslink.messaging.endpoint.IEndpoint;
+import com.keuin.crosslink.messaging.message.IMessage;
+import com.keuin.crosslink.messaging.router.IRouter;
+import com.keuin.crosslink.messaging.sender.ISender;
+import net.kyori.adventure.text.Component;
+import net.kyori.adventure.text.format.TextColor;
+import net.kyori.adventure.text.format.TextDecoration;
+import org.bson.BsonBinaryReader;
+import org.bson.BsonType;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.zip.GZIPInputStream;
+
+public class PsmbMessageSerializer {
+
+ // psmb BSON message decoder
+ // copied from BungeeCross
+
+ public static class IllegalPackedMessageException extends Exception {
+ public IllegalPackedMessageException() {
+ super();
+ }
+
+ public IllegalPackedMessageException(String missingPropertyName) {
+ super(String.format("missing BSON property `%s`.", missingPropertyName));
+ }
+
+ public IllegalPackedMessageException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Deserialize a message from BSON data.
+ *
+ * @param type the type.
+ * @param data the bytes.
+ * @return the message.
+ * @throws IOException when failed to decompress gzip data.
+ */
+ private static Component fromSerializedMessage(int type, byte[] data) throws IOException {
+ if (type == MessageType.TEXT) {
+ return Component.text(new String(data, StandardCharsets.UTF_8));
+ } else if (type == MessageType.GZIP_TEXT) {
+ // decompress gzip bytes, then decode to string
+ var os = new GZIPInputStream(new ByteArrayInputStream(data));
+ return Component.text(new String(os.readAllBytes(), StandardCharsets.UTF_8));
+ } else if (type == MessageType.IMAGE) {
+ return Component.text("[图片]", TextColor.color(0xFFAA00), TextDecoration.BOLD);
+ } else {
+ return Component.text("[未知消息]", TextColor.color(0xAAAAAA), TextDecoration.BOLD);
+ }
+ }
+
+ private static class MessageType {
+ public static final int TEXT = 0;
+ public static final int IMAGE = 1;
+ public static final int GZIP_TEXT = 2;
+ }
+
+ public static byte[] serialize(@NotNull IMessage message) {
+ // TODO
+ Objects.requireNonNull(message);
+ throw new RuntimeException("not implemented");
+ }
+
+ public static @NotNull IMessage deserialize(@NotNull ByteBuffer buffer, @NotNull IEndpoint source) throws IllegalPackedMessageException {
+ Objects.requireNonNull(buffer);
+ Objects.requireNonNull(source);
+ try {
+ var reader = new BsonBinaryReader(buffer);
+
+ reader.readStartDocument();
+
+ if (isBsonKeyInvalid(reader, "endpoint"))
+ throw new IllegalPackedMessageException("endpoint");
+ var endpoint = reader.readString();
+
+ if (isBsonKeyInvalid(reader, "sender"))
+ throw new IllegalPackedMessageException("sender");
+ var sender = reader.readString();
+
+ // read message array
+ var msgBuilder = Component.text();
+
+ if (isBsonKeyInvalid(reader, "msg"))
+ throw new IllegalPackedMessageException("msg");
+ reader.readStartArray();
+ while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
+ // read sub array
+ reader.readStartArray();
+// var i = reader.readInt32();
+// System.out.println("Index: " + i);
+ // we only deal with text messages
+ var messageType = reader.readInt32();
+ var data = reader.readBinaryData().getData();
+ try {
+ msgBuilder.append(fromSerializedMessage(messageType, data));
+ } catch (IOException e) {
+ throw new IllegalPackedMessageException("Unsupported message block type: " + messageType, e);
+ }
+ reader.readEndArray();
+ }
+ reader.readEndArray();
+
+ if (isBsonKeyInvalid(reader, "time"))
+ throw new IllegalPackedMessageException("time");
+ var createTime = reader.readInt64();
+
+ reader.readEndDocument();
+
+ // TODO: refactor, create a new class `UnpackedRedisMessage`,
+ // which has a more powerful internal representation
+ var senderObj = ISender.create(sender, UUID.nameUUIDFromBytes(sender.getBytes(StandardCharsets.UTF_8)));
+ // wrap the original source to rename the 'seen' endpoint to remote endpoint name
+ var wrappedSource = new IEndpoint() {
+ // delegate all methods to "source"
+ @Override
+ public void sendMessage(IMessage message) {
+ source.sendMessage(message);
+ }
+
+ @Override
+ public void setRouter(IRouter router) {
+ source.setRouter(router);
+ }
+
+ @Override
+ public void close() {
+ source.close();
+ }
+
+ @Override
+ public @NotNull String id() {
+ return source.id();
+ }
+
+ @Override
+ public @NotNull EndpointNamespace namespace() {
+ return source.namespace();
+ }
+
+ @Override
+ public @NotNull String friendlyName() {
+ return endpoint;
+ }
+
+ @Override
+ public String namespacedId() {
+ return source.namespacedId();
+ }
+ };
+ return IMessage.create(wrappedSource, senderObj, msgBuilder.asComponent());
+ } catch (Exception e) {
+ throw new IllegalPackedMessageException("invalid packed message data", e);
+ }
+ }
+
+ /**
+ * Read in one BSON key and check if it is invalid.
+ *
+ * @param reader the BSON reader.
+ * @param keyName expected key.
+ * @return if the key name equals to what is expected.
+ */
+ private static boolean isBsonKeyInvalid(BsonBinaryReader reader, String keyName) {
+ var name = reader.readName();
+ return !keyName.equals(name);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/BaseClient.java b/src/main/java/com/keuin/psmb4j/BaseClient.java
new file mode 100644
index 0000000..3c5cf08
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/BaseClient.java
@@ -0,0 +1,126 @@
+package com.keuin.psmb4j;
+
+import com.keuin.psmb4j.error.IllegalParameterException;
+import com.keuin.psmb4j.error.UnsupportedProtocolException;
+import com.keuin.psmb4j.util.InputStreamUtils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public abstract class BaseClient implements AutoCloseable {
+
+ protected final int protocolVersion = 1;
+ protected final int MAX_CSTRING_LENGTH = 1024;
+ protected final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 0;
+
+ private final String host;
+ private final int port;
+
+ private Socket socket;
+ protected DataInputStream is;
+ protected DataOutputStream os;
+
+ protected final Object socketWriteLock = new Object();
+ protected final Object socketReadLock = new Object();
+
+ public BaseClient(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ /**
+ * Connect to the server.
+ * This method must be called before sending any other messages,
+ * and should be called only once.
+ * If an IO error occurred when doing some operation,
+ * this client must be reconnected before next operations.
+ * @throws IOException if a network error occurred
+ */
+ public void connect() throws IOException {
+ try {
+ if (this.socket != null) {
+ throw new IllegalStateException("already connected");
+ }
+ this.socket = new Socket(host, port);
+ this.socket.setSoTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
+ this.is = new DataInputStream(this.socket.getInputStream());
+ this.os = new DataOutputStream(this.socket.getOutputStream());
+
+ os.writeBytes("PSMB");
+ os.writeInt(protocolVersion);
+ os.writeInt(0); // options
+ os.flush();
+ var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
+ if (response.equals("UNSUPPORTED PROTOCOL")) {
+ throw new UnsupportedProtocolException();
+ } else if (response.equals("OK")) {
+ var serverOptions = is.readInt();
+ if (serverOptions != 0) {
+ throw new IllegalParameterException("Illegal server options: " + serverOptions);
+ }
+ }
+ } catch (IOException ex) {
+ // failed to connect, reset to initial state
+ close();
+ throw ex;
+ }
+ }
+
+ public void keepAlive() throws IOException {
+ final var nop = new byte[]{'N', 'O', 'P'};
+ final var nil = new byte[]{'N', 'I', 'L'};
+ synchronized (socketReadLock) {
+ synchronized (socketWriteLock) {
+ // lock the whole bidirectional communication
+ os.write(nop);
+ os.flush();
+ // wait for a response NIL
+ var response = InputStreamUtils.readBytes(is, 3);
+ if (!Arrays.equals(response, nil)) {
+ throw new RuntimeException("illegal command from server: " +
+ new String(response, StandardCharsets.US_ASCII));
+ }
+ }
+ }
+ }
+
+ public void disconnect() {
+ if (os != null) {
+ try {
+ os.writeBytes("BYE");
+ os.flush();
+ os.close();
+ } catch (IOException ignored) {
+ os = null;
+ }
+ }
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException ignored) {
+ is = null;
+ }
+ }
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (IOException ignored) {
+ socket = null;
+ }
+ }
+ }
+
+ protected void setSocketTimeout(int t) throws SocketException {
+ this.socket.setSoTimeout(t);
+ }
+
+ @Override
+ public void close() {
+ disconnect();
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/PublishClient.java b/src/main/java/com/keuin/psmb4j/PublishClient.java
new file mode 100644
index 0000000..54cf4ff
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/PublishClient.java
@@ -0,0 +1,59 @@
+package com.keuin.psmb4j;
+
+import com.keuin.psmb4j.error.CommandFailureException;
+import com.keuin.psmb4j.error.ServerMisbehaveException;
+import com.keuin.psmb4j.util.InputStreamUtils;
+import com.keuin.psmb4j.util.StringUtils;
+
+import java.io.IOException;
+
+public class PublishClient extends BaseClient {
+
+ /**
+ * Create a client in PUBLISH mode.
+ * @param host server host.
+ * @param port server port.
+ */
+ public PublishClient(String host, int port) {
+ super(host, port);
+ }
+
+ public void setPublish(String topicId) throws IOException, CommandFailureException {
+ if (!StringUtils.isPureAscii(topicId)) {
+ throw new IllegalArgumentException("topicId cannot be encoded with ASCII");
+ }
+ setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
+ synchronized (socketWriteLock) {
+ os.writeBytes("PUB");
+ os.writeBytes(topicId);
+ os.writeByte('\0');
+ os.flush();
+ }
+
+ synchronized (socketReadLock) {
+ var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
+ if (response.equals("FAILED")) {
+ var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
+ throw new CommandFailureException("Publish failed: " + errorMessage);
+ } else if (!response.equals("OK")) {
+ throw new ServerMisbehaveException("Unexpected response: " + response);
+ }
+ }
+ }
+
+ /**
+ * Publish a message.
+ * Note that this method is not thread-safe.
+ * @param message the message to publish.
+ * @throws CommandFailureException If a command was rejected by the server.
+ * @throws IOException if an IO error occurred.
+ */
+ public void publish(byte[] message) throws CommandFailureException, IOException {
+ synchronized (this.socketWriteLock) {
+ os.writeBytes("MSG");
+ os.writeLong(message.length);
+ os.write(message);
+ os.flush();
+ }
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/SubscribeClient.java b/src/main/java/com/keuin/psmb4j/SubscribeClient.java
new file mode 100644
index 0000000..2c0083d
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/SubscribeClient.java
@@ -0,0 +1,121 @@
+package com.keuin.psmb4j;
+
+import com.keuin.psmb4j.error.CommandFailureException;
+import com.keuin.psmb4j.error.ServerMisbehaveException;
+import com.keuin.psmb4j.util.InputStreamUtils;
+import com.keuin.psmb4j.util.StringUtils;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+public class SubscribeClient extends BaseClient {
+
+ private final String pattern;
+ private final long subscriberId;
+ private final int keepAliveIntervalMillis;
+
+ private volatile boolean isRunning = false;
+
+ /**
+ * Create a client in SUBSCRIBE mode.
+ * @param host the host to connect to.
+ * @param port the port to connect to.
+ * @param pattern the pattern to subscribe
+ * @param keepAliveIntervalMillis interval between sending keep-alive messages. If <=0, keep-alive is disabled.
+ * @param subscriberId an integer identifying a subscriber.
+ */
+ public SubscribeClient(String host, int port, String pattern, int keepAliveIntervalMillis, long subscriberId) {
+ super(host, port);
+ this.pattern = pattern;
+ this.subscriberId = subscriberId;
+ this.keepAliveIntervalMillis = keepAliveIntervalMillis;
+ if (keepAliveIntervalMillis < 3000) {
+ throw new IllegalArgumentException("Keep alive interval is too small!");
+ }
+ }
+
+ public void setSubscribe(String pattern, long subscriberId) throws IOException, CommandFailureException {
+ if (!StringUtils.isPureAscii(pattern)) {
+ throw new IllegalArgumentException("pattern cannot be encoded in ASCII");
+ }
+ os.writeBytes("SUB");
+ os.writeInt(1); // options
+ os.writeBytes(pattern);
+ os.writeByte('\0');
+ os.writeLong(subscriberId);
+ os.flush();
+
+ var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
+ if (response.equals("FAILED")) {
+ var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
+ throw new CommandFailureException("Subscribe failed: " + errorMessage);
+ } else if (!response.equals("OK")) {
+ throw new ServerMisbehaveException("Unexpected response: " + response);
+ }
+ }
+
+ /**
+ * Start subscribing.
+ * This method is blocking, the callback will be called in the same thread.
+ * This method cannot run simultaneously by more than one thread,
+ * or an {@link IllegalStateException} will be thrown.
+ * @param callback the callback which accepts message from server.
+ * @throws CommandFailureException If a command was rejected by the server.
+ * @throws IOException if an IO error occurred. In this case,
+ * it is usually unsafe to retry this function, since the internal socket is probably broken.
+ * You should use another new instance in order to reconnect.
+ */
+ public void subscribe(@NotNull Consumer<ByteBuffer> callback) throws CommandFailureException, IOException {
+ Objects.requireNonNull(callback);
+ if (isRunning) {
+ throw new IllegalStateException();
+ }
+ try {
+ while (true) {
+ try {
+ // only timeout when reading the command
+ // in other reading, we use default timeout
+ if (keepAliveIntervalMillis > 0) {
+ setSocketTimeout(keepAliveIntervalMillis);
+ }
+ var command = new String(InputStreamUtils.readBytes(is, 3), StandardCharsets.US_ASCII);
+ if (keepAliveIntervalMillis > 0) {
+ setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
+ }
+ if (command.equals("MSG")) {
+ var length = is.readLong();
+ if ((length & 0xffffffff00000000L) != 0) {
+ throw new RuntimeException(String.format("Client implementation does not support " +
+ "such long payload (%s Bytes)", Long.toUnsignedString(length)));
+ }
+ var message = InputStreamUtils.readBytes(is, (int) length);
+ callback.accept(ByteBuffer.wrap(message));
+ } else if (command.equals("NOP")) {
+ os.writeBytes("NIL");
+ os.flush();
+ } else if (command.equals("BYE")) {
+ break;
+ } else if (!command.equals("NIL")) {
+ throw new ServerMisbehaveException("Illegal command from server: " + command);
+ }
+ } catch (SocketTimeoutException e) {
+ // lock is unnecessary, since no other readers are present
+ super.keepAlive();
+ }
+ }
+ } finally {
+ isRunning = false;
+ disconnect();
+ }
+ }
+
+ @Override
+ public void keepAlive() {
+ throw new RuntimeException("Manual keepalive in this class is not allowed.");
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java b/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java
new file mode 100644
index 0000000..893b8c3
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java
@@ -0,0 +1,13 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * The server speaks a bad protocol which is not compatible to the client.
+ */
+public class BadProtocolException extends RuntimeException {
+ public BadProtocolException() {
+ }
+
+ public BadProtocolException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java b/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java
new file mode 100644
index 0000000..0ac6f07
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java
@@ -0,0 +1,10 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * The previous command was rejected by the server for certain reasons.
+ */
+public class CommandFailureException extends ProtocolFailureException {
+ public CommandFailureException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java b/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java
new file mode 100644
index 0000000..0fe960b
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java
@@ -0,0 +1,10 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * Some parameters are illegal according to current version's protocol definition.
+ */
+public class IllegalParameterException extends ServerMisbehaveException {
+ public IllegalParameterException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java b/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java
new file mode 100644
index 0000000..55e6e30
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java
@@ -0,0 +1,13 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * Expected protocol error.
+ */
+public class ProtocolFailureException extends Exception {
+ public ProtocolFailureException() {
+ }
+
+ public ProtocolFailureException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java b/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java
new file mode 100644
index 0000000..8aa1449
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java
@@ -0,0 +1,10 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * The server made an illegal response, which conflicts to the protocol definition.
+ */
+public class ServerMisbehaveException extends BadProtocolException {
+ public ServerMisbehaveException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java b/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java
new file mode 100644
index 0000000..a6cc935
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java
@@ -0,0 +1,7 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * The PSMB server does not support our protocol version.
+ */
+public class UnsupportedProtocolException extends BadProtocolException {
+}
diff --git a/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java b/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java
new file mode 100644
index 0000000..7b3fd9c
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java
@@ -0,0 +1,56 @@
+package com.keuin.psmb4j.util;
+
+import com.keuin.psmb4j.util.error.SocketClosedException;
+import com.keuin.psmb4j.util.error.StringLengthExceededException;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class InputStreamUtils {
+ /**
+ * Read until '\0', the trailing '\0' is dropped.
+ */
+ public static String readCString(InputStream stream) throws IOException {
+ var sb = new StringBuilder();
+ int c;
+ while ((c = stream.read()) > 0) {
+ sb.append((char) c);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Read a C style string, with a length limit.
+ * If the string is longer than given limit,
+ * a {@link StringLengthExceededException} will be thrown.
+ */
+ public static String readCString(InputStream stream, long maxLength) throws IOException {
+ var sb = new StringBuilder();
+ int c;
+ long length = 0;
+ while ((c = stream.read()) > 0) {
+ sb.append((char) c);
+ if (++length > maxLength) {
+ throw new StringLengthExceededException(maxLength);
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Read fixed length bytes from stream.
+ * If not enough, a {@link SocketClosedException} will be thrown.
+ */
+ public static byte[] readBytes(InputStream stream, int length) throws IOException {
+ var buffer = new byte[length];
+ int c;
+ for (int i = 0; i < length; i++) {
+ if ((c = stream.read()) >= 0) {
+ buffer[i] = (byte) c;
+ } else {
+ throw new SocketClosedException(length, i);
+ }
+ }
+ return buffer;
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/util/StringUtils.java b/src/main/java/com/keuin/psmb4j/util/StringUtils.java
new file mode 100644
index 0000000..036a560
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/util/StringUtils.java
@@ -0,0 +1,14 @@
+package com.keuin.psmb4j.util;
+
+import java.nio.charset.StandardCharsets;
+
+public class StringUtils {
+ /**
+ * If the string can be encoded into binary using ASCII.
+ */
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+ public static boolean isPureAscii(String v) {
+ return StandardCharsets.US_ASCII.newEncoder().canEncode(v);
+ // or "ISO-8859-1" for ISO Latin 1
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java b/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java
new file mode 100644
index 0000000..e7c9b2d
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java
@@ -0,0 +1,15 @@
+package com.keuin.psmb4j.util.error;
+
+import java.io.IOException;
+
+/**
+ * The socket closed before enough bytes have been read out.
+ */
+public class SocketClosedException extends IOException {
+ public SocketClosedException() {
+ }
+
+ public SocketClosedException(long expected, long actual) {
+ super(String.format("expected %d bytes, EOF after reading %d bytes", expected, actual));
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java b/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java
new file mode 100644
index 0000000..afb08b6
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java
@@ -0,0 +1,9 @@
+package com.keuin.psmb4j.util.error;
+
+import java.io.IOException;
+
+public class StringLengthExceededException extends IOException {
+ public StringLengthExceededException(long length) {
+ super(String.format("String is too long (%d Bytes)", length));
+ }
+}