微服务高级篇(五):可靠消息服务

文章目录

  • 一、消息队列MQ存在的问题?
  • 二、如何保证 `消息可靠性` ?
    • 2.1 生产者消息确认【对生产者配置】
    • 2.2 消息持久化
    • 2.3 消费者消息确认【对消费者配置】
    • 2.4 消费失败重试机制
    • 2.5 消费者失败消息处理策略
    • 2.6 总结
  • 三、处理延迟消息?死信交换机
    • 3.1 死信交换机概念
    • 3.2 TTL
    • 3.3 延迟队列 Delay Queue
      • 3.3.1 安装DelayExchange插件
      • 3.3.2 SpringAMQP使用延迟队列插件
  • 四、处理消息堆积问题?惰性队列
    • 4.1 消息堆积问题
    • 4.2 惰性队列 Lazy Queue
  • 五、高可用性:MQ集群
    • 5.1 集群分类
    • 5.2 普通集群
      • 5.2.1 部署
      • 5.2.2 获取cookie
      • 5.2.3 准备集群配置
      • 5.2.4 启动集群
      • 5.2.5 测试
    • 5.3 镜像集群
      • 5.3.1 镜像模式的特征
      • 5.3.2 镜像模式的配置
      • 5.3.3 测试
    • 5.4 仲裁队列
      • 5.4.1 添加仲裁队列
      • 5.4.2 测试
      • 5.4.3 集群扩容
    • 5.5 使用SpringAMQP创建仲裁队列


一、消息队列MQ存在的问题?

  1. 消息的可靠性问题:如何确保发送的消息至少被消费一次
  2. 延迟消息问题:如何实现消息的延迟投递
  3. 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
  4. 高可用问题:如何避免单点的MQ故障而导致的不可用问题

二、如何保证 消息可靠性

2.1 生产者消息确认【对生产者配置】

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

演示:前提RabbitMQ有一个交换机amq.topic,队列simple.queue,并且二者绑定bind了rountingKey为simple.#

在这里插入图片描述

CommonConfig.java

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 判断是否是延迟消息Integer receivedDelay = message.getMessageProperties().getReceivedDelay();if (receivedDelay != null && receivedDelay > 0) {// 是一个延迟消息,忽略这个错误提示return;}// 记录日志log.error("消息发送到队列失败。响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有需要的话,重发消息});}
}

