摘要:在公開的方法中,為的設(shè)置了繼承于回調(diào)句柄。如此看來,如果想要異步通信完畢后,處理一些回調(diào),則只需實(shí)現(xiàn),并在適當(dāng)?shù)奈恢迷O(shè)置到的的里。在其保護(hù)方法里,創(chuàng)建了對象,并傳入了。
ActiveMQChannelHandler
NettyConnector在公開的start方法中,為Channel的pipeline設(shè)置了ActiveMQChannelHandler(繼承于io.netty.channel.ChannelDuplexHandler)回調(diào)句柄。
ActiveMQChannelHandler其構(gòu)造函數(shù)定義如下:
ActiveMQChannelHandler(final ChannelGroup group, final BufferHandler handler, final BaseConnectionLifeCycleListener listener)
可見它接收了一個BufferHandler對象。在其channelRead這個callback方法中,調(diào)用了這個BufferHandler對象bufferReceived方法。
如此看來,如果想要Netty異步通信完畢后,處理一些回調(diào),則只需實(shí)現(xiàn)BufferHandler,并在適當(dāng)?shù)奈恢迷O(shè)置到Netty的Channel的pipeline里。
BufferHandlerClientSessionFactoryImpl在其保護(hù)方法createConnector里,創(chuàng)建了NettyConnector對象,并傳入了DelegatingBufferHandler。
DelegatingBufferHandler實(shí)現(xiàn)了BufferHandler,可用來處理Netty回調(diào)。
DelegatingBufferHandler定義如下,它是定義在ClientSessionFactoryImpl類里的:
private class DelegatingBufferHandler implements BufferHandler { @Override public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) { RemotingConnection theConn = connection; if (theConn != null && connectionID.equals(theConn.getID())) { theConn.bufferReceived(connectionID, buffer); } else { logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet"); } } }
也就是說,在Netty執(zhí)行回調(diào)時,會調(diào)用ClientSessionFactory中的成員對象connection(類型:RemotingConnection)的bufferReceived方法來處理數(shù)據(jù)。
RemotingConnection實(shí)際上RemotingConnection也是一種BufferHandler
RemotingConnection(Impl)實(shí)現(xiàn)了bufferReceived(connectionID, buffer)方法,該方法會根據(jù)傳入的buffer來decode出一個package。
bufferReceived
=> doBufferReceived (以下ChannelImpl對應(yīng)的實(shí)例,是根據(jù)decode出來的package對應(yīng)的channelID,到RemotingConnectionImpl包含的channel集合里取得的)
=> ChannelImpl::doBufferReceived
=> ChannelImpl::handlePacket
=> ChannelImpl::clearUpTo
=> commandConfirmationHandler.commandConfirmed(packet)
以ArtemisMQMessageProducer為例:
他的send方法中,最后是調(diào)用core api的ClientProducer的send方法的,傳入一個core api的handler —— CompletionListenerWrapper(繼承于SendAcknowledgementHandler類型),它包裝了JMS的CompletionListener。
再轉(zhuǎn)到ClientProducer的send方法, 它又調(diào)用了doSend方法,
然后它又調(diào)用了sendRegularMessage方法,它又調(diào)用了sessionContext.sendFullMessage方法。
在sessionContext.sendFullMessage方法里,可以看到,handler被包裝到packet里了,并且傳給了sessionChannel.sendBatched(packet)方法去異步發(fā)送了。
在服務(wù)器返回的packet里,也會帶有這個handler,然后BufferHandler的實(shí)現(xiàn)者RemotingConnection(Impl)的bufferReceived方法會被回調(diào),它會解析服務(wù)器回傳的packet里的handler進(jìn)行執(zhí)行。
CompletionListenerWrapper類定義:packet是SessionSendMessage類型的消息的別名
sessionContext.sendFullMessage方法里負(fù)責(zé)將SendAcknowledgementHandler包裝到SessionSendMessage類型的packet里,然后才發(fā)送至服務(wù)器
服務(wù)器返回的packet,也會首先被轉(zhuǎn)換成SessionSendMessage類型,然后獲取里面包含的SendAcknowledgementHandler類型的回調(diào)handler執(zhí)行回調(diào)。
private static final class CompletionListenerWrapper implements SendAcknowledgementHandler { private final CompletionListener completionListener; private final Message jmsMessage; private final ActiveMQMessageProducer producer; /** * @param jmsMessage * @param producer */ private CompletionListenerWrapper(CompletionListener listener, Message jmsMessage, ActiveMQMessageProducer producer) { this.completionListener = listener; this.jmsMessage = jmsMessage; this.producer = producer; } @Override public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) { if (jmsMessage instanceof StreamMessage) { try { ((StreamMessage) jmsMessage).reset(); } catch (JMSException e) { // HORNETQ-1209 XXX ignore? } } if (jmsMessage instanceof BytesMessage) { try { ((BytesMessage) jmsMessage).reset(); } catch (JMSException e) { // HORNETQ-1209 XXX ignore? } } try { producer.connection.getThreadAwareContext().setCurrentThread(true); completionListener.onCompletion(jmsMessage); } finally { producer.connection.getThreadAwareContext().clearCurrentThread(true); } } @Override public String toString() { return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")"; } }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/66894.html
摘要:還自動配置發(fā)送和接收消息所需的基礎(chǔ)設(shè)施。支持是一個輕量級的可靠的可伸縮的可移植的消息代理,基于協(xié)議,使用通過協(xié)議進(jìn)行通信。 32. 消息傳遞 Spring框架為與消息傳遞系統(tǒng)集成提供了廣泛的支持,從使用JmsTemplate簡化的JMS API到使用完整的基礎(chǔ)設(shè)施異步接收消息,Spring AMQP為高級消息隊(duì)列協(xié)議提供了類似的特性集。Spring Boot還為RabbitTempla...
摘要:通過以上修改保證了客戶端連接能夠快速的斷開,在應(yīng)用重啟時不會持續(xù)往這邊發(fā)送消息,我使用進(jìn)行壓測,重啟消費(fèi)者過程中,消息都正常。 showImg(https://segmentfault.com/img/bVbjWjt?w=470&h=200);2018年6月份,我們開發(fā)了兩個使用Artemis做消息隊(duì)列實(shí)現(xiàn)的積分模塊和PUSH推送模塊,在幾輪測試以后,大家信心滿滿的正式上線了,而且經(jīng)過...
摘要:本文旨在指出中集成的一些性能陷阱,在另一篇文章各組件詳解里有組件介紹及如何正確使用的內(nèi)容。因此的做法會大大降低性能,并且將大部分的時間都花在反復(fù)重建這些對象上。提供的可以讓使用避免頻繁創(chuàng)建的問題。至于使用的性能測試則留給同學(xué)自己做了。 Github 本文旨在指出Spring/Spring Boot中集成JMS的一些性能陷阱,在另一篇文章Spring JMS各組件詳解里有Spring J...
閱讀 1579·2021-10-25 09:44
閱讀 2945·2021-09-04 16:48
閱讀 1581·2019-08-30 15:44
閱讀 2517·2019-08-30 15:44
閱讀 1746·2019-08-30 15:44
閱讀 2835·2019-08-30 14:14
閱讀 2984·2019-08-30 13:00
閱讀 2160·2019-08-30 11:09