成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

【譯】RabbitMQ系列(四) - 路由模式

liuchengxu / 427人閱讀

摘要:路由模式在之前的文章中我們建立了一個(gè)簡單的日志系統(tǒng)。更形象的表示,如對(duì)中的感興趣。為了進(jìn)行說明,像下圖這么來設(shè)置如圖,可以看到有兩個(gè)綁到了類型為的上。如圖的設(shè)置中,一個(gè)為的就會(huì)同時(shí)發(fā)送到和。接收程序可以選擇要接收日志的嚴(yán)重性級(jí)別。

路由模式

在之前的文章中我們建立了一個(gè)簡單的日志系統(tǒng)。我們可以通過這個(gè)系統(tǒng)將日志message廣播給很多接收者。

在本篇文章中,我們?cè)谶@之上,添加一個(gè)新的功能,即允許接收者訂閱message的一個(gè)子集。舉個(gè)例子,我們將日志分成多個(gè)級(jí)別,一個(gè)接收者接收錯(cuò)誤日志將之保存到磁盤,另一個(gè)接收者接收所有日志將之打印到控制臺(tái)。

Bindings

在前面的章節(jié)中,我們已經(jīng)接觸過binding了,像下面的代碼這樣:

channel.queueBind(queueName,EXCHANGE_NAME,"");

binding將exchange和queue關(guān)聯(lián)在了一起。更形象的表示,如:queue對(duì)exchange中的message感興趣。

bindings可以攜帶一個(gè)routingKey參數(shù)。為了避免和basic_publish的參數(shù)弄混,我們稱之它為binding_key.我們像下面這樣創(chuàng)建一個(gè)binding

channel.queueBind(queueName,EXCHANGE_NAME,"black");

binding key的作用要看exchange的類型,對(duì)于fanout類型的exchange,binding key是直接忽略的。

Direct Exchange

在之前的日志系統(tǒng)中,message會(huì)推送到所有的消費(fèi)者去。我們想讓系統(tǒng)依據(jù)message的日志級(jí)別進(jìn)行過濾。比如一個(gè)消費(fèi)者只接收嚴(yán)重級(jí)別的日志。

fanout無法幫我們實(shí)現(xiàn)這樣的功能,它只是無腦的進(jìn)行廣播。

我們使用direct類型的exchange,它的路由算法是非常簡單的 - 只要message的routing_key和bind的binding_key相同即進(jìn)行轉(zhuǎn)發(fā)。

為了進(jìn)行說明,像下圖這么來設(shè)置

如圖,可以看到有兩個(gè)queue綁到了類型為direct的exchange上。第一個(gè)queue綁定用了orange這個(gè)binding key,第二個(gè)則用了black和green兩個(gè)binding key。

那么結(jié)果就是有routing key為orange的message路由到了Q1.而routing key為black和green的message則路由到了Q2,其他的消息則被丟棄了。

Multiple Bindings


若使用相同的binding key將多個(gè)queue綁定到exchange上,就和fanout的行為一樣了,message會(huì)廣播到binding key相同的queue去。如圖的設(shè)置中,一個(gè)routing key為black的message就會(huì)同時(shí)發(fā)送到Q1和Q2。

Emitting logs

我們將在我們的日志系統(tǒng)上應(yīng)用這個(gè)模型,使用direct類型的exchange去替代fanout類型的exchange。提供日志的嚴(yán)重性作為routing key。接收程序可以選擇要接收日志的嚴(yán)重性級(jí)別。
首先我們創(chuàng)建exchange

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

然后就是發(fā)送message

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

我們先假設(shè)severity取值 info | warning | error

Subscribing

接收message和上一章沒什么區(qū)別,只是需要給各個(gè)severity創(chuàng)建新的binding。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
開始執(zhí)行

EmitLogDirect.java代碼如下

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent "" + severity + "":"" + message + """);
    }
  }
  //..
}

ReceiveLogsDirect.java代碼如下:

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
        System.exit(1);
    }

    for (String severity : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received "" +
            delivery.getEnvelope().getRoutingKey() + "":"" + message + """);
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

編譯代碼

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

如果想把warning和error的日志保存到文件去,那么

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果想把所有的日志打印到控制臺(tái),那么

java -cp $CP ReceiveLogsDirect info warning error

發(fā)送error日志

java -cp $CP EmitLogDirect error "Run.Run. Or it will explode"

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/73892.html

相關(guān)文章

  • RabbitMQ系列(五) - 主題模式

    摘要:主題模式在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用我們只能簡單進(jìn)行廣播,而使用則允許消費(fèi)者可以進(jìn)行一定程度的選擇。為的會(huì)同時(shí)發(fā)布到這兩個(gè)。當(dāng)為時(shí),會(huì)接收所有的。當(dāng)中沒有使用通配符和時(shí),的行為和一致。 主題模式 在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用fanout我們只能簡單進(jìn)行廣播,而使用direct則允許消費(fèi)者可以進(jìn)行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個(gè)...

    pingan8787 評(píng)論0 收藏0
  • RabbitMQ系列(三) - 發(fā)布/訂閱模式

    摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱之為發(fā)布訂閱模式。其實(shí)我們是用到了默認(rèn)的,用空字符串來標(biāo)識(shí)??兆址砹藳]有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱之為從現(xiàn)在開始這個(gè)就會(huì)將推向我們的隊(duì)列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個(gè)task都會(huì)派發(fā)給一個(gè)worker。在本章中,我們會(huì)完成完全不一樣的事情 - 我們會(huì)...

    WrBug 評(píng)論0 收藏0
  • RabbitMQ系列(二)-Work模式

    摘要:每個(gè)消費(fèi)者會(huì)得到平均數(shù)量的。為了確保不會(huì)丟失,采用確認(rèn)機(jī)制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒有發(fā)送,會(huì)認(rèn)為該消息沒有完整的執(zhí)行,會(huì)將該消息重新入隊(duì)。該消息會(huì)被發(fā)送給其他的。當(dāng)消費(fèi)者中斷退出,會(huì)重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個(gè)...

    lcodecorex 評(píng)論0 收藏0
  • RabbitMQ系列(六)-RPC模式

    摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請(qǐng)求,并阻塞知道結(jié)果返回。當(dāng)有消息時(shí),進(jìn)行計(jì)算并通過指定的發(fā)送給客戶端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進(jìn)行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個(gè)worker之間派發(fā)時(shí)間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...

    894974231 評(píng)論0 收藏0
  • [] RabbitMQ tutorials (3) ---- 'Pub/Sub'

    摘要:生產(chǎn)者只能把消息發(fā)到交換器。是否要追加到一個(gè)特殊的隊(duì)列是否要追加到許多的隊(duì)列或者丟掉這條消息這些規(guī)則被定義為交換類型。有一點(diǎn)很關(guān)鍵,向不存在的交換器發(fā)布消息是被禁止的。如果仍然沒有隊(duì)列綁定交換器,消息會(huì)丟失。 發(fā)布與訂閱 (Publish/Subscribe) 在之前的章節(jié)中,我們創(chuàng)建了工作隊(duì)列,之前的工作隊(duì)列的假設(shè)是每個(gè)任務(wù)只被分發(fā)到一個(gè)worker。在這一節(jié)中,我們會(huì)做一些完全不一...

    zzir 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<