summaryrefslogtreecommitdiff
path: root/recording/task.go
blob: 328fe667651092a6ec32a410f6d0e47b83aaae17 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package recording

/*
In this file we implement task lifecycle management.
Concrete task works are done in the `runner.go` file.
*/

import (
	"context"
	"fmt"
	"github.com/keuin/slbr/common/retry"
	"github.com/keuin/slbr/logging"
	"sync/atomic"
	"time"
)

type TaskStatus uint32

const (
	StNotStarted TaskStatus = iota
	StWaiting
	StRecording
	StRestarting
	StStopped
)

var taskStatusStrings = map[TaskStatus]string{
	StNotStarted: "ready",
	StWaiting:    "waiting",
	StRecording:  "recording",
	StRestarting: "restarting",
	StStopped:    "stopped",
}

func (s TaskStatus) String() string {
	return taskStatusStrings[s]
}

var (
	ErrTaskIsAlreadyStarted = fmt.Errorf("task is already started")
	ErrTaskIsStopped        = fmt.Errorf("restarting a stopped task is not allowed")
)

// RunningTask is an augmented TaskConfig struct
// that contains volatile runtime information.
type RunningTask struct {
	TaskConfig
	// ctx: the biggest context this task uses. It may create children contexts.
	ctx context.Context
	// status: running status
	status atomic.Uint32
	// hookStarted: called asynchronously when the task is started. This won't be called when restarting.
	hookStarted func()
	// hookStopped: called asynchronously when the task is stopped. This won't be called when restarting.
	hookStopped func()
	// logger: where to print logs
	logger    logging.Logger
	roomTitle atomic.Pointer[string]
}

func (t *RunningTask) setStatus(status TaskStatus) {
	t.status.Store(uint32(status))
}

func (t *RunningTask) GetStatus() TaskStatus {
	return TaskStatus(t.status.Load())
}

func (t *RunningTask) GetRoomTitle() *string {
	return t.roomTitle.Load()
}

func NewRunningTask(
	config TaskConfig,
	ctx context.Context,
	hookStarted func(),
	hookStopped func(),
	logger logging.Logger,
) *RunningTask {
	t := &RunningTask{
		TaskConfig:  config,
		ctx:         ctx,
		hookStarted: hookStarted,
		hookStopped: hookStopped,
		logger:      logger,
	}
	t.setStatus(StNotStarted)
	return t
}

func (t *RunningTask) StartTask() error {
	st := t.GetStatus()
	switch st {
	case StNotStarted:
		// TODO real start
		go func() {
			defer func() { t.setStatus(StStopped) }()
			t.hookStarted()
			defer t.hookStopped()
			// do the task
			t.runTaskWithAutoRestart()
		}()
		return nil
	case StWaiting, StRecording:
		return ErrTaskIsAlreadyStarted
	case StRestarting:
		return ErrTaskIsAlreadyStarted
	case StStopped:
		// we don't allow starting a stopped task
		// because some state needs to be reset
		// just create a new task and run
		return ErrTaskIsStopped
	}
	panic(fmt.Errorf("invalid task status: %v", st))
}

func AutoRetryWithTask[T any](
	t *RunningTask,
	supplier func() (T, error),
) (T, error) {
	return retry.AutoRetry[T](
		t.ctx,
		supplier,
		t.Transport.MaxRetryTimes,
		time.Duration(t.Transport.RetryIntervalSeconds)*time.Second,
		&t.logger,
	)
}

func AutoRetryWithConfig[T any](
	ctx context.Context,
	logger logging.Logger,
	t *TaskConfig,
	supplier func() (T, error),
) (T, error) {
	return retry.AutoRetry[T](
		ctx,
		supplier,
		t.Transport.MaxRetryTimes,
		time.Duration(t.Transport.RetryIntervalSeconds)*time.Second,
		&logger,
	)
}