Rabbitmq使用笔记

前言

mq的优点:异步提速、解耦、流量削峰

mq的缺点: mq宕机可能导致消息丢失、消费者未成功消费如果保证整个系统架构的事务安全、消息可能被重复消费出现幂等问题、消息未被消费到引发死信问题、消息出现消费延迟或消费异常引发的顺序消费错乱问题...

mq的使用建议:系统扛不住了,扩容太贵了,不得不使用了

以下将根据官网提供的主流工作模式描述mq使用流程

一、队列生产消费模式

1、简单模式

        消费者创建

public class SimpleConsumer {public static final String QUEUE_NAME = "hello world";//队列的名称public static void consumeMessage() throws IOException, TimeoutException {//创建工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP 连接rabbitmq队列factory.setHost("127.0.0.1");//设置用户名称factory.setUsername("guest");//设置密码factory.setUsername("guest");//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();//声明 接受消息DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println(new String(message.getBody()));};//取消消息回调CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};//消费者消费消息:消费哪个队列、消费成功之后是否要自动应答(true:自动应答,false:手动应答)、消费失败的回调、取消消息的回调System.out.println(channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback));}
}

        生产者创建 

public class SimpleProducer {public static final String QUEUE_NAME = "hello world";//消息发送public static void buildMessage() throws IOException, TimeoutException {//创建工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP 连接rabbitmq队列factory.setHost("127.0.0.1");//设置用户名称factory.setUsername("guest");//设置密码factory.setUsername("guest");//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();//创建队列://    设置队列名称、是否持久化、是否共享队列消息的消费(true:共享消费,false:只有一个消费者消费)、是否自动删除(true:消息用完后自动删除、false:)、其它参数(延迟、死信消息处理用)channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发送消息String message = "hello world!!!";//消息发送channel.basicPublish("", QUEUE_NAME, null, message.getBytes());}
}

2、工作队列模式

         工作队列(任务队列)主要是消费端注册多个消费者以加速消息消费,防止消息消费速度慢、延迟、死信等异常,适用于任务比较重的地方

                                        

模式特点:

1、一条消息只会被一个消费者接收;

2、rabbit采用轮询的方式将消息是平均发送给消费者的;

3、消费者在处理完某条消息后,才会收到下一条消息

代码和简单模式没区别,额外要做的就是对消费者线程多开,可以对线程做些标记观察是否真的有轮询消费

3、Pub/Sub订阅模式

与之前的模式相比,该模式可以提供消息共享,即一个消息被多个消费者消费,但是一个队列中的消息仍然只能消费一次,那如何做到的多消费呢?答案是利用交换订阅技术,即由交换机来路由消息给多个队列从而实现多次消费 

Exchange(交换机)

        交换机的工作非常简单,一方面接受来自生产者的消息,另一方面将消息推到队列。关键的点在于交配机必须精确处理消息,即把消息推到哪个队列或者是否应该丢弃

        交换机类型描述:直接(direct/route)、主题(topic)、标题(headers)、扇出(fanout)

注:比如前面代码发布消息时channel.basicPublish方法指定的exchange是""或者null,实际走的是name为default的exchange,其类型为direct,routingkey默认是队列名称

临时队列

        未设置持久配置都是临时队列,即durable参数配置的值不是durable,哦還要看生成者是否配置了autodelete,如果autodelete參數配置的值是true也是臨時隊列,因为用完就把队列删除了嘛谁还管队列是否配置了持久

        创建临时队列的快速方法:String queueName = channel.queueDeclare().getQueue()

Bind

        交换机绑定queue

        

Fanout

        扇出这种类型就是广播,我不知道谁做的中文翻译,我*你**,就是消息广播到所绑定的所有队列,和routingkey没有关系

4、Direct订阅模式        

        这个和fanout模式差不多,区别在于exchange的type是direct,需要在bind队列时设置routing key,exchange根据routing key找到bind的queue,然后路由消息到这个queue

        

5、Topic        

        这个是对direct的扩展,direct的缺点在于需要硬编码到代码,如果以后要扩展别的队列还需要去生产者手动添加新routing key,当然我们可以通过服务配置来弥补这个问题,但是维护也需要人力,而topic可以很好的解决这个耦合问题

topic模式要求

        交换机的routing key不能随意写,必须是一个单词列表,以 . 分隔,单词列表长度<=255

        还有两个替换符,*代替一个单词,#代替零或多个单词,例如:*.orange.*;lazy.# ...

