成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Spark Streaming遇到問題分析

stormzhang / 819人閱讀

摘要:遇到問題分析之后搞了個還沒仔細(xì)了解可參考的與的有區(qū)別及并發(fā)控制先看看的,與的這幾個概念。一個可以認(rèn)為就是會最終輸出一個結(jié)果的一條由組織而成的計算。在中,我們通過使用新極大地增強(qiáng)對狀態(tài)流處理的支持。

Spark Streaming遇到問題分析

1、Spark2.0之后搞了個Structured Streaming

還沒仔細(xì)了解,可參考:https://github.com/lw-lin/Coo...

2、Spark的Job與Streaming的Job有區(qū)別及Streaming Job并發(fā)控制:

先看看Spark Streaming 的 JobSet, Job,與 Spark Core 的 Job, Stage, TaskSet, Task 這幾個概念。

[Spark Streaming]

JobSet 的全限定名是:org.apache.spark.streaming.scheduler.JobSet

Job 的全限定名是:org.apache.spark.streaming.scheduler.Job

[Spark Core]

Job 沒有一個對應(yīng)的實體類,主要是通過 jobId:Int 來表示一個具體的 job

Stage 的全限定名是:org.apache.spark.scheduler.Stage

TaskSet 的全限定名是:org.apache.spark.scheduler.TaskSet

Task 的全限定名是:org.apache.spark.scheduler.Task
Spark Core 的 Job, Stage, Task 就是我們“日?!闭?wù)?Spark任務(wù)時所說的那些含義,而且在 Spark 的 WebUI 上有非常好的體現(xiàn),比如下圖就是 1 個 Job 包含duo1 個 Stage;3 個 Stage 各包含 8, 2, 4 個 Task。而 TaskSet 則是 Spark Core 的內(nèi)部代碼里用的類,是 Task 的集合,和 Stage 是同義的。

Spark Core中:一個RDD DAG Graph可以生成一個或多個Job。一個Job可以認(rèn)為就是會最終輸出一個結(jié)果RDD的一條由RDD組織而成的計算。Job在spark里應(yīng)用里是一個被調(diào)度的單位。
Streaming中:一個batch的數(shù)據(jù)對應(yīng)一個DStreamGraph,而一個DStreamGraph包含一或多個關(guān)于DStream的輸出操作,每一個輸出對應(yīng)于一個Job,一個DStreamGraph對應(yīng)一個JobSet,里面包含一個或多個Job。用下圖表示如下:

生產(chǎn)的JobSet會提交給JobScheduler去執(zhí)行,JobScheduler包含了一個線程池,通過spark.streaming.concurrentJobs參數(shù)來控制其大小,也就是可以并發(fā)執(zhí)行的job數(shù),默認(rèn)是1.不過這個參數(shù)的設(shè)置以集群中executor機(jī)器的cpu core為準(zhǔn),比如集群中有2臺4核executor,那么spark.streaming.concurrentJobs可以設(shè)置為2x4=8. 同時你還可以控制調(diào)度策略:spark.scheduler.mode (FIFO/FAIR) 默認(rèn)是FIFO

我們的問題就是,運(yùn)行一段時間之后發(fā)現(xiàn)處理速度跟不上了,后來才發(fā)現(xiàn)原來這個參數(shù)默認(rèn)是1,而我們代碼中對于每個batch有兩個輸出操作,這樣會產(chǎn)生兩個job,而同一時間只能執(zhí)行一個job,慢慢地處理速度就跟不上生產(chǎn)速度了。所以實際中,請根據(jù)具體情況調(diào)整該參數(shù)。

此處參考:

https://github.com/lw-lin/Coo...

http://blog.csdn.net/xueba207...

http://www.jianshu.com/p/ab38...

3、 Spark Streaming緩存數(shù)據(jù)清理:

調(diào)用cache有兩種情況,一種是調(diào)用DStream.cache,第二種是RDD.cache。事實上他們是完全一樣的。

DStream的cache 動作只是將DStream的變量storageLevel 設(shè)置為MEMORY_ONLY_SER,然后在產(chǎn)生(或者獲取)RDD的時候,調(diào)用RDD的persit方法進(jìn)行設(shè)置。所以DStream.cache 產(chǎn)生的效果等價于RDD.cache(也就是你自己調(diào)用foreachRDD 將RDD 都設(shè)置一遍)

注意,當(dāng)你調(diào)用dstream.cache緩存數(shù)據(jù)的時候,Streaming在該batch處理完畢后,默認(rèn)會立即清除這個緩存,通過參數(shù)spark.streaming.unpersist 你是可以決定是否手工控制是否需要對cache住的數(shù)據(jù)進(jìn)行清理.

參考:
http://www.jianshu.com/p/f068...
https://github.com/apache/spa...

4、Streaming中維持狀態(tài)

有兩種方式:updateStateByKey和mapWithState(Spark 1.6以后新增的)。
推薦使用mapWithState, 實際使用中,我發(fā)現(xiàn)updateStateByKey會慢慢拖慢處理速度,問題描述與該情況類似:http://comments.gmane.org/gma...

許多復(fù)雜流處理流水線程序必須將狀態(tài)保持一段時間,例如,如果你想實時了解網(wǎng)站用戶行為,你需要將網(wǎng)站上各“用戶會話(user session)”信息保存為持久狀態(tài)并根據(jù)用戶的行為對這一狀態(tài)進(jìn)行持續(xù)更新。這種有狀態(tài)的流計算可以在Spark Streaming中使用updateStateByKey 方法實現(xiàn)。

在Spark 1.6 中,我們通過使用新API mapWithState極大地增強(qiáng)對狀態(tài)流處理的支持。該新的API提供了通用模式的內(nèi)置支持,而在以前使用updateStateByKey 方法來實現(xiàn)這一相同功能(如會話超時)需要進(jìn)行手動編碼和優(yōu)化。因此,mapWithState 方法較之于updateStateByKey方法,有十倍之多的性能提升。

使用mapWithState方法進(jìn)行狀態(tài)流處理
盡管現(xiàn)有DStream中updateStateByKey方法能夠允許用戶執(zhí)行狀態(tài)計算,但使用mapWithState方法能夠讓用戶更容易地表達(dá)程序邏輯,同時讓性能提升10倍之多。讓我們通過一個例子對mapWithState方法的優(yōu)勢進(jìn)行闡述。

假設(shè)我們要根據(jù)用戶歷史動作對某一網(wǎng)站的用戶行為進(jìn)行實時分析,對各個用戶,我們需要保持用戶動作的歷史信息,然后根據(jù)這些歷史信息得到用戶的行為模型并輸出到下游的數(shù)據(jù)存儲當(dāng)中。
在Spark Streaming中構(gòu)建此應(yīng)用程序時,我們首先需要獲取用戶動作流作為輸入(例如通過Kafka或Kinesis),然后使用mapWithState 方法對輸入進(jìn)行轉(zhuǎn)換操作以生成用戶模型流,最后將處理后的數(shù)據(jù)流保存到數(shù)據(jù)存儲當(dāng)中。

mapWithState方法可以通過下面的抽象方式進(jìn)行理解,假設(shè)它是將用戶動作和當(dāng)前用戶會話作為輸入的一個算子(operator),基于某個輸入動作,該算子能夠有選擇地更新用戶會話,然后輸出更新后的用戶模型作為下游操作的輸入。開發(fā)人員在定義mapWithState方法時可以指定該更新函數(shù)。

首先我們定義狀態(tài)數(shù)據(jù)結(jié)構(gòu)及狀態(tài)更新函數(shù):

def stateUpdateFunction(
userId: UserId,
newData: UserAction, 
stateData: State[UserSession]): UserModel = {

    val currentSession = stateData.get()// 獲取當(dāng)前會話數(shù)據(jù)
    val updatedSession = ...  // 使用newData計算更新后的會話
    stateData.update(updatedSession) // 更新會話數(shù)據(jù)     

    val userModel = ...  // 使用updatedSession計算模型
    return userModel   // 將模型發(fā)送給下游操作
}
// 用去動作構(gòu)成的Stream,用戶ID作為key
val userActions = ...  // key-value元組(UserId, UserAction)構(gòu)成的stream
// 待提交的數(shù)據(jù)流
val userModels = userActions.mapWithState(StateSpec.function(stateUpdateFunction))
//--------------------------------------------------------------------------------------

//java的例子
Function3, State, Void> mappingFunction =
new Function3, State, Void>() {
    @Override
    public Void call(String key,Optional value, State state) {
        // Use state.exists(), state.get(), state.update() and state.remove()
        // to manage state, and return the necessary string
        LiveInfo info=value.orNull();
        if(info!=null){
            state.update(info.getChannel()+":::"+info.getTime());
        }
        return null;
    }
};
//處理計數(shù)
samples
//先將ip作為key
.mapPartitionsToPair((v)->{
    List> list=new ArrayList<>();
    while(v.hasNext()){
        Tuple2 tmpv = v.next();
        String channelName=tmpv._1();
        String ip=tmpv._2().getIp();
        list.add(new Tuple2(ip,tmpv._2()));
    }
    return list.iterator();
})
//更新狀態(tài)
.mapWithState(
    StateSpec.function(mappingFunction)
        //4小時沒有更新則剔除
        .timeout(Durations.minutes(4*60))
)
//獲得狀態(tài)快照流
.stateSnapshots()
//后續(xù)操作

mapWithState的新特性和性能改進(jìn)

原生支持會話超時
許多基于會話的應(yīng)用程序要求具備超時機(jī)制,當(dāng)某個會話在一定的時間內(nèi)(如用戶沒有顯式地注銷而結(jié)束會話)沒有接收到新數(shù)據(jù)時就應(yīng)該將其關(guān)閉,與使用updateStateByKey方法時需要手動進(jìn)行編碼實現(xiàn)所不同的是,開發(fā)人員可以通過mapWithState方法直接指定其超時時間。

userActions.mapWithState(StateSpec.function(stateUpdateFunction).timeout(Minutes(10)))

除超時機(jī)制外,開發(fā)人員也可以設(shè)置程序啟動時的分區(qū)模式和初始狀態(tài)信息。

任意數(shù)據(jù)都能夠發(fā)送到下游
與updateStateByKey方法不同,任意數(shù)據(jù)都可以通過狀態(tài)更新函數(shù)將數(shù)據(jù)發(fā)送到下游操作,這一點(diǎn)已經(jīng)在前面的例子中有說明(例如通過用戶會話狀態(tài)返回用戶模型),此外,最新狀態(tài)的快照也能夠被訪問。

val userSessionSnapshots = userActions.mapWithState(statSpec).snapshotStream()

變量userSessionSnapshots 為一個DStream,其中各個RDD為各批(batch)數(shù)據(jù)處理后狀態(tài)更新會話的快照,該DStream與updateStateByKey方法返回的DStream是等同的。

更高的性能
最后,與updateStateByKey方法相比,使用mapWithState方法能夠得到6倍的低延遲同時維護(hù)的key狀態(tài)數(shù)量要多10倍。

此部分參考:

http://blog.csdn.net/lively19...

http://blog.csdn.net/zengxiao...

https://databricks.gitbooks.i...

5、如何理解時間窗口?時間窗口中數(shù)據(jù)是否會存在重復(fù)?


上圖里 batch duration = 1, window length = 3, sliding interval = 2
任何情況下 Job Submit 是以 batch duration 為準(zhǔn), 對于 window 操作,每隔 sliding interval 才去實際生成 RDD(每隔batch都會生成一個RDD,只是到windowDStream的時候做了合并,生成UnionRDD或者PartitionerAwareUnionRDD,最后輸出一個RDD),每次計算的結(jié)果包括 window length 個 batch 的數(shù)據(jù)。

是否會存在重復(fù)?看下面兩張圖:

答案是:取決于你怎么設(shè)置窗口的兩個參數(shù)

(窗口長度)window length – 窗口覆蓋的時間長度

(滑動距離)sliding interval – 窗口啟動的時間間隔

更深入請參考:

https://github.com/lw-lin/Coo...

http://concord.io/posts/windo...

http://www.cnblogs.com/haozhe...

6、WAL(Write Ahead Log,預(yù)寫日志)與容錯機(jī)制

WAL是在 1.2 版本中就添加的特性。作用就是,將數(shù)據(jù)通過日志的方式寫到可靠的存儲,比如 HDFS、s3,在 driver 或 worker failure 時可以從在可靠存儲上的日志文件恢復(fù)數(shù)據(jù)。WAL 在 driver 端和 executor 端都有應(yīng)用。
WAL使用在文件系統(tǒng)和數(shù)據(jù)庫用于數(shù)據(jù)操作的持久性,先把數(shù)據(jù)寫到一個持久化的日志中,然后對數(shù)據(jù)做操作,如果操作過程中系統(tǒng)掛了,恢復(fù)的時候可以重新讀取日志文件再次進(jìn)行操作。
對于像kafka和flume這些使用接收器來接收數(shù)據(jù)的數(shù)據(jù)源。接收器作為一個長時間的任務(wù)運(yùn)行在executor中,負(fù)責(zé)從數(shù)據(jù)源接收數(shù)據(jù),如果數(shù)據(jù)源支持的話,向數(shù)據(jù)源確認(rèn)接收到數(shù)據(jù),然后把數(shù)據(jù)存儲在executor的內(nèi)存中,然后driver在exector上運(yùn)行任務(wù)處理這些數(shù)據(jù)。如果wal啟用了,所有接收到的數(shù)據(jù)會保存到一個日志文件中去(HDFS), 這樣保存接收數(shù)據(jù)的持久性,此外,只有在數(shù)據(jù)寫入到log中之后接收器才向數(shù)據(jù)源確認(rèn),這樣drive重啟后那些保存在內(nèi)存中但是沒有寫入到log中的數(shù)據(jù)將會重新發(fā)送,這兩點(diǎn)保證的數(shù)據(jù)的無丟失。

啟用WAL:

給streamingContext設(shè)置checkpoint的目錄,該目錄必須是HADOOP支持的文件系統(tǒng),用來保存WAL和做Streaming的checkpoint

spark.streaming.receiver.writeAheadLog.enable 設(shè)置為true

正常流程圖:

解析

1:藍(lán)色的箭頭表示接收的數(shù)據(jù),接收器把數(shù)據(jù)流打包成塊,存儲在executor的內(nèi)存中,如果開啟了WAL,將會把數(shù)據(jù)寫入到存在容錯文件系統(tǒng)的日志文件中

2:青色的箭頭表示提醒driver, 接收到的數(shù)據(jù)塊的元信息發(fā)送給driver中的StreamingContext, 這些元數(shù)據(jù)包括:executor內(nèi)存中數(shù)據(jù)塊的引用ID和日志文件中數(shù)據(jù)塊的偏移信息

3:紅色箭頭表示處理數(shù)據(jù),每一個批處理間隔,StreamingContext使用塊信息用來生成RDD和jobs. SparkContext執(zhí)行這些job用于處理executor內(nèi)存中的數(shù)據(jù)塊

4:黃色箭頭表示checkpoint這些計算,以便于恢復(fù)。流式處理會周期的被checkpoint到文件中

當(dāng)一個失敗的driver重啟以后,恢復(fù)流程如下:

1:黃色的箭頭用于恢復(fù)計算,checkpointed的信息是用于重啟driver,重新構(gòu)造上下文和重啟所有的receiver

2: 青色箭頭恢復(fù)塊元數(shù)據(jù)信息,所有的塊信息對已恢復(fù)計算很重要

3:重新生成未完成的job(紅色箭頭),會使用到2恢復(fù)的元數(shù)據(jù)信息

4:讀取保存在日志中的塊(藍(lán)色箭頭),當(dāng)job重新執(zhí)行的時候,塊數(shù)據(jù)將會直接從日志中讀取,

5:重發(fā)沒有確認(rèn)的數(shù)據(jù)(紫色的箭頭)。緩沖的數(shù)據(jù)沒有寫到WAL中去將會被重新發(fā)送。

1、WAL在 driver 端的應(yīng)用
用于寫日志的對象 writeAheadLogOption: WriteAheadLog在 StreamingContext 中的 JobScheduler 中的 ReceiverTracker 的 ReceivedBlockTracker 構(gòu)造函數(shù)中被創(chuàng)建,ReceivedBlockTracker 用于管理已接收到的 blocks 信息。需要注意的是,這里只需要啟用 checkpoint 就可以創(chuàng)建該 driver 端的 WAL 管理實例,將 spark.streaming.receiver.writeAheadLog.enable 設(shè)置為 true。
首選需要明確的是,ReceivedBlockTracker 通過 WAL 寫入 log 文件的內(nèi)容是3種事件(當(dāng)然,會進(jìn)行序列化):

BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo);即新增了一個 block 及該 block 的具體信息,包括 streamId、blockId、數(shù)據(jù)條數(shù)等

BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks);即為某個 batchTime 分配了哪些 blocks 作為該 batch RDD 的數(shù)據(jù)源

BatchCleanupEvent(times: Seq[Time]);即清理了哪些 batchTime 對應(yīng)的 blocks

2、WAL 在 executor 端的應(yīng)用
Receiver 接收到的數(shù)據(jù)會源源不斷的傳遞給 ReceiverSupervisor,是否啟用 WAL 機(jī)制(即是否將 spark.streaming.receiver.writeAheadLog.enable 設(shè)置為 true)會影響 ReceiverSupervisor 在存儲 block 時的行為:

不啟用 WAL:你設(shè)置的StorageLevel是什么,就怎么存儲。比如MEMORY_ONLY只會在內(nèi)存中存一份,MEMORY_AND_DISK會在內(nèi)存和磁盤上各存一份等

啟用 WAL:在StorageLevel指定的存儲的基礎(chǔ)上,寫一份到 WAL 中。存儲一份在 WAL 上,更不容易丟數(shù)據(jù)但性能損失也比較大

3、WAL 使用建議
關(guān)于是否要啟用 WAL,要視具體的業(yè)務(wù)而定:

若可以接受一定的數(shù)據(jù)丟失,則不需要啟用 WAL,WAL開啟了以后會減少Spark Streaming處理數(shù)據(jù)的吞吐,因為所有接收的數(shù)據(jù)會被寫到到容錯的文件系統(tǒng)上,這樣文件系統(tǒng)的吞吐和網(wǎng)絡(luò)帶寬將成為瓶頸。

若完全不能接受數(shù)據(jù)丟失,那就需要同時啟用 checkpoint 和 WAL,checkpoint 保存著執(zhí)行進(jìn)度(比如已生成但未完成的 jobs),WAL 中保存著 blocks 及 blocks 元數(shù)據(jù)(比如保存著未完成的 jobs 對應(yīng)的 blocks 信息及 block 文件)。同時,這種情況可能要在數(shù)據(jù)源和 Streaming Application 中聯(lián)合來保證 exactly once 語義

此處參考:
http://www.jianshu.com/p/5e09...
http://www.cnblogs.com/gaoxin...

7、HDFS操作的一系列問題

1、ls操作
列出子目錄和文件(不包括嵌套層級):listStatus(path,filter)
列出所有(包括嵌套層級):listFiles(path,true)

Path file = new Path(HDFSConst.live_path);
FileStatus[] statuslist = hdfs.listStatus(file,
        (v) -> {
            return v.getName().contains(prefix)
                    && !v.getName().endsWith(HDFSConst.processSubffix);
        });
List paths = new ArrayList();
for (FileStatus status : statuslist) {
    Path tmp = new Path(status.getPath().toString());
    RemoteIterator statusIter = hdfs.listFiles(tmp,
            true);
    boolean shouldAdd = true;
    while (statusIter.hasNext()) {
        LocatedFileStatus status2 = statusIter.next();
        if (status2.getPath().toString().contains("/_temporary/")) {
            shouldAdd = false;
            break;
        }
    }
    if (shouldAdd) {
        paths.add(tmp);
    }
}

