摘要:每個消費者會得到平均數(shù)量的。為了確保不會丟失,采用確認機制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒有發(fā)送,會認為該消息沒有完整的執(zhí)行,會將該消息重新入隊。該消息會被發(fā)送給其他的。當(dāng)消費者中斷退出,會重新分派。
Work模式
原文地址
在第一章中,我們寫了通過一個queue來發(fā)送和接收message的簡單程序。在這一章中,我們會創(chuàng)建一個workqueue,來將執(zhí)行時間敏感的任務(wù)分發(fā)到多個worker中。
work模式主要的意圖是要避免等待完成一個耗時的任務(wù)。取而代之地,我們延遲任務(wù)的執(zhí)行,將任務(wù)封裝成消息,將之發(fā)送到queue。一個運行著的worker進程會彈出這個任務(wù)并執(zhí)行它。當(dāng)運行多個worker進程時,任務(wù)會在它們之間分派。
這種模式在web應(yīng)用中特別有用,因為在一個較短的HTTP請求窗口中不會去執(zhí)行一個復(fù)雜的任務(wù)。
準(zhǔn)備工作在上一章中,我們發(fā)送了一個”Hello World!"的message。現(xiàn)在我們將發(fā)送一個代表了復(fù)雜任務(wù)的字符串。這不是一個實際的任務(wù),比如像調(diào)整圖片大小或是重新渲染pdf文檔,我們通Thead.sleep() 來模擬一個耗時的任務(wù)。message中的小圓點表示其復(fù)雜度,圓點越多則任務(wù)的執(zhí)行越耗時。比如“Hello..."的message將耗時3秒。
我們簡單的修改上一章的Send.java代碼,允許在命令行發(fā)送任意message。新的類叫做NewTask.java
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent "" + message + """);
同樣的,我們修改上一章中的Recv.java,讓它在處理message的時候根據(jù)小圓點進行睡眠。新的類叫Worker.java
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == ".") Thread.sleep(1000); } }
像在第一章一樣編譯這兩個類
javac -cp $CP NewTask.java Worker.javaRound-robin分派
使用Task模式的一個明顯的優(yōu)勢是讓并行執(zhí)行任務(wù)變得簡單。我們只需要啟動更多的worker就可以消減堆積的message,系統(tǒng)水平擴展簡單。
首先,我們在同一時間啟動兩個worker。他們都會從queue獲得message,來看一下具體細節(jié)。
打開了三個終端,兩個是跑worker的。
java -cp $CP Worker
java -cp $CP Worker
第三個終端里來發(fā)布新的任務(wù)message。
java -cp $CP NewTask First message. java -cp $CP NewTask Second message.. java -cp $CP NewTask Third message... java -cp $CP NewTask Fourth message.... java -cp $CP NewTask Fifth message.....
讓我們看看worker的處理message的情況.第一個worker收到了第1,3,5message,第二個worker收到了第2,4個message。
默認情況下,RabbitMQ會順序的將message發(fā)給下一個消費者。每個消費者會得到平均數(shù)量的message。這種方式稱之為round-robin(輪詢).
Message 確認執(zhí)行任務(wù)需要一定的時間。你可能會好奇如果一個worker開始執(zhí)行任務(wù),但是中途異常退出,會是什么結(jié)果。在我們現(xiàn)在的代碼中,一旦RabbitMQ將消息發(fā)送出去了,它會立即將該message刪除。這樣的話,就可能丟失message。
在實際場景中,我們不想丟失任何一個task。如果一個worker異常中斷了,我們希望這個task能分派給另一個worker。
為了確保不會丟失message,RabbitMQ采用message確認機制。RabbitMQ只有收到該message的Ack之后,才會刪除該消息。
如果worker中斷退出了( channel關(guān)閉了,connection關(guān)閉了,或是TCP連接丟失了)而沒有發(fā)送Ack,RabbitMQ會認為該消息沒有完整的執(zhí)行,會將該消息重新入隊。該消息會被發(fā)送給其他的worker。這樣就不用message丟失,即使是在worker經(jīng)常異常中斷退出的場景下。
不會有任何message會timeout。當(dāng)消費者中斷退出,RabbitMQ會重新分派message。即使消息的執(zhí)行會花費很長的時間。
默認情況下,message是需要人工確認的。在上面的例子中,我們通過autoAck=true來關(guān)閉了人工確認。像下面這樣,我們將該標(biāo)志設(shè)置為false,worker就需要在完成了任務(wù)之后,發(fā)送確認。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
上面的代碼保證即使當(dāng)worker還在處理一條消息,而強制它退出,也不會丟失message。然后不久,所有未被確認的消息都會被重新分派。
發(fā)送確認必須和接收相同的channel。使用不同的channel進行確認會導(dǎo)致channel-level protocol 異常。
忘記確認消息是一個比較常見的錯誤,但是其后果是很嚴重的。當(dāng)client退出時,message會被重新分派,但是RabbitMQ會占用越來越多的內(nèi)存,因它無法釋放那些未被確認的message。
可以通過rabbitmqctl來打印messages_unacknowledged:
##linux sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged ##windows rabbitmqctl.bat list_queues name messages_ready messages_unacknowledgedMessage 持久化
我們學(xué)習(xí)了在消費者出現(xiàn)問題的時候不丟失message。但是如果RabbitMQ服務(wù)器宕機了,我們還是會丟失message。
當(dāng)RabbitMQ宕機時,默認情況下,它會”忘記“所有的queue和message。為了確保message不丟失,我們需要確認兩件事情:我們要使得queue和message都是持久的。
首先,我們要確保RabbitMQ不會丟失我們設(shè)置好的queue。所以,我們要把它聲明成持久的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
雖然代碼沒有任何問題,但是光這樣是無效的。因為我們之前已經(jīng)定義過名字為hello的queue。RabbitMQ不允許你使用不同的參數(shù)去重新定義一個已經(jīng)存在的queue,而且這還不會反悔任何錯誤信息。但是我們還是有別的方法,讓我們使用一個別的名字,比如task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
聲明queue的改變要在生產(chǎn)者和消費者的代碼里都進行修改。
接著我們要設(shè)置message的持久性,我們通過設(shè)置MessageProperties為PERSISTENT_TEXT_PLAIN:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
將message標(biāo)記成持久的不能100%保證message不會丟失,雖然這告訴RabbitMQ將message保存到磁盤,然而在RabbitMQ從接到message到保存之間,仍然有一小段時間。同時RabbitMQ不會給每一條message執(zhí)行fsync(2) -- 可能只是保存到了cache而沒有寫到磁盤上去。所以持久的保證也不是非常強,然后對我們簡單的task queue來說則足夠了。如果需要一個非常強的保證,則可以使用發(fā)布確認的方式。Fair 分派
你可能已經(jīng)注意到分派的工作沒有如我們所期望的來執(zhí)行。比如在有2個worker的情況系,所有偶數(shù)的message耗時很長,而所有奇數(shù)的message則耗時很短,這樣其中一個worker則一直被分派到偶數(shù)的message,而另一個則一直是奇數(shù)的message。RabbitMQ對此并不知曉,進而繼續(xù)這樣分派著message。
這樣的原因是RabbitMQ是在message入queue的時候確定分派的。它不關(guān)心消費者ack的情況。
我們可以通過basicQos方法和prefetchCount(1)來解決這個問題。這個設(shè)置是讓RabbitMQ給worker一次一個message。或者這么說,直到worker處理完之前的message并發(fā)送ack,才給worker下一個message。否則,Rabbitmq會將message發(fā)送給其它不忙的worker。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意queue的大小。如果所有的worker都處于忙碌狀態(tài),queue可能會被裝滿。必須監(jiān)控queue深度,可能要開啟更多的worker,或者采取其他的措施。開始執(zhí)行
NewTask.java的最終版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + message + """); } } }
Worker.java的最終版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == ".") { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用message ack和prefetchCount,來設(shè)定work queue。持久化選項則在RabbitMQ重啟后能讓任務(wù)得以恢復(fù)。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/73901.html
摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱之為發(fā)布訂閱模式。其實我們是用到了默認的,用空字符串來標(biāo)識??兆址砹藳]有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱之為從現(xiàn)在開始這個就會將推向我們的隊列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個task都會派發(fā)給一個worker。在本章中,我們會完成完全不一樣的事情 - 我們會...
摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請求,并阻塞知道結(jié)果返回。當(dāng)有消息時,進行計算并通過指定的發(fā)送給客戶端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個worker之間派發(fā)時間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:這樣的消息分發(fā)機制稱作輪詢。在進程掛了之后,所有的未被確認的消息會被重新分發(fā)。忘記確認這是一個普遍的錯誤,丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。 工作隊列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個程序從已經(jīng)聲明的隊列中收發(fā)...
摘要:主題模式在上一章我們改進了我們的日志系統(tǒng),如果使用我們只能簡單進行廣播,而使用則允許消費者可以進行一定程度的選擇。為的會同時發(fā)布到這兩個。當(dāng)為時,會接收所有的。當(dāng)中沒有使用通配符和時,的行為和一致。 主題模式 在上一章我們改進了我們的日志系統(tǒng),如果使用fanout我們只能簡單進行廣播,而使用direct則允許消費者可以進行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個...
摘要:路由模式在之前的文章中我們建立了一個簡單的日志系統(tǒng)。更形象的表示,如對中的感興趣。為了進行說明,像下圖這么來設(shè)置如圖,可以看到有兩個綁到了類型為的上。如圖的設(shè)置中,一個為的就會同時發(fā)送到和。接收程序可以選擇要接收日志的嚴重性級別。 路由模式 在之前的文章中我們建立了一個簡單的日志系統(tǒng)。我們可以通過這個系統(tǒng)將日志message廣播給很多接收者。 在本篇文章中,我們在這之上,添加一個新的功...
閱讀 489·2019-08-30 15:44
閱讀 903·2019-08-30 10:55
閱讀 2737·2019-08-29 15:16
閱讀 943·2019-08-29 13:17
閱讀 2811·2019-08-26 13:27
閱讀 578·2019-08-26 11:53
閱讀 2126·2019-08-23 18:31
閱讀 1893·2019-08-23 18:23