成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

基于Swoole和Redis實(shí)現(xiàn)的并發(fā)隊(duì)列處理系統(tǒng)

booster / 3465人閱讀

摘要:大家知道,一個(gè)消息隊(duì)列處理系統(tǒng)主要分為兩大部分消費(fèi)者和生產(chǎn)者。任務(wù)系統(tǒng)實(shí)時(shí)的對任務(wù)隊(duì)列進(jìn)行,出來一個(gè)任務(wù)就一個(gè)子進(jìn)程,由子進(jìn)程完成具體的任務(wù)邏輯。新的設(shè)計(jì)為了解決并發(fā)的問題,我們計(jì)劃做一個(gè)更加高效強(qiáng)壯的隊(duì)里處理系統(tǒng)。

背景

由于PHP不支持多線程,但是作為一個(gè)完善的系統(tǒng),有很多操作都是需要異步完成的。為了完成這些異步操作,我們做了一個(gè)基于Redis隊(duì)列任務(wù)系統(tǒng)。

大家知道,一個(gè)消息隊(duì)列處理系統(tǒng)主要分為兩大部分:消費(fèi)者和生產(chǎn)者。

在我們的系統(tǒng)中,主系統(tǒng)作為生產(chǎn)者,任務(wù)系統(tǒng)作為消費(fèi)者。

具體的工作流程如下: 1、主系統(tǒng)將需要需要處理的任務(wù)名稱+任務(wù)參數(shù)push到隊(duì)列中。 2、任務(wù)系統(tǒng)實(shí)時(shí)的對任務(wù)隊(duì)列進(jìn)行pop,pop出來一個(gè)任務(wù)就fork一個(gè)子進(jìn)程,由子進(jìn)程完成具體的任務(wù)邏輯。

具體代碼如下:

/**
 * 啟動守護(hù)進(jìn)程
 */
public function runAction() {
    Tools::log_message("ERROR", "daemon/run" . " | action: restart", "daemon-");
    while (true) {
        $this->fork_process();
    }
    exit;
}
 
/**
 * 創(chuàng)建子進(jìn)程
 */
private function fork_process() {
    $ppid = getmypid();
    $pid = pcntl_fork();
    if ($pid == 0) {//子進(jìn)程
        $pid = posix_getpid();
        //echo "* Process {$pid} was created 

";
        $this->mq_process();
        exit;
    } else {//主進(jìn)程
        $pid = pcntl_wait($status, WUNTRACED); //取得子進(jìn)程結(jié)束狀態(tài)
        if (pcntl_wifexited($status)) {
            //echo "

* Sub process: {$pid} exited with {$status}";
            //Tools::log_message("INFO", "daemon/run succ" . "|status:" . $status . "|pid:" . $ppid . "|childpid:" . $pid );
        } else {
            Tools::log_message("ERROR", "daemon/run fail" . "|status:" . $status . "|pid:" . $ppid . "|childpid:" . $pid, "daemon-");
        }
    }
}
 
/**
 * 業(yè)務(wù)任務(wù)隊(duì)列處理
 */
private function mq_process() {
    $data_pop = $this->masterRedis->rPop($this->redis_list_key);
    $data = json_decode($data_pop, 1);
    if (!$data) {
        return FALSE;
    }
    $worker = "_task_" . $data["worker"];
    $class_name = isset($data["class"]) ? $data["class"] : "TaskproModel";
    $params = $data["params"];
    $class = new $class_name();
    $class->$worker($params);
    return TRUE;
}

這是一個(gè)簡單的任務(wù)處理系統(tǒng)。

通過這個(gè)任務(wù)系統(tǒng)幫助我們實(shí)現(xiàn)了異步,到目前為止已經(jīng)穩(wěn)定運(yùn)行了將近一年。

但很可惜,它是一個(gè)單進(jìn)程的系統(tǒng)。它是一直在不斷的fork,如果有任務(wù)就處理,沒有任務(wù)就跳過。

這樣很穩(wěn)定。

但問題有兩個(gè):一是不斷地fork、pop會浪費(fèi)服務(wù)器資源,二是不支持并發(fā)!

第一個(gè)問題還好,但第二個(gè)問題就很嚴(yán)重。

當(dāng)主系統(tǒng) 同時(shí) 拋過來大量的任務(wù)時(shí),任務(wù)的處理時(shí)間就會無限的拉長。

新的設(shè)計(jì)

為了解決并發(fā)的問題,我們計(jì)劃做一個(gè)更加高效強(qiáng)壯的隊(duì)里處理系統(tǒng)。

因?yàn)樵赑HP7之前不支持多線程,所以我們采用多進(jìn)程。

