成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

跟著阿里p7一起學(xué)java高并發(fā) - 第18天:玩轉(zhuǎn)java線程池,這一篇就夠了

AdolphLWQ / 2747人閱讀

摘要:高并發(fā)系列第篇文章。簡(jiǎn)單的說(shuō),在使用了線程池之后,創(chuàng)建線程變成了從線程池中獲取一個(gè)空閑的線程,然后使用,關(guān)閉線程變成了將線程歸還到線程池。如果調(diào)用了線程池的方法,線程池會(huì)提前把核心線程都創(chuàng)造好,并啟動(dòng)線程池允許創(chuàng)建的最大線程數(shù)。

java高并發(fā)系列第18篇文章。

本文主要內(nèi)容

什么是線程池

線程池實(shí)現(xiàn)原理

線程池中常見(jiàn)的各種隊(duì)列

自定義線程創(chuàng)建的工廠

常見(jiàn)的飽和策略

自定義飽和策略

線程池中兩種關(guān)閉方法有何不同

擴(kuò)展線程池

合理地配置線程池

線程池中線程數(shù)量的配置

什么是線程池

大家用jdbc操作過(guò)數(shù)據(jù)庫(kù)應(yīng)該知道,操作數(shù)據(jù)庫(kù)需要和數(shù)據(jù)庫(kù)建立連接,拿到連接之后才能操作數(shù)據(jù)庫(kù),用完之后銷毀。數(shù)據(jù)庫(kù)連接的創(chuàng)建和銷毀其實(shí)是比較耗時(shí)的,真正和業(yè)務(wù)相關(guān)的操作耗時(shí)是比較短的。每個(gè)數(shù)據(jù)庫(kù)操作之前都需要?jiǎng)?chuàng)建連接,為了提升系統(tǒng)性能,后來(lái)出現(xiàn)了數(shù)據(jù)庫(kù)連接池,系統(tǒng)啟動(dòng)的時(shí)候,先創(chuàng)建很多連接放在池子里面,使用的時(shí)候,直接從連接池中獲取一個(gè),使用完畢之后返回到池子里面,繼續(xù)給其他需要者使用,這其中就省去創(chuàng)建連接的時(shí)間,從而提升了系統(tǒng)整體的性能。

線程池和數(shù)據(jù)庫(kù)連接池的原理也差不多,創(chuàng)建線程去處理業(yè)務(wù),可能創(chuàng)建線程的時(shí)間比處理業(yè)務(wù)的時(shí)間還長(zhǎng)一些,如果系統(tǒng)能夠提前為我們創(chuàng)建好線程,我們需要的時(shí)候直接拿來(lái)使用,用完之后不是直接將其關(guān)閉,而是將其返回到線程中中,給其他需要這使用,這樣直接節(jié)省了創(chuàng)建和銷毀的時(shí)間,提升了系統(tǒng)的性能。

簡(jiǎn)單的說(shuō),在使用了線程池之后,創(chuàng)建線程變成了從線程池中獲取一個(gè)空閑的線程,然后使用,關(guān)閉線程變成了將線程歸還到線程池。

線程池實(shí)現(xiàn)原理

當(dāng)向線程池提交一個(gè)任務(wù)之后,線程池的處理流程如下:

判斷是否達(dá)到核心線程數(shù),若未達(dá)到,則直接創(chuàng)建新的線程處理當(dāng)前傳入的任務(wù),否則進(jìn)入下個(gè)流程

線程池中的工作隊(duì)列是否已滿,若未滿,則將任務(wù)丟入工作隊(duì)列中先存著等待處理,否則進(jìn)入下個(gè)流程

是否達(dá)到最大線程數(shù),若未達(dá)到,則創(chuàng)建新的線程處理當(dāng)前傳入的任務(wù),否則交給線程池中的飽和策略進(jìn)行處理。

流程如下圖:

舉個(gè)例子,加深理解:

咱們作為開(kāi)發(fā)者,上面都有開(kāi)發(fā)主管,主管下面帶領(lǐng)幾個(gè)小弟干活,CTO給主管授權(quán)說(shuō),你可以招聘5個(gè)小弟干活,新來(lái)任務(wù),如果小弟還不到吳哥,立即去招聘一個(gè)來(lái)干這個(gè)新來(lái)的任務(wù),當(dāng)5個(gè)小弟都招來(lái)了,再來(lái)任務(wù)之后,將任務(wù)記錄到一個(gè)表格中,表格中最多記錄100個(gè),小弟們會(huì)主動(dòng)去表格中獲取任務(wù)執(zhí)行,如果5個(gè)小弟都在干活,并且表格中也記錄滿了,那你可以將小弟擴(kuò)充到20個(gè),如果20個(gè)小弟都在干活,并且存放任務(wù)的表也滿了,產(chǎn)品經(jīng)理再來(lái)任務(wù)后,是直接拒絕,還是讓產(chǎn)品自己干,這個(gè)由你自己決定,小弟們都盡心盡力在干活,任務(wù)都被處理完了,突然公司業(yè)績(jī)下滑,幾個(gè)員工沒(méi)事干,打醬油,為了節(jié)約成本,CTO主管把小弟控制到5人,其他15個(gè)人直接被干掉了。所以作為小弟們,別讓自己閑著,多干活。

