成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

nsq 優(yōu)秀的消息隊(duì)列

Atom / 3342人閱讀

摘要:簡介是語言編寫的,開源的分布式消息隊(duì)列中間件,其設(shè)計(jì)的目的是用來大規(guī)模地處理每天數(shù)以十億計(jì)級別的消息。

簡介

NSQ是Go語言編寫的,開源的分布式消息隊(duì)列中間件,其設(shè)計(jì)的目的是用來大規(guī)模地處理每天數(shù)以十億計(jì)級別的消息。NSQ 具有分布式和去中心化拓?fù)浣Y(jié)構(gòu),該結(jié)構(gòu)具有無單點(diǎn)故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征,是一個成熟的、已在大規(guī)模生成環(huán)境下應(yīng)用的產(chǎn)品。

NSQ在國內(nèi)公司用的很少,在使用當(dāng)中愈發(fā)的覺得驚喜,比如他的簡單易用、部署快捷,再比如之前比較困擾的 延時定時消息,發(fā)現(xiàn)nsq 也支持,官方文檔比較全,咨詢問題時回復(fù)也非常的耐心和即時,所以我覺得有必要發(fā)布一篇文章來介紹下nsq,惠及大眾。

nsq 有三個必要的組建nsqd、nsqlookupd、nsqadmin 其中nsqd 和 nsqlookup是必須部署的 下面我們一一介紹。

nsqd :

負(fù)責(zé)接收消息,存儲隊(duì)列和將消息發(fā)送給客戶端,nsqd 可以多機(jī)器部署,當(dāng)你使用客戶端向一個topic發(fā)送消息時,可以配置多個nsqd地址,消息會隨機(jī)的分配到各個nsqd上,nsqd優(yōu)先把消息存儲到內(nèi)存channel中,當(dāng)內(nèi)存channel滿了之后,則把消息寫到磁盤文件中。他監(jiān)聽了兩個tcp端口,一個用來服務(wù)客戶端,一個用來提供http的接口 ,nsqd 啟動時置頂下nsqlookupd地址即可:

nsqd –lookupd-tcp-address=127.0.0.1:4160

也可以指定端口 與數(shù)據(jù)目錄

nsqd –lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 -tcp-address=127.0.0.1:4154 -http-address=”0.0.0.0:4155″ –data-path=/data/nsqdata

其他配置項(xiàng)可詳見官網(wǎng)

nsqlookupd
主要負(fù)責(zé)服務(wù)發(fā)現(xiàn) 負(fù)責(zé)nsqd的心跳、狀態(tài)監(jiān)測,給客戶端、nsqadmin提供nsqd地址與狀態(tài)

nsqadmin:
nsqadmin是一個web管理界面 啟動方式如下:

nsqadmin –lookupd-http-address=127.0.0.1:4161

channel詳情頁示例圖如下 ,empty可以清空當(dāng)前channel的信息,delete刪除當(dāng)前channel, pause是暫停消息消費(fèi)。

圖中也有幾個比較重要的參數(shù) depth當(dāng)前的積壓量,in-flight代表已經(jīng)投遞還未消費(fèi)掉的消息,deferred是未消費(fèi)的定時(延時)消息數(shù),ready count比較重要,go的客戶端是通過設(shè)置max-in-flight 除以客戶端連接數(shù)得到的,代表一次推給客戶端多少條消息,或者客戶端準(zhǔn)備一次性接受多少條消息,謹(jǐn)慎設(shè)置其值,因?yàn)榭赡茉斐煞?wù)器壓力,如果消費(fèi)能力比較弱,rdy建議設(shè)置的低一點(diǎn)比如3

Topic 和 Channel

其實(shí)nsqd相當(dāng)于kafka當(dāng)中的分區(qū),channel和consumers客戶端的多個連接 相當(dāng)于kafka的消費(fèi)組,但nsq比kafka使用方式便捷概念上更容易理解
拋開與kafka的對比,nsq的topic 可以設(shè)置多個channel,因?yàn)橛锌赡苡卸鄠€業(yè)務(wù)方需要定值topic的消息,這樣互不影響,
當(dāng)然一個消息會發(fā)送topic下的所有channel,然后會分配到不同客戶端的連接上,如下圖。

這篇文章主要介紹nsq的使用,源碼就不展開講,如果有興趣的同學(xué)多的話 過幾天我會再開一篇專門敘述nsq的源碼與分析。

這里提下延時消息:

nsq支持延時消息的投遞,比如我想這條消息5分鐘之后才被投遞出去被客戶端消費(fèi),較于普通的消息投遞,多了個毫秒數(shù),默認(rèn)支持最大的毫秒數(shù)為3600000毫秒也就是60分鐘,不過這個值可以在nsqd 啟動的時候 用 -max-req-timeout參數(shù)修改最大值。

延時消息可用于以下場景,比如一個訂單超過30分鐘未付款,修改其狀態(tài) 或者給客戶發(fā)短信提醒,比如之前看到的滴滴打車訂單完成后 一定時間內(nèi)未評價的可以未其設(shè)置默認(rèn)值,再比如用戶的積分過期,等等場景避免了全表掃描,異步處理,kafka不支持延時消息的投遞,目前知道支持的有rabbitmq rocketmq,但是rabbitmq 有坑,有可能會超時投遞,而rocketmq只有阿里云付費(fèi)版支持的比較好。

nsq延時消息的實(shí)現(xiàn)是用最小堆算法完成,作者繼承實(shí)現(xiàn)heap的一系類接口,專門寫了一個pqueque最小堆的優(yōu)先隊(duì)列,在internal/pequeque 目錄可以看到相關(guān)實(shí)現(xiàn),pub的時候如果chanMsg.deferred != 0則會調(diào)用channel.PutMessageDeferred方法,最終會調(diào)用繼承了go heap接口的pqueque.push方法

延時消息的處理 和普通消息一樣都是 nsqd/protocol_v2.go下messagePump 中把消息發(fā)送給客戶端 然后在queueScanWorker中分別處理,pop是peekAndShift方法中,拿當(dāng)前時間 和 deferred[0]對比如果大于 就彈出發(fā)送給客戶端 如下代碼:

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            if c.processInFlightQueue(now) {
                dirty = true
            }
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

func (c *Channel) processDeferredQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    for {
        c.deferredMutex.Lock()
        item, _ := c.deferredPQ.PeekAndShift(t)
        c.deferredMutex.Unlock()

        if item == nil {
            goto exit
        }
        dirty = true

        msg := item.Value.(*Message)
        _, err := c.popDeferredMessage(msg.ID)
        if err != nil {
            goto exit
        }
        c.put(msg)
    }

exit:
    return dirty
}

func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) {
    if pq.Len() == 0 {
        return nil, 0
    }

    item := (*pq)[0]
    if item.Priority > max {
        return nil, item.Priority - max
    }
    heap.Remove(pq, 0)

    return item, 0
}

php和go的客戶端的使用

官網(wǎng)客戶端鏈接:Client Libraries php客戶端之前官網(wǎng)有一個5年前比較老的客戶端,已經(jīng)沒人維護(hù) 甚至無法運(yùn)行,于是我貢獻(xiàn)了一個php72擴(kuò)展版本 php-nsq,速度塊了近三倍,正在逐步完善,支持各種配置與特性,目前已被官網(wǎng)收納,簡單介紹下使用 順便求下star

php-nsq pub :

$nsqd_addr = array(
    "127.0.0.1:4150",
    "127.0.0.1:4154"
);

$nsq = new Nsq();
$is_true = $nsq->connect_nsqd($nsqd_addr);
for($i = 0; $i < 20; $i++){
    $nsq->publish("test", "nihao");
}

php-nsq 延時pub :

參數(shù) 僅僅多一個毫秒?yún)?shù),so easy!

$deferred = new Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);
for($i = 0; $i < 20; $i++){
    $deferred->deferredPublish("test", "message daly", 3000); // 第三值默認(rèn)范圍 millisecond default : [0 < millisecond < 3600000] ,可以更改 上面已提到
}

php-nsq sub :

拋異常消息可以自動重試,重試時間可以有retry_delay_time設(shè)定,多少時間后再次接收被重試的消息

$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd tcp addr
$nsq = new Nsq();
$config = array(
    "topic" => "test",
    "channel" => "struggle",
    "rdy" => 2,                //optional , default 1
    "connect_num" => 1,        //optional , default 1
    "retry_delay_time" => 5000,  //optional, default 0 , after 5000 msec, message will be retried
);

$nsq->subscribe($nsq_lookupd, $config, function($msg){

    echo $msg->payload;
    echo $msg->attempts;
    echo $msg->message_id;
    echo $msg->timestamp;

});

go client pub

package main

import (
        "github.com/nsqio/go-nsq"
       )

var producer *nsq.Producer

