RocketMQ高性能核心原理与源码架构剖析

文章目录

    • 一、源码环境搭建
      • 主要功能模块
      • 源码启动服务
        • 启动nameServer
        • 启动Broker
        • 发送消息
        • 消费消息
    • 二、源码热身阶段
      • NameServer的启动过程
        • 关注重点
        • 源码重点
      • Broker服务启动过程
        • 关注重点
        • 源码重点
      • Netty服务注册框架
        • 关注重点
        • 源码重点
        • 关于RocketMQ的同步结果推送与异步结果推送
      • Broker心跳注册管理
        • 关注重点
        • 源码重点
        • 极简化的服务注册发现流程
      • Producer发送消息过程
        • 关注重点
        • 源码重点
      • Consumer拉取消息过程
        • 关注重点
        • 源码重点
        • 广播模式与集群模式的Offset处理
        • Consumer与MessageQueue建立绑定关系
        • 顺序消费与并发消费
        • 实际拉取消息还是通过PullMessageService完成的。
      • 客户端负载均衡管理总结
        • Producer负载均衡
        • Consumer负载均衡

一、源码环境搭建

主要功能模块

​ RocketMQ的官方Git仓库地址:https://github.com/apache/rocketmq 可以用git把项目clone下来或者直接下载代码包。

​ 也可以到RocketMQ的官方网站上下载指定版本的源码: http://rocketmq.apache.org/dowloading/releases/

源码下很多的功能模块,很容易让人迷失方向,我们只关注下几个最为重要的模块:

broker: Broker 模块(broke 启动进程)
client :消息客户端,包含消息生产者、消息消费者相关类
example: RocketMQ 例代码
namesrv:NameServer模块
store:消息存储模块
remoting:远程访问模块

源码启动服务

​ 将源码导入IDEA后,需要先对源码进行编译。编译指令 clean install -Dmaven.test.skip=true
在这里插入图片描述
编译完成后就可以开始调试代码了。调试时需要按照以下步骤:

​ 调试时,先在项目目录下创建一个conf目录,并从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
在这里插入图片描述

启动nameServer

展开namesrv模块,运行NamesrvStartup类即可启动NameServer
在这里插入图片描述
启动时,会报错,提示需要配置一个ROCKETMQ_HOME环境变量。这个环境变量我们可以在机器上配置,跟配置JAVA_HOME环境变量一样。也可以在IDEA的运行环境中配置。目录指向源码目录即可。
在这里插入图片描述
在这里插入图片描述
配置完成后,再次执行,看到以下日志内容,表示NameServer启动成功

The Name Server boot success. serializeType=JSON

启动Broker

启动Broker之前,我们需要先修改之前复制的broker.conf文件

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH# 自动创建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存储路径
storePathRootDir=E:\\RocketMQ\\data\\rocketmq\\dataDir
# commitLog路径
storePathCommitLog=E:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
# 消息队列存储路径
storePathConsumeQueue=E:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\RocketMQ\\data\\rocketmq\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=E:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
# abort文件存储路径
abortFile=E:\\RocketMQ\\data\\rocketmq\\dataDir\\abort

然后Broker的启动类是broker模块下的BrokerStartup。

启动Broker时,同样需要ROCETMQ_HOME环境变量,并且还需要配置一个-c 参数,指向broker.conf配置文件
在这里插入图片描述
然后重新启动,即可启动Broker。

发送消息

在源码的example模块下,提供了非常详细的测试代码。例如我们启动example模块下的org.apache.rocketmq.example.quickstart.Producer类即可发送消息。

public class Producer {/*** The number of produced messages.*/public static final int MESSAGE_COUNT = 1000;public static final String PRODUCER_GROUP = "please_rename_unique_group_name";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TopicTest";public static final String TAG = "TagA";public static void main(String[] args) throws MQClientException, InterruptedException {/** Instantiate with a producer group name.*/DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);/** Specify name server addresses.** Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR* <pre>* {@code*  producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");* }* </pre>*/// Uncomment the following line while debugging, namesrvAddr should be set to your local address
//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);/** Launch the instance.*/producer.start();for (int i = 0; i < MESSAGE_COUNT; i++) {try {/** Create a message instance, specifying topic, tag and message body.*/Message msg = new Message(TOPIC /* Topic */,TAG /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);/** Call send message to deliver message to one of brokers.*/SendResult sendResult = producer.send(msg);/** There are different ways to send message, if you don't care about the send result,you can use this way* {@code* producer.sendOneway(msg);* }*//** if you want to get the send result in a synchronize way, you can use this send method* {@code* SendResult sendResult = producer.send(msg);* System.out.printf("%s%n", sendResult);* }*//** if you want to get the send result in a asynchronize way, you can use this send method* {@code**  producer.send(msg, new SendCallback() {*  @Override*  public void onSuccess(SendResult sendResult) {*      // do something*  }**  @Override*  public void onException(Throwable e) {*      // do something*  }*});**}*/System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}/** Shut down once the producer instance is not longer in use.*/producer.shutdown();}
}

但是在测试源码中,需要指定NameServer地址。这个NameServer地址有两种指定方式,一种是配置一个NAMESRV_ADDR的环境变量。另一种是在源码中指定。我们可以在源码中加一行代码指定NameServer

producer.setNamesrvAddr(“127.0.0.1:9876”);

然后就可以发送消息了。

