diff --git a/serve.go b/serve.go index de09dd6d..d3abb42b 100644 --- a/serve.go +++ b/serve.go @@ -58,7 +58,11 @@ func (vs *version) serveProxied(w http.ResponseWriter, r *http.Request, // Shuffle the peers, so we try them in a random order. // TODO: We don't want to blacklist nodes, but we can weight them lower - peers := shuffle(vs.partitions.FindPeers(partition)) + rawPeers, disapperedNodes := vs.partitions.FindPeers(partition) + if len(rawPeers) < vs.sequins.config.Sharding.Replication { + rawPeers = append(rawPeers, disapperedNodes...) + } + peers := shuffle(rawPeers) if len(peers) == 0 { log.Printf("No peers available for /%s/%s (version %s)", vs.db.name, key, vs.name) w.WriteHeader(http.StatusBadGateway) @@ -70,7 +74,12 @@ func (vs *version) serveProxied(w http.ResponseWriter, r *http.Request, log.Println("Trying alternate partition for pathological key", key) resp.Body.Close() - alternatePeers := shuffle(vs.partitions.FindPeers(alternatePartition)) + rawPeers, disapperedNodes = vs.partitions.FindPeers(alternatePartition) + if len(rawPeers) < vs.sequins.config.Sharding.Replication { + rawPeers = append(rawPeers, disapperedNodes...) + } + + alternatePeers := shuffle(rawPeers) resp, peer, err = vs.proxy(r, alternatePeers) } diff --git a/sharding/partitions.go b/sharding/partitions.go index cc9e3d6c..526592c9 100644 --- a/sharding/partitions.go +++ b/sharding/partitions.go @@ -33,6 +33,7 @@ type Partitions struct { selected map[int]bool local map[int]bool remote map[int][]string + disappeared map[int][]string numMissing int readyClosed bool shouldAdvertise bool @@ -55,6 +56,7 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu replication: replication, local: make(map[int]bool), remote: make(map[int][]string), + disappeared: make(map[int][]string, 1024), } p.pickLocal() @@ -69,6 +71,19 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu return p } +// dedupe dedupelicates elements in a slice of strings. +func dedupe(nodes []string) []string { + found := map[string]bool{} + dedupedNodes := make([]string, 0, len(nodes)) + for _, node := range nodes { + if !found[node] { + found[node] = true + dedupedNodes = append(dedupedNodes, node) + } + } + return dedupedNodes +} + // pickLocal selects which partitions are local by iterating through // them all, and checking the hashring to see if this peer is one of the // replicas. @@ -106,18 +121,23 @@ func (p *Partitions) sync(updates chan []string) { } } -// FindPeers returns the list of peers who have the given partition available. -func (p *Partitions) FindPeers(partition int) []string { - if p.peers == nil { - return nil - } - +// FindPeers returns the list of peers who have the given partition available, +// It also returns a list of disappeared peers, +// that are no longer in Zookeeper +func (p *Partitions) FindPeers(partition int) ([]string, []string) { p.lock.RLock() defer p.lock.RUnlock() + disappearedPeers := make([]string, 1024) + copy(disappearedPeers, p.disappeared[partition]) + + if p.peers == nil { + return nil, disappearedPeers + } + peers := make([]string, len(p.remote[partition])) copy(peers, p.remote[partition]) - return peers + return peers, disappearedPeers } // Update updates the list of local partitions to the given list. @@ -228,6 +248,25 @@ func (p *Partitions) updateRemote(nodes []string) { } } + for partitionId, partition := range p.remote { + disappearedPeers := make([]string, len(partition)) + for _, oldPeer := range partition { + found := false + for _, newPeer := range remote[partitionId] { + if newPeer == oldPeer { + found = true + } + } + if !found { + disappearedPeers = append(disappearedPeers, oldPeer) + } + } + p.disappeared[partitionId] = dedupe(append(disappearedPeers, p.disappeared[partitionId]...)) + if len(p.disappeared[partitionId]) >= 1024 { + p.disappeared[partitionId] = p.disappeared[partitionId][:1024] + } + } + p.remote = remote p.updateMissing() } diff --git a/sharding/partitions_test.go b/sharding/partitions_test.go new file mode 100644 index 00000000..c1225164 --- /dev/null +++ b/sharding/partitions_test.go @@ -0,0 +1,23 @@ +package sharding + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDedupeRandom(t *testing.T) { + dupe := []string{"4", "1", "2", "1", "3", "2"} + expected := []string{"4", "1", "2", "3"} + + deduped := dedupe(dupe) + assert.Equal(t, expected, deduped) +} + +func TestDedupe(t *testing.T) { + dupe := []string{"1", "1", "1", "2", "2", "3", "3", "3", "4", "5", "5"} + expected := []string{"1", "2", "3", "4", "5"} + + deduped := dedupe(dupe) + assert.Equal(t, expected, deduped) +}