成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

springCloud學(xué)習(xí)5(Spring-Cloud-Stream事件驅(qū)動(dòng))

Atom / 2180人閱讀

摘要:它是事件驅(qū)動(dòng)的,我們不斷的發(fā)送消息接受消息處理消息。使用消息實(shí)現(xiàn)事件通信的概念被稱為消息驅(qū)動(dòng)架構(gòu),也被稱為消息驅(qū)動(dòng)架構(gòu)。許可證服務(wù)收到該消息后清除對(duì)應(yīng)的緩存。通過綁定器,使得開發(fā)人員不必依賴于特定平臺(tái)的庫(kù)和來發(fā)布和消費(fèi)消息。

springcloud 總集:https://www.tapme.top/blog/detail/2019-02-28-11-33

代碼見文章結(jié)尾

??想想平常生活中做飯的場(chǎng)景,在用電飯鍋?zhàn)鲲埖耐瑫r(shí),我們可以洗菜、切菜,等待電飯鍋發(fā)出飯做好的提示我們回去拔下電飯鍋電源(或者什么也不知讓它處于保溫狀態(tài)),反正這個(gè)時(shí)候我們知道飯做好了,接下來可以炒菜了。從這里可以看出我們?cè)谌粘I钪信c世界的互動(dòng)并不是同步的、線性的,不是簡(jiǎn)單的請(qǐng)求--響應(yīng)模型。它是事件驅(qū)動(dòng)的,我們不斷的發(fā)送消息、接受消息、處理消息。

??同樣在軟件世界中也不全是請(qǐng)求--響應(yīng)模型,也會(huì)需要進(jìn)行異步的消息通信。使用消息實(shí)現(xiàn)事件通信的概念被稱為消息驅(qū)動(dòng)架構(gòu)(Event Driven Architecture,EDA),也被稱為消息驅(qū)動(dòng)架構(gòu)(Message Driven Architecture,MDA)。使用這類架構(gòu)可以構(gòu)建高度解耦的系統(tǒng),該系統(tǒng)能夠?qū)ψ兓龀鲰憫?yīng),且不需要與特定的庫(kù)或者服務(wù)緊密耦合。

??在 Spring Cloud 項(xiàng)目中可以使用Spirng Cloud Stream輕而易舉的構(gòu)建基于消息傳遞的解決方案。

為什么使用消息傳遞

??要解答這個(gè)問題,讓我們從一個(gè)例子開始,之前一直使用的兩個(gè)服務(wù):許可證服務(wù)和組織服務(wù)。每次對(duì)許可證服務(wù)進(jìn)行請(qǐng)求,許可證服務(wù)都要通過 http 請(qǐng)求到組織服務(wù)上查詢組織信息。顯而易見這次額外的 http 請(qǐng)求會(huì)花費(fèi)較長(zhǎng)的時(shí)間。如果能夠?qū)⒕彺娼M織數(shù)據(jù)的讀操作,將會(huì)大幅提高許可證服務(wù)的響應(yīng)時(shí)間。但是緩存數(shù)據(jù)有如下 2 個(gè)要求:

緩存的數(shù)據(jù)需要在許可證服務(wù)的所有實(shí)例之間保存一致——這意味著不能將數(shù)據(jù)緩存到服務(wù)實(shí)例的內(nèi)存中。

在更新或者刪除一個(gè)組織數(shù)據(jù)時(shí),許可證服務(wù)緩存的數(shù)據(jù)需要失效——避免讀取到過期數(shù)據(jù),需要盡早讓過時(shí)數(shù)據(jù)失效并刪除。

??要實(shí)現(xiàn)上面的要求,現(xiàn)在有兩種辦法。

使用同步請(qǐng)求--響應(yīng)模型來實(shí)現(xiàn)。組織服務(wù)在組織數(shù)據(jù)變化時(shí)調(diào)用許可證服務(wù)的接口通知組織服務(wù)已經(jīng)變化,或者直接操作許可證服務(wù)的緩存。

使用事件驅(qū)動(dòng)。組織服務(wù)發(fā)出一個(gè)異步消息。許可證服務(wù)收到該消息后清除對(duì)應(yīng)的緩存。

同步請(qǐng)求-響應(yīng)方式

??許可證服務(wù)在 redis 中緩存從組織服務(wù)中查詢到的服務(wù)信息,當(dāng)組織數(shù)據(jù)更新時(shí),組織服務(wù)同步 http 請(qǐng)求通知許可證服務(wù)數(shù)據(jù)過期。這種方式有以下幾個(gè)問題:

組織服務(wù)和許可證服務(wù)緊密耦合

這種方式不夠靈活,如果要為組織服務(wù)添加新的消費(fèi)者,必須修改組織服務(wù)代碼,以讓其通知新的服務(wù)數(shù)據(jù)變動(dòng)。

使用消息傳遞方式

??同樣的許可證服務(wù)在 redis 中緩存從組織服務(wù)中查詢到的服務(wù)信息,當(dāng)組織數(shù)據(jù)更新時(shí),組織服務(wù)將更新信息寫入到隊(duì)列中。許可證服務(wù)監(jiān)聽消息隊(duì)列。使用消息傳遞有一下 4 個(gè)好處:

松耦合性:將服務(wù)間的依賴,變成了服務(wù)對(duì)隊(duì)列的依賴,依賴關(guān)系變?nèi)趿恕?/p>

耐久性:即使服務(wù)消費(fèi)者已經(jīng)關(guān)閉了,也可以繼續(xù)往里發(fā)送消息,等消費(fèi)者開啟后處理

可伸縮性: 消息發(fā)送者不用等待消息消費(fèi)者的響應(yīng),它們可以繼續(xù)做各自的工作

靈活性:消息發(fā)送者不用知道誰會(huì)消費(fèi)這個(gè)消息,因此在有新的消息消費(fèi)者時(shí)無需修改消息發(fā)送代碼

spring cloud 中使用消息傳遞

??spring cloud 項(xiàng)目中可以通過 spring cloud stream 框架來輕松集成消息傳遞。該框架最大的特點(diǎn)是抽象了消息傳遞平臺(tái)的細(xì)節(jié),因此可以在支持的消息隊(duì)列中隨意切換(包括 Apache Kafka 和 RabbitMQ)。

spring cloud stream 架構(gòu)

??spring cloud stream 中有 4 個(gè)組件涉及到消息發(fā)布和消息消費(fèi),分別為:

發(fā)射器

??當(dāng)一個(gè)服務(wù)準(zhǔn)備發(fā)送消息時(shí),它將使用發(fā)射器發(fā)布消息。發(fā)射器是一個(gè) Spring 注解接口,它接收一個(gè)普通 Java 對(duì)象,表示要發(fā)布的消息。發(fā)射器接收消息,然后序列化(默認(rèn)序列化為 JSON)后發(fā)布到通道中。

通道

??通道是對(duì)隊(duì)列的一個(gè)抽象。通道名稱是與目標(biāo)隊(duì)列名稱相關(guān)聯(lián)的。但是隊(duì)列名稱并不會(huì)直接公開在代碼中,代碼永遠(yuǎn)只會(huì)使用通道名。

綁定器

??綁定器是 spring cloud stream 框架的一部分,它是與特定消息平臺(tái)對(duì)話的 Spring 代碼。通過綁定器,使得開發(fā)人員不必依賴于特定平臺(tái)的庫(kù)和 API 來發(fā)布和消費(fèi)消息。

接收器

??服務(wù)通過接收器來從隊(duì)列中接收消息,并將消息反序列化。

處理邏輯如下:

實(shí)戰(zhàn)

??繼續(xù)使用之前的項(xiàng)目,在許可證服務(wù)中緩存組織數(shù)據(jù)到 redis 中。

建立 redis 服務(wù)

??為方便起見,使用 docker 創(chuàng)建 redis,建立腳本如下:

docker run -itd --name redis --net host redis:
建立 kafka 服務(wù) 在組織服務(wù)中編寫消息生產(chǎn)者

??首先在 organization 服務(wù)中引入 spring cloud stream 和 kafka 的依賴。


    org.springframework.cloud
    spring-cloud-stream



    org.springframework.cloud
    spring-cloud-starter-stream-kafka

??然后在 events 類中編寫SimpleSouce類,用于組織數(shù)據(jù)修改,產(chǎn)生一條消息到隊(duì)列中。代碼如下:

@EnableBinding(Source.class)
public class SimpleSource {
    private Logger logger = LoggerFactory.getLogger(SimpleSource.class);

    private Source source;

    @Autowired
    public SimpleSource(Source source) {
        this.source = source;
    }

    public void publishOrChange(String action, String orgId) {
        logger.info("在請(qǐng)求:{}中,發(fā)送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId);
        OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id);
        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

這里使用的是默認(rèn)通道,Source 類定義的 output 通道發(fā)消息。后面通過 Sink 定義的 input 通道收消息。

??然后在OrganizationController類中定義一個(gè) delete 方法,并注入 SimpleSouce 類,代碼如下:

@Autowired
private SimpleSource simpleSource;

@DeleteMapping(value = "/organization/{orgId}")
public void deleteOne(@PathVariable("orgId") String id) {
    logger.debug("刪除了組織:{}", id);
    simpleSource.publishOrChange("delete", id);
}

??最后在配置文件中加入消息隊(duì)列的配置:

# 省略了其他配置
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orgChangeTopic
          content-type: application/json
      kafka:
        binder:
          # 替換為部署kafka的ip和端口
          zk-nodes: 192.168.226.5:2181
          brokers: 192.168.226.5:9092

??現(xiàn)在我們可以測(cè)試下訪問localhost:5555/apis/org/organization/12,可以看到控制臺(tái)打印消息生成的日志。

在許可證服務(wù)中編寫消息消費(fèi)者

??集成 redis 的方法,參看[]()。這里不作說明。

??首先引入依賴,依賴項(xiàng)同上面組織服務(wù)。

??然后在 event 包下創(chuàng)建OrgChange的類,代碼如下:

@EnableBinding(Sink.class) //使用Sink接口中定義的通道來監(jiān)聽傳入消息
public class OrgChange {

    private Logger logger = LoggerFactory.getLogger(OrgChange.class);

    @StreamListener(Sink.INPUT)
    public void loggerSink(OrganizationChange change){
        logger.info("收到一個(gè)消息,組織id為:{},關(guān)聯(lián)id為:{}",change.getOrgId(),change.getId());
        //刪除失效緩存
        RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId()));
    }
}

//下面兩個(gè)都在util包下
//RedisKeyUtils.java代碼如下
public class RedisKeyUtils {

    private static final String  ORG_CACHE_PREFIX = "orgCache_";

    public static String getOrgCacheKey(String orgId){
        return ORG_CACHE_PREFIX+orgId;
    }
}

//RedisUtils.java代碼如下
@Component
@SuppressWarnings("all")
public class RedisUtils {

    public static RedisTemplate redisTemplate;

    @Autowired
    public void setRedisTemplate(RedisTemplate redisTemplate) {
        RedisUtils.redisTemplate = redisTemplate;
    }

    public static boolean setObj(String key,Object value){
        return setObj(key,value,0);
    }

    /**
     * Description:
     *
     * @author fanxb
     * @date 2019/2/21 15:21
     * @param key 鍵
     * @param value 值
     * @param time 過期時(shí)間,單位ms
     * @return boolean 是否成功
     */
    public static boolean setObj(String key,Object value,long time){
        try{
            if(time<=0){
                redisTemplate.opsForValue().set(key,value);
            }else{
                redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS);
            }
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    public static Object get(String key){
        if(key==null){
            return null;
        }
        try{
            Object obj = redisTemplate.opsForValue().get(key);
            return obj;
        }catch (Exception e){
            e.printStackTrace();
            return null;
        }
    }

    public static void del(String... key){
        if(key!=null && key.length>0){
            redisTemplate.delete(CollectionUtils.arrayToList(key));
        }
    }
}

??上面用到的是 Sink.INPUT 通道,這個(gè)和之前的 Source.OUTPUT 通道剛好一隊(duì),一個(gè)負(fù)責(zé)收,一個(gè)負(fù)責(zé)發(fā)。

??然后修改OrganizationByRibbonService.java文件中的getOrganizationWithRibbon方法:

    public Organization getOrganizationWithRibbon(String id) {
        String key = RedisKeyUtils.getOrgCacheKey(id);
        //先從redis緩存取數(shù)據(jù)
        Object res = RedisUtils.get(key);
        if (res == null) {
            logger.info("當(dāng)前數(shù)據(jù)無緩存:{}", id);
            try{

            ResponseEntity responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}",
                    HttpMethod.GET, null, Organization.class, id);
            res = responseEntity.getBody();
            RedisUtils.setObj(key, res);
            }catch (Exception e){
                e.printStackTrace();
            }
        } else {
            logger.info("當(dāng)前數(shù)據(jù)為緩存數(shù)據(jù):{}", id);
        }
        return (Organization) res;
    }

??最后修改配置文件,為 input 通道指定 topic,配置如下:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: orgChangeTopic
          content-type: application/json
          # 定義將要消費(fèi)消息的消費(fèi)者組的名稱
          # 可能多個(gè)服務(wù)監(jiān)聽同一個(gè)消息隊(duì)列。如果定義了消費(fèi)者組,那么同組中只要有一個(gè)消費(fèi)了消息,剩余的不會(huì)再次消費(fèi)該消息,保證只有消息的
          # 一個(gè)副本會(huì)被該組的某個(gè)實(shí)例所消費(fèi)
          group: licensingGroup
      kafka:
        binder:
          zk-nodes: 192.168.226.5:2181
          brokers: 192.168.226.5:9092

基本和發(fā)送的配置相同,只是這里是為input通道映射隊(duì)列,然后還定義了一個(gè)組名,避免一個(gè)消息被重復(fù)消費(fèi)。

??現(xiàn)在來多次訪問localhost:5555/apis/licensingservice/licensingByRibbon/12,可以看到 licensingservice 控制臺(tái)打印數(shù)據(jù)從緩存中讀取,如下所示:

然后再以 delete 訪問localhost:5555/apis/org/organization/12清除緩存,再次訪問 licensingservice 服務(wù),結(jié)果如下:

自定義通道

??上面用的是Spring Cloud Stream自帶的 input/output 通道,那么要如何自定義通道呢?下面以自定義customInput/customOutput通道為例。

自定義發(fā)數(shù)據(jù)通道
public interface CustomOutput {
    @Output("customOutput")
    MessageChannel out();
}

??對(duì)于每個(gè)自定義的發(fā)數(shù)據(jù)通道,需使用@OutPut 注解標(biāo)記的返回 MessageChannel 類的方法。

自定義收數(shù)據(jù)通道
public interface CustomInput {

    @Input("customInput")
    SubscribableChannel in();
}

??同上,對(duì)應(yīng)自定義的收數(shù)據(jù)通道,需要使用@Input 注解標(biāo)記的返回 SubscribableChannel 類的方法。

結(jié)束

??看完本篇你應(yīng)該已經(jīng)能夠在 Spring Cloud 中集成 Spring Cloud Stream 消息隊(duì)列了,貌似這個(gè)也能用到普通的 spring boot 項(xiàng)目中,比直接集成 mq 更加的優(yōu)雅。

2019,Fighting!

本篇原創(chuàng)發(fā)布于:FleyX 的個(gè)人博客

