在許多語言中,多少都內建了一些非同步處理的方案,像是 Java,在 JDK5 後為了簡化非同步處理,提出了
Future
等相關 API,舉個例子來說,如果你要非同步地讀取文字檔案,在檔案讀取完後做某些事,直接使用 Future
實作可能會是類似以下的流程:
...
public static void readFile(String file,
Consumer<String> success, Consumer<Throwable> fail, ExecutorService service) {
service.submit(() -> {
try {
success.accept(new String(Files.readAllBytes(Paths.get(file))));
} catch (IOException ex) {
fail.accept(ex);
}
});
}
...
這麼一來,你就可以用類似 Node.js 的風格來讀取一個文字檔案:
readFile(args[0],
content -> out.println(content), // success
ex -> ex.printStackTrace(), // error
Executors.newFixedThreadPool(10)
);
可以看到,就算搭配了 JDK8 的 Lambda,使用 Future
來實作 readFile
方法,也是囉嗦了一些。JDK7 新增了 AsynchronousFileChannel
,可以有類似風格的撰寫方式,不過必須搭配 NIO 相關 API 來使用,就標準 API 來說,沒能讓這類需求在實作時淺顯易讀。Guava 的
ListenableFuture
是一種 Future
,可搭配 Guava 的 ListeningExecutorService
來使用,ListeningExecutorService
的 submit
方法會傳回 ListenableFuture
實例,可使用 Futures
類別的 addCallback
方法設定 FutureCallback
來處理任務執行成功與失敗時的處理器。例如用 Guava 的這些 API 重新實作以上的 readFile
方法會像是:
...
public static void readFile(String file,
Consumer<String> success, Consumer<Throwable> fail,
ListeningExecutorService service) {
ListenableFuture<String> future = service.submit(
() -> new String(Files.readAllBytes(Paths.get(file)))
);
Futures.addCallback(future, new FutureCallback<String>() {
public void onSuccess(String content) {
success.accept(content);
}
public void onFailure(Throwable thrown) {
fail.accept(thrown);
}
});
}
...
readFile
的實作簡化且清楚多了,這麼一來,你就可以用以下風格來讀取一個文字檔案:
readFile(args[0],
content -> out.println(content), // success
ex -> ex.printStackTrace(), // error
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10))
);
然而在非同步操作使用回呼(Callback)風格,在每次回呼中又再度進行非同步操作及回呼,很容易寫出回呼地獄(Callback hell),造成可讀性不佳。例如若有個類似 readFile
風格的非同步 asyncProcess
方法,用來再繼續處理 readFile
讀取的檔案內容,那麼可能撰寫出以下的程式碼:
readFile(args[0],
content -> asyncProcess(content,
processedContent -> out.println(processedContent) ,
errorHandler, service),
errorHandler, service);
為了避免可讀性變差的問題,你可以使用 Futures.transform
來進行相關回呼操作的組合,不過,若你照著 ListenableFutureExplained 文件中的說明來撰寫程式,也許會困惑,雖然使用 Guava 的 API 是有簡化了一些,不過可讀性好像還是有些差,其實你可以自行再做一些封裝。例如自行撰寫一個 asyncFuncFuture
與 asyncComposeFuture
方法:
....
public static <R> ListenableFuture<R> asyncFuncFuture(Callable<R> callable,
ListeningExecutorService service) {
return service.submit(() -> callable.call());
}
public static <P, R> ListenableFuture<R> asyncComposeFuture(
ListenableFuture<P> future, Function<P, R> f,
ListeningExecutorService service) {
AsyncFunction<P, R> asyncFunc = new AsyncFunction<P, R>() {
public ListenableFuture<R> apply(P content) throws Exception {
return service.submit(() -> f.apply(content));
}
};
return Futures.transform(future, asyncFunc, service);
}
....
你可以傳給 asyncFuncFuture
一個函式物件,它會傳回 ListenableFuture
物件,用以非同步地執行你指定的函式物件,asyncComposeFuture
則用來將一個 ListenableFuture
物件的結果,以及一個希望非同步執行的函式物件組合在一起,實作中最後利用的是 Futures.transfor
再度傳回 ListenableFuture
物件,因此你可以一直組裝下去。有了
asyncFuncFuture
與 asyncComposeFuture
方法,接下來你就可以這麼撰寫程式了:
...
ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<String> contentFuture = asyncFuncFuture(
() -> new String(Files.readAllBytes(Paths.get(args[0]))), service);
ListenableFuture<String> processedContentFuture =
asyncComposeFuture(contentFuture, content -> process(content), service);
Futures.addCallback(processedContentFuture, new FutureCallback<String>() {
public void onSuccess(String result) {
out.println(result);
}
public void onFailure(Throwable t) {
t.printStackTrace();
}
});
...
在撰寫非同步處理時,其實有許多方面在考量上與同步處理是不同的,可讀性是一個問題,上面的 asyncFuncFuture
與 asyncComposeFuture
方法的實作方式,主要在實現可組裝的(Composable) Future
物件,不同程式語言實現可組裝性的方式不同,Guava 主要是透過 ListenableFuture
等 API,未來 JDK8 則有個 CompletableFuture
,JavaScript 有不少程式庫可以實現這類組裝性,例如 q 程式庫等,你可以看看 Composable Future API 這篇文章中也有相關介紹。有時候你會想要非同步但按照順序地執行一連串任務,例如非同步地讀取一連串檔案,但讀取時按照指定的順序:
readFile(args[0],
content1 -> readFile(args[1],
content2 -> readFile(args[2],
content3 -> /* 依序處理 content1, content2 與 content3 */ ,
errHandler, service),
errHandler, service),
errHandler, service);
在最後一次的回呼中,才會依序處理 content1
、content2
、content3
,如果是這樣的話,你可以使用 Guava 的 Futures.allAsList
寫一個 readAllFiles
:
...
public static void readAllFiles(String[] files,
Consumer<List<String>> success, Consumer<Throwable> fail,
ListeningExecutorService service) {
List<ListenableFuture<String>> listenables = new ArrayList<>(files.length);
for(String file : files) {
listenables.add(service.submit(
() -> new String(Files.readAllBytes(Paths.get(file)))
));
}
ListenableFuture<List<String>> contentsFuture = Futures.allAsList(listenables);
Futures.addCallback(contentsFuture, new FutureCallback<List<String>>() {
public void onSuccess(List<String> contents) {
success.accept(contents);
}
public void onFailure(Throwable thrwb) {
fail.accept(thrwb);
}
});
}
...
這麼一來,你就可以使用這個 readAllFiles
來讓程式更容易閱讀:
readAllFiles(args,
contents -> process(contents),
ex -> ex.printStackTrace(), service);
實際上非同步處理時要注意的事項不只如此,可否注意到第一個的程式碼示範如何處理例外?readFile
在錯誤發生時,是將例外物件傳給指定的錯誤處理器,而不是從 Runnable
物件的 run
方法中重新拋出。因為是非同步,如果你設計 readFile
時,在錯誤發生時重新拋出例外,並希望你的客戶端如此處理例外的話:
...
try {
readFile(args[0], content -> process(content), service);
} catch(Exception ex) {
// 處理錯誤
}
...
實際上 readFile
是非同步地執行,因此不會阻斷(Blocked),程式會繼續離開 try-catch
,因而實際例外發生時,早就不受 try-catch
的控制;在使用 Guava 的 FutureCallback
時,也切記別讓 onFailure
的實作空白,因為這就與實作 try-catch
時將 catch
留白沒兩樣。類似這類與同步與非同步程式執行習慣不同的情況還有不少…大部份情況下,我們都習慣同步處理,面對非同步處理時,還有不少要學習的地方,JDK5 之後有了些標準 API 可以使用,一直到 JDK8 還在補充,可見得非同步處理日益重要,像 Guava 這類的程式庫也有不少簡化之處,然而最重要的,還是我們的思考方式,沒有這層思考,使用 Guava 只是少打了幾個字,可讀性也不見得會好到哪裡去,有了這層思考,在打造自己程式的可讀性時,使用 Guava 這類的程式庫,才可以得到它簡化上的益處。