摘要:過程中,各個(gè)節(jié)點(diǎn)上的相同都會(huì)先寫入本地磁盤文件中,然后其他節(jié)點(diǎn)需要通過網(wǎng)絡(luò)傳輸拉取各個(gè)節(jié)點(diǎn)上的磁盤文件中的相同。因此在過程中,可能會(huì)發(fā)生大量的磁盤文件讀寫的操作,以及數(shù)據(jù)的網(wǎng)絡(luò)傳輸操作。
需要對名為“hello.txt”的HDFS文件進(jìn)行一次map操作,再進(jìn)行一次reduce操作。也就是說,需要對一份數(shù)據(jù)執(zhí)行兩次算子操作。
錯(cuò)誤的做法:
對于同一份數(shù)據(jù)執(zhí)行多次算子操作時(shí),創(chuàng)建多個(gè)RDD。//這里執(zhí)行了兩次textFile方法,針對同一個(gè)HDFS文件,創(chuàng)建了兩個(gè)RDD出來,然后分別對每個(gè)RDD都執(zhí)行了一個(gè)算子操作。
這種情況下,Spark需要從HDFS上兩次加載hello.txt文件的內(nèi)容,并創(chuàng)建兩個(gè)多帶帶的RDD;//第二次加載HDFS文件以及創(chuàng)建RDD的性能開銷,很明顯是白白浪費(fèi)掉的。
val rdd1 = sc.textFile("hdfs://master:9000/hello.txt")rdd1.map(...)val rdd2 = sc.textFile("hdfs://master:9000/hello.txt")rdd2.reduce(...)
正確的用法:
對于一份數(shù)據(jù)執(zhí)行多次算子操作時(shí),只使用一個(gè)RDD。
錯(cuò)誤的做法:
有一個(gè)
接著由于業(yè)務(wù)需要,對rdd1執(zhí)行了一個(gè)map操作,創(chuàng)建了一個(gè)rdd2,而rdd2中的數(shù)據(jù)僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。
JavaPairRDD rdd1 = ...JavaRDD rdd2 = rdd1.map(...)
分別對rdd1和rdd2執(zhí)行了不同的算子操作。
rdd1.reduceByKey(...)rdd2.map(...)
正確的做法:
rdd2的數(shù)據(jù)完全就是rdd1的子集而已,卻創(chuàng)建了兩個(gè)rdd,并對兩個(gè)rdd都執(zhí)行了一次算子操作。
此時(shí)會(huì)因?yàn)閷dd1執(zhí)行map算子來創(chuàng)建rdd2,而多執(zhí)行一次算子操作,進(jìn)而增加性能開銷。
其實(shí)在這種情況下完全可以復(fù)用同一個(gè)RDD。
我們可以使用rdd1,既做reduceByKey操作,也做map操作。
JavaPairRDD rdd1 = ...rdd1.reduceByKey(...)rdd1.map(tuple._2...)
正確的做法:
cache()方法表示:使用非序列化的方式將RDD中的數(shù)據(jù)全部嘗試持久化到內(nèi)存中。
此時(shí)再對rdd1執(zhí)行兩次算子操作時(shí),只有在第一次執(zhí)行map算子時(shí),才會(huì)將這個(gè)rdd1從源頭處計(jì)算一次。
第二次執(zhí)行reduce算子時(shí),就會(huì)直接從內(nèi)存中提取數(shù)據(jù)進(jìn)行計(jì)算,不會(huì)重復(fù)計(jì)算一個(gè)rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()rdd1.map(...)rdd1.reduce(...)
序列化的方式可以減少持久化的數(shù)據(jù)對內(nèi)存/磁盤的占用量,進(jìn)而避免內(nèi)存被持久化數(shù)據(jù)占用過多,從而發(fā)生頻繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") .persist(StorageLevel.MEMORY_AND_DISK_SER)rdd1.map(...)rdd1.reduce(...)
注意:通常不建議使用DISK_ONLY和后綴為_2的級別:因?yàn)橥耆诖疟P文件進(jìn)行數(shù)據(jù)的讀寫,會(huì)導(dǎo)致性能急劇降低,導(dǎo)致網(wǎng)絡(luò)較大開銷
如果有可能的話,要盡量避免使用shuffle類算子,最消耗性能的地方就是shuffle過程。
shuffle過程中,各個(gè)節(jié)點(diǎn)上的相同key都會(huì)先寫入本地磁盤文件中,然后其他節(jié)點(diǎn)需要通過網(wǎng)絡(luò)傳輸拉取各個(gè)節(jié)點(diǎn)上的磁盤文件中的相同key。而且相同key都拉取到同一個(gè)節(jié)點(diǎn)進(jìn)行聚合操作時(shí),還有可能會(huì)因?yàn)橐粋€(gè)節(jié)點(diǎn)上處理的key過多,導(dǎo)致內(nèi)存不夠存放,進(jìn)而溢寫到磁盤文件中。因此在shuffle過程中,可能會(huì)發(fā)生大量的磁盤文件讀寫的IO操作,以及數(shù)據(jù)的網(wǎng)絡(luò)傳輸操作。磁盤IO和網(wǎng)絡(luò)數(shù)據(jù)傳輸也是shuffle性能較差的主要原因。
盡可能避免使用reduceByKey、join、distinct、repartition等會(huì)進(jìn)行shuffle的算子,盡量使用map類的非shuffle算子。
傳統(tǒng)的join操作會(huì)導(dǎo)致shuffle操作。
因?yàn)閮蓚€(gè)RDD中,相同的key都需要通過網(wǎng)絡(luò)拉取到一個(gè)節(jié)點(diǎn)上,由一個(gè)task進(jìn)行join操作。
val rdd3 = rdd1.join(rdd2)
Broadcast+map的join操作,不會(huì)導(dǎo)致shuffle操作。
使用Broadcast將一個(gè)數(shù)據(jù)量較小的RDD作為廣播變量。
val rdd2Data = rdd2.collect()val rdd2DataBroadcast = sc.broadcast(rdd2Data)val rdd3 = rdd1.map(rdd2DataBroadcast...)
注意:以上操作,建議僅僅在rdd2的數(shù)據(jù)量比較少(比如幾百M(fèi),或者一兩G)的情況下使用。因?yàn)槊總€(gè)Executor的內(nèi)存中,都會(huì)駐留一份rdd2的全量數(shù)據(jù)。
如果因?yàn)闃I(yè)務(wù)需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預(yù)聚合的算子,類似于MapReduce中的本地combiner。map-side預(yù)聚合之后,每個(gè)節(jié)點(diǎn)本地就只會(huì)有一條相同的key,因?yàn)槎鄺l相同的key都被聚合起來了。其他節(jié)點(diǎn)在拉取所有節(jié)點(diǎn)上的相同key時(shí),就會(huì)大大減少需要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤IO以及網(wǎng)絡(luò)傳輸開銷。
建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子
使用reduceByKey/aggregateByKey替代groupByKey : map-side
使用mapPartitions替代普通map : 函數(shù)執(zhí)行頻率
使用foreachPartitions替代foreach : 函數(shù)執(zhí)行頻率
使用filter之后進(jìn)行coalesce操作 : filter后對分區(qū)進(jìn)行壓縮
使用repartitionAndSortWithinPartitions替代repartition與sort類操作
repartitionAndSortWithinPartitions是Spark官網(wǎng)推薦的一個(gè)算子,官方建議,如果需要在repartition重分區(qū)之后,還要進(jìn)行排序,建議直接使用repartitionAndSortWithinPartitions算子
有時(shí)在開發(fā)過程中,會(huì)遇到需要在算子函數(shù)中使用外部變量的場景(尤其是大變量,比如100M以上的大集合),那么此時(shí)就應(yīng)該使用Spark的廣播(Broadcast)功能來提升性能。
默認(rèn)情況下,Spark會(huì)將該變量復(fù)制多個(gè)副本,通過網(wǎng)絡(luò)傳輸?shù)絫ask中,此時(shí)每個(gè)task都有一個(gè)變量副本。如果變量本身比較大的話(比如100M,甚至1G),那么大量的變量副本在網(wǎng)絡(luò)中傳輸?shù)男阅荛_銷,以及在各個(gè)節(jié)點(diǎn)的Executor中占用過多內(nèi)存導(dǎo)致的頻繁GC,都會(huì)極大地影響性能。
廣播后的變量,會(huì)保證每個(gè)Executor的內(nèi)存中,只駐留一份變量副本,而Executor中的task執(zhí)行時(shí)共享該Executor中的那份變量副本。
1)在算子函數(shù)中使用到外部變量時(shí),該變量會(huì)被序列化后進(jìn)行網(wǎng)絡(luò)傳輸。
2)將自定義的類型作為RDD的泛型類型時(shí)(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會(huì)進(jìn)行序列化。因此這種情況下,也要求自定義的類必須實(shí)現(xiàn)Serializable接口。
3)使用可序列化的持久化策略時(shí)(比如MEMORY_ONLY_SER),Spark會(huì)將RDD中的每個(gè)partition都序列化成一個(gè)大的字節(jié)數(shù)組。
Spark默認(rèn)使用的是Java的序列化機(jī)制,你可以使用Kryo作為序列化類庫,效率要比Java的序列化機(jī)制要高
// 創(chuàng)建SparkConf對象。val conf = new SparkConf().setMaster(...).setAppName(...)// 設(shè)置序列化器為KryoSerializer。conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注冊要序列化的自定義類型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
當(dāng)遇到userData和events進(jìn)行join時(shí),userData比較大,而且join操作比較頻繁,這個(gè)時(shí)候,可以先將userData調(diào)用了 partitionBy()分區(qū),可以極大提高效率。
cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()等都能夠受益
總結(jié):如果遇到一個(gè)RDD頻繁和其他RDD進(jìn)行Shuffle類操作,比如 cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()等,那么最好將該RDD通過partitionBy()操作進(jìn)行預(yù)分區(qū),這些操作在Shuffle過程中會(huì)減少Shuffle的數(shù)據(jù)量
Java中,有三種類型比較耗費(fèi)內(nèi)存:
1)對象,每個(gè)Java對象都有對象頭、引用等額外的信息,因此比較占用內(nèi)存空間。
2)字符串,每個(gè)字符串內(nèi)部都有一個(gè)字符數(shù)組以及長度等額外信息。
3)集合類型,比如HashMap、LinkedList等,因?yàn)榧项愋蛢?nèi)部通常會(huì)使用一些內(nèi)部類來封裝集合元素,比如Map.Entry
Spark官方建議,在Spark編碼實(shí)現(xiàn)中,特別是對于算子函數(shù)中的代碼,盡量不要使用上述三種數(shù)據(jù)結(jié)構(gòu),盡量使用字符串替代對象,使用原始類型(比如Int、Long)替代字符串,使用數(shù)組替代集合類型,這樣盡可能地減少內(nèi)存占用,從而降低GC頻率,提升性能。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/123982.html
摘要:正如我標(biāo)題所說,簡歷被拒。看了我簡歷之后說頭條競爭激烈,我背景不夠,點(diǎn)到為止。。三準(zhǔn)備面試其實(shí)從三月份投遞簡歷開始準(zhǔn)備面試到四月份收,也不過個(gè)月的時(shí)間,但這都是建立在我過去一年的積累啊。 本文是 無精瘋 同學(xué)投稿的面試經(jīng)歷 關(guān)注微信公眾號:進(jìn)擊的java程序員K,即可獲取最新BAT面試資料一份 在此感謝 無精瘋 同學(xué)的分享 目錄: 印象中的頭條 面試背景 準(zhǔn)備面試 ...
摘要:正如我標(biāo)題所說,簡歷被拒??戳宋液啔v之后說頭條競爭激烈,我背景不夠,點(diǎn)到為止。。三準(zhǔn)備面試其實(shí)從三月份投遞簡歷開始準(zhǔn)備面試到四月份收,也不過個(gè)月的時(shí)間,但這都是建立在我過去一年的積累啊。 本文是 無精瘋 同學(xué)投稿的面試經(jīng)歷 關(guān)注微信公眾號:進(jìn)擊的java程序員K,即可獲取最新BAT面試資料一份 在此感謝 無精瘋 同學(xué)的分享目錄:印象中的頭條面試背景準(zhǔn)備面試頭條一面(Java+項(xiàng)目)頭條...
摘要:創(chuàng)新萌芽期望最頂點(diǎn)下調(diào)預(yù)期至低點(diǎn)回歸理想生產(chǎn)率平臺。而大數(shù)據(jù)已從頂峰滑落,和云計(jì)算接近谷底。對于迅速成長的中國市場,大公司也意味著大數(shù)據(jù)。三家對大數(shù)據(jù)的投入都是不惜余力的。 非商業(yè)轉(zhuǎn)載請注明作譯者、出處,并保留本文的原始鏈接:http://www.ituring.com.cn/article/177529 董飛,Coursera數(shù)據(jù)工程師。曾先后在創(chuàng)業(yè)公司酷迅,百度基礎(chǔ)架構(gòu)組...
閱讀 2040·2021-11-19 11:37
閱讀 729·2021-11-11 16:54
閱讀 1179·2021-11-02 14:44
閱讀 3078·2021-09-02 15:40
閱讀 2383·2019-08-30 15:44
閱讀 970·2019-08-29 11:17
閱讀 1073·2019-08-26 14:06
閱讀 1567·2019-08-26 13:47