spark Dstreams-程序部署
點(diǎn)擊上方“IT那活兒”,關(guān)注后了解更多內(nèi)容,不管IT什么活兒,干就完了!?。?/span>
為了運(yùn)行一個(gè)spark streaming應(yīng)用程序,需要滿(mǎn)足以下條件 :
1.1 使用集群管理器管理集群:
這是基本的要求。
1.2 打成jar包:
你必須將你的應(yīng)用程序編譯成jar包,使用spark-submit啟動(dòng)程序,然而如果你的程序使用的是高級(jí)數(shù)據(jù)源(例如kafka),你必須將kafka依賴(lài)打進(jìn)jar包。
1.3 為執(zhí)行節(jié)點(diǎn)配置足夠的內(nèi)存:
因?yàn)榻邮盏降臄?shù)據(jù)必須保存在內(nèi)存,所以執(zhí)行節(jié)點(diǎn)必須有足夠的內(nèi)存來(lái)存儲(chǔ)數(shù)據(jù),如果要執(zhí)行10分鐘的窗口操作,系統(tǒng)必須在內(nèi)存中保留至少10分鐘的數(shù)據(jù),因此應(yīng)用程序的內(nèi)存需求取決于其中使用的操作。
1.4 配置檢查點(diǎn):
如果流應(yīng)用程序需要,則必須將Hadoop API兼容容錯(cuò)存儲(chǔ)(例如HDFS、S3等)中的目錄配置為檢查點(diǎn)目錄,并且流應(yīng)用程序的寫(xiě)入方式應(yīng)確保檢查點(diǎn)信息可用于故障恢復(fù)。
1.5 配置應(yīng)用驅(qū)動(dòng)程序的的自動(dòng)重啟:
為了從驅(qū)動(dòng)程序故障中自動(dòng)修復(fù),用于運(yùn)行流應(yīng)用程序的部署基礎(chǔ)結(jié)構(gòu)必須監(jiān)視驅(qū)動(dòng)程序進(jìn)程,并在驅(qū)動(dòng)程序失敗時(shí)重新啟動(dòng)驅(qū)動(dòng)程序。不同的集群管理器有不同的工具來(lái)實(shí)現(xiàn)這一點(diǎn)。
spark standalone:可以提交spark程序以以spark standalone方式運(yùn)行,也就是說(shuō)應(yīng)用程序在一個(gè)節(jié)點(diǎn)運(yùn)行,而且可以指示standalone集群管理器監(jiān)督驅(qū)動(dòng)程序,如果驅(qū)動(dòng)程序由于非零退出代碼或運(yùn)行驅(qū)動(dòng)程序的節(jié)點(diǎn)故障而失敗,則重新啟動(dòng)它。
YARN:YARN支持自動(dòng)重啟應(yīng)用程序的類(lèi)似機(jī)制。
Mesos:Marathon已經(jīng)被用來(lái)實(shí)現(xiàn)這一目標(biāo)。
1.6 配置預(yù)寫(xiě)日志(write-ahead logs):
自spark1.2,我們已經(jīng)引入了預(yù)寫(xiě)日志以實(shí)現(xiàn)強(qiáng)大的容錯(cuò)保證,如果啟用它,所有從receiver接收到的數(shù)據(jù)都會(huì)寫(xiě)入配置檢查點(diǎn)目錄中的預(yù)寫(xiě)日志。
這可以防止驅(qū)動(dòng)程序恢復(fù)時(shí)的數(shù)據(jù)丟失,從而確保零數(shù)據(jù)丟失,可以通過(guò)設(shè)置spark.streaming.receiver.writeAheadLog.enable=true來(lái)啟用它,然而這可能以單個(gè)接收器的接收吞吐為代價(jià),但是這可以通過(guò)并行運(yùn)行更多接收器來(lái)彌補(bǔ)。
此外,建議在啟用預(yù)寫(xiě)日志時(shí)禁用spark內(nèi)接收數(shù)據(jù)的復(fù)制,因?yàn)樵撊罩疽汛鎯?chǔ)在已復(fù)制的存儲(chǔ)系統(tǒng)中,這可以通過(guò)設(shè)置存儲(chǔ)級(jí)別為StorageLevel.MEMORY_AND_DISK_SER來(lái)實(shí)現(xiàn),使用S3(或任何不支持刷新的文件系統(tǒng))進(jìn)行預(yù)寫(xiě)日志時(shí),請(qǐng)記住啟用:
spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。
1.7 設(shè)置最大接收速率:
如果集群資源不夠大,spark streaming應(yīng)用程序無(wú)法以接收數(shù)據(jù)的速度處理數(shù)據(jù),則可以通過(guò)設(shè)置記錄的最大速率限制來(lái)限制接收器的速率,請(qǐng)參閱:
在Spark 1.5中,我們引入了一種稱(chēng)為背壓的功能,它消除了設(shè)置此速率限制的需要,因?yàn)镾park Streaming會(huì)自動(dòng)計(jì)算速率限制,并在處理?xiàng)l件發(fā)生變化時(shí)動(dòng)態(tài)調(diào)整速率限制??赏ㄟ^(guò)將配置參數(shù)spark.streaming.backpressure.enabled設(shè)置為true來(lái)啟用此背壓。
如果你需要升級(jí)spark streaming應(yīng)用程序代碼,有兩種可能的機(jī)制。2.1 升級(jí)后的Spark Streaming應(yīng)用程序?qū)?dòng),并與現(xiàn)有應(yīng)用程序并行運(yùn)行。一旦新的(接收到與舊的相同的數(shù)據(jù))被預(yù)熱并準(zhǔn)備好進(jìn)入黃金時(shí)段,舊的就可以被取下。2.2 現(xiàn)有應(yīng)用程序正常關(guān)閉(有關(guān)正常關(guān)閉選項(xiàng),請(qǐng)參閱StreamingContext.stop(…)或JavaStreamingContext.stop(…),以確保在關(guān)閉之前完全處理已接收的數(shù)據(jù)。然后可以啟動(dòng)升級(jí)后的應(yīng)用程序,該應(yīng)用程序?qū)脑缙趹?yīng)用程序停止的同一點(diǎn)開(kāi)始處理。請(qǐng)注意,這只能通過(guò)支持源端緩沖(如Kafka)的輸入源來(lái)實(shí)現(xiàn),因?yàn)樵谏弦粋€(gè)應(yīng)用程序關(guān)閉且升級(jí)的應(yīng)用程序尚未啟動(dòng)時(shí),需要緩沖數(shù)據(jù)。無(wú)法從升級(jí)前代碼的早期檢查點(diǎn)信息重新啟動(dòng)。檢查點(diǎn)信息實(shí)質(zhì)上包含序列化的Scala/Java/Python對(duì)象,嘗試使用新的、修改過(guò)的類(lèi)反序列化對(duì)象可能會(huì)導(dǎo)致錯(cuò)誤。在這種情況下,使用不同的檢查點(diǎn)目錄啟動(dòng)升級(jí)的應(yīng)用程序,或者刪除以前的檢查點(diǎn)目錄。
除了Spark的監(jiān)控功能外,還有Spark streaming特有的其他功能。使用StreamingContext時(shí),Spark web UI會(huì)顯示一個(gè)附加的流選項(xiàng)卡,其中顯示有關(guān)正在運(yùn)行的接收器(接收器是否處于活動(dòng)狀態(tài)、接收到的記錄數(shù)、接收器錯(cuò)誤等)和已完成批次(批處理時(shí)間、排隊(duì)延遲等)的統(tǒng)計(jì)信息。這可用于監(jiān)視流應(yīng)用程序的進(jìn)度。Web UI中的兩個(gè)指標(biāo)特別重要:- processing time:處理每個(gè)批次花費(fèi)的時(shí)間
- Scheduling Delay:批在隊(duì)列里等待前一批處理完成的時(shí)間
如果批次處理時(shí)間始終大于批次間隔和/或排隊(duì)延遲持續(xù)增加,則表明系統(tǒng)無(wú)法以生成批次的速度處理批次,并且正在落后。在這種情況下,考慮減少批處理時(shí)間。還可以使用StreamingListener接口監(jiān)控Spark streaming程序的進(jìn)度,該接口允許您獲取接收器狀態(tài)和處理時(shí)間。請(qǐng)注意,這是一個(gè)開(kāi)發(fā)人員API,將來(lái)可能會(huì)對(duì)其進(jìn)行改進(jìn)(即報(bào)告更多信息)。
要從集群上的Spark流媒體應(yīng)用程序中獲得最佳性能,需要進(jìn)行一些調(diào)整。本節(jié)介紹了一些可以調(diào)整以提高應(yīng)用程序性能的參數(shù)和配置。在高層次上,你需要考慮兩件事:- 通過(guò)高效地使用群集資源,減少每批數(shù)據(jù)的處理時(shí)間。
- 設(shè)置正確的批大小,以便可以在接收數(shù)據(jù)時(shí)盡快處理數(shù)據(jù)批(即,數(shù)據(jù)處理與數(shù)據(jù)攝取保持同步)。
減少批處理時(shí)間可以在Spark中進(jìn)行許多優(yōu)化,以最大限度地縮短每個(gè)批次的處理時(shí)間。下面重點(diǎn)介紹了一些最重要的問(wèn)題。4.1 數(shù)據(jù)接收中的并行級(jí)別通過(guò)網(wǎng)絡(luò)接收數(shù)據(jù)(如kafka,socket等)需要將數(shù)據(jù)反序列化并存儲(chǔ)到spark中,如果數(shù)據(jù)接收成為系統(tǒng)中的瓶頸,那么考慮數(shù)據(jù)接收的并行化。請(qǐng)注意,每個(gè)輸入數(shù)據(jù)流都會(huì)創(chuàng)建一個(gè)接收單個(gè)數(shù)據(jù)流的接收器(在工作機(jī)器上運(yùn)行)。因此,通過(guò)創(chuàng)建多個(gè)輸入數(shù)據(jù)流并將其配置為從源接收數(shù)據(jù)流的不同分區(qū),可以實(shí)現(xiàn)接收多個(gè)數(shù)據(jù)流。例如,接收兩個(gè)主題數(shù)據(jù)的單個(gè)kafka輸入數(shù)據(jù)流可以分成兩個(gè)kafka輸入流,每個(gè)kafka輸入流只接收一個(gè)主題。這將運(yùn)行兩個(gè)接收器,允許并行接收數(shù)據(jù),從而提高了總體吞吐量。這些多個(gè)數(shù)據(jù)流可以聯(lián)合在一起以創(chuàng)建單個(gè)數(shù)據(jù)流。然后,應(yīng)用于單個(gè)輸入數(shù)據(jù)流的轉(zhuǎn)換可以應(yīng)用于統(tǒng)一流。可以這樣做:應(yīng)考慮的另一個(gè)參數(shù)是接收器的塊間隔,它由配置參數(shù)spark.streaming.blockInterval確定。對(duì)于大多數(shù)接收器,接收到的數(shù)據(jù)在存儲(chǔ)到Spark的內(nèi)存中之前會(huì)合并成數(shù)據(jù)塊。每個(gè)批處理中的塊數(shù)決定了在類(lèi)似映射的轉(zhuǎn)換中用于處理接收數(shù)據(jù)的任務(wù)數(shù)。每批每個(gè)接收器的任務(wù)數(shù)大約為(批間隔/塊間隔)。例如,200 ms的塊間隔將每2秒批創(chuàng)建10個(gè)任務(wù)。如果任務(wù)數(shù)量太少(即,少于每臺(tái)機(jī)器的核心數(shù)量),那么它將是低效的,因?yàn)樗锌捎玫暮诵亩疾粫?huì)用于處理數(shù)據(jù)。要增加給定批處理間隔的任務(wù)數(shù),請(qǐng)減少塊間隔。但是,建議的最小塊間隔值約為50 ms,低于該值,任務(wù)啟動(dòng)開(kāi)銷(xiāo)可能會(huì)出現(xiàn)問(wèn)題。使用多個(gè)輸入流/接收器接收數(shù)據(jù)的另一種方法是顯式地重新劃分輸入數(shù)據(jù)流(使用inputStream.repartition())。這將在進(jìn)一步處理之前在群集中指定數(shù)量的計(jì)算機(jī)上分發(fā)接收到的數(shù)據(jù)批。4.2 數(shù)據(jù)處理中的并行級(jí)別如果在計(jì)算的任何階段中使用的并行任務(wù)的數(shù)量不夠多,那么集群資源可能會(huì)利用不足。例如,對(duì)于reduceByKey和ReduceByAndWindow等分布式reduce操作,并行任務(wù)的默認(rèn)數(shù)量由spark.default.parallelism配置屬性控制。您可以將并行級(jí)別作為參數(shù)傳遞(請(qǐng)參閱PairDStreamFunctions文檔),或者設(shè)置spark.default.parallelism配置屬性以更改默認(rèn)值。通過(guò)調(diào)整序列化格式,可以減少數(shù)據(jù)序列化的開(kāi)銷(xiāo)。在流式傳輸?shù)那闆r下,有兩種類(lèi)型的數(shù)據(jù)可以被序列化。- 輸入數(shù)據(jù)(input data):默認(rèn)情況下,通過(guò)接收器接收的輸入數(shù)據(jù)存儲(chǔ)在具有StorageLevel.memory_DISK_SER_2的執(zhí)行器內(nèi)存中。也就是說(shuō),數(shù)據(jù)序列化為字節(jié)以減少GC開(kāi)銷(xiāo),并復(fù)制以容忍執(zhí)行器故障。此外,數(shù)據(jù)首先保存在內(nèi)存中,只有當(dāng)內(nèi)存不足以保存流計(jì)算所需的所有輸入數(shù)據(jù)時(shí),才會(huì)溢出到磁盤(pán)。這種序列化顯然有開(kāi)銷(xiāo)——接收器必須對(duì)接收到的數(shù)據(jù)進(jìn)行反序列化,并使用Spark的序列化格式對(duì)其重新序列化。
- 流式計(jì)算生成的RDD可以保存在內(nèi)存中。例如,窗口操作將數(shù)據(jù)保存在內(nèi)存中,因?yàn)樗鼈儗⒈欢啻翁幚?。但是,與Spark Core默認(rèn)的StorageLevel.MEMORY_ONLY不同,流式計(jì)算生成的持久化RDD默認(rèn)使用StorageLevel.MEMORY_ONLY_DISK(即序列化)持久化,以最小化GC開(kāi)銷(xiāo)。
在這兩種情況下,使用Kryo序列化可以減少CPU和內(nèi)存開(kāi)銷(xiāo)。在流應(yīng)用程序需要保留的數(shù)據(jù)量不大的特定情況下,可以將數(shù)據(jù)(兩種類(lèi)型)作為反序列化對(duì)象持久化,而不會(huì)產(chǎn)生過(guò)多的GC開(kāi)銷(xiāo)。例如,如果使用幾秒鐘的批處理間隔且沒(méi)有窗口操作,則可以通過(guò)顯式設(shè)置相應(yīng)的存儲(chǔ)級(jí)別來(lái)嘗試禁用持久化數(shù)據(jù)中的序列化。這將減少由于序列化而產(chǎn)生的CPU開(kāi)銷(xiāo),從而有可能在沒(méi)有太多GC開(kāi)銷(xiāo)的情況下提高性能。4.4 任務(wù)啟動(dòng)開(kāi)銷(xiāo)如果每秒啟動(dòng)的任務(wù)數(shù)很高(例如,每秒50個(gè)或更多),則向執(zhí)行者發(fā)送任務(wù)的開(kāi)銷(xiāo)可能很大,并且將很難實(shí)現(xiàn)亞秒延遲。通過(guò)以下更改可以減少開(kāi)銷(xiāo):執(zhí)行模式:在standalone模式或粗粒度Mesos模式下運(yùn)行Spark會(huì)導(dǎo)致比細(xì)粒度Mesos模式更好的任務(wù)啟動(dòng)時(shí)間。這些更改可能會(huì)將批處理時(shí)間減少100毫秒,從而允許使用次秒級(jí)的批處理大小。為了使在集群上運(yùn)行的Spark Streaming應(yīng)用程序保持穩(wěn)定,系統(tǒng)應(yīng)該能夠以接收數(shù)據(jù)的速度處理數(shù)據(jù)。換句話(huà)說(shuō),批處理速度應(yīng)該與接收數(shù)據(jù)速度一樣快,可以通過(guò)web UI監(jiān)控批數(shù)據(jù)處理時(shí)間,其應(yīng)該小于批處理間隔。根據(jù)流計(jì)算的性質(zhì),所使用的批處理間隔可能會(huì)對(duì)應(yīng)用程序在一組固定的群集資源上可以維持的數(shù)據(jù)速率產(chǎn)生重大影響。例如,讓我們考慮較早的WorddCurnNead示例。對(duì)于特定的數(shù)據(jù)速率,系統(tǒng)可能能夠每2秒(即2秒的批處理間隔)跟蹤報(bào)告字?jǐn)?shù),但不是每500毫秒一次。因此,需要設(shè)置批次間隔,以便能夠維持生產(chǎn)中的預(yù)期數(shù)據(jù)速率。為應(yīng)用程序確定正確的批處理大小的一個(gè)好方法是使用保守的批處理間隔(例如,5-10秒)和低數(shù)據(jù)速率對(duì)其進(jìn)行測(cè)試。為了驗(yàn)證系統(tǒng)是否能夠跟上數(shù)據(jù)速率,您可以檢查每個(gè)處理批次所經(jīng)歷的端到端延遲值(在Spark driver log4j日志中查找“總延遲”,或使用StreamingListener接口)。如果延遲保持與批量大小相當(dāng),則系統(tǒng)是穩(wěn)定的。否則,如果延遲持續(xù)增加,則意味著系統(tǒng)無(wú)法跟上,因此不穩(wěn)定。一旦你有了一個(gè)穩(wěn)定配置的想法,你可以嘗試增加數(shù)據(jù)速率和/或減少批量大小。請(qǐng)注意,只要延遲降低回較低值(即,小于批量大?。?,由于臨時(shí)數(shù)據(jù)速率增加而導(dǎo)致的延遲瞬時(shí)增加就可以了。我們將專(zhuān)門(mén)討論Spark流應(yīng)用程序上下文中的一些調(diào)優(yōu)參數(shù)。Spark流應(yīng)用程序所需的集群內(nèi)存量在很大程度上取決于所使用的轉(zhuǎn)換類(lèi)型。例如,如果要對(duì)最后10分鐘的數(shù)據(jù)使用窗口操作,那么集群應(yīng)該有足夠的內(nèi)存保留10分鐘的數(shù)據(jù)。或者,如果您想使用帶有大量鍵的updateStateByKey,那么所需的內(nèi)存將很高。相反,如果要執(zhí)行簡(jiǎn)單的映射過(guò)濾器存儲(chǔ)操作,則所需內(nèi)存將較低。通常,由于通過(guò)接收器接收的數(shù)據(jù)存儲(chǔ)在StorageLevel.MEMORY_AND_DISK_SER_2中,因此超過(guò)內(nèi)存的數(shù)據(jù)將溢出到磁盤(pán)。這可能會(huì)降低streaming應(yīng)用程序的性能。因此建議根據(jù)streaming應(yīng)用程序的要求提供足夠的內(nèi)存。最好嘗試在小范圍內(nèi)查看內(nèi)存使用情況,并進(jìn)行相應(yīng)的估計(jì)。內(nèi)存調(diào)優(yōu)的另一個(gè)方面是垃圾收集。對(duì)于需要低延遲的流應(yīng)用程序,不希望JVM垃圾收集導(dǎo)致暫停。有幾個(gè)參數(shù)可以幫助您調(diào)整內(nèi)存使用和GC開(kāi)銷(xiāo):- 數(shù)據(jù)流的存儲(chǔ)級(jí)別:imput data數(shù)據(jù)和RDD在默認(rèn)情況下作為序列化字節(jié)持久化。與反序列化持久化相比,這減少了內(nèi)存使用和GC開(kāi)銷(xiāo)。啟用Kryo序列化進(jìn)一步減少了序列化大小和內(nèi)存使用。
- 默認(rèn)情況下,由數(shù)據(jù)流轉(zhuǎn)換生成的所有輸入數(shù)據(jù)和持久化RDD將自動(dòng)清除。Spark Streaming根據(jù)所使用的轉(zhuǎn)換決定何時(shí)清除數(shù)據(jù)。例如,如果使用10分鐘的窗口操作,則Spark Streaming將保留最后10分鐘的數(shù)據(jù),并主動(dòng)丟棄較舊的數(shù)據(jù)。通過(guò)設(shè)置streamingContext.remember,數(shù)據(jù)可以保留更長(zhǎng)的時(shí)間。
- 強(qiáng)烈建議使用并發(fā)標(biāo)記和掃描GC,以保持GC相關(guān)暫停始終較低。盡管已知并發(fā)GC會(huì)降低系統(tǒng)的總體處理吞吐量,但仍建議使用它來(lái)實(shí)現(xiàn)更一致的批處理時(shí)間。確保在驅(qū)動(dòng)程序(使用spark submit中的--driver java選項(xiàng))和執(zhí)行器(使用spark配置spark.executor.extraJavaOptions)上設(shè)置CMS GC。
接下來(lái)討論在spark streaming應(yīng)用程序中發(fā)生故障時(shí)行為。為了理解spark streaming的容錯(cuò)語(yǔ)義,讓我們先看下RDD的基本容錯(cuò)語(yǔ)義。- RDD是一個(gè)不可變的、確定的和可重新計(jì)算的分布式數(shù)據(jù)集,每個(gè)RDD都會(huì)記住創(chuàng)建數(shù)據(jù)集的依賴(lài)。
- 如果RDD的任何分區(qū)由于工作節(jié)點(diǎn)故障而丟失,則可以使用操作依賴(lài)關(guān)系從原始容錯(cuò)數(shù)據(jù)集重新計(jì)算該分區(qū)。
- 假設(shè)所有的RDD轉(zhuǎn)換都是確定性的,那么不管Spark集群中是否出現(xiàn)故障,最終轉(zhuǎn)換的RDD中的數(shù)據(jù)都將始終相同。
Spark對(duì)HDFS或S3等容錯(cuò)文件系統(tǒng)中的數(shù)據(jù)進(jìn)行操作。因此,從容錯(cuò)數(shù)據(jù)生成的所有RDD也是容錯(cuò)的。但是,Spark streaming的情況并非如此,因?yàn)樵诖蠖鄶?shù)情況下,數(shù)據(jù)是通過(guò)網(wǎng)絡(luò)接收的(使用fileStream時(shí)除外)。為了為所有生成的RDD實(shí)現(xiàn)相同的容錯(cuò)屬性,在群集中工作節(jié)點(diǎn)的多個(gè)Spark執(zhí)行器之間復(fù)制接收到的數(shù)據(jù)(默認(rèn)復(fù)制系數(shù)為2)。這導(dǎo)致系統(tǒng)中出現(xiàn)兩種數(shù)據(jù),在發(fā)生故障時(shí)需要恢復(fù):- 接收和復(fù)制的數(shù)據(jù)—此數(shù)據(jù)在單個(gè)工作節(jié)點(diǎn)發(fā)生故障時(shí)仍然有效,因?yàn)樗母北敬嬖谟诹硪粋€(gè)節(jié)點(diǎn)上。
- 已接收但已緩沖用于復(fù)制的數(shù)據(jù)—由于未復(fù)制此數(shù)據(jù),因此恢復(fù)此數(shù)據(jù)的唯一方法是再次從源獲取它。
- 運(yùn)行executors的任何工作節(jié)點(diǎn)都可能失敗,并且這些節(jié)點(diǎn)上的所有內(nèi)存中數(shù)據(jù)都將丟失。如果任何接收器在發(fā)生故障的節(jié)點(diǎn)上運(yùn)行,則其緩沖數(shù)據(jù)將丟失。
- 如果運(yùn)行Spark Streaming應(yīng)用程序的驅(qū)動(dòng)程序節(jié)點(diǎn)出現(xiàn)故障,則SparkContext顯然會(huì)丟失,所有executor及其內(nèi)存中的數(shù)據(jù)也會(huì)丟失。
5.2 Spark streaming容錯(cuò)語(yǔ)義在所有可能的操作條件下(盡管出現(xiàn)故障等),系統(tǒng)可以提供三種類(lèi)型的保證:- 至少一次:每條記錄將被處理一次或多次。這比最多一次強(qiáng),因?yàn)樗_保不會(huì)丟失任何數(shù)據(jù)。但也可能有重復(fù)。
- 只有一次:每條記錄將精確處理一次-不會(huì)丟失任何數(shù)據(jù),也不會(huì)多次處理任何數(shù)據(jù)。這顯然是三者中最有力的保證。
在任何流處理系統(tǒng)中,廣義地說(shuō),處理數(shù)據(jù)有三個(gè)步驟。- 接收數(shù)據(jù):使用接收器或其他方式從源接收數(shù)據(jù)。
- 轉(zhuǎn)換數(shù)據(jù):使用數(shù)據(jù)流和RDD轉(zhuǎn)換處理接收的數(shù)據(jù)。
- 輸出數(shù)據(jù):最終轉(zhuǎn)換的數(shù)據(jù)輸出到外部系統(tǒng),如文件系統(tǒng)、數(shù)據(jù)庫(kù)、儀表板等。
如果流應(yīng)用程序必須實(shí)現(xiàn)端到端的精確一次保證,那么每個(gè)步驟都必須提供精確一次的保證。也就是說(shuō),每個(gè)記錄必須準(zhǔn)確接收一次,準(zhǔn)確轉(zhuǎn)換一次,并準(zhǔn)確推送到下游系統(tǒng)一次。讓我們?cè)赟park流的上下文中理解這些步驟的語(yǔ)義。- 接收數(shù)據(jù):不同的輸入源提供不同的保證,文章后面將詳細(xì)討論。
- 轉(zhuǎn)換數(shù)據(jù):由于RDD提供的保證,所有接收到的數(shù)據(jù)都將被精確地處理一次。即使出現(xiàn)故障,只要接收到的輸入數(shù)據(jù)是可訪問(wèn)的,最終轉(zhuǎn)換的RDD將始終具有相同的內(nèi)容。
- 推出數(shù)據(jù):默認(rèn)情況下,輸出操作至少確保一次語(yǔ)義,因?yàn)樗Q于輸出操作的類(lèi)型(冪等式或非冪等式)和下游系統(tǒng)的語(yǔ)義(是否支持事務(wù))。但用戶(hù)可以實(shí)現(xiàn)自己的事務(wù)機(jī)制,以實(shí)現(xiàn)精確的一次性語(yǔ)義。文章后面將詳細(xì)討論這一點(diǎn)。
5.2.3 接收數(shù)據(jù)語(yǔ)義不同的輸入源提供不同的保證,從至少一次到恰好一次不等。如果所有輸入數(shù)據(jù)都已存在于HDFS等容錯(cuò)文件系統(tǒng)中,Spark Streaming始終可以從任何故障中恢復(fù)并處理所有數(shù)據(jù)。這給出了精確的一次語(yǔ)義,這意味著所有數(shù)據(jù)將被精確地處理一次,而不管什么失敗。對(duì)于基于接收器的輸入源,容錯(cuò)語(yǔ)義取決于故障場(chǎng)景和接收器類(lèi)型。如前所述,有兩種類(lèi)型的接收器:- 可靠接收器-這些接收器僅在確保已復(fù)制接收到的數(shù)據(jù)后才確認(rèn)可靠來(lái)源。如果這樣的接收器出現(xiàn)故障,源將不會(huì)收到緩沖(未復(fù)制)數(shù)據(jù)的確認(rèn)。因此,如果接收器重新啟動(dòng),源將重新發(fā)送數(shù)據(jù),并且不會(huì)因故障而丟失任何數(shù)據(jù)。
- 不可靠的接收器-此類(lèi)接收器不發(fā)送確認(rèn),因此在由于工作人員或驅(qū)動(dòng)程序故障而失敗時(shí)可能會(huì)丟失數(shù)據(jù)。
根據(jù)所使用的接收器類(lèi)型,我們實(shí)現(xiàn)了以下語(yǔ)義:如果工作節(jié)點(diǎn)發(fā)生故障,則可靠的接收器不會(huì)丟失數(shù)據(jù)。對(duì)于不可靠的接收器,接收到但未復(fù)制的數(shù)據(jù)可能會(huì)丟失。如果驅(qū)動(dòng)程序節(jié)點(diǎn)發(fā)生故障,那么除了這些丟失之外,在內(nèi)存中接收和復(fù)制的所有過(guò)去的數(shù)據(jù)都將丟失。這將影響有狀態(tài)轉(zhuǎn)換的結(jié)果。為了避免丟失過(guò)去接收到的數(shù)據(jù),Spark 1.2引入了預(yù)寫(xiě)日志,將接收到的數(shù)據(jù)保存到容錯(cuò)存儲(chǔ)器中。通過(guò)啟用預(yù)寫(xiě)日志和可靠的接收器,可以實(shí)現(xiàn)零數(shù)據(jù)丟失。在語(yǔ)義方面,它提供了至少一次的保證。在Spark 1.3中,我們引入了一個(gè)新的Kafka Direct API,它可以確保Spark Streaming只接收一次所有Kafka數(shù)據(jù)。此外,如果實(shí)現(xiàn)一次輸出操作,則可以實(shí)現(xiàn)端到端的一次輸出保證。輸出操作(比如foreachRDD)至少有一次語(yǔ)義,也就是說(shuō),如果工作程序發(fā)生故障,轉(zhuǎn)換后的數(shù)據(jù)可能會(huì)多次寫(xiě)入外部實(shí)體。雖然這對(duì)于使用saveAs***Files操作保存到文件系統(tǒng)是可以接受的(因?yàn)槲募⒈幌嗤臄?shù)據(jù)覆蓋),但是可能需要額外的努力來(lái)實(shí)現(xiàn)一次語(yǔ)義。- 冪等更新:多次嘗試總是寫(xiě)入相同的數(shù)據(jù)。例如,saveAs***文件總是將相同的數(shù)據(jù)寫(xiě)入生成的文件。
- 事務(wù)性更新:所有更新都是以事務(wù)方式進(jìn)行的,因此更新只以原子方式進(jìn)行一次。實(shí)現(xiàn)這一點(diǎn)的一種方法是:
使用批處理時(shí)間(在foreachRDD中可用)和RDD的分區(qū)索引來(lái)創(chuàng)建標(biāo)識(shí)符。此標(biāo)識(shí)符唯一標(biāo)識(shí)流應(yīng)用程序中的數(shù)據(jù)。使用標(biāo)識(shí)符以事務(wù)方式(即原子方式)更新外部系統(tǒng)。也就是說(shuō),如果標(biāo)識(shí)符尚未提交,則以原子方式提交分區(qū)數(shù)據(jù)和標(biāo)識(shí)符。否則,如果已提交,則跳過(guò)更新。
6.1 一個(gè)數(shù)據(jù)流與一個(gè)接收器相關(guān)聯(lián)。為了實(shí)現(xiàn)讀取并行性,需要?jiǎng)?chuàng)建多個(gè)接收器,即多個(gè)數(shù)據(jù)流。接收器在執(zhí)行器中運(yùn)行。它占據(jù)一個(gè)核心。確保在預(yù)訂接收器插槽后有足夠的內(nèi)核進(jìn)行處理,即spark.cores.max應(yīng)考慮接收器插槽。接收器以循環(huán)方式分配給執(zhí)行者。6.2 當(dāng)從流源接收數(shù)據(jù)時(shí),接收器創(chuàng)建數(shù)據(jù)塊。每個(gè)塊間隔生成一個(gè)新的數(shù)據(jù)塊。在N個(gè)塊間隔內(nèi)創(chuàng)建N個(gè)數(shù)據(jù)塊。這些塊由當(dāng)前executor的塊管理器分配給其他executor的塊管理器。之后,驅(qū)動(dòng)程序上運(yùn)行的網(wǎng)絡(luò)輸入跟蹤器將被告知塊位置,以便進(jìn)一步處理。6.3 在驅(qū)動(dòng)程序(driver)上為批處理間隔期間創(chuàng)建的塊創(chuàng)建RDD。批處理間隔期間生成的塊是分區(qū)RDD。每個(gè)分區(qū)都是spark中的一項(xiàng)任務(wù)。塊間隔(blockInterval)==批處理間隔(batchinterval)意味著創(chuàng)建了單個(gè)分區(qū),并且可能在本地對(duì)其進(jìn)行處理。6.4 塊上的map任務(wù)在具有塊的executor(一個(gè)接收塊,另一個(gè)復(fù)制塊)中處理,而不考慮塊間隔,除非非本地調(diào)度開(kāi)始。擁有更大的區(qū)塊間隔意味著更大的區(qū)塊。較高的spark.locality.wait值會(huì)增加在本地節(jié)點(diǎn)上處理塊的機(jī)會(huì)。需要在這兩個(gè)參數(shù)之間找到平衡,以確保在本地處理較大的塊。6.5 您可以通過(guò)調(diào)用inputDstream.repartition(n)來(lái)定義分區(qū)的數(shù)量,而不是依賴(lài)于batchInterval和blockInterval。這將隨機(jī)重新排列RDD中的數(shù)據(jù),以創(chuàng)建n個(gè)分區(qū)。是的,為了更大的并行性。雖然是以洗牌為代價(jià)的。RDD的處理由驅(qū)動(dòng)程序的jobscheduler作為作業(yè)進(jìn)行調(diào)度。在給定的時(shí)間點(diǎn),只有一個(gè)作業(yè)處于活動(dòng)狀態(tài)。因此,如果一個(gè)作業(yè)正在執(zhí)行,其他作業(yè)將排隊(duì)。6.6 如果有兩個(gè)數(shù)據(jù)流,將形成兩個(gè)RDD,并將創(chuàng)建兩個(gè)作業(yè),這兩個(gè)作業(yè)將一個(gè)接一個(gè)地安排。為了避免這種情況,可以合并兩個(gè)數(shù)據(jù)流。這將確保為數(shù)據(jù)流的兩個(gè)RDD形成一個(gè)unionRDD。然后將此unionRDD視為單個(gè)作業(yè)。但是,RDD的分區(qū)不受影響。6.7 如果批處理時(shí)間超過(guò)batchinterval,那么很明顯,接收器的內(nèi)存將開(kāi)始填滿(mǎn),并最終引發(fā)異常(很可能是BlockNotFoundException)。目前,無(wú)法暫停接收器。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制接收器的速率。
本文來(lái)源:IT那活兒(上海新炬王翦團(tuán)隊(duì))
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/129642.html