summaryrefslogtreecommitdiff
path: root/src/main/java/com/keuin/psmb4j/SubscribeClient.java
blob: 2c0083dddd2e6e72e6659d5a328842a50347d1b0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package com.keuin.psmb4j;

import com.keuin.psmb4j.error.CommandFailureException;
import com.keuin.psmb4j.error.ServerMisbehaveException;
import com.keuin.psmb4j.util.InputStreamUtils;
import com.keuin.psmb4j.util.StringUtils;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.function.Consumer;

/**
 * Client that subscribes to a pattern and dispatches incoming messages to a callback.
 *
 * <p>Typical use, as far as this class shows: issue {@link #setSubscribe(String, long)} and then
 * call {@link #subscribe(Consumer)}, which blocks and reads server commands until the server
 * says {@code BYE} or an error occurs.
 *
 * <p>NOTE(review): connection setup, the {@code is}/{@code os} streams, {@code setSocketTimeout},
 * {@code disconnect} and the timeout/length constants come from {@link BaseClient}, which is not
 * visible here — confirm their exact semantics there.
 */
public class SubscribeClient extends BaseClient {

    /** Smallest positive keep-alive interval accepted by the constructor, in milliseconds. */
    private static final int MIN_KEEP_ALIVE_INTERVAL_MILLIS = 3000;

    // Stored at construction. Neither field is read by any method in this class;
    // setSubscribe takes its own pattern/subscriberId arguments.
    private final String pattern;
    private final long subscriberId;

    /** Keep-alive interval in milliseconds; a value {@code <= 0} means keep-alive is disabled. */
    private final int keepAliveIntervalMillis;

    /** True while a thread is inside {@link #subscribe(Consumer)}. */
    private volatile boolean isRunning = false;

    /**
     * Create a client in SUBSCRIBE mode.
     *
     * @param host the host to connect to.
     * @param port the port to connect to.
     * @param pattern the pattern to subscribe
     * @param keepAliveIntervalMillis interval between sending keep-alive messages.
     *                                If {@code <= 0}, keep-alive is disabled; otherwise it must be
     *                                at least 3000.
     * @param subscriberId an integer identifying a subscriber.
     * @throws IllegalArgumentException if {@code keepAliveIntervalMillis} is positive but
     *                                  smaller than 3000.
     */
    public SubscribeClient(String host, int port, String pattern, int keepAliveIntervalMillis, long subscriberId) {
        super(host, port);
        // Non-positive values are the documented way to disable keep-alive, so only
        // positive-but-too-small intervals are rejected.
        if (keepAliveIntervalMillis > 0 && keepAliveIntervalMillis < MIN_KEEP_ALIVE_INTERVAL_MILLIS) {
            throw new IllegalArgumentException("Keep alive interval is too small!");
        }
        this.pattern = pattern;
        this.subscriberId = subscriberId;
        this.keepAliveIntervalMillis = keepAliveIntervalMillis;
    }

    /**
     * Send a {@code SUB} command for the given pattern and wait for the server's verdict.
     *
     * <p>Wire format written: the ASCII bytes {@code SUB}, an int options field (always 1),
     * the pattern as a NUL-terminated string, then the subscriber id as a long.
     *
     * @param pattern the pattern to subscribe; must be pure ASCII.
     * @param subscriberId an integer identifying a subscriber.
     * @throws IllegalArgumentException if {@code pattern} is not pure ASCII.
     * @throws CommandFailureException if the server answers {@code FAILED}; the server-supplied
     *                                 error text is included in the message.
     * @throws ServerMisbehaveException if the server answers anything other than
     *                                  {@code OK} or {@code FAILED}.
     * @throws IOException if an IO error occurred.
     */
    public void setSubscribe(String pattern, long subscriberId) throws IOException, CommandFailureException {
        if (!StringUtils.isPureAscii(pattern)) {
            throw new IllegalArgumentException("pattern cannot be encoded in ASCII");
        }
        os.writeBytes("SUB");
        os.writeInt(1); // options
        os.writeBytes(pattern);
        os.writeByte('\0');
        os.writeLong(subscriberId);
        os.flush();

        var response = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
        if (response.equals("FAILED")) {
            var errorMessage = InputStreamUtils.readCString(is, MAX_CSTRING_LENGTH);
            throw new CommandFailureException("Subscribe failed: " + errorMessage);
        } else if (!response.equals("OK")) {
            throw new ServerMisbehaveException("Unexpected response: " + response);
        }
    }