原理:先找?guī)讉€(gè)人干活,大家都忙于干活,任務(wù)太多可以排期,排期的任務(wù)太多了,再招一些人來(lái)干活,最后干活的和排期都達(dá)到上層領(lǐng)導(dǎo)要求的上限了,那需要采取一些其他策略進(jìn)行處理了。對(duì)于長(zhǎng)時(shí)間不干活的人,考慮將其開(kāi)掉,節(jié)約資源和成本。

java中的線程池

jdk中提供了線程池的具體實(shí)現(xiàn),實(shí)現(xiàn)類是:java.util.concurrent.ThreadPoolExecutor,主要構(gòu)造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize:核心線程大小,當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),即使有其他空閑線程可以處理任務(wù)也會(huì)創(chuàng)新線程,等到工作的線程數(shù)大于核心線程數(shù)時(shí)就不會(huì)在創(chuàng)建了。如果調(diào)用了線程池的prestartAllCoreThreads方法,線程池會(huì)提前把核心線程都創(chuàng)造好,并啟動(dòng)

maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了,并且以創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。如果我們使用了無(wú)界隊(duì)列,那么所有的任務(wù)會(huì)加入隊(duì)列,這個(gè)參數(shù)就沒(méi)有什么效果了

keepAliveTime:線程池的工作線程空閑后,保持存活的時(shí)間。如果沒(méi)有任務(wù)處理了,有些線程會(huì)空閑,空閑的時(shí)間超過(guò)了這個(gè)值,會(huì)被回收掉。如果任務(wù)很多,并且每個(gè)任務(wù)的執(zhí)行時(shí)間比較短,避免線程重復(fù)創(chuàng)建和回收,可以調(diào)大這個(gè)時(shí)間,提高線程的利用率

unit:keepAliveTIme的時(shí)間單位,可以選擇的單位有天、小時(shí)、分鐘、毫秒、微妙、千分之一毫秒和納秒。類型是一個(gè)枚舉java.util.concurrent.TimeUnit,這個(gè)枚舉也經(jīng)常使用,有興趣的可以看一下其源碼

workQueue:工作隊(duì)列,用于緩存待處理任務(wù)的阻塞隊(duì)列,常見(jiàn)的有4種,本文后面有介紹

threadFactory:線程池中創(chuàng)建線程的工廠,可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字

handler:飽和策略,當(dāng)線程池?zé)o法處理新來(lái)的任務(wù)了,那么需要提供一種策略處理提交的新任務(wù),默認(rèn)有4種策略,文章后面會(huì)提到

調(diào)用線程池的execute方法處理任務(wù),執(zhí)行execute方法的過(guò)程:

判斷線程池中運(yùn)行的線程數(shù)是否小于corepoolsize,是:則創(chuàng)建新的線程來(lái)處理任務(wù),否:執(zhí)行下一步

試圖將任務(wù)添加到workQueue指定的隊(duì)列中,如果無(wú)法添加到隊(duì)列,進(jìn)入下一步

判斷線程池中運(yùn)行的線程數(shù)是否小于maximumPoolSize,是:則新增線程處理當(dāng)前傳入的任務(wù),否:將任務(wù)傳遞給handler對(duì)象rejectedExecution方法處理

線程池的使用步驟:

調(diào)用構(gòu)造方法創(chuàng)建線程池

調(diào)用線程池的方法處理任務(wù)

關(guān)閉線程池

線程池使用的簡(jiǎn)單示例

上一個(gè)簡(jiǎn)單的示例,如下:

package com.itsoku.chat16;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018
 */
public class Demo1 {
    static ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
            5,
            10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            int j = i;
            String taskName = "任務(wù)" + j;
            executor.execute(() -> {
                //模擬任務(wù)內(nèi)部處理耗時(shí)
                try {
                    TimeUnit.SECONDS.sleep(j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + taskName + "處理完畢");
            });
        }
        //關(guān)閉線程池
        executor.shutdown();
    }
}

輸出:

pool-1-thread-1任務(wù)0處理完畢
pool-1-thread-2任務(wù)1處理完畢
pool-1-thread-3任務(wù)2處理完畢
pool-1-thread-1任務(wù)3處理完畢
pool-1-thread-2任務(wù)4處理完畢
pool-1-thread-3任務(wù)5處理完畢
pool-1-thread-1任務(wù)6處理完畢
pool-1-thread-2任務(wù)7處理完畢
pool-1-thread-3任務(wù)8處理完畢
pool-1-thread-1任務(wù)9處理完畢
線程池中常見(jiàn)5種工作隊(duì)列

任務(wù)太多的時(shí)候,工作隊(duì)列用于暫時(shí)緩存待處理的任務(wù),jdk中常見(jiàn)的5種阻塞隊(duì)列:

ArrayBlockingQueue:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按照先進(jìn)先出原則對(duì)元素進(jìn)行排序

