成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

AbstractQueuedSynchronizer原理分析

JasinYip / 296人閱讀

摘要:當(dāng)前節(jié)點(diǎn)擁有的線程。方法返回值表示在線程等待過程中,是否有另一個(gè)線程調(diào)用該線程的方法,發(fā)起中斷。如果前一個(gè)節(jié)點(diǎn)狀態(tài)是,那么直接返回,阻塞當(dāng)前線程如果前一個(gè)節(jié)點(diǎn)狀態(tài)是大于就是,表示前一個(gè)

AQS是JUC鎖框架中最重要的類,通過它來實(shí)現(xiàn)獨(dú)占鎖和共享鎖的。本章是對(duì)AbstractQueuedSynchronizer源碼的完全解析,分為四個(gè)部分介紹:


CLH隊(duì)列即同步隊(duì)列:儲(chǔ)存著所有等待鎖的線程

獨(dú)占鎖

共享鎖

Condition條件

注: 還有一個(gè)AbstractQueuedLongSynchronizer類,它與AQS功能和實(shí)現(xiàn)幾乎一樣,唯一不同的是AQLS中代表鎖被獲取次數(shù)的成員變量state類型是long長整類型,而AQS中該成員變量是int類型。

一. CLH隊(duì)列(線程同步隊(duì)列)

因?yàn)楂@取鎖是有條件的,沒有獲取鎖的線程就要阻塞等待,那么就要存儲(chǔ)這些等待的線程。

在AQS中我們使用CLH隊(duì)列儲(chǔ)存這些等待的線程,但它并不是直接儲(chǔ)存線程,而是儲(chǔ)存擁有線程的node節(jié)點(diǎn)。所以先介紹重要內(nèi)部類Node。
1.1 內(nèi)部類Node
   static final class Node {
        // 共享模式的標(biāo)記
        static final Node SHARED = new Node();
        // 獨(dú)占模式的標(biāo)記
        static final Node EXCLUSIVE = null;

        // waitStatus變量的值,標(biāo)志著線程被取消
        static final int CANCELLED =  1;
        // waitStatus變量的值,標(biāo)志著后繼線程(即隊(duì)列中此節(jié)點(diǎn)之后的節(jié)點(diǎn))需要被阻塞.(用于獨(dú)占鎖)
        static final int SIGNAL    = -1;
        // waitStatus變量的值,標(biāo)志著線程在Condition條件上等待阻塞.(用于Condition的await等待)
        static final int CONDITION = -2;
        // waitStatus變量的值,標(biāo)志著下一個(gè)acquireShared方法線程應(yīng)該被允許。(用于共享鎖)
        static final int PROPAGATE = -3;

        // 標(biāo)記著當(dāng)前節(jié)點(diǎn)的狀態(tài),默認(rèn)狀態(tài)是0, 小于0的狀態(tài)都是有特殊作用,大于0的狀態(tài)表示已取消
        volatile int waitStatus;

        // prev和next實(shí)現(xiàn)一個(gè)雙向鏈表
        volatile Node prev;
        volatile Node next;

        // 該節(jié)點(diǎn)擁有的線程
        volatile Thread thread;

        // 可能有兩種作用:1. 表示下一個(gè)在Condition條件上等待的節(jié)點(diǎn)
        // 2. 表示是共享模式或者獨(dú)占模式,注意第一種情況節(jié)點(diǎn)一定是共享模式
        Node nextWaiter;

        // 是不是共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        // 返回前一個(gè)節(jié)點(diǎn)prev,如果為null,則拋出NullPointerException異常
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        // 用于創(chuàng)建鏈表頭head,或者共享模式SHARED
        Node() {
        }

        // 使用在addWaiter方法中
        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }

        // 使用在Condition條件中
        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

重要的成員屬性:


waitStatus: 表示當(dāng)前節(jié)點(diǎn)的狀態(tài),默認(rèn)狀態(tài)是0。總共有五個(gè)值CANCELLED、SIGNAL、CONDITION、PROPAGATE以及0。

prev和next:記錄著當(dāng)前節(jié)點(diǎn)前一個(gè)節(jié)點(diǎn)和后一個(gè)節(jié)點(diǎn)的引用。

thread:當(dāng)前節(jié)點(diǎn)擁有的線程。當(dāng)擁有鎖的線程釋放鎖的時(shí)候,可能會(huì)調(diào)用LockSupport.unpark(thread),喚醒這個(gè)被阻塞的線程。

nextWaiter:如果是SHARED,表示當(dāng)前節(jié)點(diǎn)是共享模式,如果是null,當(dāng)前節(jié)點(diǎn)是獨(dú)占模式,如果是其他值,當(dāng)前節(jié)點(diǎn)也是獨(dú)占模式,不過這個(gè)值也是Condition隊(duì)列的下一個(gè)節(jié)點(diǎn)。

注意:通過Node我們可以實(shí)現(xiàn)兩個(gè)隊(duì)列,一是通過prev和next實(shí)現(xiàn)CLH隊(duì)列(線程同步隊(duì)列,雙向隊(duì)列),二是nextWaiter實(shí)現(xiàn)Condition條件上的等待線程隊(duì)列(單向隊(duì)列),后一個(gè)我們?cè)贑ondition中介紹。

1.2 操作CLH隊(duì)列 1.2.1 存儲(chǔ)CLH隊(duì)列
    // CLH隊(duì)列頭
    private transient volatile Node head;

    // CLH隊(duì)列尾
    private transient volatile Node tail;
