新聞中心
概述
Facebook Velox 是一個(gè)針對(duì) SQL 運(yùn)行時(shí)的 C++ 庫,旨在統(tǒng)一 Facebook 各種計(jì)算流,包括 Spark 和 Presto,使用推的模式、支持向量計(jì)算。

Velox 接受一棵優(yōu)化過的 PlanNode Tree,然后將其切成一個(gè)個(gè)的線性的 Pipeline,Task 負(fù)責(zé)這個(gè)轉(zhuǎn)變過程,每個(gè) Task 針對(duì)一個(gè) PlanTree Segment。大多數(shù)算子是一對(duì)一翻譯的,但是有一些特殊的算子,通常出現(xiàn)在多個(gè) Pipeline 的切口處,通常來說,這些切口對(duì)應(yīng)計(jì)劃樹的分叉處,如 HashJoinNode,CrossJoinNode, MergeJoinNode ,通常會(huì)翻譯成 XXProbe 和 XXBuild。但也有一些例外,比如 LocalPartitionNode 和 LocalMergeNode 。
邏輯計(jì)劃翻譯成物理計(jì)劃,可調(diào)整 Pipeline 并發(fā)度
為了提高執(zhí)行的并行度,Velox 引入了 LocalPartitionNode 節(jié)點(diǎn),可以將一個(gè) Pipeline 進(jìn)行多線程(每個(gè)線程一個(gè)實(shí)例)并行運(yùn)行,并且互斥的消費(fèi)數(shù)據(jù)。其中每個(gè)實(shí)例稱為 Driver。該算子在輸入計(jì)劃樹里并沒有分叉(即沒有多個(gè) source),但在翻譯成物理算子時(shí),會(huì)在此節(jié)點(diǎn)處進(jìn)行切開,并在切口前后改變執(zhí)行的并行度,對(duì)應(yīng)的物理算子是LocalPartition 和 LocalExchange。
調(diào)整并發(fā)度算子,一個(gè)邏輯算子翻譯成兩個(gè)物理算子
還有一個(gè)特殊節(jié)點(diǎn),稱為 LocalMergeNode,該對(duì)輸入有要求:必須有序,然后會(huì)進(jìn)行單線程的歸并排序,從而使輸出全局有序。也因此,由其而切開的消費(fèi) Pipeline 一定是單 Driver 的。翻譯成算子,對(duì)應(yīng)兩個(gè) CallbackSink 和 LocalMerge。
Merge 算子,也是一種邏輯翻譯成兩種物理算子
總結(jié)一下,上述五個(gè) PlanNode,HashJoinNode,CrossJoinNode, MergeJoinNode ,LocalPartitionNode ,LocalMergeNode 在翻譯時(shí)會(huì)造成切口,即將邏輯 PlanTree 切成多個(gè)物理 Pipeline,因此在切口處會(huì)將一個(gè)邏輯算子翻譯成多個(gè)物理算子,分到不同 Pipeline 上。每個(gè) Pipeline 會(huì)有一個(gè)從 0 開始的編號(hào):Pipeline ID,是全局粒度的。
并且,可以由 LocalPartitionNode 來按需改變每個(gè) Pipeline 并行度,其中 Pipeline 的每個(gè)線程由一個(gè) Driver 來執(zhí)行。每個(gè) Driver 也有一個(gè)從 0 開始的編號(hào):Driver ID,是 Pipeline 粒度的。
其他 PlanNode 到算子的翻譯基本都是一對(duì)一的,感興趣的可以看官方文檔的這個(gè)頁面:Plan Nodes and Operators。
下面展開一些細(xì)節(jié)。
Splits
Velox 允許應(yīng)用層(即 Velox 的使用方)以 Splits (每個(gè)算子的輸入片段稱為 Split)的方式給 Pipeline 喂數(shù)據(jù),可以流式的喂,因此有兩個(gè) API:
- Task::addSplit(planNodeId, split) :喂一份數(shù)據(jù)給 Velox
- Task::noMoreSplits() :通知 Velox 我喂完了。
Velox 會(huì)使用一個(gè)隊(duì)列在緩存這些 Splits 數(shù)據(jù)。在數(shù)據(jù)喂完之前的任意一個(gè)時(shí)刻,Pipeline 的葉子算子(對(duì)的,外部喂數(shù)據(jù)只能發(fā)生在葉子節(jié)點(diǎn),如 TableScan,Exchange 和 MergeExchange)都可以從隊(duì)列中取數(shù)據(jù),對(duì)應(yīng) API 是 Task::getSplitOrFuture(planNodeId) ,返回值有兩種:
- 如果隊(duì)列中有數(shù)據(jù),則返回一個(gè) Split
- 如果隊(duì)列中無數(shù)據(jù),但還沒有收到喂完的信號(hào),則返回一個(gè) Future (類似于一個(gè)欠條,之后有數(shù)據(jù)之后,會(huì)憑該欠條兌付)。
Task 是 PlanTree Segment 執(zhí)行單位,可以通過 Splits 方式流式喂數(shù)據(jù)
Join Bridges and Barriers
Join (HashJoinNode 和 CrossJoinNode)會(huì)翻譯成 XXProbe 和 XXBuild 兩個(gè)算子,并且通過一個(gè)共享的 Bridge 來溝通數(shù)據(jù),兩側(cè) Pipeline 都可以通過 Task::getHashJoinBridge() 函數(shù)來根據(jù) PlanNodeId 獲取該共享的 Bridge。
為了提高 build 速度,build 側(cè) Pipeline 通常使用多個(gè) Driver 并發(fā)執(zhí)行。但由于只有一個(gè) Bridge,每個(gè) Driver 在結(jié)束時(shí)可以調(diào)用 Task::allPeersFinished() (內(nèi)部是使用一個(gè) BarrierState 的結(jié)構(gòu)來實(shí)現(xiàn)的)來判斷自己是否為最后一個(gè) Driver,如果是,則將所有 Driver 的輸出進(jìn)行合并后送到 Bridge。
當(dāng)然,在 RIGHT and FULL OUTER join 情況下,Probe 側(cè)也需要將沒有 match 上的數(shù)據(jù)喂給 Bridge,此時(shí)也需要由最后一個(gè) Driver 來負(fù)責(zé)這件事,于是同樣需要調(diào)用 Task::allPeersFinished() 函數(shù)。
使用 Bridge 對(duì) Join 兩側(cè) Pipeline 進(jìn)行數(shù)據(jù)橋接(Build->Probe)
下面來詳細(xì)看下 Join 類算子的切分細(xì)節(jié)。以 HashJoin 為例,Task 在切分 PlanTree 時(shí),會(huì)將邏輯上的一個(gè) HashJoin 算子,轉(zhuǎn)化成物理上的一對(duì)算子:HashProbe 和 HashJoin,并且使用異步機(jī)制進(jìn)行通知:在 HashJoin 完成后,通知 HashProbe 所在 Pipeline 繼續(xù)執(zhí)行,在此之前,后者是阻塞等待的。
Join 兩側(cè) Pipeline 是可以調(diào)整并發(fā)度的
如上圖,每個(gè) Pipeline 在實(shí)例化(邏輯 PlanNode 轉(zhuǎn)物理 Operator)的時(shí)候,可以生成多份,進(jìn)行并發(fā)執(zhí)行,互斥的消費(fèi)數(shù)據(jù)。并且,每個(gè) Pipeline 的并行粒度可以不一樣,如上圖 Probe Pipeline 實(shí)例化了兩份,而 Build Pipeline 實(shí)例化了三份。并且,Build Pipeline 組中最后一個(gè)運(yùn)行完的 Pipeline 負(fù)責(zé)將數(shù)據(jù)通過 Bridge 發(fā)送給 Probe Pipeline。
Exchange Clients
Velox 使用 Exchange Clients 來獲取遠(yuǎn)程 worker 的數(shù)據(jù)。分兩個(gè)步驟:
第一步,Pipeline 中第一個(gè) Driver (driverId == 0) 的 Exchange 算子從 Task 中獲取一個(gè) Split,并且初始化一個(gè)共享 Exchange Client。
第二步,Exchange Client 會(huì)為上游每個(gè) Task 構(gòu)造一個(gè) Exchange Source,并行的拉取每個(gè)上游 Task 同一個(gè) Partition (圖中是 Partition-15)數(shù)據(jù),然后將其放在 Client 的隊(duì)列 Queue 中。Exchange 的每個(gè) Driver 都會(huì)去隊(duì)列中拉取這些數(shù)據(jù)。
如何從上游 Task 拉取數(shù)據(jù)的邏輯,需要由用戶自定義實(shí)現(xiàn) ExchangeSource 和 ExchangeSource::Factory 。每個(gè) ExchangeSource 接受一個(gè)上游 Task 的字符串 ID、Partition 編號(hào)和一個(gè)隊(duì)列作為參數(shù)。然后會(huì)從上游 Task 中拉取該 Partition 的數(shù)據(jù),并且放到隊(duì)列中。
向上游 Task 遠(yuǎn)程(跨進(jìn)程)拉取數(shù)據(jù),也叫 MaterializePage
Local Exchange Queues
Local exchange 用于在一個(gè) Task 內(nèi)部調(diào)整數(shù)據(jù)并發(fā)度,會(huì)被翻譯成兩個(gè)物理算子:LocalPartition 和 LocalExchange。其中,LocalPartition 在生產(chǎn)側(cè) Pipeline,LocalExchange 在消費(fèi)側(cè) Pipeline。
中間通過 LocalExchangeQueues 來溝通生產(chǎn)者和消費(fèi)者,這些隊(duì)列在 Task 類中。對(duì)于每個(gè)消費(fèi)者(也即 LocalExchange 側(cè) Driver)Task 都會(huì)構(gòu)建一個(gè) LocalExchangeQueue 隊(duì)列;每個(gè)生產(chǎn)者 (LocalPartition)可以訪問所有隊(duì)列。在產(chǎn)生一條數(shù)據(jù)是,會(huì)對(duì)其按照某種方式進(jìn)行 Partition,然后寫到對(duì)應(yīng)隊(duì)列中。這個(gè)過程類似于 MapReduce 中的 Shuffle 階段。
本地改變并發(fā)度時(shí),使用一個(gè)隊(duì)列進(jìn)行數(shù)據(jù)溝通
具體來說,Local Exchange 可以有幾種方式改變并行度。如一改多、多改一。多改一,典型的例子如,并行 sort:先切成多個(gè)分片每個(gè)分片分別 sort,后通過 Local Exchange 進(jìn)行 merge sort。不僅單個(gè) Pipeline 的多個(gè) Driver 在進(jìn)行數(shù)據(jù)合并時(shí)可以用 Local Exchange,多個(gè) Pipeline 的合并也可以用 Local Exchange,不妨稱之為多并一。典型例子有,Union All,將多個(gè)數(shù)據(jù)集合并起來。
多改一
多并一
一改多通常用在,在經(jīng)歷了某些必須使用單線程的算子后(比如一些 Shuffle 算子),重新對(duì)數(shù)據(jù)分片提高并發(fā)度,使用多線程運(yùn)行。
一改多
Local Merge Sources
LocalMerge 算子和 LocalExchange 算子類似,但對(duì)并發(fā)數(shù)和輸入都有限定。其所在 Pipeline 只會(huì)單線程運(yùn)行,但會(huì)接受多線程運(yùn)行的 Pipeline 的輸入。并且要求所有輸入有序,然后將輸入進(jìn)行歸并,保證輸出是有序的。
LocalMerge 算子通過 Task::getLocalMergeSources() 來獲取所有待 Merge 的 sources。因此,每個(gè) LocalMergeNode 會(huì)初始化給定并發(fā)數(shù)個(gè) LocalMergeSource。
Merge Join Sources
MergeJoin 算子提供了某種接受右側(cè)輸入的方法。Task 會(huì)在右側(cè) Pipeline 增加一個(gè) CallbackSink 算子,來匯集數(shù)據(jù)。左側(cè)算子可以通過 Task::getMergeJoinSource() 接口來獲取該 CallbackSink 的輸出。
擴(kuò)展性
Velox 允許用戶自定義 PlanNode 和 Operator,以及 Join 相關(guān)的 Operator 和 Bridge。自定義 Operator 可以訪問 task 中的 splits 并使用 barriers。
但 Exchange clients, local exchange queues 和 local merge sources、 merge join sources 等狀態(tài)由于不是通用的,因此訪問不了。
總結(jié)
小節(jié)一下,Task 負(fù)責(zé)將由 PlanNode 組成的 PlanTree 翻譯成由 Operator 組成的 Pipeline,并且對(duì) Pipeline 進(jìn)行并發(fā)運(yùn)行。在此期間,Task 會(huì)維護(hù) Operator 間的共享狀態(tài)、協(xié)調(diào) Operator 間的運(yùn)行依賴。這些共享狀態(tài)包括:
- Splits
- Join bridges and barriers
- Exchange clients
- Local exchange queues
- Local merge sources
- Merge join sources
上述的每個(gè)狀態(tài)都是和特定 PlanNode 關(guān)聯(lián)的(即不是全局范圍的,而是和 PlanNode 綁定的),因此 Opeator 需要使用 PlanNodeID 來訪問相關(guān)狀態(tài)。前兩個(gè)狀態(tài)是所有算子都有的,因此自定義算子可以訪問到,后幾個(gè)狀態(tài)是某些算子特有的,因此自定義算子訪問不到。
分享題目:FacebookVelox運(yùn)行機(jī)制全面解析
網(wǎng)頁網(wǎng)址:http://m.fisionsoft.com.cn/article/copghdh.html


咨詢
建站咨詢
