summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bilibili/streaming.go30
-rw-r--r--common/errors.go31
-rw-r--r--recording/errors.go33
-rw-r--r--recording/runner.go55
-rw-r--r--recording/watcher.go17
5 files changed, 101 insertions, 65 deletions
diff --git a/bilibili/streaming.go b/bilibili/streaming.go
index 21bca01..005ffd6 100644
--- a/bilibili/streaming.go
+++ b/bilibili/streaming.go
@@ -11,12 +11,14 @@ import (
"strings"
)
+const kInitReadBytes = 4096 // 4KiB
+
// CopyLiveStream read data from a livestream video stream, copy them to a writer.
func (b Bilibili) CopyLiveStream(
ctx context.Context,
roomId common.RoomId,
stream StreamingUrlInfo,
- out *os.File,
+ fileCreator func() (*os.File, error),
bufSize int64,
) (err error) {
url := stream.URL
@@ -52,7 +54,31 @@ func (b Bilibili) CopyLiveStream(
defer func() { _ = resp.Body.Close() }()
- b.logger.Info("Copying live stream...")
+ b.logger.Info("Waiting for stream initial bytes...")
+ // read some first bytes to ensure that the live is really started,
+ // so we don't create blank files if the live room is open
+ // but the live hasn't started yet
+ initBytes := make([]byte, kInitReadBytes)
+ _, err = io.ReadFull(resp.Body, initBytes)
+ if err != nil {
+ b.logger.Error("Failed to read stream initial bytes: %v", err)
+ return
+ }
+ b.logger.Info("Stream is started. Receiving live stream...")
+ // write initial bytes
+ var out *os.File
+ out, err = fileCreator()
+ if err != nil {
+ b.logger.Error("Cannot open file for writing: %v", err)
+ err = common.NewUnrecoverableTaskError("failed to create file", err)
+ return
+ }
+ _, err = out.Write(initBytes)
+ if err != nil {
+ b.logger.Error("Failed to write to file: %v", err)
+ return
+ }
+ initBytes = nil // discard that buffer
var n int64
diff --git a/common/errors.go b/common/errors.go
index ba686f1..fa2d492 100644
--- a/common/errors.go
+++ b/common/errors.go
@@ -2,6 +2,7 @@ package common
import (
"errors"
+ "fmt"
"reflect"
)
@@ -27,3 +28,33 @@ func IsErrorOfType(err, target error) bool {
}
}
}
+
+/*
+Task errors.
+*/
+
+type RecoverableTaskError struct {
+ err error
+ message string
+}
+
+func (e *RecoverableTaskError) Error() string {
+ return fmt.Sprintf("%v: %v", e.message, e.err)
+}
+
+func NewRecoverableTaskError(message string, err error) error {
+ return &RecoverableTaskError{message: message, err: err}
+}
+
+type UnrecoverableTaskError struct {
+ err error
+ message string
+}
+
+func (e *UnrecoverableTaskError) Error() string {
+ return fmt.Sprintf("%v: %v", e.message, e.err)
+}
+
+func NewUnrecoverableTaskError(message string, err error) error {
+ return &UnrecoverableTaskError{message: message, err: err}
+}
diff --git a/recording/errors.go b/recording/errors.go
deleted file mode 100644
index 48a492e..0000000
--- a/recording/errors.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package recording
-
-import "fmt"
-
-/*
-Task errors.
-*/
-
-type RecoverableTaskError struct {
- err error
- message string
-}
-
-func (e *RecoverableTaskError) Error() string {
- return fmt.Sprintf("%v: %v", e.message, e.err)
-}
-
-func NewRecoverableTaskError(message string, err error) error {
- return &RecoverableTaskError{message: message, err: err}
-}
-
-type UnrecoverableTaskError struct {
- err error
- message string
-}
-
-func (e *UnrecoverableTaskError) Error() string {
- return fmt.Sprintf("%v: %v", e.message, e.err)
-}
-
-func NewUnrecoverableTaskError(message string, err error) error {
- return &UnrecoverableTaskError{message: message, err: err}
-}
diff --git a/recording/runner.go b/recording/runner.go
index 47c6764..8c1e047 100644
--- a/recording/runner.go
+++ b/recording/runner.go
@@ -28,7 +28,7 @@ type TaskResult struct {
const kSpecialExtName = "partial"
-var errLiveEnded = NewRecoverableTaskError("live is ended", nil)
+var errLiveEnded = common.NewRecoverableTaskError("live is ended", nil)
// runTaskWithAutoRestart
// start a monitor&download task.
@@ -43,7 +43,7 @@ loop:
switch err := tryRunTask(t); err.(type) {
case nil:
t.logger.Info("Task stopped: %v", t.String())
- case *RecoverableTaskError:
+ case *common.RecoverableTaskError:
if err != errLiveEnded {
t.logger.Error("Temporary error: %v", err)
}
@@ -82,7 +82,7 @@ func tryRunTask(t *RunningTask) error {
},
)
if err != nil {
- return NewRecoverableTaskError("cannot get notification server info", err)
+ return common.NewRecoverableTaskError("cannot get notification server info", err)
}
t.logger.Info("Success.")
@@ -127,13 +127,13 @@ func tryRunTask(t *RunningTask) error {
case nil:
// live is started, stop watcher loop and start the recorder
break loop
- case *RecoverableTaskError:
+ case *common.RecoverableTaskError:
// if the watcher fails and recoverable, just try to recover
// because the recorder has not started yet
run = true
t.logger.Error("Error occurred in live status watcher: %v", err)
break
- case *UnrecoverableTaskError:
+ case *common.UnrecoverableTaskError:
// the watcher cannot recover, so the task should be stopped
run = false
t.logger.Error("Error occurred in live status watcher: %v", err)
@@ -170,12 +170,12 @@ func tryRunTask(t *RunningTask) error {
// live is ended
t.logger.Info("The live is ended. Restarting current task...")
return errLiveEnded
- case *RecoverableTaskError:
+ case *common.RecoverableTaskError:
// here we don't know if the live is ended, so we have to do a check
t.logger.Warning("Recording is interrupted. Checking live status...")
isLiving, err2 := AutoRetryWithTask(t, liveStatusChecker)
if err2 != nil {
- return NewRecoverableTaskError(
+ return common.NewRecoverableTaskError(
"when handling an error, another error occurred",
fmt.Errorf("first: %v, second: %w", err, err2),
)
@@ -204,16 +204,16 @@ func tryRunTask(t *RunningTask) error {
}
return err
}()
- case *UnrecoverableTaskError:
+ case *common.UnrecoverableTaskError:
// watcher is stopped and cannot restart
- return NewUnrecoverableTaskError("failed to watch live status", errWatcher)
+ return common.NewUnrecoverableTaskError("failed to watch live status", errWatcher)
default:
// watcher is cancelled, stop running the task
if errors.Is(errWatcher, context.Canceled) {
return errWatcher
}
// unexpected error, this is a programming error
- return NewUnrecoverableTaskError("unexpected error type", errWatcher)
+ return common.NewUnrecoverableTaskError("unexpected error type", errWatcher)
}
}
@@ -243,7 +243,7 @@ func record(
if errors.Is(err, context.Canceled) {
return err
}
- return NewRecoverableTaskError("failed to get living room information", err)
+ return common.NewRecoverableTaskError("failed to get living room information", err)
}
logger.Info("Getting stream url...")
@@ -259,7 +259,7 @@ func record(
if errors.Is(err, context.Canceled) {
return err
}
- return NewRecoverableTaskError("failed to get live info", err)
+ return common.NewRecoverableTaskError("failed to get live info", err)
}
if len(urlInfo.Data.URLs) == 0 {
j, err2 := json.Marshal(urlInfo)
@@ -267,7 +267,7 @@ func record(
j = []byte("(not available)")
}
logger.Error("No stream was provided. Response: %v", string(j))
- return NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided"))
+ return common.NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided"))
}
streamSource := urlInfo.Data.URLs[0]
@@ -287,13 +287,15 @@ func record(
saveDir := task.Download.SaveDirectory
filePath := path.Join(saveDir, fileName)
- file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
- if err != nil {
- logger.Error("Cannot open file for writing: %v", err)
- return NewUnrecoverableTaskError("cannot open file for writing", err)
- }
+ var file *os.File
+
+ // TODO refactor, move file close logic to CopyLiveStream
// rename the extension name to originalExtName when finish writing
defer func() {
+ if file == nil {
+ // the file is not created
+ return
+ }
if extName == originalExtName {
return
}
@@ -310,13 +312,22 @@ func record(
writeBufferSize := task.Download.DiskWriteBufferBytes
logger.Info("Write buffer size: %v byte", writeBufferSize)
- logger.Info("Recording live stream to file \"%v\"...", filePath)
- err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file, writeBufferSize)
- if errors.Is(err, context.Canceled) || err == nil {
+ err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, func() (f *os.File, e error) {
+ f, e = os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
+ if e != nil {
+ file = f
+ }
+ logger.Info("Recording live stream to file \"%v\"...", filePath)
+ return
+ }, writeBufferSize)
+ if common.IsErrorOfType(err, &common.UnrecoverableTaskError{}) {
+ logger.Error("Cannot record: %v", err)
+ return err
+ } else if errors.Is(err, context.Canceled) || err == nil {
return err
}
logger.Error("Error when copying live stream: %v", err)
- return NewRecoverableTaskError("stream copy was unexpectedly interrupted", err)
+ return common.NewRecoverableTaskError("stream copy was unexpectedly interrupted", err)
}
func getDanmakuServer(
diff --git a/recording/watcher.go b/recording/watcher.go
index 2510b8b..13610a4 100644
--- a/recording/watcher.go
+++ b/recording/watcher.go
@@ -3,6 +3,7 @@ package recording
import (
"context"
"encoding/json"
+ "github.com/keuin/slbr/common"
"github.com/keuin/slbr/danmaku"
"github.com/keuin/slbr/danmaku/dmmsg"
"github.com/keuin/slbr/danmaku/dmpkg"
@@ -50,7 +51,7 @@ func watch(
// connect to danmaku server for live online/offline notifications
err = dm.Connect(ctx, url)
if err != nil {
- return NewRecoverableTaskError("failed to connect to danmaku server", err)
+ return common.NewRecoverableTaskError("failed to connect to danmaku server", err)
}
defer func() {
// this operation may be time-consuming, so run in another goroutine
@@ -63,7 +64,7 @@ func watch(
logger.Info("ws connected. Authenticating...")
err = dm.Authenticate(t.RoomId, authKey)
if err != nil {
- return NewUnrecoverableTaskError("authentication failed, invalid protocol", err)
+ return common.NewUnrecoverableTaskError("authentication failed, invalid protocol", err)
}
// the danmaku server requires heartbeat messages every 30 seconds
@@ -75,7 +76,7 @@ func watch(
// send initial heartbeat immediately
err = heartbeat()
if err != nil {
- return NewRecoverableTaskError("heartbeat failed", err)
+ return common.NewRecoverableTaskError("heartbeat failed", err)
}
// create heartbeat timer
@@ -85,7 +86,7 @@ func watch(
logger.Info("Checking initial live status...")
isLiving, err := AutoRetryWithConfig[bool](ctx, logger, &t, liveStatusChecker)
if err != nil {
- return NewRecoverableTaskError("check initial live status failed", err)
+ return common.NewRecoverableTaskError("check initial live status failed", err)
}
if isLiving {
logger.Info("The live is already started. Start recording immediately.")
@@ -101,18 +102,18 @@ func watch(
case <-heartBeatTimer.C:
err = heartbeat()
if err != nil {
- return NewRecoverableTaskError("heartbeat failed", err)
+ return common.NewRecoverableTaskError("heartbeat failed", err)
}
default:
var msg dmpkg.DanmakuExchange
msg, err = dm.ReadExchange()
if err != nil {
- return NewRecoverableTaskError("failed to read exchange from server", err)
+ return common.NewRecoverableTaskError("failed to read exchange from server", err)
}
// the exchange may be compressed
msg, err = msg.Inflate()
if err != nil {
- return NewUnrecoverableTaskError("failed to decompress server message", err)
+ return common.NewUnrecoverableTaskError("failed to decompress server message", err)
}
switch msg.Operation {
@@ -122,7 +123,7 @@ func watch(
err := json.Unmarshal(msg.Body, &info)
if err != nil {
logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg)
- return NewUnrecoverableTaskError("invalid JSON response from server", err)
+ return common.NewUnrecoverableTaskError("invalid JSON response from server", err)
}
switch info.Command {
case CommandLiveStart: