摘要:它是事件驅(qū)動(dòng)的,我們不斷的發(fā)送消息接受消息處理消息。使用消息實(shí)現(xiàn)事件通信的概念被稱為消息驅(qū)動(dòng)架構(gòu),也被稱為消息驅(qū)動(dòng)架構(gòu)。許可證服務(wù)收到該消息后清除對(duì)應(yīng)的緩存。通過綁定器,使得開發(fā)人員不必依賴于特定平臺(tái)的庫(kù)和來發(fā)布和消費(fèi)消息。
springcloud 總集:https://www.tapme.top/blog/detail/2019-02-28-11-33
代碼見文章結(jié)尾
??想想平常生活中做飯的場(chǎng)景,在用電飯鍋?zhàn)鲲埖耐瑫r(shí),我們可以洗菜、切菜,等待電飯鍋發(fā)出飯做好的提示我們回去拔下電飯鍋電源(或者什么也不知讓它處于保溫狀態(tài)),反正這個(gè)時(shí)候我們知道飯做好了,接下來可以炒菜了。從這里可以看出我們?cè)谌粘I钪信c世界的互動(dòng)并不是同步的、線性的,不是簡(jiǎn)單的請(qǐng)求--響應(yīng)模型。它是事件驅(qū)動(dòng)的,我們不斷的發(fā)送消息、接受消息、處理消息。
??同樣在軟件世界中也不全是請(qǐng)求--響應(yīng)模型,也會(huì)需要進(jìn)行異步的消息通信。使用消息實(shí)現(xiàn)事件通信的概念被稱為消息驅(qū)動(dòng)架構(gòu)(Event Driven Architecture,EDA),也被稱為消息驅(qū)動(dòng)架構(gòu)(Message Driven Architecture,MDA)。使用這類架構(gòu)可以構(gòu)建高度解耦的系統(tǒng),該系統(tǒng)能夠?qū)ψ兓龀鲰憫?yīng),且不需要與特定的庫(kù)或者服務(wù)緊密耦合。
??在 Spring Cloud 項(xiàng)目中可以使用Spirng Cloud Stream輕而易舉的構(gòu)建基于消息傳遞的解決方案。
為什么使用消息傳遞??要解答這個(gè)問題,讓我們從一個(gè)例子開始,之前一直使用的兩個(gè)服務(wù):許可證服務(wù)和組織服務(wù)。每次對(duì)許可證服務(wù)進(jìn)行請(qǐng)求,許可證服務(wù)都要通過 http 請(qǐng)求到組織服務(wù)上查詢組織信息。顯而易見這次額外的 http 請(qǐng)求會(huì)花費(fèi)較長(zhǎng)的時(shí)間。如果能夠?qū)⒕彺娼M織數(shù)據(jù)的讀操作,將會(huì)大幅提高許可證服務(wù)的響應(yīng)時(shí)間。但是緩存數(shù)據(jù)有如下 2 個(gè)要求:
緩存的數(shù)據(jù)需要在許可證服務(wù)的所有實(shí)例之間保存一致——這意味著不能將數(shù)據(jù)緩存到服務(wù)實(shí)例的內(nèi)存中。
在更新或者刪除一個(gè)組織數(shù)據(jù)時(shí),許可證服務(wù)緩存的數(shù)據(jù)需要失效——避免讀取到過期數(shù)據(jù),需要盡早讓過時(shí)數(shù)據(jù)失效并刪除。
??要實(shí)現(xiàn)上面的要求,現(xiàn)在有兩種辦法。
使用同步請(qǐng)求--響應(yīng)模型來實(shí)現(xiàn)。組織服務(wù)在組織數(shù)據(jù)變化時(shí)調(diào)用許可證服務(wù)的接口通知組織服務(wù)已經(jīng)變化,或者直接操作許可證服務(wù)的緩存。
使用事件驅(qū)動(dòng)。組織服務(wù)發(fā)出一個(gè)異步消息。許可證服務(wù)收到該消息后清除對(duì)應(yīng)的緩存。
同步請(qǐng)求-響應(yīng)方式??許可證服務(wù)在 redis 中緩存從組織服務(wù)中查詢到的服務(wù)信息,當(dāng)組織數(shù)據(jù)更新時(shí),組織服務(wù)同步 http 請(qǐng)求通知許可證服務(wù)數(shù)據(jù)過期。這種方式有以下幾個(gè)問題:
組織服務(wù)和許可證服務(wù)緊密耦合
這種方式不夠靈活,如果要為組織服務(wù)添加新的消費(fèi)者,必須修改組織服務(wù)代碼,以讓其通知新的服務(wù)數(shù)據(jù)變動(dòng)。
使用消息傳遞方式??同樣的許可證服務(wù)在 redis 中緩存從組織服務(wù)中查詢到的服務(wù)信息,當(dāng)組織數(shù)據(jù)更新時(shí),組織服務(wù)將更新信息寫入到隊(duì)列中。許可證服務(wù)監(jiān)聽消息隊(duì)列。使用消息傳遞有一下 4 個(gè)好處:
松耦合性:將服務(wù)間的依賴,變成了服務(wù)對(duì)隊(duì)列的依賴,依賴關(guān)系變?nèi)趿恕?/p>
耐久性:即使服務(wù)消費(fèi)者已經(jīng)關(guān)閉了,也可以繼續(xù)往里發(fā)送消息,等消費(fèi)者開啟后處理
可伸縮性: 消息發(fā)送者不用等待消息消費(fèi)者的響應(yīng),它們可以繼續(xù)做各自的工作
靈活性:消息發(fā)送者不用知道誰會(huì)消費(fèi)這個(gè)消息,因此在有新的消息消費(fèi)者時(shí)無需修改消息發(fā)送代碼
spring cloud 中使用消息傳遞??spring cloud 項(xiàng)目中可以通過 spring cloud stream 框架來輕松集成消息傳遞。該框架最大的特點(diǎn)是抽象了消息傳遞平臺(tái)的細(xì)節(jié),因此可以在支持的消息隊(duì)列中隨意切換(包括 Apache Kafka 和 RabbitMQ)。
spring cloud stream 架構(gòu)??spring cloud stream 中有 4 個(gè)組件涉及到消息發(fā)布和消息消費(fèi),分別為:
發(fā)射器
??當(dāng)一個(gè)服務(wù)準(zhǔn)備發(fā)送消息時(shí),它將使用發(fā)射器發(fā)布消息。發(fā)射器是一個(gè) Spring 注解接口,它接收一個(gè)普通 Java 對(duì)象,表示要發(fā)布的消息。發(fā)射器接收消息,然后序列化(默認(rèn)序列化為 JSON)后發(fā)布到通道中。
通道
??通道是對(duì)隊(duì)列的一個(gè)抽象。通道名稱是與目標(biāo)隊(duì)列名稱相關(guān)聯(lián)的。但是隊(duì)列名稱并不會(huì)直接公開在代碼中,代碼永遠(yuǎn)只會(huì)使用通道名。
綁定器
??綁定器是 spring cloud stream 框架的一部分,它是與特定消息平臺(tái)對(duì)話的 Spring 代碼。通過綁定器,使得開發(fā)人員不必依賴于特定平臺(tái)的庫(kù)和 API 來發(fā)布和消費(fèi)消息。
接收器
??服務(wù)通過接收器來從隊(duì)列中接收消息,并將消息反序列化。
處理邏輯如下:
實(shí)戰(zhàn)??繼續(xù)使用之前的項(xiàng)目,在許可證服務(wù)中緩存組織數(shù)據(jù)到 redis 中。
建立 redis 服務(wù)??為方便起見,使用 docker 創(chuàng)建 redis,建立腳本如下:
docker run -itd --name redis --net host redis:建立 kafka 服務(wù) 在組織服務(wù)中編寫消息生產(chǎn)者
??首先在 organization 服務(wù)中引入 spring cloud stream 和 kafka 的依賴。
org.springframework.cloud spring-cloud-stream org.springframework.cloud spring-cloud-starter-stream-kafka
??然后在 events 類中編寫SimpleSouce類,用于組織數(shù)據(jù)修改,產(chǎn)生一條消息到隊(duì)列中。代碼如下:
@EnableBinding(Source.class) public class SimpleSource { private Logger logger = LoggerFactory.getLogger(SimpleSource.class); private Source source; @Autowired public SimpleSource(Source source) { this.source = source; } public void publishOrChange(String action, String orgId) { logger.info("在請(qǐng)求:{}中,發(fā)送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId); OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id); source.output().send(MessageBuilder.withPayload(change).build()); } }
這里使用的是默認(rèn)通道,Source 類定義的 output 通道發(fā)消息。后面通過 Sink 定義的 input 通道收消息。
??然后在OrganizationController類中定義一個(gè) delete 方法,并注入 SimpleSouce 類,代碼如下:
@Autowired private SimpleSource simpleSource; @DeleteMapping(value = "/organization/{orgId}") public void deleteOne(@PathVariable("orgId") String id) { logger.debug("刪除了組織:{}", id); simpleSource.publishOrChange("delete", id); }
??最后在配置文件中加入消息隊(duì)列的配置:
# 省略了其他配置 spring: cloud: stream: bindings: output: destination: orgChangeTopic content-type: application/json kafka: binder: # 替換為部署kafka的ip和端口 zk-nodes: 192.168.226.5:2181 brokers: 192.168.226.5:9092
??現(xiàn)在我們可以測(cè)試下訪問localhost:5555/apis/org/organization/12,可以看到控制臺(tái)打印消息生成的日志。
在許可證服務(wù)中編寫消息消費(fèi)者??集成 redis 的方法,參看[]()。這里不作說明。
??首先引入依賴,依賴項(xiàng)同上面組織服務(wù)。
??然后在 event 包下創(chuàng)建OrgChange的類,代碼如下:
@EnableBinding(Sink.class) //使用Sink接口中定義的通道來監(jiān)聽傳入消息 public class OrgChange { private Logger logger = LoggerFactory.getLogger(OrgChange.class); @StreamListener(Sink.INPUT) public void loggerSink(OrganizationChange change){ logger.info("收到一個(gè)消息,組織id為:{},關(guān)聯(lián)id為:{}",change.getOrgId(),change.getId()); //刪除失效緩存 RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId())); } } //下面兩個(gè)都在util包下 //RedisKeyUtils.java代碼如下 public class RedisKeyUtils { private static final String ORG_CACHE_PREFIX = "orgCache_"; public static String getOrgCacheKey(String orgId){ return ORG_CACHE_PREFIX+orgId; } } //RedisUtils.java代碼如下 @Component @SuppressWarnings("all") public class RedisUtils { public static RedisTemplate redisTemplate; @Autowired public void setRedisTemplate(RedisTemplate redisTemplate) { RedisUtils.redisTemplate = redisTemplate; } public static boolean setObj(String key,Object value){ return setObj(key,value,0); } /** * Description: * * @author fanxb * @date 2019/2/21 15:21 * @param key 鍵 * @param value 值 * @param time 過期時(shí)間,單位ms * @return boolean 是否成功 */ public static boolean setObj(String key,Object value,long time){ try{ if(time<=0){ redisTemplate.opsForValue().set(key,value); }else{ redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS); } return true; }catch (Exception e){ e.printStackTrace(); return false; } } public static Object get(String key){ if(key==null){ return null; } try{ Object obj = redisTemplate.opsForValue().get(key); return obj; }catch (Exception e){ e.printStackTrace(); return null; } } public static void del(String... key){ if(key!=null && key.length>0){ redisTemplate.delete(CollectionUtils.arrayToList(key)); } } }
??上面用到的是 Sink.INPUT 通道,這個(gè)和之前的 Source.OUTPUT 通道剛好一隊(duì),一個(gè)負(fù)責(zé)收,一個(gè)負(fù)責(zé)發(fā)。
??然后修改OrganizationByRibbonService.java文件中的getOrganizationWithRibbon方法:
public Organization getOrganizationWithRibbon(String id) { String key = RedisKeyUtils.getOrgCacheKey(id); //先從redis緩存取數(shù)據(jù) Object res = RedisUtils.get(key); if (res == null) { logger.info("當(dāng)前數(shù)據(jù)無緩存:{}", id); try{ ResponseEntityresponseEntity = restTemplate.exchange("http://organizationservice/organization/{id}", HttpMethod.GET, null, Organization.class, id); res = responseEntity.getBody(); RedisUtils.setObj(key, res); }catch (Exception e){ e.printStackTrace(); } } else { logger.info("當(dāng)前數(shù)據(jù)為緩存數(shù)據(jù):{}", id); } return (Organization) res; }
??最后修改配置文件,為 input 通道指定 topic,配置如下:
spring: cloud: stream: bindings: input: destination: orgChangeTopic content-type: application/json # 定義將要消費(fèi)消息的消費(fèi)者組的名稱 # 可能多個(gè)服務(wù)監(jiān)聽同一個(gè)消息隊(duì)列。如果定義了消費(fèi)者組,那么同組中只要有一個(gè)消費(fèi)了消息,剩余的不會(huì)再次消費(fèi)該消息,保證只有消息的 # 一個(gè)副本會(huì)被該組的某個(gè)實(shí)例所消費(fèi) group: licensingGroup kafka: binder: zk-nodes: 192.168.226.5:2181 brokers: 192.168.226.5:9092
基本和發(fā)送的配置相同,只是這里是為input通道映射隊(duì)列,然后還定義了一個(gè)組名,避免一個(gè)消息被重復(fù)消費(fèi)。
??現(xiàn)在來多次訪問localhost:5555/apis/licensingservice/licensingByRibbon/12,可以看到 licensingservice 控制臺(tái)打印數(shù)據(jù)從緩存中讀取,如下所示:
然后再以 delete 訪問localhost:5555/apis/org/organization/12清除緩存,再次訪問 licensingservice 服務(wù),結(jié)果如下:
自定義通道??上面用的是Spring Cloud Stream自帶的 input/output 通道,那么要如何自定義通道呢?下面以自定義customInput/customOutput通道為例。
自定義發(fā)數(shù)據(jù)通道public interface CustomOutput { @Output("customOutput") MessageChannel out(); }
??對(duì)于每個(gè)自定義的發(fā)數(shù)據(jù)通道,需使用@OutPut 注解標(biāo)記的返回 MessageChannel 類的方法。
自定義收數(shù)據(jù)通道public interface CustomInput { @Input("customInput") SubscribableChannel in(); }
??同上,對(duì)應(yīng)自定義的收數(shù)據(jù)通道,需要使用@Input 注解標(biāo)記的返回 SubscribableChannel 類的方法。
結(jié)束??看完本篇你應(yīng)該已經(jīng)能夠在 Spring Cloud 中集成 Spring Cloud Stream 消息隊(duì)列了,貌似這個(gè)也能用到普通的 spring boot 項(xiàng)目中,比直接集成 mq 更加的優(yōu)雅。
2019,Fighting!
本篇原創(chuàng)發(fā)布于:FleyX 的個(gè)人博客
本篇所用全部代碼:FleyX 的 github
掃碼關(guān)注微信公眾號(hào):FleyX 學(xué)習(xí)筆記,獲取更多干貨
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77850.html
摘要:授權(quán)框架使第三方應(yīng)用程序來獲取對(duì)服務(wù)的有限訪問機(jī)會(huì)。無論是通過編排資源所有者和服務(wù)之間的交互批準(zhǔn)的資源所有者,或通過允許第三方應(yīng)用程序來獲取自己的訪問權(quán)限。 SpringCloud打造微服務(wù)平臺(tái)--概覽 簡(jiǎn)述 SpringCloud是什么 Spring Boot和SpringCloud是什么關(guān)系 Spring Boot是Spring的一套快速WEB開發(fā)的腳手架,可建立獨(dú)立的Sprin...
摘要:它就是史上最簡(jiǎn)單的教程第三篇服務(wù)消費(fèi)者后端掘金上一篇文章,講述了通過去消費(fèi)服務(wù),這篇文章主要講述通過去消費(fèi)服務(wù)。概覽和架構(gòu)設(shè)計(jì)掘金技術(shù)征文后端掘金是基于的一整套實(shí)現(xiàn)微服務(wù)的框架。 Spring Boot 配置文件 – 在坑中實(shí)踐 - 后端 - 掘金作者:泥瓦匠鏈接:Spring Boot 配置文件 – 在坑中實(shí)踐版權(quán)歸作者所有,轉(zhuǎn)載請(qǐng)注明出處本文提綱一、自動(dòng)配置二、自定義屬性三、ran...
摘要:調(diào)用百度實(shí)現(xiàn)圖像識(shí)別使用渲染導(dǎo)出的制作的超級(jí)炫酷的三維模型一個(gè)代碼庫(kù)本人本人瀏覽器調(diào)試及所有錯(cuò)誤代碼整合千峰超級(jí)好用的各種開發(fā)自學(xué)文檔這是它對(duì)應(yīng)的學(xué)習(xí)視頻使用教程詳細(xì)虛擬機(jī)安裝系統(tǒng)詳解版網(wǎng)易開源鏡像站在線數(shù)據(jù)互轉(zhuǎn)使 1.Java調(diào)用百度API實(shí)現(xiàn)圖像識(shí)別 2.使用Three.js渲染Sketchup導(dǎo)出的dae 3.three.js制作的超級(jí)炫酷的三維模型 4.three.js - 一...
摘要:調(diào)用百度實(shí)現(xiàn)圖像識(shí)別使用渲染導(dǎo)出的制作的超級(jí)炫酷的三維模型一個(gè)代碼庫(kù)本人本人瀏覽器調(diào)試及所有錯(cuò)誤代碼整合千峰超級(jí)好用的各種開發(fā)自學(xué)文檔這是它對(duì)應(yīng)的學(xué)習(xí)視頻使用教程詳細(xì)虛擬機(jī)安裝系統(tǒng)詳解版網(wǎng)易開源鏡像站在線數(shù)據(jù)互轉(zhuǎn)使 1.Java調(diào)用百度API實(shí)現(xiàn)圖像識(shí)別 2.使用Three.js渲染Sketchup導(dǎo)出的dae 3.three.js制作的超級(jí)炫酷的三維模型 4.three.js - 一...
閱讀 3203·2023-04-25 19:09
閱讀 3914·2021-10-22 09:54
閱讀 1796·2021-09-29 09:35
閱讀 2947·2021-09-08 09:45
閱讀 2322·2021-09-06 15:00
閱讀 2796·2019-08-29 15:32
閱讀 1072·2019-08-28 18:30
閱讀 397·2019-08-26 13:43