日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
并發(fā)編程之Exchanger原理與使用

 前言

網(wǎng)站的建設(shè)成都創(chuàng)新互聯(lián)專注網(wǎng)站定制,經(jīng)驗(yàn)豐富,不做模板,主營(yíng)網(wǎng)站定制開發(fā).小程序定制開發(fā),H5頁面制作!給你煥然一新的設(shè)計(jì)體驗(yàn)!已為成都展覽展示等企業(yè)提供專業(yè)服務(wù)。

在JUC包中,除了一些常用的或者說常見的并發(fā)工具類(ReentrantLock,CountDownLatch,CyclicBarrier,Semaphore)等,還有一個(gè)不常用的線程同步器類 —— Exchanger。

Exchanger是適用在兩個(gè)線程之間數(shù)據(jù)交換的并發(fā)工具類,它的作用是找到一個(gè)同步點(diǎn),當(dāng)兩個(gè)線程都執(zhí)行到了同步點(diǎn)(exchange方法)之后(有一個(gè)沒有執(zhí)行到就一直等待,也可以設(shè)置等待超時(shí)時(shí)間),就將自身線程的數(shù)據(jù)與對(duì)方交換。

Exchanger 是什么?

它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn)兩個(gè)線程可以交換彼此的數(shù)據(jù)。這個(gè)兩個(gè)線程通過exchange方法交換數(shù)據(jù),如果第一個(gè)線程先執(zhí)行exchange方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行exchange,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對(duì)方。因此使用Exchanger的中斷時(shí)成對(duì)的線程使用exchange()方法,當(dāng)有一對(duì)線程到達(dá)了同步點(diǎn),就會(huì)進(jìn)行交換數(shù)據(jù),因此該工具類的線程對(duì)象是成對(duì)的。

線程可以在成對(duì)內(nèi)配對(duì)和交換元素的同步點(diǎn)。每個(gè)線程在輸入exchange方法時(shí)提供一些對(duì)象,與合作者線程匹配,并在返回時(shí)接收其合作伙伴的對(duì)象。交換器可以被視為一個(gè)的雙向形式的SynchroniuzedQueue。交換器在諸如遺傳算法和管道設(shè)計(jì)的應(yīng)用中可能是有用的。

一個(gè)用于兩個(gè)工作線程之間交換數(shù)據(jù)的封裝工具類,簡(jiǎn)單說就是一個(gè)線程在完成一定事務(wù)后想與另一個(gè)線程交換數(shù)據(jù),則第一個(gè)先拿出數(shù)據(jù)的線程會(huì)一直等待第二個(gè)線程,直到第二個(gè)線程拿著數(shù)據(jù)到來時(shí)才能彼此交換對(duì)應(yīng)數(shù)據(jù)。

Exchanger 用法

  • Exchanger 泛型類型,其中V表示可交換的數(shù)據(jù)類型
  • V exchanger(V v):等待另一個(gè)線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷),然后將給定的對(duì)象傳送該線程,并接收該線程的對(duì)象。
  • V exchanger(V v, long timeout, TimeUnit unit):等待另一個(gè)線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷或超出類指定的等待時(shí)間),然后將給定的對(duì)象傳送給該線程,并接收該線程的對(duì)象。

應(yīng)用場(chǎng)景

Exchanger可以用于遺傳算法,遺傳算法里需要選出兩個(gè)人作為交配對(duì)象,這時(shí)候會(huì)交換兩人的數(shù)據(jù),并使用交叉規(guī)則得出2個(gè)交配結(jié)果。

Exchanger也可以用于校對(duì)工作。比如我們需要將紙制銀流通過人工的方式錄入成電子銀行流水,為了避免錯(cuò)誤,采用AB崗兩人進(jìn)行錄入,錄入到Excel之后,系統(tǒng)需要加載這兩個(gè)Excel,并對(duì)這兩個(gè)Excel數(shù)據(jù)進(jìn)行校對(duì),看看是否錄入的一致

