摘要:目標(biāo)了解會使用過程首先示例代碼以上它實現(xiàn)了算法,該算法從輸入文本計算單詞出現(xiàn)的直方圖。
歡迎關(guān)注公眾號:n平方
如有問題或建議,請后臺留言,我會盡力解決你的問題。
本文主要介紹【KafkaStreams】
簡介Kafka Streams編寫關(guān)鍵任務(wù)實時應(yīng)用程序和微服務(wù)的最簡單方法,是一個用于構(gòu)建應(yīng)用程序和微服務(wù)的客戶端庫,其中輸入和輸出數(shù)據(jù)存儲在Kafka集群中。它結(jié)合了在客戶端編寫和部署標(biāo)準(zhǔn)Java和Scala應(yīng)用程序的簡單性和Kafka服務(wù)器端集群技術(shù)的優(yōu)點。
Kafka Streams是一個用于構(gòu)建關(guān)鍵任務(wù)實時應(yīng)用程序和微服務(wù)的客戶端庫,其中輸入和/或輸出數(shù)據(jù)存儲在Kafka集群中。Kafka Streams結(jié)合了在客戶端編寫和部署標(biāo)準(zhǔn)Java和Scala應(yīng)用程序的簡單性和Kafka服務(wù)器端集群技術(shù)的優(yōu)點,使這些應(yīng)用程序具有高度可伸縮性、靈活性、容錯性、分布式等等。
目標(biāo)了解kafka Streams
會使用kafka Streams
過程1.首先WordCountDemo示例代碼(Java8以上)
// Serializers/deserializers (serde) for String and Long types final SerdestringSerde = Serdes.String(); final Serde longSerde = Serdes.Long(); // Construct a `KStream` from the input topic "streams-plaintext-input", where message values // represent lines of text (for the sake of this example, we ignore whatever may be stored // in the message keys). KStream textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde); KTable wordCounts = textLines // Split each text line, by whitespace, into words. .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("W+"))) // Group the text words as message keys .groupBy((key, value) -> value) // Count the occurrences of each word (message key). .count() // Store the running counts as a changelog stream to the output topic. wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
它實現(xiàn)了WordCount算法,該算法從輸入文本計算單詞出現(xiàn)的直方圖。然而,與您以前可能看到的對有界數(shù)據(jù)進(jìn)行操作的其他WordCount示例不同,WordCount演示應(yīng)用程序的行為略有不同,因為它被設(shè)計為對無限、無界的數(shù)據(jù)流進(jìn)行操作。與有界變量類似,它是一種有狀態(tài)算法,用于跟蹤和更新單詞的計數(shù)。然而,由于它必須假定輸入數(shù)據(jù)可能是無界的,因此它將周期性地輸出當(dāng)前狀態(tài)和結(jié)果,同時繼續(xù)處理更多的數(shù)據(jù),因為它不知道何時處理了“所有”輸入數(shù)據(jù)。
2.安裝并啟動zookeeper和kafka
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
3.創(chuàng)建主題
接下來,我們創(chuàng)建名為streams-plain -input的輸入主題和名為streams-wordcount-output的輸出主題:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input Created topic "streams-plaintext-input"
我們創(chuàng)建啟用壓縮的輸出主題,因為輸出流是一個變更日志流.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact Created topic "streams-wordcount-output"
創(chuàng)建的主題也可以使用相同的kafka主題進(jìn)行描述
bin/kafka-topics.sh --zookeeper localhost:2181 --describe
4.啟動Wordcount應(yīng)用程序
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
a)演示應(yīng)用程序?qū)妮斎胫黝}流(明文輸入)中讀取,對每個讀取的消息執(zhí)行WordCount算法的計算,并不斷將其當(dāng)前結(jié)果寫入輸出主題流(WordCount -output)。因此,除了日志條目之外,不會有任何STDOUT輸出,因為結(jié)果是用Kafka寫回去的。
b)現(xiàn)在我們可以在一個多帶帶的終端上啟動控制臺生成器,向這個主題寫入一些輸入數(shù)據(jù)和檢查輸出的WordCount演示應(yīng)用程序從其輸出主題與控制臺消費者在一個多帶帶的終端.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
c)輸入端:現(xiàn)在讓我們使用控制臺生成器將一些消息寫入輸入主題流——純文本輸入,方法是輸入一行文本,然后單擊。這將發(fā)送新消息輸入主題,消息鍵為空和消息值是剛才輸入的字符串編碼的文本行。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
此時你可以在控制臺輸入如下字符:
all streams lead to kafka
d))輸出端:此消息將由Wordcount應(yīng)用程序處理,以下輸出數(shù)據(jù)將寫入streams-wordcount-output主題并由控制臺使用者打印:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
這個時候會接收到剛剛在控制臺輸入的單詞統(tǒng)計結(jié)果:
all 1 streams 1 lead 1 to 1 kafka 1
如此類推:你可以在輸入端輸入單詞,對應(yīng)的在輸出端就會有統(tǒng)計結(jié)果。
小結(jié):
可以看到,Wordcount應(yīng)用程序的輸出實際上是連續(xù)的更新流,其中每個輸出記錄(即上面原始輸出中的每一行)是單個單詞的更新計數(shù),也就是記錄鍵,如“kafka”。對于具有相同鍵的多個記錄,后面的每個記錄都是前一個記錄的更新。
下面的兩個圖說明了幕后的本質(zhì)。第一列顯示KTable的當(dāng)前狀態(tài)的演變,該狀態(tài)為count計算單詞出現(xiàn)的次數(shù)。第二列顯示KTable的狀態(tài)更新所產(chǎn)生的更改記錄,這些記錄被發(fā)送到輸出Kafka主題流-wordcount-output。
最后本人水平有限,歡迎各位建議以及指正。順便關(guān)注一下公眾號唄,會經(jīng)常更新文章的哦。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/73235.html
摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現(xiàn)在也是一直保持著更新狀態(tài)。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...
閱讀 3259·2021-09-22 15:58
閱讀 1724·2019-08-30 14:17
閱讀 1729·2019-08-28 18:05
閱讀 1514·2019-08-26 13:33
閱讀 692·2019-08-26 12:20
閱讀 616·2019-08-26 12:18
閱讀 3198·2019-08-26 11:59
閱讀 1412·2019-08-26 10:36