RabbitMQ 是一款流行的开源消息代理软件,它实现了高级消息队列协议(AMQP)。在消息传递过程中,确保消息被正确处理是至关重要的。RabbitMQ 提供了多种机制来确保消息的可靠性,其中确认模式(Acknowledgements Mode)是一个关键特性。
什么是确认模式?
确认模式(Acknowledgements Mode)允许消费者在成功处理消息后显式地向 RabbitMQ 服务器发送确认信号(ack)。只有在收到确认信号后,RabbitMQ 服务器才会从队列中删除该消息。如果消费者未能发送确认信号(例如,由于消费者崩溃或网络故障),RabbitMQ 会认为消息尚未被处理,并在适当的时候重新发送消息。
RabbitMQ 提供了三种主要的确认模式:
- 手动确认(Manual Acknowledgement):消费者需要显式地发送确认信号。
- 自动确认(Automatic Acknowledgement):消息一旦被消费者接收,立即自动确认。
- 批量确认(Batch Acknowledgement):消费者可以一次确认多条消息。
为什么使用确认模式?
- 确保消息不丢失:即使消费者崩溃,消息也会重新发送。
- 提高可靠性:通过控制确认时机,可以更好地管理消息处理流程。
- 灵活性:可以根据不同的业务需求选择不同的确认模式。
Java 代码示例
下面是一个使用 Java 和 Spring AMQP 实现 RabbitMQ 确认模式的示例。
Maven 依赖
首先,在你的 pom.xml
文件中添加 Spring AMQP 依赖:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- 其他依赖 -->
</dependencies>
配置 RabbitMQ
在 application.properties
文件中配置 RabbitMQ 连接信息:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
配置类
配置一个队列、交换机和绑定:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RabbitMQConfig { public static final String QUEUE_NAME = "exampleQueue"; public static final String EXCHANGE_NAME = "exampleExchange"; public static final String ROUTING_KEY = "exampleRoutingKey"; @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); // 持久化队列 } @Bean public DirectExchange exchange() { return new DirectExchange(EXCHANGE_NAME); } @Bean public Binding binding(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); }
}
消息监听器
使用手动确认模式来监听队列:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener.ReturnCallback;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.listener.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import com.rabbitmq.client.Channel; @Configuration
public class ListenerConfig { @Autowired private ConnectionFactory connectionFactory; @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(jsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置为手动确认模式 return factory; } @Bean public MessageConverter jsonMessageConverter() { return new SimpleMessageConverter(); } @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, containerFactory = "rabbitListenerContainerFactory") public void listen(String message, Channel channel, org.springframework.amqp.core.Message rabbitMessage) throws Exception { try { // 处理消息 System.out.println("Received <" + message + ">"); // 发送确认信号 channel.basicAck(rabbitMessage.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 发送拒绝信号,并设置为重新入队(requeue) channel.basicNack(rabbitMessage.getMessageProperties().getDeliveryTag(), false, true); throw e; } }
}
发送消息
编写一个简单的控制器来发送消息到 RabbitMQ:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; @RestController
public class MessageController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String sendMessage(@RequestParam String message) { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message); return "Message sent: " + message; }
}
运行示例
- 启动 Spring Boot 应用程序。
- 使用浏览器或工具(如 Postman)访问
http://localhost:8080/send?message=HelloRabbitMQ
来发送消息。 - 观察控制台输出,确认消息已被接收并处理。
通过这种方式,你可以确保消息在成功处理后才会从队列中删除,从而提高了消息传递的可靠性。
新时代农民工