RocketMQ使用

说明:本文介绍RocketMQ的消费模式&消息类型,RocketMQ的安装参考及简单使用,参考:http://t.csdn.cn/BKFPj

消费模式

RocketMQ与RabbitMQ最大的区别在于,RocketMQ是根据消息的Topic锁定消费者的,Topic属性设置为相同的消费者,可以看做是一个消费者集群。消息模式分为以下三种:

(1)一对一

最简单的一种方式,消息的Topic只被一个消费者消费,如下:

(生产者)

    @Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void simpleTest(){rocketMQTemplate.syncSend("simple","hello rocketmq!");}

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("s = " + s);}
}

执行结果

在这里插入图片描述

(2)一对多

当存在多个Topic相同的消费者时,这些消费者共同消费消息,如下:

(开启两个消费者,Topic相同)

在这里插入图片描述

(生产者)

    @Testpublic void oneToMany(){for (int i = 0; i < 10; i++) {rocketMQTemplate.syncSend("simple","one to many" + i);}}

执行结果,可以看到负载均衡策略是随机;

在这里插入图片描述

在这里插入图片描述

(3)多对多

参考一对多方式,发送多个Topic的消息,让多种Topic的消费者接收消息;

消息类型

根据消息的类型和对消息的处理,可以分为以下几种:

(1)同步消息

同步消息,消息发送到MQ,MQ保存成功后才会返回结果,在API中是以"sync"(synchronous,同步)开头的一些方法,可以看到这些方法都有返回值,可以通过返回结果判断是否发送成功;
在这里插入图片描述

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("接收到同步消息 = " + s);}
}

(生产者,可以通过返回结果判断发送是否成功)

	@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void simpleTest1(){SendResult sendResult = rocketMQTemplate.syncSend("simple", "这是一个异步消息");System.out.println("sendResult.getSendStatus() = " + sendResult.getSendStatus());}

在这里插入图片描述

(2)异步消息

异步消息,消息发送给MQ后代码就会立即向下执行,在API中是以“asyn”(asynchronous,异步),可以手动设置发送消息成功与否执行的方法;

(生产者)

    @Testpublic void simpleTest2() throws InterruptedException {rocketMQTemplate.asyncSend("simple", "这是一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功信息" + sendResult.toString());}@Overridepublic void onException(Throwable throwable) {System.out.println("异常信息" + throwable.getMessage());}});TimeUnit.SECONDS.sleep(2);}

(发送消息成功,执行成功的方法)

在这里插入图片描述

需要注意,这里是指发送消息成功与否,与消费者是否成功消费无关;

(3)单向消息

单向消息,是指只管发送消息,不关系MQ是否成功接收,没有返回值;

    @Testpublic void simpleTest3() {rocketMQTemplate.sendOneWay("simple", "这是一个单向消息");}

(4)延迟消息

延迟消息,指给消息设置一个延迟级别,达到指定时间后,消费者才能收到这个消息,延迟级别如下:

# 延迟级别,从1开始
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

(生产者)

    @Testpublic void simpleTest4() {// 设置超时为1秒,延迟等级为3,即10秒rocketMQTemplate.syncSend("simple", MessageBuilder.withPayload("这是一个延迟消息").build(),1000,3);}

(消费者,10秒后才收到消息)

在这里插入图片描述

延迟消息相较于RabbitMQ,使用起来更方便,但是只能设置时间等级,不能设置准确时间,非常难受;

(5)批量消息

RocketMQ可以发送一个集合,如下:

(消费者)

    @Testpublic void simpleTest5(){ArrayList<Message> list = new ArrayList<>();list.add(MessageBuilder.withPayload("aaa").build());list.add(MessageBuilder.withPayload("bbb").build());list.add(MessageBuilder.withPayload("ccc").build());rocketMQTemplate.syncSend("simple", list, 3000);}

(执行结果)

在这里插入图片描述

(6)消息过滤

消息过滤,是RocketMQ较与RabbitMQ独有的功能,指对发送的消息进行过滤,指接收限定条件的消息,对消息进行限制接收。有两种方式,如下:

a. 标签过滤

在发送消息时,指定topic的同时,加上一个标签,表示只发给有这个标签的消费者;

(生产者)

    @Testpublic void simpleTest6(){rocketMQTemplate.syncSend("simple:tag", "Tag Message");}

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple", selectorExpression = "tag1")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("接收到标签过滤消息 = " + s);}
}