從網(wǎng)上找了不少資料,大多所謂的多進(jìn)程都是N個(gè)進(jìn)程同時(shí)在后臺運(yùn)行。

顯然這是不合適的。

我的預(yù)想是:每pop出一個(gè)任務(wù)就fork一個(gè)任務(wù),任務(wù)執(zhí)行完成后子進(jìn)程結(jié)束。

遇到的問題

1、如何控制最大進(jìn)程數(shù)
這個(gè)問題很簡單,那就是每fork一個(gè)子進(jìn)程就自增一次。而當(dāng)子進(jìn)程執(zhí)行完成就自減一次。

自增沒有問題,我們就在主進(jìn)程中操作就完了。那么該如何自減呢?

可能你會說,當(dāng)然是在子進(jìn)程中啊。但這里你需要注意:當(dāng)fork的時(shí)候是從主進(jìn)程復(fù)制了一份資源給子進(jìn)程,這就意味著你無法在子進(jìn)程中操作主進(jìn)程中的計(jì)數(shù)器!

所以,這里就需要了解一個(gè)知識點(diǎn):信號。

具體的可以自行Google,這里直接看代碼。

// install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));

這就安裝了一個(gè)信號處理器。當(dāng)然還缺少一點(diǎn)。

declare(ticks = 1);

declare是一個(gè)控制結(jié)構(gòu)語句,具體的用法也請去Google。

這句代碼的意思就是每執(zhí)行一條低級語句就調(diào)用一次信號處理器。

這樣,每當(dāng)子進(jìn)程結(jié)束的時(shí)候就會調(diào)用信號處理器,我們就可以在信號處理器中進(jìn)行自減。

2、如何解決進(jìn)程殘留

在多進(jìn)程開發(fā)中,如果處理不當(dāng)就會導(dǎo)致進(jìn)程殘留。

為了解決進(jìn)程殘留,必須得將子進(jìn)程回收。

那么如何對子進(jìn)程進(jìn)行回收就是一個(gè)技術(shù)點(diǎn)了。

在pcntl的demo中,包括很多博文中都是說在主進(jìn)程中回收子進(jìn)程。

但我們是基于Redis的brpop的,而brpop是阻塞的。

這就導(dǎo)致一個(gè)問題:當(dāng)執(zhí)行N個(gè)任務(wù)之后,任務(wù)系統(tǒng)空閑的時(shí)候主進(jìn)程是阻塞的,而在發(fā)生阻塞的時(shí)候子進(jìn)程還在執(zhí)行,所以就無法完成最后幾個(gè)子進(jìn)程的進(jìn)程回收。。。

這里本來一直很糾結(jié),但當(dāng)我將信號處理器搞定之后就也很簡單了。

進(jìn)程回收也放到信號處理器中去。

新系統(tǒng)的評估

pcntl是一個(gè)進(jìn)程處理的擴(kuò)展,但很可惜它對多進(jìn)程的支持非常乏力。

所以這里采用Swoole擴(kuò)展中的Process。

具體代碼如下:

declare(ticks = 1);
class JobDaemonController extends Yaf_Controller_Abstract{
 
    use Trait_Redis;
 
    private $maxProcesses = 800;
    private $child;
    private $masterRedis;
    private $redis_task_wing = "task:wing"; //待處理隊(duì)列
 
    public function init(){
        // install signal handler for dead kids
        pcntl_signal(SIGCHLD, array($this, "sig_handler"));
        set_time_limit(0);
        ini_set("default_socket_timeout", -1); //隊(duì)列處理不超時(shí),解決redis報(bào)錯:read error on connection
    }
 
    private function redis_client(){
        $rds = new Redis();
        $rds->connect("redis.master.host",6379);
        return $rds;
    }
 
    public function process(swoole_process $worker){// 第一個(gè)處理
        $GLOBALS["worker"] = $worker;
        swoole_event_add($worker->pipe, function($pipe) {
            $worker = $GLOBALS["worker"];
            $recv = $worker->read();            //send data to master
 
            sleep(rand(1, 3));
            echo "From Master: $recv
";
            $worker->exit(0);
        });
        exit;
    }
 
    public function testAction(){
        for ($i = 0; $i < 10000; $i++){
            $data = [
                "abc" => $i,
                "timestamp" => time().rand(100,999)
            ];
            $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
        }
        exit;
    }
 
    public function runAction(){
        while (1){
//            echo "	 now we de have $this->child child processes
";
            if ($this->child < $this->maxProcesses){
                $rds = $this->redis_client();
                $data_pop = $rds->brpop($this->redis_task_wing, 3);//無任務(wù)時(shí),阻塞等待
                if (!$data_pop){
                    continue;
                }
                echo "	 Starting new child | now we de have $this->child child processes
";
                $this->child++;
                $process = new swoole_process([$this, "process"]);
                $process->write(json_encode($data_pop));
                $pid = $process->start();
            }
        }
    }
 