2、hadoop No FileSystem for scheme:
問題來源:
This is a typical case of the maven-assembly plugin breaking things.
Different JARs (hadoop-commons for LocalFileSystem, hadoop-hdfs for DistributedFileSystem) each contain a different file called org.apache.hadoop.fs.FileSystem in their META-INFO/services directory. This file lists the canonical classnames of the filesystem implementations they want to declare (This is called a Service Provider Interface implemented via java.util.ServiceLoader, see org.apache.hadoop.FileSystem line 2622).
When we use maven-assembly-plugin, it merges all our JARs into one, and all META-INFO/services/org.apache.hadoop.fs.FileSystem overwrite each-other. Only one of these files remains (the last one that was added). In this case, the FileSystem list from hadoop-commons overwrites the list from hadoop-hdfs, so DistributedFileSystem was no longer declared.

How we fixed it
After loading the Hadoop configuration, but just before doing anything FileSystem-related

hadoopConfig.set("fs.hdfs.impl", 
        org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
    );
    hadoopConfig.set("fs.file.impl",
        org.apache.hadoop.fs.LocalFileSystem.class.getName()
    );

不要使用maven-assembly-plugin,使用maven shade插件:


    org.apache.maven.plugins
    maven-shade-plugin
    2.3
    
      
        package
        
          shade
        
        
          
            
          
        
      
    
  

3、Wrong FS: hdfs… expected: file:///

Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://my-master:8020"), configuration);
Path filePath = new Path();
FSDataInputStream fsDataInputStream = fs.open(filePath);
BufferedReader br = new BufferedReader(new InputStreamReader(fsDataInputStream));

此處參考:
https://stackoverflow.com/que...
https://stackoverflow.com/que...
https://stackoverflow.com/que...

補(bǔ)充:關(guān)于DAG的一些概念和零散資料

RDD DAG(有向無環(huán)圖,Directed Acyclic Graph):每一個操作生成一個rdd,rdd之間連一條邊,最后這些rdd和他們之間的邊組成一個有向無環(huán)圖,就是這個dag。不只是spark,現(xiàn)在很多計算引擎都是dag模型的.(有向指的是 RDD 之間的依賴關(guān)系,無環(huán)是因為 RDD 中數(shù)據(jù)是不可變的)
在Spark作業(yè)調(diào)度系統(tǒng)中,調(diào)度的前提是判斷多個作業(yè)任務(wù)的依賴關(guān)系,這些作業(yè)任務(wù)之間可能存在因果的依賴關(guān)系,也就是說有些任務(wù)必須先獲得執(zhí)行,然后相關(guān)的依賴任務(wù)才能執(zhí)行,但是任務(wù)之間顯然不應(yīng)出現(xiàn)任何直接或間接的循環(huán)依賴關(guān)系,所以本質(zhì)上這種關(guān)系適合用DAG表示。
DAGscheduler簡單來說就是負(fù)責(zé)任務(wù)的邏輯調(diào)度,負(fù)責(zé)將作業(yè)拆分成不同階段的具有依賴關(guān)系的多批任務(wù)。DAGscheduler最重要的任務(wù)之一就是計算作業(yè)和任務(wù)的依賴關(guān)系,制定調(diào)度邏輯。

spark中rdd經(jīng)過若干次transform操作,由于transform操作是lazy的,因此,當(dāng)rdd進(jìn)行action操作時,rdd間的轉(zhuǎn)換關(guān)系也會被提交上去,得到rdd內(nèi)部的依賴關(guān)系,進(jìn)而根據(jù)依賴,劃分出不同的stage。
DAG是有向無環(huán)圖,一般用來描述任務(wù)之間的先后關(guān)系,spark中的DAG就是rdd內(nèi)部的轉(zhuǎn)換關(guān)系,這些轉(zhuǎn)換關(guān)系會被轉(zhuǎn)換成依賴關(guān)系,進(jìn)而被劃分成不同階段,從而描繪出任務(wù)的先后順序。

