新聞中心
1. 前言

很多同學(xué)反映對響應(yīng)式編程中的Flux和Mono這兩個Reactor中的概念有點懵逼。但是目前Java響應(yīng)式編程中我們對這兩個對象的接觸又最多,諸如Spring WebFlux、RSocket、R2DBC。我開始也對這兩個對象頭疼,所以今天我們就簡單來探討一下它們。
2. 響應(yīng)流的特點
要搞清楚這兩個概念,必須說一下響應(yīng)流規(guī)范。它是響應(yīng)式編程的基石。他具有以下特點:
- 響應(yīng)流必須是無阻塞的。
- 響應(yīng)流必須是一個數(shù)據(jù)流。
- 它必須可以異步執(zhí)行。
- 并且它也應(yīng)該能夠處理背壓。
背壓是反應(yīng)流中的一個重要概念,可以理解為,生產(chǎn)者可以感受到消費者反饋的消費壓力,并根據(jù)壓力進行動態(tài)調(diào)整生產(chǎn)速率。形象點可以按照下面理解:
有沒有背壓的兩種情形
3. Publisher
由于響應(yīng)流的特點,我們不能再返回一個簡單的POJO對象來表示結(jié)果了。必須返回一個類似Java中的Future的概念,在有結(jié)果可用時通知消費者進行消費響應(yīng)。
Reactive Stream規(guī)范中這種被定義為Publisher ,Publisher 是一個可以提供 0-N 個序列元素的提供者,并根據(jù)其訂閱者Subscriber 的需求推送元素。一個Publisher 可以支持多個訂閱者,并可以根據(jù)訂閱者的邏輯進行推送序列元素。下面這個Excel計算就能說明一些Publisher 的特點。
A1-A9就可以看做Publisher 及其提供的元素序列。A10-A13分別是求和函數(shù)SUM(A1:A9)、平均函數(shù)AVERAGE(A1:A9)、最大值函數(shù)MAX(A1:A9)、最小值函數(shù)MIN(A1:A9),可以看作訂閱者Subscriber。假如說我們沒有A10-A13,那么A1-A9就沒有實際意義,它們并不產(chǎn)生計算。這也是響應(yīng)式的一個重要特點:當(dāng)沒有訂閱時發(fā)布者什么也不做。
而Flux和Mono都是Publisher 在Reactor 3實現(xiàn)。Publisher 提供了subscribe方法,允許消費者在有結(jié)果可用時進行消費。如果沒有消費者Publisher 不會做任何事情,他根據(jù)消費情況進行響應(yīng)。Publisher 可能返回零或者多個,甚至可能是無限的,為了更加清晰表示期待的結(jié)果就引入了兩個實現(xiàn)模型Mono和Flux。
4. Flux
Flux是一個發(fā)出(emit)0-N個元素組成的異步序列的Publisher ,可以被onComplete信號或者onError信號所終止。在響應(yīng)流規(guī)范中存在三種給下游消費者調(diào)用的方法 onNext, onComplete, 和onError。下面這張圖表示了 Flux 的抽象模型:
Flux
以上的的講解對于初次接觸反應(yīng)式編程的依然是難以理解的,所以這里有一個循序漸進的理解過程。
有些類比并不是很妥當(dāng),但是對于你循序漸進的理解這些新概念還是有幫助的。
傳統(tǒng)數(shù)據(jù)處理
我們在平常是這么寫的:
- public List
allUsers() { - return Arrays.asList(new ClientUser("felord.cn", "reactive"),
- new ClientUser("Felordcn", "Reactor"));
- }
我們通過迭代返回值List來get這些元素進行再處理(消費),這種方式有點類似廚師做了很多菜,吃不吃在于食客。需要食客主動去來吃就行了(pull的方式),至于喜歡吃什么不喜歡吃什么自己隨意,怎么吃也自己隨意。
流式數(shù)據(jù)處理
在Java 8中我們可以改寫為流的表示:
- public Stream
allUsers() { - return Stream.of(new ClientUser("felord.cn", "reactive"),
- new ClientUser("Felordcn", "Reactor"));
- }
依然是廚師做了很多菜,但是這種就更加高級了一些,提供了菜品的搭配方式(不包含具體細節(jié)),食客可以按照說明根據(jù)自己的習(xí)慣搭配著去吃,一但開始概不退換,吃完為止,過期不候。
反應(yīng)式數(shù)據(jù)處理
在Reactor中我們又可以改寫為Flux表示:
- public Flux
allUsers(){ - return Flux.just(new ClientUser("felord.cn", "reactive"),
- new ClientUser("Felordcn", "Reactor"));
- }
這時候食客只需要訂餐就行了,做好了自然就呈上來,而且可以隨時根據(jù)食客的飯量進行調(diào)整。如果沒有食客訂餐那么廚師就什么都不用做。當(dāng)然不止有這么點特性,不過對于方便我們理解來說這就夠了。
5. Mono
Mono 是一個發(fā)出(emit)0-1個元素的Publisher ,可以被onComplete信號或者onError信號所終止。
Mono
這里就不翻譯了,整體和Flux差不多,只不過這里只會發(fā)出 0-1 個元素。也就是說不是有就是沒有。象Flux一樣,我們來看看Mono的演化過程以幫助理解。
傳統(tǒng)數(shù)據(jù)處理
- public ClientUser currentUser () {
- return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null;
- }
直接返回符合條件的對象或者null。
Optional 的處理方式
- public Optional
currentUser () { - return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive"))
- : Optional.empty();
- }
這個Optional我覺得就有反應(yīng)式的那種味兒了,當(dāng)然它并不是反應(yīng)式。當(dāng)我們不從返回值Optional取其中具體的對象時,我們不清楚里面到底有沒有,但是Optional是一定客觀存在的,不會出現(xiàn)NPE問題。
反應(yīng)式數(shù)據(jù)處理
- public Mono
currentUser () { - return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
- : Mono.empty();
- }
和Optional有點類似的機制,當(dāng)然Mono不是為了解決NPE問題的,它是為了處理響應(yīng)流中單個值(也可能是Void)而存在的。
6. 總結(jié)
Flux和Mono是Java反應(yīng)式中的重要概念,但是很多同學(xué)包括我在開始都難以理解它們。這其實是規(guī)定了兩種流式范式,這種范式讓數(shù)據(jù)具有一些新的特性,比如基于發(fā)布訂閱的事件驅(qū)動,異步流、背壓等等。另外數(shù)據(jù)是推送(Push)給消費者的以區(qū)別于平時我們的拉(Pull)模式。同時我們可以像Stream Api一樣使用類似map、flatmap等操作符(operator)來操作它們。對Flux和Mono這兩個概念需要花一些時間去理解它們,不能操之過急。
本文轉(zhuǎn)載自微信公眾號「碼農(nóng)小胖哥」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系碼農(nóng)小胖哥公眾號。
新聞標題:我對響應(yīng)式編程中Mono和Flux的理解
分享地址:http://m.fisionsoft.com.cn/article/cdgdcog.html


咨詢
建站咨詢
