diff options
author | Keuin <[email protected]> | 2022-02-04 22:54:22 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2022-02-04 22:54:22 +0800 |
commit | dfa163e5a3cffa46f0241210f2c8be1f8e298d7a (patch) | |
tree | 72db589d79bcc3bbca45c3a34a6dac71b2f55526 /src/main/java/com/keuin | |
parent | 12ddbba66e6f2585e59d05d1782c0e8ce9fe6146 (diff) |
Add psmb support.
Diffstat (limited to 'src/main/java/com/keuin')
18 files changed, 897 insertions, 1 deletions
diff --git a/src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java b/src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java new file mode 100644 index 0000000..3ee2e2b --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/config/remote/PsmbEndpointFactory.java @@ -0,0 +1,64 @@ +package com.keuin.crosslink.messaging.config.remote; + +import com.fasterxml.jackson.databind.JsonNode; +import com.keuin.crosslink.messaging.endpoint.IEndpoint; +import com.keuin.crosslink.messaging.endpoint.remote.psmb.PsmbEndpoint; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.Optional; + +public class PsmbEndpointFactory implements IRemoteEndpointFactory { + @Override + public @NotNull String type() { + return "psmb"; + } + + @Override + public @Nullable IEndpoint create(@NotNull JsonNode config) throws InvalidEndpointConfigurationException { + JsonNode node; + // read id, not optional + var id = Optional.ofNullable(config.get("id")).map(JsonNode::textValue).orElse(null); + if (id == null || id.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"id\""); + + // read enabled, optional (true by default) + if (!config.get("enabled").isBoolean()) throw new InvalidEndpointConfigurationException("Invalid \"enabled\""); + var enabled = Optional.ofNullable(config.get("enabled")).map(JsonNode::booleanValue).orElse(true); + + // read host, not optional + var host = Optional.ofNullable(config.get("host")).map(JsonNode::textValue).orElse(null); + if (host == null || host.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"host\""); + + // read port, not optional + node = config.get("port"); + if (node == null || !node.isIntegralNumber()) throw new InvalidEndpointConfigurationException("Invalid \"port\""); + final int port = node.intValue(); + if (port <= 0 || port >= 65536) throw new InvalidEndpointConfigurationException("Invalid \"port\""); + + // read subscribe_from, not optional + var pubTopic = Optional.ofNullable(config.get("publish_to")).map(JsonNode::textValue).orElse(null); + if (pubTopic == null || pubTopic.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"publish_to\""); + + // read subscribe_from, not optional + var subPattern = Optional.ofNullable(config.get("subscribe_from")).map(JsonNode::textValue).orElse(null); + if (subPattern == null || subPattern.isEmpty()) throw new InvalidEndpointConfigurationException("Invalid \"subscribe_from\""); + + // read subscriber_id, not optional + node = config.get("subscriber_id"); + if (node == null || !node.isIntegralNumber()) { + throw new InvalidEndpointConfigurationException("Invalid \"subscriber_id\""); + } + final var subId = node.longValue(); + + // read keepalive, optional + node = config.get("keepalive"); + if (node != null && !node.isIntegralNumber()) { + throw new InvalidEndpointConfigurationException("Invalid \"keepalive\""); + } + final var keepalive = Optional.ofNullable(node).map(JsonNode::intValue).orElse(0); + + if (!enabled) return null; // not enabled, give a null value + + return new PsmbEndpoint(id, host, port, pubTopic, subPattern, subId, keepalive); + } +} diff --git a/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java b/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java index b73fba5..844bae4 100644 --- a/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java +++ b/src/main/java/com/keuin/crosslink/messaging/config/remote/RemoteEndpointFactory.java @@ -17,7 +17,9 @@ import java.util.Optional; public class RemoteEndpointFactory { private static final Logger logger = LoggerFactory.getLogger(LoggerNaming.name().of("config").of("remotes").toString()); - private static final IRemoteEndpointFactory[] factories = new IRemoteEndpointFactory[]{new TelegramEndpointFactory()}; + private static final IRemoteEndpointFactory[] factories = new IRemoteEndpointFactory[]{ + new TelegramEndpointFactory(), new PsmbEndpointFactory() + }; /** * Create an {@link IEndpoint} instance based on given JSON config node. diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java index a221f02..813eee0 100644 --- a/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java +++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/local/BungeeServerChatEndpoint.java @@ -88,6 +88,7 @@ public class BungeeServerChatEndpoint implements IEndpoint { } private void onMessage(@NotNull IMessage message) { + // TODO make message routing async, probably utilizing a producer-consumer model var rt = router; if (rt == null) { throw new IllegalStateException("Current endpoint hasn't bound to any router"); diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java new file mode 100644 index 0000000..48df6a8 --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java @@ -0,0 +1,187 @@ +package com.keuin.crosslink.messaging.endpoint.remote.psmb; + +import com.keuin.crosslink.messaging.endpoint.EndpointNamespace; +import com.keuin.crosslink.messaging.endpoint.IEndpoint; +import com.keuin.crosslink.messaging.message.IMessage; +import com.keuin.crosslink.messaging.router.IRouter; +import com.keuin.crosslink.util.LoggerNaming; +import com.keuin.psmb4j.PublishClient; +import com.keuin.psmb4j.SubscribeClient; +import com.keuin.psmb4j.error.CommandFailureException; +import org.jetbrains.annotations.NotNull; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +public class PsmbEndpoint implements IEndpoint { + public static final int RETRY_INTERVAL_MILLIS = 10 * 1000; + private IRouter router = null; + private final String id; + + private final String host; + private final int port; + private final String pubTopic; + private final String subPattern; + private final long subId; // subscriber id + private final int keepAliveInterval; + + /* + PublishClient and SubscribeClient are networking components. + When IO error occurred, they will be invalidated, + thus we need to create a new instance for retry. + As a result, they are not final. + */ + + // pub queue and its read-write lock + private final BlockingQueue<IMessage> pubQueue = new ArrayBlockingQueue<>(32); + private final Object pubLock = new Object(); + + private final Thread pubThread = new Thread(this::publish); + private final Thread subThread = new Thread(this::subscribe); + + public PsmbEndpoint(String id, + String host, + int port, + String publicTopic, + String subscribePattern, + long subscriberId, + int keepAliveInterval) { + this.id = id; + this.host = host; + this.port = port; + this.pubTopic = publicTopic; + this.subPattern = subscribePattern; + this.subId = subscriberId; + this.keepAliveInterval = keepAliveInterval; + // start pubsub threads + pubThread.start(); + subThread.start(); + } + + private void publish() { + final var pub = new PublishClient(host, port); + final var logger = LoggerFactory.getLogger(LoggerNaming.name().of("endpoint").of("psmb") + .of(String.format("%s,%s", host, pubTopic)).of("pub").toString()); + try { + // reconnect loop + while (true) { + // try to connect + try { + pub.connect(); + pub.setPublish(pubTopic); + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot connect to server", ex); + //noinspection BusyWait + Thread.sleep(RETRY_INTERVAL_MILLIS); + continue; + } + // connected successfully, send messages + try { + // publish loop + long lastBeat = -1; + while (true) { + if (Math.abs(System.currentTimeMillis() - lastBeat) >= keepAliveInterval) { + pub.keepAlive(); + lastBeat = System.currentTimeMillis(); + } + var message = pubQueue.poll(Math.max(keepAliveInterval, 0), TimeUnit.MILLISECONDS); + if (message == null) continue; + pub.publish(PsmbMessageSerializer.serialize(message)); + } + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot publish message", ex); + pub.disconnect(); // reconnect in the outer loop + } + } + } catch (InterruptedException ignored) { + logger.info("Thread is interrupted."); + } finally { + logger.info("Thread is stopping."); + } + } + + private void subscribe() { + final var sub = new SubscribeClient(host, port, subPattern, keepAliveInterval, subId); + final var logger = LoggerFactory.getLogger(LoggerNaming.name().of("endpoint").of("psmb") + .of(String.format("%s,%d,%s", host, subId, subPattern)).of("sub").toString()); + try { + // reconnect loop + while (true) { + // try to connect + try { + sub.connect(); + sub.setSubscribe(subPattern, subId); + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot connect to server", ex); + //noinspection BusyWait + Thread.sleep(RETRY_INTERVAL_MILLIS); + continue; + } + // connected successfully, receive messages + try { + // subscribe loop + sub.subscribe(raw -> { + try { + onMessage(PsmbMessageSerializer.deserialize(raw, this)); + } catch (PsmbMessageSerializer.IllegalPackedMessageException ex) { + logger.error("Cannot decode message", ex); + } + }); + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot receive message", ex); + sub.disconnect(); // reconnect in the outer loop + } + } + } catch (InterruptedException ignored) { + logger.info("Thread is interrupted."); + } finally { + logger.info("Thread is stopping."); + } + } + + + private void onMessage(@NotNull IMessage message) { + Objects.requireNonNull(message); + this.router.sendMessage(message); + } + + @Override + public void sendMessage(IMessage message) { + synchronized (pubLock) { + pubQueue.add(message); + } + } + + @Override + public void setRouter(IRouter router) { + this.router = router; + } + + @Override + public void close() { + pubThread.interrupt(); + subThread.interrupt(); + } + + @Override + public @NotNull String id() { + return id; + } + + @Override + public @NotNull EndpointNamespace namespace() { + return EndpointNamespace.REMOTE; + } + + @Override + public String toString() { + return "PsmbEndpoint{" + + "router=" + router + + ", id='" + id + '\'' + + '}'; + } +} diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java new file mode 100644 index 0000000..cc8b220 --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbMessageSerializer.java @@ -0,0 +1,179 @@ +package com.keuin.crosslink.messaging.endpoint.remote.psmb; + +import com.keuin.crosslink.messaging.endpoint.EndpointNamespace; +import com.keuin.crosslink.messaging.endpoint.IEndpoint; +import com.keuin.crosslink.messaging.message.IMessage; +import com.keuin.crosslink.messaging.router.IRouter; +import com.keuin.crosslink.messaging.sender.ISender; +import net.kyori.adventure.text.Component; +import net.kyori.adventure.text.format.TextColor; +import net.kyori.adventure.text.format.TextDecoration; +import org.bson.BsonBinaryReader; +import org.bson.BsonType; +import org.jetbrains.annotations.NotNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.UUID; +import java.util.zip.GZIPInputStream; + +public class PsmbMessageSerializer { + + // psmb BSON message decoder + // copied from BungeeCross + + public static class IllegalPackedMessageException extends Exception { + public IllegalPackedMessageException() { + super(); + } + + public IllegalPackedMessageException(String missingPropertyName) { + super(String.format("missing BSON property `%s`.", missingPropertyName)); + } + + public IllegalPackedMessageException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Deserialize a message from BSON data. + * + * @param type the type. + * @param data the bytes. + * @return the message. + * @throws IOException when failed to decompress gzip data. + */ + private static Component fromSerializedMessage(int type, byte[] data) throws IOException { + if (type == MessageType.TEXT) { + return Component.text(new String(data, StandardCharsets.UTF_8)); + } else if (type == MessageType.GZIP_TEXT) { + // decompress gzip bytes, then decode to string + var os = new GZIPInputStream(new ByteArrayInputStream(data)); + return Component.text(new String(os.readAllBytes(), StandardCharsets.UTF_8)); + } else if (type == MessageType.IMAGE) { + return Component.text("[图片]", TextColor.color(0xFFAA00), TextDecoration.BOLD); + } else { + return Component.text("[未知消息]", TextColor.color(0xAAAAAA), TextDecoration.BOLD); + } + } + + private static class MessageType { + public static final int TEXT = 0; + public static final int IMAGE = 1; + public static final int GZIP_TEXT = 2; + } + + public static byte[] serialize(@NotNull IMessage message) { + // TODO + Objects.requireNonNull(message); + throw new RuntimeException("not implemented"); + } + + public static @NotNull IMessage deserialize(@NotNull ByteBuffer buffer, @NotNull IEndpoint source) throws IllegalPackedMessageException { + Objects.requireNonNull(buffer); + Objects.requireNonNull(source); + try { + var reader = new BsonBinaryReader(buffer); + + reader.readStartDocument(); + + if (isBsonKeyInvalid(reader, "endpoint")) + throw new IllegalPackedMessageException("endpoint"); + var endpoint = reader.readString(); + + if (isBsonKeyInvalid(reader, "sender")) + throw new IllegalPackedMessageException("sender"); + var sender = reader.readString(); + + // read message array + var msgBuilder = Component.text(); + + if (isBsonKeyInvalid(reader, "msg")) + throw new IllegalPackedMessageException("msg"); + reader.readStartArray(); + while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { + // read sub array + reader.readStartArray(); +// var i = reader.readInt32(); +// System.out.println("Index: " + i); + // we only deal with text messages + var messageType = reader.readInt32(); + var data = reader.readBinaryData().getData(); + try { + msgBuilder.append(fromSerializedMessage(messageType, data)); + } catch (IOException e) { + throw new IllegalPackedMessageException("Unsupported message block type: " + messageType, e); + } + reader.readEndArray(); + } + reader.readEndArray(); + + if (isBsonKeyInvalid(reader, "time")) + throw new IllegalPackedMessageException("time"); + var createTime = reader.readInt64(); + + reader.readEndDocument(); + + // TODO: refactor, create a new class `UnpackedRedisMessage`, + // which has a more powerful internal representation + var senderObj = ISender.create(sender, UUID.nameUUIDFromBytes(sender.getBytes(StandardCharsets.UTF_8))); + // wrap the original source to rename the 'seen' endpoint to remote endpoint name + var wrappedSource = new IEndpoint() { + // delegate all methods to "source" + @Override + public void sendMessage(IMessage message) { + source.sendMessage(message); + } + + @Override + public void setRouter(IRouter router) { + source.setRouter(router); + } + + @Override + public void close() { + source.close(); + } + + @Override + public @NotNull String id() { + return source.id(); + } + + @Override + public @NotNull EndpointNamespace namespace() { + return source.namespace(); + } + + @Override + public @NotNull String friendlyName() { + return endpoint; + } + + @Override + public String namespacedId() { + return source.namespacedId(); + } + }; + return IMessage.create(wrappedSource, senderObj, msgBuilder.asComponent()); + } catch (Exception e) { + throw new IllegalPackedMessageException("invalid packed message data", e); + } + } + + /** + * Read in one BSON key and check if it is invalid. + * + * @param reader the BSON reader. + * @param keyName expected key. + * @return if the key name equals to what is expected. + */ + private static boolean isBsonKeyInvalid(BsonBinaryReader reader, String keyName) { + var name = reader.readName(); + return !keyName.equals(name); + } +} diff --git a/src/main/java/com/keuin/psmb4j/BaseClient.java b/src/main/java/com/keuin/psmb4j/BaseClient.java new file mode 100644 index 0000000..3c5cf08 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/BaseClient.java @@ -0,0 +1,126 @@ +package com.keuin.psmb4j; + +import com.keuin.psmb4j.error.IllegalParameterException; +import com.keuin.psmb4j.error.UnsupportedProtocolException; +import com.keuin.psmb4j.util.InputStreamUtils; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +public abstract class BaseClient implements AutoCloseable { + + protected final int protocolVersion = 1; + protected final int MAX_CSTRING_LENGTH = 1024; + protected final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 0; + + private final String host; + private final int port; + + private Socket socket; + protected DataInputStream is; + protected DataOutputStream os; + + protected final Object socketWriteLock = new Object(); + protected final Object socketReadLock = new Object(); + + public BaseClient(String host, int port) { + this.host = host; + this.port = port; + } + + /** + * Connect to the server. + * This method must be called before sending any other messages, + * and should be called only once. + * If an IO error occurred when doing some operation, + * this client must be reconnected before next operations. + * @throws IOException if a network error occurred + */ + public void connect() throws IOException { + try { + if (this.socket != null) { + throw new IllegalStateException("already connected"); + } + this.socket = new Socket(host, port); + this.socket.setSoTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS); + this.is = new DataInputStream(this.socket.getInputStream()); + this.os = new DataOutputStream(this.socket.getOutputStream()); + + os.writeBytes("PSMB"); + os.writeInt(protocolVersion); + os.writeInt(0); // options + os.flush(); + var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + if (response.equals("UNSUPPORTED PROTOCOL")) { + throw new UnsupportedProtocolException(); + } else if (response.equals("OK")) { + var serverOptions = is.readInt(); + if (serverOptions != 0) { + throw new IllegalParameterException("Illegal server options: " + serverOptions); + } + } + } catch (IOException ex) { + // failed to connect, reset to initial state + close(); + throw ex; + } + } + + public void keepAlive() throws IOException { + final var nop = new byte[]{'N', 'O', 'P'}; + final var nil = new byte[]{'N', 'I', 'L'}; + synchronized (socketReadLock) { + synchronized (socketWriteLock) { + // lock the whole bidirectional communication + os.write(nop); + os.flush(); + // wait for a response NIL + var response = InputStreamUtils.readBytes(is, 3); + if (!Arrays.equals(response, nil)) { + throw new RuntimeException("illegal command from server: " + + new String(response, StandardCharsets.US_ASCII)); + } + } + } + } + + public void disconnect() { + if (os != null) { + try { + os.writeBytes("BYE"); + os.flush(); + os.close(); + } catch (IOException ignored) { + os = null; + } + } + if (is != null) { + try { + is.close(); + } catch (IOException ignored) { + is = null; + } + } + if (socket != null) { + try { + socket.close(); + } catch (IOException ignored) { + socket = null; + } + } + } + + protected void setSocketTimeout(int t) throws SocketException { + this.socket.setSoTimeout(t); + } + + @Override + public void close() { + disconnect(); + } +} diff --git a/src/main/java/com/keuin/psmb4j/PublishClient.java b/src/main/java/com/keuin/psmb4j/PublishClient.java new file mode 100644 index 0000000..54cf4ff --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/PublishClient.java @@ -0,0 +1,59 @@ +package com.keuin.psmb4j; + +import com.keuin.psmb4j.error.CommandFailureException; +import com.keuin.psmb4j.error.ServerMisbehaveException; +import com.keuin.psmb4j.util.InputStreamUtils; +import com.keuin.psmb4j.util.StringUtils; + +import java.io.IOException; + +public class PublishClient extends BaseClient { + + /** + * Create a client in PUBLISH mode. + * @param host server host. + * @param port server port. + */ + public PublishClient(String host, int port) { + super(host, port); + } + + public void setPublish(String topicId) throws IOException, CommandFailureException { + if (!StringUtils.isPureAscii(topicId)) { + throw new IllegalArgumentException("topicId cannot be encoded with ASCII"); + } + setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS); + synchronized (socketWriteLock) { + os.writeBytes("PUB"); + os.writeBytes(topicId); + os.writeByte('\0'); + os.flush(); + } + + synchronized (socketReadLock) { + var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + if (response.equals("FAILED")) { + var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + throw new CommandFailureException("Publish failed: " + errorMessage); + } else if (!response.equals("OK")) { + throw new ServerMisbehaveException("Unexpected response: " + response); + } + } + } + + /** + * Publish a message. + * Note that this method is not thread-safe. + * @param message the message to publish. + * @throws CommandFailureException If a command was rejected by the server. + * @throws IOException if an IO error occurred. + */ + public void publish(byte[] message) throws CommandFailureException, IOException { + synchronized (this.socketWriteLock) { + os.writeBytes("MSG"); + os.writeLong(message.length); + os.write(message); + os.flush(); + } + } +} diff --git a/src/main/java/com/keuin/psmb4j/SubscribeClient.java b/src/main/java/com/keuin/psmb4j/SubscribeClient.java new file mode 100644 index 0000000..2c0083d --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/SubscribeClient.java @@ -0,0 +1,121 @@ +package com.keuin.psmb4j; + +import com.keuin.psmb4j.error.CommandFailureException; +import com.keuin.psmb4j.error.ServerMisbehaveException; +import com.keuin.psmb4j.util.InputStreamUtils; +import com.keuin.psmb4j.util.StringUtils; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.function.Consumer; + +public class SubscribeClient extends BaseClient { + + private final String pattern; + private final long subscriberId; + private final int keepAliveIntervalMillis; + + private volatile boolean isRunning = false; + + /** + * Create a client in SUBSCRIBE mode. + * @param host the host to connect to. + * @param port the port to connect to. + * @param pattern the pattern to subscribe + * @param keepAliveIntervalMillis interval between sending keep-alive messages. If <=0, keep-alive is disabled. + * @param subscriberId an integer identifying a subscriber. + */ + public SubscribeClient(String host, int port, String pattern, int keepAliveIntervalMillis, long subscriberId) { + super(host, port); + this.pattern = pattern; + this.subscriberId = subscriberId; + this.keepAliveIntervalMillis = keepAliveIntervalMillis; + if (keepAliveIntervalMillis < 3000) { + throw new IllegalArgumentException("Keep alive interval is too small!"); + } + } + + public void setSubscribe(String pattern, long subscriberId) throws IOException, CommandFailureException { + if (!StringUtils.isPureAscii(pattern)) { + throw new IllegalArgumentException("pattern cannot be encoded in ASCII"); + } + os.writeBytes("SUB"); + os.writeInt(1); // options + os.writeBytes(pattern); + os.writeByte('\0'); + os.writeLong(subscriberId); + os.flush(); + + var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + if (response.equals("FAILED")) { + var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + throw new CommandFailureException("Subscribe failed: " + errorMessage); + } else if (!response.equals("OK")) { + throw new ServerMisbehaveException("Unexpected response: " + response); + } + } + + /** + * Start subscribing. + * This method is blocking, the callback will be called in the same thread. + * This method cannot run simultaneously by more than one thread, + * or an {@link IllegalStateException} will be thrown. + * @param callback the callback which accepts message from server. + * @throws CommandFailureException If a command was rejected by the server. + * @throws IOException if an IO error occurred. In this case, + * it is usually unsafe to retry this function, since the internal socket is probably broken. + * You should use another new instance in order to reconnect. + */ + public void subscribe(@NotNull Consumer<ByteBuffer> callback) throws CommandFailureException, IOException { + Objects.requireNonNull(callback); + if (isRunning) { + throw new IllegalStateException(); + } + try { + while (true) { + try { + // only timeout when reading the command + // in other reading, we use default timeout + if (keepAliveIntervalMillis > 0) { + setSocketTimeout(keepAliveIntervalMillis); + } + var command = new String(InputStreamUtils.readBytes(is, 3), StandardCharsets.US_ASCII); + if (keepAliveIntervalMillis > 0) { + setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS); + } + if (command.equals("MSG")) { + var length = is.readLong(); + if ((length & 0xffffffff00000000L) != 0) { + throw new RuntimeException(String.format("Client implementation does not support " + + "such long payload (%s Bytes)", Long.toUnsignedString(length))); + } + var message = InputStreamUtils.readBytes(is, (int) length); + callback.accept(ByteBuffer.wrap(message)); + } else if (command.equals("NOP")) { + os.writeBytes("NIL"); + os.flush(); + } else if (command.equals("BYE")) { + break; + } else if (!command.equals("NIL")) { + throw new ServerMisbehaveException("Illegal command from server: " + command); + } + } catch (SocketTimeoutException e) { + // lock is unnecessary, since no other readers are present + super.keepAlive(); + } + } + } finally { + isRunning = false; + disconnect(); + } + } + + @Override + public void keepAlive() { + throw new RuntimeException("Manual keepalive in this class is not allowed."); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java b/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java new file mode 100644 index 0000000..893b8c3 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java @@ -0,0 +1,13 @@ +package com.keuin.psmb4j.error; + +/** + * The server speaks a bad protocol which is not compatible to the client. + */ +public class BadProtocolException extends RuntimeException { + public BadProtocolException() { + } + + public BadProtocolException(String message) { + super(message); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java b/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java new file mode 100644 index 0000000..0ac6f07 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java @@ -0,0 +1,10 @@ +package com.keuin.psmb4j.error; + +/** + * The previous command was rejected by the server for certain reasons. + */ +public class CommandFailureException extends ProtocolFailureException { + public CommandFailureException(String message) { + super(message); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java b/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java new file mode 100644 index 0000000..0fe960b --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java @@ -0,0 +1,10 @@ +package com.keuin.psmb4j.error; + +/** + * Some parameters are illegal according to current version's protocol definition. + */ +public class IllegalParameterException extends ServerMisbehaveException { + public IllegalParameterException(String message) { + super(message); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java b/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java new file mode 100644 index 0000000..55e6e30 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java @@ -0,0 +1,13 @@ +package com.keuin.psmb4j.error; + +/** + * Expected protocol error. + */ +public class ProtocolFailureException extends Exception { + public ProtocolFailureException() { + } + + public ProtocolFailureException(String message) { + super(message); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java b/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java new file mode 100644 index 0000000..8aa1449 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java @@ -0,0 +1,10 @@ +package com.keuin.psmb4j.error; + +/** + * The server made an illegal response, which conflicts to the protocol definition. + */ +public class ServerMisbehaveException extends BadProtocolException { + public ServerMisbehaveException(String message) { + super(message); + } +} diff --git a/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java b/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java new file mode 100644 index 0000000..a6cc935 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java @@ -0,0 +1,7 @@ +package com.keuin.psmb4j.error; + +/** + * The PSMB server does not support our protocol version. + */ +public class UnsupportedProtocolException extends BadProtocolException { +} diff --git a/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java b/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java new file mode 100644 index 0000000..7b3fd9c --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java @@ -0,0 +1,56 @@ +package com.keuin.psmb4j.util; + +import com.keuin.psmb4j.util.error.SocketClosedException; +import com.keuin.psmb4j.util.error.StringLengthExceededException; + +import java.io.IOException; +import java.io.InputStream; + +public class InputStreamUtils { + /** + * Read until '\0', the trailing '\0' is dropped. + */ + public static String readCString(InputStream stream) throws IOException { + var sb = new StringBuilder(); + int c; + while ((c = stream.read()) > 0) { + sb.append((char) c); + } + return sb.toString(); + } + + /** + * Read a C style string, with a length limit. + * If the string is longer than given limit, + * a {@link StringLengthExceededException} will be thrown. + */ + public static String readCString(InputStream stream, long maxLength) throws IOException { + var sb = new StringBuilder(); + int c; + long length = 0; + while ((c = stream.read()) > 0) { + sb.append((char) c); + if (++length > maxLength) { + throw new StringLengthExceededException(maxLength); + } + } + return sb.toString(); + } + + /** + * Read fixed length bytes from stream. + * If not enough, a {@link SocketClosedException} will be thrown. + */ + public static byte[] readBytes(InputStream stream, int length) throws IOException { + var buffer = new byte[length]; + int c; + for (int i = 0; i < length; i++) { + if ((c = stream.read()) >= 0) { + buffer[i] = (byte) c; + } else { + throw new SocketClosedException(length, i); + } + } + return buffer; + } +} diff --git a/src/main/java/com/keuin/psmb4j/util/StringUtils.java b/src/main/java/com/keuin/psmb4j/util/StringUtils.java new file mode 100644 index 0000000..036a560 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/util/StringUtils.java @@ -0,0 +1,14 @@ +package com.keuin.psmb4j.util; + +import java.nio.charset.StandardCharsets; + +public class StringUtils { + /** + * If the string can be encoded into binary using ASCII. + */ + @SuppressWarnings("BooleanMethodIsAlwaysInverted") + public static boolean isPureAscii(String v) { + return StandardCharsets.US_ASCII.newEncoder().canEncode(v); + // or "ISO-8859-1" for ISO Latin 1 + } +} diff --git a/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java b/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java new file mode 100644 index 0000000..e7c9b2d --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java @@ -0,0 +1,15 @@ +package com.keuin.psmb4j.util.error; + +import java.io.IOException; + +/** + * The socket closed before enough bytes have been read out. + */ +public class SocketClosedException extends IOException { + public SocketClosedException() { + } + + public SocketClosedException(long expected, long actual) { + super(String.format("expected %d bytes, EOF after reading %d bytes", expected, actual)); + } +} diff --git a/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java b/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java new file mode 100644 index 0000000..afb08b6 --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java @@ -0,0 +1,9 @@ +package com.keuin.psmb4j.util.error; + +import java.io.IOException; + +public class StringLengthExceededException extends IOException { + public StringLengthExceededException(long length) { + super(String.format("String is too long (%d Bytes)", length)); + } +} |