From 7902849dc021610b0ec16d1130b9515efd1f64b1 Mon Sep 17 00:00:00 2001 From: Keuin Date: Wed, 14 Sep 2022 03:02:51 +0800 Subject: Refactor: proper error handling. --- recording/runner.go | 357 ++++++++++++++++++++++++++++------------------------ 1 file changed, 194 insertions(+), 163 deletions(-) (limited to 'recording/runner.go') diff --git a/recording/runner.go b/recording/runner.go index 58bcbc9..fe518b8 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -12,9 +12,11 @@ import ( "fmt" "github.com/keuin/slbr/bilibili" "github.com/keuin/slbr/common" + "github.com/keuin/slbr/logging" "io" "os" "path" + "sync" "time" ) @@ -27,40 +29,40 @@ type TaskResult struct { const kReadChunkSize = 128 * 1024 const kSpecialExtName = "partial" +var errLiveEnded = NewRecoverableTaskError("live is ended", nil) + // runTaskWithAutoRestart // start a monitor&download task. // The task will be restarted infinitely until the context is closed, // which means it will survive when the live is ended. (It always waits for the next start) // During the process, its status may change. // Note: this method is blocking. -func (t *RunningTask) runTaskWithAutoRestart() error { +func (t *RunningTask) runTaskWithAutoRestart() { + t.status = StRunning +loop: for { - t.status = StRunning - err := tryRunTask(t) - if err == nil || - errors.Is(err, bilibili.ErrRoomIsClosed) || - errors.Is(err, io.EOF) || - errors.Is(err, io.ErrUnexpectedEOF) { - if !errors.Is(err, io.EOF) { - t.logger.Error("Task stopped because of an error: %v", err) + switch err := tryRunTask(t); err.(type) { + case nil: + t.logger.Info("Task stopped: %v", t.String()) + case *RecoverableTaskError: + if err != errLiveEnded { + t.logger.Error("Temporary error: %v", err) } t.status = StRestarting - t.logger.Info("Restarting task...") - continue - } else if err != nil && !errors.Is(err, context.Canceled) { - t.logger.Error("Task stopped with an error: %v", err) - return fmt.Errorf("task stopped: %v", err) - } else { - t.logger.Info("Task stopped: %v", t.String()) - return nil + default: + if !errors.Is(err, context.Canceled) { + t.logger.Error("Cannot recover from error: %v", err) + } + break loop } } + t.logger.Info("Task stopped: %v", t.String()) } // tryRunTask does the actual work. It will return when in the following cases: -// - the task context is cancelled -// - the task is restarting (e.g. because of the end of live) -// - some unrecoverable error happens (e.g. a protocol error caused by a bilibili protocol update) +// RecoverableError (end of live, IO error) +// UnrecoverableError (protocol error) +// context.Cancelled (the task is stopping) func tryRunTask(t *RunningTask) error { netTypes := t.Transport.AllowedNetworkTypes t.logger.Info("Network types: %v", netTypes) @@ -68,117 +70,205 @@ func tryRunTask(t *RunningTask) error { t.logger.Info("Start task: room %v", t.RoomId) t.logger.Info("Getting notification server info...") - authKey, dmUrl, err := getDanmakuServer(&t.TaskConfig, bi) + + type dmServerInfo struct { + AuthKey string + DmUrl string + } + + dmInfo, err := AutoRetryWithTask( + t, func() (info dmServerInfo, err error) { + info.AuthKey, info.DmUrl, err = getDanmakuServer(&t.TaskConfig, bi) + return + }, + ) if err != nil { - return err + return NewRecoverableTaskError("cannot get notification server info", err) } + t.logger.Info("Success.") + // wait for watcher goroutine + wg := sync.WaitGroup{} + defer wg.Wait() + + liveStatusChecker := func() (bool, error) { + resp, err := bi.GetRoomPlayInfo(t.RoomId) + if err != nil { + return false, err + } + if resp.Code != 0 { + return false, fmt.Errorf("bilibili API error: %v", resp.Message) + } + return resp.Data.LiveStatus.IsStreaming(), nil + } + // run live status watcher asynchronously t.logger.Info("Starting watcher...") - chWatcherEvent := make(chan WatcherEvent) - chWatcherDown := make(chan struct{}) - // start and recover watcher asynchronously - // the watcher may also be stopped by the downloader goroutine - watcherCtx, stopWatcher := context.WithCancel(t.ctx) + wg.Add(1) + chWatcherError := make(chan error) + ctxWatcher, stopWatcher := context.WithCancel(t.ctx) defer stopWatcher() - go watcherRecoverableLoop( - watcherCtx, - dmUrl, - authKey, - t, - bi, - chWatcherEvent, - chWatcherDown, - ) - - // The stream download goroutine may fail due to wrong watcher state. - // But this is likely temporarily, so we should restart the downloader - // until the state turns to closed. - - recorderCtx, stopRecorder := context.WithCancel(t.ctx) - defer stopRecorder() - - ev := <-chWatcherEvent - switch ev { - case WatcherLiveStart: - var err2 error - // restart recorder if interrupted by I/O errors - _, err2 = record(recorderCtx, bi, t) - if errors.Is(err2, io.EOF) { - t.logger.Info("The live seems to be closed normally.") - } else if errors.Is(err2, io.ErrUnexpectedEOF) { - t.logger.Warning("Reading is interrupted because of an unexpected EOF.") - } else { - t.logger.Error("Error when copying live stream: %v", err2) + go func() { + var err error + defer wg.Done() + run := true + loop: + for run { + err = watch( + ctxWatcher, + t.TaskConfig, + dmInfo.DmUrl, + dmInfo.AuthKey, + liveStatusChecker, + t.logger, + ) + switch err.(type) { + case nil: + // live is started, stop watcher loop and start the recorder + break loop + case *RecoverableTaskError: + // if the watcher fails and recoverable, just try to recover + // because the recorder has not started yet + run = true + t.logger.Error("Error occurred in live status watcher: %v", err) + break + case *UnrecoverableTaskError: + // the watcher cannot recover, so the task should be stopped + run = false + t.logger.Error("Error occurred in live status watcher: %v", err) + default: + run = false + // the task is being cancelled + if errors.Is(err, context.Canceled) { + break loop + } + // unknown error type, this should not happen + t.logger.Error("Unexpected type of error in watcher: %v", err) + } + if run { + t.logger.Info("Restarting watcher...") + } else { + t.logger.Error("Cannot restart watcher to recover from that error.") + } } - t.logger.Info("Stop recording.") - return err2 - case WatcherLiveStop: - // once the live is ended, the watcher will no longer receive live start event - // we have to restart the watcher - return bilibili.ErrRoomIsClosed + chWatcherError <- err + }() + + // wait for live start signal or the watcher stops abnormally + switch errWatcher := <-chWatcherError; errWatcher.(type) { + case nil: + // live is started, start recording + // (now the watcher should have stopped) + return func() error { + var err error + run := true + for run { + err = record(t.ctx, bi, &t.TaskConfig, t.logger) + switch err.(type) { + case nil: + // live is ended + t.logger.Info("The live is ended. Restarting current task...") + return errLiveEnded + case *RecoverableTaskError: + // here we don't know if the live is ended, so we have to do a check + t.logger.Warning("Recording is interrupted. Checking live status...") + isLiving, err2 := AutoRetryWithTask(t, liveStatusChecker) + if err2 != nil { + return NewRecoverableTaskError( + "when handling an error, another error occurred", + fmt.Errorf("first: %v, second: %w", err, err2), + ) + } + if isLiving { + t.logger.Info("This is a temporary error. Restarting recording...") + } else { + t.logger.Info("The live is ended. Restarting current task...") + return errLiveEnded + } + run = isLiving + break + default: + run = false + if errors.Is(err, context.Canceled) { + t.logger.Info("Recorder is stopped.") + } else if errors.Is(err, io.EOF) { + t.logger.Info("The live seems to be closed normally.") + } else if errors.Is(err, io.ErrUnexpectedEOF) { + t.logger.Warning("Reading is interrupted because of an unexpected EOF.") + } else { + t.logger.Error("Error when copying live stream: %v", err) + } + t.logger.Info("Stop recording.") + } + } + return err + }() + case *UnrecoverableTaskError: + // watcher is stopped and cannot restart + return NewUnrecoverableTaskError("failed to watch live status", errWatcher) default: - return fmt.Errorf("unknown watcher event: %v", ev) + // watcher is cancelled, stop running the task + if errors.Is(errWatcher, context.Canceled) { + return errWatcher + } + // unexpected error, this is a programming error + return NewUnrecoverableTaskError("unexpected error type", errWatcher) } } // record. When cancelled, the caller should clean up immediately and stop the task. +// Errors: +// RecoverableError +// UnrecoverableError +// context.Cancelled +// nil (live is ended normally) func record( ctx context.Context, bi bilibili.Bilibili, - task *RunningTask, -) (cancelled bool, err error) { - task.logger.Info("Getting room profile...") + task *TaskConfig, + logger logging.Logger, +) error { + logger.Info("Getting room profile...") - profile, err := common.AutoRetry( + profile, err := AutoRetryWithConfig( ctx, + logger, + task, func() (bilibili.RoomProfileResponse, error) { return bi.GetRoomProfile(task.RoomId) }, - task.Transport.MaxRetryTimes, - time.Duration(task.Transport.RetryIntervalSeconds)*time.Second, - &task.logger, ) - if errors.Is(err, context.Canceled) { - cancelled = true - return - } if err != nil { - // still error, abort - task.logger.Error("Cannot get room information: %v. Stopping current task.", err) - cancelled = true - return + if errors.Is(err, context.Canceled) { + return err + } + return NewRecoverableTaskError("failed to get living room information", err) } - task.logger.Info("Getting stream url...") - urlInfo, err := common.AutoRetry( + logger.Info("Getting stream url...") + urlInfo, err := AutoRetryWithConfig( ctx, + logger, + task, func() (bilibili.RoomUrlInfoResponse, error) { return bi.GetStreamingInfo(task.RoomId) }, - task.Transport.MaxRetryTimes, - time.Duration(task.Transport.RetryIntervalSeconds)*time.Second, - &task.logger, ) - if errors.Is(err, context.Canceled) { - cancelled = true - return - } if err != nil { - task.logger.Error("Cannot get streaming info: %v", err) - cancelled = true - return + if errors.Is(err, context.Canceled) { + return err + } + return NewRecoverableTaskError("failed to get live info", err) } if len(urlInfo.Data.URLs) == 0 { j, err2 := json.Marshal(urlInfo) if err2 != nil { j = []byte("(not available)") } - task.logger.Error("No stream returned from API. Response: %v", string(j)) - cancelled = true - return + logger.Error("No stream was provided. Response: %v", string(j)) + return NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided")) } streamSource := urlInfo.Data.URLs[0] @@ -187,7 +277,7 @@ func record( // the real extension name (without renaming) originalExtName := common.Errorable[string](common.GetFileExtensionFromUrl(streamSource.URL)).OrElse("flv") - if task.TaskConfig.Download.UseSpecialExtNameBeforeFinishing { + if task.Download.UseSpecialExtNameBeforeFinishing { extName = kSpecialExtName } else { extName = originalExtName @@ -200,9 +290,8 @@ func record( file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) if err != nil { - task.logger.Error("Cannot open file for writing: %v", err) - cancelled = true - return + logger.Error("Cannot open file for writing: %v", err) + return NewUnrecoverableTaskError("cannot open file for writing", err) } // rename the extension name to originalExtName when finish writing defer func() { @@ -213,10 +302,10 @@ func record( to := path.Join(saveDir, common.CombineFileName(baseName, originalExtName)) err := os.Rename(from, to) if err != nil { - task.logger.Error("Cannot rename %v to %v: %v", from, to, err) + logger.Error("Cannot rename %v to %v: %v", from, to, err) return } - task.logger.Info("Rename file \"%s\" to \"%s\".", from, to) + logger.Info("Rename file \"%s\" to \"%s\".", from, to) }() defer func() { _ = file.Close() }() @@ -228,72 +317,14 @@ func record( writeBufferSize += kReadChunkSize - mod } writeBuffer := make([]byte, writeBufferSize) - task.logger.Info("Write buffer size: %v byte", writeBufferSize) - task.logger.Info("Recording live stream to file \"%v\"...", filePath) + logger.Info("Write buffer size: %v byte", writeBufferSize) + logger.Info("Recording live stream to file \"%v\"...", filePath) err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file, writeBuffer, kReadChunkSize) - cancelled = err == nil || errors.Is(err, context.Canceled) - if !cancelled { - // real error happens - task.logger.Error("Error when copying live stream: %v", err) - } - return -} - -// watcherRecoverableLoop run watcher forever until the context is cancelled. -func watcherRecoverableLoop( - ctx context.Context, - url string, - authKey string, - task *RunningTask, - bi bilibili.Bilibili, - chWatcherEvent chan<- WatcherEvent, - chWatcherDown chan<- struct{}, -) { - for { - err, errReason := watch( - ctx, - url, - authKey, - task.RoomId, - func() (bool, error) { - resp, err := bi.GetRoomPlayInfo(task.RoomId) - if err != nil { - return false, err - } - if resp.Code != 0 { - return false, fmt.Errorf("bilibili API error: %v", resp.Message) - } - return resp.Data.LiveStatus.IsStreaming(), nil - }, - chWatcherEvent, - task.logger, - ) - - // the context is cancelled, stop watching - if errors.Is(err, context.Canceled) { - return - } - - switch errReason { - case ErrSuccess: - // stop normally, the context is closed - return - case ErrProtocol: - task.logger.Fatal("Watcher stopped due to an unrecoverable error: %v", err) - // shutdown the whole task - chWatcherDown <- struct{}{} - return - case ErrTransport: - task.logger.Error("Watcher stopped due to an I/O error: %v", err) - waitSeconds := task.Transport.RetryIntervalSeconds - task.logger.Warning( - "Sleep for %v second(s) before restarting watcher.\n", - waitSeconds, - ) - time.Sleep(time.Duration(waitSeconds) * time.Second) - task.logger.Info("Retrying...") - } + if errors.Is(err, context.Canceled) || err == nil { + return err } + logger.Error("Error when copying live stream: %v", err) + return NewRecoverableTaskError("stream copy was unexpectedly interrupted", err) } func getDanmakuServer( -- cgit v1.2.3