成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

TiKV 源碼解析系列文章(八)grpc-rs 的封裝與實(shí)現(xiàn)

weknow619 / 759人閱讀

摘要:要說明的實(shí)現(xiàn),需要先介紹的運(yùn)行方式。封裝與實(shí)現(xiàn)細(xì)節(jié)通過上文的分析可以明顯看到,的通知機(jī)制其實(shí)和的通知機(jī)制非常類似。是用來處理上文提到的返回結(jié)果的。儲(chǔ)存的是和收到的消息。小結(jié)最后簡要總結(jié)一下的封裝和實(shí)現(xiàn)過程。

作者: 李建俊

上一篇《gRPC Server 的初始化和啟動(dòng)流程》為大家介紹了 gRPC Server 的初始化和啟動(dòng)流程,本篇將帶大家深入到 grpc-rs 這個(gè)庫里,查看 RPC 請求是如何被封裝和派發(fā)的,以及它是怎么和 Rust Future 進(jìn)行結(jié)合的。

gRPC C Core

gRPC 包括了一系列復(fù)雜的協(xié)議和流控機(jī)制,如果要為每個(gè)語言都實(shí)現(xiàn)一遍這些機(jī)制和協(xié)議,將會(huì)是一個(gè)很繁重的工作。因此 gRPC 提供了一個(gè)統(tǒng)一的庫來提供基本的實(shí)現(xiàn),其他語言再基于這個(gè)實(shí)現(xiàn)進(jìn)行封裝和適配,提供更符合相應(yīng)語言習(xí)慣或生態(tài)的接口。這個(gè)庫就是 gRPC C Core,grpc-rs 就是基于 gRPC C Core 進(jìn)行封裝的。

要說明 grpc-rs 的實(shí)現(xiàn),需要先介紹 gRPC C Core 的運(yùn)行方式。gRPC C Core 有三個(gè)很關(guān)鍵的概念 grpc_channel、grpc_completion_queue、grpc_call。grpc_channel 在 RPC 里就是底層的連接,grpc_completion_queue 就是一個(gè)處理完成事件的隊(duì)列。grpc_call 代表的是一個(gè) RPC。要進(jìn)行一次 RPC,首先從 grpc_channel 創(chuàng)建一個(gè) grpc_call,然后再給這個(gè) grpc_call 發(fā)送請求,收取響應(yīng)。而這個(gè)過程都是異步,所以需要調(diào)用 grpc_completion_queue 的接口去驅(qū)動(dòng)消息處理。整個(gè)過程可以通過以下代碼來解釋(為了讓代碼更可讀一些,以下代碼和實(shí)際可編譯運(yùn)行的代碼有一些出入)。

grpc_completion_queue* queue = grpc_completion_queue_create_for_next(NULL);
grpc_channel* ch = grpc_insecure_channel_create("example.com", NULL);
grpc_call* call = grpc_channel_create_call(ch, NULL, 0, queue, "say_hello");
grpc_op ops[6];
memset(ops, 0, sizeof(ops));
char* buffer = (char*) malloc(100);
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ops[1].data.send_message.send_message = "gRPC";
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message.recv_message = buffer;
ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
void* tag = malloc(1);
grpc_call_start_batch(call, ops, 6, tag);
grpc_event ev = grpc_completion_queue_next(queue);
ASSERT_EQ(ev.tag, tag);
ASSERT(strcmp(buffer, "Hello gRPC"));

可以看到,對 grpc_call 的操作是通過一次 grpc_call_start_batch 來指定的。這個(gè) start batch 會(huì)將指定的操作放在內(nèi)存 buffer 當(dāng)中,然后通過 grpc_completion_queue_next 來實(shí)際執(zhí)行相關(guān)操作,如收發(fā)消息。這里需要注意的是 tag 這個(gè)變量。當(dāng)這些操作都完成以后,grpc_completion_queue_next 會(huì)返回一個(gè)包含 tag 的消息來通知這個(gè)操作完成了。所以在代碼的末尾就可以在先前指定的 buffer 讀出預(yù)期的字符串。

