Flux.generate 或 Flux.create?


到目前為止,資料流的行為都很單純,將一組資料發佈至資料流,然後結束資料流,然而,當事情變得比較複雜的時候,你可能會想要進一步地控制時間、條件來決定是否發佈訊息到資料流中,以及是否該完成整個資料流。

例如,你想要定時地發佈系統時間呢?Flux.interval 只能用來發佈從 0 開始的計數資料,Flux.fromStream 也沒辦法,因為得不斷地發佈系統時間,當 Stream 實例傳給 Flux.fromStream 之後,又要如何繼續發佈系統時間呢?

可以使用 Flux.generate,正如其名稱,genertate 用來產生資料發佈至資料流,直到你明確地結束為止。例如:

Flux.generate(sink -> {
    sink.next(System.currentTimeMillis());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
})
.subscribe(out::println);

你指定給 generate 的引數稱為產生器(generator),它會被不斷地呼叫,sink 的型態是 SynchronousSink,要發佈訊號的話可透過它的 next 方法,在產生器的一次呼叫過程中,next 只能被呼叫一次,若要結束訊號的發佈,可以使用 complete 方法,上面的範例沒有呼叫 complete,資料就會一直發佈至資料流。

底下是個發佈 10 次系統時間,就結束訊號發佈的例子,使用了另一個 generate 版本,第一個引數是個處理狀態的物件提供者:

Flux.generate(
    () -> new AtomicInteger(0),
    (counter, sink) -> {
        if(counter.get() == 10) {
            sink.complete();
        }
        sink.next(System.currentTimeMillis());
        counter.incrementAndGet();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return counter;
    }
)
.subscribe(out::println);

每一次要產生訊號,就會使用你提供給 generateConsomerBiFunction,呼叫 acceptapply 時傳入 SynchronousSink 實例,在 acceptapply 前,一定得呼叫過 SynchronousSinknextcomplete 等方法,不然就會引發錯誤訊息。

這意謂著,generate 只能用來處理同步問題,如果你想要在某事件發生時,再進行 next 的話,這類非同步操作就不能使用 generate

這時可以使用 create,指定給 create 的引數被稱為發射器(Emitter)(它不是產生器,不會被不斷地呼叫),接受 FluxSync 實例,可以多次呼叫它的 next 方法,而且你可以在其他時間點呼叫 next,若要完成資料流,可以呼叫 complete 方法。

例如,想在事件發生時發送訊號可以如下:

List<ActionListener> listeners = new ArrayList<>();

Flux.create(sink -> {
    listeners.add(evt -> sink.next(evt));
})
.subscribe(out::println);

listeners.forEach(action -> action.actionPerformed(
    new ActionEvent(Main.class, 0, "shit happens"))
);

由於你可以多次呼叫 next 方法,若訂閱者來不及消化,預設的策略是緩衝資料,你也可以在 create 時指定 FluxSink.OverflowStrategy

Spring Reactor 相對於其他 Reactive 框架來說,使用上比較直覺而簡單,隱藏了不少 PublisherSubscriber 的細節,因此至今,你都可以不用直接面對 PublisherSubscriber,如果必要,也可以透過 Flux.from 來指定 Publisher 實例,或者是 subscribe 時指定 Subscriber 實例。