成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

深入理解線程通信

tuomao / 998人閱讀

摘要:前言開發(fā)中不免會遇到需要所有子線程執(zhí)行完畢通知主線程處理某些邏輯的場景??梢圆捎弥袛嗑€程的方式來通信,調(diào)用了方法其實(shí)就是將中的一個標(biāo)志屬性置為了。實(shí)際開發(fā)中可以靈活根據(jù)需求選擇最適合的線程通信方式。

前言

開發(fā)中不免會遇到需要所有子線程執(zhí)行完畢通知主線程處理某些邏輯的場景。

或者是線程 A 在執(zhí)行到某個條件通知線程 B 執(zhí)行某個操作。

可以通過以下幾種方式實(shí)現(xiàn):

等待通知機(jī)制
等待通知模式是 Java 中比較經(jīng)典的線程通信方式。

兩個線程通過對同一對象調(diào)用等待 wait() 和通知 notify() 方法來進(jìn)行通訊。

如兩個線程交替打印奇偶數(shù):

public class TwoThreadWaitNotify {

    private int start = 1;

    private boolean flag = false;

    public static void main(String[] args) {
        TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify();

        Thread t1 = new Thread(new OuNum(twoThread));
        t1.setName("A");


        Thread t2 = new Thread(new JiNum(twoThread));
        t2.setName("B");

        t1.start();
        t2.start();
    }

    /**
     * 偶數(shù)線程
     */
    public static class OuNum implements Runnable {
        private TwoThreadWaitNotify number;

        public OuNum(TwoThreadWaitNotify number) {
            this.number = number;
        }