测试类:

    @Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() throws InterruptedException {// 1.准备消息String message = "hello, spring amqp!";// 2.准备CorrelationData// 2.1.消息IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 2.2.准备ConfirmCallbackcorrelationData.getFuture().addCallback(result -> { // 1)成功回调// 判断结果if (result.isAck()) {// ACKlog.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());} else {// NACKlog.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());// 重发消息}}, ex -> { // 2)失败回调// 记录日志log.error("消息发送失败!", ex);// 重发消息});// 3.发送消息rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);}

此时结果为:

22:28:57:420 DEBUG 9600 --- [07.236.163:5672] cn.itcast.mq.spring.SpringAmqpTest       : 消息成功投递到交换机!消息ID: 06e7f233-d126-4c23-8499-547869e7a9b6

rabbitTemplate.convertAndSend("aaaaamq.topic", "simple.test", message, correlationData);,此时结果为:

22:42:30:013 ERROR 18652 --- [nectionFactory1] cn.itcast.mq.spring.SpringAmqpTest       : 消息投递到交换机失败!消息ID4b3d6575-03bc-4473-a343-5c8ef30c6496

rabbitTemplate.convertAndSend("amq.topic", "aaa.simple.test", message, correlationData);,此时结果为:

22:28:57:420 DEBUG 9600 --- [07.236.163:5672] cn.itcast.mq.spring.SpringAmqpTest       : 消息成功投递到交换机!消息ID: 06e7f233-d126-4c23-8499-547869e7a9b6
22:28:57:424 ERROR 9600 --- [nectionFactory1] cn.itcast.mq.config.CommonConfig         : 消息发送到队列失败。响应码:312, 失败原因:NO_ROUTE, 交换机: amq.topic, 路由key:a.simple.test, 消息: (Body:'hello, spring amqp!' MessageProperties [headers={spring_returned_message_correlation=06e7f233-d126-4c23-8499-547869e7a9b6}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])

2.2 消息持久化

当通过docker restart mq重启RabbitMQ发现,之前设置的队列都被清理了,因此需要做消息的持久化。

下面交换机与队列的持久化在消费者中进行配置,消息的持久化在生产者中配置:

在这里插入图片描述

其实Spring默认消息是持久化的,无需专门设置,所以按照之前的写法也行。

2.3 消费者消息确认【对消费者配置】

消费者也需要进行消息的确认,当无异常时,会发送ack回执;当有异常时,会发出nack回执,告诉生产者我这里有异常,你再发送一遍消息。
下面三种方式最常使用的时自动auto生成回执。

在这里插入图片描述

如果利用auto机制,消费者有异常时,会发出nack回执,告诉生产者我这里有异常,生产者会发送消息,再异常再发,导致压力倍增。下面一节将处理这个问题。

2.4 消费失败重试机制

在这里插入图片描述

演示:

在这里插入图片描述

2.5 消费者失败消息处理策略

上述消费者本地重试消息失败后,默认消息会被丢弃,其实还有其他方式。

最常用的是RepublisjMessageRecover:将失败消息给专门处理失败消息的交换机和队列,然后人工处理其消息
在这里插入图片描述

/*** 处理异常消息的交换机和队列*/
@Configuration
public class ErrorMessageConfig {// 处理异常消息的交换机@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}// 处理异常消息的队列@Beanpublic Queue errorQueue(){return new Queue("error.queue");}// 交换机和队列的绑定@Beanpublic Binding errorMessageBinding(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}// 消费者失败消息处理策略:// republishMessageRecoverer方式@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

在这里插入图片描述

2.6 总结

如何确保RabbitMQ消息的可靠性?

  1. 开启生产者确认机制,确保生产者的消息能到达队列
  2. 开启持久化功能,确保消息未消费前在队列中不会丢失
  3. 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  4. 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

三、处理延迟消息?死信交换机

3.1 死信交换机概念

死信交换机看似与上一节的失败交换机类似,但失败交换机是由消费者将失败消息投递给交换机,而死信交换机模式是由队列将消息投递给死信交换机。

在这里插入图片描述

在这里插入图片描述

3.2 TTL

TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:

1)消息所在的队列设置了存活时间

2)消息本身设置了存活时间

下面实现两个交换机队列:TTL交换机队列,死信交换机队列

在这里插入图片描述

1.在cunsumer中指定TTL交换机队列(超时时间为10秒),死信交换机队列

@Configuration
public class TTLMessageConfig {// TTL交换机@Beanpublic DirectExchange ttlDirectExchange(){return new DirectExchange("ttl.direct");}// TTL队列@Beanpublic Queue ttlQueue(){return QueueBuilder.durable("ttl.queue").ttl(10000)   // 队列的超时时间为10秒.deadLetterExchange("dl.direct") // 死信交换机.deadLetterRoutingKey("dl") // 死信交换机的rountingKey.build();}@Beanpublic Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");}
}
@Slf4j
@Component
public class SpringRabbitListener {/*** 指定死信交换机、队列、绑定rountingKey值* @param msg*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.queue", durable = "true"),exchange = @Exchange(name = "dl.direct"),key = "dl"))public void listenDlQueue(String msg) {log.info("消费者接收到了dl.queue的延迟消息");}
}

2.在publisher中编写测试类,发送一个超时时间为5秒的消息

    /*** 演示死信交换机:* 发送一个TTL超时时间为5秒的消息*/@Testpublic void testTTLMessage() {// 1.准备消息Message message = MessageBuilder.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("5000").build();// 2.发送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);// 3.记录日志log.info("消息已经成功发送!");}

3.启动消费者和生产者,发现生产者发送消息5秒后消费者才接受到,可以得出结论:取TTL队列和TTL消息中超时时间短的发送给死信交换机。

生产者的日志:
10:38:55:766  INFO 13084 --- [           main] cn.itcast.mq.spring.SpringAmqpTest       : 消息已经成功发送!消费者的日志:
10:39:00:807  INFO 14876 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到了dl.queue的延迟消息

总结:

1.消息超时的两种方式是?

  • 1)给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
  • 2)给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
  • 3)两者共存时,以时间短的ttl为准

2.如何实现发送一个消息20秒后消费者才收到消息?

  • 1)给消息的目标队列指定死信交换机
  • 2)消费者监听与死信交换机绑定的队列
  • 3)发送消息时给消息设置ttl为20秒

3.3 延迟队列 Delay Queue

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

