RocketMQ单机部署完整学习笔记

文章目录

  • 前言
  • 一、RocketMQ是什么?
  • 二、使用步骤
    • 1.安装MQ
      • 1.安装JDK
      • 2.安装mq
      • 3.MQ配置(核心)
    • 2.搭建可视化dashboard
      • 1.下载源码
      • 2.修改配置
      • 3.启动
    • 3.整合java
      • 1.生产者
      • 2.消费者
      • 3.启动生产者
      • 4.启动消费者
      • 5.dashboard添加消费组
  • 三、总结
    • 全部的配置

前言

本文是基于4.X版本RocketMQ,从MQ的搭建,消息推送和消费以及dashboard的使用

一、RocketMQ是什么?

参考文档 https://rocketmq.apache.org/zh/docs/4.x/
重点角色如下

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息;举例
  • tag:消息标签,方便服务器过滤使用

二、使用步骤

1.安装MQ

首先安装jdk 再安装mq

1.安装JDK

  1. 查看Linux系统是否有自带的jdk
java -version

如果有 输入 rpm -qa | grep java 检测jdk的安装包
接着进行一个个删除包,输入:rpm -e --nodeps +包名
最后再次:rpm -qa | grep java检查是否删除完即可

  1. 下载jdk

https://www.oracle.com/java/technologies/downloads/#java8
资源连接地址
还在审核中
3. 上传jdk到linux服务器
在这里插入图片描述

  1. 解压jdk
tar -zvxf jdk-8u241-linux-x64..tar.gz
  1. 配置环境变量
    用vim /etc/profile进入编辑状态,加入下边这段配置
    JAVA_HOME 根据自己的解压路径来写
JAVA_HOME=/usr/local/jdk/jdk1.8.0_241
JRE_HOME=$JAVA_HOME/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
  1. 重新加载配置
source /etc/profile
  1. 进行测试
java -version

在这里插入图片描述

2.安装mq

  1. 下载mq
    连接 https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-source-release.zip

  2. 解压上传
    我下载的是公司的mq是4.8的,官网链接给的是根4.9的,这个问题不大,不影响

目录结构如下
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

  1. 配置环境变量
vim /etc/profile
# 在末尾加入下面配置 路径和自己解压的mq路径一直 上一步有截图
export ROCKETMQ_HOME=/bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
# 使环境变量生效
source /etc/profile

3.MQ配置(核心)

这一步很重要,配置完这里,那mq就算部署好了

  1. 修改runserver.sh
    默认配置比较大,修改启动大小
## cd /bin
vim runserver.sh
## 修改启动大小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  1. 启动服务nameserver
## 启动
nohup sh bin/mqnamesrv &
## 关闭
sh bin/mqshutdown namesrv

出现下面就算启动成功了
The Name Server boot success. serializeType=JSON
注意目录别进错了
在这里插入图片描述

  1. 指定NameServer地址
    相当于 broker注册到nameserver上
vim /etc/profile
# 在末尾加入下面配置 有多个时以分号隔开,这个是集群时使用的 mq端口默认是9876 
# 192.168.141.101是服务器地址
export NAMESRV_ADDR=192.168.141.101:9876
# 使环境变量生效
source /etc/profile
  1. 修改 runbroker.sh

修改启动参数

vim runbroker.sh
## 
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
  1. 修改broker.conf
    重要,核心配置,以后关于mq服务的配置都在这里
    conf目录下
    2m-2s-async
    2m-2s-sync
    2m-noslave
    dledger
    这四个目录是集群配置时会用到,我们这路是单机的先不管
vim broker.confbrokerClusterName = DefaultCluster
brokerName = broker-a
#brokerid,0就表示是Master,>0的都是表示
brokerId = 0
# 这个就是第三三步配置的export NAMESRV_ADDR=192.168.141.101:9876 多个以分号分割
namesrvAddr=192.168.141.101:9876
#如果是多网卡的机器,比如云服务器,那么需要在broker.conf中增加brokerIP1属性,
#指定所在机器的外网网卡地址
brokerIP1=192.168.141.101
#对外服务的监听端口
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable=true
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true 

在这里插入图片描述
在这里插入图片描述

  1. 启动broker
    进入bin目录
    注意 -c 很多人启动不起来就是没加上
#启动
nohup sh  bin/mqbroker -c conf/broker.conf &
# 关闭
sh bin/mqshutdown broker

查看nohup.out
出现这样的就说明启动成功了
The broker[broker-a, 192.168.141.101:10911] boot success. serializeType=JSON and name server is 192.168.141.101:9876

在这里插入图片描述

  1. 到此mq服务已启动完成
jps

在这里插入图片描述

  1. 查看日志
## 查看namesrv日志tailf /root/logs/rocketmqlogs/namesrv.log ## broker日志tailf /root/logs/rocketmqlogs/broker.log 

其实前面启动的时候查看这里的日志也可以看是否启动没
停掉时先停broker 再停namesrv 启动先启 namesrv 再启动broker 因为broker 需要注册到namesrv 上

  1. 发送和接收消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述

在这里插入图片描述

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述

2.搭建可视化dashboard

1.下载源码

点击下面直接下载4.X的
https://github.com/apache/rocketmq-dashboard/tree/release-1.0.0
说下
现在MQ已经到5.X了,但是现在还保留着4的,分支下拉到最后可以看到一个relaese-1.0.0
这个就是4.X用的,下载下来后解压
切记版本要对上,不然你和我一样折腾个一两天

在这里插入图片描述

2.修改配置

主要改下这个

rocketmq.config.namesrvAddr=192.168.141.101:9876

如果自己端口需要修改也可以,我是改成了8078
在这里插入图片描述

3.启动

在这里插入图片描述
访问 http://localhost:8078
在这里插入图片描述

3.整合java

1.生产者

直接上代码

package com.bsoft;import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.IOException;/*** @author:* @time: 2023/12/29 15:40*/
public class MQPublisher {private final static String nameServer = "192.168.141.101:9876";private final static String producerGroup = "my_group";private final static String consumerGroup = "my_group";private final static String topic = "topic_test";public static void main(String[] args) throws IOException {// 初始化一个producer并设置Producer group nameDefaultMQProducer producer = new DefaultMQProducer(producerGroup);try {// 设置NameServer地址producer.setNamesrvAddr(nameServer);// 启动producerproducer.start();// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 异步发送消息, 发送结果通过callback返回给客户端producer.send(msg, new SendCallback() {public void onSuccess(SendResult sendResult) {System.out.printf("OK %s %n",sendResult.getMsgId());}public void onException(Throwable e) {System.out.printf("Exception %s %n", e);e.printStackTrace();}},10000);} catch (Exception e) {e.printStackTrace();}System.in.read();}
}

2.消费者

package com.bsoft;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;/*** @author: liucheng* @time: 2023/12/29 15:39*/
public class MQConsumer {private final static String nameServer = "192.168.141.101:9876";private final static String consumerGroup = "my_group_test";private final static String topic = "topic_test";public static void main(String[] args) throws MQClientException, IOException, InterruptedException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup,true);// 设置NameServer的地址consumer.setNamesrvAddr(nameServer);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe(topic, "tagE");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);msgs.forEach((msg)->{byte[] body = msg.getBody();String s = new String(body, Charset.defaultCharset());System.out.println("msg=================> " +s);});// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();System.in.read();}
}

3.启动生产者

在这里插入图片描述

在这里插入图片描述

查看详情
在这里插入图片描述

4.启动消费者

package com.bsoft;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;/*** @author: liucheng* @time: 2023/12/29 15:39*/
public class MQConsumer {private final static String nameServer = "192.168.141.101:9876";private final static String consumerGroup = "my_group_test";private final static String topic = "topic_test";public static void main(String[] args) throws MQClientException, IOException, InterruptedException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);// 设置NameServer的地址consumer.setNamesrvAddr(nameServer);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe(topic, "tagA");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);msgs.forEach((msg)->{byte[] body = msg.getBody();String s = new String(body, Charset.defaultCharset());System.out.println("msg=================> " +s);});// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();System.in.read();}
}

在这里插入图片描述
没日志打印,但是又显示消费了
在这里插入图片描述

5.dashboard添加消费组

没查询到那添加一个
在这里插入图片描述
在这里插入图片描述
重启生产者发现可以消费