LinkedBlockingQueue:是一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按照先進(jìn)先出排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool使用了這個(gè)隊(duì)列。

SynchronousQueue :一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另外一個(gè)線程調(diào)用移除操作,否則插入操作一直處理阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用這個(gè)隊(duì)列

PriorityBlockingQueue:優(yōu)先級(jí)隊(duì)列,進(jìn)入隊(duì)列的元素按照優(yōu)先級(jí)會(huì)進(jìn)行排序

前2種隊(duì)列相關(guān)示例就不說(shuō)了,主要說(shuō)一下后面2種隊(duì)列的使用示例。

SynchronousQueue隊(duì)列的線程池
package com.itsoku.chat16;

import java.util.concurrent.*;

/**
 * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018
 */
public class Demo2 {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 50; i++) {
            int j = i;
            String taskName = "任務(wù)" + j;
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "處理" + taskName);
                //模擬任務(wù)內(nèi)部處理耗時(shí)
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}
pool-1-thread-1處理任務(wù)0
pool-1-thread-2處理任務(wù)1
pool-1-thread-3處理任務(wù)2
pool-1-thread-6處理任務(wù)5
pool-1-thread-7處理任務(wù)6
pool-1-thread-4處理任務(wù)3
pool-1-thread-5處理任務(wù)4
pool-1-thread-8處理任務(wù)7
pool-1-thread-9處理任務(wù)8
pool-1-thread-10處理任務(wù)9
pool-1-thread-11處理任務(wù)10
pool-1-thread-12處理任務(wù)11
pool-1-thread-13處理任務(wù)12
pool-1-thread-14處理任務(wù)13
pool-1-thread-15處理任務(wù)14
pool-1-thread-16處理任務(wù)15
pool-1-thread-17處理任務(wù)16
pool-1-thread-18處理任務(wù)17
pool-1-thread-19處理任務(wù)18
pool-1-thread-20處理任務(wù)19
pool-1-thread-21處理任務(wù)20
pool-1-thread-25處理任務(wù)24
pool-1-thread-24處理任務(wù)23
pool-1-thread-23處理任務(wù)22
pool-1-thread-22處理任務(wù)21
pool-1-thread-26處理任務(wù)25
pool-1-thread-27處理任務(wù)26
pool-1-thread-28處理任務(wù)27
pool-1-thread-30處理任務(wù)29
pool-1-thread-29處理任務(wù)28
pool-1-thread-31處理任務(wù)30
pool-1-thread-32處理任務(wù)31
pool-1-thread-33處理任務(wù)32
pool-1-thread-38處理任務(wù)37
pool-1-thread-35處理任務(wù)34
pool-1-thread-36處理任務(wù)35
pool-1-thread-41處理任務(wù)40
pool-1-thread-34處理任務(wù)33
pool-1-thread-39處理任務(wù)38
pool-1-thread-40處理任務(wù)39
pool-1-thread-37處理任務(wù)36
pool-1-thread-42處理任務(wù)41
pool-1-thread-43處理任務(wù)42
pool-1-thread-45處理任務(wù)44
pool-1-thread-46處理任務(wù)45
pool-1-thread-44處理任務(wù)43
pool-1-thread-47處理任務(wù)46
pool-1-thread-50處理任務(wù)49
pool-1-thread-48處理任務(wù)47
pool-1-thread-49處理任務(wù)48

代碼中使用Executors.newCachedThreadPool()創(chuàng)建線程池,看一下的源碼:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }

從輸出中可以看出,系統(tǒng)創(chuàng)建了50個(gè)線程處理任務(wù),代碼中使用了SynchronousQueue同步隊(duì)列,這種隊(duì)列比較特殊,放入元素必須要有另外一個(gè)線程去獲取這個(gè)元素,否則放入元素會(huì)失敗或者一直阻塞在那里直到有線程取走,示例中任務(wù)處理休眠了指定的時(shí)間,導(dǎo)致已創(chuàng)建的工作線程都忙于處理任務(wù),所以新來(lái)任務(wù)之后,將任務(wù)丟入同步隊(duì)列會(huì)失敗,丟入隊(duì)列失敗之后,會(huì)嘗試新建線程處理任務(wù)。使用上面的方式創(chuàng)建線程池需要注意,如果需要處理的任務(wù)比較耗時(shí),會(huì)導(dǎo)致新來(lái)的任務(wù)都會(huì)創(chuàng)建新的線程進(jìn)行處理,可能會(huì)導(dǎo)致創(chuàng)建非常多的線程,最終耗盡系統(tǒng)資源,觸發(fā)OOM。

PriorityBlockingQueue優(yōu)先級(jí)隊(duì)列的線程池
package com.itsoku.chat16;

import java.util.concurrent.*;

/**
 * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018
 */
public class Demo3 {

    static class Task implements Runnable, Comparable {

        private int i;
        private String name;

        public Task(int i, String name) {
            this.i = i;
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "處理" + this.name);
        }

        @Override
        public int compareTo(Task o) {
            return Integer.compare(o.i, this.i);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(1, 1,
                60L, TimeUnit.SECONDS,
                new PriorityBlockingQueue());
        for (int i = 0; i < 10; i++) {
            String taskName = "任務(wù)" + i;
            executor.execute(new Task(i, taskName));
        }
        for (int i = 100; i >= 90; i--) {
            String taskName = "任務(wù)" + i;
            executor.execute(new Task(i, taskName));
        }
        executor.shutdown();
    }
}

輸出:

pool-1-thread-1處理任務(wù)0
pool-1-thread-1處理任務(wù)100
pool-1-thread-1處理任務(wù)99
pool-1-thread-1處理任務(wù)98
pool-1-thread-1處理任務(wù)97
pool-1-thread-1處理任務(wù)96
pool-1-thread-1處理任務(wù)95
pool-1-thread-1處理任務(wù)94
pool-1-thread-1處理任務(wù)93
pool-1-thread-1處理任務(wù)92
pool-1-thread-1處理任務(wù)91
pool-1-thread-1處理任務(wù)90
pool-1-thread-1處理任務(wù)9
pool-1-thread-1處理任務(wù)8
pool-1-thread-1處理任務(wù)7
pool-1-thread-1處理任務(wù)6
pool-1-thread-1處理任務(wù)5
pool-1-thread-1處理任務(wù)4
pool-1-thread-1處理任務(wù)3
pool-1-thread-1處理任務(wù)2
pool-1-thread-1處理任務(wù)1

輸出中,除了第一個(gè)任務(wù),其他任務(wù)按照優(yōu)先級(jí)高低按順序處理。原因在于:創(chuàng)建線程池的時(shí)候使用了優(yōu)先級(jí)隊(duì)列,進(jìn)入隊(duì)列中的任務(wù)會(huì)進(jìn)行排序,任務(wù)的先后順序由Task中的i變量決定。向PriorityBlockingQueue加入元素的時(shí)候,內(nèi)部會(huì)調(diào)用代碼中Task的compareTo方法決定元素的先后順序。

自定義創(chuàng)建線程的工廠

給線程池中線程起一個(gè)有意義的名字,在系統(tǒng)出現(xiàn)問(wèn)題的時(shí)候,通過(guò)線程堆棧信息可以更容易發(fā)現(xiàn)系統(tǒng)中問(wèn)題所在。自定義創(chuàng)建工廠需要實(shí)現(xiàn)java.util.concurrent.ThreadFactory接口中的Thread newThread(Runnable r)方法,參數(shù)為傳入的任務(wù),需要返回一個(gè)工作線程。

示例代碼:

package com.itsoku.chat16;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018
 */
public class Demo4 {
    static AtomicInteger threadNum = new AtomicInteger(1);

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue(10), r -> {
            Thread thread = new Thread(r);
            thread.setName("自定義線程-" + threadNum.getAndIncrement());
            return thread;
        });
        for (int i = 0; i < 5; i++) {
            String taskName = "任務(wù)-" + i;
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "處理" + taskName);
            });
        }
        executor.shutdown();
    }
}

輸出:

自定義線程-1處理任務(wù)-0
自定義線程-3處理任務(wù)-2
自定義線程-2處理任務(wù)-1
自定義線程-4處理任務(wù)-3
自定義線程-5處理任務(wù)-4

代碼中在任務(wù)中輸出了當(dāng)前線程的名稱,可以看到是我們自定義的名稱。

通過(guò)jstack查看線程的堆棧信息,也可以看到我們自定義的名稱,我們可以將代碼中executor.shutdown();先給注釋掉讓程序先不退出,然后通過(guò)jstack查看,如下:

4種常見(jiàn)飽和策略

當(dāng)線程池中隊(duì)列已滿,并且線程池已達(dá)到最大線程數(shù),線程池會(huì)將任務(wù)傳遞給飽和策略進(jìn)行處理。這些策略都實(shí)現(xiàn)了RejectedExecutionHandler接口。接口中有個(gè)方法:

void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
參數(shù)說(shuō)明:

r:需要執(zhí)行的任務(wù)

executor:當(dāng)前線程池對(duì)象

JDK中提供了4種常見(jiàn)的飽和策略:

AbortPolicy:直接拋出異常

CallerRunsPolicy:在當(dāng)前調(diào)用者的線程中運(yùn)行任務(wù),即隨丟來(lái)的任務(wù),由他自己去處理

DiscardOldestPolicy:丟棄隊(duì)列中最老的一個(gè)任務(wù),即丟棄隊(duì)列頭部的一個(gè)任務(wù),然后執(zhí)行當(dāng)前傳入的任務(wù)

DiscardPolicy:不處理,直接丟棄掉,方法內(nèi)部為空

自定義飽和策略

需要實(shí)現(xiàn)RejectedExecutionHandler接口。任務(wù)無(wú)法處理的時(shí)候,我們想記錄一下日志,我們需要自定義一個(gè)飽和策略,示例代碼:

package com.itsoku.chat16;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018
 */
public class Demo5 {
    static class Task implements Runnable {
        String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "處理" + this.name);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Task{" +
                    "name="" + name + """ +
                    "}";
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
                1,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(1),
                Executors.defaultThreadFactory(),
                (r, executors) -> {
                    //自定義飽和策略
                    //記錄一下無(wú)法處理的任務(wù)
                    System.out.println("無(wú)法處理的任務(wù):" + r.toString());
                });
        for (int i = 0; i < 5; i++) {
            executor.execute(new Task("任務(wù)-" + i));
        }
        executor.shutdown();
    }
}

輸出:

無(wú)法處理的任務(wù):Task{name="任務(wù)-2"}
無(wú)法處理的任務(wù):Task{name="任務(wù)-3"}
pool-1-thread-1處理任務(wù)-0
無(wú)法處理的任務(wù):Task{name="任務(wù)-4"}
pool-1-thread-1處理任務(wù)-1

輸出結(jié)果中可以看到有3個(gè)任務(wù)進(jìn)入了飽和策略中,記錄了任務(wù)的日志,對(duì)于無(wú)法處理多任務(wù),我們最好能夠記錄一下,讓開(kāi)發(fā)人員能夠知道。任務(wù)進(jìn)入了飽和策略,說(shuō)明線程池的配置可能不是太合理,或者機(jī)器的性能有限,需要做一些優(yōu)化調(diào)整。

線程池中的2個(gè)關(guān)閉方法

線程池提供了2個(gè)關(guān)閉方法:shutdownshutdownNow,當(dāng)調(diào)用者兩個(gè)方法之后,線程池會(huì)遍歷內(nèi)部的工作線程,然后調(diào)用每個(gè)工作線程的interrrupt方法給線程發(fā)送中斷信號(hào),內(nèi)部如果無(wú)法響應(yīng)中斷信號(hào)的可能永遠(yuǎn)無(wú)法終止,所以如果內(nèi)部有無(wú)線循環(huán)的,最好在循環(huán)內(nèi)部檢測(cè)一下線程的中斷信號(hào),合理的退出。調(diào)用者兩個(gè)方法中任意一個(gè),線程池的isShutdown方法就會(huì)返回true,當(dāng)所有的任務(wù)線程都關(guān)閉之后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會(huì)返回true。

調(diào)用shutdown方法之后,線程池將不再接口新任務(wù),內(nèi)部會(huì)將所有已提交的任務(wù)處理完畢,處理完畢之后,工作線程自動(dòng)退出。

而調(diào)用shutdownNow方法后,線程池會(huì)將還未處理的(在隊(duì)里等待處理的任務(wù))任務(wù)移除,將正在處理中的處理完畢之后,工作線程自動(dòng)退出。

至于調(diào)用哪個(gè)方法來(lái)關(guān)閉線程,應(yīng)該由提交到線程池的任務(wù)特性決定,多數(shù)情況下調(diào)用shutdown方法來(lái)關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法。

擴(kuò)展線程池

雖然jdk提供了ThreadPoolExecutor這個(gè)高性能線程池,但是如果我們自己想在這個(gè)線程池上面做一些擴(kuò)展,比如,監(jiān)控每個(gè)任務(wù)執(zhí)行的開(kāi)始時(shí)間,結(jié)束時(shí)間,或者一些其他自定義的功能,我們應(yīng)該怎么辦?

這個(gè)jdk已經(jīng)幫我們想到了,ThreadPoolExecutor內(nèi)部提供了幾個(gè)方法beforeExecute、afterExecuteterminated,可以由開(kāi)發(fā)人員自己去這些方法??匆幌戮€程池內(nèi)部的源碼:

try {
    beforeExecute(wt, task);//任務(wù)執(zhí)行之前調(diào)用的方法
    Throwable thrown = null;
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x;
        throw x;
    } catch (Error x) {
        thrown = x;
        throw x;
    } catch (Throwable x) {
        thrown = x;
        throw new Error(x);
    } finally {
        afterExecute(task, thrown);//任務(wù)執(zhí)行完畢之后調(diào)用的方法
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}

beforeExecute:任務(wù)執(zhí)行之前調(diào)用的方法,有2個(gè)參數(shù),第1個(gè)參數(shù)是執(zhí)行任務(wù)的線程,第2個(gè)參數(shù)是任務(wù)

protected void beforeExecute(Thread t, Runnable r) { }

afterExecute:任務(wù)執(zhí)行完成之后調(diào)用的方法,2個(gè)參數(shù),第1個(gè)參數(shù)表示任務(wù),第2個(gè)參數(shù)表示任務(wù)執(zhí)行時(shí)的異常信息,如果無(wú)異常,第二個(gè)參數(shù)為null

protected void afterExecute(Runnable r, Throwable t) { }

terminated:線程池最終關(guān)閉之后調(diào)用的方法。所有的工作線程都退出了,最終線程池會(huì)退出,退出時(shí)調(diào)用該方法

示例代碼:

package com.itsoku.chat16;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018
 */
