查看原文
其他

Flink 执行引擎:流批一体的融合之路

zhisheng 2022-07-29

The following article is from Flink 中文社区 Author 马国维(黎钢)

摘要:本文由 Apache Flink Committer 马国维分享,主要介绍 Flink 作为大数据计算引擎的流批一体融合之路。内容包括:
  1. 背景
  2. 流批一体的分层架构
  3. 流批一体DataStream
  4. 流批一体DAG Scheduler
  5. 流批一体的Shuffle架构
  6. 流批一体的容错策略
  7. 未来展望


Tips:点击文末阅读原文可查看更多技术干货~ 

一、背景



随着互联网和移动互联网的不断发展,各行各业都积累海量的业务数据。而企业为了改善用户体验,提升产品在市场上的竞争力,都采取了实时化方式来处理大数据。社交媒体的实时大屏、电商的实时推荐、城市大脑的实时交通预测、金融行业的实时反欺诈,这些产品的成功都在说明大数据处理的实时化已经成为一个势不可挡的潮流。


在实时化的大趋势下,Flink 已经成为实时计算行业的事实标准。我们看到,不光是阿里巴巴,国内外各个领域的头部厂商,都把 Flink 做为实时计算的技术底座,国内有字节跳动、腾讯、华为,国外有 Netflix、Uber 等等。

而业务实时化只是一个起点,Flink 的目标之一就是给用户提供实时离线一体化的用户体验。其实很多用户不仅需要实时的数据统计,为了确认运营或产品的策略的效果,用户同时还需要和历史(昨天,甚至是去年的同期)数据比较。而从用户的角度来看,原有的流、批独立方案存在一些痛点:

  • 人力成本比较高。由于流和批是两套系统,相同的逻辑需要两个团队开发两遍。
  • 数据链路冗余。在很多的场景下,流和批计算内容其实是一致,但是由于是两套系统,所以相同逻辑还是需要运行两遍,产生一定的资源浪费。
  • 数据口径不一致。这个是用户遇到的最重要的问题。两套系统、两套算子,两套 UDF,一定会产生不同程度的误差,这些误差给业务方带来了非常大的困扰。这些误差不是简单依靠人力或者资源的投入就可以解决的。


2020 年的双十一,在实时洪峰到达 40 亿的历史新高的同时,Flink 团队与 DT 团队一起推出了基于 Flink 的全链路流批一体的数仓架构,很好解决了 Lambda 的架构所带来的一系列问题:流批作业使用同一 SQL,使研发效率提升了 3~4 倍;一套引擎确保了数据口径天然一致;流批作业在同一集群运行,削峰填谷大幅提升了资源效率。

Flink 流批一体的成功,离不开 Flink 开源社区的健康蓬勃发展。从 Apache 软件基金会 2020 年度报告可以看出,在反映开源社区繁荣情况的三个关键指标上 Flink 都名列前茅:用户邮件列表活跃度,Flink 排名第一;开发者提交次数 Flink 排名第二,Github 用户访问量排名第二。这些数据并不局限于大数据领域,而是 Apache 开源基金会下属的所有项目。


2020 年也是 Blink 反哺社区的第二年,这两年我们把 Blink 在集团内积累的经验逐步贡献回社区,让 Flink 成为真正意义上的流批一体平台。我希望通过这篇文章给大家分享下这两年 Flink 在执行引擎流批融合方面做了哪些工作。同时也希望 Flink 的老用户和新朋友可以进一步了解 Flink 流批一体架构的“前世今生”。
 

二、流批一体的分层架构



总体来说,Flink 的核心引擎主要分为如下三层:

  • SDK 层。Flink 的 SDK 主要有两类,第一类是关系型 Relational SDK 也就是  SQL/Table,第二类是物理型 Physical SDK 也就是 DataStream。这两类 SDK 都是流批统一,即不管是 SQL 还是 DataStream,用户的业务逻辑只要开发一遍,就可以同时在流和批的两种场景下使用;
  • 执行引擎层。执行引擎提供了统一的 DAG,用来描述数据处理流程 Data Processing Pipeline(Logical Plan)。不管是流任务还是批任务,用户的业务逻辑在执行前,都会先转化为此 DAG 图。执行引擎通过 Unified DAG Scheduler 把这个逻辑 DAG 转化成在分布式环境下执行的Task。Task 之间通过 Shuffle 传输数据,我们通过 Pluggable Unified Shuffle 架构,同时支持流批两种 Shuffle 方式;
  • 状态存储。状态存储层负责存储算子的状态执行状态。针对流作业有开源  RocksdbStatebackend、MemoryStatebackend,也有商业化的版本的GemniStateBackend;针对批作业我们在社区版本引入了 BatchStateBackend。
 
