文章目录
- 一、起因
- 二、代码
- 1. 定义exchange和queue
- 2. RabbitTemplate
- 3. EnhancedCorrelationData
- 4. 发送消息
环境如下
Version | |
---|---|
SpringBoot | 3.2.1 |
spring-amqp | 3.1.1 |
RabbitMq | 3-management |
一、起因
老版本的spring-amqp
在CorrelationData
上设置ConfirmCallback
。但是今天却突然发现correlationData.getFuture()
没有addCallback
函数了。
查询文档和帖子后,发现ConfirmCallback
和ReturnsCallback
都需要在RabbitTemplate
中设置,同时ConfirmCallback
中默认无法得到消息内容,如果想在ConfirmCallback
中把消息内容存到数据库等地方进行记录,怎么办呢?
参考手册
- Spring AMQP 3.1.1
- Spring AMQP 3.1.1 API
二、代码
1. 定义exchange和queue
@Slf4j
@Configuration
public class PayNotifyConfig{//交换机public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";//支付通知队列public static final String PAYNOTIFY_QUEUE = "paynotify_queue";//支付结果通知消息类型public static final String MESSAGE_TYPE = "payresult_notify";//声明交换机,且持久化@Bean(PAYNOTIFY_EXCHANGE_FANOUT)public FanoutExchange paynotify_exchange_fanout() {// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);}//支付通知队列,且持久化@Bean(PAYNOTIFY_QUEUE)public Queue paynotify_queue() {return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();}//交换机和支付通知队列绑定@Beanpublic Binding binding_paynotify_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}
}
2. RabbitTemplate
在上面的类中继续添加RabbitTemplate
,并设置ConfirmCallback
和ReturnsCallback
。
@Beanpublic RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置confirm callbackrabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {String body = "1";if (correlationData instanceof EnhancedCorrelationData) {body = ((EnhancedCorrelationData) correlationData).getBody();}if (ack) {//消息投递到exchangelog.debug("消息发送到exchange成功:correlationData={},message_id={} ", correlationData, body);System.out.println("消息发送到exchange成功:correlationData={},message_id={}"+correlationData+body);} else {log.debug("消息发送到exchange失败:cause={},message_id={}",cause, body);System.out.println("消息发送到exchange失败:cause={},message_id={}"+cause+body);}});//设置return callbackrabbitTemplate.setReturnsCallback(returned -> {Message message = returned.getMessage();int replyCode = returned.getReplyCode();String replyText = returned.getReplyText();String exchange = returned.getExchange();String routingKey = returned.getRoutingKey();// 投递失败,记录日志log.error("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());});return rabbitTemplate;}
3. EnhancedCorrelationData
原始的CorrelationData
,目前已经无法从中获取消息内容,也就是说现在的ConfirmCallback
无法获取到消息的内容,因为设计上只关注是否投递到exchange
成功。如果需要在ConfirmCallback
中获取消息的内容,需要扩展这个类,并在发消息的时候,放入自定义数据。
public class EnhancedCorrelationData extends CorrelationData {private final String body;public EnhancedCorrelationData(String id, String body) {super(id);this.body = body;}public String getBody() {return body;}
}
4. 发送消息
在EnhancedCorrelationData
把消息本身放进去,或者如果你有表记录消息,你可以只放入其id
。这样触发ConfirmCallback的时候,就可以获取消息内容。
public void notifyPayResult() {String message = "TEST Message";Message message1 = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();CorrelationData correlationData = new EnhancedCorrelationData(UUID.randomUUID().toString(), message.toString());rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT,"", message1, correlationData);}