1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
package bilibili
import (
"context"
"errors"
"fmt"
errs "github.com/keuin/slbr/bilibili/errors"
"github.com/keuin/slbr/common/pretty"
"github.com/keuin/slbr/types"
"io"
"net/http"
"os"
"strings"
"sync/atomic"
"time"
)
const InitReadBytes = 4096 // 4KiB
const progressReportInterval = 30 * time.Second
// CopyLiveStream read data from a livestream video stream, copy them to a writer.
func (b *Bilibili) CopyLiveStream(
ctx context.Context,
roomId types.RoomId,
stream types.StreamingUrlInfo,
fileCreator func() (*os.File, error),
bufSize int64,
) (err error) {
url := stream.URL
if !strings.HasPrefix(url, "https://") &&
!strings.HasPrefix(url, "http://") {
return fmt.Errorf("invalid URL: %v", url)
}
r, err := b.newGet(url)
if err != nil {
b.logger.Error("Cannot create HTTP GET instance on %v: %v", url, err)
return err
}
r.Header.Set("Referer",
fmt.Sprintf("https://live.bilibili.com/blanc/%d?liteVersion=true", roomId))
resp, err := b.Do(r)
if err != nil {
b.logger.Error("Cannot make HTTP GET request on %v: %v\n", url, err)
return
}
// 404 when not streaming
if resp.StatusCode == http.StatusNotFound {
return fmt.Errorf("live is not started or the room does not exist")
}
err = validateHttpStatus(resp)
if err != nil {
return
}
defer func() { _ = resp.Body.Close() }()
b.logger.Info("Waiting for stream initial bytes...")
// read some first bytes to ensure that the live is really started,
// so we don't create blank files if the live room is open
// but the live hasn't started yet
initBytes := make([]byte, InitReadBytes)
startTime := time.Now()
_, err = io.ReadFull(resp.Body, initBytes)
if err != nil {
b.logger.Error("Failed to read stream initial bytes: %v", err)
return
}
b.logger.Info("Stream is started. Receiving live stream...")
// write initial bytes
var out *os.File
out, err = fileCreator()
if err != nil {
b.logger.Error("Cannot open file for writing: %v", err)
err = errs.NewError(errs.FileCreation, err)
return
}
_, err = out.Write(initBytes)
if err != nil {
b.logger.Error("Failed to write to file: %v", err)
return
}
initBytes = nil // discard that buffer
var n atomic.Int64
// print download progress at a steady interval
printTicker := time.NewTicker(progressReportInterval)
stopPrintLoop := make(chan struct{})
go func() {
defer printTicker.Stop()
for {
select {
case <-printTicker.C:
b.logger.Info("Downloaded: %v, duration: %v",
pretty.Bytes(uint64(n.Load())), pretty.Duration(time.Now().Sub(startTime)))
case <-stopPrintLoop:
return
}
}
}()
// blocking copy
copyLoop:
for err == nil {
select {
case <-ctx.Done():
// cancelled
err = ctx.Err()
break copyLoop
default:
var sz int64
sz, err = io.CopyN(out, resp.Body, bufSize)
n.Add(sz)
}
}
close(stopPrintLoop)
if errors.Is(err, context.Canceled) {
b.logger.Info("Stop copying...")
} else if errors.Is(err, io.EOF) {
b.logger.Info("The live is ended. (room %v)", roomId)
} else {
b.logger.Error("Stream copying was interrupted unexpectedly: %v", err)
}
b.logger.Info("Total downloaded: %v", pretty.Bytes(uint64(n.Load())))
return err
}
|