摘要:在發(fā)送后端監(jiān)聽聲明的排他隊(duì)列,當(dāng)收到消息后比對正確則處理消息斷開監(jiān)聽連接,然后此隊(duì)列被系統(tǒng)自動回收。并且通過也看到了這條消息的返回。此時(shí)我們基本已經(jīng)將問題鎖定在端了。
背景
公司的一個(gè)項(xiàng)目使用rabbitmq作為broker進(jìn)行交互,并且數(shù)據(jù)的查詢方法使用RPC模式,RPC Client端使用java編寫并使用springAMQP包與rabbitmq交互,在RPC Server端使用python的 pika包與rabbitmq交互。兩端都使用標(biāo)準(zhǔn)官方例程,發(fā)現(xiàn)在Client端發(fā)送的消息可以被Server端接收并處理然后返回結(jié)果,但是Client端只會會收到一個(gè)null值。
問題排查 1 理解傳統(tǒng)的RPC模式運(yùn)行流程傳統(tǒng)模式下 Client端向一個(gè)指定的隊(duì)列里推送消息,并聲明一個(gè)一次性排他隊(duì)列,然后將發(fā)送消息頭部的reply-to屬性的值設(shè)置為隊(duì)列的名字,correlation_id屬性設(shè)置為一個(gè)隨機(jī)生成的值用于消息鑒定然后發(fā)送消息。在發(fā)送后Client端監(jiān)聽聲明的排他隊(duì)列,當(dāng)收到消息后比對correaltiion_id,正確則處理消息斷開監(jiān)聽連接,然后此隊(duì)列被系統(tǒng)自動回收。 在Server端收到消息后處理消息然后將消息返回,返回的消息的routing-key設(shè)置為reply-to的值,properties中設(shè)置correlation_id為收到的correlation_id值。這樣就完成一次RPC交互模式。
要解決今天這個(gè)問題我們還要知道幾個(gè)知識點(diǎn):
1當(dāng)消息發(fā)送到exchange后如果沒有隊(duì)列接收此消息,那么此消息就會丟失。
2 一次性的排他隊(duì)列在Client不在監(jiān)聽此隊(duì)列就會自動被rabbitmq刪除。
排查1 Client端收到的Null值從哪里來?因?yàn)槲沂鞘褂胮ython寫RPC Server端并且我也不怎么會java代碼?!?br>所以這個(gè)null值從那里來我就無法從Client端下手。那我們只能從Server端進(jìn)行排查。(最后我認(rèn)為是在java代碼編寫錯(cuò)誤(是自己的代碼)的情況下 springAMQP返回的一個(gè)默認(rèn)值)
排查2 Server端收到消息后是否正確的將消息返回在Server端打印收到的message并打印此消息的header信息和body信息,看到在reply-to中就是Client端設(shè)置的隊(duì)列。并且通過rabbitmq也看到了這條消息的返回。
排查3 觀察消息有沒有被推送回reply-to隊(duì)列然后我在Server端收到消息后的callback函數(shù)的頭部大了斷點(diǎn),接收到消息后Server端程序掛起。此時(shí)我去查看reply-to中的隊(duì)列,發(fā)現(xiàn)其已經(jīng)不存在于rabbitmq中了。 由上面的傳統(tǒng)RPC模式我推斷出 可能是Client端發(fā)送代碼后沒有監(jiān)聽reply-to隊(duì)列造成隊(duì)列消失,然后Server端發(fā)送的消息因?yàn)闆]有接收隊(duì)列而被丟棄。此時(shí)我們基本已經(jīng)將問題鎖定在Client端了。但是Client端的代碼是按照rabbitmq官方給的例程書寫,應(yīng)該是沒有問題的。此時(shí)似乎陷入了僵局。
定位問題:Google大發(fā)加官方文檔這時(shí)候我Google一下SpringAMQP框架的是如何寫RPC代碼?在一些帖子中我發(fā)現(xiàn)有的代碼會添加一個(gè)Listener的類,但有的又不添加。我們假設(shè)他們都是可以運(yùn)行的。那么是什么原因會造成這種情況呢?我第一個(gè)就是想到了版本問題。隨著版本的改變可能代碼也會發(fā)生變化。之后我就在SpringAMQP的官方文檔里面進(jìn)行查找。果然被我找到了,官方文檔里面有這樣一段描述:
Starting with?version 3.4.0, the RabbitMQ server now supports?Direct reply-to; this eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request). Starting with?Spring AMQP version 1.4.1?Direct reply-to will be used by default (if supported by the server) instead of creating temporary reply queues. When no?replyQueue ?is provided (or it is set with the name?amq.rabbitmq.reply-to), the?RabbitTemplate?will automatically detect whether Direct reply-to is supported and either use it or fall back to using a temporary reply queue. When using Direct reply-to, a?reply-listener is not required and should not be configured.
springAMQP官方地址
翻譯一下大體意思就是在RabbitMQ3.4.0版本以后官方提供一種叫做Direct reply-to的方式來實(shí)現(xiàn)RPC(這種方式可以極大的提高RPC的性能,因?yàn)樗恍枰看握埱驝lient端都要新申請一個(gè)隊(duì)列,之后我會再寫一篇來詳細(xì)介紹(翻譯 o(∩_∩)o 哈哈 )這個(gè)特性。并且在SpringAMQP version 1.4.1版本之后默認(rèn)使用特性,看了一下服務(wù)器上的rabbitmq版本3.3.0 這個(gè)真的老果然不支持,SpringAMQP的版本果然也是高于這個(gè)版本,問題找到。開心 , 但是怎么解決呢?
Direct reply-to 官方介紹
難點(diǎn)1 python的官網(wǎng)沒有給例程 ,不過給了介紹也告訴了如何來實(shí)現(xiàn)
難點(diǎn)2 服務(wù)器提升版本,已經(jīng)有業(yè)務(wù)跑在上面了,我這種對rabbitmq的萌新對rabbitmq各版本升級后的改變并不是很了解,估計(jì)是難說動領(lǐng)導(dǎo)換了。
針對難點(diǎn)2 我就不想了 不過難點(diǎn)1的我已經(jīng)寫出來python如何適配direct reply-to的代碼。
更改都是在Client端,Server端還是可以保持不變。主要主機(jī)這幾個(gè)方面
1 reply-to的名字更改為‘a(chǎn)mq.rabbitmq.reply-to’這條虛擬隊(duì)列,你在rabbitmq的控制臺上是看不到這條隊(duì)列的。
2 然后Client監(jiān)聽這條隊(duì)列的時(shí)候要設(shè)為為no-ack模式。
下面是根據(jù)官方python RPC代碼更改的 適配 Direct reply-to的python代碼
Client端 python代碼
# -*- coding:utf-8 -*- #!/usr/bin/env python import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) self.channel = self.connection.channel() # result = self.channel.queue_declare(exclusive=True) # self.callback_queue = result.method.queue # self.channel.basic_consume(self.on_response, no_ack=True, # queue=self.callback_queue) # 監(jiān)聽隊(duì)列為 amp.rabbitmq.reply-to 啟動no_ack 模式 self.channel.basic_consume(self.on_response, queue="amq.rabbitmq.reply-to", no_ack=True) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange="", routing_key="rpc_queue", properties=pika.BasicProperties( # reply_to = self.callback_queue, # 更改了隊(duì)列名字 reply_to="amq.rabbitmq.reply-to", correlation_id=self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
Server端代碼 沒有改動
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue="rpc_queue") def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange="", routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response)) # ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue="rpc_queue") print(" [x] Awaiting RPC requests") channel.start_consuming()解決辦法2 java代碼不使用默認(rèn)的direct reply-to模式
這個(gè)辦法因?yàn)槲也皇菍慾ava的所以我只能寫一些我在官方文檔里面理解的東西了。就是當(dāng)你不使用SpringAMQP的默認(rèn)RPC模式的化需要增加Listener對象來監(jiān)聽自己的隊(duì)列。
RabbitTemplate rabbitTempete=new RabbitTemplate(connectionFactory); rabbitTempete.setExchange(exchangeName); rabbitTempete.setRoutingKey(topic); //比官方文檔多的 Queue replyqQueue=replyQueue(); admin.declareQueue(replyqQueue); rabbitTempete.setReplyQueue(replyqQueue); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueues(replyqQueue); container.setMessageListener(rabbitTempete); container.start(); //比官方文檔多的停止 Object response=rabbitTempete.convertSendAndReceive(t);
SpringAMQP書寫官方文檔
相比較要自己申請隊(duì)列自己監(jiān)聽。不過我也沒試過這段代碼就不知道能不能用了。
這個(gè)問題基本得到很好的解決了。解決一個(gè)問題首先你要明白一個(gè)東西正常情況下是一種什么狀況,然后出了問題就從前往后,從后往前,從中往兩邊等等等。然后Google,或者官方文檔,官方論壇。我個(gè)人認(rèn)為官方文檔真的是好東西。無數(shù)的淺坑的解決辦法都在官方文檔。當(dāng)然深坑就不說了那就是論壇加能力加運(yùn)氣才能排查出來的了。不過官方大多都是英文。真是愁人,我 加強(qiáng)英語能力吧。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/40679.html
一、RabbitMQ整合SpringAMQP RabbitAdmin RabbitAdmin類可以很好的操作RabbitMQ,在Spring中直接進(jìn)行注入即可。 注意:autoStartup必須要設(shè)置為true,否則Spring容器不會加載RabbitAdmin類 RabbitAdmin底層實(shí)現(xiàn)就是從Spring容器中獲取Exchange、Binding、RoutingKey以及Queue的...
摘要:后續(xù)介紹交換機(jī),生產(chǎn)者直接將消息投遞到中。消息,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由和組成。也稱為消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者。主要是應(yīng)為和有一個(gè)綁定的關(guān)系。 showImg(https://img-blog.csdnimg.cn/20190509221741422.gif); showImg(https://img-blog.csdnimg.cn/20190731191914...
摘要:為了避免與參數(shù)混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費(fèi)者。在這種設(shè)置中,使用路由鍵發(fā)布到交換機(jī)的消息將被路由到隊(duì)列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節(jié)教程重點(diǎn)介紹的內(nèi)容 在之前的教程中,我們構(gòu)建了一個(gè)簡單的日志系統(tǒng) 我們能夠?qū)⑷罩鞠V播給許多接收...
摘要:需要特別明確的概念交換機(jī)的持久化,并不等于消息的持久化。消息的處理,是有兩種方式,一次性。在上述示例中,使用的,意味著接收全部的消息。注意與是兩個(gè)不同的隊(duì)列。后端處理,可以針對每一個(gè)啟動一個(gè)或多個(gè),以提高消息處理的實(shí)時(shí)性。 RabbitMQ與PHP(一) 項(xiàng)目中使用RabbitMQ作為隊(duì)列處理用戶消息通知,消息由前端PHP代碼產(chǎn)生,處理消息使用Python,這就導(dǎo)致代碼一致性問題,調(diào)...
摘要:為了預(yù)防消息丟失,提供了,即工作進(jìn)程在收到消息并處理后,發(fā)送給,告知這時(shí)候可以把該消息從隊(duì)列中刪除了。如果工作進(jìn)程掛掉了,沒有收到,那么會把該消息重新分發(fā)給其他工作進(jìn)程。之前在發(fā)布消息時(shí),的值為即使用。 HelloWorld 簡介 RabbitMQ:接受消息再傳遞消息,可以視為一個(gè)郵局。發(fā)送者和接受者通過隊(duì)列來進(jìn)行交互,隊(duì)列的大小可以視為無限的,多個(gè)發(fā)送者可以發(fā)生給一個(gè)隊(duì)列,多個(gè)接收者...
閱讀 2603·2019-08-30 10:53
閱讀 3204·2019-08-29 16:20
閱讀 2962·2019-08-29 15:35
閱讀 1782·2019-08-29 12:24
閱讀 2890·2019-08-28 18:19
閱讀 1871·2019-08-23 18:07
閱讀 2352·2019-08-23 15:31
閱讀 1183·2019-08-23 14:05