摘要:作者楊非本文為源碼閱讀系列文章的第四篇,上篇文章介紹了數(shù)據(jù)同步處理單元實(shí)現(xiàn)的功能,數(shù)據(jù)同步流程的運(yùn)行邏輯以及數(shù)據(jù)同步處理單元的設(shè)計(jì)。庫(kù)表黑白名單的實(shí)現(xiàn)方式。任務(wù)執(zhí)行完成之后,主線程就會(huì)釋放鎖,這樣有助于減少鎖持有的時(shí)間。
作者:楊非
本文為 DM 源碼閱讀系列文章的第四篇,上篇文章 介紹了數(shù)據(jù)同步處理單元實(shí)現(xiàn)的功能,數(shù)據(jù)同步流程的運(yùn)行邏輯以及數(shù)據(jù)同步處理單元的 interface 設(shè)計(jì)。本篇文章在此基礎(chǔ)上展開(kāi),詳細(xì)介紹 dump 和 load 兩個(gè)數(shù)據(jù)同步處理單元的設(shè)計(jì)實(shí)現(xiàn),重點(diǎn)關(guān)注數(shù)據(jù)同步處理單元 interface 的實(shí)現(xiàn),數(shù)據(jù)導(dǎo)入并發(fā)模型的設(shè)計(jì),以及導(dǎo)入任務(wù)在暫停或出現(xiàn)異常后如何恢復(fù)。
dump 處理單元dump 處理單元的代碼位于 github.com/pingcap/dm/mydumper 包內(nèi),作用是從上游 MySQL 將表結(jié)構(gòu)和數(shù)據(jù)導(dǎo)出到邏輯 SQL 文件,由于該處理單元總是運(yùn)行在任務(wù)的第一個(gè)階段(full 模式和 all 模式),該處理單元每次運(yùn)行不依賴(lài)于其他處理單元的處理結(jié)果。另一方面,如果在 dump 運(yùn)行過(guò)程中被強(qiáng)制終止(例如在 dmctl 中執(zhí)行 pause-task 或者 stop-task),也不會(huì)記錄已經(jīng) dump 數(shù)據(jù)的 checkpoint 等信息。不記錄 checkpoint 是因?yàn)槊看芜\(yùn)行 mydumper 從上游導(dǎo)出數(shù)據(jù),上游的數(shù)據(jù)都可能發(fā)生變更,為了能得到一致的數(shù)據(jù)和 metadata 信息,每次恢復(fù)任務(wù)或重新運(yùn)行任務(wù)時(shí)該處理單元會(huì) 清理舊的數(shù)據(jù)目錄,重新開(kāi)始一次完整的數(shù)據(jù) dump。
導(dǎo)出表結(jié)構(gòu)和數(shù)據(jù)的邏輯并不是在 DM 內(nèi)部直接實(shí)現(xiàn),而是 通過(guò) os/exec 包調(diào)用外部 mydumper 二進(jìn)制文件 來(lái)完成。在 mydumper 內(nèi)部,我們需要關(guān)注以下幾個(gè)問(wèn)題:
數(shù)據(jù)導(dǎo)出時(shí)的并發(fā)模型是如何實(shí)現(xiàn)的。
no-locks, lock-all-tables, less-locking 等參數(shù)有怎樣的功能。
庫(kù)表黑白名單的實(shí)現(xiàn)方式。
mydumper 的實(shí)現(xiàn)細(xì)節(jié)mydumper 的一次完整的運(yùn)行流程從主線程開(kāi)始,主線程按照以下步驟執(zhí)行:
解析參數(shù)。
創(chuàng)建到數(shù)據(jù)庫(kù)的連接。
會(huì)根據(jù) no-locks 選項(xiàng)進(jìn)行一系列的備份安全策略,包括 long query guard 和 lock all tables or FLUSH TABLES WITH READ LOCK。
START TRANSACTION WITH CONSISTENT SNAPSHOT。
記錄 binlog 位點(diǎn)信息。
less locking 處理線程的初始化。
普通導(dǎo)出線程初始化。
如果配置了 trx-consistency-only 選項(xiàng),執(zhí)行 UNLOCK TABLES /* trx-only */ 釋放之前獲取的表鎖。注意,如果開(kāi)啟該選項(xiàng),是無(wú)法保證非 InnoDB 表導(dǎo)出數(shù)據(jù)的一致性。更多關(guān)于一致性讀的細(xì)節(jié)可以參考 MySQL 官方文檔 Consistent Nonlocking Reads 部分。
根據(jù)配置規(guī)則(包括 --database, --tables-list 和 --regex 配置)讀取需要導(dǎo)出的 schema 和表信息,并在這個(gè)過(guò)程中有區(qū)分的記錄 innodb_tables 和 non_innodb_table。
為工作子線程創(chuàng)建任務(wù),并將任務(wù) push 到相關(guān)的工作隊(duì)列。
如果沒(méi)有配置 no-locks 和 trx-consistency-only 選項(xiàng),執(zhí)行 UNLOCK TABLES / FTWRL / 釋放鎖。
如果開(kāi)啟 less-locking,等待所有 less locking 子線程退出。
等待所有工作子線程退出。
工作線程的并發(fā)控制包括了兩個(gè)層面,一層是在不同表級(jí)別的并發(fā),另一層是同一張表級(jí)別的并發(fā)。mydumper 的主線程會(huì)將一次同步任務(wù)拆分為多個(gè)同步子任務(wù),并將每個(gè)子任務(wù)分發(fā)給同一個(gè)異步隊(duì)列 conf.queue_less_locking/conf.queue,工作子線程從隊(duì)列中獲取任務(wù)并執(zhí)行。具體的子任務(wù)劃分包括以下策略:
開(kāi)啟 less-locking 選項(xiàng)的非 InnoDB 表的處理。
先將所有 non_innodb_table 分為 num_threads 組,分組方式是遍歷這些表,依此將遍歷到的表加入到當(dāng)前數(shù)據(jù)量最小的分組,盡量保證每個(gè)分組內(nèi)的數(shù)據(jù)量相近。
上述得到的每個(gè)分組內(nèi)會(huì)包含一個(gè)或多個(gè)非 InnoDB 表,如果配置了 rows-per-file 選項(xiàng),會(huì)對(duì)每張表進(jìn)行 chunks 估算,對(duì)于每一張表,如果估算結(jié)果包含多個(gè) chunks,會(huì)將子任務(wù)進(jìn)一步按照 chunks 進(jìn)行拆分,分發(fā) chunks 數(shù)量個(gè)子任務(wù),如果沒(méi)有 chunks 劃分,分發(fā)為一個(gè)獨(dú)立的子任務(wù)。
注意,在該模式下,子任務(wù)會(huì) 發(fā)送到 queue_less_locking,并在編號(hào)為 num_threads ~ 2 * num_threads 的子線程中處理任務(wù)。
less_locking_threads 任務(wù)執(zhí)行完成之后,主線程就會(huì) UNLOCK TABLES / FTWRL / 釋放鎖,這樣有助于減少鎖持有的時(shí)間。主線程根據(jù) conf.unlock_tables 來(lái)判斷非 InnoDB 表是否全部導(dǎo)出,普通工作線程 或者 queue_less_locking 工作線程每次處理完一個(gè)非 InnoDB 表任務(wù)都會(huì)根據(jù) non_innodb_table_counter 和 non_innodb_done 兩個(gè)變量判斷是否還有沒(méi)有導(dǎo)出結(jié)束的非 InnoDB 表,如果都已經(jīng)導(dǎo)出結(jié)束,就會(huì)向異步隊(duì)列 conf.unlock_tables 中發(fā)送一條數(shù)據(jù),表示可以解鎖全局鎖。
每個(gè) less_locking_threads 處理非 InnoDB 表任務(wù)時(shí),會(huì)先 加表鎖,導(dǎo)出數(shù)據(jù),最后 解鎖表鎖。
未開(kāi)啟 less-locking 選項(xiàng)的非 InnoDB 表的處理。
遍歷每一張非 InnoDB 表,同樣對(duì)每張表進(jìn)行 chunks 估算,如果包含多個(gè) chunks,按照 chunks 個(gè)數(shù)分發(fā)同樣的子任務(wù)數(shù);如果沒(méi)有劃分 chunks,每張表分發(fā)一個(gè)子任務(wù)。所有的任務(wù)都分發(fā)到 conf->queue 隊(duì)列。
InnoDB 表的處理。
與未開(kāi)啟 less-locking 選項(xiàng)的非 InnoDB 表的處理相同,同樣是 按照表分發(fā)子任務(wù),如果有 chunks 子任務(wù)會(huì)進(jìn)一步細(xì)分。
從上述的并發(fā)模型可以看出 mydumper 首先按照表進(jìn)行同步任務(wù)拆分,對(duì)于同一張表,如果配置 rows-per-file 參數(shù),會(huì)根據(jù)該參數(shù)和表行數(shù)將表劃分為合適的 chunks 數(shù),這即是同一張表內(nèi)部的并發(fā)。具體表行數(shù)的估算和 chunks 劃分的實(shí)現(xiàn)見(jiàn) get_chunks_for_table 函數(shù)。
需要注意目前 DM 在任務(wù)配置中指定的庫(kù)表黑白名單功能只應(yīng)用于 load 和 binlog replication 處理單元。如果在 dump 處理單元內(nèi)使用庫(kù)表黑白名單功能,需要在同步任務(wù)配置文件的 dump 處理單元配置提供 extra-args 參數(shù),并指定 mydumper 相關(guān)參數(shù),包括 --database, --tables-list 和 --regex。mydumper 使用 regex 過(guò)濾庫(kù)表的實(shí)現(xiàn)參考 check_regex 函數(shù)。
load 處理單元load 處理單元的代碼位于 github.com/pingcap/dm/loader 包內(nèi),該處理單元在 dump 處理單元運(yùn)行結(jié)束后運(yùn)行,讀取 dump 處理單元導(dǎo)出的 SQL 文件解析并在下游數(shù)據(jù)庫(kù)執(zhí)行邏輯 SQL。我們重點(diǎn)分析 Init 和 Process 兩個(gè) interface 的實(shí)現(xiàn)。
Init 實(shí)現(xiàn)細(xì)節(jié)該階段進(jìn)行一些初始化和清理操作,并不會(huì)開(kāi)始同步任務(wù),如果在該階段運(yùn)行中出現(xiàn)錯(cuò)誤,會(huì)通過(guò) rollback 機(jī)制 清理資源,不需要調(diào)用 Close 函數(shù)。該階段包含的初始化操作包括以下幾點(diǎn):
創(chuàng)建 checkpoint,checkpoint 用于記錄全量數(shù)據(jù)的導(dǎo)入進(jìn)度和 load 處理單元暫?;虍惓=K止后,恢復(fù)或重新開(kāi)始任務(wù)時(shí)可以從斷點(diǎn)處繼續(xù)導(dǎo)入數(shù)據(jù)。
應(yīng)用任務(wù)配置的數(shù)據(jù)同步規(guī)則,包括以下規(guī)則:
初始化黑白名單
初始化表路有規(guī)則
初始化列值轉(zhuǎn)換規(guī)則
Process 實(shí)現(xiàn)細(xì)節(jié)該階段的工作流程也很直觀,通過(guò) 一個(gè)收發(fā)數(shù)據(jù)類(lèi)型為 *pb.ProcessError 的 channel 接收運(yùn)行過(guò)程中出現(xiàn)的錯(cuò)誤,出錯(cuò)后通過(guò) context 的 CancelFunc 強(qiáng)制結(jié)束處理單元運(yùn)行。在核心的 數(shù)據(jù)導(dǎo)入函數(shù) 中,工作模型與 mydumper 類(lèi)似,即在 主線程中分發(fā)任務(wù),有多個(gè)工作線程執(zhí)行具體的數(shù)據(jù)導(dǎo)入任務(wù)。具體的工作細(xì)節(jié)如下:
主線程會(huì)按照庫(kù),表的順序讀取創(chuàng)建庫(kù)語(yǔ)句文件
主線程讀取 checkpoint 信息,結(jié)合數(shù)據(jù)文件信息創(chuàng)建 fileJob 隨機(jī)分發(fā)任務(wù)給一個(gè)工作子線程,fileJob 任務(wù)的結(jié)構(gòu)如下所示 :
type fileJob struct { schema string table string dataFile string offset int64 // 表示讀取文件的起始 offset,如果沒(méi)有 checkpoint 斷點(diǎn)信息該值為 0 info *tableInfo // 保存原庫(kù)表,目標(biāo)庫(kù)表,列名,insert 語(yǔ)句 column 名字列表等信息 }
在每個(gè)工作線程內(nèi)部,有一個(gè)循環(huán)不斷從自己 fileJobQueue 獲取任務(wù),每次獲取任務(wù)后會(huì)對(duì)文件進(jìn)行解析,并將解析后的結(jié)果分批次打包為 SQL 語(yǔ)句分發(fā)給線程內(nèi)部的另外一個(gè)工作協(xié)程,該工作協(xié)程負(fù)責(zé)處理 SQL 語(yǔ)句的執(zhí)行。工作流程的偽代碼如下所示,完整的代碼參考 func (w *Worker) run():
// worker 工作線程內(nèi)分發(fā)給內(nèi)部工作協(xié)程的任務(wù)結(jié)構(gòu) type dataJob struct { sql string // insert 語(yǔ)句, insert into