摘要:原因是它支持多線程進行擴容操作,而并沒有加鎖。多線程的情況下如果一個或多個線程正在對進行擴容操作,當前線程也要進入擴容的操作中。
KillCode系列 -- Java篇
原文發(fā)布在我的個人博客中killCode
因為JDK1.8 與 1.7 里對ConcurrentHashMap 有很多不同的更改以提高性能。所以特別找出類似的方面,進行分析。
1. 內部參數//初始容積為 16 private static final int DEFAULT_CAPACITY = 16; //加載因子 0.75 private static final float LOAD_FACTOR = 0.75f; /** * 盛裝Node元素的數組 它的大小是2的整數次冪 * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node[] table; /* * hash表初始化或擴容時的一個控制位標識量。 * 負數代表正在進行初始化或擴容操作 * -1代表正在初始化 * -N 表示有N-1個線程正在進行擴容操作 * 正數或0代表hash表還沒有被初始化,這個數值表示初始化或下一次進行擴容的大小 * * **既代表 HashMap 的 threshold** * 又代表 **進行擴容時的進程數** */ private transient volatile int sizeCtl; // 以下兩個是用來控制擴容的時候 單線程進入的變量 // resize校驗碼 private static int RESIZE_STAMP_BITS = 16; // resize校驗碼的位移量。 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /* * Encodings for Node hash fields. See above for explanation. */ static final int MOVED = -1; // hash值是-1,表示這是一個forwardNode節(jié)點 static final int TREEBIN = -2; // hash值是-2 表示這時一個TreeBin節(jié)點 static final int RESERVED = -3; // hash for transient reservations //在 spread() 方法中 用來對 hashcode 進行 高位hash 減少可能發(fā)生的碰撞。 static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
上面的 sizectl 很重要。是解決 concurrenthashmap 擴容的基礎
2. 內部類 2.1. Node與 HashMap 最大的區(qū)別是 加入了對val 與 next 用了volatile關鍵字修飾
并且 setValue() 方法 直接拋出異常,可以看出,val 是不能直接改變的。
是通過 Unsafe 類的 方法進行全部替換
static class Node2.2 TreeNodeimplements Map.Entry { final int hash; final K key; //相比于 HashMap ,加入了 volatile 關鍵字 volatile V val; volatile Node next; Node(int hash, K key, V val, Node next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); }
與 HashMap 不同的是
這次 TreeNode 不再是繼承自 LinkedHashMap.Entry 而是繼承自本類中的 Node.
并不直接用于紅黑樹的結點,而是將 結點包裝成 TreeNode 后,用下面的 TreeBin 進行二次包裝。
優(yōu)點是可以使用 Node 類的 next 指針,方便TreeBin 后續(xù) 從 鏈表 到 紅黑樹 的轉換。
構造函數可以看出,原先對TreeNode 的初始化只是設置了其的后續(xù)結點。組成了鏈表。
static final class TreeNode2.3. TreeBinextends Node { TreeNode parent; // red-black tree links TreeNode left; TreeNode right; TreeNode prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node next, TreeNode parent) { super(hash, key, val, next); this.parent = parent; }
特點: 1. 不持有key與val ,指向TreeNode 的 root 與 list。
2. 加入讀寫鎖。方便并發(fā)的訪問。
static final class TreeBinextends Node { TreeNode root; volatile TreeNode first; volatile Thread waiter; //通過鎖的狀態(tài) , 判斷鎖的類型。 volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock
構造方法如下
root 代表 TreeNode 的根結點
使用first ,是用于第一次初始化時,因為root的特殊性,所以不便于 this.root = b 因此通過 first代替第一次的初始化過程。
然后在 過程中 用r 代表root ,直到結束 紅黑樹的初始化后,再 root =r 保證root的安全性。
TreeBin(TreeNode2.4. ForwardingNodeb) { super(TREEBIN, null, null, null); this.first = b; TreeNode r = null; for (TreeNode x = b, next; x != null; x = next) { next = (TreeNode )x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; } else { K k = x.key; int h = x.hash; Class> kc = null; for (TreeNode p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break; } } } } this.root = r; assert checkInvariants(root); }
作用是在 transfer() 過程中,插入到 TreeBin 之間,用作鏈接作用。
static final class ForwardingNode3. Unsafe 類 與 常用的操作 3.1. Unsafe 與 靜態(tài)代碼塊extends Node { final Node [] nextTable; ForwardingNode(Node [] tab) { super(MOVED, null, null, null); this.nextTable = tab; }
Unsafe提供了硬件級別的原子操作。內部的方法均為 native方法 ,可以訪問系統(tǒng)底層。
這里用了 CAS 算法(compare and swap) 大大的避免了使用時對性能的消耗,以及保證了使用時的安全性。
**注:** CAS 算法的核心是 將需要改變的參數,與內存中已經存在的變量的值進行對比,一致就改變,不一致就放棄這次操作。與之相類似的優(yōu)化操作還有 LL/SC(Load-Linked/Store-Conditional : 加載鏈接/條件存儲) 、 Test-and-Set(測試并設置)
這里額外介紹一下 Unsafe 類的 compareAndSwapInt 方法。
/** * 比較obj的offset處內存位置中的值和期望的值,如果相同則更新。此更新是不可中斷的。 * * @param obj 需要更新的對象 * @param offset obj中整型field的偏移量 * @param expect 希望field中存在的值 * @param update 如果期望值expect與field的當前值相同,設置filed的值為這個新值 * @return 如果field的值被更改返回true */ public native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);
下面是 ConcurrentHashMap 中有關的應用
// Unsafe mechanics private static final sun.misc.Unsafe U; //對應于 類中的 sizectl private static final long SIZECTL; //在 transfer() 方法的使用時,計算索引 private static final long TRANSFERINDEX; // 用于對 ConcurrentHashMap 的 size 統(tǒng)計。 // 下文 第8點關于 size 會說明。 private static final long BASECOUNT; // 輔助類 countercell 類中的屬性,用于分布式計算 // 是實現 java8 中 londAddr 的基礎 private static final long CELLSBUSY; private static final long CELLVALUE; // 用來確定在數組中的位置 // 數組中的偏移地址 private static final long ABASE; // 數組中的增量地址 private static final int ASHIFT; static { try { //通過反射調用 類中的值,從而對 這些變量賦值 U = sun.misc.Unsafe.getUnsafe(); Class> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } }3.2 常用方法
在操作過程中,經常會看到以下幾個,或者相類似的方法。
其核心是
//獲得 i 位置上的 Node 節(jié)點 static final4. 初始化函數 initTableNode tabAt(Node [] tab, int i) { return (Node )U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //利用CAS算法設置i位置上的Node節(jié)點。 static final boolean casTabAt(Node [] tab, int i, Node c, Node v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //利用volatile方法設置節(jié)點位置的值 static final void setTabAt(Node [] tab, int i, Node v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
調用ConcurrentHashMap的構造方法僅僅是設置了一些參數而已,而整個table的初始化是在向ConcurrentHashMap中插入元素的時候發(fā)生的。
當向 map 插入數據的時候 table == null , 則會調用 initTable()方法 。
用 put 方法 簡單展示一下。
final V putVal(K key, V value, boolean onlyIfAbsent) { ... ... for (Node[] tab = table;;) { Node f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); ... ... }
initTable() 方法展示如下
其中有 sizectl 變量,這里回顧一下
hash表初始化或擴容時的一個控制位標識量。 負數代表正在進行初始化或擴容操作 -1代表正在初始化 -N 表示有N-1個線程正在進行擴容操作 正數或0代表hash表還沒有被初始化,這個數值表示初始化或下一次進行擴容的大小
/** * Initializes table, using the size recorded in sizeCtl. */ private final Node5. transfer() 擴容操作[] initTable() { Node [] tab; int sc; while ((tab = table) == null || tab.length == 0) { //sizeCtl <0 表示有其他線程正在進行初始化操作,把線程掛起。對于table的初始化工作,只能有一個線程在進行。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //利用CAS方法把sizectl的值置為-1 表示本線程正在進行初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n]; table = tab = nt; //相當于0.75*n 設置一個擴容的閾值 // sc = n - n/4 sc = n - (n >>> 2); } } finally { // 更新 sizectl sizeCtl = sc; } break; } } return tab; }
當ConcurrentHashMap容量不足的時候,需要對table進行擴容。這個方法的基本思想跟HashMap是很像的,但是由于它是支持并發(fā)擴容的,所以要復雜的多。原因是它支持多線程進行擴容操作,而并沒有加鎖。我想這樣做的目的不僅僅是為了滿足concurrent的要求,而是希望利用并發(fā)處理去減少擴容帶來的時間影響。因為在擴容的時候,總是會涉及到從一個“數組”到另一個“數組”拷貝的操作,如果這個操作能夠并發(fā)進行,那真真是極好的了。
整個擴容操作分為兩個部分:
1. 第一部分是構建一個nextTable,它的容量是原來的兩倍,這個操作是單線程完成的。這個單線程的保證是通過RESIZE_STAMP_SHIFT這個常量經過一次運算來保證的,這個地方在后面會有提到; 2. 第二個部分就是將原來table中的元素復制到nextTable中,這里允許多線程進行操作。
先來看一下單線程是如何完成的:
它的大體思想就是遍歷、復制的過程。首先根據運算得到需要遍歷的次數i,然后利用tabAt方法獲得i位置的元素:
1. 如果這個位置為空,就在原table中的i位置放入forwardNode節(jié)點,這個也是觸發(fā)并發(fā)擴容的關鍵點; 2. 如果這個位置是Node節(jié)點(fh>=0),就構造兩個鏈表,一個代表高位為 0 , 一個代表高位為 1 。將原來的結點 分別放在nextTable的i和i+n的位置上,并且除了lastRun的位置相對位于鏈表的底部外,其余元素均為 **反序** 。 3. 如果這個位置是TreeBin節(jié)點(fh<0),也做一個處理,并且判斷是否需要untreefi,把處理的結果分別放在nextTable的i和i+n的位置上
遍歷過所有的節(jié)點以后就完成了復制工作,這時讓nextTable作為新的table,并且更新sizeCtl為新容量的0.75倍 ,完成擴容。
再看一下多線程是如何完成的:
//如果遍歷到ForwardingNode節(jié)點 說明這個點已經被處理過了,直接跳過 這里是控制并發(fā)擴容的核心 else if ((fh = f.hash) == MOVED) advance = true; // already processed
這是一個判斷,如果遍歷到的節(jié)點是forward節(jié)點,就向后繼續(xù)遍歷,再加上給節(jié)點上鎖的機制,就完成了多線程的控制。多線程遍歷節(jié)點,處理了一個節(jié)點,就把對應點的值set為forward,另一個線程看到forward,就向后遍歷。這樣交叉就完成了復制工作。而且還很好的解決了線程安全的問題。
如圖:
下面是源碼:
/** * 一個過渡的table表 只有在擴容的時候才會使用 */ private transient volatile Node6. put 方法[] nextTable; /** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */ private final void transfer(Node [] tab, Node [] nextTab) { int n = tab.length, stride; // 通過計算 NCPU CPU的核心數與 表的大小的比值,將表進行范圍的細分,以方便 并發(fā)。 // 感覺上 有點像 segment 分段鎖的意思。 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { //構造一個nextTable對象 它的容量是原來的兩倍。 @SuppressWarnings("unchecked") Node [] nt = (Node []) new Node, ?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME //原來的 容量限制為 1<<30 //HashMap 在擴容時,會用 resize() 方法,擴大 threshold 的值 //當大于 MAXIMUM_CAPACITY 時,會將 threshold 設置為 Integer.MAX_VALUE sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode fwd = new ForwardingNode (nextTab);//構造一個連節(jié)點指針 用于標志位 boolean advance = true;//并發(fā)擴容的關鍵屬性 如果等于true 說明這個節(jié)點已經處理過 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0; ; ) { Node f; int fh; //這個while循環(huán)體的作用就是在控制i遞減 通過i可以依次遍歷原h(huán)ash表中的節(jié)點 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //如果所有的節(jié)點都已經完成復制工作 就把nextTable賦值給table 清空臨時對象nextTable nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1);//擴容閾值設置為原來容量的1.5倍 依然相當于現在容量的0.75倍 return; } //利用CAS方法更新這個擴容閾值,在這里面sizectl值減一,說明新加入一個線程參與到擴容操作 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //如果遍歷到的節(jié)點為空 則放入ForwardingNode指針 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //如果遍歷到ForwardingNode節(jié)點 說明這個點已經被處理過了,直接跳過 這里是控制并發(fā)擴容的核心 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //節(jié)點上鎖 synchronized (f) { if (tabAt(tab, i) == f) { Node ln, hn; //如果fh>=0 證明這是一個Node節(jié)點 if (fh >= 0) { // runBit 代表正在 運行的 Node 節(jié)點的 分類 // 因此鏈表根據高位為0或者1分為兩個子鏈表,高位為0的節(jié)點桶位置沒有發(fā)生變化,高位為1的節(jié)點桶位置增加了n, // 所以有setTabAt(nextTab, i, ln);和 setTabAt(nextTab, i + n, hn); // n = 2的冪 。 二進制 0001000 // fh & n = 1. 1000 // 2. 0000 所以劃分出兩個鏈表。 int runBit = fh & n; // lastRun 是正在運行的節(jié)點 Node lastRun = f; //以下的部分在完成的工作是構造兩個鏈表 一個是高位為 0 的鏈表 另一個是高位為 1 的鏈表 // 找出最后一個 與后面的結點不同的 結點 for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 將最后一個 結點保存起來 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; //這個鏈表是從低層向上構建 // ln 或 hn = lastRun, 構建一個 node 結點 // 其下一個結點為 lastRun 。 if ((ph & n) == 0) // 構建低位鏈表 ln = new Node (ph, pk, pv, ln); else // 構建高位鏈表 hn = new Node (ph, pk, pv, hn); } //在nextTable的i位置上插入一個鏈表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另一個鏈表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode節(jié)點 表示已經處理過該節(jié)點 setTabAt(tab, i, fwd); //設置advance為true 返回到上面的while循環(huán)中 就可以執(zhí)行 --i 操作 advance = true; } //對TreeBin對象進行處理 與上面的過程類似 else if (f instanceof TreeBin) { TreeBin t = (TreeBin ) f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; //構造高位和低位兩個鏈表 for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //如果擴容后已經不再需要tree的結構 反向轉換為鏈表結構 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin (lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin (hi) : t; //在nextTable的i位置上插入一個鏈表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另一個鏈表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode節(jié)點 表示已經處理過該節(jié)點 setTabAt(tab, i, fwd); //設置advance為true 返回到上面的while循環(huán)中 就可以執(zhí)行 --i 操作 advance = true; } } } } } }
put方法依然沿用HashMap的put方法的思想,根據hash值計算這個新插入的點在table中的位置i。
注:1. hash = spread(key.hashCode()) 2. spread(int h) --> return (h ^ (h >>> 16)) & HASH_BITS; --> 通過hashCode()的高16位異或低16位優(yōu)化高位運算的算法 3. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin }
如果i位置是空的,直接放進去,否則進行判斷,
如果i位置是樹節(jié)點,按照樹的方式插入新的節(jié)點,否則把i插入到鏈表的末尾
不同點:ConcurrentHashMap不允許key或value為null值。
多線程的情況下:
如果一個或多個線程正在對ConcurrentHashMap進行擴容操作,當前線程也要進入擴容的操作中。這個擴容的操作之所以能被檢測到,是因為transfer方法中在空結點上插入forward節(jié)點,如果檢測到需要插入的位置被forward節(jié)點占有,就幫助進行擴容; --> helpTransfer() 方法。
如果檢測到要插入的節(jié)點是非空且不是forward節(jié)點,就對這個節(jié)點加鎖,這樣就保證了線程安全。盡管這個有一些影響效率,但是還是會比hashTable的synchronized要好得多。
首先判斷這個節(jié)點的類型。如果是鏈表節(jié)點(fh>0),則得到的結點就是hash值相同的節(jié)點組成的鏈表的頭節(jié)點。需要依次向后遍歷確定這個新加入的值所在位置。如果遇到hash值與key值都與新加入節(jié)點是一致的情況,則只需要更新value值即可。否則依次向后遍歷,直到鏈表尾插入這個結點。
如果加入這個節(jié)點以后鏈表長度大于8,就把這個鏈表轉換成紅黑樹。
如果這個節(jié)點的類型已經是樹節(jié)點的話,直接調用樹節(jié)點的插入方法進行插入新的值。
源碼如下:
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { //不允許 key或value為null if (key == null || value == null) throw new NullPointerException(); //計算hash值 int hash = spread(key.hashCode()); //計算該鏈表 節(jié)點的數量 int binCount = 0; for (Node6.1 helpTransfer() 方法[] tab = table;;) { Node f; int n, i, fh; // 第一次 put 操作的時候初始化,如果table為空的話,初始化table if (tab == null || (n = tab.length) == 0) tab = initTable(); //根據hash值計算出在table里面的位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 根據對應的key hash 到具體的索引,如果該索引對應的 Node 為 null,則采用 CAS 操作更新整個 table // 如果這個位置沒有值 ,直接放進去,不需要加鎖 if (casTabAt(tab, i, null, new Node (hash, key, value, null))) break; // no lock when adding to empty bin } //當遇到表連接點時,需要進行整合表的操作 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 結點上鎖,只是對鏈表頭結點作鎖操作 synchronized (f) { if (tabAt(tab, i) == f) { //fh > 0 說明這個節(jié)點是一個鏈表的節(jié)點 不是樹的節(jié)點 if (fh >= 0) { binCount = 1; //在這里遍歷鏈表所有的結點 //并且計算鏈表里結點的數量 for (Node e = f;; ++binCount) { K ek; //如果hash值和key值相同 則修改對應結點的value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; //如果遍歷到了最后一個結點,那么就證明新的節(jié)點需要插入 就把它插入在鏈表尾部 if ((e = e.next) == null) { // 插入到鏈表尾 pred.next = new Node (hash, key, value, null); break; } } } //如果這個節(jié)點是樹節(jié)點,就按照樹的方式插入值 else if (f instanceof TreeBin) { // 如果是紅黑樹結點,按照紅黑樹的插入 Node p; // 如果為樹節(jié)點, binCount一直為2,不會引發(fā)擴容。 binCount = 2; if ((p = ((TreeBin )f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 如果這個鏈表結點達到了臨界值8,那么把這個鏈表轉換成紅黑樹 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //將當前ConcurrentHashMap的元素數量+1,table的擴容是在這里發(fā)生的 addCount(1L, binCount); return null; }
出現于 put 方法 如下地點
//當遇到表連接點時,需要進行整合表的操作 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
helpTransfer() 方法的源碼如下
final Node6.2 treeifyBin() 方法[] helpTransfer(Node [] tab, Node f) { Node [] nextTab; int sc; // 當前 table 不為 null , 且 f 為 forwardingNode 結點 , 且存在下一張表 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode )f).nextTable) != null) { int rs = resizeStamp(tab.length);//計算一個擴容校驗碼 // 當 sizeCtl < 0 時,表示有線程在 transfer(). while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //正常情況下 sc >>> RESIZE_STAMP_SHIFT == resizeStamp(tab.length); if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //將 擴容的線程先行減一,表示,這是來輔助 transfer,而非進行 transfer的線程。 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
涉及變量 MIN_TREEIFY_CAPACITY = 64;
如果數組長度n小于閾值MIN_TREEIFY_CAPACITY,默認是64,則會調用tryPresize方法把數組長度擴大到原來的兩倍,并觸發(fā)transfer方法,重新調整節(jié)點的位置。
出現于 put 方法 如下地點
if (binCount != 0) { // TREEIFY_THRESHOLD 默認為 8. if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
其中源碼如下:
private final void treeifyBin(Node6.3 tableSizeFor 方法[] tab, int index) { Node b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 將原來的數組擴大為原來的兩倍 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode hd = null, tl = null; for (Node e = b; e != null; e = e.next) { TreeNode p = new TreeNode (e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin (hd)); } } } } }
這里講一個 JDK8 中設計的非常巧妙的算法。看了好久才看懂。
出自 tryPresize 方法中的以下位置
//數組的最大容積為 1<<30 。如果數組大小超過 1<<29 ,則將最大大小設置為 MAXIMUM_CAPACITY //否則,設置為原來的兩倍。 private final void tryPresize(int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
下面讓我們來分析一下,tableSizeFor()
這個算法的目的,是得出相比較于給定參數,返回一個剛好比參數大的 2次冪 整數。
static final int tableSizeFor(int cap) { int n = cap - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }
先來分析有關n位操作部分:先來假設n的二進制為01xxx...xxx。接著
對n右移1位:001xx...xxx,再位或:011xx...xxx
對n右移2為:00011...xxx,再位或:01111...xxx
此時前面已經有四個1了,再右移4位且位或可得8個1
同理,有8個1,右移8位肯定會讓后八位也為1。
綜上可得,該算法讓最高位的1后面的位全變?yōu)?。
最后再讓結果n+1,即得到了2的整數次冪的值了。
現在回來看看第一條語句:
int n = cap - 1;
讓cap-1再賦值給n的目的是另找到的目標值大于或等于原值。例如二進制1000,十進制數值為8。如果不對它減1而直接操作,將得到答案10000,即16。顯然不是結果。減1后二進制為111,再進行操作則會得到原來的數值1000,即8。
引用自(http://www.cnblogs.com/loadin...
7. get 方法通過 key值 搜索 value 值。
并且要 通過分辨 結點的種類,進行不同形式的尋找。
public V get(Object key) { Node8. Size相關[] tab; Node e, p; int n, eh; K ek; //計算hash值 int h = spread(key.hashCode()); //根據hash值確定節(jié)點位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //如果搜索到的節(jié)點key與傳入的key相同且不為null,直接返回這個節(jié)點 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //如果eh<0 說明這個節(jié)點在樹上 直接尋找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //否則遍歷鏈表 找到對應的值并返回 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
《并發(fā)編程實戰(zhàn)》中有提到,size返回的結果在計算時可能已經過期了,它實際上只是一個估計值,因此允許size返回一個近似值,而不是一個精確值。
8.1 CounterCell 類從注釋中可以看出,這是從 LongAdder 類中的思想,拷貝過來的一個類。
LongAdder 類 是 JDK 1.8 新引進的類,其思想:
多個線程持有自己的加數(cell),線程個數增加時,會自動提供新的加數。 當所有工作做完后,再提供新的加數。
有時間寫一篇相關的源碼分析~ 逃~
不過,這里一樣不能精確統(tǒng)計,這里的 CounterCell 等同于 LongAdder.Cell sumCount() 等同于 LongAdder.sum()方法。
執(zhí)行邏輯是一樣的。
就 LongAdder 類中的 sum 方法所說, 當有線程在運行時,一樣只是估計值,只有當所有線程執(zhí)行完畢,才是實際值。
而統(tǒng)計 Size ,不能夠像垃圾清除一樣,有 Safe point 或 Safe region ,所以,這個假設不成立。。。
其相關的源碼如下。
/** * A padded cell for distributing counts. Adapted from LongAdder * and Striped64. See their internal docs for explanation. */ @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } //執(zhí)行邏輯 final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }8.2 mappingCount 方法
就官方文檔中所說, mappingCount 方法,應該取代 size 方法,
但這個方法得出的值一樣在線程運行的時候,只是一個估計的值。
從源碼中就可以看出,使用的是上文分析的 sumCount() 方法。
public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values }8.3 addCount 方法
出自于 put 方法的如下位置
//將當前ConcurrentHashMap的元素數量+1 addCount(1L, binCount); return null; }
統(tǒng)計上:
這里用到 CounterCell類,并且統(tǒng)計的值的計算一樣是采用的 sumCount() 方法。
所以缺點如上,不再闡述。
擴容上:
邏輯與 helpTransfer() 類似,都是判斷是否有多個線程在執(zhí)行擴容,然后判斷是否需要輔助 transfer();
源碼如下
private final void addCount(long x, int check) { //用到了 CounterCell 類 CounterCell[] as; long b, s; //利用CAS方法更新baseCount的值 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //如果check值大于等于0 則需要檢驗是否需要進行擴容操作 //下面的邏輯與 helpTransfer() 類似,可以與 helpTransfer() 一起參考。 if (check >= 0) { Node[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); //如果已經有其他線程在執(zhí)行擴容操作 if (sc < 0) { //校驗失效,直接退出。 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //當前線程是唯一的或是第一個發(fā)起擴容的線程 此時nextTable=null else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/70282.html
摘要:如果沖突了,同步頭節(jié)點,進行鏈表操作,如果鏈表長度達到,分成紅黑樹。每次加入一個線程都會將的低位加一。擴容最大的幫助線程是,這是低位的最大值限制的。線程處理完之后,如果沒有可選區(qū)間,且任務沒有完成,就會將整個表檢查一遍,防止遺漏。 前言 每一次總結都意味著重新開始,同時也是為了更好的開始。ConcurrentHashMap 一直是我心中的痛。雖然不敢說完全讀懂了,但也看了幾個重要的方法...
摘要:如果使用類,則代表該類所在的包為相對路徑的起點。雖然并沒有針對其性能作出確切的比較,不過就現有的網絡統(tǒng)計來說,類在使用中大都會比類快并且在最新的中,也是用代替了的操作,相關的文章請參考。綜上,推薦使用類替代類。 前言 最近做項目的時候,用java獲取文件。雖然用框架很容易,但是其內部的原理讓我很疑惑。在自己寫相似的代碼的時候,往往會出現各種各樣的錯誤。所以這里,對相關的類以及方法進行一...
摘要:簡介是的線程安全版本,內部也是使用數組鏈表紅黑樹的結構來存儲元素。相比于同樣線程安全的來說,效率等各方面都有極大地提高。中的關鍵字,內部實現為監(jiān)視器鎖,主要是通過對象監(jiān)視器在對象頭中的字段來表明的。 簡介 ConcurrentHashMap是HashMap的線程安全版本,內部也是使用(數組 + 鏈表 + 紅黑樹)的結構來存儲元素。 相比于同樣線程安全的HashTable來說,效率等各方...
ConcurrentHashMap源碼分析_JDK1.8版本 聲明 文章均為本人技術筆記,轉載請注明出處[1] https://segmentfault.com/u/yzwall[2] blog.csdn.net/j_dark/ JDK1.6版本 ConcurrentHashMap結構 showImg(https://segmentfault.com/img/remote/146000000900...
摘要:那么,如果之后不是簡單的操作,而是還有其它業(yè)務操作,之后才是,比如下面這樣,這該怎么辦呢其它業(yè)務操作這時候就沒辦法使用提供的方法了,只能業(yè)務自己來保證線程安全了,比如下面這樣其它業(yè)務操作這樣雖然不太友好,但是最起碼能保證業(yè)務邏輯是正確的。 刪除元素 刪除元素跟添加元素一樣,都是先找到元素所在的桶,然后采用分段鎖的思想鎖住整個桶,再進行操作。 public V remove(Objec...
閱讀 2906·2021-09-22 15:54
閱讀 1899·2019-08-30 15:53
閱讀 2252·2019-08-29 16:33
閱讀 1429·2019-08-29 12:29
閱讀 1401·2019-08-26 11:41
閱讀 2380·2019-08-26 11:34
閱讀 2966·2019-08-23 16:12
閱讀 1431·2019-08-23 15:56