在〈初試 Reactor〉中的範例,有訂閱者進行了 subscribe
的話,訂閱處理會是在同一個執行緒進行,這意謂著,若獲取資料來源是個阻斷操作的話,訂閱處理也會被阻斷,這也就是為何在〈初試 Reactor〉中首個範例會是這樣的結果:
JAVAPYTHONJAVASCRIPT
JAVA
PYTHON
JAVASCRIPT
javapythonjavascript
也就是按照各個 subscribe
的順序執行了各個訂閱者的處理,在前一個訂閱者處理完資料序列之前,下一個 subscribe
就不會執行。
如果希望訂閱處理可以在不同的執行緒進行,可以使用 subscribeOn
,並指定 Scheduler
實例。例如:
package cc.openhome;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import static java.lang.System.out;
public class FsttReactor {
public static void main(String[] args) throws InterruptedException {
List<String> skillSource = Arrays.asList("java", "python", "javascript");
Flux<String> skills = Flux.fromIterable(skillSource)
.subscribeOn(Schedulers.parallel());
Flux<String> upperSkills = skills.map(String::toUpperCase);
Flux<String> chars = upperSkills.flatMap(skill -> Flux.fromArray(skill.split("")));
chars.subscribe(out::print);
out.println();
upperSkills.subscribe(out::println);
skills.subscribe(out::print);
Thread.sleep(5000);
}
}
由於訂閱處理在不同的執行緒中進行,在主執行緒結束後,Scheduler
安排的執行緒也會自動結束,為了能觀察,上頭使用了 Thread.sleep(5000)
,你會看到執行結果並不會是依訂閱的順序進行。
Reactor 提供了幾個預設的 Scheduler
實例,可以透過 Schedulers
的 static
方法取得。Schedulers.parallel()
傳回的 Scheduler
實例,會按照 CPU 核心數決定執行緒數量,因而適合計算密集式處理;single
為可重用的單一執行緒;elastic
會重用已建立的執行緒,必要時建立新執行緒,執行緒閒置過久也會自動回收。你可以查詢 API 文件來瞭解細節。
對於一些會阻斷的場合,可以使用 subscribeOn
將之變為非同步處理,例如若 findUserById
若是基於 JDBC 查詢的阻斷操作,可如下建立一個可非同步訂閱的發佈者:
Mono<User> user = Mono.fromCallable(() -> findUserById("X1234"))
.subscribeOn(Schedulers.elastic());
在 Reactor
中,有些發佈者預設就會使用某種 Scheduler
,例如 Flux.interval
預設使用 parallel
,可以以固定週期發佈:
Flux<Long> seconds = Flux.interval(Duration.ofSeconds(1)).log();
seconds.subscribe(out::println);
seconds.subscribe(out::println);
Thread.sleep(10000);
log
方法會自動記錄發佈、訂閱時的一些細節,有助於觀察發佈者與訂閱者的關係,像是底下的訊息中可看到使用了 parallel 的 Scheduler
:
[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxInterval.IntervalRunnable)
[ INFO] (main) request(unbounded)
[ INFO] (main) onSubscribe(FluxInterval.IntervalRunnable)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
0
[ INFO] (parallel-2) onNext(0)
0
[ INFO] (parallel-1) onNext(1)
1
[ INFO] (parallel-2) onNext(1)
1
...略
你也可以試著在先前或〈初試 Reactor〉中的範例上,加上 log
來觀察有什麼不同。