3.3.1 安装DelayExchange插件

官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

因为我们之前是基于Docker安装RabbitMQ,所以下面我们会讲解基于Docker来安装RabbitMQ插件。

  1. 下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

其中包含各种各样的插件,包括我们要使用的DelayExchange插件:

大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9这个对应RabbitMQ的3.8.5以上版本。

课前资料也提供了下载好的插件:rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez

  1. 上传插件

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。如果不是基于Docker的同学,请参考第一章部分,重新创建Docker容器。

我们之前设定的RabbitMQ的数据卷名称为mq-plugins,所以我们使用下面命令查看数据卷:

docker volume inspect mq-plugins

可以得到下面结果:

[root@iZ2ze1r1nnqykr8zfme6cjZ tmp]# docker volume inspect mq-plugins
[{"CreatedAt": "2024-03-26T11:02:23+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]

接下来,将插件上传到这个目录即可:Mountpoint目录下

  1. 安装插件

最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令:

docker exec -it mq bash

执行时,请将其中的 -it 后面的mq替换为你自己的容器名.

进入容器内部后,执行下面命令开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

结果如下:

Enabling plugins on node rabbit@mq1:
rabbitmq_delayed_message_exchange
The following plugins have been configured:rabbitmq_delayed_message_exchangerabbitmq_managementrabbitmq_management_agentrabbitmq_prometheusrabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq1...
The following plugins have been enabled:rabbitmq_delayed_message_exchangestarted 1 plugins.

最后通过exit命令退出容器

  1. 使用插件

DelayExchange插件的原理是对官方原生的Exchange做了功能的升级:

  • 将DelayExchange接受到的消息暂存在内存中(官方的Exchange是无法存储消息的)。
  • 在DelayExchange中计时,超时后才投递消息到队列中

1)如何创建一个延迟交换机?

在RabbitMQ的管理平台声明一个DelayExchange:

在这里插入图片描述

2)如何发送一个延迟消息?

消息的延迟时间需要在发送消息的时候指定:
在这里插入图片描述

3.3.2 SpringAMQP使用延迟队列插件

演示:在消费者consumer声明延迟交换机,有两种方式:
在这里插入图片描述
在这里插入图片描述
然后在发布者publisher发送一个延迟消息,添加一个x-delay
在这里插入图片描述
测试发现报错,解释:理论上消息到达了交换机会立即将消息转发出去,而延迟交换机会将消息保存下来,等时间到后再转发消息,因此在等待的时间会返回给发布者一个未到达队列的错误。

