从0开始学习RocketMQ:快速部署启动

快速部署

快速部署一个单节点单副本 RocketMQ 服务,并完成简单的消息收发。

安装Apache RocketMQ

下载地址:RocketMQ官网下载

这里我们下载二进制包:rocketmq-all-5.3.0-bin-release.zip
直接解压即可:tar -zxvf rocketmq-all-5.3.0-bin-release.zip
重命名:mv rocketmq-all-5.3.0-bin-release rocketmq
在这里插入图片描述

启动NameServer

安装完RocketMQ包后,我们启动NameServer

### 启动namesrv
$ nohup sh bin/mqnamesrv &### 验证namesrv是否启动成功
$ tail -f nohup.out
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

启动Broker+Proxy

NameServer成功启动后,我们启动Broker和Proxy。这里我们使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。

### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &### 验证broker是否启动成功
$ tail -f nohup.out
Wed Sep 11 18:35:56 CST 2024 rocketmq-proxy startup successfully

在这里插入图片描述

工具测试消息收发

export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述

命令行测试

创建 Topic

bin/mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t studyMq

在这里插入图片描述

查看 Topic 列表

bin/mqadmin topicList -n localhost:9876

在这里插入图片描述

发送消息

bin/mqadmin sendMessage -n localhost:9876 -t studyMq -p 111111

在这里插入图片描述

消费消息

bin/mqadmin consumeMessage -n localhost:9876 -t studyMq

在这里插入图片描述

SDK测试

生产消息

