summaryrefslogtreecommitdiff
path: root/recording/watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'recording/watcher.go')
-rw-r--r--recording/watcher.go72
1 files changed, 29 insertions, 43 deletions
diff --git a/recording/watcher.go b/recording/watcher.go
index 4ae17b7..2510b8b 100644
--- a/recording/watcher.go
+++ b/recording/watcher.go
@@ -3,8 +3,6 @@ package recording
import (
"context"
"encoding/json"
- "fmt"
- "github.com/keuin/slbr/common"
"github.com/keuin/slbr/danmaku"
"github.com/keuin/slbr/danmaku/dmmsg"
"github.com/keuin/slbr/danmaku/dmpkg"
@@ -12,13 +10,6 @@ import (
"time"
)
-type WatcherEvent int
-
-const (
- WatcherLiveStart WatcherEvent = 0
- WatcherLiveStop WatcherEvent = 1
-)
-
type liveCommand string
const (
@@ -31,14 +22,6 @@ type liveInfo struct {
Data map[string]interface{} `json:"data"`
}
-type ErrorReason int
-
-const (
- ErrSuccess ErrorReason = iota // no error happens, normally closed
- ErrTransport // I/O error, safe to retry
- ErrProtocol // application protocol logic error, do not retry
-)
-
const (
kHeartBeatInterval = 30 * time.Second
)
@@ -46,32 +29,41 @@ const (
// watch monitors live room status by subscribing messages from Bilibili danmaku server,
// which talks to the client via a WebSocket or TCP connection.
// In our implementation, we use WebSocket over SSL/TLS.
+// This function will return after the live is started,
+// since one connection cannot receive more than one live start event.
+// Error types:
+// - UnrecoverableError
+// - RecoverableError
+// - context.Cancelled
func watch(
ctx context.Context,
+ t TaskConfig,
url string,
authKey string,
- roomId common.RoomId,
liveStatusChecker func() (bool, error),
- chEvent chan<- WatcherEvent,
logger logging.Logger,
-) (error, ErrorReason) {
+) error {
var err error
dm := danmaku.NewDanmakuClient()
- defer func() { _ = dm.Disconnect() }()
// connect to danmaku server for live online/offline notifications
err = dm.Connect(ctx, url)
if err != nil {
- return fmt.Errorf("failed to connect to danmaku server: %w", err), ErrTransport
+ return NewRecoverableTaskError("failed to connect to danmaku server", err)
}
- defer func() { _ = dm.Disconnect() }()
+ defer func() {
+ // this operation may be time-consuming, so run in another goroutine
+ go func() {
+ _ = dm.Disconnect()
+ }()
+ }()
// the danmaku server requires an auth token and room id when connected
logger.Info("ws connected. Authenticating...")
- err = dm.Authenticate(roomId, authKey)
+ err = dm.Authenticate(t.RoomId, authKey)
if err != nil {
- return fmt.Errorf("auth failed: %w", err), ErrProtocol
+ return NewUnrecoverableTaskError("authentication failed, invalid protocol", err)
}
// the danmaku server requires heartbeat messages every 30 seconds
@@ -83,7 +75,7 @@ func watch(
// send initial heartbeat immediately
err = heartbeat()
if err != nil {
- return err, ErrTransport
+ return NewRecoverableTaskError("heartbeat failed", err)
}
// create heartbeat timer
@@ -91,14 +83,13 @@ func watch(
defer func() { heartBeatTimer.Stop() }()
logger.Info("Checking initial live status...")
- isLiving, err := liveStatusChecker()
+ isLiving, err := AutoRetryWithConfig[bool](ctx, logger, &t, liveStatusChecker)
if err != nil {
- return fmt.Errorf("check initial live status failed: %w", err), ErrTransport
+ return NewRecoverableTaskError("check initial live status failed", err)
}
-
if isLiving {
logger.Info("The live is already started. Start recording immediately.")
- chEvent <- WatcherLiveStart
+ return nil
} else {
logger.Info("The live is not started yet. Waiting...")
}
@@ -106,22 +97,22 @@ func watch(
for {
select {
case <-ctx.Done():
- return nil, ErrSuccess
+ return ctx.Err()
case <-heartBeatTimer.C:
err = heartbeat()
if err != nil {
- return fmt.Errorf("heartbeat failed: %w", err), ErrTransport
+ return NewRecoverableTaskError("heartbeat failed", err)
}
default:
var msg dmpkg.DanmakuExchange
msg, err = dm.ReadExchange()
if err != nil {
- return fmt.Errorf("exchange read failed: %w", err), ErrTransport
+ return NewRecoverableTaskError("failed to read exchange from server", err)
}
// the exchange may be compressed
msg, err = msg.Inflate()
if err != nil {
- return fmt.Errorf("inflate server message failed: %v", err), ErrProtocol
+ return NewUnrecoverableTaskError("failed to decompress server message", err)
}
switch msg.Operation {
@@ -131,18 +122,13 @@ func watch(
err := json.Unmarshal(msg.Body, &info)
if err != nil {
logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg)
- return fmt.Errorf("decode server message body JSON failed: %w", err), ErrProtocol
+ return NewUnrecoverableTaskError("invalid JSON response from server", err)
}
switch info.Command {
case CommandLiveStart:
- if !isLiving {
- chEvent <- WatcherLiveStart
- isLiving = true
- }
+ return nil
case CommandStreamPreparing:
- if isLiving {
- chEvent <- WatcherLiveStop
- }
+ break
default:
switch info.Command {
case "ENTRY_EFFECT":
@@ -168,7 +154,7 @@ func watch(
logger.Error("Cannot parse watched people number: %v", obj)
continue
}
- logger.Info("The number of viewers (room: %v): %v", roomId, viewersNum)
+ logger.Info("The number of viewers (room: %v): %v", t.RoomId, viewersNum)
case "INTERACT_WORD":
var raw dmmsg.RawInteractWordMessage
err = json.Unmarshal(msg.Body, &raw)