RabbitMQ进阶篇

文章目录

  • 发送者的可靠性
    • 生产者重试机制
    • 实现生产者确认
  • MQ的可靠性
    • 数据持久化
      • 交换机持久化
      • 队列持久化
      • 消息持久化
    • Lazy Queue(可配置~)
      • 控制台配置Lazy模式
      • 代码配置Lazy模式
      • 更新已有队列为lazy模式
  • 消费者的可靠性
    • 消费者确认机制
    • 失败重试机制
    • 失败处理策略
  • 业务幂等性
    • 唯一消息ID
    • 业务判断
    • 兜底方案
  • 延迟消息
    • 死信交换机(不推荐使用)
    • 延迟消息
    • DelayExchange插件(推荐)
      • 插件下载地址:
      • 安装:
      • 声明延迟交换机
      • 发送延迟消息
    • 超时订单问题
      • 定义常量
      • 抽取共享mq配置
      • 改造下单业务
      • 编写查询支付状态接口
      • 消息监听

发送者的可靠性

保证一个消息发送出去,至少被消费一次。
image.png
可能在多个步骤中给消息弄丢了

生产者重试机制

不建议使用, 会增加网络和资源的消耗

第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断
当RabbitTemplate与MQ连接超时后,多次重试
修改publisher模块的application.yaml文件,添加下面的内容:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

实现生产者确认

image.png

  • ConfirmCallback为发送Exchange(交换器)时回调,成功或者失败都会触发;
  • ReturnCallback为路由不到队列时触发,成功则不触发;

Rabbitmq之ConfirmCallback与ReturnCallback使用_rabbitmq returncallback-CSDN博客
在publisher模块的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

定义ReturnCallback:
image.png

package com.itheima.publisher.config;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

image.png
image.png
定义ConfirmCallback:
由于每个消息发送时的处理逻辑不一定相同,因此**ConfirmCallback需要在每次发消息时定义。**具体来说,是在调用RabbitTemplate中的convertAndSend方法 时,多传递一个参数:

image.png

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

image.png

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback:

@Test
void testPublisherConfirm() {// 1.创建CorrelationData,需要一个UUID,回调的时候通过id识别CorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

image.png

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了

image.png

MQ的可靠性

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化
image.png

  • 交换机持久化
  • 队列持久化
  • 消息持久化

数据持久化

交换机持久化

image.png

设置为Durable就是持久化模式,Transient就是临时模式。

队列持久化

image.png

消息持久化

image.png
image.png

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

当内存占满, page out会影响MQ阻塞

Lazy Queue(可配置~)

image.png

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。官方推荐所有队列都为LazyQueue模式。

控制台配置Lazy模式

image.png

代码配置Lazy模式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}

当然,我们也可以基于注解来声明队列并设置为Lazy模式:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",					arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}

更新已有队列为lazy模式

可以基于命令行设置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

当然,也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:
image.png

消费者的可靠性

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

消费者确认机制

当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回

    • 如果是业务异常,会自动返回nack;
    • 如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 不做处理

失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

失败处理策略

Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

在consumer服务中定义处理失败消息的交换机和队列
定义一个RepublishMessageRecoverer,关联队列和交换机
image.png

/*** 错误消息配置类,用于配置 RabbitMQ 错误消息处理相关的 Bean。* 当前类会根据 spring.rabbitmq.listener.simple.retry.enabled 属性的值来决定是否创建相关的 Bean。* 如果该属性值为 true,则会创建错误消息交换机、错误队列和绑定关系,并配置消息恢复器。*/
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {/*** 创建错误消息交换机*/@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}/*** 创建错误队列*/@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}/*** 创建错误队列与错误消息交换机的绑定关系,"error"是路由键*/@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}/*** 创建消息恢复器*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

消费者如何保证消息一定被消费?

  • 开启消费者确认机制为auto, 由spring确认消息处理成功后返回ack, 异常时返回nack
  • 开启消费者失败重试机制, 并设置MessageRecoverer, 多次重试失败后将信息投递到异常交换机

业务幂等性

image.png
保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

唯一消息ID

  • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可
以Jackson的消息转换器为例( 加在启动类 ):

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);// 在底层会自动创建一个UUIDreturn jjmc;
}

在Spring AMQP中,当使用Jackson2JsonMessageConverter并开启setCreateMessageIds(true)功能时,底层会自动在消息的属性中添加一个名为amqp_messageId的字段,其值为自动生成的UUID。
具体来说,UUID会成为消息的一部分,保存在消息的AMQP(Advanced Message Queuing Protocol)属性中。这些属性是与消息一起传递的元数据,包含了关于消息的一些信息。amqp_messageId字段就是用于唯一标识消息的UUID。
当消息被发送给消费者时,消费者可以通过**message.getMessageProperties().getMessageId()**方法来获取消息的ID,然后根据业务需求将该ID保存到数据库中。在处理相同消息时,消费者可以在数据库中查询是否存在相同的消息ID,以判断是否为重复消息。

业务判断

image.png
image.png
相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。

@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

image.png

兜底方案

既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

image.png

图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

image.png

死信交换机(不推荐使用)

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

死信交换机有什么作用呢?

  • 收集那些因处理失败而被拒绝的消息
  • 收集那些因队列满了而被拒绝的消息
  • 收集因TTL(有效期)到期的消息

延迟消息

RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。

当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

DelayExchange插件(推荐)

插件下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安装:

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:
image.png

[{"CreatedAt": "2024-01-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,启用插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"	
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}

发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}

可以写一个延迟时间的类, 不用每次都new一个,具体代码如下:
image.png

/*** @author Ccoo* 2024/1/22*/
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}

