diff options
Diffstat (limited to 'recording/watcher.go')
-rw-r--r-- | recording/watcher.go | 72 |
1 files changed, 29 insertions, 43 deletions
diff --git a/recording/watcher.go b/recording/watcher.go index 4ae17b7..2510b8b 100644 --- a/recording/watcher.go +++ b/recording/watcher.go @@ -3,8 +3,6 @@ package recording import ( "context" "encoding/json" - "fmt" - "github.com/keuin/slbr/common" "github.com/keuin/slbr/danmaku" "github.com/keuin/slbr/danmaku/dmmsg" "github.com/keuin/slbr/danmaku/dmpkg" @@ -12,13 +10,6 @@ import ( "time" ) -type WatcherEvent int - -const ( - WatcherLiveStart WatcherEvent = 0 - WatcherLiveStop WatcherEvent = 1 -) - type liveCommand string const ( @@ -31,14 +22,6 @@ type liveInfo struct { Data map[string]interface{} `json:"data"` } -type ErrorReason int - -const ( - ErrSuccess ErrorReason = iota // no error happens, normally closed - ErrTransport // I/O error, safe to retry - ErrProtocol // application protocol logic error, do not retry -) - const ( kHeartBeatInterval = 30 * time.Second ) @@ -46,32 +29,41 @@ const ( // watch monitors live room status by subscribing messages from Bilibili danmaku server, // which talks to the client via a WebSocket or TCP connection. // In our implementation, we use WebSocket over SSL/TLS. +// This function will return after the live is started, +// since one connection cannot receive more than one live start event. +// Error types: +// - UnrecoverableError +// - RecoverableError +// - context.Cancelled func watch( ctx context.Context, + t TaskConfig, url string, authKey string, - roomId common.RoomId, liveStatusChecker func() (bool, error), - chEvent chan<- WatcherEvent, logger logging.Logger, -) (error, ErrorReason) { +) error { var err error dm := danmaku.NewDanmakuClient() - defer func() { _ = dm.Disconnect() }() // connect to danmaku server for live online/offline notifications err = dm.Connect(ctx, url) if err != nil { - return fmt.Errorf("failed to connect to danmaku server: %w", err), ErrTransport + return NewRecoverableTaskError("failed to connect to danmaku server", err) } - defer func() { _ = dm.Disconnect() }() + defer func() { + // this operation may be time-consuming, so run in another goroutine + go func() { + _ = dm.Disconnect() + }() + }() // the danmaku server requires an auth token and room id when connected logger.Info("ws connected. Authenticating...") - err = dm.Authenticate(roomId, authKey) + err = dm.Authenticate(t.RoomId, authKey) if err != nil { - return fmt.Errorf("auth failed: %w", err), ErrProtocol + return NewUnrecoverableTaskError("authentication failed, invalid protocol", err) } // the danmaku server requires heartbeat messages every 30 seconds @@ -83,7 +75,7 @@ func watch( // send initial heartbeat immediately err = heartbeat() if err != nil { - return err, ErrTransport + return NewRecoverableTaskError("heartbeat failed", err) } // create heartbeat timer @@ -91,14 +83,13 @@ func watch( defer func() { heartBeatTimer.Stop() }() logger.Info("Checking initial live status...") - isLiving, err := liveStatusChecker() + isLiving, err := AutoRetryWithConfig[bool](ctx, logger, &t, liveStatusChecker) if err != nil { - return fmt.Errorf("check initial live status failed: %w", err), ErrTransport + return NewRecoverableTaskError("check initial live status failed", err) } - if isLiving { logger.Info("The live is already started. Start recording immediately.") - chEvent <- WatcherLiveStart + return nil } else { logger.Info("The live is not started yet. Waiting...") } @@ -106,22 +97,22 @@ func watch( for { select { case <-ctx.Done(): - return nil, ErrSuccess + return ctx.Err() case <-heartBeatTimer.C: err = heartbeat() if err != nil { - return fmt.Errorf("heartbeat failed: %w", err), ErrTransport + return NewRecoverableTaskError("heartbeat failed", err) } default: var msg dmpkg.DanmakuExchange msg, err = dm.ReadExchange() if err != nil { - return fmt.Errorf("exchange read failed: %w", err), ErrTransport + return NewRecoverableTaskError("failed to read exchange from server", err) } // the exchange may be compressed msg, err = msg.Inflate() if err != nil { - return fmt.Errorf("inflate server message failed: %v", err), ErrProtocol + return NewUnrecoverableTaskError("failed to decompress server message", err) } switch msg.Operation { @@ -131,18 +122,13 @@ func watch( err := json.Unmarshal(msg.Body, &info) if err != nil { logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg) - return fmt.Errorf("decode server message body JSON failed: %w", err), ErrProtocol + return NewUnrecoverableTaskError("invalid JSON response from server", err) } switch info.Command { case CommandLiveStart: - if !isLiving { - chEvent <- WatcherLiveStart - isLiving = true - } + return nil case CommandStreamPreparing: - if isLiving { - chEvent <- WatcherLiveStop - } + break default: switch info.Command { case "ENTRY_EFFECT": @@ -168,7 +154,7 @@ func watch( logger.Error("Cannot parse watched people number: %v", obj) continue } - logger.Info("The number of viewers (room: %v): %v", roomId, viewersNum) + logger.Info("The number of viewers (room: %v): %v", t.RoomId, viewersNum) case "INTERACT_WORD": var raw dmmsg.RawInteractWordMessage err = json.Unmarshal(msg.Body, &raw) |