1.2.2 設(shè)置CLH隊(duì)列頭head
    /**
     * 通過CAS函數(shù)設(shè)置head值,僅僅在enq方法中調(diào)用
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

這個(gè)方法只在enq方法中調(diào)用,通過CAS函數(shù)設(shè)置head值,保證多線程安全

   // 重新設(shè)置隊(duì)列頭head,它只在acquire系列的方法中調(diào)用
    private void setHead(Node node) {
        head = node;
        // 線程也沒有意義了,因?yàn)樵摼€程已經(jīng)獲取到鎖了
        node.thread = null;
        // 前一個(gè)節(jié)點(diǎn)已經(jīng)沒有意義了
        node.prev = null;
    }

這個(gè)方法只在acquire系列的方法中調(diào)用,重新設(shè)置head,表示移除一些等待線程節(jié)點(diǎn)。

1.2.3 設(shè)置CLH隊(duì)列尾tail
    /**
     * 通過CAS函數(shù)設(shè)置tail值,僅僅在enq方法中調(diào)用
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

這個(gè)方法只在enq方法中調(diào)用,通過CAS函數(shù)設(shè)置tail值,保證多線程安全

1.2.4 將一個(gè)節(jié)點(diǎn)插入到CLH隊(duì)列尾
   // 向隊(duì)列尾插入新節(jié)點(diǎn),如果隊(duì)列沒有初始化,就先初始化。返回原先的隊(duì)列尾節(jié)點(diǎn)
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // t為null,表示隊(duì)列為空,先初始化隊(duì)列
            if (t == null) {
                // 采用CAS函數(shù)即原子操作方式,設(shè)置隊(duì)列頭head值。
                // 如果成功,再將head值賦值給鏈表尾tail。如果失敗,表示head值已經(jīng)被其他線程,那么就進(jìn)入循環(huán)下一次
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 新添加的node節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)prev指向原來的隊(duì)列尾tail
                node.prev = t;
                // 采用CAS函數(shù)即原子操作方式,設(shè)置新隊(duì)列尾tail值。
                if (compareAndSetTail(t, node)) {
                    // 設(shè)置老的隊(duì)列尾tail的下一個(gè)節(jié)點(diǎn)next指向新添加的節(jié)點(diǎn)node
                    t.next = node;
                    return t;
                }
            }
        }
    }

這個(gè)方法向CLH隊(duì)列尾插入一個(gè)新節(jié)點(diǎn),如果隊(duì)列為空,就先創(chuàng)建隊(duì)列再插入新節(jié)點(diǎn),返回老的隊(duì)列尾節(jié)點(diǎn)。

1.2.5 將當(dāng)前線程添加到CLH隊(duì)列尾
   // 通過給定的模式mode(獨(dú)占或者共享)為當(dāng)前線程創(chuàng)建新節(jié)點(diǎn),并插入隊(duì)列中
    private Node addWaiter(Node mode) {
        // 為當(dāng)前線程創(chuàng)建新的節(jié)點(diǎn)
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        // 如果隊(duì)列已經(jīng)創(chuàng)建,就將新節(jié)點(diǎn)插入隊(duì)列尾。
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果隊(duì)列沒有創(chuàng)建,通過enq方法創(chuàng)建隊(duì)列,并插入新的節(jié)點(diǎn)。
        enq(node);
        return node;
    }

為當(dāng)前線程創(chuàng)建一個(gè)新節(jié)點(diǎn),再插入到CLH隊(duì)列尾,返回新創(chuàng)建的節(jié)點(diǎn)。

二. 獨(dú)占鎖

我們先想一想獨(dú)占鎖的功能是什么?

獨(dú)占鎖至少有兩個(gè)功能:

獲取鎖的功能。 當(dāng)多個(gè)線程一起獲取鎖的時(shí)候,只有一個(gè)線程能獲取到鎖,其他線程必須在當(dāng)前位置阻塞等待。

釋放鎖的功能。獲取鎖的線程釋放鎖資源,而且還必須能喚醒正在等待鎖資源的一個(gè)線程。

帶著這些疑惑,我們來看AQS中是怎么實(shí)現(xiàn)的。

2.1 獲取獨(dú)占鎖的方法 2.1.1 acquire方法
   /**
     * 獲取獨(dú)占鎖。如果沒有獲取到,線程就會(huì)阻塞等待,直到獲取鎖。不會(huì)響應(yīng)中斷異常
     * @param arg
     */
    public final void acquire(int arg) {
        // 1. 先調(diào)用tryAcquire方法,嘗試獲取獨(dú)占鎖,返回true,表示獲取到鎖,不需要執(zhí)行acquireQueued方法。
        // 2. 調(diào)用acquireQueued方法,先調(diào)用addWaiter方法為當(dāng)前線程創(chuàng)建一個(gè)節(jié)點(diǎn)node,并插入隊(duì)列中,
        // 然后調(diào)用acquireQueued方法去獲取鎖,如果不成功,就會(huì)讓當(dāng)前線程阻塞,當(dāng)鎖釋放時(shí)才會(huì)被喚醒。
        // acquireQueued方法返回值表示在線程等待過程中,是否有另一個(gè)線程調(diào)用該線程的interrupt方法,發(fā)起中斷。
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

將這個(gè)方法一下格式,大家就很好理解了

   public final void acquire(int arg) {
        // 1.先調(diào)用tryAcquire方法,嘗試獲取獨(dú)占鎖,返回true則直接返回
        if (tryAcquire(arg)) return;
        // 2. 調(diào)用addWaiter方法為當(dāng)前線程創(chuàng)建一個(gè)節(jié)點(diǎn)node,并插入隊(duì)列中
        Node node = addWaiter(Node.EXCLUSIVE);
        // 調(diào)用acquireQueued方法去獲取鎖,
        // acquireQueued方法返回值表示在線程等待過程中,是否有另一個(gè)線程調(diào)用該線程的interrupt方法,發(fā)起中斷。
        boolean interrupted = acquireQueued(node, arg);
        // 如果interrupted為true,則當(dāng)前線程要發(fā)起中斷請(qǐng)求
        if (interrupted) {
            selfInterrupt();
        }
    }
2.1.2 tryAcquire方法
    // 嘗試去獲取獨(dú)占鎖,立即返回。如果返回true表示獲取鎖成功。
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

如果子類想實(shí)現(xiàn)獨(dú)占鎖,則必須重寫這個(gè)方法,否則拋出異常。這個(gè)方法的作用是當(dāng)前線程嘗試獲取鎖,如果獲取到鎖,就會(huì)返回true,并更改鎖資源。沒有獲取到鎖返回false。

注:這個(gè)方法是立即返回的,不會(huì)阻塞當(dāng)前線程

下面是ReentrantLock中FairSync的tryAcquire方法實(shí)現(xiàn)

       // 嘗試獲取鎖,與非公平鎖最大的不同就是調(diào)用hasQueuedPredecessors()方法
        // hasQueuedPredecessors方法返回true,表示等待線程隊(duì)列中有一個(gè)線程在當(dāng)前線程之前,
        // 根據(jù)公平鎖的規(guī)則,當(dāng)前線程不能獲取鎖。
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 獲取鎖的記錄狀態(tài)
            int c = getState();
            // 如果c==0表示當(dāng)前鎖是空閑的
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 判斷當(dāng)前線程是不是獨(dú)占鎖的線程
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // 更改鎖的記錄狀態(tài)
                setState(nextc);
                return true;
            }
            return false;
        }
2.1.3 acquireQueued方法

