type
status
date
slug
summary
tags
category
icon
password
一、数据表设计规范
1、表模型设计规范
(1)Hudi 表必须设置合理的主键
Hudi 表提供了数据更新的能力和幂等写入的能力,该能力要求数据记录必须设置主键用来识别重复数据和更新操作,主键设置不合理会导致数据重复。主键不能有 null 值和空值,可以参考如下三种方式的示例设置主键:
(2)Hudi 表必须配置 Precombine 字段
在数据同步过程中不可避免会出现重复主键数据再次写入的问题,例如:异常数据恢复、写入程序异常重启等,都会导致数据写入出现乱序。通过设置合理 precombine 字段值可以保证数据的准确性,老数据不会覆盖新数据,也就是幂等写入能力。该字段可用选择的类型包括:业务表中更新时间戳、数据库的提交时间戳等。precombine 字段不能有 null 值和空值,可以参考一下示例设置 precombine 字段:
(3)流式计算采用 MOR 表
流式计算是为低时延的实时计算能力,需要高性能的流式读写能力,在 Hudi 表中存在的 MOR 和 COW 两种模型中,MOR 表的流式读写性能最好,因此在流式计算场景下采用 MOR 表模型。关于 MOR 表在读写性能的对比关系如下:
对比维度 | MOR表 | COW表 |
流式写 | 高 | 低 |
流式读 | 高 | 低 |
批量写 | 高 | 低 |
批量读 | 低 | 高 |
(4)实时入湖,表模型采用 MOR 表
实时入湖一般的性能要求都是分钟内或者分钟级,再基于 Hudi 模型表的对比,因此在实时入湖场景中需要选择 MOR 模型
(5)Hudi 表名以及列明采用小写字母
在跨引擎读写同一张 Hudi 表,为了规避引擎之间大小写的支持的不同,统一采用小写字母
(6)批量计算中每批次写入数据量与表总量差距不大,采用COW表
COW 表模型中,写入数据存在写放大问题,会将历史数据和新增数据合并重新写入新文件。如果写入的数据量与表总量差距不大,也就不存在写放大的问题了,而且批量计算对时延不是很敏感,因此可以采用 COW 表。
(7)Hudi 表的写任务要配置向 hive 元数据服务同步
如果通过 SparkSql 进行先建表再 Insert 数据,表的元数据天然会存在于 Hive 元数据服务中。该条建议针对的是通过 spark datasource API / flink 写 Hudi 表,通过这两种方式写数据时需要增加向 Hive 元数据服务的配置项。目的是元数据统一托管到 Hive 元数据服务中,为后续的跨引擎操作数据以及数据管理提供便利。
2、 索引设计规范
(1)(1)不要修改表索引类型
Hudi 表的索引决定数据存储方式,随意修改索引类型会导致表中已有存量数据与新增数据之间出现数据重复和准确性问题。常见的索引类型如下:
- 布隆索引:Spark 引擎独有索引,采用 bloomfiter 机制,将布隆索引内容写入到 Parquet 文件的 footer 中
- Bucket 索引:在写入数据过程中,通过主键进行 Hash 计算,将数据进行分组,每个组的主键集合是相对固定的
- 状态索引:Flink 特有索引,将行记录的存储位置记录到状态后端,在作业冷启动过程中会遍历所有数据存储文件生成索引信息
(2)采用 Flink_state 索引,flink 写入后,不支持 Spark 继续写入
Flink 在写 Hudi 的 MOR 表时先写入 Log 文件,然后通过 compaction 操作将 log 文件转为 parquet 文件。在 log 文件转 parquet 文件过程中会生成布隆索引信息,Spark 写入是依据布隆索引保证数据唯一性的。因此,当 Flink 已经采用 flink_state 索引写入数据,log 文件中就会存在未转为 parquet 文件的数据,这部分数据没有布隆索引,此时使用 Spark 写入会导致重复数据产生。在批量初始化阶段,先用 Spark 批量写入 Hudi 表,再用 Flink 基于 Flink_state 索引写入不会有问题,原因是 Flink 冷启动时会遍历所有数据文件生成状态索引。
(3)实时入湖场景中,Spark 引擎采用 Bucket 索引,Flink 可以用 Bucket 或者 Flink_state 索引
实时入湖需要分钟内或分钟级的高性能入湖,索引选择会影响写 Hudi 表的性能。各索引在性能方面的区别如下:
- Bucket 索引
优点:写入过程中对主键进行 hash 计算然后分组,性能高,不受表数据量限制。Flink 和 Spark 引擎都支持,可以实现交叉混写同一张表。
缺点:Bucket 个数不能动态调整,数据量波动和整表数据量持续上涨会导致单个 Bucket 数据量过大,出现大数据文件。需要结合分区表来进行平衡改善。
- Flink_state 索引
优点:主键的存储信息存在状态后端,新增数据定位写入位置性能高。不会因流量波动出现大数据文件。
缺点:该索引为 Flink 特有索引。表的总数据行数达到数亿级别时,需要优化状态后端参数来保持写入性能。使用该索引无法支持 Flink 和 Spark 交叉混写。
(4)对于数据总量持续上涨的表,采用 Bucket 索引,必须使用时间分区表,分区键采用数据创建时间
如之前规则所描述,Bucket 索引在建表时需要设定 bucket 个数,当整表数据量持续上涨会导致单个 Bucket 数据量过大,例如:单个 bucket 桶的 Parquet 文件超过 256MB。针对这类表,需要结合日期分区表使用。可以按照天、月、年等不同粒度创建分区表,因为不同日期的数据量差异相对可控,可以按照预计的分区最大数据量来设置分区内的 Bucket 个数。
分区粒度可以按照"写入桶数最小"原则设计。例如:按天设置分区,每个分区下有 5 个 Bucket,每天写入的数据中有大量更新,更新数据范围是近一个月,这样最大可能写入的桶数是 30*5 = 150 个桶。如果按月分区,单个月的分桶个数会小于 150,原因是我们会根据日期波动设置一定的余量,导致 sum(日分区桶数) > 月分区桶数
(5)基于 Flink 的流式写入表,当数据量超过 2 亿条记录时应采用 Bucket 索引;2 亿条以内可采用 Flink_state 索引
根据 Flink_state 索引的特点,当数据记录数超过一定量后,状态数据会变得相当大,需要优化状态后端配置参数。此外,冷启动时需要遍历全表数据,导致启动速度变慢。从简化使用角度考虑,对于大数据量表,可以采用 Bucket 索引来减少优化参数配置的复杂度。
如果 Bucket 索引配合分区表仍无法解决 Bucket 桶过大的问题,可以继续使用 Flink_state 索引,但需按照规范优化相应配置参数。
(6)基于 Bucket 索引的表,按照单个 Bucket 2GB 数据量进行设计
为了避免单个 Bucket 过大,建议单个 Bucket 的数据量不超过 2GB(该 2GB 指数据内容大小,而非数据行数或 parquet 文件大小)。目的是将对应桶的 Parquet 文件大小控制在 256MB 范围内,从而平衡读写内存消耗和 HDFS 存储利用率。这个 2GB 限制是一个经验值,因为不同业务数据经过列存压缩后大小各异。以下解释为何建议使用 2GB:
- 2GB 数据存储为列存 Parquet 文件后,数据文件大小约为 150MB~256MB。不同业务数据会有差异。而 HDFS 单个数据块通常为 128MB,这样可以有效利用存储空间。
- 数据读写占用的内存空间等同于原始数据大小(包括空值也会占用内存),2GB 在大数据计算过程中通常是单个 task 配置可接受的范围。若单个 Bucket 的数据量超过此范围,可能带来以下影响:
- 读写任务可能会出现 OOM 问题,解决方法是提升单个 task 的内存占比。
- 写性能下降,因为单个 task 处理的数据量变大,导致处理耗时增加。
3、分区设计规范
(1)分区键不可以被更新
Hudi 具有主键唯一性保证机制,在分区表的场景下通常采用分区内唯一性保证,因此如果分区键的值发生变更,会导致相同主键的行记录出现多条的情况。以日期字段为分区键时,应采用数据的创建时间作为分区字段,切记不要使用数据更新时间
(2)事实表采用日期分区表,维度表采用非分区或大颗粒度的日期分区表
从表的使用属性看,事实表和维度表具有以下特点:
- 事实表:数据总量大,增量大,数据读取多以日期做切分,读取一定时间段的数据。
- 维度表:总量相对小,增量小,多以更新操作为主,数据读取会是全表读取,或者按照对应业务 ID 过滤。
基于以上考虑,维度表采用天分区会导致文件数过多,而且是全表读取,会导致所需的文件读取 Task 过多。采用大颗粒度的日期分区(如年分区)可以有效降低分区个数和文件数量;对于增量不大的维度表,也可以采用非分区表。如果维度表的总数据量很大或增量也很大,可以考虑采用某个业务 ID 进行分区。在大部分数据处理逻辑中,针对大维度表会有一定的业务条件进行过滤来提升处理性能,这类表要结合特定业务场景进行优化,无法仅通过日期分区优化。事实表读取方式通常按照时间段切分(近一年、近一个月或近一天),读取的文件数相对稳定可控,所以事实表优先考虑日期分区表
(3)分区采用日期字段,分区表粒度,要基于数据更新范围确定,不要过大也不要过小
分区粒度可以采用年、月、日,分区粒度的目标是减少同时写入的文件桶数,尤其是在:在有数据量更新,且更新数据有一定时间范围规律的,比如:近一个月的数据更新占比最大,可以按照月份创建分区;近一天内的数据更新占比大,可以按照天进行分区。采用 Bucket 索引,写入是通过主键 Hash 打散的,会均匀的打散到分区下每个桶。因为各个分区的数据量是会有波动的,分区下桶的个数设计一般会按照最大分区数据量计算,这样会出现越细粒度的分区,桶的个数会冗余越多。举个例子:采用天级分区,平均的日增数据量是 3GB,最多一天的日志是 8GB,这个会采用Bucket 桶数= 8GB/2GB = 4 来创建表;每天的更新数据占比较高,且主要分散到近一个月。这样会导致结果是,每天的数据会写入到全月的 Bucket 桶中,那就是 4*30 =120 个桶。如果采用月分区,分区桶的个数= 3GB * 30 /2GB = 45 个桶 ,这样写入的数据桶数减少到了 45 个桶。在有限的计算资源下,写入的桶数越少,性能越高。
二、 数据表管理操作规范
1、 数据表 Compaction 规范
mor 表更新数据以行存 log 的形式写入,log 读取时需要按主键合并,并且是行存的,导致 log 读取效率比 parquet 低很多。为了解决 log 读取的性能问题,hudi 通过compaction 将 log 压缩成 parquet 文件,大幅提升读取性能。
(1)(1)持续写入数据的表,每24小时至少执行一次compaction
对于MOR表,无论是流式写入还是批量写入,都需要确保每天至少完成一次Compaction操作。如果长时间不通过Compaction操作维护MOR表,将面临一个Parquet文件需要和多个Log文件合并的情况(通常一个Log文件默认最大为1GB,且有一定压缩率,这导致1GB的Log文件加载到内存中解压后至少需要5G~6G内存),这会带来以下问题:
- 读MOR表非常耗时,可能导致读作业OOM。
- 长时间积累后进行一次Compaction需要消耗大量资源,否则容易OOM。
- 阻塞Clean操作,如果没有Compaction操作产生新版本的Parquet文件,旧版本文件就无法被Clean清理,增加存储压力。
(2)CPU与内存比例为1:4~1:8
Compaction作业将存量parquet文件内的数据与新增log中的数据合并,需要消耗较高的内存资源。根据表设计规范和实际流量波动,将CPU与内存比例设置为1:4~1:8,可确保compaction作业稳定运行。当compaction出现OOM问题时,可通过增加该比例解决。
(3)通过增加并发数提升compaction性能
合理的CPU和内存比例配置能确保compaction作业稳定运行,保证单个compaction task稳定执行。但compaction整体运行时长取决于处理文件数和分配的CPU核数(并发能力),因此可以通过增加compaction作业的CPU核数来提升性能(注意增加CPU时也要保持合适的CPU与内存比例)。
(4)hudi表采用异步compaction
为确保流式入库作业稳定运行,需要保证流式作业不在实时入库过程中执行其它任务,例如flink写hudi的同时完成Compaction。这看似是个不错的方案,既完成入库又完成Compaction,能大幅缩短下游读作业的延迟。但Compaction操作非常消耗内存和IO,会给流式入库作业带来以下影响:
- 增加端到端延迟:Compaction会放大写入延迟,因为Compaction比入库更耗时。
- 降低作业稳定性:Compaction会增加入库作业的不稳定因素,如果Compaction失败,流式入库作业也会失败。
(5)建议每2~4小时进行一次compaction
Compaction是MOR表非常重要且必须执行的维护手段。对于实时任务,Compaction执行过程必须与实时任务解耦,需通过周期性调度Spark任务完成Compaction。关键在于如何合理设置调度周期:周期太短可能导致Spark任务空跑,周期太长则可能积压过多Compaction Plan,导致Spark任务耗时长且下游读作业延迟高。针对此场景,建议:
- 如果对异步Compaction任务的执行时长和资源消耗没有限制,可每2小时或4小时调度一次,这是基本的MOR表维护方案,但非最优方案。
- 最优调度方案:在下次新的Compaction Plan产生前,确保调度任务能执行完当前已有的Compaction Plan。
(6)采用异步Spark执行compaction,不使用Flink进行Compaction
推荐的Flink写hudi方案是:Flink仅负责写数据和schedule compaction,由Spark离线完成run compaction、clean和archive操作。其中schedule compaction用于记录Hudi表中哪些log文件和parquet文件需要合并,这个过程会在Hudi表的.hoodie目录下生成一个compaction.request文件,记录这些需要合并的文件。run compaction则负责实际将compaction.request文件中记录的log文件和parquet文件合并成新的parquet文件。具体实施步骤如下:
(7)Spark 资源配置 excutor_cores*executor_num = sum(FG)
针对 Hash 索引的表,FG 的个数就是合并设计的 Hash 桶的个数
(8)异步 Compaction 可以将多个表串行到一个作业,资源配置相近的表放到一组,该组作业的资源配置为最大消耗资源的表所需的资源。
对于异步 Compaction 任务,这里给出以下开发建议:
- 不需要对每张 Hudi 表都开发异步 Compaction 任务,这样会导致作业开发成本 高,集群作业爆炸,集群资源不能有效的利用和释放。
- 异步 Compaction 任务可以通过执行 Spark Sql 来完成,多个 Hudi 表的Compaction、Clean 和 Archive 可以放在同一个任务来执行,比如对 table1 和table2 用同一个任务来执行异步维护操作:
2、2、数据表 Clean 规范
Clean 是 Hudi 表的重要维护操作之一,对 MOR 表和 COW 表都必须执行。Clean 操作通过删除旧版本文件(Hudi 不再使用的数据文件)来节省 Hudi 表 List 过程的时间,并缓解存储压力。
(1)Hudi 表必须执行 Clean
对于 Hudi 的 MOR 类型和 COW 类型的表,都需要开启 Clean。
- Hudi 表在写入数据时会自动判断是否需要执行 Clean,因为 Clean 的开关默认打开(hoodie.clean.automatic 默认为 true)。
- Clean 操作并不是每次写数据时都会触发,至少需要满足两个条件: 1、Hudi 表中需要有旧版本的文件。COW 表只要数据被更新过就一定存在旧版本文件;MOR 表则需要数据被更新过且做过 Compaction 才会有旧版本文件。 2、Hudi 表满足 hoodie.cleaner.commits.retained 设置的阈值。如果是 Flink 写入,提交的 checkpoint 需要超过这个阈值;如果是批量写入,提交的批次需要超过这个阈值。
(2)MOR 表下游采用批量读模式,Clean 版本数应为 Compaction 版本数+1
MOR 表必须确保 Compaction Plan 能够被成功执行。Compaction Plan 只是记录了需要合并的 Log 文件和 Parquet 文件,关键是要保证执行时这些文件都存在。由于只有 Clean 操作会清理文件,因此建议 Clean 的触发阈值(hoodie.cleaner.commits.retained 的值)至少要大于 Compaction 的触发阈值(对于 Flink 任务即 compaction.delta_commits 的值)。
(3)MOR 表下游采用流式计算,历史版本保留小时级
当 MOR 表的下游是流式计算(如 Flink 流读)时,应按业务需求保留小时级的历史版本。这样近几小时的增量数据可通过 log 文件读取。保留时长过短会导致下游 Flink 作业在重启或阻塞时,上游增量数据已被 Clean,只能从 parquet 文件读取增量数据,降低性能;保留时间过长则会造成 log 中历史数据冗余存储。建议按以下公式计算保留 2 小时历史版本:版本数 = 3600*2/版本间隔时间,其中版本间隔时间来自 Flink 作业的 checkpoint 周期或上游批量写入周期。
(4)COW 表如无特殊要求,保留版本数设置为 1
COW 表的每个版本都包含表的全量数据,保留的版本数越多,冗余数据越多。因此,若业务无历史数据回溯需求,建议将保留版本数设置为 1,即只保留当前最新版本。
(5)Clean 作业每天至少执行一次,建议每 2~4 小时执行一次
Hudi 的 MOR 表和 COW 表都需要保证每天至少执行一次 Clean。MOR 表的 Clean 可以与 Compaction 一起异步执行,而 COW 表的 Clean 可在写数据时自动判断是否执行。
3、 数据表 Archive 规范
Archive(归档)是为了减轻 Hudi 读写元数据的压力。所有元数据都存放在 Hudi 表根目录/.hoodie 目录下。如果该目录下的文件数量超过 10000,Hudi 表会出现明显的读写延迟。
(1)Hudi 表必须执行 Archive
对于 Hudi 的 MOR 类型和 COW 类型的表,都需要开启 Archive。
- Hudi 表在写入数据时会自动判断是否需要执行 Archive,因为 Archive 的开关默认打开(hoodie.archive.automatic 默认为 true)。
- Archive 操作并不是每次写数据时都会触发,至少需要满足以下两个条件:
- Hudi 表满足 hoodie.keep.max.commits 设置的阈值。如果是 Flink 写 Hudi,至少提交的 checkpoint 要超过这个阈值;如果是批写 Hudi,至少提交的批次要超过这个阈值。
- Hudi 表做过 Clean。如果没有做过 Clean 就不会执行 Archive。
(2)Archive 作业每天至少执行一次
建议每 2~4 小时执行一次。Hudi 的 MOR 表和 COW 表都需要保证每天至少执行 1 次 Archive。MOR 表的 Archive 可以和 Compaction 放在一起异步执行。COW 表的 Archive 可以在写数据时自动判断是否执行。
- 作者:tacjin
- 链接:http://jin.wiki/article/24ee55fd-4dcc-8077-b5d0-d5ecaa24af0c
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。