RabbitMQ 消费者

  RabbitMQ的消费模式分两种:推模式和拉模式,推模式采用Basic.Consume进行消费,拉模式则是调用Basic.Get进行消费。
  消费者通过订阅队列从RabbitMQ中获取消息进行消费,为避免消息丢失可采用消费确认机制

消费者

  • 拉模式
    • 拉模式的实现
  • 推模式
  • 消费确认与拒绝
    • 消息确认的实现
    • 消息拒绝的实现
    • basicRecover
  • basicQos 限制消费
  • 总结

拉模式

  顾名思义,拉模式就是消费者主动的从RabbitMQ中获取数据,通过拉模式每次获取数据只能获取一条。拉模式的时序图如下图所示。
在这里插入图片描述
  RabbitMQ每次接收到Get请求后会将队列中即将被消费的消息发送给消费者,消费者接收处理消息后向RabbitMQ发送消费应答,然后该消息将从队列中移除。
  需要注意的是拉模式普遍仅适用用从RabbitMQ中获取一条数据的场景,如果以循环的方式获取批量数据将影响RabbitMQ的性能。

拉模式的实现

  拉模式通过以下方法实现:

/**
* queue 队列名称
* autoAck 是否开启自动应答
*/
GetResponse basicGet(String queue,boolean autoAck)

  如上述代码所示channel.basicGet方法返回的是一个GetResponse,在GetResponse对象中包含了一条消息内容,消费者可以获取该消息并进行处理。

推模式

  推模式是指RabbitMQ将消息主动推送给订阅监听队列的消费者。在RabbitMQ推送消息的过程中其并不关心该消费者是否完成上一条消息的消费,只要队列中存在消息则向消费者推送,当然推送消息的个数会受Basic.Qos的限制。Basic.Qos指定了某个消费者可以保持的未应答的消息数量。

    /*** Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}* method.* Provide access to <code>basic.deliver(Broker推送消息)</code>, <code>basic.cancel</code>* and shutdown signal callbacks (which is sufficient* for most cases). See methods with a {@link Consumer} argument* to have access to all the application callbacks.* @param queue 队列名称* @param autoAck 是否自动确认* @param consumerTag 消费者标签,消费者的唯一标识符* @param noLocal 是否可以接收同Connection中生产者的消息(true不能接收)* @param exclusive 是否设置排他* @param arguments 其他参数* @param deliverCallback 消息接收回调* @param cancelCallback 消费取消回调* @param shutdownSignalCallback 连接或者信道关闭回调* @return the consumerTag associated with the new consumer*/String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

  可以通过上述两种方法(设置参数最多的)实现声明消费者。其中Consumer的定义如下:


public interface Consumer {/*** 消费者通过basicConsume被注册后调用*/void handleConsumeOk(String consumerTag);/*** 消费者通过basicCancel取消时调用*/void handleCancelOk(String consumerTag);/*** 消费者不通过basicCancel取消时调用*/void handleCancel(String consumerTag) throws IOException;/*** 通道或者连接关闭时调用*/void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);/*** 接收重新发送的未被确认的消息时调用*/void handleRecoverOk(String consumerTag);/*** 接收消息时调用* @param consumerTag 消费者标签* @param envelope 打包消息的数据* @param properties 消息的内容标头数据* @param body 消息内容*/void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException;
}

消费确认与拒绝

  为了保障消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck为true时RabbitMQ会自动的把发送出去的消息设置为确认,然后从队列中删除;当autoACK为false时RabbitMQ会等待消费者显式回复确认信号后才从内存中移去消息(先标记再删除)。
  autoAck参数意为自动应答,但是如果该参数为true时则rabbitMQ将自动将发送的消息标记确认,无需消费者进行应答。

  当autoAck参数为false时,对于RabbitMQ服务器而言,队列中的消息分成两部分:一部分时等待投递给消费者的消息;一部分时已经投递给消费者,但是还未收到消费者确认消息的消息
  RabbitMQ不会为未确认的消息设置过期时间,如果一个消息一直未被消费者确认,那么这个消息再RabbitMQ中将一直保存为投递未确认状态,指导消费者确认或者消费者断开连接,如果消费者断开连接,则该消费者接收但未确认的消息将重新入队。

