点击关注公众号,Java干货及时送达👇
来源:juejin.cn/post/7173571716294115336
What is rabbitMQ ?
RabbitMQ的消息模型
Why use rabbitMQ ?
异步
解耦
削峰
How to use rabbitMQ ?
基操安装方式
功能实现
另外的安装方式(建议使用这种)
实现延迟消息
What is rabbitMQ ?
❝
RabbitMQ 是一个由 Erlang 语言开发的 「AMQP」 (高级消息队列协议) 的开源实现。
RabbitMQ 是「轻量级且易于部署」 的,能支持多种消息协议。
RabbitMQ 可以部署在分布式和联合配置中,以满足「高规模、高可用性」 的需求。
❞
具体特点包括:
「可靠性」 (Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布 确认。
「灵活的路由」 (Flexible Routing):在消息进入队列之前,通过 Exchange 来路由消息的。对 于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
「消息集群」 (Clustering):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker。
「高可用」 (Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节 点出问题的情况下队列仍然可用。
「多种协议」 (Multi-protocol):RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT等等。
「多语言客户端」 (Many Clients):RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、 Ruby 等等。
「管理界面」 (Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控 和管理消息 Broker 的许多方面。
「跟踪机制」 (Tracing):如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生 了什么。
「插件机制」 (Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编 写自己的插件。
RabbitMQ的消息模型
RabbitMQ的消息模型
![图片](
Why use rabbitMQ ?
下面,我以一个(花店)商家的角色来向大家形象地举例:
异步
❝
之前顾客来店里下单,我会叫顾客等一下,同时叫店员准备订单,准备好送到顾客手上了顾客「才能离开」
❞
现在顾客打电话给我:"我要买xxx,地址是:xxx,你帮我送一下"
我拿个小本子记下:顾客a,电话:xxx,地址:xxx
店员「有空」 后就会准备订单并配送
解耦
❝
以前有新订单时,我会亲自找「每一个店员」 (负责准备花束的,负责记账的,负责送花的等),告诉他们有新订单了,有空了处理一下
如果有店员入职,我通知的时候会多「通知」 一个人;离职时,少「通知」 一个人(维护一个需要通知的人员列表)
❞
现在,有新订单的时候,我只需要「记到小本子上」 ,店员有空了自己来看
削峰
❝
去年七夕,很多电话打给我,我把每一个订单告诉店员,但是店员忙不过来,客户又一直打电话来催,最后店员累成狗直接罢工
❞
今年七夕节我学乖了,电话打进来我会告诉顾客:"我知道了,「会尽快安排处理」 ",然后记到小本子上就行,店员有空时「按顺序来处理订单」 就好
另外还有一种思路,引导客户不一定要在七夕才开始下单,可以 「提前」先买(淘宝的双十一预售就是出于这样的削峰思路)
以上是rabbitMQ解决的「核心」 问题。
How to use rabbitMQ ?
基操安装方式
MAC端
brew install rabbitmq
Windows端
安装Erlang,下载地址:
http://erlang.org/download/otp_win64_21.3.exe
安装Erlang
安装RabbitMQ,下载地址:
https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.14/rabbitmq-server-3.7.14.exe
安装RabbitMQ
安装完成后,进入RabbitMQ安装目录下的sbin目录
RabbitMQ
在地址栏输入cmd并回车启动命令行,然后输入以下命令启动管理功能:
rabbitmq-plugins enable rabbitmq_management
RabbitMQ
访问地址查看是否安装成功:
http://localhost:15672/
RabbitMQ
CentOS端
安装erlang
# rabbitmq依赖erlang 需要自己去自行下载
cd /path/to/erlang-sound-code && ./configure --prefix=/usr/local/erlang
make && make install vim /etc/profile
# 添加
export PATH=$PATH:/usr/local/erlang/binsource /etc/profile
# 输入erl,会出现版本信息,即安装成功
安装rabbitmq
#下载 abbitmq_server-3.8.16 并移动到/usr/local/下
vim /etc/profile# 添加
export PATH=$PATH:/usr/local/rabbitmq_server-3.8.16/sbin
source /etc/profilecd /usr/local/rabbitmq_server-3.8.16/sbin
# 启动
./rabbitmq-server start
功能实现
❝
RabbitMQ实现延迟消息的方式有两种,一种是使用
死信队列
实现,另一种是使用延迟插件
实现。死信队列的实现网上较多,本文介绍更简单的,使用
延迟插件
实现(mac环境,java版本)。❞
另外的安装方式(建议使用这种)
首先准备需要用到的安装文件及插件(rabbitmq_delayed_message_exchange),版本需要匹配,不匹配的版本可能装不上或导致兼容问题。
rabbitmq_delayed_message_exchange
本人使用的erl_25.0和rabbitMQ-3.10.0(可以到官网下载或者私信作者获取)。使用这种方式安装的优点在于本地安装和服务器安装流程完全一致,不过服务器需要开放安全端口5672,15672视情况,一般建议测试环境开放,生产环境关闭。
安装erl和rabbitMQ,具体步骤略(这个应该没人不会吧,逃~)。
将插件文件复制到RabbitMQ安装目录的plugins
目录下,执行以下命令后重启rabbitMQ:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
实现延迟消息
❝
以一个实际业务场景举例:当客服状态为在线且3分钟未回复客户消息时,自动重启im会话机器人接管会话。这是一个常见的延迟消息使用场景。
❞
首先在pom.xml
文件中添加AMQP
相关依赖
<!--消息队列相关依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml
添加RabbitMQ的相关配置
spring:rabbitmq:host: localhost # rabbitmq的连接地址port: 5672 # rabbitmq的连接端口号virtual-host: /mall # rabbitmq的虚拟hostusername: im # rabbitmq的用户名password: xxxxxx # rabbitmq的密码publisher-confirms: true #如果对异步消息需要回调必须设置为true
接下来创建RabbitMQ的java配置,主要用于配置交换机、队列和绑定关系
/*** 消息队列配置*/
@Configuration
public class RabbitMqConfig {/*** 机器人消息重启插件消息队列所绑定的交换机*/@BeanCustomExchange chatPluginDirect() {//创建一个自定义交换机,可以发送延迟消息Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(), "x-delayed-message", true, false, args);}/*** 机器人消息重启插件队列*/@Beanpublic Queue chatPluginQueue() {return new Queue(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getName());}/*** 将机器人消息重启插件队列绑定到交换机*/@Beanpublic Binding chatPluginBinding(CustomExchange chatPluginDirect, Queue chatPluginQueue) {return BindingBuilder.bind(chatPluginQueue).to(chatPluginDirect).with(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey()).noargs();}
}
创建一个消息的发出者,通过给消息设置x-delay
头来设置消息从交换机发送到队列的延迟时间
/*** 机器人重启队列发出者*/
@Component
@Slf4j
public class ChatQueueSender {private static Logger LOGGER = LoggerFactory.getLogger(ChatQueueSender.class);@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessageToChat(Long cmid, final long delayTimes) {//给延迟队列发送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(), QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey(), cmid, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//给消息设置延迟毫秒值message.getMessageProperties().setHeader("x-delay", delayTimes);return message;}});}
}
创建一个消息的接收者,用于处理延迟插件队列中的消息。
/*** 机器人重启队列处理者*/
@Component
@Slf4j
@RabbitListener(queues = "im.chat.cancel")
public class ChatQueueReceiver {@Autowiredprivate ChatRestartRobotService chatRestartRobotService;@RabbitHandlerpublic void handleOnChat(Long cmid) {
// log.info("机器人会话重启");chatRestartRobotService.restartRobot(cmid);}
}
最后,在对应的地方调用即可:
延迟插件队列
done !
热门内容:
因ChatGPT爆红的大数据培养计划|拿不到年薪25W全额退款用上这几个开源管理系统做项目,领导看了直呼专业!
ChatGPT 连夜迭代:你老婆不好使了
动态可监控线程池,你还没用起来吗?
最近面试BAT,整理一份面试资料《Java面试BAT通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。
获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。
明天见(。・ω・。)