成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

分布式計(jì)算框架MapReduce

Tecode / 2093人閱讀

1 MapReduce概念 和 MapReduce編程模型
什么是MapReduce

  • 源于Google的MapReduce論文(2004年12月)
  • Hadoop的MapReduce是Google論文的開(kāi)源實(shí)現(xiàn)
  • MapReduce優(yōu)點(diǎn): 海量數(shù)據(jù)離線處理&易開(kāi)發(fā)
  • MapReduce缺點(diǎn): 實(shí)時(shí)流式計(jì)算

MapReduce分而治之的思想

  • 數(shù)錢實(shí)例:一堆鈔票,各種面值分別是多少
  • 單點(diǎn)策略
  • 一個(gè)人數(shù)所有的鈔票,數(shù)出各種面值有多少?gòu)?/li>
  • 分治策略
  • 每個(gè)人分得一堆鈔票,數(shù)出各種面值有多少?gòu)?/li>
  • 匯總,每個(gè)人負(fù)責(zé)統(tǒng)計(jì)一種面值
  • 解決數(shù)據(jù)可以切割進(jìn)行計(jì)算的應(yīng)用

MapReduce編程分Map和Reduce階段

  • 將作業(yè)拆分成Map階段和Reduce階段
  • Map階段 Map Tasks 分:把復(fù)雜的問(wèn)題分解為若干"簡(jiǎn)單的任務(wù)"
  • Reduce階段: Reduce Tasks 合:reduce

MapReduce編程執(zhí)行步驟

  • 準(zhǔn)備MapReduce的輸入數(shù)據(jù)
  • 準(zhǔn)備Mapper數(shù)據(jù)
  • Shuffle
  • Reduce處理
  • 結(jié)果輸出

編程模型

  • 借鑒函數(shù)式編程方式
  • 用戶只需要實(shí)現(xiàn)兩個(gè)函數(shù)接口:

Map(in_keyin_value)

--->(out_keyintermediate_value) list

Reduce(out_keyintermediate_value) list

--->out_value list

  • Word Count 詞頻統(tǒng)計(jì)案例

2 應(yīng)用MRJob編寫MapReduce代碼
mrjob 簡(jiǎn)介

  • 使用python開(kāi)發(fā)在Hadoop上運(yùn)行的程序 mrjob是最簡(jiǎn)單的方式
  • mrjob程序可以在本地測(cè)試運(yùn)行也可以部署到Hadoop集群上運(yùn)行
  • 如果不想成為hadoop專家 但是需要利用Hadoop寫MapReduce代碼mrJob是很好的選擇

mrjob 安裝

  • 使用pip安裝

pip install mrjob
mrjob實(shí)現(xiàn)WordCount

from mrjob.job import MRJob

class MRWordCount(MRJob):

    #每一行從line中輸入
    def mapper(self _ line):
        for word in line.split():
            yield word1

    # word相同的 會(huì)走到同一個(gè)reduce
    def reducer(self word counts):
        yield word sum(counts)

if __name__ == __main__:
    MRWordCount.run()

運(yùn)行WordCount代碼

打開(kāi)命令行 找到一篇文本文檔 敲如下命令:

python mr_word_count.py my_file.txt

運(yùn)行MRJOB的不同方式

1、內(nèi)嵌(-r inline)方式

特點(diǎn)是調(diào)試方便,啟動(dòng)單一進(jìn)程模擬任務(wù)執(zhí)行狀態(tài)和結(jié)果,默認(rèn)(-r inline)可以省略,輸出文件使用 > output-file 或-o output-file,比如下面兩種運(yùn)行方式是等價(jià)的

python word_count.py -r inline input.txt > output.txt python word_count.py input.txt > output.txt

2、Hadoop(-r hadoop)方式

用于hadoop環(huán)境

python word_count.py -r hadoop hdfs:///test.txt -o  hdfs:///output

支持Hadoop運(yùn)行調(diào)度控制參數(shù),如:

1)指定Hadoop任務(wù)調(diào)度優(yōu)先級(jí)(VERY_HIGH|HIGH)如:--jobconf mapreduce.job.priority=VERY_HIGH。

