成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

CountDownLatch + Callbale+FutureTask 實(shí)現(xiàn)異步變同步調(diào)用

張金寶 / 3744人閱讀

摘要:背景通過接口實(shí)現(xiàn)調(diào)用發(fā)送數(shù)據(jù),接口返回值為發(fā)送數(shù)據(jù)的對(duì)應(yīng)結(jié)果。接口為同步阻塞,為異步回調(diào)方式。接收數(shù)據(jù)回調(diào)接收到數(shù)據(jù)后,通過閉鎖釋放阻塞的線程,同時(shí)設(shè)置結(jié)果返回給調(diào)用者

背景

通過HTTP接口實(shí)現(xiàn)調(diào)用MQTT Client發(fā)送數(shù)據(jù),HTTP接口返回值為MQTT Client發(fā)送數(shù)據(jù)的對(duì)應(yīng)結(jié)果。 HTTP接口為同步阻塞,MQTT Client 為異步回調(diào)方式。
如何實(shí)現(xiàn)在HTTP接口中調(diào)用MQTT Client發(fā)送數(shù)據(jù)后,能夠阻塞等待MQTT返回結(jié)果,然后將結(jié)果返回?

解決方法

CountDownLatch + Callbale+FutureTask

1.CountDownLatch作用

CountDownLatch實(shí)現(xiàn)在MQTT Client 發(fā)送數(shù)據(jù)后 到接收數(shù)據(jù)后這段時(shí)間的阻塞。
HTTP每次請(qǐng)求,新建一個(gè)CountDownLatch,然后將CountDownLatch作為值和deviceId作為KEY保存到Map中,
調(diào)用MQTT Client 發(fā)送數(shù)據(jù)后,countDownLatch.await(),進(jìn)行同步等待
在MQTT Client接收數(shù)據(jù)的回調(diào)方法中更加deviceId取出CountDwonLatch然后計(jì)數(shù)減一


2.Callbale+FutureTask作用

將調(diào)用MQTT Client發(fā)送數(shù)據(jù)的過程,封裝成Callable,投遞發(fā)送任務(wù)時(shí),通過返回的FutureTask的get()方法,
同步阻塞,直到結(jié)果返回。

關(guān)鍵代碼

1.Map保存CountDownObj用于同步阻塞等待MQTT Client返回結(jié)果,以及將返回結(jié)果傳遞個(gè)FutureTask

    private final static ConcurrentMap countDownLatchMap = new ConcurrentHashMap<>();
    //線程池
    private final ThreadPoolExecutor threadPoolExecutor = 
            new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> {
        Thread thread = new Thread(runnable, "mqtt thread");
        return thread;
    });

2.HTTP API 調(diào)用的發(fā)送MQTT 消息數(shù)據(jù)的接口

    /**
     * HTTP API 調(diào)用的發(fā)送MQTT 消息數(shù)據(jù)的接口
     * 同步阻塞
     */
    public Integer send(Long packageId, String deviceId) throws Exception {
        ......
       FutureTask futureTask = sendTask(publishDto));
       return futureTask.get()
    }

3.投遞發(fā)送MQTT指令的task方法

   /**
     * 投遞MQTT發(fā)送指令任務(wù)
     * 同步阻塞
     */ 
   private FutureTask sendTask(PublishDto publishDto) throws Exception {
        FutureTask futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto));
        threadPoolExecutor.execute(futureTask);
        //阻塞線程
        return futureTask;
    } 

4.封裝CountDownLatch 和 Integer的對(duì)象,用于CountDownLatch阻塞控制和返回結(jié)果

    /**
     * 封裝CountDownLatch 和 Integer
     * 用于CountDownLatch阻塞控制和返回結(jié)果
     */
    private class CountDownObj {
        private final CountDownLatch countDownLatch;
        private volatile Integer value;

        private CountDownObj(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        public CountDownLatch getCountDownLatch() {
            return countDownLatch;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }
    }

5.具體發(fā)送MQTT數(shù)據(jù)的Callbale線程Task,會(huì)新建CountDownLatch,并通過CountDownLatch.await()方法阻塞,直到MQTT回調(diào)接收到數(shù)據(jù)或者超時(shí)。

    /**
     * 發(fā)送MQTT消息的任務(wù)Callable
     */
    private class GetDatapointValueCallable implements Callable {
        private final PublishDto publishDto;

        GetDatapointValueCallable(PublishDto publishDto) {
            this.publishDto = publishDto;
        }

        @Override
        public Integer call() throws Exception {
            //mqtt client 發(fā)送數(shù)據(jù),此處具體代碼省略
            ......
            
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch));
            //阻塞,超時(shí)時(shí)間3s
            countDownLatch.await(3, TimeUnit.SECONDS);
            //返回mqtt指令對(duì)應(yīng)的結(jié)果或者null
            return countDownLatchMap.remove(publishDto.getDeviceId()).getValue();
        }

    }

