summaryrefslogtreecommitdiff
path: root/recording
diff options
context:
space:
mode:
Diffstat (limited to 'recording')
-rw-r--r--recording/runner.go143
-rw-r--r--recording/task.go87
2 files changed, 162 insertions, 68 deletions
diff --git a/recording/runner.go b/recording/runner.go
index 2f15abd..98d4432 100644
--- a/recording/runner.go
+++ b/recording/runner.go
@@ -15,7 +15,6 @@ import (
"log"
"os"
"path"
- "sync"
"time"
)
@@ -25,93 +24,106 @@ type TaskResult struct {
Error error
}
-const kReadChunkSize = 64 * 1024
+const kReadChunkSize = 128 * 1024
-// RunTask start a monitor&download task and
-// put its execution result into a channel.
-func RunTask(ctx context.Context, wg *sync.WaitGroup, task *TaskConfig) {
- defer wg.Done()
- err := doTask(ctx, task)
- logger := log.Default()
- if err != nil && !errors.Is(err, context.Canceled) {
- logger.Printf("A task stopped with an error (room %v): %v", task.RoomId, err)
- } else {
- logger.Printf("Task stopped (room %v): %v", task.RoomId, task.String())
+// runTaskWithAutoRestart
+// start a monitor&download task.
+// The task will be restarted infinitely until the context is closed,
+// which means it will survive when the live is ended. (It always waits for the next start)
+// During the process, its status may change.
+// Note: this method is blocking.
+func (t *RunningTask) runTaskWithAutoRestart() error {
+ for {
+ t.status = StRunning
+ err := tryRunTask(t)
+ if errors.Is(err, bilibili.ErrRoomIsClosed) {
+ t.status = StRestarting
+ t.logger.Info("Restarting task...")
+ continue
+ } else if err != nil && !errors.Is(err, context.Canceled) {
+ t.logger.Error("Task stopped with an error: %v", err)
+ return fmt.Errorf("task stopped: %v", err)
+ } else {
+ t.logger.Info("Task stopped: %v", t.String())
+ return nil
+ }
}
}
-// doTask do the actual work, but returns synchronously.
-func doTask(ctx context.Context, task *TaskConfig) error {
- logger := log.Default()
- netTypes := task.Transport.AllowedNetworkTypes
- logger.Printf("Network types: %v", netTypes)
+// tryRunTask does the actual work. It will return when in the following cases:
+// - the task context is cancelled
+// - the task is restarting (e.g. because of the end of live)
+// - some unrecoverable error happens (e.g. a protocol error caused by a bilibili protocol update)
+func tryRunTask(t *RunningTask) error {
+ netTypes := t.Transport.AllowedNetworkTypes
+ t.logger.Info("Network types: %v", netTypes)
bi := bilibili.NewBilibiliWithNetType(netTypes)
- logger.Printf("Start task: room %v", task.RoomId)
+ t.logger.Info("Start task: room %v", t.RoomId)
- authKey, url, err := getStreamingServer(task, logger, bi)
+ t.logger.Info("Getting notification server info...")
+ authKey, dmUrl, err := getDanmakuServer(&t.TaskConfig, bi)
if err != nil {
return err
}
+ t.logger.Info("Success.")
// run live status watcher asynchronously
- logger.Println("Starting watcher...")
+ t.logger.Info("Starting watcher...")
chWatcherEvent := make(chan WatcherEvent)
chWatcherDown := make(chan struct{})
// start and recover watcher asynchronously
// the watcher may also be stopped by the downloader goroutine
- watcherCtx, stopWatcher := context.WithCancel(ctx)
+ watcherCtx, stopWatcher := context.WithCancel(t.ctx)
defer stopWatcher()
- go watcherRecoverableLoop(watcherCtx, url, authKey, task, bi, chWatcherEvent, chWatcherDown)
+ go watcherRecoverableLoop(
+ watcherCtx,
+ dmUrl,
+ authKey,
+ t,
+ bi,
+ chWatcherEvent,
+ chWatcherDown,
+ )
// The stream download goroutine may fail due to wrong watcher state.
// But this is likely temporarily, so we should restart the downloader
// until the state turns to closed.
- // We store the last modified live status
- // in case there is a false-positive duplicate.
- lastStatusIsLiving := false
- recorderCtx, stopRecorder := context.WithCancel(ctx)
+ recorderCtx, stopRecorder := context.WithCancel(t.ctx)
defer stopRecorder()
for {
select {
- case <-ctx.Done():
- logger.Printf("Task (room %v) is stopped.", task.RoomId)
+ case <-t.ctx.Done():
+ t.logger.Info("Task is stopped.")
return nil
case <-chWatcherDown:
// watcher is down and unrecoverable, stop this task
- return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", task.RoomId)
+ return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", t.RoomId)
case ev := <-chWatcherEvent:
switch ev {
case WatcherLiveStart:
- if lastStatusIsLiving {
- logger.Println("Duplicate adjacent WatcherLiveStart event. Ignoring.")
- continue
- }
- go func() {
- cancelled := false
- var err2 error
- // restart recorder if interrupted by I/O errors
- for !cancelled {
- cancelled, err2 = record(recorderCtx, bi, task)
- if err2 == bilibili.ErrRoomIsClosed {
- sec := task.Watch.LiveInterruptedRestartSleepSeconds
- if sec == 0 {
- // default: 3s
- // TODO move this to default config value (not easily supported by viper)
- time.Sleep(3 * time.Second)
- }
- if sec > 0 {
- logger.Printf("Sleep for %vs before restart recording.", sec)
- time.Sleep(time.Duration(sec) * time.Second)
- }
- }
+ cancelled := false
+ var err2 error
+ // restart recorder if interrupted by I/O errors
+ for !cancelled {
+ cancelled, err2 = record(recorderCtx, bi, t)
+ // live is closed normally, do not restart in current function
+ // the watcher will wait for the next start
+ if errors.Is(err2, bilibili.ErrRoomIsClosed) {
+ t.logger.Info("Live is ended. Stop recording.")
+ return bilibili.ErrRoomIsClosed
}
- logger.Printf("Task is cancelled. Stop recording. (room %v)", task.RoomId)
- }()
- lastStatusIsLiving = true
+ if err2 != nil {
+ // some other unrecoverable error
+ return err2
+ }
+ }
+ t.logger.Info("Task is cancelled. Stop recording.")
case WatcherLiveStop:
- lastStatusIsLiving = false
+ // once the live is ended, the watcher will no longer receive live start event
+ // we have to restart the watcher
+ return bilibili.ErrRoomIsClosed
}
}
}
@@ -121,7 +133,7 @@ func doTask(ctx context.Context, task *TaskConfig) error {
func record(
ctx context.Context,
bi bilibili.Bilibili,
- task *TaskConfig,
+ task *RunningTask,
) (cancelled bool, err error) {
logger := log.Default()
logger.Printf("INFO: Getting room profile...")
@@ -214,13 +226,11 @@ func watcherRecoverableLoop(
ctx context.Context,
url string,
authKey string,
- task *TaskConfig,
+ task *RunningTask,
bi bilibili.Bilibili,
- chWatcherEvent chan WatcherEvent,
+ chWatcherEvent chan<- WatcherEvent,
chWatcherDown chan<- struct{},
) {
- logger := log.Default()
-
for {
err, errReason := watch(
ctx,
@@ -250,29 +260,27 @@ func watcherRecoverableLoop(
// stop normally, the context is closed
return
case ErrProtocol:
- logger.Printf("FATAL: Watcher stopped due to an unrecoverable error: %v", err)
+ task.logger.Fatal("Watcher stopped due to an unrecoverable error: %v", err)
// shutdown the whole task
chWatcherDown <- struct{}{}
return
case ErrTransport:
- logger.Printf("ERROR: Watcher stopped due to an I/O error: %v", err)
+ task.logger.Error("ERROR: Watcher stopped due to an I/O error: %v", err)
waitSeconds := task.Transport.RetryIntervalSeconds
- logger.Printf(
+ task.logger.Warning(
"WARNING: Sleep for %v second(s) before restarting watcher.\n",
waitSeconds,
)
time.Sleep(time.Duration(waitSeconds) * time.Second)
- logger.Printf("Retrying...")
+ task.logger.Info("Retrying...")
}
}
}
-func getStreamingServer(
+func getDanmakuServer(
task *TaskConfig,
- logger *log.Logger,
bi bilibili.Bilibili,
) (string, string, error) {
- logger.Println("Getting stream server info...")
dmInfo, err := bi.GetDanmakuServerInfo(task.RoomId)
if err != nil {
return "", "", fmt.Errorf("failed to read stream server info: %w", err)
@@ -280,7 +288,6 @@ func getStreamingServer(
if len(dmInfo.Data.HostList) == 0 {
return "", "", fmt.Errorf("no available stream server")
}
- logger.Println("Success.")
// get authkey and ws url
authKey := dmInfo.Data.Token
diff --git a/recording/task.go b/recording/task.go
new file mode 100644
index 0000000..9aa41a8
--- /dev/null
+++ b/recording/task.go
@@ -0,0 +1,87 @@
+package recording
+
+/*
+In this file we implement task lifecycle management.
+Concrete task works are done in the `runner.go` file.
+*/
+
+import (
+ "bilibili-livestream-archiver/logging"
+ "context"
+ "fmt"
+)
+
+type TaskStatus int
+
+const (
+ StNotStarted TaskStatus = iota
+ StRunning
+ StRestarting
+ StStopped
+)
+
+var (
+ ErrTaskIsAlreadyStarted = fmt.Errorf("task is already started")
+ ErrTaskIsStopped = fmt.Errorf("restarting a stopped task is not allowed")
+)
+
+// RunningTask is an augmented TaskConfig struct
+// that contains volatile runtime information.
+type RunningTask struct {
+ TaskConfig
+ // ctx: the biggest context this task uses. It may create children contexts.
+ ctx context.Context
+ // result: if the task is ended, here is the returned error
+ result error
+ // status: running status
+ status TaskStatus
+ // hookStarted: called asynchronously when the task is started. This won't be called when restarting.
+ hookStarted func()
+ // hookStopped: called asynchronously when the task is stopped. This won't be called when restarting.
+ hookStopped func()
+ // logger: where to print logs
+ logger logging.Logger
+}
+
+func NewRunningTask(
+ config TaskConfig,
+ ctx context.Context,
+ hookStarted func(),
+ hookStopped func(),
+ logger logging.Logger,
+) RunningTask {
+ return RunningTask{
+ TaskConfig: config,
+ ctx: ctx,
+ status: StNotStarted,
+ hookStarted: hookStarted,
+ hookStopped: hookStopped,
+ logger: logger,
+ }
+}
+
+func (t *RunningTask) StartTask() error {
+ st := t.status
+ switch st {
+ case StNotStarted:
+ // TODO real start
+ go func() {
+ defer func() { t.status = StStopped }()
+ t.hookStarted()
+ defer t.hookStopped()
+ // do the task
+ _ = t.runTaskWithAutoRestart()
+ }()
+ return nil
+ case StRunning:
+ return ErrTaskIsAlreadyStarted
+ case StRestarting:
+ return ErrTaskIsAlreadyStarted
+ case StStopped:
+ // we don't allow starting a stopped task
+ // because some state needs to be reset
+ // just create a new task and run
+ return ErrTaskIsStopped
+ }
+ panic(fmt.Errorf("invalid task status: %v", st))
+}