成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

【Swoole源碼研究】深入理解Swoole協(xié)程實現(xiàn)

darkerXi / 1304人閱讀

摘要:此時的協(xié)程實現(xiàn)無法完美的支持語法,其根本原因在于沒有保存棧信息。這是因為調(diào)用函數(shù)時,底層指令已經(jīng)將入棧了。協(xié)程創(chuàng)建時,底層通過函數(shù)實現(xiàn)了棧的創(chuàng)建創(chuàng)建并初始化棧為結(jié)構(gòu)分配空間創(chuàng)建新的執(zhí)行數(shù)據(jù)結(jié)構(gòu)從代碼中可以看到結(jié)構(gòu)是直接存儲在棧的底部。

作者:李樂
??本文基于Swoole-4.3.2和PHP-7.1.0版本

Swoole協(xié)程簡介

??Swoole4為PHP語言提供了強大的CSP協(xié)程編程模式,用戶可以通過go函數(shù)創(chuàng)建一個協(xié)程,以達到并發(fā)執(zhí)行的效果,如下面代碼所示:


??其實在Swoole4之前就實現(xiàn)了多協(xié)程編程模式,在協(xié)程創(chuàng)建、切換以及結(jié)束的時候,相應(yīng)的操作php棧即可(創(chuàng)建、切換以及回收php棧)。

??此時的協(xié)程實現(xiàn)無法完美的支持php語法,其根本原因在于沒有保存c棧信息。(vm內(nèi)部或者某些擴展提供的API是通過c函數(shù)實現(xiàn)的,調(diào)用這些函數(shù)時如果發(fā)生協(xié)程切換,c棧該如何處理?)

??Swoole4新增了c棧的管理,在協(xié)程創(chuàng)建、切換以及結(jié)束的同時會伴隨著c棧的創(chuàng)建、切換以及回收。

??Swoole4協(xié)程實現(xiàn)方案如下圖所示:

??其中:

API層是提供給用戶使用的協(xié)程相關(guān)函數(shù),比如go()函數(shù)用于創(chuàng)建協(xié)程;Co::yield()使得當前協(xié)程讓出CPU;Co::resume()可恢復(fù)某個協(xié)程執(zhí)行;

Swoole4協(xié)程需要同時管理c棧與php棧,Coroutine用于管理c棧,PHPCoroutine用于管理php棧;其中Coroutine(),yield(),resume()實現(xiàn)了c棧的創(chuàng)建以及換入換出;create_func(),on_yield(),on_resume()實現(xiàn)了php棧的創(chuàng)建以及換入換出;

Swoole4在管理c棧時,用到了 boost.context庫,make_fcontext()和jump_fcontext()函數(shù)均使用匯編語言編寫,實現(xiàn)了c棧上下文的創(chuàng)建以及切換;

Swoole4對boost.context進行了簡單封裝,即Context層,Context(),SwapIn()以及SwapOut()

對應(yīng)c棧的創(chuàng)建以及換入換出。

深入理解C棧

??函數(shù)是對代碼的封裝,對外暴露的只是一組指定的參數(shù)和一個可選的返回值;假設(shè)函數(shù)P調(diào)用函數(shù)Q,Q執(zhí)行后返回函數(shù)P,實現(xiàn)該函數(shù)調(diào)用需要考慮以下三點:

指令跳轉(zhuǎn):進入函數(shù)Q的時候,程序計數(shù)器必須被設(shè)置為Q的代碼的起始地址;在返回時,程序計數(shù)器需要設(shè)置為P中調(diào)用Q后面那條指令的地址;

數(shù)據(jù)傳遞:P能夠向Q提供一個或多個參數(shù),Q能夠向P返回一個值;

內(nèi)存分配與釋放:Q開始執(zhí)行時,可能需要為局部變量分配內(nèi)存空間,而在返回前,又需要釋放這些內(nèi)存空間;

??大多數(shù)語言的函數(shù)調(diào)用都采用了棧結(jié)構(gòu)實現(xiàn),函數(shù)的調(diào)用與返回即對應(yīng)的是一系列的入棧與出棧操作,我們通常稱之為函數(shù)棧幀(stack frame)。示意圖如下:

??上面提到的程序計數(shù)器即寄存器%rip,另外還有兩個寄存器需要重點關(guān)注:%rbp指向棧幀底部,%rsp指向棧幀頂部。

??下面將通過具體的代碼事例,為讀者講解函數(shù)棧幀。c代碼與匯編代碼如下:

int add(int x, int y)
{
    int a, b;
    a = 10;
    b = 5;
       return x+y;
}

int main()
{
    int sum = add(1,2);
}
main:
    pushq   %rbp
    movq    %rsp, %rbp
    subq    $16, %rsp
    movl    $2, %esi
    movl    $1, %edi
    call    add
    movl    %eax, -4(%rbp)
    leave
    ret
add:
    pushq   %rbp
    movq    %rsp, %rbp
    movl    %edi, -20(%rbp)
    movl    %esi, -24(%rbp)
    movl    $10, -4(%rbp)
    movl    $5, -8(%rbp)
    movl    -24(%rbp), %eax
    movl    -20(%rbp), %edx
    addl    %edx, %eax
    popq    %rbp
    ret

??分析匯編代碼:

main函數(shù)與add函數(shù)入口,首先將寄存器%rbp壓入棧中用于保存其值,其次移動%rbp指向當前棧頂部(此時%rbp,%rsp都指向棧頂,開始新的函數(shù)棧幀);

main函數(shù)"subq $16, %rsp",是為main函數(shù)棧幀預(yù)留16個字節(jié)的內(nèi)存空間;

調(diào)用add函數(shù)時,第一個參數(shù)和第二個參數(shù)分別保存在寄存器%edi和%esi,返回值保存在寄存器%eax;

call指令用于函數(shù)調(diào)用,實現(xiàn)了兩個功能:寄存器%rip壓入棧中,跳轉(zhuǎn)到新的代碼位置;

ret指令用于函數(shù)返回,彈出棧頂內(nèi)容到寄存器%rip,依次實現(xiàn)代碼跳轉(zhuǎn);

leave指令等同于兩條指令:movq %rsp,%rbp和popq %rbp,用于釋放main函數(shù)棧幀,恢復(fù)前一個函數(shù)棧幀;

注意add函數(shù)棧幀,并沒有為其分配空間,寄存器%rsp和%rbp都指向棧幀底部;根本因為是add函數(shù)沒有調(diào)用其他函數(shù)。

該程序的棧結(jié)構(gòu)示意圖如下:

??問題:觀察上面的匯編代碼,輸入?yún)?shù)分別使用的是寄存器%edi和%esi,返回值使用的是寄存器%eax,輸入輸出參數(shù)不應(yīng)該保存在棧上嗎?寄存器比內(nèi)存訪問要快的多,現(xiàn)代處理器寄存器數(shù)目也比較多,因此傾向于將參數(shù)優(yōu)先保存在寄存器。比如%rdi, %rsi, %rdx, %rcx, %r8d, %r9d 六個寄存器用于存儲函數(shù)調(diào)用時的前6個參數(shù),那么當輸入?yún)?shù)數(shù)目超過6個時,如何處理?這些輸入?yún)?shù)只能存儲在棧上了。
(%rdi等表示64位寄存器,%edi等表示32位寄存器)

//add函數(shù)需要9個參數(shù)
add(1,2,3,4,5,6,7,8,9);

//參數(shù)7,8,9存儲在棧上
movl    $9, 16(%rsp)
movl    $8, 8(%rsp)
movl    $7, (%rsp)
movl    $6, %r9d
movl    $5, %r8d
movl    $4, %ecx
movl    $3, %edx
movl    $2, %esi
movl    $1, %edi
Swoole C棧管理

??通過學習c?;局R,我們知道最主要有三個寄存器:%rip程序計數(shù)器指向下一條需要執(zhí)行的指令,%rbp指向函數(shù)棧幀底部,%rsp指向函數(shù)棧幀頂部。這三個寄存器可以確定一個c棧執(zhí)行上下文,c棧的管理其實就是這些寄存器的管理。

??第一節(jié)我們提到Swoole在管理c棧時,用到了 boost.context庫,其中make_fcontext()和jump_fcontext()函數(shù)均使用匯編語言編寫,實現(xiàn)了c棧執(zhí)行上下文的創(chuàng)建以及切換;函聲明命如下:

fcontext_t make_fcontext(void *sp, size_t size, void (*fn)(intptr_t));
intptr_t jump_fcontext(fcontext_t *ofc, fcontext_t nfc, intptr_t vp, bool preserve_fpu = false);

??make_fcontext函數(shù)用于創(chuàng)建一個執(zhí)行上下文,其中參數(shù)sp指向內(nèi)存最高地址處(在堆中分配一塊內(nèi)存作為該執(zhí)行上下文的c棧),參數(shù)size為棧大小,參數(shù)fn是一個函數(shù)指針,指向該執(zhí)行上下文的入口函數(shù);代碼主要邏輯如下:

/*%rdi表示第一個參數(shù)sp,指向棧頂*/
movq  %rdi, %rax

//保證%rax指向的地址按照16字節(jié)對齊
andq  $-16, %rax

//將%rax向低地址處偏移0x48字節(jié)
leaq  -0x48(%rax), %rax

/* %rdx表示第三個參數(shù)fn,保存在%rax偏移0x38位置處 */
movq  %rdx, 0x38(%rax)

stmxcsr  (%rax)
fnstcw   0x4(%rax)


leaq  finish(%rip), %rcx
movq  %rcx, 0x40(%rax)

//返回值保存在%rax寄存器
ret 

??make_fcontext函數(shù)創(chuàng)建的執(zhí)行上下文示意圖如下(可以看到預(yù)留了若干字節(jié)用于保存上下文信息):

??Swoole協(xié)程實現(xiàn)的Context層封裝了上下文的創(chuàng)建,創(chuàng)建上下文函數(shù)實現(xiàn)如下:

Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) :
        fn_(fn), stack_size_(stack_size), private_data_(private_data)
{
    
    stack_ = (char*) sw_malloc(stack_size_);

    void* sp = (void*) ((char*) stack_ + stack_size_);
    ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func);

}

??可以看到c棧執(zhí)行上下文是通過sw_malloc函數(shù)在堆上分配的一塊內(nèi)存,默認大小為2M字節(jié);參數(shù)sp指向的是內(nèi)存最高地址處;執(zhí)行上下文的入口函數(shù)為Context::context_func()。

??jump_fcontext函數(shù)用于切換c棧上下文:1)函數(shù)會將當前上下文(寄存器)保存在當前棧頂(push),同時將%rsp寄存器內(nèi)容保存在ofc地址;2)函數(shù)從nfc地址處恢復(fù)%rsp寄存器內(nèi)容,同時從棧頂恢復(fù)上下文信息(pop)。代碼主要邏輯如下:

//-------------------保存當前c棧上下文-------------------
pushq  %rbp  /* save RBP */
pushq  %rbx  /* save RBX */
pushq  %r15  /* save R15 */
pushq  %r14  /* save R14 */
pushq  %r13  /* save R13 */
pushq  %r12  /* save R12 */

leaq  -0x8(%rsp), %rsp
stmxcsr  (%rsp)
fnstcw   0x4(%rsp)

//%rdi表示第一個參數(shù),即ofc,保存%rsp到ofc地址處
movq  %rsp, (%rdi)


//-------------------從nfc中恢復(fù)上下文-------------------
//%rsi表示第二個參數(shù),即nfc,從nfc地址處恢復(fù)%rsp
movq  %rsi, %rsp

ldmxcsr  (%rsp)
fldcw  0x4(%rsp)
leaq  0x8(%rsp), %rsp

