初試 Reactor


來認識一下如何使用 Reactor,同樣地,使用 Gradle 來管理相依程式庫,因此可以建立你的 Gradle 專案,然後在 build.gradle 中加入:

compile 'io.projectreactor:reactor-core:3.2.3.RELEASE'
testCompile 'io.projectreactor:reactor-test:3.2.3.RELEASE'

reactor-core 是 Reactor 核心,然而,測試資料流其實是件有挑戰性的事情(想想看,你怎麼測試 Stream API 實作的功能),為此,Reactor 提供了 reactor-test,可以透過其 StepVerifier 來協助測試。

先來寫個簡單的程式,假設你有個資料來源(或許是來自網路、資料庫等),最後取得了一個 List<String>,後續程式有其他部份,都會對這個來源感興趣,並有自己的處理流程,而處理過後的資料,也會有各自感興趣的輸出:

package cc.openhome;

import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

import static java.lang.System.out;

public class FsttReactor {
    public static void main(String[] args) {
        List<String> skillSource = Arrays.asList("java", "python", "javascript");

        Flux<String> skills = Flux.fromIterable(skillSource);
        Flux<String> upperSkills = skills.map(String::toUpperCase);
        Flux<String> chars = upperSkills.flatMap(skill -> Flux.fromArray(skill.split("")));

        chars.subscribe(out::print);
        out.println();
        upperSkills.subscribe(out::println);
        skills.subscribe(out::print);
    }
}

若來源可能有 0 到 多個資料,可以使用 Flux,它是個 Publisher 實作,若程式上來源是 Iterable 實例,可以使用 FluxfromIterable 來建立 Flux 實例,後續可以使用 map 來做處理轉換,若處理轉換過程,需要組合另一個 Flux,可以使用 flatMap,對於某個 Flux 資料流有興趣的話,可以使用 subscribe,它有多個重載版本,這邊使用的是接受 Consumer 的版本,直接透過方法參考語法,將標準輸出作為訂閱者。

試著執行程式的話,就會看到各自的訂閱者,依各自的資料流輸出了各自的結果:

JAVAPYTHONJAVASCRIPT
JAVA
PYTHON
JAVASCRIPT
javapythonjavascript

你可能有不同型態的來源,Flux 有多個 fromXXX 方法,像是 fromStream 可接受 StreamfromArray 可接受陣列、from 可接受 Publisher 等,如果已經有個別的資料,也可以透過 just 方法來設定(just 這個名稱應該是來自 Haskell 中 Maybe 的建構式之一 Just):

Flux<String> skills = Flux.just("java", "python", "javascript");

Flux 代表 0 到多個資料的發佈者,對於 0 的情況,可以使用 Flux.just(),不過建議使用 Flux.empty() 比較清楚,當然,Flux 上頭還有許多方法,之後有機會用到再來談,你也可以查看一下 API 文件稍微先瞭解一下。

對於 0 或 1 個資料的發佈者,例如,資料庫查詢時 findUserById 之類的方法,有一個或沒有的情況,這方法若想傳回一個發佈者,可以使用 Mono,例如:

Mono.just("X1234").subscribe(out::println)
Mono.fromSupplier(() -> request("X1234")).subscribe(out::println);
Mono.justOrEmpty(instanceOrNull("X1234")).subscribe(out::println);

justOrEmpty 也有可以接受 Optional 的版本,Mono 也有多個 fromXXX 版本,像是 fromRunnablefromCallablefromFuture 等,便於銜接各式的資料來源。

基本上,上述的這些方法,在訂閱者 subscribe 前,發佈者並不會進行發佈,若想直接觸發發佈者,可以使用無引數的 subscribe()

就以上的例子來說,有訂閱者進行了 subscribe 的話,訂閱處理會是在同一個執行緒進行,可以透過 subscribeOn 安排在其他執行緒。

就這邊的範例來說,無論何時有訂閱者進行了 subscribe,都收到了一開始被發佈的資料,在 Reactive 的術語說詞中,這屬於冷流(Cold stream)發佈者,之後有機會再來談談熱流(Hot stream)發佈者,也就是後續訂閱者,只會收到訂閱之後才發佈的資料。

那麼,該怎麼進行測試呢?直接來看個例子,這應該不需要多解釋些什麼:

import org.junit.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class ReactorTest {

    private Flux<String> skills() {
        return Flux.just("java", "python", "javascript");
    }

    @Test
    public void testSkills() {
        StepVerifier.create(skills())
                    .expectNext("java", "python", "javascript")
                    .expectComplete()
                    .verify();
    }

    @Test
    public void testUpperSkills() {
        StepVerifier.create(skills().map(String::toUpperCase))
                    .expectNext("JAVA", "PYTHON", "JAVASCRIPT")
                    .expectComplete()
                    .verify();
    }
}

對 Reactor 在測試方面提供的輔助有興趣的話,可以進一步參考官方文件 Testing 的內容。

你可以在 Reactor 找到以上的範例專案。