成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

AbstractQueuedSynchronizer理解之一(ReentrantLock)

yunhao / 2405人閱讀

摘要:有了這個基礎(chǔ),才能發(fā)揮作用,使得在節(jié)點取消和異常時能夠保證隊列在多線程下的完整性。

Doug Lea是JDK中concurrent工具包的作者,這位大神是誰可以自行g(shù)oogle。

本文淺析ReentrantLock(可重入鎖)的原理

Lock接口

Lock接口定義了這幾個方法:

lock()
用來獲取鎖,如果鎖已經(jīng)被其他線程占有,則進(jìn)行等待,直到搶占到鎖;該方法在發(fā)送異常時不會自動釋放鎖,所以在使用時需要在finall塊中釋放鎖;

tryLock()和tryLock(long time, TimeUnit unit)
嘗試獲得鎖,如果鎖已經(jīng)被其他線程占有,返回false,成功獲取鎖返回true;該方法不會等待,立即返回;而帶有參數(shù)的tryLock在等待時長內(nèi)拿到鎖返回true,超時或者沒拿到鎖返回false;帶參數(shù)的方法還支持響應(yīng)中斷;

lockInterruptibly()
支持中斷的lock();

unlock()
釋放鎖;

newCondition()
新建Condition,Condition以后會分析;

ReentrantLock可重入鎖

ReentrantLock實現(xiàn)了Lock接口,ReentrantLock中有一個重要的成員變量,同步器sync繼承了AbstractQueuedSynchronizer簡稱AQS,我們先介紹AQS;

AQS用一個隊列(結(jié)構(gòu)是一個FIFO隊列)來管理同步狀態(tài),當(dāng)線程獲取同步狀態(tài)失敗時,會將當(dāng)前線程包裝成一個Node放入隊列,當(dāng)前線程進(jìn)入阻塞狀態(tài);當(dāng)同步狀態(tài)釋放時,會從隊列去出線程獲取同步狀態(tài)。

AQS里定義了head、tail、state,他們都是volatile修飾的,head指向隊列的第一個元素,tail指向隊列的最后一個元素,state表示了同步狀態(tài),這個狀態(tài)非常重要,在ReentrantLock中,state為0的時候代表鎖被釋放,state為1時代表鎖已經(jīng)被占用;

看下面代碼:

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

這一段靜態(tài)初始化代碼初始了state、head、tail等變量的在內(nèi)存中的偏移量;Unsafe類是sun.misc下的類,不屬于java標(biāo)準(zhǔn)。Unsafe讓java可以像C語言一樣操作內(nèi)存指針,其中就提供了CAS的一些原子操作和park、unpark對線程掛起與恢復(fù)的操作;關(guān)于CAS是concurrent工具包的基礎(chǔ),以后會多帶帶介紹,其主要作用就是在硬件級別提供了compareAndSwap的功能,從而實現(xiàn)了比較和交換的原子性操作。

AQS還有一個內(nèi)部類叫Node,它將線程封裝,利用prev和next可以將Node串連成雙向鏈表,這就是一開始說的FIFO的結(jié)構(gòu);

ReentrantLock提供了公平鎖和非公平鎖,我們這里從非公平鎖分析AQS的應(yīng)用;
Lock調(diào)用lock()方法時調(diào)用了AQS的lock()方法,我們來看這個非公平鎖NonfairSync的lock方法:

final void lock() {
    //首先調(diào)用CAS搶占同步狀態(tài)state,如果成功則將當(dāng)前線程設(shè)置為同步器的獨占線程,
    //這也是非公平的體現(xiàn),因為新來的線程沒有馬上加入隊列尾部,而是先嘗試搶占同步狀態(tài)。
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //搶占同步狀態(tài)失敗,調(diào)用AQS的acquire
        acquire(1);
}

瞄一眼acquire方法:

public final void acquire(int arg) {
    //在這里還是先試著搶占一下同步狀態(tài)
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire調(diào)用的是NonfairSync的實現(xiàn),然后又調(diào)用了Sync的nonfairTryAcquire方法:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //和之前一樣,利用CAS搶占同步狀態(tài),成功則設(shè)置當(dāng)前線程為獨占線程并且返回true
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果當(dāng)前線程已經(jīng)是獨占線程,即當(dāng)前線程已經(jīng)獲得了同步狀態(tài)則將同步狀態(tài)state加1,
    //這里是可重入鎖的體現(xiàn)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //沒有搶占到同步狀態(tài)返回false
    return false;
}

再看addWaiter方法:

private Node addWaiter(Node mode) {
    //新建一個Node,封裝了當(dāng)前線程和模式,這里傳入的是獨占模式Node.EXCLUSIVE
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    //如果tail不為空就不需要初始化node隊列了
    if (pred != null) {
        //將node作為隊列最后一個元素入列
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            //返回新建的node
            return node;
        }
    }
    //如果tail為空則表示node隊列還沒有初始化,此時初始化隊列
    enq(node);
    return node;
}

