新聞中心
1背景
在日常Flink使用過(guò)程中,我們經(jīng)常遇到Flink任務(wù)中某些Slot或者TM負(fù)載過(guò)重的問(wèn)題,對(duì)日常的資源調(diào)配、運(yùn)維以及降本都帶來(lái)了很大的影響,所以我們對(duì)Flink的task部署機(jī)制進(jìn)行了梳理和調(diào)研,準(zhǔn)備在后續(xù)的工作中進(jìn)行優(yōu)化。由于jobGraph的生成以及任務(wù)提交流程因任務(wù)部署方式而不同,對(duì)我們后續(xù)的分析也沒(méi)有影響,這里忽略前置流程,直接從Dispatcher出發(fā),重點(diǎn)關(guān)注submit后executionGraph構(gòu)建以及后續(xù)的任務(wù)部署過(guò)程。

創(chuàng)新互聯(lián)主營(yíng)彌勒網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,app軟件開(kāi)發(fā)公司,彌勒h5微信平臺(tái)小程序開(kāi)發(fā)搭建,彌勒網(wǎng)站營(yíng)銷(xiāo)推廣歡迎彌勒等地區(qū)企業(yè)咨詢(xún)
2Flink Scheduling Components 構(gòu)成
2.1 SchedulerNG
在Dispatcher收到submit請(qǐng)求后,先是啟動(dòng)了JobManagerRunner,再啟動(dòng)JobMaster,在初始化jobMaster的過(guò)程中,我們注意到這里開(kāi)始了整個(gè)作業(yè)的Scheduling第一步,創(chuàng)建SchedulerNG。
this.schedulerNG =
createScheduler(
slotPoolServiceSchedulerFactory,
executionDeploymentTracker,
jobManagerJobMetricGroup,
jobStatusListener);
我們看下SchedulerNG的職責(zé),可以看到調(diào)度的發(fā)起,作業(yè)狀態(tài)的跟蹤以及我們熟悉的cp,sp的trigger都是在這里:
我們這次主要跟蹤構(gòu)建executionGraph,然后根據(jù)Scheduling策略發(fā)起的整個(gè)部署過(guò)程。
2.2 ExecutionGraph
現(xiàn)階段(1.13)SchedulerNG默認(rèn)實(shí)現(xiàn)是DefaultScheduler,初始化過(guò)程中就會(huì)開(kāi)始構(gòu)建我們的ExecutionGraph,ExecutionGraph中有幾個(gè)重要元素
- ExecutionJobVertex: 代表jobGraph中的一個(gè)JobVertex,是所有并行Task的集合
- ExecutionVertex: 代表ExecutionJobVertex中并行task中的一個(gè),一個(gè)ExecutionJobVertex可能同時(shí)有很多并行運(yùn)行的ExecutionVertex
- Execution: 代表ExecutionVertex的一次部署/執(zhí)行,一個(gè)ExecutionVertex可能會(huì)有很多次Execution
這里executionGraph通過(guò)jobGraph的拓?fù)鋱D構(gòu)建了自己的核心結(jié)構(gòu),看下從JobVertex到ExecutionJobVertex 的轉(zhuǎn)換流程:
// topologically sort the job vertices and attach the graph to the existing one
ListsortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology){
1. executionGraph第一步拿到了jobGraph中的有序JobVertex列表
2. 接著一對(duì)一創(chuàng)建ExecutionJobVertex
3. 根據(jù)producer并行度生成producedDataSets(IntermediateDataSet)
4. 再根據(jù)自身并行度生成所屬的ExecutionVertex[]
5. 構(gòu)建stateBackend信息和checkpointStorage信息等
6. 最后完成executionGraph的拓?fù)錁?gòu)建executionTopology
}
2.3 執(zhí)行層拓?fù)浣Y(jié)構(gòu)
我們知道Flink引擎在不停的致力于批流一體建設(shè),調(diào)度層的統(tǒng)一也是其中核心的一層。為了提高failover后recovery速度,減少對(duì)Flink任務(wù)的影響,現(xiàn)在Flink對(duì)于批、流的任務(wù)task調(diào)度都是以pipeline region為基礎(chǔ)。
Pipeline region的構(gòu)建內(nèi)嵌在executionGraph的初始化過(guò)程中,我們知道Flink中各個(gè)節(jié)點(diǎn)之間的鏈接都會(huì)有IntermediateDataSet這一種邏輯結(jié)構(gòu),用來(lái)表示JobVertex的輸出,即該JobVertex中包含的算子會(huì)產(chǎn)生的數(shù)據(jù)集。這個(gè)數(shù)據(jù)集的ResultPartitionType有幾種類(lèi)型:
BLOCKING:都上游處理完數(shù)據(jù)后,再交給下游處理。這個(gè)數(shù)據(jù)分區(qū)可以被消費(fèi)多次,也可以并發(fā)消費(fèi)。這個(gè)分區(qū)并不會(huì)被自動(dòng)銷(xiāo)毀,而是交給調(diào)度器判斷。
BLOCKING_PERSISTENT:類(lèi)似于Blocking,但是其生命周期由用戶(hù)端指定。調(diào)用JobMaster或者ResourceManager的API來(lái)銷(xiāo)毀,而不是由調(diào)度器控制。
PIPELINED:流交換模式。可以用于有界和無(wú)界流。這種分區(qū)類(lèi)型的數(shù)據(jù)只能被每個(gè)消費(fèi)者消費(fèi)一次。且這種分區(qū)可以保留任意數(shù)據(jù)。
PIPELINED_BOUNDED:該策略在PIPELINED的基礎(chǔ)上保留有限制的buffer,避免對(duì)barrier造成阻塞。
PIPELINED_APPROXIMATE:和PIPELINED_BOUNDED類(lèi)似,可以支持下游task重啟后繼續(xù)消費(fèi),用來(lái)支持task failover后的Approximate Local-Recovery策略。
接下來(lái)我們看看executionGraph的核心拓?fù)浣Y(jié)構(gòu)ExecutionTopology是如何構(gòu)建的:
第一步 先根據(jù)executionTopology構(gòu)建rawPipelinedRegions,多個(gè)vertex能否組合成一個(gè)pipeline region的關(guān)鍵在于這個(gè)vertex的consumedResult.getResultType().isReconnectable(),如果支持重連,那么兩個(gè)vertex之間就會(huì)進(jìn)行拆分,劃到不同的region。這里的isReconnectable就和我們的ResultPartitionType類(lèi)型有關(guān),流處理中的PIPELINED和PIPELINED_BOUNDED都是默認(rèn)的false,在這種情況下所有的vertex其實(shí)都會(huì)放入同一個(gè)region。故我們?nèi)粘5膄link作業(yè)其實(shí)都只會(huì)生成一個(gè)pipeline region。
第二步 根據(jù)不同的pipeline region構(gòu)建自己的resultPartition信息,這個(gè)是為了構(gòu)建后續(xù)的PartitionReleaseStrategy,決定一個(gè)resultPartition何時(shí)finish以及被release
第三步 對(duì)vertex的coLocation情況進(jìn)行校驗(yàn),保證co-located tasks必須在同一個(gè)pipeline Region里。這里是因?yàn)楹罄m(xù)的scheduling strategy里會(huì)保證不同pipeline region的調(diào)度部署是階段隔離的,可能無(wú)法滿(mǎn)足colocation-constraint
2.4 Scheduling 策略
SchedulerNG Scheduling策略默認(rèn)為PipelinedRegionSchedulingStrategy,在executionGraph完成之后,就可以根據(jù)生成的剛剛executionTopology來(lái)初步構(gòu)建初步的Scheduling策略了。這里看下startScheduling代碼,可以看到Scheduling過(guò)程就是我們常說(shuō)的基于pipeline region的Scheduling。
@Override
public void startScheduling() {
final SetsourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
.filter(this::isSourceRegion)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}
2.5 Execution Slot 分配器
默認(rèn)實(shí)現(xiàn)是SlotSharingExecutionSlotAllocator,在schedulerNG完成executionGraph構(gòu)建完成后,需要進(jìn)一步構(gòu)建Execution Slot 分配器。用于將physical shared slots分配到我們的logical slots 上,并將logical slot 分配給我們executionGraph中的execution(task)。通過(guò)代碼我們可以看到ExecutionSlotAllocator的職責(zé)非常簡(jiǎn)單,只有簡(jiǎn)單的allocate和cancel。
但在實(shí)現(xiàn)上這里有幾個(gè)重要元素需要了解:
LocalInputPreferredSlotSharingStrategy :在Flink內(nèi)部,所有的slot分配都是基于sharingslot來(lái)操作的,在滿(mǎn)足co-location的基礎(chǔ)上,F(xiàn)link期望將producer和consumeNode task盡可能的分布在一起,以減少數(shù)據(jù)傳輸成本。
SlotProfile:slot的資源信息,對(duì)task -> logical slot -> physical slot的mapping有非常重要的作用,包含了task的資源信息,slot的物理資源信息,傾向的location(TaskManagerLocation),傾向的allocation以及整個(gè)executionGraph之前分配過(guò)的allocation(用于黑名單,重啟后盡量避免分配在之前的slot里)。
ResourceProfileRetriever: 用于獲取executionVertex的實(shí)際資源信息。默認(rèn)是unknown,如果有明細(xì)配置會(huì)用于后續(xù)的executionSlotSharingGroup資源構(gòu)建。
ExecutionSlotSharingGroup:Flink task資源申請(qǐng)的最終邏輯載體,用于將sharing到一起的task(execution group)組合成一個(gè)group用于生成資源,后續(xù)部署也會(huì)綁定對(duì)應(yīng)的task。
3Scheduling 主要過(guò)程
在JobMaster完成自身構(gòu)建之后,就委托SchedulerNG來(lái)開(kāi)始了整個(gè)job的Scheduling:
@Override
protected void startSchedulingInternal() {
log.info(
"Starting scheduling with scheduling strategy [{}]",
schedulingStrategy.getClass().getName());
transitionToRunning();
schedulingStrategy.startScheduling();
}
可以看到這里是由schedulingStrategy來(lái)負(fù)責(zé)整個(gè)調(diào)度過(guò)程的,也就是我們的PipelinedRegionSchedulingStrategy,
one by one將pipeline region進(jìn)行部署
private void maybeScheduleRegions(final Setregions) {
final ListregionsSorted =
SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
schedulingTopology, regions);
final MapconsumableStatusCache = new HashMap<>();
for (SchedulingPipelinedRegion region : regionsSorted) {
maybeScheduleRegion(region, consumableStatusCache);
}
}
遍歷region中的ExecutionVertex依次進(jìn)行部署
final ListvertexDeploymentOptions =
SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
regionVerticesSorted.get(region), id -> deploymentOption);
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
將vertexDeployment交給SlotSharingExecutionSlotAllocator處理
private ListallocateSlots(
final ListexecutionVertexDeploymentOptions) {
return executionSlotAllocator.allocateSlotsFor(
executionVertexDeploymentOptions.stream()
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
.collect(Collectors.toList()));
}
接下來(lái)整個(gè)allocate的主要過(guò)程如下(忽略physical fail等情況)
通過(guò)SlotSharingStrategy拿到每個(gè)execution對(duì)應(yīng)的ExecutionSlotSharingGroup
- 先從 corresponding co-location constraint 去mapping中尋找是否有存在的slot sharing group
- 接著從producer 的角度來(lái)逐一檢查是否可以合并到同一個(gè)slot sharing group.
- 最后嘗試所有剩下的slot sharing group看是否符合execution 的要求(如同屬于一個(gè)job vertex的task不能分配到同一個(gè) slot sharing group).
- 如果以上都沒(méi)有滿(mǎn)足條件的就創(chuàng)建一個(gè)新的slot sharing group
- 檢查ExecutionSlotSharingGroup是否已經(jīng)有了對(duì)應(yīng)的sharedSlot
- 遍歷尚未得到分配的ExecutionSlotSharingGroup
- 計(jì)算對(duì)應(yīng)的SlotProfile
- 向PhysicalSlotProvider申請(qǐng)新的physical slot
-
rm側(cè)會(huì)先檢查是否已經(jīng)有滿(mǎn)足條件的excess slot
-
如果沒(méi)有嘗試會(huì)申請(qǐng)新的woker以提供資源
-
由sharedSlotProfileRetriever來(lái)創(chuàng)建對(duì)應(yīng)的slotProfile并構(gòu)建PhysicalSlotRequest
-
PhysicalSlotProvider向slotPool申請(qǐng)新的slot
-
slotPool會(huì)向rm側(cè)申請(qǐng)新的slot
-
利用physical slot future提前創(chuàng)建sharedSlotFutrue
-
將sharedSlotFutrue 分配給所有相關(guān)的executions
-
最后生成所有的SlotExecutionVertexAssignments
在完成所有的SlotExecutionVertexAssignment之后,生成對(duì)應(yīng)的DeploymentHandle并等待所有的assignedSlot創(chuàng)建完畢,正式開(kāi)始部署對(duì)應(yīng)的任務(wù)。?
4問(wèn)題思考
我們對(duì)整個(gè)Flink task的部署過(guò)程完成梳理后,重新對(duì)我們一開(kāi)始的問(wèn)題進(jìn)行思考:
4.1 為什么會(huì)出現(xiàn)slot負(fù)載過(guò)重的情況?如何避免?
問(wèn)題的產(chǎn)生在于大量的task集中分配到了統(tǒng)一個(gè)sharedSlot,這個(gè)我們可以發(fā)現(xiàn)其實(shí)是在ExecutionSlotSharingGroup的構(gòu)建過(guò)程中產(chǎn)生的。我們看下源碼,可以很直接的看到整個(gè)group的分配是一個(gè)roundRobin過(guò)程,而executionVertices來(lái)自于有序拓?fù)浣Y(jié)構(gòu),中間傳遞過(guò)程也保證了有序性,所以最終會(huì)導(dǎo)致大量的task分配的index靠前的group中,最后落到了同一個(gè)slot。
為了避免這種情況,我們的做法其實(shí)有比較多,一種是在保證各種constraint的同時(shí)添加隨機(jī)性,以打散各個(gè)不均勻的task;還有一種就是構(gòu)建基于load-balance的分配過(guò)程,以盡可能的將task分布均勻。
附Flink部分源碼:
private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
final ListexecutionVertices) {
for (SchedulingExecutionVertex executionVertex : executionVertices) {
final SlotSharingGroup slotSharingGroup =
getSlotSharingGroup(executionVertex.getId());
final Listgroups =
executionSlotSharingGroups.computeIfAbsent(
slotSharingGroup.getSlotSharingGroupId(), k -> new ArrayList<>());
ExecutionSlotSharingGroup group = null;
for (ExecutionSlotSharingGroup executionSlotSharingGroup : groups) {
if (isGroupAvailableForVertex(
executionSlotSharingGroup, executionVertex.getId())) {
group = executionSlotSharingGroup;
break;
}
}
if (group == null) {
group = new ExecutionSlotSharingGroup();
group.setResourceProfile(slotSharingGroup.getResourceProfile());
groups.add(group);
}
addVertexToExecutionSlotSharingGroup(executionVertex, group);
}
}
4.2 如何避免tm級(jí)別的負(fù)載過(guò)重?
這個(gè)問(wèn)題主要是在于說(shuō)有一些過(guò)重的task對(duì)應(yīng)的slot都分配在了同一個(gè)tm上,導(dǎo)致整個(gè)tm壓力過(guò)大,資源難以協(xié)調(diào)。在整個(gè)過(guò)程中其實(shí)我們有看到tm信息的交互,在co-location constraint上。我們看下該hint職責(zé):
The co-location group is used to make sure that the i-th subtasks for iteration head and iteration tail are scheduled on the same TaskManager.
也就是說(shuō)其實(shí)是為了解決算子間相同index的task數(shù)據(jù)傳遞之類(lèi)的問(wèn)題,但對(duì)于task的均衡負(fù)載無(wú)法介入。對(duì)此我們嘗試去做的事情:
在當(dāng)前不使用細(xì)粒度資源配置的情況下,考慮task-slot之間均衡分布的同事,task-tm也能做到一定的負(fù)載均衡。這種情況可以通過(guò)tm單slot來(lái)解決,也可以在保證task-slotSharingGroup足夠隨機(jī)性的同時(shí),保證slotSharingGroup-tm的足夠隨機(jī)性。
在后續(xù)使用使用細(xì)粒度資源配置的情況下,不使用slotsharing,且將相同jobVertex對(duì)應(yīng)的task盡量分布在同一個(gè)task當(dāng)中。這個(gè)我們后續(xù)準(zhǔn)備在slotProfile中加入jobVertex相關(guān)的tag,SlotAllocator做slot matching的時(shí)候加入jobVertex constraint來(lái)保證task的位置分配。
5寫(xiě)在最后
Flink開(kāi)源社區(qū)較活躍,Task側(cè)的部署鏈路也一直在演進(jìn)中,持續(xù)跟進(jìn)并深入了解內(nèi)部實(shí)現(xiàn)邏輯能更好的支持我們解決Flink個(gè)性化調(diào)度策略上的一些問(wèn)題。后續(xù)我們也準(zhǔn)備進(jìn)一步完善Flink在operator級(jí)別的細(xì)粒度資源配置能力,降低資源使用率的同時(shí)進(jìn)一步提高Flink作業(yè)穩(wěn)定性。
當(dāng)前標(biāo)題:FlinkTask調(diào)度部署機(jī)制
瀏覽路徑:http://m.fisionsoft.com.cn/article/cojspsi.html


咨詢(xún)
建站咨詢(xún)