最后上面的业务代码变为:

@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message,new DelayMessageProcessor(message.removeNextDelay().intValue());
} 

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息

超时订单问题

image.png

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体,处于通用性考虑,我们将其定义到hm-common模块下:

image.png

@Data
public class MultiDelayMessage<T> {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 获取并移除下一个延迟时间* @return 队列中的第一个延迟时间*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否还有下一个延迟时间*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}

定义常量

image.png

/*** @author Ccoo* 2024/1/22*/
public interface MqConstants {String DELAY_EXCHANGE = "trade.delay.topic";String DELAY_ORDER_QUEUE = "trade.order.delay.queue";String DELAY_ORDER_ROUTING_KEY = "order.query";
}

抽取共享mq配置

在nacos中定义一个名为shared-mq.xml的配置文件,内容如下:

spring: rabbitmq:host: ${hm.mq.host:192.168.164.128} # 主机名port: ${hm.mq.port:5672} # 端口virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机username: ${hm.mq.un:itheima} # 用户名password: ${hm.mq.pw:123321} # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

在trade-service模块添加共享配置:
image.png

改造下单业务

  1. 引入依赖

在trade-service模块的pom.xml中引入amqp的依赖:

<!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  1. 改造下单业务

image.png

编写查询支付状态接口

首先,在hm-api模块定义三个类:

image.png

说明:

  • PayOrderDTO:支付单的数据传输实体
  • PayClient:支付系统的Feign客户端
  • PayClientFallback:支付系统的fallback逻辑
@Data
@ApiModel(description = "支付单数据传输实体")
public class PayOrderDTO {@ApiModelProperty("id")private Long id;@ApiModelProperty("业务订单号")private Long bizOrderNo;@ApiModelProperty("支付单号")private Long payOrderNo;@ApiModelProperty("支付用户id")private Long bizUserId;@ApiModelProperty("支付渠道编码")private String payChannelCode;@ApiModelProperty("支付金额,单位分")private Integer amount;@ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付")private Integer payType;@ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功")private Integer status;@ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段")private String expandJson;@ApiModelProperty("第三方返回业务码")private String resultCode;@ApiModelProperty("第三方返回提示信息")private String resultMsg;@ApiModelProperty("支付成功时间")private LocalDateTime paySuccessTime;@ApiModelProperty("支付超时时间")private LocalDateTime payOverTime;@ApiModelProperty("支付二维码链接")private String qrCodeUrl;@ApiModelProperty("创建时间")private LocalDateTime createTime;@ApiModelProperty("更新时间")private LocalDateTime updateTime;
}
@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {/*** 根据交易订单id查询支付单* @param id 业务订单id* @return 支付单信息*/@GetMapping("/pay-orders/biz/{id}")PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {@Overridepublic PayClient create(Throwable cause) {return new PayClient() {@Overridepublic PayOrderDTO queryPayOrderByBizOrderNo(Long id) {return null;}};}
}

最后,在pay-service模块的PayController中实现该接口:

@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}

消息监听

