摘要:同時集成了機(jī)器學(xué)習(xí)類庫?;谟?jì)算框架,將的分布式計(jì)算應(yīng)用到機(jī)器學(xué)習(xí)領(lǐng)域。提供了一個簡單的聲明方法指定機(jī)器學(xué)習(xí)任務(wù),并且動態(tài)地選擇最優(yōu)的學(xué)習(xí)算法。宣稱其性能是的多倍。
介紹
spark是分布式并行數(shù)據(jù)處理框架
與mapreduce的區(qū)別:
mapreduce通常將中間結(jié)果放在hdfs上,spark是基于內(nèi)存并行大數(shù)據(jù)框架,中間結(jié)果放在內(nèi)存,對于迭代數(shù)據(jù)spark效率更高,mapreduce總是消耗大量時間排序,而有些場景不需要排序,spark可以避免不必要的排序所帶來的開銷,spark是一張有向無環(huán)圖,spark支持scala,python,java等
適用范圍:
spark更適合于迭代云端比較多的ml和dm運(yùn)算,因?yàn)閟park里面有rdd的抽象概念,spark比hadoop更通用,spark提供的數(shù)據(jù)集操作類型有很多,不像hadoop只提供map和reduce倆種操作,比如map,filter,flatmapt,sample,groupbykey,reducebykey,union,join,cogroup,mapvalues,sort,partionby等多種操作類型,spark
把這些操作稱為transformations,同時還提供count,collect,reduce,lookup,save等多種action操作。這些多種多樣的數(shù)據(jù)集操作類型,給開發(fā)上層應(yīng)用的用戶提供了方便,各個處理節(jié)點(diǎn)之間的通信模型不在像hadoop那樣就是唯一的data shuffle一種模式,用戶可以明明,物化,控制中間結(jié)果的存儲,分區(qū)等,可以說編程模型比hadoop更靈活。
spark是基于內(nèi)存的迭代計(jì)算框架,使用與需要多次操作特定數(shù)據(jù)集的應(yīng)用場合,需要反復(fù)操作的次數(shù)越多,所需要讀取的數(shù)據(jù)量越大,受益越大,數(shù)據(jù)量小但是計(jì)算密集度較大的場合,受益就相對較小. 不過由于rdd的特性,spark不適用那種一部細(xì)粒度更新狀態(tài)的應(yīng)用,例如web服務(wù)的存儲或者增量的web爬蟲和索引,就是對于那種增量修改的應(yīng)用模型不合適。
spark和hadoop的結(jié)合:
spark可以直接對hdfs進(jìn)行數(shù)據(jù)的讀寫,同樣支持spark on yarn。spark可以與mapreduce運(yùn)行于同集群中,共享存儲資源與計(jì)算,數(shù)據(jù)倉庫shark實(shí)現(xiàn)上借用hive,幾乎和hive完全兼容。
四種spark運(yùn)行模式,local模型用于測試開發(fā),standlone 獨(dú)立集群模式,spark on yarn spark在yarn上 ,spark on mesos spark在mesos上。
應(yīng)用:
企業(yè)大數(shù)據(jù)應(yīng)用: 1,count 平均值 2.分類,對比 3.趨勢,統(tǒng)計(jì)分析 4,精準(zhǔn)預(yù)測 人工智能
行業(yè)大數(shù)據(jù)案例:電商,傳媒,能源,交通
spark生態(tài)系統(tǒng)介紹:
spark 可以很容易和yarn結(jié)合,直接調(diào)用HDFS、Hbase上面的數(shù)據(jù),和hadoop結(jié)合。
spark核心部分分為RDD。Spark SQL、Spark Streaming、MLlib、GraphX、Spark R等核心組件解決了很多的大數(shù)據(jù)問題
Spark分為driver和executor,driver提交作業(yè),executor是application早worknode上的進(jìn)程,運(yùn)行task,driver對應(yīng)為sparkcontext。Spark的RDD操作有transformation、action。Transformation對RDD進(jìn)行依賴包裝,RDD所對應(yīng)的依賴都進(jìn)行DAG的構(gòu)建并保存,在worknode掛掉之后除了通過備份恢復(fù)還可以通過元數(shù)據(jù)對其保存的依賴再計(jì)算一次得到。當(dāng)作業(yè)提交也就是調(diào)用runJob時,spark會根據(jù)RDD構(gòu)建DAG圖,提交給DAGScheduler,這個DAGScheduler是在SparkContext創(chuàng)建時一同初始化的,他會對作業(yè)進(jìn)行調(diào)度處理。當(dāng)依賴圖構(gòu)建好以后,從action開始進(jìn)行解析,每一個操作作為一個task,每遇到shuffle就切割成為一個taskSet,并把數(shù)據(jù)輸出到磁盤,如果不是shuffle數(shù)據(jù)還在內(nèi)存中存儲。就這樣再往前推進(jìn),直到?jīng)]有算子,然后運(yùn)行從前面開始,如果沒有action的算子在這里不會執(zhí)行,直到遇到action為止才開始運(yùn)行,這就形成了spark的懶加載,taskset提交給TaskSheduler生成TaskSetManager并且提交給Executor運(yùn)行,運(yùn)行結(jié)束后反饋給DAGScheduler完成一個taskSet,之后再提交下一個,當(dāng)TaskSet運(yùn)行失敗時就返回DAGScheduler并重新再次創(chuàng)建。一個job里面可能有多個TaskSet,一個application可能包含多個job。
1、shark介紹:
shark基本上就是spark的框架基礎(chǔ)上提供和hive一樣的hivesql命令接口,為了最大程度的保持和hive的兼容性,shark使用hive的api來實(shí)現(xiàn)query parsing和logic plan generation,最后的physicalplan execution階段用spark代替hadoop mapreduce,用過配置shark參數(shù),shark可以自動在內(nèi)存中緩存特定的rdd,實(shí)現(xiàn)數(shù)據(jù)重用,進(jìn)而加快特定數(shù)據(jù)集的檢索,同時,shark通過udf用戶自定義函數(shù)實(shí)現(xiàn)特定的數(shù)據(jù)分析學(xué)習(xí)算法,使得sql數(shù)據(jù)查詢和運(yùn)算分析能結(jié)合在一起,最大化rdd的重復(fù)使用。
2、spark streaming介紹:
Spark Streaming 是 Spark 提供的對實(shí)時數(shù)據(jù)進(jìn)行流式計(jì)算的組件,一般與kafka結(jié)合,基本的原理是將stream數(shù)據(jù)分成小的時間片段,以類似batch批量處理的方式來處理這些小部分?jǐn)?shù)據(jù)。spark streaming構(gòu)建在spark上,一方面是因?yàn)閟park的低延遲執(zhí)行引擎可以用于實(shí)時計(jì)算,此外小批量的處理方式使得他可以同時兼容批量和實(shí)時數(shù)據(jù)處理的邏輯和算法,方便了一些需要?dú)v史數(shù)據(jù)和實(shí)時數(shù)據(jù)聯(lián)合分析的特定應(yīng)用場景。
Spark Streaming也有一個StreamingContext,其核心是DStream,是通過以組時間序列上的連續(xù)RDD來組成的,包含一個有Time作為key、RDD作為value的結(jié)構(gòu)體,每一個RDD都包含特定時間間隔的數(shù)據(jù)流,可以通過persist將其持久化。在接受不斷的數(shù)據(jù)流后,在blockGenerator中維護(hù)一個隊(duì)列,將流數(shù)據(jù)放到隊(duì)列中,等處理時間間隔到來后將其中的所有數(shù)據(jù)合并成為一個RDD(這一間隔中的數(shù)據(jù))。其作業(yè)提交和spark相似,只不過在提交時拿到DStream內(nèi)部的RDD并產(chǎn)生Job提交,RDD在action觸發(fā)之后,將job提交給jobManager中的JobQueue,又jobScheduler調(diào)度,JobScheduler將job提交到spark的job調(diào)度器,然后將job轉(zhuǎn)換成為大量的任務(wù)分發(fā)給spark集群執(zhí)行。
3、Graphx
主要用于圖的計(jì)算。核心算法有PageRank、SVD奇異矩陣、TriangleConut等。
4、Spark SQL
是Spark新推出的交互式大數(shù)據(jù)SQL技術(shù)。把sql語句翻譯成Spark上的RDD操作可以支持Hive、Json等類型的數(shù)據(jù)。
5、Spark R
通過R語言調(diào)用spark,目前不會擁有像Scala或者java那樣廣泛的API,Spark通過RDD類提供Spark API,并且允許用戶使用R交互式方式在集群中運(yùn)行任務(wù)。同時集成了MLlib機(jī)器學(xué)習(xí)類庫。
6、MLBase
從上到下包括了MLOptimizer(給使用者)、MLI(給算法使用者)、MLlib(給算法開發(fā)者)、Spark。也可以直接使用MLlib。ML Optimizer,一個優(yōu)化機(jī)器學(xué)習(xí)選擇更合適的算法和相關(guān)參數(shù)的模塊,還有MLI進(jìn)行特征抽取和高級ML編程 抽象算法實(shí)現(xiàn)API平臺,MLlib分布式機(jī)器學(xué)習(xí)庫,可以不斷擴(kuò)充算法。MLRuntime基于spark計(jì)算框架,將Spark的分布式計(jì)算應(yīng)用到機(jī)器學(xué)習(xí)領(lǐng)域。MLBase提供了一個簡單的聲明方法指定機(jī)器學(xué)習(xí)任務(wù),并且動態(tài)地選擇最優(yōu)的學(xué)習(xí)算法。
7、Tachyon
高容錯的分布式文件系統(tǒng)。宣稱其性能是HDFS的3000多倍。有類似java的接口,也實(shí)現(xiàn)了HDFS接口,所以Spark和MR程序不需要任何的修改就可以運(yùn)行。目前支持HDFS、S3等。
rdd是spark最基本,也是最根本的數(shù)據(jù)抽象,RDD表示分布在多個計(jì)算節(jié)點(diǎn)上的可以并行操作的元素集合,rdd是只讀的,分區(qū)記錄的集合。
rdd支持兩種操作,1,轉(zhuǎn)換從現(xiàn)有的數(shù)據(jù)集創(chuàng)建一個新的數(shù)據(jù)集,2,動作 在數(shù)據(jù)集上運(yùn)行計(jì)算后,返回一個值給驅(qū)動程序,例如,map就是一種轉(zhuǎn)換,他將數(shù)據(jù)集每一個元素都傳遞給函數(shù),并返回一個新的分布數(shù)據(jù)集表示結(jié)果,另一個方面,reduce是一個動作,通過一些函數(shù)將所有的元組疊加起來,并將結(jié)果返回給driver程序,spark中的所有轉(zhuǎn)換都有惰性的,也就是說,他們并不會直接計(jì)算結(jié)果,相反的,他們只是記住應(yīng)用哦個到基礎(chǔ)數(shù)據(jù)集上的這些轉(zhuǎn)換動作,例如,我們可以實(shí)現(xiàn),通過map創(chuàng)建的一個新數(shù)據(jù)集,并在reduce使用,最終只返回reduce的結(jié)果給driver,而不是整個大的新數(shù)據(jù)集。默認(rèn)情況下,每個轉(zhuǎn)換過的rdd都會在你在他之上執(zhí)行一個動作時被重新計(jì)算,不過,你也可以使用persist方法,持久話一個rdd在內(nèi)存中,在這種情況下,spark將會在集群中,保存相關(guān)元素,下次你查詢這個rdd是,他將能更快訪問,在磁盤上持久化數(shù)據(jù)集,或在集群間賦值數(shù)據(jù)集也是支持的。除了這些操作外,用戶還可以請求將rdd緩存起來,而且,用戶還可以通過partitioner類獲取rdd的分區(qū)順序,然后將另一個rdd按照同樣的方式分區(qū)。
如何操作rdd?
1、如何獲取rdd 1,從共享的文件系統(tǒng)獲取,hdfs,2.通過已存在的rdd轉(zhuǎn)換 3.將已存在的scala集合并行化,通過調(diào)用sparkcontext的parallelize方法實(shí)現(xiàn) 4.改變現(xiàn)有rdd的之久性,rdd是懶散,短暫的
2、操作rdd的倆個動作,1,actions:對數(shù)據(jù)集計(jì)算后返回一個數(shù)值value給驅(qū)動程序,例如redue將數(shù)據(jù)集的所有元素用某個函數(shù)聚合后,將最終結(jié)果返回給程序,2.transformation 根據(jù)數(shù)據(jù)集創(chuàng)建一個新的數(shù)據(jù)集,計(jì)算后返回一個新rdd;例如map將數(shù)據(jù)的每個元素講過某個函數(shù)計(jì)算后,返回一個姓的分布式數(shù)據(jù)集。
actions具體內(nèi)容:
reduce(func)通過函數(shù)func聚集數(shù)據(jù)集中所有元素,func函數(shù)接受2個參數(shù),返回一個值,這個函數(shù)必須是關(guān)聯(lián)性的,確??梢员徽_的并發(fā)執(zhí)行。
collect() 在driver的程序中,以數(shù)組的形式,返回數(shù)據(jù)集的所有元素,這通常會在使用filter或者其他操作后,返回一個縱溝小的數(shù)據(jù)自己在使用,直接將整個rdd集coloect返回,很可能會讓driver程序oom。
count() 返回數(shù)據(jù)集的元素個數(shù)
take(n) 返回一個數(shù)組,用數(shù)據(jù)集的前n個元素組成,注意,這個操作目前并非在多個節(jié)點(diǎn)上,并行執(zhí)行,而是driver程序所在機(jī)制,單機(jī)計(jì)算所有的元素:注;gateway的內(nèi)存壓力會增大,需要謹(jǐn)慎使用
first()返回數(shù)據(jù)集的第一個元素
saveAsTextFile(path) 將數(shù)據(jù)集的元素,以txtfile的形式,保存到本地文件系統(tǒng),hdfs或者其他hadoop支持的文件系統(tǒng),spark將會調(diào)用每個元素的tostring方法,并將他轉(zhuǎn)換成文件中一行文本。
saveAsSequenceFile(path)將數(shù)據(jù)集的元素,以sequencefile的格式,到指定的目錄下,本地系統(tǒng),hdfs或者其他hadoop支持的文件系統(tǒng),rdd的元組必須有key-value對組成,并都實(shí)現(xiàn)了hadoop的- writable接口或隱式可以轉(zhuǎn)換為wirtable
foreach(func)在數(shù)據(jù)集的每個元素上,運(yùn)行函數(shù)func,這通常用于更新一個累加器變量,或者和外部存儲系統(tǒng)做交互。直接使用 rdd.foreach(println) 在local模式下是可行的,但是在cluster模式下是不行的,必須要執(zhí)行collect()方法,將所有的數(shù)據(jù)拉取到本地,然后執(zhí)行foreach()操作。如果是數(shù)據(jù)量比較小的話可以使用take方法,rdd.take(100).foreach(println)
transformation具體內(nèi)容:
map(func) 返回一個新的分布式數(shù)據(jù)集,有每個原元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成
filter(func) 返回一個新的數(shù)據(jù)集,有經(jīng)過func函數(shù)后返回值為true的原元素組成
flatmap(func)類似于map 但是每一個輸入元素,會被映射0到多個輸出元素,因此func函數(shù)的返回值是一個seq,而不是單一元素
sample(withReplacement,frac,seed) 給定的隨機(jī)種子seed,隨機(jī)抽樣出數(shù)量為frac的數(shù)據(jù)
union(otherdataset)返回一個新的數(shù)據(jù)集,由原數(shù)據(jù)集和參數(shù)聯(lián)合而成
intersection : 只返回兩個RDD中都有的元素,intersecton()在運(yùn)行時會去掉所有重復(fù)的元素(單個RDD內(nèi)重復(fù)元素也會一起移除)。 需要通過網(wǎng)絡(luò)混洗來發(fā)現(xiàn)共有數(shù)據(jù)。
distinct : 生成一個只包含不同元素的新RDD。需要注意:distinct() 操作的開銷很大,因?yàn)樗枰獙⑺袛?shù)據(jù)通過網(wǎng)絡(luò)進(jìn)行混洗(shuffle),以確保每個元素只有一份。
subtract : 接受另一個RDD作為參數(shù),返回一個由只存在在第一個RDD而不存在第二個RDD中的所有元素組成的RDD。 需要數(shù)據(jù)混洗。
cartesian : 返回所有可能的(a,b)對,其中a是源RDD中的元素,b是另一個RDD中的元素。
groupbykey(【num tasks】)在一個有kv對組成的數(shù)據(jù)集上調(diào)用,返回一個k,seq【v】對的數(shù)據(jù)集,注意,默認(rèn)情況下,使用8個并行任務(wù)進(jìn)行分組,你可以傳入num task可選參數(shù),根絕數(shù)據(jù)量設(shè)置不同數(shù)目的task
reducebykey(func,【num tasks】)在一個kv對的數(shù)據(jù)集上使用,返回一個kv的數(shù)據(jù)集,key相同的值都被使用指定的reduce函數(shù)聚合在一起,和groupbykey類似,任務(wù)個數(shù)是第二個參數(shù)來配置
join(otherdataset,【num tasks】)在類型kev和kw類型的數(shù)據(jù)集上調(diào)用,返回一個k(v w)對,每個key中所有元素都在一起的數(shù)據(jù)集
groupwith(otherdataset,【num tasks】)在類型為kv和kw類型的數(shù)據(jù)集上調(diào)用,返回一個數(shù)據(jù)集,組成元組為k seq【v】seq[w]tuples ,這個在其他框架稱為cogroup
cartesian(otherdataset) 笛卡兒積,但在數(shù)據(jù)集t和u調(diào)用是,返回一個tu對的數(shù)據(jù)集,所有元素交互進(jìn)行笛卡兒積。
持久化(緩存)
persist()
cache()
基本開發(fā)思路每個saprk應(yīng)用都有一個驅(qū)動器程序來發(fā)起集群上的各種并行操作。驅(qū)動器程序通過一個SparkContext對象來訪問Spark。這個對象代表對計(jì)算集群的一個連接。
一旦有了SparkContext,你就可以用它來創(chuàng)建RDD。要執(zhí)行這些操作,啟動器程序一般要管理多個執(zhí)行器(executor)節(jié)點(diǎn)。
可以先通過SparkConf對象來配置你的應(yīng)用,然后基于這個SparkConf創(chuàng)建一個SparkContext對象。
創(chuàng)建SparkConf的基本方法,傳遞兩個參數(shù):
1、集群URL:告訴Spark如何連接到集群上。
2、應(yīng)用名:當(dāng)連接到一個集群式,這個值可以幫助你在集群管理器的用戶界面中找到你的應(yīng)用。
關(guān)閉Spark:調(diào)用SparkContext的stop()方法?;蛑苯油顺鰬?yīng)用。(system.exit(0)/sys.exit())
在Spark中,對數(shù)據(jù)的所有操作不外乎是: 創(chuàng)建RDD、 轉(zhuǎn)化已有的RDD、調(diào)用RDD操作進(jìn)行求值
Spark中的RDD是一個不可變的分布式對象集合。每個RDD都被分為多個分區(qū),這些分區(qū)運(yùn)行在集群中的不同節(jié)點(diǎn)上。
當(dāng)我們調(diào)用一個新的行動操作時,整個RDD都會從頭開始計(jì)算。要避免這種行為,用戶可以將中間結(jié)果持久化。
1、初始化sparkcontext
from pyspark import SparkConf, SparkContxt conf = SparkConf().setMaster("local").setAppName("my app") sc = SparkContext(conf=conf) # 關(guān)閉連接 sc.stop()
2、RDD編程
# 從文件讀取數(shù)據(jù) line = sc.textFile("README.md") # parallelize 方法 line = sc.parallelize(["pandas","i like pandas"]) inputRDD = sc.textFile("log.txt") errRDD = inputRDD.filter(lambda x:"error" in x) warnRDD = inputRDD.filter(lambda x:"warning" in x) bindRDD = errRDD.union(warnRDD) bindRDD.count() bindRDD.take(10) # 返回全部數(shù)據(jù)集 bindRDD.collect() # lambda 函數(shù) word = rdd.filter(lambda s:"python" in s) # def 定義的函數(shù) def containsErr(s): return "error" in s word = rdd.filter(containsErr)
2.1、RDD常見轉(zhuǎn)換操作
以 rdd={1,2,3,3} 為例的轉(zhuǎn)換操作
# 將函數(shù)應(yīng)用與RDD中的每個元素,將返回值構(gòu)建新的RDD rdd.map(x => x+1) # 將函數(shù)應(yīng)用用RDD中的每個元素,將返回的迭代器的所有內(nèi)容構(gòu)成新的RDD。通常用于切分單詞。 rdd.flatMap(x=>x.to(3)) --> {1,2,3,2,3,3,3}) # 返回一個由通過傳給filter()的函數(shù)的元素組成的RDD rdd.filter(x=>x!=1) --> {2,3,4} # 去重 rdd.distinct() --> {1,2,3} sample(withReplacement,fraction,[seed]) # 對RDD進(jìn)行采樣,以及是否替換 rdd.sample(false,0.5) --> 非確定的
以{1,2,3}和{3,4,5}的RDD轉(zhuǎn)換操作
# 求并集 rdd.union(other) --> {1,2,3,4,5} # 求交集 rdd.intersection(other) --> {3} # 移除一個RDD中的內(nèi)容,相當(dāng)于減去一個交集 rdd.subtract(other) --> {1,2} # 與另一個RDD的笛卡爾積 rdd.cartesian(other) --> {(1,3),(1,4)...(3,5)}
2.2、RDD常見行動操作
以{1,2,3,3}為列說明常見行動操作
# 返回RDD中所有的元素 rdd.collect() --> {1,2,3,3} # 計(jì)數(shù) rdd.count() # 各元素在RDD中出現(xiàn)的次數(shù) rdd.countByValue() --> {(1,1),(2,1),(3,2)} take(num) # 返回前n元素 top(n) # 排序后的前n個元素 # 按照指定順序,從rdd中返回前n個元素 rdd.takeOrdered(2)(myOrdering) --> {3,3} takeSample(withReplacement,num,[seed]) # 從RDD中返回任意一些元素 rdd.takeSample(false,1) --> 非確定的 # 并行整合rdd中所有的數(shù)據(jù),比如sum rdd.reduce((x,y)=>x+y) --> 9 fold(zero)(func) # 和reduce()一樣,但是需要提供初始值 rdd.fold(0)((x,y)=>x+y) --> 9 aggregate(zeroValue)(seqOp,combOp) # 和reduce類似,但是通常返回不同類型的函數(shù) aggregate((0,0))((x,y)=>(x._1+y,x._2+1), (x,y)=>(x._1+y._1,x._2+y._2) ) --> 9 # 對RDD中的每個元素使用給定的函數(shù) rdd.foreach(func)
2.3、持久化緩存
from pyspark.storage import StorageLvel rdd.presist(StoragLevel.DISK_ONLY) RDD.cache() # 緩存的級別 # MEMORY_ONLY # MEMORY_ONLY_SER # MEMORY_AND_DISK # 如果內(nèi)存放不下,則溢出寫到磁盤上 # MEMORY_AND_DISK_SER # 如果內(nèi)存放不下,則溢出寫到磁盤上,在內(nèi)存中存放序列化后的數(shù)據(jù) # DISK_ONLY # 移除緩存 RDD.unpersist()
3、鍵值對操作
# 以{(1,2),(3,4),(3,6)}為例 # 合并具有形同鍵的值 rdd.reduceByKey((x,y)=>x+y) -->{(1,2),(3,10)} # 對具有相同鍵的值分組 rdd.groupByKey() --->{(1,[2]),(3,[4,6])} combineByKey(createCombiner,mergeValue,mergeComBiners,partitioner) # 使用不同的返回類型合并具有相同鍵的值。有多個參數(shù)分別對應(yīng)聚合操作的各個階段,因而非常適合用來解釋聚合操作各個階段的功能劃分。 # 下面是求每個鍵的平均值 sumCount=num.combineByKey((lambda x:(x,1)), (lambda x,y:(x[0]+y,x[1]+1)), (lambda x,y:(x[0+y[0],x[1]+y[1]]))) sumCount.map(lambda key,xy:(key,xy[0]/xy[1])).collectAaMap() # 對pairRDD的每個值應(yīng)用一個函數(shù)而不改變鍵 rdd.mapVlues(x=>x+1) # 對pairRDD的每個值應(yīng)用一個返回迭代器的函數(shù),然后對返回的每個元素都生成一個對應(yīng)原鍵的鍵值對記錄,通常用于符號化 rdd.flatMapValues(x=>(x to 5)) -->{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} # 返回一個僅含有鍵的RDD rdd.keys() ->{1,3,3} # 返回一個僅包含值的RDD rdd.values() -->{2,4,6} # 返回一個根據(jù)鍵排序的RDD rdd.sortByKey(ascending=True) -->{(1,2),(3,4),(3,6)}
3.1、兩個鍵值對RDD的轉(zhuǎn)換操作
# 以rdd={(1,2),(3,4),(3,6)} other={(3,9)} 為例 # 刪除rdd中鍵與other中鍵相同的元素 rdd.subtracByKey(other) --> {(1,2)} # 對兩個rdd內(nèi)鏈接 rdd.join(other) --> {(3,(4,9)),(3,(6,9))} # 對兩個rdd進(jìn)行連接操作,確保第一個rdd中的鍵必須存在(右外鏈接) rdd.rightOuterJoin(other) --> {(3,(some(4),9)),(3,(some(6),9))} # 對兩個rdd進(jìn)行連接操作,確保第二個rdd中的鍵必須存在(左外連接) rdd.leftOuterJoin(other) --> {(1,(2,None)),(3,(4,some(9))),(3,(6,some(9)))} # 將兩個rdd中擁有相同鍵的數(shù)據(jù)分組到一起 rdd.congroup(other) --> {(1,([2],[])),(3,([4,6],[9]))}
3.2、鍵值對Pair RDD的行動操作
# 以 rdd={(1,2),(3,4),(3,6)} 為例 # 對每個鍵對應(yīng)的元素分別計(jì)數(shù) rdd.countByKey() --> {(1,1,),(3,2)}) # 將結(jié)果以映射表的形式返回,以便查詢 rdd.collectAsMap() --> Map{(1,2),(2,6)} # 返回給定鍵對應(yīng)的所有值 rdd.lookup(3) --> [4,6]
4、并行度調(diào)優(yōu)
每個rdd都有固定數(shù)目的分區(qū),分區(qū)數(shù)決定了在rdd上執(zhí)行操作的并行度。 大多數(shù)操作符都能接受第二個參數(shù),用來指定分組結(jié)果或者聚合結(jié)果的rdd的分區(qū)數(shù)。
比如 sc.parallelize(data).reduceByKey(lambda x,y:x+y,10) 指定分區(qū)數(shù)10
查看分區(qū)數(shù) rdd.partitions.size或rdd.getNumPartitions ,改變分區(qū)的方法repartition()
5、數(shù)據(jù)讀取與保存
讀取txt文件,輸入的每一行都會成為RDD的一個元素。
# 讀取文件 input=sc.textFile("file:///home/holden/README.md") # 保存文件 result.saveAsTextFile(outputFile)
讀取json
# 將json文件的每一行假設(shè)為一條記錄來處理 import json data = input.map(lambda x:json.load(x)) # 寫 (data.filter(lambda x:x[lovesPandas"]).map(lambda x:json.dumps(x)).saveAsTextFile(outputFile))
讀取csv,同樣是將讀取的文本的每一行當(dāng)做一條記錄
import csv from io import StringIO def loadRecord(line): """解析一行csv記錄""" input = StringIO(line) reader = csv.DictReader(input,filednames=["name","favouriteAnimal"]) return reader.next() input = sc.textFile(inputFile).map(loadRecord) # 保存csv def writeRecords(records): """寫出一些csv記錄""" output = StringIO() writer = csv.DictWriter(output,fieldnames=["name","favoriteAnimal"]) for record in records: writer.writerow(record) return [output.getvalue()] pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
讀取SequenceFile
Hadoop輸入輸出格式
關(guān)系型數(shù)據(jù)庫
HBase
6、Spark進(jìn)階編程
6.1、兩種類型的共享變量
累加器(qccumulator):用于對信息聚合,提供了將工作節(jié)點(diǎn)中的值聚合到驅(qū)動器程序中的簡單語法。
廣播變量(broadcast variable):用來高效分發(fā)較大的對象,讓程序高效地向所有工作節(jié)點(diǎn)發(fā)送一個較大的值,以供一個或多個spark操作使用。
# 在python中累加空行,使用了累加器 file = sc.textFile(inputFile) # 創(chuàng)建累加器并初始化為0 blankLine=sc.accumulator(0) def extractCallSigs(line): global blankLine # 訪問全局變量 if (line==""): blankLine+=1 return line.split(" ") callSigns = file.flatMap(extractCallSigns) callSigns.saveAsTextFile(output) # 使用廣播變量查詢國家 # 查詢rdd中呼叫號對應(yīng)的位置,將呼號前綴讀取為國家代碼來進(jìn)行查詢 signPrefixes = sc.broadcast(loadCallSignTable()) # 廣播變量 def processSignCount(sign_count,signPrefixes): country=lookupCountry(sign_count[0],signPrefixes.value) count = sign_count[1] return (country,count) countryContactCounts=(contactCounts.map(processSignCount).reduceByKey((lambda x,y:x+y))) countryContactCounts.saveAsTextFile(output)
基于分區(qū)進(jìn)行操作
spark提供基于分區(qū)的map和foreach,使部分代碼只對rdd的每個分區(qū)運(yùn)行一次,可以幫助降低這些操作的代價。
# 按照分區(qū)執(zhí)行的操作符 mapPartitions() # 參數(shù):該分區(qū)中元素的迭代器。返回:元素的迭代器 # 對于RRD[T]的函數(shù)簽名 :f:(iterator[T]) --> iterator[U] mapPartitionsWithIndex() # 參數(shù):分區(qū)序號,以及每個分區(qū)中的元素的迭代器。返回:元素的迭代器 # 對于RRD[T]的函數(shù)簽名 :f:(int,iterator[T]) --> iterator[U] foreachPartitions() # 參數(shù):元素迭代器。返回:無 # 對于RRD[T]的函數(shù)簽名 :f:(iterator(T)) -->Unit
數(shù)值RDD的操作
count() # RDD中元素個數(shù) mean() # 元素平均值 sum() # max() min() variance() # 方差 sampleVariance() # 從采樣中計(jì)算出的方差 stdev() # 標(biāo)準(zhǔn)差 sampleStdev() # 采用的標(biāo)準(zhǔn)差
7、基于MLlib的機(jī)器學(xué)習(xí)
# 邏輯回歸的垃圾郵件分類 from pyspark.mllib.regression import LabeldPoint from pyspark.mllib.feature import HashingTF from pyspark.mllib.classification import LogisticRegressionWithSGD spam=sc.textFile("spam.txt") normal = sc.textFile("normal.txt") # 創(chuàng)建一個HashingTF實(shí)例來把郵件文本映射為包含10000個特征的向量 tf=HashingTF(numFeatures=10000) # 各郵件都切分為單詞,每個單詞映射為一個特征 spamFeatures = spam.map(lambda email: tf.transForm(email.split(" "))) normalFeatures = normal.map(lambda email: tf.transform(email.split(" "))) # 創(chuàng)建LabelPoint數(shù)據(jù)集分別存放陽性(垃圾郵件)和陰性(正常郵件)的例子 positiveExample = spamFeatures.map(lambda features:LabeldPoint(1,features)) negativeExamples = normalFeatures.map(lambda features:labeldPoint(0,features)) trainingData = positiveExample.union(negativeExample) trainingData.cache() # 因?yàn)檫壿嫽貧w是迭代算法,所以需要緩存訓(xùn)練數(shù)據(jù)RDD # 使用SGD算法 model = LogisticRegressionWithSGD.train(trainningData) # 以陽性和陰性的例子分別測試。 # 首先用一樣的HashingTF特征來得到特征向量,然后對該向量應(yīng)用得到的模型 posTest = tf.transform("O M G GET cheap stuff by sending money to ...".split(" ")) negTest = tf.transform("Hi Dad, i started studying spark the other ...".split(" ")) print( "predict for postive test example:%g" % model.predict(posTest)) print( "predict for negative test example:%g" % model.predict(negTest))
MLlib包含一些特有的數(shù)據(jù)類型,對于Scala和Java,它們位于org.apache.spark.mllib下,對于Python則是位于pyspark.mllib下。
入門:spark有兩個重要的抽象:
RDD,分布式彈性數(shù)據(jù)集,他是一個跨越多個節(jié)點(diǎn)的分布式集合。
另一個抽象是共享變量。spark支持兩種類型的共享變量:一個是廣播(broadcast variables)他可以緩存一個值在集群的各個節(jié)點(diǎn)。另一個是累加器(accumulators)他只能執(zhí)行累加的操作,比如可以做計(jì)數(shù)器和求和。
初始化 Spark
在一個Spark程序中要做的第一件事就是創(chuàng)建一個SparkContext對象來告訴Spark如何連接一個集群。為了創(chuàng)建SparkContext,你首先需要創(chuàng)建一個SparkConf對象,這個對象會包含你的應(yīng)用的一些相關(guān)信息。這個通常是通過下面的構(gòu)造器來實(shí)現(xiàn)的:
new SparkContext(master, appName, [sparkHome], [jars])
參數(shù)說明:
master:用于指定所連接的 Spark 或者 Mesos 集群的 URL。
appName :應(yīng)用的名稱,將會在集群的 Web 監(jiān)控 UI 中顯示。
sparkHome:可選,你的集群機(jī)器上 Spark 的安裝路徑(所有機(jī)器上路徑必須一致)。
jars:可選,在本地機(jī)器上的 JAR 文件列表,其中包括你應(yīng)用的代碼以及任何的依賴,Spark 將會把他們部署到所有的集群結(jié)點(diǎn)上。
在 python 中初始化,示例代碼如下:
//conf = SparkContext("local", "Hello Spark") conf = SparkConf().setAppName("Hello Spark").setMaster("local") sc = SparkContext(conf=conf)
說明:如果部署到集群,在分布式模式下運(yùn)行,最后兩個參數(shù)是必須的,第一個參數(shù)可以是以下任一種形式:
Master URL 含義
local 默認(rèn)值,使用一個 Worker 線程本地化運(yùn)行(完全不并行)
local[N] 使用 N 個 Worker 線程本地化運(yùn)行,N 為 * 時,表示使用系統(tǒng)中所有核
local[N,M] 第一個代表的是用到的核個數(shù);第二個參數(shù)代表的是容許該作業(yè)失敗M次
spark://HOST:PORT 連接到指定的 Spark 單機(jī)版集群 master 進(jìn)程所在的主機(jī)和端口
mesos://HOST:PORT 連接到指定的 Mesos 集群。host 參數(shù)是Moses master的hostname。端口默認(rèn)是5050
如果你在一個集群上運(yùn)行 spark-shell,則 master 參數(shù)默認(rèn)為 local。在實(shí)際使用中,當(dāng)你在集群中運(yùn)行你的程序,你一般不會把 master 參數(shù)寫死在代碼中,而是通過用 spark-submit 運(yùn)行程序來獲得這個參數(shù)。但是,在本地測試以及單元測試時,你仍需要自行傳入 local 來運(yùn)行Spark程序。
運(yùn)行代碼有幾種方式,一是通過 spark-shell 來運(yùn)行 scala 代碼,一是編寫 java 代碼并打成包以 spark on yarn 方式運(yùn)行,還有一種是通過 PySpark 來運(yùn)行 python 代碼。
在 spark-shell 和 PySpark 命令行中,一個特殊的集成在解釋器里的 SparkContext 變量已經(jīng)建立好了,變量名叫做 sc,創(chuàng)建你自己的 SparkContext 不會起作用。
org.apache.spark spark-core_2.10 2.1.1 junit junit 4.12 test
創(chuàng)建一個簡單的spark程序:
public class SimpleApp { public static void main(String[] args) { // 文件路徑 String logFile = "/home/wm/apps/spark-1.4.0-bin-hadoop2.6/README.md"; SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDDlogData = sc.textFile(logFile).cache(); @SuppressWarnings("serial") long numAs = logData.filter(new Function () { public Boolean call(String s) throws Exception { return s.contains("a"); } }).count(); @SuppressWarnings("serial") long numBs = logData.filter(new Function () { public Boolean call(String s) throws Exception { return s.contains("b"); } }).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); sc.close(); } }
Spark的核心就是圍繞著RDD,它是一個自動容錯的分布式數(shù)據(jù)集合。他有兩種方式創(chuàng)建,第一種就是在驅(qū)動程序中對一個集合進(jìn)行并行化。第二種是來源于一個外部的存儲系統(tǒng)。比如:共享系統(tǒng)、HDFS、HBase或者任何提供任何Hadoop 輸入格式的數(shù)據(jù)源。
第一種:Parallelized Collections 創(chuàng)建這個集合需要調(diào)用那個JavaSparkContext的parallelize方法來初始化一個已經(jīng)存在的集合。
Listdata = Arrays.asList(1,2,3,4,5); JavaRDD distData = sc.parallelize(data);
這就創(chuàng)建了一個并行的集合,在這個集合上可以執(zhí)行 distData.reduce((a, b) -> a + b)
在并行數(shù)組中一個很重要的參數(shù)是partitions,它來描述數(shù)組被切割的數(shù)據(jù)集數(shù)量。Spark會在每一個partitions上運(yùn)行任務(wù),這個partitions會被spark自動設(shè)置,一般都是集群中每個CPU上運(yùn)行2-4partitions,但是也可以自己設(shè)置,可以通過parallelize (e.g. sc.parallelize(data, 10)),在有些地方把partitions成為 slices。
第二種:External Datasets
JavaRDD distFile = sc.textFile("data.txt");
textFile也可以設(shè)置partitions參數(shù),一般都是一個block一個partitions,但是也可以自己設(shè)置,自己設(shè)置必須要不能少于block的數(shù)量。
針對Hadoop的其他輸入格式,你能用這個JavaSparkContext.hadoopRDD方法,你需要設(shè)置JobConf和輸入格式的類。也可以使用JavaSparkContext.newAPIHadoopRDD針對輸入格式是基于“new”的MapReduceAPI
先將測試數(shù)據(jù)上傳到 hdfs:
$ hadoop fs -put access.log
然后,編寫一個 python 文件,保存為 SimpleApp.py:
from pyspark import SparkContext logFile = "access.log" sc = SparkContext("local", "Simple App") rdd = sc.textFile(logFile).cache() counts = rdd.map(lambda line: line.split()[8]).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey(lambda x: x) # This is just a demo on how to bring all the sorted data back to a single node. # In reality, we wouldn"t want to collect all the data to the driver node. output = counts.collect() for (word, count) in output: print "%s: %i" % (word, count) counts.saveAsTextFile("/data/result") sc.stop()
接下來,運(yùn)行下面代碼:
$ spark-submit --master local[4] SimpleApp.pydemo(java) 統(tǒng)計(jì)單詞出現(xiàn)次數(shù)
JavaRDDdemo (java) 讀取HDFS中的數(shù)據(jù),并簡單分析,最后結(jié)果寫入mysql數(shù)據(jù)庫中。lines = sc.textFile("data.txt"); JavaPairRDD pairs = lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD counts = pairs.reduceByKey((a, b) -> a + b);
org.apache.spark spark-core_2.10 2.11 mysql mysql-connector-java 5.1.13 org.apache.hadoop hadoop-client 2.6.0 junit junit 4.12 test
由于需要讀取HDFS中的數(shù)據(jù),所以需要hadoop-client文件
在main函數(shù)中首先創(chuàng)建JavaSparkcontext對象。
SparkConf conf = new SparkConf().setAppName("FindError"); JavaSparkContext sc = new JavaSparkContext(conf);
/** * * 列出指定目錄中的文件,這里的文件是不包括子目錄的。 * @param pathOfDirectory * 目錄路徑 * @return * @throws IOException */ public static String[] findFilePathFromDir(String dst) throws IOException { SetfilePathSet = new HashSet (); String[] result = null; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(dst), conf); FileStatus fileList[] = fs.listStatus(new Path(dst)); int size = fileList.length; for (int i = 0; i < size; i++) { filePathSet.add(fileList[i].getPath().toString()); } if (filePathSet.size() > 0) { result = new String[filePathSet.size()]; int i = 0; for (String str : filePathSet) { result[i++] = str; } } fs.close(); return result; }
依次遍歷文件路徑并為每個文件創(chuàng)建一個新的RDD然后計(jì)算出這個文件中包涵ERROR字符串的行數(shù)。
Mapresult = new HashMap (); if (filePaths != null) { for (String path : filePaths) { result.put(path, sc.textFile(path).filter(new Function () { public Boolean call(String line) throws Exception { return line.contains("ERROR"); } }).count()); } }
將results中的數(shù)據(jù)寫入mysql中
/** * 將結(jié)果寫入mysql中 * @param result * @throws Exception */ public static void wirteResultToMysql(Map共享變量result) throws Exception { String DBDRIVER = "com.mysql.jdbc.Driver"; //連接地址是由各個數(shù)據(jù)庫生產(chǎn)商多帶帶提供的,所以需要多帶帶記住 String DBURL = "jdbc:mysql://ip:3306/test"; //連接數(shù)據(jù)庫的用戶名 String DBUSER = "root"; //連接數(shù)據(jù)庫的密碼 String DBPASS = "root"; Connection con = null; //表示數(shù)據(jù)庫的連接對象 PreparedStatement pstmt = null; //表示數(shù)據(jù)庫更新操作 String sql = "insert into aaa values(?,?)"; Class.forName(DBDRIVER); //1、使用CLASS 類加載驅(qū)動程序 con = DriverManager.getConnection(DBURL,DBUSER,DBPASS); //2、連接數(shù)據(jù)庫 pstmt = con.prepareStatement(sql); //使用預(yù)處理的方式創(chuàng)建對象 if (result != null) { for (String str : result.keySet()) { pstmt.setString(1, str); pstmt.setLong(2, result.get(str)); pstmt.addBatch(); } } //pstmt.executeUpdate(); //執(zhí)行SQL 語句,更新數(shù)據(jù)庫 pstmt.executeBatch(); pstmt.close(); con.close(); // 4、關(guān)閉數(shù)據(jù)庫 }
通常情況下,當(dāng)一個函數(shù)傳遞給一個在遠(yuǎn)程集群節(jié)點(diǎn)上運(yùn)行的Spark操作(比如map和reduce)時,Spark會對涉及到的變量的所有副本執(zhí)行這個函數(shù)。這些變量會被復(fù)制到每個機(jī)器上,而且這個過程不會被反饋給驅(qū)動程序。通常情況下,在任務(wù)之間讀寫共享變量是很低效的。但是,Spark仍然提供了有限的兩種共享變量類型用于常見的使用場景:廣播變量和累加器。
1、廣播變量
廣播變量允許程序員在每臺機(jī)器上保持一個只讀變量的緩存而不是將一個變量的拷貝傳遞給各個任務(wù)。它們可以被使用,比如,給每一個節(jié)點(diǎn)傳遞一份大輸入數(shù)據(jù)集的拷貝是很低效的。Spark 試圖使用高效的廣播算法來分布廣播變量,以此來降低通信花銷。 可以通過 SparkContext.broadcast(v) 來從變量 v 創(chuàng)建一個廣播變量。這個廣播變量是 v 的一個包裝,同時它的值可以功過調(diào)用 value 方法來獲得。以下的代碼展示了這一點(diǎn):
broadcastVar = sc.broadcast([1, 2, 3])>>> broadcastVar.value [1, 2, 3]
在廣播變量被創(chuàng)建之后,在所有函數(shù)中都應(yīng)當(dāng)使用它來代替原來的變量v,這樣就可以保證v在節(jié)點(diǎn)之間只被傳遞一次。另外,v變量在被廣播之后不應(yīng)該再被修改了,這樣可以確保每一個節(jié)點(diǎn)上儲存的廣播變量的一致性(如果這個變量后來又被傳輸給一個新的節(jié)點(diǎn))。
2、累加器
累加器是在一個相關(guān)過程中只能被”累加”的變量,對這個變量的操作可以有效地被并行化。它們可以被用于實(shí)現(xiàn)計(jì)數(shù)器(就像在MapReduce過程中)或求和運(yùn)算。Spark原生支持對數(shù)字類型的累加器,程序員也可以為其他新的類型添加支持。累加器被以一個名字創(chuàng)建之后,會在Spark的UI中顯示出來。這有助于了解計(jì)算的累進(jìn)過程(注意:目前Python中不支持這個特性)。
可以通過SparkContext.accumulator(v)來從變量v創(chuàng)建一個累加器。在集群中運(yùn)行的任務(wù)隨后可以使用add方法或+=操作符(在Scala和Python中)來向這個累加器中累加值。但是,他們不能讀取累加器中的值。只有驅(qū)動程序可以讀取累加器中的值,通過累加器的value方法。
以下的代碼展示了向一個累加器中累加數(shù)組元素的過程:
accum = sc.accumulator(0) Accumulator>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value 10
這段代碼利用了累加器對 int 類型的內(nèi)建支持,程序員可以通過繼承 AccumulatorParam 類來創(chuàng)建自己想要的類型支持。AccumulatorParam 的接口提供了兩個方法:zero用于為你的數(shù)據(jù)類型提供零值;addInPlace 用于計(jì)算兩個值得和。比如,假設(shè)我們有一個 Vector類表示數(shù)學(xué)中的向量,我們可以這樣寫:
class VectorAccumulatorParam(AccumulatorParam): def zero(self, initialValue): return Vector.zeros(initialValue.size) def addInPlace(self, v1, v2): v1 += v2 return v1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
累加器的更新操作只會被運(yùn)行一次,Spark 提供了保證,每個任務(wù)中對累加器的更新操作都只會被運(yùn)行一次。比如,重啟一個任務(wù)不會再次更新累加器。在轉(zhuǎn)化過程中,用戶應(yīng)該留意每個任務(wù)的更新操作在任務(wù)或作業(yè)重新運(yùn)算時是否被執(zhí)行了超過一次。
累加器不會改變Spark 的惰性求值模型。如果累加器在對RDD的操作中被更新了,它們的值只會在啟動操作中作為 RDD 計(jì)算過程中的一部分被更新。所以,在一個懶惰的轉(zhuǎn)化操作中調(diào)用累加器的更新,并沒法保證會被及時運(yùn)行。 下面的代碼段展示了這一點(diǎn):
accum = sc.accumulator(0) data.map(lambda x => acc.add(x); f(x)) // 這里,accum任然是0,因?yàn)闆]有action算子,所以map也不會進(jìn)行實(shí)際的計(jì)算任務(wù)的提交以及Standalone集群模式的部署
參考官方文檔:http://spark.apache.org/docs/...
spark-submit
首先需要打包代碼,如果你的代碼需要依賴其他的包環(huán)境則需要多帶帶的打包這些依賴,應(yīng)為cluster會將所有依賴的jar包分發(fā)到各個節(jié)點(diǎn)上進(jìn)行使用。推薦的方法是將依賴包和程序都統(tǒng)一的打成一個包,這樣就可以直接使用spark-submit方法來運(yùn)行,具體的pom.xml配置如下:
org.apache.spark spark-core_2.10 2.11 provided mysql mysql-connector-java 5.1.13 org.apache.hadoop hadoop-client 2.6.0 provided junit junit 4.11 test org.apache.maven.plugins maven-compiler-plugin 2.3.2 1.7 1.7 maven-assembly-plugin 2.5.5 jar-with-dependencies make-assembly package single
spark && hadoop 的scope值都設(shè)置為provided
在服務(wù)器上提交的命令如下:
./bin/spark-submit --class--master --deploy-mode --conf = ... # other options [application-arguments]
spark-submit 可以加載一個配置文件,默認(rèn)是加載在conf/spark-defaults.conf
單元測試Spark對所有常見的單元測試框架提供友好的支持。你只需要在測試中創(chuàng)建一個SparkContext對象,然后吧master URL設(shè)為local,運(yùn)行測試操作,最后調(diào)用 SparkContext.stop() 來停止測試。注意,一定要在 finally 代碼塊或者單元測試框架的 tearDown方法里調(diào)用SparkContext.stop(),因?yàn)镾park不支持同一程序中有多個SparkContext對象同時運(yùn)行。
部署1、Spark Standalone Mode
除了運(yùn)行在Mesos和YARN集群之外,spark也提供了簡單的獨(dú)立部署模式??梢酝ㄟ^手動的啟動master和worker,也可以通過spark提供的啟動腳本來啟動。獨(dú)立部署也可以通過運(yùn)行在一個機(jī)器上,進(jìn)行測試。
為了安裝你需要放置一個編譯好的spark版本到每個機(jī)器上。
啟動集群有兩種方式,一種是手動啟動,另一種是通過啟動腳本啟動。
1.1、手動啟動spark集群
啟動一個獨(dú)立的master可以使用如下的命令:
./sbin/start-master.sh
一旦啟動可以通過訪問:http://localhost:8080端口訪問master
可以使用如下的命令來使worker節(jié)點(diǎn)連接到master上:
./sbin/start-slave.sh
worker在加入到master后可以訪問master的http://localhost:8080,可以看到被加入的worker節(jié)點(diǎn)的信息。
在啟動master和worker的時候可以帶上參數(shù)進(jìn)行設(shè)置,參數(shù)的列表如下:其中比較重要的是:
-c CORES, 這個是指定多少個cpu分配給spark使用,默認(rèn)是全部cpu
-m MEM,這個是指定多少的內(nèi)存分配給spark使用,默認(rèn)是全部的內(nèi)存的減去1g的操作系統(tǒng)內(nèi)存全部分配給spark使用。一般的格式是1000M or 2G
-d DIR, 這個指定spark任務(wù)的日志輸出目錄。
–properties-file FILE 指定spark指定加載的配置文件的路徑默認(rèn)是: conf/spark-defaults.conf
1.2、腳本方式部署
通過spark的部署腳本部署首先需要在spark的主目錄下創(chuàng)建一個conf/slaves的文件,這個文件中每一行代表一個worker的hostname.需要注意的是,master訪問worker節(jié)點(diǎn)是通過SSH訪問的,所以需要master通過ssh無密碼的登錄到worker,否則需要設(shè)置一個 SPARK_SSH_FOREGROUND的環(huán)境變量,這個變量的值就是每個worker的密碼
然后可以通過spark安裝目錄下的sbin/….sh文件進(jìn)行啟動, 如果要啟動和停止master和slave可以使用:
sbin/start-all.sh
sbin/stop-all.sh
注意的是這些腳本必須是在master機(jī)器上執(zhí)行
同時你可以通過配置集群的 conf/spark-env.sh文件來進(jìn)一步配置集群的環(huán)境。但是這也文件需要通過拷貝conf/spark-env.sh.template文件來創(chuàng)建,并且需要把這個文件拷貝到所有的worker節(jié)點(diǎn)上。
其中: SPARK_MASTER_OPTS && SPARK_WORKER_OPTS 兩個配置項(xiàng)比較復(fù)雜。
通過在SparkContext構(gòu)造器中傳入spark://IP:PORT這個來啟用這個集群。同時可以在交互式的方式啟動腳本中使用:./bin/spark-shell –master spark://IP:PORT 來啟動集群執(zhí)行。
獨(dú)立部署模式的集群現(xiàn)在只是簡單的支持FIFO調(diào)度。 為了允許多個并發(fā)用戶,可以通過SparkConf設(shè)置每個應(yīng)用程序需要的資源的最大數(shù)。默認(rèn)情況下,它會請求使用集群的全部的核,而這只是同時運(yùn)行一個應(yīng)用程序才回有意義。
val conf = new SparkConf() .setMaster(...) .setAppName(...) .set("spark.cores.max", "10") val sc = new SparkContext(conf)
除了可以在程序中指定你也可以在spark-env.sh中設(shè)置默認(rèn)的值,export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=
2、spark的高可用設(shè)置
spark的高可用設(shè)置有兩種,一種是通過Zookeeper來實(shí)現(xiàn),另一種是通過本地文件系統(tǒng)來實(shí)現(xiàn)。
2.1、使用ZooKeeper備份master,利用zookeeper提供的領(lǐng)導(dǎo)選舉和狀態(tài)保存,你可以讓更多的master連接到zookeepre實(shí)例。一個將會被選舉為leader其他的則會保存?zhèn)浞菟臓顟B(tài)。如果master死掉,zookeeper可以選舉一個新的leader,整個過程需要1到2分鐘的時間,但是這個過程只會對新的任務(wù)調(diào)度有影響。為了使用這種方式需要的配置項(xiàng)為:SPARK_DAEMON_JAVA_OPTS,這個配置項(xiàng)有三個配置信息:spark.deploy.recoveryMode/spark.deploy.zookeeper.url/spark.deploy.zookeeper.dir
2.2、使用本地文件系統(tǒng)來恢復(fù)該節(jié)點(diǎn)。為了使用這種方式需要的配置項(xiàng)為:SPARK_DAEMON_JAVA_OPTS,這個配置項(xiàng)有兩個配置信息:spark.deploy.recoveryMode、spark.deploy.recoveryDirectory
Spark架構(gòu)與原理
Spark架構(gòu)采用了分布式計(jì)算中的Master-Slave模型。Master是對應(yīng)集群中的含有Master進(jìn)程的節(jié)點(diǎn),Slave是集群中含有Worker進(jìn)程的節(jié)點(diǎn)。Master作為整個集群的控制器,負(fù)責(zé)整個集群的正常運(yùn)行;Worker相當(dāng)于是計(jì)算節(jié)點(diǎn),接收主節(jié)點(diǎn)命令與進(jìn)行狀態(tài)匯報;Executor負(fù)責(zé)任務(wù)的執(zhí)行;Cluster作為用戶的客戶端負(fù)責(zé)提交應(yīng)用,Driver負(fù)責(zé)控制一個應(yīng)用的執(zhí)行。
Spark集群部署后,需要在主節(jié)點(diǎn)和從節(jié)點(diǎn)分別啟動Master進(jìn)程和Woker進(jìn)程,對整個集群進(jìn)行控制。在一個Spark應(yīng)用的執(zhí)行過程中,Driver和Worker是兩個重要角色。Driver程序是應(yīng)用邏輯執(zhí)行的起點(diǎn),負(fù)責(zé)作業(yè)的調(diào)度,即Task任務(wù)的分發(fā),而多個Worker用來管理計(jì)算節(jié)點(diǎn)和創(chuàng)建Executor并行處理任務(wù)。在執(zhí)行階段,Driver會將Task和Task所依賴的
file和jar序列化后傳遞給對應(yīng)的Worker機(jī)器,同時Exucutor對相應(yīng)數(shù)據(jù)分區(qū)的任務(wù)進(jìn)行處理。
下面詳細(xì)介紹Spark的架構(gòu)中的基本組件。
ClusterManager:在Standalone模式中即為Master(主節(jié)點(diǎn)),控制整個集群,監(jiān)控Worker。在YARN模式中為資源管理器。
Worker:從節(jié)點(diǎn),負(fù)責(zé)控制計(jì)算節(jié)點(diǎn),啟動Executor或Driver。在YARN模式中為NodeManager,負(fù)責(zé)計(jì)算節(jié)點(diǎn)的控制。
Spark整體流程為:Client提交應(yīng)用,Master找到一個Worker啟動Driver,Driver向Master或者資源管理器申請資源,之后將應(yīng)用轉(zhuǎn)化為RDD Graph,再由DAGScheduler將RDD Graph轉(zhuǎn)化為Stage的有向無環(huán)圖提交給TaskScheduler,由TaskScheduler提交任務(wù)給Executor執(zhí)行。在任務(wù)執(zhí)行過程中,其他組件協(xié)同工作,確保整個應(yīng)用順利進(jìn)行。
Application:應(yīng)用??梢哉J(rèn)為是多次批量計(jì)算組合起來的過程,在物理上可以表現(xiàn)為你寫的程序包+部署配置。應(yīng)用的概念類似于計(jì)算機(jī)中的程序,它只是一個藍(lán)本,尚沒有運(yùn)行起來。
RDD:Resilient Distributed Datasets,彈性分布式數(shù)據(jù)集。RDD即是計(jì)算模型里的一個概念,也是你編程時用到的一種類。一個RDD可以認(rèn)為是spark在執(zhí)行分布式計(jì)算時的 一批相同來源、相同結(jié)構(gòu)、相同用途的數(shù)據(jù)集,這個數(shù)據(jù)集可能被切割成多個分區(qū),分布在不同的機(jī)器上,無論如何,這個數(shù)據(jù)集被稱為一個RDD。在編程 時,RDD對象就對應(yīng)了這個數(shù)據(jù)集,并且RDD對象被當(dāng)作一個數(shù)據(jù)操作的基本單位。比如,對某個RDD對象進(jìn)行map操作,其實(shí)就相當(dāng)于將數(shù)據(jù)集中的每個 分區(qū)的每一條數(shù)據(jù)進(jìn)行了map映射。
Partition:分區(qū)。一個RDD在物理上被切割成多個數(shù)據(jù)子集,分布在不同的機(jī)器上。每個數(shù)據(jù)子集叫一個分區(qū)。
RDD Graph:RDD組成的DAG(有向無環(huán)圖)。RDD是不可變的,一個RDD經(jīng)過某種操作后,會生成一個新的RDD。這樣說來,一個 Application中的程序,其內(nèi)容基本上都是對各種RDD的操作,從源RDD,經(jīng)過各種計(jì)算,產(chǎn)生中間RDD,最后生成你想要的RDD并輸出。這個 過程中的各個RDD,會構(gòu)成一個有向無環(huán)圖。
Lineage:血統(tǒng)。RDD這個概念本身包含了這種信息“由哪個父類RDD經(jīng)過哪種操作得到”。所以某個RDD可以通過不斷尋找父類,找到最原始的那個RDD。這條繼承路徑就認(rèn)為是RDD的血統(tǒng)。
Job:從Application和RDD Graph的概念可以知道,一個應(yīng)用往往對應(yīng)了一個RDD Graph。這個應(yīng)用在準(zhǔn)備被spark集群運(yùn)行前,實(shí)際上就是會生成一個或多個RDD Graph結(jié)構(gòu),而一個RDD Graph,又可以生成一個或多個Job。一個Job可以認(rèn)為就是會最終輸出一個結(jié)果RDD(后面會介紹,實(shí)際上這是action操作)的一條由RDD組 織而成的計(jì)算,在Application生成的RDD Graph上表現(xiàn)為一個子圖。Job在spark里應(yīng)用里也是一個被調(diào)度的單位。
寬依賴:RDD生成另一個RDD時,各個兩個父子RDD間分區(qū)的對應(yīng)關(guān)系,被叫做RDD間依賴。寬依賴就是子RDD的某個分區(qū),依賴父RDD的全部分區(qū)。
窄依賴:窄依賴就是子RDD的某個分區(qū),只依賴常數(shù)個父RDD的分區(qū)。寬窄依賴的區(qū)別如下圖所示。
Stage:Stage可以理解為完成一個Job的不同階段。一個Job被劃分為多個Stage,每個Stage又包含了對多個RDD的多個操作。一個Stage里,一般包含了一個寬依賴操作,或者多個窄依賴操作。
窄依賴是指前一個rdd計(jì)算能出一個唯一的rdd,比如map或者filter等;寬依賴則是指多個rdd生成一個或者多個rdd的操作,比如groupbykey reducebykey等,這種寬依賴通常會進(jìn)行shuffle。
算子:父子RDD間的某種操作,被叫某種算子。比如下面會介紹的map,filter,groupByKey等。算子可從多個維度分類,之后再介紹。
Task:一個分區(qū)對應(yīng)一個Task。實(shí)際上一個Task就是在一個Stage范圍內(nèi),某個Executor所要執(zhí)行的算子。
TaskSet:一個Stage范圍內(nèi),所有相同的Task被稱為一個TaskSet。
DAGScheduler:DAGScheduler用于根據(jù)RDD DAG切分Stage,并維護(hù)各個Stage的先后依賴關(guān)系,相當(dāng)于完成了一個Job內(nèi)的不同Stage間的調(diào)度策略。
TasksetManager:管理一個TaskSet,并決定了這個TaskSet中各個Task的分發(fā)策略。
TaskScheduler:執(zhí)行實(shí)際的Task分發(fā)操作。
SparkUI、History Server:SparkUI: 4044
History Server:18080
怎么看?http://www.cnblogs.com/xing90...
http://blog.csdn.net/qq_26562...
http://blog.csdn.net/suzyu123...
http://www.cnblogs.com/helloc...
http://blog.csdn.net/suzyu123...
http://www.jianshu.com/nb/340...
http://www.cnblogs.com/ainima...
http://www.chinahadoop.cn/gro...
https://yq.aliyun.com/article...
http://ifeve.com/category/spa...
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67244.html
摘要:如果你想查看運(yùn)行時模塊的加載過程輸出結(jié)果表示為模塊,由于我限制了不再往下輸出了,而我們模塊又沒有別的額外依賴,所以僅有這行輸出。 jdk9模塊快速入門 列出自帶模塊:java --list-modulesmac多版本jdk共存:http://adolphor.com/blog/2016...模塊規(guī)則示意圖:showImg(https://segmentfault.com/img/bVb...
摘要:楚江數(shù)據(jù)是專業(yè)的互聯(lián)網(wǎng)數(shù)據(jù)技術(shù)服務(wù),現(xiàn)整理出零基礎(chǔ)如何學(xué)爬蟲技術(shù)以供學(xué)習(xí),。本文來源知乎作者路人甲鏈接楚江數(shù)據(jù)提供網(wǎng)站數(shù)據(jù)采集和爬蟲軟件定制開發(fā)服務(wù),服務(wù)范圍涵蓋社交網(wǎng)絡(luò)電子商務(wù)分類信息學(xué)術(shù)研究等。 楚江數(shù)據(jù)是專業(yè)的互聯(lián)網(wǎng)數(shù)據(jù)技術(shù)服務(wù),現(xiàn)整理出零基礎(chǔ)如何學(xué)爬蟲技術(shù)以供學(xué)習(xí),http://www.chujiangdata.com。 第一:Python爬蟲學(xué)習(xí)系列教程(來源于某博主:htt...
摘要:以下這些項(xiàng)目,你拿來學(xué)習(xí)學(xué)習(xí)練練手。當(dāng)你每個步驟都能做到很優(yōu)秀的時候,你應(yīng)該考慮如何組合這四個步驟,使你的爬蟲達(dá)到效率最高,也就是所謂的爬蟲策略問題,爬蟲策略學(xué)習(xí)不是一朝一夕的事情,建議多看看一些比較優(yōu)秀的爬蟲的設(shè)計(jì)方案,比如說。 (一)如何學(xué)習(xí)Python 學(xué)習(xí)Python大致可以分為以下幾個階段: 1.剛上手的時候肯定是先過一遍Python最基本的知識,比如說:變量、數(shù)據(jù)結(jié)構(gòu)、語法...
摘要:學(xué)習(xí)致謝一數(shù)據(jù)數(shù)據(jù)網(wǎng)站二需求針對用戶查詢?nèi)罩緮?shù)據(jù)中不同字段,使用讀取日志數(shù)據(jù),封裝到數(shù)據(jù)集中,調(diào)用函數(shù)和函數(shù)進(jìn)行處理不同業(yè)務(wù)統(tǒng)計(jì)分析三分詞工具測試使用比較流行好用的中文分區(qū)面向生產(chǎn)環(huán)境的自然語言處理工具包,是由一系列模 ...
閱讀 1559·2021-09-22 15:35
閱讀 2035·2021-09-14 18:04
閱讀 918·2019-08-30 15:55
閱讀 2480·2019-08-30 15:53
閱讀 2709·2019-08-30 12:45
閱讀 1228·2019-08-29 17:01
閱讀 2604·2019-08-29 15:30
閱讀 3535·2019-08-29 15:09