摘要:比如,服務(wù)數(shù)據(jù)庫(kù)的數(shù)據(jù)來(lái)源于服務(wù)的數(shù)據(jù)庫(kù)服務(wù)的數(shù)據(jù)有變更操作時(shí),需要同步到服務(wù)中。第二種解決方案通過(guò)數(shù)據(jù)庫(kù)的進(jìn)行同步。并且,我們還用這套架構(gòu)進(jìn)行緩存失效的同步。目前這套同步架構(gòu)正常運(yùn)行中,后續(xù)有遇到問題再繼續(xù)更新。
在微服務(wù)拆分的架構(gòu)中,各服務(wù)擁有自己的數(shù)據(jù)庫(kù),所以常常會(huì)遇到服務(wù)之間數(shù)據(jù)通信的問題。比如,B服務(wù)數(shù)據(jù)庫(kù)的數(shù)據(jù)來(lái)源于A服務(wù)的數(shù)據(jù)庫(kù);A服務(wù)的數(shù)據(jù)有變更操作時(shí),需要同步到B服務(wù)中。
第一種解決方案:
在代碼邏輯中,有相關(guān)A服務(wù)數(shù)據(jù)寫操作時(shí),以調(diào)用接口的方式,調(diào)用B服務(wù)接口,B服務(wù)再將數(shù)據(jù)寫到新的數(shù)據(jù)庫(kù)中。這種方式看似簡(jiǎn)單,但其實(shí)“坑”很多。在A服務(wù)代碼邏輯中會(huì)增加大量這種調(diào)用接口同步的代碼,增加了項(xiàng)目代碼的復(fù)雜度,以后會(huì)越來(lái)越難維護(hù)。并且,接口調(diào)用的方式并不是一個(gè)穩(wěn)定的方式,沒有重試機(jī)制,沒有同步位置記錄,接口調(diào)用失敗了怎么處理,突然的大量接口調(diào)用會(huì)產(chǎn)生的問題等,這些都要考慮并且在業(yè)務(wù)中處理。這里會(huì)有不少工作量。想到這里,就將這個(gè)方案排除了。
第二種解決方案:
通過(guò)數(shù)據(jù)庫(kù)的binlog進(jìn)行同步。這種解決方案,與A服務(wù)是獨(dú)立的,不會(huì)和A服務(wù)有代碼上的耦合。可以直接TCP連接進(jìn)行傳輸數(shù)據(jù),優(yōu)于接口調(diào)用的方式。 這是一套成熟的生產(chǎn)解決方案,也有不少binlog同步的中間件工具,所以我們關(guān)注的就是哪個(gè)工具能夠更好的構(gòu)建穩(wěn)定、性能滿足且易于高可用部署的方案。
經(jīng)過(guò)調(diào)研,我們選擇了canal
[https://github.com/alibaba/canal]。canal
是阿里巴巴 MySQL binlog 增量訂閱&消費(fèi)組件,已經(jīng)有在生產(chǎn)上實(shí)踐的例子,并且方便的支持和其他常用的中間件組件組合,比如kafka,elasticsearch等,也有了canal-go
go語(yǔ)言的client庫(kù),滿足我們?cè)趃o上的需求,其他具體內(nèi)容參閱canal的github主頁(yè)。
OK,開始干!現(xiàn)在要將A數(shù)據(jù)庫(kù)的數(shù)據(jù)變更同步到B數(shù)據(jù)庫(kù)。根據(jù)wiki很快就用docker跑起了一臺(tái)canal-server
服務(wù),直接用canal-go
寫canal-client
代碼邏輯。用canal-go
直接連canal-server
,canal-server
和canal-client
之間是Socket來(lái)進(jìn)行通信的,傳輸協(xié)議是TCP,交互協(xié)議采用的是 Google Protocol Buffer 3.0。
1.Canal連接到A數(shù)據(jù)庫(kù),模擬slave
2.canal-client與Canal建立連接,并訂閱對(duì)應(yīng)的數(shù)據(jù)庫(kù)表
3.A數(shù)據(jù)庫(kù)發(fā)生變更寫入到binlog,Canal向數(shù)據(jù)庫(kù)發(fā)送dump請(qǐng)求,獲取binlog并解析,發(fā)送解析后的數(shù)據(jù)給canal-client
4.canal-client收到數(shù)據(jù),將數(shù)據(jù)同步到新的數(shù)據(jù)庫(kù)
Protocol Buffer的序列化速度還是很快的。反序列化后得到的數(shù)據(jù),是每一行的數(shù)據(jù),按照字段名和字段的值的結(jié)構(gòu),放到一個(gè)數(shù)組中 代碼簡(jiǎn)單示例:
func Handler(entry protocol.Entry) {
var keys []string
rowChange := &protocol.RowChange{}
proto.Unmarshal(entry.GetStoreValue(), rowChange)
if rowChange != nil {
eventType := rowChange.GetEventType()
for _, rowData := range rowChange.GetRowDatas() { // 遍歷每一行數(shù)據(jù) if eventType == protocol.EventType_DELETE || eventType == protocol.EventType_UPDATE {
columns := rowData.GetBeforeColumns() // 得到更改前的所有字段屬性 } else if eventType == protocol.EventType_INSERT {
columns := rowData.GetAfterColumns() // 得到更后前的所有字段屬性 }
......
}
}
}
為了高可用和更高的性能,我們會(huì)創(chuàng)建多個(gè)canal-client
構(gòu)成一個(gè)集群,來(lái)進(jìn)行解析并同步到新的數(shù)據(jù)庫(kù)。這里就出現(xiàn)了一個(gè)比較重要的問題,如何保證canal-client
集群解析消費(fèi)binlog的順序性呢?
我們使用的binlog是row模式。每一個(gè)寫操作都會(huì)產(chǎn)生一條binlog日志。 舉個(gè)簡(jiǎn)單的例子:插入了一條a記錄,并且立馬修改a記錄。這樣會(huì)有兩個(gè)消息發(fā)送給canal-client
,如果由于網(wǎng)絡(luò)等原因,更新的消息早于插入的消息被處理了,還沒有插入記錄,更新操作的最后效果是失敗的。
怎么辦呢?canal可以和消息隊(duì)列組合呀!而且支持kafka,rabbitmq,rocketmq多種選擇,如此優(yōu)秀。我們?cè)谙㈥?duì)列這層來(lái)實(shí)現(xiàn)消息的順序性。(后面會(huì)說(shuō)怎么做)
我們選擇了消息隊(duì)列的業(yè)界標(biāo)桿: kafka UCloud提供了kafka和rocketMQ消息隊(duì)列產(chǎn)品服務(wù),使用它們能夠快速便捷的搭建起一套消息隊(duì)列系統(tǒng)。加速開發(fā),方便運(yùn)維。
下面就讓我們來(lái)一探究竟:
①選擇kafka消息隊(duì)列產(chǎn)品,并申請(qǐng)開通
②開通完成后,在管理界面,創(chuàng)建kafka集群,根據(jù)自身需求,選擇相應(yīng)的硬件配置
③一個(gè)kafka+ZooKeeper集群就搭建出來(lái)了,給力!
并且包含了節(jié)點(diǎn)管理、Topic管理、Consumer Group管理,能夠非常方便的直接在控制臺(tái)對(duì)配置進(jìn)行修改
監(jiān)控視圖方面,監(jiān)控的數(shù)據(jù)包括kafka生成和消費(fèi)QPS,集群監(jiān)控,ZooKeeper的監(jiān)控。能夠比較完善的提供監(jiān)控指標(biāo)。
canal配上kafka也非常的簡(jiǎn)單。 vi /usr/local/canal/conf/canal.properties
# ...
# 可選項(xiàng): tcp(默認(rèn)), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9002
canal.mq.retries = 0
# flagMessage模式下可以調(diào)大該值, 但不要超過(guò)MQ消息體大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下請(qǐng)將該值改大, 建議50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默認(rèn)50K, 由于kafka最大消息體限制請(qǐng)勿超過(guò)1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數(shù)據(jù)的超時(shí)時(shí)間, 單位: 毫秒, 空為不限超時(shí)
canal.mq.canalGetTimeout = 100
# 是否為flat json格式對(duì)象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投遞是否使用事務(wù)
canal.mq.transaction = false
# mq config
canal.mq.topic=default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2..*,.*..*
canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=mydatabase.mytable
具體見:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
看到下面這一行配置
canal.mq.partitionHash=mydatabase.mytable
我們配置了kafka的partitionHash,并且我們一個(gè)Topic就是一個(gè)表。這樣的效果就是,一個(gè)表的數(shù)據(jù)只會(huì)推到一個(gè)固定的partition中,然后再推給consumer進(jìn)行消費(fèi)處理,同步到新的數(shù)據(jù)庫(kù)。通過(guò)這種方式,解決了之前碰到的binlog日志順序處理的問題。這樣即使我們部署了多個(gè)kafka consumer端,構(gòu)成一個(gè)集群,這樣consumer從一個(gè)partition消費(fèi)消息,就是消費(fèi)處理同一個(gè)表的數(shù)據(jù)。這樣對(duì)于一個(gè)表來(lái)說(shuō),犧牲掉了并行處理,不過(guò)個(gè)人覺得,憑借kafka的性能強(qiáng)大的處理架構(gòu),我們的業(yè)務(wù)在kafka這個(gè)節(jié)點(diǎn)產(chǎn)生瓶頸并不容易。并且我們的業(yè)務(wù)目的不是實(shí)時(shí)一致性,在一定延遲下,兩個(gè)數(shù)據(jù)庫(kù)保證最終一致性。
下圖是最終的同步架構(gòu),我們?cè)诿恳粋€(gè)服務(wù)節(jié)點(diǎn)都實(shí)現(xiàn)了集群化。全都跑在UCloud的UK8s服務(wù)上,保證了服務(wù)節(jié)點(diǎn)的高可用性。
canal也是集群換,但是某一時(shí)刻只會(huì)有一臺(tái)canal在處理binlog,其他都是冗余服務(wù)。當(dāng)這臺(tái)canal服務(wù)掛了,其中一臺(tái)冗余服務(wù)就會(huì)切換到工作狀態(tài)。同樣的,也是因?yàn)橐WCbinlog的順序讀取,所以只能有一臺(tái)canal在工作。
并且,我們還用這套架構(gòu)進(jìn)行緩存失效的同步。我們使用的緩存模式是:Cache-Aside
。同樣的,如果在代碼中數(shù)據(jù)更改的地方進(jìn)行緩存失效操作,會(huì)將代碼變得復(fù)雜。所以,在上述架構(gòu)的基礎(chǔ)上,將復(fù)雜的觸發(fā)緩存失效的邏輯放到kafka-client
端統(tǒng)一處理,達(dá)到一定解耦的目的。
目前這套同步架構(gòu)正常運(yùn)行中,后續(xù)有遇到問題再繼續(xù)更新。
更多內(nèi)容,歡迎點(diǎn)擊下方作者主頁(yè)進(jìn)行交流~
本文作者:UCloud應(yīng)用研發(fā)工程師 Cary
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/125986.html
摘要:易用的客戶端自身提供了簡(jiǎn)單的客戶端,數(shù)據(jù)格式較為復(fù)雜,處理消費(fèi)數(shù)據(jù)也不太方便,為了方便給業(yè)務(wù)使用,提供一種直接能獲取實(shí)體對(duì)象的方式來(lái)進(jìn)行消費(fèi)才更方便。 易用的canaljava 客戶端 canal 自身提供了簡(jiǎn)單的客戶端,數(shù)據(jù)格式較為復(fù)雜,處理消費(fèi)數(shù)據(jù)也不太方便,為了方便給業(yè)務(wù)使用,提供一種直接能獲取實(shí)體對(duì)象的方式來(lái)進(jìn)行消費(fèi)才更方便。先說(shuō)一下實(shí)現(xiàn)的思路,首先canal 客戶端的消息對(duì)象...
閱讀 3539·2023-04-25 20:09
閱讀 3740·2022-06-28 19:00
閱讀 3061·2022-06-28 19:00
閱讀 3082·2022-06-28 19:00
閱讀 3176·2022-06-28 19:00
閱讀 2881·2022-06-28 19:00
閱讀 3050·2022-06-28 19:00
閱讀 2638·2022-06-28 19:00