(执行结果)

在这里插入图片描述

b. SQL过滤

另一种是SQL过滤的方式,在消费者这边,写SQL语句对消息进行过滤消息;

(生产者,设置name = SQL)

    @Testpublic void simpleTest6(){// 标签方式rocketMQTemplate.syncSend("simple:tag", "Tag Message");// SQL语句方式rocketMQTemplate.syncSend("simple",MessageBuilder.withPayload("SQL Message").setHeader("name","SQL").build());}

(消费者,只接受name = SQL的消息)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",selectorType = SelectorType.SQL92, selectorExpression = "name = 'SQL'")
public class ConsumerListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("接收到SQL语句过滤消息 = " + s);}
}

(执行结果)

在这里插入图片描述

(7)对象消息

RocketMQ当然也可以发送对象作为消息,该对象应该要实现Serializable接口,如下:

import java.io.Serializable;public class User implements Serializable {private String username;private String password;public User() {}public User(String username, String password) {this.username = username;this.password = password;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}@Overridepublic String toString() {return "User{" +"username='" + username + '\'' +", password='" + password + '\'' +'}';}
}

(生产者)

    @Testpublic void simpleTest7(){User user = new User();user.setUsername("zhangsan");user.setPassword("123456");rocketMQTemplate.syncSend("simple", user);}

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println("user = " + user);}
}

(执行结果)

在这里插入图片描述

(8)顺序消息

顺序消息,是指消息从发送到被消费,需要始终保持前后顺序。如下,发送15次消息,可以看到消费者那边的消费顺序并不是一直的;

    @Testpublic void simpleTest1() {for (int i = 0; i < 15; i++) {rocketMQTemplate.syncSend("simple", "这是一个同步消息===>" + i);}}

在这里插入图片描述

顺序消息,需要保证以下两方面:

  • 所有的消息存入到MQ中的同一个队列中,因为RocketMQ默认有四个队列,消息会被负载均衡存储在这些队列里;

  • 该队列只能被一个线程消费,因为一个队列的消息在消费时会有多个线程同时进行消费;

前者可以通过,XxxOrderly()方法实现消息在队列中的顺序存储,如下:

(生产者:给对象设置一个ID,让它们按照ID顺序存储在MQ中)

    @Testpublic void simpleTest8(){ArrayList<User> users = new ArrayList<>();User user1 = new User("1","zhangsan","zs");User user2 = new User("2","lisi","ls");User user3 = new User("3","wangwu","ww");users.add(user1);users.add(user2);users.add(user3);for (User user : users) {rocketMQTemplate.syncSendOrderly("simple",user,user.getId());}}

后者,可以通过在消费者这边添加这个配置,保证消息被顺序消费,如下:

(消费者,设置消费模式 consumeMode = ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",consumeMode = ConsumeMode.ORDERLY)
public class ConsumerListener implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println(user);}
}

执行结果,可以看到消息时顺序进行的

在这里插入图片描述

总结

RocketMQ的内容还有很多,可参考 http://t.csdn.cn/QXQNZ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/78143.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

摄像头电池组和平衡车电池组

摄像头电池组 Wh~是电量 Wh V*Ah 毫安(mA)~是电流 电量是9.62Wh&#xff0c;电压是 3.7v 9.62 wh / 3.7v 2.6 Ah 2600mAH 4个并联电池&#xff1a;10400mAH / 4 2600mAH PH2.0mm-2Pins 平衡车 72 wh / 36v 2 Ah 2000mAH 对比自己买的单粒电池 vs 摄像头和平衡车的 …

嵌入式开发学习(STC51-15-红外遥控)

内容 使用外部中断功能&#xff0c;使按下红外遥控器&#xff0c;将对应键值编码数据解码后通过数码管显示 红外遥控介绍 红外线简介 人的眼睛能看到的可见光按波长从长到短排列&#xff0c;依次为红、橙、黄、绿、青、蓝、紫&#xff1b; 其中红光的波长范围为 0.62&…

前端小练习:案例5.律动爱心

目录 一.效果预览图 二.实现思路 ​编辑 1.html部分 2.css部分 三.完整代码 一.效果预览图 二.实现思路 想要实现爱心律动效果并不难&#xff0c;核心点是关键帧动画。 定义律动爱心需要的元素块&#xff0c;使用定位或者弹性布局等方法&#xff08;定位元素不适合布局&…

Android应用开发(6)TextView进阶用法

Android应用开发学习笔记——目录索引 本章介绍文本视图&#xff08;TextView&#xff09;的显示&#xff0c;包括&#xff1a;设置文本内容、设置文本大小、设置文本显示颜色。 一、设置TextView显示内容 Layout XML文件中设置 如&#xff1a;res/layout/activity_main.xm…

【Matter】基于Ubuntu 22.04 交叉编译chip-tool

编译工程之际&#xff0c;记录一下编译过程&#xff0c;免得后续遗忘&#xff0c;总结下来chip-tool 交叉编译涉及到的知识点&#xff1a; 需要了解如何支持交叉编译&#xff0c;基于GN编译框架需要理解应用库如何交叉编译&#xff0c;理解pkg-config的使用meson 编译&#xf…

“我,在腾讯月薪5万,离职后才明白:人越努力,只会越平庸”

那天看瑞达利欧说&#xff0c;他今年已经60岁了&#xff0c;可以说是阅人无数&#xff0c;但没有一个成功人士天赋异禀。 真的如他所说吗&#xff1f; 那张一鸣呢&#xff1f; 字节做到这么大&#xff0c;赚了这么多钱&#xff0c;不靠天赋&#xff0c;靠的是什么&#xff1…

C++笔记之从数组指针到函数数组指针(使用using name和std::function)

C笔记之从数组指针到函数数组指针(使用using name和std::function) 参考笔记&#xff1a; C之指针探究(三)&#xff1a;指针数组和数组指针 C之指针探究(十三)&#xff1a;函数指针数组 C之指针探究(二)&#xff1a;一级指针和一维数组 C之指针探究(十一)&#xff1a;函数名的…

IP路由基础+OSPF 基础

IP路由 RIB与FIB RIB&#xff1a;Routing Information Base&#xff0c;路由信息库 &#xff0c;路由器的控制平面 FIB&#xff1a;Forwarding Information Base&#xff0c;转发信息库&#xff0c;路由器的数据平面 路由信息库主要是记录直连路由以及协议宣告的路由信息&am…

【vue】vue基础知识

1、插值表达式&属性绑定 <!--template展示给用户&#xff0c;相当于MVVM模式中的V--> <template><div class"first_div">//插值表达式<p>{{ message }}</p>//这里的参数是从父组件的template里传过来的<p>{{data_1}}</p…

【转】金融行业JR/T0197-2020《金融数据安全 数据安全分级指南》解读

原文链接&#xff1a;金融行业JR/T0197-2020《金融数据安全 数据安全分级指南》解读 《金融数据安全 数据安全分级指南》 解 读 随着IT技术的发展&#xff0c;银行的基础业务、核心流程等众多事务和活动都运营在信息化基础之上&#xff0c;金融机构运行过程中产生了大量的数字…

【JavaEE进阶】Spring核心与设计思想