Exchanger的典型應(yīng)用場(chǎng)景是:一個(gè)任務(wù)在創(chuàng)建對(duì)象,而這些對(duì)象的生產(chǎn)代價(jià)很高,另一個(gè)任務(wù)在消費(fèi)這些對(duì)象。通過這種方式,可以有更多的對(duì)象在被創(chuàng)建的同時(shí)被消費(fèi)。

案例說明

Exchanger 用于兩個(gè)線程間交換數(shù)據(jù),當(dāng)然實(shí)際參與的線程可以不止兩個(gè),測(cè)試用例如下:

 
 
 
 
  1. private static void test1() throws InterruptedException {
  2.         Exchanger exchanger = new Exchanger<>();
  3.         CountDownLatch countDownLatch = new CountDownLatch(5);
  4.         for (int i = 0; i < 5; i++) {
  5.             new Thread(() ->  {
  6.                 try {
  7.                     String origMsg = RandomStringUtils.randomNumeric(6);
  8.                     // 先到達(dá)的線程會(huì)在此等待,直到有一個(gè)線程跟它交換數(shù)據(jù)或者等待超時(shí)
  9.                     String exchangeMsg = exchanger.exchange(origMsg,5, TimeUnit.SECONDS);
  10.                     System.out.println(Thread.currentThread().getName() + "\t origMsg:" + origMsg + "\t exchangeMsg:" + exchangeMsg);
  11.                 } catch (InterruptedException e) {
  12.                     e.printStackTrace();
  13.                 } catch (TimeoutException e) {
  14.                     e.printStackTrace();
  15.                 }finally {
  16.                     countDownLatch.countDown();
  17.                 }
  18.             },String.valueOf(i)).start();
  19.         }
  20.         countDownLatch.await();
  21.     }

 第5個(gè)線程因?yàn)闆]有匹配的線程而等待超時(shí),輸出如下:

 
 
 
 
  1. 0  origMsg:524053  exchangeMsg:098544
  2. 3  origMsg:433246  exchangeMsg:956604
  3. 4  origMsg:098544  exchangeMsg:524053
  4. 1  origMsg:956604  exchangeMsg:433246
  5. java.util.concurrent.TimeoutException
  6.  at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
  7.  at com.nuih.juc.ExchangerDemo.lambda$test1$0(ExchangerDemo.java:37)
  8.  at java.lang.Thread.run(Thread.java:748)

 上述測(cè)試用例是比較簡(jiǎn)單,可以模擬消息消費(fèi)的場(chǎng)景來觀察Exchanger的行為,測(cè)試用例如下:

 
 
 
 
  1. private static void test2() throws InterruptedException {
  2.         Exchanger exchanger = new Exchanger<>();
  3.         CountDownLatch countDownLatch = new CountDownLatch(4);
  4.         CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
  5.         // 生產(chǎn)者
  6.         Runnable producer = new Runnable() {
  7.             @Override
  8.             public void run() {
  9.                 try{
  10.                     cyclicBarrier.await();
  11.                     for (int i = 0; i < 5; i++) {
  12.                         String msg = RandomStringUtils.randomNumeric(6);
  13.                         exchanger.exchange(msg,5,TimeUnit.SECONDS);
  14.                         System.out.println(Thread.currentThread().getName() + "\t producer msg -> " + msg + " ,\t i -> " + i);
  15.                     }
  16.                 }catch (Exception e){
  17.                     e.printStackTrace();
  18.                 }finally {
  19.                     countDownLatch.countDown();
  20.                 }
  21.             }
  22.         };
  23.         // 消費(fèi)者
  24.         Runnable consumer = new Runnable() {
  25.             @Override
  26.             public void run() {
  27.                 try{
  28.                     cyclicBarrier.await();
  29.                     for (int i = 0; i < 5; i++) {
  30.                         String msg = exchanger.exchange(null,5,TimeUnit.SECONDS);
  31.                         System.out.println(Thread.currentThread().getName() + "\t consumer msg -> " + msg + ",\t" + i);
  32.                     }
  33.                 }catch (Exception e){
  34.                     e.printStackTrace();
  35.                 }finally {
  36.                     countDownLatch.countDown();
  37.                 }
  38.             }
  39.         };
  40.         for (int i = 0; i < 2; i++){
  41.             new Thread(producer).start();
  42.             new Thread(consumer).start();
  43.         }
  44.         countDownLatch.await();
  45.     }

 輸出如下,上面生產(chǎn)者和消費(fèi)者線程數(shù)是一樣的,循環(huán)次數(shù)也是一樣的,但是還是出現(xiàn)等待超時(shí)的情形:

 
 
 
 
  1. Thread-3  consumer msg -> null, 0
  2. Thread-1  consumer msg -> null, 0
  3. Thread-1  consumer msg -> null, 1
  4. Thread-2  producer msg -> 640010 ,  i -> 0
  5. Thread-2  producer msg -> 733133 ,  i -> 1
  6. Thread-3  consumer msg -> null, 1
  7. Thread-3  consumer msg -> 476520, 2
  8. Thread-1  consumer msg -> 640010, 2
  9. Thread-1  consumer msg -> null, 3
  10. Thread-0  producer msg -> 993414 ,  i -> 0
  11. Thread-0  producer msg -> 292745 ,  i -> 1
  12. Thread-2  producer msg -> 476520 ,  i -> 2
  13. Thread-2  producer msg -> 408446 ,  i -> 3
  14. Thread-3  consumer msg -> null, 3
  15. Thread-1  consumer msg -> 292745, 4
  16. Thread-2  producer msg -> 251971 ,  i -> 4
  17. Thread-0  producer msg -> 078939 ,  i -> 2
  18. Thread-3  consumer msg -> 251971, 4
  19. java.util.concurrent.TimeoutException
  20.  at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
  21.  at com.nuih.juc.ExchangerDemo$1.run(ExchangerDemo.java:70)
  22.  at java.lang.Thread.run(Thread.java:748)
  23. Process finished with exit code 0

 這種等待超時(shí)是概率出現(xiàn)的,這是為啥?

