1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
package yggdrasil
import (
"fmt"
"github.com/avast/retry-go"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
"github.com/samber/mo"
"github.com/sourcegraph/conc"
"strings"
"time"
)
const (
retryInterval = 100 * time.Millisecond
maxRetryTimes = 3
)
type muxServer struct {
subServers []Server
}
func (m muxServer) Name() string {
return "muxServer[" + strings.Join(lo.Map(m.subServers, func(item Server, _ int) string {
return item.Name()
}), ", ") + "]"
}
func (m muxServer) HasJoined(username string, serverID string) (*HasJoinedResponse, error) {
var wg conc.WaitGroup
type Ret = *HasJoinedResponse
results := make(chan mo.Result[Ret], len(m.subServers))
for _, s := range m.subServers {
s := s
wg.Go(func() {
var resp *HasJoinedResponse
err := retry.Do(
func() error {
var err error
resp, err = s.HasJoined(username, serverID)
return err
},
retry.Delay(retryInterval),
retry.Attempts(maxRetryTimes),
)
if err != nil {
results <- mo.Err[Ret](fmt.Errorf("call hasJoined on server `%v` failed: %w",
s.Name(), err))
} else {
results <- mo.Ok[Ret](resp)
}
})
}
chPanic := make(chan error, 1)
go func() {
// wait for all async tasks to finish in another async goroutine
// to allow the main request return as early as possible
r := wg.WaitAndRecover()
close(results)
if r != nil {
chPanic <- r.AsError()
}
close(chPanic)
}()
var last *Ret
for r := range results {
r, err := r.Get()
if err != nil {
log.Error().Err(err).Msg("hasJoined failed")
continue
}
last = &r
// return the first positive result
if r.HasJoined() {
return r, nil
}
}
if last == nil {
// no data generated, all async tasks panicked
err := <-chPanic
log.Error().Err(err).Msg("all hasJoined async query panicked")
return nil, err
} else {
return *last, nil
}
}
func NewMuxServer(servers ...Server) Server {
return muxServer{
subServers: servers,
}
}
|