RocketMQ5.0Pop消费模式

前言

RocketMQ 5.0 消费者引入了一种新的消费模式:Pop 消费模式,目的是解决 Push 消费模式的一些痛点。
RocketMQ 4.x 之前,消费模式分为两种:

  • Pull:拉模式,消费者自行拉取消息、上报消费结果
  • Push:推模式,消费者主动拉取消息,看起来像是 Broker 主动推送消息,主动上报消费结果

Pop 消费模式可以看作是 Push 的升级版,Push 消费模式存在以下痛点:

  • 客户端负责队列重平衡、消息拉取、消费位点上报、消费失败重试等功能,逻辑太重了,不利于多语言客户端发展
  • 队列数/消费者数量变更会触发重平衡操作,期间消息无法消费,容易造成消息堆积
  • 队列和消费者强绑定,消费者数量超过队列数后,无法再水平扩容
  • 队列和消费者强绑定,消费者僵死状态下导致消息堆积

Pop 消费模式就是要解决这些痛点的,它的设计目标:

  • 消费者只管消息拉取、消息消费、上报 ACK,客户端 SDK 轻量级
  • 队列不再绑定消费者,消费者可以消费所有队列消息
  • 消费者可以很方便的水平扩容

要实现这个目标还是有不小的挑战,看看 RocketMQ 是如何做到的吧。

设计难点

Pop 消费模式是存在一些设计难点的。
Push 模式下队列为什么要和消费者绑定?也就是一个队列同一时间只允许被一个消费者消费,因为这样实现起来简单啊,主要体现在:

  • Broker 消息投递处理简单,根据消费者请求的拉取位点投递
  • Broker 消费位点管理简单,无需记录消息是否被消费,只需要记录消费位点,位点前的都消费了,位点后的都没消费

如果改成 Pop 模式,首先面临的挑战有:
1、Broker 消息投递时,要记录哪些消息已经投递了,哪些消息还没投递
Pop 模式下一个队列可以被多个消费者消费,假设现在队列里面有 1~10 号消息,消费者A 拉取了 1~3 号消息,消费者B 再拉取的时候,Broker 必须知道 1~3 号消息已经投递过在消费中了,不能再投递给消费者B了,得投递 4 号及之后的消息给消费者B才行。
image.png

2、Broker 要记录哪些消息消费成功了,哪些消息消费失败了,不能单纯记录消费位点了
因为队列可以被多个消费者消费,大家都在上报队列粒度的消费位点,Broker 没法管理了。

你看,为了可以让队列被多个消费者消费,Broker 已经不能再按队列维度去管理信息了,必须精确到消息粒度,这就需要额外的数据结构来支撑。

设计实现

取消客户端队列 Rebalance
客户端重平衡,不仅限制了消费者的消费能力,还增加了客户端复杂度,重平衡期间消息无法被消费,还容易造成消息堆积。Pop 消费模式下,消费者可以消费所有队列,也就不再需要客户端重平衡了。消费者查询给自己分配的队列时,Proxy 返回的是 Broker 维度且 queueId=-1 的逻辑队列,Broker 端会投递所有队列的消息。
image.png

invisibleTime
Pop 消费模式下,消息投递后会有一个invisibleTime的概念,即消息的不可见时间,默认是 60 秒。比如 M1 投递给消费者A后,在不可见时间段内,其它消费者是无法消费这条消息的。
image.png
另外 Broker 还提供了changeInvisibleTime()接口修改单条消息的不可见时间,比如消息消费失败后,会根据重试次数来设置新的不可见时间。
image.png

CK & ACK 消息
RocketMQ 为了记录消息是否被消费成功,引入了 CK 和 ACK 消息,以及一个专属的 Topic:rmq_sys_REVIVE_LOG_{clusterName}
消费者在拉取消息时,Broker 端会给拉取到的这一批消息发一个 CK 消息,CK 消息记录了各消息的偏移量可以定位到具体消息,同时用位图记录了各消息的 ACK 情况。Consumer 消费成功后会调用ackMessage接口,Broker 会发送一个对应的 ACK 消息;消费失败后会调用changeInvisibleTime接口,延长消息不可见时间,底层是先发一个旧消息的 ACK 消息,再发一个新消息的 CK 消息。
image.png
上述流程的运转是基于消费者正常处理消费结果的前提下的,如果消费者挂了,既不发 ack 也不发 nack,Broker 又该怎么处理这些消息呢?其实也能正常处理,因为 CK 和 ACK 消息均是延时消息,延迟的时间即消息的不可见时间,CK 消息会提前一秒消费,目的是匹配 ACK 消息。
RocketMQ 会启动八个线程消费 REVIVE Topic 对应的八个队列,匹配 CK 消息里还有哪些消息没被 ack,再将这些没被 ack 的消息发送到 RETRY Topic,消费者就可以重新消费了。

源码

先通过流程图熟悉下 Pop 消费模式的大体流程:
image.png

ProcessQueue

PushConsumer 启动后,会定时向 Proxy 查询分配的队列:

protected void startUp() throws Exception {......scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {try {// 扫描分配的队列scanAssignments();} catch (Throwable t) {log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);}}, 1, 5, TimeUnit.SECONDS);......
}

Pop 消费模式下,客户端无须知道具体的队列数据,因为 Broker 会投递所有队列消息,所以 Proxy 只返回 Broker 维度且 queueId=-1 的一个逻辑队列即可。
消费者拿到分配的队列后,紧接着调用syncProcessQueue()方法处理队列,有两种情况:

  • 队列不再分配给自己,停止拉取消息,移除队列
  • 新分配的队列,立即拉取消息

对于新分配的队列,消费者会创建ProcessQueue对象开始拉取队列消息:

private void receiveMessageImmediately() {// Broker端点列表 gRPC负载均衡调用final Endpoints endpoints = mq.getBroker().getEndpoints();// 构建请求final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);// 发请求final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,consumer.getPushConsumerSettings().getLongPollingTimeout());Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {@Overridepublic void onSuccess(ReceiveMessageResult result) {// 处理消息onReceiveMessageResult(result);}}
}

PopMessageProcessor

Proxy 端会开启 gRPC 服务,GrpcMessagingApplication 用来处理客户端的请求,拉取消息对应的处理方法是receiveMessage,最终会交给 Broker 的 PopMessageProcessor 处理,主要步骤:

  • 校验请求参数、队列写权限等等
  • 构建消息过滤器 ExpressionMessageFilter
  • 按照 1/5 的概率先尝试从重试队列里拉取消息
  • 遍历所有队列拉取消息,直到拉取最大消息数
  • 如果普通队列拉取的消息数没达到最大消息数,再尝试拉取重试队列
  • 如果还有剩余消息,通知其它被长轮询挂起的请求继续拉取
  • 返回结果
private RemotingCommand processRequest(final Channel channel, RemotingCommand request)throws RemotingCommandException {......// 参数、权限校验// Topic配置TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());// 消费组订阅配置SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());// 消息过滤ExpressionMessageFilter messageFilter = ......// POP模式下,消费者要遍历所有队列,随机一个下标开始读取,都从0开始读的话,存在热点队列,竞争锁int randomQ = random.nextInt(100);int reviveQid; // REVEIE队列ID 顺序消息999 普通消息:0~7轮询if (requestHeader.isOrder()) {reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE;} else {reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());}int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg);long restNum = 0; // 剩余消息数// 平均每5次请求,读取一下重试队列boolean needRetry = randomQ % 5 == 0;if (needRetry && !requestHeader.isOrder()) {restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid,channel, popTime, messageFilter,startOffsetInfo, msgOffsetInfo, orderCountInfo);}// POP模式下,请求的queueId=-1,读所有队列if (requestHeader.getQueueId() < 0) {for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {int queueId = (randomQ + i) % topicConfig.getReadQueueNums();restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter,startOffsetInfo, msgOffsetInfo, orderCountInfo);}}// 拉取到的消息数量不满足消费者期望的数量,接着拉取重试队列:%RETRY%{consumerGroup}_{topic}......if (!getMessageResult.getMessageBufferList().isEmpty()) {// 拉取到消息,且队列里面还有消息,通知其它拉取请求if (restNum > 0) {notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),requestHeader.getQueueId());}} else {// 没有消息,进入长轮询状态 挂起int pollingResult = polling(channel, request, requestHeader);}......
}

通过队列获取消息的方法是popMsgFromQueue,虽然队列可以被多个消费者消费,但是同一个消费组下,针对同一个队列拉取消息的行为必须保证串行,所以 Broker 首先会构建一个topic@group@queueId格式的字符串作为 lockKey 保证加锁成功,然后再获取消息拉取位点 offset,调用 MessageStore 获取消息,最终给这批消息记录 CK 消息。

private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,Channel channel, long popTime,ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),requestHeader.getConsumerGroup()) : requestHeader.getTopic();// 给队列加锁 同一消费组下的队列串行读取 topic@group@queueIdString lockKey =topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;// 加锁if (!queueLockManager.tryLock(lockKey)) {restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;return restNum;}// 拉取位点offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);// 获取消息GetMessageResult getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic, queueId, offset,requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {// 追加CheckPointappendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());}......queueLockManager.unLock(lockKey);
}

image.png
这里的锁实现很简单,通过 AtomicBoolean CAS 修改上锁标记位来判断是否加锁成功:

static class TimedLock {private final AtomicBoolean lock;private volatile long lockTime;
}

加锁成功后,Broker 得知道该从哪里开始给消费者投递消息,这就是拉取位点的获取:

  • 首先取已提交的消费位点
  • 如果 CK 缓冲区有已投递的 PopCheckPoint,则取缓冲区的拉取位点,避免已经投递过的消息重复投递
private long getPopOffset(String topic, PopMessageRequestHeader requestHeader, int queueId, boolean init,String lockKey) {// 已提交的消费位点long offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),topic, queueId);if (offset < 0) {// 还没消费过 判断是从最旧还是最新的开始消费if (ConsumeInitMode.MIN == requestHeader.getInitMode()) {offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);} else {// pop last one,then commit offset.offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;// max & no consumer offsetif (offset < 0) {offset = 0;}if (init) {this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset",requestHeader.getConsumerGroup(), topic,queueId, offset);}}}// CK缓冲区的位点,这些是已投递、还没提交消费的位点long bufferOffset = this.popBufferMergeService.getLatestOffset(lockKey);if (bufferOffset < 0) {return offset;} else {return bufferOffset > offset ? bufferOffset : offset;}
}

有了拉取位点,接下来就是通过 MessageStore 查询消息了,底层会读取 consumequeue 和 commitlog 文件,这里不赘述。拉取到消息列表后,Broker 会给这一批消息记录一个 CK 消息,用于后续匹配 ACK 消息。

PopCheckPoint

CK 消息对应的类是 PopCheckPoint,主要记录了:消息拉取时间、消息偏移量、不可见时间、消息 ACK 位图等等。

public class PopCheckPoint {// 起始偏移量@JSONField(name = "so")private long startOffset;// 消息拉取时间@JSONField(name = "pt")private long popTime;@JSONField(name = "it")private long invisibleTime;// 位图 收到ACK消息则把对应位设为1@JSONField(name = "bm")private int bitMap;// 消息数量@JSONField(name = "n")private byte num;@JSONField(name = "q")private byte queueId;@JSONField(name = "t")private String topic;// 消费组@JSONField(name = "c")private String cid;@JSONField(name = "ro")private long reviveOffset;// 消息增量偏移量@JSONField(name = "d")private List<Integer> queueOffsetDiff;@JSONField(name = "bn")String brokerName;
}

构建好 PopCheckPoint 对象,Broker 会把它作为一个普通消息写入 commitlog 持久化,消息内容:

topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ck
body: json(PopCheckPoint)
deliverTimeMs: invisibleTime-1s

CK 消息存储完毕后,就可以正常返回消息了。需要注意的是,消息在返回给消费者前,Broker 会给消息设置一个很重要的属性:POP_CK。它是消息关联的 CK 消息的句柄字符串,消费者基于该属性来 ack 消息。

messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),responseHeader.getReviveQid(), messageExt.getTopic(), messageQueue.getBrokerName(), messageExt.getQueueId(), msgQueueOffset)
);

