RabbitMQ在Java中使用 SpringBoot 从基础到高级

充分利用每一个监听者

需要充分利用每一个消费者,需要在配置文件中加上prefetch配置并设置为1

rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

创建交换机和队列

创建队列

  1. "fanout.queue1":队列的名称,这里是 “fanout.queue1”。
  2. false:指示队列是否是持久化的。如果设置为 true,则表示队列会在 RabbitMQ 服务器重启后仍然存在。如果设置为 false,则表示队列是非持久化的,即在 RabbitMQ 服务器重启后会被删除。
  3. false:指示队列是否是独占的。如果设置为 true,则表示只有声明该队列的连接可以使用它。如果设置为 false,则表示其他连接也可以使用该队列。
  4. true:指示队列是否会自动删除。如果设置为 true,则表示当没有消费者连接到该队列时,队列会自动删除。如果设置为 false,则表示队列不会自动删除。
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class CreateFanoutQueue {@Beanpublic Queue fanoutQueue1() {QueueBuilder.durable("fanout").build();return new Queue("fanout.queue1", false, false, true);}
}

创建交换机

  1. name:交换机的名称。
  2. durable:指示交换机是否是持久化的。如果设置为 true,则表示 RabbitMQ 服务器重启后仍然存在。如果设置为 false,则表示交换机是非持久化的,即在 RabbitMQ 服务器重启后会被删除。在你的代码中,durable 参数被设置为 false,表示这个交换机是非持久化的。
  3. autoDelete:指示交换机是否是自动删除的。如果设置为 true,则表示当没有与之绑定的队列时,交换机会自动删除。如果设置为 false,则表示交换机不会自动删除。在你的代码中,autoDelete 参数被设置为 false,表示这个交换机不会自动删除。
@Configuration
public class CreateFanoutExchange {/*** 创建永久交换机,必须要设置是否自动删除** @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange2() {return new FanoutExchange("bunny.fanout2", false, false);}
}

注解创建交换机和队列

  • bindings:用于定义队列和交换机之间的绑定关系。在你的代码中,通过 @QueueBinding 注解来定义了一个队列绑定。该绑定包括一个名为 “fanout.queue3” 的队列和一个名为 “bunny.fanout” 的交换机之间的绑定关系。
  • value:用于指定队列的属性。在你的代码中,通过 @Queue 注解来指定了队列的名称为 “fanout.queue3”,并设置了 durable = "true",表示该队列是持久化的。
  • exchange:用于指定交换机的属性。在你的代码中,通过 @Exchange 注解来指定了交换机的名称为 “bunny.fanout”,类型为 ExchangeTypes.FANOUT,并设置了 durable = "true",表示该交换机是持久化的。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class FanoutListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.queue3", durable = "true"),exchange = @Exchange(name = "bunny.fanout", type = ExchangeTypes.FANOUT, durable = "true")))public void listenFanoutQueue3(String message) {System.out.println("消费者3接收到Fanout消息:【" + message + "】");}
}

交换机类型与作用

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是 Fanout 交换机
  • Direct:订阅,基于 RoutingKey(路由 key)发送给订阅了消息的队列
  • Topic:通配符订阅,与 Direct 类似,只不过 RoutingKey 可以使用通配符

Fanout交换机

[!important]

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

无特殊功能,当队列发送消息和接受消息时,只能发送到交换机, 交换机把消息发送给绑定过的所有队列, 订阅队列的消费者都能拿到消息。

当我们给交换机绑定了三个队列,这三个队列收到消息即可完成消息监听和发送。

使用方式
  1. 创建两个队列,分别为fanout.queue1fanout.queue2

在这里插入图片描述

  1. 创建bunny.fannout交换机。

在这里插入图片描述

  1. 绑定两个队列到交换机中。

在这里插入图片描述

Java创建队列和交换机
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class FanoutListener {/*** 监听 消费者1 是否收到消息** @param message 消息*/@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String message) {System.out.println("消费者1接收到Fanout消息:【" + message + "】");}/*** 监听 消费者2 是否收到消息** @param message 消息*/@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String message) {System.out.println("消费者2接收到Fanout消息:【" + message + "】");}/*** 监听 消费者2 是否收到消息** @param message 消息*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.queue3", durable = "true"),exchange = @Exchange(name = "bunny.fanout", type = ExchangeTypes.FANOUT, durable = "true")))public void listenFanoutQueue3(String message) {System.out.println("消费者3接收到Fanout消息:【" + message + "】");}
}
Java发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class TestSendFanout {@AutowiredRabbitTemplate rabbitTemplate;/*** 向队列发送消息 fanout.queue1*/@Testvoid testSendFanout1() throws Exception {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend("bunny.fanout", null, "第二个消息队列:" + i);}}
}

