新聞中心
前言
本節(jié)以ArrayBlockingQueue為例, 帶大家看下阻塞隊(duì)列是如何實(shí)現(xiàn),一起來(lái)看下吧!

創(chuàng)新互聯(lián)是一家網(wǎng)站設(shè)計(jì)公司,集創(chuàng)意、互聯(lián)網(wǎng)應(yīng)用、軟件技術(shù)為一體的創(chuàng)意網(wǎng)站建設(shè)服務(wù)商,主營(yíng)產(chǎn)品:響應(yīng)式網(wǎng)站設(shè)計(jì)、成都品牌網(wǎng)站建設(shè)、營(yíng)銷型網(wǎng)站。我們專注企業(yè)品牌在網(wǎng)站中的整體樹立,網(wǎng)絡(luò)互動(dòng)的體驗(yàn),以及在手機(jī)等移動(dòng)端的優(yōu)質(zhì)呈現(xiàn)。網(wǎng)站制作、做網(wǎng)站、移動(dòng)互聯(lián)產(chǎn)品、網(wǎng)絡(luò)運(yùn)營(yíng)、VI設(shè)計(jì)、云產(chǎn)品.運(yùn)維為核心業(yè)務(wù)。為用戶提供一站式解決方案,我們深知市場(chǎng)的競(jìng)爭(zhēng)激烈,認(rèn)真對(duì)待每位客戶,為客戶提供賞析悅目的作品,網(wǎng)站的價(jià)值服務(wù)。
ArrayBlockingQueue 源碼分析
構(gòu)造函數(shù)
同樣的,我們先從它的構(gòu)造函數(shù)看起。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}- capacity 固定容量大小。
- false,這個(gè)字段名稱其實(shí)是fair默認(rèn)下它是false,非公平鎖。
??上節(jié)??我們使用的就是它的默認(rèn)用法,公平鎖和非公平鎖我們之前講過,可以查看以往文章(ReentrantLock源碼分析)。下面我們接著看:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}從上面的代碼來(lái)看,可知capacity > 0,第一個(gè)構(gòu)造函數(shù)的this()其實(shí)就是調(diào)的這個(gè)構(gòu)造函數(shù),我們可以通過它來(lái)指定容量和訪問策略(fair 和 nofair)的ArrayBlockingQueue。
再接著看最后一個(gè)構(gòu)造函數(shù)。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
// 鎖用于可見性
lock.lock();
try {
int i = 0;
try {
// 迭代集合
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
// 捕獲異常 越界
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
從代碼來(lái)看,對(duì)比上一個(gè)多了個(gè)Collection,這是干嘛的呢?它允許我們?cè)趧?chuàng)建的時(shí)候初始化一個(gè)集合進(jìn)去,按迭代順序添加到容器,從它的內(nèi)部我們也可以看出來(lái)。
內(nèi)部變量
// 隊(duì)列的元素
final Object[] items;
// 獲取下一個(gè)元素時(shí)的索引
int takeIndex;
// 下一個(gè)添加元素時(shí)的索引
int putIndex;
// 隊(duì)列的元素?cái)?shù)量
int count;
// 全局鎖
final ReentrantLock lock;
// 等待條件
private final Condition notEmpty;
private final Condition notFull;
// 迭代器的共享狀態(tài)
transient Itrs itrs = null;
內(nèi)部方法
add() & offer()
我們看下add方法,這個(gè)方法用于向隊(duì)列中添加元素。
public boolean add(E e) {
return super.add(e);
}內(nèi)部調(diào)用了父類的方法,它繼承了AbstractQueue。
public class ArrayBlockingQueueextends AbstractQueue
implements BlockingQueue, java.io.Serializable {....}
接著看AbstractQueue的add()。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}可以看到內(nèi)部調(diào)用了offer(), 如果添加成功就返回true,失敗就拋出異常, 這符合我們上節(jié)使用時(shí)它的特點(diǎn)。
但是,我們發(fā)現(xiàn)在它的內(nèi)部并沒有offer方法,所以實(shí)現(xiàn)不在AbstractQueue,實(shí)現(xiàn)還是在ArrayBlockingQueue。
來(lái)看下ArrayBlockingQueue的offer()方法。
public boolean offer(E e) {
// 判斷元素 e 是否為空,空拋出 NullPointerException 異常
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 需要持鎖
lock.lock();
try {
// 如果元素已滿 返回false, 對(duì)標(biāo) add 就會(huì)拋出異常
if (count == items.length)
return false;
else {
// 添加到隊(duì)列中
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}看下enqueue:
private void enqueue(E x) {
final Object[] items = this.items;
// 將元素添加到預(yù)期的索引位置
items[putIndex] = x;
// 如果下個(gè)索引值等于容器數(shù)量值 將putIndex歸0
if (++putIndex == items.length)
putIndex = 0;
// 容器元素?cái)?shù)量+1
count++;
// 喚醒等待的線程
notEmpty.signal();
}remove & poll
先看第一個(gè) remove(), 同樣的這個(gè)方法存在于 AbstractQueue內(nèi)部,如果被移除的元素為null則拋出異常。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}poll()的實(shí)現(xiàn)在ArrayBlockingQueue,內(nèi)部實(shí)現(xiàn)方式跟add很像。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}private E dequeue() {
final Object[] items = this.items;
// 這個(gè)注解用于忽略一些警告 這不是重點(diǎn)
@SuppressWarnings("unchecked")
// 取出元素 takeIndex 按照 FIFO
E x = (E) items[takeIndex];
// 元素取出時(shí) 置為空
items[takeIndex] = null;
// 判斷下一個(gè)元素的位置
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 喚醒等待的線程
notFull.signal();
// 返回元素
return x;
}第二個(gè)remove(e), 這個(gè)實(shí)現(xiàn)在ArrayBlockingQueue的內(nèi)部,可以移除指定元素。
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
// 遍歷移除指定元素
if (o.equals(items[i])) {
// 移除指定元素 并更新對(duì)應(yīng)的索引位置
removeAt(i);
return true;
}
// 防止越界
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}take
take會(huì)造成線程阻塞下面我看下它的內(nèi)部實(shí)現(xiàn)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 獲取鎖,當(dāng)線程中斷會(huì)直接返回
lock.lockInterruptibly();
try {
// 如果元素內(nèi)部為空 會(huì)進(jìn)入阻塞,意思是沒有元素可拿了,進(jìn)入等待
while (count == 0)
// 使當(dāng)前線程等待
notEmpty.await();
// 否則出列
return dequeue();
} finally {
lock.unlock();
}
}put
該方法實(shí)現(xiàn)跟 take類似, 也會(huì)阻塞線程。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果元素滿了 就阻塞
while (count == items.length)
notFull.await();
// 否則入列
enqueue(e);
} finally {
lock.unlock();
}
}結(jié)束語(yǔ)
本節(jié)主要給大家講了ArrayBlockingQueue的源碼實(shí)現(xiàn),它的源碼相對(duì)簡(jiǎn)單一些, 大家可以根據(jù)本節(jié)看下BlockingQueue其它的實(shí)現(xiàn)類。
當(dāng)前文章:面試官:阻塞隊(duì)列的底層實(shí)現(xiàn)有了解過嗎?
文章轉(zhuǎn)載:http://m.fisionsoft.com.cn/article/copehdj.html


咨詢
建站咨詢
