diff options
author | Keuin <[email protected]> | 2022-09-07 02:48:46 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2022-09-07 02:48:46 +0800 |
commit | 8e15d802865ed57db0018c15ea5559c8bd44c01f (patch) | |
tree | 48f4632a1ad044bd7f7f8da3ebe2bb03ab4ca6fe /danmaku | |
parent | 88234ca8fffc4e120adbe0d38071b625ad2f43c7 (diff) |
First working version. Just a POC.
Diffstat (limited to 'danmaku')
-rw-r--r-- | danmaku/client.go | 136 | ||||
-rw-r--r-- | danmaku/dmpkg/auth.go | 52 | ||||
-rw-r--r-- | danmaku/dmpkg/decode.go | 44 | ||||
-rw-r--r-- | danmaku/dmpkg/frameio.go | 15 | ||||
-rw-r--r-- | danmaku/dmpkg/operation.go | 28 | ||||
-rw-r--r-- | danmaku/dmpkg/package.go | 111 | ||||
-rw-r--r-- | danmaku/dmpkg/ping.go | 8 | ||||
-rw-r--r-- | danmaku/dmpkg/raw.go | 47 |
8 files changed, 441 insertions, 0 deletions
diff --git a/danmaku/client.go b/danmaku/client.go new file mode 100644 index 0000000..7745761 --- /dev/null +++ b/danmaku/client.go @@ -0,0 +1,136 @@ +/* +This file implements the background WebSocket messaging channel in Bilibili webui. +Server send livestream start and stop messages via this channel. +Note: In this file we manage the concrete WebSocket connection. +The Bilibili WebSocket channel protocol is decoupled and implemented in package `dmpkg`. +*/ +package danmaku + +import ( + "bilibili-livestream-archiver/common" + "bilibili-livestream-archiver/danmaku/dmpkg" + "context" + "fmt" + + "nhooyr.io/websocket" +) + +// Bilibili uses only binary WebSocket messages +const kBilibiliWebSocketMessageType = websocket.MessageBinary + +type DanmakuClient struct { + ws *websocket.Conn + wsio wsDatagramIO +} + +type DanmakuMessageType int + +// wsDatagramIO wraps websocket into a datagram I/O, +// since Bilibili uses only binary data, +// which is effectively a datagram communication. +type wsDatagramIO struct { + ws *websocket.Conn + ctx context.Context +} + +func (w *wsDatagramIO) Consume(data []byte) error { + return w.ws.Write(w.ctx, kBilibiliWebSocketMessageType, data) +} + +func (w *wsDatagramIO) Get() (data []byte, err error) { + typ, data, err := w.ws.Read(w.ctx) + if err != nil { + return + } + if typ != kBilibiliWebSocketMessageType { + err = fmt.Errorf("invalid message type: expected a binary WebSocket message, however got %v", typ.String()) + } + return +} + +func NewDanmakuClient() DanmakuClient { + return DanmakuClient{ + ws: nil, + } +} + +func (d *DanmakuClient) Connect(ctx context.Context, url string) error { + // thread unsafe + + // dial + if d.ws != nil { + return fmt.Errorf("already connected") + } + ws, _, err := websocket.Dial(ctx, url, nil) + if err != nil { + return fmt.Errorf("failed to establish WebSocket connection: %w", err) + } + d.ws = ws + + // init wsio + d.wsio = wsDatagramIO{ + ws: ws, + ctx: ctx, + } + + return nil +} + +func (d *DanmakuClient) Disconnect() error { + // thread unsafe + ws := d.ws + if ws == nil { + return nil + } + d.ws = nil + d.wsio = wsDatagramIO{} + return ws.Close(websocket.StatusInternalError, "disconnected") +} + +func (d *DanmakuClient) Authenticate(roomId common.RoomId, authKey string) error { + pkg := dmpkg.NewAuth(dmpkg.ProtoPlainJson, roomId, authKey) + data, err := pkg.Marshal() + if err != nil { + return fmt.Errorf("exchange marshal failed: %w", err) + } + err = d.wsio.Consume(data) + if err != nil { + return fmt.Errorf("channel write failed: %w", err) + } + // read server response + resp, err := d.wsio.Get() + if err != nil { + return err + } + respEx, err := dmpkg.DecodeExchange(resp) + if err != nil { + return fmt.Errorf("server danmaku exchange decode error: %w", err) + } + ok, err := dmpkg.IsAuthOk(respEx) + if !ok { + return fmt.Errorf("danmaku auth failed: %w", err) + } + return nil +} + +func (d *DanmakuClient) Heartbeat() error { + pkg := dmpkg.NewPing() + data, err := pkg.Marshal() + if err != nil { + return fmt.Errorf("exchange marshal failed: %w", err) + } + err = d.wsio.Consume(data) + if err != nil { + return fmt.Errorf("channel write failed: %w", err) + } + return nil +} + +// ReadExchange read and decode some kind of exchanges which we are interested +func (d *DanmakuClient) ReadExchange() (dmpkg.DanmakuExchange, error) { + data, err := d.wsio.Get() + if err != nil { + return dmpkg.DanmakuExchange{}, fmt.Errorf("failed to read danmaku datagram from server: %w", err) + } + return dmpkg.DecodeExchange(data) +} diff --git a/danmaku/dmpkg/auth.go b/danmaku/dmpkg/auth.go new file mode 100644 index 0000000..5a205ad --- /dev/null +++ b/danmaku/dmpkg/auth.go @@ -0,0 +1,52 @@ +/* +This file implements the auth exchange. +When Bilibili live client established the WebSocket connection successfully, +it sends this message at first. The server then responses a OpConnectOk exchange with body `{"code":0}` which indicates success. +*/ +package dmpkg + +import ( + "bilibili-livestream-archiver/common" + "encoding/json" + "fmt" +) + +type authInfo struct { + UID uint64 `json:"uid"` + RoomId uint64 `json:"roomid"` + ProtoVer int `json:"protover"` + Platform string `json:"platform"` + Type int `json:"type"` + Key string `json:"key"` +} + +// NewAuth creates a new authentication exchange. +func NewAuth(protocol ProtocolVer, roomId common.RoomId, authKey string) (exc DanmakuExchange) { + exc, _ = NewPlainExchange(OpConnect, authInfo{ + UID: kUidGuest, + RoomId: uint64(roomId), + ProtoVer: int(protocol), + Platform: kPlatformWeb, + Type: kAuthTypeDefault, + Key: authKey, + }) + return +} + +func IsAuthOk(serverResponse DanmakuExchange) (bool, error) { + if op := serverResponse.Operation; op != OpConnectOk { + return false, fmt.Errorf("server operation is not OpConnectOk: %w", op) + } + var body struct { + Code int `json:"code"` + } + body.Code = 1 + err := json.Unmarshal(serverResponse.Body, &body) + if err != nil { + return false, fmt.Errorf("JSON decode error: %w", err) + } + if c := body.Code; c != 0 { + return false, fmt.Errorf("server response code is non-zero: %w", c) + } + return true, nil +} diff --git a/danmaku/dmpkg/decode.go b/danmaku/dmpkg/decode.go new file mode 100644 index 0000000..7d9f796 --- /dev/null +++ b/danmaku/dmpkg/decode.go @@ -0,0 +1,44 @@ +package dmpkg + +import ( + "bytes" + "fmt" + "github.com/lunixbochs/struc" +) + +func DecodeExchange(data []byte) (exc DanmakuExchange, err error) { + if ln := len(data); ln < kHeaderLength { + err = fmt.Errorf("incomplete datagram: length = %v < %v", ln, kHeaderLength) + return + } + + // unpack header + var exchangeHeader DanmakuExchangeHeader + err = struc.Unpack(bytes.NewReader(data[:kHeaderLength]), &exchangeHeader) + if err != nil { + err = fmt.Errorf("cannot unpack exchange header: %w", err) + return + } + headerLength := exchangeHeader.HeaderLength + + // validate header length, fail fast if not match + if headerLength != kHeaderLength { + err = fmt.Errorf("invalid header length, "+ + "the protocol implementation might be obsolete: %v != %v", headerLength, kHeaderLength) + return + } + + // special process + // TODO decouple this + // The server OpHeartbeatAck contains an extra 4-bytes header entry in the body, maybe a heat value + var body []byte + // copy body + body = make([]byte, exchangeHeader.Length-uint32(headerLength)) + copy(body, data[headerLength:]) + + exc = DanmakuExchange{ + DanmakuExchangeHeader: exchangeHeader, + Body: body, + } + return +} diff --git a/danmaku/dmpkg/frameio.go b/danmaku/dmpkg/frameio.go new file mode 100644 index 0000000..2e7693a --- /dev/null +++ b/danmaku/dmpkg/frameio.go @@ -0,0 +1,15 @@ +/* +Since Bilibili only uses WebSocket binary message, +the transportation is effectively a datagram channel. +So we use a consumer-supplier abstraction to decouple +the real protocol with WebSocket stuff. +*/ +package dmpkg + +type Consumer[T any] interface { + Consume(value T) error +} + +type Supplier[T any] interface { + Get() (T, error) +} diff --git a/danmaku/dmpkg/operation.go b/danmaku/dmpkg/operation.go new file mode 100644 index 0000000..d3c13f4 --- /dev/null +++ b/danmaku/dmpkg/operation.go @@ -0,0 +1,28 @@ +package dmpkg + +type Operation uint32 + +// Operations +const ( + OpHeartbeat Operation = 2 + OpHeartbeatAck Operation = 3 + OpLayer7Data Operation = 5 + OpConnect Operation = 7 + OpConnectOk Operation = 8 +) + +var opStringMap = map[Operation]string{ + OpHeartbeat: "HeartBeat", + OpHeartbeatAck: "HeartBeatAck", + OpLayer7Data: "AppData", + OpConnect: "Connect", + OpConnectOk: "ConnectOk", +} + +func (o Operation) String() string { + s, exists := opStringMap[o] + if !exists { + return "<Unknown>" + } + return s +} diff --git a/danmaku/dmpkg/package.go b/danmaku/dmpkg/package.go new file mode 100644 index 0000000..3175a18 --- /dev/null +++ b/danmaku/dmpkg/package.go @@ -0,0 +1,111 @@ +package dmpkg + +import ( + "bytes" + "compress/zlib" + "fmt" + "github.com/andybalholm/brotli" + "github.com/lunixbochs/struc" + "io" +) + +type DanmakuExchangeHeader struct { + // Length total remaining bytes of this exchange, excluding `Length` itself + Length uint32 + // HeaderLength = Length - len(Body) + 4, always equals to 16 + HeaderLength uint16 + ProtocolVer ProtocolVer + Operation Operation + // SequenceId is always 1 + SequenceId uint32 +} + +// DanmakuExchange represents an actual message sent from client or server. This is an atomic unit. +type DanmakuExchange struct { + DanmakuExchangeHeader + Body []byte +} + +func (e *DanmakuExchange) String() string { + return fmt.Sprintf("DanmakuExchange{length=%v, protocol=%v, operation=%v, body=%v}", + e.Length, e.ProtocolVer, e.Operation, e.Body) +} + +const kHeaderLength = 16 +const kSequenceId = 1 + +type ProtocolVer uint16 + +const ( + // ProtoPlainJson the body is plain JSON text + ProtoPlainJson ProtocolVer = 0 + // ProtoMinimal the body is uint32 watcher count (big endian) + ProtoMinimal ProtocolVer = 1 + // ProtoZlib the body is a zlib compressed package + ProtoZlib ProtocolVer = 2 + // ProtoBrotli the body is a brotli compressed package + ProtoBrotli ProtocolVer = 3 +) + +const kUidGuest = 0 +const kPlatformWeb = "web" +const kAuthTypeDefault = 2 // magic number, not sure what does it mean + +func (e *DanmakuExchange) Marshal() (data []byte, err error) { + var buffer bytes.Buffer + // only unpack header with struc, since it does not support indirect variable field length calculation + err = struc.Pack(&buffer, &e.DanmakuExchangeHeader) + if err != nil { + err = fmt.Errorf("cannot pack an exchange into binary form: %w", err) + return + } + data = buffer.Bytes() + data = append(data, e.Body...) + return +} + +// Inflate decompresses the body if it is compressed +func (e *DanmakuExchange) Inflate() (ret DanmakuExchange, err error) { + switch e.ProtocolVer { + case ProtoMinimal: + fallthrough + case ProtoPlainJson: + ret = *e + case ProtoBrotli: + var data []byte + rd := brotli.NewReader(bytes.NewReader(e.Body)) + data, err = io.ReadAll(rd) + if err != nil { + err = fmt.Errorf("cannot decompress exchange body: %w", err) + return + } + var nestedExchange DanmakuExchange + nestedExchange, err = DecodeExchange(data) + if err != nil { + err = fmt.Errorf("cannot decode nested exchange: %w", err) + return + } + return nestedExchange.Inflate() + case ProtoZlib: + var data []byte + var rd io.ReadCloser + rd, err = zlib.NewReader(bytes.NewReader(e.Body)) + if err != nil { + err = fmt.Errorf("cannot create zlib reader: %w", err) + return + } + data, err = io.ReadAll(rd) + if err != nil { + err = fmt.Errorf("cannot decompress exchange body: %w", err) + return + } + var nestedExchange DanmakuExchange + nestedExchange, err = DecodeExchange(data) + if err != nil { + err = fmt.Errorf("cannot decode nested exchange: %w", err) + return + } + return nestedExchange.Inflate() + } + return +} diff --git a/danmaku/dmpkg/ping.go b/danmaku/dmpkg/ping.go new file mode 100644 index 0000000..4596d49 --- /dev/null +++ b/danmaku/dmpkg/ping.go @@ -0,0 +1,8 @@ +package dmpkg + +// NewPing construct a new PING exahange. +func NewPing() (exc DanmakuExchange) { + // compliant with Bilibili webapp behavior + exc, _ = NewPlainExchange(OpHeartbeat, "[object Object]") + return +} diff --git a/danmaku/dmpkg/raw.go b/danmaku/dmpkg/raw.go new file mode 100644 index 0000000..1cbd12e --- /dev/null +++ b/danmaku/dmpkg/raw.go @@ -0,0 +1,47 @@ +package dmpkg + +import ( + "encoding/json" + "fmt" + "math" +) + +const kMaxBodyLength = math.MaxUint32 - kHeaderLength + +// NewPlainExchange creates a new exchange with raw body specified. +// body: a struct or a raw string +func NewPlainExchange(operation Operation, body interface{}) (exc DanmakuExchange, err error) { + var bodyData []byte + + // convert body to []byte + if _, ok := body.(string); ok { + // a string + bodyData = []byte(body.(string)) + } else if _, ok := body.([]byte); ok { + // a []byte + copy(bodyData, body.([]byte)) + } else { + // a JSON struct + bodyData, err = json.Marshal(body) + if err != nil { + return + } + } + + length := uint64(kHeaderLength + len(bodyData)) + if length > kMaxBodyLength { + err = fmt.Errorf("body is too large (> %d)", kMaxBodyLength) + return + } + exc = DanmakuExchange{ + DanmakuExchangeHeader: DanmakuExchangeHeader{ + Length: uint32(length), + HeaderLength: kHeaderLength, + ProtocolVer: ProtoPlainJson, + Operation: operation, + SequenceId: kSequenceId, + }, + Body: bodyData, + } + return +} |