成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

spark Dstreams-程序部署

IT那活兒 / 2325人閱讀
spark Dstreams-程序部署

點(diǎn)擊上方“IT那活兒”,關(guān)注后了解更多內(nèi)容,不管IT什么活兒,干就完了!?。?/span>



01


部署條件



為了運(yùn)行一個(gè)spark streaming應(yīng)用程序,需要滿(mǎn)足以下條件 :

1.1 使用集群管理器管理集群:

這是基本的要求。

1.2 打成jar包:

你必須將你的應(yīng)用程序編譯成jar包,使用spark-submit啟動(dòng)程序,然而如果你的程序使用的是高級(jí)數(shù)據(jù)源(例如kafka),你必須將kafka依賴(lài)打進(jìn)jar包。

1.3 為執(zhí)行節(jié)點(diǎn)配置足夠的內(nèi)存:

因?yàn)榻邮盏降臄?shù)據(jù)必須保存在內(nèi)存,所以執(zhí)行節(jié)點(diǎn)必須有足夠的內(nèi)存來(lái)存儲(chǔ)數(shù)據(jù),如果要執(zhí)行10分鐘的窗口操作,系統(tǒng)必須在內(nèi)存中保留至少10分鐘的數(shù)據(jù),因此應(yīng)用程序的內(nèi)存需求取決于其中使用的操作。

1.4 配置檢查點(diǎn):

如果流應(yīng)用程序需要,則必須將Hadoop API兼容容錯(cuò)存儲(chǔ)(例如HDFS、S3等)中的目錄配置為檢查點(diǎn)目錄,并且流應(yīng)用程序的寫(xiě)入方式應(yīng)確保檢查點(diǎn)信息可用于故障恢復(fù)。

1.5 配置應(yīng)用驅(qū)動(dòng)程序的的自動(dòng)重啟:

為了從驅(qū)動(dòng)程序故障中自動(dòng)修復(fù),用于運(yùn)行流應(yīng)用程序的部署基礎(chǔ)結(jié)構(gòu)必須監(jiān)視驅(qū)動(dòng)程序進(jìn)程,并在驅(qū)動(dòng)程序失敗時(shí)重新啟動(dòng)驅(qū)動(dòng)程序。不同的集群管理器有不同的工具來(lái)實(shí)現(xiàn)這一點(diǎn)。

  • spark standalone:可以提交spark程序以以spark standalone方式運(yùn)行,也就是說(shuō)應(yīng)用程序在一個(gè)節(jié)點(diǎn)運(yùn)行,而且可以指示standalone集群管理器監(jiān)督驅(qū)動(dòng)程序,如果驅(qū)動(dòng)程序由于非零退出代碼或運(yùn)行驅(qū)動(dòng)程序的節(jié)點(diǎn)故障而失敗,則重新啟動(dòng)它。

  • YARN:YARN支持自動(dòng)重啟應(yīng)用程序的類(lèi)似機(jī)制。

  •  Mesos:Marathon已經(jīng)被用來(lái)實(shí)現(xiàn)這一目標(biāo)。

1.6 配置預(yù)寫(xiě)日志(write-ahead logs):

自spark1.2,我們已經(jīng)引入了預(yù)寫(xiě)日志以實(shí)現(xiàn)強(qiáng)大的容錯(cuò)保證,如果啟用它,所有從receiver接收到的數(shù)據(jù)都會(huì)寫(xiě)入配置檢查點(diǎn)目錄中的預(yù)寫(xiě)日志。

這可以防止驅(qū)動(dòng)程序恢復(fù)時(shí)的數(shù)據(jù)丟失,從而確保零數(shù)據(jù)丟失,可以通過(guò)設(shè)置spark.streaming.receiver.writeAheadLog.enable=true來(lái)啟用它,然而這可能以單個(gè)接收器的接收吞吐為代價(jià),但是這可以通過(guò)并行運(yùn)行更多接收器來(lái)彌補(bǔ)。

此外,建議在啟用預(yù)寫(xiě)日志時(shí)禁用spark內(nèi)接收數(shù)據(jù)的復(fù)制,因?yàn)樵撊罩疽汛鎯?chǔ)在已復(fù)制的存儲(chǔ)系統(tǒng)中,這可以通過(guò)設(shè)置存儲(chǔ)級(jí)別為StorageLevel.MEMORY_AND_DISK_SER來(lái)實(shí)現(xiàn),使用S3(或任何不支持刷新的文件系統(tǒng))進(jìn)行預(yù)寫(xiě)日志時(shí),請(qǐng)記住啟用:

spark.streaming.driver.writeAheadLog.closeFileAfterWrite

spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。

1.7 設(shè)置最大接收速率:

如果集群資源不夠大,spark streaming應(yīng)用程序無(wú)法以接收數(shù)據(jù)的速度處理數(shù)據(jù),則可以通過(guò)設(shè)置記錄的最大速率限制來(lái)限制接收器的速率,請(qǐng)參閱:

  • 接收器的配置參數(shù)spark.streaming.receiver.maxRate

  • 直接kafka方法的配置參數(shù)spark.streaming.kafka.maxRatePerPartition

在Spark 1.5中,我們引入了一種稱(chēng)為背壓的功能,它消除了設(shè)置此速率限制的需要,因?yàn)镾park Streaming會(huì)自動(dòng)計(jì)算速率限制,并在處理?xiàng)l件發(fā)生變化時(shí)動(dòng)態(tài)調(diào)整速率限制??赏ㄟ^(guò)將配置參數(shù)spark.streaming.backpressure.enabled設(shè)置為true來(lái)啟用此背壓。


02


升級(jí)應(yīng)用程序代碼


如果你需要升級(jí)spark streaming應(yīng)用程序代碼,有兩種可能的機(jī)制。
2.1 升級(jí)后的Spark Streaming應(yīng)用程序?qū)?dòng),并與現(xiàn)有應(yīng)用程序并行運(yùn)行。一旦新的(接收到與舊的相同的數(shù)據(jù))被預(yù)熱并準(zhǔn)備好進(jìn)入黃金時(shí)段,舊的就可以被取下。
2.2 現(xiàn)有應(yīng)用程序正常關(guān)閉(有關(guān)正常關(guān)閉選項(xiàng),請(qǐng)參閱StreamingContext.stop(…)或JavaStreamingContext.stop(…),以確保在關(guān)閉之前完全處理已接收的數(shù)據(jù)。然后可以啟動(dòng)升級(jí)后的應(yīng)用程序,該應(yīng)用程序?qū)脑缙趹?yīng)用程序停止的同一點(diǎn)開(kāi)始處理。
請(qǐng)注意,這只能通過(guò)支持源端緩沖(如Kafka)的輸入源來(lái)實(shí)現(xiàn),因?yàn)樵谏弦粋€(gè)應(yīng)用程序關(guān)閉且升級(jí)的應(yīng)用程序尚未啟動(dòng)時(shí),需要緩沖數(shù)據(jù)。無(wú)法從升級(jí)前代碼的早期檢查點(diǎn)信息重新啟動(dòng)。
檢查點(diǎn)信息實(shí)質(zhì)上包含序列化的Scala/Java/Python對(duì)象,嘗試使用新的、修改過(guò)的類(lèi)反序列化對(duì)象可能會(huì)導(dǎo)致錯(cuò)誤。
在這種情況下,使用不同的檢查點(diǎn)目錄啟動(dòng)升級(jí)的應(yīng)用程序,或者刪除以前的檢查點(diǎn)目錄。


03


監(jiān)控應(yīng)用程序


除了Spark的監(jiān)控功能外,還有Spark streaming特有的其他功能。
使用StreamingContext時(shí),Spark web UI會(huì)顯示一個(gè)附加的流選項(xiàng)卡,其中顯示有關(guān)正在運(yùn)行的接收器(接收器是否處于活動(dòng)狀態(tài)、接收到的記錄數(shù)、接收器錯(cuò)誤等)和已完成批次(批處理時(shí)間、排隊(duì)延遲等)的統(tǒng)計(jì)信息。這可用于監(jiān)視流應(yīng)用程序的進(jìn)度。
Web UI中的兩個(gè)指標(biāo)特別重要:
  • processing time:處理每個(gè)批次花費(fèi)的時(shí)間
  • Scheduling Delay:批在隊(duì)列里等待前一批處理完成的時(shí)間
如果批次處理時(shí)間始終大于批次間隔和/或排隊(duì)延遲持續(xù)增加,則表明系統(tǒng)無(wú)法以生成批次的速度處理批次,并且正在落后。在這種情況下,考慮減少批處理時(shí)間。
還可以使用StreamingListener接口監(jiān)控Spark streaming程序的進(jìn)度,該接口允許您獲取接收器狀態(tài)和處理時(shí)間。請(qǐng)注意,這是一個(gè)開(kāi)發(fā)人員API,將來(lái)可能會(huì)對(duì)其進(jìn)行改進(jìn)(即報(bào)告更多信息)。


04


性能調(diào)整


要從集群上的Spark流媒體應(yīng)用程序中獲得最佳性能,需要進(jìn)行一些調(diào)整。
本節(jié)介紹了一些可以調(diào)整以提高應(yīng)用程序性能的參數(shù)和配置。在高層次上,你需要考慮兩件事:
  • 通過(guò)高效地使用群集資源,減少每批數(shù)據(jù)的處理時(shí)間。
  • 設(shè)置正確的批大小,以便可以在接收數(shù)據(jù)時(shí)盡快處理數(shù)據(jù)批(即,數(shù)據(jù)處理與數(shù)據(jù)攝取保持同步)。
減少批處理時(shí)間可以在Spark中進(jìn)行許多優(yōu)化,以最大限度地縮短每個(gè)批次的處理時(shí)間。
下面重點(diǎn)介紹了一些最重要的問(wèn)題。
4.1 數(shù)據(jù)接收中的并行級(jí)別
通過(guò)網(wǎng)絡(luò)接收數(shù)據(jù)(如kafka,socket等)需要將數(shù)據(jù)反序列化并存儲(chǔ)到spark中,如果數(shù)據(jù)接收成為系統(tǒng)中的瓶頸,那么考慮數(shù)據(jù)接收的并行化。
請(qǐng)注意,每個(gè)輸入數(shù)據(jù)流都會(huì)創(chuàng)建一個(gè)接收單個(gè)數(shù)據(jù)流的接收器(在工作機(jī)器上運(yùn)行)。因此,通過(guò)創(chuàng)建多個(gè)輸入數(shù)據(jù)流并將其配置為從源接收數(shù)據(jù)流的不同分區(qū),可以實(shí)現(xiàn)接收多個(gè)數(shù)據(jù)流。
例如,接收兩個(gè)主題數(shù)據(jù)的單個(gè)kafka輸入數(shù)據(jù)流可以分成兩個(gè)kafka輸入流,每個(gè)kafka輸入流只接收一個(gè)主題。這將運(yùn)行兩個(gè)接收器,允許并行接收數(shù)據(jù),從而提高了總體吞吐量。
這些多個(gè)數(shù)據(jù)流可以聯(lián)合在一起以創(chuàng)建單個(gè)數(shù)據(jù)流。然后,應(yīng)用于單個(gè)輸入數(shù)據(jù)流的轉(zhuǎn)換可以應(yīng)用于統(tǒng)一流。可以這樣做:
應(yīng)考慮的另一個(gè)參數(shù)是接收器的塊間隔,它由配置參數(shù)spark.streaming.blockInterval確定。對(duì)于大多數(shù)接收器,接收到的數(shù)據(jù)在存儲(chǔ)到Spark的內(nèi)存中之前會(huì)合并成數(shù)據(jù)塊。
每個(gè)批處理中的塊數(shù)決定了在類(lèi)似映射的轉(zhuǎn)換中用于處理接收數(shù)據(jù)的任務(wù)數(shù)。每批每個(gè)接收器的任務(wù)數(shù)大約為(批間隔/塊間隔)。
例如,200 ms的塊間隔將每2秒批創(chuàng)建10個(gè)任務(wù)。如果任務(wù)數(shù)量太少(即,少于每臺(tái)機(jī)器的核心數(shù)量),那么它將是低效的,因?yàn)樗锌捎玫暮诵亩疾粫?huì)用于處理數(shù)據(jù)。要增加給定批處理間隔的任務(wù)數(shù),請(qǐng)減少塊間隔。但是,建議的最小塊間隔值約為50 ms,低于該值,任務(wù)啟動(dòng)開(kāi)銷(xiāo)可能會(huì)出現(xiàn)問(wèn)題。
使用多個(gè)輸入流/接收器接收數(shù)據(jù)的另一種方法是顯式地重新劃分輸入數(shù)據(jù)流(使用inputStream.repartition())。這將在進(jìn)一步處理之前在群集中指定數(shù)量的計(jì)算機(jī)上分發(fā)接收到的數(shù)據(jù)批。
4.2 數(shù)據(jù)處理中的并行級(jí)別
如果在計(jì)算的任何階段中使用的并行任務(wù)的數(shù)量不夠多,那么集群資源可能會(huì)利用不足。例如,對(duì)于reduceByKey和ReduceByAndWindow等分布式reduce操作,并行任務(wù)的默認(rèn)數(shù)量由spark.default.parallelism配置屬性控制。
您可以將并行級(jí)別作為參數(shù)傳遞(請(qǐng)參閱PairDStreamFunctions文檔),或者設(shè)置spark.default.parallelism配置屬性以更改默認(rèn)值。
4.3 數(shù)據(jù)序列化
通過(guò)調(diào)整序列化格式,可以減少數(shù)據(jù)序列化的開(kāi)銷(xiāo)。在流式傳輸?shù)那闆r下,有兩種類(lèi)型的數(shù)據(jù)可以被序列化。
  • 輸入數(shù)據(jù)(input data):
    默認(rèn)情況下,通過(guò)接收器接收的輸入數(shù)據(jù)存儲(chǔ)在具有StorageLevel.memory_DISK_SER_2的執(zhí)行器內(nèi)存中。也就是說(shuō),數(shù)據(jù)序列化為字節(jié)以減少GC開(kāi)銷(xiāo),并復(fù)制以容忍執(zhí)行器故障。
    此外,數(shù)據(jù)首先保存在內(nèi)存中,只有當(dāng)內(nèi)存不足以保存流計(jì)算所需的所有輸入數(shù)據(jù)時(shí),才會(huì)溢出到磁盤(pán)。這種序列化顯然有開(kāi)銷(xiāo)——接收器必須對(duì)接收到的數(shù)據(jù)進(jìn)行反序列化,并使用Spark的序列化格式對(duì)其重新序列化。
  • 流操作生成的持久化RDD:
    流式計(jì)算生成的RDD可以保存在內(nèi)存中。例如,窗口操作將數(shù)據(jù)保存在內(nèi)存中,因?yàn)樗鼈儗⒈欢啻翁幚?。但是,與Spark Core默認(rèn)的StorageLevel.MEMORY_ONLY不同,流式計(jì)算生成的持久化RDD默認(rèn)使用StorageLevel.MEMORY_ONLY_DISK(即序列化)持久化,以最小化GC開(kāi)銷(xiāo)。
在這兩種情況下,使用Kryo序列化可以減少CPU和內(nèi)存開(kāi)銷(xiāo)。
在流應(yīng)用程序需要保留的數(shù)據(jù)量不大的特定情況下,可以將數(shù)據(jù)(兩種類(lèi)型)作為反序列化對(duì)象持久化,而不會(huì)產(chǎn)生過(guò)多的GC開(kāi)銷(xiāo)。
例如,如果使用幾秒鐘的批處理間隔且沒(méi)有窗口操作,則可以通過(guò)顯式設(shè)置相應(yīng)的存儲(chǔ)級(jí)別來(lái)嘗試禁用持久化數(shù)據(jù)中的序列化。這將減少由于序列化而產(chǎn)生的CPU開(kāi)銷(xiāo),從而有可能在沒(méi)有太多GC開(kāi)銷(xiāo)的情況下提高性能。
4.4 任務(wù)啟動(dòng)開(kāi)銷(xiāo)
如果每秒啟動(dòng)的任務(wù)數(shù)很高(例如,每秒50個(gè)或更多),則向執(zhí)行者發(fā)送任務(wù)的開(kāi)銷(xiāo)可能很大,并且將很難實(shí)現(xiàn)亞秒延遲。
通過(guò)以下更改可以減少開(kāi)銷(xiāo):
執(zhí)行模式:在standalone模式或粗粒度Mesos模式下運(yùn)行Spark會(huì)導(dǎo)致比細(xì)粒度Mesos模式更好的任務(wù)啟動(dòng)時(shí)間。
這些更改可能會(huì)將批處理時(shí)間減少100毫秒,從而允許使用次秒級(jí)的批處理大小。
4.5 設(shè)置正確的批處理間隔
為了使在集群上運(yùn)行的Spark Streaming應(yīng)用程序保持穩(wěn)定,系統(tǒng)應(yīng)該能夠以接收數(shù)據(jù)的速度處理數(shù)據(jù)。
換句話(huà)說(shuō),批處理速度應(yīng)該與接收數(shù)據(jù)速度一樣快,可以通過(guò)web UI監(jiān)控批數(shù)據(jù)處理時(shí)間,其應(yīng)該小于批處理間隔。
根據(jù)流計(jì)算的性質(zhì),所使用的批處理間隔可能會(huì)對(duì)應(yīng)用程序在一組固定的群集資源上可以維持的數(shù)據(jù)速率產(chǎn)生重大影響。
例如,讓我們考慮較早的WorddCurnNead示例。對(duì)于特定的數(shù)據(jù)速率,系統(tǒng)可能能夠每2秒(即2秒的批處理間隔)跟蹤報(bào)告字?jǐn)?shù),但不是每500毫秒一次。因此,需要設(shè)置批次間隔,以便能夠維持生產(chǎn)中的預(yù)期數(shù)據(jù)速率。
為應(yīng)用程序確定正確的批處理大小的一個(gè)好方法是使用保守的批處理間隔(例如,5-10秒)和低數(shù)據(jù)速率對(duì)其進(jìn)行測(cè)試。
為了驗(yàn)證系統(tǒng)是否能夠跟上數(shù)據(jù)速率,您可以檢查每個(gè)處理批次所經(jīng)歷的端到端延遲值(在Spark driver log4j日志中查找“總延遲”,或使用StreamingListener接口)。
如果延遲保持與批量大小相當(dāng),則系統(tǒng)是穩(wěn)定的。否則,如果延遲持續(xù)增加,則意味著系統(tǒng)無(wú)法跟上,因此不穩(wěn)定。一旦你有了一個(gè)穩(wěn)定配置的想法,你可以嘗試增加數(shù)據(jù)速率和/或減少批量大小。
請(qǐng)注意,只要延遲降低回較低值(即,小于批量大?。?,由于臨時(shí)數(shù)據(jù)速率增加而導(dǎo)致的延遲瞬時(shí)增加就可以了。
4.6 內(nèi)存調(diào)整
我們將專(zhuān)門(mén)討論Spark流應(yīng)用程序上下文中的一些調(diào)優(yōu)參數(shù)。
Spark流應(yīng)用程序所需的集群內(nèi)存量在很大程度上取決于所使用的轉(zhuǎn)換類(lèi)型。例如,如果要對(duì)最后10分鐘的數(shù)據(jù)使用窗口操作,那么集群應(yīng)該有足夠的內(nèi)存保留10分鐘的數(shù)據(jù)。
或者,如果您想使用帶有大量鍵的updateStateByKey,那么所需的內(nèi)存將很高。相反,如果要執(zhí)行簡(jiǎn)單的映射過(guò)濾器存儲(chǔ)操作,則所需內(nèi)存將較低。
通常,由于通過(guò)接收器接收的數(shù)據(jù)存儲(chǔ)在StorageLevel.MEMORY_AND_DISK_SER_2中,因此超過(guò)內(nèi)存的數(shù)據(jù)將溢出到磁盤(pán)。這可能會(huì)降低streaming應(yīng)用程序的性能。
因此建議根據(jù)streaming應(yīng)用程序的要求提供足夠的內(nèi)存。最好嘗試在小范圍內(nèi)查看內(nèi)存使用情況,并進(jìn)行相應(yīng)的估計(jì)。
內(nèi)存調(diào)優(yōu)的另一個(gè)方面是垃圾收集。對(duì)于需要低延遲的流應(yīng)用程序,不希望JVM垃圾收集導(dǎo)致暫停。
有幾個(gè)參數(shù)可以幫助您調(diào)整內(nèi)存使用和GC開(kāi)銷(xiāo):
  • 數(shù)據(jù)流的存儲(chǔ)級(jí)別:
    imput data數(shù)據(jù)和RDD在默認(rèn)情況下作為序列化字節(jié)持久化。與反序列化持久化相比,這減少了內(nèi)存使用和GC開(kāi)銷(xiāo)。啟用Kryo序列化進(jìn)一步減少了序列化大小和內(nèi)存使用。
  • 清除舊數(shù)據(jù):
    默認(rèn)情況下,由數(shù)據(jù)流轉(zhuǎn)換生成的所有輸入數(shù)據(jù)和持久化RDD將自動(dòng)清除。Spark Streaming根據(jù)所使用的轉(zhuǎn)換決定何時(shí)清除數(shù)據(jù)。例如,如果使用10分鐘的窗口操作,則Spark Streaming將保留最后10分鐘的數(shù)據(jù),并主動(dòng)丟棄較舊的數(shù)據(jù)。通過(guò)設(shè)置streamingContext.remember,數(shù)據(jù)可以保留更長(zhǎng)的時(shí)間。
  • CMS垃圾收集器:
    強(qiáng)烈建議使用并發(fā)標(biāo)記和掃描GC,以保持GC相關(guān)暫停始終較低。盡管已知并發(fā)GC會(huì)降低系統(tǒng)的總體處理吞吐量,但仍建議使用它來(lái)實(shí)現(xiàn)更一致的批處理時(shí)間。確保在驅(qū)動(dòng)程序(使用spark submit中的--driver java選項(xiàng))和執(zhí)行器(使用spark配置spark.executor.extraJavaOptions)上設(shè)置CMS GC。


05


容  錯(cuò)


接下來(lái)討論在spark streaming應(yīng)用程序中發(fā)生故障時(shí)行為。
5.1 RDD容錯(cuò)語(yǔ)義
為了理解spark streaming的容錯(cuò)語(yǔ)義,讓我們先看下RDD的基本容錯(cuò)語(yǔ)義。
  • RDD是一個(gè)不可變的、確定的和可重新計(jì)算的分布式數(shù)據(jù)集,每個(gè)RDD都會(huì)記住創(chuàng)建數(shù)據(jù)集的依賴(lài)。
  • 如果RDD的任何分區(qū)由于工作節(jié)點(diǎn)故障而丟失,則可以使用操作依賴(lài)關(guān)系從原始容錯(cuò)數(shù)據(jù)集重新計(jì)算該分區(qū)。
  • 假設(shè)所有的RDD轉(zhuǎn)換都是確定性的,那么不管Spark集群中是否出現(xiàn)故障,最終轉(zhuǎn)換的RDD中的數(shù)據(jù)都將始終相同。
Spark對(duì)HDFS或S3等容錯(cuò)文件系統(tǒng)中的數(shù)據(jù)進(jìn)行操作。
因此,從容錯(cuò)數(shù)據(jù)生成的所有RDD也是容錯(cuò)的。但是,Spark streaming的情況并非如此,因?yàn)樵诖蠖鄶?shù)情況下,數(shù)據(jù)是通過(guò)網(wǎng)絡(luò)接收的(使用fileStream時(shí)除外)。
為了為所有生成的RDD實(shí)現(xiàn)相同的容錯(cuò)屬性,在群集中工作節(jié)點(diǎn)的多個(gè)Spark執(zhí)行器之間復(fù)制接收到的數(shù)據(jù)(默認(rèn)復(fù)制系數(shù)為2)。
這導(dǎo)致系統(tǒng)中出現(xiàn)兩種數(shù)據(jù),在發(fā)生故障時(shí)需要恢復(fù):
  • 接收和復(fù)制的數(shù)據(jù)—此數(shù)據(jù)在單個(gè)工作節(jié)點(diǎn)發(fā)生故障時(shí)仍然有效,因?yàn)樗母北敬嬖谟诹硪粋€(gè)節(jié)點(diǎn)上。
  • 已接收但已緩沖用于復(fù)制的數(shù)據(jù)—由于未復(fù)制此數(shù)據(jù),因此恢復(fù)此數(shù)據(jù)的唯一方法是再次從源獲取它。
還有兩種故障需要注意:
  • 運(yùn)行executors的任何工作節(jié)點(diǎn)都可能失敗,并且這些節(jié)點(diǎn)上的所有內(nèi)存中數(shù)據(jù)都將丟失。如果任何接收器在發(fā)生故障的節(jié)點(diǎn)上運(yùn)行,則其緩沖數(shù)據(jù)將丟失。
  • 如果運(yùn)行Spark Streaming應(yīng)用程序的驅(qū)動(dòng)程序節(jié)點(diǎn)出現(xiàn)故障,則SparkContext顯然會(huì)丟失,所有executor及其內(nèi)存中的數(shù)據(jù)也會(huì)丟失。
5.2 Spark streaming容錯(cuò)語(yǔ)義
5.2.1 定義
在所有可能的操作條件下(盡管出現(xiàn)故障等),系統(tǒng)可以提供三種類(lèi)型的保證:
  • 最多一次:每條記錄要么處理一次,要么根本不處理。
  • 至少一次:每條記錄將被處理一次或多次。這比最多一次強(qiáng),因?yàn)樗_保不會(huì)丟失任何數(shù)據(jù)。但也可能有重復(fù)。
  • 只有一次:每條記錄將精確處理一次-不會(huì)丟失任何數(shù)據(jù),也不會(huì)多次處理任何數(shù)據(jù)。這顯然是三者中最有力的保證。
5.2.2 基本語(yǔ)義
在任何流處理系統(tǒng)中,廣義地說(shuō),處理數(shù)據(jù)有三個(gè)步驟。
  • 接收數(shù)據(jù):使用接收器或其他方式從源接收數(shù)據(jù)。
  • 轉(zhuǎn)換數(shù)據(jù):使用數(shù)據(jù)流和RDD轉(zhuǎn)換處理接收的數(shù)據(jù)。
  • 輸出數(shù)據(jù):最終轉(zhuǎn)換的數(shù)據(jù)輸出到外部系統(tǒng),如文件系統(tǒng)、數(shù)據(jù)庫(kù)、儀表板等。
如果流應(yīng)用程序必須實(shí)現(xiàn)端到端的精確一次保證,那么每個(gè)步驟都必須提供精確一次的保證。也就是說(shuō),每個(gè)記錄必須準(zhǔn)確接收一次,準(zhǔn)確轉(zhuǎn)換一次,并準(zhǔn)確推送到下游系統(tǒng)一次。讓我們?cè)赟park流的上下文中理解這些步驟的語(yǔ)義。
  • 接收數(shù)據(jù):不同的輸入源提供不同的保證,文章后面將詳細(xì)討論。
  • 轉(zhuǎn)換數(shù)據(jù):由于RDD提供的保證,所有接收到的數(shù)據(jù)都將被精確地處理一次。即使出現(xiàn)故障,只要接收到的輸入數(shù)據(jù)是可訪問(wèn)的,最終轉(zhuǎn)換的RDD將始終具有相同的內(nèi)容。
  • 推出數(shù)據(jù):默認(rèn)情況下,輸出操作至少確保一次語(yǔ)義,因?yàn)樗Q于輸出操作的類(lèi)型(冪等式或非冪等式)和下游系統(tǒng)的語(yǔ)義(是否支持事務(wù))。但用戶(hù)可以實(shí)現(xiàn)自己的事務(wù)機(jī)制,以實(shí)現(xiàn)精確的一次性語(yǔ)義。文章后面將詳細(xì)討論這一點(diǎn)。
5.2.3 接收數(shù)據(jù)語(yǔ)義
不同的輸入源提供不同的保證,從至少一次到恰好一次不等。
File source
如果所有輸入數(shù)據(jù)都已存在于HDFS等容錯(cuò)文件系統(tǒng)中,Spark Streaming始終可以從任何故障中恢復(fù)并處理所有數(shù)據(jù)。這給出了精確的一次語(yǔ)義,這意味著所有數(shù)據(jù)將被精確地處理一次,而不管什么失敗。
基于接收器的source
對(duì)于基于接收器的輸入源,容錯(cuò)語(yǔ)義取決于故障場(chǎng)景和接收器類(lèi)型。如前所述,有兩種類(lèi)型的接收器:
  • 可靠接收器-這些接收器僅在確保已復(fù)制接收到的數(shù)據(jù)后才確認(rèn)可靠來(lái)源。如果這樣的接收器出現(xiàn)故障,源將不會(huì)收到緩沖(未復(fù)制)數(shù)據(jù)的確認(rèn)。
    因此,如果接收器重新啟動(dòng),源將重新發(fā)送數(shù)據(jù),并且不會(huì)因故障而丟失任何數(shù)據(jù)。
  • 不可靠的接收器-此類(lèi)接收器不發(fā)送確認(rèn),因此在由于工作人員或驅(qū)動(dòng)程序故障而失敗時(shí)可能會(huì)丟失數(shù)據(jù)。
