从零到一:Spring Boot 与 RocketMQ 的完美集成指南

1.Rocket的概念与原理

RocketMQ 是一款由阿里巴巴开源的分布式消息中间件,最初用于支持阿里巴巴的海量业务。它基于发布-订阅模型,具备高吞吐、低延迟、高可用和强一致性的特点,适用于消息队列、大规模数据流处理等场景。以下是对 RocketMQ 的原理和核心概念的详细介绍,帮助你快速认识和熟悉 RocketMQ。

1. 基础概念
  • Producer(生产者):负责生产消息的角色。Producer 发送消息到 RocketMQ 服务器(Broker),通常有同步、异步和单向发送三种方式。

  • Consumer(消费者):负责消费消息的角色。Consumer 从 Broker 拉取消息进行处理。消费者分为两种模式:

    • Push 模式:Broker 主动推送消息给 Consumer。
    • Pull 模式:Consumer 主动拉取消息。
  • Broker(消息服务器):核心组件,负责接收、存储和转发消息。Broker 还负责存储消息的元数据、处理生产者发送的消息请求以及消费者的消费请求。

  • Name Server:RocketMQ 的注册中心,Producer 和 Consumer 通过 Name Server 发现 Broker。Name Server 维护了所有 Broker 的路由信息,并提供名称解析服务。

  • Topic:消息的逻辑分类标签,Producer 发送消息时指定 Topic,Consumer 根据 Topic 订阅消息。

  • Tag:消息的二级分类,可以让 Consumer 更细粒度地过滤消息。

  • Message Queue(消息队列):每个 Topic 由多个消息队列组成,Producer 发送消息时会指定一个或多个队列。Consumer 从队列中拉取消息。

  • Message(消息):Producer 发送的数据单位。每条消息包含 Topic、Tag、内容、唯一标识符等属性。

  • Group:Producer 和 Consumer 都可以按照业务逻辑进行分组(Group)。Consumer Group 是多个 Consumer 的集合,这些 Consumer 可以消费同一个 Topic 下的消息。

2. 消息发送与消费流程
2.1 消息发送流程
  1. 消息生产:Producer 生成消息并指定目标 Topic。
  2. 路由发现:Producer 通过 Name Server 查询 Topic 对应的 Broker 路由信息。
  3. 消息发送:Producer 根据路由信息,将消息发送到指定的 Broker。Producer 可以选择同步发送、异步发送或者单向发送。
  4. 消息存储:Broker 接收到消息后,将消息存储在本地的 CommitLog 中,同时更新消息的元数据。
2.2 消息消费流程
  1. 订阅消息:Consumer 启动时,向 Name Server 注册自己,并订阅感兴趣的 Topic。
  2. 路由发现:Consumer 通过 Name Server 获取 Topic 对应的 Broker 信息以及消息队列信息。
  3. 拉取消息:Consumer 从 Broker 中拉取消息,并进行消费。消费的位置信息(消费进度)会定期同步到 Broker 端或持久化到本地。
  4. 消息处理:Consumer 处理接收到的消息,处理完毕后根据不同的消费模式(自动或手动)确认消息消费完成。
3. 消息存储机制
  • CommitLog:Broker 将接收到的消息持久化到 CommitLog 文件中,这是 RocketMQ 消息存储的核心文件。消息以追加的方式写入,方便快速写入和批量读取。

  • ConsumeQueue:消费队列,RocketMQ 为每个 Topic 创建对应的消费队列文件。ConsumeQueue 中存储了指向 CommitLog 的偏移量,用于定位消息内容。

  • IndexFile:索引文件,用于根据消息的某些字段(如消息 ID、Key)快速检索消息。

RocketMQ 的消息存储分为三个层次:首先是将消息内容存储在 CommitLog 文件中,然后将消息的位置信息存储在 ConsumeQueue 文件中,最后通过 IndexFile 文件提供快速查询功能。

4. 消息可靠性与事务
4.1 消息可靠性

RocketMQ 通过以下机制保证消息的可靠性:

  • 消息确认机制:Producer 发送消息后,需要等待 Broker 的确认。对于消费者,也有消费确认机制,保证消息被成功处理。
  • 消息重试:Consumer 在消费失败时,RocketMQ 会自动进行消息重试,直到消费成功或达到最大重试次数。
  • 消息持久化:Broker 会将消息持久化到磁盘,确保即使 Broker 崩溃,消息数据依然可以恢复。
4.2 事务消息

