summaryrefslogtreecommitdiff
path: root/yggdrasil/muxserver.go
diff options
context:
space:
mode:
authorKeuin <[email protected]>2024-02-12 18:16:16 +0800
committerKeuin <[email protected]>2024-02-12 18:18:45 +0800
commita89cbe5a93aede3703cd5981ea71827b55db0866 (patch)
tree165089cb6c94e00c5f00d57da1a83e84c46722cb /yggdrasil/muxserver.go
initial version
Diffstat (limited to 'yggdrasil/muxserver.go')
-rw-r--r--yggdrasil/muxserver.go94
1 files changed, 94 insertions, 0 deletions
diff --git a/yggdrasil/muxserver.go b/yggdrasil/muxserver.go
new file mode 100644
index 0000000..7db4021
--- /dev/null
+++ b/yggdrasil/muxserver.go
@@ -0,0 +1,94 @@
+package yggdrasil
+
+import (
+ "fmt"
+ "github.com/avast/retry-go"
+ "github.com/rs/zerolog/log"
+ "github.com/samber/lo"
+ "github.com/samber/mo"
+ "github.com/sourcegraph/conc"
+ "strings"
+ "time"
+)
+
+const (
+ retryInterval = 100 * time.Millisecond
+ maxRetryTimes = 3
+)
+
+type muxServer struct {
+ subServers []Server
+}
+
+func (m muxServer) Name() string {
+ return "muxServer[" + strings.Join(lo.Map(m.subServers, func(item Server, _ int) string {
+ return item.Name()
+ }), ", ") + "]"
+}
+
+func (m muxServer) HasJoined(username string, serverID string) (*HasJoinedResponse, error) {
+ var wg conc.WaitGroup
+ type Ret = *HasJoinedResponse
+ results := make(chan mo.Result[Ret], len(m.subServers))
+ for _, s := range m.subServers {
+ s := s
+ wg.Go(func() {
+ var resp *HasJoinedResponse
+ err := retry.Do(
+ func() error {
+ var err error
+ resp, err = s.HasJoined(username, serverID)
+ return err
+ },
+ retry.Delay(retryInterval),
+ retry.Attempts(maxRetryTimes),
+ )
+ if err != nil {
+ results <- mo.Err[Ret](fmt.Errorf("call hasJoined on server `%v` failed: %w",
+ s.Name(), err))
+ } else {
+ results <- mo.Ok[Ret](resp)
+ }
+ })
+ }
+
+ chPanic := make(chan error, 1)
+ go func() {
+ // wait for all async tasks to finish in another async goroutine
+ // to allow the main request return as early as possible
+ r := wg.WaitAndRecover()
+ close(results)
+ if r != nil {
+ chPanic <- r.AsError()
+ }
+ close(chPanic)
+ }()
+
+ var last *Ret
+ for r := range results {
+ r, err := r.Get()
+ if err != nil {
+ log.Error().Err(err).Msg("hasJoined failed")
+ continue
+ }
+ last = &r
+ // return the first positive result
+ if r.HasJoined() {
+ return r, nil
+ }
+ }
+ if last == nil {
+ // no data generated, all async tasks panicked
+ err := <-chPanic
+ log.Error().Err(err).Msg("all hasJoined async query panicked")
+ return nil, err
+ } else {
+ return *last, nil
+ }
+}
+
+func NewMuxServer(servers ...Server) Server {
+ return muxServer{
+ subServers: servers,
+ }
+}