【JDK8】CompletableFuture 非同步處理


如果你要非同步(Asyncronous)讀取文字檔案,在檔案讀取完後做某些事,可以使用 ExecutorServicesubmit 一個 Runnable 物件,像是類似以下的流程:

public static Future readFileAsync(String file, Consumer<String> success,
                        Consumer<IOException> fail, ExecutorService service) {
    return service.submit(() -> {
        try {
            success.accept(new String(Files.readAllBytes(Paths.get(file))));
        } catch (IOException ex) {
            fail.accept(ex);
        }
    });
}

這麼一來,你就可使用以下非同步的風格來讀取一個文字檔案:

readFileAsync(args[0], 
    content -> out.println(content),  // 成功處理
    ex -> ex.printStackTrace(),       // 失敗處理
    Executors.newFixedThreadPool(10)
);

out.println(content)ex.printStackTrace() 會在與讀取檔案的同一個執行緒中進行,如果你想要在不同執行緒中進行,得再額外作些設計;另一方面,這種非同步操作使用的回呼(Callback)風格,在每次回呼中若又再度進行非同步操作及回呼,很容易寫出回呼地獄(Callback hell),造成可讀性不佳。例如若有個類似 readFileAsync 風格的非同步 processContentAsync 方法,用來再繼續處理 readFileAsync 讀取的檔案內容,那麼可能撰寫出以下的程式碼:

readFileAsync(args[0],
    content -> processContentAsync(content,
                 processedContent -> out.println(processedContent) ,
                 ex -> ex.printStackTrace(), service),
    ex -> ex.printStackTrace(), service);

實際上非同步處理的組合需求很多,為此,JDK8 新增了 java.util.concurrent.CompletableFuture,你可以使用它來改寫 readFileAsync,例如:

package cc.openhome;

import java.io.IOException;
import static java.lang.System.out;
import java.nio.file.*;
import java.util.concurrent.*;

public class Async {
    public static CompletableFuture<String> readFileAsync(
                       String file, ExecutorService service) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return new String(Files.readAllBytes(Paths.get(file)));
            } catch(IOException ex) {
                throw new RuntimeException(ex);
            }
        }, service);
    }


    public static void main(String[] args) throws Exception {
        ExecutorService poolService = Executors.newFixedThreadPool(10);

        readFileAsync(args[0], poolService).whenComplete((ok, ex) -> {
            if(ex == null) {
                out.println(ok);
            } else {
                ex.printStackTrace();
            }
        }).join(); // join 是為了避免 main 執行緒在任務完成前就關閉ExecutorService

        poolService.shutdown();
    }
}

CompletableFuture 的靜態方法 supplyAsync 接受 Supplier 實例,可指定非同步執行任務,它會傳回 CompletableFuture 實例,你可以呼叫 whenCompleteBiConsumer 實例指定任務完成如何處理,第一個參數是 Supplier 的傳回值,若有例外發生則會指定給第二個參數,想要在任務完成後繼續非同步地處理,則可以使用 whenCompleteAsync 方法。

如果第一個 CompletableFuture 任務完成後,想要繼續以非同步方式來處理結果,可以使用 thenApplyAsync。例如:

readFileAsync(args[0], poolService)
     .thenApplyAsync(String::toUpperCase)
     .whenComplete((ok, ex) -> {
         if(ex == null) {
            out.println(ok);
         } else {
            ex.printStackTrace();
         }
     });

CompletableFuture 實例的方法,基本上都會有同步與非同步兩個版本,可以用 Async 後置名稱來區分,例如,thenApplyAsync 的同步版本就是 thenApply 方法。

OptionalStreamflatMap〉中談到,OptionalStream 中各定義有 map 方法,可讓你指定 OptionalStream 中的值 T 如何映射為值 U,然後傳回新的 OptionalStreamCompletableFuturethenApply(以及非同步的 thenApply 版本)其實就類似 OptionalStreammap,可讓你指定前一個 CompletableFuture 處理後的結果 T 如何映射為值 U,然後傳回新的 CompletableFuture

該份文件中也談到,OptionalStream 中也各定義有 flatMap 方法,可讓你指定 OptionalStream 中的值 TOptional<U>Stream<U> 之間的關係,CompletableFuture 也有個 thenCompose(以及非同步的 thenComposeAsnyc 版本),作用就類似 flatMap,可以讓你指定前一個 CompletableFuture 處理後的結果 T 如何映射為值 CompleteableFuture<U>,舉例來說,你想在 readFileAsync 傳回的 CompletableFuture<String> 處理完後,繼續組合 processContentAsync 方法傳回 CompletableFuture<String>,就可以如下撰寫:

readFileAsync(args[0], poolService)
        .thenCompose(content -> processContentAsync(content, poolService))
        .whenComplete((ok, ex) -> {
            if (ex == null) {
                out.println(ok);
            } else {
                ex.printStackTrace();
            }
        });

CompletableFuture 上還有許多方法可以使用,詳情除了參考 API 文件之中,還可以看看〈Java 8: Definitive guide to CompletableFuture〉這篇文章,有 JDK8 之前,可以使用 guava-libraries 的 ListenableFuture,有興趣的話可以參考〈ListenableFuture 聽取未來需求〉,其他各技術生態中的類似產物,可以參考〈Composable Future API〉的介紹。