【大数据学习 | kafka】kafka的ack和一致性

1. ack级别

上文中我们提到过kafka是存在确认应答机制的,也就是数据在发送到kafka的时候,kafka会回复一个确认信息,这个确认信息是存在等级的。

ack=0 这个等级是最低的,这个级别中数据sender线程复制完毕数据默认kafka已经接收到数据。

ack=1 这个级别中,sender线程复制完毕数据leader分区拿到数据放入到自己的存储并且返回确认信息

ack= -1 这个级别比较重要,sender线程复制完毕数据,主分区接受完毕数据并且从分区都同步完毕数据然后在返回确认信息

那么以上的等级在使用的时候都会出现什么问题呢?

ack = 0 会丢失数据

ack=0时,在异步复制过程中,leader可能会丢失leader分区和follower分区的数据。

ack=1

ack=1的时候leader虽然接收到数据存储到本地,但是没有同步给follower节点,这个时候主节点宕机,从节点重新选举新的主节点,主节点是不含有这个数据的,数据会丢失.

ack = -1

这个模式不会丢失数据,但是如果leader接受完毕数据并且将数据同步给不同的follower,从节点已经接受完毕,但是还没有返回给sender线程ack的时候,这个时候leader节点宕机了,sender没有接收到这个ack,它人为没有发送成功还会重新发送数据过来,会造成数据重复。

一般前两种都适合在数据并不是特别重要的时候使用,而最后一种效率会比较低下,但是适用于可靠性比较高的场景使用

所以一般使用我们都会使用ack = -1 retries = N 联合在一起使用

那么我们如何能够保证数据的一致性呢?

2. 幂等性

在kafka的0.10以后的版本中增加了新的特性,幂等性,主要就是为了解决kafka的ack = -1的时候,数据的重复问题,设计的原理就是在kafka中增加一个事务编号。

数据在发送的时候在单个分区中的seq事物编号是递增的,如果重复的在一个分区中多次插入编号一致的两个信息,那么这个数据会被去重掉

在单个分区中序号递增,也就是我们开启幂等性也只能保证单个分区的数据是可以去重的

整体代码如下:

pro.put(ProducerConfig.RETRIES_CONFIG,3);
pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);

设定retries = 3 ,enable.idempotence = true

幂等性开启的时候,ack默认设定为-1。

幂等性的工作原理很简单,每条消息都有一个「主键」,这个主键由 <PID, Partition, SeqNumber> 组成,他们分别是:

  • PID:ProducerID,每个生产者启动时,Kafka 都会给它分配一个 ID,ProducerID 是生产者的唯一标识,需要注意的是,Kafka 重启也会重新分配 PID
  • Partition:消息需要发往的分区号
  • SeqNumber:生产者,他会记录自己所发送的消息,给他们分配一个自增的 ID,这个 ID 就是 SeqNumber,是该消息的唯一标识

对于主键相同的数据,Kafka 是不会重复持久化的,它只会接收一条,但由于是原理的限制,幂等性也只能保证单分区、单会话内的数据不重复,如果 Kafka 挂掉,重新给生产者分配了 PID,还是有可能产生重复的数据,这就需要另一个特性来保证了 ——Kafka 事务。

3. kafka的事务

Kafka 事务基于幂等性实现,通过事务机制,Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败。

Kafka 事务分为生产者事务消费者事务,但它们并不是强绑定的关系,消费者主要依赖自身对事务进行控制,因此这里我们主要讨论的是生产者事务。

3.1 如何开启事务

创建一个 Producer,指定一个事务 ID:

Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设置事务ID,必须
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_1");
//创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

使用事务发送消息:

// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();//发送10条消息往kafka,假如中间有异常,所有消息都会发送失败
try {for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<>("topic-test", "a message" + i));}
}
// 提交事务
producer.commitTransaction();
} catch (Exception e) {// 终止事务producer.abortTransaction();
} finally {producer.close();
}

3.2 事务工作原理

1)启动生产者,分配协调器

在使用事务的时候,必须给生产者指定一个事务 ID,生产者启动时,Kafka 会根据事务 ID 来分配一个事务协调器(Transaction Coordinator) 。每个 Broker 都有一个事务协调器,负责分配 PID(Producer ID) 和管理事务。

事务协调器的分配涉及到一个特殊的主题 __transaction_state,该主题默认有 50 个分区,每个分区负责一部分事务;Kafka 根据事务ID的hashcode值%50 计算出该事务属于哪个分区, 该分区 Leader 所在 Broker 的事务协调器就会被分配给该生产者。

分配完事务协调器后,该事务协调器会给生产者分配一个 PID,接下来生产者就可以准备发送消息了。

2)发送消息

生产者分配到 PID 后,要先告诉事务协调器要把消息发往哪些分区,协调器会做一个记录,然后生产者就可以开始发送消息了,这些消息与普通的消息不同,它们带着一个字段标识自己是事务消息。

当生产者事务内的消息发送完毕,会向事务协调器发送 Commit 或 Abort 请求,此时生产者的工作已经做完了,它只需要等待 Kafka 的响应

3)确认事务

当生产者开始发送消息时,协调器判定事务开始。它会将开始的信息持久化到主题 __transaction_state 中。

当生产者发送完事务内的消息,或者遇到异常发送失败,协调器会收到 Commit 或 Abort 请求,接着事务协调器会跟所有主题通信,告诉它们事务是成功还是失败的。

如果是成功,主题会汇报自己已经收到消息,协调者收到所有主题的回应便确认了事务完成,并持久化这一结果。

如果是失败的,主题会把这个事务内的消息丢弃,并汇报给协调者,协调者收到所有结果后再持久化这一信息,事务结束;整个放弃事务的过程消费者是无感知的,它并不会收到这些数据。

事物不仅可以保证多个数据整体成功失败,还可以保证数据丢失后恢复。

3.3 代码实现

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class ProducerWithTransaction {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaciton_test");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");producer.initTransactions();producer.beginTransaction();try{for(int i=0;i<5;i++){producer.send(record);}
//            int a = 1/0;producer.commitTransaction();}catch (Exception e){producer.abortTransaction();}finally {producer.close();}}
}

4. 一致性语义

在大数据场景中存在三种时间语义,分别为

At Least Once 至少一次,数据至少一次,可能会重复

At Most Once 至多一次,数据至多一次,可能会丢失

Exactly Once 精准一次,有且只有一次,准确的消息传输

那么针对于以上我们学习了ack已经幂等性以及事务。

所以我们做以下分析:

如果设定ack = 0 或者是 1 出现的语义就是At Most Once 会丢失数据

如果设定ack = - 1 会出现At Least Once 数据的重复

在ack = -1的基础上开启幂等性会解决掉数据重复问题,但是不能保证一个批次的数据整体一致,所以还要开启事务才可以。

5. 参数调节

参数调节
buffer.memoryrecord accumulator的大小,适当增加可以保证producer的速度,默认32M
batch-size异步线程拉取的批次大小,适当增加可以提高效率,但是会增加延迟性
linger.ms异步线程等待时长一般根据生产效率而定,不建议太大增加延迟效果
acks确认应答一般设定为-1,保证数据不丢失
enable.idempotence开启幂等性保证数据去重,实现exactly once语义
retries增加重试次数,保证数据的稳定性
compression.type增加producer端的压缩
max.in.flight.requests.per.connectionsender线程异步复制数据的阻塞次数,当没收到kafka的ack之前可以最多发送五个写入请求,调节这个参数可以保证数据的有序性

全部代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class ProducerWithMultiConfig {public static void main(String[] args) throws InterruptedException {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);pro.put(ProducerConfig.RETRIES_CONFIG, 3);pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");producer.send(record);producer.close();}
}

其中max.in.flight.requests.per.connection参数设定后可以增加producer的阻塞大小

在未开启幂等性的时候,这个值设定为1,可以保证单个批次的数据有序,在分区内部有序

如果开启了幂等性可以设定最大值不超过5,可以保证五个request请求单个分区内有序

因为没有开启幂等性的时候如果第一个请求失败,第二个请求重新发送的时候需要二次排序

要是开启幂等性了会保留原来的顺序性,不需要重新排序

