目录

同步mysql数据到redis

1 流程思路

程序重启后,需要对表中记录的全量同步,查询所有记录,序列化成protobuf文件,写入redis,然后从binglog的当前文件和当前位置利用canal读取binlog日志文件同步增量数据到MySQL

https://s1.ax1x.com/2020/09/11/wNZ2w9.png

2 Binlog

2.1 是什么

​ binlog是Mysql sever层维护的一种二进制日志,其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中

2.2 有什么用
  • 主从复制:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves并回放来达到master-slave数据一致的目的
  • 数据恢复:通过mysqlbinlog工具恢复数据
  • 增量备份
2.3 开启和配置Binlog
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 1.将msyql的配置文件挂载出来
docker run -d --name mysql 
-p 3306:3306 
-e MYSQL_ROOT_PASSWORD=123456
-v /etc/mysql:/etc/mysql  
-v /opt/mysql:/var/lib/mysql 
-v /etc/localtime:/etc/localtime
mysql:5.7

# 2.查看binlog日志是否开启
show variables like '%log_bin%';


# 3.设置binlog位置,
# ubuntu /etc/mysql/mysql.conf.d/mysqld.cnf
# centos /etc/my.cnf

log-bin=/var/lib/mysql/mysql-bin
server-id=123454

# 4.查询binlog文件和位置信息

show master status;


# Tip 对于已经存在的docker容器可以进入容器,安装vim编辑配置文件

# ubuntu
apt-get update
apt-get install vim

# centos
yum install vim

# 配置完成后重启容器

3 canal库的使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//监听数据记录
func (h *MyEventHandler) OnRow(ev *canal.RowsEvent) error {
	//库名,表名,行为,数据记录,同步位置
	//record := fmt.Sprintf("%v %v %s %v %v\n", ev.Table.Schema, ev.Table.Name, ev.Action, ev.Rows, ev.Header)

	// 1. 数据库/数据表过滤 只监听想要的表
	matchdb, err := regexp.Match(`client_\d+`, []byte(ev.Table.Schema))
	if err != nil {
		log.Fatal(err.Error())
	}
	matchtb, err := regexp.Match(`personnel_feature`, []byte(ev.Table.Name))
	if err != nil {
		log.Fatal(err.Error())
	}
	// 只监听空间库下面的personnel_feature表
	if !matchdb || !matchtb {
		return nil
	}

	// 增/删/改
	op := ev.Action
	if op == "delete" {
		var spacename, id string
		for columnIndex, currColumn := range ev.Table.Columns {
			spacename = ev.Table.Schema
			if currColumn.Name == "id" {
				tmp := ev.Rows[len(ev.Rows)-1][columnIndex].(int64)
				id = strconv.FormatInt(tmp, 10)
			}
		}
		redis.DelData(spacename, id)

	} else {
		// insert / update
		mid := model.Midlayer{}
		for columnIndex, currColumn := range ev.Table.Columns {
			mid.SpaceName = ev.Table.Schema
		.......
      
		redis.SetData(mid)
	}
	return nil
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
//参考 https://github.com/gitstliu/MysqlToAll 里面的获取字段和值的方法
func (this *CommonEventHandler) OnRow(e *canal.RowsEvent) error {
	log4go.Debug("OnRow")
	entity := &common.RawLogEntity{}
	entity.Action = e.Action 
	entity.Rows = e.Rows
	entity.TableName = e.Table.Name
	entity.Header = []string{}
	entity.HeaderMap = map[string]int{}
	entity.ValueMap = map[string]interface{}{}

	for columnIndex, currColumn := range e.Table.Columns {
		entity.Header = append(entity.Header, currColumn.Name)
		entity.HeaderMap[currColumn.Name] = columnIndex
		entity.ValueMap[currColumn.Name] = e.Rows[len(e.Rows)-1][columnIndex]
	}
	log4go.Debug(entity)
	this.CurrOutput.Write(entity)

	return nil
}

参考博客

4 性能测试

  • 数据量 2万
  • 每条数据大小 2.6k
操作 耗时
1.网络下载获取3组特征值;2.构造对象并序列化成protocbuf;3.存入redis 17m40.595323274ss
从redis中取值并反序列化为对象 405.893165mss