RocketMQ 支持事务消息,通过“二阶段提交”保证分布式事务的一致性。Producer 在发送事务消息时,首先发送半消息(Prepared Message),执行本地事务,最后根据事务执行结果来提交(Commit)或回滚(Rollback)消息。

5. 消息过滤与顺序消息
5.1 消息过滤

RocketMQ 提供了基于 Tag 的消息过滤功能,可以在消费端只接收特定 Tag 的消息。这减少了 Consumer 不必要的消息处理,提高了消费效率。

5.2 顺序消息

RocketMQ 支持全局顺序消息和部分顺序消息(分区顺序)。全局顺序消息要求所有消息按顺序发送和消费,通常只有一个队列;而分区顺序消息则允许不同分区(队列)中的消息可以并行处理,但同一分区内的消息必须按顺序处理。

6. 高可用性与集群
  • 主从架构(Master-Slave):RocketMQ 支持主从架构,Master 负责处理所有读写请求,Slave 仅用于备份数据。当 Master 故障时,Slave 可以作为备用 Broker 启动,确保数据不丢失。

  • 集群模式(Cluster Mode):支持多种集群模式,如多 Master 模式、Master-Slave 模式。不同集群模式可以根据业务需求灵活配置,以平衡性能、可靠性和可扩展性。

7. 性能优化与监控
  • 性能优化:RocketMQ 提供了多种手段优化性能,如通过多线程并发处理消息、使用异步 IO 提高吞吐量、调节 Producer 的批量发送、优化消息压缩等。

  • 监控与运维:RocketMQ 提供了丰富的监控指标,可以通过 RocketMQ Console 或者集成 Prometheus、Grafana 进行监控。运维人员可以实时监控消息的生产、消费情况以及 Broker 的运行状态。

RocketMQ 是一款功能强大、性能优异的分布式消息中间件,适用于多种业务场景。通过理解其核心概念和原理,你可以更好地在实际项目中应用 RocketMQ。了解消息的发送与消费流程、存储机制、可靠性保障、事务支持、以及顺序消息的处理等方面的知识,将有助于你更高效地使用和管理 RocketMQ 系统。

2.RocketMQ的安装与启动

要在 Windows 中下载安装 RocketMQ,可以按照以下步骤进行操作:

1. 安装 Java 环境

RocketMQ 依赖 Java 运行时环境 (JRE) 或 Java 开发工具包 (JDK),所以首先需要确保系统上安装了 JDK。

  • 到 Oracle 的官网 下载并安装 JDK 11 或更高版本。
  • 安装完成后,配置 JAVA_HOME 环境变量:
    • 右键点击“此电脑”,选择“属性”。
    • 点击“高级系统设置” -> “环境变量”。
    • 在“系统变量”中点击“新建”,设置 JAVA_HOME 变量,值为 JDK 的安装路径,例如 C:Program FilesJavajdk-11.0.10
    • 在系统变量中找到 Path,编辑并添加 %JAVA_HOME%inPath 中。
2. 下载并解压 RocketMQ
  • 到 Apache RocketMQ 官网 下载最新版本的 RocketMQ 二进制包(目前最新版为 rocketmq-all-5.3.0-bin-release.zip)。
  • 将下载的 ZIP 文件解压到您选择的目录,例如 C:application ocketmq ocketmq-all-5.3.0-bin-release ocketmq-all-5.3.0-bin-release.
    在这里插入图片描述
3. 配置 RocketMQ环境变量

安装完成后,配置 ROCKETMQ_HOME 环境变量:

  • 右键点击“此电脑”,选择“属性”。
  • 点击“高级系统设置” -> “环境变量”。
  • 在“系统变量”中点击“新建”,设置 ROCKETMQ_HOME 变量,值为 JDK 的安装路径,例如 C:application ocketmq ocketmq-all-5.3.0-bin-release ocketmq-all-5.3.0-bin-release
  • 在系统变量中找到 Path,编辑并添加 %ROCKETMQ_HOME%inPath 中。

需要注意的是,如果未配置环境变量,在启动NameServer时会看到提示 Please set the ROCKETMQ_HOME variable in your environment!. 因此,安装最新版RocketMQ后,请确保已设置环境变量。

4. 启动 NameServer 和 Broker

RocketMQ 包含两个主要的服务:NameServer 和 Broker。

4.1 启动 NameServer
  1. 打开命令提示符 (CMD)。

  2. 启动 NameServer:

    start mqnamesrv.cmd
    
  3. 确保 NameServer 启动成功,终端会显示 The Name Server boot success.
    在这里插入图片描述

