【RocketMQ系列五】消息示例-顺序消息延迟消息广播消息的实现

1. 前言

上一篇文章我们介绍了简单消息的实现,本文将主要来介绍顺序消息的实现,顺序消息分为局部顺序消息和全局顺序消息。

顺序消息指的是消费者在消费消息时,按照生产者发送消息的顺序进行消费。即先发送的先消费【FIFO】。

顺序消息分为 全局顺序消息和局部顺序消息。

全局顺序消息就是全局使用一个queue。

局部顺序消息就是 有顺序依赖的消息放在同一个queue中,多个queue并行消费。

2. 局部顺序消息

默认情况下RocketMQ会根据轮询的方式将消息发送到某个broker中的某个队列中,这样的话就不能保证消息是有序的。

比如在购物网站下单场景下:有 1. 创建订单---->2. 订单支付---->3. 订单发货---->4. 订单完成 四条消息。这四条消息逻辑上肯定是有序的。但是如果采用RocketMQ默认的消息投递方式,那么同一个订单,有可能创建订单被投递到了 MessageQueue1,订单支付的话被投递到了MessageQueue2。 由于消息在不同的MessageQueue中,消费者在消费的时候就可能会出现订单支付的消息先于创建订单的消息。

局部顺序消息就是要保证同一笔订单4条消息都放在同一个queue中,这样的话就不会出现订单支付的消息先于创建订单的消息被消费。就像下图所示:

局部顺序消息

局部顺序消息消费者在消费某个topic的某个队列中的消息的时候是顺序的。消费者使用MessageListenerOrderly类来进行消息监听。

2.1. 定义生产者

  1. 这里定义了名为part_order_topic_test的topic。运行程序之后该topic可以路由到broker-a 以及broker-b 两个broker。

    image-20231003154231683

public class OrderProducer {// 局部顺序消费,核心就是自己选择Queue,保证需要顺序保障的消息落到同一个队列中public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer defaultMQProducer = new DefaultMQProducer("order_producer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();for (int i = 0; i < 10; i++) {int orderId = i;for (int j = 0; j < 5; j++) {// 构建消息体,tags和key 只是做一个简单区分Message partOrderMsg = new Message("part_order_topic_test", "order_" + orderId, "KEY_" + orderId, ("局部顺序消息处理_" + orderId + ";step_" + j).getBytes());SendResult send = defaultMQProducer.send(partOrderMsg, new MessageQueueSelector() {@Override//这里的arg参数就是外面的orderIdpublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer orderId = (Integer) arg;int index = orderId % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", send);}}defaultMQProducer.shutdown();}
}
  1. 在发送消息的时候实现MessageQueueSelector接口用于在发送消息的时候指定队列。其中, public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 方法有三个参数:其中,mqs表示当前topic所路由的全部队列数,这里就是8个队列,broker-a有4个队列,broker-b有4个队列。msg就是传入的消息体,arg 就是传入的orderId。

  2. 这里根据orderId与队列数求模取余来获取消息应该发送到哪个队列中,这样就保证了相同的orderId的消息会落到同一个队列中

    Integer orderId = (Integer) arg;
    int index = orderId % mqs.size();
    return mqs.get(index);
    
生产者运行结果(部分截图)

image-20231003160039915

从运行结果可以看出相同orderId的消息被投递到了同一个MessageQueue中,而相同MessageQueue队列天然是有顺序的。

2.2.定义消费者

说完了生产者,接着来说说消费者。消费者的逻辑主要是在消费的时候需要实现 MessageListenerOrderly 类来进行消息监听。核心代码是:

	// 2.订阅消费消息defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println("消费得到的消息是={}" + msg);System.out.println("消息体内容是={}" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});

这里启动了三个消费者,不管消费者消费的顺序如何,相同的orderId下的5条消息都是被顺序消费的。

image-20231010125014684

image-20231010125043838

image-20231010125110337

3. 碰到的问题

在首次调试的时候出现了一个 broker is full 的错误。这是由于磁盘空间不足导致的,可以通过 df -h 命令查看当前磁盘空间的占用情况,当磁盘空间使用率超过90%的话则会报此错。