本文主要分享如下几个方面的内容:

  1. 流批一体的 DataStream 介绍了如何通过流批一体的 DataStream 来解决 Flink SDK 当前面临的挑战;
  2. 流批一体的 DAG Scheduler 介绍了如何通过统一的 Pipeline Region 机制充分挖掘流式引擎的性能优势;如何通过动态调整执行计划的方式来改善引擎的易用性,提高系统的资源利用率;
  3. 流批一体的 Shuffle 架构介绍如何通过一套统一的 Shuffle 架构既可以满足不同  Shuffle 在策略上的定制化需求,同时还能避免在共性需求上的重复开发;
  4. 流批一体的容错策略介绍了如何通过统一的容错策略既满足批场景下容错又可以提升流场景下的容错效果。
 

三、流批一体 DataStream


SDK 分析以及面临的挑战



如上图所示,目前 Flink 提供的 SDK 主要有三类:

  1. Table/SQL 是一种 Relational 的高级 SDK,主要用在一些数据分析的场景中,既可以支持 Bounded 也可以支持 Unbounded 的输入。由于 Table/SQL 是  Declarative 的,所以系统可以帮助用户进行很多优化,例如根据用户提供的Schema,可以进行 Filter Push Down 谓词下推、按需反序列二进制数据等优化。目前 Table/SQL 可以支持 Batch 和 Streaming 两种执行模式。[1]
  2. DataStream 属于一种 Physical SDK。Relatinal SDK 功能虽然强大,但也存在一些局限:不支持对 State、Timer 的操作;由于 Optimizer 的升级,可能导致用相同的 SQL 在两个版本中出现物理执行计划不兼容的情况。而 DataStream SDK,既可以支持 State、Timer 维度 Low Level 的操作,同时由于 DataStream 是一种  Imperative SDK,所以对物理执行计划有很好的“掌控力”,从而也不存在版本升级导致的不兼容。DataStream 目前在社区仍有很大用户群,例如目前未 Closed 的 DataStream issue 依然有近 500 个左右。虽然 DataStream 即可以支持 Bounded  又可以支持 Unbounded Input 用 DataStream 写的 Application,但是在 Flink-1.12 之前只支持 Streaming 的执行模式。
  3. DataSet 是一种仅支持 Bounded 输入的 Physical SDK,会根据 Bounded 的特性对某些算子进行做一定的优化,但是不支持 EventTime 和 State 等操作。虽然  DataSet 是 Flink 提供最早的一种 SDK,但是随着实时化和数据分析场景的不断发展,相比于 DataStream 和 SQL,DataSet 在社区的影响力在逐步下降。
 
目前 Table/SQL 对于流批统一的场景支持已经比较成熟,但是对于 Phyiscal SDK 来说还面临的一些挑战,主要有两个方面:

  1. 利用已有 Physical SDK 无法写出一个真正生产可以用的流批一体的 Application。例如用户写一个程序用来处理 Kafka 中的实时数据,那么利用相同的程序来处理存储在 OSS/S3/HDFS 上的历史数据也是非常自然的事情。但是目前不管是 DataSet 还是 DataStream 都无法满足用户这个“简单”的诉求。大家可能觉得奇怪,DataStream 不是既支持 Bounded 的 Input 又支持 Unbounded 的 Input,为什么还会有问题呢?其实“魔鬼藏在细节中”,我会在 Unified DataStream 这一节中会做进一的阐述。
  2. 学习和理解的成本比较高。随着 Flink 不断壮大,越来越多的新用户加入 Flink 社区,但是对于这些新用户来说就要学习两种 Physical SDK。和其他引擎相比,用户入门的学习成本是相对比较高的;两种 SDK 在语义上有不同的地方,例如 DataStream 上有 Watermark、EventTime,而 DataSet 却没有,对于用户来说,理解两套机制的门槛也不小;由于这两 SDK 还不兼容,一个新用户一旦选择错误,将会面临很大的切换成本。
 

Unified Physical SDK

 

为了解决上述 Physical SDK 所面临的挑战,我们把 Unified DataStream SDK 作为  Flink 统一的 Physical SDK。这个部分主要解决两个问题:

  1. 为什么选择 DataStream 作为 Unified Physical SDK?

  2. Unified DataStream 比“老”的 DataStream 提供了哪些能力让用户可以写出一个真正生产可以用的流批一体 Application?


为什么不是 Unified DataSet


为了解决学习和理解成本比较高的问题,最自然最简单的方案就是从 DataStream 和  DataSet 中选择一个作为 Flink 的唯一的 Physical SDK。那为什么我们选择了  DataStream 而不是 DataSet 呢?主要有两个原因:

  1. 用户收益。在前边已经分析过,随着 Flink 社区的发展,目前 DataSet 在社区的影响力逐渐下降。如果选择使用 DataSet 作为 Unified Physical SDK,那么用户之前在 DataStream 大量“投资”就会作废。而选择 DataStream,可以让许多用户的已有 DataStream “投资”得到额外的回报;
  2. 开发成本。DataSet 过于古老,缺乏大量对于现代实时计算引擎基本概念的支持,例如 EventTime、Watermark、State、Unbounded Source 等。另外一个更深层的原因是现有 DataSet 算子的实现,在流的场景完全无法复用,例如 Join 等。而对于 DataStream 则不然,可以进行大量的复用。那么如何在流批两种场景下复用  DataStream 的算子呢?
 

Unified DataStream


很多对 Flink 有一定了解的用户可能会问:DataStream 是同时支持 Bounded/Unbounded 的输入,为什么我们会说:用 DataStream 无法写出一个真正生产可以用的流批一体 Application 呢?简单来说,DataStream 原本主要设计目标是给 Unbounded 场景使用的,所以导致在 Bounded 的场景下在效率、可用性、易用性方面和传统的批引擎还有一定距离。具体来说体现在如下两个方面:

  • 效率


先给大家看一个例子,下边是一个跑同样规模的 WordCount 的作业,DataStream 和  DataSet 的性能对比图。从这个例子可以看出,DataSet 的性能是 DataStream 将近 5  倍。


很明显,要让 DataStream 在生产中既可以支持流的场景又要支持批的场景,就一定要大幅提高 DataStream 在 Bounded 场景下效率。那么为什么 DataStream 的效率要比  DataSet 的效率低呢?

前面我们已经提到,DataStream 原本主要设计目标是给  Unbounded 的场景下使用的,而 Unounded 场景下一个主要的特点就是乱序,也就是说任何一个 DataStream 的算子无法假设处理的 Record 是按照什么顺序进行的,所以许多算子会用一个 K/V 存储来缓存这些乱序的数据,等到合适的时候再从 K/V 存储中取出这些数据进行处理并且进行输出。一般情况下,算子对 K/V 存储访问涉及大量的序列化和反序列化,同时也会引发随机磁盘 I/O;而在 DataSet 中,假设数据是有界的,也就是可以通过优化来避免随机的磁盘 I/O 访问,同时也对序列化和反序列化做了相关优化。这也是为什么用 DataSet 写的 WorkerCount 要比用 DataStream 写的 WordCount  快 5 倍主要原因。 

知道到了原因,是不是要把所有的 DataStream 的算子,都重写一遍就可以了呢?理论上没问题,但是 DataStream 有大量的算子需要重写,有些算子还比较复杂,例如与  Window 相关的一系列算子。可以想象到,如果都全部重写,工程量是非常之巨大的。所以我们通过单 Key 的 BatchStateBackend 几乎完全避免了对所有算子重写,同时还得到了非常不错的效果。

  • 一致性


对于 Flink 有一定了解的同学应该都知道,原来用 DataStream 写的 Application 都采用  Streaming 的执行模式,在这种模式下是通过 Checkpoint 的方式保持端到端的 Exactly Once 的语义,具体来说一个作业的 Sink 只有当全图的所有算子(包括 Sink 自己)都做完各自的 Snapshot 之后,Sink 才会把数据 Commit 到外部系统中,这是一个典型的依赖  Flink Checkpoint 机制的 2PC 协议。
 
而在 Bounded 的场景下虽然也可以采用 Streaming 的方式但是对于用户来说可能会存在一些问题:

  1. 资源消耗大: 使用 Streaming 方式,需要同时拿到所有的资源。在某些情况下,用户可能没有这么多资源;
  2. 容错成本高: 在 Bounded 场景下,为了效率一些算子可能无法支持 Snapshot 操作,一旦出错可能需要重新执行整个作业。 
 
所以在 Bounded 场景下,用户希望 Application 可以采用 Batch 执行模式,因为利用  Batch 执行的模式可以非常自然的解决上述两个问题。在 Bounded 场景下支持 Batch 的执行模式是比较简单的,但是却引入了一个非常棘手的问题——利用已有的 Sink API  无法保证端到端的 Exactly Once 语义。这是由于 Bounded 场景下是没有 Checkpoint  的,而原有 Sink 都是依赖 Checkpoint 保证端到端的 ExactlyOnce 的。同时我们不希望开发者针对 Sink 在不同模式下开发两套不同的实现,因为这样非常不利用 Flink 和其他生态的对接。

实际上,一个 Transactional 的 Sink 主要解决如下 4 个问题:

  1. What to commit?
  2. How to commit?
  3. Where to commit?
  4. When to commit?
 
而 Flink 应该让 Sink 开发者提供 What to commit 和 How to commit,而系统应该根据不同的执行模式,选择 Where to commit 和 When to commit 来保证端到端的 Exactly Once。最终我们提出了一个全新 Unified Sink API,从而让开发者只开发一套 Sink 就可以同时运行在 Streaming 和 Batch 执行模式下。这里介绍的只是主要思路,在有限流的场景下如何保证 End to End 的一致性;如何对接 Hive、Iceberg 等外部生态,实际上还是存在一定挑战。


四、流批一体 DAG Scheduler


Unified DAG Scheduler 要解决什么问题


原来 Flink 有两种调度的模式:

  1. 一种是流的调度模式,在这种模式下,Scheduler 会申请到一个作业所需要的全部资源,然后同时调度这个作业的全部 Task,所有的 Task 之间采取 Pipeline 的方式进行通信。批作业也可以采取这种方式,并且在性能上也会有很大的提升。但是对于运行比较长的 Batch 作业来说来说,这种模式还是存在一定的问题:规模比较大的情况下,同时消耗的资源比较多,对于某些用户来说,他可能没有这么多的资源;容错代价比较高,例如一旦发生错误,整个作业都需要重新运行。
  2. 一种是批的调度模式。这种模式和传统的批引擎类似,所有 Task 都是可以独立申请资源,Task 之间都是通过 Batch Shuffle 进行通讯。这种方式的好处是容错代价比较小。但是这种运行方式也存在一些短板。例如,Task 之间的数据都是通过磁盘来进行交互,引发了大量的磁盘 IO。
 
总的来说,有了这两种调度方式是可以基本满足流批一体的场景需求,但是也存在着很大的改进空间,具体来说体现在三个方面:

  1. 架构不一致、维护成本高。调度的本质就是进行资源的分配,换句话说就是要解决  When to deploy which tasks to where 的问题。原有两种调度模式,在资源分配的时机和粒度上都有一定的差异,最终导致了调度架构上无法完全统一,需要开发人员维护两套逻辑。例如,流的调度模式,资源分配的粒度是整个物理执行计划的全部 Task;批的调度模式,资源分配的粒度是单个任务,当 Scheduler 拿到一个资源的时候,就需要根据作业类型走两套不同的处理逻辑;
  2. 性能。传统的批调度方式,虽然容错代价比较小,但是引入大量的磁盘 I/O,并且性能也不是最佳,无法发挥出 Flink 流式引擎的优势。实际上在资源相对充足的场景下,可以采取“流”的调度方式来运行 Batch 作业,从而避免额外的磁盘 I/O,提高作业的执行效率。尤其是在夜间,流作业可以释放出一定资源,这就为批作业按照“Streaming”的方式运行提供了可能。
  3. 自适应。目前两种调度方式的物理执行计划是静态的,静态生成物理执行计划存在调优人力成本高、资源利用率低等问题。
 

基于 Pipeline Region 的统一调度



为了既能发挥流引擎的优势,同时避免全图同时调度存在的一些短板,我们引入  Pipeline Region 的概念。Unified DAG Scheduler 允许在一个 DAG 图中,Task 之间既可以通过 Pipeline 通讯,也可以通过 Blocking 方式进行通讯。这些由 Pipeline 的数据交换方式连接的 Task 被称为一个 Pipeline Region。基于以上概念,Flink 引入 Pipeline Region 的概念,不管是流作业还是批作业,都是按照 Pipeline Region 粒度来申请资源和调度任务。细心的读者可以发现,其实原有的两种模式都是 Pipeline Region 调度的特例。


即便可以资源上满足“流”的调度模式,那么哪些任务可以采取“流”的方式调度呢?

有同学还是会担心采取“流”的调度方式容错代价会比较高,因为在“流”的调度方式下,一个 Task 发生错误,和他联通的所有 Task 都会 Fail,然后重新运行。

在 Flink 中,不同 Task 之间有两种连接方式[2],一种是 All-to-All 的连接方式,上游 Task 会和下游的所有的 Task 进行连接;一种是 PointWise 的链接方式,上游的 Task 只会和下游的部分 Task 进行连接。

如果一个作业的所有 Task 之间都是通过 All-to-All 方式进行连接,一旦采取“流”的调度方式,那么整个物理拓扑都需要同时被调度,那么确实存在  FailOver 代价比较高的问题[3]。但是在实际 Batch 作业的拓扑中,Task 之间不都是通过 All-to-All 的边连接,Batch 作业中存在的大量 Task 通过 PointWise 的边连接,通过“流”的方式调度由 PointWise 连接的 Task 连通图,在减少作业的容错成本的同时,可以提高作业的执行效率,如下图所示在,在全量的 10T TPC-DS 测试中,开启所有  PointWise 边都采用 Pipeline 的链接方式就可以让整性能有 20% 以上的性能提升。


上述只是 Schduler 提供的划分 Pipeline Region 的 4 种策略中的一种[4],实际上  Planner 可以根据实际运行场景,定制哪些 Task 之间采取 Pipeline 的传输方式,哪些  Task 之间采取 Batch 的传输方式方式。

自适应调度

 
调度的本质是给物理执行计划进行资源分配的决策过程。Pipeline Region 解决了物理执行计划确定之后,流作业和批作业可以统一按照 Pipeline Region 的粒度进行调度。对于批作业来说静态生成物理执行计划存在一些问题[5]:

  1. 配置人力成本高。对于批作业来说,虽然理论上可以根据统计信息推断出物理执行计划中每个阶段的并发度,但是由于存在大量的 UDF 或者统计信息的缺失等问题,导致静态决策结果可能会出现严重不准确的情况;为了保障业务作业的 SLA,在大促期间,业务的同学需要根据大促的流量估计,手动调整高优批作业的并发度,由于业务变化快,一旦业务逻辑发生变化,又要不断的重复这个过程。整个调优过程都需要业务的同学手动操作,人力成本比较高,即便这样也可能会出现误判的情况导致无法满足用户 SLA;
  2. 资源利用率低。由于人工配置并发度成本比较高,所以不可能对所有的作业都手动配置并发度。对于中低优先级的作业,业务同学会选取一些默认值作为并发度,但是在大多数情况下这些默认值都偏大,造成资源的浪费;而且虽然高优先级的作业可以进行手工并发配置,由于配置方式比较繁琐,所以大促过后,虽然流量已经下降但是业务方仍然会使用大促期间的配置,也造成大量的资源浪费现象;
  3. 稳定性差。资源浪费的情况最终导致资源的超额申请现象。目前大多数批作业都是采取和流作业集群混跑的方式,具体来说申请的资源都是非保障资源,一旦资源紧张或者出现机器热点,这些非保障资源都是优先被调整的对象。


为了解决静态生成物理执行存在这些问题,我们为批作业引入了自适应调度功能[6],和原来的静态物理执行计划相比,利用这个特性可以大幅提高用户资源利用率。 Adaptive Scheduler 可以根据一个 JobVertex 的上游 JobVertex 的执行情况,动态决定当前 JobVertex 的并发度。在未来,我们也可以根据上游 JobVertex 产出的数据,动态决定下游采用什么样的算子。


五、流批一体的 Shuffle 架构


Flink 是一个流批一体的平台,因此引擎对于不同的执行模式要分别提供 Streaming 和Batch 两种类型的 Shuffle。虽然 Streaming Shuffle 和 Batch Shuffle 在具体的策略上存在一定的差异,但是本质上都是为了对数据进行重新划分(re-partition),因此不同的  Shuffle 之间还存在一定的共性。所以我们的目标是提供一套统一的 Shuffle 架构,既可以满足不同 Shuffle 在策略上的定制,同时还能避免在共性需求上进行重复开发。

总体来说,Shuffle 架构可以划分成如下图所示的四个层次。流和批的 Shuffle 需求在各层上有一定差异,也有大量的共性,下边我做了一些简要分析。


流批 Shuffle 之间的差异


大家都知道,批作业和流作业对 Shuffle 的需求是有差异的,具体可以体现在如下 3 个方面:

  1. Shuffle 数据的生命周期。流作业的 Shuffle 数据和 Task 的生命周期基本是一致的;而批作业的 Shuffle 数据和 Task 生命周期是解耦的;
  2. Shuffle 数据的存储介质。因为流作业的 Shuffle 数据生命周期比较短,所以可以把流作业的 Shuffle 数据存储在内存中;而批作业的 Shuffle 数据生命周期有一定的不确定性,所以需要把批作业的 Shuffle 数据存储在磁盘中;
  3. Shuffle 部署方式[7]。把 Shuffle 服务和计算节点部署在一起,对流作业来说这种部署方式是有优势的,因为这样会减少不必要网络开销,从而减少 Latency。但对于批作业来说,这种部署方式在资源利用率、性能、稳定性上都存在一定的问题。[8]

流批 Shuffle 之间的共性


批作业和流作业的 Shuffle 有差异也有共性,共性主要体现在:

  1. 数据的 Meta 管理。所谓 Shuffle Meta 是指逻辑数据划分到数据物理位置的映射。不管是流还是批的场景,在正常情况下都需要从 Meta 中找出自己的读取或者写入数据的物理位置;在异常情况下,为了减少容错代价,通常也会对 Shuffle Meta 数据进行持久化;
  2. 数据传输。从逻辑上讲,流作业和批作业的 Shuffle 都是为了对数据进行重新划分(re-partition/re-distribution)。在分布式系统中,对数据的重新划分都涉及到跨线程、进程、机器的数据传输。

流批一体的 Shuffle 架构

 
 
Unified Shuffle 架构抽象出三个组件[9]: Shuffle Master、Shuffle Reader、Shuffle Writer。Flink通过和这三个组件交互完成算子间的数据的重新划分。通过这三个组件可以满足不同Shuffle插件在具体策略上的差异:

  1. Shuffle Master 资源申请和资源释放。也就是说插件需要通知框架 How to request/release resource。而由 Flink 来决定 When to call it;
  2. Shuffle Writer 上游的算子利用 Writer 把数据写入 Shuffle Service——Streaming Shuffle 会把数据写入内存;External/Remote Batch Shuffle 可以把数据写入到外部存储中;
  3. Shuffle Reader 下游的算子可以通过 Reader 读取 Shuffle 数据;
 
同时,我们也为流批 Shuffle 的共性——Meta 管理、数据传输、服务部署[10]——提供了架构层面的支持,从而避免对复杂组件的重复开发。高效稳定的数据传输,是分布式系统最复杂的子系统之一,例如在传输中都要解决上下游反压、数据压缩、内存零拷贝等问题,在新的架构中只要开发一遍,就可以同时在流和批两种场景下共同使用,大大减少了开发和维护的成本。


六、流批一体的容错策略

 

Flink 原有容错策略是以检查点为前提的,具体来说无论是单个 Task 出现失败还是JobMaster 失败,Flink 都会按照最近的检查点重启整个作业。虽然这种策略存在一定的优化空间,但是总的来说对于流的场景是基本是接受的。目前,Flink Batch 运行模式下不会开启检查点[11],这也意味一旦出现任何错误,整个作业都要从头执行。

虽然原有策略在理论上可以保证最终一定会产出正确的结果,但是明显大多数客户都无法接受这种容错策略所付出的代价。为了解决这些问题,我们分别对 Task 和 JM 的容错都做了相应的改进。

Pipeline Region Failover


虽然在 Batch 执行模式下没有定时的 Checkpoint,但是在 Batch 执行模式下,Flink允许 Task 之间通过 Blocking Shuffle 进行通信。对于读取 Blocking Shuffle 的 Task 发生失败之后,由于 Blocking Shuffle 中存储了这个 Task 所需要的全部数据,所以只需要重启这个 Task 以及通过 Pipeline Shuffle 与其相连的全部下游任务即可,而不需要重启整个作业。

