使用发送和接收消息验证MQ
vim /etc/profile
export NAMESRV_ADDR="192.168.136.66:9876;192.168.136.67:9876"
./tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=7F0000011A085B2133B16F18735A03E0, offsetMsgId=C0A8884200002A9F00000000000177A8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=124]
SendResult [sendStatus=SEND_OK, msgId=7F0000011A085B2133B16F18735D03E1, offsetMsgId=C0A8884200002A9F000000000001786A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=124]
SendResult [sendStatus=SEND_OK, msgId=7F0000011A085B2133B16F18736003E2, offsetMsgId=C0A8884300002A9F00000000000177AA, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=0], queueOffset=124]
SendResult [sendStatus=SEND_OK, msgId=7F0000011A085B2133B16F18736403E3, offsetMsgId=C0A8884300002A9F000000000001786C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=124]
SendResult [sendStatus=SEND_OK, msgId=7F0000011A085B2133B16F18736803E4, offsetMsgId=C0A8884300002A9F000000000001792E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=2], queueOffset=124]
SendResult [sendStatus=SEND_OK, msgId=7F0000011A085B2133B16F18736C03E5, offsetMsgId=C0A8884300002A9F00000000000179F0, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=3], queueOffset=124]
SendResult [sendStatus=SEND_OK, msgId=7F0000011A085B2133B16F18737003E6, offsetMsgId=C0A8884200002A9F000000000001792C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=124]
SendResult [sendStatus=SEND_OK, msgId=7F0000011A085B2133B16F18737403E7, offsetMsgId=C0A8884200002A9F00000000000179EE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=124]
13:44:33.467 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.136.66:11011] result: true
13:44:33.481 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.136.67:11011] result: true
13:44:33.482 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.136.66:9876] result: true
13:44:33.482 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.136.67:10911] result: true
13:44:33.483 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.136.66:10911] result: true
上面可以看到发送1000条消息是成功的
验证消费者:
也是启动一个消费者的程序,消费者就将生产者发送的消息全部接收到了。使用bin/tools.sh工具验证消息的接收。
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
看到接收到的消息:
ConsumeMessageThread_please_rename_unique_group_name_4_20 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=486, sysFlag=0, bornTimestamp=1726984215960, bornHost=/192.168.136.66:57460, storeTimestamp=1726984215964, storeHost=/192.168.136.67:10911, msgId=C0A8884300002A9F000000000005C11B, commitLogOffset=377115, bodyCRC=1193163826, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1726984415179, UNIQ_KEY=7F0000011AA75B2133B16F1DAD98037D, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 57, 51], transactionId='null'}]]
上面就说明搭建的服务是ok的。
关闭服务器
- 关闭broker
./mqshutdown broker
- 关闭nameserver
./mqshutdown namesrv
mqadmin
[root@localhost bin]# ./mqadmin clusterList -n 127.0.0.1:9876
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
rocketmq-cluster broker-a 0 192.168.136.66:10911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 74.73 0.0700
rocketmq-cluster broker-a 1 192.168.136.67:11011 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 74.73 0.0600
rocketmq-cluster broker-b 0 192.168.136.67:10911 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 74.73 0.0600
rocketmq-cluster broker-b 1 192.168.136.66:11011 V4_9_8 0.00(0,0ms) 0.00(0,0ms) 0 74.73 0.0700
#创建topic
[root@localhost bin]# ./mqadmin updateTopic -n localhost:9876 -c rocketmq-cluster -t lucas_topic
create topic to 192.168.136.66:10911 success.
create topic to 192.168.136.67:10911 success.
TopicConfig [topicName=lucas_topic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
topic建在rocketmq-cluster集群下所有master身份的broker上,即broker-a-master和broker-b-master这两个broker上都会创建topic。
#查看所有topic :
sh mqadmin topicList -n 192.168.1.23:9876
#topic路由信息
sh mqadmin topicRoute -n 192.168.208.190:9876 -t testb
#删除topic
sh mqadmin deleteTopic -n 192.168.208.190:9876 -c DefaultCluster -t topicWarning # -c 集群名 -t topic名
#查询集群信息
sh mqadmin clusterList -n 192.168.208.190:9876
上面这些命令都不是很直观
安装可视化管理控制平台
rocketmq.config.namesrvAddr=172.16.253.103:9876;172.16.253.101:9876;172.16.253.102:9876
mvn clean package -Dmaven.test.skip=true
nohup java -jar rocketmq-console-ng-1.0.1.jar