RocketMQ(二)RocketMQ实战

文章目录

    • 一、RocketMQ实战
      • 1.1 批量消息发送
      • 1.2 消息发送队列自选择
      • 1.3 事务消息
      • 1.4 SpringCloud集成RocketMQ
    • 二、最佳实践
      • 2.1 生产者
        • 2.1.1 发送消息注意事项
        • 2.1.2 消息发送失败处理方式
      • 2.2 消费者
        • 2.2.1 消费过程幂等
        • 2.2.2 消费打印日志
      • 2.3 Broker
    • 三、相关问题
      • 3.1 为什么要使用消息队列
      • 3.2 为什么要选择RocketMQ
      • 3.3 RocketMQ有什么优缺点
      • 3.4 消息队列有哪些消息模型
      • 3.5 【RocketMQ用什么消息模型】
      • 3.6 【消息的消费模式】
      • 6.7 RoctetMQ架构
      • 3.8 【如何保证消息的可靠性(不丢失)】
      • 3.9 【如何处理消息重复的问题】
      • 3.10 怎么处理消息积压
      • 3.11 顺序消息如何实现
      • 3.12 如何实现消息过滤
      • 3.13 延时消息
      • 3.14 【怎么实现分布式消息事务的】
      • 3.15 死信队列
      • 3.16 如何保证RocketMQ的高可用
      • 3.17 【RocketMQ的整体工作流程】
      • 3.18 为什么RocketMQ不使用Zookeeper作为注册中心
      • 3.19 Broker是怎么保存数据的
      • 3.20 RocketMQ怎么对文件进行读写的
      • 3.21 消息刷盘怎么实现的
      • 3.22 什么时候清理过期消息
      • 3.23 【RocketMQ的负载均衡是如何实现的】
      • 3.24 消息队列设计成推消息还是拉消息
      • 3.25 如何设计一个消息队列*
      • 3.26 RocketMQ消息体过大的解决方案
      • 3.27 【如何保证幂等性】

  本系列文章:
    RocketMQ(一)消息的发送、存储与消费
    RocketMQ(二)RocketMQ实战

一、RocketMQ实战

1.1 批量消息发送

  RocketMQ批量消息发送是将同一主题的多条消息打包后一次性发送到消息服务端,减少网络调用的次数和网络通信资源的损耗,提高网络传输效率。示例:

		DefaultMQProducer producer=new DefaultMQProducer("BatchProducerGroupName");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();String topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "Tag", "OrderID001","Hello world1".getBytes()));messages.add(new Message(topic, "Tag", "OrderID002","Hello world2".getBytes()));System.out.println(producer.send(messages));producer.shutdown();

1.2 消息发送队列自选择

  消息发送默认根据主题的路由信息(主题消息队列)进行负载均衡,负载均衡机制为轮询策略。假设这样一个场景,订单的状态变更消息发送到特定主题,为了避免消息消费者同时消费同一订单不同状态的变更消息,在开发过程中我们应该使用顺序消息。为了提高消息消费的并发度,如果我们能根据某种负载算法,将相同订单不同的消息统一发送到同一个消息消费队列上,则可以避免引入分布式锁,RocketMQ在消息发送时提供了消息队列选择器MessageQueueSelector。示例:

	String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD","TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {public MessageQueue select(List<MessageQueue> mqs,Message msg, Object arg){Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}

1.3 事务消息

  事务消息是RocketMQ提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

  • 事务消息处理流程
  1. 生产者将消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

  二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
  二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。
  2. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
  • 使用限制
      1、消息类型一致性。事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。
      2、消费事务性。RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。
      3、中间状态可见性。RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。
      4、事务超时机制。RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。
  • 使用示例
      事务消息相比普通消息发送时需要修改以下几点:

  发送事务消息前,需要开启事务并关联本地的事务执行。
  为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。

    //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。private static boolean checkOrderById(String orderId) {return true;}//演示demo,模拟本地事务的执行结果。private static boolean doLocalTransaction() {return true;}public static void main(String[] args) throws ClientException {ClientServiceProvider provider = new ClientServiceProvider();MessageBuilder messageBuilder = new MessageBuilderImpl();//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。Producer producer = provider.newProducerBuilder().setTransactionChecker(messageView -> {/*** 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。*/final String orderId = messageView.getProperties().get("OrderId");if (Strings.isNullOrEmpty(orderId)) {// 错误的消息,直接返回Rollback。return TransactionResolution.ROLLBACK;}return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();//开启事务分支。final Transaction transaction;try {transaction = producer.beginTransaction();} catch (ClientException e) {e.printStackTrace();//事务分支开启失败,直接退出。return;}Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。.addProperty("OrderId", "xxx")//消息体。.setBody("messageBody".getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);} catch (ClientException e) {//半事务消息发送失败,事务可以直接退出并回滚。return;}/*** 执行本地事务,并确定本地事务结果。* 1. 如果本地事务提交成功,则提交消息事务。* 2. 如果本地事务提交失败,则回滚消息事务。* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。**/boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}} else {try {transaction.rollback();} catch (ClientException e) {// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}}}

1.4 SpringCloud集成RocketMQ

  • 1、添加依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq.version}</version>
</dependency>
  • 2、配置RocketMQ相关信息
      示例:
spring:rocketmq:namesrvaddr: localhost:29876producerGroup:TestProducerconsumerGroup:TestConsumer
  • 3、在Java代码中进行消息的发送和消费业务的开发

二、最佳实践

2.1 生产者