addWaiter方法已經(jīng)在上面講解了。acquireQueued方法作用就是獲取鎖,如果沒有獲取到,就讓當(dāng)前線程阻塞等待。

     /**
     * 想要獲取鎖的 acquire系列方法,都會(huì)這個(gè)方法來獲取鎖
     * 循環(huán)通過tryAcquire方法不斷去獲取鎖,如果沒有獲取成功,
     * 就有可能調(diào)用parkAndCheckInterrupt方法,讓當(dāng)前線程阻塞
     * @param node 想要獲取鎖的節(jié)點(diǎn)
     * @param arg
     * @return 返回true,表示在線程等待的過程中,線程被中斷了
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            // 表示線程在等待過程中,是否被中斷了
            boolean interrupted = false;
            // 通過死循環(huán),直到node節(jié)點(diǎn)的線程獲取到鎖,才返回
            for (;;) {
                // 獲取node的前一個(gè)節(jié)點(diǎn)
                final Node p = node.predecessor();
                // 如果前一個(gè)節(jié)點(diǎn)是隊(duì)列頭head,并且嘗試獲取鎖成功
                // 那么當(dāng)前線程就不需要阻塞等待,繼續(xù)執(zhí)行
                if (p == head && tryAcquire(arg)) {
                    // 將節(jié)點(diǎn)node設(shè)置為新的隊(duì)列頭
                    setHead(node);
                    // help GC
                    p.next = null;
                    // 不需要調(diào)用cancelAcquire方法
                    failed = false;
                    return interrupted;
                }
                // 當(dāng)p節(jié)點(diǎn)的狀態(tài)是Node.SIGNAL時(shí),就會(huì)調(diào)用parkAndCheckInterrupt方法,阻塞node線程
                // node線程被阻塞,有兩種方式喚醒,
                // 1.是在unparkSuccessor(Node node)方法,會(huì)喚醒被阻塞的node線程,返回false
                // 2.node線程被調(diào)用了interrupt方法,線程被喚醒,返回true
                // 在這里只是簡(jiǎn)單地將interrupted = true,沒有跳出for的死循環(huán),繼續(xù)嘗試獲取鎖
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // failed為true,表示發(fā)生異常,非正常退出
            // 則將node節(jié)點(diǎn)的狀態(tài)設(shè)置成CANCELLED,表示node節(jié)點(diǎn)所在線程已取消,不需要喚醒了。
            if (failed)
                cancelAcquire(node);
        }
    }

主要流程:


通過for (;;)死循環(huán),直到node節(jié)點(diǎn)的線程獲取到鎖,才跳出循環(huán)。

獲取node節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)p。

當(dāng)前一個(gè)節(jié)點(diǎn)p時(shí)CLH隊(duì)列頭節(jié)點(diǎn)時(shí),調(diào)用tryAcquire方法嘗試去獲取鎖,如果獲取成功,就將節(jié)點(diǎn)node設(shè)置成CLH隊(duì)列頭節(jié)點(diǎn)(相當(dāng)于移除節(jié)點(diǎn)node和之前的節(jié)點(diǎn))然后return返回。
注意:只有當(dāng)node節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)是隊(duì)列頭節(jié)點(diǎn)時(shí),才會(huì)嘗試獲取鎖,所以獲取鎖是有順序的,按照添加到CLH隊(duì)列時(shí)的順序。

調(diào)用shouldParkAfterFailedAcquire方法,來決定是否要阻塞當(dāng)前線程。

調(diào)用parkAndCheckInterrupt方法,阻塞當(dāng)前線程。

如果當(dāng)前線程發(fā)生異常,非正常退出,那么會(huì)在finally模塊中調(diào)用cancelAcquire(node)方法,取消當(dāng)前節(jié)點(diǎn)狀態(tài)。

注意:這里當(dāng)嘗試獲取鎖失敗時(shí),并沒有立即阻塞當(dāng)前線程,但是因?yàn)樵趂or (;;)死循環(huán)里,會(huì)繼續(xù)循環(huán),方法不會(huì)返回。

2.1.4 shouldParkAfterFailedAcquire方法

這個(gè)方法的返回值決定是否要阻塞當(dāng)前線程

    /**
     * 根據(jù)前一個(gè)節(jié)點(diǎn)pred的狀態(tài),來判斷當(dāng)前線程是否應(yīng)該被阻塞
     * @param pred : node節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
     * @param node
     * @return 返回true 表示當(dāng)前線程應(yīng)該被阻塞,之后應(yīng)該會(huì)調(diào)用parkAndCheckInterrupt方法來阻塞當(dāng)前線程
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 如果前一個(gè)pred的狀態(tài)是Node.SIGNAL,那么直接返回true,當(dāng)前線程應(yīng)該被阻塞
            return true;
        if (ws > 0) {
            // 如果前一個(gè)節(jié)點(diǎn)狀態(tài)是Node.CANCELLED(大于0就是CANCELLED),
            // 表示前一個(gè)節(jié)點(diǎn)所在線程已經(jīng)被喚醒了,要從CLH隊(duì)列中移除CANCELLED的節(jié)點(diǎn)。
            // 所以從pred節(jié)點(diǎn)一直向前查找直到找到不是CANCELLED狀態(tài)的節(jié)點(diǎn),并把它賦值給node.prev,
            // 表示node節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)已經(jīng)改變。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 此時(shí)前一個(gè)節(jié)點(diǎn)pred的狀態(tài)只能是0或者PROPAGATE,不可能是CONDITION狀態(tài)
            // CONDITION(這個(gè)是特殊狀態(tài),只在condition列表中節(jié)點(diǎn)中存在,CLH隊(duì)列中不存在這個(gè)狀態(tài)的節(jié)點(diǎn))
            // 將前一個(gè)節(jié)點(diǎn)pred的狀態(tài)設(shè)置成Node.SIGNAL,這樣在下一次循環(huán)時(shí),就是直接阻塞當(dāng)前線程
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

我們發(fā)現(xiàn)是根據(jù)前一個(gè)節(jié)點(diǎn)的狀態(tài),來決定是否阻塞當(dāng)前線程。而前一個(gè)節(jié)點(diǎn)狀態(tài)是在哪里改變的呢?驚奇地發(fā)現(xiàn)也是在這個(gè)方法中改變的。


如果前一個(gè)節(jié)點(diǎn)狀態(tài)是Node.SIGNAL,那么直接返回true,阻塞當(dāng)前線程

如果前一個(gè)節(jié)點(diǎn)狀態(tài)是Node.CANCELLED(大于0就是CANCELLED),表示前一個(gè)節(jié)點(diǎn)所在線程已經(jīng)被喚醒了,要從CLH隊(duì)列中移除CANCELLED的節(jié)點(diǎn)。所以從pred節(jié)點(diǎn)一直向前查找直到找到不是CANCELLED狀態(tài)的節(jié)點(diǎn)。

并把它賦值給node.prev,表示node節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)已經(jīng)改變。在acquireQueued方法中進(jìn)行下一次循環(huán)。

不是前面兩種狀態(tài),那么就將前一個(gè)節(jié)點(diǎn)狀態(tài)設(shè)置成Node.SIGNAL,表示需要阻塞當(dāng)前線程,這樣再下一次循環(huán)時(shí),就會(huì)直接阻塞當(dāng)前線程。

2.1.5 parkAndCheckInterrupt 方法

阻塞當(dāng)前線程,線程被喚醒后返回當(dāng)前線程中斷狀態(tài)

    /**
     * 阻塞當(dāng)前線程,線程被喚醒后返回當(dāng)前線程中斷狀態(tài)
     */
    private final boolean parkAndCheckInterrupt() {
        // 通過LockSupport.park方法,阻塞當(dāng)前線程
        LockSupport.park(this);
        // 當(dāng)前線程被喚醒后,返回當(dāng)前線程中斷狀態(tài)
        return Thread.interrupted();
    }

通過LockSupport.park(this)阻塞當(dāng)前線程。

2.1.6 cancelAcquire方法

