From f028bff042f471a68dff681af9c79ef96bc952e5 Mon Sep 17 00:00:00 2001 From: Keuin Date: Fri, 9 Sep 2022 02:30:19 +0800 Subject: Fix file buffer does not take effect. No idea why golang's io utility is so suck. Use ad-hoc buffered copy loop instead. --- bilibili/streaming.go | 8 ++++-- common/copy.go | 78 +++++++++++++++++++++++++++++++++------------------ common/minmax.go | 27 ++++++++++++++++++ go.mod | 3 +- go.sum | 6 ++-- recording/runner.go | 25 ++++++++--------- 6 files changed, 100 insertions(+), 47 deletions(-) create mode 100644 common/minmax.go diff --git a/bilibili/streaming.go b/bilibili/streaming.go index e8a6911..64dc26d 100644 --- a/bilibili/streaming.go +++ b/bilibili/streaming.go @@ -5,8 +5,8 @@ import ( "context" "errors" "fmt" - "io" "net/http" + "os" "strings" ) @@ -15,7 +15,9 @@ func (b Bilibili) CopyLiveStream( ctx context.Context, roomId common.RoomId, stream StreamingUrlInfo, - out io.Writer, + out *os.File, + buffer []byte, + readChunkSize int, ) (err error) { url := stream.URL if !strings.HasPrefix(url, "https://") && @@ -60,7 +62,7 @@ func (b Bilibili) CopyLiveStream( defer cancelGuardian() // blocking copy - n, err := common.Copy(ctx, out, resp.Body) + n, err := common.CopyToFileWithBuffer(ctx, out, resp.Body, buffer, readChunkSize, false) if err != nil && !errors.Is(err, context.Canceled) { // real error happens diff --git a/common/copy.go b/common/copy.go index 6f47a62..bee6515 100644 --- a/common/copy.go +++ b/common/copy.go @@ -7,36 +7,58 @@ Copied from https://ixday.github.io/post/golang-cancel-copy/ import ( "context" "io" + "os" ) -// here is some syntax sugar inspired by the Tomas Senart's video, -// it allows me to inline the Reader interface -type readerFunc func(p []byte) (n int, err error) - -func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) } - -// Copy slightly modified function signature: -// - context has been added in order to propagate cancellation -// - (undo by Keuin) I do not return the number of bytes written, has it is not useful in my use case -func Copy(ctx context.Context, out io.Writer, in io.Reader) (written int64, err error) { - // Copy will call the Reader and Writer interface multiple time, in order - // to copy by chunk (avoiding loading the whole file in memory). - // I insert the ability to cancel before read time as it is the earliest - // possible in the call process. - written, err = io.Copy(out, readerFunc(func(p []byte) (int, error) { - - // golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations - select { - - // if context has been canceled - case <-ctx.Done(): - // stop process and propagate "context canceled" error - return 0, ctx.Err() - default: - // otherwise just run default io.Reader implementation - return in.Read(p) +// CopyToFileWithBuffer copies data from io.Reader to os.File with given buffer and read chunk size. +// The reader and file won't be closed. +// If syncFile is set, the file will be synced after every read. +func CopyToFileWithBuffer( + ctx context.Context, + out *os.File, + in io.Reader, + buffer []byte, + chunkSize int, + syncFile bool, +) (written int64, err error) { + bufSize := len(buffer) + off := 0 // offset to the end of data in buffer + nRead := 0 // how many bytes were read in the last read + defer func() { + if off+nRead > 0 { + // write unwritten data in buffer + nWrite, _ := out.Write(buffer[:off+nRead]) + written += int64(nWrite) + if syncFile { + _ = out.Sync() + } } - })) + }() - return + for { + if err = ctx.Err(); err != nil { + return + } + nRead, err = in.Read(buffer[off:Min[int](off+chunkSize, bufSize)]) + if err != nil { + return + } + off += nRead + if off == bufSize { + // buffer is full + var nWritten int + nWritten, err = out.Write(buffer) + if err != nil { + return + } + if syncFile { + err = out.Sync() + if err != nil { + return + } + } + written += int64(nWritten) + off = 0 + } + } } diff --git a/common/minmax.go b/common/minmax.go new file mode 100644 index 0000000..b670887 --- /dev/null +++ b/common/minmax.go @@ -0,0 +1,27 @@ +package common + +/* +Golang is a piece of shit. Its creators are paranoids. +*/ + +import ( + "golang.org/x/exp/constraints" +) + +type Number interface { + constraints.Integer | constraints.Float +} + +func Min[T Number](t1 T, t2 T) T { + if t1 < t2 { + return t1 + } + return t2 +} + +func Max[T Number](t1 T, t2 T) T { + if t1 > t2 { + return t1 + } + return t2 +} diff --git a/go.mod b/go.mod index d6c3e54..670d252 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/andybalholm/brotli v1.0.4 github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40 github.com/spf13/viper v1.13.0 + golang.org/x/exp v0.0.0-20220907003533-145caa8ea1d0 nhooyr.io/websocket v1.8.7 ) @@ -23,7 +24,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.4.1 // indirect - golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect golang.org/x/text v0.3.7 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index f640eed..b34874f 100644 --- a/go.sum +++ b/go.sum @@ -238,6 +238,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220907003533-145caa8ea1d0 h1:17k44ji3KFYG94XS5QEFC8pyuOlMh3IoR+vkmTZmJJs= +golang.org/x/exp v0.0.0-20220907003533-145caa8ea1d0/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -347,8 +349,8 @@ golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/recording/runner.go b/recording/runner.go index 974fa0e..2f15abd 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -8,7 +8,6 @@ package recording import ( "bilibili-livestream-archiver/bilibili" "bilibili-livestream-archiver/common" - "bufio" "context" "encoding/json" "errors" @@ -26,6 +25,8 @@ type TaskResult struct { Error error } +const kReadChunkSize = 64 * 1024 + // RunTask start a monitor&download task and // put its execution result into a channel. func RunTask(ctx context.Context, wg *sync.WaitGroup, task *TaskConfig) { @@ -189,19 +190,17 @@ func record( } defer func() { _ = file.Close() }() - // buffered writer - fWriter := bufio.NewWriterSize(file, task.Download.DiskWriteBufferBytes) - defer func() { - err := fWriter.Flush() - if err != nil { - logger := log.Default() - logger.Printf("Failed to flush buffered file write data: %v", err) - } - }() - logger.Printf("Write buffer size: %v byte", fWriter.Size()) - + writeBufferSize := task.Download.DiskWriteBufferBytes + if writeBufferSize < kReadChunkSize { + writeBufferSize = kReadChunkSize + } + if mod := writeBufferSize % kReadChunkSize; mod != 0 { + writeBufferSize += kReadChunkSize - mod + } + writeBuffer := make([]byte, writeBufferSize) + logger.Printf("Write buffer size: %v byte", writeBufferSize) logger.Printf("Recording live stream to file \"%v\"...", filePath) - err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, fWriter) + err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file, writeBuffer, kReadChunkSize) cancelled = err == nil || errors.Is(err, context.Canceled) if !cancelled { // real error happens -- cgit v1.2.3