diff options
-rw-r--r-- | bilibili/errors.go | 7 | ||||
-rw-r--r-- | bilibili/streaming.go | 2 | ||||
-rw-r--r-- | recording/errors.go | 33 | ||||
-rw-r--r-- | recording/runner.go | 357 | ||||
-rw-r--r-- | recording/task.go | 34 | ||||
-rw-r--r-- | recording/watcher.go | 72 |
6 files changed, 288 insertions, 217 deletions
diff --git a/bilibili/errors.go b/bilibili/errors.go deleted file mode 100644 index 3f8ac9b..0000000 --- a/bilibili/errors.go +++ /dev/null @@ -1,7 +0,0 @@ -package bilibili - -import "fmt" - -var ( - ErrRoomIsClosed = fmt.Errorf("living room is closed") -) diff --git a/bilibili/streaming.go b/bilibili/streaming.go index c89126d..ea9207c 100644 --- a/bilibili/streaming.go +++ b/bilibili/streaming.go @@ -42,7 +42,7 @@ func (b Bilibili) CopyLiveStream( // 404 when not streaming if resp.StatusCode == http.StatusNotFound { - return ErrRoomIsClosed + return fmt.Errorf("live is not started or the room does not exist") } err = validateHttpStatus(resp) diff --git a/recording/errors.go b/recording/errors.go new file mode 100644 index 0000000..48a492e --- /dev/null +++ b/recording/errors.go @@ -0,0 +1,33 @@ +package recording + +import "fmt" + +/* +Task errors. +*/ + +type RecoverableTaskError struct { + err error + message string +} + +func (e *RecoverableTaskError) Error() string { + return fmt.Sprintf("%v: %v", e.message, e.err) +} + +func NewRecoverableTaskError(message string, err error) error { + return &RecoverableTaskError{message: message, err: err} +} + +type UnrecoverableTaskError struct { + err error + message string +} + +func (e *UnrecoverableTaskError) Error() string { + return fmt.Sprintf("%v: %v", e.message, e.err) +} + +func NewUnrecoverableTaskError(message string, err error) error { + return &UnrecoverableTaskError{message: message, err: err} +} diff --git a/recording/runner.go b/recording/runner.go index 58bcbc9..fe518b8 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -12,9 +12,11 @@ import ( "fmt" "github.com/keuin/slbr/bilibili" "github.com/keuin/slbr/common" + "github.com/keuin/slbr/logging" "io" "os" "path" + "sync" "time" ) @@ -27,40 +29,40 @@ type TaskResult struct { const kReadChunkSize = 128 * 1024 const kSpecialExtName = "partial" +var errLiveEnded = NewRecoverableTaskError("live is ended", nil) + // runTaskWithAutoRestart // start a monitor&download task. // The task will be restarted infinitely until the context is closed, // which means it will survive when the live is ended. (It always waits for the next start) // During the process, its status may change. // Note: this method is blocking. -func (t *RunningTask) runTaskWithAutoRestart() error { +func (t *RunningTask) runTaskWithAutoRestart() { + t.status = StRunning +loop: for { - t.status = StRunning - err := tryRunTask(t) - if err == nil || - errors.Is(err, bilibili.ErrRoomIsClosed) || - errors.Is(err, io.EOF) || - errors.Is(err, io.ErrUnexpectedEOF) { - if !errors.Is(err, io.EOF) { - t.logger.Error("Task stopped because of an error: %v", err) + switch err := tryRunTask(t); err.(type) { + case nil: + t.logger.Info("Task stopped: %v", t.String()) + case *RecoverableTaskError: + if err != errLiveEnded { + t.logger.Error("Temporary error: %v", err) } t.status = StRestarting - t.logger.Info("Restarting task...") - continue - } else if err != nil && !errors.Is(err, context.Canceled) { - t.logger.Error("Task stopped with an error: %v", err) - return fmt.Errorf("task stopped: %v", err) - } else { - t.logger.Info("Task stopped: %v", t.String()) - return nil + default: + if !errors.Is(err, context.Canceled) { + t.logger.Error("Cannot recover from error: %v", err) + } + break loop } } + t.logger.Info("Task stopped: %v", t.String()) } // tryRunTask does the actual work. It will return when in the following cases: -// - the task context is cancelled -// - the task is restarting (e.g. because of the end of live) -// - some unrecoverable error happens (e.g. a protocol error caused by a bilibili protocol update) +// RecoverableError (end of live, IO error) +// UnrecoverableError (protocol error) +// context.Cancelled (the task is stopping) func tryRunTask(t *RunningTask) error { netTypes := t.Transport.AllowedNetworkTypes t.logger.Info("Network types: %v", netTypes) @@ -68,117 +70,205 @@ func tryRunTask(t *RunningTask) error { t.logger.Info("Start task: room %v", t.RoomId) t.logger.Info("Getting notification server info...") - authKey, dmUrl, err := getDanmakuServer(&t.TaskConfig, bi) + + type dmServerInfo struct { + AuthKey string + DmUrl string + } + + dmInfo, err := AutoRetryWithTask( + t, func() (info dmServerInfo, err error) { + info.AuthKey, info.DmUrl, err = getDanmakuServer(&t.TaskConfig, bi) + return + }, + ) if err != nil { - return err + return NewRecoverableTaskError("cannot get notification server info", err) } + t.logger.Info("Success.") + // wait for watcher goroutine + wg := sync.WaitGroup{} + defer wg.Wait() + + liveStatusChecker := func() (bool, error) { + resp, err := bi.GetRoomPlayInfo(t.RoomId) + if err != nil { + return false, err + } + if resp.Code != 0 { + return false, fmt.Errorf("bilibili API error: %v", resp.Message) + } + return resp.Data.LiveStatus.IsStreaming(), nil + } + // run live status watcher asynchronously t.logger.Info("Starting watcher...") - chWatcherEvent := make(chan WatcherEvent) - chWatcherDown := make(chan struct{}) - // start and recover watcher asynchronously - // the watcher may also be stopped by the downloader goroutine - watcherCtx, stopWatcher := context.WithCancel(t.ctx) + wg.Add(1) + chWatcherError := make(chan error) + ctxWatcher, stopWatcher := context.WithCancel(t.ctx) defer stopWatcher() - go watcherRecoverableLoop( - watcherCtx, - dmUrl, - authKey, - t, - bi, - chWatcherEvent, - chWatcherDown, - ) - - // The stream download goroutine may fail due to wrong watcher state. - // But this is likely temporarily, so we should restart the downloader - // until the state turns to closed. - - recorderCtx, stopRecorder := context.WithCancel(t.ctx) - defer stopRecorder() - - ev := <-chWatcherEvent - switch ev { - case WatcherLiveStart: - var err2 error - // restart recorder if interrupted by I/O errors - _, err2 = record(recorderCtx, bi, t) - if errors.Is(err2, io.EOF) { - t.logger.Info("The live seems to be closed normally.") - } else if errors.Is(err2, io.ErrUnexpectedEOF) { - t.logger.Warning("Reading is interrupted because of an unexpected EOF.") - } else { - t.logger.Error("Error when copying live stream: %v", err2) + go func() { + var err error + defer wg.Done() + run := true + loop: + for run { + err = watch( + ctxWatcher, + t.TaskConfig, + dmInfo.DmUrl, + dmInfo.AuthKey, + liveStatusChecker, + t.logger, + ) + switch err.(type) { + case nil: + // live is started, stop watcher loop and start the recorder + break loop + case *RecoverableTaskError: + // if the watcher fails and recoverable, just try to recover + // because the recorder has not started yet + run = true + t.logger.Error("Error occurred in live status watcher: %v", err) + break + case *UnrecoverableTaskError: + // the watcher cannot recover, so the task should be stopped + run = false + t.logger.Error("Error occurred in live status watcher: %v", err) + default: + run = false + // the task is being cancelled + if errors.Is(err, context.Canceled) { + break loop + } + // unknown error type, this should not happen + t.logger.Error("Unexpected type of error in watcher: %v", err) + } + if run { + t.logger.Info("Restarting watcher...") + } else { + t.logger.Error("Cannot restart watcher to recover from that error.") + } } - t.logger.Info("Stop recording.") - return err2 - case WatcherLiveStop: - // once the live is ended, the watcher will no longer receive live start event - // we have to restart the watcher - return bilibili.ErrRoomIsClosed + chWatcherError <- err + }() + + // wait for live start signal or the watcher stops abnormally + switch errWatcher := <-chWatcherError; errWatcher.(type) { + case nil: + // live is started, start recording + // (now the watcher should have stopped) + return func() error { + var err error + run := true + for run { + err = record(t.ctx, bi, &t.TaskConfig, t.logger) + switch err.(type) { + case nil: + // live is ended + t.logger.Info("The live is ended. Restarting current task...") + return errLiveEnded + case *RecoverableTaskError: + // here we don't know if the live is ended, so we have to do a check + t.logger.Warning("Recording is interrupted. Checking live status...") + isLiving, err2 := AutoRetryWithTask(t, liveStatusChecker) + if err2 != nil { + return NewRecoverableTaskError( + "when handling an error, another error occurred", + fmt.Errorf("first: %v, second: %w", err, err2), + ) + } + if isLiving { + t.logger.Info("This is a temporary error. Restarting recording...") + } else { + t.logger.Info("The live is ended. Restarting current task...") + return errLiveEnded + } + run = isLiving + break + default: + run = false + if errors.Is(err, context.Canceled) { + t.logger.Info("Recorder is stopped.") + } else if errors.Is(err, io.EOF) { + t.logger.Info("The live seems to be closed normally.") + } else if errors.Is(err, io.ErrUnexpectedEOF) { + t.logger.Warning("Reading is interrupted because of an unexpected EOF.") + } else { + t.logger.Error("Error when copying live stream: %v", err) + } + t.logger.Info("Stop recording.") + } + } + return err + }() + case *UnrecoverableTaskError: + // watcher is stopped and cannot restart + return NewUnrecoverableTaskError("failed to watch live status", errWatcher) default: - return fmt.Errorf("unknown watcher event: %v", ev) + // watcher is cancelled, stop running the task + if errors.Is(errWatcher, context.Canceled) { + return errWatcher + } + // unexpected error, this is a programming error + return NewUnrecoverableTaskError("unexpected error type", errWatcher) } } // record. When cancelled, the caller should clean up immediately and stop the task. +// Errors: +// RecoverableError +// UnrecoverableError +// context.Cancelled +// nil (live is ended normally) func record( ctx context.Context, bi bilibili.Bilibili, - task *RunningTask, -) (cancelled bool, err error) { - task.logger.Info("Getting room profile...") + task *TaskConfig, + logger logging.Logger, +) error { + logger.Info("Getting room profile...") - profile, err := common.AutoRetry( + profile, err := AutoRetryWithConfig( ctx, + logger, + task, func() (bilibili.RoomProfileResponse, error) { return bi.GetRoomProfile(task.RoomId) }, - task.Transport.MaxRetryTimes, - time.Duration(task.Transport.RetryIntervalSeconds)*time.Second, - &task.logger, ) - if errors.Is(err, context.Canceled) { - cancelled = true - return - } if err != nil { - // still error, abort - task.logger.Error("Cannot get room information: %v. Stopping current task.", err) - cancelled = true - return + if errors.Is(err, context.Canceled) { + return err + } + return NewRecoverableTaskError("failed to get living room information", err) } - task.logger.Info("Getting stream url...") - urlInfo, err := common.AutoRetry( + logger.Info("Getting stream url...") + urlInfo, err := AutoRetryWithConfig( ctx, + logger, + task, func() (bilibili.RoomUrlInfoResponse, error) { return bi.GetStreamingInfo(task.RoomId) }, - task.Transport.MaxRetryTimes, - time.Duration(task.Transport.RetryIntervalSeconds)*time.Second, - &task.logger, ) - if errors.Is(err, context.Canceled) { - cancelled = true - return - } if err != nil { - task.logger.Error("Cannot get streaming info: %v", err) - cancelled = true - return + if errors.Is(err, context.Canceled) { + return err + } + return NewRecoverableTaskError("failed to get live info", err) } if len(urlInfo.Data.URLs) == 0 { j, err2 := json.Marshal(urlInfo) if err2 != nil { j = []byte("(not available)") } - task.logger.Error("No stream returned from API. Response: %v", string(j)) - cancelled = true - return + logger.Error("No stream was provided. Response: %v", string(j)) + return NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided")) } streamSource := urlInfo.Data.URLs[0] @@ -187,7 +277,7 @@ func record( // the real extension name (without renaming) originalExtName := common.Errorable[string](common.GetFileExtensionFromUrl(streamSource.URL)).OrElse("flv") - if task.TaskConfig.Download.UseSpecialExtNameBeforeFinishing { + if task.Download.UseSpecialExtNameBeforeFinishing { extName = kSpecialExtName } else { extName = originalExtName @@ -200,9 +290,8 @@ func record( file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) if err != nil { - task.logger.Error("Cannot open file for writing: %v", err) - cancelled = true - return + logger.Error("Cannot open file for writing: %v", err) + return NewUnrecoverableTaskError("cannot open file for writing", err) } // rename the extension name to originalExtName when finish writing defer func() { @@ -213,10 +302,10 @@ func record( to := path.Join(saveDir, common.CombineFileName(baseName, originalExtName)) err := os.Rename(from, to) if err != nil { - task.logger.Error("Cannot rename %v to %v: %v", from, to, err) + logger.Error("Cannot rename %v to %v: %v", from, to, err) return } - task.logger.Info("Rename file \"%s\" to \"%s\".", from, to) + logger.Info("Rename file \"%s\" to \"%s\".", from, to) }() defer func() { _ = file.Close() }() @@ -228,72 +317,14 @@ func record( writeBufferSize += kReadChunkSize - mod } writeBuffer := make([]byte, writeBufferSize) - task.logger.Info("Write buffer size: %v byte", writeBufferSize) - task.logger.Info("Recording live stream to file \"%v\"...", filePath) + logger.Info("Write buffer size: %v byte", writeBufferSize) + logger.Info("Recording live stream to file \"%v\"...", filePath) err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file, writeBuffer, kReadChunkSize) - cancelled = err == nil || errors.Is(err, context.Canceled) - if !cancelled { - // real error happens - task.logger.Error("Error when copying live stream: %v", err) - } - return -} - -// watcherRecoverableLoop run watcher forever until the context is cancelled. -func watcherRecoverableLoop( - ctx context.Context, - url string, - authKey string, - task *RunningTask, - bi bilibili.Bilibili, - chWatcherEvent chan<- WatcherEvent, - chWatcherDown chan<- struct{}, -) { - for { - err, errReason := watch( - ctx, - url, - authKey, - task.RoomId, - func() (bool, error) { - resp, err := bi.GetRoomPlayInfo(task.RoomId) - if err != nil { - return false, err - } - if resp.Code != 0 { - return false, fmt.Errorf("bilibili API error: %v", resp.Message) - } - return resp.Data.LiveStatus.IsStreaming(), nil - }, - chWatcherEvent, - task.logger, - ) - - // the context is cancelled, stop watching - if errors.Is(err, context.Canceled) { - return - } - - switch errReason { - case ErrSuccess: - // stop normally, the context is closed - return - case ErrProtocol: - task.logger.Fatal("Watcher stopped due to an unrecoverable error: %v", err) - // shutdown the whole task - chWatcherDown <- struct{}{} - return - case ErrTransport: - task.logger.Error("Watcher stopped due to an I/O error: %v", err) - waitSeconds := task.Transport.RetryIntervalSeconds - task.logger.Warning( - "Sleep for %v second(s) before restarting watcher.\n", - waitSeconds, - ) - time.Sleep(time.Duration(waitSeconds) * time.Second) - task.logger.Info("Retrying...") - } + if errors.Is(err, context.Canceled) || err == nil { + return err } + logger.Error("Error when copying live stream: %v", err) + return NewRecoverableTaskError("stream copy was unexpectedly interrupted", err) } func getDanmakuServer( diff --git a/recording/task.go b/recording/task.go index 3d417ba..78be348 100644 --- a/recording/task.go +++ b/recording/task.go @@ -8,7 +8,9 @@ Concrete task works are done in the `runner.go` file. import ( "context" "fmt" + "github.com/keuin/slbr/common" "github.com/keuin/slbr/logging" + "time" ) type TaskStatus int @@ -31,8 +33,6 @@ type RunningTask struct { TaskConfig // ctx: the biggest context this task uses. It may create children contexts. ctx context.Context - // result: if the task is ended, here is the returned error - result error // status: running status status TaskStatus // hookStarted: called asynchronously when the task is started. This won't be called when restarting. @@ -70,7 +70,7 @@ func (t *RunningTask) StartTask() error { t.hookStarted() defer t.hookStopped() // do the task - _ = t.runTaskWithAutoRestart() + t.runTaskWithAutoRestart() }() return nil case StRunning: @@ -85,3 +85,31 @@ func (t *RunningTask) StartTask() error { } panic(fmt.Errorf("invalid task status: %v", st)) } + +func AutoRetryWithTask[T any]( + t *RunningTask, + supplier func() (T, error), +) (T, error) { + return common.AutoRetry[T]( + t.ctx, + supplier, + t.Transport.MaxRetryTimes, + time.Duration(t.Transport.RetryIntervalSeconds)*time.Second, + &t.logger, + ) +} + +func AutoRetryWithConfig[T any]( + ctx context.Context, + logger logging.Logger, + t *TaskConfig, + supplier func() (T, error), +) (T, error) { + return common.AutoRetry[T]( + ctx, + supplier, + t.Transport.MaxRetryTimes, + time.Duration(t.Transport.RetryIntervalSeconds)*time.Second, + &logger, + ) +} diff --git a/recording/watcher.go b/recording/watcher.go index 4ae17b7..2510b8b 100644 --- a/recording/watcher.go +++ b/recording/watcher.go @@ -3,8 +3,6 @@ package recording import ( "context" "encoding/json" - "fmt" - "github.com/keuin/slbr/common" "github.com/keuin/slbr/danmaku" "github.com/keuin/slbr/danmaku/dmmsg" "github.com/keuin/slbr/danmaku/dmpkg" @@ -12,13 +10,6 @@ import ( "time" ) -type WatcherEvent int - -const ( - WatcherLiveStart WatcherEvent = 0 - WatcherLiveStop WatcherEvent = 1 -) - type liveCommand string const ( @@ -31,14 +22,6 @@ type liveInfo struct { Data map[string]interface{} `json:"data"` } -type ErrorReason int - -const ( - ErrSuccess ErrorReason = iota // no error happens, normally closed - ErrTransport // I/O error, safe to retry - ErrProtocol // application protocol logic error, do not retry -) - const ( kHeartBeatInterval = 30 * time.Second ) @@ -46,32 +29,41 @@ const ( // watch monitors live room status by subscribing messages from Bilibili danmaku server, // which talks to the client via a WebSocket or TCP connection. // In our implementation, we use WebSocket over SSL/TLS. +// This function will return after the live is started, +// since one connection cannot receive more than one live start event. +// Error types: +// - UnrecoverableError +// - RecoverableError +// - context.Cancelled func watch( ctx context.Context, + t TaskConfig, url string, authKey string, - roomId common.RoomId, liveStatusChecker func() (bool, error), - chEvent chan<- WatcherEvent, logger logging.Logger, -) (error, ErrorReason) { +) error { var err error dm := danmaku.NewDanmakuClient() - defer func() { _ = dm.Disconnect() }() // connect to danmaku server for live online/offline notifications err = dm.Connect(ctx, url) if err != nil { - return fmt.Errorf("failed to connect to danmaku server: %w", err), ErrTransport + return NewRecoverableTaskError("failed to connect to danmaku server", err) } - defer func() { _ = dm.Disconnect() }() + defer func() { + // this operation may be time-consuming, so run in another goroutine + go func() { + _ = dm.Disconnect() + }() + }() // the danmaku server requires an auth token and room id when connected logger.Info("ws connected. Authenticating...") - err = dm.Authenticate(roomId, authKey) + err = dm.Authenticate(t.RoomId, authKey) if err != nil { - return fmt.Errorf("auth failed: %w", err), ErrProtocol + return NewUnrecoverableTaskError("authentication failed, invalid protocol", err) } // the danmaku server requires heartbeat messages every 30 seconds @@ -83,7 +75,7 @@ func watch( // send initial heartbeat immediately err = heartbeat() if err != nil { - return err, ErrTransport + return NewRecoverableTaskError("heartbeat failed", err) } // create heartbeat timer @@ -91,14 +83,13 @@ func watch( defer func() { heartBeatTimer.Stop() }() logger.Info("Checking initial live status...") - isLiving, err := liveStatusChecker() + isLiving, err := AutoRetryWithConfig[bool](ctx, logger, &t, liveStatusChecker) if err != nil { - return fmt.Errorf("check initial live status failed: %w", err), ErrTransport + return NewRecoverableTaskError("check initial live status failed", err) } - if isLiving { logger.Info("The live is already started. Start recording immediately.") - chEvent <- WatcherLiveStart + return nil } else { logger.Info("The live is not started yet. Waiting...") } @@ -106,22 +97,22 @@ func watch( for { select { case <-ctx.Done(): - return nil, ErrSuccess + return ctx.Err() case <-heartBeatTimer.C: err = heartbeat() if err != nil { - return fmt.Errorf("heartbeat failed: %w", err), ErrTransport + return NewRecoverableTaskError("heartbeat failed", err) } default: var msg dmpkg.DanmakuExchange msg, err = dm.ReadExchange() if err != nil { - return fmt.Errorf("exchange read failed: %w", err), ErrTransport + return NewRecoverableTaskError("failed to read exchange from server", err) } // the exchange may be compressed msg, err = msg.Inflate() if err != nil { - return fmt.Errorf("inflate server message failed: %v", err), ErrProtocol + return NewUnrecoverableTaskError("failed to decompress server message", err) } switch msg.Operation { @@ -131,18 +122,13 @@ func watch( err := json.Unmarshal(msg.Body, &info) if err != nil { logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg) - return fmt.Errorf("decode server message body JSON failed: %w", err), ErrProtocol + return NewUnrecoverableTaskError("invalid JSON response from server", err) } switch info.Command { case CommandLiveStart: - if !isLiving { - chEvent <- WatcherLiveStart - isLiving = true - } + return nil case CommandStreamPreparing: - if isLiving { - chEvent <- WatcherLiveStop - } + break default: switch info.Command { case "ENTRY_EFFECT": @@ -168,7 +154,7 @@ func watch( logger.Error("Cannot parse watched people number: %v", obj) continue } - logger.Info("The number of viewers (room: %v): %v", roomId, viewersNum) + logger.Info("The number of viewers (room: %v): %v", t.RoomId, viewersNum) case "INTERACT_WORD": var raw dmmsg.RawInteractWordMessage err = json.Unmarshal(msg.Body, &raw) |