消费消息
public class Consumer {public static final String CONSUMER_GROUP = "please_rename_unique_group_name_4";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TopicTest";public static void main(String[] args) throws InterruptedException, MQClientException {/** Instantiate with specified consumer group name.*/DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);/** Specify name server addresses.* <p/>** Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR* <pre>* {@code* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");* }* </pre>*/// Uncomment the following line while debugging, namesrvAddr should be set to your local address
//        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);/** Specify where to start in case the specific consumer group is a brand-new one.*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);/** Subscribe one more topic to consume.*/consumer.subscribe(TOPIC, "*");/**  Register callback to execute on arrival of messages fetched from brokers.*/consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});/**  Launch the consumer instance.*/consumer.start();System.out.printf("Consumer Started.%n");}
}

我们可以使用同一模块下的org.apache.rocketmq.example.quickstart.Consumer类来消费消息。运行时同样需要指定NameServer地址

consumer.setNamesrvAddr(“192.168.232.128:9876”);

这样整个调试环境就搭建好了。

二、源码热身阶段

NameServer的启动过程

关注重点

在RocketMQ集群中,实际记性消息存储、推送等核心功能点是Broker。而NameServer的作用,其实和微服务中的注册中心非常类似,他只是提供了Broker端的服务注册与发现功能。

源码重点

​== NameServer的启动入口类是org.apache.rocketmq.namesrv.NamesrvStartup。其中的核心是构建并启动一个NamesrvController==。这个Cotroller对象就跟MVC中的Controller是很类似的,都是响应客户端的请求。只不过,他响应的是基于Netty的客户端请求。

​ 另外,他的实际启动过程,其实可以配合NameServer的启动脚本进行更深入的理解。

public static void main(String[] args) {main0(args);}public static NamesrvController main0(String[] args) {//K1 NameServer的核心组件,类似于Web应用中的Controller,负责接收处理网络请求。try {NamesrvController controller = createNamesrvController(args);start(controller);String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;}
 public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//设定版本//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {System.exit(-1);return null;}//K2 NameServer的两个核心配置final NamesrvConfig namesrvConfig = new NamesrvConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();nettyServerConfig.setListenPort(9876);//默认直接指定9876端口//命令行参数 -c 和 -p 解析。// 怎么调整默认端口?//NamesrvConfig里configStorePath,默认路径System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";//按照这个目录建立的文件,可在里面做配置 比如调整默认端口if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}// - p 查看配置信息if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);//ROCKETMQ_HOME环境变量监测if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);return controller;}
  public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}//初始化,主要是几个定时任务boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}//服务关闭钩子,在服务正常关闭时执行。 去释放一些进程资源,但是如果是非正常关闭就不会触发钩子Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controller.shutdown();return null;}));//启动服务controller.start();return controller;}
 public boolean initialize() {this.kvConfigManager.load();//创建NettyServer网络处理对象,做RPC请求的this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);//Netty服务器的工作线程池this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));//K1 NameServer注册Processor到RemotingServer中。this.registerProcessor();this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);//TLS是一个安全传输层协议,相关参数只能用JVM指令注入if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");}}return true;}
//启动服务public void start() throws Exception {//启动了一个Netty服务端this.remotingServer.start();if (this.fileWatchService != null) {this.fileWatchService.start();}}

​ 从NameServer启动和关闭这两个关键步骤,我们可以总结出NameServer的组件其实并不是很多,整个NameServer的结构是这样的;
在这里插入图片描述

这两个配置类就可以用来指导如何优化Nameserver的配置。比如,如何调整nameserver的端口?

从这里也能看出, RocketMQ的整体源码风格就是典型的MVC思想。Controller响应请求,Service处理业务,各种Table保存消息。
然后可以看出nameServer只接收请求,而不对外发送请求(只有server,没有client),从而侧面验证了nameserver有别于zk的注册中心,不保证数据的一致性,它们之间不存在数据交互

Broker服务启动过程

关注重点

Broker是整个RocketMQ的业务核心。所有消息存储、转发这些重要的业务都是Broker进行处理。
​ 这里重点梳理Broker有哪些内部服务。这些内部服务将是整理Broker核心业务流程的起点。

源码重点

Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试。

   public static void main(String[] args) {start(createBrokerController(args));}

启动过程关键点:重点也是围绕一个BrokerController对象,先创建,然后再启动。

 public static BrokerController createBrokerController(String[] args) {//获取版本System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));try {//PackageConflictDetect.detectFastjson();封装参数Options options = ServerUtil.buildCommandlineOptions(new Options());commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),new PosixParser());if (null == commandLine) {System.exit(-1);}//K2 Broker的核心配置final BrokerConfig brokerConfig = new BrokerConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();final NettyClientConfig nettyClientConfig = new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));nettyServerConfig.setListenPort(10911);//默认10911端口//K2 存储相关的配置信息final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();//SLAVE使用的消息常驻内存比例比Master低10%if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {configFile = file;InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);properties2SystemEnv(properties);MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);if (null == brokerConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}String namesrvAddr = brokerConfig.getNamesrvAddr();if (null != namesrvAddr) {try {String[] addrArray = namesrvAddr.split(";");for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",namesrvAddr);System.exit(-3);}}//通过brokerId判断主从switch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:brokerConfig.setBrokerId(MixAll.MASTER_ID);//MASTER_ID=0Lbreak;case SLAVE:if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}//Dledger集群的所有Broker节点ID都是-1if (messageStoreConfig.isEnableDLegerCommitLog()) {brokerConfig.setBrokerId(-1);}messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();System.setProperty("brokerLogDir", "");if (brokerConfig.isIsolateLogEnable()) {System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());}if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());}configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} else if (commandLine.hasOption('m')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);//K1 创建核心的BrokerController 在broker.conf里可以配置哪些属性?final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);//K1 初始化BrokerController。注意从中理解Broekr的组件结构boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}//优雅关闭,释放资源Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}}, "ShutdownHook"));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;}

首先: 在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置:

BrokerConfig : Broker服务配置
MessageStoreConfig : 消息存储配置。 这两个配置参数都可以在broker.conf文件中进行配置
NettyServerConfig :Netty服务端占用了10911端口。同样也可以在配置文件中覆盖。
NettyClientConfig : Broker既要作为Netty服务端,向客户端提供核心业务能力,又要作为Netty客户端,向NameServer注册心跳。
这些配置是我们了解如何优化 RocketMQ 使用的关键。

   public static BrokerController start(BrokerController controller) {try {//K1 BrokerController启动controller.start();String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();if (null != controller.getBrokerConfig().getNamesrvAddr()) {tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();}log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;}
 //BrokerController核心的启动方法public void start() throws Exception {//存储组件,这里启动服务主要是为了将CommitLog的写入事件分发给ComsumeQueue和IndexFileif (this.messageStore != null) {this.messageStore.start();}//Broker中启动了两个Netty服务if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}if (this.fileWatchService != null) {this.fileWatchService.start();}//Broker的brokerOuterAPI可以理解为一个Netty客户端,往外发请求的组件。例如发送心跳if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}//长轮询请求暂存服务if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}if (this.filterServerManager != null) {this.filterServerManager.start();}if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}//K2 Broker核心的心跳注册任务,需要深入解读下。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {//同时向所有NameServer发起心跳注册BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}

然后: 在BrokerController.start方法可以看到启动了一大堆Broker的核心服务,我们挑一些重要的

this.messageStore.start();//启动核心的消息存储组件
this.remotingServer.start();
this.fastRemotingServer.start(); //启动两个Netty服务
this.brokerOuterAPI.start();//启动客户端,往外发请求
BrokerController.this.registerBrokerAll: //向NameServer注册心跳。
this.brokerStatsManager.start();
this.brokerFastFailure.start();//这也是一些负责具体业务的功能组件

我们现在不需要了解这些核心组件的具体功能,只要有个大概,Broker中有一大堆的功能组件负责具体的业务。后面等到分析具体业务时再去深入每个服务的细节。

我们需要抽象出Broker的一个整体结构:
在这里插入图片描述
可以看到Broker启动了两个Netty服务,他们的功能基本差不多。实际上,在应用中,可以通过producer.setSendMessageWithVIPChannel(true),让少量比较重要的producer走VIP的通道。而在消费者端,也可以通过consumer.setVipChannelEnabled(true),让消费者支持VIP通道的数据。

Netty服务注册框架

关注重点

网络通信服务是构建分布式应用的基础,也是我们去理解RocketMQ底层业务的基础。这里就重点梳理RocketMQ的这个服务注册框架,理解各个业务进程之间是如何进行RPC远程通信的。

​ Netty的所有远程通信功能都由remoting模块实现。RemotingServer模块里包含了RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中,涉及到的远程服务非常多,在RocketMQ中,NameServer主要是RPC的服务端RemotingServer,Broker对于客户端来说,是RPC的服务端RemotingServer,而对于NameServer来说,又是RPC的客户端。各种Client是RPC的客户端RemotingClient

​ 需要理解的是,RocketMQ基于Netty保持客户端与服务端的长连接Channel。只要Channel是稳定的,那么即可以从客户端发请求到服务端,同样服务端也可以发请求到客户端。例如在事务消息场景中,就需要Broker多次主动向Producer发送请求确认事务的状态。所以,RemotingServer和RemotingClient都需要注册自己的服务。

源码重点

​ 1、哪些组件需要Netty服务端?哪些组件需要Netty客户端?

比较好理解的,NameServer需要NettyServer。客户端,Producer和Consuer,需要NettyClient。Broker需要NettyServer响应客户端请求,需要NettyClient向NameServer注册心跳。但是有个问题, 事务消息的Producer也需要响应Broker的事务状态回查,他需要NettyServer吗?

NameServer不需要NettyClient,这也验证了之前介绍的NameServer之间不需要进行数据同步的说法。

NettyRemotingServer的构造方法

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});if (useEpoll()) {//针对linux系统,减少GC请求this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}loadSslContext();}

启动方法

@Overridepublic void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,//序列化操作,将请求参数解析为二进制new NettyDecoder(),//将二进制转成RemotingCommand对象//处理心跳,表示120s没有读写请求,就会断掉new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,//处理连接serverHandler//处理服务);}});if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}

2、所有的RPC请求数据都封账成RemotingCommand对象。而每个处理消息的服务逻辑,都会封装成一个NettyRequestProcessor对象。

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);//所有的请求参数都封装在RemotingCommand对象中,转成二进制,再转发出去@Overridepublic void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)throws Exception {try {remotingCommand.fastEncodeHeader(out);byte[] body = remotingCommand.getBody();if (body != null) {out.writeBytes(body);}} catch (Exception e) {log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);if (remotingCommand != null) {log.error(remotingCommand.toString());}RemotingUtil.closeChannel(ctx.channel());}}
}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());doBeforeRpcHooks(remoteAddr, cmd);//服务端处理请求结束的回调final RemotingResponseCallback callback = new RemotingResponseCallback() {@Overridepublic void callback(RemotingCommand response) {doAfterRpcHooks(remoteAddr, cmd, response);if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());try {ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {NettyRequestProcessor processor = pair.getObject1();RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}}

​ 3、服务端和客户端都维护一个processorTable,这是个HashMap。key是服务码requestCode,value是对应的运行单元 Pair<NettyRequestProcessor,ExecutorService>类型,包含了处理Processor和执行线程的线程池。具体的Processor,由业务系统自行注册。Broker服务注册见,BrokerController.registerProcessor(),客户端的服务注册见MQClientAPIImpl。NameServer则会注册一个大的DefaultRequestProcessor,统一处理所有服务。

 //Broker注册处理器public void registerProcessor() {/*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** ReplyMessageProcessor*/ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);replyMessageProcessor.registerSendMessageHook(sendMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);/*** QueryMessageProcessor*/NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*/ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*/this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);/*** Default*/AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);}
