到目前為止,資料流的行為都很單純,將一組資料發佈至資料流,然後結束資料流,然而,當事情變得比較複雜的時候,你可能會想要進一步地控制時間、條件來決定是否發佈訊息到資料流中,以及是否該完成整個資料流。
例如,你想要定時地發佈系統時間呢?Flux.interval
只能用來發佈從 0 開始的計數資料,Flux.fromStream
也沒辦法,因為得不斷地發佈系統時間,當 Stream
實例傳給 Flux.fromStream
之後,又要如何繼續發佈系統時間呢?
可以使用 Flux.generate
,正如其名稱,genertate
用來產生資料發佈至資料流,直到你明確地結束為止。例如:
Flux.generate(sink -> {
sink.next(System.currentTimeMillis());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.subscribe(out::println);
你指定給 generate
的引數稱為產生器(generator),它會被不斷地呼叫,sink
的型態是 SynchronousSink
,要發佈訊號的話可透過它的 next
方法,在產生器的一次呼叫過程中,next
只能被呼叫一次,若要結束訊號的發佈,可以使用 complete
方法,上面的範例沒有呼叫 complete
,資料就會一直發佈至資料流。
底下是個發佈 10 次系統時間,就結束訊號發佈的例子,使用了另一個 generate
版本,第一個引數是個處理狀態的物件提供者:
Flux.generate(
() -> new AtomicInteger(0),
(counter, sink) -> {
if(counter.get() == 10) {
sink.complete();
}
sink.next(System.currentTimeMillis());
counter.incrementAndGet();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return counter;
}
)
.subscribe(out::println);
每一次要產生訊號,就會使用你提供給 generate
的 Consomer
或 BiFunction
,呼叫 accept
或 apply
時傳入 SynchronousSink
實例,在 accept
或 apply
前,一定得呼叫過 SynchronousSink
的 next
或 complete
等方法,不然就會引發錯誤訊息。
這意謂著,generate
只能用來處理同步問題,如果你想要在某事件發生時,再進行 next
的話,這類非同步操作就不能使用 generate
。
這時可以使用 create
,指定給 create
的引數被稱為發射器(Emitter)(它不是產生器,不會被不斷地呼叫),接受 FluxSync
實例,可以多次呼叫它的 next
方法,而且你可以在其他時間點呼叫 next
,若要完成資料流,可以呼叫 complete
方法。
例如,想在事件發生時發送訊號可以如下:
List<ActionListener> listeners = new ArrayList<>();
Flux.create(sink -> {
listeners.add(evt -> sink.next(evt));
})
.subscribe(out::println);
listeners.forEach(action -> action.actionPerformed(
new ActionEvent(Main.class, 0, "shit happens"))
);
由於你可以多次呼叫 next
方法,若訂閱者來不及消化,預設的策略是緩衝資料,你也可以在 create
時指定 FluxSink.OverflowStrategy
。
Spring Reactor 相對於其他 Reactive 框架來說,使用上比較直覺而簡單,隱藏了不少 Publisher
與 Subscriber
的細節,因此至今,你都可以不用直接面對 Publisher
與 Subscriber
,如果必要,也可以透過 Flux.from
來指定 Publisher
實例,或者是 subscribe
時指定 Subscriber
實例。