摘要:摘要是一個項(xiàng)目,它被標(biāo)榜為快如閃電的集群計算。它擁有一個繁榮的開源社區(qū),并且是目前最活躍的項(xiàng)目。提供了一個更快更通用的數(shù)據(jù)處理平臺。更多經(jīng)典案例介紹期待下回分解。。。
摘要
Spark是一個Apache項(xiàng)目,它被標(biāo)榜為“快如閃電的集群計算”。它擁有一個繁榮的開源社區(qū),并且是目前最活躍的Apache項(xiàng)目。Spark提供了一個更快、更通用的數(shù)據(jù)處理平臺。和Hadoop相比,Spark可以讓你的程序在內(nèi)存中運(yùn)行時速度提升100倍,或者在磁盤上運(yùn)行時速度提升10倍。同時spark也讓傳統(tǒng)的map reduce job開發(fā)變得更加簡單快捷。本文將簡單介紹幾個經(jīng)典hadoop的mr按理用spark實(shí)現(xiàn),來讓大家熟悉spark的開發(fā)。
最大值最小值求最大值最小值一直是Hadoop的經(jīng)典案例,我們用Spark來實(shí)現(xiàn)一下,借此感受一下spark中mr的思想和實(shí)現(xiàn)方式。話不多說直接上code:
@Test def testMaxMin: Unit = { val sconf = new SparkConf().setAppName("test") val sc = new SparkContext(sconf) //初始化測試數(shù)據(jù) val data = sc.parallelize(Array(10,7,3,4,5,6,7,8,1001,6,2)) //方法一 val res = data.map(x => ("key", x)).groupByKey().map(x => { var min = Integer.MAX_VALUE var max = Integer.MIN_VALUE for(num <- x._2){ if(num>max){ max = num } if(num{ println("max "+x._1) println("min "+x._2) }) //方法二,下面用一個比較雞賊的方式求最大最小值 val max = data.reduce((a,b) => Math.max(a,b)) val min = data.reduce((a,b) => Math.min(a,b)) println("max : " + max) println("min : " + min) sc.stop }
預(yù)期結(jié)果:
max: 1001 min: 2
思路和hadoop中的mr類似,設(shè)定一個key,value為需要求最大與最小值的集合,然后再groupBykey聚合在一起處理。第二個方法就更簡單,性能也更好。
平均值問題求每個key對應(yīng)的平均值是常見的案例,在spark中處理類似問題常常會用到combineByKey這個函數(shù),詳細(xì)介紹請google一下用法,下面看代碼:
@Test def testAvg(): Unit ={ val sconf = new SparkConf().setAppName("test") val sc = new SparkContext(sconf) //初始化測試數(shù)據(jù) val foo = sc.parallelize(List(Tuple2("a", 1), Tuple2("a", 3), Tuple2("b", 2), Tuple2("b", 8))); //這里需要用到combineByKey這個函數(shù),需要了解的請google val results=foo.combineByKey( (v)=>(v,1), (acc:(Int,Int),v) =>(acc._1+v,acc._2+1), (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2) ).map{case(key,value)=>(key,value._1/value._2.toDouble)} results.collect().foreach(println) }
我們讓每個partiton先求出單個partition內(nèi)各個key對應(yīng)的所有整數(shù)的和 sum以及個數(shù) count,然后返回一個pair(sum, count)在shuffle后累加各個key對應(yīng)的所有sum和count,再相除得到均值.
TopN問題Top n問題同樣也是hadoop種體現(xiàn)mr思想的經(jīng)典案例,那么在spark中如何方便快捷的解決呢:
@Test def testTopN(): Unit ={ val sconf = new SparkConf().setAppName("test") val sc = new SparkContext(sconf) //初始話測試數(shù)據(jù) val foo = sc.parallelize(Array( ("a", 1), ("a", 2), ("a", 3), ("b", 3), ("b", 1), ("a", 4), ("b", 4), ("b", 2) )) //這里測試,取top 2。 val groupsSort=foo.groupByKey().map(tu=>{ val key=tu._1 val values=tu._2 val sortValues=values.toList.sortWith(_>_).take(2) (key,sortValues) }) //轉(zhuǎn)換格式進(jìn)行print val flattenedTopNPerGroup = groupsSort.flatMap({case (key, numbers) => numbers.map(key -> _)}) flattenedTopNPerGroup.foreach((value: Any) => { println(value) }) sc.stop }
思路很簡單,把數(shù)據(jù)groupBykey以后按key形成分組然后取每個分組最大的2個。預(yù)期結(jié)果:
(a,4) (a,3) (b,4) (b,3)
以上簡單介紹了一下hadoop中常見的3個案例在spark中的實(shí)現(xiàn)。如果讀者們已經(jīng)接觸過或者寫過一些hadoop的mapreduce job,那么會不會覺得在spark中寫起來方便快捷很多呢。
更多spark經(jīng)典案例介紹期待下回分解。。。
作者信息
MaxLeap團(tuán)隊_數(shù)據(jù)分析組 成員:譚楊【原創(chuàng)】
首發(fā)自:https://blog.maxleap.cn/archi...
作者往期佳作
淺析時間序列數(shù)據(jù)
淺析Apache Spark Caching和Checkpointing
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/66285.html
摘要:創(chuàng)新萌芽期望最頂點(diǎn)下調(diào)預(yù)期至低點(diǎn)回歸理想生產(chǎn)率平臺。而大數(shù)據(jù)已從頂峰滑落,和云計算接近谷底。對于迅速成長的中國市場,大公司也意味著大數(shù)據(jù)。三家對大數(shù)據(jù)的投入都是不惜余力的。 非商業(yè)轉(zhuǎn)載請注明作譯者、出處,并保留本文的原始鏈接:http://www.ituring.com.cn/article/177529 董飛,Coursera數(shù)據(jù)工程師。曾先后在創(chuàng)業(yè)公司酷迅,百度基礎(chǔ)架構(gòu)組...
閱讀 1886·2021-11-15 11:39
閱讀 1091·2020-12-03 17:06
閱讀 746·2019-12-27 11:42
閱讀 3278·2019-08-30 13:59
閱讀 1474·2019-08-26 13:22
閱讀 3291·2019-08-26 12:15
閱讀 2480·2019-08-26 10:22
閱讀 1570·2019-08-23 18:40