成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Kafka - PHP 使用 Rdkafka 生產(chǎn)/消費(fèi)數(shù)據(jù)

BetaRabbit / 3643人閱讀

摘要:集群部署安裝依賴可以參閱支持的客戶端版本生產(chǎn)者連接集群,創(chuàng)建,生產(chǎn)數(shù)據(jù)。鏈接集群創(chuàng)建消費(fèi)者自動(dòng)分配,,。消費(fèi)者指定消費(fèi)。沒(méi)有消費(fèi)組的概念,也可以認(rèn)為每個(gè)消費(fèi)者都屬于一個(gè)獨(dú)立消費(fèi)組。

Kafka集群部署

安裝rdkafka

rdkafka 依賴 libkafka

yum install rdkafka rdkafka-devel
pecl install rdkafka
php --ri rdkafka

http://pecl.php.net/package/r... 可以參閱支持的kafka客戶端版本

生產(chǎn)者

連接集群,創(chuàng)建 topic,生產(chǎn)數(shù)據(jù)。

setLogLevel(LOG_DEBUG);

// 鏈接kafka集群
$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");

// 創(chuàng)建topic
$topic = $rk->newTopic("topic_1");

while (true) {
    $message = "hello kafka " . date("Y-m-d H:i:s");
    echo "hello kafka " . date("Y-m-d H:i:s") . PHP_EOL;

    try {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        sleep(2);
    } catch (Exception $e) {
        echo $e->getMessage() . PHP_EOL;
    }
}
消費(fèi)者-HighLevel

自動(dòng)分配partition,rebalancecomsumer group。

setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(null);
            break;
        default:
            throw new Exception($err);
    }
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set("group.id", "group_1");

// Initial list of Kafka brokers
$conf->set("metadata.broker.list", "192.168.20.6:9092,192.168.20.6:9093");

$topicConf = new RdKafkaTopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// "smallest": start from the beginning
$topicConf->set("auto.offset.reset", "smallest");

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafkaKafkaConsumer($conf);

// Subscribe to topic "topic_1"
$consumer->subscribe(["topic_1"]);

echo "Waiting for partition assignment... (make take some time when
";
echo "quickly re-joining the group after leaving it.)
";

while (true) {
    $message = $consumer->consume(3e3);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            sleep(2);
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;
        default:
            throw new Exception($message->errstr(), $message->err);
            break;
    }
}
消費(fèi)者-LowLevel

指定partition消費(fèi)。
php consumer_lowlevel.php [partitonNuo]
LowLevel 沒(méi)有消費(fèi)組的概念,也可以認(rèn)為每個(gè)消費(fèi)者都屬于一個(gè)獨(dú)立消費(fèi)組。

set("group.id", "group_2");

$rk = new RdKafkaConsumer($conf);
$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");

$topicConf = new RdKafkaTopicConf();
$topicConf->set("auto.commit.interval.ms", 2000);

// Set the offset store method to "file"
// $topicConf->set("offset.store.method", "file");
// $topicConf->set("offset.store.path", sys_get_temp_dir());

// Alternatively, set the offset store method to "broker"
$topicConf->set("offset.store.method", "broker");

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// "smallest": start from the beginning
$topicConf->set("auto.offset.reset", "smallest");

$topic = $rk->newTopic($topic, $topicConf);

// Start consuming partition 0
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume($partition, 3 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;
        default:
            throw new Exception($message->errstr(), $message->err);
            break;
    }
}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/31031.html

相關(guān)文章

  • PHPkafka的實(shí)踐

    摘要:消息以為類別記錄將消息種子分類每一類的消息稱之為一個(gè)主題。這意味著生產(chǎn)者不等待來(lái)自同步完成的確認(rèn)繼續(xù)發(fā)送下一條批消息。這意味著在已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條。三種機(jī)制,性能依次遞減吞吐量降低,數(shù)據(jù)健壯性則依次遞增。 kafka 簡(jiǎn)介 Kafka 是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng) kafka角色必知 producer:生產(chǎn)者。 consumer:消費(fèi)者。 topic: 消...

    Codeing_ls 評(píng)論0 收藏0
  • PHP 使用 Kafka 安裝拾遺

    摘要:最近項(xiàng)目開(kāi)發(fā)中需要使用消息隊(duì)列。不過(guò)在環(huán)境中安裝的過(guò)程中出現(xiàn)了以下報(bào)錯(cuò)開(kāi)始以為是因?yàn)榘惭b缺少了一些依賴。然后使用了源碼編譯的方式進(jìn)行安裝同樣報(bào)錯(cuò)了。然后安裝它再執(zhí)行,執(zhí)行。擴(kuò)展包使用純粹的編寫(xiě)的客戶端,目前支持以上版本的。 最近項(xiàng)目開(kāi)發(fā)中需要使用 Kafka 消息隊(duì)列。經(jīng)過(guò)檢索,PHP下面有通用的兩種方式來(lái)調(diào)用 Kafka 。 php-rdkafka 擴(kuò)展 以 PHP 擴(kuò)展的形式進(jìn)行...

    SimonMa 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<