public class Demo6 {
    static class Task implements Runnable {
        String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "處理" + this.name);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Task{" +
                    "name="" + name + """ +
                    "}";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
                10,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(1),
                Executors.defaultThreadFactory(),
                (r, executors) -> {
                    //自定義飽和策略
                    //記錄一下無(wú)法處理的任務(wù)
                    System.out.println("無(wú)法處理的任務(wù):" + r.toString());
                }) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println(System.currentTimeMillis() + "," + t.getName() + ",開(kāi)始執(zhí)行任務(wù):" + r.toString());
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",任務(wù):" + r.toString() + ",執(zhí)行完畢!");
            }

            @Override
            protected void terminated() {
                System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",關(guān)閉線程池!");
            }
        };
        for (int i = 0; i < 10; i++) {
            executor.execute(new Task("任務(wù)-" + i));
        }
        TimeUnit.SECONDS.sleep(1);
        executor.shutdown();
    }
}

輸出:

1564324574847,pool-1-thread-1,開(kāi)始執(zhí)行任務(wù):Task{name="任務(wù)-0"}
1564324574850,pool-1-thread-3,開(kāi)始執(zhí)行任務(wù):Task{name="任務(wù)-2"}
pool-1-thread-3處理任務(wù)-2
1564324574849,pool-1-thread-2,開(kāi)始執(zhí)行任務(wù):Task{name="任務(wù)-1"}
pool-1-thread-2處理任務(wù)-1
1564324574848,pool-1-thread-5,開(kāi)始執(zhí)行任務(wù):Task{name="任務(wù)-4"}
pool-1-thread-5處理任務(wù)-4
1564324574848,pool-1-thread-4,開(kāi)始執(zhí)行任務(wù):Task{name="任務(wù)-3"}
pool-1-thread-4處理任務(wù)-3
1564324574850,pool-1-thread-7,開(kāi)始執(zhí)行任務(wù):Task{name="任務(wù)-6"}
pool-1-thread-7處理任務(wù)-6
1564324574850,pool-1-thread-6,開(kāi)始執(zhí)行任務(wù):Task{name="任務(wù)-5"}
1564324574851,pool-1-thread-8,開(kāi)始執(zhí)行任務(wù):Task{name="任務(wù)-7"}
pool-1-thread-8處理任務(wù)-7
pool-1-thread-1處理任務(wù)-0
pool-1-thread-6處理任務(wù)-5
1564324574851,pool-1-thread-10,開(kāi)始執(zhí)行任務(wù):Task{name="任務(wù)-9"}
pool-1-thread-10處理任務(wù)-9
1564324574852,pool-1-thread-9,開(kāi)始執(zhí)行任務(wù):Task{name="任務(wù)-8"}
pool-1-thread-9處理任務(wù)-8
1564324576851,pool-1-thread-2,任務(wù):Task{name="任務(wù)-1"},執(zhí)行完畢!
1564324576851,pool-1-thread-3,任務(wù):Task{name="任務(wù)-2"},執(zhí)行完畢!
1564324576852,pool-1-thread-1,任務(wù):Task{name="任務(wù)-0"},執(zhí)行完畢!
1564324576852,pool-1-thread-4,任務(wù):Task{name="任務(wù)-3"},執(zhí)行完畢!
1564324576852,pool-1-thread-8,任務(wù):Task{name="任務(wù)-7"},執(zhí)行完畢!
1564324576852,pool-1-thread-7,任務(wù):Task{name="任務(wù)-6"},執(zhí)行完畢!
1564324576852,pool-1-thread-5,任務(wù):Task{name="任務(wù)-4"},執(zhí)行完畢!
1564324576853,pool-1-thread-6,任務(wù):Task{name="任務(wù)-5"},執(zhí)行完畢!
1564324576853,pool-1-thread-10,任務(wù):Task{name="任務(wù)-9"},執(zhí)行完畢!
1564324576853,pool-1-thread-9,任務(wù):Task{name="任務(wù)-8"},執(zhí)行完畢!
1564324576853,pool-1-thread-9,關(guān)閉線程池!

從輸出結(jié)果中可以看到,每個(gè)需要執(zhí)行的任務(wù)打印了3行日志,執(zhí)行前由線程池的beforeExecute打印,執(zhí)行時(shí)會(huì)調(diào)用任務(wù)的run方法,任務(wù)執(zhí)行完畢之后,會(huì)調(diào)用線程池的afterExecute方法,從每個(gè)任務(wù)的首尾2條日志中可以看到每個(gè)任務(wù)耗時(shí)2秒左右。線程池最終關(guān)閉之后調(diào)用了terminated方法。

合理地配置線程池

要想合理的配置線程池,需要先分析任務(wù)的特性,可以沖一下幾個(gè)角度分析:

任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)

任務(wù)的優(yōu)先級(jí):高、中、低

任務(wù)的執(zhí)行時(shí)間:長(zhǎng)、中、短

任務(wù)的依賴性:是否依賴其他的系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接。