二、死信队列

死信:

        死信是指无法被消费的消息,一般来说produer将消息投递到queue,consumer从queue取出消息进行消费,但是某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就是死信消息,为了保存这些死信消息就有了死信队列。

应用场景:

        为了保证订单业务的消息数据不丢失,需要使用rabbitmq的死信队列机制,当消息消费发生异常,将消息投入死信队列中。        

死信来源:

        消息ttl过期(在当前队列中的存活时间,这段时间不被处理就被丢到dead queue)

        队列达到最大长度(队列满了,无法再添加数据到mq)

        消息被拒绝(basic.reject或者basic.nack)并且requeue=false

        消费者都宕机了

三.延迟队列

        延迟队列是用来存放需要在指定时间被处理的元素的队列 

1.应用场景

        外卖订单规定在15min之内下单,否则失效

        新创建的店铺,如果十天内没有上传商品,自动发送消息提醒

        用户注册账户成功,如果三天内没有登陆进行短信提醒

        用户发起退款,如果三天内没有得到处理会通知线下服务人员

        预定会议后,在预定时间的前15分钟消息提醒参会人员

2.代码操作

2.1.框架图

创建两个队列 QA 和 QB,两个队列 TTL 分别设置为 10s 和 40s,然后再创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

2.2.配置信息 

@Configuration
public class TtlQueueConfig {/*** 普通交换机名称*/public static final String X_EXCHANGE = "X";/*** 死信交换机名称*/public static final String Y_DEAD_LETTER_EXCHANGE = "Y";/*** 普通队列名称*/public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";/*** 死信队列名称*/public static final String DEAD_LETTER_QUEUE = "QD";/*** 声明 XExchange*/@Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}/*** 声明 yExchange*/@Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}/*** 声明队列QA*/@Beanpublic Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 设置死信路由键arguments.put("x-dead-letter-routing-key", "YD");// 设置过期时间arguments.put("x-message-ttl", 10000);return new Queue(QUEUE_A, true, false, false, arguments);}/*** 声明队列QB*/@Beanpublic Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 设置死信路由键arguments.put("x-dead-letter-routing-key", "YD");// 设置过期时间arguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}/*** 死信队列QD*/@Beanpublic Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}/*** 绑定*/@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}@Beanpublic Binding queueBBindingX(){return new Binding(QUEUE_B, Binding.DestinationType.QUEUE, X_EXCHANGE, "XB", null);}@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

2.3.消息生产者代码

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public String sendMsg(@PathVariable String message){log.info("当前时间:{}发送一条消息{}给两个队列", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10s队列QA:"+message);rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为40s队列QB:"+message);return "发送成功";}
}

2.4.消息消费者代码

@Slf4j
@Component
public class DeadLetterConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message){String msg = new String(message.getBody());log.info("当前时间{},收到死信队列的消息:{}", new Date(), msg);}
}

2.5.测试

        发送一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
在这里插入图片描述

第一条消息在 10s 后变成了死信消息,然后被消费者消费掉了,第二条消息在 40s 之后变成了死信消息,然后被消费掉,这样一个延时队列就完成了。

2.6.缺点

        queueA和queueB都是通过queue设置ttl,如此一来生产者无法灵活设置消息的ttl

2.7.优化

        让生产者也参与设置ttl,即生产者在生产消息时设置ttl,如此一来queue即使没有设置ttl也可以实现延时操作,扩展性大幅提高。

        【注:】如果生产者和queue同时设置ttl,则以最短的ttl为有效值

        如图,新增一个队列 QC,绑定关系如下,该队列不设置 TTL 时间

2.8.配置信息

@Component
public class MsgTtlQueueConfig {private static final String Y_DEAD_LETTER_EXCHANGE = "Y";private static final String QUEUE_C = "QC";@Bean("queueC")public Queue queueC(){Map<String, Object> arguments = new HashMap<>(2);// 声明当前队列绑定的死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 声明当前队列的私信路由keyarguments.put("x-dead-letter-routing-key", "YD");return new Queue(QUEUE_C, false, false, false, arguments);}@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}

2.9.消费生产者

@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public String sendMsg(@PathVariable String message, @PathVariable String ttlTime){MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(ttlTime);return message;}};rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);return "发送成功";
}

        将程序执行,然后发送请求:
                http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
                http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000

