成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

PHP下kafka的實(shí)踐

Codeing_ls / 1055人閱讀

摘要:消息以為類別記錄將消息種子分類每一類的消息稱之為一個(gè)主題。這意味著生產(chǎn)者不等待來自同步完成的確認(rèn)繼續(xù)發(fā)送下一條批消息。這意味著在已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條。三種機(jī)制,性能依次遞減吞吐量降低,數(shù)據(jù)健壯性則依次遞增。

kafka 簡(jiǎn)介
Kafka 是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)
kafka角色必知
producer:生產(chǎn)者。
consumer:消費(fèi)者。
topic: 消息以topic為類別記錄,Kafka將消息種子(Feed)分類, 每一類的消息稱之為一個(gè)主題(Topic)。
broker:以集群的方式運(yùn)行,可以由一個(gè)或多個(gè)服務(wù)組成,每個(gè)服務(wù)叫做一個(gè)broker;消費(fèi)者可以訂閱一個(gè)或多個(gè)主題(topic), 并從Broker拉數(shù)據(jù),從而消費(fèi)這些已發(fā)布的消息。
經(jīng)典模型
1. 一個(gè)主題下的分區(qū)不能小于消費(fèi)者數(shù)量,即一個(gè)主題下消費(fèi)者數(shù)量不能大于分區(qū)屬,大了就浪費(fèi)了空閑了
2. 一個(gè)主題下的一個(gè)分區(qū)可以同時(shí)被不同消費(fèi)組其中某一個(gè)消費(fèi)者消費(fèi)
3. 一個(gè)主題下的一個(gè)分區(qū)只能被同一個(gè)消費(fèi)組的一個(gè)消費(fèi)者消費(fèi)

常用參數(shù)說明 request.required.acks
Kafka producer的ack有3中機(jī)制,初始化producer時(shí)的producerconfig可以通過配置request.required.acks不同的值來實(shí)現(xiàn)。

0:這意味著生產(chǎn)者producer不等待來自broker同步完成的確認(rèn)繼續(xù)發(fā)送下一條(批)消息。此選項(xiàng)提供最低的延遲但最弱的耐久性保證(當(dāng)服務(wù)器發(fā)生故障時(shí)某些數(shù)據(jù)會(huì)丟失,如leader已死,但producer并不知情,發(fā)出去的信息broker就收不到)。

1:這意味著producer在leader已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條message。此選項(xiàng)提供了更好的耐久性為客戶等待服務(wù)器確認(rèn)請(qǐng)求成功(被寫入死亡leader但尚未復(fù)制將失去了唯一的消息)。

-1:這意味著producer在follower副本確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成。 
此選項(xiàng)提供最好的耐久性,我們保證沒有信息將丟失,只要至少一個(gè)同步副本保持存活。

三種機(jī)制,性能依次遞減 (producer吞吐量降低),數(shù)據(jù)健壯性則依次遞增。
auto.offset.reset
1. earliest:自動(dòng)將偏移重置為最早的偏移量
2. latest:自動(dòng)將偏移量重置為最新的偏移量(默認(rèn))
3. none:如果consumer group沒有發(fā)現(xiàn)先前的偏移量,則向consumer拋出異常。
4. 其他的參數(shù):向consumer拋出異常(無效參數(shù))
kafka安裝和簡(jiǎn)單測(cè)試 安裝kafka(不需要安裝,解包即可)
# 官方下載地址:http://kafka.apache.org/downloads
# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
tar -xzf kafka_2.12-1.1.1.tgz
cd kafka_2.12-1.1.0
啟動(dòng)kafka server
# 需先啟動(dòng)zookeeper
# -daemon 可啟動(dòng)后臺(tái)守護(hù)模式
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
啟動(dòng)kafka客戶端測(cè)試
# 創(chuàng)建一個(gè)話題,test話題2個(gè)分區(qū)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".

# 顯示所有話題
bin/kafka-topics.sh --list --zookeeper localhost:2181
test

# 顯示話題信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0


# 啟動(dòng)一個(gè)生產(chǎn)者(輸入消息)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[等待輸入自己的內(nèi)容 出現(xiàn)>輸入即可]
>i am a new msg !
>i am a good msg ?

# 啟動(dòng)一個(gè)消費(fèi)者(等待消息) 
# 注意這里的--from-beginning,每次都會(huì)從頭開始讀取,你可以嘗試去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?
安裝kafka的php擴(kuò)展
# 先安裝rdkfka庫(kù)文件
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure 
make
sudo make install

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

