成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

AbstractQueuedSynchronizer的介紹和原理分析

Yuanf / 986人閱讀

摘要:同步器擁有三個(gè)成員變量隊(duì)列的頭結(jié)點(diǎn)隊(duì)列的尾節(jié)點(diǎn)和狀態(tài)。對(duì)于同步器維護(hù)的狀態(tài),多個(gè)線(xiàn)程對(duì)其的獲取將會(huì)產(chǎn)生一個(gè)鏈?zhǔn)降慕Y(jié)構(gòu)。使用將當(dāng)前線(xiàn)程,關(guān)于后續(xù)會(huì)詳細(xì)介紹。

簡(jiǎn)介

提供了一個(gè)基于FIFO隊(duì)列,可以用于構(gòu)建鎖或者其他相關(guān)同步裝置的基礎(chǔ)框架。該同步器(以下簡(jiǎn)稱(chēng)同步器)利用了一個(gè)int來(lái)表示狀態(tài),期望它能夠成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)。使用的方法是繼承,子類(lèi)通過(guò)繼承同步器并需要實(shí)現(xiàn)它的方法來(lái)管理其狀態(tài),管理的方式就是通過(guò)類(lèi)似acquire和release的方式來(lái)操縱狀態(tài)。然而多線(xiàn)程環(huán)境中對(duì)狀態(tài)的操縱必須確保原子性,因此子類(lèi)對(duì)于狀態(tài)的把握,需要使用這個(gè)同步器提供的以下三個(gè)方法對(duì)狀態(tài)進(jìn)行操作:

java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()

java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)

java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

子類(lèi)推薦被定義為自定義同步裝置的內(nèi)部類(lèi),同步器自身沒(méi)有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干acquire之類(lèi)的方法來(lái)供使用。該同步器即可以作為排他模式也可以作為共享模式,當(dāng)它被定義為一個(gè)排他模式時(shí),其他線(xiàn)程對(duì)其的獲取就被阻止,而共享模式對(duì)于多個(gè)線(xiàn)程獲取都可以成功。

同步器是實(shí)現(xiàn)鎖的關(guān)鍵,利用同步器將鎖的語(yǔ)義實(shí)現(xiàn),然后在鎖的實(shí)現(xiàn)中聚合同步器??梢赃@樣理解:鎖的API是面向使用者的,它定義了與鎖交互的公共行為,而每個(gè)鎖需要完成特定的操作也是透過(guò)這些行為來(lái)完成的(比如:可以允許兩個(gè)線(xiàn)程進(jìn)行加鎖,排除兩個(gè)以上的線(xiàn)程),但是實(shí)現(xiàn)是依托給同步器來(lái)完成;同步器面向的是線(xiàn)程訪(fǎng)問(wèn)和資源控制,它定義了線(xiàn)程對(duì)資源是否能夠獲取以及線(xiàn)程的排隊(duì)等操作。鎖和同步器很好的隔離了二者所需要關(guān)注的領(lǐng)域,嚴(yán)格意義上講,同步器可以適用于除了鎖以外的其他同步設(shè)施上(包括鎖)。

同步器的開(kāi)始提到了其實(shí)現(xiàn)依賴(lài)于一個(gè)FIFO隊(duì)列,那么隊(duì)列中的元素Node就是保存著線(xiàn)程引用和線(xiàn)程狀態(tài)的容器,每個(gè)線(xiàn)程對(duì)同步器的訪(fǎng)問(wèn),都可以看做是隊(duì)列中的一個(gè)節(jié)點(diǎn)。Node的主要包含以下成員變量:

Node {
    int waitStatus;
    Node prev;
    Node next;
    Node nextWaiter;
    Thread thread;
}

以上五個(gè)成員變量主要負(fù)責(zé)保存該節(jié)點(diǎn)的線(xiàn)程引用,同步等待隊(duì)列(以下簡(jiǎn)稱(chēng)sync隊(duì)列)的前驅(qū)和后繼節(jié)點(diǎn),同時(shí)也包括了同步狀態(tài)。

屬性名稱(chēng)

描述

int waitStatus

表示節(jié)點(diǎn)的狀態(tài)。其中包含的狀態(tài)有:

CANCELLED,值為1,表示當(dāng)前的線(xiàn)程被取消;

SIGNAL,值為-1,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線(xiàn)程需要運(yùn)行,也就是unpark;

CONDITION,值為-2,表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中;

PROPAGATE,值為-3,表示當(dāng)前場(chǎng)景下后續(xù)的acquireShared能夠得以執(zhí)行;

值為0,表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖。

Node prev

前驅(qū)節(jié)點(diǎn),比如當(dāng)前節(jié)點(diǎn)被取消,那就需要前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn)來(lái)完成連接。

