日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Java阻塞隊(duì)列實(shí)現(xiàn)原理分析

Java中的阻塞隊(duì)列接口BlockingQueue繼承自Queue接口。

創(chuàng)新互聯(lián)建站主要從事成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)上思,十年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18982081108

BlockingQueue接口提供了3個添加元素方法:

  • add:添加元素到隊(duì)列里,添加成功返回true,由于容量滿了添加失敗會拋出IllegalStateException異常;
  • offer:添加元素到隊(duì)列里,添加成功返回true,添加失敗返回false;
  • put:添加元素到隊(duì)列里,如果容量滿了會阻塞直到容量不滿。

3個刪除方法:

  • poll:刪除隊(duì)列頭部元素,如果隊(duì)列為空,返回null。否則返回元素;
  • remove:基于對象找到對應(yīng)的元素,并刪除。刪除成功返回true,否則返回false;
  • take:刪除隊(duì)列頭部元素,如果隊(duì)列為空,一直阻塞到隊(duì)列有元素并刪除。

常用的阻塞隊(duì)列具體類有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。

本文以ArrayBlockingQueue和LinkedBlockingQueue為例,分析它們的實(shí)現(xiàn)原理。

ArrayBlockingQueue

ArrayBlockingQueue的原理就是使用一個可重入鎖和這個鎖生成的兩個條件對象進(jìn)行并發(fā)控制(classic two-condition algorithm)。

ArrayBlockingQueue是一個帶有長度的阻塞隊(duì)列,初始化的時(shí)候必須要指定隊(duì)列長度,且指定長度之后不允許進(jìn)行修改。

它帶有的屬性如下:

 
 
 
 
  1. // 存儲隊(duì)列元素的數(shù)組,是個循環(huán)數(shù)組
  2. final Object[] items;
  3.  
  4. // 拿數(shù)據(jù)的索引,用于take,poll,peek,remove方法
  5. int takeIndex;
  6.  
  7. // 放數(shù)據(jù)的索引,用于put,offer,add方法
  8. int putIndex;
  9.  
  10. // 元素個數(shù)
  11. int count;
  12.  
  13. // 可重入鎖
  14. final ReentrantLock lock;
  15. // notEmpty條件對象,由lock創(chuàng)建
  16. private final Condition notEmpty;
  17. // notFull條件對象,由lock創(chuàng)建
  18. private final Condition notFull; 

數(shù)據(jù)的添加

ArrayBlockingQueue有不同的幾個數(shù)據(jù)添加方法,add、offer、put方法。

