【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

目录

分布式系统通信方式

MQ选型与应用场景

应用场景(优势)

RabbitMQ工作模型

RabbitMQ简介

RabbitMQ 工作模型(流程)​编辑

Docker安装配置RabbitMQ

RabbitMQ管理控制台

RabbitMQ 简单模式构建生产者

RabbitMQ 简单模式构建消费者

 RabbitMQ工作模式 - WorkQueues

RabbitMQ工作模式 - 发布订阅

RabbitMQ工作模式 - 路由模式

RabbitMQ工作模式 - 通配符模式

RabbitMQ集成SpringBoot(上) - 异步解耦发送短信

RabbitMQ集成SpringBoot(下) - 监听消费短信发送

消息的可靠性投递Confirm机制

 消息的可靠性投递Return机制

消费端可靠性ACK机制

RabbitMQ 消费者消息限流

RabbitMQ ttl特性控制短信队列超时

1. 对整个队列设置ttl:

2. 对单独一个消息进行ttl设置

RabbitMQ 死信队列的实现

代码实现

本章小结

作业


MQ:Message Queue(消息队列),是在消息传输的过程中,把消息保存到的一个容器。消息可以是我们产生的一些信息数据,一般都都是传字符串或者json字符串。消息队列主要用于一些分布式系统或者微服务系统之间的通信

分布式系统通信方式

  • 远程调用:同步
  • 借助中间件进行通信:异步,中间件有mq,zookeeper,redis

MQ选型与应用场景

  • RabbitMQ:erlang高并发能力不错,springboot推荐集成,社区活跃资料多。并发能力和性能都不错。
  • ActiveMQ:apache出品,目前使用量不多,不如以前,性能也不如其他。
  • RocketMQ:阿里出品,吞吐量很高,是Rabbit的10倍,十万左右。功能很全,扩展性也很好。金融类项目优先推荐使用。
  • Kafka:apache出品,侧重大数据领域用的很多,吞吐量10万级。只支持主要的一些MQ功能。
  • ZeroMQ,MetaMQ,RedisMQ
    简单来说,RabbitMQ的综合能力比较好。中小型公司用的也比较多,因为足够满足自身业务,所以后期一旦公司发展可以考虑转型使用RocketMQ,因为rocket是java开发的,可以自己去重构,而rabbit是erlang语言,极不容易修改。未来有机会可以再开Rocket来聊一聊

应用场景(优势)

  • 异步任务
    • 一些耗时很长的操作本身是同步执行的,可以借助mq通知消费者去处理,这个过程是异步的。从而减少了前端用户的等等的响应时间。
    • 比如云端操作重装系统,可能需要1分钟左右的时间,用户不可能在页面上等着吧。那么生产者可以向消费者发出一个重装的通知,让其进行重装操作。前端用户可以继续做别的操作,等重装完毕以后,再通知用户即可。因为这种操作不需要及时性,延后通知即可。这也是暂时性的不一致,MQ是最终一致性。
  • 提速
    • 本来前端用户都要等着处理完毕的结果响应,现在异步可以直接返回接口,减少了等待的时间,如此一来大大提高了前端用户的等待时间,用户体验更高了。也就是说当前的请求接口的吞吐量就大大提高了。
  • 接口解耦
    • MQ就相当于是工厂创建的微信群,把批发商拉进群,让他们进行监听,起到了接口之间解耦的作用。同时也是一种广播机制。在我们的系统中,如果一个接口里要调用多个远程服务,比如下单的同时需要调用库存、支付、物流、日志、积分等多个服务,那么如果是同步进行,一旦一个环节出了问题,那么整个接口就崩了,如此整个系统的容错性太低了; 如果使用mq解耦,那么哪怕要坏也是只坏一个环节,大大降低了发生问题的风险。
  • 削峰填谷
    • 这个是属于高并发的场景,举个例子,工厂有10万件衣服,需要快速清仓,现在所有的批发商的清货能力只能在5万件左右,而且也没有那么多钱进货。所以通过MQ这个中介,工厂把衣服都放入中介,批发商慢慢的把衣服卖出去以后再把后续的5万件衣服进货不就行了?这就是瞬时高并发所遇到的情况,就比如秒杀,服务器里redis啊数据库等处理能力不高,流量太大,那我们把请求放入到mq,如此一来,后续的数据慢慢处理就行了。这也就和餐厅吃饭在外面排队等位是一个道理。处理不来,就慢慢排队等着。
  • 使用MQ之前,高并发的时候,瞬时请求量很大,会达到5000以上,这个时候的当前时间就是高峰,服务器压力不行势必会打死。
  • 使用MQ之后,限制速度为1000个并发,哪怕5000甚至上万的请求进来,也会被限速,那么这就是削峰,消息的消费平均速度会维持在1000左右,哪怕高峰期过来,速度也是1000左右,那么这就是填谷
  • 举个例子,饭店吃饭高峰期,人流量很多,这个时候不可能直接进去吃饭把,餐桌就那么点,客户太多了,没办法,只能取号排队把,深有体会,排队的过程就是慢慢的消费,削峰填谷

RabbitMQ工作模型

RabbitMQ简介

RabbitMQ基于AMQP协议,Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息中间件设计的。基于这个协议的客户端和消息中间件可以传递消息,并且不会因为客户端或中间件产品的不同所首先,也不会受到不同开发语言的限制。比如用java、php、.net都可以使用消息队列,并且只要支持AMQP这个协议,不论什么客户端,都可以相互传输消息。而且甚至与你只要遵循amqp协议,自己也能开发一套消息队列产品也没问题。

RabbitMQ就是基于amqp所开发的一套产品,只是他是基于erlang语言开发的。它是一种应用于应用之间的通信方法,MQ在分布式系统中应用非常广泛。

官网地址:RabbitMQ: One broker to queue them all | RabbitMQ

RabbitMQ 工作模型(流程)

  • RabbitMQ:消息队列服务中间件
  • 生产者produce:创建消息,发送到mq
  • 消费者Consumer:监听消息,处理业务通知
  • Exchange:MQ的交换机,他会按照一定的规则来转发到某个队列,类似controller路由机制。比如 /passport/sms。也就是分发网络请求的功能,和我们现实生活中的交换机是一个意思。
  • Queue:队列,存储消息。相当于controller。被消费者监听,一旦有消息,会被消费者接收到。
  • Binding-Routes:交换机和队列的绑定关系,通过路由结合在一起,消息如何通过交换机发送到队列里,是通过路由机制,类似于@RequestMapping,路由规则一旦匹配,那么就可以存储对应的消息。
  • Channel:生产者和消费者与MQ建立的通道,相当于httpSession会话,建立请求就会有channel。可以理解为一个桥梁作用,消息经过桥梁到达mq的queue。Channel的目的是为了管理每次请求到RabbitMQ-server的连接connection,如此才能更好的节约资源的开支,提高通信效率。

Docker安装配置RabbitMQ

docker pull rabbitmq:management

运行mq

docker run --name rabbitmq \
-p 5671:5671 \
-p 5672:5672 \
-p 4369:4369 \
-p 15671:15671 \
-p 15672:15672 \
-p 25672:25672 \
--restart always \
-d rabbitmq:management

如果忘记加上自动重启,可以运行如下脚本:

docker update rabbitmq --restart=always

5671: AMQP端口
5672: AMQP端口
15672: 管理平台
4369,25672: erlang发现与集群端口
616131,61614: stomp协议端口
1883, 8883: MQTT协议端口
……
更多参照:Networking and RabbitMQ | RabbitMQ

运行成功

查看具体版本:

docker image inspect rabbitmq:management|grep -i version

访问管理界面:

​​​​​​http://192.168.1.112:15672/

默认用户名密码为:guest guest
进入后即可使用:

​​​​​​http://192.168.1.112:15672/

RabbitMQ管理控制台

RabbitMQ的管理控制台界面相当友好,可视化程度很不错。

概览信息:

  • connections:客户端的连接
  • channels:没有connections就没有channels
  • exchanges:交换机,内部有默认的定义好的名字
  • queues:队列,可以通过交换机点击绑定的进入
  • admin:管理设置,可以创建账号以及分区
  • virtul host 可以理解为分区,不同项目使用

RabbitMQ 简单模式构建生产者

本节课开始我们将会使用java代码来构建rabbitmq的各种通信,首先我们来学习的是构建消息的生产者。

RabbitMQ Tutorials | RabbitMQ

RabbitMQ tutorial - "Hello World!" | RabbitMQ

简单模式:通过生产者发消息给队列,消费者监听收到消息

为了更加清晰的看到生产者与消费者,我们可以创建两个子模块工程:

分别添加依赖坐标:

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>

创建生产者:​​​​​​​

/*** 构建生产者,发送消息*/
public class FooProducer {public static void main(String[] args) throws Exception {// 1. 创建连接工厂以及参数配置ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.1.122");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("imooc");factory.setPassword("imooc");// 2. 创建连接ConnectionConnection connection = factory.newConnection();// 3. 创建管道ChannelChannel channel = connection.createChannel();// 4. 创建队列Queue(简单模式不需要交换机Exchange)/*** queue: 队列名* durable: 是否持久化,true:重启后,队列依然存在,否则不存在* exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false* autoDelete: 是否自动删除,true:当没有消费者的时候,自动删除队列* arguments: map类型其他参数*/channel.queueDeclare("hello", true, false, false, null);// 5. 向队列发送消息/*** exchange: 交换机名称,简单模式没有,直接设置为 ""* routingKey: 路由key,映射路径,如果交换机是默认"",则路由key和队列名一致* props: 配置信息* body: 消息数据*/String msg = "Hello ~";channel.basicPublish("", "hello", null, msg.getBytes());// 7. 释放channel.close();connection.close();}
}

