RabbitMQ消息队列快速入门
初始MQ
MQ全称为Message Queue,即消息队列,是在消息的传输过程中保存消息的容器。它是典型的生产者-消费者模型。
生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。消息的生产和消费都是异步的,可以解耦发送者和接收者之间的通信,提高系统的可扩展性和可靠性。
技术选型
目比较常见的MQ实现有:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
RabbitMQ
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
Messaging that just works — RabbitMQ
安装
基于Docker来安装RabbitMQ
docker pull rabbitmq
运行
docker run \-e RABBITMQ_DEFAULT_USER=daybreak \-e RABBITMQ_DEFAULT_PASS=123456 \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq
在安装命令中有两个映射的端口:
- 15672:RabbitMQ提供的管理控制台的端口
- 5672:RabbitMQ的消息发送处理接口
运行成功后,访问http://ip:15672,输入username和password即可进入管理控制台。
RabbitMQ架构
- publisher:生产者,也就是发送消息的一方
- consumer:消费者,也就是消费消息的一方
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
- virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
Spring AMQP
由于RabbitMQ
采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ
交互。并且RabbitMQ
官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:Spring AMQP。
SpringAmqp的官方地址:Spring AMQP
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
交换机
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
声明队列和交换机
基于Bean方式声明
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfiguration {/*** 声明交换机* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("daybreak.fanout");}/*** 声明队列* @return*/@Beanpublic Queue fanoutQueue(){return new Queue("fanout.queue");}/*** 绑定队列和交换机* @param fanoutQueue3* @param fanoutExchange* @return*/@Beanpublic Binding FanoutBinding3(Queue fanoutQueue, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}
}
基于注解声明
声明Direct模式的交换机和队列:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue", durable = "true"),exchange = @Exchange(name = "daybreak.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue(String msg){System.out.println("消费者收到了direct.queue的消息:" + msg);
}
声明Topic模式的交换机和队列:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue"),exchange = @Exchange(name = "daybreak.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue(String msg){System.out.println("消费者接收到topic.queue的消息:【" + msg + "】");
}
快速入门
导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置
在application.yml中添加配置:
spring:rabbitmq:host: 192.168.200.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: daybreak # 用户名password: 123456 # 密码
配置JSON转换器
Spring的消息发送代码接收的消息体是一个Object:
在数据传输时,它会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
然而默认情况下Spring采用的序列化方式是JDK序列化。
JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
使用JSON方式序列化需要引入以下依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
配置消息转换器,在服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
接收端
package com.itheima.consumer.listeners;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MyListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "daybreak.queue", durable = "true"),exchange = @Exchange(name = "daybreak.direct", type = ExchangeTypes.DIRECT),key = "demo"))public void listenDirectQueue(String msg){System.out.println("消费者收到了direct.queue的消息:" + msg);}
}
发送端
package com.itheima.publisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class MyPublisher {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void myTest(){String exchangeName = "daybreak.direct";String routingKey = "demo";String msg = "Hello,RabbitMQ";rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);}
}
启动发送端和接收端后,运行结果如下:
消费者收到了direct.queue的消息:Hello,RabbitMQ