摘要:路由模式在之前的文章中我們建立了一個(gè)簡單的日志系統(tǒng)。更形象的表示,如對(duì)中的感興趣。為了進(jìn)行說明,像下圖這么來設(shè)置如圖,可以看到有兩個(gè)綁到了類型為的上。如圖的設(shè)置中,一個(gè)為的就會(huì)同時(shí)發(fā)送到和。接收程序可以選擇要接收日志的嚴(yán)重性級(jí)別。
路由模式
在之前的文章中我們建立了一個(gè)簡單的日志系統(tǒng)。我們可以通過這個(gè)系統(tǒng)將日志message廣播給很多接收者。
在本篇文章中,我們?cè)谶@之上,添加一個(gè)新的功能,即允許接收者訂閱message的一個(gè)子集。舉個(gè)例子,我們將日志分成多個(gè)級(jí)別,一個(gè)接收者接收錯(cuò)誤日志將之保存到磁盤,另一個(gè)接收者接收所有日志將之打印到控制臺(tái)。
Bindings在前面的章節(jié)中,我們已經(jīng)接觸過binding了,像下面的代碼這樣:
channel.queueBind(queueName,EXCHANGE_NAME,"");
binding將exchange和queue關(guān)聯(lián)在了一起。更形象的表示,如:queue對(duì)exchange中的message感興趣。
bindings可以攜帶一個(gè)routingKey參數(shù)。為了避免和basic_publish的參數(shù)弄混,我們稱之它為binding_key.我們像下面這樣創(chuàng)建一個(gè)binding
channel.queueBind(queueName,EXCHANGE_NAME,"black");
binding key的作用要看exchange的類型,對(duì)于fanout類型的exchange,binding key是直接忽略的。
Direct Exchange在之前的日志系統(tǒng)中,message會(huì)推送到所有的消費(fèi)者去。我們想讓系統(tǒng)依據(jù)message的日志級(jí)別進(jìn)行過濾。比如一個(gè)消費(fèi)者只接收嚴(yán)重級(jí)別的日志。
fanout無法幫我們實(shí)現(xiàn)這樣的功能,它只是無腦的進(jìn)行廣播。
我們使用direct類型的exchange,它的路由算法是非常簡單的 - 只要message的routing_key和bind的binding_key相同即進(jìn)行轉(zhuǎn)發(fā)。
為了進(jìn)行說明,像下圖這么來設(shè)置
如圖,可以看到有兩個(gè)queue綁到了類型為direct的exchange上。第一個(gè)queue綁定用了orange這個(gè)binding key,第二個(gè)則用了black和green兩個(gè)binding key。
那么結(jié)果就是有routing key為orange的message路由到了Q1.而routing key為black和green的message則路由到了Q2,其他的消息則被丟棄了。
Multiple Bindings
若使用相同的binding key將多個(gè)queue綁定到exchange上,就和fanout的行為一樣了,message會(huì)廣播到binding key相同的queue去。如圖的設(shè)置中,一個(gè)routing key為black的message就會(huì)同時(shí)發(fā)送到Q1和Q2。
我們將在我們的日志系統(tǒng)上應(yīng)用這個(gè)模型,使用direct類型的exchange去替代fanout類型的exchange。提供日志的嚴(yán)重性作為routing key。接收程序可以選擇要接收日志的嚴(yán)重性級(jí)別。
首先我們創(chuàng)建exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
然后就是發(fā)送message
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
我們先假設(shè)severity取值 info | warning | error
Subscribing接收message和上一章沒什么區(qū)別,只是需要給各個(gè)severity創(chuàng)建新的binding。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }開始執(zhí)行
EmitLogDirect.java代碼如下
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + severity + "":"" + message + """); } } //.. }
ReceiveLogsDirect.java代碼如下:
import com.rabbitmq.client.*; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for (String severity : argv) { channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + delivery.getEnvelope().getRoutingKey() + "":"" + message + """); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
編譯代碼
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
如果想把warning和error的日志保存到文件去,那么
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
如果想把所有的日志打印到控制臺(tái),那么
java -cp $CP ReceiveLogsDirect info warning error
發(fā)送error日志
java -cp $CP EmitLogDirect error "Run.Run. Or it will explode"
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/73892.html
摘要:主題模式在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用我們只能簡單進(jìn)行廣播,而使用則允許消費(fèi)者可以進(jìn)行一定程度的選擇。為的會(huì)同時(shí)發(fā)布到這兩個(gè)。當(dāng)為時(shí),會(huì)接收所有的。當(dāng)中沒有使用通配符和時(shí),的行為和一致。 主題模式 在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用fanout我們只能簡單進(jìn)行廣播,而使用direct則允許消費(fèi)者可以進(jìn)行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個(gè)...
摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱之為發(fā)布訂閱模式。其實(shí)我們是用到了默認(rèn)的,用空字符串來標(biāo)識(shí)??兆址砹藳]有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱之為從現(xiàn)在開始這個(gè)就會(huì)將推向我們的隊(duì)列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個(gè)task都會(huì)派發(fā)給一個(gè)worker。在本章中,我們會(huì)完成完全不一樣的事情 - 我們會(huì)...
摘要:每個(gè)消費(fèi)者會(huì)得到平均數(shù)量的。為了確保不會(huì)丟失,采用確認(rèn)機(jī)制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒有發(fā)送,會(huì)認(rèn)為該消息沒有完整的執(zhí)行,會(huì)將該消息重新入隊(duì)。該消息會(huì)被發(fā)送給其他的。當(dāng)消費(fèi)者中斷退出,會(huì)重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個(gè)...
摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請(qǐng)求,并阻塞知道結(jié)果返回。當(dāng)有消息時(shí),進(jìn)行計(jì)算并通過指定的發(fā)送給客戶端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進(jìn)行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個(gè)worker之間派發(fā)時(shí)間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:生產(chǎn)者只能把消息發(fā)到交換器。是否要追加到一個(gè)特殊的隊(duì)列是否要追加到許多的隊(duì)列或者丟掉這條消息這些規(guī)則被定義為交換類型。有一點(diǎn)很關(guān)鍵,向不存在的交換器發(fā)布消息是被禁止的。如果仍然沒有隊(duì)列綁定交換器,消息會(huì)丟失。 發(fā)布與訂閱 (Publish/Subscribe) 在之前的章節(jié)中,我們創(chuàng)建了工作隊(duì)列,之前的工作隊(duì)列的假設(shè)是每個(gè)任務(wù)只被分發(fā)到一個(gè)worker。在這一節(jié)中,我們會(huì)做一些完全不一...
閱讀 1630·2021-11-11 10:59
閱讀 2640·2021-09-04 16:40
閱讀 3675·2021-09-04 16:40
閱讀 2996·2021-07-30 15:30
閱讀 1671·2021-07-26 22:03
閱讀 3174·2019-08-30 13:20
閱讀 2238·2019-08-29 18:31
閱讀 450·2019-08-29 12:21