【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)

【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)

1.引入依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.2</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.3</version></dependency>

2.Producer

public class MyProducer {public static void main(String[] args) throws Exception{// 构造Producer时,必须指定groupIdDefaultMQProducer producer = new DefaultMQProducer("my_producer_group");//指定NameServer的地址  只用namesrv的地址就行,它会从namesrv上拿到broker的地址和topic信息producer.setNamesrvAddr("localhost:9876");//启动生产者producer.start();int num = 0;while (num < 20) {num++;/*** rocketmq封装了Message* String topic,* String tags, 标签(分类)---> 筛选* byte[] body*/Message message = new Message("my_test_topic", "", ("hello rocketmq:" + num).getBytes());//同步发送  发送消息,拿到返回SendResultSendResult result = producer.send(message);System.out.println(result);}//关闭生产者producer.shutdown();}
}

启动并发送消成功后,返回的SendResult如下:

SendResult中,有一个sendStatus状态,表示消息的发送状态。一共有四种状态

FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要Broker 的刷盘策Ill创立设置成 SYNC_FLUSH 才会报这个错误)
FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且Broker 被设置成SYNC_MASTER 方式,没有在设定时间内完成主从同步。
SLAVE_NOT_AVAILABLE : 这个状态产生的场景和FLUSH_SLAVE_TIMEOUT 类似, 表示在主备方式下,并且Broker 被设置成SYNC_MASTER ,但是没有找到被配置成Slave 的Broker 。
SEND OK :表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave 上?消息在Slave 上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND OK

3.Consumer


public class MyConsumer {public static void main(String[] args) throws MQClientException {// 构造Consumer时,必须指定groupIdDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");consumer.setNamesrvAddr("localhost:9876"); // nameServer地址,用于获取broker、topic信息consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 指定订阅的主题与tag,通过tag可以定制性消费(*表示全部tag)consumer.subscribe("my_test_topic", "*"); // 异步消费consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
//                System.out.println("Receive Message:" + msgs.toString());// 1 try catch(throwable)确保不会因为业务逻辑的异常,导致消息出现重复消费的现象// 2 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()中会对Throwable进行捕获,//   并且返回ConsumeConcurrentlyStatus.RECONSUME_LATERtry {for(MessageExt msg:msgs){String msgbody = new String(msg.getBody(), "utf-8");System.out.println(" MessageBody: "+ msgbody);//输出消息内容}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 签收}});//启动消费者consumer.start();System.out.println("消费者启动成功。。。");}
}

收到消息的内容:

consumer Group:位于同一个consumer Group中的consumer实例

和producer Group中的各个produer实例承担的角色类似
同一个group中可以配置多个consumer,可以提高消费端的并发消费能力以及容灾
和kafka一样,多个consumer会对消息做负载均衡,意味着同一个topic下的不同messageQueue会分发给同一个group中的不同consumer。
同时,如果我们希望消息能够达到广播的目的,那么只需要把consumer加入到不同的group就行。

【转载原文地址】:【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)

【RocketMq在windows下的环境配置】:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/145136.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ AB组辅导课

C AB组辅导课 蓝桥杯C AB组辅导课 第一讲 递归与递推 Acwing1、整数划分(递归)2、acwing92. 递归实现指数型枚举10凑算式(全排列)11李白打酒(全排列)12、棋牌总数(递归)13、剪邮票(递归)14、1050. 鸣人的影分身 (递归或动态规划(记忆化搜索))15、方格分割 &#xff08;dfs思维&…

【软件评测】Apowersoft 傲软抠图AI智能换背景工具软件

现如今的数字图像处理已经成为人们生活中不可或缺的一部分&#xff0c;而图像抠图作为其中的重要环节&#xff0c;更是被广泛应用于设计、摄影、广告等领域。为了满足用户的需求&#xff0c;Apowersoft推出了一款傲软抠图AI智能换背景工具&#xff0c;宣称能够自动抠图并智能替…

应用架构的演进:亚马逊的微服务实践

当你在亚马逊上购物时,或许不会想到,你看到的这个购物网站,其背后技术架构经历了什么样的变迁与升级。 还记得上世纪 90 年代,那个只卖书的网上书店吗?那时的亚马逊,不过是一个架构简单的网站,所有的功能都堆积在一个庞大的软件堡垒里。随着更多业务的增加、更新和迭代,这个软…

【数据结构篇】堆

文章目录 堆前言基本介绍认识堆堆的特点堆的分类堆的操作堆的常见应用 堆的实现JDK 自带的堆手动实现堆 堆 前言 本文主要是对堆的一个简单介绍&#xff0c;如果你是刚学数据结构的话&#xff0c;十分推荐看这篇文章&#xff0c;通过本文你将对堆这个数据结构有一个大致的了解…

Flink之Watermark生成策略

在Flink1.12以后,watermark默认是按固定频率周期性的产生. 在Flink1.12版本以前是有两种生成策略的: AssignerWithPeriodicWatermarks周期性生成watermarkAssignerWithPunctuatedWatermarks[已过时] 按照指定标记性事件生成watermark 新版本API内置的watermark策略 单调递增的…

(vue3)create-vue 组合式APIsetup、ref、watch,通信

优势&#xff1a; 更易维护&#xff1a;组合式api&#xff0c;更好的TS支持 之前是选项式api&#xff0c;现在是组合式&#xff0c;把同功能的api集合式管理 复用功能封装成一整个函数 更快的速度 更小的体积 更优的数据响应式&#xff1a;Proxy create-vue 新的脚手架工…

【小沐学前端】Node.js实现UDP通信

