目录

一致性hash算法

当我们在实现数据库分库分表,分布式缓存,服务器负载均衡时,如何将数据或请求均匀的分散到各个节点,并且在增减节点(服务器)时能使受到影响的数据量最小呢?

1 Hash取模

比随机放置,更好的实现方式就是对key进行hash运算后对节点总数N取模,这样的话,就使得数据能均匀分布到各节点上,但是容错性和扩展性并不好,一旦出现节点的增减,就要重新计算分配。

1
index = hash(key)/N

有没有一种算法使得节点在增减时,让受影响的数据范围尽可能的小

2 一致性Hash算法

一致性hash算法通过一个叫做hash环的数据结构来实现 ,他的范围在通常在【0,2*32-1】(一般多用于服务器的负载均衡,服务器ip是32位)

https://s1.ax1x.com/2020/11/02/BDbbKe.jpg

之后将节点(ip地址,hostname)和数据都hash运算后,映射到hash环上,这个hash环,可以用一个列表来表示

https://s1.ax1x.com/2020/11/02/BDbv5t.jpg

然后将数据存放在hash环顺时针方向上,最近的节点上

https://s1.ax1x.com/2020/11/02/BDbzPP.jpg

这样做的好处就是,当节点增减的时候,影响的数据范围就会变成局部:

  • 节点下线:下线节点上的数据迁移到下一节点
  • 新增节点:新增节点的下一节点上的数据,会部分迁移到新节点上

https://s1.ax1x.com/2020/11/02/BDbXVA.jpg

https://s1.ax1x.com/2020/11/02/BDbqDH.jpg

3 虚拟节点

  • 数据倾斜

到目前为止,算法仍然存在问题

https://s1.ax1x.com/2020/11/04/BcxCHP.png

例如上图所示,当节点过少的时候,会导致大部分数据都存放在sl上;另外增减节点的时候也只会影响该节点的下一节点,增加的数据和减少的数据不会负载到其他节点上,针对这个问题引入虚拟节点的概念,将每个节点虚拟为一组虚拟节点,将这些虚拟节点放置到hash环上,如果要确定节点服务器,需要先确定虚拟服务器,再找到真实的服务器。这个虚拟映射表可以用hash表来实现。

https://s1.ax1x.com/2020/11/02/BDbLbd.jpg

4.代码实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package consistenthash

import (
	"hash/crc32"
	"sort"
	"strconv"
)

type Hash func(data []byte) uint32
type Map struct {
	hash     Hash           // 依赖注入hash函数
	replicas int            // 虚拟节点的倍数
	keys     []int          // hash环
	hashMap  map[int]string // 虚拟节点映射表  map[虚拟节点键hash值]虚拟节点名称
}

//构造函数
func New(replicas int, fn Hash) *Map {
	m := &Map{
		hash:     fn,
		replicas: replicas,
		hashMap:  make(map[int]string),
	}

	// 没有自定义hash函数,使用默认
	if m.hash == nil {
		m.hash = crc32.ChecksumIEEE
	}
	return m
}

func (m *Map) Add(keys ...string) {
 	// 可一次性添加多个key
	for _, key := range keys {
		// 创建虚拟节点
		for i:=0;i<m.replicas;i++{
			hash := int(m.hash([]byte(strconv.Itoa(i) +key)))
			m.keys = append(m.keys, hash)
			m.hashMap[hash] = key //虚拟节点映射表
		}
	}
    // 确保hash环是有序的
	sort.Ints(m.keys)
}




func (m *Map)Get(key string)string{
	if len(key) == 0{
		return ""
	}
	
	hash := int(m.hash([]byte(key)))

	// 返回比hash大的最小index => 顺时针找到peer节点的index
	idx := sort.Search(len(m.keys), func(i int) bool {
		return m.keys[i] >= hash
	})
	
    // 返回真实的服务器
	return m.hashMap[m.keys[idx%len(m.keys)]]
}

go单元测试代码,为了验证这里将hash函数替换成了简单的字符串转int

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package consistenthash

import (
	"strconv"
	"testing"
)

func TestHashing(t *testing.T){
	hash := New(3, func(data []byte) uint32 {
		i,_ := strconv.Atoi(string(data)) // 自定义hash string => int
		return uint32(i)
	})

	// 2 12 22 ; 4 14 24 ; 6 16 26
	hash.Add("6","4", "2")
	
	testCases := map[string]string{
		"2":  "2",
		"11": "2",
		"23": "4",
		"27": "2", // 27-26 = 1 => 2
	}

	for k,v := range testCases{
		if hash.Get(k) != v {
			t.Errorf("Asking for %s, should have yielded %s", k, v)
		}
	}

	hash.Add("8")
	testCases["27"] = "8"


	for k,v := range testCases{
		if hash.Get(k) != v {
			t.Errorf("Asking for %s, should have yielded %s", k, v)
		}
	}
}