跳至主要内容

P3 - Query Execution

在這個 project 中,我們需要實現一個 query execution engine,總共有 4 個 task :

  • Access Method Executors
  • Aggregation & Join Executors
  • HashJoin Executor and Optimization
  • External Merge Sort + Limit Executors

Background

這個 project 的背景知識相對比較複雜,需要大量閱讀程式碼來了解整個結構。

我會介紹一下整個 SQL 從輸入到執行的流程以及資料儲存的結構來方便大家理解。 如果還是有卡住的話,我建議可以先去看一些已經實現好的 Executor 來了解整個流程。

SQL Query Journey

這邊會用一個簡單的 SQL 當作範例,簡單介紹一下 bustub 中 SQL 是怎麼被執行的。 範例這部分我是用 AI 幫忙生成的,不一定是真正的欄位但能了解意思就好 :

SELECT s.id, s.name, e.course
FROM student s
JOIN enrollment e ON s.id = e.sid
WHERE s.age > 18
LIMIT 10;
SQL Query Journey

Entrypoint

Shell 的入口點在 src/tools/shell/shell.cpp,負責創建 BustubInstance 實例並呼叫 bustub->ExecuteSql() 來執行 SQL。

BustubInstanceExecuteSql() 函數是執行 SQL 的主要入口,它會呼叫 ExecuteSqlTxn() 來執行查詢,如果出現錯誤則會呼叫 Abort()

Parser

首先 query 會先進入 parser,parser 的作用是將 SQL 解析成 AST,這部分 bustub 是使用第三方的工具 libpg_query 來完成的。

ExecuteSqlTxn() 中 bustub 會呼叫 ParseAndSave() 來將 SQL 解析成 AST,並把結果存到 statement_nodes_ 中。

以上面的 SQL 為例,解析後的 AST 可能會長這樣 :

PGSelectStmt {
targetList: [
PGResTarget { name: "id", val: PGColumnRef { fields: ["s", "id"] } },
PGResTarget { name: "name", val: PGColumnRef { fields: ["s", "name"] } },
PGResTarget { name: "course", val: PGColumnRef { fields: ["e", "course"] } }
],
fromClause: [
PGJoinExpr {
jointype: JOIN_INNER,
larg: PGRangeVar { relname: "student", alias: "s" },
rarg: PGRangeVar { relname: "enrollment", alias: "e" },
quals: PGAExpr { op: "=", lexpr: "s.id", rexpr: "e.sid" }
}
],
whereClause: PGAExpr { op: ">", lexpr: "s.age", rexpr: 18 },
limitCount: 10
}

Binder

接著這個 AST 就會交給 binder,binder 的作用是透過 Catalog 將 AST 中的名稱綁定到實際的資料庫物件上。

ExecuteSqlTxn() 中,BindStatement() 函數會負責這個工作。

以上面的 SQL 為例,綁定之後的 AST 可能會長這樣,可以看到它已經有 table_oidcol_idx 了 :

SelectStatement {
select_list_: [
BoundColumnRef { col_name: "id", type: INTEGER, table_oid: 1, col_idx: 0 },
BoundColumnRef { col_name: "name", type: VARCHAR, table_oid: 1, col_idx: 1 },
BoundColumnRef { col_name: "course", type: VARCHAR, table_oid: 2, col_idx: 1 }
],
table_: BoundJoinRef {
type: INNER_JOIN,
left: BoundBaseTableRef {
table: "student",
alias: "s",
oid: 1,
schema: [id:INTEGER, name:VARCHAR, age:INTEGER]
},
right: BoundBaseTableRef {
table: "enrollment",
alias: "e",
oid: 2,
schema: [sid:INTEGER, course:VARCHAR]
},
condition: BoundBinaryOp {
op_type: EQUAL,
left: BoundColumnRef { col_name: "id", table_oid: 1, col_idx: 0 },
right: BoundColumnRef { col_name: "sid", table_oid: 2, col_idx: 0 }
}
},
where_: BoundBinaryOp {
op_type: GREATER_THAN,
left: BoundColumnRef { col_name: "age", table_oid: 1, col_idx: 2 },
right: BoundConstant { value: 18, type: INTEGER }
},
limit_: BoundConstant { value: 10, type: INTEGER }
}

Planner

綁定好物件之後就會交給 planner,planner 的作用是將綁定好的 AST 轉換成執行計劃 (PlanNode)。

PlannerPlanQuery() 函數會將 Binder 轉換為 PlanNode,這是一個樹狀結構,表示 SQL 語句的執行計劃。

以上面的 SQL 為例,轉換後的執行計劃可能會長這樣 :

