服务异步通讯——springcloud
文章目录
- 服务异步通讯——springcloud
- 初始MQ
- RabbitMQ快速入门
- 单机部署
- 1.1.下载镜像
- 安装MQ
- SpringAMQP
- work Queue 工作队列
- Fanout Exchange广播模式
- DirectExchange路由模式
- TopicExchange话题模式
- 消息转换器
初始MQ
RabbitMQ快速入门
官网https://www.rabbitmq.com/
单机部署
我们在Centos7虚拟机中使用Docker来安装。
1.1.下载镜像
方式一:在线拉取
docker pull rabbitmq:3-management
方式二:从本地加载
在课前资料已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar
安装MQ
执行下面的命令来运行MQ容器:
docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
SpringAMQP
https://spring.io/projects/spring-amqp/
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
spring:rabbitmq:host: 192.168.10.88 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue(){String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}
}
package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者接受到消息:[" + msg + "]");}
}
work Queue 工作队列
Fanout Exchange广播模式
@Configuration
public class FanoutConfig {// itcast.fanout@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}// fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}// 绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){System.out.println("消费者接受到fanoutQueue1 消息:[" + msg + "]");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg){System.out.println("消费者接受到fanoutQueue2 消息:[" + msg + "]");}
@Testpublic void testSendFanoutExchange(){//交换机名称String exchanneName = "itcast.fanout";//消息String message = "hello every one!";// 发送消息rabbitTemplate.convertAndSend(exchanneName,"",message);}
DirectExchange路由模式
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void LinstenDirectQueue1(String msg){System.out.println("消费者接受到directQueue1 消息:[" + msg + "]");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void LinstenDirectQueue2(String msg){System.out.println("消费者接受到directQueue2 消息:[" + msg + "]");}
@Testpublic void testSendDirectExchange(){//交换机名称String exchanneName = "itcast.direct";//消息String message = "hello,red";// 发送消息rabbitTemplate.convertAndSend(exchanneName,"red",message);}
TopicExchange话题模式
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void LinstenTopicQueue1(String msg){System.out.println("消费者接受到topicQueue1消息:[" + msg + "]");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void LinstenTopicQueue2(String msg){System.out.println("消费者接受到topicQueue2消息:[" + msg + "]");}
@Testpublic void testSendTopicExchange(){//交换机名称String exchanneName = "itcast.topic";//消息String message = "小米汽车倒闭了";// 发送消息rabbitTemplate.convertAndSend(exchanneName,"china.news",message);}
消息转换器
@Testpublic void testSendOnjectQueue() {Map<String, Object> msg = new HashMap<>();msg.put("name","柳岩");msg.put("age",18);rabbitTemplate.convertAndSend("object.queue",msg);}
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
@RabbitListener(queues = "object.queue")public void ListenObjectQueue(Map<String,Object> msg){System.out.println("接收到object.queue的消息:" + msg);}