2.1.1 发送消息注意事项
  • Tags的使用
      一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤,如:message.setTags(“TagA”)。
  • Keys的使用
      每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。示例:
   // 订单Id   String orderId = "20034568923546";   message.setKeys(orderId);   
  • 日志打印
      消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义,即之前提过的:SEND_OK、FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT和SLAVE_NOT_AVAILABLE。
2.1.2 消息发送失败处理方式

  Producer的send方法本身支持内部重试,重试逻辑:

  至多重试2次(同步发送为2次,异步发送为0次)。
  如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
  如果本身向broker发送消息产生超时异常,就不会再重试。

  以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。

2.2 消费者

2.2.1 消费过程幂等

  RocketMQ无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。
  msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

2.2.2 消费打印日志

  如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。

   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());// TODO 正常消费过程return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}   

2.3 Broker

  • 刷盘方式
      SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡。

三、相关问题

3.1 为什么要使用消息队列

  消息队列主要有三大用途:解耦、异步、削峰。
  以电商系统的下单为例:

  • 1、解耦
      引入消息队列之前,下单完成之后,需要订单服务去调用库存服务减库存,调用营销服务加营销数据……引入消息队列之后,可以把订单完成的消息丢进队列里,下游服务自己去调用就行了,这样就完成了订单服务和其它服务的解耦合。
  • 2、异步
      订单支付之后,我们要扣减库存、增加积分、发送消息等等,这样一来这个链路就长了,链路一长,响应时间就变长了。引入消息队列,除了更新订单状态,其它的都可以异步去做,这样一来就来,就能降低响应时间。
  • 3、削峰
      例如秒杀系统,平时流量很低,但是要做秒杀活动,流量突然增大,Redis或数据库承受不了巨大的请求量。此时,可以把请求扔到队列里面,只放出我们服务能处理的流量,这样就能抗住短时间的大流量了。
  • 4、数据同步
      业务数据推送同步。
  • 5、日志处理
      日志处理是指将消息队列用在日志处理中,比如用Kafka传输日志:

  引入消息队列之后,也会带来一些问题:

  • 1、系统可用性降低
      此时就要保证消息队列的高可用。
  • 2、系统复杂度提高
      此时要保证消息不重复消费、保证消息不丢失等。
  • 3、一致性问题
      此时要保证数据在各个系统的一致性。

3.2 为什么要选择RocketMQ

  市场上几大消息队列对比:

RabbitMQActiveMQRocketMQKafka
公司RabbitApache阿里Apache
语言ErlangJavaJavaScala&Java
协议支持AMPQOpenWire、STOMP、REST、 XMPP、AMQP自定义自定义协议,社区封装了http协议支持
客户端支持语言官方支持Erlang、Java、Ruby等,社区产出多种API,几乎支持所有语言Java、C、C++、Python、PHP、Perl,.net 等Java、C++(不成熟)官方支持Java,社区产出多种API,如PHP,Python等
单击吞吐量万级万级十万级十万级
消息延迟微秒级毫秒级毫秒级毫秒以内
可用性高,基于主从架构实现可用性高,基于主从架构实现可用性非常高,分布式架构非常高,分布式架构,一个数据多副本
消息可靠性基本不丢有较低的概率丢失数据经过参数优化配置,可以做到零丢失经过参数配置,消息可以做到零丢失
功能支持基于erlang开发,所以并发性能极强,性能极好,延时低MQ领域的功能极其完备MQ功能较为完备,分布式扩展性好功能较为简单,主要支持加单MQ功能
优势erlang语言开发,性能极好、延时很低,吞吐量万级、MQ功能完备,管理界面非常好,社区活跃;互联网公司使用较多非常成熟,功能强大,在业内大量公司和项目中都有应用接口简单易用,阿里出品有保障,吞吐量大,分布式扩展方便、社区比较活跃,支持大规模的Topic、支持复杂的业务场景,可以基于源码进行定制开发超高吞吐量,ms级的时延,极高的可用性和可靠性,分布式扩展方便
劣势吞吐量较低,erlang语音开发不容易进行定制开发,集群动态扩展麻烦偶尔有较低概率丟失消息,社区活跃度不高接口不是按照标准JMS(Java Message Service,Java消息服务应用程序接口,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API)规范走的,有的系统迁移要修改大量的代码,技术有被抛弃的风险有可能进行消息的重复消费
应用都有使用主要用于解耦和异步,较少用在大规模吞吐的场景中用于大规模吞吐、复杂业务中在大数据的实时计算和日志采集中被大规模使用,是业界的标准
topic数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka尽量保证 topic 数量不要过多,如果要支撑大规模的topic,需要增加更多的机器资源

  总结:选择中间件的可以从这些维度来考虑:可靠性,性能,功能,可运维行,可拓展性,社区活跃度。目前常用的几个中间件,ActiveMQ作为“老古董”,市面上用的已经不多。

  • RabbitMQ
      优点:轻量,迅捷,容易部署和使用,拥有灵活的路由配置。
      缺点:性能和吞吐量不太理想,不易进行二次开发。
  • RocketMQ
      优点:性能好,高吞吐量,稳定可靠,有活跃的中文社区
      缺点:兼容性上不是太好。
  • Kafka
      优点:拥有强大的性能及吞吐量,兼容性很好。
      缺点:由于“攒一波再处理”导致延迟比较高。

  一般的业务系统要引入MQ,最早大家都用ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,不推荐。
  如果是面向用户的C端系统,具有一定的并发量,对性能也有比较高的要求,可以选择低延迟、吞吐量比较高,可用性比较好的RocketMQ。

  RabbitMQ:基于erlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致RabbitMQ的性能急剧下降。每秒钟可以处理几万到十几万条消息。
  RocketMQ:基于Java开发,面向互联网集群化功能丰富,对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,每秒钟大概能处理几十万条消息
  Kafka:基于Scala开发,面向日志功能丰富,性能最高。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。
  ActiveMQ:基于Java开发,简单,稳定,性能不如前面三个。小型系统用也ok,但是不推荐。

  所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
  如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

3.3 RocketMQ有什么优缺点

  • RocketMQ优点
      单机吞吐量:十万级。
      可用性:非常高,分布式架构。
      消息可靠性:经过参数优化配置,消息可以做到0丢失。
      功能支持:MQ功能较为完善,还是分布式的,扩展性好。
      支持10亿级别的消息堆积,不会因为堆积导致性能下降。
      源码是Java,方便结合公司自己的业务二次开发。
      天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。
      RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ。
  • RocketMQ缺点
      支持的客户端语言不多,目前是Java及c++,其中c++不成熟。
      没有在 MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量代码。

3.4 消息队列有哪些消息模型

  消息队列有两种模型:队列模型和发布/订阅模型。

  • 队列模型
      最初的一种消息队列模型,对应着消息队列“发-存-收”的模型。生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,但是消费者之间是竞争关系,也就是说每条消息只能被一个消费者消费。
  • 发布/订阅模型
      如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。解决的方式就是发布/订阅模型。
      在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。

      它和 “队列模式” 的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。不同点在于:一份消息数据是否可以被多次消费。

3.5 【RocketMQ用什么消息模型】

  RocketMQ使用的消息模型是标准的发布-订阅模型,在RocketMQ的术语表中,生产者、消费者和主题,与发布-订阅模型中的概念是完全一样的。
  RocketMQ本身的消息是由下面几部分组成:

  • Message
      Message(消息)就是要传输的信息。一条消息必须有一个主题(Topic),一条消息也可以拥有一个可选的标签(Tag)。
  • Topic
      Topic(主题)可以看做消息的归类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。
      Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。
      一个 Topic 也可以被 0个、1个、多个消费者订阅。
  • Tag
      Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。
  • Group
      RocketMQ中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被Consumer Group1消费过,也会再给Consumer Group2消费。
      消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。默认情况,如果一条消息被消费者Consumer1消费了,那同组的其他消费者就不会再收到这条消息。
  • Message Queue
      Message Queue(消息队列),一个 Topic 下可以设置多个消息队列,Topic 包括多个 Message Queue ,如果一个 Consumer 需要获取 Topic下所有的消息,就要遍历所有的 Message Queue。
  • Offset
      在Topic的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要RocketMQ为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。
      也可以这么说,Queue 是一个长度无限的数组,Offset 就是下标。

3.6 【消息的消费模式】

  消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。

  默认情况下就是集群消费,这种模式下一个消费者组共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
  而广播消费消息会发给消费者组中的每一个消费者进行消费。

6.7 RoctetMQ架构


  RocketMQ一共有四个部分组成:NameServer、Broker、Producer、Consumer。

  • NameServer
      NameServer 是一个无状态的服务器,角色类似于 Kafka使用的 Zookeeper,但比 Zookeeper 更轻量。其特点:
  • 每个NameServer结点之间是相互独立,彼此没有任何信息交互。
  • Nameserver被设计成几乎是无状态的,通过部署多个结点来标识自己是一个伪集群,Producer 在发送消息前从 NameServer 中获取 Topic 的路由信息也就是发往哪个 Broker,Consumer 也会定时从 NameServer 获取 Topic 的路由信息,Broker 在启动时会向 NameServer 注册,并定时进行心跳连接,且定时同步维护的 Topic 到 NameServer。

  NameServer功能主要有两个:

1、和Broker结点保持长连接。
2、维护Topic的路由信息。

  • Broker
      消息存储和中转角色,负责存储和转发消息。
      Broker 内部维护着一个个 Consumer Queue,用来存储消息的索引,真正存储消息的地方是 CommitLog(日志文件)。
      单个 Broker 与所有的 Nameserver 保持着长连接和心跳,并会定时将 Topic 信息同步到 NameServer,和 NameServer 的通信底层是通过 Netty 实现的。
  • Producer
      消息生产者,Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
      Producer的负载均衡是由MQFaultStratege.selectOneMessageQueue()来实现的。这个方法就是随机选择一个要发送消息的broker来达到负载均衡的效果,选择的标准:尽量不选刚刚选过的broker,尽量不选发送上条消息延迟过高或没有响应的broker,也就是找到一个可用的broker。
      RocketMQ 提供了三种方式发送消息:同步、异步和单向
  • 同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
  • 异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
  • 单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
  • Consumer
      消息消费者,负责消费消息,一般是后台系统负责异步消费。
      Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消费,提供实时的消息订阅机制。
      Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。
      Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但其实从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。
      RocketMQ是拉模式。

3.8 【如何保证消息的可靠性(不丢失)】

  消息可能会在这三个阶段发生丢失:生产阶段、存储阶段、消费阶段。所以要从这三个阶段考虑:

  • 生产阶段
      在生产阶段,主要通过请求确认机制,来保证消息的可靠传递。
      1、同步发送的时候,要注意处理响应结果和异常。如果返回响应OK,表示消息成功发送到了Broker,如果响应失败,或者发生其它异常,都应该重试。
      2、异步发送的时候,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。
      3、如果发生超时的情况,也可以通过查询日志的API,来检查是否在Broker存储成功。
  • 存储阶段
      可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步。
      1、消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费。
      2、Broker的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache中(内存中),但是同步刷盘更可靠,它是Producer发送消息后等数据持久化到磁盘之后再返回响应给Producer。
      3、Broker通过主从模式来保证高可用,Broker支持Master和Slave同步复制、Master和Slave异步复制模式,生产者的消息都是发送给Master,但是消费既可以从Master消费,也可以从Slave消费。同步复制模式可以保证即使Master宕机,消息肯定在Slave中有备份,保证了消息不会丢失。
  • 消费阶段
      Consumer保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。

3.9 【如何处理消息重复的问题】

  对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的“有且仅有一次” 。RocketMQ择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
  处理消息重复问题,主要由业务端自己保证,主要的方式有两种:业务幂等和消息去重。

  • 业务幂等
      第一种是保证消费逻辑的幂等性,也就是多次调用和一次调用的效果是一样的。这样一来,不管消息消费多少次,对业务都没有影响。常用的解决方案有:
      1、唯一索引,防止新增脏数据。比如:支付宝的资金账户,支付宝也有用户账户,每个用户只能有一个资金账户,怎么防止给用户创建资金账户多个,那么给资金账户表中的用户ID加唯一索引,所以一个用户新增成功一个资金账户记录。要点:唯一索引或唯一组合索引来防止新增数据存在脏数据(当表存在唯一索引,并发时新增报错时,再查询一次就可以了,数据应该已经存在了,返回结果即可)。
      2、token机制,防止页面重复提交。业务要求: 页面的数据只能被点击提交一次;发生原因: 由于重复点击或者网络重发,或者nginx重发等情况会导致数据被重复提交;解决办法: 集群环境采用token加redis(redis单线程的,处理需要排队);单JVM环境:采用token加redis或token加jvm内存。处理流程:1)数据提交前要向服务的申请token,token放到redis或jvm内存,token有效时间;2)提交后后台校验token,同时删除token,生成新的token返回。
      3、分布式锁——还是拿插入数据的例子,如果是分布是系统,构建全局唯一索引比较困难,例如唯一性的字段没法确定,这时候可以引入分布式锁,通过第三方的系统(redis或zookeeper),在业务系统插入数据或者更新数据,获取分布式锁,然后做操作,之后释放锁,这样其实是把多线程并发的锁的思路,引入多多个系统,也就是分布式系统中得解决思路。

  • 消息去重
      第二种是业务端,对重复的消息就不再消费了。这种方法,需要保证每条消息都有一个唯一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性。
      具体做法是可以建立一个消费记录表,拿到消息后先检查记录表里是否已经有消费过的记录,没有的再消费。这个消费记录表里要有一个唯一的ID,比如订单ID等。

3.10 怎么处理消息积压

  发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:

  消费者扩容:如果当前Topic的Message Queue的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。
  消息迁移Queue扩容:如果当前Topic的Message Queue的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容Message Queue。可以新建一个临时的Topic,临时的Topic多设置一些Message Queue,然后先用一些消费者把消费的数据丢到临时的Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的Topic里的数据,消费完了之后,恢复原状。

3.11 顺序消息如何实现

  顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行。
  顺序消息分为全局顺序消息和部分顺序消息:

  全局顺序消息:某个Topic下的所有消息都要保证顺序;
  部分顺序消息:只要保证每一组消息被顺序消费即可,比如订单消息,只要保证同一个订单 ID 个消息能按顺序消费即可。

  • 部分顺序消息
      部分顺序消息相对比较好实现,生产端需要做到把同 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从同一个Message Queue读取的消息顺序处理——消费端不能并发处理顺序消息,这样才能达到部分有序。
      由producer发送到broker的消息队列是满足FIFO的,所以发送是顺序的,单个queue里的消息是顺序的。多个Queue同时消费是无法绝对保证消息的有序性的。所以,同一个topic,同一个queue,发消息的时候一个线程发送消息,消费的时候一个线程去消费一个queue里的消息。
  • 全局顺序消息
      RocketMQ 默认情况下不保证顺序,比如创建一个 Topic ,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer ,每个 Consumer 也可能启动多个线程并行处理,所以消息被哪个 Consumer 消费,被消费的顺序和写人的顺序是否一致是不确定的。
      要保证全局顺序消息, 需要先把 Topic 的读写队列数设置为 一,然后Producer Consumer 的并发设置,也要是一。简单来说,为了保证整个 Topic全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲RocketMQ的高并发、高吞吐的特性了。

3.12 如何实现消息过滤

  有两种方案:
  一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
  另一种是在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。

一般采用Cosumer端过滤,如果希望提高吞吐量,可以采用Broker过滤。

  对消息的过滤有三种方式:

  1、根据Tag过滤:这是最常见的一种,用起来高效简单。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

  2、SQL 表达式过滤:SQL表达式过滤更加灵活。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();

  3、Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤。

3.13 延时消息

  电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
  RocketMQ是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:

// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);
}

  目前RocketMQ支持的延时级别是有限的:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  • RocketMQ怎么实现延时消息的
      临时存储+定时任务。
      Broker收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的Message Queue中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标Topic的队列中,然后消费者就可以正常消费这些消息。

