联系hashgameCONTACT hashgame
地址:广东省广州市
手机:13988889999
电话:020-88889999
邮箱:admin@qq.com
查看更多
Rhashgamehashgame
你的位置: 首页 > hashgame > hashgames

HASH GAME - Online Skill Game ET 300Flink批处理自适应执行计划优化

发布时间:2025-04-07 19:59:35  点击量:

  HASH GAME - Online Skill Game GET 300

HASH GAME - Online Skill Game GET 300Flink批处理自适应执行计划优化

  本文整理自阿里集团高级开发工程师孙夏在Flink Forward Asia 2024的分享,聚焦Flink自适应逻辑执行计划与Join算子优化。内容涵盖自适应批处理调度器、动态逻辑执行计划、自适应Broadcast Hash Join及Join倾斜优化等技术细节,并展望未来改进方向,如支持更多场景和智能优化策略。文章还介绍了Flink UI调整及性能优化措施,为批处理任务提供更高效、灵活的解决方案。

  首先,让我们先来简单回顾下目前Flink支持了哪些执行计划。左边是一段简单的用户 SQL,这段 SQL 从提交到运行一般会经历几个过程。首先 Flink Table Planner 会对它进行编译优化,生成一张 StreamGraph,即逻辑执行计划,这是最接近用户逻辑的 DAG 表达,它描述了作业的执行拓扑,以及各个算子的计算逻辑,包括算子间的连边方式。Flink 接下来会在 StreamGraph 的基础之上对它再进行一次优化,在这一步当中它会将并发相同且连边方式为Forward的算子chain到一起生成新的 JobVertex 节点,以避免额外的网络开销以及序列化、反序化开销,到此为止 StreamGraph 和 JobGraph 的生成过程都是在编译期确定的,接下 Flink Client 就会将 JobGraph 提交给 JobMaster。JobMaster 会根据节点并行度、数据传输方式等信息对JobGraph进行分布式展开,生成物理执行计划,即 ExecutionGraph。

  由于传统的批处理作业是遵循 All Edges Blocking 的方式进行调度的,意味着在下游节点在被调度之前,它的所有上游应该是已经产出的状态,Runtime就能够获取到上游完整的数据分布,以及数据分区的大小。目前 Flink 自适应批调度器已经可以根据这些运行时信息来自动推断下游节点的并发度,也能够在一定程度上支持均衡数据分发的能力,可以尽可能得让每一个并发 Task 都能够消费到相近的数据量。

  同时我们也保留了查看全局信息的能力,UI上会提供Show Pending Operators的开关,打开开关之后就可以把还在 Pending 状态的 StreamNodes 也展示到UI拓扑当中,帮助用户掌握全局的拓扑信息。此外,为了在 UI 上和已经生成的 JobVertex 节点区分,未被调度的StreamNodes 节点在视觉上会相对更小。同时它的图形以及连边会变成虚线、自适应 Join 节点优化

  第一部分是 Table Planner 阶段,在这一阶段的主要工作是将符合条件的Join算子识别并标记为 AdaptiveJoin 算子。Flink Table Planner 首先会解析SQL语句,生成一颗抽象语法数,产出Logical RelNode Tree。之后, Planner通过一系列Cost-Based或Rule-Based优化策略对它进一步优化,生成Physical RelNode Tree。接着,Physical RelNode Tree会被转换成一个ExecNode Graph,在这个图上,Flink会做一些更偏向于执行层的优化,如算子的串联优化(Multiple-Input)以及动态分区裁剪(Dynamic Partition Pruning)等,而AdaptiveJoin算子就是在这一阶段进行识别和替换的。之后,Table Planner会继续将图转化为一系列的Transformation,并最终转换为一张StreamGraph。其中有一点需要补充说明下,选择在ExecNode Graph阶段中引入Adaptive Join优化的原因是,该阶段逻辑执行计划已接近成型,在这一步进行算子注入不会破坏 Table层一些既有的约定。同时,在这一层我们可以规避掉一些不支持优化的场景:例如,出于收益和实现复杂度的考虑,现阶段我们暂时还不支持对Multiple-Input节点中的Join算子进行优化; 其次,对于存在相同Hash Key的连续Hash Shuffle场景,由于Forward、Hash Partitioner的转换可能会带来正确性和性能问题,在这种情况下我们也会避免替换Join算子。

  我们先看一下上图的案例:这是一个简单的带Join算子的作业,特殊之处在于它的右表在Join之前有一个Filter算子。如果用静态Broadcast Hash Join优化策略对它进行优化,我们只能通过统计信息得到右表的原始数据量,由于表的大小15MB大于Broadcast的阈值10MB,因此它不会在编译期被优化为Broadcast Hash Join,但是在实际运行的时候,经过Filter算子的过滤,它所在的stage产出的实际数据量只有5MB,又满足了Broadcast的阈值,但是在原有的架构下,由于逻辑拓扑无法被修改,因此它还是会按照SortMergeJoin的方式去执行。

  如果发现了AdaptiveJoin算子,那么它将通过2个条件来判断它是否能够被优化为Broadcast Hash Join。第一个条件是 检查产出的数据量大小是否小于Broadcast的阈值。第二个条件是判断当前输入端是否能够被Broadcast。对于不同的Join方式存在不同的约束条件,如果是Inner Join,那么它的任意一边都可以被Broadcast,而如果是Left Outer Join,那么只有它的右边可以被Broadcast,否则会产生数据正确性问题,比如可能会导致数据的重复。

  如果AdaptiveJoin算子两个条件都满足,那么优化器会对它采取优化手段。我们会重新确定Join的Input顺序,以及Hash Join的Build端和Probe端。 其次,输入端的数据分发方式也需要改变,需要将小表端的数据分发方式更改为Broadcast Partitioner,大表端更改为Forward Partitioner。 最后,我们会在Runtime动态地生成OperatorFactory,确定最终的Join逻辑。

  不过上述方式生成的Broadcast Join在性能上相比于普通的 Broadcast Join 是有回退的,主要在于两方面:(1)普通 Broadcast Join 在编译阶段可以把大表端和 Join节点chain在一起的,可以避免一次完整的网络shuffle,而Adaptive Broadcast Join在被转换时它的上游可能已经开始执行或已经结束,因此我们无法将两者chain在一起。(2)对于小表侧的数据传输方式,普通Broadcast Join小表侧产出的数据分区是完整的Broadcast分区。而Adaptive Broadcast Join的小表侧是由Hash Partitioner转换而来的,这就会导致下游在读取数据分区的时候,会有多次的小文件的读取开销,这也会造成性能的回退。

  第一种关联关系为IntraInputCorrelation,即内部关联,它表示一个 Input 内的单个 Key Group 的数据必须在同一个下游并发实例中进行处理,也就是一个 Input 的 Key Group 是不可拆分的。具体到 Join 场景下,存在 IntraInputCorrelation 约束的输入端数据的 Key Group 是不可拆分的,如Left Outer Join的右表端。

  第二种关联关系是 InterInputCorrelation,即外部关联。具体的含义是多个 Input 之间如果存在此关联关系,它们的相同 Key Group 的数据需要在同一个下游并发实例中进行处理。因此在对其中某一个 Input 的 Key Group 数据做拆分时,其对应的另一端的 Key Group 数据必须要进行复制,以保证正确性语义。对于 Join场景,由于其必定存在2个上游,因此外部关联始终为True。

  第二类情况是Left Outer Join / Right Outer Join,即输入的上游中某一端可以被拆分。上图的Case中我们可以发现上侧的输入可以被拆分,因此我们就可以在 Key Group 的划分阶段将存在数据倾斜的紫色分区进行切分,而另一端的数据保持原样。最后,对两个数据分区做笛卡尔积,就可以把原来一组存在数据倾斜的 Key Group 生成两组数据更均匀的 Key Group,分别发往不同的下游。

  新的数据划分算法除了对 ALL-To-ALL 的数据传输方式做了优化,在 Point-Wise 场景也进行了优化。原有的 Point-Wise 划分算法,只能将数据分区划分为一组连续的 Partitions,或者是同一Partition内连续的 Subpartitions,上图 Case 中展示了 Rescale的数据传输方式,它上游有三个 Partition,下游有两个并发实例,由于存在奇数比例的分配,所以下游的并发实例当中,其中一个实例势必会比另一个并发实例多消费一份分区的数据,这样就会引入天然的不平等,导致数据的倾斜。

  支持更多的场景,例如 Multiple-Input 的场景。 目前Flink在Table阶段会执行算子串联优化,将多个输入的节点进行合并优化生成新的Multiple-Input算子,来避免额外的Shuffle开销,不过由于Multiple-Input的优化在ExecNode Graph阶段就已经定型了,在StreamGraph中无法再对Multiple-Input中的算子进行拆解,相对应的解决方式是将该优化像Operator Chain一样也延迟到Runtime来执行,以此覆盖更多的Join场景。

  最后是提供更智能的自适应优化策略,例如:对于自适应 Broadcast Hash Join,由于目前的实现仍然会引入大表侧和Join节点的网络传输开销,因此一种可能的优化方式是我们在统计信息完备的前提下,能够提前预判小表,然后在运行时 Pending 大表的执行,等小表执行完再做决策,这时候由于大表还未被调度,因此我们在Runtime阶段也可以将大表侧和Join节点Chain到一起,来减少一次Shuffle。

  本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。

  【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。

  实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

  实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

  本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。

【返回列表页】

顶部

地址:广东省广州市  电话:020-88889999 手机:13988889999
Copyright © 2018-2025 哈希游戏(hash game)官方网站 版权所有 非商用版本 ICP备案编: