成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

DM 源碼閱讀系列文章(四)dump/load 全量同步的實(shí)現(xiàn)

zombieda / 1351人閱讀

摘要:作者楊非本文為源碼閱讀系列文章的第四篇,上篇文章介紹了數(shù)據(jù)同步處理單元實(shí)現(xiàn)的功能,數(shù)據(jù)同步流程的運(yùn)行邏輯以及數(shù)據(jù)同步處理單元的設(shè)計(jì)。庫(kù)表黑白名單的實(shí)現(xiàn)方式。任務(wù)執(zhí)行完成之后,主線程就會(huì)釋放鎖,這樣有助于減少鎖持有的時(shí)間。

作者:楊非

本文為 DM 源碼閱讀系列文章的第四篇,上篇文章 介紹了數(shù)據(jù)同步處理單元實(shí)現(xiàn)的功能,數(shù)據(jù)同步流程的運(yùn)行邏輯以及數(shù)據(jù)同步處理單元的 interface 設(shè)計(jì)。本篇文章在此基礎(chǔ)上展開(kāi),詳細(xì)介紹 dump 和 load 兩個(gè)數(shù)據(jù)同步處理單元的設(shè)計(jì)實(shí)現(xiàn),重點(diǎn)關(guān)注數(shù)據(jù)同步處理單元 interface 的實(shí)現(xiàn),數(shù)據(jù)導(dǎo)入并發(fā)模型的設(shè)計(jì),以及導(dǎo)入任務(wù)在暫停或出現(xiàn)異常后如何恢復(fù)。

dump 處理單元

dump 處理單元的代碼位于 github.com/pingcap/dm/mydumper 包內(nèi),作用是從上游 MySQL 將表結(jié)構(gòu)和數(shù)據(jù)導(dǎo)出到邏輯 SQL 文件,由于該處理單元總是運(yùn)行在任務(wù)的第一個(gè)階段(full 模式和 all 模式),該處理單元每次運(yùn)行不依賴(lài)于其他處理單元的處理結(jié)果。另一方面,如果在 dump 運(yùn)行過(guò)程中被強(qiáng)制終止(例如在 dmctl 中執(zhí)行 pause-task 或者 stop-task),也不會(huì)記錄已經(jīng) dump 數(shù)據(jù)的 checkpoint 等信息。不記錄 checkpoint 是因?yàn)槊看芜\(yùn)行 mydumper 從上游導(dǎo)出數(shù)據(jù),上游的數(shù)據(jù)都可能發(fā)生變更,為了能得到一致的數(shù)據(jù)和 metadata 信息,每次恢復(fù)任務(wù)或重新運(yùn)行任務(wù)時(shí)該處理單元會(huì) 清理舊的數(shù)據(jù)目錄,重新開(kāi)始一次完整的數(shù)據(jù) dump。

導(dǎo)出表結(jié)構(gòu)和數(shù)據(jù)的邏輯并不是在 DM 內(nèi)部直接實(shí)現(xiàn),而是 通過(guò) os/exec 包調(diào)用外部 mydumper 二進(jìn)制文件 來(lái)完成。在 mydumper 內(nèi)部,我們需要關(guān)注以下幾個(gè)問(wèn)題:

數(shù)據(jù)導(dǎo)出時(shí)的并發(fā)模型是如何實(shí)現(xiàn)的。

no-locks, lock-all-tables, less-locking 等參數(shù)有怎樣的功能。

庫(kù)表黑白名單的實(shí)現(xiàn)方式。

mydumper 的實(shí)現(xiàn)細(xì)節(jié)

mydumper 的一次完整的運(yùn)行流程從主線程開(kāi)始,主線程按照以下步驟執(zhí)行:

解析參數(shù)。

創(chuàng)建到數(shù)據(jù)庫(kù)的連接。

會(huì)根據(jù) no-locks 選項(xiàng)進(jìn)行一系列的備份安全策略,包括 long query guardlock all tables or FLUSH TABLES WITH READ LOCK。

START TRANSACTION WITH CONSISTENT SNAPSHOT

記錄 binlog 位點(diǎn)信息。

less locking 處理線程的初始化。

普通導(dǎo)出線程初始化。

如果配置了 trx-consistency-only 選項(xiàng),執(zhí)行 UNLOCK TABLES /* trx-only */ 釋放之前獲取的表鎖。注意,如果開(kāi)啟該選項(xiàng),是無(wú)法保證非 InnoDB 表導(dǎo)出數(shù)據(jù)的一致性。更多關(guān)于一致性讀的細(xì)節(jié)可以參考 MySQL 官方文檔 Consistent Nonlocking Reads 部分。

根據(jù)配置規(guī)則(包括 --database, --tables-list 和 --regex 配置)讀取需要導(dǎo)出的 schema 和表信息,并在這個(gè)過(guò)程中有區(qū)分的記錄 innodb_tables 和 non_innodb_table。

為工作子線程創(chuàng)建任務(wù),并將任務(wù) push 到相關(guān)的工作隊(duì)列。

如果沒(méi)有配置 no-lockstrx-consistency-only 選項(xiàng),執(zhí)行 UNLOCK TABLES / FTWRL / 釋放鎖。

如果開(kāi)啟 less-locking,等待所有 less locking 子線程退出。

等待所有工作子線程退出。

工作線程的并發(fā)控制包括了兩個(gè)層面,一層是在不同表級(jí)別的并發(fā),另一層是同一張表級(jí)別的并發(fā)。mydumper 的主線程會(huì)將一次同步任務(wù)拆分為多個(gè)同步子任務(wù),并將每個(gè)子任務(wù)分發(fā)給同一個(gè)異步隊(duì)列 conf.queue_less_locking/conf.queue,工作子線程從隊(duì)列中獲取任務(wù)并執(zhí)行。具體的子任務(wù)劃分包括以下策略:

開(kāi)啟 less-locking 選項(xiàng)的非 InnoDB 表的處理。

先將所有 non_innodb_table 分為 num_threads 組,分組方式是遍歷這些表,依此將遍歷到的表加入到當(dāng)前數(shù)據(jù)量最小的分組,盡量保證每個(gè)分組內(nèi)的數(shù)據(jù)量相近。

上述得到的每個(gè)分組內(nèi)會(huì)包含一個(gè)或多個(gè)非 InnoDB 表,如果配置了 rows-per-file 選項(xiàng),會(huì)對(duì)每張表進(jìn)行 chunks 估算,對(duì)于每一張表,如果估算結(jié)果包含多個(gè) chunks,會(huì)將子任務(wù)進(jìn)一步按照 chunks 進(jìn)行拆分,分發(fā) chunks 數(shù)量個(gè)子任務(wù),如果沒(méi)有 chunks 劃分,分發(fā)為一個(gè)獨(dú)立的子任務(wù)。

注意,在該模式下,子任務(wù)會(huì) 發(fā)送到 queue_less_locking,并在編號(hào)為 num_threads ~ 2 * num_threads 的子線程中處理任務(wù)。

less_locking_threads 任務(wù)執(zhí)行完成之后,主線程就會(huì) UNLOCK TABLES / FTWRL / 釋放鎖,這樣有助于減少鎖持有的時(shí)間。主線程根據(jù) conf.unlock_tables 來(lái)判斷非 InnoDB 表是否全部導(dǎo)出,普通工作線程 或者 queue_less_locking 工作線程每次處理完一個(gè)非 InnoDB 表任務(wù)都會(huì)根據(jù) non_innodb_table_counternon_innodb_done 兩個(gè)變量判斷是否還有沒(méi)有導(dǎo)出結(jié)束的非 InnoDB 表,如果都已經(jīng)導(dǎo)出結(jié)束,就會(huì)向異步隊(duì)列 conf.unlock_tables 中發(fā)送一條數(shù)據(jù),表示可以解鎖全局鎖。

