來認識一下如何使用 Reactor,同樣地,使用 Gradle 來管理相依程式庫,因此可以建立你的 Gradle 專案,然後在 build.gradle 中加入:
compile 'io.projectreactor:reactor-core:3.2.3.RELEASE'
testCompile 'io.projectreactor:reactor-test:3.2.3.RELEASE'
reactor-core
是 Reactor 核心,然而,測試資料流其實是件有挑戰性的事情(想想看,你怎麼測試 Stream API 實作的功能),為此,Reactor 提供了 reactor-test
,可以透過其 StepVerifier
來協助測試。
先來寫個簡單的程式,假設你有個資料來源(或許是來自網路、資料庫等),最後取得了一個 List<String>
,後續程式有其他部份,都會對這個來源感興趣,並有自己的處理流程,而處理過後的資料,也會有各自感興趣的輸出:
package cc.openhome;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import static java.lang.System.out;
public class FsttReactor {
public static void main(String[] args) {
List<String> skillSource = Arrays.asList("java", "python", "javascript");
Flux<String> skills = Flux.fromIterable(skillSource);
Flux<String> upperSkills = skills.map(String::toUpperCase);
Flux<String> chars = upperSkills.flatMap(skill -> Flux.fromArray(skill.split("")));
chars.subscribe(out::print);
out.println();
upperSkills.subscribe(out::println);
skills.subscribe(out::print);
}
}
若來源可能有 0 到 多個資料,可以使用 Flux
,它是個 Publisher
實作,若程式上來源是 Iterable
實例,可以使用 Flux
的 fromIterable
來建立 Flux
實例,後續可以使用 map
來做處理轉換,若處理轉換過程,需要組合另一個 Flux
,可以使用 flatMap
,對於某個 Flux
資料流有興趣的話,可以使用 subscribe
,它有多個重載版本,這邊使用的是接受 Consumer
的版本,直接透過方法參考語法,將標準輸出作為訂閱者。
試著執行程式的話,就會看到各自的訂閱者,依各自的資料流輸出了各自的結果:
JAVAPYTHONJAVASCRIPT
JAVA
PYTHON
JAVASCRIPT
javapythonjavascript
你可能有不同型態的來源,Flux
有多個 fromXXX 方法,像是 fromStream
可接受 Stream
,fromArray
可接受陣列、from
可接受 Publisher
等,如果已經有個別的資料,也可以透過 just
方法來設定(just 這個名稱應該是來自 Haskell 中 Maybe
的建構式之一 Just
):
Flux<String> skills = Flux.just("java", "python", "javascript");
Flux
代表 0 到多個資料的發佈者,對於 0 的情況,可以使用 Flux.just()
,不過建議使用 Flux.empty()
比較清楚,當然,Flux
上頭還有許多方法,之後有機會用到再來談,你也可以查看一下 API 文件稍微先瞭解一下。
對於 0 或 1 個資料的發佈者,例如,資料庫查詢時 findUserById
之類的方法,有一個或沒有的情況,這方法若想傳回一個發佈者,可以使用 Mono
,例如:
Mono.just("X1234").subscribe(out::println)
Mono.fromSupplier(() -> request("X1234")).subscribe(out::println);
Mono.justOrEmpty(instanceOrNull("X1234")).subscribe(out::println);
justOrEmpty
也有可以接受 Optional
的版本,Mono
也有多個 fromXXX
版本,像是 fromRunnable
、fromCallable
、fromFuture
等,便於銜接各式的資料來源。
基本上,上述的這些方法,在訂閱者 subscribe
前,發佈者並不會進行發佈,若想直接觸發發佈者,可以使用無引數的 subscribe()
。
就以上的例子來說,有訂閱者進行了 subscribe
的話,訂閱處理會是在同一個執行緒進行,可以透過 subscribeOn
安排在其他執行緒。
就這邊的範例來說,無論何時有訂閱者進行了 subscribe
,都收到了一開始被發佈的資料,在 Reactive 的術語說詞中,這屬於冷流(Cold stream)發佈者,之後有機會再來談談熱流(Hot stream)發佈者,也就是後續訂閱者,只會收到訂閱之後才發佈的資料。
那麼,該怎麼進行測試呢?直接來看個例子,這應該不需要多解釋些什麼:
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class ReactorTest {
private Flux<String> skills() {
return Flux.just("java", "python", "javascript");
}
@Test
public void testSkills() {
StepVerifier.create(skills())
.expectNext("java", "python", "javascript")
.expectComplete()
.verify();
}
@Test
public void testUpperSkills() {
StepVerifier.create(skills().map(String::toUpperCase))
.expectNext("JAVA", "PYTHON", "JAVASCRIPT")
.expectComplete()
.verify();
}
}
對 Reactor 在測試方面提供的輔助有興趣的話,可以進一步參考官方文件 Testing 的內容。
你可以在 Reactor 找到以上的範例專案。