在这里插入图片描述
        两条消息的过期时间一致,过期时间短的那条消息,在过期时间到了以后并没有立即被消费,而是和过期时间长的那条消息一起被消费了。所以,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡”,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

2.10.Rabbitmq 插件实现延迟队列

        上面提到的问题,确实是一个问题,如果不能实现TTL的细粒度,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。

原理:

        以往的ttl都是由队列处理,而插件延迟功能是由exchange来管理,当ttl达到截至时间再发送到对应的queue

安装:

        这里不做演示,可百度去官网下载和安装对应版本的插件(好文推荐),安装完成后重启如下所示

        在这里插入图片描述 

        新增一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
在这里插入图片描述

        我们自定义的交换机中,这是一种新的交换机类型,该类型消息支持延迟投递机制,消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。 

2.11.配置信息 

@Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";@Bean("delayedQueue")public Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}/*** 自定义交换机 定义一个延迟交换机*  不需要死信交换机和死信队列,支持消息延迟投递,消息投递之后没有到达投递时间,是不会投递给队列*  而是存储在一个分布式表,当投递时间到达,才会投递到目标队列* @return*/@Bean("delayedExchange")public CustomExchange delayedExchange(){Map<String, Object> args = new HashMap<>(1);// 自定义交换机的类型args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

2.12.生产消息

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";@GetMapping("/sendDelayMsg/{message}/{delayTime}")public String sendMsg(@PathVariable String message, @PathVariable Integer delayTime){rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, messagePostProcessor ->{messagePostProcessor.getMessageProperties().setDelay(delayTime);return messagePostProcessor;});log.info("当前时间:{},发送一条延迟{}毫秒的信息给队列delay.queue:{}", new Date(), delayTime, message);return "发送成功";}
}

2.13.消费消息

@Slf4j
@Component
public class DeadLetterConsumer {public static final String DELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延时队列的消息:{}", new Date(), msg);}
}

发起请求:
        http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
        http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
在这里插入图片描述
        第二条消息被先消费掉了,符合预期 

2.14.总结

        延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好地利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列,来保证消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好要的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。 

四、发布确认 &  消息回退   

        如果rabbitmq服务器异常或者宕机,虽然队列、队列中消息可以持久化,但是消息可能还没有经交换机成功进入队列,这段消息传递的空窗期如何捕获到消息状态?

        发布确认是本次记录的一种方式(另外还有事务消息),发布确认是指rabbitmq服务在发布者和交换机这段起到状态反馈的作用,消息发送到交换机后,会反馈成功或失败状态给生产者

        消息回退是指rabbitmq服务在交换机到队列这段起到状态反馈的作用,在发布确认成功之后,交换机将消息路由到队列,如果消息路由失败则反馈给发布者失败的消息信息

作用 

        如图,核心点就是缓存,即消息的备份,通过发布确认和消息回退可以返还消息或者消息标识给生产者,生产者可以据此重新获取 & 发送消息

发布确认使用方式

        单消息发布确认:channel发布消息,然后channel执行waitForConfirms(),虽然简单但是低效,1000个简单消息处理耗时1s

        批量消息发布确认:channel发布一批消息(例如for循环发布),然后channel执行waitForConfirms(),1000个简单消息处理耗时0.15s

        异步发布确认: channel发布消息,然后channel执行 addReturnListener(ReturnCallback returnCallback),1000个简单消息处理耗时0.06s,不过需要你实现里面的两个回调接口并作为参数传入,一个是发布确认接口,另一个是不可达消息处理接口       

        springBoot下发布确认使用方式

        spring-boot-starter-amqp提供了配置信息:spring.rabbitmq.publisher-confirm-type,主要用于设置发布确认类型

        correlated:成功发布消息到交换机,可以异步触发回调

        simple:支持rabbittemplate使用waitForConfirms方法进行同步确认

        none:禁用发布模式

         好文推荐

RabbitMQ确认消息Confirm详解 - 简书 (jianshu.com)https://www.jianshu.com/p/2dd4acabfb90

(63条消息) RabbitMQ学习笔记 - mandatory参数_setmandatory_mytt_10566的博客-CSDN博客https://blog.csdn.net/mytt_10566/article/details/90741398

消息回退使用方式

        在仅开启生产者确认机制的情况下,交换机接收到消息后会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息被直接丢弃,生产者也感知不到消息被丢弃。这是一种不可靠的情况。解决方法就是设置mandatory参数,即生产者设置回退消息回调方法用来处理回退消息。

        springBoot的mandatory设置方式:spring.rabbitmq.publisher-returns=true

