日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
JavaConcurrentHashMap高并發(fā)安全實現(xiàn)原理解析

 一、概述

ConcurrentHashMap (以下簡稱C13Map) 是并發(fā)編程出場率最高的數(shù)據(jù)結(jié)構(gòu)之一,大量的并發(fā)CASE背后都有C13Map的支持,同時也是JUC包中代碼量最大的組件(6000多行),自JDK8開始Oracle對其進行了大量優(yōu)化工作。

本文從 HashMap 的基礎(chǔ)知識開始,嘗試逐一分析C13Map中各個組件的實現(xiàn)和安全性保證。

二、HashMap基礎(chǔ)知識 

分析C13MAP前,需要了解以下的HashMap知識或者約定:

  •  哈希表的長度永遠都是2的冪次方,原因是hashcode%tableSize==hashcode&(tableSize-1),也就是哈希槽位的確定可以用一次與運算來替代取余運算。
  •  會對hashcode調(diào)用若干次擾動函數(shù),將高16位與低16位做異或運算,因為高16位的隨機性更強。
  •  當表中的元素總數(shù)超過tableSize * 0.75時,哈希表會發(fā)生擴容操作,每次擴容的tableSize是原先的兩倍。
  •  下文提到的槽位(bucket)、哈希分桶、BIN均表示同一個概念,即哈希table上的某一列。
  •  舊表在做搬運時i槽位的node可以根據(jù)其哈希值的第tableSize位的bit決定在新表上的槽位是i還是i+tableSize。
  •  每個槽位上有可能會出現(xiàn)哈希沖突,在未達到某個閾值時它是一個鏈表結(jié)構(gòu),達到閾值后會升級到紅黑樹結(jié)構(gòu)。
  •  HashMap本身并非為多線程環(huán)境設(shè)計,永遠不要嘗試在并發(fā)環(huán)境下直接使用HashMap,C13Map不存在這個安全問題。

三、C13Map的字段定義

C13Map的字段定義 

 
 
 
 
  1. //最大容量  
  2. private static final int MAXIMUM_CAPACITY = 1 << 30;   
  3. //默認初始容量  
  4. private static final int DEFAULT_CAPACITY = 16;  
  5. //數(shù)組的最大容量,防止拋出OOM  
  6. static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;   
  7. //最大并行度,僅用于兼容JDK1.7以前版本  
  8. private static final int DEFAULT_CONCURRENCY_LEVEL = 16;  
  9. //擴容因子  
  10. private static final float LOAD_FACTOR = 0.75f;  
  11. //鏈表轉(zhuǎn)紅黑樹的閾值  
  12. static final int TREEIFY_THRESHOLD = 8;  
  13. //紅黑樹退化閾值  
  14. static final int UNTREEIFY_THRESHOLD = 6;  
  15. //鏈表轉(zhuǎn)紅黑樹的最小總量  
  16. static final int MIN_TREEIFY_CAPACITY = 64;  
  17. //擴容搬運時批量搬運的最小槽位數(shù)  
  18. private static final int MIN_TRANSFER_STRIDE = 16;  
  19. //當前待擴容table的郵戳位,通常是高16位  
  20. private static final int RESIZE_STAMP_BITS = 16;  
  21. //同時搬運的線程數(shù)自增的最大值  
  22. private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;  
  23. //搬運線程數(shù)的標識位,通常是低16位  
  24. private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;  
  25. static final int MOVED     = -1; // 說明是forwardingNode  
  26. static final int TREEBIN   = -2; // 紅黑樹  
  27. static final int RESERVED  = -3; // 原子計算的占位Node  
  28. static final int HASH_BITS = 0x7fffffff; // 保證hashcode擾動計算結(jié)果為正數(shù)  
  29. //當前哈希表  
  30. transient volatile Node[] table;  
  31. //下一個哈希表  
  32. private transient volatile Node[] nextTable;  
  33. //計數(shù)的基準值  
  34. private transient volatile long baseCount;  
  35. //控制變量,不同場景有不同用途,參考下文  
  36. private transient volatile int sizeCtl;  
  37. //并發(fā)搬運過程中CAS獲取區(qū)段的下限值  
  38. private transient volatile int transferIndex;  
  39. //計數(shù)cell初始化或者擴容時基于此字段使用自旋鎖  
  40. private transient volatile int cellsBusy;  
  41. //加速多核CPU計數(shù)的cell數(shù)組  
  42. private transient volatile CounterCell[] counterCells; 

