成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

實(shí)戰(zhàn)java高并發(fā)程序設(shè)計(jì)第三章(一)

joyvw / 2707人閱讀

摘要:只要線程池未關(guān)閉該策略直接在調(diào)用者線程中運(yùn)行當(dāng)前被丟棄的任務(wù)。顯然這樣做不會(huì)真的丟棄任務(wù)但是任務(wù)提交線程的性能極有可能會(huì)急劇下降。任務(wù)并嘗試再次提交當(dāng)前任務(wù)。

1. 同步控制

synchronized的擴(kuò)展:重入鎖

同步控制不僅有synchronized配合object.wait()以及object.notify(),也有增強(qiáng)版的reentrantLock(重入鎖)

public class ReenterLock implements Runnable{
    public static ReentrantLock lock=new ReentrantLock();
    public static int i=0;
    @Override
    public void run() {
        for(int j=0;j<10000000;j++){
            lock.lock();
            lock.lock();        //此處演示重入性
            try{
                i++;
            }finally{
                lock.unlock();        //退出臨界區(qū)必須解鎖
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ReenterLock tl=new ReenterLock();
        Thread t1=new Thread(tl);
        Thread t2=new Thread(tl);
        t1.start();t2.start();
        t1.join();t2.join();
        System.out.println(i);    //計(jì)算結(jié)果為 20000000

    }
}

我們來(lái)看下reentrantlock相比synchronized鎖有何優(yōu)點(diǎn):

中斷響應(yīng)

面對(duì)死鎖,似乎synchronized沒(méi)有任何主動(dòng)解決策略,而reentrantlock則可以輕松解決

public class IntLock implements Runnable {
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;
    /**
     * 控制加鎖順序,方便構(gòu)造死鎖
     * @param lock
     */
    public IntLock(int lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            if (lock == 1) {
                lock1.lockInterruptibly();        //可中斷的加鎖
                try{
                    Thread.sleep(500);
                }catch(InterruptedException e){}
                lock2.lockInterruptibly();
            } else {
                lock2.lockInterruptibly();
                try{
                    Thread.sleep(500);
                }catch(InterruptedException e){}
                lock1.lockInterruptibly();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println(Thread.currentThread().getName()+":線程被中斷");
        } finally {
            if (lock1.isHeldByCurrentThread())
                lock1.unlock();
            if (lock2.isHeldByCurrentThread())
                lock2.unlock();
            System.out.println(Thread.currentThread().getName()+":線程退出");
        }
    }
    public static void main(String[] args) throws InterruptedException {
        IntLock r1 = new IntLock(1);
        IntLock r2 = new IntLock(2);
        Thread t1 = new Thread(r1,"線程1");
        Thread t2 = new Thread(r2,"線程2");
        t1.start();t2.start();
        Thread.sleep(1000);
        //中斷其中一個(gè)線程
        t2.interrupt();
    }
}
//    輸出結(jié)果:
//    java.lang.InterruptedException
//        at //java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
//        at //java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
//        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
//        at geym.conc.ch3.synctrl.IntLock.run(IntLock.java:31)
//        at java.lang.Thread.run(Thread.java:745)
//    線程2:線程被中斷
//    線程2:線程退出
//    線程1:線程退出

由上可知,當(dāng)t1,t2形成死鎖時(shí),可以主動(dòng)利用中斷來(lái)解開(kāi),但完成任務(wù)的只有t1,t2被中斷. 而如果換成synchronized則將無(wú)法進(jìn)行中斷

1鎖申請(qǐng)等待時(shí)限

lock1.tryLock();                            //嘗試獲取鎖,獲得立即返回true,未獲得立即返回false
lock1.tryLock(5, TimeUnit.SECONDS);         //嘗試獲取鎖,5秒內(nèi)未獲得則返回false,獲得返回true
public class TryLock implements Runnable {
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;
    public TryLock(int lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        if (lock == 1) {
            while (true) {
                if (lock1.tryLock()) {
                    try {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                        }
                        if (lock2.tryLock()) {
                            try {
                                System.out.println(Thread.currentThread()
                                        .getId() + ":My Job done");
                                return;
                            } finally {
                                lock2.unlock();
                            }
                        }
                    } finally {
                        lock1.unlock();
                    }
                }
            }
        } else {
            while (true) {
                if (lock2.tryLock()) {
                    try {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                        }
                        if (lock1.tryLock()) {
                            try {
                                System.out.println(Thread.currentThread()
                                        .getId() + ":My Job done");
                                return;
                            } finally {
                                lock1.unlock();
                            }
                        }
                    } finally {
                        lock2.unlock();
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        TryLock r1 = new TryLock(1);
        TryLock r2 = new TryLock(2);
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
    }
}
//    15:My Job done
//    14:My Job done

使用trylock可以有效地避免產(chǎn)生死鎖

公平鎖

synchronized鎖為非公平鎖,而reentrantLock既可以是公平鎖也可以是非公平鎖
非公平鎖容易產(chǎn)生饑餓,公平鎖先進(jìn)先出,但效率不敵非公平鎖

public ReentrantLock(boolean fair)

ffffd

重入鎖的搭檔Condition

Condition和object.wait(),object.notify()方法類似
condition的基本方法如下:

void await() throws InterruptedException;        //使當(dāng)前線程等待,釋放鎖,能響應(yīng)signal和signalAll方法,響應(yīng)中斷
void awaitUninterruptibly();                    //類似 await,但不響應(yīng)中斷
long awaitNanos(long nanosTimeout)throws InterruptedException;    //等待一段時(shí)間
boolean await (long time,TimeUnit unit)throws InterruptedException;
boolean awaitUntil(Date deadline)throws InterruptedException;
void signal();    //喚醒一個(gè)等待中的線程
void signalAll();  //喚醒所有等待中的線程

JDK內(nèi)部就有很多對(duì)于ReentrantLock的使用,如ArrayBlockingQueue

    //在 ArrayBlockingQueue中的一些定義
    boolean fair = true;
    private final ReentrantLock lock = new ReentrantLock(fair);
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    //put(方法的實(shí)現(xiàn)  
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();       //put方法做同步
        try {
            try {
                while (count == items.length) //隊(duì)列已滿
                    notFull.await();        //等待隊(duì)列有足夠的空間
            } catch (InterruptedException ie) {
                notFull.signal();
                throw ie;
            }
            insert(e);                  //notFull被通知時(shí),說(shuō)明有足夠的空間
        } finally {
            lock.unlock();
        }
    }
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notFull.signal();           //通知take方法的線程,隊(duì)列已有數(shù)據(jù)
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();         //對(duì)take()方法做同步
        try {
            try {
                while (count == 0)          //如果隊(duì)列為空
                    notEmpty.await();        //則消費(fèi)者隊(duì)列要等待一個(gè)非空的信號(hào)
            } catch (InterruptedException ie) {
                notEmpty.signal();
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();    //通知put線程隊(duì)列已有空閑空間
        return x;
    }

多線程同時(shí)訪問(wèn):信號(hào)量(semaphore)

同步鎖只能允許一個(gè)線程進(jìn)行訪問(wèn),信號(hào)量可以指定多個(gè)線程同時(shí)訪問(wèn)同一個(gè)資源.

    //構(gòu)造方法
    public Semaphore(int permits)                //傳入int表示能同時(shí)訪問(wèn)的線程數(shù)
    public Semaphore(int permits, boolean fair)  //線程數(shù),是否公平鎖
    
    //實(shí)例方法
    public void acquire() throws InterruptedException        //獲取一個(gè)訪問(wèn)權(quán)限,會(huì)阻塞線程,會(huì)被打斷
    public void acquireUninterruptibly()                     //獲取一個(gè)訪問(wèn)權(quán)限,會(huì)阻塞線程,不會(huì)被打斷
    public boolean tryAcquire()                              //獲取一個(gè)訪問(wèn)權(quán)限,立即返回
    public boolean tryAcquire(long timeout, TimeUnit unit)   //獲取一個(gè)訪問(wèn)權(quán)限,嘗試一段時(shí)間
    public void release()                                    //釋放一個(gè)訪問(wèn)權(quán)限
public class SemapDemo implements Runnable {
    final Semaphore semp = new Semaphore(5);
    @Override
    public void run() {
        try {
            semp.acquire();                
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semp.release();                //使用完后要釋放,否則會(huì)引起信號(hào)量泄漏
        }
    }
    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            exec.submit(demo);
        }
    }
}

    //輸出結(jié)果
    //每次輸出5個(gè)結(jié)果,對(duì)應(yīng)信號(hào)量的5個(gè)許可

讀寫(xiě)鎖ReadWriteLock
讀寫(xiě)鎖適用于讀多寫(xiě)少的場(chǎng)景,讀讀之間為并行,讀寫(xiě)之間為串行,寫(xiě)寫(xiě)之間也為串行

public class ReadWriteLockDemo {
    private static Lock lock=new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();       //獲取讀寫(xiě)鎖
    private static Lock readLock = readWriteLock.readLock();                                //讀鎖
    private static Lock writeLock = readWriteLock.writeLock();                              //寫(xiě)鎖
    private int value;
    public Object handleRead(Lock lock) throws InterruptedException{
        try{
            lock.lock();                //模擬讀操作
            Thread.sleep(1000);            //讀操作的耗時(shí)越多,讀寫(xiě)鎖的優(yōu)勢(shì)就越明顯
            return value;                
        }finally{
        lock.unlock();
        }
    }
    public void handleWrite(Lock lock,int index) throws InterruptedException{
        try{
            lock.lock();                //模擬寫(xiě)操作
            Thread.sleep(1000);
            value=index;
        }finally{
        lock.unlock();
        }
    }
    public static void main(String[] args) {
        final ReadWriteLockDemo demo=new ReadWriteLockDemo();
        Runnable readRunnale=new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleRead(readLock);
//                    demo.handleRead(lock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable writeRunnale=new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleWrite(writeLock,new Random().nextInt());
//                    demo.handleWrite(lock,new Random().nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for(int i=0;i<18;i++){
            new Thread(readRunnale).start();
        }
        
        for(int i=18;i<20;i++){
            new Thread(writeRunnale).start();
        }    
    }
}

//結(jié)果:
//讀寫(xiě)鎖明顯要比單純的鎖要更快結(jié)束,說(shuō)明讀寫(xiě)鎖確實(shí)提升不少效率

倒計(jì)數(shù)器CountDownLatch

讓一個(gè)線程等待,知道倒計(jì)時(shí)結(jié)束

public class CountDownLatchDemo implements Runnable {
    static final CountDownLatch end = new CountDownLatch(10);        //構(gòu)造倒計(jì)時(shí)器,倒計(jì)數(shù)為10
    static final CountDownLatchDemo demo=new CountDownLatchDemo();   
    @Override
    public void run() {
        try {
            //模擬檢查任務(wù)
            Thread.sleep(new Random().nextInt(10)*1000);
            System.out.println("check complete");
            end.countDown();                                        //倒計(jì)時(shí)器減1
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for(int i=0;i<10;i++){
            exec.submit(demo);
        }
        //等待檢查
        end.await();                        //主線程阻塞,待其他線程全部完成后再喚醒主線程
        //發(fā)射火箭
        System.out.println("Fire!");
        exec.shutdown();
    }
}

循環(huán)柵欄CyclicBarrier
循環(huán)柵欄類似于倒計(jì)時(shí)器,但是計(jì)數(shù)器可以反復(fù)使用,cyclicBarrier比CountDownLatch稍微強(qiáng)大些,可以傳入一個(gè)barrierAction,barrierAction指每次完成計(jì)數(shù)便出發(fā)一次

public CyclicBarrier(int parties,Runnable barrierAction)  //構(gòu)造方法
public class CyclicBarrierDemo {
    public static class Soldier implements Runnable {
        private String soldier;
        private final CyclicBarrier cyclic;
        Soldier(CyclicBarrier cyclic, String soldierName) {
            this.cyclic = cyclic;
            this.soldier = soldierName;
        }
        public void run() {
            try {
                //等待所有士兵到齊
                cyclic.await();                        //觸發(fā)一次循環(huán)柵欄,達(dá)到計(jì)數(shù)器后才會(huì)進(jìn)行下一步工作
                doWork();
                //等待所有士兵完成工作
                cyclic.await();                        //再次觸發(fā)循環(huán)柵欄,達(dá)到計(jì)數(shù)器后才會(huì)進(jìn)行下一步工作
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
        void doWork() {
            try {
                Thread.sleep(Math.abs(new Random().nextInt()%10000));        //模擬工作
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + ":任務(wù)完成");
        }
    }
    public static class BarrierRun implements Runnable {            //用于傳入CyclicBarrier的構(gòu)造方法,作為達(dá)到計(jì)數(shù)器數(shù)值后的觸發(fā)任務(wù), 可以被多次調(diào)用
        boolean flag;
        int N;
        public BarrierRun(boolean flag, int N) {
            this.flag = flag;
            this.N = N;
        }
        public void run() {
            if (flag) {
                System.out.println("司令:[士兵" + N + "個(gè),任務(wù)完成!]");
            } else {
                System.out.println("司令:[士兵" + N + "個(gè),集合完畢!]");
                flag = true;
            }
        }
    }
    public static void main(String args[]) throws InterruptedException {
        final int N = 10;
        Thread[] allSoldier=new Thread[N];
        boolean flag = false;
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
        //設(shè)置屏障點(diǎn),主要是為了執(zhí)行這個(gè)方法
        System.out.println("集合隊(duì)伍!");
        for (int i = 0; i < N; ++i) {
            System.out.println("士兵 "+i+" 報(bào)道!");
            allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i));
            allSoldier[i].start();
        }
    }
}
注意:
一旦其中一個(gè)被interrupt后,很可能會(huì)拋出一個(gè)interruptExpection和9個(gè)BrokenBarrierException,表示該循環(huán)柵欄已破損,防止其他線程進(jìn)行無(wú)所謂的長(zhǎng)久等待

線程阻塞工具LockSupport
LockSupport是一個(gè)非常實(shí)用的線程阻塞工具,不需要獲取某個(gè)對(duì)象的鎖(如wait),也不會(huì)拋出interruptedException異常

public static void park()        //掛起當(dāng)前線程,
public static void park(Object blocker)        //掛起當(dāng)前線程,顯示阻塞對(duì)象,parking to wait for <地址值>
public class LockSupportDemo {
    public static Object u = new Object();
    static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    static ChangeObjectThread t2 = new ChangeObjectThread("t2");
    public static class ChangeObjectThread extends Thread {
        public ChangeObjectThread(String name){
            super.setName(name);
        }
        @Override
        public void run() {
            synchronized (u) {
                System.out.println("in "+getName());
                LockSupport.park(this);                
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        t1.start();
        Thread.sleep(100);
        t2.start();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);    //即使unpark發(fā)生在park前,也可以使程序正常結(jié)束
        t1.join();
        t2.join();
    }
}
LockSupport使用了類似信號(hào)量的機(jī)制,它為每個(gè)線程準(zhǔn)備一個(gè)許可,如果許可可用,park立即返回,并且消費(fèi)這個(gè)許可(轉(zhuǎn)為不可用),如果許可不可用,就會(huì)阻塞,而unpark方法就是使一個(gè)許可變?yōu)榭捎?locksupport.park()可以相應(yīng)中斷,但是不會(huì)拋出interruptedException,我們可以用Thread.interrupted等方法中獲取中斷標(biāo)記.
public class LockSupportIntDemo {
    public static Object u = new Object();
    static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    static ChangeObjectThread t2 = new ChangeObjectThread("t2");
    public static class ChangeObjectThread extends Thread {
        public ChangeObjectThread(String name){
            super.setName(name);
        }
        @Override
        public void run() {
            synchronized (u) {
                System.out.println("in "+getName());
                LockSupport.park();
                if(Thread.interrupted()){                        //檢測(cè)到中斷位,并清除中斷狀態(tài)
                    System.out.println(getName()+" 被中斷了");
                }
                if (Thread.currentThread().isInterrupted()){    //中斷狀態(tài)已被清除,無(wú)法檢測(cè)到
                    System.out.println(1);
                }
            }
            System.out.println(getName()+"執(zhí)行結(jié)束");
        }
    }
    public static void main(String[] args) throws InterruptedException {
        t1.start();
        Thread.sleep(100);
        t2.start();
        t1.interrupt();
        LockSupport.unpark(t2);
    }
}
//輸出:
//in t1
//t1 被中斷了
//t1執(zhí)行結(jié)束
//in t2
//t2執(zhí)行結(jié)束

Guava和Limiter限流
限流算法一般有兩種:漏桶算法和令牌桶算法
漏桶算法: 利用緩存區(qū),所有請(qǐng)求進(jìn)入系統(tǒng),都在緩存區(qū)中保存,然后以固定的流速流出緩存區(qū)進(jìn)行處理.
令牌桶算法: 桶中存放令牌,每個(gè)請(qǐng)求拿到令牌后才能進(jìn)行處理,如果沒(méi)有令牌,請(qǐng)求要么等待,要么丟棄.RateLimiter就是采用這種算法

public class RateLimiterDemo {
    static RateLimiter limiter = RateLimiter.create(2);        //每秒處理2個(gè)請(qǐng)求
    public static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis());
        }
    }
    public static void main(String args[]) throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            limiter.acquire();            //過(guò)剩流量會(huì)等待
            new Thread(new Task()).start();
        }
    }
}
//  某些場(chǎng)景傾向于丟棄過(guò)剩流量,tryAcquire則是立即返回,不會(huì)阻塞
//        for (int i = 0; i < 50; i++) {        
//            if(!limiter.tryAcquire()) {
//                continue;
//            }
//            new Thread(new Task()).start();
//        }
2. 線程池

Executors框架

Executor框架提供了各種類型的線程池,主要有以下工廠方法:

//固定線程數(shù)量,當(dāng)有新任務(wù)提交時(shí),若池中有空閑線程則立即執(zhí)行,若沒(méi)有空閑線程,任務(wù)會(huì)被暫存在一個(gè)任務(wù)隊(duì)列中,直到有空閑線程
public static ExecutorService newFixedThreadPool(int nThreads)
//返回只有一個(gè)線程的線程池,多余任務(wù)被保存到一個(gè)任務(wù)隊(duì)列中,線程空閑時(shí),按先入先出的順序執(zhí)行隊(duì)列中的任務(wù)
public static ExecutorService newSingleThreadPoolExecutor()
//線程數(shù)量不固定,優(yōu)先使用空閑線程,多余任務(wù)會(huì)創(chuàng)建新線程
public static ExecutorService newCachedThreadPool()
//線程數(shù)量為1,給定時(shí)間執(zhí)行某任務(wù),或周期性執(zhí)行任務(wù)
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
//線程數(shù)量可以指定,定時(shí)或周期性執(zhí)行任務(wù)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

計(jì)劃任務(wù):newScheduledThreadPool主要方法

//給定時(shí)間,對(duì)任務(wù)進(jìn)行一次調(diào)度
public ScheduledFuture schedule(Runnable command,long delay, TimeUnit unit);
//周期調(diào)度,以任務(wù)完成后間隔固定時(shí)間調(diào)度下一個(gè)任務(wù),(兩者相加)
public ScheduledFuture scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
//周期調(diào)度,兩個(gè)任務(wù)開(kāi)始的時(shí)間差為固定間隔,如果任務(wù)時(shí)間大于間隔時(shí)間則以任務(wù)時(shí)間為準(zhǔn)(兩者取其大者)
public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
注意:
任務(wù)異常時(shí)后續(xù)所有任務(wù)都將停止調(diào)度,因此必須保證所有任務(wù)異常均被正常處理.