发布确认&回退消息代码演示

        演示架构 

                生产方演示发送消息到交换机失败、交换机路由失败等状态下回调函数的执行情况

        配置信息 

application.properties文件配置# 发布确认类型设置:correlated(关联性的,成功发布消息到交换机,可以触发回调)、none(禁用发布模式)、simple(支持rabbittemplate使用waitForConfirms方法进行同步确认)
spring.rabbitmq.publisher-confirm-type=correlated# 回退消息开启
spring.rabbitmq.publisher-returns=true
# mandatory代表强制发送: 即使交换机路由失败也会再次尝试,如果失败则回调ReturnCallback
spring.rabbitmq.template.mandatory=true
@Configuration
public class ConfirmConfig {//-----------------------------------------------发布确认配置----------------------------------------//交换机public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";//队列public static final String CONFIRM_QUEUE_NAME = "confirm_queue";//routing keypublic static final String CONFIRM_ROUTING_KEY = "confirm_routing_key";//声明交换机
//    创建发布确认交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}@Beanpublic Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(confirmQueue).to(exchange).with(CONFIRM_ROUTING_KEY);}}

         回调接口实现

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){//注入发布确认回调接口rabbitTemplate.setConfirmCallback(this);
//        注入失败回退回调接口rabbitTemplate.setReturnsCallback(this);}/*** 交换机确认回调方法* 1.发送消息 交换机收到消息 回调*  correlationData 保存回调消息的ID和相关信息*  交换机收到消息 ack = true*  cause null* 2.发送消息 交换机接受失败 回调*  correlationData 保存回调消息的ID和相关信息*  交换机收到消息 ack = false*  cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData!=null ? correlationData.getId() : "";if(ack){log.info("确认回调:id:{}", id);}else {log.info("交换机未接受到消息,id:{},原因:{}", id, cause);}}//将不可达的消息传递给生产者@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error("消息:{};\r\n退回消息交换机:{};\r\n退回原因:{};\r\nrouting key:{}",new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());}
}

        消息消费

@Component
@Slf4j
public class Consumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMessage(Message message){log.info("接受到的消息:{}",new String(message.getBody()));}
}

        消息发布 

@Slf4j
@RestController
@RequestMapping("/pub_confirm")
public class PubConfirmSendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public String sendMsg(@PathVariable("message") String message){log.info("当前时间:{}发送一条消息{}", new Date(), message);//1 关联实例用来设置消息的唯一标识,存储在消息的headers中CorrelationData correlationData = new CorrelationData();
//        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, "消息来自发布确认测试:"+message, correlationData);//2 模拟下异常情况,设置要给不存在的routing key,正常情况下会由回退消息回调方法处理rabbitTemplate.convertAndSend("ConfirmConfig.CONFIRM_EXCHANGE_NAME", "none_exist", "消息来自发布确认测试:"+message, correlationData);return "发送成功";}

 【注:】

        发布确认在失败阶段要注意下,此时correlationData没有设置returnedMessge,只存在消息标识id,但是仍可以通过消息入库 + id查询的方式重新发送消息

五、备份交换机

        有了发布确认和回退消息,我们有机会去处理不可达的消息,但是有时候我们并不知道该如何处理这些不可达的消息,最多进行日志打印,然后触发告警,最后补偿数据,如何更方便地解决这个问题?备份交换机就可以很好地简化这个流程

        备份交换机是指为普通交换机的alternate参数配置的一个交换机,当交换机无法路由消息时就转给备份交换机来处理

【注:】

        如果同时设置了备份交换机和回退设置,会先执行备份交换机处理,回退消息回调方法来最后兜底。

        比如confirm.exchange路由失败转给backup.exchange失败,则回退消息回调方法会被触发;再比如backup.exchange将消息路由到warning.queue失败,则回退消息回调方法仍然会被触发

