Java项目——设计一个消息队列
- ⼀. 消息队列背景知识
- ⼆. 需求分析
- 核⼼概念
- 核⼼ API
- 交换机类型 (Exchange Type)
- 持久化
- ⽹络通信
- 消息应答
- 三. 模块划分
- 服务器模块
- 客户端模块
- 公共模块
⼀. 消息队列背景知识
曾经我们学习过 阻塞队列 (BlockingQueue) , 我们说, 阻塞队列最⼤的⽤途, 就是⽤来实现 ⽣产者消费
者模型.
⽣产者消费者模型, 存在诸多好处, 是后端开发的常⽤编程⽅式.
- 解耦合
- 削峰填⾕
在实际的后端开发中, 尤其是分布式系统⾥, 跨主机之间使⽤⽣产者消费者模型, 也是⾮常普遍的需求.
因此, 我们通常会把阻塞队列, 封装成⼀个独⽴的服务器程序, 并且赋予其更丰富的功能.
这样的程序我们就称为 消息队列 (Message Queue, MQ)
⼆. 需求分析
核⼼概念
- ⽣产者 (Producer)
- 消费者 (Consumer)
- 中间⼈ (Broker)
- 发布 (Publish)
- 订阅 (Subscribe)
⼀个⽣产者, ⼀个消费者
N 个⽣产者, N 个消费者
其中, Broker 是最核⼼的部分. 负责消息的存储和转发.
在 Broker 中, ⼜存在以下概念.
- 虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 BrokerServer 上可
以存在多个 VirtualHost. - 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发
给不同的 Queue. - 队列 (Queue): 真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上读取消息.
- 绑定 (Binding): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 “多对多” 关
系. 使⽤⼀个关联表就可以把这两个概念联系起来. - 消息 (Message): 传递的内容.
所谓的 Exchange 和 Queue 可以理解成 “多对多” 关系, 和数据库中的 “多对多” ⼀样. 意思是:
⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息).
⼀个 Queue 也可以被多个 Exchange 绑定 (⼀个 Queue 中的消息可以来⾃于多个 Exchange).
这些概念, 既需要在内存中存储, 也需要在硬盘上存储.
- 内存存储: ⽅便使⽤.
- 硬盘存储: 重启数据不丢失
核⼼ API
对于 Broker 来说, 要实现以下核⼼ API. 通过这些 API 来实现消息队列的基本功能.
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
另⼀⽅⾯, Producer 和 Consumer 则通过⽹络的⽅式, 远程调⽤这些 API, 实现 ⽣产者消费者模型.
交换机类型 (Exchange Type)
对于 RabbitMQ 来说, 主要⽀持四种交换机类型.
• Direct
• Fanout
• Topic
• Header
其中 Header 这种⽅式⽐较复杂, ⽐较少⻅. 常⽤的是前三种交换机类型. 咱们此处也主要实现这三种.
• Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.
• Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.
• Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为
routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.
这三种操作就像给 qq 群发红包.
• Direct 是发⼀个专属红包, 只有指定的⼈能领.
• Fanout 是使⽤了魔法, 发⼀个 10 块钱红包, 群⾥的每个⼈都能领 10 块钱.
• Topic 是发⼀个画图红包, 发 10 块钱红包, 同时出个题, 得画的像的⼈, 才能领. 也是每个领到的⼈
都能领 10 块钱
持久化
Exchange, Queue, Binding, Message 都有持久化需求.
当程序重启 / 主机重启, 保证上述内容不丢失
⽹络通信
⽣产者和消费者都是客⼾端程序, broker 则是作为服务器. 通过⽹络进⾏通信.
在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
可以看到, 在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.
Connection 对应⼀个 TCP 连接.
Channel 则是 Connection 中的逻辑通道.
⼀个 Connection 中可以包含多个 Channel.
Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰.
这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接
Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥具体的线缆
消息应答
被消费的消息, 需要进⾏应答.
应答模式分成两种.
• ⾃动应答: 消费者只要消费了消息, 就算应答完毕了. Broker 直接删除这个消息.
• ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息.
⼿动应答的⽬的, 是为了保证消息确实被消费者处理成功了. 在⼀些对于数据可靠性要求⾼的场景, ⽐
较常⻅.