摘要:若聚合函數(shù)不可以下推,則保持不變。目前當(dāng)且僅當(dāng)兩種情況下不可以并行執(zhí)行存在某個(gè)聚合函數(shù)參數(shù)為時(shí)。
作者:徐懷宇聚合算法執(zhí)行原理
在 SQL 中,聚合操作對(duì)一組值執(zhí)行計(jì)算,并返回單個(gè)值。TiDB 實(shí)現(xiàn)了 2 種聚合算法:Hash Aggregation 和 Stream Aggregation。
我們首先以 AVG 函數(shù)為例(案例參考 Stack Overflow),簡述這兩種算法的執(zhí)行原理。
假設(shè)表 t 如下:
列 a | 列 b |
---|---|
1 | 9 |
1 | -8 |
2 | -7 |
2 | 6 |
1 | 5 |
2 | 4 |
SQL: select avg(b) from t group by a, 要求將表 t 的數(shù)據(jù)按照 a 的值分組,對(duì)每一組的 b 值計(jì)算平均值。不管 Hash 還是 Stream 聚合,在 AVG 函數(shù)的計(jì)算過程中,我們都需要維護(hù) 2 個(gè)中間結(jié)果變量 sum 和 count。Hash 和 Stream 聚合算法的執(zhí)行原理如下。
Hash Aggregate 的執(zhí)行原理在 Hash Aggregate 的計(jì)算過程中,我們需要維護(hù)一個(gè) Hash 表,Hash 表的鍵為聚合計(jì)算的 Group-By 列,值為聚合函數(shù)的中間結(jié)果 sum 和 count。在本例中,鍵為 列 a 的值,值為 sum(b) 和 count(b)。
計(jì)算過程中,只需要根據(jù)每行輸入數(shù)據(jù)計(jì)算出鍵,在 Hash 表中找到對(duì)應(yīng)值進(jìn)行更新即可。對(duì)本例的執(zhí)行過程模擬如下。
輸入數(shù)據(jù) a b | Hash 表 [key] (sum, count) |
---|---|
1 9 | [1] (9, 1) |
1 -8 | [1] (1, 2) |
2 -7 | [1] (1, 2) ?[2] (-7, 1) |
2 6 | [1] (1, 2) ?[2] (-1, 2) |
1 5 | [1] (6, 3) ?[2] (-1, 2) |
2 4 | [1] (6, 3) ?[2] (3, 3) |
輸入數(shù)據(jù)輸入完后,掃描 Hash 表并計(jì)算,便可以得到最終結(jié)果:
Hash 表 | avg(b) |
---|---|
[1] (6, 3) | 2 |
[2] (3, 3) | 1 |
Stream Aggregate 的計(jì)算需要保證輸入數(shù)據(jù)按照 Group-By 列有序。在計(jì)算過程中,每當(dāng)讀到一個(gè)新的 Group 的值或所有數(shù)據(jù)輸入完成時(shí),便對(duì)前一個(gè) Group 的聚合最終結(jié)果進(jìn)行計(jì)算。
對(duì)于本例,我們首先對(duì)輸入數(shù)據(jù)按照 a 列進(jìn)行排序。排序后,本例執(zhí)行過程模擬如下。
輸入數(shù)據(jù) | 是否為新 Group 或所有數(shù)據(jù)輸入完成 | (sum, count) | avg(b) |
---|---|---|---|
1 9 | 是 | (1, 9) | 前一個(gè) Group 為空,不進(jìn)行計(jì)算 |
1 -8 | 否 | (2, 1) | |
1 5 | 否 | (3, 6) | |
2 -7 | 是 | (1, -7) | 2 |
2 6 | 否 | (2, -1) | |
2 4 | 否 | (3, 3) | |
是 | 1 |
因?yàn)?Stream Aggregate 的輸入數(shù)據(jù)需要保證同一個(gè) Group 的數(shù)據(jù)連續(xù)輸入,所以 Stream Aggregate 處理完一個(gè) Group 的數(shù)據(jù)后可以立刻向上返回結(jié)果,不用像 Hash Aggregate 一樣需要處理完所有數(shù)據(jù)后才能正確的對(duì)外返回結(jié)果。當(dāng)上層算子只需要計(jì)算部分結(jié)果時(shí),比如 Limit,當(dāng)獲取到需要的行數(shù)后,可以提前中斷 Stream Aggregate 后續(xù)的無用計(jì)算。
當(dāng) Group-By 列上存在索引時(shí),由索引讀入數(shù)據(jù)可以保證輸入數(shù)據(jù)按照 Group-By 列有序,此時(shí)同一個(gè) Group 的數(shù)據(jù)連續(xù)輸入 Stream Aggregate 算子,可以避免額外的排序操作。
TiDB 聚合函數(shù)的計(jì)算模式由于分布式計(jì)算的需要,TiDB 對(duì)于聚合函數(shù)的計(jì)算階段進(jìn)行劃分,相應(yīng)定義了 5 種計(jì)算模式:CompleteMode,F(xiàn)inalMode,Partial1Mode,Partial2Mode,DedupMode。不同的計(jì)算模式下,所處理的輸入值和輸出值會(huì)有所差異,如下表所示:
AggFunctionMode | 輸入值 | 輸出值 |
---|---|---|
CompleteMode | 原始數(shù)據(jù) | 最終結(jié)果 |
FinalMode | 中間結(jié)果 | 最終結(jié)果 |
Partial1Mode | 原始數(shù)據(jù) | 中間結(jié)果 |
Partial2Mode | 中間結(jié)果 | 進(jìn)一步聚合的中間結(jié)果 |
DedupMode | 原始數(shù)據(jù) | 去重后的原始數(shù)據(jù) |
以上文提到的 select avg(b) from t group by a 為例,通過對(duì)計(jì)算階段進(jìn)行劃分,可以有多種不同的計(jì)算模式的組合,如:
CompleteMode
此時(shí) AVG 函數(shù)的整個(gè)計(jì)算過程只有一個(gè)階段,如圖所示:
Partial1Mode --> FinalMode
此時(shí)我們將 AVG 函數(shù)的計(jì)算過程拆成兩個(gè)階段進(jìn)行,如圖所示:
除了上面的兩個(gè)例子外,還可能有如下的幾種計(jì)算方式:
聚合被下推到 TiKV 上進(jìn)行計(jì)算(Partial1Mode),并返回經(jīng)過預(yù)聚合的中間結(jié)果。為了充分利用 TiDB server 所在機(jī)器的 CPU 和內(nèi)存資源,加快 TiDB 層的聚合計(jì)算,TiDB 層的聚合函數(shù)計(jì)算可以這樣進(jìn)行:Partial2Mode --> FinalMode。
當(dāng)聚合函數(shù)需要對(duì)參數(shù)進(jìn)行去重,也就是包含 DISTINCT 屬性,且聚合算子因?yàn)橐恍┰虿荒芟峦频?TiKV 時(shí),TiDB 層的聚合函數(shù)計(jì)算可以這樣進(jìn)行:DedupMode --> Partial1Mode --> FinalMode。
聚合函數(shù)分為幾個(gè)階段執(zhí)行, 每個(gè)階段對(duì)應(yīng)的模式是什么,是否要下推到 TiKV,使用 Hash 還是 Stream 聚合算子等都由優(yōu)化器根據(jù)數(shù)據(jù)分布、估算的計(jì)算代價(jià)等來決定。
TiDB 并行 Hash Aggregation 的實(shí)現(xiàn) 如何構(gòu)建 Hash Aggregation 執(zhí)行器構(gòu)建邏輯執(zhí)行計(jì)劃 時(shí),會(huì)調(diào)用 NewAggFuncDesc 將聚合函數(shù)的元信息封裝為一個(gè) AggFuncDesc。 其中 AggFuncDesc.RetTp 由 AggFuncDesc.typeInfer 根據(jù)聚合函數(shù)類型及參數(shù)類型推導(dǎo)而來;AggFuncDesc.Mode 統(tǒng)一初始化為 CompleteMode。
構(gòu)建物理執(zhí)行計(jì)劃時(shí),PhysicalHashAgg 和 PhysicalStreamAgg 的 attach2Task 方法會(huì)根據(jù)當(dāng)前 task 的類型嘗試進(jìn)行下推聚合計(jì)算,如果 task 類型滿足下推的基本要求,比如 copTask,接著會(huì)調(diào)用 newPartialAggregate 嘗試將聚合算子拆成 TiKV 上執(zhí)行的 Partial 算子和 TiDB 上執(zhí)行的 Final 算子,其中 AggFuncToPBExpr 函數(shù)用來判斷某個(gè)聚合函數(shù)是否可以下推。若聚合函數(shù)可以下推,則會(huì)在 TiKV 中進(jìn)行預(yù)聚合并返回中間結(jié)果,因此需要將 TiDB 層執(zhí)行的 Final 聚合算子的 AggFuncDesc.Mode 修改為 FinalMode,并將其 AggFuncDesc.Args 修改為 TiKV 預(yù)聚合后返回的中間結(jié)果,TiKV 層的 Partial 聚合算子的 AggFuncDesc 也需要作出對(duì)應(yīng)的修改,這里不再詳述。若聚合函數(shù)不可以下推,則 AggFuncDesc.Mode 保持不變。
構(gòu)建 HashAgg 執(zhí)行器時(shí),首先檢查當(dāng)前 HashAgg 算子是否可以并行執(zhí)行。目前當(dāng)且僅當(dāng)兩種情況下 HashAgg 不可以并行執(zhí)行:
存在某個(gè)聚合函數(shù)參數(shù)為 DISTINCT 時(shí)。TiDB 暫未實(shí)現(xiàn)對(duì) DedupMode 的支持,因此對(duì)于含有 DISTINCT 的情況目前僅能單線程執(zhí)行。
系統(tǒng)變量 tidb_hashagg_partial_concurrency 和 tidb_hashagg_final_concurrency 被同時(shí)設(shè)置為 1 時(shí)。這兩個(gè)系統(tǒng)變量分別用來控制 Hash Aggregation 并行計(jì)算時(shí)候,TiDB 層聚合計(jì)算 partial 和 final 階段 worker 的并發(fā)數(shù)。當(dāng)它們都被設(shè)置為 1 時(shí),選擇單線程執(zhí)行。
若 HashAgg 算子可以并行執(zhí)行,使用 AggFuncDesc.Split 根據(jù) AggFuncDesc.Mode 將 TiDB 層的聚合算子的計(jì)算拆分為 partial 和 final 兩個(gè)階段,并分別生成對(duì)應(yīng)的 AggFuncDesc,設(shè)為 partialAggDesc 和 finalAggDesc。若 AggFuncDesc.Mode == CompleteMode,則將 TiDB 層的計(jì)算階段拆分為 Partial1Mode --> FinalMode;若 AggFuncDesc.Mode == FinalMode,則將 TiDB 層的計(jì)算階段拆分為 Partial2Mode --> FinalMode。進(jìn)一步的,我們可以根據(jù) partialAggDesc 和 finalAggDesc 分別 構(gòu)造出對(duì)應(yīng)的執(zhí)行函數(shù)。
并行 Hash Aggregation 執(zhí)行過程詳述TiDB 的并行 Hash Aggregation 算子執(zhí)行過程中的主要線程有:Main Thead,Data Fetcher,Partial Worker,和 Final Worker:
Main Thread 一個(gè):
啟動(dòng) Input Reader,Partial Workers 及 Final Workers
等待 Final Worker 的執(zhí)行結(jié)果并返回
Data Fetcher 一個(gè):
按 batch 讀取子節(jié)點(diǎn)數(shù)據(jù)并分發(fā)給 Partial Worker
Partial Worker 多個(gè):
讀取 Data Fetcher 發(fā)送來的數(shù)據(jù),并做預(yù)聚合
將預(yù)聚合結(jié)果根據(jù) Group 值 shuffle 給對(duì)應(yīng)的 Final Worker
Final Worker 多個(gè):
讀取 PartialWorker 發(fā)送來的數(shù)據(jù),計(jì)算最終結(jié)果,發(fā)送給 Main Thread
Hash Aggregation 的執(zhí)行階段可分為如下圖所示的 5 步:
啟動(dòng) Data Fetcher,Partial Workers 及 Final Workers。
這部分工作由 prepare4Parallel 函數(shù)完成。該函數(shù)會(huì)啟動(dòng)一個(gè) Data Fetcher,多個(gè) Partial Worker 以及 多個(gè) Final Worker。Partial Worker 和 Final Worker 的數(shù)量可以分別通過 tidb_hashgg_partial_concurrency 和 tidb_hashagg_final_concurrency 系統(tǒng)變量進(jìn)行控制,這兩個(gè)系統(tǒng)變量的默認(rèn)值都為 4。
DataFetcher 讀取子節(jié)點(diǎn)的數(shù)據(jù)并分發(fā)給 Partial Workers。
這部分工作由 fetchChildData 函數(shù)完成。
Partial Workers 預(yù)聚合計(jì)算,及根據(jù) Group Key shuffle 給對(duì)應(yīng)的 Final Workers。
這部分工作由 HashAggPartialWorker.run 函數(shù)完成。該函數(shù)調(diào)用 updatePartialResult 函數(shù)對(duì) DataFetcher 發(fā)來數(shù)據(jù)執(zhí)行 預(yù)聚合計(jì)算,并將預(yù)聚合結(jié)果存儲(chǔ)到 partialResultMap 中。其中 partialResultMap 的 key 為根據(jù) Group-By 的值 encode 的結(jié)果,value 為 PartialResult 類型的數(shù)組,數(shù)組中的每個(gè)元素表示該下標(biāo)處的聚合函數(shù)在對(duì)應(yīng) Group 中的預(yù)聚合結(jié)果。shuffleIntermData 函數(shù)完成根據(jù) Group 值 shuffle 給對(duì)應(yīng)的 Final Worker。
Final Worker 計(jì)算最終結(jié)果,發(fā)送給 Main Thread。
這部分工作由 HashAggFinalWorker.run 函數(shù)完成。該函數(shù)調(diào)用 consumeIntermData 函數(shù) 接收 PartialWorkers 發(fā)送來的預(yù)聚合結(jié)果,進(jìn)而 合并 得到最終結(jié)果。getFinalResult 函數(shù)完成發(fā)送最終結(jié)果給 Main Thread。
Main Thread 接收最終結(jié)果并返回。
TiDB 并行 Hash Aggregation 的性能提升此處以 TPC-H query-17 為例,測(cè)試并行 Hash Aggregation 相較于單線程計(jì)算時(shí)的性能提升。引入并行 Hash Aggregation 前,它的計(jì)算瓶頸在 HashAgg_35。
該查詢執(zhí)行計(jì)劃如下:
在 TiDB 中,使用 EXPLAIN ANALYZE 可以獲取 SQL 的執(zhí)行統(tǒng)計(jì)信息。因篇幅原因此處僅貼出 TPC-H query-17 部分算子的 EXPLAIN ANALYZE 結(jié)果。
HashAgg 單線程計(jì)算時(shí):
查詢總執(zhí)行時(shí)間 23 分 24 秒,其中 HashAgg 執(zhí)行時(shí)間約 17 分 9 秒。
HashAgg 并行計(jì)算時(shí)(此時(shí) TiDB 層 Partial 和 Final 階段的 worker 數(shù)量都設(shè)置為 16):
總查詢時(shí)間 8 分 37 秒,其中 HashAgg 執(zhí)行時(shí)間約 1 分 4 秒。
并行計(jì)算時(shí),Hash Aggregation 的計(jì)算速度提升約 16 倍。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/17856.html
摘要:部分主要流程如下把上文提到語法解析階段會(huì)把語句中相關(guān)信息轉(zhuǎn)換成然后負(fù)責(zé)把結(jié)構(gòu)轉(zhuǎn)換即的元信息。最后把的元信息追加到的元信息中,具體實(shí)現(xiàn)在這里。會(huì)把要?jiǎng)h除的分區(qū)從元信息刪除掉,刪除前會(huì)做的檢查。 作者:肖亮亮 Table Partition 什么是 Table Partition Table Partition 是指根據(jù)一定規(guī)則,將數(shù)據(jù)庫中的一張表分解成多個(gè)更小的容易管理的部分。從邏輯上看...
閱讀 942·2021-09-07 09:58
閱讀 1494·2021-09-07 09:58
閱讀 2888·2021-09-04 16:40
閱讀 2508·2019-08-30 15:55
閱讀 2416·2019-08-30 15:54
閱讀 1374·2019-08-30 15:52
閱讀 438·2019-08-30 10:49
閱讀 2610·2019-08-29 13:21