摘要:作者屈鵬本篇源碼解析將為大家介紹的另一周邊組件。這個(gè)函數(shù)會(huì)往完成隊(duì)列中注冊(cè)若干個(gè),相當(dāng)于用往一個(gè)中注冊(cè)一些事件的關(guān)注。在函數(shù)返回之后,服務(wù)端的初始化及啟動(dòng)過程便結(jié)束了。
作者:屈鵬
本篇 TiKV 源碼解析將為大家介紹 TiKV 的另一周邊組件—— grpc-rs。grpc-rs 是 PingCAP 實(shí)現(xiàn)的一個(gè) gRPC 的 Rust 綁定,其 Server/Client 端的代碼框架都基于 Future,事件驅(qū)動(dòng)的 EventLoop 被隱藏在了庫的內(nèi)部,所以非常易于使用。本文將以一個(gè)簡(jiǎn)單的 gRPC 服務(wù)作為例子,展示 grpc-rs 會(huì)生成的服務(wù)端代碼框架和需要服務(wù)的實(shí)現(xiàn)者填寫的內(nèi)容,然后會(huì)深入介紹服務(wù)器在啟動(dòng)時(shí)如何將后臺(tái)的事件循環(huán)與這個(gè)框架掛鉤,并在后臺(tái)線程中運(yùn)行實(shí)現(xiàn)者的代碼。
基本的代碼生成及服務(wù)端 APIgRPC 使用 protobuf 定義一個(gè)服務(wù),之后調(diào)用相關(guān)的代碼生成工具就可以生成服務(wù)端、客戶端的代碼框架了,這個(gè)過程可以參考我們的 官方文檔??蛻舳丝梢灾苯诱{(diào)用這些生成的代碼,向服務(wù)端發(fā)送請(qǐng)求并接收響應(yīng),而服務(wù)端則需要服務(wù)的實(shí)現(xiàn)者自己來定制對(duì)請(qǐng)求的處理邏輯,生成響應(yīng)并發(fā)回給客戶端。舉一個(gè)例子:
#[derive(Clone)] struct MyHelloService {} impl Hello for MyHelloService { // trait 中的函數(shù)簽名由 grpc-rs 生成,內(nèi)部實(shí)現(xiàn)需要用戶自己填寫 fn hello(&mut self, ctx: RpcContext, req: HelloRequest, sink: UnarySink) { let mut resp = HelloResponse::new(); resp.set_to(req.get_from()); ctx.spawn( sink.success(resp) .map(|_| println!("send hello response back success")) .map_err(|e| println!("send hello response back fail: {}", e)) ); } }
我們定義了一個(gè)名為 Hello 的服務(wù),里面只有一個(gè)名為 hello 的 RPC。grpc-rs 會(huì)為服務(wù)生成一個(gè) trait,里面的方法就是這個(gè)服務(wù)包含的所有 RPC。在這個(gè)例子中唯一的 RPC 中,我們從 HelloRequest 中拿到客戶端的名字,然后再將這個(gè)名字放到 HelloResponse 中發(fā)回去,非常簡(jiǎn)單,只是展示一下函數(shù)簽名中各個(gè)參數(shù)的用法。
然后,我們需要考慮的是如何把這個(gè)服務(wù)運(yùn)行起來,監(jiān)聽一個(gè)端口,真正能夠響應(yīng)客戶端的請(qǐng)求呢?下面的代碼片段展示了如何運(yùn)行這個(gè)服務(wù):
fn main() { // 創(chuàng)建一個(gè) Environment,里面包含一個(gè) Completion Queue let env = Arc::new(EnvBuilder::new().cq_count(4).build()); let channel_args = ChannelBuilder::new(env.clone()).build_args(); let my_service = MyHelloWorldService::new(); let mut server = ServerBuilder::new(env.clone()) // 使用 MyHelloWorldService 作為服務(wù)端的實(shí)現(xiàn),注冊(cè)到 gRPC server 中 .register_service(create_hello(my_service)) .bind("0.0.0.0", 44444) .channel_args(channel_args) .build() .unwrap(); server.start(); thread::park(); }
以上代碼展示了 grpc-rs 的足夠簡(jiǎn)潔的 API 接口,各行代碼的意義如其注釋所示。
Server 的創(chuàng)建和啟動(dòng)下面我們來看一下這個(gè) gRPC server 是如何接收客戶端的請(qǐng)求,并路由到我們實(shí)現(xiàn)的服務(wù)端代碼中進(jìn)行后續(xù)的處理的。
第一步我們初始化一個(gè) Environment,并設(shè)置 Completion Queue(完成隊(duì)列)的個(gè)數(shù)為 4 個(gè)。完成隊(duì)列是 gRPC 的一個(gè)核心概念,grpc-rs 為每一個(gè)完成隊(duì)列創(chuàng)建一個(gè)線程,并在線程中運(yùn)行一個(gè)事件循環(huán),類似于 Linux 網(wǎng)絡(luò)編程中不斷地調(diào)用 epoll_wait 來獲取事件,進(jìn)行處理:
// event loop fn poll_queue(cq: Arc) { let id = thread::current().id(); let cq = CompletionQueue::new(cq, id); loop { let e = cq.next(); match e.event_type { EventType::QueueShutdown => break, EventType::QueueTimeout => continue, EventType::OpComplete => {} } let tag: Box = unsafe { Box::from_raw(e.tag as _) }; tag.resolve(&cq, e.success != 0); } }
事件被封裝在 Tag 中。我們暫時(shí)忽略對(duì)事件的具體處理邏輯,目前我們只需要知道,當(dāng)這個(gè) Environment 被創(chuàng)建好之后,這些后臺(tái)線程便開始運(yùn)行了。那么剩下的任務(wù)就是監(jiān)聽一個(gè)端口,將網(wǎng)絡(luò)上的事件路由到這幾個(gè)事件循環(huán)中。這個(gè)過程在 Server 的 start 方法中:
/// Start the server. pub fn start(&mut self) { unsafe { grpc_sys::grpc_server_start(self.core.server); for cq in self.env.completion_queues() { let registry = self .handlers .iter() .map(|(k, v)| (k.to_owned(), v.box_clone())) .collect(); let rc = RequestCallContext { server: self.core.clone(), registry: Arc::new(UnsafeCell::new(registry)), }; for _ in 0..self.core.slots_per_cq { request_call(rc.clone(), cq); } } } }
首先調(diào)用 grpc_server_start 來啟動(dòng)這個(gè) Server,然后對(duì)每一個(gè)完成隊(duì)列,復(fù)制一份 handler 字典。這個(gè)字典的 key 是一個(gè)字符串,而 value 是一個(gè)函數(shù)指針,指向?qū)@個(gè)類型的請(qǐng)求的處理函數(shù)——其實(shí)就是前面所述的服務(wù)的具體實(shí)現(xiàn)邏輯。key 的構(gòu)造方式其實(shí)就是 /
接著我們創(chuàng)建一個(gè) RequestCallContext,然后對(duì)每個(gè)完成隊(duì)列調(diào)用幾次 request_call。這個(gè)函數(shù)會(huì)往完成隊(duì)列中注冊(cè)若干個(gè) Call,相當(dāng)于用 epoll_ctl 往一個(gè) epoll fd 中注冊(cè)一些事件的關(guān)注。Call 是 gRPC 在進(jìn)行遠(yuǎn)程過程調(diào)用時(shí)的基本單元,每一個(gè) RPC 在建立的時(shí)候都會(huì)從完成隊(duì)列里取出一個(gè) Call 對(duì)象,后者會(huì)在這個(gè) RPC 結(jié)束時(shí)被回收。因此,在 start 函數(shù)中每一個(gè)完成隊(duì)列上注冊(cè)的 Call 個(gè)數(shù)決定了這個(gè)完成隊(duì)列上可以并發(fā)地處理多少個(gè) RPC,在 grpc-rs 中默認(rèn)的值是 1024 個(gè)。
小結(jié)以上代碼基本都在 grpc-rs 倉庫中的 src/server.rs 文件中。在 start 函數(shù)返回之后,服務(wù)端的初始化及啟動(dòng)過程便結(jié)束了?,F(xiàn)在,可以快速地用幾句話回顧一下:首先創(chuàng)建一個(gè) Environment,內(nèi)部會(huì)為每一個(gè)完成隊(duì)列啟動(dòng)一個(gè)線程;接著創(chuàng)建 Server 對(duì)象,綁定端口,并將一個(gè)或多個(gè)服務(wù)注冊(cè)到這個(gè) Server 上;最后調(diào)用 Server 的 start 方法,將服務(wù)的具體實(shí)現(xiàn)關(guān)聯(lián)到若干個(gè) Call 上,并塞進(jìn)所有的完成隊(duì)列中。在這之后,網(wǎng)絡(luò)上新來的 RPC 請(qǐng)求便可以在后臺(tái)的事件循環(huán)中被取出,并根據(jù)具體實(shí)現(xiàn)的字典分別執(zhí)行了。最后,不要忘記 start 是一個(gè)非阻塞的方法,調(diào)用它的主線程在之后可以繼續(xù)執(zhí)行別的邏輯或者掛起。
本篇源碼解析就到這里,下篇關(guān)于 grpc-rs 的文章我們會(huì)進(jìn)一步介紹一個(gè) Call 或者 RPC 的生命周期,以及每一階段在 Server 端的完成隊(duì)列中對(duì)應(yīng)哪一種事件、會(huì)被如何處理,這一部分是 grpc-rs 的核心代碼,敬請(qǐng)期待!
原文鏈接:https://www.pingcap.com/blog-cn/tikv-source-code-reading-7/
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/18027.html
摘要:要說明的實(shí)現(xiàn),需要先介紹的運(yùn)行方式。封裝與實(shí)現(xiàn)細(xì)節(jié)通過上文的分析可以明顯看到,的通知機(jī)制其實(shí)和的通知機(jī)制非常類似。是用來處理上文提到的返回結(jié)果的。儲(chǔ)存的是和收到的消息。小結(jié)最后簡(jiǎn)要總結(jié)一下的封裝和實(shí)現(xiàn)過程。 作者: 李建俊 上一篇《gRPC Server 的初始化和啟動(dòng)流程》為大家介紹了 gRPC Server 的初始化和啟動(dòng)流程,本篇將帶大家深入到 grpc-rs 這個(gè)庫里,查看 R...
閱讀 2952·2021-10-14 09:50
閱讀 1266·2021-10-08 10:21
閱讀 3701·2021-10-08 10:16
閱讀 3107·2021-09-27 14:02
閱讀 3170·2021-09-23 11:21
閱讀 2213·2021-09-07 10:17
閱讀 435·2019-08-30 14:00
閱讀 2156·2019-08-29 17:26