From 8e15d802865ed57db0018c15ea5559c8bd44c01f Mon Sep 17 00:00:00 2001 From: Keuin Date: Wed, 7 Sep 2022 02:48:46 +0800 Subject: First working version. Just a POC. --- README.md | 29 +++++ bilibili/client.go | 45 +++++++ bilibili/danmaku_server_info.go | 28 +++++ bilibili/errors.go | 7 ++ bilibili/logging.go | 13 +++ bilibili/model.go | 8 ++ bilibili/play_url.go | 35 ++++++ bilibili/request.go | 68 +++++++++++ bilibili/room_profile.go | 86 ++++++++++++++ bilibili/room_status.go | 58 +++++++++ bilibili/streaming.go | 66 +++++++++++ bilibili/util.go | 13 +++ common/orelse.go | 20 ++++ common/retry.go | 35 ++++++ common/types.go | 3 + common/urlparse.go | 21 ++++ danmaku/client.go | 136 ++++++++++++++++++++++ danmaku/dmpkg/auth.go | 52 +++++++++ danmaku/dmpkg/decode.go | 44 +++++++ danmaku/dmpkg/frameio.go | 15 +++ danmaku/dmpkg/operation.go | 28 +++++ danmaku/dmpkg/package.go | 111 ++++++++++++++++++ danmaku/dmpkg/ping.go | 8 ++ danmaku/dmpkg/raw.go | 47 ++++++++ go.mod | 21 ++++ go.sum | 78 +++++++++++++ main.go | 30 +++++ recording/config.go | 20 ++++ recording/runner.go | 252 ++++++++++++++++++++++++++++++++++++++++ recording/watcher.go | 156 +++++++++++++++++++++++++ 30 files changed, 1533 insertions(+) create mode 100644 README.md create mode 100644 bilibili/client.go create mode 100644 bilibili/danmaku_server_info.go create mode 100644 bilibili/errors.go create mode 100644 bilibili/logging.go create mode 100644 bilibili/model.go create mode 100644 bilibili/play_url.go create mode 100644 bilibili/request.go create mode 100644 bilibili/room_profile.go create mode 100644 bilibili/room_status.go create mode 100644 bilibili/streaming.go create mode 100644 bilibili/util.go create mode 100644 common/orelse.go create mode 100644 common/retry.go create mode 100644 common/types.go create mode 100644 common/urlparse.go create mode 100644 danmaku/client.go create mode 100644 danmaku/dmpkg/auth.go create mode 100644 danmaku/dmpkg/decode.go create mode 100644 danmaku/dmpkg/frameio.go create mode 100644 danmaku/dmpkg/operation.go create mode 100644 danmaku/dmpkg/package.go create mode 100644 danmaku/dmpkg/ping.go create mode 100644 danmaku/dmpkg/raw.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 recording/config.go create mode 100644 recording/runner.go create mode 100644 recording/watcher.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..c0f46b5 --- /dev/null +++ b/README.md @@ -0,0 +1,29 @@ +# YABR: Yet another Bilibili livestream recorder + +## 1. Features + +If you want a Bilibili recorder featured with: + +- Single executable file, just copy and run +- Friendly command-line arguments and an optional configuration file +- Save raw video streams directly, without intentional clipping +- Efficient execution +- Friendly logging to `stdout` or files + +Then you should give YABR a try. + +## 2. Why another Bilibili livestream recorder + +The other recorder sucks. So I write my own recorder, which, I hope, sucks less. + +## Reference + +https://github.com/lovelyyoshino/Bilibili-Live-API + +https://github.com/BililiveRecorder/BililiveRecorder + +https://github.com/SocialSisterYi/bilibili-API-collect + +https://github.com/hr3lxphr6j/bililive-go + +https://github.com/zyzsdy/biliroku \ No newline at end of file diff --git a/bilibili/client.go b/bilibili/client.go new file mode 100644 index 0000000..fcea395 --- /dev/null +++ b/bilibili/client.go @@ -0,0 +1,45 @@ +/* +This file defines the Bilibili client struct. +This struct is a facade of all necessary Bilibili HTTP API wrappers. +*/ +package bilibili + +import ( + "context" + "log" + "net/http" + "os" +) + +const ( + // kUserAgent: the default user-agent header to use when communicating with bilibili. + kUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36" +) + +type Bilibili struct { + userAgent string + http *http.Client + loggerCommon + ctx context.Context +} + +func NewBilibiliWithContext(ctx context.Context) Bilibili { + logger := loggerCommon{ + debug: log.New(os.Stderr, "DEBUG: ", log.Ldate|log.Ltime|log.Lshortfile), + info: log.New(os.Stderr, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile), + warn: log.New(os.Stderr, "WARNING: ", log.Ldate|log.Ltime|log.Lshortfile), + error: log.New(os.Stderr, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile), + } + return Bilibili{ + loggerCommon: logger, + userAgent: kUserAgent, + http: http.DefaultClient, + ctx: ctx, + } +} + +func NewBilibili() Bilibili { + ctx := context.Background() + return NewBilibiliWithContext(ctx) +} diff --git a/bilibili/danmaku_server_info.go b/bilibili/danmaku_server_info.go new file mode 100644 index 0000000..dbdf6bb --- /dev/null +++ b/bilibili/danmaku_server_info.go @@ -0,0 +1,28 @@ +package bilibili + +import ( + "bilibili-livestream-archiver/common" + "fmt" +) + +type DanmakuServerInfoResponse = BaseResponse[danmakuInfo] + +type danmakuInfo struct { + Group string `json:"group"` + BusinessID int `json:"business_id"` + RefreshRowFactor float64 `json:"refresh_row_factor"` + RefreshRate int `json:"refresh_rate"` + MaxDelay int `json:"max_delay"` + Token string `json:"token"` + HostList []struct { + Host string `json:"host"` + Port int `json:"port"` + WssPort int `json:"wss_port"` + WsPort int `json:"ws_port"` + } `json:"host_list"` +} + +func (b Bilibili) GetDanmakuServerInfo(roomId common.RoomId) (resp DanmakuServerInfoResponse, err error) { + url := fmt.Sprintf("https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo?id=%d&type=0", roomId) + return callGet[DanmakuServerInfoResponse](b, url) +} diff --git a/bilibili/errors.go b/bilibili/errors.go new file mode 100644 index 0000000..3f8ac9b --- /dev/null +++ b/bilibili/errors.go @@ -0,0 +1,7 @@ +package bilibili + +import "fmt" + +var ( + ErrRoomIsClosed = fmt.Errorf("living room is closed") +) diff --git a/bilibili/logging.go b/bilibili/logging.go new file mode 100644 index 0000000..a0b9b8e --- /dev/null +++ b/bilibili/logging.go @@ -0,0 +1,13 @@ +/* +This file defines the common struct of logger pointers used in modules of this package. +*/ +package bilibili + +import "log" + +type loggerCommon struct { + debug *log.Logger + info *log.Logger + warn *log.Logger + error *log.Logger +} diff --git a/bilibili/model.go b/bilibili/model.go new file mode 100644 index 0000000..ccffef9 --- /dev/null +++ b/bilibili/model.go @@ -0,0 +1,8 @@ +package bilibili + +type BaseResponse[T any] struct { + Code int `json:"code"` + Message string `json:"message"` + TTL int `json:"ttl"` + Data T `json:"data"` +} diff --git a/bilibili/play_url.go b/bilibili/play_url.go new file mode 100644 index 0000000..282d556 --- /dev/null +++ b/bilibili/play_url.go @@ -0,0 +1,35 @@ +package bilibili + +import ( + "bilibili-livestream-archiver/common" + "fmt" +) + +type RoomUrlInfoResponse = BaseResponse[roomUrlInfo] + +type roomUrlInfo struct { + CurrentQuality int `json:"current_quality"` + AcceptQuality []string `json:"accept_quality"` + CurrentQualityNumber int `json:"current_qn"` + QualityDescription []qualityDescription `json:"quality_description"` + URLs []StreamingUrlInfo `json:"durl"` +} + +type qualityDescription struct { + QualityNumber int `json:"qn"` + Description string `json:"desc"` +} + +type StreamingUrlInfo struct { + URL string `json:"url"` + Length int `json:"length"` + Order int `json:"order"` + StreamType int `json:"stream_type"` + P2pType int `json:"p2p_type"` +} + +func (b Bilibili) GetStreamingInfo(roomId common.RoomId) (resp RoomUrlInfoResponse, err error) { + url := fmt.Sprintf("https://api.live.bilibili.com/room/v1/Room/playUrl?"+ + "cid=%d&otype=json&qn=10000&platform=web", roomId) + return callGet[RoomUrlInfoResponse](b, url) +} diff --git a/bilibili/request.go b/bilibili/request.go new file mode 100644 index 0000000..cb989a7 --- /dev/null +++ b/bilibili/request.go @@ -0,0 +1,68 @@ +package bilibili + +import ( + "encoding/json" + "io" + "log" + "net/http" + "strings" +) + +// newRequest create an HTTP request with per-instance User-Agent set. +func (b Bilibili) newRequest( + method string, + url string, + body io.Reader, +) (req *http.Request, err error) { + req, err = http.NewRequestWithContext(b.ctx, method, url, body) + if err != nil { + b.error.Printf("Cannot create HTTP request instance: %v. Method: %v, URL: %v", err, method, url) + return + } + req.Header.Set("User-Agent", b.userAgent) + return +} + +// newRequest create an HTTP GET request with an empty body and per-instance User-Agent set. +func (b Bilibili) newGet(url string) (req *http.Request, err error) { + return b.newRequest("GET", url, strings.NewReader("")) +} + +// callGet make a GET request and parse response as a JSON document with given model. +func callGet[T BaseResponse[V], V any](b Bilibili, url string) (resp T, err error) { + logger := log.Default() + req, err := b.newGet(url) + if err != nil { + logger.Printf("ERROR: Cannot create HTTP request instance on API %v: %v", url, err) + return + } + + r, err := b.http.Do(req) + defer func() { _ = r.Body.Close() }() + if err != nil { + logger.Printf("ERROR: HTTP Request failed on API %v: %v", url, err) + return + } + + err = validateHttpStatus(r) + if err != nil { + b.error.Printf("%v", err) + return + } + + data, err := io.ReadAll(r.Body) + if err != nil { + b.error.Printf("Error when reading HTTP response on API %v: %v", url, err) + return + } + + err = json.Unmarshal(data, &resp) + if err != nil { + b.error.Printf("Invalid JSON body of HTTP response on API %v: %v. Text: \"%v\"", + url, err, string(data)) + return + } + + b.debug.Printf("HTTP %v, len: %v bytes, url: %v", r.StatusCode, len(data), url) + return +} diff --git a/bilibili/room_profile.go b/bilibili/room_profile.go new file mode 100644 index 0000000..d75d447 --- /dev/null +++ b/bilibili/room_profile.go @@ -0,0 +1,86 @@ +package bilibili + +import ( + "bilibili-livestream-archiver/common" + "fmt" +) + +type roomProfile struct { + UID int `json:"uid"` + RoomID int `json:"room_id"` + ShortID int `json:"short_id"` + Attention int `json:"attention"` + Online int `json:"online"` + IsPortrait bool `json:"is_portrait"` + Description string `json:"description"` + LiveStatus int `json:"live_status"` + AreaID int `json:"area_id"` + ParentAreaID int `json:"parent_area_id"` + ParentAreaName string `json:"parent_area_name"` + OldAreaID int `json:"old_area_id"` + Background string `json:"background"` + Title string `json:"title"` + UserCover string `json:"user_cover"` + Keyframe string `json:"keyframe"` + IsStrictRoom bool `json:"is_strict_room"` + LiveTime string `json:"live_time"` + Tags string `json:"tags"` + IsAnchor int `json:"is_anchor"` + RoomSilentType string `json:"room_silent_type"` + RoomSilentLevel int `json:"room_silent_level"` + RoomSilentSecond int `json:"room_silent_second"` + AreaName string `json:"area_name"` + Pendants string `json:"pendants"` + AreaPendants string `json:"area_pendants"` + HotWords []string `json:"hot_words"` + HotWordsStatus int `json:"hot_words_status"` + Verify string `json:"verify"` + NewPendants struct { + Frame struct { + Name string `json:"name"` + Value string `json:"value"` + Position int `json:"position"` + Desc string `json:"desc"` + Area int `json:"area"` + AreaOld int `json:"area_old"` + BgColor string `json:"bg_color"` + BgPic string `json:"bg_pic"` + UseOldArea bool `json:"use_old_area"` + } `json:"frame"` + Badge struct { + Name string `json:"name"` + Position int `json:"position"` + Value string `json:"value"` + Desc string `json:"desc"` + } `json:"badge"` + MobileFrame struct { + Name string `json:"name"` + Value string `json:"value"` + Position int `json:"position"` + Desc string `json:"desc"` + Area int `json:"area"` + AreaOld int `json:"area_old"` + BgColor string `json:"bg_color"` + BgPic string `json:"bg_pic"` + UseOldArea bool `json:"use_old_area"` + } `json:"mobile_frame"` + MobileBadge interface{} `json:"mobile_badge"` + } `json:"new_pendants"` + UpSession string `json:"up_session"` + PkStatus int `json:"pk_status"` + PkID int `json:"pk_id"` + BattleID int `json:"battle_id"` + AllowChangeAreaTime int `json:"allow_change_area_time"` + AllowUploadCoverTime int `json:"allow_upload_cover_time"` + StudioInfo struct { + Status int `json:"status"` + MasterList []interface{} `json:"master_list"` + } `json:"studio_info"` +} + +type RoomProfileResponse = BaseResponse[roomProfile] + +func (b Bilibili) GetRoomProfile(roomId common.RoomId) (resp RoomProfileResponse, err error) { + url := fmt.Sprintf("https://api.live.bilibili.com/room/v1/Room/get_info?room_id=%d", roomId) + return callGet[RoomProfileResponse](b, url) +} diff --git a/bilibili/room_status.go b/bilibili/room_status.go new file mode 100644 index 0000000..6b6495b --- /dev/null +++ b/bilibili/room_status.go @@ -0,0 +1,58 @@ +/* +Get live room basic status. +This is used to check initially if it is streaming or not. +*/ +package bilibili + +import ( + "bilibili-livestream-archiver/common" + "fmt" +) + +type LiveStatus int + +const ( + Inactive LiveStatus = 0 + Streaming LiveStatus = 1 + Playback LiveStatus = 2 +) + +var liveStatusStringMap = map[LiveStatus]string{ + Inactive: "inactive", + Streaming: "streaming", + Playback: "inactive (playback)", +} + +type roomPlayInfo struct { + RoomID uint64 `json:"room_id"` + ShortID uint `json:"short_id"` + UID uint `json:"uid"` + IsHidden bool `json:"is_hidden"` + IsLocked bool `json:"is_locked"` + IsPortrait bool `json:"is_portrait"` + LiveStatus LiveStatus `json:"live_status"` // 0: inactive 1: streaming 2: playback + HiddenTill int `json:"hidden_till"` + LockTill int `json:"lock_till"` + Encrypted bool `json:"encrypted"` + PwdVerified bool `json:"pwd_verified"` + LiveTime int `json:"live_time"` + RoomShield int `json:"room_shield"` + AllSpecialTypes []interface{} `json:"all_special_types"` + PlayurlInfo interface{} `json:"playurl_info"` +} + +type RoomPlayInfoResponse = BaseResponse[roomPlayInfo] + +func (s LiveStatus) IsStreaming() bool { + return s == Streaming +} + +func (s LiveStatus) String() string { + return liveStatusStringMap[s] +} + +func (b Bilibili) GetRoomPlayInfo(roomId common.RoomId) (resp RoomPlayInfoResponse, err error) { + url := fmt.Sprintf("https://api.live.bilibili.com/xlive/web-room/v2/index/getRoomPlayInfo"+ + "?room_id=%d&protocol=0,1&format=0,1,2&codec=0,1&qn=0&platform=web&ptype=8&dolby=5&panorama=1", roomId) + return callGet[RoomPlayInfoResponse](b, url) +} diff --git a/bilibili/streaming.go b/bilibili/streaming.go new file mode 100644 index 0000000..cc915a3 --- /dev/null +++ b/bilibili/streaming.go @@ -0,0 +1,66 @@ +package bilibili + +import ( + "bilibili-livestream-archiver/common" + "context" + "fmt" + "io" + "net/http" + "strings" +) + +// CopyLiveStream read data from a livestream video stream, copy them to a writer. +func (b Bilibili) CopyLiveStream( + ctx context.Context, + roomId common.RoomId, + stream StreamingUrlInfo, + out io.Writer, +) (err error) { + url := stream.URL + if !strings.HasPrefix(url, "https://") && + !strings.HasPrefix(url, "http://") { + return fmt.Errorf("invalid URL: %v", url) + } + + r, err := b.newGet(url) + if err != nil { + b.error.Printf("Cannot create HTTP GET instance on %v: %v", url, err) + return err + } + + r.Header.Set("Referer", + fmt.Sprintf("https://live.bilibili.com/blanc/%d?liteVersion=true", roomId)) + + resp, err := b.http.Do(r) + if err != nil { + b.error.Printf("Cannot make HTTP GET request on %v: %v\n", url, err) + return + } + + // 404 when not streaming + if resp.StatusCode == http.StatusNotFound { + return ErrRoomIsClosed + } + + err = validateHttpStatus(resp) + if err != nil { + return + } + + defer func() { _ = resp.Body.Close() }() + + // guard the following copy loop + // if the context is cancelled, stop it by closing the reader + guardianCtx, cancelGuardian := context.WithCancel(ctx) + go func() { + <-guardianCtx.Done() + _ = resp.Body.Close() + }() + defer cancelGuardian() + + // blocking copy + n, err := io.Copy(out, resp.Body) + + b.info.Printf("Bytes copied: %v", n) + return +} diff --git a/bilibili/util.go b/bilibili/util.go new file mode 100644 index 0000000..1267a39 --- /dev/null +++ b/bilibili/util.go @@ -0,0 +1,13 @@ +package bilibili + +import ( + "fmt" + "net/http" +) + +func validateHttpStatus(r *http.Response) (err error) { + if code := r.StatusCode; code != http.StatusOK { + err = fmt.Errorf("unsuccessful HTTP status on API %v: %v", r.Request.URL, code) + } + return +} diff --git a/common/orelse.go b/common/orelse.go new file mode 100644 index 0000000..a96bde7 --- /dev/null +++ b/common/orelse.go @@ -0,0 +1,20 @@ +package common + +type Opt[T any] struct { + thing T + err error +} + +func Optional[T any](thing T, err error) Opt[T] { + return Opt[T]{ + thing: thing, + err: err, + } +} + +func (o Opt[T]) OrElse(thing T) T { + if o.err != nil { + return thing + } + return o.thing +} diff --git a/common/retry.go b/common/retry.go new file mode 100644 index 0000000..6b97ff3 --- /dev/null +++ b/common/retry.go @@ -0,0 +1,35 @@ +package common + +import ( + "log" + "time" +) + +// AutoRetry retries the supplier automatically, with given time limit and interval. +// If maximum retry time limit is reached and the supplier still fails, +// the last error will be returned. +// If logger is not nil, retry information will be printed to it. +func AutoRetry[T any]( + supplier func() (T, error), + maxRetryTimes int, + retryInterval time.Duration, + logger *log.Logger) (T, error) { + var err error + for i := 0; i < maxRetryTimes; i++ { + ret, err := supplier() + if err != nil { + if logger != nil { + logger.Printf("Try %v/%v (sleep %vs): %v\n", + i, maxRetryTimes, retryInterval, err) + } + time.Sleep(retryInterval) + continue + } + // success + return ret, nil + } + if logger != nil { + logger.Printf("Max retry times reached, but it still fails. Last error: %v", err) + } + return *new(T), err +} diff --git a/common/types.go b/common/types.go new file mode 100644 index 0000000..81456b4 --- /dev/null +++ b/common/types.go @@ -0,0 +1,3 @@ +package common + +type RoomId uint64 diff --git a/common/urlparse.go b/common/urlparse.go new file mode 100644 index 0000000..dc72cee --- /dev/null +++ b/common/urlparse.go @@ -0,0 +1,21 @@ +package common + +import ( + "errors" + "net/url" + "strings" +) + +// GetFileExtensionFromUrl +// copied from https://elisegev.medium.com/get-a-file-extension-from-a-url-in-golang-5061d4a298a +func GetFileExtensionFromUrl(rawUrl string) (string, error) { + u, err := url.Parse(rawUrl) + if err != nil { + return "", err + } + pos := strings.LastIndex(u.Path, ".") + if pos == -1 { + return "", errors.New("couldn't find a period to indicate a file extension") + } + return u.Path[pos+1 : len(u.Path)], nil +} diff --git a/danmaku/client.go b/danmaku/client.go new file mode 100644 index 0000000..7745761 --- /dev/null +++ b/danmaku/client.go @@ -0,0 +1,136 @@ +/* +This file implements the background WebSocket messaging channel in Bilibili webui. +Server send livestream start and stop messages via this channel. +Note: In this file we manage the concrete WebSocket connection. +The Bilibili WebSocket channel protocol is decoupled and implemented in package `dmpkg`. +*/ +package danmaku + +import ( + "bilibili-livestream-archiver/common" + "bilibili-livestream-archiver/danmaku/dmpkg" + "context" + "fmt" + + "nhooyr.io/websocket" +) + +// Bilibili uses only binary WebSocket messages +const kBilibiliWebSocketMessageType = websocket.MessageBinary + +type DanmakuClient struct { + ws *websocket.Conn + wsio wsDatagramIO +} + +type DanmakuMessageType int + +// wsDatagramIO wraps websocket into a datagram I/O, +// since Bilibili uses only binary data, +// which is effectively a datagram communication. +type wsDatagramIO struct { + ws *websocket.Conn + ctx context.Context +} + +func (w *wsDatagramIO) Consume(data []byte) error { + return w.ws.Write(w.ctx, kBilibiliWebSocketMessageType, data) +} + +func (w *wsDatagramIO) Get() (data []byte, err error) { + typ, data, err := w.ws.Read(w.ctx) + if err != nil { + return + } + if typ != kBilibiliWebSocketMessageType { + err = fmt.Errorf("invalid message type: expected a binary WebSocket message, however got %v", typ.String()) + } + return +} + +func NewDanmakuClient() DanmakuClient { + return DanmakuClient{ + ws: nil, + } +} + +func (d *DanmakuClient) Connect(ctx context.Context, url string) error { + // thread unsafe + + // dial + if d.ws != nil { + return fmt.Errorf("already connected") + } + ws, _, err := websocket.Dial(ctx, url, nil) + if err != nil { + return fmt.Errorf("failed to establish WebSocket connection: %w", err) + } + d.ws = ws + + // init wsio + d.wsio = wsDatagramIO{ + ws: ws, + ctx: ctx, + } + + return nil +} + +func (d *DanmakuClient) Disconnect() error { + // thread unsafe + ws := d.ws + if ws == nil { + return nil + } + d.ws = nil + d.wsio = wsDatagramIO{} + return ws.Close(websocket.StatusInternalError, "disconnected") +} + +func (d *DanmakuClient) Authenticate(roomId common.RoomId, authKey string) error { + pkg := dmpkg.NewAuth(dmpkg.ProtoPlainJson, roomId, authKey) + data, err := pkg.Marshal() + if err != nil { + return fmt.Errorf("exchange marshal failed: %w", err) + } + err = d.wsio.Consume(data) + if err != nil { + return fmt.Errorf("channel write failed: %w", err) + } + // read server response + resp, err := d.wsio.Get() + if err != nil { + return err + } + respEx, err := dmpkg.DecodeExchange(resp) + if err != nil { + return fmt.Errorf("server danmaku exchange decode error: %w", err) + } + ok, err := dmpkg.IsAuthOk(respEx) + if !ok { + return fmt.Errorf("danmaku auth failed: %w", err) + } + return nil +} + +func (d *DanmakuClient) Heartbeat() error { + pkg := dmpkg.NewPing() + data, err := pkg.Marshal() + if err != nil { + return fmt.Errorf("exchange marshal failed: %w", err) + } + err = d.wsio.Consume(data) + if err != nil { + return fmt.Errorf("channel write failed: %w", err) + } + return nil +} + +// ReadExchange read and decode some kind of exchanges which we are interested +func (d *DanmakuClient) ReadExchange() (dmpkg.DanmakuExchange, error) { + data, err := d.wsio.Get() + if err != nil { + return dmpkg.DanmakuExchange{}, fmt.Errorf("failed to read danmaku datagram from server: %w", err) + } + return dmpkg.DecodeExchange(data) +} diff --git a/danmaku/dmpkg/auth.go b/danmaku/dmpkg/auth.go new file mode 100644 index 0000000..5a205ad --- /dev/null +++ b/danmaku/dmpkg/auth.go @@ -0,0 +1,52 @@ +/* +This file implements the auth exchange. +When Bilibili live client established the WebSocket connection successfully, +it sends this message at first. The server then responses a OpConnectOk exchange with body `{"code":0}` which indicates success. +*/ +package dmpkg + +import ( + "bilibili-livestream-archiver/common" + "encoding/json" + "fmt" +) + +type authInfo struct { + UID uint64 `json:"uid"` + RoomId uint64 `json:"roomid"` + ProtoVer int `json:"protover"` + Platform string `json:"platform"` + Type int `json:"type"` + Key string `json:"key"` +} + +// NewAuth creates a new authentication exchange. +func NewAuth(protocol ProtocolVer, roomId common.RoomId, authKey string) (exc DanmakuExchange) { + exc, _ = NewPlainExchange(OpConnect, authInfo{ + UID: kUidGuest, + RoomId: uint64(roomId), + ProtoVer: int(protocol), + Platform: kPlatformWeb, + Type: kAuthTypeDefault, + Key: authKey, + }) + return +} + +func IsAuthOk(serverResponse DanmakuExchange) (bool, error) { + if op := serverResponse.Operation; op != OpConnectOk { + return false, fmt.Errorf("server operation is not OpConnectOk: %w", op) + } + var body struct { + Code int `json:"code"` + } + body.Code = 1 + err := json.Unmarshal(serverResponse.Body, &body) + if err != nil { + return false, fmt.Errorf("JSON decode error: %w", err) + } + if c := body.Code; c != 0 { + return false, fmt.Errorf("server response code is non-zero: %w", c) + } + return true, nil +} diff --git a/danmaku/dmpkg/decode.go b/danmaku/dmpkg/decode.go new file mode 100644 index 0000000..7d9f796 --- /dev/null +++ b/danmaku/dmpkg/decode.go @@ -0,0 +1,44 @@ +package dmpkg + +import ( + "bytes" + "fmt" + "github.com/lunixbochs/struc" +) + +func DecodeExchange(data []byte) (exc DanmakuExchange, err error) { + if ln := len(data); ln < kHeaderLength { + err = fmt.Errorf("incomplete datagram: length = %v < %v", ln, kHeaderLength) + return + } + + // unpack header + var exchangeHeader DanmakuExchangeHeader + err = struc.Unpack(bytes.NewReader(data[:kHeaderLength]), &exchangeHeader) + if err != nil { + err = fmt.Errorf("cannot unpack exchange header: %w", err) + return + } + headerLength := exchangeHeader.HeaderLength + + // validate header length, fail fast if not match + if headerLength != kHeaderLength { + err = fmt.Errorf("invalid header length, "+ + "the protocol implementation might be obsolete: %v != %v", headerLength, kHeaderLength) + return + } + + // special process + // TODO decouple this + // The server OpHeartbeatAck contains an extra 4-bytes header entry in the body, maybe a heat value + var body []byte + // copy body + body = make([]byte, exchangeHeader.Length-uint32(headerLength)) + copy(body, data[headerLength:]) + + exc = DanmakuExchange{ + DanmakuExchangeHeader: exchangeHeader, + Body: body, + } + return +} diff --git a/danmaku/dmpkg/frameio.go b/danmaku/dmpkg/frameio.go new file mode 100644 index 0000000..2e7693a --- /dev/null +++ b/danmaku/dmpkg/frameio.go @@ -0,0 +1,15 @@ +/* +Since Bilibili only uses WebSocket binary message, +the transportation is effectively a datagram channel. +So we use a consumer-supplier abstraction to decouple +the real protocol with WebSocket stuff. +*/ +package dmpkg + +type Consumer[T any] interface { + Consume(value T) error +} + +type Supplier[T any] interface { + Get() (T, error) +} diff --git a/danmaku/dmpkg/operation.go b/danmaku/dmpkg/operation.go new file mode 100644 index 0000000..d3c13f4 --- /dev/null +++ b/danmaku/dmpkg/operation.go @@ -0,0 +1,28 @@ +package dmpkg + +type Operation uint32 + +// Operations +const ( + OpHeartbeat Operation = 2 + OpHeartbeatAck Operation = 3 + OpLayer7Data Operation = 5 + OpConnect Operation = 7 + OpConnectOk Operation = 8 +) + +var opStringMap = map[Operation]string{ + OpHeartbeat: "HeartBeat", + OpHeartbeatAck: "HeartBeatAck", + OpLayer7Data: "AppData", + OpConnect: "Connect", + OpConnectOk: "ConnectOk", +} + +func (o Operation) String() string { + s, exists := opStringMap[o] + if !exists { + return "" + } + return s +} diff --git a/danmaku/dmpkg/package.go b/danmaku/dmpkg/package.go new file mode 100644 index 0000000..3175a18 --- /dev/null +++ b/danmaku/dmpkg/package.go @@ -0,0 +1,111 @@ +package dmpkg + +import ( + "bytes" + "compress/zlib" + "fmt" + "github.com/andybalholm/brotli" + "github.com/lunixbochs/struc" + "io" +) + +type DanmakuExchangeHeader struct { + // Length total remaining bytes of this exchange, excluding `Length` itself + Length uint32 + // HeaderLength = Length - len(Body) + 4, always equals to 16 + HeaderLength uint16 + ProtocolVer ProtocolVer + Operation Operation + // SequenceId is always 1 + SequenceId uint32 +} + +// DanmakuExchange represents an actual message sent from client or server. This is an atomic unit. +type DanmakuExchange struct { + DanmakuExchangeHeader + Body []byte +} + +func (e *DanmakuExchange) String() string { + return fmt.Sprintf("DanmakuExchange{length=%v, protocol=%v, operation=%v, body=%v}", + e.Length, e.ProtocolVer, e.Operation, e.Body) +} + +const kHeaderLength = 16 +const kSequenceId = 1 + +type ProtocolVer uint16 + +const ( + // ProtoPlainJson the body is plain JSON text + ProtoPlainJson ProtocolVer = 0 + // ProtoMinimal the body is uint32 watcher count (big endian) + ProtoMinimal ProtocolVer = 1 + // ProtoZlib the body is a zlib compressed package + ProtoZlib ProtocolVer = 2 + // ProtoBrotli the body is a brotli compressed package + ProtoBrotli ProtocolVer = 3 +) + +const kUidGuest = 0 +const kPlatformWeb = "web" +const kAuthTypeDefault = 2 // magic number, not sure what does it mean + +func (e *DanmakuExchange) Marshal() (data []byte, err error) { + var buffer bytes.Buffer + // only unpack header with struc, since it does not support indirect variable field length calculation + err = struc.Pack(&buffer, &e.DanmakuExchangeHeader) + if err != nil { + err = fmt.Errorf("cannot pack an exchange into binary form: %w", err) + return + } + data = buffer.Bytes() + data = append(data, e.Body...) + return +} + +// Inflate decompresses the body if it is compressed +func (e *DanmakuExchange) Inflate() (ret DanmakuExchange, err error) { + switch e.ProtocolVer { + case ProtoMinimal: + fallthrough + case ProtoPlainJson: + ret = *e + case ProtoBrotli: + var data []byte + rd := brotli.NewReader(bytes.NewReader(e.Body)) + data, err = io.ReadAll(rd) + if err != nil { + err = fmt.Errorf("cannot decompress exchange body: %w", err) + return + } + var nestedExchange DanmakuExchange + nestedExchange, err = DecodeExchange(data) + if err != nil { + err = fmt.Errorf("cannot decode nested exchange: %w", err) + return + } + return nestedExchange.Inflate() + case ProtoZlib: + var data []byte + var rd io.ReadCloser + rd, err = zlib.NewReader(bytes.NewReader(e.Body)) + if err != nil { + err = fmt.Errorf("cannot create zlib reader: %w", err) + return + } + data, err = io.ReadAll(rd) + if err != nil { + err = fmt.Errorf("cannot decompress exchange body: %w", err) + return + } + var nestedExchange DanmakuExchange + nestedExchange, err = DecodeExchange(data) + if err != nil { + err = fmt.Errorf("cannot decode nested exchange: %w", err) + return + } + return nestedExchange.Inflate() + } + return +} diff --git a/danmaku/dmpkg/ping.go b/danmaku/dmpkg/ping.go new file mode 100644 index 0000000..4596d49 --- /dev/null +++ b/danmaku/dmpkg/ping.go @@ -0,0 +1,8 @@ +package dmpkg + +// NewPing construct a new PING exahange. +func NewPing() (exc DanmakuExchange) { + // compliant with Bilibili webapp behavior + exc, _ = NewPlainExchange(OpHeartbeat, "[object Object]") + return +} diff --git a/danmaku/dmpkg/raw.go b/danmaku/dmpkg/raw.go new file mode 100644 index 0000000..1cbd12e --- /dev/null +++ b/danmaku/dmpkg/raw.go @@ -0,0 +1,47 @@ +package dmpkg + +import ( + "encoding/json" + "fmt" + "math" +) + +const kMaxBodyLength = math.MaxUint32 - kHeaderLength + +// NewPlainExchange creates a new exchange with raw body specified. +// body: a struct or a raw string +func NewPlainExchange(operation Operation, body interface{}) (exc DanmakuExchange, err error) { + var bodyData []byte + + // convert body to []byte + if _, ok := body.(string); ok { + // a string + bodyData = []byte(body.(string)) + } else if _, ok := body.([]byte); ok { + // a []byte + copy(bodyData, body.([]byte)) + } else { + // a JSON struct + bodyData, err = json.Marshal(body) + if err != nil { + return + } + } + + length := uint64(kHeaderLength + len(bodyData)) + if length > kMaxBodyLength { + err = fmt.Errorf("body is too large (> %d)", kMaxBodyLength) + return + } + exc = DanmakuExchange{ + DanmakuExchangeHeader: DanmakuExchangeHeader{ + Length: uint32(length), + HeaderLength: kHeaderLength, + ProtocolVer: ProtoPlainJson, + Operation: operation, + SequenceId: kSequenceId, + }, + Body: bodyData, + } + return +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..b183d34 --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module bilibili-livestream-archiver + +go 1.18 + +require ( + github.com/andybalholm/brotli v1.0.4 + github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40 + nhooyr.io/websocket v1.8.7 +) + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-cmp v0.5.8 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.10.3 // indirect + github.com/mattn/go-isatty v0.0.14 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + google.golang.org/protobuf v1.28.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9220e21 --- /dev/null +++ b/go.sum @@ -0,0 +1,78 @@ +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40 h1:EnfXoSqDfSNJv0VBNqY/88RNnhSGYkrHaO0mmFGbVsc= +github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40/go.mod h1:vy1vK6wD6j7xX6O6hXe621WabdtNkou2h7uRtTfRMyg= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/main.go b/main.go new file mode 100644 index 0000000..1cf56ad --- /dev/null +++ b/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "bilibili-livestream-archiver/recording" + "context" + "fmt" +) + +func main() { + task := recording.TaskConfig{ + RoomId: 7777, + Transport: recording.TransportConfig{ + SocketTimeoutSeconds: 10, + RetryIntervalSeconds: 5, + MaxRetryTimes: 5, + }, + Download: recording.DownloadConfig{ + SaveDirectory: ".", + FileNameTemplate: "", + }, + } + chResult := make(chan recording.TaskResult) + go recording.RunTask( + context.Background(), + &task, + chResult, + ) + result := <-chResult + fmt.Println(result.Error) +} diff --git a/recording/config.go b/recording/config.go new file mode 100644 index 0000000..1a24508 --- /dev/null +++ b/recording/config.go @@ -0,0 +1,20 @@ +package recording + +import "bilibili-livestream-archiver/common" + +type TaskConfig struct { + RoomId common.RoomId `mapstructure:"room_id"` + Transport TransportConfig `mapstructure:"transport"` + Download DownloadConfig `mapstructure:"download"` +} + +type TransportConfig struct { + SocketTimeoutSeconds int `mapstructure:"socket_timeout_seconds"` + RetryIntervalSeconds int `mapstructure:"retry_interval_seconds"` + MaxRetryTimes int `mapstructure:"max_retry_times"` +} + +type DownloadConfig struct { + SaveDirectory string `mapstructure:"save_directory"` + FileNameTemplate string `mapstructure:"file_name_template"` +} diff --git a/recording/runner.go b/recording/runner.go new file mode 100644 index 0000000..cee4325 --- /dev/null +++ b/recording/runner.go @@ -0,0 +1,252 @@ +/* +This file contains task runner. +Task runner composes status monitor and stream downloader concrete task config. +The config can be load from a config file. +*/ +package recording + +import ( + "bilibili-livestream-archiver/bilibili" + "bilibili-livestream-archiver/common" + "context" + "encoding/json" + "fmt" + "log" + "os" + "path" + "time" +) + +// TaskResult represents an execution result of a task. +type TaskResult struct { + Task *TaskConfig + Error error +} + +// RunTask start a monitor&download task and +// put its execution result into a channel. +func RunTask(ctx context.Context, task *TaskConfig, chTaskResult chan<- TaskResult) { + err := doTask(ctx, task) + chTaskResult <- TaskResult{ + Task: task, + Error: err, + } +} + +// doTask do the actual work, but returns synchronously. +func doTask(ctx context.Context, task *TaskConfig) error { + logger := log.Default() + bi := bilibili.NewBilibili() + logger.Printf("Start task: room %v\n", task.RoomId) + + authKey, url, err := getStreamingServer(task, logger, bi) + if err != nil { + return err + } + + // run live status watcher asynchronously + logger.Println("Starting watcher...") + chWatcherEvent := make(chan WatcherEvent) + chWatcherDown := make(chan struct{}) + + // start and recover watcher asynchronously + // the watcher may also be stopped by the downloader goroutine + watcherCtx, stopWatcher := context.WithCancel(ctx) + defer stopWatcher() + go watcherRecoverableLoop(watcherCtx, url, authKey, task, bi, chWatcherEvent, chWatcherDown) + + // The stream download goroutine may fail due to wrong watcher state. + // But this is likely temporarily, so we should restart the downloader + // until the state turns to closed. + + // We store the last modified live status + // in case there is a false-positive duplicate. + lastStatusIsLiving := false + recorderCtx, stopRecorder := context.WithCancel(ctx) + defer stopRecorder() + for { + select { + case <-ctx.Done(): + logger.Printf("Task (room %v) is stopped.\n", task.RoomId) + return nil + case <-chWatcherDown: + // watcher is down and unrecoverable, stop this task + return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", task.RoomId) + case ev := <-chWatcherEvent: + switch ev { + case WatcherLiveStart: + if lastStatusIsLiving { + logger.Println("Duplicate adjacent WatcherLiveStart event. Ignoring.") + continue + } + go func() { + cancelled := false + // restart recorder if interrupted by I/O errors + for !cancelled { + cancelled = record(recorderCtx, bi, task) + } + logger.Printf("Task is cancelled. (room %v)\n", task.RoomId) + }() + lastStatusIsLiving = true + case WatcherLiveStop: + lastStatusIsLiving = false + } + } + } +} + +// record. When cancelled, the caller should clean up immediately and stop the task. +func record( + ctx context.Context, + bi bilibili.Bilibili, + task *TaskConfig, +) (cancelled bool) { + logger := log.Default() + logger.Printf("INFO: Getting room profile...\n") + + profile, err := common.AutoRetry( + func() (bilibili.RoomProfileResponse, error) { + return bi.GetRoomProfile(task.RoomId) + }, + task.Transport.MaxRetryTimes, + time.Duration(task.Transport.RetryIntervalSeconds)*time.Second, + logger, + ) + if err != nil { + // still error, abort + logger.Printf("ERROR: Cannot get room information: %v. Stopping current task.\n", err) + cancelled = true + return + } + + urlInfo, err := common.AutoRetry( + func() (bilibili.RoomUrlInfoResponse, error) { + return bi.GetStreamingInfo(task.RoomId) + }, + task.Transport.MaxRetryTimes, + time.Duration(task.Transport.RetryIntervalSeconds)*time.Second, + logger, + ) + if err != nil { + logger.Printf("ERROR: Cannot get streaming info: %v", err) + cancelled = true + return + } + if len(urlInfo.Data.URLs) == 0 { + j, err := json.Marshal(urlInfo) + if err != nil { + j = []byte("(not available)") + } + logger.Printf("ERROR: No stream returned from API. Response: %v", string(j)) + cancelled = true + return + } + streamSource := urlInfo.Data.URLs[0] + + fileName := fmt.Sprintf( + "%s.%s", + GenerateFileName(profile.Data.Title, time.Now()), + common.Optional[string](common.GetFileExtensionFromUrl(streamSource.URL)).OrElse("flv"), + ) + filePath := path.Join(task.Download.SaveDirectory, fileName) + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + defer func() { _ = file.Close() }() + if err != nil { + logger.Printf("ERROR: Cannot open file for writing: %v", err) + cancelled = true + return + } + + logger.Printf("Recording live stream to file \"%v\"...", filePath) + err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file) + cancelled = false + return +} + +// watcherRecoverableLoop run watcher forever until the context is cancelled. +func watcherRecoverableLoop( + ctx context.Context, + url string, + authKey string, + task *TaskConfig, + bi bilibili.Bilibili, + chWatcherEvent chan WatcherEvent, + chWatcherDown chan<- struct{}, +) { + logger := log.Default() + + for { + err, errReason := watch( + ctx, + url, + authKey, + task.RoomId, + func() (bool, error) { + resp, err := bi.GetRoomPlayInfo(task.RoomId) + if err != nil { + return false, err + } + if resp.Code != 0 { + return false, fmt.Errorf("bilibili API error: %v", resp.Message) + } + return resp.Data.LiveStatus.IsStreaming(), nil + }, + chWatcherEvent, + ) + + switch errReason { + case ErrSuccess: + // stop normally, the context is closed + return + case ErrProtocol: + logger.Printf("FATAL: Watcher stopped due to an unrecoverable error: %v\n", err) + // shutdown the whole task + chWatcherDown <- struct{}{} + return + case ErrTransport: + logger.Printf("ERROR: Watcher stopped due to an I/O error: %v\n", err) + waitSeconds := task.Transport.RetryIntervalSeconds + logger.Printf( + "WARNING: Sleep for %v second(s) before restarting watcher.\n", + waitSeconds, + ) + time.Sleep(time.Duration(waitSeconds) * time.Second) + logger.Printf("Retrying...") + } + } +} + +func getStreamingServer( + task *TaskConfig, + logger *log.Logger, + bi bilibili.Bilibili, +) (string, string, error) { + logger.Println("Getting stream server info...") + dmInfo, err := bi.GetDanmakuServerInfo(task.RoomId) + if err != nil { + return "", "", fmt.Errorf("failed to read stream server info: %w", err) + } + if len(dmInfo.Data.HostList) == 0 { + return "", "", fmt.Errorf("no available stream server") + } + logger.Println("Success.") + + // get authkey and ws url + authKey := dmInfo.Data.Token + host := dmInfo.Data.HostList[0] + url := fmt.Sprintf("wss://%s:%d/sub", host.Host, host.WssPort) + return authKey, url, nil +} + +func GenerateFileName(roomName string, t time.Time) string { + ts := fmt.Sprintf( + "%d-%02d-%02d-%02d-%02d-%02d", + t.Year(), + t.Month(), + t.Day(), + t.Hour(), + t.Minute(), + t.Second(), + ) + return fmt.Sprintf("%s_%s", roomName, ts) +} diff --git a/recording/watcher.go b/recording/watcher.go new file mode 100644 index 0000000..439ffcb --- /dev/null +++ b/recording/watcher.go @@ -0,0 +1,156 @@ +package recording + +import ( + "bilibili-livestream-archiver/common" + "bilibili-livestream-archiver/danmaku" + "bilibili-livestream-archiver/danmaku/dmpkg" + "context" + "encoding/json" + "fmt" + "log" + "time" +) + +type WatcherEvent int + +const ( + WatcherLiveStart WatcherEvent = 0 + WatcherLiveStop WatcherEvent = 1 +) + +type liveCommand string + +const ( + CommandLiveStart = "LIVE" + CommandStreamPreparing = "PREPARING" +) + +type liveInfo struct { + Command liveCommand `json:"cmd"` +} + +type ErrorReason int + +const ( + ErrSuccess ErrorReason = iota // no error happens, normally closed + ErrTransport // I/O error, safe to retry + ErrProtocol // application protocol logic error, do not retry +) + +const ( + kHeartBeatInterval = 30 * time.Second +) + +// watch monitors live room status by subscribing messages from Bilibili danmaku server, +// which talks to the client via a WebSocket or TCP connection. +// In our implementation, we use WebSocket over SSL/TLS. +func watch( + ctx context.Context, + url string, + authKey string, + roomId common.RoomId, + liveStatusChecker func() (bool, error), + chEvent chan<- WatcherEvent, +) (error, ErrorReason) { + + logger := log.Default() + + var err error + + dm := danmaku.NewDanmakuClient() + defer func() { _ = dm.Disconnect() }() + + // connect to danmaku server for live online/offline notifications + err = dm.Connect(ctx, url) + if err != nil { + return fmt.Errorf("failed to connect to danmaku server: %w", err), ErrTransport + } + defer func() { _ = dm.Disconnect() }() + + // the danmaku server requires an auth token and room id when connected + logger.Println("ws connected. Authenticating...") + err = dm.Authenticate(roomId, authKey) + if err != nil { + return fmt.Errorf("auth failed: %w", err), ErrProtocol + } + + // the danmaku server requires heartbeat messages every 30 seconds + heartbeat := func() error { + err := dm.Heartbeat() + return err + } + + // send initial heartbeat immediately + err = heartbeat() + if err != nil { + return err, ErrTransport + } + + // create heartbeat timer + heartBeatTimer := time.NewTicker(kHeartBeatInterval) + defer func() { heartBeatTimer.Stop() }() + + logger.Println("Checking initial live status...") + isLiving, err := liveStatusChecker() + if err != nil { + return fmt.Errorf("check initial live status failed: %w", err), ErrTransport + } + + if isLiving { + logger.Println("The live is already started. Start recording immediately.") + chEvent <- WatcherLiveStart + } else { + logger.Println("The live is not started yet. Waiting...") + } + + for { + select { + case <-ctx.Done(): + return nil, ErrSuccess + case <-heartBeatTimer.C: + err = heartbeat() + if err != nil { + return fmt.Errorf("heartbeat failed: %w", err), ErrTransport + } + default: + var msg dmpkg.DanmakuExchange + msg, err = dm.ReadExchange() + if err != nil { + return fmt.Errorf("exchange read failed: %w", err), ErrTransport + } + // the exchange may be compressed + msg, err = msg.Inflate() + if err != nil { + return fmt.Errorf("inflate server message failed: %v", err), ErrProtocol + } + + switch msg.Operation { + case dmpkg.OpLayer7Data: + logger.Printf("server message: op %v, body %v\n", msg.Operation, string(msg.Body)) + var info liveInfo + err := json.Unmarshal(msg.Body, &info) + if err != nil { + logger.Printf("ERROR: invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg) + return fmt.Errorf("decode server message body JSON failed: %w", err), ErrProtocol + } + switch info.Command { + case CommandLiveStart: + if !isLiving { + chEvent <- WatcherLiveStart + isLiving = true + } + case CommandStreamPreparing: + if isLiving { + chEvent <- WatcherLiveStop + } + default: + logger.Printf("Ignoring server message %v %v %v\n", + info.Command, msg.Operation, string(msg.Body)) + } + default: + logger.Printf("Server message: %v\n", msg.String()) + } + + } + } +} -- cgit v1.2.3