前面提到当消息发送给交换机,交换机出故障,或者队列出现故障,会反馈给生产者。
如果交换机备份,将无法投递的消息发送给备份交换机,再由备份交换机给备份队列和告警队列的思路,来防止消息不丢失。
小提示
之前如果创建了confirm.exhange这个交换机,需要删除后重新创建。
代码
配置文件
spring:rabbitmq:host: 192.168.171.128username: adminpassword: 123port: 5672publisher-confirm-type: correlatedpublisher-returns: true
消费者
确认队列的消费者
package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import com.xkj.org.config.BarkExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ConfirmQueueConsumer {@RabbitListener(queues = BarkExchangeConfig.CONFIRM_QUEUE)public void receiveMsg(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.info("收到队列的消息:{}", msg);}
}
告警队列的消费者
package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import com.xkj.org.config.BarkExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class WarningQueueConsumer {@RabbitListener(queues = BarkExchangeConfig.WARNING_QUEUE)public void receiveWarningMsg(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.error("报警发现不可路由消息:{}", msg);}
}
生产者
@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange", "key1"+"123", message, correlation);log.info("发送消息内容{}", message);}
说明:这里故意把routingkey写错,让消息发送失败。
配置类
package com.xkj.org.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class BarkExchangeConfig {//确认交换机public static final String CONFIRM_EXCHANGE = "confirm.exchange";//备份交换机public static final String BACKUP_EXCHANGE = "backup.exchange";//确认队列public static final String CONFIRM_QUEUE = "confirm.queue";//备份队列public static final String BACKUP_QUEUE = "backup.queue";//告警队列public static final String WARNING_QUEUE = "warning.queue";@Beanpublic DirectExchange confirmExchange() {//将确认交换机与备份交换机连接起来Map<String, Object> arguments = new HashMap<>();arguments.put("alternate-exchange", BACKUP_EXCHANGE);return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArguments(arguments).build();}@Beanpublic FanoutExchange backupExchange() {return new FanoutExchange(BACKUP_EXCHANGE);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE).build();}@Beanpublic Queue backupQueue() {return QueueBuilder.durable(BACKUP_QUEUE).build();}@Beanpublic Queue warningQueue() {return QueueBuilder.durable(WARNING_QUEUE).build();}@Beanpublic Binding bindingConfirmQueueToExchange(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("key1");}@Beanpublic Binding bindingBackupQueueToExchange(@Qualifier("backupQueue") Queue backupQueue,@Qualifier("backupExchange") FanoutExchange backupExchange) {//删除交换机与队列绑定是不需要routingkey的return BindingBuilder.bind(backupQueue).to(backupExchange);}@Beanpublic Binding bindingWarningQueueToExchange(@Qualifier("warningQueue") Queue warningQueue,@Qualifier("backupExchange") FanoutExchange backupExchange) {return BindingBuilder.bind(warningQueue).to(backupExchange);}}
回调函数
package com.xkj.org.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;/*** 回调接口*/
@Slf4j
@Component // 1.第一步实例化MyCallback这个bean
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowired // 2.第二步将rabbitTemplate实例依赖注入进来private RabbitTemplate rabbitTemplate;@PostConstructpublic void init() { //3.第三步执行此方法//将本类对象(ConfirmCallback的实现类对象)注入到RabbitTemplate中rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* @param correlationData* @param ack* @param cause* 1.发消息 交换机收到 调用* 1.1 correlationData 回调消息的id及相关信息* 1.2 交换机收到消息 ack = true* 1.3 cause null* 2.发消息 交换机接收失败 回调* 2.1 correlationData 回调消息的id及相关信息* 2.2 交换机收到消息 ack = false* 2.3 cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId(): "";if(ack) {log.info("交换机已经接收到id为:{}的消息", id);}else {log.info("交换机还未收到id为:{}的消息,原因:{}", id, cause);}}/*** 可以在当消息传递过程中不可达目的时将消息返回给生产者* 注意此方法是消息传递失败才会调用,成功就不会执行* @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {try {log.error("消息:{},被交换机:{},给退回了,原因:{},RoutingKey={}",new String(message.getBody(), "UTF-8"),exchange,replyText,routingKey);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}
结论
这里mandatory参数与备份交换机一起使用的时候,如果两者同时开启,备份交换机的优先级更高。因为报警队列接收到没有路由成功的消息,但是没有输出消息被退回的错误。