如果你要非同步(Asyncronous)讀取文字檔案,在檔案讀取完後做某些事,可以使用 ExecutorService
來 submit
一個 Runnable
物件,像是類似以下的流程:
public static Future readFileAsync(String file, Consumer<String> success,
Consumer<IOException> fail, ExecutorService service) {
return service.submit(() -> {
try {
success.accept(new String(Files.readAllBytes(Paths.get(file))));
} catch (IOException ex) {
fail.accept(ex);
}
});
}
這麼一來,你就可使用以下非同步的風格來讀取一個文字檔案:
readFileAsync(args[0],
content -> out.println(content), // 成功處理
ex -> ex.printStackTrace(), // 失敗處理
Executors.newFixedThreadPool(10)
);
out.println(content)
與 ex.printStackTrace()
會在與讀取檔案的同一個執行緒中進行,如果你想要在不同執行緒中進行,得再額外作些設計;另一方面,這種非同步操作使用的回呼(Callback)風格,在每次回呼中若又再度進行非同步操作及回呼,很容易寫出回呼地獄(Callback hell),造成可讀性不佳。例如若有個類似 readFileAsync
風格的非同步 processContentAsync
方法,用來再繼續處理 readFileAsync
讀取的檔案內容,那麼可能撰寫出以下的程式碼:
readFileAsync(args[0],
content -> processContentAsync(content,
processedContent -> out.println(processedContent) ,
ex -> ex.printStackTrace(), service),
ex -> ex.printStackTrace(), service);
實際上非同步處理的組合需求很多,為此,JDK8 新增了 java.util.concurrent.CompletableFuture
,你可以使用它來改寫 readFileAsync
,例如:
package cc.openhome;
import java.io.IOException;
import static java.lang.System.out;
import java.nio.file.*;
import java.util.concurrent.*;
public class Async {
public static CompletableFuture<String> readFileAsync(
String file, ExecutorService service) {
return CompletableFuture.supplyAsync(() -> {
try {
return new String(Files.readAllBytes(Paths.get(file)));
} catch(IOException ex) {
throw new RuntimeException(ex);
}
}, service);
}
public static void main(String[] args) throws Exception {
ExecutorService poolService = Executors.newFixedThreadPool(10);
readFileAsync(args[0], poolService).whenComplete((ok, ex) -> {
if(ex == null) {
out.println(ok);
} else {
ex.printStackTrace();
}
}).join(); // join 是為了避免 main 執行緒在任務完成前就關閉ExecutorService
poolService.shutdown();
}
}
CompletableFuture
的靜態方法 supplyAsync
接受 Supplier
實例,可指定非同步執行任務,它會傳回 CompletableFuture
實例,你可以呼叫 whenComplete
以 BiConsumer
實例指定任務完成如何處理,第一個參數是 Supplier
的傳回值,若有例外發生則會指定給第二個參數,想要在任務完成後繼續非同步地處理,則可以使用 whenCompleteAsync
方法。
如果第一個 CompletableFuture
任務完成後,想要繼續以非同步方式來處理結果,可以使用 thenApplyAsync
。例如:
readFileAsync(args[0], poolService)
.thenApplyAsync(String::toUpperCase)
.whenComplete((ok, ex) -> {
if(ex == null) {
out.println(ok);
} else {
ex.printStackTrace();
}
});
CompletableFuture
實例的方法,基本上都會有同步與非同步兩個版本,可以用 Async
後置名稱來區分,例如,thenApplyAsync
的同步版本就是 thenApply
方法。
〈Optional
與 Stream
的 flatMap
〉中談到,Optional
與 Stream
中各定義有 map
方法,可讓你指定 Optional
或 Stream
中的值 T
如何映射為值 U
,然後傳回新的 Optional
或 Stream
,CompletableFuture
的 thenApply
(以及非同步的 thenApply
版本)其實就類似 Optional
或 Stream
的 map
,可讓你指定前一個 CompletableFuture
處理後的結果 T
如何映射為值 U
,然後傳回新的 CompletableFuture
。
該份文件中也談到,Optional
與 Stream
中也各定義有 flatMap
方法,可讓你指定 Optional
或 Stream
中的值 T
與 Optional<U>
、Stream<U>
之間的關係,CompletableFuture
也有個 thenCompose
(以及非同步的 thenComposeAsnyc
版本),作用就類似 flatMap
,可以讓你指定前一個 CompletableFuture
處理後的結果 T
如何映射為值 CompleteableFuture<U>
,舉例來說,你想在 readFileAsync
傳回的 CompletableFuture<String>
處理完後,繼續組合 processContentAsync
方法傳回 CompletableFuture<String>
,就可以如下撰寫:
readFileAsync(args[0], poolService)
.thenCompose(content -> processContentAsync(content, poolService))
.whenComplete((ok, ex) -> {
if (ex == null) {
out.println(ok);
} else {
ex.printStackTrace();
}
});
CompletableFuture
上還有許多方法可以使用,詳情除了參考 API 文件之中,還可以看看〈Java 8: Definitive guide to CompletableFuture〉這篇文章,有 JDK8 之前,可以使用 guava-libraries 的 ListenableFuture
,有興趣的話可以參考〈ListenableFuture 聽取未來需求〉,其他各技術生態中的類似產物,可以參考〈Composable Future API〉的介紹。