成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Flink實(shí)現(xiàn)批處理離線計(jì)算

IT那活兒 / 3331人閱讀
Flink實(shí)現(xiàn)批處理離線計(jì)算





  引  言  



筆者的項(xiàng)目,一直是用flink進(jìn)行流處理,為什么這次寫flink批處理離線計(jì)算呢,因?yàn)榭蛻籼岬男滦枨蟾m合用批處理離線計(jì)算:
  1. 客戶不需要實(shí)時(shí)計(jì)算的結(jié)果,認(rèn)為實(shí)時(shí)結(jié)果對(duì)他們沒有參考意義

  2. 客戶要求計(jì)算一個(gè)月(或更長時(shí)間)內(nèi)歷史數(shù)據(jù)的基準(zhǔn)值,認(rèn)為一個(gè)月(或更長時(shí)間)歷史數(shù)據(jù)得到的基準(zhǔn)值才能更加準(zhǔn)確的評(píng)估和預(yù)測未來的趨勢


一 . 批處理介紹

大數(shù)據(jù)計(jì)算分為離線計(jì)算和實(shí)時(shí)計(jì)算,離線計(jì)算就是我們通常說的批計(jì)算,代表是Hadoop MapReduce、Hive等大數(shù)據(jù)技術(shù),實(shí)時(shí)計(jì)算也被稱作流計(jì)算,代表是Storm、Spark Streaming、Flink等大數(shù)據(jù)技術(shù)。

批處理在大數(shù)據(jù)世界有著悠久的歷史。批處理主要操作大容量靜態(tài)數(shù)據(jù)集,并在計(jì)算過程完成后返回結(jié)果。

批處理模式中使用的數(shù)據(jù)集通常符合下列特征:
有界:批處理數(shù)據(jù)集代表數(shù)據(jù)的有限集合
持久:數(shù)據(jù)通常始終存儲(chǔ)在某種類型的持久存儲(chǔ)位置中
大量:海量數(shù)據(jù)集

批處理非常適合需要訪問全套記錄才能完成的計(jì)算工作。例如在計(jì)算總數(shù)和平均數(shù)時(shí),必須將數(shù)據(jù)集作為一個(gè)整體加以處理,而不能將其視作多條記錄的集合。這些操作要求在計(jì)算進(jìn)行過程中數(shù)據(jù)維持自己的狀態(tài)。

需要處理大量數(shù)據(jù)的任務(wù)通常最適合用批處理操作進(jìn)行處理。無論直接從持久存儲(chǔ)設(shè)備處理數(shù)據(jù)集,或首先將數(shù)據(jù)集載入內(nèi)存,批處理系統(tǒng)在設(shè)計(jì)過程中就充分考慮了數(shù)據(jù)的量,可提供充足的處理資源。由于批處理在應(yīng)對(duì)大量持久數(shù)據(jù)方面的表現(xiàn)極為出色,因此經(jīng)常被用于對(duì)歷史數(shù)據(jù)進(jìn)行分析。


二. 批處理vs流處理

相信各位已經(jīng)看過諸多流計(jì)算優(yōu)點(diǎn)的文章,流計(jì)算的優(yōu)點(diǎn)筆者就略過, 下面說一下批處理相比流處理的優(yōu)點(diǎn)。


1. 批處理的吞吐量大、資源利用率高

由于批量和流式處理數(shù)據(jù)粒度不一樣,批量每次處理一定大小的數(shù)據(jù)塊(輸入一般采用文件系統(tǒng)),一個(gè)任務(wù)處理完一個(gè)數(shù)據(jù)塊之后,才將處理好的中間數(shù)據(jù)發(fā)送給下游。流式計(jì)算則是以單條記錄為單位,任務(wù)在處理完一條記錄之后,然后發(fā)送給下游進(jìn)行處理。流式計(jì)算來一條記錄就計(jì)算一次,計(jì)算量巨大,當(dāng)不需要中間值的時(shí)候,這種計(jì)算屬實(shí)浪費(fèi),因此批處理的吞吐量更大、資源利用率更高、系統(tǒng)的開銷更小。


