成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

2018年第18周-Java語(yǔ)言思想-并發(fā)

JouyPub / 1033人閱讀

摘要:某些編程語(yǔ)言被設(shè)計(jì)為可以將并發(fā)任務(wù)彼此隔離,這些語(yǔ)言通常被稱(chēng)為函數(shù)性語(yǔ)言。通過(guò)使用多線程機(jī)制,這些獨(dú)立任務(wù)也被稱(chēng)為子任務(wù)中的每一個(gè)都將由執(zhí)行線程來(lái)驅(qū)動(dòng)。

并發(fā)

之前學(xué)的都是順序編程的知識(shí),學(xué)習(xí)并發(fā)編程就好像進(jìn)入了一個(gè)全新的領(lǐng)域,有點(diǎn)類(lèi)似于學(xué)習(xí)了一門(mén)新的編程語(yǔ)言,或者至少是學(xué)習(xí)了一整套新的語(yǔ)言概念。要理解并發(fā)編程,其難度與理解面向?qū)ο缶幊滩畈欢?。如果花點(diǎn)兒功夫,就能明白其基本機(jī)制,但想要抓住其本質(zhì),就需要深入的學(xué)習(xí)和理解。所以看完《Java編程思想》或許會(huì)變得過(guò)分自信,但寫(xiě)復(fù)雜的多線程時(shí),應(yīng)該多看其他多線程的書(shū)籍,關(guān)鍵還是多動(dòng)手。

“并發(fā)是一種具有可論證的確定性,但實(shí)際上具有不可確定性。”

使用并發(fā)時(shí),你的自食其力,并且只有變得多疑而自信,才能用Java編寫(xiě)出可靠的多線程代碼。

用并發(fā)解決的問(wèn)題大體可以分為“速度”和“設(shè)計(jì)可管理性”兩種。 速度

并發(fā)解決“速度”問(wèn)題不僅僅是利用多個(gè)CPU去解決分片的問(wèn)題,也就是說(shuō)并發(fā)不僅僅是多個(gè)CPU的事情,也是單個(gè)CPU的事情。如果提高程序在單個(gè)CPU的性能,就得考慮具體情況,正常情況單個(gè)CPU運(yùn)行多任務(wù)(task)是有上下文切換的性能損耗。但在阻塞(Blocking)的情況下就不同了。
我們先看看阻塞的定義:如果程序中的某個(gè)任務(wù)因?yàn)樵摮绦蚩刂品秶獾哪承l件(通常是I/O),那我們就說(shuō)這個(gè)任務(wù)或線程阻塞了。
如果使用并發(fā)來(lái)寫(xiě)這個(gè)阻塞程序,在一個(gè)任務(wù)阻塞時(shí),程序中的其他任務(wù)還可以繼續(xù)執(zhí)行。這樣性能會(huì)有很大的提升。所以如果沒(méi)有阻塞的情況,在單CPU使用并發(fā),就沒(méi)必要了。

在單個(gè)CPU的系統(tǒng)中性能提高的常見(jiàn)示例:事件驅(qū)動(dòng)編程(event-driven programing)。

實(shí)現(xiàn)并發(fā)最直接的方式是在操作系統(tǒng)級(jí)別使用進(jìn)程(process)。多任務(wù)操作系統(tǒng)可以通過(guò)周期性地將CPU從一個(gè)進(jìn)程切換到另一個(gè)進(jìn)程,來(lái)實(shí)現(xiàn)同時(shí)運(yùn)行多個(gè)進(jìn)程(程序)。

某些編程語(yǔ)言被設(shè)計(jì)為可以將并發(fā)任務(wù)彼此隔離,這些語(yǔ)言通常被稱(chēng)為函數(shù)性語(yǔ)言。Erlang就是這樣的語(yǔ)言,它包含針對(duì)任務(wù)之間彼此通信的安全機(jī)制。如果你發(fā)現(xiàn)程序中某個(gè)部分必須大量使用并發(fā),并且你在試圖構(gòu)建這個(gè)部分時(shí)遇到過(guò)多的問(wèn)題。那么你可以考慮使用像Erlang這類(lèi)專(zhuān)門(mén)的并發(fā)語(yǔ)言來(lái)創(chuàng)建這個(gè)部分。

Java語(yǔ)言采用更加傳統(tǒng)的方式,在順序語(yǔ)言的基礎(chǔ)上提供對(duì)線程的支持。 Java的目的是“編寫(xiě)一次,到處運(yùn)行”,所以在OSX之前的Macintosh操作系統(tǒng)版本是不支持多任務(wù),因此Java支持多線程機(jī)制,讓并發(fā)Java程序能夠移植到Macintosh和類(lèi)似的平臺(tái)上。

設(shè)計(jì)可管理性

設(shè)計(jì)可管理性,我更愿意說(shuō)是一個(gè)解決問(wèn)題的方法模型(程序設(shè)計(jì))。線程使你能夠創(chuàng)建更加松散耦合的設(shè)計(jì)。
在單CPU上使用多任務(wù)的程序(代碼)在任意時(shí)刻仍然只能執(zhí)行一項(xiàng)任務(wù),因此理論上講,肯定可以不用任何任務(wù)就可以編寫(xiě)相同的程序。但是,這樣寫(xiě)來(lái)的代碼可能會(huì)很混亂,不方便維護(hù)。因此并發(fā)提供一種重要的組織結(jié)構(gòu)上的好處:你的程序設(shè)計(jì)可以極大地簡(jiǎn)化。某些類(lèi)似的問(wèn)題,例如仿真,沒(méi)有并發(fā)的支持是很難解決的。

一般線程調(diào)度模式分為:搶占式(preemtive)調(diào)度和協(xié)同式調(diào)度(cooperative).

