summaryrefslogtreecommitdiff
path: root/common/copy.go
blob: 02e33a5ef8089c96427519d9d24520c7fa61e57d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package common

/*
Copied from https://ixday.github.io/post/golang-cancel-copy/
*/

import (
	"context"
	"fmt"
	"io"
	"os"
	"sync"
)

// CopyToFileWithBuffer copies data from in to out through the caller-supplied
// buffer, overlapping reads from in with writes to out.
//
// The buffer is a ring of ringSize equally sized blocks, so len(buffer) must be
// a non-zero multiple of ringSize. A background goroutine fills free blocks
// from in; every Read call asks for at most the unfilled remainder of one
// block. The calling goroutine writes each filled block to out with a single
// Write call (so every disk write is at most one block), in the order the data
// was read, and then returns the block to the ring. Only the last block before
// EOF or a read error may be partially filled.
//
// If syncFile is set, out.Sync is called after every block written to out.
//
// Neither in nor out is closed. The function returns only after the background
// reader goroutine has stopped, so buffer is not accessed after return. A Read
// call that is already in progress cannot be interrupted by ctx, so a Read that
// never returns also blocks this function.
//
// written is the number of bytes successfully written to out. err is nil when
// in was copied up to io.EOF. Otherwise it is the first write/sync error, the
// (wrapped) read error, or ctx.Err() when ctx is done. On error, data already
// read from in but not yet written to out is discarded.
func CopyToFileWithBuffer(
	ctx context.Context,
	out *os.File,
	in io.Reader,
	buffer []byte,
	syncFile bool,
	ringSize uint,
) (written int64, err error) {
	bufSize := uint(len(buffer))
	if ringSize == 0 {
		// Checked first: the modulo below would panic with a zero divisor.
		return 0, fmt.Errorf("ringSize must be positive")
	}
	if bufSize%ringSize != 0 {
		return 0, fmt.Errorf("len(buffer) %% ringSize != 0")
	}
	blkSize := bufSize / ringSize
	if blkSize == 0 {
		// Only reachable with an empty buffer; zero-sized blocks would spin forever.
		return 0, fmt.Errorf("len(buffer) must be positive")
	}

	// block describes a filled region of buffer that is ready to be written.
	type block struct {
		off uint // byte offset of the block within buffer
		n   uint // number of valid bytes, 0 < n <= blkSize
	}

	// Every block offset is owned by exactly one party at a time: the free
	// queue, the filler goroutine, the filled queue, or the write loop below.
	// Both channels can hold every block, so sends on them never block.
	free := make(chan uint, ringSize)
	filled := make(chan block, ringSize)
	for off := uint(0); off < bufSize; off += blkSize {
		free <- off
	}

	rwCtx, cancelReadWrite := context.WithCancel(ctx)
	defer cancelReadWrite()

	// fill reads from in into free blocks until EOF, a read error or
	// cancellation. It returns nil only when in reported io.EOF.
	fill := func() error {
		for {
			// Checked before the select so a cancelled context wins even when
			// free blocks are still available.
			if cerr := rwCtx.Err(); cerr != nil {
				return cerr
			}
			var off uint
			select {
			case <-rwCtx.Done():
				return rwCtx.Err()
			case off = <-free:
			}

			// Fill the block completely unless the reader stops early.
			var n uint
			var rerr error
			for n < blkSize && rerr == nil {
				var m int
				m, rerr = in.Read(buffer[off+n : off+blkSize])
				n += uint(m)
			}

			// Hand over whatever was read, even if the read also failed.
			if n > 0 {
				select {
				case filled <- block{off: off, n: n}:
				case <-rwCtx.Done():
					return rwCtx.Err()
				}
			}

			switch {
			case rerr == io.EOF:
				return nil
			case rerr != nil:
				return fmt.Errorf("reader failed: %w", rerr)
			}
		}
	}

	var (
		wg      sync.WaitGroup
		readErr error
	)
	wg.Add(1) // registered before the goroutine starts so Wait cannot miss it
	go func() {
		defer wg.Done()
		defer close(filled) // lets the write loop below terminate
		readErr = fill()
	}()

	// Write filled blocks in read order, then recycle them.
	for blk := range filled {
		// rwCtx is only cancelled here by the parent ctx; stop writing promptly.
		if cerr := rwCtx.Err(); cerr != nil {
			err = cerr
			break
		}
		n, werr := out.Write(buffer[blk.off : blk.off+blk.n])
		written += int64(n)
		if werr != nil {
			err = fmt.Errorf("write error (%v byte written in this call): %w", n, werr)
			break
		}
		if syncFile {
			if serr := out.Sync(); serr != nil {
				err = fmt.Errorf("sync error: %w", serr)
				break
			}
		}
		free <- blk.off
	}

	// Stop the filler if it is still running, and wait for it so that buffer
	// is never touched after this function returns.
	cancelReadWrite()
	wg.Wait()

	if err == nil {
		err = readErr
	}
	return written, err
}