RocketMQ 消息传递模型

文章目录

  • 0. 前言
  • 1. RocketMQ的消息传递模型
    • 1.1. 同步发送
    • 1.2. 异步发送
    • 1.3. 单向发送
  • 2. RocketMQ的批量发送和消费
    • 2.1 批量发送
    • 2.2 批量消费
    • 2.3 Spring Boot集成RocketMQ官方starter 示例
  • 3. 总结
  • 4. 参考文档
  • 5. 源码地址

在这里插入图片描述

0. 前言

RocketMQ 支持6种消息传递方式,我们本次来聊三种消息传递模型,分别是可靠的同步传输、可靠的异步传输和单向传输。

  1. 可靠的同步传输(Reliable Synchronous Transmission):这是最常见的模型,生产者发送消息后,会等待消费者响应,确认消息已被消费者接收并处理。这种模式虽然可靠,但是由于需要等待确认,所以传输速度相对较慢。

  2. 可靠的异步传输(Reliable Asynchronous Transmission):在这种模型中,生产者发送消息后,不等待消费者的确认,直接返回,继续发送下一条消息。消费者在接收到消息后,会异步地确认消息。这种模式的传输速度较快,但是可能会存在消息丢失的风险。

  3. 单向传输(One-way Transmission):这种模型更加简单,生产者只负责发送消息,不关心消费者是否接收和处理,也不需要任何确认。这种模式通常用于对可靠性要求不高,但对速度要求高的场景,比如日志收集。

1. RocketMQ的消息传递模型

Spring boot 集成很简单
1.配置依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq.version}</version>
</dependency>
  1. application.properties文件中配置RocketMQ的相关属性:
spring.rocketmq.name-server=127.0.0.1:9876
spring.rocketmq.producer.group=producerGroup

1.1. 同步发送

  • 定义:
    RocketMQ同步发送是指生产者发送消息后,会在收到服务器返回确认的应答后才会发送下一条消息。这样发送消息的方式会增加消息发送的耗时,但能够确保消息被服务器成功接收。

  • 适用场景:
    对于一些重要的消息通知、短信通知、短信营销系统等,需要确保消息的准确无误的到达,可以采用RocketMQ的同步发送方式。

  • Springboot 集成使用示例:通过RocketMQTemplate的syncSend方法发送消息。

// 发送消息
@Autowired
private RocketMQTemplate rocketMQTemplate;public void sendMessage() {rocketMQTemplate.syncSend("my-topic", "Hello, RocketMQ");
}
  1. 同步发送方式会阻塞当前线程,直到服务器返回响应,因此需要考虑到这种方式可能会影响系统的吞吐量。
  2. RocketMQ的同步发送方式能够保证消息的可靠性,但也需要保证RocketMQ服务器的高可用,防止服务器出现问题导致消息丢失。
  3. 在使用RocketMQ的同时,还需要注意消息的顺序性和消费者的消费能力,避免出现消息堆积的情况。

1.2. 异步发送

  • RocketMQ异步发送是指在发送消息时,不等待消息发送结果的返回,而是通过回调函数来处理消息发送的结果。

  • 适用场景:

    • 需要发送大量消息,并且对消息发送的响应时间要求不高的场景。
    • 需要异步处理消息发送结果的场景,例如发送短信、邮件等通知类消息。

使用RocketMQ的RocketMQTemplate发送消息

@Service
public class MessageProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String message) {rocketMQTemplate.asyncSend(topic, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理发送成功的逻辑}@Overridepublic void onException(Throwable throwable) {// 处理发送异常的逻辑}});}
}

在需要发送消息的地方调用MessageProducersendMessage方法:

@RestController
public class MessageController {@Autowiredprivate MessageProducer messageProducer;@GetMapping("/send")public String sendMessage() {String topic = "testTopic";String message = "Hello RocketMQ!";messageProducer.sendMessage(topic, message);return "Message sent successfully!";}
}
  • 异步发送消息需要通过回调函数来处理发送结果,需要考虑回调函数的执行时间和顺序,以确保消息发送的可靠性。
  • 异步发送消息可能会导致消息发送的顺序不确定,需要在接收端进行相关处理,保证消息的处理顺序。
  • 异步发送消息时,需要注意控制并发量,避免发送过多消息导致系统负载过高。

1.3. 单向发送

