Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions robustsession/robustsession.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ func CopyNetworks() []*Network {
// This type is only exported so that you can expose internal network state
// for debugging via CopyNetworks().
type Network struct {
servers []string
mu sync.RWMutex
backoff map[string]backoffState
servers []string
idxOffset int
mu sync.RWMutex
backoff map[string]backoffState
}

func (n *Network) String() string {
Expand All @@ -112,9 +113,9 @@ func (n *Network) String() string {
}
lines = append(lines, fmt.Sprintf("\tserver %v (backoff: next possible reconnect: %v)", srv, reconnect))
}
return fmt.Sprintf("[network %p with %d servers]\n",
n,
len(n.servers)) + strings.Join(lines, "\n")
return fmt.Sprintf("[network %p with %d servers]\n", n, len(n.servers)) +
fmt.Sprintf("index offset to the next server=%v\n", n.idxOffset) +
strings.Join(lines, "\n")
}

func newNetwork(networkname string) (*Network, error) {
Expand Down Expand Up @@ -166,8 +167,9 @@ func newNetwork(networkname string) (*Network, error) {
}

return &Network{
servers: servers,
backoff: make(map[string]backoffState),
servers: servers,
idxOffset: 0,
backoff: make(map[string]backoffState),
}, nil
}

Expand All @@ -180,7 +182,8 @@ func (n *Network) server(random bool) string {

for {
soonest := time.Duration(math.MaxInt64)
// Try to use a random server, but fall back to using the next

// If random, try to use a random server, but fall back to using the next
// available server in case the randomly picked server is unhealthy.
if random {
server := n.servers[rand.Intn(len(n.servers))]
Expand All @@ -189,10 +192,14 @@ func (n *Network) server(random bool) string {
return server
}
}
for _, server := range n.servers {
wait := n.backoff[server].next.Sub(time.Now())
// Try to use the next available server, searching in offset order
// (modified round robin).
for i := 0; i < len(n.servers); i++ {
idx := (i + n.idxOffset) % len(n.servers)
wait := n.backoff[n.servers[idx]].next.Sub(time.Now())
if wait <= 0 {
return server
n.idxOffset = (idx + 1) % len(n.servers)
return n.servers[idx]
}
if wait < soonest {
soonest = wait
Expand All @@ -212,6 +219,7 @@ func (n *Network) setServers(servers []string) {

// TODO(secure): we should clean up n.backoff from servers which no longer exist
n.servers = servers
n.idxOffset = 0
}

// prefer moves (or adds, if it doesn't already exist) the specified server to
Expand All @@ -229,6 +237,7 @@ func (n *Network) prefer(server string) {
}
}
n.servers = servers
n.idxOffset = 0
}

func (n *Network) failed(server string) {
Expand Down