diff options
author | Keuin <[email protected]> | 2024-03-08 13:39:53 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2024-03-08 13:42:11 +0800 |
commit | 6ee1bbbd1c491f1a6972fd62cf8ab652d4e8a942 (patch) | |
tree | 14aec57c64271db5bba33b8bc85f399e499edda8 /protocol/client_mode.go |
Diffstat (limited to 'protocol/client_mode.go')
-rw-r--r-- | protocol/client_mode.go | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/protocol/client_mode.go b/protocol/client_mode.go new file mode 100644 index 0000000..b9ca057 --- /dev/null +++ b/protocol/client_mode.go @@ -0,0 +1,103 @@ +package protocol + +import ( + "bufio" + "errors" + "fmt" + "io" +) + +type ModeType uint16 + +const ( + ModePublish ModeType = 1 + ModeSubscribe ModeType = 2 +) + +type ClientMode interface { + Type() ModeType + // Consume a message from remote peer. This method should panic if not supported. + Consume(reader io.Reader, length int64) + sendSelectionRequest(c *Client) error +} + +func Publish(topicID string) ClientMode { + return modePublish{ + topicID: topicID, + } +} + +type modePublish struct { + topicID string +} + +func (m modePublish) Consume(io.Reader, int64) { + // it's the user's responsibility to filter out unsuitable messages + panic(errors.New("remote peer should not send data message to publisher")) +} + +func (m modePublish) Type() ModeType { + return ModePublish +} + +func (m modePublish) sendSelectionRequest(c *Client) error { + if m.topicID == "" { + panic(fmt.Errorf("empty subscription topic ID")) + } + return c.writeFlush("PUB" + m.topicID + "\x00") +} + +func Subscribe(topicIDPattern string, messageConsumer func(reader io.Reader, length int64)) ClientMode { + return modeSubscribe{ + topicIDPattern: topicIDPattern, + messageConsumer: messageConsumer, + } +} + +type modeSubscribe struct { + topicIDPattern string + messageConsumer func(reader io.Reader, length int64) +} + +func (m modeSubscribe) Consume(reader io.Reader, length int64) { + m.messageConsumer(reader, length) +} + +func (m modeSubscribe) Type() ModeType { + return ModeSubscribe +} + +func (m modeSubscribe) sendSelectionRequest(c *Client) error { + return c.writeFlush("SUB\x00\x00\x00\x00" + m.topicIDPattern + "\x00") +} + +func readModeSelectionResponse(r *bufio.Reader) error { + msg, err := r.ReadString('\x00') + if err != nil { + return err + } + msg = msg[:len(msg)-1] + if msg == "OK" { + return nil + } + // failed or other protocol errors + if msg == "FAILED" { + msg2, err := r.ReadString('\x00') + if err != nil { + return err + } + msg += ": " + msg2[:len(msg2)-1] + } + return protocolError{ + phase: PhaseModeSelection, + message: msg, + } +} + +func (c *Client) SelectMode(mode ClientMode) error { + err := mode.sendSelectionRequest(c) + if err != nil { + return err + } + return readModeSelectionResponse(c.rx) +} |