核心線程池 ThreadPoolExecutor

ThreadPoolExecutor構(gòu)造函數(shù):

 public ThreadPoolExecutor(int corePoolSize,         //核心線程池大小
                              int maximumPoolSize,  //最大線程池大小
                              long keepAliveTime,  //線程池中超過(guò)corePoolSize數(shù)目的空閑線程最大存活時(shí)間;可以allowCoreThreadTimeOut(true)使得核心線程有效時(shí)間
                              TimeUnit unit,  //keepAliveTime時(shí)間單位
                              BlockingQueue workQueue, //阻塞任務(wù)隊(duì)列
                              ThreadFactory threadFactory,  //新建線程工廠
                              RejectedExecutionHandler handler ) //當(dāng)提交任務(wù)數(shù)超過(guò)maxmumPoolSize+workQueue之和時(shí),任務(wù)會(huì)交給RejectedExecutionHandler來(lái)處理

workQueue指被提交但是未執(zhí)行的任務(wù)隊(duì)列,是BlockingQueue接口的對(duì)象
1.直接提交隊(duì)列:SynchronousQueue,該隊(duì)列沒(méi)有容量,每個(gè)插入操作對(duì)應(yīng)一個(gè)刪除操作,即提交的任務(wù)總是會(huì)交給線程執(zhí)行,如果沒(méi)有空閑進(jìn)程,則創(chuàng)建新線程,數(shù)量達(dá)最大則執(zhí)行拒絕策略,一般需要設(shè)置很大的maximumPoolSize
2.有界任務(wù)隊(duì)列:ArrayBlockingQueue,有新任務(wù)時(shí),若線程池的實(shí)際線程數(shù)小于corePoolSize,優(yōu)先創(chuàng)建新線程,若大于corePoolSize,加入到等待隊(duì)列,若隊(duì)列已滿,不大于maximumPoolSize前提下,創(chuàng)建新線程執(zhí)行;當(dāng)且僅當(dāng)?shù)却?duì)列滿時(shí)才會(huì)創(chuàng)建新線程,否則數(shù)量一直維持在corePoolSize
3.無(wú)界任務(wù)隊(duì)列:LinkedBlockingQueue,小于corePoolSize時(shí)創(chuàng)建線程,達(dá)到corePoolSize則加入隊(duì)列直到資源消耗殆盡
4.優(yōu)先任務(wù)隊(duì)列:PriorityBlockingQueue,特殊無(wú)界隊(duì)列,總是保證高優(yōu)先級(jí)的任務(wù)先執(zhí)行.

Executors分析
newFixedThreadPool: corePoolSize=maximumPoolSize,線程不會(huì)超過(guò)corePoolSize,使用LinkedBlockingQueue
newSingleThreadPoolExecutor: newFixedThreadPool的弱化版,corePoolSize只有1
newCachedThreadPool: corePoolSize=0,maximumPoolSize為無(wú)窮大,空閑線程60秒回收,使用SynchronousQueue隊(duì)列

ThreadPoolExecutor的execute()方法執(zhí)行邏輯

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn"t, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {    //檢查是否小于corePoolSize
            if (addWorker(command, true))    //添加線程,執(zhí)行任務(wù)
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {    //添加進(jìn)隊(duì)列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))   //雙重校驗(yàn)
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))    //提交線程池失敗
            reject(command);                    //拒絕執(zhí)行
    }

拒絕策略
AbortPolicy:該策略會(huì)直接拋出異常,阻止系統(tǒng)正常工作。
CallerRunsPolicy:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)。顯然這樣做不會(huì)真的丟棄任務(wù),但是,任務(wù)提交線程的性能極有可 能會(huì)急劇下降。 任務(wù),并嘗試再次提交當(dāng)前任務(wù)。
DiscardOldestPolicy:該策略將丟棄最老的一個(gè)請(qǐng)求,也就是即將被執(zhí)行的一個(gè)
DiscardPolicy:該策略默默地丟棄無(wú)法處理的任務(wù),不予任何處理。如果允許任務(wù)丟失,我覺(jué)得這可能是最好的一種方案了吧!

