成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

kafka分布式消息平臺(tái)的初探

zsy888 / 1798人閱讀

摘要:介紹是一個(gè)分布式的流數(shù)據(jù)平臺(tái),可發(fā)布訂閱消息流,使用進(jìn)行集群管理。啟動(dòng)一個(gè),拉取消息參數(shù)表示從頭開(kāi)始讀取數(shù)據(jù),如果不設(shè)置,則只讀取最新的數(shù)據(jù)。消息發(fā)布者,方式,負(fù)責(zé)發(fā)布消息到。表明消息,被同一內(nèi)的均分了。

介紹

Kafka是一個(gè)分布式的流數(shù)據(jù)平臺(tái),可發(fā)布、訂閱消息流,使用zookeeper進(jìn)行集群管理。也可作為一個(gè)消息隊(duì)列中間件,類(lèi)似于RabbitMQ,ActiveMQ,ZeroMQ等。由LinkdIn開(kāi)源,用Scala語(yǔ)言實(shí)現(xiàn)。
Kafka有如下特點(diǎn):

kafka利用線(xiàn)性存儲(chǔ)來(lái)進(jìn)行硬盤(pán)讀寫(xiě),速度快;

以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪(fǎng)問(wèn)性能。因此不清除存儲(chǔ)的數(shù)據(jù)并不會(huì)影響性能;

zero-copy Gzip和Snappy壓縮

已消費(fèi)的消息不會(huì)自動(dòng)刪除

考慮到高效性,對(duì)事務(wù)的支持較弱。

應(yīng)用場(chǎng)景

安裝使用

// 從官網(wǎng)下載最新版本,這里為:kafka_2.11-1.0.0.tgz

// 解壓

$ tar -xzf kafka_2.11-1.0.0.tgz
$ cd kafka_2.11-1.0.0

// Kafka用到了zookeeper,所以需要啟動(dòng)zookeeper(新版本內(nèi)置了zookeeper,如果讀者已有其他zookeeper啟動(dòng)了,這步可以略過(guò))

$ bin/zookeeper-server-start.sh config/zookeeper.properties

// 修改配置文件,并啟動(dòng)kafka server:
config/server.properties中的zookeeper.connect默認(rèn)為localhost:2181,可以修改為其他的zookeeper地址。多個(gè)地址間,通過(guò)逗號(hào)分隔,如:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。
默認(rèn)為9092端口,通過(guò)修改“l(fā)isteners=PLAINTEXT://:9092” 來(lái)指定其他端口或IP。
配置好后,啟動(dòng)kafka server:

$ bin/kafka-server-start.sh config/server.properties

配置文件目錄下還有consumer.properties和producer.properties,按默認(rèn)即可。

// 創(chuàng)建一個(gè)topic,topic名稱(chēng)為test

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

可以通過(guò)命令:bin/kafka-topics.sh --list --zookeeper localhost:2181查看當(dāng)前所有的topic.

// 通過(guò)producer發(fā)送消息

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message

往test中發(fā)送數(shù)據(jù)。

// 啟動(dòng)一個(gè)consumer,拉取消息

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

--from-beginning參數(shù)表示從頭開(kāi)始讀取數(shù)據(jù),如果不設(shè)置,則只讀取最新的數(shù)據(jù)。

// 可以再啟動(dòng)一個(gè)Server

$ bin/kafka-server-start.sh config/server-1.properties &

這里的server-1.properties是拷貝的server.properties,主要修改如下幾個(gè)參數(shù):

# broker id,整數(shù),和其他broker不能重復(fù)
broker.id=2
# 指定端口為9094,因?yàn)樵谕慌_(tái)機(jī)器上,需要避免端口沖突。這里沒(méi)有配置IP,默認(rèn)為本機(jī)
listeners=PLAINTEXT://:9094
# 日志文件路徑,即topic數(shù)據(jù)的存儲(chǔ)位置。不同的broker,指定不同的路徑。
log.dir=/tmp/kafka-logs-2
示意圖

Producer1和Producer2往Topic A中發(fā)送消息,Consumer1/2/3/4/5 從Topic中接收消息。
Kafka Cluster包含兩個(gè)Server,分別為Server1,Server2。
Topic A包含4個(gè)Partition,為:P0, P1, P3, P4,平均分配到Server1和Server2上。

Broker