3.14 【怎么实现分布式消息事务的】

  半消息:是指暂时还不能被 Consumer 消费的消息,Producer 成功发送到 Broker 端的消息,但是此消息被标记为 “暂不可投递” 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后,Consumer 才能消费此条消息。
  依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查:

  • 1、Producer 向 broker 发送半消息。
  • 2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
  • 3、Producer 端执行本地事务。
  • 4、正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
  • 5、异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况。
  • 6、Producer 端查询本地事务的状态。
  • 7、根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)。
  • 8、消费者段消费到消息之后,执行本地事务,执行本地事务。

3.15 死信队列

  死信队列用于处理无法被正常消费的消息,即死信消息。
  当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列。
  死信消息的特点:

  不会再被消费者正常消费。
  有效期与正常消息相同,均为 3 天。3 天后会被自动删除。因此,需要在死信消息产生后的 3 天内及时处理。

  死信队列的特点:

  一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

  RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。

3.16 如何保证RocketMQ的高可用

  NameServer因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。
  RocketMQ的高可用主要是在体现在Broker的读和写的高可用,Broker的高可用是通过集群和主从实现的

  Broker可以配置两种角色:Master和Slave,Master角色的Broker支持读和写,Slave角色的Broker只支持读,Master会向Slave同步消息。也就是说Producer只能向Master角色的Broker写入消息,Cosumer可以从Master和Slave角色的Broker读取消息。
  Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。
  如何达到发送端写的高可用性呢?在创建 Topic 的时候,把 Topic 的多个Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组Master 仍然可用, Producer 仍然可以发送消息 RocketMQ 目前还不支持把Slave自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。

3.17 【RocketMQ的整体工作流程】

  简单来说,RocketMQ是一个分布式消息队列,也就是消息队列+分布式系统。
  作为消息队列,它是发-存-收的一个模型,对应的就是Producer、Broker、Cosumer;作为分布式系统,它要有服务端、客户端、注册中心,对应的就是Broker、Producer/Consumer、NameServer。

  • 1、Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳。
  • 2、Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息。
  • 3、Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费。

3.18 为什么RocketMQ不使用Zookeeper作为注册中心

CAP理论,指的是在一个分布式系统中,Consistency(一致性)、Availability(可用性)、Partition Tolerance(分区容错性),不能同时成立。

  Kafka采用Zookeeper作为注册中心——当然也开始逐渐去Zookeeper,RocketMQ不使用Zookeeper其实主要可能从这几方面来考虑:

  • 1、基于可用性的考虑
      根据CAP理论,同时最多只能满足两个点。Zookeeper满足的是CP,也就是说Zookeeper并不能保证服务的可用性,Zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
  • 2、基于性能的考虑
      NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而Zookeeper的写是不可扩展的,Zookeeper要解决这个问题只能通过划分领域,划分多个Zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
  • 3、持久化的机制来带的问题
      ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
  • 4、消息发送应该弱依赖注册中心
      RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

3.19 Broker是怎么保存数据的

  RocketMQ主要的存储文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件。

  • CommitLog
      消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
      CommitLog文件保存于${Rocket_Home}/store/commitlog目录中。
  • ConsumeQueue
      消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。
      Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
      ConsumeQueue文件可以看成是基于Topic的CommitLog索引文件,故ConsumeQueue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样ConsumeQueue文件采取定长设计,每一个条目共20个字节,分别为8字节的CommitLog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。
  • IndexFile
      IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引。
  • 总结
      RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。
      RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。
      只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。
      RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

3.20 RocketMQ怎么对文件进行读写的

  RocketMQ对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache、顺序读写、零拷贝。

  • 顺序读写
      磁盘的顺序读写性能要远好于随机读写。因为每次从磁盘读数据时需要先寻址,找到数据在磁盘上的物理位置。对于机械硬盘来说,就是移动磁头,会消耗时间。顺序读写相比于随机读写省去了大部分的寻址时间,它只需要寻址一次就可以连续读写下去,所以性能比随机读写好很多。
      RocketMQ 利用了这个特性。它所有的消息数据都存放在一个无限增长的文件队列 CommitLog 中,CommitLog 是由一组 1G 内存映射文件队列组成的。写入时就从一个固定位置一直写下去,一个文件写满了就开启一个新文件顺序读写下去。
  • 页缓存
      页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。
  • 零拷贝
      RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO,将磁盘文件数据在操作系统内核地址空间的缓冲区,和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。
      在操作系统中,使用传统的方式,数据需要经历几次拷贝,还要经历用户态/内核态切换。
  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复制到用户态内存;
  3. 然后从用户态内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复制到网卡中进行传输。

  可以通过零拷贝的方式,减少用户态与内核态的上下文切换和内存拷贝的次数,用来提升I/O的性能。零拷贝比较常见的实现方式是mmap,这种机制在Java中是通过MappedByteBuffer实现的。

3.21 消息刷盘怎么实现的

  RocketMQ提供了两种刷盘策略:
  同步刷盘:在消息达到Broker的内存之后,必须刷到commitLog日志文件中才算成功,然后返回Producer数据已经发送成功。
  异步刷盘:异步刷盘是指消息达到Broker内存后就返回Producer数据已经发送成功,会唤醒一个线程去将数据持久化到CommitLog日志文件中。

3.22 什么时候清理过期消息

  4.6版本默认48(默认是72,但是broker配置文件默认改成了48,所以新版本都是48)小时后会删除不再使用的CommitLog文件。

检查这个文件最后访问时间。
判断是否大于过期时间。
指定时间删除,默认凌晨4点。