自定義ThreadFactory

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue(),
                new ThreadFactory(){
                    @Override
                    public Thread newThread(Runnable r) {    //自定義創(chuàng)建線程的方法
                        Thread t= new Thread(r);
                        t.setDaemon(true);
                        System.out.println("create "+t);
                        return t;
                    }
                }
               );
        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        Thread.sleep(2000);
    }

擴(kuò)展線程池

public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;
        public MyTask(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            System.out.println("正在執(zhí)行" + ":Thread ID:" + Thread.currentThread().getId()
                    + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("準(zhǔn)備執(zhí)行:" + ((MyTask) r).name);
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("執(zhí)行完成:" + ((MyTask) r).name);
            }
            @Override
            protected void terminated() {
                System.out.println("線程池退出");
            }
        };
        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();               //等待所有任務(wù)執(zhí)行完畢后再關(guān)閉
    }
}

異常堆棧消息

線程池中的異常堆??赡懿粫?huì)拋出,需要我們自己去包裝

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task, clientTrace(), Thread.currentThread()
                .getName()));
    }
    @Override
    public Future submit(Runnable task) {
        return super.submit(wrap(task, clientTrace(), Thread.currentThread()
                .getName()));
    }
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }
    private Runnable wrap(final Runnable task, final Exception clientStack,
            String clientThreadName) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();                //外層包裹trycatch,即可打印出異常
                } catch (Exception e) {
                    clientStack.printStackTrace();
                    throw e;
                }
            }
        };
    }
}

Fork/Join框架

類似于mapreduce,用于大數(shù)據(jù)量,fork()創(chuàng)造子線程,join表示等待,

public class CountTask extends RecursiveTask{
    private static final int THRESHOLD = 10000;        //任務(wù)分解規(guī)模
    private long start;
    private long end;
    public CountTask(long start,long end){
        this.start=start;
        this.end=end;
    }
    @Override
    public Long compute(){
        long sum=0;
        boolean canCompute = (end-start) subTasks=new ArrayList();
            long pos=start;
            for(int i=0;i<100;i++){
                long lastOne=pos+step;
                if(lastOne>end)lastOne=end;     //最后一個(gè)任務(wù)可能小于step,故需要此步
                CountTask subTask=new CountTask(pos,lastOne);   //子任務(wù)
                pos+=step+1;        //調(diào)整下一個(gè)任務(wù)
                subTasks.add(subTask);
                subTask.fork();     //fork子任務(wù)
            }
            for(CountTask  t:subTasks){
                sum+=t.join();      //聚合任務(wù)
            }
        }
        return sum;
    }
    public static void main(String[]args){
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0,200000000000L);
        ForkJoinTask result = forkJoinPool.submit(task);
        try{
            long res = result.get();
            System.out.println("sum="+res);
        }catch(InterruptedException e){
            e.printStackTrace();
        }catch(ExecutionException e){
            e.printStackTrace();
        }
    }
}
注意:
如果任務(wù)的劃分層次很多,一直得不到返回,可能有兩種原因:
1.系統(tǒng)內(nèi)線程數(shù)量越積越多,導(dǎo)致性能嚴(yán)重下降
2.函數(shù)調(diào)用層次變多,導(dǎo)致棧溢出

