type
status
date
slug
summary
tags
category
icon
password
一、流式读Hudi表规范
1、流读参数规范
参数名称 | 参数描述 | 建议值 | 说明 |
Connector | 读取表的类型 | hudi | 必填 |
Path | 表存储的路径 | 据实填写 | 必填 |
table.type | Hudi表类型 | MERGE_ON_REA
D默认值为
COPY_ON_WRIT
E | 必填 |
hoodie.datasource.write.recordkey.field | 表的主键 | 据实填写 | 必填 |
write.precombine.field | 数据合并字段 | 据实填写 | 必填 |
read.tasks | 读hudi表task并行度 | 默认为4 | 选填👌 |
read.streaming.enabled | 设置true开启流式增量读模式,false批量读取 | 据实填写,流读场景下为true | 必填 |
read.streaming.start-commit | 指定‘yyyyMMddHHmmss’ 格式的起始 commit(闭区间) | 默认从最新commit | 选填👌 |
hoodie.datasource.write.keygenerat
or.type | 上游表主键生成类型 | COMPLEX | 选填👌 |
read.streaming.check-interval | 流读监测上游新提交的周期 | 建议值5(流量大建议使用默认值)默认值为1分钟 | 选填👌 |
read.end-commit | read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commitStream 增量消费,通过参数read.streaming.start-commit指定起始消费位置;Batch 增量消费,通过参数read.end-commit消费位置,区间为闭区间,即包含起始、结束的 commit | 默认到最新commit | 选填👌 |
changelog.enabled | 是否写入changelog消息 | 默认值为false,CDC场景填写为true | 选填👌 |
2、设置合理的消费参数避免出现File not Found的问题
在Hudi里,数据文件不是静态的,而是有“生命周期”的:
- Write 阶段
- Hudi 写数据时会生成 base file (Parquet) 和对应的 log file (avro)。
- 如果是 MOR(Merge-On-Read)表,读的时候需要 merge base + log。
- 如果是 COW(Copy-On-Write)表,更新时会生成新的 base file,老文件被标记为 obsolete。
- Compaction / Clean / Archive
- Compaction:合并 base file + log file,减少小文件,提升查询效率。
- Clean:清理无效文件(比如旧版本的 base file,已经被替代)。
- Archive:将 commit 元数据从 active timeline 移到 archive log,避免 timeline 过大。
👉 所以:当下游(比如 Spark/Flink 流读)还没来得及读,Hudi 上游就已经 clean 掉旧文件 或者 archive 了 commit 元数据,下游再按旧 timeline 去读,就会报 File not Found。
当下游消费过慢,上游写入端会把文件归档,导致问题。优化建议:
(1)调大read.tasks
- Flink / Spark 下游读取 Hudi 表时,加大并行度,让消费追上写入。
- Hudi Flink Source 的 read.tasks 参数(如 hoodie.datasource.read.streaming.check.interval 配合),就是要加快读速度。
(2)如果有限流调大限流参数
- Flink 的 Hudi Source 有限流机制,保护下游任务。
- 但限流过低 → 下游消费速度赶不上 → commit timeline 越来越旧 → 文件被 clean → 报错。
- 所以要适当调高限流,让消费更及时。
(3)调大上游compaction、archive、clean参数
- Hudi 有一系列配置:
- hoodie.cleaner.commits.retained:保留多少个 commit 的文件。
- hoodie.keep.min.commits & hoodie.keep.max.commits:决定 archive 何时触发。
- hoodie.compact.inline、hoodie.compact.schedule.inline:决定 compaction 频率。
- 如果这些参数太小,就会很快把旧文件清理掉。
- 调大这些值,可以让文件多保留一会儿,下游即使有延迟也能读到。
二、流式写Hudi表规范
1、流写参数规范
参数名称 | 参数描述 | 建议值 | 说明 |
Connector | 读取表的类型 | hudi | 必填 |
Path | 表存储的路径 | 据实填写 | 必填 |
hoodie.datasource.write.recordkey.field | 表的主键 | 据实填写 | 必填 |
write.precombine.field | 数据合并字段 | 据实填写 | 必填 |
write.tasks | 写hudi表的task并行度 | 默认参数4,参考参数优化 | 选填👌 |
index.bootstrap.enabled | Flink采取的是内存索引,需要将数据的主键缓存到内存中,保证目标表的数据唯一,因此需要配置该值,否则会导致数据重复 | TRUE(默认值FALSE) | 选填,Bucket索引不配置👌 |
write.index_bootstrap.tasks | index.bootstrap.enabled开启后有效,增加任务数提升启动速度 | 4 | 选填,具体参照性能优化指导 |
index.state.ttl | 索引数据保存时长 | 默认值为0 | 选填,默认永久不失效,可根据业务调整 👌 |
compaction.delta_commits | MOR表Compaction计划触发条件 | 200 | 必填 |
compaction.async.enabled | 是否开启在线压缩,通过关闭此参数关闭在线压缩 | FALSE | 必填,将compaction操作转移到sparksql运行,提升写性能 |
hive_sync.enable | 是否向hive同步表信息 | True | 必填 |
hive_sync.metastore.uris | Hivemeta uri信息 | 据实填写 | 必填 |
hive_sync.jdbc_url | Hive jdbc链接 | 据实填写 | 必填 |
hive_sync.table | Hive同步的表名 | 据实填写 | 必填 |
hive_sync.db | Hive同步的数据库名 | 据实填写 | 选填👌 |
hive_sync.support_timestamp | 时间戳支持 | True | 必填 |
changelog.enabled | 是否写入changelog消息 | 默认值为false,CDC场景填写为true默认值为false,CDC场景填写为true | 选填👌 |
2、多组件建表名必须满足统一格式要求
在湖仓/数据湖体系里,多引擎共同依赖"同一份元数据",同一张 Hudi 表往往被多种组件同时读写:
- 写入:Flink、Spark SQL/Streaming、HIVE SQL、 MR
- 读取:Spark SQL、Flink SQL、Trino/Presto、Impala 等
- 元数据:Hive Metastore(HMS)或 AWS Glue Catalog(Hive 兼容 API)
- 存储:HDFS / OSS / S3等
这些组件"约定俗成"用 Hive 的命名/元数据模型 来定位一张表:catalog.db.table(不少时候简写成 db.table)。
因此,只要表名不满足 Hive 的语法与语义,你就会在以下任一环节踩雷:
- 注册失败/读取异常:在 HMS/Glue 里建不进去,或 Trino/Presto/Impala 无法解析
- 解析歧义:.(点号)被引擎当作库名分隔符而不是表名字符
- 路径/SerDe 混乱:特殊字符导致 location、serde、inputformat 的解析/转义出问题
- 区分大小写不一致:有的引擎大小写不敏感,有的部分逻辑又区分,跨引擎切换时查不到表或创建了"重复表"
- 流式读写的时间线/timeline 问题被放大:当表名需要反引号/转义时,作业脚本与 SQL 的可维护性、可迁移性下降,错误更难定位。
常见的踩坑场景有:
- 持续产表/多租户多业务自动建表:流水线/作业自动按业务代码生成表名,命名不合规时,线上 24×7 作业会频繁失败。
- 多引擎读同一张表:Flink 写、Spark/Trino 查;一个能用一个不能用,问题被放大。
- 分层多表/大规模表管理:ODS/DWD/DWS/DIM/ADS 前缀+业务后缀容易超长或引入特殊符号,导致 Glue/HMS 登记或 UI/CLI 查询异常。
- 环境迁移/多 catalog 切换:从本地 HMS 到 Glue、从 dev 到 prod,大小写与转义差异暴露,读写链路"炸雷"。
⚡️⚡️因此在实践中要注意:
(1)表名必须以字母或下划线开头,不能以数字开头。
(2)表名只能包含字母、数字、下划线和点号(.)。
(3)表名长度≤ 128 个字符。
(4)表名中不能包含空格和特殊字符,如冒号、分号、斜杠、反斜杠、管道、星号、问号、@、#、$、% 等。
(5)Hive 不区分大小写,但强烈建议全小写,避免跨引擎大小写不一致带来的歧义。
(6)不得使用 Hive 保留关键字(如 select、from、where、table、partition、view、database、index 等)作为表名。
(7)跨组件兼容性增强建议:
- 尽量避免在表名里使用点号 .。因为很多引擎把 db.table 语义硬编码在解析器里,a.b.c 容易被拆成 db=a, table=b.c 或直接报错。若已使用,需在 SQL 里处处加反引号,增加运维成本。
- 统一命名风格:小写 + 下划线分词 + 分层前缀(如 ods_/dwd_/dws_/dim_/ads_),例如:dwd_user_login_detail。
- 避免无意义拼音/随意缩写;表名能自解释业务含义。(8)范围澄清:这里的"表名"指 HMS/Glue 中的 table name;完整限定名常为 catalog.database.table。请勿把"database 名"或"catalog 名"的限制与"table 名"混淆;三者都应遵循类似的安全命名规则。
三、Flink作业参数规范
1、常用参数配置
参数名称 | 参数描述 | 建议值 | 说明 |
-c | 指定主类名 | 据实填写 | 必填 |
-ynm | Flink Yarn作业名称 | 据实填写 | 据实填写 |
execution.checkpointing.interval | 检查点触发间隔(毫秒) | 60000 | 必填,通过-yD添加,单位毫秒 |
execution.checkpointing.timeout | 检查点超时时长 | 30min | 必填,通过-yD添加,默认值:30 min |
parallelism.default | 作业并行度,例如join算子 | 据实填写 | 选填,通过-yD添加默认值:1👌 |
table.exec.state.ttl | flink状态ttl(join ttl) | 据实填写 | 必填,通过-yD添加,默认值:0 |
2、checkpoint间隔时长、执行时长于超时时长的关系
在 checkpoint 配置中,三类时长之间的关系如下:
- 执行时长(checkpoint duration)
- 实际完成一次 checkpoint 所需的时间,取决于当次要处理的数据量。
- 数据量越大,执行耗时越长。
- 间隔时长(checkpoint interval)
- 两次 checkpoint 启动之间的时间间隔。
- 一般要求 间隔时长 > 执行时长,避免前一次尚未完成时就触发新的 checkpoint,造成堆积或冲突。
- 超时时长(checkpoint timeout)
- 单次 checkpoint 允许的最大执行时间。
- 建议 超时时长 > 间隔时长,这样即使 checkpoint 执行时间较长,也不会因为尚未完成就被判定为超时失败。
因此
- 执行时长 受数据量决定,无法完全固定。
- 间隔时长 需大于平均执行时长,保证有缓冲。
- 超时时长 需大于间隔时长,给长耗时场景预留安全余量。
3、CDC场景下Hudi读写表需要开启Changelog
CDC场景下为了保障Flink计算的准确,需要在Hudi表中保留+I,+U,-U,-D等操作,因此一个Hudi表在写入、流读时都需要开启Changelog。
4、Hudi表作为Source表,建议设置限流
Hudi表作为source表时候, 防止上限出现超过流量峰值,导致作业出现异常,带来不稳定隐私, 因此因此建议设置限流,限流上限应该为业务上线压测的峰值。
5、检查点失败重试次数
execution.checkpointing.tolerable-failed-checkpoints,Flink On Hudi作业建议设置checkpoint容忍次数多次,可以为50
- 作者:tacjin
- 链接:http://jin.wiki/article/258e55fd-4dcc-800d-b983-f537786d1747
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。