消息队列技术选型:这 7 种消息场景一定要考虑!

大家好,我是君哥。

我们在做消息队列的技术选型时,往往会结合业务场景进行考虑。今天来聊一聊消息队列可能会用到的 7 种消息场景。

1 普通消息

消息队列最基础的功能就是生产者发送消息、Broker 保存消息,消费者来消费消息,以此实现系统解耦、削峰填谷的作用。

普通消息是消息队列必备的消息类型,也是系统使用场景最多的一种消息。

2 顺序消息

顺序消息是指生产者发送消息的顺序和消费者消费消息的顺序是一致的。比如在一个电商场景,同一个用户提交订单、订单支付、订单出库,这三个消息消费者需要按照顺序来进行消费。如下图:

顺序消息的实现并不容易,原因如下:

  • 生产者集群中,有多个生产者发送消息,网络延迟不一样,很难保证发送到 Broker 的消息落盘顺序是一致的;
  • 如果 Broker 有多个分区或队列,生产者发送的消息会进入多个分区,也无法保证顺序消费;
  • 如果有多个消费者来异步消费同一个分区,很难保证消费顺序跟生产者发送顺序一致。

要保证消息有序,需要满足两个条件:

  • 同一个生产者必须同步发送消息到同一个分区;
  • 一个分区只能给同一个消费者消费。

如下图:

上面第二个条件是比较容易实现的,一个分区绑定一个消费者就可以,主要是第一个条件。

在主流消息队列的实现中,Kafka 和 Pulsar 的实现方式类似,生产者给消息赋值一个 key,对 key 做 Hash 运算来指定消息发送到哪一个分区。比如上面电商的例子,对同一个用户的一笔订单,提交订单、订单支付、订单出库这三个消息赋值同一个 key,就可以把这三条消息发送到同一个分区。

对于 RocketMQ,生产者在发送消息的时候,可以通过 MessageQueueSelector 指定把消息投递到那个 MessageQueue,如下图:

示例代码如下:

public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}
}

RabbitMQ 的实现是 Exchange 根据设置好的 Route Key 将数据路由到不同的 Queue 中。示例代码如下:

@Resource
private AmqpTemplate rabbitTemplate;public void send1(String message) {rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);
}

3 延时消息

或者也叫定时消息,是指消息发送后不会立即被消费,而是指定一个时间,到时间后再消费。经典的场景比如电商购物时,30 分钟未支付订单,让订单自动失效。

3.1 RocketMQ 实现

RocketMQ 定义了 18 个延时级别,每个延时级别对应一个延时时间。下面如果延迟级别是 3,则消息会延迟 10s 才会拉取。

//MessageStoreConfig类
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

RocketMQ 的延时消息如下图:

生产者把消费发送到 Broker 后,Broker 首先把消息保存到 SCHEDULE_TOPIC_XXXX 这个 Topic,然后调度任务会判断是否到期,如果到期,会把消息从 SCHEDULE_TOPIC_XXXX 取出投递到原始的 queue,这样消费者就可以消费到了。

RocketMQ 的延时消息只支持最大两个小时的延时,不过 RocketMQ5.0 基于时间轮算法实现了定时消息,解决了这个问题。

3.2 Pulsar 实现

Pulsar 的实现如下图:

Pulsar 的延时消息首先会写入一个 Delayed Message Tracker 的数据结构中,Delayed Message Tracker 根据延时时间构建 delayed index 优先级队列。消费者拉取消息时,首先去 Delayed Message Tracker 检查是否有到期的消息。如果有则直接拉取进行消费。

3.3 RabbitMQ 实现

RabbitMQ 的实现方式有两种,一种是投递到普通队列都不消费,等消息过期后被投递到死信队列,消费者消费死信队列。如下图:

第二种方式是生产者发送消息时,先发送到本地 Mnesia 数据库,消息到期后定时器再将消息投递到 broker。

3.4 Kafka 实现

