Reactive 與 Java 9


iThome 網站首載:Reactive 與 Java 9

在 Java 9 新特性中,包含了由 JSR166 領導者 Doug Lea 提案的 Flow API,它被包含在 java.base 模組,是定義在 java.util.concurrent 套件中精簡的 Flow API,目的在於改進開發者的非同步串流資料處理,然而 Java 平台內部並沒有任何地方應用到 Flow API,若實際看看 API 文件,也覺得它實際上只是個空殼?

關於 Reactive Programming
現今對許多開發者而言,尤其是對前端開發者而言,聽到 Reactive Programming 應該不會覺得陌生,不少技術生態圈中都有著支援 Reactive 概念的框架,就 ReactiveX 而言,就有著 JavaScript 的 RxJS,Java 的 RxJava、Go 的 RxGo 等,清單可參考〈ReactiveX〉頁面,簡 單 來說,Reactive Programming 是一種程式設計典範,基於發佈訂閱(Publish-subscribe)的概念來處理非同步資料。

網路上有許多的資料都解釋了 Reactive Programming 的概念,我先前專欄〈FRP 與函數式〉也曾經舉試算表軟體中欄位間基於公式的連動作為例子,就實作上是將(事件)資料的迭代、轉換等底層細節隱藏起 來,就應用上更重要的是,從需求規格中辨識出高階資料流,結合函數式的概念,將資料流的處理意圖等突顯出來。

在 Java 這塊,社群的腳步總是遠快於 Java 標準本身,在好不容易推出了純為安撫使用者,而新特性殘缺不全的 Java 7 之後,接下來就是努力追趕社群企求的一些現代化特性,當 Java 8 好不容易跟上 Lambda、Stream 等 函數式概念,並且提供了可基於同步概念實現非同步處理的 CompletableFuture 之後,社群開始追求下 一塊關於   Reactive 的拼圖,而終於也在 2015 年一月,看到了 Doug Lea 提出了 Reactive 的侯選規格

如果你的身份之一是 Java 開發者,從未聽說過 Reactive Programming,甚至也沒使用過 Java 8 中的 Lambda、StreamCompletableFuture 等 API,可以先試著看看〈解 析 JDK8 Functional API〉,然後試著用函數式概念的來看看〈Java 8 Patterns〉,對於(Functional) Reactive Programming 在 Java 中的實現與應用,就能有個粗淺的概念。

Reactive Streams Specification
當試著瞭解從 RxJava 瞭解 Reactive Programming 的實作與應用時,會發現它已經來到了 RxJava 2.0,根據 RxJava 的〈What's different in 2.0〉頁面,其一開始就談到,為了符合 Reactive Streams Specification 的規範,RxJava 是整個重寫而成為 2.0,實際上,Reactive Streams Specification 本身亦是受到了 RxJava 1.x 許多影響而演化出來的規格。

在〈Reactive Streams〉中一開始 也指出,其主要目標之一是為非同步串流的壓力處理(Back pressure)定義一個標準,它僅提供一個最小的介面、方法與協定集合,最終的 DSL 相關 API 被特意排除在外,留待並鼓勵廠商或社群去各自實現。

就 Java 這部份具體來說,規格在 org.reactivestreams 套件下定義了 PublisherSubscriberSubscriptionProcessor 四個介面(詳細方法簽署等可參考 http://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/),Publisher 實例會發佈資料串流,接受 Subscriber 的訂閱,並建立一個 Subscription 實 例代表該次訂閱,在訂閱成功事件發生時,會呼叫 SubscriberonSubscribe 並傳入 Subscription 實例。

SubscriptionPublisherSubscriber 之間溝通的橋樑,可以進行流量控制,這是為了避免訂閱者來不及消化資料流來源產 生的資料,而引發事件的持續堆積而造成記憶體的滿溢,Subscriber 可以透過傳入的 Subscription,使用 request(n)Publisher 請求 n 筆資料,或者是透過 cancel() 要求 Publisher 停止傳送資料並清除資源。   

資料流可能被轉換,Processor 同時扮演著 PublisherSubscriberProcessor 繼承了這兩個介面),在最前端的 Subscriber 與最末端的 Subscriber 之間,可以串接多個 Processor,每個 Processor 代表著整個資料流串的一個階段。

