Linux部署RocketMQ并使用SpringBoot创建生产、消费者

  • 😜           :是江迪呀
  • ✒️本文关键词RocketMQ消息队列
  • ☀️每日   一言在你心灰意冷、心烦意乱时也不要停下你的脚步!

在这里插入图片描述

一、前言

RocketMQ(Apache RocketMQ)是一种开源的分布式消息中间件系统,最初由阿里巴巴开发并捐赠给 Apache 基金会。它提供了可靠的、低延迟的消息传递能力,适用于构建大规模分布式系统中的消息通信。RocketMQ 主要用于解决分布式系统中异步通信、解耦、流量削峰等问题。下面让我们一起看下,如何在Linux上部署RocketMQ~

二、介绍RocketMQ

2.1 RocketMQ产生背景

随着业务规模的扩大,阿里巴巴面临着越来越多的分布式系统构建需求。为了解决这个问题,阿里巴巴集团2012年推出的开源分布式消息中间件 —— RocketMQ

2.1 RocketMQ作用

(1)异步通信和解耦: RocketMQ可以在不同的服务之间实现异步通信,解耦了服务之间的紧耦合关系,提高了系统的可维护性和可扩展性。
(2)流量削峰: RocketMQ支持消息积压和消费速率不匹配时的流量削峰功能,防止系统因突发流量而崩溃。
(3)实时数据同步: 用于将数据实时同步到不同的存储介质,保持数据的一致性。
(4)事件驱动架构: RocketMQ支持事件驱动的架构,使得系统能够更加敏捷地响应业务事件。

2.2 RocketMQ的组件

RocketMQ 的主要组件包括:
(1)Producer:负责发送消息到 RocketMQ 服务器。
(2)Broker:消息中转服务器,负责存储消息并提供消息的读写服务。
(3)Consumer:Broker 订阅并消费消息。
(4)Topic:消息的分类,Producer 发送消息到特定的 TopicConsumer 订阅相应的 Topic
(5)Tag: 对消息的进一步分类,可以用于 Consumer 进一步过滤消息。
(6)Message Queue: 每个 Topic 下可以分成多个 Message Queue,实现消息的分区和负载均衡。

2.3 RocketMQ的优缺点

(1)优点

  • 高吞吐量: RocketMQ具有高吞吐量的特点,适用于大量消息的处理。
  • 可靠性: RocketMQ通过消息的持久化存储和复制机制,确保消息不会丢失。
  • 低延迟: RocketMQ在消息传递过程中能够保持较低的延迟,适用于实时性要求较高的场景。
  • 灵活的消息模式: 支持发布-订阅和点对点两种消息模式,根据业务需求进行选择。
  • 水平扩展: 可以通过增加Broker节点来实现水平扩展,提高消息处理能力。

(2)缺点

  • 维护成本: RocketMQ需要维护多个组件,包括ProducerBrokerConsumer等,涉及到一定的运维成本。
  • 学习曲线: 对于新手来说,学习和理解RocketMQ的一些概念和配置可能需要一定的时间。
  • 一致性保障: 虽然RocketMQ通过复制机制保障了消息的可靠性,但在极端情况下可能会存在消息的重复传递或乱序问题。

三、 RocketMQ如何部署

3.1 下载

RocketMQ下载地址

3.2 上传、解压

上传文件到Linux有两种方式:

(1)上传

  • 通过rz命令
rz

你可以使用rz命令,在使用这个命令之前你必须确保linux已经安装了lrzsz,安装命令如下:

sudo apt-get update
sudo apt-get install lrzsz
  • 使用xftp
    这个我就不赘述了。
    在这里插入图片描述

(2)解压

unzip rocketmq-all-4.5.2-bin-release.zip

如果没有安装unzip,需要安装一下:

// 查看 unzip 包的安装情况
yum list unzip
//没有安装时,使用命令安装 unzip
yum list unzipyum install unzip.x86_64

在这里插入图片描述

3.2 启动RocketMQ

