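Approach 1: set exclusive on the consumer
Caveats
- Applies to basic.consume
- Not supported on quorum queues
When two consumers, A and B, both call basic.consume on the same queue with exclusive set to true, the second consumer's call fails with an exception: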
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - queue 'test' in vhost '/' in exclusive use, class-id=60, method-id=20)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)
    at com.dms.rabbitmq.TopicSender.lambda$main$2(TopicSender.java:63)
    at java.base/java.lang.Thread.run(Thread.java:840)
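The refusal can be pictured with a small in-memory model. This is only a sketch of the broker-side rule, not the RabbitMQ client API: the class ExclusiveQueueModel and its methods are hypothetical names, and the model assumes that a request is refused either when another consumer already holds exclusive access or when exclusive access is requested while other consumers are registered.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of the exclusivity check; hypothetical, not the RabbitMQ API. */
class ExclusiveQueueModel {
    private final String name;
    private final List<String> consumerTags = new ArrayList<>();
    private boolean exclusiveHeld = false;

    ExclusiveQueueModel(String name) {
        this.name = name;
    }

    /** Registers a consumer, or throws if exclusivity would be violated. */
    synchronized void consume(String consumerTag, boolean exclusive) {
        // Refuse if someone already holds exclusive access, or if the caller
        // asks for exclusive access while other consumers are registered.
        if (exclusiveHeld || (exclusive && !consumerTags.isEmpty())) {
            throw new IllegalStateException(
                "ACCESS_REFUSED - queue '" + name + "' in exclusive use");
        }
        consumerTags.add(consumerTag);
        exclusiveHeld = exclusive;
    }

    /** Cancels a consumer; exclusivity is released once no consumer is left. */
    synchronized void cancel(String consumerTag) {
        consumerTags.remove(consumerTag);
        if (consumerTags.isEmpty()) {
            exclusiveHeld = false;
        }
    }

    synchronized int consumerCount() {
        return consumerTags.size();
    }
}
```

In this model, consumer B can only succeed after consumer A has been cancelled, which is why a client that wants to take over the queue has to keep retrying.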
How Spring AMQP uses exclusive to achieve ordered consumption:
Core logic:
while (!DirectMessageListenerContainer.this.started && isRunning()) {
    this.cancellationLock.reset();
    try {
        for (String queue : queueNames) {
            consumeFromQueue(queue);
        }
    }
    catch (AmqpConnectException | AmqpIOException e) {
        long nextBackOff = backOffExecution.nextBackOff();
        if (nextBackOff < 0 || e.getCause() instanceof AmqpApplicationContextClosedException) {
            DirectMessageListenerContainer.this.aborted = true;
            shutdown();
            this.logger.error("Failed to start container - fatal error or backOffs exhausted", e);
            this.taskScheduler.schedule(this::stop, Instant.now());
            break;
        }
        this.logger.error("Error creating consumer; retrying in " + nextBackOff, e);
        doShutdown();
        try {
            Thread.sleep(nextBackOff); // NOSONAR
        }
        catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        }
        continue; // initialization failed; try again having rested for backOff-interval
    }
    DirectMessageListenerContainer.this.started = true;
    DirectMessageListenerContainer.this.startedLatch.countDown();
}
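- After the exception is thrown, the container retries.
- The retry interval and the number of attempts are controlled by recoveryInterval (unlimited attempts by default) and recoveryBackOff.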
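The retry behaviour can be sketched in plain Java, independent of Spring. This is a simplified illustration, not Spring AMQP's implementation: the class RetryWithBackOff and the parameters intervalMillis and maxAttempts are hypothetical stand-ins for a fixed recovery interval and an attempt limit, with a negative maxAttempts meaning "retry forever".

```java
import java.util.function.Supplier;

/** Simplified fixed-interval retry loop; a sketch, not Spring AMQP code. */
class RetryWithBackOff {

    /**
     * Calls the action until it succeeds. A negative maxAttempts retries
     * forever; otherwise the last failure is rethrown after maxAttempts calls.
     */
    static <T> T retry(Supplier<T> action, long intervalMillis, int maxAttempts) {
        int attempt = 0;
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                attempt++;
                if (maxAttempts >= 0 && attempt >= maxAttempts) {
                    throw e; // attempts exhausted: give up
                }
                try {
                    Thread.sleep(intervalMillis); // rest before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
    }
}
```

With unlimited attempts, a standby consumer simply keeps polling until the exclusive consumer goes away, at which point its next attempt succeeds and it takes over the queue.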
Approach 2: single active consumer
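How it works: when a queue is declared with the x-single-active-consumer argument set to true, several consumers can register on it, but the broker delivers messages to only one of them at a time. If the active consumer is cancelled or its connection drops, the broker fails over to another registered consumer.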
Code example:
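// Requires java.util.HashMap, java.util.Map, and com.rabbitmq.client.Channel.
Channel ch = ...;
Map<String, Object> arguments = new HashMap<String, Object>();
// Enable single active consumer for this queue.
arguments.put("x-single-active-consumer", true);
// Arguments: queue name, durable, exclusive, autoDelete, arguments.
ch.queueDeclare("my-queue", false, false, false, arguments);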
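Unlike an exclusive consumer, additional consumers on a single-active-consumer queue are accepted rather than refused; they wait as standbys. A minimal in-memory sketch of that behaviour follows. The class SingleActiveConsumerModel is hypothetical and is not part of any RabbitMQ library, and the model assumes that fail-over picks the next consumer in registration order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of single active consumer fail-over; hypothetical, not the RabbitMQ API. */
class SingleActiveConsumerModel {
    // Consumers in registration order; the head of the deque is the active one.
    private final Deque<String> consumers = new ArrayDeque<>();

    /** Registration always succeeds; later consumers become standbys. */
    synchronized void register(String consumerTag) {
        consumers.addLast(consumerTag);
    }

    /** Removes a consumer; if it was active, the next one in line takes over. */
    synchronized void cancel(String consumerTag) {
        consumers.remove(consumerTag);
    }

    /** Returns the tag of the consumer that receives messages, or null if none. */
    synchronized String activeConsumer() {
        return consumers.peekFirst();
    }
}
```

Because only one consumer is active at any moment, messages are processed in queue order, and no client-side retry loop is needed for a standby to take over.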
Reference: https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams