成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

從0到1實(shí)現(xiàn)自己的阻塞隊(duì)列(上)

niceforbear / 2071人閱讀

摘要:而且在大多數(shù)經(jīng)典的多線程編程資料中,阻塞隊(duì)列都是其中非常重要的一個(gè)實(shí)踐案例。甚至可以說只有自己動(dòng)手實(shí)現(xiàn)了一個(gè)阻塞隊(duì)列才能真正掌握多線程相關(guān)的。為什么會(huì)發(fā)生這種情況呢原因就是在我們實(shí)現(xiàn)的這個(gè)阻塞隊(duì)列中完全沒有線程同步機(jī)制,所以同時(shí)并發(fā)進(jìn)行的個(gè)

阻塞隊(duì)列不止是一道熱門的面試題,同時(shí)也是許多并發(fā)處理模型的基礎(chǔ),比如常用的線程池類ThreadPoolExecutor內(nèi)部就使用了阻塞隊(duì)列來(lái)保存等待被處理的任務(wù)。而且在大多數(shù)經(jīng)典的多線程編程資料中,阻塞隊(duì)列都是其中非常重要的一個(gè)實(shí)踐案例。甚至可以說只有自己動(dòng)手實(shí)現(xiàn)了一個(gè)阻塞隊(duì)列才能真正掌握多線程相關(guān)的API。

在這篇文章中,我們會(huì)從一個(gè)最簡(jiǎn)單的原型開始一步一步完善為一個(gè)類似于JDK中阻塞隊(duì)列實(shí)現(xiàn)的真正實(shí)用的阻塞隊(duì)列。在這個(gè)過程中,我們會(huì)一路涉及synchronized關(guān)鍵字、條件變量、顯式鎖ReentrantLock等等多線程編程的關(guān)鍵技術(shù),最終掌握J(rèn)ava多線程編程的完整理論和實(shí)踐知識(shí)。

閱讀本文需要了解基本的多線程編程概念與互斥鎖的使用,還不了解的讀者可以參考一下這篇文章《多線程中那些看不見的陷阱》中到ReentrantLock部分為止的內(nèi)容。

什么是阻塞隊(duì)列?

阻塞隊(duì)列是這樣的一種數(shù)據(jù)結(jié)構(gòu),它是一個(gè)隊(duì)列(類似于一個(gè)List),可以存放0到N個(gè)元素。我們可以對(duì)這個(gè)隊(duì)列執(zhí)行插入或彈出元素操作,彈出元素操作就是獲取隊(duì)列中的第一個(gè)元素,并且將其從隊(duì)列中移除;而插入操作就是將元素添加到隊(duì)列的末尾。當(dāng)隊(duì)列中沒有元素時(shí),對(duì)這個(gè)隊(duì)列的彈出操作將會(huì)被阻塞,直到有元素被插入時(shí)才會(huì)被喚醒;當(dāng)隊(duì)列已滿時(shí),對(duì)這個(gè)隊(duì)列的插入操作就會(huì)被阻塞,直到有元素被彈出后才會(huì)被喚醒。

在線程池中,往往就會(huì)用阻塞隊(duì)列來(lái)保存那些暫時(shí)沒有空閑線程可以直接執(zhí)行的任務(wù),等到線程空閑之后再?gòu)淖枞?duì)列中彈出任務(wù)來(lái)執(zhí)行。一旦隊(duì)列為空,那么線程就會(huì)被阻塞,直到有新任務(wù)被插入為止。

一個(gè)最簡(jiǎn)單的版本 代碼實(shí)現(xiàn)

我們先來(lái)實(shí)現(xiàn)一個(gè)最簡(jiǎn)單的隊(duì)列,在這個(gè)隊(duì)列中我們不會(huì)添加任何線程同步措施,而只是實(shí)現(xiàn)了最基本的隊(duì)列與阻塞特性。 那么首先,一個(gè)隊(duì)列可以存放一定量的元素,而且可以執(zhí)行插入元素和彈出元素的操作。然后因?yàn)檫@個(gè)隊(duì)列還是一個(gè)阻塞隊(duì)列,那么在隊(duì)列為空時(shí),彈出元素的操作將會(huì)被阻塞,直到隊(duì)列中被插入新的元素可供彈出為止;而在隊(duì)列已滿的情況下,插入元素的操作將會(huì)被阻塞,直到隊(duì)列中有元素被彈出為止。

下面我們會(huì)將這個(gè)最初的阻塞隊(duì)列實(shí)現(xiàn)類拆解為獨(dú)立的幾塊分別講解和實(shí)現(xiàn),到最后就能拼裝出一個(gè)完整的阻塞隊(duì)列類了。為了在阻塞隊(duì)列中保存元素,我們首先要定義一個(gè)數(shù)組來(lái)保存元素,也就是下面代碼中的items字段了,這是一個(gè)Object數(shù)組,所以可以保存任意類型的對(duì)象。在最后的構(gòu)造器中,會(huì)傳入一個(gè)capacity參數(shù)來(lái)指定items數(shù)組的大小,這個(gè)值也就是我們的阻塞隊(duì)列的大小了。

