RocketMQ快速入门:集成spring, springboot实现各类消息消费(七)附带源码

0. 引言

rocketmq支持两种消费模式:pull和push,在实际开发中这两种模式分别是如何实现的呢,在spring框架和springboot框架中集成有什么差异?今天我们一起来探究这两个问题。

1. java client实现消息消费

1、添加依赖

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>

1.1 Push消息消费

rocketmq的push消费是通过pull模式为基础来进行模拟的,就是通过监听器,不断的pull来实现,因此其实现重点就是实现监听器

rocektmq的监听器支持2种:

  • MessageListenerConcurrently 拉取到新消息之后就提交到线程池去消费
  • MessageListenerOrderly 通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据,以此保证顺序消费

这里虽然还有MessageListener类型,实际上是上述两种的父类,该方法也被弃用了
在这里插入图片描述
所以push模式的的重点就是实现MessageListenerConcurrently监听器,其内部只有一个consumeMessage方法
在这里插入图片描述
那么实现的重点就是consumeMessage方法,这里我们睡眠了10s,用于模拟该监听器运行10s

public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 设置topicconsumer.subscribe("topic_test", "*");// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {for (MessageExt msg : list) {String topic = msg.getTopic();try {String messageBody = new String(msg.getBody(), "utf-8");System.out.println(topic+":"+messageBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();Thread.sleep(10000);}

当然,如上的形式只能用于我们单元测试使用,集成在生产中时肯定不能这样用,我们需要将其注册为bean形式,并在项目启动时进行调用,让其注册为监听器

@Component
public class Consumer1PushListener implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {String topic = msg.getTopic();try {String messageBody = new String(msg.getBody(), "utf-8");System.out.println(topic+":"+messageBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}@PostConstructpublic void init(){DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 注册监听器consumer.registerMessageListener(this);try{// 设置topicconsumer.subscribe("topic_test", "*");// 启动示例consumer.start();}catch (Exception e){e.printStackTrace();System.out.println("rocketmq 消费者启动失败");}}
}

我们启动项目,发送一条消息,会发现消费者可以实时消费

在这里插入图片描述
消息模式如何调整?
rocektmq 有集群模式和广播模式两种消息模式,如果需要调整的话,通过消费者的setMessageModel方法即可调整

// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

1.2 Pull消息消费

pull模式的实现更加简单,直接查看pull消费者类DefaultMQPullConsumer,其下有pull方法
在这里插入图片描述
官方给出的示例代码如下:

public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();try {MessageQueue mq = new MessageQueue();mq.setQueueId(0);mq.setTopic("topic_test");mq.setBrokerName("Broker");long offset = 26;PullResult pullResult = consumer.pull(mq, "*", offset, 32);if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {System.out.printf("%s%n", pullResult.getMsgFoundList());consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());}} catch (Exception e) {e.printStackTrace();}consumer.shutdown();}

但是截止目前,该类已经被弃用了
在这里插入图片描述
更加推荐的是用DefaultLitePullConsumer类实现,其下的poll方法可以帮助我们更加方便的实现消息消费,这里需要注意,两个类,一个是pull,一个是poll,pull实际上是需要指定偏移量的,而poll则自动帮我们更新了偏移量

public static void main(String[] args) throws MQClientException {DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group2");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("topic_test", "*");consumer.start();try {List<MessageExt> messageList = consumer.poll(3000);for (MessageExt message : messageList) {System.out.println("pull消费:"+new String(message.getBody()));}} catch (Exception e) {e.printStackTrace();}consumer.shutdown();}

发送几条消息,运行测试
在这里插入图片描述
生产中使用时,大家可以把DefaultLitePullConsumer定义为bean, 以此减少每次资源创建的消耗,具体方式可参考上述push模式的实现代码

1.3 顺序消息消费

rocketmq中提供了两种消费处理形式:并发消费(MessageListenerConcurrently)和顺序消费(MessageListenerOrderly

并发消费消费者会创建多个线程同时消费队列消息,而顺序消费流程跟并发消费最大的区别在于,顺序消费对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理

我们在之前消息发送的章节已经提前体验过顺序消费代码实现了,通过上述对监听器类型的描述,我们也能知道顺序消费的实现,就是实现MessageListenerOrderly监听器

public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 设置topicconsumer.subscribe("topic_order", "*");// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {byte[] body = list.get(0).getBody();System.out.println("接收消息:"+new String(body, StandardCharsets.UTF_8));return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者实例consumer.start();Thread.sleep(10000);}

2. springboot实现消息消费

1、添加依赖

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>

2、修改配置项

rocketmq:name-server: localhost:9876producer:group: group_test # 生产者分组,事务消息会使用send-message-timeout: 3000 # 消息发送超时时长,默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

2.1 push消息消费

通过实现RocketMQListener<T>接口,其中T是泛型,及消息内容的数据类型,可以是String, JSONObject,也可以是自定义数据结构类型

将监听器声明为bean,并实现onMessage方法即可

@Component
@RocketMQMessageListener(topic = "topic_test", consumerGroup = "group_test")
public class MessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("消费消息:" + s);}
}

注解中的messageModel属性可以用来设置消息模式,默认为集群模式

在这里插入图片描述

2.2 pull消息消费

添加消费者配置

rocketmq:name-server: localhost:9876consumer:group: "group_test"topic: "topic_test"

通过receive方法实现消费

 @GetMapping(value = "/poll")public void poll() {List<String> list = rocketMQTemplate.receive(String.class);for (String message : list) {System.out.println("poll消费:"+message);}}

2.3 顺序消息消费

与普通消息不同的是,要声明消费模式为顺序消费consumeMode= ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(topic = "topic_order", consumerGroup = "group_order", consumeMode= ConsumeMode.ORDERLY)
public class MessageOrderListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("顺序消费消息:" + s);}
}

3. 总结

消息消费相对更加简单,实际上掌握一种之后,其他类型的也就能够举一反三了,本文也只是针对最常用的类型进行列举,还有更多参数的支持,需要大家在实际应用中探索。

本文演示源码见:https://gitee.com/wuhanxue/wu_study/tree/master/demo/rocketmq_demo

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/355500.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

方舟云康亏损收窄:三年近10亿销售成本,平均付费及月活仍大幅承压

《港湾商业观察》施子夫 三度递表后&#xff0c;终于通过聆讯&#xff0c;方舟云康控股有限公司(以下简称&#xff0c;方舟云康)有望近期内挂牌港交所。方舟云康的国内运营主体为广州方舟云康信息科技集团有限公司、广州方舟医药有限公司。 值得关注的是&#xff0c;亏损的难…

Python日志管理利器:如何高效管理平台日志

一、为什么需要日志管理&#xff1f; 日志是应用程序的重要组成部分&#xff0c;它记录了应用程序的运行状态、错误信息以及用户交互等关键信息。良好的日志管理可以帮助开发人员及时发现和解决问题&#xff0c;提高应用程序的稳定性和可靠性。 项目在本地开发调试时&#xf…

vscode字符多行自动增长插件。

多行字符自动增长插件CharAutoIncre 当你使用shiftalt选中了多行,并输入了’1’,这时这几行都变成了’1’. 这时你可以选中&#xff08;shift左键&#xff09;为’1’的这几行, 接下来按下shiftaltq此时’1’变为了’12345’自增长的样式。 同时本插件支持字符’a-z,A-Z’。 目…

【知识图谱】基于neo4j开发的信息化文档分析系统(源码)

一、项目介绍 一款全源码&#xff0c;可二开&#xff0c;可基于云部署、私有部署的企业级知识库云平台&#xff0c;一款让企业知识变为实打实的数字财富的系统&#xff0c;应用在需要进行文档整理、分类、归集、检索、分析的场景。 为什么建立知识库平台&#xff1f; 助力企业…

IDEA中 pom.xml 设置自动提示

IDEA中 pom.xml 自动提示 IDEA中 pom.xml 自动提示设置如下&#xff1a; file–>Settings–>Build,Execution…–>Build Tools–>Maven–>Repositories 会看到类似表格的画面&#xff0c;内容是你的maven地址&#xff0c;选中后&#xff0c;右边有个Update的按…

Python酷库之旅-第三方库openpyxl(01)

目录 一、 openpyxl库的由来 1、背景 2、起源 3、发展 4、特点 4-1、支持.xlsx格式 4-2、读写Excel文件 4-3、操作单元格 4-4、创建和修改工作表 4-5、样式设置 4-6、图表和公式 4-7、支持数字和日期格式 二、openpyxl库的优缺点 1、优点 1-1、支持现代Excel格式…

鄂州职业大学2024年成人高等继续教育招生简章

鄂州职业大学&#xff0c;作为一所享有盛誉的高等学府&#xff0c;一直以来都致力于为社会培养具备专业技能和良好素养的优秀人才。在成人高等继续教育领域&#xff0c;该校同样表现出色&#xff0c;为广大渴望继续深造、提升自身能力的成年人提供了宝贵的学习机会。 随着社会…

椭圆的矩阵表示法

椭圆的矩阵表示法 flyfish 1. 标准几何表示法 标准几何表示法是通过椭圆的几何定义来表示的&#xff1a; x 2 a 2 y 2 b 2 1 \frac{x^2}{a^2} \frac{y^2}{b^2} 1 a2x2​b2y2​1其中&#xff0c; a a a 是椭圆的长半轴长度&#xff0c; b b b 是椭圆的短半轴长度。 2.…

一文带你全面详细了解安全运维