由于篇幅有限,對于 gRPC C Core 的解析就不再深入了,對這部分很感興趣的朋友也可以在 github.com/grpc/grpc 閱讀相關(guān)文檔和源碼。

封裝與實(shí)現(xiàn)細(xì)節(jié)

通過上文的分析可以明顯看到,gRPC C Core 的通知機(jī)制其實(shí)和 Rust Future 的通知機(jī)制非常類似。Rust Future 提供一個(gè) poll 方法來檢驗(yàn)當(dāng)前 Future 是否已經(jīng) ready。如果尚未 ready,poll 方法會(huì)注冊一個(gè)通知鉤子 task。等到 ready 時(shí),task 會(huì)被調(diào)用,從而觸發(fā)對這個(gè) Future 的再次 poll,獲取結(jié)果。task 其實(shí)和上文中的 tag 正好對應(yīng)起來了,而在 grpc-rs 中,tag 就是一個(gè)儲(chǔ)存了 task 的 enum。

pub enum CallTag {
   Batch(BatchPromise),
   Request(RequestCallback),
   UnaryRequest(UnaryRequestCallback),
   Abort(Abort),
   Shutdown(ShutdownPromise),
   Spawn(SpawnNotify),
}

tag 之所以是一個(gè) enum 是因?yàn)椴煌?call 會(huì)對應(yīng)不同的行為,如對于服務(wù)器端接受請求的處理和客戶端發(fā)起請求的處理就不太一樣。

grpc-rs 在初始化時(shí)會(huì)創(chuàng)建多個(gè)線程來不斷調(diào)用 grpc_completion_queue_next 來獲取已經(jīng)完成的 tag,然后根據(jù) tag 的類型,將數(shù)據(jù)存放在結(jié)構(gòu)體中并通知 task 來獲取。下面是這個(gè)流程的代碼。

// event loop
fn poll_queue(cq: Arc) {
   let id = thread::current().id();
   let cq = CompletionQueue::new(cq, id);
   loop {
       let e = cq.next();
       match e.event_type {
           EventType::QueueShutdown => break,
           // timeout should not happen in theory.
           EventType::QueueTimeout => continue,
           EventType::OpComplete => {}
       }

       let tag: Box = unsafe { Box::from_raw(e.tag as _) };

       tag.resolve(&cq, e.success != 0);
   }
}

可以看到,tag 會(huì)被強(qiáng)轉(zhuǎn)成為一個(gè) CallTag,然后調(diào)用 resolve 方法來處理結(jié)果。不同的 enum 類型會(huì)有不同的 resolve 方式,這里挑選其中 CallTag::BatchCallTag::Request 來進(jìn)行解釋,其他的 CallTag 流程類似。

BatchPromise 是用來處理上文提到的 grpc_call_start_batch 返回結(jié)果的 tag。RequestCallback 則用來接受新的 RPC 請求。下面是 BatchPromise 的定義及其 resolve 方法。

/// A promise used to resolve batch jobs.
pub struct BatchPromise {
   ty: BatchType,
   ctx: BatchContext,
   inner: Arc>>,
}

impl BatchPromise {
   fn handle_unary_response(&mut self) {
       let task = {
           let mut guard = self.inner.lock();
           let status = self.ctx.rpc_status();
           if status.status == RpcStatusCode::Ok {
               guard.set_result(Ok(self.ctx.recv_message()))
           } else {
               guard.set_result(Err(Error::RpcFailure(status)))
           }
       };
       task.map(|t| t.notify());
   }

   pub fn resolve(mut self, success: bool) {
       match self.ty {
           BatchType::CheckRead => {
               assert!(success);
               self.handle_unary_response();
           }
           BatchType::Finish => {
               self.finish_response(success);
           }
           BatchType::Read => {
               self.read_one_msg(success);
           }
       }
   }
}

