新聞中心
前言
ThreadPoolExecutor中是如何做到線程復(fù)用的?
我們知道,一個(gè)線程在創(chuàng)建的時(shí)候會(huì)指定一個(gè)線程任務(wù),當(dāng)執(zhí)行完這個(gè)線程任務(wù)之后,線程自動(dòng)銷毀。但是線程池卻可以復(fù)用線程,一個(gè)線程執(zhí)行完線程任務(wù)后不銷毀,繼續(xù)執(zhí)行另外的線程任務(wù)。那么它是如何做到的?這得從addWorker()說起。

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對(duì)這個(gè)行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡(jiǎn)單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:域名申請(qǐng)、網(wǎng)頁空間、營(yíng)銷軟件、網(wǎng)站建設(shè)、高陽網(wǎng)站維護(hù)、網(wǎng)站推廣。
addWorker()
- 先看上半部分addWorker()。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 對(duì)邊界設(shè)定的檢查
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}?retry:可能有些同學(xué)沒用過,它只是一個(gè)標(biāo)記,它的下一個(gè)標(biāo)記就是for循環(huán),在for循環(huán)里面調(diào)用continue/break再緊接著retry標(biāo)記時(shí),就表示從這個(gè)地方開始執(zhí)行continue/break操作,但這不是我們關(guān)注的重點(diǎn)。
從上面的代碼,我們可以看出,ThreadPoolExecutor在創(chuàng)建線程時(shí),會(huì)將線程封裝成「工作線程worker」,并放入「工作線程組」中,然后這個(gè)worker反復(fù)從阻塞隊(duì)列中拿任務(wù)去執(zhí)行。這個(gè)addWorker是excute方法中調(diào)用的。
- 我們接著看下半部分。
private boolean addWorker(Runnable firstTask, boolean core) {
// 上半部分
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// core是ture,需要?jiǎng)?chuàng)建的線程為核心線程,則先判斷當(dāng)前線程是否大于核心線程
// 如果core是false,證明需要?jiǎng)?chuàng)建的是非核心線程,則先判斷當(dāng)前線程數(shù)是否大于總線程數(shù)
// 如果不小于,則返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 下半部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建worker對(duì)象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 獲取線程全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 判斷線程池狀態(tài)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將當(dāng)前線程添加到線程組
workers.add(w);
int s = workers.size();
// 如果線程組中的線程數(shù)大于最大線程池?cái)?shù) largestPoolSize賦值s
if (s > largestPoolSize)
largestPoolSize = s;
// 添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功后執(zhí)行線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 添加失敗后執(zhí)行 addWorkerFailed
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}再看 addWorkerFailed(),與上邊相反,相當(dāng)于一個(gè)回滾操作,會(huì)移除失敗的工作線程。
private void addWorkerFailed(Worker w) {
// 同樣需要全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}Worker
我們接著看Worker對(duì)象。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
//.....
// 省略下邊代碼
}
Worker類實(shí)現(xiàn)了Runnable接口,所以Worker也是一個(gè)線程任務(wù)。在構(gòu)造方法中,創(chuàng)建了一個(gè)線程,回過頭想想addWorker()里為啥可以t.start()應(yīng)該很清楚了吧, 并且在構(gòu)造方法中調(diào)用了線程工廠創(chuàng)建了一個(gè)線程實(shí)例,我們上節(jié)講過線程工廠。其實(shí)這也不是關(guān)注的重點(diǎn),重點(diǎn)是這個(gè)runWorker()。
final void runWorker(Worker w) {
// 獲取當(dāng)前的線程實(shí)例
Thread wt = Thread.currentThread();
// 直接從第一個(gè)任務(wù)開始執(zhí)行
Runnable task = w.firstTask;
// 獲取完之后把worker的firstTask置為null 防止下次獲取到
w.firstTask = null;
// 線程啟動(dòng)之后,通過unlock方法釋放鎖
w.unlock(); // allow interrupts
// 線程異常退出時(shí) 為 true
boolean completedAbruptly = true;
try {
// Worker執(zhí)行firstTask或從workQueue中獲取任務(wù),直到任務(wù)為空
while (task != null || (task = getTask()) != null) {
// 獲取鎖以防止在任務(wù)執(zhí)行過程中發(fā)生中斷
w.lock();
// 判斷邊界值 如果線程池中斷 則中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 相當(dāng)于鉤子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}?首先去執(zhí)行創(chuàng)建這個(gè)worker時(shí)就有的任務(wù),當(dāng)執(zhí)行完這個(gè)任務(wù)后,worker的生命周期并沒有結(jié)束,在while循環(huán)中,worker會(huì)不斷地調(diào)用getTask方法從「阻塞隊(duì)列」中獲取任務(wù)然后調(diào)用task.run()執(zhí)行任務(wù),從而達(dá)到「復(fù)用線程」的目的。只要getTask方法不返回null,此線程就不會(huì)退出。
我們接著看getTask()?。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果運(yùn)行線程數(shù)超過了最大線程數(shù),但是緩存隊(duì)列已經(jīng)空了,這時(shí)遞減worker數(shù)量。
// 如果有設(shè)置允許線程超時(shí)或者線程數(shù)量超過了核心線程數(shù)量,并且線程在規(guī)定時(shí)間內(nèi)均未poll到任務(wù)且隊(duì)列為空則遞減worker數(shù)量
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed為true,則會(huì)調(diào)用workQueue的poll方法獲取任務(wù).
// 超時(shí)時(shí)間是keepAliveTime。如果超過keepAliveTime時(shí)長(zhǎng),
// 如果timed為false, 則會(huì)調(diào)用workQueue的take方法阻塞在當(dāng)前。
// 隊(duì)列中有任務(wù)加入時(shí),線程被喚醒,take方法返回任務(wù),并執(zhí)行。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}大家有沒有想過這里為啥要用take和poll,它們都是出隊(duì)的操作,這么做有什么好處?
take & poll
?我們說take()方法會(huì)將核心線程阻塞掛起,這樣一來它就不會(huì)占用太多的cpu資源,直到拿到Runnable 然后返回。
如果「allowCoreThreadTimeOut」設(shè)置為true,那么核心線程就會(huì)去調(diào)用poll方法,因?yàn)閜oll可能會(huì)返回null,所以這時(shí)候核心線程滿足超時(shí)條件也會(huì)被銷毀。
?非核心線程會(huì)workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),如果超時(shí)還沒有拿到,下一次循環(huán)判斷「compareAndDecrementWorkerCount」就會(huì)返回null,Worker對(duì)象的run()方法循環(huán)體的判斷為null,任務(wù)結(jié)束,然后線程被系統(tǒng)回收 。
再回頭看一下runWorker()是不是設(shè)計(jì)的很巧妙。
結(jié)束語
本節(jié)內(nèi)容不是很好理解,想繼續(xù)探討的同學(xué)可以繼續(xù)閱讀它的源碼,這部分內(nèi)容了解一下就好,其實(shí)我們從源碼中可以看到大量的線程狀態(tài)檢查,代碼寫的很健壯,可以從中學(xué)習(xí)一下。
當(dāng)前文章:面試官:線程池是如何做到線程復(fù)用的?有了解過嗎?
URL標(biāo)題:http://m.fisionsoft.com.cn/article/djpoppd.html


咨詢
建站咨詢