    /**
     * Start subscribing.
     * This method is blocking, the callback will be called in the same thread.
     * This method cannot run simultaneously by more than one thread,
     * or an {@link IllegalStateException} will be thrown.
     * The client is disconnected when this method returns or throws (except for the
     * {@link IllegalStateException} / {@link NullPointerException} precondition failures).
     *
     * @param callback the callback which accepts message from server.
     * @throws CommandFailureException If a command was rejected by the server.
     * @throws IOException if an IO error occurred (including a read timeout while receiving
     * a message payload). In this case,
     * it is usually unsafe to retry this function, since the internal socket is probably broken.
     * You should use another new instance in order to reconnect.
     * @throws IllegalStateException if another thread is already running this method.
     */
    public void subscribe(@NotNull Consumer<ByteBuffer> callback) throws CommandFailureException, IOException {
        Objects.requireNonNull(callback);
        // Check-and-set must be atomic, otherwise two threads could both pass the check.
        synchronized (this) {
            if (isRunning) {
                throw new IllegalStateException("subscribe() is already running");
            }
            isRunning = true;
        }
        try {
            while (true) {
                var command = readCommand();
                if (command == null) {
                    // Timed out waiting for a command; keep waiting.
                    continue;
                }
                if (command.equals("MSG")) {
                    var length = checkedPayloadLength(is.readLong());
                    var message = InputStreamUtils.readBytes(is, length);
                    callback.accept(ByteBuffer.wrap(message));
                } else if (command.equals("NOP")) {
                    // Server-side keep-alive probe: answer it.
                    os.writeBytes("NIL");
                    os.flush();
                } else if (command.equals("BYE")) {
                    break;
                } else if (!command.equals("NIL")) {
                    // NIL is accepted silently; anything else is a protocol violation.
                    throw new ServerMisbehaveException("Illegal command from server: " + command);
                }
            }
        } finally {
            isRunning = false;
            disconnect();
        }
    }

    /**
     * Read the next 3-byte command from the server.
     *
     * <p>Only this read uses the keep-alive interval as its socket timeout; the timeout is
     * switched back to the default before returning so that subsequent reads are not affected.
     * A timeout here is not an error: it triggers a keep-alive (when enabled) and is reported
     * to the caller as {@code null}. Timeouts anywhere else are deliberately not handled here,
     * because resuming after a partially read payload would desynchronize the protocol.
     *
     * @return the command as an ASCII string, or {@code null} if the read timed out.
     */
    private String readCommand() throws CommandFailureException, IOException {
        final boolean keepAliveEnabled = keepAliveIntervalMillis > 0;
        try {
            if (keepAliveEnabled) {
                setSocketTimeout(keepAliveIntervalMillis);
            }
            var command = new String(InputStreamUtils.readBytes(is, 3), StandardCharsets.US_ASCII);
            if (keepAliveEnabled) {
                setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
            }
            return command;
        } catch (SocketTimeoutException e) {
            if (keepAliveEnabled) {
                // lock is unnecessary, since no other readers are present
                super.keepAlive();
            }
            // NOTE(review): if the timeout fired after part of the command was consumed,
            // those bytes are lost — depends on InputStreamUtils.readBytes; confirm.
            return null;
        }
    }

    /**
     * Validate a payload length received from the wire and narrow it to {@code int}.
     *
     * @param length the length field as read from the stream (treated as unsigned in the message).
     * @return the same length as a non-negative {@code int}.
     * @throws RuntimeException if the length does not fit in a non-negative {@code int}.
     */
    private static int checkedPayloadLength(long length) {
        // A plain "upper 32 bits are zero" test would let 2^31..2^32-1 through,
        // which turns negative when cast to int.
        if (length < 0 || length > Integer.MAX_VALUE) {
            throw new RuntimeException(String.format("Client implementation does not support " +
                    "such long payload (%s Bytes)", Long.toUnsignedString(length)));
        }
        return (int) length;
    }

    /**
     * Always fails: keep-alive is driven internally by {@link #subscribe(Consumer)}.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public void keepAlive() {
        throw new UnsupportedOperationException("Manual keepalive in this class is not allowed.");
    }
}