summaryrefslogtreecommitdiff
path: root/protocol/client_mode.go
diff options
context:
space:
mode:
authorKeuin <[email protected]>2024-03-08 13:39:53 +0800
committerKeuin <[email protected]>2024-03-08 13:40:58 +0800
commit2da3d40c1a08d27f1cfa5e62657f9940c94cc758 (patch)
tree14aec57c64271db5bba33b8bc85f399e499edda8 /protocol/client_mode.go
Initial commitv0.1.0
Diffstat (limited to 'protocol/client_mode.go')
-rw-r--r--protocol/client_mode.go103
1 files changed, 103 insertions, 0 deletions
diff --git a/protocol/client_mode.go b/protocol/client_mode.go
new file mode 100644
index 0000000..b9ca057
--- /dev/null
+++ b/protocol/client_mode.go
@@ -0,0 +1,103 @@
+package protocol
+
+import (
+ "bufio"
+ "errors"
+ "fmt"
+ "io"
+)
+
+type ModeType uint16
+
+const (
+ ModePublish ModeType = 1
+ ModeSubscribe ModeType = 2
+)
+
+type ClientMode interface {
+ Type() ModeType
+ // Consume a message from remote peer. This method should panic if not supported.
+ Consume(reader io.Reader, length int64)
+ sendSelectionRequest(c *Client) error
+}
+
+func Publish(topicID string) ClientMode {
+ return modePublish{
+ topicID: topicID,
+ }
+}
+
+type modePublish struct {
+ topicID string
+}
+
+func (m modePublish) Consume(io.Reader, int64) {
+ // it's the user's responsibility to filter out unsuitable messages
+ panic(errors.New("remote peer should not send data message to publisher"))
+}
+
+func (m modePublish) Type() ModeType {
+ return ModePublish
+}
+
+func (m modePublish) sendSelectionRequest(c *Client) error {
+ if m.topicID == "" {
+ panic(fmt.Errorf("empty subscription topic ID"))
+ }
+ return c.writeFlush("PUB" + m.topicID + "\x00")
+}
+
+func Subscribe(topicIDPattern string, messageConsumer func(reader io.Reader, length int64)) ClientMode {
+ return modeSubscribe{
+ topicIDPattern: topicIDPattern,
+ messageConsumer: messageConsumer,
+ }
+}
+
+type modeSubscribe struct {
+ topicIDPattern string
+ messageConsumer func(reader io.Reader, length int64)
+}
+
+func (m modeSubscribe) Consume(reader io.Reader, length int64) {
+ m.messageConsumer(reader, length)
+}
+
+func (m modeSubscribe) Type() ModeType {
+ return ModeSubscribe
+}
+
+func (m modeSubscribe) sendSelectionRequest(c *Client) error {
+ return c.writeFlush("SUB\x00\x00\x00\x00" + m.topicIDPattern + "\x00")
+}
+
+func readModeSelectionResponse(r *bufio.Reader) error {
+ msg, err := r.ReadString('\x00')
+ if err != nil {
+ return err
+ }
+ msg = msg[:len(msg)-1]
+ if msg == "OK" {
+ return nil
+ }
+ // failed or other protocol errors
+ if msg == "FAILED" {
+ msg2, err := r.ReadString('\x00')
+ if err != nil {
+ return err
+ }
+ msg += ": " + msg2[:len(msg2)-1]
+ }
+ return protocolError{
+ phase: PhaseModeSelection,
+ message: msg,
+ }
+}
+
+func (c *Client) SelectMode(mode ClientMode) error {
+ err := mode.sendSelectionRequest(c)
+ if err != nil {
+ return err
+ }
+ return readModeSelectionResponse(c.rx)
+}