【Guava 教學】(9)ListenableFuture 聽取未來需求



在許多語言中,多少都內建了一些非同步處理的方案,像是 Java,在 JDK5 後為了簡化非同步處理,提出了 Future 等相關 API,舉個例子來說,如果你要非同步地讀取文字檔案,在檔案讀取完後做某些事,直接使用 Future 實作可能會是類似以下的流程:
...
    public static void readFile(String file, 
         Consumer<String> success, Consumer<Throwable> fail, ExecutorService service) {
        service.submit(() -> {
            try {
                success.accept(new String(Files.readAllBytes(Paths.get(file))));
            } catch (IOException ex) {
                fail.accept(ex);
            }
        });
    }
...
這麼一來,你就可以用類似 Node.js 的風格來讀取一個文字檔案:
readFile(args[0], 
    content -> out.println(content),  // success
    ex -> ex.printStackTrace(),       // error
    Executors.newFixedThreadPool(10)
);
可以看到,就算搭配了 JDK8 的 Lambda,使用 Future 來實作 readFile 方法,也是囉嗦了一些。JDK7 新增了 AsynchronousFileChannel,可以有類似風格的撰寫方式,不過必須搭配 NIO 相關 API 來使用,就標準 API 來說,沒能讓這類需求在實作時淺顯易讀。
Guava 的 ListenableFuture 是一種 Future,可搭配 Guava 的 ListeningExecutorService 來使用,ListeningExecutorServicesubmit 方法會傳回 ListenableFuture 實例,可使用 Futures 類別的 addCallback 方法設定 FutureCallback 來處理任務執行成功與失敗時的處理器。例如用 Guava 的這些 API 重新實作以上的 readFile 方法會像是:
...
   public static void readFile(String file, 
                        Consumer<String> success, Consumer<Throwable> fail, 
                        ListeningExecutorService service) {

        ListenableFuture<String> future = service.submit(
            () -> new String(Files.readAllBytes(Paths.get(file)))
        );

        Futures.addCallback(future, new FutureCallback<String>() {
            public void onSuccess(String content) {
                success.accept(content);
            }
            public void onFailure(Throwable thrown) {
                fail.accept(thrown);
            }
        });
    }
...
readFile 的實作簡化且清楚多了,這麼一來,你就可以用以下風格來讀取一個文字檔案:
readFile(args[0], 
    content -> out.println(content),  // success
    ex -> ex.printStackTrace(),       // error
    MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10))
);
然而在非同步操作使用回呼(Callback)風格,在每次回呼中又再度進行非同步操作及回呼,很容易寫出回呼地獄(Callback hell),造成可讀性不佳。例如若有個類似 readFile 風格的非同步 asyncProcess 方法,用來再繼續處理 readFile 讀取的檔案內容,那麼可能撰寫出以下的程式碼:
readFile(args[0],
    content -> asyncProcess(content,
                            processedContent -> out.println(processedContent) ,
                            errorHandler, service),
    errorHandler, service);
為了避免可讀性變差的問題,你可以使用 Futures.transform 來進行相關回呼操作的組合,不過,若你照著 ListenableFutureExplained 文件中的說明來撰寫程式,也許會困惑,雖然使用 Guava 的 API 是有簡化了一些,不過可讀性好像還是有些差,其實你可以自行再做一些封裝。例如自行撰寫一個 asyncFuncFutureasyncComposeFuture 方法:
....
    public static <R> ListenableFuture<R> asyncFuncFuture(Callable<R> callable,
                                 ListeningExecutorService service) {

        return service.submit(() -> callable.call());
    }

    public static <P, R> ListenableFuture<R> asyncComposeFuture(
                            ListenableFuture<P> future, Function<P, R> f,
                            ListeningExecutorService service) {

        AsyncFunction<P, R> asyncFunc = new AsyncFunction<P, R>() {
            public ListenableFuture<R> apply(P content) throws Exception {
                return service.submit(() -> f.apply(content));
            }
        };

        return Futures.transform(future, asyncFunc, service);

    }
