摘要:附帶的這些示例配置文件使用您之前啟動的默認(rèn)本地群集配置,并創(chuàng)建兩個連接器第一個是源連接器,用于讀取輸入文件中的行,并將每個連接生成為,第二個為連接器它從讀取消息,并在輸出文件中產(chǎn)生每行消息。
轉(zhuǎn)載請注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/01/04/Kafka/
介紹官網(wǎng):http://kafka.apache.org/
Apache Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。它最初由LinkedIn公司開發(fā),之后成為Apache項目的一部分。Kafka是一種快速、可擴(kuò)展的、設(shè)計內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提交日志服務(wù)。
Apache Kafka與傳統(tǒng)消息系統(tǒng)相比,有以下不同:
它被設(shè)計為一個分布式系統(tǒng),易于向外擴(kuò)展;
它同時為發(fā)布和訂閱提供高吞吐量;
它支持多訂閱者,當(dāng)失敗時能自動平衡消費(fèi)者;
它將消息持久化到磁盤,因此可用于批量消費(fèi),例如ETL,以及實時應(yīng)用程序。
安裝 kafka下載地址:https://kafka.apache.org/down...
wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
解壓:
tar -zxvf kafka_2.11-1.0.0.tgz cd /usr/local/kafka_2.11-1.0.0/
修改 kafka-server 的配置文件
vim /usr/local/kafka/config/server.properties
修改其中的:
broker.id=1 log.dir=/data/kafka/logs-1功能驗證: 1、啟動 zk
使用安裝包中的腳本啟動單節(jié)點(diǎn) Zookeeper 實例:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties2、啟動Kafka 服務(wù)
使用 kafka-server-start.sh 啟動 kafka 服務(wù):
bin/kafka-server-start.sh config/server.properties3、創(chuàng)建 topic
使用 kafka-topics.sh 創(chuàng)建單分區(qū)單副本的 topic test:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看 topic 列表:
bin/kafka-topics.sh --list --zookeeper localhost:2181
查詢創(chuàng)建的 topic 列表報錯:
解決方法:
vim /etc/hosts
將 host 里的
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
修改為:
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 ip6-localhost ip6-localhost.localdomain localhost6 localhost6.localdomain6
方法參考:zookeeper unable to open socket to localhost/0:0:0:0:0:0:0:1:2181
再次查詢就不報錯了。
4、產(chǎn)生消息使用 kafka-console-producer.sh 發(fā)送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test5、消費(fèi)消息
使用 kafka-console-consumer.sh 接收消息并在終端打?。?/p>
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
打開個新的命令窗口執(zhí)行上面命令即可查看信息:
6、查看描述 topics 信息bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
結(jié)果:
Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
第一行給出了所有分區(qū)的摘要,每個附加行給出了關(guān)于一個分區(qū)的信息。 由于我們只有一個分區(qū),所以只有一行。
“Leader”: 是負(fù)責(zé)給定分區(qū)的所有讀取和寫入的節(jié)點(diǎn)。 每個節(jié)點(diǎn)將成為分區(qū)隨機(jī)選擇部分的領(lǐng)導(dǎo)者。
“Replicas”: 是復(fù)制此分區(qū)日志的節(jié)點(diǎn)列表,無論它們是否是領(lǐng)導(dǎo)者,或者即使他們當(dāng)前處于活動狀態(tài)。
“Isr”: 是一組“同步”副本。這是復(fù)制品列表的子集,當(dāng)前活著并被引導(dǎo)到領(lǐng)導(dǎo)者。
集群配置Kafka 支持兩種模式的集群搭建:可以在單機(jī)上運(yùn)行多個 broker 實例來實現(xiàn)集群,也可在多臺機(jī)器上搭建集群,下面介紹下如何實現(xiàn)單機(jī)多 broker 實例集群,其實很簡單,只需要如下配置即可。
單機(jī)多broker 集群配置利用單節(jié)點(diǎn)部署多個 broker。 不同的 broker 設(shè)置不同的 id,監(jiān)聽端口及日志目錄。 例如:
cp config/server.properties config/server-2.properties cp config/server.properties config/server-3.properties vim config/server-2.properties vim config/server-3.properties
修改 :
broker.id=2 listeners = PLAINTEXT://your.host.name:9093 log.dir=/data/kafka/logs-2
和
broker.id=3 listeners = PLAINTEXT://your.host.name:9094 log.dir=/data/kafka/logs-3
啟動Kafka服務(wù):
bin/kafka-server-start.sh config/server-2.properties & bin/kafka-server-start.sh config/server-3.properties &
至此,單機(jī)多broker實例的集群配置完畢。
多機(jī)多 broker 集群配置分別在多個節(jié)點(diǎn)按上述方式安裝 Kafka,配置啟動多個 Zookeeper 實例。
假設(shè)三臺機(jī)器 IP 地址是 : 192.168.153.135, 192.168.153.136, 192.168.153.137
分別配置多個機(jī)器上的 Kafka 服務(wù),設(shè)置不同的 broker id,zookeeper.connect 設(shè)置如下:
vim config/server.properties
里面的 zookeeper.connect
修改為:
zookeeper.connect=192.168.153.135:2181,192.168.153.136:2181,192.168.153.137:2181使用 Kafka Connect 來導(dǎo)入/導(dǎo)出數(shù)據(jù)
從控制臺寫入數(shù)據(jù)并將其寫回控制臺是一個方便的起點(diǎn),但您可能想要使用其他來源的數(shù)據(jù)或?qū)?shù)據(jù)從 Kafka 導(dǎo)出到其他系統(tǒng)。對于許多系統(tǒng),您可以使用 Kafka Connect 來導(dǎo)入或?qū)С鰯?shù)據(jù),而不必編寫自定義集成代碼。
Kafka Connect 是 Kafka 包含的一個工具,可以將數(shù)據(jù)導(dǎo)入和導(dǎo)出到 Kafka。它是一個可擴(kuò)展的工具,運(yùn)行 連接器,實現(xiàn)與外部系統(tǒng)交互的自定義邏輯。在這個快速入門中,我們將看到如何使用簡單的連接器運(yùn)行 Kafka Connect,這些連接器將數(shù)據(jù)從文件導(dǎo)入到 Kafka topic,并將數(shù)據(jù)從 Kafka topic 導(dǎo)出到文件。
首先,我們將通過創(chuàng)建一些種子數(shù)據(jù)開始測試:
echo -e "zhisheng tian" > test.txt
接下來,我們將啟動兩個以獨(dú)立模式運(yùn)行的連接器,這意味著它們將在單個本地專用進(jìn)程中運(yùn)行。我們提供三個配置文件作為參數(shù)。首先是 Kafka Connect 過程的配置,包含常見的配置,例如要連接的 Kafka 代理以及數(shù)據(jù)的序列化格式。其余的配置文件都指定一個要創(chuàng)建的連接器。這些文件包括唯一的連接器名稱,要實例化的連接器類以及連接器所需的任何其他配置。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Kafka 附帶的這些示例配置文件使用您之前啟動的默認(rèn)本地群集配置,并創(chuàng)建兩個連接器:第一個是源連接器,用于讀取輸入文件中的行,并將每個連接生成為 Kafka topic,第二個為連接器它從 Kafka topic 讀取消息,并在輸出文件中產(chǎn)生每行消息。
在啟動過程中,您會看到一些日志消息,其中一些指示連接器正在實例化。Kafka Connect 進(jìn)程啟動后,源連接器應(yīng)該開始讀取 test.txt topic connect-test,并將其生成 topic ,并且接收器連接器應(yīng)該開始讀取 topic 中的消息 connect-test 并將其寫入文件 test.sink.txt。我們可以通過檢查輸出文件的內(nèi)容來驗證通過整個管道傳輸?shù)臄?shù)據(jù):
數(shù)據(jù)存儲在 Kafka topic 中 connect-test,因此我們也可以運(yùn)行控制臺使用者來查看 topic 中的數(shù)據(jù)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
連接器繼續(xù)處理數(shù)據(jù),所以我們可以將數(shù)據(jù)添加到文件中,并看到它在管道中移動:
echo zhishengtian>> test.txt echo zhishengtian2>> test.txt echo zhishengtian3>> test.txt echo zhishengtian4>> test.txt使用 Kafka 流來處理數(shù)據(jù)
Kafka Streams 是用于構(gòu)建關(guān)鍵任務(wù)實時應(yīng)用程序和微服務(wù)的客戶端庫,輸入和/或輸出數(shù)據(jù)存儲在 Kafka 集群中。Kafka Streams 結(jié)合了在客戶端編寫和部署標(biāo)準(zhǔn) Java 和 Scala 應(yīng)用程序的簡單性以及 Kafka 服務(wù)器端集群技術(shù)的優(yōu)勢,使這些應(yīng)用程序具有高度可伸縮性,彈性,容錯性,分布式等特性。
可參考官網(wǎng)入門案例:http://kafka.apache.org/10/do...
參考1、在CentOS 7上安裝Kafka
2、http://kafka.apache.org/10/do...
關(guān)注我文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68195.html
摘要:附帶的這些示例配置文件使用您之前啟動的默認(rèn)本地群集配置,并創(chuàng)建兩個連接器第一個是源連接器,用于讀取輸入文件中的行,并將每個連接生成為,第二個為連接器它從讀取消息,并在輸出文件中產(chǎn)生每行消息。 轉(zhuǎn)載請注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/01/04/Kafka/ showImg(https://segmentfault.com/img/rem...
摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現(xiàn)在也是一直保持著更新狀態(tài)。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...
閱讀 1856·2021-11-24 09:39
閱讀 2318·2021-09-30 09:47
閱讀 4208·2021-09-22 15:57
閱讀 1918·2019-08-29 18:36
閱讀 3609·2019-08-29 12:21
閱讀 621·2019-08-29 12:17
閱讀 1294·2019-08-29 11:25
閱讀 755·2019-08-28 18:26