一個(gè)Broker就是一個(gè)server。多個(gè)Broker構(gòu)成一個(gè)kafka集群,同時(shí)對(duì)外提供服務(wù),如果某個(gè)節(jié)點(diǎn)down掉,則重新分配。
注意:集群和主從熱備不同,對(duì)于主從熱備,同時(shí)只有一個(gè)節(jié)點(diǎn)提供服務(wù),其他節(jié)點(diǎn)待命狀態(tài)。

Producer

消息發(fā)布者,Push方式,負(fù)責(zé)發(fā)布消息到Kafka broker。

Consumer

消費(fèi)者,Pull方式,消費(fèi)消息。每個(gè)consumer屬于一個(gè)特定的consuer group。

主題(Topic)

通過(guò)對(duì)消息指定主題可以將消息分類(lèi),Consumer可以只關(guān)注特定Topic中的消息。
查看總共有多少個(gè)Topic:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

查看某個(gè)topic的情況(分區(qū)、副本數(shù)等),這里查看topic為test的信息:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
消費(fèi)者組(Consumer Group)

每個(gè)Consumer都會(huì)歸到一個(gè)Group中。某個(gè)Partition中的消息,可以被多個(gè)Group消費(fèi),但只能被Group中的一個(gè)Consumer消費(fèi)。所以,如果要對(duì)多個(gè)Consumer進(jìn)行消息廣播,則這些Consumer需要放在不同的Group中。
當(dāng)一個(gè)Consumer進(jìn)程或線(xiàn)程掛掉,它所訂閱的Partition會(huì)被重新分配到該Group內(nèi)的其他Consumer上。如果Consumer A訂閱了多個(gè)Partition,那么當(dāng)該Group內(nèi)新增Consumer B時(shí),會(huì)從Consumer A中分配出一個(gè)Partition給Consumer B。

為了維持Consumer 與 Consumer Group的關(guān)系,需要Consumer周期性的發(fā)送heartbeat到coordinator(協(xié)調(diào)者,在早期版本,以zookeeper作為協(xié)調(diào)者。后期版本則以某個(gè)broker作為協(xié)調(diào)者)。當(dāng)Consumer由于某種原因不能發(fā)Heartbeat到coordinator時(shí),并且時(shí)間超過(guò)session.timeout.ms時(shí),就會(huì)認(rèn)為該consumer已退出,它所訂閱的partition會(huì)分配到同一group 內(nèi)的其它的consumer上。而這個(gè)過(guò)程,被稱(chēng)為rebalance。

位移(Offset)

Offset是針對(duì)Partition的,它用來(lái)記錄消費(fèi)到Partition中的哪條消息了。
Consumer并不維護(hù)Offset,而是由Consumer所在的Group維護(hù)。因此,Group中的一個(gè)Consumer消費(fèi)了某個(gè)Partition中的消息,那么該組的其他Consumer就不能重復(fù)消費(fèi)該條消息了,因?yàn)镺ffset已經(jīng)+1了。

上圖中,Consumer A和Consumer B屬于不同的Group。Consumer A所在的Group,在該P(yáng)artition的Offset=9,表示下次該Group獲取消息時(shí)是從9開(kāi)始獲取;同理,Consumer B所在的Group在該P(yáng)artition的Offset=11,下次該Group的Consumer獲取消息時(shí),從11開(kāi)始獲取。

分區(qū)(Partition)

Partition是物理上的概念,每個(gè)Partition對(duì)應(yīng)一個(gè)文件夾(默認(rèn)在/tmp/kafka-logs下,通過(guò)server.properties中l(wèi)og.dirs配置)。一個(gè)topic可以對(duì)應(yīng)多個(gè)partition,consumer訂閱的其實(shí)就是partition。

上圖表示一個(gè)Topic,指定了3個(gè)分區(qū)。在向該Topic寫(xiě)數(shù)據(jù)時(shí),會(huì)根據(jù)均衡策略,往相應(yīng)的分區(qū)中寫(xiě)。這3個(gè)分區(qū)中的數(shù)據(jù)是不一樣的,它們的數(shù)據(jù)總和,構(gòu)成該Topic的數(shù)據(jù)。
每個(gè)分區(qū)中的數(shù)據(jù),保證嚴(yán)格的寫(xiě)入順序。

分區(qū)會(huì)自動(dòng)根據(jù)均衡策略分配到多個(gè)broker上。比如有2個(gè)broker(或者叫Server):broker1, broker2,創(chuàng)建一個(gè)包含4個(gè)partition且replication-factor(副本)為1的topic,那么對(duì)于該topic,每個(gè)broker會(huì)被分配2個(gè)partition。如下圖:

