概述
- RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 选用理由:
- 强调集群无单点,可扩展,任意一点高可用,水平可扩展。
- 海量消息堆积能力,消息堆积后,写入低延迟。
- 支持上万个队列
- 消息失败重试机制
- 消息可查询
- 开源社区活跃
- 成熟度(经过双十一考验)
关键概念
1 ) 主题与标签
- 主题 Tpoic:第一级消息类型,书的标题
- 标签 Tags:第二级消息类型,书的目录,可以基于 Tag 做消息过滤
例如:
- 主题:
- 订单交易
- 标签:
- 订单交易-创建
- 订单交易-付款
- 订单交易-完成
- 可见,标签用于细分主题
2 )发送与订阅群组
- 生产组:用于消息的发送
- 消费组:用于消息的订阅处理
- 生产组和消费组,方便扩缩机器,增减处理能力,集群组的名字,用于标记用途中的一员
- 每次只会随机的发给每个集群中的一员
RocketMQ 集群方式
- 推荐的几种 Broker 集群部署方式,这里的 Slave 不可写,但可读,类似于 Mysql 主备方式
1 ) 单个 Master
- 这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
2 ) 多 Master 模式
- 一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
- 优点
- 配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10时
- 即使机器宕机不可恢复情况下,由与 RAID10 磁盘非常可靠
- 消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢), 性能最高
- 缺点
- 单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅
- 消息实时性会受到受到影响
- 启动集群
- 先启动 NameServer
- 在机器 A,启动第一个 Master
- 在机器 B,启动第二个 Master
多 Master 多 Slave 模式,异步复制
- 每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级
- 优点
- 即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响
- 因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明
- 不需要人工干预,性能同多Master 模式几乎一样
- 缺点
- Master 宕机,磁盘损坏情况,会丢失少量消息
- Master 宕机,磁盘损坏情况,会丢失少量消息
- 启动集群
- 先启动 NameServer
- 在机器 A,启动第一个 Master
- 在机器 B,启动第二个 Master
- 在机器 C,启动第一个 Slave
- 在机器 D,启动第二个 Slave
多 Master 多 Slave 模式,同步双写
- 每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,主备都写
成功,向应用返回成功。 - 优点
- 数据与服务都无单点,Master 宕机情况下,消息无延迟
- 服务可用性与数据可用性都非常高
- 缺点
- 性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高
- 目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能
- 启动集群
- 先启动 NameServer
- 在机器 A,启动第一个 Master
- 在机器 B,启动第二个 Master
- 在机器 C,启动第一个 Slave
- 在机器 D,启动第二个 Slave
- 以上 Broker 与 Slave 配对是通过指定相同的 brokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大与 0 的数
- 另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分
RocketMQ 部署【Master-Slave 方式】
1 )服务器环境
序号 | IP | 用户名 | 密码 | 角色 | 模式 |
---|---|---|---|---|---|
1 | 192.168.11.128 | root | *** | nameServer1,brokerServer1 | Master1 |
2 | 192.168.11.129 | root | *** | nameServer2,brokerServer2 | Master2 |
2 ) Hosts 添加信息
vi /etc/hosts
IP | NAME |
---|---|
192.168.11.128 | rocketmq-nameserver1 |
192.168.11.128 | rocketmq-master1 |
192.168.11.129 | rocketmq-nameserver2 |
192.168.11.129 | rocketmq-master1-slave |
3 ) 上传解压【两台机器】
# 上传 apache-rocketmq.tar.gz 文件至/usr/local
# tar -zxvf apache-rocketmq.tar.gz -C /usr/local
# ln -s apache-rocketmq rocketmq 建立软链ll /usr/local
4 )创建存储路径【两台机器】
# mkdir /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store/commitlog
# mkdir /usr/local/rocketmq/store/consumequeue
# mkdir /usr/local/rocketmq/store/index
5 ) RocketMQ 配置文件【两台机器】
- vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties
- vim /usr/local/rocketmq/conf/2m-2s-async /broker-a-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
6 ) 修改日志配置文件【两台机器】
# mkdir -p /usr/local/rocketmq/logs
# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
7 ) 修改启动脚本参数【两台机器】
-
vim /usr/local/rocketmq/bin/runbroker.sh
#============================================================================== # 开发环境 JVM Configuration #============================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"
-
vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"
8 ) 启动 NameServer【两台机器】
# cd /usr/local/rocketmq/bin
# nohup sh mqnamesrv &
9 ) 启动 BrokerServer A【192.168.11.128】
# cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
10 ) 启动 BrokerServer B【192.168.11.129】
# cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a-s.properties >/dev/null 2>&1 &
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
11 ) RocketMQ Console
在 tomcat 中部署 rocketmq-console.war
12 ) 数据清理
# cd /usr/local/rocketmq/bin
# sh mqshutdown broker
# sh mqshutdown namesrv
# --等待停止
# rm -rf /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store/commitlog
# mkdir /usr/local/rocketmq/store/consumequeue
# mkdir /usr/local/rocketmq/store/index
# --按照上面步骤重启 NameServer 与 BrokerServer
使用注意事项
1 ) 消费重试机制
第几次重试 | 每次重试间隔时间 |
---|---|
1 | 10 秒 |
2 | 30 秒 |
3 | 1 分钟 |
4 | 2 分钟 |
5 | 3 分钟 |
6 | 4 分钟 |
7 | 5 分钟 |
8 | 6 分钟 |
9 | 7 分钟 |
10 | 8 分钟 |
11 | 9 分钟 |
12 | 10 分钟 |
13 | 20 分钟 |
14 | 30 分钟 |
15 | 1 小时 |
16 | 2 小时 |
2 )消费端做幂等处理
- RocketMQ 无法避免消息重复,如果业务对消费重复非常敏感,务必要在业务局面去重,
- 有以下几种去重方式:
- 将消息的唯一键,可以是 msgId
- 也可以是消息内容中的唯一标识字段,例如订单 Id等
- 建议最好使用消息内容中的唯一标识字段去重
多主多从模式
-
多主多从模式分为俩种方式,第一种为异步复制,第二种为同步双写
- 我们暂且考虑其中的一种情况
-
注意:RocketMQ 每一种集群环境配置会对应一个不同的目录
- 双主模式,文件夹配置为: conf/2m-noslave/
- 多主多从模式(异步复制),文件夹配置为: conf/2m-2s-async/
- 多主多从模式(同步双写),文件夹配置为: conf/2m-2s-sync/
-
集群规划如下:【四台机器】
IP NAME 192.168.11.121 rocketmq-nameserver1、rocketmq-master1 192.168.11.122 rocketmq-nameserver2、rocketmq-master2 192.168.11.123 rocketmq-nameserver3、rocketmq-master1-slave 192.168.11.124 rocketmq-nameserver4、rocketmq-master2-slave -
添加 hosts 信息 vi /etc/hosts,参考如上
-
解压上传【113、114 俩台机器】
# 上传 alibaba-rocketmq-3.2.6.tar.gz 文件至/usr/local # tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local # mv alibaba-rocketmq alibaba-rocketmq-3.2.6 # ln -s alibaba-rocketmq-3.2.6 rocketmqll /usr/local
-
创建存储目录【113、114 俩台机器】
# mkdir /usr/local/rocketmq/store # mkdir /usr/local/rocketmq/store/commitlog # mkdir /usr/local/rocketmq/store/consumequeue # mkdir /usr/local/rocketmq/store/index
-
RocketMQ 配置文件【四台机器】
# vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties # vim /usr/local/rocketmq/conf/2m-2s-async/broker-b.properties # vim /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties # vim /usr/local/rocketmq/conf/2m-2s-async/broker-b-s.properties
broker-a.properties、broker-b.properties 配置如下:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmqnameserver3:9876;rocketmq-nameserver4:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
broker-a-s.properties、broker-b-s.properties 配置如下:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样,与 Master 通过 brokerName 来配对
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmqnameserver3:9876;rocketmq-nameserver4:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
-
修改日志配置文件【113、114 俩台机器】
# mkdir -p /usr/local/rocketmq/logs # cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
-
修改脚本启动参数【113、114 俩台机器】
-
vim /usr/local/rocketmq/bin/runbroker.sh
#============================================================================== # 开发环境 JVM Configuration #============================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"
-
vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"
-
-
启动 NameServer【四台机器】
# cd /usr/local/rocketmq/bin # nohup sh mqnamesrv &
-
启动 Master1:BrokerServerA【192.168.11.121】
# cd /usr/local/rocketmq/bin # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 & # netstat -ntlp # jps # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
-
启动 Master2:BrokerServerB【192.168.11.122】
# cd /usr/local/rocketmq/bin # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b.properties >/dev/null 2>&1 & # netstat -ntlp # jps # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
-
启动 Master1-Slave:BrokerServerC【192.168.11.123】
# cd /usr/local/rocketmq/bin # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 & # netstat -ntlp # jps # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
-
启动 Master2-Slave:BrokerServerD【192.168.11.124】
# cd /usr/local/rocketmq/bin # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b-s.properties >/dev/null 2>&1 & # netstat -ntlp # jps # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
-
服务停止(首先关闭 4 个 BrokerServer,再关闭 4 个 NameServer):
# cd /usr/local/rocketmq/bin # sh mqshutdown broker # sh mqshutdown namesrv