性質(zhì)不同任務(wù)可以用不同規(guī)模的線程池分開(kāi)處理。CPU密集型任務(wù)應(yīng)該盡可能小的線程,如配置cpu數(shù)量+1個(gè)線程的線程池。由于IO密集型任務(wù)并不是一直在執(zhí)行任務(wù),不能讓cpu閑著,則應(yīng)配置盡可能多的線程,如:cup數(shù)量*2?;旌闲偷娜蝿?wù),如果可以拆分,將其拆分成一個(gè)CPU密集型任務(wù)和一個(gè)IO密集型任務(wù),只要這2個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量??梢酝ㄟ^(guò)Runtime.getRuntime().availableProcessors()方法獲取cpu數(shù)量。優(yōu)先級(jí)不同任務(wù)可以對(duì)線程池采用優(yōu)先級(jí)隊(duì)列來(lái)處理,讓優(yōu)先級(jí)高的先執(zhí)行。

使用隊(duì)列的時(shí)候建議使用有界隊(duì)列,有界隊(duì)列增加了系統(tǒng)的穩(wěn)定性,如果采用無(wú)解隊(duì)列,任務(wù)太多的時(shí)候可能導(dǎo)致系統(tǒng)OOM,直接讓系統(tǒng)宕機(jī)。

線程池中線程數(shù)量的配置

線程池匯總線程大小對(duì)系統(tǒng)的性能有一定的影響,我們的目標(biāo)是希望系統(tǒng)能夠發(fā)揮最好的性能,過(guò)多或者過(guò)小的線程數(shù)量無(wú)法有消息的使用機(jī)器的性能。咋Java Concurrency inPractice書(shū)中給出了估算線程池大小的公式:

Ncpu = CUP的數(shù)量
Ucpu = 目標(biāo)CPU的使用率,0<=Ucpu<=1
W/C = 等待時(shí)間與計(jì)算時(shí)間的比例
為保存處理器達(dá)到期望的使用率,最有的線程池的大小等于:
Nthreads = Ncpu × Ucpu × (1+W/C)
一些使用建議

在《阿里巴巴java開(kāi)發(fā)手冊(cè)》中指出了線程資源必須通過(guò)線程池提供,不允許在應(yīng)用中自行顯示的創(chuàng)建線程,這樣一方面是線程的創(chuàng)建更加規(guī)范,可以合理控制開(kāi)辟線程的數(shù)量;另一方面線程的細(xì)節(jié)管理交給線程池處理,優(yōu)化了資源的開(kāi)銷。而線程池不允許使用Executors去創(chuàng)建,而要通過(guò)ThreadPoolExecutor方式,這一方面是由于jdk中Executor框架雖然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等創(chuàng)建線程池的方法,但都有其局限性,不夠靈活;另外由于前面幾種方法內(nèi)部也是通過(guò)ThreadPoolExecutor方式實(shí)現(xiàn),使用ThreadPoolExecutor有助于大家明確線程池的運(yùn)行規(guī)則,創(chuàng)建符合自己的業(yè)務(wù)場(chǎng)景需要的線程池,避免資源耗盡的風(fēng)險(xiǎn)。

ThreadPoolTaskExecutor 其他知識(shí)點(diǎn)匯總(待補(bǔ)充)

線程池中的所有線程超過(guò)了空閑時(shí)間都會(huì)被銷毀么?

如果allowCoreThreadTimeOut為true,超過(guò)了空閑時(shí)間的所有線程都會(huì)被回收,不過(guò)這個(gè)值默認(rèn)是false,系統(tǒng)會(huì)保留核心線程,其他的會(huì)被回收

空閑線程是如何被銷毀的?

所有運(yùn)行的工作線程會(huì)嘗試從隊(duì)列中獲取任務(wù)去執(zhí)行,超過(guò)一定時(shí)間(keepAliveTime)還沒(méi)有拿到任務(wù),自己主動(dòng)退出

核心線程在線程池創(chuàng)建的時(shí)候會(huì)初始化好么?

默認(rèn)情況下,核心線程不會(huì)進(jìn)行初始化,在剛開(kāi)始調(diào)用線程池執(zhí)行任務(wù)的時(shí)候,傳入一個(gè)任務(wù)會(huì)創(chuàng)建一個(gè)線程,直到達(dá)到核心線程數(shù)。不過(guò)可以在創(chuàng)建線程池之后,調(diào)用其prestartAllCoreThreads提前將核心線程創(chuàng)建好。

java高并發(fā)系列

java高并發(fā)系列 - 第1天:必須知道的幾個(gè)概念

java高并發(fā)系列 - 第2天:并發(fā)級(jí)別

java高并發(fā)系列 - 第3天:有關(guān)并行的兩個(gè)重要定律

java高并發(fā)系列 - 第4天:JMM相關(guān)的一些概念

java高并發(fā)系列 - 第5天:深入理解進(jìn)程和線程

java高并發(fā)系列 - 第6天:線程的基本操作

java高并發(fā)系列 - 第7天:volatile與Java內(nèi)存模型

java高并發(fā)系列 - 第8天:線程組

java高并發(fā)系列 - 第9天:用戶線程和守護(hù)線程

java高并發(fā)系列 - 第10天:線程安全和synchronized關(guān)鍵字

java高并發(fā)系列 - 第11天:線程中斷的幾種方式

java高并發(fā)系列 - 第12天JUC:ReentrantLock重入鎖

java高并發(fā)系列 - 第13天:JUC中的Condition對(duì)象

