摘要:相關(guān)概念協(xié)議高級消息隊(duì)列協(xié)議是一個標(biāo)準(zhǔn)開放的應(yīng)用層的消息中間件協(xié)議??梢杂妹钆c不同,不是線程安全的。手動提交執(zhí)行相關(guān)邏輯提交注意點(diǎn)將寫成單例模式,有助于減少端占用的資源。自身是線程安全的類,只要封裝得當(dāng)就能最恰當(dāng)?shù)陌l(fā)揮好的作用。
本文使用的Kafka版本0.11
先思考些問題:
我想分析一下用戶行為(pageviews),以便我能設(shè)計(jì)出更好的廣告位
我想對用戶的搜索關(guān)鍵詞進(jìn)行統(tǒng)計(jì),分析出當(dāng)前的流行趨勢。這個很有意思,在經(jīng)濟(jì)學(xué)上有個長裙理論,就是說,如果長裙的銷量高了,說明經(jīng)濟(jì)不景氣了,因?yàn)楣媚飩儧]錢買各種絲襪了。
有些數(shù)據(jù),我覺得存數(shù)據(jù)庫浪費(fèi),直接存硬盤又怕到時候操作效率低。
這個時候,我們就可以用到分布式消息系統(tǒng)了。雖然上面的描述更偏向于一個日志系統(tǒng),但確實(shí)kafka在實(shí)際應(yīng)用中被大量的用于日志系統(tǒng)。
這些場景都有一個共同點(diǎn):數(shù)據(jù)是由上游模塊產(chǎn)生,上游模塊,使用上游模塊的數(shù)據(jù)計(jì)算、統(tǒng)計(jì)、分析,這個時候就可以使用消息系統(tǒng),尤其是分布式消息系統(tǒng)!
Kafka是一個分布式消息系統(tǒng),由linkedin使用scala編寫. Kafka的動態(tài)擴(kuò)容是通過Zookeeper來實(shí)現(xiàn)的。
Zookeeper是一種在分布式系統(tǒng)中被廣泛用來作為:分布式狀態(tài)管理、分布式協(xié)調(diào)管理、分布式配置管理、和分布式鎖服務(wù)的集群。kafka增加和減少服務(wù)器都會在Zookeeper節(jié)點(diǎn)上觸發(fā)相應(yīng)的事件。
1、 AMQP協(xié)議(Advanced Message Queuing Protocol,高級消息隊(duì)列協(xié)議)
AMQP是一個標(biāo)準(zhǔn)開放的應(yīng)用層的消息中間件(Message Oriented Middleware)協(xié)議。AMQP定義了通過網(wǎng)絡(luò)發(fā)送的字節(jié)流的數(shù)據(jù)格式。因此兼容性非常好,任何實(shí)現(xiàn)AMQP協(xié)議的程序都可以和與AMQP協(xié)議兼容的其他程序交互,可以很容易做到跨語言,跨平臺。
2、 一些基本的概念
消費(fèi)者:(Consumer):從消息隊(duì)列中請求消息的客戶端應(yīng)用程序
生產(chǎn)者:(Producer) :向broker發(fā)布消息的應(yīng)用程序
AMQP服務(wù)端(broker):用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列,便于fafka將生產(chǎn)者發(fā)送的消息,動態(tài)的添加到磁盤并給每一條消息一個偏移量,所以對于kafka一個broker就是一個應(yīng)用程序的實(shí)例
主題(Topic):一個主題類似新聞中的體育、娛樂、教育等分類概念,在實(shí)際工程中通常一個業(yè)務(wù)一個主題。
分區(qū)(Partition):一個Topic中的消息數(shù)據(jù)按照多個分區(qū)組織,分區(qū)是kafka消息隊(duì)列組織的最小單位,一個分區(qū)可以看作是一個FIFO( First Input First Output的縮寫,先入先出隊(duì)列)的隊(duì)列。
生產(chǎn)者生產(chǎn)(push)消息、kafka集群、消費(fèi)者獲取(pull)消息這樣一種架構(gòu),kafka集群中的消息,是通過Topic(主題)來進(jìn)行組織的. 生產(chǎn)者可以選擇自己喜歡的序列化方法對消息內(nèi)容編碼。
kafka分區(qū)是提高kafka性能的關(guān)鍵所在,當(dāng)你發(fā)現(xiàn)你的集群性能不高時,常用手段就是增加Topic的分區(qū),分區(qū)里面的消息是按照從新到老的順序進(jìn)行組織,消費(fèi)者從隊(duì)列頭訂閱消息,生產(chǎn)者從隊(duì)列尾添加消息。
簡化圖如下:
我們看上面的圖,我們把broker的數(shù)量減少,只有一臺?,F(xiàn)在假設(shè)我們按照上圖進(jìn)行部署:
Server-1 broker其實(shí)就是kafka的server,因?yàn)閜roducer和consumer都要去連它。Broker主要還是做存儲用。
Server-2是zookeeper的server端,在這里你可以先想象,它維持了一張表,記錄了各個節(jié)點(diǎn)的IP、端口等信息(以后還會講到,它里面還存了kafka的相關(guān)信息)。
Server-3、4、5他們的共同之處就是都配置了zkClient,這之間的連接都是需要zookeeper來進(jìn)行分發(fā)的。
Server-1和Server-2的關(guān)系,他們可以放在一臺機(jī)器上,也可以分開放,zookeeper也可以配集群。目的是防止某一臺掛了。
kafka和JMS(Java Message Service)實(shí)現(xiàn)(activeMQ)不同的是:即使消息被消費(fèi),消息仍然不會被立即刪除.日志文件將會根據(jù)broker中的配置要求,保留一定的時間之后刪除;比如log文件保留2天,那么兩天后,文件會被清除,無論其中的消息是否被消費(fèi).但kafka并沒有提供JMS中的"事務(wù)性""消息傳輸擔(dān)保(消息確認(rèn)機(jī)制)""消息分組"等企業(yè)級特性;kafka只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,無法確保消息的發(fā)送與接收絕對可靠(比如,消息重發(fā),消息發(fā)送丟失等)
對于consumer而言,它需要保存消費(fèi)消息的offset,對于offset的保存和使用,有consumer來控制;當(dāng)consumer正常消費(fèi)消息時,offset將會"線性"的向前驅(qū)動,即消息將依次順序被消費(fèi).事實(shí)上consumer可以使用任意順序消費(fèi)消息,它只需要將offset重置為任意值..(offset將會保存在zookeeper中)
kafka集群幾乎不需要維護(hù)任何consumer和producer狀態(tài)信息,這些信息有zookeeper保存;因此producer和consumer的實(shí)現(xiàn)非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響.
partitions的目的有多個.最根本原因是kafka基于文件存儲.通過分區(qū),可以將日志內(nèi)容分散到多個上,來避免文件尺寸達(dá)到單機(jī)磁盤的上限;可以將一個topic切分多任意多個partitions.此外越多的partitions意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力.
每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發(fā)送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費(fèi)(而不是該group下的所有consumer,一定要注意這點(diǎn)).
如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間負(fù)載均衡.
如果所有的consumer都具有不同的group,那這就是"發(fā)布-訂閱";消息將會廣播給所有的消費(fèi)者.
在kafka中,一個partition中的消息只會被group中的一個consumer消費(fèi);每個group中consumer消息消費(fèi)互相獨(dú)立;我們可以認(rèn)為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費(fèi),不過一個consumer可以消費(fèi)多個partitions中的消息
注意:kafka使用文件存儲消息,這就直接決定kafka在性能上嚴(yán)重依賴文件系統(tǒng)的本身特性。
在分布式方面:
broker的部署是一種no central master的概念,并且每個節(jié)點(diǎn)都是同等的,節(jié)點(diǎn)的增加和減少都不需要改變?nèi)魏闻渲谩?/p>
producer和consumer通過zookeeper去發(fā)現(xiàn)topic,并且通過zookeeper來協(xié)調(diào)生產(chǎn)和消費(fèi)的過程。
producer、consumer和broker均采用TCP連接,通信基于NIO實(shí)現(xiàn)。Producer和consumer能自動檢測broker的增加和減少。
使用場景:
常規(guī)消息系統(tǒng)。
kafka可以作為"網(wǎng)站活性跟蹤"的最佳工具;可以將網(wǎng)頁/用戶操作等信息發(fā)送到kafka中.并實(shí)時監(jiān)控,或者離線統(tǒng)計(jì)分析等
kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發(fā)送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統(tǒng)化的存儲和分析系統(tǒng).
簡單說下整個系統(tǒng)運(yùn)行的順序:
1.啟動zookeeper的server
2.啟動kafka的server
3.Producer如果生產(chǎn)了數(shù)據(jù),會先通過zookeeper找到broker,然后將數(shù)據(jù)存放進(jìn)broker
4.Consumer如果要消費(fèi)數(shù)據(jù),會先通過zookeeper找對應(yīng)的broker,然后消費(fèi)。
本地單擊測試環(huán)境啟動順序:
1.啟動zookeeper server :bin/zookeeper-server-start.sh ../config/zookeeper.properties &
2.啟動kafka server: bin/kafka-server-start.sh ../config/server.properties &
3.Kafka為我們提供了一個console來做連通性測試,
先創(chuàng)建一個topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test ,你可以運(yùn)行bin/kafka-topics.sh --list --zookeeper localhost:2181來檢查是否創(chuàng)建成功和topic列表
運(yùn)行producer(默認(rèn)broker端口9092):bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 這是相當(dāng)于開啟了一個producer的命令行。
4.接下來運(yùn)行consumer,新啟一個terminal:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
5.執(zhí)行完consumer的命令后,你可以在producer的terminal中輸入信息,馬上在consumer的terminal中就會出現(xiàn)你輸?shù)男畔?。有點(diǎn)兒像一個通信客戶端。
配置項(xiàng):見http://kafka.apache.org/docum...
必要配置項(xiàng):
broker.id
log.dirs
zookeeper.connect
編程APIDOC:http://kafka.apache.org/0110/...
官方github例子: https://github.com/apache/kaf...
org.apache.kafka kafka-clients 0.11.0.0 org.apache.kafka kafka-streams 0.11.0.0
首先貼一下官方例子:
Producer:
public class MyKafkaProducer { public static void main(String[] args) { /** * 這個例子中,每次調(diào)用都會創(chuàng)建一個Producer實(shí)例,但此處只是為了演示方便,實(shí)際使用中,請將Producer作為單例使用,它是線程安全的。 * 從Kafka 0.11 開始,KafkaProducer支持兩種額外的模式:冪等(idempotent)與事務(wù)(transactional)。冪等使得之前的at least once變成exactly once傳送 * 冪等Producer的重試不再會導(dǎo)致重復(fù)消息。事務(wù)允許應(yīng)用程序以原子方式將消息發(fā)送到多個分區(qū)(和主題!) * 開啟idempotence冪等:props.put("enable.idempotence", true);設(shè)置之后retries屬性自動被設(shè)為Integer.MAX_VALUE;;acks屬性自動設(shè)為all;;max.inflight.requests.per.connection屬性自動設(shè)為1.其余一樣。 * 開啟事務(wù)性: props.put("transactional.id", "my-transactional-id");一旦這個屬性被設(shè)置,那么冪等也會自動開啟。然后使用事務(wù)API操作即可 */ } private static void send(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("enable.idempotence", true);//開啟idempotence冪等 extract-once // props.put("acks", "all");//acks配置控制請求被認(rèn)為完成的條件 // props.put("retries", 0);重試次數(shù) // props.put("batch.size", 16384); // props.put("linger.ms", 1); // props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i))); producer.close(); } private static void sendInTx(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "my-transactional-id");//要啟用事務(wù),必須配置一個唯一的事務(wù)id /** * http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html * KafkaProducer類是線程安全的,可以在多線程之間共享。 */ Producer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++){ // send()是異步的,會立即返回,內(nèi)部是緩存到producer的buffer中,以便于生產(chǎn)者可以批量提交, 你也可以傳遞一個回調(diào)send(ProducerRecord record, Callback callback) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); } producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { //無法恢復(fù)的異常,我們只能關(guān)閉producer producer.close(); } catch (KafkaException e) { // 可恢復(fù)的異常,終止事務(wù)然后重試即可。 producer.abortTransaction(); } producer.close(); } }
發(fā)送完之后,我們可以用bin目錄下的kafka-console-consumer來看發(fā)送的結(jié)果(當(dāng)然現(xiàn)在用的topic是test)??梢杂妹睿?/p>
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Consumer:
/** *與producer不同,Kafka consumer不是線程安全的。 */ public class MyKafkaConsumer { /** * 通過配置enable.auto.commit,auto.commit.interval.ms來定期自動提交消費(fèi)的偏移量 */ private void recieveByAutoCommitOffset(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } // consumer.wakeup(); } /** * 手動提交消費(fèi)的偏移量,這樣用戶可以控制記錄何時被視為已消費(fèi),從而提交其偏移量。 當(dāng)消息的消耗與一些處理邏輯相結(jié)合時,這是有用的,因?yàn)樵谕瓿商幚碇安粦?yīng)將消息視為已消費(fèi)。 */ private void recieveByManualCommitOffset(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false");//手動提交offset props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List > buffer = new ArrayList<>(); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { // insertIntoDb(buffer); 執(zhí)行相關(guān)邏輯 consumer.commitSync();//提交offset buffer.clear(); } } } }
Streams:
public class MyKafkaStreams { public void test(){ Mapprops = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); builder.stream("my-input-topic").mapValues(value -> value.toString()+"!!!").to("my-output-topic"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); } }
注意點(diǎn):
將producer寫成單例模式,有助于減少zookeeper端占用的資源。Producer自身是線程安全的類,只要封裝得當(dāng)就能最恰當(dāng)?shù)陌l(fā)揮好producer的作用。(ZkClient去連接zookeeper的server時候都會創(chuàng)建sendThread和eventThread兩個線程,其中sendThread主要用于client與server端之間的網(wǎng)絡(luò)連接,真正的處理線程由eventThread來執(zhí)行。Zookeeper是一個分布式的協(xié)調(diào)框架,而分布式應(yīng)用中經(jīng)常會出現(xiàn)動態(tài)的增加或刪除節(jié)點(diǎn)的操作,所以為了實(shí)時了解分布式整個節(jié)點(diǎn)的數(shù)量和基本信息,就有必要維護(hù)一個長連接的線程與服務(wù)端保持連接。另外zookeeper連接時占用的時間也比較長,如果每次生產(chǎn)數(shù)據(jù)時都連接發(fā)起一次連接勢必造成了大量時間的耗費(fèi)。)
kafka是將消息按照topic的形式存儲,一個topic會按照partition存在同一個文件夾下,目錄在config/server.properties中指定:
# The directory under which to store log files log.dir=/tmp/kafka-logs
在消息系統(tǒng)中都會有這樣一個問題存在,數(shù)據(jù)消費(fèi)狀態(tài)這個信息到底存哪里。是存在consumer端,還是存在broker端。對于這樣的爭論,一般會出現(xiàn)三種情況:
At most once :消息一旦發(fā)出就立馬標(biāo)記已消費(fèi),不會再有第二發(fā)生即使失敗了,缺點(diǎn)是容易丟失消息。
At least once :消息至少發(fā)送一次,如果消息未能接受成功,可能會重發(fā),直到接收成功.
Exactly once :每個消息僅發(fā)生一次,而且一次就能確保到達(dá)。這是理想狀態(tài)。(kafka0.11支持冪等之后,在開啟冪等的情況下,就是這種模式)
at most once: 消費(fèi)者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過程中出現(xiàn)了異常,導(dǎo)致部分消息未能繼續(xù)處理.那么此后"未處理"的消息將不能被fetch到,這就是"atmost once".
at least once: 消費(fèi)者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常導(dǎo)致保存操作未能執(zhí)行成功,這就導(dǎo)致接下來再次fetch時可能獲得上次已經(jīng)處理過的消息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復(fù)正常還是之前offset狀態(tài).
logback-kafka集成例子https://github.com/xbynet/log...
參考:http://kafka.apache.org/docum...
http://kafka.apache.org/intro...
https://my.oschina.net/ielts0...
http://blog.csdn.net/my_bai/a...
http://www.infoq.com/cn/artic...
http://www.cnblogs.com/likehu...
http://www.cnblogs.com/likehu...
https://www.iteblog.com/archi...
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67313.html
摘要:作者胡夕人人貸計(jì)算平臺部總監(jiān),將在這篇專欄中一步一步的教你填平這些坑,全面提升你的實(shí)戰(zhàn)能力搭配掘金小冊圖解之核心原理學(xué)習(xí)效果更佳哦送學(xué)習(xí)筆記 showImg(https://segmentfault.com/img/bVbsg9O?w=258&h=258);關(guān)注有課學(xué)微信公眾號,回復(fù)暗號 kafka 獲取購買《Kafka核心技術(shù)與實(shí)戰(zhàn)》極客時間專欄地址,購買成功后提交購買截圖即可獲得返...
摘要:可靠性一旦數(shù)據(jù)更新成功,將一直保持,直到新的更新。這是一種主動的分布式數(shù)據(jù)結(jié)構(gòu),能夠在外部情況發(fā)生變化時候主動修改數(shù)據(jù)項(xiàng)狀態(tài)的數(shù)據(jù)機(jī)構(gòu)。如果監(jiān)視節(jié)點(diǎn)狀態(tài)發(fā)生變化,則跳轉(zhuǎn)到第步,繼續(xù)進(jìn)行后續(xù)的操作,直到退出鎖競爭。 題外話:從字面上來看,ZooKeeper表示動物園管理員,而Hadoop生態(tài)系統(tǒng)中,許多項(xiàng)目的Logo都采用了動物,比如Hadoop采用了大象的形象,所以可以ZooKeepe...
閱讀 918·2021-10-27 14:19
閱讀 1145·2021-10-15 09:42
閱讀 1567·2021-09-14 18:02
閱讀 765·2019-08-30 13:09
閱讀 3010·2019-08-29 15:08
閱讀 2113·2019-08-28 18:05
閱讀 977·2019-08-26 10:25
閱讀 2813·2019-08-23 16:28