summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeuin <[email protected]>2022-09-10 16:05:10 +0800
committerKeuin <[email protected]>2022-09-10 16:05:10 +0800
commite999937d75d8e8c40b06add376ebac423b0c2079 (patch)
tree8a30abaa5fc8fc68d283e0be52a73ee83bdaf3a2
parentf028bff042f471a68dff681af9c79ef96bc952e5 (diff)
Fix task is not properly restarted when the live is closed and started again.
Use more friendly log format to replace golang's default `log.Logger`. (not completed) Cleaner task status management.
-rw-r--r--bilibili/streaming.go12
-rw-r--r--common/bytesize.go19
-rw-r--r--common/copy.go32
-rw-r--r--logging/logger.go77
-rw-r--r--main.go33
-rw-r--r--recording/runner.go143
-rw-r--r--recording/task.go87
7 files changed, 300 insertions, 103 deletions
diff --git a/bilibili/streaming.go b/bilibili/streaming.go
index 64dc26d..9a26c2d 100644
--- a/bilibili/streaming.go
+++ b/bilibili/streaming.go
@@ -52,20 +52,10 @@ func (b Bilibili) CopyLiveStream(
defer func() { _ = resp.Body.Close() }()
- // guard the following copy loop
- // if the context is cancelled, stop it by closing the reader
- guardianCtx, cancelGuardian := context.WithCancel(ctx)
- go func() {
- <-guardianCtx.Done()
- _ = resp.Body.Close()
- }()
- defer cancelGuardian()
-
// blocking copy
n, err := common.CopyToFileWithBuffer(ctx, out, resp.Body, buffer, readChunkSize, false)
if err != nil && !errors.Is(err, context.Canceled) {
- // real error happens
b.error.Printf("Stream copying was interrupted unexpectedly: %v", err)
}
@@ -73,6 +63,6 @@ func (b Bilibili) CopyLiveStream(
b.info.Printf("The live is ended. (room %v)", roomId)
}
- b.info.Printf("Bytes copied: %v", n)
+ b.info.Printf("Total downloaded: %v", common.PrettyBytes(uint64(n)))
return err
}
diff --git a/common/bytesize.go b/common/bytesize.go
new file mode 100644
index 0000000..9b3aa97
--- /dev/null
+++ b/common/bytesize.go
@@ -0,0 +1,19 @@
+package common
+
+import "fmt"
+
+func PrettyBytes(b uint64) string {
+ if b < 1000 {
+ return fmt.Sprintf("%d Byte", b)
+ }
+ if b < 1000_000 {
+ return fmt.Sprintf("%.2f KiB", float64(b)/1024)
+ }
+ if b < 1000_000_000 {
+ return fmt.Sprintf("%.2f MiB", float64(b)/1024/1024)
+ }
+ if b < 1000_000_000_000 {
+ return fmt.Sprintf("%.2f GiB", float64(b)/1024/1024/1024)
+ }
+ return fmt.Sprintf("%.2f TiB", float64(b)/1024/1024/1024/1024)
+}
diff --git a/common/copy.go b/common/copy.go
index bee6515..1d273dc 100644
--- a/common/copy.go
+++ b/common/copy.go
@@ -36,29 +36,31 @@ func CopyToFileWithBuffer(
}()
for {
- if err = ctx.Err(); err != nil {
+ select {
+ case <-ctx.Done():
return
- }
- nRead, err = in.Read(buffer[off:Min[int](off+chunkSize, bufSize)])
- if err != nil {
- return
- }
- off += nRead
- if off == bufSize {
- // buffer is full
- var nWritten int
- nWritten, err = out.Write(buffer)
+ default:
+ nRead, err = in.Read(buffer[off:Min[int](off+chunkSize, bufSize)])
if err != nil {
return
}
- if syncFile {
- err = out.Sync()
+ off += nRead
+ if off == bufSize {
+ // buffer is full
+ var nWritten int
+ nWritten, err = out.Write(buffer)
if err != nil {
return
}
+ if syncFile {
+ err = out.Sync()
+ if err != nil {
+ return
+ }
+ }
+ written += int64(nWritten)
+ off = 0
}
- written += int64(nWritten)
- off = 0
}
}
}
diff --git a/logging/logger.go b/logging/logger.go
new file mode 100644
index 0000000..9724c2f
--- /dev/null
+++ b/logging/logger.go
@@ -0,0 +1,77 @@
+package logging
+
+/*
+golang's `log` package sucks, so we wrap it.
+*/
+
+import (
+ "fmt"
+ "log"
+ "runtime"
+)
+
+type Logger struct {
+ delegate *log.Logger
+ prefix string
+ debugHeader string
+ infoHeader string
+ warningHeader string
+ errorHeader string
+ fatalHeader string
+}
+
+const (
+ kDebug = "DEBUG"
+ kInfo = "INFO"
+ kWarning = "WARNING"
+ kError = "ERROR"
+ kFatal = "FATAL"
+)
+
+func NewWrappedLogger(delegate *log.Logger, name string) Logger {
+ return Logger{
+ delegate: delegate,
+ debugHeader: fmt.Sprintf("[%v][%v]", name, kDebug),
+ infoHeader: fmt.Sprintf("[%v][%v]", name, kInfo),
+ warningHeader: fmt.Sprintf("[%v][%v]", name, kWarning),
+ errorHeader: fmt.Sprintf("[%v][%v]", name, kError),
+ fatalHeader: fmt.Sprintf("[%v][%v]", name, kFatal),
+ }
+}
+
+func getCallerInfo() string {
+ _, file, line, ok := runtime.Caller(2)
+ if !ok {
+ file = "???"
+ line = 0
+ }
+ short := file
+ for i := len(file) - 1; i > 0; i-- {
+ if file[i] == '/' {
+ short = file[i+1:]
+ break
+ }
+ }
+ file = short
+ return fmt.Sprintf("[%v:%v]", file, line)
+}
+
+func (l Logger) Debug(format string, v ...any) {
+ l.delegate.Printf(l.debugHeader+getCallerInfo()+" "+format, v...)
+}
+
+func (l Logger) Info(format string, v ...any) {
+ l.delegate.Printf(l.infoHeader+getCallerInfo()+" "+format, v...)
+}
+
+func (l Logger) Warning(format string, v ...any) {
+ l.delegate.Printf(l.warningHeader+getCallerInfo()+" "+format, v...)
+}
+
+func (l Logger) Error(format string, v ...any) {
+ l.delegate.Printf(l.errorHeader+getCallerInfo()+" "+format, v...)
+}
+
+func (l Logger) Fatal(format string, v ...any) {
+ l.delegate.Printf(l.fatalHeader+getCallerInfo()+" "+format, v...)
+}
diff --git a/main.go b/main.go
index 6b68ee0..2b22cfc 100644
--- a/main.go
+++ b/main.go
@@ -1,7 +1,13 @@
package main
+/*
+In this file we implement config file and command line arguments parsing.
+Task lifecycle management are implemented in recording package.
+*/
+
import (
"bilibili-livestream-archiver/common"
+ "bilibili-livestream-archiver/logging"
"bilibili-livestream-archiver/recording"
"context"
"fmt"
@@ -127,23 +133,32 @@ func getTasks() (tasks []recording.TaskConfig) {
}
func main() {
- tasks := getTasks()
+ logger := log.Default()
+ taskConfigs := getTasks()
+ tasks := make([]recording.RunningTask, len(taskConfigs))
+ wg := sync.WaitGroup{}
+ ctxTasks, cancelTasks := context.WithCancel(context.Background())
fmt.Println("Record tasks:")
- for i, task := range tasks {
+ for i, task := range taskConfigs {
+ tasks[i] = recording.NewRunningTask(
+ taskConfigs[i],
+ ctxTasks,
+ func() { wg.Add(1) },
+ func() { wg.Done() },
+ logging.NewWrappedLogger(logger, fmt.Sprintf("room %v", task.RoomId)),
+ )
fmt.Printf("[%2d] %s\n", i+1, task)
}
fmt.Println("")
- logger := log.Default()
-
logger.Printf("Starting tasks...")
- wg := sync.WaitGroup{}
- ctx, cancelTasks := context.WithCancel(context.Background())
- for _, task := range tasks {
- wg.Add(1)
- go recording.RunTask(ctx, &wg, &task)
+ for i := range tasks {
+ err := tasks[i].StartTask()
+ if err != nil {
+ logger.Printf("Cannot start task %v (room %v): %v. Skip.", i, tasks[i].RoomId, err)
+ }
}
// listen on stop signals
diff --git a/recording/runner.go b/recording/runner.go
index 2f15abd..98d4432 100644
--- a/recording/runner.go
+++ b/recording/runner.go
@@ -15,7 +15,6 @@ import (
"log"
"os"
"path"
- "sync"
"time"
)
@@ -25,93 +24,106 @@ type TaskResult struct {
Error error
}
-const kReadChunkSize = 64 * 1024
+const kReadChunkSize = 128 * 1024
-// RunTask start a monitor&download task and
-// put its execution result into a channel.
-func RunTask(ctx context.Context, wg *sync.WaitGroup, task *TaskConfig) {
- defer wg.Done()
- err := doTask(ctx, task)
- logger := log.Default()
- if err != nil && !errors.Is(err, context.Canceled) {
- logger.Printf("A task stopped with an error (room %v): %v", task.RoomId, err)
- } else {
- logger.Printf("Task stopped (room %v): %v", task.RoomId, task.String())
+// runTaskWithAutoRestart
+// start a monitor&download task.
+// The task will be restarted infinitely until the context is closed,
+// which means it will survive when the live is ended. (It always waits for the next start)
+// During the process, its status may change.
+// Note: this method is blocking.
+func (t *RunningTask) runTaskWithAutoRestart() error {
+ for {
+ t.status = StRunning
+ err := tryRunTask(t)
+ if errors.Is(err, bilibili.ErrRoomIsClosed) {
+ t.status = StRestarting
+ t.logger.Info("Restarting task...")
+ continue
+ } else if err != nil && !errors.Is(err, context.Canceled) {
+ t.logger.Error("Task stopped with an error: %v", err)
+ return fmt.Errorf("task stopped: %v", err)
+ } else {
+ t.logger.Info("Task stopped: %v", t.String())
+ return nil
+ }
}
}
-// doTask do the actual work, but returns synchronously.
-func doTask(ctx context.Context, task *TaskConfig) error {
- logger := log.Default()
- netTypes := task.Transport.AllowedNetworkTypes
- logger.Printf("Network types: %v", netTypes)
+// tryRunTask does the actual work. It will return when in the following cases:
+// - the task context is cancelled
+// - the task is restarting (e.g. because of the end of live)
+// - some unrecoverable error happens (e.g. a protocol error caused by a bilibili protocol update)
+func tryRunTask(t *RunningTask) error {
+ netTypes := t.Transport.AllowedNetworkTypes
+ t.logger.Info("Network types: %v", netTypes)
bi := bilibili.NewBilibiliWithNetType(netTypes)
- logger.Printf("Start task: room %v", task.RoomId)
+ t.logger.Info("Start task: room %v", t.RoomId)
- authKey, url, err := getStreamingServer(task, logger, bi)
+ t.logger.Info("Getting notification server info...")
+ authKey, dmUrl, err := getDanmakuServer(&t.TaskConfig, bi)
if err != nil {
return err
}
+ t.logger.Info("Success.")
// run live status watcher asynchronously
- logger.Println("Starting watcher...")
+ t.logger.Info("Starting watcher...")
chWatcherEvent := make(chan WatcherEvent)
chWatcherDown := make(chan struct{})
// start and recover watcher asynchronously
// the watcher may also be stopped by the downloader goroutine
- watcherCtx, stopWatcher := context.WithCancel(ctx)
+ watcherCtx, stopWatcher := context.WithCancel(t.ctx)
defer stopWatcher()
- go watcherRecoverableLoop(watcherCtx, url, authKey, task, bi, chWatcherEvent, chWatcherDown)
+ go watcherRecoverableLoop(
+ watcherCtx,
+ dmUrl,
+ authKey,
+ t,
+ bi,
+ chWatcherEvent,
+ chWatcherDown,
+ )
// The stream download goroutine may fail due to wrong watcher state.
// But this is likely temporarily, so we should restart the downloader
// until the state turns to closed.
- // We store the last modified live status
- // in case there is a false-positive duplicate.
- lastStatusIsLiving := false
- recorderCtx, stopRecorder := context.WithCancel(ctx)
+ recorderCtx, stopRecorder := context.WithCancel(t.ctx)
defer stopRecorder()
for {
select {
- case <-ctx.Done():
- logger.Printf("Task (room %v) is stopped.", task.RoomId)
+ case <-t.ctx.Done():
+ t.logger.Info("Task is stopped.")
return nil
case <-chWatcherDown:
// watcher is down and unrecoverable, stop this task
- return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", task.RoomId)
+ return fmt.Errorf("task (room %v) stopped: watcher is down and unrecoverable", t.RoomId)
case ev := <-chWatcherEvent:
switch ev {
case WatcherLiveStart:
- if lastStatusIsLiving {
- logger.Println("Duplicate adjacent WatcherLiveStart event. Ignoring.")
- continue
- }
- go func() {
- cancelled := false
- var err2 error
- // restart recorder if interrupted by I/O errors
- for !cancelled {
- cancelled, err2 = record(recorderCtx, bi, task)
- if err2 == bilibili.ErrRoomIsClosed {
- sec := task.Watch.LiveInterruptedRestartSleepSeconds
- if sec == 0 {
- // default: 3s
- // TODO move this to default config value (not easily supported by viper)
- time.Sleep(3 * time.Second)
- }
- if sec > 0 {
- logger.Printf("Sleep for %vs before restart recording.", sec)
- time.Sleep(time.Duration(sec) * time.Second)
- }
- }
+ cancelled := false
+ var err2 error
+ // restart recorder if interrupted by I/O errors
+ for !cancelled {
+ cancelled, err2 = record(recorderCtx, bi, t)
+ // live is closed normally, do not restart in current function
+ // the watcher will wait for the next start
+ if errors.Is(err2, bilibili.ErrRoomIsClosed) {
+ t.logger.Info("Live is ended. Stop recording.")
+ return bilibili.ErrRoomIsClosed
}
- logger.Printf("Task is cancelled. Stop recording. (room %v)", task.RoomId)
- }()
- lastStatusIsLiving = true
+ if err2 != nil {
+ // some other unrecoverable error
+ return err2
+ }
+ }
+ t.logger.Info("Task is cancelled. Stop recording.")
case WatcherLiveStop:
- lastStatusIsLiving = false
+ // once the live is ended, the watcher will no longer receive live start event
+ // we have to restart the watcher
+ return bilibili.ErrRoomIsClosed
}
}
}
@@ -121,7 +133,7 @@ func doTask(ctx context.Context, task *TaskConfig) error {
func record(
ctx context.Context,
bi bilibili.Bilibili,
- task *TaskConfig,
+ task *RunningTask,
) (cancelled bool, err error) {
logger := log.Default()
logger.Printf("INFO: Getting room profile...")
@@ -214,13 +226,11 @@ func watcherRecoverableLoop(
ctx context.Context,
url string,
authKey string,
- task *TaskConfig,
+ task *RunningTask,
bi bilibili.Bilibili,
- chWatcherEvent chan WatcherEvent,
+ chWatcherEvent chan<- WatcherEvent,
chWatcherDown chan<- struct{},
) {
- logger := log.Default()
-
for {
err, errReason := watch(
ctx,
@@ -250,29 +260,27 @@ func watcherRecoverableLoop(
// stop normally, the context is closed
return
case ErrProtocol:
- logger.Printf("FATAL: Watcher stopped due to an unrecoverable error: %v", err)
+ task.logger.Fatal("Watcher stopped due to an unrecoverable error: %v", err)
// shutdown the whole task
chWatcherDown <- struct{}{}
return
case ErrTransport:
- logger.Printf("ERROR: Watcher stopped due to an I/O error: %v", err)
+ task.logger.Error("ERROR: Watcher stopped due to an I/O error: %v", err)
waitSeconds := task.Transport.RetryIntervalSeconds
- logger.Printf(
+ task.logger.Warning(
"WARNING: Sleep for %v second(s) before restarting watcher.\n",
waitSeconds,
)
time.Sleep(time.Duration(waitSeconds) * time.Second)
- logger.Printf("Retrying...")
+ task.logger.Info("Retrying...")
}
}
}
-func getStreamingServer(
+func getDanmakuServer(
task *TaskConfig,
- logger *log.Logger,
bi bilibili.Bilibili,
) (string, string, error) {
- logger.Println("Getting stream server info...")
dmInfo, err := bi.GetDanmakuServerInfo(task.RoomId)
if err != nil {
return "", "", fmt.Errorf("failed to read stream server info: %w", err)
@@ -280,7 +288,6 @@ func getStreamingServer(
if len(dmInfo.Data.HostList) == 0 {
return "", "", fmt.Errorf("no available stream server")
}
- logger.Println("Success.")
// get authkey and ws url
authKey := dmInfo.Data.Token
diff --git a/recording/task.go b/recording/task.go
new file mode 100644
index 0000000..9aa41a8
--- /dev/null
+++ b/recording/task.go
@@ -0,0 +1,87 @@
+package recording
+
+/*
+In this file we implement task lifecycle management.
+Concrete task works are done in the `runner.go` file.
+*/
+
+import (
+ "bilibili-livestream-archiver/logging"
+ "context"
+ "fmt"
+)
+
+type TaskStatus int
+
+const (
+ StNotStarted TaskStatus = iota
+ StRunning
+ StRestarting
+ StStopped
+)
+
+var (
+ ErrTaskIsAlreadyStarted = fmt.Errorf("task is already started")
+ ErrTaskIsStopped = fmt.Errorf("restarting a stopped task is not allowed")
+)
+
+// RunningTask is an augmented TaskConfig struct
+// that contains volatile runtime information.
+type RunningTask struct {
+ TaskConfig
+ // ctx: the biggest context this task uses. It may create children contexts.
+ ctx context.Context
+ // result: if the task is ended, here is the returned error
+ result error
+ // status: running status
+ status TaskStatus
+ // hookStarted: called asynchronously when the task is started. This won't be called when restarting.
+ hookStarted func()
+ // hookStopped: called asynchronously when the task is stopped. This won't be called when restarting.
+ hookStopped func()
+ // logger: where to print logs
+ logger logging.Logger
+}
+
+func NewRunningTask(
+ config TaskConfig,
+ ctx context.Context,
+ hookStarted func(),
+ hookStopped func(),
+ logger logging.Logger,
+) RunningTask {
+ return RunningTask{
+ TaskConfig: config,
+ ctx: ctx,
+ status: StNotStarted,
+ hookStarted: hookStarted,
+ hookStopped: hookStopped,
+ logger: logger,
+ }
+}
+
+func (t *RunningTask) StartTask() error {
+ st := t.status
+ switch st {
+ case StNotStarted:
+ // TODO real start
+ go func() {
+ defer func() { t.status = StStopped }()
+ t.hookStarted()
+ defer t.hookStopped()
+ // do the task
+ _ = t.runTaskWithAutoRestart()
+ }()
+ return nil
+ case StRunning:
+ return ErrTaskIsAlreadyStarted
+ case StRestarting:
+ return ErrTaskIsAlreadyStarted
+ case StStopped:
+ // we don't allow starting a stopped task
+ // because some state needs to be reset
+ // just create a new task and run
+ return ErrTaskIsStopped
+ }
+ panic(fmt.Errorf("invalid task status: %v", st))
+}