目录
一、基本使用
二、使用注意事项
1. 生产者重连机制 - 保证mq服务是通的
2. 生产者确认机制 - 回调机制
3. MQ的可靠性
4. Lazy Queue模式
5. 消费者确认机制
一、基本使用
部署完RabbitMQ有两种使用方式:
- 网页客户端
- Java代码
MQ组成部分:
- 虚拟主机 -> 进行数据隔离的,好比mysql中的不同数据库
- 交换机 -> 进行路由转发消息
- fanout:最普通 相当于广播
- topic:可依key绑定不同队列,可使用.分割key,并支持模糊匹配
- direct:可依key绑定不同队列,不支持模糊匹配
- 队列 -> 接受交换机转发过来的消息
Java中使用(注解开发):
1.引依赖和配置属性 - 版本选个差不多的 我是继承的springboot的
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
spring:rabbitmq:host: 192.168.189.189port: 5672virtual-host: hmallVirusername: hmallpassword: 123321
2.使用RabbitTemplate发消息
@SpringBootTest
public class TestSendMs {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid testSend1(){String queueName = "hmallQueue1";String ms = "hello";rabbitTemplate.convertAndSend(queueName, ms);}}
3.使用RabbitListerner接受消息
@Component
public class MyListeners {@RabbitListener(queues = "hmallQueue1")public void listenerMs1(String ms) throws InterruptedException {System.out.println("消费者1接收到消息 -> " + ms);Thread.sleep(20);}}
4.声明队列和交换机的方式二(简单常用)
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "hmallQueue1"),exchange = @Exchange(name = "hmall", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者1接收到消息 -> " + msg);}
轮询接受消息问题:
队列读取消息时使用轮询机制,每个队列都读取相同的消息数量,这样不好,我们要针对队列处理消息的能力,需要在配置文件设置属性fetch
spring:rabbitmq:host: 192.168.189.189port: 5672virtual-host: hmallVirusername: hmallpassword: 123321listener:simple:prefetch: 1 # 每次处理完消息再获取新的消息 性能越好处理消息越多
扩展消息转换器:
默认的消息转换器是直接将对象序列化为Byte[],即读不懂又不安全还占内存
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>
@Beanpublic MessageConverter jacksonMessageConverter(){return new Jackson2JsonMessageConverter();}
二、使用注意事项
1. 生产者重连机制 - 保证mq服务是通的
连接重试,注意这里是阻塞式的,意味着连接失败会一直重试其他业务不会执行,所以建议禁用此模式。
spring:rabbitmq:host: 192.168.189.189port: 5672virtual-host: hmallVirusername: hmallpassword: 123321connection-timeout: 1s # 连接失败重试template:retry:enabled: true # 开启重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待市场倍数max-attempts: 3 # 最大重试次数
2. 生产者确认机制 - 回调机制
SpringAMQP生产者确认机制的返回值情况?
- ack 消息成功投递到mq
- 交换机收到了 没有路由转发到队列 ack + return路由异常
- 交换机收到并路由成功 只ack
- nack 消息丢失 投递失败
3. MQ的可靠性
服务一旦挂了消息就都没有了,还有就是内存如果满了,会触发阻塞式的强制持久化操作,这会导致这段时间处理消息的能力为0
- 设置消息的持久化属性:需要将
delivery_mode
属性设置为2。这个设置会将消息标记为持久化,确保消息被写入磁盘而不是仅保存在内存中。 - 声明持久化的队列:在RabbitMQ中声明队列时,需要将
durable
属性设置为true。这样,队列的元数据以及队列中的消息都会被持久化到磁盘上。需要注意的是,即使队列被设置为持久化,也不能保证内部所存储的消息不会丢失,因为RabbitMQ默认在消息被消费后立即删除它。要确保消息不被删除,需要在消费时进行相应的设置。 - 声明持久化的交换机:与队列类似,交换机也可以被设置为持久化。在声明交换机时,将
durable
属性设置为true,就可以将交换机标记为持久化。如果交换机不设置持久化,那么在RabbitMQ服务重启之后,相关的交换机元数据会丢失,但已发送的消息不会丢失,只是不能再将新的消息发送到这个交换机中。
4. Lazy Queue模式
- 降低内存压力:在高负载情况下,大量消息堆积可能导致内存压力剧增。使用Lazy Queue模式,消息直接写入磁盘,可以有效降低内存使用,避免内存溢出等问题。
- 支持大量消息存储:由于消息存储在磁盘上,Lazy Queue模式可以支持数百万条消息的存储,满足大规模消息处理的需求。
- 提高系统稳定性:通过将消息持久化到磁盘,Lazy Queue模式增强了消息的可靠性,即使RabbitMQ服务器发生故障或重启,消息也不会丢失,保证了系统的稳定性和数据的完整性。
5. 消费者确认机制
这里就是模拟事务嘛,利用AOP思想