RabbitMQ多种工作场景详解

目录

1、hello world体验

2、Work queues 工作序列

3、Publish/Subscribe订阅与发布

4、Routing 基于内容的路由

5、Topics 基于话题的路由

6、Headers 头部路由机制

7、Publisher Confirms 发送者消息确认

​ 1、发布单条消息

​ 2、发送批量消息

​ 3、异步确认消息


1、hello world体验

       最直接的方式,P端发送一个消息到一个指定的queue,中间不需要任何exchange规则。C端按queue方式进行消费。
关键代码:(其实关键的区别也就是几个声明上的不同。)   

producer:

channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

 consumer:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

这个模式就是之前演示的案例。


2、Work queues 工作序列

这是RabbitMQ最基础也是最常用的一种工作机制。

        Producer消息发送给queue,多个Consumer同时往队列上消费消息。一般来讲一条消息对应一个消费者消费(消费者消费消息的数量可以设置)。

producer: 将消息直接到Queue上

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); //任务一般是不能因为消息中间件的服务而被耽误的,所以durable设置成了true,这样,即使rabbitMQ服务断了,这个消息也不会消失
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

Consumer: 每次拉取一条消息(通过调整QOS可以指定一次拉取多少消息)

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

       首先,Consumer端的autoAck字段设置的是false,这表示consumer在接收到消息后不会自动反馈服务器已消费了message,而要改在对message处理完成了之后(也就是consumer中的处理逻辑中),再调用channel.basicAck来通知服务器已经消费了该message.这样即使Consumer在执行message过程中出问题了,也不会造成message被忽略,因为没有ack的message会被服务器重新进行投递。
       但是,这其中也要注意一个很常见的BUG,就是如果所有的consumer都忘记调用basicAck()了,就会造成message被不停的分发,也就造成不断的消耗系统资源。这也就是 Poison Message(毒消息)

       RabbitMQ默认是采用的fair dispatch,也叫round-robin模式,就是把消息轮询,在所有consumer中轮流发送。这种方式,没有考虑消息处理的复杂度以及consumer的处理能力。而改进后的方案,是consumer可以向服务器声明一个prefetchCount,我把他叫做预处理能力值。channel.basicQos(prefetchCount);表示当前这个consumer可以同时处理几个message。这样服务器在进行消息发送前,会检查这个consumer当前正在处理中的message(message已经发送,但是未收到consumer的basicAck)有几个,如果超过了这个consumer节点的能力值,就不再往这个consumer发布。

       这种模式,消息有可能全部阻塞,所有consumer节点都超过了能力值,那消息就阻塞在服务器上,这时需要自己及时发现这个问题,采取措施,比如增加consumer节点或者其他策略。


3、Publish/Subscribe订阅与发布

type为fanout(广播) 的exchange

       这种机制是对上面的一种补充。也就是把preducer与Consumer进行进一步的解耦。producer只负责发送消息,至于消息进入哪个queue,由exchange来分配。如上图,就是把producer发送的消息,交由exchange同时发送到两个queue里,然后由不同的Consumer去进行消费。

producer: 只负责往exchange里发消息,后面的事情不管

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

receiver: 将消费的目标队列绑定到exchange上

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");

       发布与订阅关键处就是type为”fanout” 的exchange,这种类型的exchange只负责往所有已绑定的队列上发送消息。


4、Routing 基于内容的路由

 type为direct(直连)的exchange

 

       在上一种exchange往所有队列发送消息的基础上,增加一个路由配置,指定exchange如何将不同类别的消息分发到不同的queue上。

 Producer

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

Receiver

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
channel.basicConsume(queueName, true, consumer);

       交换机根据routinkey(路由键)将消息路由到指定的队列中,这样消费者只关心队列中是否有消息即可。


5、Topics 基于话题的路由

 type为topic(主题)的exchange

        这个模式也就在上一个模式的基础上,对routingKey进行了模糊匹配单词之间用,隔开,* 代表一个具体的单词。# 代表0个或多个单词。

Producer

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

Receiver

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
channel.basicConsume(queueName, true, consumer);

核心就是通过routingkey来进行模糊匹配。


6、Headers 头部路由机制

       在官网中还有一种路由策略并没有提及,那就是Headers路由。其实官网之所以没有过多介绍,就是因为这种策略在实际中用得比较少(官网提倡用Routingkey方式)。