搶占式調(diào)度指的是每條線程執(zhí)行的時(shí)間、線程的切換都是由系統(tǒng)控制,每條線程可能都分同樣的的執(zhí)行時(shí)間片(CPU切片),也可能是在某些線程執(zhí)行的時(shí)間片較長(zhǎng),甚至某些線程得不到執(zhí)行時(shí)間片。這種機(jī)制下,優(yōu)點(diǎn)是一個(gè)線程阻塞不會(huì)導(dǎo)致整個(gè)進(jìn)程堵塞,缺點(diǎn)就是上下文切換開(kāi)銷(xiāo)大
協(xié)同式調(diào)度指的是某一條線程執(zhí)行完后主動(dòng)通知系統(tǒng)切到另一條線程上執(zhí)行。線程的執(zhí)行時(shí)間由線程本身控制,線程切換可以預(yù)知。優(yōu)點(diǎn)是不存在多線程同步問(wèn)題,上下文切換開(kāi)銷(xiāo)小,缺點(diǎn)是如果一個(gè)線程阻塞了,那么可能造成整個(gè)系統(tǒng)崩潰。

Java線程機(jī)制是搶占式.
線程讓出cpu的情況:
1.當(dāng)前運(yùn)行線程主動(dòng)放棄CPU,JVM暫時(shí)放棄CPU操作(基于時(shí)間片輪轉(zhuǎn)調(diào)度的JVM操作系統(tǒng)不會(huì)讓線程永久放棄CPU,或者說(shuō)放棄本次時(shí)間片的執(zhí)行權(quán)),例如調(diào)用yield()方法。
2.當(dāng)前運(yùn)行線程因?yàn)槟承┰蜻M(jìn)入阻塞狀態(tài),例如阻塞在I/O上。
3.當(dāng)前運(yùn)行線程結(jié)束,即運(yùn)行完run()方法里面的任務(wù)

并發(fā)需要付出代價(jià),包含復(fù)雜性代價(jià)。但這些代價(jià)與優(yōu)化程序設(shè)計(jì)、資源負(fù)載均衡以及用戶體驗(yàn)上的改進(jìn)相比,這些代價(jià)就顯得微不足道。

線程帶來(lái)設(shè)計(jì)上的演變

為了獲取線程的結(jié)果,于是產(chǎn)生輪詢,然后再后來(lái)為了解決輪詢,引進(jìn)了靜態(tài)方法的回調(diào),再后來(lái)帶來(lái)實(shí)例方法的回調(diào),最后引出設(shè)計(jì)模式:策略模式 和Java5引進(jìn)多線程編程的新方法,通過(guò)隱藏細(xì)節(jié)可以更容易地處理回調(diào)——ExecutorService和Futrue

輪詢例子:

package com.jc.thread;

import com.jc.thinkinjava.io.util.Directory; 

import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
/**
 * 回調(diào)例子-前序
 *
 * 計(jì)算文件的256位的SHA-2消息摘要
 * 由于瓶頸在IO上,所以采用多線程
 *
 * 嘗試去獲取線程返回的值,但發(fā)現(xiàn)需要另外個(gè)線程不停的輪詢,這是很耗cpu資源
 */
@SuppressWarnings("Duplicates")
public class ReturnDigest extends Thread {

    private String fileName;

    private byte[] digest;

    public ReturnDigest(String fileName) {
        this.fileName = fileName;
    }


    @Override
    public void run() {
        try {

//            System.out.println(fileName);
            FileInputStream in = new FileInputStream(fileName);
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            DigestInputStream digestInputStream = new DigestInputStream(in, sha);
            while (digestInputStream.read() != -1) ;
            digestInputStream.close();
            digest = sha.digest(); //注意,不是DigestInputStream的方法哦

            StringBuilder sb = new StringBuilder(fileName);
            sb.append(":").append(DatatypeConverter.printHexBinary(digest));

            System.out.println(sb.toString());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }


    }


    public byte[] getDigest() {
        return this.digest;
    }


    public static void main(String[] args) {
        File[] files = Directory.local(".", ".*");

        List fileList = new ArrayList();

        for (int i = 0; i < files.length; i++) {
            File file = files[i];
            if (!file.isDirectory()) {
                fileList.add(file);
            }
        }

        ReturnDigest[] digests = new ReturnDigest[fileList.size()];
        for (int i = 0; i < fileList.size(); i++) {
            File file = fileList.get(0);
            digests[i] = new ReturnDigest(file.getAbsolutePath());
            digests[i].start();
        }

        for(int i=0;i

然后為了解決輪詢,產(chǎn)生了靜態(tài)方法的回調(diào):

package com.jc.thread.callback;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * 回調(diào)例子
 * 靜態(tài)方法的回調(diào)
 */
@SuppressWarnings("Duplicates")
public class CallbackDigest  implements  Runnable{
    private String fileName;

    public CallbackDigest(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public void run() {
        try {

//            System.out.println(fileName);
            FileInputStream in = new FileInputStream(fileName);
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            DigestInputStream digestInputStream = new DigestInputStream(in, sha);
            while (digestInputStream.read() != -1) ;
            digestInputStream.close();
            byte[] digest = sha.digest(); //注意,不是DigestInputStream的方法哦

            CallbackDigestUserInterface.receiveDigest(digest,fileName);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
package com.jc.thread.callback;

import com.jc.thinkinjava.io.util.Directory;
import com.jc.thread.DigestRunnable;

import javax.xml.bind.DatatypeConverter;
import java.io.File;

/**
 * 回調(diào)例子
 * 靜態(tài)方法的回調(diào)
 */
public class CallbackDigestUserInterface {

    public static void receiveDigest(byte[] digest,String fileName){
        StringBuilder sb = new StringBuilder(fileName);
        sb.append(":").append(DatatypeConverter.printHexBinary(digest));

        System.out.println(sb.toString());
    }


    public static void main(String[] args) {
        File[] files = Directory.local(".", ".*");
        for (File file : files) {
            if (!file.isDirectory())
                new Thread(new DigestRunnable(file.getAbsolutePath())).start();
        }
    }


}

實(shí)例方法的回調(diào):

package com.jc.thread.callback;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class InstanceCallbackDigest   implements  Runnable{
    private String fileName;
    private InstanceCallbackDigestUserInterface callback;

    public InstanceCallbackDigest(String fileName, InstanceCallbackDigestUserInterface instanceCallbackDigestUserInterface) {
        this.fileName = fileName;
        this.callback = instanceCallbackDigestUserInterface;
    }

    @Override
    public void run() {
        try {

//            System.out.println(fileName);
            FileInputStream in = new FileInputStream(fileName);
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            DigestInputStream digestInputStream = new DigestInputStream(in, sha);
            while (digestInputStream.read() != -1) ;
            digestInputStream.close();
            byte[] digest = sha.digest(); //注意,不是DigestInputStream的方法哦

            callback.receiveDigest(digest);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
package com.jc.thread.callback;

import com.jc.thinkinjava.io.util.Directory;
import com.jc.thread.ReturnDigest;

import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;


/**
 * 回調(diào)例子
 * 

* 使用實(shí)例方法代替靜態(tài)方法進(jìn)行回調(diào) *

* 雖然復(fù)雜點(diǎn),但優(yōu)點(diǎn)很多。如: * 1. 主類(lèi)(InstanceCallbackDigestUserInterface)的各個(gè)實(shí)例映射為一個(gè)文件,可以很自然地記錄跟蹤這個(gè)文件的信息,而不需要額外的數(shù)據(jù)結(jié)構(gòu) * 2. 這個(gè)實(shí)例在有必要時(shí)可以容易地重新計(jì)算某個(gè)特定文件的摘要 *

* 實(shí)際上,經(jīng)證明,這種機(jī)制有更大的靈活性。 *

* 這種機(jī)制,也稱(chēng)為:觀察者模式,如Swing、AWT */ public class InstanceCallbackDigestUserInterface { private String fileName; private byte[] digest; public InstanceCallbackDigestUserInterface(String fileName) { this.fileName = fileName; } public void calculateDigest() { InstanceCallbackDigest instanceCallbackDigest = new InstanceCallbackDigest(fileName, this); new Thread(instanceCallbackDigest).start(); } public void receiveDigest(byte[] digest) { this.digest = digest; System.out.println(this); } @Override public String toString() { String result = fileName + ": "; if (digest != null) { result += DatatypeConverter.printHexBinary(digest); } else { result += "digest not available"; } return result; } public static void main(String[] args) { File[] files = Directory.local(".", ".*"); List fileList = new ArrayList(); for (int i = 0; i < files.length; i++) { File file = files[i]; if (!file.isDirectory()) { fileList.add(file); } } for (int i = 0; i < fileList.size(); i++) { File file = fileList.get(0); InstanceCallbackDigestUserInterface instanceCallbackDigestUserInterface = new InstanceCallbackDigestUserInterface(file.getAbsolutePath()); instanceCallbackDigestUserInterface.calculateDigest(); } } }

Java5引進(jìn)的新方法,ExecutorService和Future:

package com.jc.thread.callback;

import java.util.concurrent.Callable;

public class FindMaxTask implements Callable {


    private int[] data;
    private int start;
    private int end;

    public FindMaxTask(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    public Integer call() throws Exception {
        int max = Integer.MAX_VALUE;
        for (int i = start; i < end; i++) {
            if (data[i] > max) max = data[i];
        }
        return max;
    }
}
package com.jc.thread.callback;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 *
 * Java5引入了多線程編程的一個(gè)新方法,通過(guò)隱藏細(xì)節(jié)可以更容易地處理回調(diào)
 * 使用回調(diào)實(shí)現(xiàn)的Futrue
 */
public class MultithreadedMaxFinder {

    public static int max(int[] data) throws ExecutionException, InterruptedException {
        if (data.length == 1) {
            return data[0];
        } else if (data.length == 0) {
            throw new IllegalArgumentException();
        }

        FindMaxTask task1 = new FindMaxTask(data,0,data.length/2);
        FindMaxTask task2 = new FindMaxTask(data,data.length/2,data.length);


        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future future1 = executorService.submit(task1);
        Future future2 = executorService.submit(task2);
        //調(diào)用future1.get()時(shí),這個(gè)方法會(huì)進(jìn)行阻塞,等待第一個(gè)FindMaxTask完成。只有當(dāng)?shù)谝粋€(gè)FindMaxTask完成,才會(huì)調(diào)用future2.get()
        return Math.max(future1.get(),future2.get());
    }
}
基本線程機(jī)制

并發(fā)編程使我們可以將程序劃分為多個(gè)分離的、獨(dú)立運(yùn)行的任務(wù)。通過(guò)使用多線程機(jī)制,這些獨(dú)立任務(wù)(也被稱(chēng)為子任務(wù))中的每一個(gè)都將由執(zhí)行線程來(lái)驅(qū)動(dòng)。
線程模型:一個(gè)線程就是進(jìn)程中的一個(gè)單一順序控制流,因此單個(gè)進(jìn)程可以擁有多個(gè)并發(fā)執(zhí)行的任務(wù),感覺(jué)每個(gè)任務(wù)都好像有其CPU一樣,其底層機(jī)制是切分CPU時(shí)間,但通常不用考慮CPU的切片。
線程模型為編程帶來(lái)便利,它簡(jiǎn)化了在單一程序中同時(shí)交織在一起的多個(gè)操作的處理。在使用線程時(shí),CPU將輪流給每個(gè)任務(wù)分配其占用時(shí)間。線程的一大好處是可以使你從這一個(gè)層次抽身出來(lái),即代碼不必知道它是運(yùn)行在具有一個(gè)還是多個(gè)CPU的機(jī)子上。
所以,使用線程機(jī)制是一種建立透明的、可擴(kuò)展的程序的方法,如果程序運(yùn)行得太慢,為機(jī)器增添一個(gè)CPU就能容易地加快程序的運(yùn)行速度。多任務(wù)和多線程往往是使用多處理器系統(tǒng)的最合理方式。

//此方法調(diào)用是對(duì) 線程調(diào)度器 的一種建議:我已經(jīng)執(zhí)行完生命周期中最重要的部分了,此刻正是切換給其他任務(wù)執(zhí)行一段時(shí)間的大好時(shí)機(jī)。
Thread.yield();

Thread.yield();這個(gè)方法叫“讓步”,不過(guò)沒(méi)有任何機(jī)制保證它將會(huì)被采納。

術(shù)語(yǔ)

在Java中學(xué)習(xí)并發(fā)編程,總是會(huì)讓人困惑。讓人困惑是那些概念,特別是涉及到線程。
要執(zhí)行的任務(wù)和驅(qū)動(dòng)它的線程,這里的任務(wù)和線程是不同的,在Java中會(huì)更明細(xì),因?yàn)槟銓?duì)Thread類(lèi)實(shí)際沒(méi)有任何控制權(quán)(特別是使用Executor時(shí)候)。通過(guò)某種方式,將任務(wù)附著到線程,以使這個(gè)線程可以驅(qū)動(dòng)任務(wù)。
在Java中,Thread類(lèi)自身不執(zhí)行任何操作,它只是驅(qū)動(dòng)賦予它的任務(wù),但是線程的一些研究中,總是使用這樣的話語(yǔ)“線程執(zhí)行這項(xiàng)或那項(xiàng)動(dòng)作”,仿佛“線程就是任務(wù)”。這一點(diǎn)是讓新人是十分困惑的。因?yàn)闀?huì)讓人覺(jué)得任務(wù)和線程是一種“是一個(gè)”的關(guān)系。覺(jué)得應(yīng)該從Thread繼承出一個(gè)任務(wù)。但實(shí)際不是,所以用Task名字會(huì)更好。
那為什么Java設(shè)計(jì)者不用Task而用Thread或Runnable呢? 之所以有上述的困惑(概念混淆),那是因?yàn)?,雖然從概念上講,我們應(yīng)該只關(guān)注任務(wù),而不需要關(guān)注線程的細(xì)節(jié),我們只需要定義任務(wù),然后說(shuō)“開(kāi)始”就好。但實(shí)際情況是,在物理上,創(chuàng)建線程可能會(huì)代價(jià)很高,因此需要人工去保存和管理它們。而且Java的線程機(jī)制是基于C的低級(jí)的P線程(pthread)方式。所以才導(dǎo)致任務(wù)和線程這兩個(gè)概念總是混在一起。站在實(shí)現(xiàn)和更抽象的角度,這兩者應(yīng)該分開(kāi),所以編寫(xiě)代碼時(shí),你必須遵守規(guī)則。

為了描述更清楚,因?yàn)槎x為要執(zhí)行的工作則為“任務(wù)”,引用到驅(qū)動(dòng)任務(wù)的具體機(jī)制時(shí),用“線程”。 如果只是概念級(jí)別上討論系統(tǒng),則只用“任務(wù)”就行。

加入一個(gè)線程

一個(gè)線程可以調(diào)用其他線程的join()方法,其效果是等待一段時(shí)間直到第二個(gè)線程結(jié)束才繼續(xù)執(zhí)行。

package com.jc.concurrency;
/**
 * 一個(gè)線程可以等待一個(gè)線程完成,那就是用join
 * @author 
 *
 */
class Sleeper extends Thread {
    private int duration;

    public Sleeper(String name, int sleepTime) {
        super(name);
        duration = sleepTime;
        start();
    }

    public void run() {
        try {
            sleep(duration);
        } catch (InterruptedException e) { //異常捕獲時(shí)會(huì)將Interrupted這個(gè)標(biāo)志位重置為false,所以在這里輸出false
            System.out.println(getName() + " was interrupted. " + "isInterrupted(): " + isInterrupted());
            return;
        }
        System.out.println(getName() + " has awakened");
    }
}

class Joiner extends Thread {
    private Sleeper sleeper;

    public Joiner(String name, Sleeper sleeper) {
        super(name);
        this.sleeper = sleeper;
        start();
    }

    public void run() {
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        System.out.println(getName() + " join completed");
    }
}

public class Joining {
    public static void main(String[] args) {
        Sleeper sleepy = new Sleeper("Sleepy", 1500), grumpy = new Sleeper("Grumpy", 1500);
        Joiner dopey = new Joiner("Dopey", sleepy), doc = new Joiner("Doc", grumpy);
        grumpy.interrupt();
    }
}
捕獲異常

在main方法是無(wú)法捕獲到線程里的異常。為解決這個(gè)問(wèn)題,我們修改Executor產(chǎn)生線程的方式。Java SE5中的新接口:Thread.UncaughtExceptionHandler

package com.jc.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * 使用Thread.UncaughtExceptionHandler處理線程拋出的異常
 * 
 * MyUncaughtExceptionHandler會(huì)新建線程去處理其他線程跑出來(lái)的異常
 * 
 * @author 
 *
 */
class ExceptionThread2 implements Runnable {
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("run() by " + t);
        System.out.println("eh = " + t.getUncaughtExceptionHandler());
        throw new RuntimeException();
    }
}

class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("caught " + t + ""s " + e);
    }
}

class HandlerThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        System.out.println(this + " creating new Thread");
        Thread t = new Thread(r);
        System.out.println("created " + t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        System.out.println("eh = " + t.getUncaughtExceptionHandler());
        return t;
    }
}

public class CaptureUncaughtException {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
        exec.execute(new ExceptionThread2());
    }
}/*
     * output:
     * 
     * com.jc.concurrency.HandlerThreadFactory@4e25154f creating new Thread
     * created Thread[Thread-0,5,main] eh =
     * com.jc.concurrency.MyUncaughtExceptionHandler@70dea4e run() by
     * Thread[Thread-0,5,main] eh =
     * com.jc.concurrency.MyUncaughtExceptionHandler@70dea4e
     * com.jc.concurrency.HandlerThreadFactory@4e25154f creating new Thread
     * created Thread[Thread-1,5,main] eh =
     * com.jc.concurrency.MyUncaughtExceptionHandler@5490c2f5 caught
     * Thread[Thread-0,5,main]"s java.lang.RuntimeException
     * 
     * 
     * 
     */

還可以設(shè)置默認(rèn)異常處理器:

package com.jc.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * 設(shè)置默認(rèn)的線程異常處理類(lèi)
 * @author 
 *
 */
public class SettingDefaultHandler {
    public static void main(String[] args) {
        Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new ExceptionThread());
    }
}
線程狀態(tài)(Thread state)1.新建(new):

一個(gè)線程可以處于四種狀態(tài)之一:新建(new),就緒(Runnable),阻塞(Blocked),死亡(Dead)。
1.新建(new):這是個(gè)短暫狀態(tài),當(dāng)線程被創(chuàng)建時(shí),它只會(huì)短暫地處于這種狀態(tài)。此時(shí)它已經(jīng)分配了必須的系統(tǒng)資源,并執(zhí)行了初始化。此刻線程已經(jīng)有資格獲取CPU時(shí)間了,之后調(diào)度器將把這個(gè)線程轉(zhuǎn)變?yōu)榭蛇\(yùn)行狀態(tài)或阻塞狀態(tài)。
2.就緒(Runnable):在這種狀態(tài)下,只要調(diào)度器把時(shí)間片分配給線程,線程就可以運(yùn)行。也就是說(shuō),在任意時(shí)刻,此狀態(tài)的線程可以運(yùn)行也可以不運(yùn)行。不同于死亡和阻塞狀態(tài)。
3.阻塞(Blocked):線程能夠運(yùn)行,但有某個(gè)條件阻止它的運(yùn)行。當(dāng)線程處于阻塞狀態(tài)時(shí),調(diào)度器將忽略線程,不會(huì)分配給線程任何CPU時(shí)間。直到線程重新進(jìn)入了就緒狀態(tài),它才有可能執(zhí)行操作。
4.死亡(Dead):處于死亡或終止?fàn)顟B(tài)的線程將不再是可調(diào)度的,并且再也不會(huì)得到CPU時(shí)間,它的任務(wù)已結(jié)束,或不再是可運(yùn)行的。任務(wù)死亡的通常方式是從run()方法返回,但是任務(wù)的線程還可以被中斷,中斷也是屬于死亡。

