Diffstat (limited to 'recording')
-rw-r--r--  recording/config.go   20
-rw-r--r--  recording/runner.go  252
-rw-r--r--  recording/watcher.go 156
3 files changed, 428 insertions(+), 0 deletions(-)
diff --git a/recording/config.go b/recording/config.go
new file mode 100644
index 0000000..1a24508
--- /dev/null
+++ b/recording/config.go
@@ -0,0 +1,20 @@
+package recording
+
+import "bilibili-livestream-archiver/common"
+
+type TaskConfig struct {
+ RoomId common.RoomId `mapstructure:"room_id"`
+ Transport TransportConfig `mapstructure:"transport"`
+ Download DownloadConfig `mapstructure:"download"`
+}
+
+type TransportConfig struct {
+ SocketTimeoutSeconds int `mapstructure:"socket_timeout_seconds"`
+ RetryIntervalSeconds int `mapstructure:"retry_interval_seconds"`
+ MaxRetryTimes int `mapstructure:"max_retry_times"`
+}
+
+type DownloadConfig struct {
+ SaveDirectory string `mapstructure:"save_directory"`
+ FileNameTemplate string `mapstructure:"file_name_template"`
+}
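+
+// With a mapstructure-based loader (e.g. viper), a task entry in the config
+// file might look like the following sketch (hypothetical values; the
+// file-name template format is not defined in this file):
+//
+//    room_id: 1234567
+//    transport:
+//      socket_timeout_seconds: 10
+//      retry_interval_seconds: 5
+//      max_retry_times: 3
+//    download:
+//      save_directory: ./records
+//      file_name_template: ""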
diff --git a/recording/runner.go b/recording/runner.go
new file mode 100644
index 0000000..cee4325
--- /dev/null
+++ b/recording/runner.go
@@ -0,0 +1,252 @@
+/*
+This file contains the task runner.
+The task runner composes the status monitor and the stream downloader
+from a concrete task config, which can be loaded from a config file.
+*/
+package recording
+
+import (
+ "bilibili-livestream-archiver/bilibili"
+ "bilibili-livestream-archiver/common"
+ "context"
+ "encoding/json"
+ "fmt"
+ "log"
+ "os"
+ "path"
+ "time"
+)
+
+// TaskResult represents an execution result of a task.
+type TaskResult struct {
+ Task *TaskConfig
+ Error error
+}
+
+// RunTask starts a monitor & download task and
+// puts its execution result into a channel.
+func RunTask(ctx context.Context, task *TaskConfig, chTaskResult chan<- TaskResult) {
+ err := doTask(ctx, task)
+ chTaskResult <- TaskResult{
+ Task: task,
+ Error: err,
+ }
+}
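+
+// Usage sketch (hypothetical caller): run every configured task concurrently
+// and collect the results from a single channel:
+//
+//    chResult := make(chan TaskResult, len(tasks))
+//    for i := range tasks {
+//        go RunTask(ctx, &tasks[i], chResult)
+//    }
+//    for range tasks {
+//        if r := <-chResult; r.Error != nil {
+//            log.Printf("task for room %v failed: %v", r.Task.RoomId, r.Error)
+//        }
+//    }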
+
+// doTask does the actual work and returns synchronously.
+func doTask(ctx context.Context, task *TaskConfig) error {
+ logger := log.Default()
+ bi := bilibili.NewBilibili()
+ logger.Printf("Start task: room %v\n", task.RoomId)
+
+ authKey, url, err := getStreamingServer(task, logger, bi)
+ if err != nil {
+ return err
+ }
+
+ // run live status watcher asynchronously
+ logger.Println("Starting watcher...")
+ chWatcherEvent := make(chan WatcherEvent)
+ chWatcherDown := make(chan struct{})
+
+ // start and recover watcher asynchronously
+ // the watcher may also be stopped by the downloader goroutine
+ watcherCtx, stopWatcher := context.WithCancel(ctx)
+ defer stopWatcher()
+ go watcherRecoverableLoop(watcherCtx, url, authKey, task, bi, chWatcherEvent, chWatcherDown)
+
+ // The stream download goroutine may fail due to a wrong watcher state.
+ // This is likely temporary, so we restart the downloader
+ // until the live status turns to closed.
+
+ // We track the last seen live status
+ // so that duplicate (false-positive) events can be ignored.
+ lastStatusIsLiving := false
+ recorderCtx, stopRecorder := context.WithCancel(ctx)
+ defer stopRecorder()
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Printf("Task (room %v) is stopped.\n", task.RoomId)
+ return nil
+ case <-chWatcherDown:
+ // watcher is down and unrecoverable, stop this task
+ return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", task.RoomId)
+ case ev := <-chWatcherEvent:
+ switch ev {
+ case WatcherLiveStart:
+ if lastStatusIsLiving {
+ logger.Println("Duplicate adjacent WatcherLiveStart event. Ignoring.")
+ continue
+ }
+ go func() {
+ cancelled := false
+ // restart recorder if interrupted by I/O errors
+ for !cancelled {
+ cancelled = record(recorderCtx, bi, task)
+ }
+ logger.Printf("Task is cancelled. (room %v)\n", task.RoomId)
+ }()
+ lastStatusIsLiving = true
+ case WatcherLiveStop:
+ lastStatusIsLiving = false
+ }
+ }
+ }
+}
+
+// record downloads the live stream into a file under the configured directory.
+// It returns cancelled == true when the task should stop immediately;
+// cancelled == false means the recorder may be restarted.
+func record(
+ ctx context.Context,
+ bi bilibili.Bilibili,
+ task *TaskConfig,
+) (cancelled bool) {
+ logger := log.Default()
+ logger.Printf("INFO: Getting room profile...\n")
+
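+ // common.AutoRetry is this repo's retry helper; judging from the call
+ // below, it retries the callback up to MaxRetryTimes, sleeping
+ // RetryIntervalSeconds between attempts, and returns the last error
+ // if every attempt fails.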
+ profile, err := common.AutoRetry(
+ func() (bilibili.RoomProfileResponse, error) {
+ return bi.GetRoomProfile(task.RoomId)
+ },
+ task.Transport.MaxRetryTimes,
+ time.Duration(task.Transport.RetryIntervalSeconds)*time.Second,
+ logger,
+ )
+ if err != nil {
+ // still error, abort
+ logger.Printf("ERROR: Cannot get room information: %v. Stopping current task.\n", err)
+ cancelled = true
+ return
+ }
+
+ urlInfo, err := common.AutoRetry(
+ func() (bilibili.RoomUrlInfoResponse, error) {
+ return bi.GetStreamingInfo(task.RoomId)
+ },
+ task.Transport.MaxRetryTimes,
+ time.Duration(task.Transport.RetryIntervalSeconds)*time.Second,
+ logger,
+ )
+ if err != nil {
+ logger.Printf("ERROR: Cannot get streaming info: %v", err)
+ cancelled = true
+ return
+ }
+ if len(urlInfo.Data.URLs) == 0 {
+ j, err := json.Marshal(urlInfo)
+ if err != nil {
+ j = []byte("(not available)")
+ }
+ logger.Printf("ERROR: No stream returned from API. Response: %v", string(j))
+ cancelled = true
+ return
+ }
+ streamSource := urlInfo.Data.URLs[0]
+
+ fileName := fmt.Sprintf(
+ "%s.%s",
+ GenerateFileName(profile.Data.Title, time.Now()),
+ common.Optional[string](common.GetFileExtensionFromUrl(streamSource.URL)).OrElse("flv"),
+ )
+ filePath := path.Join(task.Download.SaveDirectory, fileName)
+ file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
+ if err != nil {
+ logger.Printf("ERROR: Cannot open file for writing: %v", err)
+ cancelled = true
+ return
+ }
+ defer func() { _ = file.Close() }()
+
+ logger.Printf("Recording live stream to file \"%v\"...", filePath)
+ err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file)
+ if err != nil {
+ logger.Printf("ERROR: Stream copy interrupted: %v", err)
+ }
+ // restart the recorder unless the surrounding context is cancelled
+ cancelled = ctx.Err() != nil
+ return
+}
+
+// watcherRecoverableLoop runs the watcher until the context is cancelled,
+// restarting it after recoverable errors.
+func watcherRecoverableLoop(
+ ctx context.Context,
+ url string,
+ authKey string,
+ task *TaskConfig,
+ bi bilibili.Bilibili,
+ chWatcherEvent chan WatcherEvent,
+ chWatcherDown chan<- struct{},
+) {
+ logger := log.Default()
+
+ for {
+ err, errReason := watch(
+ ctx,
+ url,
+ authKey,
+ task.RoomId,
+ func() (bool, error) {
+ resp, err := bi.GetRoomPlayInfo(task.RoomId)
+ if err != nil {
+ return false, err
+ }
+ if resp.Code != 0 {
+ return false, fmt.Errorf("bilibili API error: %v", resp.Message)
+ }
+ return resp.Data.LiveStatus.IsStreaming(), nil
+ },
+ chWatcherEvent,
+ )
+
+ switch errReason {
+ case ErrSuccess:
+ // stop normally, the context is closed
+ return
+ case ErrProtocol:
+ logger.Printf("FATAL: Watcher stopped due to an unrecoverable error: %v\n", err)
+ // shutdown the whole task
+ chWatcherDown <- struct{}{}
+ return
+ case ErrTransport:
+ logger.Printf("ERROR: Watcher stopped due to an I/O error: %v\n", err)
+ waitSeconds := task.Transport.RetryIntervalSeconds
+ logger.Printf(
+ "WARNING: Sleep for %v second(s) before restarting watcher.\n",
+ waitSeconds,
+ )
+ time.Sleep(time.Duration(waitSeconds) * time.Second)
+ logger.Println("Retrying...")
+ }
+ }
+}
+
+func getStreamingServer(
+ task *TaskConfig,
+ logger *log.Logger,
+ bi bilibili.Bilibili,
+) (string, string, error) {
+ logger.Println("Getting stream server info...")
+ dmInfo, err := bi.GetDanmakuServerInfo(task.RoomId)
+ if err != nil {
+ return "", "", fmt.Errorf("failed to read stream server info: %w", err)
+ }
+ if len(dmInfo.Data.HostList) == 0 {
+ return "", "", fmt.Errorf("no available stream server")
+ }
+ logger.Println("Success.")
+
+ // get authkey and ws url
+ authKey := dmInfo.Data.Token
+ host := dmInfo.Data.HostList[0]
+ url := fmt.Sprintf("wss://%s:%d/sub", host.Host, host.WssPort)
+ return authKey, url, nil
+}
+
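+// GenerateFileName joins the room name with a second-resolution timestamp,
+// e.g. "MyRoom_2023-01-02-03-04-05".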
+func GenerateFileName(roomName string, t time.Time) string {
+ ts := t.Format("2006-01-02-15-04-05")
+ return fmt.Sprintf("%s_%s", roomName, ts)
+}
diff --git a/recording/watcher.go b/recording/watcher.go
new file mode 100644
index 0000000..439ffcb
--- /dev/null
+++ b/recording/watcher.go
@@ -0,0 +1,156 @@
+package recording
+
+import (
+ "bilibili-livestream-archiver/common"
+ "bilibili-livestream-archiver/danmaku"
+ "bilibili-livestream-archiver/danmaku/dmpkg"
+ "context"
+ "encoding/json"
+ "fmt"
+ "log"
+ "time"
+)
+
+type WatcherEvent int
+
+const (
+ WatcherLiveStart WatcherEvent = 0
+ WatcherLiveStop WatcherEvent = 1
+)
+
+type liveCommand string
+
+const (
+ CommandLiveStart liveCommand = "LIVE"
+ CommandStreamPreparing liveCommand = "PREPARING"
+)
+
+type liveInfo struct {
+ Command liveCommand `json:"cmd"`
+}
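+
+// Example notification body decoded into liveInfo (only "cmd" is read;
+// any other fields in the server message are ignored):
+//
+//    {"cmd": "LIVE"}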
+
+type ErrorReason int
+
+const (
+ ErrSuccess ErrorReason = iota // no error happens, normally closed
+ ErrTransport // I/O error, safe to retry
+ ErrProtocol // application protocol logic error, do not retry
+)
+
+const heartBeatInterval = 30 * time.Second
+
+// watch monitors live room status by subscribing messages from Bilibili danmaku server,
+// which talks to the client via a WebSocket or TCP connection.
+// In our implementation, we use WebSocket over SSL/TLS.
+func watch(
+ ctx context.Context,
+ url string,
+ authKey string,
+ roomId common.RoomId,
+ liveStatusChecker func() (bool, error),
+ chEvent chan<- WatcherEvent,
+) (error, ErrorReason) {
+
+ logger := log.Default()
+
+ var err error
+
+ dm := danmaku.NewDanmakuClient()
+
+ // connect to danmaku server for live online/offline notifications
+ err = dm.Connect(ctx, url)
+ if err != nil {
+ return fmt.Errorf("failed to connect to danmaku server: %w", err), ErrTransport
+ }
+ defer func() { _ = dm.Disconnect() }()
+
+ // the danmaku server requires an auth token and room id when connected
+ logger.Println("ws connected. Authenticating...")
+ err = dm.Authenticate(roomId, authKey)
+ if err != nil {
+ return fmt.Errorf("auth failed: %w", err), ErrProtocol
+ }
+
+ // the danmaku server requires heartbeat messages every 30 seconds
+ heartbeat := func() error {
+ return dm.Heartbeat()
+ }
+
+ // send initial heartbeat immediately
+ err = heartbeat()
+ if err != nil {
+ return err, ErrTransport
+ }
+
+ // create heartbeat ticker
+ heartBeatTimer := time.NewTicker(heartBeatInterval)
+ defer heartBeatTimer.Stop()
+
+ logger.Println("Checking initial live status...")
+ isLiving, err := liveStatusChecker()
+ if err != nil {
+ return fmt.Errorf("check initial live status failed: %w", err), ErrTransport
+ }
+
+ if isLiving {
+ logger.Println("The live has already started. Starting recording immediately.")
+ chEvent <- WatcherLiveStart
+ } else {
+ logger.Println("The live has not started yet. Waiting...")
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, ErrSuccess
+ case <-heartBeatTimer.C:
+ err = heartbeat()
+ if err != nil {
+ return fmt.Errorf("heartbeat failed: %w", err), ErrTransport
+ }
+ default:
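+ // NOTE: ReadExchange below blocks until a server message arrives,
+ // so the heartbeat case above is only serviced between messages.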
+ var msg dmpkg.DanmakuExchange
+ msg, err = dm.ReadExchange()
+ if err != nil {
+ return fmt.Errorf("exchange read failed: %w", err), ErrTransport
+ }
+ // the exchange may be compressed
+ msg, err = msg.Inflate()
+ if err != nil {
+ return fmt.Errorf("inflate server message failed: %w", err), ErrProtocol
+ }
+
+ switch msg.Operation {
+ case dmpkg.OpLayer7Data:
+ logger.Printf("server message: op %v, body %v\n", msg.Operation, string(msg.Body))
+ var info liveInfo
+ err := json.Unmarshal(msg.Body, &info)
+ if err != nil {
+ logger.Printf("ERROR: invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg)
+ return fmt.Errorf("decode server message body JSON failed: %w", err), ErrProtocol
+ }
+ switch info.Command {
+ case CommandLiveStart:
+ if !isLiving {
+ chEvent <- WatcherLiveStart
+ isLiving = true
+ }
+ case CommandStreamPreparing:
+ if isLiving {
+ chEvent <- WatcherLiveStop
+ isLiving = false
+ }
+ default:
+ logger.Printf("Ignoring server message %v %v %v\n",
+ info.Command, msg.Operation, string(msg.Body))
+ }
+ default:
+ logger.Printf("Server message: %v\n", msg.String())
+ }
+
+ }
+ }
+}