2)Map及Reduce任務(wù)個(gè)數(shù)限制,如:--jobconf mapreduce.map.tasks=2 --jobconf mapreduce.reduce.tasks=5

3 mrjob 實(shí)現(xiàn) topN統(tǒng)計(jì)(實(shí)驗(yàn))
統(tǒng)計(jì)數(shù)據(jù)中出現(xiàn)次數(shù)最多的前n個(gè)數(shù)據(jù)

import sys
from mrjob.job import MRJobMRStep
import heapq

class TopNWords(MRJob):
    def mapper(self _ line):
        if line.strip() != "":
            for word in line.strip().split():
                yield word1

    #介于mapper和reducer之間,用于臨時(shí)的將mapper輸出的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)
    def combiner(self word counts):
        yield wordsum(counts)

    def reducer_sum(self word counts):
        yield None(sum(counts)word)

    #利用heapq將數(shù)據(jù)進(jìn)行排序,將最大的2個(gè)取出
    def top_n_reducer(self_word_cnts):
        for cntword in heapq.nlargest(2word_cnts):
            yield wordcnt

    #實(shí)現(xiàn)steps方法用于指定自定義的mapper,comnbiner和reducer方法
    def steps(self):
        #傳入兩個(gè)step 定義了執(zhí)行的順序
        return [
            MRStep(mapper=self.mapper
                   combiner=self.combiner
                   reducer=self.reducer_sum)
            MRStep(reducer=self.top_n_reducer)
        ]

def main():
    TopNWords.run()

if __name__==__main__:
    main()

4 MapReduce原理詳解
單機(jī)程序計(jì)算流程

輸入數(shù)據(jù)--->讀取數(shù)據(jù)--->處理數(shù)據(jù)--->寫入數(shù)據(jù)--->輸出數(shù)據(jù)

Hadoop計(jì)算流程

input data:輸入數(shù)據(jù)

InputFormat:對(duì)數(shù)據(jù)進(jìn)行切分,格式化處理

map:將前面切分的數(shù)據(jù)做map處理(將數(shù)據(jù)進(jìn)行分類,輸出(kv)鍵值對(duì)數(shù)據(jù))

shuffle&sort:將相同的數(shù)據(jù)放在一起,并對(duì)數(shù)據(jù)進(jìn)行排序處理

reduce:將map輸出的數(shù)據(jù)進(jìn)行hash計(jì)算,對(duì)每個(gè)map數(shù)據(jù)進(jìn)行統(tǒng)計(jì)計(jì)算

OutputFormat:格式化輸出數(shù)據(jù)
image.png

image.png

map:將數(shù)據(jù)進(jìn)行處理

buffer in memory:達(dá)到80%數(shù)據(jù)時(shí),將數(shù)據(jù)鎖在內(nèi)存上,將這部分輸出到磁盤上

partitions:在磁盤上有很多"小的數(shù)據(jù)",將這些數(shù)據(jù)進(jìn)行歸并排序。

merge on disk:將所有的"小的數(shù)據(jù)"進(jìn)行合并。

reduce:不同的reduce任務(wù),會(huì)從map中對(duì)應(yīng)的任務(wù)中copy數(shù)據(jù),在reduce中同樣要進(jìn)行merge操作

5 MapReduce架構(gòu)
MapReduce架構(gòu) 1.X
JobTracker:負(fù)責(zé)接收客戶作業(yè)提交,負(fù)責(zé)任務(wù)到作業(yè)節(jié)點(diǎn)上運(yùn)行,檢查作業(yè)的狀態(tài)
TaskTracker:由JobTracker指派任務(wù),定期向JobTracker匯報(bào)狀態(tài),在每一個(gè)工作節(jié)點(diǎn)上永遠(yuǎn)只會(huì)有一個(gè)TaskTracker
image.png

MapReduce2.X架構(gòu)

ResourceManager:負(fù)責(zé)資源的管理,負(fù)責(zé)提交任務(wù)到NodeManager所在的節(jié)點(diǎn)運(yùn)行,檢查節(jié)點(diǎn)的狀態(tài)
NodeManager:由ResourceManager指派任務(wù),定期向ResourceManager匯報(bào)狀態(tài)
{{image.png(uploading...)}}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/126017.html

相關(guān)文章

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<