瞄一眼enq方法:

private Node enq(final Node node) {
    //無限loop直到CAS成功,其他地方也大量使用了無限loop
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            //隊列尾部為空,必須初始化,head初始化為一個空node,不包含線程,tail = head
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //隊列已經(jīng)初始化,將當(dāng)前node加在列尾
            node.prev = t;
            //將當(dāng)前node設(shè)置為tail,CAS操作,enqueue安全
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

拿到新建的node后傳給acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        //標(biāo)記是否中斷狀態(tài)
        boolean interrupted = false;
        for (;;) {
            //拿到當(dāng)前node的前驅(qū)
            final Node p = node.predecessor();
            //如果前驅(qū)正好為head,即當(dāng)前線程在列首,馬上tryAcquire搶占同步狀態(tài)
            if (p == head && tryAcquire(arg)) {
                //搶占成功后,將當(dāng)前節(jié)點的thread、prev清空作為head
                setHead(node);
                p.next = null; // help GC 原來的head等待GC回收
                failed = false;
                return interrupted;
            }
            //沒有搶占成功后,判斷是否要park
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

瞄一眼shouldParkAfterFailedAcquire方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //如果前驅(qū)node的狀態(tài)為SIGNAL,說明當(dāng)前node可以park
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        //如果前驅(qū)的狀態(tài)大于0說明前驅(qū)node的thread已經(jīng)被取消
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            //從前驅(qū)node開始,將取消的node移出隊列
            //當(dāng)前節(jié)點之前的節(jié)點不會變化,所以這里可以更新prev,而且不必用CAS來更新。
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //前驅(qū)node狀態(tài)等于0或者為PROPAGATE(以后會介紹)
        //將前驅(qū)node狀態(tài)設(shè)置為SIGNAL,返回false,表示當(dāng)前node暫不需要park,
        //可以再嘗試一下?lián)屨纪綘顟B(tài)
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don"t park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

看一下parkAndCheckInterrupt方法:

private final boolean parkAndCheckInterrupt() {
    //阻塞當(dāng)前線程
    LockSupport.park(this);
    //返回當(dāng)前線程是否設(shè)置中斷標(biāo)志,并清空中斷標(biāo)志
    return Thread.interrupted();
}

這里解釋一下為什么要保存一下中斷標(biāo)志:中斷會喚醒被park的阻塞線程,但被park的阻塞線程不會響應(yīng)中斷,所以這里保存一下中斷狀態(tài)并返回,如果狀態(tài)為true說明發(fā)生過中斷,會補發(fā)一次中斷,即調(diào)用interrupt()方法

在acquireQueued中發(fā)生異常時執(zhí)行cancelAcquire:

private void cancelAcquire(Node node) {
    // Ignore if node doesn"t exist
    if (node == null)
        return;
    //清空node的線程
    node.thread = null;

    // Skip cancelled predecessors
    //移除被取消的前繼node,這里只移動了node的prev,沒有改變next
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    //獲取前繼node的后繼node
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    //設(shè)置當(dāng)前node等待狀態(tài)為取消,其他線程檢測到取消狀態(tài)會移除它們
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        //如果當(dāng)前node為tail,將前驅(qū)node設(shè)置為tail(CAS)
        //設(shè)置前驅(qū)node(即現(xiàn)在的tail)的后繼為null(CAS)
        //此時,如果中間有取消的node,將沒有引用指向它,將被GC回收
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred"s next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            //如果當(dāng)前node既不是head也不是tail,設(shè)置前繼node的后繼為當(dāng)前node后繼
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            //喚醒當(dāng)前node后繼
            unparkSuccessor(node);
        }
        //當(dāng)前node的next設(shè)置為自己
        //注意現(xiàn)在當(dāng)前node的后繼的prev還指向當(dāng)前node,所以當(dāng)前node還未被刪除,prev是在移除取消節(jié)點時更新的
        //這里就是為什么在前面要從后往前找可換新的node原因了,next會導(dǎo)致死循環(huán)
        node.next = node; // help GC
    }
}

畫圖描述解析一下cancelAcquire:

首先看如何跳過取消的前驅(qū)

這時,前驅(qū)被取消的node并沒有被移出隊列,前驅(qū)的前驅(qū)的next還指向前驅(qū);

如果當(dāng)前node是tail的情況:

這時,沒有任何引用指向當(dāng)前node;

如果當(dāng)前node既不是tail也不是head:

這時,當(dāng)前node的前驅(qū)的next指向當(dāng)前node的后繼,當(dāng)前node的next指向自己,pre都沒有更新;

如果當(dāng)前node是head的后繼:

這時,只是簡單的將當(dāng)前node的next指向自己;

