From 76a7674dae069e0be15bed6af8450ad214f8dde2 Mon Sep 17 00:00:00 2001 From: Keuin Date: Sun, 2 Jul 2023 16:12:13 +0800 Subject: Refactor: distinct task errors with type enum. --- bilibili/errors/errors.go | 43 ------------- bilibili/errors/taskerror.go | 146 +++++++++++++++++++++++++++++++++++++++++++ bilibili/streaming.go | 4 +- go.mod | 1 + go.sum | 2 + recording/runner.go | 100 ++++++++++++++++------------- recording/watcher.go | 18 +++--- 7 files changed, 215 insertions(+), 99 deletions(-) delete mode 100644 bilibili/errors/errors.go create mode 100644 bilibili/errors/taskerror.go diff --git a/bilibili/errors/errors.go b/bilibili/errors/errors.go deleted file mode 100644 index 2b810ed..0000000 --- a/bilibili/errors/errors.go +++ /dev/null @@ -1,43 +0,0 @@ -package errors - -import ( - "fmt" -) - -/* -Task errors. -*/ - -type RecoverableTaskError struct { - err error - message string -} - -func (e *RecoverableTaskError) Error() string { - return fmt.Sprintf("%v: %v", e.message, e.err) -} - -func (e *RecoverableTaskError) Unwrap() error { - return e.err -} - -func NewRecoverableTaskError(message string, err error) error { - return &RecoverableTaskError{message: message, err: err} -} - -type UnrecoverableTaskError struct { - err error - message string -} - -func (e *UnrecoverableTaskError) Error() string { - return fmt.Sprintf("%v: %v", e.message, e.err) -} - -func (e *UnrecoverableTaskError) Unwrap() error { - return e.err -} - -func NewUnrecoverableTaskError(message string, err error) error { - return &UnrecoverableTaskError{message: message, err: err} -} diff --git a/bilibili/errors/taskerror.go b/bilibili/errors/taskerror.go new file mode 100644 index 0000000..644a794 --- /dev/null +++ b/bilibili/errors/taskerror.go @@ -0,0 +1,146 @@ +package errors + +import ( + "fmt" + "github.com/samber/lo" + "strings" +) + +type Type int + +const ( + // GetRoomInfo means failed to read live room information + GetRoomInfo Type = iota + // GetLiveInfo means failed to get live information + GetLiveInfo + // StreamCopy means an error occurred while reading stream video data + StreamCopy + // LiveEnded means the live is ended + LiveEnded + // DanmakuServerConnection means failed to connect to danmaku server, which provides danmaku and other control data + DanmakuServerConnection + // Heartbeat means an error occurred while sending heartbeat message, which may indicate a broken network connection + Heartbeat + // InitialLiveStatus means failed to get the live status for the first time, which happens in the early stage + InitialLiveStatus + // DanmakuExchangeRead means an error occurred while reading danmaku datagram from the server, + // which may indicate a broken network connection + DanmakuExchangeRead + // GetDanmakuServerInfo means failed to get danmaku server info + GetDanmakuServerInfo + // RecoverLiveStatusChecker means failed to restart live status checker + RecoverLiveStatusChecker + + // FileCreation means failed to create a file + FileCreation + // InvalidLiveInfo means the live info is insufficient for recording + InvalidLiveInfo + // LiveStatusWatch means the live status watcher encountered an unrecoverable error + LiveStatusWatch + // Unknown means the error type is unexpected and is not suitable to retry for safety + Unknown + // InvalidAuthProtocol means authentication failed because the protocol is invalid, + // which may indicate the protocol implementation is outdated + InvalidAuthProtocol + // MessageDecompression means the message cannot be decompressed, and we cannot understand its content + MessageDecompression + // JsonDecode means we cannot decode a datum which is expected to be a JSON object string + JsonDecode +) + +var recoverableErrors = []Type{ + GetRoomInfo, + GetLiveInfo, + StreamCopy, + LiveEnded, + DanmakuServerConnection, + Heartbeat, + InitialLiveStatus, + DanmakuExchangeRead, + GetDanmakuServerInfo, + RecoverLiveStatusChecker, +} + +var errorStrings = map[Type]string{ + GetRoomInfo: "failed to get living room information", + GetLiveInfo: "failed to get live info", + StreamCopy: "stream copy was unexpectedly interrupted", + LiveEnded: "live is ended", + DanmakuServerConnection: "failed to connect to danmaku server", + Heartbeat: "heartbeat failed", + InitialLiveStatus: "check initial live status failed", + DanmakuExchangeRead: "failed to read exchange from server", + GetDanmakuServerInfo: "cannot get notification server info", + RecoverLiveStatusChecker: "when recovering from a previous error, another error occurred", + FileCreation: "failed to create file", + InvalidLiveInfo: "invalid live info", + LiveStatusWatch: "failed to watch live status", + Unknown: "unexpected error type", + InvalidAuthProtocol: "authentication failed, invalid protocol", + MessageDecompression: "failed to decompress server message", + JsonDecode: "invalid JSON response from server", +} + +func (t Type) String() string { + if s, ok := errorStrings[t]; ok { + return s + } + return fmt.Sprintf("", int(t)) +} + +type taskError struct { + typ Type + err []error +} + +func (e *taskError) Message() string { + //TODO implement me + panic("implement me") +} + +func (e *taskError) IsRecoverable() bool { + return lo.Contains(recoverableErrors, e.typ) +} + +func (e *taskError) Unwrap() []error { + return e.err +} + +func (e taskError) Error() string { + sb := strings.Builder{} + if e.IsRecoverable() { + sb.WriteString("recoverable task error") + } else { + sb.WriteString("unrecoverable task error") + } + sb.WriteString(": ") + sb.WriteString(fmt.Sprintf("%v", e.typ)) + if len(e.err) > 0 { + sb.WriteString(", ") + for i := range e.err { + sb.WriteString(fmt.Sprintf("%v", e.err[i])) + if i != len(e.err)-1 { + sb.WriteString(", ") + } + } + } + return sb.String() +} + +type TaskError interface { + // IsRecoverable reports if this task error is safe to retry. + IsRecoverable() bool + // Unwrap returns the underneath errors which cause the task error. + Unwrap() []error + // Message returns the detailed task-specific error message. + Message() string + // Error returns this error as string + Error() string +} + +func NewError(typ Type, err ...error) TaskError { + return &taskError{ + typ: typ, + err: err, + } +} diff --git a/bilibili/streaming.go b/bilibili/streaming.go index d081a8f..b985bbb 100644 --- a/bilibili/streaming.go +++ b/bilibili/streaming.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - errors2 "github.com/keuin/slbr/bilibili/errors" + errs "github.com/keuin/slbr/bilibili/errors" "github.com/keuin/slbr/common" "io" "net/http" @@ -71,7 +71,7 @@ func (b Bilibili) CopyLiveStream( out, err = fileCreator() if err != nil { b.logger.Error("Cannot open file for writing: %v", err) - err = errors2.NewUnrecoverableTaskError("failed to create file", err) + err = errs.NewError(errs.FileCreation, err) return } _, err = out.Write(initBytes) diff --git a/go.mod b/go.mod index 69d0883..7794195 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/klauspost/compress v1.16.5 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect + github.com/samber/lo v1.38.1 // indirect github.com/samber/mo v1.8.0 // indirect github.com/spf13/afero v1.9.5 // indirect github.com/spf13/cast v1.5.1 // indirect diff --git a/go.sum b/go.sum index 6229bed..2e7a7c0 100644 --- a/go.sum +++ b/go.sum @@ -184,6 +184,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/samber/mo v1.8.0 h1:vYjHTfg14JF9tD2NLhpoUsRi9bjyRoYwa4+do0nvbVw= github.com/samber/mo v1.8.0/go.mod h1:BfkrCPuYzVG3ZljnZB783WIJIGk1mcZr9c9CPf8tAxs= github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM= diff --git a/recording/runner.go b/recording/runner.go index 8050fd7..d43f451 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -11,7 +11,7 @@ import ( "errors" "fmt" "github.com/keuin/slbr/bilibili" - errors2 "github.com/keuin/slbr/bilibili/errors" + errs "github.com/keuin/slbr/bilibili/errors" "github.com/keuin/slbr/common" "github.com/keuin/slbr/common/myurl" "github.com/keuin/slbr/logging" @@ -31,7 +31,7 @@ type TaskResult struct { const SpecialExtName = "partial" -var errLiveEnded = errors2.NewRecoverableTaskError("live is ended", nil) +var errLiveEnded = errs.NewError(errs.LiveEnded) // runTaskWithAutoRestart // start a monitor&download task. @@ -50,8 +50,8 @@ loop: switch err.(type) { case nil: t.logger.Info("Task stopped: %v", t.String()) - case *errors2.RecoverableTaskError: - if err != errLiveEnded { + case errs.TaskError: + if !errors.Is(err, errLiveEnded) { t.logger.Error("Temporary error: %v", err) } t.status = StRestarting @@ -87,7 +87,7 @@ func tryRunTask(t *RunningTask) error { }, ) if err != nil { - return errors2.NewRecoverableTaskError("cannot get notification server info", err) + return errs.NewError(errs.GetDanmakuServerInfo, err) } t.logger.Info("Success.") @@ -132,20 +132,22 @@ func tryRunTask(t *RunningTask) error { if errors.Is(err, context.Canceled) { break loop } - switch err.(type) { + switch err := err.(type) { case nil: // live is started, stop watcher loop and start the recorder break loop - case *errors2.RecoverableTaskError: - // if the watcher fails and recoverable, just try to recover - // because the recorder has not started yet - run = true - t.logger.Error("Error occurred in live status watcher: %v", err) + case errs.TaskError: + if err.IsRecoverable() { + // if the watcher fails and recoverable, just try to recover + // because the recorder has not started yet + run = true + t.logger.Error("Error occurred in live status watcher: %v", err) + } else { + // the watcher cannot recover, so the task should be stopped + run = false + t.logger.Error("Error occurred in live status watcher: %v", err) + } break - case *errors2.UnrecoverableTaskError: - // the watcher cannot recover, so the task should be stopped - run = false - t.logger.Error("Error occurred in live status watcher: %v", err) default: run = false // unknown error type, this should not happen @@ -161,7 +163,7 @@ func tryRunTask(t *RunningTask) error { }() // wait for live start signal or the watcher stops abnormally - switch errWatcher := <-chWatcherError; errWatcher.(type) { + switch errWatcher := <-chWatcherError; err := errWatcher.(type) { case nil: // live is started, start recording // (now the watcher should have stopped) @@ -170,20 +172,18 @@ func tryRunTask(t *RunningTask) error { run := true for run { err = record(t.ctx, bi, &t.TaskConfig, t.logger) - switch err.(type) { - case nil: + if err == nil { // live is ended t.logger.Info("The live is ended. Restarting current task...") return errLiveEnded - case *errors2.RecoverableTaskError: + } + if err, ok := err.(errs.TaskError); ok && err.IsRecoverable() { + run = true // here we don't know if the live is ended, so we have to do a check t.logger.Warning("Recording is interrupted. Checking live status...") isLiving, err2 := AutoRetryWithTask(t, liveStatusChecker) if err2 != nil { - return errors2.NewRecoverableTaskError( - "when handling an error, another error occurred", - fmt.Errorf("first: %v, second: %w", err, err2), - ) + return errs.NewError(errs.RecoverLiveStatusChecker, err, err2) } if isLiving { t.logger.Info("This is a temporary error. Restarting recording...") @@ -192,33 +192,43 @@ func tryRunTask(t *RunningTask) error { return errLiveEnded } run = isLiving - break - default: - run = false - if errors.Is(err, context.Canceled) { - t.logger.Info("Recorder is stopped.") - } else if errors.Is(err, io.EOF) { - t.logger.Info("The live seems to be closed normally.") - } else if errors.Is(err, io.ErrUnexpectedEOF) { - t.logger.Warning("Reading is interrupted because of an unexpected EOF.") - } else { - t.logger.Error("Error when copying live stream: %v", err) - } - t.logger.Info("Stop recording.") } + // unrecoverable or unexpected errors + run = false + if errors.Is(err, context.Canceled) { + t.logger.Info("Recorder is stopped.") + } else if errors.Is(err, io.EOF) { + t.logger.Info("The live seems to be closed normally.") + } else if errors.Is(err, io.ErrUnexpectedEOF) { + t.logger.Warning("Reading is interrupted because of an unexpected EOF.") + } else { + t.logger.Error("Error when copying live stream: %v", err) + } + t.logger.Info("Stop recording.") } return err }() - case *errors2.UnrecoverableTaskError: - // watcher is stopped and cannot restart - return errors2.NewUnrecoverableTaskError("failed to watch live status", errWatcher) + case errs.TaskError: + if !err.IsRecoverable() { + // watcher is stopped and cannot restart + return errs.NewError(errs.LiveStatusWatch, errWatcher) + } + // this shouldn't happen + // TODO this code looks error-prone, we need to refactor the entire error handling routine, + // now we just try to keep the logic close to what it looks like before refactoring + // watcher is cancelled, stop running the task + if errors.Is(errWatcher, context.Canceled) { + return errWatcher + } + // unexpected error, this is a programming error + return errs.NewError(errs.Unknown, errWatcher) default: // watcher is cancelled, stop running the task if errors.Is(errWatcher, context.Canceled) { return errWatcher } // unexpected error, this is a programming error - return errors2.NewUnrecoverableTaskError("unexpected error type", errWatcher) + return errs.NewError(errs.Unknown, errWatcher) } } @@ -248,7 +258,7 @@ func record( if errors.Is(err, context.Canceled) { return err } - return errors2.NewRecoverableTaskError("failed to get living room information", err) + return errs.NewError(errs.GetRoomInfo, err) } logger.Info("Getting stream url...") @@ -264,7 +274,7 @@ func record( if errors.Is(err, context.Canceled) { return err } - return errors2.NewRecoverableTaskError("failed to get live info", err) + return errs.NewError(errs.GetLiveInfo, err) } if len(urlInfo.Data.URLs) == 0 { j, err2 := json.Marshal(urlInfo) @@ -272,7 +282,7 @@ func record( j = []byte("(not available)") } logger.Error("No stream was provided. Response: %v", string(j)) - return errors2.NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided")) + return errs.NewError(errs.InvalidLiveInfo, fmt.Errorf("no stream provided")) } streamSource := urlInfo.Data.URLs[0] @@ -325,14 +335,14 @@ func record( logger.Info("Recording live stream to file \"%v\"...", filePath) return }, writeBufferSize) - if _, ok := err.(*errors2.UnrecoverableTaskError); ok { + if err, ok := err.(errs.TaskError); ok && !err.IsRecoverable() { logger.Error("Cannot record: %v", err) return err } else if errors.Is(err, context.Canceled) || err == nil { return err } logger.Error("Error when copying live stream: %v", err) - return errors2.NewRecoverableTaskError("stream copy was unexpectedly interrupted", err) + return errs.NewError(errs.StreamCopy, err) } func getDanmakuServer( diff --git a/recording/watcher.go b/recording/watcher.go index dd3bfa8..181e0fc 100644 --- a/recording/watcher.go +++ b/recording/watcher.go @@ -3,7 +3,7 @@ package recording import ( "context" "encoding/json" - "github.com/keuin/slbr/bilibili/errors" + errs "github.com/keuin/slbr/bilibili/errors" "github.com/keuin/slbr/danmaku" "github.com/keuin/slbr/danmaku/dmmsg" "github.com/keuin/slbr/danmaku/dmpkg" @@ -51,7 +51,7 @@ func watch( // connect to danmaku server for live online/offline notifications err = dm.Connect(ctx, url) if err != nil { - return errors.NewRecoverableTaskError("failed to connect to danmaku server", err) + return errs.NewError(errs.DanmakuServerConnection, err) } defer func() { // this operation may be time-consuming, so run in another goroutine @@ -64,7 +64,7 @@ func watch( logger.Info("ws connected. Authenticating...") err = dm.Authenticate(t.RoomId, authKey) if err != nil { - return errors.NewUnrecoverableTaskError("authentication failed, invalid protocol", err) + return errs.NewError(errs.InvalidAuthProtocol, err) } // the danmaku server requires heartbeat messages every 30 seconds @@ -76,7 +76,7 @@ func watch( // send initial heartbeat immediately err = heartbeat() if err != nil { - return errors.NewRecoverableTaskError("heartbeat failed", err) + return errs.NewError(errs.Heartbeat, err) } // create heartbeat timer @@ -86,7 +86,7 @@ func watch( logger.Info("Checking initial live status...") isLiving, err := AutoRetryWithConfig[bool](ctx, logger, &t, liveStatusChecker) if err != nil { - return errors.NewRecoverableTaskError("check initial live status failed", err) + return errs.NewError(errs.InitialLiveStatus, err) } if isLiving { logger.Info("The live is already started. Start recording immediately.") @@ -102,18 +102,18 @@ func watch( case <-heartBeatTimer.C: err = heartbeat() if err != nil { - return errors.NewRecoverableTaskError("heartbeat failed", err) + return errs.NewError(errs.Heartbeat, err) } default: var msg dmpkg.DanmakuExchange msg, err = dm.ReadExchange() if err != nil { - return errors.NewRecoverableTaskError("failed to read exchange from server", err) + return errs.NewError(errs.DanmakuExchangeRead, err) } // the exchange may be compressed msg, err = msg.Inflate() if err != nil { - return errors.NewUnrecoverableTaskError("failed to decompress server message", err) + return errs.NewError(errs.MessageDecompression, err) } switch msg.Operation { @@ -123,7 +123,7 @@ func watch( err := json.Unmarshal(msg.Body, &info) if err != nil { logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg) - return errors.NewUnrecoverableTaskError("invalid JSON response from server", err) + return errs.NewError(errs.JsonDecode, err) } switch info.Command { case CommandLiveStart: -- cgit v1.2.3