        配置信息

@Configuration
public class BackupConfig {//--------------------------------------------备份交换机配置-------------------------------------------//交换机public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";//队列public static final String BACKUP_QUEUE_NAME = "backup_queue";public static final String WARNING_QUEUE_NAME = "warning_queue";//    关联备份交换机然后创建发布确认交换机
//    如果broker已经有confirmExchange要提前清除掉@Bean("confirmExchange")public DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).alternate(BackupConfig.BACKUP_EXCHANGE_NAME).build();}//声明交换机@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明队列@Bean("backupQueue")public Queue backupQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}@Beanpublic Binding queueBackupBindingExchange(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange exchange){return BindingBuilder.bind(backupQueue).to(exchange);}@Beanpublic Binding queueWarningBindingExchange(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange exchange){return BindingBuilder.bind(warningQueue).to(exchange);}
}

六、其它

1.幂等

         幂等就是对同一操作的重复执行,如删除操作和查询操作,天然兼容幂等,但是新增和修改必须解决幂等影响,比如查询某个用户的流水,不管怎么重复操作都不会影响流水数据,删除也一样,但是新增就麻烦了,比如付款,对当前订单的付款如果重复付款肯定是业务异常的,修改也是如此

         消费端的幂等保障一般是:1 唯一ID+指纹码机制,利用数据库去重;2 利用redis原子性实现。唯一码是一些规则或者时间戳加别的服务信息拼接成的唯一码,要么就是redis执行setnx指令,天然支持幂等

2.优先级队列

        在系统中有一个订单催付的业务,商城会在订单有效期内推送消息给客户作为一个下单提醒,很细节的功能,但是展开来看还是有点东西的,比如系统会区分大小客户,比如高价订单会被优先处理,这涉及到队列的优先级设置,简单系统可以利用定时任务轮询,但是发杂系统还是要用消息队列,rabbitmq就提供了一个优先级配置参数

        2.1.优先级队列控制台设置方式

        标记会有个pri,表示该队列有权限参数配置

       2.2.优先级队列代码设置方式

        配置信息   

@Configuration
public class PriorityConfig {@Beanpublic Queue queuePri(){
//      优先级以小为优先,设置范围0-255,不用设置太大消耗内存return QueueBuilder.durable("priority_queue").withArgument("x-max-priority", 10).build();}}

         消费端

@Slf4j
@Component
public class PriorityConsumer {@RabbitListener(queues = "priority_queue")public void receiveConfirmMessage(Message message){log.warn("排序处理!消息:{}", new String(message.getBody()));}}

         生产消息测试

@RestController
@RequestMapping("/priority")
public class PrioritySendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")private String sendMsg(@PathVariable("message") String message){MessageProperties messageProperties = new MessageProperties();Message msg = null;for(int i=1; i<11; i++){if(i==5){//优先级以小为优先messageProperties.setPriority(1);}else {messageProperties.setPriority(10);}msg = new Message((message+":"+i).getBytes(StandardCharsets.UTF_8), messageProperties);rabbitTemplate.convertAndSend("","priority_queue",msg.getBody());}return "发布成功!!!";}
}

3.惰性队列

        从3.6.0开始引入惰性队列,功能是将消息最大限度存储到磁盘,当消息被消费时才被加载进内存。但是默认情况,生产者发送消息到队列时会最大限度存储到内存,这是为了更高效处理消息

        优点:有磁盘兜底可以大量地存储消息、使消息更加可靠(比如防止宕机丢失)

        缺点:在被写入磁盘时依然会在内存备份,当mq释放内存时会将内存的消息换页到磁盘,这个操作比较消耗内存,也会阻塞队列导致接受不到消息,尤其消息量高时问题明显

        总结:常规消息量高效处理可以用默认队列(default mode);解决消息堆积可以用惰性队列(lazy mode),好文推荐https://blog.csdn.net/TheWindOfSon/article/details/130808424?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2~default~YuanLiJiHua~Position-2-130808424-blog-104037059.235%5Ev38%5Epc_relevant_anti_vip_base&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~YuanLiJiHua~Position-2-130808424-blog-104037059.235%5Ev38%5Epc_relevant_anti_vip_base&utm_relevant_index=5

        控制台配置信息​​​​​​ 

        

        代码配置信息

@Configuration
public class LazyModeConfig {@Beanpublic Queue queueLazyMode(){return QueueBuilder.durable("lazy_mode_queue").withArgument("x-queue-mode", "lazy").build();}
}

         消息消费

@Slf4j
@Component
public class LazyModeConsumer {@RabbitListener(queues = "lazy_mode_queue")public void receiveConfirmMessage(Message message){log.warn("懒惰队列!消息:{}", new String(message.getBody()));}}