takeIndexputIndex就是我們插入和彈出元素的下標(biāo)位置了,為什么要分別用兩個(gè)整型來(lái)保存這樣的位置呢?因?yàn)樽枞?duì)列在使用的過程中會(huì)不斷地被插入和彈出元素,所以可以認(rèn)為元素在數(shù)組中是像貪吃蛇一樣一步一步往前移動(dòng)的,每次彈出的都是隊(duì)列中的第一個(gè)元素,而插入的元素則會(huì)被添加到隊(duì)列的末尾。當(dāng)下標(biāo)到達(dá)末尾時(shí)會(huì)被設(shè)置為0,從數(shù)組的第一個(gè)下標(biāo)位置重新開始向后增長(zhǎng),形成一個(gè)不斷循環(huán)的過程。

那么如果隊(duì)列中存儲(chǔ)的個(gè)數(shù)超過items數(shù)組的長(zhǎng)度時(shí),新插入的元素豈不是會(huì)覆蓋隊(duì)列開頭還沒有被彈出的元素了嗎?這時(shí)我們的最后一個(gè)字段count就能派上用場(chǎng)了,當(dāng)count等于items.length時(shí),插入操作就會(huì)被阻塞,直到隊(duì)列中有元素被彈出時(shí)為止。那么這種阻塞是如何實(shí)現(xiàn)的呢?我們接下來(lái)來(lái)看一下put()方法如何實(shí)現(xiàn)。

    /** 存放元素的數(shù)組 */
    private final Object[] items;
    
    /** 彈出元素的位置 */
    private int takeIndex;

    /** 插入元素的位置 */
    private int putIndex;
    
    /** 隊(duì)列中的元素總數(shù) */
    private int count;
    
    /**
     * 指定隊(duì)列大小的構(gòu)造器
     *
     * @param capacity  隊(duì)列大小
     */
    public BlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // putIndex, takeIndex和count都會(huì)被默認(rèn)初始化為0
        items = new Object[capacity];
    }

