RocketMQ mqadmin java springboot python 调用笔记

命令

mqadmin命令列表

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin
The most commonly used mqadmin commands are:updateTopic               Update or create topicdeleteTopic               Delete topic from broker and NameServer.updateSubGroup            Update or create subscription groupsetConsumeMode            Set consume message mode. pull/pop etc.deleteSubGroup            Delete subscription group from broker.updateBrokerConfig        Update broker's configupdateTopicPerm           Update topic permtopicRoute                Examine topic route infotopicStatus               Examine topic Status infotopicClusterList          Get cluster info for topicaddBroker                 Add a broker to specified containerremoveBroker              Remove a broker from specified containerresetMasterFlushOffset    Reset master flush offset in slavebrokerStatus              Fetch broker runtime status dataqueryMsgById              Query Message by IdqueryMsgByKey             Query Message by KeyqueryMsgByUniqueKey       Query Message by Unique keyqueryMsgByOffset          Query Message by offsetqueryMsgTraceById         Query a message traceprintMsg                  Print Message DetailprintMsgByQueue           Print Message DetailsendMsgStatus             Send msg to broker.brokerConsumeStats        Fetch broker consume stats dataproducerConnection        Query producer's socket connection and client versionconsumerConnection        Query consumer's socket connection, client version and subscriptionconsumerProgress          Query consumers's progress, speedconsumerStatus            Query consumer's internal data structurecloneGroupOffset          Clone offset from other group.producer                  Query producer's instances, connection, status, etc.clusterList               List cluster infostopicList                 Fetch all topic list from name serverupdateKvConfig            Create or update KV config.deleteKvConfig            Delete KV config.wipeWritePerm             Wipe write perm of broker in all name server you defined in the -n paramaddWritePerm              Add write perm of broker in all name server you defined in the -n paramresetOffsetByTime         Reset consumer offset by timestamp(without client restart).skipAccumulatedMessage    Skip all messages that are accumulated (not consumed) currentlyupdateOrderConf           Create or update or delete order confcleanExpiredCQ            Clean expired ConsumeQueue on broker.deleteExpiredCommitLog    Delete expired CommitLog filescleanUnusedTopic          Clean unused topic on broker.startMonitoring           Start MonitoringstatsAll                  Topic and Consumer tps statsallocateMQ                Allocate MQcheckMsgSendRT            Check message send response timeclusterRT                 List All clusters Message Send RTgetNamesrvConfig          Get configs of name server.updateNamesrvConfig       Update configs of name server.getBrokerConfig           Get broker config by cluster or special brokergetConsumerConfig         Get consumer config by subscription group namequeryCq                   Query cq command.sendMessage               Send a messageconsumeMessage            Consume messageupdateAclConfig           Update acl config yaml file in brokerdeleteAclConfig           Delete Acl Config Account in brokerclusterAclConfigVersion   List all of acl config version information in clusterupdateGlobalWhiteAddr     Update global white address for acl Config File in brokergetAclConfig              List all of acl config information in clusterupdateStaticTopic         Update or create static topic, which has fixed number of queuesremappingStaticTopic      Update or create static topic, which has fixed number of queuesexportMetadata            Export metadataexportConfigs             Export configsexportMetrics             Export metricshaStatus                  Fetch ha runtime status datagetSyncStateSet           Fetch syncStateSet for target brokersgetBrokerEpoch            Fetch broker epoch entriesgetControllerMetaData     Get controller cluster's metadatagetControllerConfig       Get controller config.updateControllerConfig    Update controller config.electMaster               Re-elect the specified broker as mastercleanBrokerMetadata       Clean metadata of broker on controllerdumpCompactionLog         parse compaction log to messagegetColdDataFlowCtrInfo    get cold data flow ctr infoupdateColdDataFlowCtrGroupConfig addOrUpdate cold data flow ctr group configremoveColdDataFlowCtrGroupConfig remove consumer from cold ctr configsetCommitLogReadAheadMode set read ahead mode for all commitlog files

topicList

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicList  -n localhost:9876
%RETRY%please_rename_unique_group_name
RMQ_SYS_TRANS_HALF_TOPIC
stringRequestTopic
%RETRY%objectRequestConsumer
%RETRY%please_rename_unique_group_name_4
TRANS_CHECK_MAX_TIME_TOPIC
BenchmarkTest
%RETRY%genericRequestConsumer
string-topic
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
%RETRY%string_consumer_newns
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
localhost.localdomain
order-paid-topic
%RETRY%my-group1
user-topic
%RETRY%string_trans_consumer
message-ext-topic
OFFSET_MOVED_EVENT
%RETRY%user_consumer
%RETRY%order-paid-consumer
yeqiang-MS-7B23
DefaultCluster
spring-transaction-topic
%RETRY%stringRequestConsumer
bytesRequestTopic
%RETRY%string_consumer
%RETRY%bytesRequestConsumer
%RETRY%rocketmq-consume-demo-message-ext-consumer

statsAll

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin statsAll  -n localhost:9876 
#Topic                                                            #Consumer Group                                                  #Accumulation      #InTPS     #OutTPS   #InMsg24Hour  #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC                                          CID_RMQ_SYS_TRANS                                                           0        0.00        0.00              0              0
stringRequestTopic                                                stringRequestConsumer                                                       1        0.00        0.00              0              0
TRANS_CHECK_MAX_TIME_TOPIC                                                                                                                    0        0.00                          0    NO_CONSUMER
BenchmarkTest                                                                                                                                 0        0.00                          0    NO_CONSUMER
string-topic                                                      string_consumer                                                           106        0.00        0.00              0              0
string-topic                                                      string_consumer_newns                                                      63        0.00        0.00              0              0
TBW102                                                                                                                                        0        0.00                          0    NO_CONSUMER
rmq_sys_REVIVE_LOG_DefaultCluster                                                                                                             0        0.00                          0    NO_CONSUMER
SELF_TEST_TOPIC                                                                                                                               0        0.00                          0    NO_CONSUMER
SCHEDULE_TOPIC_XXXX                                                                                                                           0        0.00                          0    NO_CONSUMER
DefaultCluster_REPLY_TOPIC                                                                                                                    0        0.00                          0    NO_CONSUMER
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23                                                                                                    0        0.00                          0    NO_CONSUMER
RMQ_SYS_TRANS_OP_HALF_TOPIC                                       CID_RMQ_SYS_TRANS                                                           0        0.00        0.00              0              0
TopicTest                                                         please_rename_unique_group_name                                           252        0.00        0.00              0              0
TopicTest                                                         please_rename_unique_group_name_4                                           0        0.00        0.00              0              0
localhost.localdomain                                                                                                                         0        0.00                          0    NO_CONSUMER
order-paid-topic                                                  order-paid-consumer                                                         1        0.00        0.00              0              0
user-topic                                                        user_consumer                                                               2        0.00        0.00              0              0
message-ext-topic                                                 rocketmq-consume-demo-message-ext-consumer                                  2        0.00        0.00              0              0
OFFSET_MOVED_EVENT                                                                                                                            0        0.00                          0    NO_CONSUMER
yeqiang-MS-7B23                                                                                                                               0        0.00                          0    NO_CONSUMER
DefaultCluster                                                                                                                                0        0.00                          0    NO_CONSUMER
spring-transaction-topic                                          string_trans_consumer                                                      15        0.00        0.00              0              0
bytesRequestTopic                                                 bytesRequestConsumer                                                        0        0.00        0.00              0              0

topicStatus

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus  -n localhost:9876 -t string-topic
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
yeqiang-MS-7B23                   0     0                     35                      2023-08-25 16:21:35,786
yeqiang-MS-7B23                   1     0                     52                      2023-08-25 14:55:57,152
yeqiang-MS-7B23                   2     0                     33                      2023-08-25 16:21:35,646
yeqiang-MS-7B23                   3     0                     42                      2023-08-25 14:55:57,172
yeqiang-MS-7B23                   4     0                     1                       2023-08-25 16:21:34,355
yeqiang-MS-7B23                   5     0                     1                       2023-08-25 14:55:57,105
yeqiang-MS-7B23                   6     0                     4                       2023-08-25 16:23:01,489
yeqiang-MS-7B23                   7     0                     1                       2023-08-25 16:21:36,186

Python 生产者:producer.py

from rocketmq.client import Producer, MessagegroupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
TAGS = "tag-my-group1"
KEYS = "key-my-group1-0"
# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# producer.set_session_credentials(
#     accessKey,  # 角色密钥
#     secretKey,  # 角色名称
#     ''
# )
# 启动生产者
producer.start()# 组装消息   topic名称,在控制台 topic 页面复制。
msg = Message(topicName)
# 设置keys
msg.set_keys(TAGS)
# 设置tags
msg.set_tags(KEYS)
# 消息内容
msg.set_body('This is a new message.')# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()

运行

yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ source /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/activate
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012857767267388CFD61230000 35
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ 

