新聞中心
4.1 簡介

10年積累的成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站制作后付款的網(wǎng)站建設(shè)流程,更有虎林免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
storm可以確保spout發(fā)送出來的每個(gè)消息都會(huì)被完整的處理。本章將會(huì)描述storm體系是如何達(dá)到這個(gè)目標(biāo)的,并將會(huì)詳述開發(fā)者應(yīng)該如何使用storm的這些機(jī)制來實(shí)現(xiàn)數(shù)據(jù)的可靠處理。
4.2 理解消息被完整處理
一個(gè)消息(tuple)從spout發(fā)送出來,可能會(huì)導(dǎo)致成百上千的消息基于此消息被創(chuàng)建。
我們來思考一下流式的“單詞統(tǒng)計(jì)”的例子:
storm任務(wù)從數(shù)據(jù)源(Kestrel queue)每次讀取一個(gè)完整的英文句子;將這個(gè)句子分解為獨(dú)立的單詞,***,實(shí)時(shí)的輸出每個(gè)單詞以及它出現(xiàn)過的次數(shù)。
本例中,每個(gè)從spout發(fā)送出來的消息(每個(gè)英文句子)都會(huì)觸發(fā)很多的消息被創(chuàng)建,那些從句子中分隔出來的單詞就是被創(chuàng)建出來的新消息。
這些消息構(gòu)成一個(gè)樹狀結(jié)構(gòu),我們稱之為“tuple tree”,看起來如圖1所示:
圖1 示例tuple tree
在什么條件下,Storm才會(huì)認(rèn)為一個(gè)從spout發(fā)送出來的消息被完整處理呢?答案就是下面的條件同時(shí)被滿足:
- tuple tree不再生長
- 樹中的任何消息被標(biāo)識(shí)為“已處理”
如果在指定的時(shí)間內(nèi),一個(gè)消息衍生出來的tuple tree未被完全處理成功,則認(rèn)為此消息未被完整處理。這個(gè)超時(shí)值可以通過任務(wù)級(jí)參數(shù)Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 進(jìn)行配置,默認(rèn)超時(shí)值為30秒。
4.3 消息的生命周期
如果消息被完整處理或者未被完整處理,Storm會(huì)如何進(jìn)行接下來的操作呢?為了弄清這個(gè)問題,我們來研究一下從spout發(fā)出來的消息的生命周期。這里列出了spout應(yīng)該實(shí)現(xiàn)的接口:
首先, Storm使用spout實(shí)例的nextTuple()方法從spout請(qǐng)求一個(gè)消息(tuple)。 收到請(qǐng)求以后,spout使用open方法中提供的SpoutOutputCollector向它的輸出流發(fā)送一個(gè)或多個(gè)消息。每發(fā)送一個(gè)消 息,Spout會(huì)給這個(gè)消息提供一個(gè)message ID,它將會(huì)被用來標(biāo)識(shí)這個(gè)消息。
假設(shè)我們從kestrel隊(duì)列中讀取消息,Spout會(huì)將kestrel 隊(duì)列為這個(gè)消息設(shè)置的ID作為此消息的message ID。 向SpoutOutputCollector中發(fā)送消息格式如下:
接來下,這些消息會(huì)被發(fā)送到后續(xù)業(yè)務(wù)處理的bolts, 并且Storm會(huì)跟蹤由此消息產(chǎn)生出來的新消息。當(dāng)檢測到一個(gè)消息衍生出來的tuple tree被完整處理后,Storm會(huì)調(diào)用Spout中的ack方法,并將此消息的messageID作為參數(shù)傳入。同理,如果某消息處理超時(shí),則此消息對(duì) 應(yīng)的Spout的fail方法會(huì)被調(diào)用,調(diào)用時(shí)此消息的messageID會(huì)被作為參數(shù)傳入。
注意:一個(gè)消息只會(huì)由發(fā)送它的那個(gè)spout任務(wù)來調(diào)用ack或fail。如果系統(tǒng)中某個(gè)spout由多個(gè)任務(wù)運(yùn)行,消息也只會(huì)由創(chuàng)建它的spout任務(wù)來應(yīng)答(ack或fail),決不會(huì)由其他的spout任務(wù)來應(yīng)答。
我們繼續(xù)使用從kestrel隊(duì)列中讀取消息的例子來闡述高可靠性下spout需要做些什么(假設(shè)這個(gè)spout的名字是KestrelSpout)。
我們先簡述一下kestrel消息隊(duì)列:
當(dāng)KestrelSpout從kestrel隊(duì)列中讀取一個(gè)消息,表示它“打開”了隊(duì)列中某個(gè)消息。這意味著,此消息并未從隊(duì)列中真正的刪除,而是 將此消息設(shè)置為“pending”狀態(tài),它等待來自客戶端的應(yīng)答,被應(yīng)答以后,此消息才會(huì)被真正的從隊(duì)列中刪除。處于“pending”狀態(tài)的消息不會(huì)被 其他的客戶端看到。另外,如果一個(gè)客戶端意外的斷開連接,則由此客戶端“打開”的所有消息都會(huì)被重新加入到隊(duì)列中。當(dāng)消息被“打開”的時(shí) 候,kestrel隊(duì)列同時(shí)會(huì)為這個(gè)消息提供一個(gè)唯一的標(biāo)識(shí)。
KestrelSpout就是使用這個(gè)唯一的標(biāo)識(shí)作為這個(gè)tuple的messageID的。稍后當(dāng)ack或fail被調(diào)用的時(shí) 候,KestrelSpout會(huì)把a(bǔ)ck或者fail連同messageID一起發(fā)送給kestrel隊(duì)列,kestrel會(huì)將消息從隊(duì)列中真正刪除或者 將它重新放回隊(duì)列中。
#p#
4.4 可靠相關(guān)的API
為了使用Storm提供的可靠處理特性,我們需要做兩件事情:
- 無論何時(shí)在tuple tree中創(chuàng)建了一個(gè)新的節(jié)點(diǎn),我們需要明確的通知Storm;
- 當(dāng)處理完一個(gè)單獨(dú)的消息時(shí),我們需要告訴Storm 這棵tuple tree的變化狀態(tài)。
通過上面的兩步,storm就可以檢測到一個(gè)tuple tree何時(shí)被完全處理了,并且會(huì)調(diào)用相關(guān)的ack或fail方法。Storm提供了簡單明了的方法來完成上述兩步。
為tuple tree中指定的節(jié)點(diǎn)增加一個(gè)新的節(jié)點(diǎn),我們稱之為錨定(anchoring)。錨定是在我們發(fā)送消息的同時(shí)進(jìn)行的。為了更容易說明問題,我們使用下面代碼作為例子。本示例的bolt將包含整句話的消息分解為一系列的子消息,每個(gè)子消息包含一個(gè)單詞。
每個(gè)消息都通過這種方式被錨定:把輸入消息作為emit方法的***個(gè)參數(shù)。因?yàn)閣ord消息被錨定在了輸入消息上,這個(gè)輸入消息是spout發(fā)送過 來的tuple tree的根節(jié)點(diǎn),如果任意一個(gè)word消息處理失敗,派生這個(gè)tuple tree那個(gè)spout 消息將會(huì)被重新發(fā)送。
與此相反,我們來看看使用下面的方式emit消息時(shí),Storm會(huì)如何處理:
如果以這種方式發(fā)送消息,將會(huì)導(dǎo)致這個(gè)消息不會(huì)被錨定。如果此tuple tree中的消息處理失敗,派生此tuple tree的根消息不會(huì)被重新發(fā)送。根據(jù)任務(wù)的容錯(cuò)級(jí)別,有時(shí)候很適合發(fā)送一個(gè)非錨定的消息。
一個(gè)輸出消息可以被錨定在一個(gè)或者多個(gè)輸入消息上,這在做join或聚合的時(shí)候是很有用的。一個(gè)被多重錨定的消息處理失敗,會(huì)導(dǎo)致與之關(guān)聯(lián)的多個(gè)spout消息被重新發(fā)送。多重錨定通過在emit方法中指定多個(gè)輸入消息來實(shí)現(xiàn):
多重錨定會(huì)將被錨定的消息加到多棵tuple tree上。
注意:多重綁定可能會(huì)破壞傳統(tǒng)的樹形結(jié)構(gòu),從而構(gòu)成一個(gè)DAGs(有向無環(huán)圖),如圖2所示:
圖2 多重錨定構(gòu)成的鉆石型結(jié)構(gòu)
Storm的實(shí)現(xiàn)可以像處理樹那樣來處理DAGs。
錨定表明了如何將一個(gè)消息加入到指定的tuple tree中,高可靠處理API的接下來部分將向您描述當(dāng)處理完tuple tree中一個(gè)單獨(dú)的消息時(shí)我們?cè)撟鲂┦裁?。這些是通過OutputCollector 的ack和fail方法來實(shí)現(xiàn)的?;仡^看一下例子SplitSentence,可以發(fā)現(xiàn)當(dāng)所有的word消息被發(fā)送完成后,輸入的表示句子的消息會(huì)被應(yīng)答 (acked)。
每個(gè)被處理的消息必須表明成功或失?。╝cked 或者failed)。Storm是使用內(nèi)存來跟蹤每個(gè)消息的處理情況的,如果被處理的消息沒有應(yīng)答的話,遲早內(nèi)存會(huì)被耗盡!>
很多bolt遵循特定的處理流程: 讀取一個(gè)消息、發(fā)送它派生出來的子消息、在execute結(jié)尾處應(yīng)答此消息。一般的過濾器(filter)或者是簡單的處理功能都是這類的應(yīng)用。 Storm有一個(gè)BasicBolt接口封裝了上述的流程。示例SplitSentence可以使用BasicBolt來重寫:
使用這種方式,代碼比之前稍微簡單了一些,但是實(shí)現(xiàn)的功能是一樣的。發(fā)送到BasicOutputCollector的消息會(huì)被自動(dòng)的錨定到輸入消息,并且,當(dāng)execute執(zhí)行完畢的時(shí)候,會(huì)自動(dòng)的應(yīng)答輸入消息。
很多情況下,一個(gè)消息需要延遲應(yīng)答,例如聚合或者是join。只有根據(jù)一組輸入消息得到一個(gè)結(jié)果之后,才會(huì)應(yīng)答之前所有的輸入消息。并且聚合和join大部分時(shí)候?qū)敵鱿⒍际嵌嘀劐^定。然而,這些特性不是IBasicBolt所能處理的。
#p#
4.5 高效的實(shí)現(xiàn)tuple tree
Storm 系統(tǒng)中有一組叫做“acker”的特殊的任務(wù),它們負(fù)責(zé)跟蹤DAG(有向無環(huán)圖)中的每個(gè)消息。每當(dāng)發(fā)現(xiàn)一個(gè)DAG被完全處理,它就向創(chuàng)建這個(gè)根消息的spout任務(wù)發(fā)送一個(gè)信號(hào)。拓?fù)渲衋cker任務(wù)的并行度可以通過配置參數(shù)Config.TOPOLOGY_ACKERS來設(shè)置。默認(rèn)的acker任務(wù)并行度為1,當(dāng)系統(tǒng)中有大量的消息時(shí),應(yīng)該適當(dāng)提高acker任務(wù)的并發(fā)度。
為了理解Storm可靠性處理機(jī)制,我們從研究一個(gè)消息的生命周期和tuple tree的管理入手。當(dāng)一個(gè)消息被創(chuàng)建的時(shí)候(無論是在spout還是bolt中),系統(tǒng)都為該消息分配一個(gè)64bit的隨機(jī)值作為id。這些隨機(jī)的id 是acker用來跟蹤由spout消息派生出來的tuple tree的。
每個(gè)消息都知道它所在的tuple tree對(duì)應(yīng)的根消息的id。每當(dāng)bolt新生成一個(gè)消息,對(duì)應(yīng)tuple tree中的根消息的messageId就拷貝到這個(gè)消息中。當(dāng)這個(gè)消息被應(yīng)答的時(shí)候,它就把關(guān)于tuple tree變化的信息發(fā)送給跟蹤這棵樹的acker。例如,他會(huì)告訴acker:本消息已經(jīng)處理完畢,但是我派生出了一些新的消息,幫忙跟蹤一下吧。
舉個(gè)例子,假設(shè)消息D和E是由消息C派生出來的,這里演示了消息C被應(yīng)答時(shí),tuple tree是如何變化的。
因?yàn)樵贑被從樹中移除的同時(shí)D和E會(huì)被加入到tuple tree中,因此tuple tree不會(huì)被過早的認(rèn)為已完全處理。
關(guān)于Storm如何跟蹤tuple tree,我們?cè)偕钊氲奶接懸幌?。前面說過系統(tǒng)中可以有任意個(gè)數(shù)的acker,那么,每當(dāng)一個(gè)消息被創(chuàng)建或應(yīng)答的時(shí)候,它怎么知道應(yīng)該通知哪個(gè)acker呢?
系統(tǒng)使用一種哈希算法來根據(jù)spout消息的messageId確定由哪個(gè)acker跟蹤此消息派生出來的tuple tree。因?yàn)槊總€(gè)消息都知道與之對(duì)應(yīng)的根消息的messageId,因此它知道應(yīng)該與哪個(gè)acker通信。
當(dāng)spout發(fā)送一個(gè)消息的時(shí)候,它就通知對(duì)應(yīng)的acker一個(gè)新的根消息產(chǎn)生了,這時(shí)acker就會(huì)創(chuàng)建一個(gè)新的tuple tree。當(dāng)acker發(fā)現(xiàn)這棵樹被完全處理之后,他就會(huì)通知對(duì)應(yīng)的spout任務(wù)。
tuple是如何被跟蹤的呢?系統(tǒng)中有成千上萬的消息,如果為每個(gè)spout發(fā)送的消息都構(gòu)建一棵樹的話,很快內(nèi)存就會(huì) 耗盡。所以,必須采用不同的策略來跟蹤每個(gè)消息。由于使用了新的跟蹤算法,Storm只需要固定的內(nèi)存(大約20字節(jié))就可以跟蹤一棵樹。這個(gè)算法是 storm正確運(yùn)行的核心,也是storm***的突破。
acker任務(wù)保存了spout消息id到一對(duì)值的映射。***個(gè)值就是spout的任務(wù)id,通過這個(gè)id,acker 就知道消息處理完成時(shí)該通知哪個(gè)spout任務(wù)。第二個(gè)值是一個(gè)64bit的數(shù)字,我們稱之為“ack val”, 它是樹中所有消息的隨機(jī)id的異或結(jié)果。ack val表示了整棵樹的的狀態(tài),無論這棵樹多大,只需要這個(gè)固定大小的數(shù)字就可以跟蹤整棵樹。當(dāng)消息被創(chuàng)建和被應(yīng)答的時(shí)候都會(huì)有相同的消息id發(fā)送過來做異 或。
每當(dāng)acker發(fā)現(xiàn)一棵樹的ack val值為0的時(shí)候,它就知道這棵樹已經(jīng)被完全處理了。因?yàn)橄⒌碾S機(jī)ID是一個(gè)64bit的值,因此ack val在樹處理完之前被置為0的概率非常小。假設(shè)你每秒鐘發(fā)送一萬個(gè)消息,從概率上說,至少需要50,000,000年才會(huì)有機(jī)會(huì)發(fā)生一次錯(cuò)誤。即使如 此,也只有在這個(gè)消息確實(shí)處理失敗的情況下才會(huì)有數(shù)據(jù)的丟失!
4.6 選擇合適的可靠性級(jí)別
Acker任務(wù)是輕量級(jí)的,所以在拓?fù)渲胁⒉恍枰嗟腶cker存在??梢酝ㄟ^Storm UI來觀察acker任務(wù)的吞吐量,如果看上去吞吐量不夠的話,說明需要添加額外的acker。
如果你并不要求每個(gè)消息必須被處理(你允許在處理過程中丟失一些信息),那么可以關(guān)閉消息的可靠處理機(jī)制,從而可以獲取較好的性能。關(guān)閉消息的可靠 處理機(jī)制意味著系統(tǒng)中的消息數(shù)會(huì)減半(每個(gè)消息不需要應(yīng)答了)。另外,關(guān)閉消息的可靠處理可以減少消息的大?。ú恍枰總€(gè)tuple記錄它的根id了), 從而節(jié)省帶寬。
有三種方法可以關(guān)系消息的可靠處理機(jī)制:
- 將參數(shù)Config.TOPOLOGY_ACKERS設(shè)置為0,通過此方法,當(dāng)Spout發(fā)送一個(gè)消息的時(shí)候,它的ack方法將立刻被調(diào)用;
- 第二個(gè)方法是Spout發(fā)送一個(gè)消息時(shí),不指定此消息的messageID。當(dāng)需要關(guān)閉特定消息可靠性的時(shí)候,可以使用此方法;
- ***,如果你不在意某個(gè)消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發(fā)送時(shí)不要做錨定,即在emit方法中不指定輸入消息。因?yàn)檫@些子孫消息沒有被錨定在任何tuple tree中,因此他們的失敗不會(huì)引起任何spout重新發(fā)送消息。
4.7 集群的各級(jí)容錯(cuò)
到現(xiàn)在為止,大家已經(jīng)理解了Storm的可靠性機(jī)制,并且知道了如何選擇不同的可靠性級(jí)別來滿足需求。接下來我們研究一下Storm如何保證在各種情況下確保數(shù)據(jù)不丟失。
3.7.1 任務(wù)級(jí)失敗
- 因?yàn)閎olt任務(wù)crash引起的消息未被應(yīng)答。此時(shí),acker中所有與此bolt任務(wù)關(guān)聯(lián)的消息都會(huì)因?yàn)槌瑫r(shí)而失敗,對(duì)應(yīng)spout的fail方法將被調(diào)用。
- acker任務(wù)失敗。如果acker任務(wù)本身失敗了,它在失敗之前持有的所有消息都將會(huì)因?yàn)槌瑫r(shí)而失敗。Spout的fail方法將被調(diào)用。
- Spout任務(wù)失敗。這種情況下,Spout任務(wù)對(duì)接的外部設(shè)備(如MQ)負(fù)責(zé)消息的完整性。例如當(dāng)客戶端異常的情況下,kestrel隊(duì)列會(huì)將處于pending狀態(tài)的所有的消息重新放回到隊(duì)列中。
4.7.2 任務(wù)槽(slot) 故障
- worker失敗。每個(gè)worker中包含數(shù)個(gè)bolt(或spout)任務(wù)。supervisor負(fù)責(zé)監(jiān)控這些任務(wù),當(dāng)worker失敗后,supervisor會(huì)嘗試在本機(jī)重啟它。
- supervisor失敗。supervisor是無狀態(tài)的,因此supervisor的失敗不會(huì)影響當(dāng)前正在運(yùn)行的任務(wù),只要及時(shí)的將它重新啟動(dòng)即可。supervisor不是自舉的,需要外部監(jiān)控來及時(shí)重啟。
- nimbus失敗。nimbus是無狀態(tài)的,因此nimbus的失敗不會(huì)影響當(dāng)前正在運(yùn)行的任務(wù)(nimbus失敗時(shí),無法提交新的任務(wù)),只要及時(shí)的將它重新啟動(dòng)即可。nimbus不是自舉的,需要外部監(jiān)控來及時(shí)重啟。
4.7.3. 集群節(jié)點(diǎn)(機(jī)器)故障
- storm集群中的節(jié)點(diǎn)故障。此時(shí)nimbus會(huì)將此機(jī)器上所有正在運(yùn)行的任務(wù)轉(zhuǎn)移到其他可用的機(jī)器上運(yùn)行。
- zookeeper集群中的節(jié)點(diǎn)故障。zookeeper保證少于半數(shù)的機(jī)器宕機(jī)仍可正常運(yùn)行,及時(shí)修復(fù)故障機(jī)器即可。
4.8 小結(jié)
本章介紹了storm集群如何實(shí)現(xiàn)數(shù)據(jù)的可靠處理。借助于創(chuàng)新性的tuple tree跟蹤技術(shù),storm高效的通過數(shù)據(jù)的應(yīng)答機(jī)制來保證數(shù)據(jù)不丟失。
storm集群中除nimbus外,沒有單點(diǎn)存在,任何節(jié)點(diǎn)都可以出故障而保證數(shù)據(jù)不會(huì)丟失。nimbus被設(shè)計(jì)為無狀態(tài)的,只要可以及時(shí)重啟,就不會(huì)影響正在運(yùn)行的任務(wù)。
原文鏈接:http://blog.linezing.com/2013/01/storm%E5%85%A5%E9%97%A8%E6%95%99%E7%A8%8B-%E7%AC%AC%E5%9B%9B%E7
標(biāo)題名稱:storm入門教程第四章消息的可靠處理
鏈接分享:http://m.fisionsoft.com.cn/article/djssgip.html


咨詢
建站咨詢