        @Override
        public void run() {

            while (number.start <= 100) {
                synchronized (TwoThreadWaitNotify.class) {
                    System.out.println("偶數(shù)線程搶到鎖了");
                    if (number.flag) {
                        System.out.println(Thread.currentThread().getName() + "+-+偶數(shù)" + number.start);
                        number.start++;

                        number.flag = false;
                        TwoThreadWaitNotify.class.notify();

                    }else {
                        try {
                            TwoThreadWaitNotify.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }

            }
        }
    }


    /**
     * 奇數(shù)線程
     */
    public static class JiNum implements Runnable {
        private TwoThreadWaitNotify number;

        public JiNum(TwoThreadWaitNotify number) {
            this.number = number;
        }

        @Override
        public void run() {
            while (number.start <= 100) {
                synchronized (TwoThreadWaitNotify.class) {
                    System.out.println("奇數(shù)線程搶到鎖了");
                    if (!number.flag) {
                        System.out.println(Thread.currentThread().getName() + "+-+奇數(shù)" + number.start);
                        number.start++;

                        number.flag = true;

                        TwoThreadWaitNotify.class.notify();
                    }else {
                        try {
                            TwoThreadWaitNotify.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

輸出結(jié)果:

t2+-+奇數(shù)93
t1+-+偶數(shù)94
t2+-+奇數(shù)95
t1+-+偶數(shù)96
t2+-+奇數(shù)97
t1+-+偶數(shù)98
t2+-+奇數(shù)99
t1+-+偶數(shù)100

這里的線程 A 和線程 B 都對同一個對象 TwoThreadWaitNotify.class 獲取鎖,A 線程調(diào)用了同步對象的 wait() 方法釋放了鎖并進(jìn)入 WAITING 狀態(tài)。

B 線程調(diào)用了 notify() 方法,這樣 A 線程收到通知之后就可以從 wait() 方法中返回。

這里利用了 TwoThreadWaitNotify.class 對象完成了通信。

有一些需要注意:

wait() 、nofify() 、nofityAll() 調(diào)用的前提都是獲得了對象的鎖(也可稱為對象監(jiān)視器)。

調(diào)用 wait() 方法后線程會釋放鎖,進(jìn)入 WAITING 狀態(tài),該線程也會被移動到等待隊(duì)列中。

調(diào)用 notify() 方法會將等待隊(duì)列中的線程移動到同步隊(duì)列中,線程狀態(tài)也會更新為 BLOCKED

從 wait() 方法返回的前提是調(diào)用 notify() 方法的線程釋放鎖,wait() 方法的線程獲得鎖。

等待通知有著一個經(jīng)典范式:

線程 A 作為消費(fèi)者:

獲取對象的鎖。

進(jìn)入 while(判斷條件),并調(diào)用 wait() 方法。

當(dāng)條件滿足跳出循環(huán)執(zhí)行具體處理邏輯。

線程 B 作為生產(chǎn)者:

獲取對象鎖。

更改與線程 A 共用的判斷條件。

調(diào)用 notify() 方法。

偽代碼如下:

//Thread A

synchronized(Object){
    while(條件){
        Object.wait();
    }
    //do something
}

//Thread B
synchronized(Object){
    條件=false;//改變條件
    Object.notify();
}
join() 方法
    private static void join() throws InterruptedException {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }) ;
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running2");
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }) ;

        t1.start();
        t2.start();

        //等待線程1終止
        t1.join();

        //等待線程2終止
        t2.join();

        LOGGER.info("main over");
    }

輸出結(jié)果:

2018-03-16 20:21:30.967 [Thread-1] INFO  c.c.actual.ThreadCommunication - running2
2018-03-16 20:21:30.967 [Thread-0] INFO  c.c.actual.ThreadCommunication - running
2018-03-16 20:21:34.972 [main] INFO  c.c.actual.ThreadCommunication - main over

t1.join() 時會一直阻塞到 t1 執(zhí)行完畢,所以最終主線程會等待 t1 和 t2 線程執(zhí)行完畢。

其實(shí)從源碼可以看出,join() 也是利用的等待通知機(jī)制:

核心邏輯:

    while (isAlive()) {
        wait(0);
    }

在 join 線程完成后會調(diào)用 notifyAll() 方法,是在 JVM 實(shí)現(xiàn)中調(diào)用,所以這里看不出來。

volatile 共享內(nèi)存

因?yàn)?Java 是采用共享內(nèi)存的方式進(jìn)行線程通信的,所以可以采用以下方式用主線程關(guān)閉 A 線程:

public class Volatile implements Runnable{

    private static volatile boolean flag = true ;

    @Override
    public void run() {
        while (flag){
            System.out.println(Thread.currentThread().getName() + "正在運(yùn)行。。。");
        }
        System.out.println(Thread.currentThread().getName() +"執(zhí)行完畢");
    }

    public static void main(String[] args) throws InterruptedException {
        Volatile aVolatile = new Volatile();
        new Thread(aVolatile,"thread A").start();


        System.out.println("main 線程正在運(yùn)行") ;

        TimeUnit.MILLISECONDS.sleep(100) ;

        aVolatile.stopThread();

    }

    private void stopThread(){
        flag = false ;
    }
}

輸出結(jié)果:

thread A正在運(yùn)行。。。
thread A正在運(yùn)行。。。
thread A正在運(yùn)行。。。
thread A正在運(yùn)行。。。
thread A執(zhí)行完畢

這里的 flag 存放于主內(nèi)存中,所以主線程和線程 A 都可以看到。

flag 采用 volatile 修飾主要是為了內(nèi)存可見性,更多內(nèi)容可以查看這里。

CountDownLatch 并發(fā)工具

CountDownLatch 可以實(shí)現(xiàn) join 相同的功能,但是更加的靈活。

    private static void countDownLatch() throws Exception{
        int thread = 3 ;
        long start = System.currentTimeMillis();
        final CountDownLatch countDown = new CountDownLatch(thread);
        for (int i= 0 ;i

輸出結(jié)果:

2018-03-16 20:19:44.126 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:46.136 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [main] INFO  c.c.actual.ThreadCommunication - main over total time=2012

CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 實(shí)現(xiàn)的,更多實(shí)現(xiàn)參考 ReentrantLock 實(shí)現(xiàn)原理

初始化一個 CountDownLatch 時告訴并發(fā)的線程,然后在每個線程處理完畢之后調(diào)用 countDown() 方法。

該方法會將 AQS 內(nèi)置的一個 state 狀態(tài) -1 。

最終在主線程調(diào)用 await() 方法,它會阻塞直到 state == 0 的時候返回。

CyclicBarrier 并發(fā)工具
    private static void cyclicBarrier() throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ;

        new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("thread run");
                try {
                    cyclicBarrier.await() ;
                } catch (Exception e) {
                    e.printStackTrace();
                }

                LOGGER.info("thread end do something");
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("thread run");
                try {
                    cyclicBarrier.await() ;
                } catch (Exception e) {
                    e.printStackTrace();
                }

                LOGGER.info("thread end do something");
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("thread run");
                try {
                    Thread.sleep(5000);
                    cyclicBarrier.await() ;
                } catch (Exception e) {
                    e.printStackTrace();
                }

                LOGGER.info("thread end do something");
            }
        }).start();

        LOGGER.info("main thread");
    }

CyclicBarrier 中文名叫做屏障或者是柵欄,也可以用于線程間通信。

它可以等待 N 個線程都達(dá)到某個狀態(tài)后繼續(xù)運(yùn)行的效果。

首先初始化線程參與者。

調(diào)用 await() 將會在所有參與者線程都調(diào)用之前等待。

直到所有參與者都調(diào)用了 await() 后,所有線程從 await() 返回繼續(xù)后續(xù)邏輯。

運(yùn)行結(jié)果:

2018-03-18 22:40:00.731 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [main] INFO  c.c.actual.ThreadCommunication - main thread
2018-03-18 22:40:05.741 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread end do something

可以看出由于其中一個線程休眠了五秒,所有其余所有的線程都得等待這個線程調(diào)用 await() 。

該工具可以實(shí)現(xiàn) CountDownLatch 同樣的功能,但是要更加靈活。甚至可以調(diào)用 reset() 方法重置 CyclicBarrier (需要自行捕獲 BrokenBarrierException 處理) 然后重新執(zhí)行。

線程響應(yīng)中斷
public class StopThread implements Runnable {
    @Override
    public void run() {

        while ( !Thread.currentThread().isInterrupted()) {
            // 線程執(zhí)行具體邏輯
            System.out.println(Thread.currentThread().getName() + "運(yùn)行中。。");
        }

        System.out.println(Thread.currentThread().getName() + "退出。。");

    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new StopThread(), "thread A");
        thread.start();