mqadmin查询topic状态

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus  -n localhost:9876 -t string-topic
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
yeqiang-MS-7B23                   0     0                     36                      2023-08-28 09:03:35,722
yeqiang-MS-7B23                   1     0                     52                      2023-08-25 14:55:57,152
yeqiang-MS-7B23                   2     0                     33                      2023-08-25 16:21:35,646
yeqiang-MS-7B23                   3     0                     42                      2023-08-25 14:55:57,172
yeqiang-MS-7B23                   4     0                     1                       2023-08-25 16:21:34,355
yeqiang-MS-7B23                   5     0                     1                       2023-08-25 14:55:57,105
yeqiang-MS-7B23                   6     0                     4                       2023-08-25 16:23:01,489
yeqiang-MS-7B23                   7     0                     1                       2023-08-25 16:21:36,186

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicRoute  -n localhost:9876 -t string-topic
{"brokerDatas":[{"brokerAddrs":{0:"10.47.76.67:10911"},"brokerName":"yeqiang-MS-7B23","cluster":"DefaultCluster","enableActingMaster":false}],"filterServerTable":{},"queueDatas":[{"brokerName":"yeqiang-MS-7B23","perm":6,"readQueueNums":8,"topicSysFlag":0,"writeQueueNums":8}]
}

图形工具rocketmq-dashborad

https://github.com/apache/rocketmq-dashboard

自行编译

mvn clean package -Dmaven.test.skip=true

启动

java -Drocketmq.namesrv.addr=127.0.0.1:9876 -jar target/rocketmq-dashboard-1.0.0.jar

 

 