在这里插入图片描述

Direct交换机

使用方式

[!important]

Direct交换机与Fanout交换机的差异

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Direct交换机发送消息时根据路由的key来发送的,而Fanout交换机是广播发送不设置路由的key

在这里插入图片描述

在这里插入图片描述

Java创建
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class DirectListener {/*** * 监听者1* 创建队列 持久化的、不自动删除* 创建交换机 持久化的、不自动删除* 无接受的key** @param message 接受消息*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1", durable = "true", autoDelete = "false"),exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false")))public void listenDirectQueue1(String message) {System.out.println("消费者1接收到 Direct 消息:【" + message + "】");}/*** * 监听者2* 创建队列 持久化的、不自动删除* 创建交换机 持久化的、不自动删除* key包含 red 和 yellow** @param message 接受消息*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2", durable = "true", autoDelete = "false"),exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),key = {"red", "yellow"}))public void listenDirectQueue2(String message) {System.out.println("消费者2接收到 Direct key 为 {\"red\", \"yellow\"} 消息:【" + message + "】");}/*** * 监听者3* 创建队列 持久化的、不自动删除* 创建交换机 持久化的、不自动删除* key包含 blue 和 yellow** @param message 接受消息*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue3", durable = "true", autoDelete = "false"),exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),key = {"blue", "yellow"}))public void listenDirectQueue3(String message) {System.out.println("消费者2接收到 Direct key 为 {\"blue\", \"yellow\"} 消息:【" + message + "】");}/*** * 监听者4* 创建队列 持久化的、不自动删除* 创建交换机 持久化的、不自动删除* key包含 yellow** @param message 接受消息*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue4", durable = "true", autoDelete = "false"),exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),key = "yellow"))public void listenDirectQueue4(String message) {System.out.println("消费者2接收到 Direct key 为 \"yellow\" 消息:【" + message + "】");}
}
Java发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class TestSendDirect {@AutowiredRabbitTemplate rabbitTemplate;/*** 发送黄色消息*/@Testvoid testSendDirectYellow() throws Exception {for (int i = 0; i < 1000; i++) {rabbitTemplate.convertAndSend("bunny.direct", "yellow", "发送消息:" + i);}}/*** 发送红色消息*/@Testvoid testSendDirectRed() throws Exception {for (int i = 0; i < 1000; i++) {rabbitTemplate.convertAndSend("bunny.direct", "red", "发送消息:" + i);}}/*** 发送蓝色消息*/@Testvoid testSendDirectBlue() throws Exception {for (int i = 0; i < 1000; i++) {rabbitTemplate.convertAndSend("bunny.direct", "blue", "发送消息:" + i);}}
}

由于消费者1没有绑定任何key所以任何消息都没有接受到。

在这里插入图片描述

Topic交换机

使用方式

[!important]

Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

在这里插入图片描述

在这里插入图片描述

