summaryrefslogtreecommitdiff
path: root/protocol/client_mode.go
blob: b9ca05753cd2c1ed5e584a6df9cdc7893fa8e9c7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package protocol

import (
	"bufio"
	"errors"
	"fmt"
	"io"
)

// ModeType is the numeric identifier of a client mode, as reported by
// ClientMode.Type.
type ModeType uint16

// Mode identifiers returned by the ClientMode implementations in this file.
const (
	ModePublish   ModeType = 1 // reported by the mode created with Publish
	ModeSubscribe ModeType = 2 // reported by the mode created with Subscribe
)

// ClientMode is a mode that a Client can be switched into with
// Client.SelectMode. Publish and Subscribe construct the implementations
// defined in this file.
type ClientMode interface {
	// Type reports which mode this value represents.
	Type() ModeType
	// Consume handles a message received from the remote peer. The message is
	// handed over as a reader plus a length (presumably the payload size in
	// bytes; confirm against the caller). Implementations that do not accept
	// incoming messages should panic.
	Consume(reader io.Reader, length int64)
	// sendSelectionRequest writes this mode's selection request to the peer
	// through c and returns any error from doing so.
	sendSelectionRequest(c *Client) error
}

// Publish returns the ClientMode that selects publish mode for topicID.
// The topic ID is stored unchanged; nothing is sent until the mode is passed
// to Client.SelectMode.
func Publish(topicID string) ClientMode {
	mode := modePublish{topicID: topicID}
	return mode
}

// modePublish is the ClientMode returned by Publish. topicID is the topic
// named in the "PUB" selection request.
type modePublish struct {
	topicID string
}

// Consume always panics: a publisher does not expect data messages from the
// remote peer. Callers are responsible for filtering out unsuitable messages
// before they reach this method.
func (modePublish) Consume(_ io.Reader, _ int64) {
	unexpected := errors.New("remote peer should not send data message to publisher")
	panic(unexpected)
}

// Type reports ModePublish.
func (modePublish) Type() ModeType {
	const kind = ModePublish
	return kind
}

// sendSelectionRequest asks the remote peer to switch this connection into
// publish mode by writing "PUB", the topic ID and a terminating NUL byte
// through c.writeFlush. It returns whatever error that write reports.
//
// An empty topic ID is rejected with an error before anything is written.
// The failure is reported through the error return, and therefore through
// Client.SelectMode, rather than by panicking in the caller's goroutine.
func (m modePublish) sendSelectionRequest(c *Client) error {
	if m.topicID == "" {
		return fmt.Errorf("empty publish topic ID")
	}
	return c.writeFlush("PUB" + m.topicID + "\x00")
}

// Subscribe returns the ClientMode that selects subscribe mode for
// topicIDPattern. messageConsumer is the callback that the returned mode's
// Consume method forwards each incoming message to.
func Subscribe(topicIDPattern string, messageConsumer func(reader io.Reader, length int64)) ClientMode {
	var mode modeSubscribe
	mode.topicIDPattern = topicIDPattern
	mode.messageConsumer = messageConsumer
	return mode
}

// modeSubscribe is the ClientMode returned by Subscribe. topicIDPattern is
// sent in the "SUB" selection request, and messageConsumer is the callback
// that Consume forwards incoming messages to.
type modeSubscribe struct {
	topicIDPattern  string
	messageConsumer func(reader io.Reader, length int64)
}

// Consume forwards the incoming message, unchanged, to the callback that was
// supplied to Subscribe.
func (m modeSubscribe) Consume(reader io.Reader, length int64) {
	deliver := m.messageConsumer
	deliver(reader, length)
}

// Type reports ModeSubscribe.
func (modeSubscribe) Type() ModeType {
	const kind = ModeSubscribe
	return kind
}

// sendSelectionRequest asks the remote peer to switch this connection into
// subscribe mode. The request written through c.writeFlush is "SUB", four NUL
// bytes (their meaning is not visible in this file), the topic ID pattern and
// a terminating NUL byte. Any error from the write is returned as-is.
func (m modeSubscribe) sendSelectionRequest(c *Client) error {
	const header = "SUB\x00\x00\x00\x00"
	request := header + m.topicIDPattern + "\x00"
	return c.writeFlush(request)
}

// readModeSelectionResponse reads the peer's reply to a mode selection
// request from r and turns it into an error value.
//
// The reply is a NUL-terminated status string:
//   - "OK" means the mode was accepted and nil is returned.
//   - "FAILED" is followed by a second NUL-terminated string; the two are
//     joined as "FAILED: <second string>" and returned in a protocolError.
//   - any other status is returned verbatim in a protocolError.
//
// Errors from the reader, including the stream ending before a terminating
// NUL byte, are returned unchanged.
func readModeSelectionResponse(r *bufio.Reader) error {
	status, err := r.ReadString('\x00')
	if err != nil {
		return err
	}
	// ReadString succeeded, so the string ends with the delimiter; drop it.
	status = status[:len(status)-1]

	switch status {
	case "OK":
		return nil
	case "FAILED":
		detail, err := r.ReadString('\x00')
		if err != nil {
			return err
		}
		status += ": " + detail[:len(detail)-1]
	}

	// A failure, or a status this client does not recognise.
	return protocolError{
		phase:   PhaseModeSelection,
		message: status,
	}
}

// SelectMode switches the connection into the given mode. It sends the
// mode's selection request and, if that succeeds, reads the peer's response
// from c.rx. It returns the send error, or otherwise whatever
// readModeSelectionResponse reports (nil when the peer accepted the mode).
func (c *Client) SelectMode(mode ClientMode) error {
	if err := mode.sendSelectionRequest(c); err != nil {
		return err
	}
	return readModeSelectionResponse(c.rx)
}