摘要:所以消息可以重復(fù)的放入不同的隊(duì)列中。而是對于消息來說的,在其發(fā)送消息到交換器時,需指定。與發(fā)布訂閱模式的相同點(diǎn)是可以將消息重復(fù)發(fā)送。它需要處理低延遲的傳遞,用于支持傳統(tǒng)的消息傳遞系統(tǒng)用例。
理解概念的一個方法
之前說過學(xué)習(xí)一個新的東西,最核心的就是掌握概念。而如何掌握概念呢?我的其中一個方法就是對比,把相似且模糊不清的兩個概念進(jìn)行對比,這樣就理解更快。
RabbitMQ模式RabbitMQ有以下模式:
1.工作隊(duì)列(Worke Queues)
發(fā)消息和收消息都是直接通過隊(duì)列。在耗時比較多的任務(wù),我們把任務(wù)放入隊(duì)列里,然后每個工作者去獲取任務(wù)然后處理。所以這個工作隊(duì)列,也稱為任務(wù)隊(duì)列(Task Queues)。這樣就將耗資源的任務(wù)從產(chǎn)生任務(wù)的應(yīng)用上解耦出來。
這個模式最主要的特征是:每個任務(wù)只會分發(fā)到一個工作者中。
2.發(fā)布/訂閱(Publish/Subscribe)
這個發(fā)布/訂閱和觀察者模式很像,但不是同一個東西。具體可看看發(fā)布/訂閱和觀察者區(qū)別。
在這里,RabbitMQ引入了交換器(Exchange)的概念,生產(chǎn)者不直接與隊(duì)列交互,而是通過交換器去與隊(duì)列進(jìn)行交互(或者叫綁定)。也就說生產(chǎn)者只和交換器交互。引入交換器這概念后,這消息中間件可以玩的花樣就多了。發(fā)布/訂閱(Publish/Subscribe)就是其中的一個。這里使用到的就是fanout的交換器。
這個模式最主要的特征是:類似于廣播(broadcast),同個消息可以發(fā)送到不同的隊(duì)列中去,而且這fanout交換器也不關(guān)系隊(duì)列有哪些,只要隊(duì)列和fanout交換器有綁定就發(fā)送,這樣就可以將消息重復(fù)發(fā)送到不同的隊(duì)列上。
與工作隊(duì)列模式的區(qū)別是:發(fā)布/訂閱的概念叫消息,而不是任務(wù)。所以消息可以重復(fù)的放入不同的隊(duì)列中。
3.路由(Routing)
路由模式也是引入交換器概念后,消息中間件玩的一個花樣。這里用到的交換器叫direct。
在這模式里,得新增兩個概念,分別是binding key和routing key, binding key是對于隊(duì)列來說的,在其與direct交換器綁定時指定binding key。而routing key是對于消息來說的,在其發(fā)送消息到direct交換器時,需指定routing key。這樣routing key能夠和binding key匹配得上的(就是值相等),direct交換器就會將消息發(fā)送到對應(yīng)binding key的隊(duì)列上。
這個模式最主要的特征是:控制消息的精度更高,可以指定哪些消息發(fā)送到哪些隊(duì)列里。
與發(fā)布/訂閱模式的區(qū)別是:區(qū)別是發(fā)布/訂閱是廣播,將消息發(fā)送到任何綁定交換器的隊(duì)列上,所以沒能力選擇消息,而路由是需binding key和routing key匹配上,消息才能發(fā)送到對應(yīng)binding key的隊(duì)列上,從而有能力去選擇消息。
與發(fā)布/訂閱模式的相同點(diǎn)是:可以將消息重復(fù)發(fā)送。
注:隊(duì)列可以綁定多個routing key
4.主題(Topics)
當(dāng)然,主題模式也是引入交換器概念后,消息中間件玩的一個花樣。這里用到的交換器叫topic。
這里用到的也是binding key和routing key,但不一樣的是,routing_key不能指定明確的key。而是這個key需要帶有點(diǎn)“.”,如 "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。而在這模式下,binding key的指定可以更廣泛些,其結(jié)構(gòu)是這樣的".orange." 、 "..rabbit" 和"lazy.#"。其中*(星號)是可以代表一個單詞,#(井號)是可以代表零個或多個單詞。也跟路由類似的,只要這樣routing key能夠和binding key匹配得上的(這里可以不用值相等,模式匹配上即可),topic交換器就會將消息發(fā)送到對應(yīng)binding key的隊(duì)列上。
如Q1隊(duì)列的binding key是".orange.",而 Q2是"..rabbit"和"lazy.#"。如果消息的routing key是 "quick.orange.rabbit" 則此消息會被發(fā)送到Q1和Q2隊(duì)列上。routing key是"quick.orange.fox"的消息只會發(fā)送到Q1隊(duì)列上。routing key是"lazy.pink.rabbit" 的消息只會發(fā)送到Q2隊(duì)列一次,routing key是 "quick.brown.fox" 的消息沒有匹配任何的binding key則此消息丟棄。
注:隊(duì)列可以綁定多個routing key
5.遠(yuǎn)程過程調(diào)用RPC(Remote Procedure Call)
RPC可以遠(yuǎn)程調(diào)用函數(shù),等待服務(wù)器返回結(jié)果。
RPC的一個備注:RPC雖然用得很廣泛,然而它也有不足之處,就是開發(fā)人員無法清晰的知道自己調(diào)用的這個函數(shù)到底是本地函數(shù)還是很慢的RPC。這種困惑很容易導(dǎo)致出一個不可預(yù)測的系統(tǒng)和增加沒必要的復(fù)雜性導(dǎo)致難以定位問題。如果不用簡單的程序,誤用RPC還可能寫出很維護(hù)的意大利面條式的代碼。。
對于這個問題,有三個建議保證函數(shù)是很容易被辨別出是本地函數(shù)還是遠(yuǎn)程函數(shù)。
文檔化,清晰地記錄組件間的依賴。
處理網(wǎng)絡(luò)帶來的異常,如超時等。
當(dāng)出現(xiàn)用RPC是否必要時,如果可以的話,你最好用異步管道(asynchronous pipeline)的形式,而不是使用阻塞形式的RPC。
。
RabbitMQ可以用于構(gòu)建RPC系統(tǒng)。一個客戶端和一個可擴(kuò)展的RPC服務(wù)器。不過此功能不太常用,所以就不留篇幅來講解。大概原理就是可以新增消息的屬性,從而將請求和響應(yīng)的消息給匹配上。
觀察者模式和發(fā)布/訂閱模式的區(qū)別觀察者模式
觀察者模式的定義:對象間的一種一對多的組合關(guān)系,以便一個對象的狀態(tài)發(fā)生變化時,所有依賴于它的對象都得到通知。
舉個例子
假設(shè)你正在找一份軟件工程師的工作,對“香蕉公司”很感興趣。所以你聯(lián)系了他們的HR,給了他你的聯(lián)系電話。他保證如果有任何職位空缺都會通知你。這里還有幾個候選人也你一樣很感興趣。所以職位空缺大家都會知道,如果你回應(yīng)了他們的通知,他們就會聯(lián)系你面試。
該模式必須包含兩個角色:觀察者和觀察對象,香蕉公司就是被觀察者Subject,你就是Observers(還有和你一樣的候選人),當(dāng)被觀察者狀態(tài)發(fā)送變化(比如職位空缺)就會通知(notify)觀察者,前提是Observers注冊到Subject里,也就是香蕉公司的HR得有你的電話號碼。
發(fā)布/訂閱模式
在觀察者模式中的Subject就像一個發(fā)布者(Publisher),而觀察者(Observer)完全可以看作一個訂閱者(Subscriber)。subject通知觀察者時,就像一個發(fā)布者通知他的訂閱者。這也就是為什么很多書和文章使用“發(fā)布-訂閱”概念來解釋觀察者設(shè)計(jì)模式。但是這里還有另外一個流行的模式叫做發(fā)布-訂閱設(shè)計(jì)模式。它的概念和觀察者模式非常類似。最大的區(qū)別是:
在發(fā)布-訂閱模式,消息的發(fā)送方,叫做發(fā)布者(publishers),消息不會直接發(fā)送給特定的接收者(訂閱者)。
意思就是發(fā)布者和訂閱者不知道對方的存在。需要一個第三方組件,叫做消息中間件,它將訂閱者和發(fā)布者串聯(lián)起來,它過濾和分配所有輸入的消息。換句話說,發(fā)布/訂閱模式用來處理不同系統(tǒng)組件的信息交流,即使這些組件不知道對方的存在。
我們設(shè)計(jì)kafka,是希望它能成為統(tǒng)一的平臺來處理大公司可能擁有的所有實(shí)時數(shù)據(jù)流。要做到這一點(diǎn),我們必須考慮相當(dāng)廣的用例(use case)。
它需要擁有高吞吐量來支持大容量事件流,如實(shí)時日志聚合(real-time log aggregation)。
它需要優(yōu)雅地處理大量的數(shù)據(jù)備份,用于支持離線系統(tǒng)的周期性數(shù)據(jù)負(fù)載。
它需要處理低延遲的傳遞,用于支持傳統(tǒng)的消息傳遞系統(tǒng)用例。
我們想它是分區(qū)、分布式、實(shí)時處理信息流,以創(chuàng)建新的信息流和傳輸信息流。這些動機(jī)造就了kafka的分區(qū)和消費(fèi)者模型。
最后有可能數(shù)據(jù)流被輸入到其他數(shù)據(jù)系統(tǒng)中,而這些系統(tǒng)需要對外提供服務(wù),所以kafka需要有能力保證容錯性,哪怕存在有機(jī)器宕機(jī)。
為了支持上述這些,我們設(shè)計(jì)了一些獨(dú)特元素,更類似于數(shù)據(jù)庫日志,而不是傳統(tǒng)的消息傳遞系統(tǒng)。
我們將在下面部分中概述設(shè)計(jì)中的一些元素。
持久化(Persistence) 別害怕文件系統(tǒng)kafka重度依賴文件系統(tǒng),用文件系統(tǒng)來存儲和緩存消息。人們都由這感覺“硬盤很慢”,以致于大家懷疑一個持久化架構(gòu)是否能具有競爭力的性能。實(shí)際上硬盤它很快也很慢,這取決于我們怎么去使用它。一個合理的硬盤架構(gòu)通常可以和網(wǎng)絡(luò)一樣快。(看來作者的網(wǎng)速都很快)。
硬盤性能的關(guān)鍵是,磁盤驅(qū)動器的吞吐量與過去十年的硬盤搜索的延遲有所不同。因此在6×7200rpm SATA RAID-5陣列的JBOD配置上的線性寫的性能大約為600MB/秒,但隨機(jī)寫入的性能僅為100k/秒,即超過6000倍的差別。這些線性讀寫是所有使用模式中最可預(yù)測的,并且由操作系統(tǒng)進(jìn)行了大量優(yōu)化?,F(xiàn)代操作系統(tǒng)都提供了預(yù)讀取(read-ahead)和后寫(write-behind)操作的技術(shù),這些支持多次讀取到一個大塊中和合并小的邏輯寫形成一個大的物理寫。這問題更深入的討論可以在這找到 ACM Queue article,他們確實(shí)發(fā)現(xiàn)順序硬盤讀寫在某些情況下比隨機(jī)內(nèi)存訪問還快。
為了彌補(bǔ)這些性能差異,現(xiàn)代操作系統(tǒng)越來越著重使用主存來做磁盤緩存?,F(xiàn)代操作系統(tǒng)很樂意將空余內(nèi)存轉(zhuǎn)移到磁盤緩存中,但這需要承受在內(nèi)存被回收時帶來的一點(diǎn)點(diǎn)的性能損失。所有硬盤讀寫都通過這統(tǒng)一的緩存(磁盤緩存)。如果沒有直接IO,這特性并沒有那么容易被拋棄。因此即使一個進(jìn)場維護(hù)自己數(shù)據(jù)緩存時,這些數(shù)據(jù)將會在OS的頁緩存里復(fù)制兩份,兩次高效地存儲所有東西。
此外,我們是在JVM基礎(chǔ)上建立的,任何一位有花時間去研究Java內(nèi)存的使用,都會知道以下兩件事情:
1.對象的內(nèi)存開銷非常高,通常會使要存儲的數(shù)據(jù)的大小增大一倍(甚至更多)。
2.隨著堆內(nèi)存的增加,Java垃圾收集會變得越來越繁瑣和緩慢。
也正是使用文件系統(tǒng)和依賴頁緩存(pagecache)帶來的結(jié)果優(yōu)于維護(hù)一個內(nèi)存中的緩存(in-memory cache)或是其他結(jié)構(gòu),通過對所有空閑內(nèi)存進(jìn)行自動訪問,我們至少可以將可用緩存加倍,并且還可以繼續(xù)加倍,通過存儲緊湊的字節(jié)結(jié)構(gòu)而不是單個對象。這樣做的話可以在32GB的機(jī)器上使用28-30GB緩存,而不用擔(dān)心GC問題。而且,即使服務(wù)重啟,這些數(shù)據(jù)也保持熱度,對比起來,進(jìn)程內(nèi)存中的緩存在重啟后需要重建(對于10GB的緩存可能需要10分鐘),否則它需要從一個完全冷的緩存開始(這可能意味更糟糕的初始化性能)。這也極大地簡化了代碼,因?yàn)樵诰彺婧臀募到y(tǒng)之間保持一致性的所有邏輯現(xiàn)在都在操作系統(tǒng)中,這比一次性在進(jìn)程內(nèi)嘗試更有效、更正確。如果您的磁盤使用傾向于線性讀取,那么預(yù)讀取將有效地預(yù)操作這些緩存。
這表明了一個非常簡單的設(shè)計(jì):在我們耗盡空間的時候,與其保持盡可能多的內(nèi)存并將其全部清空到文件系統(tǒng),不如反過來,數(shù)據(jù)都是被立即寫入到文件系統(tǒng)上的持久日志中,而不必刷新到磁盤。實(shí)際上,這僅僅意味著它被轉(zhuǎn)移到內(nèi)核的頁緩存中。
以頁緩存為核心的設(shè)計(jì),在這里文章里有被描述,此文章是Varnish的設(shè)計(jì)。
在消息傳遞系統(tǒng)里的持久化數(shù)據(jù)結(jié)構(gòu)通常是一個消費(fèi)者隊(duì)列關(guān)聯(lián)著一棵BTree或者其他通用的隨機(jī)訪問數(shù)據(jù)結(jié)構(gòu)來維護(hù)消息的元數(shù)據(jù)。BTree是一個萬能的數(shù)據(jù)結(jié)構(gòu),可以在消息傳遞系統(tǒng)中支持各種事務(wù)和非事務(wù)性的語義。但它帶來相當(dāng)高的成本:BTree操作是O(log N)。通常O(log N)本質(zhì)上被認(rèn)為是等于常量時間,但對于硬盤操作則并不是這樣。磁盤尋軌達(dá)到10ms,并且每個磁盤一次只能執(zhí)行一次尋軌,所以并行性是有限的。因此,即使是少量的磁盤尋軌也會導(dǎo)致很高的開銷。由于存儲系統(tǒng)將非??斓木彺娌僮髋c非常慢的物理磁盤操作混合在一起,因此當(dāng)在緩存固定時,數(shù)據(jù)增加時,樹結(jié)構(gòu)的性能通常是超線性的。數(shù)據(jù)加倍則會使速度慢兩倍以上。
直觀上,一個持久的隊(duì)列可以建立在簡單的讀取和追加的形式,這通常也是日志解決方案使用的。這結(jié)構(gòu)有這樣的好處,所有操作都是O(1),并且讀操作不會阻塞寫和讀的操作。這是具有明顯的優(yōu)勢,是因?yàn)樾阅芡耆c數(shù)據(jù)量大小解耦了,一個服務(wù)現(xiàn)在可以充分利用那些大量的,且便宜,低轉(zhuǎn)速的SATA驅(qū)動器。雖然硬盤的尋軌性能差,但它們的大型讀和寫的性能還是可以接受的,而且還是三分之一的價(jià)格就有三倍的容量。
在沒有任何性能懲罰的情況下訪問幾乎無限的磁盤空間意味著我們可以提供一些在消息傳遞系統(tǒng)中不常見的特性。例如,在kafka中,我們可以在相對較長的時間內(nèi)保留消息(比如一個星期),而不是每次消費(fèi)完就刪除消息。這將給消費(fèi)者帶來很大的靈活性。
我們在效率方面付出大量的努力。我們最初用例中的一個是處理網(wǎng)站活動數(shù)據(jù),這可以是非常大量的數(shù)據(jù):每個頁面的訪問都會產(chǎn)生許多寫操作。此外,我們假設(shè)每條消息至少被一個消費(fèi)者讀?。ㄍǔJ呛芏嘞M(fèi)者),因此我們努力讓消費(fèi)盡可能的便宜。
我們還發(fā)現(xiàn),經(jīng)歷過構(gòu)建和運(yùn)行多個類似的系統(tǒng),有效的多租戶業(yè)務(wù)的關(guān)鍵是效率。
我們在前面章節(jié)討論過硬盤的效率。一旦消除了糟糕的磁盤訪問模式,在這種類型的系統(tǒng)中有兩個常見的低效原因:太多小的I/O操作和過度的字節(jié)復(fù)制。
這小IO問題發(fā)生在客戶端和服務(wù)器之間,和服務(wù)器自身的持久化操作中。
為了避免這種情況,我們的協(xié)議是圍繞一個“消息集(message set)”抽象構(gòu)建的,該抽象可以自然地將消息分組在一起。這允許網(wǎng)絡(luò)請求將消息分組,并分?jǐn)偩W(wǎng)絡(luò)往返的開銷,而不是一次發(fā)送一條消息。服務(wù)器依次將大量的消息追加到其日志中,而消費(fèi)者一次獲取大量的線性塊。
這個簡單的優(yōu)化產(chǎn)生數(shù)量級的加速。批處理導(dǎo)致了更大的網(wǎng)絡(luò)數(shù)據(jù)包、更大的順序磁盤操作、連續(xù)的內(nèi)存塊等等,所有這些都使得Kafka可以將隨機(jī)消息寫入的流變成 線性的寫 流給消費(fèi)者。
另一個低效率的是字節(jié)復(fù)制。在低消息率下,這不是一個問題,但在負(fù)載下的影響是顯著的。為了避免這種情況,我們采用了一種標(biāo)準(zhǔn)化的二進(jìn)制消息格式,由生產(chǎn)者、代理和消費(fèi)者共享(因此數(shù)據(jù)塊可以在不進(jìn)行修改的情況下傳輸)。
broker維護(hù)的消息日志本身就是一個文件目錄,每個文件都由一個以生產(chǎn)者和消費(fèi)者使用的相同格式寫入磁盤的消息集的序列填充。保持這種通用格式可以優(yōu)化最重要的操作:持久日志塊的網(wǎng)絡(luò)傳輸?,F(xiàn)代unix操作系統(tǒng)為將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)教捉幼痔峁┝烁叨葍?yōu)化的代碼路徑;在Linux中,這是通過sendfile的系統(tǒng)調(diào)用完成的。
要了解sendfile的作用,首先最重要先理解將數(shù)據(jù)從文件傳輸?shù)教捉幼值墓矓?shù)據(jù)路徑:
1.操作系統(tǒng)從磁盤讀取數(shù)據(jù)到內(nèi)核空間的頁緩存。
2.應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀取到用戶空間緩沖區(qū)中。
3.應(yīng)用程序?qū)?shù)據(jù)返回到內(nèi)核空間,并將其寫入套接字緩沖區(qū)。
4.操作系統(tǒng)將數(shù)據(jù)從套接字緩沖區(qū)復(fù)制到通過網(wǎng)絡(luò)發(fā)送的NIC緩沖區(qū)。
有4次復(fù)制,兩次系統(tǒng)內(nèi)核調(diào)用,這樣的效率當(dāng)然就低下。使用sendfile,通過允許操作系統(tǒng)直接將數(shù)據(jù)從頁緩存發(fā)送到網(wǎng)絡(luò),避免了重復(fù)復(fù)制。因此在這個優(yōu)化的路徑中,只需要最后的復(fù)制,一次從磁盤復(fù)制到NIC緩沖區(qū)即可。——零拷貝(zero-copy)
我們期望一個常見的用例是在一個主題上有多個使用者。使用上述的零拷貝優(yōu)化,數(shù)據(jù)被完全復(fù)制到頁緩存中,并在每次讀取時重復(fù)使用,而不是存儲在內(nèi)存中并在每次讀取時將其復(fù)制到用戶空間。這就允許以接近網(wǎng)絡(luò)連接的極限的速率來讀取消息。
頁緩存和sendfile的組合意味著,在一個Kafka集群上,在有消費(fèi)者的機(jī)子上,您將看到磁盤上沒有任何讀取活動,因?yàn)樗鼈儗⑼耆珡木彺嬷刑峁?shù)據(jù)。
更多Java支持的sendfile和零拷貝,請點(diǎn)擊這里。
在某性情況下,事實(shí)上真正的瓶頸不是CPU也不是硬盤,而是網(wǎng)絡(luò)帶寬。對于需要在廣域網(wǎng)上的數(shù)據(jù)中心之間發(fā)送消息的數(shù)據(jù)管道來說,尤其如此。當(dāng)然,用戶自己可以壓縮消息而不需要kafka的支持。但這可能導(dǎo)致非常差的壓縮比,特別是當(dāng)消息的冗余字段很多(如JSON里的字段名和網(wǎng)站日志里的user agent或公共字符串)。高效的壓縮需要多個消息壓縮在一起,而不是每個消息獨(dú)立壓縮。
Kafka用高效的批處理格式支持這一點(diǎn)??梢詫⒁慌⒕酆系揭黄饓嚎s,并以這種形式發(fā)送到服務(wù)器。這批消息將以壓縮的形式寫入,并且將在日志中保持壓縮,并且只會被使用者解壓。
Kafka支持GZIP、Snappy和LZ4壓縮協(xié)議。關(guān)于壓縮的更多細(xì)節(jié)可以在這里找到。
生產(chǎn)者直接發(fā)送數(shù)據(jù)到broker,不需要任何的中間路由層,而接受的broker是該分區(qū)的leader。為了幫助生產(chǎn)者實(shí)現(xiàn)這一點(diǎn),所有Kafka節(jié)點(diǎn)都可以回答關(guān)于哪些是可用服務(wù)器的元數(shù)據(jù)的請求,以及在任何給定的時間內(nèi),某個主題的分區(qū)的leader是否允許生產(chǎn)者適當(dāng)?shù)匕l(fā)送它的請求。
由客戶端控制它想往哪個分區(qū)生產(chǎn)消息。這可以隨機(jī)地進(jìn)行,實(shí)現(xiàn)一種隨機(jī)的負(fù)載平衡,或者可以通過一些語義分區(qū)函數(shù)來實(shí)現(xiàn)。我們提供了語義分區(qū)的接口,允許用戶指定一個分區(qū)的key,并使用這個key來做hash到一個分區(qū)(如果需要的話,也是可以復(fù)寫這分區(qū)功能的)。例如,我們選擇user的id作為可用,則所以該用戶的信息都會發(fā)送到同樣的分區(qū)。這反過來又會讓消費(fèi)者對他們的消費(fèi)產(chǎn)生局部性的假設(shè)。這種明確設(shè)計(jì)的分區(qū),允許消費(fèi)者自己本地的處理。
批處理是效率的主要驅(qū)動因素之一,為了能夠批處理,kafka的生產(chǎn)者會嘗試在內(nèi)存中積累數(shù)據(jù),然后在一起在一個請求中以大批量的形式發(fā)送出去。批處理這個可以設(shè)置按固定的消息數(shù)量或按特定的延遲(64k或10ms)。這允許累積更多字節(jié)的發(fā)送出去,這樣只是在服務(wù)器上做少量的大IO操作。這種緩沖是可配置的,這樣提供了一種機(jī)制來以額外的延遲來提高吞吐量。
具體的配置)和生產(chǎn)者的api可以在這文檔中找到。
kafka消費(fèi)者的工作方式是,向其想消費(fèi)的分區(qū)的leader發(fā)送“fetch”請求。在每個請求中消費(fèi)者指定日志的偏移量,然后接受回一大塊從偏移量開始的日志。因此,消費(fèi)者對position有重要的控制權(quán),如果需要,可以重置position來重新消費(fèi)數(shù)據(jù)。
Push和pull我們首先考慮的一個問題是,消費(fèi)者應(yīng)該是從broker拉取消息,還是應(yīng)該是broker把消息推送給消費(fèi)者。在這方面,kafka遵循了一種更傳統(tǒng)的設(shè)計(jì),大多數(shù)消息傳遞系統(tǒng)也會用的,那就是數(shù)據(jù)是從生產(chǎn)者push到broker,消費(fèi)者是從broker拉取數(shù)據(jù)。一些日志集中系統(tǒng),如Scribe和Apache Flume,遵循一個非常不同的,基于推送的路徑,將數(shù)據(jù)被推到下游。這兩種方法都由利弊,在基于推送的系統(tǒng),由于是broker得控制數(shù)據(jù)傳輸?shù)乃俾?,不同消費(fèi)者可能要不同的速率。然而消費(fèi)者一般的目的都是讓消費(fèi)者自己能夠以最大的速度進(jìn)行消費(fèi),但在基于push的系統(tǒng),當(dāng)消費(fèi)速率低于生產(chǎn)效率時,消費(fèi)者就不知道該怎么辦好了(本質(zhì)上就是一種拒絕服務(wù)攻擊(DOS))。一個基于pull的系統(tǒng)就擁有很好的熟悉,消費(fèi)者可以簡單的調(diào)控速率。
基于pull的系統(tǒng)的另一個優(yōu)點(diǎn)是,它可以對發(fā)送給消費(fèi)者的數(shù)據(jù)進(jìn)行聚合的批處理?;谕扑偷南到y(tǒng)必須選擇立即發(fā)送請求或積累更多數(shù)據(jù),然后在不知道下游用戶是否能夠立即處理它的情況下發(fā)送它。如果對低延遲進(jìn)行調(diào)優(yōu),這將導(dǎo)致僅在傳輸結(jié)束時發(fā)送一條消息,最終將被緩沖,這是浪費(fèi)?;趐ull的設(shè)計(jì)解決了這個問題,因?yàn)橛脩艨偸窃谌罩镜漠?dāng)前位置(或者是一些可配置的最大大小)之后提取所有可用的消息。因此,我們可以在不引入不必要的延遲的情況下獲得最佳的批處理。
基于pull的系統(tǒng)的缺點(diǎn)是,如果broker沒數(shù)據(jù),則消費(fèi)者可能會不停的輪訓(xùn)。為了避免這一點(diǎn),我們在pull請求上提供了參數(shù),允許消費(fèi)者在“長輪訓(xùn)”中阻塞,直到數(shù)據(jù)達(dá)到(并且可以選擇等待,直到一定數(shù)量的自己可以,確保傳輸?shù)拇笮。?
你可能詳細(xì)其他可能的設(shè)計(jì),如只有pull,點(diǎn)到點(diǎn)。生產(chǎn)者會將本地的日志寫到本地日志中,而broker則會從這些日志中拉取數(shù)據(jù)。通常還會提出類似的“存儲轉(zhuǎn)發(fā)(store-and-forward)”生產(chǎn)者。這很有趣,但是我們覺得不太適合我們的目標(biāo)用例:它有成千上萬的生產(chǎn)者。我們在大規(guī)模上運(yùn)行持久數(shù)據(jù)系統(tǒng)的經(jīng)驗(yàn)使我們覺得,在許多應(yīng)用程序中涉及到數(shù)千個磁盤,實(shí)際上并不會使事情變得更可靠,而且操作起來也會是一場噩夢。在實(shí)踐中,我們發(fā)現(xiàn),我們可以在不需要生產(chǎn)者持久化的情況下,以大規(guī)模的SLAs來運(yùn)行管道。
消費(fèi)者的Position(Consumer Position)令人驚訝的是,跟蹤所使用的內(nèi)容是消息傳遞系統(tǒng)的關(guān)鍵性能點(diǎn)之一。
很多消息傳遞系統(tǒng)在broker中保存了關(guān)于什么消息是被消費(fèi)了的元數(shù)據(jù)。也就是說,當(dāng)消息傳遞給消費(fèi)者時,broker要么立即記錄信息到本地,要么就是等待消費(fèi)者的確認(rèn)。這是一個相當(dāng)直觀的選擇,而且對于一臺機(jī)器服務(wù)器來說,很清楚地知道這些消息的狀態(tài)。由于許多消息傳遞系統(tǒng)中用于存儲的數(shù)據(jù)結(jié)構(gòu)都很糟糕,因此這(記錄消息狀態(tài))也是一個實(shí)用的選擇——因?yàn)閎roker知道什么是已經(jīng)被消費(fèi)的,所以可以立即刪除它,保持?jǐn)?shù)據(jù)的大小。
讓broker和消費(fèi)者就已經(jīng)消費(fèi)的東西達(dá)成一致,這可不是小問題。如果一條消息發(fā)送到網(wǎng)絡(luò)上,broker就把它置為已消費(fèi),但消費(fèi)者可能處理這條消息失敗了(或許是消費(fèi)者掛了,也或許是請求超時等),這條消息就會丟失了。為了解決這個問題,很多消息傳遞系統(tǒng)增加了確認(rèn)機(jī)制。當(dāng)消息被發(fā)送時,是被標(biāo)志為已發(fā)送,而不是已消費(fèi);這是broker等待消費(fèi)者發(fā)來特定的確認(rèn)信息,則將消息置為已消費(fèi)。這個策略雖然解決了消息丟失的問題,但卻帶來了新的問題。第一,如果消費(fèi)者在發(fā)送確認(rèn)信息之前,在處理完消息之后,消費(fèi)者掛了,則會導(dǎo)致此消息會被處理兩次。第二個問題是關(guān)于性能,broker必須保存每個消息的不同狀態(tài)(首先先鎖住消息以致于不會讓它發(fā)送第二次,其次標(biāo)志位已消費(fèi)從而可以刪除它)。還有些棘手的問題要處理。如消息被發(fā)送出去,但其確認(rèn)信息一直沒返回。
kafka處理則不一樣。我們的主題被分為一個有序分區(qū)的集合,且每個分區(qū)在任何給定的時間內(nèi)只會被訂閱它的消費(fèi)者組中的一個消費(fèi)者給使用。這意味著每個分區(qū)中的消費(fèi)者的position僅僅是一個整數(shù),這是下一次消費(fèi)時,消息的偏移量。這使?fàn)顟B(tài)(記錄是否被消費(fèi))非常小,每個分區(qū)只有一個數(shù)字。這個狀態(tài)可以被定期檢查。這樣確認(rèn)一條消息是否被消費(fèi)的成本就很低。
這樣還附加了一個好處。消費(fèi)者可以重置其最先的position從而重新消費(fèi)數(shù)據(jù)。這雖然違反了隊(duì)列的公共契約,但它卻變成關(guān)鍵功能給許多消費(fèi)者。例如,如果消費(fèi)者代碼有一個bug,并且在一些消息被消費(fèi)后才被發(fā)現(xiàn),那么當(dāng)bug被修復(fù)后,消費(fèi)者就可以重新使用這些消息。
離線數(shù)據(jù)加載(Offline Data Load)可擴(kuò)展持久化允許只有周期性地使用批量數(shù)據(jù)的消費(fèi)者的可能性,比如定期將批量數(shù)據(jù)加載到離線系統(tǒng)(如Hadoop或關(guān)系數(shù)據(jù)倉庫)。
消息傳遞語義(Message Delivery Semantics)現(xiàn)在我們已經(jīng)了解了些生產(chǎn)者和消費(fèi)者是怎么工作的,接下來我們說下kafka提供給生產(chǎn)者和消費(fèi)者的語義保證。很明顯這里提供了以下幾種消息傳遞保證機(jī)制:
至多一次(At most once),這樣消息可能會丟失,但永遠(yuǎn)不會重新傳遞。
至少一次(At least once),這樣消息不可能會丟失,但可能會重新傳遞。
有且僅有一次(Exactly once),這是大家想要的,每個消息會被傳遞一次,而且也僅僅只有一次。
值得注意的是,這可以歸結(jié)為兩個問題:發(fā)布消息的持久化保證,以及在消費(fèi)消息時的保證。
很多系統(tǒng)聲稱提供“有且僅有一次”的傳遞語義,但閱讀這些細(xì)節(jié)時,會發(fā)現(xiàn)其中大部分都是誤導(dǎo)(他們不理解消費(fèi)者或生產(chǎn)者可能掛掉的情況,那些有多個消費(fèi)者處理的情況,或者是那些被寫入磁盤的數(shù)據(jù)可能丟失的情況)。
kafka的語義很直接。在發(fā)布消息時,我們將消息“提交”到log中。一旦發(fā)布的消息被提交,只要有一個broker復(fù)制這個消息被寫入活動分區(qū),它就不會丟失。提交的消息的定義、活動分區(qū)以及我們試圖處理的失敗的類型的描述將在下一節(jié)(副本)中詳細(xì)描述?,F(xiàn)在我們假設(shè)在完美的情況下,現(xiàn)在讓我們假設(shè)一個完美的、無損的broker,和嘗試?yán)斫鈱ιa(chǎn)者和消費(fèi)者的保證。如果一個生產(chǎn)者試圖發(fā)布消息并經(jīng)歷一個網(wǎng)絡(luò)錯誤,那么就不能確定該錯誤發(fā)生在消息提交之前還是之后。這類似于插入到一個數(shù)據(jù)庫表的自動生成的主鍵的語義。
在0.11.0.0版本之前,如果一個生產(chǎn)者沒有收到一個消息已經(jīng)提交的響應(yīng),那么它幾乎沒有選擇,只能重新發(fā)送消息。這提供了“至少一次”的傳遞語義,因?yàn)槿绻颊埱髮?shí)際上成功了,那么在重新發(fā)送期間,消息可能再次被寫入到日志中。從0.11.0.0開始,Kafka生產(chǎn)者也支持一個冪傳遞的選項(xiàng),該選項(xiàng)保證重新發(fā)送不會導(dǎo)致日志中有這重復(fù)的消息。為了實(shí)現(xiàn)這一目標(biāo),broker為每個生產(chǎn)者分配一個ID,并使用由生產(chǎn)者發(fā)送消息時一起把序列號發(fā)送到broker,這樣broker就可以根據(jù)序列和id來處理重復(fù)的消息。同樣,從0.11.0.0開始,生產(chǎn)者支持使用類似于事務(wù)的語義向多個主題分區(qū)發(fā)送消息:即所有消息都已成功寫入或都失敗寫入。這種情況的主要應(yīng)用場景是在Kafka主題之間進(jìn)行“有且僅有一次”的處理(如下所述)。
并非所有的用例都需要這樣強(qiáng)的保證。對于延遲敏感的使用,我們允許生產(chǎn)者指定它需要的持久化級別。如果生產(chǎn)者指定要等待消息被提交要在10ms完成。則生產(chǎn)者可以指定它異步地執(zhí)行發(fā)送,或者等待直到leader(但不一定是follower)得到消息。
現(xiàn)在我們描述下消費(fèi)者視角下的語義。所有的副本都有相同的日志和相同的偏移量。消費(fèi)者控制它在這個日志中的position。如果消費(fèi)者從未崩潰,它可以將這個position存儲在內(nèi)存中,但是如果消費(fèi)者崩潰了,我們希望這個主題的分區(qū)來接替這個position的處理,那么新的進(jìn)程將需要選擇一個合適的position來開始處理。
消費(fèi)者讀取消息時,有幾個處理消息和更新其位置的選項(xiàng)。
第二種是它先讀取消息,然后將position保存到日志中,最后是處理消息。在這種情況下,在保存其position之后,在保存處理消息產(chǎn)生的輸出之前,消費(fèi)者進(jìn)程可能會崩潰。在這種情況下,接手處理的過程將從保存的position開始,即使在此position之前的一些消息未被處理。這是對應(yīng)著“至多一次”的語義,失敗的消息可能不被處理。
第二種是它先讀取消息,然后處理消息,最后保存position到日志中。在這種情況下,在處理消息后,消費(fèi)者進(jìn)程可能會崩潰,但是在它保存它的position之前崩潰的。在這種情況下,當(dāng)新進(jìn)程接手了它接收到的最初幾條消息時,或許這幾條消息就已經(jīng)被處理過了。在消費(fèi)者崩潰的情況下,這相當(dāng)于“至少一次”的語義。在許多情況下,消息有主鍵,因此更新是冪等的(接收相同的消息兩次,只是用另一個副本重寫了一個記錄)。
那“有且僅有一次”的語義怎樣(或者是說你到底想要什么)?從kafka主題中獲取消息處理后發(fā)布到其他主題(如一個Kafka Streams應(yīng)用),我們可以利用上面提到的版本0.11.0.0里的新事務(wù)生產(chǎn)者的功能。消費(fèi)者的position被當(dāng)做一個消息存儲在一個主題,因此我們可以在與接收處理數(shù)據(jù)的輸出主題相同的事務(wù)中寫入kafka的偏移量。 如果事務(wù)被中止,消費(fèi)者的position將恢復(fù)到原來的值,而輸出主題的生成數(shù)據(jù)將不會被其他消費(fèi)者看到,這取決于他們的“隔離級別”。在默認(rèn)的“read_uncommitted”隔離級別中,所有消息對消費(fèi)者都是可見的,即使它們是被中止的事務(wù)的一部分,但是在“read_committed”中,使用者只會從提交的事務(wù)中返回消息(以及任何不屬于事務(wù)的消息)。
當(dāng)寫入外部系統(tǒng)時,限制是在需要協(xié)調(diào)消費(fèi)者的position和實(shí)際存儲的輸出。實(shí)現(xiàn)這一目標(biāo)的經(jīng)典方法是在存儲消費(fèi)者position和存儲消費(fèi)者輸出之間引入兩階段提交。但這可以更簡單地處理,并且通常通過讓消費(fèi)者將其偏移量存儲在與輸出相同的位置。這樣做比較好,因?yàn)橄M(fèi)者可能想要寫入的輸出系統(tǒng)都不支持兩階段提交。作為一個例子,考慮一個Kafka Connect連接器,它在HDFS中填充數(shù)據(jù),以及它讀取的數(shù)據(jù)的偏移量,從而保證數(shù)據(jù)和偏移量都得到了更新,或者兩者都不更新。對于需要這些更強(qiáng)語義的其他許多數(shù)據(jù)系統(tǒng),我們遵循類似的模式是為了那些需要強(qiáng)一致性語義的系統(tǒng),還為了這些消息沒有主鍵來允許刪除重復(fù)數(shù)據(jù)。
因此kafka為了kafka Streams,高效地支持“有且僅有一次”的傳遞,并且在Kafka主題之間傳輸和處理數(shù)據(jù)時,通??梢允褂檬聞?wù)生產(chǎn)者/消費(fèi)者提供“有且僅有一次”的傳遞。對于其他目標(biāo)系統(tǒng)的“有且僅有一次”的傳遞一般需要協(xié)調(diào),但kafka提供了偏移量,它可以實(shí)現(xiàn)這要求(參見Kafka Connect)。否則,缺省情況下Kafka保證“至少一次”傳遞,并且允許用戶禁止生產(chǎn)者的重試或消費(fèi)者在處理數(shù)據(jù)之前提交position,從而實(shí)現(xiàn)“至多一次”的專遞。
副本(Replication)Kafka通過一個可配置的服務(wù)器數(shù)量對每個主題的分區(qū)進(jìn)行復(fù)制日志(你您可以按主題設(shè)置此副本因子(replication factor))。這允許在集群中的服務(wù)器發(fā)生故障時自動恢復(fù),因此當(dāng)在出現(xiàn)故障時仍然可以使用消息。
其他消息傳遞系統(tǒng)提供了副本相關(guān)的特性,但,我們認(rèn)為,這似乎是一種策略而已,并沒有大量的使用,而且還有個很大的缺點(diǎn):slave是未被用上的,吞吐量受到嚴(yán)重的影響,恢復(fù)還需要繁瑣的人工配置,等等。kafka默認(rèn)是使用了副本功能,實(shí)際上那些副本因子設(shè)置為1的主題,我們也會當(dāng)做是使用副本功能的主題。
副本的最小單元是主題的分區(qū)。在沒有失敗的情況下,kafka的每個分區(qū)都是有一個leader,其follower可以為零個或多個。包括leader在內(nèi)的副本數(shù)量就是副本因子。所有讀和寫都是通過leader分區(qū)。通常情況,分區(qū)的數(shù)據(jù)量是多個broker,leader的數(shù)量時平均分配當(dāng)每個broker。follower的日志和leader的日志是完全相同的——它們都具有相同的偏移量和相同順序的消息(當(dāng)然,在任何給定的時刻,在日志的末尾可能會有一些還未同步到的消息)。
follower也跟kafka的普通消費(fèi)者一樣從leader消費(fèi)消息。follower從leader拉消息時,有個很好的特性,那就時可以讓follower很容易地批量把日志應(yīng)用到其(follower)日志中。
跟很多 分布式系統(tǒng)處理自動恢復(fù) 一樣,對于節(jié)點(diǎn)是否“存活(alive)”需要有一個明確的定義。對于kafka,節(jié)點(diǎn)存活有以下兩個條件:
1.節(jié)點(diǎn)必須維護(hù)它與ZooKeeper的session(通過ZooKeeper的心跳機(jī)制)
2.如果是slave,就必須復(fù)制leader,而且不能落后太遠(yuǎn)。
滿足上述兩個條件的節(jié)點(diǎn),我們更愿意叫“已同步(in sync)”而不是模糊不清的“存活”或“失敗”。leader保持跟蹤這些“已同步”的節(jié)點(diǎn)。如果follower掛了,或者卡住了,或者落后太遠(yuǎn)了,leader會講起從已同步的副本名單中移除。是有e replica.lag.time.max.ms這配置去控制卡住多長時間和落后多少副本數(shù)量。
在分布式系統(tǒng)術(shù)語中,我們只嘗試處理一個“失敗/恢復(fù)”模型,即節(jié)點(diǎn)突然停止工作,然后恢復(fù)(可能不知道它們已經(jīng)死亡)。kafka沒有處理所謂的“拜占庭式”的失敗,即節(jié)點(diǎn)產(chǎn)生任意或惡意的響應(yīng)(可能是由于某些錯誤)。
現(xiàn)在,我們可以更精確地定義一個消息的提交,當(dāng)所有副本都同步到分區(qū),分區(qū)并且應(yīng)用到其日志中時,就會被認(rèn)為是提交的。只有提交的消息才會分發(fā)給消費(fèi)者。這就意味著消費(fèi)者不用擔(dān)心當(dāng)leader崩潰時,消息會丟失。另一方面,生產(chǎn)者可以選擇等待消息提交或不提交,這取決與它們對延遲和持久化之間的權(quán)衡。生產(chǎn)者可以使用acks這配置來控制這權(quán)衡。注意,這“最小數(shù)據(jù)量(minimun number)”同步副本的數(shù)量設(shè)置,是指當(dāng)消息都同步到所有副本后,kafka再去檢查時,檢查的最小數(shù)量。如果生產(chǎn)者對確認(rèn)要求不太嚴(yán)格,則消息一發(fā)布就可以被使用了,即使同步副本數(shù)量還沒達(dá)到最小值。(這最小值可以低到只有一個,那就是leader)。
kafka保證消息不會丟,只要任何時候至少有一個已同步的副本存在。
kafka可以在節(jié)點(diǎn)故障的情況下可用。但存在網(wǎng)絡(luò)分區(qū)時,就可能無法使用了。
分區(qū)就是一個副本日志。副本日志是分布式數(shù)據(jù)系統(tǒng)的最基本的原語(primitvie)之一,而且有很多種實(shí)現(xiàn)方式。其他系統(tǒng)可以使用副本日志作為一種原語,用于在狀態(tài)機(jī)形式的分布式系統(tǒng)。
對于一系列值的順序達(dá)成一致的過程(通常編號為0、1、2、…),副本日志就是將其模型化。有很多方法可以實(shí)現(xiàn)這一點(diǎn),但最簡單和最快的是leader來選擇序值。只有l(wèi)eader還存活,所喲follower都只需要復(fù)制值即可,順序由leader決定。
當(dāng)然,如果leader不掛,那我們沒必要要follower。當(dāng)leader崩潰時,我從follower中選擇出新的leader。但follower自己可能落后或崩潰,所以我們必須保證我們選擇的是最新的follower。日志復(fù)制算法必須這最基本的保證時,如果我們告訴客戶端消息已經(jīng)提交了,而此時leader掛了,我們選擇的新leader也必須包含剛剛那個已經(jīng)提交了的消息。這就產(chǎn)生了一個權(quán)衡:如果leader等待過多的follower確認(rèn)消息,This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
如果你指定確認(rèn)的數(shù)量和日志(與leader對比過的)的數(shù)量,這樣就保證有重疊性,那么這就叫法定人數(shù)(Quorums)。
這種權(quán)衡最常見的方法是,在提交決策和leader選舉中使用大多數(shù)投票。這不是kafka做的,但讓我們?nèi)ヌ剿魉?,了解它的利弊。假設(shè)我們有2f+1個副本。如果f+1節(jié)點(diǎn)收到消息,沒有超過f個節(jié)點(diǎn)失敗,則leader就保證所有消息都被提交,我們選擇新leader時也一樣。這是因?yàn)槲覀冊谌我夤?jié)點(diǎn)上選擇f+1個節(jié)點(diǎn),這f+1里必須至少有一個節(jié)點(diǎn)包含所有已提交消息的副本。副本最完整的結(jié)點(diǎn)將會被選中為新leader。這里還有很多算法細(xì)節(jié)需要處理(如明確定義日志的完整性,leader崩潰時怎么保證一致性,修改集群中的服務(wù)器),這些我們先暫時忽略。
多數(shù)投票方法有個非常好的特性:延遲僅僅取決于多臺最快的服務(wù)器。也就是說,如果副本因子時3,那么延遲由最快的一個slave決定,而不是最慢的slave(leader一個、最快的slave一個,這就達(dá)到法定人數(shù)了)。
這個家族有很多算法,包括ZooKeeper的Zab,Raft和Viewstamped副本算法。我們知道的,更接近kafka的用的算法的學(xué)術(shù)出版是來自微軟的PacificA。
多數(shù)投票的不足之處就是,它不需要很多失敗的節(jié)點(diǎn),就可以讓你選擇不到leader。為了容忍一個節(jié)點(diǎn)失敗,則需要3個節(jié)點(diǎn),容忍2個,則需要5個節(jié)點(diǎn)。在我們經(jīng)驗(yàn)里,以為只要剛剛好夠冗余的副本,就能容忍一個節(jié)點(diǎn)的失敗,但這是不實(shí)際的,在5倍硬盤空間(5個硬盤,每個硬盤占1/5吞吐)情況下,每次都要寫5次,這對于大量數(shù)據(jù)的問題時不切實(shí)際的。這也是為什么法定人數(shù)算法比較常用在集群的配置文件如ZooKeeper,而很少用在原數(shù)據(jù)存儲上。例如在HDFS的namenode的高可用是建立在多數(shù)人投票,但這成本很高的算法不會用在它的數(shù)據(jù)存儲上。
Kafka使用了一個稍微不太一樣的方法去選擇法定人數(shù)。kafka動態(tài)的維護(hù)一個ISR(in-sync replicas)集合,集合里面的節(jié)點(diǎn)都是已同步。只有這集合里面的人才適合選舉為leader。只有所有ISR都收到寫入分區(qū),則這分區(qū)的寫入就會被認(rèn)為已提交。這ISR保存在ZooKeeper。對于kafka的使用模型來說,這是一個重要的因素,那里有許多分區(qū),并且確保leader的平衡很重要。ISR模型和f+1副本,一個kafka主題可以容忍f個失?。偣簿蚮+1個節(jié)點(diǎn))。
我們想處理更多的用例,所以這個權(quán)衡我們覺得是合理的。在實(shí)際情況,對于容忍f個節(jié)點(diǎn)失敗,多數(shù)投票和ISR方法都是需要通用數(shù)量的副本確認(rèn)(比如,容忍1個節(jié)點(diǎn)失敗,多數(shù)投票方法則需要3個副本和1個確認(rèn),ISR方法需要2個副本和1個確認(rèn))。確認(rèn)提交而不需要由最慢的節(jié)點(diǎn)來確認(rèn)這是多數(shù)投票方法的好處。但我們覺得這是可以通過由客戶端選擇是否阻塞消息提交,以及控制副本因子(降低)而增加吞吐量和磁盤空間來優(yōu)化這個問題(這問題就是與多數(shù)投票對比)。
另一個重要的設(shè)計(jì)是,kafka不要求崩潰節(jié)點(diǎn)在所有數(shù)據(jù)完整的情況下恢復(fù)。在這個空間中,副本算法依賴于“穩(wěn)定存儲”的存在并不少見,這種“穩(wěn)定存儲”在任何故障恢復(fù)場景中都不能丟失,要保證一致性。這有兩個主要問題。首先,硬盤故障是我們在持久化數(shù)據(jù)系統(tǒng)的實(shí)際操作中最常見的問題,問題發(fā)生后,通常也不會完整地保留數(shù)據(jù)。其次,即使這不是一個問題,我們也不希望在每次寫入時都需要使用fsync,因?yàn)檫@樣會減少兩到三個數(shù)量級的性能。我們允許一個副本重新加入ISR的協(xié)議,這協(xié)議確保在重新加入之前,它必須完全重新同步,即使它在崩潰中丟失了未刷新的數(shù)據(jù)。
注意,Kafka對數(shù)據(jù)不丟失的保證是基于至少一個保持同步的副本。如果一個分區(qū)的副本都丟失了,則無法保證數(shù)據(jù)不丟失。
然而在實(shí)際情況下的系統(tǒng)當(dāng)所有副本掛之后必須做一些合理的事情。如果很不辛遇到這種情況,意識到后面會發(fā)生什么這是很重要。可能會出現(xiàn)以下兩種情況:
1.等待ISR里的所有節(jié)點(diǎn)恢復(fù),并選擇出新的leader(希望這leader還保存著所有的數(shù)據(jù))。
2.選擇第一個副本(不需要是ISR里面的)恢復(fù),作為leader。
以下是可用性和持久化的權(quán)衡。一、如果我們等待所有ISR副本恢復(fù),則我們會等很長的時間。。二、如果副本的數(shù)據(jù)都丟了,則永遠(yuǎn)無法恢復(fù)。最后一個就是,如果一個沒有同步的副本恢復(fù),我們允許它為leader,則認(rèn)為它的日志是最新的,哪怕它沒有包含所有已提交的消息。在0.11.0.0版本里默認(rèn)的選擇第一個權(quán)衡,用等待來換取數(shù)據(jù)的一致性。這個是可以配置的,如果啟動時間比一致性重要,則修改這個 unclean.leader.election.enable。
這個困惑不僅僅kafka有。它存在與任何基于法定人數(shù)算法的場景。例如,在多數(shù)投票的場景,如果你是去大多數(shù)服務(wù)器,在剩余的服務(wù)器,你就必須在兩者選其中一個,不是失去100%的數(shù)據(jù)就是丟失數(shù)據(jù)的一致性。
生產(chǎn)者生成消息時,可以選擇0個,1個或者全部副本確認(rèn)。注意這里的“全部副本確認(rèn)”不能保證所有被分配副本的結(jié)點(diǎn)都能收到消息。默認(rèn)的,當(dāng)acks=all時,只要所有當(dāng)前所有ISR都收到消息,則可以確認(rèn)消息。例如,一個主題被設(shè)置為兩個副本和一個失?。ㄖ挥惺O乱粋€ISR),然后所有acks=all的寫入都會是成功的。如果剩余的副本也失敗,這樣消息就會被丟失。盡管這確保了分區(qū)的最大可用性,但是這種行為可能不適合某些喜歡持久化而不是可用性的用戶。因此,我們提供了兩種頂級的配置,可用于更傾向于消息持久化而不是可用性:
1.關(guān)閉不清晰的leader選舉——如果所有副本變得不可用,直到最近的leader變得可用,所有分區(qū)才可以變得可以用。這有效地避免了消息丟失的風(fēng)險(xiǎn)。請參閱上一節(jié)不清晰的Leader選舉。
2.指定最小的ISR數(shù)量——只有高過這最小數(shù)量,消息才會被確認(rèn),這是為了避免在寫入一個副本時,而且副本掛了,導(dǎo)致消息丟失的風(fēng)險(xiǎn)。這個設(shè)置僅僅在生產(chǎn)者使用acks=all生效或保證消息在這數(shù)量以上的ISR確認(rèn)。這個設(shè)置提供了一致性和高可用的權(quán)衡。ISR最小數(shù)量設(shè)置高一點(diǎn),這樣更好的保證一致性。然而這樣會減少可用性,因?yàn)樵贗SR沒滿足這數(shù)量時,分區(qū)是不可用的。
上訴討論副本,也僅僅是一份日志,也就是主題的一個分區(qū)。然而kafka是管理成千上萬的分區(qū)。我們試圖以循環(huán)(round-robin)方式在集群中平衡分區(qū),以避免在大數(shù)據(jù)量的主題的所有分區(qū)都在少量節(jié)點(diǎn)上。同樣地,我們試圖平衡leader,使每個節(jié)點(diǎn)都是其一定份額分區(qū)的leader。
對ledaer選舉過程進(jìn)行優(yōu)化也很重要,因?yàn)檫@是服務(wù)不可用的窗口期。一個簡單的leader選舉會在一個節(jié)點(diǎn)失敗后,在該節(jié)點(diǎn)內(nèi)所有分區(qū),每個分區(qū)都會舉行一次選舉。相反,我們選擇一個broker作為“controller”。這controller檢測broker層次的失敗,負(fù)責(zé)修改受故障影響的分區(qū)的leader。其結(jié)果是,我們能夠?qū)⒃S多需要的leadr變更批量處理,這使得選舉過程在大量的分區(qū)上變得更加便宜和快速。如果controller失敗了,其中一個存活的節(jié)點(diǎn)會變成新的controller。
日志壓縮保證kafka在每個分區(qū),對于每個key,至少保存其最近的一條消息。這解決了那些需要當(dāng)應(yīng)用或系統(tǒng)崩潰后,重啟時需重新加載數(shù)據(jù)的場景。
到目前位置,我們只討論了簡單的數(shù)據(jù)保存方法,那就是當(dāng)舊日志數(shù)據(jù)超過一定時間或達(dá)到一定大小的時候會被刪除。這個適用于每條相對獨(dú)立的消息,如臨時事件。然而,還有一類很重要的數(shù)據(jù),那就是根據(jù)key修改數(shù)據(jù),一種可變的數(shù)據(jù)(例如在數(shù)據(jù)庫表數(shù)據(jù)的變更那樣)。
我們討論一個具體的例子。一個主題包含了用戶emial信息,每次用戶更新他們的email信息,我們都會發(fā)送消息到topic,是根據(jù)他們的userid做主鍵。以下是我們發(fā)送的消息,userid是123,每條信息都對應(yīng)著一次的email信息修改(省略號是省略其他userid的消息)。
123 => [email protected] . . . 123 => [email protected] . . . 123 => [email protected]
日志壓縮給了我們更細(xì)顆粒度保留數(shù)據(jù)機(jī)制,這樣我們就可以保證只保留每一個key最后的一次變更(如123 => [email protected])。這樣我們保證了日志里都包含了所有key的最后一個值的快照。這就意味著下游的消費(fèi)者可以重建狀態(tài)而不需要保存所有的更變?nèi)罩尽?
讓我們一些日志壓縮有用的場景,然后我們在看看是怎么被使用上。
1.數(shù)據(jù)庫變更訂閱(Database chagne subscription)。我們很常見到一份數(shù)據(jù)集會存在多種數(shù)據(jù)系統(tǒng)里,而且這系統(tǒng)里有一個類似數(shù)據(jù)庫那樣的(如RDBMS或新潮的key-value系統(tǒng))。舉個例子,你有一個數(shù)據(jù)庫、一個緩存、一個搜索集群和一個Hadoop集群。這樣每次數(shù)據(jù)庫的修改,都得映射到那緩存、那搜索集群和最后在Hadoop里。在這個場景里,你只是需要實(shí)時最新更新的日志。但如果需要重新加載進(jìn)緩存或恢復(fù)宕機(jī)的搜索節(jié)點(diǎn),就可能需要完整的數(shù)據(jù)集。
2.事件源(Event sourcing)。這是一種應(yīng)用設(shè)計(jì)風(fēng)格,它將查詢和應(yīng)用設(shè)計(jì)結(jié)合在一起,并使用日志作為程序的主要存儲。
3.高可用日志(Journaling for high-availability)。一個本地計(jì)算的進(jìn)程可以通過變更日志來做到容錯,這樣另一個進(jìn)程就能重新加載這些變更繼續(xù)處理。一個具體的例子就是流式查詢系統(tǒng),如計(jì)數(shù)、匯總和其他“分組”操作。實(shí)時流式處理框架Samza就是使用這功能達(dá)到目的的。
在上述場景中,主要處理實(shí)時的變更,偶爾需要重新加載或重新處理時,能做的就只有重新加載所有數(shù)據(jù)。日志壓縮提供了這兩個功能,處理實(shí)時數(shù)據(jù)變更,和重新加載數(shù)據(jù)。這種使用日志的風(fēng)格,詳情可參看點(diǎn)擊。
這思路很簡單。如果我們保存無窮無盡的日志,保存上述場景中每個變更日志,而且還是一開始就獲取每個系統(tǒng)的狀態(tài)。使用這個完整的日志,我們就可以恢復(fù)到任何一個時間點(diǎn)的狀態(tài)。但這種完整日志的假設(shè)時不切實(shí)際的,因?yàn)閷τ谀切┟恳恍杏涗浂荚谧兏啻蔚南到y(tǒng),即使數(shù)據(jù)很小,日志也會無限的增長下去。那我們就簡單的丟棄舊日志,雖然可以限制空間的增長,但也無法重建狀態(tài)——因?yàn)榕f日志被丟棄,可能一部分記錄的狀態(tài)無法重建。
相對于粗粒度的基于時間的數(shù)據(jù)保留策略,日志壓縮的策略是一種更細(xì)顆粒度,基于每一條記錄保存。這個想法是,有選擇性的刪除那些有多個變更記錄的同樣的key。這樣的日志就保證每個key都至少有一個最新的狀態(tài)。
數(shù)據(jù)保留策略可以為每個主題設(shè)置,所以一個集群里有些主題的保存策略可以設(shè)置為大小和時間來保存數(shù)據(jù),有主題也可以通過壓縮保留。
這個功能的靈感是來自于LinkedIn里最古老且最成功的基礎(chǔ)架構(gòu)——一個被稱為Databus的數(shù)據(jù)庫變更日志緩存系統(tǒng)。
跟大多數(shù)日志結(jié)構(gòu)存儲系統(tǒng)不一樣的時,Kafka是為了訂閱而設(shè)計(jì)的,組織數(shù)據(jù)的形式也是為了更快的線性讀取和寫入。跟Databus不一樣之處是,kafka作為真實(shí)源(source-of-truth)存儲,即使上游數(shù)據(jù)源不具備可重用性的情況下,它還是挺有用的。
不管是傳統(tǒng)的RDBMS還是分布式的NoSQL存儲在數(shù)據(jù)庫中的數(shù)據(jù)總是會更新的,相同key的新記錄更新數(shù)據(jù)的方式簡單來說有兩種:
1.直接更新(找到數(shù)據(jù)庫中的已有位置以最新的值替換舊的值)。
2.追加記錄(保留舊的值,查詢時再合并,或者也有一個后臺線程會定期合并)。
采用追加記錄的做法可以在節(jié)點(diǎn)崩潰時用于恢復(fù)數(shù)據(jù),還有一個好處是寫性能很高,因?yàn)槭蔷€性寫。
以下是各個數(shù)據(jù)系統(tǒng)的更新數(shù)據(jù)方式:
數(shù)據(jù)系統(tǒng) | 更新數(shù)據(jù)追加到哪里 | 數(shù)據(jù)文件 | 是否需要壓縮 |
---|---|---|---|
ZooKeeper | log | snapshot | 不要,因?yàn)閿?shù)據(jù)量不大 |
Redis | aof | rdb | 不需要,因?yàn)槭莾?nèi)存數(shù)據(jù)庫 |
Cassandra | commit log | data.db | 需要,數(shù)據(jù)存在本地文件 |
HBase | commit log | HFile | 需要,數(shù)據(jù)存在HDFS |
Kafka | commit log | commit log | 需要,數(shù)據(jù)存在分區(qū)中的Segment里 |
這里有個更高層次的圖,展示kafka日志的邏輯存儲結(jié)構(gòu),框框的每個數(shù)字都是一條消息的偏移量(offset):
日志的頭部(Log Head)就是傳統(tǒng)的kafka日志。日志的尾部(Log Tail)則是被壓縮過的日志。Log Head是很密集的,偏移量時連續(xù)的,保留了所有的消息。值得注意的是在Log Tail的消息雖然被壓縮,但依然保留它一開始被寫入時的偏移量,這個偏移量是永遠(yuǎn)不會被改變。而且這壓縮日志里的偏移量,在日志里依然時有效的。所以,時無法區(qū)分下一個更高的偏移量是什么,比如說,上面的例子,36、 37、 38都是屬于同一個位置。
以上說的都是數(shù)據(jù)更新時的日志壓縮,當(dāng)然日志壓縮也支持刪除。當(dāng)發(fā)送某個Key的最新版本的消息的內(nèi)容為null,這個Key將被刪除(某種程度上也算是更新,如上面的例子就是把email信息置為null)。這個消息也稱刪除標(biāo)志(delete marker),這個刪除標(biāo)志會把之前跟這key相同的消息刪掉。但這刪除標(biāo)志比較特殊,特殊之處是它是過一段時間才被刪除,從而騰出磁盤空間。而數(shù)據(jù)刪除的時間點(diǎn)會被標(biāo)志為“刪除保留點(diǎn)(delte retention point)”,也就是如上圖所示,這個圖展示也很特別,你看看兩個是point而不是pointer,也不是指向某個消息,而是消息與消息之間。說明它是個時間點(diǎn),而不是指向某個消息的指針pointer。
壓縮時通過后臺定期復(fù)制日志段(log segment)完成的。清除時并不會阻塞讀操作,而且還可以配置不超過一定的IO,從而避免影響消費(fèi)者和生產(chǎn)者。壓縮日志段的過程如下:
日志壓縮提供了什么保證?(What guarantees does log compaction provide?)日志壓縮保證:
1.任何消費(fèi)者只要是讀取日志的頭部的,都可以看到所有消息,頭部的消息不會被刪除。這些消息都是有連續(xù)的偏移量。Topic的min.compaction.lag.ms參數(shù)可用于保證在指定時間內(nèi)該消息的存在,而不會被壓縮。這提供了消息呆在頭部(未被壓縮)的時間的底線。
2.依然保持則消息的有序性。壓縮永遠(yuǎn)不會重新給消息排序,而僅僅是刪除其部分而已。
3.消息的偏移量永遠(yuǎn)不會改變。它永遠(yuǎn)標(biāo)志著消息所在的位置。
4.任何從日志最開始的地方開始處理都會至少看到每個key的最終狀態(tài)。另外,只要消費(fèi)者在delete.retention.ms(默認(rèn)是24小時)這時間內(nèi)達(dá)到日志的頭部,則將會看到所有刪除記錄的刪除標(biāo)志。也就是說:由于刪除標(biāo)志的移除和讀取是同時發(fā)生,所以如果錯過delete.retention.ms這時間,消費(fèi)者會錯過刪除標(biāo)志。
日志壓縮通過日志清除器(log cleaner)執(zhí)行,后臺線程池復(fù)制日志段,移除那些存在于Log Head中的記錄。每個壓縮線程工作如下:
1.選擇Log Head中相對比Log Tail的比例高的日志。
2.創(chuàng)建Log Head中每個Key對應(yīng)的最后一個偏移量的日志摘要。
3.從頭到尾的開始復(fù)制,在復(fù)制過程中刪除相同key的日志。新的、干凈的日志段將立刻被交換(swap)到日志里,所以只需一個額外的日志段大小的硬盤空間就可以(不需要全部日志的空間)。
4.Log Head的日志摘要實(shí)際上是一個空間緊湊的哈希表。每個實(shí)體只需要24個字節(jié)空間。所以8G的cleaner空間,可以處理大概366G的Log Head(假設(shè)每個消息大小為1k)。
Kafka是默認(rèn)啟用日志清除器,是個線程池。如果要開啟指定主題的清理功能,你可以在日志里添加以下屬性:
log.cleanup.policy=compact
這個可以在創(chuàng)建主題時指定或修改主題時指定。
日志清除器可以設(shè)置多少消息在Log Head而不被刪除。這個啟用是通過設(shè)置壓縮時間段:
log.cleaner.min.compaction.lag.ms
如果不設(shè)置,則默認(rèn)是除了最后一個segment之外,其余日志段都會被壓縮,即最后一個日志段不會被壓縮。任何已激活的日志段都不會被壓縮,就算消息的時間已經(jīng)超過了上面配置的時間,這里的激活,是指有在消費(fèi)。
配額(Quotas)Kafka集群有能力強(qiáng)制性地要求控制broker中客戶端使用的資源。以下是兩類客戶的quotas:
1.網(wǎng)絡(luò)帶寬quotas,具體到字節(jié)(從0.9版本開始)。
2.請求速率quotas,具體到CPU的利用率(網(wǎng)絡(luò)和IO的比值)。
生產(chǎn)者和消費(fèi)者有可能生成/消費(fèi)大量的數(shù)據(jù)或請求速率非常高,以致于占滿了broker的資源,導(dǎo)致網(wǎng)絡(luò)飽和broker拒絕給其他客戶端服務(wù)。使用quotas就能避免這個問題,在多租戶集群上尤為重要,因?yàn)橐徊糠值唾|(zhì)量的客戶可能會降低高質(zhì)量客戶的用戶體驗(yàn)。實(shí)際上,可以對API進(jìn)行這樣的限制。
客戶組(Client groups)Kafka客戶標(biāo)識是用戶主體(user principal),用于代表用戶在這安全的集群上的權(quán)限。在無鑒權(quán)的時候,broker通過可配置的PrincipalBuilder來提供用戶主體,用來分組。由客戶端應(yīng)用選擇client-id作為客戶的邏輯分組。元組(user,client-id)則定義了一個安全邏輯組,共享user principal和chient-id。
quotas可以被應(yīng)用到元組(user,client-id),user或client-id組。對于一個連接,匹配上的quota將會應(yīng)用到此連接上。例如(user="test-user",client-id="test-client")擁有生產(chǎn)者quota是10MB/s,這個10MB的帶寬將會被user是“test-user”并且client-id是"test-client"的生產(chǎn)者進(jìn)行共用。
quota可以按(user,client-id)配置,也可以按user組配置,也可以按client-id組配置。默認(rèn)quota可以被任何級別的quota給覆蓋。這個機(jī)制類似于每個Topic可以覆蓋自己的。ZooKeeper的/config/users的quota可以覆蓋user和(user,client-id)的quota。/config/clients下的則可以覆蓋client-id的quota。這些ZooKeeper的覆蓋會即可在所以broker中生效,這樣我們就不需要修改配置時重啟服務(wù)器。詳情請點(diǎn)擊。
quota配置的優(yōu)先級如下:
1./config/users//clients/ 2./config/users/ /clients/ 3./config/users/ 4./config/users/ /clients/ 5./config/users/ /clients/ 6./config/users/ 7./config/clients/ 8./config/clients/
broker的(quota.producer.default, quota.consumer.default)屬性來給每個client-id組設(shè)置默認(rèn)的網(wǎng)絡(luò)帶寬。但后面的版本會刪除這些屬性。
client-id組的默認(rèn)quota可以在ZooKeeper中配置。
網(wǎng)絡(luò)帶寬quota,具體到字節(jié),而且是有組里的客戶一起共享。默認(rèn)的,每個獨(dú)立的客戶組都有一個固定的網(wǎng)絡(luò)帶寬的quota。這quota配置在每個broker。
請求速率配額(Request Rate Quotas)請求速率quota,具體到時間的百分比,時間是在quota窗口里每個broker的處理請求的IO線程和網(wǎng)絡(luò)線程。 n%的quota代表一個線程的n%,所以quota總數(shù)是((num.io.threads+num.network.threads)×100)%。每個客戶組在一個quota窗口中最多使用n%的IO線程和網(wǎng)絡(luò)線程。由于分配給IO和網(wǎng)絡(luò)的線程數(shù)是根據(jù)broker主機(jī)的cpu個數(shù),則每個請求速率quota代表著CPU的百分比。
實(shí)施(Enforcement)默認(rèn)情況下,每個唯一的客戶組都會有一個集群配置好的固定的quota。這個quota是定義在每個broker上。我們決定由每個broer定義這些quota,而不是由集群為每個client統(tǒng)一設(shè)置一個quota的原因,是因?yàn)闉榱朔奖愎蚕韖uota的設(shè)置。
如果Broker檢測到超過quota了,會怎么處理?在我們的解決方案中,我們是選擇降低速率,而不是直接返回錯誤。broker會去計(jì)算處理這問題的延遲時間,這段時間則不會立刻響應(yīng)客戶端。這種超過quota的處理,對于客戶端來說是透明的??蛻舳瞬恍枰鲱~外的操作。實(shí)際上,客戶端額外的動作,如果操作不好,還會加劇超過quota的問題。
字節(jié)率和線程利用率都會在多個小窗口中監(jiān)測(一秒鐘有30個窗口),以便快速準(zhǔn)確的糾正quota違規(guī)行為。
客戶端字節(jié)率在多個小窗口(例如每個1秒的30個窗口)上進(jìn)行測量,以便快速檢測和糾正配額違規(guī)。 通常,大的測量窗口(例如,每30秒10個窗口)會導(dǎo)致大量的流量,然后是長時間的延遲,這對用戶體驗(yàn)方面并不好。
參考和翻譯:
RabbitMQ官方 https://www.rabbitmq.com
Kafka官方 http://kafka.apache.org/docum...
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/69265.html
閱讀 2765·2023-04-25 14:15
閱讀 2708·2021-11-04 16:11
閱讀 3400·2021-10-14 09:42
閱讀 448·2019-08-30 15:52
閱讀 2830·2019-08-30 14:03
閱讀 3550·2019-08-30 13:00
閱讀 2117·2019-08-26 11:40
閱讀 3312·2019-08-26 10:25