文章收录在网站:http://hardyfish.top/
文章收录在网站:http://hardyfish.top/
文章收录在网站:http://hardyfish.top/
文章收录在网站:http://hardyfish.top/
事务
事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。
开启enable.idempotence = true
设置Producer端参数transctional.id
数据的发送需要放在beginTransaction和commitTransaction之间。
Consumer端的代码也需要加上isolation.level
参数,用以处理事务提交的数据。
producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}
事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。
数据存储
Kafka 消息以 Partition 作为存储单元,每个 Topic 的消息被一个或者多个 Partition 进行管理。
- Partition 是一个有序的,不变的消息队列,消息总是被追加到尾部。
- 一个 Partition 不能被切分成多个散落在多个 Broker 上或者多个磁盘上。
Partition 又划分成多个 Segment 来组织数据。
Segment 在它的下面还有两个组成部分:
- 索引文件:以
.index
后缀结尾,存储当前数据文件的索引。- 数据文件:以
.log
后缀结尾,存储当前索引文件名对应的数据文件。
请求模型
请求到Broker后,也会通过类似于请求转发的组件Acceptor转发到对应的工作线程上,Kafka中被称为网络线程池,一般默认每个Broker上为3个工作线程,可以通过参数 num.network.threads
进行配置。
并且采用轮询的策略,可以很均匀的将请求分发到不同的网络线程中进行处理。
但是实际的处理请求并不是由网络线程池进行处理的,而是会交给后续的IO线程池,当网络线程接受到请求的时候,会将请求写入到共享的请求队列中,而IO线程池会进行异步的处理,默认情况下是8个,可以通过
num.io.threads
进行配置。
常见场景
重复消费
consumer 在消费过程中,应用进程被强制kill掉或发生异常退出。
例如在一次poll500条消息后,消费到200条时,进程被强制kill消费到offset未提交,或出现异常退出导致消费到offset未提交。
下次重启时,依然会重新拉取500消息,造成之前消费到200条消息重复消费了两次。
消费者消费时间过长。
max.poll.interval.ms
参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 离开组 的请求,Coordinator 也会开启新一轮 Rebalance。因为上次消费的offset未提交,再次拉取的消息是之前消费过的消息,造成重复消费。
提高消费能力,提高单条消息的处理速度;根据实际场景
max.poll.interval.ms
值设置大一点,避免不必要的rebalance;可适当减小
max.poll.records
的值,默认值是500,可根据实际消息速率适当调小。
消息丢失
消费者程序丢失数据
Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。
假如某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。
最佳配置:
不要使用
producer.send(msg)
,而要使用producer.send(msg, callback)
。设置 acks = all:
- 设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是 已提交。
设置 retries 为一个较大的值。
- 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了
retries > 0
的 Producer 能够自动重试消息发送,避免消息丢失。设置
unclean.leader.election.enable = false
。设置
replication.factor >= 3
。
- 防止消息丢失的主要机制就是冗余。
设置
min.insync.replicas > 1
。
- 控制的是消息至少要被写入到多少个副本才算是 已提交 。
- 设置成大于 1 可以提升消息持久性。
- 在实际环境中千万不要使用默认值 1。
确保
replication.factor > min.insync.replicas
。
- 如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。
确保消息消费完成再提交。
- Consumer 端有个参数
enable.auto.commit
,最好把它设置成 false,并采用手动提交位移的方式。
消息顺序
乱序场景一
因为一个topic可以有多个partition,kafka只能保证partition内部有序。
1、可以设置topic 有且只有一个partition。
2、根据业务需要,需要顺序的指定为同一个partition。
乱序场景二
对于同一业务进入了同一个消费者组之后,用了多线程来处理消息,会导致消息的乱序。
消费者内部根据线程数量创建等量的内存队列,对于需要顺序的一系列业务数据,根据key或者业务数据,放到同一个内存队列中,然后线程从对应的内存队列中取出并操作。