文章目录 一. Spring框架概述1. 什么是Spring框架2. 为什么要学习框架?3. Spring框架学习的难点 二. Spring 核心与设计思想1. 什么是容器?2. 什么是IoC?3. Spring是IoC容器4. DI&#xff08;依赖注入&#xff09;5. DL&#xff08;依赖查找&#xff09; 一. Spring框架概述…

栈和队列的实现以及OJ题讲解

&#x1f493;博主个人主页:不是笨小孩&#x1f440; ⏩专栏分类:数据结构与算法&#x1f440; 刷题专栏&#x1f440; C语言&#x1f440; &#x1f69a;代码仓库:笨小孩的代码库&#x1f440; ⏩社区&#xff1a;不是笨小孩&#x1f440; &#x1f339;欢迎大家三连关注&…

【阵列信号处理】空间匹配滤波器、锥形/非锥形最佳波束成形器、样本矩阵反演 (SMI) 研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

回归预测 | MATLAB实现POA-CNN-LSTM鹈鹕算法优化卷积长短期记忆神经网络多输入单输出回归预测

回归预测 | MATLAB实现POA-CNN-LSTM鹈鹕算法优化卷积长短期记忆神经网络多输入单输出回归预测 目录 回归预测 | MATLAB实现POA-CNN-LSTM鹈鹕算法优化卷积长短期记忆神经网络多输入单输出回归预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 MATLAB实现POA-CNN…

Michael.W基于Foundry精读Openzeppelin第20期——EnumerableMap.sol

0. 版本 [openzeppelin]&#xff1a;v4.8.3&#xff0c;[forge-std]&#xff1a;v1.5.6 0.1 EnumerableMap.sol Github: https://github.com/OpenZeppelin/openzeppelin-contracts/blob/v4.8.3/contracts/utils/structs/EnumerableMap.sol EnumerableMap库提供了Bytes32ToB…

快速远程桌面控制公司电脑远程办公

文章目录 第一步第二步第三步 远程办公的概念很早就被提出来&#xff0c;但似乎并没有多少项目普及落实到实际应用层面&#xff0c;至少在前几年&#xff0c;远程办公距离我们仍然很遥远。但2019年末突如其来的疫情&#xff0c;着实打了大家一个措手不及。尽管国内最初的大面积…

新魔百和M301H_关于CW代工_JL(南传)代工_zn及sm代工区分及鸿蒙架构全网通卡刷包刷机教程

新魔百盒M301H_关于CW代工_JL(南传)代工_zn及sm代工区分及鸿蒙架构全网通卡刷包刷机教程 下载固件之前我们先区分下代工&#xff1a; 如盒子背面型号标签上带有ZN则视为兆能代工&#xff0c;如有CW或BYT则视为创维代工&#xff1b; 如有JL或南传则视为九联代工&#xff0c;ys…

机器学习---概述(一)

文章目录 1.人工智能、机器学习、深度学习2.机器学习的工作流程2.1 获取数据集2.2 数据基本处理2.3 特征工程2.3.1 特征提取2.3.2 特征预处理2.3.3 特征降维 2.4 机器学习2.5 模型评估 3.机器学习的算法分类3.1 监督学习3.1.1 回归问题3.1.2 分类问题 3.2 无监督学习3.3 半监督…

网络安全防火墙体验实验

网络拓扑 实验操作&#xff1a; 1、cloud配置 2、防火墙配置 [USG6000V1]int GigabitEthernet 0/0/0 [USG6000V1-GigabitEthernet0/0/0]ip add 192.168.200.100 24 打开防火墙的所有服务 [USG6000V1-GigabitEthernet0/0/0]service-manage all permit 3、进入图形化界面配置…

阿里云容器服务助力极氪荣获 FinOps 先锋实践者

作者&#xff1a;海迩 可信云评估是中国信息通信研究院下属的云计算服务和软件的专业评估体系&#xff0c;自 2013 年起历经十年发展&#xff0c;可信云服务评估体系已日臻成熟&#xff0c;成为政府支撑、行业规范、用户选型的重要参考。 2022 年 5 月国务院国资委制定印发《…