  • 定义:单向发送是指消息生产者发送消息后,不等待服务器回执响应,即发送后不关心是否到达broker。这种方式发送消息的过程网络开销最小,速度最快。

  • 适用场景:适用于某些耗时非常短,但是对可靠性要求并不高的场景,比如日志收集。

    • 单向发送方式并不能保证消息一定会被成功消费,因为它并不关心消息是否正确到达broker,所以如果你的业务对消息的可靠性有较高要求,不推荐使用单向发送。
    • 在大流量的情况下,单向发送方式由于其网络开销小,速度快的特点,可以显著提高系统的吞吐量。

然后通过RocketMQTemplatesendOneWay方法来发送单向消息:

 @Servicepublic class MyProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMsg(String topic, String msg) {// 这里的topic需要和你在RocketMQ中设置的topic相对应rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msg).build());}}

2. RocketMQ的批量发送和消费

2.1 批量发送

批量发送的优点和使用场景:
优点:批量发送可以减少网络开销,提高消息传输的吞吐量,特别是在网络带宽充足的情况下。使用场景:适合大量小消息的发送,例如日志收集,统计数据等。

如何进行批量发送:

List<Message> msgs = new ArrayList<>();
msgs.add(new Message("TopicA", "TagA", "OrderID001", "Hello world 0".getBytes()));
msgs.add(new Message("TopicA", "TagA", "OrderID002", "Hello world 1".getBytes()));
msgs.add(new Message("TopicA", "TagA", "OrderID003", "Hello world 2".getBytes()));
try {producer.send(msgs);
} catch (Exception e) {e.printStackTrace();
}

2.2 批量消费

批量消费的优点和使用场景:
优点:批量消费可以减少消费者与消息队列的通信次数,提高消费效率。使用场景:适合处理大量小消息的场景,例如日志处理,统计数据等。

如何进行批量消费:
在RocketMQ中,批量消费主要通过设置consumer的consumeMessageBatchMaxSize属性,一次性从队列中拉取多条消息。

consumer.setConsumeMessageBatchMaxSize(10);  //一次消费10条消息

2.3 Spring Boot集成RocketMQ官方starter 示例

批量消费的前提是生产者发送的是批量消息。这个由于RocketMQ的设计,目前的版本中并不支持批量消费单条发送的消息。
这里以Spring Boot集成RocketMQ官方starter为例,首先在pom.xml中添加依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version>
</dependency>

批量发送示例:

@Autowired
private RocketMQTemplate rocketMQTemplate;public void sendBatchMessages() {List<Message> msgs = new ArrayList<>();msgs.add(new Message("TopicA", "TagA", "OrderID001", "Hello world 0".getBytes()));msgs.add(new Message("TopicA", "TagA", "OrderID002", "Hello world 1".getBytes()));msgs.add(new Message("TopicA", "TagA", "OrderID003", "Hello world 2".getBytes()));rocketMQTemplate.syncSend("TopicA",msgs);
}

批量消费示例:

@Service
@RocketMQMessageListener(topic = "TopicA", consumerGroup = "my-consumer_group")
public class BatchConsumer implements RocketMQListener<List<String>> {@Overridepublic void onMessage(List<String> messages) {for (String msg : messages) {System.out.println("Receive message: " + msg);}}
}

3. 总结

同步传输模型(Synchronous)
在同步传输模型中,消息发送方(Producer)发送消息后会一直等待消息被确认(Acknowledgement)后才继续执行后续操作。消息接收方(Consumer)在接收到消息后,会发送确认消息给消息发送方,告知消息已经成功接收。这种模型保证了消息的可靠性,但会造成消息发送方的阻塞。

异步传输模型(Asynchronous)
在异步传输模型中,消息发送方发送消息后不会立即等待确认,而是继续执行后续操作。消息接收方在接收到消息后,会发送确认消息给消息发送方,告知消息已经成功接收。这种模型可以提高消息发送方的吞吐量,但消息的可靠性需要通过设置重试和回调机制来保证。

单向传输模型(Oneway)
在单向传输模型中,消息发送方发送消息后不会等待确认,也不会接收到消息接收方的确认消息。消息发送方无法得知消息是否成功接收,也无法进行重试。这种模型适用于对消息可靠性要求不高,但对发送性能要求较高的场景,如日志记录等。

4. 参考文档

