摘要:可以參考源碼,項(xiàng)目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會(huì)更新在上面前言在訂閱發(fā)布中我們建立了一個(gè)簡單的日志系統(tǒng),從而將消息廣播給一些消費(fèi)者。因此,發(fā)送到路由鍵的消息會(huì)發(fā)送給隊(duì)列,發(fā)送到路由鍵或者的消息會(huì)發(fā)送給,其它的消息將被丟棄。
推廣
https://segmentfault.com/l/15...
我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項(xiàng)目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會(huì)更新在上面
前言在訂閱/發(fā)布中我們建立了一個(gè)簡單的日志系統(tǒng),從而將log消息廣播給一些消費(fèi)者。這章我們會(huì)在此基礎(chǔ)上加入一些新的特性-我們將有針對性的進(jìn)行消息分發(fā),比如,只把錯(cuò)誤(error)消息保存到磁盤,與此同時(shí),打印出所有的消息。
綁定我們在前面的例子中,綁定是這么來做的
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定是建立交換機(jī)和隊(duì)列之間的一種聯(lián)系:隊(duì)列會(huì)接受交換機(jī)中的消息。綁定可以用一個(gè)路由鍵來指明,為了與basic_publish區(qū)分開,我們稱之為綁定鍵(binding key):
channel.queueBind(queueName, EXCHANGE_NAME, "black");
綁定鍵跟路由器類型也有關(guān)系,我們之前用的廣播路由器,會(huì)忽略掉這個(gè)值
直達(dá)交換機(jī)(Direct Exchange)之前我們用的是廣播交換機(jī),會(huì)將消息發(fā)送給所有的消費(fèi)者。這里我們希望通過log的嚴(yán)重程度進(jìn)行過濾,例如只有嚴(yán)重的錯(cuò)誤才會(huì)寫入到磁盤,而warn和info消息就不用了,以此來節(jié)省磁盤空間
而廣播交換機(jī)沒法滿足這個(gè)需求-它只是無腦的發(fā)送消息。所以我們會(huì)使用直達(dá)交換機(jī)(Direct Exchange)- 消息會(huì)通過所綁定的鍵來發(fā)送給對應(yīng)的隊(duì)列,可以看如下這幅圖
如上圖所示,直達(dá)交換機(jī)X綁定了兩個(gè)隊(duì)列,C1是通過orange來綁定,而C2是通過black和green綁定。因此,發(fā)送到路由鍵orange的消息會(huì)發(fā)送給隊(duì)列Q1,發(fā)送到路由鍵black或者green的消息會(huì)發(fā)送給Q2,其它的消息將被丟棄。
多項(xiàng)綁定
當(dāng)然,多個(gè)隊(duì)列綁定到一個(gè)鍵上也是合法的,在這種情況下,直達(dá)交換機(jī)將會(huì)將消息發(fā)送給所有的隊(duì)列,就像廣播交換機(jī)一樣,如上圖所示,一個(gè)鍵為black的消息將會(huì)同時(shí)被發(fā)送給C1和C2.
我們首先需要?jiǎng)?chuàng)建一個(gè)直達(dá)路由器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
并發(fā)送消息到這個(gè)路由器
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
上面我們是發(fā)送給"severity",簡單起見,假設(shè)有下列幾種日志類型"severity" ,"info", "warning", "error".
訂閱消息(Subscribing)接受消息跟之前一樣,但有一點(diǎn)不同,我們提供了一個(gè)binding key,
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }整合
將上面的所有代碼整合到一起
EmitLogDirect.java
import com.rabbitmq.client.*; import java.io.IOException; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent "" + severity + "":"" + message + """); channel.close(); connection.close(); } //.. }
ReceiveLogsDirect.java
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + envelope.getRoutingKey() + "":"" + message + """); } }; channel.basicConsume(queueName, true, consumer); } }
編譯
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
只保存warning和error的消息到磁盤上
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
將所有的消息打印到頻幕上
java -cp $CP ReceiveLogsDirect info warning error # => [*] Waiting for logs. To exit press CTRL+C
最后,發(fā)送error消息
java -cp $CP EmitLogDirect error "Run. Run. Or it will explode." # => [x] Sent "error":"Run. Run. Or it will explode."
好了,這一章就到這兒,下一章我們將講述如何基于特定模式進(jìn)行監(jiān)聽
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68120.html
摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。主題交換機(jī)也可以當(dāng)成其它交換機(jī)來使用,假如隊(duì)列綁定到了那么它會(huì)接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達(dá)路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性...
摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會(huì)廣播到所有的消費(fèi)者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊(duì)列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...
摘要:作為消息隊(duì)列的一個(gè)典型實(shí)踐,完全實(shí)現(xiàn)了標(biāo)準(zhǔn),與的快快快不同,它追求的穩(wěn)定可靠。同一個(gè)隊(duì)列不僅可以綁定多個(gè)生產(chǎn)者,而且能夠發(fā)送消息到多個(gè)消費(fèi)者。消費(fèi)者接受并消費(fèi)消息。幾乎于完全類似是一個(gè)繼承了接口的類,方便我們來存儲(chǔ)消息隊(duì)列來的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的...
摘要:因?yàn)橄M(fèi)消息是在另外一個(gè)進(jìn)程中,我們需要阻塞我們的進(jìn)程直到結(jié)果返回,使用阻塞隊(duì)列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖?..
摘要:路由模式在之前的文章中我們建立了一個(gè)簡單的日志系統(tǒng)。更形象的表示,如對中的感興趣。為了進(jìn)行說明,像下圖這么來設(shè)置如圖,可以看到有兩個(gè)綁到了類型為的上。如圖的設(shè)置中,一個(gè)為的就會(huì)同時(shí)發(fā)送到和。接收程序可以選擇要接收日志的嚴(yán)重性級別。 路由模式 在之前的文章中我們建立了一個(gè)簡單的日志系統(tǒng)。我們可以通過這個(gè)系統(tǒng)將日志message廣播給很多接收者。 在本篇文章中,我們在這之上,添加一個(gè)新的功...
閱讀 1276·2021-10-18 13:32
閱讀 2355·2021-09-24 09:47
閱讀 1336·2021-09-23 11:22
閱讀 2473·2019-08-30 14:06
閱讀 579·2019-08-30 12:48
閱讀 2010·2019-08-30 11:03
閱讀 546·2019-08-29 17:09
閱讀 2473·2019-08-29 14:10