From ee451cbd950f82abf1df3d5e49ad74fbe9a53aa8 Mon Sep 17 00:00:00 2001 From: Keuin Date: Thu, 8 Sep 2022 03:33:41 +0800 Subject: Add watch restart sleep. --- main.go | 24 +++++++++++++++++++----- recording/config.go | 5 +++++ recording/runner.go | 22 ++++++++++++++++++---- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/main.go b/main.go index db58eca..51350b5 100644 --- a/main.go +++ b/main.go @@ -48,8 +48,18 @@ func getTasks() (tasks []recording.TaskConfig) { &argparse.Options{ Required: false, Help: "Specify which configuration file to use", + Default: ".", }, ) + diskBufSizePtr := parser.Int( + "b", "disk-write-buffer", + &argparse.Options{ + Required: false, + Help: "Specify disk write buffer size (bytes). The real minimum buffer size is determined by OS.", + Default: -1, + }, + ) + err = parser.Parse(os.Args) if err != nil { return @@ -71,13 +81,15 @@ func getTasks() (tasks []recording.TaskConfig) { if fromFile { configFile := *configFilePtr fmt.Printf("Config file: %v\n", configFile) - var file *os.File - file, err = os.Open(configFile) + + viper.SetConfigFile(configFile) + + err = viper.ReadInConfig() if err != nil { err = fmt.Errorf("cannot open config file \"%v\": %w", configFile, err) return } - err = viper.ReadConfig(file) + if err != nil { err = fmt.Errorf("cannot read config file \"%v\": %w", configFile, err) return @@ -95,13 +107,15 @@ func getTasks() (tasks []recording.TaskConfig) { // generate task list from cli taskCount := len(*rooms) tasks = make([]recording.TaskConfig, taskCount) - saveTo := common.Zeroable[string](*saveToPtr).OrElse(".") + saveTo := *saveToPtr + diskBufSize := *diskBufSizePtr for i := 0; i < taskCount; i++ { tasks[i] = recording.TaskConfig{ RoomId: common.RoomId((*rooms)[i]), Transport: recording.DefaultTransportConfig(), Download: recording.DownloadConfig{ - SaveDirectory: saveTo, + DiskWriteBufferBytes: diskBufSize, + SaveDirectory: saveTo, }, } } diff --git a/recording/config.go b/recording/config.go index 975887b..6d8704b 100644 --- a/recording/config.go +++ b/recording/config.go @@ -9,6 +9,7 @@ type TaskConfig struct { RoomId common.RoomId `mapstructure:"room_id"` Transport TransportConfig `mapstructure:"transport"` Download DownloadConfig `mapstructure:"download"` + Watch WatchConfig `mapstructure:"watch"` } type TransportConfig struct { @@ -22,6 +23,10 @@ type DownloadConfig struct { DiskWriteBufferBytes int `mapstructure:"disk_write_buffer_bytes"` } +type WatchConfig struct { + LiveInterruptedRestartSleepSeconds int `mapstructure:"live_interrupted_restart_sleep_seconds"` +} + func DefaultTransportConfig() TransportConfig { return TransportConfig{ SocketTimeoutSeconds: 10, diff --git a/recording/runner.go b/recording/runner.go index c95e559..487cab8 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -87,9 +87,22 @@ func doTask(ctx context.Context, task *TaskConfig) error { } go func() { cancelled := false + var err2 error // restart recorder if interrupted by I/O errors for !cancelled { - cancelled = record(recorderCtx, bi, task) + cancelled, err2 = record(recorderCtx, bi, task) + if err2 == bilibili.ErrRoomIsClosed { + sec := task.Watch.LiveInterruptedRestartSleepSeconds + if sec == 0 { + // default: 3s + // TODO move this to default config value (not easily supported by viper) + time.Sleep(3 * time.Second) + } + if sec > 0 { + logger.Printf("Sleep for %vs before restart recording.\n", sec) + time.Sleep(time.Duration(sec) * time.Second) + } + } } logger.Printf("Task is cancelled. Stop recording. (room %v)\n", task.RoomId) }() @@ -106,7 +119,7 @@ func record( ctx context.Context, bi bilibili.Bilibili, task *TaskConfig, -) (cancelled bool) { +) (cancelled bool, err error) { logger := log.Default() logger.Printf("INFO: Getting room profile...\n") @@ -149,8 +162,8 @@ func record( return } if len(urlInfo.Data.URLs) == 0 { - j, err := json.Marshal(urlInfo) - if err != nil { + j, err2 := json.Marshal(urlInfo) + if err2 != nil { j = []byte("(not available)") } logger.Printf("ERROR: No stream returned from API. Response: %v", string(j)) @@ -182,6 +195,7 @@ func record( logger.Printf("Failed to flush buffered file write data: %v\n", err) } }() + logger.Printf("Write buffer size: %v byte\n", fWriter.Size()) logger.Printf("Recording live stream to file \"%v\"...", filePath) err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, fWriter) -- cgit v1.2.3