summaryrefslogtreecommitdiff
path: root/src/main/java/com/keuin/psmb4j/PublishClient.java
blob: 54cf4ff1cc24ccbfc3a4d4e714571fcf2abec20d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.keuin.psmb4j;

import com.keuin.psmb4j.error.CommandFailureException;
import com.keuin.psmb4j.error.ServerMisbehaveException;
import com.keuin.psmb4j.util.InputStreamUtils;
import com.keuin.psmb4j.util.StringUtils;

import java.io.IOException;

/**
 * Client used to publish messages to a topic.
 * <p>
 * Usage, as far as this class shows: construct the client, select a topic with
 * {@link #setPublish(String)}, then send payloads with {@link #publish(byte[])}.
 * Connection handling, the {@code os}/{@code is} streams and the socket locks are
 * inherited from {@link BaseClient}.
 */
public class PublishClient extends BaseClient {

    /**
     * Create a client in PUBLISH mode.
     * @param host server host.
     * @param port server port.
     */
    public PublishClient(String host, int port) {
        super(host, port);
    }

    /**
     * Send the {@code PUB} command for the given topic and wait for the server's reply.
     * <p>
     * The command is written as {@code "PUB"}, followed by the topic id and a
     * terminating NUL byte. The reply is read as a NUL-terminated string: {@code "OK"}
     * means success, {@code "FAILED"} is followed by a second string carrying the
     * error message.
     * <p>
     * All argument validation happens before anything is written, so an invalid
     * topic id never leaves a partial command on the connection.
     *
     * @param topicId the topic to publish to. Must be pure ASCII and must not contain
     *                a NUL character, since NUL terminates the topic id on the wire.
     * @throws IllegalArgumentException if {@code topicId} is not pure ASCII or contains NUL.
     * @throws CommandFailureException if the server replied {@code "FAILED"}.
     * @throws ServerMisbehaveException if the server replied with anything other than
     *                                  {@code "OK"} or {@code "FAILED"}.
     * @throws IOException if an IO error occurred.
     */
    public void setPublish(String topicId) throws IOException, CommandFailureException {
        if (!StringUtils.isPureAscii(topicId)) {
            throw new IllegalArgumentException("topicId cannot be encoded with ASCII");
        }
        // NUL is a valid ASCII character, but it is also the terminator of the topic id
        // on the wire: an embedded NUL would end the id early and the remaining bytes
        // would be left in the command stream.
        if (topicId.indexOf('\0') >= 0) {
            throw new IllegalArgumentException("topicId cannot contain NUL character");
        }
        setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
        synchronized (socketWriteLock) {
            os.writeBytes("PUB");
            os.writeBytes(topicId);
            os.writeByte('\0');
            os.flush();
        }

        synchronized (socketReadLock) {
            var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
            if (response.equals("FAILED")) {
                // A failure reply is followed by a second C string describing the error.
                var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
                throw new CommandFailureException("Publish failed: " + errorMessage);
            } else if (!response.equals("OK")) {
                throw new ServerMisbehaveException("Unexpected response: " + response);
            }
        }
    }

    /**
     * Publish a message.
     * <p>
     * The frame is written as {@code "MSG"}, the payload length (via {@code writeLong}),
     * and then the payload bytes, followed by a flush. No reply is read by this method.
     * <p>
     * The whole frame is written while holding {@code socketWriteLock}.
     * NOTE(review): earlier documentation described this method as not thread-safe;
     * confirm against {@link BaseClient} before relying on concurrent use.
     *
     * @param message the message to publish. Must not be {@code null}.
     * @throws NullPointerException if {@code message} is {@code null}; nothing is written
     *                              to the connection in that case.
     * @throws CommandFailureException declared for API compatibility; this method does not
     *                                 currently throw it itself.
     * @throws IOException if an IO error occurred.
     */
    public void publish(byte[] message) throws CommandFailureException, IOException {
        // Validate before touching the stream: failing after "MSG" has been written
        // would leave a dangling command header on the connection.
        if (message == null) {
            throw new NullPointerException("message must not be null");
        }
        synchronized (this.socketWriteLock) {
            os.writeBytes("MSG");
            os.writeLong(message.length);
            os.write(message);
            os.flush();
        }
    }
}