1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
package recording
/*
In this file we implement task lifecycle management.
Concrete task works are done in the `runner.go` file.
*/
import (
"context"
"fmt"
"github.com/keuin/slbr/common/retry"
"github.com/keuin/slbr/logging"
"time"
)
type TaskStatus int
const (
StNotStarted TaskStatus = iota
StRunning
StRestarting
StStopped
)
var taskStatusStrings = map[TaskStatus]string{
StNotStarted: "ready",
StRunning: "running",
StRestarting: "restarting",
StStopped: "stopped",
}
func (s TaskStatus) String() string {
return taskStatusStrings[s]
}
var (
ErrTaskIsAlreadyStarted = fmt.Errorf("task is already started")
ErrTaskIsStopped = fmt.Errorf("restarting a stopped task is not allowed")
)
// RunningTask is an augmented TaskConfig struct
// that contains volatile runtime information.
type RunningTask struct {
TaskConfig
// ctx: the biggest context this task uses. It may create children contexts.
ctx context.Context
// status: running status
status TaskStatus
// hookStarted: called asynchronously when the task is started. This won't be called when restarting.
hookStarted func()
// hookStopped: called asynchronously when the task is stopped. This won't be called when restarting.
hookStopped func()
// logger: where to print logs
logger logging.Logger
}
func NewRunningTask(
config TaskConfig,
ctx context.Context,
hookStarted func(),
hookStopped func(),
logger logging.Logger,
) RunningTask {
return RunningTask{
TaskConfig: config,
ctx: ctx,
status: StNotStarted,
hookStarted: hookStarted,
hookStopped: hookStopped,
logger: logger,
}
}
func (t *RunningTask) StartTask() error {
st := t.status
switch st {
case StNotStarted:
// TODO real start
go func() {
defer func() { t.status = StStopped }()
t.hookStarted()
defer t.hookStopped()
// do the task
t.runTaskWithAutoRestart()
}()
return nil
case StRunning:
return ErrTaskIsAlreadyStarted
case StRestarting:
return ErrTaskIsAlreadyStarted
case StStopped:
// we don't allow starting a stopped task
// because some state needs to be reset
// just create a new task and run
return ErrTaskIsStopped
}
panic(fmt.Errorf("invalid task status: %v", st))
}
func AutoRetryWithTask[T any](
t *RunningTask,
supplier func() (T, error),
) (T, error) {
return retry.AutoRetry[T](
t.ctx,
supplier,
t.Transport.MaxRetryTimes,
time.Duration(t.Transport.RetryIntervalSeconds)*time.Second,
&t.logger,
)
}
func AutoRetryWithConfig[T any](
ctx context.Context,
logger logging.Logger,
t *TaskConfig,
supplier func() (T, error),
) (T, error) {
return retry.AutoRetry[T](
ctx,
supplier,
t.Transport.MaxRetryTimes,
time.Duration(t.Transport.RetryIntervalSeconds)*time.Second,
&logger,
)
}
|