总而言之kafka可以保证单分区有序但是整体是无序的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/461825.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【分布式技术】分布式事务深入理解

文章目录 概述产生原因关键点 分布式事务解决方案3PC3PC的三个阶段&#xff1a;3PC相比于2PC的改进&#xff1a;3PC的缺点&#xff1a; TCCTCC事务的三个阶段&#xff1a;TCC事务的设计原则&#xff1a;TCC事务的适用场景&#xff1a;TCC事务的优缺点&#xff1a;如何解决TCC模…

Linux高阶——1027—

1、守护进程的基本流程 1、父进程创建子进程&#xff0c;父进程退出 守护进程是孤儿进程&#xff0c;但是是工程师人为创建的孤儿进程&#xff0c;低开销模式运行&#xff0c;对系统没有压力 2、子进程&#xff08;守护进程&#xff09;脱离控制终端&#xff0c;创建新会话 …

centos7配置keepalive+lvs

拓扑图 用户访问www.abc.com解析到10.4.7.8&#xff0c;防火墙做DNAT将访问10.4.7.8:80的请求转换到VIP 172.16.10.7:80&#xff0c;负载均衡器再将请求转发到后端web服务器。 实验环境 VIP&#xff1a;负载均衡服务器的虚拟ip地址 LB &#xff1a;负载均衡服务器 realserv…

服务器宝塔安装哪吒监控

哪吒文档地址&#xff1a;https://nezha.wiki/guide/dashboard.html 一、准备工作 OAuth : 我使用的gitee&#xff0c;github偶尔无法访问&#xff0c;不是很方便。第一次用了极狐GitLab&#xff0c;没注意&#xff0c;结果是使用90天&#xff0c;90天后gg了&#xff0c;无法登…

【动手学强化学习】part6-策略梯度算法

阐述、总结【动手学强化学习】章节内容的学习情况&#xff0c;复现并理解代码。 文章目录 一、算法背景1.1 算法目标1.2 存在问题1.3 解决方法 二、REINFORCE算法2.1 必要说明softmax()函数交叉熵策略更新思想 2.2 伪代码算法流程简述 2.3 算法代码2.4 运行结果2.5 算法流程说明…

单片机内存管理和启动文件

一、常见存储器介绍 FLASH又称为闪存&#xff0c;不仅具备电子可擦除可编程(EEPROM)的性能&#xff0c;还不会断电丢失数据同时可以快速读取数据&#xff0c;U盘和MP3里用的就是这种存储器。在以前的嵌入式芯片中&#xff0c;存储设备一直使用ROM(EPROM)&#xff0c;随着技术的…

Python画图3个小案例之“一起看流星雨”、“爱心跳动”、“烟花绚丽”

源码如下&#xff1a; import turtle # 导入turtle库&#xff0c;用于图形绘制 import random # 导入random库&#xff0c;生成随机数 import math # 导入math库&#xff0c;进行数学计算turtle.setup(1.0, 1.0) # 设置窗口大小为屏幕大小 turtle.title("流星雨动画&…

SQL-lab靶场less1-4

说明&#xff1a;部分内容来源于网络&#xff0c;如有侵权联系删除 前情提要&#xff1a;搭建sql-lab本地靶场的时候发现一些致命的报错&#xff1a; 这个程序只能在php 5.x上运行&#xff0c;在php 7及更高版本上&#xff0c;函数“mysql_query”和一些相关函数被删除&#xf…

AutoGLM:智谱AI的创新,让手机成为你的生活全能助手

目录 引言一、AutoGLM&#xff1a;开启AI的Phone Use时代二、技术核心&#xff1a;AI从“语言理解”到“执行操作”三、实际应用案例&#xff1a;AutoGLM的智能力量1. 智能生活管理&#x1f34e;2. 社交网络的智能互动&#x1f351;3. 办公自动化&#x1f352;4. 电子商务的购物…

ceph补充介绍

SDS-ceph ceph介绍 crushmap 1、crush算法通过计算数据存储位置来确定如何存储和检索&#xff0c;授权客户端直接连接osd 2、对象通过算法被切分成数据片&#xff0c;分布在不同的osd上 3、提供很多种的bucket&#xff0c;最小的节点是osd # 结构 osd (or device) host #主…

Scrapy源码解析:DownloadHandlers设计与解析

1、源码解析 代码路径&#xff1a;scrapy/core/downloader/__init__.py 详细代码解析&#xff0c;请看代码注释 """Download handlers for different schemes"""import logging from typing import TYPE_CHECKING, Any, Callable, Dict, Gener…

如何解决docker镜像下载失败问题

经常用docker的朋友都知道&#xff0c;docker hub的镜像仓库经常访问不通 rootiZwz97kfjnf78copv1ae65Z:~# docker pull ubuntu:18.04 Error response from daemon: Get https://registry-1.docker.io/v2/: net/http: request canceled while waiting for connection (Client.…

探索 ONLYOFFICE:开源办公套件的魅力

文章目录 引言一、ONLYOFFICE 产品介绍与历史1.1 ONLUOFFICE 介绍1.2 ONLYOFFICE发展历史 二、ONLYOFFICE 的核心功能2.1 文档处理2.2 演示文稿 三、ONLYOFFICE 部署与安装四、ONLYOFFICE 产品优势和挑战五、ONLYOFFICE 案例分析六、ONLYOFFICE 的未来发展七、全文总结 引言 在…

FlaskFastAPIgunicornunicorn并发调用

Flask VS. FastAPI Flask和FastAPI是Python中两种流行的Web框架&#xff0c;它们各自具有不同的特点和适用场景。以下是它们之间的一些主要区别&#xff1a; 1. 框架类型 Flask&#xff1a;Flask是一个轻量级的微框架&#xff0c;适合构建小型到中型的Web应用。它灵活且易于扩展…

第2章 JSP基础

JavaWeb程序设计-T2(JSP基础) 一、JSP概述 1、JSP概念 JSP(Java Server Page)是sun公司倡导建立的一种动态网页标准。 用于开发动态网页(将后端开发语言嵌入带前端中【将java嵌入到HTML中】) 2、JSP工作原理 JSP就是将传统Java代码嵌入到html页面代码中,由Web服务器进…

Unix 中文件权限设置

在 Unix 和类 Unix 系统中&#xff0c;文件权限是通过八进制数表示的&#xff0c;这些数字代表不同的权限组合。以下是一些常见的八进制数及其对应的权限设置&#xff1a; 1. **0644**&#xff1a; - 所有者&#xff08;owner&#xff09;&#xff1a;读&#xff08;read&a…

【小白学机器学习28】 统计学脉络+ 总体+ 随机抽样方法

目录 参考书&#xff0c;学习书 0 统计学知识大致脉络 1 个体---抽样---整体 1.1 关于个体---抽样---整体&#xff0c;这个三段式关系 1.2 要明白&#xff0c;自然界的整体/母体是不可能被全部认识的 1.2.1 不要较真&#xff0c;如果是人为定义的一个整体&#xff0c;是可…

《Python游戏编程入门》注-第4章5

2.3 实现开始游戏的功能 当显示图1所示的游戏启动界面后&#xff0c;根据提示点击“确定”按键&#xff0c;则可以开始游戏。也就是要完成键盘监听的功能&#xff0c;当游戏程序监听到玩家点击了“确定”按键后&#xff0c;开始游戏。 在《Python游戏编程入门注-第4章2》中介…

mysql中的锁理解

1.共享锁&#xff0c;排他锁&#xff0c;也叫读锁和写锁 共享锁(S锁)(读锁)&#xff1a;事务在读取记录的时候获取共享锁&#xff0c;允许其它事务同时获取共享锁。 排他锁(X锁)(写锁)&#xff1a;事务在修改记录的时候获取排他锁&#xff0c;只允许一个事务获取排他锁&#x…

【C++】位图详解(一文彻底搞懂位图的使用方法与底层原理)

目录 1.位图的概念 2.位图的使用方法 定义与创建 设置和清除 位访问和检查 转换为其他格式 3.位图的使用场景 1.快速的查找某个数据是否在一个集合中 2.排序去重 3.求两个集合的交集和并集 4.位图的底层实现 私有成员定义与初始化 set和reset的实现 前面的博客我们…