成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

線程池工作竊取實(shí)例

ruicbAndroid / 3380人閱讀

摘要:序本文主要來(lái)展示一下簡(jiǎn)版的線程池的實(shí)現(xiàn)。默認(rèn)提供了幾個(gè)工廠方法思路主要用到的是雙端隊(duì)列,不過(guò)這里我們粗糙的實(shí)現(xiàn)的話,也可以不用到。測(cè)試實(shí)例輸出從數(shù)據(jù)來(lái)看,還是相對(duì)均勻的。

本文主要來(lái)展示一下簡(jiǎn)版的work stealing線程池的實(shí)現(xiàn)。

Executors

Executors默認(rèn)提供了幾個(gè)工廠方法

/**
     * Creates a thread pool that maintains enough threads to support
     * the given parallelism level, and may use multiple queues to
     * reduce contention. The parallelism level corresponds to the
     * maximum number of threads actively engaged in, or available to
     * engage in, task processing. The actual number of threads may
     * grow and shrink dynamically. A work-stealing pool makes no
     * guarantees about the order in which submitted tasks are
     * executed.
     *
     * @param parallelism the targeted parallelism level
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code parallelism <= 0}
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    /**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @see #newWorkStealingPool(int)
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
思路

ForkJoinPool主要用到的是雙端隊(duì)列,不過(guò)這里我們粗糙的實(shí)現(xiàn)的話,也可以不用到deque。

public class WorkStealingChannel {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkStealingChannel.class);

    BlockingDeque[] managedQueues;

    AtomicLongMap stat = AtomicLongMap.create();

    public WorkStealingChannel() {
        int nCPU = Runtime.getRuntime().availableProcessors();
        int queueCount = nCPU / 2 + 1;
        managedQueues = new LinkedBlockingDeque[queueCount];
        for(int i=0;i();
        }
    }

    public void put(T item) throws InterruptedException {
        int targetIndex = Math.abs(item.hashCode() % managedQueues.length);
        BlockingQueue targetQueue = managedQueues[targetIndex];
        targetQueue.put(item);
    }

    public T take() throws InterruptedException {
        int rdnIdx = ThreadLocalRandom.current().nextInt(managedQueues.length);
        int idx = rdnIdx;
        while (true){
            idx = idx % managedQueues.length;
            T item = null;
            if(idx == rdnIdx){
                item = managedQueues[idx].poll();
            }else{
                item = managedQueues[idx].pollLast();
            }
            if(item != null){
                LOGGER.info("take ele from queue {}",idx);
                stat.addAndGet(idx,1);
                return item;
            }
            idx++;
            if(idx == rdnIdx){
                break;
            }
        }

        //走完一輪沒(méi)有,則隨機(jī)取一個(gè)等待
        LOGGER.info("wait for queue:{}",rdnIdx);
        stat.addAndGet(rdnIdx,1);
        return managedQueues[rdnIdx].take();
    }

    public AtomicLongMap getStat() {
        return stat;
    }
}

這里根據(jù)cpu的數(shù)量建立了幾個(gè)deque,然后每次put的時(shí)候,根據(jù)hashcode取模放到對(duì)應(yīng)的隊(duì)列。然后獲取的時(shí)候,先從隨機(jī)一個(gè)隊(duì)列取,沒(méi)有的話,再robbin round取其他隊(duì)列的,還沒(méi)有的話,則阻塞等待指定隊(duì)列的元素。

測(cè)試實(shí)例

public class WorkStealingDemo {

    static final WorkStealingChannel channel = new WorkStealingChannel<>();

    static volatile boolean running = true;

    static class Producer extends Thread{
        @Override
        public void run() {
            while(running){
                try {
                    channel.put(UUID.randomUUID().toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer extends Thread{
        @Override
        public void run() {
            while(running){
                try {
                    String value = channel.take();
                    System.out.println(value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void stop(){
        running = false;
        System.out.println(channel.getStat());
    }


    public static void main(String[] args) throws InterruptedException {
        int nCPU = Runtime.getRuntime().availableProcessors();
        int consumerCount = nCPU / 2 + 1;
        for (int i = 0; i < nCPU; i++) {
            new Producer().start();
        }

        for (int i = 0; i < consumerCount; i++) {
            new Consumer().start();
        }

        Thread.sleep(30*1000);
        stop();
    }
}

輸出

{0=660972, 1=660613, 2=661537, 3=659846, 4=659918}

從數(shù)據(jù)來(lái)看,還是相對(duì)均勻的。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/70405.html

相關(guān)文章

  • Java? 教程(執(zhí)行器)

    執(zhí)行器 在前面的所有示例中,由新的線程(由其Runnable對(duì)象定義)和線程本身(由Thread對(duì)象定義)完成的任務(wù)之間存在緊密的聯(lián)系,這適用于小型應(yīng)用程序,但在大型應(yīng)用程序中,將線程管理和創(chuàng)建與應(yīng)用程序的其余部分分開(kāi)是有意義的,封裝這些函數(shù)的對(duì)象稱為執(zhí)行器,以下小節(jié)詳細(xì)描述了執(zhí)行器。 執(zhí)行器接口定義三個(gè)執(zhí)行器對(duì)象類型。 線程池是最常見(jiàn)的執(zhí)行器實(shí)現(xiàn)類型。 Fork/Join是一個(gè)利用多個(gè)處理器的...

    馬忠志 評(píng)論0 收藏0
  • 想進(jìn)大廠?50個(gè)多線程面試題,你會(huì)多少?【后25題】(二)

    摘要:大多數(shù)待遇豐厚的開(kāi)發(fā)職位都要求開(kāi)發(fā)者精通多線程技術(shù)并且有豐富的程序開(kāi)發(fā)調(diào)試優(yōu)化經(jīng)驗(yàn),所以線程相關(guān)的問(wèn)題在面試中經(jīng)常會(huì)被提到。掌握了這些技巧,你就可以輕松應(yīng)對(duì)多線程和并發(fā)面試了。進(jìn)入等待通行準(zhǔn)許時(shí),所提供的對(duì)象。 最近看到網(wǎng)上流傳著,各種面試經(jīng)驗(yàn)及面試題,往往都是一大堆技術(shù)題目貼上去,而沒(méi)有答案。 不管你是新程序員還是老手,你一定在面試中遇到過(guò)有關(guān)線程的問(wèn)題。Java語(yǔ)言一個(gè)重要的特點(diǎn)就...

    caozhijian 評(píng)論0 收藏0
  • Java多線程進(jìn)階(四三)—— J.U.C之executors框架:Fork/Join框架(1) 原

    摘要:同時(shí),它會(huì)通過(guò)的方法將自己注冊(cè)到線程池中。線程池中的每個(gè)工作線程都有一個(gè)自己的任務(wù)隊(duì)列,工作線程優(yōu)先處理自身隊(duì)列中的任務(wù)或順序,由線程池構(gòu)造時(shí)的參數(shù)決定,自身隊(duì)列為空時(shí),以的順序隨機(jī)竊取其它隊(duì)列中的任務(wù)。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首發(fā)于一世流云的專欄:https://segmentfau...

    cooxer 評(píng)論0 收藏0
  • 基于Fork/Join框架實(shí)現(xiàn)對(duì)大型浮點(diǎn)數(shù)數(shù)組排序(歸并算法和插入排序算法)

    摘要:方法接受對(duì)象數(shù)組作為參數(shù),目標(biāo)是對(duì)數(shù)組進(jìn)行升序排序。創(chuàng)建一個(gè)對(duì)象,并調(diào)用方法將它提交給線程池。此排序算法不直接返回結(jié)果給調(diào)用方,因此基于類。 分支/合并框架 說(shuō)明 重點(diǎn)是那個(gè)浮點(diǎn)數(shù)數(shù)組排序的例子,從主函數(shù)展開(kāi),根據(jù)序號(hào)看 1、GitHub代碼歡迎star。你們輕輕的一點(diǎn),對(duì)我鼓勵(lì)特大,我有一個(gè)習(xí)慣,看完別人的文章是會(huì)點(diǎn)贊的。 2、個(gè)人認(rèn)為學(xué)習(xí)語(yǔ)言最好的方式就是模仿、思考別人為什么這么寫...

    yuxue 評(píng)論0 收藏0
  • Java多線程進(jìn)階(四三)—— J.U.C之executors框架:Fork/Join框架(2)實(shí)現(xiàn)

    摘要:并不會(huì)為每個(gè)任務(wù)都創(chuàng)建工作線程,而是根據(jù)實(shí)際情況構(gòu)造線程池時(shí)的參數(shù)確定是喚醒已有空閑工作線程,還是新建工作線程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我們...

    FingerLiu 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<