summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bilibili/streaming.go13
-rw-r--r--common/copy.go46
-rw-r--r--recording/runner.go8
3 files changed, 63 insertions, 4 deletions
diff --git a/bilibili/streaming.go b/bilibili/streaming.go
index cc915a3..ec99565 100644
--- a/bilibili/streaming.go
+++ b/bilibili/streaming.go
@@ -59,8 +59,17 @@ func (b Bilibili) CopyLiveStream(
defer cancelGuardian()
// blocking copy
- n, err := io.Copy(out, resp.Body)
+ n, err, isCancelled := common.Copy(ctx, out, resp.Body)
+ if isCancelled {
+ // cancelled by context
+ // this error is useless
+ err = nil
+ }
+ if !isCancelled && err != nil {
+ // real error happens
+ b.error.Printf("Stream copying was interrupted unexpectedly: %v", err)
+ }
b.info.Printf("Bytes copied: %v", n)
- return
+ return err
}
diff --git a/common/copy.go b/common/copy.go
new file mode 100644
index 0000000..a896fb4
--- /dev/null
+++ b/common/copy.go
@@ -0,0 +1,46 @@
+package common
+
+/*
+Copied from https://ixday.github.io/post/golang-cancel-copy/
+*/
+
+import (
+ "context"
+ "io"
+)
+
+// here is some syntax sugar inspired by the Tomas Senart's video,
+// it allows me to inline the Reader interface
+type readerFunc func(p []byte) (n int, err error)
+
+func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) }
+
+// Copy slightly modified function signature:
+// - context has been added in order to propagate cancellation
+// - (undo by Keuin) I do not return the number of bytes written, has it is not useful in my use case
+// - (added by Keuin) add a isCancelled return value indicating the copy is stopped by cancelling the context
+func Copy(ctx context.Context, out io.Writer, in io.Reader) (written int64, err error, isCancelled bool) {
+ isCancelled = false
+
+ // Copy will call the Reader and Writer interface multiple time, in order
+ // to copy by chunk (avoiding loading the whole file in memory).
+ // I insert the ability to cancel before read time as it is the earliest
+ // possible in the call process.
+ written, err = io.Copy(out, readerFunc(func(p []byte) (int, error) {
+
+ // golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations
+ select {
+
+ // if context has been canceled
+ case <-ctx.Done():
+ // stop process and propagate "context canceled" error
+ isCancelled = true
+ return 0, ctx.Err()
+ default:
+ // otherwise just run default io.Reader implementation
+ return in.Read(p)
+ }
+ }))
+
+ return
+}
diff --git a/recording/runner.go b/recording/runner.go
index cee4325..a4693fb 100644
--- a/recording/runner.go
+++ b/recording/runner.go
@@ -85,7 +85,7 @@ func doTask(ctx context.Context, task *TaskConfig) error {
for !cancelled {
cancelled = record(recorderCtx, bi, task)
}
- logger.Printf("Task is cancelled. (room %v)\n", task.RoomId)
+ logger.Printf("Task is cancelled. Stop recording. (room %v)\n", task.RoomId)
}()
lastStatusIsLiving = true
case WatcherLiveStop:
@@ -159,7 +159,11 @@ func record(
logger.Printf("Recording live stream to file \"%v\"...", filePath)
err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file)
- cancelled = false
+ cancelled = err == nil
+ if !cancelled {
+ // real error happens
+ logger.Printf("Error when copying live stream: %v\n", err)
+ }
return
}