Producer:发送消息时,带上消息的headers相关属性

public class EmitLogHeader {private static final String EXCHANGE_NAME = "logs";/*** exchange有四种类型, fanout topic headers direct* headers用得比较少,他是根据头信息来判断转发路由规则。头信息可以理解为一个Map* @param args* @throws Exception*/public static void main(String[] args) throws Exception{// header模式不需要routingKey来转发,他是根据header里的信息来转发的。比如消费者可以只订阅logLevel=info的消息。// 然而,消息发送的API还是需要一个routingKey。 // 如果使用header模式来转发消息,routingKey可以用来存放其他的业务消息,客户端接收时依然能接收到这个routingKey消息。String routingKey = "ourTestRoutingKey";// The map for the headers.Map<String, Object> headers = new HashMap<>();headers.put("loglevel", "error");headers.put("buslevel", "product");headers.put("syslevel", "admin");String message = "LOG INFO asdfasdf";Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());builder.headers(headers);channel.basicPublish(EXCHANGE_NAME, routingKey, builder.build(), message.getBytes("UTF-8"));channel.close();connection.close();}
}

Consumer:声明Queue与Exchange绑定关系时,可以增加声明headers,表明自己对哪些信息感兴趣

		Map<String, Object> headers = new HashMap<String, Object>();headers.put("loglevel", "info");headers.put("buslevel", "product");headers.put("syslevel", "admin");Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);String queueName = channel.queueDeclare("ReceiverHeader",true,false,false,null).getQueue();channel.queueBind(queueName, EXCHANGE_NAME,routingKey,headers);

        Headers交换机的性能相对比较低,因此官方并不建议大规模使用这种交换机,也没有把他列入基础的示例当中。


7、Publisher Confirms 发送者消息确认

       RabbitMQ的消息可靠性是非常高的,但是他以往的机制都是保证消息发送到了MQ之后,可以推送到消费者消费,不会丢失消息。但是发送者发送消息是否成功是没有保证的。发送者发送消息的基础API:Producer.basicPublish方法是没有返回值的,也就是说,一次发送消息是否成功,应用是不知道的,这在业务上就容易造成消息丢失。而这个模块就是通过给发送者提供一些确认机制,来保证这个消息发送的过程是成功的。

       发送者确认模式默认是不开启的,所以如果需要开启发送者确认模式,需要手动在channel中进行声明。  

channel.confirmSelect();

​ 1、发布单条消息

发布一条消息就确认一条消息

for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);channel.basicPublish("", queue, null, body.getBytes());channel.waitForConfirmsOrDie(5_000);
}

​        channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。然后如果到了超时时间,还没有收到服务端的确认机制,那就会抛出异常。然后通常处理这个异常的方式是记录错误日志或者尝试重发消息,但是尝试重发时一定要注意不要使程序陷入死循环。

​ 2、发送批量消息

​       之前单条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认。

            int batchSize = 100;int outstandingMessageCount = 0;long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);}

       这种方式可以稍微缓解下发送者确认模式对吞吐量的影响。但是也有个固有的问题就是,当确认出现异常时,发送者只能知道是这一批消息出问题了, 而无法确认具体是哪一条消息出了问题。所以接下来就需要增加一个机制能够具体对每一条发送出错的消息进行处理。

​ 3、异步确认消息

实现方式,Producer在channel中添加一个注册监听器来对消息进行确认。

channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);@FunctionalInterface
public interface ConfirmCallback {void handle(long sequenceNumber, boolean multiple) throws IOException;
}

       参数中的两个ConfirmCallback一个表示成功逻辑、一个表示失败逻辑。

       ConfirmCallback,这个监听器接口,里面只有一个方法: void handle(long sequenceNumber, boolean multiple) throws IOException; 这方法中的两个参数如下

sequenceNumer(long)

       这个是一个唯一的序列号,代表一个唯一的消息。在RabbitMQ中,他的消息体只是一个二进制数组,默认消息是没有序列号的。那么在回调的时候,RabbitMQ提供了一个方法int sequenceNumber = channel.getNextPublishSeqNo();来生成一个全局递增的序列号,这个序列号将会分配给新发送的那一条消息。然后应用程序需要自己来将这个序列号与消息对应起来。需要客户端自己去做对应!!!
