成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

白話RabbitMQ(二): 任務(wù)隊(duì)列

fnngj / 1762人閱讀

摘要:任務(wù)隊(duì)列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費(fèi)大量資源。我們將任務(wù)封裝成一個消息發(fā)送給隊(duì)列,后臺的任務(wù)進(jìn)程會得到這個任務(wù)并執(zhí)行它,而且可以配置多個任務(wù)進(jìn)程,進(jìn)一步加大吞吐率。為了確保消息不丟失,支持消息確認(rèn)。

推廣
RabbitMQ專題講座

https://segmentfault.com/l/15...

CoolMQ開源項(xiàng)目

我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項(xiàng)目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會更新在上面

前言

在第一篇中我們描述了如何最簡單的RabbitMQ操作,如何發(fā)送、接受消息。在今天這篇文章中我們將描述如何創(chuàng)建一個任務(wù)隊(duì)列,來將高耗時的任務(wù)分發(fā)到多個消費(fèi)者,從而提高處理效率。

任務(wù)隊(duì)列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費(fèi)大量資源。反之我們會把這個操作交給隊(duì)列,讓它延后再做。我們將任務(wù)封裝成一個消息發(fā)送給隊(duì)列,后臺的任務(wù)進(jìn)程會得到這個任務(wù)并執(zhí)行它,而且可以配置多個任務(wù)進(jìn)程,進(jìn)一步加大吞吐率。

特別是對于網(wǎng)絡(luò)請求,一次短短的HTTP請求是要求迅速響應(yīng)的,不可能讓它一直停頓在高耗時操作上。

準(zhǔn)備工作

在第一章中我們發(fā)送了“Hello World!”?,F(xiàn)在來完成更復(fù)雜一點(diǎn)的,因?yàn)檫@里并沒有真正的高耗時操作,比如縮放圖像或輸出一個pdf。因此我們只是用Thread.sleep()來假裝我們很繁忙,而且會用"."來表示需要停頓的秒數(shù),比如一個叫Hello...的任務(wù)將停頓3秒鐘。

我們簡單的更改下Send.java,稱之為 NewTask.java.

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);

然后是工具類

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

當(dāng)然,我們的Recv.java也需要進(jìn)行一些改造,它需要對每一個"."停頓1秒,Work.java如下

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received "" + message + """);
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == ".") Thread.sleep(1000);
    }
}    

編譯上面這些代碼

javac -cp $CP NewTask.java Worker.java
輪詢調(diào)度

任務(wù)隊(duì)列的一個最大優(yōu)點(diǎn)是可以并行工作,能夠非常容易的水平擴(kuò)張。

首先,讓我們同時運(yùn)行兩個工作線程,他們能夠同時從隊(duì)列獲取消息。我們也需要同時開啟3個console:1個生產(chǎn)者,2個消費(fèi)者

消費(fèi)者C1

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

消費(fèi)者C2

# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

讓我們運(yùn)行生產(chǎn)者

# shell 3
java -cp $CP NewTask
# => First message.
java -cp $CP NewTask
# => Second message..
java -cp $CP NewTask
# => Third message...
java -cp $CP NewTask
# => Fourth message....
java -cp $CP NewTask
# => Fifth message.....    

讓我們看看消費(fèi)者們
消費(fèi)者C1

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received "First message."
# => [x] Received "Third message..."
# => [x] Received "Fifth message....."   

消費(fèi)者C2

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received "Second message.."
# => [x] Received "Fourth message..   

RabbitMQ默認(rèn)有序的將會發(fā)送消息給下一個消費(fèi)者,所以每一個消費(fèi)者都會得到相同數(shù)量的消息,這種方式就叫做輪詢調(diào)度(round-robin),你可以嘗試下更多的消費(fèi)者

消息確認(rèn)

一個任務(wù)可能非常耗時,如果消費(fèi)者在做一個高耗時任務(wù)時掛掉了,我們將會丟失所有發(fā)送到這個消費(fèi)者上的消息。這是非常不可取的,所以我們希望能夠明確的知道消息是否消費(fèi)成功,如果一個消費(fèi)掛了,我們能夠知道,并且將消息發(fā)送給下一個消費(fèi)者。

為了確保消息不丟失,RabbitMQ支持消息確認(rèn)。收到消息后消費(fèi)者會給RabbitMQ服務(wù)器發(fā)送一個ack(我已經(jīng)收到消息了),RabbitMQ就會在服務(wù)上刪除這個消息了。

如果一個消費(fèi)者掛了(連接關(guān)閉,channel關(guān)閉,或者是TCP連接丟失)而沒有發(fā)送ack,RabbitMQ就會知道消息并沒有消費(fèi)成功,于是乎消息會被放到消息隊(duì)列重新消費(fèi)。如果此時還有其它消費(fèi)者的話,消息會發(fā)送給其它消費(fèi)者來消費(fèi),確保消息不會丟失

消息并沒有超時時間這個概念,消息只會在消費(fèi)者掛掉了時候重發(fā),即使是一個非常非常耗時的的消費(fèi)者也不會發(fā)生重發(fā)

手動消息確認(rèn)(Manual message acknowledgments)默認(rèn)是打開的,雖然我們之前關(guān)閉了它:autoAck=true。讓我們先將它設(shè)置為false

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received "" + message + """);
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