Kafka 本身并没有延时队列,不过可以通过生产者拦截器来实现消息延时发送,也可以定义延时 Topic,利用类似 RocketMQ 的方案来实现延时消息。

4 事务消息

事务消息是指生产消息和消费消息满足事务的特性。

RabbitMQ 和 Kafka 的事务消息都是只支持生产消息的事务特性,即一批消息要不全部发送成功,要不全部发送失败。

RabbitMQ 通过 Channel 来开启事务消息,代码如下:

ConnectionFactory factory=new ConnectionFactory();
connection=factory.newConnection();
Channel channel=connection.createChannel();
//开启事务
channel.txSelect();
channel.basicPublish("directTransactionExchange","transactionRoutingKey",null,message.getBytes("utf-8"));
//提交事务 或者 channel.txRollback()回滚事务
channel.txCommit();

Kafka 可以给多个生产者设置同一个事务 ID ,从而把多个 Topic 、多个 Partition 放在一个事务中,实现原子性写入。

Pulsar 的事务消息对于事务语义的定义是:允许事件流应用将消费、处理、生产消息整个过程定义为一个原子操作。可见,Pulsar 的事务消息可以覆盖消息流整个过程。

RocketMQ 的事务消息是通过 half 消息来实现的。以电商购物场景来看,账户服务扣减账户金额后,发送消息给 Broker,库存服务来消费这条消息进行扣减库存。如下图:

可见,RocketMQ 只能保证生产者发送消息和本地事务的原子性,并不能保证消费消息的原子性。

5 轨迹消息

轨迹消息主要用于跟踪消息的生命周期,当消息丢失时可以很方便地找出原因。

轨迹消息也跟普通消息一样,也需要存储和查询,也会占用消息队列的资源,所以选择轨迹消息要考虑下面几点:

  • 消息生命周期的关键节点一定要记录;
  • 不能影响正常消息的发送和消费性能;
  • 不能影响 Broker 的消息存储性能;
  • 要考虑消息查询维度和性能。

RabbitMQ Broker 实现了轨迹消息的功能,打开 Trace 开关,就可以把轨迹消息发送到 amq.rabbitmq.trace 这个 exchange,但是要考虑轨迹消息会不会给 Broker 造成 压力进而导致消息积压。RabbitMQ 的生产者和消费者都没有实现轨迹消息,需要开发者自己来实现。

RocketMQ 生产者、Broker 和消费者都实现了轨迹消息,不过默认是关闭的,需要手工开启。

使用轨迹消息,需要考虑记录哪些节点、存储介质、性能、查询方式等问题。

6 死信队列

在消息队列中,死信队列主要应对一些异常的情况,如下图:

RocketMQ 实现了消费端的死信队列,当消费者消费失败时,会进行重试,如果重试 16 次还是失败,则这条消息会被发送到死信队列。

RabbitMQ 实现了生产者和 Broker 的死信队列,下面三种情况,消息会被发送到死信队列:

  • 生产者发送消息被拒绝,并且 requeue 参数设置为 false;
  • Broker 消息过期了;
  • 队列达到最大长度。

RabbitMQ 消息变成死信消息后,会被发送到死信交换机(Dead-Letter-Exchange)。

7 优先级消息

有一些业务场景下,我们需要优先处理一些消息,比如银行里面的金卡客户、银卡客户优先级高于普通客户,他们的业务需要优先处理。如下图:

主流消息队列中,RabbitMQ 是支持优先级队列的,代码如下:

ConnectionFactory factory=new ConnectionFactory();
connection=factory.newConnection();
Channel channel=connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
//设置优先级为 5
args.put("x-max-priority", 5);
channel.queueDeclare("my-priority-queue", true, false, false, args);

8 总结

消息队列技术选型,要考虑的因素很多,本文主要从业务场景来分析需要考虑的因素,同时技术上也需要考虑运维复杂度、业务规模、社区活跃度、学习成本等因素。希望本文对你使用消息队列有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/149476.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Stm32_标准库_6_八种输入出模式

