时间比较仓促,部署Linux的过程我就简写啦。
0. 我的系统版本(供参考)
Windows 11 专业版 24H2, Build 26100.2161
Experience: Windows Feature Experience Pack 1000.26100.32.0
JDK: OpenJDK Amazon Corretto 21.0.4.7.1
1. 在WSL2上安装Ubuntu 24.04
我完全参照了这个安装,流程上没啥问题:Install Ubuntu on WSL2 - Ubuntu WSL documentation
遇到的坑:
(1)安装之后需要以管理员身份打开。进入到命令行才算是安装完成。
(2)打开Ubuntu 24.04后无法进入命令行,显示WslRegisterDistribution failed with error: 0x80370114 后面跟着一堆问号或者啥
解决方法:控制面板→启用或关闭Windows功能→勾选 Hyper-V和Virtual Machine Platform,最好不要勾选Telnet→确定后重启电脑。
2. 安装Docker Desktop:Windows | Docker Docs
不需要付费,也不用登录,直接打开就行。遇到的坑:
(1)无法打开Docker Desktop,显示 An unexpected error was encountered while executing a WSL command…
解决方法:确认已安装的Linux子系统来自Ubuntu Distribution,并设置为默认。详见:https://stackoverflow.com/questions/76160943/docker-desktop-an-unexpected-error-was-encountered-while-executing-a-wsl-comma
3. 从Docker安装RocketMQ 5.3.1
在标题栏上搜索“rocketmq”,然后直接 pull 即可。
4. 创建一个共享网络,并启动一个命名服务。
以管理员身份打开Powershell,然后执行
docker network create rocketmqdocker run -d --name rmqnamesrv -p 9876:9876 --network rocketmq apache/rocketmq:5.3.1 sh mqnamesrv
现在我们占用了 9876 这个端口。还可以顺便用这个命令检查一下是否正常启动了:
docker logs -f rmqnamesrv
当然,从Docker Desktop也能看:
5. 启用Broker和Proxy
官方步骤在这个:Run RocketMQ in Docker | RocketMQ
但是我的步骤和官网不太一样,可以先试试我的步骤
(1)随便找个地方新建个记事本,然后重命名为broker.conf,空文件就行,啥也不用写。我建在了 D:\broker.conf
(2)在Powershell中执行
docker run -d `--name rmqbroker `--network rocketmq `-p 10912:10912 -p 10911:10911 -p 10909:10909 `-p 8080:8080 -p 8081:8081 `-e "NAMESRV_ADDR=rmqnamesrv:9876" `-v /D/broker.conf:/home/rocketmq/rocketmq-5.3.1/conf/broker.conf `apache/rocketmq:5.3.1 sh mqbroker --enable-proxy `-c /home/rocketmq/rocketmq-5.3.1/conf/broker.conf
注意:-v后面推荐使用Unix风格的绝对路径,比如我的写成 /D/broker.conf,冒号后面的部分不用改。
10909、10911、10912三个端口都应该做映射(如命令所示)。现在我们能在容器列表看到:
6. 点进 rmqbroker,确认一下 broker.conf文件真实存在
依次展开:/home/rocketmq/rocketmq-5.3.1/conf/,看有没有 broker.conf 文件
7. 回到 Exec,执行以下命令,然后重启容器
cd /home/rocketmq/rocketmq-5.3.1/conf/echo "brokerIP1=127.0.0.1" > broker.conf
重启容器:点击右上角那个圈圈箭头
8. 如果你想,可以手动建立一个Topic,也可以不建
这次我们在 Powershell 里进入容器,然后执行新建命令,就叫 TopicTest 吧,默认集群即可
docker exec -it rmqbroker shsh mqadmin updateTopic -n rmqnamesrv:9876 -t TopicTest
9. Java 代码验证一下
(0)POM
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.1</version>
</dependency><!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-common -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-common</artifactId><version>5.3.1</version>
</dependency>
(1)生产者
package com.example.demo.rmq;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;@SuppressWarnings("preview")
public class Producer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());producer.send(msg, 10000);System.out.println(STR."Message sent: \{new String(msg.getBody())}");producer.shutdown();}
}
(2)消费者
package com.example.demo.rmq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;@SuppressWarnings("preview")
public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println(STR."Received message: \{new String(msg.getBody())}");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.setMessageModel(MessageModel.BROADCASTING);consumer.start();}
}
(3)删除 Topic
package com.example.demo.rmq;import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.concurrent.ExecutionException;@SuppressWarnings("preview")
public class DeleteTopics {public static void main(String[] args) throws ExecutionException, InterruptedException, RemotingException, MQClientException {final String topicToDelete = "TopicTest";final String nameServerAddr = "localhost:9876";final String brokerAddr = "localhost:10911";ClientConfig config = new ClientConfig();config.setNamesrvAddr(nameServerAddr);var admin = MQClientManager.getInstance().getOrCreateMQClientInstance(config).getMQClientAPIImpl();admin.start();System.out.println(STR."Before deletion, topics available: \{admin.getTopicListFromNameServer(3000).getTopicList()}");// Step 1: Delete topic from NameServeradmin.deleteTopicInNameServer(nameServerAddr, topicToDelete, 3000);// Step 2: Delete topic from Brokeradmin.deleteTopicInBroker(brokerAddr, topicToDelete, 3000);System.out.println(STR."After deletion, topics available: \{admin.getTopicListFromNameServer(3000).getTopicList()}");}
}
运行结果:
Before deletion, topics available: [RMQ_SYS_TRANS_HALF_TOPIC, BenchmarkTest, OFFSET_MOVED_EVENT, TBW102, %RETRY%ConsumerGroup, rmq_sys_REVIVE_LOG_DefaultCluster, SELF_TEST_TOPIC, DefaultCluster, SCHEDULE_TOPIC_XXXX, DefaultCluster_REPLY_TOPIC, rmq_sys_wheel_timer, 6d472f2c2a3d, rmq_sys_SYNC_BROKER_MEMBER_6d472f2c2a3d, RMQ_SYS_TRANS_OP_HALF_TOPIC, TopicTest]
After deletion, topics available: [RMQ_SYS_TRANS_HALF_TOPIC, BenchmarkTest, OFFSET_MOVED_EVENT, TBW102, %RETRY%ConsumerGroup, rmq_sys_REVIVE_LOG_DefaultCluster, SELF_TEST_TOPIC, DefaultCluster, SCHEDULE_TOPIC_XXXX, DefaultCluster_REPLY_TOPIC, rmq_sys_wheel_timer, 6d472f2c2a3d, rmq_sys_SYNC_BROKER_MEMBER_6d472f2c2a3d, RMQ_SYS_TRANS_OP_HALF_TOPIC]
可以发现末尾的 TopicTest 已经删掉了。
That's it.
---
后记:在最新版本的 RocketMQ 中,已经没有 DefaultMQAdminExt 等常用的 Admin 类了,都需要通过 ClientConfig 构建 MQClient,不知道为何官方文档迟迟没有更新。另外,已经不需要 DeleteTopicRequestHeader 等麻烦的类了。
其他内容欢迎大家补充。