From 110301a975e43739192577166d089e28c22ae266 Mon Sep 17 00:00:00 2001 From: Keuin Date: Sat, 29 Jul 2023 21:20:12 +0800 Subject: Add API server --- agent.go | 23 +++++++++++++++++++++++ api/agent/agent.go | 19 +++++++++++++++++++ api/server.go | 8 ++++++-- main.go | 18 +++++++++++++++++- recording/runner.go | 7 ++++++- recording/task.go | 16 +++++++++++++--- 6 files changed, 84 insertions(+), 7 deletions(-) create mode 100644 agent.go create mode 100644 api/agent/agent.go diff --git a/agent.go b/agent.go new file mode 100644 index 0000000..9f021f5 --- /dev/null +++ b/agent.go @@ -0,0 +1,23 @@ +package main + +import ( + "github.com/keuin/slbr/api/agent" + "github.com/keuin/slbr/recording" + "github.com/samber/lo" +) + +type agentImpl struct { + tasks *[]*recording.RunningTask +} + +func (a *agentImpl) GetTasks() []agent.TaskInfo { + return lo.Map(*a.tasks, func(t *recording.RunningTask, _ int) agent.TaskInfo { + return agent.TaskInfo{ + LiveRoom: agent.LiveRoomInfo{ + ID: t.RoomId, + Title: t.GetRoomTitle(), + }, + Status: t.GetStatus().String(), + } + }) +} diff --git a/api/agent/agent.go b/api/agent/agent.go new file mode 100644 index 0000000..52c5500 --- /dev/null +++ b/api/agent/agent.go @@ -0,0 +1,19 @@ +package agent + +import "github.com/keuin/slbr/types" + +type Agent interface { + GetTasks() []TaskInfo +} + +type TaskStatus string + +type LiveRoomInfo struct { + ID types.RoomId `json:"id"` + Title *string `json:"title"` +} + +type TaskInfo struct { + LiveRoom LiveRoomInfo `json:"live_room"` + Status string `json:"status"` +} diff --git a/api/server.go b/api/server.go index 4cfe8f4..32e3202 100644 --- a/api/server.go +++ b/api/server.go @@ -2,15 +2,19 @@ package api import ( "github.com/gofiber/fiber/v2" - "github.com/keuin/slbr/logging" + "github.com/keuin/slbr/api/agent" ) -func StartServer(logger logging.Logger, addr string) error { +func StartServer(addr string, a agent.Agent) error { app := fiber.New() app.Get("/", func(c *fiber.Ctx) error { return c.SendString("Hello, World!") }) + app.Get("/tasks", func(c *fiber.Ctx) error { + return c.JSON(a.GetTasks()) + }) + return app.Listen(addr) } diff --git a/main.go b/main.go index 90e0d6a..e50dbba 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "github.com/akamensky/argparse" + "github.com/keuin/slbr/api" "github.com/keuin/slbr/logging" "github.com/keuin/slbr/recording" "github.com/keuin/slbr/types" @@ -153,7 +154,7 @@ func getTasks() (tasks []recording.TaskConfig) { func main() { logger := log.Default() taskConfigs := getTasks() - tasks := make([]recording.RunningTask, len(taskConfigs)) + tasks := make([]*recording.RunningTask, len(taskConfigs)) wg := sync.WaitGroup{} ctxTasks, cancelTasks := context.WithCancel(context.Background()) @@ -170,6 +171,21 @@ func main() { } fmt.Println("") + apiAddr := os.Getenv("BIND_ADDR") + if apiAddr == "" { + apiAddr = ":8080" + } + apiAgent := &agentImpl{ + tasks: &tasks, + } + go func() { + logger.Println("Starting API server...") + err := api.StartServer(apiAddr, apiAgent) + if err != nil { + logger.Fatalf("Failed to start API server: %v", err) + } + }() + logger.Printf("Starting tasks...") for i := range tasks { diff --git a/recording/runner.go b/recording/runner.go index 9be78c9..7bf3790 100644 --- a/recording/runner.go +++ b/recording/runner.go @@ -175,7 +175,9 @@ func tryRunTask(t *RunningTask) error { var err error run := true for run { - err = record(t.ctx, bi, &t.TaskConfig, t.logger) + err = record(t.ctx, bi, &t.TaskConfig, t.logger, func(resp types.RoomProfileResponse) { + t.roomTitle.Store(&resp.Data.Title) + }) if err == nil { // live is ended t.logger.Info("The live is ended. Restarting current task...") @@ -247,6 +249,7 @@ func record( bi *bilibili.Bilibili, task *TaskConfig, logger logging.Logger, + profileConsumer func(types.RoomProfileResponse), ) error { logger.Info("Getting room profile...") @@ -265,6 +268,8 @@ func record( return errs.NewError(errs.GetRoomInfo, err) } + profileConsumer(profile) + logger.Info("Getting stream url...") urlInfo, err := AutoRetryWithConfig( ctx, diff --git a/recording/task.go b/recording/task.go index 26f3638..43b6678 100644 --- a/recording/task.go +++ b/recording/task.go @@ -10,6 +10,7 @@ import ( "fmt" "github.com/keuin/slbr/common/retry" "github.com/keuin/slbr/logging" + "sync/atomic" "time" ) @@ -51,7 +52,16 @@ type RunningTask struct { // hookStopped: called asynchronously when the task is stopped. This won't be called when restarting. hookStopped func() // logger: where to print logs - logger logging.Logger + logger logging.Logger + roomTitle atomic.Pointer[string] +} + +func (t *RunningTask) GetStatus() TaskStatus { + return t.status +} + +func (t *RunningTask) GetRoomTitle() *string { + return t.roomTitle.Load() } func NewRunningTask( @@ -60,8 +70,8 @@ func NewRunningTask( hookStarted func(), hookStopped func(), logger logging.Logger, -) RunningTask { - return RunningTask{ +) *RunningTask { + return &RunningTask{ TaskConfig: config, ctx: ctx, status: StNotStarted, -- cgit v1.2.3