成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Artemis的JMS客戶端中的CompletionHandler是如何在artemis core

Edison / 999人閱讀

摘要:在公開的方法中,為的設(shè)置了繼承于回調(diào)句柄。如此看來,如果想要異步通信完畢后,處理一些回調(diào),則只需實(shí)現(xiàn),并在適當(dāng)?shù)奈恢迷O(shè)置到的的里。在其保護(hù)方法里,創(chuàng)建了對象,并傳入了。

ActiveMQChannelHandler

NettyConnector在公開的start方法中,為Channel的pipeline設(shè)置了ActiveMQChannelHandler(繼承于io.netty.channel.ChannelDuplexHandler)回調(diào)句柄。
ActiveMQChannelHandler其構(gòu)造函數(shù)定義如下:

ActiveMQChannelHandler(final ChannelGroup group,
                       final BufferHandler handler,
                       final BaseConnectionLifeCycleListener listener)

可見它接收了一個BufferHandler對象。在其channelRead這個callback方法中,調(diào)用了這個BufferHandler對象bufferReceived方法。

如此看來,如果想要Netty異步通信完畢后,處理一些回調(diào),則只需實(shí)現(xiàn)BufferHandler,并在適當(dāng)?shù)奈恢迷O(shè)置到Netty的Channel的pipeline里。

BufferHandler

ClientSessionFactoryImpl在其保護(hù)方法createConnector里,創(chuàng)建了NettyConnector對象,并傳入了DelegatingBufferHandler。
DelegatingBufferHandler實(shí)現(xiàn)了BufferHandler,可用來處理Netty回調(diào)。

DelegatingBufferHandler

DelegatingBufferHandler定義如下,它是定義在ClientSessionFactoryImpl類里的:

private class DelegatingBufferHandler implements BufferHandler {

      @Override
      public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
         RemotingConnection theConn = connection;

         if (theConn != null && connectionID.equals(theConn.getID())) {
            theConn.bufferReceived(connectionID, buffer);
         } else {
            logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
         }
      }
   }

也就是說,在Netty執(zhí)行回調(diào)時,會調(diào)用ClientSessionFactory中的成員對象connection(類型:RemotingConnection)的bufferReceived方法來處理數(shù)據(jù)。

實(shí)際上RemotingConnection也是一種BufferHandler

RemotingConnection

RemotingConnection(Impl)實(shí)現(xiàn)了bufferReceived(connectionID, buffer)方法,該方法會根據(jù)傳入的buffer來decode出一個package。
bufferReceived
=> doBufferReceived (以下ChannelImpl對應(yīng)的實(shí)例,是根據(jù)decode出來的package對應(yīng)的channelID,到RemotingConnectionImpl包含的channel集合里取得的)
=> ChannelImpl::doBufferReceived
=> ChannelImpl::handlePacket
=> ChannelImpl::clearUpTo
=> commandConfirmationHandler.commandConfirmed(packet)

舉例:Artemis中實(shí)現(xiàn)的JMS規(guī)范下的Producer在異步投遞消息后的回調(diào)函數(shù)是如何被調(diào)用的

以ArtemisMQMessageProducer為例:

他的send方法中,最后是調(diào)用core api的ClientProducer的send方法的,傳入一個core api的handler —— CompletionListenerWrapper(繼承于SendAcknowledgementHandler類型),它包裝了JMS的CompletionListener。

再轉(zhuǎn)到ClientProducer的send方法, 它又調(diào)用了doSend方法,

然后它又調(diào)用了sendRegularMessage方法,它又調(diào)用了sessionContext.sendFullMessage方法。

在sessionContext.sendFullMessage方法里,可以看到,handler被包裝到packet里了,并且傳給了sessionChannel.sendBatched(packet)方法去異步發(fā)送了。

在服務(wù)器返回的packet里,也會帶有這個handler,然后BufferHandler的實(shí)現(xiàn)者RemotingConnection(Impl)的bufferReceived方法會被回調(diào),它會解析服務(wù)器回傳的packet里的handler進(jìn)行執(zhí)行。

