summaryrefslogtreecommitdiff
path: root/recording/runner.go
diff options
context:
space:
mode:
Diffstat (limited to 'recording/runner.go')
-rw-r--r--recording/runner.go357
1 files changed, 194 insertions, 163 deletions
diff --git a/recording/runner.go b/recording/runner.go
index 58bcbc9..fe518b8 100644
--- a/recording/runner.go
+++ b/recording/runner.go
@@ -12,9 +12,11 @@ import (
"fmt"
"github.com/keuin/slbr/bilibili"
"github.com/keuin/slbr/common"
+ "github.com/keuin/slbr/logging"
"io"
"os"
"path"
+ "sync"
"time"
)
@@ -27,40 +29,40 @@ type TaskResult struct {
const kReadChunkSize = 128 * 1024
const kSpecialExtName = "partial"
+var errLiveEnded = NewRecoverableTaskError("live is ended", nil)
+
// runTaskWithAutoRestart
// start a monitor&download task.
// The task will be restarted infinitely until the context is closed,
// which means it will survive when the live is ended. (It always waits for the next start)
// During the process, its status may change.
// Note: this method is blocking.
-func (t *RunningTask) runTaskWithAutoRestart() error {
+func (t *RunningTask) runTaskWithAutoRestart() {
+ t.status = StRunning
+loop:
for {
- t.status = StRunning
- err := tryRunTask(t)
- if err == nil ||
- errors.Is(err, bilibili.ErrRoomIsClosed) ||
- errors.Is(err, io.EOF) ||
- errors.Is(err, io.ErrUnexpectedEOF) {
- if !errors.Is(err, io.EOF) {
- t.logger.Error("Task stopped because of an error: %v", err)
+ switch err := tryRunTask(t); err.(type) {
+ case nil:
+ t.logger.Info("Task stopped: %v", t.String())
+ case *RecoverableTaskError:
+ if err != errLiveEnded {
+ t.logger.Error("Temporary error: %v", err)
}
t.status = StRestarting
- t.logger.Info("Restarting task...")
- continue
- } else if err != nil && !errors.Is(err, context.Canceled) {
- t.logger.Error("Task stopped with an error: %v", err)
- return fmt.Errorf("task stopped: %v", err)
- } else {
- t.logger.Info("Task stopped: %v", t.String())
- return nil
+ default:
+ if !errors.Is(err, context.Canceled) {
+ t.logger.Error("Cannot recover from error: %v", err)
+ }
+ break loop
}
}
+ t.logger.Info("Task stopped: %v", t.String())
}
// tryRunTask does the actual work. It will return when in the following cases:
-// - the task context is cancelled
-// - the task is restarting (e.g. because of the end of live)
-// - some unrecoverable error happens (e.g. a protocol error caused by a bilibili protocol update)
+// RecoverableError (end of live, IO error)
+// UnrecoverableError (protocol error)
+// context.Cancelled (the task is stopping)
func tryRunTask(t *RunningTask) error {
netTypes := t.Transport.AllowedNetworkTypes
t.logger.Info("Network types: %v", netTypes)
@@ -68,117 +70,205 @@ func tryRunTask(t *RunningTask) error {
t.logger.Info("Start task: room %v", t.RoomId)
t.logger.Info("Getting notification server info...")
- authKey, dmUrl, err := getDanmakuServer(&t.TaskConfig, bi)
+
+ type dmServerInfo struct {
+ AuthKey string
+ DmUrl string
+ }
+
+ dmInfo, err := AutoRetryWithTask(
+ t, func() (info dmServerInfo, err error) {
+ info.AuthKey, info.DmUrl, err = getDanmakuServer(&t.TaskConfig, bi)
+ return
+ },
+ )
if err != nil {
- return err
+ return NewRecoverableTaskError("cannot get notification server info", err)
}
+
t.logger.Info("Success.")
+ // wait for watcher goroutine
+ wg := sync.WaitGroup{}
+ defer wg.Wait()
+
+ liveStatusChecker := func() (bool, error) {
+ resp, err := bi.GetRoomPlayInfo(t.RoomId)
+ if err != nil {
+ return false, err
+ }
+ if resp.Code != 0 {
+ return false, fmt.Errorf("bilibili API error: %v", resp.Message)
+ }
+ return resp.Data.LiveStatus.IsStreaming(), nil
+ }
+
// run live status watcher asynchronously
t.logger.Info("Starting watcher...")
- chWatcherEvent := make(chan WatcherEvent)
- chWatcherDown := make(chan struct{})
- // start and recover watcher asynchronously
- // the watcher may also be stopped by the downloader goroutine
- watcherCtx, stopWatcher := context.WithCancel(t.ctx)
+ wg.Add(1)
+ chWatcherError := make(chan error)
+ ctxWatcher, stopWatcher := context.WithCancel(t.ctx)
defer stopWatcher()
- go watcherRecoverableLoop(
- watcherCtx,
- dmUrl,
- authKey,
- t,
- bi,
- chWatcherEvent,
- chWatcherDown,
- )
-
- // The stream download goroutine may fail due to wrong watcher state.
- // But this is likely temporarily, so we should restart the downloader
- // until the state turns to closed.
-
- recorderCtx, stopRecorder := context.WithCancel(t.ctx)
- defer stopRecorder()
-
- ev := <-chWatcherEvent
- switch ev {
- case WatcherLiveStart:
- var err2 error
- // restart recorder if interrupted by I/O errors
- _, err2 = record(recorderCtx, bi, t)
- if errors.Is(err2, io.EOF) {
- t.logger.Info("The live seems to be closed normally.")
- } else if errors.Is(err2, io.ErrUnexpectedEOF) {
- t.logger.Warning("Reading is interrupted because of an unexpected EOF.")
- } else {
- t.logger.Error("Error when copying live stream: %v", err2)
+ go func() {
+ var err error
+ defer wg.Done()
+ run := true
+ loop:
+ for run {
+ err = watch(
+ ctxWatcher,
+ t.TaskConfig,
+ dmInfo.DmUrl,
+ dmInfo.AuthKey,
+ liveStatusChecker,
+ t.logger,
+ )
+ switch err.(type) {
+ case nil:
+ // live is started, stop watcher loop and start the recorder
+ break loop
+ case *RecoverableTaskError:
+ // if the watcher fails and recoverable, just try to recover
+ // because the recorder has not started yet
+ run = true
+ t.logger.Error("Error occurred in live status watcher: %v", err)
+ break
+ case *UnrecoverableTaskError:
+ // the watcher cannot recover, so the task should be stopped
+ run = false
+ t.logger.Error("Error occurred in live status watcher: %v", err)
+ default:
+ run = false
+ // the task is being cancelled
+ if errors.Is(err, context.Canceled) {
+ break loop
+ }
+ // unknown error type, this should not happen
+ t.logger.Error("Unexpected type of error in watcher: %v", err)
+ }
+ if run {
+ t.logger.Info("Restarting watcher...")
+ } else {
+ t.logger.Error("Cannot restart watcher to recover from that error.")
+ }
}
- t.logger.Info("Stop recording.")
- return err2
- case WatcherLiveStop:
- // once the live is ended, the watcher will no longer receive live start event
- // we have to restart the watcher
- return bilibili.ErrRoomIsClosed
+ chWatcherError <- err
+ }()
+
+ // wait for live start signal or the watcher stops abnormally
+ switch errWatcher := <-chWatcherError; errWatcher.(type) {
+ case nil:
+ // live is started, start recording
+ // (now the watcher should have stopped)
+ return func() error {
+ var err error
+ run := true
+ for run {
+ err = record(t.ctx, bi, &t.TaskConfig, t.logger)
+ switch err.(type) {
+ case nil:
+ // live is ended
+ t.logger.Info("The live is ended. Restarting current task...")
+ return errLiveEnded
+ case *RecoverableTaskError:
+ // here we don't know if the live is ended, so we have to do a check
+ t.logger.Warning("Recording is interrupted. Checking live status...")
+ isLiving, err2 := AutoRetryWithTask(t, liveStatusChecker)
+ if err2 != nil {
+ return NewRecoverableTaskError(
+ "when handling an error, another error occurred",
+ fmt.Errorf("first: %v, second: %w", err, err2),
+ )
+ }
+ if isLiving {
+ t.logger.Info("This is a temporary error. Restarting recording...")
+ } else {
+ t.logger.Info("The live is ended. Restarting current task...")
+ return errLiveEnded
+ }
+ run = isLiving
+ break
+ default:
+ run = false
+ if errors.Is(err, context.Canceled) {
+ t.logger.Info("Recorder is stopped.")
+ } else if errors.Is(err, io.EOF) {
+ t.logger.Info("The live seems to be closed normally.")
+ } else if errors.Is(err, io.ErrUnexpectedEOF) {
+ t.logger.Warning("Reading is interrupted because of an unexpected EOF.")
+ } else {
+ t.logger.Error("Error when copying live stream: %v", err)
+ }
+ t.logger.Info("Stop recording.")
+ }
+ }
+ return err
+ }()
+ case *UnrecoverableTaskError:
+ // watcher is stopped and cannot restart
+ return NewUnrecoverableTaskError("failed to watch live status", errWatcher)
default:
- return fmt.Errorf("unknown watcher event: %v", ev)
+ // watcher is cancelled, stop running the task
+ if errors.Is(errWatcher, context.Canceled) {
+ return errWatcher
+ }
+ // unexpected error, this is a programming error
+ return NewUnrecoverableTaskError("unexpected error type", errWatcher)
}
}
// record. When cancelled, the caller should clean up immediately and stop the task.
+// Errors:
+// RecoverableError
+// UnrecoverableError
+// context.Cancelled
+// nil (live is ended normally)
func record(
ctx context.Context,
bi bilibili.Bilibili,
- task *RunningTask,
-) (cancelled bool, err error) {
- task.logger.Info("Getting room profile...")
+ task *TaskConfig,
+ logger logging.Logger,
+) error {
+ logger.Info("Getting room profile...")
- profile, err := common.AutoRetry(
+ profile, err := AutoRetryWithConfig(
ctx,
+ logger,
+ task,
func() (bilibili.RoomProfileResponse, error) {
return bi.GetRoomProfile(task.RoomId)
},
- task.Transport.MaxRetryTimes,
- time.Duration(task.Transport.RetryIntervalSeconds)*time.Second,
- &task.logger,
)
- if errors.Is(err, context.Canceled) {
- cancelled = true
- return
- }
if err != nil {
- // still error, abort
- task.logger.Error("Cannot get room information: %v. Stopping current task.", err)
- cancelled = true
- return
+ if errors.Is(err, context.Canceled) {
+ return err
+ }
+ return NewRecoverableTaskError("failed to get living room information", err)
}
- task.logger.Info("Getting stream url...")
- urlInfo, err := common.AutoRetry(
+ logger.Info("Getting stream url...")
+ urlInfo, err := AutoRetryWithConfig(
ctx,
+ logger,
+ task,
func() (bilibili.RoomUrlInfoResponse, error) {
return bi.GetStreamingInfo(task.RoomId)
},
- task.Transport.MaxRetryTimes,
- time.Duration(task.Transport.RetryIntervalSeconds)*time.Second,
- &task.logger,
)
- if errors.Is(err, context.Canceled) {
- cancelled = true
- return
- }
if err != nil {
- task.logger.Error("Cannot get streaming info: %v", err)
- cancelled = true
- return
+ if errors.Is(err, context.Canceled) {
+ return err
+ }
+ return NewRecoverableTaskError("failed to get live info", err)
}
if len(urlInfo.Data.URLs) == 0 {
j, err2 := json.Marshal(urlInfo)
if err2 != nil {
j = []byte("(not available)")
}
- task.logger.Error("No stream returned from API. Response: %v", string(j))
- cancelled = true
- return
+ logger.Error("No stream was provided. Response: %v", string(j))
+ return NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided"))
}
streamSource := urlInfo.Data.URLs[0]
@@ -187,7 +277,7 @@ func record(
// the real extension name (without renaming)
originalExtName := common.Errorable[string](common.GetFileExtensionFromUrl(streamSource.URL)).OrElse("flv")
- if task.TaskConfig.Download.UseSpecialExtNameBeforeFinishing {
+ if task.Download.UseSpecialExtNameBeforeFinishing {
extName = kSpecialExtName
} else {
extName = originalExtName
@@ -200,9 +290,8 @@ func record(
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
- task.logger.Error("Cannot open file for writing: %v", err)
- cancelled = true
- return
+ logger.Error("Cannot open file for writing: %v", err)
+ return NewUnrecoverableTaskError("cannot open file for writing", err)
}
// rename the extension name to originalExtName when finish writing
defer func() {
@@ -213,10 +302,10 @@ func record(
to := path.Join(saveDir, common.CombineFileName(baseName, originalExtName))
err := os.Rename(from, to)
if err != nil {
- task.logger.Error("Cannot rename %v to %v: %v", from, to, err)
+ logger.Error("Cannot rename %v to %v: %v", from, to, err)
return
}
- task.logger.Info("Rename file \"%s\" to \"%s\".", from, to)
+ logger.Info("Rename file \"%s\" to \"%s\".", from, to)
}()
defer func() { _ = file.Close() }()
@@ -228,72 +317,14 @@ func record(
writeBufferSize += kReadChunkSize - mod
}
writeBuffer := make([]byte, writeBufferSize)
- task.logger.Info("Write buffer size: %v byte", writeBufferSize)
- task.logger.Info("Recording live stream to file \"%v\"...", filePath)
+ logger.Info("Write buffer size: %v byte", writeBufferSize)
+ logger.Info("Recording live stream to file \"%v\"...", filePath)
err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file, writeBuffer, kReadChunkSize)
- cancelled = err == nil || errors.Is(err, context.Canceled)
- if !cancelled {
- // real error happens
- task.logger.Error("Error when copying live stream: %v", err)
- }
- return
-}
-
-// watcherRecoverableLoop run watcher forever until the context is cancelled.
-func watcherRecoverableLoop(
- ctx context.Context,
- url string,
- authKey string,
- task *RunningTask,
- bi bilibili.Bilibili,
- chWatcherEvent chan<- WatcherEvent,
- chWatcherDown chan<- struct{},
-) {
- for {
- err, errReason := watch(
- ctx,
- url,
- authKey,
- task.RoomId,
- func() (bool, error) {
- resp, err := bi.GetRoomPlayInfo(task.RoomId)
- if err != nil {
- return false, err
- }
- if resp.Code != 0 {
- return false, fmt.Errorf("bilibili API error: %v", resp.Message)
- }
- return resp.Data.LiveStatus.IsStreaming(), nil
- },
- chWatcherEvent,
- task.logger,
- )
-
- // the context is cancelled, stop watching
- if errors.Is(err, context.Canceled) {
- return
- }
-
- switch errReason {
- case ErrSuccess:
- // stop normally, the context is closed
- return
- case ErrProtocol:
- task.logger.Fatal("Watcher stopped due to an unrecoverable error: %v", err)
- // shutdown the whole task
- chWatcherDown <- struct{}{}
- return
- case ErrTransport:
- task.logger.Error("Watcher stopped due to an I/O error: %v", err)
- waitSeconds := task.Transport.RetryIntervalSeconds
- task.logger.Warning(
- "Sleep for %v second(s) before restarting watcher.\n",
- waitSeconds,
- )
- time.Sleep(time.Duration(waitSeconds) * time.Second)
- task.logger.Info("Retrying...")
- }
+ if errors.Is(err, context.Canceled) || err == nil {
+ return err
}
+ logger.Error("Error when copying live stream: %v", err)
+ return NewRecoverableTaskError("stream copy was unexpectedly interrupted", err)
}
func getDanmakuServer(