成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

日志服務(wù)Flink Connector《支持Exactly Once》

endiat / 1117人閱讀

摘要:是阿里云日志服務(wù)提供的,用于對接的工具,包括兩部分,消費者和生產(chǎn)者。子用戶使用需要授權(quán)如下幾個用于將數(shù)據(jù)寫到阿里云日志服務(wù)中。

摘要: Flink log connector是阿里云日志服務(wù)推出的,用于對接Flink的工具,包含兩塊,分別是消費者和生產(chǎn)者,消費者用于從日志服務(wù)中讀數(shù)據(jù),支持exactly once語義,生產(chǎn)者用于將數(shù)據(jù)寫到日志服務(wù)中,該Connector隱藏了日志服務(wù)的一些概念,比如Shard的分裂合并等,用戶在使用時只需要專注在自己的業(yè)務(wù)邏輯即可。

阿里云日志服務(wù)是針對實時數(shù)據(jù)一站式服務(wù),用戶只需要將精力集中在分析上,過程中數(shù)據(jù)采集、對接各種存儲計算、數(shù)據(jù)索引和查詢等瑣碎工作等都可以交給日志服務(wù)完成。

日志服務(wù)中最基礎(chǔ)的功能是LogHub,支持數(shù)據(jù)實時采集與消費,實時消費家族除 Spark Streaming、Storm、StreamCompute(Blink外),目前新增Flink啦。

Flink Connector
Flink log connector是阿里云日志服務(wù)提供的,用于對接flink的工具,包括兩部分,消費者(Consumer)和生產(chǎn)者(Producer)。

消費者用于從日志服務(wù)中讀取數(shù)據(jù),支持exactly once語義,支持shard負載均衡.
生產(chǎn)者用于將數(shù)據(jù)寫入日志服務(wù),使用connector時,需要在項目中添加maven依賴:


            org.apache.flink
            flink-streaming-java_2.11
            1.3.2


            com.aliyun.openservices
            flink-log-connector
            0.1.3


            com.google.protobuf
            protobuf-java
            2.5.0

 
            com.aliyun.openservices
            aliyun-log
            0.6.10
 

            com.aliyun.openservices
            log-loghub-producer
            0.1.8

代碼:Github

用法
請參考日志服務(wù)文檔,正確創(chuàng)建Logstore。
如果使用子賬號訪問,請確認正確設(shè)置了LogStore的RAM策略。參考授權(quán)RAM子用戶訪問日志服務(wù)資源。
1. Log Consumer
在Connector中, 類FlinkLogConsumer提供了訂閱日志服務(wù)中某一個LogStore的能力,實現(xiàn)了exactly once語義,在使用時,用戶無需關(guān)心LogStore中shard數(shù)
量的變化,consumer會自動感知。

flink中每一個子任務(wù)負責(zé)消費LogStore中部分shard,如果LogStore中shard發(fā)生split或者merge,子任務(wù)消費的shard也會隨之改變。

1.1 配置啟動參數(shù)

Properties configProps = new Properties();
// 設(shè)置訪問日志服務(wù)的域名
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// 設(shè)置訪問ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// 設(shè)置日志服務(wù)的project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// 設(shè)置日志服務(wù)的LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// 設(shè)置消費日志服務(wù)起始位置
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// 設(shè)置日志服務(wù)的消息反序列化方法
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream logTestStream = env.addSource(
        new FlinkLogConsumer(deserializer, configProps));

上面是一個簡單的消費示例,我們使用java.util.Properties作為配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,flink stream的子任務(wù)數(shù)量和日志服務(wù)LogStore中的shard數(shù)量是獨立的,如果shard數(shù)量多于子任務(wù)數(shù)量,每個子任務(wù)不重復(fù)的消費多個shard,如果少于,

那么部分子任務(wù)就會空閑,等到新的shard產(chǎn)生。

1.2 設(shè)置消費起始位置
Flink log consumer支持設(shè)置shard的消費起始位置,通過設(shè)置屬性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消費從shard的頭尾或者某個特定時間開始消費,具體取值如下:

Consts.LOG_BEGIN_CURSOR: 表示從shard的頭開始消費,也就是從shard中最舊的數(shù)據(jù)開始消費。
Consts.LOG_END_CURSOR: 表示從shard的尾開始,也就是從shard中最新的數(shù)據(jù)開始消費。
UnixTimestamp: 一個整型數(shù)值的字符串,用1970-01-01到現(xiàn)在的秒數(shù)表示, 含義是消費shard中這個時間點之后的數(shù)據(jù)。
三種取值舉例如下:

configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");

1.3 監(jiān)控:消費進度(可選)
Flink log consumer支持設(shè)置消費進度監(jiān)控,所謂消費進度就是獲取每一個shard實時的消費位置,這個位置使用時間戳表示,詳細概念可以參考
文檔消費組-查看狀態(tài),[消費組-監(jiān)控報警
](https://help.aliyun.com/docum...。

configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);

注意上面代碼是可選的,如果設(shè)置了,consumer會首先創(chuàng)建consumerGroup,如果已經(jīng)存在,則什么都不做,consumer中的snapshot會自動同步到日志服務(wù)的consumerGroup中,用戶可以在日志服務(wù)的控制臺查看consumer的消費進度。

1.4 容災(zāi)和exactly once語義支持
當打開Flink的checkpointing功能時,F(xiàn)link log consumer會周期性的將每個shard的消費進度保存起來,當作業(yè)失敗時,flink會恢復(fù)log consumer,并
從保存的最新的checkpoint開始消費。

寫checkpoint的周期定義了當發(fā)生失敗時,最多多少的數(shù)據(jù)會被回溯,也就是重新消費,使用代碼如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟flink exactly once語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 每5s保存一次checkpoint
env.enableCheckpointing(5000);

更多Flink checkpoint的細節(jié)請參考Flink官方文檔Checkpoints。

1.5 補充材料:關(guān)聯(lián) API與權(quán)限設(shè)置
Flink log consumer 會用到的阿里云日志服務(wù)接口如下:

GetCursorOrData

用于從shard中拉數(shù)據(jù), 注意頻繁的調(diào)用該接口可能會導(dǎo)致數(shù)據(jù)超過日志服務(wù)的shard quota, 可以通過ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH
控制接口調(diào)用的時間間隔和每次調(diào)用拉取的日志數(shù)量,shard的quota參考文章[shard簡介](https://help.aliyun.com/document_detail/28976.html).
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");

ListShards

 用于獲取logStore中所有的shard列表,獲取shard狀態(tài)等.如果您的shard經(jīng)常發(fā)生分裂合并,可以通過調(diào)整接口的調(diào)用周期來及時發(fā)現(xiàn)shard的變化。
// 設(shè)置每30s調(diào)用一次ListShards
configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");

CreateConsumerGroup

該接口調(diào)用只有當設(shè)置消費進度監(jiān)控時才會發(fā)生,功能是創(chuàng)建consumerGroup,用于同步checkpoint。

ConsumerGroupUpdateCheckPoint

該接口用戶將flink的snapshot同步到日志服務(wù)的consumerGroup中。

子用戶使用Flink log consumer需要授權(quán)如下幾個RAM Policy:

Log Producer

FlinkLogProducer 用于將數(shù)據(jù)寫到阿里云日志服務(wù)中。

注意producer只支持Flink at-least-once語義,這就意味著在發(fā)生作業(yè)失敗的情況下,寫入日志服務(wù)中的數(shù)據(jù)有可能會重復(fù),但是絕對不會丟失。

用法示例如下,我們將模擬產(chǎn)生的字符串寫入日志服務(wù):

// 將數(shù)據(jù)序列化成日志服務(wù)的數(shù)據(jù)格式
class SimpleLogSerializer implements LogSerializationSchema {

    public RawLogGroup serialize(String element) {
        RawLogGroup rlg = new RawLogGroup();
        RawLog rl = new RawLog();
        rl.setTime((int)(System.currentTimeMillis() / 1000));
        rl.addContent("message", element);
        rlg.addLog(rl);
        return rlg;
    }
}
public class ProducerSample {
    public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
    public static String sAccessKeyId = "";
    public static String sAccessKey = "";
    public static String sProject = "ali-cn-hangzhou-sls-admin";
    public static String sLogstore = "test-flink-producer";
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);


    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(3);

        DataStream simpleStringStream = env.addSource(new EventsGenerator());

        Properties configProps = new Properties();
        // 設(shè)置訪問日志服務(wù)的域名
        configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
        // 設(shè)置訪問日志服務(wù)的ak
        configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
        // 設(shè)置日志寫入的日志服務(wù)project
        configProps.put(ConfigConstants.LOG_PROJECT, sProject);
        // 設(shè)置日志寫入的日志服務(wù)logStore
        configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);

        FlinkLogProducer logProducer = new FlinkLogProducer(new SimpleLogSerializer(), configProps);

        simpleStringStream.addSink(logProducer);

        env.execute("flink log producer");
    }
    // 模擬產(chǎn)生日志
    public static class EventsGenerator implements SourceFunction {
        private boolean running = true;

        @Override
        public void run(SourceContext ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

2.1 初始化
Producer初始化主要需要做兩件事情:

初始化配置參數(shù)Properties, 這一步和Consumer類似, Producer有一些定制的參數(shù),一般情況下使用默認值即可,特殊場景可以考慮定制:

// 用于發(fā)送數(shù)據(jù)的io線程的數(shù)量,默認是8
ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
// 該值定義日志數(shù)據(jù)被緩存發(fā)送的時間,默認是3000
ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
// 緩存發(fā)送的包中日志的數(shù)量,默認是4096
ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
// 緩存發(fā)送的包的大小,默認是3Mb
ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
// 作業(yè)可以使用的內(nèi)存總的大小,默認是100Mb
ConfigConstants.LOG_MEM_POOL_BYTES
上述參數(shù)不是必選參數(shù),用戶可以不設(shè)置,直接使用默認值。

重載LogSerializationSchema,定義將數(shù)據(jù)序列化成RawLogGroup的方法。

RawLogGroup是log的集合,每個字段的含義可以參考文檔[日志數(shù)據(jù)模型](https://help.aliyun.com/document_detail/29054.html)。

如果用戶需要使用日志服務(wù)的shardHashKey功能,指定數(shù)據(jù)寫到某一個shard中,可以使用LogPartitioner產(chǎn)生數(shù)據(jù)的hashKey,用法例子如下:

FlinkLogProducer logProducer = new FlinkLogProducer(new SimpleLogSerializer(), configProps);
logProducer.setCustomPartitioner(new LogPartitioner() {
            // 生成32位hash值
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    while(hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                }
                return  "0000000000000000000000000000000000000000000000000000000000000000";
            }
        });

注意LogPartitioner是可選的,不設(shè)置情況下, 數(shù)據(jù)會隨機寫入某一個shard。

2.2 權(quán)限設(shè)置:RAM Policy
Producer依賴日志服務(wù)的API寫數(shù)據(jù),如下:

log:PostLogStoreLogs
log:ListShards
當RAM子用戶使用Producer時,需要對上述兩個API進行授權(quán):

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/51577.html

相關(guān)文章

  • Flink實戰(zhàn)(八) - Streaming Connectors 編程

    摘要:默認情況下,當數(shù)據(jù)元到達時,分段接收器將按當前系統(tǒng)時間拆分,并使用日期時間模式命名存儲區(qū)。如果需要,可以使用數(shù)據(jù)元或元組的屬性來確定目錄。這將調(diào)用傳入的數(shù)據(jù)元并將它們寫入部分文件,由換行符分隔。消費者的消費者被稱為或等。 1 概覽 1.1 預(yù)定義的源和接收器 Flink內(nèi)置了一些基本數(shù)據(jù)源和接收器,并且始終可用。該預(yù)定義的數(shù)據(jù)源包括文件,目錄和插socket,并從集合和迭代器攝取數(shù)據(jù)...

    beita 評論0 收藏0
  • OPPO數(shù)據(jù)中臺之基石:基于Flink SQL構(gòu)建實數(shù)據(jù)倉庫

    摘要:實際上,本身就預(yù)留了與外部元數(shù)據(jù)對接的能力,分別提供了和這兩個抽象。對接外部數(shù)據(jù)源搞清楚了注冊庫表的過程,給我們帶來這樣一個思路如果外部元數(shù)據(jù)創(chuàng)建的表也能被轉(zhuǎn)換成可識別的,那么就能被無縫地注冊到。 本文整理自 2019 年 4 月 13 日在深圳舉行的 Flink Meetup 會議,分享嘉賓張俊,目前擔(dān)任 OPPO 大數(shù)據(jù)平臺研發(fā)負責(zé)人,也是 Apache Flink contrib...

    jeffrey_up 評論0 收藏0
  • Flink1.7穩(wěn)定版發(fā)布:新增功能為企業(yè)生產(chǎn)帶來哪些好處

    摘要:通過狀態(tài)演變,可以在狀態(tài)模式中添加或刪除列,以便更改應(yīng)用程序部署后應(yīng)捕獲的業(yè)務(wù)功能。本地恢復(fù)通過擴展的調(diào)度來完成本地恢復(fù)功能,以便在恢復(fù)時考慮先前的部署位置。此功能大大提高了恢復(fù)速度。問題導(dǎo)讀1.Flink1.7開始支持Scala哪個版本?2.Flink1.7狀態(tài)演變在實際生產(chǎn)中有什么好處?3.支持SQL/Table API中的富集連接可以做那些事情?4.Flink1.7新增了哪些連接器Ap...

    Hwg 評論0 收藏0
  • 從 Spark Streaming 到 Apache Flink : 實時數(shù)據(jù)流在愛奇藝的演進

    摘要:在移動端,愛奇藝月度總有效時長億小時,穩(wěn)居中國榜第三名。愛奇藝的峰值事件數(shù)達到萬秒,在正確性容錯性能延遲吞吐量擴展性等方面均遇到不小的挑戰(zhàn)。從到愛奇藝主要使用的是和來進行流式計算。作者:陳越晨 整理:劉河 本文將為大家介紹Apache Flink在愛奇藝的生產(chǎn)與實踐過程。你可以借此了解到愛奇藝引入Apache Flink的背景與挑戰(zhàn),以及平臺構(gòu)建化流程。主要內(nèi)容如下: 愛奇藝在實時計算方...

    econi 評論0 收藏0

發(fā)表評論

0條評論

endiat

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<