summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bilibili/client.go36
-rw-r--r--bilibili/request.go12
-rw-r--r--bilibili/request_test.go4
-rw-r--r--bilibili/streaming.go10
-rw-r--r--common/retry.go8
-rw-r--r--recording/runner.go27
-rw-r--r--recording/watcher.go36
7 files changed, 62 insertions, 71 deletions
diff --git a/bilibili/client.go b/bilibili/client.go
index 8097115..f72432a 100644
--- a/bilibili/client.go
+++ b/bilibili/client.go
@@ -5,11 +5,10 @@ This struct is a facade of all necessary Bilibili HTTP API wrappers.
package bilibili
import (
+ "bilibili-livestream-archiver/logging"
"context"
- "log"
"net"
"net/http"
- "os"
)
const (
@@ -21,19 +20,12 @@ const (
type Bilibili struct {
userAgent string
http *http.Client
- loggerCommon
- ctx context.Context
- netTypes []IpNetType
+ ctx context.Context
+ netTypes []IpNetType
+ logger logging.Logger
}
-func NewBilibiliWithContext(ctx context.Context, netTypes []IpNetType) Bilibili {
- logger := loggerCommon{
- debug: log.New(os.Stderr, "DEBUG: ", log.Ldate|log.Ltime|log.Lshortfile),
- info: log.New(os.Stderr, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile),
- warn: log.New(os.Stderr, "WARNING: ", log.Ldate|log.Ltime|log.Lshortfile),
- error: log.New(os.Stderr, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile),
- }
-
+func NewBilibiliWithContext(ctx context.Context, netTypes []IpNetType, logger logging.Logger) Bilibili {
var nets []IpNetType
nets = append(nets, netTypes...)
if len(nets) == 0 {
@@ -47,19 +39,19 @@ func NewBilibiliWithContext(ctx context.Context, netTypes []IpNetType) Bilibili
transport.DialContext, _ = np.NextNetworkType(dialer)
return Bilibili{
- loggerCommon: logger,
- userAgent: kUserAgent,
- http: http.DefaultClient,
- ctx: ctx,
- netTypes: nets,
+ logger: logger,
+ userAgent: kUserAgent,
+ http: http.DefaultClient,
+ ctx: ctx,
+ netTypes: nets,
}
}
-func NewBilibiliWithNetType(netTypes []IpNetType) Bilibili {
+func NewBilibiliWithNetType(netTypes []IpNetType, logger logging.Logger) Bilibili {
ctx := context.Background()
- return NewBilibiliWithContext(ctx, netTypes)
+ return NewBilibiliWithContext(ctx, netTypes, logger)
}
-func NewBilibili() Bilibili {
- return NewBilibiliWithNetType(nil)
+func NewBilibili(logger logging.Logger) Bilibili {
+ return NewBilibiliWithNetType(nil, logger)
}
diff --git a/bilibili/request.go b/bilibili/request.go
index f9e2088..7de35d3 100644
--- a/bilibili/request.go
+++ b/bilibili/request.go
@@ -18,7 +18,7 @@ func (b Bilibili) newRequest(
) (req *http.Request, err error) {
req, err = http.NewRequestWithContext(b.ctx, method, url, body)
if err != nil {
- b.error.Printf("Cannot create HTTP request instance: %v. Method: %v, URL: %v", err, method, url)
+ b.logger.Error("Cannot create HTTP request instance: %v. Method: %v, URL: %v", err, method, url)
return
}
req.Header.Set("User-Agent", b.userAgent)
@@ -48,24 +48,24 @@ func callGet[T BaseResponse[V], V any](b Bilibili, url string) (resp T, err erro
err = validateHttpStatus(r)
if err != nil {
- b.error.Printf("%v", err)
+ b.logger.Error("%v", err)
return
}
data, err := io.ReadAll(r.Body)
if err != nil {
- b.error.Printf("Error when reading HTTP response on API %v: %v", url, err)
+ b.logger.Error("Error when reading HTTP response on API %v: %v", url, err)
return
}
err = json.Unmarshal(data, &resp)
if err != nil {
- b.error.Printf("Invalid JSON body of HTTP response on API %v: %v. Text: \"%v\"",
+ b.logger.Error("Invalid JSON body of HTTP response on API %v: %v. Text: \"%v\"",
url, err, string(data))
return
}
- b.debug.Printf("HTTP %v, len: %v bytes, url: %v", r.StatusCode, len(data), url)
+ b.logger.Debug("HTTP %v, len: %v bytes, url: %v", r.StatusCode, len(data), url)
return
}
@@ -84,7 +84,7 @@ func (b Bilibili) Do(req *http.Request) (resp *http.Response, err error) {
isAddrErr := common.IsErrorOfType(err, &net.AddrError{})
if err == nil || !isOpErr || !isAddrErr {
// return the first success request
- b.loggerCommon.info.Printf("Request success with network %v.", typeName)
+ b.logger.Info("Request success with network %v.", typeName)
return
}
}
diff --git a/bilibili/request_test.go b/bilibili/request_test.go
index 68e5c7a..58b7127 100644
--- a/bilibili/request_test.go
+++ b/bilibili/request_test.go
@@ -1,12 +1,14 @@
package bilibili
import (
+ "bilibili-livestream-archiver/logging"
+ "log"
"testing"
)
func Test_callGet(t *testing.T) {
// an always-fail request should not panic
- bi := NewBilibili()
+ bi := NewBilibili(logging.NewWrappedLogger(log.Default(), "main"))
_, err := callGet[BaseResponse[struct{}]](bi, "https://256.256.256.256")
if err == nil {
t.Fatalf("the artificial request should fail, but it haven't")
diff --git a/bilibili/streaming.go b/bilibili/streaming.go
index 9a26c2d..7ad852c 100644
--- a/bilibili/streaming.go
+++ b/bilibili/streaming.go
@@ -27,7 +27,7 @@ func (b Bilibili) CopyLiveStream(
r, err := b.newGet(url)
if err != nil {
- b.error.Printf("Cannot create HTTP GET instance on %v: %v", url, err)
+ b.logger.Error("Cannot create HTTP GET instance on %v: %v", url, err)
return err
}
@@ -36,7 +36,7 @@ func (b Bilibili) CopyLiveStream(
resp, err := b.Do(r)
if err != nil {
- b.error.Printf("Cannot make HTTP GET request on %v: %v\n", url, err)
+ b.logger.Error("Cannot make HTTP GET request on %v: %v\n", url, err)
return
}
@@ -56,13 +56,13 @@ func (b Bilibili) CopyLiveStream(
n, err := common.CopyToFileWithBuffer(ctx, out, resp.Body, buffer, readChunkSize, false)
if err != nil && !errors.Is(err, context.Canceled) {
- b.error.Printf("Stream copying was interrupted unexpectedly: %v", err)
+ b.logger.Error("Stream copying was interrupted unexpectedly: %v", err)
}
if err == nil {
- b.info.Printf("The live is ended. (room %v)", roomId)
+ b.logger.Info("The live is ended. (room %v)", roomId)
}
- b.info.Printf("Total downloaded: %v", common.PrettyBytes(uint64(n)))
+ b.logger.Info("Total downloaded: %v", common.PrettyBytes(uint64(n)))
return err
}
diff --git a/common/retry.go b/common/retry.go
index 1751679..647e8ad 100644
--- a/common/retry.go
+++ b/common/retry.go
@@ -1,8 +1,8 @@
package common
import (
+ "bilibili-livestream-archiver/logging"
"context"
- "log"
"time"
)
@@ -15,14 +15,14 @@ func AutoRetry[T any](
supplier func() (T, error),
maxRetryTimes int,
retryInterval time.Duration,
- logger *log.Logger) (T, error) {
+ logger *logging.Logger) (T, error) {
var err error
var ret T
for i := 0; i < maxRetryTimes+1; i++ {
ret, err = supplier()
if err != nil {
if logger != nil {
- logger.Printf("Try %v/%v (sleep %vs): %v",
+ logger.Info("Try %v/%v (sleep %vs): %v",
i, maxRetryTimes, retryInterval, err)
}
timer := time.NewTimer(retryInterval)
@@ -40,7 +40,7 @@ func AutoRetry[T any](
return ret, nil
}
if logger != nil {
- logger.Printf("Max retry times reached, but it still fails. Last error: %v", err)
+ logger.Error("Max retry times reached, but it still fails. Last error: %v", err)
}
var zero T
return zero, err
diff --git a/recording/runner.go b/recording/runner.go
index 98d4432..4f3b1f5 100644
--- a/recording/runner.go
+++ b/recording/runner.go
@@ -12,7 +12,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "log"
"os"
"path"
"time"
@@ -57,7 +56,7 @@ func (t *RunningTask) runTaskWithAutoRestart() error {
func tryRunTask(t *RunningTask) error {
netTypes := t.Transport.AllowedNetworkTypes
t.logger.Info("Network types: %v", netTypes)
- bi := bilibili.NewBilibiliWithNetType(netTypes)
+ bi := bilibili.NewBilibiliWithNetType(netTypes, t.logger)
t.logger.Info("Start task: room %v", t.RoomId)
t.logger.Info("Getting notification server info...")
@@ -135,8 +134,7 @@ func record(
bi bilibili.Bilibili,
task *RunningTask,
) (cancelled bool, err error) {
- logger := log.Default()
- logger.Printf("INFO: Getting room profile...")
+ task.logger.Info("Getting room profile...")
profile, err := common.AutoRetry(
ctx,
@@ -145,7 +143,7 @@ func record(
},
task.Transport.MaxRetryTimes,
time.Duration(task.Transport.RetryIntervalSeconds)*time.Second,
- logger,
+ &task.logger,
)
if errors.Is(err, context.Canceled) {
cancelled = true
@@ -153,12 +151,12 @@ func record(
}
if err != nil {
// still error, abort
- logger.Printf("ERROR: Cannot get room information: %v. Stopping current task.", err)
+ task.logger.Error("Cannot get room information: %v. Stopping current task.", err)
cancelled = true
return
}
- logger.Printf("INFO: Getting stream url...")
+ task.logger.Info("Getting stream url...")
urlInfo, err := common.AutoRetry(
ctx,
func() (bilibili.RoomUrlInfoResponse, error) {
@@ -166,14 +164,14 @@ func record(
},
task.Transport.MaxRetryTimes,
time.Duration(task.Transport.RetryIntervalSeconds)*time.Second,
- logger,
+ &task.logger,
)
if errors.Is(err, context.Canceled) {
cancelled = true
return
}
if err != nil {
- logger.Printf("ERROR: Cannot get streaming info: %v", err)
+ task.logger.Error("Cannot get streaming info: %v", err)
cancelled = true
return
}
@@ -182,7 +180,7 @@ func record(
if err2 != nil {
j = []byte("(not available)")
}
- logger.Printf("ERROR: No stream returned from API. Response: %v", string(j))
+ task.logger.Error("No stream returned from API. Response: %v", string(j))
cancelled = true
return
}
@@ -196,7 +194,7 @@ func record(
filePath := path.Join(task.Download.SaveDirectory, fileName)
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
- logger.Printf("ERROR: Cannot open file for writing: %v", err)
+ task.logger.Error("Cannot open file for writing: %v", err)
cancelled = true
return
}
@@ -210,13 +208,13 @@ func record(
writeBufferSize += kReadChunkSize - mod
}
writeBuffer := make([]byte, writeBufferSize)
- logger.Printf("Write buffer size: %v byte", writeBufferSize)
- logger.Printf("Recording live stream to file \"%v\"...", filePath)
+ task.logger.Info("Write buffer size: %v byte", writeBufferSize)
+ task.logger.Info("Recording live stream to file \"%v\"...", filePath)
err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file, writeBuffer, kReadChunkSize)
cancelled = err == nil || errors.Is(err, context.Canceled)
if !cancelled {
// real error happens
- logger.Printf("Error when copying live stream: %v", err)
+ task.logger.Error("Error when copying live stream: %v", err)
}
return
}
@@ -248,6 +246,7 @@ func watcherRecoverableLoop(
return resp.Data.LiveStatus.IsStreaming(), nil
},
chWatcherEvent,
+ task.logger,
)
// the context is cancelled, stop watching
diff --git a/recording/watcher.go b/recording/watcher.go
index afc7ab5..0e5d087 100644
--- a/recording/watcher.go
+++ b/recording/watcher.go
@@ -5,10 +5,10 @@ import (
"bilibili-livestream-archiver/danmaku"
"bilibili-livestream-archiver/danmaku/dmmsg"
"bilibili-livestream-archiver/danmaku/dmpkg"
+ "bilibili-livestream-archiver/logging"
"context"
"encoding/json"
"fmt"
- "log"
"time"
)
@@ -53,10 +53,8 @@ func watch(
roomId common.RoomId,
liveStatusChecker func() (bool, error),
chEvent chan<- WatcherEvent,
+ logger logging.Logger,
) (error, ErrorReason) {
-
- logger := log.Default()
-
var err error
dm := danmaku.NewDanmakuClient()
@@ -70,7 +68,7 @@ func watch(
defer func() { _ = dm.Disconnect() }()
// the danmaku server requires an auth token and room id when connected
- logger.Println("ws connected. Authenticating...")
+ logger.Info("ws connected. Authenticating...")
err = dm.Authenticate(roomId, authKey)
if err != nil {
return fmt.Errorf("auth failed: %w", err), ErrProtocol
@@ -92,17 +90,17 @@ func watch(
heartBeatTimer := time.NewTicker(kHeartBeatInterval)
defer func() { heartBeatTimer.Stop() }()
- logger.Println("Checking initial live status...")
+ logger.Info("Checking initial live status...")
isLiving, err := liveStatusChecker()
if err != nil {
return fmt.Errorf("check initial live status failed: %w", err), ErrTransport
}
if isLiving {
- logger.Println("The live is already started. Start recording immediately.")
+ logger.Info("The live is already started. Start recording immediately.")
chEvent <- WatcherLiveStart
} else {
- logger.Println("The live is not started yet. Waiting...")
+ logger.Info("The live is not started yet. Waiting...")
}
for {
@@ -132,7 +130,7 @@ func watch(
var info liveInfo
err := json.Unmarshal(msg.Body, &info)
if err != nil {
- logger.Printf("ERROR: invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg)
+ logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg)
return fmt.Errorf("decode server message body JSON failed: %w", err), ErrProtocol
}
switch info.Command {
@@ -158,7 +156,7 @@ func watch(
fallthrough
case "HOT_RANK_CHANGED_V2":
// useless message
- logger.Printf("Ignore message: %v", info.Command)
+ logger.Info("Ignore message: %v", info.Command)
case "WATCHED_CHANGE":
// number of watched people changed
obj, exists := info.Data["num"]
@@ -167,39 +165,39 @@ func watch(
}
viewersNum, ok := obj.(float64)
if !ok {
- logger.Printf("Cannot parse watched people number: %v", obj)
+ logger.Error("Cannot parse watched people number: %v", obj)
continue
}
- logger.Printf("The number of viewers (room: %v): %v", roomId, viewersNum)
+ logger.Info("The number of viewers (room: %v): %v", roomId, viewersNum)
case "INTERACT_WORD":
var raw dmmsg.RawInteractWordMessage
err = json.Unmarshal(msg.Body, &raw)
if err != nil {
- logger.Printf("Cannot parse RawInteractWordMessage JSON: %v", err)
+ logger.Error("Cannot parse RawInteractWordMessage JSON: %v", err)
continue
}
- logger.Printf("Interact word message: user: %v medal: %v",
+ logger.Info("Interact word message: user: %v medal: %v",
raw.Data.UserName, raw.Data.FansMedal.Name)
case "DANMU_MSG":
var raw dmmsg.RawDanMuMessage
err = json.Unmarshal(msg.Body, &raw)
if err != nil {
- logger.Printf("Cannot parse Dan Mu message as JSON: %v", err)
+ logger.Error("Cannot parse Dan Mu message as JSON: %v", err)
continue
}
dmm, err := dmmsg.ParseDanmakuMessage(raw)
if err != nil {
- logger.Printf("Cannot parse Dan Mu message JSON: %v", err)
+ logger.Error("Cannot parse Dan Mu message JSON: %v", err)
continue
}
- logger.Printf("Dan Mu: %v", dmm.String())
+ logger.Info("Dan Mu: %v", dmm.String())
default:
- logger.Printf("Ignore unhandled server message %v %v %v",
+ logger.Info("Ignore unhandled server message %v %v %v",
info.Command, msg.Operation, string(msg.Body))
}
}
default:
- logger.Printf("Server message: %v", msg.String())
+ logger.Info("Server message: %v", msg.String())
}
}