文章目录
- 一、RabbitMQ简介
- 1.1、概述
- 1.2、特性
- 二、RabbitMQ原理架构
- 三、RabbitMQ应用场景
- 3.1 简单模式
- 3.2 工作模式
- 3.3 发布订阅
- 3.4 路由模式
- 3.5 主题订阅模式
- 四、同类中间件对比
- 五、RabbitMQ部署
- 5.1 单机部署
- 5.1.1 安装erlang
- 5.1.2 安装rabbitmq
- 5.2 集群部署(镜像模式)
- 5.2.1 配置节点
- 5.2.2 创建集群
- 5.3 K8s部署
- 5.3.1 创建yaml文件
- 5.3.2 申请资源清单(启动各个yaml文件)
- 5.3.3 访问Rabbirmq页面
- 六、常用命令
一、RabbitMQ简介
1.1、概述
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
1.2、特性
**可伸缩性:**集群服务
**消息持久化:**从内存持久化消息到硬盘,再从硬盘加载到内存
解决什么问题:
- 进程间的通讯。程序间解耦(耦合)。
- web高并发。来不及进行同步处理。”too many connections”(同步 异步切换)
二、RabbitMQ原理架构
中间的Broker表示RabbitMQ服务,每个Broker里面至少有一个Virtual host虚拟主机,每个虚拟主机中有自己的Exchange交换机、Queue队列以及Exchange交换机与Queue队列之间的绑定关系Binding。producer(生产者)和consumer(消费者)通过与Broker建立Connection来保持连接,然后在Connection的基础上建立若干Channel信道,用来发送与接收消息
名词解释:
- exchange:它指定消息按什么规则,路由到哪个列队 给消息分类
- queue:消息载体,每个消息都会被投递到一个或多个列队里面。
- binding:exchange和queue按照规则绑定
- connections:producer和consumer用TCPconnections链接到rabbitMQ
- channels:TCP中的虚拟链接。(不损耗端口号)
- Routing key 路由关键字(消息路由)
- Send Message 发送消息
- Receive Message 收消息
- Broker 缓存代理
三、RabbitMQ应用场景
3.1 简单模式
做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B
应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人、聊天等
3.2 工作模式
- 一条消息只会被一个消费者接收;
- rabbit采用轮询的方式将消息是平均发送给消费者的;
- 消费者在处理完某条消息后,才会收到下一条消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
3.3 发布订阅
- 每个消费者监听自己的队列。
- 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信等多种方法 。比如邮件群发,群聊天,广告等。
3.4 路由模式
- 每个消费者监听自己的队列,并且设置routingkey。
- 生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
应用场景:如在商品库存中增加了1台iphone13,iphone13促销活动消费者指定routing key为iphone13,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息
3.5 主题订阅模式
根据主题(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词。
应用场景:同上,iphone促销活动可以接收主题为iphone的消息,如iphone12、iphone13等
四、同类中间件对比
特性 | ActiveMQ | RabbitMQ | RockerMQ | Kafka |
---|---|---|---|---|
producer-comsumer | (生产消费) | 支持 | 支持 | 支持 |
pubkish-subscribe(发布订阅) | 支持 | 支持 | 支持 | 支持 |
request-reply(请求应答) | 支持 | 支持 | ||
API完整性 | 高 | 高 | 高 | 高 |
多语言支持 | 支持, JAVA优先 | 无关 | JAVA | 支持, JAVA优先 |
单机吞吐量 | 万级 | 万级 | 万级 | 十万级 |
消息延时 | 毫秒 | 微秒 | 毫秒 | 毫秒 |
---|---|---|---|---|
可用性 | 高(主从) | 高(主从) | 高(分布式) | 高(分布式) |
消息丢失 | 低 | 低 | 理论上不会丢失 | 理论上不会丢失 |
消息重复 | 可控制 | 理论上会有重复 | ||
文档完整性 | 高 | 高 | 高 | 高 |
提供快速入门 | 有 | 有 | 有 | 有 |
社区活跃度 | 高 | 高 | 中 | 高 |
商业支持 | 无 | 无 | 阿里云 | 无 |
成熟度 | 成熟 | 成熟 | 比较成熟 | 成熟日志领域 |
特点 | 功能齐全,被大量 开源项目使用 | 由于Erlang语言开发,性能好 | 各环节分布式设计, 多种消费模式 | 可靠性、可扩展、 持久性、性能高 |
协议 | OPENWITE、STOP、 REST、XMPP、AMQP | AMQP | 自定义(社区提供JMS) | PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL |
持久化 | 内存、文件、数据库 | 内存、文件 | 磁盘文件 | 磁盘、文件 |
事务 | 支持 | 支持 | 支持 | 支持 |
负载均衡 | 支持 | 支持 | 支持 | 支持 |
管理界面 | 一般 | 好 | 有web console实现 | 好 |
优点 | 成熟的产品, 已经在很多项目得到应用。 有较多文档,各种协议支持较好,多重语言的成熟客户端 | 由于Erlang语言开发,mq性能好,管理页面丰富,多种语言支持,amqp客户端可用 | 模型简单,接口易用,阿里大规模应用性能好,支持多种消费,开发都较活跃 | 性能卓越、可用性非常高、 日志领域比较成熟、功能简单、web界面友好 |
缺点 | 会莫名丢失消息, 目前社区对重心版本不是太活跃,5.x维护较少,不适合大规模队列 | 由于Erlang语言开发,难度较大,不支持动态扩容 | 产品文档匮乏,没有在mq核心去实现JMS等接口,对已有系统不能完美兼容 | 消息失败不支持重试、消息顺序可能会导致消息乱序、社区更新较慢 |
五、RabbitMQ部署
5.1 单机部署
5.1.1 安装erlang
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel ncurses-devel
wget https://github.com/erlang/otp/releases/download/OTP-24.0/otp_src_24.0.tar.gz
tar -xzvf /otp_src_24.0.tar.gz
cd /otp_src_24.0
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe
make && make install
5.1.2 安装rabbitmq
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.5/rabbitmq-server-3.10.5-1.el8.noarch.rpm
rpm -ivh --nodeps rabbitmq-server-3.10.5-1.el8.noarch.rpm
添加开机启动RabbitMQ服务
chkconfig rabbitmq-server on
启动RabbitMQ服务
rabbitmq-server start
后台启动RabbitMQ服务
rabbitmq-server -detached
停止RabbitMQ服务
rabbitmqctl stop
查看RabbitMQ服务状态
rabbitmqctl status
添加帐号:name 密码:
passwd rabbitmqctl add_user name passwd
赋予其administrator角色
rabbitmqctl set_user_tags name administrator
删除角色
rabbitmqctl delete_user Username
设置权限
rabbitmqctl set_permissions -p / name ".*" ".*" ".*"
查看用户的权限
rabbitmqctl list_user_permissions username
启动成功后,rabbitMQ的相关文件所在位置
- 相关命令 :/usr/lib/rabbitmq/bin/
- 相关的日志:/var/log/rabbitmq/
- 相关的配置 : /etc/rabbitmq/
- 设置的用户权限等元数据信息:/var/lib/rabbitmq/mnesia/
http://ip:15672/ 尝试访问rabbitmq的web页面
可能会遇到报错情况,Node的错误,大概的意思就是说,hosts中的地址和node中的地址不一样,不匹配导致的。修改hosts文件中的本机的IP地址,hosts中的本机的计算机名称和Node的节点的名称保持一致即可。
5.2 集群部署(镜像模式)
普通模式:普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行消息通信。
此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。
当我们消费消息的时候,如果连接到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的位置,然后访问 Queue 所在的实例,拉取数据过来发送给消费者。
这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了持久化,那么等 RabbitMQ 实例恢复后,就可以继续访问了;如果消息队列没做持久化,那么消息就丢了。
镜像模式:它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上,而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时候都会自动把数据同步到多台实例上去,这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。
5.2.1 配置节点
- 配置两台机器的hosts
vim /etc/hosts修改后使用 - 停止服务
rabbitmqctl stop
- 设置erlang cookie
这里将第一台的该文件复制到 其他节点,由于这个文件权限是400,为方便传输,先修改权限,所以需要先修改该文件权限为 777。
集群各节点的cookie必须保持一致,否则无法通信。
erlang是通过主机名来连接服务,必须保证各个主机名之间可以ping通。可以通过编辑/etc/hosts来手工添加主机名和IP对应关系。如果主机名ping不通,rabbitmq服务启动会失败。
注意.erlang.cookie的目录,也有可能在/var/lib/rabbitmq/.erlang.cookie
将权限和所属用户/组修改回来
node2:
chmod 400 /root/.erlang.cookie
- 启动个节点
rabbitmq-server –detached
查看集群状态
[root@test-1]# rabbitmqctl cluster_status
Cluster status of node rabbit@mq1 ...
Basics
Cluster name: rabbit@mq1.example.local #集群名称
Disk Nodes #磁盘节点
rabbit@mq1
Running Nodes #运作中节点
rabbit@mq1
Versions #版本
rabbit@mq1:
RabbitMQ 3.9.0 on Erlang 24.0.4
5.2.2 创建集群
rabbitmqctl stop_app #停止 app 服务
rabbitmqctl reset #清空元数据
rabbitmqctl join_cluster rabbit@mq1 --ram #将rabbitmq-server2添加到集群当中,并成为内存节点,不加--ram默认是磁盘节点
rabbitmqctl start_app #启动 app 服务
rabbitmqctl stop_app #停止 app 服务
rabbitmqctl reset #清空元数据
rabbitmqctl join_cluster rabbit@mq1 --ram #
将rabbitmq-server2添加到集群当中,并成为内存节点,不加--ram默认是磁盘节点
rabbitmqctl start_app #启动 app 服务
将集群设置为镜像模式(只要在其中一台节点执行以下命令即可) rabbitmqctl set_policy ha-all "#" '{"ha-mode":"all"}'
http://ip:15672/尝试访问rabbitmq的web页面
5.3 K8s部署
5.3.1 创建yaml文件
包括(comfigmap、secret、rbac、sts、svc)五个yaml文件
Comfigmap:
apiVersion: v1
kind: ConfigMap
metadata:name: rabbitmq-confignamespace: test
data:enabled_plugins: |[rabbitmq_management,rabbitmq_peer_discovery_k8s].
#启用插件rabbitmq_management和rabbitmq_peer_discovery_k8srabbitmq.conf: |cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8scluster_formation.k8s.host = kubernetes.default.svc.cluster.localcluster_formation.k8s.address_type = hostname################################################## rabbit-mq is rabbitmq-cluster's namespace##################################################cluster_formation.k8s.hostname_suffix = .rabbitmq-headless.rabbit-mq.svc.cluster.localcluster_formation.node_cleanup.interval = 30cluster_formation.node_cleanup.only_log_warning = truecluster_partition_handling = autohealqueue_master_locator=min-masterscluster_formation.randomized_startup_delay_range.max = 2vm_memory_high_watermark.absolute = 1GBdisk_free_limit.absolute = 2GBloopback_users.guest = false
Secret.yaml:
用来存储rabbitmq的用户名、密码及erlang.cookie。
erlang创建步骤:erlang是集群之间互访的秘钥
首先需要生成一个erlang.cookie的文件: echo $(openssl rand -base64 32) > erlang.cookie
然后再生成base64的值 如:echo -n ‘v/sWCz4uKETUvneRyJVn87Jg15si2eGaWg54Yvefhrk=’ |base64
apiVersion: v1
kind: Secret
metadata:name: devsecretnamespace: test
type: Opaque
data:rabbitDefaulUser: "YWRtaW4=" # echo -n 'admin' |base64rabbitDefaultPass: "YWRtaW4="erlang.cookie: "di96VHZ4VmhOY1Uxc3dzTG4zOHdyMmk2S1IrTG82L2xqUEdTTFUwYmdwVDRBPQ=="
Rbac.yaml:
apiVersion: v1
kind: ServiceAccount #集群访问apiserver的凭证
metadata:name: rabbitmqnamespace: test
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:name: endpoint-readernamespace: test
rules:
- apiGroups: [""]resources: ["endpoints"]verbs: ["get"]
---
kind: RoleBinding #将角色绑定
apiVersion: rbac.authorization.k8s.io/v1
metadata:name: endpoint-readernamespace: test
subjects:
- kind: ServiceAccountname: rabbitmq
roleRef:apiGroup: rbac.authorization.k8s.iokind: Rolename: endpoint-reader
Statefulset.yaml:
要提前创建好挂载目录
apiVersion: apps/v1
kind: StatefulSet
metadata:name: rabbitmqnamespace: test
spec:serviceName: rabbitmq-headlessselector:matchLabels:app: rabbitmq #在apps/v1中,需与 .spec.template.metadata.label 相同,用于hostname传播访问podreplicas: 3 #副本数3template:metadata:labels:app: rabbitmqspec:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- rabbitmqtopologyKey: "kubernetes.io/hostname"serviceAccountName: rabbitmqterminationGracePeriodSeconds: 10containers:- name: rabbitmqimage: rabbitmq:latestimagePullPolicy: Neverresources:limits:cpu: 1memory: 500Mirequests:cpu: 1memory: 500MivolumeMounts:- name: config-volumemountPath: /etc/rabbitmq- name: rabbitmq-datamountPath: /var/lib/rabbitmq/mnesiaports:- name: httpprotocol: TCPcontainerPort: 15672- name: amqpprotocol: TCPcontainerPort: 5672livenessProbe:exec:command: ["rabbitmq-diagnostics", "status"]initialDelaySeconds: 60periodSeconds: 60timeoutSeconds: 5readinessProbe:exec:command: ["rabbitmq-diagnostics", "status"]initialDelaySeconds: 20periodSeconds: 60timeoutSeconds: 5env:- name: RABBITMQ_DEFAULT_USERvalueFrom:secretKeyRef:key: rabbitDefaulUsername: devsecret #登陆用户名和密码都存储在一个secret对象中- name: RABBITMQ_DEFAULT_PASSvalueFrom:secretKeyRef:key: rabbitDefaultPassname: devsecret- name: RABBITMQ_ERLANG_COOKIEvalueFrom:secretKeyRef:name: devsecretkey: erlang.cookie- name: HOSTNAMEvalueFrom:fieldRef:fieldPath: metadata.name- name: MY_POD_NAMESPACEvalueFrom:fieldRef:fieldPath: metadata.namespace- name: RABBITMQ_USE_LONGNAMEvalue: "true"- name: K8S_SERVICE_NAMEvalue: "rabbitmq-headless"- name: RABBITMQ_NODENAMEvalue: rabbit@$(HOSTNAME).$(K8S_SERVICE_NAME).$(MY_POD_NAMESPACE).svc.cluster.local- name: K8S_HOSTNAME_SUFFIXvalue: .$(K8S_SERVICE_NAME).$(MY_POD_NAMESPACE).svc.cluster.localvolumes:- name: config-volumeconfigMap:name: rabbitmq-configitems:- key: rabbitmq.confpath: rabbitmq.conf- key: enabled_pluginspath: enabled_plugins- name: rabbitmq-datahostPath:path: /root/rabbitmq/datatype: Directory
Service.yaml
kind: Service
apiVersion: v1
metadata:name: rabbitmq-headlessnamespace: test
spec:clusterIP: NonepublishNotReadyAddresses: trueports:- name: amqpport: 5672- name: httpport: 15672selector:app: rabbitmq---
kind: Service
apiVersion: v1
metadata:namespace: testname: rabbitmq-service
spec:ports:- name: httpprotocol: TCPport: 15672nodePort: 30672 #管理web界面- name: amqpprotocol: TCPport: 5672targetPort: 5672nodePort: 30671selector:app: rabbitmqtype: NodePort
5.3.2 申请资源清单(启动各个yaml文件)
kubectl apply -f rabbitmq-configmap.yaml
kubectl apply -f rabbitmq-secret.yaml
kubectl apply -f rabbitmq-rbac.yaml
kubectl apply -f rabbitmq-sts.yaml
kubectl apply -f rabbitmq-svc.yaml
5.3.3 访问Rabbirmq页面
http://ip:30672/尝试访问rabbitmq的web页面(需要手动创建用户并授权)
六、常用命令
- 用户管理
A. 查看用户列表:rabbitmqctl list_users;
B. 添加用户:rabbitmqctl add_user ;
C. 修改密码:rabbitmqctl change_password ;
D. 删除用户:rabbitmqctl delete_user ;
E. 设置用户角色:
rabbitmqctl set_user_tags <tag1,tag2>,角色有
management:用户可以访问management管理插件;
administrator:所有权限;
monitoring:用户可以访问management管理插件,查看所有连接、通道以及与节点相关的信息;
policymaker:用户可以访问management管理插件,并管理他们有权访问的vhost的策略和参数; - 节点与应用管理
A. 启动rabbitmq应用程序:rabbitmqctl start_app;
B. 关闭rabbitmq应用程序,但Erlang VM保持运行:rabbitmqctl stop_app;
C. 关闭所有应用和节点:rabbitmqctl stop;
D. 将Rabbitmq节点返回到原始状态,包括删除数据:rabbitmqctl reset; - 集群节点
A. 查看集群状态:rabbitmqctl cluster_status;
B. 添加节点:rabbitmqctl join_cluster {–ram|–disk} rabbit@node;
C. 移除节点:rabbitmqctl forget_cluster_node rabbit@node,注意先关闭应用程序rabbitmqctl stop_app;
D. 节点健康检查:rabbitmqctl node_health_check; - 插件管理
A. 显示所有插件:rabbitmq-plugins list;
B. 启用指定的插件:rabbitmq-plugins enable rabbitmq_management rabbitmq_mqtt rabbitmq_prometheus; - 查看有效配置
A. rabbitmqctl environment