RocketMq 顺序消费、分区消息、延迟发送消息、Topic、tag分类 实战 (消费者) (三)

消费端配置
如下所示:是消费者的配置类,有以下几点需要注意的地方
1、是TargetMessageListener这个监听类(下文会把这个监听类的具体代码贴出来),需要把这个监听类订阅。
2、rocketMqDcProperties.getTargetProperties()这个方法里面有相关的配置信息(这里面有绑定消费者是那个组,因为一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致),具体代码见下文
3、subscription.setTopic(rocketMqDcProperties.getOrderTopic()) 是绑定Topic,这个Topic跟上篇文章生产者的Topic是一致的,这样就能保证消费者能准确消费到生产者发送的消息
4、subscription.setExpression(MqTagEnum.target.name()); 这个是一个消息的过滤,也是一个对消息的具体分类。详细用法请参考链接:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/message-filtering?spm=a2c4g.11186623.0.i38#concept-2047069
综上所述:
这段代码主要是用于配置一个 OrderConsumerBean 对象,设置其属性和订阅关系

@Configuration
public class TargetConsumerClient {@Autowiredprivate RocketMqDcProperties rocketMqDcProperties;@Autowiredprivate TargetMessageListener messageListener;@ConditionalOnProperty(name = "rocket.mq.dc.enabled", havingValue = "true", matchIfMissing = true)@Bean(initMethod = "start", destroyMethod = "shutdown",name = {"buildTargetConsumer"})public OrderConsumerBean buildTargetConsumer() {OrderConsumerBean orderConsumerBean = new OrderConsumerBean();//配置文件orderConsumerBean.setProperties(rocketMqDcProperties.getTargetProperties());//订阅关系Map<Subscription, MessageOrderListener> subscriptionTable = new HashMap<Subscription, MessageOrderListener>();Subscription subscription = new Subscription();subscription.setTopic(rocketMqDcProperties.getOrderTopic());subscription.setExpression(MqTagEnum.target.name());subscriptionTable.put(subscription, messageListener);orderConsumerBean.setSubscriptionTable(subscriptionTable);return orderConsumerBean;}
}
	public Properties getTargetProperties() {return this.getProperties(this.targetGroupId);}private Properties getProperties(String groupId) {Properties properties = new Properties();properties.setProperty("AccessKey", this.accessKey);properties.setProperty("SecretKey", this.secretKey);properties.setProperty("NAMESRV_ADDR", this.nameSrvAddr);properties.setProperty("GROUP_ID", groupId);properties.setProperty("ConsumeThreadNums", this.getConsumeThreadNums().toString());properties.setProperty("maxReconsumeTimes", this.getMaxReconsumeTimes().toString());properties.setProperty("consumeTimeout", this.getConsumeTimeout().toString());properties.setProperty("suspendTimeMillis", this.getSuspendTimeMillis().toString());return properties;}    

消费端代码
如下所示:TargetMessageListener 实现了MessageOrderListener接口,并如上文所示,其和OrderConsumerBean也绑定了订阅关系。

@Slf4j
@Component
public class TargetMessageListener implements MessageOrderListener {@Autowiredprivate MqMessageRecordDao mqMessageRecordDao;@Autowiredprivate TargetService targetServiceImpl;@Overridepublic OrderAction consume(final Message message, final ConsumeOrderContext context) {log.info("MQ消息消费者监听消息内容:{}", message);MqMessageRecordMo mqMessageRecordMo = new MqMessageRecordMo();try {String body = new String(message.getBody());String tag = message.getTag();mqMessageRecordMo.setMsgId(message.getMsgID());mqMessageRecordMo.setOrderTopic(message.getTopic());mqMessageRecordMo.setProducerIp(message.getBornHost());mqMessageRecordMo.setTag(tag);//mqMessageRecordMo.setMessageKey(message.getKey());mqMessageRecordMo.setShardingKey(message.getShardingKey());mqMessageRecordMo.setBodyJson(body)mqMessageRecordMo.setProducerTime(LocalDateLUtils.timestampToDatetime(message.getBornTimestamp()));mqMessageRecordMo.setCreateTime(LocalDateTime.now());mqMessageRecordMo.setPMsgId(message.getUserProperties("pMsgId"));log.info("MQ消费者消息消费成功,解析并处理相应的业务逻辑, tag = {},key = {},messageId = {}", tag, message.getShardingKey(),message.getMsgID());DataBaseEnum dataBase = DataBaseEnum.getEnum(message.getShardingKey());targetServiceImpl.process(message.getMsgID(),dataBase,body);log.info("MQ消息体消费监听解析结果:{}", body);mqMessageRecordMo.setIsSuccess(true);return OrderAction.Success;} catch (Exception e) {//消费失败,挂起当前队列// 存储错误消息,重试,记录日志/*log.error("target MQ消费者消息监听消息业务逻辑处理失败:",e);mqMessageRecordMo.setIsSuccess(false);mqMessageRecordMo.setErrorMsg(e.getMessage());return OrderAction.Success;} finally {mqMessageRecordDao.save(mqMessageRecordMo);}}
}            

在这里想讲下顺序消息
顺序消息
顺序消息可以保证消息的消费顺序和发送的顺序一致,即先发送的先消费,后发送的后消费,常用于金融证券、电商业务等对消息指令顺序有严格要求的场景。本文介绍云消息队列 RocketMQ 版顺序消息的概念、适用场景、实现原理以及使用过程中的注意事项。
什么是顺序消息
顺序消息是云消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
顺序消息分为分区顺序消息和全局顺序消息。
分区顺序消息
对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
适用场景
适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
示例
用户注册需要发送验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
阿里巴巴集团内部电商系统均使用分区顺序消息,既保证业务的顺序,同时又能保证业务的高性能。
全局顺序消息
对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
适用场景
适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
示例
在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。
说明
全局顺序消息实际上是一种特殊的分区顺序消息,即Topic中只有一个分区,因此全局顺序和分区顺序的实现原理相同。因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高
如何实现顺序消息
在这里插入图片描述
在云消息队列 RocketMQ 版中,消息的顺序需要由以下三个阶段保证:
消息发送
如上图所示,A1、B1、A2、A3、B2、B3是订单A和订单B的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单A的消息发送和消费都按照A1、A2、A3的顺序。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时云消息队列 RocketMQ 版支持将Sharding Key相同(例如同一订单号)的消息序路由到一个队列中。
云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照同一生产者发送消息的时序。不同生产者、不同线程并发产生的消息,云消息队列 RocketMQ 版服务端无法判定消息的先后顺序。
消息存储
如上图所示,顺序消息的Topic中,每个逻辑队列对应一个物理队列,当消息按照顺序发送到Topic中的逻辑队列时,每个分区的消息将按照同样的顺序存储到对应的物理队列中。
消息消费
云消息队列 RocketMQ 版按照存储的顺序将消息投递给Consumer,Consumer收到消息后也不对消息顺序做任何处理,按照接收到的顺序进行消费。
Consumer消费消息时,同一Sharding Key的消息使用单线程消费,保证消息消费顺序和存储顺序一致,最终实现消费顺序和发布顺序的一致
注意事项
a、同一个Group ID只对应一种类型的Topic,即不同时用于顺序消息和无序消息的收发。
b、对于全局顺序消息,建议消息不要有阻塞。同时运行多个实例,是为了防止工作实例意外退出而导致业务中断。当工作实例退出时,其他实例可以立即接手工作,不会导致业务中断,实际工作的只会有一个实例。
c、云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果发送方有多个生产者或者有多个线程并发发送消息,则此时只能以到达云消息队列 RocketMQ 版服务端的时序作为消息顺序的依据,和业务侧的发送顺序未必一致

顺序消息常见问题
a、同一条消息是否可以既是顺序消息,又是定时消息和事务消息?
不可以。顺序消息、定时消息、事务消息是不同的消息类型,三者是互斥关系,不能叠加在一起使用。

b、顺序消息支持哪些地域?
支持云消息队列 RocketMQ 版所有公共云地域和金融云地域。

c、为什么全局顺序消息性能一般?
全局顺序消息是严格按照FIFO的消息阻塞原则,即上一条消息没有被成功消费,那么下一条消息会一直被存储到Topic队列中。如果想提高全局顺序消息的TPS,可以升级实例配置,同时消息客户端应用尽量减少处理本地业务逻辑的耗时。

d、顺序消息支持哪种消息发送方式?
顺序消息只支持可靠同步发送方式,不支持异步发送方式,否则将无法严格保证顺序。

e、顺序消息是否支持集群消费和广播消费?
顺序消息暂时仅支持集群消费模式,不支持广播消费模式。
以上文档链接来源:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/ordered-messages?spm=a2c4g.11186623.0.0.34b428e5LL1Jlh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/284481.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0基础 三个月掌握C语言(13)-下

数据在内存中的存储 浮点数在内存中的存储 常见的浮点数&#xff1a;3.141592、1E10等 浮点数家族包括&#xff1a;float、double、long double类型 浮点数表示的范围&#xff1a;在float.h中定义 练习 关于&#xff08;float*)&n&#xff1a; &n&#xff1a;这是一…

字符串哈希(c++)

1、定义 字符串哈希把不同的字符串映射成不同的整数 1.1、规则 1、把字符串映射成一个P进制数字 对于一个长度为n的字符串s 这样定义Hash函数&#xff1a; 例如&#xff1a;字符串abc&#xff0c;其哈希函数值为ap^2 bp^1 c; 即97 * 131^2 98 * 131^1 99; 2、两个字符串不一…

【计算机网络篇】数据链路层(3)差错检测

文章目录 &#x1f95a;误码&#x1f354;两种常见的检错技术⭐奇偶校验⭐循环冗余校验&#x1f388;例子 &#x1f95a;误码 误码首先介绍误码的相关概念 &#x1f354;两种常见的检错技术 ⭐奇偶校验 奇校验是在待发送的数据后面添加1个校验位&#xff0c;使得添加该校验…

C++|类封装、类的分文件编写练习:设计立方体类、点和圆的关系

文章目录 练习案例1&#xff1a;设计立方体类CPP代码 练习案例2:点和圆的关系CPP代码 代码总结类的分文件编写 练习案例1&#xff1a;设计立方体类 设计立方体类(Cube) 求出立方体的面积和体积 分别用全局函数和成员函数判断两个立方体是否相等。 CPP代码 class Cube { pub…

GPT4.0

GPT4.0 支持官网所有功能以及所有第三方GPTS&#xff0c;完全同步官网。无需魔法&#xff0c;填写授权码直达官网。全天超18小时维护&#xff0c;无需担心不稳定。没有永久卡&#xff0c;3.5免费提供&#xff0c;4.0可以按需下单即可&#xff0c;不存在跑路。 需要的联系

详细安装步骤:vue.js 三种方式安装(vue-cli)

Vue.js&#xff08;读音 /vjuː/, 类似于 view&#xff09;是一个构建数据驱动的 web 界面的渐进式框架。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。它不仅易于上手&#xff0c;还便于与第三方库或既有项目整合。 三种 Vue.js 的安装方法&…

unity 多屏幕操作

想了解基础操作请移步&#xff1a;&#xff08;重点是大佬写的好&#xff0c;这里就不再赘述&#xff09; Unity 基础 之 使用 Display 简单的实现 多屏幕显示的效果_unity display-CSDN博客 在panel上也可以通过获取 Canvas&#xff0c;来达到切换多屏幕的操作&#xff0c; …

Flutter-仿携程首页类型切换

效果 唠叨 闲来无事&#xff0c;不小心下载了携程app&#xff0c;还幻想可以去旅游一番&#xff0c;奈何自己运气不好&#xff0c;自从高考时第一次吹空调导致自己拉肚子考试&#xff0c;物理&#xff0c;数学考了一半就交卷&#xff0c;英语2B铅笔除了问题&#xff0c;导致原…

行政工作常用表格

企业管制制度大全https://www.chuandao100.com/279.html

JVM垃圾收集器你会选择吗?

目录 一、Serial收集器 二、ParNew收集器 三、Paralle Scavenge 四、Serial Old 五、Parallel Old 六、CMS收集器 6.1 CMS对处理器资源非常敏感 6.2 CMS容易出现浮动垃圾 6.3 产生内存碎片 七、G1 收集器 八、如何选择合适的垃圾收集器 JVM 垃圾收集器是Java虚…

生成模型概述

文章目录 生成模型概述一、生成模型类型二、生成对抗网络&#xff08;GANs&#xff09;三、自回归模型&#xff08;Autoregressive Models&#xff09;四、扩散模型&#xff08;Diffusion Models&#xff09;五、流模型&#xff08;Flow-based Models&#xff09;参考 生成模型…

【数据分享】1929-2023年全球站点的逐日平均海平面压力(Shp\Excel\免费获取)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、能见度等指标&#xff0c;说到气象数据&#xff0c;最详细的气象数据是具体到气象监测站点的数据&#xff01; 有关气象指标的监测站点数据&#xff0c;之前我们分享过1929-2023年全球气象站…

CPP容器vector和list,priority_queue定义比较器

#include <iostream> #include <bits/stdc.h> using namespace std; struct VecCmp{bool operator()(int& a,int& b){return a>b;/*** 对于vector和list容器&#xff0c;这里写了&#xff1e;就是从大到小* 对于priority_queue容器&#xff0c;这里写…

【阅读论文】When Large Language Models Meet Vector Databases: A Survey

摘要 本调查探讨了大型语言模型&#xff08;LLM&#xff09;和向量数据库&#xff08;VecDB&#xff09;之间的协同潜力&#xff0c;这是一个新兴但迅速发展的研究领域。随着LLM的广泛应用&#xff0c;出现了许多挑战&#xff0c;包括产生虚构内容、知识过时、商业应用成本高昂…

【算法与数据结构】 C语言实现单链表队列详解

文章目录 &#x1f4dd;队列&#x1f320; 数据结构设计&#x1f309;初始化队列函数 &#x1f320;销毁队列函数&#x1f309;入队函数 &#x1f320;出队函数&#x1f309;获取队首元素函数 &#x1f320;获取队尾元素函数&#x1f309; 判断队列是否为空函数&#x1f309;获…

如何让uni-app开发的H5页面顶部原生标题和小程序的顶部标题不一致?

如何让标题1和标题2不一样&#xff1f; 修改根目录下的App.vue&#xff08;核心代码如下&#xff09; <script>export default {onLaunch() {// 监听各种跳转----------------------------------------[navigateTo, redirectTo, reLaunch, switchTab, navigateBack, ].…

Leetcode 226. 翻转二叉树

心路历程&#xff1a; 翻转一瞬间没什么思路&#xff0c;其实就是挨个把每个结点的左右子树都翻转了。主要不要按照左右子树去思考&#xff0c;要按照结点去思考。 翻转既可以从上到下翻转&#xff08;前序遍历&#xff09;&#xff0c;也可以从下到上翻转&#xff08;后序遍历…

等保测评-Windows服务器

安全计算环境 身份鉴别 a)应对登录的用户进行身份标识和鉴别&#xff0c;身份标识具有唯一性&#xff0c;身份鉴别信息具有复杂度要求并定期更换 winr //运行 secpol.msc //本地安全策略&#xff0c;点击账户策略中的密码策略 注意修改的时候确保当前密码是满足条件的 b)应…

应急响应-Linux(1)

应急响应-Linux(1) 黑客的IP地址 思路&#xff1a; 一般系统中马之后会有进程连接黑客的主机&#xff0c;可以使用netstat -anpt查看下当前进程的连接&#xff0c;此处查看到没有后 &#xff0c;可以从系统服务开始查找&#xff0c;系统的服务日志一般都会保存相关访问信息&…

idea 没有代码提示解决方法

itellij idea 没有代码提示解决方法 今天写代码发现没有代码提示了&#xff0c;很难受。 直接上解决方法 设置 File-Settings-Editor-General-Code Completion&#xff1a;勾选Show suggestrions as you type 我的是这个问题&#xff0c;勾选上就ok了 取消节能模式 如果…