成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

java中的Executors簡(jiǎn)介與多線程在網(wǎng)站上逐步優(yōu)化的運(yùn)用案例

sunsmell / 2259人閱讀

摘要:的多線程機(jī)制可彌補(bǔ)拋出未檢查的異常,將終止線程執(zhí)行,此時(shí)會(huì)錯(cuò)誤的認(rèn)為任務(wù)都取消了。如果想要不保留,則需要設(shè)置,此時(shí)最小的就是線程池最大的線程數(shù)。

提供Executor的工廠類

忽略了自定義的ThreadFactory、callable和unconfigurable相關(guān)的方法

newFixedxxx:在任意時(shí)刻,最多有nThreads個(gè)線程在處理task;如果所有線程都在運(yùn)行時(shí)來了新的任務(wù),它會(huì)被扔入隊(duì)列;如果有線程在執(zhí)行期間因某種原因終止了運(yùn)行,如果需要執(zhí)行后續(xù)任務(wù),新的線程將取代它

   return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());

newCachedxxx:新任務(wù)到來如果線程池中有空閑的線程就復(fù)用,否則新建一個(gè)線程。如果一個(gè)線程超過60秒沒有使用,它就會(huì)被關(guān)閉移除線程池

 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());

newSingleThreadExecutor:僅使用一個(gè)線程來處理任務(wù),如果這線程掛了,會(huì)產(chǎn)生一個(gè)新的線程來代替它。每一個(gè)任務(wù)被保證按照順序執(zhí)行,而且一次只執(zhí)行一個(gè)

  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }
使用newFixedxxx方法也能實(shí)現(xiàn)類似的作用,但是ThreadPoolExecutor會(huì)提供修改線程數(shù)的方法,F(xiàn)inalizableDelegatedExecutorService則沒有修改的途徑,它在DelegatedExecutorService的基礎(chǔ)
上僅提供了執(zhí)行finalize時(shí)候去關(guān)閉線程,而DelegatedExecutorService僅暴漏ExecutorService自身的方法

newScheduledThreadPool:提供一個(gè)線程池來延遲或者定期執(zhí)行任務(wù)

  public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

newSingleThreadScheduledExecutor:提供單個(gè)線程來延遲或者定期執(zhí)行任務(wù),如果執(zhí)行的線程掛了,會(huì)生成新的。

  return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
同樣,它保證返回的Executor自身的線程數(shù)不可修改

從上述的實(shí)現(xiàn)可以看出,核心在于三個(gè)部分

ThreadPoolExecutor:提供線程數(shù)相關(guān)的控制

DelegatedExecutorService:僅暴露ExecutorService自身的方法,保證線程數(shù)不變來實(shí)現(xiàn)語義場(chǎng)景

ScheduledExecutorService:提供延遲或者定期執(zhí)行的功能

對(duì)應(yīng)的,相應(yīng)也有不同的隊(duì)列去實(shí)現(xiàn)不同的場(chǎng)景

LinkedBlockingQueue:無界阻塞隊(duì)列

SynchronousQueue:沒有消費(fèi)者消費(fèi)時(shí),新的任務(wù)就會(huì)被阻塞

DelayQueue:隊(duì)列中的任務(wù)過期之后才可以執(zhí)行,否則無法查詢到隊(duì)列中的元素

DelegatedExecutorService

