1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
package recording
/*
This file implements task lifecycle management.
The concrete work of a task is done in the `runner.go` file.
*/
import (
"context"
"fmt"
"github.com/keuin/slbr/common/retry"
"github.com/keuin/slbr/logging"
"sync/atomic"
"time"
)
// TaskStatus is the lifecycle state of a RunningTask.
type TaskStatus int

// Lifecycle states, numbered from zero in declaration order.
const (
	// StNotStarted is the zero value and the status a task is created with.
	// Its display name is "ready".
	StNotStarted TaskStatus = iota
	// StWaiting is displayed as "waiting"; StartTask treats it as already started.
	StWaiting
	// StRecording is displayed as "recording"; StartTask treats it as already started.
	StRecording
	// StRestarting is displayed as "restarting"; StartTask treats it as already started.
	StRestarting
	// StStopped is displayed as "stopped"; a stopped task cannot be started again.
	StStopped
)

// taskStatusStrings maps every declared TaskStatus to its display name.
var taskStatusStrings = map[TaskStatus]string{
	StNotStarted: "ready",
	StWaiting:    "waiting",
	StRecording:  "recording",
	StRestarting: "restarting",
	StStopped:    "stopped",
}

// String returns the display name of s, implementing fmt.Stringer.
//
// A value that is not one of the declared states is rendered as
// "TaskStatus(n)" instead of an empty string, so that diagnostics which
// format an invalid status (for example with %v) still show the raw value.
func (s TaskStatus) String() string {
	if name, ok := taskStatusStrings[s]; ok {
		return name
	}
	return fmt.Sprintf("TaskStatus(%d)", int(s))
}
// Sentinel errors returned by RunningTask.StartTask.
var (
	// ErrTaskIsAlreadyStarted is returned when the task is already
	// waiting, recording or restarting.
	ErrTaskIsAlreadyStarted = fmt.Errorf("task is already started")

	// ErrTaskIsStopped is returned when the task has already stopped;
	// a stopped task cannot be started again.
	ErrTaskIsStopped = fmt.Errorf("restarting a stopped task is not allowed")
)
// RunningTask couples a TaskConfig with the volatile state that only
// exists while the task is alive: its context, status, lifecycle hooks,
// logger and the most recently stored room title.
type RunningTask struct {
	TaskConfig
	// ctx is the outermost context used by this task; narrower child
	// contexts may be derived from it.
	ctx context.Context
	// status is the current lifecycle state.
	// NOTE(review): this is a plain field that is read by GetStatus and
	// StartTask and written from the goroutine StartTask spawns; no
	// synchronization is visible in this file — confirm whether that is intended.
	status TaskStatus
	// hookStarted is invoked from the task goroutine before the task body
	// runs. It is not invoked again when the task restarts.
	hookStarted func()
	// hookStopped is invoked from the task goroutine after the task body
	// returns. It is not invoked when the task merely restarts.
	hookStopped func()
	// logger receives the log output of this task.
	logger logging.Logger
	// roomTitle holds a pointer to the room title; it is nil until a
	// value has been stored (no store happens in this file).
	roomTitle atomic.Pointer[string]
}
// GetStatus reports the lifecycle state the task is currently in.
// The field is read directly, without any locking.
func (t *RunningTask) GetStatus() TaskStatus {
	return t.status
}
// GetRoomTitle atomically loads the stored room title pointer.
// The result is nil when no title has been stored yet.
func (t *RunningTask) GetRoomTitle() *string {
	return t.roomTitle.Load()
}
// NewRunningTask builds a task in the StNotStarted state from config.
//
// ctx becomes the task's outermost context, hookStarted and hookStopped
// are stored as the lifecycle callbacks, and logger is where the task
// writes its logs. Nothing is started here; call StartTask to run it.
func NewRunningTask(
	config TaskConfig,
	ctx context.Context,
	hookStarted func(),
	hookStopped func(),
	logger logging.Logger,
) *RunningTask {
	task := new(RunningTask)
	task.TaskConfig = config
	task.ctx = ctx
	task.status = StNotStarted
	task.hookStarted = hookStarted
	task.hookStopped = hookStopped
	task.logger = logger
	return task
}
// StartTask launches the task in a new goroutine if it has never been
// started, and returns without waiting for it.
//
// It returns nil when the goroutine was spawned, ErrTaskIsAlreadyStarted
// when the status is StWaiting, StRecording or StRestarting, and
// ErrTaskIsStopped when the status is StStopped. Any other status value
// is a programming error and causes a panic.
//
// Inside the goroutine hookStarted runs first, then the task body
// (runTaskWithAutoRestart), then hookStopped, and finally the status is
// set to StStopped. A nil hook is skipped.
//
// NOTE(review): the status is read here and written by the spawned
// goroutine without synchronization, and this method itself does not move
// the status away from StNotStarted — confirm that callers never invoke
// StartTask concurrently or twice in quick succession.
func (t *RunningTask) StartTask() error {
	switch st := t.status; st {
	case StNotStarted:
		go func() {
			// Registered first so it runs last: the status only becomes
			// StStopped after hookStopped has returned.
			defer func() { t.status = StStopped }()
			// Calling a nil func panics, and an unrecovered panic in this
			// goroutine would terminate the whole process, so treat a nil
			// hook as "no callback".
			if t.hookStarted != nil {
				t.hookStarted()
			}
			if t.hookStopped != nil {
				defer t.hookStopped()
			}
			// do the task
			t.runTaskWithAutoRestart()
		}()
		return nil
	case StWaiting, StRecording, StRestarting:
		return ErrTaskIsAlreadyStarted
	case StStopped:
		// Starting a stopped task is not allowed because some state
		// would need to be reset; create a new task and run that instead.
		return ErrTaskIsStopped
	default:
		panic(fmt.Errorf("invalid task status: %v", st))
	}
}
// AutoRetryWithTask runs supplier through retry.AutoRetry, taking the
// context, the logger and the transport retry settings from task t.
//
// t.Transport.MaxRetryTimes is passed as the retry count and
// t.Transport.RetryIntervalSeconds, interpreted as whole seconds, as the
// interval. The result of retry.AutoRetry is returned unchanged.
func AutoRetryWithTask[T any](
	t *RunningTask,
	supplier func() (T, error),
) (T, error) {
	maxRetries := t.Transport.MaxRetryTimes
	interval := time.Duration(t.Transport.RetryIntervalSeconds) * time.Second
	return retry.AutoRetry[T](t.ctx, supplier, maxRetries, interval, &t.logger)
}
// AutoRetryWithConfig runs supplier through retry.AutoRetry for callers
// that only hold a TaskConfig rather than a RunningTask.
//
// ctx and logger are supplied by the caller; the retry count and the
// interval (RetryIntervalSeconds, interpreted as whole seconds) come from
// t.Transport. retry.AutoRetry receives a pointer to this function's own
// copy of logger. Its result is returned unchanged.
func AutoRetryWithConfig[T any](
	ctx context.Context,
	logger logging.Logger,
	t *TaskConfig,
	supplier func() (T, error),
) (T, error) {
	maxRetries := t.Transport.MaxRetryTimes
	interval := time.Duration(t.Transport.RetryIntervalSeconds) * time.Second
	return retry.AutoRetry[T](ctx, supplier, maxRetries, interval, &logger)
}
|