对于消费组还是得在dashboard创建好了再去写代码,有的说能够改配置能否直接创建,试过了,没生效,先这样吧,有好的方法或搭建过程中遇到什么问题可以私聊我,看到及时回答

三、总结

整个搭建过程踩了不少坑,比如
版本的不一致导致部分功能一直报错;
启动brocker时未指定实例文件没有加-c来启动导致部署失败;消
费组未在dashboard创建时代码中不显示消费信息
关于5.x无法打包的问题是因为缺少yarn-1.22.10.tar.gz 这个已经上传到jdk那个资源下了,把这个复制到
{path}\maven\repository\com\github\eirslett\yarn\1.22.10再打包即可

全部的配置

#  cat /etc/profileJAVA_HOME=/usr/local/jdk/jdk1.8.0_241
JRE_HOME=$JAVA_HOME/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
export ROCKETMQ_HOME=/bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
export NAMESRV_ADDR=192.168.141.101:9876
#  cat runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
# cat runserver.sh 
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"# cat broker.conf 
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=192.168.141.101:9876
brokerIP1=192.168.141.101
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable=true
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/230162.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文阅读——Slide-Transformer(cvpr2023)

Slide-Transformer: Hierarchical Vision Transformer with Local Self-Attention 一、分析 1、改进transformer的几个思路&#xff1a; &#xff08;1&#xff09;将全局感受野控制在较小区域&#xff0c;如&#xff1a;PVT&#xff0c;DAT&#xff0c;使用稀疏全局注意力来…

机器学习笔记 - 从2D数据合成3D数据

一、3D 数据简介 人们一致认为,从单一角度合成 3D 数据是人类视觉的一项基本功能,这对计算机视觉算法来说极具挑战性。但随着 LiDAR、RGB-D 相机(RealSense、Kinect)和 3D 扫描仪等 3D 传感器的可用性和价格的提高,3D 采集技术的最新进展取得了巨大飞跃。 与广泛使用的 2D…

【REST2SQL】03 GO读取JSON文件

REST2SQL需要一些配置信息&#xff0c;用JSON文件保存&#xff0c;比如config.json 1 创建config.json配置文件 {"hostPort":"localhost:5217","connString":"oracle://blma:5217127.0.0.1:1521/CQYH","_oracle":"ora…

微信防红链接遮罩,QQ防红遮罩,附带安装教程

源码介绍 主要用于防止网址被微信和QQ拦截&#xff0c;以避免用户在其内置浏览器中直接打开。会提供一个引导用户跳转到浏览器进行浏览的页面。 使用教程 1.上传插件整个文件夹到网站根目录。得到&#xff1a; /WxqqJump 2.修改 /index.php 文件。在第一行 <?php 下新增…

力扣hot100 翻转二叉树 递归

&#x1f468;‍&#x1f3eb; 题目地址 &#x1f60b; AC code /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNod…

jdk和IDEA教育版下载和安装详解

前言 研究生专业是通信系统,为了寻找实习于是在研二时期学习java。但是在学习java的过程中没有进行系统总结,很多知识点或者一些细节已经忘记。由于工作找的是某行软件中心的软件开发。准备在毕业前对java知识进行系统性学习。本专栏将从零基础开始,从最简单的jdk和IDEA下载…

为什么避免在生命周期钩子中使用箭头函数

在Vue.js中&#xff0c;生命周期钩子是特殊的函数&#xff0c;它们在组件的不同阶段自动被调用。当这些钩子被调用时&#xff0c;Vue确保它们的this上下文指向当前组件的实例。这意味着在生命周期钩子内部&#xff0c;你可以通过this访问组件的数据、计算属性、方法等。这是Vue…

计算机网络(9):无线网络

无线局域网 WLAN 无线局域网常简写为 WLAN (Wireless Local Area Network)。 无线局域网的组成 无线局域网可分为两大类。第一类是有固定基础设施的&#xff0c;第二类是无固定基础设施的。所谓“固定基础设施”是指预先建立起来的、能够覆盖一定地理范围的一批固定基站。 …

qt 异常汇总

