rocketmq顺序消费简述

概述

再引入mq解耦部分业务操作后,一些场景还需要顺序处理;
这就需要mq顺序消费了;

rocketmq的顺序消费关键点在于对messagequeue的有序消费;
一个topic下有多个messagequeue(默认是4个),而且这些messagequeue可能分布在不同的broker上;

要想有序消费,先要确保生产者要将有序消费的消息投递到同一个messagequeue中;
然后这个messagequeue只有1个消费者消费,并且这个消费者内部要对消息进行顺序消费

提升顺序消息的能力;
可以将不同业务场景的顺序消息投递到不同的messagequeue中(比如A业务投递到a队列B业务投递到b队列),先拆一部分;
然后消费的时候可以批量获取消息,减少和broker的交互次数

rocketmq版本:4.X
java版本:1.8

生产者顺序投递

可以通过相同的shardkey,把消息投递到同一个message queue

官方的代码demo
https://rocketmq.apache.org/docs/4.x/producer/03message2

public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {//arg就是消息发送时的对象;这个demo中是orderIdInteger id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}
}

消费者顺序消费

核心在于让消费者顺序处理message queue中的消息;
rocketmq1个message queue只会分配给同一个分组下的1个消费者,所以这个时候消费者只要保证顺序消费就行;

push模式(DefaultMQPushConsumer实例化)的消费者,一般有3个因素影响;
batchPullSize:一次从broker拉取消息的数量,拉取后按照下面2个参数分批次并行消费
consumeThreadMin和consumeThreadMax:消费者启动时有多少个线程去处理消息
consumeMessageBatchMaxSize:消费者每个线程每次处理的消息数量

可以使用org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly来替换org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently

public class Consumer {/*** 消费者实体对象*/private DefaultMQPushConsumer consumer;/*** 消费者组*/public static final String CONSUMER_GROUP = "consumer_group";/*** 通过构造函数 实例化对象*/public Consumer() throws MQClientException {consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);consumer.setNamesrvAddr("IP:9876");//TODO 这里真的是个坑,我product设置VipChannelEnabled(false),但消费者并没有设置这个参数,之前发送普通消息的时候也没有问题。能正常消费。//TODO 但在顺序消息时,consumer一直不消费消息了,找了好久都没有找到原因,直到我这里也设置为VipChannelEnabled(false),竟然才可以消费消息。consumer.setVipChannelEnabled(false);//订阅主题和 标签( * 代表所有标签)下信息consumer.subscribe(JmsConfig.TOPIC, "*");//注册消费的监听 这里注意顺序消费为MessageListenerOrderly 之前并发为ConsumeConcurrentlyContextconsumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {//获取消息MessageExt msg = msgs.get(0);//消费者获取消息 这里只输出 不做后面逻辑处理log.info("Consumer-线程名称={},消息={}", Thread.currentThread().getName(), new String(msg.getBody()));return ConsumeOrderlyStatus.SUCCESS;});consumer.start();}
}

MessageListenerConcurrently顺序消费解析

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start中可以看到,在启动时判断为顺序消费时;
会启动一个线程,对message对应的processQueue设置是否锁定;没有锁定时,在提交消息消费任务给消费线程池时会再次尝试加锁
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#start

消息在消费时,不管顺序消费还是并行消费都是提交给线程池去消费;顺序消费在提交给线程池时会对message queue使用synchronized

提交消息消费任务给线程池
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest

message queue进行加锁
org.apache.rocketmq.client.impl.consumer.MessageQueueLock#fetchLockObject

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/6968.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s简介,k8s环境搭建

目录 K8s简介环境搭建和准备工作修改主机名&#xff08;所有节点&#xff09;配置静态IP&#xff08;所有节点&#xff09;关闭防火墙和seLinux&#xff0c;清除iptables规则&#xff08;所有节点&#xff09;关闭交换分区&#xff08;所有节点&#xff09;修改/etc/hosts文件&…

net Core Ocelot(1)单地址,多地址

