From 8e15d802865ed57db0018c15ea5559c8bd44c01f Mon Sep 17 00:00:00 2001 From: Keuin Date: Wed, 7 Sep 2022 02:48:46 +0800 Subject: First working version. Just a POC. --- danmaku/client.go | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 danmaku/client.go (limited to 'danmaku/client.go') diff --git a/danmaku/client.go b/danmaku/client.go new file mode 100644 index 0000000..7745761 --- /dev/null +++ b/danmaku/client.go @@ -0,0 +1,136 @@ +/* +This file implements the background WebSocket messaging channel in Bilibili webui. +Server send livestream start and stop messages via this channel. +Note: In this file we manage the concrete WebSocket connection. +The Bilibili WebSocket channel protocol is decoupled and implemented in package `dmpkg`. +*/ +package danmaku + +import ( + "bilibili-livestream-archiver/common" + "bilibili-livestream-archiver/danmaku/dmpkg" + "context" + "fmt" + + "nhooyr.io/websocket" +) + +// Bilibili uses only binary WebSocket messages +const kBilibiliWebSocketMessageType = websocket.MessageBinary + +type DanmakuClient struct { + ws *websocket.Conn + wsio wsDatagramIO +} + +type DanmakuMessageType int + +// wsDatagramIO wraps websocket into a datagram I/O, +// since Bilibili uses only binary data, +// which is effectively a datagram communication. +type wsDatagramIO struct { + ws *websocket.Conn + ctx context.Context +} + +func (w *wsDatagramIO) Consume(data []byte) error { + return w.ws.Write(w.ctx, kBilibiliWebSocketMessageType, data) +} + +func (w *wsDatagramIO) Get() (data []byte, err error) { + typ, data, err := w.ws.Read(w.ctx) + if err != nil { + return + } + if typ != kBilibiliWebSocketMessageType { + err = fmt.Errorf("invalid message type: expected a binary WebSocket message, however got %v", typ.String()) + } + return +} + +func NewDanmakuClient() DanmakuClient { + return DanmakuClient{ + ws: nil, + } +} + +func (d *DanmakuClient) Connect(ctx context.Context, url string) error { + // thread unsafe + + // dial + if d.ws != nil { + return fmt.Errorf("already connected") + } + ws, _, err := websocket.Dial(ctx, url, nil) + if err != nil { + return fmt.Errorf("failed to establish WebSocket connection: %w", err) + } + d.ws = ws + + // init wsio + d.wsio = wsDatagramIO{ + ws: ws, + ctx: ctx, + } + + return nil +} + +func (d *DanmakuClient) Disconnect() error { + // thread unsafe + ws := d.ws + if ws == nil { + return nil + } + d.ws = nil + d.wsio = wsDatagramIO{} + return ws.Close(websocket.StatusInternalError, "disconnected") +} + +func (d *DanmakuClient) Authenticate(roomId common.RoomId, authKey string) error { + pkg := dmpkg.NewAuth(dmpkg.ProtoPlainJson, roomId, authKey) + data, err := pkg.Marshal() + if err != nil { + return fmt.Errorf("exchange marshal failed: %w", err) + } + err = d.wsio.Consume(data) + if err != nil { + return fmt.Errorf("channel write failed: %w", err) + } + // read server response + resp, err := d.wsio.Get() + if err != nil { + return err + } + respEx, err := dmpkg.DecodeExchange(resp) + if err != nil { + return fmt.Errorf("server danmaku exchange decode error: %w", err) + } + ok, err := dmpkg.IsAuthOk(respEx) + if !ok { + return fmt.Errorf("danmaku auth failed: %w", err) + } + return nil +} + +func (d *DanmakuClient) Heartbeat() error { + pkg := dmpkg.NewPing() + data, err := pkg.Marshal() + if err != nil { + return fmt.Errorf("exchange marshal failed: %w", err) + } + err = d.wsio.Consume(data) + if err != nil { + return fmt.Errorf("channel write failed: %w", err) + } + return nil +} + +// ReadExchange read and decode some kind of exchanges which we are interested +func (d *DanmakuClient) ReadExchange() (dmpkg.DanmakuExchange, error) { + data, err := d.wsio.Get() + if err != nil { + return dmpkg.DanmakuExchange{}, fmt.Errorf("failed to read danmaku datagram from server: %w", err) + } + return dmpkg.DecodeExchange(data) +} -- cgit v1.2.3