1 消息发送重试和流控机制
本节介绍Apache RocketMQ的消息发送重试机制和消息流控机制
1.1 消息发送重试机制
1.1.1 重试基本概念
Apache RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性,Apache RocketMQ在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。
同步发送和异步发送模式均支持消息发送重试
重试触发条件
触发消息发送重试机制的条件如下:
1、客户端消息发送请求调用失败或请求超时
2、网络异常造成连接失败或请求超时
3、服务端节点处于重启或下线等状态造成连接失败
4、服务端运行慢造成请求超时
5、服务端返回失败错误码
(1)系统逻辑错误:因运行逻辑不正确造成的错误。
(2)系统流控错误:因容量超限造成的流控错误。
1.1.2 重试流程
生产者在初始化时设置消息发送最大重试次数,当出现上述触发条件的场景时,生产者客户端会按照设置的重试次数一直重试发送消息,直到消息发送成功或达到最大重试次数重试结束,并在最后一次重试失败后返回调用错误响应。
同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败,抛出错误码和异常。
异步发送:调用线程不会阻塞,但调用结果会通过异常事件或成功事件返回。
1.1.3 重试间隔
除服务端返回系统流控错误场景,其他触发条件触发重试后,均会立即进行重试,无等待间隔。
若由于服务端返回流控错误触发重试,系统会按照指数退避策略进行延迟重试。指数退避算法通过以下参数控制重试行为:
1、INITIAL_BACKOFF:第一次失败重试前后需要等待多久,默认值:1秒。
2、MULTIPLIER:指数退避因子,即退避倍率,默认值:1.6.
3、JITTER:随机抖动因子,默认值:0.2
4、MAX_BACKOFF:等待间隔时间上限,默认值:120秒。
5、MIN_CONNECT_TIMEOUT:最短重试时间,默认值:20秒。
1.1.4 功能约束
1、链路耗时阻塞评估:从上述重试机制可以看出,在重试流程中生产者仅能控制最大重试次数。若由于系统异常触发了SDK内置的重试逻辑,则服务端需要等待最终重试结果,可能会导致消息发送请求链路被阻塞。对于某些实时调用类场景,需要合理评估调用请求的超时时间以及最大重试次数,避免影响全链路的耗时。
2、最终异常兜底:Apache RocketMQ 客户端内置的发送请求重试机制并不能保证消息发送一定成功。当最终重试仍然失败时,业务方调用需要捕获异常,并做好冗余保护处理,避免消息发送结果不一致。
3、消息重复问题:因远程调用的不确定性,当Apache RocketMQ客户端因请求超时触发消发送重试流程,此时客户端无法感知服务端的处理结果,客户端进行的消息发送重试可能会产生消息重复问题,业务逻辑需要自行处理消息重复问题。
1.2 消息流控机制
1.2.1 消息流控基本概念
消息流控指的是系统容量使用率超过阈值时,Apache RocketMQ服务端会通过快速返回失败流控错误来避免底层资源承受过高压力。
1.2.2 触发条件
Apache RocketMQ的消息流控触发条件如下:
1、存储压力过大:消费者分组的初始消费位点为当前队列的最大消费位点。若某些场景如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控。
2、服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。
1.2.3 流控行为
当系统触发消息发送流控时,客户端会收到系统限流错误和异常,错误码信息如下:
1、reply-code:530
2、reply-text:TOO_MANY_REQUESTS
客户端收到系统流控错误码后,会根据指数退避策略进行消息发送重试。
1.2.4 处理建议
如何避免触发消息流控:触发限流的根本原因是系统容量使用率到达阈值,可以利用可观测性功能监控系统的使用率,保证底层资源 充足,避免触发流控机制。
突发消息流控处理:如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议业务方将请求调用临时替换到其他系统进行应急处理。
2 消费者分类
Apache RocketMQ 支持PushConsumer、SimpleConsumer、PullConsumer三种类型的消费者,本节分别从使用方式、实现原理、可靠性和使用场景分别介绍这三种类型的消费者。
2.1 背景信息
Apache RocketMQ 面向不同的业务场景提供了不同消费者类型,每种消费者类型的集成方式和控制方式都不一样。了解如下问题,可以选择更匹配业务场景的消费者类型。
如何实现并发消费:消费者如何使用并发的多线程机制处理消息,以此提高消息处理效率?
如何实现同步、异步消息处理:对于不同的集成场景,消费者获取消息后可能会将消息异步分发到业务逻辑中处理,此时,消息异步化处理如何实现?
如何实现消息可靠处理:消费者处理消息时如何返回响应结果?如何在消息异常情况下进行重试,保证消息的可靠处理?
2.2 功能描述
如上图所示,Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取——>消息处理——>消费状态提交。
针对以上几个阶段,Apache RocketMQ提供了不同的消费者类型:PushConsumer、SimpleConsumer、PullConsumer。具体差异如下:
【注意】在实际使用场景中,PullConsumer仅推荐在流处理框架中集成使用,大多数消息收发场景使用PushConsumer和SimpleConsumer就可以满足。如果业务场景发生变更,或当前使用的消费者类型不适合当前业务,可以选择在PushConsumer和SimpleConsumer之间变更消费者类型。变更消费者类型不影响当前 Apache RocketMQ资源的使用和业务处理。
【危险】生产环境中相同的ConsumerGroup下严禁混用PullConsumer和其他两种消费者,否则会导致消息消费异常。
对比项 | PushConsumer | SimpleConsumer | PullConsumer |
接口方式 | 使用监听器回调接口返回消费 结果,消费者仅允许在监听器 范围内处理消费逻辑 | 业务方自行实现消息 处理,并主动调用接口 返回消费结果。 | 业务方自动按队列拉取 消息,并可选择性地提交 消费结果。 |
消费并发 度管理 | 由SDK管理消费并发度 | 由业务方消费逻辑 自行管理消费线程 | 由业务方消费逻辑自行管理 消费线程。 |
负载均衡 粒度 | 5.0SDK是消息粒度,更均衡。 早起版本是队列维度。 | 消息粒度,更均衡。 | 队列粒度,吞吐比性能更好, 但容易不均衡 |
接口灵活度 | 高度封装,不够灵活。 | 原子接口,可灵活自动以。 | 原子接口,可灵活自动以。 |
适用场景 | 适用于无自定义流程的业务 消息开发场景 | 适用于需要高度自定义 业务流程的业务开发场景 | 仅推荐在流处理框架场景下集成使用。 |
2.3 PushConsumer
PushConsumer是一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过Apache RocketMQ的客户端SDK完成。
2.3.1 使用方式
PushConsumer的使用方式比较固定,在消费者初始化时注册一个消费监听器,并在消费监听器内部实现消息处理逻辑。由Apache RocketMQ的SDK在后台完成消息获取、触发监听器调用以及进行消息重试处理。
public class PushConsumerTest {public static void main(String[] args) throws ClientException {ClientServiceProvider provider = ClientServiceProvider.loadService();String topic = "myTopic";FilterExpression filterExpression = new FilterExpression("filtertag", FilterExpressionType.TAG);PushConsumer pushConsumer = provider.newPushConsumerBuilder()//设置消费者分组.setConsumerGroup("ConsumerGroupName")//设置接入点.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("endpoint").build())//设置预绑定的订阅关系.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))//设置消费监听器.setMessageListener(new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {//消费消息并返回处理结果return ConsumeResult.SUCCESS;}}).build();}
}
PushConsumer的消费监听器执行结果分为以下三种情况:
1、返回消费成功:以Java SDK为例,返回ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。
2、返回消费失败,以Java SDK为例,返回ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否需要进行重试消费。
3、出现非预期失败:例如抛出异常行文,告诫过按照消费失败处理,需要根据消费重试逻辑判断是否需要重试消费。
PushConsumer消费消息时,若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功,SDK会按照消费超时处理强制提交消费失败结果,并按照消费重试逻辑进行处理。
2.3.2 内部原理
在PushConsumer类型中,消息的实时处理能力时基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。
可靠性重试
PushConsumer消费者类型中,客户端SDK和消费逻辑的唯一边界是消费监听器接口。客户端SDK严格按照监听器的返回结果判断消息是否消费成功,并做可靠性重试。所有消息必须以同步方式进行消费处理,并在监听器接口结束时返回调用结果,不允许再做异步分发。
使用PushConsumer消费者消费时,不允许使用以下方式处理消息,否则Apache RocketMQ无法保证消息的可靠性。
错误方式一:消息还未处理完成,就提前返回消费成功结果。此时如果消息消费失败,Apache RocketMQ服务端是无法感知的,因此不会进行消费重试。
错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。此时如果消息消费失败,Apache RocketMQ服务端是无法感知的,因此不会进行消费重试。
顺序性保障
基于Apache RocketMQ顺序消息的定义,如果消费者分组设置了顺序消费模式,则PushConsumer在触发消费监听器时,严格遵守消息的先后顺序。业务处理逻辑无法感知即可保证消息的消费顺序。
【备注】消息消费按照顺序处理的前提是遵循同步提交原则,如果业务逻辑自定义实现了异步分发,则Apache RocketMQ无法保证消息的顺序性。
使用场景
PushConsumer严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:
1、消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer的可靠性保证会频繁触发消息重试机制造成大量重复消息。
2、无异步化、高级定制场景:PushConsumer限制了消费逻辑的线程模式,由客户端SDK内部按最大吞吐量触发消息处理。该模式开发逻辑简单,但是不允许使用异步化和自定义处理流程。
2.4 SimpleConsumer
SimpleConsumer是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成的。
2.4.1 使用方式
SimpleConsumer的使用涉及多个接口调用,由业务逻辑按需调用接口获取消息,然后分发给业务线程处理消息,最后按照处理的结果调用提交接口,返回服务端当前消息的处理结果。
public class SimpleConsumerTest {public static void main(String[] args) throws ClientException {ClientServiceProvider provider = ClientServiceProvider.loadService();String topic = "MyTopic";FilterExpression filterExpression = new FilterExpression("MyFilterTag", FilterExpressionType.TAG);SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()//设置消费者分组.setConsumerGroup("MyConsumerGroupName")//设置接入点.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("MyEndpoint").build())//设置预绑定的订阅关系.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))//设置从服务端接收信息的最大等待时间.setAwaitDuration(Duration.ofSeconds(1)).build();try {//SimpleConsumer主动获取消息,并处理List<MessageView> messageViewList = simpleConsumer.receive(10,Duration.ofSeconds(30));messageViewList.forEach(messageView -> {System.out.println(messageView);//处理完成后,需要主动调用Ack提交消费结果try {simpleConsumer.ack(messageView);}catch (ClientException e){e.printStackTrace();}});}catch (ClientException e){//如果遇到系统流控等原因造成拉取失败,需要重新发起消息请求e.printStackTrace();}}
}
SimpleConsumer主要涉及以下几个接口行为:
接口名称 | 主要作用 | 可修改参数 |
ReceiveMessage | 消费者主动调用该接口从服务端获取消息。 说明:由于服务端存储为分布式,可能会出现 服务端实际有消息,但是返回为空 的现象,一般可通过重新发起ReceiveMessage 调用或提高ReceiveMessage的并发解决 | 批量拉取消息数:SimpleConsumer 可以一次性批量获取多条消息实现 批量消费,该接口可修改批量获取 的消息数量。 消费不可见时间:消息的最长处理耗时,该参数用于控制消费失败时 的消息重试间隔。 |
AckMessage | 消费者成功消费消息后,主动调用 该接口向服务端返回消费成功 相应。 | 无 |
ChangeInvisibleDuration | 消费重试场景下,消费者可通过 该接口修改信息处理时长,即 控制信息的重试间隔。 | 消费不可见时间:调用本接口可修改 ReceiveMessage接口预设的消费 不可见时间的参数值。一般用于需要 延长消息处理时长的场景。 |
2.4.2 可靠性重试
SimpleConsumer消费者类型中,客户端SDK和服务端通过ReceiveMessage和AckMessage接口通信。客户端SDK如果处理消息成功则调用AckMessage接口;如果处理失败只需要不会ACK响应,即可在定义的消费不可见时间到达后触发消费重试流程。
2.4.3 顺序性保障
基于Apache RocketMQ顺序消息的定义,SimpleConsumer在处理顺序消息时,会按照消息存储的先后顺序获取消息。即需要保持顺序的一组消息中,如果前面的消息为处理完成,则无法获取到后面的消息。
2.4.4 使用场景
SimpleConsumer提供原子接口,用于消息获取和提交消费结果,相对于PushConsumer方式更加灵活。SimpleConsumer适用于以下场景:
1、消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。建议使用SimpleConsumer消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。
2、需要异步化、批量消费等高级定制场景:SimpleConsumer在SDK内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。
3、需要自定义消费速率:SimpleConsumer是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。
2.5 使用建议
PushConsumer合理控制消费耗时,避免无限阻塞
对于PushConsumer消费类型,需要严格控制信息的消费耗时,尽量避免出现消息处理超时导致消息重复。如果业务经常会出现一些预期外的长时间耗时的消息 ,建议使用SimpleConsumer,并设置好消费不可见时间。
3 消息过滤
消费者订阅某个主题后,Apache RocketMQ会将该主题中的所有消息投递给消费者。若消费者只需要关注部分信息,可通过设置过滤条件在Apache RocketMQ服务端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。
3.1 应用场景
Apache RocketMQ作为发布订阅模型的消息中间件广泛应用于上下业务集成场景。在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务方处理,各下游的处理逻辑不同,只关注自身逻辑需要的消息子集。
使用Apache RocketMQ的消息过滤功能,可以帮助消费者更高效的过滤子集需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。
Apache RocketMQ主要解决的单个业务域即同一个主题内不同消息子集的过滤问题,一般是基于同一个业务下更具体的分类进行过滤匹配。如果是需要对不同业务域的消息进行拆分,建议使用不同主题处理不同业务域的消息。
3.2 功能概述
消息过滤定义
过滤的含义指的是将符合条件的消息投递给消费者,而不是将匹配的效果过滤掉。Apache RocketMQ的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在Apache RocketMQ服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。
消息过滤原理
消息过滤主要通过以下几个关键流程实现:
1、生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。
2、消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。
3、服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。
消息过滤分类
Apache RocketMQ支持Tag标签过滤和SQL属性过滤,两种过滤方式对比如下:
对比项 | Tag标签过滤 | SQL属性过滤 |
过滤目标 | 消息的Tag标签 | 消息的属性,包括用于自定义属性以及 系统属性(Tag是一种系统属性) |
过滤能力 | 精准匹配 | SQL语法匹配 |
适用场景 | 简单过滤场景、计算逻辑 简单轻量。 | 复杂过滤场景,计算逻辑较为复杂 |
3.3 订阅关系一致性
过滤表达式属于订阅关系的一部分,Apache RocketMQ的领域模型规定,同一消费者分组内的多个消费者的订阅关系包括过滤表达式,必须保持一致,否则可能会导致部分消息消费不到。
3.4 Tag标签过滤
Tag标签过滤式Apache RocketMQ提供的基础消息过滤功能,基于生产者为消息设置的Tag标签进行匹配。生产者发生消息时,设置消息的标签,消费者需指定已有的标签来进行匹配订阅。
场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息:订单消息、支付消息、物流消息等。这些消息会发送到名为Trade_Topic的主题中,被各个不同的下游系统订阅:支付系统(订阅支付消息)、物流系统(订阅物流消息)、交易成功率分析系统(订阅订单和支付消息)、实时计算系统(订阅所有和交易相关的消息)。
过滤效果图如下:
Tag标签设置
Tag由生产者发送消息时设置,每条消息允许设置一个Tag标签
Tag使用可见字符,建议长度不超过128字符
Tag标签过滤规则
Tag标签过滤为精准字符串匹配,过滤规则设置格式如下:
1、单Tag匹配:过滤表达式为目标Tag。表示只有消息标签为指定目标Tag的消息符合匹配条件,会被发送给消费者。
2、多Tag匹配:多个Tag之间为或的关系,不同Tag间使用两个竖线(||)隔开。例如Tag1||Tag2||Tag3,表示标签为Tag1或Tag2或Tag3的消息都满足匹配条件,都会被发送给消费者进行消息。
3、全部匹配:使用星号(*)作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。
使用示例
发送消息,使用Tag标签
public static void test01(){MessageBuilderImpl messageBuilder = new MessageBuilderImpl();Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息.setKeys("messageKey")//设置消息tag,用于消费端根据指定tag过滤消息,该示例表示消息的tag设置为TagA.setTag("TagA").setBody("messageBody".getBytes()).build();}
订阅关系,匹配单个Tag标签
public static void test02() throws ClientException {PushConsumerBuilderImpl pushConsumerBuilder = new PushConsumerBuilderImpl();PushConsumer pushConsumer = pushConsumerBuilder.build();String topic = "MyTopic";FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);pushConsumer.subscribe(topic,filterExpression);}
订阅消息,匹配多个Tag标签
public static void test03() throws ClientException {PushConsumerBuilderImpl pushConsumerBuilder = new PushConsumerBuilderImpl();PushConsumer pushConsumer = pushConsumerBuilder.build();String topic = "MyTopic";FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);pushConsumer.subscribe(topic,filterExpression);}
订阅消息,匹配topic中的所有消息,不进行过滤
public static void test04() throws ClientException {PushConsumerBuilderImpl pushConsumerBuilder = new PushConsumerBuilderImpl();PushConsumer pushConsumer = pushConsumerBuilder.build();String topic = "MyTopic";FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);pushConsumer.subscribe(topic,filterExpression);}
3.5 SQL属性过滤
SQL属性过滤是Apache RocketMQ提供的高级消息过滤方式,通过生产者为消息设置的属性(key)即属性值(value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置SQL语法的过滤表达式过滤多个属性。
【提示】Tag是一种系统属性,所以SQL过滤方式也兼容Tag标签过滤。在SQL语法中,Tag的属性名称为TAGS。
场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,按照类型将消息分为订单消息和物流消息,其中给物流消息定义地域属性,按照地域分为杭州和上海。
· 订单消息
· 物流消息
○ 物流消息且地域为杭州
○物流消息且地域为上海
这些消息会发送到名称为Trade_Topic中,被各个不同的系统所订阅:物流系统1(订阅物流消息且地域为杭州)、物流系统2(订阅物流消息且地域为杭州或上海)、订单跟踪系统(订阅订单消息)、实时计算系统(订阅所有跟交易有关的消息)。
过滤效果图如下:
消息属性设置
生产者发送消息时可以自定义消息属性,每个属性都是一个自定义的键值对(key-value)。
每条消息支持设置多个属性。
SQL属性过滤规则
SQL属性过滤使用SQL92语法作为过滤规则表达式,语法规范如下:
语法 | 说明 | 示例 |
IS NULL | 判断属性不存在。 | a IS NULL :属性a不存在。 |
IS NOT NULL | 判断属性存在。 | a IS NOT NULL :属性a存在。 |
> >= < <= | 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。 说明 可转化为数字的字符串也被认为是数字。 | a IS NOT NULL AND a > 100 :属性a存在且属性a的值大于100。 a IS NOT NULL AND a > 'abc' :错误示例,abc为字符串,不能用于比较大小。 |
BETWEEN xxx AND xxx | 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND \<= xxx。表示属性值在两个数字之间。 | a IS NOT NULL AND (a BETWEEN 10 AND 100) :属性a存在且属性a的值大于等于10且小于等于100。 |
NOT BETWEEN xxx AND xxx | 用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于\< xxx OR > xxx,表示属性值在两个值的区间之外。 | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100) :属性a存在且属性a的值小于10或大于100。 |
IN (xxx, xxx) | 表示属性的值在某个集合内。集合的元素只能是字符串。 | a IS NOT NULL AND (a IN ('abc', 'def')) :属性a存在且属性a的值为abc或def。 |
= <> | 等于和不等于。可用于比较数字和字符串。 | a IS NOT NULL AND (a = 'abc' OR a<>'def') :属性a存在且属性a的值为abc或a的值不为def。 |
AND OR | 逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内。 | a IS NOT NULL AND (a > 100) OR (b IS NULL) :属性a存在且属性a的值大于100或属性b不存在。 |
由于SQL属性过滤是生产者定义消息属性,消费者设置SQL过滤条件,因此过滤条件的计算结果具有不确定性,服务端的处理方式如下:
-
异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。
-
空值情况处理:如果过滤条件的表达式计算值为null或不是布尔类型(true和false),则消息默认被过滤,不会被投递给消费者。例如发送消息时未定义某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为null。
-
数值类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。
使用示例
发送消息,同时设置消息Tag和自定义属性
public static void test05(){MessageBuilderImpl messageBuilder = new MessageBuilderImpl();Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息.setKeys("messageKey")//设置消息tag,用于消费端根据指定tag过滤消息,该示例表示消息的tag设置为TagA.setTag("TagA")//消息也可以设置自定义的分类属性,例如表镜标签、地域、逻辑分支//该示例表示为消息自定义一个属性,该属性为地域,属性值为杭州.addProperty("Region","Hangzhou").setBody("messageBody".getBytes()).build();}
订阅消息,根据单个自定义属性匹配消息。
public static void test06() throws ClientException {SimpleConsumerBuilderImpl simpleConsumerBuilder = new SimpleConsumerBuilderImpl();SimpleConsumer simpleConsumer = simpleConsumerBuilder.build();String topic = "MyTopic";FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'",FilterExpressionType.SQL92);simpleConsumer.subscribe(topic,filterExpression);}
订阅消息,同时根据多个自定义属性匹配消息
public static void test07() throws ClientException {SimpleConsumerBuilderImpl simpleConsumerBuilder = new SimpleConsumerBuilderImpl();SimpleConsumer simpleConsumer = simpleConsumerBuilder.build();String topic = "MyTopic";//订阅地域为杭州,且价格属性大于30的消息FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL Region='Hangzhou' AND price > 30",FilterExpressionType.SQL92);simpleConsumer.subscribe(topic,filterExpression);}
订阅消息,匹配Topic中所有的消息,不进行过滤
public static void test08() throws ClientException {SimpleConsumerBuilderImpl simpleConsumerBuilder = new SimpleConsumerBuilderImpl();SimpleConsumer simpleConsumer = simpleConsumerBuilder.build();String topic = "MyTopic";FilterExpression filterExpression = new FilterExpression("True",FilterExpressionType.SQL92);simpleConsumer.subscribe(topic,filterExpression);}
3.6 使用建议
合理划分主题和Tag标签
从消息的过滤机制和主题的原理机制可以看出,业务消息的拆分可以基于主题进行筛选,也可以基于主题内消息的Tag标签及属性进行筛选。关注拆分方式的选择,应遵循以下原则:
1、消息类型是否一致:不同类型的消息,如顺序消息和普通消息需要使用不同的主题进行拆分,无法通过Tag标签进行分类。
2、业务域是否相同:不同业务域和部门的消息应该拆分不同的主题。例如物流消息和支付消息应该使用两个不同的主题;同样是一个主题内的物流消息,普通物流消息和加急物流消息则可以通过不同的Tag进行区分。
3、消息量级和重要性是否一致:如果消息的量级规模存在巨大差异,或者说消息的链路重要程度存在差异,则应该使用不同的主题进行隔离拆分。