1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
package protocol
import (
"bufio"
"errors"
"fmt"
"io"
)
type ModeType uint16
const (
ModePublish ModeType = 1
ModeSubscribe ModeType = 2
)
type ClientMode interface {
Type() ModeType
// Consume a message from remote peer. This method should panic if not supported.
Consume(reader io.Reader, length int64)
sendSelectionRequest(c *Client) error
}
func Publish(topicID string) ClientMode {
return modePublish{
topicID: topicID,
}
}
type modePublish struct {
topicID string
}
func (m modePublish) Consume(io.Reader, int64) {
// it's the user's responsibility to filter out unsuitable messages
panic(errors.New("remote peer should not send data message to publisher"))
}
func (m modePublish) Type() ModeType {
return ModePublish
}
func (m modePublish) sendSelectionRequest(c *Client) error {
if m.topicID == "" {
panic(fmt.Errorf("empty subscription topic ID"))
}
return c.writeFlush("PUB" + m.topicID + "\x00")
}
func Subscribe(topicIDPattern string, messageConsumer func(reader io.Reader, length int64)) ClientMode {
return modeSubscribe{
topicIDPattern: topicIDPattern,
messageConsumer: messageConsumer,
}
}
type modeSubscribe struct {
topicIDPattern string
messageConsumer func(reader io.Reader, length int64)
}
func (m modeSubscribe) Consume(reader io.Reader, length int64) {
m.messageConsumer(reader, length)
}
func (m modeSubscribe) Type() ModeType {
return ModeSubscribe
}
func (m modeSubscribe) sendSelectionRequest(c *Client) error {
return c.writeFlush("SUB\x00\x00\x00\x00" + m.topicIDPattern + "\x00")
}
func readModeSelectionResponse(r *bufio.Reader) error {
msg, err := r.ReadString('\x00')
if err != nil {
return err
}
msg = msg[:len(msg)-1]
if msg == "OK" {
return nil
}
// failed or other protocol errors
if msg == "FAILED" {
msg2, err := r.ReadString('\x00')
if err != nil {
return err
}
msg += ": " + msg2[:len(msg2)-1]
}
return protocolError{
phase: PhaseModeSelection,
message: msg,
}
}
func (c *Client) SelectMode(mode ClientMode) error {
err := mode.sendSelectionRequest(c)
if err != nil {
return err
}
return readModeSelectionResponse(c.rx)
}
|