Node next

后繼節(jié)點(diǎn)。

Node nextWaiter

存儲(chǔ)condition隊(duì)列中的后繼節(jié)點(diǎn)。

Thread thread

入隊(duì)列時(shí)的當(dāng)前線(xiàn)程。

節(jié)點(diǎn)成為sync隊(duì)列和condition隊(duì)列構(gòu)建的基礎(chǔ),在同步器中就包含了sync隊(duì)列。同步器擁有三個(gè)成員變量:sync隊(duì)列的頭結(jié)點(diǎn)head、sync隊(duì)列的尾節(jié)點(diǎn)tail和狀態(tài)state。對(duì)于鎖的獲取,請(qǐng)求形成節(jié)點(diǎn),將其掛載在尾部,而鎖資源的轉(zhuǎn)移(釋放再獲?。┦菑念^部開(kāi)始向后進(jìn)行。對(duì)于同步器維護(hù)的狀態(tài)state,多個(gè)線(xiàn)程對(duì)其的獲取將會(huì)產(chǎn)生一個(gè)鏈?zhǔn)降慕Y(jié)構(gòu)。

API說(shuō)明

實(shí)現(xiàn)自定義同步器時(shí),需要使用同步器提供的getState()、setState()和compareAndSetState()方法來(lái)操縱狀態(tài)的變遷。

protected boolean tryAcquire(int arg)

排它的獲取這個(gè)狀態(tài)。這個(gè)方法的實(shí)現(xiàn)需要查詢(xún)當(dāng)前狀態(tài)是否允許獲取,然后再進(jìn)行獲?。ㄊ褂胏ompareAndSetState來(lái)做)狀態(tài)。

protected boolean tryRelease(int arg)?

釋放狀態(tài)。

protected int tryAcquireShared(int arg)

共享的模式下獲取狀態(tài)。

protected boolean tryReleaseShared(int arg)

共享的模式下釋放狀態(tài)。

protected boolean isHeldExclusively()

在排它模式下,狀態(tài)是否被占用。

實(shí)現(xiàn)這些方法必須是非阻塞而且是線(xiàn)程安全的,推薦使用該同步器的父類(lèi)java.util.concurrent.locks.AbstractOwnableSynchronizer來(lái)設(shè)置當(dāng)前的線(xiàn)程。

開(kāi)始提到同步器內(nèi)部基于一個(gè)FIFO隊(duì)列,對(duì)于一個(gè)獨(dú)占鎖的獲取和釋放有以下偽碼可以表示。

獲取一個(gè)排他鎖。

while(獲取鎖) {
    if (獲取到) {
        退出while循環(huán)
    } else {
        if(當(dāng)前線(xiàn)程沒(méi)有入隊(duì)列) {
            那么入隊(duì)列
        }
        阻塞當(dāng)前線(xiàn)程
    }
}

釋放一個(gè)排他鎖。

if (釋放成功) {
    刪除頭結(jié)點(diǎn)
    激活原頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)
}

示例

下面通過(guò)一個(gè)排它鎖的例子來(lái)深入理解一下同步器的工作原理,而只有掌握同步器的工作原理才能夠更加深入了解其他的并發(fā)組件。

排他鎖的實(shí)現(xiàn),一次只能一個(gè)線(xiàn)程獲取到鎖。

class Mutex implements Lock, java.io.Serializable {
   // 內(nèi)部類(lèi),自定義同步器
   private static class Sync extends AbstractQueuedSynchronizer {
     // 是否處于占用狀態(tài)
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }
     // 當(dāng)狀態(tài)為0的時(shí)候獲取鎖
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }
     // 釋放鎖,將狀態(tài)設(shè)置為0
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }
     // 返回一個(gè)Condition,每個(gè)condition都包含了一個(gè)condition隊(duì)列
     Condition newCondition() { return new ConditionObject(); }
   }
   // 僅需要將操作代理到Sync上即可
   private final Sync sync = new Sync();
   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

可以看到Mutex將Lock接口均代理給了同步器的實(shí)現(xiàn)。

使用方將Mutex構(gòu)造出來(lái)之后,調(diào)用lock獲取鎖,調(diào)用unlock進(jìn)行解鎖。下面以Mutex為例子,詳細(xì)分析以下同步器的實(shí)現(xiàn)邏輯。

實(shí)現(xiàn)分析

public final void acquire(int arg)

該方法以排他的方式獲取鎖,對(duì)中斷不敏感,完成synchronized語(yǔ)義。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

上述邏輯主要包括:

嘗試獲?。ㄕ{(diào)用tryAcquire更改狀態(tài),需要保證原子性);

