您好,登录后才能下订单哦!
这篇文章主要介绍了怎么使用canal+Kafka进行数据库同步操作的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇怎么使用canal+Kafka进行数据库同步操作文章都会有所收获,下面我们一起来看看吧。
平时工作中数据库是我们经常使用的,在微服务拆分的架构中,各服务拥有自己的数据库,所以常常会遇到服务之间数据通信的问题。比如,B 服务数据库的数据来源于A服务的数据库;A 服务的数据有变更操作时,需要同步到 B 服务中。
在代码逻辑中,有相关 A 服务数据写操作时,以调用接口的方式,调用 B 服务接口,B 服务再将数据写到新的数据库中。这种方式看似简单,但其实“坑”很多。在 A 服务代码逻辑中会增加大量这种调用接口同步的代码,增加了项目代码的复杂度,以后会越来越难维护。并且,接口调用的方式并不是一个稳定的方式,没有重试机制,没有同步位置记录,接口调用失败了怎么处理,突然的大量接口调用会产生的问题等,这些都要考虑并且在业务中处理。这里会有不少工作量。想到这里,就将这个方案排除了。
通过数据库的binlog
进行同步。这种解决方案,与 A 服务是独立的,不会和 A 服务有代码上的耦合。可以直接 TCP
连接进行传输数据,优于接口调用的方式。 这是一套成熟的生产解决方案,也有不少binlog
同步的中间件工具,所以我们关注的就是哪个工具能够更好的构建稳定、性能满足且易于高可用部署的方案。
经过调研,我们选择了canal。canal
是阿里巴巴 MySQL binlog
增量订阅&消费组件,已经有在生产上实践的例子,并且方便的支持和其他常用的中间件组件组合,比如kafka
,elasticsearch
等,也有了canal-go
go
语言的client
库,满足我们在go
上的需求。
Canal
连接到 A 数据库,模拟slave
canal-client
与Canal
建立连接,并订阅对应的数据库表
A 数据库发生变更写入到binlog
,Canal
向数据库发送dump
请求,获取binlog
并解析,发送解析后的数据给canal-client
canal-client
收到数据,将数据同步到新的数据库
Protocol Buffer
的序列化速度还是很快的。反序列化后得到的数据,是每一行的数据,按照字段名和字段的值的结构,放到一个数组中 代码简单示例:
func Handler(entry protocol.Entry) { var keys []string rowChange := &protocol.RowChange{} proto.Unmarshal(entry.GetStoreValue(), rowChange) if rowChange != nil { eventType := rowChange.GetEventType() for _, rowData := range rowChange.GetRowDatas() { // 遍历每一行数据 if eventType == protocol.EventType_DELETE || eventType == protocol.EventType_UPDATE { columns := rowData.GetBeforeColumns() // 得到更改前的所有字段属性 } else if eventType == protocol.EventType_INSERT { columns := rowData.GetAfterColumns() // 得到更后前的所有字段属性 } ...... } } }
为了高可用和更高的性能,我们会创建多个canal-client
构成一个集群,来进行解析并同步到新的数据库。这里就出现了一个比较重要的问题,如何保证canal-client
集群解析消费binlog
的顺序性呢?
我们使用的binlog
是row
模式。每一个写操作都会产生一条binlog
日志。 举个简单的例子:插入了一条 a 记录,并且立马修改 a 记录。这样会有两个消息发送给canal-client
,如果由于网络等原因,更新的消息早于插入的消息被处理了,还没有插入记录,更新操作的最后效果是失败的。
怎么办呢? canal
可以和消息队列组合呀!而且支持kafka
,rabbitmq
,rocketmq
多种选择,如此优秀。我们在消息队列这层来实现消息的顺序性。
我们选择了消息队列的业界标杆: kafka UCloud
提供了kafka
和rocketMQ
消息队列产品服务,使用它们能够快速便捷的搭建起一套消息队列系统。加速开发,方便运维。
下面就让我们来一探究竟:
1.选择kafka
消息队列产品,并申请开通
![kafka消息队列](https://atts.yisu.com/attachments/image/20200817/1597643207499951.jpg "kafka消息队列")
2.开通完成后,在管理界面,创建kafka
集群,根据自身需求,选择相应的硬件配置
![硬件配置](https://atts.yisu.com/attachments/image/20200817/1597643247605810.jpg "硬件配置")
3.一个kafka + ZooKeeper
集群就搭建出来了,给力!
![kafka+ZooKeeper集群](https://atts.yisu.com/attachments/image/20200817/1597643275823817.jpg "kafka+ZooKeeper集群")
并且包含了节点管理、Topic
管理、Consumer Group
管理,能够非常方便的直接在控制台对配置进行修改
监控视图方面,监控的数据包括kafka
生成和消费QPS
,集群监控,ZooKeeper
的监控。能够比较完善的提供监控指标。
![监控指标](https://atts.yisu.com/attachments/image/20200817/1597643316579478.jpg "监控指标")
![监控指标](https://atts.yisu.com/attachments/image/20200817/1597643347799100.jpg "监控指标")
![监控指标](https://atts.yisu.com/attachments/image/20200817/1597643363690805.jpg "监控指标")
canal
配上kafka
也非常的简单。 vi /usr/local/canal/conf/canal.properties
# 可选项: tcp(默认), kafka, RocketMQ canal.serverMode = kafka # ... # kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 127.0.0.1:9002 canal.mq.retries = 0 # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下请将该值改大, 建议50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat json格式对象 canal.mq.flatMessage = false canal.mq.compressionType = none canal.mq.acks = all # kafka消息投递是否使用事务 canal.mq.transaction = false # mq config canal.mq.topic=default # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\\\..*,.*\\\\..* canal.mq.dynamicTopic=mydatabase.mytable canal.mq.partition=0 # hash partition config canal.mq.partitionsNum=3 canal.mq.partitionHash=mydatabase.mytable
看到下面这一行配置
canal.mq.partitionHash=mydatabase.mytable
我们配置了kafka
的partitionHash
,并且我们一个Topic
就是一个表。这样的效果就是,一个表的数据只会推到一个固定的partition
中,然后再推给consumer
进行消费处理,同步到新的数据库。通过这种方式,解决了之前碰到的binlog
日志顺序处理的问题。这样即使我们部署了多个kafka consumer
端,构成一个集群,这样consumer
从一个partition
消费消息,就是消费处理同一个表的数据。这样对于一个表来说,牺牲掉了并行处理,不过个人觉得,凭借kafka
的性能强大的处理架构,我们的业务在kafka
这个节点产生瓶颈并不容易。并且我们的业务目的不是实时一致性,在一定延迟下,两个数据库保证最终一致性。
(推荐微课:SQL微课)
下图是最终的同步架构,我们在每一个服务节点都实现了集群化。全都跑在UCloud
的UK8s
服务上,保证了服务节点的高可用性。
canal
也是集群换,但是某一时刻只会有一台canal
在处理binlog
,其他都是冗余服务。当这台canal
服务挂了,其中一台冗余服务就会切换到工作状态。同样的,也是因为要保证binlog
的顺序读取,所以只能有一台canal
在工作。
并且,我们还用这套架构进行缓存失效的同步。我们使用的缓存模式是:Cache-Aside
。同样的,如果在代码中数据更改的地方进行缓存失效操作,会将代码变得复杂。所以,在上述架构的基础上,将复杂的触发缓存失效的逻辑放到kafka-client
端统一处理,达到一定解耦的目的。
关于“怎么使用canal+Kafka进行数据库同步操作”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“怎么使用canal+Kafka进行数据库同步操作”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。