IDEA 导入 RocketMQ 源码

目录

  • 前言
  • 一、RocketMQ 架构
  • 二、环境准备
  • 三、下载源码
  • 四、编译源码
    • 4.1 导入源码
    • 4.2 目录结构
    • 4.3 运行程序
      • 1. 启动 Namesrv
      • 2. 启动 Broker
      • 3. 启动 Producer
      • 4. 启动 Consumer
  • 五、监控平台的搭建
    • 5.1 下载 console 源码
    • 5.2 IDEA 启动


前言

最近项目中有个功能需要在本地调试下 RocketMQ,所以需要在本地导入 RocketMQ 的源码并启动,故做此记录,便于回顾问题和与各位同学一起探讨。


一、RocketMQ 架构

在源码搭建前, 需要先理解 RocketMQ 的四个重要组件, 以及 RocketMQ 的工作流程:

在这里插入图片描述

  • NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker 部署相对复杂,Broker 分为 MasterSlave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 MasterMasterSlave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId0 表示 Master,非 0 表示 SlaveMaster 也可以部署多个。每个 BrokerNameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。 注意:当前 RocketMQ 版本在部署架构上支持一 MasterSlave,但只有 BrokerId=1 的从服务器才会参与消息的读负载。

  • ProducerNameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

  • ConsumerNameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 MasterSlave 建立长连接,且定时向 MasterSlave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master 服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读 I/O),以及从服务器是否可读等因素建议下一次是从 Master 还是 Slave 拉取。

结合部署架构图,描述集群工作流程:

  • 启动 NameServerNameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

  • Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息 ( IP+ 端口等) 以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 TopicBroker 的映射关系。

  • 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic

  • Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

  • ConsumerProducer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。


二、环境准备

  • ① JDK 1.8

    java -version

    在这里插入图片描述

  • ② IntelliJ IDEA 2021

  • ③ RocketMQ-4.9.x 源码

  • ④ Maven

  • ⑤ Git

  • ⑥ Windows 11


三、下载源码

我们可以在 github 或者 gitee 上都能下载到 RocketMQ 的源码

github 上下载:

github 上搜素 rocketmq 就能找到:https://github.com/apache/rocketmq

在这里插入图片描述

gitee上下载:

gitee 上搜素 rocketmq 就能找到:https://gitee.com/apache/rocketmq

在这里插入图片描述

这里我是直接下载 ZIP 压缩包的,可以避免很多问题

在这里插入图片描述

解压:

在这里插入图片描述


四、编译源码

4.1 导入源码

打开 IDEA ,选择 File -> Open

在这里插入图片描述

选中 rocketmq 源码所在目录

在这里插入图片描述

导入进来是这样子的

在这里插入图片描述

项目导进来之后先检查下 JDK 的配置,配置 JDK 版本 1.8 的(建议 JDK 的版本不要配置太高)

在这里插入图片描述

配置你的 Maven

在这里插入图片描述

检查下 git 配置(编译的时候会自动去检测 git,所以需要检查下)

在这里插入图片描述


4.2 目录结构

在这里插入图片描述


4.3 运行程序

本地 Debug 环境搭建过程如下:

  • ① 通过源码启动 Namesrv
  • ② 通过源码启动 broker
  • ③ 通过源码启动 Producer
  • ④ 通过源码启动 Consumer

1. 启动 Namesrv

Namesrv 源码在 rocketmq-namesrv 包下,启动类是 src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java,直接通过NamesrvStartupmain 方法启动会失败

在这里插入图片描述

终端输出提示我们需要配置一个 ROCKETMQ_HOME 环境变量,我们将环境变量配置到 IDEA

在这里插入图片描述

例如:ROCKETMQ_HOME=D:\source-code\rocketmq-4.9.x\rocketmq-4.9.x

在这里插入图片描述

再次启动 NamesrvStartup 后再次报错

在这里插入图片描述

意思是没有读到 conf 目录下的配置文件 logback_namesrv.xml

那就在项目下创建一个 conf 的文件夹

在这里插入图片描述

在这里插入图片描述

logback_namesrv.xml 这个配置文件可以在 distribution 的模块下找到,只需要将该文件复制一份到你所创建 conf 目录下即可

在这里插入图片描述

再启动 namesrv,控制台提示启动成功 The Name Server boot success.

在这里插入图片描述

到此 NameSrver 就算是启动完成了


2. 启动 Broker

Broker 源码在 rocketmq-broker 包下,启动类是 src/main/java/org/apache/rocketmq/broker/BrokerStartup.java,如果直接通过 IDEA 启动也是会失败

