摘要:本教程是藍(lán)圖系列的第三篇教程,對(duì)應(yīng)的版本為。提供了一個(gè)服務(wù)發(fā)現(xiàn)模塊用于發(fā)布和獲取服務(wù)記錄。前端此微服務(wù)的前端部分,目前已整合至組件中。監(jiān)視儀表板用于監(jiān)視微服務(wù)系統(tǒng)的狀態(tài)以及日志統(tǒng)計(jì)數(shù)據(jù)的查看。而服務(wù)則負(fù)責(zé)發(fā)布其它服務(wù)如服務(wù)或消息源并且部署。
本文章是 Vert.x 藍(lán)圖系列 的第三篇教程。全系列:
Vert.x Blueprint 系列教程(一) | 待辦事項(xiàng)服務(wù)開發(fā)教程
Vert.x Blueprint 系列教程(二) | 開發(fā)基于消息的應(yīng)用 - Vert.x Kue 教程
Vert.x Blueprint 系列教程(三) | Micro-Shop 微服務(wù)應(yīng)用實(shí)戰(zhàn)
本系列已發(fā)布至Vert.x官網(wǎng):Vert.x Blueprint Tutorials
前言歡迎回到Vert.x 藍(lán)圖系列!當(dāng)今,微服務(wù)架構(gòu) 變得越來越流行,開發(fā)者們都想嘗試一下微服務(wù)應(yīng)用的開發(fā)和架構(gòu)設(shè)計(jì)。令人激動(dòng)的是,Vert.x給我們提供了一系列用于微服務(wù)開發(fā)的組件,包括 Service Discovery (服務(wù)發(fā)現(xiàn))、Circuit Breaker (斷路器) 以及其它的一些組件。有了Vert.x微服務(wù)組件的幫助,我們就可以快速利用Vert.x搭建我們的微服務(wù)應(yīng)用。在這篇藍(lán)圖教程中,我們一起來探索一個(gè)利用Vert.x的各個(gè)組件開發(fā)的 Micro-Shop 微服務(wù)應(yīng)用~
通過本教程,你將會(huì)學(xué)習(xí)到以下內(nèi)容:
微服務(wù)架構(gòu)
如何利用Vert.x來開發(fā)微服務(wù)應(yīng)用
異步開發(fā)模式
響應(yīng)式、函數(shù)式編程
事件溯源 (Event Sourcing)
通過分布式 Event Bus 進(jìn)行異步RPC調(diào)用
各種各樣的服務(wù)類型(例如REST、數(shù)據(jù)源、Event Bus服務(wù)等)
如何使用服務(wù)發(fā)現(xiàn)模塊 (Vert.x Service Discovery)
如何使用斷路器模塊 (Vert.x Circuit Breaker)
如何利用Vert.x實(shí)現(xiàn)API Gateway
如何進(jìn)行權(quán)限認(rèn)證 (OAuth 2 + Keycloak)
如何配置及使用 SockJS - Event Bus Bridge
以及其它的一些東西。。。
本教程是 Vert.x 藍(lán)圖系列 的第三篇教程,對(duì)應(yīng)的Vert.x版本為 3.3.2 。本教程中的完整代碼已托管至 GitHub。
踏入微服務(wù)之門哈~你一定對(duì)“微服務(wù)”這個(gè)詞很熟悉——至少聽起來很熟悉~越來越多的開發(fā)者開始擁抱微服務(wù)架構(gòu),那么微服務(wù)究竟是什么呢?一句話總結(jié)一下:
Microservices are small, autonomous services that work together.
我們來深入一下微服務(wù)的各種特性,來看看微服務(wù)為何如此出色:
首先,微服務(wù)的重要的一點(diǎn)是“微”。每個(gè)微服務(wù)都是獨(dú)立的,每個(gè)多帶帶的微服務(wù)組件都注重某一特定的邏輯。在微服務(wù)架構(gòu)中,我們將傳統(tǒng)的單體應(yīng)用拆分成許多互相獨(dú)立的組件。每個(gè)組件都由其特定的“邏輯邊界”,因此組件不會(huì)過于龐大。不過話又說回來了,每個(gè)組件應(yīng)該有多小呢?這個(gè)問題可不好回答,它通常取決與我們的業(yè)務(wù)與負(fù)載。正如Sam Newman在其《Building
Microservices》書中所講的那樣:
We seem to have a very good sense of what is too big, and so it could be argued that once a piece of code no longer feels too big, it’s probably small enough.
因此,當(dāng)我們覺得每個(gè)組件不是特別大的時(shí)候,組件的大小可能就剛剛好。
在微服務(wù)架構(gòu)中,組件之間可以通過任意協(xié)議進(jìn)行通信,比如 HTTP 或 AMQP。
每個(gè)組件是獨(dú)立的,因此我們可以在不同的組件中使用不同的編程語(yǔ)言,不同的技術(shù) —— 這就是所謂的 polyglot support (不錯(cuò),Vert.x也是支持多語(yǔ)言的?。?/p>
每個(gè)組件都是獨(dú)立開發(fā)、部署以及發(fā)布的,所以這減少了部署及發(fā)布的難度。
微服務(wù)架構(gòu)通常與分布式系統(tǒng)形影不離,所以我們還需要考慮分布式系統(tǒng)中的方方面面,包括可用性、彈性以及可擴(kuò)展性。
微服務(wù)架構(gòu)通常被設(shè)計(jì)成為 面向失敗的,因?yàn)樵诜植际较到y(tǒng)中失敗的場(chǎng)景非常復(fù)雜,我們需要有效地處理失敗的手段。
雖然微服務(wù)有如此多的優(yōu)點(diǎn),但是不要忘了,微服務(wù)可不是銀彈,因?yàn)樗肓朔植际较到y(tǒng)中所帶來的各種問題,因此設(shè)計(jì)架構(gòu)時(shí)我們都要考慮這些情況。
服務(wù)發(fā)現(xiàn)在微服務(wù)架構(gòu)中,每個(gè)組件都是獨(dú)立的,它們都不知道其他組件的位置,但是組件之間又需要通信,因此我們必須知道各個(gè)組件的位置。然而,把位置信息寫死在代碼中顯然不好,因此我們需要一種機(jī)制可以動(dòng)態(tài)地記錄每個(gè)組件的位置 —— 這就是 服務(wù)發(fā)現(xiàn)。有了服務(wù)發(fā)現(xiàn)模塊,我們就可以將服務(wù)位置發(fā)布至服務(wù)發(fā)現(xiàn)模塊中,其它服務(wù)就可以從服務(wù)發(fā)現(xiàn)模塊中獲取想要調(diào)用的服務(wù)的位置并進(jìn)行調(diào)用。在調(diào)用服務(wù)的過程中,我們不需要知道對(duì)應(yīng)服務(wù)的位置,所以當(dāng)服務(wù)位置或環(huán)境變動(dòng)時(shí),服務(wù)調(diào)用可以不受影響,這使得我們的架構(gòu)更加靈活。
Vert.x提供了一個(gè)服務(wù)發(fā)現(xiàn)模塊用于發(fā)布和獲取服務(wù)記錄。在Vert.x 服務(wù)發(fā)現(xiàn)模塊,每個(gè)服務(wù)都被抽象成一個(gè)Record(服務(wù)記錄)。服務(wù)提供者可以向服務(wù)發(fā)現(xiàn)模塊中發(fā)布服務(wù),此時(shí)Record會(huì)根據(jù)底層ServiceDiscoveryBackend的配置存儲(chǔ)在本地Map、分布式Map或Redis中。服務(wù)消費(fèi)者可以從服務(wù)發(fā)現(xiàn)模塊中獲取服務(wù)記錄,并且通過服務(wù)記錄獲取對(duì)應(yīng)的服務(wù)實(shí)例然后進(jìn)行服務(wù)調(diào)用。目前Vert.x原生支持好幾種服務(wù)類型,比如 Event Bus 服務(wù)(即服務(wù)代理)、HTTP 端點(diǎn)、消息源 以及 數(shù)據(jù)源。當(dāng)然我們也可以實(shí)現(xiàn)自己的服務(wù)類型,可以參考相關(guān)的文檔。在后面我們還會(huì)詳細(xì)講述如何使用服務(wù)發(fā)現(xiàn)模塊,這里先簡(jiǎn)單做個(gè)了解。
異步的、響應(yīng)式的Vert.x異步與響應(yīng)式風(fēng)格都很適合微服務(wù)架構(gòu),而Vert.x兼具這兩種風(fēng)格!異步開發(fā)模式相信大家已經(jīng)了然于胸了,而如果大家讀過前幾篇藍(lán)圖教程的話,響應(yīng)式風(fēng)格大家一定不會(huì)陌生。有了基于Future以及基于RxJava的異步開發(fā)模式,我們可以隨心所欲地對(duì)異步過程進(jìn)行組合和變換,這樣代碼可以非常簡(jiǎn)潔,非常優(yōu)美!在本藍(lán)圖教程中,我們會(huì)見到大量基于Future和RxJava的異步方法。
Mirco Shop 微服務(wù)應(yīng)用好啦,現(xiàn)在大家應(yīng)該對(duì)微服務(wù)架構(gòu)有了一個(gè)大致的了解了,下面我們來講一下本藍(lán)圖中的微服務(wù)應(yīng)用。這是一個(gè)簡(jiǎn)單的 Micro-Shop 微服務(wù)應(yīng)用 (目前只完成了基本功能),人們可以進(jìn)行網(wǎng)上購(gòu)物以及交易。。。當(dāng)前版本的微服務(wù)應(yīng)用包含下列組件:
賬戶服務(wù):提供用戶賬戶的操作服務(wù),使用MySQL作為后端存儲(chǔ)。
商品服務(wù):提供商品的操作服務(wù),使用MySQL作為后端存儲(chǔ)。
庫(kù)存服務(wù):提供商品庫(kù)存的操作服務(wù),如查詢庫(kù)存、增加庫(kù)存即減少庫(kù)存。使用Redis作為后端存儲(chǔ)。
網(wǎng)店服務(wù):提供網(wǎng)店的操作即管理服務(wù),使用MongoDB作為后端存儲(chǔ)。
購(gòu)物車服務(wù):提供購(gòu)物車事件的生成以及購(gòu)物車操作(添加、刪除商品以及結(jié)算)服務(wù)。我們通過此服務(wù)來講述 事件溯源。
訂單服務(wù):訂單服務(wù)從Event Bus接收購(gòu)物車服務(wù)發(fā)送的訂單請(qǐng)求,接著處理訂單并將訂單發(fā)送至下層服務(wù)(本例中僅僅簡(jiǎn)單地存儲(chǔ)至數(shù)據(jù)庫(kù)中)。
Micro Shop 前端:此微服務(wù)的前端部分(SPA),目前已整合至API Gateway組件中。
監(jiān)視儀表板:用于監(jiān)視微服務(wù)系統(tǒng)的狀態(tài)以及日志、統(tǒng)計(jì)數(shù)據(jù)的查看。
API Gateway:整個(gè)微服務(wù)的入口,它負(fù)責(zé)將收到的請(qǐng)求按照一定的規(guī)則分發(fā)至對(duì)應(yīng)的組件的REST端點(diǎn)中(相當(dāng)于反向代理)。它也負(fù)責(zé)權(quán)限認(rèn)證與管理,負(fù)載均衡,心跳檢測(cè)以及失敗處理(使用Vert.x Circuit Breaker)。
Micro Shop 微服務(wù)架構(gòu)我們來看一下Micro Shop微服務(wù)應(yīng)用的架構(gòu):
用戶請(qǐng)求首先經(jīng)過API Gateway,再經(jīng)其處理并分發(fā)至對(duì)應(yīng)的業(yè)務(wù)端點(diǎn)。
我們?cè)賮砜匆幌旅總€(gè)基礎(chǔ)組件內(nèi)部的結(jié)構(gòu)(基礎(chǔ)組件即圖中最下面的各個(gè)業(yè)務(wù)組件)。
組件結(jié)構(gòu)每個(gè)基礎(chǔ)組件至少有兩個(gè)Verticle:服務(wù)Verticle以及REST Verticle。REST Vertice提供了服務(wù)對(duì)應(yīng)的REST端點(diǎn),并且也負(fù)責(zé)將此端點(diǎn)發(fā)布至服務(wù)發(fā)現(xiàn)層。而服務(wù)Verticle則負(fù)責(zé)發(fā)布其它服務(wù)(如Event Bus服務(wù)或消息源)并且部署REST Verticle。
每個(gè)基礎(chǔ)組件中都包含對(duì)應(yīng)的服務(wù)接口,如商品組件中包含ProductService接口。這些服務(wù)接口都是Event Bus 服務(wù),由@ProxyGen注解修飾。上篇藍(lán)圖教程中我們講過,Vert.x Service Proxy可以自動(dòng)為@ProxyGen注解修飾的接口生成服務(wù)代理類,因此我們可以很方便地在Event Bus上進(jìn)行異步RPC調(diào)用而不用寫額外的代碼。很酷吧!并且有了服務(wù)發(fā)現(xiàn)組件以后,我們可以非常方便地將Event Bus服務(wù)發(fā)布至服務(wù)發(fā)現(xiàn)層,這樣其它組件可以更方便地調(diào)用服務(wù)。
組件之間的通信我們先來看一下我們的微服務(wù)應(yīng)用中用到的服務(wù)類型:
HTTP端點(diǎn) (e.g. REST 端點(diǎn)以及API Gateway) - 此服務(wù)的位置用URL描述
Event Bus服務(wù) - 此服務(wù)的位置用Event Bus上的一個(gè)特定地址描述
事件源 - 事件源服務(wù)對(duì)應(yīng)Event Bus上某個(gè)地址的事件消費(fèi)者。此服務(wù)的位置用Event Bus上的一個(gè)特定地址描述
因此,我們各個(gè)組件之間可以通過HTTP以及Event Bus(本質(zhì)是TCP)進(jìn)行通信,例如:
API Gateway與其它組件通過HTTP進(jìn)行通信。
讓我們開始吧!好啦,現(xiàn)在開始我們的微服務(wù)藍(lán)圖旅程吧!首先我們從GitHub上clone項(xiàng)目:
git clone https://github.com/sczyh30/vertx-blueprint-microservice.git
在本藍(lán)圖教程中,我們使用 Maven 作為構(gòu)建工具。我們首先來看一下pom.xml配置文件。我們可以看到,我們的藍(lán)圖應(yīng)用由許多模塊構(gòu)成:
microservice-blueprint-common account-microservice product-microservice inventory-microservice store-microservice shopping-cart-microservice order-microservice api-gateway cache-infrastructure monitor-dashboard
每個(gè)模塊代表一個(gè)組件??粗渲梦募坪跤胁簧俳M件呢!不要擔(dān)心,我們將會(huì)一一探究這些組件。下面我們先來看一下所有組件的基礎(chǔ)模塊 - microservice-blueprint-common。
微服務(wù)基礎(chǔ)模塊microservice-blueprint-common模塊提供了一些微服務(wù)功能相關(guān)的輔助類以及輔助Verticle。我們先來看一下兩個(gè)base verticles - BaseMicroserviceVerticle 和 RestAPIVerticle。
Base Microservice VerticleBaseMicroserviceVerticle提供了與微服務(wù)相關(guān)的初始化函數(shù)以及各種各樣的輔助函數(shù)。其它每一個(gè)Verticle都會(huì)繼承此Verticle,因此這個(gè)基礎(chǔ)Verticle非常重要。
首先我們來看一下其中的成員變量:
protected ServiceDiscovery discovery; protected CircuitBreaker circuitBreaker; protected SetregisteredRecords = new ConcurrentHashSet<>();
discovery以及circuitBreaker分別代表服務(wù)發(fā)現(xiàn)實(shí)例以及斷路器實(shí)例,而registeredRecords代表當(dāng)前已發(fā)布的服務(wù)記錄的集合,用于在結(jié)束Verticle時(shí)注銷服務(wù)。
start函數(shù)中主要是對(duì)服務(wù)發(fā)現(xiàn)實(shí)例和斷路器實(shí)例進(jìn)行初始化,配置文件從config()中獲取。它的實(shí)現(xiàn)非常簡(jiǎn)單:
@Override public void start() throws Exception { // init service discovery instance discovery = ServiceDiscovery.create(vertx, new ServiceDiscoveryOptions().setBackendConfiguration(config())); // init circuit breaker instance JsonObject cbOptions = config().getJsonObject("circuit-breaker") != null ? config().getJsonObject("circuit-breaker") : new JsonObject(); circuitBreaker = CircuitBreaker.create(cbOptions.getString("name", "circuit-breaker"), vertx, new CircuitBreakerOptions() .setMaxFailures(cbOptions.getInteger("max-failures", 5)) .setTimeout(cbOptions.getLong("timeout", 10000L)) .setFallbackOnFailure(true) .setResetTimeout(cbOptions.getLong("reset-timeout", 30000L)) ); }
下面我們還提供了幾個(gè)輔助函數(shù)用于發(fā)布各種各樣的服務(wù)。這些函數(shù)都是異步的,并且基于Future:
protected FuturepublishHttpEndpoint(String name, String host, int port) { Record record = HttpEndpoint.createRecord(name, host, port, "/", new JsonObject().put("api.name", config().getString("api.name", "")) ); return publish(record); } protected Future publishMessageSource(String name, String address) { Record record = MessageSource.createRecord(name, address); return publish(record); } protected Future publishJDBCDataSource(String name, JsonObject location) { Record record = JDBCDataSource.createRecord(name, location, new JsonObject()); return publish(record); } protected Future publishEventBusService(String name, String address, Class serviceClass) { Record record = EventBusService.createRecord(name, address, serviceClass); return publish(record); }
之前我們提到過,每個(gè)服務(wù)記錄Record代表一個(gè)服務(wù),其中服務(wù)類型由記錄中的type字段標(biāo)識(shí)。Vert.x原生支持的各種服務(wù)接口中都包含著好幾個(gè)createRecord方法因此我們可以利用這些方法來方便地創(chuàng)建服務(wù)記錄。通常情況下我們需要給每個(gè)服務(wù)都指定一個(gè)name,這樣之后我們就可以通過名稱來獲取服務(wù)了。我們還可以通過setMetadata方法來給服務(wù)記錄添加額外的元數(shù)據(jù)。
你可能注意到在publishHttpEndpoint方法中我們就提供了含有api-name的元數(shù)據(jù),之后我們會(huì)了解到,API Gateway在進(jìn)行反向代理時(shí)會(huì)用到它。
下面我們來看一下發(fā)布服務(wù)的通用方法 —— publish方法:
private Futurepublish(Record record) { Future future = Future.future(); // publish the service discovery.publish(record, ar -> { if (ar.succeeded()) { registeredRecords.add(record); logger.info("Service <" + ar.result().getName() + "> published"); future.complete(); } else { future.fail(ar.cause()); } }); return future; }
在publish方法中,我們調(diào)用了服務(wù)發(fā)現(xiàn)實(shí)例discovery的publish方法來將服務(wù)發(fā)布至服務(wù)發(fā)現(xiàn)模塊。它同樣也是一個(gè)異步方法,當(dāng)發(fā)布成功時(shí),我們將此服務(wù)記錄存儲(chǔ)至registeredRecords中,輸出日志然后通知future操作已完成。最后返回對(duì)應(yīng)的future。
注意,在Vert.x Service Discovery當(dāng)前版本(3.3.2)的設(shè)計(jì)中,服務(wù)發(fā)布者需要在必要時(shí)手動(dòng)注銷服務(wù),因此當(dāng)Verticle結(jié)束時(shí),我們需要將注冊(cè)的服務(wù)都注銷掉:
@Override public void stop(Futurefuture) throws Exception { // In current design, the publisher is responsible for removing the service List futures = new ArrayList<>(); for (Record record : registeredRecords) { Future unregistrationFuture = Future.future(); futures.add(unregistrationFuture); discovery.unpublish(record.getRegistration(), unregistrationFuture.completer()); } if (futures.isEmpty()) { discovery.close(); future.complete(); } else { CompositeFuture.all(futures) .setHandler(ar -> { discovery.close(); if (ar.failed()) { future.fail(ar.cause()); } else { future.complete(); } }); } }
在stop方法中,我們遍歷registeredRecords集合并且嘗試注銷每一個(gè)服務(wù),并將異步結(jié)果future添加至futures列表中。之后我們調(diào)用CompositeFuture.all(futures)來依次獲取每個(gè)異步結(jié)果的狀態(tài)。all方法返回一個(gè)組合的Future,當(dāng)列表中的所有Future都成功賦值時(shí)方為成功狀態(tài),反之只要有一個(gè)異步結(jié)果失敗,它就為失敗狀態(tài)。因此,我們給它綁定一個(gè)Handler,當(dāng)所有服務(wù)都被注銷時(shí),服務(wù)發(fā)現(xiàn)模塊就可以安全地關(guān)閉了,否則結(jié)束函數(shù)會(huì)失敗。
REST API VerticleRestAPIVerticle抽象類繼承了BaseMicroserviceVerticle抽象類。從名字上就可以看出,它提供了諸多的用于REST API開發(fā)的輔助方法。我們?cè)谄渲蟹庋b了諸如創(chuàng)建服務(wù)端、開啟Cookie和Session支持,開啟心跳檢測(cè)支持(通過HTTP),各種各樣的路由處理封裝以及用于權(quán)限驗(yàn)證的路由處理器。在之后的章節(jié)中我們將會(huì)見到這些方法。
好啦,現(xiàn)在我們已經(jīng)了解了整個(gè)藍(lán)圖應(yīng)用中的兩個(gè)基礎(chǔ)Verticle,下面是時(shí)候探索各個(gè)模塊了!在探索邏輯組件之前,我們先來看一下其中最重要的組件之一 —— API Gateway。
API Gateway我們把API Gateway的內(nèi)容多帶帶歸為一篇教程,請(qǐng)見:Vert.x 藍(lán)圖 - Micro Shop 微服務(wù)實(shí)戰(zhàn) (API Gateway篇)。
Event Bus 服務(wù) - 賬戶、網(wǎng)店及商品服務(wù) 在Event Bus上進(jìn)行異步RPC在之前的 Vert.x Kue 藍(lán)圖教程 中我們已經(jīng)介紹過Vert.x中的異步RPC(也叫服務(wù)代理)了,這里我們?cè)賮砘仡櫼幌拢⑶艺f一說如何利用服務(wù)發(fā)現(xiàn)模塊更方便地進(jìn)行異步RPC。
傳統(tǒng)的RPC有一個(gè)缺點(diǎn):消費(fèi)者需要阻塞等待生產(chǎn)者的回應(yīng)。這是一種阻塞模型,和Vert.x推崇的異步開發(fā)模式不相符。并且,傳統(tǒng)的RPC不是真正面向失敗設(shè)計(jì)的。還好,Vert.x提供了一種高效的、響應(yīng)式的RPC —— 異步RPC。我們不需要等待生產(chǎn)者的回應(yīng),而只需要傳遞一個(gè)Handler
Vert.x Service Proxy(服務(wù)代理組件)可以自動(dòng)處理含有@ProxyGen注解的服務(wù)接口,生成相應(yīng)的服務(wù)代理類。生成的服務(wù)代理類可以幫我們將數(shù)據(jù)封裝好后發(fā)送至Event Bus、從Event Bus接收數(shù)據(jù),以及對(duì)數(shù)據(jù)進(jìn)行編碼和解碼,因此我們可以省掉不少代碼。我們需要做的就是遵循@ProxyGen注解的一些限定。
比如,這里有一個(gè)Event Bus服務(wù)接口:
@ProxyGen public interface MyService { @Fluent MyService retrieveData(String id, Handler> resultHandler); }
我們可以通過Vert.x Service Proxy組件生成對(duì)應(yīng)的代理類。然后我們就可以通過ProxyHelper類的registerService方法將此服務(wù)注冊(cè)至Event Bus上:
MyService myService = MyService.createService(vertx, config); ProxyHelper.registerService(MyService.class, vertx, myService, SERVICE_ADDRESS);
有了服務(wù)發(fā)現(xiàn)組件之后,將服務(wù)發(fā)布至服務(wù)發(fā)現(xiàn)層就非常容易了。比如在我們的藍(lán)圖應(yīng)用中我們使用封裝好的方法:
publishEventBusService(SERVICE_NAME, SERVICE_ADDRESS, MyService.class)
OK,現(xiàn)在服務(wù)已經(jīng)成功地發(fā)布至服務(wù)發(fā)現(xiàn)模塊?,F(xiàn)在我們就可以通過EventBusService接口的getProxy方法來從服務(wù)發(fā)現(xiàn)層獲取發(fā)布的Event Bus服務(wù),并且像調(diào)用普通異步方法那樣進(jìn)行異步RPC:
EventBusService.幾個(gè)服務(wù)模塊的通用特性getProxy(discovery, new JsonObject().put("name", SERVICE_NAME), ar -> { if (ar.succeeded()) { MyService myService = ar.result(); myService.retrieveData(...); } });
在我們的Micro Shop微服務(wù)應(yīng)用中,賬戶、網(wǎng)店及商品服務(wù)有幾個(gè)通用的特性及約定?,F(xiàn)在我們來解釋一下。
在這三個(gè)模塊中,每個(gè)模塊都包含:
一個(gè)Event Bus服務(wù)接口。此服務(wù)接口定義了對(duì)實(shí)體存儲(chǔ)的各種操作
服務(wù)接口的實(shí)現(xiàn)
REST API Verticle,用于創(chuàng)建服務(wù)端并將其發(fā)布至服務(wù)發(fā)現(xiàn)模塊
Main Verticle,用于部署其它的verticles以及將Event Bus服務(wù)和消息源發(fā)布至服務(wù)發(fā)現(xiàn)層
其中,用戶賬戶服務(wù)以及商品服務(wù)都使用 MySQL 作為后端存儲(chǔ),而網(wǎng)店服務(wù)則以 MongoDB 作為后端存儲(chǔ)。這里我們只挑兩個(gè)典型的服務(wù)介紹如何通過Vert.x操作不同的數(shù)據(jù)庫(kù):product-microservice和store-microservice。account-microservice的實(shí)現(xiàn)與product-microservice非常相似,大家可以查閱 GitHub 上的代碼。
基于MySQL的商品服務(wù)商品微服務(wù)模塊提供了商品的操作功能,包括添加、查詢(搜索)、刪除與更新商品等。其中最重要的是ProductService服務(wù)接口以及其實(shí)現(xiàn)了。我們先來看一下此服務(wù)接口的定義:
@VertxGen @ProxyGen public interface ProductService { /** * The name of the event bus service. */ String SERVICE_NAME = "product-eb-service"; /** * The address on which the service is published. */ String SERVICE_ADDRESS = "service.product"; /** * Initialize the persistence. */ @Fluent ProductService initializePersistence(Handler> resultHandler); /** * Add a product to the persistence. */ @Fluent ProductService addProduct(Product product, Handler > resultHandler); /** * Retrieve the product with certain `productId`. */ @Fluent ProductService retrieveProduct(String productId, Handler > resultHandler); /** * Retrieve the product price with certain `productId`. */ @Fluent ProductService retrieveProductPrice(String productId, Handler > resultHandler); /** * Retrieve all products. */ @Fluent ProductService retrieveAllProducts(Handler >> resultHandler); /** * Retrieve products by page. */ @Fluent ProductService retrieveProductsByPage(int page, Handler >> resultHandler); /** * Delete a product from the persistence */ @Fluent ProductService deleteProduct(String productId, Handler > resultHandler); /** * Delete all products from the persistence */ @Fluent ProductService deleteAllProducts(Handler > resultHandler); }
正如我們之前所提到的那樣,這個(gè)服務(wù)接口是一個(gè)Event Bus服務(wù),所以我們需要給它加上@ProxyGen注解。這些方法都是異步的,因此每個(gè)方法都需要接受一個(gè)Handler
每個(gè)方法的含義都在注釋中給出了,這里就不解釋了。
商品服務(wù)接口的實(shí)現(xiàn)位于ProductServiceImpl類中。商品信息存儲(chǔ)在MySQL中,因此我們可以通過 Vert.x-JDBC 對(duì)數(shù)據(jù)庫(kù)進(jìn)行操作。我們?cè)?第一篇藍(lán)圖教程 中已經(jīng)詳細(xì)講述過Vert.x JDBC的使用細(xì)節(jié)了,因此這里我們就不過多地討論細(xì)節(jié)了。這里我們只關(guān)注如何減少代碼量。因?yàn)橥ǔ:?jiǎn)單數(shù)據(jù)庫(kù)操作的過程都是千篇一律的,因此做個(gè)封裝是很有必要的。
首先來回顧一下每次數(shù)據(jù)庫(kù)操作的過程:
從JDBCClient中獲取數(shù)據(jù)庫(kù)連接SQLConnection,這是一個(gè)異步過程
執(zhí)行SQL語(yǔ)句,綁定回調(diào)Handler
最后不要忘記關(guān)閉數(shù)據(jù)庫(kù)連接以釋放資源
對(duì)于正常的CRUD操作來說,它們的實(shí)現(xiàn)都很相似,因此我們封裝了一個(gè)JdbcRepositoryWrapper類來實(shí)現(xiàn)這些通用邏輯。它位于io.vertx.blueprint.microservice.common.service包中:
我們提供了以下的封裝方法:
executeNoResult: 執(zhí)行含參數(shù)的SQL語(yǔ)句 (通過updateWithParams方法)。執(zhí)行結(jié)果是不需要的,因此只需要接受一個(gè) Handler
retrieveOne: 執(zhí)行含參數(shù)的SQL語(yǔ)句,用于獲取某一特定實(shí)體(通過 queryWithParams方法)。此方法是基于Future的,它返回一個(gè)Future
retrieveMany: 獲取多個(gè)實(shí)體,返回Future>
作為異步結(jié)果。
retrieveByPage: 與retrieveMany 方法相似,但包含分頁(yè)邏輯。
retrieveAll: similar to retrieveMany method but does not require query parameters as it simply executes statement such as SELECT * FROM xx_table.
removeOne and removeAll: remove entity from the database.
當(dāng)然這與Spring JPA相比的不足之處在于SQL語(yǔ)句得自己寫,自己封裝也不是很方便。。。考慮到Vert.x JDBC底層也只是使用了Worker線程池包裝了原生的JDBC(不是真正的異步),我們也可以結(jié)合Spring Data的相關(guān)組件來簡(jiǎn)化開發(fā)。另外,Vert.x JDBC使用C3P0作為默認(rèn)的數(shù)據(jù)庫(kù)連接池,C3P0的性能我想大家應(yīng)該都懂。。。因此換成性能更優(yōu)的HikariCP是很有必要的。
回到JdbcRepositoryWrapper中來。這層封裝可以大大地減少代碼量。比如,我們的ProductServiceImpl實(shí)現(xiàn)類就可以繼承JdbcRepositoryWrapper類,然后利用這些封裝好的方法??磦€(gè)例子 —— retrieveProduct方法的實(shí)現(xiàn):
@Override public ProductService retrieveProduct(String productId, Handler> resultHandler) { this.retrieveOne(productId, FETCH_STATEMENT) .map(option -> option.map(Product::new).orElse(null)) .setHandler(resultHandler); return this; }
我們唯一需要做的只是將結(jié)果變換成需要的類型。是不是很方便呢?
當(dāng)然這不是唯一方法。在下面的章節(jié)中,我們將會(huì)講到一種更Reactive,更Functional的方法 —— 利用Rx版本的Vert.x JDBC。另外,用vertx-sync也是一種不錯(cuò)的選擇(類似于async/await)。
好啦!看完服務(wù)實(shí)現(xiàn),下面輪到REST API了。我們來看看RestProductAPIVerticle的實(shí)現(xiàn):
public class RestProductAPIVerticle extends RestAPIVerticle { public static final String SERVICE_NAME = "product-rest-api"; private static final String API_ADD = "/add"; private static final String API_RETRIEVE = "/:productId"; private static final String API_RETRIEVE_BY_PAGE = "/products"; private static final String API_RETRIEVE_PRICE = "/:productId/price"; private static final String API_RETRIEVE_ALL = "/products"; private static final String API_DELETE = "/:productId"; private static final String API_DELETE_ALL = "/all"; private final ProductService service; public RestProductAPIVerticle(ProductService service) { this.service = service; } @Override public void start(Futurefuture) throws Exception { super.start(); final Router router = Router.router(vertx); // body handler router.route().handler(BodyHandler.create()); // API route handler router.post(API_ADD).handler(this::apiAdd); router.get(API_RETRIEVE).handler(this::apiRetrieve); router.get(API_RETRIEVE_BY_PAGE).handler(this::apiRetrieveByPage); router.get(API_RETRIEVE_PRICE).handler(this::apiRetrievePrice); router.get(API_RETRIEVE_ALL).handler(this::apiRetrieveAll); router.patch(API_UPDATE).handler(this::apiUpdate); router.delete(API_DELETE).handler(this::apiDelete); router.delete(API_DELETE_ALL).handler(context -> requireLogin(context, this::apiDeleteAll)); enableHeartbeatCheck(router, config()); // get HTTP host and port from configuration, or use default value String host = config().getString("product.http.address", "0.0.0.0"); int port = config().getInteger("product.http.port", 8082); // create HTTP server and publish REST service createHttpServer(router, host, port) .compose(serverCreated -> publishHttpEndpoint(SERVICE_NAME, host, port)) .setHandler(future.completer()); } private void apiAdd(RoutingContext context) { try { Product product = new Product(new JsonObject(context.getBodyAsString())); service.addProduct(product, resultHandler(context, r -> { String result = new JsonObject().put("message", "product_added") .put("productId", product.getProductId()) .encodePrettily(); context.response().setStatusCode(201) .putHeader("content-type", "application/json") .end(result); })); } catch (DecodeException e) { badRequest(context, e); } } private void apiRetrieve(RoutingContext context) { String productId = context.request().getParam("productId"); service.retrieveProduct(productId, resultHandlerNonEmpty(context)); } private void apiRetrievePrice(RoutingContext context) { String productId = context.request().getParam("productId"); service.retrieveProductPrice(productId, resultHandlerNonEmpty(context)); } private void apiRetrieveByPage(RoutingContext context) { try { String p = context.request().getParam("p"); int page = p == null ? 1 : Integer.parseInt(p); service.retrieveProductsByPage(page, resultHandler(context, Json::encodePrettily)); } catch (Exception ex) { badRequest(context, ex); } } private void apiRetrieveAll(RoutingContext context) { service.retrieveAllProducts(resultHandler(context, Json::encodePrettily)); } private void apiDelete(RoutingContext context) { String productId = context.request().getParam("productId"); service.deleteProduct(productId, deleteResultHandler(context)); } private void apiDeleteAll(RoutingContext context, JsonObject principle) { service.deleteAllProducts(deleteResultHandler(context)); } }
此Verticle繼承了RestAPIVerticle,因此我們可以利用其中諸多的輔助方法。首先來看一下啟動(dòng)過程,即start方法。首先我們先調(diào)用super.start()來初始化服務(wù)發(fā)現(xiàn)組件,然后創(chuàng)建Router,綁定BodyHandler以便操作請(qǐng)求正文,然后創(chuàng)建各個(gè)API路由并綁定相應(yīng)的處理函數(shù)。接著我們調(diào)用enableHeartbeatCheck方法開啟簡(jiǎn)單的心跳檢測(cè)支持。最后我們通過封裝好的createHttpServer創(chuàng)建HTTP服務(wù)端,并通過publishHttpEndpoint方法將HTTP端點(diǎn)發(fā)布至服務(wù)發(fā)現(xiàn)模塊。
其中createHttpServer方法非常簡(jiǎn)單,我們只是把vertx.createHttpServer方法變成了基于Future的:
protected FuturecreateHttpServer(Router router, String host, int port) { Future httpServerFuture = Future.future(); vertx.createHttpServer() .requestHandler(router::accept) .listen(port, host, httpServerFuture.completer()); return httpServerFuture.map(r -> null); }
至于各個(gè)路由處理邏輯如何實(shí)現(xiàn),可以參考 Vert.x Blueprint - Todo Backend Tutorial 獲取相信信息。
最后我們打開此微服務(wù)模塊中的Main Verticle - ProductVerticle類。正如我們之前所提到的,它負(fù)責(zé)發(fā)布服務(wù)以及部署REST Verticle。我們來看一下其start方法:
@Override public void start(Futurefuture) throws Exception { super.start(); // create the service instance ProductService productService = new ProductServiceImpl(vertx, config()); // (1) // register the service proxy on event bus ProxyHelper.registerService(ProductService.class, vertx, productService, SERVICE_ADDRESS); // (2) // publish the service in the discovery infrastructure initProductDatabase(productService) // (3) .compose(databaseOkay -> publishEventBusService(ProductService.SERVICE_NAME, SERVICE_ADDRESS, ProductService.class)) // (4) .compose(servicePublished -> deployRestService(productService)) // (5) .setHandler(future.completer()); // (6) }
首先我們創(chuàng)建一個(gè)ProductService服務(wù)實(shí)例(1),然后通過registerService方法將服務(wù)注冊(cè)至Event Bus(2)。接著我們初始化數(shù)據(jù)庫(kù)表(3),將商品服務(wù)發(fā)布至服務(wù)發(fā)現(xiàn)層(4)然后部署REST Verticle(5)。這是一系列的異步方法的組合操作,很溜吧!最后我們將future.completer()綁定至組合后的Future上,這樣當(dāng)所有異步操作都OK的時(shí)候,Future會(huì)自動(dòng)完成。
當(dāng)然,不要忘記在配置里指定api.name。之前我們?cè)?API Gateway章節(jié) 提到過,API Gateway的反向代理部分就是通過對(duì)應(yīng)REST服務(wù)的 api.name 來進(jìn)行請(qǐng)求分發(fā)的。默認(rèn)情況下api.name為product:
{ "api.name": "product" }基于MongoDB的網(wǎng)店服務(wù)
網(wǎng)店服務(wù)用于網(wǎng)店的操作,如開店、關(guān)閉、更新數(shù)據(jù)。正常情況下,開店都需要人工申請(qǐng),不過在本藍(lán)圖教程中,我們把這一步簡(jiǎn)化掉了。網(wǎng)店服務(wù)模塊的結(jié)構(gòu)和商品服務(wù)模塊非常相似,所以我們就不細(xì)說了。我們這里僅僅瞅一瞅如何使用Vert.x Mongo Client。
使用Vert.x Mongo Client非常簡(jiǎn)單,首先我們需要?jiǎng)?chuàng)建一個(gè)MongoClient實(shí)例,過程類似于JDBCClient:
private final MongoClient client; public StoreCRUDServiceImpl(Vertx vertx, JsonObject config) { this.client = MongoClient.createNonShared(vertx, config); }
然后我們就可以通過它來操作Mongo了。比如我們想執(zhí)行存儲(chǔ)(save)操作,我們可以這樣寫:
@Override public void saveStore(Store store, Handler> resultHandler) { client.save(COLLECTION, new JsonObject().put("_id", store.getSellerId()) .put("name", store.getName()) .put("description", store.getDescription()) .put("openTime", store.getOpenTime()), ar -> { if (ar.succeeded()) { resultHandler.handle(Future.succeededFuture()); } else { resultHandler.handle(Future.failedFuture(ar.cause())); } } ); }
這些操作都是異步的,因此你一定非常熟悉這種模式!當(dāng)然如果不喜歡基于回調(diào)的異步模式的話,你也可以選擇Rx版本的API~
更多關(guān)于Vert.x Mongo Client的使用細(xì)節(jié),請(qǐng)參考官方文檔。
基于Redis的商品庫(kù)存服務(wù)商品庫(kù)存服務(wù)負(fù)責(zé)操作商品的庫(kù)存數(shù)量,比如添加庫(kù)存、減少庫(kù)存以及獲取當(dāng)前庫(kù)存數(shù)量。庫(kù)存使用Redis來存儲(chǔ)。
與之前的Event Bus服務(wù)不同,我們這里的商品庫(kù)存服務(wù)是基于Future的,而不是基于回調(diào)的。由于服務(wù)代理模塊不支持處理基于Future的服務(wù)接口,因此這里我們就不用異步RPC了,只發(fā)布一個(gè)REST API端點(diǎn),所有的調(diào)用都通過REST進(jìn)行。
首先來看一下InventoryService服務(wù)接口:
public interface InventoryService { /** * Create a new inventory service instance. * * @param vertx Vertx instance * @param config configuration object * @return a new inventory service instance */ static InventoryService createService(Vertx vertx, JsonObject config) { return new InventoryServiceImpl(vertx, config); } /** * Increase the inventory amount of a certain product. * * @param productId the id of the product * @param increase increase amount * @return the asynchronous result of current amount */ Futureincrease(String productId, int increase); /** * Decrease the inventory amount of a certain product. * * @param productId the id of the product * @param decrease decrease amount * @return the asynchronous result of current amount */ Future decrease(String productId, int decrease); /** * Retrieve the inventory amount of a certain product. * * @param productId the id of the product * @return the asynchronous result of current amount */ Future retrieveInventoryForProduct(String productId); }
接口定義非常簡(jiǎn)單,含義都在注釋中給出了。接著我們?cè)倏匆幌路?wù)的實(shí)現(xiàn)類InventoryServiceImpl類。在Redis中,所有的庫(kù)存數(shù)量都被存儲(chǔ)在inventory:v1命名空間中,并以商品號(hào)productId作為標(biāo)識(shí)。比如商品A123456會(huì)被存儲(chǔ)至inventory:v1:A123456鍵值對(duì)中。
Vert.x Redis提供了incrby和decrby命令,可以很方便地實(shí)現(xiàn)庫(kù)存增加和減少功能,代碼類似。這里我們只看庫(kù)存增加功能:
@Override public Futureincrease(String productId, int increase) { Future future = Future.future(); client.incrby(PREFIX + productId, increase, future.completer()); return future.map(Long::intValue); }
由于庫(kù)存數(shù)量不會(huì)非常大,Integer就足夠了,因此我們需要通過Long::intValue方法引用來將Long結(jié)果變換成Integer類型的。
retrieveInventoryForProduct方法的實(shí)現(xiàn)也非常短小精悍:
@Override public FutureretrieveInventoryForProduct(String productId) { Future future = Future.future(); client.get(PREFIX + productId, future.completer()); return future.map(r -> r == null ? "0" : r) .map(Integer::valueOf); }
我們通過get命令來獲取值。由于結(jié)果是String類型的,因此我們需要自行將其轉(zhuǎn)換為Integer類型。如果結(jié)果為空,我們就認(rèn)為商品沒有庫(kù)存,返回0。
至于REST Verticle(在此模塊中也為Main Verticle),其實(shí)現(xiàn)模式與前面的大同小異,這里就不展開說了。不要忘記在config.json中指定api.name:
{ "api.name": "inventory", "redis.host": "redis", "inventory.http.address": "inventory-microservice", "inventory.http.port": 8086 }事件溯源 - 購(gòu)物車服務(wù)
好了,現(xiàn)在我們與基礎(chǔ)服務(wù)模塊告一段落了。下面我們來到了另一個(gè)重要的服務(wù)模塊 —— 購(gòu)物車微服務(wù)。此模塊負(fù)責(zé)購(gòu)物車的獲取、購(gòu)物車事件的添加以及結(jié)算功能。與傳統(tǒng)的實(shí)現(xiàn)不同,這里我們要介紹一種不同的開發(fā)模式 —— 事件溯源(Event Sourcing)。
解道Event Sourcing在傳統(tǒng)的數(shù)據(jù)存儲(chǔ)模式中,我們通常直接將數(shù)據(jù)本身的狀態(tài)存儲(chǔ)至數(shù)據(jù)庫(kù)中。這在一般場(chǎng)景中是沒有問題的,但有些時(shí)候,我們不僅想獲取到數(shù)據(jù),還想獲取數(shù)據(jù)操作的過程(即此數(shù)據(jù)是經(jīng)過怎樣的操作生成的),這時(shí)候我們就可以利用事件溯源(Event Sourcing)來解決這個(gè)問題。
事件溯源保證了數(shù)據(jù)狀態(tài)的變換都以一系列的事件的形式存儲(chǔ)在數(shù)據(jù)庫(kù)中。所以,我們不僅可以獲取每個(gè)變換的事件,而且可以通過過去的事件來組合出過去任意時(shí)刻的數(shù)據(jù)狀態(tài)!這真是極好的~注意,有一點(diǎn)很重要,我們不能更改已經(jīng)保存的事件以及它們的序列 —— 也就是說,事件存儲(chǔ)是只能添加而不能刪除的,并且需要不可變。是不是感覺和數(shù)據(jù)庫(kù)事務(wù)日志的原理差不多呢?
在微服務(wù)架構(gòu)中,事件溯源模式可以帶來以下的好處:
我們可以從過去的事件序列中組建出任意時(shí)刻的數(shù)據(jù)狀態(tài)
每個(gè)過去的事件都得以保存,因此這使得補(bǔ)償事務(wù)成為可能
我們可以從事件存儲(chǔ)中獲取事件流,并且以異步、響應(yīng)式風(fēng)格對(duì)其進(jìn)行變換和處理
事件存儲(chǔ)同樣可以當(dāng)作為數(shù)據(jù)日志
事件存儲(chǔ)的選擇也需要好好考慮。Apache Kafka非常適合這種場(chǎng)景,在此版本的Micro Shop微服務(wù)中,為了簡(jiǎn)化其實(shí)現(xiàn),我們簡(jiǎn)單地使用了MySQL作為事件存儲(chǔ)。下個(gè)版本我們將把Kafka整合進(jìn)來。
購(gòu)物車事件注:在實(shí)際生產(chǎn)環(huán)境中,購(gòu)物車通常被存儲(chǔ)于Session或緩存內(nèi)。本章節(jié)僅為介紹事件溯源而使用事件存儲(chǔ)模式。
我們來看一下代表購(gòu)物車事件的CartEvent數(shù)據(jù)對(duì)象:
@DataObject(generateConverter = true) public class CartEvent { private Long id; private CartEventType cartEventType; private String userId; private String productId; private Integer amount; private long createdAt; public CartEvent() { this.createdAt = System.currentTimeMillis(); } public CartEvent(JsonObject json) { CartEventConverter.fromJson(json, this); } public CartEvent(CartEventType cartEventType, String userId, String productId, Integer amount) { this.cartEventType = cartEventType; this.userId = userId; this.productId = productId; this.amount = amount; this.createdAt = System.currentTimeMillis(); } public static CartEvent createCheckoutEvent(String userId) { return new CartEvent(CartEventType.CHECKOUT, userId, "all", 0); } public static CartEvent createClearEvent(String userId) { return new CartEvent(CartEventType.CLEAR_CART, userId, "all", 0); } public JsonObject toJson() { JsonObject json = new JsonObject(); CartEventConverter.toJson(this, json); return json; } public static boolean isTerminal(CartEventType eventType) { return eventType == CartEventType.CLEAR_CART || eventType == CartEventType.CHECKOUT; } }
一個(gè)購(gòu)物車事件存儲(chǔ)著事件的類型、發(fā)生的時(shí)間、操作用戶、對(duì)應(yīng)的商品ID以及商品數(shù)量變動(dòng)。在我們的藍(lán)圖應(yīng)用中,購(gòu)物車事件一共有四種,它們用CartEventType枚舉類表示:
public enum CartEventType { ADD_ITEM, // 添加商品至購(gòu)物車 REMOVE_ITEM, // 從購(gòu)物車中刪除商品 CHECKOUT, // 結(jié)算并清空 CLEAR_CART // 清空 }
其中CHECKOUT和CLEAR_CART事件是對(duì)整個(gè)購(gòu)物車實(shí)體進(jìn)行操作,對(duì)應(yīng)的購(gòu)物車事件參數(shù)類似,因此我們寫了兩個(gè)靜態(tài)方法來創(chuàng)建這兩種事件。
另外我們還注意到一個(gè)靜態(tài)方法isTerminal,它用于檢測(cè)當(dāng)前購(gòu)物車事件是否為一個(gè)“終結(jié)”事件。所謂的“終結(jié)”,指的是到此就對(duì)整個(gè)購(gòu)物車進(jìn)行操作(結(jié)算或清空)。在從購(gòu)物車事件流構(gòu)建出對(duì)應(yīng)的購(gòu)物車狀態(tài)的時(shí)候,此方法非常有用。
購(gòu)物車實(shí)體看完了購(gòu)物車事件,我們?cè)賮砜匆幌沦?gòu)物車。購(gòu)物車實(shí)體用ShoppingCart數(shù)據(jù)對(duì)象表示,它包含著一個(gè)商品列表表示當(dāng)前購(gòu)物車中的商品即數(shù)量:
private ListproductItems = new ArrayList<>();
其中ProductTuple數(shù)據(jù)對(duì)象包含著商品號(hào)、商品賣家ID、單價(jià)以及當(dāng)前購(gòu)物車中次商品的數(shù)目amount。
為了方便,我們還在ShoppingCart類中放了一個(gè)amountMap用于暫時(shí)存儲(chǔ)商品數(shù)量:
private MapamountMap = new HashMap<>();
由于它只是暫時(shí)存儲(chǔ),我們不希望在對(duì)應(yīng)的JSON數(shù)據(jù)中看到它,所以把它的getter和setter方法都注解上@GenIgnore。
在事件溯源模式中,我們要從一系列的購(gòu)物車事件構(gòu)建對(duì)應(yīng)的購(gòu)物車狀態(tài),因此我們需要一個(gè)incorporate方法將每個(gè)購(gòu)物車事件“合并”至購(gòu)物車內(nèi)以變更對(duì)應(yīng)的商品數(shù)目:
public ShoppingCart incorporate(CartEvent cartEvent) { // 此事件必須為添加或刪除事件 boolean ifValid = Stream.of(CartEventType.ADD_ITEM, CartEventType.REMOVE_ITEM) .anyMatch(cartEventType -> cartEvent.getCartEventType().equals(cartEventType)); if (ifValid) { amountMap.put(cartEvent.getProductId(), amountMap.getOrDefault(cartEvent.getProductId(), 0) + (cartEvent.getAmount() * (cartEvent.getCartEventType() .equals(CartEventType.ADD_ITEM) ? 1 : -1))); } return this; }
實(shí)現(xiàn)倒是比較簡(jiǎn)單,我們首先來檢查要合并的事件是不是添加商品或移除商品事件,如果是的話,我們就根據(jù)事件類型以及對(duì)應(yīng)的數(shù)量變更來改變當(dāng)前購(gòu)物車中該商品的數(shù)量(amountMap)。
使用Rx版本的Vert.x JDBC我們現(xiàn)在已經(jīng)了解購(gòu)物車微服務(wù)中的實(shí)體類了,下面該看看購(gòu)物車事件存儲(chǔ)服務(wù)了。
之前用callback-based API寫Vert.x JDBC操作總感覺心累,還好Vert.x支持與RxJava進(jìn)行整合,并且?guī)缀趺總€(gè)Vert.x組件都有對(duì)應(yīng)的Rx版本!是不是瞬間感覺整個(gè)人都變得Reactive了呢~(⊙o⊙) 這里我們就來使用Rx版本的Vert.x JDBC來寫我們的購(gòu)物車事件存儲(chǔ)服務(wù)。也就是說,里面所有的異步方法都將是基于Observable的,很有FRP風(fēng)格!
我們首先定義了一個(gè)簡(jiǎn)單的CRUD接口SimpleCrudDataSource:
public interface SimpleCrudDataSource{ Observable save(T entity); Observable retrieveOne(ID id); Observable delete(ID id); }
接著定義了一個(gè)CartEventDataSource接口,定義了購(gòu)物車事件獲取的相關(guān)方法:
public interface CartEventDataSource extends SimpleCrudDataSource{ Observable streamByUser(String userId); }
可以看到這個(gè)接口只有一個(gè)方法 —— streamByUser方法會(huì)返回某一用戶對(duì)應(yīng)的購(gòu)物車事件流,這樣后面我們就可以對(duì)其進(jìn)行流式變換操作了!
下面我們來看一下服務(wù)的實(shí)現(xiàn)類CartEventDataSourceImpl。首先是save方法,它將一個(gè)事件存儲(chǔ)至事件數(shù)據(jù)庫(kù)中:
@Override public Observablesave(CartEvent cartEvent) { JsonArray params = new JsonArray().add(cartEvent.getCartEventType().name()) .add(cartEvent.getUserId()) .add(cartEvent.getProductId()) .add(cartEvent.getAmount()) .add(cartEvent.getCreatedAt() > 0 ? cartEvent.getCreatedAt() : System.currentTimeMillis()); return client.getConnectionObservable() .flatMap(conn -> conn.updateWithParamsObservable(SAVE_STATEMENT, params)) .map(r -> null); }
看看我們的代碼,在對(duì)比對(duì)比普通的callback-based的Vert.x JDBC,是不是更加簡(jiǎn)潔,更加Reactive呢?我們可以非常簡(jiǎn)單地通過getConnectionObservable方法獲取數(shù)據(jù)庫(kù)連接,然后組合updateWithParamsObservable方法執(zhí)行對(duì)應(yīng)的含參SQL語(yǔ)句。只需要兩行有木有!而如果用callback-based的風(fēng)格的話,你只能這么寫:
client.getConnection(ar -> { if (ar.succeeded) { SQLConnection connection = ar.result(); connection.updateWithParams(SAVE_STATEMENT, params, ar2 -> { // ... }) } else { resultHandler.handle(Future.failedFuture(ar.cause())); } })
因此,使用RxJava是非常愉快的一件事!當(dāng)然vertx-sync也是一個(gè)不錯(cuò)的選擇。
當(dāng)然,不要忘記返回的Observable是 cold 的,因此只有在它被subscribe的時(shí)候,數(shù)據(jù)才會(huì)被發(fā)射。
不過話說回來了,Vert.x JDBC底層本質(zhì)還是阻塞型的調(diào)用,要實(shí)現(xiàn)真正的異步數(shù)據(jù)庫(kù)操作,我們可以利用 Vert.x MySQL / PostgreSQL Client 這個(gè)組件,底層使用Scala寫的異步數(shù)據(jù)庫(kù)操作庫(kù),不過目前還不是很穩(wěn)定,大家可以自己嘗嘗鮮。
下面我們?cè)賮砜匆幌?b>retrieveOne方法,它從數(shù)據(jù)存儲(chǔ)中獲取特定ID的事件:
@Override public ObservableretrieveOne(Long id) { return client.getConnectionObservable() .flatMap(conn -> conn.queryWithParamsObservable(RETRIEVE_STATEMENT, new JsonArray().add(id))) .map(ResultSet::getRows) .filter(list -> !list.isEmpty()) .map(res -> res.get(0)) .map(this::wrapCartEvent); }
非常簡(jiǎn)潔明了,就像之前我們的基于Future的范式相似,因此這里就不再詳細(xì)解釋了~
下面我們來看一下里面最重要的方法 —— streamByUser方法:
@Override public ObservablestreamByUser(String userId) { JsonArray params = new JsonArray().add(userId).add(userId); return client.getConnectionObservable() .flatMap(conn -> conn.queryWithParamsObservable(STREAM_STATEMENT, params)) .map(ResultSet::getRows) .flatMapIterable(item -> item) // list merge into observable .map(this::wrapCartEvent); }
其核心在于它的SQL語(yǔ)句STREAM_STATEMENT:
SELECT * FROM cart_event c WHERE c.user_id = ? AND c.created_at > coalesce( (SELECT created_at FROM cart_event WHERE user_id = ? AND (`type` = "CHECKOUT" OR `type` = "CLEAR_CART") ORDER BY cart_event.created_at DESC LIMIT 1), 0) ORDER BY c.created_at ASC;
此SQL語(yǔ)句執(zhí)行時(shí)會(huì)獲取與當(dāng)前購(gòu)物車相關(guān)的所有購(gòu)物車事件。注意到我們有許多用戶,每個(gè)用戶可能會(huì)有許多購(gòu)物車事件,它們屬于不同時(shí)間的購(gòu)物車,那么如何來獲取相關(guān)的事件呢?方法是 —— 首先我們獲取最近一次“終結(jié)”事件發(fā)生對(duì)應(yīng)的時(shí)間,那么當(dāng)前購(gòu)物車相關(guān)的購(gòu)物車事件就是在此終結(jié)事件發(fā)生后所有的購(gòu)物車事件。
明白了這一點(diǎn),我們?cè)倩氐?b>streamByUser方法中來。既然此方法是從數(shù)據(jù)庫(kù)中獲取一個(gè)事件列表,那么為什么此方法返回Observable>
呢?我們來看看其中的奧秘 —— flatMapIterable算子,它將一個(gè)序列變換為一串?dāng)?shù)據(jù)流。所以,這里的Observable
哇!現(xiàn)在你一定又被Rx這種函數(shù)響應(yīng)式風(fēng)格所吸引了~在下面的部分中,我們將探索購(gòu)物車服務(wù)及其實(shí)現(xiàn),基于Future,同樣非常Reactive!
根據(jù)購(gòu)物車事件序列構(gòu)建對(duì)應(yīng)的購(gòu)物車狀態(tài)我們首先來看一下ShoppingCartService —— 購(gòu)物車服務(wù)接口,它也是一個(gè)Event Bus服務(wù):
@VertxGen @ProxyGen public interface ShoppingCartService { /** * The name of the event bus service. */ String SERVICE_NAME = "shopping-cart-eb-service"; /** * The address on which the service is published. */ String SERVICE_ADDRESS = "service.shopping.cart"; @Fluent ShoppingCartService addCartEvent(CartEvent event, Handler> resultHandler); @Fluent ShoppingCartService getShoppingCart(String userId, Handler > resultHandler); }
這里我們定義了兩個(gè)方法:addCartEvent用于將購(gòu)物車事件存儲(chǔ)至事件存儲(chǔ)中;getShoppingCart方法用于獲取某個(gè)用戶當(dāng)前購(gòu)物車的狀態(tài)。
下面我們來看一下其實(shí)現(xiàn)類 —— ShoppingCartServiceImpl。首先是addCartEvent方法,它非常簡(jiǎn)單:
@Override public ShoppingCartService addCartEvent(CartEvent event, Handler> resultHandler) { Future future = Future.future(); repository.save(event).toSingle().subscribe(future::complete, future::fail); future.setHandler(resultHand return this; }
正如之前我們所提到的,這里save方法返回的Observable其實(shí)更像個(gè)Single,因此我們將其通過toSingle方法變換為Single,然后通過subscribe(future::complete, future::fail)將其轉(zhuǎn)化為Future以便于給其綁定一個(gè)Handler
而getShoppingCart方法的邏輯位于aggregateCartEvents方法中,此方法非常重要,并且是基于Future的。我們先來看一下代碼:
private FutureaggregateCartEvents(String userId) { Future future = Future.future(); // aggregate cart events into raw shopping cart repository.streamByUser(userId) // (1) .takeWhile(cartEvent -> !CartEvent.isTerminal(cartEvent.getCartEventType())) // (2) .reduce(new ShoppingCart(), ShoppingCart::incorporate) // (3) .toSingle() .subscribe(future::complete, future::fail); // (4) return future.compose(cart -> getProductService() // (5) .compose(service -> prepareProduct(service, cart)) // (6) prepare product data .compose(productList -> generateCurrentCartFromStream(cart, productList)) // (7) prepare product items ); }
我們來詳細(xì)地解釋一下。首先我們先創(chuàng)建個(gè)Future,然后先通過repository.streamByUser(userId)方法獲取事件流(1),然后我們使用takeWhile算子來獲取所有的ADD_ITEM和REMOVE_ITEM類型的事件(2)。takeWhile算子在判定條件變?yōu)榧贂r(shí)停止發(fā)射新的數(shù)據(jù),因此當(dāng)事件流遇到一個(gè)終結(jié)事件時(shí),新的事件就不再往外發(fā)送了,之前的事件將會(huì)繼續(xù)被傳遞。
下面就是產(chǎn)生購(gòu)物車狀態(tài)的過程了!我們通過reduce算子將事件流來“聚合”成購(gòu)物車實(shí)體(3)。這個(gè)過程可以總結(jié)為以下幾步:首先我們先創(chuàng)建一個(gè)空的購(gòu)物車,然后依次將各個(gè)購(gòu)物車事件“合并”至購(gòu)物車實(shí)體中。最后聚合而成的購(gòu)物車實(shí)體應(yīng)該包含一個(gè)完整的amountMap。
現(xiàn)在此Observable已經(jīng)包含了我們想要的初始狀態(tài)的購(gòu)物車了。我們將其轉(zhuǎn)化為Single然后通過subscribe(future::complete, future::fail)轉(zhuǎn)化為Future(4)。
現(xiàn)在我們需要更多的信息以組件一個(gè)完整的購(gòu)物車,所以我們首先組合getProductService異步方法來從服務(wù)發(fā)現(xiàn)層獲取商品服務(wù)(5),然后通過prepareProduct方法來獲取需要的商品數(shù)據(jù)(6),最后通過generateCurrentCartFromStream異步方法組合出完整的購(gòu)物車實(shí)體(7)。這里面包含了好幾個(gè)組合過程,我們來一一解釋。
首先來看getProductService異步方法。它用于從服務(wù)發(fā)現(xiàn)層獲取商品服務(wù),然后返回其異步結(jié)果:
private FuturegetProductService() { Future future = Future.future(); EventBusService.getProxy(discovery, new JsonObject().put("name", ProductService.SERVICE_NAME), future.completer()); return future; }
現(xiàn)在我們獲取到商品服務(wù)了,那么下一步自然是獲取需要的商品數(shù)據(jù)了。這個(gè)過程通過prepareProduct異步方法實(shí)現(xiàn):
private Future> prepareProduct(ProductService service, ShoppingCart cart) { List
> futures = cart.getAmountMap().keySet() // (1) .stream() .map(productId -> { Future future = Future.future(); service.retrieveProduct(productId, future.completer()); return future; // (2) }) .collect(Collectors.toList()); // (3) return Functional.sequenceFuture(futures); // (4) }
在此實(shí)現(xiàn)中,首先我們從amountMap中獲取購(gòu)物車中所有商品的ID(1),然后我們根據(jù)每個(gè)ID異步調(diào)用商品服務(wù)的retrieveProduct方法并且以Future包裝(2),然后將此流轉(zhuǎn)化為List>
,那么如何轉(zhuǎn)換呢?這里我寫了一個(gè)輔助函數(shù)sequenceFuture來實(shí)現(xiàn)這樣的變換,它位于io.vertx.blueprint.microservice.common.functional包下的Functional類中:
public staticFuture > sequenceFuture(List
> futures) { return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()])) // (1) .map(v -> futures.stream() .map(Future::result) // (2) .collect(Collectors.toList()) // (3) ); }
此方法對(duì)于想將一個(gè)Future序列變換成單個(gè)Future的情況非常有用。這里我們首先調(diào)用CompositeFutureImpl類的all方法(1),它返回一個(gè)組合的Future,當(dāng)且僅當(dāng)序列中所有的Future都成功完成時(shí),它為成功狀態(tài),否則為失敗狀態(tài)。下面我們就對(duì)此組合Future做變換:獲取每個(gè)Future對(duì)應(yīng)的結(jié)果(因?yàn)?b>all方法已經(jīng)強(qiáng)制獲取所有結(jié)果),然后歸結(jié)成列表(3)。
回到之前的組合中來!現(xiàn)在我們得到了我們需要的商品信息列表List
private FuturegenerateCurrentCartFromStream(ShoppingCart rawCart, List productList) { Future future = Future.future(); // check if any of the product is invalid if (productList.stream().anyMatch(e -> e == null)) { // (1) future.fail("Error when retrieve products: empty"); return future; } // construct the product items List currentItems = rawCart.getAmountMap().entrySet() // (2) .stream() .map(item -> new ProductTuple(getProductFromStream(productList, item.getKey()), // (3) item.getValue())) // (4) amount value .filter(item -> item.getAmount() > 0) // (5) amount must be greater than zero .collect(Collectors.toList()); ShoppingCart cart = rawCart.setProductItems(currentItems); // (6) return Future.succeededFuture(cart); // (7) }
看起來非常混亂的樣子。。。不要擔(dān)心,我們慢慢來~注意這個(gè)方法本身不是異步的,但我們需要表示此方法成功或失敗兩種狀態(tài)(即AsyncResult),所以此方法仍然返回Future。首先我們創(chuàng)建一個(gè)Future,然后通過anyMatch方法檢查商品列表是否合法(1)。若不合法,返回一個(gè)失敗的Future;若合法,我們對(duì)每個(gè)商品依次構(gòu)建出對(duì)應(yīng)的ProductTuple。在(3)中,我們通過這個(gè)構(gòu)造函數(shù)來構(gòu)建ProductTuple:
public ProductTuple(Product product, Integer amount) { this.productId = product.getProductId(); this.sellerId = product.getSellerId(); this.price = product.getPrice(); this.amount = amount; }
其中第一個(gè)參數(shù)是對(duì)應(yīng)的商品實(shí)體。為了從列表中獲取對(duì)應(yīng)的商品實(shí)體,我們寫了一個(gè)getProductFromStream方法:
private Product getProductFromStream(ListproductList, String productId) { return productList.stream() .filter(product -> product.getProductId().equals(productId)) .findFirst() .get(); }
當(dāng)每個(gè)商品的ProductTuple都構(gòu)建完畢的時(shí)候,我們就可以將列表賦值給對(duì)應(yīng)的購(gòu)物車實(shí)體了(6),并且返回購(gòu)物車實(shí)體結(jié)果(7)。現(xiàn)在我們終于整合出一個(gè)完整的購(gòu)物車了!
結(jié)算 - 根據(jù)購(gòu)物車產(chǎn)生訂單現(xiàn)在我們已經(jīng)選好了自己喜愛的商品,把購(gòu)物車填的慢慢當(dāng)當(dāng)了,下面是時(shí)候進(jìn)行結(jié)算了!我們這里同樣定義了一個(gè)結(jié)算服務(wù)接口CheckoutService,它只包含一個(gè)特定的方法:checkout:
@VertxGen @ProxyGen public interface CheckoutService { /** * The name of the event bus service. */ String SERVICE_NAME = "shopping-checkout-eb-service"; /** * The address on which the service is published. */ String SERVICE_ADDRESS = "service.shopping.cart.checkout"; /** * Order event source address. */ String ORDER_EVENT_ADDRESS = "events.service.shopping.to.order"; /** * Create a shopping checkout service instance */ static CheckoutService createService(Vertx vertx, ServiceDiscovery discovery) { return new CheckoutServiceImpl(vertx, discovery); } void checkout(String userId, Handler> handler); }
接口非常簡(jiǎn)單,下面我們來看其實(shí)現(xiàn) —— CheckoutServiceImpl類。盡管接口只包含一個(gè)checkout方法,但我們都知道結(jié)算過程可不簡(jiǎn)單。。。它包含庫(kù)存檢測(cè)、付款(這里暫時(shí)省掉了)以及生成訂單的邏輯。我們先來看看checkout方法的源碼:
@Override public void checkout(String userId, Handler> resultHandler) { if (userId == null) { // (1) resultHandler.handle(Future.failedFuture(new IllegalStateException("Invalid user"))); return; } Future cartFuture = getCurrentCart(userId); // (2) Future orderFuture = cartFuture.compose(cart -> checkAvailableInventory(cart).compose(checkResult -> { // (3) if (checkResult.getBoolean("res")) { // (3) double totalPrice = calculateTotalPrice(cart); // (4) // 創(chuàng)建訂單實(shí)體 Order order = new Order().setBuyerId(userId) // (5) .setPayId("TEST") .setProducts(cart.getProductItems()) .setTotalPrice(totalPrice); // 設(shè)置訂單流水號(hào),然后向訂單組件發(fā)送訂單并等待回應(yīng) return retrieveCounter("order") // (6) .compose(id -> sendOrderAwaitResult(order.setOrderId(id))) // (7) .compose(result -
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/65105.html
摘要:本文章是藍(lán)圖系列的第二篇教程。這就是請(qǐng)求回應(yīng)模式。好多屬性我們一個(gè)一個(gè)地解釋一個(gè)序列,作為的地址任務(wù)的編號(hào)任務(wù)的類型任務(wù)攜帶的數(shù)據(jù),以類型表示任務(wù)優(yōu)先級(jí),以枚舉類型表示。默認(rèn)優(yōu)先級(jí)為正常任務(wù)的延遲時(shí)間,默認(rèn)是任務(wù)狀態(tài),以枚舉類型表示。 本文章是 Vert.x 藍(lán)圖系列 的第二篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待辦事項(xiàng)服務(wù)開發(fā)教程 Vert.x B...
摘要:本文章是藍(lán)圖系列的第一篇教程。是事件驅(qū)動(dòng)的,同時(shí)也是非阻塞的。是一組負(fù)責(zé)分發(fā)和處理事件的線程。注意,我們絕對(duì)不能去阻塞線程,否則事件的處理過程會(huì)被阻塞,我們的應(yīng)用就失去了響應(yīng)能力。每個(gè)負(fù)責(zé)處理請(qǐng)求并且寫入回應(yīng)結(jié)果。 本文章是 Vert.x 藍(lán)圖系列 的第一篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待辦事項(xiàng)服務(wù)開發(fā)教程 Vert.x Blueprint 系...
摘要:上部分藍(lán)圖教程中我們一起探索了如何用開發(fā)一個(gè)基于消息的應(yīng)用。對(duì)部分來說,如果看過我們之前的藍(lán)圖待辦事項(xiàng)服務(wù)開發(fā)教程的話,你應(yīng)該對(duì)這一部分非常熟悉了,因此這里我們就不詳細(xì)解釋了。有關(guān)使用實(shí)現(xiàn)的教程可參考藍(lán)圖待辦事項(xiàng)服務(wù)開發(fā)教程。 上部分藍(lán)圖教程中我們一起探索了如何用Vert.x開發(fā)一個(gè)基于消息的應(yīng)用。在這部分教程中,我們將粗略地探索一下kue-http模塊的實(shí)現(xiàn)。 Vert.x Kue ...
摘要:主要是避免引入太多的復(fù)雜性,并且出于靈活部署的需要。以應(yīng)用為例,由于實(shí)際上是在上執(zhí)行,若它被阻塞,即導(dǎo)致后續(xù)請(qǐng)求全部無(wú)法得到處理。因此,最合適的做法就是對(duì)于簡(jiǎn)單業(yè)務(wù),采用異步庫(kù)。本系列其他文章入坑須知入坑須知入坑須知 最開始覺得這個(gè)系列也就最多3篇了不起了(因?yàn)槭虏贿^三嘛),沒曾想居然迎來了第四篇! Kotlin 由于最近決定投身到區(qū)塊鏈的學(xué)習(xí)當(dāng)中的緣故,出于更好的理解它的基本概念,自...
摘要:二來,給大家新開坑的項(xiàng)目一個(gè)參考。因此,本系列以主要以官方文檔為基礎(chǔ),將盡可能多的特性融入本項(xiàng)目,并標(biāo)注官網(wǎng)原文出處,有興趣的小伙伴可點(diǎn)擊深入了解??梢酝ㄟ^一些特殊協(xié)議例如將消息作為統(tǒng)一消息服務(wù)導(dǎo)出。下載完成后自行修改和。 開坑前言 我給這個(gè)專欄的名氣取名叫做小項(xiàng)目,聽名字就知道,這個(gè)專題最終的目的是帶領(lǐng)大家完成一個(gè)項(xiàng)目。為什么要開這么大一個(gè)坑呢,一來,雖然網(wǎng)上講IT知識(shí)點(diǎn)的書籍鋪天蓋...
閱讀 1406·2021-11-08 13:14
閱讀 760·2021-09-23 11:31
閱讀 1051·2021-07-29 13:48
閱讀 2789·2019-08-29 12:29
閱讀 3384·2019-08-29 11:24
閱讀 1910·2019-08-26 12:02
閱讀 3703·2019-08-26 10:34
閱讀 3447·2019-08-23 17:07