popq  %r12  /* restrore R12 */
popq  %r13  /* restrore R13 */
popq  %r14  /* restrore R14 */
popq  %r15  /* restrore R15 */
popq  %rbx  /* restrore RBX */
popq  %rbp  /* restrore RBP */

//這里彈出的其實是之前保存的%rip
popq  %r8

//%rdx表示第三個參數(shù),%rax用于存儲函數(shù)返回值;
movq  %rdx, %rax
//%rdi用于存儲第一個參數(shù)
movq  %rdx, %rdi

//跳轉(zhuǎn)到%r8指向的地址
jmp  *%r8

??觀察jump_fcontext函數(shù)的匯編代碼,可以看到保存上下文與恢復(fù)上下文的代碼基本是對稱的?;謴?fù)上下文時"popq %r8"用于彈出上一次保存的程序計數(shù)器%rip的內(nèi)容,然而并沒有看到保存寄存器%rip的代碼。這是因為調(diào)用jump_fcontext函數(shù)時,底層call指令已經(jīng)將%rip入棧了。

??Swoole協(xié)程實現(xiàn)的Context層封裝了上下文的換入換出,可以在上下文swap_ctx_和ctx_之間隨時換入換出,代碼實現(xiàn)如下:

bool Context::SwapIn()
{
    jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);
    return true;
}

bool Context::SwapOut()
{
    jump_fcontext(&ctx_, swap_ctx_, (intptr_t) this, true);
    return true;
}

??上下文示意圖如下所示:

Swoole PHP棧管理

??php代碼在執(zhí)行時,同樣存在函數(shù)棧幀的分配與回收。php將此抽象為兩個結(jié)構(gòu),php棧zend_vm_stack,與執(zhí)行數(shù)據(jù)(函數(shù)棧幀)zend_execute_data。

??php棧結(jié)構(gòu)與c棧結(jié)構(gòu)基本類似,定義如下:

struct _zend_vm_stack {
    zval *top; 
    zval *end; 
    zend_vm_stack prev; 
};

??其中top字段指向棧頂位置,end字段指向棧底位置;prev指向上一個棧,形成鏈表,當??臻g不夠時,可以進行擴容。php虛擬機申請??臻g時默認大小為256K,Swoole創(chuàng)建??臻g時默認大小為8K。

??執(zhí)行數(shù)據(jù)結(jié)構(gòu)體,我們需要重點關(guān)注這幾個字段:當前函數(shù)編譯后的指令集(opline指向指令集數(shù)組中的某一個元素,虛擬機只需要遍歷該數(shù)組并執(zhí)行所有指令即可),函數(shù)返回值,以及調(diào)用該函數(shù)的執(zhí)行數(shù)據(jù);結(jié)構(gòu)定義如下:

struct _zend_execute_data {
    //當前執(zhí)行指令
    const zend_op       *opline; 
    
    zend_execute_data   *call; 
    //函數(shù)返回值          
    zval                *return_value;
    zend_function       *func;            
    zval                 This;      /* this + call_info + num_args */
    //調(diào)用當前函數(shù)的棧幀       
    zend_execute_data   *prev_execute_data;
    //符號表
    zend_array          *symbol_table;
#if ZEND_EX_USE_RUN_TIME_CACHE
    void               **run_time_cache;  
#endif
#if ZEND_EX_USE_LITERALS
    //常量數(shù)組
    zval                *literals;        
#endif
};

??php棧初始化函數(shù)為zend_vm_stack_init;當執(zhí)行用戶函數(shù)調(diào)用時,虛擬機通過函數(shù)zend_vm_stack_push_call_frame在php棧上分配新的執(zhí)行數(shù)據(jù),并執(zhí)行該函數(shù)代碼;函數(shù)執(zhí)行完成后,釋放該執(zhí)行數(shù)據(jù)。代碼邏輯如下:

ZEND_API void zend_execute(zend_op_array *op_array, zval *return_value)
{
    //分配新的執(zhí)行數(shù)據(jù)
    execute_data = zend_vm_stack_push_call_frame(ZEND_CALL_TOP_CODE | ZEND_CALL_HAS_SYMBOL_TABLE,
        (zend_function*)op_array, 0, zend_get_called_scope(EG(current_execute_data)), zend_get_this_object(EG(current_execute_data)));
    
    //設(shè)置prev
    execute_data->prev_execute_data = EG(current_execute_data);
    
    //初始化當前執(zhí)行數(shù)據(jù),op_array即為當前函數(shù)編譯得到的指令集
    i_init_execute_data(execute_data, op_array, return_value);
    
    //執(zhí)行函數(shù)代碼
    zend_execute_ex(execute_data);
    
    //釋放執(zhí)行數(shù)據(jù)
    zend_vm_stack_free_call_frame(execute_data);
}

??php棧幀結(jié)構(gòu)示意圖如下:

??Swoole協(xié)程實現(xiàn),需要自己管理php棧,在發(fā)生協(xié)程創(chuàng)建以及切換時,對應(yīng)的創(chuàng)建新的php棧,切換php棧,同時保存和恢復(fù)php棧上下文信息。這里涉及到一個很重要的結(jié)構(gòu)體php_coro_task:

struct php_coro_task
{
    zval *vm_stack_top;
    zval *vm_stack_end;
    zend_vm_stack vm_stack;
    
    zend_execute_data *execute_data;
};

??這里列出了php_coro_task結(jié)構(gòu)體的若干關(guān)鍵字段,這些字段用于保存和恢復(fù)php上下文信息。

??協(xié)程創(chuàng)建時,底層通過函數(shù)PHPCoroutine::create_func實現(xiàn)了php棧的創(chuàng)建:

void PHPCoroutine::create_func(void *arg)
{
    //創(chuàng)建并初始化php棧
    vm_stack_init();
    call = (zend_execute_data *) (EG(vm_stack_top));
    
    //為結(jié)構(gòu)php_coro_task分配空間
    task = (php_coro_task *) EG(vm_stack_top);
    EG(vm_stack_top) = (zval *) ((char *) call + PHP_CORO_TASK_SLOT * sizeof(zval));
    
    //創(chuàng)建新的執(zhí)行數(shù)據(jù)結(jié)構(gòu)
    call = zend_vm_stack_push_call_frame(
        ZEND_CALL_TOP_FUNCTION | ZEND_CALL_ALLOCATED,
        func, argc, fci_cache.called_scope, fci_cache.object
    );
}

??從代碼中可以看到結(jié)構(gòu)php_coro_task是直接存儲在php棧的底部。

??當通過yield函數(shù)讓出CPU時,底層會調(diào)用函數(shù)PHPCoroutine::on_yield切換php棧:

void PHPCoroutine::on_yield(void *arg)
{
    php_coro_task *task = (php_coro_task *) arg;
    php_coro_task *origin_task = get_origin_task(task);
    
    //保存當前php棧上下文信息到php_coro_task結(jié)構(gòu)
    save_task(task);
    
    //從php_coro_task結(jié)構(gòu)中恢復(fù)php棧上下文信息
    restore_task(origin_task);
}
Swoole協(xié)程實現(xiàn)

??前面我們簡單介紹了Swoole協(xié)程的實現(xiàn)方案,以及Swoole對c棧與php棧的管理,接下來將結(jié)合前面的知識,系統(tǒng)性的介紹Swoole協(xié)程的實現(xiàn)原理。

swoole協(xié)程數(shù)據(jù)模型

??話不多說,先看一張圖:

每個協(xié)程都需要管理自己的c棧與php棧;

Context封裝了c棧的管理操作;ctx_字段保存的是寄存器%rsp的內(nèi)容(指向c棧棧頂位置);swap_ctx_字段保存的是將被換出的協(xié)程寄存器%rsp內(nèi)容(即,將被換出的協(xié)程的c棧棧頂位置);SwapIn()對應(yīng)協(xié)程換入操作;SwapOut()對應(yīng)協(xié)程換出操作;

參考jump_fcontext實現(xiàn),協(xié)程在換出時,會將寄存器%rip,%rbp等暫存在c棧棧頂;協(xié)程在換入時,相應(yīng)的會從棧頂恢復(fù)這些寄存器的內(nèi)容;

Coroutine管理著協(xié)程所有內(nèi)容;cid字段表示當前協(xié)程的ID;task字段指向當前協(xié)程的php_coro_task結(jié)構(gòu),該結(jié)構(gòu)中保存的是當前協(xié)程的php棧信息(vm_stack_top,execute_data等);ctx字段指向的是當前協(xié)程的Context對象;origin字段指向的是另一個協(xié)程Coroutine對象;yield()和resume()對應(yīng)的是協(xié)程的換出換入操作;

注意到php_coro_task結(jié)構(gòu)的co字段指向其對應(yīng)的協(xié)程對象Coroutine;

Coroutine還有一些靜態(tài)屬性,靜態(tài)屬性的屬于類屬性,所有協(xié)程共享的;last_cid字段存儲的是當前最大的協(xié)程ID,創(chuàng)建協(xié)程時可用于生成協(xié)程ID;current字段指向的是當前正在運行的協(xié)程Coroutine對象;on_yield和on_resume是兩個函數(shù)指針,用于實現(xiàn)php棧的切換操作,實際指向的是方法PHPCoroutine::on_yield和PHPCoroutine::on_resume;

swoole協(xié)程實現(xiàn) 協(xié)程創(chuàng)建

??Swoole創(chuàng)建協(xié)程可以使用go()函數(shù),底層實現(xiàn)對應(yīng)的是PHP_FUNCTION(swoole_coroutine_create),其函數(shù)實現(xiàn)如下:

PHP_FUNCTION(swoole_coroutine_create)
{
    ……
    
    long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params);
}

long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{
    ……
    
    save_task(get_task());

    return Coroutine::create(create_func, (void*) &php_coro_args);
}

class Coroutine
{
public:
    static inline long create(coroutine_func_t fn, void* args = nullptr)
    {
        return (new Coroutine(fn, args))->run();
    }
}

注意Coroutine::create函數(shù)第一個參數(shù)偉create_func,該函數(shù)后續(xù)用于創(chuàng)建php棧,并開始協(xié)程代碼的執(zhí)行;

可以看到PHPCoroutine::create在調(diào)用Coroutine::create創(chuàng)建創(chuàng)建協(xié)程之前,保存了當前php棧信息到php_coro_task結(jié)構(gòu)中。

注意主程序的php棧是虛擬機創(chuàng)建的,結(jié)構(gòu)與上面畫的協(xié)程php棧不同,主程序的php_coro_task結(jié)構(gòu)并沒有存儲在php棧上,而是一個靜態(tài)變量PHPCoroutine::main_task,從get_task方法可以看出,主程序中g(shù)et_current_task()返回的是null,因此最后獲得的php_coro_task結(jié)構(gòu)是PHPCoroutine::main_task。

class PHPCoroutine
{
public:
   static inline php_coro_task* get_task()
    {
        php_coro_task *task = (php_coro_task *) Coroutine::get_current_task();
        return task ? task : &main_task;
    } 
}

在Coroutine的構(gòu)造函數(shù)中完成了協(xié)程對象Coroutine的創(chuàng)建與初始化,以及Context對象的創(chuàng)建與初始化(創(chuàng)建了c棧);run()函數(shù)執(zhí)行了協(xié)程的換入,從而開始協(xié)程的運行;

//全局協(xié)程map
std::unordered_map Coroutine::coroutines;

class Coroutine
{
protected:
    Coroutine(coroutine_func_t fn, void *private_data) :
            ctx(stack_size, fn, private_data)
    {
        cid = ++last_cid;
        coroutines[cid] = this;
    }
    
    inline long run()
    {
        long cid = this->cid;
        origin = current;
        current = this;
        ctx.SwapIn();
        if (ctx.end)
        {
            close();
        }
        return cid;
    }
}