在这里插入图片描述

与上面 namesrv 一样,我们在 IDEA 上配置启动环境变量 ROCKETMQ_HOME

例如:ROCKETMQ_HOME=D:\source-code\rocketmq-4.9.x\rocketmq-4.9.x

在这里插入图片描述

再次启动还是会报与 Nameserv 相同的问题,我们只需要在 distribution 模块下找到 broker 相关的两个配置文件 logback_broker.xmlbroker.conf 两个文件复制到 conf 目录下即可

在这里插入图片描述

再次启动,就可以看到 broker 也正常运行了

在这里插入图片描述

但是仔细观察会发现虽然 broker 启动成功了,但是 brokerName 好像和配置文件 broker.conf 中的 brokerName 不一致

在这里插入图片描述

我们可以启动 broker 时让它指定以 conf/broker.conf 的配置文件启动

在这里插入图片描述

再次启动

在这里插入图片描述

broker.conf 的配置文件中还有以下配置可以修改

# namesrv服务地址
namesrvAddr = 127.0.0.1:9876
# 运行自动创建topic,避免调试的时候麻烦
autoCreateTopicEnable = true
# 数据存储路径
storePathRootDir = D:/file/rocketmq/data_store
# commitlog存储文件
storePathCommitLog = D:/file/rocketmq/data_store/commitlog
# 消费队列存储文件
storePathConsumeQueue = D:/file/rocketmq/data_store/consumequeue
# 索引存储文件
storePathIndex = D:/file/rocketmq/data_store/index
# checkpoint存储文件
storeCheckpoint = D:/file/rocketmq/data_store/checkpoint
# abort文件
abortFile = D:/file/rocketmq/data_store/abort

做本地调试的时候最好添加以下两个配置:

# namesrv服务地址
namesrvAddr = 127.0.0.1:9876
# 运行自动创建topic,避免调试的时候麻烦
autoCreateTopicEnable = true

在这里插入图片描述

为了更好的查看启动 broker 的相关配置,可以在 logback_broker.xml 配置文件中的 RocketmqBroker 里面追加 <appender-ref ref="STDOUT"/> 配置,例如:

    <logger name="RocketmqBroker" additivity="false"><level value="INFO"/><appender-ref ref="STDOUT"/><appender-ref ref="RocketmqBrokerAppender"/></logger>

在这里插入图片描述


3. 启动 Producer

example 模块中官方给了一个 producer 的示例

在这里插入图片描述

