RabbitMQ开启消息发送确认和消费手动确认

开启RabbitMQ的生产者发送消息到RabbitMQ服务端的接收确认(ACK)和消费者通过手动确认或者丢弃消费的消息。
通过配置 publisher-confirm-type: correlatedpublisher-returns: true开启生产者确认消息。

server:port: 8014spring:rabbitmq:username: adminpassword: 123456dynamic: true
#    port: 5672
#    host: 192.168.49.9addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672publisher-confirm-type: correlatedpublisher-returns: trueapplication:name: shushandatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://ip/shushanusername: rootpassword: hikari:minimum-idle: 10maximum-pool-size: 20idle-timeout: 50000max-lifetime: 540000connection-test-query: select 1connection-timeout: 600000

RabbitConfig :

package com.kexuexiong.shushan.common.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {log.info("confirmCallback  data: " + correlationData);log.info("confirmCallback ack :" + ack);log.info("confirmCallback cause :" + cause);});rabbitTemplate.setReturnsCallback(returned -> log.info("returnsCallback msg : " + returned));return rabbitTemplate;}
}

AckReceiver 手动确认消费者:

package com.kexuexiong.shushan.common.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;@Slf4j
@Component
public class AckReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();byte[] messageBody = message.getBody();try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody));) {Map<String, String> msg = (Map<String, String>) inputStream.readObject();log.info(message.getMessageProperties().getConsumerQueue()+"-ack Receiver :" + msg);log.info("header msg :"+message.getMessageProperties().getHeaders());if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){channel.basicNack(deliveryTag,false,false);}else if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.DEAD_LETTER_QUEUE)){channel.basicAck(deliveryTag, true);}else {channel.basicAck(deliveryTag, true);}} catch (Exception e) {channel.basicReject(deliveryTag, false);log.error(e.getMessage());}}
}

通过配置 simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE)可以监听多个消息队列。

package com.kexuexiong.shushan.common.mq;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageListenerConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate AckReceiver ackReceiver;@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);simpleMessageListenerContainer.setConcurrentConsumers(2);simpleMessageListenerContainer.setMaxConcurrentConsumers(2);simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);//,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPICsimpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);simpleMessageListenerContainer.setMessageListener(ackReceiver);return simpleMessageListenerContainer;}}
package com.kexuexiong.shushan.controller.mq;import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import com.kexuexiong.shushan.common.mq.MqConstant;
import com.kexuexiong.shushan.controller.BaseController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@Slf4j
@RestController
@RequestMapping("/mq/")
public class MqController extends BaseController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/callback/sendDirectMessage")public String sendDirectMessageCallback(){String msgId = UUID.randomUUID().toString();String msg = "demo msg ,kexuexiong";String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");Map<String,Object> map = new HashMap();map.put("msgId",msgId);map.put("msg",msg);map.put("createTime",createTime);rabbitTemplate.convertAndSend("noneDirectExchange","demoDirectRouting",map);return "ok";}@GetMapping("/callback/lonelyDirectExchange")public String lonelyDirectExchange(){String msgId = UUID.randomUUID().toString();String msg = "demo msg ,kexuexiong";String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");Map<String,Object> map = new HashMap();map.put("msgId",msgId);map.put("msg",msg);map.put("createTime",createTime);rabbitTemplate.convertAndSend(MqConstant.lonelyDirectExchange,"demoDirectRouting",map);return "ok";}
}

测试:

发送dirct消息 找不到交换机情况
在这里插入图片描述

2023-10-10T17:04:58.492+08:00 ERROR 27232 --- [.168.49.10:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :false
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)

ack 为false。

发送dirct消息 找不到队列
在这里插入图片描述

2023-10-10T17:05:55.851+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :true
2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :null
2023-10-10T17:05:55.865+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : returnsCallback msg : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=lonelyDirectExchange, routingKey=demoDirectRouting]

ACK为true,replyText=NO_ROUTE。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/154166.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Reactor网络模式

