rabbitmq的消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消
息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续
发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费者在接
收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

消息应答的方法 

Channel.basicAck(用于肯定确认)

RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

Channel.basicNack(用于否定确认)

Channel.basicReject(用于否定确认)

与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了

自动应答 

消息发送后立即被认为已经传送成功,这种模式需要在 高吞吐量和数据传输安全性方面做权
,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢
失了,当然另一方面这种模式消费者那边可以传递过载的消息, 没有对传递的消息数量进行限制
当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终
使得内存耗尽,最终这些消费者线程被操作系统杀死, 所以这种模式仅适用在消费者可以高效并
以某种速率能够处理这些消息的情况下使用。默认消息采用的是自动应答.

手动应答

手动应答的好处是可以批量应答并且减少网络拥堵multiple 的 true 和 false 代表不同意思

true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时
5-8 的这些还未应答的消息都会被确认收到消息应答
false 同上面相比
只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

消费者1 

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1 等待接收消息处理时间较短");DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);try {// 睡眠1秒Thread.sleep(1000*1);} catch (InterruptedException e) {throw new RuntimeException(e);}// 消息标记tag,2.false代表只应答接收到的那个传递的消息,true为应答所有消息包括传递过来的消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费......");boolean autoAck=false;channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

 消费者2


import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker02 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2 等待接收消息处理时间较长");DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);try {// 睡眠30秒Thread.sleep(1000*30);} catch (InterruptedException e) {throw new RuntimeException(e);}// 消息标记tag,2.false代表只应答接收到的那个传递的消息,true为应答所有消息包括传递过来的消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C2 消费者启动等待消费......");boolean autoAck=false;channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

生产者 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world!!!!";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/for (int i = 0; i < 10; i++) {channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}System.out.println("消息发送完毕");}}
}

结果

在发送者发送消息 ,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是
由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了,
此时会看到消息被 C1 接收到了,说明消息  被重新入队,然后分配给能处理消息的 C1 处理了 

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息
未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者
可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确
保不会丢失任何消息。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/95449.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何切换goland之中的版本号(升级go 到1.20)

go 安装/版本切换_go 切换版本_云满笔记的博客-CSDN博客 用brew就行&#xff1a; echo export PATH"/opt/homebrew/opt/go1.20/bin:$PATH" >> ~/.zshrc

HTML详解连载(7)

HTML详解连载&#xff08;7&#xff09; 专栏链接 [link](http://t.csdn.cn/xF0H3)下面进行专栏介绍 开始喽结构伪类选择器作用 :nth-child&#xff08;公式&#xff09;作用举例 伪元素选择器作用注意&#xff1a; PxCoook作用盒子模型-重要组成部分 盒子模型-边框线属性名属性…

Matlab中图例的位置(图例放在图的上方、下方、左方、右方、图外面)等

一、图例默认位置 默认的位置在NorthEast r 10; a 0; b 0; t0:0.1:2.1*pi; xar*cos(t); ybr*sin(t); A1plot(x,y,r,linewidth,4);%圆 hold on axis equal A2plot([0 0],[1 10],b,linewidth,4);%直线 legend([A1,A2],圆形,line)二、通过Location对legend的位置进行改变 变…

sykwalking8.2和mysql5.7快速部署

1.SkyWalking 是什么&#xff1f; 分布式系统的应用程序性能监视工具&#xff0c;专为微服务、云原生架构和基于容器&#xff08;Docker、K8s、Mesos&#xff09;架构而设计。 提供分布式追踪、服务网格遥测分析、度量聚合和可视化一体化解决方案。 2.SkyWalking 有哪些功能…

K8S之存储卷

K8S之存储卷 一、emptyDir emptyDir&#xff1a;可实现Pod中的容器之间共享目录数据&#xff0c;但emptyDir存储卷没有持久化数据的能力&#xff0c;存储卷会随着Pod生命周期结束而一起删除二、hostPath hostPath&#xff1a;将Node节点上的目录/文件挂载到Pod容器的指定目录…

LVS-DR模式下(RS检测)ldirectord工具实现部分节点掉点后将请求发往正常设备进行处理

基于前文的LVS-DR集群构建环境 一.下载ldirectord软件 二.将模板文件中的LVS-DR模式相关文件拷贝到/etc/ha.d主配置目录并按实际设备修改 三.配置两台RS匹配规则 四.停止RS1的http服务进行测试 RS1失去工作能力&#xff0c;RS2接替RS1 基于前文的LVS-DR集群构建环境 一.下…

VITS2来袭~

论文&#xff1a;VITS2: Improving Quality and Efficiency of Single-Stage Text-to-Speech with Adversarial Learning and Architecture Design 演示&#xff1a;https://vits-2.github.io/demo/ 论文&#xff1a;https://arxiv.org/abs/2307.16430 目前仍然存在的问题: int…

使用GUI Guider工具开发嵌入式GUI应用(6)-切换多screen换场景

使用GUI Guider工具开发嵌入式GUI应用&#xff08;6&#xff09;-切换多screen换场景 本节将展示使用GUI Guider实现切换显示页面功能。 这里设计的用例是&#xff1a; 创建3张页面&#xff0c;screen_0,screen_1和screen_2。分别在每个页面上中放置一个Label&#xff08;最…

C++ STL容器适配器(详解)

STL容器适配器 什么是适配器&#xff0c;C STL容器适配器详解 在详解什么是容器适配器之前&#xff0c;初学者首先要理解适配器的含义。 其实&#xff0c;容器适配器中的“适配器”&#xff0c;和生活中常见的电源适配器中“适配器”的含义非常接近。我们知道&#xff0c;无…

后端项目打包上传服务器记录

后端项目打包上传服务器记录 文章目录 后端项目打包上传服务器记录1、项目打包2、jar包上传服务器 本文记录打包一个后端项目&#xff0c;上传公司服务器的过程。 1、项目打包 通过IDEA的插件进行打包&#xff1a; 打成一个jar包&#xff0c;jar包的位置在控制台可以看到。 2、…

简单介绍C++中的模板

目录 一、泛型编程 泛型编程的概念: 泛型编程举例: 二、函数模板 函数模板的概念&#xff1a; 函数模板的格式&#xff1a; 函数模板的实例化: 隐式实例化&#xff1a; 显式实例化&#xff1a; 模板参数的匹配原则: 三、类模板 类模板的格式定义&#xff1a; 类模…

【学会动态规划】最长湍流子数组(23)

目录 动态规划怎么学&#xff1f; 1. 题目解析 2. 算法原理 1. 状态表示 2. 状态转移方程 3. 初始化 4. 填表顺序 5. 返回值 3. 代码编写 写在最后&#xff1a; 动态规划怎么学&#xff1f; 学习一个算法没有捷径&#xff0c;更何况是学习动态规划&#xff0c; 跟我…

LeetCode 141.环形链表

文章目录 &#x1f4a1;题目分析&#x1f4a1;解题思路&#x1f514;接口源码&#x1f4a1;深度思考❓思考1❓思考2 题目链接&#x1f449; LeetCode 141.环形链表&#x1f448; &#x1f4a1;题目分析 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中…

怎么系统的学习机器学习、深度学习?当然是看书了

目录 前言 内容简介 学完本书&#xff0c;你将能够 作者简介 本书目录 京东自购链接 前言 近年来&#xff0c;机器学习方法凭借其理解海量数据和自主决策的能力&#xff0c;已在医疗保健、 机器人、生物学、物理学、大众消费和互联网服务等行业得到了广泛的应用。自从Ale…

react实现模拟弹框遮罩的自定义hook

需求描述 点击按钮用于检测鼠标是否命中按钮 代码实现 import React from react; import {useState, useEffect, useRef} from react;// 封装一个hook用来检测当前点击事件是否在某个元素之外 function useClickOutSide(ref,cb) {useEffect(()>{const handleClickOutside…

《论文阅读14》FAST-LIO

一、论文 研究领域&#xff1a;激光雷达惯性测距框架论文&#xff1a;FAST-LIO: A Fast, Robust LiDAR-inertial Odometry Package by Tightly-Coupled Iterated Kalman Filter IEEE Robotics and Automation Letters, 2021 香港大学火星实验室 论文链接论文github 二、论文概…

实践-传统深度学习

简介与安装 2 训练自己的数据集整体流程3 数据加载与预处理4 搭建网络模型5 学习率对结果的影响6 Drop-out操作7 权重初始化方法对比8 初始化标准差对结果的影响9 正则化对结果的影响10 加载模型进行测试 TensorFlow&#xff1a;每一步都需要自己做。 Keras&#xff1a;做起来更…

Electron基础篇

人生有些事,错过一时,就错过一世。 官网&#xff1a;简介 | Electron Electron-大多用来写桌面端软件 Electron介绍 Electront的核心组成是Chromium、Node.js以及内置的Native API&#xff0c;其中Chromium为Electron提供强大的UI能力&#xff0c;可以在不考虑兼容的情况下利…

C# 读取pcd点云文件数据

pcd文件有ascii 和二进制格式&#xff0c;ascii可以直接记事本打开&#xff0c;C#可以一行行读。但二进制格式的打开是乱码&#xff0c;如果尝试程序中读取&#xff0c;对比下看了数据也对不上。 这里可以使用pcl里的函数来读取pcd&#xff0c;无论二进制或ascii都可以正确读取…

【探索Linux】—— 强大的命令行工具 P.5(yum工具、git 命令行提交代码)

阅读导航 前言一、软件包管理器 yum1.yum的概念yum的基本指令使用例子 二、git 命令行提交代码总结温馨提示 前言 前面我们讲了C语言的基础知识&#xff0c;也了解了一些数据结构&#xff0c;并且讲了有关C的一些知识&#xff0c;也学习了一些Linux的基本操作&#xff0c;也了…