(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012DF4226307248D16C3250000 36

 

 consoumer.py

import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
KEYS = "key-my-group1-0"def callback(msg):# 模拟业务print('Received message. messageId: ', msg.id, ' body: ', msg.body)# 消费成功回复CONSUME_SUCCESSreturn ConsumeStatus.CONSUME_SUCCESS# 消费成功回复消息状态# return ConsumeStatus.RECONSUME_LATER# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# consumer.set_session_credentials(
#     accessKey,	 # 角色密钥
#     secretKey,   # 角色名称
#     ''
# )
# 订阅topic
consumer.subscribe(topicName, callback, "*")
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()while True:time.sleep(3600)
# 资源释放
consumer.shutdown()

启动python消费者

(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/consumer.py[Consumer] Waiting for messages.
Received message. messageId:  7F0001012DF4226307248D16C3250000  body:  b'This is a new message.'

可以看到my-group1已被消费 

再启动一个consumer.py,产生一次消息

可以看到,只有一个consumer消费到了消息,说明默认情况下,消息非广播模式。

Java生产一个消息:

training: Java SpringBoot SpringCloud k8s等练习程序 - Gitee.com

 

 

 

python rocketmq依赖

Release rocketmq-client-cpp-2.1.0 · apache/rocketmq-client-cpp · GitHub

python完整程序

python-rocketmq-demo: python3 rocketmq5 的一个例子

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/110698.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【深度学习_TensorFlow】过拟合

写在前面 过拟合与欠拟合 欠拟合: 是指在模型学习能力较弱,而数据复杂度较高的情况下,模型无法学习到数据集中的“一般规律”,因而导致泛化能力弱。此时,算法在训练集上表现一般,但在测试集上表现较差&…

亿发浙江生产工厂信息化建设管理平台,实现生产智能化、数字化

在全球化、科技深刻变革的时代,浙江省信息化建设正迎来新的发展机遇。以物联网、人工智能大数据、为代表的新技术应用,为人类社会带来了智能、便捷,也标志着新一代信息化浪潮已经到来。特别是在生产型企业中,智能制造是生产型企业…

运用Python解析HTML页面获取资料

在网络爬虫的应用中,我们经常需要从HTML页面中提取图片、音频和文字资源。本文将介绍如何使用Python的requests库和BeautifulSoup解析HTML页面,获取这些资源。 一、环境准备 首先,确保您已经安装了Python环境。接下来,我们需要安…

HUT23级训练赛

目录 A - tmn学长的字符串1 B - 帮帮神君先生 C - z学长的猫 D - 这题用来防ak E - 这题考察FFT卷积 F - 这题考察二进制 G - 这题考察高精度 H - 这题考察签到 I - 爱派克斯,启动! J - tmn学长的字符串2 K - 秋奕来买瓜 A - tmn学长的字符串1 思路&#x…

CSS中如何实现背景图片的平铺和定位?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 平铺背景图片⭐ 背景图片定位⭐ 同时设置平铺和定位⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅!这个专栏是…

3D点云处理:基于2D边缘提取的方法提取3D点云边缘(占位待补充)

文章目录 0. 实现效果 微信:dhlddx B站演示视频 0. 实现效果

1.1 数据库系统简介

思维导图: 1.1.数据库系统简介 前言: 数据库系统是一个软件系统,用于管理和操作数据库。它提供了一个组织良好、高效并能够方便存取的数据存储机制,并且能够支持各种数据操作、事务管理、并发控制和恢复功能。以下是数据库系统的…

基于PIC单片机温度-脉搏-DS18B20温度-液晶12864显示(proteus仿真+源程序)

一、系统方案 1、上电初始化液晶第一行显示脉搏,第二行显示温度,第三行显示模式,第四行显示强度;按下K1按键可以选择模式,催眼模式或治疗模式。 2、治疗模块下,可以通过K2、K3修改强度。 二、硬件设计 原理…

c++之指针

总结性质 我们如何在一个函数中获取数组的长度: 我们都知道,在main函数中我们获得数组的长度只需要使用sizeof(a)/sizeof(a【0】)即可获得,但当我们把一个数组传入到方法时,c默认把…

W5500-EVB-PICO进行UDP组播数据回环测试(九)

前言 上一章我们用我们的开发板作为UDP客户端连接服务器进行数据回环测试,那么本章我们进行UDP组播数据回环测试。 什么是UDP组播? 组播是主机间一对多的通讯模式, 组播是一种允许一个或多个组播源发送同一报文到多个接收者的技术。组播源将…

完美解决Ubuntu网络故障,连接异常,IP地址一直显示127.0.0.1

终端输入ifconfig显示虚拟机IP地址为127.0.0.1&#xff0c;具体输出内容如下&#xff1a; wxyubuntu:~$ ifconfig lo: flags73<UP,LOOPBACK,RUNNING> mtu 65536inet 127.0.0.1 netmask 255.0.0.0inet6 ::1 prefixlen 128 scopeid 0x10<host>loop txqueuelen …

Java【手撕滑动窗口】LeetCode 209. “长度最小子数组“, 图文详解思路分析 + 代码

文章目录 前言一、长度最小子数组1, 题目2, 思路分析3, 代码 前言 各位读者好, 我是小陈, 这是我的个人主页, 希望我的专栏能够帮助到你: &#x1f4d5; JavaSE基础: 基础语法, 类和对象, 封装继承多态, 接口, 综合小练习图书管理系统等 &#x1f4d7; Java数据结构: 顺序表, 链…

苹果新健康专利:利用 iPhone、Apple Watch 来分析佩戴者的呼吸情况

根据美国商标和专利局&#xff08;USPTO&#xff09;公示的清单&#xff0c;苹果获得了一项健康相关的技术专利&#xff0c;可以利用 iPhone、Apple Watch 来分析佩戴者的呼吸系统。 苹果在专利中概述了一种测量用户呼吸功能的系统&#xff0c;通过 iPhone 上的光学感测单元&am…

中央发文:提高青年人才资助比例, 放宽学历、年龄限制 (附2023国自然资助比例统计)~

8 月 27 日&#xff0c;中共中央办公厅、国务院办公厅印发《关于进一步加强青年科技人才培养和使用的若干措施》&#xff08;以下简称《若干措施》&#xff09;&#xff0c;明确提出包括提高国家自然科学基金对青年科技人才的资助比例&#xff0c;放宽学历、年龄限制等措施&…

stm32之DHT11

今天&#xff0c;记录一下DHT11&#xff0c;涉及到了单总线协议&#xff0c;所以先花点时间谈论一下单总线协议&#xff08;DS18B20也是用的单总线&#xff09;。 单总线协议 单总线技术的通信协议 可能这时序图就是个例子&#xff0c;ds18b20的时序图与DHT11的时序图也是不一…

我想开通期权?如何开通期权账户?

场内期权的合约由交易所统一标准化定制&#xff0c;大家面对的同一个合约对应的价格都是一致的&#xff0c;比较公开透明&#xff0c;期权开户当天不能交易的&#xff0c;期权开户需要满足20日日均50万及半年交易经验即可操作&#xff0c;下文科普我想开通期权&#xff1f;如何…

软件工程(十七) 行为型设计模式(三)

1、观察者模式 简要说明 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并自动更新 速记关键字 联动,广播消息 类图如下 基于上面的类图,我们来实现一个监听器。类图中的Subject对应我们的被观察对象接口(IObservable),…

开源与云计算:新的合作模式

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

SQL注入之宽字节注入

文章目录 宽字节注入是什么&#xff1f;注入练习让转义符失效联合查询 代码审计 宽字节注入是什么&#xff1f; 宽字节注入准确来说不是注入手法&#xff0c;而是另外一种比较特殊的情况。宽字节注入的目的是绕过单双引号转义。 宽字节注入是一种绕过单双引号转义的手段&#x…

HQL解决连续三天登陆问题

1.背景 统计连续登录天数超过3天的用户&#xff0c;输出信息包括&#xff1a;用户id&#xff0c;登录天数&#xff0c;起始时间&#xff0c;结束时间&#xff1b; 2.准备数据 -- 建表 create table if not exists user_login_3days(user_id STRING,login_date date );--插入…