一 認識Kafka
Kafka是一個開源流處理平臺,它由 Apache 軟件基金會開發(fā)的,開發(fā)它的目的是為了提供一個統(tǒng)一的、高吞吐、低延遲的實時數(shù)據(jù)處理平臺。它的持久化層與同類平臺不同,本質(zhì)上是一個“按照分布式事務(wù)日志架構(gòu)的大規(guī)模發(fā)布/訂閱消息隊列”,這使它非常具有價值。
二 Kafka的使用
想要完成分區(qū)副本的重分配,需要在 Kafka 的根路徑下,執(zhí)行如下命令
執(zhí)行
./bin/kafka‐reassign‐partitions.sh ‐‐zookeeper localhost:2181/kafka ‐‐reassignment‐json‐file reassign‐topic.json ‐‐execute
分區(qū)副本的分布情況由eassign‐topic.json 文件指定,如
{ "version": 1, "partitions": [ { "topic": "test", "partition": 2, "replicas": [ 2, 1 ], "log_dirs": [ "any", "any" ] } }
從上我們可以看出opic=test,partition=2 的分區(qū)的兩副本分別移動到 brokerId=2 和 brokerId=1 的節(jié)點的任意磁盤路徑上。
三 ZooKeeper 和 Kafka Controller
3.1 ZooKeeper
Kafka 的元數(shù)據(jù)存儲在 ZooKeeper 中。Apache ZooKeeper是可靠的分布式協(xié)調(diào)服務(wù)框架。它憑借著數(shù)據(jù)模型類似于文件系統(tǒng)的樹形結(jié)構(gòu),實現(xiàn)保存一些元數(shù)據(jù)協(xié)調(diào)信息。同時 ZooKeeper具有 Watch 通知功能。一旦 znode 節(jié)點被創(chuàng)建、刪除,子節(jié)點數(shù)量發(fā)生變化,或是 znode 所存的數(shù)據(jù)本身變更, ZooKeeper會及時通知客戶端,觸發(fā)對應的處理操作。
3.2 Kafka Controller
Kafka Controller作為 Apache Kafka 的核心組件,它能夠在 Apache ZooKeeper 的幫助下管理和協(xié)調(diào)整個 Kafka 集群。集群中任意一臺 Broker 都能充當控制器的角色。事實上,在運行過程中,只能有一個 Broker 成為控制器,來發(fā)送各種操作指令。
四 分區(qū)重分配流程
Kafka需要在client、broker 和 controller 的協(xié)同運行下完成分區(qū)重分配。
流程圖如下:
流程圖分析
1、kafka-reassign-partitions 客戶端
先由客戶端發(fā)起分區(qū)重分配任務(wù),它的入口主類為 ReassignPartitionsCommand.scala 中,接著調(diào)用 executeAssignment 方法??蛻舳说?executeAssignment 方法主要完成了如下操作:
· 解析 json 文件 ,進行json 文件校驗
· 讀取 json 文件內(nèi)容,判斷是否繼續(xù)執(zhí)行副本重分配
· 校驗分區(qū)副本數(shù)和副本數(shù)據(jù)路徑數(shù)是否一致,校驗 partition/replica 是否為空/重復
· 檢查待重分配的分區(qū)在集群中是否存在,檢查確認所有目標 broker 均在線,檢查是否已存在分區(qū)副本重分配任務(wù)
· 分配任務(wù)記錄,發(fā)送 alterReplicaLogDirs 請求
2、controller 維護分區(qū)的元數(shù)據(jù)信息
在 controller 啟動時會創(chuàng)建 partitionReassignmentHandler,kafkaController 主線程回調(diào) onControllerFailover 時,當/admin/reassign_partitions 發(fā)生變化時,會觸發(fā)分區(qū)副本重分配操作,在 maybeTriggerPartitionReassignment 中通過調(diào)用 onPartitionReassignment 真正執(zhí)行分區(qū)副本重分配。
onPartitionReassignment 的執(zhí)行過程如下:
· 在 zk 中將 AR 更新為 RAR+OAR
· 向所有副本(RAR+OAR)中發(fā)送 LeaderAndIsr 請求
· 將 RAR-OAR 的副本狀態(tài)置為 NewReplica,直到所有 RAR 中的副本完成與 leader 的同步
· 將所有 RAR 的副本置為 OnlineReplica 狀態(tài),將 RAR 作為 AR
· 判斷 leader 不在 RAR 中,檢查 leader 狀態(tài),如果 leader 健康則更新 LeaderEpoch,否則重新選擇 leader
· 將 OAR-RAR 的副本置為 Offline 狀態(tài)
· 將 OAR-RAR 的副本置為 NonExistentReplica 狀態(tài),并將 zk 中的 AR 置為 RAR(/brokers/topics/${topicName}數(shù)據(jù)格式:{"version":1,"partitions":{"0":[${brokerId}]}})
· 更新 zk 中/admin/reassign_partitions 的值,同步所有 broker,更新元數(shù)據(jù)信息
3、broker 端數(shù)據(jù)跨路徑遷移
底層數(shù)據(jù)跨路徑遷移需要 broker 端完成的,broker 接收到客戶端發(fā)來的請求后,調(diào)用 alterReplicaLogDirs 方法
步驟如下:
· 確保目的路徑/待移動分區(qū)在線
· 標記需要進行遷移的分區(qū)副本路徑
· 對于需要移動的分區(qū)副本,創(chuàng)建 future Log
· 停止當前 Log 的清理工作,等待 future Log 同步
· 創(chuàng)建 ReplicaAlterLogDirsThread,逐個數(shù)據(jù)構(gòu)造 Fetch 請求
· 通過 ReplicaManager.fetchMessages 從分區(qū)副本 leader 獲取數(shù)據(jù),完成數(shù)據(jù)同步
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/127584.html
摘要:而在服務(wù)器中應該充分利用多線程來處理執(zhí)行邏輯。能保證所在的失效,該消息仍然可以從新選舉的中獲取,不會造成消息丟失。這意味著無需等待來自的確認而繼續(xù)發(fā)送下一批消息。 showImg(https://segmentfault.com/img/remote/1460000018373147?w=702&h=369); 1.概述 Apache Kafka最早是由LinkedIn開源出來的分布式...
閱讀 1204·2022-09-27 09:47
閱讀 1113·2022-09-27 09:28
閱讀 1567·2022-09-27 09:16
閱讀 866·2022-09-27 08:21
閱讀 1024·2022-09-27 08:08
閱讀 1173·2022-09-18 12:33
閱讀 885·2022-09-16 08:01
閱讀 896·2022-09-15 12:27