上面代碼中的 ctx 是用來儲(chǔ)存響應(yīng)的字段,包括響應(yīng)頭、數(shù)據(jù)之類的。當(dāng) next 返回時(shí),gRPC C Core 會(huì)將對應(yīng)內(nèi)容填充到這個(gè)結(jié)構(gòu)體里。inner 儲(chǔ)存的是 task 和收到的消息。當(dāng) resolve 被調(diào)用時(shí),先判斷這個(gè) tag 要執(zhí)行的是什么任務(wù)。BatchType::CheckRead 表示是一問一答式的讀取任務(wù),Batch::Finish 表示的是沒有返回?cái)?shù)據(jù)的任務(wù),BatchType::Read 表示的是流式響應(yīng)里讀取單個(gè)消息的任務(wù)。拿 CheckRead 舉例,它會(huì)將拉取到的數(shù)據(jù)存放在 inner 里,并通知 task。而 task 對應(yīng)的 Future 再被 poll 時(shí)就可以拿到對應(yīng)的數(shù)據(jù)了。這個(gè) Future 的定義如下:

/// A future object for task that is scheduled to `CompletionQueue`.
pub struct CqFuture {
    inner: Arc>,
}

impl Future for CqFuture {
    type Item = T;
    type Error = Error;

    fn poll(&mut self) -> Poll {
        let mut guard = self.inner.lock();
        if guard.stale {
            panic!("Resolved future is not supposed to be polled again.");
        }

        if let Some(res) = guard.result.take() {
            guard.stale = true;
            return Ok(Async::Ready(res?));
        }

        // So the task has not been finished yet, add notification hook.
        if guard.task.is_none() || !guard.task.as_ref().unwrap().will_notify_current() {
            guard.task = Some(task::current());
        }

        Ok(Async::NotReady)
    }
}

Inner 是一個(gè) SpinLock。如果在 poll 時(shí)還沒拿到結(jié)果時(shí),會(huì)將 task 存放在鎖里,在有結(jié)果的時(shí)候,存放結(jié)果并通過 task 通知再次 poll。如果有結(jié)果則直接返回結(jié)果。

下面是 RequestCallback 的定義和 resolve 方法。

pub struct RequestCallback {
   ctx: RequestContext,
}

impl RequestCallback {
   pub fn resolve(mut self, cq: &CompletionQueue, success: bool) {
       let mut rc = self.ctx.take_request_call_context().unwrap();
       if !success {
           server::request_call(rc, cq);
           return;
       }

       match self.ctx.handle_stream_req(cq, &mut rc) {
           Ok(_) => server::request_call(rc, cq),
           Err(ctx) => ctx.handle_unary_req(rc, cq),
       }
   }
}

上面代碼中的 ctx 是用來儲(chǔ)存請求的字段,主要包括請求頭。和 BatchPromise 類似,ctx 的內(nèi)容也是在調(diào)用 next 方法時(shí)被填充。在 resolve 時(shí),如果失敗,則再次調(diào)用 request_call 來接受下一個(gè) RPC,否則會(huì)調(diào)用對應(yīng)的 RPC 方法。

handle_stream_req 的定義如下:

pub fn handle_stream_req(
   self,
   cq: &CompletionQueue,
   rc: &mut RequestCallContext,
) -> result::Result<(), Self> {
   let handler = unsafe { rc.get_handler(self.method()) };
   match handler {
       Some(handler) => match handler.method_type() {
           MethodType::Unary | MethodType::ServerStreaming => Err(self),
           _ => {
               execute(self, cq, None, handler);
               Ok(())
           }
       },
       None => {
           execute_unimplemented(self, cq.clone());
           Ok(())
       }
   }
}

