RabbitMQ的模式中,常用的模式有:简单模式,发布与订阅模式,工作模式,路由模式,主题模式。简单模式不太会运用到工作中,我们可以使用 RabbitMQ 的发布订阅模式,实现:
- 用户发布动态,其“粉丝”收到其发布动态的消息
- 用户下订单,库存模块、支付模块等收到消息并处理
- 等等
1. 创建RabbitMQ的生产者
创建一个springboot项目,项目创建idea的默认创建springboot项目
然后进行rabbitMq的整合过程
1.1 引入rabbitmq的jar包
在项目的pom.xml中引入rabbitmq的jar包,详情如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.3.12.RELEASE</version>
</dependency>
1.2 配置文件中添加配置
在项目的配置文件中添加rabbitmq的相关配置,配置详情如下:
server:port: 10001# rabbitMq 相关配置
spring:application:name: springboot-rabbitmq-s1rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: guestpassword: guest
guest是rabbitmq的默认密码,不需要重新设置,不过在生产中为了安全是需要改密码的
1.3 创建配置类
配置类用于将队列和交换机进行绑定,该操作也可以使用rabbitmq的管理界面操作,并不是一定需要的步骤。配置类详情如下:
package com.study.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author alen* @DATE 2022/6/7 23:50*/
@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_NAME = "fanout-order-exchange";public static final String SMS_QUEUE = "sms-fanout-queue";public static final String EMAIL_QUEUE = "email-fanout-queue";public static final String WECHAT_QUEUE = "wechat-fanout-queue";/*** 1.* 声明交换机* @return*/@Beanpublic FanoutExchange fanoutExchange() {/*** FanoutExchange的参数说明:* 1. 交换机名称* 2. 是否持久化 true:持久化,交换机一直保留, false:不持久化,用完就删除* 3. 是否自动删除 false:不自动删除, true:自动删除*/return new FanoutExchange(EXCHANGE_NAME, true, false);}/*** 2.* 声明队列* @return*/@Beanpublic Queue smsQueue() {/*** Queue构造函数参数说明* 1. 队列名* 2. 是否持久化 true:持久化, false:不持久化*/return new Queue(SMS_QUEUE, true);}@Beanpublic Queue emailQueue() {return new Queue(EMAIL_QUEUE, true);}@Beanpublic Queue wechatQueue() {return new Queue(WECHAT_QUEUE, true);}/*** 3.* 队列与交换机绑定*/@Beanpublic Binding smsBinding() {return BindingBuilder.bind(smsQueue()).to(fanoutExchange());}@Beanpublic Binding emailBinding() {return BindingBuilder.bind(emailQueue()).to(fanoutExchange());}@Beanpublic Binding wechatBinding() {return BindingBuilder.bind(wechatQueue()).to(fanoutExchange());}
}
1.4 模拟发送消息
创建一个service类,在类中进行rabbitMq消息的发送,源码如下:
package com.study.rabbitmq.service;import cn.hutool.json.JSONUtil;
import com.study.rabbitmq.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** @Author alen* @DATE 2022/6/7 23:31*/
@Service
@Slf4j
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createOrder(Order order) {String body = JSONUtil.toJsonStr(order);log.info("订单信息:{}", body);//交换机名称String exchangeName = "fanout-order-exchange";//路由key 由于我们实现的是fanout模式(广播模式),不需要路由key,所有的消费者都可以进行监听和消费String routeKey = "";//发送mq消息rabbitTemplate.convertAndSend(exchangeName, routeKey, body);log.info("rabbitmq发送广播模式消息成功。。。");}
}
使用单元测试模拟消息发送,单元测试详情如下:
package com.study.rabbitmq;import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.UUID;@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {@Autowiredprivate OrderService orderService;@Testvoid contextLoads() {for (long i = 1; i < 50; i++) {Order order = new Order();order.setRequestId(i);order.setUserId(i);order.setOrderNo(UUID.randomUUID().toString());order.setAmount(10L);order.setGoodsNum(1);order.setTotalAmount(10L);orderService.createOrder(order);}}
}
发送完后,我们可以在rabbitMq的管理后台看到已经发送成功的消息,效果如下:
可见消息已经全部发送完毕,因为前面的三个队列都是绑定在同一个交换机上,所以三个队列都会收到消息。
2. 创建RabbitMQ的消费者
创建消费者服务S2,项目结构参考生产者项目结构,然后进行消息消费的相关代码的实现,实现过程如下
2.1 引入RabbitMQ的jar包
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.3.12.RELEASE</version>
</dependency>
2.2 在项目配置文件中添加配置
配置详情如下
server:port: 10002# rabbitmq 相关配置
spring:application:name: springboot-rabbitmq-s2rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: adminpassword: admin
2.3 创建MQ消息消费者
消费者类详情如下
package com.study.rabbitmq.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author alen* @DATE 2022/6/8 8:15*/
@Slf4j
@Service
@RabbitListener(queues = {"email-fanout-queue"}) //监听队列
public class FanoutEmailConsumer {@RabbitHandlerpublic void emailMessage(String message) {log.info("Email fanout --接收到消息:{}", message);}
}
启动消费者项目,消费效果如下:
登录rabbitMq后台查看队列的消息情况如下
到此,似乎感觉整合得很顺利,没啥毛病。但是实际的运用中,以上演示过程中忽略了两个很重要的问题,一是我如何知道消息被顺利的发送到了队列,因为实际的工作中,不大可能每个消息都去rabbitmq管理后台查看。二是如果消息在消费的过程中出现了异常导致消息丢失,不重要的数据还好,如果是支付类的消息呢?就会产生严重的线上问题。那么这两个问题需要怎么处理呢?其实rabbitmq提供了消息发送结果回调和消息消费手动确认来处理这两个问题。