成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

學(xué)習(xí)kafka教程(二)

Prasanta / 1114人閱讀

摘要:目標(biāo)了解會使用過程首先示例代碼以上它實現(xiàn)了算法,該算法從輸入文本計算單詞出現(xiàn)的直方圖。

歡迎關(guān)注公眾號:n平方
如有問題或建議,請后臺留言,我會盡力解決你的問題。

本文主要介紹【KafkaStreams】

簡介

Kafka Streams編寫關(guān)鍵任務(wù)實時應(yīng)用程序和微服務(wù)的最簡單方法,是一個用于構(gòu)建應(yīng)用程序和微服務(wù)的客戶端庫,其中輸入和輸出數(shù)據(jù)存儲在Kafka集群中。它結(jié)合了在客戶端編寫和部署標(biāo)準(zhǔn)Java和Scala應(yīng)用程序的簡單性和Kafka服務(wù)器端集群技術(shù)的優(yōu)點。

Kafka Streams是一個用于構(gòu)建關(guān)鍵任務(wù)實時應(yīng)用程序和微服務(wù)的客戶端庫,其中輸入和/或輸出數(shù)據(jù)存儲在Kafka集群中。Kafka Streams結(jié)合了在客戶端編寫和部署標(biāo)準(zhǔn)Java和Scala應(yīng)用程序的簡單性和Kafka服務(wù)器端集群技術(shù)的優(yōu)點,使這些應(yīng)用程序具有高度可伸縮性、靈活性、容錯性、分布式等等。

目標(biāo)

了解kafka Streams

會使用kafka Streams

過程

1.首先WordCountDemo示例代碼(Java8以上)

// Serializers/deserializers (serde) for String and Long types
final Serde stringSerde = Serdes.String();
final Serde longSerde = Serdes.Long();
 
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde);
 
KTable wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("W+")))
 
    // Group the text words as message keys
    .groupBy((key, value) -> value)
 
    // Count the occurrences of each word (message key).
    .count()
 
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

它實現(xiàn)了WordCount算法,該算法從輸入文本計算單詞出現(xiàn)的直方圖。然而,與您以前可能看到的對有界數(shù)據(jù)進(jìn)行操作的其他WordCount示例不同,WordCount演示應(yīng)用程序的行為略有不同,因為它被設(shè)計為對無限、無界的數(shù)據(jù)流進(jìn)行操作。與有界變量類似,它是一種有狀態(tài)算法,用于跟蹤和更新單詞的計數(shù)。然而,由于它必須假定輸入數(shù)據(jù)可能是無界的,因此它將周期性地輸出當(dāng)前狀態(tài)和結(jié)果,同時繼續(xù)處理更多的數(shù)據(jù),因為它不知道何時處理了“所有”輸入數(shù)據(jù)。

2.安裝并啟動zookeeper和kafka

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

3.創(chuàng)建主題
接下來,我們創(chuàng)建名為streams-plain -input的輸入主題和名為streams-wordcount-output的輸出主題:

bin/kafka-topics.sh --create 
    --zookeeper localhost:2181 
    --replication-factor 1 
    --partitions 1 
    --topic streams-plaintext-input
Created topic "streams-plaintext-input"

我們創(chuàng)建啟用壓縮的輸出主題,因為輸出流是一個變更日志流.

bin/kafka-topics.sh --create 
    --zookeeper localhost:2181 
    --replication-factor 1 
    --partitions 1 
    --topic streams-wordcount-output 
    --config cleanup.policy=compact
Created topic "streams-wordcount-output"

創(chuàng)建的主題也可以使用相同的kafka主題進(jìn)行描述

bin/kafka-topics.sh --zookeeper localhost:2181 --describe

4.啟動Wordcount應(yīng)用程序

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

a)演示應(yīng)用程序?qū)妮斎胫黝}流(明文輸入)中讀取,對每個讀取的消息執(zhí)行WordCount算法的計算,并不斷將其當(dāng)前結(jié)果寫入輸出主題流(WordCount -output)。因此,除了日志條目之外,不會有任何STDOUT輸出,因為結(jié)果是用Kafka寫回去的。

b)現(xiàn)在我們可以在一個多帶帶的終端上啟動控制臺生成器,向這個主題寫入一些輸入數(shù)據(jù)和檢查輸出的WordCount演示應(yīng)用程序從其輸出主題與控制臺消費者在一個多帶帶的終端.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
    --topic streams-wordcount-output 
    --from-beginning 
    --formatter kafka.tools.DefaultMessageFormatter 
    --property print.key=true 
    --property print.value=true 
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

c)輸入端:現(xiàn)在讓我們使用控制臺生成器將一些消息寫入輸入主題流——純文本輸入,方法是輸入一行文本,然后單擊。這將發(fā)送新消息輸入主題,消息鍵為空和消息值是剛才輸入的字符串編碼的文本行。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

此時你可以在控制臺輸入如下字符:

all streams lead to kafka

d))輸出端:此消息將由Wordcount應(yīng)用程序處理,以下輸出數(shù)據(jù)將寫入streams-wordcount-output主題并由控制臺使用者打印:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
    --topic streams-wordcount-output 
    --from-beginning 
    --formatter kafka.tools.DefaultMessageFormatter 
    --property print.key=true 
    --property print.value=true 
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

這個時候會接收到剛剛在控制臺輸入的單詞統(tǒng)計結(jié)果:

all     1
streams 1
lead    1
to      1
kafka   1

如此類推:你可以在輸入端輸入單詞,對應(yīng)的在輸出端就會有統(tǒng)計結(jié)果。

小結(jié):
可以看到,Wordcount應(yīng)用程序的輸出實際上是連續(xù)的更新流,其中每個輸出記錄(即上面原始輸出中的每一行)是單個單詞的更新計數(shù),也就是記錄鍵,如“kafka”。對于具有相同鍵的多個記錄,后面的每個記錄都是前一個記錄的更新。

下面的兩個圖說明了幕后的本質(zhì)。第一列顯示KTable的當(dāng)前狀態(tài)的演變,該狀態(tài)為count計算單詞出現(xiàn)的次數(shù)。第二列顯示KTable的狀態(tài)更新所產(chǎn)生的更改記錄,這些記錄被發(fā)送到輸出Kafka主題流-wordcount-output。

最后

本人水平有限,歡迎各位建議以及指正。順便關(guān)注一下公眾號唄,會經(jīng)常更新文章的哦。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/73235.html

相關(guān)文章

  • 寫這么多系列博客,怪不得找不到女朋友

    摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現(xiàn)在也是一直保持著更新狀態(tài)。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...

    JerryWangSAP 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<