下面是put()take()方法的實(shí)現(xiàn),put()方法向隊(duì)列末尾添加新元素,而take()方法從隊(duì)列中彈出最前面的一個(gè)元素,我們首先來(lái)看一下我們目前最關(guān)心的put()方法。在put()方法的開頭,我們可以看到有一個(gè)判斷count是否達(dá)到了items.length(隊(duì)列大?。┑膇f語(yǔ)句,如果count不等于items.length,那么就表示隊(duì)列還沒有滿,隨后就直接調(diào)用了enqueue方法對(duì)元素進(jìn)行了入隊(duì)。enqueue方法的實(shí)現(xiàn)會(huì)在稍后介紹,這里我們只需要知道這個(gè)入隊(duì)方法會(huì)將元素放入到隊(duì)列中并對(duì)count加1就可以了。在成功插入元素之后我們就會(huì)通過break語(yǔ)句跳出最外層的無(wú)限while循環(huán),從方法中返回。

但是如果這時(shí)候隊(duì)列已滿,那么count的值就會(huì)等于items.length,這將會(huì)導(dǎo)致我們調(diào)用Thread.sleep(200L)使當(dāng)前線程休眠200毫秒。當(dāng)線程從休眠中恢復(fù)時(shí),又會(huì)進(jìn)入下一次循環(huán),重新判斷條件count != items.length。也就是說,如果隊(duì)列沒有彈出元素使我們可以完成插入操作,那么線程就會(huì)一直處于“判斷 -> 休眠”的循環(huán)而無(wú)法從put()方法中返回,也就是進(jìn)入了“阻塞”狀態(tài)。

隨后的take()方法也是一樣的道理,只有在隊(duì)列不為空的情況下才能順利彈出元素完成任務(wù)并返回,如果隊(duì)列一直為空,調(diào)用線程就會(huì)在循環(huán)中一直等待,直到隊(duì)列中有元素插入為止。

    /**
     * 將指定元素插入隊(duì)列
     *
     * @param e 待插入的對(duì)象
     */
    public void put(Object e) throws InterruptedException {
        while (true) {
            // 直到隊(duì)列未滿時(shí)才執(zhí)行入隊(duì)操作并跳出循環(huán)
            if (count != items.length) {
                // 執(zhí)行入隊(duì)操作,將對(duì)象e實(shí)際放入隊(duì)列中
                enqueue(e);
                break;
            }

            // 隊(duì)列已滿的情況下休眠200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 從隊(duì)列中彈出一個(gè)元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        while (true) {
            // 直到隊(duì)列非空時(shí)才繼續(xù)執(zhí)行后續(xù)的出隊(duì)操作并返回彈出的元素
            if (count != 0) {
                // 執(zhí)行出隊(duì)操作,將隊(duì)列中的第一個(gè)元素彈出
                return dequeue();
            }

            // 隊(duì)列為空的情況下休眠200ms
            Thread.sleep(200L);
        }
    }

在上面的put()take()方法中分別調(diào)用了入隊(duì)方法enqueue和出隊(duì)方法dequeue,那么這兩個(gè)方法到底需要如何實(shí)現(xiàn)呢?下面是這兩個(gè)方法的源代碼,我們可以看到,在入隊(duì)方法enqueue()中,總共有三步操作:

首先把指定的對(duì)象e保存到items[putIndex]中,putIndex指示的就是我們插入元素的位置。

之后,我們會(huì)將putIndex向后移一位,來(lái)確定下一次插入元素的下標(biāo)位置,如果已經(jīng)到了隊(duì)列末尾我們就會(huì)把putIndex設(shè)置為0,回到隊(duì)列的開頭。

最后,入隊(duì)操作會(huì)將count值加1,讓count值和隊(duì)列中的元素個(gè)數(shù)一致。

而出隊(duì)方法dequeue中執(zhí)行的操作則與入隊(duì)方法enqueue相反。

    /**
     * 入隊(duì)操作
     *
     * @param e 待插入的對(duì)象
     */
    private void enqueue(Object e) {
        // 將對(duì)象e放入putIndex指向的位置
        items[putIndex] = e;

        // putIndex向后移一位,如果已到末尾則返回隊(duì)列開頭(位置0)
        if (++putIndex == items.length)
            putIndex = 0;

        // 增加元素總數(shù)
        count++;
    }

    /**
     * 出隊(duì)操作
     *
     * @return  被彈出的元素
     */
    private Object dequeue() {
        // 取出takeIndex指向位置中的元素
        // 并將該位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;

        // takeIndex向后移一位,如果已到末尾則返回隊(duì)列開頭(位置0)
        if (++takeIndex == items.length)
            takeIndex = 0;

        // 減少元素總數(shù)
        count--;

        // 返回之前代碼中取出的元素e
        return e;
    }

到這里我們就可以將這個(gè)三個(gè)模塊拼接為一個(gè)完整的阻塞隊(duì)列類BlockingQueue了。完整的代碼如下,大家可以拷貝到IDE中,或者自己重新實(shí)現(xiàn)一遍,然后我們就可以開始上手用一用我們剛剛完成的阻塞隊(duì)列了。

public class BlockingQueue {

    /** 存放元素的數(shù)組 */
    private final Object[] items;

    /** 彈出元素的位置 */
    private int takeIndex;

    /** 插入元素的位置 */
    private int putIndex;

    /** 隊(duì)列中的元素總數(shù) */
    private int count;

    /**
     * 指定隊(duì)列大小的構(gòu)造器
     *
     * @param capacity  隊(duì)列大小
     */
    public BlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    /**
     * 入隊(duì)操作
     *
     * @param e 待插入的對(duì)象
     */
    private void enqueue(Object e) {
        // 將對(duì)象e放入putIndex指向的位置
        items[putIndex] = e;

        // putIndex向后移一位,如果已到末尾則返回隊(duì)列開頭(位置0)
        if (++putIndex == items.length)
            putIndex = 0;

        // 增加元素總數(shù)
        count++;
    }

    /**
     * 出隊(duì)操作
     *
     * @return  被彈出的元素
     */
    private Object dequeue() {
        // 取出takeIndex指向位置中的元素
        // 并將該位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;

        // takeIndex向后移一位,如果已到末尾則返回隊(duì)列開頭(位置0)
        if (++takeIndex == items.length)
            takeIndex = 0;

        // 減少元素總數(shù)
        count--;

        // 返回之前代碼中取出的元素e
        return e;
    }

    /**
     * 將指定元素插入隊(duì)列
     *
     * @param e 待插入的對(duì)象
     */
    public void put(Object e) throws InterruptedException {
        while (true) {
            // 直到隊(duì)列未滿時(shí)才執(zhí)行入隊(duì)操作并跳出循環(huán)
            if (count != items.length) {
                // 執(zhí)行入隊(duì)操作,將對(duì)象e實(shí)際放入隊(duì)列中
                enqueue(e);
                break;
            }

            // 隊(duì)列已滿的情況下休眠200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 從隊(duì)列中彈出一個(gè)元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        while (true) {
            // 直到隊(duì)列非空時(shí)才繼續(xù)執(zhí)行后續(xù)的出隊(duì)操作并返回彈出的元素
            if (count != 0) {
                // 執(zhí)行出隊(duì)操作,將隊(duì)列中的第一個(gè)元素彈出
                return dequeue();
            }

            // 隊(duì)列為空的情況下休眠200ms
            Thread.sleep(200L);
        }
    }

}
測(cè)驗(yàn)阻塞隊(duì)列實(shí)現(xiàn)

既然已經(jīng)有了阻塞隊(duì)列的實(shí)現(xiàn),那么我們就寫一個(gè)測(cè)試程序來(lái)測(cè)試一下吧。下面是一個(gè)對(duì)阻塞隊(duì)列進(jìn)行并發(fā)的插入和彈出操作的測(cè)試程序,在這個(gè)程序中,會(huì)創(chuàng)建2個(gè)生產(chǎn)者線程向阻塞隊(duì)列中插入數(shù)字0~19;同時(shí)也會(huì)創(chuàng)建2個(gè)消費(fèi)者線程從阻塞隊(duì)列中彈出20個(gè)數(shù)字,并打印這些數(shù)字。而且在程序中也統(tǒng)計(jì)了整個(gè)程序的耗時(shí),會(huì)在所有子線程執(zhí)行完成之后打印出程序的總耗時(shí)。

這里我們期望這個(gè)測(cè)驗(yàn)程序能夠以任意順序輸出0~19這20個(gè)數(shù)字,然后打印出程序的總耗時(shí),那么實(shí)際執(zhí)行情況會(huì)如何呢?

public class BlockingQueueTest {

    public static void main(String[] args) throws Exception {

        // 創(chuàng)建一個(gè)大小為2的阻塞隊(duì)列
        final BlockingQueue q = new BlockingQueue(2);

        // 創(chuàng)建2個(gè)線程
        final int threads = 2;
        // 每個(gè)線程執(zhí)行10次
        final int times = 10;

        // 線程列表,用于等待所有線程完成
        List threadList = new ArrayList<>(threads * 2);
        long startTime = System.currentTimeMillis();

        // 創(chuàng)建2個(gè)生產(chǎn)者線程,向隊(duì)列中并發(fā)放入數(shù)字0到19,每個(gè)線程放入10個(gè)數(shù)字
        for (int i = 0; i < threads; ++i) {
            final int offset = i * times;
            Thread producer = new Thread(() -> {
                try {
                    for (int j = 0; j < times; ++j) {
                        q.put(new Integer(offset + j));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            threadList.add(producer);
            producer.start();
        }

        // 創(chuàng)建2個(gè)消費(fèi)者線程,從隊(duì)列中彈出20次數(shù)字并打印彈出的數(shù)字
        for (int i = 0; i < threads; ++i) {
            Thread consumer = new Thread(() -> {
                try {
                    for (int j = 0; j < times; ++j) {
                        Integer element = (Integer) q.take();
                        System.out.println(element);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            threadList.add(consumer);
            consumer.start();
        }

        // 等待所有線程執(zhí)行完成
        for (Thread thread : threadList) {
            thread.join();
        }

        // 打印運(yùn)行耗時(shí)
        long endTime = System.currentTimeMillis();
        System.out.println(String.format("總耗時(shí):%.2fs", (endTime - startTime) / 1e3));
    }
}

在我的電腦上運(yùn)行這段程序的輸出為:

0
1
2
3
4
5
null
10
8
7
14
9
16
15
18
17
null

不僅是打印出了很多個(gè)null,而且打印出17行之后就不再打印更多數(shù)據(jù),而且程序也就一直沒有打印總耗時(shí)并結(jié)束了。為什么會(huì)發(fā)生這種情況呢?

原因就是在我們實(shí)現(xiàn)的這個(gè)阻塞隊(duì)列中完全沒有線程同步機(jī)制,所以同時(shí)并發(fā)進(jìn)行的4個(gè)線程(2個(gè)生產(chǎn)者和2個(gè)消費(fèi)者)會(huì)同時(shí)執(zhí)行阻塞隊(duì)列的put()take()方法。這就可能會(huì)導(dǎo)致各種各樣并發(fā)執(zhí)行順序?qū)е碌膯栴},比如兩個(gè)生產(chǎn)者同時(shí)對(duì)阻塞隊(duì)列進(jìn)行插入操作,有可能就會(huì)在putIndex沒更新的情況下對(duì)同一下標(biāo)位置又插入了一次數(shù)據(jù),導(dǎo)致了數(shù)據(jù)還沒被消費(fèi)就被覆蓋了;而兩個(gè)消費(fèi)者也可能會(huì)在takeIndex沒更新的情況下又獲取了一次已經(jīng)被清空的位置,導(dǎo)致打印出了null。最后因?yàn)檫@些原因都有可能會(huì)導(dǎo)致消費(fèi)者線程最后還沒有彈出20個(gè)數(shù)字count就已經(jīng)為0了,這時(shí)消費(fèi)者線程就會(huì)一直處于阻塞狀態(tài)無(wú)法退出了。

那么我們應(yīng)該如何給阻塞隊(duì)列加上線程同步措施,使它的運(yùn)行不會(huì)發(fā)生錯(cuò)誤呢?

一個(gè)線程安全的版本 使用互斥鎖來(lái)保護(hù)隊(duì)列操作

之前碰到的并發(fā)問題的核心就是多個(gè)線程同時(shí)對(duì)阻塞隊(duì)列進(jìn)行插入或彈出操作,那么我們有沒有辦法讓同一時(shí)間只能有一個(gè)線程對(duì)阻塞隊(duì)列進(jìn)行操作呢?

也許很多讀者已經(jīng)想到了,我們最常用的一種并發(fā)控制方式就是synchronized關(guān)鍵字。通過synchronized,我們可以讓一段代碼同一時(shí)間只能有一個(gè)線程進(jìn)入;如果在同一個(gè)對(duì)象上通過synchronized加鎖,那么put()take()兩個(gè)方法可以做到同一時(shí)間只能有一個(gè)線程調(diào)用兩個(gè)方法中的任意一個(gè)。比如如果有一個(gè)線程調(diào)用了put()方法插入元素,那么其他線程再調(diào)用put()方法或者take()就都會(huì)被阻塞直到前一個(gè)線程完成對(duì)put()方法的調(diào)用了。

在這里,我們只修改put()take()方法,把這兩個(gè)方法中對(duì)enqueuedequeue的調(diào)用都包裝到一個(gè)synchronized (this) {...}的語(yǔ)句塊中,保證了同一時(shí)間只能有一個(gè)線程進(jìn)入這兩個(gè)語(yǔ)句塊中的任意一個(gè)。如果對(duì)synchronized之類的線程同步機(jī)制還不熟悉的讀者,建議先看一下這篇介紹多線程同步機(jī)制的文章《多線程中那些看不見的陷阱》再繼續(xù)閱讀之后的內(nèi)容,相信會(huì)有事半功倍的效果。

    /**
     * 將指定元素插入隊(duì)列
     *
     * @param e 待插入的對(duì)象
     */
    public void put(Object e) throws InterruptedException {
        while (true) {
            synchronized (this) {
                // 直到隊(duì)列未滿時(shí)才執(zhí)行入隊(duì)操作并跳出循環(huán)
                if (count != items.length) {
                    // 執(zhí)行入隊(duì)操作,將對(duì)象e實(shí)際放入隊(duì)列中
                    enqueue(e);
                    break;
                }
            }

            // 隊(duì)列已滿的情況下休眠200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 從隊(duì)列中彈出一個(gè)元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                // 直到隊(duì)列非空時(shí)才繼續(xù)執(zhí)行后續(xù)的出隊(duì)操作并返回彈出的元素
                if (count != 0) {
                    // 執(zhí)行出隊(duì)操作,將隊(duì)列中的第一個(gè)元素彈出
                    return dequeue();
                }
            }

            // 隊(duì)列為空的情況下休眠200ms
            Thread.sleep(200L);
        }
    }
再次測(cè)試

我們?cè)賮?lái)試一試這個(gè)新的阻塞隊(duì)列實(shí)現(xiàn),在我的電腦上測(cè)試程序的輸出如下:

0
1
2
3
10
11
4
5
6
12
13
14
15
7
8
9
16
17
18
19
總耗時(shí):1.81s

這下看起來(lái)結(jié)果就對(duì)了,而且多跑了幾次也都能穩(wěn)定輸出所有0~19的20個(gè)數(shù)字??雌饋?lái)非常棒,我們成功了,來(lái)給自己鼓個(gè)掌吧!

但是仔細(xì)那么一看,好像最后的耗時(shí)是不是有一些高了?雖然“1.81秒”也不是太長(zhǎng)的時(shí)間,但是好像一般計(jì)算機(jī)程序做這么一點(diǎn)事情只要一眨眼的功夫就能完成才對(duì)呀。為什么這個(gè)阻塞隊(duì)列會(huì)這么慢呢?

一個(gè)更快的阻塞隊(duì)列

讓我們先來(lái)診斷一下之前的阻塞隊(duì)列中到底是什么導(dǎo)致了效率的降低,因?yàn)?b>put()和take()方法是阻塞隊(duì)列的核心,所以我們自然從這兩個(gè)方法看起。在這兩個(gè)方法里,我們都看到了同一段代碼Thread.sleep(200L),這段代碼會(huì)讓put()take()方法分別在隊(duì)列已滿和隊(duì)列為空的情況下進(jìn)入一次固定的200毫秒的休眠,防止線程占用過多的CPU資源。但是如果隊(duì)列在這200毫秒里發(fā)生了變化,那么線程也還是在休眠狀態(tài)無(wú)法馬上對(duì)變化做出響應(yīng)。比如如果一個(gè)調(diào)用put()方法的線程因?yàn)殛?duì)列已滿而進(jìn)入了200毫秒的休眠,那么即使隊(duì)列已經(jīng)被消費(fèi)者線程清空了,它也仍然會(huì)忠實(shí)地等到200毫秒之后才會(huì)重新嘗試向隊(duì)列中插入元素,中間的這些時(shí)間就都被浪費(fèi)了。