從上面可以看到,整個(gè)過程先通過 get_handler,根據(jù) RPC 想要執(zhí)行的方法名字拿到方法并調(diào)用,如果方法不存在,則向客戶端報(bào)錯(cuò)??梢钥吹竭@里對于 UnaryServerStreaming 返回了錯(cuò)誤。這是因?yàn)檫@兩種請求都是客戶端只發(fā)一次請求,所以返回錯(cuò)誤讓 resolve 繼續(xù)拉取消息體然后再執(zhí)行對應(yīng)的方法。

為什么 get_handler 可以知道調(diào)用的是什么方法呢?這是因?yàn)?gRPC 編譯器在生成代碼里對這些方法進(jìn)行了映射,具體的細(xì)節(jié)在生成的 create_xxx_service 里,本文就不再展開了。

小結(jié)

最后簡要總結(jié)一下 grpc-rs 的封裝和實(shí)現(xiàn)過程。當(dāng) grpc-rs 初始化時(shí),會(huì)創(chuàng)建數(shù)個(gè)線程輪詢消息隊(duì)列(grpc_completion_queue)并 resolve。當(dāng) server 被創(chuàng)建時(shí),RPC 會(huì)被注冊起來,server 啟動(dòng)時(shí),grpc-rs 會(huì)創(chuàng)建數(shù)個(gè) RequestCall 來接受請求。當(dāng)有 RPC 請求發(fā)到服務(wù)器端時(shí),CallTag::Request 就會(huì)被返回并 resolve,并在 resolve 中調(diào)用對應(yīng)的 RPC 方法。而 client 在調(diào)用 RPC 時(shí),其實(shí)都是創(chuàng)建了一個(gè) Call,并產(chǎn)生相應(yīng)的 BatchPromise 來異步通知 RPC 方法是否已經(jīng)完成。

還有很多 grpc-rs 的源碼在我們的文章中暫未涉及,其中還有不少有趣的技巧,比如,如何減少喚醒線程的次數(shù)而減少切換、如何無鎖地注冊調(diào)用各個(gè) service 鉤子等。歡迎有好奇心的小伙伴自行閱讀源碼,也歡迎大家提 issue 或 PR 一起來完善這個(gè)項(xiàng)目。

原文閱讀:https://www.pingcap.com/blog-cn/tikv-source-code-reading-8/

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/18039.html

相關(guān)文章

  • TiKV 源碼解析系列文章(七)gRPC Server 初始化和啟動(dòng)流程

    摘要:作者屈鵬本篇源碼解析將為大家介紹的另一周邊組件。這個(gè)函數(shù)會(huì)往完成隊(duì)列中注冊若干個(gè),相當(dāng)于用往一個(gè)中注冊一些事件的關(guān)注。在函數(shù)返回之后,服務(wù)端的初始化及啟動(dòng)過程便結(jié)束了。 作者:屈鵬 本篇 TiKV 源碼解析將為大家介紹 TiKV 的另一周邊組件—— grpc-rs。grpc-rs 是 PingCAP 實(shí)現(xiàn)的一個(gè) gRPC 的 Rust 綁定,其 Server/Client 端的代碼框架...

    YacaToy 評(píng)論0 收藏0
  • TiKV 源碼解析系列文章(一)序

    摘要:而源碼解析系列文章則是會(huì)從源碼層面給大家抽絲剝繭,讓大家知道我們內(nèi)部到底是如何實(shí)現(xiàn)的。我們希望通過該源碼解析系列,能讓大家對有一個(gè)更深刻的理解。 作者:唐劉 TiKV 是一個(gè)支持事務(wù)的分布式 Key-Value 數(shù)據(jù)庫,有很多社區(qū)開發(fā)者基于 TiKV 來開發(fā)自己的應(yīng)用,譬如 titan、tidis。尤其是在 TiKV 成為 CNCF 的 Sandbox 項(xiàng)目之后,吸引了越來越多開發(fā)者的...

    LeviDing 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<