文章目录 1、简介2、下载和安装3、代码示例3.1 HTTP3.2 UDP单播3.4 UDP广播 结语 1、简介 Node.js 是一个开源的、跨平台的 JavaScript 运行时环境。 Node.js 是一个开源和跨平台的 JavaScript 运行时环境。 它是几乎任何类型项目的流行工具&#xff01; Node.js 在浏览器之外…

【PHP】如何关闭buffer实时输出内容到前端

前言 默认情况下&#xff0c;我们在PHP里使用echo等函数输出的内容&#xff0c;是不会马上发送给前端的&#xff0c;原因是有 buffer 的存在&#xff0c;buffer又分两处&#xff0c;一处是PHP本身的buffer&#xff0c;另一处是Nginx的buffer。只有当buffer满了之后&#xff0c…

【图论C++】链式前向星(图(树)的存储)

/*** file * author jUicE_g2R(qq:3406291309)————彬(bin-必应)* 一个某双流一大学通信与信息专业大二在读 * * brief 一直在竞赛算法学习的路上* * copyright 2023.9* COPYRIGHT 原创技术笔记&#xff1a;转载需获得博主本人…

django 实现:闭包表—树状结构

闭包表—树状结构数据的数据库表设计 闭包表模型 闭包表&#xff08;Closure Table&#xff09;是一种通过空间换时间的模型&#xff0c;它是用一个专门的关系表&#xff08;其实这也是我们推荐的归一化方式&#xff09;来记录树上节点之间的层级关系以及距离。 场景 我们 …

红米手机 导出 通讯录 到电脑保存

不要搞什么 云服务 不要安装什么 手机助手 不要安装 什么app 用 usb 线 连接 手机 和 电脑 手机上会跳出 提示 选择 仅传输文件 会出现下面的 一个 盘 进入 MIUI目录 然后进入 此电脑\Redmi Note 5\内部存储设备\MIUI\backup\AllBackup\20230927_043337 如何没有上面的文件&a…

MyBatis的一级缓存和二级缓存:原理和作用

MyBatis的一级缓存和二级缓存&#xff1a;原理和作用 引言 在数据库访问中&#xff0c;缓存是一种重要的性能优化手段&#xff0c;它可以减少数据库查询的次数&#xff0c;加快数据访问速度。MyBatis作为一款流行的Java持久层框架&#xff0c;提供了一级缓存和二级缓存来帮助…

C++(string类)

本节目标&#xff1a; 1、为什么要学习string类 2.标准库中的string类 3.vs和g下string结构说明 1.为什么学习string类 1.1 c语言中的字符串 C 语言中&#xff0c;字符串是以 \0 结尾的一些字符的集合&#xff0c;为了操作方便&#xff0c; C 标准库中提供了一些 str系列的…

1.6.C++项目:仿mudou库实现并发服务器之channel模块的设计

项目完整版在&#xff1a; 文章目录 一、channel模块&#xff1a;事件管理Channel类实现二、提供的功能三、实现思想&#xff08;一&#xff09;功能&#xff08;二&#xff09;意义&#xff08;三&#xff09;功能设计 四、代码&#xff08;一&#xff09;框架&#xff08;二…

3.物联网射频识别,(高频)RFID应用ISO14443-2协议

一。ISO14443-2协议简介 1.ISO14443协议组成及部分缩略语 &#xff08;1&#xff09;14443协议组成&#xff08;下面的协议简介会详细介绍&#xff09; 14443-1 物理特性 14443-2 射频功率和信号接口 14443-3 初始化和防冲突 &#xff08;分为Type A、Type B两种接口&…

深入理解操作系统- - 进程篇(1)

目录 进程解释&#xff1a; process in memory(进程在内存中包含什么) : 并发的进程&#xff1a; 进程定义&#xff1a; 个人定义&#xff1a; 书本定义&#xff1a; 进程状态&#xff1a; 进程何时离开CPU&#xff1a; 内部事件&#xff1a; 外部事件&#xff1a; 进…

卫星图像应用 - 洪水检测 使用DALI进行数据预处理

这篇文章是上一篇的延申。 运行环境&#xff1a;Google Colab 1. 当今的深度学习应用包含由许多串行运算组成的、复杂的多阶段数据处理流水线&#xff0c;仅依靠 CPU 处理这些流水线已成为限制性能和可扩展性的瓶颈。 2. DALI 是一个用于加载和预处理数据的库&#xff0c;可…

消息队列实现进程之间通信方式代码,现象

1、向消息队列中写入数据 #include<myhead.h>//消息结构体 typedef struct {long msgtype; //消息类型char data[1024]; //消息正文 }Msg_ds;#define SIZE sizeof(Msg_ds)-sizeof(long) //正文大小int main(int argc, const char *argv[]) {//1、创建…

raw智能照片处理工具DxO PureRAW mac介绍

DxO PureRAW Mac版是一款raw智能照片处理工具&#xff0c;该软件采用了智能技术&#xff0c;以解决影响所有RAW文件的七个问题&#xff1a;去马赛克&#xff0c;降噪&#xff0c;波纹&#xff0c;变形&#xff0c;色差&#xff0c;不想要的渐晕&#xff0c;以及缺乏清晰度。 Dx…

渗透测试之打点

请遵守中华人民共和国网络安全法 打点的目的是获取一个服务器的控制权限 1. 企业架构收集 &#xff08;1&#xff09;官网 &#xff08;2&#xff09;网站或下属的子网站&#xff0c;依次往下 天眼查 企查查 2. ICP 备案查询 ICP/IP地址/域名信息备案管理系统 使用网站…