From 0725ebf24da4d3f06097372e31c05a49543edc79 Mon Sep 17 00:00:00 2001 From: Keuin Date: Fri, 16 Sep 2022 02:30:24 +0800 Subject: Bugfix: empty files will be created if the live room is opened but the live hasn't started yet --- bilibili/streaming.go | 30 ++++++++++++++++++++++++++-- common/errors.go | 31 +++++++++++++++++++++++++++++ recording/errors.go | 33 ------------------------------- recording/runner.go | 55 ++++++++++++++++++++++++++++++--------------------- recording/watcher.go | 17 ++++++++-------- 5 files changed, 101 insertions(+), 65 deletions(-) delete mode 100644 recording/errors.go diff --git a/bilibili/streaming.go b/bilibili/streaming.go index 21bca01..005ffd6 100644 --- a/bilibili/streaming.go +++ b/bilibili/streaming.go @@ -11,12 +11,14 @@ import ( "strings" ) +const kInitReadBytes = 4096 // 4KiB + // CopyLiveStream read data from a livestream video stream, copy them to a writer. func (b Bilibili) CopyLiveStream( ctx context.Context, roomId common.RoomId, stream StreamingUrlInfo, - out *os.File, + fileCreator func() (*os.File, error), bufSize int64, ) (err error) { url := stream.URL @@ -52,7 +54,31 @@ func (b Bilibili) CopyLiveStream( defer func() { _ = resp.Body.Close() }() - b.logger.Info("Copying live stream...") + b.logger.Info("Waiting for stream initial bytes...") + // read some first bytes to ensure that the live is really started, + // so we don't create blank files if the live room is open + // but the live hasn't started yet + initBytes := make([]byte, kInitReadBytes) + _, err = io.ReadFull(resp.Body, initBytes) + if err != nil { + b.logger.Error("Failed to read stream initial bytes: %v", err) + return + } + b.logger.Info("Stream is started. Receiving live stream...") + // write initial bytes + var out *os.File + out, err = fileCreator() + if err != nil { + b.logger.Error("Cannot open file for writing: %v", err) + err = common.NewUnrecoverableTaskError("failed to create file", err) + return + } + _, err = out.Write(initBytes) + if err != nil { + b.logger.Error("Failed to write to file: %v", err) + return + } + initBytes = nil // discard that buffer var n int64 diff --git a/common/errors.go b/common/errors.go index ba686f1..fa2d492 100644 --- a/common/errors.go +++ b/common/errors.go @@ -2,6 +2,7 @@ package common import ( "errors" + "fmt" "reflect" ) @@ -27,3 +28,33 @@ func IsErrorOfType(err, target error) bool { } } } + +/* +Task errors. +*/ + +type RecoverableTaskError struct { + err error + message string +} + +func (e *RecoverableTaskError) Error() string { + return fmt.Sprintf("%v: %v", e.message, e.err) +} + +func NewRecoverableTaskError(message string, err error) error { + return &RecoverableTaskError{message: message, err: err} +} + +type UnrecoverableTaskError struct { + err error + message string +} + +func (e *UnrecoverableTaskError) Error() string { + return fmt.Sprintf("%v: %v", e.message, e.err) +} + +func NewUnrecoverableTaskError(message string, err error) error { + return &UnrecoverableTaskError{message: message, err: err} +} diff --git a/recording/errors.go b/recording/errors.go deleted file mode 100644 index 48a492e..0000000 --- a/recording/errors.go +++ /dev/null @@ -1,33 +0,0 @@ -package recording - -import "fmt" - -/* -Task errors. -*/ - -type RecoverableTaskError struct { - err error - message string -} - -func (e *RecoverableTaskError) Error() string { - return fmt.Sprintf("%v: %v", e.message, e.err) -} - -func NewRecoverableTaskError(message string, err error) error { - return &RecoverableTaskError{message: message, err: err} -} - -type UnrecoverableTaskError struct { - err error - message string -} - -func (e *UnrecoverableTaskError) Error() string { - return fmt.Sprintf("%v: %v", e.message, e.err) -} - -func NewUnrecoverableTaskError(message string, err error) error { - return &UnrecoverableTaskError{message: message, err: err} -} diff --git a/recording/runner.go b/recording/runner.go index 47c6764..8c1e047 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -28,7 +28,7 @@ type TaskResult struct { const kSpecialExtName = "partial" -var errLiveEnded = NewRecoverableTaskError("live is ended", nil) +var errLiveEnded = common.NewRecoverableTaskError("live is ended", nil) // runTaskWithAutoRestart // start a monitor&download task. @@ -43,7 +43,7 @@ loop: switch err := tryRunTask(t); err.(type) { case nil: t.logger.Info("Task stopped: %v", t.String()) - case *RecoverableTaskError: + case *common.RecoverableTaskError: if err != errLiveEnded { t.logger.Error("Temporary error: %v", err) } @@ -82,7 +82,7 @@ func tryRunTask(t *RunningTask) error { }, ) if err != nil { - return NewRecoverableTaskError("cannot get notification server info", err) + return common.NewRecoverableTaskError("cannot get notification server info", err) } t.logger.Info("Success.") @@ -127,13 +127,13 @@ func tryRunTask(t *RunningTask) error { case nil: // live is started, stop watcher loop and start the recorder break loop - case *RecoverableTaskError: + case *common.RecoverableTaskError: // if the watcher fails and recoverable, just try to recover // because the recorder has not started yet run = true t.logger.Error("Error occurred in live status watcher: %v", err) break - case *UnrecoverableTaskError: + case *common.UnrecoverableTaskError: // the watcher cannot recover, so the task should be stopped run = false t.logger.Error("Error occurred in live status watcher: %v", err) @@ -170,12 +170,12 @@ func tryRunTask(t *RunningTask) error { // live is ended t.logger.Info("The live is ended. Restarting current task...") return errLiveEnded - case *RecoverableTaskError: + case *common.RecoverableTaskError: // here we don't know if the live is ended, so we have to do a check t.logger.Warning("Recording is interrupted. Checking live status...") isLiving, err2 := AutoRetryWithTask(t, liveStatusChecker) if err2 != nil { - return NewRecoverableTaskError( + return common.NewRecoverableTaskError( "when handling an error, another error occurred", fmt.Errorf("first: %v, second: %w", err, err2), ) @@ -204,16 +204,16 @@ func tryRunTask(t *RunningTask) error { } return err }() - case *UnrecoverableTaskError: + case *common.UnrecoverableTaskError: // watcher is stopped and cannot restart - return NewUnrecoverableTaskError("failed to watch live status", errWatcher) + return common.NewUnrecoverableTaskError("failed to watch live status", errWatcher) default: // watcher is cancelled, stop running the task if errors.Is(errWatcher, context.Canceled) { return errWatcher } // unexpected error, this is a programming error - return NewUnrecoverableTaskError("unexpected error type", errWatcher) + return common.NewUnrecoverableTaskError("unexpected error type", errWatcher) } } @@ -243,7 +243,7 @@ func record( if errors.Is(err, context.Canceled) { return err } - return NewRecoverableTaskError("failed to get living room information", err) + return common.NewRecoverableTaskError("failed to get living room information", err) } logger.Info("Getting stream url...") @@ -259,7 +259,7 @@ func record( if errors.Is(err, context.Canceled) { return err } - return NewRecoverableTaskError("failed to get live info", err) + return common.NewRecoverableTaskError("failed to get live info", err) } if len(urlInfo.Data.URLs) == 0 { j, err2 := json.Marshal(urlInfo) @@ -267,7 +267,7 @@ func record( j = []byte("(not available)") } logger.Error("No stream was provided. Response: %v", string(j)) - return NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided")) + return common.NewUnrecoverableTaskError("invalid live info", fmt.Errorf("no stream provided")) } streamSource := urlInfo.Data.URLs[0] @@ -287,13 +287,15 @@ func record( saveDir := task.Download.SaveDirectory filePath := path.Join(saveDir, fileName) - file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) - if err != nil { - logger.Error("Cannot open file for writing: %v", err) - return NewUnrecoverableTaskError("cannot open file for writing", err) - } + var file *os.File + + // TODO refactor, move file close logic to CopyLiveStream // rename the extension name to originalExtName when finish writing defer func() { + if file == nil { + // the file is not created + return + } if extName == originalExtName { return } @@ -310,13 +312,22 @@ func record( writeBufferSize := task.Download.DiskWriteBufferBytes logger.Info("Write buffer size: %v byte", writeBufferSize) - logger.Info("Recording live stream to file \"%v\"...", filePath) - err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, file, writeBufferSize) - if errors.Is(err, context.Canceled) || err == nil { + err = bi.CopyLiveStream(ctx, task.RoomId, streamSource, func() (f *os.File, e error) { + f, e = os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if e != nil { + file = f + } + logger.Info("Recording live stream to file \"%v\"...", filePath) + return + }, writeBufferSize) + if common.IsErrorOfType(err, &common.UnrecoverableTaskError{}) { + logger.Error("Cannot record: %v", err) + return err + } else if errors.Is(err, context.Canceled) || err == nil { return err } logger.Error("Error when copying live stream: %v", err) - return NewRecoverableTaskError("stream copy was unexpectedly interrupted", err) + return common.NewRecoverableTaskError("stream copy was unexpectedly interrupted", err) } func getDanmakuServer( diff --git a/recording/watcher.go b/recording/watcher.go index 2510b8b..13610a4 100644 --- a/recording/watcher.go +++ b/recording/watcher.go @@ -3,6 +3,7 @@ package recording import ( "context" "encoding/json" + "github.com/keuin/slbr/common" "github.com/keuin/slbr/danmaku" "github.com/keuin/slbr/danmaku/dmmsg" "github.com/keuin/slbr/danmaku/dmpkg" @@ -50,7 +51,7 @@ func watch( // connect to danmaku server for live online/offline notifications err = dm.Connect(ctx, url) if err != nil { - return NewRecoverableTaskError("failed to connect to danmaku server", err) + return common.NewRecoverableTaskError("failed to connect to danmaku server", err) } defer func() { // this operation may be time-consuming, so run in another goroutine @@ -63,7 +64,7 @@ func watch( logger.Info("ws connected. Authenticating...") err = dm.Authenticate(t.RoomId, authKey) if err != nil { - return NewUnrecoverableTaskError("authentication failed, invalid protocol", err) + return common.NewUnrecoverableTaskError("authentication failed, invalid protocol", err) } // the danmaku server requires heartbeat messages every 30 seconds @@ -75,7 +76,7 @@ func watch( // send initial heartbeat immediately err = heartbeat() if err != nil { - return NewRecoverableTaskError("heartbeat failed", err) + return common.NewRecoverableTaskError("heartbeat failed", err) } // create heartbeat timer @@ -85,7 +86,7 @@ func watch( logger.Info("Checking initial live status...") isLiving, err := AutoRetryWithConfig[bool](ctx, logger, &t, liveStatusChecker) if err != nil { - return NewRecoverableTaskError("check initial live status failed", err) + return common.NewRecoverableTaskError("check initial live status failed", err) } if isLiving { logger.Info("The live is already started. Start recording immediately.") @@ -101,18 +102,18 @@ func watch( case <-heartBeatTimer.C: err = heartbeat() if err != nil { - return NewRecoverableTaskError("heartbeat failed", err) + return common.NewRecoverableTaskError("heartbeat failed", err) } default: var msg dmpkg.DanmakuExchange msg, err = dm.ReadExchange() if err != nil { - return NewRecoverableTaskError("failed to read exchange from server", err) + return common.NewRecoverableTaskError("failed to read exchange from server", err) } // the exchange may be compressed msg, err = msg.Inflate() if err != nil { - return NewUnrecoverableTaskError("failed to decompress server message", err) + return common.NewUnrecoverableTaskError("failed to decompress server message", err) } switch msg.Operation { @@ -122,7 +123,7 @@ func watch( err := json.Unmarshal(msg.Body, &info) if err != nil { logger.Error("Invalid JSON: \"%v\", exchange: %v", string(msg.Body), msg) - return NewUnrecoverableTaskError("invalid JSON response from server", err) + return common.NewUnrecoverableTaskError("invalid JSON response from server", err) } switch info.Command { case CommandLiveStart: -- cgit v1.2.3