將node節(jié)點(diǎn)的狀態(tài)設(shè)置成CANCELLED,表示node節(jié)點(diǎn)所在線程已取消,不需要喚醒了。

    // 將node節(jié)點(diǎn)的狀態(tài)設(shè)置成CANCELLED,表示node節(jié)點(diǎn)所在線程已取消,不需要喚醒了。
    private void cancelAcquire(Node node) {
        // 如果node為null,就直接返回
        if (node == null)
            return;

        //
        node.thread = null;

        // 跳過那些已取消的節(jié)點(diǎn),在隊(duì)列中找到在node節(jié)點(diǎn)前面的第一次狀態(tài)不是已取消的節(jié)點(diǎn)
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 記錄pred原來的下一個(gè)節(jié)點(diǎn),用于CAS函數(shù)更新時(shí)使用
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        // 將node節(jié)點(diǎn)狀態(tài)設(shè)置為已取消Node.CANCELLED;
        node.waitStatus = Node.CANCELLED;

        // 如果node節(jié)點(diǎn)是隊(duì)列尾節(jié)點(diǎn),那么就將pred節(jié)點(diǎn)設(shè)置為新的隊(duì)列尾節(jié)點(diǎn)
        if (node == tail && compareAndSetTail(node, pred)) {
            // 并且設(shè)置pred節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)next為null
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred"s next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
2.1.7 小結(jié)

tryAcquire方法嘗試獲取獨(dú)占鎖,由子類實(shí)現(xiàn)

acquireQueued方法,方法內(nèi)采用for死循環(huán),先調(diào)用tryAcquire方法,嘗試獲取鎖,如果成功,則跳出循環(huán)方法返回。 如果失敗,就可能阻塞當(dāng)前線程。當(dāng)別的線程鎖釋放的時(shí)候,可能會(huì)喚醒這個(gè)線程,然后再次進(jìn)行循環(huán)判斷,調(diào)用tryAcquire方法,嘗試獲取鎖。

如果發(fā)生異常,該線程被喚醒,所以要取消節(jié)點(diǎn)node的狀態(tài),因?yàn)楣?jié)點(diǎn)node所在線程不是在阻塞狀態(tài)了。

注:還有其他獲取獨(dú)占鎖的方法,例如doAcquireInterruptibly、doAcquireNanos都在文章最后的源碼解析中,這里就不做解析了,具體原理都差不多。

