摘要:集群部署安裝依賴可以參閱支持的客戶端版本生產(chǎn)者連接集群,創(chuàng)建,生產(chǎn)數(shù)據(jù)。鏈接集群創(chuàng)建消費(fèi)者自動(dòng)分配,,。消費(fèi)者指定消費(fèi)。沒(méi)有消費(fèi)組的概念,也可以認(rèn)為每個(gè)消費(fèi)者都屬于一個(gè)獨(dú)立消費(fèi)組。
Kafka集群部署
安裝rdkafkardkafka 依賴 libkafka
yum install rdkafka rdkafka-devel pecl install rdkafka php --ri rdkafka
http://pecl.php.net/package/r... 可以參閱支持的kafka客戶端版本
生產(chǎn)者連接集群,創(chuàng)建 topic,生產(chǎn)數(shù)據(jù)。
setLogLevel(LOG_DEBUG); // 鏈接kafka集群 $rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093"); // 創(chuàng)建topic $topic = $rk->newTopic("topic_1"); while (true) { $message = "hello kafka " . date("Y-m-d H:i:s"); echo "hello kafka " . date("Y-m-d H:i:s") . PHP_EOL; try { $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message); sleep(2); } catch (Exception $e) { echo $e->getMessage() . PHP_EOL; } }消費(fèi)者-HighLevel
自動(dòng)分配partition,rebalance,comsumer group。
setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(null); break; default: throw new Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set("group.id", "group_1"); // Initial list of Kafka brokers $conf->set("metadata.broker.list", "192.168.20.6:9092,192.168.20.6:9093"); $topicConf = new RdKafkaTopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // "smallest": start from the beginning $topicConf->set("auto.offset.reset", "smallest"); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafkaKafkaConsumer($conf); // Subscribe to topic "topic_1" $consumer->subscribe(["topic_1"]); echo "Waiting for partition assignment... (make take some time when "; echo "quickly re-joining the group after leaving it.) "; while (true) { $message = $consumer->consume(3e3); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: sleep(2); case RD_KAFKA_RESP_ERR__TIMED_OUT: echo $message->errstr() . PHP_EOL; break; default: throw new Exception($message->errstr(), $message->err); break; } }消費(fèi)者-LowLevel
指定partition消費(fèi)。
php consumer_lowlevel.php [partitonNuo]
LowLevel 沒(méi)有消費(fèi)組的概念,也可以認(rèn)為每個(gè)消費(fèi)者都屬于一個(gè)獨(dú)立消費(fèi)組。
set("group.id", "group_2"); $rk = new RdKafkaConsumer($conf); $rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093"); $topicConf = new RdKafkaTopicConf(); $topicConf->set("auto.commit.interval.ms", 2000); // Set the offset store method to "file" // $topicConf->set("offset.store.method", "file"); // $topicConf->set("offset.store.path", sys_get_temp_dir()); // Alternatively, set the offset store method to "broker" $topicConf->set("offset.store.method", "broker"); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // "smallest": start from the beginning $topicConf->set("auto.offset.reset", "smallest"); $topic = $rk->newTopic($topic, $topicConf); // Start consuming partition 0 $topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume($partition, 3 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: case RD_KAFKA_RESP_ERR__TIMED_OUT: echo $message->errstr() . PHP_EOL; break; default: throw new Exception($message->errstr(), $message->err); break; } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/31031.html
摘要:消息以為類別記錄將消息種子分類每一類的消息稱之為一個(gè)主題。這意味著生產(chǎn)者不等待來(lái)自同步完成的確認(rèn)繼續(xù)發(fā)送下一條批消息。這意味著在已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條。三種機(jī)制,性能依次遞減吞吐量降低,數(shù)據(jù)健壯性則依次遞增。 kafka 簡(jiǎn)介 Kafka 是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng) kafka角色必知 producer:生產(chǎn)者。 consumer:消費(fèi)者。 topic: 消...
摘要:最近項(xiàng)目開(kāi)發(fā)中需要使用消息隊(duì)列。不過(guò)在環(huán)境中安裝的過(guò)程中出現(xiàn)了以下報(bào)錯(cuò)開(kāi)始以為是因?yàn)榘惭b缺少了一些依賴。然后使用了源碼編譯的方式進(jìn)行安裝同樣報(bào)錯(cuò)了。然后安裝它再執(zhí)行,執(zhí)行。擴(kuò)展包使用純粹的編寫(xiě)的客戶端,目前支持以上版本的。 最近項(xiàng)目開(kāi)發(fā)中需要使用 Kafka 消息隊(duì)列。經(jīng)過(guò)檢索,PHP下面有通用的兩種方式來(lái)調(diào)用 Kafka 。 php-rdkafka 擴(kuò)展 以 PHP 擴(kuò)展的形式進(jìn)行...
閱讀 544·2023-04-26 01:39
閱讀 4524·2021-11-16 11:45
閱讀 2623·2021-09-27 13:37
閱讀 899·2021-09-01 10:50
閱讀 3610·2021-08-16 10:50
閱讀 2232·2019-08-30 15:55
閱讀 2995·2019-08-30 15:55
閱讀 2265·2019-08-30 14:07