到這里,當(dāng)線程搶占同步狀態(tài)的時候,會進(jìn)入FIFO隊列等待同步狀態(tài)被釋放。在unlock()方法中調(diào)用了同步器的release方法;看一下release方法:

public final boolean release(int arg) {
    //判斷是否釋放同步狀態(tài)成功
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //如果head不為null,且head的等待狀態(tài)不為0,
            //喚醒后繼node的線程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

再來看一下tryRelease方法(在Sync類中實現(xiàn)):

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        //當(dāng)前thread不是獨占模式的那個線程,拋出異常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            //如果同步狀態(tài)state為0,釋放成功,將獨占線程設(shè)置為null
            free = true;
            setExclusiveOwnerThread(null);
        }
        //更新同步狀態(tài)state
        setState(c);
        return free;
    }

繼續(xù)看unparkSuccessor(喚醒后繼node的tread)方法:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        //head的等待狀態(tài)為負(fù)數(shù),設(shè)置head的等待狀態(tài)為0
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        //如果head的后繼node不存在或者后繼node等待狀態(tài)大于0(即取消)
        //從尾部往當(dāng)前node迭代找到等待狀態(tài)為負(fù)數(shù)的node,unpark
        //因為會有取消的節(jié)點
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

總結(jié)

介紹完ReentrantLock后,我們大體了解了AQS的工作原理。AQS主要就是使用了同步狀態(tài)和隊列實現(xiàn)了鎖的功能。有了CAS這個基礎(chǔ),AQS才能發(fā)揮作用,使得在enqueue、dequeque、節(jié)點取消和異常時能夠保證隊列在多線程下的完整性。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68433.html

相關(guān)文章

  • AbstractQueuedSynchronizer理解之一ReentrantLock

    摘要:有了這個基礎(chǔ),才能發(fā)揮作用,使得在節(jié)點取消和異常時能夠保證隊列在多線程下的完整性。 Doug Lea是JDK中concurrent工具包的作者,這位大神是誰可以自行g(shù)oogle。 本文淺析ReentrantLock(可重入鎖)的原理 Lock接口 showImg(https://segmentfault.com/img/bV2671?w=276&h=176); Lock接口定義了這幾個...

    learning 評論0 收藏0
  • AbstractQueuedSynchronizer理解之一ReentrantLock

    摘要:有了這個基礎(chǔ),才能發(fā)揮作用,使得在節(jié)點取消和異常時能夠保證隊列在多線程下的完整性。 Doug Lea是JDK中concurrent工具包的作者,這位大神是誰可以自行g(shù)oogle。 本文淺析ReentrantLock(可重入鎖)的原理 Lock接口 showImg(https://segmentfault.com/img/bV2671?w=276&h=176); Lock接口定義了這幾個...

    bigdevil_s 評論0 收藏0
  • AbstractQueuedSynchronizer 理解 ReentrantLock

    摘要:當(dāng)前線程已經(jīng)獲取過這個鎖,則此時是重入,改變的計數(shù)即可,返回表示加鎖成功。的核心在于使用更新鎖的狀態(tài),并利用一個同步隊列將獲取鎖失敗的線程進(jìn)行排隊,當(dāng)前驅(qū)節(jié)點解鎖后再喚醒后繼節(jié)點,是一個幾乎純實現(xiàn)的加鎖與解鎖。 簡介 Java 并發(fā)編程離不開鎖, Synchronized 是常用的一種實現(xiàn)加鎖的方式,使用比較簡單快捷。在 Java 中還有另一種鎖,即 Lock 鎖。 Lock 是一個接...

    LeoHsiun 評論0 收藏0
  • Java多線程進(jìn)階(七)—— J.U.C之locks框架:AQS獨占功能剖析(2)

    摘要:開始獲取鎖終于輪到出場了,的調(diào)用過程和完全一樣,同樣拿不到鎖,然后加入到等待隊列隊尾然后,在阻塞前需要把前驅(qū)結(jié)點的狀態(tài)置為,以確保將來可以被喚醒至此,的執(zhí)行也暫告一段落了安心得在等待隊列中睡覺。 showImg(https://segmentfault.com/img/remote/1460000016012467); 本文首發(fā)于一世流云的專欄:https://segmentfault...

    JayChen 評論0 收藏0
  • Java 重入鎖 ReentrantLock 原理分析

    摘要:的主要功能和關(guān)鍵字一致,均是用于多線程的同步。而僅支持通過查詢當(dāng)前線程是否持有鎖。由于和使用的是同一把可重入鎖,所以線程可以進(jìn)入方法,并再次獲得鎖,而不會被阻塞住。公平與非公平公平與非公平指的是線程獲取鎖的方式。 1.簡介 可重入鎖ReentrantLock自 JDK 1.5 被引入,功能上與synchronized關(guān)鍵字類似。所謂的可重入是指,線程可對同一把鎖進(jìn)行重復(fù)加鎖,而不會被阻...

    lx1036 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<