diff options
author | Keuin <[email protected]> | 2022-09-07 10:46:33 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2022-09-07 10:46:33 +0800 |
commit | c78edaa0ffa28bb360663f172e98540b7978e9b2 (patch) | |
tree | 6e99ca4c42d31eb8516fe8b16a445f3618530948 | |
parent | fd48122ae9c340b867bb31dbbde6a3b7a4667944 (diff) |
Handle error correctly when copying live stream.
-rw-r--r-- | bilibili/streaming.go | 13 | ||||
-rw-r--r-- | common/copy.go | 46 | ||||
-rw-r--r-- | recording/runner.go | 8 |
3 files changed, 63 insertions, 4 deletions
diff --git a/bilibili/streaming.go b/bilibili/streaming.go index cc915a3..ec99565 100644 --- a/bilibili/streaming.go +++ b/bilibili/streaming.go @@ -59,8 +59,17 @@ func (b Bilibili) CopyLiveStream( defer cancelGuardian() // blocking copy - n, err := io.Copy(out, resp.Body) + n, err, isCancelled := common.Copy(ctx, out, resp.Body) + if isCancelled { + // cancelled by context + // this error is useless + err = nil + } + if !isCancelled && err != nil { + // real error happens + b.error.Printf("Stream copying was interrupted unexpectedly: %v", err) + } b.info.Printf("Bytes copied: %v", n) - return + return err } diff --git a/common/copy.go b/common/copy.go new file mode 100644 index 0000000..a896fb4 --- /dev/null +++ b/common/copy.go @@ -0,0 +1,46 @@ +package common + +/* +Copied from https://ixday.github.io/post/golang-cancel-copy/ +*/ + +import ( + "context" + "io" +) + +// here is some syntax sugar inspired by the Tomas Senart's video, +// it allows me to inline the Reader interface +type readerFunc func(p []byte) (n int, err error) + +func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) } + +// Copy slightly modified function signature: +// - context has been added in order to propagate cancellation +// - (undo by Keuin) I do not return the number of bytes written, has it is not useful in my use case +// - (added by Keuin) add a isCancelled return value indicating the copy is stopped by cancelling the context +func Copy(ctx context.Context, out io.Writer, in io.Reader) (written int64, err error, isCancelled bool) { + isCancelled = false + + // Copy will call the Reader and Writer interface multiple time, in order + // to copy by chunk (avoiding loading the whole file in memory). + // I insert the ability to cancel before read time as it is the earliest + // possible in the call process. + written, err = io.Copy(out, readerFunc(func(p []byte) (int, error) { + + // golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations + select { + + // if context has been canceled + case <-ctx.Done(): + // stop process and propagate "context canceled" error + isCancelled = true + return 0, ctx.Err() + default: + // otherwise just run default io.Reader implementation + return in.Read(p) + } + })) + + return +} diff --git a/recording/runner.go b/recording/runner.go index cee4325..a4693fb 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -85,7 +85,7 @@ func doTask(ctx context.Context, task *TaskConfig) error { for !cancelled { cancelled = record(recorderCtx, bi, task) } - logger.Printf("Task is cancelled. (room %v)\n", task.RoomId) + logger.Printf("Task is cancelled. Stop recording. (room %v)\n", task.RoomId) }() lastStatusIsLiving = true case WatcherLiveStop: @@ -159,7 +159,11 @@ func record( logger.Printf("Recording live stream to file \"%v\"...", filePath) err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file) - cancelled = false + cancelled = err == nil + if !cancelled { + // real error happens + logger.Printf("Error when copying live stream: %v\n", err) + } return } |