RabbitMQ(二)

二、高级特性、应用问题以及集群搭建

高级特性

1.消息的可靠性投递

在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitMQ整个消息投递的路径为:
producer -> rabbitMQ broker -> exchange -> queue -> consumer

  • confirm确认模式
    confirm确认模式是再producer传递给exchange过程中控制消息的模式,当消息成功的从producer传递到了exchange,那么则会返回一个 confirmCallBack() 回调函数
  • return 退回模式
    return退回模式是指消息从exchange传递给queue过程中消息传递失败,则会返回一个returnCallBack() 回调函数

1.1 confirm确认模式的代码编写:

因为确认模式是producer到exchange,所以代码和配置修改应该写在生产者的模块中。
一步:开启确认模式

新版本的rabbitmq弃用了publish-confirms:true,可以改用
publisher-confirm-type: correlated实现同样的效果

spring:rabbitmq:password: heimausername: heimaport: 5673virtual-host: itcasthost: 1.12.244.105#开启确认模式publisher-confirm-type: correlated

二步:编写confirmCallBack()函数
回调函数confirm()的返回值在发送消息成功时ack为true,但是我遇到一个问题,就是消息发送成功了,在队列中也能看到,但是返回值ack为false,

clean channel shutdown;

这是因为convertAncSend()方法结束后rabbitMQ的资源也就关闭了,所以就算成功了,回调函数返回值也是false;所以我们在后面强制睡眠200ms,让资源晚点关闭,这样的话得到的ack就是true了

package com.rabbitmq.springboot_mqproducer;import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
class SpringbootMqProducerApplicationTests {@ResourceRabbitTemplate rabbitTemplate;@Testvoid contextLoads() throws InterruptedException {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 相关的配置信息* @param b 消息是否发送成功* @param s 消息发送失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm方法被执行了");System.out.println(b);if(b){System.out.println("消息从producer -> exchange成功");System.out.println("失败原因:" + s);}else{System.out.println("消息从producer -> exchange失败");System.out.println("失败原因:" + s);}}});rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"test.hello","测试springboot整合交换机");Thread.sleep(200);}
}

结果:
在这里插入图片描述

1.2 return回退模式的代码编写

一步:开启回退模式

spring:rabbitmq:password: heimausername: heimaport: 5673virtual-host: itcasthost: 1.12.244.105#开启确认模式publisher-confirm-type: correlated#开启回退模式publisher-returns: true

二步:编写returnCallBack()函数
三步:设置exchange处理消息的模式
setMandatory为true,如果消息没有到队列queue,则返回消息给发送方
setMandatory为false,如果消息没有到队列queue,则丢弃消息(默认)

package com.rabbitmq.springboot_mqproducer;import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
class SpringbootMqProducerApplicationTests {@ResourceRabbitTemplate rabbitTemplate;@Testvoid contextLoads() throws InterruptedException {//编写confirm回调函数rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 相关的配置信息* @param b 消息是否发送成功* @param s 消息发送失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm方法被执行了");System.out.println(b);if(b){System.out.println("消息从producer -> exchange成功");System.out.println("失败原因:" + s);}else{//消息发送失败,需要做一些处理System.out.println("消息从producer -> exchange失败");System.out.println("失败原因:" + s);}}});//编写return回调函数rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("return回退模式回调函数执行了");System.out.println("消息:"+returnedMessage.getMessage());System.out.println("exchange:"+returnedMessage.getExchange());System.out.println("replyCode:"+returnedMessage.getReplyCode());System.out.println("replyText:"+returnedMessage.getReplyText());System.out.println("routingKey:"+returnedMessage.getRoutingKey());}});//设置回退模式中,exchange处理消息的方式/*当将mandatory设置为false(默认值),如果RabbitMQ无法将消息路由,消息将会被静默丢弃,生产者不会收到通知。当设置mandatory为true时,意味着消息被视为"mandatory",如果在发布消息时RabbitMQ无法将消息路由到任何队列(例如由于没有匹配的队列与指定的路由键),则代理将通过调用ReturnListener回调的returnedMessage方法将消息返回给生产者(发布者)。生产者可以根据需要适当地处理这个返回的消息,例如记录日志或执行某些恢复操作。*/rabbitTemplate.setMandatory(true);//TODO 这里把routingKey写错,是为了让交换机找不到queue,从而触发returnCallBack()函数rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"testtttt.hello","测试springboot整合交换机");Thread.sleep(200);}}

消息的可靠投递小结:

  • 设置配置publisher-confirm-type: correlated开启确认模式
  • 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true, 则发送成功,如果为false,则发送失败,需要处理。
  • 设置ConnectionFactory的publisher-returns="true"开肩退回模式。
  • 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
  • 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
    使用channel下列方法,完成事务控制:
    txSelect(),用于将当前channel设置成transaction模式
    txCommit(),用于提交事务
    txRollback(),用于回滚事务

2.Consumer Ack

ack指Acknowledge,确认。表示消费端收到消息后的确认方式。
有三种确认方式:

  • 自动确认:acknowledge=“none”
  • 手动确认:acknowledge=“manual”
  • 根据异常情况确认:acknowledge=“auto”(这种方式很麻烦,不做讲解)

其中自动确认是指,当消息一旦被Consumer接收到, 则自动确认收到,并将相应message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck() 手动签收,如果出现异常,则调用channel.basicNack() 方法,让其自动重新发送消息。

代码编写:
发送消息的生产者端代码不用变,只需要能够发送消息就行
消费者端:
一步:编写yml配置文件

spring:rabbitmq:username: heimapassword: heimavirtual-host: itcasthost: 1.12.244.105port: 5673#设置消息为手动签收listener:simple:acknowledge-mode: manual #消费者端确认模式:none自动确认 manual手动确认 auto通过抛出异常的类型,来做响应的处理concurrency: 1 #当前监听的数量max-concurrency: 5 #最大监听数量retry:enabled: true #是否支持重试max-attempts: 4 #最大重试次数,默认为3

二步:编写消费者代码
消费者端创建一个listener并实现ChannelAwareMessageListener接口(其实也可以不实现该接口,只要 @RabbitListener 标记的方法,或者 @RabbitListener 标记的类+ @RabbitHandler 标记的方法的参数列表有[com.rabbitmq.client.Channel]和[org.springframework.amqp.core.Message]两个参数,都可以)

package com.rabbit.springboot_mqconsumer;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;/*** @author Watching* * @date 2023/7/19* * Describe:*/
@Component
public class RabbitMQListener implements ChannelAwareMessageListener {
//    @RabbitListener(queues = {"boot_topic_queue"})//填写队列名称,可以以字符串数组的方式监听多个队列
//    public void listener(Message message){
//        System.out.println("message:"+message);
//    }/*** 使用ChannelAwareMessageListener监听器接口中的onMessage()方法来充当消费者,如果上面注释的方法与当前方法同时存在,一条消息只会被消费一次。不会被两个方法都消费** @param message* @param channel* @throws Exception Consumer ACK机制:*                   1.设置手动签收。acknowledge= "manual”*                   2.让监听器类实现ChannelAwareMessageListener接口*                   3.如果消息成功处理,则调用channel的basicAck()签收*                   4.如果消息处理失败,则调用channel的basicNack( )拒绝签收,broker重新发送给consumer*/@RabbitListener(queues = "boot_topic_queue" )@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try{//1.接收消息System.out.println("message:" + message);System.out.println("channel:" + channel);//2.处理业务逻辑System.out.println("模拟处理业务逻辑......");//3.手动签收/*void basicAck(long deliveryTag, boolean multiple) throws IOException;deliveryTag:当消费者接收到一条消息后,RabbitMQ 会为该消息分配一个唯一的 DeliveryTag。这个 DeliveryTag 是一个64位的长整型数值,并且只在该 Channel 内唯一,即相同 Channel 下的 DeliveryTag 不会重复。multiple:当 multiple 设置为 false 时,表示只确认当前指定的 deliveryTag 对应的一条消息。也就是说,只确认指定的单个消息已经成功被处理或处理失败。当 multiple 设置为 true 时,表示确认当前指定的 deliveryTag 及其之前所有未确认的消息(在同一个 Channel 下)。也就是说,会一次性确认多条消息的处理状态,将 deliveryTag 小于或等于指定 deliveryTag 的所有消息都确认处理了。这种批量确认的机制有助于提高消息的处理效率,特别是当消费者处理多条消息时,可以通过一次性确认多条消息的方式来减少网络开销和消费者端的负担。在使用 channel.basicAck(deliveryTag, multiple) 和 channel.basicNack(deliveryTag, multiple, requeue) 方法时,可以根据实际场景来选择是单条确认还是批量确认,以满足不同的业务需求。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);System.out.println("完成手动签收");}catch(Exception e){//4.出现异常,拒绝签收/*deliveryTag:一个唯一标识消息的64位长整型数值,用于确认消息的消费状态。multiple:一个布尔类型的参数,用于决定是否批量处理多条消息。若设置为 true,则会否定当前指定 deliveryTag 及其之前的所有未确认消息;若设置为 false,则只否定当前指定 deliveryTag 对应的一条消息。requeue:一个布尔类型的参数,表示是否将消息重新放回队列。若设置为 true,则消息会被重新入队列,RabbitMQ 会再次将它发送给消费者;若设置为 false,则消息会被直接丢弃,不会重新放回队列。*/System.out.println("代码逻辑出现异常,拒收");channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);}}
}

只需要两步,就可以实现Consumer ack,下面我们来测试一下:
首先是正常运行的代码的结果:(业务逻辑代码无异常)
在这里插入图片描述
生产者端是用的前面测试boot整合的代码
在这里插入图片描述
然后我们来测试业务逻辑代码出错的情况,我们在业务逻辑代码处添加一个除数不能为0的异常
在这里插入图片描述
再次运行代码,一直在重试,一直再报错
在这里插入图片描述

消息的可靠性总结

1.持久化:

  • exchange要持久化
  • queue要持久化
  • message要持久化

2.生产方确认Confirm(在后续文章中会讲解如何在回调函数中进行具体的处理
3.消费方确认Ack
4. Broker高可用(集群搭建

3.消费端限流

在A系统中,每秒最多只能处理1000条请求,如果在一秒钟只能瞬间有5000条请求打入A系统,那么A系统就会崩溃,所以我们在A系统中加入一个MQ中间件,让5000个请求先发送到MQ,然后A系统再分批次的从MQ中拉取1000条请求,这样A系统就避免了崩溃的情况。
这也是我们常说的MQ的削峰功能
在这里插入图片描述
设置MQ消费限流很简单,只需要设置两个属性:

  • 确认模式设置为手动确认(在上面的Ack我们已经讲过)
  • 设置prefetch属性,prefetch = n,n就是每次从MQ中获取消息的数量
    在这里插入图片描述
    其余的消费端代码和生产者端代码不用修改。
    当设置了消费端限流后,如果从MQ中取出1条消息,消费者端没有进行确认,那么消费者端将不会再从MQ中取消息,直到消息被确认。

4.TTL

TTL全称Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列Queue设置过期时间。
举一个例子:
生活中我们在购买商品的时候会下订单,系统会提示我们要在30分钟之内付款,否则订单将会被取消。
在这里插入图片描述

Ⅰ、先在控制台模拟上面的情况

①创建一个交换机
在这里插入图片描述
②创建一个队列
在这里插入图片描述
③进入交换机exchange_ttl和队列queue_ttl进行绑定
在这里插入图片描述
④消息的发布
在这里插入图片描述
⑤在消息队列中查看
将鼠标放上ttl,就可以看到设置的时间,等时间一过,这条消息就会被自动清除。
在这里插入图片描述

Ⅱ、代码实现队列过期,和消息过期

①创建交换机,队列,以及绑定关系

package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author Watching* * @date 2023/7/18* * Describe:*/
@Configuration
public class MQConfig {public static final String QUEUE_TTL_NAME = "queue_ttl";public static final String EXCHANGE_TTL_NAME = "exchange_ttl";/*
创建队列,测试ttl特性*/@Bean("test_queue_ttl")public Queue ttlQueue() {Map<String,Object> arguments = new HashMap<>();arguments.put("x-message-ttl",10000);//消息过期的时间arguments.put("x-expires",100000);//队列过期的时间//设置队列的ttl时间return QueueBuilder.durable(QUEUE_TTL_NAME).withArguments(arguments).build();//参数的属性可以在控制台上查看}/*
创建一个交换机测试队列ttl特性*/@Bean("test_exchange_ttl")public Exchange ttlExchange() {return ExchangeBuilder.topicExchange(EXCHANGE_TTL_NAME).durable(true).build();}/*绑定ttl交换机和队列*/@Beanpublic Binding ttlBinding(@Qualifier("test_exchange_ttl") Exchange exchange, @Qualifier("test_queue_ttl") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();}
}

在创建队列时,我们指定了x-message-ttl,使队列中的所有消息都是一个固定的时间过期
我们还可以在发送消息时,指定每条消息的过期时间。
只需要在发送方法convertAndSend()方法中添加一个消息后处理参数即可

/*MessagePostProcessor 是 Spring AMQP 中的一个接口,用于对消息进行后处理。通过实现该接口,你可以在发送消息之前对消息进行一些自定义处理,例如添加自定义的消息头、修改消息内容等。*/MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//1.设置消息属性message.getMessageProperties().setExpiration("5000");//5000ms过期//2.返回该消息return message;}};@Testvoid testSend() throws InterruptedException {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_TTL_NAME, "ttl.hello", "测试ttl"+i,messagePostProcessor);}Thread.sleep(200);}

小细节:
①当队列设置了x-expires和x-messgae-ttl,消息过期时间以短的为准
②当队列设置了x-messgae-ttl,且发送消息时通过消息后处理也设置了过期时间,那么消息过期时间也以短的为准。
③当十条消息中只有一条消息设置了过期时间,这条消息过期后,只有处于队列顶端,即即将被消费时,才会对这条消息是否过期做判断。

5.死信队列

5.1 概念

死信队列,英文缩写: DLX ,Dead Letter Exchange (死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。(死信队列为什么英文翻译过来使死信交换机呢?因为交换机概念只有在RabbitMQ中才有,其它MQ中间件只有队列概念,所以习惯叫死信队列,而RabbitMQ中存在交换机概念,所以叫死信交换机。)
在这里插入图片描述
在这里我们需要理解的问题有:
①消息什么时候成为死信?

  • 队列长度达到限制,比如队列最多容纳10条消息,当第11条消息进入时,这条消息就成为了死信消息。
  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  • 原队列存在消息过期设置,消息到达超时时间却并未被消费

以上三种,满足一条即为死信消息

②队列如何绑定死信交换机?
队列设置参数:x-dead-letter-exchangex-dead-letter-routing-key
x-dead-letter-exchange:死信交换机的名称
x-dead-letter-routing-key:消息发送时指定的routingKey

在这里插入图片描述

5.2 代码实现死信队列

创建死信队列:

  • 1.声明正常的队列(test_queue_dLx)和交换机(test_exchange_dlx)
  • 2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
  • 3.正常队列绑定死信交换机,正常队列绑定死信队列不需要创建Binding Bean,只需要在正常队列创建时设置参数就可以
    – 设置两个参数:
    x-dead-letter-exchange:死信交换机名称
    x-dead-letter-routing-key:发送给死信交换机的routingkey

设置正常队列中的消息的过期时间x-message-ttl
设置正常队列的长度限制x-max-length

package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author Watching* * @date 2023/7/18* * Describe:*/
@Configuration
public class MQConfig {/*** 测试死信队列*//*创建普通交换机和普通队列*/@Bean("test_exchange_dlx")public Exchange testDlxExchange() {return ExchangeBuilder.topicExchange("test_exchange_dlx").durable(true).build();}@Bean("test_queue_dlx")public Queue testDlxQueue() {Map<String,Object> map = new HashMap<>();//x-dead-letter-exchange:死信交换机名称map.put("x-dead-letter-exchange","exchange_dlx");//x-dead-letter-routing-key:发送给死信交换机的routingkeymap.put("x-dead-letter-routing-key","dlx.hehe");//这个routingkey只需要满足死信交换机的路由规则就可以//设置正常队列中的消息的过期时间ttlmap.put("x-message-ttl",10000);//设置正常队列的长度限制max-lengthmap.put("x-max_length",10);return QueueBuilder.durable("test_queue_dlx").withArguments(map).build();}@Beanpublic Binding binding1(@Qualifier("test_exchange_dlx") Exchange exchange,@Qualifier("test_queue_dlx")Queue queue){return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();}/*创建死信交换机和死信队列*/@Bean("exchange_dlx")public Exchange dlxExchange() {return ExchangeBuilder.topicExchange("exchange_dlx").durable(true).build();}@Bean("queue_dlx")public Queue dlxQueue() {return QueueBuilder.durable("queue_dlx").build();}@Beanpublic Binding binding2(@Qualifier("exchange_dlx") Exchange exchange,@Qualifier("queue_dlx")Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();}/*绑定普通队列和死信交换机,并不需要写一个Binding,只需要在普通队列中添加参数就行*/
}

发送消息测试死信消息:
1.过期时间
2.长度限制
3.消息拒收

    @Testvoid testDlx() {//1.过期时间
//        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","测试消息超出过期时间变成死信");//2.超出队列消息数量限制
//        for (int i = 0; i < 20; i++) {
//            rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hello", "测试消息超出队列数量限制变成死信");
//        }//3.消费端拒收rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","测试消息被拒收变成死信");}

死信队列小结:
1.死信交换机,死信队列和普通交换机,普通队列没有区别.
2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被重新路由到死信队列中
3.消息成为死信的三种情况

  • 消息在队列中到达超时时间并未被消费
  • 消息在消费者端被拒收,且设置了不重回队列
  • 队列长度存在限制,消息数量超出了限制

6.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
1.下单后,30分钟未支付,取消订单,回滚库存。
2.新用户注册成功7天后,发送短信问候。
实现方式:
1.定时器
2.延迟队列
订单系统将订单放入延迟队列种,30分钟后取出,去库存系统中判断订单是否已经支付,再进行后续的支付或者未支付操作
在这里插入图片描述
但是!
RabbitMQ官方没有提供延迟队列,所以我们需要使用ttl+死信队列构成延迟队列
普通队列设置为30min中过期,过期后消息路由到死信队列,库存系统从死信队列中取消息,这样就形成了一个延迟队列
在这里插入图片描述

代码实现延迟队列

1.定义正常交换机(order_exchange)和队列(order_queue),同时绑定
2.定义死信交换机(order_exchange_dlx) 和队列(order_queue_dlx),同时绑定
3.绑定正常队列和死信交换机,设置正常队列过期时间为10秒

    /*** 测试延迟队列*//*1.定义正常交换机(order_exchange)和队列(order_queue)*/@Bean("orderQueue")public Queue orderQueue(){//3.正常队列绑定死信交换机Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange","order_exchange_dlx");map.put("x-dead-letter-routing-key","dlx.order.hehe");//设置正常队列的消息过期时间map.put("x-message-ttl",10000);return QueueBuilder.durable("order_queue").withArguments(map).build();}@Bean("orderExchange")public Exchange orderExchange(){return ExchangeBuilder.topicExchange("order_exchange").build();}@Beanpublic Binding orderBinding(@Qualifier("orderQueue")Queue queue,@Qualifier("orderExchange")Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();}/*2.定义死信交换机(order_exchange_dlx) 和队列(order_queue_dlx)*/@Bean("orderQueueDlx")public Queue orderQueueDlx(){return QueueBuilder.durable("order_queue_dlx").build();}@Bean("orderExchangeDlx")public Exchange orderExchangeDlx(){return ExchangeBuilder.topicExchange("order_exchange_dlx").build();}@Beanpublic Binding orderBindingDlx(@Qualifier("orderQueueDlx")Queue queue,@Qualifier("orderExchangeDlx")Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();}

4.创建生产者发送消息

    /*** 测试延迟队列*/@Testvoid testDelay() throws InterruptedException {rabbitTemplate.convertAndSend("order_exchange","order.test","测试延迟队列");for (int i = 10;i > 0;i--){System.out.println(i+"...");Thread.sleep(1000);}}

5.创建消费者

package com.rabbit.springboot_mqconsumer;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** @author Watching* * @date 2023/8/2* * Describe:*/
@Component
public class OrderListener implements ChannelAwareMessageListener {@RabbitListener(queues = "order_queue_dlx")//监听死信队列@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try{//1.接收messageSystem.out.println("message:"+message);//2.处理业务逻辑System.out.println("处理业务逻辑");System.out.println("根据订单id在数据库中查询订单状态");System.out.println("判断订单是否支付成功");System.out.println("未支付,回滚库存,取消订单");//3.手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch (Exception e){//4.业务出错,拒绝签收channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);//业务出错,拒签后要将这条消息重新放回死信队列}}
}

延迟队列小结:
1.延迟队列指消息进入队列后,可以被延迟一定时间,再进行消费。
2. RabbitMQ没有提供延迟队列功能,但是可以使用: TTL + DLX来实现延迟队列效果。

应用问题

1.消息补偿

消息补偿机制

2.幂等性保障

幂等性保障

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/77949.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springCache-缓存

SpringCache 简介&#xff1a;是一个框架&#xff0c;实现了基于注解的缓存功能&#xff0c;底层可以切换不同的cache的实现&#xff0c;具体是通过CacheManager接口实现 使用springcache,根据实现的缓存技术&#xff0c;如使用的redis,需要导入redis的依赖包 基于map缓存 …

简述静态网页和动态网页的区别。简述 Webl.0 和 Web2.0 的区别。安装tomcat8,配置服务启动脚本,部署jpress应用

静态网页和动态网页区别 静态网页和动态网页是两种常见的网页类型&#xff0c;它们在内容生成和交互方式上存在不同。 静态网页是在服务器上提前生成好的网页&#xff0c;它的内容在访问时不会发生变化。静态网页通常由HTML、CSS和JavaScript等静态文件组成&#xff0c;这些文…

无涯教程-Perl - bless函数

描述 此函数告诉REF引用的实体,它现在是CLASSNAME包中的对象,如果省略CLASSNAME,则为当前包中的对象。建议使用bless的两个参数形式。 语法 以下是此函数的简单语法- bless REF, CLASSNAMEbless REF返回值 该函数返回对祝福到CLASSNAME中的对象的引用。 例 以下是显示其…

Python web实战之 Django 的模板语言详解

关键词&#xff1a; Python、web开发、Django、模板语言 概要 作为 Python Web 开发的框架之一&#xff0c;Django 提供了一套完整的 MVC 模式&#xff0c;其中的模板语言为开发者提供了强大的渲染和控制前端的能力。本文介绍 Django 的模板语言。 1. Django 模板语言入门 Dj…

【Android】控件与布局入门 - 简易计算器

目录 1. 基础开发环境 2. 计算器的布局和相关按钮 3. 计算器的主要运算逻辑 4. APK 文件 5. 项目源码 1. 基础开发环境 JDK&#xff1a;JDK17 Android Studio&#xff1a;Android Studio Giraffe | 2022.3.1 Android SDK&#xff1a;Android API 34 Gradle: gradle-8.0-bi…

【Nginx基础】Nginx基础及安装

目录 Nginx出现背景Nginx 概念Nginx 作用Http 代理&#xff0c;反向代理负载均衡&#xff1a;内置策略和扩展策略内置策略&#xff1a;轮询内置策略&#xff1a;加权轮询内置策略&#xff1a;IP hash 动静分离 安装 NginxWindows下安装&#xff08;nginx-1.16.1&#xff09;Lin…

计算机毕设 深度学习实现行人重识别 - python opencv yolo Reid

文章目录 0 前言1 课题背景2 效果展示3 行人检测4 行人重识别5 其他工具6 最后 0 前言 &#x1f525; 这两年开始毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的毕设题目缺少创新和亮点&#xff0c;往往达不到毕业答辩的要求&#xff0c;这两年不断有学弟学妹告诉…

EtherCAT转Profinet网关连接西门子PLC与凯福科技总线步进驱动器通讯

西门子S7-1200/1500系列的PLC&#xff0c;采用Profinet实时以太网通讯协议&#xff0c;需要连接带EtherCAT的通讯功能的伺服驱动器等设备&#xff0c;就必须进行通讯协议转换。捷米特JM-EIP-RTU系列的网关提供了&#xff0c;快速可行的解决方案 捷米特JM-ECTM-PN在PROFINET一侧…

Linux下进程的特点与环境变量

目录 进程的特点 进程特点的介绍 进程时如何实现并发性的 进程间如何切换 概念铺设 PC指针 上下文 环境变量 PATH 修改PATH HOME SHELL env 命令行参数 什么是命令行参数&#xff1f; 打印命令行参数 通过函数获得环境变量 getenv 命令行参数 env 修改环境变…

Linux从安装到实战 常用命令 Bash常用功能 用户和组管理

1.0初识Linux 1.1虚拟机介绍 1.2VMware Workstation虚拟化软件 下载CentOS; 1.3远程链接Linux系统 &FinalShell 链接finalshell半天没连接进去 他说ip adress 看IP地址是在虚拟机上 win11主机是 终端输入&#xff1a; ifconfig VMware虚拟机的设置 & ssh连接_snge…

[Pytorch]卷积运算conv2d

文章目录 [Pytorch]卷积运算conv2d一.F.Conv2d二.nn.Conv2d三.nn.Conv2d的运算过程 [Pytorch]卷积运算conv2d 一.F.Conv2d torch.nn.functional.Conv2d()的详细参数&#xff1a; conv2d(input: Tensor, weight: Tensor, bias: Optional[Tensor]None, stride: Union[_int, _s…

如何在 Android 上恢复已删除的视频|快速找回丢失的记忆

想知道是否有任何成功的方法可以从 Android 手机中检索已删除的视频&#xff1f;好吧&#xff0c;本指南将向您展示分步说明&#xff0c;让您轻松从手机中找回丢失的视频文件&#xff01; 您是否不小心从 Android 智能手机中删除了珍贵的生日视频&#xff1f;难道是无处可寻吗…

【计算机视觉|语音分离】期望在嘈杂环境中聆听:一个用于语音分离的不依赖于讲话者的“音频-视觉模型”

本系列博文为深度学习/计算机视觉论文笔记&#xff0c;转载请注明出处 标题&#xff1a;Looking to Listen at the Cocktail Party: A Speaker-Independent Audio-Visual Model for Speech Separation 链接&#xff1a;Looking to listen at the cocktail party: a speaker-in…

驱动工作原理

驱动原理 在Linux操作系统中&#xff0c;硬件驱动程序中实现对硬件直接操作&#xff0c;而用户空间&#xff0c;通过通用的系统调用接口&#xff08;open() 打开相应的驱动设备,ioctl()控制相应的功能等&#xff09;&#xff0c;实现对硬件操作&#xff0c;应用程序没有直接操作…

MySQL事务管理

MySQL事务管理 MySQL增删查改时的问题一.什么是事务&#xff1f;二.为什么会出现事务&#xff1f;三.事务的其他属性1. 事务的版本支持2. 事务的提交方式 四.事务的准备工作五.事务的操作1. 事务的正常操作2. 事务的异常验证与产出结论 六.事务的隔离级别1. 事务隔离级别概念2.…

Linux-centos花生壳实现内网穿透

Linux-centos花生壳实现内网穿透 官网教程 1.安装花生壳 下载网址 点击复制就可以复制下载命令了 wget "https://dl.oray.com/hsk/linux/phddns_5.2.0_amd64.rpm" -O phddns_5.2.0_amd64.rpm# 下载完成之后会多一个rpm文件 [rootlocalhost HuaSheng]# ls phddns_…

ios_base::out和ios::out、ios_base::in和ios::in、ios_base::app和ios::app等之间有什么区别吗?

2023年8月2日&#xff0c;周三晚上 今天我看到了这样的两行代码&#xff1a; std::ofstream file("example.txt", std::ios_base::out);std::ofstream file("example.txt", std::ios::out);这让我产生了几个疑问&#xff1a; 为什么有时候用ios_base::o…

一篇文章了解类成员定义表结构

文章目录 一篇文章了解类成员定义表结构 %Dictionary.ClassDefinition - 类定义表简介索引示例表结构 %Dictionary.ForeignKeyDefinition - 外键定义表简介索引示例表结构 %Dictionary.IndexDefinition - 索引定义表简介索引示例表结构 %Dictionary.MethodDefinition - 方法定义…

前端js--旋转幻灯片

效果图 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><link rel"stylesheet" href"…

基于RASC的keil电子时钟制作(瑞萨RA)(8)----按键修改数码管时间

基于RASC的keil电子时钟制作8_按键修改数码管时间 概述硬件准备视频教程配置按键管脚按键设置主程序timer_smg.ctimer_smg.h 概述 前几节课程已经单独驱动了数码管和RTC&#xff0c;同时已经整合成了能够用数码管显示具体时间&#xff0c;但是无法修改时间&#xff0c;这节就来…