vim [php]/php.ini
extension=rdkafka.so
php代碼實(shí)踐 生產(chǎn)者
setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

$rk = new RdKafkaProducer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafkaTopicConf();
// -1必須等所有brokers同步完成的確認(rèn) 1當(dāng)前服務(wù)器確認(rèn) 0不確認(rèn),這里如果是0回調(diào)里的offset無返回,如果是1和-1會(huì)返回offset
// 我們可以利用該機(jī)制做消息生產(chǎn)的確認(rèn),不過還不是100%,因?yàn)橛锌赡軙?huì)中途kafka服務(wù)器掛掉
$cf->set("request.required.acks", 0);
$topic = $rk->newTopic("test", $cf);

$option = "qkl";
for ($i = 0; $i < 20; $i++) {
    //RD_KAFKA_PARTITION_UA自動(dòng)選擇分區(qū)
    //$option可選
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
}


$len = $rk->getOutQLen();
while ($len > 0) {
    $len = $rk->getOutQLen();
    var_dump($len);
    $rk->poll(50);
}
運(yùn)行生產(chǎn)者
php producer.php
# output

int(20)
int(20)
int(20)
int(20)
int(0)

# 你可以查看你剛才上面啟動(dòng)的消費(fèi)者shell應(yīng)該會(huì)輸出消息
qkl . 0
qkl . 1
qkl . 2
qkl . 3
qkl . 4
qkl . 5
qkl . 6
qkl . 7
qkl . 8
qkl . 9
qkl . 10
qkl . 11
qkl . 12
qkl . 13
qkl . 14
qkl . 15
qkl . 16
qkl . 17
qkl . 18
qkl . 19
Low Level 消費(fèi)者
setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

//設(shè)置消費(fèi)組
$conf->set("group.id", "myConsumerGroup");

$rk = new RdKafkaConsumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafkaTopicConf();
$topicConf->set("request.required.acks", 1);
//在interval.ms的時(shí)間內(nèi)自動(dòng)提交確認(rèn)、建議不要啟動(dòng)
//$topicConf->set("auto.commit.enable", 1);
$topicConf->set("auto.commit.enable", 0);
$topicConf->set("auto.commit.interval.ms", 100);

// 設(shè)置offset的存儲(chǔ)為file
//$topicConf->set("offset.store.method", "file");
// 設(shè)置offset的存儲(chǔ)為broker
 $topicConf->set("offset.store.method", "broker");
//$topicConf->set("offset.store.path", __DIR__);

//smallest:簡(jiǎn)單理解為從頭開始消費(fèi),其實(shí)等價(jià)于上面的 earliest
//largest:簡(jiǎn)單理解為從最新的開始消費(fèi),其實(shí)等價(jià)于上面的 latest
//$topicConf->set("auto.offset.reset", "smallest");

$topic = $rk->newTopic("test", $topicConf);

// 參數(shù)1消費(fèi)分區(qū)0
// RD_KAFKA_OFFSET_BEGINNING 重頭開始消費(fèi)
// RD_KAFKA_OFFSET_STORED 最后一條消費(fèi)的offset記錄開始消費(fèi)
// RD_KAFKA_OFFSET_END 最后一條消費(fèi)
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //
//$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    //參數(shù)1表示消費(fèi)分區(qū),這里是分區(qū)0
    //參數(shù)2表示同步阻塞多久
    $message = $topic->consume(0, 12 * 1000);
    if (is_null($message)) {
        sleep(1);
        echo "No more messages
";
        continue;
    }
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more
";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out
";
            break;
        default:
            throw new Exception($message->errstr(), $message->err);
            break;
    }
}
High LEVEL消費(fèi)者
assign();
//            $kafka->assign([new RdKafkaTopicPartition("qkl01", 0, 0)]);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new Exception($err);
    }
}

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function(RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
    rebalance($kafka, $err, $partitions);
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set("group.id", "test-110-g100");

// Initial list of Kafka brokers
$conf->set("metadata.broker.list", "192.168.216.122");

$topicConf = new RdKafkaTopicConf();

$topicConf->set("request.required.acks", -1);
//在interval.ms的時(shí)間內(nèi)自動(dòng)提交確認(rèn)、建議不要啟動(dòng)
$topicConf->set("auto.commit.enable", 0);
//$topicConf->set("auto.commit.enable", 0);
$topicConf->set("auto.commit.interval.ms", 100);

// 設(shè)置offset的存儲(chǔ)為file
$topicConf->set("offset.store.method", "file");
$topicConf->set("offset.store.path", __DIR__);
// 設(shè)置offset的存儲(chǔ)為broker
// $topicConf->set("offset.store.method", "broker");

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// "smallest": start from the beginning
$topicConf->set("auto.offset.reset", "smallest");

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafkaKafkaConsumer($conf);

//$KafkaConsumerTopic = $consumer->newTopic("qkl01", $topicConf);

// Subscribe to topic "test"
$consumer->subscribe(["qkl01"]);

echo "Waiting for partition assignment... (make take some time when
";
echo "quickly re-joining the group after leaving it.)
";

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
//            $consumer->commit($message);
//            $KafkaConsumerTopic->offsetStore(0, 20);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more
";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out
";
            break;
        default:
            throw new Exception($message->errstr(), $message->err);
            break;
    }
}
消費(fèi)組特別說明
特別注意,High LEVEL消費(fèi)者設(shè)置的消費(fèi)組,kafka服務(wù)器才會(huì)記錄, Low Level消費(fèi)者設(shè)置的消費(fèi)組,服務(wù)器不會(huì)記錄

具體查看消費(fèi)組信息,你可以翻閱本篇文章

查看服務(wù)器元數(shù)據(jù)(topic/partition/broker)
setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)
", rd_kafka_err2str($err), $reason);
});

$conf->set("group.id", "myConsumerGroup");

$rk = new RdKafkaConsumer($conf);
$rk->addBrokers("127.0.0.1");

$allInfo = $rk->metadata(true, NULL, 60e3);

$topics = $allInfo->getTopics();

echo rd_kafka_offset_tail(100);
echo "--";

echo count($topics);
echo "--";


foreach ($topics as $topic) {

    $topicName = $topic->getTopic();
    if ($topicName == "__consumer_offsets") {
        continue ;
    }

    $partitions = $topic->getPartitions();
    foreach ($partitions as $partition) {
//        $rf = new ReflectionClass(get_class($partition));
//        foreach ($rf->getMethods() as $f) {
//            var_dump($f);
//        }
//        die();
        $topPartition = new RdKafkaTopicPartition($topicName, $partition->getId());
        echo  "當(dāng)前的話題:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
        echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;
    }
}
如果需遠(yuǎn)端生產(chǎn)和消費(fèi)
vim config/server.properties
advertised.listeners=PLAINTEXT://ip:9092
# ip 未你kafka的外網(wǎng)ip即可
分享一個(gè)打包好的php-rdkafka的類庫(kù)

https://github.com/qkl9527/php-rdkafka-class

參考文獻(xiàn)

Kafka文檔

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/29117.html

相關(guān)文章

  • PHPkafka常用腳本實(shí)踐

    摘要:閱讀本教程前最好先嘗試閱讀下的實(shí)踐自帶命令實(shí)踐嘗試實(shí)踐的知識(shí)創(chuàng)建話題生產(chǎn)消息消費(fèi)消息話題信息獲取消費(fèi)組獲取消費(fèi)組的自帶的命令安裝目錄的目錄下代表我們會(huì)使用的腳本 閱讀本教程前最好先嘗試閱讀:PHP下kafka的實(shí)踐 自帶命令實(shí)踐 嘗試實(shí)踐的kafka知識(shí): 創(chuàng)建話題 生產(chǎn)消息 消費(fèi)消息 話題信息 獲取消費(fèi)組 獲取消費(fèi)組的offset 自帶的命令 # kafka安裝目錄的bin目錄下...

    caiyongji 評(píng)論0 收藏0
  • PHPer書單

    摘要:想提升自己,還得多看書多看書多看書下面是我收集到的一些程序員應(yīng)該看得書單及在線教程,自己也沒有全部看完。共勉吧當(dāng)然,如果你有好的書想分享給大家的或者覺得書單不合理,可以去通過進(jìn)行提交。講師溫銘,軟件基金會(huì)主席,最佳實(shí)踐作者。 想提升自己,還得多看書!多看書!多看書!下面是我收集到的一些PHP程序員應(yīng)該看得書單及在線教程,自己也沒有全部看完。共勉吧!當(dāng)然,如果你有好的書想分享給大家的或者...

    jimhs 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<