新聞中心
一天,我在改進(jìn)多線程代碼時(shí)被Future.get()卡住了。

成都創(chuàng)新互聯(lián)公司主要從事成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)汨羅,十余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):028-86922220
public void serve() throws InterruptedException, ExecutionException, TimeoutException {
final Future responseFuture = asyncCode();
final Response response = responseFuture.get(1, SECONDS);
send(response);
}
private void send(Response response) {
//...
} 這是用Java寫的一個(gè)Akka應(yīng)用程序,使用了一個(gè)包含1000個(gè)線程的線程池(原來如此!)——所有的線程都在阻塞在這個(gè) get() 中。系統(tǒng)的處理速度跟不上并發(fā)請求的數(shù)量。重構(gòu)以后,我們干掉了所有的這些線程僅保留了一個(gè),極大的減少了內(nèi)存的占用。我們簡單一點(diǎn),通過一個(gè)Java 8的例子來演示。***步是使用CompletableFuture來替換簡單的Future(見:Tip 9)。
- 通過控制任務(wù)提交到ExecutorService的方式:只需用 CompletableFuture.supplyAsync(…, executorService) 來代替 executorService.submit(…) 即可
- 處理基于回調(diào)函數(shù)的API:使用promises
否則(如果你已經(jīng)使用了阻塞式的API或 Future
public void serve() throws InterruptedException, ExecutionException, TimeoutException {
final CompletableFuture responseFuture = asyncCode();
final Response response = responseFuture.get(1, SECONDS);
send(response);
} 很明顯,這不能解決任何問題,我們還必須利用新的風(fēng)格來編程:
public void serve() {
final CompletableFuture responseFuture = asyncCode();
responseFuture.thenAccept(this::send);
} 這個(gè)功能上是等同的,但是 serve() 只會(huì)運(yùn)行一小段時(shí)間(不會(huì)阻塞或等待)。只需要記?。簍his::send 將會(huì)在完成 responseFuture 的同一個(gè)線程內(nèi)執(zhí)行。如果你不想花費(fèi)太大的代價(jià)來重載已經(jīng)存在的線程池或send()方法,可以考慮通過 thenAcceptAsync(this::send, sendPool) 好極了,但是我們失去了兩個(gè)重要屬性:異常傳播與超時(shí)。異常傳播很難實(shí)現(xiàn),因?yàn)槲覀兏淖兞薃PI。當(dāng)serve()存在的時(shí)候,異步操作可能還沒有完成。 如果你關(guān)心異常,可以考慮返回 responseFutureor 或者其他可選的機(jī)制。至少,應(yīng)該有異常的日志,否則該異常就會(huì)被吞噬了。
final CompletableFutureresponseFuture = asyncCode(); responseFuture.exceptionally(throwable -> { log.error("Unrecoverable error", throwable); return null; });
請小心上面的代碼:exceptionally() 試圖從失敗中恢復(fù)過來,返回一個(gè)可選的結(jié)果。這個(gè)地方雖可以正常的工作,但是如果對 exceptionally()和withthenAccept() 使用鏈?zhǔn)秸{(diào)用,即使失敗了也還是會(huì)調(diào)用 send() 方法,返回一個(gè)null參數(shù),或者任何其它從 exceptionally() 方法中返回的值。
responseFuture
.exceptionally(throwable -> {
log.error("Unrecoverable error", throwable);
return null;
})
.thenAccept(this::send); //probably not what you think丟失一秒超時(shí)的問題非常巧妙。我們原始的代碼在Future完成之前最多等待(阻塞)1秒,否則就會(huì)拋出 TimeoutException。我們丟失了這個(gè)功能,更糟糕的是,單元測試超時(shí)的不是很方便,經(jīng)常會(huì)跳過這個(gè)環(huán)節(jié)。為了維持超時(shí)機(jī)制,而又不破壞事件 驅(qū)動(dòng)的原則,我們需要建立一個(gè)額外的模塊:一個(gè)在給定時(shí)間后必定會(huì)失敗的 Future。
public staticCompletableFuture failAfter(Duration duration) { final CompletableFuture promise = new CompletableFuture<>(); scheduler.schedule(() -> { final TimeoutException ex = new TimeoutException("Timeout after " + duration); return promise.completeExceptionally(ex); }, duration.toMillis(), MILLISECONDS); return promise; } private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1, new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat("failAfter-%d") .build());
這個(gè)很簡單:我們創(chuàng)建一個(gè)promise(沒有后臺任務(wù)或線程池的 Future),然后在給定的 java.time.Duration 之后會(huì)拋出 TimeoutException 異常。如果在某個(gè)地方調(diào)用 get() 獲取這個(gè) Future,阻塞的時(shí)間到達(dá)這個(gè)指定的時(shí)間后會(huì)拋出 TimeoutException。
實(shí)際上,它是一個(gè)包裝了 TimeoutException 的 ExecutionException,這個(gè)無需多說。注意,我使用了固定一個(gè)線程的線程池。這不僅僅是為了教學(xué)的目的:這是“1個(gè)線程應(yīng)當(dāng)能滿足任何人 的需求”的場景。failAfter() 本身沒多大的用處,但是如果和 ourresponseFuture 一起使用,我們就能解決這個(gè)問題了。
final CompletableFutureresponseFuture = asyncCode(); final CompletableFuture oneSecondTimeout = failAfter(Duration.ofSeconds(1)); responseFuture .acceptEither(oneSecondTimeout, this::send) .exceptionally(throwable -> { log.error("Problem", throwable); return null; });
這里還做了很多其他事情。在后臺的任務(wù)接收 responseFuture 時(shí),我們也創(chuàng)建了一個(gè)“合成”的 oneSecondTimeout future,這在成功的時(shí)候永遠(yuǎn)不會(huì)執(zhí)行,但是在1秒后就會(huì)導(dǎo)致任務(wù)失敗?,F(xiàn)在我們聯(lián)合這兩個(gè)叫做 acceptEither,這個(gè)操作將執(zhí)行先完成 Future 的代碼塊,而簡單的忽略 responseFuture 或 oneSecondTimeout 中運(yùn)行比較慢的那個(gè)。如果 asyncCode() 代碼在1秒內(nèi)執(zhí)行完成,this::send 就會(huì)被調(diào)用,而 oneSecondTimeout 異常就不會(huì)拋出。但是,如果 asyncCode() 執(zhí)行真的很慢,oneSecondTimeout 異常就先拋出。由于一個(gè)異常導(dǎo)致任務(wù)失敗,exceptionallyerror 處理器就會(huì)被調(diào)用,而不是 this::send 方法。你可以選擇執(zhí)行 send() 或者 exceptionally,但是不能兩個(gè)都執(zhí)行。當(dāng)如,如果我們有兩個(gè)“普通”的 Future 正常執(zhí)行完成了,則***響應(yīng)的那個(gè)將調(diào)用 send() 方法,后面的就會(huì)被丟棄。
這個(gè)不是最清晰的解決方案。更清晰的方案是包裝原始的 Future,然后保證它能在給定的時(shí)間內(nèi)執(zhí)行。這種操作對 com.twitter.util.Future 是可行的(Scala叫做 within()),但是 scala.concurrent.Future 中沒有這個(gè)功能(據(jù)推測是為了鼓勵(lì)使用前面的方式)。我們暫時(shí)不討論Scala背后如何執(zhí)行的,先實(shí)現(xiàn)類似 CompletableFuture 的操作。它接受一個(gè) Future 作為輸入,然后返回一個(gè) Future,這個(gè) Future 在后臺任務(wù)完成時(shí)候執(zhí)行完成。但是,如果底層的 Future 執(zhí)行的時(shí)間太長,就或拋出異常:
public staticCompletableFuture within(CompletableFuture future, Duration duration) { final CompletableFuture timeout = failAfter(duration); return future.applyToEither(timeout, Function.identity()); }
這引導(dǎo)我們實(shí)現(xiàn)最終的、清晰的、靈活的方法:
final CompletableFutureresponseFuture = within( asyncCode(), Duration.ofSeconds(1)); responseFuture .thenAccept(this::send) .exceptionally(throwable -> { log.error("Unrecoverable error", throwable); return null; });
希望你喜歡這篇文章,因?yàn)槟阋呀?jīng)知道在Java里,實(shí)現(xiàn)響應(yīng)式編程不再是什么問題。
當(dāng)前文章:Java中使用CompletableFuture處理異步超時(shí)
文章位置:http://m.fisionsoft.com.cn/article/dhgohdc.html


咨詢
建站咨詢