因?yàn)橄到y(tǒng)調(diào)度的不均衡和Exchanger底層的大量自旋等待導(dǎo)致這4個(gè)線程并不是調(diào)用exchanger成功的次數(shù)并不一致。另外從輸出可以看出,消費(fèi)者線程并沒有像我們想的那樣跟生產(chǎn)者線程一一匹配,生產(chǎn)者線程有時(shí)也充當(dāng)來消費(fèi)者線程,這是為啥?因?yàn)镋xchanger匹配時(shí)完全不關(guān)注這個(gè)線程的角色,兩個(gè)線程之間的匹配完全由調(diào)度決定的,即CPU同時(shí)執(zhí)行來或者緊挨著執(zhí)行來兩個(gè)線程,這兩個(gè)線程就匹配成功來。

源碼分析

Exchanger 類圖

其內(nèi)部主要變量和方法如下:

成員屬性

 
 
 
 
  1. // ThreadLocal變量,每個(gè)線程都有之間的一個(gè)副本
  2. private final Participant participant;
  3. // 高并發(fā)下使用的,保存待匹配的Node實(shí)例
  4. private volatile Node[] arena;
  5. // 低并發(fā)下,arena未初始化時(shí)使用的保存待匹配的Node實(shí)例
  6. private volatile Node slot;
  7. // 初始值為0,當(dāng)創(chuàng)建arena后被負(fù)責(zé)SEQ,用來記錄arena數(shù)組的可用最大索引,
  8. // 會(huì)隨著并發(fā)的增大而增大直到等于最大值FULL,
  9. // 會(huì)隨著并行的線程逐一匹配成功而減少恢復(fù)成初始值
  10. private volatile int bound;

 還有多個(gè)表示字段偏移量的靜態(tài)屬性,通過static代碼塊初始化,如下:

 
 
 
 
  1. // Unsafe mechanics
  2. private static final sun.misc.Unsafe U;
  3. private static final long BOUND;
  4. private static final long SLOT;
  5. private static final long MATCH;
  6. private static final long BLOCKER;
  7. private static final int ABASE;
  8. static {
  9.     int s;
  10.     try {
  11.         U = sun.misc.Unsafe.getUnsafe();
  12.         Class ek = Exchanger.class;
  13.         Class nk = Node.class;
  14.         Class ak = Node[].class;
  15.         Class tk = Thread.class;
  16.         BOUND = U.objectFieldOffset
  17.             (ek.getDeclaredField("bound"));
  18.         SLOT = U.objectFieldOffset
  19.             (ek.getDeclaredField("slot"));
  20.         MATCH = U.objectFieldOffset
  21.             (nk.getDeclaredField("match"));
  22.         BLOCKER = U.objectFieldOffset
  23.             (tk.getDeclaredField("parkBlocker"));
  24.         s = U.arrayIndexScale(ak);
  25.         // ABASE absorbs padding in front of element 0
  26.         ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
  27.     } catch (Exception e) {
  28.         throw new Error(e);
  29.     }
  30.     if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
  31.         throw new Error("Unsupported array scale");
  32. }

 Exchanger 定義來多個(gè)靜態(tài)變量,如下:

 
 
 
 
  1. // 初始化arena時(shí)使用, 1 << ASHIFT 是一個(gè)緩存行的大小,避免來不同的Node落入到同一個(gè)高速緩存行
  2. // 這里實(shí)際是把數(shù)組容量擴(kuò)大來8倍,原來索引相鄰的兩個(gè)元素,擴(kuò)容后中間隔來7個(gè)元素,從元素的起始地址上看就隔來8個(gè)元素,中間的7個(gè)都是空的,為來避免原來相鄰的兩個(gè)元素都落入到同一個(gè)緩存行中
  3. // 因?yàn)閍rena是對(duì)象數(shù)組,一個(gè)元素占8字節(jié),8個(gè)就是64字節(jié)
  4. private static final int ASHIFT = 7;
  5. // arena 數(shù)組元素的索引最大值即255
  6. private static final int MMASK = 0xff;
  7. // arena 數(shù)組的最大長(zhǎng)度即256
  8. private static final int SEQ = MMASK + 1;
  9. // 獲取CPU核數(shù)
  10. private static final int NCPU = Runtime.getRuntime().availableProcessors();
  11. // 實(shí)際的數(shù)組長(zhǎng)度,因?yàn)槭蔷€程兩兩配對(duì)的,所以最大長(zhǎng)度是核數(shù)除以2
  12. static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
  13. // 自旋等待的次數(shù)
  14. private static final int SPINS = 1 << 10;
  15. // 如果交換的對(duì)象是null,則返回此對(duì)象
  16. private static final Object NULL_ITEM = new Object();
  17. // 如果等待超時(shí)導(dǎo)致交換失敗,則返回此對(duì)象
  18. private static final Object TIMED_OUT = new Object();

 內(nèi)部類