每個(gè) less_locking_threads 處理非 InnoDB 表任務(wù)時(shí),會(huì)先 加表鎖,導(dǎo)出數(shù)據(jù),最后 解鎖表鎖。

未開(kāi)啟 less-locking 選項(xiàng)的非 InnoDB 表的處理。

遍歷每一張非 InnoDB 表,同樣對(duì)每張表進(jìn)行 chunks 估算,如果包含多個(gè) chunks,按照 chunks 個(gè)數(shù)分發(fā)同樣的子任務(wù)數(shù);如果沒(méi)有劃分 chunks,每張表分發(fā)一個(gè)子任務(wù)。所有的任務(wù)都分發(fā)到 conf->queue 隊(duì)列。

InnoDB 表的處理。

與未開(kāi)啟 less-locking 選項(xiàng)的非 InnoDB 表的處理相同,同樣是 按照表分發(fā)子任務(wù),如果有 chunks 子任務(wù)會(huì)進(jìn)一步細(xì)分。

從上述的并發(fā)模型可以看出 mydumper 首先按照表進(jìn)行同步任務(wù)拆分,對(duì)于同一張表,如果配置 rows-per-file 參數(shù),會(huì)根據(jù)該參數(shù)和表行數(shù)將表劃分為合適的 chunks 數(shù),這即是同一張表內(nèi)部的并發(fā)。具體表行數(shù)的估算和 chunks 劃分的實(shí)現(xiàn)見(jiàn) get_chunks_for_table 函數(shù)。

需要注意目前 DM 在任務(wù)配置中指定的庫(kù)表黑白名單功能只應(yīng)用于 load 和 binlog replication 處理單元。如果在 dump 處理單元內(nèi)使用庫(kù)表黑白名單功能,需要在同步任務(wù)配置文件的 dump 處理單元配置提供 extra-args 參數(shù),并指定 mydumper 相關(guān)參數(shù),包括 --database, --tables-list 和 --regex。mydumper 使用 regex 過(guò)濾庫(kù)表的實(shí)現(xiàn)參考 check_regex 函數(shù)。

load 處理單元

load 處理單元的代碼位于 github.com/pingcap/dm/loader 包內(nèi),該處理單元在 dump 處理單元運(yùn)行結(jié)束后運(yùn)行,讀取 dump 處理單元導(dǎo)出的 SQL 文件解析并在下游數(shù)據(jù)庫(kù)執(zhí)行邏輯 SQL。我們重點(diǎn)分析 InitProcess 兩個(gè) interface 的實(shí)現(xiàn)。

Init 實(shí)現(xiàn)細(xì)節(jié)

該階段進(jìn)行一些初始化和清理操作,并不會(huì)開(kāi)始同步任務(wù),如果在該階段運(yùn)行中出現(xiàn)錯(cuò)誤,會(huì)通過(guò) rollback 機(jī)制 清理資源,不需要調(diào)用 Close 函數(shù)。該階段包含的初始化操作包括以下幾點(diǎn):

創(chuàng)建 checkpoint,checkpoint 用于記錄全量數(shù)據(jù)的導(dǎo)入進(jìn)度和 load 處理單元暫?;虍惓=K止后,恢復(fù)或重新開(kāi)始任務(wù)時(shí)可以從斷點(diǎn)處繼續(xù)導(dǎo)入數(shù)據(jù)。

應(yīng)用任務(wù)配置的數(shù)據(jù)同步規(guī)則,包括以下規(guī)則:

初始化黑白名單

初始化表路有規(guī)則

初始化列值轉(zhuǎn)換規(guī)則

Process 實(shí)現(xiàn)細(xì)節(jié)

