摘要:支持消息刪除業(yè)務(wù)使用方,可以隨時(shí)刪除指定消息。消息傳輸可靠性消息進(jìn)入到延遲隊(duì)列后,保證至少被消費(fèi)一次。
延遲隊(duì)列,顧名思義它是一種帶有延遲功能的消息隊(duì)列。 那么,是在什么場(chǎng)景下我才需要這樣的隊(duì)列呢?
一、背景先看看一下業(yè)務(wù)場(chǎng)景:
1.會(huì)員過期前3天發(fā)送召回通知
2.訂單支付成功后,5分鐘后檢測(cè)下游環(huán)節(jié)是否都正常,比如用戶購(gòu)買會(huì)員后,各種會(huì)員狀態(tài)是否都設(shè)置成功
3.如何定期檢查處于退款狀態(tài)的訂單是否已經(jīng)退款成功?
4.實(shí)現(xiàn)通知失敗,1,3,5,7分鐘重復(fù)通知,直到對(duì)方回復(fù)?
通常解決以上問題,最簡(jiǎn)單直接的辦法就是定時(shí)去掃表。
掃表存在的問題是:
1.掃表與數(shù)據(jù)庫(kù)長(zhǎng)時(shí)間連接,在數(shù)量量大的情況容易出現(xiàn)連接異常中斷,需要更多的異常處理,對(duì)程序健壯性要求高
2.在數(shù)據(jù)量大的情況下延時(shí)較高,規(guī)定內(nèi)處理不完,影響業(yè)務(wù),雖然可以啟動(dòng)多個(gè)進(jìn)程來處理,這樣會(huì)帶來額外的維護(hù)成本,不能從根本上解決。
3.每個(gè)業(yè)務(wù)都要維護(hù)一個(gè)自己的掃表邏輯。 當(dāng)業(yè)務(wù)越來越多時(shí),發(fā)現(xiàn)掃表部分的邏輯會(huì)重復(fù)開發(fā),但是非常類似
延時(shí)隊(duì)列能對(duì)于上述需求能很好的解決
二、調(diào)研調(diào)研了市場(chǎng)上一些開源的方案,以下:
1.有贊科技:只有原理,沒有開源代碼
2.github個(gè)人的:https://github.com/ouqiang/de...
1.基于redis實(shí)現(xiàn),redis只能配置一個(gè),如果redis掛了整個(gè)服務(wù)不可用,可用性差點(diǎn)
2.消費(fèi)端實(shí)現(xiàn)的是拉模式,接入成本大,每個(gè)項(xiàng)目都得去實(shí)現(xiàn)一遍接入代碼
3.在star使用的人數(shù)不多,放在生產(chǎn)環(huán)境,存在風(fēng)險(xiǎn),加之對(duì)go語(yǔ)言不了解,出了問題難以維護(hù)
3.SchedulerX-阿里開源的: 功能很強(qiáng)大,但是運(yùn)維復(fù)雜,依賴組件多,不夠輕量
4.RabbitMQ-延時(shí)任務(wù): 本身沒有延時(shí)功能,需要借助一特性自己實(shí)現(xiàn),而且公司沒有部署這個(gè)隊(duì)列,去多帶帶部署一個(gè)這個(gè)來做延時(shí)隊(duì)列成本有點(diǎn)高,而且還需要專門的運(yùn)維來維護(hù),目前團(tuán)隊(duì)不支持
基本以上原因打算自己寫一個(gè),平常使用php多,項(xiàng)目基本redis的zset結(jié)構(gòu)作為存儲(chǔ),用php語(yǔ)言實(shí)現(xiàn) ,實(shí)現(xiàn)原理參考了有贊團(tuán)隊(duì):https://tech.youzan.com/queui...
三、目標(biāo)輕量級(jí):有較少的php的拓展就能直接運(yùn)行,不需要引入網(wǎng)絡(luò)框架,比如swoole,workman之類的
穩(wěn)定性:采用master-work架構(gòu),master不做業(yè)務(wù)處理,只負(fù)責(zé)管理子進(jìn)程,子進(jìn)程異常退出時(shí)自動(dòng)拉起
可用性:
1.支持多實(shí)例部署,每個(gè)實(shí)例無(wú)狀態(tài),一個(gè)實(shí)例掛掉不影響服務(wù)
2.支持配置多個(gè)redis,一個(gè)redis掛了只影響部分消息
3.業(yè)務(wù)方接入方便,在后臺(tái)只需填寫相關(guān)消息類型和回掉接口
拓展性: 當(dāng)消費(fèi)進(jìn)程存在瓶頸時(shí),可以配置加大消費(fèi)進(jìn)程數(shù),當(dāng)寫入存在瓶頸時(shí),可增加實(shí)例數(shù)寫入性能可線性提高
實(shí)時(shí)性:允許存在一定的時(shí)間誤差。
支持消息刪除:業(yè)務(wù)使用方,可以隨時(shí)刪除指定消息。
消息傳輸可靠性:消息進(jìn)入到延遲隊(duì)列后,保證至少被消費(fèi)一次。
寫入性能:qps>1000+
四、架構(gòu)設(shè)計(jì)與說明總體架構(gòu)
采用master-work架構(gòu)模式,主要包括6個(gè)模塊:
1.dq-mster: 主進(jìn)程,負(fù)責(zé)管理子進(jìn)程的創(chuàng)建,銷毀,回收以及信號(hào)通知
2.dq-server: 負(fù)責(zé)消息寫入,讀取,刪除功能以及維護(hù)redis連接池
3.dq-timer-N: 負(fù)責(zé)從redis的zset結(jié)構(gòu)中掃描到期的消息,并負(fù)責(zé)寫入ready 隊(duì)列,個(gè)數(shù)可配置,一般2個(gè)就行了,因?yàn)橄⒃趜set結(jié)構(gòu)是按時(shí)間有序的
4.dq-consume-N: 負(fù)責(zé)從ready隊(duì)列中讀取消息并通知給對(duì)應(yīng)回掉接口,個(gè)數(shù)可配置
5.dq-redis-checker: 負(fù)責(zé)檢查redis的服務(wù)狀態(tài),如果redis宕機(jī),發(fā)送告警郵件
6.dq-http-server: 提供web后臺(tái)界面,用于注冊(cè)topic
五、部署環(huán)境依賴:PHP 5.4+ 安裝sockets,redis,pcntl,pdo_mysql 拓展
create database dq; #存放告警信息 CREATE TABLE `dq_alert` ( `id` int(11) NOT NULL AUTO_INCREMENT, `host` varchar(255) NOT NULL DEFAULT "", `port` int(11) NOT NULL DEFAULT "0", `user` varchar(255) NOT NULL DEFAULT "", `pwd` varchar(255) NOT NULL DEFAULT "", `ext` varchar(2048) NOT NULL DEFAULT "", PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; #存放redis信息 CREATE TABLE `dq_redis` ( `id` int(11) NOT NULL AUTO_INCREMENT, `t_name` varchar(200) NOT NULL DEFAULT "", `t_content` varchar(2048) NOT NULL DEFAULT "", PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8; #存儲(chǔ)注冊(cè)信息 CREATE TABLE `dq_topic` ( `id` int(11) NOT NULL AUTO_INCREMENT, `t_name` varchar(1024) NOT NULL DEFAULT "", `delay` int(11) NOT NULL DEFAULT "0", `callback` varchar(1024) NOT NULL DEFAULT "", `timeout` int(11) NOT NULL DEFAULT "3000", `email` varchar(1024) NOT NULL DEFAULT "", `topic` varchar(255) NOT NULL DEFAULT "", `createor` varchar(1024) NOT NULL DEFAULT "", `status` tinyint(4) NOT NULL DEFAULT "1", `method` varchar(32) NOT NULL DEFAULT "GET", PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
在DqConf.php文件中修改php了路徑 $logPath
命令:
php DqHttpServer.php --port 8088
訪問:http://127.0.0.1:8088,出現(xiàn)配置界面
redis信息格式:host:post:auth 比如 127.0.0.1:6379:12345
php DqInit.php --port 6789
看到如下信息說明啟動(dòng)成功
addServer($server); $topic ="order_openvip_checker"; //topic在后臺(tái)注冊(cè) $id = uniqid(); $data=array( "id"=>$id, "body"=>array( "a"=>1, "b"=>2, "c"=>3, "ext"=>str_repeat("a",64), ), //可選,設(shè)置后以這個(gè)通知時(shí)間為準(zhǔn),默認(rèn)延時(shí)時(shí)間在注冊(cè)topic的時(shí)候指定 "fix_time"=>date("Y-m-d 23:50:50"), ); //添加 $boolRet = $dqClient->add($topic, $data); echo "add耗時(shí):".(msectime() - $time)."ms "; //查詢 $time = msectime(); $result = $dqClient->get($topic, $id); echo "get耗時(shí):".(msectime() - $time)."ms "; //刪除 $time = msectime(); $boolRet = $dqClient->del($topic,$id); echo "del耗時(shí):".(msectime() - $time)."ms ";
執(zhí)行php test.php
默認(rèn)日志目錄在項(xiàng)目目錄的logs目錄下,在DqConf.php修改$logPath
1.請(qǐng)求日志:request_ymd.txt
2.通知日志:notify_ymd.txt
3.錯(cuò)誤日志:err_ymd.txt
1.系統(tǒng)會(huì)自動(dòng)檢測(cè)配置文件新,如果有改動(dòng),會(huì)自動(dòng)退出(沒有找到較好的熱更新的方案),需要重啟,可以在crontab里面建個(gè)任務(wù),1分鐘執(zhí)行一次,程序有check_self的判斷
2.優(yōu)雅退出命令: master檢測(cè)偵聽了USR2信號(hào),收到信號(hào)后會(huì)通知所有子進(jìn)程,子進(jìn)程完成當(dāng)前任務(wù)后會(huì)自動(dòng)退出
ps -ef | grep dq-master| grep -v grep | head -n 1 | awk "{print $2}" | xargs kill -USR2六、性能測(cè)試
需要安裝pthreads拓展:
測(cè)試原理:使用多線程模擬并發(fā),在1s內(nèi)能成功返回請(qǐng)求成功的個(gè)數(shù)
php DqBench concurrency requests concurrency:并發(fā)數(shù) requests: 每個(gè)并發(fā)產(chǎn)生的請(qǐng)求數(shù) 測(cè)試環(huán)境:內(nèi)存 8G ,8核cpu,2個(gè)redis和1個(gè)dq-server 部署在一個(gè)機(jī)器上,數(shù)據(jù)包64字節(jié) qps:2400七、值得一提的性能優(yōu)化點(diǎn):
1.redis multi命令:將多個(gè)對(duì)redis的操作打包成一個(gè)減少網(wǎng)絡(luò)開銷
2.計(jì)數(shù)的操作異步處理,在異步邏輯里面用函數(shù)的static變量來保存,當(dāng)寫入redis成功后釋放static變量,可以在redis出現(xiàn)異常時(shí)計(jì)數(shù)仍能保持一致,除非進(jìn)程退出
3.內(nèi)存泄露檢測(cè)有必要: 所有的內(nèi)存分配在底層都是調(diào)用了brk或者mmap,只要程序只有大量brk或者mmap的系統(tǒng)調(diào)用,內(nèi)存泄露可能性非常高 ,檢測(cè)命令: strace -c -p pid | grep "mmap| brk"
4.檢測(cè)程序的系統(tǒng)調(diào)用情況:strace -c -p pid ,發(fā)現(xiàn)某個(gè)系統(tǒng)函數(shù)調(diào)用是其他的數(shù)倍,可能大概率程序存在問題
八、異常處理如果調(diào)用通知接口在超時(shí)時(shí)間內(nèi),沒有收到回復(fù)認(rèn)為通知失敗,系統(tǒng)會(huì)重新把數(shù)據(jù)放入隊(duì)列,重新通知,系統(tǒng)默認(rèn)最大通知10次(可以在Dqconf.php文件中修改$notify_exp_nums)通知間隔為2n+1,比如第一次1分鐘,通知失敗,第二次3分鐘后,直到收到回復(fù),超出最大通知次數(shù)后系統(tǒng)自動(dòng)丟棄,同時(shí)發(fā)郵件通知
ps:網(wǎng)絡(luò)抖動(dòng)在所難免,通知接口如果涉及到核心的服務(wù),一定要保證冪等??!
九、線上情況線上部署了兩個(gè)實(shí)例每個(gè)機(jī)房部一個(gè),4個(gè)redis作存儲(chǔ),服務(wù)穩(wěn)定運(yùn)行數(shù)月,各項(xiàng)指標(biāo)均符合預(yù)期
主要接入業(yè)務(wù):
訂單10分鐘召回通知
接口超時(shí)或者失敗補(bǔ)償
項(xiàng)目地址: https://github.com/chenlinzho...
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/29595.html
摘要:背景當(dāng)下視頻直播如此紅火,打造一個(gè)在線直播間涉及到哪些技術(shù)呢視頻直播由主播的直播端以及觀眾的觀看端組成。保持心跳斷開重連快速搭建在線直播間按前文所述,搭建直播間有非常多的細(xì)節(jié)需要考慮,包括采集推流分發(fā)播放體驗(yàn)優(yōu)化聊天室性能調(diào)優(yōu)等。 背景 當(dāng)下視頻直播如此紅火,打造一個(gè)在線直播間涉及到哪些技術(shù)呢? 視頻直播由主播的直播端以及觀眾的觀看端組成。一個(gè)簡(jiǎn)單的觀看端最起碼應(yīng)包含播放器以及聊天室。...
摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...
摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...
閱讀 4012·2021-11-18 13:22
閱讀 1829·2021-11-17 09:33
閱讀 2886·2021-09-26 09:46
閱讀 1220·2021-08-21 14:11
閱讀 2896·2019-08-30 15:53
閱讀 2717·2019-08-30 15:52
閱讀 1914·2019-08-30 10:52
閱讀 1528·2019-08-29 15:30