Exchanger類中有兩個(gè)內(nèi)部類,一個(gè)Node,一個(gè)Participant。 Participant繼承了ThreadLocal并且重寫了其initialValue方法,返回一個(gè)Node對(duì)象。其定義如下:

 
 
 
 
  1. @sun.misc.Contended static final class Node {
  2.     int index;              // Arena index
  3.     int bound;              // Last recorded value of Exchanger.bound
  4.     int collides;           // Number of CAS failures at current bound
  5.     int hash;               // Pseudo-random for spins
  6.     Object item;            // This thread's current item
  7.     volatile Object match;  // Item provided by releasing thread
  8.     volatile Thread parked; // Set to this thread when parked, else null
  9. }
  10. /** The corresponding thread local class */
  11. static final class Participant extends ThreadLocal {
  12.     public Node initialValue() { return new Node(); }
  13. }

 其中Contended注解是為了避免高速緩存行導(dǎo)致的偽共享問題

  • index用來記錄arena數(shù)組的索引
  • bound用于記錄上一次的Exchanger bound屬性
  • collides用于記錄在bound不變的情況下CAS搶占失敗的次數(shù)
  • hash是自旋等待時(shí)計(jì)算隨機(jī)數(shù)使用的
  • item表示當(dāng)前線程請(qǐng)求交換的對(duì)象
  • match是同其它線程交換的結(jié)果,match不為null表示交換成功
  • parked為跟該Node關(guān)聯(lián)的處于休眠狀態(tài)的線程。