2. 批處理容易實(shí)現(xiàn)精準(zhǔn)計(jì)算

流處理數(shù)據(jù)丟失和重復(fù)處理

精確一次(exactly once)是指數(shù)據(jù)處理沒有數(shù)據(jù)丟失和重復(fù)處理的現(xiàn)象。

流處理的數(shù)據(jù)來源一般是消息隊(duì)列,是無界的,數(shù)據(jù)是一條一條獲取,在加載數(shù)據(jù)時(shí)可能會(huì)出現(xiàn)網(wǎng)絡(luò)連接等問題,所以流處理需要解決數(shù)據(jù)丟失和重復(fù)處理的問題,實(shí)現(xiàn)精確一次(exactly once)的語義相對(duì)復(fù)雜,目前storm流框架目前不支持(exactly once),spark為了支持(exactly once)引入預(yù)寫日志(AWL)并且offset由Spark自身管理 ,flink為了支持(exactly once)引入快照(snapshot)機(jī)制, 雖然流處理能夠解決數(shù)據(jù)丟失和重復(fù)計(jì)算問題,但需要引入各種機(jī)制,而這增加了系統(tǒng)消耗的資源。

批處理的數(shù)據(jù)源是靜態(tài)塊,比如文件,hdfs文件,批處理一次性加載一批數(shù)據(jù),基本不會(huì)出現(xiàn)數(shù)據(jù)丟失和重復(fù)計(jì)算的情況。

流處理水印(watermark)忽略數(shù)據(jù)

如果說流處理引入各種機(jī)制增加資源消耗可以解決數(shù)據(jù)丟失和重復(fù)處理問題,那么對(duì)于亂序數(shù)據(jù)流則存在忽略數(shù)據(jù)的可能。

流處理數(shù)據(jù)沒有邊界,需要窗口(window)的概念,根據(jù)窗口來匯總計(jì)算。窗口(window)類型有很多種, 滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)和會(huì)話窗口(Session window)等,窗口(window)中需要定義時(shí)間,流處理中存在事件時(shí)間(event time)和處理時(shí)間(process time)。對(duì)于亂序的數(shù)據(jù),為此又引入了水印(watermark)機(jī)制。具體概念讀者自行查閱。

水印(watermark)有一個(gè)允許延時(shí)(allow lateness)的參數(shù), 窗口(window)接收到水印(watermark)后,再等待一段時(shí)間才會(huì)關(guān)閉窗口,如果這段時(shí)間有些數(shù)據(jù)依然沒有發(fā)送過來,那就只能忽略它們了。允許延時(shí)(allow lateness)參數(shù)設(shè)置的大,系統(tǒng)占用的資源就多,而且允許延時(shí)(allow lateness)的參數(shù)不能設(shè)置無限大,因此如果數(shù)據(jù)源異常亂序,流處理的窗口就等不到延時(shí)數(shù)據(jù)過來就進(jìn)行匯總計(jì)算,導(dǎo)致延時(shí)數(shù)據(jù)未處理。

批處理數(shù)據(jù)有界,所有的數(shù)據(jù)全部都會(huì)加載,不用考慮數(shù)據(jù)源的順序,不會(huì)出現(xiàn)忽略數(shù)據(jù)的情況,也不需要窗口(window) ,時(shí)間,水印等機(jī)制。


三. Flink實(shí)現(xiàn)批處理離線計(jì)算

通過上面簡單的介紹和對(duì)比,發(fā)現(xiàn)客戶的需求更適合用批處理離線計(jì)算,由于Flink是一個(gè)流處理框架, 可以處理有邊界和無邊界的數(shù)據(jù)流。無邊界的數(shù)據(jù)就是流數(shù)據(jù),有邊界的數(shù)據(jù)就是批數(shù)據(jù),因此Flink也是支持批處理的。所以筆者采用Flink進(jìn)行批處理計(jì)算。

以下是核心代碼:

flink執(zhí)行環(huán)境:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

從kafka中獲取DataSet數(shù)據(jù)源,DataSet表示批處理的數(shù)據(jù) (流處理是DataStream):

DataSetString, String>> recordsDataSetDataSet = env.createInput(KafkaInputFormat
        .buildKafkaInputFormat().setBootstrapServers(KafkaServers)
        .setGroupId(xx).setTopic(sourceTopic).finish());

繼承GenericInputFormat類實(shí)現(xiàn)自定義獲取kafka數(shù)據(jù)源 KafkaInputFormat:

public class KafkaInputFormat extends GenericInputFormatString, String>> {
  @Override
  public void open(GenericInputSplit split) throws IOException {
        consumer = new KafkaConsumer<String,String>(props);
        initPartionMap();
  }
  //獲取kafka topic每個(gè)分區(qū)的偏移量,用做kafka消費(fèi)結(jié)束的標(biāo)識(shí)
  void initPartionMap(){
    Collection partitionInfos = consumer.partitionsFor(topic);
        List tp =new ArrayList();
        partitionInfos.forEach(partitionInfo -> {
            tp.add(new TopicPartition(topic,partitionInfo.partition()));
            consumer.assign(tp);
            consumer.seekToEnd(tp);
            partionOffsetMap.put(partitionInfo.partition(),consumer.position(new TopicPartition(topic, partitionInfo.partition())));
            partionBooleanMap.put(partitionInfo.partition(), false);
            //獲取參數(shù)值后返回最初
            consumer.seekToBeginning(tp);
        });
  }
  //消費(fèi)kafka是否結(jié)束
  @Override
  public boolean reachedEnd() throws IOException {
    return !partionBooleanMap.containsValue(false);
  }
  @Override
  public ConsumerRecords<String, String> nextRecord(ConsumerRecords<String, String> reuse) {
     //從kafka中獲取一批數(shù)據(jù)
     final ConsumerRecords<String, String> records consumer.poll(Duration.ofMillis(pollTime));
        for (ConsumerRecord<String, String> record : records) {
          Integer partion=record.partition();
          Long offset= record.offset();
          //表示已有分區(qū)已經(jīng)消費(fèi)完
          if(offset+1==partionOffsetMap.get(partion)) {
            partionBooleanMap.put(partion, true);
          }
        }
     return records;
  }


四. 總結(jié)

以前的流處理計(jì)算過程經(jīng)過批處理改造后,計(jì)算時(shí)間大大縮短,也不需要設(shè)置窗口(window)、等待時(shí)間(allow lateness)和水印(watermark),而且計(jì)算完成程序自動(dòng)退出,不再占用系統(tǒng)資源。

流處理和批處理都有各自的優(yōu)缺點(diǎn)和應(yīng)用場景,應(yīng)該根據(jù)項(xiàng)目需求選擇合適的。


END


更多精彩干貨分享

點(diǎn)擊下方名片關(guān)注

IT那活兒

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/129897.html

相關(guān)文章

  • 你公司到底需不需要引入實(shí)時(shí)計(jì)算引擎?

    摘要:再如通過處理流數(shù)據(jù)生成簡單的報(bào)告,如五分鐘的窗口聚合數(shù)據(jù)平均值。復(fù)雜的事情還有在流數(shù)據(jù)中進(jìn)行數(shù)據(jù)多維度關(guān)聯(lián)聚合塞選,從而找到復(fù)雜事件中的根因。因?yàn)楦鞣N需求,也就造就了現(xiàn)在不斷出現(xiàn)實(shí)時(shí)計(jì)算框架,而下文我們將重磅介紹我們推薦的實(shí)時(shí)計(jì)算框架。 前言 先廣而告之,本文摘自本人《大數(shù)據(jù)重磅炸彈——實(shí)時(shí)計(jì)算框架 Flink》課程第二篇,內(nèi)容首發(fā)自我的知識(shí)星球,后面持續(xù)在星球里更新,這里做個(gè)預(yù)告,今...

    HackerShell 評(píng)論0 收藏0
  • OPPO數(shù)據(jù)中臺(tái)之基石:基于Flink SQL構(gòu)建實(shí)數(shù)據(jù)倉庫

    摘要:實(shí)際上,本身就預(yù)留了與外部元數(shù)據(jù)對(duì)接的能力,分別提供了和這兩個(gè)抽象。對(duì)接外部數(shù)據(jù)源搞清楚了注冊(cè)庫表的過程,給我們帶來這樣一個(gè)思路如果外部元數(shù)據(jù)創(chuàng)建的表也能被轉(zhuǎn)換成可識(shí)別的,那么就能被無縫地注冊(cè)到。 本文整理自 2019 年 4 月 13 日在深圳舉行的 Flink Meetup 會(huì)議,分享嘉賓張俊,目前擔(dān)任 OPPO 大數(shù)據(jù)平臺(tái)研發(fā)負(fù)責(zé)人,也是 Apache Flink contrib...

    jeffrey_up 評(píng)論0 收藏0
  • Apache Flink,流計(jì)算?不僅僅是流計(jì)算

    摘要:基于流處理機(jī)制實(shí)現(xiàn)批流融合相對(duì)基于批處理機(jī)制實(shí)現(xiàn)批流融合的思想更自然,更合理,也更有優(yōu)勢,因此阿里巴巴在基于支持大量核心實(shí)時(shí)計(jì)算場景的同時(shí),也在不斷改進(jìn)的架構(gòu),使其朝著真正批流融合的統(tǒng)一計(jì)算引擎方向前進(jìn)。 阿里妹導(dǎo)讀:2018年12月下旬,由阿里巴巴集團(tuán)主辦的Flink Forward China在北京國家會(huì)議中心舉行。Flink Forward是由Apache軟件基金會(huì)授權(quán)的全球范圍...

    KoreyLee 評(píng)論0 收藏0
  • Flink 從0到1學(xué)習(xí)—— 分享四本 Flink 國外的書和二十多篇 Paper 論文

    摘要:另外,將機(jī)制發(fā)揚(yáng)光大,對(duì)有著非常好的支持。系統(tǒng)也注意到并討論了和的問題??偨Y(jié)本文分享了四本相關(guān)的書籍和一份領(lǐng)域相關(guān)的論文列表篇,涉及的設(shè)計(jì),實(shí)現(xiàn),故障恢復(fù),彈性擴(kuò)展等各方面。 前言 之前也分享了不少自己的文章,但是對(duì)于 Flink 來說,還是有不少新入門的朋友,這里給大家分享點(diǎn) Flink 相關(guān)的資料(國外數(shù)據(jù) pdf 和流處理相關(guān)的 Paper),期望可以幫你更好的理解 Flink。...

    jollywing 評(píng)論0 收藏0
  • Jstorm到Flink 在今日頭條的遷移實(shí)踐

    摘要:第二個(gè)問題就是說業(yè)務(wù)團(tuán)隊(duì)之間沒有擴(kuò)大管理,預(yù)算和審核是無頭緒的。支持一些高優(yōu)先級(jí)的比如說支持以及窗口等特性包括說。到現(xiàn)在為止,整體遷移完了,還剩下十個(gè)左右的作業(yè)沒有遷移完。 作者:張光輝 本文將為大家展示字節(jié)跳動(dòng)公司怎么把Storm從Jstorm遷移到Flink的整個(gè)過程以及后續(xù)的計(jì)劃。你可以借此了解字節(jié)跳動(dòng)公司引入Flink的背景以及Flink集群的構(gòu)建過程。字節(jié)跳動(dòng)公司是如何兼容以...

    luckyyulin 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<