diff options
author | Keuin <[email protected]> | 2022-09-15 00:58:10 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2022-09-15 01:19:02 +0800 |
commit | 4e742159af79cf0f6a14da630a362a0344a5a121 (patch) | |
tree | 11b09ae5100158198906a69c9e8214d8286d3893 | |
parent | 1a2cd1e08b5f63814acbddf453be523fcca8bad3 (diff) |
Make copy and write IO operations async.
-rw-r--r-- | bilibili/streaming.go | 2 | ||||
-rw-r--r-- | common/copy.go | 150 | ||||
-rw-r--r-- | recording/runner.go | 2 |
3 files changed, 120 insertions, 34 deletions
diff --git a/bilibili/streaming.go b/bilibili/streaming.go index ea9207c..0bda347 100644 --- a/bilibili/streaming.go +++ b/bilibili/streaming.go @@ -54,7 +54,7 @@ func (b Bilibili) CopyLiveStream( b.logger.Info("Copying live stream...") // blocking copy - n, err := common.CopyToFileWithBuffer(ctx, out, resp.Body, buffer, readChunkSize, false) + n, err := common.CopyToFileWithBuffer(ctx, out, resp.Body, buffer, false, uint(len(buffer)/readChunkSize)) if err != nil && !errors.Is(err, context.Canceled) { b.logger.Error("Stream copying was interrupted unexpectedly: %v", err) diff --git a/common/copy.go b/common/copy.go index 27bc9ad..02e33a5 100644 --- a/common/copy.go +++ b/common/copy.go @@ -6,62 +6,148 @@ Copied from https://ixday.github.io/post/golang-cancel-copy/ import ( "context" + "fmt" "io" "os" + "sync" ) // CopyToFileWithBuffer copies data from io.Reader to os.File with given buffer and read chunk size. +// The reader and writer may not be synchronized if any error occurs. +// A block contains one or more chunks. +// Every syscall reads at most one chunk. +// Every disk-write writes at most one block. +// The buffer is a ring contains one or more blocks. // The reader and file won't be closed. // If syncFile is set, the file will be synced after every read. +// ringSize: how many blocks are in the buffer ring. func CopyToFileWithBuffer( ctx context.Context, out *os.File, in io.Reader, buffer []byte, - chunkSize int, syncFile bool, + ringSize uint, ) (written int64, err error) { - bufSize := len(buffer) - off := 0 // offset to the end of data in buffer - nRead := 0 // how many bytes were read in the last read - defer func() { - if off+nRead > 0 { - // write unwritten data in buffer - nWrite, _ := out.Write(buffer[:off+nRead]) - written += int64(nWrite) - if syncFile { - _ = out.Sync() + var blkSize uint + bufSize := uint(len(buffer)) + if bufSize%ringSize != 0 { + err = fmt.Errorf("len(buffer) %% ringSize != 0") + } else { + blkSize = bufSize / ringSize + } + + chWriteQue := make(chan uint, ringSize) // buffer write task queue + chReadQue := make(chan uint, ringSize) // buffer read task queue + + // when reader and writer are stopped, this channel must have 2 elements + chResult := make(chan error, 2) + + type task struct { + Offset uint + Length uint + } + + chLastWrite := make(chan task, 1) + + rwCtx, cancelReadWrite := context.WithCancel(ctx) + + // wait reader and writer to finish + wg := sync.WaitGroup{} + + // buffer reader + // buffer -> file + go func() { + var err error + wg.Add(1) + defer wg.Done() + defer func() { + chResult <- err + }() + for { + select { + case <-rwCtx.Done(): + return + default: + // get the next available block to read from + // this block is fully written in the buffer + off := <-chReadQue + + // read the entire block to file + n, err2 := out.Write(buffer[off : off+blkSize]) + if err2 != nil { + // failed to write to the file + // we can do nothing more than stop reading and writing + err = fmt.Errorf("write error (%v byte written in this call): %w", n, err2) + cancelReadWrite() + return + } } } }() - for { - select { - case <-ctx.Done(): - err = ctx.Err() - return - default: - nRead, err = in.Read(buffer[off:Min[int](off+chunkSize, bufSize)]) - if err != nil { - return - } - off += nRead - if off == bufSize { - // buffer is full - var nWritten int - nWritten, err = out.Write(buffer) - if err != nil { - return + // buffer writer + // reader -> buffer + go func() { + var err error + wg.Add(1) + defer wg.Done() + defer func() { + chResult <- err + }() + for { + select { + case <-rwCtx.Done(): + chLastWrite <- task{ + Offset: 0, + Length: 0, } - if syncFile { - err = out.Sync() + return + default: + // get the next free block for writing + off := <-chWriteQue // byte offset + + // fully fill the block + nWritten := uint(0) // bytes written in this block currently + for nWritten < blkSize { + n, err := in.Read(buffer[off+nWritten : off+blkSize]) + nWritten += uint(n) if err != nil { + // if we can't fully fill current block (e.g. EOF or IO error), + // set nLeft to make the main goroutine handle that + cancelReadWrite() + err = fmt.Errorf("reader failed: %w", err) + chLastWrite <- task{ + Offset: off, + Length: nWritten, // bytes of valid data in the last incomplete block + } return } } - written += int64(nWritten) - off = 0 } } + }() + + // init write tasks, all buffer blocks are available for writing initially + for i := uint(0); i < bufSize; i += blkSize { + chWriteQue <- i + } + + wg.Wait() + + for i := 0; i < 2; i++ { + if err2 := <-chResult; err2 != nil { + err = err2 + return + } + } + + // write the remaining data + last := <-chLastWrite + if last.Length > 0 { + var n int + n, err = out.Write(buffer[last.Offset : last.Offset+last.Length]) + written += int64(n) } + return } diff --git a/recording/runner.go b/recording/runner.go index fe518b8..7fdffac 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -26,7 +26,7 @@ type TaskResult struct { Error error } -const kReadChunkSize = 128 * 1024 +const kReadChunkSize = 1024 * 1024 const kSpecialExtName = "partial" var errLiveEnded = NewRecoverableTaskError("live is ended", nil) |