Apache Hudi 0.14.0 现已发布。公告指出,该版本标志着一个重要的里程碑,具有一系列新功能和增强功能。其中包括引入Record Level Index、自动生成记录键 、用于增量读取的 hudi_table_changes
函数等等。
值得注意的是,此版本还包含对 Spark 3.4 的支持。在 Flink 方面,0.14.0 版本带来了一些令人兴奋的功能,例如一致哈希索引支持、支持Flink 1.17 以及支持更新和删除语句。此外此版本还升级了Hudi表版本,提示用户查阅下面提供的迁移指南。我们鼓励用户在采用 0.14.0 版本之前查看重大特性、重大变化和行为变更。
迁移指南
在 0.14.0 版本进行了一些更改,例如从 ".aux" 文件夹中删除压缩计划以及引入新的日志块版本。作为此版本的一部分,表版本更新到版本 6。在具有旧表版本的表上运行版本 0.14.0 的 Hudi 作业时,会触发自动升级过程以将表升级到版本 6。升级对于每个 Hudi 表都是一次性的,因为升级完成后 hoodie.table.version
会在属性文件中更新。此外还包括用于降级的命令行工具,允许用户从表版本 6 降级到 5,或从 Hudi 0.14.0 恢复到 0.14.0 之前的版本。请从 0.14.0 环境使用此工具。有关更多详细信息,请参阅 hudi-cli。
注意:如果从旧版本(0.14.0 之前)迁移,建议按顺序检查每个旧版本的升级说明。
Bundle包更新
新 Spark Bundle包
在此版本扩展了支持范围,包括 Spark 3.4 (hudi-spark3.4-bundle_2.12) 和 Spark 3.0 (hudi-spark3.0-bundle_2.12) 的Bundle包。请注意,在 Hudi 0.10.1 版本之后,对 Spark 3.0 的支持已停止,但由于社区的强烈兴趣,在此版本中恢复了对 Spark 3.0 的支持。
重大变化
Spark SQL INSERT INTO 行为
在 0.14.0 版本之前,Spark SQL 中通过 INSERT INTO 摄取的数据遵循 upsert 流程,其中多个版本的记录将合并为一个版本。但是从 0.14.0 开始更改了 INSERT INTO 的默认行为,默认行为更改为insert
流。此更改显着提高了写入性能,因为它绕过了索引查找。
如果使用 preCombine 键创建表,则 INSERT INTO 的默认操作仍为 upsert
。相反如果没有设置preCombine 键,则INSERT INTO的底层写操作默认为 insert
。用户可以根据自己的要求显式设置配置 hoodie.spark.sql.insert.into.operation
的值来灵活地覆盖此行为。此配置的可能值包括 insert
、bulk_insert
和 upsert
。
此外在 0.14.0 版本中弃用了两个相关的旧配置
-
hoodie.sql.insert.mode
-
hoodie.sql.bulk.insert.enable
行为变更
使用 Spark SQL Inserts简化重复处理
如果操作类型配置为 Spark SQL INSERT INTO 流的插入,用户现在可以选择使用配置设置 hoodie.datasource.insert.dup.policy 强制执行重复策略。此策略确定当正在摄取的传入记录已存在于存储中时采取的操作。此配置的可用值如下:
-
none:不采取任何特定操作,如果传入记录包含重复项,则允许 Hudi 表中存在重复项。
-
drop:传入写入中的匹配记录将被删除,其余记录将被摄取。
-
fail:如果重新摄取相同的记录,写入操作将失败。本质上由键生成策略确定的给定记录只能被摄取到目标表中一次。
通过添加此配置,旧的相关配置 hoodie.datasource.write.insert.drop.duplicates
现已弃用。当两者都指定时,新配置将优先于旧配置。如果未提供特定配置,则将采用较新配置的默认值。强烈鼓励用户迁移到使用这些较新的配置。
MOR 表Compaction
对于 Spark 批写入器(Spark Datasource和 Spark SQL),默认情况下会自动为 MOR(读取时合并)表启用压缩,除非用户显式覆盖此行为。用户可以选择通过将 hoodie.compact.inline
设置为 false
显式禁用压缩。如果用户不覆盖此配置,大约每 5 个增量提交(hoodie.compact.inline.max.delta.commits
的默认值)会触发 MOR 表的压缩。
HoodieDeltaStreamer 更名为 HoodieStreamer
从版本 0.14.0 开始将 HoodieDeltaStreamer 重命名为 HoodieStreamer。同时确保了向后兼容性,以便现有的用户作业不受影响。但是在即将发布的版本中可能会停止对 Deltastreamer 的支持。因此强烈建议用户改用 HoodieStreamer。
MERGE INTO JOIN CONDITION
从0.14.0版本开始,当用户没有提供明确的规范时,Hudi能够自动生成主记录键。此增强功能使 MERGE INTO JOIN
子句能够引用 Hudi 表中连接条件的任何数据列,其中主键由 Hudi 本身生成。但是在用户配置主记录键的情况下,连接条件仍然需要用户指定的主键字段。
重大特性
Record Level Index
Hudi 版本 0.14.0 引入了新的索引实现 - Record Level Index。记录级索引通过有效存储每条记录的位置并在索引查找操作期间实现快速检索,显着增强了大型表的写入性能。它可以有效地替代 Hudi 中常用的其他全局索引,例如 Global_bloom、Global_Simple 或 Hbase。
由于在查找过程中从各种数据文件收集索引数据的成本很高,布隆索引和简单索引对于大型数据集表现出较低的性能。而且,这些索引不保留一对一的记录键来记录文件路径映射;相反,他们在查找时通过优化搜索来推断映射。这些索引所需的每个文件的开销使得它们对于具有大量文件或记录的数据集效率较低。
另一方面,Hbase 索引为每个记录键保存一对一的映射,从而实现随数据集大小扩展的快速性能。然而,它需要一个单独的 HBase 集群来进行维护,这在操作上具有挑战性且资源密集型,需要专门的专业知识。
记录索引结合了 HBase 索引的速度和可扩展性,而没有其限制和开销。作为 HUDI 元数据表的一部分,未来写入和查询方面的任何性能增强都将自动转化为记录索引性能的改进。采用记录级索引有可能将索引查找性能提高 4 到 10 倍,具体取决于工作负载,即使对于超大规模数据集(例如 1TB)也是如此。
通过记录级别索引,可以观察到大型数据集的显着性能改进,因为延迟与摄取的数据量成正比。这与其他全局索引形成鲜明对比,其中索引查找时间随着表大小线性增加。记录级索引专门设计用于有效处理此类大规模数据的查找,而查找时间不会随着表大小的增长而线性增加。
为了利用这种快速的索引的优势,用户需要启用两种配置:
-
必须启用
hoodie.metadata.record.index.enable
才能将记录级别索引写入元数据表。 -
hoodie.index.type
需要设置为RECORD_INDEX
以便索引查找利用记录级别索引。
支持 Hudi 表自动生成键
从Hudi最初的正式版本开始,主键是用户需要为任何Hudi表配置的必填字段。从 0.14.0 开始,我们放宽了这一限制。此增强功能解决了社区内的长期需求,其中某些用例不具有主键。版本 0.14.0 现在为用户提供了创建 Hudi 表的灵活性,而无需显式配置主键(通过省略配置设置 - hoodie.datasource.write.recordkey.field
)。这种情况下Hudi 将自动生成主键。此功能仅适用于新表,不能更改现有表。
所有 Spark 写入器都提供此功能,但有一定限制。对于仅追加类型的用例,如下四个写入器都允许插入和批量插入 - Spark Datasource、Spark SQL、Spark Streaming、Hoodie Streamer。仅使用 Spark SQL MERGE INTO 、 UPDATE 和 DELETE 语句支持更新和删除。对于 Spark Datasource,仅当DataFrame包含 Hudi 的元字段时才支持 UPDATE 和 DELETE。请查看快速入门指南,了解有关自动生成键的 Hudi 表 CRUD 操作的代码片段。
Spark 3.4版本支持
添加Spark 3.4支持, Spark 3.4 的用户可以使用 hudi-spark3.4-bundle。Spark 3.2、Spark 3.1、Spark3.0 和 Spark 2.4 将继续受支持。请检查迁移指南以获取Bundle包更新。可以浏览快速入门指南快速开始使用 Hudi 和 Spark 3.4。
查询端改进
Athena 的元数据表支持
用户现在可以与 Athena 无缝地利用 Hudi 的元数据表。文件列表索引通过从维护分区到文件映射的索引检索信息,消除了对递归文件系统调用(如“列表文件”)的需要。事实证明这种方法非常高效,尤其是在处理大量数据集时。使用 Hudi 0.14.0,用户可以在为其 Hudi 表执行 Glue 目录同步时激活基于元数据表的文件列表。要启用此功能,用户可以配置 hoodie.datasource.meta.sync.glue.metadata_file_listing
并在 Glue 同步过程中将其设置为 true。
查询利用 Parquet 布隆过滤器
在 Hudi 0.14.0 中,用户现在可以使用原生 Parquet 布隆过滤器,前提是他们的计算引擎支持 Apache Parquet 1.12.0 或更高版本。这种支持涵盖了数据集的写入和读取。Hudi 通过 Hadoop 配置方便使用原生 Parquet 布隆过滤器。用户需要使用代表要应用布隆过滤器的列的特定键来设置 Hadoop 配置。例如, parquet.bloom.filter.enabled#rider=true
为 rider 列创建布隆过滤器。每当查询涉及 rider 列上的谓词时,布隆过滤器就会发挥作用,从而增强读取性能。
多写入器的增量查询
在多写入器场景中,由于并发写入活动,时间线中可能会出现间隙(requested或inflight时刻不是最新时刻)。在执行增量查询时,这些间隙可能会导致结果不一致。为了解决这个问题,Hudi 0.14.0 引入了一个新的配置设置 hoodie.read.timeline.holes.resolution.policy
,专门用于处理增量查询中的这些不一致问题。该配置提供了三种可能的策略:
-
FAIL:这是默认策略,当增量查询期间发现此类时间线间隙时,会引发异常。
-
BLOCK:在此策略中,增量查询的结果仅限于时间线中空洞之间的时间范围。例如,如果在 t0 到 t2 的增量查询范围内,在 t1 时刻检测到间隙,则查询将仅显示 t0 到 t1 之间的结果,而不会失败。
-
USE_TRANSITION_TIME:此策略是实验性的,涉及在增量查询期间使用状态转换时间,该时间基于时间线中提交元数据文件的文件修改时间。
Hive 3.x 的Timestamp类型支持
相当长一段时间以来,Hudi 用户在读取 Spark 的 Timestamp 类型列以及随后尝试使用 Hive 3.x 读取它们时遇到了挑战。在 Hudi 0.13.x 中,我们引入了一种解决方法来缓解此问题,0.14.0 版本现在确保 HiveAvroSerializer 与 Hive 3.x 完全兼容以解决此问题。
Google BigQuery 同步增强功能
在 0.14.0 中,BigQuerySyncTool 支持使用清单将表同步到 BigQuery。与传统方式相比,这预计将具有更好的查询性能。模式演进由清单方法支持。由于新的 schema 处理改进,不再需要从文件中删除分区列。要启用此功能,用户可以将 hoodie.gcp.bigquery.sync.use_bq_manifest_file
设置为 true
。
Spark 读取端改进
MOR Bootstrap 表的快照读取支持
在 0.14.0 中,为引导表添加了 MOR 快照读取支持。默认行为已通过多种方式进行了更改,以匹配非引导 MOR 表的行为。快照读取现在将成为默认读取模式。使用 hoodie.datasource.query.type=read_optimized
进行读取优化查询,这是以前的默认行为。此类表的 Hive 同步将导致表名带有 _ro 和 _rt 后缀,分别表示读取优化和快照读取。
用于增量读取的表值函数 hudi_table_changes
Hudi 已经提供了使用增量查询类型获取自给定提交时间戳以来更改的记录流的功能。在 Hudi 0.14.0 中,我们添加了一种新的、更简单的方法,使用名为 hudi_table_changes 的表值函数来获取 Hudi 数据集的最新状态或更改流。以下是有关如何使用此函数的语法和一些示例。
SYNTAX
hudi_table_changes(table, queryType, beginTime [, endTime]);
-- table: table identifier, example: db.tableName, tableName, or path for the table, example: hdfs://path/to/hudiTable.
-- queryType: incremental query mode, valid values: latest_state, cdc
(for cdc query, first enable cdc for the table by setting cdc.enabled=true),
-- beginTime: instantTime to begin query from, example: earliest, 202305150000,
-- endTime: optional instantTime to end query at, example: 202305160000,
EXAMPLES
-- incrementally query data by table name
-- start from earliest available commit, end at latest available commit.
SELECT * FROM hudi_table_changes('db.table', 'latest_state', 'earliest');
-- start from earliest, end at 202305160000.
SELECT * FROM hudi_table_changes('table', 'latest_state', 'earliest', '202305160000');
-- incrementally query data by path
-- start from earliest available commit, end at latest available commit.
SELECT * FROM hudi_table_changes('path/to/table', 'cdc', 'earliest');
查看快速入门以获取更多示例。
Spark 中新的 MOR 文件格式读取器
基于 RFC-72 旨在重新设计 Hudi-Spark 集成的提案,我们引入了用于 MOR(读取合并)表的实验性文件格式读取器。与旧文件格式相比,该读取器预计可将读取延迟显着降低 20% 至 40%,特别是对于快照和引导查询。目标是使延迟更接近 COW(写入时复制)文件格式的延迟。要利用这种新文件格式,用户需要设置 hoodie.datasource.read.use.new.parquet.file.format=true
。值得注意的是,此功能仍处于实验阶段,并且存在一些限制。有关更多详细信息以及有兴趣做出贡献,请参阅 HUDI-6568。
Spark 写入端改进
Bulk_Insert 和行写入器增强
0.14.0 版本支持在执行 INSERT OVERWRITE TABLE 和 INSERT OVERWRITE PARTITION 等 SQL 操作时使用批量插入操作。要启用批量插入,请将配置 hoodie.spark.sql.insert.into.operation
设置为值bulk_insert
。与插入操作相比,批量插入具有更好的写入性能。另外简单存储桶索引也支持了行写入器。
Hoodie DeltaStreamer增强
动态配置更新
当 Hoodie Streamer 以连续模式运行时,可以在每次同步调用之前刷新/更新属性。有兴趣的用户可以实现 org.apache.hudi.utilities.deltastreamer.ConfigurationHotUpdateStrategy
来利用它。
HoodieStreamer 基于 SQL 文件的源
HoodieStreamer 中添加了一个新源 - SqlFileBasedSource,旨在促进一次性回填场景。
Flink 增强功能
以下是 0.14.0 版本中基于 Flink Engine 的增强功能。
一致的哈希索引支持
与静态哈希索引(BUCKET索引)相比,一致性哈希索引为写入者提供了数据桶的动态可扩展性。要利用此功能,请将选项index.type
配置为BUCKET并将hoodie.index.bucket.engine
设置为CONSISTENT_HASHING
。
启用一致性哈希索引时,在写入器中激活异步 Clustering 调度非常重要。Clustering计划应通过离线作业执行。在此过程中,写入器将在Clustering Pending时对新旧数据桶执行双重写入。虽然双写不会影响正确性,但强烈建议尽快执行Clustering。
用于流式读取的动态分区修剪
在 0.14.0 之前,当查询具有恒定日期时间过滤的谓词时,Flink 流式读取器无法正确修剪日期时间分区。自此版本以来,Flink 流式查询已得到修复,以支持任何过滤谓词模式,包括但不限于日期时间过滤。
简单桶索引表查询加速(带索引字段)
对于一个简单的桶索引表,如果查询对索引键字段采用等式过滤谓词,Flink引擎会优化规划,只包含来自非常特定数据桶的源数据文件;此类查询预计平均性能将提高近 hoodie.bucket.index.num.buckets
倍。
Flink 1.17 支持
Flink 1.17 支持新的编译 maven 配置文件 flink1.17,在 Flink Hudi Bundle包 jar 的编译 cmd 中添加配置文件 -Pflink1.17 以启用与 Flink 1.17 的集成。
Flink 更新删除语句
自此版本以来,UPDATE 和 DELETE 语句已集成用于批量查询。当前只有定义主键的表可以正确处理该语句。
UPDATE hudi_table SET ... WHERE ...
DELETE FROM hudi_table WHERE ...
EXAMPLES
-- update the specific records with constant age
UPDATE hudi_table SET age=19 WHERE UUID in ('id1', 'id2');
-- delete all the records that with age greater than 23
DELETE FROM hudi_table WHERE age > 23;
Java 增强功能
Java 引擎已扩展支持许多写操作,使其与其他引擎保持一致。例如 Java Engine 0.14.0 中添加了压缩、Clustering和元数据表支持。
已知回退
在Hudi 0.14.0中,当查询使用ComplexKeyGenerator或CustomKeyGenerator的表时,分区值以字符串形式返回。请注意,存储上没有类型更改,即分区字段以存储上的用户定义类型写入。这对于上述键生成器来说是一个重大变化,将在 0.14.1 中修复。