進(jìn)入阻塞狀態(tài)

一個(gè)任務(wù)進(jìn)入阻塞狀態(tài),可能要有如下原因:

通過(guò)調(diào)用sleep(milliseconds)使任務(wù)進(jìn)入休眠狀態(tài),在這種情況下,任務(wù)在指定的時(shí)間內(nèi)不會(huì)運(yùn)行。

通過(guò)調(diào)用wait()使線程掛起。直到線程得到了notify()或notifyAll()消息(或者在Java SE5的java.util.concurrent類(lèi)庫(kù)中等價(jià)的signal()或signalAll()消息),線程才會(huì)進(jìn)入就緒狀態(tài)。

任務(wù)在等待某個(gè)輸入/輸出完成。

任務(wù)試圖在某個(gè)對(duì)象上調(diào)用其同步控制方法,但是對(duì)象鎖不可用,因?yàn)榱硪粋€(gè)任務(wù)已經(jīng)獲取了這個(gè)鎖。

在較早的代碼中,會(huì)有suspend()和resume()來(lái)阻塞和喚醒線程,因?yàn)槿菀讓?dǎo)致死鎖,所以被廢止了。

中斷

在阻塞狀態(tài)的線程,可以通過(guò)中斷來(lái)終止該阻塞的任務(wù)。Thread類(lèi)包含interrupt()方法來(lái)中斷。如果使用Executor,則使用Future的cancel()來(lái)中斷任務(wù)。其實(shí)Executor的shutdownNow()方法,就是將發(fā)送一個(gè)interrupt()調(diào)用給它所啟動(dòng)的所有線程。

package com.jc.concurrency;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 中斷處于阻塞狀態(tài)的線程例子  
 * 發(fā)現(xiàn)只有sleep()操作的才能中斷,其余的io和同步都不能被中斷
 * @author 
 *
 */
class SleepBlocked implements Runnable {
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException");
        }
        System.out.println("Exiting SleepBlocked.run()");
    }
}

class IOBlocked implements Runnable {
    private InputStream in;

    public IOBlocked(InputStream is) {
        in = is;
    }

    public void run() {
        try {
            System.out.println("Waiting for read():");
            in.read();
        } catch (IOException e) {
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("Interrupted from blocked I/O");
            } else {
                throw new RuntimeException(e);
            }
        }
        System.out.println("Exiting IOBlocked.run()");
    }
}

class SynchronizedBlocked implements Runnable {
    public synchronized void f() {
        while (true) // Never releases lock
            Thread.yield();
    }

    public SynchronizedBlocked() {
        new Thread() {
            public void run() {
                f(); // Lock acquired by this thread
            }
        }.start();
    }

    public void run() {
        System.out.println("Trying to call f()");
        f();
        System.out.println("Exiting SynchronizedBlocked.run()");
    }
}

public class Interrupting {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    static void test(Runnable r) throws InterruptedException {
        Future f = exec.submit(r);
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("Interrupting " + r.getClass().getName());
        f.cancel(true); // Interrupts if running
        System.out.println("Interrupt sent to " + r.getClass().getName());
    }

    public static void main(String[] args) throws Exception {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        System.out.println("Aborting with System.exit(0)");
        System.exit(0); // ... since last 2 interrupts failed
    }
}

