新聞中心
Shuffle過程,也稱Copy階段。reduce task從各個map task上遠程拷貝一片數(shù)據(jù),并針對某一片數(shù)據(jù),如果其大小超過一定的閥值,則寫到磁盤上,否則直接放到內(nèi)存中。

成都創(chuàng)新互聯(lián)公司主營丹東網(wǎng)站建設的網(wǎng)絡公司,主營網(wǎng)站建設方案,app開發(fā)定制,丹東h5小程序開發(fā)搭建,丹東網(wǎng)站營銷推廣歡迎丹東等地區(qū)企業(yè)咨詢
MAP端
map函數(shù)開始產(chǎn)生輸出時,并不是簡單地將它寫到磁盤上。這個過程更復雜,它利用緩沖的方式寫到內(nèi)存并出于效率的目的進行預排序。
每個map任務都有一個環(huán)形緩沖區(qū)用于存儲任務輸出。在默認情況下,緩沖區(qū)的大小為100MB,這個值可以通過mapreduce.task.io.sort.mb屬性來調(diào)整。一旦緩沖內(nèi)容達到閾值(mapreduce.map.sort.spill.percent,默認為80%),一個后臺線程便開始把內(nèi)容溢寫(spill)到磁盤,在溢寫到磁盤的過程中,map輸出繼續(xù)寫道緩沖區(qū),但如果在此期間緩沖區(qū)被寫滿,map會被阻塞直到磁盤過程完成。溢寫過程按輪詢方式將緩沖區(qū)的內(nèi)容寫到mapreduce.cluster.local.dir屬性在作業(yè)特定子目錄下的指定的目錄中。在寫磁盤之前,線程首先根據(jù)數(shù)據(jù)最終要傳的reducer把數(shù)據(jù)劃分成相應的分區(qū)(partition,用戶也可自定義分區(qū)函數(shù),但默認的partitioner通過哈希函數(shù)來分區(qū),也很高效)。在每個分區(qū)中,后臺線程按鍵進行內(nèi)存中排序,如果有一個combiner函數(shù),它就在排序后的輸出上運行。運行combiner函數(shù)使得map輸出結(jié)果更緊湊,因此減少寫到磁盤的數(shù)據(jù)和傳遞給reducer的數(shù)據(jù)。
每次內(nèi)存緩沖區(qū)達到溢出閾值時,就會新建一個溢出文件(spill file),因此,在map任務寫完其最后一個輸出記錄后,會有幾個溢寫文件。在任務完成之前,溢寫文件被合并成一個已分區(qū)且已排序的輸出文件。配置屬性是mapreduce.task.io.sort.factor控制著一次最多能合并多少流,默認值是10.
如果至少存在3個溢寫文件(通過mapreduce.map.combine.minspills屬性設置)時,則combiner就會在輸出文件寫到磁盤之前再次運行。combiner可以在輸入上反復運行,但并不影響最終結(jié)果。如果只有1個或者2個溢寫文件,那么由于map輸出規(guī)模減少,因此不值得調(diào)用combiner帶來的開銷,因此不會為該map輸出再次運行combiner。
在將壓縮map輸出寫到磁盤的過程中對他進行壓縮往往是一個很好的主意,因為這樣寫磁盤的速度更快,節(jié)約磁盤空間,并且減少傳給reducer的數(shù)據(jù)量。在默認情況下,輸出時不壓縮的,但只要將mapreduce.map.output.compress設置為true,就可以輕松使用此功能。使用的壓縮庫由mapreduce.map.output.compress.codec指定。
reducer通過HTTP得到輸出文件的分區(qū)。用于文件分區(qū)的工作線程的數(shù)量由任務的mapreduce.shuffle.max.threads屬性控制,此設置針對的是每一個節(jié)點管理器,而不是針對每個map任務。默認值0將最大線程數(shù)設置為機器中處理器數(shù)量的兩倍。
REDUCE端
現(xiàn)在轉(zhuǎn)到處理過程的reduce部分。map輸出文件位于運行map任務的tasktracker的本地磁盤(注意,盡管map輸出經(jīng)常寫到map tasktracker 的本地磁盤,但reduce輸出并不這樣),現(xiàn)在,tasktracker需要為分區(qū)文件運行reduce任務。并且,reduce任務需要集群上若干個map任務的map輸出作為其特殊的分區(qū)文件。每個map任務的完成時間可能不同,因此在每個任務完成時,reduce任務就開始復制其輸出。這就是reduce任務的復制階段。reduce任務有少量復制線程,因此能夠并行取得map輸出。默認值是5個線程,但這個默認值可以修改設置mapreduce.reduce.shuffle.parallelcopies屬性即可。
如果map輸出相當小,會被復制到reduce任務JVM的內(nèi)存(緩沖區(qū)大小由mapreduce.reduce.shuffle.input.buffer.percent屬性控制,指定用于此用途的堆空間的百分比),否則,map輸出被復制到磁盤。一旦內(nèi)存緩沖區(qū)達到閾值大?。ㄓ?code>mapreduce.reduce.shuffle.merge.percent決定)或者達到map輸出閾值(由mapreduce.reduce.merge.inmen.threshold控制),則合并后溢出寫到磁盤中。如果指定combiner,則在合并期間運行它以降低寫入硬盤的數(shù)據(jù)量。
隨著磁盤上副本增多,后臺線程會將它們合并為更大的、排好序的文件。這會為后面的合并節(jié)省一些時間。注意,為了合并,壓縮的map輸出(通過map任務)都必須在內(nèi)存中被解壓縮。
復制完所有map輸出后,reduce任務進入排序階段(更恰當?shù)恼f法是合并階段,因為排序是在map端進行的),這個階段將合并map輸出,維持其順序排序。這是循環(huán)進行的。比如,如果有50個map輸出,而合并因子是10(10為默認設置,由mapreduce.task.io.sort.factor屬性設置,與map的合并類似),合并將進行5趟,每趟將10個文件合并成一個文件,因此最后有5個中間文件。
在最后階段,即reduce階段,直接把數(shù)據(jù)輸入reduce函數(shù),從而省略了一次磁盤往返行程,并沒有將這5個文件合并成一個已排序的文件作為最后一趟。最后的合并可以來自內(nèi)存和磁盤片段。
每趟合并的文件數(shù)實際上比事例中展示有所不同。目標是合并最少數(shù)量的文件以便滿足于最后一趟的合并系數(shù)。因此如果有40個文件,我們并不會在四趟中每趟合并10個文件從而得到4個文件。相反,第一趟只合并4個文件,隨后的三趟合并完整的10個文件。在最后一趟中,4個已合并的文件和余下的6個(未合并的)文件合計10個。
在reduce階段,對已排序輸出中的每個鍵都調(diào)用reduce函數(shù)。此階段的輸出直接寫到輸出文件系統(tǒng),一般為HDFS(可自定義)。如果采用HDFS,由于節(jié)點管理器也運行數(shù)據(jù)節(jié)點,所以第一個塊的副本將被寫入到本地磁盤。
本文題目:詳解MapReduceShuffle機制
網(wǎng)頁地址:http://m.5511xx.com/article/cocpeej.html


咨詢
建站咨詢