這樣一來,即使你使用CTRL+C強(qiáng)制殺死了一個消費(fèi)者,消費(fèi)者所丟失的消息也將會被重發(fā),會被另一個消費(fèi)者所接受并消費(fèi)。

忘記應(yīng)答

很容易犯忘記應(yīng)答的錯誤,但會導(dǎo)致非常嚴(yán)重的后果。Messages會被重發(fā),RabbitMQ會消耗越來越多的內(nèi)存因?yàn)閡nacked的消息無法釋放(甚至更嚴(yán)重,RabbitMQ內(nèi)部維護(hù)了一個最大打開線程數(shù),如果太多的消息沒有應(yīng)答,RabbitMQ甚至?xí)麄€崩潰掉)

你可以用Rabbitmqctl查看未被應(yīng)答的消息數(shù)

sudo rabbitmqctl list_queues name messages_ready      
 messages_unacknowledged

windows下:

rabbitmqctl.bat list_queues name messages_ready     
messages_unacknowledged
消息持久化

我們現(xiàn)在知道了可以通過應(yīng)答來保證消息不丟失,但萬一RabbitMQ掛了呢?還是可能會導(dǎo)致消息丟失。因此我們可以通過持久化的機(jī)制,包括將隊(duì)列以及隊(duì)列中的消息持久化的方式,來保證即便RabbitMQ掛了,當(dāng)它重啟的時候,隊(duì)列以及消息也能夠恢復(fù)

首先做隊(duì)列的持久化,聲明隊(duì)列為durable

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

但很可惜的是,這種聲明方式并不適用與上面的方法,因?yàn)槲覀円呀?jīng)將“Hello”定義為一個非持久化的隊(duì)列了,是不能再將他改為持久化的,如果這樣做,將會直接返回一個error信息。所以,我們需要重新再定義一個隊(duì)列

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

在保證隊(duì)列的持久化后需要保證消息的持久化-將消息設(shè)置為PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());  
公平分發(fā)

但這樣還是存在問題:假設(shè)有如下的情形,一個消費(fèi)者非常耗時,而一個消費(fèi)者非???,由于消息都是公平的發(fā)送,所以它們都是接收到相同數(shù)量的消息,會導(dǎo)致一個消費(fèi)者非常忙碌,而另外一個消費(fèi)者非??臻e,而RabbitMQ無法得知這一點(diǎn)。

為了解決這個缺陷我們引入了basicQos方法以及prefetchCount =1的設(shè)置。這會告訴RabbitMQ一次只給消費(fèi)者一個消息:如果這個消息未確認(rèn),將不會發(fā)送新的消息,從而它會將消息發(fā)送給其它并不那么忙的消費(fèi)者

int prefetchCount = 1;
channel.basicQos(prefetchCount);
留意queue size

如果所有的消費(fèi)者都非常忙,隊(duì)列可能會很快被填滿,所以你需要留意這一點(diǎn),要么增加更多的消費(fèi)者,或者采取其它的策略。

整合

NewTask.java

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent "" + message + """);

    channel.close();
    connection.close();
  }      
  //...
}

Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received "" + message + """);
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == ".") {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

使用消息確認(rèn)和prefetchCount你就能設(shè)置一個持久化隊(duì)列了,同時,使用durable和persist,,即使RabbitMQ掛掉了,重啟后也能夠重發(fā)消息

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68099.html

相關(guān)文章

  • 白話RabbitMQ(三):發(fā)布/訂閱

    摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費(fèi)者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊(duì)列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...

    Ververica 評論0 收藏0
  • 白話rabbitmq(一): HelloWorld

    摘要:作為消息隊(duì)列的一個典型實(shí)踐,完全實(shí)現(xiàn)了標(biāo)準(zhǔn),與的快快快不同,它追求的穩(wěn)定可靠。同一個隊(duì)列不僅可以綁定多個生產(chǎn)者,而且能夠發(fā)送消息到多個消費(fèi)者。消費(fèi)者接受并消費(fèi)消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊(duì)列來的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的...

    garfileo 評論0 收藏0
  • 白話RabbitMQ(四): 建立路由

    摘要:可以參考源碼,項(xiàng)目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會更新在上面前言在訂閱發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將消息廣播給一些消費(fèi)者。因此,發(fā)送到路由鍵的消息會發(fā)送給隊(duì)列,發(fā)送到路由鍵或者的消息會發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決...

    CoderStudy 評論0 收藏0
  • 白話RabbitMQ(五): 主題路由器(Topic Exchange)

    摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。主題交換機(jī)也可以當(dāng)成其它交換機(jī)來使用,假如隊(duì)列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達(dá)路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性...

    Gilbertat 評論0 收藏0
  • 白話RabbitMQ(六): RPC

    摘要:因?yàn)橄M(fèi)消息是在另外一個進(jìn)程中,我們需要阻塞我們的進(jìn)程直到結(jié)果返回,使用阻塞隊(duì)列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。可以參考...

    KevinYan 評論0 收藏0

發(fā)表評論

0條評論

fnngj

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<