成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

Java多線程框架源碼閱讀之---ReentrantLock非公平鎖

zacklee / 2767人閱讀

摘要:注意是一個(gè)假節(jié)點(diǎn),阻塞的節(jié)點(diǎn)是作為后面的節(jié)點(diǎn)出現(xiàn)的。總之在非公平鎖場(chǎng)景下嘗試去獲取鎖,如果獲取上了,則置一下?tīng)顟B(tài),并設(shè)置自己為獨(dú)占線程,并支持重入鎖功能。方法用于創(chuàng)建一個(gè)節(jié)點(diǎn)值為當(dāng)前線程并維護(hù)一個(gè)雙向鏈表。阻塞了當(dāng)前線程。

部分段落來(lái)自于http://javadoop.com/post/Abst...,他的文章相當(dāng)不錯(cuò)。

ReentrantLock基于Sync內(nèi)部類(lèi)來(lái)完成鎖。Sync繼承于AbstractQueuedSynchronizer。Sync有兩個(gè)不同的子類(lèi)NonfairSync和FairSync。

ReentrantLock的大部分方法都是基于AbstractQueuedSynchronizer實(shí)現(xiàn),大部分僅僅是對(duì)AbstractQueuedSynchronizer的轉(zhuǎn)發(fā)。因此,了解AbstractQueuedSynchronizer就非常重要。

作為AbstractQueuedSynchronizer的實(shí)現(xiàn)者需要實(shí)現(xiàn)isHeldExclusively,tryAcquire,tryRelease,(可選tryAcquireShared,tryReleaseShared)

那么我們看看對(duì)于一個(gè)常用的套路,ReentrantLock是如何實(shí)現(xiàn)同步的

lock.lock();
try{
   i++;
}finally {
   lock.unlock();
}

lock.lock()內(nèi)部實(shí)現(xiàn)為調(diào)用了sync.lock(),之后又會(huì)調(diào)用NonfairSync或FairSync的lock(),你看果然重度使用了AQS吧,這里我們先記住這個(gè)位置,一會(huì)我們還會(huì)回來(lái)分析。

public void lock() {
    sync.lock();
}

先介紹一下AQS里面的屬性,不復(fù)雜就4個(gè)主要的屬性:AQS里面阻塞的節(jié)點(diǎn)是作為隊(duì)列出現(xiàn)的,維護(hù)了一個(gè)head節(jié)點(diǎn)和tail節(jié)點(diǎn),同時(shí)維護(hù)了一個(gè)阻塞狀態(tài),如果state=0表示沒(méi)有鎖,如果state>0表示鎖被重入了幾次。
注意head是一個(gè)假節(jié)點(diǎn),阻塞的節(jié)點(diǎn)是作為head后面的節(jié)點(diǎn)出現(xiàn)的。

// 頭結(jié)點(diǎn),你直接把它當(dāng)做 當(dāng)前持有鎖的線程 可能是最好理解的
private transient volatile Node head;
// 阻塞的尾節(jié)點(diǎn),每個(gè)新的節(jié)點(diǎn)進(jìn)來(lái),都插入到最后,也就形成了一個(gè)隱視的鏈表
private transient volatile Node tail;
// 這個(gè)是最重要的,不過(guò)也是最簡(jiǎn)單的,代表當(dāng)前鎖的狀態(tài),0代表沒(méi)有被占用,大于0代表有線程持有當(dāng)前鎖
// 之所以說(shuō)大于0,而不是等于1,是因?yàn)殒i可以重入嘛,每次重入都加上1
private volatile int state;
// 代表當(dāng)前持有獨(dú)占鎖的線程,舉個(gè)最重要的使用例子,因?yàn)殒i可以重入
// reentrantLock.lock()可以嵌套調(diào)用多次,所以每次用這個(gè)來(lái)判斷當(dāng)前線程是否已經(jīng)擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer

接著看一下FairSync和NonfairSync的實(shí)現(xiàn),F(xiàn)airSync和NonfairSync都繼承了Sync,而且Sync又繼承了AbstractQueuedSynchronizer。可以看到FairSync和NonfairSync直接或間接的實(shí)現(xiàn)了isHeldExclusively,tryAcquire,tryRelease這三個(gè)方法。

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
     //如果沒(méi)有鎖上,則設(shè)置為鎖上并設(shè)置自己為獨(dú)占線程
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
     //如果鎖上了,而且獨(dú)占線程是自己,那么重新設(shè)置state+1,并且返回true
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
     //否則返回false
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don"t need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        //如果沒(méi)有人鎖上,那么就設(shè)置我自己為獨(dú)占線程,否則再acquire一次
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //調(diào)用到了AQS的acquire里面
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don"t grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

之前我們說(shuō)到回到ReentrantLock的lock()調(diào)用了sync.lock();現(xiàn)在我們回來(lái)看看非公平鎖的邏輯是:如果搶到鎖了,則設(shè)置自己的線程為占有鎖的線程,否則調(diào)用acquire(1),這個(gè)是AQS的方法。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire會(huì)調(diào)用tryAcquire,而這個(gè)是對(duì)于不同的實(shí)現(xiàn)是不一樣的,非公平鎖NonfairSync里面的tryAcquire,而tryAcquire又會(huì)調(diào)用到Sync的nonfairTryAcquire??傊畉ryAcquire在非公平鎖場(chǎng)景下嘗試去獲取鎖,如果獲取上了,則置一下AQS狀態(tài)state,并設(shè)置自己為獨(dú)占線程,并支持重入鎖功能。

addWaiter方法用于創(chuàng)建一個(gè)節(jié)點(diǎn)(值為當(dāng)前線程)并維護(hù)一個(gè)雙向鏈表。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

現(xiàn)在說(shuō)一下Node的結(jié)構(gòu),主要有用的field為waitStatus,prev,next,thread。waitStatus目前僅要了解1,0,-1就夠了。 0是默認(rèn)狀態(tài),1代表爭(zhēng)取鎖取消,-1表示它的后繼節(jié)點(diǎn)對(duì)應(yīng)的線程需要被喚醒。也就是說(shuō)這個(gè)waitStatus其實(shí)代表的不是自己的狀態(tài),而是后繼節(jié)點(diǎn)的狀態(tài)??梢钥匆?jiàn)默認(rèn)進(jìn)隊(duì)的節(jié)點(diǎn)的waitStatus都是0

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 標(biāo)識(shí)節(jié)點(diǎn)當(dāng)前在共享模式下
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // 標(biāo)識(shí)節(jié)點(diǎn)當(dāng)前在獨(dú)占模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的幾個(gè)int常量是給waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代碼此線程取消了爭(zhēng)搶這個(gè)鎖
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor"s thread needs unparking */
    // 官方的描述是,其表示當(dāng)前node的后繼節(jié)點(diǎn)對(duì)應(yīng)的線程需要被喚醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // 本文不分析condition,所以略過(guò)吧
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // 同樣的不分析,略過(guò)吧
    static final int PROPAGATE = -3;
    // =====================================================

    // 取值為上面的1、-1、-2、-3,或者0(以后會(huì)講到)
    // 這么理解,暫時(shí)只需要知道如果這個(gè)值 大于0 代表此線程取消了等待,
    // 也許就是說(shuō)半天搶不到鎖,不搶了,ReentrantLock是可以指定timeouot的。。。
    volatile int waitStatus;
    // 前驅(qū)節(jié)點(diǎn)的引用
    volatile Node prev;
    // 后繼節(jié)點(diǎn)的引用
    volatile Node next;
    // 這個(gè)就是線程本尊
    volatile Thread thread;
}

