summaryrefslogtreecommitdiff
path: root/src/main/java/com/keuin/psmb4j
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/keuin/psmb4j')
-rw-r--r--src/main/java/com/keuin/psmb4j/BaseClient.java126
-rw-r--r--src/main/java/com/keuin/psmb4j/PublishClient.java59
-rw-r--r--src/main/java/com/keuin/psmb4j/SubscribeClient.java121
-rw-r--r--src/main/java/com/keuin/psmb4j/error/BadProtocolException.java13
-rw-r--r--src/main/java/com/keuin/psmb4j/error/CommandFailureException.java10
-rw-r--r--src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java10
-rw-r--r--src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java13
-rw-r--r--src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java10
-rw-r--r--src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java7
-rw-r--r--src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java56
-rw-r--r--src/main/java/com/keuin/psmb4j/util/StringUtils.java14
-rw-r--r--src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java15
-rw-r--r--src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java9
13 files changed, 463 insertions, 0 deletions
diff --git a/src/main/java/com/keuin/psmb4j/BaseClient.java b/src/main/java/com/keuin/psmb4j/BaseClient.java
new file mode 100644
index 0000000..3c5cf08
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/BaseClient.java
@@ -0,0 +1,126 @@
+package com.keuin.psmb4j;
+
+import com.keuin.psmb4j.error.IllegalParameterException;
+import com.keuin.psmb4j.error.UnsupportedProtocolException;
+import com.keuin.psmb4j.util.InputStreamUtils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public abstract class BaseClient implements AutoCloseable {
+
+ protected final int protocolVersion = 1;
+ protected final int MAX_CSTRING_LENGTH = 1024;
+ protected final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 0;
+
+ private final String host;
+ private final int port;
+
+ private Socket socket;
+ protected DataInputStream is;
+ protected DataOutputStream os;
+
+ protected final Object socketWriteLock = new Object();
+ protected final Object socketReadLock = new Object();
+
+ public BaseClient(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ /**
+ * Connect to the server.
+ * This method must be called before sending any other messages,
+ * and should be called only once.
+ * If an IO error occurred when doing some operation,
+ * this client must be reconnected before next operations.
+ * @throws IOException if a network error occurred
+ */
+ public void connect() throws IOException {
+ try {
+ if (this.socket != null) {
+ throw new IllegalStateException("already connected");
+ }
+ this.socket = new Socket(host, port);
+ this.socket.setSoTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
+ this.is = new DataInputStream(this.socket.getInputStream());
+ this.os = new DataOutputStream(this.socket.getOutputStream());
+
+ os.writeBytes("PSMB");
+ os.writeInt(protocolVersion);
+ os.writeInt(0); // options
+ os.flush();
+ var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
+ if (response.equals("UNSUPPORTED PROTOCOL")) {
+ throw new UnsupportedProtocolException();
+ } else if (response.equals("OK")) {
+ var serverOptions = is.readInt();
+ if (serverOptions != 0) {
+ throw new IllegalParameterException("Illegal server options: " + serverOptions);
+ }
+ }
+ } catch (IOException ex) {
+ // failed to connect, reset to initial state
+ close();
+ throw ex;
+ }
+ }
+
+ public void keepAlive() throws IOException {
+ final var nop = new byte[]{'N', 'O', 'P'};
+ final var nil = new byte[]{'N', 'I', 'L'};
+ synchronized (socketReadLock) {
+ synchronized (socketWriteLock) {
+ // lock the whole bidirectional communication
+ os.write(nop);
+ os.flush();
+ // wait for a response NIL
+ var response = InputStreamUtils.readBytes(is, 3);
+ if (!Arrays.equals(response, nil)) {
+ throw new RuntimeException("illegal command from server: " +
+ new String(response, StandardCharsets.US_ASCII));
+ }
+ }
+ }
+ }
+
+ public void disconnect() {
+ if (os != null) {
+ try {
+ os.writeBytes("BYE");
+ os.flush();
+ os.close();
+ } catch (IOException ignored) {
+ os = null;
+ }
+ }
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException ignored) {
+ is = null;
+ }
+ }
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (IOException ignored) {
+ socket = null;
+ }
+ }
+ }
+
+ protected void setSocketTimeout(int t) throws SocketException {
+ this.socket.setSoTimeout(t);
+ }
+
+ @Override
+ public void close() {
+ disconnect();
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/PublishClient.java b/src/main/java/com/keuin/psmb4j/PublishClient.java
new file mode 100644
index 0000000..54cf4ff
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/PublishClient.java
@@ -0,0 +1,59 @@
+package com.keuin.psmb4j;
+
+import com.keuin.psmb4j.error.CommandFailureException;
+import com.keuin.psmb4j.error.ServerMisbehaveException;
+import com.keuin.psmb4j.util.InputStreamUtils;
+import com.keuin.psmb4j.util.StringUtils;
+
+import java.io.IOException;
+
+public class PublishClient extends BaseClient {
+
+ /**
+ * Create a client in PUBLISH mode.
+ * @param host server host.
+ * @param port server port.
+ */
+ public PublishClient(String host, int port) {
+ super(host, port);
+ }
+
+ public void setPublish(String topicId) throws IOException, CommandFailureException {
+ if (!StringUtils.isPureAscii(topicId)) {
+ throw new IllegalArgumentException("topicId cannot be encoded with ASCII");
+ }
+ setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
+ synchronized (socketWriteLock) {
+ os.writeBytes("PUB");
+ os.writeBytes(topicId);
+ os.writeByte('\0');
+ os.flush();
+ }
+
+ synchronized (socketReadLock) {
+ var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
+ if (response.equals("FAILED")) {
+ var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
+ throw new CommandFailureException("Publish failed: " + errorMessage);
+ } else if (!response.equals("OK")) {
+ throw new ServerMisbehaveException("Unexpected response: " + response);
+ }
+ }
+ }
+
+ /**
+ * Publish a message.
+ * Note that this method is not thread-safe.
+ * @param message the message to publish.
+ * @throws CommandFailureException If a command was rejected by the server.
+ * @throws IOException if an IO error occurred.
+ */
+ public void publish(byte[] message) throws CommandFailureException, IOException {
+ synchronized (this.socketWriteLock) {
+ os.writeBytes("MSG");
+ os.writeLong(message.length);
+ os.write(message);
+ os.flush();
+ }
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/SubscribeClient.java b/src/main/java/com/keuin/psmb4j/SubscribeClient.java
new file mode 100644
index 0000000..2c0083d
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/SubscribeClient.java
@@ -0,0 +1,121 @@
+package com.keuin.psmb4j;
+
+import com.keuin.psmb4j.error.CommandFailureException;
+import com.keuin.psmb4j.error.ServerMisbehaveException;
+import com.keuin.psmb4j.util.InputStreamUtils;
+import com.keuin.psmb4j.util.StringUtils;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+public class SubscribeClient extends BaseClient {
+
+ private final String pattern;
+ private final long subscriberId;
+ private final int keepAliveIntervalMillis;
+
+ private volatile boolean isRunning = false;
+
+ /**
+ * Create a client in SUBSCRIBE mode.
+ * @param host the host to connect to.
+ * @param port the port to connect to.
+ * @param pattern the pattern to subscribe
+ * @param keepAliveIntervalMillis interval between sending keep-alive messages. If <=0, keep-alive is disabled.
+ * @param subscriberId an integer identifying a subscriber.
+ */
+ public SubscribeClient(String host, int port, String pattern, int keepAliveIntervalMillis, long subscriberId) {
+ super(host, port);
+ this.pattern = pattern;
+ this.subscriberId = subscriberId;
+ this.keepAliveIntervalMillis = keepAliveIntervalMillis;
+ if (keepAliveIntervalMillis < 3000) {
+ throw new IllegalArgumentException("Keep alive interval is too small!");
+ }
+ }
+
+ public void setSubscribe(String pattern, long subscriberId) throws IOException, CommandFailureException {
+ if (!StringUtils.isPureAscii(pattern)) {
+ throw new IllegalArgumentException("pattern cannot be encoded in ASCII");
+ }
+ os.writeBytes("SUB");
+ os.writeInt(1); // options
+ os.writeBytes(pattern);
+ os.writeByte('\0');
+ os.writeLong(subscriberId);
+ os.flush();
+
+ var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
+ if (response.equals("FAILED")) {
+ var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
+ throw new CommandFailureException("Subscribe failed: " + errorMessage);
+ } else if (!response.equals("OK")) {
+ throw new ServerMisbehaveException("Unexpected response: " + response);
+ }
+ }
+
+ /**
+ * Start subscribing.
+ * This method is blocking, the callback will be called in the same thread.
+ * This method cannot run simultaneously by more than one thread,
+ * or an {@link IllegalStateException} will be thrown.
+ * @param callback the callback which accepts message from server.
+ * @throws CommandFailureException If a command was rejected by the server.
+ * @throws IOException if an IO error occurred. In this case,
+ * it is usually unsafe to retry this function, since the internal socket is probably broken.
+ * You should use another new instance in order to reconnect.
+ */
+ public void subscribe(@NotNull Consumer<ByteBuffer> callback) throws CommandFailureException, IOException {
+ Objects.requireNonNull(callback);
+ if (isRunning) {
+ throw new IllegalStateException();
+ }
+ try {
+ while (true) {
+ try {
+ // only timeout when reading the command
+ // in other reading, we use default timeout
+ if (keepAliveIntervalMillis > 0) {
+ setSocketTimeout(keepAliveIntervalMillis);
+ }
+ var command = new String(InputStreamUtils.readBytes(is, 3), StandardCharsets.US_ASCII);
+ if (keepAliveIntervalMillis > 0) {
+ setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
+ }
+ if (command.equals("MSG")) {
+ var length = is.readLong();
+ if ((length & 0xffffffff00000000L) != 0) {
+ throw new RuntimeException(String.format("Client implementation does not support " +
+ "such long payload (%s Bytes)", Long.toUnsignedString(length)));
+ }
+ var message = InputStreamUtils.readBytes(is, (int) length);
+ callback.accept(ByteBuffer.wrap(message));
+ } else if (command.equals("NOP")) {
+ os.writeBytes("NIL");
+ os.flush();
+ } else if (command.equals("BYE")) {
+ break;
+ } else if (!command.equals("NIL")) {
+ throw new ServerMisbehaveException("Illegal command from server: " + command);
+ }
+ } catch (SocketTimeoutException e) {
+ // lock is unnecessary, since no other readers are present
+ super.keepAlive();
+ }
+ }
+ } finally {
+ isRunning = false;
+ disconnect();
+ }
+ }
+
+ @Override
+ public void keepAlive() {
+ throw new RuntimeException("Manual keepalive in this class is not allowed.");
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java b/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java
new file mode 100644
index 0000000..893b8c3
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/BadProtocolException.java
@@ -0,0 +1,13 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * The server speaks a bad protocol which is not compatible to the client.
+ */
+public class BadProtocolException extends RuntimeException {
+ public BadProtocolException() {
+ }
+
+ public BadProtocolException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java b/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java
new file mode 100644
index 0000000..0ac6f07
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/CommandFailureException.java
@@ -0,0 +1,10 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * The previous command was rejected by the server for certain reasons.
+ */
+public class CommandFailureException extends ProtocolFailureException {
+ public CommandFailureException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java b/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java
new file mode 100644
index 0000000..0fe960b
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/IllegalParameterException.java
@@ -0,0 +1,10 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * Some parameters are illegal according to current version's protocol definition.
+ */
+public class IllegalParameterException extends ServerMisbehaveException {
+ public IllegalParameterException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java b/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java
new file mode 100644
index 0000000..55e6e30
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/ProtocolFailureException.java
@@ -0,0 +1,13 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * Expected protocol error.
+ */
+public class ProtocolFailureException extends Exception {
+ public ProtocolFailureException() {
+ }
+
+ public ProtocolFailureException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java b/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java
new file mode 100644
index 0000000..8aa1449
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/ServerMisbehaveException.java
@@ -0,0 +1,10 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * The server made an illegal response, which conflicts to the protocol definition.
+ */
+public class ServerMisbehaveException extends BadProtocolException {
+ public ServerMisbehaveException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java b/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java
new file mode 100644
index 0000000..a6cc935
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/error/UnsupportedProtocolException.java
@@ -0,0 +1,7 @@
+package com.keuin.psmb4j.error;
+
+/**
+ * The PSMB server does not support our protocol version.
+ */
+public class UnsupportedProtocolException extends BadProtocolException {
+}
diff --git a/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java b/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java
new file mode 100644
index 0000000..7b3fd9c
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/util/InputStreamUtils.java
@@ -0,0 +1,56 @@
+package com.keuin.psmb4j.util;
+
+import com.keuin.psmb4j.util.error.SocketClosedException;
+import com.keuin.psmb4j.util.error.StringLengthExceededException;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class InputStreamUtils {
+ /**
+ * Read until '\0', the trailing '\0' is dropped.
+ */
+ public static String readCString(InputStream stream) throws IOException {
+ var sb = new StringBuilder();
+ int c;
+ while ((c = stream.read()) > 0) {
+ sb.append((char) c);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Read a C style string, with a length limit.
+ * If the string is longer than given limit,
+ * a {@link StringLengthExceededException} will be thrown.
+ */
+ public static String readCString(InputStream stream, long maxLength) throws IOException {
+ var sb = new StringBuilder();
+ int c;
+ long length = 0;
+ while ((c = stream.read()) > 0) {
+ sb.append((char) c);
+ if (++length > maxLength) {
+ throw new StringLengthExceededException(maxLength);
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Read fixed length bytes from stream.
+ * If not enough, a {@link SocketClosedException} will be thrown.
+ */
+ public static byte[] readBytes(InputStream stream, int length) throws IOException {
+ var buffer = new byte[length];
+ int c;
+ for (int i = 0; i < length; i++) {
+ if ((c = stream.read()) >= 0) {
+ buffer[i] = (byte) c;
+ } else {
+ throw new SocketClosedException(length, i);
+ }
+ }
+ return buffer;
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/util/StringUtils.java b/src/main/java/com/keuin/psmb4j/util/StringUtils.java
new file mode 100644
index 0000000..036a560
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/util/StringUtils.java
@@ -0,0 +1,14 @@
+package com.keuin.psmb4j.util;
+
+import java.nio.charset.StandardCharsets;
+
+public class StringUtils {
+ /**
+ * If the string can be encoded into binary using ASCII.
+ */
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+ public static boolean isPureAscii(String v) {
+ return StandardCharsets.US_ASCII.newEncoder().canEncode(v);
+ // or "ISO-8859-1" for ISO Latin 1
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java b/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java
new file mode 100644
index 0000000..e7c9b2d
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/util/error/SocketClosedException.java
@@ -0,0 +1,15 @@
+package com.keuin.psmb4j.util.error;
+
+import java.io.IOException;
+
+/**
+ * The socket closed before enough bytes have been read out.
+ */
+public class SocketClosedException extends IOException {
+ public SocketClosedException() {
+ }
+
+ public SocketClosedException(long expected, long actual) {
+ super(String.format("expected %d bytes, EOF after reading %d bytes", expected, actual));
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java b/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java
new file mode 100644
index 0000000..afb08b6
--- /dev/null
+++ b/src/main/java/com/keuin/psmb4j/util/error/StringLengthExceededException.java
@@ -0,0 +1,9 @@
+package com.keuin.psmb4j.util.error;
+
+import java.io.IOException;
+
+public class StringLengthExceededException extends IOException {
+ public StringLengthExceededException(long length) {
+ super(String.format("String is too long (%d Bytes)", length));
+ }
+}