新聞中心
01.***制線程的缺點

安多網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、響應(yīng)式網(wǎng)站等網(wǎng)站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)成立于2013年到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)。
多線程的軟件設(shè)計方法確實可以***限度地發(fā)揮多核處理器的計算能力,提高生產(chǎn)系統(tǒng)的吞吐量和性能。但是,若不加控制和管理的隨意使用線程,對系統(tǒng)的性能反而會產(chǎn)生不利的影響。
一種最為簡單的線程創(chuàng)建和回收的方法類似如下:
- new Thread(new Runnable() {
- @Override
- public void run() {
- //do sth
- }
- }).start();
以上代碼創(chuàng)建了一條線程,并在run()方法結(jié)束后,自動回收該線程。在簡單的應(yīng)用系統(tǒng)中,這段代碼并沒有太多問題。但是在真實的生產(chǎn)環(huán)境中,系統(tǒng)由于真實環(huán)境的需要,可能會開啟很多線程來支撐其應(yīng)用。而當線程數(shù)量過大時,反而會耗盡CPU和內(nèi)存資源。
首先,雖然與進程相比,線程是一種輕量級的工具,但其創(chuàng)建和關(guān)閉依然需要花費時間,如果為每一個小的任務(wù)都創(chuàng)建一個線程,很有可能出現(xiàn)創(chuàng)建和銷毀線程所占用的時間大于該線程真實工作所消耗的時間,反而會得不償失。
其次,線程本身也是要占用內(nèi)存空間的,大量的線程會搶占寶貴的內(nèi)部資源。
因此,在實際生產(chǎn)環(huán)境中,線程的數(shù)量必須得到控制。盲目地大量創(chuàng)建線程對系統(tǒng)性能是有傷害的。
02.簡單的線程池實現(xiàn)
下面給出一個最簡單的線程池,該線程池不是一個完善的線程池,但已經(jīng)實現(xiàn)了一個基本線程池的核心功能,有助于快速理解線程池的實現(xiàn)。
1.線程池的實現(xiàn)
- public class ThreadPool {
- private static ThreadPool instance = null;
- //空閑的線程隊列
- private List
idleThreads; - //已有的線程總數(shù)
- private int threadCounter;
- private boolean isShutDown = false;
- private ThreadPool() {
- this.idleThreads = new Vector<>(5);
- threadCounter = 0;
- }
- public int getCreatedThreadCounter() {
- return threadCounter;
- }
- //取得線程池的實例
- public synchronized static ThreadPool getInstance() {
- if (instance == null) {
- instance = new ThreadPool();
- }
- return instance;
- }
- //將線程池放入池中
- protected synchronized void repool(PThread repoolingThread) {
- if (!isShutDown) {
- idleThreads.add(repoolingThread);
- } else {
- repoolingThread.shutDown();
- }
- }
- //停止池中所有線程
- public synchronized void shutDown() {
- isShutDown = true;
- for (int threadIndex = 0; threadIndex < idleThreads.size(); threadIndex++) {
- PThread pThread = idleThreads.get(threadIndex);
- pThread.shutDown();
- }
- }
- //執(zhí)行任務(wù)
- public synchronized void start(Runnable target) {
- PThread thread = null;
- //如果有空閑線程,則直接使用
- if (idleThreads.size() > 0) {
- int lastIndex = idleThreads.size() - 1;
- thread = idleThreads.get(lastIndex);
- idleThreads.remove(thread);
- //立即執(zhí)行這個任務(wù)
- thread.setTarget(target);
- }//沒有空閑線程,則創(chuàng)建線程
- else {
- threadCounter++;
- //創(chuàng)建新線程
- thread = new PThread(target, "PThread #" + threadCounter, this);
- //啟動這個線程
- thread.start();
- }
- }
- }
2.要實現(xiàn)上面的線程池,就需要一個永不退出的線程與之配合。PThread就是一個這樣的線程。它的主體部分是一個***循環(huán),該線程在手動關(guān)閉前永不結(jié)束,并一直等待新的任務(wù)到達。
- public class PThread extends Thread {
- //線程池
- private ThreadPool pool;
- //任務(wù)
- private Runnable target;
- private boolean isShutDown = false;
- private boolean isIdle = false; //是否閑置
- //構(gòu)造函數(shù)
- public PThread(Runnable target,String name, ThreadPool pool){
- super(name);
- this.pool = pool;
- this.target = target;
- }
- public Runnable getTarget(){
- return target;
- }
- public boolean isIdle() {
- return isIdle;
- }
- @Override
- public void run() {
- //只要沒有關(guān)閉,則一直不結(jié)束該線程
- while (!isShutDown){
- isIdle = false;
- if (target != null){
- //運行任務(wù)
- target.run();
- }
- try {
- //任務(wù)結(jié)束了,到閑置狀態(tài)
- isIdle = true;
- pool.repool(this);
- synchronized (this){
- //線程空閑,等待新的任務(wù)到來
- wait();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- isIdle = false;
- }
- }
- public synchronized void setTarget(Runnable newTarget){
- target = newTarget;
- //設(shè)置了任務(wù)之后,通知run方法,開始執(zhí)行這個任務(wù)
- notifyAll();
- }
- //關(guān)閉線程
- public synchronized void shutDown(){
- isShutDown = true;
- notifyAll();
- }
- }
3.測試Main方法
- public static void main(String[] args) throws InterruptedException {
- for (int i = 0; i < 1000; i++) {
- ThreadPool.getInstance().start(new Runnable() {
- @Override
- public void run() {
- try {
- //休眠100ms
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
03ThreadPoolExecutor
為了能夠更好地控制多線程,JDK提供了一套Executor框架,幫助開發(fā)人員有效地進行線程控制。Executor框架無論是newFixedThreadPool()方法、newSingleThreadExecutor()方法還是newCachedThreadPool()方法,其內(nèi)部實現(xiàn)均使用了 ThreadPoolExecutor:
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue
()); - }
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue
()); - }
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue
())); - }
由以上線程池的實現(xiàn)代碼可以知道,它們只是對 ThreadPoolExecutor 類的封裝。為何 ThreadPoolExecutor 類有如此強大的功能?來看一下 ThreadPoolExecutor 最重要的構(gòu)造方法。
3.1 構(gòu)造方法
ThreadPoolExecutor最重要的構(gòu)造方法如下:
- public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
方法參數(shù)如下:
ThreadPoolExecutor的使用示例,通過execute()方法提交任務(wù)。
- public static void main(String[] args) {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
- for (int i = 0; i < 10; i++) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName());
- }
- });
- }
- executor.shutdown();
- }
或者通過submit()方法提交任務(wù)
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
- List
futureList = new Vector<>(); - //在其它線程中執(zhí)行100次下列方法
- for (int i = 0; i < 100; i++) {
- futureList.add(executor.submit(new Callable
() { - @Override
- public String call() throws Exception {
- return Thread.currentThread().getName();
- }
- }));
- }
- for (int i = 0;i
- Object o = futureList.get(i).get();
- System.out.println(o.toString());
- }
- executor.shutdown();
- }
運行結(jié)果:
- ...
- pool-1-thread-4
- pool-1-thread-3
- pool-1-thread-2
下面主要講解ThreadPoolExecutor的構(gòu)造方法中workQueue和RejectedExecutionHandler參數(shù),其它參數(shù)都很簡單。
3.2 workQueue任務(wù)隊列
用于保存等待執(zhí)行的任務(wù)的阻塞隊列??梢赃x擇以下幾個阻塞隊列。
- ArrayBlockingQueue: 是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,按FIFO原則進行排序
- LinkedBlockingQueue: 一個基于鏈表結(jié)構(gòu)的阻塞隊列,吞吐量高于ArrayBlockingQueue。靜態(tài)工廠方法Excutors.newFixedThreadPool()使用了這個隊列
- SynchronousQueue: 一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量高于LinkedBlockingQueue,靜態(tài)工廠方法Excutors.newCachedThreadPool()使用了這個隊列
- PriorityBlockingQueue: 一個具有優(yōu)先級的***阻塞隊列。
3.3 RejectedExecutionHandler飽和策略
當隊列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略還處理新提交的任務(wù)。它可以有如下四個選項:
- AbortPolicy : 直接拋出異常,默認情況下采用這種策略
- CallerRunsPolicy : 只用調(diào)用者所在線程來運行任務(wù)
- DiscardOldestPolicy : 丟棄隊列里最近的一個任務(wù),并執(zhí)行當前任務(wù)
- DiscardPolicy : 不處理,丟棄掉
更多的時候,我們應(yīng)該通過實現(xiàn)RejectedExecutionHandler 接口來自定義策略,比如記錄日志或持久化存儲等。
3.4 submit()與execute()
可以使用execute和submit兩個方法向線程池提交任務(wù)。
execute方法用于提交不需要返回值的任務(wù),利用這種方式提交的任務(wù)無法得知是否正常執(zhí)行
submit方法用于提交一個任務(wù)并帶有返回值,這個方法將返回一個Future類型對象??梢酝ㄟ^這個返回對象判斷任務(wù)是否執(zhí)行成功,并且可以通過future.get()方法來獲取返回值,get()方法會阻塞當前線程直到任務(wù)完成。
3.5 shutdown()與shutdownNow()
可以通過調(diào)用 shutdown() 或 shutdownNow() 方法來關(guān)閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的 interrupt 方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠無法停止。
這倆方法的區(qū)別是,shutdownNow() 首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,而 shutdown() 只是將線程池的狀態(tài)設(shè)置成 SHUTDOWN 狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。
只要調(diào)用了這兩個關(guān)閉方法的任意一個,isShutdown 方法就會返回 true。當所有的任務(wù)都已關(guān)閉了,才表示線程池關(guān)閉成功,這時調(diào)用 isTerminaced 方法會返回 true。
通常調(diào)用 shutdown() 方法來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用 shutdownNow() 方法。
3.6 合理配置線程池
要想合理地配置線程池,首先要分析任務(wù)特性
- 任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。
- 任務(wù)的優(yōu)先級:高、中和低。
- 任務(wù)的執(zhí)行時間:長、中和短。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。
CPU密集型任務(wù)應(yīng)該配置盡可能少的線程,如配置N+1個線程,N位CPU的個數(shù)。
而IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*N。
混合型任務(wù),如果可以拆分,將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個任務(wù)執(zhí)行的時間相差很大,則沒有必要進行分解??梢酝ㄟ^Runtime.getRuntime().availableProcessors()方法獲得當前設(shè)備的CPU個數(shù)。
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務(wù)先執(zhí)行。
3.7 線程池的監(jiān)控
由于大量的使用線程池,所以很有必要對其進行監(jiān)控??梢酝ㄟ^繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute 和 terminated 方法,也可以在任務(wù)執(zhí)行前,執(zhí)行后和線程池關(guān)閉前執(zhí)行一些代碼來進行監(jiān)控。在監(jiān)控線程池的時候可以使用一下屬性:
(1) taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量
(2) completedTaskCount:線程池在運行過程中已完成的任務(wù)數(shù)量,小于或等于taskCount
(3) largestPoolSize: 線程池里曾經(jīng)創(chuàng)建過***的線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否曾經(jīng)滿過。如該數(shù)值等于線程池***大小,則表示線程池曾經(jīng)滿過。
(4) getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,線程池里的線程不會自動銷毀,所以這個大小只增不減。
(5) getActiveCount:獲取活動的線程數(shù)
04Executor多線程框架
ThreadPoolExecutor 表示一個線程池,Executors 類則扮演著線程池工廠的角色,通過 Executors 可以取得一個特定功能的線程池。
使用 Executors 框架實現(xiàn)上節(jié)中的例子,其代碼如下:
- public static void main(String[] args) {
- //新建一個線程池
- ExecutorService executor = Executors.newCachedThreadPool();
- //在其它線程中執(zhí)行100次下列方法
- for (int i = 0; i < 100; i++) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName());
- }
- });
- }
- //執(zhí)行完關(guān)閉
- executor.shutdown();
- }
4.1 Executors框架的結(jié)構(gòu)
1.任務(wù)
包括被執(zhí)行任務(wù)需要實現(xiàn)的接口:Runnable 接口或 Callable 接口。
2.任務(wù)的執(zhí)行
包括任務(wù)執(zhí)行機制的核心接口 Executor,以及繼承自 Executor 的ExecutorService 接口。Executor框架有兩個關(guān)鍵類實現(xiàn)了 ExecutorService 接口(ThreadPoolExecutor 和 ScheduledThreadPoolExecutor)。
3.異步計算的結(jié)果
包括接口 Future 和實現(xiàn)Future接口的FutureTask類。
4.2 Executors工廠方法
Executors工廠類的主要方法:
- public static ExecutorService newFixedThreadPool(int nThreads)
該方法返回一個固定線程數(shù)量的線程池,該線程池中的線程數(shù)量始終不變。當有一個新的任務(wù)提交時,線程池中若有空閑線程,則立即執(zhí)行。若沒有,則新的任務(wù)會被暫存在一個任務(wù)隊列中,待有線程空閑時,便處理在任務(wù)隊列中的任務(wù)。
- public static ExecutorService newSingleThreadExecutor()
該方法返回一個只有一個線程的線程池。若多余一個任務(wù)被提交到線程池,任務(wù)會被保存在一個任務(wù)隊列中,待線程空閑,按先入先出的順序執(zhí)行隊列中的任務(wù)。
- public static ExecutorService newCachedThreadPool()
該方法返回一個可根據(jù)實際情況調(diào)整線程數(shù)量的線程池。線程池的線程數(shù)量不確定,但若有空閑線程可以復用,則會優(yōu)先使用可復用的線程。但所有線程均在工作,又有新的任務(wù)提交,則會創(chuàng)建新的線程處理任務(wù)。所有線程在當前任務(wù)執(zhí)行完畢后,將返回線程池進行復用。
- public static ScheduledExecutorService newSingleThreadScheduledExecutor()
該方法返回一個ScheduledExecutorService對象,線程池大小為1。ScheduledExecutorService接口在ExecutorService接口之上擴展了在給定時間執(zhí)行某任務(wù)的功能,如在某個固定的延時之后執(zhí)行,或者周期性執(zhí)行某個任務(wù)。
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
該方法也返回一個 ScheduledExecutorService 對象,但該線程池可以指定線程數(shù)量。
4.3 ThreadPoolExecutor與ScheduledThreadPoolExecutor
在前面提到了Executors 類扮演著線程池工廠的角色,通過 Executors 可以取得一個特定功能的線程池。Executors 工廠類的主要方法可以創(chuàng)建 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 線程池。
關(guān)于ThreadPoolExecutor ,前面第3節(jié)已經(jīng)詳細敘述。ScheduledThreadPoolExecutor 也是ExecutorService接口的實現(xiàn)類,可以在給定的延遲后運行命令,或者定期執(zhí)行命令。ScheduledThreadPoolExecutor 比 Timer 更靈活,功能更強大。
4.4 Future與FutureTask
上面的示例中使用 execute() 方法提交任務(wù),用于提交不需要返回值的任務(wù)。如果我們需要獲取執(zhí)行任務(wù)之后的返回值,可以使用submit()方法。
示例代碼:
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- //新建一個線程池
- ExecutorService executor = Executors.newCachedThreadPool();
- List
futureList = new Vector<>(); - //在其它線程中執(zhí)行100次下列方法
- for (int i = 0; i < 100; i++) {
- futureList.add(executor.submit(new Callable
() { - @Override
- public String call() throws Exception {
- return Thread.currentThread().getName()+" "+System.currentTimeMillis()+" ";
- }
- }));
- }
- for (int i = 0;i
- Object o = futureList.get(i).get();
- System.out.println(o.toString()+i);
- }
- executor.shutdown();
- }
運行結(jié)果:
- ...
- pool-1-thread-11 1537872778612 96
- pool-1-thread-11 1537872778613 97
- pool-1-thread-10 1537872778613 98
- pool-1-thread-10 1537872778613 99
到這里,就不得不提Future接口與FutureTask實現(xiàn)類,它們代表異步計算的結(jié)果。
- Future
submit(Callable task) - Future> submit(Runnable task);
- Future
submit(Runnable task, T result);
當我們submit()提交后,會返回一個Future對象,到JDK1.8,返回的實際是FutureTask實現(xiàn)類。submit() 方法支持 Runnable 或 Callable 類型的參數(shù)。Runnable 接口 和Callable 接口的區(qū)別就是 Runnable 不會返回結(jié)果,Callable 會返回結(jié)果。
主線程可以執(zhí)行 futureTask.get() 方法來阻塞當前線程直到任務(wù)執(zhí)行完成,任務(wù)完成后返回任務(wù)執(zhí)行的結(jié)果。
futureTask.get(long timeout, TimeUnit unit) 方法則會阻塞當前線程一段時間立即返回,這時候有可能任務(wù)沒有執(zhí)行完。
主線程也可以執(zhí)行 futureTask.cancel(boolean mayInterruptIfRunning) 來取消此任務(wù)的執(zhí)行。
futureTask.isCancelled方法表示任務(wù)是否被取消成功,如果在任務(wù)正常完成前被取消成功,則返回 true。
futureTask.isDone方法表示任務(wù)是否已經(jīng)完成,若任務(wù)完成,則返回true。
網(wǎng)站名稱:Java線程池實現(xiàn)原理與技術(shù),看這一篇就夠了
本文地址:http://m.fisionsoft.com.cn/article/coodhip.html


咨詢
建站咨詢
