【kafka实战】03 SpringBoot使用kafka生产者和消费者示例

本节主要介绍用SpringBoot进行开发时,使用kafka进行生产和消费

一、引入依赖

<dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.6</version></dependency>
</dependencies>

二、添加topic

使用offset explore软件,添加一个topic
在这里插入图片描述

三、配置文件

server:port: 8080
spring:kafka:consumer:# Kafka服务器bootstrap-servers: 192.168.56.201:9092# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D#auto-commit-interval: 2s# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)# none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式#key-deserializer: org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况max-poll-records: 100properties:# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancemax:poll:interval:ms: 600000# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10ssession:timeout:ms: 10000listener:# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数concurrency: 4# 自动提交关闭,需要设置手动消息确认ack-mode: manual_immediate# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000producer:# Kafka服务器bootstrap-servers: 192.168.56.201:9092# 发生错误后,消息重发的次数,开启事务必须设置大于0。retries: 3# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。# 开启事务时,必须设置为allacks: all# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 生产者内存缓冲区的大小。buffer-memory: 1024000# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer

四、生产者代码

@Slf4j
@RestController
@RequestMapping("/msg")
public class SendMessageController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("send")public String send() {for (int i = 0; i < 100; i++) {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId(String.valueOf(i));orderInfo.setProductName("华为P60:" + i);kafkaTemplate.send("order.topic", "order:" + i, new Gson().toJson(orderInfo));}return "success";}
}

五、消费者代码

消费者代码采用手动ack的方式

@Slf4j
@Component
public class KafkaOrderConsumer {@KafkaListener(topics = "order.topic", groupId = "orderGroup")public void consumeOrder(ConsumerRecord<String, String> record, Acknowledgment ack) {log.info("消费订单消息key={},value={}", record.key(), record.value());ack.acknowledge();}
}

代码git仓库:https://gitee.com/syk1234/mqdmo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/139139.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zabbix自定义监控、钉钉、邮箱报警

目录 一、实验准备 二、安装 三、添加监控对象 四、添加自定义监控项 五、监控mariadb 1、添加模版查看要求 2、安装mariadb、创建用户 3、创建用户文件 4、修改监控模版 5、在上述文件中配置路径 6、重启zabbix-agent验证 六、监控NGINX 1、安装NGINX&#xff0c…

207.Flink(二):架构及核心概念,flink从各种数据源读取数据,各种算子转化数据,将数据推送到各数据源

一、Flink架构及核心概念 1.系统架构 JobMaster是JobManager中最核心的组件,负责处理单独的作业(Job)。一个job对应一个jobManager 2.并行度 (1)并行度(Parallelism)概念 一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样,包含并行子任…

实验五 熟悉 Hive 的基本操作

实验环境&#xff1a; 1.操作系统&#xff1a;CentOS 7。 2.Hadoop 版本&#xff1a;3.3.0。 3.Hive 版本&#xff1a;3.1.2。 4.JDK 版本&#xff1a;1.8。 实验内容与完成情况&#xff1a; &#xff08;1&#xff09;创建一个内部表 stocks&#xff0c;字段分隔符为英文逗号…

爬虫 — Scrapy 框架(一)

目录 一、介绍1、同步与异步2、阻塞与非阻塞 二、工作流程三、项目结构1、安装2、项目文件夹2.1、方式一2.2、方式二 3、创建项目4、项目文件组成4.1、piders/__ init __.py4.2、spiders/demo.py4.3、__ init __.py4.4、items.py4.5、middlewares.py4.6、pipelines.py4.7、sett…

BOM与DOM--记录

BOM基础&#xff08;BOM简介、常见事件、定时器、this指向&#xff09; BOM和DOM的区别和联系 JavaScript的DOM与BOM的区别与用法详解 DOM和BOM是什么&#xff1f;有什么作用&#xff1f; 图解BOM与DOM的区别与联系 BOM和DOM详解 JavaScript 中的 BOM&#xff08;浏览器对…

睿趣科技:抖音开通蓝V怎么操作的

在抖音这个充满创意和活力的社交媒体平台上&#xff0c;蓝V认证成为了许多用户的梦想之一。蓝V认证不仅是身份的象征&#xff0c;还可以增加用户的影响力和可信度。但是&#xff0c;要在抖音上获得蓝V认证并不是一件容易的事情。下面&#xff0c;我们将介绍一些操作步骤&#x…

小米笔试题——01背包问题变种

这段代码的主要思路是使用动态规划来构建一个二维数组 dp&#xff0c;其中 dp[i][j] 表示前 i 个产品是否可以组合出金额 j。通过遍历产品列表和可能的目标金额&#xff0c;不断更新 dp 数组中的值&#xff0c;最终返回 dp[N][M] 来判断是否可以组合出目标金额 M。如果 dp[N][M…

Android studio安卓生成APK文件安装包方法

1.点击Build->Generate Signed Bundle/APK 2.选择APK 3.首次生成&#xff0c;没有jks文件&#xff0c;就点击Create new。再次生成&#xff0c;直接点Next 4.选择创建jks文件路径 5.点击Next 6.选择release 7.生成完成的apk安装包路径

【论文阅读 08】Adaptive Anomaly Detection within Near-regular Milling Textures

2013年&#xff0c;太老了&#xff0c;先不看 比较老的一篇论文&#xff0c;近规则铣削纹理中的自适应异常检测 1 Abstract 在钢质量控制中的应用&#xff0c;我们提出了图像处理算法&#xff0c;用于无监督地检测隐藏在全局铣削模式内的异常。因此&#xff0c;我们考虑了基于…

uniapp小程序点击按钮直接退出小程序效果demo(整理)

点击按钮直接退出小程序 <navigator target"miniProgram" open-type"exit">退出小程序</navigator>

Android Key/Trust Store研究+ssl证书密钥

前言&#xff1a;软件搞环境涉及到了中间件thal trustzone certificate key&#xff0c;翻译过来是thal信任区域证书密钥 &#xff0c;不明白这是什么&#xff0c;学习一下 ssl证书密钥 SSL密钥是SSL加密通信中的重要组成部分。SSL证书通过加密算法生成&#xff0c;用于保护网…

Oracle 11g RAC部署笔记

搭了三次才搭好&#xff0c;要记录一下。 1. Oracle 11g RAC部署的相关步骤以及需要的包&#xff0c;可以参考这里。 Oracle 11g RAC部署_12006142的技术博客_51CTO博客Oracle 11g RAC部署&#xff0c;Oracle11gRAC部署操作环境&#xff1a;CentOS7.4Oracle11.2.0.4一、主机网…

解决老版本Oracle VirtualBox 此应用无法在此设备上运行问题

问题现象 安装华为eNSP模拟器的时候&#xff0c;对应的Oracle VirtualBox-5.2.26安装的时候提示兼容性问题&#xff0c;无法进行安装&#xff0c;具体版本信息如下&#xff1a; 软件对应版本备注Windows 11专业工作站版22H222621eNSP1.3.00.100 V100R003C00 SPC100终结正式版…

利用优化算法提高爬虫任务调度效率

目录 一、任务调度优化的重要性 二、选择合适的优化算法 三、建立任务调度模型 四、设计适应性函数 五、算法实施和调优 六、性能评估和优化结果分析 代码示例 总结 随着网络信息的爆炸式增长&#xff0c;网络爬虫在信息获取和数据挖掘等领域的应用越来越广泛。然而&am…

Arduino程序设计(十一)8×8 共阳极LED点阵显示(74HC595)

88 共阳极LED点阵显示 前言一、74HC595点阵模块1、74HC595介绍2、74HC595工作原理3、1088BS介绍4、74HC595点阵模块 二、点阵显示实验1、点阵显示初探2、点阵显示进阶3、点阵显示高阶3.1 点阵显示汉字&#xff08;方法1&#xff09;3.2 点阵显示汉字&#xff08;方法2&#xff…

conda的安装和使用

参考资料&#xff1a; https://www.bilibili.com/read/cv8956636/?spm_id_from333.999.0.0 https://www.bilibili.com/video/BV1Mv411x775/?spm_id_from333.999.0.0&vd_source98d31d5c9db8c0021988f2c2c25a9620 目录 conda是啥以及作用conda的安装conda的启动conda的配置…

2023华为杯D题——基于Kaya模型的碳排放达峰实证研究

一、前言 化石能源是推动现代经济增长的重要生产要素&#xff0c;经济生产活动与碳排放活动密切相关。充分认识经济增长与碳排放之间的关系对转变生产方式&#xff0c;确定碳达峰、碳中和路径极为必要。本研究在对经济增长与碳排放关系现有研究梳理的基础上&#xff0c;系统地分…

【二叉树魔法:链式结构与递归的纠缠】

本章重点 二叉树的链式存储二叉树链式结构的实现二叉树的遍历二叉树的节点个数以及高度二叉树的创建和销毁二叉树的优先遍历和广度优先遍历二叉树基础oj练习 1.二叉树的链式存储 二叉树的链式存储结构是指&#xff0c;用链表来表示一棵二叉树&#xff0c;即用链来指示元素的逻辑…

23. 图论 - 图的由来和构成

文章目录 图的由来图的构成Hi, 你好。我是茶桁。 从第一节课上到现在,我基本上把和人工智能相关的一些数学知识都教给大家了,终于来到我们人工智能数学的最后一个部分了,让我们从今天开始进入「图论」。 图论其实是一个比较有趣的领域,因为微积分其实更多的是对应连续型的…

iOS——KVC(键值编码)

键值编码&#xff08;KVC&#xff09; KVC&#xff08;Key Value Coding&#xff09;是一种允许以字符串形式间接操作对象属性的方式。 最基本的KVC是由NSKeyValueCoding协议提供支持&#xff0c;最基本的操作属性如下&#xff1a; setValue: 属性值 forKey: 属性名&#xff…