//nameServer注册处理器private void registerProcessor() {if (namesrvConfig.isClusterTest()) {//测试集群this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),this.remotingExecutor);} else {//业务比较简单,所有功能整合到一个Processor里。this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);}}
 public DefaultRequestProcessor(NamesrvController namesrvController) {this.namesrvController = namesrvController;}//K2 NameServer处理请求的核心代码@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}//通过响应码处理不同业务switch (request.getCode()) {case RequestCode.PUT_KV_CONFIG:return this.putKVConfig(ctx, request);case RequestCode.GET_KV_CONFIG:return this.getKVConfig(ctx, request);case RequestCode.DELETE_KV_CONFIG:return this.deleteKVConfig(ctx, request);case RequestCode.QUERY_DATA_VERSION:return queryBrokerTopicConfig(ctx, request);case RequestCode.REGISTER_BROKER:Version brokerVersion = MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {return this.registerBrokerWithFilterServer(ctx, request);} else {return this.registerBroker(ctx, request);}case RequestCode.UNREGISTER_BROKER:return this.unregisterBroker(ctx, request);case RequestCode.GET_ROUTEINFO_BY_TOPIC:return this.getRouteInfoByTopic(ctx, request);case RequestCode.GET_BROKER_CLUSTER_INFO:return this.getBrokerClusterInfo(ctx, request);case RequestCode.WIPE_WRITE_PERM_OF_BROKER:return this.wipeWritePermOfBroker(ctx, request);case RequestCode.ADD_WRITE_PERM_OF_BROKER:return this.addWritePermOfBroker(ctx, request);case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:return getAllTopicListFromNameserver(ctx, request);case RequestCode.DELETE_TOPIC_IN_NAMESRV:return deleteTopicInNamesrv(ctx, request);case RequestCode.GET_KVLIST_BY_NAMESPACE:return this.getKVListByNamespace(ctx, request);case RequestCode.GET_TOPICS_BY_CLUSTER:return this.getTopicsByCluster(ctx, request);case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:return this.getSystemTopicListFromNs(ctx, request);case RequestCode.GET_UNIT_TOPIC_LIST:return this.getUnitTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:return this.getHasUnitSubTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:return this.getHasUnitSubUnUnitTopicList(ctx, request);case RequestCode.UPDATE_NAMESRV_CONFIG:return this.updateConfig(ctx, request);case RequestCode.GET_NAMESRV_CONFIG:return this.getConfig(ctx, request);default:break;}return null;}

​ 4、请求类型分为REQUESTRESPONSE。这是为了支持异步的RPC调用。NettyServer处理完请求后,可以先缓存到responseTable中,等NettyClient下次来获取,这样就不用阻塞Channel了,可以提升请求吞吐量

 class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {processMessageReceived(ctx, msg);}}
 public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;//远端发过来的请求if (cmd != null) {switch (cmd.getType()) {case REQUEST_COMMAND://处理同步方式processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND://处理异步方式processResponseCommand(ctx, cmd);break;default:break;}}}

​ 5、重点理解remoting包中是如何实现全流程异步化。

整体RPC框架流程如下图:
在这里插入图片描述
​ RocketMQ使用Netty框架提供了一套基于服务码的服务注册机制,让各种不同的组件都可以按照自己的需求,注册自己的服务方法。RocketMQ的这一套服务注册机制,是非常简洁使用的。在使用Netty进行其他相关应用开发时,都可以借鉴他的这一套服务注册机制。例如要开发一个大型的IM项目,要加减好友、发送文本,图片,甚至红包、维护群聊信息等等各种各样的请求,这些请求如何封装,就可以很好的参考这个框架。

关于RocketMQ的同步结果推送与异步结果推送

RocketMQ的RemotingServer服务端,会维护一个responseTable,这是一个线程同步的Map结构。 key为请求的ID,value是异步的消息结果。ConcurrentMap<Integer /* opaque */, ResponseFuture> 。

​ 处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时,处理的结果会存入responseTable,通过ResponseFuture提供一定的服务端异步处理支持,提升服务端的吞吐量。 请求返回后,立即从responseTable中移除请求记录。

​ 实际上,同步也是通过异步实现的。

//org.apache.rocketmq.remoting.netty.ResponseFuture//发送消息后,通过countDownLatch阻塞当前线程,造成同步等待的效果。public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);return this.responseCommand;}//等待异步获取到消息后,再通过countDownLatch释放当前线程。public void putResponse(final RemotingCommand responseCommand) {this.responseCommand = responseCommand;this.countDownLatch.countDown();}

处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时,处理的结果依然会存入responsTable,等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture,也就是在客户端请求结果时再去获取真正的结果。 另外,在RemotingServer启动时,会启动一个定时的线程任务,不断扫描responseTable,将其中过期的response清除掉。

//org.apache.rocketmq.remoting.netty.NettyRemotingServer
this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);

Broker心跳注册管理

关注重点

Broker会在启动时向所有NameServer注册自己的服务信息,并且会定时往NameServer发送心跳信息。而NameServer会维护Broker的路由列表,并对路由表进行实时更新。这一轮就重点梳理这个过程。

源码重点
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),this.brokerConfig.getBrokerPermission());topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}//这里才是比较关键的地方。先判断是否需要注册,然后调用doRegisterBrokerAll方法真正去注册。if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills())) {doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}}
  //K2 Broker注册最核心的部分private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {//通过这个API往外去发请求List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.getHAServerAddr(),topicConfigWrapper,this.filterServerManager.buildNewFilterServerList(),oneway,this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isCompressedRegister());if (registerBrokerResultList.size() > 0) {RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);if (registerBrokerResult != null) {//注册完保存主从节点的地址if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());}this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());if (checkOrderConfig) {this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());}}}}