3.23 【RocketMQ的负载均衡是如何实现的】

  RocketMQ是分布式消息服务,负载均衡是在生产和消费的客户端完成的

  • 生产者的负载均衡
      实质是在选择MessageQueue对象(内部包含了brokerName和queueId),第一种是默认策略,从MessageQueue列表中随机选择一个,算法时通过自增随机数对列表打下取余得到位置信息,但获得的MessageQueue所在集群不能是上次失败集群。第二种是超时容忍策略,先随机选择一个MessageQueue,如果因为超时等异常发送失败,会优先选择该broker集群下其他MessageQueue发送,如果没找到就从之前发送失败的Broker集群中选一个进行发送,若还没有找到才使用默认策略。
  • 消费者的负载均衡
      可选的有六种算法。
      1、平均分配算法。

      2、环形算法。

      3、指定机房算法。
      4、就近机房算法。
      5、统一哈希算法。使用一致性哈希算法进行负载,每次负载都会重建一致性hash路由表,获取本地客户端负责的所有队列信息。默认的hash算法为MD5,假设有4个消费者客户端和2个消息队列mq1和mq2,通过hash后分布在hash环的不同位置,按照一致性hash的顺时针查找原则,mq1被client2消费,mq2被client3消费。

      6、手动配置算法。

3.24 消息队列设计成推消息还是拉消息

  • 推拉模式
      推拉模式的时候指的是Comsumer和Broker之间的交互。
      默认的认为Producer与Broker之间就是推的方式,即Producer将消息推送给Broker,而不是Broker主动去拉取消息。
  • 推模式
      推模式指的是消息从Broker推向Consumer,即Consumer被动的接收消息,由Broker来主导消息的发送。
    【推模式的优点】
      1、消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer。
      2、对于消费者使用来说简单。
    【推模式的缺点】
      推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来。
      并且不同的消费者的消费速率还不一样,身为Broker很难平衡每个消费者的推送速率,如果要实现自适应的推送速率,那就需要在推送的时候消费者告诉Broker ,然后Broker需要维护每个消费者的状态进行推送速率的变更。这增加了Broker自身的复杂度。所以说推模式难以根据消费者的状态控制推送速率,适用于消息量不大、消费能力强要求实时性高的情况
  • 拉模式
      拉模式指的是Consumer主动向Broker请求拉取消息,即Broker被动的发送消息给Consumer。
      拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。
      拉模式下Broker就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息,从哪开始拿消息,拿多少消费者都告诉它。
      拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。
    【拉模式的缺点】
      1)消息延迟。毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个2 秒请求一次,你看着消息就很有可能延迟 2 秒了。
      2)消息忙请求,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。
  • 推拉模式如何选择
       RocketMQ 和Kafka都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ。
      拉模式可能适用性更好些,因为现在的消息队列都有持久化消息的需求,也就是说本身它就有个存储功能,它的使命就是接受消息,保存好消息使得消费者可以消费消息即可。

3.25 如何设计一个消息队列*

  需要明确地提出消息中间件的几个重要角色,分别是生产者、消费者、Broker、注册中心。
  简述下消息中间件数据流转过程:无非就是生产者生成消息,发送至Broker,Broker可以暂缓消息,然后消费者再从Broker获取消息,用于消费。
  注册中心用于服务的发现包括:Broker的发现、生产者的发现、消费者的发现,当然还包括下线,可以说服务的高可用离不开注册中心。
  然后开始简述实现要点,可以同通信讲起:各模块的通信可以基于Netty然后自定义协议来实现。注册中心可以利用zookeeper、consul、eureka、nacos等等,也可以像RocketMQ自己实现简单的nameserver。
  为了考虑扩容和整体的性能,采用分布式的思想,像Kafka一样采取分区理念,一个Topic分为多个partition。并且为保证数据可靠性,采取多副本存储,即Leader和follower,根据性能和数据可靠的权衡提供异步和同步的刷盘存储。
  并且利用选举算法保证Leader挂了之后Follower可以顶上,保证消息队列的高可用
  也同样为了提高消息队列的可靠性利用本地文件系统来存储消息,并且采用顺序写的方式来提高性能。也可以根据消息队列的特性利用内存映射、零拷贝进一步的提升性能。

3.26 RocketMQ消息体过大的解决方案

  官方定义消息体默认大小为 4MB,普通顺序消息类型。事务、定时、延时类消息默认大小为64KB。如果超过限制则会抛出异常

  • 方案一:消息压缩
      通常我们都是传递json消息数据,然后底层使用字节流进行传输。如果此时json数据超过4MB,则可以考虑进行消息压缩。原理其实很好理解,比如我们经常使用的压缩包,可以把大文件进行压缩,依次减小文件大小。那么我们这里需要使用到的就是字符压缩,把json字符串进行压缩,然后进行传输。
      消息压缩示例:
			// 创建消息,指定 Topic、Tag 和消息体Message msg = new Message("CompressedTopic", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置消息压缩级别,可选值为 0~9,其中 0 表示不压缩//级别越高压缩效果越好,但压缩和解压缩的时间可能会更长msg.setProperty(MessageConst.PROPERTY_MESSAGE_COMPRESS_LEVEL, "4");
  • 方案二:消息分割
      简而言之,就是把一个大消息体,进行分割成多个小消息体进行传输。

  一个大消息分割成多个小消息。
  多个小消息拥有相同的消息标识,如UUID。
  分割后小消息需要有一些元数据来标识自己,如 消息标识、一共分割了多少个、自己是第几个。
  传输后,消费者消费,然后根据元数据进行数据聚合还原。
  将还原后的消息走正常消费流程即可。

