RocketMQ面试题:进阶部分

🧑 博主简介:CSDN博客专家历代文学网(PC端可以访问:https://literature.sinhy.com/#/?__c=1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,精通Java编程高并发设计Springboot和微服务,熟悉LinuxESXI虚拟化以及云原生Docker和K8s,热衷于探索科技的边界,并将理论知识转化为实际应用。保持对新技术的好奇心,乐于分享所学,希望通过我的实践经历和见解,启发他人的创新思维。在这里,我希望能与志同道合的朋友交流探讨,共同进步,一起在技术的世界里不断学习成长。
技术合作请加本人wx(注明来自csdn):foreast_sea

在这里插入图片描述


在这里插入图片描述

RocketMQ面试题:进阶部分

1. 如何保证消息的可用性/可靠性/不丢失呢?

消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失:生产阶段、存储阶段、消费阶段。

所以要从这三个阶段考虑:

在这里插入图片描述

1.1 生产

在生产阶段,主要通过请求确认机制,来保证消息的可靠传递

  • 1、同步发送的时候,要注意处理响应结果和异常。如果返回响应 OK,表示消息成功发送到了 Broker,如果响应失败,或者发生其它异常,都应该重试。
  • 2、异步发送的时候,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。
  • 3、如果发生超时的情况,也可以通过查询日志的 API,来检查是否在 Broker 存储成功。
1.2 存储

存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步。

  • 1、消息只要持久化到 CommitLog(日志文件)中,即使 Broker 宕机,未消费的消息也能重新恢复再消费。
  • 2、Broker 的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在 pagecache 中(内存中),但是同步刷盘更可靠,它是 Producer 发送消息后等数据持久化到磁盘之后再返回响应给 Producer。

在这里插入图片描述

  • 3、Broker 通过主从模式来保证高可用,Broker 支持 Master 和 Slave 同步复制、Master 和 Slave 异步复制模式,生产者的消息都是发送给 Master,但是消费既可以从 Master 消费,也可以从 Slave 消费。同步复制模式可以保证即使 Master 宕机,消息肯定在 Slave 中有备份,保证了消息不会丢失。
1.3 消费

从 Consumer 角度分析,如何保证消息被成功消费?

  • Consumer 保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。

2. 如何处理消息重复的问题呢?

RocketMQ 可以保证消息一定投递,且不丢失,但无法保证消息不重复消费。

因此,需要在业务端做好消息的幂等性处理,或者做消息去重。

在这里插入图片描述

幂等性是指一个操作可以执行多次而不会产生副作用,即无论执行多少次,结果都是相同的。可以在业务逻辑中加入检查逻辑,确保同一消息多次消费不会产生副作用。

例如,在支付场景下,消费者消费扣款的消息,对一笔订单执行扣款操作,金额为100元。

如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果要保证只扣款一次,金额为100元。如果扣款操作是符合要求的,那么就可以认为整个消费过程实现了消息幂等。

消息去重,是指在消费者消费消息之前,先检查一下是否已经消费过这条消息,如果消费过了,就不再消费。

业务端可以通过一个专门的表来记录已经消费过的消息 ID,每次消费消息之前,先查询一下这个表,如果已经存在,就不再消费。

public void processMessage(String messageId, String message) {if (!isMessageProcessed(messageId)) {// 处理消息markMessageAsProcessed(messageId);}
}private boolean isMessageProcessed(String messageId) {// 查询去重表,检查消息ID是否存在
}private void markMessageAsProcessed(String messageId) {// 将消息ID插入去重表
}
2.1 如何保证消息的幂等性?

在这里插入图片描述

首先,消息必须携带业务唯一标识,可以通过雪花算法生成全局唯一 ID。

Message msg = new Message(TOPIC /* Topic */,TAG /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);message.setKey("ORDERID_100"); // 订单编号
SendResult sendResult = producer.send(message);      

其次,在消费者接收到消息后,判断 Redis 中是否存在该业务主键的标志位,若存在标志位,则认为消费成功,否则执行业务逻辑,执行完成后,在缓存中添加标志位。

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {for (MessageExt messageExt : msgs) {String bizKey = messageExt.getKeys(); // 唯一业务主键//1. 判断是否存在标志if(redisTemplate.hasKey(RedisKeyConstants.WAITING_SEND_LOCK + bizKey)) {continue;}//2. 执行业务逻辑//TODO do business//3. 设置标志位redisTemplate.opsForValue().set(RedisKeyConstants.WAITING_SEND_LOCK + bizKey, "1", 72, TimeUnit.HOURS);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {logger.error("consumeMessage error: ", e);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}
}

然后,利用数据库的唯一索引来防止业务的重复插入。

CREATE TABLE `t_order` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`order_id` varchar(64) NOT NULL COMMENT '订单编号',`order_name` varchar(64) NOT NULL COMMENT '订单名称',PRIMARY KEY (`id`),UNIQUE KEY `order_id` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';

最后,在数据库表中使用版本号,通过乐观锁机制来保证幂等性。每次更新操作时检查版本号是否一致,只有一致时才执行更新并递增版本号。如果版本号不一致,则说明操作已被执行过,拒绝重复操作。

public void updateRecordWithOptimisticLock(int id, String newValue, int expectedVersion) {int updatedRows = jdbcTemplate.update("UPDATE records SET value = ?, version = version + 1 WHERE id = ? AND version = ?",newValue, id, expectedVersion);if (updatedRows == 0) {throw new OptimisticLockingFailureException("Record has been modified by another transaction");}
}

或者悲观锁机制,通过数据库的锁机制来保证幂等性。

public void updateRecordWithPessimisticLock(int id) {jdbcTemplate.queryForObject("SELECT * FROM records WHERE id = ? FOR UPDATE", id);jdbcTemplate.update("UPDATE records SET value = ? WHERE id = ?", "newValue", id);
}
2.2 雪花算法了解吗?

雪花算法是由 Twitter 开发的一种分布式唯一 ID 生成算法。

在这里插入图片描述

雪花算法以 64 bit 来存储组成 ID 的4 个部分:

  1. 最高位占1 bit,始终为 0,表示正数。
  2. 中位占 41 bit,值为毫秒级时间戳;
  3. 中下位占 10 bit,机器 ID(包括数据中心 ID 和机器 ID),可以支持 1024 个节点。
  4. 末位占 12 bit,值为当前毫秒内生成的不同的自增序列,值的上限为 4096;

目前雪花算法的实现比较多,可以直接使用 Hutool 工具类库中的 IdUtil.getSnowflake() 方法来获取雪花 ID。

long id = IdUtil.getSnowflakeNextId();
  1. Java 面试指南(付费)收录的京东同学 4 云实习面试原题:如何处理消息重复消费的问题?如何保证幂等性?雪花算法了解吗?

3. 怎么处理消息积压?

发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:

在这里插入图片描述

  • 消费者扩容:如果当前 Topic 的 Message Queue 的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。
  • 消息迁移 Queue 扩容:如果当前 Topic 的 Message Queue 的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容 Message Queue。可以新建一个临时的 Topic,临时的 Topic 多设置一些 Message Queue,然后先用一些消费者把消费的数据丢到临时的 Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的 Topic 里的数据,消费完了之后,恢复原状。

在这里插入图片描述

4. 顺序消息如何实现?

RocketMQ 实现顺序消息的关键在于保证消息生产和消费过程中严格的顺序控制,即确保同一业务的消息按顺序发送到同一个队列中,并由同一个消费者线程按顺序消费。

在这里插入图片描述

4.1 局部顺序消息如何实现?

局部顺序消息保证在某个逻辑分区或业务逻辑下的消息顺序,例如同一个订单或用户的消息按顺序消费,而不同订单或用户之间的顺序不做保证。

在这里插入图片描述

4.2 全局顺序消息如何实现?

全局顺序消息保证消息在整个系统范围内的严格顺序,即消息按照生产的顺序被消费。

可以将所有消息发送到一个单独的队列中,确保所有消息按生产顺序发送和消费。

在这里插入图片描述

5. 如何实现消息过滤?

有两种方案:

  • 一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
  • 另一种是在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。

一般采用 Cosumer 端过滤,如果希望提高吞吐量,可以采用 Broker 过滤。

对消息的过滤有三种方式:

在这里插入图片描述

  • 根据 Tag 过滤:这是最常见的一种,用起来高效简单
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
  • SQL 表达式过滤:SQL 表达式过滤更加灵活
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();
  • Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤

6. 延时消息了解吗?

电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息,1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

RocketMQ 是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:

// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);
}

但是目前 RocketMQ 支持的延时级别是有限的:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
6.1 RocketMQ 怎么实现延时消息的?

简单,八个字:临时存储+定时任务

Broker 收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的 Message Queue 中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标 Topic 的队列中,然后消费者就可以正常消费这些消息。

在这里插入图片描述

7. 怎么实现分布式消息事务的?半消息?

半消息:是指暂时还不能被 Consumer 消费的消息,Producer 成功发送到 Broker 端的消息,但是此消息被标记为 “暂不可投递” 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后,Consumer 才能消费此条消息。

依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查:

在这里插入图片描述

  • 1、Producer 向 broker 发送半消息
  • 2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
  • 3、Producer 端执行本地事务。
  • 4、正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
  • 5、异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况。
  • 6、Producer 端查询本地事务的状态
  • 7、根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
  • 8、消费者段消费到消息之后,执行本地事务。

8. 死信队列知道吗?

死信队列用于存储那些无法被正常处理的消息,这些消息被称为死信(Dead Letter)。

在这里插入图片描述

产生死信的原因是,消费者在处理消息时发生异常,且达到了最大重试次数。当消费失败的原因排查并解决后,可以重发这些死信消息,让消费者重新消费;如果暂时无法处理,为避免到期后死信消息被删除,可以先将死信消息导出并进行保存。

9. 如何保证 RocketMQ 的高可用?

NameServer 因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。

在这里插入图片描述

RocketMQ 的高可用主要是在体现在 Broker 的读和写的高可用,Broker 的高可用是通过集群主从实现的。

在这里插入图片描述

Broker 可以配置两种角色:Master 和 Slave,Master 角色的 Broker 支持读和写,Slave 角色的 Broker 只支持读,Master 会向 Slave 同步消息。

也就是说 Producer 只能向 Master 角色的 Broker 写入消息,Cosumer 可以从 Master 和 Slave 角色的 Broker 读取消息。

Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。

如何达到发送端写的高可用性呢?在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId 机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组 Master 仍然可用, Producer 仍然可以发送消息 RocketMQ 目前还不支持把 Slave 自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/501731.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式之桥接设计模式

简介 与适配器模式类似&#xff0c;桥接设计模式也是结构型模式将抽象部分与实现部分分离&#xff0c;使它们都可以独立的变化通俗来说&#xff0c;是通过组合来桥接其它的行为/维度 应用场景举例 传统方式 使用桥接设计模式之后 编码示例 /*** 抽象角色&#xff1a;手机*/ …

个人健康信息系统|Java|SSM|VUE| 前后端分离

【技术栈】 1⃣️&#xff1a;架构: B/S、MVC 2⃣️&#xff1a;系统环境&#xff1a;Windowsh/Mac 3⃣️&#xff1a;开发环境&#xff1a;IDEA、JDK1.8、Maven、Mysql5.7 4⃣️&#xff1a;技术栈&#xff1a;Java、Mysql、SSM、Mybatis-Plus、VUE、jquery,html 5⃣️数据库可…

个人交友系统|Java|SSM|JSP|

【技术栈】 1⃣️&#xff1a;架构: B/S、MVC 2⃣️&#xff1a;系统环境&#xff1a;Windowsh/Mac 3⃣️&#xff1a;开发环境&#xff1a;IDEA、JDK1.8、Maven、Mysql5.7 4⃣️&#xff1a;技术栈&#xff1a;Java、Mysql、SSM、Mybatis-Plus、JSP、jquery,html 5⃣️数据库可…

每天40分玩转Django:Django Celery

Django Celery 一、知识要点概览表 模块知识点掌握程度要求Celery基础配置、任务定义、任务执行深入理解异步任务任务状态、结果存储、错误处理熟练应用周期任务定时任务、Crontab、任务调度熟练应用监控管理Flower、任务监控、性能优化理解应用 二、基础配置实现 1. 安装和…

canvas+fabric实现时间刻度尺+长方形数据展示

前言 我们前面实现了时间刻度尺&#xff0c;现在在时间刻度尺里面画一个长方形&#xff0c;长方形里面有数据展示。 效果 实现 1.先实现时间刻度尺 2.鼠标移动、按下事件监听并画出对应效果 3.在刻度尺里面画对应的长方形数据展示 <template><div><canvas…

网络安全【C10-2024.10.1】-sql注入基础

1、利用宽字节注入实现“库名-表名”的注入过程&#xff0c;写清楚注入步骤&#xff1b; 宽字节概念 1、如果一个字符的大小是一个字节的&#xff0c;称为窄字节&#xff1b; 2、如果一个字符的大小是两个及以上字节的&#xff0c;称为宽字节&#xff1b;像GB2312、GBK、GB1803…

【Domain Generalization(2)】领域泛化在文生图领域的工作之——PromptStyler(ICCV23)

系列文章目录 【Domain Generalization(1)】增量学习/在线学习/持续学习/迁移学习/多任务学习/元学习/领域适应/领域泛化概念理解第一篇大概了解了 DG 的概念&#xff0c;那么接下来将介绍 DG 近年在文生图中的应用/代表性工作。本文介绍的是 PromptStyler: Prompt-driven Sty…

MySQL 08 章——聚合函数

聚合函数是对一组数据进行汇总的函数&#xff0c;输入的是一组数据的集合&#xff0c;输出的是单个值 一、聚合函数介绍 &#xff08;1&#xff09;AVG和SUM函数 举例&#xff1a;只适用于数值类型的字段&#xff08;或变量&#xff09;AVG函数和SUM函数在计算空值时&#x…

HTML——73.button按钮

<!DOCTYPE html> <html><head><meta charset"UTF-8"><title>button按钮</title></head><body><!--button按钮&#xff1a;1.button按钮type属性&#xff1a;可以设置三个值&#xff0c;submit/reset/button,含义…

Java 数据库连接 - Sqlite

Java 数据库连接 - Sqlite PS: 1. 连接依赖库&#xff1a;[sqlite-jdbc-xxx.jar](https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc)(根据连接的数据库版本选择) 2. 支持一次连接执行多次sql语句&#xff1b; 3. 仅本地连接&#xff1b;使用说明&#xff1a; publ…

NCCL源码解读3.1:double binary tree双二叉树构建算法,相比ring环算法的优势

目录 一、双二叉树出现的原因 二、双二叉树介绍 三、双二叉树大规模性能 四、双二叉树源码解读 双二叉树注意事项 核心逻辑 源码速递 视频分享在这&#xff0c;未完待补充&#xff1a; 3.1 NCCL源码解读双二叉树构建算法&#xff0c;double binary tree相比ring环算法的…

深入理解 JVM 的垃圾收集器:CMS、G1、ZGC

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…

四、VSCODE 使用GIT插件

VSCODE 使用GIT插件 一下载git插件与git Graph插件二、git插件使用三、文件提交到远程仓库四、git Graph插件 一下载git插件与git Graph插件 二、git插件使用 git插件一般VSCode自带了git&#xff0c;就是左边栏目的图标 在下载git软件后vscode的git插件会自动识别当前项目 …

【NLP高频面题】用RNN训练语言模型时如何计算损失?

用RNN训练语言模型时如何计算损失&#xff1f; 重要性&#xff1a;★ 以“you say goodbye and i say hello.”为例&#xff0c;将其作为具体的数据传入网络&#xff0c;此时 RNNLM 进行的处理如图所示&#xff1a; RNNLM 可以“记忆”目前为止输入的单词&#xff0c;并以此…

Spring Cloud Security集成JWT 快速入门Demo

一、介绍 JWT (JSON Web Token) 是一种带有绑实和信息的简单标准化机制&#xff0c;在信息通信中用于验证和信息传递。尤其在应用中使用Spring Cloud实现分布式构建时&#xff0c;JWT可以作为一种无状态验证原理的证明。 本文将进一步描述如何在Spring Cloud Security中集成JW…

【机器学习】【朴素贝叶斯分类器】从理论到实践:朴素贝叶斯分类器在垃圾短信过滤中的应用

&#x1f31f; 关于我 &#x1f31f; 大家好呀&#xff01;&#x1f44b; 我是一名大三在读学生&#xff0c;目前对人工智能领域充满了浓厚的兴趣&#xff0c;尤其是机器学习、深度学习和自然语言处理这些酷炫的技术&#xff01;&#x1f916;&#x1f4bb; 平时我喜欢动手做实…

unity学习5:创建一个自己的3D项目

目录 1 在unity里创建1个3D项目 1.1 关于选择universal 3d&#xff0c;built-in render pipeline的区别 1.2 创建1个universal 3d项目 2 打开3D项目 2.1 准备操作面板&#xff1a;操作界面 layout,可以随意更换 2.2 先收集资源&#xff1a;打开 window的 AssetStore 下载…

Vue3 内置组件之component

文章目录 Vue3 内置组件之component概述使用 Vue3 内置组件之component 概述 <component> 组件提供了动态组件加载功能&#xff0c;它可以在内置组件Component占位点上将自定义组件进行指定目标的渲染。比如页面中常见的Tabs选项卡效果就可以利用动态组件加载功能轻松实…

学习路之VScode--自定义按键写注释(插件)

1. 安装 "KoroFileHeader" 插件 首先&#xff0c;在 VScode 中搜索并安装名为 "KoroFileHeader" 的插件。你可以通过在扩展商店中搜索插件名称来找到并安装它。 2. 进入 VScode 设置页面 点击 VScode 左下角的设置图标&#xff0c;然后选择 "设置&q…

C++编程库与框架实战——ZeroMQ消息队列

一,消息队列简介 消息队列是一种进程间的通信机制,用于在不同进程之间同步消息。通信期间,一个进程将消息放入该队列中,然后另一个进程就可以从该队列中取出这条消息。 消息队列可以是异步的,即发送方无需等待接收方的确认或回复就可以立即执行下一步的操作。 消息队列…