四、安全操作Node數(shù)組 

 
 
 
 
  1. static final  Node tabAt(Node[] tab, int i) {  
  2.     return (Node)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);  
  3. }  
  4. static final  boolean casTabAt(Node[] tab, int i,  
  5.                                     Node c, Node v) {  
  6.     return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);  
  7. }  
  8. static final  void setTabAt(Node[] tab, int i, Node v) {  
  9.     U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v); 

對Node[] 上任意一個index的讀取和寫入都使用了Unsafe輔助類,table本身是volatile類型的并不能保證其下的每個元素的內(nèi)存語義也是volatile類型;

需要借助于Unsafe來保證Node[]元素的“讀/寫/CAS”操作在多核并發(fā)環(huán)境下的原子或者可見性。

五、讀操作get為什么是線程安全的

首先需要明確的是,C13Map的讀操作一般是不加鎖的(TreeBin的讀寫鎖除外),而讀操作與寫操作有可能并行;可以保證的是,因為C13Map的寫操作都要獲取bin頭部的syncronized互斥鎖,能保證最多只有一個線程在做更新,這其實是一個單線程寫、多線程讀的并發(fā)安全性的問題。

C13Map的get方法 

 
 
 
 
  1. public V get(Object key) {  
  2.     Node[] tab; Node e, p; int n, eh; K ek;  
  3.     //執(zhí)行擾動函數(shù)  
  4.     int h = spread(key.hashCode());  
  5.     if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {  
  6.         if ((eeh = e.hash) == h) {  
  7.             if ((eek = e.key) == key || (ek != null && key.equals(ek)))  
  8.                 return e.val;  
  9.         } 
  10.          else if (eh < 0)  
  11.             return (p = e.find(h, key)) != null ? p.val : null;  
  12.         while ((ee = e.next) != null) {  
  13.             if (e.hash == h &&  
  14.                 ((eek = e.key) == key || (ek != null && key.equals(ek))))  
  15.                 return e.val; 
  16.          }  
  17.     }  
  18.     return null;  

1、如果當前哈希表table為null

哈希表未初始化或者正在初始化未完成,直接返回null;雖然line5和line18之間其它線程可能經(jīng)歷了千山萬水,至少在判斷tab==null的時間點key肯定是不存在的,返回null符合某一時刻的客觀事實。

2、如果讀取的bin頭節(jié)點為null

說明該槽位尚未有節(jié)點,直接返回null。

3、如果讀取的bin是一個鏈表

說明頭節(jié)點是個普通Node。

(1)如果正在發(fā)生鏈表向紅黑樹的treeify工作,因為treeify本身并不破壞舊的鏈表bin的結(jié)構(gòu),只是在全部treeify完成后將頭節(jié)點一次性替換為新創(chuàng)建的TreeBin,可以放心讀取。

(2)如果正在發(fā)生resize且當前bin正在被transfer,因為transfer本身并不破壞舊的鏈表bin的結(jié)構(gòu),只是在全部transfer完成后將頭節(jié)點一次性替換為ForwardingNode,可以放心讀取。

(3)如果其它線程正在操作鏈表,在當前線程遍歷鏈表的任意一個時間點,都有可能同時在發(fā)生add/replace/remove操作。

  •  如果是add操作,因為鏈表的節(jié)點新增從JDK8以后都采用了后入式,無非是多遍歷或者少遍歷一個tailNode。
  •  如果是remove操作,存在遍歷到某個Node時,正好有其它線程將其remove,導致其孤立于整個鏈表之外;但因為其next引用未發(fā)生變更,整個鏈表并沒有斷開,還是可以照常遍歷鏈表直到tailNode。
  •  如果是replace操作,鏈表的結(jié)構(gòu)未變,只是某個Node的value發(fā)生了變化,沒有安全問題。

結(jié)論:對于鏈表這種線性數(shù)據(jù)結(jié)構(gòu),單線程寫且插入操作保證是后入式的前提下,并發(fā)讀取是安全的;不會存在誤讀、鏈表斷開導致的漏讀、讀到環(huán)狀鏈表等問題。

4、如果讀取的bin是一個紅黑樹

說明頭節(jié)點是個TreeBin節(jié)點。

(1)如果正在發(fā)生紅黑樹向鏈表的untreeify操作,因為untreeify本身并不破壞舊的紅黑樹結(jié)構(gòu),只是在全部untreeify完成后將頭節(jié)點一次性替換為新創(chuàng)建的普通Node,可以放心讀取。

(2)如果正在發(fā)生resize且當前bin正在被transfer,因為transfer本身并不破壞舊的紅黑樹結(jié)構(gòu),只是在全部transfer完成后將頭節(jié)點一次性替換為ForwardingNode,可以放心讀取。

(3)如果其他線程在操作紅黑樹,在當前線程遍歷紅黑樹的任意一個時間點,都可能有單個的其它線程發(fā)生add/replace/remove/紅黑樹的翻轉(zhuǎn)等操作,參考下面的紅黑樹的讀寫鎖實現(xiàn)。

TreeBin中的讀寫鎖實現(xiàn) 

 
 
 
 
  1.  TreeNode root;  
  2.     volatile TreeNode first;  
  3.     volatile Thread waiter;  
  4.     volatile int lockState;  
  5.     // values for lockState  
  6.     static final int WRITER = 1; // set while holding write lock  
  7.     static final int WAITER = 2; // set when waiting for write lock  
  8.     static final int READER = 4; // increment value for setting read lock  
  9.     private final void lockRoot() {  
  10.         //如果一次性獲取寫鎖失敗,進入contendedLock循環(huán)體,循環(huán)獲取寫鎖或者休眠等待  
  11.         if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER))  
  12.             contendedLock(); // offload to separate method  
  13.     }  
  14.     private final void unlockRoot() {  
  15.         lockState = 0; 
  16.     }  
  17.     //對紅黑樹加互斥鎖,也就是寫鎖  
  18.     private final void contendedLock() {  
  19.         boolean waiting = false;  
  20.         for (int s;;) {  
  21.             //如果lockState除了第二位外其它位上都為0,表示紅黑樹當前既沒有上讀鎖,又沒有上寫鎖,僅有可能存在waiter,可以嘗試直接獲取寫鎖  
  22.             if (((s = lockState) & ~WAITER) == 0) {  
  23.                 if (U.compareAndSetInt(this, LOCKSTATE, s, WRITER)) { 
  24.                      if (waiting)  
  25.                         waiter = null;  
  26.                     return; 
  27.                 }  
  28.             }  
  29.             //如果lockState第二位是0,表示當前沒有線程在等待寫鎖  
  30.             else if ((s & WAITER) == 0) {  
  31.                 //將lockState的第二位設(shè)置為1,相當于打上了waiter的標記,表示有線程在等待寫鎖  
  32.                 if (U.compareAndSetInt(this, LOCKSTATE, s, s | WAITER)) {  
  33.                     waiting = true;  
  34.                     waiter = Thread.currentThread();  
  35.                 }  
  36.             }  
  37.             //休眠當前線程  
  38.             else if (waiting)  
  39.                 LockSupport.park(this);  
  40.         }  
  41.     } 
  42.      //查找紅黑樹中的某個節(jié)點  
  43.     final Node find(int h, Object k) {  
  44.         if (k != null) {  
  45.             for (Node e = first; e != null; ) {  
  46.                 int s; K ek;  
  47.                 //如果當前有waiter或者有寫鎖,走線性檢索,因為紅黑樹雖然替代了鏈表,但其內(nèi)部依然保留了鏈表的結(jié)構(gòu),雖然鏈表的查詢性能一般,但根據(jù)先前的分析其讀取的安全性有保證。  
  48.                 //發(fā)現(xiàn)有寫鎖改走線性檢索,是為了避免等待寫鎖釋放花去太久時間; 而發(fā)現(xiàn)有waiter改走線性檢索,是為了避免讀鎖疊加的太多,導致寫鎖線程需要等待太長的時間; 本質(zhì)上都是為了減少讀寫碰撞 
  49.                  //線性遍歷的過程中,每遍歷到下一個節(jié)點都做一次判斷,一旦發(fā)現(xiàn)鎖競爭的可能性減少就改走tree檢索以提高性能 
  50.                  if (((s = lockState) & (WAITER|WRITER)) != 0) {  
  51.                     if (e.hash == h &&  
  52.                         ((eek = e.key) == k || (ek != null && k.equals(ek))))  
  53.                         return e;  
  54.                     ee = e.next;  
  55.                 }  
  56.                 //對紅黑樹加共享鎖,也就是讀鎖,CAS一次性增加4,也就是增加的只是3~32位  
  57.                 else if (U.compareAndSetInt(this, LOCKSTATE, s, 
  58.                                               s + READER)) {  
  59.                     TreeNode r, p;  
  60.                     try {  
  61.                         p = ((r = root) == null ? null :  
  62.                              r.findTreeNode(h, k, null));  
  63.                     } finally {  
  64.                         Thread w;  
  65.                         //釋放讀鎖,如果釋放完畢且有waiter,則將其喚醒  
  66.                         if (U.getAndAddInt(this, LOCKSTATE, -READER) ==  
  67.                             (READER|WAITER) && (w = waiter) != null)  
  68.                             LockSupport.unpark(w);  
  69.                     }  
  70.                     return p;  
  71.                 }  
  72.             }  
  73.         }  
  74.         return null;  
  75.     }  
  76.     //更新紅黑樹中的某個節(jié)點  
  77.     final TreeNode putTreeVal(int h, K k, V v) {  
  78.         Class kc = null;  
  79.         boolean searched = false;  
  80.         for (TreeNode p = root;;) {  
  81.             int dir, ph; K pk;  
  82.             //...省略處理紅黑樹數(shù)據(jù)結(jié)構(gòu)的代碼若干        
  83.                  else {  
  84.                     //寫操作前加互斥鎖  
  85.                     lockRoot();  
  86.                     try {  
  87.                         root = balanceInsertion(root, x);  
  88.                     } finally {  
  89.                         //釋放互斥鎖 
  90.                          unlockRoot();  
  91.                     }  
  92.                 }  
  93.                 break;  
  94.             }  
  95.         }  
  96.         assert checkInvariants(root);  
  97.         return null;  
  98.     }  