        System.out.println("main 線程正在運(yùn)行") ;

        TimeUnit.MILLISECONDS.sleep(10) ;
        thread.interrupt();
    }


}

輸出結(jié)果:

thread A運(yùn)行中。。
thread A運(yùn)行中。。
thread A退出。。

可以采用中斷線程的方式來通信,調(diào)用了 thread.interrupt() 方法其實(shí)就是將 thread 中的一個標(biāo)志屬性置為了 true。

并不是說調(diào)用了該方法就可以中斷線程,如果不對這個標(biāo)志進(jìn)行響應(yīng)其實(shí)是沒有什么作用(這里對這個標(biāo)志進(jìn)行了判斷)。

但是如果拋出了 InterruptedException 異常,該標(biāo)志就會被 JVM 重置為 false。

線程池 awaitTermination() 方法

如果是用線程池來管理線程,可以使用以下方式來讓主線程等待線程池中所有任務(wù)執(zhí)行完畢:

    private static void executorService() throws Exception{
        BlockingQueue queue = new LinkedBlockingQueue<>(10) ;
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ;
        poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running2");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        poolExecutor.shutdown();
        while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
            LOGGER.info("線程還在執(zhí)行。。。");
        }
        LOGGER.info("main over");
    }

輸出結(jié)果:

2018-03-16 20:18:01.273 [pool-1-thread-2] INFO  c.c.actual.ThreadCommunication - running2
2018-03-16 20:18:01.273 [pool-1-thread-1] INFO  c.c.actual.ThreadCommunication - running
2018-03-16 20:18:02.273 [main] INFO  c.c.actual.ThreadCommunication - 線程還在執(zhí)行。。。
2018-03-16 20:18:03.278 [main] INFO  c.c.actual.ThreadCommunication - 線程還在執(zhí)行。。。
2018-03-16 20:18:04.278 [main] INFO  c.c.actual.ThreadCommunication - main over

使用這個 awaitTermination() 方法的前提需要關(guān)閉線程池,如調(diào)用了 shutdown() 方法。

調(diào)用了 shutdown() 之后線程池會停止接受新任務(wù),并且會平滑的關(guān)閉線程池中現(xiàn)有的任務(wù)。