可以看到創(chuàng)建協(xié)程對象Coroutine時,通過last_cid來計算當前協(xié)程的ID,同時將該協(xié)程對象加入到全局map中;代碼ctx(stack_size, fn, private_data)創(chuàng)建并初始化了Context對象;

run()函數(shù)將該協(xié)程換入執(zhí)行時,賦值origin為當前協(xié)程(主程序中current為null),同時設(shè)置current為當前協(xié)程對象Coroutine;調(diào)用SwapIn()函數(shù)完成協(xié)程的換入執(zhí)行;最后如果協(xié)程執(zhí)行完畢,則關(guān)閉并釋放該協(xié)程對象Coroutine;

初始化Context對象時,可以看到其構(gòu)造函數(shù)Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data),其中參數(shù)fn為協(xié)程入口函數(shù)(PHPCoroutine::create_func),可以看到其賦值給ontext對象的字段fn_,但是在創(chuàng)建c棧上下文時,其傳入的入口函數(shù)為context_func;

Context::Context(size_t stack_size, coroutine_func_t fn, void* private_data) :
        fn_(fn), stack_size_(stack_size), private_data_(private_data)
{
    ……
    
    ctx_ = make_fcontext(sp, stack_size_, (void (*)(intptr_t))&context_func);

}

函數(shù)context_func內(nèi)部其實調(diào)用的就是方法PHPCoroutine::create_func;當協(xié)程執(zhí)行結(jié)束時,會標記end字段為true,同時將該協(xié)程換出;

void Context::context_func(void *arg)
{
    Context *_this = (Context *) arg;
    _this->fn_(_this->private_data_);
    _this->end = true;
    _this->SwapOut();
}
??問題:參數(shù)arg為什么是Context對象呢,是如何傳遞的呢?這就涉及到j(luò)ump_fcontext匯編實現(xiàn),以及jump_fcontext的調(diào)用了
jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);

jump_fcontext:
    movq  %rdx, %rdi

??調(diào)用jump_fcontext函數(shù)時,第三個參數(shù)傳遞的是this,即當前Context對象;而函數(shù)jump_fcontext匯編實現(xiàn)時,將第三個參數(shù)的內(nèi)容拷貝到%rdi寄存器中,當協(xié)程換入執(zhí)行函數(shù)context_func時,寄存器%rdi存儲的就是第一個參數(shù),即Context對象。

方法PHPCoroutine::create_func就是創(chuàng)建并初始化php棧,執(zhí)行協(xié)程代碼;這里不做過多介紹。

??問題:Coroutine的靜態(tài)屬性on_yield和on_resume時什么時候賦值的?

??在Swoole模塊初始化時,會調(diào)用函數(shù)swoole_coroutine_util_init(該函數(shù)同時聲明了"Co"等短名稱),該函數(shù)進一步的調(diào)用PHPCoroutine::init()方法,該方法完成了靜態(tài)屬性的賦值操作。

void PHPCoroutine::init()
{
    Coroutine::set_on_yield(on_yield);
    Coroutine::set_on_resume(on_resume);
    Coroutine::set_on_close(on_close);
}
協(xié)程切換

??用戶可以通過Co::yield()和Co::resume()實現(xiàn)協(xié)程的讓出和恢復(fù),
Co::yield()的底層實現(xiàn)函數(shù)為PHP_METHOD(swoole_coroutine_util, yield),Co::resume()的底層實現(xiàn)函數(shù)為PHP_METHOD(swoole_coroutine_util, resume)。本節(jié)將為讀者講述協(xié)程切換的實現(xiàn)原理。

static unordered_map user_yield_coros;

static PHP_METHOD(swoole_coroutine_util, yield)
{
    Coroutine* co = Coroutine::get_current_safe();
    user_yield_coros[co->get_cid()] = co;
    co->yield();
    RETURN_TRUE;
}

static PHP_METHOD(swoole_coroutine_util, resume)
{
    ……
    auto coroutine_iterator = user_yield_coros.find(cid);
    if (coroutine_iterator == user_yield_coros.end())
    {
        swoole_php_fatal_error(E_WARNING, "you can not resume the coroutine which is in IO operation");
        RETURN_FALSE;
    }
    
    user_yield_coros.erase(cid);
    co->resume();
}

調(diào)用Co::resume()恢復(fù)某個協(xié)程之前,該協(xié)程必然已經(jīng)調(diào)用Co::yield()讓出CPU;因此在Co::yield()時,會將該協(xié)程對象添加到全局map中;Co::resume()時做相應(yīng)校驗,如果校驗通過則恢復(fù)協(xié)程,并從map種刪除該協(xié)程對象;

co->yield()實現(xiàn)了協(xié)程的讓出操作;1)設(shè)置協(xié)程狀態(tài)為SW_CORO_WAITING;2)回調(diào)on_yield方法,即PHPCoroutine::on_yield,保存當前協(xié)程(task代表協(xié)程)的php棧上下文,恢復(fù)另一個協(xié)程的php棧上下文(origin代表另一個協(xié)程對象);3)設(shè)置當前協(xié)程對象為origin;4)換出該協(xié)程;

void Coroutine::yield()
{
    state = SW_CORO_WAITING;
    if (on_yield)
    {
        on_yield(task);
    }
    current = origin;
    ctx.SwapOut();
}

co->resume()實現(xiàn)了協(xié)程的恢復(fù)操作:1)設(shè)置協(xié)程狀態(tài)為SW_CORO_RUNNING;2)回調(diào)on_resume方法,即PHPCoroutine::on_resume,保存當前協(xié)程(current協(xié)程)的php棧上下文,恢復(fù)另一個協(xié)程(task代表協(xié)程)的php棧上下文;3)設(shè)置origin為當前協(xié)程對象,current為即將要換入的協(xié)程對象;4)換入?yún)f(xié)程;

void Coroutine::resume()
{
    state = SW_CORO_RUNNING;
    if (on_resume)
    {
        on_resume(task);
    }
    origin = current;
    current = this;
    ctx.SwapIn();
    if (ctx.end)
    {
        close();
    }
}