  1. 官方文档链接:https://rocketmq.apache.org/docs/

  2. GitHub链接:https://github.com/apache/rocketmq-spring

5. 源码地址

我的github https://github.com/wangshuai67/icepip-springboot-action-examples

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/131852.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pyarmor 加密许可证的使用

一 pyarmor 许可证的用处 文档&#xff1a;5. 许可模式和许可证 — Pyarmor 8.3.6 文档 试用版本有如下的限制&#xff1a; 加密功能对脚本大小有限制&#xff0c;不能加密超过限制的大脚本。 混淆字符串功能在试用版中无法使用。 RFT 加密模式&#xff0c;BCC 加密模式在试…

解决java.io.IOException: Network error

解决java.io.IOException: Network error 解决java.io.IOException: Network error摘要引言正文1. 理解异常的根本原因2. 处理网络连接问题3. 处理连接超时4. 处理协议错误或不匹配5. 异常处理 总结参考资料 博主 默语带您 Go to New World. ✍ 个人主页—— 默语 的博客&#…

24.Xaml ListView控件-----显示数据

1.运行效果 2.运行源码 a.Xaml源码 <Window x:Class="testView.MainWindow"xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"xmlns:d="http://schemas.mic…

电子信息工程专业课复习知识点总结:(四)信号与系统、数字信号处理

这次我不具体把所有概念写出来了&#xff0c;只针对一些面试中经常提问的重点问题。 第一章 信号与系统基本概念 这里提出一个信号与系统这本书的大纲&#xff1a;这本书研究的就是信号与系统的关系。 一.信号是什么&#xff1f; ①信息是自然世界中一种表现形式&#xff0…

pkg 打包 nodejs

一、先全局安装pkg npm i -g pkg 二、下载打包所需的 node-v16.16.0-linux-x64 和 node-v16.16.0-win-x64 下载地址&#xff0c;里面选择你需要的版本 三、放到pkg的缓存目录 windows&#xff1a;C:\Users\whh\.pkg-cache\v3.4&#xff0c;&#xff08;把whh替换为你的电脑…

用冒泡排序完成库函数qsort的作用

Hello&#xff0c;今天分享的是我们用冒泡函数实现qsort&#xff0c;也就是快排&#xff0c;之前我们也讲过库函数qsort的使用方法&#xff0c;今天我们尝试用冒泡函数实现一下&#xff0c;当然我们也见过qsort&#xff0c;后面也会继续完善的。这几天我是破防大学生&#xff0…

MFC-GetAdaptersAddresses获取网卡信息

需要&#xff1a;#pragma comment(lib, "IPHLPAPI.lib") GetAdaptersAddresses函数参数说明 ULONG bufferSize 0;ULONG result ::GetAdaptersAddresses(AF_UNSPEC, GAA_FLAG_INCLUDE_PREFIX, nullptr, nullptr, &bufferSize);/*参数1&#xff1a;ULONG Famil…

【hive】列转行—collect_set()/collect_list()/concat_ws()函数的使用场景

文章目录 一、collect_set()/collect_list()二、实际运用把同一分组的不同行的数据聚合成一个行用下标可以随机取某一个聚合后的中的值用‘|’分隔开使用collect_set()/collect_list()使得全局有序 一、collect_set()/collect_list() 在 Hive 中想实现按某字段分组&#xff0c…

Python数据分析 — 数据分析概念、重要性、流程和常用工具

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。Python数据分析是利用Python编程语言进行数据处理、转换、清洗、可视化和建模的过程。Python在数据科学领域非常流行&#xff0c;有许多强大的库和工具可供使用&#xff0c;例如NumPy、Pandas、Matplotlib和Scikit-learn等…

导数公式及求导法则

目录 基本初等函数的导数公式 求导法则 有理运算法则 复合函数求导法 隐函数求导法 反函数求导法 参数方程求导法 对数求导法 基本初等函数的导数公式 基本初等函数的导数公式包括&#xff1a; C0(x^n)nx^(n-1)(a^x)a^x*lna(e^x)e^x(loga(x))1/(xlna)(lnx)1/x(sinx)cos…

服务器访问本机图片nginx配置

下面是Nginx的配置 然后是yml文件配置 后端返回给前端的数据直接返回这个地址就可以了 {"success": true,"code": "200","msg": "操作成功","data": [{"趋势": "https://120.26.98.185:8090/s…

YOLO物体检测系列3:YOLOV3改进解读

&#x1f388;&#x1f388;&#x1f388;YOLO 系列教程 总目录 YOLOV1整体解读 YOLOV2整体解读 YOLOV3提出论文&#xff1a;《Yolov3: An incremental improvement》 1、YOLOV3改进 这张图讲道理真的过分了&#xff01;&#xff01;&#xff01;我不是针对谁&#xff0c;在…

一点感受

做了两天企业数字化转型的评委&#xff0c;涉及全国最顶级的公司、最顶级的实际落地项目案例&#xff0c;由企业真实的落地团队亲自当面讲解。主要是为了了解了解真实的一线、真实的客户、真实的应用现状和应用水平。 &#xff08;1&#xff09;现状 我评审的涉及底层技术平台&…

JMeter-BeanShell预处理程序和BeanShell后置处理程序的应用

一、什么是BeanShell&#xff1f; BeanShell是用Java写成的,一个小型的、免费的、可以下载的、嵌入式的Java源代码解释器&#xff0c;JMeter性能测试工具也充分接纳了BeanShell解释器&#xff0c;封装成了可配置的BeanShell前置和后置处理器&#xff0c;分别是 BeanShell Pre…

想要精通算法和SQL的成长之路 - 受限条件下可到达节点的数目

想要精通算法和SQL的成长之路 - 受限条件下可到达节点的数目 前言一. 相交链表&#xff08;邻接图和DFS&#xff09; 前言 想要精通算法和SQL的成长之路 - 系列导航 一. 相交链表&#xff08;邻接图和DFS&#xff09; 原题链接 public int reachableNodes(int n, int[][] ed…

Linux下Minio分布式存储安装配置(图文详细)

文章目录 Linux下Minio分布式存储安装配置(图文详细)1 资源准备1.1 创建存储目录1.2 获取Minio Server资源1.3 获取Minio Client资源 2 Minio Server安装配置2.1 切换目录2.2 后台启动2.3 查看进程2.4 控制台测试 3 Minio Client安装配置3.1 切换目录3.2 移动mc脚本3.2 运行mc命…

LeetCode 39. Combination Sum【回溯,剪枝】中等

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

cudnn-windows-x86_64-8.6.0.163_cuda11-archive 下载

网址不太好访问的话,请从下面我提供的分享下载 Download cuDNN v8.6.0 (October 3rd, 2022), for CUDA 11.x 此资源适配 cuda11.x 将bin和include文件夹里的文件&#xff0c;分别复制到C盘安装CUDA目录的对应文件夹里 安装cuda时自动设置了 CUDA_PATH_V11_8 及path C:\Progra…

数据结构——排序算法——快速排序

快速排序算法的基本思想是 1.从数组中取出一个数&#xff0c;称之为基数&#xff08;pivot&#xff09; 2.遍历数组&#xff0c;将比基数大的数字放到它的右边&#xff0c;比基数小的数字放到它的左边。遍历完成后&#xff0c;数组被分成了左右两个区域 3.将左右两个区域视为两…

leecode 每日一题 2596. 检查骑士巡视方案

2596. 检查骑士巡视方案 骑士在一张 n x n 的棋盘上巡视。在 有效 的巡视方案中&#xff0c;骑士会从棋盘的 左上角 出发&#xff0c;并且访问棋盘上的每个格子 恰好一次 。 给你一个 n x n 的整数矩阵 grid &#xff0c;由范围 [0, n * n - 1] 内的不同整数组成&#xff0c;其…