成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

spark Dstreams-kafka數(shù)據(jù)源

IT那活兒 / 487人閱讀
spark Dstreams-kafka數(shù)據(jù)源

點擊上方“IT那活兒”,關注后了解更多內(nèi)容,不管IT什么活兒,干就完了?。?!


01


簡   介


Spark Streaming+Kafka集成在實際應用中是非常常見的,其中kafka需要是0.10.0版本及以上。Kafka 0.10的Spark Streaming集成提供了簡單的并行性、Kafka分區(qū)和Spark分區(qū)之間的1:1對應關系以及對偏移量和元數(shù)據(jù)的訪。
但是,由于較新的集成使用了新的Kafka consumer API而不是簡單的API,因此在使用上存在顯著差異。


02


案例及說明


首先需要添加依賴:
Stream中的每條記錄是一個ConsumerRecord實體。如果Spark batch持續(xù)時間大于默認的Kafka心跳會話超時(30秒),請適當增加heartbeat.interval.ms和session.timeout.ms。
于大于5分鐘的批處理,這將需要在代理上更改group.max.session.timeout.ms。
新的Kafka消費API將把消息預取到緩沖區(qū)中。因此,出于性能原因,Spark integration將緩存的使用者保留在執(zhí)行器上(而不是為每個批處理重新創(chuàng)建它們),并且更愿意在具有適當使用者的主機位置上調(diào)度分區(qū),這一點很重要。
在大多數(shù)情況下,您應該使用LocationStrategies.PreferConsistent,如上所示。這將在可用的執(zhí)行器之間均勻地分配分區(qū)。如果您的執(zhí)行者與您的Kafka代理位于相同的主機上,請使用PreferBrokers,這將更傾向于在Kafka leader上為該分區(qū)安排分區(qū)。
最后,如果分區(qū)之間的負載有明顯的偏差,請使用PreferFixed。這允許您指定分區(qū)到主機的顯式映射(任何未指定的分區(qū)都將使用一致的位置)。
消費者的緩存的默認最大大小為64.如果您希望處理超過(64 *個執(zhí)行程序數(shù))Kafka分區(qū),則可以通過spark.streaming.kafka.consumer.cache.maxCapacity更改此設置。
如果要禁用Kafka使用者的緩存,可以將spark.streaming.Kafka.consumer.cache.enabled設置為false。
緩存由topicpartition和group.id設置密鑰,因此對createDirectStream的每次調(diào)用使用多帶帶的group.id。
新的Kafka consumer API有許多不同的方法來指定主題,其中一些方法需要大量的對象實例化后設置。ConsumerStrategies提供了一個抽象,允許Spark即使在從檢查點重新啟動后也能獲得正確配置的使用者。
如上所示,Subscribe允許您訂閱固定的主題集合。SubscribePattern允許您使用正則表達式指定感興趣的主題。請注意,與0.8集成不同,使用Subscribe或SubscribePattern應該響應在運行流期間添加分區(qū)。最后,Assign允許您指定一個固定的分區(qū)集合。這三種策略都有重載構(gòu)造函數(shù),允許您指定特定分區(qū)的起始偏移量。
如果你有一個更適合批處理的用例,那么可以創(chuàng)建RDD來定義偏移范圍:
獲取偏移量:
請注意,只有在createDirectStream結(jié)果上調(diào)用的第一個方法中,而不是在隨后的方法鏈中,才能成功地將類型轉(zhuǎn)換為HasOffsetRanges。
請注意,RDD分區(qū)和Kafka分區(qū)之間的一對一映射在任何洗牌或重新分區(qū)的方法(例如reduceByKey()或window())之后都不會保留。


03


偏移量管理


kafka在失敗情況下傳輸語義取決于偏移量的存儲方式和存儲時間,spark輸出操作至少一次,如果你想只有一次輸出,則必須在冪等輸出后存儲偏移量,或者在原子事務中與輸出一起存儲偏移量,通過這種集成,為了提高可靠性,您有三個選項來存儲偏移量。
1) Checkpoint
如果啟用checkpointing,偏移量將存儲在檢查點中。這很容易實現(xiàn),但也有缺點。您的輸出操作必須是冪等的,因為您將得到重復的輸出;交易不是一種選擇。此外,如果應用程序代碼已更改,則無法從檢查點恢復。對于計劃的升級,您可以通過在舊代碼的同時運行新代碼來緩解這一問題(因為輸出無論如何都需要是冪等的,所以它們不應該沖突)。但對于需要更改代碼的計劃外故障,您將丟失數(shù)據(jù),除非您有另一種方法來識別已知良好的起始偏移量。
2)kafka自己管理偏移量
Kafka有一個特殊的topic用來存儲偏移量,默認情況下,消費者會自動定期提交偏移量,但是這肯定不是你想要的,因為輪詢期間消息可能還未輸出,這就是上面的流示例將“enable.auto.commit”設置為false的原因,但是在知道輸出已存儲后,可以手動將偏移提交到Kafka,與檢查點相比,Kafka的好處在于無論應用程序代碼如何更改,它都是一個持久的存儲。然而,卡夫卡不是事務性的,所以您的輸出仍然必須是冪等的。
3)自定義存儲
對于支持事務的數(shù)據(jù)存儲,將偏移保存在與結(jié)果相同的事務中可以使兩者保持同步,即使在失敗的情況下也是如此。
如果在檢測重復或跳過的偏移量范圍時非常小心,則回滾事務可防止重復或丟失的消息影響結(jié)果。這給出了精確一次語義的等價物,甚至對于聚合產(chǎn)生的輸出也可以使用這種策略,聚合通常很難使其成為冪等的。



本文作者:潘宗昊

本文來源:IT那活兒(上海新炬王翦團隊)

文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/129613.html

相關文章

  • Spark 快速入門

    摘要:數(shù)據(jù)科學任務主要是數(shù)據(jù)分析領域,數(shù)據(jù)科學家要負責分析數(shù)據(jù)并建模,具備統(tǒng)計預測建模機器學習等方面的經(jīng)驗,以及一定的使用或語言進行編程的能力。監(jiān)控運行時性能指標信息。 Spark Spark 背景 什么是 Spark 官網(wǎng):http://spark.apache.org Spark是一種快速、通用、可擴展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學伯克利分校AMPLab,2010年開源,20...

    wangshijun 評論0 收藏0
  • Spark SQL知識點與實戰(zhàn)

    摘要:是最新的查詢起始點,實質(zhì)上是和的組合,所以在和上可用的在上同樣是可以使用的。轉(zhuǎn)換為轉(zhuǎn)換為其實就是對的封裝,所以可以直接獲取內(nèi)部的注意此時得到的存儲類型為是具有強類型的數(shù)據(jù)集合,需要提供對應的類型信息。Spark SQL概述1、什么是Spark SQLSpark SQL是Spark用于結(jié)構(gòu)化數(shù)據(jù)(structured data)處理的Spark模塊。與基本的Spark RDD API不同,Sp...

    番茄西紅柿 評論0 收藏2637

發(fā)表評論

0條評論

IT那活兒

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<