kafka(四)消息类型

一、同步消息

1、生产者

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}

二、异步消息

1、生产者

异步消息有两种:

1.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
1.2、带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}

三、顺序消息

以订单为例,

  • 生产者将相同的key的订单状态事件推送到kafka的同一分区
  • kafka 消费者接收消息
  • 消费者将消息提交给线程池
  • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
  • 单个线程不停的从阻塞队列获取订单状态消息消费

@RestController
public class OrderController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String send() throws InterruptedException {int size = 1000;for (int i = 0; i < size; i++) {OrderDto orderDto = new InterOrderDto();orderDto.setOrderNo(i + "");orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return "success";}private String getStatus(int status){return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/357371.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python---OpenCv(二),背景分离方法较有意思

目录 边界矩形 旋转矩形(最小外接矩形): 计算轮廓 找4个点的坐标 把浮点型转为Int 画轮廓 边界矩形--&#xff08;最大外接矩形&#xff09; 转灰度 找轮廓 找顶点 画矩形 显示 背景分离方法&#xff08;这个很好玩&#xff0c;可以识别在动的物体&#xff09; 边…

Redis源码学习:quicklist的设计与实现

为什么需要quicklist 假设你已经知道了ziplist的缺陷&#xff1a; 虽然节省空间&#xff0c;但是申请内存必须是连续的&#xff0c;如果内存占用比较多&#xff0c;申请效率低要存储大量数据&#xff0c;超过了ziplist的最佳上限后&#xff0c;性能有影响 借鉴分片思想&…

栅格数据重心迁移变化分析

目前网络上大多是针对矢量重心迁移进行计算&#xff0c;或把栅格转矢量在进行计算&#xff0c;可以不用怎么麻烦&#xff0c;可以直接利用栅格进行得出多期数据的重心&#xff0c;然后进行变化分析等方面的分析。 矢量数据可以通过下面方式进行重心计算&#xff1a; 使用ArcGIS…

1.1 数据采集总览

正所谓巧妇难为无米之炊&#xff0c;数据采集是数据处理的第一步。 什么是数据采集 数据采集&#xff0c;也称为数据收集&#xff0c;是将原始数据从各种来源获取并存储起来的过程。这个过程是数据分析和数据仓库建设的第一步&#xff0c;涉及到从不同的数据源中提取数据&…

Element-UI实现el-dialog弹框拖拽功能

在实际开发中&#xff0c;会发现有些系统&#xff0c;弹框是可以在浏览器的可见区域自由拖拽的&#xff0c;这极大方便用户的操作。但在查看Element-UI中弹框&#xff08;el-dialog&#xff09;组件的文档时&#xff0c;发现并未实现这一功能。不过也无须担心&#xff0c;vue中…

搜索引擎数据库介绍

搜索引擎数据库的定义 搜索引擎数据库是一类专门用于数据内容搜索的NoSQL数据库&#xff0c;是非结构化大数据处理分析领域中重要的角色。搜索引擎数据库使用索引对数据中的相似特征进行归类&#xff0c;并提高搜索能力。通过对索引和检索过程的优化&#xff0c;以处理大量文本…

前后端经验分享:秋招春招赛道如何选择

前言&#xff1a;考研考公&#xff1f;国企互联网&#xff1f;老白小粉也曾对未来的方向选择产生迷茫&#xff0c;但最终老白小粉都选择了就业 →前端春招秋招经验分享 →后端春招秋招经验分享 因此今天这篇文章主要针对秋招春招的就业赛道给予学弟学妹们一些建议。 对于计算机…

【深度学习系列】全面指南:安装TensorFlow的CPU和GPU版本

本博客旨在为初学者提供一份全面的指南&#xff0c;介绍如何根据个人电脑的配置选择并安装适合的TensorFlow版本。内容涵盖了如何查看电脑显卡型号以确定是安装CPU还是GPU版本的TensorFlow&#xff0c;创建Python虚拟环境&#xff0c;以及使用conda命令查找可用的TensorFlow版本…

厂里资讯之异步通知文章上下架

kafka及异步通知文章上下架 1)自媒体文章上下架 需求分析 2)kafka概述 消息中间件对比 特性ActiveMQRabbitMQRocketMQKafka开发语言javaerlangjavascala单机吞吐量万级万级10万级100万级时效性msusmsms级以内可用性高&#xff08;主从&#xff09;高&#xff08;主从&#…

在 iPhone 上恢复已删除联系人的 5 种简便方法

想象一下&#xff1a;您正在 iPhone 上滚动并搜索要拨打的联系人&#xff0c;但却找不到任何结果。然后您想起昨晚您试图删除一个名字相似的联系人&#xff0c;但不知何故删除了错误的联系人。或者您的孩子错误地删除了一些联系人。这些情况足以让您感到迷茫。但别担心&#xf…

五种HTTP数据传输方式

在前端开发过程中,后端主要提供 http 接口来传输数据,而这种数据传输方式主要有五种: url paramqueryform-urlencodedform-datajson 下面就让我们一起来了解一下在Nest.js中如何使用这五种HTTP数据传输方式: 一,创建项目 使用nest new 创建一个nest的项目 nest new 项目名称 …

1panel OpenResty 设置网站重定向

当我们部署网站时需要&#xff0c;输入"cheshi.com"域名回车&#xff0c;希望他自动跳转https://cheshi.com/indx/&#xff0c;而不是直接跳转https://cheshi.com时可以利用重定向来实现&#xff0c; 这里演示的是 1panel 如何设置&#xff08;nginx 貌似也是这样配…

IPv6 address status lifetime

IPv6 地址状态转换 Address lifetime (地址生存期) 每个配置的 IPv6 单播地址都有一个生存期设置&#xff0c;该设置确定该地址在必须刷新或替换之前可以使用多长时间。某些地址设置为“永久”并且不会过期。“首选”和“有效”生存期用于指定其使用期限和可用性。 自动配置的…

程序猿大战Python——面向对象——继承进阶

方法重写 目标&#xff1a;掌握方法的重写。 当父类的同名方法达不到子类的要求&#xff0c;则可以在子类中对方法进行重写。语法&#xff1a; class 父类名(object):def 方法A(self):代码... class 子类名(父类名):def 方法A(self):代码... 例如&#xff0c;一起来完成&…

八爪鱼现金流-025-工作的终极目标,不是为了成为更好的员工

工作的终极目标&#xff0c;不是为了成为更好的员工。 而是解放时间和收入自动化 打造自己的被动收入!!! 八爪鱼现金流 八爪鱼

学生选课管理系统(JAVA课设)PS:有前端界面

1.课设要求描述 实现系统的所有功能&#xff0c;包括但不限于&#xff1a; 学生信息管理&#xff08;增加、删除、修改、查询&#xff09;课程信息管理选课操作成绩管理 2.制作思路及基础讲解 此项目主要是用于完成大二下半学期的JAVA大作业&#xff0c;也可当作课设&…

SpringMVC系列七: 手动实现SpringMVC底层机制-上

手动实现SpringMVC底层机制 博客的技术栈分析 &#x1f6e0;️具体实现细节总结 &#x1f41f;准备工作&#x1f34d;搭建SpringMVC底层机制开发环境 实现任务阶段一&#x1f34d;开发ZzwDispatcherServlet&#x1f966;说明: 编写ZzwDispatcherServlet充当原生的DispatcherSer…

摄像头画面显示于unity场景

&#x1f43e; 个人主页 &#x1f43e; &#x1faa7;阿松爱睡觉&#xff0c;横竖醒不来 &#x1f3c5;你可以不屠龙&#xff0c;但不能不磨剑&#x1f5e1; 目录 一、前言二、UI画面三、显示于场景四、结语 一、前言 由于标题限制&#xff0c;这篇文章主要是讲在unity中调用摄…

【网络安全常用术语解读 :什么是0day、1day、nday漏洞】

脆弱性攻击的时间窗被称作脆弱性窗口。通常情况下&#xff0c;一个安全漏洞的时间越久&#xff0c;攻击者就会有更多的机会去攻击它。 2. 0day 漏洞 0天漏洞&#xff0c;也被称作"零日漏洞"&#xff0c;是指尚未由供应商公布的缺陷&#xff0c;表示攻击者已知晓该缺…

Go 与 Java 字符编码选择:UTF-8 与 UTF-16 的较量

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…