总的来说,Pipeline Region Failover 策略和 Scheduler 在进行正常调度的时候一样,都是把一个 DAG 拆分成由若干 Pipeline shuffle 连接的一些 Pipeline Region,每当一个 Task 发生 FailOver 的时候,只会重启这个 Task 所在的 Region 即可。


JM Failover


JM 是一个作业的控制中心,包含了作业的各种执行状态。Flink 利用这些状态对任务进行调度和部署。一旦 JM 发生错误之后,这些状态将会全部丢失。如果没有这些信息,即便所有的工作节点都没有发生故障,新 JM 仍然无法继续调度原来的作业。例如,由于任务的结束信息都已丢失,一个任务结束之后,新 JM 无法判断现有的状态是否满足调度下游任务的条件——所有的输入数据都已经产生。

从上边的分析可以看出,JM Failover 的关键就是如何让一个 JM“恢复记忆”。在 VVR[12] 中我们通过基于 Operation Log 机制恢复 JM 的关键状态。


细心的同学可能已经发现了,虽然这两个改进的出发点是为了批的场景,但是实际上对于流的作业容也同样有效。上边只是简要的介绍了两种容错策略的思路,实际上还有很多值得思考的内容。例如 Blocking 上游数据丢失了我们应该如何处理?JM 中有哪些关键的状态需要恢复?
 

七、未来展望


为了提供比现在更快、更稳的用户体验,我们已经开始了下一代流式架构的研发;Flink在流批一体的场景下得到了越来越多用户的认可,但是我们也知道业界还有很多高水平传统大数据系统值得我们学习。最后我也希望感兴趣的小伙伴可以加入我们,一起打造一个具有完美用户体验的流批一体大数据计算引擎。
  
注释:

[1] Streaming 和 Batch 是两种执行模式和语义无关,Streaming 执行模式可以简单的理解为,Task 之间采用 Pipeline 的 Shuffle;Batch 执行模式可以简单的理解为,Task 之间采用 Blocking 的 Shuffle 模式。
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/jobgraph/DistributionPattern.html
[3] 我们正在开发 Adaptive 的 Shuffle 模式,利用这种模式可以避免”纯”Pipeline的方式引发的容错代价过高的问题。
[4] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.html
[5] 对于流作业来说,静态物理执行计划也有和批类似的问题,我们提供了一个AutoPilot的系统来动态修改物理执行计划。由于AutoPilot 属于独立的服务,不属于执行引擎这里就不展开赘述了。
[6] 由于时间规划的原因,这个功能暂时只存在我们的商业化版本的执行引擎VVR中
[7] 在一些情况下,批Shuffle Service也会和计算节点部署在一起。例如,在Flink Session的模式下,虽然Shuffle Service和计算部署在一起有一定的稳定性代价,但是对部分用户来说这种部署模式是在成本和稳定性之间权衡之下的一个结果。所以在一定程度上,流批Shuffle在部署方面也是有共性的,只是不是完全相同而已。
[8] 把批作业的计算和Shuffle部署在一个节点内所存在的问题:资源利用率低、成本高。如果没有计算任务继续部署该节点上那么这个节点上计算资源就会被浪费掉,计算资源提早释放也会节省用户成本;性能无法达到最优 由于一个节点只能看到部分的Shuffle数据,因此一个Reduce需要从n个节点上拉取自己的数据,这会引发大量的随机IO读,这样大量的随机读IO会大大降低作业性能;稳定性 一旦结点挂掉,整个节点所负责的Shuffle数据就会丢失,然后就会触发作业重新计算,这种重新计算的代价都是比较高的。(Task中包含用户代码,所以此种结点down掉的概率会大于存储计算分离情况下的Shuffle结点。)
[9] 由于历史原因,大家在读Flink代码的时候看到不是Reader和Writer,而是ResultPartion/InputGate。这里用Reader和Writer是为了降低刚接触Flink的同学的理解门槛。
[10] 部署为什么也算共性,可以参考[7]。
[11] 虽然理论上批作业可以支持检查点,但是在批的场景下,开启原生的流式Checkpoint成本是比较高的的。当然这也不是完全排除未来可能会发现比较合适场景;
[12] VVR是Flink商业产品的执行引擎,由于时间规划的原因,这个功能暂时还没有回馈给Flink社区

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存