生产者的日志:
11:25:22:536  INFO 5756 --- [           main] cn.itcast.mq.spring.SpringAmqpTest       : 发送消息成功
11:25:22:552 ERROR 5756 --- [nectionFactory1] cn.itcast.mq.config.CommonConfig         : 消息发送到队列失败。响应码:312, 失败原因:NO_ROUTE, 交换机: delay.direct, 路由key:delay, 消息: (Body:'[B@5f1dfcce(byte[19])' MessageProperties [headers={spring_returned_message_correlation=c411ed18-e4ea-44e0-add9-0d867bbbf282}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, receivedDelay=5000, deliveryTag=0])Process finished with exit code 0消费者的日志:
11:25:27:556  INFO 2612 --- [ntContainer#2-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到了delay.queue的延迟消息

这里我们修改发布者的ReturnCallback,添加逻辑忽略延迟消息。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 判断是否是延迟消息Integer receivedDelay = message.getMessageProperties().getReceivedDelay();if (receivedDelay != null && receivedDelay > 0) {// 是一个延迟消息,忽略这个错误提示return;}// 记录日志log.error("消息发送到队列失败。响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有需要的话,重发消息});}
}

最后结果如下:

生产者的日志:
[delegate=amqp://itcast@39.107.236.163:5672/, localPort= 55991]
11:19:50:778  INFO 18824 --- [           main] cn.itcast.mq.spring.SpringAmqpTest       : 发送消息成功消费者的日志:
11:19:55:816  INFO 13184 --- [ntContainer#2-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到了delay.queue的延迟消息

四、处理消息堆积问题?惰性队列

4.1 消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

解决消息堆积有三种种思路:

  • 增加更多消费者,提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积,提高堆积上限

4.2 惰性队列 Lazy Queue

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

设置一个队列为惰性队列的方法?

方法一:只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:
在这里插入图片描述

方法二:使用SpringAMQP声明惰性队列:

在这里插入图片描述

演示:定义两个队列,惰性队列和普通队列,分别发送一百万条消息,看内存和磁盘情况

/*** 惰性队列*/
@Configuration
public class LazyConfig {// 惰性队列@Beanpublic Queue lazyQueue() {return QueueBuilder.durable("lazy.queue").lazy().build();}// 普通队列@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal.queue").build();}
}

惰性队列:消息全在磁盘中

    /*** 演示惰性队列:* 发送一百万条消息*/@Testpublic void testLazyQueue() throws InterruptedException {long b = System.nanoTime();for (int i = 0; i < 1000000; i++) {// 1.准备消息Message message = MessageBuilder.withBody("hello, Spring".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();// 2.发送消息rabbitTemplate.convertAndSend("lazy.queue", message);}long e = System.nanoTime();System.out.println(e - b);}

在这里插入图片描述

普通队列:消息再磁盘和内存中

    /*** 演示普通队列:* 发送一百万条消息*/@Testpublic void testNormalQueue() throws InterruptedException {long b = System.nanoTime();for (int i = 0; i < 1000000; i++) {// 1.准备消息Message message = MessageBuilder.withBody("hello, Spring".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();// 2.发送消息rabbitTemplate.convertAndSend("normal.queue", message);}long e = System.nanoTime();System.out.println(e - b);}

在这里插入图片描述

在这里插入图片描述

五、高可用性:MQ集群

5.1 集群分类

在这里插入图片描述

在RabbitMQ的官方文档中,讲述了两种集群的配置方式:

  • 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。
  • 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。

5.2 普通集群

在这里插入图片描述

接下来,我们看看如何安装RabbitMQ的集群。

5.2.1 部署

我们先来看普通模式集群,我们的计划部署3节点的mq集群:记得云服务器要暴露15672、5672、8081-8083、8071-8073端口

主机名控制台端口amqp通信端口
mq18081 —> 156728071 —> 5672
mq28082 —> 156728072 —> 5672
mq38083 —> 156728073 —> 5672

集群中的节点标示默认都是:rabbit@[hostname],因此以上三个节点的名称分别为:

  • rabbit@mq1
  • rabbit@mq2
  • rabbit@mq3

5.2.2 获取cookie

RabbitMQ底层依赖于Erlang,而Erlang虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个RabbitMQ 节点使用 cookie 来确定它们是否被允许相互通信。

要使两个节点能够通信,它们必须具有相同的共享秘密,称为Erlang cookie。cookie 只是一串最多 255 个字符的字母数字字符。

每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信。

我们先在之前启动的mq容器中获取一个cookie值,作为集群的cookie。执行下面的命令:

docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

可以看到cookie值如下:

JBIXBEJBTRPDPHPGGZJV

接下来,停止并删除当前的mq容器,我们重新搭建集群。

docker rm -f mq# 清理数据卷
docker volume prune

5.2.3 准备集群配置

在/tmp目录新建一个配置文件 rabbitmq.conf:

cd /tmp
# 创建文件
touch rabbitmq.conf

文件内容如下:

loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

再创建一个文件,记录cookie

cd /tmp
# 创建cookie文件
touch .erlang.cookie
# 写入cookie
echo "JBIXBEJBTRPDPHPGGZJV" > .erlang.cookie
# 修改cookie文件的权限
chmod 600 .erlang.cookie

准备三个目录,mq1、mq2、mq3:

cd /tmp
# 创建目录
mkdir mq1 mq2 mq3

然后拷贝rabbitmq.conf、cookie文件到mq1、mq2、mq3:

# 进入/tmp
cd /tmp
# 拷贝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3

5.2.4 启动集群

创建一个网络:

docker network create mq-net

运行命令

docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management

输入ip地址:8082,可以3个集群都能看到
在这里插入图片描述

5.2.5 测试

在mq1这个节点上添加一个队列:

如图,在mq2和mq3两个控制台也都能看到:

在这里插入图片描述

  1. 数据共享测试

点击这个队列,进入管理页面:

然后利用控制台发送一条消息到这个队列:

结果在mq2、mq3上都能看到这条消息:

  1. 可用性测试

我们让其中一台节点mq1宕机:

docker stop mq1

然后登录mq2或mq3的控制台,发现simple.queue也不可用了:

说明数据并没有拷贝到mq2和mq3。

5.3 镜像集群

在这里插入图片描述

在刚刚的案例中,一旦创建队列的主机宕机,队列就会不可用。不具备高可用能力。如果要解决这个问题,必须使用官方提供的镜像集群方案。

官方文档地址:https://www.rabbitmq.com/ha.html

5.3.1 镜像模式的特征

默认情况下,队列只保存在创建该队列的节点上。而镜像模式下,创建队列的节点被称为该队列的主节点,队列还会拷贝到集群中的其它节点,也叫做该队列的镜像节点。

但是,不同队列可以在集群中的任意节点上创建,因此不同队列的主节点可以不同。甚至,一个队列的主节点可能是另一个队列的镜像节点

用户发送给队列的一切请求,例如发送消息、消息回执默认都会在主节点完成,如果是从节点接收到请求,也会路由到主节点去完成。镜像节点仅仅起到备份数据作用

当主节点接收到消费者的ACK时,所有镜像都会删除节点中的数据。

总结如下:

  • 镜像队列结构是一主多从(从就是镜像)
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)
  • 不具备负载均衡功能,因为所有操作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)

5.3.2 镜像模式的配置

镜像模式的配置有3种模式:

ha-modeha-params效果
准确模式exactly队列的副本量count集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
all(none)队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。
nodesnode names指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。

这里我们以rabbitmqctl命令作为案例来讲解配置语法。

语法示例:

  1. exactly模式
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • rabbitmqctl set_policy:固定写法
  • ha-two:策略名称,自定义
  • "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称
  • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容
    • "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量
    • "ha-params":2:策略参数,这里是2,就是副本数量为2,1主1镜像
    • "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销
  1. all模式
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
  • ha-all:策略名称,自定义
  • "^all\.":匹配所有以all.开头的队列名
  • '{"ha-mode":"all"}':策略内容
    • "ha-mode":"all":策略模式,此处是all模式,即所有节点都会称为镜像节点
  1. nodes模式
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
  • rabbitmqctl set_policy:固定写法
  • ha-nodes:策略名称,自定义
  • "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.开头的队列名称
  • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容
    • "ha-mode":"nodes":策略模式,此处是nodes模式
    • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称

5.3.3 测试

我们使用exactly模式的镜像,因为集群节点数量为3,因此镜像数量就设置为2.

我们进入mq1运行下面的命令:表示只要是以two开头的队列会创建一个镜像

# 进入mq1容器的控制台
docker exec -it mq1 bash
# 执行exactly模式
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
# 退出容器
exit

可以看到策略已经部署上
在这里插入图片描述

下面,我们创建一个新的队列:two.queue

在这里插入图片描述

在任意一个mq控制台查看队列:

在这里插入图片描述

  1. 测试数据共享

给two.queue发送一条消息:

然后在mq1、mq2、mq3的任意控制台都可以查看消息:

  1. 测试高可用

现在,我们让two.queue的主节点mq1宕机:

docker stop mq1

查看集群状态:

在这里插入图片描述

查看队列状态:mq2成为了主节点,mq3成为了镜像节点

在这里插入图片描述

发现依然是健康的!并且其主节点切换到了rabbit@mq2上

5.4 仲裁队列

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于Raft协议,强一致

5.4.1 添加仲裁队列

在任意控制台添加一个队列,一定要选择队列类型为Quorum类型。

在这里插入图片描述

在任意控制台查看队列:

在这里插入图片描述

可以看到,仲裁队列的 + 2字样。代表这个队列有2个镜像节点。

因为仲裁队列默认的镜像数为5。如果你的集群有7个节点,那么镜像数肯定是5;而我们集群只有3个节点,因此镜像数量就是3.

5.4.2 测试

可以参考对镜像集群的测试,效果是一样的。

5.4.3 集群扩容

  1. 加入集群

1)启动一个新的MQ容器:

docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management

2)进入容器控制台:

docker exec -it mq4 bash

3)停止mq进程

rabbitmqctl stop_app

4)重置RabbitMQ中的数据:

rabbitmqctl reset

5)加入mq1:

rabbitmqctl join_cluster rabbit@mq1

6)再次启动mq进程

rabbitmqctl start_app
  1. 增加仲裁队列副本

我们先查看下quorum.queue这个队列目前的副本情况,进入mq1容器:

docker exec -it mq1 bash

执行命令:

rabbitmq-queues quorum_status "quorum.queue"

结果:

现在,我们让mq4也加入进来:

rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"

结果:

再次查看:

rabbitmq-queues quorum_status "quorum.queue"

查看控制台,发现quorum.queue的镜像数量也从原来的 +2 变成了 +3:

5.5 使用SpringAMQP创建仲裁队列

修改xml文件
在这里插入图片描述

声明仲裁队列

/*** 仲裁队列*/
@Configuration
public class QuorumConfig {@Beanpublic Queue quorumQueue() {return QueueBuilder.durable("quorum.queue2") // 持久化.quorum() // 仲裁队列.build();}
}

启动运行,发现队列创建成功

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/291944.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

系统需求分析报告(原件获取)

第1章 序言 第2章 引言 2.1 项目概述 2.2 编写目的 2.3 文档约定 2.4 预期读者及阅读建议 第3章 技术要求 3.1 软件开发要求 第4章 项目建设内容 第5章 系统安全需求 5.1 物理设计安全 5.2 系统安全设计 5.3 网络安全设计 5.4 应用安全设计 5.5 对用户安全管理 …

一文带你深刻了解控制台console那些事

一、前言 首先感谢小伙伴们访问我的博客&#xff0c;但是你是有多么的无聊才会选择打开我的控制台呢&#xff1f;不过还是很感谢大家通过邮件的形式&#xff0c;给我提出很多的宝贵意见。 借此机会正好和大家唠一唠前端console到底有什么魔法。 二、console.log调试必备 consol…

Android ImageView 的scaleType 属性图解

目录 前言测试素材测试布局xmlscaleType前言 一、ScaleType.FIT_CENTER 默认二、ScaleType.FIT_START三、ScaleType.FIT_END四、ScaleType.FIT_XY五、ScaleType.CENTER六、ScaleType.CENTER_CROP七、ScaleType.CENTER_INSIDE八、ScaleType.MATRIX 前言 原文链接&#xff1a; A…

阿里云服务器4核8G配置最新活动价格

阿里云服务器4核8g配置云服务器u1价格是955.58元一年&#xff0c;4核8G配置还可以选择ECS计算型c7实例、计算型c8i实例、计算平衡增强型c6e、ECS经济型e实例、AMD计算型c8a等机型等ECS实例规格&#xff0c;规格不同性能不同&#xff0c;价格也不同&#xff0c;阿里云服务器网al…

交换机干道链路

干道链路是用于交换机之间或交换机与路由器之间互连的物理链路。干道链路传输的数据帧都必须打上Tag&#xff0c;便于设备识别数据帧所属的VLAN。因此一条干道链路可以承载多个VLAN的数据帧&#xff0c;如图1-1所示。 图1-1 干道链路功能示意图 干道链路可以透传VLAN。换言之&…

鸿蒙应用开发与鸿蒙系统开发哪个更有前景?

随后迎来了不少互联网公司与华为鸿蒙原生应用达成了合作&#xff0c;像我们常见的阿里、京东、小红书、得物……等公司&#xff0c;还有一些银行也都与华为鸿蒙达成了合作。使得一时之间市场紧缺鸿蒙开发人才&#xff0c;不少公司不惜重金争抢人才。 据智联招聘的最新数据显示…

推特Twitter有直播功能吗?如何用Twitter直播?

现在各大直播平台已经成为社交媒体营销的一种重要渠道&#xff0c;它让品牌能够即时地与全球受众进行互动。据统计&#xff0c;直播市场正在迅速增长&#xff0c;预计到2028年将达到2230亿美元的规模。在这个不断扩张的市场中&#xff0c;许多社交媒体平台如YouTube、Facebook、…

[深度学习]yolov8+pyqt5搭建精美界面GUI设计源码实现二

【简单介绍】 基于目标检测算法YOLOv8和灵活的PyQt5界面开发框架&#xff0c;我们精心打造了一款集直观性、易用性和功能性于一体的目标检测GUI界面。通过深度整合YOLOv8在目标识别上的卓越能力与PyQt5的精致界面设计&#xff0c;我们成功研发出一款既高效又稳定的软件GUI。 …

【论文精读】CAM:基于上下文增强和特征细化网络的微小目标检测

文章目录 &#x1f680;&#x1f680;&#x1f680;摘要一、1️⃣ Introduction---介绍二、2️⃣Related Work---相关工作2.1 &#x1f393; 基于深度学习的对象检测器2.2 ✨多尺度特征融合2.3 ⭐️数据增强 三、3️⃣提议的方法3.1 &#x1f393; 具有上下文增强和特征细化的特…

Qt与编码

ASCII码:一个字节&#xff0c;256个字符。 Unicode:字母&#xff0c;汉字都占用两个字节。 utf-8:字母一个字节&#xff0c;汉字3个字节。 gbk:字母一个字节&#xff0c;汉字2个字节。 gb2312:可以表示汉字&#xff0c;gb2312<gbk。 编码查看&#xff1a; https://www.…

烫烫烫手的结构体大小计算来咯,很烫哦,慢慢消化。自定义类型(一)

emmm&#xff0c;在这炎热的夏天在宿舍吹着空调写着博客也是一件不错的事呢&#xff0c;今天就来来好好盘一下C语言中的自定义类型。 常常会回顾努力的自己&#xff0c;所以要给自己的努力留下足迹。 为今天努力的自己打个卡&#xff0c;留个痕迹吧 2024.03.29 小闭 目录 …

Linux之权限管理

Linux 下有两种用户&#xff1a;超级用户&#xff08; root &#xff09;、普通用户。 超级用户&#xff1a;可以再linux系统下做任何事情&#xff0c;不受限制 普通用户&#xff1a;在linux下做有限的事情。 超级用户的命令提示符是“#”&#xff0c;普通用户的命令提示符是…

【Qt 学习笔记】Day1 | Qt 开发环境的搭建

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Qt 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Day1 | Qt 开发环境的搭建 文章编号&#xff1a;Qt 学习笔记 / 02 文…

mysql进阶知识总结

1.存储引擎 1.1MySQL体系结构 1).连接层 最上层是一些客户端和链接服务&#xff0c;包含本地sock通信和大多数基于客户端/服务端工具实现的类似于TCP/IP的通信。主要完成一些类似于连接处理、授权认证、及相关的安全方案。在该层上引入了线程池的概念&#xff0c;为通过认证…