發(fā)現(xiàn)只有sleep()操作的才能中斷,其余的io和同步都不能被中斷。所以有個(gè)比較不優(yōu)雅,但有效的關(guān)閉方式:

package com.jc.concurrency;

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 中斷IO阻塞的線程的方式:關(guān)閉資源
 * @author 
 *
 */
public class CloseResource {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InputStream socketInput = new Socket("localhost", 8080).getInputStream();
        exec.execute(new IOBlocked(socketInput));
        exec.execute(new IOBlocked(System.in));
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("Shutting down all threads");
        exec.shutdownNow();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Closing " + socketInput.getClass().getName());
        socketInput.close(); // Releases blocked thread
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Closing " + System.in.getClass().getName());
        System.in.close(); // Releases blocked thread
    }
}

之所以要sleep,是想要interrupt都傳到各個(gè)線程里面。以達(dá)到中斷的效果。

NIO提供了優(yōu)雅的I/O中斷。

/**
 * NIO提供了優(yōu)雅的I/O中斷
 * @author 
 *
 */
class NIOBlocked implements Runnable {
    private final SocketChannel sc;

    public NIOBlocked(SocketChannel sc) {
        this.sc = sc;
    }

    public void run() {
        try {
            System.out.println("Waiting for read() in " + this);
            sc.read(ByteBuffer.allocate(1));
        } catch (ClosedByInterruptException e) {
            System.out.println("ClosedByInterruptException" + this);
        } catch (AsynchronousCloseException e) {
            System.out.println("AsynchronousCloseException" + this);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Exiting NIOBlocked.run() " + this);
    }
}

public class NIOInterruption {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InetSocketAddress isa = new InetSocketAddress("localhost", 8080);
        SocketChannel sc1 = SocketChannel.open(isa);
        SocketChannel sc2 = SocketChannel.open(isa);
        System.out.println(sc1);
        System.out.println(sc2);
        Future f = exec.submit(new NIOBlocked(sc1));
        exec.execute(new NIOBlocked(sc2));
        exec.shutdown();
        TimeUnit.SECONDS.sleep(1);
        // Produce an interrupt via cancel:
        f.cancel(true);
        TimeUnit.SECONDS.sleep(1);
        // Release the block by closing the channel:
        sc2.close();
    }
}

SleepBlocked例子展示了synchronized的鎖是不可以中斷,這是很危險(xiǎn)的。所以ReentrantLock提供了可中斷的能力

package com.jc.concurrency;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * SleepBlocked例子展示了synchronized的鎖是不可以中斷,這是很危險(xiǎn)的。
 * 所以ReentrantLock提供了可中斷的能力
 * @author 
 *
 */
class BlockedMutex {
    private Lock lock = new ReentrantLock();

    public BlockedMutex() {
        // Acquire it right away, to demonstrate interruption
        // of a task blocked on a ReentrantLock:
        lock.lock();
    }

    public void f() {
        try {
            // This will never be available to a second task
            lock.lockInterruptibly(); // Special call
            System.out.println("lock acquired in f()");
        } catch (InterruptedException e) {
            System.out.println("Interrupted from lock acquisition in f()");
        }
    }
}

class Blocked2 implements Runnable {
    BlockedMutex blocked = new BlockedMutex();

    public void run() {
        System.out.println("Waiting for f() in BlockedMutex");
        blocked.f();
        System.out.println("Broken out of blocked call");
    }
}

public class Interrupting2 {
    public static void main(String[] args) throws Exception {
        Thread t = new Thread(new Blocked2());
        t.start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Issuing t.interrupt()");
        t.interrupt();
    }
}/**output:
Waiting for f() in BlockedMutex
Issuing t.interrupt()
Interrupted from lock acquisition in f()
Broken out of blocked call
**/

在沒(méi)有阻塞的語(yǔ)句時(shí),通過(guò)Thread.interrupted()判斷線程被中斷:

package com.jc.concurrency;

import java.util.concurrent.TimeUnit;

/**
 * 在沒(méi)有阻塞的語(yǔ)句時(shí),通過(guò)Thread.interrupted()判斷線程被中斷
 * @author 
 *
 */
class NeedsCleanup {
    private final int id;

    public NeedsCleanup(int ident) {
        id = ident;
        System.out.println("NeedsCleanup " + id);
    }

    public void cleanup() {
        System.out.println("Cleaning up " + id);
    }
}

class Blocked3 implements Runnable {
    private volatile double d = 0.0;

    public void run() {
//        try {
            while (!Thread.interrupted()) {
                // point1
                NeedsCleanup n1 = new NeedsCleanup(1);
                // Start try-finally immediately after definition
                // of n1, to guarantee proper cleanup of n1:
                try {
                    System.out.println("Sleeping");
//                    TimeUnit.SECONDS.sleep(1);
                    // point2
                    NeedsCleanup n2 = new NeedsCleanup(2);
                    // Guarantee proper cleanup of n2:
                    try {
                        System.out.println("Calculating");
                        // A time-consuming, non-blocking operation:
                        for (int i = 1; i < 2500000; i++)
                            d = d + (Math.PI + Math.E) / d;
                        System.out.println("Finished time-consuming operation");
                    } finally {
                        n2.cleanup();
                    }
                } finally {
                    n1.cleanup();
                }
            }
            System.out.println("Exiting via while() test");
//        } catch (InterruptedException e) {
//            System.out.println("Exiting via InterruptedException");
//        }
    }
}

public class InterruptingIdiom {
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("usage: java InterruptingIdiom delay-in-mS");
            System.exit(1);
        }
        Thread t = new Thread(new Blocked3());
        t.start();
        TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
        t.interrupt();
    }
}
線程協(xié)作wait()和notify()

