From 8e15d802865ed57db0018c15ea5559c8bd44c01f Mon Sep 17 00:00:00 2001 From: Keuin Date: Wed, 7 Sep 2022 02:48:46 +0800 Subject: First working version. Just a POC. --- danmaku/dmpkg/auth.go | 52 +++++++++++++++++++++ danmaku/dmpkg/decode.go | 44 ++++++++++++++++++ danmaku/dmpkg/frameio.go | 15 ++++++ danmaku/dmpkg/operation.go | 28 ++++++++++++ danmaku/dmpkg/package.go | 111 +++++++++++++++++++++++++++++++++++++++++++++ danmaku/dmpkg/ping.go | 8 ++++ danmaku/dmpkg/raw.go | 47 +++++++++++++++++++ 7 files changed, 305 insertions(+) create mode 100644 danmaku/dmpkg/auth.go create mode 100644 danmaku/dmpkg/decode.go create mode 100644 danmaku/dmpkg/frameio.go create mode 100644 danmaku/dmpkg/operation.go create mode 100644 danmaku/dmpkg/package.go create mode 100644 danmaku/dmpkg/ping.go create mode 100644 danmaku/dmpkg/raw.go (limited to 'danmaku/dmpkg') diff --git a/danmaku/dmpkg/auth.go b/danmaku/dmpkg/auth.go new file mode 100644 index 0000000..5a205ad --- /dev/null +++ b/danmaku/dmpkg/auth.go @@ -0,0 +1,52 @@ +/* +This file implements the auth exchange. +When Bilibili live client established the WebSocket connection successfully, +it sends this message at first. The server then responses a OpConnectOk exchange with body `{"code":0}` which indicates success. +*/ +package dmpkg + +import ( + "bilibili-livestream-archiver/common" + "encoding/json" + "fmt" +) + +type authInfo struct { + UID uint64 `json:"uid"` + RoomId uint64 `json:"roomid"` + ProtoVer int `json:"protover"` + Platform string `json:"platform"` + Type int `json:"type"` + Key string `json:"key"` +} + +// NewAuth creates a new authentication exchange. +func NewAuth(protocol ProtocolVer, roomId common.RoomId, authKey string) (exc DanmakuExchange) { + exc, _ = NewPlainExchange(OpConnect, authInfo{ + UID: kUidGuest, + RoomId: uint64(roomId), + ProtoVer: int(protocol), + Platform: kPlatformWeb, + Type: kAuthTypeDefault, + Key: authKey, + }) + return +} + +func IsAuthOk(serverResponse DanmakuExchange) (bool, error) { + if op := serverResponse.Operation; op != OpConnectOk { + return false, fmt.Errorf("server operation is not OpConnectOk: %w", op) + } + var body struct { + Code int `json:"code"` + } + body.Code = 1 + err := json.Unmarshal(serverResponse.Body, &body) + if err != nil { + return false, fmt.Errorf("JSON decode error: %w", err) + } + if c := body.Code; c != 0 { + return false, fmt.Errorf("server response code is non-zero: %w", c) + } + return true, nil +} diff --git a/danmaku/dmpkg/decode.go b/danmaku/dmpkg/decode.go new file mode 100644 index 0000000..7d9f796 --- /dev/null +++ b/danmaku/dmpkg/decode.go @@ -0,0 +1,44 @@ +package dmpkg + +import ( + "bytes" + "fmt" + "github.com/lunixbochs/struc" +) + +func DecodeExchange(data []byte) (exc DanmakuExchange, err error) { + if ln := len(data); ln < kHeaderLength { + err = fmt.Errorf("incomplete datagram: length = %v < %v", ln, kHeaderLength) + return + } + + // unpack header + var exchangeHeader DanmakuExchangeHeader + err = struc.Unpack(bytes.NewReader(data[:kHeaderLength]), &exchangeHeader) + if err != nil { + err = fmt.Errorf("cannot unpack exchange header: %w", err) + return + } + headerLength := exchangeHeader.HeaderLength + + // validate header length, fail fast if not match + if headerLength != kHeaderLength { + err = fmt.Errorf("invalid header length, "+ + "the protocol implementation might be obsolete: %v != %v", headerLength, kHeaderLength) + return + } + + // special process + // TODO decouple this + // The server OpHeartbeatAck contains an extra 4-bytes header entry in the body, maybe a heat value + var body []byte + // copy body + body = make([]byte, exchangeHeader.Length-uint32(headerLength)) + copy(body, data[headerLength:]) + + exc = DanmakuExchange{ + DanmakuExchangeHeader: exchangeHeader, + Body: body, + } + return +} diff --git a/danmaku/dmpkg/frameio.go b/danmaku/dmpkg/frameio.go new file mode 100644 index 0000000..2e7693a --- /dev/null +++ b/danmaku/dmpkg/frameio.go @@ -0,0 +1,15 @@ +/* +Since Bilibili only uses WebSocket binary message, +the transportation is effectively a datagram channel. +So we use a consumer-supplier abstraction to decouple +the real protocol with WebSocket stuff. +*/ +package dmpkg + +type Consumer[T any] interface { + Consume(value T) error +} + +type Supplier[T any] interface { + Get() (T, error) +} diff --git a/danmaku/dmpkg/operation.go b/danmaku/dmpkg/operation.go new file mode 100644 index 0000000..d3c13f4 --- /dev/null +++ b/danmaku/dmpkg/operation.go @@ -0,0 +1,28 @@ +package dmpkg + +type Operation uint32 + +// Operations +const ( + OpHeartbeat Operation = 2 + OpHeartbeatAck Operation = 3 + OpLayer7Data Operation = 5 + OpConnect Operation = 7 + OpConnectOk Operation = 8 +) + +var opStringMap = map[Operation]string{ + OpHeartbeat: "HeartBeat", + OpHeartbeatAck: "HeartBeatAck", + OpLayer7Data: "AppData", + OpConnect: "Connect", + OpConnectOk: "ConnectOk", +} + +func (o Operation) String() string { + s, exists := opStringMap[o] + if !exists { + return "" + } + return s +} diff --git a/danmaku/dmpkg/package.go b/danmaku/dmpkg/package.go new file mode 100644 index 0000000..3175a18 --- /dev/null +++ b/danmaku/dmpkg/package.go @@ -0,0 +1,111 @@ +package dmpkg + +import ( + "bytes" + "compress/zlib" + "fmt" + "github.com/andybalholm/brotli" + "github.com/lunixbochs/struc" + "io" +) + +type DanmakuExchangeHeader struct { + // Length total remaining bytes of this exchange, excluding `Length` itself + Length uint32 + // HeaderLength = Length - len(Body) + 4, always equals to 16 + HeaderLength uint16 + ProtocolVer ProtocolVer + Operation Operation + // SequenceId is always 1 + SequenceId uint32 +} + +// DanmakuExchange represents an actual message sent from client or server. This is an atomic unit. +type DanmakuExchange struct { + DanmakuExchangeHeader + Body []byte +} + +func (e *DanmakuExchange) String() string { + return fmt.Sprintf("DanmakuExchange{length=%v, protocol=%v, operation=%v, body=%v}", + e.Length, e.ProtocolVer, e.Operation, e.Body) +} + +const kHeaderLength = 16 +const kSequenceId = 1 + +type ProtocolVer uint16 + +const ( + // ProtoPlainJson the body is plain JSON text + ProtoPlainJson ProtocolVer = 0 + // ProtoMinimal the body is uint32 watcher count (big endian) + ProtoMinimal ProtocolVer = 1 + // ProtoZlib the body is a zlib compressed package + ProtoZlib ProtocolVer = 2 + // ProtoBrotli the body is a brotli compressed package + ProtoBrotli ProtocolVer = 3 +) + +const kUidGuest = 0 +const kPlatformWeb = "web" +const kAuthTypeDefault = 2 // magic number, not sure what does it mean + +func (e *DanmakuExchange) Marshal() (data []byte, err error) { + var buffer bytes.Buffer + // only unpack header with struc, since it does not support indirect variable field length calculation + err = struc.Pack(&buffer, &e.DanmakuExchangeHeader) + if err != nil { + err = fmt.Errorf("cannot pack an exchange into binary form: %w", err) + return + } + data = buffer.Bytes() + data = append(data, e.Body...) + return +} + +// Inflate decompresses the body if it is compressed +func (e *DanmakuExchange) Inflate() (ret DanmakuExchange, err error) { + switch e.ProtocolVer { + case ProtoMinimal: + fallthrough + case ProtoPlainJson: + ret = *e + case ProtoBrotli: + var data []byte + rd := brotli.NewReader(bytes.NewReader(e.Body)) + data, err = io.ReadAll(rd) + if err != nil { + err = fmt.Errorf("cannot decompress exchange body: %w", err) + return + } + var nestedExchange DanmakuExchange + nestedExchange, err = DecodeExchange(data) + if err != nil { + err = fmt.Errorf("cannot decode nested exchange: %w", err) + return + } + return nestedExchange.Inflate() + case ProtoZlib: + var data []byte + var rd io.ReadCloser + rd, err = zlib.NewReader(bytes.NewReader(e.Body)) + if err != nil { + err = fmt.Errorf("cannot create zlib reader: %w", err) + return + } + data, err = io.ReadAll(rd) + if err != nil { + err = fmt.Errorf("cannot decompress exchange body: %w", err) + return + } + var nestedExchange DanmakuExchange + nestedExchange, err = DecodeExchange(data) + if err != nil { + err = fmt.Errorf("cannot decode nested exchange: %w", err) + return + } + return nestedExchange.Inflate() + } + return +} diff --git a/danmaku/dmpkg/ping.go b/danmaku/dmpkg/ping.go new file mode 100644 index 0000000..4596d49 --- /dev/null +++ b/danmaku/dmpkg/ping.go @@ -0,0 +1,8 @@ +package dmpkg + +// NewPing construct a new PING exahange. +func NewPing() (exc DanmakuExchange) { + // compliant with Bilibili webapp behavior + exc, _ = NewPlainExchange(OpHeartbeat, "[object Object]") + return +} diff --git a/danmaku/dmpkg/raw.go b/danmaku/dmpkg/raw.go new file mode 100644 index 0000000..1cbd12e --- /dev/null +++ b/danmaku/dmpkg/raw.go @@ -0,0 +1,47 @@ +package dmpkg + +import ( + "encoding/json" + "fmt" + "math" +) + +const kMaxBodyLength = math.MaxUint32 - kHeaderLength + +// NewPlainExchange creates a new exchange with raw body specified. +// body: a struct or a raw string +func NewPlainExchange(operation Operation, body interface{}) (exc DanmakuExchange, err error) { + var bodyData []byte + + // convert body to []byte + if _, ok := body.(string); ok { + // a string + bodyData = []byte(body.(string)) + } else if _, ok := body.([]byte); ok { + // a []byte + copy(bodyData, body.([]byte)) + } else { + // a JSON struct + bodyData, err = json.Marshal(body) + if err != nil { + return + } + } + + length := uint64(kHeaderLength + len(bodyData)) + if length > kMaxBodyLength { + err = fmt.Errorf("body is too large (> %d)", kMaxBodyLength) + return + } + exc = DanmakuExchange{ + DanmakuExchangeHeader: DanmakuExchangeHeader{ + Length: uint32(length), + HeaderLength: kHeaderLength, + ProtocolVer: ProtoPlainJson, + Operation: operation, + SequenceId: kSequenceId, + }, + Body: bodyData, + } + return +} -- cgit v1.2.3