LimitPlanNode {
limit_: 10,
child_: ProjectionPlanNode {
expressions_: [
ColumnValueExpression { col_idx: 0 }, // s.id
ColumnValueExpression { col_idx: 1 }, // s.name
ColumnValueExpression { col_idx: 4 } // e.course (after join)
],
output_schema_: Schema([id:INTEGER, name:VARCHAR, course:VARCHAR]),
child_: FilterPlanNode {
predicate_: ComparisonExpression {
op_: GREATER_THAN,
left_: ColumnValueExpression { col_idx: 2 }, // s.age
right_: ConstantValueExpression { value: 18 }
},
child_: NestedLoopJoinPlanNode {
join_type_: INNER,
join_predicate_: ComparisonExpression {
op_: EQUAL,
left_: ColumnValueExpression { col_idx: 0 }, // s.id
right_: ColumnValueExpression { col_idx: 3 } // e.sid
},
output_schema_: Schema([id:INTEGER, name:VARCHAR, age:INTEGER, sid:INTEGER, course:VARCHAR]),
left_child_: SeqScanPlanNode {
table_oid_: 1, // student table
output_schema_: Schema([id:INTEGER, name:VARCHAR, age:INTEGER])
},
right_child_: SeqScanPlanNode {
table_oid_: 2, // enrollment table
output_schema_: Schema([sid:INTEGER, course:VARCHAR])
}
}
}
}
}

Optimizer

接著就會交給 optimizer 來優化執行計劃,這裡 bustub 採用的是 rule-based 的方式,也就是它會一條一條規則的套用到執行計劃上。

OptimizerOptimize() 函數會檢查這些規則並套用到 PlanNode 上,比如說我們要實作的 OptimizeNLJAsIndexJoin

以上面的 SQL 為例,優化後的執行計劃可能會長這樣,改成使用 NestedIndexJoin 並做了 predicate pushdown :

LimitPlanNode {
limit_: 10,
child_: ProjectionPlanNode {
expressions_: [
ColumnValueExpression { col_idx: 0 }, // s.id
ColumnValueExpression { col_idx: 1 }, // s.name
ColumnValueExpression { col_idx: 4 } // e.course
],
child_: NestedIndexJoinPlanNode {
outer_child_: FilterPlanNode {
predicate_: ComparisonExpression { op: GT, left: age, right: 18 },
child_: SeqScanPlanNode { table: "student" }
},
key_predicate_: ColumnValueExpression { col_idx: 0 }, // s.id
inner_table_oid_: 2,
index_oid_: 5,
index_name_: "enrollment_sid_idx"
}
}
}

Executor

最後就是交給 executor 來將 PlanNode 轉換成對應的 Executor,這裡 bustub 採用的是 pull-based 的 volcano model,也就是每個 Executor 都有 Init()Next() 兩個方法,並且上層的 Executor 會不斷呼叫下層的 ExecutorNext() 來取得資料直到回傳 false 為止。

Executor 的入口點在 src/execution/executor_engine.cpp,它會呼叫 ExecutorFactory::CreateExecutor() 來遞迴地針對不同種類的 PlanNode 創建對應的 Executor。 同時它也會呼叫 Init() 來初始化整個 executor 樹,最後呼叫 PollExecutor() 來不斷的 pull 資料直到沒有資料為止。

除此之外,ExecutorContext 也會在這個時候被創建,它包含了 TransactionCatalogBufferPoolManager 等等執行時需要的物件。

Storage Structure

在了解了整個 SQL 的執行流程之後,我們還需要了解一下資料庫中的結構以及這些結構是如何被儲存的。

Storage Structure
  • Catalog : 最頂層的結構,負責管理資料表跟索引,有 CreateTable()GetTable()CreateIndex()GetIndex() 等等方法
  • TableInfo : 代表一個資料表,包含了 SchemaTableHeapTableOidName 等等資訊
  • IndexInfo : 代表一個索引,包含了 SchemaIndexIndexOidName 等等資訊
  • TableHeap : 代表 page 的儲存結構,維護 first_page_id_last_page_id_ 方便插入跟查詢資料,同時提供了很多很有用的方法,例如 InsertTuple()GetTuple()UpdateTupleMeta()UpdateTupleInPlaceWithLockAcquired()
  • TablePage : 代表一個 page,採用 slotted-page 的結構來儲存 tuple
  • Tuple : 代表一筆資料,包含了 RID (由 page id / slot num 組成) 跟 datadata 裡面有多個 ValueValue 可以代表不同的資料型態
  • TupleInfo : 代表 tuple 在 page 中的 offset 跟 size 以及 TupleMetaTupleMeta 包含了 is_deleted_ts_ (timestamp)
  • Schema : 代表資料表的 schema,包含了多個 Column
  • Column : 代表一個欄位,包含了欄位名稱、欄位類型、欄位長度等等資訊

在 executor 中,如果我們要讀取資料表的資料,通常會要通過 exec_ctx_->GetCatalog() 先取得 Catalog,然後再繼續往下取得 TableInfoTableHeap 等我們需要的資訊。

整體來說單一的函數實作起來難度應該不是太高,所以下面我就簡單說明一下流程跟一些可能會用到的方法就好。

Task 1 - Access Method Executors

SeqScan

使用 TableIterator 來掃描,注意跳過被刪除的資料跟 WHERE 條件,可以用 Evaluate() 來判斷 WHERE 條件。

Insert

除了 TableHeap::InsertTuple() 之外,還需要新增索引,可以用 InsertEntry() 來新增索引,key 可以用 KeyFromTuple() 來取得。

需要注意要有一個參數負責記錄之前有沒有回傳過結果了,因為上層會不斷呼叫 Next(),如果已經回傳過結果了就要回傳 false,包括後面的一些 executor 也是一樣。

Update

先把舊的 TupleMeta 使用 UpdateTupleMeta() 標記為刪除,接著把 index 刪除,然後再插入新的 tuple 並插入新的索引。

Delete

沒啥特別好說的,就是把 TupleMeta 標記為刪除,然後刪除索引。

IndexScan

pred_keys_ 判斷是點查詢還是範圍查詢,如果是點查詢就用 ScanKey() 查詢每一個 key,如果是範圍查詢就直接用 B+ tree 的迭代器來掃描。

IndexScan Optimizer

首先要先遞迴來遍歷整個 AbstractPlanNodeRef,接著檢查是不是 seq_scan + filter 的組合,並看看有沒有 index 可以用。

有三種情況要處理 :

SELECT * FROM t1 WHERE v1 = 1;
SELECT * FROM t1 WHERE 2 = v1;
SELECT * FROM t1 WHERE v1 = 1 OR v1 = 2;

對於前兩種情況,我們需要檢查謂詞是否為 ComparisonExpression,且比較類型為 Equal。 接著檢查左右子節點,確保一邊是 ColumnValueExpression (變數),另一邊是 ConstantValueExpression(常數)。 再來檢查 ColumnValueExpressioncol_idx 是否與某個可用索引的第一個鍵值欄位相符。 最後檢查 GetTupleIdx() == 0,這個檢查確保了這個欄位屬於當前正在掃描的這張表。

對於第三種情況,我們需要遞迴地分析。 如果表達式是一個 LogicExpression 且邏輯類型是 LogicType::Or,那麼我們就遞迴地對其左右子表達式應用前兩種情況的判斷邏輯。

Task 2 - Aggregation & Join Executors

Aggregation

首先要完成 CombineAggregateValues 中幾種函數的實作,這部分比較簡單,主要是要注意 NULL 的處理。

這個 executor 是一個 pipeline breaker,所以在 Init() 的時候就要把所有資料都讀進來並計算好結果,可以利用 MakeAggregateKeyMakeAggregateValue 並丟到 InsertCombine() 來做。 最後輸出的時候要把 AggregateKeyAggregateValue 的條件也一起輸出方便上面的 Projection 使用。

需要特別處理 empty input 的情況,可以呼叫 GenerateInitialAggregateValue() 來產生初始值。

NestedLoopJoin

需要處理 INNER JOIN 跟 LEFT JOIN 兩種情況。 這裡我的做法是先讀左邊的一個 tuple,然後重新 Init() 右邊的 executor,接著不斷呼叫右邊的 Next() 並用 EvaluateJoin 來判斷有沒有符合條件,如果有就把兩個 tuple 合併起來並回傳。

如果是 LEFT JOIN 的話,當右邊的 executor 掃描完之後還沒有找到符合條件的 tuple,就要回傳左邊的 tuple 並把右邊的欄位設為 NULL,可以使用 ValueFactory::GetNullValueByType() 來產生 NULL

NestedIndexJoin

整體邏輯跟 NestedLoopJoin 差不多,差別在於右邊的 executor 是一個 IndexScan,所以每次讀左邊的 tuple 之後要用 KeyPredicate 來生成 probe key。

Task 3 - HashJoin Executor and Optimization

HashJoin

這個 task 要做的事情還蠻多的,除了 Init()Next() 之外,還要實作針對 std::vector<Value> 的 hash 方法。

我實作了一個 JoinKey 的 class 來代表 join key,並且實作了 operator==std::hash 來讓它可以被 hash,這部分可以大致參考 AggregationPlanNode 的實作。

Init() 的時候可以先把一邊的資料都讀進來並建立 hash table,我的儲存方式是 std::unordered_map<JoinKey, std::vector<Tuple>>

Next() 的時候就不斷讀另一邊的資料,然後用 join key 去 hash table 裡面找有沒有符合的 tuple,如果有就把兩個 tuple 合併起來並回傳,需要特別處理一對多的情況。

HashJoin Optimizer

有兩種情況可以優化成 HashJoin :

SELECT * FROM t1 JOIN t2 ON t1.v1 = t2.v1;
SELECT * FROM t1 JOIN t2 ON t1.v1 = t2.v1 AND t1.v2 = t2.v2;

首先,跟其他 optimizer 一樣需要先把樹遞迴到底在開始。

對於第一種情況來說,需要檢查掃描方法是不是 NestedLoopJoin 且是透過 ComparisonType::EqualColumnValueExpression 來 join 的,接著再確定他們是不是來自不同的 table。

對於第二種情況來說,還需要考慮 AND 的情況,可以用遞迴的方式來檢查 LogicExpression 是不是 LogicType::And,然後檢查左右子節點符不符合第一種情況的條件。

Task 4 - External Merge Sort + Limit Executors

External Merge Sort

首先要實作 TupleComparator::operator() 來比較兩個 tuple 的大小,這部分可以用 Value 裡面的 CompareEquals()CompareLessThan() 來比較大小,要區分 ASC (DEFAULT) 跟 DESC

接著實作 GenerateSortKey() 來產生排序的 key,可以用 MakeKeyFromTuple() 來產生。

然後需要實作 SortPageMergeSortRun 來代表硬碟上的 page 以及一個邏輯上的 run,可以自行依據需求寫一些 helper function。 在我自己的實作中,SortPage 會包含一個成員 char data_[BUSTUB_PAGE_SIZE],這個成員的前 4 個 byte 會用來記錄這個 page 裡面有多少個 tuple,剩下的空間就用來放 tuple 的資料。

這邊有兩個問題要注意,第一個是要在 MergeSortRun 中的 Iterator 保留上一次讀取的 ReadPageGuard,不然測試很容易超時。 第二個是如果遇到了 test incurred 0 times of disk deletion, which is too low 的錯誤訊息,代表有可能在 project 1 的時候就沒有正確使用 disk_scheduler_->DeallocatePage() 導致它抓不到 page 的刪除次數。

最後就是實作真正的 ExternalMergeSortExecutor 了,這裡我實作了 4 個 helper function 來分別處理不同的階段 :

  • CreateInitialRuns() : 從 child executor 拉 tuple,組成 <sort_key, tuple> 的 pair,滿一個 page 的大小就呼叫 FlushRunToDisk() 來把這些 tuple 排序並寫到硬碟上
  • FlushRunToDisk() : 把一個 run 的 tuple 排序並寫到硬碟上,這裡可以用 std::sort() 來排序,並寫入到 SortPage 中最後把 page_id 記錄起來並塞回硬碟中
  • MergeRuns() : 每次取出兩個 page 並呼叫 MergeTwoRuns() 來合併成一個新的 run,最後回傳新的 run 的 page_id
  • MergeTwoRuns() : 運用兩個 iterator 來將兩個 MergeSortRun 合併成一個新的 MergeSortRun

同時這個 executor 也是一個 pipeline breaker,所以在 Init() 的時候就要把所有資料都讀進來並排序好,最後在 Next() 的時候只需要不斷地從排序好的 run 中讀取 tuple 並回傳。

Limit

這應該是最簡單的 executor,只需要在 Next() 的時候計數,當超過 limit 的數量就回傳 false。

Submit

如果要執行 SQL,可以使用以下指令或是到這個網站上執行 :

make -j$(nproc) shell && ./bin/bustub-shell

測試的方式有點麻煩,要自己一個一個試 :

make -j$(nproc) sqllogictest && \
./bin/bustub-sqllogictest ../test/sql/p3.05-index-scan-btree.slt --verbose
make format && make check-format && make check-lint && make check-clang-tidy-p3
make submit-p3

Takeaways

這個 project 做起來其實蠻無聊的,因為大部分都是照著規格一步一步實作就好,沒有什麼特別的技巧,不過因為要看的東西不少,所以還是花了我蠻多時間的。

對於想要快速通關的人來說我覺得 AI 在這個 project 上面應該可以幫助很大,但要注意一定要能理解 AI 幫你寫的東西,因為下一個 project 會需要在這個 project 的基礎上繼續實作。

References