summaryrefslogtreecommitdiff
path: root/danmaku/dmpkg
diff options
context:
space:
mode:
Diffstat (limited to 'danmaku/dmpkg')
-rw-r--r--danmaku/dmpkg/auth.go52
-rw-r--r--danmaku/dmpkg/decode.go44
-rw-r--r--danmaku/dmpkg/frameio.go15
-rw-r--r--danmaku/dmpkg/operation.go28
-rw-r--r--danmaku/dmpkg/package.go111
-rw-r--r--danmaku/dmpkg/ping.go8
-rw-r--r--danmaku/dmpkg/raw.go47
7 files changed, 305 insertions, 0 deletions
diff --git a/danmaku/dmpkg/auth.go b/danmaku/dmpkg/auth.go
new file mode 100644
index 0000000..5a205ad
--- /dev/null
+++ b/danmaku/dmpkg/auth.go
@@ -0,0 +1,52 @@
+/*
+This file implements the auth exchange.
+When Bilibili live client established the WebSocket connection successfully,
+it sends this message at first. The server then responses a OpConnectOk exchange with body `{"code":0}` which indicates success.
+*/
+package dmpkg
+
+import (
+ "bilibili-livestream-archiver/common"
+ "encoding/json"
+ "fmt"
+)
+
+type authInfo struct {
+ UID uint64 `json:"uid"`
+ RoomId uint64 `json:"roomid"`
+ ProtoVer int `json:"protover"`
+ Platform string `json:"platform"`
+ Type int `json:"type"`
+ Key string `json:"key"`
+}
+
+// NewAuth creates a new authentication exchange.
+func NewAuth(protocol ProtocolVer, roomId common.RoomId, authKey string) (exc DanmakuExchange) {
+ exc, _ = NewPlainExchange(OpConnect, authInfo{
+ UID: kUidGuest,
+ RoomId: uint64(roomId),
+ ProtoVer: int(protocol),
+ Platform: kPlatformWeb,
+ Type: kAuthTypeDefault,
+ Key: authKey,
+ })
+ return
+}
+
+func IsAuthOk(serverResponse DanmakuExchange) (bool, error) {
+ if op := serverResponse.Operation; op != OpConnectOk {
+ return false, fmt.Errorf("server operation is not OpConnectOk: %w", op)
+ }
+ var body struct {
+ Code int `json:"code"`
+ }
+ body.Code = 1
+ err := json.Unmarshal(serverResponse.Body, &body)
+ if err != nil {
+ return false, fmt.Errorf("JSON decode error: %w", err)
+ }
+ if c := body.Code; c != 0 {
+ return false, fmt.Errorf("server response code is non-zero: %w", c)
+ }
+ return true, nil
+}
diff --git a/danmaku/dmpkg/decode.go b/danmaku/dmpkg/decode.go
new file mode 100644
index 0000000..7d9f796
--- /dev/null
+++ b/danmaku/dmpkg/decode.go
@@ -0,0 +1,44 @@
+package dmpkg
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/lunixbochs/struc"
+)
+
+func DecodeExchange(data []byte) (exc DanmakuExchange, err error) {
+ if ln := len(data); ln < kHeaderLength {
+ err = fmt.Errorf("incomplete datagram: length = %v < %v", ln, kHeaderLength)
+ return
+ }
+
+ // unpack header
+ var exchangeHeader DanmakuExchangeHeader
+ err = struc.Unpack(bytes.NewReader(data[:kHeaderLength]), &exchangeHeader)
+ if err != nil {
+ err = fmt.Errorf("cannot unpack exchange header: %w", err)
+ return
+ }
+ headerLength := exchangeHeader.HeaderLength
+
+ // validate header length, fail fast if not match
+ if headerLength != kHeaderLength {
+ err = fmt.Errorf("invalid header length, "+
+ "the protocol implementation might be obsolete: %v != %v", headerLength, kHeaderLength)
+ return
+ }
+
+ // special process
+ // TODO decouple this
+ // The server OpHeartbeatAck contains an extra 4-bytes header entry in the body, maybe a heat value
+ var body []byte
+ // copy body
+ body = make([]byte, exchangeHeader.Length-uint32(headerLength))
+ copy(body, data[headerLength:])
+
+ exc = DanmakuExchange{
+ DanmakuExchangeHeader: exchangeHeader,
+ Body: body,
+ }
+ return
+}
diff --git a/danmaku/dmpkg/frameio.go b/danmaku/dmpkg/frameio.go
new file mode 100644
index 0000000..2e7693a
--- /dev/null
+++ b/danmaku/dmpkg/frameio.go
@@ -0,0 +1,15 @@
+/*
+Since Bilibili only uses WebSocket binary message,
+the transportation is effectively a datagram channel.
+So we use a consumer-supplier abstraction to decouple
+the real protocol with WebSocket stuff.
+*/
+package dmpkg
+
+type Consumer[T any] interface {
+ Consume(value T) error
+}
+
+type Supplier[T any] interface {
+ Get() (T, error)
+}
diff --git a/danmaku/dmpkg/operation.go b/danmaku/dmpkg/operation.go
new file mode 100644
index 0000000..d3c13f4
--- /dev/null
+++ b/danmaku/dmpkg/operation.go
@@ -0,0 +1,28 @@
+package dmpkg
+
+type Operation uint32
+
+// Operations
+const (
+ OpHeartbeat Operation = 2
+ OpHeartbeatAck Operation = 3
+ OpLayer7Data Operation = 5
+ OpConnect Operation = 7
+ OpConnectOk Operation = 8
+)
+
+var opStringMap = map[Operation]string{
+ OpHeartbeat: "HeartBeat",
+ OpHeartbeatAck: "HeartBeatAck",
+ OpLayer7Data: "AppData",
+ OpConnect: "Connect",
+ OpConnectOk: "ConnectOk",
+}
+
+func (o Operation) String() string {
+ s, exists := opStringMap[o]
+ if !exists {
+ return "<Unknown>"
+ }
+ return s
+}
diff --git a/danmaku/dmpkg/package.go b/danmaku/dmpkg/package.go
new file mode 100644
index 0000000..3175a18
--- /dev/null
+++ b/danmaku/dmpkg/package.go
@@ -0,0 +1,111 @@
+package dmpkg
+
+import (
+ "bytes"
+ "compress/zlib"
+ "fmt"
+ "github.com/andybalholm/brotli"
+ "github.com/lunixbochs/struc"
+ "io"
+)
+
+type DanmakuExchangeHeader struct {
+ // Length total remaining bytes of this exchange, excluding `Length` itself
+ Length uint32
+ // HeaderLength = Length - len(Body) + 4, always equals to 16
+ HeaderLength uint16
+ ProtocolVer ProtocolVer
+ Operation Operation
+ // SequenceId is always 1
+ SequenceId uint32
+}
+
+// DanmakuExchange represents an actual message sent from client or server. This is an atomic unit.
+type DanmakuExchange struct {
+ DanmakuExchangeHeader
+ Body []byte
+}
+
+func (e *DanmakuExchange) String() string {
+ return fmt.Sprintf("DanmakuExchange{length=%v, protocol=%v, operation=%v, body=%v}",
+ e.Length, e.ProtocolVer, e.Operation, e.Body)
+}
+
+const kHeaderLength = 16
+const kSequenceId = 1
+
+type ProtocolVer uint16
+
+const (
+ // ProtoPlainJson the body is plain JSON text
+ ProtoPlainJson ProtocolVer = 0
+ // ProtoMinimal the body is uint32 watcher count (big endian)
+ ProtoMinimal ProtocolVer = 1
+ // ProtoZlib the body is a zlib compressed package
+ ProtoZlib ProtocolVer = 2
+ // ProtoBrotli the body is a brotli compressed package
+ ProtoBrotli ProtocolVer = 3
+)
+
+const kUidGuest = 0
+const kPlatformWeb = "web"
+const kAuthTypeDefault = 2 // magic number, not sure what does it mean
+
+func (e *DanmakuExchange) Marshal() (data []byte, err error) {
+ var buffer bytes.Buffer
+ // only unpack header with struc, since it does not support indirect variable field length calculation
+ err = struc.Pack(&buffer, &e.DanmakuExchangeHeader)
+ if err != nil {
+ err = fmt.Errorf("cannot pack an exchange into binary form: %w", err)
+ return
+ }
+ data = buffer.Bytes()
+ data = append(data, e.Body...)
+ return
+}
+
+// Inflate decompresses the body if it is compressed
+func (e *DanmakuExchange) Inflate() (ret DanmakuExchange, err error) {
+ switch e.ProtocolVer {
+ case ProtoMinimal:
+ fallthrough
+ case ProtoPlainJson:
+ ret = *e
+ case ProtoBrotli:
+ var data []byte
+ rd := brotli.NewReader(bytes.NewReader(e.Body))
+ data, err = io.ReadAll(rd)
+ if err != nil {
+ err = fmt.Errorf("cannot decompress exchange body: %w", err)
+ return
+ }
+ var nestedExchange DanmakuExchange
+ nestedExchange, err = DecodeExchange(data)
+ if err != nil {
+ err = fmt.Errorf("cannot decode nested exchange: %w", err)
+ return
+ }
+ return nestedExchange.Inflate()
+ case ProtoZlib:
+ var data []byte
+ var rd io.ReadCloser
+ rd, err = zlib.NewReader(bytes.NewReader(e.Body))
+ if err != nil {
+ err = fmt.Errorf("cannot create zlib reader: %w", err)
+ return
+ }
+ data, err = io.ReadAll(rd)
+ if err != nil {
+ err = fmt.Errorf("cannot decompress exchange body: %w", err)
+ return
+ }
+ var nestedExchange DanmakuExchange
+ nestedExchange, err = DecodeExchange(data)
+ if err != nil {
+ err = fmt.Errorf("cannot decode nested exchange: %w", err)
+ return
+ }
+ return nestedExchange.Inflate()
+ }
+ return
+}
diff --git a/danmaku/dmpkg/ping.go b/danmaku/dmpkg/ping.go
new file mode 100644
index 0000000..4596d49
--- /dev/null
+++ b/danmaku/dmpkg/ping.go
@@ -0,0 +1,8 @@
+package dmpkg
+
+// NewPing construct a new PING exahange.
+func NewPing() (exc DanmakuExchange) {
+ // compliant with Bilibili webapp behavior
+ exc, _ = NewPlainExchange(OpHeartbeat, "[object Object]")
+ return
+}
diff --git a/danmaku/dmpkg/raw.go b/danmaku/dmpkg/raw.go
new file mode 100644
index 0000000..1cbd12e
--- /dev/null
+++ b/danmaku/dmpkg/raw.go
@@ -0,0 +1,47 @@
+package dmpkg
+
+import (
+ "encoding/json"
+ "fmt"
+ "math"
+)
+
+const kMaxBodyLength = math.MaxUint32 - kHeaderLength
+
+// NewPlainExchange creates a new exchange with raw body specified.
+// body: a struct or a raw string
+func NewPlainExchange(operation Operation, body interface{}) (exc DanmakuExchange, err error) {
+ var bodyData []byte
+
+ // convert body to []byte
+ if _, ok := body.(string); ok {
+ // a string
+ bodyData = []byte(body.(string))
+ } else if _, ok := body.([]byte); ok {
+ // a []byte
+ copy(bodyData, body.([]byte))
+ } else {
+ // a JSON struct
+ bodyData, err = json.Marshal(body)
+ if err != nil {
+ return
+ }
+ }
+
+ length := uint64(kHeaderLength + len(bodyData))
+ if length > kMaxBodyLength {
+ err = fmt.Errorf("body is too large (> %d)", kMaxBodyLength)
+ return
+ }
+ exc = DanmakuExchange{
+ DanmakuExchangeHeader: DanmakuExchangeHeader{
+ Length: uint32(length),
+ HeaderLength: kHeaderLength,
+ ProtocolVer: ProtoPlainJson,
+ Operation: operation,
+ SequenceId: kSequenceId,
+ },
+ Body: bodyData,
+ }
+ return
+}