摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。主題交換機也可以當(dāng)成其它交換機來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達路由器一樣了。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會更新在上面
前言在之前的建立路由中我們改進了日志系統(tǒng)。我們摒棄無腦發(fā)送消息的廣播路由器,而使用能夠根據(jù)綁定鍵(binding key)來發(fā)送消息的,從而能有有選擇的后去logs.
盡管使用直達路由器大大的改進了我們系統(tǒng),但也存在局限性 - 無法加入更多條件。比如我們希望能夠加入更多的維度,我們希望不僅是基于嚴(yán)重程度,而且是基于來源,如果你對linux tool工具有了解的話,它不僅僅是基于嚴(yán)重程度(info/warn/crit...) 而且有來源(auth/cron/kern...),這個給到我們更大的靈活性-我們需要監(jiān)聽所有來自"cron"的errors消息,以及來自"kern"的所有l(wèi)og。所以我們需要的是一個更復(fù)雜的主題交換機
主題交換機發(fā)送到主題交換機的消息并不會有一個確定的路由鍵-而是一長串字符列表,以"."來分割,而這個字符串列表表明了路由信息,比如"stock.usd.nyse","nyse.vmw","quick.orange.rabbit",字符串的最大長度限制在255bytes。
同時,在隊列綁定交換機時也需要指定模式,而符合模式的消息將會被發(fā)送至該隊列,模式可以由通配符組成:
"*" 可以表示一個詞
"#" 表示0個或多個詞
可以通過如下的例子來說明
請看例子,以發(fā)送動物的消息為例,我們會發(fā)送包含三個詞的路由鍵(兩個".")。第一個是速度,第二個是顏色,而第三個是種族
同時,我們建立了三個綁定,Q1綁定了鍵".orange.",Q2綁定了鍵"..rabbit"以及"lazy.#"??梢宰鋈缦碌慕忉專琎1用來接受所有orange的動物,Q2用來接受所有rabbits,以及l(fā)azy的動物
一個路由為"quick.orange.rabbit"的消息將會被同時發(fā)送給這兩個隊列,消息"lazy.orange.elephant"也會被同時發(fā)給它們;"quick.orange.fox"只會發(fā)給第一個隊列;"lazy.brown.fox"會發(fā)到第二個;"lazy.pink.rabbit"將只會發(fā)送給第二個;"quick.brown.fox"會被丟棄因為匹配不上任何一個。
如果我們發(fā)送四個詞的呢?比如"oragne"或者"quick.orange.male.rabbit"?這些沒有任何匹配的隊列將會丟失。但比如"quick.orange.male.rabbit"會匹配到第二個隊列。
主題交換機也可以當(dāng)成其它交換機來使用,假如隊列綁定到了 "#",那么它會接收所有的消息,就像廣播路由器一樣;而如果未使用"*","#",那么就跟直達路由器一樣了。
整合所有的代碼我們用主題交換機替換掉之前的直達交換機,用如同"
import com.rabbitmq.client.*; import java.io.IOException; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent "" + routingKey + "":"" + message + """); connection.close(); } //... }
ReceiveLogsTopic.java的代碼片段
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + envelope.getRoutingKey() + "":"" + message + """); } }; channel.basicConsume(queueName, true, consumer); } }
編譯這段代碼
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接受所有的logs
java -cp $CP ReceiveLogsTopic "#"
接受來自"kern"的消息
java -cp $CP ReceiveLogsTopic "kern.*"
接受來自"critical"的消息
java -cp $CP ReceiveLogsTopic "*.critical"
創(chuàng)建多個綁定
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
發(fā)送消息
java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
你可以嘗試更多的參數(shù),以此來熟悉這個知識
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68117.html
摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...
摘要:前提必讀本教程假設(shè)是安裝在標(biāo)準(zhǔn)端口上運行。這些詞可以是任何東西,但通常它們指定連接到消息的某些特性。如果我們違背合同,用一個或四個詞,如或那么,這些消息將不匹配任何綁定并將丟失。代碼與前面的教程幾乎相同。 (using php-amqplib) 前提必讀 本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運行(5672)。如果您使用不同的主機、端口或憑據(jù),則連接設(shè)置需要調(diào)整。 在哪里得到幫助...
摘要:主題模式在上一章我們改進了我們的日志系統(tǒng),如果使用我們只能簡單進行廣播,而使用則允許消費者可以進行一定程度的選擇。為的會同時發(fā)布到這兩個。當(dāng)為時,會接收所有的。當(dāng)中沒有使用通配符和時,的行為和一致。 主題模式 在上一章我們改進了我們的日志系統(tǒng),如果使用fanout我們只能簡單進行廣播,而使用direct則允許消費者可以進行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個...
摘要:概述概述消息隊列,是分布式系統(tǒng)中重要的組件,是一種進程間通信或者是同一進程的不同線程的通信方式。消息隊列的使用場景消息隊列的使用場景異步處理流量控制應(yīng)用解耦應(yīng)用解耦應(yīng)用解耦消息隊列的一個作用就是實現(xiàn)系統(tǒng)應(yīng)用之間的解耦。概述消息隊列(Message Queue),是分布式系統(tǒng)中重要的組件,是一種進程間通信或者是同一進程的不同線程的通信方式。和 http 同步協(xié)議不同的是,消息隊列是一種異步的通...
摘要:消息隊列,用于存儲還未被消費者消費的消息。由在與時指定,而由發(fā)送時指定,兩者的匹配方式由決定。需要為每一個創(chuàng)建,協(xié)議規(guī)定只有通過才能執(zhí)行的命令。建議客戶端線程之間不要共用,至少要保證共用的線程發(fā)送消息必須是串行的,但是建議盡量共用。 安裝 rabbitmq 在 mac 下可以直接用 brew 安裝默認安裝在 /usr/local/Cellar/下命令被軟連接加入到了/usr/local...
閱讀 3160·2021-11-22 12:01
閱讀 3779·2021-08-30 09:46
閱讀 791·2019-08-30 13:48
閱讀 3223·2019-08-29 16:43
閱讀 1671·2019-08-29 16:33
閱讀 1859·2019-08-29 13:44
閱讀 1422·2019-08-26 13:45
閱讀 2239·2019-08-26 11:44