RocketMQ消息发送基本示例(推送消费者)

消息生产者通过三种方式发送消息

1.同步发送:等待消息返回后再继续进行下面的操作  同步发送保证了消息的可靠性,适用于关键业务场景。

2.异步发送:不等待消息返回直接进入后续流程.broker将结果返回后调用callback函数,并使用

CountDownLatch计数

3.单向发送:只负责发送,不管消息是否发送成功  单向发送不保证消息的送达,仅适用于对可靠性要求不高的场景。

消费者消费消息分两种:

拉模式:消费者主动去Broker上拉取消息

推模式:消费者等待Broker把消息推送过来

事实上:尽管存在“推送消费者”(DefaultMQPushConsumer)和“拉取消费者”(DefaultMQPullConsumer)这两种消费者类型,但实际上它们都是以“拉取”模式工作的,只不过实现方式和使用场景有所不同。

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>

客户端与服务器安装版本一致即可

演示1    同步发送模式   客户端推送模式

注意观察   broker是把消息分两次推送的  就是发多少条消息  推送多少次

生产者     

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** 同步发送* 使用场景:* 1.可靠性要求高,消息发送需要等待确认* 2.数据量较少的场景* 3.实时响应,消息发送需要立即得到结果* 小的订单系统* @author hrui* @date 2024/7/31 20:31*/
public class SyncProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group1"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//启动生产者实例producer.start();//发送10条消息for (int i = 0; i < 2; i++) {//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,并同步等待发送结果 (同步发送)SendResult sendResult = producer.send(message);//打印消息发送结果System.out.println("第" + i + "条消息发送成功:返回---->" + sendResult);}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

消费者

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** 简单消费者* @author hrui* @date 2024/7/31 20:40*/
public class Consumer {public static void main(String[] args) {//创建一个DefaultMQPushConsumer实例,指定消费者组名为"group1"//采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息consumer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//订阅主题"Topic1",过滤标签为"*",表示接收所有消息consumer.subscribe("Topic1", "*");//设置消息监听器,处理接收到的消息//可以传入两种类型的监听器://1. MessageListenerOrderly(顺序消费):保证消息按顺序处理//2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序consumer.setMessageListener(new MessageListenerConcurrently() {//consumeMessage方法用于处理接收到的消息列表@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//                    //遍历消息列表,处理每条消息
//                    list.forEach(messageExt -> {
//                        //输出消息体内容(需要根据具体的消息编码解码,这里假设为UTF-8)
//                        System.out.println(new String(messageExt.getBody()));
//                        //消息处理成功后输出确认信息
//                        System.out.println("消息消费成功");
//                    });for (int i=0;i<list.size();i++){System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));broker是将两条消息分别发送的}//返回消费状态,CONSUME_SUCCESS表示消息消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例,开始接收消息consumer.start();} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();}}
}

演示2  异步发送

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** 异步发送消息* 并发流量高的场景下,使用异步发送消息可以提高吞吐量。* @author hrui* @date 2024/7/31 21:53*/
public class AsyncProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group2"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");//计数器,用于跟踪异步消息发送的完成情况CountDownLatch countDownLatch = new CountDownLatch(100);try {// 启动生产者实例producer.start();//发送100条消息for (int i = 0; i < 100; i++) {final int index = i;//创建消息实例,指定主题为"Topic2",标签为"Tag2",消息内容为"Hello World"加上编号Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,异步发送。第二个参数是SendCallback回调函数producer.send(message, new SendCallback() {@Override//发送成功时,Broker回调此方法public void onSuccess(SendResult sendResult) {//将CountDownLatch计数器减一,表示一个消息发送任务完成countDownLatch.countDown();System.out.println("消息发送成功_" + sendResult);}@Override//发送失败时,Broker回调此方法public void onException(Throwable throwable) {// 将CountDownLatch计数器减一,表示一个消息发送任务完成countDownLatch.countDown();System.out.println("消息发送失败_" + throwable.getStackTrace());}});}//等待所有消息发送完成//countDownLatch.await();boolean await = countDownLatch.await(5, TimeUnit.SECONDS);if (!await) {System.out.println("消息发送超时");}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

演示3  单向发送

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** 单向发送* 试用场景* 日志收集* @author hrui* @date 2024/7/31 22:27*/
public class OnewayProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group1"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//启动生产者实例producer.start();//发送10条消息for (int i = 0; i < 2; i++) {//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号  topic要和消费者相同Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,单向发送,不管发送成功与否producer.sendOneway(message);System.out.println(i+"_消息发送了");}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/388154.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL---JDBC

一、JDBC是什么&#xff1f; JDBC(Java Database Connectivity):是Java访问数据库的解决方案。 JDBC定义了一套标准的接口&#xff0c;即访问数据库的通用API&#xff0c;不同数据库的厂商根据各自数据库的特点实现这些接口。 JDBC希望用相同的方式访问不同的数据库&#xff0c…

cocos creator绘制网格背景(基于矢量绘图)

在2D游戏开发中&#xff0c;设计2D地图的背景实现通常有以下几种方式&#xff1a; 静态背景图&#xff1a; 最简单的方式是使用静态背景图&#xff0c;即将整个背景作为一个静态图像加载到游戏中。这种方式适用于简单的游戏或者背景不需要变化的场景。 平铺背景图&#xff1a;…

java~反射

反射 使用的前提条件&#xff1a;必须先得到代表的字节码的Class&#xff0c;Class类用于表示.class文件&#xff08;字节码&#xff09; 原理图 加载完类后&#xff0c;在堆中就产生了一个Class类型的对象&#xff08;一个类只有一个Class对象&#xff09;&#xff0c;这个对…

湖南(市场调研公司)源点咨询 如何进行精准化用户画像细分研究

湖南源点咨询认为&#xff0c;用户画像&#xff0c;是根据用户的基本属性、用户偏好、生活习惯、用户行为等信息而抽象出来的标签化用户模型。我们在这里为大家分析为什么要建立用户画像&#xff0c;进行用户细分调研。 一、什么是用户画像 简单来讲&#xff0c;就是想要在通…

Java每日一练,技术成长不间断

目录 题目1.下列关于继承的哪项叙述是正确的&#xff1f;2.Java的跨平台特性是指它的源代码可以在多个平台运行。&#xff08;&#xff09;3.以下 _____ 不是 Object 类的方法4.以下代码&#xff1a;5.下面哪个流类不属于面向字符的流&#xff08;&#xff09;总结 题目 选自牛…

KubeSphere 部署向量数据库 Milvus 实战指南

作者&#xff1a;运维有术星主 Milvus 是一个为通用人工智能&#xff08;GenAI&#xff09;应用而构建的开源向量数据库。它以卓越的性能和灵活性&#xff0c;提供了一个强大的平台&#xff0c;用于存储、搜索和管理大规模的向量数据。Milvus 能够执行高速搜索&#xff0c;并以…

一文剖析高可用向量数据库的本质

面对因电力故障、网络问题或人为操作失误等导致的服务中断&#xff0c;数据库系统高可用能够保证系统在这些情况下仍然不间断地提供服务。如果数据库系统不具备高可用性&#xff0c;那么系统就需要承担停机和数据丢失等重大风险&#xff0c;而这些风险极有可能造成用户流失&…

python中的print函数总结

文章目录 打印变量打印数学计算多行文本复制n次字符串 x*n,n*x不换行输出多个数据换行符制表位转义原字符字符串切片格式化字符串千位分隔符&#xff08;只适用于整数和浮点数&#xff09;浮点数小数部分的精度字符串类型&#xff0c;.表示最大的显示长度整数类型浮点数类型 打…

(新)VMware虚拟机安装Linux教程(超详细)

创作不易&#xff0c;禁止转载抄袭&#xff01;&#xff01;&#xff01;违者必究&#xff01;&#xff01;&#xff01; 创作不易&#xff0c;禁止转载抄袭&#xff01;&#xff01;&#xff01;违者必究&#xff01;&#xff01;&#xff01; 创作不易&#xff0c;禁止转载抄…

C语言:扫雷游戏实现

一、扫雷游戏的分析和设计 扫雷游戏想必大家都玩过吧&#xff0c;初级的玩法是在一个9*9的棋盘上找到没有雷的格子&#xff0c;而今天我们就要做的就是9*9扫雷游戏的实现。 1、游戏功能和规则 使用控制台实现经典的扫雷游戏游戏可以通过菜单实现继续玩或者退出游戏扫雷的棋盘…

