成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

ogg同步數(shù)據(jù)到kafka案例簡介

IT那活兒 / 1266人閱讀
ogg同步數(shù)據(jù)到kafka案例簡介


點(diǎn)擊上方藍(lán)字關(guān)注我們


1、前  言


平時(shí)ogg數(shù)據(jù)同步的案例中,遇到最多就是常見RDBMS之間的同步,源端和目標(biāo)端分別找合適安裝包,按照經(jīng)典的抽取、傳輸、復(fù)制3個(gè)進(jìn)程進(jìn)行實(shí)施,例如oracleoracle,oraclemysql,sqlserveroracle等等場景。早就知道ogg也可以同步數(shù)據(jù)到HahoopBigData平臺(tái)及Kafka等消息中間件進(jìn)行同步,今天終于遇到這樣的實(shí)際需求了。

需求:某系統(tǒng)需要原生實(shí)時(shí)數(shù)據(jù)采集到大數(shù)據(jù)平臺(tái)。根據(jù)統(tǒng)一的實(shí)現(xiàn)方案,kafka規(guī)劃原則:

  • 省分topic隔離。不同省分,不共用相同的topic

  • 依據(jù)省份生產(chǎn)庫實(shí)例建設(shè)情況,按實(shí)例劃分對接topic,要求kafka一個(gè)topic對應(yīng)一個(gè)物理數(shù)據(jù)庫實(shí)例;

  • 為保證每張表數(shù)據(jù)在kafka中保證“有序”,要求每張表對應(yīng)topic的一個(gè)partition;

具體省分kafka規(guī)劃方案如下:


2、背  景


  2.1 GoldenGate for Big Data 19c

從圖上可以看到,OGG_BigData_Adapter支持的開源組件還真不少。


  2.2 軟件版本

針對本次環(huán)境源端是oracle12c數(shù)據(jù)庫,目標(biāo)端是Kafka2.11-1.0.2集群。源端使用ogg12cfor oracle即可,目標(biāo)端我們需要使用OGG_BigData的軟件包來實(shí)現(xiàn)的。根據(jù)官方的版本適配文檔,確定使用19c19.1來搭建。


源端軟件包  

123014_fbo_ggs_Linux_x64_shiphome.zip

12.3.0.1.4OGGCORE_12.3.0.1.0_PLATFORMS_180415.0359_FBO

目標(biāo)端軟件包

OGG_BigData_Linux_x64_19.1.0.0.1.zip

19.1.0.0.2OGGCORE_OGGADP.19.1.0.0.2_PLATFORMS_190916.0039


  2.3 邏輯流程

對于kafka集群來說,ogg的目標(biāo)端程序?qū)嶋H上是作為kafka的生產(chǎn)者客戶端,把解析trail文件得到的數(shù)據(jù)推送到kafka中。


3、實(shí)  施


  3.1 源端部署

源端數(shù)據(jù)庫12cRAC的多租戶模式,使用ogg12c12.3的集成模式,按常規(guī)配置ex_kafdp_kaf進(jìn)程即可。

抽取進(jìn)程EX_KAF參數(shù)

##view param EX_KAF


EXTRACT ex_kaf

SETENV (NLS_LANG=AMERICAN_AMERICA.ZHS16GBK)

setenv (ORACLE_HOME=/u01/app/oracle/product/12.2.0.1/db_1)

SETENV (ORACLE_SID=db2)

USERID c##ggs@db, PASSWORD XXXX

EXTTRAIL ./dirdat/ha

--DISCARDFILE ./dirrpt/ex_kaf.DSC, APPEND,MEGABYTES 100


-- report info

REPORTCOUNT EVERY 10 MINUTES, RATE

WARNLONGTRANS 2h,CHECKINTERVAL 30m


dboptions allowunusedcolumn

fetchoptions nousesnapshot

LOGALLSUPCOLS  //加入前鏡像(12c新版本特有參數(shù),遇到josn格式不固定,新老參數(shù)同時(shí)加

getupdatebefores  // 加入前鏡像(老版本參數(shù))

nocompressdeletes / /加入前鏡像(老版本參數(shù))

nocompressupdates // 加入前鏡像(老版本參數(shù))


--crm3hj 208 tables

TABLE crm3hj.CUST.TAB;



傳輸進(jìn)程DP_KAF參數(shù)


##view param DP_KAF

EXTRACT dp_kaf

PASSTHRU //傳輸進(jìn)程透傳,不做任何處理

rmthost 192.168.100.100, mgrport 7809 , TCPBUFSIZE 300000, TCPFLUSHBYTES 300000//目標(biāo)端地址

rmttrail ./dirdat/hk

EOFDELAYCSECS 10


--gettruncates

--crm3hj 8 tables

TABLE crm3hj.CUST.TAB;



  3.2 目標(biāo)端部署

解壓ogg壓縮包

tar xvfOGG_BigData_Linux_x64_19.1.0.0.1.tar -C /oggdata/ggv191adp