tryAcquire方法中使用了同步器提供的對(duì)state操作的方法,利用compareAndSet保證只有一個(gè)線(xiàn)程能夠?qū)顟B(tài)進(jìn)行成功修改,而沒(méi)有成功修改的線(xiàn)程將進(jìn)入sync隊(duì)列排隊(duì)。

如果獲取不到,將當(dāng)前線(xiàn)程構(gòu)造成節(jié)點(diǎn)Node并加入sync隊(duì)列;

進(jìn)入隊(duì)列的每個(gè)線(xiàn)程都是一個(gè)節(jié)點(diǎn)Node,從而形成了一個(gè)雙向隊(duì)列,類(lèi)似CLH隊(duì)列,這樣做的目的是線(xiàn)程間的通信會(huì)被限制在較小規(guī)模(也就是兩個(gè)節(jié)點(diǎn)左右)。

再次嘗試獲取,如果沒(méi)有獲取到那么將當(dāng)前線(xiàn)程從線(xiàn)程調(diào)度器上摘下,進(jìn)入等待狀態(tài)。

使用LockSupport將當(dāng)前線(xiàn)程unpark,關(guān)于LockSupport后續(xù)會(huì)詳細(xì)介紹。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 快速?lài)L試在尾部添加
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
            t.next = node;
            return t;
        }
    }
}

上述邏輯主要包括:

使用當(dāng)前線(xiàn)程構(gòu)造Node;

對(duì)于一個(gè)節(jié)點(diǎn)需要做的是將當(dāng)節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)指向尾節(jié)點(diǎn)(current.prev = tail),尾節(jié)點(diǎn)指向它(tail = current),原有的尾節(jié)點(diǎn)的后繼節(jié)點(diǎn)指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節(jié)點(diǎn)的設(shè)置來(lái)保證的,也就是compareAndSetTail來(lái)完成的。

先行嘗試在隊(duì)尾添加;

如果尾節(jié)點(diǎn)已經(jīng)有了,然后做如下操作:

分配引用T指向尾節(jié)點(diǎn);

將節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)更新為尾節(jié)點(diǎn)(current.prev = tail);

如果尾節(jié)點(diǎn)是T,那么將當(dāng)尾節(jié)點(diǎn)設(shè)置為該節(jié)點(diǎn)(tail = current,原子更新);

T的后繼節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)(T.next = current)。

注意第3點(diǎn)是要求原子的。

這樣可以以最短路徑O(1)的效果來(lái)完成線(xiàn)程入隊(duì),是最大化減少開(kāi)銷(xiāo)的一種方式。

如果隊(duì)尾添加失敗或者是第一個(gè)入隊(duì)的節(jié)點(diǎn)。

如果是第1個(gè)節(jié)點(diǎn),也就是sync隊(duì)列沒(méi)有初始化,那么會(huì)進(jìn)入到enq這個(gè)方法,進(jìn)入的線(xiàn)程可能有多個(gè),或者說(shuō)在addWaiter中沒(méi)有成功入隊(duì)的線(xiàn)程都將進(jìn)入enq這個(gè)方法。

可以看到enq的邏輯是確保進(jìn)入的Node都會(huì)有機(jī)會(huì)順序的添加到sync隊(duì)列中,而加入的步驟如下:

如果尾節(jié)點(diǎn)為空,那么原子化的分配一個(gè)頭節(jié)點(diǎn),并將尾節(jié)點(diǎn)指向頭節(jié)點(diǎn),這一步是初始化;

然后是重復(fù)在addWaiter中做的工作,但是在一個(gè)while(true)的循環(huán)中,直到當(dāng)前節(jié)點(diǎn)入隊(duì)為止。

進(jìn)入sync隊(duì)列之后,接下來(lái)就是要進(jìn)行鎖的獲取,或者說(shuō)是訪(fǎng)問(wèn)控制了,只有一個(gè)線(xiàn)程能夠在同一時(shí)刻繼續(xù)的運(yùn)行,而其他的進(jìn)入等待狀態(tài)。而每個(gè)線(xiàn)程都是一個(gè)獨(dú)立的個(gè)體,它們自省的觀(guān)察,當(dāng)條件滿(mǎn)足的時(shí)候(自己的前驅(qū)是頭結(jié)點(diǎn)并且原子性的獲取了狀態(tài)),那么這個(gè)線(xiàn)程能夠繼續(xù)運(yùn)行。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
                }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上述邏輯主要包括:

獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn);

需要獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),而頭結(jié)點(diǎn)所對(duì)應(yīng)的含義是當(dāng)前站有鎖且正在運(yùn)行。

當(dāng)前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且能夠獲取狀態(tài),代表該當(dāng)前節(jié)點(diǎn)占有鎖;

