消息何去何从
mandatory
和immediate
是channel.basicPublish方法中的两个参数,他们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。RabbitMQ提供的备份交换器可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。
mandatory参数
当mandatory参数为true时,交换器无法根据自身的类型和路由键找到符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当参数为false时,出现上述情形,则消息直接丢弃。
生产者通过调用channel.addReturnListener来添加ReturnListener监听器实现。
代码示例:
从AMQP协议层面来说,其对应的流转过程如下
immediate参数(已弃用)
当immediate参数为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回生产者。
总结
mandatory参数:告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。
immediate参数:告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列等待消费者了。
备份交换器(AE)
生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由器的情况下将会丢失;如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者代码将会变得复杂。使用 备份交换器,可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息
。
声明备份交换器的方式:
上面的代码中声明了两个交换器normalExchange和myAe,分别绑定了normalQueue和unroutedQueue。同时,将myAe设置为normalExchange的备份交换器。注意,myAe的交换器类型为fanout
。
如果此时发送一条消息到normalExchange上,当路由键等于“normalKey”的时候,消息能正确路由到normalQueue这个队列中,如果路由键设为其他值,比如“errorKey”,即消息不能被正确的路由到normalExchange绑定的任何队列上,此时就会发送给myAe,进而发送到unroutedQueue这个队列。
注意以下几种情况:
- 如果设置的备份交换器不存在,客户端和服务端都不会有异常,此时消息会丢失。
- 如果备份交换器没有绑定任何队列,客户端和服务端都不会有异常,此时消息会丢失。
- 如果备份交换器没有任何匹配的队列,客户端和服务端都不会有异常,此时消息会丢失。
- 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。
消息过期时间(TTL)
TTL,过期时间,RabbitMQ可以对消息和队列设置过期时间。
设置消息的TTL
设置消息的TTL有两种方法,如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。
-
- 通过队列属性设置:队列中所有消息都有相同的过期时间。
-
- 对消息本身进行单独设置:每条消息的过期时间都不同。
消息在队列中的生存时间一旦超过设置的TTL时,就会变成 “死信”
,消费者将无法再收到该消息。
1. 通过队列属性设置消息TTL
在channel.queueDeclare方法中加入x-message-ttl参数实现,参数单位是毫秒。
如果不设置TTL,表示消息不会过期,如果TTL设置为0,表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。
2. 设置单条消息的TTL
在channel.basicPublish方法中加入expiration属性,单位是ms。
对于设置队列属性的消息,一旦过期,就会从队列中抹去。而第二种方法,即使消息过期,也不会马上从队列中抹去,因为这条消息是否过期是在即将投递到消费者之前判定的。
设置队列的TTL
设置队列的TTL,可以控制队列被自动删除前处于未使用状态的时间。未使用表明该队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过Basic.Get命令。
RabbitMQ会确保在过期时间到达后将队列删除,但是不保证删除的动作有多及时,在MQ重启后,持久化队列的过期时间会被重新计算。
示例代码:
Map<String,Object> args = new HashMap<>();
args.put("x-expires",180000); //单位ms
channel.queueDeclare("myqueue",false,false,false,args);
三种特殊消息队列
1. 死信队列
DLX,死信交换机,也称为死信邮箱。当消息在一个队列中变成死信之后,他能被重新发送到另一个交换器中,这个交换器就是DLX。绑定DLX的队列称为死信队列。
消息变成死信一般由于以下几种情况:
- 消息者拒绝消费,并且设置requeue参数为false。
- 消息过期
- 队列到达最大长度。
死信队列,实际上就是设置某个队列的属性。当这个队列中存在死信时,MQ就会自动的将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息已进行相应的处理。这个特性与将该消息的TTL设置为0配合使用可以弥补immedaite参数的功能。
声明死信队列的方式
通过在channel.queueDeclare方法中设置x-dead-letter-exchange参数来为这个队列添加DLX。
channel.exchangeDeclare("dlx-exchange","direct");
Map<String,Object> args = new HashMap<>();
args.put("x-dead-letter-exchange","dlx_exchange");
//为队列添加DLX
channel.queueDeclare("myqueue",false,false,false,args);
创建队列,并且为其设置TTL和DLX
channel.exchangeDeclare("exchange.dlx","direct",true);
channel.exchangeDeclare("exchange.normal","fanout",true);
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl",10000);
args.put("x-dead-letter-exchange","exchange.dlx");
args.put("x-dead-letter-routing-key","routingKey");
channel.queueDeclare("queue.normal",true,false,false,args);
channel.queueBind("queue.normal","exchange.normal","");
channel.queueBind("queue.normal","exchange.normal","");
channel.queueDeclare("queue.dlx",true,false,false,null);
channel.queueBind("queue.dlx","exchange.dlx","routingKey");
channel.basicPublish("exchange.normal","rk",MessageProperties.PERSISTENT_TEXT_PLAIN,"dlx".getBytes());
消息路由过程
生产者首先发送一条携带路由键为“rk”的消息,然后经过交换器exchange.normal顺利的存储到队列queue.normal中。由于队列queue.normal设置了过期时间为10s。在这10s内没有消费者消费这条消息,那么判定这条消息为过期。由于设置了DLX,过期之时,消息被丢弃给交换器exchange.dlx中,这时找到与exchange.dlx匹配的队列queue.dlx,最后消息被存储在queue.dlx这个死信队列中。
2. 延迟队列
延迟队列的使用场景有很多,比如:
- 在订单系统中,一个用户下单后通常有30分钟的时间进行支付,如果在30分钟内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单。
- 用户系统通过手机远程遥控家里的智能设备在指定的时间进行工作,这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。
在AMQP协议中,本身没有直接支持延迟队列的功能,但是可以通过前面的DLX和TTL模拟出延迟队列的功能。
在真实应用中,对于延迟队列可以根据延迟时间的长短分为多个等级,一般分为5s、10s、30s、1分钟、5分钟、10分钟、30分钟、1小时等维度。
如图所示,根据应用需求的不同,生产者在发送消息的时候通过设置不同的路由键,将消息发送到与交换器绑定的不同队列中,这里队列分别设置了时间为5s、10s、30s、1分钟,同时也分别设置了DLX和相应的死信队列。
当消息过期时,就会转存到相应的死信队列(即延迟队列中),这样消费者根据业务自身情况,分别选择不同延迟等级的延迟队列进行消费。
3. 优先级队列
优先级高的消息具备优先被消费的特权。
可以通过设置队列的x-max-priority参数来实现
Map<String,Object> args = new HashMap<>();
args.put("x-max-priority",10);
channel.queueDeclare("queue.priority",true,false,false,args);
上述代码演示如何配置一个队列的最大优先级,再此之后,需要在发送时在消息中设置消息当前的优先级。
代码中设置消息优先级为5,默认最低为0,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,如果消费者的消费速度大于生产者的速度且Broker中没有消息堆积的情况下,对发送的消息设置优先级就没有什么实际意义。
使用消息队列实现RPC
RPC即远程过程调用,他的主要功用是让构建分布式计算更容易。一般在RabbitMQ中进行RPC是很简单的,客户端发送请求消息,服务端回复响应消息。为了接收响应的消息,我们需要在请求消息中发送一个回调队列,如下所示。
设置回调队列
在BasicProperties中,包含了14个属性,这里用到其中两个属性
- replyTo: 通常用来设置一个回调队列
- correlationId:用来关联请求和其调用RPC之后的回复。
使用上述代码,为每个RPC请求创建一个回调队列,是非常低效的,我们可以使用一个更具通用性的方案,为每个客户端创建一个单一的回调队列。
这样就产生了一个新的问题,对于回调队列而言,在其接收到一条回复的消息之后,他并不知道这条消息应该和哪个请求匹配。这里就用到了correlationId,为每一个请求设置一个唯一的correlationId。之后,在回调队列接收到回复的消息时,可以根据这个属性匹配到响应的请求。如果收到一个未知的correlationId,则可以简单的将其丢弃。
RPC处理流程
- 当客户端启动时,创建一个匿名的回调队列
- 客户端为RPC请求设置2个属性,replyTo用来告知RPC服务端回复请求时的目的队列即回调队列,correlationId用来标记一个请求。
- 请求被发送到rpc_queue队列中。
- RPC服务端监听rpc_queue队列中的请求,当请求到来时,服务端会处理并且把带有结果的消息发送给客户端,接受的队列就是replyTo设定的回调队列。
- 客户端监听回调队列,当有消息时,检查correlationId属性,如果与请求匹配,那就是结果了。
示例代码
RPC服务端代码
RPC客户端代码
持久化
RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化、消息的持久化
。
交换器的持久化是通过在声明交换器时将durable参数设置为true实现。如果交换器不设置持久化,那么在MQ服务重启后,相关的 交换器元数据会丢失
。不过消息不会丢失,只是不能将消息发送到这个交换器了。对于一个长期使用的交换器来说,需要设置为持久化
。
队列的持久化是通过在声明队列时将durable参数设置为true实现,如果队列不设置持久化,那么在MQ服务器重启之后,相关队列的元数据会丢失,数据也会丢失。设置了队列的持久化可以保证其本身元数据不会因为异常而丢失,但是并不能保证内部所存储的消息不会丢失。为了确保消息不丢失,需要设置消息的投递模式(deliveryMode)为2
。前面实例中的MessageProperties.PERSISTENT_TEXT_PALIN实际上是封装了这个属性。
注意
:
设置队列和消息持久化,当MQ服务重启之后,消息依旧存在。单单只设置队列持久化,重启之后消息会丢失;单单只设置消息持久化,重启之后队列丢失,继而消息也会丢失。
可以将所有的消息都设置为持久化,但是这样会严重影响MQ的性能。写入磁盘的速度比写入内存速度慢得多。在选择是否要持久化消息时,需要在可靠性和吞吐量之间做一个权衡。
将交换器、队列、消息都设置持久化之后,能保证百分百不丢失数据吗,答案是否定的,有如下几种场景。
第一种:消费者设置autoAck为true时,那么消费者接收到消息之后,还没来得及处理就宕机了。
第二种:在持久化的消息正确存入MQ之后,还需要一段时间才能存入磁盘。MQ并不会为每条消息都进行同步存盘(调用内核的fsync方法)处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果这段时间内MQ发生宕机,消息还未来得及落盘。可以使用MQ镜像队列机制或者发送方确认机制来解决。
如何保证消息不丢失
生产者确认
在使用MQ的时候,可以通过持久化操作来解决因为服务器异常崩溃而导致的消息丢失,除此之外,当消息的生产者将消息发送出去之后,消息到底有没有正确到达服务器?如果不进行特殊配置,默认情况下发送出去的消息没有任何返回的信息给生产者。如果在消息达到服务器之前丢失,持久化操作解决不了问题
。
RabbitMQ针对这个问题,提供了两种解决方式:
- 通过事务机制实现
- 通过发送方确认机制实现。
事务机制
channel.txSelect : 用于将信道设置成事务模式。
channel.txCommit : 用于提交事务。
channel.txRollback: 用于事务回滚。
在通过channel.txSelect 方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了MQ,如果MQ异常崩溃或者其他原因出现异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback回滚事务。
AQMP协议流转过程
开启事务和不开启相比多了四个步骤:
- 客户端发送tx.select,将信道置为事务模式。
- Broker回复tx.select-ok,确认已将信道置为事务模式。
- 在发送完消息之后,客户端发送Tx.commit提交事务。
- Broker回复Tx.commit-ok,确认事务提交。
事务回滚样例
AQMP协议流转过程
事务确实能够解决发送方和MQ之间消息确认的问题,只有消息成功被MQ接收,事务才能提交成功
。但是,使用事务机制会吸干MQ性能
。
发送方确认机制
事务机制在一条消息发送出去之后会使发送端阻塞,以等待MQ的回应
,之后才能发送下一条消息。相比之下,发送方确认机制的好处在于它是异步的 (一边发送,一边等待确认)
,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该消息。如果MQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令。
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始递增),一旦消息被投递到所有匹配的队列之后,MQ会发送一个确(Basic.ACK)给生产者(包含唯一的ID),如果消息和队列是持久化的,消息会在写入磁盘之后发出。MQ也可以设置channel.basicAck中的mutiple参数,表示到这个序号之前的所有消息都已经得到了处理
。
生产者confirm模式示例代码
AQMP协议流转过程
事务机制和发送方确认机制QPS对比
QPS虽然有提升,但是完全没有达到预想的效果,关键在于:上述confirm模式每次发送一条消息之后就调用channel.waitForConfirms方法,等待服务端确认,此时就退化成了串行同步等待
。
而为什么又比事务机制快一点,核心在于:
confirm机制通信交互的命令是两条Basic.publish和Basic.Ack;
事务机制是3条:Basic.Publish、Tx.Commit、Tx.Commit-Ok,事务机制相比之下多了一个报文帧。
注意
:事务机制和confirm机制确保的是消息能够被正确的发送至交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列。更进一步的说,发送方要配合mandatory参数或者备份交换器使用来提高消息传输的可靠性
。
对于confirm模式的改进,有如下两种
批量confirm方法
:每次发送一批消息后,调用channel.waitForConfirms方法,等待服务器的确认返回。异步confirm方法
:提供一个回调方法,服务端确认一条或者多条消息后客户端会回调这个方法进行处理。
1. 批量confirm
在批量confirm方法中,客户端定期或者定量调用channel.waitForConfirms等待MQ确认返回,批量极大提高了confirm的效率,但是 问题在于出现Basic.Nack或者超时情况时,客户端需要将这一批次的消息全部重发
。会带来明显的重复消息数,如果消息经常丢失,批量confirm性能反而下降。
批量confirm示例
2. 异步confirm
异步Confirm在客户端channel接口中提供addConfirmListener方法,添加ConfirmListener这个回调接口,这个接口包含两个方法:handleAck和handleNack。分别用来处理Basic.Ack和Basic.Nack。我们需要为每一个信道维护一个“unconfirm”消息序号集合,每发送一条消息,集合元素加1.每当调用ConfirmListener中的handleAck方法时,匹配unconfirm集合中元素进行处理。集合优先采用SortedSet集合。
示例代码
QPS测试对比
消费者要点
消息分发
当RabbitMQ队列拥有多个消费者时,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。当消费者端负载加重时,只需要创建更多的消费者来消费处理消息即可。
很多时候轮询分分发机制也不是那么优雅,比如,某些消费者任务繁重,来不及消费那么多消息,而某些其他消费者由于机器性能卓越很快处理完所分配到的消息,进而空闲,这样就会造成整体应用的吞吐量下降。
在订阅消费队列之前,消费端程序调用channel.basicQos(5) ,允许信道上的消费者保持最大未确认的消息数为5
,之后订阅某个队列进行消费。RabbitMQ会保存一个消费者列表,每发送一条消息都会为对应的消费者计数,如果该 消费者达到了设定的上限,MQ就不会向这个消费者在发送任何消息
。直到消费者确认了某条消息。这种机制类似于TCP/IP中的滑动窗口。
消息传输保障
一般消息中间件的消息传输保障分为三个层级
- At most once : 最多一次。消息可能会丢失,但绝不会重复传输。
- At least once : 最少一次,消息绝不会丢失,但可能会重复。
- Exactly once : 恰好一次,每条消息肯定会被传输一次且仅传输一次。
RabbitMQ支持“最多一次”和“最少一次”,其中最少一次投递实现需要考虑以下几方面
- 消息生产者需要开启事务机制或者 confirm机制,以确保消息可以可靠传输到RabbitMQ中。
- 消息生产者需要配合使用mandatory参数或者备份交换机来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
- 消息和队列都需要持久化处理,以确保RabbitMQ服务器在遇到异常的情况时不会造成丢失。
- 消费者在消费消息的同时将autoAck设置为false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。
“最多一次”就无须考虑上述方面,生产者虽已发送,消费者随意消费,不过很难确保消息不会丢失。
“恰好一次”是RabbitMQ目前无法保证的。一般可以使用去重机制(例如借助Redis)加上“最少一次”进行处理
。