Table of Contents
- 1. Setting Up the Source Environment
- Main Functional Modules
- Starting the Services from Source
- Starting the NameServer
- Starting the Broker
- Sending Messages
- Consuming Messages
- 2. Source Code Warm-Up
- The NameServer Startup Process
- Key Points
- Source Code Highlights
- The Broker Startup Process
- Key Points
- Source Code Highlights
- The Netty Service Registration Framework
- Key Points
- Source Code Highlights
- On RocketMQ's Synchronous and Asynchronous Result Delivery
- Broker Heartbeat Registration Management
- Key Points
- Source Code Highlights
- A Minimalist Service Registration and Discovery Flow
- The Producer Send Flow
- Key Points
- Source Code Highlights
- The Consumer Pull Flow
- Key Points
- Source Code Highlights
- Offset Handling in Broadcast and Cluster Mode
- How Consumers Bind to MessageQueues
- Ordered vs. Concurrent Consumption
- Actual Message Pulling Is Done by PullMessageService
- Client-Side Load Balancing Summary
- Producer Load Balancing
- Consumer Load Balancing
1. Setting Up the Source Environment
Main Functional Modules
The official RocketMQ Git repository is at https://github.com/apache/rocketmq. You can clone the project with git or download the source archive directly.
You can also download a specific release of the source from the official RocketMQ website: http://rocketmq.apache.org/dowloading/releases/
The source tree contains a lot of modules, which makes it easy to get lost. We only need to focus on the few most important ones:
broker: the Broker module (the broker startup process)
client: the message client, containing the producer- and consumer-related classes
example: RocketMQ sample code
namesrv: the NameServer module
store: the message storage module
remoting: the remote communication module
Starting the Services from Source
After importing the source into IDEA, you first need to compile it: mvn clean install -Dmaven.test.skip=true
Once compilation succeeds you can start debugging the code. Debugging requires the following steps:
First create a conf directory under the project root, and copy broker.conf, logback_broker.xml and logback_namesrv.xml into it from the distribution module.
Starting the NameServer
Expand the namesrv module and run the NamesrvStartup class to start the NameServer.
The first launch will fail with an error asking you to configure a ROCKETMQ_HOME environment variable. You can configure it on the machine, just like configuring JAVA_HOME, or configure it in IDEA's run configuration. Point it at the source directory.
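If it helps, here is one way to supply it, a sketch in which the path is a placeholder for your local source directory. MixAll also maps the rocketmq.home.dir system property to the same setting, so a JVM option works as an alternative:

# IDEA Run Configuration -> Environment variables
ROCKETMQ_HOME=E:\RocketMQ\rocketmq

# or as a JVM option (MixAll.ROCKETMQ_HOME_PROPERTY)
-Drocketmq.home.dir=E:\RocketMQ\rocketmq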
Once that is configured, run it again. When you see the following log output, the NameServer has started successfully:
The Name Server boot success. serializeType=JSON
Starting the Broker
Before starting the Broker, we need to edit the broker.conf file we copied earlier:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Auto-create topics
autoCreateTopicEnable=true
# NameServer address
namesrvAddr=127.0.0.1:9876
# Storage root path
storePathRootDir=E:\\RocketMQ\\data\\rocketmq\\dataDir
# CommitLog path
storePathCommitLog=E:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
# ConsumeQueue path
storePathConsumeQueue=E:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
# Message index path
storePathIndex=E:\\RocketMQ\\data\\rocketmq\\dataDir\\index
# Checkpoint file path
storeCheckpoint=E:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
# Abort file path
abortFile=E:\\RocketMQ\\data\\rocketmq\\dataDir\\abort
The Broker's startup class is BrokerStartup under the broker module.
Starting the Broker likewise requires the ROCKETMQ_HOME environment variable, and additionally a -c argument pointing at the broker.conf configuration file.
Then start it again and the Broker will come up.
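For reference, the -c argument goes into the run configuration of BrokerStartup, a sketch in which the path is just the conf directory created earlier:

# IDEA Run Configuration -> Program arguments
-c E:\RocketMQ\rocketmq\conf\broker.conf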
Sending Messages
The example module in the source provides very thorough test code. For instance, run the org.apache.rocketmq.example.quickstart.Producer class in the example module to send messages.
public class Producer {
    /** The number of produced messages. */
    public static final int MESSAGE_COUNT = 1000;
    public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

        /*
         * Specify name server addresses.
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
        // producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {
                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message(TOPIC /* Topic */,
                    TAG /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);
                /*
                 * There are different ways to send a message. If you don't care about the send result, you can use:
                 * {@code
                 * producer.sendOneway(msg);
                 * }
                 */

                /*
                 * If you want to get the send result in a synchronous way, you can use:
                 * {@code
                 * SendResult sendResult = producer.send(msg);
                 * System.out.printf("%s%n", sendResult);
                 * }
                 */

                /*
                 * If you want to get the send result in an asynchronous way, you can use:
                 * {@code
                 * producer.send(msg, new SendCallback() {
                 *     @Override
                 *     public void onSuccess(SendResult sendResult) {
                 *         // do something
                 *     }
                 *
                 *     @Override
                 *     public void onException(Throwable e) {
                 *         // do something
                 *     }
                 * });
                 * }
                 */
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is no longer in use.
         */
        producer.shutdown();
    }
}
In the test source, however, you must specify the NameServer address. There are two ways to do so: configure a NAMESRV_ADDR environment variable, or set it in the code. We can add one line of code to specify the NameServer:
producer.setNamesrvAddr("127.0.0.1:9876");
Then messages can be sent.
Consuming Messages
public class Consumer {
    public static final String CONSUMER_GROUP = "please_rename_unique_group_name_4";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";

    public static void main(String[] args) throws InterruptedException, MQClientException {
        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
        // consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        /*
         * Specify where to start in case the specific consumer group is a brand-new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more topic to consume.
         */
        consumer.subscribe(TOPIC, "*");

        /*
         * Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        /*
         * Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}
We can use the org.apache.rocketmq.example.quickstart.Consumer class in the same module to consume messages. Running it likewise requires specifying the NameServer address:
consumer.setNamesrvAddr("192.168.232.128:9876");
With that, the entire debugging environment is in place.
2. Source Code Warm-Up
The NameServer Startup Process
Key Points
In a RocketMQ cluster, the component that actually carries out the core work, message storage, delivery and so on, is the Broker. The NameServer's role is very much like a registry in a microservice architecture: all it provides is service registration and discovery for the Broker side.
Source Code Highlights
The NameServer's startup entry class is org.apache.rocketmq.namesrv.NamesrvStartup, and its core is to build and start a NamesrvController. This Controller object is quite similar to a Controller in MVC: both respond to client requests, except that this one responds to Netty-based client requests.
Its actual startup process is also worth reading alongside the NameServer startup scripts for a deeper understanding.
public static void main(String[] args) {
    main0(args);
}

public static NamesrvController main0(String[] args) {
    // K1: The NamesrvController is the NameServer's core component. Like a Controller
    // in a web application, it receives and handles network requests.
    try {
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); // set the version
    //PackageConflictDetect.detectFastjson();

    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    // K2: the NameServer's two core configuration objects
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876); // port 9876 is set directly as the default

    // Parsing of the -c and -p command-line options.
    // How do you change the default port? NamesrvConfig has a configStorePath whose default is
    // System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties".
    // Create a properties file at that path (or pass one with -c) and put overrides,
    // such as the port, inside it.
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            namesrvConfig.setConfigStorePath(file);
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    // -p: print the configuration and exit
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    // Check the ROCKETMQ_HOME environment variable
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);

    return controller;
}
public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    // Initialization: mainly sets up a few scheduled tasks
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    // Shutdown hook, executed on a normal shutdown to release process resources.
    // It is not triggered when the process is killed abnormally.
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
        controller.shutdown();
        return null;
    }));

    // Start the service
    controller.start();

    return controller;
}
public boolean initialize() {
    this.kvConfigManager.load();

    // Create the NettyServer network component that handles RPC requests
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // Worker thread pool for the Netty server
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    // K1: the NameServer registers its Processor with the RemotingServer
    this.registerProcessor();

    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);

    // TLS is a secure transport-layer protocol; its parameters can only be injected via JVM options
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}
// Start the service
public void start() throws Exception {
    // Starts a Netty server
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}
From these two key steps, the NameServer's startup and shutdown, we can conclude that the NameServer does not have many components; its overall structure looks like this:
These two configuration classes can serve as a guide to tuning the NameServer's configuration. For example, how do you change the NameServer's port? See the sketch below.
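A minimal sketch, based on the -c handling in createNamesrvController above (the file name and port are arbitrary): listenPort is a NettyServerConfig property, and properties loaded via -c are copied onto both NamesrvConfig and NettyServerConfig, so a one-line properties file is enough to move the port.

# namesrv.properties (any path)
listenPort=9877

# start the NameServer with it
sh bin/mqnamesrv -c namesrv.properties

In IDEA, the equivalent is adding "-c namesrv.properties" to NamesrvStartup's program arguments.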
This also shows that the overall style of the RocketMQ source is classic MVC thinking: Controllers respond to requests, Services handle the business logic, and the various Tables hold the data.
We can further see that the NameServer only accepts requests and never initiates them (there is only a server, no client). This corroborates, from another angle, that the NameServer differs from a ZooKeeper-style registry: it does not guarantee data consistency, and there is no data exchange between NameServer instances.
The Broker Startup Process
Key Points
The Broker is the business core of all of RocketMQ. All the important work, message storage, forwarding and so on, is handled by the Broker.
Here we focus on sorting out which internal services the Broker runs. These internal services are the starting point for mapping out the Broker's core business flows.
Source Code Highlights
The Broker's startup entry is the BrokerStartup class; you can start debugging from its main method.
public static void main(String[] args) {
    start(createBrokerController(args));
}

The key to the startup process again revolves around a single BrokerController object: first create it, then start it.
public static BrokerController createBrokerController(String[] args) {
    // Set the version
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    try {
        //PackageConflictDetect.detectFastjson();

        // Parse the command-line arguments
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }

        // K2: the Broker's core configuration objects
        final BrokerConfig brokerConfig = new BrokerConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();

        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        nettyServerConfig.setListenPort(10911); // port 10911 by default

        // K2: storage-related configuration
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

        // A SLAVE keeps 10% less of the message data resident in memory than a Master
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }

        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);

                properties2SystemEnv(properties);
                MixAll.properties2Object(properties, brokerConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                MixAll.properties2Object(properties, nettyClientConfig);
                MixAll.properties2Object(properties, messageStoreConfig);

                BrokerPathConfigHelper.setBrokerConfigPath(file);
                in.close();
            }
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }

        // Determine master/slave by brokerId
        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                brokerConfig.setBrokerId(MixAll.MASTER_ID); // MASTER_ID = 0L
                break;
            case SLAVE:
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }
                break;
            default:
                break;
        }

        // All Broker nodes in a Dledger cluster have brokerId -1
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }

        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        System.setProperty("brokerLogDir", "");
        if (brokerConfig.isIsolateLogEnable()) {
            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
        }
        if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
        }
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }

        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);

        // K1: create the core BrokerController. Which properties can be configured in broker.conf?
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        // K1: initialize the BrokerController; note how this reveals the Broker's component structure
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // Graceful shutdown: release resources
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}
First: in the BrokerStartup.createBrokerController method we can see the Broker's several core configurations:
BrokerConfig: the Broker service configuration.
MessageStoreConfig: the message storage configuration. Both of these can be set in the broker.conf file.
NettyServerConfig: the Netty server, occupying port 10911 by default. This too can be overridden in the config file.
NettyClientConfig: the Broker acts both as a Netty server, providing its core business capabilities to clients, and as a Netty client, registering heartbeats with the NameServer.
These configurations are the key to understanding how to tune our use of RocketMQ. A small example follows.
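A sketch of what that means in practice: since createBrokerController copies the -c properties onto all four config objects, overrides for any of them can sit side by side in broker.conf (the values below are illustrative only):

# NettyServerConfig: move the Broker's listen port
listenPort=10928
# MessageStoreConfig: switch to synchronous disk flush
flushDiskType=SYNC_FLUSH
# BrokerConfig: heartbeat interval towards the NameServers, in ms
registerNameServerPeriod=30000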
public static BrokerController start(BrokerController controller) {
    try {
        // K1: start the BrokerController
        controller.start();

        String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
            + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

        if (null != controller.getBrokerConfig().getNamesrvAddr()) {
            tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
        }

        log.info(tip);
        System.out.printf("%s%n", tip);

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}
// The BrokerController's core start method
public void start() throws Exception {
    // Storage component; started here mainly to dispatch CommitLog write events
    // to the ConsumeQueue and IndexFile
    if (this.messageStore != null) {
        this.messageStore.start();
    }

    // The Broker starts two Netty servers
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }

    // brokerOuterAPI can be thought of as a Netty client, the component for
    // outbound requests, e.g. sending heartbeats
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }

    // Holds long-polling pull requests
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }

    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }

    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }

    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        this.registerBrokerAll(true, false, true);
    }

    // K2: the Broker's core heartbeat registration task; worth a close read
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // Send a heartbeat registration to all NameServers at once
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }

    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }
}
Next: in the BrokerController.start method you can see a whole batch of core Broker services being started. Picking out some important ones:
this.messageStore.start();                  // start the core message storage component
this.remotingServer.start();
this.fastRemotingServer.start();            // start the two Netty servers
this.brokerOuterAPI.start();                // start the Netty client for outbound requests
BrokerController.this.registerBrokerAll:    // register heartbeats with the NameServers
this.brokerStatsManager.start();
this.brokerFastFailure.start();             // more components handling specific pieces of business
For now we do not need to understand what each of these core components does in detail; a rough picture is enough: the Broker contains a large pile of functional components, each responsible for a specific piece of business. We will dive into each service's details later, when we analyze the corresponding business flows.
What we need is an abstraction of the Broker's overall structure:
Notice that the Broker starts two Netty servers whose functionality is essentially the same. In practice, an application can route a small number of important producers through the VIP channel with producer.setSendMessageWithVIPChannel(true), and on the consumer side consumer.setVipChannelEnabled(true) lets consumers receive data over the VIP channel as well, as in the snippet below.
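A quick usage sketch (the group names are arbitrary; by convention the VIP channel listens on the Broker's port minus 2, i.e. 10909 next to the default 10911):

DefaultMQProducer producer = new DefaultMQProducer("vip_producer_group");
producer.setSendMessageWithVIPChannel(true); // route sends to the fastRemotingServer

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("vip_consumer_group");
consumer.setVipChannelEnabled(true);         // let the consumer use the VIP channel too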
The Netty Service Registration Framework
Key Points
Network communication is the foundation for building distributed applications, and also our foundation for understanding RocketMQ's underlying business. Here we focus on RocketMQ's service registration framework and on how the various business processes perform remote RPC with one another.
All of RocketMQ's remote communication is implemented in the remoting module, which contains both the RPC server, RemotingServer, and the RPC client, RemotingClient. RocketMQ involves a great many remote services: the NameServer acts mainly as the RPC server RemotingServer; the Broker is an RPC server RemotingServer towards the clients, but an RPC client towards the NameServer; and the various clients act as the RPC client RemotingClient.
What needs to be understood is that RocketMQ keeps a long-lived Netty Channel between client and server. As long as the Channel is stable, requests can flow from the client to the server, and equally from the server to the client. In the transactional message scenario, for instance, the Broker must repeatedly reach out to the Producer to confirm the transaction's status. This is why RemotingServer and RemotingClient both need to register their own services.
Source Code Highlights
1. Which components need a Netty server, and which need a Netty client?
The easy cases: the NameServer needs a NettyServer; the clients, Producer and Consumer, need a NettyClient; the Broker needs a NettyServer to answer client requests and a NettyClient to register heartbeats with the NameServer. But here is a question: a transactional-message Producer also has to answer the Broker's transaction status checks; does it need a NettyServer?
The NameServer needs no NettyClient, which again corroborates the earlier point that NameServers do not synchronize data among themselves.
The NettyRemotingServer constructor:
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    if (useEpoll()) {
        // On Linux, use epoll event loops to reduce GC overhead
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }

    loadSslContext();
}
The start method:
@Override
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    prepareSharableHandlers();

    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,            // serialization: encodes the request into binary
                            new NettyDecoder(), // decodes binary back into a RemotingCommand object
                            // heartbeat handling: close the channel after 120s with no read/write
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler, // manages connections
                            serverHandler            // handles the actual services
                        );
                }
            });

    if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
        log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
        childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
    }
    if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
        log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
        childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
    }
    if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
        log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
            nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
        childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
            nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
    }
    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    // Periodically scan the responseTable and clean up expired entries
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
2. All RPC request data is wrapped into RemotingCommand objects, and every piece of message-handling service logic is wrapped into a NettyRequestProcessor object.
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    // All request parameters are wrapped in a RemotingCommand object,
    // encoded into binary here, and then sent out.
    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            remotingCommand.fastEncodeHeader(out);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    doBeforeRpcHooks(remoteAddr, cmd);

                    // Callback invoked once the server finishes handling the request
                    final RemotingResponseCallback callback = new RemotingResponseCallback() {
                        @Override
                        public void callback(RemotingCommand response) {
                            doAfterRpcHooks(remoteAddr, cmd, response);
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
                                }
                            }
                        }
                    };

                    if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                        AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1();
                        processor.asyncProcessRequest(ctx, cmd, callback);
                    } else {
                        NettyRequestProcessor processor = pair.getObject1();
                        RemotingCommand response = processor.processRequest(ctx, cmd);
                        callback.callback(response);
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
3. Both the server and the client maintain a processorTable, a HashMap whose key is the request code and whose value is the matching execution unit, of type Pair<NettyRequestProcessor, ExecutorService>: the Processor itself plus the thread pool that runs it. Each business system registers its own Processors: the Broker's registrations are in BrokerController.registerProcessor(), the client's in MQClientAPIImpl. The NameServer registers one big DefaultRequestProcessor that handles all of its services.
// The Broker registers its processors
public void registerProcessor() {
    /*
     * SendMessageProcessor
     */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);

    /*
     * PullMessageProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

    /*
     * ReplyMessageProcessor
     */
    ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
    replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);

    /*
     * QueryMessageProcessor
     */
    NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    /*
     * ClientManageProcessor
     */
    ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    /*
     * ConsumerManageProcessor
     */
    ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    /*
     * EndTransactionProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);

    /*
     * Default
     */
    AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
    this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
// The NameServer registers its processor
private void registerProcessor() {
    if (namesrvConfig.isClusterTest()) {
        // Test cluster
        this.remotingServer.registerDefaultProcessor(
            new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
            this.remotingExecutor);
    } else {
        // The NameServer's business is simple enough that all of it
        // is consolidated into a single Processor.
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }
}
public DefaultRequestProcessor(NamesrvController namesrvController) {
    this.namesrvController = namesrvController;
}

// K2: the core request-handling code of the NameServer
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    // Dispatch to the different business logic based on the request code
    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINFO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.ADD_WRITE_PERM_OF_BROKER:
            return this.addWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}
4. Commands come in two types, REQUEST and RESPONSE, to support asynchronous RPC. After the NettyServer finishes handling a request, it can first cache the result in the responseTable for the NettyClient to fetch on a later call, so the Channel does not block and request throughput improves.
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg; // the command sent by the remote peer
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                // handle the synchronous path
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                // handle the asynchronous path
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
5. Focus on understanding how the remoting package makes the entire flow asynchronous.
The overall RPC framework flow is shown in the figure below:
Using Netty, RocketMQ provides a service registration mechanism keyed by request codes, which lets all the different components register exactly the service methods they need. This mechanism is simple and practical, and well worth borrowing when you build other Netty-based applications. Say you were to build a large IM product, with requests for adding and removing friends, sending text, images, even red packets, and maintaining group-chat state: this framework is a very good reference for how to package all those requests. A sketch follows.
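A minimal sketch of that extension point, assuming a made-up request code and processor (SEND_RED_PACKET, RedPacketProcessor and redPacketExecutor are hypothetical names, not part of RocketMQ):

// Hypothetical request code for an imagined IM-style service
public static final int SEND_RED_PACKET = 9001;

// Each piece of service logic implements NettyRequestProcessor
public class RedPacketProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
        // decode request.getBody(), do the business work, then build a response
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        response.setCode(RemotingSysResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    @Override
    public boolean rejectRequest() {
        return false; // no flow control in this sketch
    }
}

// Registration binds the request code, the processor, and the thread pool that runs it
remotingServer.registerProcessor(SEND_RED_PACKET, new RedPacketProcessor(), redPacketExecutor);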
On RocketMQ's Synchronous and Asynchronous Result Delivery
RocketMQ's RemotingServer maintains a responseTable, a thread-safe Map structure whose key is the request ID and whose value is the asynchronous result: ConcurrentMap<Integer /* opaque */, ResponseFuture>.
When a synchronous request is handled (NettyRemotingAbstract#invokeSyncImpl), the result is stored into the responseTable, and the ResponseFuture provides a degree of server-side asynchrony that improves server throughput. Once the response has been returned, the request's record is removed from the responseTable immediately.
In fact, synchronous calls are themselves implemented on top of the asynchronous machinery.
//org.apache.rocketmq.remoting.netty.ResponseFuture

// After sending the request, block the current thread on a CountDownLatch
// to produce the effect of a synchronous wait.
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
    this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return this.responseCommand;
}

// When the response arrives asynchronously, release the waiting thread
// through the CountDownLatch.
public void putResponse(final RemotingCommand responseCommand) {
    this.responseCommand = responseCommand;
    this.countDownLatch.countDown();
}
When an asynchronous request is handled (NettyRemotingAbstract#invokeAsyncImpl), the result is likewise stored into the responseTable, waiting for the client to come back for it later. But what is kept is still a ResponseFuture; the real result is only fetched when the client asks for it. In addition, when the RemotingServer starts, it schedules a timer task that continually scans the responseTable and clears out the expired responses.
//org.apache.rocketmq.remoting.netty.NettyRemotingServer
this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingServer.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);
Broker Heartbeat Registration Management
Key Points
At startup, the Broker registers its service information with every NameServer, and afterwards it keeps sending heartbeats to the NameServers on a schedule. The NameServer maintains the Broker routing table and keeps it updated in real time. This round focuses on untangling that process.
Source Code Highlights
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }

    // This is the key part: first decide whether registration is needed,
    // then call doRegisterBrokerAll to actually register.
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}
// K2: the heart of Broker registration
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {

    // Send the outbound registration requests through this API
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());

    if (registerBrokerResultList.size() > 0) {
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
        if (registerBrokerResult != null) {
            // After registering, save the master and slave node addresses
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }

            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }
}
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {

    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();

    // Get the NameServer addresses from the client
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);

        // A CountDownLatch lets the registrations with all NameServers complete together:
        // asynchronous threads folded into one synchronous request.
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(() -> {
                try {
                    RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }

                    log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }

        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }

    return registerBrokerResultList;
}
private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException {

    // Wrap the registration request code and header into a RemotingCommand
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);

    if (oneway) {
        try {
            // Register through the remotingClient
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }

    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            if (response.getBody() != null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
Right after starting, the Broker initiates heartbeat registration with the NameServers (entry point: BrokerController.this.registerBrokerAll), and then starts a scheduled task that keeps sending heartbeats with a 10-second initial delay and a default interval of 30 seconds.
// K2: where the NameServer actually handles Broker registration
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    final RegisterBrokerRequestHeader requestHeader =
        (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

    if (!checksum(ctx, request, requestHeader)) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("crc32 not match");
        return response;
    }

    RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

    if (request.getBody() != null) {
        try {
            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
        } catch (Exception e) {
            throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
        }
    } else {
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
    }

    // routeInfoManager is the core component that manages routing information
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        registerBrokerBody.getTopicConfigSerializeWrapper(),
        registerBrokerBody.getFilterServerList(),
        ctx.channel());

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
// Register a Broker
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // Take the write lock: only one thread may write at a time
            this.lock.writeLock().lockInterruptibly();

            Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            // brokerAddrTable: the core routing table.
            // The NameServer maintains the Broker list and pushes it to clients:
            // a producer takes this list and picks a broker to send to, and a
            // consumer likewise picks a broker to pull from.
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }

            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    log.debug("remove entry {} from brokerData", item);
                    it.remove();
                }
            }

            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            if (MixAll.MASTER_ID == brokerId) {
                log.info("cluster [{}] brokerName [{}] master address change from {} to {}",
                    brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);
            }

            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // Mainly wraps the channel back to the registering Broker
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
Inside the NameServer, the RouteInfoManager component keeps the Broker information current. When the NameServer starts, it also schedules a task that scans for inactive Brokers (entry point: the NamesrvController.initialize method).
public int scanNotActiveBroker() {
    int removeCount = 0;
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // Evict any Broker whose last heartbeat is older than BROKER_CHANNEL_EXPIRED_TIME (2 minutes by default)
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            removeCount++;
        }
    }

    return removeCount;
}
A Minimalist Service Registration and Discovery Flow
Why does RocketMQ implement its own NameServer rather than use a ready-made registry such as ZooKeeper or Nacos?
First, depending on an external component intrudes on the product's independence and gets in the way of its own release evolution. Kafka's move to shed ZooKeeper is a precedent.
More important, though, is designing sensibly for the business at hand. NameServers do not synchronize information with each other; instead, the Broker registers with all NameServers at the same time. This keeps the NameServer service very lightweight. If you can, compare this against the core flows of Nacos or ZooKeeper.
Be aware, though, that this minimalist design is bought at the price of data consistency. When a Broker registers with several NameServers at once, some registrations may succeed while others fail, leaving the NameServers mutually inconsistent. For a general-purpose registry that would be unacceptable, but for RocketMQ it becomes acceptable: all a client needs from a NameServer is one working Broker, not the complete Broker list.
The Producer Send Flow
Key Points
First: recall our earlier Producer usage example.
There are two kinds of Producer:
The ordinary sender, DefaultMQProducer: it is only responsible for sending messages, and once they are sent it may stop.
The transactional sender, TransactionMQProducer: it supports the transactional message mechanism. Since it must provide a transaction-status confirmation service during the transactional flow, this sender, although a client, may only exit after the whole transactional confirmation mechanism has completed.
The transactional message mechanism will be analyzed later together with the Broker; we set it aside for now and look only at DefaultMQProducer's send flow.
Next: using a Producer breaks roughly into two steps: first call the start method, which does a pile of preparation work; then call one of the various send methods to actually send messages.
So we focus on the following questions:
1. Which services are started during Producer startup?
2. How does the Producer manage broker routing information? Consider: if the NameServer dies after the Producer has started, can the Producer still send messages? Try to form a guess from the source first, then design an experiment of your own to verify it.
3. Producer load balancing, i.e. which MessageQueue a message actually goes to. It helps to think about this together with the ordered message mechanism: how does that mysterious MessageQueueSelector in the send call actually work?
Source Code Highlights
1. The Producer's core startup flow
Every Producer's startup eventually reaches the DefaultMQProducerImpl#start method. Inside start, an mQClientFactory object is used to launch the producer's many important services.
This is really a design pattern at work: there are many different kinds of clients, but their startup flows all converge and are driven uniformly by the mQClientFactory object. What differs is the information each client registers during startup, as the server requires: producers register into producerTable, consumers into consumerTable, and admin/management clients into adminExtTable.
2. The core send flow
The core flow is as follows:
1. When sending, the producer maintains a local topicPublishInfoTable cache, and DefaultMQProducer tries hard to keep this cache fresh. But if the NameServer goes down, DefaultMQProducer will still look up Brokers from this local cache; as long as it can find a Broker, it can still deliver messages to it. You can check this in the producer example: set a breakpoint after start, stop the NameServer, and the Producer can still send. A sketch of the experiment follows.
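A sketch of that experiment, using the quickstart defaults (the sleep marks where you manually stop the NameServer process):

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

// Send once so the route for the topic is cached in topicPublishInfoTable
producer.send(new Message("TopicTest", "TagA", "warm-up".getBytes(RemotingHelper.DEFAULT_CHARSET)));

Thread.sleep(30_000); // <-- stop the NameServer during this pause

// Still works: the Broker is resolved from the local route cache, not from the NameServer
SendResult sendResult = producer.send(new Message("TopicTest", "TagA", "still ok".getBytes(RemotingHelper.DEFAULT_CHARSET)));
System.out.printf("%s%n", sendResult);
producer.shutdown();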
2. How the producer picks a MessageQueue: by default, the producer round-robins across the MessageQueues in turn. But after a failed send to some Broker, the next attempt skips that Broker.
//org.apache.rocketmq.client.impl.producer.TopicPublishInfo

// If lastBrokerName is non-null here, the previous send to that Broker failed,
// so we try to avoid sending to the same Broker again.
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
3. If a Selector is passed when sending, the Producer skips this load-balancing logic and instead uses the Selector to pick a queue. See the org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl method, and the sketch below.
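A minimal sketch of that Selector path (orderId is a hypothetical business key used only for illustration): all messages carrying the same key land on the same MessageQueue, which is how local ordering is usually achieved.

long orderId = 10001L; // hypothetical business key

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // Same arg -> same index -> same queue, so messages for one order stay ordered
        long id = (Long) arg;
        return mqs.get((int) (id % mqs.size()));
    }
}, orderId);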
The Consumer Pull Flow
Key Points
Building on our earlier examples, let's review the key questions on the consumer side:
There are likewise two kinds of consumers: push-mode and pull-mode. Every good MQ product chases one advanced goal: raising overall message-processing performance. Server-side optimizations tend to be indirect; the most direct lever is to optimize the consumer. As a result, the consumer-side business logic in RocketMQ is very complex, in some ways more complex than the server side. Here we concentrate on the push-mode consumer, the one used most in practice.
Consumer groups have two consumption modes, cluster mode and broadcast mode; we want to see how the logic for these two modes is encapsulated.
Then we look at how consumer-side load balancing works: how a consumer binds to its consumption queues, and how the various consumption strategies are actually realized.
Finally, within the push-mode consumer, what exactly differs in the handling logic of the two listeners, MessageListenerConcurrently and MessageListenerOrderly, and why can the latter preserve message order?
Source Code Highlights
The Consumer's core startup is the same as the Producer's: it ultimately starts through the mQClientFactory object, with some extra registration information added along the way. The overall startup flow looks like this:
Offset Handling in Broadcast and Cluster Mode
DefaultMQPushConsumerImpl's start method launches a great many core services. For example, the Offset handling for broadcast mode versus cluster mode:
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
As you can see, broadcast mode uses LocalFileOffsetStore, keeping the Offset locally on the Consumer, while cluster mode uses RemoteBrokerOffsetStore, keeping the Offset remotely on the Broker. Both storage schemes ultimately manage Offsets through a local offsetTable cache. A small usage sketch follows.
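For reference, the mode is chosen on the consumer itself (a minimal sketch; the group name is arbitrary):

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_group");
// BROADCASTING -> LocalFileOffsetStore (offsets kept on the consumer's disk)
// CLUSTERING   -> RemoteBrokerOffsetStore (offsets kept on the Broker), the default
consumer.setMessageModel(MessageModel.BROADCASTING);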
How Consumers Bind to MessageQueues
Another important piece in the start method is that rebalanceImpl is given an AllocateMessageQueueStrategy, used to assign MessageQueues to Consumers.
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// The Consumer's load-balancing strategy
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
This AllocateMessageQueueStrategy is what establishes the mapping between Consumers and MessageQueues. In other words, as long as neither the Topic's MessageQueues nor the Consumer instances in a ConsumerGroup change, a given Consumer instance consumes only its own fixed MessageQueue(s), and no other Consumer will come and grab them.
The load-balancing mechanism is summarized later together with the Producer's send strategy. For now, though, think about why a MessageQueue may only be consumed by one Consumer instance within a given ConsumerGroup.
The reason is actually simple: the Broker has to manage each MessageQueue's Offset per ConsumerGroup. If several Consumer instances of the same ConsumerGroup consumed one MessageQueue, their progress would differ, and the Offset would descend into chaos.
Ordered vs. Concurrent Consumption
Also in the start method, the consumeMessageService is created and started to process the messages that get pulled.
// The callback function specified by the Consumer itself
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
As you can see, the callback a Consumer supplies via registerMessageListener is wrapped into one of the ConsumeMessageService implementations.
The calls into these two service implementations are carried on inside DefaultMQPushConsumerImpl's pullCallback object; that is, each time the Consumer has pulled a batch of messages, it submits the next pull request to the Broker.
This confirms another point: ordered consumption only takes effect for asynchronous consumption, i.e. push mode. Synchronous consumption in pull mode cannot consume in order this way, because in synchronous pull-mode consumption this pullCallback object is never passed down at all.
Of course, that is not to say pull mode cannot lock a queue and consume in order: in pull mode, the consuming application itself can choose which queue to take messages from.
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            //...
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    //...
                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                        pullResult.getMsgFoundList(),
                        processQueue,
                        pullRequest.getMessageQueue(),
                        dispatchToConsume);
                    //...
                    break;
                //...
            }
        }
    }
}
What is submitted here is actually a ConsumeRequest task, and this ConsumeRequest has a different implementation in each of the two ConsumeMessageService variants.
The most essential difference between the two is that ConsumeMessageOrderlyService locks a queue and finishes processing it before consuming the next one.
public void run() {
    // ....
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        //....
    }
}
Why does putting a lock on the queue guarantee ordered consumption? Think it through together with how ordered messages are implemented.
The source shows that when the Consumer submits requests, it always submits them asynchronously into a thread pool. Without the queue lock, even requests aimed at the same MessageQueue would all execute asynchronously, and their completion order would be uncontrollable. With the per-queue lock, a second request against the same queue must wait until the first request finishes and releases the lock before it can proceed. That is the basic way to preserve order in an asynchronous setting; a standalone sketch of the pattern follows.
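A minimal, self-contained sketch of that per-queue-lock pattern (not RocketMQ code; the queue keys and executor are made up for illustration):

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.*;

public class PerQueueLockDemo {
    // One lock object and one pending-message buffer per queue,
    // mimicking MessageQueueLock + ProcessQueue in ConsumeMessageOrderlyService.
    private static final Map<String, Object> QUEUE_LOCKS = new ConcurrentHashMap<>();
    private static final Map<String, Queue<Integer>> PENDING = new ConcurrentHashMap<>();
    private static final ExecutorService POOL = Executors.newFixedThreadPool(8);

    static void submit(String queue, int seq) {
        PENDING.computeIfAbsent(queue, k -> new ConcurrentLinkedQueue<>()).add(seq);
        POOL.submit(() -> {
            Object lock = QUEUE_LOCKS.computeIfAbsent(queue, k -> new Object());
            // Threads race for the per-queue lock, but the messages themselves are
            // taken from the per-queue buffer in order, so whichever thread wins
            // the lock still consumes the next pending message for that queue.
            synchronized (lock) {
                Integer next = PENDING.get(queue).poll();
                if (next != null) {
                    System.out.printf("%s consumed seq=%d from %s%n",
                        Thread.currentThread().getName(), next, queue);
                }
            }
        });
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            submit("queue-a", i); // queue-a and queue-b proceed in parallel,
            submit("queue-b", i); // but each queue's messages come out in order
        }
        POOL.shutdown();
    }
}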
Actual Message Pulling Is Done by PullMessageService
The start method essentially initializes a lot of the consumer's services: choosing implementation classes for some of them, and starting scheduled task threads, for instance to clean up expired request caches. Finally, as the mQClientFactory component starts, it launches a PullMessageService, and all actual message pulling is handed over to it.
The so-called push mode is, under the hood, still implemented by the Consumer pulling messages.
//org.apache.rocketmq.client.impl.consumer.PullMessageService
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
Client-Side Load Balancing Summary
From the Producer send flow and the Consumer pull flow above, we can abstract a message-allocation management model for RocketMQ. This model is an important basis for performance tuning when we use RocketMQ.
Producer Load Balancing
When sending, the Producer by default round-robins over all MessageQueues of the target Topic, using an incrementing counter modulo the queue count, so that messages land evenly across the different queues. And since the MessageQueues are spread over different Brokers, the messages end up spread over different Brokers as well.
As we saw in the source earlier, when the Producer round-robins and a send to one Broker fails, it tries to avoid that same Broker on the next attempt. Beyond that, if your scenario can tolerate longer send latency, you can also call setSendLatencyFaultEnable(true) on the Producer; in environments where the network to parts of the Broker cluster is poor, this raises the odds of sending successfully. See the snippet below.
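Turning it on is a one-liner (a sketch; the group name is arbitrary):

DefaultMQProducer producer = new DefaultMQProducer("demo_group");
// Track per-broker send latency and temporarily back off from slow or failed brokers
producer.setSendLatencyFaultEnable(true);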
A producer can also supply a MessageQueueSelector when sending, and use it to steer messages onto a MessageQueue of its own choosing. This is how local (per-queue) message ordering is guaranteed, as sketched earlier under the Producer send flow.
Consumer Load Balancing
The Consumer also load-balances at the granularity of MessageQueues, in one of two modes: cluster mode and broadcast mode.
1. Cluster mode
In cluster consumption mode, each message only needs to be delivered to one instance of each Consumer Group subscribed to the topic. RocketMQ consumes by actively pulling, and each pull must explicitly name the message queue to pull from.
Whenever the number of instances changes, a rebalance across all instances is triggered, and the queues are divided evenly among the instances according to the queue count and the instance count.
On every allocation, the MessageQueues and consumer IDs are sorted first, and then one of the allocation algorithms is applied. There are six built-in algorithms, one per implementation class under AllocateMessageQueueStrategy, and you can set one directly on the consumer (see the sketch after this list). By default, the simplest one, average allocation, is used.
AllocateMachineRoomNearby: prefers pairing Consumers and Brokers in the same machine room.
This strategy takes a machineRoomResolver object to customize how the machine room of a Consumer or a Broker is derived, and it additionally needs a second strategy to allocate the Brokers and Consumers within the same machine room, usually just plain average allocation or round-robin.
Frankly this one feels like a white elephant; wouldn't a simple property naming the machine room be better?
The source contains test code for it: AllocateMachineRoomNearByTest.
In that example, the Broker's machine room is derived as messageQueue.getBrokerName().split("-")[0], and the Consumer's as clientID.split("-")[0].
For how the clientID is built, see the ClientConfig.buildMQClientId method. Per the test code, you would set the clientIP to something of the form IDC1-CID-0.
AllocateMessageQueueAveragely: average allocation; splits all MessageQueues evenly among the consumers.
AllocateMessageQueueAveragelyByCircle: round-robin allocation; deals MessageQueues to the consumers one at a time in turn.
AllocateMessageQueueByConfig: no allocation; a messageQueue list is specified directly. Similar to broadcast mode, where all queues are assigned outright.
AllocateMessageQueueByMachineRoom: allocates by the notion of a logical machine room, again with customized configuration for the BrokerName and consumerIdc.
AllocateMessageQueueConsistentHash: the source contains test code AllocateMessageQueueConsitentHashTest. This consistent-hashing strategy only needs a virtual-node count; it uses a hash-ring algorithm, and the virtual nodes make the hashed data spread more evenly around the ring.
The most commonly used are average allocation and round-robin. For example, average allocation distributes the queues like this:
Round-robin needs no arithmetic: each queue in turn simply goes to the next Consumer instance.
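Choosing a strategy is a direct setter on the consumer (a minimal sketch; the implementations live in org.apache.rocketmq.client.consumer.rebalance):

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_group");
// Default is AllocateMessageQueueAveragely; switch to round-robin allocation:
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());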
2. Broadcast mode
In broadcast mode, every message is delivered to all consumer instances subscribed to the Topic, so there is no message allocation to speak of. In implementation terms, when queues are assigned to Consumers, every Consumer simply gets all of the Queues.
The key to implementing broadcast mode is that the consumers' consumption offsets are no longer kept on the broker but on the client, with each client maintaining its own consumption offset.