上拉输入与下拉输入 上拉输入&#xff1a;电平默认为高电平&#xff0c;只有当外部输入为低电平时&#xff0c;此IO口电平才会被拉低&#xff0c;经过触发器&#xff0c;再到寄存器&#xff0c;最后传入CPU GPIO_Mode_IPU&#xff1b;下拉输入&#xff1a;电平默认为低电平&am…

机器学习小知识--面试得一塌糊涂

机器学习中需要归一化的算法有SVM, 逻辑回归&#xff0c;神经网络&#xff0c;KNN, 线性回归&#xff0c;而树形结构的不需要归一化&#xff0c;因为它们不关心变量的值&#xff0c;而是关心变量分布和变量之间的条件概率&#xff0c;如决策树&#xff0c;随机森林&#xff0c;…

ExoPlayer架构详解与源码分析(3)——Timeline

系列文章目录 ExoPlayer架构详解与源码分析&#xff08;1&#xff09;——前言 ExoPlayer架构详解与源码分析&#xff08;2&#xff09;——Player 文章目录 系列文章目录前言Timeline单文件或者点播流媒体文件播放列表或者点播流列表有限可播的直播流无限可播的直播流有多个P…

速度轴模拟量控制FB(博途SCL+三菱ST代码)

利用模拟量实现变频器的正反转直接控制具体方法,请参考下面文章链接: 模拟量0-10V信号控制变频器实现正反转速度随动_RXXW_Dor的博客-CSDN博客比例随动专栏有系列文章介绍,大家可以查看相关文章,链接如下:绕线机-排线伺服比例随动功能块(梯形图+SCL代码)_RXXW_Dor的博客…

Python如何实现数据驱动的接口自动化测试

大家在接口测试的过程中&#xff0c;很多时候会用到对CSV的读取操作&#xff0c;本文主要说明Python3对CSV的写入和读取。下面话不多说了&#xff0c;来一起看看详细的介绍吧。 1、需求 某API&#xff0c;GET方法&#xff0c;token,mobile,email三个参数 token为必填项mobil…

比特米盒子刷CoreELEC

CoreELEC就晶辰定制的Kodi版本&#xff0c;比特米盒子在刷入ATV后通过切换卡载系统可以安装CoreELEC即可安装&#xff0c;实现影音播放自由 1、U盘启动CoreELEC 1.1 、安装【安卓】切换卡载系统 通过U盘在已经刷好atv6.0的比特米盒子安装“切换卡载系统”。比特米盒子刷atv6.…

uni-app:js修改元素样式(宽度、外边距)

效果 代码 1、在<view>元素上添加一个ref属性&#xff0c;用于在JavaScript代码中获取对该元素的引用&#xff1a;<view ref"myView" id"mybox"></view> 2、获取元素引用 &#xff1a;const viewElement this.$refs.myView.$el; 3、修改…

【Zookeeper专题】Zookeeper特性与节点数据类型详解

目录 前言前置知识课程内容一、Zookeeper介绍二、Zookeeper快速开始2.1 Zookeeper安装2.2 客户端命令行操作2.3 GUI工具 三、Zookeeper数据结构3.1 ZNode节点分类3.2 ZNode状态信息3.3 监听机制详解3.3.1 永久性Watch 3.4 节点ZNode特性总结3.5 应用场景详解3.5.1 统一命名服务…

广西建筑模板厂家-能强优品木业

广西作为中国西南地区的重要省份&#xff0c;建筑业蓬勃发展&#xff0c;建筑模板作为建筑施工的核心材料之一&#xff0c;在广西也有着广泛的需求。如果您正在寻找广西的建筑模板厂家&#xff0c;广西贵港市能强优品木业有限公司是一家备受认可的供应商。广西贵港市能强优品木…

八、【快速选择工具组】