packet是SessionSendMessage類型的消息的別名
sessionContext.sendFullMessage方法里負(fù)責(zé)將SendAcknowledgementHandler包裝到SessionSendMessage類型的packet里,然后才發(fā)送至服務(wù)器
服務(wù)器返回的packet,也會首先被轉(zhuǎn)換成SessionSendMessage類型,然后獲取里面包含的SendAcknowledgementHandler類型的回調(diào)handler執(zhí)行回調(diào)。

CompletionListenerWrapper類定義:
private static final class CompletionListenerWrapper implements SendAcknowledgementHandler {

      private final CompletionListener completionListener;
      private final Message jmsMessage;
      private final ActiveMQMessageProducer producer;

      /**
       * @param jmsMessage
       * @param producer
       */
      private CompletionListenerWrapper(CompletionListener listener,
                                        Message jmsMessage,
                                        ActiveMQMessageProducer producer) {
         this.completionListener = listener;
         this.jmsMessage = jmsMessage;
         this.producer = producer;
      }

      @Override
      public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
         if (jmsMessage instanceof StreamMessage) {
            try {
               ((StreamMessage) jmsMessage).reset();
            } catch (JMSException e) {
               // HORNETQ-1209 XXX ignore?
            }
         }
         if (jmsMessage instanceof BytesMessage) {
            try {
               ((BytesMessage) jmsMessage).reset();
            } catch (JMSException e) {
               // HORNETQ-1209 XXX ignore?
            }
         }

         try {
            producer.connection.getThreadAwareContext().setCurrentThread(true);
            completionListener.onCompletion(jmsMessage);
         } finally {
            producer.connection.getThreadAwareContext().clearCurrentThread(true);
         }
      }

      @Override
      public String toString() {
         return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
      }
   }

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/66894.html

相關(guān)文章

  • Spring Boot 參考指南(消息傳遞)

    摘要:還自動配置發(fā)送和接收消息所需的基礎(chǔ)設(shè)施。支持是一個輕量級的可靠的可伸縮的可移植的消息代理,基于協(xié)議,使用通過協(xié)議進(jìn)行通信。 32. 消息傳遞 Spring框架為與消息傳遞系統(tǒng)集成提供了廣泛的支持,從使用JmsTemplate簡化的JMS API到使用完整的基礎(chǔ)設(shè)施異步接收消息,Spring AMQP為高級消息隊(duì)列協(xié)議提供了類似的特性集。Spring Boot還為RabbitTempla...

    Doyle 評論0 收藏0
  • ArtemisMQ“未消費(fèi)之謎”

    摘要:通過以上修改保證了客戶端連接能夠快速的斷開,在應(yīng)用重啟時不會持續(xù)往這邊發(fā)送消息,我使用進(jìn)行壓測,重啟消費(fèi)者過程中,消息都正常。 showImg(https://segmentfault.com/img/bVbjWjt?w=470&h=200);2018年6月份,我們開發(fā)了兩個使用Artemis做消息隊(duì)列實(shí)現(xiàn)的積分模塊和PUSH推送模塊,在幾輪測試以后,大家信心滿滿的正式上線了,而且經(jīng)過...

    tomato 評論0 收藏0
  • 使用Spring/Spring Boot集成JMS陷阱

    摘要:本文旨在指出中集成的一些性能陷阱,在另一篇文章各組件詳解里有組件介紹及如何正確使用的內(nèi)容。因此的做法會大大降低性能,并且將大部分的時間都花在反復(fù)重建這些對象上。提供的可以讓使用避免頻繁創(chuàng)建的問題。至于使用的性能測試則留給同學(xué)自己做了。 Github 本文旨在指出Spring/Spring Boot中集成JMS的一些性能陷阱,在另一篇文章Spring JMS各組件詳解里有Spring J...

    xcold 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<