如何在 Spring Boot 中利用 RocketMQ 实现批量消息消费

文章目录

      • 准备工作
      • 项目依赖
      • 配置 RocketMQ
      • 生产批量消息
      • 消费批量消息
      • 测试批量消息发送和消费
      • 总结
      • 推荐阅读文章

RocketMQ 是一款分布式消息队列,支持高吞吐、低延迟的消息传递。对于需要一次处理多条消息的场景,RocketMQ 提供了批量消费的机制,这篇文章将展示如何在 Spring Boot 中实现这一功能。

准备工作

在开始之前,请确保你已经安装和配置好 RocketMQ。如果还没安装,请参考 RocketMQ 官网 获取安装指南。

项目依赖

首先,我们需要在 Spring Boot 项目中添加 RocketMQ 的依赖。打开 pom.xml 文件,添加以下内容:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version>
</dependency>

这个依赖包包含了与 RocketMQ 集成所需的所有内容。

配置 RocketMQ

application.yml 文件中添加 RocketMQ 的相关配置:

rocketmq:name-server: 127.0.0.1:9876consumer:group: batchConsumerGroupproducer:group: batchProducerGroup
  • name-server:RocketMQ 服务的地址
  • consumer.group:消息消费的分组
  • producer.group:消息生产的分组

确保 name-server 地址是正确的,指向你的 RocketMQ 服务。

生产批量消息

创建一个消息生产者,用于发送批量消息。以下是 BatchProducer.java 的示例代码:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;@Service
public class BatchProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendBatchMessages() {List<Message<String>> messages = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> message = MessageBuilder.withPayload("Hello RocketMQ " + i).build();messages.add(message);}rocketMQTemplate.syncSend("BatchTopic", messages, 10000);System.out.println("批量消息发送成功!");}
}
  • 这里,我们创建了 10 条消息并将它们添加到列表 messages 中。
  • 调用 rocketMQTemplate.syncSend 方法将消息批量发送到主题 BatchTopic

消费批量消息

接下来,我们创建一个消息消费者,用于批量消费消息。以下是 BatchConsumer.java 的示例代码:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.util.List;@Service
@RocketMQMessageListener(topic = "BatchTopic", consumerGroup = "batchConsumerGroup", selectorExpression = "*", consumeMessageBatchMaxSize = 10)
public class BatchConsumer implements RocketMQListener<List<String>> {@Overridepublic void onMessage(List<String> messages) {System.out.println("批量接收到消息:");messages.forEach(message -> System.out.println("消息内容:" + message));}
}

在这段代码中:

  • @RocketMQMessageListener 注解用于标识这是一个 RocketMQ 的消息监听器,指定了监听的主题 BatchTopic 和消费分组 batchConsumerGroup
  • consumeMessageBatchMaxSize = 10 表示每次批量消费最多 10 条消息。
  • onMessage 方法会处理接收到的消息列表,并逐条打印出消息内容。

测试批量消息发送和消费

创建一个简单的 Spring Boot 控制器,用于触发批量消息发送。以下是 MessageController.java 的代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {@Autowiredprivate BatchProducer batchProducer;@GetMapping("/sendBatchMessages")public String sendBatchMessages() {batchProducer.sendBatchMessages();return "批量消息已发送";}
}

通过访问 http://localhost:8080/sendBatchMessages 触发消息发送。

  • 调用这个接口会将批量消息发送到 RocketMQ 主题 BatchTopic
  • BatchConsumer 会自动接收并批量处理这些消息。

总结

我们成功在 Spring Boot 中实现了 RocketMQ 的批量消息发送与消费:

  1. 使用 BatchProducer 类批量发送消息。
  2. 使用 BatchConsumer 类批量消费消息,并设置最大批量大小。
  3. 通过简单的 REST API 控制消息发送,确保一切顺利。

批量消息处理可以提高消息传递的效率,适合高并发场景。这种方式可以减少网络开销,并有效利用系统资源。

推荐阅读文章

  • 由 Spring 静态注入引发的一个线上T0级别事故(真的以后得避坑)

  • 如何理解 HTTP 是无状态的,以及它与 Cookie 和 Session 之间的联系

  • HTTP、HTTPS、Cookie 和 Session 之间的关系

  • 什么是 Cookie?简单介绍与使用方法

  • 什么是 Session?如何应用?

  • 使用 Spring 框架构建 MVC 应用程序:初学者教程

  • 有缺陷的 Java 代码:Java 开发人员最常犯的 10 大错误

  • 如何理解应用 Java 多线程与并发编程?

  • 把握Java泛型的艺术:协变、逆变与不可变性一网打尽

  • Java Spring 中常用的 @PostConstruct 注解使用总结

  • 如何理解线程安全这个概念?

  • 理解 Java 桥接方法

  • Spring 整合嵌入式 Tomcat 容器

  • Tomcat 如何加载 SpringMVC 组件

  • “在什么情况下类需要实现 Serializable,什么情况下又不需要(一)?”

  • “避免序列化灾难:掌握实现 Serializable 的真相!(二)”

  • 如何自定义一个自己的 Spring Boot Starter 组件(从入门到实践)

  • 解密 Redis:如何通过 IO 多路复用征服高并发挑战!

  • 线程 vs 虚拟线程:深入理解及区别

  • 深度解读 JDK 8、JDK 11、JDK 17 和 JDK 21 的区别

  • 10大程序员提升代码优雅度的必杀技,瞬间让你成为团队宠儿!

  • “打破重复代码的魔咒:使用 Function 接口在 Java 8 中实现优雅重构!”

  • Java 中消除 If-else 技巧总结

  • 线程池的核心参数配置(仅供参考)

  • 【人工智能】聊聊Transformer,深度学习的一股清流(13)

  • Java 枚举的几个常用技巧,你可以试着用用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/468586.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python-26-Python ORM系列之pymysql实现对数据库的增删改查及新建表

python-26-Python ORM系列之pymysql实现对数据库的增删改查及新建表 一.简介 在Python基础系列ORM部分我们为大家介绍了如何搭建MySQL数据和MySQL一些访问配置&#xff0c;同时细节的同学应该已经了解到了ORM的2个库pymysql和sqlalchemy&#xff1b; PyMySQL — MySQL 数据库…

ADSP21489 M25P16启动后无法使用USBi的问题

项目背景是ADSP21489 使用SPI MASTER 启动模式,程序存储在M25P16中 编译cces产生运行代码 第二步,插上USBi仿真器下载sigma topo 发现无法正常下载 操作多次发现需要目标板重新上点后需要拔插usbi才能下载和启动dsp程序 原因分析: 就是第一次插上usbi后,在给目标板上电,可…

量子计算包kaiwu安装过程踩过的坑

目录 1 安装过程 2 官方代码测试 3 踩坑说明 首先&#xff0c;目前的kaiwu版本仅支持python3.8&#xff0c;所以必须要下载python3.8才能运行kaiwu 1 安装过程 step1: 在页面的SDK标签下&#xff0c;找到对应操作系统的kaiwu包。 step2: 下载python3.8到本地&#xff0c;可…

线程相关概念

线程概念 线程是操作系统中一种基本的执行单元&#xff0c;是程序的最小调度单位。一个程序可以包含多个线程&#xff0c;每个线程代表一个独立的执行路径&#xff0c;使得程序可以并发地处理多个任务。 线程的基本概念 线程与进程的区别&#xff1a; 进程是资源分配的单位&…

SSH实验5密钥登录Linuxroot用户(免密登录)

当用户尝试通过SSH连接到远程服务器时&#xff0c;客户端会生成一对密钥&#xff1a;公钥和私钥。公钥被发送到远程服务器&#xff0c;并存储在服务器的~/.ssh/authorized_keys文件中。而私钥则由客户端保管&#xff0c;不会传输给服务器。 在连接过程中&#xff0c;客户端使用…

域名邮箱推荐:安全与稳定的邮件域名邮箱!

域名邮箱推荐及绑定攻略&#xff1f;最好用的域名邮箱服务推荐&#xff1f; 域名邮箱&#xff0c;作为一种个性化且专业的电子邮件服务&#xff0c;越来越受到企业和个人的青睐。烽火将详细介绍域名邮箱登录的全过程&#xff0c;从注册到登录&#xff0c;帮助您轻松掌握这一重…

政治经济学笔记

【拯救者】政治经济学速成&#xff08;基础习题&#xff09; 研究生产关系必须联系生产力和上层建筑 1.生产力与生产关系 生产力代表生产的物质内容&#xff0c;生产关系是生产的社会形式。生产力决定生产关系&#xff0c;生产关系对生产力具有 反作用 *其中的”反作用”指的是…

《TCP/IP网络编程》学习笔记 | Chapter 7:优雅地断开套接字连接

《TCP/IP网络编程》学习笔记 | Chapter 7&#xff1a;优雅地断开套接字连接 《TCP/IP网络编程》学习笔记 | Chapter 7&#xff1a;优雅地断开套接字连接基于 TCP 的半关闭单方面断开连接带来的问题套接字和流针对优雅断开的 shutdown 函数为何需要半关闭&#xff1f;基于半关闭…

python | 包

1. 在python中什么是包&#xff1f; ​ 包是一种组织代码的方式&#xff0c;如下图所示红色部分目录mypackage就称为一个包&#xff0c;它之所以称为一个包完全是因为它里面有蓝色方框里的文件__init__.py。 ​ 这个目录被定义为一个包之后&#xff0c;我们就可以通过import来…

