-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathconsistentHash.go
More file actions
198 lines (176 loc) · 5.39 KB
/
consistentHash.go
File metadata and controls
198 lines (176 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
// Package consistentHash implements a consistent hashing algorithm
package consistentHash
import (
"errors"
"fmt"
"github.com/spaolacci/murmur3"
"sort"
"strconv"
"sync"
)
var (
// ErrNoMembers occurs when trying to hash before any members are added
ErrNoMembers = errors.New("no members added")
// ErrNotEnoughMembers occurs when more members are asked for than are available
ErrNotEnoughMembers = errors.New("not enough members")
// ErrNotAvailableOnceMembersAdded occurs if any attempt is made to modify the vnode account once members are added
ErrNotAvailableOnceMembersAdded = errors.New("not available once members are added")
// ErrInvalidVnodeCount occurs if the vnode count is set to 0 or lower
ErrInvalidVnodeCount = errors.New("vnodeCount must be > 0")
)
const (
// DefaultVnodeCount is a tradeoff of memory and ~ log(N) speed versus how well the hash spreads
DefaultVnodeCount = 200
)
type vnode struct {
token uint64
address string
}
type vnodes []vnode
// ConsistentHash holds the internal data structures for the hashing
type ConsistentHash struct {
vnodes vnodes
nodes map[string]bool
vnodeCount int
mutex sync.Mutex
}
// New creates a new consistentHash pointer and initializes all the necessary fields
func New() *ConsistentHash {
ch := new(ConsistentHash)
ch.nodes = make(map[string]bool)
ch.vnodes = make(vnodes, 0)
ch.vnodeCount = DefaultVnodeCount
return ch
}
// dumpVnodes prints the vnode slice to stdout, only useful for debugging
func (ch *ConsistentHash) dumpVnodes() {
for _, vn := range ch.vnodes {
fmt.Printf("%v\n", vn)
}
}
// addressToKey converts an address and an integer to a []byte that we are sure won't be duplicated with a later valid IP
// or hostname
func addressToKey(address string, increment int) []byte {
return []byte(strconv.Itoa(increment) + "=" + address)
}
// SetVnodeCount sets the number of vnodes that will be added for every server
// This must be called before any Add() calls
func (ch *ConsistentHash) SetVnodeCount(count int) error {
if len(ch.nodes) > 0 {
return ErrNotAvailableOnceMembersAdded
}
if count < 1 {
return ErrInvalidVnodeCount
}
ch.vnodeCount = count
return nil
}
// Add adds a server to the consistentHash
func (ch *ConsistentHash) Add(address string) {
ch.mutex.Lock()
defer ch.mutex.Unlock()
// if the address has already been added, there is no work to do
if _, found := ch.nodes[address]; found {
return
}
ch.nodes[address] = true
for i := 0; i < ch.vnodeCount; i++ {
token := murmur3.Sum64(addressToKey(address, i))
newVnode := vnode{token, address}
ch.insertVnode(newVnode)
}
}
// Remove removes a server from the consistentHash
func (ch *ConsistentHash) Remove(address string) {
ch.mutex.Lock()
defer ch.mutex.Unlock()
if _, found := ch.nodes[address]; !found {
return
}
for i := 0; i < ch.vnodeCount; i++ {
token := murmur3.Sum64(addressToKey(address, i))
ch.removeVnode(token)
}
delete(ch.nodes, address)
}
func (v *vnode) String() string {
return fmt.Sprintf("token=%d address=%s", v.token, v.address)
}
// Get finds the closest member for a given key
func (ch *ConsistentHash) Get(key []byte) (string, error) {
ch.mutex.Lock()
defer ch.mutex.Unlock()
if len(ch.vnodes) == 0 {
return "", ErrNoMembers
}
token := murmur3.Sum64(key)
return ch.vnodes[ch.closest(token)].address, nil
}
// Get2 finds the closest 2 members for a given key and is just a helper function
// calling into GetN
func (ch *ConsistentHash) Get2(key []byte) (string, string, error) {
// don't use the mutex since GetN will use it
servers, err := ch.GetN(key, 2)
if err != nil {
return "", "", err
}
return servers[0], servers[1], nil
}
// GetN finds the closest N members for a given key
func (ch *ConsistentHash) GetN(key []byte, count int) ([]string, error) {
ch.mutex.Lock()
defer ch.mutex.Unlock()
if len(ch.nodes) < count {
return nil, ErrNotEnoughMembers
}
token := murmur3.Sum64(key)
addressMap := make(map[string]bool)
addresses := make([]string, count)
index := ch.closest(token)
found := 0
for found < count {
if exists := addressMap[ch.vnodes[index].address]; !exists {
addressMap[ch.vnodes[index].address] = true
addresses[found] = ch.vnodes[index].address
found++
}
index++
if index == len(ch.vnodes) {
index = 0
}
}
return addresses, nil
}
// removeVnode removes a vnode from the ring
func (ch *ConsistentHash) removeVnode(token uint64) {
index := ch.index(token)
if index == len(ch.vnodes) {
ch.vnodes = ch.vnodes[:index-1]
return
}
ch.vnodes = append(ch.vnodes[:index], ch.vnodes[index+1:]...)
}
// insertVnode adds a vnode into the appropriate location of the ring
func (ch *ConsistentHash) insertVnode(vn vnode) {
index := ch.index(vn.token)
ch.vnodes = append(ch.vnodes[:index], append(vnodes{vn}, ch.vnodes[index:]...)...)
}
// index returns the position where we should insert a new vnode
// differs from closest in that if the new token is bigger than the current highest token
// the index returned should be the end
func (ch *ConsistentHash) index(token uint64) int {
index := sort.Search(len(ch.vnodes), func(i int) bool {
return ch.vnodes[i].token >= token
})
return index
}
// closest returns the index of the vnode greater than or equal to the token
func (ch *ConsistentHash) closest(token uint64) int {
index := sort.Search(len(ch.vnodes), func(i int) bool {
return ch.vnodes[i].token >= token
})
if index == len(ch.vnodes) {
index = 0
}
return index
}