Java创建
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class TopicListener {/*** * 监听者1* 创建队列 持久化的、不自动删除* 创建交换机 持久化的、不自动删除* key为china.#** @param message 接受消息*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "bunny.topic", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicChina(String message) {System.out.println("消费者1接收到topic.queue1的消息:【" + message + "】");}/*** * 监听者1* 创建队列 持久化的、不自动删除* 创建交换机 持久化的、不自动删除* #.news** @param message 接受消息*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "bunny.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicNews(String message) {System.out.println("消费者1接收到topic.queue2的消息:【" + message + "】");}
}
Java发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class TestSendTopic {@AutowiredRabbitTemplate rabbitTemplate;/*** 发送消息 key 包含 china.# 的*/@Testvoid testSendTopic1() throws Exception {for (int i = 0; i < 1000; i++) {rabbitTemplate.convertAndSend("bunny.topic", "china.ly", "china.ly 发送消息:" + i);}}/*** 发送消息 key 包含 #.news 的*/@Testvoid testSendTopic2() {for (int i = 0; i < 1000; i++) {rabbitTemplate.convertAndSend("bunny.topic", "ly.news", "ly.news 发送消息:" + i);}}
}

在这里插入图片描述
在这里插入图片描述

消息持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化(在交换机类型与作用已解释)
  • 队列持久化(在交换机类型与作用已解释)
  • 消息持久化(本文介绍)

使用MessageBuilder创建使用调用方法setDeliveryMode,在里面设置持久化消息和非持久化消息

非持久化消息

setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)设置非持久化消息。

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.nio.charset.StandardCharsets;@SpringBootTest
public class TestSendFanout {@AutowiredRabbitTemplate rabbitTemplate;/*** * 发送数据非持久化* 向 bunny.fanout 发送消息*/@Testvoid testSendFanout2() throws Exception {// 创建消息-非持久化消息Message message = MessageBuilder.withBody("hello world".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();// 发送很多消息for (int i = 0; i < 1000000; i++) {rabbitTemplate.convertAndSend("bunny.fanout", null, message);}}
}

持久化消息

setDeliveryMode(MessageDeliveryMode.PERSISTENT)设置持久化消息。

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.nio.charset.StandardCharsets;@SpringBootTest
public class TestSendFanout {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid testSendFanout3() throws Exception {// 创建消息-持久化消息Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 发送很多消息for (int i = 0; i < 1000000; i++) {rabbitTemplate.convertAndSend("bunny.fanout", null, message);}}
}

惰性队列

惰性队列比其它队列性能好,速度快。

控制台操作

使用控制台添加惰性队列。

在这里插入图片描述

Java方式添加

不基于注解

使用IOC容器注入,使用属性名方式lazy()创建。

@Configuration
public class CreateLazyQueue {/*** 创建惰性队列** @return 惰性队列*/@Beanpublic Queue lazyQueue() {return QueueBuilder.durable("lazy.queue1").lazy().build();}
}
基于注解

也是比较常见的方式,这种也方便。

@Queue(name = "lazy.queue2", durable = "true", autoDelete = "false", arguments = @Argument(name = "x-queue-mode", value = "lazy"))

全部代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class LazyListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "lazy.queue2", durable = "true", autoDelete = "false", arguments = @Argument(name = "x-queue-mode", value = "lazy")),exchange = @Exchange(name = "lazy.fanout", type = ExchangeTypes.FANOUT, durable = "true", autoDelete = "false")))public void listenLazyQueue1(String message) {System.out.println("消费者1接收到 Lazy 消息:【" + message + "】");}
}

更新已有队列为lazy模式

对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。

命令行方式

可以基于命令行设置policy

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues:策略的作用对象,是所有的队列
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
控制台方式

在这里插入图片描述

消费者的可靠性

[!tip]

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false	

Java中配置

当消息中出现异常时,自动转到错误交换机和错误队列中,方便开发和调试人员查看。

创建error.direct,设置key为error,队列为error.queue

当然,如果在配置中没有设置错误机制这时某些配置也没有必要加载进来,当配置spring.rabbitmq.listener.simple.retry.enabledtrue时才启用当前配置。

@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")// 当开启错误重试这个配置才有效果