func main() {
    nsqd := "127.0.0.1:4150"
    producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
    producer.Publish("test", []byte("nihao"))
    if err != nil {
        panic(err)
    }
}

go client sub

package main

import (
 "fmt"
 "sync"
 "github.com/nsqio/go-nsq"
)
type NSQHandler struct {
}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

func testNSQ() {
    waiter := sync.WaitGroup{}
    waiter.Add(1)

    go func() {
        defer waiter.Done()
        config:=nsq.NewConfig()
        config.MaxInFlight=9

    //建立多個連接
    for i := 0; i<10; i++ {
        consumer, err := nsq.NewConsumer("test", "struggle", config)
        if nil != err {
            fmt.Println("err", err)
            return
        }

        consumer.AddHandler(&NSQHandler{})
        err = consumer.ConnectToNSQD("127.0.0.1:4150")
        if nil != err {
            fmt.Println("err", err)
            return
        }
    }
        select{}

    }()

    waiter.Wait()
}
func main() {
        testNSQ();

}

同時此篇文章 更新到了自己博客

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/28127.html

相關(guān)文章

  • nsqjs客戶端部署

    摘要:因?yàn)楣驹跇I(yè)務(wù)中需要用到消息隊(duì)列產(chǎn)品,我選用了基于開源的產(chǎn)品,記錄下我遇到的那些部署中的坑。 因?yàn)楣驹跇I(yè)務(wù)中需要用到消息隊(duì)列產(chǎn)品,我選用了基于golang開源的nsq產(chǎn)品,記錄下我遇到的那些部署中的坑。首先安裝nsq,這個沒什么好說的,我是直接在官網(wǎng)下載bin文件,直接部署的,環(huán)境是centOS 6.7,安裝在/opt/nsq-0.3.7.linux-amd64.go1.6目錄下;其...

    myeveryheart 評論0 收藏0
  • How we redesigned the NSQ- 其他特性及未來計(jì)劃

    摘要:一條消息除了基本的元數(shù)據(jù)之外,其余內(nèi)容為消息體。消息的元數(shù)據(jù)主要包括了消息在服務(wù)端產(chǎn)生時的時間戳,服務(wù)端對于該消息的下發(fā)次數(shù),消息。作為的消費(fèi)者,從消費(fèi)消息后通過進(jìn)行處理。 在系列文章前面幾篇中,介紹了 NSQ 改造的過程和幾個基礎(chǔ)特性,本文中我們繼續(xù)介紹幾個高級特性及其使用場景,這些都是結(jié)合有贊業(yè)務(wù)場景總結(jié)提煉出來的重要功能。 NSQ 拓展消息格式的設(shè)計(jì) 有贊中間件在 NSQ 中引入...

    blastz 評論0 收藏0
  • 【swoole】結(jié)合swoole 和 nsq 實(shí)際應(yīng)用

    摘要:并且注冊回調(diào)函數(shù)。在重寫的回調(diào)函數(shù)中,實(shí)現(xiàn)了的訂閱功能消息的處理簡單封裝了重復(fù)消息的判斷沒有消費(fèi)消息的重新投遞引入就是構(gòu)造方法引入的實(shí)例化同時,重寫的方法。所以當(dāng)執(zhí)行腳本的時候,也就是啟動了對應(yīng)的服務(wù)。當(dāng)然更好的是使用協(xié)程。 集合 swoole 的框架設(shè)計(jì)為了減少理解度,我盡量的從源頭開始引入 1. nsq 案例中是使用 swoole 結(jié)合一個php 框架實(shí)現(xiàn)的是 NSQ 訂閱功能。...

    AdolphLWQ 評論0 收藏0
  • 有贊業(yè)務(wù)對賬平臺探索與實(shí)踐

    摘要:業(yè)務(wù)對賬平臺的核心目的,就是及時發(fā)現(xiàn)類似問題,并及時修復(fù)。這對對賬平臺的吞吐量造成了挑戰(zhàn)。五健康度對賬中心可以拿到業(yè)務(wù)系統(tǒng)及其所在整個鏈路的數(shù)據(jù)一致性信息。在分布式環(huán)境下,沒有人能回避數(shù)據(jù)一致性問題,我們對此充滿著敬畏。 一、引子 根據(jù)CAP原理,分布式系統(tǒng)無法在保證了可用性(Availability)和分區(qū)容忍性(Partition)之后,繼續(xù)保證一致性(Consistency)。我...

    wangjuntytl 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<