摘要:它通過使用來連接消息代理中間件以實現(xiàn)消息事件驅(qū)動的微服務(wù)應(yīng)用。該示例主要目標是構(gòu)建一個基于的微服務(wù)應(yīng)用,這個微服務(wù)應(yīng)用將通過使用消息中間件來接收消息并將消息打印到日志中。下面我們通過編寫生產(chǎn)消息的單元測試用例來完善我們的入門內(nèi)容。
之前在寫Spring Boot基礎(chǔ)教程的時候?qū)戇^一篇《Spring Boot中使用RabbitMQ》。在該文中,我們通過簡單的配置和注解就能實現(xiàn)向RabbitMQ中生產(chǎn)和消費消息。實際上我們使用的對RabbitMQ的starter就是通過Spring Cloud Stream中對RabbitMQ的支持來實現(xiàn)的。下面我們就通過本文來了解一下Spring Cloud Stream。
Spring Cloud Stream是一個用來為微服務(wù)應(yīng)用構(gòu)建消息驅(qū)動能力的框架。它可以基于Spring Boot來創(chuàng)建獨立的、可用于生產(chǎn)的Spring應(yīng)用程序。它通過使用Spring Integration來連接消息代理中間件以實現(xiàn)消息事件驅(qū)動的微服務(wù)應(yīng)用。Spring Cloud Stream為一些供應(yīng)商的消息中間件產(chǎn)品提供了個性化的自動化配置實現(xiàn),并且引入了發(fā)布-訂閱、消費組以及消息分區(qū)這三個核心概念。簡單的說,Spring Cloud Stream本質(zhì)上就是整合了Spring Boot和Spring Integration,實現(xiàn)了一套輕量級的消息驅(qū)動的微服務(wù)框架。通過使用Spring Cloud Stream,可以有效地簡化開發(fā)人員對消息中間件的使用復(fù)雜度,讓系統(tǒng)開發(fā)人員可以有更多的精力關(guān)注于核心業(yè)務(wù)邏輯的處理。由于Spring Cloud Stream基于Spring Boot實現(xiàn),所以它秉承了Spring Boot的優(yōu)點,實現(xiàn)了自動化配置的功能幫忙我們可以快速的上手使用,但是目前為止Spring Cloud Stream只支持下面兩個著名的消息中間件的自動化配置:
RabbitMQ
Kafka
快速入門下面我們通過構(gòu)建一個簡單的示例來對Spring Cloud Stream有一個初步認識。該示例主要目標是構(gòu)建一個基于Spring Boot的微服務(wù)應(yīng)用,這個微服務(wù)應(yīng)用將通過使用消息中間件RabbitMQ來接收消息并將消息打印到日志中。所以,在進行下面步驟之前請先確認已經(jīng)在本地安裝了RabbitMQ,具體安裝步驟請參考此文。
構(gòu)建一個Spring Cloud Stream消費者創(chuàng)建一個基礎(chǔ)的Spring Boot工程,命名為:stream-hello
編輯pom.xml中的依賴關(guān)系,引入Spring Cloud Stream對RabbitMQ的支持,具體如下:
org.springframework.boot spring-boot-starter-parent 1.5.9.RELEASE org.springframework.boot spring-boot-starter-test test org.springframework.cloud spring-cloud-starter-stream-rabbit org.springframework.cloud spring-cloud-dependencies Dalston.SR4 pom import
創(chuàng)建用于接收來自RabbitMQ消息的消費者SinkReceiver,具體如下:
@EnableBinding(Sink.class) public class SinkReceiver { private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class); @StreamListener(Sink.INPUT) public void receive(Object payload) { logger.info("Received: " + payload); } }
創(chuàng)建應(yīng)用主類,這里同其他Spring Boot一樣,沒有什么特別之處,具體如下:
@SpringBootApplication public class SinkApplication { public static void main(String[] args) { SpringApplication.run(SinkApplication.class, args); } }
到這里,我們快速入門示例的編碼任務(wù)就已經(jīng)完成了。下面我們分別啟動RabbitMQ以及該Spring Boot應(yīng)用,然后做下面的試驗,看看它們是如何運作的。
手工測試驗證我們先來看一下Spring Boot應(yīng)用的啟動日志。
... INFO 16272 --- [main] o.s.c.s.b.r.RabbitMessageChannelBinder : declaring queue for inbound: input.anonymous.Y8VsFILmSC27eS5StsXp6A, bound to: input INFO 16272 --- [main] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@3c78e551 [delegate=amqp://[email protected]:5672/] INFO 16272 --- [main] o.s.integration.channel.DirectChannel : Channel "input.anonymous.Y8VsFILmSC27eS5StsXp6A.bridge" has 1 subscriber(s). INFO 16272 --- [main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.Y8VsFILmSC27eS5StsXp6A ...
從上面的日志內(nèi)容中,我們可以獲得以下信息:
使用guest用戶創(chuàng)建了一個指向127.0.0.1:5672位置的RabbitMQ連接,在RabbitMQ的控制臺中我們也可以發(fā)現(xiàn)它。
聲明了一個名為input.anonymous.Y8VsFILmSC27eS5StsXp6A的隊列,并通過RabbitMessageChannelBinder將自己綁定為它的消費者。這些信息我們也能在RabbitMQ的控制臺中發(fā)現(xiàn)它們。
下面我們可以在RabbitMQ的控制臺中進入input.anonymous.Y8VsFILmSC27eS5StsXp6A隊列的管理頁面,通過Publish Message功能來發(fā)送一條消息到該隊列中。
此時,我們可以在當前啟動的Spring Boot應(yīng)用程序的控制臺中看到下面的內(nèi)容:
INFO 16272 --- [C27eS5StsXp6A-1] com.didispace.HelloApplication : Received: [B@7cba610e
我們可以發(fā)現(xiàn)在應(yīng)用控制臺中輸出的內(nèi)容就是SinkReceiver中receive方法定義的,而輸出的具體內(nèi)容則是來自消息隊列中獲取的對象。這里由于我們沒有對消息進行序列化,所以輸出的只是該對象的引用,在后面的小節(jié)中我們會詳細介紹接收消息后的處理。
在順利完成上面快速入門的示例后,我們簡單解釋一下上面的步驟是如何將我們的Spring Boot應(yīng)用連接上RabbitMQ來消費消息以實現(xiàn)消息驅(qū)動業(yè)務(wù)邏輯的。
首先,我們對Spring Boot應(yīng)用做的就是引入spring-cloud-starter-stream-rabbit依賴,該依賴包是Spring Cloud Stream對RabbitMQ支持的封裝,其中包含了對RabbitMQ的自動化配置等內(nèi)容。從下面它定義的依賴關(guān)系中,我們還可以知道它等價于spring-cloud-stream-binder-rabbit依賴。
org.springframework.cloud spring-cloud-stream-binder-rabbit
接著,我們再來看看這里用到的幾個Spring Cloud Stream的核心注解,它們都被定義在SinkReceiver中:
@EnableBinding,該注解用來指定一個或多個定義了@Input或@Output注解的接口,以此實現(xiàn)對消息通道(Channel)的綁定。在上面的例子中,我們通過@EnableBinding(Sink.class)綁定了Sink接口,該接口是Spring Cloud Stream中默認實現(xiàn)的對輸入消息通道綁定的定義,它的源碼如下:
public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); }
它通過@Input注解綁定了一個名為input的通道。除了Sink之外,Spring Cloud Stream還默認實現(xiàn)了綁定output通道的Source接口,還有結(jié)合了Sink和Source的Processor接口,實際使用時我們也可以自己通過@Input和@Output注解來定義綁定消息通道的接口。當我們需要為@EnableBinding指定多個接口來綁定消息通道的時候,可以這樣定義:@EnableBinding(value = {Sink.class, Source.class})。
@StreamListener:該注解主要定義在方法上,作用是將被修飾的方法注冊為消息中間件上數(shù)據(jù)流的事件監(jiān)聽器,注解中的屬性值對應(yīng)了監(jiān)聽的消息通道名。在上面的例子中,我們通過@StreamListener(Sink.INPUT)注解將receive方法注冊為對input消息通道的監(jiān)聽處理器,所以當我們在RabbitMQ的控制頁面中發(fā)布消息的時候,receive方法會做出對應(yīng)的響應(yīng)動作。
編寫消費消息的單元測試用例上面我們通過RabbitMQ的控制臺完成了發(fā)送消息來驗證了消息消費程序的功能,雖然這種方法比較low,但是通過上面的步驟,相信大家對RabbitMQ和Spring Cloud Stream的消息消費已經(jīng)有了一些基礎(chǔ)的認識。下面我們通過編寫生產(chǎn)消息的單元測試用例來完善我們的入門內(nèi)容。
在上面創(chuàng)建的工程中創(chuàng)建單元測試類:
@RunWith(SpringRunner.class) @EnableBinding(value = {SinkApplicationTests.SinkSender.class}) public class SinkApplicationTests { @Autowired private SinkSender sinkSender; @Test public void sinkSenderTester() { sinkSender.output().send(MessageBuilder.withPayload("produce a message :http://blog.didispace.com").build()); } public interface SinkSender { String OUTPUT = "input"; @Output(SinkSender.OUTPUT) MessageChannel output(); } }
在應(yīng)用了上面的消息消費者程序之后,運行這里定義的單元測試程序,我們馬上就能在消息消費者的控制臺中收到下面的內(nèi)容:
INFO 50947 --- [L2W-c2AcChb2Q-1] com.didispace.stream.SinkReceiver : Received: produce a message :http://blog.didispace.com
在上面的單元測試中,我們通過@Output(SinkSender.OUTPUT)定義了一個輸出通過,而該輸出通道的名稱為input,與前文中的Sink中定義的消費通道同名,所以這里的單元測試與前文的消費者程序組成了一對生產(chǎn)者與消費者。到這里,本文的內(nèi)容就次結(jié)束,如果您能夠獨立的完成上面的例子,那么對于Spring Cloud Stream的基礎(chǔ)使用算是入門了。但是,Spring Cloud Stream的使用遠不止于此,在近期的博文中,我講繼續(xù)更新這部分內(nèi)容,幫助他們來理解和用好Spring Cloud Stream來構(gòu)建消息驅(qū)動的微服務(wù)!
本文完整實例:
Github
Gitee
如果您對這些感興趣,歡迎star、follow、收藏、轉(zhuǎn)發(fā)給予支持!
本文內(nèi)容部分節(jié)選自我的《Spring Cloud微服務(wù)實戰(zhàn)》,但對依賴的Spring Boot和Spring Cloud版本做了升級。
本文首發(fā)于我的博客:http://blog.didispace.com系列教程推薦
Spring Boot基礎(chǔ)教程
Spring Cloud基礎(chǔ)教程
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/70870.html
摘要:微服務(wù)架構(gòu)概述應(yīng)用架構(gòu)的發(fā)展應(yīng)用是可獨立運行的程序代碼,提供相對完善的業(yè)務(wù)功能。阿里開源的是的典型實現(xiàn)。它目前由官方開發(fā)維護,基于開發(fā),提供一套完整的微服務(wù)解決方案。 微服務(wù)與Spring Cloud 隨著互聯(lián)網(wǎng)的快速發(fā)展, 云計算近十年也得到蓬勃發(fā)展, 企業(yè)的IT環(huán)境和IT架構(gòu)也逐漸在發(fā)生變革,從過去的單體應(yīng)用架構(gòu)發(fā)展為至今廣泛流行的微服務(wù)架構(gòu)。 微服務(wù)是一種架構(gòu)風(fēng)格, 能給軟件應(yīng)用...
摘要:屬性對應(yīng)服務(wù)注冊中心的配置內(nèi)容,指定服務(wù)注冊中心的位置。項目是針對的服務(wù)治理實現(xiàn)。下面可以嘗試讓的服務(wù)提供者運行起來。我們可以用下面的命令啟動的開發(fā)模式服務(wù)端啟動完成之后,我們再將之前改造后的服務(wù)提供者啟動起來。 已經(jīng)有非常長的時間沒有更新《Spring Cloud構(gòu)建微服務(wù)架構(gòu)》系列文章了,自從開始寫Spring Cloud的專題內(nèi)容開始就獲得了不少的閱讀量和認可,當然也有一些批評...
摘要:下面的例子,我們將利用上一篇中構(gòu)建的作為服務(wù)注冊中心作為服務(wù)提供者作為基礎(chǔ)。我們先來創(chuàng)建一個服務(wù)消費者工程,命名為。初始化,用來真正發(fā)起請求。注解用來將當前應(yīng)用加入到服務(wù)治理體系中。 通過上一篇《Spring Cloud構(gòu)建微服務(wù)架構(gòu):服務(wù)注冊與發(fā)現(xiàn)》,我們已經(jīng)成功地將服務(wù)提供者:eureka-client或consul-client注冊到了Eureka服務(wù)注冊中心或Consul服務(wù)端...
摘要:目前最新版本官網(wǎng)特性專注于提供良好的開箱即用經(jīng)驗的典型用例和可擴展性機制覆蓋。分布式消息隊列,是對的封裝。是對的封裝,能實現(xiàn)服務(wù)之間的認證調(diào)用和安全保護等,并能配合使用。不過隨著目前官方的重新申明維護并得到重視,生態(tài)圈也會逐漸強大。 簡介 Spring Cloud是一系列框架的有序集合。它利用Spring Boot的開發(fā)便利性巧妙地簡化了分布式系統(tǒng)基礎(chǔ)設(shè)施的開發(fā),如服務(wù)發(fā)現(xiàn)注冊、配置中...
摘要:而微服務(wù)架構(gòu)能否成功實踐,利用各種工具解決潛在問題是關(guān)鍵。因此,微服務(wù)本身可以通過庫和運行時代理解決客戶端服務(wù)發(fā)現(xiàn)負載均衡配置更新統(tǒng)計跟蹤等。與相比,解決了更廣的微服務(wù)架構(gòu)問題。和處理了不同范圍的微服務(wù)架構(gòu)技術(shù)點,而且是用了不同的方法。 Spring Cloud vs. Kubernetes,誰才是部署微服務(wù)的最佳拍檔? Spring Cloud和Kubernetes都聲稱自己是開發(fā)和...
閱讀 2483·2023-04-26 02:18
閱讀 1271·2021-10-14 09:43
閱讀 3840·2021-09-26 10:00
閱讀 6985·2021-09-22 15:28
閱讀 2550·2019-08-30 15:54
閱讀 2611·2019-08-30 15:52
閱讀 486·2019-08-29 11:30
閱讀 3475·2019-08-29 11:05