6.MQTT接收數(shù)據(jù)回調(diào),這里通過deviceId從MAP里面取到CountDownObj,釋放閉鎖(結(jié)束callable線程的等待)和設(shè)置MQTT返回的結(jié)果(即callable中call()返回的結(jié)果,也就是FutureTask的get()方法返回的結(jié)果)。

    /**
     * MQTT 接收數(shù)據(jù)回調(diào)
     */
    void mqttReceiveCallback(String deviceId, String datapointId, String value) {
        ......
        
        //接收到數(shù)據(jù)后,通過閉鎖釋放阻塞的線程,同時(shí)設(shè)置結(jié)果返回給調(diào)用者
        CountDownObj countDownObj=countDownLatchMap.get(deviceId);
        if(countDownObj!=null) {
            countDownObj.setValue(Integer.parseInt(value));
            countDownObj.getCountDownLatch().countDown();
        }
        
        .......
    }

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68941.html

相關(guān)文章

  • Java多線程&高并發(fā)

    摘要:線程啟動(dòng)規(guī)則對(duì)象的方法先行發(fā)生于此線程的每一個(gè)動(dòng)作。所以局部變量是不被多個(gè)線程所共享的,也就不會(huì)出現(xiàn)并發(fā)問題。通過獲取到數(shù)據(jù),放入當(dāng)前線程處理完之后將當(dāng)前線程中的信息移除。主線程必須在啟動(dòng)其他線程后立即調(diào)用方法。 一、線程安全性 定義:當(dāng)多個(gè)線程訪問某個(gè)類時(shí),不管運(yùn)行時(shí)環(huán)境采用何種調(diào)度方式,或者這些線程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個(gè)類都能表現(xiàn)出正確的行...

    SQC 評(píng)論0 收藏0
  • 40道阿里巴巴JAVA研發(fā)崗多線程面試題詳解,你能答出多少

    摘要:但是單核我們還是要應(yīng)用多線程,就是為了防止阻塞。多線程可以防止這個(gè)問題,多條線程同時(shí)運(yùn)行,哪怕一條線程的代碼執(zhí)行讀取數(shù)據(jù)阻塞,也不會(huì)影響其它任務(wù)的執(zhí)行。 1、多線程有什么用?一個(gè)可能在很多人看來(lái)很扯淡的一個(gè)問題:我會(huì)用多線程就好了,還管它有什么用?在我看來(lái),這個(gè)回答更扯淡。所謂知其然知其所以然,會(huì)用只是知其然,為什么用才是知其所以然,只有達(dá)到知其然知其所以然的程度才可以說是把一個(gè)知識(shí)點(diǎn)...

    lpjustdoit 評(píng)論0 收藏0
  • bat等大公司??糺ava多線程面試題

    摘要:典型地,和被用在等待另一個(gè)線程產(chǎn)生的結(jié)果的情形測(cè)試發(fā)現(xiàn)結(jié)果還沒有產(chǎn)生后,讓線程阻塞,另一個(gè)線程產(chǎn)生了結(jié)果后,調(diào)用使其恢復(fù)。使當(dāng)前線程放棄當(dāng)前已經(jīng)分得的時(shí)間,但不使當(dāng)前線程阻塞,即線程仍處于可執(zhí)行狀態(tài),隨時(shí)可能再次分得時(shí)間。 1、說說進(jìn)程,線程,協(xié)程之間的區(qū)別 簡(jiǎn)而言之,進(jìn)程是程序運(yùn)行和資源分配的基本單位,一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線程.進(jìn)程在執(zhí)行過程中擁有獨(dú)立的內(nèi)存單元...

    Charlie_Jade 評(píng)論0 收藏0
  • 想進(jìn)大廠?50個(gè)多線程面試題,你會(huì)多少?【后25題】(二)

    摘要:大多數(shù)待遇豐厚的開發(fā)職位都要求開發(fā)者精通多線程技術(shù)并且有豐富的程序開發(fā)調(diào)試優(yōu)化經(jīng)驗(yàn),所以線程相關(guān)的問題在面試中經(jīng)常會(huì)被提到。掌握了這些技巧,你就可以輕松應(yīng)對(duì)多線程和并發(fā)面試了。進(jìn)入等待通行準(zhǔn)許時(shí),所提供的對(duì)象。 最近看到網(wǎng)上流傳著,各種面試經(jīng)驗(yàn)及面試題,往往都是一大堆技術(shù)題目貼上去,而沒有答案。 不管你是新程序員還是老手,你一定在面試中遇到過有關(guān)線程的問題。Java語(yǔ)言一個(gè)重要的特點(diǎn)就...

    caozhijian 評(píng)論0 收藏0
  • java并發(fā)編程學(xué)習(xí)10--同步器--倒計(jì)時(shí)門栓

    摘要:每個(gè)工作線程在結(jié)束前將門栓計(jì)數(shù)器減一,門栓的計(jì)數(shù)變?yōu)榫捅砻鞴ぷ魍瓿伞3S梅椒ㄟf減鎖存器的計(jì)數(shù),如果計(jì)數(shù)到達(dá)零,則釋放所有等待的線程。使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待,除非線程被中斷或超出了指定的等待時(shí)間。 【同步器 java.util.concurrent包包含幾個(gè)能幫助人們管理相互合作的線程集的類。這些機(jī)制具有為線程直間的共用集結(jié)點(diǎn)模式提供的‘預(yù)制功能’。如果有一個(gè)相互合作的...

    stackfing 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

張金寶

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<