当我们在实现数据库分库分表,分布式缓存,服务器负载均衡时,如何将数据或请求均匀的分散到各个节点,并且在增减节点(服务器)时能使受到影响的数据量最小呢?
1 Hash取模
比随机放置,更好的实现方式就是对key
进行hash
运算后对节点总数N
取模,这样的话,就使得数据能均匀分布到各节点上,但是容错性和扩展性并不好,一旦出现节点的增减,就要重新计算分配。
有没有一种算法使得节点在增减时,让受影响的数据范围尽可能的小
2 一致性Hash算法
一致性hash
算法通过一个叫做hash
环的数据结构来实现 ,他的范围在通常在【0,2*32-1】(一般多用于服务器的负载均衡,服务器ip
是32位)
之后将节点(ip
地址,hostname
)和数据都hash
运算后,映射到hash
环上,这个hash
环,可以用一个列表
来表示
然后将数据存放在hash
环顺时针方向上,最近的节点上
这样做的好处就是,当节点增减的时候,影响的数据范围就会变成局部:
- 节点下线:下线节点上的数据迁移到下一节点
- 新增节点:新增节点的下一节点上的数据,会部分迁移到新节点上
3 虚拟节点
到目前为止,算法仍然存在问题
例如上图所示,当节点过少的时候,会导致大部分数据都存放在sl
上;另外增减节点的时候也只会影响该节点的下一节点,增加的数据和减少的数据不会负载到其他节点上,针对这个问题引入虚拟节点的概念,将每个节点虚拟为一组虚拟节点,将这些虚拟节点放置到hash
环上,如果要确定节点服务器,需要先确定虚拟服务器,再找到真实的服务器。这个虚拟映射表可以用hash
表来实现。
4.代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
package consistenthash
import (
"hash/crc32"
"sort"
"strconv"
)
type Hash func(data []byte) uint32
type Map struct {
hash Hash // 依赖注入hash函数
replicas int // 虚拟节点的倍数
keys []int // hash环
hashMap map[int]string // 虚拟节点映射表 map[虚拟节点键hash值]虚拟节点名称
}
//构造函数
func New(replicas int, fn Hash) *Map {
m := &Map{
hash: fn,
replicas: replicas,
hashMap: make(map[int]string),
}
// 没有自定义hash函数,使用默认
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}
func (m *Map) Add(keys ...string) {
// 可一次性添加多个key
for _, key := range keys {
// 创建虚拟节点
for i:=0;i<m.replicas;i++{
hash := int(m.hash([]byte(strconv.Itoa(i) +key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key //虚拟节点映射表
}
}
// 确保hash环是有序的
sort.Ints(m.keys)
}
func (m *Map)Get(key string)string{
if len(key) == 0{
return ""
}
hash := int(m.hash([]byte(key)))
// 返回比hash大的最小index => 顺时针找到peer节点的index
idx := sort.Search(len(m.keys), func(i int) bool {
return m.keys[i] >= hash
})
// 返回真实的服务器
return m.hashMap[m.keys[idx%len(m.keys)]]
}
|
go
单元测试代码,为了验证这里将hash
函数替换成了简单的字符串转int
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package consistenthash
import (
"strconv"
"testing"
)
func TestHashing(t *testing.T){
hash := New(3, func(data []byte) uint32 {
i,_ := strconv.Atoi(string(data)) // 自定义hash string => int
return uint32(i)
})
// 2 12 22 ; 4 14 24 ; 6 16 26
hash.Add("6","4", "2")
testCases := map[string]string{
"2": "2",
"11": "2",
"23": "4",
"27": "2", // 27-26 = 1 => 2
}
for k,v := range testCases{
if hash.Get(k) != v {
t.Errorf("Asking for %s, should have yielded %s", k, v)
}
}
hash.Add("8")
testCases["27"] = "8"
for k,v := range testCases{
if hash.Get(k) != v {
t.Errorf("Asking for %s, should have yielded %s", k, v)
}
}
}
|