Swoole協(xié)程有四種狀態(tài):初始化,運行中,等待運行,運行結(jié)束;定義如下:

typedef enum
{
    SW_CORO_INIT = 0,
    SW_CORO_WAITING,
    SW_CORO_RUNNING,
    SW_CORO_END,
} sw_coro_state;

協(xié)程之間可以通過Coroutine對象的origin字段形成一個類似鏈表的結(jié)構(gòu);Co::yield()換出當前協(xié)程時,會換入origin協(xié)程;在A協(xié)程種調(diào)用Co::resume()恢復(fù)B協(xié)程時,會換出A協(xié)程,換入B協(xié)程,同時標記A協(xié)程為B的origin協(xié)程;

??協(xié)程切換過程比較簡單,這里不做過多詳述。

協(xié)程調(diào)度

??當我們調(diào)用Co::sleep()讓協(xié)程休眠時,會換出當前協(xié)程;或者調(diào)用CoroutineSocket->recv()從socket接收數(shù)據(jù),但socket數(shù)據(jù)還沒有準備好時,會阻塞當前協(xié)程,從而使得協(xié)程換出。那么問題來了,什么時候再換入執(zhí)行這個協(xié)程呢?

socket讀寫實現(xiàn)

??Swoole的socket讀寫使用的成熟的IO多路復(fù)用模型:epoll/kqueue/select/poll等,并且將其封裝在結(jié)構(gòu)體_swReactor中,其定義如下:

struct _swReactor
{
    //超時時間
    int32_t timeout_msec;
    
    //fd的讀寫事件處理函數(shù)
    swReactor_handle handle[SW_MAX_FDTYPE];        
    swReactor_handle write_handle[SW_MAX_FDTYPE];  
    swReactor_handle error_handle[SW_MAX_FDTYPE];
    
    //fd事件的注冊修改刪除以及wait
    //函數(shù)指針,(以epoll為例)指向的是epoll_ctl、epoll_wait
    int (*add)(swReactor *, int fd, int fdtype);
    int (*set)(swReactor *, int fd, int fdtype);
    int (*del)(swReactor *, int fd);
    int (*wait)(swReactor *, struct timeval *);
    void (*free)(swReactor *);
    
    //超時回調(diào)函數(shù),結(jié)束、開始回調(diào)函數(shù)
    void (*onTimeout)(swReactor *);
    void (*onFinish)(swReactor *);
    void (*onBegin)(swReactor *);
}

??在調(diào)用函數(shù)PHPCoroutine::create創(chuàng)建協(xié)程時,會校驗是否已經(jīng)初始化_swReactor對象,如果沒有則會調(diào)用php_swoole_reactor_init函數(shù)創(chuàng)建并初始化main_reactor對象;

void php_swoole_reactor_init()
{
    if (SwooleG.main_reactor == NULL)
    {

        SwooleG.main_reactor = (swReactor *) sw_malloc(sizeof(swReactor));
        
        if (swReactor_create(SwooleG.main_reactor, SW_REACTOR_MAXEVENTS) < 0)
        {
           
        }

        ……
        
        php_swoole_register_shutdown_function_prepend("swoole_event_wait");
    }
    
}

??我們以epoll為例,main_reactor各回調(diào)函數(shù)如下:

reactor->onFinish = swReactor_onFinish;
reactor->onTimeout = swReactor_onTimeout;

reactor->add = swReactorEpoll_add;
reactor->set = swReactorEpoll_set;
reactor->del = swReactorEpoll_del;
reactor->wait = swReactorEpoll_wait;
reactor->free = swReactorEpoll_free;
??注意:這里注冊了一個函數(shù)swoole_event_wait,在生命周期register_shutdown階段會執(zhí)行該函數(shù),開始Swoole的事件循環(huán),阻擋了php生命周期的結(jié)束。

??類Socket封裝了socket讀寫相關(guān)的所有操作以及數(shù)據(jù)結(jié)構(gòu),其定義如下:

class Socket
{
public:
    swConnection *socket = nullptr;

    //讀寫函數(shù)
    ssize_t recv(void *__buf, size_t __n);
    ssize_t send(const void *__buf, size_t __n);
    ……
    
private:

    swReactor *reactor = nullptr;
    Coroutine *read_co = nullptr;
    Coroutine *write_co = nullptr;
    
    //連接超時時間,接收數(shù)據(jù)、發(fā)送數(shù)據(jù)超時時間
    double connect_timeout = default_connect_timeout;
    double read_timeout = default_read_timeout;
    double write_timeout = default_write_timeout;
}

socket字段類型為swConnection,代表傳輸層連接;

reactor字段指向結(jié)構(gòu)體swReactor對象,用于fd事件的注冊、修改、刪除以及wait;

當調(diào)用recv()函數(shù)接收數(shù)據(jù),阻塞了該協(xié)程時,read_co字段指向該協(xié)程對象Coroutine;

當調(diào)用send()函數(shù)接收數(shù)據(jù),阻塞了該協(xié)程時,write_co字段指向該協(xié)程對象Coroutine;

類Socket初始化函數(shù)為Socket::init_sock:

void Socket::init_sock(int _fd)
{
    
    reactor = SwooleG.main_reactor;
    
    //設(shè)置協(xié)程類型fd(SW_FD_CORO_SOCKET)的讀寫事件處理函數(shù)
    if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET))
    {
        reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, readable_event_callback);
        reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, writable_event_callback);
        reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, error_event_callback);
    }
}

??當我們調(diào)用CoroutineSocket->recv接收數(shù)據(jù)時,底層實現(xiàn)如下:

Socket::timeout_setter ts(sock->socket, timeout, SW_TIMEOUT_READ);
ssize_t bytes = all ? sock->socket->recv_all(ZSTR_VAL(buf), length) : sock->socket->recv(ZSTR_VAL(buf), length);
    

