【Kafka】Kafka生产者数据重复、数据有序、数据乱序-07

【Kafka】Kafka生产者数据重复、数据有序、数据乱序-07

  • 1. 数据重复
    • 1.1 数据传递语义
    • 1.2 幂等性
      • 1.2.1 如何开启幂等性
      • 1.2.2 同一个消息,多个分区都会存在吗?
    • 1.3 事务
      • 1.3.1 Kafka 事务原理
      • 1.3.2 Kafka事务的作用和意义
        • 作用
        • 具体应用场景
  • 2. 数据有序
  • 3. 数据乱序

1. 数据重复

1.1 数据传递语义

  • 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

  • 最多一次(At Most Once)= ACK级别设置为0

  • 总结:
    At Least Once可以保证数据不丢失,但是不能保证数据不重复;
    At Most Once可以保证数据不重复,但是不能保证数据不丢失。

  • 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。

1.2 幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID(Producer Id)是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。

当幂等性Producer开启时,Kafka通过以下机制来保证消息的幂等性:

  1. Producer ID(PID)和Sequence Number:
    每个幂等性Producer在初始化时都会分配一个唯一的Producer ID(PID)。
    每条消息在发送时会被分配一个递增的Sequence Number(序列号)。
    Kafka Broker通过PID和Sequence Number来判断消息是否重复。
  2. 去重机制:
    当Broker收到一条消息时,会检查消息的PID和Sequence Number。如果消息的PID和Sequence Number已经存在,Broker会认为这是一个重复的消息,并且不会再次写入。
    这种机制只在单个分区内有效。如果消息发送到不同的分区,Kafka无法保证幂等性。

在这里插入图片描述

1.2.1 如何开启幂等性

开启方法:

  1. 二次开发代码中添加 “props.put(“enable.idempotence”,true)”。
  2. 客户端配置文件中添加 “enable.idempotence = true”。
// 初始化配置,开启事务特性
Properties props = new Properties();
props.put("enable.idempotence", true);
props.put("transactional.id", "transaction1");
...KafkaProducer producer = new KafkaProducer<String, String>(props);

1.2.2 同一个消息,多个分区都会存在吗?

在Kafka中,同一个消息在多个分区中一般不会存在。Kafka的设计原则之一是消息在分区间是分布的,而不是复制的。以下是一些关键点:

Kafka消息分区

  1. 分区(Partition):
    每个Kafka主题(Topic)可以有多个分区(Partitions),消息在这些分区之间分布。每个消息会被发送到一个特定的分区,而不是所有分区。
    分区可以提高并行处理能力和扩展性,因为不同的分区可以由不同的消费者并行处理。

  2. 消息键(Message Key):
    当你向Kafka发送消息时,可以指定一个键(Key)。Kafka使用这个键来决定消息应该被写入哪个分区。相同键的消息会被写入同一个分区,从而保证了消息的顺序性。
    如果没有指定键,Kafka会使用轮询(Round-Robin)或者其他算法来将消息分配到不同的分区。

  3. 副本(Replica):
    虽然同一个消息不会被写入多个分区,但Kafka有一个副本机制(Replication),用于提高数据的可靠性和容错性。每个分区有一个主副本(Leader)和多个从副本(Follower),这些副本会在不同的Broker上保存相同的数据。
    当Producer发送消息到一个分区的主副本时,主副本会将消息复制到从副本中,以保证数据的高可用性。

1.3 事务

1.3.1 Kafka 事务原理

在这里插入图片描述

Kafka 的事务一共有如下5个API

// 1 初始化事务
void initTransactions();// 2 开启事务
void beginTransaction() throws ProducerFencedException;// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;// 4 提交事务
void commitTransaction() throws ProducerFencedException;// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerTranactions {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接集群 bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");// 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 指定事务idproperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");// 1 创建kafka生产者对象// "" helloKafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 2 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));}// 模拟失败int i = 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 放弃事务kafkaProducer.abortTransaction();} finally {// 3 关闭资源kafkaProducer.close();}}
}

1.3.2 Kafka事务的作用和意义

作用
  1. 保证消息的原子性:
    事务可以保证一组消息的写入要么全部成功,要么全部失败。对于需要在多个分区或多个主题上写入数据的场景,事务能够确保数据的原子性。

  2. 避免数据丢失和重复:
    通过事务机制,Kafka可以避免消息在网络或系统故障时出现丢失或重复的情况。事务保证了每条消息的唯一性和可靠性。

  3. 支持跨分区和跨主题的操作:
    事务支持跨多个分区和多个主题的原子操作,使得Kafka在处理复杂数据流时更加灵活和可靠。

  4. 简化一致性处理:
    使用事务,开发者可以更简单地实现分布式系统中的数据一致性,而不需要手动处理分布式事务协调和一致性检查。

  5. 支持幂等性:
    事务机制基于幂等性,确保每条消息在分区内唯一,不会因重试操作导致重复消息。

具体应用场景
  1. 金融交易:
    在金融系统中,事务可以确保交易数据的完整性和一致性,避免资金损失和数据错乱。
  2. 订单处理:
    电商平台中的订单处理需要保证多个步骤(如库存检查、支付处理、订单确认)的原子性,事务可以确保订单处理的可靠性。
  3. 日志聚合:
    在日志收集和处理系统中,事务可以保证多条相关日志的完整性,避免丢失或重复。
  4. 数据同步:
    在多数据中心或多系统的数据同步中,事务可以确保数据的同步操作原子性,避免数据不一致。

2. 数据有序

在这里插入图片描述

3. 数据乱序

  1. kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
  1. kafka在1.x及以后版本保证数据单分区有序,条件如下:
    a.未开启幂等性 : max.in.flight.requests.per.connection需要设置为1
    b.开启幂等性: max.in.flight.requests.per.connection需要设置小于等于5
    原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,
    故无论如何,都可以保证最近5个request的数据都是有序的。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/355535.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FP7195做大功率钓鱼灯应用方案,0.1%深度无极无频闪调光调色应用,调光曲线顺滑无突兀

文章目录 文章目录 方案背景 一、夜钓灯电路框架 二、FP7195芯片介绍 芯片参数 总结 方案背景 目前夜钓正在逐渐变得时尚起来&#xff0c;随着夜钓群体的年轻化&#xff0c;人们对于夜钓灯的审美要求也越来越高。夜钓灯作为夜间钓鱼的重点装备&#xff0c;不仅仅需要高质量的光…

足底筋膜炎的症状

足底筋膜炎是足底的肌腱或者筋膜发生无菌性炎症所致&#xff0c;其症状主要包括&#xff1a; 1、疼痛&#xff1a;这是足底筋膜炎最常见和突出的症状。疼痛通常出现在足跟或足底近足跟处&#xff0c;有时压痛较剧烈且持续存在。晨起时或长时间不活动后&#xff0c;疼痛感觉尤为…

重生奇迹MU整理装备注意事项

想成为奇迹MU的顶尖玩家&#xff0c;整理装备是必不可少的一项技能。在这篇文章中&#xff0c;我们将为您分享一些整理装备的注意事项与技巧&#xff0c;帮助您在游戏中更好地管理装备&#xff0c;提升你的实力。 整理装备&#xff0c;须知几点 整理装备是每位玩家必须完成的…

Scikit-Learn梯度提升决策树(GBDT)

Scikit-Learn梯度提升决策树 1、梯度提升决策树(GBDT)1.1、Boosting方法1.2、GBDT的原理1.3、GBDT回归的损失函数1.4、梯度下降与梯度提升1.5、随机森林与GBDT1.6、GBDT的优缺点2、Scikit-Learn梯度提升决策树(GBDT)2.1、Scikit-Learn GBDT回归2.1.1、Scikit-Learn GBDT回归…

GNSS边坡监测站

TH-WY1随着科技的飞速发展&#xff0c;各种先进的监测技术不断涌现&#xff0c;为边坡安全监测提供了有力保障。其中&#xff0c;GNSS边坡监测站以其高精度、实时性强的特点&#xff0c;受到了广泛关注。 GNSS边坡监测站&#xff0c;全称为全球导航卫星系统边坡监测站&#xf…

3D Gaussian Splatting Windows安装

0.安装C++ 编译器 https://aka.ms/vs/17/release/vs_buildtools.exe 1.下载源码 git clone https://github.com/graphdeco-inria/gaussian-splatting --recursive 2.安装cuda NVIDIA GPU Computing Toolkit CUDA Toolkit Archive | NVIDIA Developer 3.安装COLMAP

卓越的 App UI 风格引领潮流

卓越的 App UI 风格引领潮流

【C++】哈希的概念及STL中有关哈希容器的使用

目录 前言一、unordered系列关联式容器1.1 标准库中的unordered_set1.1.1 unordered_set的介绍1.1.2 unordered_set的常用接口说明1.1.2.1 unordered_set对象的常见构造1.1.2.1.1 [无参构造函数](https://legacy.cplusplus.com/reference/unordered_map/unordered_map/)1.1.2.1…

探索序列到序列模型:了解编码器和解码器架构的强大功能

目录 一、说明 二、什么是顺序数据&#xff1f; 三、编码器解码器架构的高级概述&#xff1a; 3.1 编码器和解码器架构的简要概述&#xff1a; 3.2 训练机制&#xff1a;编码器和解码器架构中的前向和后向传播&#xff1a; 四、编码器解码器架构的改进&#xff1a; 4.1.…

Spring自定义标签体系和应用

我们知道&#xff0c;在使用Dubbo框架时&#xff0c;需要指定配置文件中的application、protocol、registry、provider、service等服务器端和客户端的配置项&#xff0c;典型的配置方法如下所示。通过这些配置项&#xff0c;我们可以基于Spring容器来启动Dubbo服务。 <!-- …

SpringBoot 实现RequestBodyAdvice封装统一接受类功能

一、相关往期文章 SpringBootVue实现AOP系统日志功能_aop的vue完整项目 Spring AOP (面向切面编程&#xff09;原理与代理模式—实例演示_面向切面aop原理详解 二、需求分析 按照一般情况&#xff0c;统一接受类可以像以下的方式进行处理&#xff1a; 如果不想使用 Request…

Shiro721 反序列化漏洞(CVE-2019-12422)

目录 Shiro550和Shiro721的区别 判断是否存在漏洞 漏洞环境搭建 漏洞利用 利用Shiro检测工具 利用Shiro综综合利用工具 这一篇还是参考别的师傅的好文章学习Shiro的反序列化漏洞 上一篇也是Shiro的反序列化漏洞&#xff0c;不同的是一个是550一个是721&#xff0c;那么这…

【vue scrollTo 数据无限滚动 】

vue数据无限滚动 参考来源 Vue3 实现消息无限滚动的新思路 —— 林三心不学挖掘机 vue3代码 <template><div class"scroll-container" ref"scrollRef"><div v-for"(item, index) in list" :key"index" style"hei…

网络协议安全:TCP/IP协议栈的安全问题和解决方案

「作者简介」:北京冬奥会网络安全中国代表队,CSDN Top100,就职奇安信多年,以实战工作为基础对安全知识体系进行总结与归纳,著作适用于快速入门的 《网络安全自学教程》,内容涵盖Web安全、系统安全等12个知识域的一百多个知识点,持续更新。 这一章节我们需要知道TCP/IP每…

【Git】--Part3--远程操作 配置 标签管理

1. 远程仓库 Git 是分布式版本控制系统&#xff0c;同⼀个 Git 仓库&#xff0c;可以分布到不同的机器上。怎么分布呢&#xff1f; 最早&#xff0c;肯定只有⼀台机器有⼀个原始版本库&#xff0c;此后&#xff0c;别的机器可以 “克隆” 这个原始版本库&#xff0c;⽽且每台机…

JavaSE 面向对象程序设计初级 静态static 包package 常量final 代码块 代码实操理论内存原理详解

目录 static(静态) 静态的特点 应用示例 静态变量 静态方法 注意事项 内存图 重新认识main方法 包 什么是包 使用导包在什么时候 final关键字 常量 命名规范 细节&#xff08;重点&#xff09; 权限修饰符 代码块 局部代码块 构造代码块 静态代码块 个人号…

vue中实现百度地图全国与省市地图切换

前言 本文主要是用于示例全国地图&#xff0c;点击省市地图直接跳转到该省市地图并展示&#xff0c;可以拓展在地图上显示标记点&#xff08;本文未做示例&#xff09;&#xff0c;后续有完整代码&#xff0c;但是由于需要与本来项目业务代码进项分割&#xff0c;可能会有些问题…

多模态大模型面对误导性问题:看懂图片也会答错,一骗就中招

多模态大语言模型&#xff08;MLLMs&#xff09;因其在视觉理解和推理方面的突出表现&#xff0c;例如生成详细的图像描述和回答复杂的问题等&#xff0c;逐渐成为近期AI研究的热点。 然而&#xff0c;Bunny 团队的最新研究发现&#xff0c;尽管许多MLLMs对视觉内容能够正确理…

RocketMQ快速入门:集成spring, springboot实现各类消息消费(七)附带源码

0. 引言 rocketmq支持两种消费模式&#xff1a;pull和push&#xff0c;在实际开发中这两种模式分别是如何实现的呢&#xff0c;在spring框架和springboot框架中集成有什么差异&#xff1f;今天我们一起来探究这两个问题。 1. java client实现消息消费 1、添加依赖 <depen…

方舟云康亏损收窄:三年近10亿销售成本,平均付费及月活仍大幅承压

《港湾商业观察》施子夫 三度递表后&#xff0c;终于通过聆讯&#xff0c;方舟云康控股有限公司(以下简称&#xff0c;方舟云康)有望近期内挂牌港交所。方舟云康的国内运营主体为广州方舟云康信息科技集团有限公司、广州方舟医药有限公司。 值得关注的是&#xff0c;亏损的难…