文章目录
- 消费者分类
- PushConsumer
- PushConsumer 内部原理
- 使用注意事项
- SimpleConsumer
- invisibleDuration 消息不可见时间
- 消费者分组(消费者负载均衡)
- 广播消费和共享消费
- 负载均衡策略
- 多个消费者消费顺序消息
- 多消费者消费顺序消息示例
- 消费者分组管理
- 关闭自动创建消费者分组
- 使用 admin tool 工具管理消费者分组
- updateSubGroup 更新或修改订阅关系(更新或修改消费者分组信息)
- deleteSubGroup 从 Broker 删除订阅关系(删除消费者分组)
消费者分类
RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者。每一种类型的消费者处理逻辑都回经过 消息获取—>消息处理—>消费状态提交 3 个阶段,只是它们每一个的实现方式不一样。
对比项 | PushConsumer | SimpleConsumer | PullConsumer |
---|---|---|---|
实现方式 | 使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。 | 业务方自行实现消息处理,并主动调用接口返回消费结果。 | 业务方自行按队列拉取消息,并可选择性地提交消费结果 |
消费并发度管理 | 由SDK管理消费并发度 | 由业务方消费逻辑自行管理消费线程 | 由业务方消费逻辑自行管理消费线程 |
负载均衡粒度 | 5.0 SDK是消息粒度,更均衡,早期版本是队列维度 | 消息粒度,更均衡 | 队列粒度,吞吐攒批性能更好,但容易不均衡 |
接口灵活度 | 高度封装,不够灵活 | 原子接口,可灵活自定义 | 原子接口,可灵活自定义 |
适用场景 | 适用于无自定义流程的业务消息开发场景 | 适用于需要高度自定义业务流程的业务开发场景 | 仅推荐在流处理框架场景下集成使用 |
在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。PullConsumer 在gRPC协议客户端中目前还尚未实现。
相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常。
PushConsumer
PushConsumers 是一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 RocketMQ 的客户端SDK完成。我们的示例目前都是使用的 PushConsumer ,这里我们就不再贴示例代码了。
PushConsumer 中我们的实际消费代码 是通过消费监听器 MessageListener 实现,其 public ConsumeResult consume(MessageView messageView) 方法的执行结果分如下几种情况:
- 返回ConsumeResult.SUCCESS:表示该消息处理成功,服务端按照消费结果更新消费进度
- 返回ConsumeResult.FAILURE:示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费
- 方法内部异常:该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费
这里涉及到 PushConsumer 的重试逻辑,这个我们会在后续 《RocketMQ 消息重试机制》一章中统一说明
PushConsumer 内部原理
在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑
使用注意事项
- 禁止在 PushConsumer 监听方法中再次定义线程来分发处理。因为这会导致消息尚未处理完成,而监听方法提前返回处理成功,导致如果线程执行失败,无法进入重试逻辑。
- 如果消息处理时间不可预估(直接时间超长)或业务逻辑复杂需要用到多线程时,请使用 SimpleConsumer。
PushConsumer 默认的处理线程数量为 20,可通过 PushConsumerBuilder.setConsumptionThreadCount() 设置线程数
总结:PushConsumer 内置实现了消息的消费、确认、顺序消息的顺序性、异常重试等多项功能,使用起来比较简单,但其扩展性没有 SimpleConsumer 好,不允许使用异步化和自定义处理流程。
SimpleConsumer
SimpleConsumer 的使用涉及多个接口调用,由业务逻辑按需调用接口获取消息,然后分发给业务线程处理消息,最后按照处理的结果调用提交接口,返回服务端当前消息的处理结果。示例如下:
import com.yyoo.mq.rocket.MyMQProperties;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;import java.time.Duration;
import java.util.Collections;
import java.util.List;public class SimpleConsumerDemo {public static void main(String[] args) throws ClientException {// 用于提供:生产者、消费者、消息对应的构建类 BuilderClientServiceProvider provider = ClientServiceProvider.loadService();// 构建配置类(包含端点位置、认证以及连接超时等的配置)ClientConfiguration configuration = ClientConfiguration.newBuilder()// endpoints 即为 proxy 的地址,多个用分号隔开。如:xxx:8081;xxx:8081.setEndpoints(MyMQProperties.ENDPOINTS).build();// 设置过滤条件(这里为使用 tag 进行过滤)String tag = "ORDER_SUBMIT";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()// 设置消费者分组。.setConsumerGroup("MY_ORDER_SUBMIT_GROUP")// 设置接入点。.setClientConfiguration(configuration)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap("MY_NORMAL_TOPIC", filterExpression))// 设置从服务端接受消息的最大等待时间.setAwaitDuration(Duration.ofSeconds(3)).build();// 一次获取多少个消息int maxMessageNum = 10;// 获取消息后,这些消息多长时间内,对其他消费者不可见Duration invisibleDuration = Duration.ofSeconds(30);try {List<MessageView> messageViewList = simpleConsumer.receive(maxMessageNum,invisibleDuration);// 循环处理所有取出的消息messageViewList.forEach(messageView -> {// TODO 业务代码System.out.println("simpleConsumer 消费消息:" + messageView);try {// 处理完成需要手动编写 ACK 代码提交消费结果simpleConsumer.ack(messageView);} catch (ClientException e) {e.printStackTrace();throw new RuntimeException(e);}});}catch (Exception e){// 处理 receive 获取消息时的异常(比如:消息拉取失败),常常需要重新获取消息e.printStackTrace();}}}
相对于 PushConsumer 的高度封装, SimpleConsumer 更加灵活,适用于需要异步处理、需要高度自定义消费的消息。
invisibleDuration 消息不可见时间
invisibleDuration 此参数十分重要,需要根据消息消费的时长来设定,该参数除了设定消息的不可见时间,还涉及到消息的重试与重试时间间隔。甚至还有一个方法:simpleConsumer.changeInvisibleDuration(); 和 simpleConsumer.changeInvisibleDurationAsync() 来在必要时修改延长这个不可见时间。关于消费重试的相关问题,这个我们会在后续 《RocketMQ 消息重试机制》一章中统一说明。
消费者分组(消费者负载均衡)
消费者分组,在gRPC协议客户端代码中就是一个字符串如 setConsumerGroup(“MY_FIFO_GROUP”),但其有十分重要的作用。
消费者分组是 RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组,消费者分组并不是运行实体,而是一个逻辑资源。通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
广播消费和共享消费
RocketMQ 的领域模型中,一条消息可以由多个消费者分组订阅,一个消费组中又可以初始化多个消费者,同一个消费组的消费者共享消费消息,不同消费组的间的消费者是广播消费。
- 消费组间广播消费 :如上图所示,每个消费者分组只初始化唯一一个消费者,每个消费者可消费到消费者分组内所有的消息,各消费者分组都订阅相同的消息,以此实现单客户端级别的广播一对多推送效果。
- 消费组内共享消费 :如上图所示,每个消费者分组下初始化了多个消费者,这些消费者共同分担消费者分组内的所有消息,实现消费者分组内流量的水平拆分和均衡负载。
关于广播消费的情况,我们在前面的章节 《RocketMQ 发送事务消息》的示例中已经使用。
负载均衡策略
RocketMQ 5.x + gRPC 客户端,默认是消息粒度的负载均衡策略,同一消费组中的多个消费者按照消息的粒度平均分摊主题中的所有消息,但具体某个消息被分发到哪个消费者是随机的。
消费者获取某条消息后,服务端会将该消息加锁,保证这条消息对其他消费者不可见,直到该消息消费成功或消费超时,因此,即使多个消费者同时消费同一队列的消息,服务端也可保证消息不会被多个消费者重复消费。
多个消费者消费顺序消息
如上图所述,队列Queue1中有4条顺序消息,这4条消息属于同一消息组G1,存储顺序由M1到M4。在消费过程中,前面的消息M1、M2被消费者Consumer A1处理时,只要消费状态没有提交,消费者A2是无法并行消费后续的M3、M4消息的,必须等前面的消息提交消费状态后才能消费后面的消息。
多消费者消费顺序消息示例
import com.yyoo.mq.rocket.MyMQProperties;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;import java.nio.ByteBuffer;
import java.util.Collections;public class FifoConsumerManyDemo {public static void main(String[] args) throws ClientException {// 用于提供:生产者、消费者、消息对应的构建类 BuilderClientServiceProvider provider = ClientServiceProvider.loadService();// 构建配置类(包含端点位置、认证以及连接超时等的配置)ClientConfiguration configuration = ClientConfiguration.newBuilder()// endpoints 即为 proxy 的地址,多个用分号隔开。如:xxx:8081;xxx:8081.setEndpoints(MyMQProperties.ENDPOINTS).build();// 设置过滤条件(这里为使用 tag 进行过滤)String tag = "ORDER_CREATE";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 构建消费者provider.newPushConsumerBuilder().setClientConfiguration(configuration)// 设置消费者分组.setConsumerGroup("MY_FIFO_GROUP")// 设置主题与消费者之间的订阅关系.setSubscriptionExpressions(Collections.singletonMap("MY_FIFO_TOPIC", filterExpression)).setMessageListener(messageView -> {System.out.println(messageView);ByteBuffer rs = messageView.getBody();byte[] rsByte = new byte[rs.limit()];rs.get(rsByte);if(new String(rsByte).contains("user2")) {System.out.println("consumer1 Message body:" + new String(rsByte));// 处理消息并返回消费结果。System.out.println("consumer1 Consume message successfully, messageId=" + messageView.getMessageId());}return ConsumeResult.SUCCESS;}).build();provider.newPushConsumerBuilder().setClientConfiguration(configuration)// 设置消费者分组.setConsumerGroup("MY_FIFO_GROUP")// 设置主题与消费者之间的订阅关系.setSubscriptionExpressions(Collections.singletonMap("MY_FIFO_TOPIC", filterExpression)).setMessageListener(messageView -> {System.out.println(messageView);ByteBuffer rs = messageView.getBody();byte[] rsByte = new byte[rs.limit()];rs.get(rsByte);if(new String(rsByte).contains("user2")) {System.out.println("consumer2 Message body:" + new String(rsByte));// 处理消息并返回消费结果。System.out.println("consumer2 Consume message successfully, messageId=" + messageView.getMessageId());}return ConsumeResult.SUCCESS;}).build();// 如果不需要再使用 PushConsumer,可关闭该实例。// pushConsumer.close();}}
生产者代码,请参考《RocketMQ 发送顺序消息》一章。为了方便验证,我们在示例中判断了单个用户的所有消息消费情况做打印,看看结果是否是根据单个用户顺序的。
消费者分组管理
RocketMQ 5.x 开始,除了我们前面说的 Topic 不建议自动创建外,消费者分组也不建议自动创建了。消费者分组也可以通过 mqadmin 命令来添加或删除。
关闭自动创建消费者分组
5.1.3 版本的 RocketMQ目前是默认开启字段添加主题的,可以配置 borker 参数 autoCreateSubscriptionGroup 为 false 来禁用。关于如何配置,我们会在后续文章中说明。我们建议 Topic 和 消费者分组都禁用自动创建。避免在使用上的混乱,给主题和消费者分组的管理上带来不必要的麻烦。
使用 admin tool 工具管理消费者分组
前面我们提到,消费者分组是否重要, 因为消息的负载均衡策略、重试机制、顺序消费等属性都与之相关。在 ~/stroe/config/subscriptionGroup.json 文件中,可以看到我们在顺序消息的消费者中定义的消费者分组(MY_FIFO_GROUP)信息,信息如下:
"MY_FIFO_GROUP":{"attributes":{},// 当前消费者 brokerId,-i 参数"brokerId":0,"consumeBroadcastEnable":false,// 是否是广播模式,-d 参数"consumeEnable":true, // 分组是否允许消费,-s 参数"consumeFromMinEnable":false,// 是否从最小offset开始消费,-m 参数"consumeMessageOrderly":true,// 是否顺序消费,-o 参数"consumeTimeoutMinute":15,"groupName":"MY_FIFO_GROUP","groupRetryPolicy":{ // 重试策略"type":"CUSTOMIZED"},"groupSysFlag":0,// 当消费者数量变化时是否通知其他消费者负载均衡,-a 参数"notifyConsumerIdsChangedEnable":true,"retryMaxTimes":16, // 最大重试次数"retryQueueNums":1, // 重试队列数// 如果当前broker执行消费慢,使用另一个broker的id,-w 参数"whichBrokerWhenConsumeSlowly":1
},
Store 目录的配置,可以通过 ./mqadmin getBrokerConfig 命令查看 Broker 的相关配置中的 storePathRootDir 对应的位置。此地址默认为 ~/stroe
updateSubGroup 更新或修改订阅关系(更新或修改消费者分组信息)
查看命令选项
$> ./mqadmin updateSubGroup -husage: mqadmin updateSubGroup [-a <arg>] [--attributes <arg>] [-b <arg>] [-c <arg>] [-d <arg>] -g <arg> [-h][-i <arg>] [-m <arg>] [-n <arg>] [-o <arg>] [-p <arg>] [-q <arg>] [-r <arg>] [-s <arg>] [-w <arg>]-a,--notifyConsumerIdsChanged <arg> notify consumerId changed--attributes <arg> attribute(+a=b,+c=d,-e)-b,--brokerAddr <arg> create subscription group to which broker-c,--clusterName <arg> create subscription group to which cluster-d,--consumeBroadcastEnable <arg> broadcast-g,--groupName <arg> consumer group name-h,--help Print help-i,--brokerId <arg> consumer from which broker id-m,--consumeFromMinEnable <arg> from min offset-n,--namesrvAddr <arg> Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'-o,--consumeMessageOrderly <arg> consume message orderly-p,--groupRetryPolicy <arg> the json string of retry policy ( exp:{"type":"EXPONENTIAL","exponentialRetryPolicy":{"initial":5000,"max":7200000,"multiplier":2}}{"type":"CUSTOMIZED","customizedRetryPolicy":{"next":[1000,5000,10000]}} )-q,--retryQueueNums <arg> retry queue nums-r,--retryMaxTimes <arg> retry max times-s,--consumeEnable <arg> consume enable-w,--whichBrokerWhenConsumeSlowly <arg> which broker id when consume slowly
使用此命令,我们可以添加消费者分组,并且可以设置分组相关信息,比如-o 顺序消费,-r 最大重试次数等。-o 参数我们在《RocketMQ 发送顺序消息》一章使用了。
示例
./mqadmin updateSubGroup -n 127.0.0.1:9876 -g MY_FIFO_GROUP -o true -c DefaultCluster
-n、-c、-g 参数为必须
deleteSubGroup 从 Broker 删除订阅关系(删除消费者分组)
查看命令选项
$> ./mqadmin deleteSubGroup -husage: mqadmin deleteSubGroup [-b <arg>] [-c <arg>] -g <arg> [-h] [-n <arg>] [-r <arg>]-b,--brokerAddr <arg> delete subscription group from which broker-c,--clusterName <arg> delete subscription group from which cluster-g,--groupName <arg> subscription group name-h,--help Print help-n,--namesrvAddr <arg> Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'
示例
$> ./mqadmin deleteSubGroup -n 127.0.0.1:9876 -g MY_FIFO_GROUP -c DefaultCluster
消费重试的相关问题我们将在下一章《RocketMQ 消息重试机制》中详细说明。