它僅僅是包裝了ExecutorService的方法,交由傳入的ExecutorService來執(zhí)行,所謂的UnConfigurable實(shí)際也就是它沒有暴漏配置各種參數(shù)調(diào)整的方法

  static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future submit(Runnable task) {
            return e.submit(task);
        }
        public  Future submit(Callable task) {
            return e.submit(task);
        }
        public  Future submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public  List> invokeAll(Collection> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public  List> invokeAll(Collection> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public  T invokeAny(Collection> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public  T invokeAny(Collection> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }
ScheduledExecutorService

提供一系列的schedule方法,使得任務(wù)可以延遲或者周期性的執(zhí)行,對(duì)應(yīng)schedule方法會(huì)返回ScheduledFuture以供確認(rèn)是否執(zhí)行以及是否要取消。它的實(shí)現(xiàn)ScheduledThreadPoolExecutor也支持立即執(zhí)行由submit提交的任務(wù)

僅支持相對(duì)延遲時(shí)間,比如距離現(xiàn)在5分鐘后執(zhí)行。類似Timer也可以管理延遲任務(wù)和周期任務(wù),但是存在一些缺陷:

所有的定時(shí)任務(wù)只有一個(gè)線程,如果某個(gè)任務(wù)執(zhí)行時(shí)間長(zhǎng),將影響其它TimerTask的精確性。ScheduledExecutorService的多線程機(jī)制可彌補(bǔ)

TimerTask拋出未檢查的異常,將終止線程執(zhí)行,此時(shí)會(huì)錯(cuò)誤的認(rèn)為任務(wù)都取消了。1:可以使用try-catch-finally對(duì)相應(yīng)執(zhí)行快處理;2:通過execute執(zhí)行的方法可以設(shè)置UncaughtExceptionHandler來接收未捕獲的異常,并作出處理;3:通過submit執(zhí)行的,將被封裝層ExecutionException重新拋出

ThreadPoolExecutor
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

corePoolSize、maximumPoolSize:ThreadPoolExecutor會(huì)根據(jù)這兩自動(dòng)調(diào)整線程池的大小,當(dāng)一個(gè)新任務(wù)通過execute提交的時(shí)候:
如果當(dāng)前運(yùn)行的線程數(shù)小于corePoolSize就新建線程;
如果當(dāng)前線程數(shù)在corePoolSize與maximumPoolSize之間,則只有在隊(duì)列滿的時(shí)候才會(huì)創(chuàng)建新的線程;
如果已經(jīng)達(dá)到最大線程數(shù),并且隊(duì)列都滿了,在這種飽和狀態(tài)下就會(huì)執(zhí)行拒絕策略

默認(rèn)情況下,只有新任務(wù)到達(dá)的時(shí)候才會(huì)啟動(dòng)線程,可通過prestartCoreThread方法實(shí)現(xiàn)事先啟動(dòng)

corePoolSize:默認(rèn)線程池所需要維護(hù)的最小的worker的數(shù)量,就算是worker過期了也會(huì)保留。如果想要不保留,則需要設(shè)置allowCoreThreadTimeOut,此時(shí)最小的就是0

maximumPoolSize:線程池最大的線程數(shù)。java限制最多為 2^29-1,大約5億個(gè)

keepAliveTime、unit:如果當(dāng)前線程池有超過corePoolSize的線程數(shù),只要有線程空閑時(shí)間超過keepAliveTime的設(shè)定,就會(huì)被終止;unit則是它的時(shí)間單位

workQueue:任何BlockingQueue都可以使用,基本上有三種

Direct handoffs,直接交付任務(wù)。比如 SynchronousQueue,如果沒有線程消費(fèi),提交任務(wù)會(huì)失敗,當(dāng)然可以新建一個(gè)線程來處理。它適合處理有依賴關(guān)系的任務(wù),一般它的maximumPoolSizes會(huì)被設(shè)置成最大的

Unbounded queues,無界隊(duì)列。比如LinkedBlockingQueue,這意味著如果有corePoolSize個(gè)線程在執(zhí)行,那么其他的任務(wù)都只能等待。它適合于處理任務(wù)都是互相獨(dú)立的,

Bounded queues,有界隊(duì)列。比如ArrayBlockingQueue,需要考慮隊(duì)列大小和最大線程數(shù)之間的關(guān)系,來達(dá)到更好的資源利用率和吞吐量

threadFactory:沒有指定的時(shí)候,使用Executors.defaultThreadFactory

RejectedExecutionHandler:通過execute添加的任務(wù),如果Executor已經(jīng)關(guān)閉或者已經(jīng)飽和了(線程數(shù)達(dá)到了maximumPoolSize,并且隊(duì)列滿了),就會(huì)執(zhí)行,java提供了4種策略:

AbortPolicy,拒絕的時(shí)候拋出運(yùn)行時(shí)異常RejectedExecutionException;

CallerRunsPolicy,如果executor沒有關(guān)閉,那么由調(diào)用execute的線程來執(zhí)行它;

DiscardPolicy,直接扔掉新的任務(wù);

DiscardOldestPolicy,如果executor沒有關(guān)閉,那么扔掉隊(duì)列頭部的任務(wù),再次嘗試;

ThreadPoolExecutor可自定義beforeExecutor、afterExecutor可以用來添加日志統(tǒng)計(jì)、計(jì)時(shí)、件事或統(tǒng)計(jì)信息收集功能,無論run是正常返回還是拋出異常,afterExecutor都會(huì)被執(zhí)行。如果beforeExecutor拋出RuntimeException,任務(wù)和afterExecutor都不會(huì)被執(zhí)行。terminated在所有任務(wù)都已經(jīng)完成,并且所有工作者線程關(guān)閉后會(huì)調(diào)用,此時(shí)也可以用來執(zhí)行發(fā)送通知、記錄日志等等。
如何估算線程池的大小

計(jì)算密集型,通常在擁有$N_{cpu}$個(gè)處理器的系統(tǒng)上,線程池大小設(shè)置為$N_{cpu}+1$能夠?qū)崿F(xiàn)最優(yōu)的利用率;

$N_{cpu}$ cpu的個(gè)數(shù)

I/O密集型或者其它阻塞型的任務(wù),定義 $N_{cpu}$為CPU的個(gè)數(shù),$U_{cpu}$為CPU的利用率,$W/C$為等待時(shí)間與計(jì)算時(shí)間的比率,此時(shí)線程池的最優(yōu)大小為

$$N_{threads}=N_{cpu}*U_{cpu}*(1+W/C)$$

場(chǎng)景說明

將一個(gè)網(wǎng)站的業(yè)務(wù)抽象成如下幾塊

接收客戶端請(qǐng)求與處理請(qǐng)求

頁面渲染返回的文本和圖片

獲取頁面的廣告

接收請(qǐng)求與處理請(qǐng)求 理論模型

理論上,服務(wù)端通過實(shí)現(xiàn)約定的接口就可以實(shí)現(xiàn)接收請(qǐng)求和處理連續(xù)不斷的請(qǐng)求過來

ServerSocket socket = new ServerSocket(80);
while(true){
    Socket conn = socket.accept();
    handleRequest(conn)
}

缺點(diǎn):每次只能處理一個(gè)請(qǐng)求,新請(qǐng)求到來時(shí),必須等到正在處理的請(qǐng)求處理完成,才能接收新的請(qǐng)求

顯示的創(chuàng)建多線程

為每個(gè)請(qǐng)求創(chuàng)建新的線程提供服務(wù)

ServerSocket socket = new ServerSocket(80);
while(true){
    final Socket conn = socket.accept();
    Runnable task = new Runnable(){
        public void run(){
            handleRequest(conn);        
        }
    }
    new Thread(task).start();
}

缺點(diǎn):

線程的創(chuàng)建和銷毀都有一定的開銷,延遲對(duì)請(qǐng)求的處理;

創(chuàng)建后的線程多于可用處理器的數(shù)量,造成線程閑置,這會(huì)給垃圾回收帶來壓力

存活的大量線程競(jìng)爭(zhēng)CPU資源會(huì)產(chǎn)生很多性能開銷

系統(tǒng)上對(duì)可創(chuàng)建的線程數(shù)存在限制

使用線程池

使用java自帶的Executor框架。

private static final Executor exec = Executors.newFixedThreadPool(100);
...
ServerSocket socket = new ServerSocket(80);
while(true){
    final Socket conn = socket.accept();
    Runnable task = new Runnable(){
        public void run(){
            handleRequest(conn);        
        }
    }
    exec.execute(task);
}
...

線程池策略通過實(shí)現(xiàn)預(yù)估好的線程需求,限制并發(fā)任務(wù)的數(shù)量,重用現(xiàn)有的線程,解決每次創(chuàng)建線程的資源耗盡、競(jìng)爭(zhēng)過于激烈和頻繁創(chuàng)建的問題,也囊括了線程的優(yōu)勢(shì),解耦了任務(wù)提交和任務(wù)執(zhí)行。

頁面渲染返回的文本和圖片 串行渲染
renderText(source);
List imageData = new ArrayList();
for(ImageInfo info:scaForImageInfo(source)){
    imageData.add(info.downloadImage());
}
for(ImageData data:imageData){
    renderImage(data);
}

缺點(diǎn):圖像的下載大部分時(shí)間在等待I/O操作執(zhí)行完成,這期間CPU幾乎不做任何工作,使得用戶看到最終頁面之前要等待過長(zhǎng)的時(shí)間

并行化

渲染過程可以分成兩個(gè)部分,1是渲染文本,1是下載圖像

private static final ExecutorService exec = Executors.newFixedThreadPool(100);
...
final List infos=scaForImageInfo(source);
Callable> task=new Callable>(){
    public List call(){
        List r = new ArrayList();
        for(ImageInfo info:infos){
            r.add(info.downloadImage());
        }
        return r;
    }
};
Future> future = exec.submit(task);
renderText(source);
try{
    List imageData = future.get();
    for(ImageData data:imageData){
        renderImage(data);
    }
}catch(InterruptedException e){
    Thread.currentThread().interrupt();
    future.cancel(true);
}catche(ExecutionException e){
    throw launderThrowable(e.getCause());
}

使用Callable來返回下載的圖片結(jié)果,使用future來獲得下載的圖片,這樣將減少用戶所需要的等待時(shí)間。
缺點(diǎn):圖片的下載很明顯時(shí)間要比文本要慢,這樣的并行化很可能速度可能只提升了1%

并行性能提升

使用CompletionService。

private static final ExecutorService exec;
...
final List infos=scaForImageInfo(source);
CompletionService cService =  new ExecutorCompletionService(exec);
for(final ImageInfo info:infos){
    cService.submit(new Callable(){
        public ImageData call(){
            return info.downloadImage();
        }
    });
}
renderText(source);
try{
    for(int i=0,n=info.size();t f = cService.take();
        ImageData imageData=f.get();
        renderImage(imageData)
    }
}catch(InterruptedException e){
    Thread.currentThread().interrupt();
}catche(ExecutionException e){
    throw launderThrowable(e.getCause());
}

核心思路為為每一幅圖像下載都創(chuàng)建一個(gè)獨(dú)立的任務(wù),并在線程池中執(zhí)行他們,從而將串行的下載過程轉(zhuǎn)換為并行的過程

獲取頁面的廣告

廣告展示如果在一定的時(shí)間以內(nèi)沒有獲取,可以不再展示,并取消超時(shí)的任務(wù)。

 ExecutorService exe = Executors.newFixedThreadPool(3);
        List myTasks = new ArrayList<>();
        for (int i=0;i<3;i++){
          myTasks.add(new MyTask(3-i));
        }

        try {

            List> futures = exe.invokeAll(myTasks, 1, TimeUnit.SECONDS);
            for (int i=0;i

invokeAll方法對(duì)于沒有完成的任務(wù)會(huì)被取消,通過CancellationException可以捕獲,invokeAll返回的序列順序和傳入的task保持一致。結(jié)果如下:

task sleep 3 not execute ,because java.util.concurrent.CancellationException
task sleep 2 not execute ,because java.util.concurrent.CancellationException
task execut 1 s

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/72394.html

相關(guān)文章

  • Java并發(fā)編程之多線程線程

    摘要:目標(biāo)線程由運(yùn)行狀態(tài)轉(zhuǎn)換為就緒狀態(tài),也就是讓出執(zhí)行權(quán)限,讓其他線程得以優(yōu)先執(zhí)行,但其他線程能否優(yōu)先執(zhí)行時(shí)未知的。函數(shù)的官方解釋是意思是使調(diào)用該函數(shù)的線程讓出執(zhí)行時(shí)間給其他已就緒狀態(tài)的線程。 線程允許在同一個(gè)進(jìn)程中同時(shí)存在多個(gè)程序控制流,即通過線程可以實(shí)現(xiàn)同時(shí)處理多個(gè)任務(wù)的功能。線程會(huì)共享進(jìn)程范圍內(nèi)的資源,例如內(nèi)存句柄和文件句柄,但每個(gè)線程都有各自的程序計(jì)數(shù)器、棧以及局部變量。 多線程的實(shí)...

    wums 評(píng)論0 收藏0
  • Java線程

    摘要:中的線程池是運(yùn)用場(chǎng)景最多的并發(fā)框架。才是真正的線程池。存放任務(wù)的隊(duì)列存放需要被線程池執(zhí)行的線程隊(duì)列。所以線程池的所有任務(wù)完成后,它最終會(huì)收縮到的大小。飽和策略一般情況下,線程池采用的是,表示無法處理新任務(wù)時(shí)拋出異常。 Java線程池 1. 簡(jiǎn)介 系統(tǒng)啟動(dòng)一個(gè)新線程的成本是比較高的,因?yàn)樗婕芭c操作系統(tǒng)的交互,這個(gè)時(shí)候使用線程池可以提升性能,尤其是需要?jiǎng)?chuàng)建大量聲明周期很短暫的線程時(shí)。Ja...

    jerry 評(píng)論0 收藏0
  • Java 理解面向?qū)ο?/b>

    摘要:面向?qū)ο竺嫦驅(qū)ο蟮娜N基本特征繼承封裝多態(tài)結(jié)構(gòu)化程序設(shè)計(jì)簡(jiǎn)介主要原則自頂向下逐步求精模塊化。在面向?qū)ο蠓椒ㄖ?,類之間共享屬性和操作的機(jī)制稱為繼承。 面向?qū)ο?面向?qū)ο蟮娜N基本特征:繼承、封裝、多態(tài) 結(jié)構(gòu)化程序設(shè)計(jì)簡(jiǎn)介 主要原則:自頂向下、逐步求精、模塊化。 結(jié)構(gòu)化分析SA方法對(duì)系統(tǒng)進(jìn)行需求分析;結(jié)構(gòu)化設(shè)計(jì)SD方法對(duì)系統(tǒng)進(jìn)行概要設(shè)計(jì)、詳細(xì)設(shè)計(jì);結(jié)構(gòu)化編程SP方法來實(shí)現(xiàn)系統(tǒng)。 結(jié)構(gòu)化程序...

    NSFish 評(píng)論0 收藏0
  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...

    supernavy 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<