summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeuin <[email protected]>2023-07-02 16:12:13 +0800
committerKeuin <[email protected]>2023-07-02 16:12:13 +0800
commit76a7674dae069e0be15bed6af8450ad214f8dde2 (patch)
tree5b0f1b09ce3c212c5dcc88d53667421cd3d5ac9d
parent6f7987593a9db5500fa56fef0ec1845a02df9876 (diff)
Refactor: distinct task errors with type enum.
-rw-r--r--bilibili/errors/errors.go43
-rw-r--r--bilibili/errors/taskerror.go146
-rw-r--r--bilibili/streaming.go4
-rw-r--r--go.mod1
-rw-r--r--go.sum2
-rw-r--r--recording/runner.go100
-rw-r--r--recording/watcher.go18
7 files changed, 215 insertions, 99 deletions
diff --git a/bilibili/errors/errors.go b/bilibili/errors/errors.go
deleted file mode 100644
index 2b810ed..0000000
--- a/bilibili/errors/errors.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package errors
-
-import (
- "fmt"
-)
-
-/*
-Task errors.
-*/
-
-type RecoverableTaskError struct {
- err error
- message string
-}
-
-func (e *RecoverableTaskError) Error() string {
- return fmt.Sprintf("%v: %v", e.message, e.err)
-}
-
-func (e *RecoverableTaskError) Unwrap() error {
- return e.err
-}
-
-func NewRecoverableTaskError(message string, err error) error {
- return &RecoverableTaskError{message: message, err: err}
-}
-
-type UnrecoverableTaskError struct {
- err error
- message string
-}
-
-func (e *UnrecoverableTaskError) Error() string {
- return fmt.Sprintf("%v: %v", e.message, e.err)
-}
-
-func (e *UnrecoverableTaskError) Unwrap() error {
- return e.err
-}
-
-func NewUnrecoverableTaskError(message string, err error) error {
- return &UnrecoverableTaskError{message: message, err: err}
-}
diff --git a/bilibili/errors/taskerror.go b/bilibili/errors/taskerror.go
new file mode 100644
index 0000000..644a794
--- /dev/null
+++ b/bilibili/errors/taskerror.go
@@ -0,0 +1,146 @@
+package errors
+
+import (
+ "fmt"
+ "github.com/samber/lo"
+ "strings"
+)
+
+type Type int
+
+const (
+ // GetRoomInfo means failed to read live room information
+ GetRoomInfo Type = iota
+ // GetLiveInfo means failed to get live information
+ GetLiveInfo
+ // StreamCopy means an error occurred while reading stream video data
+ StreamCopy
+ // LiveEnded means the live is ended
+ LiveEnded
+ // DanmakuServerConnection means failed to connect to danmaku server, which provides danmaku and other control data
+ DanmakuServerConnection
+ // Heartbeat means an error occurred while sending heartbeat message, which may indicate a broken network connection
+ Heartbeat
+ // InitialLiveStatus means failed to get the live status for the first time, which happens in the early stage
+ InitialLiveStatus
+ // DanmakuExchangeRead means an error occurred while reading danmaku datagram from the server,
+ // which may indicate a broken network connection
+ DanmakuExchangeRead
+ // GetDanmakuServerInfo means failed to get danmaku server info
+ GetDanmakuServerInfo
+ // RecoverLiveStatusChecker means failed to restart live status checker
+ RecoverLiveStatusChecker
+
+ // FileCreation means failed to create a file
+ FileCreation
+ // InvalidLiveInfo means the live info is insufficient for recording
+ InvalidLiveInfo
+ // LiveStatusWatch means the live status watcher encountered an unrecoverable error
+ LiveStatusWatch
+ // Unknown means the error type is unexpected and is not suitable to retry for safety
+ Unknown
+ // InvalidAuthProtocol means authentication failed because the protocol is invalid,
+ // which may indicate the protocol implementation is outdated
+ InvalidAuthProtocol
+ // MessageDecompression means the message cannot be decompressed, and we cannot understand its content
+ MessageDecompression
+ // JsonDecode means we cannot decode a datum which is expected to be a JSON object string
+ JsonDecode
+)
+
+var recoverableErrors = []Type{
+ GetRoomInfo,
+ GetLiveInfo,
+ StreamCopy,
+ LiveEnded,
+ DanmakuServerConnection,
+ Heartbeat,
+ InitialLiveStatus,
+ DanmakuExchangeRead,
+ GetDanmakuServerInfo,
+ RecoverLiveStatusChecker,
+}
+
+var errorStrings = map[Type]string{
+ GetRoomInfo: "failed to get living room information",
+ GetLiveInfo: "failed to get live info",
+ StreamCopy: "stream copy was unexpectedly interrupted",
+ LiveEnded: "live is ended",
+ DanmakuServerConnection: "failed to connect to danmaku server",
+ Heartbeat: "heartbeat failed",
+ InitialLiveStatus: "check initial live status failed",
+ DanmakuExchangeRead: "failed to read exchange from server",
+ GetDanmakuServerInfo: "cannot get notification server info",
+ RecoverLiveStatusChecker: "when recovering from a previous error, another error occurred",
+ FileCreation: "failed to create file",
+ InvalidLiveInfo: "invalid live info",
+ LiveStatusWatch: "failed to watch live status",
+ Unknown: "unexpected error type",
+ InvalidAuthProtocol: "authentication failed, invalid protocol",
+ MessageDecompression: "failed to decompress server message",
+ JsonDecode: "invalid JSON response from server",
+}
+
+func (t Type) String() string {
+ if s, ok := errorStrings[t]; ok {
+ return s
+ }
+ return fmt.Sprintf("<Type %v>", int(t))
+}
+
+type taskError struct {
+ typ Type
+ err []error
+}
+
+func (e *taskError) Message() string {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (e *taskError) IsRecoverable() bool {
+ return lo.Contains(recoverableErrors, e.typ)
+}
+
+func (e *taskError) Unwrap() []error {
+ return e.err
+}
+
+func (e taskError) Error() string {
+ sb := strings.Builder{}
+ if e.IsRecoverable() {
+ sb.WriteString("recoverable task error")
+ } else {
+ sb.WriteString("unrecoverable task error")
+ }
+ sb.WriteString(": ")
+ sb.WriteString(fmt.Sprintf("%v", e.typ))
+ if len(e.err) > 0 {
+ sb.WriteString(", ")
+ for i := range e.err {
+ sb.WriteString(fmt.Sprintf("%v", e.err[i]))
+ if i != len(e.err)-1 {
+ sb.WriteString(", ")
+ }
+ }
+ }
+ return sb.String()
+}
+
+type TaskError interface {
+ // IsRecoverable reports if this task error is safe to retry.
+ IsRecoverable() bool
+ // Unwrap returns the underneath errors which cause the task error.
+ Unwrap() []error
+ // Message returns the detailed task-specific error message.
+ Message() string
+ // Error returns this error as string
+ Error() string
+}
+
+func NewError(typ Type, err ...error) TaskError {
+ return &taskError{
+ typ: typ,
+ err: err,
+ }
+}
diff --git a/bilibili/streaming.go b/bilibili/streaming.go
index d081a8f..b985bbb 100644
--- a/bilibili/streaming.go
+++ b/bilibili/streaming.go
@@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
- errors2 "github.com/keuin/slbr/bilibili/errors"
+ errs "github.com/keuin/slbr/bilibili/errors"
"github.com/keuin/slbr/common"
"io"
"net/http"
@@ -71,7 +71,7 @@ func (b Bilibili) CopyLiveStream(
out, err = fileCreator()
if err != nil {
b.logger.Error("Cannot open file for writing: %v", err)
- err = errors2.NewUnrecoverableTaskError("failed to create file", err)
+ err = errs.NewError(errs.FileCreation, err)
return
}
_, err = out.Write(initBytes)
diff --git a/go.mod b/go.mod
index 69d0883..7794195 100644
--- a/go.mod
+++ b/go.mod
@@ -18,6 +18,7 @@ require (
github.com/klauspost/compress v1.16.5 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
+ github.com/samber/lo v1.38.1 // indirect
github.com/samber/mo v1.8.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
diff --git a/go.sum b/go.sum
index 6229bed..2e7a7c0 100644
--- a/go.sum
+++ b/go.sum
@@ -184,6 +184,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
+github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
+github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/samber/mo v1.8.0 h1:vYjHTfg14JF9tD2NLhpoUsRi9bjyRoYwa4+do0nvbVw=
github.com/samber/mo v1.8.0/go.mod h1:BfkrCPuYzVG3ZljnZB783WIJIGk1mcZr9c9CPf8tAxs=
github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM=
diff --git a/recording/runner.go b/recording/runner.go
index 8050fd7..d43f451 100644
--- a/recording/runner.go
+++ b/recording/runner.go
@@ -11,7 +11,7 @@ import (
"errors"
"fmt"
"github.com/keuin/slbr/bilibili"
- errors2 "github.com/keuin/slbr/bilibili/errors"
+ errs "github.com/keuin/slbr/bilibili/errors"
"github.com/keuin/slbr/common"
"github.com/keuin/slbr/common/myurl"
"github.com/keuin/slbr/logging"
@@ -31,7 +31,7 @@ type TaskResult struct {
const SpecialExtName = "partial"
-var errLiveEnded = errors2.NewRecoverableTaskError("live is ended", nil)
+var errLiveEnded = errs.NewError(errs.LiveEnded)
// runTaskWithAutoRestart
// start a monitor&download task.
@@ -50,8 +50,8 @@ loop:
switch err.(type) {
case nil:
t.logger.Info("Task stopped: %v", t.String())
- case *errors2.RecoverableTaskError:
- if err != errLiveEnded {
+ case errs.TaskError:
+ if !errors.Is(err, errLiveEnded) {
t.logger.Error("Temporary error: %v", err)
}
t.status = StRestarting
@@ -87,7 +87,7 @@ func tryRunTask(t *RunningTask) error {
},
)
if err != nil {
- return errors2.NewRecoverableTaskError("cannot get notification server info", err)
+ return errs.NewError(errs.GetDanmakuServerInfo, err)
}
t.logger.Info("Success.")
@@ -132,20 +132,22 @@ func tryRunTask(t *RunningTask) error {
if errors.Is(err, context.Canceled) {
break loop
}
- switch err.(type) {
+ switch err := err.(type) {
case nil:
// live is started, stop watcher loop and start the recorder
break loop
- case *errors2.RecoverableTaskError:
- // if the watcher fails and recoverable, just try to recover
- // because the recorder has not started yet
- run = true
- t.logger.Error("Error occurred in live status watcher: %v", err)
+ case errs.TaskError:
+ if err.IsRecoverable() {
+ // if the watcher fails and recoverable, just try to recover
+ // because the recorder has not started yet
+ run = true
+ t.logger.Error("Error occurred in live status watcher: %v", err)
+ } else {
+ // the watcher cannot recover, so the task should be stopped
+ run = false
+ t.logger.Error("Error occurred in live status watcher: %v", err)
+ }
break
- case *errors2.UnrecoverableTaskError:
- // the watcher cannot recover, so the task should be stopped
- run = false
- t.logger.Error("Error occurred in live status watcher: %v", err)
default:
run = false
// unknown error type, this should not happen
@@ -161,7 +163,7 @@ func tryRunTask(t *RunningTask) error {
}()
// wait for live start signal or the watcher stops abnormally
- switch errWatcher := <-chWatcherError; errWatcher.(type) {
+ switch errWatcher := <-chWatcherError; err := errWatcher.(type) {
case nil:
// live is started, start recording
// (now the watcher should have stopped)
@@ -170,20 +172,18 @@ func tryRunTask(t *RunningTask) error {
run := true
for run {
err = record(t.ctx, bi, &t.TaskConfig, t.logger)
- switch err.(type) {
- case nil:
+ if err == nil {
// live is ended
t.logger.Info("The live is ended. Restarting current task...")
return errLiveEnded
- case *errors2.RecoverableTaskError:
+ }
+ if err, ok := err.(errs.TaskError); ok && err.IsRecoverable() {
+ run = true
// here we don't know if the live is ended, so we have to do a check
t.logger.Warning("Recording is interrupted. Checking live status...")
isLiving, err2 := AutoRetryWithTask(t, liveStatusChecker)
if err2 != nil {
- return errors2.NewRecoverableTaskError(
- "when handling an error, another error occurred",
- fmt.Errorf("first: %v, second: %w", err, err2),
- )
+ return errs.NewError(errs.RecoverLiveStatusChecker, err, err2)
}
if isLiving {
t.logger.Info("This is a temporary error. Restarting recording...")
@@ -192,33 +192,43 @@ func tryRunTask(t *RunningTask) error {
return errLiveEnded
}
run = isLiving
- break
- default:
- run = false
- if errors.Is(err, context.Canceled) {
- t.logger.Info("Recorder is stopped.")
- } else if errors.Is(err, io.EOF) {
- t.logger.Info("The live seems to be closed normally.")
- } else if errors.Is(err, io.ErrUnexpectedEOF) {
- t.logger.Warning("Reading is interrupted because of an unexpected EOF.")
- } else {
- t.logger.Error("Error when copying live stream: %v", err)
- }
- t.logger.Info("Stop recording.")
}
+ // unrecoverable or unexpected errors
+ run = false
+ if errors.Is(err, context.Canceled) {
+ t.logger.Info("Recorder is stopped.")
+ } else if errors.Is(err, io.EOF) {
+ t.logger.Info("The live seems to be closed normally.")
+ } else if errors.Is(err, io.ErrUnexpectedEOF) {
+ t.logger.Warning("Reading is interrupted because of an unexpected EOF.")
+ } else {
+ t.logger.Error("Error when copying live stream: %v", err)
+ }
+ t.logger.Info("Stop recording.")
}
return err
}()
- case *errors2.UnrecoverableTaskError:
- // watcher is stopped and cannot restart
- return errors2.NewUnrecoverableTaskError("failed to watch live status", errWatcher)
+ case errs.TaskError:
+ if !err.IsRecoverable() {
+ // watcher is stopped and cannot restart
+ return errs.NewError(errs.LiveStatusWatch, errWatcher)
+ }
+ // this shouldn't happen
+ // TODO this code looks error-prone, we need to refactor the entire error handling routine,
+ // now we just try to keep the logic close to what it looks like before refactoring
+ // watcher is cancelled, stop running the task
+ if errors.Is(errWatcher, context.Canceled) {
+ return errWatcher
+ }
+ // unexpected error, this is a programming error
+ return errs.NewError(errs.Unknown, errWatcher)
default:
// watcher is cancelled, stop running the task
if errors.Is(errWatcher, context.Canceled) {
return errWatcher
}
// unexpected error, this is a programming error
- return errors2.NewUnrecoverableTaskError("unexpected error type", errWatcher)
+ return errs.NewError(errs.Unknown, errWatcher)
}
}
@@ -248,7 +258,7 @@ func record(
if errors.Is(err, context.Canceled) {
return err
}
- return errors2.NewRecoverableTaskError("failed to get living room information", err)
+ return errs.NewError(errs.GetRoomInfo, err)
}
logger.Info("Getting stream url...")
@@ -264,7 +274,7 @@ func record(
if errors.Is(err, context.Canceled) {
return err
}
- return errors2.NewRecoverableTaskError("failed to get live info", err)
+ return errs.NewError(errs.GetLiveInfo, err)
}
if len(urlInfo.Data.URLs) == 0 {
j, err2 := json.Marshal(urlInfo)
@@ -272,7 +282,7 @@ func record(
j = []byte("(not available)")
}
logger.Error("No stream was provided. Response: %v", string(j))
- return errors2.NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided"))
+ return errs.NewError(errs.InvalidLiveInfo, fmt.Errorf("no stream provided"))
}
streamSource := urlInfo.Data.URLs[0]
@@ -325,14 +335,14 @@ func record(
logger.Info("Recording live stream to file \"%v\"...", filePath)
return
}, writeBufferSize)
- if _, ok := err.(*errors2.UnrecoverableTaskError); ok {
+ if err, ok := err.(errs.TaskError); ok && !err.IsRecoverable() {
logger.Error("Cannot record: %v", err)
return err
} else if errors.Is(err, context.Canceled) || err == nil {
return err
}
logger.Error("Error when copying live stream: %v", err)
- return errors2.NewRecoverableTaskError("stream copy was unexpectedly interrupted", err)
+ return errs.NewError(errs.StreamCopy, err)
}
func getDanmakuServer(
diff --git a/recording/watcher.go b/recording/watcher.go
index dd3bfa8..181e0fc 100644
--- a/recording/watcher.go
+++ b/recording/watcher.go
@@ -3,7 +3,7 @@ package recording
import (
"context"
"encoding/json"
- "github.com/keuin/slbr/bilibili/errors"
+ errs "github.com/keuin/slbr/bilibili/errors"
"github.com/keuin/slbr/danmaku"
"github.com/keuin/slbr/danmaku/dmmsg"
"github.com/keuin/slbr/danmaku/dmpkg"
@@ -51,7 +51,7 @@ func watch(
// connect to danmaku server for live online/offline notifications
err = dm.Connect(ctx, url)
if err != nil {
- return errors.NewRecoverableTaskError("failed to connect to danmaku server", err)
+ return errs.NewError(errs.DanmakuServerConnection, err)
}
defer func() {
// this operation may be time-consuming, so run in another goroutine
@@ -64,7 +64,7 @@ func watch(
logger.Info("ws connected. Authenticating...")
err = dm.Authenticate(t.RoomId, authKey)
if err != nil {
- return errors.NewUnrecoverableTaskError("authentication failed, invalid protocol", err)
+ return errs.NewError(errs.InvalidAuthProtocol, err)
}
// the danmaku server requires heartbeat messages every 30 seconds
@@ -76,7 +76,7 @@ func watch(
// send initial heartbeat immediately
err = heartbeat()
if err != nil {
- return errors.NewRecoverableTaskError("heartbeat failed", err)
+ return errs.NewError(errs.Heartbeat, err)
}
// create heartbeat timer
@@ -86,7 +86,7 @@ func watch(
logger.Info("Checking initial live status...")
isLiving, err := AutoRetryWithConfig[bool](ctx, logger, &t, liveStatusChecker)
if err != nil {
- return errors.NewRecoverableTaskError("check initial live status failed", err)
+ return errs.NewError(errs.InitialLiveStatus, err)
}
if isLiving {
logger.Info("The live is already started. Start recording immediately.")
@@ -102,18 +102,18 @@ func watch(
case <-heartBeatTimer.C:
err = heartbeat()
if err != nil {
- return errors.NewRecoverableTaskError("heartbeat failed", err)
+ return errs.NewError(errs.Heartbeat, err)
}
default:
var msg dmpkg.DanmakuExchange
msg, err = dm.ReadExchange()
if err != nil {
- return errors.NewRecoverableTaskError("failed to read exchange from server", err)
+ return errs.NewError(errs.DanmakuExchangeRead, err)
}
// the exchange may be compressed
msg, err = msg.Inflate()
if err != nil {
- return errors.NewUnrecoverableTaskError("failed to decompress server message", err)
+ return errs.NewError(errs.MessageDecompression, err)
}
switch msg.Operation {
@@ -123,7 +123,7 @@ func watch(
err := json.Unmarshal(msg.Body, &info)
if err != nil {
logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg)
- return errors.NewUnrecoverableTaskError("invalid JSON response from server", err)
+ return errs.NewError(errs.JsonDecode, err)
}
switch info.Command {
case CommandLiveStart: