summaryrefslogtreecommitdiff
path: root/yggdrasil/muxserver.go
blob: 7db40215eceb770905eb1067ecbc77d0e296fd79 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package yggdrasil

import (
	"fmt"
	"github.com/avast/retry-go"
	"github.com/rs/zerolog/log"
	"github.com/samber/lo"
	"github.com/samber/mo"
	"github.com/sourcegraph/conc"
	"strings"
	"time"
)

const (
	// retryInterval is the delay passed to retry.Delay for each sub-server call.
	retryInterval = 100 * time.Millisecond
	// maxRetryTimes is the attempt count passed to retry.Attempts for each
	// sub-server call.
	maxRetryTimes = 3
)

// muxServer is a Server that delegates to a list of sub-servers.
type muxServer struct {
	// subServers are the servers queried by HasJoined and listed by Name.
	subServers []Server
}

// Name returns the sub-server names joined by ", " and wrapped as
// "muxServer[...]".
func (m muxServer) Name() string {
	names := lo.Map(m.subServers, func(sub Server, _ int) string {
		return sub.Name()
	})
	joined := strings.Join(names, ", ")
	return "muxServer[" + joined + "]"
}

// HasJoined queries every sub-server concurrently and returns the first
// response whose HasJoined() reports true.
//
// Each sub-server call is wrapped in retry.Do with retryInterval as the delay
// and maxRetryTimes as the attempt count. If no response is positive, the last
// response received is returned. If no sub-server produced a response at all,
// a non-nil error is returned: the recovered panic when a query panicked,
// otherwise the last query error, or a dedicated error when there are no
// sub-servers to ask. The function never returns (nil, nil).
func (m muxServer) HasJoined(username string, serverID string) (*HasJoinedResponse, error) {
	var wg conc.WaitGroup
	type Ret = *HasJoinedResponse
	// Buffered to the number of workers so that no worker blocks on send,
	// even after this function has already returned an early positive result.
	results := make(chan mo.Result[Ret], len(m.subServers))
	for _, s := range m.subServers {
		s := s
		wg.Go(func() {
			var resp *HasJoinedResponse
			err := retry.Do(
				func() error {
					var err error
					resp, err = s.HasJoined(username, serverID)
					return err
				},
				retry.Delay(retryInterval),
				retry.Attempts(maxRetryTimes),
			)
			if err != nil {
				results <- mo.Err[Ret](fmt.Errorf("call hasJoined on server `%v` failed: %w",
					s.Name(), err))
			} else {
				results <- mo.Ok[Ret](resp)
			}
		})
	}

	chPanic := make(chan error, 1)
	go func() {
		// wait for all async tasks to finish in another async goroutine
		// to allow the main request return as early as possible
		r := wg.WaitAndRecover()
		close(results)
		if r != nil {
			chPanic <- r.AsError()
		}
		close(chPanic)
	}()

	var (
		last    Ret   // most recent successful response
		gotResp bool  // whether any sub-server answered without error
		lastErr error // most recent query error, reported if nothing succeeded
	)
	for r := range results {
		resp, err := r.Get()
		if err != nil {
			log.Error().Err(err).Msg("hasJoined failed")
			lastErr = err
			continue
		}
		last, gotResp = resp, true
		// return the first positive result
		if resp.HasJoined() {
			return resp, nil
		}
	}
	if gotResp {
		return last, nil
	}

	// No response at all. chPanic yields the recovered panic, or nil once it
	// is closed when no task panicked.
	if err := <-chPanic; err != nil {
		log.Error().Err(err).Msg("all hasJoined async query panicked")
		return nil, err
	}
	if lastErr != nil {
		// Every query failed with an ordinary error; surface it instead of
		// returning a nil response together with a nil error.
		return nil, fmt.Errorf("all hasJoined queries failed, last error: %w", lastErr)
	}
	// Nothing was queried because there are no sub-servers.
	return nil, fmt.Errorf("no sub servers to query hasJoined on")
}

// NewMuxServer returns a Server backed by a muxServer over the given servers.
func NewMuxServer(servers ...Server) Server {
	mux := muxServer{subServers: servers}
	return mux
}