該階段的工作流程也很直觀,通過(guò) 一個(gè)收發(fā)數(shù)據(jù)類(lèi)型為 *pb.ProcessErrorchannel 接收運(yùn)行過(guò)程中出現(xiàn)的錯(cuò)誤,出錯(cuò)后通過(guò) context 的 CancelFunc 強(qiáng)制結(jié)束處理單元運(yùn)行。在核心的 數(shù)據(jù)導(dǎo)入函數(shù) 中,工作模型與 mydumper 類(lèi)似,即在 主線程中分發(fā)任務(wù),有多個(gè)工作線程執(zhí)行具體的數(shù)據(jù)導(dǎo)入任務(wù)。具體的工作細(xì)節(jié)如下:

主線程會(huì)按照庫(kù),表的順序讀取創(chuàng)建庫(kù)語(yǔ)句文件 -schema-create.sql 和建表語(yǔ)句文件 .-schema-create.sql,并在下游執(zhí)行 SQL 創(chuàng)建相對(duì)應(yīng)的庫(kù)和表。

主線程讀取 checkpoint 信息,結(jié)合數(shù)據(jù)文件信息創(chuàng)建 fileJob 隨機(jī)分發(fā)任務(wù)給一個(gè)工作子線程,fileJob 任務(wù)的結(jié)構(gòu)如下所示 :

type fileJob struct {
   schema    string
   table     string
   dataFile  string
   offset    int64 // 表示讀取文件的起始 offset,如果沒(méi)有 checkpoint 斷點(diǎn)信息該值為 0
   info      *tableInfo // 保存原庫(kù)表,目標(biāo)庫(kù)表,列名,insert 語(yǔ)句 column 名字列表等信息
}

在每個(gè)工作線程內(nèi)部,有一個(gè)循環(huán)不斷從自己 fileJobQueue 獲取任務(wù),每次獲取任務(wù)后會(huì)對(duì)文件進(jìn)行解析,并將解析后的結(jié)果分批次打包為 SQL 語(yǔ)句分發(fā)給線程內(nèi)部的另外一個(gè)工作協(xié)程,該工作協(xié)程負(fù)責(zé)處理 SQL 語(yǔ)句的執(zhí)行。工作流程的偽代碼如下所示,完整的代碼參考 func (w *Worker) run()

// worker 工作線程內(nèi)分發(fā)給內(nèi)部工作協(xié)程的任務(wù)結(jié)構(gòu)
type dataJob struct {
   sql         string // insert 語(yǔ)句, insert into  values (x, y, z), (x2, y2, z2), … (xn, yn, zn);
   schema      string // 目標(biāo)數(shù)據(jù)庫(kù)
   file        string // SQL 文件名
   offset      int64 // 本次導(dǎo)入數(shù)據(jù)在 SQL 文件的偏移量
   lastOffset  int64 // 上一次已導(dǎo)入數(shù)據(jù)對(duì)應(yīng) SQL 文件偏移量
}

// SQL 語(yǔ)句執(zhí)行協(xié)程
doJob := func() {
   for {
       select {
       case <-ctx.Done():
           return
       case job := <-jobQueue:
           sqls := []string{
               fmt.Sprintf("USE `%s`;", job.schema), // 指定插入數(shù)據(jù)的 schema
               job.sql,
               checkpoint.GenSQL(job.file, job.offset), // 更新 checkpoint 的 SQL 語(yǔ)句
           }
           executeSQLInOneTransaction(sqls) // 在一個(gè)事務(wù)中執(zhí)行上述 3 條 SQL 語(yǔ)句
       }
   }
}
?
// worker 主線程
for {
   select {
   case <-ctx.Done():
       return
   case job := <-fileJobQueue:
       go doJob()
       readDataFileAndDispatchSQLJobs(ctx, dir, job.dataFile, job.offset, job.info)
   }
}

dispatchSQL 函數(shù)負(fù)責(zé)在工作線程內(nèi)部讀取 SQL 文件和重寫(xiě) SQL,該函數(shù)會(huì)在運(yùn)行初始階段 創(chuàng)建所操作表的 checkpoint 信息,需要注意在任務(wù)中斷恢復(fù)之后,如果這個(gè)文件的導(dǎo)入還沒(méi)有完成,checkpoint.Init 仍然會(huì)執(zhí)行,但是這次運(yùn)行不會(huì)更新該文件的 checkpoint 信息。列值轉(zhuǎn)換和庫(kù)表路由也是在這個(gè)階段內(nèi)完成。

