Kafka生产者相关

windows中kafka集群部署示例-CSDN博客

先启动集群或者单机也OK

引入依赖

    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency>

关于主题创建

理论来讲创建主题(Topic是Kafka的内部操作),无论生产者或是消费者都不能主动创建主题.

没有主题就不能生产数据

但是往往看到生产者可以创建主题,原因是kafka的内部自动创建主题机制,当生产者中有个管理员,没有该主题就会自动创建

auto.create.topics.enable 默认是true  如果改成false  那么生产者就无法创建了

因此主题是kafka的自动创建主题的机制来实现的,而非生产者创建主题

生产者利用kafka自动创建主题的机制来创建主题...........................................................................

/*** @author hrui* @date 2025/2/26 12:53*/
public class AdminTopicTest {public static void main(String[] args) {Map<String,Object> confMap=new HashMap<>();//例如我的集群是9091  9092 9093  这里无需关心具体连接哪个端口  随意一个端口confMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9091");//管理员对象Admin admin=Admin.create(confMap);/*** 构建主题的三个参数* 第一个参数:主题名称* 第二个参数:分区数量* 第三个参数:副本数量(short类型)*/NewTopic newTopic=new NewTopic("test1",1, (short) 1);//创建主题CreateTopicsResult topics = admin.createTopics(Arrays.asList(newTopic));//关闭管理者对象admin.close();}
}

NewTopic("test1",这里可以传个Map);可以自定义主题分区副本策略   不指定就默认

生产者流程图

生产者大致代码

public class KafkaProducerTest {public static void main(String[] args) {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);for(int i=0;i<10;i++){//key的作用是通过某种算法,放到topic的某个分区中ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);kafkaProducer.send(record);}//关闭生产者对象kafkaProducer.close();}
}

生产者拦截器

可以对照流程图,看下生产者拦截器在什么位置,一般是对Key   value的整理转换,对生产的数据做统一规范化处理,可以配置多个

可以点进去

大致就是这么个过程

遍历 拦截器  并调用每个拦截器的onSend方法

可以看到每个拦截器都是ProducerInterceptor类型

自定义生产者拦截器

自定义一个类实现ProducerInterceptor

在创建生产者时候添加拦截器配置

生产者拦截器

package com.hrui.interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @author hrui* @date 2025/2/26 14:20*/
public class ValueInterceptor implements ProducerInterceptor<String,String> {@Override//发送数据的时候,会调用public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("拦截器拦截到消息:"+producerRecord.value());return new ProducerRecord<>(producerRecord.topic(),producerRecord.key(),producerRecord.value()+"-拦截器");}@Override//发送数据完毕,服务器返回的响应,会调用此方法public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}@Override//生产者对象关闭时候,会调用此方法public void close() {}@Override//创建生产者对象时候调用public void configure(Map<String, ?> map) {}
}

启动下 

生产者数据发送同步或异步

如果需要同步

ACKS数据接收应答处理机制

指的是:

生产者发送数据到 Kafka Broker 时,Kafka 如何处理消息的接收确认。通过设置 ACKS 参数,你可以控制 Kafka 如何在生产者发送消息后确认数据是否成功写入。

ACKS三个配置

ACKS=0  生产者发送数据之后,不等待任何确认,发送了 就认为你可能收到了,丢失不管

ACKS=1  生产者会等待 分区的主副本(Leader)确认消息已经写入到其磁盘中,主副本发送成功确认后,生产者就认为消息已经成功发送。 如果主副本挂了消息仍可能丢失,除非有副本在进行同步

ACKS=all(或ACKS=-1)  等待所有副本确认 消息保证不会丢失  性能会较低,因为生产者需要等待所有副本确认

默认ACKS=-1  

生产者数据重试(重发)功能

例如ACKS=1的情况下   Leader还没来的及将数据保存到磁盘

Broker挂了,此时生产者在等待回调  但是一直没回复,超过等待时间

Kafka退出超时重试机制  retry

可以配置retry重试机制

重试机制带来了好处,也有坏处

例如 broker并没有挂  只是因为网络不稳定    这就产生了数据重复和乱序现象

如何避免数据重复

如果ACSK 1或者-1(就是ALL)就是为了数据不丢失,增强可靠性

如果你禁用重试肯定是不行的

但是重试又会导致数据重复和乱序现象

Kafka提供了生产者幂等性操作:所谓生产者幂等性操作就是 生产者的消息无论向Kafka发送多少次,

Kafka的Leader只会保存一条,默认的幂等性是不起作用的

开启

要启用生产者的幂等性,必须设置以下两个配置:

  • acks=all(或 acks=-1):这要求生产者等待所有副本确认消息已成功写入,确保数据的持久性和一致性。
  • enable.idempotence=true:启用幂等性保证。
  • 且要开启重试处理
  • 在途请求缓冲区数量指的是 Kafka 生产者在发送消息时,等待确认的消息数量默认是5  不能超过5

在途请求缓冲区的数量:max.in.flight.requests.per.connection

幂等性 确保了相同分区内的消息不会重复,但在 多个分区 的情况下,跨分区的消息仍然无法避免乱序

生产者事务操作

事务可以保证生产者 ID 唯一   解决跨会话  每次重启  生产者ID会变化  加了事务可以保持不变

package com.hrui;import com.hrui.interceptor.KafkaProducerInterceptorTest;
import com.hrui.interceptor.ValueInterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author hrui* @date 2025/2/26 13:36*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptor.class.getName());//可以配置ACKSconfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");//配置幂等性configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//配置重试次数configMap.put(ProducerConfig.RETRIES_CONFIG,3);//配置超时configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);//配置事务 事务基于幂等性configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-tx-id");//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);//初始化事务kafkaProducer.initTransactions();try {//开启事务kafkaProducer.beginTransaction();for(int i=0;i<10;i++){//key的作用是通过某种算法,放到topic的某个分区中//可以不设置key 默认是按照轮询的方式ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);//发送数据  send方法还可以接收一个参数,就是回调函数  kafkaProducer.send(record);是异步的Future<RecordMetadata> send = kafkaProducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {// 处理发送失败的情况e.printStackTrace();} else {// 处理发送成功的情况System.out.println("发送成功:" + recordMetadata);}}});send.get();}//提交事务kafkaProducer.commitTransaction();}catch (Exception e){e.printStackTrace();//中止事务kafkaProducer.abortTransaction();}finally {//关闭生产者对象kafkaProducer.close();}}
}

添加事务后  生产者默认会创建一个事务topic   默认50个分区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/26280.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《Effective Objective-C》阅读笔记(下)

目录 内存管理 理解引用计数 引用计数工作原理 自动释放池 保留环 以ARC简化引用计数 使用ARC时必须遵循的方法命名规则 变量的内存管理语义 ARC如何清理实例变量 在dealloc方法中只释放引用并解除监听 编写“异常安全代码”时留意内存管理问题 以弱引用避免保留环 …

一、对iic类模块分析与使用

bmp280驱动代码 说明&#xff1a; 1、该模块用于获取气压&#xff0c;温度&#xff0c;海拔等数据。 vcc&#xff0c;gnd接电源 sda &#xff0c;scl 接iic通信引脚 2、该模块使用iic通信&#xff0c;通过iic发送请求相关类的寄存器值&#xff0c;芯片获取对应寄存器返回的数据…

辛格迪客户案例 | 祐儿医药科技GMP培训管理(TMS)项目

01 项目背景&#xff1a;顺应行业趋势&#xff0c;弥补管理短板 随着医药科技行业的快速发展&#xff0c;相关法规和标准不断更新&#xff0c;对企业的质量管理和人员培训提出了更高要求。祐儿医药科技有限公司&#xff08;以下简称“祐儿医药”&#xff09;作为一家专注于创新…

汽车低频发射天线介绍

汽车低频PKE天线是基于RFID技术的深度研究及产品开发应用的一种天线&#xff0c;在汽车的智能系统中发挥着重要作用&#xff0c;以下是关于它的详细介绍&#xff1a; 移动管家PKE低频天线结构与原理 结构&#xff1a;产品一般由一个高Q值磁棒天线和一个高压电容组成&#xff…

Java 大视界 -- Java 大数据分布式文件系统的性能调优实战(101)

&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎来到 青云交的博客&#xff01;能与诸位在此相逢&#xff0c;我倍感荣幸。在这飞速更迭的时代&#xff0c;我们都渴望一方心灵净土&#xff0c;而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识&#xff0c;也…

全国普通高等学校名单

全国普通高等学校名单 全国普通高等院校&#xff0c;简称“高校”&#xff0c;是指那些提供高等教育的学校&#xff0c;涵盖了大学、独立学院、高等专科学校以及高等职业学校等多种类型。这些机构通过国家普通高等教育招生考试&#xff0c;主要招收高中毕业生&#xff0c;并为…

Vue.js 学习笔记

文章目录 前言一、Vue.js 基础概念1.1 Vue.js 简介1.2 Vue.js 的特点1.3 Vue.js 基础示例 二、Vue.js 常用指令2.1 双向数据绑定&#xff08;v-model&#xff09;2.2 条件渲染&#xff08;v-if 和 v-show&#xff09;2.3 列表渲染&#xff08;v-for&#xff09;2.4 事件处理&am…

【Python】基础语法三

> 作者&#xff1a;დ旧言~ > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;了解Python的函数、列表和数组。 > 毒鸡汤&#xff1a;有些事情&#xff0c;总是不明白&#xff0c;所以我不会坚持。早安! > 专栏选自&#xff…

传奇3光通版手游行会战攻略:团队协作与战术布局详解

戳一戳&#xff1b;了解更多 在《传奇3光通版》手游中&#xff0c;行会战是玩家们展现团队协作与战术布局的重要舞台。下面&#xff0c;我们就来详细解析一下行会战中的团队协作与战术布局攻略。 一、团队协作 ​职业搭配 在行会战中&#xff0c;合理的职业搭配至关重要。一般…

unity学习56:旧版legacy和新版TMP文本输入框 InputField学习

目录 1 旧版文本输入框 legacy InputField 1.1 新建一个文本输入框 1.2 InputField 的子物体构成 1.3 input field的的component 1.4 input Field的属性 2 过渡 transition 3 控件导航 navigation 4 占位文本 placeholder 5 文本 text 5.1 文本内容&#xff0c;用户…

99分巧克力

99分巧克力 ⭐️难度&#xff1a;中等 &#x1f31f;考点&#xff1a;二分 2017省赛真题 &#x1f4d6; &#x1f4da; import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner sc new Scanner(System.in);int n sc.nextInt();i…

Python 基础知识全面总结

Python 是一种广泛应用的编程语言&#xff0c;具有简洁、易读、功能强大等特点。本文将对 Python 的基础知识进行全面梳理&#xff0c;涵盖从入门必备知识到各类模块和编程概念等内容。 一、Python基础语法 &#xff08;一&#xff09;标识符 定义&#xff1a;用于给变量、函…

7.1.1 计算机网络的组成

文章目录 物理组成功能组成工作方式完整导图 物理组成 计算机网络是将分布在不同地域的计算机组织成系统&#xff0c;便于相互之间资源共享、传递信息。 计算机网络的物理组成包括硬件和软件。硬件中包含主机、前端处理器、连接设备、通信线路。软件中包含协议和应用软件。 功…

领域驱动设计:事件溯源架构简介

概述 事件溯源架构通常由3种应用设计模式组成,分别是:事件驱动(Event Driven),事件溯源(Event Source)、CQRS(读写分离)。这三种应用设计模式常见于领域驱动设计(DDD)中,但它们本身是一种应用设计的思想,不仅仅局限于DDD,每一种模式都可以单独拿出来使用。 E…

【AD】3-10 原理图PDF导出

文件—智能PDF 多页原理图导出 导出设置时选择工程&#xff0c;可自行选择导出一页或多页原理图&#xff0c;一般PCB不用导出

半导体制造工艺(二)光刻工艺—掩模版

在上文中我们已经简单概述了光刻工艺的大致流程。接下来将会介绍在光刻工艺中所需用到的必备材料以及设备。例如掩模版、光刻胶、匀胶机、光刻机等等。由于需要保持讲述工艺的完整性以及流畅&#xff0c;每一个都需要涉及&#xff0c;所以每次仅是侧重点不同。此篇主要讲述的是…

ubuntu服务器安装VASP.6.4.3

ubuntu服务器安装VASP.6.4.3 1 安装Intel OneAPI Base Toolkit和Intel OneAPI HPC Toolkit1.1 更新并安装环境变量1.2 下载Intel OneAPI Base Toolkit和Intel OneAPI HPC Toolkit安装包1.3 安装 Intel OneAPI Base Toolkit1.4 安装 Intel OneAPI HPC Toolkit1.5 添加并激活环境…

Autosar RTE配置-Port Update配置及使用-基于ETAS工具

文章目录 前言Autosar Rte中enableUpdate参数定义ETAS工具中的配置生成代码分析总结前言 在E2E校验中,需要对Counter进行自增,但每个报文周期不一样,导致自增的周期不一样。且Counter应该在收到报文之后才进行自增。基于这些需求,本文介绍使用RTE Port中的参数enableUpdat…

文字滚动效果组件和按钮组件

今天和大家分享一个vue中好用的组件&#xff0c;是我自己写的&#xff0c;大家也可以自己改&#xff0c;就是文字的循环滚动效果&#xff0c;如下图&#xff0c;文字会向左移动&#xff0c;结束之后也会有一个循环&#xff0c;还有一个按钮组件&#xff0c;基本框架写的差不多了…

VMWare虚拟机Ubuntu Desktop怎么共享文件夹

1、虚拟机设置 在菜单"管理"里面找到"虚拟机设置"菜单&#xff0c;然后从"硬件"切换到"选项"&#xff0c;点到"共享文件夹"&#xff0c;然后在右侧选中"总是启用"&#xff0c;然后添加一个共享文件夹。 2、在我的…