multiple(boolean)

        这个是一个Boolean型的参数。如果是false,就表示这一次只确认了当前一条消息。如果是true,就表示RabbitMQ这一次确认了一批消息,在sequenceNumber之前的所有消息都已经确认完成了。

         发送者消息确认模式不光可以确认消息是否到了Exchange,还可以确认消息是否从Exchange成功路由到了Queue。在Channel中可以添加一个ReturnListener。这个ReturnListener就会监控到这一部分发送成功了,但是无法被Consumer消费到的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/247751.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JVM】运行时数据区域,内存如何分配和对象在内存中的组成

目录 一.运行时数据区域 1.线程独享 2.线程共享 二.内存如何分配 1.指针碰撞法 2.空闲列表法 3.TLAB 三.对象在内存中的组成 ​编辑1.对象头 2.实例数据 3.对齐填充 一.运行时数据区域 1.线程独享 &#xff08;1&#xff09;栈 虚拟机栈&#xff1a;每个 Java 方法在…

GPT-SoVITS 测试

开箱直用版&#xff08;使用 AutoDL&#xff09; step1 打开地址 https://www.codewithgpu.com/i/RVC-Boss/GPT-SoVITS/GPT-SoVITS-Official 选择 AutoDL创建实例&#xff0c;选择 3080ti 机器 step2 创建好实例之后&#xff0c;进入命令行&#xff0c;输入命令 echo {}>…

Vue学习之使用开发工具创建项目、gitcode管理项目

Vue学习之使用开发工具创建项目、gitcode管理项目 翻阅与学习了vue的开发工具&#xff0c;通过对比最终采用HBuilderX作为开发工具&#xff0c;以下章节对HBuilder安装与基础使用介绍 1. HBuilder 下载 从HbuildX官网&#xff08;http://www.dcloud.io/hbuilderx.html&#…

Mac安装配置maven

Mac安装配置maven 官网下载地址&#xff1a;https://maven.apache.org/download.cgi 下载好以后解压配置 maven 环境变量 打开终端&#xff0c;输入命令打开配置文件./bash_profile open ~/.bash_profile输入i进入编辑模式,进行maven配置; MAVEN_HOME为maven的本地路径 ex…

opencv——将2张图片合并

效果演示&#xff1a; 带有绿幕的图片的狮子提取出来&#xff0c;放到另一种风景图片里&#xff01; 1. 首先我们要先口出绿色绿幕&#xff0c;比如&#xff1a; 这里将绿色绿色绿幕先转为HSV&#xff0c;通过修改颜色的明暗度&#xff0c;抠出狮子的轮廓。 代码 &#xff1a…

CSS3的学习笔记

CSS3的学习笔记 什么是css: CSS是层叠样式表&#xff08;Cascading Style Sheets&#xff09;的缩写&#xff0c;是一种用来描述网页样式和布局的标记语言。它可以控制网页中的文字大小、颜色、间距、背景、边框、布局等方面&#xff0c;使网页更加美观和易于阅读。通过CSS&a…

HarmonyOS NEXT 星河版项目案例

参考代码&#xff1a;HeimaHealthy: 鸿蒙项目案例练习 (gitee.com) 1.欢迎页面 Entry Component struct WelcomePage {State message: string Hello Worldbuild() {Column({space: 10}) {Row() {// 1.中央slogonImage($r(app.media.home_slogan)).width(260)}.layoutWeight(…

MG7050HAN 基于声表的差分多输出 晶体振荡器 (HCSL)

基于MG7050 HAN的声表差分多输出晶体振荡器(HCSL)&#xff0c;采用两路或四路差分HCSL&#xff08;高速电流驱动逻辑&#xff09;输出&#xff0c;可以减少外部扇出缓冲区&#xff0c;特别适用于需要超低抖动、高频率范围内稳定工作的应用场合。其输出特性曲线超低抖动&#xf…

x-cmd pkg | sqlite3 - 轻量级的嵌入式关系型数据库

目录 简介首次用户 技术特点竞品和相关产品sqlite 与 x-cmd进一步阅读 简介 sqlite3 是一个轻量级的文件数据库&#xff0c;体积非常小&#xff0c;提供简单优雅而功能强大的 sql 化的数据查询。 通常情况下&#xff0c;sqlite 指的是 SQLite 2.x 版本&#xff0c;而 sqlite3 …

