由于項目需求,用Flink生成完實時數(shù)據(jù)之后,需要立即給前臺做展示,且日志項目實時數(shù)據(jù)數(shù)量較大,因此采用了Redis作為數(shù)據(jù)庫,用來存儲最近一小段時間的數(shù)據(jù),供前臺來進(jìn)行實時展示。
Flink的Sink方法提供了一個redis的sink包(flink-connector-redis_2.11),但是使用過程中發(fā)現(xiàn)與業(yè)務(wù)不符合。主要是2個方面:
Flink提供的包入redis時不能設(shè)置失效時間,業(yè)務(wù)場景是只需要保存最新的實時數(shù)據(jù)一段時間,需要設(shè)置失效時間,讓數(shù)據(jù)過期,從而提高資源的利用和性能。
業(yè)務(wù)使用的是Redis集群模式,F(xiàn)link提供的Jar包雖然支持集群但是沒有設(shè)置密碼的地方,業(yè)務(wù)的Redis集群都有設(shè)置密碼的強(qiáng)制要求。
對Flink提供RedisSink包進(jìn)行改造,具體的是:1.增加可以設(shè)置失效時間的方法;2.集群模式提供Redis的驗證方法。
由于原來的RedisMapper只提供了獲取入庫方式、獲取key以及獲取Value,三個接口,沒有提供獲取失效時間的接口。
所以首先重新定義RedisMapper接口類,增加一個獲取失效時間的接口。
其次在RedisCommand中增加一個帶失效時間的命令
SETEX(BasicRedisDataType.STRING)接口既然以及改好了,就要去重新定義它的實現(xiàn)了。
接著在RedisSink的invoke方法中提供獲取失效時間的代碼,以及SETNX命令的實現(xiàn)。
由于原來的RedisCommandsContainer接口中不包含失效時間的方法,所以需要新增一個含失效時間的接口。
在具體的RedisClusterContainer集群實現(xiàn)該方法。
這樣第一個問題就解決了,只要將設(shè)置為RedisCommand.SETEX就可以進(jìn)行帶失效時間的Sink方法了。
關(guān)于增加設(shè)置密碼的方式,F(xiàn)link提供的包中JedisClusterConfig是缺失了集群密碼設(shè)置的。因此需要添加一個密碼選項,并提供set方法。
然后初始化的時候增加一個構(gòu)造方法。
其實最終還是調(diào)用的原生redis連接包來創(chuàng)建了一個集群對象redis.clients.jedis.JedisCluster,在RedisCommandsContainerBuilder中添加帶密碼的redis集群創(chuàng)建方式。
通過這樣改造就擁有了設(shè)置密碼的方式,如果還少了別的參數(shù)同理可以通過這樣的方式給添加上去。為了靈活改造我們實際直接將整個jar拉取了下來,重新定義了一個屬于自己的Sink定制包,最終使用如下:
簡要的給出類圖
本文主要通過結(jié)合項目的實際使用場景,在flink與redis的sink包(flink-connector-redis_2.11)不能滿足要求的情況下,在原有包的基礎(chǔ)上根據(jù)實際使用情況,對該包進(jìn)行了一系列的改造,主要包括提供設(shè)置密碼的redis集群初始化以及在sink過程中可以指定redis鍵值的失效時長,最終通過改造的flink-connector-redis在項目flink任務(wù)中使用良好。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/130151.html
摘要:基于流處理機(jī)制實現(xiàn)批流融合相對基于批處理機(jī)制實現(xiàn)批流融合的思想更自然,更合理,也更有優(yōu)勢,因此阿里巴巴在基于支持大量核心實時計算場景的同時,也在不斷改進(jìn)的架構(gòu),使其朝著真正批流融合的統(tǒng)一計算引擎方向前進(jìn)。 阿里妹導(dǎo)讀:2018年12月下旬,由阿里巴巴集團(tuán)主辦的Flink Forward China在北京國家會議中心舉行。Flink Forward是由Apache軟件基金會授權(quán)的全球范圍...
摘要:基于在阿里巴巴搭建的平臺于年正式上線,并從阿里巴巴的搜索和推薦這兩大場景開始實現(xiàn)。在經(jīng)過一番調(diào)研之后,阿里巴巴實時計算認(rèn)為是一個非常適合的選擇。接下來,我們聊聊阿里巴巴在層對又大刀闊斧地進(jìn)行了哪些改進(jìn)。 Apache Flink 概述 Apache Flink(以下簡稱Flink)是誕生于歐洲的一個大數(shù)據(jù)研究項目,原名StratoSphere。該項目是柏林工業(yè)大學(xué)的一個研究性項目,早期...
摘要:實際上,本身就預(yù)留了與外部元數(shù)據(jù)對接的能力,分別提供了和這兩個抽象。對接外部數(shù)據(jù)源搞清楚了注冊庫表的過程,給我們帶來這樣一個思路如果外部元數(shù)據(jù)創(chuàng)建的表也能被轉(zhuǎn)換成可識別的,那么就能被無縫地注冊到。 本文整理自 2019 年 4 月 13 日在深圳舉行的 Flink Meetup 會議,分享嘉賓張俊,目前擔(dān)任 OPPO 大數(shù)據(jù)平臺研發(fā)負(fù)責(zé)人,也是 Apache Flink contrib...
閱讀 1356·2023-01-11 13:20
閱讀 1707·2023-01-11 13:20
閱讀 1215·2023-01-11 13:20
閱讀 1906·2023-01-11 13:20
閱讀 4165·2023-01-11 13:20
閱讀 2757·2023-01-11 13:20
閱讀 1402·2023-01-11 13:20
閱讀 3671·2023-01-11 13:20