Flink SQL 的工作机制

前言 Flink SQL 引擎的工作流总结如图所示。 从图中可以看出&#xff0c;一段查询 SQL / 使用TableAPI 编写的程序&#xff08;以下简称 TableAPI 代码&#xff09;从输入到编译为可执行的 JobGraph 主要经历如下几个阶段&#xff1a; 将 SQL文本 / TableAPI 代码转化为逻辑执…

面试经典算法150题系列-数组/字符串操作之多数元素

序言&#xff1a;今天是第五题啦&#xff0c;前面四题的解法还清楚吗&#xff1f;可以到面试算法题系列150题专栏 进行复习呀。 温故而知新&#xff0c;可以为师矣&#xff01;加油&#xff0c;未来的技术大牛们。 多数元素 给定一个大小为 n 的数组 nums &#xff0c;返回其…

C#实现深度优先搜索(Depth-First Search,DFS)算法

深度优先搜索&#xff08;DFS&#xff09;是一种图搜索算法&#xff0c;它尽可能深入一个分支&#xff0c;然后回溯并探索其他分支。以下是使用C#实现DFS的代码示例&#xff1a; using System; using System.Collections.Generic;class Graph {private int V; // 顶点的数量pr…

大模型算法备案流程最详细说明【流程+附件】

文章目录 一、语料安全评估 二、黑盒测试 三、模型安全措施评估 四、性能评估 五、性能评估 六、安全性评估 七、可解释性评估 八、法律和合规性评估 九、应急管理措施 十、材料准备 十一、【线下流程】大模型备案线下详细步骤说明 十二、【线上流程】算法备案填报…

ChatGLM3-6B模型部署微调实战

准备 视频教程 https://www.bilibili.com/video/BV1ce411J7nZ?p14&vd_source165c419c549bc8d0c2d71be2d7b93ccc 视频对应的资料 https://pan.baidu.com/wap/init?surlAjPi7naUMcI3OGG9lDpnpQ&pwdvai2#/home/%2FB%E7%AB%99%E5%85%AC%E5%BC%80%E8%AF%BE%E3%80%90%E8…

HTTP协议详解(一)

协议 为了使数据在网络上从源头到达目的&#xff0c;网络通信的参与方必须遵循相同的规则&#xff0c;这套规则称为协议&#xff0c;它最终体现为在网络上传输的数据包的格式。 一、HTTP 协议介绍 HTTP&#xff08;Hyper Text Transfer Protocol&#xff09;&#xff1a; 全…

Monorepo简介

Monorepo 第一章&#xff1a;与Monorepo的邂逅第二章&#xff1a;Multirepo的困境第三章&#xff1a;Monorepo的魔力 - 不可思议的解决问题能力第四章&#xff1a;Monorepo的挑战与应对策略第五章&#xff1a;总结第六章&#xff1a;参考 第一章&#xff1a;与Monorepo的邂逅 …

【AI大模型】分布式训练:深入探索与实践优化

欢迎来到 破晓的历程的 博客 ⛺️不负时光&#xff0c;不负己✈️ 文章目录 一、分布式训练的核心原理二、技术细节与实现框架1. 数据并行与模型并行2. 主流框架 三、面临的挑战与优化策略1. 通信开销2. 数据一致性3. 负载均衡 4.使用示例示例一&#xff1a;TensorFlow中的数据…

VAE、GAN与Transformer核心公式解析

VAE、GAN与Transformer核心公式解析 VAE、GAN与Transformer&#xff1a;三大深度学习模型的异同解析 【表格】VAE、GAN与Transformer的对比分析 序号对比维度VAE&#xff08;变分自编码器&#xff09;GAN&#xff08;生成对抗网络&#xff09;Transformer&#xff08;变换器&…

设计师的素材管理神器,eagle、千鹿大测评

前言 专业的设计师都会精心维护自己的个人素材库&#xff0c;常常需要耗费大量时间用于浏览采集、分类标注、预览筛选、分享协作&#xff0c;还要管理字体、图片、音视频等各类设计素材 如果你作为设计师的话&#xff0c;今天&#xff0c;就为大家带来两款热门的素材管理工具…