diff options
author | Keuin <[email protected]> | 2022-02-04 22:54:22 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2022-02-04 22:54:22 +0800 |
commit | dfa163e5a3cffa46f0241210f2c8be1f8e298d7a (patch) | |
tree | 72db589d79bcc3bbca45c3a34a6dac71b2f55526 /src/main/java/com/keuin/psmb4j/PublishClient.java | |
parent | 12ddbba66e6f2585e59d05d1782c0e8ce9fe6146 (diff) |
Add psmb support.
Diffstat (limited to 'src/main/java/com/keuin/psmb4j/PublishClient.java')
-rw-r--r-- | src/main/java/com/keuin/psmb4j/PublishClient.java | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/src/main/java/com/keuin/psmb4j/PublishClient.java b/src/main/java/com/keuin/psmb4j/PublishClient.java new file mode 100644 index 0000000..54cf4ff --- /dev/null +++ b/src/main/java/com/keuin/psmb4j/PublishClient.java @@ -0,0 +1,59 @@ +package com.keuin.psmb4j; + +import com.keuin.psmb4j.error.CommandFailureException; +import com.keuin.psmb4j.error.ServerMisbehaveException; +import com.keuin.psmb4j.util.InputStreamUtils; +import com.keuin.psmb4j.util.StringUtils; + +import java.io.IOException; + +public class PublishClient extends BaseClient { + + /** + * Create a client in PUBLISH mode. + * @param host server host. + * @param port server port. + */ + public PublishClient(String host, int port) { + super(host, port); + } + + public void setPublish(String topicId) throws IOException, CommandFailureException { + if (!StringUtils.isPureAscii(topicId)) { + throw new IllegalArgumentException("topicId cannot be encoded with ASCII"); + } + setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS); + synchronized (socketWriteLock) { + os.writeBytes("PUB"); + os.writeBytes(topicId); + os.writeByte('\0'); + os.flush(); + } + + synchronized (socketReadLock) { + var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + if (response.equals("FAILED")) { + var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH); + throw new CommandFailureException("Publish failed: " + errorMessage); + } else if (!response.equals("OK")) { + throw new ServerMisbehaveException("Unexpected response: " + response); + } + } + } + + /** + * Publish a message. + * Note that this method is not thread-safe. + * @param message the message to publish. + * @throws CommandFailureException If a command was rejected by the server. + * @throws IOException if an IO error occurred. + */ + public void publish(byte[] message) throws CommandFailureException, IOException { + synchronized (this.socketWriteLock) { + os.writeBytes("MSG"); + os.writeLong(message.length); + os.write(message); + os.flush(); + } + } +} |