如果滿(mǎn)足上述條件,那么代表能夠占有鎖,根據(jù)節(jié)點(diǎn)對(duì)鎖占有的含義,設(shè)置頭結(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)。

否則進(jìn)入等待狀態(tài)。

如果沒(méi)有輪到當(dāng)前節(jié)點(diǎn)運(yùn)行,那么將當(dāng)前線(xiàn)程從線(xiàn)程調(diào)度器上摘下,也就是進(jìn)入等待狀態(tài)。

這里針對(duì)acquire做一下總結(jié):

狀態(tài)的維護(hù);

需要在鎖定時(shí),需要維護(hù)一個(gè)狀態(tài)(int類(lèi)型),而對(duì)狀態(tài)的操作是原子和非阻塞的,通過(guò)同步器提供的對(duì)狀態(tài)訪(fǎng)問(wèn)的方法對(duì)狀態(tài)進(jìn)行操縱,并且利用compareAndSet來(lái)確保原子性的修改。

狀態(tài)的獲?。?/p>

一旦成功的修改了狀態(tài),當(dāng)前線(xiàn)程或者說(shuō)節(jié)點(diǎn),就被設(shè)置為頭節(jié)點(diǎn)。

sync隊(duì)列的維護(hù)。

在獲取資源未果的過(guò)程中條件不符合的情況下(不該自己,前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)或者沒(méi)有獲取到資源)進(jìn)入睡眠狀態(tài),停止線(xiàn)程調(diào)度器對(duì)當(dāng)前節(jié)點(diǎn)線(xiàn)程的調(diào)度。

這時(shí)引入的一個(gè)釋放的問(wèn)題,也就是說(shuō)使睡眠中的Node或者說(shuō)線(xiàn)程獲得通知的關(guān)鍵,就是前驅(qū)節(jié)點(diǎn)的通知,而這一個(gè)過(guò)程就是釋放,釋放會(huì)通知它的后繼節(jié)點(diǎn)從睡眠中返回準(zhǔn)備運(yùn)行。

下面的流程圖基本描述了一次acquire所需要經(jīng)歷的過(guò)程:

如上圖所示,其中的判定退出隊(duì)列的條件,判定條件是否滿(mǎn)足和休眠當(dāng)前線(xiàn)程就是完成了自旋spin的過(guò)程。

public final boolean release(int arg)

在unlock方法的實(shí)現(xiàn)中,使用了同步器的release方法。相對(duì)于在之前的acquire方法中可以得出調(diào)用acquire,保證能夠獲取到鎖(成功獲取狀態(tài)),而release則表示將狀態(tài)設(shè)置回去,也就是將資源釋放,或者說(shuō)將鎖釋放。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

上述邏輯主要包括:

嘗試釋放狀態(tài);

tryRelease能夠保證原子化的將狀態(tài)設(shè)置回去,當(dāng)然需要使用compareAndSet來(lái)保證。如果釋放狀態(tài)成功過(guò)之后,將會(huì)進(jìn)入后繼節(jié)點(diǎn)的喚醒過(guò)程。

喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)所包含的線(xiàn)程。

通過(guò)LockSupport的unpark方法將休眠中的線(xiàn)程喚醒,讓其繼續(xù)acquire狀態(tài)。

private void unparkSuccessor(Node node) {
    // 將狀態(tài)設(shè)置為同步狀態(tài)
    int ws = node.waitStatus;
    if (ws < 0)      compareAndSetWaitStatus(node, ws, 0);   // 獲取當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),如果滿(mǎn)足狀態(tài),那么進(jìn)行喚醒操作  // 如果沒(méi)有滿(mǎn)足狀態(tài),從尾部開(kāi)始找尋符合要求的節(jié)點(diǎn)并將其喚醒     Node s = node.next;     if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
        }
    if (s != null)
        LockSupport.unpark(s.thread);
}

上述邏輯主要包括,該方法取出了當(dāng)前節(jié)點(diǎn)的next引用,然后對(duì)其線(xiàn)程(Node)進(jìn)行了喚醒,這時(shí)就只有一個(gè)或合理個(gè)數(shù)的線(xiàn)程被喚醒,被喚醒的線(xiàn)程繼續(xù)進(jìn)行對(duì)資源的獲取與爭(zhēng)奪。

回顧整個(gè)資源的獲取和釋放過(guò)程:

在獲取時(shí),維護(hù)了一個(gè)sync隊(duì)列,每個(gè)節(jié)點(diǎn)都是一個(gè)線(xiàn)程在進(jìn)行自旋,而依據(jù)就是自己是否是首節(jié)點(diǎn)的后繼并且能夠獲取資源; 在釋放時(shí),僅僅需要將資源還回去,然后通知一下后繼節(jié)點(diǎn)并將其喚醒。