重要方法

exchange()方法

 
 
 
 
  1. @SuppressWarnings("unchecked")
  2. public V exchange(V x) throws InterruptedException {
  3.     Object v;
  4.     Object item = (x == null) ? NULL_ITEM : x; // translate null args
  5.     if ((arena != null || // 是null就執(zhí)行后面的方法
  6.          (v = slotExchange(item, false, 0L)) == null) &&
  7.         // 如果執(zhí)行slotExchange有結(jié)果就執(zhí)行后面的,否則返回
  8.         ((Thread.interrupted() || // 非中斷則執(zhí)行后面的方法
  9.           (v = arenaExchange(item, false, 0L)) == null)))
  10.         throw new InterruptedException();
  11.     return (v == NULL_ITEM) ? null : (V)v;
  12. }

 exchange 方法的執(zhí)行步驟:

  1. 如果執(zhí)行 soltExchange 有結(jié)果就執(zhí)行后面的 arenaExchange;
  2. 如果 slot 被占用,就執(zhí)行 arenaExchange;
  3. 返回的數(shù)據(jù) v 是對(duì)方線程的數(shù)據(jù)項(xiàng);
  4. 總結(jié)即:如果A線程先調(diào)用,那么A的數(shù)據(jù)項(xiàng)存儲(chǔ)的 item中,則B線程的數(shù)據(jù)項(xiàng)存儲(chǔ)在 math 中;
  5. 當(dāng)沒有多線程并發(fā)操作 Exchange 的時(shí)候,使用 slotExchange 就足夠了,slot 是一個(gè) node 對(duì)象;
  6. 當(dāng)出現(xiàn)并發(fā)了,一個(gè) slot 就不夠了,就需要使用一個(gè) node 數(shù)組 arena 操作了。

slotExchange()方法

slotExchange 是基于slot屬性來完成交換的,調(diào)用soltExchange方法時(shí),如果slot屬性為null,當(dāng)前線程會(huì)將slot屬性由null修改成當(dāng)前線程的Node,如果修改失敗則下一次for循環(huán)走solt屬性不為null的邏輯,如果修改成功則自旋等待,自旋一定次數(shù)后通過Unsafe的park方法當(dāng)當(dāng)前線程休眠,可以指定休眠的時(shí)間,如果沒有指定則無限期休眠直到被喚醒;無論是因?yàn)榫€程中斷被喚醒,等待超時(shí)被喚醒還是其它線程unpark喚醒的,都會(huì)檢查當(dāng)前線程的Node的屬性釋放為null,如果不為null說明交互成功,返回該對(duì)象;否則返回null或者TIME_OUT,在返回前會(huì)將item,match等屬性置為null,保存之前自旋時(shí)計(jì)算的hash值,方便下一次調(diào)用slotExchange。

