成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

開源一個kafka增強:okmq-1.0.0

PAMPANG / 1333人閱讀

摘要:只有兩個基礎組件同時死亡,才會受到嚴重影響。的意外死亡,造成生產(chǎn)端發(fā)送失敗。后臺會有一個線程進行這些失敗消息的遍歷和重新投遞。二阻塞業(yè)務正常進行。死亡,或者多帶帶死亡,消息最終都會被發(fā)出,僅當與同時死亡,消息才會發(fā)送失敗,并記錄在日志文件里。

本工具的核心思想就是:賭。只有兩個基礎組件同時死亡,才會受到嚴重影響。哦,斷電除外。

mq是個好東西,我們都在用。這也決定了mq應該是高高高可用的。某團就因為這個組件,出了好幾次生產(chǎn)事故,呵呵。

大部分業(yè)務系統(tǒng),要求的消息語義都是at least once,即都會有重復消息,但保證不會丟。即使這樣,依然有很多問題:

一、mq可用性無法保證。 mq的意外死亡,造成生產(chǎn)端發(fā)送失敗。很多消息要通過扒取日志進行回放,成本高耗時長。

二、mq阻塞業(yè)務正常進行。 mq卡頓或者網(wǎng)絡問題,會造成業(yè)務線程卡在mq的發(fā)送方法上,正常業(yè)務進行不下去,造成災難性的后果。

三、消息延遲。 mq死了就用不著說了,消息還沒投胎就已死亡。消息延遲主要是客戶端消費能力不強,或者是消費通道單一造成的。

使用組合存儲來保證消息的可靠投遞,就是okmq。

注意:okmq注重的是可靠性。對于順序性、事務等其他要素,不予考慮。當然,速度是必須的。
設計想法

我即使用兩套redis來模擬一些mq操作,都會比現(xiàn)有的一些解決方案要強。但這肯定不是我們需要的,因為redis的堆積能力太有限,內(nèi)存占用率直線上升的感覺并不太好。

但我們可以用redis來作為額外的發(fā)送確認機制。這個想法,在《使用多線程增加kafka消費能力》一文中曾經(jīng)提到過,現(xiàn)在到了實現(xiàn)的時候了。

首先看下使用Api
OkmqKafkaProducer producer = new ProducerBuilder()
.defaultSerializer()
.eanbleHa("redis")
.any("okmq.redis.mode", "single")
.any("okmq.redis.endpoint", "127.0.0.1:6379")
.any("okmq.redis.poolConfig.maxTotal", 100)
.servers("localhost:9092")
.clientID("okMQProducerTest")
.build();

Packet packet = new Packet();
packet.setTopic("okmq-test-topic");
packet.setContent("i will send you a msg");
producer.sendAsync(packet, null);
producer.shutdown();
以redis為例


我們按照數(shù)字標號來介紹:

1、 在消息發(fā)送到kafka之前,首先入庫redis。由于后續(xù)回調(diào)需要用到一個唯一表示,我們在packet包里添加了一個uuid。

2、 調(diào)用底層的api,進行真正的消息投遞。

3、 通過監(jiān)聽kafka的回調(diào),刪除redis中對應的key。在這里可以得到某條消息確切的的ack時間。那么長時間沒有刪除的,就算是投遞失敗的消息。

4、 后臺會有一個線程進行這些失敗消息的遍歷和重新投遞。我們叫做recovery。最復雜的也就是這一部分。對于redis來說,會首先爭搶一個持續(xù)5min的鎖,然后遍歷相關hashkey。

所以,對于以上代碼,redis發(fā)出以下命令:

1559206423.395597 [0 127.0.0.1:62858] "HEXISTS" "okmq:indexhash" "okmq:5197354"
1559206423.396670 [0 127.0.0.1:62858] "HSET" "okmq:indexhash" "okmq:5197354" ""
1559206423.397300 [0 127.0.0.1:62858] "HSET" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" "{"content":"i will send you a msg104736623015238","topic":"okmq-test-topic","identify":"2b9b33fd-95fd-4cd6-8815-4c572f13f76e","timestamp":1559206423318}"
1559206423.676212 [0 127.0.0.1:62858] "HDEL" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e"
1559206428.327788 [0 127.0.0.1:62861] "SET" "okmq:recovery:lock" "01fb85a9-0670-40c3-8386-b2b7178d4faf" "px" "300000"
1559206428.337930 [0 127.0.0.1:62858] "HGETALL" "okmq:indexhash"
1559206428.341365 [0 127.0.0.1:62858] "HSCAN" "okmq:5197354" "0"
1559206428.342446 [0 127.0.0.1:62858] "HDEL" "okmq:indexhash" "okmq:5197354"
1559206428.342788 [0 127.0.0.1:62861] "GET" "okmq:recovery:lock"
1559206428.343119 [0 127.0.0.1:62861] "DEL" "okmq:recovery:lock"
以上問題解答 所以對于以上的三個問題,回答如下:

一、mq可用性無法保證。

為什么要要通過事后進行恢復呢?我把recovery機制帶著不是更好么?通過對未收到ack的消息進行遍歷,可以把這個過程做成自動化。

二、mq阻塞業(yè)務正常進行。

通過設置kafka的MAX_BLOCK_MS_CONFIG
參數(shù),其實是可以不阻塞業(yè)務的,但會丟失消息。我可以使用其他存儲來保證這些丟失的消息重新發(fā)送。

三、消息延遲。

mq死掉了,依然有其他備用通道進行正常服務。也有的團隊采用雙寫mq雙消費的方式來保證這個過程,也是被逼急了:)。如果kafka死掉了,業(yè)務會切換到備用通道進行消費。

擴展自己的HA

如果你不想用redis,比如你先要用hbase,那也是很簡單的。
但需要實現(xiàn)一個HA接口。

public interface HA {
    void close();

    void configure(Properties properties);

    void preSend(Packet packet) throws HaException;

    void postSend(Packet packet) throws HaException;

    void doRecovery(AbstractProducer producer) throws HaException;
}

使用之前,還需要注冊一下你的插件。

AbstractProducer.register("log", "com.sayhiai.arch.okmq.api.producer.ha.Ha2SimpleLog");
重要參數(shù)
okmq.ha.recoveryPeriod 恢復線程檢測周期,默認5秒

okmq.redis.mode redis的集群模式,可選:single、sentinel、cluster
okmq.redis.endpoint 地址,多個地址以,分隔
okmq.redis.connectionTimeout 連接超時
okmq.redis.soTimeout socket超時
okmq.redis.lockPx 分布式鎖的持有時間,可默認,5min
okmq.redis.splitMillis 間隔時間,redis換一個key進行運算,默認5min
okmq.redis.poolConfig.* 兼容jedis的所有參數(shù)
1.0.0 版本功能

1、進行了生產(chǎn)端的高可用抽象,實現(xiàn)了kafka的樣例。

2、增加了SimpleLog的ping、pong日志實現(xiàn)。

3、增加了Redis的生產(chǎn)端備用通道。包含single、cluster、sentinel三種模式。

4、可以自定義其他備用通道。

5、兼容kakfa的所有參數(shù)設置。

規(guī)劃 2.0.0

1、實現(xiàn)ActiveMQ的集成。

2、實現(xiàn)消費者的備用通道集成。

3、增加嵌入式kv存儲的生產(chǎn)者集成。

4、更精細的控制系統(tǒng)的行為。

5、加入開關和預熱,避免新啟動mq即被壓垮。

6、redis分片機制,大型系統(tǒng)專用。

3.0.0

1、監(jiān)控功能添加。

2、rest接口添加。

使用限制

當你把參數(shù)ha設置為true,表明你已經(jīng)收到以下的使用限制。反之,系統(tǒng)反應于原生無異。

使用限制:
本工具僅適用于非順序性、非事務性的普通消息投遞,且客戶端已經(jīng)做了冪等。一些訂單系統(tǒng)、消息通知等業(yè)務,非常適合。如果你需要其他特性,請?zhí)龃隧撁妗?/p>

kafka死亡,或者redis多帶帶死亡,消息最終都會被發(fā)出,僅當kafka與redis同時死亡,消息才會發(fā)送失敗,并記錄在日志文件里。

正常情況下,redis的使用容量極少極少。異常情況下,redis的容量有限,會迅速占滿。redis的剩余時間就是你的StopWatch,你必須在這個時間內(nèi)恢復你的消息系統(tǒng),一定要頂住哇。

End

系統(tǒng)目前處于1.0.0版本,正在線上小規(guī)模試用。工具小眾,但適用于大部分應用場景。如果你正在尋求這樣的解決方案,歡迎一塊完善代碼。

github地址:

https://github.com/sayhiai/okmq

也歡迎關注《小姐姐味道》微信公眾號,進行交流。

文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://systransis.cn/yun/74816.html

相關文章

  • Confluent的25億美元估值可能在開源動蕩中提供肯定。

    摘要:的億美元估值可能在開源中提供肯定,是一家基于的開源軟件提供商,在系列融資輪中籌集了億美元,估值為億美元。在月最新的期間,在公共預覽中為管理流媒體。在兩周后通過宣布其平臺組件的許可證更改做出了響應。該出版物推測如下,與超分頻器持有所有的卡。Confluent的25億美元估值可能在開源TurbulencetweetConfluent中提供肯定,Confluent是一家基于Apache Kafka...

    enda 評論0 收藏0

發(fā)表評論

0條評論

PAMPANG

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<