摘要:已更新至,歸管了,因此也相應(yīng)統(tǒng)一。本文不再適用及以上版本。字段類型會非常非常奇葩。。。。但是如果體積過于龐大,很容易導(dǎo)致特別是我們一般不會給配置過高的內(nèi)存。第二個,是函數(shù)的返回值。對于而言,我們可以直接使用,來得到這個什么都沒有的東西。
Spark已更新至2.x,DataFrame歸DataSet管了,因此API也相應(yīng)統(tǒng)一。本文不再適用2.0.0及以上版本。
DataFrame原生支持直接輸出到JDBC,但如果目標(biāo)表有自增字段(比如id),那么DataFrame就不能直接進(jìn)行寫入了。因為DataFrame.write().jdbc()要求DataFrame的schema與目標(biāo)表的表結(jié)構(gòu)必須完全一致(甚至字段順序都要一致),否則會拋異常,當(dāng)然,如果你SaveMode選擇了Overwrite,那么Spark刪除你原有的表,然后根據(jù)DataFrame的Schema生成一個。。。。字段類型會非常非常奇葩。。。。
于是我們只能通過DataFrame.collect(),把整個DataFrame轉(zhuǎn)成List
翻看Spark的JDBC源碼,發(fā)現(xiàn)實際上是通過foreachPartition方法,在DataFrame每一個分區(qū)中,對每個Row的數(shù)據(jù)進(jìn)行JDBC插入,那么為什么我們就不能直接用呢?
Spark JdbcUtils.scala部分源碼:
def saveTable(df: DataFrame,url: String,table: String,properties: Properties = new Properties()) { val dialect = JdbcDialects.get(url) val nullTypes: Array[Int] = df.schema.fields.map { field => dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse( field.dataType match { case IntegerType => java.sql.Types.INTEGER case LongType => java.sql.Types.BIGINT case DoubleType => java.sql.Types.DOUBLE case FloatType => java.sql.Types.REAL case ShortType => java.sql.Types.INTEGER case ByteType => java.sql.Types.INTEGER case BooleanType => java.sql.Types.BIT case StringType => java.sql.Types.CLOB case BinaryType => java.sql.Types.BLOB case TimestampType => java.sql.Types.TIMESTAMP case DateType => java.sql.Types.DATE case t: DecimalType => java.sql.Types.DECIMAL case _ => throw new IllegalArgumentException( s"Can"t translate null value for field $field") }) } val rddSchema = df.schema val driver: String = DriverRegistry.getDriverClassName(url) val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties) // ****************** here ****************** df.foreachPartition { iterator => savePartition(getConnection, table, iterator, rddSchema, nullTypes) } }
嗯。。。既然Scala能實現(xiàn),那么作為他的爸爸,Java也應(yīng)該能玩!
我們看看foreachPartition的方法原型:
def foreachPartition(f: Iterator[Row] => Unit)
又是函數(shù)式語言最愛的匿名函數(shù)。。。非常討厭寫lambda,所以我們還是實現(xiàn)個匿名類吧。要實現(xiàn)的抽象類為:
scala.runtime.AbstractFunction1
來玩耍一下吧!
df.foreachPartition(new AbstractFunction1, BoxedUnit>() { @Override public BoxedUnit apply(Iterator it) { while (it.hasNext()){ System.out.println(it.next().toString()); } return BoxedUnit.UNIT; } });
嗯,maven complete一下,spark-submit看看~
好勒~拋異常了
org.apache.spark.SparkException: Task not serializable
Task不能被序列化
嗯哼,想想之前實現(xiàn)UDF的時候,UDF1/2/3/4...各接口,都extends Serializable,也就是說,在Spark運行期間,Driver會把UDF接口實現(xiàn)類序列化,并在Executor中反序列化,執(zhí)行call方法。。。這就不難理解了,我們foreachPartition丟進(jìn)去的類,也應(yīng)該implements Serializable。這樣,我們就得自己搞一個繼承AbstractFunction1
import org.apache.spark.sql.Row; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; import java.io.Serializable; public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable { }
可是每次都要return BoxedUnit.UNIT 搞得太別扭了,沒一點Java的風(fēng)格。
import org.apache.spark.sql.Row; import scala.collection.Iterator; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; import java.io.Serializable; public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable { @Override public BoxedUnit apply(Iterator it) { call(it); return BoxedUnit.UNIT; } public abstract void call(Iterator
it); }
于是我們可以直接Override call方法,就可以用滿滿Java Style的代碼去玩耍了!
df.foreachPartition(new JavaForeachPartitionFunc() { @Override public void call(Iteratorit) { while (it.hasNext()){ System.out.println(it.next().toString()); } } });
注意!我們實現(xiàn)的匿名類的方法,實際上是在executor上執(zhí)行的,所以println是輸出到executor機(jī)器的stdout上。這個我們可以通過Spark的web ui,點擊具體Application的Executor頁面去查看(調(diào)試用的虛擬機(jī)集群,手扶拖拉機(jī)一樣的配置,別吐槽了~)
至于foreach方法同理。只不過把Iterator
have fun~
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/65979.html
摘要:是中處理結(jié)構(gòu)化數(shù)據(jù)的模塊??梢詮暮芏鄶?shù)據(jù)源加載數(shù)據(jù)并構(gòu)造得到,如結(jié)構(gòu)化數(shù)據(jù)文件,中的表,外部數(shù)據(jù)庫,或者已有的。使用反射機(jī)制,推導(dǎo)包含指定類型對象的。這一功能應(yīng)該優(yōu)先于使用。隨后,將會掃描必要的列,并自動調(diào)整壓縮比例,以減少內(nèi)存占用和壓力。 Spark SQL是Spark中處理結(jié)構(gòu)化數(shù)據(jù)的模塊。與基礎(chǔ)的Spark RDD API不同,Spark SQL的接口提供了更多關(guān)于數(shù)據(jù)的結(jié)構(gòu)信息...
摘要:本文發(fā)于我的個人博客知識點大全與實戰(zhàn)我正在大數(shù)據(jù)技術(shù)派和朋友們討論有趣的話題,你也來加入吧概述什么是是用于結(jié)構(gòu)化數(shù)據(jù)處理的模塊。是最新的查詢起始點,實質(zhì)上是和的組合,所以在和上可用的在上同樣是可以使用的。 關(guān)注公眾號:大數(shù)據(jù)技術(shù)派,回復(fù)資料,領(lǐng)取1000G資料。本文發(fā)于我的個人博客:Spark SQL知識點大全...
摘要:是最新的查詢起始點,實質(zhì)上是和的組合,所以在和上可用的在上同樣是可以使用的。轉(zhuǎn)換為轉(zhuǎn)換為其實就是對的封裝,所以可以直接獲取內(nèi)部的注意此時得到的存儲類型為是具有強(qiáng)類型的數(shù)據(jù)集合,需要提供對應(yīng)的類型信息。Spark SQL概述1、什么是Spark SQLSpark SQL是Spark用于結(jié)構(gòu)化數(shù)據(jù)(structured data)處理的Spark模塊。與基本的Spark RDD API不同,Sp...
閱讀 1168·2021-11-24 09:38
閱讀 3613·2021-11-22 15:32
閱讀 3465·2019-08-30 15:54
閱讀 2574·2019-08-30 15:53
閱讀 1503·2019-08-30 15:52
閱讀 2554·2019-08-30 13:15
閱讀 1846·2019-08-29 12:21
閱讀 1405·2019-08-26 18:36