??類timeout_setter會設(shè)置socket的接收數(shù)據(jù)超時時間read_timeout為timeout。

??函數(shù)socket->recv_all會循環(huán)讀取數(shù)據(jù),直到讀取到指定長度的數(shù)據(jù),或者底層返回等待標識阻塞當前協(xié)程:

ssize_t Socket::recv_all(void *__buf, size_t __n)
{
 
    timer_controller timer(&read_timer, read_timeout, this, timer_callback);
    while (true)
    {
        do {
            retval = swConnection_recv(socket, (char *) __buf + total_bytes, __n - total_bytes, 0);
        } while (retval < 0 && swConnection_error(errno) == SW_WAIT && timer.start() && wait_event(SW_EVENT_READ));
        if (unlikely(retval <= 0))
        {
            break;
        }
        total_bytes += retval;
        if ((size_t) total_bytes == __n)
        {
            break;
        }
    }
}

函數(shù)首先創(chuàng)建timer_controller對象,設(shè)置其超時時間為read_timeout,以及超時回調(diào)函數(shù)為timer_callback;

while (true)死循環(huán)讀取fd數(shù)據(jù),當讀取數(shù)據(jù)量等于__n時,讀取操作結(jié)束,break該循環(huán);如果讀取操作swConnection_recv返回值小于0,并且錯誤標識為SW_WAIT,說明需要等待數(shù)據(jù)到來,此時阻塞當前協(xié)程等待數(shù)據(jù)到來(函數(shù)wait_event會換出當前協(xié)程),阻塞超時時間為read_timeout(函數(shù)timer.start()用于設(shè)置超時時間)。

class timer_controller
{
public:
    bool start()
    {
        
        if (timeout > 0)
        {
            *timer_pp = swTimer_add(&SwooleG.timer, (long) (timeout * 1000), 0, data, callback);
        }

    }
}

函數(shù)swTimer_add用于添加一個定時器;Swoole底層定時任務(wù)是通過最小堆實現(xiàn)的,堆頂元素的超時時間最近;結(jié)構(gòu)體_swTimer維護著Swoole內(nèi)部所有的定時任務(wù):

struct _swTimer
{
    swHeap *heap; //最小堆
    swHashMap *map; //map,定時器ID作為key
    
    //最早的定時任務(wù)觸發(fā)時間
    long _next_msec;
    
    //函數(shù)指針,指向swReactorTimer_set
    int (*set)(swTimer *timer, long exec_msec);
    
    //函數(shù)指針,指向swReactorTimer_free
    void (*free)(swTimer *timer);
};

當調(diào)用swTimer_add向_swTimer結(jié)構(gòu)中添加定時任務(wù)時,需要更新_swTimer中最早的定時任務(wù)觸發(fā)時間_next_msec,同時更新main_reactor對象的超時時間:

if (timer->_next_msec < 0 || timer->_next_msec > _msec)
{
    timer->set(timer, _msec);
    timer->_next_msec = _msec;
}

static int swReactorTimer_set(swTimer *timer, long exec_msec)
{
    SwooleG.main_reactor->timeout_msec = exec_msec;
    return SW_OK;
}

函數(shù)wait_event負責將當前協(xié)程換出,直到注冊的事件發(fā)生

bool Socket::wait_event(const enum swEvent_type event, const void **__buf, size_t __n)
{
    if (unlikely(!add_event(event)))
    {
        return false;
    }
    
    if (likely(event == SW_EVENT_READ))
    {
        read_co = co;
        read_co->yield();
        read_co = nullptr;
    }
    else // if (event == SW_EVENT_WRITE)
    {
        write_co = co;
        write_co->yield();
        write_co = nullptr;
    }
}

函數(shù)add_event用于添加事件,底層調(diào)用reactor->add添加fd的監(jiān)聽事件;

read_co = co或者write_co = co,用于記錄當前哪個協(xié)程阻塞在該socket對象上,當該socket對象的讀寫事件被觸發(fā)時,可以恢復(fù)該協(xié)程執(zhí)行;

函數(shù)yield()將該協(xié)程換出;

??上面提到,創(chuàng)建協(xié)程時,注冊了一個函數(shù)swoole_event_wait,在生命周期register_shutdown階段會執(zhí)行該函數(shù),開始Swoole的事件循環(huán),阻擋了php生命周期的結(jié)束。函數(shù)swoole_event_wait底層就是調(diào)用main_reactor->wait等待fd讀寫事件的產(chǎn)生;我們以epoll為例講述事件循環(huán)的邏輯:

static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
    while (reactor->running > 0)
    {
        n = epoll_wait(epoll_fd, events, max_event_num, swReactor_get_timeout_msec(reactor));
        
        if (n == 0)
        {
            if (reactor->onTimeout != NULL)
            {
                reactor->onTimeout(reactor);
            }
            SW_REACTOR_CONTINUE;
        }
        
        for (i = 0; i < n; i++)
        {
            if ((events[i].events & EPOLLIN) && !event.socket->removed)
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type);
                ret = handle(reactor, &event);
                
            }
            
            if ((events[i].events & EPOLLOUT) && !event.socket->removed)
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type);
                ret = handle(reactor, &event);
               
            }
        }
    }
}

swReactorEpoll_wait是對函數(shù)epoll_wait的封裝;當有讀寫事件發(fā)生時,執(zhí)行相應(yīng)的handle,根據(jù)上面的講解我們知道讀寫事件的handle分別為readable_event_callback和writable_event_callback;

int Socket::readable_event_callback(swReactor *reactor, swEvent *event)
{
    Socket *socket = (Socket *) event->socket->object;

    socket->read_co->resume();

}

可以看到函數(shù)readable_event_callback只是簡單的恢復(fù)read_co協(xié)程即可;

當epoll_wait發(fā)生超時,最終調(diào)用的是函數(shù)swReactor_onTimeout,該函數(shù)會從Swoole維護的一系列定時任務(wù)swTimer中查找已經(jīng)超時的定時任務(wù),同時執(zhí)行其callback回調(diào);

