摘要:背景通過接口實(shí)現(xiàn)調(diào)用發(fā)送數(shù)據(jù),接口返回值為發(fā)送數(shù)據(jù)的對(duì)應(yīng)結(jié)果。接口為同步阻塞,為異步回調(diào)方式。接收數(shù)據(jù)回調(diào)接收到數(shù)據(jù)后,通過閉鎖釋放阻塞的線程,同時(shí)設(shè)置結(jié)果返回給調(diào)用者
背景
通過HTTP接口實(shí)現(xiàn)調(diào)用MQTT Client發(fā)送數(shù)據(jù),HTTP接口返回值為MQTT Client發(fā)送數(shù)據(jù)的對(duì)應(yīng)結(jié)果。 HTTP接口為同步阻塞,MQTT Client 為異步回調(diào)方式。
如何實(shí)現(xiàn)在HTTP接口中調(diào)用MQTT Client發(fā)送數(shù)據(jù)后,能夠阻塞等待MQTT返回結(jié)果,然后將結(jié)果返回?
CountDownLatch + Callbale+FutureTask
1.CountDownLatch作用
CountDownLatch實(shí)現(xiàn)在MQTT Client 發(fā)送數(shù)據(jù)后 到接收數(shù)據(jù)后這段時(shí)間的阻塞。 HTTP每次請(qǐng)求,新建一個(gè)CountDownLatch,然后將CountDownLatch作為值和deviceId作為KEY保存到Map中, 調(diào)用MQTT Client 發(fā)送數(shù)據(jù)后,countDownLatch.await(),進(jìn)行同步等待 在MQTT Client接收數(shù)據(jù)的回調(diào)方法中更加deviceId取出CountDwonLatch然后計(jì)數(shù)減一
2.Callbale+FutureTask作用
將調(diào)用MQTT Client發(fā)送數(shù)據(jù)的過程,封裝成Callable,投遞發(fā)送任務(wù)時(shí),通過返回的FutureTask的get()方法, 同步阻塞,直到結(jié)果返回。關(guān)鍵代碼
1.Map保存CountDownObj用于同步阻塞等待MQTT Client返回結(jié)果,以及將返回結(jié)果傳遞個(gè)FutureTask
private final static ConcurrentMapcountDownLatchMap = new ConcurrentHashMap<>(); //線程池 private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> { Thread thread = new Thread(runnable, "mqtt thread"); return thread; });
2.HTTP API 調(diào)用的發(fā)送MQTT 消息數(shù)據(jù)的接口
/** * HTTP API 調(diào)用的發(fā)送MQTT 消息數(shù)據(jù)的接口 * 同步阻塞 */ public Integer send(Long packageId, String deviceId) throws Exception { ...... FutureTaskfutureTask = sendTask(publishDto)); return futureTask.get() }
3.投遞發(fā)送MQTT指令的task方法
/** * 投遞MQTT發(fā)送指令任務(wù) * 同步阻塞 */ private FutureTasksendTask(PublishDto publishDto) throws Exception { FutureTask futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto)); threadPoolExecutor.execute(futureTask); //阻塞線程 return futureTask; }
4.封裝CountDownLatch 和 Integer的對(duì)象,用于CountDownLatch阻塞控制和返回結(jié)果
/** * 封裝CountDownLatch 和 Integer * 用于CountDownLatch阻塞控制和返回結(jié)果 */ private class CountDownObj { private final CountDownLatch countDownLatch; private volatile Integer value; private CountDownObj(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public CountDownLatch getCountDownLatch() { return countDownLatch; } public Integer getValue() { return value; } public void setValue(Integer value) { this.value = value; } }
5.具體發(fā)送MQTT數(shù)據(jù)的Callbale線程Task,會(huì)新建CountDownLatch,并通過CountDownLatch.await()方法阻塞,直到MQTT回調(diào)接收到數(shù)據(jù)或者超時(shí)。
/** * 發(fā)送MQTT消息的任務(wù)Callable */ private class GetDatapointValueCallable implements Callable{ private final PublishDto publishDto; GetDatapointValueCallable(PublishDto publishDto) { this.publishDto = publishDto; } @Override public Integer call() throws Exception { //mqtt client 發(fā)送數(shù)據(jù),此處具體代碼省略 ...... CountDownLatch countDownLatch = new CountDownLatch(1); countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch)); //阻塞,超時(shí)時(shí)間3s countDownLatch.await(3, TimeUnit.SECONDS); //返回mqtt指令對(duì)應(yīng)的結(jié)果或者null return countDownLatchMap.remove(publishDto.getDeviceId()).getValue(); } }
6.MQTT接收數(shù)據(jù)回調(diào),這里通過deviceId從MAP里面取到CountDownObj,釋放閉鎖(結(jié)束callable線程的等待)和設(shè)置MQTT返回的結(jié)果(即callable中call()返回的結(jié)果,也就是FutureTask的get()方法返回的結(jié)果)。
/** * MQTT 接收數(shù)據(jù)回調(diào) */ void mqttReceiveCallback(String deviceId, String datapointId, String value) { ...... //接收到數(shù)據(jù)后,通過閉鎖釋放阻塞的線程,同時(shí)設(shè)置結(jié)果返回給調(diào)用者 CountDownObj countDownObj=countDownLatchMap.get(deviceId); if(countDownObj!=null) { countDownObj.setValue(Integer.parseInt(value)); countDownObj.getCountDownLatch().countDown(); } ....... }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68941.html
摘要:線程啟動(dòng)規(guī)則對(duì)象的方法先行發(fā)生于此線程的每一個(gè)動(dòng)作。所以局部變量是不被多個(gè)線程所共享的,也就不會(huì)出現(xiàn)并發(fā)問題。通過獲取到數(shù)據(jù),放入當(dāng)前線程處理完之后將當(dāng)前線程中的信息移除。主線程必須在啟動(dòng)其他線程后立即調(diào)用方法。 一、線程安全性 定義:當(dāng)多個(gè)線程訪問某個(gè)類時(shí),不管運(yùn)行時(shí)環(huán)境采用何種調(diào)度方式,或者這些線程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個(gè)類都能表現(xiàn)出正確的行...
摘要:但是單核我們還是要應(yīng)用多線程,就是為了防止阻塞。多線程可以防止這個(gè)問題,多條線程同時(shí)運(yùn)行,哪怕一條線程的代碼執(zhí)行讀取數(shù)據(jù)阻塞,也不會(huì)影響其它任務(wù)的執(zhí)行。 1、多線程有什么用?一個(gè)可能在很多人看來(lái)很扯淡的一個(gè)問題:我會(huì)用多線程就好了,還管它有什么用?在我看來(lái),這個(gè)回答更扯淡。所謂知其然知其所以然,會(huì)用只是知其然,為什么用才是知其所以然,只有達(dá)到知其然知其所以然的程度才可以說是把一個(gè)知識(shí)點(diǎn)...
摘要:典型地,和被用在等待另一個(gè)線程產(chǎn)生的結(jié)果的情形測(cè)試發(fā)現(xiàn)結(jié)果還沒有產(chǎn)生后,讓線程阻塞,另一個(gè)線程產(chǎn)生了結(jié)果后,調(diào)用使其恢復(fù)。使當(dāng)前線程放棄當(dāng)前已經(jīng)分得的時(shí)間,但不使當(dāng)前線程阻塞,即線程仍處于可執(zhí)行狀態(tài),隨時(shí)可能再次分得時(shí)間。 1、說說進(jìn)程,線程,協(xié)程之間的區(qū)別 簡(jiǎn)而言之,進(jìn)程是程序運(yùn)行和資源分配的基本單位,一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線程.進(jìn)程在執(zhí)行過程中擁有獨(dú)立的內(nèi)存單元...
摘要:大多數(shù)待遇豐厚的開發(fā)職位都要求開發(fā)者精通多線程技術(shù)并且有豐富的程序開發(fā)調(diào)試優(yōu)化經(jīng)驗(yàn),所以線程相關(guān)的問題在面試中經(jīng)常會(huì)被提到。掌握了這些技巧,你就可以輕松應(yīng)對(duì)多線程和并發(fā)面試了。進(jìn)入等待通行準(zhǔn)許時(shí),所提供的對(duì)象。 最近看到網(wǎng)上流傳著,各種面試經(jīng)驗(yàn)及面試題,往往都是一大堆技術(shù)題目貼上去,而沒有答案。 不管你是新程序員還是老手,你一定在面試中遇到過有關(guān)線程的問題。Java語(yǔ)言一個(gè)重要的特點(diǎn)就...
摘要:每個(gè)工作線程在結(jié)束前將門栓計(jì)數(shù)器減一,門栓的計(jì)數(shù)變?yōu)榫捅砻鞴ぷ魍瓿伞3S梅椒ㄟf減鎖存器的計(jì)數(shù),如果計(jì)數(shù)到達(dá)零,則釋放所有等待的線程。使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待,除非線程被中斷或超出了指定的等待時(shí)間。 【同步器 java.util.concurrent包包含幾個(gè)能幫助人們管理相互合作的線程集的類。這些機(jī)制具有為線程直間的共用集結(jié)點(diǎn)模式提供的‘預(yù)制功能’。如果有一個(gè)相互合作的...
閱讀 2083·2019-08-30 15:52
閱讀 2475·2019-08-29 18:37
閱讀 830·2019-08-29 12:33
閱讀 2871·2019-08-29 11:04
閱讀 1586·2019-08-27 10:57
閱讀 2127·2019-08-26 13:38
閱讀 2793·2019-08-26 12:25
閱讀 2484·2019-08-26 12:23