image-20231003131237358

4. 全局顺序消息

全局顺序消息是指消费者消费全部消息都是顺序的,只能让所有的消息都发送到同一个MessageQueue中来实现,在高并发场景下会非常影响效率。

5. 广播消息

广播消息是向主题(topic)的所有订阅者发送消息,订阅同一个topic的多个消费者,都能全量收到生产者发送的所有消息。

广播消息的生产者与普通同步消息的生产者实现是一致的,不同的是消费者的消息模式不同。这里给出消费者实现的不同之处。

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("broadCastGroup");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");// 设置消费者的模式是广播模式defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);//从第一位开始消费defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

6. 延迟消息

延迟消息与普通消息的不同之处在于,它们要在指定的时间之后才会被传递。生产者并不会延迟发送消息,而是发送到topic里面,消费者延迟指定的时间进行消费。

6.1. 延迟消息生产者

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("scheduled_group");defaultMQProducer.setNamesrvAddr("172.31.186.180:9876");defaultMQProducer.start();for (int i = 0; i < 100; i++) {Message message = new Message("Schedule_topic", ("延迟消息测试" + i).getBytes());//设置延迟级别,默认有18个延迟级别,这个消息将延迟10秒消费message.setDelayTimeLevel(3);defaultMQProducer.send(message);}System.out.println("所有延迟消息发送完成");defaultMQProducer.shutdown();

延迟消息生产者与普通消息生产者主要的区别是延迟消息需要调用 setDelayTimeLevel 方法设置延迟级别,这里设置级别是3,则是延迟10秒。RocketMQ提供了18种延迟级别。可以在 RocketMQ的仪表板中的集群中的broker配置中找到。

image-20231003200021490

延迟消息的消费者与普通消息的消费者相同的。RocketMQ内部通过名为SCHEDULE_TOPIC_XXXX 的topic来存放延迟消息。

image-20231003201410410

7.批量消息

批量发送消息提高了传递消息的性能。官方建议批量消息的总大小不应超过1M,实际不应超过4M。如果超过4M的批量消息需要进行分批处理。同时设置broker的配置参数为4M(在broker的配置文件中修改:maxMessageSize=4194304)。核心代码如下:

	//4.创建消息List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100*100; i++) {// 创建消息,指定topic,以及消息体messageList.add(new Message("batch_topic", ("飞哥测试批量消息" + i).getBytes()));}//批量消息消息小于4M的处理SendResult send = defaultMQProducer.send(messageList);System.out.println(send);

8.过滤消息

使用tag过滤

在大多数情况下,标签是一种简单而有用的设计,可以用来选择你想要的消息。

首先是根据tag来过滤消息,生产者在发送消息的时候指定该消息的tag标签,消费者则可以根据tag来过滤消息。

8.1. 过滤消息生产者

这里定义了三个tag,分别是tagA,tagB以及tagC,生产者在生产消息的时候给每个消息指定不同的tag。

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("TagFilterTest", tags[i % tags.length], ("飞哥tag消息过滤" + tags[i % tags.length]).getBytes());SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();

8.2. 过滤消息的消费者

消费者过滤出了标签带有tagA以及tagC的消息进行消费。这里其实是broker将consumer需要的消息推给消费者。

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("TagFilterTest", "tagA||tagC");defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息体=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消费者已经启动");

image-20231003212919155

使用SQL过滤

SQL 功能可以通过发送消息时输入的属性进行一些计算,在RocketMQ定义的语法下,可以实现一些有趣的逻辑。

语法

RocketMQ只定义了一些基本的语法类支持这个特性。

1. 数值比较:如 `>`,`>=`,`<=`,`BETWEEN`,`=`;
2. 字符比较:如 `=`,'<>',`IN`;
3. `IS NULL` 或 `IS NOT NULL` ;
4. 逻辑`AND`,`OR`,`NOT`;

常量类型有:

1. 数字,如 123,
2. 字符,如 'abc',必须用单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE` 或 `FALSE`;

SQL过滤生产者

生产者主要设置属性过滤 message.putUserProperty("a", String.valueOf(i)); 表示第一条消息键值对是 a=0,第二条消息键值对是a=1。

	DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("SQLFilterTest", tags[i % tags.length], ("飞哥sql消息过滤" + tags[i % tags.length]).getBytes());message.putUserProperty("a", String.valueOf(i));SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();

SQL过滤消费者:

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("SQLFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA','tagC'))"+" and (a is null and a between 0 and 3)"));defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息体=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消费者已经启动");

如果运行报 The broker does not support consumer to filter message by SQL92

image-20231003221207618

则需要修改 broker.conf 文件,增加如下配置:

# 开启对 propertyfilter的支持
enablePropertyFilter = true 
filterSupportRetry = true

然后重启broker。

总结

本文介绍了局部顺序消息,全局顺序消息,广播消息,延迟消息,以及如何批量发送消息和过滤消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/165171.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue2升级到vue2.7

vue2升级到vue2.7 小小的改进,大大的提升 只需要简单修改,开发体验得到大大提升. 为什么要升级Vue2.7 不能拒绝的理由: 组合式 API(解决mixins问题:命名冲突,隐式依赖)单文件组件内的 <script setup>语法模板表达式中支持 ESNext 语法(可选链:?.、空值合并:??)单文…

Windows 钉钉多开 dingtalkRC版

亲测可用 下载链接&#xff1a; https://dtapp-pub.dingtalk.com/dingtalk-desktop/win_installer/RC/DingTalk_v6.5.20-RC.7229101.exe

修改echarts的tooltip样式 折线图如何配置阴影并实现渐变色和自适应

图片展示 一、引入echarts 这里不用多解释 vue里使用 import echarts from “echarts”; html页面引用js文件或用script标签引用 二、定义一个具有宽高的dom div <div id"echart-broken" style"width:400px;height: 200px;"></div>三、定义…

Softing为连接PROFIBUS网络提供多种接口产品方案

一 应用广泛的PROFIBUS网络 PROFIBUS是基于统一、标准且独立于应用的通信协议。据PI-China统计&#xff0c;在工业领域里早已有近5090万个PROFIBUS设备被安装在了超过900万节点中。PROFIBUS网络的广泛应用得益于PROFIBUS协议的开放性——用户可以很方便地在PROFIBUS网络的任意…

NXP官方uboot针对ALPHA开发板网络驱动更改说明三

一. 简介 前几篇文章学习了 在 NXP官方uboot上做网络驱动的一部分更改。地址如下&#xff1a; ALPHA开发板网络方案说明-CSDN博客 NXP官方uboot针对ALPHA开发板网络驱动更改说明一-CSDN博客 NXP官方uboot针对ALPHA开发板网络驱动更改说明二-CSDN博客 本文继续来学习在 NXP官…

C#,数值计算——分类与推理Phylagglomnode的计算方法与源程序

1 文本格式 using System; using System.Collections.Generic; namespace Legalsoft.Truffer { public class Phylagglomnode { public int mo { get; set; } public int ldau { get; set; } public int rdau { get; set; } public …

hexo发生错误 Error: Spawn failed

错误描述 仓库中有东西&#xff0c;运行如下命令后报错 hexo d报错提示: 原因分析: 看别人的博客是用git进行push或hexo d的时候改变了一些.deploy_git文件下的内容&#xff0c;这个.deploy_git的内容对于hexo来说可能是系统文件&#xff0c;这里挖坑 解决办法 一个个的…

第一节——vue安装+前端工程化

作者&#xff1a;尤雨溪 官网&#xff1a;简介 | Vue.js 脚手架文档 创建一个项目 | Vue CLI 一、概念&#xff08;了解&#xff09; 是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是&#xff0c;Vue 被设计为可以自底向上逐层应用。Vue 的核心库只关注视图层&…

简单谈谈我参加数据分析省赛的感受与体会

数据分析省赛的感受与体会 概要考试前的感受与体会考试注意事项小结 概要 大数据分析省赛指的是在省级范围内举办的大数据分析竞赛活动。该竞赛旨在鼓励和推动大数据分析领域的技术创新和人才培养&#xff0c;促进大数据技术与应用的深度融合&#xff0c;切实解决实际问题。参…

一些经典的神经网络(第19天)

1. 经典神经网络&#xff08;LeNet&#xff09; LeNet是早期成功的神经网络&#xff1b; 先使用卷积层来学习图片空间信息 然后使用全连接层来转到到类别空间 【通过在卷积层后加入激活函数&#xff0c;可以引入非线性、增加模型的表达能力、增强稀疏性和解决梯度消失等问题…

【R】数据相关性的可视化

一千零一技|相关性分析及其可视化&#xff1a;copy&paste&#xff0c;搞定 .libPaths(c("/bioinfo/home/software/miniconda3/envs/R4.0/lib/R/library")) #data("mtcars") library("PerformanceAnalytics") # pdf("test.pdf") #…

《动手学深度学习 Pytorch版》 9.3 深度循环神经网络

将多层循环神经网络堆叠在一起&#xff0c;通过对几个简单层的组合&#xff0c;产生一个灵活的机制。其中的数据可能与不同层的堆叠有关。 9.3.1 函数依赖关系 将深度架构中的函数依赖关系形式化&#xff0c;第 l l l 个隐藏层的隐状态表达式为&#xff1a; H t ( l ) ϕ l …

2023年信息院学生科协第二次硬件培训

2023年信息院学生科协第二次硬件培训 前言一、51单片机简介1、什么是单片机2、主流单片机及其编程语言3、单片机的应用4、单片机开发软件 二、GPIO&#xff08;点亮LED&#xff09;1、GPIO简介2、LED简介3、硬件设计4、软件设计 三、GPIO&#xff08;独立按键&#xff09;1、按…

数据结构--线性表回顾

目录 线性表 1.定义 2.线性表的基本操作 3.顺序表的定义 3.1顺序表的实现--静态分配 3.2顺序表的实现--动态分配 4顺序表的插入、删除 4.1插入操作的时间复杂度 4.2顺序表的删除操作-时间复杂度 5 顺序表的查找 5.1按位查找 5.2 动态分配的方式 5.3按位查找的时间…

Halcon手眼标定

手眼标定&#xff08;参考&#xff1a;B站王佳琪老师) 这里说的手眼标定中的手指的是机械手或者电机运动的轴&#xff0c;眼表示摄像头 就是两个空间坐标系的转换&#xff0c;这个转换需要一个转换矩阵&#xff0c;那么转换矩阵需要根据两个坐标系的对应的九个点来通过vec_to…

如何开发出来一款解决抖音本地生活的软件营销工具?

一、智能剪辑、矩阵分发、无人直播、爆款文案于一体独立应用开发 抖去推----主要针对本地生活的----移动端(小程序软件系统&#xff0c;目前是全国源头独立开发)&#xff0c;开发功能大拆解分享&#xff0c;功能大拆解&#xff1a; 7大模型剪辑法&#xff08;数学阶乘&#x…

10月20日星期五今日早报简报微语报早读

10月20日星期五&#xff0c;农历九月初六&#xff0c;早报微语早读分享。 1、上海4岁走失女童遗体在宁波市某滩涂被发现&#xff0c;排除刑事案件&#xff1b; 2、中国多个实体和个人被指涉伊朗军工制造将遭美国非法单边制裁&#xff0c;外交部&#xff1a;坚决反对&#xff…

dubbogo-1 基础rpc服务

文章目录 基本环境处理编译pb接口开启rpc调用业务观察qa1 能取出protoc里面的字段值吗&#xff1f; 基本环境处理 https://cn.dubbo.apache.org/zh-cn/overview/quickstart/go/install/ 这里没有 protoc-gen-go --version 执行 go get -u github.com/golang/protobuf/protoc…

初出茅庐的小李博客之Windows11运行Linux记录

安装教程 超简单&#xff0c;不安装虚拟机&#xff0c;Windows11运行Linuxhttps://zhuanlan.zhihu.com/p/393484912 注意事项 出现错误有可能是少了驱动 驱动下载地址 https://link.zhihu.com/?targethttps%3A//wslstorestorage.blob.core.windows.net/wslblob/wsl_updat…