package rocketmq;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.time.Duration;public class ProducerExample {public static void main(String[] args) throws Exception {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。String endpoint = "127.0.0.1:8080";// 消息发送的目标Topic名称,需要提前创建。String topic = "studyMq";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);builder.setRequestTimeout(Duration.ofSeconds(10));ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("yyh")// 消息体。.setBody("good night".getBytes()).build();try {// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);System.out.println("Send message successfully, messageId:" + sendReceipt.getMessageId());} catch (ClientException e) {System.out.println("Failed to send message:" + e);}// producer.close();}
}

消费消息

package rocketmq;import lombok.Data;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;import java.io.IOException;
import java.time.Duration;
import java.util.Collections;@Data
public class ConsumerExample {private ConsumerExample() {}public static void main(String[] args) throws ClientException, IOException, InterruptedException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoints = "127.0.0.1:8080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).setRequestTimeout(Duration.ofSeconds(20)).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建。String consumerGroup = "YourConsumerGroup";// 指定需要订阅哪个目标Topic,Topic需要提前创建。String topic = "studyMq";// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组。.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 获取消息体的ByteBufferByteBuffer byteBuffer = messageView.getBody();// 将ByteBuffer转换为字节数组byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes); //将ByteBuffer中的数据读取到字节数组// 将字节数组转换为字符串String msgBody = new String(bytes, StandardCharsets.UTF_8);// 处理消息并返回消费结果。System.out.println("Consume message successfully, messageId=" + messageView.getMessageId() +",msg=" + msgBody);return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);// 如果不需要再使用 PushConsumer,可关闭该实例。// pushConsumer.close();}
}

关闭服务器

通过以下方式关闭服务

sh bin/mqshutdown brokersh bin/mqshutdown namesrv

注意:如果是部署在云服务器上,需要把9876、 8080、10911端口都对外授权

参考资料
《https://rocketmq.apache.org/zh/docs/quickStart/01quickstart》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/421969.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

httprunner学习笔记(自用版)

目录 一、安装二、脚本录制1、charles录制2、F12脚本录制 三、脚本生成1、har转换为json脚本2、har转换为yml脚本 四、执行脚本五、查看报告六、httpruner接口自动化项目架构 HttpRunner 是一款面向 HTTP(S) 协议的通用测试框架,只需编写维护一份 YAML/JSON 脚本&am…

Request Response

1 前言 1.1 内容概要 理解Request、Response和HTTP报文之间的关系掌握通过Request能够获得的信息 请求URL、URI、请求协议请求头、客户机和主机请求参数 掌握通过Response能够完成的设置 响应中文乱码问题响应(Json)字符串、图片(文件&a…

Leetcode 188. 买卖股票的最佳时机 Ⅳ 状态机dp C++实现

Leetcode 188.买卖股票的最佳时机 Ⅳ 问题:给你一个整数数组 prices 和一个整数 k ,其中 prices[i] 是某支给定的股票在第 i 天的价格。设计一个算法来计算你所能获取的最大利润。你最多可以完成 k 笔交易。也就是说,你最多可以买 k 次&…

准备好了吗?JAVA从业AI开发的学习路线详解

作为一个拥有扎实 Java 基础的人,想要涉足人工智能(AI)应用开发,你已经在编程能力方面打下了很好的基础。Java 是一种通用的、强类型的语言,非常适合于开发高性能的应用程序,尤其是在后端服务和大规模分布式…

C++:IO流

目录 C语言的输入输出 流是什么 CIO流 C标准IO流 C文件IO流 stringstream的介绍 C语言的输入输出 C 语言中我们用到的最频繁的输入输出方式就是 scanf () 与 printf() 。 scanf(): 从标准输入设备 ( 键 盘 ) 读取数据,并将值存放在变量中 。 printf(): 将…

linux驱动之模块化编程

我们写的驱动程序,对linux操作系统而言,都是一个一个模块。 我们写应用程代码的时候是要有main函数入口,但是驱动模块有自己的入口。所以在编译驱动模块的时候就要使用到内核的makefile,来编译我们的模块。 我们在命令行敲&#x…

RS®FSWP 相位噪声分析仪和 VCO 测试仪信号源和组件的高端分析

FSWP 相位噪声分析仪和VCO测试仪 价格实惠,性能出众 R&SFSWP 相位噪声分析仪和 VCO 测试仪结合噪声极低的内部源与互相关技术,具备高灵敏度。它可在数秒内测量高度稳定的信号源的相位噪声。 R&SFSWP 还具备脉冲信号测量、加性相位噪声&…

C++初阶:string类的模拟实现

✨✨小新课堂开课了,欢迎欢迎~✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C:由浅入深篇 小新的主页:编程版小新-CSDN博客 前言: 前面已经对string类进行了…

[数据集][目标检测]井盖丢失未盖破损检测数据集VOC+YOLO格式2890张5类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):2890 标注数量(xml文件个数):2890 标注数量(txt文件个数):2890 标注…

QGIS 如何连接空间库,并实时编辑空间表?编辑后库表如何刷新,保证是最新数据?

文章目录 一、什么是 qgis?二、qgis 如何连接数据库三、实时编辑空间表四、编辑后库表如何刷新,保证是最新数据?五、总结 一、什么是 qgis? QGIS(原称Quantum GIS)是一个用户界面友好的开源桌面端软件&…

htop、free -h对于可用内存显示不同的区别

htop中Mem包含了缓存和缓存区, free -h查看 used free buff/cache 上面htop显示的mem, 1、我看我还能用多少内存,看哪里 看free -h 中的free 2、buff/cache 是啥 缓存缓存区占用,htop显示的效果是把这个也算在一块了&#…

TIDB的整体架构和主要功能

1. 基础架构 PD :负责集群管理和调度。TiDB Server :负责 SQL 查询处理。TiKV/TiFlash:负责数据存储和事务处理。 1.1 PD (Placement Driver) Server 1.1.1 基础介绍 整个 TiDB 集群的元信息管理模块,负责存储每个 TiKV 节点实时…

哪款骨传导耳机适合运动?健身党无广安利五款有用的骨传导耳机!

作为一名耳机爱好者,我的耳机收藏可以说是丰富多样,从追求极致音质的头戴式,到便于携带的入耳式,再到近年来兴起的骨传导耳机,我都有所体验。在众多选择中,我最终偏爱上了骨传导耳机,它以其独特…

vue3 使用 codemirror 实现yaml文件的在线编辑

vue3 使用 codemirror 实现yaml文件的在线编辑 1. 使用情形2. 插件下载3. 封装yaml编辑器组件4. 父组件使用5. js-yaml 使用6. 备注 1. 使用情形 需要对yaml文件进行在线编辑,并且进行基础格式验证 2. 插件下载 vue-codemirror 在线代码编辑器插件 js-yaml 用于转…

容联云容犀Copilot&Agent入选《中国 AI Agent 产品罗盘》

近日,InfoQ研究中心推出《中国AI Agent应用研究报告》,并在报告中对现行的中国AI Agent产品进行梳理总结,并形成《中国AI Agent产品罗盘》。 作为“营销服”领域垂直类Agent,容联云容犀Copilot&Agent入选2024中国AI A…

java8+springboot2.3升级jdk17+springboot2.7.9踩坑

一.问题: java.lang.ExceptionInInitializerError: nullat java.base/java.lang.Class.forName0(Native Method)at java.base/java.lang.Class.forName(Class.java:375)。。。。。。内部保密。。。。at org.springframework.context.annotation.ParserStrategyUtils.invokeAwa…

封装智能指针 qt实现登录界面

1.封装独占智能指针——unique_ptr #include <iostream> #include <utility> // For std::move// 命名空间 namespace custom_memory { template <typename T> class myPtr { public:// 使用初始化列表进行初始化explicit myPtr(T* p nullptr) noexcept : …

ThinkPHP8出租屋管理系统

有需要请加文章底部Q哦 可远程调试 ThinkPHP8出租屋管理系统 一 介绍 此出租屋管理系统基于ThinkPHP8框架开发&#xff0c;数据库mysql&#xff0c;前端Vue3&#xff0c;前后端不分离&#xff0c;系统主要角色为管理员。房租计算器&#xff0c;房东记账收租管理&#xff0c;房…

NX二次开发—柱面中心线工具

设计一个柱面中心线工具,可以实现选择对象,画出圆柱的中心线,可以更改中心的线的颜色、线型、线宽和图层,是否延长,是否关联。 先在NX上进行界面设计 添加选择对象,并设置标题,选择设置为多选 添加组,在组里添加线条颜色/线型/线宽,设置颜色ColorValue和线型Value 这…

C++详解string(全面解析)

目录 string的概念&#xff1a; string的框架&#xff1a; 1、成员函数 2、迭代器&#xff08;Iterators&#xff09;​编辑 3、容量 4、元素访问 5、修改 6、非成员函数重载 string的构造和拷贝构造&#xff1a; string的析构&#xff1a; string的访问&#xff1a;…