4.2 启动 Broker
  1. 同样在命令提示符中,执行以下命令启动 Broker:

    start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
    
  2. 如果成功启动,终端会显示 The broker boot success.
    在这里插入图片描述

5. 验证安装

可以使用 RocketMQ 提供的工具来验证安装是否成功。

5.1 发送消息
  1. bin 目录中,使用以下命令发送一条测试消息:

    tools.cmd org.apache.rocketmq.example.quickstart.Producer
    
  2. 这将发送一条消息到默认的 TopicTest

5.2 接收消息
  1. bin 目录中,使用以下命令接收测试消息:

    tools.cmd org.apache.rocketmq.example.quickstart.Consumer
    
  2. 如果消费者成功接收到消息,则表示 RocketMQ 安装和配置成功。

6. 使用控制台管理 RocketMQ

你可以选择安装 RocketMQ 控制台工具来更方便地管理和监控 RocketMQ 集群。

6.1 下载并启动 RocketMQ 控制台
  1. 从 GitHub 下载 RocketMQ 控制台。

  2. 使用以下命令启动控制台:

    java -jar rocketmq-console-ng-xxxx.jar --server.port=8080 --rocketmq.config.namesrvAddr=127.0.0.1:9876
    
  3. 启动后,打开浏览器访问 http://localhost:8080,你就可以通过图形化界面管理 RocketMQ。

这样,RocketMQ 就已经在 Windows 上成功安装并运行了。

3.SpringBoot集成RocketMQ

1.创建项目

在 IntelliJ IDEA 中创建一个 Spring Boot 项目很简单:选择 “新建项目”,然后选取 “Spring Initializr”;填写项目的基本信息如 Group 和 Artifact ID,选择 Java 版本和构建工具;添加所需的 Spring Boot Starter 依赖;最后选择项目位置并点击 “下一步” 完成创建。
在这里插入图片描述
后面仅选择Spring Web依赖后点击创建即可!
这里需要注意,如果创建项目后发现maven依赖始终无法下载下来,可以选择打开cmd终端,输入命令

where mvn

在这里插入图片描述
之后根据maven的路径,找到对应的conf下的setting.xml配置文件中更换镜像源地址,具体如下:

	<mirror><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><mirrorOf>central</mirrorOf>        </mirror>

在这里插入图片描述

2.引入依赖

首先,在你的pom.xml中添加RocketMQ的依赖,具体如下所示:

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
3.配置文件