while ((tmp = swHeap_top(timer->heap)))
{
    tnode = tmp->data;
    if (tnode->exec_msec > now_msec || tnode->round == timer->round)
    {
        break;
    }

    timer->_current_id = tnode->id;
    if (!tnode->remove)
    {
        tnode->callback(timer, tnode);
    }
    
    ……
}

//該定時任務(wù)沒有超時,需要更新需要更新_swTimer中最早的定時任務(wù)觸發(fā)時間_next_msec
long next_msec = tnode->exec_msec - now_msec;
if (next_msec <= 0)
{
    next_msec = 1;
}
//同時更新main_reactor對象的超時時間,實現(xiàn)函數(shù)為swReactorTimer_set
timer->set(timer, next_msec);

該callback回調(diào)函數(shù)即為上面設(shè)置的timer_callback:

void Socket::timer_callback(swTimer *timer, swTimer_node *tnode)
{
    Socket *socket = (Socket *) tnode->data;
    socket->set_err(ETIMEDOUT);
    if (likely(tnode == socket->read_timer))
    {
        socket->read_timer = nullptr;
        socket->read_co->resume();
    }
    else if (tnode == socket->write_timer)
    {
        socket->write_timer = nullptr;
        socket->write_co->resume();
    }
}

同樣的,timer_callback函數(shù)只是簡單的恢復(fù)read_co或者write_co協(xié)程即可

sleep實現(xiàn)

??Co::sleep()的實現(xiàn)函數(shù)為PHP_METHOD(swoole_coroutine_util, sleep),該函數(shù)通過調(diào)用Coroutine::sleep實現(xiàn)了協(xié)程休眠的功能:

int Coroutine::sleep(double sec)
{
    Coroutine* co = Coroutine::get_current_safe();
    if (swTimer_add(&SwooleG.timer, (long) (sec * 1000), 0, co, sleep_timeout) == NULL)
    {
        return -1;
    }
    co->yield();
    return 0;
}

??可以看到,與socket讀寫事件超時處理相同,sleep內(nèi)部實現(xiàn)時通過swTimer_add添加定時任務(wù),同時換出當前協(xié)程實現(xiàn)的。該定時任務(wù)會導(dǎo)致main_reactor對象的超時時間的改變,即修改了epoll_wait的超時時間。

??sleep的超時處理函數(shù)為sleep_timeout,只需要換入該阻塞協(xié)程對象即可,實現(xiàn)如下:

static void sleep_timeout(swTimer *timer, swTimer_node *tnode)
{
    ((Coroutine *) tnode->data)->resume();
}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/31412.html

相關(guān)文章

  • Swoole協(xié)程之旅-前篇

    摘要:協(xié)程完全有用戶態(tài)程序控制,所以也被成為用戶態(tài)的線程。目前支持協(xié)程的語言有很多,例如等。協(xié)程之旅前篇結(jié)束,下一篇文章我們將深入分析原生協(xié)程部分的實現(xiàn)。 寫在最前 ??Swoole協(xié)程經(jīng)歷了幾個里程碑,我們需要在前進的道路上不斷總結(jié)與回顧自己的發(fā)展歷程,正所謂溫故而知新,本系列文章將分為協(xié)程之旅前、中、后三篇。 前篇主要介紹協(xié)程的概念和Swoole幾個版本協(xié)程實現(xiàn)的主要方案技術(shù); 中篇主...

    terasum 評論0 收藏0
  • swoole——從入門到放棄(三)

    摘要:從入門到放棄三一進程子進程創(chuàng)建成功后要執(zhí)行的函數(shù)重定向子進程的標準輸入和輸出。默認為阻塞讀取。是否創(chuàng)建管道,啟用后,此選項將忽略用戶參數(shù),強制為。 swoole——從入門到放棄(三) 一、進程 swoole_process SwooleProcess swoole_process::__construct(callable $function, $redirect_stdin...

    王笑朝 評論0 收藏0
  • swoole——從入門到放棄(三)

    摘要:從入門到放棄三一進程子進程創(chuàng)建成功后要執(zhí)行的函數(shù)重定向子進程的標準輸入和輸出。默認為阻塞讀取。是否創(chuàng)建管道,啟用后,此選項將忽略用戶參數(shù),強制為。 swoole——從入門到放棄(三) 一、進程 swoole_process SwooleProcess swoole_process::__construct(callable $function, $redirect_stdin...

    rottengeek 評論0 收藏0
  • imi v1.0 正式版,專注單體應(yīng)用的 PHP 協(xié)程應(yīng)用開發(fā)框架

    摘要:年開發(fā)并發(fā)布框架現(xiàn)已停止維護。經(jīng)過一年實戰(zhàn),年月日,一周年之際正式發(fā)布版本。宇潤部分開源項目我已通過碼云平臺,向項目力所能及地捐款,聊表心意。所以,目前主打的還是單體應(yīng)用開發(fā)。協(xié)議的開發(fā),也是帶來的一大優(yōu)勢。 imi 介紹 showImg(https://segmentfault.com/img/bVbuab9?w=291&h=187); imi 是基于 PHP 協(xié)程應(yīng)用開發(fā)框架,它支...

    genefy 評論0 收藏0
  • Easyswoole 源碼學習和個人解析 目錄

    摘要:易用穩(wěn)定,本次想通過對的學習和個人解析,吸收框架的思想和設(shè)計知識,加強自己對的認知和理解。當然,筆者能力水平有限,后續(xù)的文章如有錯誤,還請指出和諒解。目錄如下后續(xù)添加文章都會記錄在此服務(wù)啟動過程以及主體設(shè)計流程源碼解析 前言 swoole是什么?官網(wǎng)的原話介紹是這樣的: Swoole 使用純 C 語言編寫,提供了 PHP 語言的異步多線程服務(wù)器,異步 TCP/UDP 網(wǎng)絡(luò)客戶端,異步 ...

    CoXie 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<