summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeuin <[email protected]>2023-07-02 14:52:06 +0800
committerKeuin <[email protected]>2023-07-02 14:52:06 +0800
commit6f7987593a9db5500fa56fef0ec1845a02df9876 (patch)
tree11dd64cfbe07e2d48c224da665932ce39e4f5596
parentb6eb2c0da4e653c6fdd278bcbdcd55ec376cd481 (diff)
Refactor: move task error types to separate package.
-rw-r--r--bilibili/errors/errors.go (renamed from common/errors.go)2
-rw-r--r--bilibili/streaming.go3
-rw-r--r--recording/runner.go31
-rw-r--r--recording/watcher.go18
4 files changed, 28 insertions, 26 deletions
diff --git a/common/errors.go b/bilibili/errors/errors.go
index bdf49fb..2b810ed 100644
--- a/common/errors.go
+++ b/bilibili/errors/errors.go
@@ -1,4 +1,4 @@
-package common
+package errors
import (
"fmt"
diff --git a/bilibili/streaming.go b/bilibili/streaming.go
index 8f6c491..d081a8f 100644
--- a/bilibili/streaming.go
+++ b/bilibili/streaming.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ errors2 "github.com/keuin/slbr/bilibili/errors"
"github.com/keuin/slbr/common"
"io"
"net/http"
@@ -70,7 +71,7 @@ func (b Bilibili) CopyLiveStream(
out, err = fileCreator()
if err != nil {
b.logger.Error("Cannot open file for writing: %v", err)
- err = common.NewUnrecoverableTaskError("failed to create file", err)
+ err = errors2.NewUnrecoverableTaskError("failed to create file", err)
return
}
_, err = out.Write(initBytes)
diff --git a/recording/runner.go b/recording/runner.go
index 1f2f101..8050fd7 100644
--- a/recording/runner.go
+++ b/recording/runner.go
@@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"github.com/keuin/slbr/bilibili"
+ errors2 "github.com/keuin/slbr/bilibili/errors"
"github.com/keuin/slbr/common"
"github.com/keuin/slbr/common/myurl"
"github.com/keuin/slbr/logging"
@@ -30,7 +31,7 @@ type TaskResult struct {
const SpecialExtName = "partial"
-var errLiveEnded = common.NewRecoverableTaskError("live is ended", nil)
+var errLiveEnded = errors2.NewRecoverableTaskError("live is ended", nil)
// runTaskWithAutoRestart
// start a monitor&download task.
@@ -49,7 +50,7 @@ loop:
switch err.(type) {
case nil:
t.logger.Info("Task stopped: %v", t.String())
- case *common.RecoverableTaskError:
+ case *errors2.RecoverableTaskError:
if err != errLiveEnded {
t.logger.Error("Temporary error: %v", err)
}
@@ -86,7 +87,7 @@ func tryRunTask(t *RunningTask) error {
},
)
if err != nil {
- return common.NewRecoverableTaskError("cannot get notification server info", err)
+ return errors2.NewRecoverableTaskError("cannot get notification server info", err)
}
t.logger.Info("Success.")
@@ -135,13 +136,13 @@ func tryRunTask(t *RunningTask) error {
case nil:
// live is started, stop watcher loop and start the recorder
break loop
- case *common.RecoverableTaskError:
+ case *errors2.RecoverableTaskError:
// if the watcher fails and recoverable, just try to recover
// because the recorder has not started yet
run = true
t.logger.Error("Error occurred in live status watcher: %v", err)
break
- case *common.UnrecoverableTaskError:
+ case *errors2.UnrecoverableTaskError:
// the watcher cannot recover, so the task should be stopped
run = false
t.logger.Error("Error occurred in live status watcher: %v", err)
@@ -174,12 +175,12 @@ func tryRunTask(t *RunningTask) error {
// live is ended
t.logger.Info("The live is ended. Restarting current task...")
return errLiveEnded
- case *common.RecoverableTaskError:
+ case *errors2.RecoverableTaskError:
// here we don't know if the live is ended, so we have to do a check
t.logger.Warning("Recording is interrupted. Checking live status...")
isLiving, err2 := AutoRetryWithTask(t, liveStatusChecker)
if err2 != nil {
- return common.NewRecoverableTaskError(
+ return errors2.NewRecoverableTaskError(
"when handling an error, another error occurred",
fmt.Errorf("first: %v, second: %w", err, err2),
)
@@ -208,16 +209,16 @@ func tryRunTask(t *RunningTask) error {
}
return err
}()
- case *common.UnrecoverableTaskError:
+ case *errors2.UnrecoverableTaskError:
// watcher is stopped and cannot restart
- return common.NewUnrecoverableTaskError("failed to watch live status", errWatcher)
+ return errors2.NewUnrecoverableTaskError("failed to watch live status", errWatcher)
default:
// watcher is cancelled, stop running the task
if errors.Is(errWatcher, context.Canceled) {
return errWatcher
}
// unexpected error, this is a programming error
- return common.NewUnrecoverableTaskError("unexpected error type", errWatcher)
+ return errors2.NewUnrecoverableTaskError("unexpected error type", errWatcher)
}
}
@@ -247,7 +248,7 @@ func record(
if errors.Is(err, context.Canceled) {
return err
}
- return common.NewRecoverableTaskError("failed to get living room information", err)
+ return errors2.NewRecoverableTaskError("failed to get living room information", err)
}
logger.Info("Getting stream url...")
@@ -263,7 +264,7 @@ func record(
if errors.Is(err, context.Canceled) {
return err
}
- return common.NewRecoverableTaskError("failed to get live info", err)
+ return errors2.NewRecoverableTaskError("failed to get live info", err)
}
if len(urlInfo.Data.URLs) == 0 {
j, err2 := json.Marshal(urlInfo)
@@ -271,7 +272,7 @@ func record(
j = []byte("(not available)")
}
logger.Error("No stream was provided. Response: %v", string(j))
- return common.NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided"))
+ return errors2.NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided"))
}
streamSource := urlInfo.Data.URLs[0]
@@ -324,14 +325,14 @@ func record(
logger.Info("Recording live stream to file \"%v\"...", filePath)
return
}, writeBufferSize)
- if _, ok := err.(*common.UnrecoverableTaskError); ok {
+ if _, ok := err.(*errors2.UnrecoverableTaskError); ok {
logger.Error("Cannot record: %v", err)
return err
} else if errors.Is(err, context.Canceled) || err == nil {
return err
}
logger.Error("Error when copying live stream: %v", err)
- return common.NewRecoverableTaskError("stream copy was unexpectedly interrupted", err)
+ return errors2.NewRecoverableTaskError("stream copy was unexpectedly interrupted", err)
}
func getDanmakuServer(
diff --git a/recording/watcher.go b/recording/watcher.go
index 13610a4..dd3bfa8 100644
--- a/recording/watcher.go
+++ b/recording/watcher.go
@@ -3,7 +3,7 @@ package recording
import (
"context"
"encoding/json"
- "github.com/keuin/slbr/common"
+ "github.com/keuin/slbr/bilibili/errors"
"github.com/keuin/slbr/danmaku"
"github.com/keuin/slbr/danmaku/dmmsg"
"github.com/keuin/slbr/danmaku/dmpkg"
@@ -51,7 +51,7 @@ func watch(
// connect to danmaku server for live online/offline notifications
err = dm.Connect(ctx, url)
if err != nil {
- return common.NewRecoverableTaskError("failed to connect to danmaku server", err)
+ return errors.NewRecoverableTaskError("failed to connect to danmaku server", err)
}
defer func() {
// this operation may be time-consuming, so run in another goroutine
@@ -64,7 +64,7 @@ func watch(
logger.Info("ws connected. Authenticating...")
err = dm.Authenticate(t.RoomId, authKey)
if err != nil {
- return common.NewUnrecoverableTaskError("authentication failed, invalid protocol", err)
+ return errors.NewUnrecoverableTaskError("authentication failed, invalid protocol", err)
}
// the danmaku server requires heartbeat messages every 30 seconds
@@ -76,7 +76,7 @@ func watch(
// send initial heartbeat immediately
err = heartbeat()
if err != nil {
- return common.NewRecoverableTaskError("heartbeat failed", err)
+ return errors.NewRecoverableTaskError("heartbeat failed", err)
}
// create heartbeat timer
@@ -86,7 +86,7 @@ func watch(
logger.Info("Checking initial live status...")
isLiving, err := AutoRetryWithConfig[bool](ctx, logger, &t, liveStatusChecker)
if err != nil {
- return common.NewRecoverableTaskError("check initial live status failed", err)
+ return errors.NewRecoverableTaskError("check initial live status failed", err)
}
if isLiving {
logger.Info("The live is already started. Start recording immediately.")
@@ -102,18 +102,18 @@ func watch(
case <-heartBeatTimer.C:
err = heartbeat()
if err != nil {
- return common.NewRecoverableTaskError("heartbeat failed", err)
+ return errors.NewRecoverableTaskError("heartbeat failed", err)
}
default:
var msg dmpkg.DanmakuExchange
msg, err = dm.ReadExchange()
if err != nil {
- return common.NewRecoverableTaskError("failed to read exchange from server", err)
+ return errors.NewRecoverableTaskError("failed to read exchange from server", err)
}
// the exchange may be compressed
msg, err = msg.Inflate()
if err != nil {
- return common.NewUnrecoverableTaskError("failed to decompress server message", err)
+ return errors.NewUnrecoverableTaskError("failed to decompress server message", err)
}
switch msg.Operation {
@@ -123,7 +123,7 @@ func watch(
err := json.Unmarshal(msg.Body, &info)
if err != nil {
logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg)
- return common.NewUnrecoverableTaskError("invalid JSON response from server", err)
+ return errors.NewUnrecoverableTaskError("invalid JSON response from server", err)
}
switch info.Command {
case CommandLiveStart: