基于這個(gè)需求,flink本身提供了很多的任務(wù)運(yùn)行時(shí)刻Metrics相關(guān)指標(biāo),避免任務(wù)的運(yùn)行處于黑盒狀態(tài),通過分析指標(biāo),可以快速的調(diào)整任務(wù)的資源、定位遇到的問題。目前獲取 任務(wù)Metrics 有三種方式:
方式一:
通過flink WebUI 進(jìn)入Metrics 選項(xiàng)卡,根據(jù)不同算子,選擇需要監(jiān)測指標(biāo),實(shí)時(shí)查看指標(biāo)數(shù)據(jù),缺點(diǎn)比較明顯,無法查看歷史監(jiān)測數(shù)據(jù),需要一直打開,并且無法設(shè)置告警,適合開發(fā)過程時(shí)使用。
方式二:
官方提供了一種通過 REST API獲取方式指標(biāo)的方式,
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/ops/rest_api/,提供的api 主要是面向 JobManager 對(duì)象的相關(guān)動(dòng)作api, 主要用于任務(wù)提交等操作,但所提供TaskManager對(duì)象指標(biāo)信息很少。通過此方式獲取相應(yīng)的指標(biāo),需要另外開發(fā)一套自動(dòng)化腳本或者程序,定期調(diào)用api 獲取相關(guān)信息,前題條件是需要提前規(guī)則監(jiān)測的指標(biāo)及對(duì)應(yīng)的請(qǐng)求的api 地址,增大了系統(tǒng)復(fù)雜程度。
方式三:
flink 提供了一種MetricsReporter機(jī)制,可以將各個(gè)組件的metrics數(shù)據(jù),通過不同的Metric Reporter插件將數(shù)據(jù)自動(dòng)暴露給外部系統(tǒng),這樣可以充份利用使用第三方的存儲(chǔ)和分析能力。
目前flink已經(jīng)支持了很多reporter,如Graphite、JMX、InfluxDB、Prometheus等,不管用哪一處方式,都需要額外部署第三方系統(tǒng)來,進(jìn)行接收、解析、分析metric數(shù)據(jù)。
我們本身已有了自動(dòng)化運(yùn)維平臺(tái),不會(huì)考慮部署像Prometheus這樣的第三方平臺(tái),需要做的是如何將metric數(shù)據(jù)跟自動(dòng)運(yùn)維平臺(tái)告警模塊進(jìn)行對(duì)接使用,告警模塊主要是通過kafka進(jìn)行對(duì)接數(shù)據(jù),所以采用自定義kafka reporter 解決數(shù)據(jù)對(duì)接問題。
flink metrices指標(biāo)項(xiàng)比較多,指標(biāo)數(shù)據(jù)量級(jí)跟所跑的任務(wù)個(gè)數(shù)有著直接的關(guān)系,我們關(guān)注的核心指標(biāo)項(xiàng),對(duì)核心指標(biāo)進(jìn)行規(guī)則告警,接下來介紹如何基于flink 現(xiàn)有的reporter 代碼實(shí)現(xiàn)kafka reporter功能點(diǎn):
1. 下載對(duì)應(yīng)版本flink 分支代碼,如
https://github.com/apache/flink/releases/tag/release-1.13.6
2. 解壓源代碼,導(dǎo)入開發(fā)工具,查看flink-metrics模塊代碼
3. 根據(jù)自己實(shí)際場景,對(duì)吐數(shù)據(jù)格式要求,參考不同自帶模板代碼,以flink-metrics-InfluxDB模塊代碼為例,新建flink-metrics-kafka
3.1 修改flink-metrics/pom.xml 文件,新增
3.2 新增KafkaReporterFactory主程序,需要實(shí)現(xiàn)MetricReporterFactory接口并重寫方法。
@InterceptInstantiationViaReflection(
reporterClassName = "org.apache.flink.metrics.kafka.KafkaReporter")
public class KafkaReporterFactory implements MetricReporterFactory {
@Override
public MetricReporter createMetricReporter(Properties properties) {
return new KafkaReporter();
}
}
3.3 新增KafkaReporter實(shí)現(xiàn)類,需要繼承AbstractReporter并實(shí)現(xiàn)Scheduled接口并重寫方法,主要作用是收集指標(biāo)數(shù)據(jù),并推送到kafka。
讀取配置flink配置文件conf/flink-conf.yaml kakfa 服務(wù)器地址及topic地址,初始化KafkaProducer消息生產(chǎn)者對(duì)象。
@Override
public void open(MetricConfig metricConfig) {
LOG.info("metricConfig:" + metricConfig);
topic = metricConfig.getString("topic", "");
if (StringUtils.isBlank(topic)) {
LOG.error("metrics.reporter.kafka_reporter.topic is null");
}
String endsWithMetric = metricConfig.getString("endswith.metricname", "").trim(); //指定需要獲取指標(biāo)名稱
endsWithMetricList = Arrays.asList(endsWithMetric.split(","));
String bootstrapservers = metricConfig.getString("bootstrap.servers", "");
if (StringUtils.isBlank(bootstrapservers)) {
LOG.error("metrics.reporter.kafka_reporter.bootstrap.servers is null");
}
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapservers);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put(
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer<String, String>(properties);
}
指標(biāo)數(shù)據(jù)的類型及格式是根據(jù)它所歸屬metrics類型(Counters/Gauges/Histograms/Meters)有關(guān),然后我們可以對(duì)指標(biāo)數(shù)據(jù)進(jìn)行格式化輸出所需要格式到kafka。
① Counters: 統(tǒng)計(jì)的是一個(gè)累加值,用與存儲(chǔ)數(shù)值類型指標(biāo)數(shù)據(jù)。
② Gauges:用來存儲(chǔ)任何類型指標(biāo)數(shù)據(jù)。
③ Histograms:度量值的統(tǒng)計(jì)結(jié)果,如平均值、最大值等。
④ Meters:用來計(jì)算平均速率,平均吞吐量等。
@Override
public void report() {
tryReport();
}
private final ObjectMapper mapper = new ObjectMapper();
private void tryReport() {
Instant timestamp = Instant.now();
try {
String job_id = "";
String job_name = "";
List metriclist = new ArrayList<>();
metriclist.addAll(gauges.values()); //獲取gauges類型指標(biāo)集
metriclist.addAll(counters.values());//獲取gauges類型指標(biāo)集
metriclist.addAll(histograms.values());//獲取histograms類型指標(biāo)集
metriclist.addAll(meters.values());//獲取meters類型指標(biāo)集
//每個(gè)指標(biāo)數(shù)據(jù)里面都加上對(duì)應(yīng)的job_id, job_name
for (MeasurementInfo info : metriclist) {
if (info.getName().startsWith("jobmanager_job_")
|| info.getName().startsWith("taskmanager_job_")) {
job_id = info.getTags().getOrDefault(JOB_ID, "");
job_name = info.getTags().getOrDefault(JOB_NAME, "");
if (StringUtils.isBlank(job_id) || StringUtils.isBlank(job_name)) {
LOG.error("do not get job_id or job name:{}", info);
}
break;
}
}
List
新增flink-metrics-kafka項(xiàng)目有兩種打包方式:
基于完整的flink源碼項(xiàng)目,進(jìn)行全量打包。
保留flink maven父子結(jié)構(gòu),flink parent pom.xml 只留
3.4 修改flink配置文件conf/flink-conf.yaml,主要包括Reporter全類名,上報(bào)周期,指定所需的指標(biāo)名。
metrics.reporters:kafka_reporter
metrics.reporter.kafka_reporter.factory.class:org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka_reporter.interval:60 SECONDS
#kafka地址
metrics.reporter.kafka_reporter.bootstrap.servers:XXX.XXX.XXX.10:9090
#kafkatopic
metrics.reporter.kafka_reporter.topic:kafka_topic
#指標(biāo)名稱按后綴進(jìn)行過濾,注釋則不過濾
metrics.reporter.kafka_reporter.endswith.metricname:job_numRestarts,job_restartingTime,job_uptime,currentOutputWatermark,Status_JVM_CPU_Load,Status_JVM_Memory_Heap_Used
3.5 提交任務(wù),消費(fèi)kafka 可以獲取對(duì)應(yīng)的數(shù)據(jù)。
[
{
"name": "jobmanager.uptime",
"time": 1647314569119,
"fields": {
"value": 1703478823
},
"tags": {
"host": "bigdata-03",
"job_id": "dc7a58b3f202059cd72c3467ecedb4b7",
"job_name": "amp_zabbix_pre"
}
},
{
"name": "jobmanager.Status.JVM.Memory.Heap.Max",
"time": 1647314569119,
"fields": {
"value": 468713472
},
"tags": {
"host": "bigdata-03",
"job_id": "",
"job_name": ""
}
},
...
]
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/129566.html
摘要:從上面自定義的可以看到我們繼承的就是這個(gè)類,那么來了解一下一個(gè)抽象類,繼承自。該類的子類有三個(gè),兩個(gè)是抽象類,在此基礎(chǔ)上提供了更具體的實(shí)現(xiàn),另一個(gè)是。 showImg(https://segmentfault.com/img/remote/1460000016978900?w=1920&h=1641); 前言 在 《從0到1學(xué)習(xí)Flink》—— Data Source 介紹 文章中,我...
摘要:從到學(xué)習(xí)介紹從到學(xué)習(xí)介紹其中包括了和的,后面我也講了下如何自定義自己的和。這個(gè)問題可是線上很容易遇到的關(guān)注我轉(zhuǎn)載請(qǐng)務(wù)必注明原創(chuàng)地址為微信公眾號(hào)另外我自己整理了些的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號(hào)了。 showImg(https://segmentfault.com/img/remote/1460000017935460?w=1280&h=853); 前言 前面 FLink 的文章中...
閱讀 1357·2023-01-11 13:20
閱讀 1707·2023-01-11 13:20
閱讀 1215·2023-01-11 13:20
閱讀 1906·2023-01-11 13:20
閱讀 4165·2023-01-11 13:20
閱讀 2757·2023-01-11 13:20
閱讀 1402·2023-01-11 13:20
閱讀 3672·2023-01-11 13:20