add方法:

 
 
 
 
  1. public boolean add(E e) {
  2.     if (offer(e))
  3.         return true;
  4.     else
  5.         throw new IllegalStateException("Queue full");

add方法內(nèi)部調(diào)用offer方法如下:

 
 
 
 
  1. public boolean offer(E e) {
  2.     checkNotNull(e); // 不允許元素為空
  3.     final ReentrantLock lock = this.lock;
  4.     lock.lock(); // 加鎖,保證調(diào)用offer方法的時(shí)候只有1個線程
  5.     try {
  6.         if (count == items.length) // 如果隊(duì)列已滿
  7.             return false; // 直接返回false,添加失敗
  8.         else {
  9.             insert(e); // 數(shù)組沒滿的話調(diào)用insert方法
  10.             return true; // 返回true,添加成功
  11.         }
  12.     } finally {
  13.         lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用offer方法
  14.     }

insert方法如下:

 
 
 
 
  1. private void insert(E x) {
  2.     items[putIndex] = x; // 元素添加到數(shù)組里
  3.     putIndex = inc(putIndex); // 放數(shù)據(jù)索引+1,當(dāng)索引滿了變成0
  4.     ++count; // 元素個數(shù)+1
  5.     notEmpty.signal(); // 使用條件對象notEmpty通知,比如使用take方法的時(shí)候隊(duì)列里沒有數(shù)據(jù),被阻塞。這個時(shí)候隊(duì)列insert了一條數(shù)據(jù),需要調(diào)用signal進(jìn)行通知

put方法:

 
 
 
 
  1. public void put(E e) throws InterruptedException {
  2.     checkNotNull(e); // 不允許元素為空
  3.     final ReentrantLock lock = this.lock;
  4.     lock.lockInterruptibly(); // 加鎖,保證調(diào)用put方法的時(shí)候只有1個線程
  5.     try {
  6.         while (count == items.length) // 如果隊(duì)列滿了,阻塞當(dāng)前線程,并加入到條件對象notFull的等待隊(duì)列里
  7.             notFull.await(); // 線程阻塞并被掛起,同時(shí)釋放鎖
  8.         insert(e); // 調(diào)用insert方法
  9.     } finally {
  10.         lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用put方法
  11.     }

ArrayBlockingQueue的添加數(shù)據(jù)方法有add,put,offer這3個方法,總結(jié)如下:

add方法內(nèi)部調(diào)用offer方法,如果隊(duì)列滿了,拋出IllegalStateException異常,否則返回true

offer方法如果隊(duì)列滿了,返回false,否則返回true

add方法和offer方法不會阻塞線程,put方法如果隊(duì)列滿了會阻塞線程,直到有線程消費(fèi)了隊(duì)列里的數(shù)據(jù)才有可能被喚醒。

這3個方法內(nèi)部都會使用可重入鎖保證原子性。

數(shù)據(jù)的刪除

ArrayBlockingQueue有不同的幾個數(shù)據(jù)刪除方法,poll、take、remove方法。

poll方法:

 
 
 
 
  1. public E poll() {
  2.     final ReentrantLock lock = this.lock;
  3.     lock.lock(); // 加鎖,保證調(diào)用poll方法的時(shí)候只有1個線程
  4.     try {
  5.         return (count == 0) ? null : extract(); // 如果隊(duì)列里沒元素了,返回null,否則調(diào)用extract方法
  6.     } finally {
  7.         lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用poll方法
  8.     }

poll方法內(nèi)部調(diào)用extract方法:

 
 
 
 
  1. private E extract() {
  2.     final Object[] items = this.items;
  3.     E x = this.cast(items[takeIndex]); // 得到取索引位置上的元素
  4.     items[takeIndex] = null; // 對應(yīng)取索引上的數(shù)據(jù)清空
  5.     takeIndex = inc(takeIndex); // 取數(shù)據(jù)索引+1,當(dāng)索引滿了變成0
  6.     --count; // 元素個數(shù)-1
  7.     notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數(shù)據(jù)的時(shí)候隊(duì)列已滿,被阻塞。這個時(shí)候消費(fèi)了一條數(shù)據(jù),隊(duì)列沒滿了,就需要調(diào)用signal進(jìn)行通知
  8.     return x; // 返回元素

take方法:

 
 
 
 
  1. public E take() throws InterruptedException {
  2.     final ReentrantLock lock = this.lock;
  3.     lock.lockInterruptibly(); // 加鎖,保證調(diào)用take方法的時(shí)候只有1個線程
  4.     try {
  5.         while (count == 0) // 如果隊(duì)列空,阻塞當(dāng)前線程,并加入到條件對象notEmpty的等待隊(duì)列里
  6.             notEmpty.await(); // 線程阻塞并被掛起,同時(shí)釋放鎖
  7.         return extract(); // 調(diào)用extract方法
  8.     } finally {
  9.         lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用take方法
  10.     }

remove方法:

 
 
 
 
  1. public boolean remove(Object o) {
  2.     if (o == null) return false;
  3.     final Object[] items = this.items;
  4.     final ReentrantLock lock = this.lock;
  5.     lock.lock(); // 加鎖,保證調(diào)用remove方法的時(shí)候只有1個線程
  6.     try {
  7.         for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍歷元素
  8.             if (o.equals(items[i])) { // 兩個對象相等的話
  9.                 removeAt(i); // 調(diào)用removeAt方法
  10.                 return true; // 刪除成功,返回true
  11.             }
  12.         }
  13.         return false; // 刪除成功,返回false
  14.     } finally {
  15.         lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用remove方法
  16.     }

removeAt方法:

 
 
 
 
  1. void removeAt(int i) {
  2.     final Object[] items = this.items;
  3.     if (i == takeIndex) { // 如果要刪除數(shù)據(jù)的索引是取索引位置,直接刪除取索引位置上的數(shù)據(jù),然后取索引+1即可
  4.         items[takeIndex] = null;
  5.         takeIndex = inc(takeIndex);
  6.     } else { // 如果要刪除數(shù)據(jù)的索引不是取索引位置,移動元素元素,更新取索引和放索引的值
  7.         for (;;) {
  8.             int nexti = inc(i);
  9.             if (nexti != putIndex) {
  10.                 items[i] = items[nexti];
  11.                 i = nexti;
  12.             } else {
  13.                 items[i] = null;
  14.                 putIndex = i;
  15.                 break;
  16.             }
  17.         }
  18.     }
  19.     --count; // 元素個數(shù)-1
  20.     notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數(shù)據(jù)的時(shí)候隊(duì)列已滿,被阻塞。這個時(shí)候消費(fèi)了一條數(shù)據(jù),隊(duì)列沒滿了,就需要調(diào)用signal進(jìn)行通知 

ArrayBlockingQueue的刪除數(shù)據(jù)方法有poll,take,remove這3個方法,總結(jié)如下:

poll方法對于隊(duì)列為空的情況,返回null,否則返回隊(duì)列頭部元素。

remove方法取的元素是基于對象的下標(biāo)值,刪除成功返回true,否則返回false。

poll方法和remove方法不會阻塞線程。

take方法對于隊(duì)列為空的情況,會阻塞并掛起當(dāng)前線程,直到有數(shù)據(jù)加入到隊(duì)列中。

這3個方法內(nèi)部都會調(diào)用notFull.signal方法通知正在等待隊(duì)列滿情況下的阻塞線程。

LinkedBlockingQueue

LinkedBlockingQueue是一個使用鏈表完成隊(duì)列操作的阻塞隊(duì)列。鏈表是單向鏈表,而不是雙向鏈表。

內(nèi)部使用放鎖和拿鎖,這兩個鎖實(shí)現(xiàn)阻塞(“two lock queue” algorithm)。

它帶有的屬性如下:

 
 
 
 
  1. // 容量大小
  2. private final int capacity;
  3.  
  4. // 元素個數(shù),因?yàn)橛?個鎖,存在競態(tài)條件,使用AtomicInteger
  5. private final AtomicInteger count = new AtomicInteger(0);
  6.  
  7. // 頭結(jié)點(diǎn)
  8. private transient Node head;
  9.  
  10. // 尾節(jié)點(diǎn)
  11. private transient Node last;
  12.  
  13. // 拿鎖
  14. private final ReentrantLock takeLock = new ReentrantLock();
  15.  
  16. // 拿鎖的條件對象
  17. private final Condition notEmpty = takeLock.newCondition();
  18.  
  19. // 放鎖
  20. private final ReentrantLock putLock = new ReentrantLock();
  21.  
  22. // 放鎖的條件對象
  23. private final Condition notFull = putLock.newCondition(); 

ArrayBlockingQueue只有1個鎖,添加數(shù)據(jù)和刪除數(shù)據(jù)的時(shí)候只能有1個被執(zhí)行,不允許并行執(zhí)行。

而LinkedBlockingQueue有2個鎖,放鎖和拿鎖,添加數(shù)據(jù)和刪除數(shù)據(jù)是可以并行進(jìn)行的,當(dāng)然添加數(shù)據(jù)和刪除數(shù)據(jù)的時(shí)候只能有1個線程各自執(zhí)行。

數(shù)據(jù)的添加

LinkedBlockingQueue有不同的幾個數(shù)據(jù)添加方法,add、offer、put方法。

add方法內(nèi)部調(diào)用offer方法:

 
 
 
 
  1. public boolean offer(E e) {
  2.     if (e == null) throw new NullPointerException(); // 不允許空元素
  3.     final AtomicInteger count = this.count;
  4.     if (count.get() == capacity) // 如果容量滿了,返回false
  5.         return false;
  6.     int c = -1;
  7.     Node node = new Node(e); // 容量沒滿,以新元素構(gòu)造節(jié)點(diǎn)
  8.     final ReentrantLock putLock = this.putLock;
  9.     putLock.lock(); // 放鎖加鎖,保證調(diào)用offer方法的時(shí)候只有1個線程
  10.     try {
  11.         if (count.get() < capacity) { // 再次判斷容量是否已滿,因?yàn)榭赡苣面i在進(jìn)行消費(fèi)數(shù)據(jù),沒滿的話繼續(xù)執(zhí)行
  12.             enqueue(node); // 節(jié)點(diǎn)添加到鏈表尾部
  13.             c = count.getAndIncrement(); // 元素個數(shù)+1
  14.             if (c + 1 < capacity) // 如果容量還沒滿
  15.                 notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊(duì)列里面加數(shù)據(jù)了,隊(duì)列還沒滿
  16.         }
  17.     } finally {
  18.         putLock.unlock(); // 釋放放鎖,讓其他線程可以調(diào)用offer方法
  19.     }
  20.     if (c == 0) // 由于存在放鎖和拿鎖,這里可能拿鎖一直在消費(fèi)數(shù)據(jù),count會變化。這里的if條件表示如果隊(duì)列中還有1條數(shù)據(jù)
  21.         signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊(duì)列里還有1條數(shù)據(jù),可以進(jìn)行消費(fèi)
  22.     return c >= 0; // 添加成功返回true,否則返回false

put方法:

 
 
 
 
  1. public void put(E e) throws InterruptedException {
  2.     if (e == null) throw new NullPointerException(); // 不允許空元素
  3.     int c = -1;
  4.     Node node = new Node(e); // 以新元素構(gòu)造節(jié)點(diǎn)
  5.     final ReentrantLock putLock = this.putLock;
  6.     final AtomicInteger count = this.count;
  7.     putLock.lockInterruptibly(); // 放鎖加鎖,保證調(diào)用put方法的時(shí)候只有1個線程
  8.     try {
  9.         while (count.get() == capacity) { // 如果容量滿了
  10.             notFull.await(); // 阻塞并掛起當(dāng)前線程
  11.         }
  12.         enqueue(node); // 節(jié)點(diǎn)添加到鏈表尾部
  13.         c = count.getAndIncrement(); // 元素個數(shù)+1
  14.         if (c + 1 < capacity) // 如果容量還沒滿
  15.             notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊(duì)列里面加數(shù)據(jù)了,隊(duì)列還沒滿
  16.     } finally {
  17.         putLock.unlock(); // 釋放放鎖,讓其他線程可以調(diào)用put方法
  18.     }
  19.     if (c == 0) // 由于存在放鎖和拿鎖,這里可能拿鎖一直在消費(fèi)數(shù)據(jù),count會變化。這里的if條件表示如果隊(duì)列中還有1條數(shù)據(jù)
  20.         signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊(duì)列里還有1條數(shù)據(jù),可以進(jìn)行消費(fèi)

LinkedBlockingQueue的添加數(shù)據(jù)方法add,put,offer跟ArrayBlockingQueue一樣,不同的是它們的底層實(shí)現(xiàn)不一樣。

ArrayBlockingQueue中放入數(shù)據(jù)阻塞的時(shí)候,需要消費(fèi)數(shù)據(jù)才能喚醒。

而LinkedBlockingQueue中放入數(shù)據(jù)阻塞的時(shí)候,因?yàn)樗鼉?nèi)部有2個鎖,可以并行執(zhí)行放入數(shù)據(jù)和消費(fèi)數(shù)據(jù),不僅在消費(fèi)數(shù)據(jù)的時(shí)候進(jìn)行喚醒插入阻塞的線程,同時(shí)在插入的時(shí)候如果容量還沒滿,也會喚醒插入阻塞的線程。

數(shù)據(jù)的刪除

LinkedBlockingQueue有不同的幾個數(shù)據(jù)刪除方法,poll、take、remove方法。

poll方法:

 
 
 
 
  1. public E poll() {
  2.     final AtomicInteger count = this.count;
  3.     if (count.get() == 0) // 如果元素個數(shù)為0
  4.         return null; // 返回null
  5.     E x = null;
  6.     int c = -1;
  7.     final ReentrantLock takeLock = this.takeLock;
  8.     takeLock.lock(); // 拿鎖加鎖,保證調(diào)用poll方法的時(shí)候只有1個線程
  9.     try {
  10.         if (count.get() > 0) { // 判斷隊(duì)列里是否還有數(shù)據(jù)
  11.             x = dequeue(); // 刪除頭結(jié)點(diǎn)
  12.             c = count.getAndDecrement(); // 元素個數(shù)-1
  13.             if (c > 1) // 如果隊(duì)列里還有元素
  14.                 notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊(duì)列里還有數(shù)據(jù),可以再次消費(fèi)
  15.         }
  16.     } finally {
  17.         takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調(diào)用poll方法
  18.     }
  19.     if (c == capacity) // 由于存在放鎖和拿鎖,這里可能放鎖一直在添加數(shù)據(jù),count會變化。這里的if條件表示如果隊(duì)列中還可以再插入數(shù)據(jù)
  20.         signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊(duì)列里還能再次添加數(shù)據(jù)
  21.                 return x;
  22. }

take方法:

 
 
 
 
  1. public E take() throws InterruptedException {
  2.     E x;
  3.     int c = -1;
  4.     final AtomicInteger count = this.count;
  5.     final ReentrantLock takeLock = this.takeLock;
  6.     takeLock.lockInterruptibly(); // 拿鎖加鎖,保證調(diào)用take方法的時(shí)候只有1個線程
  7.     try {
  8.         while (count.get() == 0) { // 如果隊(duì)列里已經(jīng)沒有元素了
  9.             notEmpty.await(); // 阻塞并掛起當(dāng)前線程
  10.         }
  11.         x = dequeue(); // 刪除頭結(jié)點(diǎn)
  12.         c = count.getAndDecrement(); // 元素個數(shù)-1
  13.         if (c > 1) // 如果隊(duì)列里還有元素
  14.             notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊(duì)列里還有數(shù)據(jù),可以再次消費(fèi)
  15.     } finally {
  16.         takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調(diào)用take方法
  17.     }
  18.     if (c == capacity) // 由于存在放鎖和拿鎖,這里可能放鎖一直在添加數(shù)據(jù),count會變化。這里的if條件表示如果隊(duì)列中還可以再插入數(shù)據(jù)
  19.         signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊(duì)列里還能再次添加數(shù)據(jù)
  20.     return x;
  21. }

remove方法:

 
 
 
 
  1. public boolean remove(Object o) {
  2.     if (o == null) return false;
  3.     fullyLock(); // remove操作要移動的位置不固定,2個鎖都需要加鎖
  4.     try {
  5.         for (Node trail = head, p = trail.next; // 從鏈表頭結(jié)點(diǎn)開始遍歷
  6.              p != null;
  7.              trail = p, p = p.next) {
  8.             if (o.equals(p.item)) { // 判斷是否找到對象
  9.                 unlink(p, trail); // 修改節(jié)點(diǎn)的鏈接信息,同時(shí)調(diào)用notFull的signal方法
  10.                 return true;
  11.             }
  12.         }
  13.         return false;
  14.     } finally {
  15.         fullyUnlock(); // 2個鎖解鎖
  16.     }
  17. }

LinkedBlockingQueue的take方法對于沒數(shù)據(jù)的情況下會阻塞,poll方法刪除鏈表頭結(jié)點(diǎn),remove方法刪除指定的對象。

需要注意的是remove方法由于要刪除的數(shù)據(jù)的位置不確定,需要2個鎖同時(shí)加鎖。


網(wǎng)站欄目:Java阻塞隊(duì)列實(shí)現(xiàn)原理分析
網(wǎng)頁地址:http://m.5511xx.com/article/dpsipig.html