Guava對(duì)線程池的拓展

1.特殊的DirectExecutor線程池

Executor executor=MoreExecutors.directExecutor();    // 僅在當(dāng)前線程運(yùn)行,用于抽象

2.Daemon線程池
提供將普通線程轉(zhuǎn)換為Daemon線程.很多情況下,我們不希望后臺(tái)線程池阻止程序的退出

public class MoreExecutorsDemo2 {
    public static void main(String[] args) {
        ThreadPoolExecutor exceutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2);
        MoreExecutors.getExitingExecutorService(exceutor);
        exceutor.execute(() -> System.out.println("I am running in " + Thread.currentThread().getName()));
    }
}

3.future模式擴(kuò)展
待續(xù)....

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/75977.html

相關(guān)文章

  • 實(shí)戰(zhàn)java并發(fā)程序設(shè)計(jì)第三(二)

    摘要:的并發(fā)容器并發(fā)集合這是一個(gè)高效的并發(fā)你可以把它理解為一個(gè)線程安全的??梢钥醋饕粋€(gè)線程安全的這是一個(gè)接口,內(nèi)部通過(guò)鏈表數(shù)組等方式實(shí)現(xiàn)了這個(gè)接口。 3. JDK的并發(fā)容器 并發(fā)集合 ConcurrentHashMap:這是一個(gè)高效的并發(fā)HashMap.你可以把它理解為一個(gè)線程安全的HashMap。 CopyOnWriteArrayList:這是一個(gè)List,從名字看就知道它和Ar...

    Sike 評(píng)論0 收藏0
  • 從小白程序路晉升為大廠級(jí)技術(shù)專家我看過(guò)哪些書(shū)籍?(建議收藏)

    摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報(bào)率高。馬上就十一國(guó)慶假期了,給小伙伴們分享下,從小白程序員到大廠高級(jí)技術(shù)專家我看過(guò)哪些技術(shù)類書(shū)籍。 大家好,我是...

    sf_wangchong 評(píng)論0 收藏0
  • 實(shí)戰(zhàn)Java并發(fā)程序設(shè)計(jì)5】讓普通變量也享受原子操作

    摘要:有時(shí)候,由于初期考慮不周,或者后期的需求變化,一些普通變量可能也會(huì)有線程安全的需求。它可以讓你在不改動(dòng)或者極少改動(dòng)原有代碼的基礎(chǔ)上,讓普通的變量也享受操作帶來(lái)的線程安全性,這樣你可以修改極少的代碼,來(lái)獲得線程安全的保證。 有時(shí)候,由于初期考慮不周,或者后期的需求變化,一些普通變量可能也會(huì)有線程安全的需求。如果改動(dòng)不大,我們可以簡(jiǎn)單地修改程序中每一個(gè)使用或者讀取這個(gè)變量的地方。但顯然,這...

    appetizerio 評(píng)論0 收藏0
  • Java學(xué)習(xí)必備書(shū)籍推薦終極版!

    摘要:實(shí)戰(zhàn)高并發(fā)程序設(shè)計(jì)推薦豆瓣評(píng)分書(shū)的質(zhì)量沒(méi)的說(shuō),推薦大家好好看一下。推薦,豆瓣評(píng)分,人評(píng)價(jià)本書(shū)介紹了在編程中條極具實(shí)用價(jià)值的經(jīng)驗(yàn)規(guī)則,這些經(jīng)驗(yàn)規(guī)則涵蓋了大多數(shù)開(kāi)發(fā)人員每天所面臨的問(wèn)題的解決方案。 很早就想把JavaGuide的書(shū)單更新一下了,昨晚加今天早上花了幾個(gè)時(shí)間對(duì)之前的書(shū)單進(jìn)行了分類和補(bǔ)充完善。雖是終極版,但一定還有很多不錯(cuò)的 Java 書(shū)籍我沒(méi)有添加進(jìn)去,會(huì)繼續(xù)完善下去。希望這篇...

    Steve_Wang_ 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<