前面已經(jīng)說明了HashMap以及紅黑樹的一些基本知識(shí),對(duì)JDK8的HashMap也有了一定的了解,本篇就開始看看并發(fā)包下的ConcurrentHashMap,說實(shí)話,還是比較復(fù)雜的,筆者在這里也不會(huì)過多深入,源碼層次上了解一些主要流程即可,清楚多線程環(huán)境下整個(gè)Map的運(yùn)作過程就算是很大進(jìn)步了,更細(xì)的底層部分需要時(shí)間和精力來研究,暫不深入
前言jdk版本:1.8
JDK7中,ConcurrentHashMap把內(nèi)部細(xì)分成了若干個(gè)小的HashMap,稱之為段(Segment),默認(rèn)被分為16個(gè)段。多線程寫操作對(duì)每個(gè)段進(jìn)行加鎖,段與段之間互不影響。而JDK8則拋棄了這種結(jié)構(gòu),類似HashMap,多線程下為了保證線程安全,通過CAS和synchronized進(jìn)行并發(fā)控制,降低鎖顆粒度,性能上也就提高許多
同時(shí)由于降低鎖粒度,同時(shí)需要兼顧讀操作的正確性,增加了許多內(nèi)部類來幫助完成并發(fā)控制,保證讀操作的正確執(zhí)行,同時(shí)支持了并發(fā)擴(kuò)容操作,算是相當(dāng)復(fù)雜了,由于過于復(fù)雜,對(duì)ConcurrentHashMap的說明將分為兩章說明,本章就對(duì)ConcurrentHashMap的常量,變量,內(nèi)部類和構(gòu)造方法進(jìn)行說明,下一章將重點(diǎn)分析其中的重要方法
這里先提前說明下,有個(gè)整體印象:
ConcurrentHashMap結(jié)構(gòu)上類似HashMap,即數(shù)組+鏈表+紅黑樹
鎖顆粒度降低,復(fù)雜度提升
多線程并發(fā)擴(kuò)容
多線程下保證讀操作正確性
計(jì)數(shù)方式處理:分段處理
函數(shù)式編程(不是本文重點(diǎn),自行查閱)
類定義public class ConcurrentHashMapextends AbstractMap implements ConcurrentMap , Serializable
繼承AbstractMap,實(shí)現(xiàn)了ConcurrentMap接口,ConcurrentMap也可以看看源碼,加入了并發(fā)操作方法,是一個(gè)實(shí)現(xiàn)了并發(fā)訪問的集合接口
常量有些常量和變量可能不是很好理解,在后邊到方法時(shí)會(huì)盡量詳細(xì)說明
/** * 最大容量 */ private static final int MAXIMUM_CAPACITY = 1 << 30; /** * 默認(rèn)初始化容量,同HashMap,必須為2的倍數(shù) */ private static final int DEFAULT_CAPACITY = 16; /** * 可能達(dá)到的最大的數(shù)組大小值(非2的次冪),2的31次方-8 */ static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * 默認(rèn)并發(fā)級(jí)別,新版本無用,為了兼容舊版本,使用的地方在序列化方法writeObject中 */ private static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * 負(fù)載因子,只是為了兼容性 * 構(gòu)造方法里指定的負(fù)載因子只會(huì)影響初始化的table容量 * 一般也不使用浮點(diǎn)數(shù)計(jì)算 * ConcurrentHashMap不會(huì)使用這個(gè)常量,而使用類似 n -(n >>> 2) 的方式來進(jìn)行調(diào)整大小 */ private static final float LOAD_FACTOR = 0.75f; /** * 樹化閾值 * 同HashMap */ static final int TREEIFY_THRESHOLD = 8; /** * 調(diào)整大小時(shí)樹轉(zhuǎn)化為鏈表的閾值 */ static final int UNTREEIFY_THRESHOLD = 6; /** * 可以轉(zhuǎn)化為紅黑樹的最小數(shù)組容量,即調(diào)整為紅黑樹時(shí)數(shù)組長(zhǎng)度最小值必須為MIN_TREEIFY_CAPACITY * 如果bin包含太多節(jié)點(diǎn),則會(huì)調(diào)整表的大小 * 該值應(yīng)至少為4 * TREEIFY_THRESHOLD,避免擴(kuò)容和樹化閾值之間的沖突。 */ static final int MIN_TREEIFY_CAPACITY = 64; /** * * 擴(kuò)容的每個(gè)線程每次最少要遷移16個(gè)hash桶 * 每個(gè)線程都可參與遷移任務(wù),每個(gè)線程至少要連續(xù)遷移MIN_TRANSFER_STRIDE個(gè)hash桶 * 幫助擴(kuò)容提高了效率,當(dāng)然復(fù)雜性也提高了很多,要處理的事情更多 */ private static final int MIN_TRANSFER_STRIDE = 16; /** * 與移位量和最大線程數(shù)相關(guān) * 先了解就好,后邊涉及到方法會(huì)進(jìn)行說明 */ private static int RESIZE_STAMP_BITS = 16; /** * 幫助擴(kuò)容的最大線程數(shù) */ private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; /** * 移位量 */ private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /** * Node hash值編碼,規(guī)定了各自的含義 */ // forwarding nodes節(jié)點(diǎn)hash值 // 臨時(shí)節(jié)點(diǎn),擴(kuò)容時(shí)出現(xiàn),不存儲(chǔ)實(shí)際數(shù)據(jù) // 如果舊數(shù)組的一個(gè)hash桶中全部的節(jié)點(diǎn)都遷移到新數(shù)組中,舊數(shù)組就在這個(gè)hash桶中放置一個(gè)ForwardingNode // 讀操作遇見該節(jié)點(diǎn)時(shí),轉(zhuǎn)到新的table數(shù)組上執(zhí)行,寫操作遇見時(shí),則幫助擴(kuò)容 static final int MOVED = -1; // hash for forwarding nodes // TREEBIN節(jié)點(diǎn) // TreeBin是ConcurrentHashMap中用于代理操作TreeNode的特殊節(jié)點(diǎn) // 保存實(shí)際的紅黑樹根節(jié)點(diǎn),在紅黑樹插入節(jié)點(diǎn)時(shí)會(huì)對(duì)讀操作造成影響,該對(duì)象維護(hù)了一個(gè)讀寫鎖來保證多線程的正確性 static final int TREEBIN = -2; // hash for roots of trees // ReservationNode節(jié)點(diǎn)hash值 // 保留節(jié)點(diǎn),JDK8的新特性會(huì)用到,這里不過多說明 static final int RESERVED = -3; // hash for transient reservations // 負(fù)數(shù)轉(zhuǎn)正數(shù),定位hash桶時(shí)用到了,負(fù)數(shù)Hash值有特殊含義,具體看后邊 static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash // CPU數(shù)量,限制邊界,計(jì)算一個(gè)線程需要做多少遷移量 static final int NCPU = Runtime.getRuntime().availableProcessors(); // 序列化兼容性 private static final ObjectStreamField[] serialPersistentFields = { new ObjectStreamField("segments", Segment[].class), new ObjectStreamField("segmentMask", Integer.TYPE), new ObjectStreamField("segmentShift", Integer.TYPE) };變量
/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. * * volatile保證可見性 * Node類和HashMap中的類似,val和next屬性通過volatile保證可見性 */ transient volatile Node[] table; /** * The next table to use; non-null only while resizing. * * 擴(kuò)容時(shí)使用的數(shù)組,只在擴(kuò)容時(shí)非空,擴(kuò)容時(shí)會(huì)創(chuàng)建 */ private transient volatile Node [] nextTable; /** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. * * sizeCtl = -1,表示有線程在進(jìn)行初始化操作 * sizeCtl < 0且不為-1表示有多個(gè)線程正在進(jìn)行擴(kuò)容操作,jdk源碼解釋部分感覺有點(diǎn)問題 * 每次第一個(gè)線程參與擴(kuò)容時(shí),會(huì)將sizeCtl設(shè)置為一個(gè)與當(dāng)前table長(zhǎng)度相關(guān)的數(shù)值,避免出現(xiàn)問題,講解方法時(shí)進(jìn)行說明 * sizeCtl > 0,表示第一次初始化操作中使用的容量,或者初始化/擴(kuò)容完成后的閾值 * sizeCtl = 0,默認(rèn)值,此時(shí)在真正的初始化操作中使用默認(rèn)容量 */ private transient volatile int sizeCtl; /** * 多線程幫助擴(kuò)容相關(guān) * 下一個(gè)transfer任務(wù)的起始下標(biāo)index + 1 的值 * transfer時(shí)下標(biāo)index從length - 1到0遞減 * 擴(kuò)容index從后往前和迭代從前往后為了避免沖突 */ private transient volatile int transferIndex; /** * Base counter value, used mainly when there is no contention, * but also as a fallback during table initialization * races. Updated via CAS. * * 計(jì)數(shù)器基礎(chǔ)值,記錄元素個(gè)數(shù),通過CAS操作進(jìn)行更新 */ private transient volatile long baseCount; /** * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. * * CAS自旋鎖標(biāo)志位,counterCells擴(kuò)容或初始化時(shí)使用 */ private transient volatile int cellsBusy; /** * Table of counter cells. When non-null, size is a power of 2. * * 高并發(fā)下計(jì)數(shù)數(shù)組,counterCells數(shù)組非空時(shí)大小是2的n次冪 */ private transient volatile CounterCell[] counterCells; // views private transient KeySetView keySet; private transient ValuesView values; private transient EntrySetView entrySet;
同時(shí)需要注意靜態(tài)代碼塊中已經(jīng)獲取了一些變量在對(duì)象中的內(nèi)存偏移量,這個(gè)是為了方便我們?cè)贑AS中的使用,如果不明白CAS,請(qǐng)自行查閱資料學(xué)習(xí),源碼如下:
// Unsafe mechanics private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class> k = ConcurrentHashMap.class; // 獲取sizeCtl在對(duì)象中的內(nèi)存偏移量,下同 SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } }內(nèi)部類
ConcurrentHashMap中增加了許多內(nèi)部類來幫助完成并發(fā)下的操作
Node普通Entry,與HashMap.Node類似,不同主要在于val和next屬性設(shè)置為了volatile,同時(shí)不支持setValue方法,直接拋錯(cuò),增加了find方法
TreeNode繼承自Node節(jié)點(diǎn),與HashMap.TreeNode類似,之前文章專門介紹過這個(gè)內(nèi)部類,可以去看看,同時(shí)需要說明的是,在ConcurrentHashMap中并不是直接操作TreeNode節(jié)點(diǎn),而是通過TreeBin來代理操作的
TreeBin代理操作TreeNode節(jié)點(diǎn),保存紅黑樹的根節(jié)點(diǎn),Hash值固定為-2,構(gòu)造方法里傳入對(duì)應(yīng)的節(jié)點(diǎn)創(chuàng)建一棵紅黑樹。故在數(shù)組中保存的hash桶為紅黑樹結(jié)構(gòu)時(shí),在數(shù)組上的節(jié)點(diǎn)類型為TreeBin,該節(jié)點(diǎn)類自帶讀寫鎖
static final class TreeBinTraverserextends Node { // 保存樹的根節(jié)點(diǎn) TreeNode root; // 鏈表頭節(jié)點(diǎn) volatile TreeNode first; // 最近一次waiter狀態(tài)的線程 volatile Thread waiter; // 鎖狀態(tài) volatile int lockState; // values for lockState // 寫鎖,二進(jìn)制001 static final int WRITER = 1; // set while holding write lock // 等待獲取寫鎖的狀態(tài),二進(jìn)制010 static final int WAITER = 2; // set when waiting for write lock // 讀鎖狀態(tài),二進(jìn)制100 static final int READER = 4; // increment value for setting read lock /** * 紅黑樹結(jié)構(gòu)重構(gòu)時(shí)需要獲取寫鎖 */ private final void lockRoot() { // CAS嘗試獲取寫鎖 if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER)) // 當(dāng)線程獲取到寫鎖時(shí),才會(huì)返回 contendedLock(); // offload to separate method } /** * 釋放寫鎖 */ private final void unlockRoot() { lockState = 0; } /** * 寫線程不斷嘗試獲取寫鎖,出現(xiàn)的幾種情況的處理,這里不會(huì)有寫線程和寫線程競(jìng)爭(zhēng)的情況出現(xiàn) * 在寫線程進(jìn)入這個(gè)方法時(shí),這個(gè)紅黑樹對(duì)應(yīng)的桶已經(jīng)被外部鎖住,不會(huì)讓寫線程進(jìn)入,故不需要考慮寫寫競(jìng)爭(zhēng)的情況 * 故當(dāng)前線程為寫線程,則判斷讀線程即可 */ private final void contendedLock() { boolean waiting = false; for (int s;;) { // 取反與操作,其他線程未持有讀鎖 if (((s = lockState) & ~WAITER) == 0) { // CAS嘗試修改狀態(tài)為寫鎖狀態(tài) if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) { // 當(dāng)前線程曾經(jīng)處于waiting狀態(tài),將其清除 if (waiting) waiter = null; return; } } // 有讀線程 else if ((s & WAITER) == 0) { // 當(dāng)前線程置為wait狀態(tài),waiter記錄線程 if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) { waiting = true; waiter = Thread.currentThread(); } } // 當(dāng)前線程處于waiting狀態(tài) else if (waiting) // 阻塞線程 LockSupport.park(this); } } /** * 從根節(jié)點(diǎn)開始進(jìn)行遍歷,查找到對(duì)應(yīng)匹配的節(jié)點(diǎn) * 同時(shí)需要注意多線程環(huán)境下如果有寫鎖則通過鏈表結(jié)構(gòu)(不是用紅黑樹結(jié)構(gòu)查詢)進(jìn)行查詢 */ final Node find(int h, Object k) { if (k != null) { // 從first開始 for (Node e = first; e != null; ) { int s; K ek; // 1.有寫線程操作,通過鏈表查詢不進(jìn)行阻塞 // 2.WAITER狀態(tài) if (((s = lockState) & (WAITER|WRITER)) != 0) { if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; e = e.next; } // 讀線程累加 else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) { TreeNode r, p; try { p = ((r = root) == null ? null : r.findTreeNode(h, k, null)); } finally { Thread w; // 當(dāng)線程為最后一個(gè)讀線程并且有寫鎖被阻塞,此時(shí)應(yīng)該通知阻塞的寫線程重新嘗試獲得寫鎖 // getAndAddInt更新之后返回舊值 if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER|WAITER) && (w = waiter) != null) // 喚醒阻塞的線程 LockSupport.unpark(w); } return p; } } } return null; } /** * 大部分與HashMap.TreeNode中的putTreeVal操作類似 * 這里只說下不同的部分 * 多線程環(huán)境下主要是在平衡時(shí)加鎖操作,防止讀線程操作時(shí)樹結(jié)構(gòu)在變化 */ final TreeNode putTreeVal(int h, K k, V v) { ...... TreeNode xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { TreeNode x, f = first; first = x = new TreeNode (h, k, v, f, xp); if (f != null) f.prev = x; if (dir <= 0) xp.left = x; else xp.right = x; // 新添加節(jié)點(diǎn)父節(jié)點(diǎn)為黑色,添加節(jié)點(diǎn)置為紅色即可保證平衡 if (!xp.red) x.red = true; else { //在進(jìn)行插入平衡操作時(shí)會(huì)對(duì)讀線程造成影響,需要加鎖,讓讀線程以鏈表方式進(jìn)行查詢操作 lockRoot(); try { root = balanceInsertion(root, x); } finally { unlockRoot(); } } break; } } assert checkInvariants(root); return null; } /** * 刪除時(shí)修改鏈表關(guān)系和刪除平衡操作時(shí)需要添加鎖,防止讀線程讀取錯(cuò)誤 * 代碼操作參考HashMap.TreeNode,這里不再詳細(xì)說明了 */ final boolean removeTreeNode(TreeNode p) { ...... lockRoot(); try { TreeNode replacement; TreeNode pl = p.left; TreeNode pr = p.right; if (pl != null && pr != null) { TreeNode s = pr, sl; while ((sl = s.left) != null) // find successor s = sl; boolean c = s.red; s.red = p.red; p.red = c; // swap colors TreeNode sr = s.right; TreeNode pp = p.parent; if (s == pr) { // p was s"s direct parent p.parent = s; s.right = p; } else { TreeNode sp = s.parent; if ((p.parent = sp) != null) { if (s == sp.left) sp.left = p; else sp.right = p; } if ((s.right = pr) != null) pr.parent = s; } p.left = null; if ((p.right = sr) != null) sr.parent = p; if ((s.left = pl) != null) pl.parent = s; if ((s.parent = pp) == null) r = s; else if (p == pp.left) pp.left = s; else pp.right = s; if (sr != null) replacement = sr; else replacement = p; } else if (pl != null) replacement = pl; else if (pr != null) replacement = pr; else replacement = p; if (replacement != p) { TreeNode pp = replacement.parent = p.parent; if (pp == null) r = replacement; else if (p == pp.left) pp.left = replacement; else pp.right = replacement; p.left = p.right = p.parent = null; } root = (p.red) ? r : balanceDeletion(r, replacement); if (p == replacement) { // detach pointers TreeNode pp; if ((pp = p.parent) != null) { if (p == pp.left) pp.left = null; else if (p == pp.right) pp.right = null; p.parent = null; } } } finally { unlockRoot(); } assert checkInvariants(root); return false; }
首先需要明白的是在并發(fā)下遍歷讀操作和擴(kuò)容操作同時(shí)進(jìn)行時(shí),讀操作有可能不會(huì)正確的進(jìn)行,而為了解決這個(gè)問題,引入了Traverser這個(gè)內(nèi)部類來完成并發(fā)操作下的讀操作。在涉及到讀取所有hash桶時(shí),比如containsValue操作,不能使用HashMap的方式進(jìn)行,一個(gè)一個(gè)hash桶遍歷進(jìn)行讀取,此時(shí)擴(kuò)容時(shí),ConcurrentHashMap會(huì)在轉(zhuǎn)移完一個(gè)hash桶后將一個(gè)ForwardingNode節(jié)點(diǎn)填入,如果還是非并發(fā)遍歷,導(dǎo)致這個(gè)hash桶將不會(huì)被遍歷到,正確的處理方式也就是遇到ForwardingNode節(jié)點(diǎn)時(shí)將當(dāng)前數(shù)組信息和索引保存,然后在ForwardingNode節(jié)點(diǎn)的nextTable數(shù)組(也就是擴(kuò)容后的數(shù)組)上進(jìn)行遍歷。這里還使用了棧結(jié)構(gòu)TableStack,是為了處理多個(gè)ForwardingNode節(jié)點(diǎn)遍歷的情況,同時(shí)需要注意的是,參考HashMap,在nextTable上需要遍歷舊的hash桶和新的對(duì)應(yīng)的擴(kuò)容hash桶(index+size)
static class Traverser{ // 當(dāng)前table Node [] tab; // current table; updated if resized // 下一個(gè)entry Node next; // the next entry to use // 保存/恢復(fù)ForwardingNodes TableStack stack, spare; // to save/restore on ForwardingNodes // 下一個(gè)遍歷的hash桶index int index; // index of bin to use next // 開始index int baseIndex; // current index of initial table // 結(jié)尾index int baseLimit; // index bound for initial table // 數(shù)組長(zhǎng)度 final int baseSize; // initial table size Traverser(Node [] tab, int size, int index, int limit) { this.tab = tab; this.baseSize = size; this.baseIndex = this.index = index; this.baseLimit = limit; this.next = null; } /** * 遍歷到下一個(gè)有數(shù)據(jù)的節(jié)點(diǎn)并返回,無則返回null */ final Node advance() { Node e; // next值不為空,則直接獲取其下一個(gè)節(jié)點(diǎn) if ((e = next) != null) e = e.next; for (;;) { Node [] t; int i, n; // must use locals in checks // e非空則直接返回當(dāng)前值e并將next賦值為e便于下次再次調(diào)用該方法使用 if (e != null) return next = e; // 判斷邊界同時(shí)做了一些賦值操作 // 數(shù)組 t = tab // 數(shù)組長(zhǎng)度 n = t.length // 數(shù)組index i = index // 不滿足直接返回null if (baseIndex >= baseLimit || (t = tab) == null || (n = t.length) <= (i = index) || i < 0) return next = null; // e賦值為t的第i處hash桶節(jié)點(diǎn)值,判斷是否為特殊節(jié)點(diǎn) if ((e = tabAt(t, i)) != null && e.hash < 0) { // ForwardingNode節(jié)點(diǎn)說明正在擴(kuò)容,此時(shí)遍歷讀需要轉(zhuǎn)移到擴(kuò)容數(shù)組上進(jìn)行 if (e instanceof ForwardingNode) { // tab指向擴(kuò)容數(shù)組 tab = ((ForwardingNode )e).nextTable; // e置空,繼續(xù)循環(huán) e = null; // 保存此時(shí)的未擴(kuò)容前的數(shù)組狀態(tài),相當(dāng)于入棧操作 pushState(t, i, n); continue; } // TreeBin節(jié)點(diǎn),e置為first節(jié)點(diǎn) else if (e instanceof TreeBin) e = ((TreeBin )e).first; else // 保留節(jié)點(diǎn) e = null; } if (stack != null) // 出棧操作,恢復(fù)tab到未擴(kuò)容之前的數(shù)組繼續(xù)遍歷 recoverState(n); else if ((index = i + baseSize) >= n) // 更新為下一個(gè)hash桶索引 index = ++baseIndex; // visit upper slots if present } } /** * 入棧操作,保存當(dāng)前數(shù)組的狀態(tài) */ private void pushState(Node [] t, int i, int n) { TableStack s = spare; // reuse if possible if (s != null) spare = s.next; else s = new TableStack (); s.tab = t; s.length = n; s.index = i; s.next = stack; stack = s; } /** * 出棧操作,恢復(fù)擴(kuò)容之前的數(shù)組狀態(tài) */ private void recoverState(int n) { TableStack s; int len; while ((s = stack) != null && (index += (len = s.length)) >= n) { n = len; index = s.index; tab = s.tab; s.tab = null; TableStack next = s.next; s.next = spare; // save for reuse stack = next; spare = s; } if (s == null && (index += baseSize) >= n) index = ++baseIndex; } }
遍歷過程可參考下圖(圖中數(shù)據(jù)非真實(shí)數(shù)據(jù)):
ForwardingNodeForwardingNode節(jié)點(diǎn)在擴(kuò)容時(shí)被使用,hash值固定為-1,當(dāng)進(jìn)行擴(kuò)容操作時(shí),轉(zhuǎn)移hash桶時(shí)會(huì)在原數(shù)組位置上放置一個(gè)ForwardingNode節(jié)點(diǎn),表示當(dāng)前位置hash桶已轉(zhuǎn)移到新擴(kuò)容數(shù)組上,當(dāng)讀線程讀到ForwardingNode節(jié)點(diǎn)時(shí)則需要到新擴(kuò)容數(shù)組對(duì)應(yīng)的位置進(jìn)行讀操作,如果是寫線程,則需要幫助進(jìn)行擴(kuò)容操作
/** * A node inserted at head of bins during transfer operations. */ static final class ForwardingNodeReservationNodeextends Node { final Node [] nextTable; ForwardingNode(Node [] tab) { // MOVED hash值固定為-1 super(MOVED, null, null, null); this.nextTable = tab; } // 提供find方法查找,這里通過nextTable進(jìn)行遍歷 Node find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node [] tab = nextTable;;) { Node e; int n; // 前置判斷 if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; // 判斷是hash桶第一個(gè)節(jié)點(diǎn),返回這個(gè)節(jié)點(diǎn) if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { // 遞歸 if (e instanceof ForwardingNode) { tab = ((ForwardingNode )e).nextTable; continue outer; } else // 特殊節(jié)點(diǎn)調(diào)用其自身方法 return e.find(h, k); } // 找到最后返回空 if ((e = e.next) == null) return null; } } } }
保留節(jié)點(diǎn),源碼僅compute,computeIfAbsent中使用,通過ReservationNode對(duì)hash桶進(jìn)行加鎖處理,寫操作時(shí)正常情況下需要對(duì)第一個(gè)節(jié)點(diǎn)進(jìn)行加鎖處理,put操作使用CAS不需要加鎖,而在compute,computeIfAbsent方法中比較復(fù)雜,需加鎖處理,在hash桶節(jié)點(diǎn)為空時(shí),添加一個(gè)ReservationNode節(jié)點(diǎn),同時(shí)對(duì)其加鎖處理
static final class ReservationNode構(gòu)造方法extends Node { ReservationNode() { super(RESERVED, null, null, null); } Node find(int h, Object k) { return null; } }
在有參構(gòu)造函數(shù)中,傳入初始容量參數(shù),調(diào)用tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)),這里和HashMap就不一樣了,可自行查閱源碼,ConcurrentHashMap先擴(kuò)大了1.5倍再調(diào)用tableSizeFor方法,可以回想下tableSizeFor方法的含義,和HashMap是相同的處理方式,同時(shí)構(gòu)造方法也不是table進(jìn)行初始化的地方,和HashMap類似,都是在插入K-V值時(shí)進(jìn)行數(shù)組的初始化,構(gòu)造方法僅僅是來設(shè)置屬性值的,重要的在于sizeCtl,此時(shí)是用來記錄數(shù)組初始化使用容量的,在initTable方法中使用
public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // 設(shè)置sizeCtl,此時(shí)表示數(shù)組初始化使用容量,因?yàn)樵诔跏蓟瘯r(shí)將會(huì)使用這個(gè)變量 this.sizeCtl = cap; } public ConcurrentHashMap(Map extends K, ? extends V> m) { // sizeCtl設(shè)置為默認(rèn)容量,此時(shí)表示數(shù)組初始化使用容量 this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } // concurrencyLevel 并發(fā)級(jí)別 兼容舊版本 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); // 同樣進(jìn)行設(shè)置sizeCtl,此時(shí)表示數(shù)組初始化使用容量 this.sizeCtl = cap; }initTable
構(gòu)造方法并沒有創(chuàng)建Node數(shù)組,最終執(zhí)行是在put方法中的initTable方法,這個(gè)方法中也能看出sizeCtl在不同值下代表了不同的含義
private final Node總結(jié)[] initTable() { Node [] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 在初始化時(shí)不能并發(fā)執(zhí)行,在已經(jīng)有線程進(jìn)入初始化狀態(tài)時(shí),非初始化線程需讓步操作,等待完成 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // CAS將sizeCtl置為-1 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 檢查是否已經(jīng)初始化,否則進(jìn)入初始化過程 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n]; table = tab = nt; // sc = 3n/4,相當(dāng)于0.75n sc = n - (n >>> 2); } } finally { // sizeCtl代表了閾值,類似threshold sizeCtl = sc; } break; } } return tab; }
本文從整體上對(duì)ConcurrentHashMap進(jìn)行了介紹,其中常量和變量以及其中重要的內(nèi)部類都進(jìn)行了相關(guān)的解釋說明,當(dāng)然,限于篇幅以及復(fù)雜度暫未涉及方法的源碼部分,下一篇將結(jié)合本文的相關(guān)知識(shí)重點(diǎn)說明其中涉及的方法,在看本文時(shí)請(qǐng)完全理解HashMap的源碼,將會(huì)事半功倍,畢竟結(jié)構(gòu)上和HashMap非常相似
ConcurrentHashMap從HashMap的基礎(chǔ)上進(jìn)行演變,適應(yīng)多線程并發(fā)情況下的操作,理解下列幾點(diǎn):
初始化數(shù)組,單線程創(chuàng)建
擴(kuò)容時(shí),多線程可幫助擴(kuò)容(下一章將詳細(xì)說明)
hash桶頭節(jié)點(diǎn)有4種類型,F(xiàn)orwardingNode節(jié)點(diǎn)表明當(dāng)前hash桶正在擴(kuò)容移動(dòng)中,TreeBin節(jié)點(diǎn)表明為紅黑樹結(jié)構(gòu),ReservationNode節(jié)點(diǎn)暫不多說,還有就是正常鏈表節(jié)點(diǎn)
遍歷操作通過一個(gè)特殊的內(nèi)部類Traverser實(shí)現(xiàn)
CAS操作大大增加了并發(fā)執(zhí)行的效率
水平有限,如有錯(cuò)誤,請(qǐng)及時(shí)指出,謝謝!
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77874.html
摘要:上一篇文章已經(jīng)就進(jìn)行了部分說明,介紹了其中涉及的常量和變量的含義,有些部分需要結(jié)合方法源碼來理解,今天這篇文章就繼續(xù)講解并發(fā)前言本文主要介紹中的一些重要方法,結(jié)合上篇文章中的講解部分進(jìn)行更進(jìn)一步的介紹回顧下上篇文章,我們應(yīng)該已經(jīng)知道的整體結(jié) 上一篇文章已經(jīng)就ConcurrentHashMap進(jìn)行了部分說明,介紹了其中涉及的常量和變量的含義,有些部分需要結(jié)合方法源碼來理解,今天這篇文章就...
摘要:用這種范例表示紅黑樹是可能的,但是這會(huì)改變一些性質(zhì)并使算法復(fù)雜。插入會(huì)出現(xiàn)種情況為根節(jié)點(diǎn),即紅黑樹的根節(jié)點(diǎn)。 說到HashMap,就一定要說到紅黑樹,紅黑樹作為一種自平衡二叉查找樹,是一種用途較廣的數(shù)據(jù)結(jié)構(gòu),在jdk1.8中使用紅黑樹提升HashMap的性能,今天就來說一說紅黑樹。 前言 限于篇幅,本文只對(duì)紅黑樹的基礎(chǔ)進(jìn)行說明,暫不涉及源碼部分,大部分摘抄自維基百科,這里也貼出對(duì)應(yīng)鏈接...
摘要:前面已經(jīng)講解集合中的并且也對(duì)其中使用的紅黑樹結(jié)構(gòu)做了對(duì)應(yīng)的說明,這次就來看下簡(jiǎn)單一些的另一個(gè)集合類,也是日常經(jīng)常使用到的,整體來說,算是比較好理解的集合了,一起來看下前言版本類定義繼承了,實(shí)現(xiàn)了,提供對(duì)數(shù)組隊(duì)列的增刪改查操作實(shí)現(xiàn)接口,提供隨 前面已經(jīng)講解集合中的HashMap并且也對(duì)其中使用的紅黑樹結(jié)構(gòu)做了對(duì)應(yīng)的說明,這次就來看下簡(jiǎn)單一些的另一個(gè)集合類,也是日常經(jīng)常使用到的ArrayL...
摘要:如果沖突了,同步頭節(jié)點(diǎn),進(jìn)行鏈表操作,如果鏈表長(zhǎng)度達(dá)到,分成紅黑樹。每次加入一個(gè)線程都會(huì)將的低位加一。擴(kuò)容最大的幫助線程是,這是低位的最大值限制的。線程處理完之后,如果沒有可選區(qū)間,且任務(wù)沒有完成,就會(huì)將整個(gè)表檢查一遍,防止遺漏。 前言 每一次總結(jié)都意味著重新開始,同時(shí)也是為了更好的開始。ConcurrentHashMap 一直是我心中的痛。雖然不敢說完全讀懂了,但也看了幾個(gè)重要的方法...
閱讀 1550·2023-04-26 02:08
閱讀 3139·2021-10-14 09:42
閱讀 7228·2021-09-22 15:34
閱讀 3249·2019-08-30 13:16
閱讀 2751·2019-08-26 13:49
閱讀 1355·2019-08-26 11:59
閱讀 1285·2019-08-26 10:31
閱讀 2178·2019-08-23 17:19