示例代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")// 当开启错误重试这个配置才有效果
@Slf4j
public class ErrorConfiguration {@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {log.info("加载错误交换机");return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
只查看不做处理

如果只是想在MQ控制台中查看错误消息,并不需要监听错误消息,这时可以使用之前IOC注入方式配置。

创建交换机、队列,之后绑定设置需要传入的key。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")// 当开启错误重试这个配置才有效果
@Slf4j
public class ErrorConfiguration {@Beanpublic DirectExchange errorMessageExchange() {return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {log.info("加载错误交换机");return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
测试配置

为了验证我们配置的是否有问题,可以在监听消息时手动抛出异常。

对消息进行错误测试,在消息中抛出异常。

/*** * 监听者3* 创建队列 持久化的、不自动删除* 创建交换机 持久化的、不自动删除* key包含 blue 和 yellow** @param message 接受消息*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue3", durable = "true", autoDelete = "false"),exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),key = {"blue", "yellow"}
))
public void listenDirectQueue3(String message) {System.out.println("消费者3接收到 Direct key 为 {\"blue\", \"yellow\"} 消息:【" + message + "】");throw new RuntimeException("错误消息");
}

在这里插入图片描述

对消息进行监听处理

如果需要对错误队列进行监听并且做出相应处理,使用注解方式,可以直接创建并监听消息。

其中自动绑定了消息队列,和自动创建了错误交换机。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Component
@Slf4j
public class ErrorListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "error.queue", durable = "true"),exchange = @Exchange(name = "error.direct", type = ExchangeTypes.DIRECT),key = "error"))@RabbitListener(queues = "error.queue")public void listenError(String message) {System.out.println(LocalDateTime.now() + "收集错误队列-消费者接收到 error 消息:【" + message + "】");}
}

在这里插入图片描述

全部的配置

spring:application:name: demo-mqrabbitmq:host: 192.168.1.6 # 主机地址port: 5672 # 端口virtual-host: /bunny # 虚拟主机username: bunny # 用户名password: "02120212" # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息acknowledge-mode: auto # 确认机制retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始失败等待时长multiplier: 1 # 下次失败等待时间被树,下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true 无状态 false 有状态。如果业务中包含事务,这里改为falseconnection-timeout: 1s # 设置mq连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 200ms # 失败后初始等待时间multiplier: 1 # 失败后下次等待时长倍数,发送消息失败不会走这个max-attempts: 3 # 最大重试次数publisher-confirm-type: none # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

业务幂等性

什么事幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。 在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。 然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

生成消息唯一ID

SpringAMQPMessageConverter自带了MessageID的功能,我们只要开启这个功能即可。 以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

延迟消息

对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。

DelayExchange插件

官方文档说明

Scheduling Messages with RabbitMQ | RabbitMQ - Blog

插件下载地址

GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

在这里插入图片描述

安装延迟插件

找到RabbitMQ镜像插件位置。

docker inspect mq
# 或者执行
docker volume inspect mq-plugins

在这里插入图片描述

将插件拖入rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez

在这里插入图片描述

执行命令

rabbitmq_delayed_message_exchange是你的插件名称。

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

Java中使用

创建延迟消息,延迟消息如果很多而且延迟时间较长不建议使用MQ去处理这些消息,因为在内部会维护一个时钟,如果消息很大时间又长,对于系统资源消耗会很大。

如果时间很长可以使用Redis去处理这些内容。

不基于注解

使IOC容器方式创建延迟消息。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class CreateDirectExchange {/*** 创建延迟交换机** @return 延迟交换机*/@Beanpublic DirectExchange delayExchange() {return ExchangeBuilder.directExchange("delay.direct").delayed()// 设置delay的属性为true.durable(true)// 持久化.build();}
}
基于注解

使用注解方式一次性创建交换机、队列、延迟消息。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class DelayListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"))public void listenDelay(String message) {System.out.println("消费者接收到 delay 消息:【" + message + "】");}
}

创建延迟消息会有独特的tag。

在这里插入图片描述

创建队列

在这里插入图片描述

发送延迟消息

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
@Slf4j
public class TestSendDelay {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid testSendDelay() throws Exception {rabbitTemplate.convertAndSend("delay.direct", "delay", "延迟消息", message -> {message.getMessageProperties().setDelayLong(5000L);return message;});log.info("延迟消息发送成功");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/283265.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

集简云新增“文本语音转换”功能,实现智能语音交互

为丰富人工智能领域的应用集成&#xff0c;为用户提供更便捷和智能化的信息获取和视觉创作方式&#xff0c;本周集简云上线了内置应用—文本语音转换。目前支持OpenAI TTS和TTS HD模型&#xff0c;实现文本语音高效智能转换&#xff0c;也可根据你的产品或品牌创建独特的神经网…

二、SpringBoot3 配置文件

本章概要 统一配置管理概述属性配置文件使用YAML 配置文件使用批量配置文件注入多环境配置和使用 2.1 统一配置管理概述 SpringBoot工程下&#xff0c;进行统一的配置管理&#xff0c;你想设置的任何参数&#xff08;端口号、项目根路径、数据库连接信息等等)都集中到一个固定…

福建科立讯通信 指挥调度管理平台 SQL注入漏洞复现(CVE-2024-2620、CVE-2024-2621)

0x01 产品简介 福建科立讯通信指挥调度管理平台是一个专门针对通信行业的管理平台。该产品旨在提供高效的指挥调度和管理解决方案,以帮助通信运营商或相关机构实现更好的运营效率和服务质量。该平台提供强大的指挥调度功能,可以实时监控和管理通信网络设备、维护人员和工作任…

如何在linux环境上部署单机ES(以8.12.2版本为例)

ES安装&#xff08;以8.12.2版本为例&#xff09; 首先创建好对应的文件夹然后在对应的文件夹下执行依次这些命令 1.wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.12.2-linux-x86_64.tar.gz 2.wget https://artifacts.elastic.co/downloads/…

【Flask】Flask项目结构初识

1.前提准备 Python版本 # python 3.8.0 # 查看Python版本 python --version 安装第三方 Flask pip install flask # 如果安装失败&#xff0c;可以使用 -i&#xff0c;指定使用国内镜像源 # 清华镜像源&#xff1a;https://pypi.tuna.tsinghua.edu.cn/simple/ 检查 Flask 是…

6个免费的ChatGPT网站

AI 大模型的出现给时代带来了深远的影响&#xff1a; 改变了产业格局&#xff1a;AI 大模型的发展推动了人工智能技术在各行业的广泛应用&#xff0c;改变了传统产业的运作方式&#xff0c;促进了新兴产业的崛起&#xff0c;如智能驾驶、医疗健康、金融科技等。提升了科学研究…

python的ITS 信息平台的设计与实现flask-django-nodejs-php

第二&#xff0c;陈列说明该系统实现所采用的架构、系统搭建采用的服务器、系统开发环境和使用的工具&#xff0c;以及系统后台采用的数据库。 最后&#xff0c;对系统进行全面测试&#xff0c;主要包括功能测试、查询性能测试、安全性能测试。 分析系统存在的不足以及将来改进…

haproxy 高可用

一 haproxy HAProxy简介 HAProxy提供高可用、负载均衡以及基于TCP和HTTP的应用代理&#xff0c;适合处理高负载站点的七层数据请求。类似的代理服务可以屏蔽内部真实服务器&#xff0c;防止内部服务器遭受攻击。 HAProxy特点和优点&#xff1a; 1.支持原声SSL,同时支持客户端和…

[LeetBook]【学习日记】排序算法——归并排序

主要思想 归并排序是一种分治算法&#xff0c;其排序过程包括分和治分是指将要排序的序列一分为二、二分为四&#xff0c;直到单个序列中只有一个数治是指在分完后&#xff0c;将每两个元素重新组合&#xff0c;四合为二、二合为一&#xff0c;最终完成排序 图片作者&#xf…

阿里云OSS分布式存储

目录 &#x1f9c2;1.OSS开通 &#x1f32d;2.头像上传整合OSS &#x1f68d;2.1.引入依赖 &#x1f68d;2.2添加配置 &#x1f68d;2.3创建配置类 &#x1f68d;2.4添加实现类 &#x1f68d;2.5controller调用接口 &#x1f68d;2.6postman测试 1.OSS开通 1.登…

Google XSS Game Level 6 通关方式

文章目录 链接&#xff1a;[Google XSS Game](#https://xss-game.appspot.com/)Level 6 - Follow the &#x1f407;思路1 &#xff08;当然&#xff0c;我使用这个方式没有成功&#xff0c;所以才来记录下&#xff09;解法2 【最简单的解法】需要注意的一个小问题 链接&#x…

Spring异步注解@Async线程池配置

系列文章目录 文章目录 系列文章目录前言前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 从Spring3开始提供了@Async注解,该注解可以被标注在方法上,以便异步地调…

基于“云”重构“百度云盘”

这一篇文章是和上一篇连着的哟&#xff01; # docker run -p 80:80 -d -v /data/owncloud/:/var/www/html owncloud 一、【安装完成】 二、【打开浏览器】 三、【回到这个熟悉的界面&#xff0c;掉。】 四、【上传文件】 试了可以看哇偶&#xff01;&#xff01;&#xff01…

Word为图表设置图注并在图表清单中自动生成

1如果需要自动插入题注&#xff0c;请不要自己为文件增加新的标题样式或删除自带的标题1样式 2章节大标题最好是标题1&#xff0c;2,3而不要设置标题一、二、三&#xff0c;否则图例在自动生成时会显示 图一 -1&#xff0c;调整起来会非常不方便 若实在要使用大写中文标题&…

Vue模块化开发步骤—遇到的问题—解决办法

目录 1.npm install webpack -g 2.npm install -g vue/cli-init 3.初始化vue项目 4.启动vue项目 Vscode初建Vue时几个需要注意的问题-CSDN博客 1.npm install webpack -g 全局安装webpack 直接命令提示符运行改指令会报错&#xff0c;operation not permitted 注意&#…

牛客NC101 压缩字符串(一)【简单 模拟 Java,Go,PHP】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/c43a0d72d29941c1b65c857d8ac9047e 思路 直接模拟参考答案Java import java.util.*;public class Solution {/*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值…

Node安装,nodejs详细安装步骤

什么是nodejs? 脚本语言需要一个解析器才能运行&#xff0c;JavaScript是脚本语言&#xff0c;在不同的位置有不一样的解析器&#xff0c;如写入html的js语言&#xff0c;浏览器是它的解析器角色。而对于需要独立运行的JS&#xff0c;nodejs就是一个解析器。 每一种解析器都是…

【基于skyent的热更思考】

基于skyent的热更思考 skynet-inject热更原理关键源码分析热更方式拓扑图注意事项 skynet-inject热更原理 inject是一个用于动态加载 Lua 代码文件并执行其中定义的函数的功能。可以在运行时动态加载 Lua 代码文件&#xff0c;然后调用其中定义的函数&#xff0c;通过修改模块…

欧科云链:2024将聚焦发展与安全,用技术助力链上数据安全和合规

近期&#xff0c;OpenAI和Web3.0两大新技术发展势头迅猛。OpenAI 再次引领AI领域的新浪潮&#xff0c;推出了创新的文本转视频模型——Sora&#xff0c;Sora 可以创建长达60 秒的视频&#xff0c;包含高度详细的场景、複杂的摄像机运动以及情感丰富角色&#xff0c;再次将AI 的…

Linux-生产者与消费者模型

文章目录 一、什么是生产者与消费者模型&#xff1f;二、示例模型示例模型介绍交易场所&#xff08;blockQueue&#xff09;消费者与生产者运行结果 总结 一、什么是生产者与消费者模型&#xff1f; 参照日常生活中&#xff0c;购买商品的人群可以被称之为消费者&#xff0c;生…