到目前為止看到的資料流都是冷流(Cold stream),也就是無論何時有新的訂閱者,發佈者都會跑一次發佈流程,新訂閱者收到的會是完整資料序列,而不是從訂閱那一刻之後發佈流程發佈的資料。
如果發佈的是靜態資料比較容易理解,例如 Flux.just("java", "python", "javascript")
資料流,無論何時有新訂閱者,它都會收到 "java"
、"python"
、"javascript"
。
如果是動態資料呢?例如,Flux.interval(Duration.ofSeconds(1))
,對每個新訂閱,發佈者都會跑一次發佈流程,這意謂著每個新訂閱者,收到的都會是從 0 開始的計數。例如:
Flux<Long> seconds = Flux.interval(Duration.ofSeconds(1));
seconds.subscribe(n -> out.println("sub1: " + n));
Thread.sleep(5000);
seconds.subscribe(n -> out.println("sub2: " + n));
Thread.sleep(5000);
這會看到以下的結果,可以看到第二個訂閱者,收到的也是從 0 開始的計數:
sub1: 0
sub1: 1
sub1: 2
sub1: 3
sub1: 4
sub1: 5
sub2: 0
sub1: 6
sub2: 1
sub1: 7
sub2: 2
...略
若希望發佈者在發佈流程的中途有了新訂閱者時,不要觸動發佈流程,而希望該訂閱者收到的資料,是現有發佈流程被訂閱那一刻之後發佈的資料,可以使用 publish
方法,將發佈轉為熱流(Hot stream),這會傳回一個 ConnectableFlux
實例。
publish
不會觸動發佈者開始發佈資料,訂閱 ConnectableFlux
實例也不會,你可以使用 connect
決定何時開始觸動發佈者。例如:
ConnectableFlux<Long> seconds = Flux.interval(Duration.ofSeconds(1)).publish();
seconds.subscribe(n -> out.println("sub1: " + n));
seconds.connect();
Thread.sleep(5000);
seconds.subscribe(n -> out.println("sub2: " + n));
Thread.sleep(5000);
在上頭的範例中,seconds
有了第一個訂閱者之後,呼叫了 connect
觸動發佈,5 秒之後,才有第二個訂閱者,然而這不會重新觸動發佈流程,只會收到現有發佈流程被訂閱那一刻之後發佈的資料,因此會看到以下的結果,可以看到第二個訂閱者收到的計數並不是從 0 開始:
sub1: 0
sub1: 1
sub1: 2
sub1: 3
sub1: 4
sub1: 5
sub2: 5
sub1: 6
sub2: 6
sub1: 7
sub2: 7
...略
底下的程式可用來觀察一下冷流有新的訂閱者時,發佈者都會跑一次發佈流程:
Flux<Integer> numbers = Flux.fromStream(() -> {
out.println("publishing data ...");
return Arrays.asList(1, 2, 3, 4).stream();
});
numbers.subscribe(n -> out.println("sub1: " + n));
numbers.subscribe(n -> out.println("sub2: " + n));
你會看到底下的結果,提供的 Suppiler
被觸發了兩次:
publishing data ...
sub1: 1
sub1: 2
sub1: 3
sub1: 4
publishing data ...
sub2: 1
sub2: 2
sub2: 3
sub2: 4
如果轉為熱流,就只會觸發一次:
Flux<Integer> numbers = Flux.fromStream(() -> {
out.println("publishing data ...");
return Arrays.asList(1, 2, 3, 4).stream();
}).publish().autoConnect();
numbers.subscribe(n -> out.println("sub1: " + n));
numbers.subscribe(n -> out.println("sub2: " + n));
autoConnect
表示一有新訂閱者就觸發,因此第一次 subscribe
時會執行 Suppiler
,因為是熱流,第二次並不會觸發 Suppiler
,結果就會如下:
publishing data ...
sub1: 1
sub1: 2
sub1: 3
sub1: 4