  运行后查看控制台:

RabbitMQ 简单模式构建消费者

/*** 构建消费者,监听消息*/
public class FooConsumer {public static void main(String[] args) throws Exception {// 1. 创建连接工厂以及参数配置ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.1.122");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("imooc");factory.setPassword("imooc");// 2. 创建连接ConnectionConnection connection = factory.newConnection();// 3. 创建管道ChannelChannel channel = connection.createChannel();// 4. 创建队列Queue(简单模式不需要交换机Exchange)/*** queue: 队列名* durable: 是否持久化,true:重启后,队列依然存在,否则不存在* exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false* autoDelete: 是否自动删除,true:当没有消费者的时候,自动删除队列* arguments: map类型其他参数*/channel.queueDeclare("hello", true, false, false, null);// 5. 监听并消费消息/*** queue: 监听的队列名* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知* callback: 回调函数,处理监听的消息*/Consumer consumer = new DefaultConsumer(channel) {/*** 重写消息配送方法* @param consumerTag: 消息标签(标识)* @param envelope: 信封(一些信息,比如交换机路由等信息)* @param properties: 配置信息,和生产者的一致* @param body: 消息数据* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {System.out.println("consumerTag = " + consumerTag);System.out.println("envelope = " + envelope.toString());System.out.println("properties = " + properties.toString());System.out.println("body = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);}};channel.basicConsume("hello", true, consumer);// 不需要关闭连接,则持续监听}
}

监听结果:

 RabbitMQ工作模式 - WorkQueues

   

工作队列没有交换机,生产者发送消息给队列,队列有两个或者多个消费者监听进行消费,但是所有的消息是会被消费者以竞争的关系进行消费,所以队列里的消息也称之为工作的任务,任务由一个人完成了就不需要被第二个人完成。所以这个队列里的所有消息只会被某一个消费者进行消费读取。

  • 使用场景:如果任务量很大很多,而一个消费者处理不过来,则此时可以使用工作队列,比如短信发送在一个系统里会有很多场景进行发送给用户,所以量很大的时候,也可以分配给多个消费者去进行消费发短信即可。这就像上班工作量很大,就需要招人共同完成一些任务是一个道理。

构建生产者:
代码可以从简单模式复制过来进行修改

......
channel.queueDeclare("work_queue", true, false, false, null);Integer tasks[] = {};
for (int i = 0 ; i < 10 ; i ++) {String msg = "开始上班,任务[" + i + "]";channel.basicPublish("", "work_queue", null, msg.getBytes());
}
......        

构建两个消费者:
只需要复制简单模式再修改队列名即可

...
channel.queueDeclare("work_queue", true, false, false, null);
...
channel.basicConsume("work_queue", true, consumer);
...

从运行结果可以开得出来,这也是类似于负载均衡的轮询效果。

RabbitMQ工作模式 - 发布订阅

之前的工作队列是一个消息只能被一个消费者处理消费,发布订阅则不是,只要监听队列,所有的消费者都能消费同一个消息,这类似与公众号的订阅,比如我和你都订阅了慕课网的公众号,慕课网只要发布一个新的文章,那么我们都可以同时收到,这就是发布订阅模式。
需要注意,这里使用了交换机,因为不同的用户组可以订阅不同的队列,所以通过交换机来绑定管理并且把消息路由到不同的队列即可。
交换机的类型:

