摘要:最近為了導(dǎo)庫(kù)的問(wèn)題,費(fèi)了一些周折。可定制的數(shù)據(jù)導(dǎo)入工具基于的主要功能完全使用實(shí)現(xiàn)數(shù)據(jù)從到的遷移可批量導(dǎo)入多張表可自定義的數(shù)據(jù)遷移的規(guī)則數(shù)據(jù)表字段關(guān)系字段過(guò)濾使用正則進(jìn)行異常處理可自定義的異步分片導(dǎo)入方式,數(shù)據(jù)導(dǎo)入效率更高。
最近為了es導(dǎo)庫(kù)的問(wèn)題,費(fèi)了一些周折。于是乎做了一個(gè)小工具(用過(guò)npm的一些jdbc的導(dǎo)庫(kù)工具,感覺還不夠好用),這里拋磚引玉,自薦一下下,歡迎同道兄弟吐槽和參與,我會(huì)不定時(shí)的更新這個(gè)擴(kuò)展。
A customizable importer from mysql to elasticsearch.
可定制的 elasticsearch 數(shù)據(jù)導(dǎo)入工具 ——基于 elasticsearch 的 JS API
主要功能完全使用 JS 實(shí)現(xiàn)數(shù)據(jù)從 MySQL 到 elasticsearch 的遷移;
可批量導(dǎo)入多張 MySQL 表;
可自定義的數(shù)據(jù)遷移的規(guī)則(數(shù)據(jù)表/字段關(guān)系、字段過(guò)濾、使用正則進(jìn)行異常處理);
可自定義的異步分片導(dǎo)入方式,數(shù)據(jù)導(dǎo)入效率更高。
一鍵安裝npm install mysql_2_elasticsearch快速開始(簡(jiǎn)單用例)
var esMysqlRiver = require("mysql_2_elasticsearch"); var river_config = { mysql: { host: "127.0.0.1", user: "root", password: "root", database: "users", port: 3306 }, elasticsearch: { host_config: { // es客戶端的配置參數(shù) host: "localhost:9200", // log: "trace" }, index: "myIndex" }, riverMap: { "users => users": {} // 將數(shù)據(jù)表 users 導(dǎo)入到 es 類型: /myIndex/users } }; /* ** 以下代碼內(nèi)容: ** 通過(guò) esMysqlRiver 方法進(jìn)行數(shù)據(jù)傳輸,方法的回調(diào)參數(shù)(一個(gè)JSON對(duì)象) obj 包含此次數(shù)據(jù)傳輸?shù)慕Y(jié)果 ** 其中: ** 1. obj.total => 需要傳輸?shù)臄?shù)據(jù)表數(shù)量 ** 2. obj.success => 傳輸成功的數(shù)據(jù)表數(shù)量 ** 3. obj.failed => 傳輸失敗的數(shù)據(jù)表數(shù)量 ** 4. obj.result => 本次數(shù)據(jù)傳輸?shù)慕Y(jié)論 */ esMysqlRiver(river_config, function(obj) { /* 將傳輸結(jié)果打印到終端 */ console.log(" ---------------------------------"); console.log("總傳送:" + obj.total + "項(xiàng)"); console.log("成功:" + obj.success + "項(xiàng)"); console.log("失敗:" + obj.failed + "項(xiàng)"); if (obj.result == "success") { console.log(" 結(jié)論:全部數(shù)據(jù)傳送完成!"); } else { console.log(" 結(jié)論:傳送未成功..."); } console.log("---------------------------------"); /* 將傳輸結(jié)果打印到終端 */ });最佳實(shí)踐(完整用例)
var esMysqlRiver = require("mysql_2_elasticsearch"); /* ** mysql_2_elasticsearch 的相關(guān)參數(shù)配置(詳情見注釋) */ var river_config = { /* [必需] MySQL數(shù)據(jù)庫(kù)的相關(guān)參數(shù)(根據(jù)實(shí)際情況進(jìn)行修改) */ mysql: { host: "127.0.0.1", user: "root", password: "root", database: "users", port: 3306 }, /* [必需] es 相關(guān)參數(shù)(根據(jù)實(shí)際情況進(jìn)行修改) */ elasticsearch: { host_config: { // [必需] host_config 即 es客戶端的配置參數(shù),詳細(xì)配置參考 es官方文檔 host: "localhost:9200", log: "trace", // Other options... }, index: "myIndex", // [必需] es 索引名 chunkSize: 8000, // [非必需] 單分片最大數(shù)據(jù)量,默認(rèn)為 5000 (條數(shù)據(jù)) timeout: "2m" // [非必需] 單次分片請(qǐng)求的超時(shí)時(shí)間,默認(rèn)為 1m //(注意:此 timeout 并非es客戶端請(qǐng)求的timeout,后者請(qǐng)?jiān)?host_config 中設(shè)置) }, /* [必需] 數(shù)據(jù)傳送的規(guī)則 */ riverMap: { "users => users": { // [必需] "a => b" 表示將 mysql數(shù)據(jù)庫(kù)中名為 "a" 的 table 的所有數(shù)據(jù) 輸送到 es中名為 "b" 的 type 中去 filter_out: [ // [非必需] 需要過(guò)濾的字段名,即 filter_out 中的設(shè)置的所有字段將不會(huì)被導(dǎo)入 elasticsearch 的數(shù)據(jù)中 "password", "age" ], exception_handler: { // [非必需] 異常處理器,使用JS正則表達(dá)式處理異常數(shù)據(jù),避免 es 入庫(kù)時(shí)由于類型不合法造成數(shù)據(jù)缺失 "birthday": [ // [示例] 對(duì) users 表的 birthday 字段的異常數(shù)據(jù)進(jìn)行處理 { match: /NaN/gi, // [示例] 正則條件(此例匹配字段值為 "NaN" 的情況) writeAs: null // [示例] 將 "NaN" 重寫為 null }, { match: /(d{4})年/gi, // [示例] 正則表達(dá)式(此例匹配字段值為形如 "2016年" 的情況) writeAs: "$1.1" // [示例] 將 "2015年" 樣式的數(shù)據(jù)重寫為 "2016.1" 樣式的數(shù)據(jù) } ] } }, // Other fields" options... } }; /* ** 將傳輸結(jié)果打印到終端 */ esMysqlRiver(river_config, function(obj) { console.log(" ---------------------------------"); console.log("總傳送:" + obj.total + "項(xiàng)"); console.log("成功:" + obj.success + "項(xiàng)"); console.log("失敗:" + obj.failed + "項(xiàng)"); if (obj.result == "success") { console.log(" 結(jié)論:全部數(shù)據(jù)傳送完成!"); } else { console.log(" 結(jié)論:傳送未成功..."); } console.log("---------------------------------"); });注意事項(xiàng)及參考
elasticsearch數(shù)據(jù)導(dǎo)入前請(qǐng)先配置好數(shù)據(jù)的 mapping;
host_config 更多參數(shù)設(shè)置詳見 es官方API文檔;
mysql 表的自增 id 自動(dòng)替換為 表名+_id 的格式,如:users_id;
如出現(xiàn)數(shù)據(jù)缺失情況,請(qǐng)注意查看 elasticsearch 終端進(jìn)程或日志,找出未成功導(dǎo)入的數(shù)據(jù),通過(guò)設(shè)置 exception_handler 參數(shù)處理它。
github 項(xiàng)目地址 https://github.com/parksben/m...文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/91338.html
摘要:最近為了導(dǎo)庫(kù)的問(wèn)題,費(fèi)了一些周折??啥ㄖ频臄?shù)據(jù)導(dǎo)入工具基于的主要功能完全使用實(shí)現(xiàn)數(shù)據(jù)從到的遷移可批量導(dǎo)入多張表可自定義的數(shù)據(jù)遷移的規(guī)則數(shù)據(jù)表字段關(guān)系字段過(guò)濾使用正則進(jìn)行異常處理可自定義的異步分片導(dǎo)入方式,數(shù)據(jù)導(dǎo)入效率更高。 最近為了es導(dǎo)庫(kù)的問(wèn)題,費(fèi)了一些周折。于是乎做了一個(gè)小工具(用過(guò)npm的一些jdbc的導(dǎo)庫(kù)工具,感覺還不夠好用),這里拋磚引玉,自薦一下下,歡迎同道兄弟吐槽和參與,...
閱讀 1644·2021-10-09 09:44
閱讀 2804·2021-10-08 10:04
閱讀 2475·2021-09-26 09:55
閱讀 3854·2021-09-22 10:02
閱讀 3315·2019-08-29 17:08
閱讀 1075·2019-08-29 15:08
閱讀 2963·2019-08-26 13:52
閱讀 3279·2019-08-26 13:34