平時(shí)ogg數(shù)據(jù)同步的案例中,遇到最多就是常見RDBMS之間的同步,源端和目標(biāo)端分別找合適安裝包,按照經(jīng)典的抽取、傳輸、復(fù)制3個(gè)進(jìn)程進(jìn)行實(shí)施,例如oracle到oracle,oracle到mysql,sqlserver到oracle等等場景。早就知道ogg也可以同步數(shù)據(jù)到Hahoop等BigData平臺(tái)及Kafka等消息中間件進(jìn)行同步,今天終于遇到這樣的實(shí)際需求了。
需求:某系統(tǒng)需要原生實(shí)時(shí)數(shù)據(jù)采集到大數(shù)據(jù)平臺(tái)。根據(jù)統(tǒng)一的實(shí)現(xiàn)方案,kafka規(guī)劃原則:
省分topic隔離。不同省分,不共用相同的topic;
依據(jù)省份生產(chǎn)庫實(shí)例建設(shè)情況,按實(shí)例劃分對接topic,要求kafka一個(gè)topic對應(yīng)一個(gè)物理數(shù)據(jù)庫實(shí)例;
為保證每張表數(shù)據(jù)在kafka中保證“有序”,要求每張表對應(yīng)topic的一個(gè)partition;
具體省分kafka規(guī)劃方案如下:
針對本次環(huán)境源端是oracle12c數(shù)據(jù)庫,目標(biāo)端是Kafka2.11-1.0.2集群。源端使用ogg12cfor oracle即可,目標(biāo)端我們需要使用OGG_BigData的軟件包來實(shí)現(xiàn)的。根據(jù)官方的版本適配文檔,確定使用19c19.1來搭建。
源端軟件包
123014_fbo_ggs_Linux_x64_shiphome.zip
(12.3.0.1.4OGGCORE_12.3.0.1.0_PLATFORMS_180415.0359_FBO)
目標(biāo)端軟件包
OGG_BigData_Linux_x64_19.1.0.0.1.zip
(19.1.0.0.2OGGCORE_OGGADP.19.1.0.0.2_PLATFORMS_190916.0039)
對于kafka集群來說,ogg的目標(biāo)端程序?qū)嶋H上是作為kafka的生產(chǎn)者客戶端,把解析trail文件得到的數(shù)據(jù)推送到kafka中。
源端數(shù)據(jù)庫12cRAC的多租戶模式,使用ogg12c12.3的集成模式,按常規(guī)配置ex_kaf、dp_kaf進(jìn)程即可。
抽取進(jìn)程EX_KAF參數(shù)
##view param EX_KAF EXTRACT ex_kaf SETENV (NLS_LANG=AMERICAN_AMERICA.ZHS16GBK) setenv (ORACLE_HOME=/u01/app/oracle/product/12.2.0.1/db_1) SETENV (ORACLE_SID=db2) USERID c##ggs@db, PASSWORD XXXX EXTTRAIL ./dirdat/ha --DISCARDFILE ./dirrpt/ex_kaf.DSC, APPEND,MEGABYTES 100 -- report info REPORTCOUNT EVERY 10 MINUTES, RATE WARNLONGTRANS 2h,CHECKINTERVAL 30m dboptions allowunusedcolumn fetchoptions nousesnapshot LOGALLSUPCOLS //加入前鏡像(12c新版本特有參數(shù),遇到josn格式不固定,新老參數(shù)同時(shí)加) getupdatebefores // 加入前鏡像(老版本參數(shù)) nocompressdeletes / /加入前鏡像(老版本參數(shù)) nocompressupdates // 加入前鏡像(老版本參數(shù)) --crm3hj 208 tables TABLE crm3hj.CUST.TAB; |
##view param DP_KAF EXTRACT dp_kaf PASSTHRU //傳輸進(jìn)程透傳,不做任何處理 rmthost 192.168.100.100, mgrport 7809 , TCPBUFSIZE 300000, TCPFLUSHBYTES 300000//目標(biāo)端地址 rmttrail ./dirdat/hk EOFDELAYCSECS 10 --gettruncates --crm3hj 8 tables TABLE crm3hj.CUST.TAB; |
解壓ogg壓縮包
tar xvfOGG_BigData_Linux_x64_19.1.0.0.1.tar -C /oggdata/ggv191adp
因?yàn)橐卿浀?/span>kafka集群,需要引用對應(yīng)jar包,故需部署
tar -zxvfkafka_2.11-1.0.2.tgz -C /oggdata/kafka
kafka的lib目錄為 /oggdata/kafka/kafka_2.11-1.0.2/libs
參數(shù)文件配置,這里除了rp_kaf進(jìn)程參數(shù)文件外,還有kafka屬性參數(shù)文件、生產(chǎn)者屬性配置參數(shù)文件。
rp_kaf.prm參數(shù)文件
[oracle@server003 dirprm]$ cat rp_kaf.prm REPLICAT rp_kaf //定義進(jìn)程名稱 sourcedefs ./dirdef/o2kaf_jt.def //指定使用源和目標(biāo)的表映射文件,高版本可省略 TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka_crm_ha.props //定義kafka一些適配性的庫文件以及配置文件,配置文件指定屬性文件 --reperror default, abend reperror default, discard //錯(cuò)誤處理,這里將錯(cuò)誤信息記錄DSC文件 DISCARDFILE ./dirrpt/rp_kaf.DSC, APPEND, MEGABYTES 4096 //對DSC文件的屬性定義 REPORTCOUNT EVERY 10 MINUTES, RATE //報(bào)告指定10min統(tǒng)計(jì)一次報(bào)告 --grouptransops 10000 //組提交,以事務(wù)傳輸時(shí),事務(wù)合并的單位,減少IO操作 MAP CRM3HJ.CUST.TAB, TARGET CUST. TAB; //具體表的映射配置 |
kafka_crm_ha.props參數(shù)文件
[oracle@server003 dirprm]$ cat kafka_crm_ha.props gg.handlerlist=kafkahandler //handler類型 gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.topicMappingTemplate=tp_share_crm //指定kafka的topic gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer_JT.properties //指定kafka生產(chǎn)者配置文件 gg.handler.kafkahandler.ProducerRecordClass=oracle.goldengate.handler.kafka.MyCreateProducerRecordHa//生產(chǎn)者方法 gg.handler.kafkahandler.BlockingSend=false gg.handler.kafkahandler.includeTokens=false gg.handler.kafkahandler.mode=op //OGG for BigData中傳輸模式,即op為一次SQL傳輸一次,tx為一次事務(wù)傳輸一次 gg.handler.kafkahandler.format=json #傳輸?shù)南⒆罱K解析的格式,格式相關(guān) gg.handler.kafkahandler.format.includePrimaryKeys=true gg.handler.kafkahandler.format.insertOpKey=I gg.handler.kafkahandler.format.updateOpKey=U gg.handler.kafkahandler.format.deleteOpKey=D gg.handler.kafkahandler.authType=kerberos //kerberos安全認(rèn)證相關(guān) gg.handler.kafkahandler.kerberosKeytabFile=/home/oracle/KDC/kafka_XXXX.keytab gg.handler.kafkahandler.kerberosPrincipal=kafka_XXXX@HADOOP.XXXX.CN goldengate.userexit.timestamp=utc+8 goldengate.userexit.writers=javawriter javawriter.stats.display=true javawriter.stats.full=true javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/oracle/kafka_jass.conf //JVM設(shè)置 gg.log=log4j //日志記錄 gg.log.level=INFO gg.report.time=30sec gg.classpath=dirprm/:/oggdata/kafka/kafka_2.11-1.0.2/libs/*:/oggdata/ggv191adp/:/oggdata/ggv191adp/lib/* #Kafka的lib目錄 |
核心參數(shù)說明:
custom_kafka_producer.properties參數(shù)文件
[oracle@server003 dirprm]$ cat custom_kafka_producer_JT.properties bootstrap.servers=XXXX.COM //kafka集群的broker的地址 acks=1 //參考KAFKA的acks參數(shù) compression.type=gzip //壓縮類型 reconnect.backoff.ms=1000 //重連延時(shí) max.request.size=5024000 //請求發(fā)送設(shè)置 send.buffer.bytes=5024000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000 security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka // kerberos安全認(rèn)證相關(guān) saal.machanism=GSSAPI |
測試實(shí)踐過程中,遇到最大的問題就是默認(rèn)情況下,ogg會(huì)把所有表都放在同一topic下,而根據(jù)規(guī)范文檔,不同的表要對應(yīng)同一topic下的不同分區(qū)。
實(shí)際需求是每個(gè)表對應(yīng)一個(gè)partition,例如
我們在kafka屬性參數(shù)文件中,可以指定自定義生產(chǎn)者方法,繼承ogg自帶的生產(chǎn)者父類,編寫自己的生產(chǎn)者方法這樣就能實(shí)現(xiàn)表與分區(qū)的對應(yīng)關(guān)系。
gg.handler.kafkahandler.ProducerRecordClass=MyCreateProducerRecordXX.java
第1步:編寫MyCreateProducerRecordXX.java文件
新建MyCreateProducerRecordXX類,實(shí)現(xiàn)ogg預(yù)定義好的接口方法CreateProducerRecord,編寫自定義createProducerRecord方法。
第2步:編譯jar包
第3步:替換jar包
[oracle@server003 lib]$ls -rtlh ggkafka-19.1.0.0.1.003.jar
-rwxr-xr-x 1 oracleoinstall 27K Sep 26 03:37 ggkafka-19.1.0.0.1.003.jar
[oracle@server003 lib]$pwd
/oggdata/ggv191adp/ggjava/resources/lib
第4步:指定使用的自定義方法
rp_kaf進(jìn)程同步的數(shù)據(jù),也就是生產(chǎn)的消息其實(shí)是json格式的DML操作數(shù)據(jù),我們可以使用消費(fèi)者命令檢查查看數(shù)據(jù)內(nèi)容:表名、操作類型、更新時(shí)間、主鍵信息、數(shù)據(jù)行before鏡像,數(shù)據(jù)行after鏡像。
1) ogg往kafka傳數(shù)據(jù)大體上還是之前的套路,區(qū)別點(diǎn)就在于怎么把復(fù)制進(jìn)程當(dāng)做客戶端,當(dāng)做生產(chǎn)者去往kafka對應(yīng)的topic上生產(chǎn)數(shù)據(jù)。
2) Kafka作為高吞吐量、低延遲、高并發(fā)的消息中間件產(chǎn)品,我們的同步進(jìn)程甚至不需要考慮目標(biāo)端的性能問題,只要往kafka上推送數(shù)據(jù),最終的數(shù)據(jù)使用則是另一端的消費(fèi)者程序怎么使用數(shù)據(jù)的問題。
3) 既然是BigData的adapter軟件包,還可以實(shí)現(xiàn)往HDFS、Hive、Hbase、ApacheCassandra、MongoDB、Greenplum等多種開源產(chǎn)品中同步數(shù)據(jù),基本與kafka的配置類似,自定義類的實(shí)現(xiàn)為數(shù)據(jù)同步提供了更多的可能性,有待嘗試。
4) 實(shí)際生產(chǎn)中的配置,還涉及到安全認(rèn)證的問題,KDC的認(rèn)證在這里省略。
5) 擴(kuò)展聯(lián)想一下,如果kafka消費(fèi)者程序可以連接到不同的數(shù)據(jù)庫、不同的大數(shù)據(jù)開源組件進(jìn)行數(shù)據(jù)的消費(fèi),那么就可以形成一個(gè)統(tǒng)一的模式,ogg_for_XXDB? ogg_for Big Data?KAFKA?任意數(shù)據(jù)庫。
6) 再聯(lián)想一下,在kafka上看到j(luò)son格式里有數(shù)據(jù)變化的前后鏡像,是不是可以結(jié)合這個(gè)做一個(gè)基于ogg的數(shù)據(jù)操作閃回功能?
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/130014.html
摘要:從零開始設(shè)計(jì)開發(fā)一個(gè)日處理數(shù)據(jù)億的大數(shù)據(jù)高并發(fā)實(shí)時(shí)系統(tǒng),哪些性能問題需要特別注意這里我們一起梳理一下本文中我將以,同學(xué)戲稱的系統(tǒng)網(wǎng)易云捕設(shè)計(jì)開發(fā)實(shí)踐中兩年的時(shí)間里碰到的真實(shí)問題,踩過的坑及解決問題的方法和大家一起討論如何解決這些問題。 本文由作者余寶虹授權(quán)網(wǎng)易云社區(qū)發(fā)布。 從零開始設(shè)計(jì)開發(fā)一個(gè)日處理數(shù)據(jù)8億的大數(shù)據(jù)高并發(fā)實(shí)時(shí)系統(tǒng),哪些性能問題需要特別注意?這里我們一起梳理一下,本文中我...
摘要:慕課網(wǎng)流處理平臺(tái)學(xué)習(xí)總結(jié)時(shí)間年月日星期日說明本文部分內(nèi)容均來自慕課網(wǎng)。 慕課網(wǎng)《Kafka流處理平臺(tái)》學(xué)習(xí)總結(jié) 時(shí)間:2018年09月09日星期日 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 課程介紹 Kafk...
OGG Integrated Native DDL簡單測試 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%;...
閱讀 1356·2023-01-11 13:20
閱讀 1707·2023-01-11 13:20
閱讀 1215·2023-01-11 13:20
閱讀 1906·2023-01-11 13:20
閱讀 4165·2023-01-11 13:20
閱讀 2757·2023-01-11 13:20
閱讀 1402·2023-01-11 13:20
閱讀 3671·2023-01-11 13:20