POP_CK由以下八个属性构成,空格连接,通过该属性可以快速定位到 CK 消息。

{ckQueueOffset} {popTime} {invisibleTime} {reviveQid} {0/1} {brokerName} {queueId} {msgQueueOffset}# 示例值
2 1698656741635 60000 0 0 broker-a 3 2

AckMessageProcessor

消费者消费完消息后,会调用eraseMessage()擦除消息,也就是根据消费结果判断是 ack 还是 nack。

public void eraseMessage(MessageViewImpl messageView, ConsumeResult consumeResult) {statsConsumptionResult(consumeResult);ListenableFuture<Void> future = ConsumeResult.SUCCESS.equals(consumeResult) ? ackMessage(messageView) :nackMessage(messageView);future.addListener(() -> evictCache(messageView), MoreExecutors.directExecutor());
}

如果消费成功,则调用ackMessage接口,Broker 的处理方式也很简单,就是构建一个 AckMsg 对象,然后把它作为消息体发一个 ACK 消息。

private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {......AckMsg ackMsg = new AckMsg();ackMsg.setAckOffset(requestHeader.getOffset());ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());ackMsg.setTopic(requestHeader.getTopic());ackMsg.setQueueId(requestHeader.getQueueId());ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {顺序消息处理}// 构建消息存储MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(reviveTopic);msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));msgInner.setQueueId(rqId);msgInner.setTags(PopAckConstants.ACK_TAG);msgInner.setBornTimestamp(System.currentTimeMillis());msgInner.setBornHost(this.brokerController.getStoreHost());msgInner.setStoreHost(this.brokerController.getStoreHost());// 延时消息 拉取时间+不可见时间msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);return response;
}

ACK 消息内容如下:

topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ack
body: json(AckMsg)
deliverTimeMs: invisibleTime

注意:CK & ACK 消息同属一个Topic

ChangeInvisibleTimeProcessor

如果消费失败,会根据重试次数调用changeInvisibleDuration接口延长消息不可见时间。Broker 最终会由 ChangeInvisibleTimeProcessor 处理请求,因为消息是按照紧凑的方式顺序写入 commitlog,所以写入后就不支持修改了,所谓的修改消息不可见时间,其实是先发一个新的 CK 消息,再发一个旧消息的 ACK 消息。

private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {......// add new cklong now = System.currentTimeMillis();PutMessageResult ckResult = appendCheckPoint(requestHeader, ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo));// ack old msgackOrigin(requestHeader, extraInfo);
}

所以,无论消费者是调用 ack 还是 nack,消息一定会被 ack 掉。另外还有一种场景就是消费者异常,没有上报消费结果,Broker 在消息不可见时间到期后会把这些没被 ack 掉的消息发到重试队列里,让其它消费者消费。

PopReviveService

CK & ACK 消息存储下来以后,由谁来处理呢?AckMessageProcessor 会开启八个线程,消费 REVIVE Topic 的八个队列,线程对应的服务是 PopReviveService。
PopReviveService 线程每秒执行一次,消费 REVIVE Topic 里的消息。如果是 CK 消息则重新构建 PopCheckPoint 对象;如果是 ACK 消息则更新 PopCheckPoint 里的位图,通过 tag 来区分消息类型。
最后处理 PopCheckPoint,已经被 ack 的消息不做处理,未被 ack 的消息会构建一条新消息重新发送到重试队列,Topic 规则是:%RETRY%{consumerGroup}_{topic}
image.png

public void run() {......ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();// 消费Revive队列消息,构建PopCheckPoint放入mapconsumeReviveMessage(consumeReviveObj);// 处理PopCheckPointmergeAndRevive(consumeReviveObj);
}
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {HashMap<String, PopCheckPoint> map = consumeReviveObj.map;// 消费位点long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId);while (true) {// 查询REVIVE队列消息List<MessageExt> messageExts = getReviveMessage(offset, queueId);for (MessageExt messageExt : messageExts) {//ck消息if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {String raw = new String(messageExt.getBody(), DataConverter.charset);PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class);map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {// ack消息String raw = new String(messageExt.getBody(), DataConverter.charset);AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);PopCheckPoint point = map.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime());// 设置CheckPoint ACK位图int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());if (indexOfAck > -1) {point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));} else {POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);}}        }}
}

