Kafka Producer发送消息流程之消息异步发送和同步发送

文章目录

  • 1. 异步发送
  • 2. 同步发送

在这里插入图片描述

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。

public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordproducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");}//关闭producerproducer.close();}
}

Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。

2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。

2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。

按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。

public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordFuture<RecordMetadata> send = producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");send.get();}//关闭producerproducer.close();}
}
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/380713.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JVM:JavaAgent技术

文章目录 一、Java工具的介绍二、Java Agent技术1、介绍2、静态加载模式3、动态加载模式 三、搭建java agent静态加载模式环境1、创建maven项目2、编写类和premain方法3、编写MANIFEST.MF文件4、使用maven-assembly-plugin进行打包5、创建Spring Boot应用 一、Java工具的介绍 …

C++ | Leetcode C++题解之第240题搜索二维矩阵II

题目&#xff1a; 题解&#xff1a; class Solution { public:bool searchMatrix(vector<vector<int>>& matrix, int target) {int m matrix.size(), n matrix[0].size();int x 0, y n - 1;while (x < m && y > 0) {if (matrix[x][y] targ…

HTML5大作业三农有机,农产品,农庄,农旅网站源码

文章目录 1.设计来源1.1 轮播图页面头部效果1.2 栏目列表页面效果1.3 页面底部导航效果 2.效果和源码2.1 源代码 源码下载万套模板&#xff0c;程序开发&#xff0c;在线开发&#xff0c;在线沟通 作者&#xff1a;xcLeigh 文章地址&#xff1a;https://blog.csdn.net/weixin_4…

Spring3(代理模式 Spring1案例补充 Aop 面试题)

一、代理模式 在代理模式&#xff08;Proxy Pattern&#xff09;中&#xff0c;一个类代表另一个类的功能&#xff0c;这种类型的设计模式属于结构型模式。 代理模式通过引入一个代理对象来控制对原对象的访问。代理对象在客户端和目标对象之间充当中介&#xff0c;负责将客户端…

Visual Studio 2022美化

说明&#xff1a; VS版本&#xff1a;Visual Studio Community 2022 背景美化 【扩展】【管理扩展】搜索“ClaudiaIDE”&#xff0c;【下载】&#xff0c;安装完扩展要重启VS 在wallhaven下载壁纸图片作为文本编辑器区域背景图片 【工具】【选项】搜索ClaudiaIDE&#xff…

基于python深度学习遥感影像地物分类与目标识别、分割实践技术应用

目录 专题一、深度学习发展与机器学习 专题二、深度卷积网络基本原理 专题三、TensorFlow与Keras介绍与入门 专题四、PyTorch介绍与入门 专题五、卷积神经网络实践与遥感图像场景分类 专题六、深度学习与遥感图像检测 专题七、遥感图像检测案例 专题八、深度学习与遥感…

四、GD32 MCU 常见外设介绍

系统架构 1.RCU 时钟介绍 众所周知&#xff0c;时钟是MCU能正常运行的基本条件&#xff0c;就好比心跳或脉搏&#xff0c;为所有的工作单元提供时间 基数。时钟控制单元提供了一系列频率的时钟功能&#xff0c;包括多个内部RC振荡器时钟(IRC)、一个外部 高速晶体振荡器时钟(H…

《背包乱斗》为什么好玩 苹果电脑怎么玩《背包乱斗》游戏 mac怎么玩steam windows游戏

在当今竞争激烈的游戏市场中&#xff0c;《背包乱斗》以其独特的魅力在众多作品中脱颖而出&#xff0c;吸引了大量玩家的关注和喜爱。其创新的游戏机制和不断迭代的内容&#xff0c;加之出色的视觉效果和社区建设&#xff0c;使其成为了游戏界的一股清流。 一、《背包乱斗》为…

【计算机视觉】siamfc论文复现实现目标追踪

什么是目标跟踪 使用视频序列第一帧的图像(包括bounding box的位置)&#xff0c;来找出目标出现在后序帧位置的一种方法。 什么是孪生网络结构 孪生网络结构其思想是将一个训练样本(已知类别)和一个测试样本(未知类别)输入到两个CNN(这两个CNN往往是权值共享的)中&#xff0…

jmeter部署

一、windows环境下部署 1、安装jdk并配置jdk的环境变量 (1) 安装jdk jdk下载完成后双击安装包&#xff1a;无限点击"下一步"直到完成&#xff0c;默认路径即可。 (2) jdk安装完成后配置jdk的环境变量 找到环境变量中的系统变量&#xff1a;此电脑 --> 右键属性 …

Figma 中文版指南:获取和安装汉化插件

Figma是一种主流的在线团队合作设计工具&#xff0c;也是一种基于 Web 端的设计工具。在当今的设计时代&#xff0c;Figma 的使用满足了每个人的设计需求&#xff0c;不仅可以实现在线编辑&#xff0c;还可以方便日常管理&#xff0c;有效提高工作效率。然而&#xff0c;相信很…

RPM、YUM 安装 xtrabackup 8 (mysql 热备系列一)包含rpm安装 mysql 8 配置主从

RPM安装 percona-xtrabackup-80-8.0.35-30.1.el7.x86_64.rpm 官网&#xff1a; https://www.percona.com/ 下载地址&#xff1a; https://www.percona.com/downloads wget https://downloads.percona.com/downloads/percona-distribution-mysql-ps/percona-distribution-mysq…

【Vue3】响应式数据

【Vue3】响应式数据 背景简介开发环境基本数据类型对象数据类型使用 reactive 定义对象类型响应式数据使用 ref 定义对象类型响应式数据 ref 和 reactive 的对比使用原则建议 背景 随着年龄的增长&#xff0c;很多曾经烂熟于心的技术原理已被岁月摩擦得愈发模糊起来&#xff0…

Linux系统之部署扫雷小游戏(三)

Linux系统之部署扫雷小游戏(三) 一、小游戏介绍1.1 小游戏简介1.2 项目预览二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、检查本地环境3.1 检查系统版本3.2 检查系统内核版本3.3 检查软件源四、安装Apache24.1 安装Apache2软件4.2 启动apache2服务4.3 查看apache2服…

项目管理进阶之RACI矩阵

前言 项目管理进阶系列续新篇。 RACI&#xff1f;这个是什么矩阵&#xff0c;有什么用途&#xff1f; 在项目管理过程中&#xff0c;如Team规模超5以上时&#xff0c;则有必要采用科学的管理方式&#xff0c;满足工作需要。否则可能事倍功半。 Q&#xff1a;什么是RACI矩阵 …

【HarmonyOS】HarmonyOS NEXT学习日记:五、交互与状态管理

【HarmonyOS】HarmonyOS NEXT学习日记&#xff1a;五、交互与状态管理 在之前我们已经学习了页面布局相关的知识&#xff0c;绘制静态页面已经问题不大。那么今天来学习一下如何让页面动起来、并且结合所学完成一个代码实例。 交互 如果是为移动端开发应用&#xff0c;那么交…

Unity Apple Vision Pro 开发(四):体积相机 Volume Camera

文章目录 &#x1f4d5;教程说明&#x1f4d5;教程内容概括&#x1f4d5;体积相机作用&#x1f4d5;创建体积相机&#x1f4d5;添加体积相机配置文件&#x1f4d5;体积相机配置文件参数&#x1f4d5;体积相机的边界盒大小&#x1f4d5;体积相机边界盒大小和应用边界盒大小的区别…

弹性网络回归(Elastic Net Regression)

弹性网络回归&#xff08;Elastic Net Regression&#xff09;的详细理论知识推导 理论背景 弹性网络回归结合了岭回归&#xff08;Ridge Regression&#xff09;和Lasso回归&#xff08;Lasso Regression&#xff09;的优点&#xff0c;通过引入两个正则化参数来实现特征选择…

kubernetes k8s Deployment 控制器配置管理 k8s 红蓝部署 金丝雀发布

目录 1、Deployment控制器&#xff1a;概念、原理解读 1.1 Deployment概述 1.2 Deployment工作原理&#xff1a;如何管理rs和Pod&#xff1f; 2、Deployment资源清单文件编写技巧 3、Deployment使用案例&#xff1a;创建一个web站点 4、Deployment管理pod&#xff1a;扩…

通义千问AI模型对接飞书机器人-模型配置(2-1)

一 背景 根据业务或者使用场景搭建自定义的智能ai模型机器人&#xff0c;可以较少我们人工回答的沟通成本&#xff0c;而且可以更加便捷的了解业务需求给出大家设定的业务范围的回答&#xff0c;目前基于阿里云的通义千问模型研究。 二 模型研究 参考阿里云帮助文档&#xf…