調(diào)用slotExchange方法時(shí),如果slot屬性不為null,則當(dāng)前線程會(huì)嘗試將其修改null,如果cas修改成功,表示當(dāng)前線程與slot屬性對(duì)應(yīng)的線程匹配成功,會(huì)獲取slot屬性對(duì)應(yīng)Node的item屬性,將當(dāng)前線程交換的對(duì)象保存到slot屬性對(duì)應(yīng)的Node的match屬性,然后喚醒獲取slot屬性對(duì)應(yīng)Node的waiter屬性,即處理休眠狀態(tài)的線程,至此交換完成,同樣的在返回前需要將item,match等屬性置為null,保存之前自旋時(shí)計(jì)算的hash置,方便下一次調(diào)用slotExchange;如果cas修改slot屬性失敗,說明有其它線程也在搶占slot,則初始化arena屬性,下一次for循環(huán)因?yàn)閍rena屬性不為null,直接返回null,從而通過arenaExchange完成交換。

 
 
 
 
  1. // arena 為null是會(huì)調(diào)用此方法,返回null表示交換失敗
  2. // item是交換的對(duì)象,timed表示是否等待指定的時(shí)間,為false表示無限期等待,ns為等待時(shí)間
  3. private final Object slotExchange(Object item, boolean timed, long ns) {
  4.     // 獲取當(dāng)前線程關(guān)聯(lián)的participant Node
  5.     Node p = participant.get();
  6.     Thread t = Thread.currentThread();
  7.     // 被中斷,返回null
  8.     if (t.isInterrupted()) // preserve interrupt status so caller can recheck
  9.         return null;
  10.     
  11.     for (Node q;;) {
  12.         if ((q = slot) != null) { // slot 不為null
  13.             // 將slot置為null,slot對(duì)應(yīng)的線程與當(dāng)前線程匹配成功
  14.             if (U.compareAndSwapObject(this, SLOT, q, null)) {
  15.                 Object v = q.item;
  16.                 // 保存item,即完成交互
  17.                 q.match = item;
  18.                 // 喚醒q對(duì)應(yīng)的處于休眠狀態(tài)的線程
  19.                 Thread w = q.parked;
  20.                 if (w != null)
  21.                     U.unpark(w);
  22.                 return v;
  23.             }
  24.             // slot修改失敗,其它某個(gè)線程搶占來該slot,多個(gè)線程同時(shí)調(diào)用exchange方法會(huì)觸發(fā)此邏輯
  25.             // bound等于0表示未初始化,此處校驗(yàn)避免重復(fù)初始化
  26.             if (NCPU > 1 && bound == 0 &&
  27.                 U.compareAndSwapInt(this, BOUND, 0, SEQ))
  28.                 arena = new Node[(FULL + 2) << ASHIFT];
  29.         }
  30.         else if (arena != null)
  31.             return null; // carena不為null,通過arenaExchange交互
  32.         else {
  33.             // slot和arena都為null
  34.             p.item = item;
  35.             // 修改slot為p,修改成功則終止循環(huán)
  36.             if (U.compareAndSwapObject(this, SLOT, null, p))
  37.                 break;
  38.             // 修改失敗則繼續(xù)for循環(huán),將otem恢復(fù)成null
  39.             p.item = null;
  40.         }
  41.     }
  42.     // 將slot修改為p后會(huì)進(jìn)入此分支
  43.     int h = p.hash; // hash初始為0
  44.     long end = timed ? System.nanoTime() + ns : 0L;
  45.     int spins = (NCPU > 1) ? SPINS : 1;
  46.     Object v;
  47.     // match保存著同其他線程交換的對(duì)象,如果不為null,說明交換成功了
  48.     while ((v = p.match) == null) {
  49.         // 執(zhí)行自旋等待
  50.         if (spins > 0) {
  51.         h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
  52.         if (h == 0)
  53.             h = SPINS | (int)t.getId(); 初始化h
  54.         // 只有生成的h小于0時(shí)才減少spins
  55.         else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
  56.             Thread.yield();
  57.         }
  58.         // slot被修改了,已經(jīng)有匹配的線程,重新自旋,讀取屬性,因?yàn)槭窍刃薷膕lot再修改屬性的,兩者因?yàn)镃PU調(diào)度的問題可能有時(shí)間差
  59.         else if (slot != p)
  60.             spins = SPINS;
  61.         // 線程沒有被中斷且arena為null
  62.         else if (!t.isInterrupted() && arena == null &&
  63.                  (!timed || (ns = end - System.nanoTime()) > 0L)) {
  64.             U.putObject(t, BLOCKER, this);
  65.             p.parked = t;
  66.             if (slot == p)
  67.                 U.park(false, ns);
  68.             // 線程被喚醒,繼續(xù)下一次for循環(huán)
  69.             // 如果是因?yàn)榈却瑫r(shí)而被喚醒,下次for循環(huán)進(jìn)入下沒的else if分支,返回TIMED_OUT
  70.             p.parked = null;
  71.                 U.putObject(t, BLOCKER, null);
  72.         }
  73.         // 將slot修改成p
  74.         else if (U.compareAndSwapObject(this, SLOT, p, null)) {
  75.             // timed為flase,無限期等待,因?yàn)橹袛啾粏拘逊祷豱ull
  76.             // timed為ture,因?yàn)槌瑫r(shí)被喚醒,返回TIMED_OUT,因?yàn)橹袛啾粏拘逊祷豱ull
  77.             v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
  78.             break;
  79.         }
  80.     }
  81.     // 修改match為null,item為null,保存h,下一次exchange是h就不是初始值為0了
  82.     U.putOrderedObject(p, MATCH, null);
  83.     // 重置 item
  84.     p.item = null;
  85.     // 保留偽隨機(jī)數(shù),供下次種子數(shù)字
  86.     p.hash = h;
  87.     // 返回
  88.     return v;
  89. }

 總結(jié)一下上面執(zhí)行的邏輯:

  • Exchange 使用了對(duì)象池的技術(shù),將對(duì)象保存在 ThreadLocal 中,這個(gè)對(duì)象(Node)封裝了數(shù)據(jù)項(xiàng),線程對(duì)象等關(guān)鍵數(shù)據(jù);
  • 第一個(gè)線程進(jìn)入的時(shí)候,會(huì)將數(shù)據(jù)放到池化對(duì)象中,并賦值給 slot 的 item,并阻塞自己(通常不會(huì)立即阻塞,而是使用 yield 自旋一會(huì)兒),等待對(duì)方取值;
  • 當(dāng)?shù)诙€(gè)線程進(jìn)入的時(shí)候,會(huì)拿出存儲(chǔ)在 slot item 中的值,然后對(duì) slot 的 match 賦值,并喚醒上次阻塞的線程;
  • 當(dāng)?shù)谝粋€(gè)線程阻塞被喚醒后,說明對(duì)方取到值了,就獲取 slot 的 match 值,并重置 slot 的數(shù)據(jù)和池化對(duì)象的數(shù)據(jù),并返回自己的數(shù)據(jù);
  • 如果超時(shí)了,就返回 Time_out 對(duì)象;
  • 如果線程中斷了,就返回 null。

在該方法中,會(huì)返回 2 種結(jié)果,一是有效的 item,二是 null 要么是線程競(jìng)爭(zhēng)使用 slot 了,創(chuàng)建了 arena 數(shù)組,要么是線程中斷了。

通過一副圖來看看具體邏輯

arenaExchange() 方法

arenaExchange是基于arena屬性完成交換的,整體邏輯比較復(fù)雜,有以下幾個(gè)要點(diǎn):

  • m的初始值就是0,index的初始值也是0,兩個(gè)都是大于等于0且i不大于m,當(dāng)某個(gè)線程多次嘗試搶占index對(duì)應(yīng)數(shù)組元素的Node都失敗的情形下則嘗試將m加1,然后搶占m加1對(duì)應(yīng)的新數(shù)組元素,將其由null修改成當(dāng)前線程關(guān)聯(lián)的Node,然后自旋等待匹配;如果自旋結(jié)束,沒有匹配的線程,則將m加1對(duì)應(yīng)的新數(shù)組元素重新置為null,將m減1,然后再次for循環(huán)搶占其他為null的數(shù)組元素。極端并發(fā)下m會(huì)一直增加直到達(dá)到最大值FULL為止,達(dá)到FULL后只能通過for循環(huán)不斷嘗試與其他線程匹配或者搶占為null的數(shù)組元素,然后隨著并發(fā)減少,m會(huì)一直減少到0。通過這種動(dòng)態(tài)調(diào)整m的方式可以避免過多的線程基于CAS修改同一個(gè)元素導(dǎo)致CAS失敗,提高匹配的效率,這種思想跟LongAdder的實(shí)現(xiàn)是一致的。
  • 只有當(dāng)m等于0的時(shí)候才會(huì)通過Unsafe park方法讓線程休眠,如果不等于0,即此時(shí)存在多個(gè)并行的等待匹配的線程,則主要通過自旋的方式等待其他線程到來,這是因?yàn)榻粨Q動(dòng)作本身是很快的很短暫的,通過自旋等待就可以讓多個(gè)等待的線程快速的完成匹配;只有當(dāng)前只剩下一個(gè)線程的時(shí)候,此時(shí)m肯定等于0,短期內(nèi)沒有匹配的線程,才會(huì)考慮通過park方法阻塞。
 
 
 
 
  1. // 搶占slot失敗后進(jìn)入此方法,arena不為空    
  2. private final Object arenaExchange(Object item, boolean timed, long ns) {
  3.     Node[] a = arena;
  4.     Node p = participant.get();
  5.     // index初始為0
  6.     for (int i = p.index;;) {                      // access slot at i
  7.         int b, m, c; long j;                       // j is raw array offset
  8.         // 在創(chuàng)建arena時(shí),將本來的數(shù)組容量 << ASHIFT,為了避免數(shù)組元素落到了同一個(gè)高速緩存行
  9.         // 這里獲取真實(shí)的數(shù)組元素索引時(shí)也需要 << ASHIFR
  10.         Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
  11.         // 如果q不為null,則將對(duì)應(yīng)的數(shù)組元素置為null,表示當(dāng)前線程和該元素對(duì)應(yīng)的線程匹配l
  12.         if (q != null && U.compareAndSwapObject(a, j, q, null)) {
  13.             Object v = q.item;                     // release
  14.             q.match = item; // 保存item,交互成功
  15.             Thread w = q.parked;
  16.             if (w != null) // 喚醒等待的線程
  17.                 U.unpark(w);
  18.             return v;
  19.         }
  20.         // q為null 或者q不為null,cas搶占q失敗了
  21.         // bound初始化時(shí)時(shí)SEQ,SEQ & MMASK 就是0,即m的初始值就0,m為0時(shí),i肯定為0
  22.         else if (i <= (m = (b = bound) & MMASK) && q == null) {
  23.             p.item = item;                         // offer
  24.             if (U.compareAndSwapObject(a, j, null, p)) {
  25.                 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
  26.                 Thread t = Thread.currentThread(); // wait
  27.                 for (int h = p.hash, spins = SPINS;;) {
  28.                     Object v = p.match;
  29.                     if (v != null) {
  30.                         U.putOrderedObject(p, MATCH, null);
  31.                         p.item = null;             // clear for next use
  32.                         p.hash = h;
  33.                         return v;
  34.                     }
  35.                     else if (spins > 0) {
  36.                         h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
  37.                         if (h == 0)                // initialize hash
  38.                             h = SPINS | (int)t.getId();
  39.                         else if (h < 0 &&          // approx 50% true
  40.                                  (--spins & ((SPINS >>> 1) - 1)) == 0)
  41.                             Thread.yield();        // two yields per wait
  42.                     }
  43.                     else if (U.getObjectVolatile(a, j) != p)
  44.                         spins = SPINS;       // releaser hasn't set match yet
  45.                     else if (!t.isInterrupted() && m == 0 &&
  46.                              (!timed ||
  47.                               (ns = end - System.nanoTime()) > 0L)) {
  48.                         U.putObject(t, BLOCKER, this); // emulate LockSupport
  49.                      &n
    標(biāo)題名稱:并發(fā)編程之Exchanger原理與使用
    網(wǎng)頁地址:http://m.5511xx.com/article/ccdhoeo.html