成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 ElasticSearch

W4n9Hu1 / 3263人閱讀

摘要:從到學(xué)習(xí)介紹從到學(xué)習(xí)介紹其中包括了和的,后面我也講了下如何自定義自己的和。這個問題可是線上很容易遇到的關(guān)注我轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為微信公眾號另外我自己整理了些的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號了。

前言

前面 FLink 的文章中我們已經(jīng)介紹了說 Flink 已經(jīng)有很多自帶的 Connector。

1、[《從0到1學(xué)習(xí)Flink》—— Data Source 介紹
](http://www.54tianzhisheng.cn/...

2、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹

其中包括了 Source 和 Sink 的,后面我也講了下如何自定義自己的 Source 和 Sink。

那么今天要做的事情是啥呢?就是介紹一下 Flink 自帶的 ElasticSearch Connector,我們今天就用他來做 Sink,將 Kafka 中的數(shù)據(jù)經(jīng)過 Flink 處理后然后存儲到 ElasticSearch。

準(zhǔn)備

安裝 ElasticSearch,這里就忽略,自己找我以前的文章,建議安裝 ElasticSearch 6.0 版本以上的,畢竟要跟上時代的節(jié)奏。

下面就講解一下生產(chǎn)環(huán)境中如何使用 Elasticsearch Sink 以及一些注意點(diǎn),及其內(nèi)部實(shí)現(xiàn)機(jī)制。

Elasticsearch Sink 添加依賴

    org.apache.flink
    flink-connector-elasticsearch6_${scala.binary.version}
    ${flink.version}

上面這依賴版本號請自己根據(jù)使用的版本對應(yīng)改變下。

下面所有的代碼都沒有把 import 引入到這里來,如果需要查看更詳細(xì)的代碼,請查看我的 GitHub 倉庫地址:

https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-es6

這個 module 含有本文的所有代碼實(shí)現(xiàn),當(dāng)然越寫到后面自己可能會做一些抽象,所以如果有代碼改變很正常,請直接查看全部項(xiàng)目代碼。

ElasticSearchSinkUtil 工具類

這個工具類是自己封裝的,getEsAddresses 方法將傳入的配置文件 es 地址解析出來,可以是域名方式,也可以是 ip + port 形式。addSink 方法是利用了 Flink 自帶的 ElasticsearchSink 來封裝了一層,傳入了一些必要的調(diào)優(yōu)參數(shù)和 es 配置參數(shù),下面文章還會再講些其他的配置。

ElasticSearchSinkUtil.java

public class ElasticSearchSinkUtil {

    /**
     * es sink
     *
     * @param hosts es hosts
     * @param bulkFlushMaxActions bulk flush size
     * @param parallelism 并行數(shù)
     * @param data 數(shù)據(jù)
     * @param func
     * @param 
     */
    public static  void addSink(List hosts, int bulkFlushMaxActions, int parallelism,
                                   SingleOutputStreamOperator data, ElasticsearchSinkFunction func) {
        ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
        data.addSink(esSinkBuilder.build()).setParallelism(parallelism);
    }

    /**
     * 解析配置文件的 es hosts
     *
     * @param hosts
     * @return
     * @throws MalformedURLException
     */
    public static List getEsAddresses(String hosts) throws MalformedURLException {
        String[] hostList = hosts.split(",");
        List addresses = new ArrayList<>();
        for (String host : hostList) {
            if (host.startsWith("http")) {
                URL url = new URL(host);
                addresses.add(new HttpHost(url.getHost(), url.getPort()));
            } else {
                String[] parts = host.split(":", 2);
                if (parts.length > 1) {
                    addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1])));
                } else {
                    throw new MalformedURLException("invalid elasticsearch hosts format");
                }
            }
        }
        return addresses;
    }
}
Main 啟動類

Main.java

public class Main {
    public static void main(String[] args) throws Exception {
        //獲取所有參數(shù)
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        //準(zhǔn)備好環(huán)境
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
        //從kafka讀取數(shù)據(jù)
        DataStreamSource data = KafkaConfigUtil.buildSource(env);

        //從配置文件中讀取 es 的地址
        List esAddresses = ElasticSearchSinkUtil.getEsAddresses(parameterTool.get(ELASTICSEARCH_HOSTS));
        //從配置文件中讀取 bulk flush size,代表一次批處理的數(shù)量,這個可是性能調(diào)優(yōu)參數(shù),特別提醒
        int bulkSize = parameterTool.getInt(ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS, 40);
        //從配置文件中讀取并行 sink 數(shù),這個也是性能調(diào)優(yōu)參數(shù),特別提醒,這樣才能夠更快的消費(fèi),防止 kafka 數(shù)據(jù)堆積
        int sinkParallelism = parameterTool.getInt(STREAM_SINK_PARALLELISM, 5);

        //自己再自帶的 es sink 上一層封裝了下
        ElasticSearchSinkUtil.addSink(esAddresses, bulkSize, sinkParallelism, data,
                (Metrics metric, RuntimeContext runtimeContext, RequestIndexer requestIndexer) -> {
                    requestIndexer.add(Requests.indexRequest()
                            .index(ZHISHENG + "_" + metric.getName())  //es 索引名
                            .type(ZHISHENG) //es type
                            .source(GsonUtil.toJSONBytes(metric), XContentType.JSON)); 
                });
        env.execute("flink learning connectors es6");
    }
}
配置文件

配置都支持集群模式填寫,注意用 , 分隔!

kafka.brokers=localhost:9092
kafka.group.id=zhisheng-metrics-group-test
kafka.zookeeper.connect=localhost:2181
metrics.topic=zhisheng-metrics
stream.parallelism=5
stream.checkpoint.interval=1000
stream.checkpoint.enable=false
elasticsearch.hosts=localhost:9200
elasticsearch.bulk.flush.max.actions=40
stream.sink.parallelism=5
運(yùn)行結(jié)果

執(zhí)行 Main 類的 main 方法,我們的程序是只打印 flink 的日志,沒有打印存入的日志(因?yàn)槲覀冞@里沒有打日志):

所以看起來不知道我們的 sink 是否有用,數(shù)據(jù)是否從 kafka 讀取出來后存入到 es 了。

你可以查看下本地起的 es 終端或者服務(wù)器的 es 日志就可以看到效果了。

es 日志如下:

上圖是我本地 Mac 電腦終端的 es 日志,可以看到我們的索引了。

如果還不放心,你也可以在你的電腦裝個 kibana,然后更加的直觀查看下 es 的索引情況(或者直接敲 es 的命令)

我們用 kibana 查看存入 es 的索引如下:

程序執(zhí)行了一會,存入 es 的數(shù)據(jù)量就很大了。

擴(kuò)展配置

上面代碼已經(jīng)可以實(shí)現(xiàn)你的大部分場景了,但是如果你的業(yè)務(wù)場景需要保證數(shù)據(jù)的完整性(不能出現(xiàn)丟數(shù)據(jù)的情況),那么就需要添加一些重試策略,因?yàn)樵谖覀兊纳a(chǎn)環(huán)境中,很有可能會因?yàn)槟承┙M件不穩(wěn)定性導(dǎo)致各種問題,所以這里我們就要在數(shù)據(jù)存入失敗的時候做重試操作,這里 flink 自帶的 es sink 就支持了,常用的失敗重試配置有:

1、bulk.flush.backoff.enable 用來表示是否開啟重試機(jī)制

2、bulk.flush.backoff.type 重試策略,有兩種:EXPONENTIAL 指數(shù)型(表示多次重試之間的時間間隔按照指數(shù)方式進(jìn)行增長)、CONSTANT 常數(shù)型(表示多次重試之間的時間間隔為固定常數(shù))

3、bulk.flush.backoff.delay 進(jìn)行重試的時間間隔

4、bulk.flush.backoff.retries 失敗重試的次數(shù)

5、bulk.flush.max.actions: 批量寫入時的最大寫入條數(shù)

6、bulk.flush.max.size.mb: 批量寫入時的最大數(shù)據(jù)量

7、bulk.flush.interval.ms: 批量寫入的時間間隔,配置后則會按照該時間間隔嚴(yán)格執(zhí)行,無視上面的兩個批量寫入配置

看下啦,就是如下這些配置了,如果你需要的話,可以在這個地方配置擴(kuò)充了。

FailureHandler 失敗處理器

寫入 ES 的時候會有這些情況會導(dǎo)致寫入 ES 失敗:

1、ES 集群隊(duì)列滿了,報如下錯誤

12:08:07.326 [I/O dispatcher 13] ERROR o.a.f.s.c.e.ElasticsearchSinkBase - Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of org.elasticsearch.transport.TransportService$7@566c9379 on EsThreadPoolExecutor[name = node-1/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@f00b373[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 6277]]]]

是這樣的,我電腦安裝的 es 隊(duì)列容量默認(rèn)應(yīng)該是 200,我沒有修改過。我這里如果配置的 bulk flush size * 并發(fā) sink 數(shù)量 這個值如果大于這個 queue capacity ,那么就很容易導(dǎo)致出現(xiàn)這種因?yàn)?es 隊(duì)列滿了而寫入失敗。

當(dāng)然這里你也可以通過調(diào)大點(diǎn) es 的隊(duì)列。參考:https://www.elastic.co/guide/...

2、ES 集群某個節(jié)點(diǎn)掛了

這個就不用說了,肯定寫入失敗的。跟過源碼可以發(fā)現(xiàn) RestClient 類里的 performRequestAsync 方法一開始會隨機(jī)的從集群中的某個節(jié)點(diǎn)進(jìn)行寫入數(shù)據(jù),如果這臺機(jī)器掉線,會進(jìn)行重試在其他的機(jī)器上寫入,那么當(dāng)時寫入的這臺機(jī)器的請求就需要進(jìn)行失敗重試,否則就會把數(shù)據(jù)丟失!

3、ES 集群某個節(jié)點(diǎn)的磁盤滿了

這里說的磁盤滿了,并不是磁盤真的就沒有一點(diǎn)剩余空間的,是 es 會在寫入的時候檢查磁盤的使用情況,在 85% 的時候會打印日志警告。

這里我看了下源碼如下圖:

如果你想繼續(xù)讓 es 寫入的話就需要去重新配一下 es 讓它繼續(xù)寫入,或者你也可以清空些不必要的數(shù)據(jù)騰出磁盤空間來。

解決方法
DataStream input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction() {...},
    new ActionRequestFailureHandler() {
        @Override
        void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throw Throwable {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
}));

如果僅僅只是想做失敗重試,也可以直接使用官方提供的默認(rèn)的 RetryRejectedExecutionFailureHandler ,該處理器會對 EsRejectedExecutionException 導(dǎo)致到失敗寫入做重試處理。如果你沒有設(shè)置失敗處理器(failure handler),那么就會使用默認(rèn)的 NoOpFailureHandler 來簡單處理所有的異常。

總結(jié)

本文寫了 Flink connector es,將 Kafka 中的數(shù)據(jù)讀取并存儲到 ElasticSearch 中,文中講了如何封裝自帶的 sink,然后一些擴(kuò)展配置以及 FailureHandler 情況下要怎么處理。(這個問題可是線上很容易遇到的)

關(guān)注我

轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/12/30/Flink-ElasticSearch-Sink/

微信公眾號:zhisheng

另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無條件獲取到。

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

以后這個項(xiàng)目的所有代碼都將放在這個倉庫里,包含了自己學(xué)習(xí) flink 的一些 demo 和博客

相關(guān)文章

1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹

2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡單程序入門

3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解

4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹

5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?

6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹

7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?

8、《從0到1學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換)

9、《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Stream Windows

10、《從0到1學(xué)習(xí)Flink》—— Flink 中的幾種 Time 詳解

11、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 ElasticSearch

12、《從0到1學(xué)習(xí)Flink》—— Flink 項(xiàng)目如何運(yùn)行?

13、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 Kafka

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/73067.html

相關(guān)文章

  • 01學(xué)習(xí)Flink》—— Apache Flink 介紹

    摘要:擴(kuò)展庫還包括用于復(fù)雜事件處理,機(jī)器學(xué)習(xí),圖形處理和兼容性的專用代碼庫。事件時間機(jī)制使得那些事件無序到達(dá)甚至延遲到達(dá)的數(shù)據(jù)流能夠計(jì)算出精確的結(jié)果。負(fù)責(zé)接受用戶的程序代碼,然后創(chuàng)建數(shù)據(jù)流,將數(shù)據(jù)流提交給以便進(jìn)一步執(zhí)行。 showImg(https://segmentfault.com/img/remote/1460000016902812); 前言 Flink 是一種流式計(jì)算框架,為什么我...

    flyer_dev 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<