diff options
author | Keuin <[email protected]> | 2022-09-10 16:05:10 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2022-09-10 16:05:10 +0800 |
commit | e999937d75d8e8c40b06add376ebac423b0c2079 (patch) | |
tree | 8a30abaa5fc8fc68d283e0be52a73ee83bdaf3a2 | |
parent | f028bff042f471a68dff681af9c79ef96bc952e5 (diff) |
Fix task is not properly restarted when the live is closed and started again.
Use more friendly log format to replace golang's default `log.Logger`. (not completed)
Cleaner task status management.
-rw-r--r-- | bilibili/streaming.go | 12 | ||||
-rw-r--r-- | common/bytesize.go | 19 | ||||
-rw-r--r-- | common/copy.go | 32 | ||||
-rw-r--r-- | logging/logger.go | 77 | ||||
-rw-r--r-- | main.go | 33 | ||||
-rw-r--r-- | recording/runner.go | 143 | ||||
-rw-r--r-- | recording/task.go | 87 |
7 files changed, 300 insertions, 103 deletions
diff --git a/bilibili/streaming.go b/bilibili/streaming.go index 64dc26d..9a26c2d 100644 --- a/bilibili/streaming.go +++ b/bilibili/streaming.go @@ -52,20 +52,10 @@ func (b Bilibili) CopyLiveStream( defer func() { _ = resp.Body.Close() }() - // guard the following copy loop - // if the context is cancelled, stop it by closing the reader - guardianCtx, cancelGuardian := context.WithCancel(ctx) - go func() { - <-guardianCtx.Done() - _ = resp.Body.Close() - }() - defer cancelGuardian() - // blocking copy n, err := common.CopyToFileWithBuffer(ctx, out, resp.Body, buffer, readChunkSize, false) if err != nil && !errors.Is(err, context.Canceled) { - // real error happens b.error.Printf("Stream copying was interrupted unexpectedly: %v", err) } @@ -73,6 +63,6 @@ func (b Bilibili) CopyLiveStream( b.info.Printf("The live is ended. (room %v)", roomId) } - b.info.Printf("Bytes copied: %v", n) + b.info.Printf("Total downloaded: %v", common.PrettyBytes(uint64(n))) return err } diff --git a/common/bytesize.go b/common/bytesize.go new file mode 100644 index 0000000..9b3aa97 --- /dev/null +++ b/common/bytesize.go @@ -0,0 +1,19 @@ +package common + +import "fmt" + +func PrettyBytes(b uint64) string { + if b < 1000 { + return fmt.Sprintf("%d Byte", b) + } + if b < 1000_000 { + return fmt.Sprintf("%.2f KiB", float64(b)/1024) + } + if b < 1000_000_000 { + return fmt.Sprintf("%.2f MiB", float64(b)/1024/1024) + } + if b < 1000_000_000_000 { + return fmt.Sprintf("%.2f GiB", float64(b)/1024/1024/1024) + } + return fmt.Sprintf("%.2f TiB", float64(b)/1024/1024/1024/1024) +} diff --git a/common/copy.go b/common/copy.go index bee6515..1d273dc 100644 --- a/common/copy.go +++ b/common/copy.go @@ -36,29 +36,31 @@ func CopyToFileWithBuffer( }() for { - if err = ctx.Err(); err != nil { + select { + case <-ctx.Done(): return - } - nRead, err = in.Read(buffer[off:Min[int](off+chunkSize, bufSize)]) - if err != nil { - return - } - off += nRead - if off == bufSize { - // buffer is full - var nWritten int - nWritten, err = out.Write(buffer) + default: + nRead, err = in.Read(buffer[off:Min[int](off+chunkSize, bufSize)]) if err != nil { return } - if syncFile { - err = out.Sync() + off += nRead + if off == bufSize { + // buffer is full + var nWritten int + nWritten, err = out.Write(buffer) if err != nil { return } + if syncFile { + err = out.Sync() + if err != nil { + return + } + } + written += int64(nWritten) + off = 0 } - written += int64(nWritten) - off = 0 } } } diff --git a/logging/logger.go b/logging/logger.go new file mode 100644 index 0000000..9724c2f --- /dev/null +++ b/logging/logger.go @@ -0,0 +1,77 @@ +package logging + +/* +golang's `log` package sucks, so we wrap it. +*/ + +import ( + "fmt" + "log" + "runtime" +) + +type Logger struct { + delegate *log.Logger + prefix string + debugHeader string + infoHeader string + warningHeader string + errorHeader string + fatalHeader string +} + +const ( + kDebug = "DEBUG" + kInfo = "INFO" + kWarning = "WARNING" + kError = "ERROR" + kFatal = "FATAL" +) + +func NewWrappedLogger(delegate *log.Logger, name string) Logger { + return Logger{ + delegate: delegate, + debugHeader: fmt.Sprintf("[%v][%v]", name, kDebug), + infoHeader: fmt.Sprintf("[%v][%v]", name, kInfo), + warningHeader: fmt.Sprintf("[%v][%v]", name, kWarning), + errorHeader: fmt.Sprintf("[%v][%v]", name, kError), + fatalHeader: fmt.Sprintf("[%v][%v]", name, kFatal), + } +} + +func getCallerInfo() string { + _, file, line, ok := runtime.Caller(2) + if !ok { + file = "???" + line = 0 + } + short := file + for i := len(file) - 1; i > 0; i-- { + if file[i] == '/' { + short = file[i+1:] + break + } + } + file = short + return fmt.Sprintf("[%v:%v]", file, line) +} + +func (l Logger) Debug(format string, v ...any) { + l.delegate.Printf(l.debugHeader+getCallerInfo()+" "+format, v...) +} + +func (l Logger) Info(format string, v ...any) { + l.delegate.Printf(l.infoHeader+getCallerInfo()+" "+format, v...) +} + +func (l Logger) Warning(format string, v ...any) { + l.delegate.Printf(l.warningHeader+getCallerInfo()+" "+format, v...) +} + +func (l Logger) Error(format string, v ...any) { + l.delegate.Printf(l.errorHeader+getCallerInfo()+" "+format, v...) +} + +func (l Logger) Fatal(format string, v ...any) { + l.delegate.Printf(l.fatalHeader+getCallerInfo()+" "+format, v...) +} @@ -1,7 +1,13 @@ package main +/* +In this file we implement config file and command line arguments parsing. +Task lifecycle management are implemented in recording package. +*/ + import ( "bilibili-livestream-archiver/common" + "bilibili-livestream-archiver/logging" "bilibili-livestream-archiver/recording" "context" "fmt" @@ -127,23 +133,32 @@ func getTasks() (tasks []recording.TaskConfig) { } func main() { - tasks := getTasks() + logger := log.Default() + taskConfigs := getTasks() + tasks := make([]recording.RunningTask, len(taskConfigs)) + wg := sync.WaitGroup{} + ctxTasks, cancelTasks := context.WithCancel(context.Background()) fmt.Println("Record tasks:") - for i, task := range tasks { + for i, task := range taskConfigs { + tasks[i] = recording.NewRunningTask( + taskConfigs[i], + ctxTasks, + func() { wg.Add(1) }, + func() { wg.Done() }, + logging.NewWrappedLogger(logger, fmt.Sprintf("room %v", task.RoomId)), + ) fmt.Printf("[%2d] %s\n", i+1, task) } fmt.Println("") - logger := log.Default() - logger.Printf("Starting tasks...") - wg := sync.WaitGroup{} - ctx, cancelTasks := context.WithCancel(context.Background()) - for _, task := range tasks { - wg.Add(1) - go recording.RunTask(ctx, &wg, &task) + for i := range tasks { + err := tasks[i].StartTask() + if err != nil { + logger.Printf("Cannot start task %v (room %v): %v. Skip.", i, tasks[i].RoomId, err) + } } // listen on stop signals diff --git a/recording/runner.go b/recording/runner.go index 2f15abd..98d4432 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -15,7 +15,6 @@ import ( "log" "os" "path" - "sync" "time" ) @@ -25,93 +24,106 @@ type TaskResult struct { Error error } -const kReadChunkSize = 64 * 1024 +const kReadChunkSize = 128 * 1024 -// RunTask start a monitor&download task and -// put its execution result into a channel. -func RunTask(ctx context.Context, wg *sync.WaitGroup, task *TaskConfig) { - defer wg.Done() - err := doTask(ctx, task) - logger := log.Default() - if err != nil && !errors.Is(err, context.Canceled) { - logger.Printf("A task stopped with an error (room %v): %v", task.RoomId, err) - } else { - logger.Printf("Task stopped (room %v): %v", task.RoomId, task.String()) +// runTaskWithAutoRestart +// start a monitor&download task. +// The task will be restarted infinitely until the context is closed, +// which means it will survive when the live is ended. (It always waits for the next start) +// During the process, its status may change. +// Note: this method is blocking. +func (t *RunningTask) runTaskWithAutoRestart() error { + for { + t.status = StRunning + err := tryRunTask(t) + if errors.Is(err, bilibili.ErrRoomIsClosed) { + t.status = StRestarting + t.logger.Info("Restarting task...") + continue + } else if err != nil && !errors.Is(err, context.Canceled) { + t.logger.Error("Task stopped with an error: %v", err) + return fmt.Errorf("task stopped: %v", err) + } else { + t.logger.Info("Task stopped: %v", t.String()) + return nil + } } } -// doTask do the actual work, but returns synchronously. -func doTask(ctx context.Context, task *TaskConfig) error { - logger := log.Default() - netTypes := task.Transport.AllowedNetworkTypes - logger.Printf("Network types: %v", netTypes) +// tryRunTask does the actual work. It will return when in the following cases: +// - the task context is cancelled +// - the task is restarting (e.g. because of the end of live) +// - some unrecoverable error happens (e.g. a protocol error caused by a bilibili protocol update) +func tryRunTask(t *RunningTask) error { + netTypes := t.Transport.AllowedNetworkTypes + t.logger.Info("Network types: %v", netTypes) bi := bilibili.NewBilibiliWithNetType(netTypes) - logger.Printf("Start task: room %v", task.RoomId) + t.logger.Info("Start task: room %v", t.RoomId) - authKey, url, err := getStreamingServer(task, logger, bi) + t.logger.Info("Getting notification server info...") + authKey, dmUrl, err := getDanmakuServer(&t.TaskConfig, bi) if err != nil { return err } + t.logger.Info("Success.") // run live status watcher asynchronously - logger.Println("Starting watcher...") + t.logger.Info("Starting watcher...") chWatcherEvent := make(chan WatcherEvent) chWatcherDown := make(chan struct{}) // start and recover watcher asynchronously // the watcher may also be stopped by the downloader goroutine - watcherCtx, stopWatcher := context.WithCancel(ctx) + watcherCtx, stopWatcher := context.WithCancel(t.ctx) defer stopWatcher() - go watcherRecoverableLoop(watcherCtx, url, authKey, task, bi, chWatcherEvent, chWatcherDown) + go watcherRecoverableLoop( + watcherCtx, + dmUrl, + authKey, + t, + bi, + chWatcherEvent, + chWatcherDown, + ) // The stream download goroutine may fail due to wrong watcher state. // But this is likely temporarily, so we should restart the downloader // until the state turns to closed. - // We store the last modified live status - // in case there is a false-positive duplicate. - lastStatusIsLiving := false - recorderCtx, stopRecorder := context.WithCancel(ctx) + recorderCtx, stopRecorder := context.WithCancel(t.ctx) defer stopRecorder() for { select { - case <-ctx.Done(): - logger.Printf("Task (room %v) is stopped.", task.RoomId) + case <-t.ctx.Done(): + t.logger.Info("Task is stopped.") return nil case <-chWatcherDown: // watcher is down and unrecoverable, stop this task - return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", task.RoomId) + return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", t.RoomId) case ev := <-chWatcherEvent: switch ev { case WatcherLiveStart: - if lastStatusIsLiving { - logger.Println("Duplicate adjacent WatcherLiveStart event. Ignoring.") - continue - } - go func() { - cancelled := false - var err2 error - // restart recorder if interrupted by I/O errors - for !cancelled { - cancelled, err2 = record(recorderCtx, bi, task) - if err2 == bilibili.ErrRoomIsClosed { - sec := task.Watch.LiveInterruptedRestartSleepSeconds - if sec == 0 { - // default: 3s - // TODO move this to default config value (not easily supported by viper) - time.Sleep(3 * time.Second) - } - if sec > 0 { - logger.Printf("Sleep for %vs before restart recording.", sec) - time.Sleep(time.Duration(sec) * time.Second) - } - } + cancelled := false + var err2 error + // restart recorder if interrupted by I/O errors + for !cancelled { + cancelled, err2 = record(recorderCtx, bi, t) + // live is closed normally, do not restart in current function + // the watcher will wait for the next start + if errors.Is(err2, bilibili.ErrRoomIsClosed) { + t.logger.Info("Live is ended. Stop recording.") + return bilibili.ErrRoomIsClosed } - logger.Printf("Task is cancelled. Stop recording. (room %v)", task.RoomId) - }() - lastStatusIsLiving = true + if err2 != nil { + // some other unrecoverable error + return err2 + } + } + t.logger.Info("Task is cancelled. Stop recording.") case WatcherLiveStop: - lastStatusIsLiving = false + // once the live is ended, the watcher will no longer receive live start event + // we have to restart the watcher + return bilibili.ErrRoomIsClosed } } } @@ -121,7 +133,7 @@ func doTask(ctx context.Context, task *TaskConfig) error { func record( ctx context.Context, bi bilibili.Bilibili, - task *TaskConfig, + task *RunningTask, ) (cancelled bool, err error) { logger := log.Default() logger.Printf("INFO: Getting room profile...") @@ -214,13 +226,11 @@ func watcherRecoverableLoop( ctx context.Context, url string, authKey string, - task *TaskConfig, + task *RunningTask, bi bilibili.Bilibili, - chWatcherEvent chan WatcherEvent, + chWatcherEvent chan<- WatcherEvent, chWatcherDown chan<- struct{}, ) { - logger := log.Default() - for { err, errReason := watch( ctx, @@ -250,29 +260,27 @@ func watcherRecoverableLoop( // stop normally, the context is closed return case ErrProtocol: - logger.Printf("FATAL: Watcher stopped due to an unrecoverable error: %v", err) + task.logger.Fatal("Watcher stopped due to an unrecoverable error: %v", err) // shutdown the whole task chWatcherDown <- struct{}{} return case ErrTransport: - logger.Printf("ERROR: Watcher stopped due to an I/O error: %v", err) + task.logger.Error("ERROR: Watcher stopped due to an I/O error: %v", err) waitSeconds := task.Transport.RetryIntervalSeconds - logger.Printf( + task.logger.Warning( "WARNING: Sleep for %v second(s) before restarting watcher.\n", waitSeconds, ) time.Sleep(time.Duration(waitSeconds) * time.Second) - logger.Printf("Retrying...") + task.logger.Info("Retrying...") } } } -func getStreamingServer( +func getDanmakuServer( task *TaskConfig, - logger *log.Logger, bi bilibili.Bilibili, ) (string, string, error) { - logger.Println("Getting stream server info...") dmInfo, err := bi.GetDanmakuServerInfo(task.RoomId) if err != nil { return "", "", fmt.Errorf("failed to read stream server info: %w", err) @@ -280,7 +288,6 @@ func getStreamingServer( if len(dmInfo.Data.HostList) == 0 { return "", "", fmt.Errorf("no available stream server") } - logger.Println("Success.") // get authkey and ws url authKey := dmInfo.Data.Token diff --git a/recording/task.go b/recording/task.go new file mode 100644 index 0000000..9aa41a8 --- /dev/null +++ b/recording/task.go @@ -0,0 +1,87 @@ +package recording + +/* +In this file we implement task lifecycle management. +Concrete task works are done in the `runner.go` file. +*/ + +import ( + "bilibili-livestream-archiver/logging" + "context" + "fmt" +) + +type TaskStatus int + +const ( + StNotStarted TaskStatus = iota + StRunning + StRestarting + StStopped +) + +var ( + ErrTaskIsAlreadyStarted = fmt.Errorf("task is already started") + ErrTaskIsStopped = fmt.Errorf("restarting a stopped task is not allowed") +) + +// RunningTask is an augmented TaskConfig struct +// that contains volatile runtime information. +type RunningTask struct { + TaskConfig + // ctx: the biggest context this task uses. It may create children contexts. + ctx context.Context + // result: if the task is ended, here is the returned error + result error + // status: running status + status TaskStatus + // hookStarted: called asynchronously when the task is started. This won't be called when restarting. + hookStarted func() + // hookStopped: called asynchronously when the task is stopped. This won't be called when restarting. + hookStopped func() + // logger: where to print logs + logger logging.Logger +} + +func NewRunningTask( + config TaskConfig, + ctx context.Context, + hookStarted func(), + hookStopped func(), + logger logging.Logger, +) RunningTask { + return RunningTask{ + TaskConfig: config, + ctx: ctx, + status: StNotStarted, + hookStarted: hookStarted, + hookStopped: hookStopped, + logger: logger, + } +} + +func (t *RunningTask) StartTask() error { + st := t.status + switch st { + case StNotStarted: + // TODO real start + go func() { + defer func() { t.status = StStopped }() + t.hookStarted() + defer t.hookStopped() + // do the task + _ = t.runTaskWithAutoRestart() + }() + return nil + case StRunning: + return ErrTaskIsAlreadyStarted + case StRestarting: + return ErrTaskIsAlreadyStarted + case StStopped: + // we don't allow starting a stopped task + // because some state needs to be reset + // just create a new task and run + return ErrTaskIsStopped + } + panic(fmt.Errorf("invalid task status: %v", st)) +} |