Qt信号和槽-->day04

Qt信号和槽 标准的信号和槽函数Qt中的槽函数Qt中的信号 connect案例 自定义信号和槽案例分析 信号槽的拓展信号连接信号案例 信号槽的两种连接方式Qt5中的处理方式Qt4中的处理方式Qt5处理信号槽重载问题案例 lambda表达式简单案例Qt中的应用 补充知识点 标准的信号和槽函数 QW…

Golang | Leetcode Golang题解之第552题学生出勤记录II

题目&#xff1a; 题解&#xff1a; const mod int 1e9 7type matrix [6][6]intfunc (a matrix) mul(b matrix) matrix {c : matrix{}for i, row : range a {for j : range b[0] {for k, v : range row {c[i][j] (c[i][j] v*b[k][j]) % mod}}}return c }func (a matrix) p…

腾讯首个3D生成大模型Hunyuan3D-1.0分享

Hunyuan3D-1.0是腾讯混元团队开发的首个同时支持文字、图像转3D的大模型&#xff0c;可以基于文本描述或单张图像生成3D模型。 Hunyuan3D-1.0采用了多视图生成和多视图重建两阶段的方法&#xff0c;能够从不同视角捕捉对象的纹理和几何信息。 在多视图生成阶段&#xff0c;Hu…

【JavaEE初阶 — 多线程】内存可见性问题 volatile

1. 内存可见性问题 内存可见性的概念 什么是内存可见性问题呢&#xff1f; 当一个线程对共享变量进行了修改&#xff0c;那么另外的线程都是立即可以看到修改后的最新值。在Java中&#xff0c;可以借助 synchronized、volatile 以及各种Lock 实现可见性。如果我们将变量声…

排序算法.

排序算法是最常用的一种算法.它解决的主要问题是在一定的时间复杂度和空间复杂度的条件下,对n个数按照一定的顺序进行排序.排序算法主要分为四大类,即插入类,交换类,选择类和归并类,不同的排序算法的时间复杂程度和空间复杂程度差别很大. 排序算法主要有以下几种: 1.插入类排…

iOS18.1通話錄音實測 錄音夠清晰 文字轉錄廣東話用唔到?

iOS 18.1功能實測&#xff5c;期待已久的通話錄音功能終在iOS18.1推出&#xff0c;讓用家可以在通話過程中輕鬆錄音&#xff0c;並附上逐字稿功能&#xff0c;為使用者提供更靈活的通話記錄方式。記者實測通話錄音功能&#xff0c;看看錄音清晰度、方便性、逐字轉錄的表現。 打…

403 Request Entity Too Lager(请求体太大啦)

昨天收到 QA 的生产报障&#xff0c;说是测试环境的附件上传功能报了 403 的错误&#xff0c;错误信息&#xff1a;403 Request Entity Too Lager。我尝试复现问题&#xff0c;发现传个几兆的文件都费劲啊&#xff0c;一传一个失败。不用说&#xff0c;项目用到 ng 代理&#x…

【C++】新手入门指南

> &#x1f343; 本系列为初阶C的内容&#xff0c;如果感兴趣&#xff0c;欢迎订阅&#x1f6a9; > &#x1f38a;个人主页:[小编的个人主页])小编的个人主页 > &#x1f380; &#x1f389;欢迎大家点赞&#x1f44d;收藏⭐文章 > ✌️ &#x1f91e; &#x1…

ElasticSearch备考 -- Cross cluster replication(CCR)

一、题目 操作在cluster1&#xff08;local&#xff09;中操作索引task&#xff0c;复制到cluster2&#xff08;remote&#xff09;中 二、思考 CCR 我们可以对标MySQL 理解为为主从&#xff0c;后者备份。主节点负责写入数据&#xff0c;从/备节点负责同步时主节点的数据。 …

IDEA在编译时: java: 找不到符号符号: 变量 log

一、问题 IDEA在编译的时候报Error:(30, 17) java: 找不到符号符号: 变量 log Error:(30, 17) java: 找不到符号 符号: 变量 log 位置: 类 com.mokerson.rabbitmq.config.RabbitMqConfig 二、解决方案 背景&#xff1a;下载其他同事代码时&#xff0c;第一次运行&#xff0c…

一文熟悉新版llama.cpp使用并本地部署LLAMA

0. 简介 最近是快到双十一了再给大家上点干货。去年我们写了一个大模型的系列&#xff0c;经过一年&#xff0c;大模型的发展已经日新月异。这一次我们来看一下使用llama.cpp这个项目&#xff0c;其主要解决的是推理过程中的性能问题。主要有两点优化&#xff1a; llama.cpp …