    private function sig_handler($signo) {
//        echo "Recive: $signo 
";
        switch ($signo) {
            case SIGCHLD:
                while($ret = swoole_process::wait(false)) {
//                    echo "PID={$ret["pid"]}
";
                    $this->child--;
                }
        }
    }
}

最終,經(jīng)過測試,單核1G的服務(wù)器執(zhí)行1到3秒的任務(wù)可以做到800的并發(fā)。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/30967.html

相關(guān)文章

  • PHP并發(fā)IO編程之路

    摘要:下文如無特殊聲明將使用進(jìn)程同時(shí)表示進(jìn)程線程。收到數(shù)據(jù)后服務(wù)器程序進(jìn)行處理然后使用向客戶端發(fā)送響應(yīng)?,F(xiàn)在各種高并發(fā)異步的服務(wù)器程序都是基于實(shí)現(xiàn)的,比如。 并發(fā) IO 問題一直是服務(wù)器端編程中的技術(shù)難題,從最早的同步阻塞直接 Fork 進(jìn)程,到 Worker 進(jìn)程池/線程池,到現(xiàn)在的異步IO、協(xié)程。PHP 程序員因?yàn)橛袕?qiáng)大的 LAMP 框架,對這類底層方面的知識知之甚少,本文目的就是詳細(xì)介...

    Riddler 評論0 收藏0
  • Swoole-2.1.2 進(jìn)程池模塊使用

    摘要:在版本中我們將的進(jìn)程管理模塊封裝成了類,現(xiàn)在可以在代碼中使用的進(jìn)程管理器了。提供的進(jìn)程管理器來自于,經(jīng)過大量生產(chǎn)項(xiàng)目驗(yàn)證,穩(wěn)定性和健壯性都非常高。三任務(wù)投遞進(jìn)程管理器自帶了消息隊(duì)列和消息投遞的支持。 在Swoole-2.1.2版本中我們將Server的進(jìn)程管理模塊封裝成了PHP類,現(xiàn)在可以在PHP代碼中使用Swoole的進(jìn)程管理器了。 在實(shí)際項(xiàng)目中經(jīng)常需要寫一些長期運(yùn)行的腳本,如基于r...

    ZoomQuiet 評論0 收藏0
  • Easyswoole 源碼學(xué)習(xí)個(gè)人解析 目錄

    摘要:易用穩(wěn)定,本次想通過對的學(xué)習(xí)和個(gè)人解析,吸收框架的思想和設(shè)計(jì)知識,加強(qiáng)自己對的認(rèn)知和理解。當(dāng)然,筆者能力水平有限,后續(xù)的文章如有錯誤,還請指出和諒解。目錄如下后續(xù)添加文章都會記錄在此服務(wù)啟動過程以及主體設(shè)計(jì)流程源碼解析 前言 swoole是什么?官網(wǎng)的原話介紹是這樣的: Swoole 使用純 C 語言編寫,提供了 PHP 語言的異步多線程服務(wù)器,異步 TCP/UDP 網(wǎng)絡(luò)客戶端,異步 ...

    CoXie 評論0 收藏0
  • Swoft 源碼剖析 - 連接池

    摘要:基于擴(kuò)展實(shí)現(xiàn)真正的數(shù)據(jù)庫連接池這種方案中,項(xiàng)目占用的連接數(shù)僅僅為。一種是連接暫時(shí)不再使用,其占用狀態(tài)解除,可以從使用者手中交回到空閑隊(duì)列中這種我們稱為連接的歸隊(duì)。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進(jìn)行了重新的排版。Swoft Github: https:...

    rozbo 評論0 收藏0
  • PHP 協(xié)程:Go + Chan + Defer

    摘要:為語言提供了強(qiáng)大的協(xié)程編程模式。提供的協(xié)程語法借鑒自,在此向開發(fā)組致敬協(xié)程可以與很好地互補(bǔ)。并發(fā)執(zhí)行使用創(chuàng)建協(xié)程,可以讓和兩個(gè)函數(shù)變成并發(fā)執(zhí)行。協(xié)程需要拿到請求的結(jié)果。 Swoole4為PHP語言提供了強(qiáng)大的CSP協(xié)程編程模式。底層提供了3個(gè)關(guān)鍵詞,可以方便地實(shí)現(xiàn)各類功能。 Swoole4提供的PHP協(xié)程語法借鑒自Golang,在此向GO開發(fā)組致敬 PHP+Swoole協(xié)程可以與...

    nidaye 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<