詳解Java中的reactive stream協(xié)議
每個數(shù)據(jù)流都有一個生產(chǎn)者一個消費者。生產(chǎn)者負責產(chǎn)生數(shù)據(jù),而消費者負責消費數(shù)據(jù)。如果是同步系統(tǒng),生產(chǎn)一個消費一個沒什么問題。但是如果在異步系統(tǒng)中,就會產(chǎn)生問題。
因為生產(chǎn)者無法感知消費者的狀態(tài),不知道消費者到底是繁忙狀態(tài)還是空閑狀態(tài),是否有能力去消費更多的數(shù)據(jù)。
一般來說數(shù)據(jù)隊列的長度都是有限的,即使沒有做限制,但是系統(tǒng)的內(nèi)存也是有限的。當太多的數(shù)據(jù)沒有被消費的話,會導致內(nèi)存溢出或者數(shù)據(jù)得不到即使處理的問題。
這時候就需要back-pressure了。
如果消息接收方消息處理不過來,則可以通知消息發(fā)送方,告知其正在承受壓力,需要降低負載。back-pressure是一種消息反饋機制,從而使系統(tǒng)得以優(yōu)雅地響應負載, 而不是在負載下崩潰。
而reactive stream的目的就是用來管理異步服務的流數(shù)據(jù)交換,并能夠讓接收方自主決定接受數(shù)據(jù)的頻率。back-pressure就是reactive stream中不可或缺的一部分。
什么是reactive stream上面我們講到了reactive stream的作用,大家應該對reactive stream有了一個基本的了解。這里我們再給reactive stream做一個定義:
reactive stream就是一個異步stream處理的標準,它的特點就是非阻塞的back pressure。
reactive stream只是一個標準,它定義了實現(xiàn)非阻塞的back pressure的最小區(qū)間的接口,方法和協(xié)議。
所以reactive stream其實有很多種實現(xiàn)的,不僅僅是java可以使用reactive stream,其他的編程語言也可以。
reactive stream只是定義了最基本的功能,各大實現(xiàn)在實現(xiàn)了基本功能的同時可以自由擴展。
目前reactive stream最新的java版本是1.0.3,是在2019年8月23發(fā)布的。它包含了java API,協(xié)議定義文件,測試工具集合和具體的實現(xiàn)例子。
深入了解java版本的reactive stream在介紹java版本的reactive stream之前,我們先回顧一下reactive stream需要做哪些事情:
1.能夠處理無效數(shù)量的消息
2.消息處理是有順序的
3.可以異步的在組件之間傳遞消息
4.一定是非阻塞和backpressure的
為了實現(xiàn)這4個功能,reactive stream定義了4個接口,Publisher,Subscriber,Subscription,Processor。這四個接口實際上是一個觀察者模式的實現(xiàn)。接下來我們詳細來分析一下各個接口的作用和約定。
Publisher先看下Publisher的定義:
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s);}
Publisher就是用來生成消息的。它定義了一個subscribe方法,傳入一個Subscriber。這個方法用來將Publisher和Subscriber進行連接。
一個Publisher可以連接多個Subscriber。
每次調(diào)用subscribe建立連接,都會創(chuàng)建一個新的Subscription,Subscription和subscriber是一一對應的。
一個Subscriber只能夠subscribe一次Publisher。
如果subscribe失敗或者被拒絕,則會出發(fā)Subscriber.onError(Throwable)方法。
Subscriber先看下Subscriber的定義:
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete();}
Subscriber就是消息的接收者。
在Publisher和Subscriber建立連接的時候會觸發(fā)onSubscribe(Subscription s)方法。
當調(diào)用Subscription.request(long)方法時,onNext(T t)會被觸發(fā),根據(jù)request請求參數(shù)的大小,onNext會被觸發(fā)一次或者多次。
在發(fā)生異?;蛘呓Y(jié)束時會觸發(fā)onError(Throwable t)或者onComplete()方法。
Subscription先看下Subscription的定義:
public interface Subscription { public void request(long n); public void cancel();}
Subscription代表著一對一的Subscriber和Publisher之間的Subscribe關(guān)系。
request(long n)意思是向publisher請求多少個events,這會觸發(fā)Subscriber.onNext方法。
cancel()則是請求Publisher停止發(fā)送信息,并清除資源。
Processor先看下Processor的定義:
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
Processor即是Subscriber又是Publisher,它代表著一種處理狀態(tài)。
JDK中reactive stream的實現(xiàn)在JDK中java.util.concurrent.Flow就是reactive stream語義的一種實現(xiàn)。
Flow從JDK9就開始有了。我們看下它的結(jié)構(gòu):
從上圖我們可以看到在JDK中Flow是一個final class,而Subscriber,Publisher,Subscription,Processor都是它的內(nèi)部類。
總結(jié)reactive stream的出現(xiàn)有效的解決了異步系統(tǒng)中的背壓問題。只不過reactive stream只是一個接口標準或者說是一種協(xié)議,具體的實現(xiàn)還需要自己去實現(xiàn)。
以上就是詳解Java中的reactive stream協(xié)議的詳細內(nèi)容,更多關(guān)于Java中的reactive stream協(xié)議的資料請關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. React+umi+typeScript創(chuàng)建項目的過程2. ASP.NET Core 5.0中的Host.CreateDefaultBuilder執(zhí)行過程解析3. SharePoint Server 2019新特性介紹4. ASP中常用的22個FSO文件操作函數(shù)整理5. 三個不常見的 HTML5 實用新特性簡介6. ASP調(diào)用WebService轉(zhuǎn)化成JSON數(shù)據(jù),附json.min.asp7. .Net core 的熱插拔機制的深入探索及卸載問題求救指南8. 無線標記語言(WML)基礎之WMLScript 基礎第1/2頁9. 讀大數(shù)據(jù)量的XML文件的讀取問題10. 解決ASP中http狀態(tài)跳轉(zhuǎn)返回錯誤頁的問題
