【RocketMQ入门-安装部署与Java API测试】

【RocketMQ入门-安装部署与Java API测试】

    • 一、环境说明
    • 二、安装部署
    • 三、Java API 编写Producer和Consumer进行测试
    • 四、小结

一、环境说明

  1. 虚拟机VWMare:安装centos7.6操作系统
  2. 源码包:rocketmq-all-5.1.3-source-release.zip
  3. 单master部署,在一台虚拟机上安装部署name server和proxy以及broker
  4. 流程图:
    在这里插入图片描述

二、安装部署

  1. 源码包安装需要事先安装部署maven,下载apache-maven-3.6.3-bin.tar.gz安装包,然后解压并配置环境变量,如下命令:

    tar -zvxf apache-maven-3.6.3-bin.tar.gz -C /training/
    

    配置环境变量(此处是用root安装),编辑:vi ~/.bash_profile,在文件末尾添加如下内容:

    #maven
    export MVN_HOME=/training/apache-maven-3.6.3
    export PATH=$MVN_HOME/bin:$PATH
    

    执行:source ~/.bash_profile 使环境生效。

  2. 进入/training/apache-maven-3.6.3/conf目录下,配置maven的仓库为阿里云和华为云仓库,执行如下命令:

    cd /training/apache-maven-3.6.3/conf/
    mv settings.xml settings.xml.backup
    vi settings.xml
    

    在打开的settings.xml中,粘贴如下内容即可:

    <?xml version="1.0" encoding="utf-8"?>
    <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="         http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd"><mirrors><mirror><id>aliyunmaven</id><mirrorOf>*</mirrorOf><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url></mirror><mirror><id>huaweicloud</id><mirrorOf>central</mirrorOf><name>huaweicloud maven</name><url>https://mirrors.huaweicloud.com/repository/maven/</url></mirror></mirrors><profiles><profile><repositories><repository><id>central</id><url>https://maven.aliyun.com/repository/central</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories></profile></profiles>
    </settings>
    
  3. 由于CentOS7.6最小模式安装没有unzip命令,需要事先安装,执行如下命令安装:

    yum install unzip -y
    
  4. 解压源码包rocketmq-all-5.1.3-source-release.zip,进入到解压后的目录下,然后编译安装,执行如下命令:

    unzip rocketmq-all-5.1.3-source-release.zip
    cd rocketmq-all-5.1.3-source-release/
    mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
    
  5. 第5步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动NameServer,执行如下命令:

    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    nohup sh bin/mqnamesrv &
    
  6. 验证NameServer是否启动成功,执行如下命令:

    tail -f ~/logs/rocketmqlogs/namesrv.log
    

    会看到如下内容,说明已经正常启动了

    The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
    或者执行jps命令查看是否已经有了NameServer进程:NamesrvStartup,如有说明ok

  7. 第5、6步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动Broker和Proxy,执行如下命令:注意:NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考其他教程。

    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
    
  8. 验证NameServer是否启动成功,执行如下命令:

    tail -f ~/logs/rocketmqlogs/proxy.log
    

    会看到如下内容,说明已经正常启动了

    The broker[broker-a, 192.168.36.132:10911] boot success. serializeType=JSON and name server is localhost:9876
    或者执行jps命令查看是否已经有了:ProxyStartup 进程,如有说明ok

三、Java API 编写Producer和Consumer进行测试

  1. 上述正常启动NameServer和Broker及Proxy后,首先需要创建名为TestTopic的Topic,执行如下命令:
    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
    
    查看新创建的Topic,验证是否已经创建好,执行:
    sh bin/mqadmin topicList -n localhost:9876
    
    结果如下:
    在这里插入图片描述
  2. 创建消费者组,执行如下命令:
    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876
    
    执行命令无任何错误即说明已经创建成功。
  3. 在Idea中创建Maven工程,添加rocketmq依赖,添加如下依赖到pom.xml中:
    <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><rocketmq-client-java-version>5.0.5</rocketmq-client-java-version><slf4j.version>1.7.25</slf4j.version>
    </properties><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>${rocketmq-client-java-version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency>
    </dependencies>
    
  4. 编写ProducerTest生产者,代码如下:
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.message.Message;
    import org.apache.rocketmq.client.apis.producer.Producer;
    import org.apache.rocketmq.client.apis.producer.SendReceipt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import java.io.IOException;public class ProducerTest {private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);public static void main(String[] args) throws Exception {testMain();}public static void testMain() throws ClientException, IOException {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "192.168.36.132:8081";// 消息发送的目标Topic名称,需要提前创建。// 执行:sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultClusterString topic = "TestTopic";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();int temp = 0;while (true) {String msg = "第 " + temp + " 条消息,我喜欢rocketmq!!";temp++;// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")// 消息体。.setBody(msg.getBytes()).build();try {// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);Thread.sleep(1000);logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (Exception e) {logger.error("Failed to send message", e);}}// producer.close();}
    }
    
  5. 编写CommonUtils工具类,用于将ByteBuffer转成String,代码如下:
    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;public class CommonUtils {public static void main(String[] args) {System.out.println("Hello world!");}public static String decodeKey(ByteBuffer bytes) {Charset charset = StandardCharsets.UTF_8;return charset.decode(bytes).toString();}public static byte[] decodeValue(ByteBuffer bytes) {int len = bytes.limit() - bytes.position();byte[] bytes1 = new byte[len];bytes.get(bytes1);return bytes1;}public static ByteBuffer encodeKey(String key) {return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));}public static ByteBuffer encodeValue(byte[] value) {ByteBuffer byteBuffer = ByteBuffer.allocate(value.length);byteBuffer.clear();byteBuffer.get(value, 0, value.length);return byteBuffer;}
    }
    
  6. 编写ConsumerTest生产者,代码如下:
    import java.util.Collections;
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
    import org.apache.rocketmq.client.apis.consumer.FilterExpression;
    import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
    import org.apache.rocketmq.client.apis.consumer.PushConsumer;
    import org.rocketmq.producer.CommonUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;public class PushConsumerTest {private static final Logger logger = LoggerFactory.getLogger(PushConsumerTest.class);private PushConsumerTest() {}public static void main(String[] args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoints = "192.168.36.132:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建。// 执行:sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876String consumerGroup = "testgroup";// 指定需要订阅哪个目标Topic,Topic需要提前创建。String topic = "TestTopic";// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组。.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 处理消息并返回消费结果。logger.info("Consume message successfully, messageId={},messageBody={}", messageView.getMessageId(), CommonUtils.decodeKey(messageView.getBody()));return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);// 如果不需要再使用 PushConsumer,可关闭该实例。// pushConsumer.close();}
    }
    
  7. 为了能查看到控制台日志输入,需要在resources目录下新建log4j.properties、log4j2.properties,具体内容如下:
    log4j.properties内容:
    log4j.rootLogger=INFO,consolelog4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.out
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
    
    log4j2.properties内容:
    name = PropertiesConfig
    property.filename = target/logs#appenders = console, file
    #配置值是appender的类型,并不是具体appender实例的name
    appenders = rollingappender.rolling.type = RollingFile
    appender.rolling.name = RollingLogFile
    appender.rolling.fileName=${filename}/automationlogs.log
    appender.rolling.filePattern = ${filename}/automationlogs-%d{MM-dd-yy-HH-mm-ss}-%i.log
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 5rootLogger.level = INFO,console
    rootLogger.appenderRef.rolling.ref = RollingLogFile
    
  8. 到此,完成了所有准备工作了,整个工程如下所示:
    在这里插入图片描述
  9. 运行ProducerTest程序进行消息的发送,控制台中会看到如下内容:
    在这里插入图片描述
  10. 运行ConsumerTest程序接收消息,控制台中会看到如下内容:
    在这里插入图片描述

四、小结

至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们也通过编写Java程序进行简单的消息收发。如本文对您有帮助,麻烦您动动发财的手指点个赞~~~~~,谢谢您的阅读!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/87525.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

冶金作业VR虚拟仿真厂家

对于高风险行业来说&#xff0c;开展安全教育培训是企业的重点工作&#xff0c;传统培训逐渐跟不上时代变化和工人需求&#xff0c;冶金安全VR模拟仿真培训系统作为一种新型的教育和培训工具&#xff0c;借助VR虚拟现实技术为冶金行业的工人提供一个安全、高效的培训环境。 冶金…

后端开发9.商品类型模块

概述 简介 商品类型我设计的复杂了点,设计了多级类型 效果图 数据库设计 创建表 DROP TABLE IF EXISTS &

c语言每日一练(6)

前言&#xff1a;每日一练系列&#xff0c;每一期都包含5道选择题&#xff0c;2道编程题&#xff0c;博主会尽可能详细地进行讲解&#xff0c;令初学者也能听的清晰。每日一练系列会持续更新&#xff0c;暑假时三天之内必有一更&#xff0c;到了开学之后&#xff0c;将看学业情…

ubuntu supervisor 部署 python 项目

ubuntu supervisor 查看系统是否可用 cuda 初环境与设备安装 supervisor 环境创建 Supervisor 配置文件启动 Supervisor 服务管理项目 本篇文章将介绍 ubuntu supervisor 部署 python 项目 Supervisor 是一个用于管理和监控进程的系统工具。它的主要功能是确保系统中的进程持续…

Kuebernetes资源控制管理

第四阶段 时 间&#xff1a;2023年8月11日 参加人&#xff1a;全班人员 内 容&#xff1a; Kuebernetes资源控制管理 目录 Kubectl命令工具 一、kubectl 命令行的语法 二、kubectl命令列表 三、使用 Kubectl 工具容器资源 &#xff08;一&#xff09;创建Pod &…

腾讯云CVM服务器2核2g1m带宽支持多少人访问?

腾讯云2核2g1m的服务器支持多少人同时访问&#xff1f;2核2g1m云服务器短板是在1M公网带宽上&#xff0c;腾讯云服务器网以网站应用为例&#xff0c;当大规模用户同时访问网站时&#xff0c;很大概率会卡在公网带宽上&#xff0c;所以压根就谈不上2核2G的CPU内存计算性能是否够…

MATLAB算法实战应用案例精讲-【图像处理】图像分类模型Swin TrasnformerViT

目录 Swin Trasnformer 1. 模型介绍 2. 模型结构 3. 模型实现 4. 模型特点 5. 模型效果 ViT( Vision Transformer) 模型介绍 模型结构与实现 1. 图像分块嵌入 2. 多头注意力 3. 多层感知机&#xff08;MLP&#xff09; 4. DropPath 5. 基础模块 6. 定义ViT网络 …

分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

文章目录 01. Kafka 消费者分区再均衡是什么&#xff1f;02. Kafka 消费者分区再均衡的触发条件&#xff1f;03. Kafka 消费者分区再均衡的过程&#xff1f;04. Kafka 如何判定消费者已经死亡&#xff1f;05. Kafka 如何避免消费者的分区再均衡?06. Kafka 消费者分区再均衡有什…

MATLAB从文件得出数据并计算吸收光谱

这一系列就是科研用的真实程序了&#xff0c;也是对自己的一个备忘录 真的收购每次都重写了 但真的文件太多了找不到啊&#xff01;&#xff01;&#xff01; 好吧是我废物 废话不多说&#xff0c;这就开始 基础的清理&#xff1a; clear clc close all 读取文件中的数据…

SpringBoot案例-部门管理-修改

目录 前言 查看页面原型&#xff0c;明确需求 页面原型 需求 阅读接口文件 思路分析 功能接口开发 控制层&#xff08;Controller类&#xff09; 业务层&#xff08;Service类&#xff09; 业务类 业务实现类 持久层&#xff08;Mapper类&#xff09; 接口测试 前…

LNMP环境介绍和搭建

一.LNMP简介 1.含义 2.工作原理 二.部署LNMP环境 1.Nginx环境 &#xff08;1&#xff09;上传nginx包&#xff0c;下载编译安装工具并解包到指定目录&#xff08;tar 参数 tar包 - C 目录路径&#xff09; &#xff08;2&#xff09; 开始编译安装&#xff0c;每次编译后…

HTTP代理编程:Python实用技巧与代码实例

今天我要与大家分享一些关于HTTP代理编程的实用技巧和Python代码实例。作为一名HTTP代理产品供应商&#xff0c;希望通过这篇文章&#xff0c;帮助你们掌握一些高效且实用的编程技巧&#xff0c;提高开发和使用HTTP代理产品的能力。 一、使用Python的requests库发送HTTP请求&a…

25. K 个一组翻转链表

25. K 个一组翻转链表 题目-困难难度示例1. 链表转列表 -> 计算 -> 列表转链表2. 反转合并 题目-困难难度 给你链表的头节点 head &#xff0c;每 k 个节点一组进行翻转&#xff0c;请你返回修改后的链表。 k 是一个正整数&#xff0c;它的值小于或等于链表的长度。如果…

mac harbor的安装

harbor的安装 为什么要整这个呢&#xff0c;因为我在学习k8s&#xff0c;但是需要一个自己的镜像仓库。于是&#xff0c;最开始想到的就是在本地直接部署一个&#xff0c;还比较安全、快速。 直接下载了官方的项目&#xff0c;运行脚本发现出了异常&#xff0c;这种异常我已经…

Ajax 笔记(一)—— Ajax 入门

笔记目录 1. Ajax 入门1.1 Ajax 概念1.2 axios 使用1.2.1 URL1.2.2 URL 查询参数1.2.3 小案例-查询地区列表1.2.4 常用请求方法和数据提交1.2.5 错误处理 1.3 HTTP 协议1.3.1 请求报文1.3.2 响应报文 1.4 接口文档1.5 案例1.5.1 用户登录&#xff08;主要业务&#xff09;1.5.2…

SpringBoot3集成Quartz

标签&#xff1a;Quartz.Job.Scheduler&#xff1b; 一、简介 Quartz由Java编写的功能丰富的开源作业调度框架&#xff0c;可以集成到几乎任何Java应用程序中&#xff0c;并且能够创建多个作业调度&#xff1b; 在实际的业务中&#xff0c;有很多场景依赖定时任务&#xff0c…

Java SpringBoot 加载 yml 配置文件中字典项

实际项目中&#xff0c;如果将该类信息放配置文件中的话&#xff0c;一般会结合Nocas一起使用 将字典数据&#xff0c;配置在 yml 文件中&#xff0c;通过加载yml将数据加载到 Map中 Spring Boot 中 yml 配置、引用其它 yml 中的配置。# 在配置文件目录&#xff08;如&#xff…

Ajax-概念、Http协议、Ajax请求及其常见问题

Ajax Ajax概念Ajax优缺点HTTP协议请求报文响应报文 Ajax案例准备工作express基本使用创建一个服务器 发送AJAX请求GET请求POST请求JSON响应 Ajax请求出现的问题IE缓存问题Ajax请求超时与网络异常处理Ajax手动取消请求Ajax重复发送请求问题 Ajax概念 AJAX 全称为Asynchronous J…

Maven从入门到好难

参考文献 永远最好的官网 超赞maven系列文章 1. 为啥要有maven 想当初&#xff0c;刚毕业刚工作&#xff0c;之前学的C&#xff0c;java不懂&#xff0c;部门用的Spring&#xff0c;于是开始学习SSM&#xff0c;妈的&#xff0c;jar包好难整&#xff0c;还要一个个下载好放到…

腾讯云服务器标准型CVM实例详细介绍S5/S6/SA2/SR1/SA3/S4等

腾讯云CVM服务器标准型实例的各项性能参数平衡&#xff0c;标准型云服务器适用于大多数常规业务&#xff0c;例如&#xff1a;web网站及中间件等&#xff0c;常见的标准型云服务器有CVM标准型S5、S6、SA3、SR1、S5se等规格&#xff0c;腾讯云服务器网来详细说下云服务器CVM标准…