2.2 釋放獨(dú)占鎖的方法 2.2.1 release方法
    // 在獨(dú)占鎖模式下,釋放鎖的操作
    public final boolean release(int arg) {
        // 調(diào)用tryRelease方法,嘗試去釋放鎖,由子類具體實(shí)現(xiàn)
        if (tryRelease(arg)) {
            Node h = head;
            // 如果隊(duì)列頭節(jié)點(diǎn)的狀態(tài)不是0,那么隊(duì)列中就可能存在需要喚醒的等待節(jié)點(diǎn)。
            // 還記得我們?cè)赼cquireQueued(final Node node, int arg)獲取鎖的方法中,如果節(jié)點(diǎn)node沒有獲取到鎖,
            // 那么我們會(huì)將節(jié)點(diǎn)node的前一個(gè)節(jié)點(diǎn)狀態(tài)設(shè)置為Node.SIGNAL,然后調(diào)用parkAndCheckInterrupt方法
            // 將節(jié)點(diǎn)node所在線程阻塞。
            // 在這里就是通過unparkSuccessor方法,進(jìn)而調(diào)用LockSupport.unpark(s.thread)方法,喚醒被阻塞的線程
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

調(diào)用tryRelease方法釋放鎖資源,返回true表示鎖資源完全釋放了,返回false表示還持有鎖資源。

如果鎖資源完全被釋放了,就要喚醒等待鎖資源的線程。調(diào)用unparkSuccessor方法喚醒一個(gè)等待線程

注:CLH隊(duì)列頭節(jié)點(diǎn)h為null,表示隊(duì)列為空,沒有節(jié)點(diǎn)。節(jié)點(diǎn)h的狀態(tài)是0,表示沒有CLH隊(duì)列中沒有被阻塞的線程。

2.2.2 tryRelease方法
     // 嘗試去釋放當(dāng)前線程持有的獨(dú)占鎖,立即返回。如果返回true表示釋放鎖成功
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

如果子類想實(shí)現(xiàn)獨(dú)占鎖,則必須重寫這個(gè)方法,否則拋出異常。作用是釋放當(dāng)前線程持有的鎖,返回true表示已經(jīng)完全釋放鎖資源,返回false,表示還持有鎖資源。

注:對(duì)于獨(dú)占鎖來說,同一時(shí)間只能有一個(gè)線程持有這個(gè)鎖,但是這個(gè)線程可以重復(fù)地獲取鎖,因?yàn)楸绘i住的模塊,再次進(jìn)入另一個(gè)被這個(gè)鎖鎖住的模塊,是允許的。這個(gè)就做可重入性,所以對(duì)于可重入的鎖釋放操作,也需要多次。

下面是ReentrantLock中Sync的tryRelease方法實(shí)現(xiàn)

   protected final boolean tryRelease(int releases) {
            // c表示新的鎖的記錄狀態(tài)
            int c = getState() - releases;
            // 如果當(dāng)前線程不是獨(dú)占鎖的線程,就拋出IllegalMonitorStateException異常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            // 標(biāo)志是否可以釋放鎖
            boolean free = false;
            // 當(dāng)新的鎖的記錄狀態(tài)為0時(shí),表示可以釋放鎖
            if (c == 0) {
                free = true;
                // 設(shè)置獨(dú)占鎖的線程為null
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
2.2.3 unparkSuccessor方法
   // 喚醒node節(jié)點(diǎn)的下一個(gè)非取消狀態(tài)的節(jié)點(diǎn)所在線程(即waitStatus<=0)
    private void unparkSuccessor(Node node) {
        // 獲取node節(jié)點(diǎn)的狀態(tài)
        int ws = node.waitStatus;
        // 如果小于0,就將狀態(tài)重新設(shè)置為0,表示這個(gè)node節(jié)點(diǎn)已經(jīng)完成了
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 下一個(gè)節(jié)點(diǎn)
        Node s = node.next;
        // 如果下一個(gè)節(jié)點(diǎn)為null,或者狀態(tài)是已取消,那么就要尋找下一個(gè)非取消狀態(tài)的節(jié)點(diǎn)
        if (s == null || s.waitStatus > 0) {
            // 先將s設(shè)置為null,s不是非取消狀態(tài)的節(jié)點(diǎn)
            s = null;
            // 從隊(duì)列尾向前遍歷,直到遍歷到node節(jié)點(diǎn)
            for (Node t = tail; t != null && t != node; t = t.prev)
                // 因?yàn)槭菑暮笙蚯氨闅v,所以不斷覆蓋找到的值,這樣才能得到node節(jié)點(diǎn)后下一個(gè)非取消狀態(tài)的節(jié)點(diǎn)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 如果s不為null,表示存在非取消狀態(tài)的節(jié)點(diǎn)。那么調(diào)用LockSupport.unpark方法,喚醒這個(gè)節(jié)點(diǎn)的線程
        if (s != null)
            LockSupport.unpark(s.thread);
    }

這個(gè)方法的作用是喚醒node節(jié)點(diǎn)的下一個(gè)非取消狀態(tài)的節(jié)點(diǎn)所在線程。


將node節(jié)點(diǎn)的狀態(tài)設(shè)置為0

尋找到下一個(gè)非取消狀態(tài)的節(jié)點(diǎn)s

如果節(jié)點(diǎn)s不為null,則調(diào)用LockSupport.unpark(s.thread)方法喚醒s所在線程。

注:?jiǎn)拘丫€程也是有順序的,就是添加到CLH隊(duì)列線程的順序。

2.2.4 小結(jié)

調(diào)用tryRelease方法去釋放當(dāng)前持有的鎖資源。

如果完全釋放了鎖資源,那么就調(diào)用unparkSuccessor方法,去喚醒一個(gè)等待鎖的線程。

三. 共享鎖

共享鎖與獨(dú)占鎖相比,共享鎖可能被多個(gè)線程共同持有

3.1 獲取共享鎖的方法 3.1.1 acquireShared方法
    // 獲取共享鎖
    public final void acquireShared(int arg) {
        // 嘗試去獲取共享鎖,如果返回值小于0表示獲取共享鎖失敗
        if (tryAcquireShared(arg) < 0)
            // 調(diào)用doAcquireShared方法去獲取共享鎖
            doAcquireShared(arg);
    }

調(diào)用tryAcquireShared方法嘗試獲取共享鎖,如果返回值小于0表示獲取共享鎖失敗.則繼續(xù)調(diào)用doAcquireShared方法獲取共享鎖。

3.1.2 tryAcquireShared方法
     // 嘗試去獲取共享鎖,立即返回。返回值大于等于0,表示獲取共享鎖成功
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

如果子類想實(shí)現(xiàn)共享鎖,則必須重寫這個(gè)方法,否則拋出異常。作用是嘗試獲取共享鎖,返回值大于等于0,表示獲取共享鎖成功。

下面是ReentrantReadWriteLock中Sync的tryReleaseShared方法實(shí)現(xiàn),這個(gè)我們會(huì)在ReentrantReadWriteLock章節(jié)中重點(diǎn)介紹的。

      protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            // 當(dāng)前線程是第一個(gè)獲取讀鎖(共享鎖)的線程
            if (firstReader == current) {
                // 將firstReaderHoldCount減一,如果就是1,那么表示該線程需要釋放讀鎖(共享鎖),
                // 將firstReader設(shè)置為null
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                // 獲取當(dāng)前線程的HoldCounter變量
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                // 將rh變量的count減一,
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    // count <= 0表示當(dāng)前線程就沒有獲取到讀鎖(共享鎖),這里釋放就拋出異常。
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                // 因?yàn)樽x鎖是利用高16位儲(chǔ)存的,低16位的數(shù)據(jù)是要屏蔽的,
                // 所以這里減去SHARED_UNIT(65536),相當(dāng)于減一
                // 表示一個(gè)讀鎖已經(jīng)釋放
                int nextc = c - SHARED_UNIT;
                // 利用CAS函數(shù)重新設(shè)置state值
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
3.1.3 doAcquireShared方法
    /**
     * 獲取共享鎖,獲取失敗,則會(huì)阻塞當(dāng)前線程,直到獲取共享鎖返回
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        // 為當(dāng)前線程創(chuàng)建共享鎖節(jié)點(diǎn)node
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 如果節(jié)點(diǎn)node前一個(gè)節(jié)點(diǎn)是同步隊(duì)列頭節(jié)點(diǎn)。就會(huì)調(diào)用tryAcquireShared方法嘗試獲取共享鎖
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    // 如果返回值大于0,表示獲取共享鎖成功
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 如果節(jié)點(diǎn)p的狀態(tài)是Node.SIGNAL,就是調(diào)用parkAndCheckInterrupt方法阻塞當(dāng)前線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
             // failed為true,表示發(fā)生異常,
            // 則將node節(jié)點(diǎn)的狀態(tài)設(shè)置成CANCELLED,表示node節(jié)點(diǎn)所在線程已取消,不需要喚醒了
            if (failed)
                cancelAcquire(node);
        }
    }

這個(gè)方法與獨(dú)占鎖的acquireQueued方法相比較,不同的有三點(diǎn):

doAcquireShared方法,調(diào)用addWaiter(Node.SHARED)方法,為當(dāng)前線程創(chuàng)建一個(gè)共享模式的節(jié)點(diǎn)node。而acquireQueued方法是由外部傳遞來的。

doAcquireShared方法沒有返回值,acquireQueued方法會(huì)返回布爾類型的值,是當(dāng)前線程中斷標(biāo)志位值

最大的區(qū)別是重新設(shè)置CLH隊(duì)列頭的方法不一樣。doAcquireShared方法調(diào)用setHeadAndPropagate方法,而acquireQueued方法調(diào)用setHead方法。

3.1.4 setHeadAndPropagate方法
     // 重新設(shè)置CLH隊(duì)列頭,如果CLH隊(duì)列頭的下一個(gè)節(jié)點(diǎn)為null或者共享模式,
    // 那么就要喚醒共享鎖上等待的線程
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        // 設(shè)置新的同步隊(duì)列頭head
        setHead(node);
        // 如果propagate大于0,
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 獲取新的CLH隊(duì)列頭的下一個(gè)節(jié)點(diǎn)s
            Node s = node.next;
            // 如果節(jié)點(diǎn)s是空或者共享模式節(jié)點(diǎn),那么就要喚醒共享鎖上等待的線程
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
3.2 釋放共享鎖的方法 3.2.1 releaseShared方法
     // 釋放共享鎖
    public final boolean releaseShared(int arg) {
        // 嘗試釋放共享鎖
        if (tryReleaseShared(arg)) {
            // 喚醒等待共享鎖的線程
            doReleaseShared();
            return true;
        }
        return false;
    }
3.2.2 tryReleaseShared方法
     // 嘗試去釋放共享鎖
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

如果子類想實(shí)現(xiàn)共享鎖,則必須重寫這個(gè)方法,否則拋出異常。作用是釋放當(dāng)前線程持有的鎖,返回true表示已經(jīng)完全釋放鎖資源,返回false,表示還持有鎖資源。

3.2.3 doReleaseShared方法
     // 會(huì)喚醒等待共享鎖的線程
    private void doReleaseShared() {
        for (;;) {
            // 將同步隊(duì)列頭賦值給節(jié)點(diǎn)h
            Node h = head;
            // 如果節(jié)點(diǎn)h不為null,且不等于同步隊(duì)列尾
            if (h != null && h != tail) {
                // 得到節(jié)點(diǎn)h的狀態(tài)
                int ws = h.waitStatus;
                // 如果狀態(tài)是Node.SIGNAL,就要喚醒節(jié)點(diǎn)h后繼節(jié)點(diǎn)的線程
                if (ws == Node.SIGNAL) {
                    // 將節(jié)點(diǎn)h的狀態(tài)設(shè)置成0,如果設(shè)置失敗,就繼續(xù)循環(huán),再試一次。
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 喚醒節(jié)點(diǎn)h后繼節(jié)點(diǎn)的線程
                    unparkSuccessor(h);
                }
                // 如果節(jié)點(diǎn)h的狀態(tài)是0,就設(shè)置ws的狀態(tài)是PROPAGATE。
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果同步隊(duì)列頭head節(jié)點(diǎn)發(fā)生改變,繼續(xù)循環(huán),
            // 如果沒有改變,就跳出循環(huán)
            if (h == head)
                break;
        }
    }
四. Condition條件

Condition是為了實(shí)現(xiàn)線程之間相互等待的問題。注意Condition對(duì)象只能在獨(dú)占鎖中才能使用。

考慮一下情況,有兩個(gè)線程,生產(chǎn)者線程,消費(fèi)者線程。當(dāng)消費(fèi)者線程消費(fèi)東西時(shí),發(fā)現(xiàn)沒有東西,這時(shí)它就要等待,讓生產(chǎn)者線程生產(chǎn)東西后,在通知它消費(fèi)。
因?yàn)椴僮鞯氖峭粋€(gè)資源,所以要加鎖,防止多線程沖突。而鎖在同一時(shí)間只能有一個(gè)線程持有,所以消費(fèi)者在讓線程等待前,必須釋放鎖,且喚醒另一個(gè)等待鎖的線程。
那么在AQS中Condition條件又是如何實(shí)現(xiàn)的呢?

首先內(nèi)部存在一個(gè)Condition隊(duì)列,存儲(chǔ)著所有在此Condition條件等待的線程。

await系列方法:讓當(dāng)前持有鎖的線程釋放鎖,并喚醒一個(gè)在CLH隊(duì)列上等待鎖的線程,再為當(dāng)前線程創(chuàng)建一個(gè)node節(jié)點(diǎn),插入到Condition隊(duì)列(注意不是插入到CLH隊(duì)列中)

signal系列方法:其實(shí)這里沒有喚醒任何線程,而是將Condition隊(duì)列上的等待節(jié)點(diǎn)插入到CLH隊(duì)列中,所以當(dāng)持有鎖的線程執(zhí)行完畢釋放鎖時(shí),就會(huì)喚醒CLH隊(duì)列中的一個(gè)線程,這個(gè)時(shí)候才會(huì)喚醒線程。

4.1 await系列方法 4.1.1 await方法
     /**
         * 讓當(dāng)前持有鎖的線程阻塞等待,并釋放鎖。如果有中斷請(qǐng)求,則拋出InterruptedException異常
         * @throws InterruptedException
         */
        public final void await() throws InterruptedException {
            // 如果當(dāng)前線程中斷標(biāo)志位是true,就拋出InterruptedException異常
            if (Thread.interrupted())
                throw new InterruptedException();
            // 為當(dāng)前線程創(chuàng)建新的Node節(jié)點(diǎn),并且將這個(gè)節(jié)點(diǎn)插入到Condition隊(duì)列中了
            Node node = addConditionWaiter();
            // 釋放當(dāng)前線程占有的鎖,并喚醒CLH隊(duì)列一個(gè)等待線程
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 如果節(jié)點(diǎn)node不在同步隊(duì)列中(注意不是Condition隊(duì)列)
            while (!isOnSyncQueue(node)) {
                // 阻塞當(dāng)前線程,那么怎么喚醒這個(gè)線程呢?
                // 首先我們必須調(diào)用signal或者signalAll將這個(gè)節(jié)點(diǎn)node加入到同步隊(duì)列。
                // 只有這樣unparkSuccessor(Node node)方法,才有可能喚醒被阻塞的線程
                LockSupport.park(this);
                // 如果當(dāng)前線程產(chǎn)生中斷請(qǐng)求,就跳出循環(huán)
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 如果節(jié)點(diǎn)node已經(jīng)在同步隊(duì)列中了,獲取同步鎖,只有得到鎖才能繼續(xù)執(zhí)行,否則線程繼續(xù)阻塞等待
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 清除Condition隊(duì)列中狀態(tài)不是Node.CONDITION的節(jié)點(diǎn)
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            // 是否要拋出異常,或者發(fā)出中斷請(qǐng)求
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

方法流程:

addConditionWaiter方法:為當(dāng)前線程創(chuàng)建新的Node節(jié)點(diǎn),并且將這個(gè)節(jié)點(diǎn)插入到Condition隊(duì)列中了

fullyRelease方法:釋放當(dāng)前線程占有的鎖,并喚醒CLH隊(duì)列一個(gè)等待線程

isOnSyncQueue 方法:如果返回false,表示節(jié)點(diǎn)node不在CLH隊(duì)列中,即沒有調(diào)用過 signal系列方法,所以調(diào)用LockSupport.park(this)方法阻塞當(dāng)前線程。

如果跳出while循環(huán),表示節(jié)點(diǎn)node已經(jīng)在CLH隊(duì)列中,那么調(diào)用acquireQueued方法去獲取鎖。

清除Condition隊(duì)列中狀態(tài)不是Node.CONDITION的節(jié)點(diǎn)

4.1.2 addConditionWaiter方法

為當(dāng)前線程創(chuàng)建新的Node節(jié)點(diǎn),并且將這個(gè)節(jié)點(diǎn)插入到Condition隊(duì)列中了

    private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 如果Condition隊(duì)列尾節(jié)點(diǎn)的狀態(tài)不是Node.CONDITION
            if (t != null && t.waitStatus != Node.CONDITION) {
                // 清除Condition隊(duì)列中,狀態(tài)不是Node.CONDITION的節(jié)點(diǎn),
                // 并且可能會(huì)重新設(shè)置firstWaiter和lastWaiter
                unlinkCancelledWaiters();
                // 重新將Condition隊(duì)列尾賦值給t
                t = lastWaiter;
            }
            // 為當(dāng)前線程創(chuàng)建一個(gè)狀態(tài)為Node.CONDITION的節(jié)點(diǎn)
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 如果t為null,表示Condition隊(duì)列為空,將node節(jié)點(diǎn)賦值給鏈表頭
            if (t == null)
                firstWaiter = node;
            else
                // 將新節(jié)點(diǎn)node插入到Condition隊(duì)列尾
                t.nextWaiter = node;
            // 將新節(jié)點(diǎn)node設(shè)置為新的Condition隊(duì)列尾
            lastWaiter = node;
            return node;
        }
4.1.3 fullyRelease方法

釋放當(dāng)前線程占有的鎖,并喚醒CLH隊(duì)列一個(gè)等待線程

    /**
     * 釋放當(dāng)前線程占有的鎖,并喚醒CLH隊(duì)列一個(gè)等待線程
     * 如果失敗就拋出異常,設(shè)置node節(jié)點(diǎn)的狀態(tài)是Node.CANCELLED
     * @return
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 釋放當(dāng)前線程占有的鎖
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
4.1.4 isOnSyncQueue

節(jié)點(diǎn)node是不是在CLH隊(duì)列中

    // 節(jié)點(diǎn)node是不是在CLH隊(duì)列中
    final boolean isOnSyncQueue(Node node) {
        // 如果node的狀態(tài)是Node.CONDITION,或者node沒有前一個(gè)節(jié)點(diǎn)prev,
        // 那么返回false,節(jié)點(diǎn)node不在同步隊(duì)列中
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 如果node有下一個(gè)節(jié)點(diǎn)next,那么它一定在同步隊(duì)列中
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // 從同步隊(duì)列中查找節(jié)點(diǎn)node
        return findNodeFromTail(node);
    }

    // 在同步隊(duì)列中從后向前查找節(jié)點(diǎn)node,如果找到返回true,否則返回false
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
4.1.5 acquireQueued方法

獲取獨(dú)占鎖,這個(gè)在獨(dú)占鎖章節(jié)已經(jīng)說過

4.1.6 unlinkCancelledWaiters 方法

清除Condition隊(duì)列中狀態(tài)不是Node.CONDITION的節(jié)點(diǎn)

    private void unlinkCancelledWaiters() {
            // condition隊(duì)列頭賦值給t
            Node t = firstWaiter;
            // 這個(gè)trail節(jié)點(diǎn),只是起輔助作用
            Node trail = null;
            while (t != null) {
                //得到下一個(gè)節(jié)點(diǎn)next。當(dāng)節(jié)點(diǎn)是condition時(shí)候,nextWaiter表示condition隊(duì)列的下一個(gè)節(jié)點(diǎn)
                Node next = t.nextWaiter;
                // 如果節(jié)點(diǎn)t的狀態(tài)不是CONDITION,那么該節(jié)點(diǎn)就要從condition隊(duì)列中移除
                if (t.waitStatus != Node.CONDITION) {
                    // 將節(jié)點(diǎn)t的nextWaiter設(shè)置為null
                    t.nextWaiter = null;
                    // 如果trail為null,表示原先的condition隊(duì)列頭節(jié)點(diǎn)實(shí)效,需要設(shè)置新的condition隊(duì)列頭
                    if (trail == null)
                        firstWaiter = next;
                    else
                        // 將節(jié)點(diǎn)t從condition隊(duì)列中移除,因?yàn)楦淖兞艘玫闹赶?,從condition隊(duì)列中已經(jīng)找不到節(jié)點(diǎn)t了
                        trail.nextWaiter = next;
                    // 如果next為null,表示原先的condition隊(duì)列尾節(jié)點(diǎn)也實(shí)效,重新設(shè)置隊(duì)列尾節(jié)點(diǎn)
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    // 遍歷到的有效節(jié)點(diǎn)
                    trail = t;
                // 將next賦值給t,遍歷完整個(gè)condition隊(duì)列
                t = next;
            }
        }
4.1.7 reportInterruptAfterWait方法
    /**
         * 如果interruptMode是THROW_IE,就拋出InterruptedException異常
         * 如果interruptMode是REINTERRUPT,則當(dāng)前線程再發(fā)出中斷請(qǐng)求
         * 否則就什么都不做
         */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }
4.2 signal系列方法 4.2.1 signal方法
       // 如果condition隊(duì)列不為空,將condition隊(duì)列頭節(jié)點(diǎn)插入到同步隊(duì)列中
        public final void signal() {
            // 如果當(dāng)前線程不是獨(dú)占鎖線程,就拋出IllegalMonitorStateException異常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();

            // 將Condition隊(duì)列頭賦值給節(jié)點(diǎn)first
            Node first = firstWaiter;
            if (first != null)
                //  將Condition隊(duì)列中的first節(jié)點(diǎn)插入到CLH隊(duì)列中
                doSignal(first);
        }

如果condition隊(duì)列不為空,就調(diào)用doSignal方法將condition隊(duì)列頭節(jié)點(diǎn)插入到CLH隊(duì)列中。

4.2.2 doSignal方法
        // 將Condition隊(duì)列中的first節(jié)點(diǎn)插入到CLH隊(duì)列中
        private void doSignal(Node first) {
            do {
                // 原先的Condition隊(duì)列頭節(jié)點(diǎn)取消,所以重新賦值Condition隊(duì)列頭節(jié)點(diǎn)
                // 如果新的Condition隊(duì)列頭節(jié)點(diǎn)為null,表示Condition隊(duì)列為空了
                // ,所以也要設(shè)置Condition隊(duì)列尾lastWaiter為null
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                // 取消first節(jié)點(diǎn)nextWaiter引用
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

為什么使用while循環(huán),因?yàn)橹挥惺荖ode.CONDITION狀態(tài)的節(jié)點(diǎn)才能插入CLH隊(duì)列,如果不是這個(gè)狀態(tài),那么循環(huán)Condition隊(duì)列下一個(gè)節(jié)點(diǎn)。

4.2.3 transferForSignal方法
      // 返回true表示節(jié)點(diǎn)node插入到同步隊(duì)列中,返回false表示節(jié)點(diǎn)node沒有插入到同步隊(duì)列中
    final boolean transferForSignal(Node node) {
        // 如果節(jié)點(diǎn)node的狀態(tài)不是Node.CONDITION,或者更新狀態(tài)失敗,
        // 說明該node節(jié)點(diǎn)已經(jīng)插入到同步隊(duì)列中,所以直接返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        // 將節(jié)點(diǎn)node插入到同步隊(duì)列中,p是原先同步隊(duì)列尾節(jié)點(diǎn),也是node節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果前一個(gè)節(jié)點(diǎn)是已取消狀態(tài),或者不能將它設(shè)置成Node.SIGNAL狀態(tài)。
        // 就說明節(jié)點(diǎn)p之后也不會(huì)發(fā)起喚醒下一個(gè)node節(jié)點(diǎn)線程的操作,
        // 所以這里直接調(diào)用 LockSupport.unpark(node.thread)方法,喚醒節(jié)點(diǎn)node所在線程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

狀態(tài)不是 Node.CONDITION的節(jié)點(diǎn),是不能從Condition隊(duì)列中插入到CLH隊(duì)列中。直接返回false

調(diào)用enq方法,將節(jié)點(diǎn)node插入到同步隊(duì)列中,p是原先同步隊(duì)列尾節(jié)點(diǎn),也是node節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)

如果前一個(gè)節(jié)點(diǎn)是已取消狀態(tài),或者不能將它設(shè)置成Node.SIGNAL狀態(tài)。那么就要LockSupport.unpark(node.thread)方法喚醒node節(jié)點(diǎn)所在線程。

4.2.4 signalAll 方法
     // 將condition隊(duì)列中所有的節(jié)點(diǎn)都插入到同步隊(duì)列中
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
4.2.5 doSignalAll方法
    /**
         * 將condition隊(duì)列中所有的節(jié)點(diǎn)都插入到同步隊(duì)列中
         * @param first condition隊(duì)列頭節(jié)點(diǎn)
         */
        private void doSignalAll(Node first) {
            // 表示將condition隊(duì)列設(shè)置為空
            lastWaiter = firstWaiter = null;
            do {
                // 得到condition隊(duì)列的下一個(gè)節(jié)點(diǎn)
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                // 將節(jié)點(diǎn)first插入到同步隊(duì)列中
                transferForSignal(first);
                first = next;
                // 循環(huán)遍歷condition隊(duì)列中所有的節(jié)點(diǎn)
            } while (first != null);
        }

循環(huán)遍歷整個(gè)condition隊(duì)列,調(diào)用transferForSignal方法,將節(jié)點(diǎn)插入到CLH隊(duì)列中。

4.3 小結(jié)

Condition只能使用在獨(dú)占鎖中。它內(nèi)部有一個(gè)Condition隊(duì)列記錄所有在Condition條件等待的線程(即就是調(diào)用await系列方法后等待的線程).
await系列方法:會(huì)讓當(dāng)前線程釋放持有的鎖,并喚醒在CLH隊(duì)列上的一個(gè)等待鎖的線程,再將當(dāng)前線程插入到Condition隊(duì)列中(注意不是CLH隊(duì)列)
signal系列方法:并不是喚醒線程,而是將Condition隊(duì)列中的節(jié)點(diǎn)插入到CLH隊(duì)列中。

總結(jié)

使用AQS類來實(shí)現(xiàn)獨(dú)占鎖和共享鎖:

內(nèi)部有一個(gè)CLH隊(duì)列,用來記錄所有等待鎖的線程

通過 acquire系列方法用來獲取獨(dú)占鎖,獲取失敗,則阻塞當(dāng)前線程

通過release方法用來釋放獨(dú)占鎖,釋放成功,則會(huì)喚醒一個(gè)等待獨(dú)占鎖的線程。

通過acquireShared系列方法用來獲取共享鎖。

通過releaseShared方法用來釋放共享鎖。

通過Condition來實(shí)現(xiàn)線程之間相互等待的。

示例
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// 實(shí)現(xiàn)一個(gè)可重入的獨(dú)占鎖, 必須復(fù)寫tryAcquire和tryRelease方法
class Sync extends AbstractQueuedSynchronizer {
    // 嘗試獲取獨(dú)占鎖, 利用鎖的獲取次數(shù)state屬性
    @Override
    protected boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 獲取鎖的記錄狀態(tài)state
        int c = getState();
        // 如果c==0表示當(dāng)前鎖是空閑的
        if (c == 0) {
            // 通過CAS原子操作方式設(shè)置鎖的狀態(tài),如果為true,表示當(dāng)前線程獲取的鎖,
            // 為false,鎖的狀態(tài)被其他線程更改,當(dāng)前線程獲取的鎖失敗
            if (compareAndSetState(0, acquires)) {
                // 設(shè)置當(dāng)前線程為獨(dú)占鎖的線程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 判斷當(dāng)前線程是不是獨(dú)占鎖的線程,因?yàn)槭强芍厝腈i
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            // 更改鎖的記錄狀態(tài)
            setState(nextc);
            return true;
        }
        return false;
    }

    // 釋放持有的獨(dú)占鎖, 因?yàn)槭强芍厝腈i,所以只有當(dāng)c等于0的時(shí)候,表示當(dāng)前持有鎖的完全釋放了鎖。
    @Override
    protected boolean tryRelease(int releases) {
        // c表示新的鎖的記錄狀態(tài)
        int c = getState() - releases;
        // 如果當(dāng)前線程不是獨(dú)占鎖的線程,就拋出IllegalMonitorStateException異常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // 標(biāo)志是否可以釋放鎖
        boolean free = false;
        // 當(dāng)新的鎖的記錄狀態(tài)為0時(shí),表示可以釋放鎖
        if (c == 0) {
            free = true;
            // 設(shè)置獨(dú)占鎖的線程為null
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}

public class AQSTest {

    public static void newThread(Sync sync, String name, int time) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("線程"+Thread.currentThread().getName()+" 開始運(yùn)行,準(zhǔn)備獲取鎖");
                // 通過acquire方法,獲取鎖,如果沒有獲取到,就等待
                sync.acquire(1);
                try {
                    System.out.println("====線程"+Thread.currentThread().getName()+" 在run方法中獲取了鎖");
                    lockAgain();
                    try {
                        Thread.sleep(time);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } finally {
                    System.out.println("----線程"+Thread.currentThread().getName()+" 在run方法中釋放了鎖");
                    sync.release(1);
                }
            }

            private void lockAgain() {
                sync.acquire(1);
                try {
                    System.out.println("====線程"+Thread.currentThread().getName()+"  在lockAgain方法中再次獲取了鎖");
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } finally {
                    System.out.println("----線程"+Thread.currentThread().getName()+" 在lockAgain方法中釋放了鎖");
                    sync.release(1);
                }
            }
        },name).start();
    }

    public static void main(String[] args) {
        Sync sync = new Sync();
        newThread(sync, "t1111", 1000);
        newThread(sync, "t2222", 1000);
        newThread(sync, "t3333", 1000);
    }
}

利用AbstractQueuedSynchronizer簡(jiǎn)單實(shí)現(xiàn)一個(gè)可重入的獨(dú)占鎖。

要實(shí)現(xiàn)獨(dú)占鎖,必須重寫tryAcquire和tryRelease方法。否則在獲取鎖和釋放鎖的時(shí)候,會(huì)拋出異常。

直接的調(diào)用AQS類的acquire(1)和release(1)方法獲取鎖和釋放鎖。

輸出結(jié)果是

線程t1111 開始運(yùn)行,準(zhǔn)備獲取鎖
====線程t1111 在run方法中獲取了鎖
====線程t1111  在lockAgain方法中再次獲取了鎖
線程t2222 開始運(yùn)行,準(zhǔn)備獲取鎖
線程t3333 開始運(yùn)行,準(zhǔn)備獲取鎖
----線程t1111 在lockAgain方法中釋放了鎖
----線程t1111 在run方法中釋放了鎖
====線程t2222 在run方法中獲取了鎖
====線程t2222  在lockAgain方法中再次獲取了鎖
----線程t2222 在lockAgain方法中釋放了鎖
----線程t2222 在run方法中釋放了鎖
====線程t3333 在run方法中獲取了鎖
====線程t3333  在lockAgain方法中再次獲取了鎖
----線程t3333 在lockAgain方法中釋放了鎖
----線程t3333 在run方法中釋放了鎖

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/76385.html

相關(guān)文章

  • AbstractQueuedSynchronizer 原理分析 - Condition 實(shí)現(xiàn)原理

    摘要:實(shí)現(xiàn)原理是通過基于單鏈表的條件隊(duì)列來管理等待線程的。中斷在轉(zhuǎn)移到同步隊(duì)列期間或之后發(fā)生,此時(shí)表明有線程正在調(diào)用轉(zhuǎn)移節(jié)點(diǎn)。在該種中斷模式下,再次設(shè)置線程的中斷狀態(tài)。 1. 簡(jiǎn)介 Condition是一個(gè)接口,AbstractQueuedSynchronizer 中的ConditionObject內(nèi)部類實(shí)現(xiàn)了這個(gè)接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...

    leone 評(píng)論0 收藏0
  • AbstractQueuedSynchronizer 原理分析 - Condition 實(shí)現(xiàn)原理

    摘要:實(shí)現(xiàn)原理是通過基于單鏈表的條件隊(duì)列來管理等待線程的。中斷在轉(zhuǎn)移到同步隊(duì)列期間或之后發(fā)生,此時(shí)表明有線程正在調(diào)用轉(zhuǎn)移節(jié)點(diǎn)。在該種中斷模式下,再次設(shè)置線程的中斷狀態(tài)。 1. 簡(jiǎn)介 Condition是一個(gè)接口,AbstractQueuedSynchronizer 中的ConditionObject內(nèi)部類實(shí)現(xiàn)了這個(gè)接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...

    李世贊 評(píng)論0 收藏0
  • AbstractQueuedSynchronizer 原理分析 - Condition 實(shí)現(xiàn)原理

    摘要:實(shí)現(xiàn)原理是通過基于單鏈表的條件隊(duì)列來管理等待線程的。中斷在轉(zhuǎn)移到同步隊(duì)列期間或之后發(fā)生,此時(shí)表明有線程正在調(diào)用轉(zhuǎn)移節(jié)點(diǎn)。在該種中斷模式下,再次設(shè)置線程的中斷狀態(tài)。 1. 簡(jiǎn)介 Condition是一個(gè)接口,AbstractQueuedSynchronizer 中的ConditionObject內(nèi)部類實(shí)現(xiàn)了這個(gè)接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...

    bigdevil_s 評(píng)論0 收藏0
  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂茫陂_始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...

    supernavy 評(píng)論0 收藏0
  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...

    ddongjian0000 評(píng)論0 收藏0
  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂茫陂_始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...

    wangdai 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<