管道通信
    public static void piped() throws IOException {
        //面向于字符 PipedInputStream 面向于字節(jié)
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();

        //輸入輸出流建立連接
        writer.connect(reader);


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running");
                try {
                    for (int i = 0; i < 10; i++) {

                        writer.write(i+"");
                        Thread.sleep(10);
                    }
                } catch (Exception e) {

                } finally {
                    try {
                        writer.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running2");
                int msg = 0;
                try {
                    while ((msg = reader.read()) != -1) {
                        LOGGER.info("msg={}", (char) msg);
                    }

                } catch (Exception e) {

                }
            }
        });
        t1.start();
        t2.start();
    }

輸出結(jié)果:

2018-03-16 19:56:43.014 [Thread-0] INFO  c.c.actual.ThreadCommunication - running
2018-03-16 19:56:43.014 [Thread-1] INFO  c.c.actual.ThreadCommunication - running2
2018-03-16 19:56:43.130 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=0
2018-03-16 19:56:43.132 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=1
2018-03-16 19:56:43.132 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=2
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=3
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=4
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=5
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=6
2018-03-16 19:56:43.134 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=7
2018-03-16 19:56:43.134 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=8
2018-03-16 19:56:43.134 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=9

Java 雖說是基于內(nèi)存通信的,但也可以使用管道通信。

需要注意的是,輸入流和輸出流需要首先建立連接。這樣線程 B 就可以收到線程 A 發(fā)出的消息了。

實(shí)際開發(fā)中可以靈活根據(jù)需求選擇最適合的線程通信方式。

號外

最近在總結(jié)一些 Java 相關(guān)的知識點(diǎn),感興趣的朋友可以一起維護(hù)。

地址: https://github.com/crossoverJie/Java-Interview

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68869.html

相關(guān)文章

  • 系列文章目錄

    摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續(xù)更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因?yàn)閷懽鞯臅r候發(fā)現(xiàn),為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...

    lijy91 評論0 收藏0
  • 系列文章目錄

    摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續(xù)更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因?yàn)閷懽鞯臅r候發(fā)現(xiàn),為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...

    Yumenokanata 評論0 收藏0
  • 深入理解volatile類型——從Java虛擬機(jī)內(nèi)存模型角度

    摘要:本文從內(nèi)存模型角度,探討的實(shí)現(xiàn)原理。通過共享內(nèi)存或者消息通知這兩種方法,可以實(shí)現(xiàn)通信或同步。基于共享內(nèi)存的線程通信是隱式的,線程同步是顯式的而基于消息通知的線程通信是顯式的,線程同步是隱式的。鎖規(guī)則鎖的解鎖,于于鎖的獲取或加鎖。 一、前言 在java多線程編程中,volatile可以用來定義輕量級的共享變量,它比synchronized的使用成本更低,因?yàn)樗粫鹁€程上下文的切換和調(diào)...

    mushang 評論0 收藏0
  • 深入理解 Java 多線程系列(1)——一個簡單需求的并行改造 & Java多線程通信問題

    摘要:所以接下來,我們需要簡單的介紹下多線程中的并發(fā)通信模型。比如中,以及各種鎖機(jī)制,均為了解決線程間公共狀態(tài)的串行訪問問題。 并發(fā)的學(xué)習(xí)門檻較高,相較單純的羅列并發(fā)編程 API 的枯燥被動學(xué)習(xí)方式,本系列文章試圖用一個簡單的栗子,一步步結(jié)合并發(fā)編程的相關(guān)知識分析舊有實(shí)現(xiàn)的不足,再實(shí)現(xiàn)邏輯進(jìn)行分析改進(jìn),試圖展示例子背后的并發(fā)工具與實(shí)現(xiàn)原理。 本文是本系列的第一篇文章,提出了一個簡單的業(yè)務(wù)場景...

    ruicbAndroid 評論0 收藏0
  • 深入理解Java內(nèi)存模型(一)——基礎(chǔ)

    摘要:線程之間的通信由內(nèi)存模型本文簡稱為控制,決定一個線程對共享變量的寫入何時對另一個線程可見。為了保證內(nèi)存可見性,編譯器在生成指令序列的適當(dāng)位置會插入內(nèi)存屏障指令來禁止特定類型的處理器重排序。 并發(fā)編程模型的分類 在并發(fā)編程中,我們需要處理兩個關(guān)鍵問題:線程之間如何通信及線程之間如何同步(這里的線程是指并發(fā)執(zhí)行的活動實(shí)體)。通信是指線程之間以何種機(jī)制來交換信息。在命令式編程中,線程之間的...

    jsdt 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<