消息确认的实现

  消息的显式确认需要消费者再声明的过程中设置autoAck=false。然后该消费者消费的消息可以显式的进行确认应答。确认应答方法如下:

	 /*** @param 消息的标签,可通过Delivery.getEnvelope().getDeliveryTag()获取* @param 如果为true则将发送给该消费者的该消息之前的所有未应答的消息进行应答,如果为false则仅应答一条消息*/void basicAck(long deliveryTag, boolean multiple) throws IOException;

  当进行消息的批量确认时,将所有发送给该消费者未确认的消息进行确认,而针对监听同一队列的其他消费者的未确认消息并不进行处理。

消息拒绝的实现

  RabbitMQ提供了两种消息拒绝的方法:Basic.Reject和Basic.Nack命令;其两者的区别时Nack可以进行批量拒绝。

    /*** @param deliveryTag 消息标签* @param requeue 为true时被拒绝的消息重新入队,否则将成为死信* @throws java.io.IOException if an error is encountered*/void basicReject(long deliveryTag, boolean requeue) throws IOException;/*** @param deliveryTag 消息标签* @param multiple 如果为true则批量拒绝自该消息之前所有未确认的发送给该消费者的消息* @param requeue 为true时被拒绝的消息重新入队,否则将成为死信* @throws java.io.IOException if an error is encountered*/void basicNack(long deliveryTag, boolean multiple, boolean requeue)throws IOException;

basicRecover

该方法可以将某个消费者未应答(确认或者拒绝)的消息重新入队,该方法会导致:

  • 投递而未被应答的消息可以重新发送给消费者进行处理
  • 消费者的消息队列被清空,可以重新接收到其他消息
    /*** <p>*  Ask the broker to resend unacknowledged messages.  In 0-8* basic.recover is asynchronous; in 0-9-1 it is synchronous, and* the new, deprecated method basic.recover_async is asynchronous.* </p>* Equivalent to calling <code>basicRecover(true)</code>, messages* will be requeued and possibly delivered to a different consumer.* @see #basicRecover(boolean)*/Basic.RecoverOk basicRecover() throws IOException;/*** Ask the broker to resend unacknowledged messages.  In 0-8* basic.recover is asynchronous; in 0-9-1 it is synchronous, and* the new, deprecated method basic.recover_async is asynchronous.* @param requeue If true, messages will be requeued and possibly* delivered to a different consumer. If false, messages will be* redelivered to the same consumer.*/Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

basicQos 限制消费

  默认情况下,消费者对于接收的消息数量并未限制,也就是说,一旦RabbitMQ中接收到消息并且存在消费者,则RabbitMQ将把消息发送到相关的消费者中,并不关心消费者是否消息完信息。
  轮询的默认消息分发机制会导致消费者资源不能合理利用、消费者消息积压导致内存溢出等问题。为解决上述问题可以使用basicQos方法实现限制信道上消费者所能保持的最大未确认消息数量。该方法如下:

    /*** @param prefetchSize 消息大小* @param prefetchCount 消息数量* @param global 是否全局* @throws java.io.IOException if an error is encountered*/void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

针对global参数需要注意一下内容:

  • 当global=true时信道上所有的消费者都需要遵从消息数量限定值(某个信道上所有消费者未确认消息数量<=prefetchCount)
  • 等global=false时新的消费者需要遵从消息数量的限定值。
  • 可以调用两次basicQos方法,并使用不同的global参数,这种情况下两次配置都可以生效。

总结

  消费者就是针对某个队列进行消息监听和消息消费的。消费者消费消息存在拉模式和推模式,推模式的是使用场景相对比较多。
  为确保消息被合法的消费,RabbitMQ提供了消费确认机制,投递的消息并不能被理解完成了消费,仅消费者确认消费该消息才会被移除队列。
  默认的消息投递机制时轮询,轮询的消息分发并会关系消费者的性能以及消息积压的问题,因此需要限制每个消费者所能保持的最大未确认的消息数量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/103332.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Delphi 中 interface 如何使用 (chatGPT回答)?

