1、消息队列有什么优点和缺点?
优点:
解耦、异步、削峰填谷。
缺点:
系统可用性降低
系统复杂性提高
一致性问题
2、常见消息队列的比较
3、Kafka的特性
1.消息持久化
2.高吞吐量
3.扩展性强(动态)4集群+4台集群
4.多客户端支持(Java、C、C++、GO、Python)
5. Kafka Streams(流处理)双十一销售大屏
6、安全机制
7、数据备份
8、轻量级
9、消息压缩
4、RabbitMQ中的vhost起什么作用?
vhost:虚拟消息服务器。
Vhost提供了逻辑上的分离,可以将众多客户端进行区分,又可以避免队列和交换器的命名冲突。
rabbitmq里创建用户,必须要被指派给至少一个vhost,并且只能访问被指派内的队列、交换器和
绑定。Vhost必须通过rabbitmq的管理控制工具创建。
5、RabbitMQ上的一个queue中存放的message是否有数量限制?限制是多少
默认情况下一般是无限制,可以通过参数来限制, x-max-length :对队列中消息的条数进行
限制,x-max-length-bytes :对队列中消息的总量进行限制。
6、kafka中,可以不用zookeeper么?
新版本的kafka可以不用,3.0以上可以使用Kafka with Kraft,就可以完全抛弃zookeeper
2版本的需要使用zookeeper,用来存放topic。
7、说一说Kafka你熟悉的参数?
1、必选属性
创建生产者对象时有三个属性必须指定:
bootstrap.servers:该属性指定broker的地址清单,
key.serializer:key的序列化器。必须提供将对象序列化成字节数组的序列化器。
value.serializer:value的序列化器。必须提供将对象序列化成字节数组的序列化器。
acks:
acks=0:生产者在写入消息之前不会等待任 何来自服务器的响应,容易丢消息,但是吞吐量高。
acks=1:只要集群的主节点收到消息,生产者会收到来自服务器的成功响应。
acks=all:只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。
batch.size:该参数指定了一个批次可以使用的内存大小,按照字节数计算。
linger.ms:
max.request.size:控制生产者发送请求最大大小。默认这个值为1M
8、说一说RabbitMQ中的AMQP
AMQP协议 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。
9、RabbitMQ开启持久化机制,有什么要注意的点?
1、开启了持久化,性能影响会非常大,大概会变成2到3千的吞吐量
2、开启持久化需要交换器、队列、消息三者都需要持久化
10、kafka适合哪些场景?
1、适合日志收集:比较适合,按照时间进行写入的。
2、适合消息系统:典型的生产者生产消息,消费者消费消息。
3、流式处理:kafka(用的stream)配合Flink 可以进行大数据量的流处理,不用再去操作DB,很方便的查看任意时间段的数据。
4、不适合限时订单:订单30s没有支付就会取消订单,在RocketMQ中是使用延迟队列的方式实现。
11、RabbitMQ中交换器4种类型?
fanout(广播): 把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
direct(定向):把消息路由到BindingKey和RoutingKey完全匹配的队列中。
topic(通配符): 匹配规则: RoutingKey 为一个 点号'.'分隔的字符串。比如: java.xiaoka.show BindingKey和RoutingKey一样也是点号“.“分隔的字符串。 BindingKey可使用 * 和 # 用于做模糊匹配,*匹配一个单词,#匹配多个或者0个
headers:不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差,基本用不到。
12、为什么Kafka不支持读写分离?
读写分离要实现的就是主写从读。
1、数据一致性问题:数据从主节点转到从节点,必然会有一个延时的时间窗口,这个时间窗口会
导致主从节点之间的数据不一致。
2、延时问题:Kafka追求高性能,如果走主从复制,延时严重
3、如果实现了主写从读,很难保证负载均衡
4、不实现读写分离,架构简单、出错可能比较小
13、Kafka中是怎么做到消息顺序性的?
一个 topic,一个 partition,一个 consumer,内部单线程消费。
生产者在发送消息的时候指定要发送到特定Partition(分区)。
14、Kafka为什么那么快?
1、文件顺序读写,速度接近内存
2、写入数据,传统方式需要4次拷贝,kafka采用零拷贝技术,减少没有必要的拷贝,发送文件描
述符sendfile。
发送的文件描述符里面记录的是 在Socket缓冲区的位置,直接从Socket缓冲区拿数据就可以了。
3、批处理、发送的JSON数据进行压缩等
15、mq如何解决重复消费?
消息被重复消费,就是消费方多次接受到了同一条消息。根本原因就是,第一次消费完后,
消费方给 MQ 确认已消费的反馈,MQ 没有成功接受。比如网络原因、MQ 重启等。所以 MQ 是无
法保证消息不被重复消费的,只能业务系统层面考虑。
不被重复消费的问题,就被转化为消息消费的幂等性的问题。幂等性就是指一次和多次请求
的结果一致,多次请求不会产生副作用。
解决方案:
1、MVCC多版本并发控制 (生产的时候带上数据的版本号)
执行的方法 update(id,version);
实际执行的SQL语句
update order set count = count + 1 where id =1 and version = #{version}
2、去重表的方案
给这条消息生成一个唯一索引 id ,保存到创建的去重表中。
try{将这条消息生成的唯一id 保存到去重表中insert去重表(id)执行具体的sqlupdate order set count = count + 1 where id =1 }catch(Exeception e){如果去重表执行保存的时候发生了异常在这里捕获异常,不要抛出异常在这里执行消费消息的ACK确认}
16、Rocketmq如何保证高可用性?
1、架构层面
避免用单节点或者简单的一主一从架构,可以采取多主多从的架构,并且主从之间采用同步复制的方式进行数据双写。
2、刷盘策略
RocketMQ默认的异步刷盘,可以改成同步刷盘SYNC_FLUSH。
3、生产消息的高可用
当消息发送失败了,在消息重试的时候,会尽量规避上一次发送的 Broker,选择还没推送过该消息的Broker,以增大消息发送的成功率。
4、消费消息的高可用
消费者获取到消息之后,可以等到整个业务处理完成,再进行CONSUME_SUCCESS状态确认,如果业务处理过程中发生了异常那么就会触发broker的重试机制。
17、RocketMq的存储机制了解吗?
消息生产者发送消息到broker,都是会按照顺序存储在CommitLog文件中,每个commitLog文件的大小为1G。
CommitLog-存储所有的消息元数据,包括Topic、QueueId以及message
ConsumerQueue-消费逻辑队列:存储消息在CommitLog的offset
IndexFile-索引文件:存储消息的key和时间戳等信息,使得RocketMq可以采用key和时间区间来查询消息
也就是说,rocketMq将消息均存储在CommitLog中,并分别提供了CosumerQueue和IndexFile两个索引,来快速检索消息
18、RocketMq性能比较高的原因?
1、底层采用Netty 高效的NIO通讯框架。
2、文件存储,顺序读写
3.零拷贝,使用mmap的方式进行零拷贝,提高了数据传输的效率
4.异步刷盘
相比较于同步刷盘,异步刷盘的性能会高很多
5、RocketMQ大量使用多线程、异步
6、锁优化 (CAS机制无锁化)
19、让你来设计一个消息队列,你会怎么设计?
1、数据存储角度:
理论上,从速度来看,分布式文件系统 > 分布式KV(持久化)> 数据库,而可靠性截然相
反,如果追求性能可以基于文件系统的顺序写、零拷贝。
2、高可用角度:
动态可伸缩、
消息的丢失:多主多从、多副本,遵守raft协议,如果一台主服务器 宕机,通过选举机制选出来主服务器。
3、网络框架角度:
选用高效的Netty框架,producer 同步异步发送消息,consumer 同步异步接收消息。同步能够保证结果,异步能够保证性能。
20、有几百万消息持续积压几小时,说说怎么解决?
1、修复consumer的问题,让他恢复消费速度,然后等待几十分钟消费完毕,这是个解决方案。不
过有时候我们还会进行临时紧急扩容。
一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条。1000多万条,所以如果积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。
2、临时紧急扩容具体操作步骤和思路如下:
先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量。然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。
这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
等快速消费完积压数据之后,再恢复原先部署架构,重新用原先的consumer机器来消费消息。
21、Rocketmq中Broker的部署方式
1.单台 Master 部署;
2.多台 Master部署
3.多台主从部署---生产中常见的模式
22、Rocketmq中Broker的刷盘策略有哪些?
同步刷盘
SYNC_FLUSH(同步刷盘):生产者发送的每一条消息都在保存到磁盘成功后才返回告诉生产者成功。这种方式不会存在消息丢失的问
题,但是有很大的磁盘IO开销,性能有一定影响。
异步刷盘
ASYNC_FLUSH(异步刷盘):生产者发送的每一条消息并不是立即保存到磁盘,而是暂时缓存起来,然后就返回生产者成功。随后再异步的将缓存数据保存到磁盘,有两种情况:1是定期将缓存中更新的数据进行刷盘,2是当缓存中更新的数据条数达到某一设定值后进行刷盘。这种异步的方式会存在消息丢失(在还未来得及同步到磁盘的时候宕机),但是性能很好。默认是这种模式。
23、什么是路由注册?RocketMQ如何进行路由注册?
RocketMQ的路由注册是通过broker向NameServer发送心跳包实现的,首先borker每隔30s向nameserver发送心跳语句,nameserver更新brokerLiveTable的心跳时间。
24、什么是路由发现?RocketMQ如何进行路由发现?
RocketMQ的路由发现不是实时的,NameServer不会主动向客户端推送,而是客户端定时拉取主题最新的路由,然后更新。
step1:调用RouterInfoManager的方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable分别填充信息;
step2:如果主题对应的消息为顺序消息,则从NameServerKVconfig中获取关于顺序消息相关的配置填充路由信息;
25、什么是路由剔除?RocketMQ如何进行路由剔除?
路由删除有两个触发节点:
1)NameServer定时扫描brokerLiveTable检测上次心跳包与当前系统时间的时间差,如果大于120S,就需要删除;
2)Broker在正常关闭时,会执行unregisterBroker命令。
两种方法删除的逻辑都是一致的。
step1:申请写锁
step2:从brokerLiveTable、filterServerTable移除,从brokerAddrTable、clusterAddrTable、topicQueueTable移除
step3:释放锁
26、使用RocketMQ过程中遇到过什么问题?
1、消息挤压问题(消费者出现问题了,短信没有发送)
2、消息丢失问题
3、消息重复消费问题
4、RocketMQ内存不够OOM问题
27、RocketMQ的总体架构,以及每个组件的功能?
RocketMQ 一共由四个部分组成:NameServer、Broker、Producer、Consumer,它们分别对应着发现、存、发、收四个功能。这四部分的功能很像邮政系统,Producer 相当于负责发送信件的发件人,Consumer 相当于负责接收信件的收件人,Broker 相当于负责暂存信件传输的邮局,NameServer 相当于负责协调各个地方邮局的管理机构。一般情况下,为了保证高可用,每一部分都是以集群形式部署的。