新聞中心
Shuffle過程,也稱Copy階段。reduce task從各個map task上遠(yuǎn)程拷貝一片數(shù)據(jù),并針對某一片數(shù)據(jù),如果其大小超過一定的閥值,則寫到磁盤上,否則直接放到內(nèi)存中。

成都創(chuàng)新互聯(lián)公司主營丹東網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,app開發(fā)定制,丹東h5小程序開發(fā)搭建,丹東網(wǎng)站營銷推廣歡迎丹東等地區(qū)企業(yè)咨詢
MAP端
map函數(shù)開始產(chǎn)生輸出時,并不是簡單地將它寫到磁盤上。這個過程更復(fù)雜,它利用緩沖的方式寫到內(nèi)存并出于效率的目的進(jìn)行預(yù)排序。
每個map任務(wù)都有一個環(huán)形緩沖區(qū)用于存儲任務(wù)輸出。在默認(rèn)情況下,緩沖區(qū)的大小為100MB,這個值可以通過mapreduce.task.io.sort.mb屬性來調(diào)整。一旦緩沖內(nèi)容達(dá)到閾值(mapreduce.map.sort.spill.percent,默認(rèn)為80%),一個后臺線程便開始把內(nèi)容溢寫(spill)到磁盤,在溢寫到磁盤的過程中,map輸出繼續(xù)寫道緩沖區(qū),但如果在此期間緩沖區(qū)被寫滿,map會被阻塞直到磁盤過程完成。溢寫過程按輪詢方式將緩沖區(qū)的內(nèi)容寫到mapreduce.cluster.local.dir屬性在作業(yè)特定子目錄下的指定的目錄中。在寫磁盤之前,線程首先根據(jù)數(shù)據(jù)最終要傳的reducer把數(shù)據(jù)劃分成相應(yīng)的分區(qū)(partition,用戶也可自定義分區(qū)函數(shù),但默認(rèn)的partitioner通過哈希函數(shù)來分區(qū),也很高效)。在每個分區(qū)中,后臺線程按鍵進(jìn)行內(nèi)存中排序,如果有一個combiner函數(shù),它就在排序后的輸出上運行。運行combiner函數(shù)使得map輸出結(jié)果更緊湊,因此減少寫到磁盤的數(shù)據(jù)和傳遞給reducer的數(shù)據(jù)。
每次內(nèi)存緩沖區(qū)達(dá)到溢出閾值時,就會新建一個溢出文件(spill file),因此,在map任務(wù)寫完其最后一個輸出記錄后,會有幾個溢寫文件。在任務(wù)完成之前,溢寫文件被合并成一個已分區(qū)且已排序的輸出文件。配置屬性是mapreduce.task.io.sort.factor控制著一次最多能合并多少流,默認(rèn)值是10.
如果至少存在3個溢寫文件(通過mapreduce.map.combine.minspills屬性設(shè)置)時,則combiner就會在輸出文件寫到磁盤之前再次運行。combiner可以在輸入上反復(fù)運行,但并不影響最終結(jié)果。如果只有1個或者2個溢寫文件,那么由于map輸出規(guī)模減少,因此不值得調(diào)用combiner帶來的開銷,因此不會為該map輸出再次運行combiner。
在將壓縮map輸出寫到磁盤的過程中對他進(jìn)行壓縮往往是一個很好的主意,因為這樣寫磁盤的速度更快,節(jié)約磁盤空間,并且減少傳給reducer的數(shù)據(jù)量。在默認(rèn)情況下,輸出時不壓縮的,但只要將mapreduce.map.output.compress設(shè)置為true,就可以輕松使用此功能。使用的壓縮庫由mapreduce.map.output.compress.codec指定。
reducer通過HTTP得到輸出文件的分區(qū)。用于文件分區(qū)的工作線程的數(shù)量由任務(wù)的mapreduce.shuffle.max.threads屬性控制,此設(shè)置針對的是每一個節(jié)點管理器,而不是針對每個map任務(wù)。默認(rèn)值0將最大線程數(shù)設(shè)置為機(jī)器中處理器數(shù)量的兩倍。
REDUCE端
現(xiàn)在轉(zhuǎn)到處理過程的reduce部分。map輸出文件位于運行map任務(wù)的tasktracker的本地磁盤(注意,盡管map輸出經(jīng)常寫到map tasktracker 的本地磁盤,但reduce輸出并不這樣),現(xiàn)在,tasktracker需要為分區(qū)文件運行reduce任務(wù)。并且,reduce任務(wù)需要集群上若干個map任務(wù)的map輸出作為其特殊的分區(qū)文件。每個map任務(wù)的完成時間可能不同,因此在每個任務(wù)完成時,reduce任務(wù)就開始復(fù)制其輸出。這就是reduce任務(wù)的復(fù)制階段。reduce任務(wù)有少量復(fù)制線程,因此能夠并行取得map輸出。默認(rèn)值是5個線程,但這個默認(rèn)值可以修改設(shè)置mapreduce.reduce.shuffle.parallelcopies屬性即可。
如果map輸出相當(dāng)小,會被復(fù)制到reduce任務(wù)JVM的內(nèi)存(緩沖區(qū)大小由mapreduce.reduce.shuffle.input.buffer.percent屬性控制,指定用于此用途的堆空間的百分比),否則,map輸出被復(fù)制到磁盤。一旦內(nèi)存緩沖區(qū)達(dá)到閾值大?。ㄓ?code>mapreduce.reduce.shuffle.merge.percent決定)或者達(dá)到map輸出閾值(由mapreduce.reduce.merge.inmen.threshold控制),則合并后溢出寫到磁盤中。如果指定combiner,則在合并期間運行它以降低寫入硬盤的數(shù)據(jù)量。
隨著磁盤上副本增多,后臺線程會將它們合并為更大的、排好序的文件。這會為后面的合并節(jié)省一些時間。注意,為了合并,壓縮的map輸出(通過map任務(wù))都必須在內(nèi)存中被解壓縮。
復(fù)制完所有map輸出后,reduce任務(wù)進(jìn)入排序階段(更恰當(dāng)?shù)恼f法是合并階段,因為排序是在map端進(jìn)行的),這個階段將合并map輸出,維持其順序排序。這是循環(huán)進(jìn)行的。比如,如果有50個map輸出,而合并因子是10(10為默認(rèn)設(shè)置,由mapreduce.task.io.sort.factor屬性設(shè)置,與map的合并類似),合并將進(jìn)行5趟,每趟將10個文件合并成一個文件,因此最后有5個中間文件。
在最后階段,即reduce階段,直接把數(shù)據(jù)輸入reduce函數(shù),從而省略了一次磁盤往返行程,并沒有將這5個文件合并成一個已排序的文件作為最后一趟。最后的合并可以來自內(nèi)存和磁盤片段。
每趟合并的文件數(shù)實際上比事例中展示有所不同。目標(biāo)是合并最少數(shù)量的文件以便滿足于最后一趟的合并系數(shù)。因此如果有40個文件,我們并不會在四趟中每趟合并10個文件從而得到4個文件。相反,第一趟只合并4個文件,隨后的三趟合并完整的10個文件。在最后一趟中,4個已合并的文件和余下的6個(未合并的)文件合計10個。
在reduce階段,對已排序輸出中的每個鍵都調(diào)用reduce函數(shù)。此階段的輸出直接寫到輸出文件系統(tǒng),一般為HDFS(可自定義)。如果采用HDFS,由于節(jié)點管理器也運行數(shù)據(jù)節(jié)點,所以第一個塊的副本將被寫入到本地磁盤。
分享名稱:詳解MapReduceShuffle機(jī)制
轉(zhuǎn)載注明:http://m.fisionsoft.com.cn/article/cocpeej.html


咨詢
建站咨詢