acquireQueued的作用是從等待隊(duì)列中嘗試去把入隊(duì)的那個(gè)節(jié)點(diǎn)去做park。另外當(dāng)節(jié)點(diǎn)unpark以后,也會(huì)在循環(huán)中將自己設(shè)置成頭結(jié)點(diǎn),然后自己拿到鎖

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //對(duì)于隊(duì)首節(jié)點(diǎn),剛才也許沒(méi)有搶到鎖,現(xiàn)在也許能搶到了,再試一次
                if (p == head && tryAcquire(arg)) {
                    //如果搶到了鎖,這個(gè)入隊(duì)的節(jié)點(diǎn)根本不需要park,直接可以執(zhí)行
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果不是隊(duì)首節(jié)點(diǎn),或者是隊(duì)首但是沒(méi)有搶過(guò)其他節(jié)點(diǎn)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire。這個(gè)方法說(shuō)的是:"當(dāng)前線程沒(méi)有搶到鎖,是否需要掛起當(dāng)前線程?第一個(gè)參數(shù)是前驅(qū)節(jié)點(diǎn),第二個(gè)參數(shù)才是代表當(dāng)前線程的節(jié)點(diǎn)。注意因?yàn)槟J(rèn)加入的節(jié)點(diǎn)的狀態(tài)都是0,這個(gè)方法會(huì)進(jìn)來(lái)兩次,第一次進(jìn)來(lái)走到else分支里面修改前置節(jié)點(diǎn)的waitStatus為-1.第二次進(jìn)來(lái)直接返回true。對(duì)于剛加入隊(duì)列的節(jié)點(diǎn),修改head節(jié)點(diǎn)的waitStatus為-1,對(duì)于后來(lái)加入的節(jié)點(diǎn),修改它前一個(gè)節(jié)點(diǎn)的waitStatus為-1。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don"t park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt的代碼很簡(jiǎn)單,這個(gè)this就是ReentrantLock類(lèi)的實(shí)例。阻塞了當(dāng)前線程。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

再來(lái)看看怎么解鎖。

public void unlock() {
    sync.release(1);
}

調(diào)用到AQS里面,如果鎖被完全釋放了,那么就unpark head的下一個(gè)

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease是由Sync覆蓋的。重置AQS里面的state,返回鎖是否被完全釋放了的判斷。

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //下面的代碼就是喚醒后繼節(jié)點(diǎn),但是有可能后繼節(jié)點(diǎn)取消了等待(waitStatus==1)
        // 從隊(duì)尾往前找,找到waitStatus<=0的所有節(jié)點(diǎn)中排在最前面的 
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

等到unpark以后,parkAndCheckInterrupt的阻塞解除,將繼續(xù)for無(wú)限循環(huán),因?yàn)槭顷?duì)列里是一個(gè)一個(gè)阻塞的,此時(shí)阻塞節(jié)點(diǎn)的前置依次都是head,因此if (p == head && tryAcquire(arg)) 這句話如果它醒來(lái)?yè)屾i成功了將執(zhí)行成功,阻塞的線程獲取鎖并執(zhí)行,將自己設(shè)置成head,同時(shí)也將自己從隊(duì)列中清除出去。 注意這里是非公平鎖,因此在tryAcquire有可能還沒(méi)有搶過(guò)其他線程,那么搶到的那個(gè)將會(huì)直接執(zhí)行,而沒(méi)有搶到的,又在循環(huán)里鎖住了。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68033.html

