前言
RocketMQ 5.0 消费者引入了一种新的消费模式:Pop 消费模式,目的是解决 Push 消费模式的一些痛点。
RocketMQ 4.x 之前,消费模式分为两种:
- Pull:拉模式,消费者自行拉取消息、上报消费结果
- Push:推模式,消费者主动拉取消息,看起来像是 Broker 主动推送消息,主动上报消费结果
Pop 消费模式可以看作是 Push 的升级版,Push 消费模式存在以下痛点:
- 客户端负责队列重平衡、消息拉取、消费位点上报、消费失败重试等功能,逻辑太重了,不利于多语言客户端发展
- 队列数/消费者数量变更会触发重平衡操作,期间消息无法消费,容易造成消息堆积
- 队列和消费者强绑定,消费者数量超过队列数后,无法再水平扩容
- 队列和消费者强绑定,消费者僵死状态下导致消息堆积
Pop 消费模式就是要解决这些痛点的,它的设计目标:
- 消费者只管消息拉取、消息消费、上报 ACK,客户端 SDK 轻量级
- 队列不再绑定消费者,消费者可以消费所有队列消息
- 消费者可以很方便的水平扩容
要实现这个目标还是有不小的挑战,看看 RocketMQ 是如何做到的吧。
设计难点
Pop 消费模式是存在一些设计难点的。
Push 模式下队列为什么要和消费者绑定?也就是一个队列同一时间只允许被一个消费者消费,因为这样实现起来简单啊,主要体现在:
- Broker 消息投递处理简单,根据消费者请求的拉取位点投递
- Broker 消费位点管理简单,无需记录消息是否被消费,只需要记录消费位点,位点前的都消费了,位点后的都没消费
如果改成 Pop 模式,首先面临的挑战有:
1、Broker 消息投递时,要记录哪些消息已经投递了,哪些消息还没投递
Pop 模式下一个队列可以被多个消费者消费,假设现在队列里面有 1~10 号消息,消费者A 拉取了 1~3 号消息,消费者B 再拉取的时候,Broker 必须知道 1~3 号消息已经投递过在消费中了,不能再投递给消费者B了,得投递 4 号及之后的消息给消费者B才行。
2、Broker 要记录哪些消息消费成功了,哪些消息消费失败了,不能单纯记录消费位点了
因为队列可以被多个消费者消费,大家都在上报队列粒度的消费位点,Broker 没法管理了。
你看,为了可以让队列被多个消费者消费,Broker 已经不能再按队列维度去管理信息了,必须精确到消息粒度,这就需要额外的数据结构来支撑。
设计实现
取消客户端队列 Rebalance
客户端重平衡,不仅限制了消费者的消费能力,还增加了客户端复杂度,重平衡期间消息无法被消费,还容易造成消息堆积。Pop 消费模式下,消费者可以消费所有队列,也就不再需要客户端重平衡了。消费者查询给自己分配的队列时,Proxy 返回的是 Broker 维度且 queueId=-1 的逻辑队列,Broker 端会投递所有队列的消息。
invisibleTime
Pop 消费模式下,消息投递后会有一个invisibleTime
的概念,即消息的不可见时间,默认是 60 秒。比如 M1 投递给消费者A后,在不可见时间段内,其它消费者是无法消费这条消息的。
另外 Broker 还提供了changeInvisibleTime()
接口修改单条消息的不可见时间,比如消息消费失败后,会根据重试次数来设置新的不可见时间。
CK & ACK 消息
RocketMQ 为了记录消息是否被消费成功,引入了 CK 和 ACK 消息,以及一个专属的 Topic:rmq_sys_REVIVE_LOG_{clusterName}
。
消费者在拉取消息时,Broker 端会给拉取到的这一批消息发一个 CK 消息,CK 消息记录了各消息的偏移量可以定位到具体消息,同时用位图记录了各消息的 ACK 情况。Consumer 消费成功后会调用ackMessage
接口,Broker 会发送一个对应的 ACK 消息;消费失败后会调用changeInvisibleTime
接口,延长消息不可见时间,底层是先发一个旧消息的 ACK 消息,再发一个新消息的 CK 消息。
上述流程的运转是基于消费者正常处理消费结果的前提下的,如果消费者挂了,既不发 ack 也不发 nack,Broker 又该怎么处理这些消息呢?其实也能正常处理,因为 CK 和 ACK 消息均是延时消息,延迟的时间即消息的不可见时间,CK 消息会提前一秒消费,目的是匹配 ACK 消息。
RocketMQ 会启动八个线程消费 REVIVE Topic 对应的八个队列,匹配 CK 消息里还有哪些消息没被 ack,再将这些没被 ack 的消息发送到 RETRY Topic,消费者就可以重新消费了。
源码
先通过流程图熟悉下 Pop 消费模式的大体流程:
ProcessQueue
PushConsumer 启动后,会定时向 Proxy 查询分配的队列:
protected void startUp() throws Exception {......scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {try {// 扫描分配的队列scanAssignments();} catch (Throwable t) {log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);}}, 1, 5, TimeUnit.SECONDS);......
}
Pop 消费模式下,客户端无须知道具体的队列数据,因为 Broker 会投递所有队列消息,所以 Proxy 只返回 Broker 维度且 queueId=-1 的一个逻辑队列即可。
消费者拿到分配的队列后,紧接着调用syncProcessQueue()
方法处理队列,有两种情况:
- 队列不再分配给自己,停止拉取消息,移除队列
- 新分配的队列,立即拉取消息
对于新分配的队列,消费者会创建ProcessQueue
对象开始拉取队列消息:
private void receiveMessageImmediately() {// Broker端点列表 gRPC负载均衡调用final Endpoints endpoints = mq.getBroker().getEndpoints();// 构建请求final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);// 发请求final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,consumer.getPushConsumerSettings().getLongPollingTimeout());Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {@Overridepublic void onSuccess(ReceiveMessageResult result) {// 处理消息onReceiveMessageResult(result);}}
}
PopMessageProcessor
Proxy 端会开启 gRPC 服务,GrpcMessagingApplication 用来处理客户端的请求,拉取消息对应的处理方法是receiveMessage
,最终会交给 Broker 的 PopMessageProcessor 处理,主要步骤:
- 校验请求参数、队列写权限等等
- 构建消息过滤器 ExpressionMessageFilter
- 按照 1/5 的概率先尝试从重试队列里拉取消息
- 遍历所有队列拉取消息,直到拉取最大消息数
- 如果普通队列拉取的消息数没达到最大消息数,再尝试拉取重试队列
- 如果还有剩余消息,通知其它被长轮询挂起的请求继续拉取
- 返回结果
private RemotingCommand processRequest(final Channel channel, RemotingCommand request)throws RemotingCommandException {......// 参数、权限校验// Topic配置TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());// 消费组订阅配置SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());// 消息过滤ExpressionMessageFilter messageFilter = ......// POP模式下,消费者要遍历所有队列,随机一个下标开始读取,都从0开始读的话,存在热点队列,竞争锁int randomQ = random.nextInt(100);int reviveQid; // REVEIE队列ID 顺序消息999 普通消息:0~7轮询if (requestHeader.isOrder()) {reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE;} else {reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());}int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg);long restNum = 0; // 剩余消息数// 平均每5次请求,读取一下重试队列boolean needRetry = randomQ % 5 == 0;if (needRetry && !requestHeader.isOrder()) {restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid,channel, popTime, messageFilter,startOffsetInfo, msgOffsetInfo, orderCountInfo);}// POP模式下,请求的queueId=-1,读所有队列if (requestHeader.getQueueId() < 0) {for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {int queueId = (randomQ + i) % topicConfig.getReadQueueNums();restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter,startOffsetInfo, msgOffsetInfo, orderCountInfo);}}// 拉取到的消息数量不满足消费者期望的数量,接着拉取重试队列:%RETRY%{consumerGroup}_{topic}......if (!getMessageResult.getMessageBufferList().isEmpty()) {// 拉取到消息,且队列里面还有消息,通知其它拉取请求if (restNum > 0) {notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),requestHeader.getQueueId());}} else {// 没有消息,进入长轮询状态 挂起int pollingResult = polling(channel, request, requestHeader);}......
}
通过队列获取消息的方法是popMsgFromQueue
,虽然队列可以被多个消费者消费,但是同一个消费组下,针对同一个队列拉取消息的行为必须保证串行,所以 Broker 首先会构建一个topic@group@queueId
格式的字符串作为 lockKey 保证加锁成功,然后再获取消息拉取位点 offset,调用 MessageStore 获取消息,最终给这批消息记录 CK 消息。
private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,Channel channel, long popTime,ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),requestHeader.getConsumerGroup()) : requestHeader.getTopic();// 给队列加锁 同一消费组下的队列串行读取 topic@group@queueIdString lockKey =topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;// 加锁if (!queueLockManager.tryLock(lockKey)) {restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;return restNum;}// 拉取位点offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);// 获取消息GetMessageResult getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic, queueId, offset,requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {// 追加CheckPointappendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());}......queueLockManager.unLock(lockKey);
}
这里的锁实现很简单,通过 AtomicBoolean CAS 修改上锁标记位来判断是否加锁成功:
static class TimedLock {private final AtomicBoolean lock;private volatile long lockTime;
}
加锁成功后,Broker 得知道该从哪里开始给消费者投递消息,这就是拉取位点的获取:
- 首先取已提交的消费位点
- 如果 CK 缓冲区有已投递的 PopCheckPoint,则取缓冲区的拉取位点,避免已经投递过的消息重复投递
private long getPopOffset(String topic, PopMessageRequestHeader requestHeader, int queueId, boolean init,String lockKey) {// 已提交的消费位点long offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),topic, queueId);if (offset < 0) {// 还没消费过 判断是从最旧还是最新的开始消费if (ConsumeInitMode.MIN == requestHeader.getInitMode()) {offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);} else {// pop last one,then commit offset.offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;// max & no consumer offsetif (offset < 0) {offset = 0;}if (init) {this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset",requestHeader.getConsumerGroup(), topic,queueId, offset);}}}// CK缓冲区的位点,这些是已投递、还没提交消费的位点long bufferOffset = this.popBufferMergeService.getLatestOffset(lockKey);if (bufferOffset < 0) {return offset;} else {return bufferOffset > offset ? bufferOffset : offset;}
}
有了拉取位点,接下来就是通过 MessageStore 查询消息了,底层会读取 consumequeue 和 commitlog 文件,这里不赘述。拉取到消息列表后,Broker 会给这一批消息记录一个 CK 消息,用于后续匹配 ACK 消息。
PopCheckPoint
CK 消息对应的类是 PopCheckPoint,主要记录了:消息拉取时间、消息偏移量、不可见时间、消息 ACK 位图等等。
public class PopCheckPoint {// 起始偏移量@JSONField(name = "so")private long startOffset;// 消息拉取时间@JSONField(name = "pt")private long popTime;@JSONField(name = "it")private long invisibleTime;// 位图 收到ACK消息则把对应位设为1@JSONField(name = "bm")private int bitMap;// 消息数量@JSONField(name = "n")private byte num;@JSONField(name = "q")private byte queueId;@JSONField(name = "t")private String topic;// 消费组@JSONField(name = "c")private String cid;@JSONField(name = "ro")private long reviveOffset;// 消息增量偏移量@JSONField(name = "d")private List<Integer> queueOffsetDiff;@JSONField(name = "bn")String brokerName;
}
构建好 PopCheckPoint 对象,Broker 会把它作为一个普通消息写入 commitlog 持久化,消息内容:
topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ck
body: json(PopCheckPoint)
deliverTimeMs: invisibleTime-1s
CK 消息存储完毕后,就可以正常返回消息了。需要注意的是,消息在返回给消费者前,Broker 会给消息设置一个很重要的属性:POP_CK
。它是消息关联的 CK 消息的句柄字符串,消费者基于该属性来 ack 消息。
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),responseHeader.getReviveQid(), messageExt.getTopic(), messageQueue.getBrokerName(), messageExt.getQueueId(), msgQueueOffset)
);
POP_CK
由以下八个属性构成,空格连接,通过该属性可以快速定位到 CK 消息。
{ckQueueOffset} {popTime} {invisibleTime} {reviveQid} {0/1} {brokerName} {queueId} {msgQueueOffset}# 示例值
2 1698656741635 60000 0 0 broker-a 3 2
AckMessageProcessor
消费者消费完消息后,会调用eraseMessage()
擦除消息,也就是根据消费结果判断是 ack 还是 nack。
public void eraseMessage(MessageViewImpl messageView, ConsumeResult consumeResult) {statsConsumptionResult(consumeResult);ListenableFuture<Void> future = ConsumeResult.SUCCESS.equals(consumeResult) ? ackMessage(messageView) :nackMessage(messageView);future.addListener(() -> evictCache(messageView), MoreExecutors.directExecutor());
}
如果消费成功,则调用ackMessage
接口,Broker 的处理方式也很简单,就是构建一个 AckMsg 对象,然后把它作为消息体发一个 ACK 消息。
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {......AckMsg ackMsg = new AckMsg();ackMsg.setAckOffset(requestHeader.getOffset());ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());ackMsg.setTopic(requestHeader.getTopic());ackMsg.setQueueId(requestHeader.getQueueId());ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {顺序消息处理}// 构建消息存储MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(reviveTopic);msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));msgInner.setQueueId(rqId);msgInner.setTags(PopAckConstants.ACK_TAG);msgInner.setBornTimestamp(System.currentTimeMillis());msgInner.setBornHost(this.brokerController.getStoreHost());msgInner.setStoreHost(this.brokerController.getStoreHost());// 延时消息 拉取时间+不可见时间msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);return response;
}
ACK 消息内容如下:
topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ack
body: json(AckMsg)
deliverTimeMs: invisibleTime
注意:CK & ACK 消息同属一个Topic
ChangeInvisibleTimeProcessor
如果消费失败,会根据重试次数调用changeInvisibleDuration
接口延长消息不可见时间。Broker 最终会由 ChangeInvisibleTimeProcessor 处理请求,因为消息是按照紧凑的方式顺序写入 commitlog,所以写入后就不支持修改了,所谓的修改消息不可见时间,其实是先发一个新的 CK 消息,再发一个旧消息的 ACK 消息。
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {......// add new cklong now = System.currentTimeMillis();PutMessageResult ckResult = appendCheckPoint(requestHeader, ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo));// ack old msgackOrigin(requestHeader, extraInfo);
}
所以,无论消费者是调用 ack 还是 nack,消息一定会被 ack 掉。另外还有一种场景就是消费者异常,没有上报消费结果,Broker 在消息不可见时间到期后会把这些没被 ack 掉的消息发到重试队列里,让其它消费者消费。
PopReviveService
CK & ACK 消息存储下来以后,由谁来处理呢?AckMessageProcessor 会开启八个线程,消费 REVIVE Topic 的八个队列,线程对应的服务是 PopReviveService。
PopReviveService 线程每秒执行一次,消费 REVIVE Topic 里的消息。如果是 CK 消息则重新构建 PopCheckPoint 对象;如果是 ACK 消息则更新 PopCheckPoint 里的位图,通过 tag 来区分消息类型。
最后处理 PopCheckPoint,已经被 ack 的消息不做处理,未被 ack 的消息会构建一条新消息重新发送到重试队列,Topic 规则是:%RETRY%{consumerGroup}_{topic}
。
public void run() {......ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();// 消费Revive队列消息,构建PopCheckPoint放入mapconsumeReviveMessage(consumeReviveObj);// 处理PopCheckPointmergeAndRevive(consumeReviveObj);
}
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {HashMap<String, PopCheckPoint> map = consumeReviveObj.map;// 消费位点long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId);while (true) {// 查询REVIVE队列消息List<MessageExt> messageExts = getReviveMessage(offset, queueId);for (MessageExt messageExt : messageExts) {//ck消息if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {String raw = new String(messageExt.getBody(), DataConverter.charset);PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class);map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {// ack消息String raw = new String(messageExt.getBody(), DataConverter.charset);AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);PopCheckPoint point = map.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime());// 设置CheckPoint ACK位图int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());if (indexOfAck > -1) {point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));} else {POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);}} }}
}
CK 消息到期后,会触发reviveMsgFromCk
恢复没有被 ack 的消息,处理方式是构建新消息发到重试队列。
private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {for (int j = 0; j < popCheckPoint.getNum(); j++) {if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {// 已经ack了,跳过continue;}// 查询实际消息long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());if (messageExt == null) {POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",queueId, popCheckPoint.getTopic(), msgOffset);continue;}//skip ck from last epochif (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);continue;}// 重试 构建一条消息重新Put到commitlog topic:%RETRY%{consumerGroup}_{topic}reviveRetry(popCheckPoint, messageExt);}
}
PopBufferMergeService
为了记录消息的 ack 状态,RocketMQ 需要存储额外的 CK & ACK 消息,开销还是比较大的。而消息存储的目的是为了匹配消息的 ack 状态,把未被 ack 的消息发送到重试队列里。
消息消费一般是很快的,意味着正常情况下,很快就能收到消费者发过来的 ack 和 nack 请求,那为什么不直接在内存里完成匹配呢?这样做可以大幅提升性能,消息落盘只作为一个兜底方案。
RocketMQ 是支持优先在内存里匹配 ack 消息的,默认是关闭状态,需要手动把enablePopBufferMerge
打开。
开启内存匹配后,PopCheckPoint 会优先只追加到缓冲区,只有当缓冲区添加失败才会落地磁盘。
public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {// key: point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt()if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {// 启用内存匹配return false;}if (!serving) {return false;}long now = System.currentTimeMillis();if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ck, timeout, {}, {}", point, now);}return false;}if (this.counter.get() > brokerController.getBrokerConfig().getPopCkMaxBufferSize()) {POP_LOGGER.warn("[PopBuffer]add ck, max size, {}, {}", point, this.counter.get());return false;}PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset);if (!checkQueueOk(pointWrapper)) {// 队列默认不超过20000个return false;}// 添加到commitOffsetsputOffsetQueue(pointWrapper);// 添加到缓冲区this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);this.counter.incrementAndGet();if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]add ck, {}", pointWrapper);}return true;
}
同样的,Broker 在接收到消息 ack 请求时,也会优先只改缓冲区的 PopCheckPoint ack 位图,无需存储 ACK 消息。
public boolean addAk(int reviveQid, AckMsg ackMsg) {if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {return false;}if (!serving) {return false;}try {PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());if (pointWrapper == null) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, no ck, {}", reviveQid, ackMsg);}return false;}if (pointWrapper.isJustOffset()) {return false;}PopCheckPoint point = pointWrapper.getCk();long now = System.currentTimeMillis();if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);}return false;}if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() - 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, stay too long, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);}return false;}// 直接更改内存里的位图int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());if (indexOfAck > -1) {markBitCAS(pointWrapper.getBits(), indexOfAck);} else {POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);return true;}if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg);}return true;} catch (Throwable e) {POP_LOGGER.error("[PopBuffer]add ack error, rqId=" + reviveQid + ", " + ackMsg, e);}return false;}
内存资源有限,一直往里堆 PopCheckPoint 也不行啊,这时候就需要做两件事:
- 把内存里已经匹配完 ack 的 PopCheckPoint 移除掉
- 隔太久还没 ack 掉的 PopCheckPoint,必须要落盘存储了
PopBufferMergeService 本身也是个线程,会每隔 5ms 扫描一次缓冲区,执行上述操作。
内存里的 PopCheckPoint 移除前需要满足两个条件:
- PopCheckPoint 消息已经持久化了
- PopCheckPoint 在内存里就已经全被 ack 掉了
还没有完全被 ack 掉的 PopCheckPoint 移除前需要做两件事:
- PopCheckPoint 消息持久化
- 已经被 ack 掉的消息也要持久化
private void scan() {long startTime = System.currentTimeMillis();int count = 0, countCk = 0;// 迭代缓冲区的PopCheckPointIterator<Map.Entry<String, PopCheckPointWrapper>> iterator = buffer.entrySet().iterator();while (iterator.hasNext()) {Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();PopCheckPointWrapper pointWrapper = entry.getValue();// CheckPoint已持久化,或已被ACK,从缓冲区删除// just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)|| isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper);}iterator.remove();counter.decrementAndGet();continue;}// 把超时或停留时间超10s的CheckPoint从缓冲区删除,删除前要先持久化PopCheckPoint point = pointWrapper.getCk();long now = System.currentTimeMillis();boolean removeCk = !this.serving;// ck will be timeoutif (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {removeCk = true;}// 内存停留时间超过10秒,也要移除掉if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {removeCk = true;}if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", pointWrapper);}// double checkif (isCkDone(pointWrapper)) {continue;} else if (pointWrapper.isJustOffset()) {// just offset should be in store.if (pointWrapper.getReviveQueueOffset() < 0) {// reviveQueueOffset<0代表还没存储,要先存储putCkToStore(pointWrapper, false);countCk++;}continue;} else if (removeCk) {// put buffer ak to storeif (pointWrapper.getReviveQueueOffset() < 0) {putCkToStore(pointWrapper, false);countCk++;}if (!pointWrapper.isCkStored()) {continue;}for (byte i = 0; i < point.getNum(); i++) {// reput buffer ak to store// 存储ack消息if (DataConverter.getBit(pointWrapper.getBits().get(), i)&& !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {if (putAckToStore(pointWrapper, i)) {count++;markBitCAS(pointWrapper.getToStoreBits(), i);}}}if (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]ck finish, {}", pointWrapper);}iterator.remove();counter.decrementAndGet();continue;}}}// 扫描commitOffsets,提交消费位点int offsetBufferSize = scanCommitOffset();long eclipse = System.currentTimeMillis() - startTime;if (eclipse > brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, " +"PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",eclipse, count, countCk, counter.get(), offsetBufferSize);this.serving = false;} else {if (scanTimes % countOfSecond1 == 0) {POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " +"PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",eclipse, count, countCk, counter.get(), offsetBufferSize);}}scanTimes++;if (scanTimes >= countOfMinute1) {counter.set(this.buffer.size());scanTimes = 0;}
}
内存里的 PopCheckPoint 除了会根据唯一键,构建一个 Map,还会根据topic@cid@queueId
构建一个 QueueWithTime 队列连接起来。
ConcurrentHashMap<String/*topic@cid@queueId*/, QueueWithTime<PopCheckPointWrapper>> commitOffsets =new ConcurrentHashMap<>();
QueueWithTime 队列的用途有两个:
- 内存里的 PopCheckPoint 处理完毕后,提交消费位点
- 消息拉取时,获取拉取位点,避免已经投递的消息重复投递
内存里的 PopCheckPoint 只要持久化了或者全被 ack 掉了,就可以提交消费位点了。因为这些消息要么被成功消费了,要么后续在处理 CK 消息时也会被发送到重试队列里。提交消费位点的方法是commitOffset()
private boolean commitOffset(final PopCheckPointWrapper wrapper) {if (wrapper.getNextBeginOffset() < 0) {return true;}final PopCheckPoint popCheckPoint = wrapper.getCk();final String lockKey = wrapper.getLockKey();// 加锁if (!queueLockManager.tryLock(lockKey)) {return false;}try {// 旧的消费位点final long offset = brokerController.getConsumerOffsetManager().queryOffset(popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId());if (wrapper.getNextBeginOffset() > offset) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("Commit offset, {}, {}", wrapper, offset);}} else {// maybe store offset is not correct.POP_LOGGER.warn("Commit offset, consumer offset less than store, {}, {}", wrapper, offset);}// 提交消费位点brokerController.getConsumerOffsetManager().commitOffset(getServiceName(),popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId(), wrapper.getNextBeginOffset());} finally {queueLockManager.unLock(lockKey);}return true;
}
尾巴
RocketMQ 5.0 的 Pop 消费模式是 Push 模式的升级版,它解决了原先 Push 模式下队列只能由一个消费者消费的问题、去除了客户端繁重的重平衡逻辑、降低了消息堆积的风险。核心逻辑是 Broker 给每次拉取的一批消息发一个 CK 延时消息,客户端 ack 时再发一个 ACK 延时消息,消息到期后对 CK 消息做 ACK 匹配,把未被 ack 掉的消息发到重试独立里。这种模式下,不可避免的会带来额外开销,所以 RocketMQ 也支持优先在内存里完成匹配,CK 和 ACK 消息就不用存储了。