java高并發(fā)系列 - 第14天:JUC中的LockSupport工具類,必備技能

java高并發(fā)系列 - 第15天:JUC中的Semaphore(信號(hào)量)

java高并發(fā)系列 - 第16天:JUC中等待多線程完成的工具類CountDownLatch,必備技能

java高并發(fā)系列 - 第17天:JUC中的循環(huán)柵欄CyclicBarrier的6種使用場(chǎng)景

java高并發(fā)系列 - 第18天:JAVA線程池,這一篇就夠了

java高并發(fā)系列 - 第19天:JUC中的Executor框架詳解1

高并發(fā)系列連載中,感興趣的加我微信itsoku,一起交流,關(guān)注公眾號(hào):路人甲Java,每天獲取最新連載文章!

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/75673.html

相關(guān)文章

  • java并發(fā)系列 - 19:JUC中的Executor框架詳解1,全面掌握java并發(fā)相關(guān)技術(shù)

    摘要:有三種狀態(tài)運(yùn)行關(guān)閉終止。類類,提供了一系列工廠方法用于創(chuàng)建線程池,返回的線程池都實(shí)現(xiàn)了接口。線程池的大小一旦達(dá)到最大值就會(huì)保持不變,在提交新任務(wù),任務(wù)將會(huì)進(jìn)入等待隊(duì)列中等待。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。 這是java高并發(fā)系列第19篇文章。 本文主要內(nèi)容 介紹Executor框架相關(guān)內(nèi)容 介紹Executor 介紹ExecutorService 介紹線程池ThreadP...

    icattlecoder 評(píng)論0 收藏0
  • java并發(fā)系列 - 21java中的CAS操作,java并發(fā)的基石

    摘要:方法由兩個(gè)參數(shù),表示期望的值,表示要給設(shè)置的新值。操作包含三個(gè)操作數(shù)內(nèi)存位置預(yù)期原值和新值。如果處的值尚未同時(shí)更改,則操作成功。中就使用了這樣的操作。上面操作還有一點(diǎn)是將事務(wù)范圍縮小了,也提升了系統(tǒng)并發(fā)處理的性能。 這是java高并發(fā)系列第21篇文章。 本文主要內(nèi)容 從網(wǎng)站計(jì)數(shù)器實(shí)現(xiàn)中一步步引出CAS操作 介紹java中的CAS及CAS可能存在的問(wèn)題 悲觀鎖和樂(lè)觀鎖的一些介紹及數(shù)據(jù)庫(kù)...

    zorro 評(píng)論0 收藏0
  • java并發(fā)系列 - 20:JUC中的Executor框架詳解2

    摘要:示例執(zhí)行一批任務(wù),然后消費(fèi)執(zhí)行結(jié)果代碼如下跟著阿里學(xué)并發(fā),微信公眾號(hào)輸出代碼中傳入了一批任務(wù)進(jìn)行處理,最終將所有處理完成的按任務(wù)完成的先后順序傳遞給進(jìn)行消費(fèi)了。 這是java高并發(fā)系列第20篇文章。 本文內(nèi)容 ExecutorCompletionService出現(xiàn)的背景 介紹CompletionService接口及常用的方法 介紹ExecutorCompletionService類及...

    msup 評(píng)論0 收藏0
  • SpringBoot中并發(fā)定時(shí)任務(wù)的實(shí)現(xiàn)、動(dòng)態(tài)定時(shí)任務(wù)的實(shí)現(xiàn)(看這一篇就夠了

    摘要:也是自帶的一個(gè)基于線程池設(shè)計(jì)的定時(shí)任務(wù)類。其每個(gè)調(diào)度任務(wù)都會(huì)分配到線程池中的一個(gè)線程執(zhí)行,所以其任務(wù)是并發(fā)執(zhí)行的,互不影響。 原創(chuàng)不易,如需轉(zhuǎn)載,請(qǐng)注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責(zé)任?。?! 一、在JAVA開(kāi)發(fā)領(lǐng)域,目前可以通過(guò)以下幾種方式進(jìn)行定時(shí)任務(wù) 1、單機(jī)部署模式 Timer:jdk中...

    BWrong 評(píng)論0 收藏0
  • 【推薦】最新200篇:技術(shù)文章整理

    摘要:作為面試官,我是如何甄別應(yīng)聘者的包裝程度語(yǔ)言和等其他語(yǔ)言的對(duì)比分析和主從復(fù)制的原理詳解和持久化的原理是什么面試中經(jīng)常被問(wèn)到的持久化與恢復(fù)實(shí)現(xiàn)故障恢復(fù)自動(dòng)化詳解哨兵技術(shù)查漏補(bǔ)缺最易錯(cuò)過(guò)的技術(shù)要點(diǎn)大掃盲意外宕機(jī)不難解決,但你真的懂?dāng)?shù)據(jù)恢復(fù)嗎每秒 作為面試官,我是如何甄別應(yīng)聘者的包裝程度Go語(yǔ)言和Java、python等其他語(yǔ)言的對(duì)比分析 Redis和MySQL Redis:主從復(fù)制的原理詳...

    BicycleWarrior 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<