這里需要注意,隊(duì)列的維護(hù)(首節(jié)點(diǎn)的更換)是依靠消費(fèi)者(獲取時(shí))來(lái)完成的,也就是說(shuō)在滿(mǎn)足了自旋退出的條件時(shí)的一刻,這個(gè)節(jié)點(diǎn)就會(huì)被設(shè)置成為首節(jié)點(diǎn)。

protected boolean tryAcquire(int arg)

tryAcquire是自定義同步器需要實(shí)現(xiàn)的方法,也就是自定義同步器非阻塞原子化的獲取狀態(tài),如果鎖該方法一般用于Lock的tryLock實(shí)現(xiàn)中,這個(gè)特性是synchronized無(wú)法提供的。

public final void acquireInterruptibly(int arg)

該方法提供獲取狀態(tài)能力,當(dāng)然在無(wú)法獲取狀態(tài)的情況下會(huì)進(jìn)入sync隊(duì)列進(jìn)行排隊(duì),這類(lèi)似acquire,但是和acquire不同的地方在于它能夠在外界對(duì)當(dāng)前線(xiàn)程進(jìn)行中斷的時(shí)候提前結(jié)束獲取狀態(tài)的操作,換句話(huà)說(shuō),就是在類(lèi)似synchronized獲取鎖時(shí),外界能夠?qū)Ξ?dāng)前線(xiàn)程進(jìn)行中斷,并且獲取鎖的這個(gè)操作能夠響應(yīng)中斷并提前返回。一個(gè)線(xiàn)程處于synchronized塊中或者進(jìn)行同步I/O操作時(shí),對(duì)該線(xiàn)程進(jìn)行中斷操作,這時(shí)該線(xiàn)程的中斷標(biāo)識(shí)位被設(shè)置為true,但是線(xiàn)程依舊繼續(xù)運(yùn)行。

如果在獲取一個(gè)通過(guò)網(wǎng)絡(luò)交互實(shí)現(xiàn)的鎖時(shí),這個(gè)鎖資源突然進(jìn)行了銷(xiāo)毀,那么使用acquireInterruptibly的獲取方式就能夠讓該時(shí)刻嘗試獲取鎖的線(xiàn)程提前返回。而同步器的這個(gè)特性被實(shí)現(xiàn)Lock接口中的lockInterruptibly方法。根據(jù)Lock的語(yǔ)義,在被中斷時(shí),lockInterruptibly將會(huì)拋出InterruptedException來(lái)告知使用者。

public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // 檢測(cè)中斷標(biāo)志位
            if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上述邏輯主要包括:

檢測(cè)當(dāng)前線(xiàn)程是否被中斷;

判斷當(dāng)前線(xiàn)程的中斷標(biāo)志位,如果已經(jīng)被中斷了,那么直接拋出異常并將中斷標(biāo)志位設(shè)置為false。

嘗試獲取狀態(tài);

調(diào)用tryAcquire獲取狀態(tài),如果順利會(huì)獲取成功并返回。

構(gòu)造節(jié)點(diǎn)并加入sync隊(duì)列;

獲取狀態(tài)失敗后,將當(dāng)前線(xiàn)程引用構(gòu)造為節(jié)點(diǎn)并加入到sync隊(duì)列中。退出隊(duì)列的方式在沒(méi)有中斷的場(chǎng)景下和acquireQueued類(lèi)似,當(dāng)頭結(jié)點(diǎn)是自己的前驅(qū)節(jié)點(diǎn)并且能夠獲取到狀態(tài)時(shí),即可以運(yùn)行,當(dāng)然要將本節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),表示正在運(yùn)行。

中斷檢測(cè)。

在每次被喚醒時(shí),進(jìn)行中斷檢測(cè),如果發(fā)現(xiàn)當(dāng)前線(xiàn)程被中斷,那么拋出InterruptedException并退出循環(huán)。

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException

該方法提供了具備有超時(shí)功能的獲取狀態(tài)的調(diào)用,如果在指定的nanosTimeout內(nèi)沒(méi)有獲取到狀態(tài),那么返回false,反之返回true??梢詫⒃摲椒醋鯽cquireInterruptibly的升級(jí)版,也就是在判斷是否被中斷的基礎(chǔ)上增加了超時(shí)控制。

