Functional Reactive Programming


Spring 5 包含了 Reactor,專案官方網站上寫著第四代 Reactive 程式庫,世代的區隔之說來自於〈Advanced Reactive Java〉,其實不用太在意世代劃分,重要的是知道,Reactor 支援 Functional Reactive Programming。

對於 Reactive Programming 一般常看到的定義是:

可以自動傳播資料流變化的程式設計典範。

在一般設計典範中,如果寫下了 c = b + 5,在該程式碼運算過後,變數 c 的值就固定了,若有其他流程導致變數 b 改變,c 的值並不會自動變化,然而在 Reactive Programming 的概念中,c 的值必須對 b 的值作出反應,這對多數開發者而言,似乎是有點陌生的功能概念。

實際上若用過試算表軟體,應該知道這類軟體的功能之一:可以在欄位 C1 輸入 =B1+5,如此就會將欄位 B1 的值加 5 後作為 C1 的值,如果使用者變動了 B1 的值,那麼變化會傳播,C1 的值也會自動反應變化;另外像是聯級(Cascade)表單、搜尋框自動提示等功能,某些程度都可以算是 Reactive 概念的實現。

想要實作出試算表欄位、聯級表單、搜尋框自動提示之類的功能,方式之一基於發佈訂閱模式,可以訂閱某事件,並在事件發生時獲得通知以執行對應的變化操作,當任務不複雜時,這個方式也沒什麼問題。

然而,如果事情變得複雜的話,例如,本來實作的流程是「欄位失焦時發出請求,請求完成後更新表單,表單欄位更新後顯示提示訊息」,現在若想「欄位失焦時發出請求,而請求完成後」另外發出一個請求呢?你得將新請求按插在相對的處理器之中,若另外想在「欄位失焦時發出請求,請求完成後更新表單,表單欄位更新後顯示提示訊息」後又做其他的任務呢?你得又找出對應的處理器,將原始碼安插進去。

也就是說你想要的是,能夠基於某個「事件流程」來做些組合與銜接,可想而知的,在事件間關係複雜之時,這種做法就會令程式碼流程變得錯綜複雜。

既然談到了「事件流程」,來想想,事件發生時不是會有事件相關的資料嗎?像是鍵盤事件會有按上哪個鍵的訊息,這些資料是在「事件流程」中流動,談到資料流動,就會讓人想到 Java 8 的 Stream API:

source.filter(n -> n > 10)
      .map(n -> n * 5)
      .forEach(out::println);

如果在「事件流程」中流動的事件資料,也能像上頭這樣處理就好了,例如底下的概念程式碼:

fromEvent("blur", field).flatMap(evt -> fromEvent('load', request(evt.text)))
                        .flatMap(evt -> fromEvent('change', changeName(evt.responseText)))
                        .subscribe(message::pop);

與 Stream API 不同的是,Stream 的資料來源是被動地提取,而事件這類資料是主動地發送,這樣就仍然可以處理先前談到的事件流程,若也能進一步銜接與組合:

var keywordLoaded = fromEvent("blur", field).flatMap(evt -> fromEvent('load', request(evt.text)));
var fieldChanged = keywordLoaded.flatMap(evt -> fromEvent('change', changeName(evt.responseText)));

那麼對 keywordLoaded 這個事件流有興趣的開發者,可以直接再銜接想進行的流程,對 fieldChanged 有興趣的開發者,也可以組合出自己的事件流,這些事件流也可以進一步再被組合,那就可以避免程式碼流程變得錯綜複雜了!

Reactive 在於辨識出可銜接與組合的資料流。

實際上要實作出程式庫來支援資料流的銜接、組合,必須得付出一番功夫,而且還得保持關切點清楚明白,也就是資料流清晰,銜接與組合也容易,這就是為什麼採用 Functional Programming 風格的原因,因為資料流的處理細節,被隱藏在各個高階操作之下,採用 Functional Programming 的 Reactive 設計,就被稱為 Reactive Functional Programming 了。

在 Reactive Functional Programming 的概念逐漸成形之際,由於各技術生態圈都有這方面的需求與實現,為了避免各搞各的,造成日後相容性的問題,因而催生了 Reactive Streams,在 ReactiveX 中有著各個技術生態圈的實現。

就 Java 這部份具體來說,規格在 org.reactivestreams 套件下定義了 PublisherSubscriberSubscriptionProcessor 四個介面(詳細方法簽署等可參考 JavaDoc)。

Publisher 實例會發佈資料串流,接受 Subscriber 的訂閱,並建立一個 Subscription 實例代表該次訂閱,在訂閱成功事件發生時,會呼叫 SubscriberonSubscribe 並傳入 Subscription 實例。

SubscriptionPublisherSubscriber 之間溝通的橋樑,可以進行流量控制,這是為了避免訂閱者來不及消化資料流來源產 生的資料,而引發事件的持續堆積而造成記憶體的滿溢,Subscriber 可以透過傳入的 Subscription,使用 request(n)Publisher 請求 n 筆資料,或者是透過 cancel 要求 Publisher 停止傳送資料並清除資源。

資料流可能被轉換,Processor 同時扮演著 PublisherSubscriberProcessor 繼承了這兩個介面),在最前端的 Subscriber 與最末端的 Subscriber 之間,可以串接多個 Processor,每個 Processor 代表著整個資料流串的一個階段。

在 Java 的領域,早期就有 RxJava 可以用來支援 Reactive Functional Programming,後來為了符合 Reactive Streams 規範,而重寫為 RxJava 2,目前的 RxJava 庫,就是 RxJava 2 的實現。

在 Java 標準平台這塊是在 Java 9 引入,具體來說,是在 java.util.concurrent.Flow 類別中定義了四個介面,它們遵守 Reactive Streams 的規範,因此各介面下實際的方法簽署與 org.reactivestreams 套件下的定義是一樣的,不過,Flow API 還是個空殼,還需等待實作品。

至於 Spring 的 Reactor,也實現了 Reactive Streams 規範,主要是用來支援 Reactive 堆疊,像是 Web Flux,問題來了,Spring 為何不直接基於 RxJava 2,而是打造專屬的 Reactive Streams 實作呢?

就技術而言,Reactor 是在 Java 8 的基礎上開發,並全面擁抱 Java 8 之後的新 API,像是 Lambda 相關介面、新日期與時間 API 等,這意謂著,專案如果還是基於 Java 7 或更早版本,就無法使用 Reactor。

在 API 層面,RxJava 2 有著因為歷史發展脈絡的原因,不得不保留一些令人容易困惑或混淆的型態或操作,而 Reactor 在這方面,都有著明確的對應 API 來取代。

另一方面,Reactor 較直覺易用,例如之後會介紹的 MonoFlux,實現了 Reactive Streams 的 Publisher 介面,並簡化了訊息發布,讓開發者在許多場合,不用處理 SubscriberSubscription 的細節(想想剛剛速談那幾個介面時,是不是有點頭暈?),在 Web Flux 中,MonoFlux 也是主要的操作對象。