// Source: bilibili/streaming.go (blob c1228ef5909806e66d473f3d12e1a29c75c9d8e5)

package bilibili

import (
	"context"
	"errors"
	"fmt"
	errs "github.com/keuin/slbr/bilibili/errors"
	"github.com/keuin/slbr/common/pretty"
	"github.com/keuin/slbr/types"
	"io"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"
)

const (
	// InitReadBytes is the number of bytes (4 KiB) read from the stream
	// before the output file is created.
	InitReadBytes = 4096

	// progressReportInterval is the period between two download progress
	// log lines.
	progressReportInterval = 30 * time.Second
)

// CopyLiveStream reads data from a livestream video stream and copies it into
// the file returned by fileCreator.
//
// Parameters:
//   - ctx: checked between two copied chunks; once it is done the copy stops
//     and ctx.Err() is returned. It is only consulted between chunks, so a
//     read that is already in progress is not interrupted by this function.
//   - roomId: used to build the Referer header and in log messages.
//   - stream: stream.URL must start with "http://" or "https://".
//   - fileCreator: called only after the first InitReadBytes bytes have been
//     received, so no file is created when nothing is being streamed. The
//     returned file is NOT closed by this function.
//   - bufSize: maximum number of bytes copied per chunk; must be positive.
//
// The returned error is never nil: it is ctx.Err() after cancellation, io.EOF
// when the stream ends, an errs.FileCreation error when fileCreator fails, or
// the request/validation/read/write error that stopped the copy.
func (b *Bilibili) CopyLiveStream(
	ctx context.Context,
	roomId types.RoomId,
	stream types.StreamingUrlInfo,
	fileCreator func() (*os.File, error),
	bufSize int64,
) (err error) {
	url := stream.URL
	if !strings.HasPrefix(url, "https://") &&
		!strings.HasPrefix(url, "http://") {
		return fmt.Errorf("invalid URL: %v", url)
	}
	// io.CopyN with a non-positive count copies nothing and reports no error,
	// which would make the copy loop below spin forever.
	if bufSize <= 0 {
		return fmt.Errorf("invalid buffer size: %v", bufSize)
	}

	r, err := b.newGet(url)
	if err != nil {
		b.logger.Error("Cannot create HTTP GET instance on %v: %v", url, err)
		return err
	}

	r.Header.Set("Referer",
		fmt.Sprintf("https://live.bilibili.com/blanc/%d?liteVersion=true", roomId))

	resp, err := b.Do(r)
	if err != nil {
		b.logger.Error("Cannot make HTTP GET request on %v: %v\n", url, err)
		return err
	}
	// Register the close before any status check so that every return path
	// below releases the response body.
	defer func() { _ = resp.Body.Close() }()

	// 404 when not streaming
	if resp.StatusCode == http.StatusNotFound {
		return errors.New("live is not started or the room does not exist")
	}

	if err = validateHttpStatus(resp); err != nil {
		return err
	}

	b.logger.Info("Waiting for stream initial bytes...")
	// read some first bytes to ensure that the live is really started,
	// so we don't create blank files if the live room is open
	// but the live hasn't started yet
	initBytes := make([]byte, InitReadBytes)
	startTime := time.Now()
	if _, err = io.ReadFull(resp.Body, initBytes); err != nil {
		b.logger.Error("Failed to read stream initial bytes: %v", err)
		return err
	}
	b.logger.Info("Stream is started. Receiving live stream...")

	// write initial bytes
	var out *os.File
	out, err = fileCreator()
	if err != nil {
		b.logger.Error("Cannot open file for writing: %v", err)
		return errs.NewError(errs.FileCreation, err)
	}
	if _, err = out.Write(initBytes); err != nil {
		b.logger.Error("Failed to write to file: %v", err)
		return err
	}
	initBytes = nil // discard that buffer

	// n counts the bytes copied by the loop below; it is shared with the
	// progress goroutine, hence atomic.
	var n atomic.Int64

	// print download progress at a steady interval
	printTicker := time.NewTicker(progressReportInterval)
	stopPrintLoop := make(chan struct{})
	go func() {
		defer printTicker.Stop()
		for {
			select {
			case <-printTicker.C:
				b.logger.Info("Downloaded: %v, duration: %v",
					pretty.Bytes(uint64(n.Load())), pretty.Duration(time.Since(startTime)))
			case <-stopPrintLoop:
				return
			}
		}
	}()

	// blocking copy, one chunk of at most bufSize bytes per iteration;
	// the loop ends on the first error (including io.EOF) or cancellation
copyLoop:
	for err == nil {
		select {
		case <-ctx.Done():
			// cancelled
			err = ctx.Err()
			break copyLoop
		default:
			var sz int64
			sz, err = io.CopyN(out, resp.Body, bufSize)
			n.Add(sz)
		}
	}

	// the progress goroutine exits once this channel is closed
	close(stopPrintLoop)

	switch {
	case errors.Is(err, context.Canceled):
		b.logger.Info("Stop copying...")
	case errors.Is(err, io.EOF):
		b.logger.Info("The live is ended. (room %v)", roomId)
	default:
		b.logger.Error("Stream copying was interrupted unexpectedly: %v", err)
	}

	b.logger.Info("Total downloaded: %v", pretty.Bytes(uint64(n.Load())))
	return err
}