public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean compressed) {final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();//通过客户端拿到nameServer的地址List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);requestHeader.setHaServerAddr(haServerAddr);requestHeader.setCompressed(compressed);RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);//利用CountDownLatch 多个nameServer一起注册完成,利用异步线程做一个同步请求final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(() -> {try {RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);if (result != null) {registerBrokerResultList.add(result);}log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {countDownLatch.countDown();}});}try {countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}return registerBrokerResultList;}
private RegisterBrokerResult registerBroker(final String namesrvAddr,final boolean oneway,final int timeoutMills,final RegisterBrokerRequestHeader requestHeader,final byte[] body) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,InterruptedException {//将注册服务码和头信息封装成RemotingCommand 对象RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);request.setBody(body);if (oneway) {try {//通过remotingClient去注册this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);} catch (RemotingTooMuchRequestException e) {// Ignore}return null;}RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);RegisterBrokerResult result = new RegisterBrokerResult();result.setMasterAddr(responseHeader.getMasterAddr());result.setHaServerAddr(responseHeader.getHaServerAddr());if (response.getBody() != null) {result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));}return result;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());}

Broker启动后会立即发起向NameServer注册心跳。方法入口:BrokerController.this.registerBrokerAll。 然后启动一个定时任务,以10秒延迟,默认30秒的间隔持续向NameServer发送心跳。
在这里插入图片描述

 //K2 NameServer 实际处理Broker注册的地方public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();final RegisterBrokerRequestHeader requestHeader =(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);if (!checksum(ctx, request, requestHeader)) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("crc32 not match");return response;}RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();if (request.getBody() != null) {try {registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());} catch (Exception e) {throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);}} else {registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);}//routeInfoManager就是管理路由信息的核心组件。RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(requestHeader.getClusterName(),requestHeader.getBrokerAddr(),requestHeader.getBrokerName(),requestHeader.getBrokerId(),requestHeader.getHaServerAddr(),registerBrokerBody.getTopicConfigSerializeWrapper(),registerBrokerBody.getFilterServerList(),ctx.channel());responseHeader.setHaServerAddr(result.getHaServerAddr());responseHeader.setMasterAddr(result.getMasterAddr());byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);response.setBody(jsonValue);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}
  //注册Brokerpublic RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {//加锁,同一时间只能一个线程写this.lock.writeLock().lockInterruptibly();Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());brokerNames.add(brokerName);boolean registerFirst = false;//brokerAddrTable:核心路由信息表//nameServer端维护了Broker信息,需要给客户端推送,生产者拿到这个列表然后选一个broker去发消息,//同理消费者也需要拿到这个列表然后选一个broker去拉BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTableIterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> item = it.next();if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {log.debug("remove entry {} from brokerData", item);it.remove();}}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);if (MixAll.MASTER_ID == brokerId) {log.info("cluster [{}] brokerName [{}] master address change from {} to {}",brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);}registerFirst = registerFirst || (null == oldAddr);if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//主要封装与客户端的channelBrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;}

​ NameServer内部会通过RouteInfoManager组件及时维护Broker信息。同时在NameServer启动时,会启动定时任务,扫描不活动的Broker。方法入口:NamesrvController.initialize方法。
在这里插入图片描述

  public int scanNotActiveBroker() {int removeCount = 0;Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);this.onChannelDestroy(next.getKey(), next.getValue().getChannel());removeCount++;}}return removeCount;}

在这里插入图片描述

极简化的服务注册发现流程

为什么RocketMQ要自己实现一个NameServer,而不用Zookeeper、Nacos这样现成的注册中心?

​ 首先,依赖外部组件会对产品的独立性形成侵入,不利于自己的版本演进。Kafka要抛弃Zookeeper就是一个先例。

​ 另外,其实更重要的还是对业务的合理设计。NameServer之间不进行信息同步,而是依赖Broker端向所有NameServer同时发起注册。这让NameServer的服务可以非常轻量。如果可能,你可以与Nacos或Zookeeper的核心流程做下对比。

​ 但是,要知道,这种极简的设计,其实是以牺牲数据一致性为代价的。Broker往多个NameServer同时发起注册,有可能部分NameServer注册成功,而部分NameServer注册失败了。这样,多个NameServer之间的数据是不一致的。作为注册中心,这是不可接受的。但是对于RocketMQ,这又变得可以接受了。因为客户端从NameServer上获得的,只要有一个正常运行的Broker就可以了,并不需要完整的Broker列表

Producer发送消息过程

关注重点

首先:回顾下我们之前的Producer使用案例。

Producer有两种:

一种是普通发送者:DefaultMQProducer。只负责发送消息,发送完消息,就可以停止了。
另一种是事务消息发送者: TransactionMQProducer。支持事务消息机制。需要在事务消息过程中提供事务状态确认的服务,这就要求事务消息发送者虽然是一个客户端,但是也要完成整个事务消息的确认机制后才能退出。
​ 事务消息机制后面将结合Broker进行整理分析。这一步暂不关注。我们只关注DefaultMQProducer的消息发送过程。

然后:整个Producer的使用流程,大致分为两个步骤:一是调用start方法,进行一大堆的准备工作。 二是各种send方法,进行消息发送。

​ 那我们重点关注以下几个问题:

1、Producer启动过程中启动了哪些服务

2、Producer如何管理broker路由信息。 可以设想一下,如果Producer启动了之后,NameServer挂了,那么Producer还能不能发送消息?希望你先从源码中进行猜想,然后自己设计实验进行验证。