因?yàn)橐卿浀?/span>kafka集群,需要引用對應(yīng)jar包,故需部署

tar -zxvfkafka_2.11-1.0.2.tgz -C /oggdata/kafka

kafkalib目錄為 /oggdata/kafka/kafka_2.11-1.0.2/libs

參數(shù)文件配置,這里除了rp_kaf進(jìn)程參數(shù)文件外,還有kafka屬性參數(shù)文件、生產(chǎn)者屬性配置參數(shù)文件。


  • rp_kaf.prm參數(shù)文件

[oracle@server003 dirprm]$ cat rp_kaf.prm

REPLICAT rp_kaf //定義進(jìn)程名稱

sourcedefs ./dirdef/o2kaf_jt.def  //指定使用源和目標(biāo)的表映射文件,高版本可省略

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka_crm_ha.props //定義kafka一些適配性的庫文件以及配置文件,配置文件指定屬性文件


--reperror default, abend

reperror default, discard  //錯(cuò)誤處理,這里將錯(cuò)誤信息記錄DSC文件

DISCARDFILE ./dirrpt/rp_kaf.DSC, APPEND, MEGABYTES 4096 //DSC文件的屬性定義


REPORTCOUNT EVERY 10 MINUTES, RATE   //報(bào)告指定10min統(tǒng)計(jì)一次報(bào)告

--grouptransops 10000  //組提交,以事務(wù)傳輸時(shí),事務(wù)合并的單位,減少IO操作


MAP CRM3HJ.CUST.TAB, TARGET CUST. TAB; //具體表的映射配置


  • kafka_crm_ha.props參數(shù)文件

[oracle@server003 dirprm]$ cat kafka_crm_ha.props

gg.handlerlist=kafkahandler //handler類型

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.topicMappingTemplate=tp_share_crm //指定kafkatopic

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer_JT.properties //指定kafka生產(chǎn)者配置文件

gg.handler.kafkahandler.ProducerRecordClass=oracle.goldengate.handler.kafka.MyCreateProducerRecordHa//生產(chǎn)者方法

gg.handler.kafkahandler.BlockingSend=false

gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.mode=op //OGG for BigData中傳輸模式,即op為一次SQL傳輸一次,tx為一次事務(wù)傳輸一次


gg.handler.kafkahandler.format=json #傳輸?shù)南⒆罱K解析的格式,格式相關(guān)

gg.handler.kafkahandler.format.includePrimaryKeys=true

gg.handler.kafkahandler.format.insertOpKey=I

gg.handler.kafkahandler.format.updateOpKey=U

gg.handler.kafkahandler.format.deleteOpKey=D


gg.handler.kafkahandler.authType=kerberos      //kerberos安全認(rèn)證相關(guān)

gg.handler.kafkahandler.kerberosKeytabFile=/home/oracle/KDC/kafka_XXXX.keytab

gg.handler.kafkahandler.kerberosPrincipal=kafka_XXXX@HADOOP.XXXX.CN


goldengate.userexit.timestamp=utc+8

goldengate.userexit.writers=javawriter

javawriter.stats.display=true

javawriter.stats.full=true

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/oracle/kafka_jass.conf //JVM設(shè)置


gg.log=log4j  //日志記錄

gg.log.level=INFO

gg.report.time=30sec

gg.classpath=dirprm/:/oggdata/kafka/kafka_2.11-1.0.2/libs/*:/oggdata/ggv191adp/:/oggdata/ggv191adp/lib/* #Kafkalib目錄


核心參數(shù)說明:

  • custom_kafka_producer.properties參數(shù)文件

[oracle@server003 dirprm]$ cat custom_kafka_producer_JT.properties

bootstrap.servers=XXXX.COM //kafka集群的broker的地址

acks=1 //參考KAFKAacks參數(shù)

compression.type=gzip  //壓縮類型

reconnect.backoff.ms=1000 //重連延時(shí)


max.request.size=5024000 //請求發(fā)送設(shè)置

send.buffer.bytes=5024000


value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer


batch.size=102400

linger.ms=10000


security.protocol=SASL_PLAINTEXT

sasl.kerberos.service.name=kafka // kerberos安全認(rèn)證相關(guān)

saal.machanism=GSSAPI




  3.3 同一topic下表對應(yīng)partitions如何做

測試實(shí)踐過程中,遇到最大的問題就是默認(rèn)情況下,ogg會(huì)把所有表都放在同一topic下,而根據(jù)規(guī)范文檔,不同的表要對應(yīng)同一topic下的不同分區(qū)。

實(shí)際需求是每個(gè)表對應(yīng)一個(gè)partition,例如

我們在kafka屬性參數(shù)文件中,可以指定自定義生產(chǎn)者方法,繼承ogg自帶的生產(chǎn)者父類,編寫自己的生產(chǎn)者方法這樣就能實(shí)現(xiàn)表與分區(qū)的對應(yīng)關(guān)系。

gg.handler.kafkahandler.ProducerRecordClass=MyCreateProducerRecordXX.java


1步:編寫MyCreateProducerRecordXX.java文件

新建MyCreateProducerRecordXX類,實(shí)現(xiàn)ogg預(yù)定義好的接口方法CreateProducerRecord,編寫自定義createProducerRecord方法。

2步:編譯jar

3步:替換jar

[oracle@server003 lib]$ls -rtlh ggkafka-19.1.0.0.1.003.jar

-rwxr-xr-x 1 oracleoinstall 27K Sep 26 03:37 ggkafka-19.1.0.0.1.003.jar

[oracle@server003 lib]$pwd

/oggdata/ggv191adp/ggjava/resources/lib

4步:指定使用的自定義方法


  3.4 傳到kafka數(shù)據(jù)的json格式查看

rp_kaf進(jìn)程同步的數(shù)據(jù),也就是生產(chǎn)的消息其實(shí)是json格式的DML操作數(shù)據(jù),我們可以使用消費(fèi)者命令檢查查看數(shù)據(jù)內(nèi)容:表名、操作類型、更新時(shí)間、主鍵信息、數(shù)據(jù)行before鏡像,數(shù)據(jù)行after鏡像。


4、總結(jié)


1) ogg往kafka傳數(shù)據(jù)大體上還是之前的套路,區(qū)別點(diǎn)就在于怎么把復(fù)制進(jìn)程當(dāng)做客戶端,當(dāng)做生產(chǎn)者去往kafka對應(yīng)的topic上生產(chǎn)數(shù)據(jù)。

2) Kafka作為高吞吐量、低延遲、高并發(fā)的消息中間件產(chǎn)品,我們的同步進(jìn)程甚至不需要考慮目標(biāo)端的性能問題,只要往kafka上推送數(shù)據(jù),最終的數(shù)據(jù)使用則是另一端的消費(fèi)者程序怎么使用數(shù)據(jù)的問題。

3) 既然是BigData的adapter軟件包,還可以實(shí)現(xiàn)往HDFS、Hive、Hbase、ApacheCassandra、MongoDB、Greenplum等多種開源產(chǎn)品中同步數(shù)據(jù),基本與kafka的配置類似,自定義類的實(shí)現(xiàn)為數(shù)據(jù)同步提供了更多的可能性,有待嘗試。

4) 實(shí)際生產(chǎn)中的配置,還涉及到安全認(rèn)證的問題,KDC的認(rèn)證在這里省略。

5) 擴(kuò)展聯(lián)想一下,如果kafka消費(fèi)者程序可以連接到不同的數(shù)據(jù)庫、不同的大數(shù)據(jù)開源組件進(jìn)行數(shù)據(jù)的消費(fèi),那么就可以形成一個(gè)統(tǒng)一的模式,ogg_for_XXDB? ogg_for Big Data?KAFKA?任意數(shù)據(jù)庫。

6) 再聯(lián)想一下,在kafka上看到j(luò)son格式里有數(shù)據(jù)變化的前后鏡像,是不是可以結(jié)合這個(gè)做一個(gè)基于ogg的數(shù)據(jù)操作閃回功能?


END



文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/130014.html

相關(guān)文章

  • 網(wǎng)易云捕性能踩坑解決之道上篇

    摘要:從零開始設(shè)計(jì)開發(fā)一個(gè)日處理數(shù)據(jù)億的大數(shù)據(jù)高并發(fā)實(shí)時(shí)系統(tǒng),哪些性能問題需要特別注意這里我們一起梳理一下本文中我將以,同學(xué)戲稱的系統(tǒng)網(wǎng)易云捕設(shè)計(jì)開發(fā)實(shí)踐中兩年的時(shí)間里碰到的真實(shí)問題,踩過的坑及解決問題的方法和大家一起討論如何解決這些問題。 本文由作者余寶虹授權(quán)網(wǎng)易云社區(qū)發(fā)布。 從零開始設(shè)計(jì)開發(fā)一個(gè)日處理數(shù)據(jù)8億的大數(shù)據(jù)高并發(fā)實(shí)時(shí)系統(tǒng),哪些性能問題需要特別注意?這里我們一起梳理一下,本文中我...

    李義 評(píng)論0 收藏0
  • 慕課網(wǎng)_《Kafka流處理平臺(tái)》學(xué)習(xí)總結(jié)

    摘要:慕課網(wǎng)流處理平臺(tái)學(xué)習(xí)總結(jié)時(shí)間年月日星期日說明本文部分內(nèi)容均來自慕課網(wǎng)。 慕課網(wǎng)《Kafka流處理平臺(tái)》學(xué)習(xí)總結(jié) 時(shí)間:2018年09月09日星期日 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 課程介紹 Kafk...

    Maxiye 評(píng)論0 收藏0
  • OGG Integrated Native DDL簡單測試

    OGG Integrated Native DDL簡單測試 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%;...

    IT那活兒 評(píng)論0 收藏1085

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<