Linux第84步_了解Linux中断及其函数

1、中断号 中断号又称中断线&#xff0c;每个中断都有一个中断号&#xff0c;通过中断号即可区分不同的中断。 2、Linux中断API函数 需要包含头文件“#include <linux/interrupt.h>” 1)、在使用某个中断功能的时候&#xff0c;需要执行“申请中断” int request_irq(…

左手医生:医疗 AI 企业的云原生提效降本之路

相信这样的经历对很多人来说并不陌生&#xff1a;为了能到更好的医院治病&#xff0c;不惜路途遥远奔波到大城市&#xff1b;或者只是看个小病&#xff0c;也得排上半天长队。这些由于医疗资源分配不均导致的就医问题已是老生长谈。 云计算、人工智能、大数据等技术的发展和融…

阿里云安全产品简介,Web应用防火墙与云防火墙产品各自作用介绍

在阿里云的安全类云产品中&#xff0c;Web应用防火墙与云防火墙是用户比较关注的安全类云产品&#xff0c;二则在作用上并不是完全一样的&#xff0c;Web应用防火墙是一款网站Web应用安全的防护产品&#xff0c;云防火墙是一款公共云环境下的SaaS化防火墙&#xff0c;本文为大家…

海量数据处理项目-账号微服务和流量包数据库表+索引规范(下)

海量数据处理项目-账号微服务和流量包数据库表索引规范&#xff08;下&#xff09; 第2集 账号微服务和流量包数据库表索引规范讲解《下》 简介&#xff1a;账号微服务和流量包数据库表索引规范讲解 账号和流量包的关系&#xff1a;一对多traffic流量包表思考点 海量数据下每…

Spring依赖注入思想分析

Spring 依赖注入思想分析 文章目录 Spring 依赖注入思想分析一、前言二、控制反转&#xff08;Inversion of Control&#xff09;1. 代码依赖初始化问题2. 匿名内部类解决方案3. 创建接口实现类方案4. 问题深入5. 定义父类解决问题1方案6. 控制反转解决问题2方案 三、依赖注入&…

const在指针中的作用以及*p在各种写法中分别代表什么含义

const在指针中起固定的作用&#xff0c;在不同的写法中其效果也有所区别&#xff0c;具体如下&#xff1a; 1、int* const p固定的是指针p指向的地址。 2、int const *p固定的是指针p指向地址中储存的内容。 例&#xff1a; 以上操作在编译器中执行不了&#xff0c;会报错。…