但是如果我們?nèi)サ暨@段休眠的代碼,又會(huì)導(dǎo)致CPU的使用率過高的問題。那么有沒有一種方法可以平衡兩者的利弊,同時(shí)得到兩種情況的好處又沒有各自的缺點(diǎn)呢?

使用條件變量?jī)?yōu)化阻塞喚醒

為了完成上面這個(gè)困難的任務(wù),既要馬兒跑又要馬兒不吃草。那么我們就需要有一種方法,既讓線程進(jìn)入休眠狀態(tài)不再占用CPU,但是在隊(duì)列發(fā)生改變時(shí)又能及時(shí)地被喚醒來(lái)重試之前的操作了。既然用了對(duì)象鎖synchronized,那么我們就找找有沒有與之相搭配的同步機(jī)制可以實(shí)現(xiàn)我們的目標(biāo)。

Object類,也就是所有Java類的基類里,我們找到了三個(gè)有意思的方法Object.wait()Object.notify()、Object.notifyAll()。這三個(gè)方法是需要搭配在一起使用的,其功能與操作系統(tǒng)層面的條件變量類似。條件變量是這樣的一種線程同步工具:

每個(gè)條件變量都會(huì)有一個(gè)對(duì)應(yīng)的互斥鎖,要調(diào)用條件變量的wait()方法,首先需要持有條件變量對(duì)應(yīng)的這個(gè)互斥鎖。之后,在調(diào)用條件變量的wait()方法時(shí),首先會(huì)釋放已持有的這個(gè)互斥鎖,然后當(dāng)前線程進(jìn)入休眠狀態(tài),等待被Object.notify()或者Object.notifyAll()方法喚醒;