針對(duì)超時(shí)控制這部分的實(shí)現(xiàn),主要需要計(jì)算出睡眠的delta,也就是間隔值。間隔可以表示為nanosTimeout = 原有nanosTimeout – now(當(dāng)前時(shí)間)+
lastTime(睡眠之前記錄的時(shí)間)。如果nanosTimeout大于0,那么還需要使當(dāng)前線(xiàn)程睡眠,反之則返回false。

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
    long lastTime = System.nanoTime();
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            if (nanosTimeout <= 0)               return false;           if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            //計(jì)算時(shí)間,當(dāng)前時(shí)間減去睡眠之前的時(shí)間得到睡眠的時(shí)間,然后被
            //原有超時(shí)時(shí)間減去,得到了還應(yīng)該睡眠的時(shí)間
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上述邏輯主要包括:

加入sync隊(duì)列;

將當(dāng)前線(xiàn)程構(gòu)造成為節(jié)點(diǎn)Node加入到sync隊(duì)列中。

條件滿(mǎn)足直接返回;

退出條件判斷,如果前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且成功獲取到狀態(tài),那么設(shè)置自己為頭結(jié)點(diǎn)并退出,返回true,也就是在指定的nanosTimeout之前獲取了鎖。

獲取狀態(tài)失敗休眠一段時(shí)間;

通過(guò)LockSupport.unpark來(lái)指定當(dāng)前線(xiàn)程休眠一段時(shí)間。

計(jì)算再次休眠的時(shí)間;

喚醒后的線(xiàn)程,計(jì)算仍需要休眠的時(shí)間,該時(shí)間表示為nanosTimeout = 原有nanosTimeout – now(當(dāng)前時(shí)間)+ lastTime(睡眠之前記錄的時(shí)間)。其中now
– lastTime表示這次睡眠所持續(xù)的時(shí)間。

休眠時(shí)間的判定。

喚醒后的線(xiàn)程,計(jì)算仍需要休眠的時(shí)間,并無(wú)阻塞的嘗試再獲取狀態(tài),如果失敗后查看其nanosTimeout是否大于0,如果小于0,那么返回完全超時(shí),沒(méi)有獲取到鎖。 如果nanosTimeout小于等于1000L納秒,則進(jìn)入快速的自旋過(guò)程。那么快速自旋會(huì)造成處理器資源緊張嗎?結(jié)果是不會(huì),經(jīng)過(guò)測(cè)算,開(kāi)銷(xiāo)看起來(lái)很小,幾乎微乎其微。Doug Lea應(yīng)該測(cè)算了在線(xiàn)程調(diào)度器上的切換造成的額外開(kāi)銷(xiāo),因此在短時(shí)1000納秒內(nèi)就讓當(dāng)前線(xiàn)程進(jìn)入快速自旋狀態(tài),如果這時(shí)再休眠相反會(huì)讓nanosTimeout的獲取時(shí)間變得更加不精確。

上述過(guò)程可以如下圖所示:

上述這個(gè)圖中可以理解為在類(lèi)似獲取狀態(tài)需要排隊(duì)的基礎(chǔ)上增加了一個(gè)超時(shí)控制的邏輯。每次超時(shí)的時(shí)間就是當(dāng)前超時(shí)剩余的時(shí)間減去睡眠的時(shí)間,而在這個(gè)超時(shí)時(shí)間的基礎(chǔ)上進(jìn)行了判斷,如果大于0那么繼續(xù)睡眠(等待),可以看出這個(gè)超時(shí)版本的獲取狀態(tài)只是一個(gè)近似超時(shí)的獲取狀態(tài),因此任何含有超時(shí)的調(diào)用基本結(jié)果就是近似于給定超時(shí)。

public final void acquireShared(int arg)

調(diào)用該方法能夠以共享模式獲取狀態(tài),共享模式和之前的獨(dú)占模式有所區(qū)別。以文件的查看為例,如果一個(gè)程序在對(duì)其進(jìn)行讀取操作,那么這一時(shí)刻,對(duì)這個(gè)文件的寫(xiě)操作就被阻塞,相反,這一時(shí)刻另一個(gè)程序?qū)ζ溥M(jìn)行同樣的讀操作是可以進(jìn)行的。如果一個(gè)程序在對(duì)其進(jìn)行寫(xiě)操作,那么所有的讀與寫(xiě)操作在這一時(shí)刻就被阻塞,直到這個(gè)程序完成寫(xiě)操作。

以讀寫(xiě)場(chǎng)景為例,描述共享和獨(dú)占的訪(fǎng)問(wèn)模式,如下圖所示:

上圖中,紅色代表被阻塞,綠色代表可以通過(guò)。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)   doAcquireShared(arg); } private void doAcquireShared(int arg) {     final Node node = addWaiter(Node.SHARED);   boolean failed = true;  try {       boolean interrupted = false;        for (;;) {          final Node p = node.predecessor();          if (p == head) {                int r = tryAcquireShared(arg);              if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
            interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上述邏輯主要包括:

嘗試獲取共享狀態(tài);

調(diào)用tryAcquireShared來(lái)獲取共享狀態(tài),該方法是非阻塞的,如果獲取成功則立刻返回,也就表示獲取共享鎖成功。

獲取失敗進(jìn)入sync隊(duì)列;

在獲取共享狀態(tài)失敗后,當(dāng)前時(shí)刻有可能是獨(dú)占鎖被其他線(xiàn)程所把持,那么將當(dāng)前線(xiàn)程構(gòu)造成為節(jié)點(diǎn)(共享模式)加入到sync隊(duì)列中。

循環(huán)內(nèi)判斷退出隊(duì)列條件;

如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且獲取共享狀態(tài)成功,這里和獨(dú)占鎖acquire的退出隊(duì)列條件類(lèi)似。

獲取共享狀態(tài)成功;

在退出隊(duì)列的條件上,和獨(dú)占鎖之間的主要區(qū)別在于獲取共享狀態(tài)成功之后的行為,而如果共享狀態(tài)獲取成功之后會(huì)判斷后繼節(jié)點(diǎn)是否是共享模式,如果是共享模式,那么就直接對(duì)其進(jìn)行喚醒操作,也就是同時(shí)激發(fā)多個(gè)線(xiàn)程并發(fā)的運(yùn)行。

獲取共享狀態(tài)失敗。

通過(guò)使用LockSupport將當(dāng)前線(xiàn)程從線(xiàn)程調(diào)度器上摘下,進(jìn)入休眠狀態(tài)。

對(duì)于上述邏輯中,節(jié)點(diǎn)之間的通知過(guò)程如下圖所示:

上圖中,綠色表示共享節(jié)點(diǎn),它們之間的通知和喚醒操作是在前驅(qū)節(jié)點(diǎn)獲取狀態(tài)時(shí)就進(jìn)行的,紅色表示獨(dú)占節(jié)點(diǎn),它的被喚醒必須取決于前驅(qū)節(jié)點(diǎn)的釋放,也就是release操作,可以看出來(lái)圖中的獨(dú)占節(jié)點(diǎn)如果要運(yùn)行,必須等待前面的共享節(jié)點(diǎn)均釋放了狀態(tài)才可以。而獨(dú)占節(jié)點(diǎn)如果獲取了狀態(tài),那么后續(xù)的獨(dú)占式獲取和共享式獲取均被阻塞。

public final boolean releaseShared(int arg)

調(diào)用該方法釋放共享狀態(tài),每次獲取共享狀態(tài)acquireShared都會(huì)操作狀態(tài),同樣在共享鎖釋放的時(shí)候,也需要將狀態(tài)釋放。比如說(shuō),一個(gè)限定一定數(shù)量訪(fǎng)問(wèn)的同步工具,每次獲取都是共享的,但是如果超過(guò)了一定的數(shù)量,將會(huì)阻塞后續(xù)的獲取操作,只有當(dāng)之前獲取的消費(fèi)者將狀態(tài)釋放才可以使阻塞的獲取操作得以運(yùn)行。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

上述邏輯主要就是調(diào)用同步器的tryReleaseShared方法來(lái)釋放狀態(tài),并同時(shí)在doReleaseShared方法中喚醒其后繼節(jié)點(diǎn)。

一個(gè)例子

在上述對(duì)同步器AbstractQueuedSynchronizer進(jìn)行了實(shí)現(xiàn)層面的分析之后,我們通過(guò)一個(gè)例子來(lái)加深對(duì)同步器的理解:

設(shè)計(jì)一個(gè)同步工具,該工具在同一時(shí)刻,只能有兩個(gè)線(xiàn)程能夠并行訪(fǎng)問(wèn),超過(guò)限制的其他線(xiàn)程進(jìn)入阻塞狀態(tài)。

對(duì)于這個(gè)需求,可以利用同步器完成一個(gè)這樣的設(shè)定,定義一個(gè)初始狀態(tài),為2,一個(gè)線(xiàn)程進(jìn)行獲取那么減1,一個(gè)線(xiàn)程釋放那么加1,狀態(tài)正確的范圍在[0,1,2]三個(gè)之間,當(dāng)在0時(shí),代表再有新的線(xiàn)程對(duì)資源進(jìn)行獲取時(shí)只能進(jìn)入阻塞狀態(tài)(注意在任何時(shí)候進(jìn)行狀態(tài)變更的時(shí)候均需要以CAS作為原子性保障)。

public class TwinsLock implements Lock {
    private static final Sync sync = new Sync();
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7889272986162341211L;
        {
            setState(2);
        }

        protected boolean tryAcquire(int arg) {
            if (arg != 1) {
                return false;
            }
            int currentStats = getState();
            if (currentStats <= 0) {
                return false;
            }
            if (compareAndSetState(currentStats, currentStats - 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int arg) {
            if (arg != 1) {
                return false;
            }
            for(;;) {
                int currentStats = getState();
                if (compareAndSetState(currentStats, currentStats + 1)) {
                setExclusiveOwnerThread(null);
                return true;
                }
            }
        }

        protected boolean isHeldExclusively() {
                return getState() < 2;
        }
        }

    public void lock() {
        sync.acquire(1);
        }

        public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
        }

    public boolean tryLock() {
        return sync.tryAcquire(1);
        }

        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
        }

        public void unlock() {
        sync.release(1);
        }
}

這里我們編寫(xiě)一個(gè)測(cè)試來(lái)驗(yàn)證TwinsLock是否能夠正常工作并達(dá)到預(yù)期。

public class TwinsLockTest {

    @Test
    public void test() {
        final Lock lock = new TwinsLock();

        class Worker extends Thread {
            public void run() {
                while (true) {
                    lock.lock();

                    try {
                        Thread.sleep(1000L);
                System.out.println(Thread.currentThread());
                        Thread.sleep(1000L);
                    } catch (Exception ex) {

                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.start();
        }

        new Thread() {
            public void run() {
                while (true) {

                    try {
                        Thread.sleep(200L);
                        System.out.println();
                    } catch (Exception ex) {

                    }
                }
            }
        }.start();

        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上述測(cè)試用例的邏輯主要包括:

打印線(xiàn)程

Worker在兩次睡眠之間打印自身線(xiàn)程,如果一個(gè)時(shí)刻只能有兩個(gè)線(xiàn)程同時(shí)訪(fǎng)問(wèn),那么打印出來(lái)的內(nèi)容將是成對(duì)出現(xiàn)。

分隔線(xiàn)程

不停的打印換行,能讓W(xué)orker的輸出看起來(lái)更加直觀(guān)。

該測(cè)試的結(jié)果是在一個(gè)時(shí)刻,僅有兩個(gè)線(xiàn)程能夠獲得到鎖,并完成打印,而表象就是打印的內(nèi)容成對(duì)出現(xiàn)。

by 魏鵬 via ifeve

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/64024.html

相關(guān)文章

  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線(xiàn)程安全并且高效的,在并發(fā)編程中經(jīng)??梢?jiàn)它的使用,在開(kāi)始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話(huà),看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽(tīng)名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...

    supernavy 評(píng)論0 收藏0
  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線(xiàn)程安全并且高效的,在并發(fā)編程中經(jīng)??梢?jiàn)它的使用,在開(kāi)始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話(huà),看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽(tīng)名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...

    ddongjian0000 評(píng)論0 收藏0
  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線(xiàn)程安全并且高效的,在并發(fā)編程中經(jīng)??梢?jiàn)它的使用,在開(kāi)始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話(huà),看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽(tīng)名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...

    wangdai 評(píng)論0 收藏0
  • Java 重入鎖 ReentrantLock 原理分析

    摘要:的主要功能和關(guān)鍵字一致,均是用于多線(xiàn)程的同步。而僅支持通過(guò)查詢(xún)當(dāng)前線(xiàn)程是否持有鎖。由于和使用的是同一把可重入鎖,所以線(xiàn)程可以進(jìn)入方法,并再次獲得鎖,而不會(huì)被阻塞住。公平與非公平公平與非公平指的是線(xiàn)程獲取鎖的方式。 1.簡(jiǎn)介 可重入鎖ReentrantLock自 JDK 1.5 被引入,功能上與synchronized關(guān)鍵字類(lèi)似。所謂的可重入是指,線(xiàn)程可對(duì)同一把鎖進(jìn)行重復(fù)加鎖,而不會(huì)被阻...

    lx1036 評(píng)論0 收藏0
  • AbstractQueuedSynchronizer 原理分析 - 獨(dú)占/共享模式

    摘要:簡(jiǎn)介抽象隊(duì)列同步器,以下簡(jiǎn)稱(chēng)出現(xiàn)在中,由大師所創(chuàng)作。獲取成功則返回,獲取失敗,線(xiàn)程進(jìn)入同步隊(duì)列等待。響應(yīng)中斷版的超時(shí)響應(yīng)中斷版的共享式獲取同步狀態(tài),同一時(shí)刻可能會(huì)有多個(gè)線(xiàn)程獲得同步狀態(tài)。 1.簡(jiǎn)介 AbstractQueuedSynchronizer (抽象隊(duì)列同步器,以下簡(jiǎn)稱(chēng) AQS)出現(xiàn)在 JDK 1.5 中,由大師 Doug Lea 所創(chuàng)作。AQS 是很多同步器的基礎(chǔ)框架,比如 ...

    pf_miles 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<