摘要:消息以為類別記錄將消息種子分類每一類的消息稱之為一個(gè)主題。這意味著生產(chǎn)者不等待來自同步完成的確認(rèn)繼續(xù)發(fā)送下一條批消息。這意味著在已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條。三種機(jī)制,性能依次遞減吞吐量降低,數(shù)據(jù)健壯性則依次遞增。
kafka 簡(jiǎn)介
Kafka 是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)kafka角色必知
producer:生產(chǎn)者。 consumer:消費(fèi)者。 topic: 消息以topic為類別記錄,Kafka將消息種子(Feed)分類, 每一類的消息稱之為一個(gè)主題(Topic)。 broker:以集群的方式運(yùn)行,可以由一個(gè)或多個(gè)服務(wù)組成,每個(gè)服務(wù)叫做一個(gè)broker;消費(fèi)者可以訂閱一個(gè)或多個(gè)主題(topic), 并從Broker拉數(shù)據(jù),從而消費(fèi)這些已發(fā)布的消息。經(jīng)典模型
1. 一個(gè)主題下的分區(qū)不能小于消費(fèi)者數(shù)量,即一個(gè)主題下消費(fèi)者數(shù)量不能大于分區(qū)屬,大了就浪費(fèi)了空閑了 2. 一個(gè)主題下的一個(gè)分區(qū)可以同時(shí)被不同消費(fèi)組其中某一個(gè)消費(fèi)者消費(fèi) 3. 一個(gè)主題下的一個(gè)分區(qū)只能被同一個(gè)消費(fèi)組的一個(gè)消費(fèi)者消費(fèi)常用參數(shù)說明 request.required.acks
Kafka producer的ack有3中機(jī)制,初始化producer時(shí)的producerconfig可以通過配置request.required.acks不同的值來實(shí)現(xiàn)。 0:這意味著生產(chǎn)者producer不等待來自broker同步完成的確認(rèn)繼續(xù)發(fā)送下一條(批)消息。此選項(xiàng)提供最低的延遲但最弱的耐久性保證(當(dāng)服務(wù)器發(fā)生故障時(shí)某些數(shù)據(jù)會(huì)丟失,如leader已死,但producer并不知情,發(fā)出去的信息broker就收不到)。 1:這意味著producer在leader已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條message。此選項(xiàng)提供了更好的耐久性為客戶等待服務(wù)器確認(rèn)請(qǐng)求成功(被寫入死亡leader但尚未復(fù)制將失去了唯一的消息)。 -1:這意味著producer在follower副本確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成。 此選項(xiàng)提供最好的耐久性,我們保證沒有信息將丟失,只要至少一個(gè)同步副本保持存活。 三種機(jī)制,性能依次遞減 (producer吞吐量降低),數(shù)據(jù)健壯性則依次遞增。auto.offset.reset
1. earliest:自動(dòng)將偏移重置為最早的偏移量 2. latest:自動(dòng)將偏移量重置為最新的偏移量(默認(rèn)) 3. none:如果consumer group沒有發(fā)現(xiàn)先前的偏移量,則向consumer拋出異常。 4. 其他的參數(shù):向consumer拋出異常(無效參數(shù))kafka安裝和簡(jiǎn)單測(cè)試 安裝kafka(不需要安裝,解包即可)
# 官方下載地址:http://kafka.apache.org/downloads # wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz tar -xzf kafka_2.12-1.1.1.tgz cd kafka_2.12-1.1.0啟動(dòng)kafka server
# 需先啟動(dòng)zookeeper # -daemon 可啟動(dòng)后臺(tái)守護(hù)模式 bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties啟動(dòng)kafka客戶端測(cè)試
# 創(chuàng)建一個(gè)話題,test話題2個(gè)分區(qū) bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test Created topic "test". # 顯示所有話題 bin/kafka-topics.sh --list --zookeeper localhost:2181 test # 顯示話題信息 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:2 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 # 啟動(dòng)一個(gè)生產(chǎn)者(輸入消息) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [等待輸入自己的內(nèi)容 出現(xiàn)>輸入即可] >i am a new msg ! >i am a good msg ? # 啟動(dòng)一個(gè)消費(fèi)者(等待消息) # 注意這里的--from-beginning,每次都會(huì)從頭開始讀取,你可以嘗試去掉和不去掉看下效果 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning [等待消息] i am a new msg ! i am a good msg ?安裝kafka的php擴(kuò)展
# 先安裝rdkfka庫(kù)文件 git clone https://github.com/edenhill/librdkafka.git cd librdkafka/ ./configure make sudo make install git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure make all -j 5 sudo make install vim [php]/php.ini extension=rdkafka.sophp代碼實(shí)踐 生產(chǎn)者
setDrMsgCb(function ($kafka, $message) { file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); $rk = new RdKafkaProducer($conf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $cf = new RdKafkaTopicConf(); // -1必須等所有brokers同步完成的確認(rèn) 1當(dāng)前服務(wù)器確認(rèn) 0不確認(rèn),這里如果是0回調(diào)里的offset無返回,如果是1和-1會(huì)返回offset // 我們可以利用該機(jī)制做消息生產(chǎn)的確認(rèn),不過還不是100%,因?yàn)橛锌赡軙?huì)中途kafka服務(wù)器掛掉 $cf->set("request.required.acks", 0); $topic = $rk->newTopic("test", $cf); $option = "qkl"; for ($i = 0; $i < 20; $i++) { //RD_KAFKA_PARTITION_UA自動(dòng)選擇分區(qū) //$option可選 $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option); } $len = $rk->getOutQLen(); while ($len > 0) { $len = $rk->getOutQLen(); var_dump($len); $rk->poll(50); }運(yùn)行生產(chǎn)者
php producer.php # output int(20) int(20) int(20) int(20) int(0) # 你可以查看你剛才上面啟動(dòng)的消費(fèi)者shell應(yīng)該會(huì)輸出消息 qkl . 0 qkl . 1 qkl . 2 qkl . 3 qkl . 4 qkl . 5 qkl . 6 qkl . 7 qkl . 8 qkl . 9 qkl . 10 qkl . 11 qkl . 12 qkl . 13 qkl . 14 qkl . 15 qkl . 16 qkl . 17 qkl . 18 qkl . 19Low Level 消費(fèi)者
setDrMsgCb(function ($kafka, $message) { file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); //設(shè)置消費(fèi)組 $conf->set("group.id", "myConsumerGroup"); $rk = new RdKafkaConsumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new RdKafkaTopicConf(); $topicConf->set("request.required.acks", 1); //在interval.ms的時(shí)間內(nèi)自動(dòng)提交確認(rèn)、建議不要啟動(dòng) //$topicConf->set("auto.commit.enable", 1); $topicConf->set("auto.commit.enable", 0); $topicConf->set("auto.commit.interval.ms", 100); // 設(shè)置offset的存儲(chǔ)為file //$topicConf->set("offset.store.method", "file"); // 設(shè)置offset的存儲(chǔ)為broker $topicConf->set("offset.store.method", "broker"); //$topicConf->set("offset.store.path", __DIR__); //smallest:簡(jiǎn)單理解為從頭開始消費(fèi),其實(shí)等價(jià)于上面的 earliest //largest:簡(jiǎn)單理解為從最新的開始消費(fèi),其實(shí)等價(jià)于上面的 latest //$topicConf->set("auto.offset.reset", "smallest"); $topic = $rk->newTopic("test", $topicConf); // 參數(shù)1消費(fèi)分區(qū)0 // RD_KAFKA_OFFSET_BEGINNING 重頭開始消費(fèi) // RD_KAFKA_OFFSET_STORED 最后一條消費(fèi)的offset記錄開始消費(fèi) // RD_KAFKA_OFFSET_END 最后一條消費(fèi) $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); //$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // //$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { //參數(shù)1表示消費(fèi)分區(qū),這里是分區(qū)0 //參數(shù)2表示同步阻塞多久 $message = $topic->consume(0, 12 * 1000); if (is_null($message)) { sleep(1); echo "No more messages "; continue; } switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more "; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out "; break; default: throw new Exception($message->errstr(), $message->err); break; } }High LEVEL消費(fèi)者
assign(); // $kafka->assign([new RdKafkaTopicPartition("qkl01", 0, 0)]); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new Exception($err); } } // Set a rebalance callback to log partition assignments (optional) $conf->setRebalanceCb(function(RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) { rebalance($kafka, $err, $partitions); }); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set("group.id", "test-110-g100"); // Initial list of Kafka brokers $conf->set("metadata.broker.list", "192.168.216.122"); $topicConf = new RdKafkaTopicConf(); $topicConf->set("request.required.acks", -1); //在interval.ms的時(shí)間內(nèi)自動(dòng)提交確認(rèn)、建議不要啟動(dòng) $topicConf->set("auto.commit.enable", 0); //$topicConf->set("auto.commit.enable", 0); $topicConf->set("auto.commit.interval.ms", 100); // 設(shè)置offset的存儲(chǔ)為file $topicConf->set("offset.store.method", "file"); $topicConf->set("offset.store.path", __DIR__); // 設(shè)置offset的存儲(chǔ)為broker // $topicConf->set("offset.store.method", "broker"); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // "smallest": start from the beginning $topicConf->set("auto.offset.reset", "smallest"); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafkaKafkaConsumer($conf); //$KafkaConsumerTopic = $consumer->newTopic("qkl01", $topicConf); // Subscribe to topic "test" $consumer->subscribe(["qkl01"]); echo "Waiting for partition assignment... (make take some time when "; echo "quickly re-joining the group after leaving it.) "; while (true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); // $consumer->commit($message); // $KafkaConsumerTopic->offsetStore(0, 20); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more "; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out "; break; default: throw new Exception($message->errstr(), $message->err); break; } }消費(fèi)組特別說明
特別注意,High LEVEL消費(fèi)者設(shè)置的消費(fèi)組,kafka服務(wù)器才會(huì)記錄, Low Level消費(fèi)者設(shè)置的消費(fèi)組,服務(wù)器不會(huì)記錄
具體查看消費(fèi)組信息,你可以翻閱本篇文章
查看服務(wù)器元數(shù)據(jù)(topic/partition/broker)setDrMsgCb(function ($kafka, $message) { file_put_contents("./xx.log", var_export($message, true), FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { printf("Kafka error: %s (reason: %s) ", rd_kafka_err2str($err), $reason); }); $conf->set("group.id", "myConsumerGroup"); $rk = new RdKafkaConsumer($conf); $rk->addBrokers("127.0.0.1"); $allInfo = $rk->metadata(true, NULL, 60e3); $topics = $allInfo->getTopics(); echo rd_kafka_offset_tail(100); echo "--"; echo count($topics); echo "--"; foreach ($topics as $topic) { $topicName = $topic->getTopic(); if ($topicName == "__consumer_offsets") { continue ; } $partitions = $topic->getPartitions(); foreach ($partitions as $partition) { // $rf = new ReflectionClass(get_class($partition)); // foreach ($rf->getMethods() as $f) { // var_dump($f); // } // die(); $topPartition = new RdKafkaTopicPartition($topicName, $partition->getId()); echo "當(dāng)前的話題:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - "; echo "offset:" . ($topPartition->getOffset()) . PHP_EOL; } }如果需遠(yuǎn)端生產(chǎn)和消費(fèi)
vim config/server.properties advertised.listeners=PLAINTEXT://ip:9092 # ip 未你kafka的外網(wǎng)ip即可分享一個(gè)打包好的php-rdkafka的類庫(kù)
https://github.com/qkl9527/php-rdkafka-class
參考文獻(xiàn)Kafka文檔
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/29117.html
摘要:閱讀本教程前最好先嘗試閱讀下的實(shí)踐自帶命令實(shí)踐嘗試實(shí)踐的知識(shí)創(chuàng)建話題生產(chǎn)消息消費(fèi)消息話題信息獲取消費(fèi)組獲取消費(fèi)組的自帶的命令安裝目錄的目錄下代表我們會(huì)使用的腳本 閱讀本教程前最好先嘗試閱讀:PHP下kafka的實(shí)踐 自帶命令實(shí)踐 嘗試實(shí)踐的kafka知識(shí): 創(chuàng)建話題 生產(chǎn)消息 消費(fèi)消息 話題信息 獲取消費(fèi)組 獲取消費(fèi)組的offset 自帶的命令 # kafka安裝目錄的bin目錄下...
閱讀 2238·2021-09-24 10:31
閱讀 3889·2021-09-22 15:16
閱讀 3411·2021-09-22 10:02
閱讀 1026·2021-09-22 10:02
閱讀 1842·2021-09-08 09:36
閱讀 1988·2019-08-30 14:18
閱讀 617·2019-08-30 10:51
閱讀 1881·2019-08-29 11:08