RocketMQ的启动主要涉及到Namesrv(命名服务)Broker(消息存储和消费者服务)两部分。要想启动RocketMQ,首先进入解压后的bin目录:

cd rocketmq-all-4.5.2-bin-release/bin

(1)启动Namesrv并设置输出日志位置

nohup sh mqnamesrv > namesrv.log 2>&1 &

(2)启动Broker并设置输出日志位置

nohup sh mqbroker -n localhost:9876 > broker.log 2>&1 &

查看是否启动:

jps

输出下面的内容说明启动成功了:

2931 NamesrvStartup
25599 Jps
25583 BrokerStartup

在启动Broker会出现失败问题,一般来说就是内存不足RocketMq默认的虚拟机内存较大,因而启动失败,需要编辑如下两个配置文件,修改jvm的内存大小:

//编辑runbroker.sh和runserver.sh修改默认的JVM大小
vim runbroker.sh
vim runserver.sh 

在这里插入图片描述
修改为:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -xx:metaspaceSize=128m -XX:MaxMetaspaceSize=320m"

如果还是启动不了,需要将NameServer关闭,重新启动一下,同样是先进入bin目录,关闭命令如下:

sh mqshutdown namesrv

四、测试与关闭

4.1 测试

(1) 发送消息(生产者)

//设置环境变量
export NAMESRV_ADDR=localhost:9876
//使用安装包的demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述
上面的信息就是RocketMQproducer发送的消息。特点:启动发送完毕消息后就会停止。

(2) 接收消息(消费者)

//设置环境变量
export NAMESRV_ADDR=localhost:9876
//接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述

4.2 关闭RocketMQ

//关闭namesrv
sh bin/mqshutdown namesrv
//关闭Broker
sh bin/mqshutdown broker

五、SpringBoot连接RocketMQ

5.1 引入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId><version>2.3.5.RELEASE</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.5.2</version> </dependency>
</dependencies>

5.2 配置文件application.properties

# Name Server地址
rocketmq.name-server=your-nameserver-ip:9876
# 生产者组名
rocketmq.producer.group=my-producer-group
# 消费者组名
rocketmq.consumer.group=my-consumer-group

5.3 生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
public class RocketMQProducer {@Value("${rocketmq.name-server}")private String nameServer;@Value("${rocketmq.producer.group}")private String producerGroup;public void sendMessage(String topic, String message) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(nameServer);producer.start();// 创建消息对象,设置消息内容org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(topic, message.getBytes());// 发送消息producer.send(msg);producer.shutdown();}
}

5.4 消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
public class RocketMQConsumer {@Value("${rocketmq.name-server}")private String nameServer;@Value("${rocketmq.consumer.group}")private String consumerGroup;public void startConsumer(String topic) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(nameServer);// 订阅主题和标签,可以根据需要进行过滤consumer.subscribe(topic, "*");// 注册消息监听器consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (org.apache.rocketmq.common.message.MessageExt msg : msgs) {System.out.println("Received message: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}

5.5 启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;@SpringBootApplication
public class RocketMQDemoApplication {public static void main(String[] args) throws Exception {ConfigurableApplicationContext context = SpringApplication.run(RocketMQDemoApplication.class, args);RocketMQProducer producer = context.getBean(RocketMQProducer.class);producer.sendMessage("my-topic", "Hello, RocketMQ!");RocketMQConsumer consumer = context.getBean(RocketMQConsumer.class);consumer.startConsumer("my-topic");}
}

六、RocketMQ集群

上面所述的是单体RocketMQ,也能使用。但是如果你想要实现高可用在实际的业务场景中。RocketMQ大部分都不会单体存在,需要搭建集群来实现高可用

有人已经写好了,而且很详细:传送门

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/111146.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SOLIDWORKS中多实体文件到装配体的转换技巧

我们在做机械等工程设计中&#xff0c;有时为了节省时间&#xff0c;需要把多实体的“零件”&#xff0c;直接转换为装配体&#xff0c;不再另外装配&#xff0c;这样能大大简化设计的操作时间&#xff0c;复杂程度。 在这里&#xff0c;我们首先要了解&#xff0c;SOLIDWORKS文…

比较差值结构的两种排斥作用

( A, B )---3*30*2---( 1, 0 )( 0, 1 ) 让网络的输入只有3个节点&#xff0c;AB训练集各由6张二值化的图片组成&#xff0c;让差值结构中有两个点&#xff0c;一种情况两个点都属于A&#xff0c;一种情况两个点分别来自A和B。排列组合所有可能&#xff0c;统计迭代次数并排序。…

【单片机】有人WH-LTE-7S1 4G cat1 模块连接服务器,教程,记录

文章目录 4G cat1 模块封装引脚名称功能拓扑图串口模块调试WH-LTE-7S1 4G cat1 模块 我买的这个模块内置了电信卡&#xff0c;不用插电话卡就能用&#xff0c;要插也行&#xff0c;在背面。 ⚫ 5-16V 宽电压供电 ⚫ LTE Cat 1&#xff0c;搭载 4G 网络&#xff0c;低时延&…

webassembly003 ggml ADAM (暂记)

Adam优化器的工作方式是通过不断更新一阶矩估计和二阶矩估计来自适应地调整学习率&#xff0c;并利用动量法来加速训练过程。这种方式可以在不同的参数更新方向和尺度上进行自适应调整&#xff0c;从而更有效地优化模型。 https://arxiv.org/pdf/1412.6980.pdf 参数 这些参数…

Linux通过libudev获取挂载路径、监控U盘热拔插事件、U盘文件系统类型

文章目录 获取挂载路径监控U盘热拔插事件libusb 文件系统类型通过挂载点获取挂载路径添libudev加库 获取挂载路径 #include <stdio.h> #include <libudev.h> #include <string.h>int main() {struct udev *udev;struct udev_enumerate *enumerate;struct ud…

EVO大赛是什么

价格是你所付出的东西&#xff0c;而价值是你得到的东西 EVO大赛是什么&#xff1f; “EVO”大赛全称“Evolution Championship Series”&#xff0c;是北美最高规格格斗游戏比赛&#xff0c;大赛正式更名后已经连续举办12年&#xff0c;是全世界最大规模的格斗游戏赛事。常见…

bpmnjs Properties-panel拓展(属性设置篇)

最近有思考工作流相关的事情&#xff0c;绘制bpmn图的工具认可度比较高的就是bpmn.js了&#xff0c;是一个基于node.js的流程图绘制框架。初始的框架只实现了基本的可视化&#xff0c;想在xml进行客制化操作的话需要拓展&#xff0c;简单记录下几个需求的实现过程。 修改基础 …

Transformer (Attention Is All You Need) 论文精读笔记

Transformer(Attention Is All You Need) Attention Is All You Need 参考&#xff1a;跟李沐学AI-Transformer论文逐段精读【论文精读】 摘要&#xff08;Abstract&#xff09; 首先摘要说明&#xff1a;目前&#xff0c;主流的序列转录&#xff08;序列转录&#xff1a;给…

【数据结构】排序(插入、选择、交换、归并) -- 详解

一、排序的概念及其运用 1、排序的概念 排序&#xff1a;所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的操作。 稳定性&#xff1a;假定在待排序的记录序列中&#xff0c;存在多个具有相同的关键字的记…

Go 结构体

现在有一个需求&#xff0c;要求存储学生的详细信息&#xff0c;例如&#xff0c;学生的学号&#xff0c;学生的姓名&#xff0c;年龄&#xff0c;家庭住址等。按照以前学习的存储方式&#xff0c;可以以如下的方式进行存储&#xff1a; 通过定义变量的信息&#xff0c;进行存储…

数字电路-二进制学习

什么是二进制&#xff1f; 数字电路 中 只有 高电平 和低电平 就是 1 和0 进位规则是“逢二进一”&#xff0c;借位规则是“借一当二”。 二进制、八进制 、十进制、十六进制 二进制 有两个数来表示 &#xff1a; 0、1 八进制 有8个数来表示 &#xff1a; 0、1、2、3、4、…

基于RabbitMQ的模拟消息队列之二---创建项目及核心类

一、创建项目 创建一个SpringBoot项目&#xff0c;环境&#xff1a;JDK8&#xff0c;添加依赖&#xff1a;Spring Web、MyBatis FrameWork(最主要&#xff09; 二、创建核心类 1.项目分层 2.核心类 在mqserver包中添加一个包&#xff0c;名字为core&#xff0c;表示核心类…

uniapp 项目实践总结(一)uniapp 框架知识总结

导语&#xff1a;最近开发了一个基于 uniapp 框架的项目&#xff0c;有一些感触和体会&#xff0c;所以想记录以下一些技术和经验&#xff0c;在这里做一个系列总结&#xff0c;算是对自己做一个交代吧。 目录 简介全局文件全局组件常用 API条件编译插件开发 简介 uniapp 是…

openGauss学习笔记-47 openGauss 高级数据管理-权限

文章目录 openGauss学习笔记-47 openGauss 高级数据管理-权限47.1 语法格式47.2 参数说明47.3 示例 openGauss学习笔记-47 openGauss 高级数据管理-权限 数据库对象创建后&#xff0c;进行对象创建的用户就是该对象的所有者。数据库安装后的默认情况下&#xff0c;未开启三权分…

使用ELK(ES+Logstash+Filebeat+Kibana)收集nginx的日志

文章目录 Nginx日志格式修改配置logstash收集nginx日志引入Redis收集日志写入redis从redis中读取日志 引入FilebeatFilebeat简介Filebeat安装和配置 配置nginx转发ES和kibanaELK设置账号和密码 书接上回&#xff1a;《ELK中Logstash的基本配置和用法》 Nginx日志格式修改 默认…

Gorilla LLM:连接海量 API 的大型语言模型

如果你对这篇文章感兴趣&#xff0c;而且你想要了解更多关于AI领域的实战技巧&#xff0c;可以关注「技术狂潮AI」公众号。在这里&#xff0c;你可以看到最新最热的AIGC领域的干货文章和案例实战教程。 一、前言 在当今这个数字化时代&#xff0c;大型语言模型&#xff08;LLM…

利用多种机器学习方法对爬取到的谷歌趋势某个关键词的每日搜索次数进行学习

大家好&#xff0c;我是带我去滑雪&#xff01; 前一期利用python爬取了谷歌趋势某个关键词的每日搜索次数&#xff0c;本期利用爬取的数据进行多种机器学习方法进行学习&#xff0c;其中方法包括&#xff1a;随机森林、XGBOOST、决策树、支持向量机、神经网络、K邻近等方法&am…

聚类分析 | MATLAB实现基于DBSCAD密度聚类算法可视化

聚类分析 | MATLAB实现基于LP拉普拉斯映射的聚类可视化 目录 聚类分析 | MATLAB实现基于LP拉普拉斯映射的聚类可视化效果一览基本介绍程序设计参考资料 效果一览 基本介绍 基于DBSCAD密度聚类算法可视化&#xff0c;MATLAB程序。 使用带有KD树加速的dbscan_with_kdtree函数进行…

uniapp项目实战系列(1):导入数据库,启动后端服务,开启代码托管

目录 前言前期准备1.数据库的导入2.运行后端服务2.1数据库的后端配置2.2后端服务下载依赖&#xff0c;第三方库2.3启动后端服务 3.开启gitcode代码托管 ✨ 原创不易&#xff0c;还希望各位大佬支持一下&#xff01; &#x1f44d; 点赞&#xff0c;你的认可是我创作的动力&…

vscode编译C语言

首先把c文件拖到vscode中 然后安装这个插件 安装完毕后会提示你代码中的语法错误&#xff0c;并在编译器的右上角出现编译按钮 我当前的问题是没有GCC&#xff0c;我们点一下编译的按钮也可以看出来这个问题 在 django笔记中 附录二 windows上直接安装uwsgi(不可行) 附录二 win…