Kafka简单入门02——ISR机制

目录

ISR机制

ISR 关键概念

HW和LEO

Java使用Kafka通信

Kafka 生产者示例

Kafka 消费者示例


ISR机制

Kafka 中的 ISR(In-Sync Replicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR 是一组副本,它包括分区的领导者(Leader)和追随者(Follower)副本,这些副本与领导者保持数据同步。

ISR 关键概念
  1. 领导者和追随者:每个分区有一个领导者和零个或多个追随者。领导者负责处理客户端的写请求,而追随者主要用于数据复制。

  2. ISR 集合:ISR 集合是分区领导者的一组追随者副本,它们与领导者保持数据同步。只有在 ISR 集合中的追随者副本可以参与数据的写入和读取操作。

  3. 数据复制:领导者将消息写入其本地日志,并定期将这些消息发送给 ISR 集合中的追随者。追随者接收消息后,将其写入本地日志,以保持数据同步。

  4. Leader Epoch 和 Log Start Offset:ISR 集合中的每个追随者都维护了领导者的日志信息,包括领导者的 Leader Epoch 和 Log Start Offset。这些信息用于确保数据的正确复制和同步。

  5. 数据一致性:只有在 ISR 集合中的所有追随者都成功复制了一条消息后,领导者才会将该消息标记为已提交,确保数据的一致性。

  6. 故障处理:如果某个追随者发生故障或者追赶进度过慢,那么该追随者可能会被从 ISR 集合中移除。这有助于保持数据的可靠性和避免影响性能。

其中,需要注意的的概念:

  • 分区中的所有副本统称为AR(Assigned Replicas)。

  • 所有Leader副本加上和Leader副本保持同步的Follower副本组成ISR(In-Sync Replicas)。

  • 所有没有保持同步的Follower副本组成OSR(Out-of-Sync Replicas)。

  • AR = ISR + OSR。正常情况下,所有Follower副本都应该和Leader副本一致,即AR=ISR。

  • 当Leader故障时,在ISR集合中的Follower才有资格被选举为新的Leader。

HW和LEO

在 Kafka 中,HW(High Watermark)和 LEO(Log End Offset)是与数据复制和消费有关的两个重要概念。

HW(High Watermark):HW 是指在分区中,已经被所有追随者(Follower)副本复制的消息的位置。HW 是每个分区的属性,它表示已经提交的消息。只有在 HW 之前的消息才被认为是已经提交的,这些消息已经被写入分区的所有追随者副本,并且被认为是安全的,不会丢失。HW 是为了确保数据一致性和可靠性而引入的。

LEO(Log End Offset):LEO 是指在分区中当前最新消息的位置。LEO 表示分区日志中的最后一条消息的偏移量。LEO 包括已经被写入但尚未被所有追随者副本复制的消息,以及正在等待被写入的消息。LEO 是一个动态的属性,它会随着新消息的写入而逐渐增加。

HW 和 LEO 之间的关系非常重要,它们可以帮助确保数据的可靠性和一致性:

  • HW 之前的消息是已经提交的消息,它们在数据复制中是安全的,不会丢失。

  • LEO 之前的消息是已经写入但尚未被所有追随者副本复制的消息。这些消息可能会在 HW 之前被提交,也可能会在之后被提交。

  • 一旦 HW 追赶上 LEO,表示所有的消息都已经提交,分区的数据一致性得到了保障。

Kafka的消息同步流程:

  1. 初始状态,HW和LEO在同一位置。消费者可以读取的有效消息为0,1,2,3.

  2. 消息写入Leader,LEO位置改变。Follower进行同步。

  3. Follower同步进度决定HW位置,消费者可读的有效消息0,1,2,3,4。

  4. 完成同步,消费者可读的有效消息0,1,2,3,4,5,6。

可以看出,Kafka的复制机制既不是完全的同步复制,也不是单纯异步复制。

  • 同步复制要求所有Follower副本都复制完,太影响性能了。

  • 异步复制只要数据被写入Leader副本就认为提交成功,在这种情况下,如果Leader宕机时候Follower还是落后于Leader就会造成数据丢失。

而Kafka使用的ISR机制则有效地权衡了数据可靠性和性能之间的关系。

Java使用Kafka通信

以下是 Kafka 生产者和消费者的简单示例,使用 Kafka 的 Java 客户端库(Kafka Producer 和 Kafka Consumer)来创建一个基本的消息传递示例。

Kafka 生产者示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
​
public class KafkaProducerExample {public static void main(String[] args) {String bootstrapServers = "localhost:9092"; // Kafka 服务器地址String topic = "my-topic"; // Kafka 主题名称
​Properties properties = new Properties();properties.put("bootstrap.servers", bootstrapServers);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​Producer<String, String> producer = new KafkaProducer<>(properties);
​// 发送消息producer.send(new ProducerRecord<>(topic, "key", "Hello, Kafka!"), (metadata, exception) -> {if (exception == null) {System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());} else {System.err.println("Error sending message: " + exception.getMessage());}});
​producer.close();}
}
Kafka 消费者示例
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.time.Duration;
import java.util.Collections;
​
public class KafkaConsumerExample {public static void main(String[] args) {String bootstrapServers = "localhost:9092"; // Kafka 服务器地址String groupId = "my-group"; // 消费者组 IDString topic = "my-topic"; // Kafka 主题名称
​Properties properties = new Properties();properties.put("bootstrap.servers", bootstrapServers);properties.put("group.id", groupId);properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singletonList(topic));
​while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: key = " + record.key() + ", value = " + record.value());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/169478.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【WinForm详细教程一】WinForm中的窗体、Label、TextBox及Button控件、RadioButton和CheckBox、ListBox

文章目录 1.WinForm文件结构2. 窗体的常用属性、方法与事件2.1 常用属性&#xff08;可直接在属性中设置&#xff09;2.2 常用方法2.3 常用事件 3.Label、TextBox及Button控件4.RadioButton和CheckBox5.ListBox&#xff08;列表框&#xff09; 1.WinForm文件结构 .sln文件 &am…

NFTScan | 10.16~10.22 NFT 市场热点汇总

欢迎来到由 NFT 基础设施 NFTScan 出品的 NFT 生态热点事件每周汇总。 周期&#xff1a;2023.10.16~ 2023.10.22 NFT Hot News 01/y00ts&#xff1a;迁移回以太坊的跨链桥已上线&#xff0c;将承担第一天所有 Gas 费 10 月 16 日&#xff0c;y00ts 发推称&#xff0c;将 y00…

scrapy的安装和使用

一、scrapy是什么&#xff1a;Scrapy是一个为了爬取网站数据&#xff0c;提取结构性数据而编写的应用框架&#xff0c;可以应用在包括数据挖掘&#xff0c;信息处理或存储历史数据等一系列的程序 二、scrapy的安装&#xff1a;pip install scrapy -i https://pypi.douban.com/…

vscode类似GitHub Copilot的插件推荐

由于GitHub Copilot前段时间学生认证的账号掉了很多&#xff0c;某宝激活也是价格翻了几倍&#xff0c;而却&#xff0c;拿来用一天就掉线&#xff0c;可以试试同类免费的插件哦。 例如&#xff1a;TabNine&#xff0c;下载插件后&#xff0c;他会提示你登录&#xff0c;直接登…

x-ui部署(与宝塔共存)

大家好&#xff0c;我叫徐锦桐&#xff0c;个人博客地址为www.xujintong.com。平时记录一下学习计算机过程中获取的知识&#xff0c;还有日常折腾的经验&#xff0c;欢迎大家来访。 x-ui是一个搭建节点的工具&#xff0c;有一键安装脚本&#xff0c;可以快速的部署。但是如果我…

分代ZGC详解

ZGC&#xff08;Z Garbage Collector&#xff09;是Java平台上的一种垃圾收集器&#xff0c;它是由Oracle开发的&#xff0c;旨在解决大堆的低延迟垃圾收集问题。ZGC是一种并发的分代垃圾收集器&#xff0c;它主要针对具有大内存需求和低停顿时间要求的应用程序 分代ZGC收集器…

JVM相关的面试题

一、什么是程序计数器 二、简要的介绍一下堆 三、什么是虚拟机栈 四、能不能解释下方法区 五、你听过直接内存吗&#xff1f; 六、什么是类加载器&#xff0c;类加载器有哪些 七、什么是双亲委派模型 八、JVM为什么采用双亲委派机制 九、类装载的执行过程 十、对象什么时候被垃…

粤嵌实训医疗项目day02(Vue + SpringBoot)

目录 一、创建vue项目并运行 二、vue-cli中的路由使用 三、element-ui框架、实现页面布局以及vue-路由 四、前端登录页面 五、user登录后端接口完善【后端】 六、user登录前端-请求工具-请求发起【前端】 七、请求的跨域-访问策略 八、完善项目的页面布局、导航菜单以及…

Nginx 防盗链

nginx防盗链问题 盗链&#xff1a; 就是a网站有一张照片&#xff0c;b网站引用了a网站的照片 。 防盗链&#xff1a; a网站通过设置禁止b网站引用a网站的照片。 nginx防止网站资源被盗用模块 ngx_http_referer_module 如何区分哪些是不正常的用户&#xff1f; HTTP Referer…

2023年信息科学与工程学院学生科协第一次前端培训

目录 一、前端是什么&#xff1f;前端能做什么&#xff1f;前端需要做什么&#xff1f;现阶段如何理解前端 二、前端学习路线html是什么&#xff1f;css是什么&#xff1f;什么是jshtml、css以及js关系掌握三种语言之后的学习路线 三、HTML基础语法标题段落文本换行文本标签图像…

Calibre拾遗:FDI (Foreign Database Interface)系统简介

Calibre是强大的GDS处理工具&#xff0c;包括查看&#xff0c;验证&#xff0c;分析等操作&#xff0c;操作由浅入深&#xff0c;除过手动编辑GDS的不是很灵活外&#xff0c;其他各种命令和操作策略&#xff0c;都是远&#xff08;遥&#xff09;远&#xff08;遥&#xff09;走…

视频去噪网络BSVD的实现

前些天写了视频去噪网络BSVD论文的理解&#xff0c;详情请点击这里&#xff0c;这两个星期动手实践了一下&#xff0c;本篇就来记录一下这个模型的实现。 这个网络的独特之处在于&#xff0c;它的训练和推理在实现上有所差别。在训练阶段&#xff0c;其使用了TSM&#xff08;T…

IP地址SSL证书 IP证书

在许多企业用例中&#xff0c;公司需要SSL证书作为IP地址。公司使用IP地址通过Internet访问各种类型的应用程序。 公网IP地址的SSL证书&#xff1a; 内部IP&#xff08;也称为私有IP&#xff09;是IANA设置为保存的IPv4或IPv6地址&#xff0c;例如&#xff1a; RFC 1918范围内…

【JavaEE】CAS -- 多线程篇(7)

CAS 1. 什么是 CAS2. CAS 伪代码3. CAS 是怎么实现的4. CAS的应用4.1 实现原子类4.2 实现自旋锁 5. CAS 的 ABA 问题 1. 什么是 CAS CAS: 全称Compare and swap&#xff0c;字面意思:”比较并交换“能够比较和交换 某个寄存器中的值和内存中的值, 看是否相等, 如果相等, 则把另…

Java面试(JVM篇)——JVM 面试题合集 深入理解JVM虚拟机

关于什么是JVM&#xff1f; 作用&#xff1a; 运⾏并管理Java 源码⽂件所⽣成的Class⽂件&#xff0c;在不同的操作系统上安装不同的JVM &#xff0c;从⽽实现了跨平台的保证。 ⼀般情况下&#xff0c;对于开发者⽽⾔&#xff0c;即使不熟悉JVM 的运⾏机制并不影响业务代码的…

lvs+keepalived: 高可用集群

lvskeepalived: 高可用集群 keepalived为lvs应运而生的高可用服务。lvs的调度器无法做高可用&#xff0c;于是keepalived软件。实现的是调度器的高可用。 但是&#xff1a;keepalived不是专门为集群服务的&#xff0c;也可以做其他服务器的高可用。 lvs的高可用集群&#xf…

防止消息丢失与消息重复——Kafka可靠性分析及优化实践

系列文章目录 上手第一关&#xff0c;手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么&#xff0c;以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析&#xff0c;打破面试难关 防止消息丢失与消息重复——Kafka可…

Hadoop3教程(三十五):(生产调优篇)HDFS小文件优化与MR集群简单压测

文章目录 &#xff08;168&#xff09;HDFS小文件优化方法&#xff08;169&#xff09;MapReduce集群压测参考文献 &#xff08;168&#xff09;HDFS小文件优化方法 小文件的弊端&#xff0c;之前也讲过&#xff0c;一是大量占用NameNode的空间&#xff0c;二是会使得寻址速度…

Redis数据类型——list类型数据的扩展操作

1.list阻塞式数据获取 2.list类型数据业务场景

电脑软件:推荐一款非常强大的pdf阅读编辑软件

目录 一、软件简介 二、功能介绍 1、界面美观&#xff0c;打开速度快 2、可直接编辑pdf 3、非常强大好用的注释功能 4、很好用的页面组织和提取功能 5、PDF转word效果非常棒 6、强大的OCR功能 三、软件特色 四、软件下载 pdf是日常办公非常常见的文档格式&#xff0c;…