1 流程思路
程序重启后,需要对表中记录的全量同步,查询所有记录,序列化成protobuf文件,写入redis,然后从binglog的当前文件和当前位置利用canal读取binlog日志文件同步增量数据到MySQL
2 Binlog
2.1 是什么
binlog是Mysql sever层维护的一种二进制日志,其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中
2.2 有什么用
- 主从复制:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves并回放来达到master-slave数据一致的目的
- 数据恢复:通过mysqlbinlog工具恢复数据
- 增量备份
2.3 开启和配置Binlog
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# 1.将msyql的配置文件挂载出来
docker run -d --name mysql
-p 3306:3306
-e MYSQL_ROOT_PASSWORD=123456
-v /etc/mysql:/etc/mysql
-v /opt/mysql:/var/lib/mysql
-v /etc/localtime:/etc/localtime
mysql:5.7
# 2.查看binlog日志是否开启
show variables like '%log_bin%';
# 3.设置binlog位置,
# ubuntu /etc/mysql/mysql.conf.d/mysqld.cnf
# centos /etc/my.cnf
log-bin=/var/lib/mysql/mysql-bin
server-id=123454
# 4.查询binlog文件和位置信息
show master status;
# Tip 对于已经存在的docker容器可以进入容器,安装vim编辑配置文件
# ubuntu
apt-get update
apt-get install vim
# centos
yum install vim
# 配置完成后重启容器
|
3 canal库的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
//监听数据记录
func (h *MyEventHandler) OnRow(ev *canal.RowsEvent) error {
//库名,表名,行为,数据记录,同步位置
//record := fmt.Sprintf("%v %v %s %v %v\n", ev.Table.Schema, ev.Table.Name, ev.Action, ev.Rows, ev.Header)
// 1. 数据库/数据表过滤 只监听想要的表
matchdb, err := regexp.Match(`client_\d+`, []byte(ev.Table.Schema))
if err != nil {
log.Fatal(err.Error())
}
matchtb, err := regexp.Match(`personnel_feature`, []byte(ev.Table.Name))
if err != nil {
log.Fatal(err.Error())
}
// 只监听空间库下面的personnel_feature表
if !matchdb || !matchtb {
return nil
}
// 增/删/改
op := ev.Action
if op == "delete" {
var spacename, id string
for columnIndex, currColumn := range ev.Table.Columns {
spacename = ev.Table.Schema
if currColumn.Name == "id" {
tmp := ev.Rows[len(ev.Rows)-1][columnIndex].(int64)
id = strconv.FormatInt(tmp, 10)
}
}
redis.DelData(spacename, id)
} else {
// insert / update
mid := model.Midlayer{}
for columnIndex, currColumn := range ev.Table.Columns {
mid.SpaceName = ev.Table.Schema
.......
redis.SetData(mid)
}
return nil
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
//参考 https://github.com/gitstliu/MysqlToAll 里面的获取字段和值的方法
func (this *CommonEventHandler) OnRow(e *canal.RowsEvent) error {
log4go.Debug("OnRow")
entity := &common.RawLogEntity{}
entity.Action = e.Action
entity.Rows = e.Rows
entity.TableName = e.Table.Name
entity.Header = []string{}
entity.HeaderMap = map[string]int{}
entity.ValueMap = map[string]interface{}{}
for columnIndex, currColumn := range e.Table.Columns {
entity.Header = append(entity.Header, currColumn.Name)
entity.HeaderMap[currColumn.Name] = columnIndex
entity.ValueMap[currColumn.Name] = e.Rows[len(e.Rows)-1][columnIndex]
}
log4go.Debug(entity)
this.CurrOutput.Write(entity)
return nil
}
|
参考博客
4 性能测试
操作 |
耗时 |
1.网络下载获取3组特征值;2.构造对象并序列化成protocbuf;3.存入redis |
17m40.595323274ss |
从redis中取值并反序列化为对象 |
405.893165mss |