diff options
Diffstat (limited to 'recording')
-rw-r--r-- | recording/runner.go | 143 | ||||
-rw-r--r-- | recording/task.go | 87 |
2 files changed, 162 insertions, 68 deletions
diff --git a/recording/runner.go b/recording/runner.go index 2f15abd..98d4432 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -15,7 +15,6 @@ import ( "log" "os" "path" - "sync" "time" ) @@ -25,93 +24,106 @@ type TaskResult struct { Error error } -const kReadChunkSize = 64 * 1024 +const kReadChunkSize = 128 * 1024 -// RunTask start a monitor&download task and -// put its execution result into a channel. -func RunTask(ctx context.Context, wg *sync.WaitGroup, task *TaskConfig) { - defer wg.Done() - err := doTask(ctx, task) - logger := log.Default() - if err != nil && !errors.Is(err, context.Canceled) { - logger.Printf("A task stopped with an error (room %v): %v", task.RoomId, err) - } else { - logger.Printf("Task stopped (room %v): %v", task.RoomId, task.String()) +// runTaskWithAutoRestart +// start a monitor&download task. +// The task will be restarted infinitely until the context is closed, +// which means it will survive when the live is ended. (It always waits for the next start) +// During the process, its status may change. +// Note: this method is blocking. +func (t *RunningTask) runTaskWithAutoRestart() error { + for { + t.status = StRunning + err := tryRunTask(t) + if errors.Is(err, bilibili.ErrRoomIsClosed) { + t.status = StRestarting + t.logger.Info("Restarting task...") + continue + } else if err != nil && !errors.Is(err, context.Canceled) { + t.logger.Error("Task stopped with an error: %v", err) + return fmt.Errorf("task stopped: %v", err) + } else { + t.logger.Info("Task stopped: %v", t.String()) + return nil + } } } -// doTask do the actual work, but returns synchronously. -func doTask(ctx context.Context, task *TaskConfig) error { - logger := log.Default() - netTypes := task.Transport.AllowedNetworkTypes - logger.Printf("Network types: %v", netTypes) +// tryRunTask does the actual work. It will return when in the following cases: +// - the task context is cancelled +// - the task is restarting (e.g. because of the end of live) +// - some unrecoverable error happens (e.g. a protocol error caused by a bilibili protocol update) +func tryRunTask(t *RunningTask) error { + netTypes := t.Transport.AllowedNetworkTypes + t.logger.Info("Network types: %v", netTypes) bi := bilibili.NewBilibiliWithNetType(netTypes) - logger.Printf("Start task: room %v", task.RoomId) + t.logger.Info("Start task: room %v", t.RoomId) - authKey, url, err := getStreamingServer(task, logger, bi) + t.logger.Info("Getting notification server info...") + authKey, dmUrl, err := getDanmakuServer(&t.TaskConfig, bi) if err != nil { return err } + t.logger.Info("Success.") // run live status watcher asynchronously - logger.Println("Starting watcher...") + t.logger.Info("Starting watcher...") chWatcherEvent := make(chan WatcherEvent) chWatcherDown := make(chan struct{}) // start and recover watcher asynchronously // the watcher may also be stopped by the downloader goroutine - watcherCtx, stopWatcher := context.WithCancel(ctx) + watcherCtx, stopWatcher := context.WithCancel(t.ctx) defer stopWatcher() - go watcherRecoverableLoop(watcherCtx, url, authKey, task, bi, chWatcherEvent, chWatcherDown) + go watcherRecoverableLoop( + watcherCtx, + dmUrl, + authKey, + t, + bi, + chWatcherEvent, + chWatcherDown, + ) // The stream download goroutine may fail due to wrong watcher state. // But this is likely temporarily, so we should restart the downloader // until the state turns to closed. - // We store the last modified live status - // in case there is a false-positive duplicate. - lastStatusIsLiving := false - recorderCtx, stopRecorder := context.WithCancel(ctx) + recorderCtx, stopRecorder := context.WithCancel(t.ctx) defer stopRecorder() for { select { - case <-ctx.Done(): - logger.Printf("Task (room %v) is stopped.", task.RoomId) + case <-t.ctx.Done(): + t.logger.Info("Task is stopped.") return nil case <-chWatcherDown: // watcher is down and unrecoverable, stop this task - return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", task.RoomId) + return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", t.RoomId) case ev := <-chWatcherEvent: switch ev { case WatcherLiveStart: - if lastStatusIsLiving { - logger.Println("Duplicate adjacent WatcherLiveStart event. Ignoring.") - continue - } - go func() { - cancelled := false - var err2 error - // restart recorder if interrupted by I/O errors - for !cancelled { - cancelled, err2 = record(recorderCtx, bi, task) - if err2 == bilibili.ErrRoomIsClosed { - sec := task.Watch.LiveInterruptedRestartSleepSeconds - if sec == 0 { - // default: 3s - // TODO move this to default config value (not easily supported by viper) - time.Sleep(3 * time.Second) - } - if sec > 0 { - logger.Printf("Sleep for %vs before restart recording.", sec) - time.Sleep(time.Duration(sec) * time.Second) - } - } + cancelled := false + var err2 error + // restart recorder if interrupted by I/O errors + for !cancelled { + cancelled, err2 = record(recorderCtx, bi, t) + // live is closed normally, do not restart in current function + // the watcher will wait for the next start + if errors.Is(err2, bilibili.ErrRoomIsClosed) { + t.logger.Info("Live is ended. Stop recording.") + return bilibili.ErrRoomIsClosed } - logger.Printf("Task is cancelled. Stop recording. (room %v)", task.RoomId) - }() - lastStatusIsLiving = true + if err2 != nil { + // some other unrecoverable error + return err2 + } + } + t.logger.Info("Task is cancelled. Stop recording.") case WatcherLiveStop: - lastStatusIsLiving = false + // once the live is ended, the watcher will no longer receive live start event + // we have to restart the watcher + return bilibili.ErrRoomIsClosed } } } @@ -121,7 +133,7 @@ func doTask(ctx context.Context, task *TaskConfig) error { func record( ctx context.Context, bi bilibili.Bilibili, - task *TaskConfig, + task *RunningTask, ) (cancelled bool, err error) { logger := log.Default() logger.Printf("INFO: Getting room profile...") @@ -214,13 +226,11 @@ func watcherRecoverableLoop( ctx context.Context, url string, authKey string, - task *TaskConfig, + task *RunningTask, bi bilibili.Bilibili, - chWatcherEvent chan WatcherEvent, + chWatcherEvent chan<- WatcherEvent, chWatcherDown chan<- struct{}, ) { - logger := log.Default() - for { err, errReason := watch( ctx, @@ -250,29 +260,27 @@ func watcherRecoverableLoop( // stop normally, the context is closed return case ErrProtocol: - logger.Printf("FATAL: Watcher stopped due to an unrecoverable error: %v", err) + task.logger.Fatal("Watcher stopped due to an unrecoverable error: %v", err) // shutdown the whole task chWatcherDown <- struct{}{} return case ErrTransport: - logger.Printf("ERROR: Watcher stopped due to an I/O error: %v", err) + task.logger.Error("ERROR: Watcher stopped due to an I/O error: %v", err) waitSeconds := task.Transport.RetryIntervalSeconds - logger.Printf( + task.logger.Warning( "WARNING: Sleep for %v second(s) before restarting watcher.\n", waitSeconds, ) time.Sleep(time.Duration(waitSeconds) * time.Second) - logger.Printf("Retrying...") + task.logger.Info("Retrying...") } } } -func getStreamingServer( +func getDanmakuServer( task *TaskConfig, - logger *log.Logger, bi bilibili.Bilibili, ) (string, string, error) { - logger.Println("Getting stream server info...") dmInfo, err := bi.GetDanmakuServerInfo(task.RoomId) if err != nil { return "", "", fmt.Errorf("failed to read stream server info: %w", err) @@ -280,7 +288,6 @@ func getStreamingServer( if len(dmInfo.Data.HostList) == 0 { return "", "", fmt.Errorf("no available stream server") } - logger.Println("Success.") // get authkey and ws url authKey := dmInfo.Data.Token diff --git a/recording/task.go b/recording/task.go new file mode 100644 index 0000000..9aa41a8 --- /dev/null +++ b/recording/task.go @@ -0,0 +1,87 @@ +package recording + +/* +In this file we implement task lifecycle management. +Concrete task works are done in the `runner.go` file. +*/ + +import ( + "bilibili-livestream-archiver/logging" + "context" + "fmt" +) + +type TaskStatus int + +const ( + StNotStarted TaskStatus = iota + StRunning + StRestarting + StStopped +) + +var ( + ErrTaskIsAlreadyStarted = fmt.Errorf("task is already started") + ErrTaskIsStopped = fmt.Errorf("restarting a stopped task is not allowed") +) + +// RunningTask is an augmented TaskConfig struct +// that contains volatile runtime information. +type RunningTask struct { + TaskConfig + // ctx: the biggest context this task uses. It may create children contexts. + ctx context.Context + // result: if the task is ended, here is the returned error + result error + // status: running status + status TaskStatus + // hookStarted: called asynchronously when the task is started. This won't be called when restarting. + hookStarted func() + // hookStopped: called asynchronously when the task is stopped. This won't be called when restarting. + hookStopped func() + // logger: where to print logs + logger logging.Logger +} + +func NewRunningTask( + config TaskConfig, + ctx context.Context, + hookStarted func(), + hookStopped func(), + logger logging.Logger, +) RunningTask { + return RunningTask{ + TaskConfig: config, + ctx: ctx, + status: StNotStarted, + hookStarted: hookStarted, + hookStopped: hookStopped, + logger: logger, + } +} + +func (t *RunningTask) StartTask() error { + st := t.status + switch st { + case StNotStarted: + // TODO real start + go func() { + defer func() { t.status = StStopped }() + t.hookStarted() + defer t.hookStopped() + // do the task + _ = t.runTaskWithAutoRestart() + }() + return nil + case StRunning: + return ErrTaskIsAlreadyStarted + case StRestarting: + return ErrTaskIsAlreadyStarted + case StStopped: + // we don't allow starting a stopped task + // because some state needs to be reset + // just create a new task and run + return ErrTaskIsStopped + } + panic(fmt.Errorf("invalid task status: %v", st)) +} |