wait()、notify()以及nofityAll()有一個(gè)比較特殊的方面,那就是這些方法都是基類(lèi)Object的方法,而不是Thread的一部分。一開(kāi)始或許有這種困惑,覺(jué)得很奇怪。明明是線程的功能,為啥要放在Object里。那時(shí)因?yàn)檫@些方法需要操作鎖,當(dāng)一個(gè)任務(wù)在方法里遇到wait()的調(diào)用時(shí),線程的執(zhí)行被掛起(阻塞狀態(tài)),對(duì)象上的鎖會(huì)被是否。因此wait()方法需放在同步控制塊里(與之相對(duì)比是sleep()因?yàn)椴挥貌僮麈i,所以可以放在非同步控制塊里,而且還是Thread的方法)。如果在非同步控制調(diào)用這些方法,程序能通過(guò)編譯,但運(yùn)行時(shí)會(huì)拋IllegalMonitorStateException差異。例子:

package com.jc.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * wait()和notifyAll()例子,notifyAll會(huì)將該對(duì)象的wait()方法所阻塞的線程
 * @author 
 *
 */
class Car {
    private boolean waxOn = false;

    public synchronized void waxed() {
        waxOn = true; // Ready to buff
        notifyAll();
    }

    public synchronized void buffed() {
        waxOn = false; // Ready for another coat of wax
        notifyAll();
    }

    public synchronized void waitForWaxing() throws InterruptedException {
        while (waxOn == false)
            wait();
    }

    public synchronized void waitForBuffing() throws InterruptedException {
        while (waxOn == true)
            wait();
    }
}

class WaxOn implements Runnable {
    private Car car;

    public WaxOn(Car c) {
        car = c;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.print("Wax On! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax On task");
    }
}

class WaxOff implements Runnable {
    private Car car;

    public WaxOff(Car c) {
        car = c;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                car.waitForWaxing();
                System.out.print("Wax Off! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.buffed();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax Off task");
    }
}

public class WaxOMatic {
    public static void main(String[] args) throws Exception {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5); // Run for a while...
        exec.shutdownNow(); // Interrupt all tasks
    }
}
notify()和nofityAll()

因?yàn)榭赡苡卸鄠€(gè)任務(wù)在單個(gè)Car對(duì)象上處于wait()狀態(tài),因此調(diào)用nofityAll()比只調(diào)用notify()要更安全。所以上面那個(gè)程序,只有一個(gè)任務(wù),因此可以使用notify()來(lái)代替notifyAll()。
使用 notify()而不是notifyAll()是一種優(yōu)化。除非知道notify()會(huì)喚醒具體哪個(gè)任務(wù),不如還是notifyAll()保守點(diǎn)
在有關(guān)Java的線程機(jī)制的討論中,有一個(gè)令人困惑的描述:notifyAll()將喚醒“所有正在等待的任務(wù)”。其實(shí)更準(zhǔn)確是:當(dāng)notifyAll()因某個(gè)特定鎖而被調(diào)用時(shí),只有等待這個(gè)鎖的任務(wù)才會(huì)被喚醒:

package com.jc.concurrency;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * 當(dāng)notifyAll()因某個(gè)特定鎖而被調(diào)用時(shí),只有等待這個(gè)鎖的任務(wù)才會(huì)被喚醒
 * @author 
 *
 */
class Blocker {
    synchronized void waitingCall() {
        try {
            while (!Thread.interrupted()) {
                wait();
                System.out.print(Thread.currentThread() + " ");
            }
        } catch (InterruptedException e) {
            // OK to exit this way
        }
    }

    synchronized void prod() {
        notify();
    }

    synchronized void prodAll() {
        notifyAll();
    }
}

class Task implements Runnable {
    static Blocker blocker = new Blocker();

    public void run() {
        blocker.waitingCall();
    }
}

class Task2 implements Runnable {
    // A separate Blocker object:
    static Blocker blocker = new Blocker();

    public void run() {
        blocker.waitingCall();
    }
}

public class NotifyVsNotifyAll {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++)
            exec.execute(new Task());
        exec.execute(new Task2());
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            boolean prod = true;

            public void run() {
                if (prod) {
                    System.out.print("
notify() ");
                    Task.blocker.prod();
                    prod = false;
                } else {
                    System.out.print("
notifyAll() ");
                    Task.blocker.prodAll();
                    prod = true;
                }
            }
        }, 400, 400); // Run every .4 second
        TimeUnit.SECONDS.sleep(5); // Run for a while...
        timer.cancel();
        System.out.println("
Timer canceled");
        TimeUnit.MILLISECONDS.sleep(500);
        System.out.print("Task2.blocker.prodAll() ");
        Task2.blocker.prodAll();
        TimeUnit.MILLISECONDS.sleep(500);
        System.out.println("
Shutting down");
        exec.shutdownNow(); // Interrupt all tasks
    }
}

使用wait()和notifyAll()實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者:一個(gè)飯店,有一個(gè)廚師和一個(gè)服務(wù)員,這個(gè)服務(wù)員必須等待廚師準(zhǔn)備好食物,當(dāng)廚師準(zhǔn)備好后就會(huì)通知服務(wù)員,之后服務(wù)員上菜,然后服務(wù)員繼續(xù)等待。

package com.jc.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 簡(jiǎn)單的生產(chǎn)者消費(fèi)者例子
 * 此例子有點(diǎn)局限因?yàn)椴荒苡卸嗑€程的生產(chǎn)者、多線程的消費(fèi)者。
 * 這例子僅僅展示如果使用wait()和notify()保證有序
 * @author 
 *
 */
class Meal {
    private final int orderNum;

    public Meal(int orderNum) {
        this.orderNum = orderNum;
    }

    public String toString() {
        return "Meal " + orderNum;
    }
}

class WaitPerson implements Runnable {
    private Restaurant restaurant;

    public WaitPerson(Restaurant r) {
        restaurant = r;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (restaurant.meal == null)
                        wait(); // ... for the chef to produce a meal
                }
                System.out.println("Waitperson got " + restaurant.meal);
                synchronized (restaurant.chef) {
                    restaurant.meal = null;
                    restaurant.chef.notifyAll(); // Ready for another
                }
            }
        } catch (InterruptedException e) {
            System.out.println("WaitPerson interrupted");
        }
    }
}

