summaryrefslogtreecommitdiff
path: root/danmaku
diff options
context:
space:
mode:
Diffstat (limited to 'danmaku')
-rw-r--r--danmaku/client.go136
-rw-r--r--danmaku/dmpkg/auth.go52
-rw-r--r--danmaku/dmpkg/decode.go44
-rw-r--r--danmaku/dmpkg/frameio.go15
-rw-r--r--danmaku/dmpkg/operation.go28
-rw-r--r--danmaku/dmpkg/package.go111
-rw-r--r--danmaku/dmpkg/ping.go8
-rw-r--r--danmaku/dmpkg/raw.go47
8 files changed, 441 insertions, 0 deletions
diff --git a/danmaku/client.go b/danmaku/client.go
new file mode 100644
index 0000000..7745761
--- /dev/null
+++ b/danmaku/client.go
@@ -0,0 +1,136 @@
+/*
+This file implements the background WebSocket messaging channel in Bilibili webui.
+Server send livestream start and stop messages via this channel.
+Note: In this file we manage the concrete WebSocket connection.
+The Bilibili WebSocket channel protocol is decoupled and implemented in package `dmpkg`.
+*/
+package danmaku
+
+import (
+ "bilibili-livestream-archiver/common"
+ "bilibili-livestream-archiver/danmaku/dmpkg"
+ "context"
+ "fmt"
+
+ "nhooyr.io/websocket"
+)
+
+// Bilibili uses only binary WebSocket messages
+const kBilibiliWebSocketMessageType = websocket.MessageBinary
+
+type DanmakuClient struct {
+ ws *websocket.Conn
+ wsio wsDatagramIO
+}
+
+type DanmakuMessageType int
+
+// wsDatagramIO wraps websocket into a datagram I/O,
+// since Bilibili uses only binary data,
+// which is effectively a datagram communication.
+type wsDatagramIO struct {
+ ws *websocket.Conn
+ ctx context.Context
+}
+
+func (w *wsDatagramIO) Consume(data []byte) error {
+ return w.ws.Write(w.ctx, kBilibiliWebSocketMessageType, data)
+}
+
+func (w *wsDatagramIO) Get() (data []byte, err error) {
+ typ, data, err := w.ws.Read(w.ctx)
+ if err != nil {
+ return
+ }
+ if typ != kBilibiliWebSocketMessageType {
+ err = fmt.Errorf("invalid message type: expected a binary WebSocket message, however got %v", typ.String())
+ }
+ return
+}
+
+func NewDanmakuClient() DanmakuClient {
+ return DanmakuClient{
+ ws: nil,
+ }
+}
+
+func (d *DanmakuClient) Connect(ctx context.Context, url string) error {
+ // thread unsafe
+
+ // dial
+ if d.ws != nil {
+ return fmt.Errorf("already connected")
+ }
+ ws, _, err := websocket.Dial(ctx, url, nil)
+ if err != nil {
+ return fmt.Errorf("failed to establish WebSocket connection: %w", err)
+ }
+ d.ws = ws
+
+ // init wsio
+ d.wsio = wsDatagramIO{
+ ws: ws,
+ ctx: ctx,
+ }
+
+ return nil
+}
+
+func (d *DanmakuClient) Disconnect() error {
+ // thread unsafe
+ ws := d.ws
+ if ws == nil {
+ return nil
+ }
+ d.ws = nil
+ d.wsio = wsDatagramIO{}
+ return ws.Close(websocket.StatusInternalError, "disconnected")
+}
+
+func (d *DanmakuClient) Authenticate(roomId common.RoomId, authKey string) error {
+ pkg := dmpkg.NewAuth(dmpkg.ProtoPlainJson, roomId, authKey)
+ data, err := pkg.Marshal()
+ if err != nil {
+ return fmt.Errorf("exchange marshal failed: %w", err)
+ }
+ err = d.wsio.Consume(data)
+ if err != nil {
+ return fmt.Errorf("channel write failed: %w", err)
+ }
+ // read server response
+ resp, err := d.wsio.Get()
+ if err != nil {
+ return err
+ }
+ respEx, err := dmpkg.DecodeExchange(resp)
+ if err != nil {
+ return fmt.Errorf("server danmaku exchange decode error: %w", err)
+ }
+ ok, err := dmpkg.IsAuthOk(respEx)
+ if !ok {
+ return fmt.Errorf("danmaku auth failed: %w", err)
+ }
+ return nil
+}
+
+func (d *DanmakuClient) Heartbeat() error {
+ pkg := dmpkg.NewPing()
+ data, err := pkg.Marshal()
+ if err != nil {
+ return fmt.Errorf("exchange marshal failed: %w", err)
+ }
+ err = d.wsio.Consume(data)
+ if err != nil {
+ return fmt.Errorf("channel write failed: %w", err)
+ }
+ return nil
+}
+
+// ReadExchange read and decode some kind of exchanges which we are interested
+func (d *DanmakuClient) ReadExchange() (dmpkg.DanmakuExchange, error) {
+ data, err := d.wsio.Get()
+ if err != nil {
+ return dmpkg.DanmakuExchange{}, fmt.Errorf("failed to read danmaku datagram from server: %w", err)
+ }
+ return dmpkg.DecodeExchange(data)
+}
diff --git a/danmaku/dmpkg/auth.go b/danmaku/dmpkg/auth.go
new file mode 100644
index 0000000..5a205ad
--- /dev/null
+++ b/danmaku/dmpkg/auth.go
@@ -0,0 +1,52 @@
+/*
+This file implements the auth exchange.
+When Bilibili live client established the WebSocket connection successfully,
+it sends this message at first. The server then responses a OpConnectOk exchange with body `{"code":0}` which indicates success.
+*/
+package dmpkg
+
+import (
+ "bilibili-livestream-archiver/common"
+ "encoding/json"
+ "fmt"
+)
+
+type authInfo struct {
+ UID uint64 `json:"uid"`
+ RoomId uint64 `json:"roomid"`
+ ProtoVer int `json:"protover"`
+ Platform string `json:"platform"`
+ Type int `json:"type"`
+ Key string `json:"key"`
+}
+
+// NewAuth creates a new authentication exchange.
+func NewAuth(protocol ProtocolVer, roomId common.RoomId, authKey string) (exc DanmakuExchange) {
+ exc, _ = NewPlainExchange(OpConnect, authInfo{
+ UID: kUidGuest,
+ RoomId: uint64(roomId),
+ ProtoVer: int(protocol),
+ Platform: kPlatformWeb,
+ Type: kAuthTypeDefault,
+ Key: authKey,
+ })
+ return
+}
+
+func IsAuthOk(serverResponse DanmakuExchange) (bool, error) {
+ if op := serverResponse.Operation; op != OpConnectOk {
+ return false, fmt.Errorf("server operation is not OpConnectOk: %w", op)
+ }
+ var body struct {
+ Code int `json:"code"`
+ }
+ body.Code = 1
+ err := json.Unmarshal(serverResponse.Body, &body)
+ if err != nil {
+ return false, fmt.Errorf("JSON decode error: %w", err)
+ }
+ if c := body.Code; c != 0 {
+ return false, fmt.Errorf("server response code is non-zero: %w", c)
+ }
+ return true, nil
+}
diff --git a/danmaku/dmpkg/decode.go b/danmaku/dmpkg/decode.go
new file mode 100644
index 0000000..7d9f796
--- /dev/null
+++ b/danmaku/dmpkg/decode.go
@@ -0,0 +1,44 @@
+package dmpkg
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/lunixbochs/struc"
+)
+
+func DecodeExchange(data []byte) (exc DanmakuExchange, err error) {
+ if ln := len(data); ln < kHeaderLength {
+ err = fmt.Errorf("incomplete datagram: length = %v < %v", ln, kHeaderLength)
+ return
+ }
+
+ // unpack header
+ var exchangeHeader DanmakuExchangeHeader
+ err = struc.Unpack(bytes.NewReader(data[:kHeaderLength]), &exchangeHeader)
+ if err != nil {
+ err = fmt.Errorf("cannot unpack exchange header: %w", err)
+ return
+ }
+ headerLength := exchangeHeader.HeaderLength
+
+ // validate header length, fail fast if not match
+ if headerLength != kHeaderLength {
+ err = fmt.Errorf("invalid header length, "+
+ "the protocol implementation might be obsolete: %v != %v", headerLength, kHeaderLength)
+ return
+ }
+
+ // special process
+ // TODO decouple this
+ // The server OpHeartbeatAck contains an extra 4-bytes header entry in the body, maybe a heat value
+ var body []byte
+ // copy body
+ body = make([]byte, exchangeHeader.Length-uint32(headerLength))
+ copy(body, data[headerLength:])
+
+ exc = DanmakuExchange{
+ DanmakuExchangeHeader: exchangeHeader,
+ Body: body,
+ }
+ return
+}
diff --git a/danmaku/dmpkg/frameio.go b/danmaku/dmpkg/frameio.go
new file mode 100644
index 0000000..2e7693a
--- /dev/null
+++ b/danmaku/dmpkg/frameio.go
@@ -0,0 +1,15 @@
+/*
+Since Bilibili only uses WebSocket binary message,
+the transportation is effectively a datagram channel.
+So we use a consumer-supplier abstraction to decouple
+the real protocol with WebSocket stuff.
+*/
+package dmpkg
+
+type Consumer[T any] interface {
+ Consume(value T) error
+}
+
+type Supplier[T any] interface {
+ Get() (T, error)
+}
diff --git a/danmaku/dmpkg/operation.go b/danmaku/dmpkg/operation.go
new file mode 100644
index 0000000..d3c13f4
--- /dev/null
+++ b/danmaku/dmpkg/operation.go
@@ -0,0 +1,28 @@
+package dmpkg
+
+type Operation uint32
+
+// Operations
+const (
+ OpHeartbeat Operation = 2
+ OpHeartbeatAck Operation = 3
+ OpLayer7Data Operation = 5
+ OpConnect Operation = 7
+ OpConnectOk Operation = 8
+)
+
+var opStringMap = map[Operation]string{
+ OpHeartbeat: "HeartBeat",
+ OpHeartbeatAck: "HeartBeatAck",
+ OpLayer7Data: "AppData",
+ OpConnect: "Connect",
+ OpConnectOk: "ConnectOk",
+}
+
+func (o Operation) String() string {
+ s, exists := opStringMap[o]
+ if !exists {
+ return "<Unknown>"
+ }
+ return s
+}
diff --git a/danmaku/dmpkg/package.go b/danmaku/dmpkg/package.go
new file mode 100644
index 0000000..3175a18
--- /dev/null
+++ b/danmaku/dmpkg/package.go
@@ -0,0 +1,111 @@
+package dmpkg
+
+import (
+ "bytes"
+ "compress/zlib"
+ "fmt"
+ "github.com/andybalholm/brotli"
+ "github.com/lunixbochs/struc"
+ "io"
+)
+
+type DanmakuExchangeHeader struct {
+ // Length total remaining bytes of this exchange, excluding `Length` itself
+ Length uint32
+ // HeaderLength = Length - len(Body) + 4, always equals to 16
+ HeaderLength uint16
+ ProtocolVer ProtocolVer
+ Operation Operation
+ // SequenceId is always 1
+ SequenceId uint32
+}
+
+// DanmakuExchange represents an actual message sent from client or server. This is an atomic unit.
+type DanmakuExchange struct {
+ DanmakuExchangeHeader
+ Body []byte
+}
+
+func (e *DanmakuExchange) String() string {
+ return fmt.Sprintf("DanmakuExchange{length=%v, protocol=%v, operation=%v, body=%v}",
+ e.Length, e.ProtocolVer, e.Operation, e.Body)
+}
+
+const kHeaderLength = 16
+const kSequenceId = 1
+
+type ProtocolVer uint16
+
+const (
+ // ProtoPlainJson the body is plain JSON text
+ ProtoPlainJson ProtocolVer = 0
+ // ProtoMinimal the body is uint32 watcher count (big endian)
+ ProtoMinimal ProtocolVer = 1
+ // ProtoZlib the body is a zlib compressed package
+ ProtoZlib ProtocolVer = 2
+ // ProtoBrotli the body is a brotli compressed package
+ ProtoBrotli ProtocolVer = 3
+)
+
+const kUidGuest = 0
+const kPlatformWeb = "web"
+const kAuthTypeDefault = 2 // magic number, not sure what does it mean
+
+func (e *DanmakuExchange) Marshal() (data []byte, err error) {
+ var buffer bytes.Buffer
+ // only unpack header with struc, since it does not support indirect variable field length calculation
+ err = struc.Pack(&buffer, &e.DanmakuExchangeHeader)
+ if err != nil {
+ err = fmt.Errorf("cannot pack an exchange into binary form: %w", err)
+ return
+ }
+ data = buffer.Bytes()
+ data = append(data, e.Body...)
+ return
+}
+
+// Inflate decompresses the body if it is compressed
+func (e *DanmakuExchange) Inflate() (ret DanmakuExchange, err error) {
+ switch e.ProtocolVer {
+ case ProtoMinimal:
+ fallthrough
+ case ProtoPlainJson:
+ ret = *e
+ case ProtoBrotli:
+ var data []byte
+ rd := brotli.NewReader(bytes.NewReader(e.Body))
+ data, err = io.ReadAll(rd)
+ if err != nil {
+ err = fmt.Errorf("cannot decompress exchange body: %w", err)
+ return
+ }
+ var nestedExchange DanmakuExchange
+ nestedExchange, err = DecodeExchange(data)
+ if err != nil {
+ err = fmt.Errorf("cannot decode nested exchange: %w", err)
+ return
+ }
+ return nestedExchange.Inflate()
+ case ProtoZlib:
+ var data []byte
+ var rd io.ReadCloser
+ rd, err = zlib.NewReader(bytes.NewReader(e.Body))
+ if err != nil {
+ err = fmt.Errorf("cannot create zlib reader: %w", err)
+ return
+ }
+ data, err = io.ReadAll(rd)
+ if err != nil {
+ err = fmt.Errorf("cannot decompress exchange body: %w", err)
+ return
+ }
+ var nestedExchange DanmakuExchange
+ nestedExchange, err = DecodeExchange(data)
+ if err != nil {
+ err = fmt.Errorf("cannot decode nested exchange: %w", err)
+ return
+ }
+ return nestedExchange.Inflate()
+ }
+ return
+}
diff --git a/danmaku/dmpkg/ping.go b/danmaku/dmpkg/ping.go
new file mode 100644
index 0000000..4596d49
--- /dev/null
+++ b/danmaku/dmpkg/ping.go
@@ -0,0 +1,8 @@
+package dmpkg
+
+// NewPing construct a new PING exahange.
+func NewPing() (exc DanmakuExchange) {
+ // compliant with Bilibili webapp behavior
+ exc, _ = NewPlainExchange(OpHeartbeat, "[object Object]")
+ return
+}
diff --git a/danmaku/dmpkg/raw.go b/danmaku/dmpkg/raw.go
new file mode 100644
index 0000000..1cbd12e
--- /dev/null
+++ b/danmaku/dmpkg/raw.go
@@ -0,0 +1,47 @@
+package dmpkg
+
+import (
+ "encoding/json"
+ "fmt"
+ "math"
+)
+
+const kMaxBodyLength = math.MaxUint32 - kHeaderLength
+
+// NewPlainExchange creates a new exchange with raw body specified.
+// body: a struct or a raw string
+func NewPlainExchange(operation Operation, body interface{}) (exc DanmakuExchange, err error) {
+ var bodyData []byte
+
+ // convert body to []byte
+ if _, ok := body.(string); ok {
+ // a string
+ bodyData = []byte(body.(string))
+ } else if _, ok := body.([]byte); ok {
+ // a []byte
+ copy(bodyData, body.([]byte))
+ } else {
+ // a JSON struct
+ bodyData, err = json.Marshal(body)
+ if err != nil {
+ return
+ }
+ }
+
+ length := uint64(kHeaderLength + len(bodyData))
+ if length > kMaxBodyLength {
+ err = fmt.Errorf("body is too large (> %d)", kMaxBodyLength)
+ return
+ }
+ exc = DanmakuExchange{
+ DanmakuExchangeHeader: DanmakuExchangeHeader{
+ Length: uint32(length),
+ HeaderLength: kHeaderLength,
+ ProtocolVer: ProtoPlainJson,
+ Operation: operation,
+ SequenceId: kSequenceId,
+ },
+ Body: bodyData,
+ }
+ return
+}