From 8e15d802865ed57db0018c15ea5559c8bd44c01f Mon Sep 17 00:00:00 2001 From: Keuin Date: Wed, 7 Sep 2022 02:48:46 +0800 Subject: First working version. Just a POC. --- recording/config.go | 20 ++++ recording/runner.go | 252 +++++++++++++++++++++++++++++++++++++++++++++++++++ recording/watcher.go | 156 +++++++++++++++++++++++++++++++ 3 files changed, 428 insertions(+) create mode 100644 recording/config.go create mode 100644 recording/runner.go create mode 100644 recording/watcher.go (limited to 'recording') diff --git a/recording/config.go b/recording/config.go new file mode 100644 index 0000000..1a24508 --- /dev/null +++ b/recording/config.go @@ -0,0 +1,20 @@ +package recording + +import "bilibili-livestream-archiver/common" + +type TaskConfig struct { + RoomId common.RoomId `mapstructure:"room_id"` + Transport TransportConfig `mapstructure:"transport"` + Download DownloadConfig `mapstructure:"download"` +} + +type TransportConfig struct { + SocketTimeoutSeconds int `mapstructure:"socket_timeout_seconds"` + RetryIntervalSeconds int `mapstructure:"retry_interval_seconds"` + MaxRetryTimes int `mapstructure:"max_retry_times"` +} + +type DownloadConfig struct { + SaveDirectory string `mapstructure:"save_directory"` + FileNameTemplate string `mapstructure:"file_name_template"` +} diff --git a/recording/runner.go b/recording/runner.go new file mode 100644 index 0000000..cee4325 --- /dev/null +++ b/recording/runner.go @@ -0,0 +1,252 @@ +/* +This file contains task runner. +Task runner composes status monitor and stream downloader concrete task config. +The config can be load from a config file. +*/ +package recording + +import ( + "bilibili-livestream-archiver/bilibili" + "bilibili-livestream-archiver/common" + "context" + "encoding/json" + "fmt" + "log" + "os" + "path" + "time" +) + +// TaskResult represents an execution result of a task. +type TaskResult struct { + Task *TaskConfig + Error error +} + +// RunTask start a monitor&download task and +// put its execution result into a channel. +func RunTask(ctx context.Context, task *TaskConfig, chTaskResult chan<- TaskResult) { + err := doTask(ctx, task) + chTaskResult <- TaskResult{ + Task: task, + Error: err, + } +} + +// doTask do the actual work, but returns synchronously. +func doTask(ctx context.Context, task *TaskConfig) error { + logger := log.Default() + bi := bilibili.NewBilibili() + logger.Printf("Start task: room %v\n", task.RoomId) + + authKey, url, err := getStreamingServer(task, logger, bi) + if err != nil { + return err + } + + // run live status watcher asynchronously + logger.Println("Starting watcher...") + chWatcherEvent := make(chan WatcherEvent) + chWatcherDown := make(chan struct{}) + + // start and recover watcher asynchronously + // the watcher may also be stopped by the downloader goroutine + watcherCtx, stopWatcher := context.WithCancel(ctx) + defer stopWatcher() + go watcherRecoverableLoop(watcherCtx, url, authKey, task, bi, chWatcherEvent, chWatcherDown) + + // The stream download goroutine may fail due to wrong watcher state. + // But this is likely temporarily, so we should restart the downloader + // until the state turns to closed. + + // We store the last modified live status + // in case there is a false-positive duplicate. + lastStatusIsLiving := false + recorderCtx, stopRecorder := context.WithCancel(ctx) + defer stopRecorder() + for { + select { + case <-ctx.Done(): + logger.Printf("Task (room %v) is stopped.\n", task.RoomId) + return nil + case <-chWatcherDown: + // watcher is down and unrecoverable, stop this task + return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", task.RoomId) + case ev := <-chWatcherEvent: + switch ev { + case WatcherLiveStart: + if lastStatusIsLiving { + logger.Println("Duplicate adjacent WatcherLiveStart event. Ignoring.") + continue + } + go func() { + cancelled := false + // restart recorder if interrupted by I/O errors + for !cancelled { + cancelled = record(recorderCtx, bi, task) + } + logger.Printf("Task is cancelled. (room %v)\n", task.RoomId) + }() + lastStatusIsLiving = true + case WatcherLiveStop: + lastStatusIsLiving = false + } + } + } +} + +// record. When cancelled, the caller should clean up immediately and stop the task. +func record( + ctx context.Context, + bi bilibili.Bilibili, + task *TaskConfig, +) (cancelled bool) { + logger := log.Default() + logger.Printf("INFO: Getting room profile...\n") + + profile, err := common.AutoRetry( + func() (bilibili.RoomProfileResponse, error) { + return bi.GetRoomProfile(task.RoomId) + }, + task.Transport.MaxRetryTimes, + time.Duration(task.Transport.RetryIntervalSeconds)*time.Second, + logger, + ) + if err != nil { + // still error, abort + logger.Printf("ERROR: Cannot get room information: %v. Stopping current task.\n", err) + cancelled = true + return + } + + urlInfo, err := common.AutoRetry( + func() (bilibili.RoomUrlInfoResponse, error) { + return bi.GetStreamingInfo(task.RoomId) + }, + task.Transport.MaxRetryTimes, + time.Duration(task.Transport.RetryIntervalSeconds)*time.Second, + logger, + ) + if err != nil { + logger.Printf("ERROR: Cannot get streaming info: %v", err) + cancelled = true + return + } + if len(urlInfo.Data.URLs) == 0 { + j, err := json.Marshal(urlInfo) + if err != nil { + j = []byte("(not available)") + } + logger.Printf("ERROR: No stream returned from API. Response: %v", string(j)) + cancelled = true + return + } + streamSource := urlInfo.Data.URLs[0] + + fileName := fmt.Sprintf( + "%s.%s", + GenerateFileName(profile.Data.Title, time.Now()), + common.Optional[string](common.GetFileExtensionFromUrl(streamSource.URL)).OrElse("flv"), + ) + filePath := path.Join(task.Download.SaveDirectory, fileName) + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + defer func() { _ = file.Close() }() + if err != nil { + logger.Printf("ERROR: Cannot open file for writing: %v", err) + cancelled = true + return + } + + logger.Printf("Recording live stream to file \"%v\"...", filePath) + err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file) + cancelled = false + return +} + +// watcherRecoverableLoop run watcher forever until the context is cancelled. +func watcherRecoverableLoop( + ctx context.Context, + url string, + authKey string, + task *TaskConfig, + bi bilibili.Bilibili, + chWatcherEvent chan WatcherEvent, + chWatcherDown chan<- struct{}, +) { + logger := log.Default() + + for { + err, errReason := watch( + ctx, + url, + authKey, + task.RoomId, + func() (bool, error) { + resp, err := bi.GetRoomPlayInfo(task.RoomId) + if err != nil { + return false, err + } + if resp.Code != 0 { + return false, fmt.Errorf("bilibili API error: %v", resp.Message) + } + return resp.Data.LiveStatus.IsStreaming(), nil + }, + chWatcherEvent, + ) + + switch errReason { + case ErrSuccess: + // stop normally, the context is closed + return + case ErrProtocol: + logger.Printf("FATAL: Watcher stopped due to an unrecoverable error: %v\n", err) + // shutdown the whole task + chWatcherDown <- struct{}{} + return + case ErrTransport: + logger.Printf("ERROR: Watcher stopped due to an I/O error: %v\n", err) + waitSeconds := task.Transport.RetryIntervalSeconds + logger.Printf( + "WARNING: Sleep for %v second(s) before restarting watcher.\n", + waitSeconds, + ) + time.Sleep(time.Duration(waitSeconds) * time.Second) + logger.Printf("Retrying...") + } + } +} + +func getStreamingServer( + task *TaskConfig, + logger *log.Logger, + bi bilibili.Bilibili, +) (string, string, error) { + logger.Println("Getting stream server info...") + dmInfo, err := bi.GetDanmakuServerInfo(task.RoomId) + if err != nil { + return "", "", fmt.Errorf("failed to read stream server info: %w", err) + } + if len(dmInfo.Data.HostList) == 0 { + return "", "", fmt.Errorf("no available stream server") + } + logger.Println("Success.") + + // get authkey and ws url + authKey := dmInfo.Data.Token + host := dmInfo.Data.HostList[0] + url := fmt.Sprintf("wss://%s:%d/sub", host.Host, host.WssPort) + return authKey, url, nil +} + +func GenerateFileName(roomName string, t time.Time) string { + ts := fmt.Sprintf( + "%d-%02d-%02d-%02d-%02d-%02d", + t.Year(), + t.Month(), + t.Day(), + t.Hour(), + t.Minute(), + t.Second(), + ) + return fmt.Sprintf("%s_%s", roomName, ts) +} diff --git a/recording/watcher.go b/recording/watcher.go new file mode 100644 index 0000000..439ffcb --- /dev/null +++ b/recording/watcher.go @@ -0,0 +1,156 @@ +package recording + +import ( + "bilibili-livestream-archiver/common" + "bilibili-livestream-archiver/danmaku" + "bilibili-livestream-archiver/danmaku/dmpkg" + "context" + "encoding/json" + "fmt" + "log" + "time" +) + +type WatcherEvent int + +const ( + WatcherLiveStart WatcherEvent = 0 + WatcherLiveStop WatcherEvent = 1 +) + +type liveCommand string + +const ( + CommandLiveStart = "LIVE" + CommandStreamPreparing = "PREPARING" +) + +type liveInfo struct { + Command liveCommand `json:"cmd"` +} + +type ErrorReason int + +const ( + ErrSuccess ErrorReason = iota // no error happens, normally closed + ErrTransport // I/O error, safe to retry + ErrProtocol // application protocol logic error, do not retry +) + +const ( + kHeartBeatInterval = 30 * time.Second +) + +// watch monitors live room status by subscribing messages from Bilibili danmaku server, +// which talks to the client via a WebSocket or TCP connection. +// In our implementation, we use WebSocket over SSL/TLS. +func watch( + ctx context.Context, + url string, + authKey string, + roomId common.RoomId, + liveStatusChecker func() (bool, error), + chEvent chan<- WatcherEvent, +) (error, ErrorReason) { + + logger := log.Default() + + var err error + + dm := danmaku.NewDanmakuClient() + defer func() { _ = dm.Disconnect() }() + + // connect to danmaku server for live online/offline notifications + err = dm.Connect(ctx, url) + if err != nil { + return fmt.Errorf("failed to connect to danmaku server: %w", err), ErrTransport + } + defer func() { _ = dm.Disconnect() }() + + // the danmaku server requires an auth token and room id when connected + logger.Println("ws connected. Authenticating...") + err = dm.Authenticate(roomId, authKey) + if err != nil { + return fmt.Errorf("auth failed: %w", err), ErrProtocol + } + + // the danmaku server requires heartbeat messages every 30 seconds + heartbeat := func() error { + err := dm.Heartbeat() + return err + } + + // send initial heartbeat immediately + err = heartbeat() + if err != nil { + return err, ErrTransport + } + + // create heartbeat timer + heartBeatTimer := time.NewTicker(kHeartBeatInterval) + defer func() { heartBeatTimer.Stop() }() + + logger.Println("Checking initial live status...") + isLiving, err := liveStatusChecker() + if err != nil { + return fmt.Errorf("check initial live status failed: %w", err), ErrTransport + } + + if isLiving { + logger.Println("The live is already started. Start recording immediately.") + chEvent <- WatcherLiveStart + } else { + logger.Println("The live is not started yet. Waiting...") + } + + for { + select { + case <-ctx.Done(): + return nil, ErrSuccess + case <-heartBeatTimer.C: + err = heartbeat() + if err != nil { + return fmt.Errorf("heartbeat failed: %w", err), ErrTransport + } + default: + var msg dmpkg.DanmakuExchange + msg, err = dm.ReadExchange() + if err != nil { + return fmt.Errorf("exchange read failed: %w", err), ErrTransport + } + // the exchange may be compressed + msg, err = msg.Inflate() + if err != nil { + return fmt.Errorf("inflate server message failed: %v", err), ErrProtocol + } + + switch msg.Operation { + case dmpkg.OpLayer7Data: + logger.Printf("server message: op %v, body %v\n", msg.Operation, string(msg.Body)) + var info liveInfo + err := json.Unmarshal(msg.Body, &info) + if err != nil { + logger.Printf("ERROR: invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg) + return fmt.Errorf("decode server message body JSON failed: %w", err), ErrProtocol + } + switch info.Command { + case CommandLiveStart: + if !isLiving { + chEvent <- WatcherLiveStart + isLiving = true + } + case CommandStreamPreparing: + if isLiving { + chEvent <- WatcherLiveStop + } + default: + logger.Printf("Ignoring server message %v %v %v\n", + info.Command, msg.Operation, string(msg.Body)) + } + default: + logger.Printf("Server message: %v\n", msg.String()) + } + + } + } +} -- cgit v1.2.3