  • Fanout:广播模式,把消息发送给所有绑定的队列
  • Direct:定向投递,把消息发送给指定的routing key的队列
  • Topic:通配符模式,把消息交给符合routing pattern的队列
    需要注意,交换机只负责转发消息,不会存储消息,存储消息的职责是队列的,不要搞混噢。

生产者代码:

......
// 创建交换机
/*** exchange: 交换机名称* type: 交换机类型*      FANOUT("fanout"): 广播模式,把消息发送给所有绑定的队列*      DIRECT("direct"): 定向投递,把消息发送给指定的`routing key`的队列*      TOPIC("topic"): 通配符模式,把消息交给符合`routing pattern`的队列*      HEADERS("headers"): 使用率不多,参数匹配* durable: 是否持久化* autoDelete: 是否自动删除* internal: 内部意思,true:表示当前Exchange是RabbitMQ内部使用,用户创建的队列不会消费该类型交换机下的消息,所以我们自己使用设置为false即可* arguments: 参数*/
String exchange = "fanout_exchange";
channel.exchangeDeclare(exchange,BuiltinExchangeType.FANOUT, true, false, false, null);// 创建两个队列
String fanout_queue_a = "fanout_queue_a";
String fanout_queue_b = "fanout_queue_b";
channel.queueDeclare("fanout_queue_a", true, false, false, null);
channel.queueDeclare("fanout_queue_b", true, false, false, null);// 绑定交换机和队列
/*** queue* exchange* routingKey: 路由key,绑定规则,这里暂不使用(fanout本身广播给所有订阅者,所以没有路由规则,使用空字符串即可)*/
channel.queueBind(fanout_queue_a, exchange, "");
channel.queueBind(fanout_queue_b, exchange, "");for (int i = 0 ; i < 10 ; i ++) {String msg = "开始上班,任务[" + i + "]";channel.basicPublish(exchange, "", null, msg.getBytes());
}channel.close();
connection.close();

运行查看:

消费者:
复制工作模式的消费者代码,只需要修改定义的队列名即可

......
String fanout_queue_a = "fanout_queue_a";
channel.queueDeclare(fanout_queue_a, true, false, false, null);
......
channel.basicConsume(fanout_queue_a, true, consumer);
......

运行结果:

RabbitMQ工作模式 - 路由模式

路由模式routing可以针对不同的类别进行路由,比如图中,可以控制不同的日志级别进行路由,这就相当于控制器的@RequestMapping,请求的url地址根据不同的映射路径进行controller接口的调用,原理一致。

生产者代码:
可以直接拷贝发布订阅模式进行修改

...
String exchange = "routing_exchange";
channel.exchangeDeclare(exchange,BuiltinExchangeType.DIRECT, true, false, false, null);// 创建两个队列
String routing_queue_order = "routing_queue_order";
String routing_queue_pay = "routing_queue_pay";
channel.queueDeclare(routing_queue_order, true, false, false, null);
channel.queueDeclare(routing_queue_pay, true, false, false, null);
...
// 订单的创建/更新/删除都走订单队列进行消费;订单的支付走独立的队列进行消费
channel.queueBind(routing_queue_order, exchange, "order_create");
channel.queueBind(routing_queue_order, exchange, "order_delete");
channel.queueBind(routing_queue_order, exchange, "order_update");
channel.queueBind(routing_queue_pay, exchange, "order_pay");  // 根据不同的路由key进行消息的发送
String msg1 = "创建订单A";
String msg2 = "创建订单B";
String msg3 = "删除订单C";
String msg4 = "修改订单D";
String msg5 = "支付订单E";
String msg6 = "支付订单F";
channel.basicPublish(exchange, "order_create", null, msg1.getBytes());
channel.basicPublish(exchange, "order_create", null, msg2.getBytes());
channel.basicPublish(exchange, "order_delete", null, msg3.getBytes());
channel.basicPublish(exchange, "order_update", null, msg4.getBytes());
channel.basicPublish(exchange, "order_pay", null, msg5.getBytes());
channel.basicPublish(exchange, "order_pay", null, msg6.getBytes());
...

运行生产者,查看:

消费者代码:
拷贝发布定义模式的代码,修改队列名称即可

​​​​​​​

...
String routing_queue_order = "routing_queue_order";
channel.queueDeclare(routing_queue_order, true, false, false, null);
...
channel.basicConsume(routing_queue_order, true, consumer);
...

  运行结果:

RabbitMQ工作模式 - 通配符模式

topics通配符模式是功能最强大也是用的最多的一个mq工作模式。
通配符模式其实是路由模式的进阶版,这个时候的routing key可以写成通配符,只要符合规则即可进行消息的路由。

通配符可以有两种形式:

  • *:匹配一个字段
    • 比如:order.*.* 可以匹配 order.do.delete,order.create.finish
  • #:匹配0个或者多个字段
    • 比如 #.order.# 可以匹配 order.do.delete,check.order.if.finish

生产者代码:

...
String exchange = "topics_exchange";
channel.exchangeDeclare(exchange,BuiltinExchangeType.TOPIC, true, false, false, null);// 创建两个队列
String topics_queue_order = "topics_queue_order";
String topics_queue_pay = "topics_queue_pay";
channel.queueDeclare(topics_queue_order, true, false, false, null);
channel.queueDeclare(topics_queue_pay, true, false, false, null);
...
channel.queueBind(topics_queue_order, exchange, "order.*");
channel.queueBind(topics_queue_pay, exchange, "#.pay.#");// 根据不同的路由key进行消息的发送
String msg1 = "创建订单A";
String msg2 = "创建订单B";
String msg3 = "删除订单C";
String msg4 = "修改订单D";
String msg5 = "支付订单E";
String msg6 = "支付订单F";
channel.basicPublish(exchange, "order.create", null, msg1.getBytes());
channel.basicPublish(exchange, "order.create", null, msg2.getBytes());
channel.basicPublish(exchange, "order.delete", null, msg3.getBytes());
channel.basicPublish(exchange, "order.update", null, msg4.getBytes());
channel.basicPublish(exchange, "order.pay", null, msg5.getBytes());
channel.basicPublish(exchange, "imooc.order.pay.display", null, "慕课网订单支付".getBytes());
channel.basicPublish(exchange, "supermarket.pay", null, "超市支付".getBytes());
...

运行后结果

消费者代码:
只需要修改队列名即可

...
String topics_queue_pay = "topics_queue_pay";
channel.queueDeclare(topics_queue_pay, true, false, false, null);
...
channel.basicConsume(topics_queue_pay, true, consumer);
...

RabbitMQ集成SpringBoot(上) - 异步解耦发送短信


引入mq依赖:

<!-- SpringBoot 整合RabbitMQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在api子工程中创建短信发送的短信配置类:

/*** RabbitMQ 的配置类*/
@Configuration
public class RabbitMQSMSConfig {// 定义交换机的名字public static final String SMS_EXCHANGE = "sms_exchange";// 定义队列的名字public static final String SMS_QUEUE = "sms_queue";// 创建交换机@Bean(SMS_EXCHANGE)public Exchange exchange() {return ExchangeBuilder.topicExchange(SMS_EXCHANGE).durable(true).build();}// 创建队列@Bean(SMS_QUEUE)public Queue queue() {
//        return new Queue(SMS_QUEUE);return QueueBuilder.durable(SMS_QUEUE).build();}// 定义队列绑定到交换机的关系@Beanpublic Binding smsBinding(@Qualifier(SMS_EXCHANGE) Exchange exchange,@Qualifier(SMS_QUEUE) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("imooc.sms.#").noargs(); // 执行绑定}
}

异步解耦短信发送:

spring:rabbitmq:host: 192.168.1.122port: 5672username: rabbitmqpassword: rabbitmqvirtual-host: imooc-space
@Autowired
private RabbitTemplate rabbitTemplate;...
// 把短信内容和手机号构建为一个bean并且转换为json作为消息发送给mq
rabbitTemplate.convertAndSend(RabbitMQSMSConfig.SMS_EXCHANGE,"imooc.sms.login.send",bodyJson);
...

RabbitMQ集成SpringBoot(下) - 监听消费短信发送

创建一个资源服务service-resource-4001,目前用于消费者监听

@Component
@Slf4j
public class RabbitMQSMSConsumer {/*** 监听队列,并且处理消息*/@RabbitListener(queues = {RabbitMQSMSConfig.SMS_QUEUE})public void watchQueue(String payload, Message message) {log.info("payload = " + payload);String routingKey = message.getMessageProperties().getReceivedRoutingKey();log.info("routingKey = " + routingKey);String msg = payload;SMSContentQO smsContent = GsonUtils.stringToBean(msg, SMSContentQO.class);// 发送短信 TODO}
}

最终运行测试即可,实现短信发送的异步解耦。

消息的可靠性投递Confirm机制

那么现在我们在进行短信发送的时候,直接发送就可以了。假设,现在我们要确保消息不被丢失,是可靠的发送到mq服务,并且也能够确保被消费了。这个时候应该怎么处理呢?这个其实就是消息的可靠性。
因为生产者发消息到交换机再到队列,这之间是有链路的,每个链路的过程都有可能出错,导致消息没有正常发送。

RabbitMQ有两种方式来控制消息可靠性。