本篇所用全部代碼:FleyX 的 github

掃碼關(guān)注微信公眾號(hào):FleyX 學(xué)習(xí)筆記,獲取更多干貨

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77850.html

相關(guān)文章

  • SpringCloud打造微服務(wù)平臺(tái)--概覽

    摘要:授權(quán)框架使第三方應(yīng)用程序來獲取對(duì)服務(wù)的有限訪問機(jī)會(huì)。無論是通過編排資源所有者和服務(wù)之間的交互批準(zhǔn)的資源所有者,或通過允許第三方應(yīng)用程序來獲取自己的訪問權(quán)限。 SpringCloud打造微服務(wù)平臺(tái)--概覽 簡(jiǎn)述 SpringCloud是什么 Spring Boot和SpringCloud是什么關(guān)系 Spring Boot是Spring的一套快速WEB開發(fā)的腳手架,可建立獨(dú)立的Sprin...

    siberiawolf 評(píng)論0 收藏0
  • 架構(gòu)~微服務(wù) - 收藏集 - 掘金

    摘要:它就是史上最簡(jiǎn)單的教程第三篇服務(wù)消費(fèi)者后端掘金上一篇文章,講述了通過去消費(fèi)服務(wù),這篇文章主要講述通過去消費(fèi)服務(wù)。概覽和架構(gòu)設(shè)計(jì)掘金技術(shù)征文后端掘金是基于的一整套實(shí)現(xiàn)微服務(wù)的框架。 Spring Boot 配置文件 – 在坑中實(shí)踐 - 后端 - 掘金作者:泥瓦匠鏈接:Spring Boot 配置文件 – 在坑中實(shí)踐版權(quán)歸作者所有,轉(zhuǎn)載請(qǐng)注明出處本文提綱一、自動(dòng)配置二、自定義屬性三、ran...

    church 評(píng)論0 收藏0
  • 整理一下學(xué)習(xí)微服務(wù)springboot+springcloud+vue以來用到的好的博客

    摘要:調(diào)用百度實(shí)現(xiàn)圖像識(shí)別使用渲染導(dǎo)出的制作的超級(jí)炫酷的三維模型一個(gè)代碼庫(kù)本人本人瀏覽器調(diào)試及所有錯(cuò)誤代碼整合千峰超級(jí)好用的各種開發(fā)自學(xué)文檔這是它對(duì)應(yīng)的學(xué)習(xí)視頻使用教程詳細(xì)虛擬機(jī)安裝系統(tǒng)詳解版網(wǎng)易開源鏡像站在線數(shù)據(jù)互轉(zhuǎn)使 1.Java調(diào)用百度API實(shí)現(xiàn)圖像識(shí)別 2.使用Three.js渲染Sketchup導(dǎo)出的dae 3.three.js制作的超級(jí)炫酷的三維模型 4.three.js - 一...

    gitmilk 評(píng)論0 收藏0
  • 整理一下學(xué)習(xí)微服務(wù)springboot+springcloud+vue以來用到的好的博客

    摘要:調(diào)用百度實(shí)現(xiàn)圖像識(shí)別使用渲染導(dǎo)出的制作的超級(jí)炫酷的三維模型一個(gè)代碼庫(kù)本人本人瀏覽器調(diào)試及所有錯(cuò)誤代碼整合千峰超級(jí)好用的各種開發(fā)自學(xué)文檔這是它對(duì)應(yīng)的學(xué)習(xí)視頻使用教程詳細(xì)虛擬機(jī)安裝系統(tǒng)詳解版網(wǎng)易開源鏡像站在線數(shù)據(jù)互轉(zhuǎn)使 1.Java調(diào)用百度API實(shí)現(xiàn)圖像識(shí)別 2.使用Three.js渲染Sketchup導(dǎo)出的dae 3.three.js制作的超級(jí)炫酷的三維模型 4.three.js - 一...

    bluesky 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<