成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Spark Java使用DataFrame的foreach/foreachPartition

Jrain / 2360人閱讀

摘要:已更新至,歸管了,因此也相應(yīng)統(tǒng)一。本文不再適用及以上版本。字段類型會非常非常奇葩。。。。但是如果體積過于龐大,很容易導(dǎo)致特別是我們一般不會給配置過高的內(nèi)存。第二個,是函數(shù)的返回值。對于而言,我們可以直接使用,來得到這個什么都沒有的東西。

Spark已更新至2.x,DataFrame歸DataSet管了,因此API也相應(yīng)統(tǒng)一。本文不再適用2.0.0及以上版本。

DataFrame原生支持直接輸出到JDBC,但如果目標(biāo)表有自增字段(比如id),那么DataFrame就不能直接進(jìn)行寫入了。因為DataFrame.write().jdbc()要求DataFrame的schema與目標(biāo)表的表結(jié)構(gòu)必須完全一致(甚至字段順序都要一致),否則會拋異常,當(dāng)然,如果你SaveMode選擇了Overwrite,那么Spark刪除你原有的表,然后根據(jù)DataFrame的Schema生成一個。。。。字段類型會非常非常奇葩。。。。
于是我們只能通過DataFrame.collect(),把整個DataFrame轉(zhuǎn)成List到Driver上,然后通過原生的JDBC方法進(jìn)行寫入。但是如果DataFrame體積過于龐大,很容易導(dǎo)致Driver OOM(特別是我們一般不會給Driver配置過高的內(nèi)存)。這個問題真的很讓人糾結(jié)。
翻看Spark的JDBC源碼,發(fā)現(xiàn)實際上是通過foreachPartition方法,在DataFrame每一個分區(qū)中,對每個Row的數(shù)據(jù)進(jìn)行JDBC插入,那么為什么我們就不能直接用呢?

Spark JdbcUtils.scala部分源碼:

  def saveTable(df: DataFrame,url: String,table: String,properties: Properties = new Properties()) {
    val dialect = JdbcDialects.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
        field.dataType match {
          case IntegerType => java.sql.Types.INTEGER
          case LongType => java.sql.Types.BIGINT
          case DoubleType => java.sql.Types.DOUBLE
          case FloatType => java.sql.Types.REAL
          case ShortType => java.sql.Types.INTEGER
          case ByteType => java.sql.Types.INTEGER
          case BooleanType => java.sql.Types.BIT
          case StringType => java.sql.Types.CLOB
          case BinaryType => java.sql.Types.BLOB
          case TimestampType => java.sql.Types.TIMESTAMP
          case DateType => java.sql.Types.DATE
          case t: DecimalType => java.sql.Types.DECIMAL
          case _ => throw new IllegalArgumentException(
            s"Can"t translate null value for field $field")
        })
    }

    val rddSchema = df.schema
    val driver: String = DriverRegistry.getDriverClassName(url)
    val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
    // ****************** here ****************** 
    df.foreachPartition { iterator =>
      savePartition(getConnection, table, iterator, rddSchema, nullTypes)
    }
  }
 

嗯。。。既然Scala能實現(xiàn),那么作為他的爸爸,Java也應(yīng)該能玩!
我們看看foreachPartition的方法原型:

def foreachPartition(f: Iterator[Row] => Unit)

又是函數(shù)式語言最愛的匿名函數(shù)。。。非常討厭寫lambda,所以我們還是實現(xiàn)個匿名類吧。要實現(xiàn)的抽象類為:
scala.runtime.AbstractFunction1,BoxedUnit> 兩個模板參數(shù),第一個很直觀,就是Row的迭代器,作為函數(shù)的參數(shù)。第二個BoxedUnit,是函數(shù)的返回值。不熟悉Scala的可能會很困惑,其實這就是Scala的void。由于Scala函數(shù)式編程的特性,代碼塊的末尾必須返回點什么,于是他們就搞出了個unit來代替本應(yīng)什么都沒有的void(解釋得可能不是很準(zhǔn)確,我是這么理解的)。對于Java而言,我們可以直接使用BoxedUnit.UNIT,來得到這個“什么都沒有”的東西。
來玩耍一下吧!

df.foreachPartition(new AbstractFunction1, BoxedUnit>() {
    @Override
    public BoxedUnit apply(Iterator it) {
        while (it.hasNext()){
            System.out.println(it.next().toString());
        }
        return BoxedUnit.UNIT;
    }
});

嗯,maven complete一下,spark-submit看看~
好勒~拋異常了
org.apache.spark.SparkException: Task not serializable
Task不能被序列化
嗯哼,想想之前實現(xiàn)UDF的時候,UDF1/2/3/4...各接口,都extends Serializable,也就是說,在Spark運行期間,Driver會把UDF接口實現(xiàn)類序列化,并在Executor中反序列化,執(zhí)行call方法。。。這就不難理解了,我們foreachPartition丟進(jìn)去的類,也應(yīng)該implements Serializable。這樣,我們就得自己搞一個繼承AbstractFunction1, BoxedUnit>,又實現(xiàn)Serializable的抽象類,給我們這些匿名類去實現(xiàn)!

import org.apache.spark.sql.Row;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.Serializable;

public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable {
}

可是每次都要return BoxedUnit.UNIT 搞得太別扭了,沒一點Java的風(fēng)格。

import org.apache.spark.sql.Row;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.Serializable;

public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable {
    @Override
    public BoxedUnit apply(Iterator it) {
        call(it);
        return BoxedUnit.UNIT;
    }
    
    public abstract void call(Iterator it);
}

于是我們可以直接Override call方法,就可以用滿滿Java Style的代碼去玩耍了!

df.foreachPartition(new JavaForeachPartitionFunc() {
    @Override
    public void call(Iterator it) {
        while (it.hasNext()){
            System.out.println(it.next().toString());
        }
    }
});

注意!我們實現(xiàn)的匿名類的方法,實際上是在executor上執(zhí)行的,所以println是輸出到executor機(jī)器的stdout上。這個我們可以通過Spark的web ui,點擊具體Application的Executor頁面去查看(調(diào)試用的虛擬機(jī)集群,手扶拖拉機(jī)一樣的配置,別吐槽了~)

至于foreach方法同理。只不過把Iterator 換成 Row。具體怎么搞,慢慢玩吧~~~
have fun~

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/65979.html

相關(guān)文章

  • Spark SQL學(xué)習(xí)筆記

    摘要:是中處理結(jié)構(gòu)化數(shù)據(jù)的模塊??梢詮暮芏鄶?shù)據(jù)源加載數(shù)據(jù)并構(gòu)造得到,如結(jié)構(gòu)化數(shù)據(jù)文件,中的表,外部數(shù)據(jù)庫,或者已有的。使用反射機(jī)制,推導(dǎo)包含指定類型對象的。這一功能應(yīng)該優(yōu)先于使用。隨后,將會掃描必要的列,并自動調(diào)整壓縮比例,以減少內(nèi)存占用和壓力。 Spark SQL是Spark中處理結(jié)構(gòu)化數(shù)據(jù)的模塊。與基礎(chǔ)的Spark RDD API不同,Spark SQL的接口提供了更多關(guān)于數(shù)據(jù)的結(jié)構(gòu)信息...

    qieangel2013 評論0 收藏0
  • Spark SQL知識點大全與實戰(zhàn)

    摘要:本文發(fā)于我的個人博客知識點大全與實戰(zhàn)我正在大數(shù)據(jù)技術(shù)派和朋友們討論有趣的話題,你也來加入吧概述什么是是用于結(jié)構(gòu)化數(shù)據(jù)處理的模塊。是最新的查詢起始點,實質(zhì)上是和的組合,所以在和上可用的在上同樣是可以使用的。 關(guān)注公眾號:大數(shù)據(jù)技術(shù)派,回復(fù)資料,領(lǐng)取1000G資料。本文發(fā)于我的個人博客:Spark SQL知識點大全...

    番茄西紅柿 評論0 收藏2637
  • Spark SQL知識點與實戰(zhàn)

    摘要:是最新的查詢起始點,實質(zhì)上是和的組合,所以在和上可用的在上同樣是可以使用的。轉(zhuǎn)換為轉(zhuǎn)換為其實就是對的封裝,所以可以直接獲取內(nèi)部的注意此時得到的存儲類型為是具有強(qiáng)類型的數(shù)據(jù)集合,需要提供對應(yīng)的類型信息。Spark SQL概述1、什么是Spark SQLSpark SQL是Spark用于結(jié)構(gòu)化數(shù)據(jù)(structured data)處理的Spark模塊。與基本的Spark RDD API不同,Sp...

    番茄西紅柿 評論0 收藏2637

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<