1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
package common
/*
Copied from https://ixday.github.io/post/golang-cancel-copy/
*/
import (
"context"
"fmt"
"io"
"os"
"sync"
)
// CopyToFileWithBuffer copies data from in to out through a caller-supplied
// ring buffer, reading from in and writing to out concurrently.
//
// buffer is split into ringSize equally sized blocks, so len(buffer) must be a
// non-zero multiple of ringSize; otherwise an error is returned and nothing is
// read. A background goroutine fills free blocks from in; each Read call is
// given at most the unfilled remainder of the current block. The calling
// goroutine writes every filled block to out with a single Write call, in the
// order the blocks were filled, and then hands the block back for refilling.
// If syncFile is set, out.Sync is called after every block write.
//
// Copying stops at io.EOF (which is not reported as an error), at the first
// read, write or sync error, or when ctx is done. Data read before a read error
// or EOF is still written. written is the number of bytes written to out. The
// returned error is the write/sync error or ctx.Err() that stopped the copy,
// otherwise the read error wrapped as "reader failed: ...".
//
// Cancelling ctx cannot interrupt a Read that is already in progress; the
// function waits for it to return so that buffer is no longer in use once
// CopyToFileWithBuffer returns. Neither in nor out is closed.
func CopyToFileWithBuffer(
	ctx context.Context,
	out *os.File,
	in io.Reader,
	buffer []byte,
	syncFile bool,
	ringSize uint,
) (written int64, err error) {
	bufSize := uint(len(buffer))
	switch {
	case ringSize == 0:
		return 0, fmt.Errorf("ringSize must be greater than 0")
	case bufSize == 0:
		return 0, fmt.Errorf("len(buffer) must be greater than 0")
	case bufSize%ringSize != 0:
		return 0, fmt.Errorf("len(buffer) %% ringSize != 0")
	}
	blkSize := bufSize / ringSize

	// block is a filled region of buffer that is ready to be written to out.
	type block struct {
		Offset uint // byte offset of the block inside buffer
		Length uint // valid bytes; less than blkSize only for the final block
	}

	// Every block is always in exactly one place: chFree, chFilled, or held by
	// one of the two goroutines. Neither queue can therefore hold more than
	// ringSize entries, so sends on them never block.
	chFree := make(chan uint, ringSize)    // offsets of blocks that may be refilled
	chFilled := make(chan block, ringSize) // blocks waiting to be written, in fill order
	for off := uint(0); off < bufSize; off += blkSize {
		chFree <- off
	}

	rwCtx, cancelReadWrite := context.WithCancel(ctx)
	defer cancelReadWrite()

	// Producer: in -> buffer. readErr is only read after wg.Wait returns.
	var readErr error
	var wg sync.WaitGroup
	wg.Add(1) // must happen before the goroutine starts, not inside it
	go func() {
		defer wg.Done()
		defer close(chFilled) // tells the consumer no more blocks will arrive
		for rwCtx.Err() == nil {
			var off uint
			select {
			case <-rwCtx.Done():
				return
			case off = <-chFree:
			}
			filled := uint(0)
			for filled < blkSize {
				n, rerr := in.Read(buffer[off+filled : off+blkSize])
				filled += uint(n)
				if rerr != nil {
					// Keep whatever was read before EOF or the error.
					if filled > 0 {
						chFilled <- block{Offset: off, Length: filled}
					}
					if rerr != io.EOF {
						readErr = rerr
					}
					return
				}
			}
			chFilled <- block{Offset: off, Length: blkSize}
		}
	}()

	// Consumer: buffer -> out, on the calling goroutine.
consume:
	for {
		// Checked first so a cancelled copy stops even while blocks are queued.
		if err = ctx.Err(); err != nil {
			break consume
		}
		var blk block
		select {
		case <-ctx.Done():
			err = ctx.Err()
			break consume
		case b, ok := <-chFilled:
			if !ok {
				break consume // producer finished: EOF or read error
			}
			blk = b
		}
		n, werr := out.Write(buffer[blk.Offset : blk.Offset+blk.Length])
		written += int64(n)
		if werr != nil {
			err = fmt.Errorf("write error (%v byte written in this call): %w", n, werr)
			break consume
		}
		if syncFile {
			if serr := out.Sync(); serr != nil {
				err = fmt.Errorf("sync error: %w", serr)
				break consume
			}
		}
		chFree <- blk.Offset
	}

	// Stop the producer and wait for it so buffer is untouched after return.
	cancelReadWrite()
	wg.Wait()
	if err == nil && readErr != nil {
		err = fmt.Errorf("reader failed: %w", readErr)
	}
	return written, err
}
|