紅黑樹內(nèi)置了一套讀寫鎖的邏輯,其內(nèi)部定義了32位的int型變量lockState,第1位是寫鎖標志位,第2位是寫鎖等待標志位,從3~32位則是共享鎖標志位。

讀寫操作是互斥的,允許多個線程同時讀取,但不允許讀寫操作并行,同一時刻只允許一個線程進行寫操作;這樣任意時間點讀取的都是一個合法的紅黑樹,整體上是安全的。

有的同學會產(chǎn)生疑惑,寫鎖釋放時為何沒有將waiter喚醒的操作呢?是否有可能A線程進入了等待區(qū),B線程獲取了寫鎖,釋放寫鎖時僅做了lockState=0的操作。

那么A線程是否就沒有機會被喚醒了,只有等待下一個讀鎖釋放時的喚醒了呢 ?

顯然這種情況違背常理,C13Map不會出現(xiàn)這樣的疏漏,再進一步觀察,紅黑樹的變更操作的外圍,也就是在putValue/replaceNode那一層,都是對BIN的頭節(jié)點加了synchornized互斥鎖的,同一時刻只能有一個寫線程進入TreeBin的方法范圍內(nèi),當寫線程發(fā)現(xiàn)當前waiter不為空,其實此waiter只能是當前線程自己,可以放心的獲取寫鎖,不用擔心無法被喚醒的問題。

TreeBin在find讀操作檢索時,在linearSearch(線性檢索)和treeSearch(樹檢索)間做了折衷,前者性能差但并發(fā)安全,后者性能佳但要做并發(fā)控制,可能導致鎖競爭;設(shè)計者使用線性檢索來盡量避免讀寫碰撞導致的鎖競爭,但評估到race condition已消失時,又立即趨向于改用樹檢索來提高性能,在安全和性能之間做到了極佳的平衡。具體的折衷策略請參考find方法及注釋。

由于有線性檢索這樣一個抄底方案,以及入口處bin頭節(jié)點的synchornized機制,保證了進入到TreeBin整體代碼塊的寫線程只有一個;TreeBin中讀寫鎖的整體設(shè)計與ReentrantReadWriteLock相比還是簡單了不少,比如并未定義用于存放待喚醒線程的threadQueue,以及讀線程僅會自旋而不會阻塞等等, 可以看做是特定條件下ReadWriteLock的簡化版本。

5、如果讀取的bin是一個ForwardingNode

說明當前bin已遷移,調(diào)用其find方法到nextTable讀取數(shù)據(jù)。

forwardingNode的find方法 

 
 
 
 
  1. static final class ForwardingNode extends Node {  
  2.     final Node[] nextTable;  
  3.     ForwardingNode(Node[] tab) {  
  4.         super(MOVED, null, null);  
  5.         this.nextTable = tab;  
  6.     }  
  7.      //遞歸檢索哈希表鏈  
  8.     Node find(int h, Object k) {  
  9.         // loop to avoid arbitrarily deep recursion on forwarding nodes  
  10.         outer: for (Node[] tab = nextTable;;) {  
  11.             Node e; int n;  
  12.             if (k == null || tab == null || (n = tab.length) == 0 ||  
  13.                 (e = tabAt(tab, (n - 1) & h)) == null)  
  14.                 return null;  
  15.             for (;;) {  
  16.                 int eh; K ek; 
  17.                  if ((eeh = e.hash) == h &&  
  18.                     ((eek = e.key) == k || (ek != null && k.equals(ek))))  
  19.                     return e;  
  20.                 if (eh < 0) {  
  21.                     if (e instanceof ForwardingNode) {  
  22.                         tab = ((ForwardingNode)e).nextTable;  
  23.                         continue outer;  
  24.                     }  
  25.                     else  
  26.                         return e.find(h, k);  
  27.                 }  
  28.                 if ((ee = e.next) == null)  
  29.                     return null;  
  30.             }  
  31.         }  
  32.     }  

ForwardingNode中保存了nextTable的引用,會轉(zhuǎn)向下一個哈希表進行檢索,但并不能保證nextTable就一定是currentTable,因為在高并發(fā)插入的情況下,極短時間內(nèi)就可以導致哈希表的多次擴容,內(nèi)存中極有可能駐留一條哈希表鏈,彼此以bin的頭節(jié)點上的ForwardingNode相連,線程剛讀取時拿到的是table1,遍歷時卻有可能經(jīng)歷了哈希表的鏈條。

eh<0有三種情況:

  •  如果是ForwardingNode繼續(xù)遍歷下一個哈希表。
  •  如果是TreeBin,調(diào)用其find方法進入TreeBin讀寫鎖的保護區(qū)讀取數(shù)據(jù)。
  •  如果是ReserveNode,說明當前有compute計算中,整條bin還是一個空結(jié)構(gòu),直接返回null。

6、如果讀取的bin是一個ReserveNode

ReserveNode用于compute/computeIfAbsent原子計算的方法,在BIN的頭節(jié)點為null且計算尚未完成時,先在bin的頭節(jié)點打上一個ReserveNode的占位標記。

讀操作發(fā)現(xiàn)ReserveNode直接返回null,寫操作會因為爭奪ReserveNode的互斥鎖而進入阻塞態(tài),在compute完成后被喚醒后循環(huán)重試。

六、寫操作putValue/replaceNode為什么是線程安全的

典型的編程范式如下:

C13Map的putValue方法 

 
 
 
 
  1. Node[] tab = table;  //將堆中的table變量賦給線程堆棧中的局部變量  
  2. Node f = tabAt(tab, i );  
  3. if(f==null){  
  4.  //當前槽位沒有頭節(jié)點,直接CAS寫入  
  5.  if (casTabAt(tab, i, null, new Node(hash, key, value)))  
  6.     break;  
  7. }else if(f.hash == MOVED){  
  8.  //加入?yún)f(xié)助搬運行列  
  9.  helpTransfer(tab,f);  
  10. }  
  11. //不是forwardingNode  
  12. else if(f.hash != MOVED){  
  13.     //先鎖住I槽位上的頭節(jié)點  
  14.     synchronized (f) {  
  15.     //再doubleCheck看此槽位上的頭節(jié)點是否還是f  
  16.     if (tabAt(tab, i) == f) {  
  17.        ...各種寫操作  
  18.     }  
  19.   }  

1、當前槽位如果頭節(jié)點為null時,直接CAS寫入

有人也許會質(zhì)疑,如果寫入時resize操作已完成,發(fā)生了table向nextTable的轉(zhuǎn)變,是否會存在寫入的是舊表的bin導致數(shù)據(jù)丟失的可能 ? 

這種可能性是不存在的,因為一個table在resize完成后所有的BIN都會被打上ForwardingNode的標記,可以形象的理解為所有槽位上都插滿了紅旗,而此處在CAS時的compare的變量null,能夠保證至少在CAS原子操作發(fā)生的時間點table并未發(fā)生變更。

2、當前槽位如果頭節(jié)點不為null

這里采用了一個小技巧:先鎖住I槽位上的頭節(jié)點,進入同步代碼塊后,再doubleCheck看此槽位上的頭節(jié)點是否有變化。

進入同步塊后還需要doubleCheck的原因:雖然一開始獲取到的頭節(jié)點f并非ForwardingNode,但在獲取到f的同步鎖之前,可能有其它線程提前獲取了f的同步鎖并完成了transfer工作,并將I槽位上的頭節(jié)點標記為ForwardingNode,此時的f就成了一個過時的bin的頭節(jié)點。

然而因為標記操作與transfer作為一個整體在同步的代碼塊中執(zhí)行,如果doubleCheck的結(jié)果是此槽位上的頭節(jié)點還是f,則表明至少在當前時間點該槽位還沒有被transfer到新表(假如當前有transfer in progress的話),可以放心的對該bin進行put/remove/replace等寫操作。

只要未發(fā)生transfer或者treeify操作,鏈表的新增操作都是采取后入式,頭節(jié)點一旦確定不會輕易改變,這種后入式的更新方式保證了鎖定頭節(jié)點就等于鎖住了整個bin。

如果不作doubleCheck判斷,則有可能當前槽位已被transfer,寫入的還是舊表的BIN,從而導致寫入數(shù)據(jù)的丟失;也有可能在獲取到f的同步鎖之前,其它線程對該BIN做了treeify操作,并將頭節(jié)點替換成了TreeBin, 導致寫入的是舊的鏈表,而非新的紅黑樹;

3、doubleCheck是否有ABA問題

也許有人會質(zhì)疑,如果有其它線程提前對當前bin進行了的remove/put的操作,引入了新的頭節(jié)點,并且恰好發(fā)生了JVM的內(nèi)存釋放和重新分配,導致新的Node的引用地址恰好跟舊的相同,也就是存在所謂的ABA問題。

這個可以通過反證法來推翻,在帶有GC機制的語言環(huán)境下通常不會發(fā)生ABA問題,因為當前線程包含了對頭節(jié)點f的引用,當前線程并未消亡,不可能存在f節(jié)點的內(nèi)存被GC回收的可能性。

還有人會質(zhì)疑,如果在寫入過程中主哈希表發(fā)生了變化,是否可能寫入的是舊表的bin導致數(shù)據(jù)丟失,這個也可以通過反證法來推翻,因為table向nextTable的轉(zhuǎn)化(也就是將resize后的新哈希表正式commit)只有在所有的槽位都已經(jīng)transfer成功后才會進行,只要有一個bin未transfer成功,則說明當前的table未發(fā)生變化,在當前的時間點可以放心的向table的bin內(nèi)寫入數(shù)據(jù)。

4、如何操作才安全

可以總結(jié)出規(guī)律,在對table的槽位成功進行了CAS操作且compare值為null,或者對槽位的非forwardingNode的頭節(jié)點加鎖后,doubleCheck頭節(jié)點未發(fā)生變化,對bin的寫操作都是安全的。

七、原子計算相關(guān)方法

原子計算主要包括:computeIfAbsent、computeIfPresent、compute、merge四個方法。 

1、幾個方法的比較 

主要區(qū)別如下:

(1)computeIfAbsent只會在判斷到key不存在時才會插入,判空與插入是一個原子操作,提供的FunctionalInterface是一個二元的Function, 接受key參數(shù),返回value結(jié)果;如果計算結(jié)果為null則不做插入。

(2)computeIfPresent只會在判讀單到Key非空時才會做更新,判斷非空與插入是一個原子操作,提供的FunctionalInterface是一個三元的BiFunction,接受key,value兩個參數(shù),返回新的value結(jié)果;如果新的value為null則刪除key對應節(jié)點。

(3)compute則不加key是否存在的限制,提供的FunctionalInterface是一個三元的BiFunction,接受key,value兩個參數(shù),返回新的value結(jié)果;如果舊的value不存在則以null替代進行計算;如果新的value為null則保證key對應節(jié)點不會存在。

(4)merge不加key是否存在的限制,提供的FunctionalInterface是一個三元的BiFunction,接受oldValue, newVALUE兩個參數(shù),返回merge后的value;如果舊的value不存在,直接以newVALUE作為最終結(jié)果,存在則返回merge后的結(jié)果;如果最終結(jié)果為null,則保證key對應節(jié)點不會存在。

2、何時會使用ReserveNode占位

如果目標bin的頭節(jié)點為null,需要寫入的話有兩種手段:一種是生成好新的節(jié)點r后使用casTabAt(tab, i, null, r)原子操作,因為compare的值為null可以保證并發(fā)的安全;

另外一種方式是創(chuàng)建一個占位的ReserveNode,鎖住該節(jié)點并將其CAS設(shè)置到bin的頭節(jié)點,再進行進一步的原子計算操作;這兩種辦法都有可能在CAS的時候失敗,需要自旋反復嘗試。

(1)為什么只有computeIfAbsent/compute方法使用占位符的方式

computeIfPresent只有在BIN結(jié)構(gòu)非空的情況下才會展開原子計算,自然不存在需要ReserveNode占位的情況;鎖住已有的頭節(jié)點即可。

computeIfAbsent/compute方法在BIN結(jié)構(gòu)為空時,需要展開Function或者BiFunction的運算,這個操作是外部引入的需要耗時多久無法準確評估;這種情況下如果采用先計算,再casTabAt(tab, i, null, r)的方式,如果有其它線程提前更新了這個BIN,那么就需要重新鎖定新加入的頭節(jié)點,并重復一次原子計算(C13Map無法幫你緩存上次計算的結(jié)果,因為計算的入?yún)⒂锌赡軙兓?,這個開銷是比較大的。

而使用ReserveNode占位的方式無需等到原子計算出結(jié)果,可以第一時間先搶占BIN的所有權(quán),使其他并發(fā)的寫線程阻塞。

(2)merge方法為何不需要占位

原因是如果BIN結(jié)構(gòu)為空時,根據(jù)merge的處理策略,老的value為空則直接使用新的value替代,這樣就省去了BiFunction中新老value進行merge的計算,這個消耗幾乎是沒有的;因此可以使用casTabAt(tab, i, null, r)的方式直接修改,避免了使用ReserveNode占位,鎖定該占位ReserveNode后再進行CAS修改的兩次CAS無謂的開銷。

C13Map的compute方法 

 
 
 
 
  1. public V compute(K key,  
  2.                  BiFunction remappingFunction) {  
  3.     if (key == null || remappingFunction == null)  
  4.         throw new nullPointerException();  
  5.     int h = spread(key.hashCode());  
  6.     V val = null;  
  7.     int delta = 0;  
  8.     int binCount = 0;  
  9.     for (Node[] tab = table; ; ) {  
  10.         Node f;  
  11.         int n, i, fh;  
  12.         if (tab == null || (n = tab.length) == 0)  
  13.             tab = initTable();  
  14.         else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {  
  15.             //創(chuàng)建占位Node  
  16.             Node r = new ReservationNode();  
  17.            //先鎖定該占位Node  
  18.             synchronized (r) {  
  19.                 //將其設(shè)置到BIN的頭節(jié)點  
  20.                 if (casTabAt(tab, i, null, r)) {  
  21.                     binCount = 1; 
  22.                      Node node = null;  
  23.                     try {  
  24.                         //開始原子計算  
  25.                         if ((val = remappingFunction.apply(key, null)) != null) {  
  26.                             delta = 1; 
  27.                              node = new Node(h, key, val, null);  
  28.                         }  
  29.                     } finally {  
  30.                         //設(shè)置計算后的最終節(jié)點  
  31.                         setTabAt(tab, i, node);  
  32.                     }  
  33.                 }  
  34.             } 
  35.              if (binCount != 0)  
  36.                 break;  
  37.         } else if ((ffh = f.hash) == MOVED) 
    當前名稱:JavaConcurrentHashMap高并發(fā)安全實現(xiàn)原理解析
    鏈接URL:http://m.5511xx.com/article/dhgdddj.html