diff options
-rw-r--r-- | bilibili/client.go | 36 | ||||
-rw-r--r-- | bilibili/request.go | 12 | ||||
-rw-r--r-- | bilibili/request_test.go | 4 | ||||
-rw-r--r-- | bilibili/streaming.go | 10 | ||||
-rw-r--r-- | common/retry.go | 8 | ||||
-rw-r--r-- | recording/runner.go | 27 | ||||
-rw-r--r-- | recording/watcher.go | 36 |
7 files changed, 62 insertions, 71 deletions
diff --git a/bilibili/client.go b/bilibili/client.go index 8097115..f72432a 100644 --- a/bilibili/client.go +++ b/bilibili/client.go @@ -5,11 +5,10 @@ This struct is a facade of all necessary Bilibili HTTP API wrappers. package bilibili import ( + "bilibili-livestream-archiver/logging" "context" - "log" "net" "net/http" - "os" ) const ( @@ -21,19 +20,12 @@ const ( type Bilibili struct { userAgent string http *http.Client - loggerCommon - ctx context.Context - netTypes []IpNetType + ctx context.Context + netTypes []IpNetType + logger logging.Logger } -func NewBilibiliWithContext(ctx context.Context, netTypes []IpNetType) Bilibili { - logger := loggerCommon{ - debug: log.New(os.Stderr, "DEBUG: ", log.Ldate|log.Ltime|log.Lshortfile), - info: log.New(os.Stderr, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile), - warn: log.New(os.Stderr, "WARNING: ", log.Ldate|log.Ltime|log.Lshortfile), - error: log.New(os.Stderr, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile), - } - +func NewBilibiliWithContext(ctx context.Context, netTypes []IpNetType, logger logging.Logger) Bilibili { var nets []IpNetType nets = append(nets, netTypes...) if len(nets) == 0 { @@ -47,19 +39,19 @@ func NewBilibiliWithContext(ctx context.Context, netTypes []IpNetType) Bilibili transport.DialContext, _ = np.NextNetworkType(dialer) return Bilibili{ - loggerCommon: logger, - userAgent: kUserAgent, - http: http.DefaultClient, - ctx: ctx, - netTypes: nets, + logger: logger, + userAgent: kUserAgent, + http: http.DefaultClient, + ctx: ctx, + netTypes: nets, } } -func NewBilibiliWithNetType(netTypes []IpNetType) Bilibili { +func NewBilibiliWithNetType(netTypes []IpNetType, logger logging.Logger) Bilibili { ctx := context.Background() - return NewBilibiliWithContext(ctx, netTypes) + return NewBilibiliWithContext(ctx, netTypes, logger) } -func NewBilibili() Bilibili { - return NewBilibiliWithNetType(nil) +func NewBilibili(logger logging.Logger) Bilibili { + return NewBilibiliWithNetType(nil, logger) } diff --git a/bilibili/request.go b/bilibili/request.go index f9e2088..7de35d3 100644 --- a/bilibili/request.go +++ b/bilibili/request.go @@ -18,7 +18,7 @@ func (b Bilibili) newRequest( ) (req *http.Request, err error) { req, err = http.NewRequestWithContext(b.ctx, method, url, body) if err != nil { - b.error.Printf("Cannot create HTTP request instance: %v. Method: %v, URL: %v", err, method, url) + b.logger.Error("Cannot create HTTP request instance: %v. Method: %v, URL: %v", err, method, url) return } req.Header.Set("User-Agent", b.userAgent) @@ -48,24 +48,24 @@ func callGet[T BaseResponse[V], V any](b Bilibili, url string) (resp T, err erro err = validateHttpStatus(r) if err != nil { - b.error.Printf("%v", err) + b.logger.Error("%v", err) return } data, err := io.ReadAll(r.Body) if err != nil { - b.error.Printf("Error when reading HTTP response on API %v: %v", url, err) + b.logger.Error("Error when reading HTTP response on API %v: %v", url, err) return } err = json.Unmarshal(data, &resp) if err != nil { - b.error.Printf("Invalid JSON body of HTTP response on API %v: %v. Text: \"%v\"", + b.logger.Error("Invalid JSON body of HTTP response on API %v: %v. Text: \"%v\"", url, err, string(data)) return } - b.debug.Printf("HTTP %v, len: %v bytes, url: %v", r.StatusCode, len(data), url) + b.logger.Debug("HTTP %v, len: %v bytes, url: %v", r.StatusCode, len(data), url) return } @@ -84,7 +84,7 @@ func (b Bilibili) Do(req *http.Request) (resp *http.Response, err error) { isAddrErr := common.IsErrorOfType(err, &net.AddrError{}) if err == nil || !isOpErr || !isAddrErr { // return the first success request - b.loggerCommon.info.Printf("Request success with network %v.", typeName) + b.logger.Info("Request success with network %v.", typeName) return } } diff --git a/bilibili/request_test.go b/bilibili/request_test.go index 68e5c7a..58b7127 100644 --- a/bilibili/request_test.go +++ b/bilibili/request_test.go @@ -1,12 +1,14 @@ package bilibili import ( + "bilibili-livestream-archiver/logging" + "log" "testing" ) func Test_callGet(t *testing.T) { // an always-fail request should not panic - bi := NewBilibili() + bi := NewBilibili(logging.NewWrappedLogger(log.Default(), "main")) _, err := callGet[BaseResponse[struct{}]](bi, "https://256.256.256.256") if err == nil { t.Fatalf("the artificial request should fail, but it haven't") diff --git a/bilibili/streaming.go b/bilibili/streaming.go index 9a26c2d..7ad852c 100644 --- a/bilibili/streaming.go +++ b/bilibili/streaming.go @@ -27,7 +27,7 @@ func (b Bilibili) CopyLiveStream( r, err := b.newGet(url) if err != nil { - b.error.Printf("Cannot create HTTP GET instance on %v: %v", url, err) + b.logger.Error("Cannot create HTTP GET instance on %v: %v", url, err) return err } @@ -36,7 +36,7 @@ func (b Bilibili) CopyLiveStream( resp, err := b.Do(r) if err != nil { - b.error.Printf("Cannot make HTTP GET request on %v: %v\n", url, err) + b.logger.Error("Cannot make HTTP GET request on %v: %v\n", url, err) return } @@ -56,13 +56,13 @@ func (b Bilibili) CopyLiveStream( n, err := common.CopyToFileWithBuffer(ctx, out, resp.Body, buffer, readChunkSize, false) if err != nil && !errors.Is(err, context.Canceled) { - b.error.Printf("Stream copying was interrupted unexpectedly: %v", err) + b.logger.Error("Stream copying was interrupted unexpectedly: %v", err) } if err == nil { - b.info.Printf("The live is ended. (room %v)", roomId) + b.logger.Info("The live is ended. (room %v)", roomId) } - b.info.Printf("Total downloaded: %v", common.PrettyBytes(uint64(n))) + b.logger.Info("Total downloaded: %v", common.PrettyBytes(uint64(n))) return err } diff --git a/common/retry.go b/common/retry.go index 1751679..647e8ad 100644 --- a/common/retry.go +++ b/common/retry.go @@ -1,8 +1,8 @@ package common import ( + "bilibili-livestream-archiver/logging" "context" - "log" "time" ) @@ -15,14 +15,14 @@ func AutoRetry[T any]( supplier func() (T, error), maxRetryTimes int, retryInterval time.Duration, - logger *log.Logger) (T, error) { + logger *logging.Logger) (T, error) { var err error var ret T for i := 0; i < maxRetryTimes+1; i++ { ret, err = supplier() if err != nil { if logger != nil { - logger.Printf("Try %v/%v (sleep %vs): %v", + logger.Info("Try %v/%v (sleep %vs): %v", i, maxRetryTimes, retryInterval, err) } timer := time.NewTimer(retryInterval) @@ -40,7 +40,7 @@ func AutoRetry[T any]( return ret, nil } if logger != nil { - logger.Printf("Max retry times reached, but it still fails. Last error: %v", err) + logger.Error("Max retry times reached, but it still fails. Last error: %v", err) } var zero T return zero, err diff --git a/recording/runner.go b/recording/runner.go index 98d4432..4f3b1f5 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -12,7 +12,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "os" "path" "time" @@ -57,7 +56,7 @@ func (t *RunningTask) runTaskWithAutoRestart() error { func tryRunTask(t *RunningTask) error { netTypes := t.Transport.AllowedNetworkTypes t.logger.Info("Network types: %v", netTypes) - bi := bilibili.NewBilibiliWithNetType(netTypes) + bi := bilibili.NewBilibiliWithNetType(netTypes, t.logger) t.logger.Info("Start task: room %v", t.RoomId) t.logger.Info("Getting notification server info...") @@ -135,8 +134,7 @@ func record( bi bilibili.Bilibili, task *RunningTask, ) (cancelled bool, err error) { - logger := log.Default() - logger.Printf("INFO: Getting room profile...") + task.logger.Info("Getting room profile...") profile, err := common.AutoRetry( ctx, @@ -145,7 +143,7 @@ func record( }, task.Transport.MaxRetryTimes, time.Duration(task.Transport.RetryIntervalSeconds)*time.Second, - logger, + &task.logger, ) if errors.Is(err, context.Canceled) { cancelled = true @@ -153,12 +151,12 @@ func record( } if err != nil { // still error, abort - logger.Printf("ERROR: Cannot get room information: %v. Stopping current task.", err) + task.logger.Error("Cannot get room information: %v. Stopping current task.", err) cancelled = true return } - logger.Printf("INFO: Getting stream url...") + task.logger.Info("Getting stream url...") urlInfo, err := common.AutoRetry( ctx, func() (bilibili.RoomUrlInfoResponse, error) { @@ -166,14 +164,14 @@ func record( }, task.Transport.MaxRetryTimes, time.Duration(task.Transport.RetryIntervalSeconds)*time.Second, - logger, + &task.logger, ) if errors.Is(err, context.Canceled) { cancelled = true return } if err != nil { - logger.Printf("ERROR: Cannot get streaming info: %v", err) + task.logger.Error("Cannot get streaming info: %v", err) cancelled = true return } @@ -182,7 +180,7 @@ func record( if err2 != nil { j = []byte("(not available)") } - logger.Printf("ERROR: No stream returned from API. Response: %v", string(j)) + task.logger.Error("No stream returned from API. Response: %v", string(j)) cancelled = true return } @@ -196,7 +194,7 @@ func record( filePath := path.Join(task.Download.SaveDirectory, fileName) file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) if err != nil { - logger.Printf("ERROR: Cannot open file for writing: %v", err) + task.logger.Error("Cannot open file for writing: %v", err) cancelled = true return } @@ -210,13 +208,13 @@ func record( writeBufferSize += kReadChunkSize - mod } writeBuffer := make([]byte, writeBufferSize) - logger.Printf("Write buffer size: %v byte", writeBufferSize) - logger.Printf("Recording live stream to file \"%v\"...", filePath) + task.logger.Info("Write buffer size: %v byte", writeBufferSize) + task.logger.Info("Recording live stream to file \"%v\"...", filePath) err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file, writeBuffer, kReadChunkSize) cancelled = err == nil || errors.Is(err, context.Canceled) if !cancelled { // real error happens - logger.Printf("Error when copying live stream: %v", err) + task.logger.Error("Error when copying live stream: %v", err) } return } @@ -248,6 +246,7 @@ func watcherRecoverableLoop( return resp.Data.LiveStatus.IsStreaming(), nil }, chWatcherEvent, + task.logger, ) // the context is cancelled, stop watching diff --git a/recording/watcher.go b/recording/watcher.go index afc7ab5..0e5d087 100644 --- a/recording/watcher.go +++ b/recording/watcher.go @@ -5,10 +5,10 @@ import ( "bilibili-livestream-archiver/danmaku" "bilibili-livestream-archiver/danmaku/dmmsg" "bilibili-livestream-archiver/danmaku/dmpkg" + "bilibili-livestream-archiver/logging" "context" "encoding/json" "fmt" - "log" "time" ) @@ -53,10 +53,8 @@ func watch( roomId common.RoomId, liveStatusChecker func() (bool, error), chEvent chan<- WatcherEvent, + logger logging.Logger, ) (error, ErrorReason) { - - logger := log.Default() - var err error dm := danmaku.NewDanmakuClient() @@ -70,7 +68,7 @@ func watch( defer func() { _ = dm.Disconnect() }() // the danmaku server requires an auth token and room id when connected - logger.Println("ws connected. Authenticating...") + logger.Info("ws connected. Authenticating...") err = dm.Authenticate(roomId, authKey) if err != nil { return fmt.Errorf("auth failed: %w", err), ErrProtocol @@ -92,17 +90,17 @@ func watch( heartBeatTimer := time.NewTicker(kHeartBeatInterval) defer func() { heartBeatTimer.Stop() }() - logger.Println("Checking initial live status...") + logger.Info("Checking initial live status...") isLiving, err := liveStatusChecker() if err != nil { return fmt.Errorf("check initial live status failed: %w", err), ErrTransport } if isLiving { - logger.Println("The live is already started. Start recording immediately.") + logger.Info("The live is already started. Start recording immediately.") chEvent <- WatcherLiveStart } else { - logger.Println("The live is not started yet. Waiting...") + logger.Info("The live is not started yet. Waiting...") } for { @@ -132,7 +130,7 @@ func watch( var info liveInfo err := json.Unmarshal(msg.Body, &info) if err != nil { - logger.Printf("ERROR: invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg) + logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg) return fmt.Errorf("decode server message body JSON failed: %w", err), ErrProtocol } switch info.Command { @@ -158,7 +156,7 @@ func watch( fallthrough case "HOT_RANK_CHANGED_V2": // useless message - logger.Printf("Ignore message: %v", info.Command) + logger.Info("Ignore message: %v", info.Command) case "WATCHED_CHANGE": // number of watched people changed obj, exists := info.Data["num"] @@ -167,39 +165,39 @@ func watch( } viewersNum, ok := obj.(float64) if !ok { - logger.Printf("Cannot parse watched people number: %v", obj) + logger.Error("Cannot parse watched people number: %v", obj) continue } - logger.Printf("The number of viewers (room: %v): %v", roomId, viewersNum) + logger.Info("The number of viewers (room: %v): %v", roomId, viewersNum) case "INTERACT_WORD": var raw dmmsg.RawInteractWordMessage err = json.Unmarshal(msg.Body, &raw) if err != nil { - logger.Printf("Cannot parse RawInteractWordMessage JSON: %v", err) + logger.Error("Cannot parse RawInteractWordMessage JSON: %v", err) continue } - logger.Printf("Interact word message: user: %v medal: %v", + logger.Info("Interact word message: user: %v medal: %v", raw.Data.UserName, raw.Data.FansMedal.Name) case "DANMU_MSG": var raw dmmsg.RawDanMuMessage err = json.Unmarshal(msg.Body, &raw) if err != nil { - logger.Printf("Cannot parse Dan Mu message as JSON: %v", err) + logger.Error("Cannot parse Dan Mu message as JSON: %v", err) continue } dmm, err := dmmsg.ParseDanmakuMessage(raw) if err != nil { - logger.Printf("Cannot parse Dan Mu message JSON: %v", err) + logger.Error("Cannot parse Dan Mu message JSON: %v", err) continue } - logger.Printf("Dan Mu: %v", dmm.String()) + logger.Info("Dan Mu: %v", dmm.String()) default: - logger.Printf("Ignore unhandled server message %v %v %v", + logger.Info("Ignore unhandled server message %v %v %v", info.Command, msg.Operation, string(msg.Body)) } } default: - logger.Printf("Server message: %v", msg.String()) + logger.Info("Server message: %v", msg.String()) } } |