文章目录 1. 关于Reactor模式的了解2. 基于Reactor模式实现epoll ET服务器2.1 EventItem类的实现2.2 Reactor类的实现Dispatcher函数AddEvent函数DelEvent函数EnableReadWrite函数 2.3 四个回调函数的实现acceptor回调函数recver回调函数sender回调函数errorer回调函数 3. epol…

mac使⽤nginx

⽅法1&#xff1a;homebrew 默认本地已经安装homebrew&#xff1b; 安装与启动 brew install nginx 安装nginx&#xff1b; brew services start nginx 启动nginx nginx⽂件⽬录 1. nginx安装⽂件⽬录/usr/local/Cellar/nginx 2. nginx配置⽂件⽬录/usr/local/etc/nginx 3. con…

【办公-excel】两个时间相减 (二) - 带毫秒的时间进行相减操作

一、使用内部函数 1.1 效果展示 TEXT(((RIGHT(TEXT(B2,"yyyy-mm-dd hh:mm:ss.000"),LEN(TEXT(B2,"yyyy-mm-dd hh:mm:ss.000"))-FIND(".",TEXT(B2,"yyyy-mm-dd hh:mm:ss.000")))-RIGHT(TEXT(A2,"yyyy-mm-dd hh:mm:ss.000"),…

微信支付v2

文档&#xff1a; https://pay.weixin.qq.com/wiki/doc/api/index.html 微信小程序&#xff1a;https://pay.weixin.qq.com/wiki/doc/api/jsapi.php?chapter11_1 需要一个微信认证后的小程序&#xff0c;&#xff0c;还需要一个&#xff0c;在微信商户平台&#xff0c;&…

jdbc(DriverManager+Connection+Statement+ResultSet)+SQL注入+开启预编译+数据连接池

1 JDBC概念 JDBC 就是使用Java连接并操作数据库的一套API 全称&#xff1a;( Java DataBase Connectivity ) Java 数据库连接 2 JDBC优势 可随时替换底层数据库&#xff0c;访问数据库的Java代码基本不变 以后编写操作数据库的代码只需要面向JDBC&#xff08;接口&#xf…

C++三大特性——继承(上篇)

文章目录 目录 一、继承的概念及定义 1.1继承的概念 1.2 继承定义 1.2.1定义格式 1.2.2继承关系和访问限定符 1.2.3继承基类成员访问方式的变化 二、基类和派生类对象赋值转换 三、继承中的作用域 四、派生类的默认成员函数 一、继承的概念及定义 1.1继承的概念 继承(inherita…

最新AI智能创作系统源码AI绘画系统/支持GPT联网提问/支持Prompt应用

AI绘图专业设计 不得将程序用作任何违法违纪内容&#xff0c;不要让亲人两行泪 界面部分图解构&#xff1a; 前台show&#xff1a; 前端部署&#xff1a; 安装pm2管理器 点击设置 选择v16.19.1版本-切换版本 再新建一个网站 点击设置 添加反向代理-代理名称随便…

[MongoDB]-权限验证管理

[MongoDB]-权限验证管理 senge | 2023年9月 背景说明&#xff1a;现有两套MongoDB副本集群给开发人员使用时未开启认证。 产生影响&#xff1a;用户若输入账号以及密码则会进行校验&#xff0c;但用户可以在不输入用户名和密码的情况下也可直接登录。 倘若黑客借此进行攻击勒索…

ElasticSearch 学习7 集成ik分词器

网上找了一大堆&#xff0c;很多都介绍的不详细&#xff0c;开始安装完一直报错找不到plugin-descriptor.properties&#xff0c;有些懵这个东西不应该带在里面吗&#xff0c;参考了一篇博客说新建一个这个&#xff0c;新建完可以启动&#xff0c;但是插入索引数据会报错找不到…

Mini-dashboard 和meilisearch配合使用

下载的meilisearch一般是development模式&#xff0c;内置客户端&#xff0c;修改客户端后需要重要全部编译&#xff0c;花时间太长了。前后端分离才是正道&#xff0c;客户端修改不用重新编译后端。 方法如下&#xff1a; 1、修改配置文件/etc/meilisearch.toml&#xff0c;…

FPGA实现电机霍尔编码器模块

一. 简介 想要知道直流电机的转速&#xff0c;就需要用到编码器&#xff0c;常用的编码器有霍尔和光电两种&#xff0c;但是光电编码器比较贵(性能好于霍尔)&#xff0c;所以平常的时候使用最多的是霍尔编码器了。 霍尔编码器一般有AB两相信号输出&#xff0c;默认的时候为低…

基于springboot实现家具销售电商平台管理系统项目【项目源码+论文说明】

基于springboot实现家具销售电商平台管理系统演示 摘要 社会的发展和科学技术的进步&#xff0c;互联网技术越来越受欢迎。网络计算机的交易方式逐渐受到广大人民群众的喜爱&#xff0c;也逐渐进入了每个用户的使用。互联网具有便利性&#xff0c;速度快&#xff0c;效率高&am…

ssti 前置学习

python venv环境 可以把它想象成一个容器&#xff0c;该容器供你用来存放你的Python脚本以及安装各种Python第三方模块&#xff0c;容器里的环境和本机是完全分开的 创建venv环境安装flask #apt install python3.10-venv #cd /opt #python3 -m venv flask1 #cd /opt 选…

信息增益,经验熵和经验条件熵——决策树

目录 1.经验熵 2.经验条件熵 3.信息增益 4.增益比率 5.例子1 6.例子2 在决策树模型中&#xff0c;我们会考虑应该选择哪一个特征作为根节点最好&#xff0c;这里就用到了信息增益 通俗上讲&#xff0c;信息增益就是在做出判断时&#xff0c;该信息对你影响程度的大小。比…

SpringCloud源码探析(十)-Web消息推送

1.概述 消息推送在日常使用中的场景比较多&#xff0c;比如有人点赞了我的博客或者关注了我&#xff0c;这时我就会收到一条推送消息&#xff0c;以此来吸引我点击或者打开应用。消息推送的方式主要分为两种&#xff1a;web消息推送和移动端消息推送。它将所要发送的信息&…

剑指offer——JZ84 二叉树中和为某一值的路径(三) 解题思路与具体代码【C++】

一、题目描述与要求 二叉树中和为某一值的路径(三)_牛客题霸_牛客网 (nowcoder.com) 题目描述 给定一个二叉树root和一个整数值 sum &#xff0c;求该树有多少路径的的节点值之和等于 sum 。 1.该题路径定义不需要从根节点开始&#xff0c;也不需要在叶子节点结束&#xff…

【已解决】msvcp140.dll丢失怎样修复?msvcp140.dll重新安装的解决方法

今天我要和大家分享的是关于msvcp140.dll丢失的五种不同解决方法。我们知道&#xff0c;在运行一些软件或游戏的时候&#xff0c;经常会遇到“msvcp140.dll丢失”的问题&#xff0c;这可能会影响到我们的使用体验。那么&#xff0c;面对这个问题&#xff0c;我们应该如何应对呢…

【2023研电赛】安谋科技企业命题特别奖:面向独居老人的智能居家监护系统

本文为2023年第十八届中国研究生电子设计竞赛安谋科技企业命题特别奖分享&#xff0c;参加极术社区的【有奖活动】分享2023研电赛作品扩大影响力&#xff0c;更有丰富电子礼品等你来领&#xff01;&#xff0c;分享2023研电赛作品扩大影响力&#xff0c;更有丰富电子礼品等你来…

Http请求响应 Ajax 过滤器

10/10/2023 近期总结&#xff1a; 最近学的后端部署&#xff0c;web服务器运行&#xff0c;各种请求响应&#xff0c;内容很多&#xff0c;学的很乱&#xff0c;还是需要好好整理&#xff0c;前面JavaSE内容还没有完全掌握&#xff0c;再加上一边刷题&#xff0c;感觉压力很大哈…

JavaScript Web APIs第五天笔记

Web APIs - 第5天笔记 目标&#xff1a; 能够利用JS操作浏览器,具备利用本地存储实现学生就业表的能力 BOM操作综合案例 js组成 JavaScript的组成 ECMAScript: 规定了js基础语法核心知识。比如&#xff1a;变量、分支语句、循环语句、对象等等 Web APIs : DOM 文档对象模型&…