一、fllink 内部配置
client.id.prefix
,指定用于 Kafka Consumer 的客户端 ID 前缀partition.discovery.interval.ms
,定义 Kafka Source 检查新分区的时间间隔。 请参阅下面的动态分区检查一节register.consumer.metrics
指定是否在 Flink 中注册 Kafka Consumer 的指标commit.offsets.on.checkpoint
指定是否在进行 checkpoint 时将消费位点提交至 Kafka broker
Kafka consumer 的配置可以参考 Apache Kafka 文档。
请注意,即使指定了以下配置项,构建器也会将其覆盖:
auto.offset.reset.strategy
被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆盖partition.discovery.interval.ms
会在批模式下被覆盖为 -1-
消费位点提交 #
Kafka source 在 checkpoint 完成时提交当前的消费位点 ,以保证 Flink 的 checkpoint 状态和 Kafka broker 上的提交位点一致。如果未开启 checkpoint,Kafka source 依赖于 Kafka consumer 内部的位点定时自动提交逻辑,自动提交功能由
enable.auto.commit
和auto.commit.interval.ms
两个 Kafka consumer 配置项进行配置。注意:Kafka source 不依赖于 broker 上提交的位点来恢复失败的作业。提交位点只是为了上报 Kafka consumer 和消费组的消费进度,以在 broker 端进行监控。
二、Flink KafkaConsumer offset提交过程解释
Flink kafka consumer commit offset方式需要区分是否开启了checkpoint。
1.如果checkpoint关闭,commit offset要依赖于kafka 客户端的auto commit。需设置enable.auto.commit,auto.commit.interval.ms参数到consumerproperties,就会按固定的时间间隔定期auto commit offset到kafka。
2.如果开启checkpoint,这个时候作业消费的offset是Flink在state中自己管理和容错。此时提交offset到kafka,一般都是作为外部进度的监控,想实时知道作业消费的位置和lag情况。此时需要setCommitOffsetsOnCheckpoints 为 true来设置当checkpoint成功时提交offset 到 kafka。此时commit offset的间隔就取决于checkpoint的间隔,所以此时从kafka一侧看到的lag可能并非完全实时,如果checkpoint间隔比较长lag曲线可能会是一个锯齿状。
三、kafka 内部配置