列值轉(zhuǎn)換:需要對(duì)輸入 SQL 進(jìn)行解析拆分為每一個(gè) field,對(duì)需要轉(zhuǎn)換的 field 進(jìn)行轉(zhuǎn)換操作,然后重新拼接起 SQL 語(yǔ)句。詳細(xì)重寫(xiě)流程見(jiàn) reassemble 函數(shù)。

庫(kù)表路由:這種場(chǎng)景下只需要 替換源表到目標(biāo)表 即可。

在工作線程執(zhí)行一個(gè)批次的 SQL 語(yǔ)句之前,會(huì)首先根據(jù)文件 offset 信息生成一條更新 checkpoint 的語(yǔ)句,加入到打包的 SQL 語(yǔ)句中,具體執(zhí)行時(shí)這些語(yǔ)句會(huì) 在一個(gè)事務(wù)中提交,這樣就保證了斷點(diǎn)信息的準(zhǔn)確性,如果導(dǎo)入過(guò)程暫停或中斷,恢復(fù)任務(wù)后從斷點(diǎn)重新同步可以保證數(shù)據(jù)一致。

小結(jié)

本篇詳細(xì)介紹 dump 和 load 兩個(gè)數(shù)據(jù)同步處理單元的設(shè)計(jì)實(shí)現(xiàn),對(duì)核心 interface 實(shí)現(xiàn)、數(shù)據(jù)導(dǎo)入并發(fā)模型、數(shù)據(jù)導(dǎo)入暫停或中斷的恢復(fù)進(jìn)行了分析。接下來(lái)的文章會(huì)繼續(xù)介紹 binlog replication,relay log 兩個(gè)數(shù)據(jù)同步處理單元的實(shí)現(xiàn)。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/18003.html

相關(guān)文章

  • DM 源碼閱讀系列文章(三)數(shù)據(jù)同步處理單元介紹

    摘要:實(shí)際上中的數(shù)據(jù)同步處理單元分為兩類(lèi)全局共享單例。獨(dú)享數(shù)據(jù)同步處理單元使用邏輯相關(guān)代碼在。數(shù)據(jù)同步處理單元運(yùn)行狀態(tài)監(jiān)控。后續(xù)會(huì)分三篇文章詳細(xì)地介紹數(shù)據(jù)同步處理單元的實(shí)現(xiàn),包括全量同步實(shí)現(xiàn)增量同步實(shí)現(xiàn)實(shí)現(xiàn) 作者:lan 本文為 DM 源碼閱讀系列文章的第三篇,上篇文章 介紹了 DM 的整體架構(gòu),DM 組件 DM-master 和 DM-worker 的入口代碼,以及兩者之間的數(shù)據(jù)交互模型。...

    Forelax 評(píng)論0 收藏0
  • DM 源碼閱讀系列文章(一)序

    摘要:內(nèi)容概要源碼閱讀系列將會(huì)從兩條線進(jìn)行展開(kāi),一條是圍繞的系統(tǒng)架構(gòu)和重要模塊進(jìn)行分析,另一條線圍繞內(nèi)部的同步機(jī)制展開(kāi)分析。更多的代碼閱讀內(nèi)容會(huì)在后面的章節(jié)中逐步展開(kāi),敬請(qǐng)期待。 作者:楊非 前言 TiDB-DM 是由 PingCAP 開(kāi)發(fā)的一體化數(shù)據(jù)同步任務(wù)管理平臺(tái),支持從 MySQL 或 MariaDB 到 TiDB 的全量數(shù)據(jù)遷移和增量數(shù)據(jù)同步,在 TiDB DevCon 2019 正...

    Mr_houzi 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看

          <