調(diào)用Object.notify()或者Object.notifyAll()方法可以喚醒因?yàn)?b>Object.wait()進(jìn)入休眠狀態(tài)的線程,區(qū)別是Object.notify()方法只會(huì)喚醒一個(gè)線程,而Object.notifyAll()會(huì)喚醒所有線程。

因?yàn)槲覀冎暗拇a中通過synchronized獲取了對(duì)應(yīng)于this引用的對(duì)象鎖,所以自然也就要用this.wait()this.notify()、this.notifyAll()方法來(lái)使用與這個(gè)對(duì)象鎖對(duì)應(yīng)的條件變量了。下面是使用條件變量改造后的put()take()方法。還是和之前一樣,我們首先以put()方法為例分析具體的改動(dòng)。首先,我們?nèi)サ袅俗钔鈱拥膚hile循環(huán),然后我們把Thread.sleep替換為了this.wait(),以此在隊(duì)列已滿時(shí)進(jìn)入休眠狀態(tài),等待隊(duì)列中的元素被彈出后再繼續(xù)。在隊(duì)列滿足條件,入隊(duì)操作成功后,我們通過調(diào)用this.notifyAll()喚醒了可能在等待隊(duì)列非空條件的調(diào)用take()的線程。take()方法的實(shí)現(xiàn)與put()也基本類似,只是操作相反。

    /**
     * 將指定元素插入隊(duì)列
     *
     * @param e 待插入的對(duì)象
     */
    public void put(Object e) throws InterruptedException {
        synchronized (this) {
            if (count == items.length) {
                // 隊(duì)列已滿時(shí)進(jìn)入休眠
                this.wait();
            }

            // 執(zhí)行入隊(duì)操作,將對(duì)象e實(shí)際放入隊(duì)列中
            enqueue(e);

            // 喚醒所有休眠等待的進(jìn)程
            this.notifyAll();
        }
    }

    /**
     * 從隊(duì)列中彈出一個(gè)元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        synchronized (this) {
            if (count == 0) {
                // 隊(duì)列為空時(shí)進(jìn)入休眠
                this.wait();
            }

            // 執(zhí)行出隊(duì)操作,將隊(duì)列中的第一個(gè)元素彈出
            Object e = dequeue();

            // 喚醒所有休眠等待的進(jìn)程
            this.notifyAll();

            return e;
        }
    }

但是我們?cè)跍y(cè)試程序運(yùn)行之后發(fā)現(xiàn)結(jié)果好像又出現(xiàn)了問題,在我電腦上的輸出如下:

0
19
null
null
null
null
null
null
null
null
null
18
null
null
null
null
null
null
null
null
總耗時(shí):0.10s

雖然我們解決了耗時(shí)問題,現(xiàn)在的耗時(shí)已經(jīng)只有0.10s了,但是結(jié)果中又出現(xiàn)了大量的null,我們的阻塞隊(duì)列好像又出現(xiàn)了正確性問題。那么問題出在哪呢?建議讀者可以先自己嘗試分析一下,這樣有助于大家積累解決多線程并發(fā)問題的能力。

while循環(huán)判斷條件是否滿足

經(jīng)過分析,我們看到,在調(diào)用this.wait()后,如果線程被this.notifyAll()方法喚醒,那么就會(huì)直接開始直接入隊(duì)/出隊(duì)操作,而不會(huì)再次檢查count的值是否滿足條件。而在我們的程序中,當(dāng)隊(duì)列為空時(shí),可能會(huì)有很多消費(fèi)者線程在等待插入元素。此時(shí),如果有一個(gè)生產(chǎn)者線程插入了一個(gè)元素并調(diào)用了this.notifyAll(),則所有消費(fèi)者線程都會(huì)被喚醒,然后依次執(zhí)行出隊(duì)操作,那么第一個(gè)消費(fèi)者線程之后的所有線程拿到的都將是null值。而且同時(shí),在這種情況下,每一個(gè)執(zhí)行完出隊(duì)操作的消費(fèi)者線程也同樣會(huì)調(diào)用this.notifyAll()方法,這樣即使隊(duì)列中已經(jīng)沒有元素了,后續(xù)進(jìn)入等待的消費(fèi)者線程仍然會(huì)被自己的同類所喚醒,消費(fèi)根本不存在的元素,最終只能返回null。

所以要解決這個(gè)問題,核心就是在線程從this.wait()中被喚醒時(shí)也仍然要重新檢查一遍count值是否滿足要求,如果count不滿足要求,那么當(dāng)前線程仍然調(diào)用this.wait()回到等待狀態(tài)當(dāng)中去繼續(xù)休眠。而我們是沒辦法預(yù)知程序在第幾次判斷條件時(shí)可以得到滿足條件的count值從而繼續(xù)執(zhí)行的,所以我們必須讓程序循環(huán)執(zhí)行“判斷條件 -> 不滿足條件繼續(xù)休眠”這樣的流程,直到count滿足條件為止。那么我們就可以使用一個(gè)while循環(huán)來(lái)包裹this.wait()調(diào)用和對(duì)count的條件判斷,以此達(dá)到這個(gè)目的。

下面是具體的實(shí)現(xiàn)代碼,我們?cè)谄渲邪裞ount條件(隊(duì)列未滿/非空)作為while條件,然后在count值還不滿足要求的情況下調(diào)用this.wait()方法使當(dāng)前線程進(jìn)入等待狀態(tài)繼續(xù)休眠。

    /**
     * 將指定元素插入隊(duì)列
     *
     * @param e 待插入的對(duì)象
     */
    public void put(Object e) throws InterruptedException {
        synchronized (this) {
            while (count == items.length) {
                // 隊(duì)列已滿時(shí)進(jìn)入休眠
                this.wait();
            }

            // 執(zhí)行入隊(duì)操作,將對(duì)象e實(shí)際放入隊(duì)列中
            enqueue(e);

            // 喚醒所有休眠等待的進(jìn)程
            this.notifyAll();
        }
    }

    /**
     * 從隊(duì)列中彈出一個(gè)元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        synchronized (this) {
            while (count == 0) {
                // 隊(duì)列為空時(shí)進(jìn)入休眠
                this.wait();
            }

            // 執(zhí)行出隊(duì)操作,將隊(duì)列中的第一個(gè)元素彈出
            Object e = dequeue();

            // 喚醒所有休眠等待的進(jìn)程
            this.notifyAll();

            return e;
        }
    }

再次運(yùn)行我們的測(cè)試程序,在我的電腦上得到了如下的輸出:

0
10
1
2
11
12
13
3
4
14
5
6
15
16
7
17
8
18
9
19
總耗時(shí):0.11s

耗時(shí)只有0.11s,而且結(jié)果也是正確的,看來(lái)我們得到了一個(gè)又快又好的阻塞隊(duì)列實(shí)現(xiàn)。這是一個(gè)里程碑式的版本,我們實(shí)現(xiàn)了一個(gè)真正可以在程序代碼中使用的阻塞隊(duì)列,到這里可以說你已經(jīng)學(xué)會(huì)了如何實(shí)現(xiàn)一個(gè)阻塞隊(duì)列了,讓我們?yōu)樽约汗膫€(gè)掌吧。

當(dāng)時(shí)進(jìn)度條出賣了我,這篇文章還有不少內(nèi)容。既然我們已經(jīng)學(xué)會(huì)如何實(shí)現(xiàn)一個(gè)真正可用的阻塞隊(duì)列了,我們?yōu)槭裁催€要繼續(xù)看這么多內(nèi)容呢?別慌,雖然我們已經(jīng)實(shí)現(xiàn)了一個(gè)真正可用的版本,但是如果我們更進(jìn)一步的話就可以實(shí)現(xiàn)一個(gè)JDK級(jí)別的高強(qiáng)度版本了,這聽起來(lái)是不是非常的誘人?讓我們繼續(xù)我們的旅程吧。

一個(gè)更安全的版本

我們之前的版本中使用這些同步機(jī)制:synchronized (this)、this.wait()、this.notifyAll(),這些同步機(jī)制都和當(dāng)前對(duì)象this有關(guān)。因?yàn)?b>synchronized (obj)可以使用任意對(duì)象對(duì)應(yīng)的對(duì)象鎖,而Object.wati()Object.notifyAll()方法又都是public方法。也就是說不止在阻塞隊(duì)列類內(nèi)部可以使用這個(gè)阻塞隊(duì)列對(duì)象的對(duì)象鎖及其對(duì)應(yīng)的條件變量,在外部的代碼中也可以任意地獲取阻塞隊(duì)列對(duì)象上的對(duì)象鎖和對(duì)應(yīng)的條件變量,那么就有可能發(fā)生外部代碼濫用阻塞隊(duì)列對(duì)象上的對(duì)象鎖導(dǎo)致阻塞隊(duì)列性能下降甚至是發(fā)生死鎖的情況。那我們有沒有什么辦法可以讓阻塞隊(duì)列在這方面變得更安全呢?

使用顯式鎖

最直接的方式當(dāng)然是請(qǐng)出JDK在1.5之后引入的代替synchronized關(guān)鍵字的顯式鎖ReentrantLock類了。ReentrantLock類是一個(gè)可重入互斥鎖,互斥指的是和synchronized一樣,同一時(shí)間只能有一個(gè)線程持有鎖,其他獲取鎖的線程都必須等待持有鎖的線程釋放該鎖。而可重入指的就是同一個(gè)線程可以重復(fù)獲取同一個(gè)鎖,如果在獲取鎖時(shí)這個(gè)鎖已經(jīng)被當(dāng)前線程所持有了,那么這個(gè)獲取鎖的操作仍然會(huì)直接成功。

一般我們使用ReentrantLock的方法如下:

lock.lock();
try {
    做一些操作
}
finally {
    lock.unlock();
}

上面的lock變量就是一個(gè)ReentrantLock類型的對(duì)象。在這段代碼中,釋放鎖的操作lock.unlock()被放在了finally塊中,這是為了保證線程在獲取到鎖之后,不論出現(xiàn)異?;蛘呤裁刺厥馇闆r都能保證正確地釋放互斥鎖。如果不這么做就可能會(huì)導(dǎo)致持有鎖的線程異常退出后仍然持有該鎖,其他需要獲取同一個(gè)鎖的線程就永遠(yuǎn)運(yùn)行不了。

那么在我們的阻塞隊(duì)列中應(yīng)該如何用ReentrantLock類來(lái)改寫呢?

首先,我們顯然要為我們的阻塞隊(duì)列類添加一個(gè)實(shí)例變量lock來(lái)保存用于在不同線程間實(shí)現(xiàn)互斥訪問的ReentrantLock鎖。然后我們要將原來(lái)的synchronized(this) {...}格式的代碼修改為上面使用ReentrantLock進(jìn)行互斥訪問保護(hù)的實(shí)現(xiàn)形式,也就是lock.lock(); try {...} finally {lock.unlock();}這樣的形式。

但是原來(lái)與synchronized所加的對(duì)象鎖相對(duì)應(yīng)的條件變量使用方法this.wait()this.notifyAll()應(yīng)該如何修改呢?ReentrantLock已經(jīng)為你做好了準(zhǔn)備,我們可以直接調(diào)用lock.newCondition()方法來(lái)創(chuàng)建一個(gè)與互斥鎖lock相對(duì)應(yīng)的條件變量。然后為了在不同線程中都能訪問到這個(gè)條件變量,我們同樣要新增一個(gè)實(shí)例變量condition來(lái)保存這個(gè)新創(chuàng)建的條件變量對(duì)象。然后我們?cè)瓉?lái)使用的this.wait()就需要修改為condition.await(),而this.notifyAll()就修改為了condition.signalAll()

    /** 顯式鎖 */
    private final ReentrantLock lock = new ReentrantLock();

    /** 鎖對(duì)應(yīng)的條件變量 */
    private final Condition condition = lock.newCondition();
    
    /**
     * 將指定元素插入隊(duì)列
     *
     * @param e 待插入的對(duì)象
     */
    public void put(Object e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                // 隊(duì)列已滿時(shí)進(jìn)入休眠
                // 使用與顯式鎖對(duì)應(yīng)的條件變量
                condition.await();
            }

            // 執(zhí)行入隊(duì)操作,將對(duì)象e實(shí)際放入隊(duì)列中
            enqueue(e);

            // 通過條件變量喚醒休眠線程
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 從隊(duì)列中彈出一個(gè)元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                // 隊(duì)列為空時(shí)進(jìn)入休眠
                // 使用與顯式鎖對(duì)應(yīng)的條件變量
                condition.await();
            }

            // 執(zhí)行出隊(duì)操作,將隊(duì)列中的第一個(gè)元素彈出
            Object e = dequeue();

            // 通過條件變量喚醒休眠線程
            condition.signalAll();

            return e;
        } finally {
            lock.unlock();
        }
    }