根據(jù)所使用的接收器類(lèi)型,我們實(shí)現(xiàn)了以下語(yǔ)義:
如果工作節(jié)點(diǎn)發(fā)生故障,則可靠的接收器不會(huì)丟失數(shù)據(jù)。對(duì)于不可靠的接收器,接收到但未復(fù)制的數(shù)據(jù)可能會(huì)丟失。
如果驅(qū)動(dòng)程序節(jié)點(diǎn)發(fā)生故障,那么除了這些丟失之外,在內(nèi)存中接收和復(fù)制的所有過(guò)去的數(shù)據(jù)都將丟失。這將影響有狀態(tài)轉(zhuǎn)換的結(jié)果。
為了避免丟失過(guò)去接收到的數(shù)據(jù),Spark 1.2引入了預(yù)寫(xiě)日志,將接收到的數(shù)據(jù)保存到容錯(cuò)存儲(chǔ)器中。
通過(guò)啟用預(yù)寫(xiě)日志和可靠的接收器,可以實(shí)現(xiàn)零數(shù)據(jù)丟失。在語(yǔ)義方面,它提供了至少一次的保證。
下表總結(jié)了故障下的語(yǔ)義:
在Spark 1.3中,我們引入了一個(gè)新的Kafka Direct API,它可以確保Spark Streaming只接收一次所有Kafka數(shù)據(jù)。此外,如果實(shí)現(xiàn)一次輸出操作,則可以實(shí)現(xiàn)端到端的一次輸出保證。
5.2.4 輸出操作語(yǔ)義
輸出操作(比如foreachRDD)至少有一次語(yǔ)義,也就是說(shuō),如果工作程序發(fā)生故障,轉(zhuǎn)換后的數(shù)據(jù)可能會(huì)多次寫(xiě)入外部實(shí)體。
雖然這對(duì)于使用saveAs***Files操作保存到文件系統(tǒng)是可以接受的(因?yàn)槲募⒈幌嗤臄?shù)據(jù)覆蓋),但是可能需要額外的努力來(lái)實(shí)現(xiàn)一次語(yǔ)義。
有兩種方法:
  • 冪等更新:多次嘗試總是寫(xiě)入相同的數(shù)據(jù)。例如,saveAs***文件總是將相同的數(shù)據(jù)寫(xiě)入生成的文件。
  • 事務(wù)性更新:所有更新都是以事務(wù)方式進(jìn)行的,因此更新只以原子方式進(jìn)行一次。實(shí)現(xiàn)這一點(diǎn)的一種方法是:
使用批處理時(shí)間(在foreachRDD中可用)和RDD的分區(qū)索引來(lái)創(chuàng)建標(biāo)識(shí)符。此標(biāo)識(shí)符唯一標(biāo)識(shí)流應(yīng)用程序中的數(shù)據(jù)。
使用標(biāo)識(shí)符以事務(wù)方式(即原子方式)更新外部系統(tǒng)。也就是說(shuō),如果標(biāo)識(shí)符尚未提交,則以原子方式提交分區(qū)數(shù)據(jù)和標(biāo)識(shí)符。否則,如果已提交,則跳過(guò)更新。


06


注  意


6.1 一個(gè)數(shù)據(jù)流與一個(gè)接收器相關(guān)聯(lián)。
為了實(shí)現(xiàn)讀取并行性,需要?jiǎng)?chuàng)建多個(gè)接收器,即多個(gè)數(shù)據(jù)流。接收器在執(zhí)行器中運(yùn)行。它占據(jù)一個(gè)核心。確保在預(yù)訂接收器插槽后有足夠的內(nèi)核進(jìn)行處理,即spark.cores.max應(yīng)考慮接收器插槽。接收器以循環(huán)方式分配給執(zhí)行者。
6.2 當(dāng)從流源接收數(shù)據(jù)時(shí),接收器創(chuàng)建數(shù)據(jù)塊。
每個(gè)塊間隔生成一個(gè)新的數(shù)據(jù)塊。在N個(gè)塊間隔內(nèi)創(chuàng)建N個(gè)數(shù)據(jù)塊。這些塊由當(dāng)前executor的塊管理器分配給其他executor的塊管理器。之后,驅(qū)動(dòng)程序上運(yùn)行的網(wǎng)絡(luò)輸入跟蹤器將被告知塊位置,以便進(jìn)一步處理。
6.3 在驅(qū)動(dòng)程序(driver)上為批處理間隔期間創(chuàng)建的塊創(chuàng)建RDD。
批處理間隔期間生成的塊是分區(qū)RDD。每個(gè)分區(qū)都是spark中的一項(xiàng)任務(wù)。塊間隔(blockInterval)==批處理間隔(batchinterval)意味著創(chuàng)建了單個(gè)分區(qū),并且可能在本地對(duì)其進(jìn)行處理。
6.4 塊上的map任務(wù)在具有塊的executor(一個(gè)接收塊,另一個(gè)復(fù)制塊)中處理,而不考慮塊間隔,除非非本地調(diào)度開(kāi)始。
擁有更大的區(qū)塊間隔意味著更大的區(qū)塊。較高的spark.locality.wait值會(huì)增加在本地節(jié)點(diǎn)上處理塊的機(jī)會(huì)。需要在這兩個(gè)參數(shù)之間找到平衡,以確保在本地處理較大的塊。
6.5 您可以通過(guò)調(diào)用inputDstream.repartition(n)來(lái)定義分區(qū)的數(shù)量,而不是依賴(lài)于batchInterval和blockInterval。
這將隨機(jī)重新排列RDD中的數(shù)據(jù),以創(chuàng)建n個(gè)分區(qū)。是的,為了更大的并行性。雖然是以洗牌為代價(jià)的。RDD的處理由驅(qū)動(dòng)程序的jobscheduler作為作業(yè)進(jìn)行調(diào)度。在給定的時(shí)間點(diǎn),只有一個(gè)作業(yè)處于活動(dòng)狀態(tài)。因此,如果一個(gè)作業(yè)正在執(zhí)行,其他作業(yè)將排隊(duì)。
6.6 如果有兩個(gè)數(shù)據(jù)流,將形成兩個(gè)RDD,并將創(chuàng)建兩個(gè)作業(yè),這兩個(gè)作業(yè)將一個(gè)接一個(gè)地安排。
為了避免這種情況,可以合并兩個(gè)數(shù)據(jù)流。這將確保為數(shù)據(jù)流的兩個(gè)RDD形成一個(gè)unionRDD。然后將此unionRDD視為單個(gè)作業(yè)。但是,RDD的分區(qū)不受影響。
6.7 如果批處理時(shí)間超過(guò)batchinterval,那么很明顯,接收器的內(nèi)存將開(kāi)始填滿(mǎn),并最終引發(fā)異常(很可能是BlockNotFoundException)。
目前,無(wú)法暫停接收器。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制接收器的速率。