class Chef implements Runnable {
    private Restaurant restaurant;
    private int count = 0;

    public Chef(Restaurant r) {
        restaurant = r;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (restaurant.meal != null)
                        wait(); // ... for the meal to be taken
                }
                if (++count == 10) {
                    System.out.println("Out of food, closing");
                    restaurant.exec.shutdownNow();
                }
                System.out.println("Order up! ");
                synchronized (restaurant.waitPerson) {
                    restaurant.meal = new Meal(count);
                    restaurant.waitPerson.notifyAll();
                }
                TimeUnit.MILLISECONDS.sleep(100);
            }
        } catch (InterruptedException e) {
            System.out.println("Chef interrupted");
        }
    }
}

public class Restaurant {
    Meal meal;
    ExecutorService exec = Executors.newCachedThreadPool();
    WaitPerson waitPerson = new WaitPerson(this);
    Chef chef = new Chef(this);

    public Restaurant() {
        exec.execute(chef);
        exec.execute(waitPerson);
    }

    public static void main(String[] args) {
        new Restaurant();
    }
}

使用顯式鎖Lock和Condition對(duì)象:

package com.jc.concurrency.waxomatic2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 使用顯式的Lock和Condition對(duì)象來(lái)修改WaxOMatic例子
 * @author 
 *
 */
class Car {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private boolean waxOn = false;

    public void waxed() {
        lock.lock();
        try {
            waxOn = true; // Ready to buff
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void buffed() {
        lock.lock();
        try {
            waxOn = false; // Ready for another coat of wax
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void waitForWaxing() throws InterruptedException {
        lock.lock();
        try {
            while (waxOn == false)
                condition.await();
        } finally {
            lock.unlock();
        }
    }

    public void waitForBuffing() throws InterruptedException {
        lock.lock();
        try {
            while (waxOn == true)
                condition.await();
        } finally {
            lock.unlock();
        }
    }
}

class WaxOn implements Runnable {
    private Car car;

    public WaxOn(Car c) {
        car = c;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.print("Wax On! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax On task");
    }
}

class WaxOff implements Runnable {
    private Car car;

    public WaxOff(Car c) {
        car = c;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                car.waitForWaxing();
                System.out.print("Wax Off! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.buffed();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax Off task");
    }
}

public class WaxOMatic2 {
    public static void main(String[] args) throws Exception {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}

基于Lock和鏈表存儲(chǔ)結(jié)構(gòu)寫(xiě)的一個(gè)消息隊(duì)列:

package com.jc.framework.queue;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class JcBlockingQueue {

    private JcQueueData head;
    private JcQueueData tail;
    private int size = 0;
    private int maxSize = Integer.MAX_VALUE;
    private final Lock lock;
    private final Condition full;
    private final Condition empty;

    public JcBlockingQueue() {
        lock = new ReentrantLock();
        full = lock.newCondition();     //角度是生產(chǎn)者
        empty = lock.newCondition();    //角度是消費(fèi)者
    }


    public void enQueue(T t) throws InterruptedException {
        lock.lock();
        if (size == maxSize) {
            full.await();
        }
        if (head == null) {
            head = new JcQueueData<>(t, null);
            tail = head;
            size++;
            empty.signalAll();
            lock.unlock();
            return;
        }


        JcQueueData jcQueueData = new JcQueueData<>(t, null);
        tail.setNext(jcQueueData);
        tail = jcQueueData;
        size++;
        if (size == 1)
            empty.signalAll();
        lock.unlock();

    }

    public T deQueue() throws InterruptedException {
        lock.lock();
        while (head == null) {
            empty.await();
        }

        T t = head.getData();
        if (head.next != null) {
            JcQueueData next = head.next;
            head.next = null;
            head = next;
        } else {
            head = null;
            tail = null;
        }
        size--;
        if(size==maxSize-1)
            full.signalAll();
        lock.unlock();
        return t;
    }

    public int size() {
        return size;
    }


    private class JcQueueData {

        private T data;
        private JcQueueData next;

        public JcQueueData(T data, JcQueueData next) {
            this.data = data;
            this.next = next;
        }

        public T getData() {
            return data;
        }

        public void setData(T data) {
            this.data = data;
        }

        public JcQueueData getNext() {
            return next;
        }

        public void setNext(JcQueueData next) {
            this.next = next;
        }
    }

}
ExecutorService的shutdown

ExecutorService的shutdown方法,這有可能還有工作正在執(zhí)行或準(zhǔn)備執(zhí)行,這情況下,它只是通知線程池再?zèng)]有更多任務(wù)需要增加到它的內(nèi)部隊(duì)列,而且一旦完成所有等待的工作,就應(yīng)當(dāng)關(guān)閉。

對(duì)應(yīng)的還有shutdownNow(),此方法中止當(dāng)前處理中的任務(wù),并忽略所有等待的任務(wù)。

參考:《Java編程思想》

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/69285.html

相關(guān)文章

  • 2018年第33-javeer對(duì)nodejs體會(huì)

    摘要:流程源處理源代碼,例如過(guò)濾任何值。工藝類(lèi)從編譯后處理生成的文件,例如對(duì)類(lèi)進(jìn)行字節(jié)碼增強(qiáng)。整合后的測(cè)試執(zhí)行集成測(cè)試后執(zhí)行所需的操作。校驗(yàn)運(yùn)行任何檢查以驗(yàn)證包裝是否有效并符合質(zhì)量標(biāo)準(zhǔn)。 nodejs和es6 nodejs的語(yǔ)法和es6不一樣,如模塊系統(tǒng),一個(gè)是CommonJS的require、一個(gè)是es6的import,寫(xiě)模塊也不一樣。 nodejs的npm 我來(lái)理解,nodejs類(lèi)似與j...

    xiongzenghui 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<