学习材料
2024最新SpringCloud微服务开发与实战,java黑马商城项目微服务实战开发(涵盖MybatisPlus、Docker、MQ、ES、Redis高级等)
复习MQ,学过的,应该会轻松一点吧。
RabbitMQ
交换机没有存储功能,必须和队列进行绑定。
1.虚拟主机:数据隔离
Java客户端
1.Work Queues
2.Fanout,Direct,Topic交换机
队列只能发送给一个消费者,有了交换机就可以实现多个消费者同时消费一个消息。
3.如何在java代码中声明队列和交换机
有给定的Queue,Exchange和Binder类或者接口。可以基于注解实现(注解位置是使用消息的地方),或者编写配置类(Config包下)。
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
4.消息转化器
放在启动类即可。
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
5.动手完成
怎么在common包下定义一个rabbitTemple实例,包含addBeforePublishPostProcessors() 和 addAfterReceivePostProcessors()。
有点难办,目前卡在怎么创建rabbiteTemplate哈哈哈
MQ高级部分
1.发送者的可靠性:发送者连接重试,发送者确认。
发送者重连就是配置那里定义一下就好,启动重试机制,重试时间间隔次数等等。
发送者确认就比较麻烦了。需要配置,还需要指定确认机制。(极少数情况下会发生,但不推荐使用。)
2.MQ的可靠性:数据持久化,LazyQueue。
主要还是MQ宕机不丢失消息最重要。
为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
- 支持数百万条的消息存储
3.消费者的可靠性:消费者确认机制,失败重试,失败处理,业务幂等,兜底方案。
消费者确认机制最简单的方法就是在配置文件中配置auto就好了。
本地失败重试。
经过本地失败重试后,该怎么处理,1.reject,直接丢弃,2.nack,消息重新入队,3.失败消息发送到指定交换机。
业务幂等–》消息不重复消费(处理方案:消息ID唯一或者业务逻辑上处理)。
兜底方案就是,业务定时主动去查询数据,以确定一致性。
4.延迟消息
实现方案:
- 死信交换机+TTL
- 延迟消息插件
作业
配置很简单,但是其中的逻辑,helper和消息转换器等等怎么处理不是很懂。需要再深入学习。