在 Java 9 新特性中,包含了由 JSR166 領導者 Doug Lea 提案的 Flow API,它被包含在
java.base
模組,是定義在 java.util.concurrent
套件中精簡的
Flow API,目的在於改進開發者的非同步串流資料處理,然而 Java 平台內部並沒有任何地方應用到 Flow API,若實際看看 API
文件,也覺得它實際上只是個空殼?- 關於 Reactive Programming
網路上有許多的資料都解釋了 Reactive Programming 的概念,我先前專欄〈FRP 與函數式〉也曾經舉試算表軟體中欄位間基於公式的連動作為例子,就實作上是將(事件)資料的迭代、轉換等底層細節隱藏起 來,就應用上更重要的是,從需求規格中辨識出高階資料流,結合函數式的概念,將資料流的處理意圖等突顯出來。
在 Java 這塊,社群的腳步總是遠快於 Java 標準本身,在好不容易推出了純為安撫使用者,而新特性殘缺不全的 Java 7 之後,接下來就是努力追趕社群企求的一些現代化特性,當 Java 8 好不容易跟上 Lambda、
Stream
等
函數式概念,並且提供了可基於同步概念實現非同步處理的 CompletableFuture
之後,社群開始追求下
一塊關於 Reactive 的拼圖,而終於也在 2015 年一月,看到了 Doug
Lea 提出了 Reactive 的侯選規格。如果你的身份之一是 Java 開發者,從未聽說過 Reactive Programming,甚至也沒使用過 Java 8 中的 Lambda、
Stream
、CompletableFuture
等
API,可以先試著看看〈解
析 JDK8 Functional API〉,然後試著用函數式概念的來看看〈Java
8 Patterns〉,對於(Functional) Reactive Programming 在 Java
中的實現與應用,就能有個粗淺的概念。- Reactive Streams Specification
在〈Reactive Streams〉中一開始 也指出,其主要目標之一是為非同步串流的壓力處理(Back pressure)定義一個標準,它僅提供一個最小的介面、方法與協定集合,最終的 DSL 相關 API 被特意排除在外,留待並鼓勵廠商或社群去各自實現。
就 Java 這部份具體來說,規格在
org.reactivestreams
套件下定義了 Publisher
、Subscriber
、
Subscription
與 Processor
四個介面(詳細方法簽署等可參考
http://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/),Publisher
實例會發佈資料串流,接受 Subscriber
的訂閱,並建立一個 Subscription
實
例代表該次訂閱,在訂閱成功事件發生時,會呼叫 Subscriber
的 onSubscribe
並傳入 Subscription
實例。Subscription
是 Publisher
、Subscriber
之間溝通的橋樑,可以進行流量控制,這是為了避免訂閱者來不及消化資料流來源產 生的資料,而引發事件的持續堆積而造成記憶體的滿溢,Subscriber
可以透過傳入的 Subscription
,使用 request(n)
向 Publisher
請求 n 筆資料,或者是透過 cancel()
要求 Publisher
停止傳送資料並清除資源。 資料流可能被轉換,
Processor
同時扮演著 Publisher
與 Subscriber
(Processor
繼承了這兩個介面),在最前端的 Subscriber
與最末端的 Subscriber
之間,可以串接多個 Processor
,每個 Processor
代表著整個資料流串的一個階段。若開發者曾經使用過 RxJava,可能想要知道的是相同概念下 1.x 與 2.0 之間名稱的不同,這部份在 RxJava 的〈What's different in 2.0〉中有詳細的說明。
- Java 9 Flow API
java.util.concurrent.Flow
類別中定義了四個介面,它們遵守 Reactive Streams Specification 的規範,因此各介面下實際的方法簽署與 org.reactivestreams
套件下的定義是一樣的。在 Java 9 中,
Flow
類別基本上僅作為一個名稱空間,除了管理四個介面之外,本身只定義了Publisher
或 Subscriber
的緩衝預設值,Flow API 中的唯一實作,就是
java.util.concurrent.SubmissionPublisher
類別,其實現了 Publisher
介 面,內部時使用 ForkJoinPool.commonPool()
作為預設實作,以非同步地對 Subscriber
傳遞資料。除了像〈Reactive Programming with JDK 9 Flow API〉的介紹中一些簡單場合,可直接使用
SubmissionPublisher
之外,SubmissionPublisher
基本上是作為基礎類別,以便在繼承之後自行實現 Publisher
類別,或者是繼承之後同時實現 Processor
介面,以自行實作 Processor
類別,然而,Java 9 中並沒有其他任何 SubmissionPublisher
的子類別了,目前看來,Java 9 也沒有在內部使用到 Flow API。如果真的要使用 Java 9 Flow API,目前來說必須自行實作,
SubmissionPublisher
類別的 API 文件 提供了一些範例,而在 Flow
類 別的 API 文件中則提供了直接實作 Publisher
與 Subscriber
介面的基本架構,其中也 包含了Subscription
的實作,有助於瞭解它是怎麼在 Publisher
與 Subscriber
之間進行溝通。有興趣的話,也可以看看
SubmissionPublisher
的原始碼,別忘了,在 Java 9
的模組化架構下,它已經被歸在 java.base
模組中,在打開原始碼壓縮檔之後,得在java/base目錄下,才能找到 java.util.concurrent
套 件以及底下相關的.java檔案。- 等待實作品的Flow API
CompletableFuture
/CompletionStage
支援了以同步風格撰寫非同步程式,以及 java.util.stream
支援了群集 的 pull
風格之後,Java 還少了從主動源 push 資料的操作風格,支援這最小集合,有助於避免採取對主動資料源採用 pull
風格時的一些不愉快意外(unpleasant surprises)。就目前而言,Java 9 Flow API 還需等待實作品,而由於 Flow API 也僅出現在 Java 9 上,對於其他版本平台,選擇支持 Reactive Streams Specification 的第三方程式庫會是更好的選擇(例如 RxJava 2.0),搭配 retrolambda以便使用 lambda 語法,將來真要遷移至 Java 9,過程應會比較和緩。
當然,保守的 Java 納入了 Reactive 的概念是一種象徵,代表著 Reactive 已是普及的開發選項之一,而面對這類從社群進入標準平台的典 範,瞭解其發展過程,往往也是更為重要的一環,無論是等待支援 Flow API 的實作品,或者採取 RxJava 2.0 之類的程式庫,瞭解 ReactiveX、Reactive Streams Specification 與它們之間的關係,將有助於瞭解驅動演進的問題與需求在哪,從而更能掌握何時以及如何應用這樣的典範。