有兩個(gè)Group:Group A和Group B,其中Group A包含C1、C2兩個(gè)Consumer;Group B包含C3,C4,C5,C6四個(gè)Consumer。
如果向該Topic寫(xiě)入4條信息:M1, M2, M3, M4。那么各個(gè)Consumer收到的消息是(一種情況):

C1:M1, M3
C2:M2, M4

C3:M1
C4:M3
C5:M2
C6:M4

C1,C2各接收到2條消息,它們的和為:M1,M2,M3,M4。
C3,C4,C5,C6各接收到1條消息,它們的和為:M1,M2,M3,M4。
表明Topic消息,被同一Group內(nèi)的Consumer均分了。因?yàn)槊看蜗騎opic中寫(xiě)入消息時(shí),會(huì)被均分至各個(gè)Partition,然后各Consumer收到自己所訂閱Partition的消息。同時(shí),這里也說(shuō)明了同一個(gè)partition內(nèi)的消息只能被同一個(gè)組中的一個(gè)consumer消費(fèi)。
注:如果replication-factor為3,那么每個(gè)broker會(huì)有6(即2x3)個(gè)partition。

另外,創(chuàng)建topic時(shí),在當(dāng)前的所有broker間進(jìn)行均分,分好后就不會(huì)變了。假設(shè)把上述broker1停掉,它的分區(qū)不會(huì)轉(zhuǎn)到broker2上。producer在寫(xiě)消息時(shí),不會(huì)再寫(xiě)入broker2中的分區(qū)。
那么,原先訂閱broker2分區(qū)的consumer,不能接收消息了。提示:

WARN [Consumer clientId=consumer-1, groupId=g4] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

對(duì)于一個(gè)Topic的partition數(shù),增加Broker(即服務(wù)節(jié)點(diǎn))并不會(huì)增加partition的數(shù)量。
驗(yàn)證:
查看topic信息

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

再啟動(dòng)一個(gè)新的Broker:

$ bin/kafka-server-start.sh ../config/server-1.properties

啟動(dòng)后,再用上一步的命令看topic信息,partition數(shù)量并未改變。
并且,如果group g1上有兩個(gè)consumer,始終只會(huì)有一個(gè)consumer能收到該topic的消息,另一個(gè)一直處于空閑狀態(tài)(光占著資源不做事)。所以,Topic的Partition數(shù),要大于等于Consumer數(shù)量。

默認(rèn)組的疑問(wèn)
可能讀者會(huì)有疑問(wèn),通過(guò)命令:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

執(zhí)行多次,創(chuàng)建了多個(gè)consumer,這些consumer屬于默認(rèn)的一個(gè)組,但是卻能同時(shí)收到一個(gè)topic的信息。和上述所說(shuō)的“一個(gè)Topic中的消息,只能被group中的一個(gè)consumer消費(fèi)”有沖突。
其實(shí),不指定group名稱(chēng),的確會(huì)分配默認(rèn)的group,但每次分配的名稱(chēng)是不一樣的,即這里創(chuàng)建的consumer是屬于不同的group的??梢酝ㄟ^(guò)命令查看所有g(shù)roup:

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Note: This will not show information about old Zookeeper-based consumers.

console-consumer-94070
console-consumer-27823
console-consumer-4826
console-consumer-47050

可以看出,這里的group名稱(chēng)是不一樣的。

consumer數(shù)量和group數(shù)量
對(duì)于一個(gè)topic,如果group中consumer數(shù)量比partition數(shù)量多,那么多余的consumer會(huì)空閑。這是因?yàn)?,group中的某個(gè)consumer一旦訂閱了某個(gè)partition,則會(huì)一直占用并消費(fèi)該partition中的信息。除非該consumer退出,否則該partition不會(huì)被該組的其他consumer占用。所以會(huì)導(dǎo)致多余的consumer空閑,一直收不到消息。
可以通過(guò)命令,查看consumer和partition的對(duì)應(yīng)關(guān)系:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g4 

一個(gè)topic可以對(duì)應(yīng)多個(gè)partition,但一個(gè)partition只能對(duì)應(yīng)一個(gè)topic。

數(shù)據(jù)文件分段

Kafka解決查詢(xún)效率的手段之一是將數(shù)據(jù)文件分段,比如有100條Message,它們的offset是從0到99。假設(shè)將數(shù)據(jù)文件分成5段,第一段為0-19,第二段為20-39,以此類(lèi)推,每段放在一個(gè)多帶帶的數(shù)據(jù)文件里面,數(shù)據(jù)文件以該段中最小的offset命名。這樣在查找指定offset的Message的時(shí)候,用二分查找就可以定位到該Message在哪個(gè)段中。