接下来,我们在trader-service编写一个监听器,监听延迟消息,查询订单支付状态:
image.png

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusListener {private final IOrderService orderService;private final PayClient payClient;private final RabbitTemplate rabbitTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),key = MqConstants.DELAY_ORDER_ROUTING_KEY))public void listenOrderCheckDelayMessage(MultiDelayMessage<Long> msg) {// 1.获取消息中的订单idLong orderId = msg.getData();// 2.查询订单,判断状态:1是未支付,大于1则是已支付或已关闭Order order = orderService.getById(orderId);if (order == null || order.getStatus() > 1) {// 订单不存在或交易已经结束,放弃处理return;}// 3.可能是未支付,查询支付服务PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);if (payOrder != null && payOrder.getStatus() == 3) {// 支付成功,更新订单状态orderService.markOrderPaySuccess(orderId);return;}// 4.确定未支付,判断是否还有剩余延迟时间if (msg.hasNextDelay()) {// 4.1.有延迟时间,需要重发延迟消息,先获取延迟时间的int值int delayVal = msg.removeNextDelay().intValue();// 4.2.发送延迟消息rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,message -> {message.getMessageProperties().setDelay(delayVal);return message;});return;}// 5.没有剩余延迟时间了,说明订单超时未支付,需要取消订单orderService.cancelOrder(orderId);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/366568.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot数字化医院产科系统源码

目录 一、系统概述 二、开发环境 三、功能设计 四、功能介绍 一、系统概述 数字化产科是为医院产科量身定制的信息管理系统。它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。该系统由门诊系统、住院系统、数据统计模块三部分组成&#xff0c;与医院HIS、LI…

智能版面设计:指令跟随模型在自动布局规划中的应用

在广告行业一个吸引人的视觉布局能够显著提升信息的传播效果。但对于非专业设计师来说&#xff0c;创建既美观又功能性强的布局常常是一项挑战。他们往往缺乏必要的设计技能、审美训练或资源来快速实现创意构想。传统的设计软件和在线工具虽然提供了一些模板和指导&#xff0c;…

让 SwiftUI 原生 TabView 支持标签页切换转场动画

功能需求 何曾几时,秃头码农们多么希望 SwiftUI 里原生 TabView 的标签页切换能够有转场动画效果。 如上图所示,我们在 SwiftUI 原生 TabView 视图的标签页切换时展现出美美哒的转场动画,本实现支持最新的 iPadOS 18 和 iOS 18 系统。这是怎么做到的呢? 在本篇博文中,您…

Windows 组策略编辑器怎么打开,这两种方法你必须知道

组策略编辑器&#xff08;Group Policy Editor, 简称 GPEdit.msc&#xff09;是 Windows 操作系统中一个强大的工具&#xff0c;主要用于管理和配置系统设置、安全选项、用户权限等&#xff0c;尤其适用于企业环境中批量部署和管理策略。 尽管家庭版 Windows&#xff08;如 Win…

文章智能改写工具哪个好?什么文都能改的智能写作工具

在学术探索的广袤海域中&#xff0c;撰写论文是一项考验我们知识深度和创新能力的必经之路。 我们可能会在这片海洋中遇到内容雷同、创意匮乏的暗礁。但不必惊慌&#xff0c;免费智能改写工具就像一盏导航明灯&#xff0c;照亮我们前行的道路。 今天&#xff0c;让我们一起探…

【postgresql】数据库操作

创建数据库 使用 CREATE DATABASE SQL 语句来创建 语法&#xff1a; CREATE DATABASE dbname; 使用 createdb 命令来创建 语法&#xff1a; createdb [option...] [dbname [description]] 参数说明&#xff1a; dbname&#xff1a;要创建的数据库名。 description&…

win11电源设置

把钩子去掉以后 win11的电脑关机才有用 否则&#xff0c;关机了&#xff0c;电脑也实际上一直在运行

计算机网络之入门

1.网络的发展 1.1计算机网络定义 计算机网络是以共享资源&#xff08;硬件、软件和数据等&#xff09;为目的而连接起来的、在协议控制下&#xff0c;由一台或多台计算机、若干台终端设备、数据传输设备等组成的系统之集合。 这些计算机系统应当具有独立自治的能力&#xff…

fastapi swagger js css 国内访问慢问题解决

fastapi swagger js css 国内访问慢问题解决 直接修改fastapi包中静态资源地址为如下地址 swagger_js_url: str "https://cdn.bootcdn.net/ajax/libs/swagger-ui/3.9.3/swagger-ui-bundle.js", swagger_css_url: str "https://cdn.bootcdn.net/ajax/libs/sw…

什么方法能快速分享视频给他人?视频二维码提供预览的制作技巧

现在想要分享一个或者多个视频时&#xff0c;很多人会选择将视频生成二维码的方法来展现视频内容&#xff0c;通过这种方式可以让多人同时扫码查看同一个视频&#xff0c;有效提升其他人获取内容的速度及视频传播的效率。那么视频转换成二维码的方法是什么样的呢&#xff1f; …

深入解析:Java爬虫的本质是什么?

深入解析&#xff1a;Java爬虫的本质是什么&#xff1f; 引言&#xff1a; 随着互联网的快速发展&#xff0c;获取网络数据已成为许多应用场景中的重要需求。而爬虫作为一种自动化程序&#xff0c;能够模拟人类浏览器的行为&#xff0c;从网页中提取所需信息&#xff0c;成为了…

STM32存储左右互搏 模拟U盘桥接QSPI总线FATS读写FLASH W25QXX

STM32存储左右互搏 模拟U盘桥接QSPI总线FATS读写FLASH W25QXX STM32的USB接口可以模拟成为U盘&#xff0c;通过FATS文件系统对连接的存储单元进行U盘方式的读写。 这里介绍STM32CUBEIDE开发平台HAL库模拟U盘桥接Quad SPI总线FATS读写W25Q各型号FLASH的例程。 FLASH是常用的一种…

通过百度文心智能体创建STM32编程助手-实操

一、前言 文心智能体平台AgentBuilder 是百度推出的基于文心大模型的智能体&#xff08;Agent&#xff09;平台&#xff0c;支持广大开发者根据自身行业领域、应用场景&#xff0c;选取不同类型的开发方式&#xff0c;打造大模型时代的产品能力。开发者可以通过 prompt 编排的…

开放签电子签章,让签字有迹可循

开放签&#xff08;企业版&#xff09;V2.0.5版本上线后&#xff0c;系统支持一键查询电子文件的签署操作记录&#xff0c;支持一键生成详细的签署记录报告&#xff0c;详细请看下图&#xff1a; 1、操作记录详情&#xff1a; 从合同发起、填写、签署、撤销等环节全流程展示操…

由监官要求下架docker hub镜像导致无法正常拉取镜像

问题&#xff1a;下载docker镜像超时 error pulling image configuration: download failed after attempts6: dial tcp 202.160.128.205:443: i/o timeout解决办法&#xff1a;配置daemon.json [rootbogon aihuidi]# cat /etc/docker/daemon.json {"registry-mirrors&qu…

[Information Sciences 2023]用于假新闻检测的相似性感知多模态提示学习

推荐的一个视频&#xff1a;p-tuning P-tunning直接使用连续空间搜索 做法就是直接将在自然语言中存在的词直接替换成可以直接训练的输入向量。本身的Pretrained LLMs 可以Fine-Tuning也可以不做。 这篇论文也解释了为什么很少在其他领域结合知识图谱的原因&#xff1a;就是因…

怎么压缩pdf文件大小,如何压缩pdf文件大小

pdf文件怎么压缩&#xff1f;在当下这个信息爆炸的时代&#xff0c;无论是在工作场所还是校园中&#xff0c;我们经常会面临需要处理大文件的情况&#xff0c;而PDF格式作为一种保留文档结构和布局完整性的理想选择&#xff0c;有时候pdf文件太大&#xff0c;因此&#xff0c;对…

Linux登录界面

Linux登录界面 1. 起因2. 脚本3. 效果 1. 起因 某次刷抖音看到一个博主展示了一个登录页面,觉得蛮好看的.于是自己动手也写一个 2. 脚本 编写脚本/usr/local/bin/login.sh #!/bin/bash Current_timedate %Y-%m-%d %H:%M:%S Versioncat /etc/redhat-release Kernel_Version…

为什么这几年参加PMP考试的人越来越多

参加PMP认证的人越来越多的原因我认为和社会发展、职场竞争、个人提升等等方面有着不小的关系。国际认证与国内认证的性质、发展途径会有一些区别&#xff0c;PMP引进到中国二十余年&#xff0c;报考人数持增长状态也是正常的。 具体可以从下面这几个点来展开论述。 市场竞争…

SSM OA办公系统19159

SSM OA办公系统 摘 要 随着现代信息技术的快速发展以及企业规模不断扩大&#xff0c;实现办公线上流程自动化已成为提升企业核心竞争力的关键。本文主要介绍的是利用Spring、SpringMVC和MyBatis&#xff08;简称为&#xff1a;SSM&#xff09;框架&#xff0c;MySQL数据库等先…