在``application.properties`中添加RocketMQ的基本配置:

spring.application.name=rocketmq-demorocketmq.nameServer=127.0.0.1:9876
rocketmq.producer.group=producer-group
rocketmq.consumer.group=consumer-group
4.创建消息生产者

创建一个 RocketMQProducer 类,包含三种消息发送方式:同步发送、异步发送和单向发送。

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;private final String topic = "demo-topic";// 1.同步发送消息public void sendSyncMessage(String message){rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());System.out.printf("同步发送结果: %s
", message);}// 2.异步发送消息public void sendAsyncMessage(String message){rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("异步发送成功: %s
", sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.printf("异步发送失败: %s
", throwable.getMessage());}});}// 3.单向发送消息public void sendOneWayMessage(String message){rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());System.out.println("单向消息发送成功");}
}
5.创建消息消费者

接下来,创建一个 RocketMQConsumer 类来处理消费消息的逻辑。

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "demo-topic", consumerGroup = "consumer-group", messageModel = MessageModel.CLUSTERING)
public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.printf("收到消息: %s
", s);}
}
6.测试消息发送

编写一个简单的RocketController控制器,用于触发消息发送!

import com.xing.rocketmqdemo.service.RocketMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class RocketController {@Autowiredprivate RocketMQProducer rocketMQProducer;@GetMapping("/sendSync")public String sendSync(@RequestParam String message) {rocketMQProducer.sendSyncMessage(message);return "同步消息发送成功";}@GetMapping("/sendAsync")public String sendAsync(@RequestParam String message) {rocketMQProducer.sendAsyncMessage(message);return "异步消息发送中";}@GetMapping("/sendOneWay")public String sendOneWay(@RequestParam String message) {rocketMQProducer.sendOneWayMessage(message);return "单向消息发送成功";}
}
7.启动项目并测试

启动你的 Spring Boot 应用程序,并通过浏览器或工具(如 Postman)访问以下 URL 来测试不同的消息发送方式:

  • 同步发送消息:http://localhost:8080/sendSync?message=HelloSync
  • 异步发送消息:http://localhost:8080/sendAsync?message=HelloAsync
  • 单向发送消息:http://localhost:8080/sendOneWay?message=HelloOneWay
8.解决问题

如果你出现类似以下的报错

***************************
APPLICATION FAILED TO START
***************************Description:Field rocketMQTemplate in com.xing.rocketmqdemo.service.RocketMQProducer required a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.The injection point has the following annotations:- @org.springframework.beans.factory.annotation.Autowired(required=true)Action:Consider defining a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' in your configuration.

可参考解决:

  1. application.properties配置文件中添加生产者组的配置
    rocketmq.producer.group=producer-group

  2. 解决SpringBoot 3 + RocketMQ 配置不生效的问题
    具体可参考以下博客:
    https://kunyuan.tech/archives/366
    https://github.com/apache/rocketmq-spring/pull/541

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/6232.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Odoo免费开源ERP最佳业务实践:生产管理

文 / 开源智造&#xff08;OSCG&#xff09; Odoo亚太金牌服务 概述 Odoo是全球排名第一的免费开源ERP系统&#xff0c;以其强大的功能和模块化设计著称&#xff0c;适用于各种规模及类型的企业。Odoo集成了生产、采购、销售、库存、财务、人力资源、市场营销、电子商务等多个…

1.CSS的三大特性

css有三个非常重要的三个特性&#xff1a;层叠性、继承性、优先级 1.1 层叠性 想通选择器给设置想听的样式&#xff0c;此时一个样式就会覆盖&#xff08;层叠&#xff09;另一个冲突的样式。层叠性主要是解决样式冲突的问题。 <!DOCTYPE html> <html lang"en&…

【2024年华为OD机试】(A卷,200分)- 优雅子数组 (JavaScriptJava PythonC/C++)

一、问题描述 题目描述 如果一个数组中出现次数最多的元素出现大于等于 k 次&#xff0c;被称为 k-优雅数组&#xff0c;k 也可以被称为优雅阈值。 例如&#xff1a; 数组 [1, 2, 3, 1, 2, 3, 1] 是一个 3-优雅数组&#xff0c;因为元素 1 出现次数大于等于 3 次。数组 [1,…

电子应用设计方案102:智能家庭AI鱼缸系统设计

智能家庭 AI 鱼缸系统设计 一、引言 智能家庭 AI 鱼缸系统旨在为鱼类提供一个健康、舒适的生活环境&#xff0c;同时为用户提供便捷的管理和观赏体验。 二、系统概述 1. 系统目标 - 自动维持水质稳定&#xff0c;包括水温、酸碱度、硬度和溶氧量等关键指标。 - 智能投食&…

智能化加速标准和协议的更新并推动验证IP(VIP)在芯片设计中的更广泛应用

作者&#xff1a;Karthik Gopal, SmartDV Technologies亚洲区总经理 智权半导体科技&#xff08;厦门&#xff09;有限公司总经理 随着AI技术向边缘和端侧设备广泛渗透&#xff0c;芯片设计师不仅需要考虑在其设计中引入加速器&#xff0c;也在考虑采用速度更快和带宽更高的总…

Vue3.5 企业级管理系统实战(三):页面布局及样式处理 (Scss UnoCSS )

本章主要是关于整体页面布局及样式处理&#xff0c;在进行这一章代码前&#xff0c;先将前两章中的示例代码部分删除&#xff08;如Home.vue、About.vue、counter.ts、App.vue中引用等&#xff09; 1 整体页面布局 页面整体布局构成了产品的框架基础&#xff0c;通常涵盖主导…

Linux 消息队列的使用方法

文章目录 1.概念2. 创建消息队列3. 发送消息4. 接收消息5. 消息结构体6. 消息队列控制&#xff08;删除、获取队列状态&#xff09;消息队列是否存在7. 使用场景8. 注意事项使用例子判断消息队列是否存在的代码获取队列空间大小 1.概念 消息队列是一种进程间通信 (IPC) 机制&a…

低代码可视化-转盘小游戏可视化-代码生成器

转盘小程序是一种互动工具&#xff0c;它通过模拟真实的转盘抽奖或决策体验&#xff0c;为用户提供了一种有趣且公平的选择方式。以下是对转盘小程序的详细介绍&#xff1a; 转盘小程序的应用场景 日常决策&#xff1a;转盘小程序可以帮助用户解决日常生活中的选择困难问题&a…

【Prometheus】Prometheus如何监控Haproxy

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

仅仅4M!windows系统适用,免费无限制使用!

软件介绍 在日常生活里&#xff0c;我们经常会碰到电脑运行迟缓、网速卡顿的现象&#xff0c;却又不清楚是哪个程序在占用过多资源。这种时候&#xff0c;一款能实时监测网络和系统状态的工具就变得非常关键了。今天呢&#xff0c;就给大家介绍一个小巧又实用的监控工具——「T…

计算机毕业设计hadoop+spark+hive图书推荐系统 豆瓣图书数据分析可视化大屏 豆瓣图书爬虫 知识图谱 图书大数据 大数据毕业设计 机器学习

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

Harmony Next 支持创建分身

应用分身能实现在一个设备上安装多个相同的应用&#xff0c;实现多个账号同时登录使用和运行并且互不影响。主要应用场景有社交账号双开、游戏大小号双开等&#xff0c;无需账号切换&#xff0c;从而省去频繁登录的繁琐。 Harmony Next 很容易就能让 App 支持创建分身。 官方文…

Linux下 date时间应该与系统的 RTC(硬件时钟)同步

发现客户服务器时间与真实时间不同&#xff0c;并且服务器没有网络。 解决办法&#xff1a;时间应该与系统的 RTC&#xff08;硬件时钟&#xff09;同步 手动设置系统时间 使用 date 命令将系统时间设置为 2025年01月21日 14:12:00&#xff1a; sudo date --set"2025-01…

XX污水处理厂基于RK3576核心板应用(四)——人员倒地智能识别系统方案

通过 SAIL-RK3576核心板 支撑的 人员倒地识别系统&#xff0c;污水处理厂能够在广阔、复杂的区域内实时监控人员安全&#xff0c;实现意外倒地等事故的秒级响应与干预。搭配多元人形动态监测机制&#xff0c;还可进一步拓展对其他异常动作或不安全行为的识别&#xff0c;持续保…

【IEEE Fellow 主讲报告| EI检索稳定】第五届机器学习与智能系统工程国际学术会议(MLISE 2025)

重要信息 会议时间地点&#xff1a;2025年6月13-15日 中国深圳 会议官网&#xff1a;http://mlise.org EI Compendex/Scopus稳定检索 会议简介 第五届机器学习与智能系统工程国际学术会议将于6月13-15日在中国深圳隆重召开。本次会议旨在搭建一个顶尖的学术交流平台&#xf…

css粘性定位超出指定宽度失效问题

展示效果 解决办法&#xff1a;外层容器添加display:grid即可 完整代码 <template><div class"box"><div class"line" v-for"items in 10"><div class"item" v-for"item in 8">drgg</div>&…

随机变量的变量替换——归一化流和直方图规定化的数学基础

变量替换是一种在统计学和数学中广泛应用的技术&#xff0c;它通过定义新的变量来简化问题&#xff0c;使得原本复杂的随机变量变得更加容易分析。 变量替换的公式&#xff0c;用于将一个随机变量 X X X 的概率密度函数 f X f_X fX​ 转换为其经过函数 g g g 变换后的随机变…

Scrapy之一个item包含多级页面的处理方案

目标 在实际开发过程中&#xff0c;我们所需要的数据往往需要通过多个页面的数据汇总得到&#xff0c;通过列表获取到的数据只有简单的介绍。站在Scrapy框架的角度来看&#xff0c;实际上就是考虑如何处理一个item包含多级页面数据的问题。本文将以获取叶子猪网站的手游排行榜及…

应用层协议 HTTP 讲解实战:从0实现HTTP 服务器

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;Linux 目录 一&#xff1a;&#x1f525; HTTP 协议 &#x1f98b; 认识 URL&#x1f98b; urlencode 和 urldecode 二&#xff1a;&#x1f525; HTTP 协议请求与响应格式 &#x1f98b; HTTP 请求…

Ansys Motor-CAD:IPM 电机实验室 - 扭矩速度曲线

各位电动机迷们&#xff0c;大家好&#xff1a; 在本博客中&#xff0c;我讨论了如何使用 Ansys Motor-CAD 通过 LAB 模块获取扭矩速度曲线。使用每安培最大扭矩电机控制策略&#xff0c;并涵盖恒定扭矩区域和恒定功率、磁通减弱区域。分析了高转子速度如何影响功率输出。 模型…