CK 消息到期后,会触发reviveMsgFromCk恢复没有被 ack 的消息,处理方式是构建新消息发到重试队列。

private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {for (int j = 0; j < popCheckPoint.getNum(); j++) {if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {// 已经ack了,跳过continue;}// 查询实际消息long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());if (messageExt == null) {POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",queueId, popCheckPoint.getTopic(), msgOffset);continue;}//skip ck from last epochif (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);continue;}// 重试 构建一条消息重新Put到commitlog topic:%RETRY%{consumerGroup}_{topic}reviveRetry(popCheckPoint, messageExt);}
}

PopBufferMergeService

为了记录消息的 ack 状态,RocketMQ 需要存储额外的 CK & ACK 消息,开销还是比较大的。而消息存储的目的是为了匹配消息的 ack 状态,把未被 ack 的消息发送到重试队列里。
消息消费一般是很快的,意味着正常情况下,很快就能收到消费者发过来的 ack 和 nack 请求,那为什么不直接在内存里完成匹配呢?这样做可以大幅提升性能,消息落盘只作为一个兜底方案。
RocketMQ 是支持优先在内存里匹配 ack 消息的,默认是关闭状态,需要手动把enablePopBufferMerge打开。
开启内存匹配后,PopCheckPoint 会优先只追加到缓冲区,只有当缓冲区添加失败才会落地磁盘。

public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {// key: point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt()if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {// 启用内存匹配return false;}if (!serving) {return false;}long now = System.currentTimeMillis();if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ck, timeout, {}, {}", point, now);}return false;}if (this.counter.get() > brokerController.getBrokerConfig().getPopCkMaxBufferSize()) {POP_LOGGER.warn("[PopBuffer]add ck, max size, {}, {}", point, this.counter.get());return false;}PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset);if (!checkQueueOk(pointWrapper)) {// 队列默认不超过20000个return false;}// 添加到commitOffsetsputOffsetQueue(pointWrapper);// 添加到缓冲区this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);this.counter.incrementAndGet();if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]add ck, {}", pointWrapper);}return true;
}

同样的,Broker 在接收到消息 ack 请求时,也会优先只改缓冲区的 PopCheckPoint ack 位图,无需存储 ACK 消息。

 public boolean addAk(int reviveQid, AckMsg ackMsg) {if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {return false;}if (!serving) {return false;}try {PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());if (pointWrapper == null) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, no ck, {}", reviveQid, ackMsg);}return false;}if (pointWrapper.isJustOffset()) {return false;}PopCheckPoint point = pointWrapper.getCk();long now = System.currentTimeMillis();if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);}return false;}if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() - 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, stay too long, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);}return false;}// 直接更改内存里的位图int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());if (indexOfAck > -1) {markBitCAS(pointWrapper.getBits(), indexOfAck);} else {POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);return true;}if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg);}return true;} catch (Throwable e) {POP_LOGGER.error("[PopBuffer]add ack error, rqId=" + reviveQid + ", " + ackMsg, e);}return false;}

内存资源有限,一直往里堆 PopCheckPoint 也不行啊,这时候就需要做两件事:

  • 把内存里已经匹配完 ack 的 PopCheckPoint 移除掉
  • 隔太久还没 ack 掉的 PopCheckPoint,必须要落盘存储了

PopBufferMergeService 本身也是个线程,会每隔 5ms 扫描一次缓冲区,执行上述操作。
image.png
内存里的 PopCheckPoint 移除前需要满足两个条件:

  • PopCheckPoint 消息已经持久化了
  • PopCheckPoint 在内存里就已经全被 ack 掉了

还没有完全被 ack 掉的 PopCheckPoint 移除前需要做两件事:

  • PopCheckPoint 消息持久化
  • 已经被 ack 掉的消息也要持久化
private void scan() {long startTime = System.currentTimeMillis();int count = 0, countCk = 0;// 迭代缓冲区的PopCheckPointIterator<Map.Entry<String, PopCheckPointWrapper>> iterator = buffer.entrySet().iterator();while (iterator.hasNext()) {Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();PopCheckPointWrapper pointWrapper = entry.getValue();// CheckPoint已持久化,或已被ACK,从缓冲区删除// just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)|| isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper);}iterator.remove();counter.decrementAndGet();continue;}// 把超时或停留时间超10s的CheckPoint从缓冲区删除,删除前要先持久化PopCheckPoint point = pointWrapper.getCk();long now = System.currentTimeMillis();boolean removeCk = !this.serving;// ck will be timeoutif (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {removeCk = true;}// 内存停留时间超过10秒,也要移除掉if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {removeCk = true;}if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", pointWrapper);}// double checkif (isCkDone(pointWrapper)) {continue;} else if (pointWrapper.isJustOffset()) {// just offset should be in store.if (pointWrapper.getReviveQueueOffset() < 0) {// reviveQueueOffset<0代表还没存储,要先存储putCkToStore(pointWrapper, false);countCk++;}continue;} else if (removeCk) {// put buffer ak to storeif (pointWrapper.getReviveQueueOffset() < 0) {putCkToStore(pointWrapper, false);countCk++;}if (!pointWrapper.isCkStored()) {continue;}for (byte i = 0; i < point.getNum(); i++) {// reput buffer ak to store// 存储ack消息if (DataConverter.getBit(pointWrapper.getBits().get(), i)&& !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {if (putAckToStore(pointWrapper, i)) {count++;markBitCAS(pointWrapper.getToStoreBits(), i);}}}if (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]ck finish, {}", pointWrapper);}iterator.remove();counter.decrementAndGet();continue;}}}// 扫描commitOffsets,提交消费位点int offsetBufferSize = scanCommitOffset();long eclipse = System.currentTimeMillis() - startTime;if (eclipse > brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, " +"PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",eclipse, count, countCk, counter.get(), offsetBufferSize);this.serving = false;} else {if (scanTimes % countOfSecond1 == 0) {POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " +"PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",eclipse, count, countCk, counter.get(), offsetBufferSize);}}scanTimes++;if (scanTimes >= countOfMinute1) {counter.set(this.buffer.size());scanTimes = 0;}
}

内存里的 PopCheckPoint 除了会根据唯一键,构建一个 Map,还会根据topic@cid@queueId构建一个 QueueWithTime 队列连接起来。

ConcurrentHashMap<String/*topic@cid@queueId*/, QueueWithTime<PopCheckPointWrapper>> commitOffsets =new ConcurrentHashMap<>();

QueueWithTime 队列的用途有两个:

  • 内存里的 PopCheckPoint 处理完毕后,提交消费位点
  • 消息拉取时,获取拉取位点,避免已经投递的消息重复投递

内存里的 PopCheckPoint 只要持久化了或者全被 ack 掉了,就可以提交消费位点了。因为这些消息要么被成功消费了,要么后续在处理 CK 消息时也会被发送到重试队列里。提交消费位点的方法是commitOffset()

private boolean commitOffset(final PopCheckPointWrapper wrapper) {if (wrapper.getNextBeginOffset() < 0) {return true;}final PopCheckPoint popCheckPoint = wrapper.getCk();final String lockKey = wrapper.getLockKey();// 加锁if (!queueLockManager.tryLock(lockKey)) {return false;}try {// 旧的消费位点final long offset = brokerController.getConsumerOffsetManager().queryOffset(popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId());if (wrapper.getNextBeginOffset() > offset) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("Commit offset, {}, {}", wrapper, offset);}} else {// maybe store offset is not correct.POP_LOGGER.warn("Commit offset, consumer offset less than store, {}, {}", wrapper, offset);}// 提交消费位点brokerController.getConsumerOffsetManager().commitOffset(getServiceName(),popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId(), wrapper.getNextBeginOffset());} finally {queueLockManager.unLock(lockKey);}return true;
}

尾巴

RocketMQ 5.0 的 Pop 消费模式是 Push 模式的升级版,它解决了原先 Push 模式下队列只能由一个消费者消费的问题、去除了客户端繁重的重平衡逻辑、降低了消息堆积的风险。核心逻辑是 Broker 给每次拉取的一批消息发一个 CK 延时消息,客户端 ack 时再发一个 ACK 延时消息,消息到期后对 CK 消息做 ACK 匹配,把未被 ack 掉的消息发到重试独立里。这种模式下,不可避免的会带来额外开销,所以 RocketMQ 也支持优先在内存里完成匹配,CK 和 ACK 消息就不用存储了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/229774.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Blender:从新手到专家的全方位指南

Blender&#xff0c;这款强大的开源3D建模和渲染软件&#xff0c;已经成为了CG行业的标准工具之一。它不仅拥有丰富的教程资源&#xff0c;而且还在不断发展和完善中。尽管Blender的教程主要集中在国外网站和YouTube上&#xff0c;但其全面的功能和易用性使它成为许多人的首选工…

宣传照(私密)勿转发

精美的海报通常都是由UI进行精心设计的&#xff0c;现在有100 件商品需要进行宣传推广&#xff0c;如果每个商品都出一张图显然是不合理的&#xff0c;且商品信息各异。因此需要通过代码的形式生成海报。对此&#xff0c;我也对我宣传一波&#xff0c;企图实现我一夜暴富的伟大…

海外静态IP和动态IP有什么区别?推荐哪种?

什么是静态ip、动态ip&#xff0c;二者有什么区别&#xff1f;哪种好&#xff1f;关于这个问题&#xff0c;不难发现&#xff0c;在知道、知乎上面的解释有很多&#xff0c;但据小编的发现&#xff0c;这些回答都是关于静态ip和动态ip的专业术语解释&#xff0c;普通非专业人事…

计算机毕业设计 SpringBoot的乡村养老服务管理系统 Javaweb项目 Java实战项目 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

log4cplus visual c++ 编译及调试小记

简介 最近在调试一款SATA加密设备&#xff0c;发现设备有时加密出来的数据&#xff0c;再解密时与明文对不上&#xff0c;怀疑是通信问题。因此&#xff0c;急需要在测试工具中加入通信日志。由于对第三方日志库都不熟悉&#xff0c;所以随便选了个log4cplus软件集成到现有工具…

Python中的垃圾回收机制是什么

一、写在前面&#xff1a; 我们都知道Python一种面向对象的脚本语言&#xff0c;对象是Python中非常重要的一个概念。在Python中数字是对象&#xff0c;字符串是对象&#xff0c;任何事物都是对象&#xff0c;而它们的核心就是一个结构体--PyObject。 typedef struct_object{i…

【数据结构——二叉树】二叉树及其应用2023(头歌习题)【合集】

目录 第1关&#xff1a;括号表示法创建二叉树任务描述相关知识编程要求测试说明完整代码 第2关&#xff1a;先序序列创建二叉树任务描述相关知识二叉树的前序遍历如何创建一颗二叉树伪代码如下&#xff1a; 二叉树的中序遍历 编程要求测试说明完整代码 第3关&#xff1a;计算二…

畅捷通的 Serverless 探索实践之路

作者&#xff1a;计缘&#xff0c;阿里云云原生架构师 畅捷通介绍 畅捷通是中国领先的小微企业财税及业务云服务提供商&#xff0c;成立于 2010 年。畅捷通在 2021 年中国小微企业云财税市场份额排名第一&#xff0c;在产品前瞻性及行业全覆盖方面领跑市场&#xff0c;位居中…

使用运程操作电脑向日葵安装MySQl与Navicat的安装

目录 一、向日葵 1.1、简介 1.2、应用场景 1.3、原理&#xff1a; 1.4、使用&#xff1a; 1.5、在实施中的应用场景&#xff1a; 二、在Windows Server2012中安装MySQL 2.1、MySQL简介 2.2、MySQL5.7安装与8.0 2.3、输入命令步骤 三、Navicat 3.1、简介 3.2、安装N…

再见2023,你好2024(附新年烟花python实现)

亲爱的朋友们&#xff1a; 写点什么呢&#xff0c;我已经停更两个月了。2023年快结束了&#xff0c;时间真的过得好快&#xff0c;总要写点什么留下纪念吧。这一年伴随着许多挑战和机会&#xff0c;给了我无数的成长和体验。坦白说&#xff0c;有时候我觉得自己好像是在时间的…

JavaBean

学习目的与要求 熟练掌握<jsp:useBean>、<jsp:setProperty>、<jsp:getProperty>等JSP的操作指令。 本章主要内容 编写JavaBean在JSP中使用JavaBean 一个JSP页面通过使用HTML标记为用户显示数据&#xff08;静态部分&#xff09;&#xff0c;页面中变量的…

BMS均衡技术

一、电池的不一致性&#xff1f; 每个电池都有自己的“个性”&#xff0c;要说均衡&#xff0c;得先从电池谈起。即使是同一厂家同一批次生产的电池&#xff0c;也都有自己的生命周期、自己的“个性”——每个电池的容量不可能完全一致。例如以下的两个原因都会造成电池不一致…

Redis缓存保卫战:拒绝缓存击穿的进攻【redis问题 三】

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 Redis缓存保卫战&#xff1a;拒绝缓存击穿的进攻 前言缓存击穿的定义和原理为何会发生缓存击穿缓存击穿的危害防范缓存击穿结语: 前言 你是否曾经遇到过系统在高并发情况下出现严重性能问题&#xff…

JavaSE学习笔记 2023-12-28 --MySQL

MySQL 1.数据库介绍 数据库:数据仓库 DataBase:简称DB,用于长期存储有结构的,大量的,共享的数据长期的:持久存储,永久存储 有结构:有类型,有内部的数据类型有关系,数据与数据之前是有关联的 大量的:大多数据库都是以文件系统存在的,可以将数据存储在磁盘中 共享的:多个应用之…

[概率论]四小时不挂猴博士

贝叶斯公式是什么 贝叶斯公式是概率论中的一个重要定理&#xff0c;用于计算在已知一些先验信息的情况下&#xff0c;更新对事件发生概率的估计。贝叶斯公式的表达式如下&#xff1a; P(A|B) P(B|A) * P(A) / P(B) 其中&#xff0c;P(A|B)表示在事件B发生的条件下事件A发生的概…

干洗店洗鞋店小程序核心功能有哪些?

在繁忙的生活中&#xff0c;我们的鞋子常常承载着风尘仆仆的故事。而洗鞋小程序&#xff0c;就是那个让您的鞋子焕然一新的魔法师。通过这个小程序&#xff0c;您可以在线预约、支付&#xff0c;查询洗鞋订单&#xff0c;并与洗鞋店铺进行互动&#xff0c;轻松享受专业的洗鞋服…

小红书如何高效引流?

近年来&#xff0c;公域流量价格不断上涨&#xff0c;私域流量的优势逐渐凸显。企业正花费大量资源和成本来获取新流量&#xff0c;但与其如此&#xff0c;不如将精力放在留存和复购上&#xff0c;从而实现业绩的新增长。其中关键在于如何有效地将公域流量转化为私域流量。 然而…

听GPT 讲Rust源代码--src/tools(38)

File: rust/src/tools/clippy/clippy_dev/src/lib.rs rust/src/tools/clippy/clippy_dev/src/lib.rs文件是Clippy开发工具的入口文件&#xff0c;其作用是提供Clippy开发过程中所需的功能和工具。Clippy是一个Rust代码的静态分析工具&#xff0c;用于提供各种有用的代码规范、编…

在Android设备上设置和使用隧道代理HTTP

随着互联网的深入发展&#xff0c;网络信息的传递已经成为人们日常生活中不可或缺的一部分。对于我们中国人来说&#xff0c;由于某些特殊的原因&#xff0c;访问国外网站时常常会遇到限制。为了解决这个问题&#xff0c;使用代理服务器成为了许多人的选择。而在Android设备上设…

UI5与后端的文件交互(二)

文章目录 前言一、开发Action1. 创建Structure2. BEDF添加Action3. class中实现Action 二、修改UI5 项目1. 添加一个按钮2. 定义事件函数 三、测试及解析1. 测试2. js中提取到的excel流数据3. 后端解析 前言 这系列文章详细记录在Fiori应用中如何在前端和后端之间使用文件进行…