summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Keuin <[email protected]> 2022-09-09 02:30:19 +0800
committer Keuin <[email protected]> 2022-09-09 02:30:41 +0800
commit f028bff042f471a68dff681af9c79ef96bc952e5 (patch)
tree 40763feb1d0ec05260e56d6822622462b35b165a
parent 719946a8211f3c8c68234a7c9e9c5af0226386aa (diff)
Fix file buffer not taking effect. No idea why golang's io utilities suck so much. Use an ad-hoc buffered copy loop instead.
-rw-r--r-- bilibili/streaming.go 8
-rw-r--r-- common/copy.go 78
-rw-r--r-- common/minmax.go 27
-rw-r--r-- go.mod 3
-rw-r--r-- go.sum 6
-rw-r--r-- recording/runner.go 25
6 files changed, 100 insertions, 47 deletions
diff --git a/bilibili/streaming.go b/bilibili/streaming.go
index e8a6911..64dc26d 100644
--- a/bilibili/streaming.go
+++ b/bilibili/streaming.go
@@ -5,8 +5,8 @@ import (
"context"
"errors"
"fmt"
- "io"
"net/http"
+ "os"
"strings"
)
@@ -15,7 +15,9 @@ func (b Bilibili) CopyLiveStream(
ctx context.Context,
roomId common.RoomId,
stream StreamingUrlInfo,
- out io.Writer,
+ out *os.File,
+ buffer []byte,
+ readChunkSize int,
) (err error) {
url := stream.URL
if !strings.HasPrefix(url, "https://") &&
@@ -60,7 +62,7 @@ func (b Bilibili) CopyLiveStream(
defer cancelGuardian()
// blocking copy
- n, err := common.Copy(ctx, out, resp.Body)
+ n, err := common.CopyToFileWithBuffer(ctx, out, resp.Body, buffer, readChunkSize, false)
if err != nil && !errors.Is(err, context.Canceled) {
// real error happens
diff --git a/common/copy.go b/common/copy.go
index 6f47a62..bee6515 100644
--- a/common/copy.go
+++ b/common/copy.go
@@ -7,36 +7,58 @@ Copied from https://ixday.github.io/post/golang-cancel-copy/
import (
"context"
"io"
+ "os"
)
-// here is some syntax sugar inspired by the Tomas Senart's video,
-// it allows me to inline the Reader interface
-type readerFunc func(p []byte) (n int, err error)
-
-func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) }
-
-// Copy slightly modified function signature:
-// - context has been added in order to propagate cancellation
-// - (undo by Keuin) I do not return the number of bytes written, has it is not useful in my use case
-func Copy(ctx context.Context, out io.Writer, in io.Reader) (written int64, err error) {
- // Copy will call the Reader and Writer interface multiple time, in order
- // to copy by chunk (avoiding loading the whole file in memory).
- // I insert the ability to cancel before read time as it is the earliest
- // possible in the call process.
- written, err = io.Copy(out, readerFunc(func(p []byte) (int, error) {
-
- // golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations
- select {
-
- // if context has been canceled
- case <-ctx.Done():
- // stop process and propagate "context canceled" error
- return 0, ctx.Err()
- default:
- // otherwise just run default io.Reader implementation
- return in.Read(p)
+// CopyToFileWithBuffer copies data from io.Reader to os.File with given buffer and read chunk size.
+// The reader and file won't be closed.
+// If syncFile is set, the file will be synced after every write, i.e. each time the buffer is flushed to the file.
+func CopyToFileWithBuffer(
+ ctx context.Context,
+ out *os.File,
+ in io.Reader,
+ buffer []byte,
+ chunkSize int,
+ syncFile bool,
+) (written int64, err error) {
+ bufSize := len(buffer)
+ off := 0 // offset to the end of data in buffer
+ nRead := 0 // how many bytes were read in the last read
+ defer func() {
+ if off+nRead > 0 {
+ // write unwritten data in buffer
+ nWrite, _ := out.Write(buffer[:off+nRead])
+ written += int64(nWrite)
+ if syncFile {
+ _ = out.Sync()
+ }
}
- }))
+ }()
- return
+ for {
+ if err = ctx.Err(); err != nil {
+ return
+ }
+ nRead, err = in.Read(buffer[off:Min[int](off+chunkSize, bufSize)])
+ if err != nil {
+ return
+ }
+ off += nRead
+ if off == bufSize {
+ // buffer is full
+ var nWritten int
+ nWritten, err = out.Write(buffer)
+ if err != nil {
+ return
+ }
+ if syncFile {
+ err = out.Sync()
+ if err != nil {
+ return
+ }
+ }
+ written += int64(nWritten)
+ off = 0
+ }
+ }
}
diff --git a/common/minmax.go b/common/minmax.go
new file mode 100644
index 0000000..b670887
--- /dev/null
+++ b/common/minmax.go
@@ -0,0 +1,27 @@
+package common
+
+/*
+Golang is a piece of shit. Its creators are paranoids.
+*/
+
+import (
+ "golang.org/x/exp/constraints"
+)
+
+type Number interface {
+ constraints.Integer | constraints.Float
+}
+
+func Min[T Number](t1 T, t2 T) T {
+ if t1 < t2 {
+ return t1
+ }
+ return t2
+}
+
+func Max[T Number](t1 T, t2 T) T {
+ if t1 > t2 {
+ return t1
+ }
+ return t2
+}
diff --git a/go.mod b/go.mod
index d6c3e54..670d252 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
github.com/andybalholm/brotli v1.0.4
github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40
github.com/spf13/viper v1.13.0
+ golang.org/x/exp v0.0.0-20220907003533-145caa8ea1d0
nhooyr.io/websocket v1.8.7
)
@@ -23,7 +24,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
- golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
+ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/go.sum b/go.sum
index f640eed..b34874f 100644
--- a/go.sum
+++ b/go.sum
@@ -238,6 +238,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
+golang.org/x/exp v0.0.0-20220907003533-145caa8ea1d0 h1:17k44ji3KFYG94XS5QEFC8pyuOlMh3IoR+vkmTZmJJs=
+golang.org/x/exp v0.0.0-20220907003533-145caa8ea1d0/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -347,8 +349,8 @@ golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
-golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/recording/runner.go b/recording/runner.go
index 974fa0e..2f15abd 100644
--- a/recording/runner.go
+++ b/recording/runner.go
@@ -8,7 +8,6 @@ package recording
import (
"bilibili-livestream-archiver/bilibili"
"bilibili-livestream-archiver/common"
- "bufio"
"context"
"encoding/json"
"errors"
@@ -26,6 +25,8 @@ type TaskResult struct {
Error error
}
+const kReadChunkSize = 64 * 1024
+
// RunTask start a monitor&download task and
// put its execution result into a channel.
func RunTask(ctx context.Context, wg *sync.WaitGroup, task *TaskConfig) {
@@ -189,19 +190,17 @@ func record(
}
defer func() { _ = file.Close() }()
- // buffered writer
- fWriter := bufio.NewWriterSize(file, task.Download.DiskWriteBufferBytes)
- defer func() {
- err := fWriter.Flush()
- if err != nil {
- logger := log.Default()
- logger.Printf("Failed to flush buffered file write data: %v", err)
- }
- }()
- logger.Printf("Write buffer size: %v byte", fWriter.Size())
-
+ writeBufferSize := task.Download.DiskWriteBufferBytes
+ if writeBufferSize < kReadChunkSize {
+ writeBufferSize = kReadChunkSize
+ }
+ if mod := writeBufferSize % kReadChunkSize; mod != 0 {
+ writeBufferSize += kReadChunkSize - mod
+ }
+ writeBuffer := make([]byte, writeBufferSize)
+ logger.Printf("Write buffer size: %v byte", writeBufferSize)
logger.Printf("Recording live stream to file \"%v\"...", filePath)
- err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, fWriter)
+ err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file, writeBuffer, kReadChunkSize)
cancelled = err == nil || errors.Is(err, context.Canceled)
if !cancelled {
// real error happens