author     Keuin <[email protected]>  2022-09-14 03:02:51 +0800
committer  Keuin <[email protected]>  2022-09-14 03:04:52 +0800
commit     7902849dc021610b0ec16d1130b9515efd1f64b1 (patch)
tree       058c1aca3a4947f75b0ba73f6f205498a1e186e5 /recording
parent     ed3db79b86e9ea2c796cccedf1454a45823293a4 (diff)
Refactor: proper error handling. (tag: v0.3.3)
Diffstat (limited to 'recording')
-rw-r--r--  recording/errors.go    33
-rw-r--r--  recording/runner.go   357
-rw-r--r--  recording/task.go      34
-rw-r--r--  recording/watcher.go   72
4 files changed, 287 insertions, 209 deletions
diff --git a/recording/errors.go b/recording/errors.go
new file mode 100644
index 0000000..48a492e
--- /dev/null
+++ b/recording/errors.go
@@ -0,0 +1,33 @@
+package recording
+
+import "fmt"
+
+/*
+Task errors.
+*/
+
+type RecoverableTaskError struct {
+ err error
+ message string
+}
+
+func (e *RecoverableTaskError) Error() string {
+ return fmt.Sprintf("%v: %v", e.message, e.err)
+}
+
+func NewRecoverableTaskError(message string, err error) error {
+ return &RecoverableTaskError{message: message, err: err}
+}
+
+type UnrecoverableTaskError struct {
+ err error
+ message string
+}
+
+func (e *UnrecoverableTaskError) Error() string {
+ return fmt.Sprintf("%v: %v", e.message, e.err)
+}
+
+func NewUnrecoverableTaskError(message string, err error) error {
+ return &UnrecoverableTaskError{message: message, err: err}
+}
diff --git a/recording/runner.go b/recording/runner.go
index 58bcbc9..fe518b8 100644
--- a/recording/runner.go
+++ b/recording/runner.go
@@ -12,9 +12,11 @@ import (
"fmt"
"github.com/keuin/slbr/bilibili"
"github.com/keuin/slbr/common"
+ "github.com/keuin/slbr/logging"
"io"
"os"
"path"
+ "sync"
"time"
)
@@ -27,40 +29,40 @@ type TaskResult struct {
const kReadChunkSize = 128 * 1024
const kSpecialExtName = "partial"
+var errLiveEnded = NewRecoverableTaskError("live is ended", nil)
+
// runTaskWithAutoRestart
// start a monitor&download task.
// The task will be restarted infinitely until the context is closed,
// which means it will survive when the live is ended. (It always waits for the next start)
// During the process, its status may change.
// Note: this method is blocking.
-func (t *RunningTask) runTaskWithAutoRestart() error {
+func (t *RunningTask) runTaskWithAutoRestart() {
+ t.status = StRunning
+loop:
for {
- t.status = StRunning
- err := tryRunTask(t)
- if err == nil ||
- errors.Is(err, bilibili.ErrRoomIsClosed) ||
- errors.Is(err, io.EOF) ||
- errors.Is(err, io.ErrUnexpectedEOF) {
- if !errors.Is(err, io.EOF) {
- t.logger.Error("Task stopped because of an error: %v", err)
+ switch err := tryRunTask(t); err.(type) {
+ case nil:
+ t.logger.Info("Task stopped: %v", t.String())
+ case *RecoverableTaskError:
+ if err != errLiveEnded {
+ t.logger.Error("Temporary error: %v", err)
}
t.status = StRestarting
- t.logger.Info("Restarting task...")
- continue
- } else if err != nil && !errors.Is(err, context.Canceled) {
- t.logger.Error("Task stopped with an error: %v", err)
- return fmt.Errorf("task stopped: %v", err)
- } else {
- t.logger.Info("Task stopped: %v", t.String())
- return nil
+ default:
+ if !errors.Is(err, context.Canceled) {
+ t.logger.Error("Cannot recover from error: %v", err)
+ }
+ break loop
}
}
+ t.logger.Info("Task stopped: %v", t.String())
}
// tryRunTask does the actual work. It will return when in the following cases:
-// - the task context is cancelled
-// - the task is restarting (e.g. because of the end of live)
-// - some unrecoverable error happens (e.g. a protocol error caused by a bilibili protocol update)
+// RecoverableError (end of live, IO error)
+// UnrecoverableError (protocol error)
+// context.Cancelled (the task is stopping)
func tryRunTask(t *RunningTask) error {
netTypes := t.Transport.AllowedNetworkTypes
t.logger.Info("Network types: %v", netTypes)
@@ -68,117 +70,205 @@ func tryRunTask(t *RunningTask) error {
t.logger.Info("Start task: room %v", t.RoomId)
t.logger.Info("Getting notification server info...")
- authKey, dmUrl, err := getDanmakuServer(&t.TaskConfig, bi)
+
+ type dmServerInfo struct {
+ AuthKey string
+ DmUrl string
+ }
+
+ dmInfo, err := AutoRetryWithTask(
+ t, func() (info dmServerInfo, err error) {
+ info.AuthKey, info.DmUrl, err = getDanmakuServer(&t.TaskConfig, bi)
+ return
+ },
+ )
if err != nil {
- return err
+ return NewRecoverableTaskError("cannot get notification server info", err)
}
+
t.logger.Info("Success.")
+ // wait for watcher goroutine
+ wg := sync.WaitGroup{}
+ defer wg.Wait()
+
+ liveStatusChecker := func() (bool, error) {
+ resp, err := bi.GetRoomPlayInfo(t.RoomId)
+ if err != nil {
+ return false, err
+ }
+ if resp.Code != 0 {
+ return false, fmt.Errorf("bilibili API error: %v", resp.Message)
+ }
+ return resp.Data.LiveStatus.IsStreaming(), nil
+ }
+
// run live status watcher asynchronously
t.logger.Info("Starting watcher...")
- chWatcherEvent := make(chan WatcherEvent)
- chWatcherDown := make(chan struct{})
- // start and recover watcher asynchronously
- // the watcher may also be stopped by the downloader goroutine
- watcherCtx, stopWatcher := context.WithCancel(t.ctx)
+ wg.Add(1)
+ chWatcherError := make(chan error)
+ ctxWatcher, stopWatcher := context.WithCancel(t.ctx)
defer stopWatcher()
- go watcherRecoverableLoop(
- watcherCtx,
- dmUrl,
- authKey,
- t,
- bi,
- chWatcherEvent,
- chWatcherDown,
- )
-
- // The stream download goroutine may fail due to wrong watcher state.
- // But this is likely temporarily, so we should restart the downloader
- // until the state turns to closed.
-
- recorderCtx, stopRecorder := context.WithCancel(t.ctx)
- defer stopRecorder()
-
- ev := <-chWatcherEvent
- switch ev {
- case WatcherLiveStart:
- var err2 error
- // restart recorder if interrupted by I/O errors
- _, err2 = record(recorderCtx, bi, t)
- if errors.Is(err2, io.EOF) {
- t.logger.Info("The live seems to be closed normally.")
- } else if errors.Is(err2, io.ErrUnexpectedEOF) {
- t.logger.Warning("Reading is interrupted because of an unexpected EOF.")
- } else {
- t.logger.Error("Error when copying live stream: %v", err2)
+ go func() {
+ var err error
+ defer wg.Done()
+ run := true
+ loop:
+ for run {
+ err = watch(
+ ctxWatcher,
+ t.TaskConfig,
+ dmInfo.DmUrl,
+ dmInfo.AuthKey,
+ liveStatusChecker,
+ t.logger,
+ )
+ switch err.(type) {
+ case nil:
+ // live is started, stop watcher loop and start the recorder
+ break loop
+ case *RecoverableTaskError:
+ // if the watcher fails and recoverable, just try to recover
+ // because the recorder has not started yet
+ run = true
+ t.logger.Error("Error occurred in live status watcher: %v", err)
+ break
+ case *UnrecoverableTaskError:
+ // the watcher cannot recover, so the task should be stopped
+ run = false
+ t.logger.Error("Error occurred in live status watcher: %v", err)
+ default:
+ run = false
+ // the task is being cancelled
+ if errors.Is(err, context.Canceled) {
+ break loop
+ }
+ // unknown error type, this should not happen
+ t.logger.Error("Unexpected type of error in watcher: %v", err)
+ }
+ if run {
+ t.logger.Info("Restarting watcher...")
+ } else {
+ t.logger.Error("Cannot restart watcher to recover from that error.")
+ }
}
- t.logger.Info("Stop recording.")
- return err2
- case WatcherLiveStop:
- // once the live is ended, the watcher will no longer receive live start event
- // we have to restart the watcher
- return bilibili.ErrRoomIsClosed
+ chWatcherError <- err
+ }()
+
+ // wait for live start signal or the watcher stops abnormally
+ switch errWatcher := <-chWatcherError; errWatcher.(type) {
+ case nil:
+ // live is started, start recording
+ // (now the watcher should have stopped)
+ return func() error {
+ var err error
+ run := true
+ for run {
+ err = record(t.ctx, bi, &t.TaskConfig, t.logger)
+ switch err.(type) {
+ case nil:
+ // live is ended
+ t.logger.Info("The live is ended. Restarting current task...")
+ return errLiveEnded
+ case *RecoverableTaskError:
+ // here we don't know if the live is ended, so we have to do a check
+ t.logger.Warning("Recording is interrupted. Checking live status...")
+ isLiving, err2 := AutoRetryWithTask(t, liveStatusChecker)
+ if err2 != nil {
+ return NewRecoverableTaskError(
+ "when handling an error, another error occurred",
+ fmt.Errorf("first: %v, second: %w", err, err2),
+ )
+ }
+ if isLiving {
+ t.logger.Info("This is a temporary error. Restarting recording...")
+ } else {
+ t.logger.Info("The live is ended. Restarting current task...")
+ return errLiveEnded
+ }
+ run = isLiving
+ break
+ default:
+ run = false
+ if errors.Is(err, context.Canceled) {
+ t.logger.Info("Recorder is stopped.")
+ } else if errors.Is(err, io.EOF) {
+ t.logger.Info("The live seems to be closed normally.")
+ } else if errors.Is(err, io.ErrUnexpectedEOF) {
+ t.logger.Warning("Reading is interrupted because of an unexpected EOF.")
+ } else {
+ t.logger.Error("Error when copying live stream: %v", err)
+ }
+ t.logger.Info("Stop recording.")
+ }
+ }
+ return err
+ }()
+ case *UnrecoverableTaskError:
+ // watcher is stopped and cannot restart
+ return NewUnrecoverableTaskError("failed to watch live status", errWatcher)
default:
- return fmt.Errorf("unknown watcher event: %v", ev)
+ // watcher is cancelled, stop running the task
+ if errors.Is(errWatcher, context.Canceled) {
+ return errWatcher
+ }
+ // unexpected error, this is a programming error
+ return NewUnrecoverableTaskError("unexpected error type", errWatcher)
}
}
// record. When cancelled, the caller should clean up immediately and stop the task.
+// Errors:
+// RecoverableError
+// UnrecoverableError
+// context.Cancelled
+// nil (live is ended normally)
func record(
ctx context.Context,
bi bilibili.Bilibili,
- task *RunningTask,
-) (cancelled bool, err error) {
- task.logger.Info("Getting room profile...")
+ task *TaskConfig,
+ logger logging.Logger,
+) error {
+ logger.Info("Getting room profile...")
- profile, err := common.AutoRetry(
+ profile, err := AutoRetryWithConfig(
ctx,
+ logger,
+ task,
func() (bilibili.RoomProfileResponse, error) {
return bi.GetRoomProfile(task.RoomId)
},
- task.Transport.MaxRetryTimes,
- time.Duration(task.Transport.RetryIntervalSeconds)*time.Second,
- &task.logger,
)
- if errors.Is(err, context.Canceled) {
- cancelled = true
- return
- }
if err != nil {
- // still error, abort
- task.logger.Error("Cannot get room information: %v. Stopping current task.", err)
- cancelled = true
- return
+ if errors.Is(err, context.Canceled) {
+ return err
+ }
+ return NewRecoverableTaskError("failed to get living room information", err)
}
- task.logger.Info("Getting stream url...")
- urlInfo, err := common.AutoRetry(
+ logger.Info("Getting stream url...")
+ urlInfo, err := AutoRetryWithConfig(
ctx,
+ logger,
+ task,
func() (bilibili.RoomUrlInfoResponse, error) {
return bi.GetStreamingInfo(task.RoomId)
},
- task.Transport.MaxRetryTimes,
- time.Duration(task.Transport.RetryIntervalSeconds)*time.Second,
- &task.logger,
)
- if errors.Is(err, context.Canceled) {
- cancelled = true
- return
- }
if err != nil {
- task.logger.Error("Cannot get streaming info: %v", err)
- cancelled = true
- return
+ if errors.Is(err, context.Canceled) {
+ return err
+ }
+ return NewRecoverableTaskError("failed to get live info", err)
}
if len(urlInfo.Data.URLs) == 0 {
j, err2 := json.Marshal(urlInfo)
if err2 != nil {
j = []byte("(not available)")
}
- task.logger.Error("No stream returned from API. Response: %v", string(j))
- cancelled = true
- return
+ logger.Error("No stream was provided. Response: %v", string(j))
+ return NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided"))
}
streamSource := urlInfo.Data.URLs[0]
@@ -187,7 +277,7 @@ func record(
// the real extension name (without renaming)
originalExtName := common.Errorable[string](common.GetFileExtensionFromUrl(streamSource.URL)).OrElse("flv")
- if task.TaskConfig.Download.UseSpecialExtNameBeforeFinishing {
+ if task.Download.UseSpecialExtNameBeforeFinishing {
extName = kSpecialExtName
} else {
extName = originalExtName
@@ -200,9 +290,8 @@ func record(
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
- task.logger.Error("Cannot open file for writing: %v", err)
- cancelled = true
- return
+ logger.Error("Cannot open file for writing: %v", err)
+ return NewUnrecoverableTaskError("cannot open file for writing", err)
}
// rename the extension name to originalExtName when finish writing
defer func() {
@@ -213,10 +302,10 @@ func record(
to := path.Join(saveDir, common.CombineFileName(baseName, originalExtName))
err := os.Rename(from, to)
if err != nil {
- task.logger.Error("Cannot rename %v to %v: %v", from, to, err)
+ logger.Error("Cannot rename %v to %v: %v", from, to, err)
return
}
- task.logger.Info("Rename file \"%s\" to \"%s\".", from, to)
+ logger.Info("Rename file \"%s\" to \"%s\".", from, to)
}()
defer func() { _ = file.Close() }()
@@ -228,72 +317,14 @@ func record(
writeBufferSize += kReadChunkSize - mod
}
writeBuffer := make([]byte, writeBufferSize)
- task.logger.Info("Write buffer size: %v byte", writeBufferSize)
- task.logger.Info("Recording live stream to file \"%v\"...", filePath)
+ logger.Info("Write buffer size: %v byte", writeBufferSize)
+ logger.Info("Recording live stream to file \"%v\"...", filePath)
err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file, writeBuffer, kReadChunkSize)
- cancelled = err == nil || errors.Is(err, context.Canceled)
- if !cancelled {
- // real error happens
- task.logger.Error("Error when copying live stream: %v", err)
- }
- return
-}
-
-// watcherRecoverableLoop run watcher forever until the context is cancelled.
-func watcherRecoverableLoop(
- ctx context.Context,
- url string,
- authKey string,
- task *RunningTask,
- bi bilibili.Bilibili,
- chWatcherEvent chan<- WatcherEvent,
- chWatcherDown chan<- struct{},
-) {
- for {
- err, errReason := watch(
- ctx,
- url,
- authKey,
- task.RoomId,
- func() (bool, error) {
- resp, err := bi.GetRoomPlayInfo(task.RoomId)
- if err != nil {
- return false, err
- }
- if resp.Code != 0 {
- return false, fmt.Errorf("bilibili API error: %v", resp.Message)
- }
- return resp.Data.LiveStatus.IsStreaming(), nil
- },
- chWatcherEvent,
- task.logger,
- )
-
- // the context is cancelled, stop watching
- if errors.Is(err, context.Canceled) {
- return
- }
-
- switch errReason {
- case ErrSuccess:
- // stop normally, the context is closed
- return
- case ErrProtocol:
- task.logger.Fatal("Watcher stopped due to an unrecoverable error: %v", err)
- // shutdown the whole task
- chWatcherDown <- struct{}{}
- return
- case ErrTransport:
- task.logger.Error("Watcher stopped due to an I/O error: %v", err)
- waitSeconds := task.Transport.RetryIntervalSeconds
- task.logger.Warning(
- "Sleep for %v second(s) before restarting watcher.\n",
- waitSeconds,
- )
- time.Sleep(time.Duration(waitSeconds) * time.Second)
- task.logger.Info("Retrying...")
- }
+ if errors.Is(err, context.Canceled) || err == nil {
+ return err
}
+ logger.Error("Error when copying live stream: %v", err)
+ return NewRecoverableTaskError("stream copy was unexpectedly interrupted", err)
}
func getDanmakuServer(
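
One detail worth noting in the runner hunks above: errLiveEnded is a package-level sentinel, so the restart loop tells the expected "live ended" restart apart from real failures by identity (err != errLiveEnded), not by message. A minimal sketch of that check, assuming the package-level sentinel defined earlier (the shouldLogAsError name is mine, not part of the commit):

    // shouldLogAsError mirrors the check in runTaskWithAutoRestart:
    // recoverable errors are logged unless they are the end-of-live sentinel.
    func shouldLogAsError(err error) bool {
        _, recoverable := err.(*RecoverableTaskError)
        return recoverable && err != errLiveEnded
    }
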
diff --git a/recording/task.go b/recording/task.go
index 3d417ba..78be348 100644
--- a/recording/task.go
+++ b/recording/task.go
@@ -8,7 +8,9 @@ Concrete task works are done in the `runner.go` file.
import (
"context"
"fmt"
+ "github.com/keuin/slbr/common"
"github.com/keuin/slbr/logging"
+ "time"
)
type TaskStatus int
@@ -31,8 +33,6 @@ type RunningTask struct {
TaskConfig
// ctx: the biggest context this task uses. It may create children contexts.
ctx context.Context
- // result: if the task is ended, here is the returned error
- result error
// status: running status
status TaskStatus
// hookStarted: called asynchronously when the task is started. This won't be called when restarting.
@@ -70,7 +70,7 @@ func (t *RunningTask) StartTask() error {
t.hookStarted()
defer t.hookStopped()
// do the task
- _ = t.runTaskWithAutoRestart()
+ t.runTaskWithAutoRestart()
}()
return nil
case StRunning:
@@ -85,3 +85,31 @@ func (t *RunningTask) StartTask() error {
}
panic(fmt.Errorf("invalid task status: %v", st))
}
+
+func AutoRetryWithTask[T any](
+ t *RunningTask,
+ supplier func() (T, error),
+) (T, error) {
+ return common.AutoRetry[T](
+ t.ctx,
+ supplier,
+ t.Transport.MaxRetryTimes,
+ time.Duration(t.Transport.RetryIntervalSeconds)*time.Second,
+ &t.logger,
+ )
+}
+
+func AutoRetryWithConfig[T any](
+ ctx context.Context,
+ logger logging.Logger,
+ t *TaskConfig,
+ supplier func() (T, error),
+) (T, error) {
+ return common.AutoRetry[T](
+ ctx,
+ supplier,
+ t.Transport.MaxRetryTimes,
+ time.Duration(t.Transport.RetryIntervalSeconds)*time.Second,
+ &logger,
+ )
+}
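
For readers new to the generic helpers above: both simply forward to common.AutoRetry with the task's transport settings (MaxRetryTimes, RetryIntervalSeconds), so call sites only supply a fetch closure. A hedged sketch of a call site in the style of record() above; the surrounding variables (ctx, logger, cfg, bi) are placeholders, not code from this commit:

    profile, err := AutoRetryWithConfig(
        ctx, logger, cfg,
        func() (bilibili.RoomProfileResponse, error) {
            return bi.GetRoomProfile(cfg.RoomId)
        },
    )
    if err != nil {
        if errors.Is(err, context.Canceled) {
            return err // the task is being stopped, do not wrap
        }
        return NewRecoverableTaskError("failed to get living room information", err)
    }
    _ = profile
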
diff --git a/recording/watcher.go b/recording/watcher.go
index 4ae17b7..2510b8b 100644
--- a/recording/watcher.go
+++ b/recording/watcher.go
@@ -3,8 +3,6 @@ package recording
import (
"context"
"encoding/json"
- "fmt"
- "github.com/keuin/slbr/common"
"github.com/keuin/slbr/danmaku"
"github.com/keuin/slbr/danmaku/dmmsg"
"github.com/keuin/slbr/danmaku/dmpkg"
@@ -12,13 +10,6 @@ import (
"time"
)
-type WatcherEvent int
-
-const (
- WatcherLiveStart WatcherEvent = 0
- WatcherLiveStop WatcherEvent = 1
-)
-
type liveCommand string
const (
@@ -31,14 +22,6 @@ type liveInfo struct {
Data map[string]interface{} `json:"data"`
}
-type ErrorReason int
-
-const (
- ErrSuccess ErrorReason = iota // no error happens, normally closed
- ErrTransport // I/O error, safe to retry
- ErrProtocol // application protocol logic error, do not retry
-)
-
const (
kHeartBeatInterval = 30 * time.Second
)
@@ -46,32 +29,41 @@ const (
// watch monitors live room status by subscribing messages from Bilibili danmaku server,
// which talks to the client via a WebSocket or TCP connection.
// In our implementation, we use WebSocket over SSL/TLS.
+// This function will return after the live is started,
+// since one connection cannot receive more than one live start event.
+// Error types:
+// - UnrecoverableError
+// - RecoverableError
+// - context.Cancelled
func watch(
ctx context.Context,
+ t TaskConfig,
url string,
authKey string,
- roomId common.RoomId,
liveStatusChecker func() (bool, error),
- chEvent chan<- WatcherEvent,
logger logging.Logger,
-) (error, ErrorReason) {
+) error {
var err error
dm := danmaku.NewDanmakuClient()
- defer func() { _ = dm.Disconnect() }()
// connect to danmaku server for live online/offline notifications
err = dm.Connect(ctx, url)
if err != nil {
- return fmt.Errorf("failed to connect to danmaku server: %w", err), ErrTransport
+ return NewRecoverableTaskError("failed to connect to danmaku server", err)
}
- defer func() { _ = dm.Disconnect() }()
+ defer func() {
+ // this operation may be time-consuming, so run in another goroutine
+ go func() {
+ _ = dm.Disconnect()
+ }()
+ }()
// the danmaku server requires an auth token and room id when connected
logger.Info("ws connected. Authenticating...")
- err = dm.Authenticate(roomId, authKey)
+ err = dm.Authenticate(t.RoomId, authKey)
if err != nil {
- return fmt.Errorf("auth failed: %w", err), ErrProtocol
+ return NewUnrecoverableTaskError("authentication failed, invalid protocol", err)
}
// the danmaku server requires heartbeat messages every 30 seconds
@@ -83,7 +75,7 @@ func watch(
// send initial heartbeat immediately
err = heartbeat()
if err != nil {
- return err, ErrTransport
+ return NewRecoverableTaskError("heartbeat failed", err)
}
// create heartbeat timer
@@ -91,14 +83,13 @@ func watch(
defer func() { heartBeatTimer.Stop() }()
logger.Info("Checking initial live status...")
- isLiving, err := liveStatusChecker()
+ isLiving, err := AutoRetryWithConfig[bool](ctx, logger, &t, liveStatusChecker)
if err != nil {
- return fmt.Errorf("check initial live status failed: %w", err), ErrTransport
+ return NewRecoverableTaskError("check initial live status failed", err)
}
-
if isLiving {
logger.Info("The live is already started. Start recording immediately.")
- chEvent <- WatcherLiveStart
+ return nil
} else {
logger.Info("The live is not started yet. Waiting...")
}
@@ -106,22 +97,22 @@ func watch(
for {
select {
case <-ctx.Done():
- return nil, ErrSuccess
+ return ctx.Err()
case <-heartBeatTimer.C:
err = heartbeat()
if err != nil {
- return fmt.Errorf("heartbeat failed: %w", err), ErrTransport
+ return NewRecoverableTaskError("heartbeat failed", err)
}
default:
var msg dmpkg.DanmakuExchange
msg, err = dm.ReadExchange()
if err != nil {
- return fmt.Errorf("exchange read failed: %w", err), ErrTransport
+ return NewRecoverableTaskError("failed to read exchange from server", err)
}
// the exchange may be compressed
msg, err = msg.Inflate()
if err != nil {
- return fmt.Errorf("inflate server message failed: %v", err), ErrProtocol
+ return NewUnrecoverableTaskError("failed to decompress server message", err)
}
switch msg.Operation {
@@ -131,18 +122,13 @@ func watch(
err := json.Unmarshal(msg.Body, &info)
if err != nil {
logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg)
- return fmt.Errorf("decode server message body JSON failed: %w", err), ErrProtocol
+ return NewUnrecoverableTaskError("invalid JSON response from server", err)
}
switch info.Command {
case CommandLiveStart:
- if !isLiving {
- chEvent <- WatcherLiveStart
- isLiving = true
- }
+ return nil
case CommandStreamPreparing:
- if isLiving {
- chEvent <- WatcherLiveStop
- }
+ break
default:
switch info.Command {
case "ENTRY_EFFECT":
@@ -168,7 +154,7 @@ func watch(
logger.Error("Cannot parse watched people number: %v", obj)
continue
}
- logger.Info("The number of viewers (room: %v): %v", roomId, viewersNum)
+ logger.Info("The number of viewers (room: %v): %v", t.RoomId, viewersNum)
case "INTERACT_WORD":
var raw dmmsg.RawInteractWordMessage
err = json.Unmarshal(msg.Body, &raw)
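
To summarize the new contract of watch(): it returns nil exactly when the live starts (either immediately, if the initial status check reports the room is streaming, or on a later LIVE command), so the caller only restarts it on recoverable failures. A condensed sketch of that driving loop, paraphrasing the watcher goroutine in runner.go earlier in this diff (not a verbatim copy):

    for {
        err := watch(ctxWatcher, t.TaskConfig, dmInfo.DmUrl, dmInfo.AuthKey, liveStatusChecker, t.logger)
        switch err.(type) {
        case nil:
            return nil // the live has started; hand over to the recorder
        case *RecoverableTaskError:
            t.logger.Error("Error occurred in live status watcher: %v", err)
            continue // reconnect and keep watching
        default:
            // *UnrecoverableTaskError or context.Canceled: give up
            return err
        }
    }
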