文章目录 对象选择工具快速选择工具魔棒工具 对象选择工具 当我们选择对象选择工具时&#xff0c;需要先注意上边有一个循环的圆&#xff0c;它会进行内容识别&#xff0c;当识别完成会停止旋转。这个时候我们按住n键&#xff0c;或者将鼠标放上对应的图形时会出现选中的颜色。…

5分钟入门卷积算法

大家好啊&#xff0c;我是董董灿。 深度学习算法中&#xff0c;尤其是计算机视觉&#xff0c;卷积是无论如何都绕不过去的槛。 初学者看到这个算法后&#xff0c;很多是知其然不知其所以然&#xff0c;甚至不知道这个算法是做什么的&#xff0c;或者很疑惑&#xff0c;为什么…

在vue2中,v-model和.sync的区别

最近在封装一个弹窗组件时&#xff0c;用了比较复杂的逻辑去做显示和隐藏的逻辑&#xff0c;在查看同事的代码之后&#xff0c;才知道还有更简单的方法&#xff0c;自己已经忘了一些API. popup组件里统一的template&#xff1a; <div v-ifisShowPopup> // 弹窗内容 <…

oringin的x轴(按x轴规定值)绘制不规律的横坐标

1.双击x轴 2.选择刻度线标签 3.选择刻度

网络安全行业真的内卷了吗?网络安全就业就业必看

前言 有一个特别流行的词语叫做“内卷”&#xff1a; 城市内卷太严重了&#xff0c;年轻人不好找工作&#xff1b;教育内卷&#xff1b;考研内卷&#xff1b;当然还有计算机行业内卷…… 这里的内卷当然不是这个词原本的意思&#xff0c;而是“过剩”“饱和”的替代词。 按照…

实验三十五、LM117 稳压电源的设计

一、题目 利用 LM117 设计一个稳压电路&#xff0c;要求输出电压的调节范围为 5 ∼ 20 V 5\sim20\,\textrm V 5∼20V&#xff0c;最大负载电流为 400 mA 400\,\textrm{mA} 400mA。利用 Multisim 对所设计电路进行仿真&#xff0c;并测试所有性能指标。 二、仿真电路 仿真电…

Waves 14混音特效插件合集mac/win

Waves14是一款音频处理软件&#xff0c;主要用于音频编辑、混音和母带处理。该软件提供了各种插件&#xff0c;包括EQ、压缩、混响、延迟、失真等&#xff0c;以及一些专业的音频处理工具&#xff0c;如L2限幅器、Linear Phase EQ和多频道扬声器管理。 Mac软件下载&#xff1a;…

深度学习基础 2D卷积(1)

什么是2D卷积 2D参数量怎么计算 以pytorch为例子&#xff0c;2D卷积在设置的时候具有以下参数&#xff0c;具有输入通道的多少&#xff08;这个决定了卷积核的通道数量&#xff09;&#xff0c;滤波器数量&#xff0c;这个是有多少个滤波器&#xff0c;越多提取的特征就越有用…

10.1select并发服务器以及客户端

服务器&#xff1a; #include<myhead.h>//do-while只是为了不让花括号单独存在&#xff0c;并不循环 #define ERR_MSG(msg) do{\fprintf(stderr,"%d:",__LINE__);\perror(msg);\ }while(0);#define PORT 8888//端口号1024-49151 #define IP "192.168.2.5…

在Linux上安装QQ

第一步&#xff1a; 在QQ官网上复制Linux版QQ的下载链接 https://dldir1.qq.com/qqfile/qq/QQNT/b69de82d/linuxqq_3.2.1-17153_x86_64.rpm 第二步&#xff1a; 在Linux终端输入wget 加Linux版QQ的下载链接&#xff08;如果怕中途断开链接可以输入wget -c &#xff09; [rootn…

网络基础知识面试题2

VC++常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...)https://blog.csdn.net/chenlycly/article/details/124272585C++软件异常排查从入门到精通系列教程(专栏文章列表,欢迎订阅,持续更新...)