3.27 【如何保证幂等性】

  • 1、全局唯一id
      如果使用全局唯一ID,就是根据业务的操作和内容生成一个全局ID,在执行操作前先根据这个全局唯一ID是否存在,来判断这个操作是否已经执行。如果不存在则把全局ID存储到存储系统中,比如数据库、redis等。如果存在则表示该方法已经执行。
      使用全局ID做幂等可以作为一个业务的基础的微服务存在,在很多的微服务中都会用到这样的服务,在每个微服务中都完成这样的功能,会存在工作量重复。另外打造一个高可靠的幂等服务还需要考虑很多问题,比如一台机器虽然把全局ID先写入了存储,但是在写入之后挂了,这就需要引入全局ID的超时机制。
      使用全局唯一ID是一个通用方案,可以支持插入、更新、删除业务操作。但是这个方案看起来很美但是实现起来比较麻烦,下面的方案适用于特定的场景,实现起来比较简单。
  • 2、去重表
      这种方法适用于在业务中有唯一标的插入场景中,比如在支付场景中,如果一个订单只会支付一次,所以订单ID可以作为唯一标识。这时,我们就可以建一张去重表,并且把唯一标识作为唯一索引,在我们实现时,把创建支付单据和写入去重表,放在一个事务中,如果重复创建,数据库会抛出唯一约束异常,操作就会回滚。
  • 3、插入或更新
      这种方法插入并且有唯一索引的情况,比如我们要关联商品类,其中商品的ID和品类的ID可以构成唯一索引,并且在数据表中也增加了唯一索引。这时就可以使用InsertOrUpdate操作。在mysql数据库中如下:
insert into goods_category (goods_id,category_id,create_time,update_time)values(#{goodsId},#{categoryId},now(),now())on DUPLICATE KEY UPDATEupdate_time=now()
  • 4、多版本控制
      这种方法适合在更新的场景中,比如我们要更新商品的名字,这时我们就可以在更新的接口中增加一个版本号,来做幂等。
boolean updateGoodsName(int id,String newName,int version);

  在实现时可以如下:

update goods set name=#{newName},version=#{version} where id=#{id} and version<${version}
  • 5、状态机控制
      这种方法适合在有状态机流转的情况下,比如就会订单的创建和付款,订单的付款肯定是在之前,这时我们可以通过在设计状态字段时,使用int类型,并且通过值类型的大小来做幂等,比如订单的创建为0,付款成功为100。付款失败为99。在设计状态字段时,使用int类型,并且通过值类型的大小来做幂等,比如订单的创建为0,付款成功为100。付款失败为99。在做状态机更新时,这可以这样控制:
update `order` set status=#{status} where id=#{id} and status<#{status}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/498296.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

产品原型设计

&#x1f923;&#x1f923;目录&#x1f923;&#x1f923; 一、Axure原型设计&#xff08;Axure RP 9 &#xff09;1.1 软件下载安装1.2 产品原型展示1.3 产品原型下载1.4 视频课程推荐 二、磨刀原型设计2.1 软件下载安装2.2 产品原型展示2.3 产品原型下载2.4 视频课程推荐 什…

C++ 编译过程全解析:从源码到可执行文件的蜕变之旅

引言 C 作为一种广泛应用于系统开发、游戏编程、嵌入式系统等领域的高级编程语言&#xff0c;其代码需要经过编译才能转换为计算机可执行的机器语言。编译过程涵盖多个复杂阶段&#xff0c;每个阶段对最终生成的可执行文件的性能、稳定性及兼容性都有着深远影响。深入理解 C 编…

【漏洞复现】NetMizer 日志管理系统 hostdelay.php 前台RCE漏洞复现

免责声明 请勿利用文章内的相关技术从事非法测试,由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,作者不为此承担任何责任。工具来自网络,安全性自测,如有侵权请联系删除。本次测试仅供学习使用,如若非法他用,与平台和本文作…

hive-sql 连续登录五天的用户

with tmp as (select 梁牧泽 as uid, 2023-03-03 as dt union allselect 梁牧泽 as uid, 2023-03-04 as dt union allselect 梁牧泽 as uid, 2023-03-05 as dt union allselect 梁牧泽 as uid, 2023-03-07 as dt union allselect 梁牧泽 as uid, 2023-03-08 as dt union allsel…

【运维】部署MKDocs

部署MKDocs obsidian 记录笔记&#xff0c;通过 mkdocs 私有化部署。 1 使用MKDocs创建笔记 创建仓库&#xff0c;安装 Material for MkDocs 和 mkdocs-minify-plugin mkdir tmp cd tmp git initpip install mkdocs-material pip install mkdocs-minify-pluginmkdocs new .2 …

Linux的进程替换以及基础IO

进程替换 上一篇草率的讲完了进程地址空间的组成结构和之间的关系&#xff0c;那么我们接下来了解一下程序的替换。 首先&#xff0c;在进程部分我们提过了&#xff0c;其实文件可以在运行时变成进程&#xff0c;而我们使用的Linux软件其实也是一个进程&#xff0c;所以进一步…

C#编写的金鱼趣味小应用 - 开源研究系列文章

今天逛网&#xff0c;在GitHub中文网上发现一个源码&#xff0c;里面有这个金鱼小应用&#xff0c;于是就下载下来&#xff0c;根据自己的C#架构模板进行了更改&#xff0c;最终形成了这个例子。 1、 项目目录&#xff1b; 2、 源码介绍&#xff1b; 1) 初始化&#xff1b; 将样…

cloudns二级免费域名python更新ipv6 dns记录

没找到api&#xff0c;托管到cloudflare也不行。就只能写代码了&#xff08;只写了更新和添加单条ipv6记录&#xff09; 需要修改的地方 请求头的cookies填自己的 data里的zone填自己的 import requests from lxml import etree host#子域名 cookies填自己的 zone自己域名的 …

机器学习作业 | 泰坦尼克号生存的预测任务

泰坦尼克号生存的预测任务 学校作业&#xff0c;我来水一水 环境&#xff1a;pycharmanaconda虚拟环境 文章目录 泰坦尼克号生存的预测任务0.环境搭建参考&#xff1a;1 目的与要求2 任务背景3 任务简介4 模型介绍1.决策树&#xff08;Decision Tree&#xff09;2.朴素贝叶斯…

【pytorch】conda安装pytorch

Step 1 打开官网&#xff1a; https://pytorch.org/get-started/locally/ 进行选择对应版本&#xff1a; 复制图中命令执行。 Step 2 验证是否安装成功。 执行&#xff1a; import torch print(torch.cuda.is_available()) print(torch.cuda.device_count()) print(torch.…

项目开发实践——基于SpringBoot+Vue3实现的在线考试系统(四)

文章目录 一、管理员角色功能实现1、添加教师功能实现1.1 页面设计1.2 前端功能实现1.3 后端功能实现1.4 效果展示2、教师管理功能实现2.1 页面设计2.2 前端功能实现2.3 后端功能实现2.3.1 后端查询接口实现2.3.2 后端编辑接口实现2.3.3 后端删除接口实现2.4 效果展示二、代码下…

基于16QAM的载波同步和定时同步性能仿真,采用四倍采样,包括Costas环和gardner环

目录 1.算法仿真效果 2.算法涉及理论知识概要 3.MATLAB核心程序 4.完整算法代码文件获得 1.算法仿真效果 matlab2022a仿真结果如下&#xff08;完整代码运行后无水印&#xff09;&#xff1a; 仿真操作步骤可参考程序配套的操作视频。 2.算法涉及理论知识概要 载波同步是…

新服务器ubuntu系统相关操作

1、查看驱动:驱动版本535.216.01能够支持cuda12.2,下面直接使用默认安装的cuda。 2、赋予用户管理员权限。 首先有超级用户(root)权限来编辑 /etc/sudoers 文件,visudo 是一个命令,用于安全地编辑 /etc/sudoers 文件。运行: sudo visudo 在 visudo 编辑器中,找到类似…

2、Bert论文笔记

Bert论文 1、解决的问题2、预训练微调2.1预训练微调概念2.2深度双向2.3基于特征和微调&#xff08;预训练下游策略&#xff09; 3、模型架构4、输入/输出1.输入&#xff1a;2.输出&#xff1a;3.Learned Embeddings(学习嵌入)1. **Token Embedding**2. **Position Embedding**3…

python 渗透开发工具之SQLMapApi Server不同IP服务启动方式处理 解决方案SqlMapApiServer外网不能访问的情况

目录 说在前面 什么是 SQLMapAPI 说明 sqlmapApi能干什么 sqlmapApi 服务安装相关 kali-sqlmap存放位置 正常启动sqlmap-api server SqlMapApi-Server 解决外网不能访问情况 说在前面 什么是sqlmap 这个在前面已经说过了&#xff0c;如果这个不知道&#xff0c;就可以…

操作系统论文导读(八):Schedulability analysis of sporadic tasks with multiple criticality specifications——具有多个

Schedulability analysis of sporadic tasks with multiple criticality specifications——具有多个关键性规范的零星任务的可调度性分析 目录 一、论文核心思想 二、基本定义 2.1 关键性指标 2.2 任务及相关参数定义 2.3 几个基础定义 三、可调度性分析 3.1 调度算法分…

技术速递|调用异步功能 - WinForms 在 .NET 9 中的未来发展

作者&#xff1a; Klaus Loeffelmann 排版&#xff1a;Alan Wang 随着 .NET 的不断发展&#xff0c;WinForms 开发者可用的工具也在不断进步&#xff0c;这使得开发更加高效且应用响应更迅速。在 .NET 9 中&#xff0c;我们很高兴引入了一系列新的异步 API&#xff0c;这些 API…

Docker-构建自己的Web-Linux系统-镜像webtop:ubuntu-kde

介绍 安装自己的linux-server,可以作为学习使用&#xff0c;web方式访问&#xff0c;基于ubuntu构建开源项目 https://github.com/linuxserver/docker-webtop安装 docker run -d -p 1336:3000 -e PASSWORD123456 --name webtop lscr.io/linuxserver/webtop:ubuntu-kde登录 …

【每日学点鸿蒙知识】箭头函数、Watch状态变量、H5获取定位数据、前后台切换、长按事件

【每日学点鸿蒙知识】箭头函数、Watch状态变量、H5获取定位数据、前后台切换、长按事件 1、HarmonyOS confirm: () > void () > { }&#xff1f; confirm: () > void () > { }是什么格式。 是一个箭头函数&#xff0c;它的类型是 () > void&#xff0c;表示…

【人工智能机器学习基础篇】——深入详解监督学习之模型评估:掌握评估指标(准确率、精确率、召回率、F1分数等)和交叉验证技术

深入详解监督学习之模型评估 在监督学习中&#xff0c;模型评估是衡量模型性能的关键步骤。有效的模型评估不仅能帮助我们理解模型在训练数据上的表现&#xff0c;更重要的是评估其在未见数据上的泛化能力。本文将深入探讨监督学习中的模型评估方法&#xff0c;重点介绍评估指…