summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeuin <[email protected]>2022-09-07 02:48:46 +0800
committerKeuin <[email protected]>2022-09-07 02:48:46 +0800
commit8e15d802865ed57db0018c15ea5559c8bd44c01f (patch)
tree48f4632a1ad044bd7f7f8da3ebe2bb03ab4ca6fe
parent88234ca8fffc4e120adbe0d38071b625ad2f43c7 (diff)
First working version. Just a POC.
-rw-r--r--README.md29
-rw-r--r--bilibili/client.go45
-rw-r--r--bilibili/danmaku_server_info.go28
-rw-r--r--bilibili/errors.go7
-rw-r--r--bilibili/logging.go13
-rw-r--r--bilibili/model.go8
-rw-r--r--bilibili/play_url.go35
-rw-r--r--bilibili/request.go68
-rw-r--r--bilibili/room_profile.go86
-rw-r--r--bilibili/room_status.go58
-rw-r--r--bilibili/streaming.go66
-rw-r--r--bilibili/util.go13
-rw-r--r--common/orelse.go20
-rw-r--r--common/retry.go35
-rw-r--r--common/types.go3
-rw-r--r--common/urlparse.go21
-rw-r--r--danmaku/client.go136
-rw-r--r--danmaku/dmpkg/auth.go52
-rw-r--r--danmaku/dmpkg/decode.go44
-rw-r--r--danmaku/dmpkg/frameio.go15
-rw-r--r--danmaku/dmpkg/operation.go28
-rw-r--r--danmaku/dmpkg/package.go111
-rw-r--r--danmaku/dmpkg/ping.go8
-rw-r--r--danmaku/dmpkg/raw.go47
-rw-r--r--go.mod21
-rw-r--r--go.sum78
-rw-r--r--main.go30
-rw-r--r--recording/config.go20
-rw-r--r--recording/runner.go252
-rw-r--r--recording/watcher.go156
30 files changed, 1533 insertions, 0 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..c0f46b5
--- /dev/null
+++ b/README.md
@@ -0,0 +1,29 @@
+# YABR: Yet another Bilibili livestream recorder
+
+## 1. Features
+
+If you want a Bilibili recorder featured with:
+
+- Single executable file, just copy and run
+- Friendly command-line arguments and an optional configuration file
+- Save raw video streams directly, without intentional clipping
+- Efficient execution
+- Friendly logging to `stdout` or files
+
+Then you should give YABR a try.
+
+## 2. Why another Bilibili livestream recorder
+
+The other recorder sucks. So I write my own recorder, which, I hope, sucks less.
+
+## Reference
+
+https://github.com/lovelyyoshino/Bilibili-Live-API
+
+https://github.com/BililiveRecorder/BililiveRecorder
+
+https://github.com/SocialSisterYi/bilibili-API-collect
+
+https://github.com/hr3lxphr6j/bililive-go
+
+https://github.com/zyzsdy/biliroku \ No newline at end of file
diff --git a/bilibili/client.go b/bilibili/client.go
new file mode 100644
index 0000000..fcea395
--- /dev/null
+++ b/bilibili/client.go
@@ -0,0 +1,45 @@
+/*
+This file defines the Bilibili client struct.
+This struct is a facade of all necessary Bilibili HTTP API wrappers.
+*/
+package bilibili
+
+import (
+ "context"
+ "log"
+ "net/http"
+ "os"
+)
+
+const (
+ // kUserAgent: the default user-agent header to use when communicating with bilibili.
+ kUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " +
+ "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"
+)
+
+type Bilibili struct {
+ userAgent string
+ http *http.Client
+ loggerCommon
+ ctx context.Context
+}
+
+func NewBilibiliWithContext(ctx context.Context) Bilibili {
+ logger := loggerCommon{
+ debug: log.New(os.Stderr, "DEBUG: ", log.Ldate|log.Ltime|log.Lshortfile),
+ info: log.New(os.Stderr, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile),
+ warn: log.New(os.Stderr, "WARNING: ", log.Ldate|log.Ltime|log.Lshortfile),
+ error: log.New(os.Stderr, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile),
+ }
+ return Bilibili{
+ loggerCommon: logger,
+ userAgent: kUserAgent,
+ http: http.DefaultClient,
+ ctx: ctx,
+ }
+}
+
+func NewBilibili() Bilibili {
+ ctx := context.Background()
+ return NewBilibiliWithContext(ctx)
+}
diff --git a/bilibili/danmaku_server_info.go b/bilibili/danmaku_server_info.go
new file mode 100644
index 0000000..dbdf6bb
--- /dev/null
+++ b/bilibili/danmaku_server_info.go
@@ -0,0 +1,28 @@
+package bilibili
+
+import (
+ "bilibili-livestream-archiver/common"
+ "fmt"
+)
+
+type DanmakuServerInfoResponse = BaseResponse[danmakuInfo]
+
+type danmakuInfo struct {
+ Group string `json:"group"`
+ BusinessID int `json:"business_id"`
+ RefreshRowFactor float64 `json:"refresh_row_factor"`
+ RefreshRate int `json:"refresh_rate"`
+ MaxDelay int `json:"max_delay"`
+ Token string `json:"token"`
+ HostList []struct {
+ Host string `json:"host"`
+ Port int `json:"port"`
+ WssPort int `json:"wss_port"`
+ WsPort int `json:"ws_port"`
+ } `json:"host_list"`
+}
+
+func (b Bilibili) GetDanmakuServerInfo(roomId common.RoomId) (resp DanmakuServerInfoResponse, err error) {
+ url := fmt.Sprintf("https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo?id=%d&type=0", roomId)
+ return callGet[DanmakuServerInfoResponse](b, url)
+}
diff --git a/bilibili/errors.go b/bilibili/errors.go
new file mode 100644
index 0000000..3f8ac9b
--- /dev/null
+++ b/bilibili/errors.go
@@ -0,0 +1,7 @@
+package bilibili
+
+import "fmt"
+
+var (
+ ErrRoomIsClosed = fmt.Errorf("living room is closed")
+)
diff --git a/bilibili/logging.go b/bilibili/logging.go
new file mode 100644
index 0000000..a0b9b8e
--- /dev/null
+++ b/bilibili/logging.go
@@ -0,0 +1,13 @@
+/*
+This file defines the common struct of logger pointers used in modules of this package.
+*/
+package bilibili
+
+import "log"
+
+type loggerCommon struct {
+ debug *log.Logger
+ info *log.Logger
+ warn *log.Logger
+ error *log.Logger
+}
diff --git a/bilibili/model.go b/bilibili/model.go
new file mode 100644
index 0000000..ccffef9
--- /dev/null
+++ b/bilibili/model.go
@@ -0,0 +1,8 @@
+package bilibili
+
+type BaseResponse[T any] struct {
+ Code int `json:"code"`
+ Message string `json:"message"`
+ TTL int `json:"ttl"`
+ Data T `json:"data"`
+}
diff --git a/bilibili/play_url.go b/bilibili/play_url.go
new file mode 100644
index 0000000..282d556
--- /dev/null
+++ b/bilibili/play_url.go
@@ -0,0 +1,35 @@
+package bilibili
+
+import (
+ "bilibili-livestream-archiver/common"
+ "fmt"
+)
+
+type RoomUrlInfoResponse = BaseResponse[roomUrlInfo]
+
+type roomUrlInfo struct {
+ CurrentQuality int `json:"current_quality"`
+ AcceptQuality []string `json:"accept_quality"`
+ CurrentQualityNumber int `json:"current_qn"`
+ QualityDescription []qualityDescription `json:"quality_description"`
+ URLs []StreamingUrlInfo `json:"durl"`
+}
+
+type qualityDescription struct {
+ QualityNumber int `json:"qn"`
+ Description string `json:"desc"`
+}
+
+type StreamingUrlInfo struct {
+ URL string `json:"url"`
+ Length int `json:"length"`
+ Order int `json:"order"`
+ StreamType int `json:"stream_type"`
+ P2pType int `json:"p2p_type"`
+}
+
+func (b Bilibili) GetStreamingInfo(roomId common.RoomId) (resp RoomUrlInfoResponse, err error) {
+ url := fmt.Sprintf("https://api.live.bilibili.com/room/v1/Room/playUrl?"+
+ "cid=%d&otype=json&qn=10000&platform=web", roomId)
+ return callGet[RoomUrlInfoResponse](b, url)
+}
diff --git a/bilibili/request.go b/bilibili/request.go
new file mode 100644
index 0000000..cb989a7
--- /dev/null
+++ b/bilibili/request.go
@@ -0,0 +1,68 @@
+package bilibili
+
+import (
+ "encoding/json"
+ "io"
+ "log"
+ "net/http"
+ "strings"
+)
+
+// newRequest create an HTTP request with per-instance User-Agent set.
+func (b Bilibili) newRequest(
+ method string,
+ url string,
+ body io.Reader,
+) (req *http.Request, err error) {
+ req, err = http.NewRequestWithContext(b.ctx, method, url, body)
+ if err != nil {
+ b.error.Printf("Cannot create HTTP request instance: %v. Method: %v, URL: %v", err, method, url)
+ return
+ }
+ req.Header.Set("User-Agent", b.userAgent)
+ return
+}
+
+// newRequest create an HTTP GET request with an empty body and per-instance User-Agent set.
+func (b Bilibili) newGet(url string) (req *http.Request, err error) {
+ return b.newRequest("GET", url, strings.NewReader(""))
+}
+
+// callGet make a GET request and parse response as a JSON document with given model.
+func callGet[T BaseResponse[V], V any](b Bilibili, url string) (resp T, err error) {
+ logger := log.Default()
+ req, err := b.newGet(url)
+ if err != nil {
+ logger.Printf("ERROR: Cannot create HTTP request instance on API %v: %v", url, err)
+ return
+ }
+
+ r, err := b.http.Do(req)
+ defer func() { _ = r.Body.Close() }()
+ if err != nil {
+ logger.Printf("ERROR: HTTP Request failed on API %v: %v", url, err)
+ return
+ }
+
+ err = validateHttpStatus(r)
+ if err != nil {
+ b.error.Printf("%v", err)
+ return
+ }
+
+ data, err := io.ReadAll(r.Body)
+ if err != nil {
+ b.error.Printf("Error when reading HTTP response on API %v: %v", url, err)
+ return
+ }
+
+ err = json.Unmarshal(data, &resp)
+ if err != nil {
+ b.error.Printf("Invalid JSON body of HTTP response on API %v: %v. Text: \"%v\"",
+ url, err, string(data))
+ return
+ }
+
+ b.debug.Printf("HTTP %v, len: %v bytes, url: %v", r.StatusCode, len(data), url)
+ return
+}
diff --git a/bilibili/room_profile.go b/bilibili/room_profile.go
new file mode 100644
index 0000000..d75d447
--- /dev/null
+++ b/bilibili/room_profile.go
@@ -0,0 +1,86 @@
+package bilibili
+
+import (
+ "bilibili-livestream-archiver/common"
+ "fmt"
+)
+
+type roomProfile struct {
+ UID int `json:"uid"`
+ RoomID int `json:"room_id"`
+ ShortID int `json:"short_id"`
+ Attention int `json:"attention"`
+ Online int `json:"online"`
+ IsPortrait bool `json:"is_portrait"`
+ Description string `json:"description"`
+ LiveStatus int `json:"live_status"`
+ AreaID int `json:"area_id"`
+ ParentAreaID int `json:"parent_area_id"`
+ ParentAreaName string `json:"parent_area_name"`
+ OldAreaID int `json:"old_area_id"`
+ Background string `json:"background"`
+ Title string `json:"title"`
+ UserCover string `json:"user_cover"`
+ Keyframe string `json:"keyframe"`
+ IsStrictRoom bool `json:"is_strict_room"`
+ LiveTime string `json:"live_time"`
+ Tags string `json:"tags"`
+ IsAnchor int `json:"is_anchor"`
+ RoomSilentType string `json:"room_silent_type"`
+ RoomSilentLevel int `json:"room_silent_level"`
+ RoomSilentSecond int `json:"room_silent_second"`
+ AreaName string `json:"area_name"`
+ Pendants string `json:"pendants"`
+ AreaPendants string `json:"area_pendants"`
+ HotWords []string `json:"hot_words"`
+ HotWordsStatus int `json:"hot_words_status"`
+ Verify string `json:"verify"`
+ NewPendants struct {
+ Frame struct {
+ Name string `json:"name"`
+ Value string `json:"value"`
+ Position int `json:"position"`
+ Desc string `json:"desc"`
+ Area int `json:"area"`
+ AreaOld int `json:"area_old"`
+ BgColor string `json:"bg_color"`
+ BgPic string `json:"bg_pic"`
+ UseOldArea bool `json:"use_old_area"`
+ } `json:"frame"`
+ Badge struct {
+ Name string `json:"name"`
+ Position int `json:"position"`
+ Value string `json:"value"`
+ Desc string `json:"desc"`
+ } `json:"badge"`
+ MobileFrame struct {
+ Name string `json:"name"`
+ Value string `json:"value"`
+ Position int `json:"position"`
+ Desc string `json:"desc"`
+ Area int `json:"area"`
+ AreaOld int `json:"area_old"`
+ BgColor string `json:"bg_color"`
+ BgPic string `json:"bg_pic"`
+ UseOldArea bool `json:"use_old_area"`
+ } `json:"mobile_frame"`
+ MobileBadge interface{} `json:"mobile_badge"`
+ } `json:"new_pendants"`
+ UpSession string `json:"up_session"`
+ PkStatus int `json:"pk_status"`
+ PkID int `json:"pk_id"`
+ BattleID int `json:"battle_id"`
+ AllowChangeAreaTime int `json:"allow_change_area_time"`
+ AllowUploadCoverTime int `json:"allow_upload_cover_time"`
+ StudioInfo struct {
+ Status int `json:"status"`
+ MasterList []interface{} `json:"master_list"`
+ } `json:"studio_info"`
+}
+
+type RoomProfileResponse = BaseResponse[roomProfile]
+
+func (b Bilibili) GetRoomProfile(roomId common.RoomId) (resp RoomProfileResponse, err error) {
+ url := fmt.Sprintf("https://api.live.bilibili.com/room/v1/Room/get_info?room_id=%d", roomId)
+ return callGet[RoomProfileResponse](b, url)
+}
diff --git a/bilibili/room_status.go b/bilibili/room_status.go
new file mode 100644
index 0000000..6b6495b
--- /dev/null
+++ b/bilibili/room_status.go
@@ -0,0 +1,58 @@
+/*
+Get live room basic status.
+This is used to check initially if it is streaming or not.
+*/
+package bilibili
+
+import (
+ "bilibili-livestream-archiver/common"
+ "fmt"
+)
+
+type LiveStatus int
+
+const (
+ Inactive LiveStatus = 0
+ Streaming LiveStatus = 1
+ Playback LiveStatus = 2
+)
+
+var liveStatusStringMap = map[LiveStatus]string{
+ Inactive: "inactive",
+ Streaming: "streaming",
+ Playback: "inactive (playback)",
+}
+
+type roomPlayInfo struct {
+ RoomID uint64 `json:"room_id"`
+ ShortID uint `json:"short_id"`
+ UID uint `json:"uid"`
+ IsHidden bool `json:"is_hidden"`
+ IsLocked bool `json:"is_locked"`
+ IsPortrait bool `json:"is_portrait"`
+ LiveStatus LiveStatus `json:"live_status"` // 0: inactive 1: streaming 2: playback
+ HiddenTill int `json:"hidden_till"`
+ LockTill int `json:"lock_till"`
+ Encrypted bool `json:"encrypted"`
+ PwdVerified bool `json:"pwd_verified"`
+ LiveTime int `json:"live_time"`
+ RoomShield int `json:"room_shield"`
+ AllSpecialTypes []interface{} `json:"all_special_types"`
+ PlayurlInfo interface{} `json:"playurl_info"`
+}
+
+type RoomPlayInfoResponse = BaseResponse[roomPlayInfo]
+
+func (s LiveStatus) IsStreaming() bool {
+ return s == Streaming
+}
+
+func (s LiveStatus) String() string {
+ return liveStatusStringMap[s]
+}
+
+func (b Bilibili) GetRoomPlayInfo(roomId common.RoomId) (resp RoomPlayInfoResponse, err error) {
+ url := fmt.Sprintf("https://api.live.bilibili.com/xlive/web-room/v2/index/getRoomPlayInfo"+
+ "?room_id=%d&protocol=0,1&format=0,1,2&codec=0,1&qn=0&platform=web&ptype=8&dolby=5&panorama=1", roomId)
+ return callGet[RoomPlayInfoResponse](b, url)
+}
diff --git a/bilibili/streaming.go b/bilibili/streaming.go
new file mode 100644
index 0000000..cc915a3
--- /dev/null
+++ b/bilibili/streaming.go
@@ -0,0 +1,66 @@
+package bilibili
+
+import (
+ "bilibili-livestream-archiver/common"
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+)
+
+// CopyLiveStream read data from a livestream video stream, copy them to a writer.
+func (b Bilibili) CopyLiveStream(
+ ctx context.Context,
+ roomId common.RoomId,
+ stream StreamingUrlInfo,
+ out io.Writer,
+) (err error) {
+ url := stream.URL
+ if !strings.HasPrefix(url, "https://") &&
+ !strings.HasPrefix(url, "http://") {
+ return fmt.Errorf("invalid URL: %v", url)
+ }
+
+ r, err := b.newGet(url)
+ if err != nil {
+ b.error.Printf("Cannot create HTTP GET instance on %v: %v", url, err)
+ return err
+ }
+
+ r.Header.Set("Referer",
+ fmt.Sprintf("https://live.bilibili.com/blanc/%d?liteVersion=true", roomId))
+
+ resp, err := b.http.Do(r)
+ if err != nil {
+ b.error.Printf("Cannot make HTTP GET request on %v: %v\n", url, err)
+ return
+ }
+
+ // 404 when not streaming
+ if resp.StatusCode == http.StatusNotFound {
+ return ErrRoomIsClosed
+ }
+
+ err = validateHttpStatus(resp)
+ if err != nil {
+ return
+ }
+
+ defer func() { _ = resp.Body.Close() }()
+
+ // guard the following copy loop
+ // if the context is cancelled, stop it by closing the reader
+ guardianCtx, cancelGuardian := context.WithCancel(ctx)
+ go func() {
+ <-guardianCtx.Done()
+ _ = resp.Body.Close()
+ }()
+ defer cancelGuardian()
+
+ // blocking copy
+ n, err := io.Copy(out, resp.Body)
+
+ b.info.Printf("Bytes copied: %v", n)
+ return
+}
diff --git a/bilibili/util.go b/bilibili/util.go
new file mode 100644
index 0000000..1267a39
--- /dev/null
+++ b/bilibili/util.go
@@ -0,0 +1,13 @@
+package bilibili
+
+import (
+ "fmt"
+ "net/http"
+)
+
+func validateHttpStatus(r *http.Response) (err error) {
+ if code := r.StatusCode; code != http.StatusOK {
+ err = fmt.Errorf("unsuccessful HTTP status on API %v: %v", r.Request.URL, code)
+ }
+ return
+}
diff --git a/common/orelse.go b/common/orelse.go
new file mode 100644
index 0000000..a96bde7
--- /dev/null
+++ b/common/orelse.go
@@ -0,0 +1,20 @@
+package common
+
+type Opt[T any] struct {
+ thing T
+ err error
+}
+
+func Optional[T any](thing T, err error) Opt[T] {
+ return Opt[T]{
+ thing: thing,
+ err: err,
+ }
+}
+
+func (o Opt[T]) OrElse(thing T) T {
+ if o.err != nil {
+ return thing
+ }
+ return o.thing
+}
diff --git a/common/retry.go b/common/retry.go
new file mode 100644
index 0000000..6b97ff3
--- /dev/null
+++ b/common/retry.go
@@ -0,0 +1,35 @@
+package common
+
+import (
+ "log"
+ "time"
+)
+
+// AutoRetry retries the supplier automatically, with given time limit and interval.
+// If maximum retry time limit is reached and the supplier still fails,
+// the last error will be returned.
+// If logger is not nil, retry information will be printed to it.
+func AutoRetry[T any](
+ supplier func() (T, error),
+ maxRetryTimes int,
+ retryInterval time.Duration,
+ logger *log.Logger) (T, error) {
+ var err error
+ for i := 0; i < maxRetryTimes; i++ {
+ ret, err := supplier()
+ if err != nil {
+ if logger != nil {
+ logger.Printf("Try %v/%v (sleep %vs): %v\n",
+ i, maxRetryTimes, retryInterval, err)
+ }
+ time.Sleep(retryInterval)
+ continue
+ }
+ // success
+ return ret, nil
+ }
+ if logger != nil {
+ logger.Printf("Max retry times reached, but it still fails. Last error: %v", err)
+ }
+ return *new(T), err
+}
diff --git a/common/types.go b/common/types.go
new file mode 100644
index 0000000..81456b4
--- /dev/null
+++ b/common/types.go
@@ -0,0 +1,3 @@
+package common
+
+type RoomId uint64
diff --git a/common/urlparse.go b/common/urlparse.go
new file mode 100644
index 0000000..dc72cee
--- /dev/null
+++ b/common/urlparse.go
@@ -0,0 +1,21 @@
+package common
+
+import (
+ "errors"
+ "net/url"
+ "strings"
+)
+
+// GetFileExtensionFromUrl
+// copied from https://elisegev.medium.com/get-a-file-extension-from-a-url-in-golang-5061d4a298a
+func GetFileExtensionFromUrl(rawUrl string) (string, error) {
+ u, err := url.Parse(rawUrl)
+ if err != nil {
+ return "", err
+ }
+ pos := strings.LastIndex(u.Path, ".")
+ if pos == -1 {
+ return "", errors.New("couldn't find a period to indicate a file extension")
+ }
+ return u.Path[pos+1 : len(u.Path)], nil
+}
diff --git a/danmaku/client.go b/danmaku/client.go
new file mode 100644
index 0000000..7745761
--- /dev/null
+++ b/danmaku/client.go
@@ -0,0 +1,136 @@
+/*
+This file implements the background WebSocket messaging channel in Bilibili webui.
+Server send livestream start and stop messages via this channel.
+Note: In this file we manage the concrete WebSocket connection.
+The Bilibili WebSocket channel protocol is decoupled and implemented in package `dmpkg`.
+*/
+package danmaku
+
+import (
+ "bilibili-livestream-archiver/common"
+ "bilibili-livestream-archiver/danmaku/dmpkg"
+ "context"
+ "fmt"
+
+ "nhooyr.io/websocket"
+)
+
+// Bilibili uses only binary WebSocket messages
+const kBilibiliWebSocketMessageType = websocket.MessageBinary
+
+type DanmakuClient struct {
+ ws *websocket.Conn
+ wsio wsDatagramIO
+}
+
+type DanmakuMessageType int
+
+// wsDatagramIO wraps websocket into a datagram I/O,
+// since Bilibili uses only binary data,
+// which is effectively a datagram communication.
+type wsDatagramIO struct {
+ ws *websocket.Conn
+ ctx context.Context
+}
+
+func (w *wsDatagramIO) Consume(data []byte) error {
+ return w.ws.Write(w.ctx, kBilibiliWebSocketMessageType, data)
+}
+
+func (w *wsDatagramIO) Get() (data []byte, err error) {
+ typ, data, err := w.ws.Read(w.ctx)
+ if err != nil {
+ return
+ }
+ if typ != kBilibiliWebSocketMessageType {
+ err = fmt.Errorf("invalid message type: expected a binary WebSocket message, however got %v", typ.String())
+ }
+ return
+}
+
+func NewDanmakuClient() DanmakuClient {
+ return DanmakuClient{
+ ws: nil,
+ }
+}
+
+func (d *DanmakuClient) Connect(ctx context.Context, url string) error {
+ // thread unsafe
+
+ // dial
+ if d.ws != nil {
+ return fmt.Errorf("already connected")
+ }
+ ws, _, err := websocket.Dial(ctx, url, nil)
+ if err != nil {
+ return fmt.Errorf("failed to establish WebSocket connection: %w", err)
+ }
+ d.ws = ws
+
+ // init wsio
+ d.wsio = wsDatagramIO{
+ ws: ws,
+ ctx: ctx,
+ }
+
+ return nil
+}
+
+func (d *DanmakuClient) Disconnect() error {
+ // thread unsafe
+ ws := d.ws
+ if ws == nil {
+ return nil
+ }
+ d.ws = nil
+ d.wsio = wsDatagramIO{}
+ return ws.Close(websocket.StatusInternalError, "disconnected")
+}
+
+func (d *DanmakuClient) Authenticate(roomId common.RoomId, authKey string) error {
+ pkg := dmpkg.NewAuth(dmpkg.ProtoPlainJson, roomId, authKey)
+ data, err := pkg.Marshal()
+ if err != nil {
+ return fmt.Errorf("exchange marshal failed: %w", err)
+ }
+ err = d.wsio.Consume(data)
+ if err != nil {
+ return fmt.Errorf("channel write failed: %w", err)
+ }
+ // read server response
+ resp, err := d.wsio.Get()
+ if err != nil {
+ return err
+ }
+ respEx, err := dmpkg.DecodeExchange(resp)
+ if err != nil {
+ return fmt.Errorf("server danmaku exchange decode error: %w", err)
+ }
+ ok, err := dmpkg.IsAuthOk(respEx)
+ if !ok {
+ return fmt.Errorf("danmaku auth failed: %w", err)
+ }
+ return nil
+}
+
+func (d *DanmakuClient) Heartbeat() error {
+ pkg := dmpkg.NewPing()
+ data, err := pkg.Marshal()
+ if err != nil {
+ return fmt.Errorf("exchange marshal failed: %w", err)
+ }
+ err = d.wsio.Consume(data)
+ if err != nil {
+ return fmt.Errorf("channel write failed: %w", err)
+ }
+ return nil
+}
+
+// ReadExchange read and decode some kind of exchanges which we are interested
+func (d *DanmakuClient) ReadExchange() (dmpkg.DanmakuExchange, error) {
+ data, err := d.wsio.Get()
+ if err != nil {
+ return dmpkg.DanmakuExchange{}, fmt.Errorf("failed to read danmaku datagram from server: %w", err)
+ }
+ return dmpkg.DecodeExchange(data)
+}
diff --git a/danmaku/dmpkg/auth.go b/danmaku/dmpkg/auth.go
new file mode 100644
index 0000000..5a205ad
--- /dev/null
+++ b/danmaku/dmpkg/auth.go
@@ -0,0 +1,52 @@
+/*
+This file implements the auth exchange.
+When Bilibili live client established the WebSocket connection successfully,
+it sends this message at first. The server then responses a OpConnectOk exchange with body `{"code":0}` which indicates success.
+*/
+package dmpkg
+
+import (
+ "bilibili-livestream-archiver/common"
+ "encoding/json"
+ "fmt"
+)
+
+type authInfo struct {
+ UID uint64 `json:"uid"`
+ RoomId uint64 `json:"roomid"`
+ ProtoVer int `json:"protover"`
+ Platform string `json:"platform"`
+ Type int `json:"type"`
+ Key string `json:"key"`
+}
+
+// NewAuth creates a new authentication exchange.
+func NewAuth(protocol ProtocolVer, roomId common.RoomId, authKey string) (exc DanmakuExchange) {
+ exc, _ = NewPlainExchange(OpConnect, authInfo{
+ UID: kUidGuest,
+ RoomId: uint64(roomId),
+ ProtoVer: int(protocol),
+ Platform: kPlatformWeb,
+ Type: kAuthTypeDefault,
+ Key: authKey,
+ })
+ return
+}
+
+func IsAuthOk(serverResponse DanmakuExchange) (bool, error) {
+ if op := serverResponse.Operation; op != OpConnectOk {
+ return false, fmt.Errorf("server operation is not OpConnectOk: %w", op)
+ }
+ var body struct {
+ Code int `json:"code"`
+ }
+ body.Code = 1
+ err := json.Unmarshal(serverResponse.Body, &body)
+ if err != nil {
+ return false, fmt.Errorf("JSON decode error: %w", err)
+ }
+ if c := body.Code; c != 0 {
+ return false, fmt.Errorf("server response code is non-zero: %w", c)
+ }
+ return true, nil
+}
diff --git a/danmaku/dmpkg/decode.go b/danmaku/dmpkg/decode.go
new file mode 100644
index 0000000..7d9f796
--- /dev/null
+++ b/danmaku/dmpkg/decode.go
@@ -0,0 +1,44 @@
+package dmpkg
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/lunixbochs/struc"
+)
+
+func DecodeExchange(data []byte) (exc DanmakuExchange, err error) {
+ if ln := len(data); ln < kHeaderLength {
+ err = fmt.Errorf("incomplete datagram: length = %v < %v", ln, kHeaderLength)
+ return
+ }
+
+ // unpack header
+ var exchangeHeader DanmakuExchangeHeader
+ err = struc.Unpack(bytes.NewReader(data[:kHeaderLength]), &exchangeHeader)
+ if err != nil {
+ err = fmt.Errorf("cannot unpack exchange header: %w", err)
+ return
+ }
+ headerLength := exchangeHeader.HeaderLength
+
+ // validate header length, fail fast if not match
+ if headerLength != kHeaderLength {
+ err = fmt.Errorf("invalid header length, "+
+ "the protocol implementation might be obsolete: %v != %v", headerLength, kHeaderLength)
+ return
+ }
+
+ // special process
+ // TODO decouple this
+ // The server OpHeartbeatAck contains an extra 4-bytes header entry in the body, maybe a heat value
+ var body []byte
+ // copy body
+ body = make([]byte, exchangeHeader.Length-uint32(headerLength))
+ copy(body, data[headerLength:])
+
+ exc = DanmakuExchange{
+ DanmakuExchangeHeader: exchangeHeader,
+ Body: body,
+ }
+ return
+}
diff --git a/danmaku/dmpkg/frameio.go b/danmaku/dmpkg/frameio.go
new file mode 100644
index 0000000..2e7693a
--- /dev/null
+++ b/danmaku/dmpkg/frameio.go
@@ -0,0 +1,15 @@
+/*
+Since Bilibili only uses WebSocket binary message,
+the transportation is effectively a datagram channel.
+So we use a consumer-supplier abstraction to decouple
+the real protocol with WebSocket stuff.
+*/
+package dmpkg
+
+type Consumer[T any] interface {
+ Consume(value T) error
+}
+
+type Supplier[T any] interface {
+ Get() (T, error)
+}
diff --git a/danmaku/dmpkg/operation.go b/danmaku/dmpkg/operation.go
new file mode 100644
index 0000000..d3c13f4
--- /dev/null
+++ b/danmaku/dmpkg/operation.go
@@ -0,0 +1,28 @@
+package dmpkg
+
+type Operation uint32
+
+// Operations
+const (
+ OpHeartbeat Operation = 2
+ OpHeartbeatAck Operation = 3
+ OpLayer7Data Operation = 5
+ OpConnect Operation = 7
+ OpConnectOk Operation = 8
+)
+
+var opStringMap = map[Operation]string{
+ OpHeartbeat: "HeartBeat",
+ OpHeartbeatAck: "HeartBeatAck",
+ OpLayer7Data: "AppData",
+ OpConnect: "Connect",
+ OpConnectOk: "ConnectOk",
+}
+
+func (o Operation) String() string {
+ s, exists := opStringMap[o]
+ if !exists {
+ return "<Unknown>"
+ }
+ return s
+}
diff --git a/danmaku/dmpkg/package.go b/danmaku/dmpkg/package.go
new file mode 100644
index 0000000..3175a18
--- /dev/null
+++ b/danmaku/dmpkg/package.go
@@ -0,0 +1,111 @@
+package dmpkg
+
+import (
+ "bytes"
+ "compress/zlib"
+ "fmt"
+ "github.com/andybalholm/brotli"
+ "github.com/lunixbochs/struc"
+ "io"
+)
+
+type DanmakuExchangeHeader struct {
+ // Length total remaining bytes of this exchange, excluding `Length` itself
+ Length uint32
+ // HeaderLength = Length - len(Body) + 4, always equals to 16
+ HeaderLength uint16
+ ProtocolVer ProtocolVer
+ Operation Operation
+ // SequenceId is always 1
+ SequenceId uint32
+}
+
+// DanmakuExchange represents an actual message sent from client or server. This is an atomic unit.
+type DanmakuExchange struct {
+ DanmakuExchangeHeader
+ Body []byte
+}
+
+func (e *DanmakuExchange) String() string {
+ return fmt.Sprintf("DanmakuExchange{length=%v, protocol=%v, operation=%v, body=%v}",
+ e.Length, e.ProtocolVer, e.Operation, e.Body)
+}
+
+const kHeaderLength = 16
+const kSequenceId = 1
+
+type ProtocolVer uint16
+
+const (
+ // ProtoPlainJson the body is plain JSON text
+ ProtoPlainJson ProtocolVer = 0
+ // ProtoMinimal the body is uint32 watcher count (big endian)
+ ProtoMinimal ProtocolVer = 1
+ // ProtoZlib the body is a zlib compressed package
+ ProtoZlib ProtocolVer = 2
+ // ProtoBrotli the body is a brotli compressed package
+ ProtoBrotli ProtocolVer = 3
+)
+
+const kUidGuest = 0
+const kPlatformWeb = "web"
+const kAuthTypeDefault = 2 // magic number, not sure what does it mean
+
+func (e *DanmakuExchange) Marshal() (data []byte, err error) {
+ var buffer bytes.Buffer
+ // only unpack header with struc, since it does not support indirect variable field length calculation
+ err = struc.Pack(&buffer, &e.DanmakuExchangeHeader)
+ if err != nil {
+ err = fmt.Errorf("cannot pack an exchange into binary form: %w", err)
+ return
+ }
+ data = buffer.Bytes()
+ data = append(data, e.Body...)
+ return
+}
+
+// Inflate decompresses the body if it is compressed
+func (e *DanmakuExchange) Inflate() (ret DanmakuExchange, err error) {
+ switch e.ProtocolVer {
+ case ProtoMinimal:
+ fallthrough
+ case ProtoPlainJson:
+ ret = *e
+ case ProtoBrotli:
+ var data []byte
+ rd := brotli.NewReader(bytes.NewReader(e.Body))
+ data, err = io.ReadAll(rd)
+ if err != nil {
+ err = fmt.Errorf("cannot decompress exchange body: %w", err)
+ return
+ }
+ var nestedExchange DanmakuExchange
+ nestedExchange, err = DecodeExchange(data)
+ if err != nil {
+ err = fmt.Errorf("cannot decode nested exchange: %w", err)
+ return
+ }
+ return nestedExchange.Inflate()
+ case ProtoZlib:
+ var data []byte
+ var rd io.ReadCloser
+ rd, err = zlib.NewReader(bytes.NewReader(e.Body))
+ if err != nil {
+ err = fmt.Errorf("cannot create zlib reader: %w", err)
+ return
+ }
+ data, err = io.ReadAll(rd)
+ if err != nil {
+ err = fmt.Errorf("cannot decompress exchange body: %w", err)
+ return
+ }
+ var nestedExchange DanmakuExchange
+ nestedExchange, err = DecodeExchange(data)
+ if err != nil {
+ err = fmt.Errorf("cannot decode nested exchange: %w", err)
+ return
+ }
+ return nestedExchange.Inflate()
+ }
+ return
+}
diff --git a/danmaku/dmpkg/ping.go b/danmaku/dmpkg/ping.go
new file mode 100644
index 0000000..4596d49
--- /dev/null
+++ b/danmaku/dmpkg/ping.go
@@ -0,0 +1,8 @@
+package dmpkg
+
+// NewPing construct a new PING exahange.
+func NewPing() (exc DanmakuExchange) {
+ // compliant with Bilibili webapp behavior
+ exc, _ = NewPlainExchange(OpHeartbeat, "[object Object]")
+ return
+}
diff --git a/danmaku/dmpkg/raw.go b/danmaku/dmpkg/raw.go
new file mode 100644
index 0000000..1cbd12e
--- /dev/null
+++ b/danmaku/dmpkg/raw.go
@@ -0,0 +1,47 @@
+package dmpkg
+
+import (
+ "encoding/json"
+ "fmt"
+ "math"
+)
+
+const kMaxBodyLength = math.MaxUint32 - kHeaderLength
+
+// NewPlainExchange creates a new exchange with raw body specified.
+// body: a struct or a raw string
+func NewPlainExchange(operation Operation, body interface{}) (exc DanmakuExchange, err error) {
+ var bodyData []byte
+
+ // convert body to []byte
+ if _, ok := body.(string); ok {
+ // a string
+ bodyData = []byte(body.(string))
+ } else if _, ok := body.([]byte); ok {
+ // a []byte
+ copy(bodyData, body.([]byte))
+ } else {
+ // a JSON struct
+ bodyData, err = json.Marshal(body)
+ if err != nil {
+ return
+ }
+ }
+
+ length := uint64(kHeaderLength + len(bodyData))
+ if length > kMaxBodyLength {
+ err = fmt.Errorf("body is too large (> %d)", kMaxBodyLength)
+ return
+ }
+ exc = DanmakuExchange{
+ DanmakuExchangeHeader: DanmakuExchangeHeader{
+ Length: uint32(length),
+ HeaderLength: kHeaderLength,
+ ProtocolVer: ProtoPlainJson,
+ Operation: operation,
+ SequenceId: kSequenceId,
+ },
+ Body: bodyData,
+ }
+ return
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..b183d34
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,21 @@
+module bilibili-livestream-archiver
+
+go 1.18
+
+require (
+ github.com/andybalholm/brotli v1.0.4
+ github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40
+ nhooyr.io/websocket v1.8.7
+)
+
+require (
+ github.com/golang/protobuf v1.5.2 // indirect
+ github.com/google/go-cmp v0.5.8 // indirect
+ github.com/json-iterator/go v1.1.12 // indirect
+ github.com/klauspost/compress v1.10.3 // indirect
+ github.com/mattn/go-isatty v0.0.14 // indirect
+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
+ google.golang.org/protobuf v1.28.0 // indirect
+ gopkg.in/yaml.v2 v2.4.0 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..9220e21
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,78 @@
+github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
+github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
+github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
+github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
+github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
+github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
+github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
+github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
+github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
+github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
+github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
+github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
+github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
+github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
+github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
+github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
+github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
+github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
+github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
+github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
+github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
+github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
+github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8=
+github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
+github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
+github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
+github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40 h1:EnfXoSqDfSNJv0VBNqY/88RNnhSGYkrHaO0mmFGbVsc=
+github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40/go.mod h1:vy1vK6wD6j7xX6O6hXe621WabdtNkou2h7uRtTfRMyg=
+github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
+github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
+github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
+github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
+github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
+github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
+golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
+google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
+nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..1cf56ad
--- /dev/null
+++ b/main.go
@@ -0,0 +1,30 @@
+package main
+
+import (
+ "bilibili-livestream-archiver/recording"
+ "context"
+ "fmt"
+)
+
+func main() {
+ task := recording.TaskConfig{
+ RoomId: 7777,
+ Transport: recording.TransportConfig{
+ SocketTimeoutSeconds: 10,
+ RetryIntervalSeconds: 5,
+ MaxRetryTimes: 5,
+ },
+ Download: recording.DownloadConfig{
+ SaveDirectory: ".",
+ FileNameTemplate: "",
+ },
+ }
+ chResult := make(chan recording.TaskResult)
+ go recording.RunTask(
+ context.Background(),
+ &task,
+ chResult,
+ )
+ result := <-chResult
+ fmt.Println(result.Error)
+}
diff --git a/recording/config.go b/recording/config.go
new file mode 100644
index 0000000..1a24508
--- /dev/null
+++ b/recording/config.go
@@ -0,0 +1,20 @@
+package recording
+
+import "bilibili-livestream-archiver/common"
+
+type TaskConfig struct {
+ RoomId common.RoomId `mapstructure:"room_id"`
+ Transport TransportConfig `mapstructure:"transport"`
+ Download DownloadConfig `mapstructure:"download"`
+}
+
+type TransportConfig struct {
+ SocketTimeoutSeconds int `mapstructure:"socket_timeout_seconds"`
+ RetryIntervalSeconds int `mapstructure:"retry_interval_seconds"`
+ MaxRetryTimes int `mapstructure:"max_retry_times"`
+}
+
+type DownloadConfig struct {
+ SaveDirectory string `mapstructure:"save_directory"`
+ FileNameTemplate string `mapstructure:"file_name_template"`
+}
diff --git a/recording/runner.go b/recording/runner.go
new file mode 100644
index 0000000..cee4325
--- /dev/null
+++ b/recording/runner.go
@@ -0,0 +1,252 @@
+/*
+This file contains task runner.
+Task runner composes status monitor and stream downloader concrete task config.
+The config can be load from a config file.
+*/
+package recording
+
+import (
+ "bilibili-livestream-archiver/bilibili"
+ "bilibili-livestream-archiver/common"
+ "context"
+ "encoding/json"
+ "fmt"
+ "log"
+ "os"
+ "path"
+ "time"
+)
+
+// TaskResult represents an execution result of a task.
+type TaskResult struct {
+ Task *TaskConfig
+ Error error
+}
+
+// RunTask start a monitor&download task and
+// put its execution result into a channel.
+func RunTask(ctx context.Context, task *TaskConfig, chTaskResult chan<- TaskResult) {
+ err := doTask(ctx, task)
+ chTaskResult <- TaskResult{
+ Task: task,
+ Error: err,
+ }
+}
+
+// doTask do the actual work, but returns synchronously.
+func doTask(ctx context.Context, task *TaskConfig) error {
+ logger := log.Default()
+ bi := bilibili.NewBilibili()
+ logger.Printf("Start task: room %v\n", task.RoomId)
+
+ authKey, url, err := getStreamingServer(task, logger, bi)
+ if err != nil {
+ return err
+ }
+
+ // run live status watcher asynchronously
+ logger.Println("Starting watcher...")
+ chWatcherEvent := make(chan WatcherEvent)
+ chWatcherDown := make(chan struct{})
+
+ // start and recover watcher asynchronously
+ // the watcher may also be stopped by the downloader goroutine
+ watcherCtx, stopWatcher := context.WithCancel(ctx)
+ defer stopWatcher()
+ go watcherRecoverableLoop(watcherCtx, url, authKey, task, bi, chWatcherEvent, chWatcherDown)
+
+ // The stream download goroutine may fail due to wrong watcher state.
+ // But this is likely temporarily, so we should restart the downloader
+ // until the state turns to closed.
+
+ // We store the last modified live status
+ // in case there is a false-positive duplicate.
+ lastStatusIsLiving := false
+ recorderCtx, stopRecorder := context.WithCancel(ctx)
+ defer stopRecorder()
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Printf("Task (room %v) is stopped.\n", task.RoomId)
+ return nil
+ case <-chWatcherDown:
+ // watcher is down and unrecoverable, stop this task
+ return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", task.RoomId)
+ case ev := <-chWatcherEvent:
+ switch ev {
+ case WatcherLiveStart:
+ if lastStatusIsLiving {
+ logger.Println("Duplicate adjacent WatcherLiveStart event. Ignoring.")
+ continue
+ }
+ go func() {
+ cancelled := false
+ // restart recorder if interrupted by I/O errors
+ for !cancelled {
+ cancelled = record(recorderCtx, bi, task)
+ }
+ logger.Printf("Task is cancelled. (room %v)\n", task.RoomId)
+ }()
+ lastStatusIsLiving = true
+ case WatcherLiveStop:
+ lastStatusIsLiving = false
+ }
+ }
+ }
+}
+
+// record. When cancelled, the caller should clean up immediately and stop the task.
+func record(
+ ctx context.Context,
+ bi bilibili.Bilibili,
+ task *TaskConfig,
+) (cancelled bool) {
+ logger := log.Default()
+ logger.Printf("INFO: Getting room profile...\n")
+
+ profile, err := common.AutoRetry(
+ func() (bilibili.RoomProfileResponse, error) {
+ return bi.GetRoomProfile(task.RoomId)
+ },
+ task.Transport.MaxRetryTimes,
+ time.Duration(task.Transport.RetryIntervalSeconds)*time.Second,
+ logger,
+ )
+ if err != nil {
+ // still error, abort
+ logger.Printf("ERROR: Cannot get room information: %v. Stopping current task.\n", err)
+ cancelled = true
+ return
+ }
+
+ urlInfo, err := common.AutoRetry(
+ func() (bilibili.RoomUrlInfoResponse, error) {
+ return bi.GetStreamingInfo(task.RoomId)
+ },
+ task.Transport.MaxRetryTimes,
+ time.Duration(task.Transport.RetryIntervalSeconds)*time.Second,
+ logger,
+ )
+ if err != nil {
+ logger.Printf("ERROR: Cannot get streaming info: %v", err)
+ cancelled = true
+ return
+ }
+ if len(urlInfo.Data.URLs) == 0 {
+ j, err := json.Marshal(urlInfo)
+ if err != nil {
+ j = []byte("(not available)")
+ }
+ logger.Printf("ERROR: No stream returned from API. Response: %v", string(j))
+ cancelled = true
+ return
+ }
+ streamSource := urlInfo.Data.URLs[0]
+
+ fileName := fmt.Sprintf(
+ "%s.%s",
+ GenerateFileName(profile.Data.Title, time.Now()),
+ common.Optional[string](common.GetFileExtensionFromUrl(streamSource.URL)).OrElse("flv"),
+ )
+ filePath := path.Join(task.Download.SaveDirectory, fileName)
+ file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
+ defer func() { _ = file.Close() }()
+ if err != nil {
+ logger.Printf("ERROR: Cannot open file for writing: %v", err)
+ cancelled = true
+ return
+ }
+
+ logger.Printf("Recording live stream to file \"%v\"...", filePath)
+ err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file)
+ cancelled = false
+ return
+}
+
+// watcherRecoverableLoop run watcher forever until the context is cancelled.
+func watcherRecoverableLoop(
+ ctx context.Context,
+ url string,
+ authKey string,
+ task *TaskConfig,
+ bi bilibili.Bilibili,
+ chWatcherEvent chan WatcherEvent,
+ chWatcherDown chan<- struct{},
+) {
+ logger := log.Default()
+
+ for {
+ err, errReason := watch(
+ ctx,
+ url,
+ authKey,
+ task.RoomId,
+ func() (bool, error) {
+ resp, err := bi.GetRoomPlayInfo(task.RoomId)
+ if err != nil {
+ return false, err
+ }
+ if resp.Code != 0 {
+ return false, fmt.Errorf("bilibili API error: %v", resp.Message)
+ }
+ return resp.Data.LiveStatus.IsStreaming(), nil
+ },
+ chWatcherEvent,
+ )
+
+ switch errReason {
+ case ErrSuccess:
+ // stop normally, the context is closed
+ return
+ case ErrProtocol:
+ logger.Printf("FATAL: Watcher stopped due to an unrecoverable error: %v\n", err)
+ // shutdown the whole task
+ chWatcherDown <- struct{}{}
+ return
+ case ErrTransport:
+ logger.Printf("ERROR: Watcher stopped due to an I/O error: %v\n", err)
+ waitSeconds := task.Transport.RetryIntervalSeconds
+ logger.Printf(
+ "WARNING: Sleep for %v second(s) before restarting watcher.\n",
+ waitSeconds,
+ )
+ time.Sleep(time.Duration(waitSeconds) * time.Second)
+ logger.Printf("Retrying...")
+ }
+ }
+}
+
+func getStreamingServer(
+ task *TaskConfig,
+ logger *log.Logger,
+ bi bilibili.Bilibili,
+) (string, string, error) {
+ logger.Println("Getting stream server info...")
+ dmInfo, err := bi.GetDanmakuServerInfo(task.RoomId)
+ if err != nil {
+ return "", "", fmt.Errorf("failed to read stream server info: %w", err)
+ }
+ if len(dmInfo.Data.HostList) == 0 {
+ return "", "", fmt.Errorf("no available stream server")
+ }
+ logger.Println("Success.")
+
+ // get authkey and ws url
+ authKey := dmInfo.Data.Token
+ host := dmInfo.Data.HostList[0]
+ url := fmt.Sprintf("wss://%s:%d/sub", host.Host, host.WssPort)
+ return authKey, url, nil
+}
+
+func GenerateFileName(roomName string, t time.Time) string {
+ ts := fmt.Sprintf(
+ "%d-%02d-%02d-%02d-%02d-%02d",
+ t.Year(),
+ t.Month(),
+ t.Day(),
+ t.Hour(),
+ t.Minute(),
+ t.Second(),
+ )
+ return fmt.Sprintf("%s_%s", roomName, ts)
+}
diff --git a/recording/watcher.go b/recording/watcher.go
new file mode 100644
index 0000000..439ffcb
--- /dev/null
+++ b/recording/watcher.go
@@ -0,0 +1,156 @@
+package recording
+
+import (
+ "bilibili-livestream-archiver/common"
+ "bilibili-livestream-archiver/danmaku"
+ "bilibili-livestream-archiver/danmaku/dmpkg"
+ "context"
+ "encoding/json"
+ "fmt"
+ "log"
+ "time"
+)
+
+type WatcherEvent int
+
+const (
+ WatcherLiveStart WatcherEvent = 0
+ WatcherLiveStop WatcherEvent = 1
+)
+
+type liveCommand string
+
+const (
+ CommandLiveStart = "LIVE"
+ CommandStreamPreparing = "PREPARING"
+)
+
+type liveInfo struct {
+ Command liveCommand `json:"cmd"`
+}
+
+type ErrorReason int
+
+const (
+ ErrSuccess ErrorReason = iota // no error happens, normally closed
+ ErrTransport // I/O error, safe to retry
+ ErrProtocol // application protocol logic error, do not retry
+)
+
+const (
+ kHeartBeatInterval = 30 * time.Second
+)
+
+// watch monitors live room status by subscribing messages from Bilibili danmaku server,
+// which talks to the client via a WebSocket or TCP connection.
+// In our implementation, we use WebSocket over SSL/TLS.
+func watch(
+ ctx context.Context,
+ url string,
+ authKey string,
+ roomId common.RoomId,
+ liveStatusChecker func() (bool, error),
+ chEvent chan<- WatcherEvent,
+) (error, ErrorReason) {
+
+ logger := log.Default()
+
+ var err error
+
+ dm := danmaku.NewDanmakuClient()
+ defer func() { _ = dm.Disconnect() }()
+
+ // connect to danmaku server for live online/offline notifications
+ err = dm.Connect(ctx, url)
+ if err != nil {
+ return fmt.Errorf("failed to connect to danmaku server: %w", err), ErrTransport
+ }
+ defer func() { _ = dm.Disconnect() }()
+
+ // the danmaku server requires an auth token and room id when connected
+ logger.Println("ws connected. Authenticating...")
+ err = dm.Authenticate(roomId, authKey)
+ if err != nil {
+ return fmt.Errorf("auth failed: %w", err), ErrProtocol
+ }
+
+ // the danmaku server requires heartbeat messages every 30 seconds
+ heartbeat := func() error {
+ err := dm.Heartbeat()
+ return err
+ }
+
+ // send initial heartbeat immediately
+ err = heartbeat()
+ if err != nil {
+ return err, ErrTransport
+ }
+
+ // create heartbeat timer
+ heartBeatTimer := time.NewTicker(kHeartBeatInterval)
+ defer func() { heartBeatTimer.Stop() }()
+
+ logger.Println("Checking initial live status...")
+ isLiving, err := liveStatusChecker()
+ if err != nil {
+ return fmt.Errorf("check initial live status failed: %w", err), ErrTransport
+ }
+
+ if isLiving {
+ logger.Println("The live is already started. Start recording immediately.")
+ chEvent <- WatcherLiveStart
+ } else {
+ logger.Println("The live is not started yet. Waiting...")
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, ErrSuccess
+ case <-heartBeatTimer.C:
+ err = heartbeat()
+ if err != nil {
+ return fmt.Errorf("heartbeat failed: %w", err), ErrTransport
+ }
+ default:
+ var msg dmpkg.DanmakuExchange
+ msg, err = dm.ReadExchange()
+ if err != nil {
+ return fmt.Errorf("exchange read failed: %w", err), ErrTransport
+ }
+ // the exchange may be compressed
+ msg, err = msg.Inflate()
+ if err != nil {
+ return fmt.Errorf("inflate server message failed: %v", err), ErrProtocol
+ }
+
+ switch msg.Operation {
+ case dmpkg.OpLayer7Data:
+ logger.Printf("server message: op %v, body %v\n", msg.Operation, string(msg.Body))
+ var info liveInfo
+ err := json.Unmarshal(msg.Body, &info)
+ if err != nil {
+ logger.Printf("ERROR: invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg)
+ return fmt.Errorf("decode server message body JSON failed: %w", err), ErrProtocol
+ }
+ switch info.Command {
+ case CommandLiveStart:
+ if !isLiving {
+ chEvent <- WatcherLiveStart
+ isLiving = true
+ }
+ case CommandStreamPreparing:
+ if isLiving {
+ chEvent <- WatcherLiveStop
+ }
+ default:
+ logger.Printf("Ignoring server message %v %v %v\n",
+ info.Command, msg.Operation, string(msg.Body))
+ }
+ default:
+ logger.Printf("Server message: %v\n", msg.String())
+ }
+
+ }
+ }
+}