一、安全运维-网络 1、IP地址相关 IP地址属于网络层地址&#xff0c;用于标识网络中的节点设备。 IP地址由32bit构成&#xff0c;每8bit一组&#xff0c;共占用4个字节。 IP地址由两部分组成&#xff0c;网络位和主机位。 IP地址分类&#xff1a; 类别网络位子网掩码私有地…

云原生Kubernetes系列项目实战-k8s集群+高可用负载均衡层+防火墙

一、Kubernetes 区域可采用 Kubeadm 方式进行安装&#xff1a; 名称主机部署服务master192.168.91.10docker、kubeadm、kubelet、kubectl、flannelnode01192.168.91.11docker、kubeadm、kubelet、kubectl、flannelnode02192.168.91.20docker、kubeadm、kubelet、kubectl、flan…

Linux 一键部署 Nginx1.26.1 + ModSecurity3

前言 ModSecurity 是 Apache 基金会的一个开源、高性能的 Web 应用程序防火墙(WAF),它提供了强大的安全规则引擎,用于检测和阻止各种攻击行为,如 SQL 注入、XSS 跨站点脚本攻击等。而 nginx 是一个高性能的 Web 服务器,常用于处理大量的并发请求,具有很高的负载均衡能力…

从0开始C++(三):构造函数与析构函数详解

目录 构造函数 构造函数的基本使用 构造函数也支持函数重载 构造函数也支持函数参数默认值 构造初始化列表 拷贝构造函数 浅拷贝和深拷贝 析构函数 总结 练习一下ヽ(&#xffe3;▽&#xffe3;)&#xff89; 构造函数 构造函数的基本使用 构造函数是一种特殊的成…

HTML(17)——圆角和盒子阴影

盒子模型——圆角 作用&#xff1a;设置元素的外边框为圆角 属性名&#xff1a;border-radius 属性值&#xff1a;数字px/百分比 也可以每个角设置不同的效果&#xff0c;从左上角顺时针开始赋值&#xff0c;没有取值的角与对角取值相同。 正圆 给正方形盒子设置圆角属性…

WordPress实时搜索插件Ajax Search Lite,轻松替代默认搜索功能

WordPress自带的默认搜索功能是跳转到搜索结果页&#xff0c;如果你想要实时搜索功能&#xff0c;特别是在问答中心显示搜索功能&#xff0c;那么建议使用这个WordPress实时搜索插件Ajax Search Lite&#xff0c;它可以在文章、页面、自定义类型文章中搜索标题、内容、摘要、自…

DP:完全背包+多重背包问题

完全背包和01背包的区别就是&#xff1a;可以多次选 一、完全背包&#xff08;模版&#xff09; 【模板】完全背包_牛客题霸_牛客网 #include <iostream> #include<string.h> using namespace std; const int N1001; int n,V,w[N],v[N],dp[N][N]; //dp[i][j]表示…

队列 + 宽搜(BFS)

例题一 解法&#xff1a; 算法思路&#xff1a; 层序遍历即可~ 仅需多加⼀个变量&#xff0c;⽤来记录每⼀层结点的个数就好了。 例题二 解法&#xff08;层序遍历&#xff09;&#xff1a; 算法思路&#xff1a; 在正常的层序遍历过程中&#xff0c;我们是可以把⼀层的结点…

SpringBoot整合justauth实现多种方式的第三方登陆

目录 0.准备工作 1.引入依赖 2.yml文件 3. Controller代码 4.效果 参考 0.准备工作 你需要获取三方登陆的client-id和client-secret 以github为例 申请地址&#xff1a;Sign in to GitHub GitHub 1.引入依赖 <?xml version"1.0" encoding"UTF-8&quo…

行车记录仪文件夹“0字节”现象解析与恢复策略

一、行车记录仪文件夹“0字节”现象描述 行车记录仪作为现代驾驶中的必备设备&#xff0c;其储存的视频数据对于事故记录和取证至关重要。然而&#xff0c;有时车主们可能会遇到这样一个问题&#xff1a;行车记录仪的某个文件夹内的文件突然变成了0字节大小&#xff0c;无法正…

用于快速充电站的 AC/DC 转换器概述

电动汽车构成了未来实现可持续交通部门的有前途技术的主要部分。AC/DC 转换器是扩展和改进 EV 功能的骨干组件。本文概述了 AC/DC 转换器、充电站类型、传统两电平 (2L) AC/DC 转换器面临的问题以及使用多电平转换器 (MLC) 的重要性。 AC/DC 充电器示意图&#xff08;&#xff…

北航数据结构与程序设计图部分选填题

一、 抓两个关键信息&#xff1a;无向图&#xff0c;邻接表。无向图中&#xff0c;边&#xff08;vi&#xff0c;vj&#xff09;要在vi的链表中记录一次&#xff0c;再以&#xff08;vj&#xff0c;vi&#xff09;的形式在vj的链表中记录一次。 每个边都要记录两次&#xff0c…