摘要:任務(wù)隊(duì)列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費(fèi)大量資源。我們將任務(wù)封裝成一個消息發(fā)送給隊(duì)列,后臺的任務(wù)進(jìn)程會得到這個任務(wù)并執(zhí)行它,而且可以配置多個任務(wù)進(jìn)程,進(jìn)一步加大吞吐率。為了確保消息不丟失,支持消息確認(rèn)。
推廣
https://segmentfault.com/l/15...
我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項(xiàng)目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會更新在上面
前言在第一篇中我們描述了如何最簡單的RabbitMQ操作,如何發(fā)送、接受消息。在今天這篇文章中我們將描述如何創(chuàng)建一個任務(wù)隊(duì)列,來將高耗時的任務(wù)分發(fā)到多個消費(fèi)者,從而提高處理效率。
任務(wù)隊(duì)列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費(fèi)大量資源。反之我們會把這個操作交給隊(duì)列,讓它延后再做。我們將任務(wù)封裝成一個消息發(fā)送給隊(duì)列,后臺的任務(wù)進(jìn)程會得到這個任務(wù)并執(zhí)行它,而且可以配置多個任務(wù)進(jìn)程,進(jìn)一步加大吞吐率。
特別是對于網(wǎng)絡(luò)請求,一次短短的HTTP請求是要求迅速響應(yīng)的,不可能讓它一直停頓在高耗時操作上。
準(zhǔn)備工作在第一章中我們發(fā)送了“Hello World!”?,F(xiàn)在來完成更復(fù)雜一點(diǎn)的,因?yàn)檫@里并沒有真正的高耗時操作,比如縮放圖像或輸出一個pdf。因此我們只是用Thread.sleep()來假裝我們很繁忙,而且會用"."來表示需要停頓的秒數(shù),比如一個叫Hello...的任務(wù)將停頓3秒鐘。
我們簡單的更改下Send.java,稱之為 NewTask.java.
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent "" + message + """);
然后是工具類
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
當(dāng)然,我們的Recv.java也需要進(jìn)行一些改造,它需要對每一個"."停頓1秒,Work.java如下
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); } } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == ".") Thread.sleep(1000); } }
編譯上面這些代碼
javac -cp $CP NewTask.java Worker.java輪詢調(diào)度
任務(wù)隊(duì)列的一個最大優(yōu)點(diǎn)是可以并行工作,能夠非常容易的水平擴(kuò)張。
首先,讓我們同時運(yùn)行兩個工作線程,他們能夠同時從隊(duì)列獲取消息。我們也需要同時開啟3個console:1個生產(chǎn)者,2個消費(fèi)者
消費(fèi)者C1
# shell 1 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
消費(fèi)者C2
# shell 2 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
讓我們運(yùn)行生產(chǎn)者
# shell 3 java -cp $CP NewTask # => First message. java -cp $CP NewTask # => Second message.. java -cp $CP NewTask # => Third message... java -cp $CP NewTask # => Fourth message.... java -cp $CP NewTask # => Fifth message.....
讓我們看看消費(fèi)者們
消費(fèi)者C1
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received "First message." # => [x] Received "Third message..." # => [x] Received "Fifth message....."
消費(fèi)者C2
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received "Second message.." # => [x] Received "Fourth message..
RabbitMQ默認(rèn)有序的將會發(fā)送消息給下一個消費(fèi)者,所以每一個消費(fèi)者都會得到相同數(shù)量的消息,這種方式就叫做輪詢調(diào)度(round-robin),你可以嘗試下更多的消費(fèi)者
消息確認(rèn)一個任務(wù)可能非常耗時,如果消費(fèi)者在做一個高耗時任務(wù)時掛掉了,我們將會丟失所有發(fā)送到這個消費(fèi)者上的消息。這是非常不可取的,所以我們希望能夠明確的知道消息是否消費(fèi)成功,如果一個消費(fèi)掛了,我們能夠知道,并且將消息發(fā)送給下一個消費(fèi)者。
為了確保消息不丟失,RabbitMQ支持消息確認(rèn)。收到消息后消費(fèi)者會給RabbitMQ服務(wù)器發(fā)送一個ack(我已經(jīng)收到消息了),RabbitMQ就會在服務(wù)上刪除這個消息了。
如果一個消費(fèi)者掛了(連接關(guān)閉,channel關(guān)閉,或者是TCP連接丟失)而沒有發(fā)送ack,RabbitMQ就會知道消息并沒有消費(fèi)成功,于是乎消息會被放到消息隊(duì)列重新消費(fèi)。如果此時還有其它消費(fèi)者的話,消息會發(fā)送給其它消費(fèi)者來消費(fèi),確保消息不會丟失
消息并沒有超時時間這個概念,消息只會在消費(fèi)者掛掉了時候重發(fā),即使是一個非常非常耗時的的消費(fèi)者也不會發(fā)生重發(fā)
手動消息確認(rèn)(Manual message acknowledgments)默認(rèn)是打開的,雖然我們之前關(guān)閉了它:autoAck=true。讓我們先將它設(shè)置為false
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
這樣一來,即使你使用CTRL+C強(qiáng)制殺死了一個消費(fèi)者,消費(fèi)者所丟失的消息也將會被重發(fā),會被另一個消費(fèi)者所接受并消費(fèi)。
忘記應(yīng)答很容易犯忘記應(yīng)答的錯誤,但會導(dǎo)致非常嚴(yán)重的后果。Messages會被重發(fā),RabbitMQ會消耗越來越多的內(nèi)存因?yàn)閡nacked的消息無法釋放(甚至更嚴(yán)重,RabbitMQ內(nèi)部維護(hù)了一個最大打開線程數(shù),如果太多的消息沒有應(yīng)答,RabbitMQ甚至?xí)麄€崩潰掉)
你可以用Rabbitmqctl查看未被應(yīng)答的消息數(shù)
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
windows下:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged消息持久化
我們現(xiàn)在知道了可以通過應(yīng)答來保證消息不丟失,但萬一RabbitMQ掛了呢?還是可能會導(dǎo)致消息丟失。因此我們可以通過持久化的機(jī)制,包括將隊(duì)列以及隊(duì)列中的消息持久化的方式,來保證即便RabbitMQ掛了,當(dāng)它重啟的時候,隊(duì)列以及消息也能夠恢復(fù)
首先做隊(duì)列的持久化,聲明隊(duì)列為durable
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
但很可惜的是,這種聲明方式并不適用與上面的方法,因?yàn)槲覀円呀?jīng)將“Hello”定義為一個非持久化的隊(duì)列了,是不能再將他改為持久化的,如果這樣做,將會直接返回一個error信息。所以,我們需要重新再定義一個隊(duì)列
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
在保證隊(duì)列的持久化后需要保證消息的持久化-將消息設(shè)置為PERSISTENT_TEXT_PLAIN
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());公平分發(fā)
但這樣還是存在問題:假設(shè)有如下的情形,一個消費(fèi)者非常耗時,而一個消費(fèi)者非???,由于消息都是公平的發(fā)送,所以它們都是接收到相同數(shù)量的消息,會導(dǎo)致一個消費(fèi)者非常忙碌,而另外一個消費(fèi)者非??臻e,而RabbitMQ無法得知這一點(diǎn)。
為了解決這個缺陷我們引入了basicQos方法以及prefetchCount =1的設(shè)置。這會告訴RabbitMQ一次只給消費(fèi)者一個消息:如果這個消息未確認(rèn),將不會發(fā)送新的消息,從而它會將消息發(fā)送給其它并不那么忙的消費(fèi)者
int prefetchCount = 1; channel.basicQos(prefetchCount);留意queue size
如果所有的消費(fèi)者都非常忙,隊(duì)列可能會很快被填滿,所以你需要留意這一點(diǎn),要么增加更多的消費(fèi)者,或者采取其它的策略。
整合NewTask.java
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent "" + message + """); channel.close(); connection.close(); } //... }
Worker.java
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == ".") { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用消息確認(rèn)和prefetchCount你就能設(shè)置一個持久化隊(duì)列了,同時,使用durable和persist,,即使RabbitMQ掛掉了,重啟后也能夠重發(fā)消息
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68099.html
摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費(fèi)者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊(duì)列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...
摘要:作為消息隊(duì)列的一個典型實(shí)踐,完全實(shí)現(xiàn)了標(biāo)準(zhǔn),與的快快快不同,它追求的穩(wěn)定可靠。同一個隊(duì)列不僅可以綁定多個生產(chǎn)者,而且能夠發(fā)送消息到多個消費(fèi)者。消費(fèi)者接受并消費(fèi)消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊(duì)列來的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的...
摘要:可以參考源碼,項(xiàng)目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會更新在上面前言在訂閱發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將消息廣播給一些消費(fèi)者。因此,發(fā)送到路由鍵的消息會發(fā)送給隊(duì)列,發(fā)送到路由鍵或者的消息會發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決...
摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。主題交換機(jī)也可以當(dāng)成其它交換機(jī)來使用,假如隊(duì)列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達(dá)路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性...
摘要:因?yàn)橄M(fèi)消息是在另外一個進(jìn)程中,我們需要阻塞我們的進(jìn)程直到結(jié)果返回,使用阻塞隊(duì)列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。可以參考...
閱讀 3122·2021-11-23 09:51
閱讀 1989·2021-09-09 09:32
閱讀 1096·2019-08-30 15:53
閱讀 2966·2019-08-30 11:19
閱讀 2477·2019-08-29 14:15
閱讀 1444·2019-08-29 13:52
閱讀 563·2019-08-29 12:46
閱讀 2831·2019-08-26 12:18