3、关于Producer的负载均衡。也就是Producer到底将消息发到哪个MessageQueue中。这里可以结合顺序消息机制来理解一下。消息中那个莫名奇妙的MessageSelector到底是如何工作的。

源码重点

1、Producer的核心启动流程

​ 所有Producer的启动过程,最终都会调用到DefaultMQProducerImpl#start方法。在start方法中的通过一个mQClientFactory对象,启动生产者的一大堆重要服务。

​ 这里其实就是一种设计模式,虽然有很多种不同的客户端,但是这些客户端的启动流程最终都是统一的,全是交由mQClientFactory对象来启动。而不同之处在于这些客户端在启动过程中,按照服务端的要求注册不同的信息。例如生产者注册到producerTable,消费者注册到consumerTable,管理控制端注册到adminExtTable
在这里插入图片描述
2、发送消息的核心流程

​ 核心流程如下:
在这里插入图片描述

​ 1、发送消息时,会维护一个本地的topicPublishInfoTable缓存,DefaultMQProducer会尽量保证这个缓存数据是最新的。但是,如果NameServer挂了,那么DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker,还是可以正常发送消息到Broker的。 --可以在生产者示例中,start后打一个断点,然后把NameServer停掉,这时,Producer还是可以发送消息的。

​ 2、生产者如何找MessageQueue: 默认情况下,生产者是按照轮训的方式,依次轮训各个MessageQueue。但是如果某一次往一个Broker发送请求失败后,下一次就会跳过这个Broker。

 //org.apache.rocketmq.client.impl.producer.TopicPublishInfo//如果进到这里lastBrokerName不为空,那么表示上一次向这个Broker发送消息是失败的,这时就尽量不要再往这个Broker发送消息了。public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}}

​ 3、如果在发送消息时传了Selector,那么Producer就不会走这个负载均衡的逻辑,而是会使用Selector去寻找一个队列。 具体参见org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl 方法。

Consumer拉取消息过程

关注重点

结合我们之前的示例,回顾下消费者这一块的几个重点问题:

消费者也是有两种,推模式消费者和拉模式消费者。优秀的MQ产品都会有一个高级的目标,就是要提升整个消息处理的性能。而要提升性能,服务端的优化手段往往不够直接,最为直接的优化手段就是对消费者进行优化。所以在RocketMQ中,整个消费者的业务逻辑是非常复杂的,甚至某种程度上来说,比服务端更复杂,所以,在这里我们重点关注用得最多的推模式的消费者。

消费者组之间有集群模式和广播模式两种消费模式。我们就要了解下这两种集群模式是如何做的逻辑封装。

然后我们关注下消费者端的负载均衡的原理。即消费者是如何绑定消费队列的,哪些消费策略到底是如何落地的。

最后我们来关注下在推模式的消费者中,MessageListenerConcurrently 和MessageListenerOrderly这两种消息监听器的处理逻辑到底有什么不同,为什么后者能保持消息顺序。

源码重点

Consumer的核心启动过程和Producer是一样的, 最终都是通过mQClientFactory对象启动。不过之间添加了一些注册信息。整体的启动过程如下:
在这里插入图片描述

广播模式与集群模式的Offset处理

在DefaultMQPushConsumerImpl的start方法中,启动了非常多的核心服务。 比如,对于广播模式与集群模式的Offset处理

 if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();

​ 可以看到,广播模式是使用LocalFileOffsetStore,在Consumer本地保存Offset,而集群模式是使用RemoteBrokerOffsetStore,在Broker端远程保存offset。而这两种Offset的存储方式,最终都是通过维护本地的offsetTable缓存来管理Offset。

Consumer与MessageQueue建立绑定关系

​ start方法中还一个比较重要的东西是给rebalanceImpl设定了一个AllocateMessageQueueStrategy,用来给Consumer分配MessageQueue的。

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//Consumer负载均衡策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

​ 这个AllocateMessageQueueStrategy就是用来给Consumer和MessageQueue之间建立一种对应关系的。也就是说,只要Topic当中的MessageQueue以及同一个ConsumerGroup中的Consumer实例都没有变动,那么某一个Consumer实例只是消费固定的一个或多个MessageQueue上的消息,其他Consumer不会来抢这个Consumer对应的MessageQueue。

​ 关于负载均衡机制,会在后面结合Producer的发送消息策略一起总结。不过这里,你可以想一下为什么要让一个MessageQueue只能由同一个ConsumerGroup中的一个Consumer实例来消费。

​ 其实原因很简单,因为Broker需要按照ConsumerGroup管理每个MessageQueue上的Offset,如果一个MessageQueue上有多个同属一个ConsumerGroup的Consumer实例,他们的处理进度就会不一样。这样的话,Offset就乱套了。

顺序消费与并发消费

​ 同样在start方法中,启动了consumerMessageService线程,进行消息拉取。

 //Consumer中自行指定的回调函数。if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}

​ 可以看到, Consumer通过registerMessageListener方法指定的回调函数,都被封装成了ConsumerMessageService的子实现类。

​ 而对于这两个服务实现类的调用,会延续到DefaultMQPushConsumerImpl的pullCallback对象中。也就是Consumer每拉过来一批消息后,就向Broker提交下一个拉取消息的的请求。