零拷貝(圖來(lái)自網(wǎng)絡(luò))

上圖展示文件傳輸?shù)絊ocket的常規(guī)方式,步驟:

操作系統(tǒng)將文件數(shù)據(jù)從磁盤(pán)讀入到內(nèi)核空間的頁(yè)緩存;

應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶(hù)空間緩存中;

應(yīng)用程序?qū)?shù)據(jù)寫(xiě)回到內(nèi)核空間到socket緩存中;

操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出。

上圖展示零拷貝方式傳輸文件到Socket,少了文件緩存到用戶(hù)空間,再由用戶(hù)空間到內(nèi)核空間的操作。
Kafka采用零拷貝的方式。

Partition備份

通過(guò)Replication Factor指定副本的數(shù)量,這樣,如果一個(gè)Partition出現(xiàn)了問(wèn)題,那么可以從副本中恢復(fù)了。

Kafka Manager安裝和使用

如果不喜歡通過(guò)命令行操作,也可以通過(guò)圖形化管理界面,比如yahoo開(kāi)源的Kafka Manager。
地址:https://github.com/yahoo/kafk...
這里以CentOS7為例,進(jìn)行編譯、運(yùn)行說(shuō)明。
注:Kafka Manager的編譯需要javac,需要安裝jdk環(huán)境。最新版的需要jdk8版本。
CentOS7默認(rèn)安裝了OpenJDK,將其卸載,從Oracle官網(wǎng)下載jdk8文件,然后安裝。

// github上下載kafka manager源碼
$ git clone https://github.com/yahoo/kafk...
$ cd kafka-manager

// 修改配置文件中zookeeper地址
配置文件:conf/application.conf

kafka-manager.zkhosts="127.0.0.1:2181"

如果有多個(gè)zookeeper,通過(guò)逗號(hào)分隔,如:

kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181" 

// 將源碼編譯打包成zip包
$ ./sbt clean dist
這一步用到了javac,運(yùn)行完后,會(huì)在當(dāng)前目錄下生成target文件夾。生成zip包地址:
target/universal/kafka-manager-1.3.3.16.zip

// 進(jìn)入zip所在目錄,解壓zip包,啟動(dòng)服務(wù)(默認(rèn)9000端口)
$ cd target/universal
$ unzip kafka-manager-1.3.3.16
$ ./kafka-manager-1.3.3.16/bin/kafka-manager

// 打開(kāi)Kafka Manager頁(yè)面
瀏覽器輸入地址:http://192.168.0.12:9000/ (這里的IP需要替換成讀者自己的IP)

很簡(jiǎn)潔的一個(gè)頁(yè)面,第一次打開(kāi),什么都沒(méi)有。

// 添加一個(gè)Cluster

Cluster Name: 名稱(chēng)隨意,比如MyFirstCluster
Cluster Zookeeper Hosts: zookeeper的地址,比如:192.168.0.12:2181
Kafka Version: 筆者選的0.11
勾選“Enable JMX Polling”。注意:勾選了該項(xiàng),啟動(dòng)kafka server前,需要設(shè)置JMX_PORT變量,如:
$ JMX_PORT=9999
$ bin/zookeeper-server-start.sh config/zookeeper.properties 

保存后,就可以通過(guò)MyFirstCluster,查看Broker, Topic, Partition, Consumer等信息了。
注:如果查看不了Consumer信息,提示“Please enable consumer polling here.”,需要勾選一個(gè)配置。如:
提示信息:

修改Cluster:

勾選中“Poll consumer information”

保存。具體的管理功能,可以通過(guò)操作頁(yè)面進(jìn)一步挖掘。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70973.html

相關(guān)文章

  • 初探ZeroMQ

    摘要:關(guān)閉套接字和上下文備注說(shuō)明如何利用使用首先下載所需的包,解壓以后將和文件放到自己電腦中的安裝路徑中的文件夾下,最后需要將之前解壓后的包放在項(xiàng)目的中或者資源下載鏈接密碼項(xiàng)目源碼下載鏈接鏈接密碼 在講ZeroMQ前先給大家講一下什么是消息隊(duì)列。 消息隊(duì)列簡(jiǎn)介: 消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息,流量削鋒等問(wèn)題。實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。是...

    Harriet666 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<