到這里,我們就完成了使用顯式鎖ReentrantLock所需要做的所有改動(dòng)了。整個(gè)過程中并不涉及任何邏輯的變更,我們只是把synchronized (this) {...}修改為了lock.lock() try {...} finally {lock.unlock();},把this.wait()修改為了condition.await(),把this.notifyAll()修改為了condition.signalAll()。就這樣,我們的鎖和條件變量因?yàn)槭?b>private字段,所以外部的代碼就完全無(wú)法訪問了,這讓我們的阻塞隊(duì)列變得更加安全,是時(shí)候可以提供給其他人使用了。

但是這個(gè)版本的阻塞隊(duì)列仍然還有很大的優(yōu)化空間,繼續(xù)閱讀下一篇文章,相信你就可以實(shí)現(xiàn)出JDK級(jí)別的阻塞隊(duì)列了。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74359.html

相關(guān)文章

  • 01實(shí)現(xiàn)自己阻塞隊(duì)列(下)

    摘要:在上一篇文章從到實(shí)現(xiàn)自己的阻塞隊(duì)列上中,我們已經(jīng)實(shí)現(xiàn)了一個(gè)可以使用的阻塞隊(duì)列版本。插入鎖隊(duì)列未滿的條件變量彈出鎖隊(duì)列非空的條件變量最后我們要對(duì)和方法中的調(diào)用做出一些調(diào)整。 在上一篇文章《從0到1實(shí)現(xiàn)自己的阻塞隊(duì)列(上)》中,我們已經(jīng)實(shí)現(xiàn)了一個(gè)可以使用的阻塞隊(duì)列版本。在這篇文章中,我們可以繼續(xù)我們的冒險(xiǎn)之旅,將我們的阻塞隊(duì)列提升到接近JDK版本的水平上。 更進(jìn)一步優(yōu)化效率 我們一直使用的...

    XFLY 評(píng)論0 收藏0
  • 01玩轉(zhuǎn)線程池

    摘要:提交任務(wù)當(dāng)創(chuàng)建了一個(gè)線程池之后我們就可以將任務(wù)提交到線程池中執(zhí)行了。提交任務(wù)到線程池中相當(dāng)簡(jiǎn)單,我們只要把原來(lái)傳入類構(gòu)造器的對(duì)象傳入線程池的方法或者方法就可以了。 我們一般不會(huì)選擇直接使用線程類Thread進(jìn)行多線程編程,而是使用更方便的線程池來(lái)進(jìn)行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時(shí)候去獲取就可以了。甚至可以說線程池更棒,我們只需要把任務(wù)提交給它,它就會(huì)在合...

    darkerXi 評(píng)論0 收藏0
  • 『并發(fā)包入坑指北』之阻塞隊(duì)列

    摘要:自己實(shí)現(xiàn)在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn)基本隊(duì)列特性先進(jìn)先出。消費(fèi)隊(duì)列空時(shí)會(huì)阻塞直到寫入線程寫入了隊(duì)列數(shù)據(jù)后喚醒消費(fèi)線程。最終的隊(duì)列大小為,可見線程也是安全的。 showImg(https://segmentfault.com/img/remote/1460000018811340); 前言 較長(zhǎng)一段時(shí)間以來(lái)我都發(fā)現(xiàn)不少開發(fā)者對(duì) jdk 中的 J.U.C(java.util.c...

    nicercode 評(píng)論0 收藏0
  • BlockingQueue與Condition原理解析

    摘要:最后一直調(diào)用函數(shù)判斷節(jié)點(diǎn)是否被轉(zhuǎn)移到隊(duì)列上,也就是中等待獲取鎖的隊(duì)列。這樣的話,函數(shù)中調(diào)用函數(shù)就會(huì)返回,導(dǎo)致函數(shù)進(jìn)入最后一步重新獲取鎖的狀態(tài)。函數(shù)其實(shí)就做了一件事情,就是不斷嘗試調(diào)用函數(shù),將隊(duì)首的一個(gè)節(jié)點(diǎn)轉(zhuǎn)移到隊(duì)列中,直到轉(zhuǎn)移成功。 ?我在前段時(shí)間寫了一篇關(guān)于AQS源碼解析的文章AbstractQueuedSynchronizer超詳細(xì)原理解析,在文章里邊我說JUC包中的大部分多線程相...

    TalkingData 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<