Scheduler


在〈初試 Reactor〉中的範例,有訂閱者進行了 subscribe 的話,訂閱處理會是在同一個執行緒進行,這意謂著,若獲取資料來源是個阻斷操作的話,訂閱處理也會被阻斷,這也就是為何在〈初試 Reactor〉中首個範例會是這樣的結果:

JAVAPYTHONJAVASCRIPT
JAVA
PYTHON
JAVASCRIPT
javapythonjavascript

也就是按照各個 subscribe 的順序執行了各個訂閱者的處理,在前一個訂閱者處理完資料序列之前,下一個 subscribe 就不會執行。

如果希望訂閱處理可以在不同的執行緒進行,可以使用 subscribeOn,並指定 Scheduler 實例。例如:

package cc.openhome;

import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import static java.lang.System.out;

public class FsttReactor {
    public static void main(String[] args) throws InterruptedException {
        List<String> skillSource = Arrays.asList("java", "python", "javascript");

        Flux<String> skills = Flux.fromIterable(skillSource)
                                  .subscribeOn(Schedulers.parallel()); 

        Flux<String> upperSkills = skills.map(String::toUpperCase);
        Flux<String> chars = upperSkills.flatMap(skill -> Flux.fromArray(skill.split("")));

        chars.subscribe(out::print);
        out.println();
        upperSkills.subscribe(out::println);
        skills.subscribe(out::print);

        Thread.sleep(5000);
    }
}

由於訂閱處理在不同的執行緒中進行,在主執行緒結束後,Scheduler 安排的執行緒也會自動結束,為了能觀察,上頭使用了 Thread.sleep(5000),你會看到執行結果並不會是依訂閱的順序進行。

Reactor 提供了幾個預設的 Scheduler 實例,可以透過 Schedulersstatic 方法取得。Schedulers.parallel() 傳回的 Scheduler 實例,會按照 CPU 核心數決定執行緒數量,因而適合計算密集式處理;single 為可重用的單一執行緒;elastic 會重用已建立的執行緒,必要時建立新執行緒,執行緒閒置過久也會自動回收。你可以查詢 API 文件來瞭解細節。

對於一些會阻斷的場合,可以使用 subscribeOn 將之變為非同步處理,例如若 findUserById 若是基於 JDBC 查詢的阻斷操作,可如下建立一個可非同步訂閱的發佈者:

Mono<User> user = Mono.fromCallable(() -> findUserById("X1234")) 
                      .subscribeOn(Schedulers.elastic());

Reactor 中,有些發佈者預設就會使用某種 Scheduler,例如 Flux.interval 預設使用 parallel,可以以固定週期發佈:

    Flux<Long> seconds = Flux.interval(Duration.ofSeconds(1)).log();

    seconds.subscribe(out::println);
    seconds.subscribe(out::println);

    Thread.sleep(10000);

log 方法會自動記錄發佈、訂閱時的一些細節,有助於觀察發佈者與訂閱者的關係,像是底下的訊息中可看到使用了 parallel 的 Scheduler

[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxInterval.IntervalRunnable)
[ INFO] (main) request(unbounded)
[ INFO] (main) onSubscribe(FluxInterval.IntervalRunnable)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
0
[ INFO] (parallel-2) onNext(0)
0
[ INFO] (parallel-1) onNext(1)
1
[ INFO] (parallel-2) onNext(1)
1
...略

你也可以試著在先前或〈初試 Reactor〉中的範例上,加上 log 來觀察有什麼不同。