        消息生产

@RestController
@RequestMapping("/lazyMode")
public class LazyModeController {@Autowiredprivate RabbitTemplate rabbitTemplate;// lazyMode队列的可执行测试@GetMapping("/sendMsg/{message}")private String sendMsg(@PathVariable("message") String message){//obj参数如果是非Message类型实例则默认执行消息持久化rabbitTemplate.convertAndSend("","lazy_mode_queue",message);return "发布成功!!!";}// lazyMode队列消息是否可持久化测试@GetMapping("/sendMsg2/{message}")private String sendMsg2(@PathVariable("message") String message){//obj参数设置Message类型实例,交付模式为非持久化。如果mq宕机恢复后消息依然存在则说明lazyMode队列确实是做了消息的持久化//测试流程:
//            1.关闭消费者LazyModeConsumer(测试结束后恢复开启)
//            2.执行接口
//            3.查看控制台,是否有待消费消息
//            4.如果有,stop mq服务
//            5.start mq服务,观察待消费消息是否依然存在MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);Message msg =  new Message((message).getBytes(StandardCharsets.UTF_8), messageProperties);rabbitTemplate.convertAndSend("","lazy_mode_queue",msg.getBody());return "发布成功!!!";}
}

        创建Message类型实例发送消息,通过模拟宕机测试出的结论:消息不会丢失,持久化得到了实际验证 

6.4.交付模式(delivery mode)

        delivery mode是指生产者发送消息的属性是否支持持久化,要么是none_persistent、要么是persistent

七、集群

        集群搭建(主从高可用集群,一主多从,主机负责消息接收和消费,主机宕机后选举从机补上,RabbitMQ—集群原理)

        镜像队列(主机宕机将无法消费队列,因此要在从机备份队列,以便宕机后选举出的新主机运行此备份队列,镜像队列)

        federationExchange(非集群之间如何消息同步)

        federationQueue(非集群之间消息同步)

        Shovel(非集群之间消息同步)

        待续:83-集群原理_哔哩哔哩_bilibili