【AI视野·今日NLP 自然语言处理论文速览 第七十六期】Fri, 12 Jan 2024

AI视野今日CS.NLP 自然语言处理论文速览 Fri, 12 Jan 2024 Totally 60 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computation and Language Papers Axis Tour: Word Tour Determines the Order of Axes in ICA-transformed Embeddings Authors Hiroaki Yamagi…

Apache Shiro 安全框架

前言 Apache Shiro 是一个强大且容易使用的Java安全矿建&#xff0c;执行身份验证&#xff0c;授权&#xff0c;密码和会话管理。使用Shiro的易于理解的API您可以快速轻松的获得任何应用程序直到大的项目。 一丶什么是Shiro 1.Shiro是什么 Apache Shiro是一个强大且易于使用…

第九节HarmonyOS 常用基础组件14-DataPanel

1、描述 数据面板组件&#xff0c;用于将多个数据占比情况使用占比图进行展示。 2、接口 DataPanel(options:{values: number[], max?: numner, type?: DataPanelType}) 3、参数 参数名 参数类型 必填 描述 values number[] 是 数据值列表&#xff0c;最多含9条数…

【数据分析】numpy基础第二天

文章目录 前言数组的形状变换reshape的基本介绍使用reshapereshape([10, 1])运行结果reshape自动判断形状reshape([-1, 1])运行结果 合并数组使用vstack和hstackvstack和hstack的运行结果使用concatenateconcatenate运行结果 分割数组array_split运行结果 数组的条件筛选条件筛…

C++ 数论相关题目:卡特兰数应用、快速幂求组合数。满足条件的01序列

给定 n 个 0 和 n 个 1 &#xff0c;它们将按照某种顺序排成长度为 2n 的序列&#xff0c;求它们能排列成的所有序列中&#xff0c;能够满足任意前缀序列中 0 的个数都不少于 1 的个数的序列有多少个。 输出的答案对 1097 取模。 输入格式 共一行&#xff0c;包含整数 n 。 …

【计网·湖科大·思科】实验五 IPV4地址-分类地址和构建超网

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的很重要&…

highcharts.css文件的样式覆盖了options的series里面的color问题解决

文章目录 一、问题背景二、解决问题 一、问题背景 原本的charts我们的每个数据是有对应的color显示的&#xff0c;如下图&#xff1a; 后面我们系统做了黑白模式&#xff0c;引入了highcharts的css文件&#xff0c;结果highcharts的css文件中class的颜色样式覆盖了我们数据中的…

Rollup:打包 TypeScript - React 组件库

调用浏览器摄像头拍照组件 1、前提1、安装依赖2、添加 rollup.config.js 配置3、修改 package.json3.1 添加打包命令3.2 添加组件入口3.3 添加组件声明入口3.4 浏览器支持 1、前提 1.1 通过 create-react-app take-photo --template 创建前端应用 1.2 添加组件 TakePhoto (拍照…

差异性分析汇总

在做科研写论文的时候&#xff0c;我们总会听说要对数据进行差异性分析&#xff0c;那么何为差异性分析&#xff1f;差异性分析常用的方法有哪些&#xff1f;这些方法应该如何进行分类&#xff1f;如何选择&#xff1f;差异性分析的数据格式是怎么样的&#xff1f;软件如何操作…

如何在Win系统安装Jupyter Notbook并实现无公网ip远程访问本地笔记

文章目录 1.前言2.Jupyter Notebook的安装2.1 Jupyter Notebook下载安装2.2 Jupyter Notebook的配置2.3 Cpolar下载安装 3.Cpolar端口设置3.1 Cpolar云端设置3.2.Cpolar本地设置 4.公网访问测试5.结语 1.前言 在数据分析工作中&#xff0c;使用最多的无疑就是各种函数、图表、…

C语言-指针的基本知识(上)

一、关于内存 存储器&#xff1a;存储数据器件 外存 外存又叫外部存储器&#xff0c;长期存放数据&#xff0c;掉电不丢失数据 常见的外存设备&#xff1a;硬盘、flash、rom、u盘、光盘、磁带 内存 内存又叫内部存储器&#xff0c;暂时存放数据&#xff0c;掉电数据…