摘要:是阿里云日志服務(wù)提供的,用于對接的工具,包括兩部分,消費者和生產(chǎn)者。子用戶使用需要授權(quán)如下幾個用于將數(shù)據(jù)寫到阿里云日志服務(wù)中。
摘要: Flink log connector是阿里云日志服務(wù)推出的,用于對接Flink的工具,包含兩塊,分別是消費者和生產(chǎn)者,消費者用于從日志服務(wù)中讀數(shù)據(jù),支持exactly once語義,生產(chǎn)者用于將數(shù)據(jù)寫到日志服務(wù)中,該Connector隱藏了日志服務(wù)的一些概念,比如Shard的分裂合并等,用戶在使用時只需要專注在自己的業(yè)務(wù)邏輯即可。
阿里云日志服務(wù)是針對實時數(shù)據(jù)一站式服務(wù),用戶只需要將精力集中在分析上,過程中數(shù)據(jù)采集、對接各種存儲計算、數(shù)據(jù)索引和查詢等瑣碎工作等都可以交給日志服務(wù)完成。
日志服務(wù)中最基礎(chǔ)的功能是LogHub,支持數(shù)據(jù)實時采集與消費,實時消費家族除 Spark Streaming、Storm、StreamCompute(Blink外),目前新增Flink啦。
Flink Connector
Flink log connector是阿里云日志服務(wù)提供的,用于對接flink的工具,包括兩部分,消費者(Consumer)和生產(chǎn)者(Producer)。
消費者用于從日志服務(wù)中讀取數(shù)據(jù),支持exactly once語義,支持shard負載均衡.
生產(chǎn)者用于將數(shù)據(jù)寫入日志服務(wù),使用connector時,需要在項目中添加maven依賴:
org.apache.flink flink-streaming-java_2.11 1.3.2 com.aliyun.openservices flink-log-connector 0.1.3 com.google.protobuf protobuf-java 2.5.0 com.aliyun.openservices aliyun-log 0.6.10 com.aliyun.openservices log-loghub-producer 0.1.8
代碼:Github
用法
請參考日志服務(wù)文檔,正確創(chuàng)建Logstore。
如果使用子賬號訪問,請確認正確設(shè)置了LogStore的RAM策略。參考授權(quán)RAM子用戶訪問日志服務(wù)資源。
1. Log Consumer
在Connector中, 類FlinkLogConsumer提供了訂閱日志服務(wù)中某一個LogStore的能力,實現(xiàn)了exactly once語義,在使用時,用戶無需關(guān)心LogStore中shard數(shù)
量的變化,consumer會自動感知。
flink中每一個子任務(wù)負責(zé)消費LogStore中部分shard,如果LogStore中shard發(fā)生split或者merge,子任務(wù)消費的shard也會隨之改變。
1.1 配置啟動參數(shù)
Properties configProps = new Properties(); // 設(shè)置訪問日志服務(wù)的域名 configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com"); // 設(shè)置訪問ak configProps.put(ConfigConstants.LOG_ACCESSSKEYID, ""); configProps.put(ConfigConstants.LOG_ACCESSKEY, ""); // 設(shè)置日志服務(wù)的project configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin"); // 設(shè)置日志服務(wù)的LogStore configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log"); // 設(shè)置消費日志服務(wù)起始位置 configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR); // 設(shè)置日志服務(wù)的消息反序列化方法 RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamlogTestStream = env.addSource( new FlinkLogConsumer (deserializer, configProps));
上面是一個簡單的消費示例,我們使用java.util.Properties作為配置工具,所有Consumer的配置都可以在ConfigConstants中找到。
注意,flink stream的子任務(wù)數(shù)量和日志服務(wù)LogStore中的shard數(shù)量是獨立的,如果shard數(shù)量多于子任務(wù)數(shù)量,每個子任務(wù)不重復(fù)的消費多個shard,如果少于,
那么部分子任務(wù)就會空閑,等到新的shard產(chǎn)生。
1.2 設(shè)置消費起始位置
Flink log consumer支持設(shè)置shard的消費起始位置,通過設(shè)置屬性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消費從shard的頭尾或者某個特定時間開始消費,具體取值如下:
Consts.LOG_BEGIN_CURSOR: 表示從shard的頭開始消費,也就是從shard中最舊的數(shù)據(jù)開始消費。
Consts.LOG_END_CURSOR: 表示從shard的尾開始,也就是從shard中最新的數(shù)據(jù)開始消費。
UnixTimestamp: 一個整型數(shù)值的字符串,用1970-01-01到現(xiàn)在的秒數(shù)表示, 含義是消費shard中這個時間點之后的數(shù)據(jù)。
三種取值舉例如下:
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
1.3 監(jiān)控:消費進度(可選)
Flink log consumer支持設(shè)置消費進度監(jiān)控,所謂消費進度就是獲取每一個shard實時的消費位置,這個位置使用時間戳表示,詳細概念可以參考
文檔消費組-查看狀態(tài),[消費組-監(jiān)控報警
](https://help.aliyun.com/docum...。
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);
注意上面代碼是可選的,如果設(shè)置了,consumer會首先創(chuàng)建consumerGroup,如果已經(jīng)存在,則什么都不做,consumer中的snapshot會自動同步到日志服務(wù)的consumerGroup中,用戶可以在日志服務(wù)的控制臺查看consumer的消費進度。
1.4 容災(zāi)和exactly once語義支持
當打開Flink的checkpointing功能時,F(xiàn)link log consumer會周期性的將每個shard的消費進度保存起來,當作業(yè)失敗時,flink會恢復(fù)log consumer,并
從保存的最新的checkpoint開始消費。
寫checkpoint的周期定義了當發(fā)生失敗時,最多多少的數(shù)據(jù)會被回溯,也就是重新消費,使用代碼如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 開啟flink exactly once語義 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 每5s保存一次checkpoint env.enableCheckpointing(5000);
更多Flink checkpoint的細節(jié)請參考Flink官方文檔Checkpoints。
1.5 補充材料:關(guān)聯(lián) API與權(quán)限設(shè)置
Flink log consumer 會用到的阿里云日志服務(wù)接口如下:
GetCursorOrData
用于從shard中拉數(shù)據(jù), 注意頻繁的調(diào)用該接口可能會導(dǎo)致數(shù)據(jù)超過日志服務(wù)的shard quota, 可以通過ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH 控制接口調(diào)用的時間間隔和每次調(diào)用拉取的日志數(shù)量,shard的quota參考文章[shard簡介](https://help.aliyun.com/document_detail/28976.html).
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100"); configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
ListShards
用于獲取logStore中所有的shard列表,獲取shard狀態(tài)等.如果您的shard經(jīng)常發(fā)生分裂合并,可以通過調(diào)整接口的調(diào)用周期來及時發(fā)現(xiàn)shard的變化。
// 設(shè)置每30s調(diào)用一次ListShards configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
CreateConsumerGroup
該接口調(diào)用只有當設(shè)置消費進度監(jiān)控時才會發(fā)生,功能是創(chuàng)建consumerGroup,用于同步checkpoint。
ConsumerGroupUpdateCheckPoint
該接口用戶將flink的snapshot同步到日志服務(wù)的consumerGroup中。
子用戶使用Flink log consumer需要授權(quán)如下幾個RAM Policy:
Log Producer
FlinkLogProducer 用于將數(shù)據(jù)寫到阿里云日志服務(wù)中。
注意producer只支持Flink at-least-once語義,這就意味著在發(fā)生作業(yè)失敗的情況下,寫入日志服務(wù)中的數(shù)據(jù)有可能會重復(fù),但是絕對不會丟失。
用法示例如下,我們將模擬產(chǎn)生的字符串寫入日志服務(wù):
// 將數(shù)據(jù)序列化成日志服務(wù)的數(shù)據(jù)格式 class SimpleLogSerializer implements LogSerializationSchema{ public RawLogGroup serialize(String element) { RawLogGroup rlg = new RawLogGroup(); RawLog rl = new RawLog(); rl.setTime((int)(System.currentTimeMillis() / 1000)); rl.addContent("message", element); rlg.addLog(rl); return rlg; } } public class ProducerSample { public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com"; public static String sAccessKeyId = ""; public static String sAccessKey = ""; public static String sProject = "ali-cn-hangzhou-sls-admin"; public static String sLogstore = "test-flink-producer"; private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class); public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(3); DataStream simpleStringStream = env.addSource(new EventsGenerator()); Properties configProps = new Properties(); // 設(shè)置訪問日志服務(wù)的域名 configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint); // 設(shè)置訪問日志服務(wù)的ak configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId); configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey); // 設(shè)置日志寫入的日志服務(wù)project configProps.put(ConfigConstants.LOG_PROJECT, sProject); // 設(shè)置日志寫入的日志服務(wù)logStore configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore); FlinkLogProducer logProducer = new FlinkLogProducer (new SimpleLogSerializer(), configProps); simpleStringStream.addSink(logProducer); env.execute("flink log producer"); } // 模擬產(chǎn)生日志 public static class EventsGenerator implements SourceFunction { private boolean running = true; @Override public void run(SourceContext ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(10); ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); } } @Override public void cancel() { running = false; } } }
2.1 初始化
Producer初始化主要需要做兩件事情:
初始化配置參數(shù)Properties, 這一步和Consumer類似, Producer有一些定制的參數(shù),一般情況下使用默認值即可,特殊場景可以考慮定制:
// 用于發(fā)送數(shù)據(jù)的io線程的數(shù)量,默認是8 ConfigConstants.LOG_SENDER_IO_THREAD_COUNT // 該值定義日志數(shù)據(jù)被緩存發(fā)送的時間,默認是3000 ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS // 緩存發(fā)送的包中日志的數(shù)量,默認是4096 ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE // 緩存發(fā)送的包的大小,默認是3Mb ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE // 作業(yè)可以使用的內(nèi)存總的大小,默認是100Mb ConfigConstants.LOG_MEM_POOL_BYTES
上述參數(shù)不是必選參數(shù),用戶可以不設(shè)置,直接使用默認值。
重載LogSerializationSchema,定義將數(shù)據(jù)序列化成RawLogGroup的方法。
RawLogGroup是log的集合,每個字段的含義可以參考文檔[日志數(shù)據(jù)模型](https://help.aliyun.com/document_detail/29054.html)。
如果用戶需要使用日志服務(wù)的shardHashKey功能,指定數(shù)據(jù)寫到某一個shard中,可以使用LogPartitioner產(chǎn)生數(shù)據(jù)的hashKey,用法例子如下:
FlinkLogProducerlogProducer = new FlinkLogProducer (new SimpleLogSerializer(), configProps); logProducer.setCustomPartitioner(new LogPartitioner () { // 生成32位hash值 public String getHashKey(String element) { try { MessageDigest md = MessageDigest.getInstance("MD5"); md.update(element.getBytes()); String hash = new BigInteger(1, md.digest()).toString(16); while(hash.length() < 32) hash = "0" + hash; return hash; } catch (NoSuchAlgorithmException e) { } return "0000000000000000000000000000000000000000000000000000000000000000"; } });
注意LogPartitioner是可選的,不設(shè)置情況下, 數(shù)據(jù)會隨機寫入某一個shard。
2.2 權(quán)限設(shè)置:RAM Policy
Producer依賴日志服務(wù)的API寫數(shù)據(jù),如下:
log:PostLogStoreLogs
log:ListShards
當RAM子用戶使用Producer時,需要對上述兩個API進行授權(quán):
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/51577.html
摘要:默認情況下,當數(shù)據(jù)元到達時,分段接收器將按當前系統(tǒng)時間拆分,并使用日期時間模式命名存儲區(qū)。如果需要,可以使用數(shù)據(jù)元或元組的屬性來確定目錄。這將調(diào)用傳入的數(shù)據(jù)元并將它們寫入部分文件,由換行符分隔。消費者的消費者被稱為或等。 1 概覽 1.1 預(yù)定義的源和接收器 Flink內(nèi)置了一些基本數(shù)據(jù)源和接收器,并且始終可用。該預(yù)定義的數(shù)據(jù)源包括文件,目錄和插socket,并從集合和迭代器攝取數(shù)據(jù)...
摘要:實際上,本身就預(yù)留了與外部元數(shù)據(jù)對接的能力,分別提供了和這兩個抽象。對接外部數(shù)據(jù)源搞清楚了注冊庫表的過程,給我們帶來這樣一個思路如果外部元數(shù)據(jù)創(chuàng)建的表也能被轉(zhuǎn)換成可識別的,那么就能被無縫地注冊到。 本文整理自 2019 年 4 月 13 日在深圳舉行的 Flink Meetup 會議,分享嘉賓張俊,目前擔(dān)任 OPPO 大數(shù)據(jù)平臺研發(fā)負責(zé)人,也是 Apache Flink contrib...
摘要:通過狀態(tài)演變,可以在狀態(tài)模式中添加或刪除列,以便更改應(yīng)用程序部署后應(yīng)捕獲的業(yè)務(wù)功能。本地恢復(fù)通過擴展的調(diào)度來完成本地恢復(fù)功能,以便在恢復(fù)時考慮先前的部署位置。此功能大大提高了恢復(fù)速度。問題導(dǎo)讀1.Flink1.7開始支持Scala哪個版本?2.Flink1.7狀態(tài)演變在實際生產(chǎn)中有什么好處?3.支持SQL/Table API中的富集連接可以做那些事情?4.Flink1.7新增了哪些連接器Ap...
摘要:在移動端,愛奇藝月度總有效時長億小時,穩(wěn)居中國榜第三名。愛奇藝的峰值事件數(shù)達到萬秒,在正確性容錯性能延遲吞吐量擴展性等方面均遇到不小的挑戰(zhàn)。從到愛奇藝主要使用的是和來進行流式計算。作者:陳越晨 整理:劉河 本文將為大家介紹Apache Flink在愛奇藝的生產(chǎn)與實踐過程。你可以借此了解到愛奇藝引入Apache Flink的背景與挑戰(zhàn),以及平臺構(gòu)建化流程。主要內(nèi)容如下: 愛奇藝在實時計算方...
閱讀 2767·2019-08-30 15:53
閱讀 536·2019-08-29 17:22
閱讀 1074·2019-08-29 13:10
閱讀 2331·2019-08-26 13:45
閱讀 2763·2019-08-26 10:46
閱讀 3210·2019-08-26 10:45
閱讀 2516·2019-08-26 10:14
閱讀 478·2019-08-23 18:23