这里也可以印证一个点,就是顺序消息,只对异步消费也就是推模式有效。同步消费的拉模式是无法进行顺序消费的。因为这个pullCallback对象,在拉模式的同步消费时,根本就没有往下传。
当然,这并不是说拉模式不能锁定队列进行顺序消费,拉模式在Consumer端应用就可以指定从哪个队列上拿消息。

 PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {//...switch (pullResult.getPullStatus()) {case FOUND://...DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);//... break;//...}}}

​ 而这里提交的,实际上是一个ConsumeRequest线程。而提交的这个ConsumeRequest线程,在两个不同的ConsumerService中有不同的实现。

​ 这其中,两者最为核心的区别在于ConsumerMessageOrderlyService是锁定了一个队列,处理完了之后,再消费下一个队列。

 public void run() {// ....final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);synchronized (objLock) {//....}}

​ 为什么给队列加个锁,就能保证顺序消费呢?结合顺序消息的实现机制理解一下。
在这里插入图片描述

​ 从源码中可以看到,Consumer提交请求时,都是往线程池里异步提交的请求。如果不加队列锁,那么就算Consumer提交针对同一个MessageQueue的拉取消息请求,这些请求都是异步执行,他们的返回顺序是乱的,无法进行控制。给队列加个锁之后,就保证了针对同一个队列的第二个请求,必须等第一个请求处理完了之后,释放了锁,才可以提交。这也是在异步情况下保证顺序的基础思路。

实际拉取消息还是通过PullMessageService完成的。

​ start方法中,相当于对很多消费者的服务进行初始化,包括指定一些服务的实现类,以及启动一些定时的任务线程,比如清理过期的请求缓存等。最后,会随着mQClientFactory组件的启动,启动一个PullMessageService。实际的消息拉取都交由PullMesasgeService进行。

​ 所谓消息推模式,其实还是通过Consumer拉消息实现的。

 //org.apache.rocketmq.client.impl.consumer.PullMessageServiceprivate void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}

客户端负载均衡管理总结

​ 从之前Producer发送消息的过程以及Conmer拉取消息的过程,我们可以抽象出RocketMQ中一个消息分配的管理模型。这个模型是我们在使用RocketMQ时,很重要的进行性能优化的依据。

Producer负载均衡

​ Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。

​ 在之前源码中看到过,Producer轮训时,如果发现往某一个Broker上发送消息失败了,那么下一次会尽量避免再往同一个Broker上发送消息。但是,如果你的应用场景允许发送消息长延迟,也可以给Producer设定setSendLatencyFaultEnable(true)。这样对于某些Broker集群的网络不是很好的环境,可以提高消息发送成功的几率。
在这里插入图片描述
​ 同时生产者在发送消息时,可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。

Consumer负载均衡

Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。

1、集群模式

​ 在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。

​ 而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。

​ 每次分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配。内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类,可以在consumer中直接set来指定。默认情况下使用的是最简单的平均分配策略。

AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。
​ 这个策略可以通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对同机房的Broker和Consumer进行分配。一般也就用简单的平均分配策略或者轮询分配策略。

感觉这东西挺鸡肋的,直接给个属性指定机房不是挺好的吗。
源码中有测试代码AllocateMachineRoomNearByTest。

​ 在示例中:Broker的机房指定方式: messageQueue.getBrokerName().split(“-”)[0],而Consumer的机房指定方式:clientID.split(“-”)[0]

​ clinetID的构建方式:见ClientConfig.buildMQClientId方法。按他的测试代码应该是要把clientIP指定为IDC1-CID-0这样的形式。

AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者

AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。

AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。

AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。

AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在换上分布更为均匀。

最常用的就是平均分配和轮训分配了。例如平均分配时的分配情况是这样的:
在这里插入图片描述
​ 而轮训分配就不计算了,每次把一个队列分给下一个Consumer实例。

2、广播模式

​ 广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。

​ 广播模式实现的关键是将消费者的消费偏移量不再保存到broker当中,而是保存到客户端当中,由客户端自行维护自己的消费偏移量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/161560.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

堆/二叉堆详解[C/C++]

前言 堆是计算机科学中-类特殊的数据结构的统称。实现有很多,例如:大顶堆,小顶堆&#xff0c;斐波那契堆&#xff0c;左偏堆&#xff0c;斜堆等等。从子结点个数上可以分为二汊堆&#xff0c;N叉堆等等。本文将介绍的是二叉堆。 二叉堆的概念 1、引例 我们小时候&#xff0c;基…

04 MIT线性代数-矩阵的LU分解 Factorization into A=LU

目的: 从矩阵的角度理解高斯消元法, 完成LU分解得到ALU 1.矩阵乘积的逆矩阵 Inverse of a product 2.矩阵乘积的转置 Transpose of a product 3.转置矩阵的逆矩阵 Inverse of a transpose 4.矩阵的LU分解 U为上三角阵(Upper triangular matrix), L为下三角阵(Lower triangular…

【C++ 学习 ㉘】- 详解 C++11 的列表初始化

目录 一、C11 简介 二、列表初始化 2.1 - 统一初始化 2.2 - 列表初始化的使用细节 2.2.1 - 聚合类型的定义 2.2.2 - 注意事项 2.3 - initializer_list 2.3.1 - 基本使用 2.3.2 - 源码剖析 一、C11 简介 1998 年&#xff0c;C 标准委员会发布了第一版 C 标准&#xff0…

大数据Hadoop之——部署hadoop+hive+Mysql环境(window11)

一、安装JDK8 【温馨提示】对应后面安装的hadoop和hive版本&#xff0c;这里使用jdk8&#xff0c;这里不要用其他jdk了&#xff0c;可能会出现一些其他问题。 1&#xff09;JDK下载地址 Java Downloads | Oracle 按正常下载是需要先登录的&#xff0c;这里提供一个不用登录下载…

VMware虚拟机安装Linux系统的介绍

许多新手连 Windows 的安装都不太熟悉&#xff0c;更别提 Linux 的安装了&#xff1b;即使安装成功了&#xff0c;也有可能破坏现有的 Windows 系统&#xff0c;比如导致硬盘数据丢失、Windows 无法开机等。所以一直以来&#xff0c;安装 Linux 系统都是初学者的噩梦。 然而&a…

Zookeeper【Curator客户端Java版】从0到1——万字学习笔记

目录 初识Zookeeper Zookeeper作用 维护配置信息 分布式锁服务 集群管理 生产分布式唯一ID Zookeeper的设计目标 Zookeeper 工作机制 数据模型 ZooKeeper 命令操作 服务端常用命令 客户端常用命令 ZooKeeper JavaAPI操作 Curator 介绍 Curator API 常用操作 导入依赖 建立连接 …

PTE-精听学习(一)

目录 SST SST每一题都是单独计时 MMA 切换题目的时候&#xff0c;总是会迷茫 deduct 出现关键词之后&#xff0c;才开始精听 没有人管你 &#xff0c;绝对是要为后方留出更多的时间 &#xff0c;选多一个错的&#xff0c;要倒扣分 特征 1.paraphrase 2.循序出现 …

JDK 21的新特性总结和分析

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

Windows服务器安装php+mysql环境的经验分享

php mysql环境 下载IIS Php Mysql环境集成包,集成包下载地址: 1、Windows Server 2008 一键安装Web环境包 x64 适用64位操作系统服务器:下载地址:链接: https://pan.baidu.com/s/1MMOOLGll4D7Eb5tBrdTQZw 提取码: btnx 2、Windows Server 2008 一键安装Web环境包 32 适…

视频推拉流/直播点播平台EasyDSS分享的链接提示“无信号”,该如何解决?

视频直播点播平台EasyDSS可支持用户自行上传视频文件&#xff0c;也可将上传的点播文件作为虚拟直播进行播放。平台能支持多屏播放&#xff0c;可兼容Windows、Android、iOS、Mac等操作系统&#xff0c;还能支持CDN转推&#xff0c;具备较强的可拓展性与灵活性。 为给用户提供更…

AcWing算法提高课-5.6.2青蛙的约会

宣传一下 算法提高课整理 CSDN个人主页&#xff1a;更好的阅读体验 原题链接 题目描述 两只青蛙在网上相识了&#xff0c;它们聊得很开心&#xff0c;于是觉得很有必要见一面。 它们很高兴地发现它们住在同一条纬度线上&#xff0c;于是它们约定各自朝西跳&#xff0c;直到…

数据库设计与前端框架

数据库设计与前端框架 学习目标&#xff1a; 理解多租户的数据库设计方案 熟练使用PowerDesigner构建数据库模型理解前端工程的基本架构和执行流程 完成前端工程企业模块开发 多租户SaaS平台的数据库方案 多租户是什么 多租户技术&#xff08;Multi-TenancyTechnology&a…

PCI设备与UIO驱动

随着网络的高速发展,对网络的性能要求也越来越高,DPDK框架是目前的一种加速网络IO的解决方案之一,也是最为流行的一套方案。DPDK通过bypass内核协议栈与内核驱动,将驱动的工作从内核态移至用户态,并利用polling mode的线程工作模式加速网络I/O使得网络IO性能出现大幅度的增…

xml文件报错 ORA-00907: 缺失右括号

原来的sql 更改之后 加一个select * from &#xff08;&#xff09;

一文理解登录鉴权(Cookie、Session、Jwt、CAS、SSO)

1 前言 登录鉴权是任何一个网站都无法绕开的部分&#xff0c;当系统要正式上线前都会要求接入统一登陆系统&#xff0c;一方面能够让网站只允许合法的用户访问&#xff0c;另一方面&#xff0c;当用户在网站上进行操作时也需要识别操作的用户&#xff0c;用作后期的操作审计。…

嵌入式开发学习之STM32F407点亮LED及J-Link下载(二)

嵌入式开发学习之STM32F407点亮LED及J-Link下载&#xff08;二&#xff09; 开发涉及工具控制端口配置端口的设定与确认端口配置方法实现点亮LED程序下载与仿真 有工程实例&#xff0c;链接在最底部。 开发涉及工具 开发环境&#xff08;IDE&#xff09;&#xff1a;IAR-ARM8…

力扣每日一题46:全排列

题目描述&#xff1a; 给定一个不含重复数字的数组 nums &#xff0c;返回其 所有可能的全排列 。你可以 按任意顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[1,2,3],[1,3,2],[2,1,3],[2,3,1],[3,1,2],[3,2,1]]示例 2&#xff1a; …

Tuxera NTFS2024最新永久版下载和安装

要使用Tuxera NTFS for Mac&#xff0c;你需要先下载和安装Tuxera NTFS for Mac驱动器&#xff0c;然后按照以下步骤操作&#xff1a; 1、下载和安装Tuxera NTFS for Mac 免费下载Tuxera NTFS for Mac驱动器的最新版本。下载完成后&#xff0c;双击DMG文件并按照提示安装即可…

云计算:shell脚本

shell脚本&#xff0c;会极大减少重复性工作&#xff0c;缩短很大时间。 脚本每个人都可以不一样&#xff0c;只要实现就可以。 注意&#xff1a;要多思考&#xff0c;把思路锻炼好。以后就可以写各种程序。 shell语言 学完shell之后&#xff0c;对Linux理解更深刻&#xff…

IDEA 修改插件安装位置

不说假话&#xff0c;一定要看到最后&#xff0c;不然你以为我为什么要自己总结&#xff01;&#xff01;&#xff01; IDEA 修改插件安装位置 前言步骤 前言 IDEA 默认的配置文件均安装在C盘&#xff0c;使用时间长会生成很多文件&#xff0c;这些文件会占用挤兑C盘空间&…