相關(guān)文章

  • Java線程進(jìn)階(三)—— J.U.Clocks框架ReentrantLock

    摘要:公平策略在多個(gè)線程爭(zhēng)用鎖的情況下,公平策略傾向于將訪問(wèn)權(quán)授予等待時(shí)間最長(zhǎng)的線程。使用方式的典型調(diào)用方式如下二類(lèi)原理的源碼非常簡(jiǎn)單,它通過(guò)內(nèi)部類(lèi)實(shí)現(xiàn)了框架,接口的實(shí)現(xiàn)僅僅是對(duì)的的簡(jiǎn)單封裝,參見(jiàn)原理多線程進(jìn)階七鎖框架獨(dú)占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首發(fā)于一世流云的專(zhuān)欄:https...

    jasperyang 評(píng)論0 收藏0
  • Java線程——重入ReentrantLock源碼閱讀

    摘要:所謂的重入,就是當(dāng)本線程想再次獲得鎖,不需要重新申請(qǐng),它本身就已經(jīng)鎖了,即重入該鎖。如果不為,則表示有線程已經(jīng)占有了??偨Y(jié)回顧下要點(diǎn)是一個(gè)可重入的鎖被當(dāng)前占用的線程重入。 上一章《AQS源碼閱讀》講了AQS框架,這次講講它的應(yīng)用類(lèi)(注意不是子類(lèi)實(shí)現(xiàn),待會(huì)細(xì)講)。ReentrantLock,顧名思義重入鎖,但什么是重入,這個(gè)鎖到底是怎樣的,我們來(lái)看看類(lèi)的注解說(shuō)明showImg(http:...

    sushi 評(píng)論0 收藏0
  • Java 重入 ReentrantLock 原理分析

    摘要:的主要功能和關(guān)鍵字一致,均是用于多線程的同步。而僅支持通過(guò)查詢當(dāng)前線程是否持有鎖。由于和使用的是同一把可重入鎖,所以線程可以進(jìn)入方法,并再次獲得鎖,而不會(huì)被阻塞住。公平與非公平公平與非公平指的是線程獲取鎖的方式。 1.簡(jiǎn)介 可重入鎖ReentrantLock自 JDK 1.5 被引入,功能上與synchronized關(guān)鍵字類(lèi)似。所謂的可重入是指,線程可對(duì)同一把鎖進(jìn)行重復(fù)加鎖,而不會(huì)被阻...

    lx1036 評(píng)論0 收藏0
  • Java線程—ReentrantReadWriteLock源碼閱讀

    摘要:不同的是它還多了內(nèi)部類(lèi)和內(nèi)部類(lèi),以及讀寫(xiě)對(duì)應(yīng)的成員變量和方法。另外是給和內(nèi)部類(lèi)使用的。內(nèi)部類(lèi)前面說(shuō)到的操作是分配到里面執(zhí)行的。他們都是接口的實(shí)現(xiàn),所以其實(shí)最像應(yīng)該是這個(gè)兩個(gè)內(nèi)部類(lèi)。而且大體上也沒(méi)什么差異,也是用的內(nèi)部類(lèi)。 之前講了《AQS源碼閱讀》和《ReentrantLock源碼閱讀》,本次將延續(xù)閱讀下ReentrantReadWriteLock,建議沒(méi)看過(guò)之前兩篇文章的,先大概了解...

    Ververica 評(píng)論0 收藏0
  • Java線程進(jìn)階(十)—— J.U.Clocks框架:基于AQS的讀寫(xiě)(5)

    摘要:關(guān)于,最后有兩點(diǎn)規(guī)律需要注意當(dāng)?shù)牡却?duì)列隊(duì)首結(jié)點(diǎn)是共享結(jié)點(diǎn),說(shuō)明當(dāng)前寫(xiě)鎖被占用,當(dāng)寫(xiě)鎖釋放時(shí),會(huì)以傳播的方式喚醒頭結(jié)點(diǎn)之后緊鄰的各個(gè)共享結(jié)點(diǎn)。當(dāng)?shù)牡却?duì)列隊(duì)首結(jié)點(diǎn)是獨(dú)占結(jié)點(diǎn),說(shuō)明當(dāng)前讀鎖被使用,當(dāng)讀鎖釋放歸零后,會(huì)喚醒隊(duì)首的獨(dú)占結(jié)點(diǎn)。 showImg(https://segmentfault.com/img/remote/1460000016012293); 本文首發(fā)于一世流云的專(zhuān)欄:...

    dunizb 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<