新聞中心
手寫一個異步工具類
于是我就到處看項(xiàng)目的源碼,看看有沒有什么能改進(jìn)的?果然讓我發(fā)現(xiàn)了。項(xiàng)目中到處充斥著 new Thread 類來異步執(zhí)行代碼的邏輯。

new Thread(r).start();
我們可以封裝一個異步工具類??!
第一版
說干就干,把上面的代碼簡單封裝一下,一個簡單的異步工具類就封裝好了。
public interface Executor {
void execute(Runnable r);
}
public class AsyncExecutorV1 implements Executor {
@Override
public void execute(Runnable r) {
new Thread(r).start();
}
}于是開開心心的提交了 merge request。
第二版
正當(dāng)我滿懷期待工具類代碼能被合并的時候,沒想代碼被組長杰哥打回來了。
「杰哥」:有心封裝工具類值得鼓勵,不過還可以改進(jìn)一下。
「小識」:還能再改進(jìn)?沒感覺我這個工具類還有改進(jìn)的余地啊!
「杰哥」:假如說有10000個異步任務(wù),你這創(chuàng)建10000個線程,資源耗費(fèi)太嚴(yán)重了!
「小識」:這樣啊,那我加個隊列,任務(wù)都放到隊列中,用一個線程從隊列中取任務(wù)執(zhí)行。
public class AsyncExecutorV2 implements Executor {
private BlockingQueue workQueue;
public AsyncExecutorV2(BlockingQueue workQueue) {
this.workQueue = workQueue;
WorkThread workThread = new WorkThread();
workThread.start();
}
@SneakyThrows
@Override
public void execute(Runnable r) {
workQueue.add(r);
}
class WorkThread extends Thread {
@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
}
第三版
「小識」:杰哥,快幫我看看,還有啥改進(jìn)的沒?
「杰哥」:小伙子不錯啊,居然能想到用隊列來緩沖任務(wù),不愧是我招進(jìn)來的人!但是用一個異步線程執(zhí)行任務(wù),你確定這個工具類比同步執(zhí)行的效率快?
「小識」:哈哈,又一個工具類翻車的案例,應(yīng)該多開幾個異步線程來執(zhí)行任務(wù),但是應(yīng)該開多少呢?
「杰哥」:誰最清楚異步工具類應(yīng)該用多少個線程來執(zhí)行呢?
「小識」:使用工具類的人。
「杰哥」:這不對了,你可以定義一個線程數(shù)量參數(shù),讓用戶來決定開多少線程?!噶硗饽氵@個工具類還個問題,隊列滿了會直接拋出異常!」
「小識」:那我增加一個拒絕策略類(RejectedExecutionHandler),當(dāng)線程池滿了讓用戶決定執(zhí)行策略,比如直接拋異常,用當(dāng)前線程同步執(zhí)行任務(wù)。
public class AsyncExecutorV3 implements Executor {
private BlockingQueue workQueue;
private List workThreadList = new ArrayList<>();
private RejectedExecutionHandler handler;
public AsyncExecutorV3(int corePoolSize,
BlockingQueue workQueue,
RejectedExecutionHandler handler) {
this.workQueue = workQueue;
this.handler = handler;
for (int i = 0; i < corePoolSize; i++) {
WorkThread workThread = new WorkThread();
workThread.start();
workThreadList.add(workThread);
}
}
@SneakyThrows
@Override
public void execute(Runnable r) {
if (!workQueue.offer(r)) {
// 隊列滿了,執(zhí)行拒絕策略
handler.rejectedExecution(r);
}
}
class WorkThread extends Thread {
@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
}
// 拒絕策略類
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r);
}
// 當(dāng)線程池滿了之后直接拋出異常
public class AbortPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r) {
throw new RuntimeException("queue is full");
}
}
// 當(dāng)線程池滿了之后直接拋出異常
public class AbortPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r) {
throw new RuntimeException("queue is full");
}
}
// 當(dāng)線程池滿了之后,用提交任務(wù)的線程同步執(zhí)行任務(wù)
public class CallerRunsPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r) {
r.run();
}
}
再次提交 merge request,終于被合并了,別的團(tuán)隊都開始使用我的工具類了!
過了幾天小亮急匆匆找到我。
「小亮」:小識,你的工具類挺好用的。但是我最近遇到了一個問題,我用了CountDownLatch批量執(zhí)行任務(wù),但是我這個任務(wù)好像卡住了,我用jstack想看看線程的執(zhí)行情況,快告訴我你異步線程的名字設(shè)置的是啥?
「小識」:哎呀,我們沒設(shè)置線程的名字,應(yīng)該用的是默認(rèn)的線程名字 Thread-n。
「小亮」:你可得給工具類加個線程名字的參數(shù)啊,不然一個一個看線程的狀態(tài)太累了,而且效率也不高。
「小識」:我這就加。
第四版
趕緊加了一個線程名字的參數(shù),然后再次提交代碼。
「杰哥」:哎呀,沒想到我也疏忽了,沒發(fā)現(xiàn)這個問題,確實(shí)應(yīng)該加個線程名字的參數(shù),代碼的可擴(kuò)展性太重要了,改來改去可不行。
「小識」:是啊!
「杰哥」:你覺得你只加一個線程名字參數(shù),可擴(kuò)展性高嗎?如果有的團(tuán)隊想修改異步線程的優(yōu)先級,你再加個優(yōu)先級參數(shù)?
「小識」:感覺不太行,那讓用戶把線程傳給我吧!
「杰哥」:哈哈,可以,你還可以用工廠模式優(yōu)化一下,用戶傳入線程工廠類,工具類用工廠類創(chuàng)建線程。
「小識」:不愧是杰哥,這樣一來代碼更清爽了!
public class AsyncExecutorV4 implements Executor {
private BlockingQueue workQueue;
private List workThreadList = new ArrayList<>();
private RejectedExecutionHandler handler;
public AsyncExecutorV4(int corePoolSize,
BlockingQueue workQueue,
RejectedExecutionHandler handler,
ThreadFactory threadFactory) {
this.workQueue = workQueue;
this.handler = handler;
for (int i = 0; i < corePoolSize; i++) {
// 用工廠類創(chuàng)建線程
WorkThread workThread = threadFactory.newThread();
workThread.start();
workThreadList.add(workThread);
}
}
@SneakyThrows
@Override
public void execute(Runnable r) {
if (!workQueue.offer(r)) {
handler.rejectedExecution(r);
}
}
// 異步線程
public class WorkThread extends Thread {
@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
// 異步線程工廠類
public interface ThreadFactory {
WorkThread newThread();
}
}
代碼提交之后,小亮給線程起了一個名字,async-thread,現(xiàn)在他通過名字很快就能知道線程池中的線程在干嘛了!
大家不斷的進(jìn)行改進(jìn)
隨著這個異步工具類在公司內(nèi)部使用的越來越多,大家也提交了很多改進(jìn)的代碼。
- 按需創(chuàng)建線程,不要一開始就創(chuàng)建「corePoolSize」個線程,而是在調(diào)用者提交任務(wù)的過程中逐漸創(chuàng)建出來,最后創(chuàng)建了「corePoolSize」個就不再創(chuàng)建了。
- 提高工具的彈性,當(dāng)任務(wù)突增時,隊列會被放滿,然后多余的任務(wù)有可能會被直接扔掉。當(dāng)然我們可以把「corePoolSize」設(shè)的很大,但是這樣并不優(yōu)雅,因?yàn)榇蟛糠智闆r下是用不到這么多線程的。當(dāng)任務(wù)突增時,我們可以適當(dāng)增加線程,提高執(zhí)行速度,當(dāng)然創(chuàng)建的總線程數(shù)還是要限制一下的,我們把能創(chuàng)建的總數(shù)定為「maximumPoolSize」。
- 及時關(guān)閉不需要的線程,當(dāng)任務(wù)突增時,線程數(shù)可能增加「maximumPoolSize」,但是大多數(shù)時間「corePoolSize」個線程就足夠用了,因此可以定義一個超時時間,當(dāng)一個線程在「keepAliveTime」時間內(nèi)沒有執(zhí)行任務(wù),就把它給關(guān)掉。
異步工具類執(zhí)行流程圖
經(jīng)過大家的不斷改進(jìn)之后,構(gòu)造函數(shù)中的參數(shù)也越來越多了,杰哥讓我寫個文檔吧,把這個異步工具類的構(gòu)造函數(shù)和執(zhí)行流程總結(jié)一下,不然新來的小伙伴看到這個工具類一臉懵可不行!
這個工具類的構(gòu)造函數(shù)目前有如下7個參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueueworkQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
|
參數(shù) |
含義 |
|
corePoolSize |
核心線程數(shù) |
|
maximumPoolSize |
最大線程數(shù) |
|
keepAliveTime |
非核心線程的空閑時間 |
|
TimeUnit |
空閑時間的單位 |
|
BlockingQueue |
任務(wù)隊列 |
|
ThreadFactory |
線程工廠 |
|
RejectedExecutionHandler |
拒絕策略 |
「執(zhí)行流程圖如下」:
對了,最后大家給這個異步工具類起了一個牛的名字,「線程池」。
分享標(biāo)題:如何手寫一個線程池?
瀏覽路徑:http://m.fisionsoft.com.cn/article/dhjogjg.html


咨詢
建站咨詢
