文章目录 镜像 docker-compose.yml 访问控制台 Spring Boot 批量声明队列
镜像
https://hub.docker.com/_/rabbitmq
docker pull rabbitmq:management
docker pull rabbitmq:4.0.7-management
docker-compose.yml
services:
  rabbitmq:
    image: rabbitmq:3.9.5-management
    container_name: rabbitmq
    restart: always
    network_mode: "host"
    volumes:
      - /etc/localtime:/etc/localtime
      - ./rabbitmq/data:/var/lib/rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: Test123
      RABBITMQ_DEFAULT_VHOST: /test
docker-compose up -d rabbitmq
访问控制台
http://IP地址:15672/ 账号/密码:admin/Test123
Spring Boot
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  rabbitmq:
    host: 192.168.0.140
    port: 5672
    username: admin
    password: Test123
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: auto
        retry:
          enabled: true
          max-attempts: 5
          initial-interval: 5000
批量声明队列
/**
 * Names of the RabbitMQ queues used by the application.
 *
 * <p>Holds constants only, so the class is final and cannot be instantiated.
 * Every {@code static final String} field declared here is treated as a queue name
 * by code that enumerates this class's fields reflectively, so only queue names
 * should be added.
 */
public final class MQConstants {

    public static final String TEST_QUEUE_1 = "test-queue-1";

    public static final String TEST_QUEUE_2 = "test-queue-2";

    /** Not instantiable: this class is a namespace for constants. */
    private MQConstants() {
    }
}
import org. springframework. amqp. core. Queue ;
import org. springframework. amqp. rabbit. connection. ConnectionFactory ;
import org. springframework. amqp. rabbit. core. RabbitAdmin ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ; import java. lang. reflect. Field ;
import java. lang. reflect. Modifier ;
import java. util. ArrayList ;
import java. util. List ; @Configuration
public class RabbitMQConfig {

    // Declares one queue on the broker for every String constant found in MQConstants,
    // so adding a constant there is enough to get the queue created.

    /**
     * Creates the {@link RabbitAdmin} used to declare queues.
     *
     * @param connectionFactory connection factory the admin is built on
     * @return a new admin bound to {@code connectionFactory}
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Builds a durable {@link Queue} for each queue name in {@link MQConstants} and
     * declares every one of them through the given admin.
     *
     * @param rabbitAdmin admin used to declare the queues
     * @return the declared queues, in the order their constants are returned by reflection
     * @throws IllegalStateException if a queue-name constant is null or cannot be read
     */
    @Bean
    public List<Queue> createQueues(RabbitAdmin rabbitAdmin) {
        List<String> queueNames = getConstantsValueList(MQConstants.class);
        List<Queue> queues = new ArrayList<>(queueNames.size());
        for (String queueName : queueNames) {
            // Second constructor argument marks the queue as durable.
            queues.add(new Queue(queueName, true));
        }
        queues.forEach(rabbitAdmin::declareQueue);
        return queues;
    }

    /**
     * Collects the values of all {@code static final String} fields declared directly on
     * {@code clazz}.
     *
     * <p>Fields of any other type are ignored so that unrelated or compiler-generated
     * constants (for example the synthetic {@code $assertionsDisabled} flag) are never
     * mistaken for queue names.
     *
     * @param clazz class whose declared constants are read
     * @return the constant values, in the order returned by {@link Class#getDeclaredFields()}
     * @throws IllegalStateException if a matching constant is null or is not accessible
     *     from this class; failing here avoids silently leaving a queue undeclared
     */
    private List<String> getConstantsValueList(Class<?> clazz) {
        Field[] fields = clazz.getDeclaredFields();
        List<String> values = new ArrayList<>(fields.length);
        for (Field field : fields) {
            int modifiers = field.getModifiers();
            boolean isConstant = Modifier.isStatic(modifiers) && Modifier.isFinal(modifiers);
            if (!isConstant || field.getType() != String.class) {
                continue;
            }
            String qualifiedName = clazz.getName() + "." + field.getName();
            try {
                // Static field: no instance is needed, hence the null receiver.
                Object value = field.get(null);
                if (value == null) {
                    throw new IllegalStateException("Constant " + qualifiedName + " is null");
                }
                values.add((String) value);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read constant " + qualifiedName, e);
            }
        }
        return values;
    }
}
public Test { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test ( ) { rabbitTemplate. convertAndSend ( MQConstants . TEST_QUEUE_1 , "test" ) ; }
}
import com. rabbitmq. client. Channel ;
import org. springframework. amqp. core. Message ;
import org. springframework. amqp. rabbit. annotation. RabbitListener ;
import org. springframework. amqp. rabbit. listener. api. ChannelAwareMessageListener ;
import org. springframework. stereotype. Component ; @Component
public class TestConsumer implements ChannelAwareMessageListener {

    // Demo consumer for MQConstants.TEST_QUEUE_1: prints each message and fails on purpose
    // when the payload is exactly "error".

    /**
     * Handles one message from {@link MQConstants#TEST_QUEUE_1}.
     *
     * <p>The body is decoded as UTF-8 text and printed. A body equal to {@code "error"}
     * simulates a processing failure: the exception is logged and rethrown so the caller
     * sees the failure instead of a normal return.
     *
     * @param message the received message; its body is read as UTF-8 text
     * @param channel the channel the message arrived on (not used here)
     * @throws Exception when the body equals {@code "error"}
     */
    @RabbitListener(queues = MQConstants.TEST_QUEUE_1, concurrency = "10")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Decode with an explicit charset; the no-charset String constructor would depend
        // on the platform default and can garble non-ASCII payloads.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(" [x] Received: " + body);
        try {
            if ("error".equals(body)) {
                throw new Exception("处理失败");
            }
            System.out.println(" [✔] Done");
        } catch (Exception e) {
            System.out.println(" [✖] Error: " + e.getMessage());
            // Propagate so the failure is not swallowed by this handler.
            throw e;
        }
    }
}