  • confirm 确认模式
    • 消息一旦从生产者发给MQ server,也就是被交换机接收到,这个时候会有一个回调 confirmCallback,代表mq服务接收到了。但是这个消息不管是否成功,这个回调函数一定会执行
  • return 回退模式
    • 交换机把消息路由到队列的时候如果出现问题,则触发回调 returnCallback,只有失败才会执行这个函数。

上代码:

配置confirm类型:

  • NONE: 禁用发布确认模式,是默认的
  • CORRELATED: 发布消息成功到交换器后会触发回调方法
  • SIMPLE: 使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法

定义confirmCallback回调函数:

运行测试,观察结果。

 消息的可靠性投递Return机制

配置return开启:

定义returnCallback回调函数:

(路由key写错,则不会路由到队列,用于测试)
运行后观察结果:

  • message: 消息数据
  • replyCode: 错误code
  • replyText: 错误信息
  • exchange: 交换机
  • routingKey: 发送的路由地址

这就相当于发邮件的时候,如果你发的地址不存在有问题,对方接收不到,则你会收到一个退回邮件,机制是差不多的。

如此一来,我们就可以通过confirm和return两种机制来记录这个过程的日志,如果错误日志出现很频繁说明咱们的mq可能就不太稳定或者有什么其他因素需要排查了。

提问:RabbitMQ有没有事务机制?
也有,但是性能很差,所以平时不用。

消费端可靠性ACK机制

前面两节所用的confirm和return都是生产端机制,此外还有ack模式,是基于消费端的可靠性。

什么是ack呢?
ack的意思是Acknowledge,确认的意思,表示消费者收到消息后的确认,可以理解为这是一个信息的回执,以前我们寄信的时候,收件人收到信后会有一个回执,也就是签收单,这个交给邮局的,邮局就是broker,也是一种签收确认的机制吧。mq同理。

ack机制有三种确认方式:

  • 自动确认: Acknowledge=none(默认)
    • 消费者收到消息,则会自动给broker一个回执,表示消息收到了,但是消费者里的监听方法内部是我们自己的业务,业务是否成功,他不管的,如果业务出现问题出现异常,那么也就相当于这条消息是失败的。
    • 这就相当于我寄了一个快递,对方收到后快递公司就自动确认了快递的签收确认,这是很常见的手段吧。一旦快递内部是否破损我们就不知道了,对吧。
  • 手动确认:Acknowledge=manual
    • 消费端收到消息后,不会通知broker回执,需要等待我们自己的业务处理完毕OK了没问题,然后手动写一条代码去确认,当然如果出现错误异常,我们可以有兜底的处理方法,比如记录日志或者重新发新的消息都行。
  • 根据异常类型确定:Acknowledge=auto
    • 消费端处理业务出现不同的异常,根据异常的类型来做出不同的响应处理,这种方式比较麻烦,需要提前预判很多异常类型。这里了解一下有这个类型即可。

代码中设置手动签收

创建新的方法,加入参数channel

运行后发送消息查看结果:

发现有一条消息是未被确认的,因为我们没有手动确认,如果是默认的确认方式,这个时候是不会有unacked的。

添加除零错误

手动确认和消息重发

@RabbitListener(queues = {RabbitMQSMSConfig.SMS_QUEUE})
public void watchQueue(Message message, Channel channel) throws Exception {try {String routingKey = message.getMessageProperties().getReceivedRoutingKey();log.info("routingKey = " + routingKey);int a = 100 / 0;String msg = new String(message.getBody());log.info("msg = " + msg);/*** deliveryTag: 消息投递标签* multiple: 批量确认所有消费者获得的消息*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);} catch (Exception e) {e.printStackTrace();/*** requeue: true:重新回到队列, false:丢弃消息*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);// 不支持requeue// channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}
}

RabbitMQ 消费者消息限流

我们在一开始介绍MQ的时候,就提到了削峰填谷,本质上就是限流,所以我们需要对限流做一个落地的实现。那么现在提出一个需求,假设用户发短信一下子太多了,那么消费者在进行消费处理业务的时候,也是需要进行限流的。

限流主要在消费者这一块,基于消费者做代码实现。并且基于手动ack的开启。

prefetch的设置,其实也是类似于负载均衡,为了更有效的利用服务器资源。也可以提高消费者的吞吐量。

生产者加一个for循环模拟多条消息的发送。

消费者手动ack之前打一个断点,

运行后观察:

此时我们第一个批次拉取了2条消息,但是没有确认,剩余8条还存在于队列中。
断点继续:

每次都是未确认2个,消息会以此从mq中拉取。

把prefetch注释掉再测试:

断点再一次进来,可以看到,消费者是一次性拉取了10条消息,并没有做到限流。

所以,通过prefetch可以达到消费端控制消费速度的效果。

RabbitMQ ttl特性控制短信队列超时

假设现在短信发送量很多,消息太多太多了,可能处理不过来,那么假设短信验证码本身就是5分钟内失效的,但是5分钟过后还没有发出去,mq消息还是在队列中没有被消费者消费,那么这条消息其实是作废的,没有用的。而且本身用户已经等了那么久了都没收到,所以我们能不能索性设定一个时间为60s,60秒还没有消费消息,那么就不消费了呗,让用户再次发送一个请求完事。

此时我们可以使用ttl这个特性。

  • TTL:time to live 存活时间,和redis的ttl是一个道理
  • 如果消息到到ttl的时候,还没有被消费,则自动清除
  • RabbitMQ可以对消息或者整个队列设置ttl

代码实现,生产端配置超时ttl时间(单位:毫秒)

1. 对整个队列设置ttl:

参数可以从控制台复制:

把消费者的监听去除,不要进行消费:

运行并且观察:
报错,因为现在队列是没有ttl的,我们需要删除后重新创建。

(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'sms_queue' in vhost '/': received the value '10000' of type 'signedint' but current is none,

再次运行测试:
发现这个队列额外多了一个ttl的标签

此外当前有10条消息等待被消费

但是10秒过后,没有消费超时了,则自动丢弃

2. 对单独一个消息进行ttl设置

运行后观察:
目前有1条消息

10秒过后,消息过期了抛弃,如下:

注意:如果同时设置了两种消息过期方式和的话,时间短的会优先触发。

RabbitMQ 死信队列的实现

前面讲了ttl,那么在这里就必须来提一嘴死信队列

  • 死信队列:DLX,dead letter exchange。
  • 当一个消息了以后,就会被发送到死信交换机DLX

前面我们使用了ttl超时机制,如果我们队列绑定了DLX死信交换机,那么超时后,消息不会被抛弃,而是会进入到死信交换机,死信交换机绑定了其他队列(称之为死信队列),那么这个时候我们就可以处理那些被抛弃的消息了。

此外除了ttl超时进入死信队列,还有两种情况也会进入到死信队列:

  • 队列消息长度已经到达限制,我一个队列设置最多1000条消息的量,后续进入的消息就抛弃了,就像我只能是10碗饭,你再盛饭给我吃就吃不下了,只能扔了。
  • 消费者手动驳回消息,nack或者reject,并且requeue为false,就像我吃饭的时候,一不小心里面进了一只苍蝇,我不会把这碗饭重新放入饭盆里吧,只能扔了。

代码实现

配置死信交换机和队列:(步骤和之前创建队列交换机以及绑定关系是一致的)

/*** RabbitMQ 的配置类 死信队列*/
@Configuration
public class RabbitMQSMSConfig_Dead {// 定义交换机的名称public static final String SMS_EXCHANGE_DEAD = "sms_exchange_dead";// 定义队列的名称public static final String SMS_QUEUE_DEAD = "sms_queue_dead";// 统一定义路由keypublic static final String ROUTING_KEY_SMS_DEAD = "dead.sms.display";// 创建交换机@Bean(SMS_EXCHANGE_DEAD)public Exchange exchange() {return ExchangeBuilder.topicExchange(SMS_EXCHANGE_DEAD).durable(true).build();}// 创建队列@Bean(SMS_QUEUE_DEAD)public Queue queue() {
//        return new Queue(SMS_QUEUE);return QueueBuilder.durable(SMS_QUEUE_DEAD).build();}// 创建绑定关系@Beanpublic Binding smsDeadBinding(@Qualifier(SMS_EXCHANGE_DEAD) Exchange exchange,@Qualifier(SMS_QUEUE_DEAD) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dead.sms.*").noargs();}}

为消费者绑定死信队列,配置死信队列的参数:
需要设置死信交换机和死信路由key,队列长度也可以限制:

死信队列监听的消费者:

运行结果:
超过长度的已经丢弃并且进入死信队里

10秒种后剩余6条消息没有被消费则进入死信队列

ttl+死信队列可以实现延迟队列,但是后期我们会使用延迟插件安装来实现延迟队列,更加方便。

手动nack以及reject课后去尝试一下。

本章小结

我们为什么会选择使用mq呢,因为在一个大型项目里,涉及发短信的地方太多了,如果每个地方都这么用异步任务来一下,会显得有些乱,而且每个项目里都有,使用mq以后,可以在一个单独的项目中,专门负责用于进行短信发送的消费,这样就更加解耦了,更加体现单一职责了。如此一来,mq可以更好的管理消费者,或者说他有更细的细粒度。甚至说消费者服务可以有很多,我们还能创建一个子嵌套聚合工程进行消费者管理。

异步任务 放入到api里,两点不好,一个是大量业务在api共用工程,不太好,第二个,本质上项目没有做到解耦,打包还是在一起的,虽然业务异步,但是代码并未解耦。而mq可以用一个专有的短信监听服务,专门异步发送所有短信场景,这才是比较 ok的。

作业

手动ack重回队列发生死循环的时候,如何设定参数进行手动上报预警。

  • 需要使用到Nacos分布式配置,redis计数,短信或邮箱发送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/491196.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RTMP推流平台EasyDSS在无人机推流直播安防监控中的创新应用

无人机与低空经济的关系密切&#xff0c;并且正在快速发展。2024年中国低空经济行业市场规模达到5800亿元&#xff0c;其中低空制造产业占整个低空经济产业的88%。预计未来五年复合增速将达到16.03%。 随着科技的飞速发展&#xff0c;公共安防关乎每一个市民的生命财产安全。在…

PCIE概述

PCIE概述 文章目录 PCIE概述前言一、应用场景二、PCIE理论2.1 硬件2.2 拓扑结构&#xff1a;处理器和设备之间的关系2.3 速率2.4 层次接口2.5 四种请求类型2.5.1 bar空间2.5.2 memory2.5.3 IO2.5.4 configuration2.5.5 message 前言 参考链接&#xff1a; pcie总线知识点解析 …

SpringBoot SPI

参考 https://blog.csdn.net/Peelarmy/article/details/106872570 https://javaguide.cn/java/basis/spi.html#%E4%BD%95%E8%B0%93-spi SPI SPI(service provider interface)是JDK提供的服务发现机制。以JDBC为例&#xff0c;JDK提供JDBC接口&#xff0c;在包java.sql.*。MY…

超详细!Jmeter性能测试

前言 性能测试是一个全栈工程师/架构师必会的技能之一&#xff0c;只有学会性能测试&#xff0c;才能根据得到的测试报告进行分析&#xff0c;找到系统性能的瓶颈所在&#xff0c;而这也是优化架构设计中重要的依据。 测试流程&#xff1a; 需求分析→环境搭建→测试计划→脚…

快速本地化部署 OnlyOffice服务 ( Linux+Docker)

文章目录 一、OnlyOffice介绍&#x1f4d6;二、集成OnlyOffice&#x1f9e9;2.1 环境准备&#x1f5a5;️2.2 搜索镜像2.3 拉取镜像2.4 查看镜像2.5 创建容器2.6 进入容器配置2.7 重启服务2.8 添加字体字号2.9 测试OnlyOffice服务 三、在线预览office文档四、Cpolar内网穿透 一…

加密--03--MD5-- JAVA运用(hutool工具包)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 hutool1.简介2.pom.xml3.Hutool-crypto概述4.MD5 加密 案例11.hutool依赖2.用户表3.加密方法4.业务代码 hutool https://www.hutool.cn/docs/#/ 1.简介 2.pom.xml …

CSS 实现带tooltip的slider

现代 CSS 强大的令人难以置信 这次我们来用 CSS 实现一个全功能的滑动输入器&#xff0c;也就是各大组件库都有的slider&#xff0c;效果如下 还可以改变一下样式&#xff0c;像这样 特别是在拖动时&#xff0c;tooltip还能跟随拖动的方向和速度呈现不同的倾斜角度&#xff0c…

关于SAP Router连接不稳定的改良

这个也是网上看来的&#xff0c;之前在用的时候也在想是不是建立一个长连接&#xff0c;就不至于断线。今天正好看到。 关于SAP Router连接不稳定的改良 我们在使用SAPRouter时经常会碰到断线&#xff0c;其发生原因有很多&#xff0c;如&#xff1a;网络不稳定、操作间隔时间…

docker 搭建自动唤醒UpSnap工具

1、拉取阿里UpSnap镜像 docker pull crpi-k5k93ldwfc7o75ip.cn-hangzhou.personal.cr.aliyuncs.com/upsnap/upsnap:4 2、创建docker-compose.yml文件&#xff0c;进行配置&#xff1a; version: "3" services:upsnap:container_name: upsnapimage: crpi-k5k93ldwf…

Python课设-谁为影狂-豆瓣数据【数据获取与预处理课设】

&#x1f3c6; 作者简介&#xff1a;席万里 ⚡ 个人网站&#xff1a;https://dahua.bloggo.chat/ ✍️ 一名后端开发小趴菜&#xff0c;同时略懂Vue与React前端技术&#xff0c;也了解一点微信小程序开发。 &#x1f37b; 对计算机充满兴趣&#xff0c;愿意并且希望学习更多的技…

STM32 进阶SPI案例1:软件模拟SPI读写FLASH

需求描述 基于寄存器操作&#xff0c;使用软件模拟SPI协议&#xff0c;完成读写FLASH。 硬件电路设计 寄存器代码书写 main.c #include "usart1.h" #include "string.h" #include <stdio.h> #include "m24c02.h" #include "soft_…

Qt WORD/PDF(一)使用 QtPdfium库实现 PDF 预览

文章目录 一、简介二、下载 QtPdfium三、加载 QtPdfium 动态库四、Demo 使用 关于QT Widget 其它文章请点击这里: QT Widget 姊妹篇: Qt WORD/PDF&#xff08;一&#xff09;使用 QtPdfium库实现 PDF 操作 Qt WORD/PDF&#xff08;二&#xff09;使用 QtPdfium库实现…

【SpringCloud】OpenFeign配置时间Decode

文章目录 1.自定义反序列化器2.配置类与自定义 ObjectMapper客户端 需求&#xff1a;OpenFeign配置自定义decode&#xff0c;解析http请求返回的时间字符串 1.自定义反序列化器 Date 自定义反序列化器 import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.j…

java程序设计2(二)

自动装箱和自动拆箱&#xff1a;基本数据类型和包装类型之间可以直接相互转换的包装类通常可以区分有效数据和无效数据&#xff0c;例如&#xff1a;0和nullString类 获取字符串的方式 【企业面试】 String str1 "hello"; 这种获取字符串的方式&#xff0c;在串池…

百度地图JavaScript API核心功能指引

百度地图JavaScript API是一套由JavaScript语言编写的应用程序接口&#xff0c;它能够帮助您在网站中构建功能丰富、交互性强的地图应用&#xff0c;包含了构建地图基本功能的各种接口&#xff0c;提供了诸如本地搜索、路线规划等数据服务。百度地图JavaScript API支持HTTP和HT…

让 Win10 上网本 Debug 模式 QUDPSocket 信号槽 收发不丢包的方法总结

在前两篇文章里&#xff0c;我们探讨了不少UDP丢包的解决方案。经过几年的摸索测试&#xff0c;其实方法非常简单, 无需修改代码。 1. Windows 下设置UDP缓存 这个方法可以一劳永逸解决UDP的收发丢包问题&#xff0c;只要添加注册表项目并重启即可。即使用Qt的信号与槽&#…

【他山之石】Leading-Trim: The Future of Digital Typesetting:数字排版的未来 —— Leading-Trim

文章目录 【他山之石】Leading-Trim: The Future of Digital Typesetting&#xff1a;数字排版的未来 —— Leading-TrimHow an emerging CSS standard can fix old problems and raise the bar for web apps1. The problem with text boxes today2. How we got here: a histor…

MySQL基础大全(看这一篇足够!!!)

文章目录 前言一、初识MySQL1.1 数据库基础1.2 数据库技术构成1.2.1 数据库系统1.2.2 SQL语言1.2.3 数据库访问接口 1.3 什么是MySQL 二、数据库的基本操作2.1 数据库创建和删除2.2 数据库存储引擎2.2.1 MySQL存储引擎简介2.2.2 InnoDB存储引擎2.2.3 MyISAM存储引擎2.2.4 存储引…

uniapp/HBuilder X引入weex报错weex is not defined

出现错误&#xff1a; ‍[⁠ReferenceError⁠]‍ {message: "weex is not defined"} 在www.iconfont.cn把想要的图标放进个人项目中并且下载css文件&#xff1a; 进入HBuilder自己创建的项目中添加一个目录common&#xff0c;添加一个文件free-icon.css 把刚才下载…

Python随机抽取Excel数据并在处理后整合为一个文件

本文介绍基于Python语言&#xff0c;针对一个文件夹下大量的Excel表格文件&#xff0c;基于其中每一个文件&#xff0c;随机从其中选取一部分数据&#xff0c;并将全部文件中随机获取的数据合并为一个新的Excel表格文件的方法。 首先&#xff0c;我们来明确一下本文的具体需求。…