對資料流處理來說,資料的發送、轉換等時機是不確定的,為了能有效處理錯誤,Reactor 在資料流處理過程中若發生錯誤,會視為結束資料流的事件,錯誤直接傳播至資料流處理的尾端,若沒有任何處理,會拋出一個 Reactor 內部的 reactor.core.Exceptions$ErrorCallbackNotImplemented
例外,其中包裹著原本的例外。
若訂閱者打算處理錯誤,Flux
或 Mono
的 subscribe
方法,都有接受 errorConsumer
的重載版本,可以藉此獲得例外物件,這是訂閱者獲取例外訊息的方式:
Flux.just(-2, -1, 0, 1, 2)
.map(n -> 10 / n)
.subscribe(out::println, err::println);
如果在資料流轉換過程中發生例外,在訂閱者收到例外前想要處理例外,可以使用 doOnError
方法,然而,例外仍會傳播,訂閱者仍然能獲取例外,然而,若不指定 errorConsumer
,就只是當成是資料流結束而已,不會拋出 Reactor 內部例外。
Flux.just(-2, -1, 0, 1, 2)
.map(n -> 10 / n)
.doOnError(e -> Loggers.getLogger(Main.class.getName()).error(e.getMessage()))
.subscribe(out::println, err::println);
如果在資料流轉換過程中發生例外,想要做一些處理,不讓下游知道曾經發生過例外,那麼有 onErrorXXX 等方法可以使用,例如,若有設置 onErrorContinue
來處理例外,那麼資料流不會結束,不會導致例外的資料可以持續發送給下游:
Flux.just(-2, -1, 0, 1, 2)
.map(n -> 10 / n)
.onErrorContinue((err, n) -> out.println("bad data: " + n))
.subscribe(out::println);
onErrorMap
可以用來將例外轉換為另一種例外。例如:
Flux.just(-2, -1, 0, 1, 2)
.map(n -> 10 / n)
.onErrorMap(cause -> new BadDataException(cause))
.subscribe(out::println, err::println);
如果想在發生例外時,轉由從另一個發佈者接續發佈資料,可以使用 onErrorResume
,例如:
Flux.just(-2, -1, 0, 1, 2)
.map(n -> 10 / n)
.onErrorResume(e -> Flux.just(10, 20))
.subscribe(out::println);
如果你轉接的發佈者,本身發佈的是個例外,例如:
Flux.just(-2, -1, 0, 1, 2)
.map(n -> 10 / n)
.onErrorResume(e -> Flux.error(new RuntimeException(e)))
.subscribe(out::println, err::println);
Flux.error
傳回 Flux<T>
,T
是指定的引數型態,就上例來說,也相當於一個結束訊號了,因此效果相當於 onErrorMap
。
onErrorReturn
可以在例外發生,指定另一個資料發佈到資料流中:
Flux.just(-2, -1, 0, 1, 2)
.map(n -> 10 / n)
.onErrorReturn(Integer.MAX_VALUE)
.subscribe(out::println);
無論是正常結束或者是因例外而結束,如果打算在資料流結束之時,作些收尾的動作,可以使用 doFinally
,指定的 Comsumer
接收一個 SignalType
,可以用來得知結束的訊號類型為何。
如果發佈者的資料來源,是個必須在結束時關閉、清理的資源,可以使用 using
,它有點像是 try-with-resource 語法,可以指定資源如何建立,如何從資源處建立訂閱者,以及最後結束時如何清理資源:
Flux.using(
() -> createResourceFrom("somewhere"),
resource -> Flux.fromIterable(resource),
Resource::dispose
);
如果在發生錯誤時,想要嘗試重啟資料流,可以使用 retry
,指定重試次數,這會結束原本的資料流,重新建立另一個資料流,並令訂閱者訂閱新的資料流,例如〈Handling Errors〉中有個範例:
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.retry(1)
.elapsed()
.subscribe(System.out::println, System.err::println);
更多的錯誤處理,在〈Handling Errors〉中有詳細說明,並有簡單的 try/catch 對照。