Mq介绍
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。
引入Pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
具体的模型。概念都不说了。直接看代码
控制器测试代码
package com.example.demoamqp.controller;import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import com.example.demoamqp.entity.Order;
import com.example.demoamqp.send.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;/*** @version 1.0.0* @className: TestSendMqController* @description: 消息发送者* @author: zhangjunfa* @date: 2023/6/16 11:05*/
@Slf4j
@RequestMapping
@RestController
public class TestSendMqController {private Sender sender;private FanoutSender fanoutSender;private TopicSender topicSender;private DeadSender deadSender;private DelayQueueSender delayQueueSender;private Delay2Sender delay2Sender;public TestSendMqController(Sender sender, FanoutSender fanoutSender, TopicSender topicSender, DeadSender deadSender, DelayQueueSender delayQueueSender, Delay2Sender delay2Sender) {this.sender = sender;this.fanoutSender = fanoutSender;this.topicSender = topicSender;this.deadSender = deadSender;this.delayQueueSender = delayQueueSender;this.delay2Sender = delay2Sender;}@PostMapping("/send")public Object send(@RequestParam(name = "param") String param) throws InterruptedException {Thread.sleep(3000);sender.send(param);return "success";}@PostMapping("/sendSimple")public Object sendSimple(@RequestParam(name = "orderName") String orderName) throws InterruptedException {Order order = new Order();order.setId(IdUtil.getSnowflakeNextId());order.setOrderName(orderName);order.setOrderNo(IdUtil.nanoId());order.setCreatedTime(DateUtil.date());sender.sendSimple(order);return "success";}@PostMapping("/sendCode")public Object sendCode() throws InterruptedException {int randomInt = RandomUtil.randomInt(100000, 999999);log.info("生产者生成了一个验证码:{}", randomInt);this.fanoutSender.sendCode(String.valueOf(randomInt));return "success";}@PostMapping("/sendTopic")public Object sendTopic(@RequestParam(name = "msg") String msg, @RequestParam(name = "routingKey") String routingKey) throws InterruptedException {this.topicSender.sendMsg(msg, routingKey);return "success";}/*** 延迟队列(死信)设计** @param msg* @return* @throws InterruptedException*/@PostMapping("/sendDead")public Object sendDead(@RequestParam(name = "msg") String msg) throws InterruptedException {this.deadSender.sendDelay(msg, 2000);return "success 我是死信队列";}/*** 延迟队列设计** @param msg* @return* @throws InterruptedException*/@PostMapping("/sendDelay")public Object sendDelay(@RequestParam(name = "msg") String msg) throws InterruptedException {this.delayQueueSender.sendMsg(msg);return "success 我是延迟队列";}/*** 延迟队列设计** @param msg* @return* @throws InterruptedException*/@PostMapping("/sendDelay2")public Object sendDelay2(@RequestParam(name = "msg") String msg,@RequestParam(name = "delayTime") Integer delayTime) throws InterruptedException {this.delay2Sender.sendDelay2(msg,delayTime);return "success 我是延迟队列";}
}
配置类代码
package com.example.demoamqp.config;import com.example.demoamqp.conatants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 创建消息队列并注入容器中** @author ross*/@Configuration
public class QueueConfig {/*** 创建队列** @return*/@Beanpublic Queue createQueue() {return new Queue("ross_amq");}/****************************** 发布、订阅者模式 *********************************/@Bean // 邮箱的队列public Queue mailQueue(){return new Queue(Constants.MQ_MAIL_QUEUE,Constants.durable,Constants.exclusive,Constants.autoDelete);}@Bean // 电话的队列public Queue phoneQueue(){return new Queue(Constants.MQ_PHONE_QUEUE,Constants.durable,Constants.exclusive,Constants.autoDelete);}@Bean // 交换机public FanoutExchange fanoutExchange(){return new FanoutExchange(Constants.MQ_FANOUT_EXCHANGE,Constants.durable,Constants.autoDelete);}/*** 邮箱绑定交换机* @return*/@Beanpublic Binding mailBinding(){return BindingBuilder.bind(mailQueue()).to(fanoutExchange());}/*** 电话绑定交换机* @return*/@Beanpublic Binding phoneBinding(){return BindingBuilder.bind(phoneQueue()).to(fanoutExchange());}/*----------------------------------------------------*/@Bean // A队列public Queue topicAQueue(){return new Queue(Constants.MQ_TOPIC_QUEUE_A,Constants.durable,Constants.exclusive,Constants.autoDelete);}/*** topic模式相关配置*/@Bean // B队列public Queue topicBQueue(){return new Queue(Constants.MQ_TOPIC_QUEUE_B,Constants.durable,Constants.exclusive,Constants.autoDelete);}@Bean // topic的交换机public TopicExchange topicMyExchange(){return new TopicExchange(Constants.MQ_TOPIC_EXCHANGE,Constants.durable,Constants.autoDelete);}@Beanpublic Binding topicAQueueBinding(){return BindingBuilder.bind(topicAQueue()).to(topicMyExchange()).with("topic.xxx"); // 规则 topic.xxx}@Beanpublic Binding topicBQueueBinding(){return BindingBuilder.bind(topicBQueue()).to(topicMyExchange()).with("topic.*"); // 规则 topic.xxx}
}
消费者
package com.example.demoamqp.receiver;import cn.hutool.core.date.DateUtil;
import com.example.demoamqp.conatants.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 死信队列消费者* @className: MqDelayReceiver* @projectName: demo-one* @auth: rosszhang* @date: 2023/12/28 16:53*/
@Slf4j
@Component
public class MqDelayReceiver {@RabbitListener(queues = Constants.MQ_DELAY_QUEUE)public void delayConsume(String msg) {log.debug("[消费者消费信息:{},时间:{}", msg, DateUtil.date());}}
发送者代码
package com.example.demoamqp.send;import cn.hutool.core.date.DateUtil;
import com.example.demoamqp.conatants.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 死信队列** @className: DeadSender* @projectName: demo-one* @auth: rosszhang* @date: 2023/12/28 16:49*/
@Slf4j
@Component
public class DeadSender {private AmqpTemplate rabbitAmqpTemplate;@Autowiredpublic void setRabbitAmqpTemplate(AmqpTemplate rabbitAmqpTemplate) {this.rabbitAmqpTemplate = rabbitAmqpTemplate;}/*** 死信队列** @param msg* @param delayTime*/public void sendDelay(String msg, int delayTime) {rabbitAmqpTemplate.convertAndSend(Constants.MQ_NORMAL_EXCHANGE,Constants.MQ_NORMAL_ROUTING_KEY,msg,process -> {process.getMessageProperties().setExpiration(String.valueOf(delayTime));return process;});log.debug("[生产者:]发送消息:{},时间{},延迟{}秒", msg, DateUtil.date(), delayTime / 1000);}
}
源码大家可以看我Gitte地址
Gitte仓库地址
下面这个是我的个人公共号 只会写Bug的程序猿,大家可以关注一下,一键三连。相互交流学习。