若開發者曾經使用過 RxJava,可能想要知道的是相同概念下 1.x 與 2.0 之間名稱的不同,這部份在 RxJava 的〈What's different in 2.0〉中有詳細的說明。

Java 9 Flow API
作為新特性之一,Java 9 引入了 Reactive Programming 的概念,具體來說,是在 java.util.concurrent.Flow 類別中定義了四個介面,它們遵守 Reactive Streams Specification 的規範,因此各介面下實際的方法簽署與 org.reactivestreams 套件下的定義是一樣的。

在 Java 9 中,Flow 類別基本上僅作為一個名稱空間,除了管理四個介面之外,本身只定義了PublisherSubscriber 的緩衝預設值,Flow API 中的唯一實作,就是 java.util.concurrent.SubmissionPublisher 類別,其實現了 Publisher 介 面,內部時使用 ForkJoinPool.commonPool() 作為預設實作,以非同步地對 Subscriber 傳遞資料。

除了像〈Reactive Programming with JDK 9 Flow API〉的介紹中一些簡單場合,可直接使用 SubmissionPublisher 之外,SubmissionPublisher 基本上是作為基礎類別,以便在繼承之後自行實現 Publisher 類別,或者是繼承之後同時實現 Processor 介面,以自行實作 Processor 類別,然而,Java 9 中並沒有其他任何 SubmissionPublisher 的子類別了,目前看來,Java 9 也沒有在內部使用到 Flow API。

如果真的要使用 Java 9 Flow API,目前來說必須自行實作,SubmissionPublisher 類別的 API 文件 提供了一些範例,而在 Flow 類 別的 API 文件中則提供了直接實作 PublisherSubscriber 介面的基本架構,其中也 包含了Subscription 的實作,有助於瞭解它是怎麼在 PublisherSubscriber 之間進行溝通。

有興趣的話,也可以看看 SubmissionPublisher 的原始碼,別忘了,在 Java 9 的模組化架構下,它已經被歸在 java.base 模組中,在打開原始碼壓縮檔之後,得在java/base目錄下,才能找到 java.util.concurrent 套 件以及底下相關的.java檔案。

等待實作品的Flow API
簡單來說,目前的 Java 9 中 Flow API 還是個空殼,或者另一說法是像 Doug Lea 談到的,在 CompletableFuture/CompletionStage 支援了以同步風格撰寫非同步程式,以及 java.util.stream 支援了群集 的 pull 風格之後,Java 還少了從主動源 push 資料的操作風格,支援這最小集合,有助於避免採取對主動資料源採用 pull 風格時的一些不愉快意外(unpleasant surprises)。

就目前而言,Java 9 Flow API 還需等待實作品,而由於 Flow API 也僅出現在 Java 9 上,對於其他版本平台,選擇支持 Reactive Streams Specification 的第三方程式庫會是更好的選擇(例如 RxJava 2.0),搭配 retrolambda以便使用 lambda 語法,將來真要遷移至 Java 9,過程應會比較和緩。

當然,保守的 Java 納入了 Reactive 的概念是一種象徵,代表著 Reactive 已是普及的開發選項之一,而面對這類從社群進入標準平台的典 範,瞭解其發展過程,往往也是更為重要的一環,無論是等待支援 Flow API 的實作品,或者採取 RxJava 2.0 之類的程式庫,瞭解 ReactiveX、Reactive Streams Specification 與它們之間的關係,將有助於瞭解驅動演進的問題與需求在哪,從而更能掌握何時以及如何應用這樣的典範。