冷流與熱流


到目前為止看到的資料流都是冷流(Cold stream),也就是無論何時有新的訂閱者,發佈者都會跑一次發佈流程,新訂閱者收到的會是完整資料序列,而不是從訂閱那一刻之後發佈流程發佈的資料。

如果發佈的是靜態資料比較容易理解,例如 Flux.just("java", "python", "javascript") 資料流,無論何時有新訂閱者,它都會收到 "java""python""javascript"

如果是動態資料呢?例如,Flux.interval(Duration.ofSeconds(1)),對每個新訂閱,發佈者都會跑一次發佈流程,這意謂著每個新訂閱者,收到的都會是從 0 開始的計數。例如:

Flux<Long> seconds = Flux.interval(Duration.ofSeconds(1));

seconds.subscribe(n -> out.println("sub1: " + n));

Thread.sleep(5000);

seconds.subscribe(n -> out.println("sub2: " + n));

Thread.sleep(5000);

這會看到以下的結果,可以看到第二個訂閱者,收到的也是從 0 開始的計數:

sub1: 0
sub1: 1
sub1: 2
sub1: 3
sub1: 4
sub1: 5
sub2: 0
sub1: 6
sub2: 1
sub1: 7
sub2: 2
...略

若希望發佈者在發佈流程的中途有了新訂閱者時,不要觸動發佈流程,而希望該訂閱者收到的資料,是現有發佈流程被訂閱那一刻之後發佈的資料,可以使用 publish 方法,將發佈轉為熱流(Hot stream),這會傳回一個 ConnectableFlux 實例。

publish 不會觸動發佈者開始發佈資料,訂閱 ConnectableFlux 實例也不會,你可以使用 connect 決定何時開始觸動發佈者。例如:

ConnectableFlux<Long> seconds = Flux.interval(Duration.ofSeconds(1)).publish();

seconds.subscribe(n -> out.println("sub1: " + n));
seconds.connect();

Thread.sleep(5000);

seconds.subscribe(n -> out.println("sub2: " + n));

Thread.sleep(5000);

在上頭的範例中,seconds 有了第一個訂閱者之後,呼叫了 connect 觸動發佈,5 秒之後,才有第二個訂閱者,然而這不會重新觸動發佈流程,只會收到現有發佈流程被訂閱那一刻之後發佈的資料,因此會看到以下的結果,可以看到第二個訂閱者收到的計數並不是從 0 開始:

sub1: 0
sub1: 1
sub1: 2
sub1: 3
sub1: 4
sub1: 5
sub2: 5
sub1: 6
sub2: 6
sub1: 7
sub2: 7
...略

底下的程式可用來觀察一下冷流有新的訂閱者時,發佈者都會跑一次發佈流程:

Flux<Integer> numbers = Flux.fromStream(() -> {
    out.println("publishing data ...");
    return Arrays.asList(1, 2, 3, 4).stream();
});

numbers.subscribe(n -> out.println("sub1: " + n));      
numbers.subscribe(n -> out.println("sub2: " + n));

你會看到底下的結果,提供的 Suppiler 被觸發了兩次:

publishing data ...
sub1: 1
sub1: 2
sub1: 3
sub1: 4
publishing data ...
sub2: 1
sub2: 2
sub2: 3
sub2: 4

如果轉為熱流,就只會觸發一次:

Flux<Integer> numbers = Flux.fromStream(() -> {
    out.println("publishing data ...");
    return Arrays.asList(1, 2, 3, 4).stream();
}).publish().autoConnect();

numbers.subscribe(n -> out.println("sub1: " + n));      
numbers.subscribe(n -> out.println("sub2: " + n));

autoConnect 表示一有新訂閱者就觸發,因此第一次 subscribe 時會執行 Suppiler,因為是熱流,第二次並不會觸發 Suppiler,結果就會如下:

publishing data ...
sub1: 1
sub1: 2
sub1: 3
sub1: 4