目录 1. 定义接口&#xff1a;可以使用interface关键字来定义一个接口。例如&#xff1a; 2. 实现接口&#xff1a;类可以实现一个或多个接口。要实现接口&#xff0c;需要在类声明中使用implements关键字&#xff0c;并提供对应接口的方法的实现。例如&#xff1a; 3. 使用…

科技资讯|苹果Apple Watch新专利,可根据服装、表带更换表盘颜色

根据美国商标和专利局&#xff08;USPTO&#xff09;公示的清单&#xff0c;苹果公司近日获得了一项 Apple Watch 相关的技术专利&#xff0c;最大的亮点在于配备颜色采样传感器&#xff0c;可以根据表带、服装自动变幻变盘颜色和主题。 Apple Watch 正面配备颜色采样传感器&am…

【LVS】2、部署LVS-DR群集

LVS-DR数据包的流向分析 1.客户端发送请求到负载均衡器&#xff0c;请求的数据报文到达内核空间&#xff1b; 2.负载均衡服务器和正式服务器在同一个网络中&#xff0c;数据通过二层数据链路层来传输&#xff1b; 3.内核空间判断数据包的目标IP是本机VIP&#xff0c;此时IP虚…

微服务中间件--分布式搜索ES

分布式搜索ES 11.分布式搜索 ESa.介绍ESb.IK分词器c.索引库操作 (类似于MYSQL的Table)d.查看、删除、修改 索引库e.文档操作 (类似MYSQL的数据)1) 添加文档2) 查看文档3) 删除文档4) 修改文档 f.RestClient操作索引库1) 创建索引库2) 删除索引库/判断索引库 g.RestClient操作文…

思维导图工具有哪些?如何挑选思维导图工具?10款好用工具推荐!

在我们的生活和工作中&#xff0c;思维导图是一种有效的工具&#xff0c;可以帮助我们更好地组织和理解信息&#xff0c;提高我们的学习和工作效率。当然&#xff0c;如何选择一款适合自己的思维导图工具也是一门学问。在这篇文章中&#xff0c;我将为大家介绍如何挑选一款适合…

UE4/5Niagara粒子特效之Niagara_Particles官方案例:1.1->1.4

目录 1.1-Simple Sprite Emitter ​编辑 发射器更新 粒子生成 粒子更新 1.2-Simple Sprite Emitter 发射器更新 粒子生成 粒子更新 渲染 1.3-Simple GPU Emitter 属性 发射器更新 粒子生成 粒子更新 1.4-Sprite Facing 发射器更新 粒子生成 粒子更新 通过对官方…

lwIP更新记10:IP 冲突检测

lwip-2.2.0-rc1 版本于 2023 年 6 月 29 日发布&#xff0c;带来了我期盼已久的 IPv4 冲突检测 功能。 lwip-2.2.0-rc1 版本重新回归了 master 分支&#xff08;主分支&#xff09;&#xff0c;不再使用单独的稳定分支。 master 分支 是一个 Git&#xff08;版本控制程序&…

el-table动态合并单元格

el-table使用这个方法合并单元格&#xff0c;:span-method“hbcell” <el-table size"small" :data"table.data" border empty-text"暂无数据" :cell-style"cellStyle" :header-cell-style"tableHeaderColor":span-meth…

Haproxy原理及部署

一、Haproxy简介 1、Haproxy应用分析 LVS在企业中康复在能力很强&#xff0c;但存在不足&#xff1a; LVS不支持正则处理&#xff0c;不能实现动静分离对于大型网站LVS的事实配置较为复杂&#xff0c;维护成本相对较高 Haproxy是一款可以供高可用性、负载均衡和基于TCP和HT…

OpenCV + CLion在windows环境下使用CMake编译, 出现Mutex相关的错误的解决办法

最近在windows下面用cmake编译OpenCV的项目代码,但是一直碰到找不到mutex的问题&#xff0c;百思不得其解, Executing task: g -g -o bin/debug.exe src/main.cppC:\MinGW\lib\opencv\build\include/opencv2/core/utility.hpp:697:14: error: recursive_mutex in namespace st…