1. C2338 No Q_OBJECT in the class with the signal (编译源文件 ..\..\qt\labelme-master\src\mainwindow.cpp mainwindow头文件中的类没有Q_OBJECT宏定义&#xff0c;或者其子类或者其他依赖没有Q_OBJECT宏定义。 全部qt类都要写上Q_OBJECT. 2. C2385 对connect的访…

2 Windows网络编程

1 基础概念 1.1 socket概念 Socket 的原意是“插座”&#xff0c;在计算机通信领域&#xff0c;socket 被翻译为“套接字”&#xff0c;它是计算机之间进行通信的一种约定或一种方式。Socket本质上是一个抽象层&#xff0c;它是一组用于网络通信的API&#xff0c;包括了一系列…

Springboot通过profiles切换不同环境使用的配置

文章目录 简介1.通过分隔符隔离2.通过使用不同的配置文件区分3.测试 简介 一个项目从开发到上线一般要经过几个环境, dev测试环境-uat预生产环境-prod生产环境&#xff0c;每个环境的使用的数据库或者配置不同&#xff0c;这时候可以通过下面两种方式区分配置,达到快速切换的效…

用PHP搭建一个绘画API

【腾讯云AI绘画】用PHP搭建一个绘画API 大家好&#xff01;今天我要给大家推荐的是如何用PHP搭建一个绘画API&#xff0c;让你的网站或应用瞬间拥有强大的绘画能力&#xff01;无论你是想要让用户在网页上绘制自己的创意&#xff0c;还是想要实现自动绘画生成特效&#xff0c;这…

管理组件状态

概述 在应用中&#xff0c;界面通常都是动态的。如图1所示&#xff0c;在子目标列表中&#xff0c;当用户点击目标一&#xff0c;目标一会呈现展开状态&#xff0c;再次点击目标一&#xff0c;目标一呈现收起状态。界面会根据不同的状态展示不一样的效果。 图1 展开/收起目标…

李沐机器学习系列1--- 线性规划

1 Introduction 1.1 线性回归函数 典型的线性回归函数 f ( x ) w ⃗ ⋅ x ⃗ f(x)\vec{w} \cdot \vec{x} f(x)w ⋅x 现实生活中&#xff0c;简单的线性回归问题很少&#xff0c;这里有一个简单的线性回归问题。房子的价格和房子的面积以及房子的年龄假设成线性关系。 p r …

2023机器人行业总结,2024机器人崛起元年(具身智能)

2023总结&#xff1a; 1.Chatgpt引爆了通用人工智能&#xff0c;最大的受益者或是机器人&#xff0c;2023年最热门的创业赛道便是人形机器人&#xff0c;优必选更是成为人形机器人上市第一股&#xff0c; 可以说2023年是机器人开启智能化的元年&#xff0c;而2024则将成为机器…

go执行静态二进制文件和执行动态库文件

目的和需求&#xff1a;部分go的核心文件不开源&#xff0c;例如验证&#xff0c;主程序核心逻辑等等 第一个想法&#xff0c;把子程序代码打包成静态文件&#xff0c;然后主程序执行 子程序 package mainimport ("fmt""github.com/gogf/gf/v2/os/gfile"…

[笔记] 使用 qemu 创建虚拟磁盘并安装 grub

之前使用 wsl 进行了直接创建虚拟磁盘并安装 grub,现在希望能够直接借助 qemu 的工具创建虚拟磁盘文件并安装 grub,由于需要用到 nbd(net block device网络块设备) 模块,在 wsl 中并不支持,因此这里使用到了 Hypver-V 虚拟机创建了一个 Ubuntu 系统,在系统中安装了 qemu 和 gru…

Spring Cloud Gateway + Nacos 实现动态路由

1、maven 依赖 主要依赖 <!-- 网关 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId></dependency>案件差不多完整主要依赖 <!--Spring boot 依赖(微服务基…

uni-app 前后端调用实例 基于Springboot 详情页实现

锋哥原创的uni-app视频教程&#xff1a; 2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中..._哔哩哔哩_bilibili2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中...共计23条视频&#xff0c;包括&#xff1a;第1讲 uni…

分布微服软件体系快速云端架构

1 概述 分布微服软件体系云端架构平台&#xff0c;以主流的NACOS服务器作为注册配置中心&#xff0c;采用主流的Gradle框架&#xff0c;内嵌Tomcat10以上版本&#xff0c;用于快速构造各类基于JDK17以上的信息应用系统的分布式微服务软件体系架构&#xff0c;可以适用关系型SQ…