Ocelot 网关技术 》》》配置文件 》》》单地址 {"Routes": [{// 上游 》》 接受的请求//上游请求方法,可以设置特定的 HTTP 方法列表或设置空列表以允许其中任何方法"UpstreamHttpMethod": [ "Get", "Post" ],"UpstreamPathTe…

GIS 中的 SQLAlchemy:空间数据与数据库之间的桥梁

利用 SQLAlchemy 在现代应用程序中无缝集成地理空间数据导言 地理信息系统&#xff08;GIS&#xff09;在管理城市规划、环境监测和导航系统等各种应用的空间数据方面发挥着至关重要的作用。虽然 PostGIS 或 SpatiaLite 等专业地理空间数据库在处理空间数据方面非常出色&#…

Jmeter使用Request URL请求接口

简介 在Jmeter调试接口时&#xff0c;有时不清楚后端服务接口的具体路径&#xff0c;可以使用Request URL和cookie来实现接口请求。以下内容以使用cookie鉴权的接口举例。 步骤 ① 登录网站后获取具体的Request URL和cookie信息 通过浏览器获取到Request URL和cookie&#…

每日十题八股-2025年1月24日

1.面试官&#xff1a;Kafka 百万消息积压如何处理&#xff1f; 2.面试官&#xff1a;最多一次、至少一次和正好一次有什么区别? 3.面试官&#xff1a;你项目是怎么存密码的? 4.面试官&#xff1a;如何设计一个分布式ID&#xff1f; 5.面试官&#xff1a;单点登录是怎么工作的…

Docker—搭建Harbor和阿里云私有仓库

Harbor概述 Harbor是一个开源的企业级Docker Registry管理项目&#xff0c;由VMware公司开发。‌它的主要用途是帮助用户迅速搭建一个企业级的Docker Registry服务&#xff0c;提供比Docker官方公共镜像仓库更为丰富和安全的功能&#xff0c;特别适合企业环境使用。‌12 Harb…

基于Docker的Spark分布式集群

目录 1. 说明 2. 服务器规划 3. 步骤 3.1 要点 3.2 配置文件 3.2 访问Spark Master 4. 使用测试 5. 参考 1. 说明 以docker容器方式实现apache spark计算集群&#xff0c;能灵活的增减配置与worker数目。 2. 服务器规划 服务器 (1master, 3workers) ip开放端口备注ce…

C语言自定义数据类型详解(一)——结构体类型(上)

什么是自定义数据类型呢&#xff1f;顾名思义&#xff0c;就是我们用户自己定义和设置的类型。 在C语言中&#xff0c;我们的自定义数据类型一共有三种&#xff0c;它们分别是&#xff1a;结构体(struct)&#xff0c;枚举(enum)&#xff0c;联合(union)。接下来&#xff0c;我…

记录让cursor帮我给ruoyi-vue后台管理项目整合mybatis-plus

自己整合过程中会出现 work.web.exception.GlobalExceptionHandler :100 | 请求地址/admin/device/install/detail/1,发生未知异常. org.apache.ibatis.binding.BindingException: Invalid bound statement (not found): com.fire.mapper.DeviceInstallMapper.selectById at o…

HUMANITY’S LAST EXAM (HLE) 综述:人工智能领域的“最终考试”

论文地址&#xff1a;Humanity’s Last Exam 1. 背景与动机 随着大型语言模型&#xff08;LLMs&#xff09;能力的飞速发展&#xff0c;其在数学、编程、生物等领域的任务表现已超越人类。为了系统地衡量这些能力&#xff0c;LLMs 需要接受基准测试&#xff08;Benchmarks&…

利用大型语言模型在量化投资中实现自动化策略

“Automate Strategy Finding with LLM in Quant investment” 论文地址&#xff1a;https://arxiv.org/pdf/2409.06289 摘要 这个新提出的量化股票投资框架&#xff0c;利用大型语言模型&#xff08;LLMs&#xff09;与多智能体系统相结合的方法&#xff0c;通过LLMs从包括数…

OpenCV:在图像中添加高斯噪声、胡椒噪声

目录 在图像中添加高斯噪声 高斯噪声的特性 添加高斯噪声的实现 给图像添加胡椒噪声 实现胡椒噪声的步骤 相关阅读 OpenCV&#xff1a;图像处理中的低通滤波-CSDN博客 OpenCV&#xff1a;高通滤波之索贝尔、沙尔和拉普拉斯-CSDN博客 OpenCV&#xff1a;图像滤波、卷积与…

大数据学习(40)- Flink执行流

&&大数据学习&& &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 承认自己的无知&#xff0c;乃是开启智慧的大门 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4dd;支持一下博主哦&#x1f91…

Prometheus+Grafana监控minio对象存储

1. 安装 MinIO 步骤 1&#xff1a;下载 MinIO 二进制文件 wget https://dl.min.io/server/minio/release/linux-amd64/miniochmod x miniosudo mv minio /usr/local/bin/ 步骤 2&#xff1a;创建数据目录 sudo mkdir -p /data/miniosudo chown -R $USER:$USER /data/minio …

2025数学建模美赛|F题成品论文

国家安全政策与网络安全 摘要 随着互联网技术的迅猛发展&#xff0c;网络犯罪问题已成为全球网络安全中的重要研究课题&#xff0c;且网络犯罪的形式和影响日益复杂和严重。本文针对网络犯罪中的问题&#xff0c;基于多元回归分析和差异中的差异&#xff08;DiD&#xff09;思…

期权帮|如何利用股指期货进行对冲套利?

锦鲤三三每日分享期权知识&#xff0c;帮助期权新手及时有效地掌握即市趋势与新资讯&#xff01; 如何利用股指期货进行对冲套利&#xff1f; 对冲就是通过股指期货来平衡投资组合的风险。它分为正向与反向两种策略&#xff1a; &#xff08;1&#xff09;正向对冲&#xff…

QT 中 UDP 的使用

目录 一、UDP 简介 二、QT 中 UDP 编程的基本步骤 &#xff08;一&#xff09;包含头文件 &#xff08;二&#xff09;创建 UDP 套接字对象 &#xff08;三&#xff09;绑定端口 &#xff08;四&#xff09;发送数据 &#xff08;五&#xff09;接收数据 三、完整示例代…

Android BitmapShader简洁实现马赛克,Kotlin(二)

Android BitmapShader简洁实现马赛克&#xff0c;Kotlin&#xff08;二&#xff09; 这一篇 Android BitmapShader简洁实现马赛克&#xff0c;Kotlin&#xff08;一&#xff09;-CSDN博客 遗留一个问题&#xff0c;xml定义的MyView为wrap_content的宽高&#xff0c;如果改成其…

分布式光纤应变监测是一种高精度、分布式的监测技术

一、土木工程领域 桥梁结构健康监测 主跨应变监测&#xff1a;在大跨度桥梁的主跨部分&#xff0c;如悬索桥的主缆、斜拉桥的斜拉索和主梁&#xff0c;分布式光纤应变传感器可以沿着这些关键结构部件进行铺设。通过实时监测应变情况&#xff0c;能够精确捕捉到车辆荷载、风荷…

uniapp的插件开发发布指南

Hbuilder创建项目 项目根目录创建uni_modules 开发组件 发布到插件市场 填写发布说明&#xff08;未登录需要登录&#xff09; 点击提交 在终端可以看到 发布成功&#xff01; 插件市场查看