....
你可以傳給 asyncFuncFuture 一個函式物件,它會傳回 ListenableFuture 物件,用以非同步地執行你指定的函式物件,asyncComposeFuture 則用來將一個 ListenableFuture 物件的結果,以及一個希望非同步執行的函式物件組合在一起,實作中最後利用的是 Futures.transfor 再度傳回 ListenableFuture 物件,因此你可以一直組裝下去。
有了 asyncFuncFutureasyncComposeFuture 方法,接下來你就可以這麼撰寫程式了:
...
    ListeningExecutorService service = 
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

    ListenableFuture<String> contentFuture = asyncFuncFuture(
        () -> new String(Files.readAllBytes(Paths.get(args[0]))), service);
    ListenableFuture<String> processedContentFuture = 
        asyncComposeFuture(contentFuture, content -> process(content), service);

    Futures.addCallback(processedContentFuture, new FutureCallback<String>() {
        public void onSuccess(String result) {
            out.println(result);
        }
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
...
在撰寫非同步處理時,其實有許多方面在考量上與同步處理是不同的,可讀性是一個問題,上面的 asyncFuncFutureasyncComposeFuture 方法的實作方式,主要在實現可組裝的(Composable) Future 物件,不同程式語言實現可組裝性的方式不同,Guava 主要是透過 ListenableFuture 等 API,未來 JDK8 則有個 CompletableFuture,JavaScript 有不少程式庫可以實現這類組裝性,例如 q 程式庫等,你可以看看 Composable Future API 這篇文章中也有相關介紹。
有時候你會想要非同步但按照順序地執行一連串任務,例如非同步地讀取一連串檔案,但讀取時按照指定的順序:
readFile(args[0],
                content1 -> readFile(args[1], 
                                  content2 -> readFile(args[2],
                                              content3 -> /* 依序處理 content1, content2 與 content3 */ ,
                                              errHandler, service), 
                                  errHandler, service),
                errHandler, service);
在最後一次的回呼中,才會依序處理 content1content2content3,如果是這樣的話,你可以使用 Guava 的 Futures.allAsList 寫一個 readAllFiles
...
public static void readAllFiles(String[] files,
                               Consumer<List<String>> success, Consumer<Throwable> fail,
                               ListeningExecutorService service) {
    List<ListenableFuture<String>> listenables = new ArrayList<>(files.length);
    for(String file : files) {
        listenables.add(service.submit(
            () -> new String(Files.readAllBytes(Paths.get(file)))
        ));
    }

    ListenableFuture<List<String>> contentsFuture = Futures.allAsList(listenables);
    Futures.addCallback(contentsFuture, new FutureCallback<List<String>>() {
        public void onSuccess(List<String> contents) {
            success.accept(contents);
        }
        public void onFailure(Throwable thrwb) {
            fail.accept(thrwb);
        }
    });
}
...
這麼一來,你就可以使用這個 readAllFiles 來讓程式更容易閱讀:
readAllFiles(args, 
            contents -> process(contents), 
            ex -> ex.printStackTrace(), service);
實際上非同步處理時要注意的事項不只如此,可否注意到第一個的程式碼示範如何處理例外?readFile 在錯誤發生時,是將例外物件傳給指定的錯誤處理器,而不是從 Runnable 物件的 run 方法中重新拋出。因為是非同步,如果你設計 readFile 時,在錯誤發生時重新拋出例外,並希望你的客戶端如此處理例外的話:
...
try {
    readFile(args[0], content -> process(content), service);
} catch(Exception ex) {
    // 處理錯誤
}
...
實際上 readFile 是非同步地執行,因此不會阻斷(Blocked),程式會繼續離開 try-catch,因而實際例外發生時,早就不受 try-catch 的控制;在使用 Guava 的 FutureCallback 時,也切記別讓 onFailure 的實作空白,因為這就與實作 try-catch 時將 catch 留白沒兩樣。
類似這類與同步與非同步程式執行習慣不同的情況還有不少…大部份情況下,我們都習慣同步處理,面對非同步處理時,還有不少要學習的地方,JDK5 之後有了些標準 API 可以使用,一直到 JDK8 還在補充,可見得非同步處理日益重要,像 Guava 這類的程式庫也有不少簡化之處,然而最重要的,還是我們的思考方式,沒有這層思考,使用 Guava 只是少打了幾個字,可讀性也不見得會好到哪裡去,有了這層思考,在打造自己程式的可讀性時,使用 Guava 這類的程式庫,才可以得到它簡化上的益處。