1、概述
RocketMQ是阿里巴巴开源的一个分布式消息中间件,具有高吞吐量、低延迟和强一致性等特点。它特别适合大规模分布式系统的消息传递,广泛应用于电商、金融、物流等领域的实时数据处理和异步通信。
RocketMQ是用Java语言实现,在设计时参考了Kafka,并做出了自己的一些改进,消息可靠性上比Kafka更好。RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理等。
它最初是为了解决阿里巴巴双11大促期间的海量消息传递问题而设计的,后来被捐赠给Apache基金会,成为了一个广泛使用的开源项目。RocketMQ不仅是一个消息队列系统,还可以作为日志收集、实时数据处理等场景下的分布式消息总线。
2、特点
- 高吞吐量:RocketMQ能够在单机上每秒处理数百万条消息,适用于大数据量的场景。
- 低延迟:RocketMQ的设计目标是低延迟,能够在毫秒级别内完成消息传递。
- 强一致性:RocketMQ支持消息的顺序传递,确保消息的顺序性和一致性。
- 分布式架构:RocketMQ采用主从复制和分布式架构,提供高可用性和水平扩展能力。
- 灵活的消息模型:支持P2P和Pub/Sub模式,满足不同的业务需求。
- 丰富的生态:RocketMQ提供了多种客户端库和工具,支持多种编程语言和平台。
- 插件机制:RocketMQ提供了插件机制,用户可以根据需要扩展其功能,例如添加自定义的序列化器、压缩算法等。
- 持久化存储:RocketMQ使用高效的文件系统进行消息持久化,确保消息不会丢失,并且支持磁盘预读、零拷贝等优化技术。
- 集群管理:RocketMQ支持多种集群模式,包括同步复制、异步复制、混合复制等,确保系统的高可用性和容错性。
3、架构设计
结构示例图:
RocketMQ的架构设计非常精巧,主要由以下几个组件组成:
(1)、NameServer
NameServer是RocketMQ的路由注册中心,负责管理和维护Broker的元数据信息(如IP地址、端口等)。生产者和消费者通过NameServer获取Broker的地址信息,从而建立连接。
特点:
- 无状态:NameServer是无状态的,多个NameServer实例可以独立运行,互不影响。
- 轻量级:NameServer的职责相对简单,只负责路由信息的管理和更新,不参与消息的存储和转发。
- 高可用:可以通过部署多个NameServer实例来实现高可用性,生产者和消费者会自动选择可用的NameServer。
(2)、Broker
Broker是RocketMQ的核心组件,负责消息的存储、转发和查询。每个Broker实例可以包含多个Topic,每个Topic又可以分为多个Queue(队列),用于实现消息的分区存储和负载均衡。
Broker类型:
- Master Broker:主Broker负责接收生产者的消息并将其持久化到磁盘,同时将消息分发给从Broker。
- Slave Broker:从Broker通过异步或同步的方式从主Broker复制消息,确保数据的高可用性和容错性。
特点:
- 高吞吐量:Broker使用高效的文件系统进行消息持久化,并通过内存映射、零拷贝等技术优化性能。
- 持久化存储:Broker将消息存储在磁盘上,确保消息不会丢失。支持磁盘预读、批量写入等优化技术。
- 负载均衡:Broker支持消息的分区存储和负载均衡,能够根据不同的策略将消息分配到不同的Queue中。
- 主从复制:Broker支持主从复制机制,确保数据的高可用性和容错性。可以通过配置同步复制或异步复制来平衡性能和可靠性。
(3)、Producer(生产者)
Producer负责将消息发送到Broker。生产者可以选择将消息发送到指定的Topic和Queue,或者让Broker自动选择合适的Queue。
特点:
- 负载均衡:Producer可以根据不同的策略选择合适的Broker和Queue,实现负载均衡。
- 事务消息:RocketMQ支持事务消息,确保消息的可靠传递。生产者可以在发送消息后执行本地事务,并根据事务结果决定是否提交或回滚消息。
- 批量发送:Producer支持批量发送消息,减少网络开销,提升性能。
- 消息压缩:Producer可以对消息进行压缩,减少网络传输的数据量,提升性能。
(4)、Consumer(消费者)
Consumer负责从Broker拉取消息并进行处理。消费者可以选择从指定的Topic和Queue中拉取消息,或者让Broker自动分配消息。
消费模式:
- 集群消费:多个消费者实例可以同时消费同一个Topic的消息,消息会被均匀分配给不同的消费者实例。适用于需要高吞吐量的场景。
- 广播消费:每个消费者实例都会收到Topic中的所有消息。适用于需要每个消费者都处理所有消息的场景。
特点:
- 负载均衡:Consumer可以根据不同的策略选择合适的Broker和Queue,实现负载均衡。
- 消息重试:如果消费者处理消息失败,RocketMQ会自动将消息重新放入队列中,供消费者再次处理。
- 消息顺序:RocketMQ支持消息的顺序传递,确保消息按照发送的顺序被消费。可以通过配置顺序消息队列来实现这一功能。
- 消息过滤:Consumer可以根据消息的标签(Tag)或其他属性进行过滤,只消费符合条件的消息。
(5)、Message(消息)
结构:
- Topic:消息的主题,用于区分不同类型的消息。一个Topic可以包含多个Queue。
- Queue:消息的队列,用于实现消息的分区存储和负载均衡。每个Queue对应一个物理文件,消息按顺序写入文件中。
- Tag:消息的标签,用于对消息进行分类和过滤。消费者可以根据Tag来选择消费哪些消息。
- Key:消息的唯一标识符,用于快速查找和定位消息。
- Body:消息的主体内容,通常是一个字节数组,可以包含任意格式的数据。
- 属性:消息的附加属性,用于存储一些额外的元数据信息,如消息的优先级、延迟时间等。
4、工作流程
(1)、启动NameServer:首先启动NameServer,NameServer作为路由注册中心,负责管理和维护Broker的元数据信息。
(2)、启动Broker:启动Master和Slave Broker,Broker会向NameServer注册自己的地址信息。生产者和消费者通过NameServer获取Broker的地址信息,建立连接。
(3)、生产者发送消息:
- 生产者根据Topic和Tag构建消息,并选择合适的Broker和Queue。
- 生产者将消息发送到Broker,Broker将消息持久化到磁盘,并返回确认信息。
(4)、消费者拉取消息: - 消费者根据Topic和Tag订阅消息,并选择合适的Broker和Queue。
- 消费者定期从Broker拉取消息,并进行处理。处理完成后,消费者向Broker发送确认信息,表示消息已被成功消费。
(5)、消息重试:如果消费者处理消息失败,RocketMQ会自动将消息重新放入队列中,供消费者再次处理。
(6)、消息顺序:对于顺序消息,RocketMQ会确保消息按照发送的顺序被消费。消费者可以配置顺序消息队列,确保消息的顺序性。
5、关键概念
- Topic:消息的主题,用于区分不同类型的消息。一个Topic可以包含多个Queue。
- Queue:消息的队列,用于实现消息的分区存储和负载均衡。每个Queue对应一个物理文件,消息按顺序写入文件中。
- Tag:消息的标签,用于对消息进行分类和过滤。消费者可以根据Tag来选择消费哪些消息。
- Group:消费者组,用于区分不同的消费者实例。同一Group内的消费者会共享消息,不同Group的消费者可以独立消费消息。类似kafka。
- Offset:消息的偏移量,表示消费者已经消费到的消息位置。每个消费者组都有一个独立的Offset,用于记录消费进度。
- Message Key:消息的唯一标识符,用于快速查找和定位消息。可以通过Message Key查询特定的消息。
- Transaction Message:事务消息,确保消息的可靠传递。生产者可以在发送消息后执行本地事务,并根据事务结果决定是否提交或回滚消息。
6、应用场景
- 电商促销:RocketMQ适合用于电商促销活动中的订单处理、库存更新等场景,能够应对高并发和大流量,其最初设计就是为了淘宝双十一活动准备的。
- 实时数据处理:适用于日志收集、监控数据传输等实时数据处理场景。
- 异步通信:可以用于系统之间的解耦和异步通信,避免阻塞主线程。
7、代码示例
生产者示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class RocketMQProducer {public static void main(String[] args) throws Exception {// 创建生产者实例DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876"); // 设置NameServer地址producer.start();// 发送消息for (int i = 0; i < 10; i++) {String messageBody = "Hello, RocketMQ! " + i;// 参数(topic,标签tag,消息体)Message msg = new Message("topic-test", "tag-a", messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf(" [x] Sent message: %s, result: %s%n", messageBody, sendResult);}// 关闭生产者producer.shutdown();}
}
消费者示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class RocketMQConsumer {public static void main(String[] args) throws Exception {// 创建消费者实例DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876"); // 设置NameServer地址consumer.subscribe("topic-test", "");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf(" [x] Received message: %s, content: %s%n",new String(msg.getBody()), msg.getMsgId());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.println(" [x] Consumer started.");}
}
乘风破浪会有时,直挂云帆济沧海!!!