Reactor 錯誤處理


對資料流處理來說,資料的發送、轉換等時機是不確定的,為了能有效處理錯誤,Reactor 在資料流處理過程中若發生錯誤,會視為結束資料流的事件,錯誤直接傳播至資料流處理的尾端,若沒有任何處理,會拋出一個 Reactor 內部的 reactor.core.Exceptions$ErrorCallbackNotImplemented 例外,其中包裹著原本的例外。

若訂閱者打算處理錯誤,FluxMonosubscribe 方法,都有接受 errorConsumer 的重載版本,可以藉此獲得例外物件,這是訂閱者獲取例外訊息的方式:

Flux.just(-2, -1, 0, 1, 2)
    .map(n -> 10 / n)
    .subscribe(out::println, err::println);

如果在資料流轉換過程中發生例外,在訂閱者收到例外前想要處理例外,可以使用 doOnError 方法,然而,例外仍會傳播,訂閱者仍然能獲取例外,然而,若不指定 errorConsumer,就只是當成是資料流結束而已,不會拋出 Reactor 內部例外。

Flux.just(-2, -1, 0, 1, 2)
    .map(n -> 10 / n)
    .doOnError(e -> Loggers.getLogger(Main.class.getName()).error(e.getMessage()))
    .subscribe(out::println, err::println);

如果在資料流轉換過程中發生例外,想要做一些處理,不讓下游知道曾經發生過例外,那麼有 onErrorXXX 等方法可以使用,例如,若有設置 onErrorContinue 來處理例外,那麼資料流不會結束,不會導致例外的資料可以持續發送給下游:

Flux.just(-2, -1, 0, 1, 2)
    .map(n -> 10 / n)
    .onErrorContinue((err, n) -> out.println("bad data: " + n))
    .subscribe(out::println);

onErrorMap 可以用來將例外轉換為另一種例外。例如:

Flux.just(-2, -1, 0, 1, 2)
    .map(n -> 10 / n)
    .onErrorMap(cause -> new BadDataException(cause))
    .subscribe(out::println, err::println);

如果想在發生例外時,轉由從另一個發佈者接續發佈資料,可以使用 onErrorResume,例如:

Flux.just(-2, -1, 0, 1, 2)
    .map(n -> 10 / n)
    .onErrorResume(e -> Flux.just(10, 20))
    .subscribe(out::println);

如果你轉接的發佈者,本身發佈的是個例外,例如:

Flux.just(-2, -1, 0, 1, 2)
    .map(n -> 10 / n)
    .onErrorResume(e -> Flux.error(new RuntimeException(e)))
    .subscribe(out::println, err::println);

Flux.error 傳回 Flux<T>T 是指定的引數型態,就上例來說,也相當於一個結束訊號了,因此效果相當於 onErrorMap

onErrorReturn 可以在例外發生,指定另一個資料發佈到資料流中:

Flux.just(-2, -1, 0, 1, 2)
    .map(n -> 10 / n)
    .onErrorReturn(Integer.MAX_VALUE)
    .subscribe(out::println);

無論是正常結束或者是因例外而結束,如果打算在資料流結束之時,作些收尾的動作,可以使用 doFinally,指定的 Comsumer 接收一個 SignalType,可以用來得知結束的訊號類型為何。

如果發佈者的資料來源,是個必須在結束時關閉、清理的資源,可以使用 using,它有點像是 try-with-resource 語法,可以指定資源如何建立,如何從資源處建立訂閱者,以及最後結束時如何清理資源:

Flux.using(
    () -> createResourceFrom("somewhere"),
    resource -> Flux.fromIterable(resource),
    Resource::dispose
);

如果在發生錯誤時,想要嘗試重啟資料流,可以使用 retry,指定重試次數,這會結束原本的資料流,重新建立另一個資料流,並令訂閱者訂閱新的資料流,例如〈Handling Errors〉中有個範例:

Flux.interval(Duration.ofMillis(250))
.map(input -> {
    if (input < 3) return "tick " + input;
    throw new RuntimeException("boom");
})
.retry(1)
.elapsed() 
.subscribe(System.out::println, System.err::println); 

更多的錯誤處理,在〈Handling Errors〉中有詳細說明,並有簡單的 try/catch 對照。