 演示代码gitee:rabbitmq-demo: rabbitmq-demohttps://gitee.com/songyahuiX2/rabbitmq-demo.git

参考资料: rabbitmq_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1cb4y1o7zz?p=1&vd_source=8be621c052fd9f705308579363b67881

好文推荐:


RabbitMQ镜像队列_胡晗-的博客-CSDN博客icon-default.png?t=N6B9https://blog.csdn.net/phoenix9311/article/details/108998237?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522168982077116800215063044%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=168982077116800215063044&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-4-108998237-null-null.142^v90^chatsearch,239^v2^insert_chatgpt&utm_term=%E9%95%9C%E5%83%8F%E9%98%9F%E5%88%97%E5%AF%BC%E8%87%B4%E6%B6%88%E8%B4%B9%E9%87%8D%E5%A4%8D&spm=1018.2226.3001.4187
(63条消息) 【RabbitMQ】RabbitMQ如何做到保证消息100%不丢失?_rabbitmq确保消息不丢失_码农BookSea的博客-CSDN博客https://blog.csdn.net/bookssea/article/details/123119980

(63条消息) RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)_rabbitmq如何保证消息不丢_十八岁讨厌Java的博客-CSDN博客https://blog.csdn.net/w20001118/article/details/126595970?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~Rate-1-126595970-blog-108402423.235%5Ev38%5Epc_relevant_anti_vip_base&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~Rate-1-126595970-blog-108402423.235%5Ev38%5Epc_relevant_anti_vip_base&utm_relevant_index=2

(63条消息) Springboot 2.x ——RabbitTemplate为什么会默认消息持久化?_rabbittemplate 如何这是消息持久化_专注写bug的博客-CSDN博客https://blog.csdn.net/qq_38322527/article/details/122921878

RabbitMQ如何保证消息的可靠传输 - 知乎 (zhihu.com)https://zhuanlan.zhihu.com/p/156147569(63条消息) RabbitMQ(四):RabbitMQ高级特性_张凯锋zkf的博客-CSDN博客https://blog.csdn.net/m0_53142956/article/details/127792054
(63条消息) rabbitmq basicReject / basicNack / basicRecover区别_t0mCl0nes的博客-CSDN博客https://blog.csdn.net/fly_leopard/article/details/102821776

(63条消息) RabbitMQ学习笔记:addReturnListener监听回退消息_rabbitmq监听队列消息_OceanSky6的博客-CSDN博客https://blog.csdn.net/yaomingyang/article/details/102752906

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/24099.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Dapr,可能是传统应用转向微服务式应用最快的技术路线

一、开篇小记 过去的一段时间&#xff0c;一直在赶一些项目的进度&#xff0c;再加上前阵子的封控&#xff0c;一直没有时间静下心来好好整理和总结。从这周开始&#xff0c;总算有时间整理点东西了&#xff0c;就还是继续折腾了一些关于微服务的知识点。 由于我本人呢&#…

大模型 NLP 算法 大汇总

大模型 & NLP & 算法 大汇总 订阅本专栏【大模型 & NLP & 算法 知识大礼包】&#xff0c;即可获取博主多年积累的关于 【大模型 & NLP & 算法】 全部资料&#xff0c;只要59.9&#xff01;订阅成功后请主动联系博主索要资料&#xff5e; 目前大模型和…

关于midjourney、novelai的订阅购买

midjourney 最近人工智能非常火热&#xff0c;有chatgpt、midjourney及novelai等等&#xff0c;在不同领域都应用广泛&#xff0c;关于订阅购买&#xff0c;这边做个记录。 购买midjourney 注册discord账户&#xff0c;进入社区&#xff0c;直接访问公共服务器&#xff0c;进…

Unity接入大模型(小羊驼Vicuna,vLLM,ChatGPT等)

实现在Unity内部的大模型访问&#xff0c;我也是第一次接触Unity中通过大模型url访问。此博客面向新手&#xff0c;旨在给大家简单理解大模型POST和GET过程&#xff0c;还有实现简单的大模型访问。 参考博客&#xff1a;什么是chatGPT&#xff1f;Unity结合OpenAI官方api实现类…

ChatGPT 的工作原理:深入探究

本文首发于微信公众号&#xff1a;大迁世界, 我的微信&#xff1a;qq449245884&#xff0c;我会第一时间和你分享前端行业趋势&#xff0c;学习途径等等。 更多开源作品请看 GitHub https://github.com/qq449245884/xiaozhi &#xff0c;包含一线大厂面试完整考点、资料以及我的…

ChatGPT私人订制!只需把文档一键上传,免费可玩

衡宇 发自 凹非寺量子位 | 公众号 QbitAI 想不想简单轻松地拥有一个私人订制GPT&#xff1f; 不如试试askwise&#xff0c;只需要上传word、pdf等各种文档&#xff0c;就能生成个性化知识库&#xff0c;然后AI在你的知识库中上下求索&#xff0c;进行回答。 浅试了一下&#xf…

台大李宏毅报告:ChatGPT (可能)是怎麼煉成的 - GPT 社會化的過程

台大李宏毅报告&#xff1a;ChatGPT &#xff08;可能&#xff09;是怎麼煉成的 - GPT 社會化的過程 ChatGPT官方Blog&#xff1a;ChatGPT未公布论文——根据兄弟模型InstructGPT论文进行猜想&#xff1a;&#xff08;1&#xff09;Chat GPT的學習四階段1.學習文字接龍2.人類老…

如何使用chatgpt生成精美PPT提高工作效率

本教程收集于:AIGC从入门到精通教程 如何快速生成精美PPT提高工作效率 一、ChatGPT生成markdown源代码 二、Mindshow登录/注册 三、导入markd

达摩院榜首模型人脸检测MogFace CVPR论文深入解读

团队模型、论文、博文、直播合集&#xff0c;点击此处浏览 一、开源 1.&#xff09;论文链接&#xff1a;MogFace: Towards a Deeper Appreciation on Face Detection 2.&#xff09;模型&代码&#xff1a;https://modelscope.cn/models/damo/cv_resnet101_face-detecti…

GTC 2023 | 「皮衣刀客」黄仁勋畅谈 AI Top 5,科学计算、生成式 AI、Omniverse 榜上有名

内容一览&#xff1a;北京时间 3 月 21 日 23:00&#xff0c;英伟达创始人兼 CEO 黄仁勋在 GTC 2023 上发表主题演讲&#xff0c;介绍了生成式 AI、元宇宙、大语言模型、云计算等领域最新进展。 关键词&#xff1a;英伟达 黄仁勋 GTC 2023 「Don’t Miss This Defining Momen…

《WebRTC系列》实战 Web 端支持 h265 硬解

1、背景 Web 端实时预览 H.265 需求一直存在&#xff0c;但由于之前 Chrome 本身不支持 H.265 硬解&#xff0c;软解性能消耗大&#xff0c;仅能支持一路播放&#xff0c;该需求被搁置。 去年 9 月份&#xff0c;Chrome 发布 M106 版本&#xff0c;默认开启 H.265 硬解&#xf…

极客公园对话 Zilliz 星爵:大模型时代,需要新的「存储基建」

大模型在以「日更」进展的同时&#xff0c;不知不觉也带来一股焦虑情绪&#xff1a;估值 130 亿美元的 AI 写作工具 Grammarly 在 ChatGPT 发布后网站用户直线下降&#xff1b;AI 聊天机器人独角兽公司 Character.AI 的自建大模型在 ChatGPT 进步之下&#xff0c;被质疑能否形成…

云平台的ChatGLM部署

最近ChatGPT很火&#xff0c;国内清华也发布了ChatGLM&#xff0c;于是想在云平台上实现一下小型的ChatGLM。目前准备在趋动云这个平台上试试ChatGLM-6B-int8。 目前ChatGLM-6B-int8显存最少需要10G 可以参考GitHub - THUDM/ChatGLM-6B: ChatGLM-6B: An Open Bilingual Dialo…

高通Ziad Asghar:AI处理的重心从云端向边缘侧转移,智能手机是最佳平台 | MEET 2023...

萧箫 整理自 MEET 2023量子位 | 公众号 QbitAI 从Stable Diffusion到ChatGPT&#xff0c;这半年AI算法应用可谓突飞猛进。 但对于硬件领域而言&#xff0c;AI计算的下一个突破口或未来趋势究竟是什么&#xff1f; 尤其是AI应用最大的领域之一——移动端&#xff0c;大量AI算法在…

Stable Diffusion免费(三个月)通过阿里云轻松部署服务

温馨提示&#xff1a;划重点&#xff0c;活动入口在这里喔&#xff0c;不要迷路了。 其实我就在AIGC_有没有一种可能&#xff0c;其实你早就在AIGC了&#xff1f;阿里云邀请你&#xff0c;体验一把AIGC级的毕加索、达芬奇、梵高等大师作画的快感。阿里云将提供免费云产品资源&…

如何通过限制 IP 相关信息 | 控制用户访问站点频率

文章目录 通过 IP 限制反爬实验介绍知识点课程环境 IP 限制实战用 Nginx 限制特定 IP关于 allow 和 deny 的使用说明Nginx 限制 IP 访问频率Python Flask 模拟 IP 黑名单 实验总结 通过 IP 限制反爬 实验介绍 在常规的反爬手段中&#xff0c;IP 限制是应用广泛且比较有效的&a…

win11 报错 你的IT管理员已经限制对此应用一些区域的访问 解决方法

你的IT管理员已经限制对此应用一些区域的访问,你尝试访问的项目不可用。有关详细,请与你的IT支持人员联系。 1.按下wins&#xff0c;在框中输入cmd&#xff0c;右键管理员身份运行 2.在命令提示符中输入 reg add “HKEY_LOCAL_MACHINE\SOFTWARE\Policies\Microsoft\Windows Def…

wordpress开放注册和邮件问题解决

1开放注册 WordPress后台,设置-常规,勾选任何人都可以注册前面的复选框,新用户角色改为作者&#xff0c;保存即可开启。 2新用户注册收不到邮件问题解决 wordpress配置SMTP服务发送邮件(以qq邮箱为例) 第一步、配置邮箱&#xff08;这里介绍qq邮箱&#xff09; 我试过多个…

SLAM基础知识汇总【长期更新】

SLAM基础知识汇总 特征点相关 特征点由关键点和描述子构成&#xff1a; 关键点&#xff1a;特征点在图像里的位置描述子&#xff1a;通常是一个向量&#xff0c;描述了该关键点周围的信息&#xff0c;朝向大小等 [ORB-SLAM2] ORB-SLAM中的ORB特征&#xff08;提取&#xff…

国科大数字图像处理(复习与整理)

图像处理复习笔记&#xff1a; 1、证明一个系统是线性系统2、证明函数卷积的傅里叶变换等于函数傅氏变换后的乘积3、采样定理与混叠4、直方图均衡化第一节课知识点第二节课知识点第三节课知识点第四节课知识点第五节课知识点第六节课知识点第七节课知识点第八节课知识点第九节课…