本文作者:潘宗昊

本文來(lái)源:IT那活兒(上海新炬王翦團(tuán)隊(duì))

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/129642.html

相關(guān)文章

  • SparkStreaming概述

    摘要:但在企業(yè)中存在很多實(shí)時(shí)性處理的需求,例如雙十一的京東阿里,通常會(huì)做一個(gè)實(shí)時(shí)的數(shù)據(jù)大屏,顯示實(shí)時(shí)訂單。這種情況下,對(duì)數(shù)據(jù)實(shí)時(shí)性要求較高,僅僅能夠容忍到延遲分鐘或幾秒鐘。1 Spark Streaming是什么它是一個(gè)可擴(kuò)展,高吞吐具有容錯(cuò)性的流式計(jì)算框架吞吐量:?jiǎn)挝粫r(shí)間內(nèi)成功傳輸數(shù)據(jù)的數(shù)量之前我們接觸的spark-core和spark-sql都是處理屬于離線(xiàn)批處理任務(wù),數(shù)據(jù)一般都是在固定位置上...

    Tecode 評(píng)論0 收藏0
  • Spark Streaming學(xué)習(xí)筆記

    摘要:輸入和接收器輸入代表從某種流式數(shù)據(jù)源流入的數(shù)據(jù)流。文件數(shù)據(jù)流可以從任何兼容包括等的文件系統(tǒng),創(chuàng)建方式如下將監(jiān)視該目錄,并處理該目錄下任何新建的文件目前還不支持嵌套目錄。會(huì)被一個(gè)個(gè)依次推入隊(duì)列,而則會(huì)依次以數(shù)據(jù)流形式處理這些的數(shù)據(jù)。 特點(diǎn): Spark Streaming能夠?qū)崿F(xiàn)對(duì)實(shí)時(shí)數(shù)據(jù)流的流式處理,并具有很好的可擴(kuò)展性、高吞吐量和容錯(cuò)性。 Spark Streaming支持從多種數(shù)...

    陸斌 評(píng)論0 收藏0
  • Spark 快速入門(mén)

    摘要:數(shù)據(jù)科學(xué)任務(wù)主要是數(shù)據(jù)分析領(lǐng)域,數(shù)據(jù)科學(xué)家要負(fù)責(zé)分析數(shù)據(jù)并建模,具備統(tǒng)計(jì)預(yù)測(cè)建模機(jī)器學(xué)習(xí)等方面的經(jīng)驗(yàn),以及一定的使用或語(yǔ)言進(jìn)行編程的能力。監(jiān)控運(yùn)行時(shí)性能指標(biāo)信息。 Spark Spark 背景 什么是 Spark 官網(wǎng):http://spark.apache.org Spark是一種快速、通用、可擴(kuò)展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學(xué)伯克利分校AMPLab,2010年開(kāi)源,20...

    wangshijun 評(píng)論0 收藏0
  • IntelliJ IDEA Windows下Spark開(kāi)發(fā)環(huán)境部署

    摘要:運(yùn)行數(shù)據(jù)準(zhǔn)備隨便準(zhǔn)備一個(gè)文檔格式不限,上傳到上。解決因?yàn)樵瓉?lái)是用的版本為相應(yīng)的依賴(lài)包官網(wǎng)已經(jīng)不再支持,所以更新的平臺(tái)的環(huán)境為,相應(yīng)的文檔很少,更改版本為。星期六星期一由為知筆記遷移到。 0x01 環(huán)境說(shuō)明 博客地址:http://www.cnblogs.com/ning-wang/p/7359977.html 1.1 本地 OS: windows 10jdk: jdk1.8.0_121...

    DevWiki 評(píng)論0 收藏0
  • Spark入門(mén)階段一之掃盲筆記

    摘要:同時(shí)集成了機(jī)器學(xué)習(xí)類(lèi)庫(kù)?;谟?jì)算框架,將的分布式計(jì)算應(yīng)用到機(jī)器學(xué)習(xí)領(lǐng)域。提供了一個(gè)簡(jiǎn)單的聲明方法指定機(jī)器學(xué)習(xí)任務(wù),并且動(dòng)態(tài)地選擇最優(yōu)的學(xué)習(xí)算法。宣稱(chēng)其性能是的多倍。 介紹 spark是分布式并行數(shù)據(jù)處理框架 與mapreduce的區(qū)別: mapreduce通常將中間結(jié)果放在hdfs上,spark是基于內(nèi)存并行大數(shù)據(jù)框架,中間結(jié)果放在內(nèi)存,對(duì)于迭代數(shù)據(jù)spark效率更高,mapred...

    starsfun 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<