有向無環(huán)圖(Directed Acyclic Graph, DAG)是有向圖的一種,字面意思的理解就是圖中沒有環(huán)。常常被用來表示事件之間的驅(qū)動依賴關(guān)系,管理任務(wù)之間的調(diào)度。

在圖論中,如果一個有向圖無法從任意頂點(diǎn)出發(fā)經(jīng)過若干條邊回到該點(diǎn),則這個圖是一個有向無環(huán)圖(DAG圖)。
因為有向圖中一個點(diǎn)經(jīng)過兩種路線到達(dá)另一個點(diǎn)未必形成環(huán),因此有向無環(huán)圖未必能轉(zhuǎn)化成樹,但任何有向樹均為有向無環(huán)圖。

拓?fù)渑判蚴菍AG的頂點(diǎn)進(jìn)行排序,使得對每一條有向邊(u, v),均有u(在排序記錄中)比v先出現(xiàn)。亦可理解為對某點(diǎn)v而言,只有當(dāng)v的所有源點(diǎn)均出現(xiàn)了,v才能出現(xiàn)。

下圖給出的頂點(diǎn)排序不是拓?fù)渑判?,因為頂點(diǎn)D的鄰接點(diǎn)E比其先出現(xiàn):

DAG可用于對數(shù)學(xué)和 計算機(jī)科學(xué)中得一些不同種類的結(jié)構(gòu)進(jìn)行建模。
由于受制于某些任務(wù)必須比另一些任務(wù)較早執(zhí)行的限制,必須排序為一個隊 列的任務(wù)集合可以由一個DAG圖來呈現(xiàn),其中每個頂點(diǎn)表示一個任務(wù),每條邊表示一種限制約束,拓?fù)渑判蛩惴梢杂脕砩梢粋€有效的序列。
DAG也可以用來模擬信息沿著一個一 致性的方向通過處理器網(wǎng)絡(luò)的過程。
DAG中得可達(dá)性關(guān)系構(gòu)成了一個局 部順序,任何有限的局部順序可以由DAG使用可達(dá)性來呈現(xiàn)。
http://www.cnblogs.com/en-hen...

Spark Streaming 的 模塊 1 DAG 靜態(tài)定義 要解決的問題就是如何把計算邏輯描述為一個 RDD DAG 的“模板”,在后面 Job 動態(tài)生成的時候,針對每個 batch,都將根據(jù)這個“模板”生成一個 RDD DAG 的實例。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67399.html

相關(guān)文章

  • Spark Streaming 到 Apache Flink : 實時數(shù)據(jù)流在愛奇藝的演進(jìn)

    摘要:在移動端,愛奇藝月度總有效時長億小時,穩(wěn)居中國榜第三名。愛奇藝的峰值事件數(shù)達(dá)到萬秒,在正確性容錯性能延遲吞吐量擴(kuò)展性等方面均遇到不小的挑戰(zhàn)。從到愛奇藝主要使用的是和來進(jìn)行流式計算。作者:陳越晨 整理:劉河 本文將為大家介紹Apache Flink在愛奇藝的生產(chǎn)與實踐過程。你可以借此了解到愛奇藝引入Apache Flink的背景與挑戰(zhàn),以及平臺構(gòu)建化流程。主要內(nèi)容如下: 愛奇藝在實時計算方...

    econi 評論0 收藏0
  • Spark 快速入門

    摘要:數(shù)據(jù)科學(xué)任務(wù)主要是數(shù)據(jù)分析領(lǐng)域,數(shù)據(jù)科學(xué)家要負(fù)責(zé)分析數(shù)據(jù)并建模,具備統(tǒng)計預(yù)測建模機(jī)器學(xué)習(xí)等方面的經(jīng)驗,以及一定的使用或語言進(jìn)行編程的能力。監(jiān)控運(yùn)行時性能指標(biāo)信息。 Spark Spark 背景 什么是 Spark 官網(wǎng):http://spark.apache.org Spark是一種快速、通用、可擴(kuò)展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學(xué)伯克利分校AMPLab,2010年開源,20...

    wangshijun 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<