SpringBoot整合RabbitMQ,三种交换机类型示例
1、流程概括
引入AMQP依赖 → 配置RabbitMQ连接 → 声明队列、交换机及绑定关系 → 生产者通过RabbitTemplate发送消息 → 消费者通过@RabbitListener监听队列并处理消息。
2、引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
3、配置RabbitMQ连接
在application.properties或application.yml中配置RabbitMQ服务器的连接参数:
# RabbitMQ broker connection settings read by Spring Boot (spring.rabbitmq.*).
# Broker host name; this example targets a broker on the local machine.
spring.rabbitmq.host=localhost
# AMQP listener port of the broker.
spring.rabbitmq.port=5672
# Credentials used to open the connection.
# NOTE(review): guest/guest is RabbitMQ's built-in account and is normally restricted to
# localhost connections - use a dedicated user for any non-local broker.
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Virtual host to connect to; "/" is the broker's default virtual host.
spring.rabbitmq.virtual-host=/
4、不同交换机模式下的使用
4.1、DirectExchange(直连交换机)
消费者
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConsumer {//注册一个队列@Bean //启动多次为什么不报错?启动的时候,它会根据这个名称Direct_Q01先去查找有没有这个队列,如果有什么都不做,如果没有创建一个新的public Queue queue(){return QueueBuilder.durable("Direct_Q01").maxLength(100).build();}//注册交换机@Beanpublic DirectExchange exchange(){//1.启动的时候,它会根据这个名称Direct_E01先去查找有没有这个交换机,如果有什么都不做,如果没有创建一个新的return ExchangeBuilder.directExchange("Direct_E01").build();}//绑定交换机与队列关系@Beanpublic Binding binding(Queue queue,DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("RK01");}//启动一个消费者@RabbitListener(queues = "Direct_Q01")public void receiveMessage(String msg){System.out.println("收到消息:"+msg);}}
生产者
/**
 * Producer for the direct-exchange example: publishes messages to {@code Direct_E01}
 * using routing key {@code RK01}.
 */
@Service
public class DirectProvider {

    /** Exchange every message is published to. */
    private static final String EXCHANGE_NAME = "Direct_E01";

    /** Routing key attached to every published message. */
    private static final String ROUTING_KEY = "RK01";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts the given object to a message and publishes it to the direct exchange.
     *
     * @param message the payload to send
     */
    public void send(Object message) {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);
    }
}
测试
/**
 * Manual smoke test for the direct-exchange example: sends ten messages through
 * {@link DirectProvider}, one per second, then waits on standard input.
 */
@SpringBootTest
class TestApp {

    @Autowired
    private DirectProvider directProvider;

    /**
     * Publishes messages numbered 0 to 9, logging each send and pausing one second between them.
     *
     * @throws IOException if reading from standard input fails
     */
    @Test
    void test() throws IOException {
        int sent = 0;
        while (sent < 10) {
            directProvider.send("你好呀" + sent);
            System.out.println("发送成功" + sent);
            // Pause for 1000 ms between sends (ThreadUtil is an external helper; presumably Hutool).
            ThreadUtil.safeSleep(1000);
            sent++;
        }
        // Blocks until input arrives on stdin - presumably to keep the test JVM alive so the
        // listeners have time to print what they receive.
        System.in.read();
    }
}
一个交换机对多个队列的特点:在直连交换机下,如果多个队列使用相同的路由键绑定到同一个交换机,那么每个队列都会收到一份该消息的副本。
一个队列对多个消费者的特点:多个消费者竞争消费同一个队列中的消息,每条消息只会被其中一个消费者处理(默认以轮询方式分发)。
4.2、FanoutExchange(扇形/广播交换机)
消费者
/** Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.**/
package com.fpl.consumers;import com.fpl.model.OrderingOk;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** <p>Project: spring-rabbitmq - FanoutConsumer</p>* <p>Powered by fpl1116 On 2024-04-08 11:35:22</p>* <p>描述:<p>* @author penglei* @version 1.0* @since 1.8*/
@Configuration
public class FanoutConsumer {//注册一个队列@Beanpublic Queue fanoutQueue1(){return QueueBuilder.durable("Fanout_Q01").build();}//注册交换机@Beanpublic FanoutExchange fanoutExchange(){//1.启动的时候,它会根据这个名称Direct_E01先去查找有没有这个交换机,如果有什么都不做,如果没有创建一个新的return ExchangeBuilder.fanoutExchange("Fanout_E01").build();}@Bean //交换机与队列关系public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@RabbitListener(queues = "Fanout_Q01")public void receiveMessage(OrderingOk msg){System.out.println("FanoutConsumer1 消费者1 收到消息:"+msg);}@RabbitListener(queues = "Fanout_Q01")public void receiveMessage32(OrderingOk msg){System.out.println("FanoutConsumer1 消费者2 收到消息:"+msg);}}
生产者
/**
 * Producer for the fanout-exchange example: publishes messages to {@code Fanout_E01}
 * with an empty routing key.
 */
@Service
public class FanoutProvider {

    /** Exchange every message is published to. */
    private static final String EXCHANGE_NAME = "Fanout_E01";

    /** Routing key passed on every send; left empty in this example. */
    private static final String ROUTING_KEY = "";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts the given object to a message and publishes it to the fanout exchange.
     *
     * @param message the payload to send
     */
    public void send(Object message) {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);
    }
}
4.3、TopicExchange(主题交换机)
消费者
/** Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.**/
package com.fpl.consumers;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * <p>Project: spring-rabbitmq - TopicConsumer</p>
 * <p>Powered by fpl1116 On 2024-04-08 11:38:20</p>
 * <p>Description:</p>
 *
 * @author penglei
 * @version 1.0
 * @since 1.8
 */
@Configuration
public class TopicConsumer {

    // Topic-exchange example: one topic exchange, three durable queues bound with different
    // patterns, and one listener per queue.
    //
    // @Configuration is enabled here on purpose: with it commented out, none of the @Bean
    // declarations or @RabbitListener methods below are registered, so nothing would be
    // declared on the broker and nothing would be consumed.
    //
    // Pattern semantics: routing keys are split into words on '.'; '*' matches exactly one
    // word and '#' matches zero or more words.
    // NOTE(review): TopicProvider uses girl.getHeight() as the routing key. "1.6.*" only
    // matches three-word keys such as "1.6.5"; a key like "1.65" would match only "#".
    // Confirm the format of the height value.

    /**
     * Registers the topic exchange {@code Topic_E01}.
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange topicExchange() {
        return ExchangeBuilder.topicExchange("Topic_E01").build();
    }

    /**
     * Registers the first durable queue.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueue1() {
        return QueueBuilder.durable("小龙").build();
    }

    /**
     * Registers the second durable queue.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueue2() {
        return QueueBuilder.durable("小虎").build();
    }

    /**
     * Registers the third durable queue.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueue3() {
        return QueueBuilder.durable("小羊").build();
    }

    /**
     * Binds the first queue with pattern {@code #}, which matches every routing key.
     *
     * <p>The method name is left unchanged because it is also the default bean name.
     *
     * @param topicQueue1   the queue to bind
     * @param topicExchange the topic exchange
     * @return the binding definition
     */
    @Bean
    public Binding TopicBinding1(Queue topicQueue1, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with("#");
    }

    /**
     * Binds the second queue with pattern {@code 1.6.*}.
     *
     * @param topicQueue2   the queue to bind
     * @param topicExchange the topic exchange
     * @return the binding definition
     */
    @Bean
    public Binding TopicBinding2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("1.6.*");
    }

    /**
     * Binds the third queue with pattern {@code 1.8.*}.
     *
     * @param topicQueue3   the queue to bind
     * @param topicExchange the topic exchange
     * @return the binding definition
     */
    @Bean
    public Binding TopicBinding3(Queue topicQueue3, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue3).to(topicExchange).with("1.8.*");
    }

    /**
     * Consumer for the first queue; prints each received message.
     *
     * @param msg the message payload as text
     */
    @RabbitListener(queues = "小龙")
    public void receiveMessage(String msg) {
        System.out.println("小龙 收到消息:" + msg);
    }

    /**
     * Consumer for the second queue; prints each received message.
     *
     * @param msg the message payload as text
     */
    @RabbitListener(queues = "小虎")
    public void receiveMessage2(String msg) {
        System.out.println("小虎 收到消息:" + msg);
    }

    /**
     * Consumer for the third queue; prints each received message.
     *
     * @param msg the message payload as text
     */
    @RabbitListener(queues = "小羊")
    public void receiveMessage3(String msg) {
        System.out.println("小羊 收到消息:" + msg);
    }
}
生产者
/**
 * Producer for the topic-exchange example: publishes a {@code Girl} to {@code Topic_E01},
 * using the value of {@code girl.getHeight()} as the routing key.
 */
@Service
public class TopicProvider {

    /** Exchange every message is published to. */
    private static final String EXCHANGE_NAME = "Topic_E01";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts the given object to a message and publishes it to the topic exchange.
     *
     * @param girl the payload; its height doubles as the routing key
     */
    public void send(Girl girl) {
        rabbitTemplate.convertAndSend(
                EXCHANGE_NAME,
                girl.getHeight(),
                girl);
    }
}