blob: 54cf4ff1cc24ccbfc3a4d4e714571fcf2abec20d (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
package com.keuin.psmb4j;
import com.keuin.psmb4j.error.CommandFailureException;
import com.keuin.psmb4j.error.ServerMisbehaveException;
import com.keuin.psmb4j.util.InputStreamUtils;
import com.keuin.psmb4j.util.StringUtils;
import java.io.IOException;
public class PublishClient extends BaseClient {
/**
* Create a client in PUBLISH mode.
* @param host server host.
* @param port server port.
*/
public PublishClient(String host, int port) {
super(host, port);
}
public void setPublish(String topicId) throws IOException, CommandFailureException {
if (!StringUtils.isPureAscii(topicId)) {
throw new IllegalArgumentException("topicId cannot be encoded with ASCII");
}
setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
synchronized (socketWriteLock) {
os.writeBytes("PUB");
os.writeBytes(topicId);
os.writeByte('\0');
os.flush();
}
synchronized (socketReadLock) {
var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
if (response.equals("FAILED")) {
var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
throw new CommandFailureException("Publish failed: " + errorMessage);
} else if (!response.equals("OK")) {
throw new ServerMisbehaveException("Unexpected response: " + response);
}
}
}
/**
* Publish a message.
* Note that this method is not thread-safe.
* @param message the message to publish.
* @throws CommandFailureException If a command was rejected by the server.
* @throws IOException if an IO error occurred.
*/
public void publish(byte[] message) throws CommandFailureException, IOException {
synchronized (this.socketWriteLock) {
os.writeBytes("MSG");
os.writeLong(message.length);
os.write(message);
os.flush();
}
}
}
|