ssl卸载原理

SSL卸载&#xff0c;也称为SSL解密&#xff0c;是一种将SSL加密数据流卸成非加密的明文数据流的过程。SSL卸载通常在负载均衡器、代理服务器、WAF等设备中实现&#xff0c;可以提高传输效率和安全性。 SSL卸载的原理是将SSL数据流拦截下来&#xff0c;通过设备内置的证书进行解…

图为科技-边缘计算在智慧医疗领域的作用

边缘计算在智慧医疗领域的作用 随着科技的进步&#xff0c;智慧医疗已成为医疗行业的重要发展趋势。边缘计算作为新兴技术&#xff0c;在智慧医疗领域发挥着越来越重要的作用。本文将介绍边缘计算在智慧医疗领域的应用及其优势&#xff0c;并探讨未来发展方向。 一、边缘计算…

androidstudio Please specify a signing configuration for this variant (release)

当直接运行release版本时&#xff0c;报错Error: The apk for your currently selected variant cannot be signed. Please specify a signing configuration for this variant (package64-release). 解决报错&#xff1a;添加签名&#xff0c;signingConfigs 写在buildTypes前…

知道吗?微软将Python集成到Excel中,国产软件“抄作业”了

Excel集成Python 众所周知哦&#xff0c;VBA是一种基于微软的Visual Basic语言的宏编程语言&#xff0c;专为在Office应用程序中执行自动化任务而设计。 VBA适用于Excel、Word、PowerPoint等Office套件中的宏编程&#xff0c;可直接操作和控制Office应用程序的对象模型。 我们…

appium2.0+ 单点触控和多点触控新的解决方案

在 appium2.0 之前&#xff0c;在移动端设备上的触屏操作&#xff0c;单手指触屏和多手指触屏分别是由 TouchAction 类&#xff0c;Multiaction 类实现的。 在 appium2.0 之后&#xff0c;这 2 个方法将会被舍弃。 "[Deprecated] TouchAction action is deprecated. Ple…

Docker 搭建 LNMP + Wordpress(详细步骤)

目录 一、项目模拟 1. 项目环境 2. 服务器环境 3.任务需求 二、Linux 系统基础镜像 三、Nginx 1. 建立工作目录 2. 编写 Dockerfile 脚本 3. 准备 nginx.conf 配置文件 4. 生成镜像 5. 创建自定义网络 6. 启动镜像容器 7. 验证 nginx 四、Mysql 1.…

无脑入门pytorch系列(五)—— nn.Dropout

本系列教程适用于没有任何pytorch的同学&#xff08;简单的python语法还是要的&#xff09;&#xff0c;从代码的表层出发挖掘代码的深层含义&#xff0c;理解具体的意思和内涵。pytorch的很多函数看着非常简单&#xff0c;但是其中包含了很多内容&#xff0c;不了解其中的意思…

PySide6学习笔记--gui小模版使用

一、界面绘制 1.desiner画图 2.画图代码 # -*- coding: utf-8 -*-################################################################################ ## Form generated from reading UI file t1gui.ui ## ## Created by: Qt User Interface Compiler version 6.5.2 ## ##…

备份服务器搭建

备份服务器搭建 1、背景2、作用3、选型4、环境5、部署5.1、服务端部署5.1.1、安装5.1.2、配置 5.2、客户端部署5.3、备份策略5.3.1、定时备份策略5.3.2、文件变动备份 6、参考 1、背景 随着项目的推进&#xff0c;备份服务器被提上了工作日程&#xff0c;等保、密评和接入测评…

优化指南:带宽限制的可行策略

大家好&#xff01;作为一名专业的爬虫程序员&#xff0c;我们经常面临的一个挑战就是带宽限制。尤其是在需要快速采集大量数据时&#xff0c;带宽限制成为了我们提升爬虫速度的一大阻碍。今天&#xff0c;我将和大家分享一些解决带宽限制的可行策略&#xff0c;希望能帮助大家…