package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {// 发送消息次数:原本是 1000 次,作为演示我调整为 1 次public static final int MESSAGE_COUNT = 1;public static final String PRODUCER_GROUP = "please_rename_unique_group_name";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TopicTest";public static final String TAG = "TagA";public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);// 下面这行代码原本是注释的,这里要放开注释producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();for (int i = 0; i < MESSAGE_COUNT; i++) {try {Message msg = new Message(TOPIC /* Topic */,TAG /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}

直接运行即可

在这里插入图片描述

可以看到是有发送一条消息的


4. 启动 Consumer

同样的在 example 模块中官方给了一个 consumer 的示例

在这里插入图片描述

package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;public class Consumer {public static final String CONSUMER_GROUP = "please_rename_unique_group_name_4";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TopicTest";public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);// 下面这行代码原本是注释的,这里要放开注释consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TOPIC, "*");consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf("Consumer Started.%n");}
}

直接运行

在这里插入图片描述

运行后可以看到 consumer 消费调了之前 producer 生成出来的那条消息


五、监控平台的搭建

RocketMQ 有一个专门的监控平台来查看 MQ 的情况,大概长这样子

在这里插入图片描述

5.1 下载 console 源码

下载链接:https://github.com/apache/rocketmq-externals

在这里插入图片描述可以通过 git 把它给拉下来,可以看到这里面有 rocketmq 与各种各样的技术集成,但是这个监控平台只需要启动 rocketmq-console 这个服务就行了

以下是我下载的:
链接:百度网盘链接
提取码:no5x

找个目录存放,然后解压下来。

在这里插入图片描述

5.2 IDEA 启动

进入 IDEA ,打开 rocketmq-console 项目

在这里插入图片描述

在这里插入图片描述

IDEA 打开以后,修改配置文件 application.properties

在这里插入图片描述
主要是将 rocketmq.config.namesrvAddr 设置成你 RocketMQ 所运行的服务器 IP (公网)地址,然后就可以直接启动了

在这里插入图片描述启动完成之后游览器上输入:localhost:8080,访问

在这里插入图片描述

到目前位置,RocketMQ 的监控平台也用 IDEA 启动成功了


参考文章:

RocketMQ 监控平台搭建与项目引入:https://blog.csdn.net/xhmico/article/details/124489116

基于 IDEA 搭建 RocketMQ-4.6 源码环境:https://juejin.cn/post/7166279522772320286

【RocketMQ | 源码分析】RocketMQ本地调试环境搭建:https://juejin.cn/post/7216729116690694199

手把手教你使用Idea调试RocketMQ源码:https://juejin.cn/post/7166175844145037319

RocketMQ 源码分析: https://gitee.com/haijun1998/rocketmq、https://gitee.com/wen-zhan/rocketmq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/406696.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

验证实战知识点--(2)

1.seq中的pre_start pre_start 是 uvm_sequence 类的一个虚拟方法&#xff0c;用于在序列开始执行之前进行初始化和设置。这个方法在调用 start 方法前立即执行&#xff0c;提供了一个执行自定义初始化代码的机会。 start 方法用于启动序列的执行&#xff0c;而 pre_start 可以…

【MySQL】数据库基础(库的操作)

目录 一、MySQL安装、连接、修改密码操作 二、库的操作 2.1 创建数据库 2.2 字符集和校验规则 2.3 操控数据库 2.4 修改数据库 2.5 删除数据库 2.6 数据库的备份和恢复 2.7 查看连接情况 前情提要&#xff1a; 我的服务器操作系统是Ubuntu20.04&#xff0c;安装的是M…

06--kubernetes.pod管理与投射数据卷

前言&#xff1a;上一章记录了部署k8s常用的两个方式&#xff0c;这一章就简单一些&#xff0c;整理一下k8s资源对象的配置和管理命令。 1、集群状态检查 前天搭建的环境&#xff0c;然后关机了两天今天开启后第一时间需要检查集群环境是否正常 [rootk8s-master1 ~]# kubect…

探索《黑神话·悟空》背后的AI技术支持:英伟达全景光线追踪技术、DLSS 3.5 与帧生成

引言 2023 年&#xff0c;游戏《黑神话悟空》以其震撼的视觉效果和深度沉浸的游戏体验&#xff0c;成为全球玩家热议的焦点。这款游戏在发布初期就取得了惊人的销量&#xff1a;预售阶段便突破 120 万套&#xff0c;而发售首日更是达到 450 万份的惊人成绩。这个现象级作品背后…

大模型微调课程及大模型应用开发课程介绍

大模型实验室是在学校现有的实验室建设基础上&#xff0c;依托行业标杆企业&#xff0c;聚焦行业大模型产业发展方向&#xff0c;建设一个产学研一体化的合作教学平台&#xff0c;形成“教与学紧密结合、理论与实践紧密结合&#xff0c;学校与企业紧密结合”的创新教育模式。大…

初识C++以及安装C++学习工具

C的发展史 C是由Bjarne Stroustrup在20世纪80年代初期于贝尔实验室开发的一种编程语言。它的设计初衷是作为C语言的一个超集&#xff0c;通过添加面向对象编程的特性来增强C语言。C支持多种编程范式&#xff0c;包括过程化编程、面向对象编程和泛型编程。 C的历史可以追溯到1…

[数据集][目标检测]道路积水检测数据集VOC+YOLO格式2699张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;2699 标注数量(xml文件个数)&#xff1a;2699 标注数量(txt文件个数)&#xff1a;2699 标注…

python-逆序数(赛氪OJ)

[题目描述] 在一个排列中&#xff0c;如果一对数的前后位置与大小顺序相反&#xff0c;即前面的数大于后面的数&#xff0c;那么它们就称为一个逆序。一个排列中逆序的总数就称为这个排列的逆序数。比如一个元素个数为 4 的数列&#xff0c;其元素为 2,4,3,1&#xff0c;则 (2,…

深度优先搜索-放苹果

放苹果 http://noi.openjudge.cn/ch0205/666/ #include<bits/stdc.h> using namespace std;int dfs(int,int); //第一个赋值为1 其余为0 int a[11]{1},ans,n,m;int main(){ int k; cin>>k; for(int i1;i<k;i){ ans0; cin>>m>>n; dfs(m,1);//m个…

Windows C++控制台菜单库开发与源码展示

Windows C控制台菜单库 声明&#xff1a;演示视频&#xff1a;一、前言二、具体框架三、源码展示console_screen_set.hframeconsole_screen_frame_base.hconsole_screen_frame_char.hconsole_screen_frame_wchar_t.hconsole_screen_frame.h menuconsole_screen_menu_base.hcons…

入门 - Vue中使用axios原理分析及解决前端跨域问题

1. 什么是Axios&#xff1f; Axios&#xff08;ajax i/o system&#xff09;&#xff0c;是Vue创建者主推的请求发送方式&#xff0c;因其简单的配置与良好的性能被前端爱好者所喜爱。众所周知&#xff0c;在进行网页设计时经常需要从后端拿数据&#xff0c;在Web应用初期会将…

计算机网络之TCP序号,确认序号和报文传输时间

开篇提示 本篇适合于了解基础知识&#xff0c;进行扩展提高的使用&#xff0c;附带考研习题以及解析。 TCP序号和确认序号的区别 TCP首部中有序号和确认序号&#xff0c;他们都是4个字节&#xff08;4B&#xff09;&#xff0c;且在数据传输中有很重要的意义&#xff0c;那么两…

0x01 GlassFish 任意文件读取漏洞复现

参考文章&#xff1a; 应用服务器glassfish任意文件读取漏洞 - SecPulse.COM | 安全脉搏 fofa 搜索使用该服务器的网站 网络空间测绘&#xff0c;网络空间安全搜索引擎&#xff0c;网络空间搜索引擎&#xff0c;安全态势感知 - FOFA网络空间测绘系统 "glassfish"&…

用TensorFlow实现线性回归

说明 本文采用TensorFlow框架进行讲解&#xff0c;虽然之前的文章都采用mxnet&#xff0c;但是我发现tensorflow提供了免费的gpu可供使用&#xff0c;所以果断开始改为tensorflow&#xff0c;若要实现文章代码&#xff0c;可以使用colaboratory进行运行&#xff0c;当然&#…

外挂程序:增强点及辅助

1.关于前几篇介绍的外挂程序,SAP中的业务单据还是要区分具体的操作人员。如建立财务凭证,工号A,B,C使用相同的SAP账号,那就没办法知道是谁操作的了啊,所以sap的业务单据需要细分到具体人员的都要增强实现以下: 如生产工单: 具体的增强点: 2.辅助程序:SAP账号自动锁定功…

从新手到专家必读书籍:官方推荐.NET技术体系架构指南

前言 Microsoft 官方推荐了一系列有关 .NET 体系结构的指南&#xff0c;旨在帮助开发人员掌握最新的技术和最佳实践。这些资源覆盖了从微服务架构到云原生应用开发等多个主题&#xff0c;是开发高质量 .NET 应用程序不可或缺的参考资料。 通过这些指南&#xff0c;可以深入了…

瑞幸x《黑神话》周边秒空,联名营销真的是流量密码吗?

​8月19日&#xff0c;瑞幸上线了与国产3A游戏《黑神话&#xff1a;悟空》合作的联名活动&#xff0c;其中包括黑神话腾云美式咖啡及周边产品。很多人为了抢到联名的周边&#xff0c;一大早就在瑞幸卡点下单&#xff0c;更有一些网友早上6点多就在瑞幸门口“蹲点”&#xff0c;…

会话跟踪方案:Cookie Session Token

什么是会话技术&#xff1f; Cookie 以登录为例&#xff0c;用户在浏览器中将账号密码输入并勾选自动登录&#xff0c;浏览器发送请求&#xff0c;请求头中设置Cookie&#xff1a;userName:张三 ,password:1234aa &#xff0c;若登录成功&#xff0c;服务器将这个cookie保存…

河南萌新联赛2024第(六)场:郑州大学(补题ABCDFGIL)

文章目录 河南萌新联赛2024第&#xff08;六&#xff09;场&#xff1a;郑州大学A 装备二选一&#xff08;一&#xff09;简单介绍&#xff1a;思路&#xff1a;代码&#xff1a; B 百变吗喽简单介绍&#xff1a;思路&#xff1a;代码&#xff1a; C 16进制世界简单介绍&#x…

【时时三省】(C语言基础)指针进阶2

山不在高&#xff0c;有仙则名。水不在深&#xff0c;有龙则灵。 ----CSDN 时时三省 数组指针 是一种指针&#xff0d;是指向数组的指针 整型指针&#xff0d;是指向整形的指针 字符指针&#xff0d;是指向字符的指针 什么叫做数组指针 上面的整形指针跟字符指针只需要&am…