目录
一、虚拟主机设计
1.1、需求分析
1.1.1、核心 API
1.1.2、虚拟主机的是用来干什么的?
1.1.3、如何表示 交换机和虚拟主机 之间的从属关系?
二、实现 VirtualHost 类
2.1、属性
2.2、锁对象
2.3、公开实例
2.4、虚拟主机构造方法
2.5、交换机相关操作
2.5、队列相关操作
2.6、绑定相关操作
2.7、消息相关操作
2.8、补充线程安全问题
一、虚拟主机设计
1.1、需求分析
1.1.1、核心 API
虚拟主机的概念类似于 MySQL 的 database ,把 交换机、队列、绑定、消息...... 进行 逻辑 上的隔离,因此不仅仅需要管理数据,还需要提供一些核心 API 供上层代码调用.
核心 API(就是把之前写内存中的数据管理和硬盘的数据管理,串起来):
- 创建交换机 exchangeDeclare
- 删除交换机 exchagneDelete
- 创建队列 queueDeclare
- 删除队列 queueDelete
- 创建绑定 queueBind
- 删除绑定 queueUnbind
- 发送消息 basicPublish
- 订阅消息 basicCosume
- 确认消息 basicAck
1.1.2、虚拟主机的是用来干什么的?
虚拟主机的目的,是为了保证隔离,不同虚拟主机之间的内容不要有影响.
例如,虚拟主机1 中创建了一个 exchange,叫做:“testExchange”,虚拟主机2 中,也创建了一个 exchange,叫做 “testExchange” ,他两虽然名字相同,但却是不同空间中的 exchange。
从调用者的角度来讲,虽然这两个创建 exchange 时,起的名字叫做 testExchange,但是在虚拟主机内部会自动处理,加上前缀,“virtualHost1testExchange”,“virtualHost2testExchange”。
按照这个方式,就可以区分开不同交换机,进一步区分不同队列(一个交换机对应多个队列),进一步的,绑定也就隔离开了(绑定是和交换机与队列相关的),再进一步消息是和队列强相关的,消息自然也就区分开了.
1.1.3、如何表示 交换机和虚拟主机 之间的从属关系?
方案一:参考数据库的设计,“一对多” 方案,比如给交换机表,添加个属性,虚拟主机id/name...
方案二(RabbitMQ 采取的策略):重新约定,交换机的名字 = 虚拟主机的名字 + 交换机的名字
方案三: 更优雅的办法,就是给每个虚拟主机,分配一组不同的数据库和文件,但是有点麻烦.
这里我们按照 RabbitMQ 采取的策略~~
二、实现 VirtualHost 类
2.1、属性
private String virtualHostName;private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();private DiskDataCenter diskDataCenter = new DiskDataCenter();private Router router;
Router 这个类就描述了 routingKey 和 bindingKey 是否合法,以及消息与队列之间的匹配规则.
2.2、锁对象
用来处理 交换机操作、队列操作、绑定操作、消息操作 中的线程安全问题
例如:针对多线程操作“增加交换机”,以及“边增加边删除”进行加锁操作(多线程同时删除其实没什么事,因为都是 sql 的 delete 进行删除,即使删除多次,也没有副作用)
//交换机锁对象private final Object exchangeLocker = new Object(); //final 修饰是为了防止程序员修改引用,导致不是一个锁对象(其实可有可无,自己注意就 ok)//队列锁对象private final Object queueLocker = new Object();
2.3、公开实例
让上层代码调用时可以分别拿到 虚拟主机名、内存数据处理中心、硬盘数据处理中心.
public String getVirtualHostName() {return virtualHostName;}public MemoryDataCenter getMemoryDataCenter() {return memoryDataCenter;}public DiskDataCenter getDiskDataCenter() {
2.4、虚拟主机构造方法
主要针对主机名,硬盘、内存数据处理中心进行初始化操作.
public VirtualHost(String name) {this.virtualHostName = name;//对于 MemoryDataCenter 来说,不需要进行初始化工作,只需要 new 出来即可//但是,对于 DiskDataCenter 来说,需要进行初始化操作,建库建表和初始数据的设定diskDataCenter.init();//另外还需要针对硬盘初始化的数据,恢复到内存中try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();System.out.println("[VirtualHost] 恢复内存数据失败!");}}
2.5、交换机相关操作
创建交换机:从内存中获取对应交换机,如果不存在就创建,如果存在,就直接返回
删除交换机:先检查交换机是否存在,不存在抛出异常,存在就删除
Ps:返回值是 boolean 类型,true 表示成功,false 表示失败
/*** 创建交换机* @param exchangeName* @param exchangeType* @param durable* @param autoDelete* @param arguments* @return*/public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) {//1.把交换机加上虚拟主机的名字作为前缀(虚拟主机的隔离效果)exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {//2.通过内存查询交换机是否存在Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange != null) {//已经存在就直接返回System.out.println("[VirtualHost] 交换机已经存在!exchangeName=" + exchangeName);return true;}//3.不存在就创建一个exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);//4.内存中是一定要创建的,硬盘中是否存储,就要传来的参数 durable 是否为 true 了(持久化)if(durable) {diskDataCenter.insertExchange(exchange);}memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建成功!exchangeName=" + exchangeName);//上述逻辑中,先写硬盘,后写内存,就是因为硬盘更容易写失败,如果硬盘写失败了,内存就不用写了//但要是先写内存,一旦内存写成功了,硬盘写失败了,还需要把内存中的数据给删掉,比较麻烦}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机创建失败!exchangeName=" + exchangeName);e.printStackTrace();return false;}}/*** 删除交换机* @param exchangeName* @return*/public boolean exchangeDelete(String exchangeName) {//1.虚拟主机前缀exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {//2.检查当前内存中是否存在该交换机Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange == null) {throw new MqException("[VirtualHost] 该交换机不存在无法删除!exchangeName=" + exchangeName);}//3.如果持久化到硬盘上,就把硬盘的先删除了if(exchange.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}//4.存在就删除memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功!exchangeName=" + exchangeName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机删除失败!exchangeName=" + exchangeName);e.printStackTrace();return false;}}
2.5、队列相关操作
这里的操作逻辑和交换机差不多~~
/*** 创建队列* @param queueName* @param durable* @param exclusive* @param autoDelete* @param arguments* @return*/public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) {//1.虚拟主机前缀queueName = virtualHostName + queueName;try {synchronized (queueLocker) {//2.先检查内存是否存在此队列MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue != null) {System.out.println("[VirtualHost] 当前队列已存在!queueName=" + queueName);return true;}//3.不存在则创建并赋值queue = new MSGQueue();queue.setName(queueName);queue.setExclusive(exclusive);queue.setDurable(durable);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);//4.如果设置了持久化,就先在硬盘上保存一份if(durable) {diskDataCenter.insertQueue(queue);}//5.写内存memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功!queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 队列创建失败!queueName=" + queueName);e.printStackTrace();return false;}}/*** 队列删除* @param queueName* @return*/public boolean queueDelete(String queueName) {//1.虚拟主机前缀queueName = virtualHostName + queueName;try {synchronized (queueLocker) {//2.判断队列是否存在,若不存在则抛出异常MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null) {throw new MqException("[VirtualHost] 队列不存在,删除失败!queueName=" + queueName);}//3.存在则判断是否持久化,若持久化到硬盘上,则先删硬盘if(queue.isDurable()) {diskDataCenter.deleteQueue(queueName);}//4.删内存memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost] 队列删除成功!queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 队列删除失败!queueName=" + queueName);e.printStackTrace();return false;}}
2.6、绑定相关操作
创建绑定:先判断内存中是否存在这种绑定,如果存在则创建失败,如果不存在就需要看交换机和队列是否都同时存在,存在才能创建成功
删除绑定::先判断内存中是否存在这种绑定,不存在抛出异常,如果存在,不用判定交换机和队列是否存在(反而复杂了步骤),也不用判定硬盘上有没有此绑定,直接从内存和硬盘上删除就可,因为即使硬盘中没有数据,也不会有什么副作用(本质是 MyBatis 底层调用 delete 语句进行删除).
/*** 创建队列交换机绑定关系* @param queueName* @param exchangeName* @param bindingKey* @return*/public boolean queueBind(String queueName, String exchangeName, String bindingKey) {//1.虚拟主机前缀queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {//2.检测内存中是否存在绑定Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName);if(existsBinding != null) {throw new MqException("[VirtualHost] 绑定已存在,创建失败!exchangeName=" + exchangeName +"queueName=" + queueName);}//3.不存在,先检验 bindingKey 是否合法if(!router.checkBindingKey(bindingKey)) {throw new MqException("[VirtualHost] bindingKey 非法!bingdingKey=" + bindingKey);}//4.检验队列和交换机是否存在,不存在抛出异常Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if(existsExchange == null) {throw new MqException("[VirtualHost] 交换机不存在!exchangeName=" + exchangeName);}MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);if(existsQueue == null) {throw new MqException("[VirtualHost] 队列不存在!queueName=" + queueName);}//5.创建绑定Binding binding = new Binding();binding.setBindingKey(bindingKey);binding.setQueueName(queueName);binding.setExchangeName(exchangeName);//6.硬盘中创建绑定的前提是队列和交换机都是持久化的,否则绑定在硬盘中无意义if(existsQueue.isDurable() && existsExchange.isDurable()) {diskDataCenter.insertBinding(binding);}//7.写内存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 绑定创建成功!exchangeName=" + exchangeName +"queueName=" + queueName);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 绑定创建失败!exchangeName=" + exchangeName +"queueName=" + queueName);e.printStackTrace();return false;}}/*** 删除绑定关系* @param queueName* @param exchangeName* @return*/public boolean queueUnBind(String queueName, String exchangeName) {//1.虚拟主机前缀queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {//这里的加锁顺序一定要和上面的两次加锁保持一致(防止死锁)synchronized (exchangeLocker) {synchronized (queueLocker) {//2.检查绑定是否存在,不存在则抛出异常Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if(binding == null) {throw new MqException("[VirtualHost] 绑定不存在, 删除失败!binding=" + binding);}//3.直接删除硬盘数据,不用判断是否存在,因为即使不存在绑定,直接删也不会有什么副作用,因为本质就是调用 sql 删除diskDataCenter.deleteBinding(binding);//4.删内存memoryDataCenter.deleteBinding(binding);System.out.println("[VirtualHost] 绑定删除成功!queueName=" + queueName +", exchangeName=" + exchangeName);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 绑定删除失败!queueName=" + queueName +", exchangeName=" + exchangeName);e.printStackTrace();return false;}}
2.7、消息相关操作
发布消息:本质就是发送消息到指定的 交换机/队列 中,首先判定 routingKey 是否合法,然后再去获取交换机,再根据不同交换机的类型进行不同的处理(不同交换机给队列发送消息的方式不同),最后调用发送消息方法
放送消息:发送消息本质上就是将消息写入 内存/硬盘 上,对于硬盘上是否要写入,就要看消息是否支持持久化(deliverMode 为 1 表示不持久化,deliverMode 为 2 表示要持久化),最后还需要补充一个逻辑,通知消费者可以消费消息了(这里先不补充了,下一章会讲到).
Ps:这里对于直接交换机,约定以 routingKey 作为队列名称,直接将消息发送给指定的队列,也就可以直接无视绑定关系了(这是实际开发中最常用的场景,因此这里这样设定)
/*** 发送消息到指定的 交换机/队列 中* @param exchangeName* @return*/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {//1.虚拟主机前缀exchangeName = virtualHostName + exchangeName;try {//2.判定 routingKey 是否合法if(!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法!routingKey=" + routingKey);}//3.获取交换机Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange == null) {throw new MqException("[VirtualHost] 交换机不存在,消息发送失败!exchangeName=" + exchangeName);}//4.判定交换机类型if(exchange.getType() == ExchangeType.DIRECT) {//直接交换机转发消息,以 routingKey 作为队列名字//直接把消息写入指定队列,也就可以无视绑定关系(这是实际开发环境中最常用的,因此这里这样设定)String queueName = virtualHostName + routingKey;//5.构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);//6.查找该队列名对应的对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null) {throw new MqException("[VirtualHost] 队列不存在!queueName=" + queueName);}//7.发送消息sendMessage(queue, message);} else {// 按照 fanout 和 topic 的方式转发//5.找到该交换机的所有绑定,并遍历绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for(Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {// 1) 获取到绑定对象,判定对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if(queue == null) {//此处不抛异常了,因为又多个这样的队列//希望不要因为一个队列匹配失败,影响到其他队列System.out.println("[VirtualHost] basicPublish 发送消息时,发现队列不存在!queueName=" + binding.getQueueName());}// 2) 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判定这个消息是否能转发给该队列// 如果是 fanout。所有绑定的队列都要转发// 如果是 topic,还需要判定下,bindingKey 和 routingKey 是不是匹配if(!router.route(exchange.getType(), binding, message)) {continue;}sendMessage(queue, message);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!exchangeName=" + exchangeName +", routingKey=" + routingKey);e.printStackTrace();return false;}}/*** 发送消息* 实际上就是把消息写道 硬盘 和 内存 上* @param queue* @param message*/private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {//判断当前消息是否要进行持久化//deliverMode 为 1 表示不持久化,deliverMode 为 2 表示要持久化int deliverMode = message.getDeliverMode();if(deliverMode == 1) {diskDataCenter.sendMessage(queue, message);}//写内存memoryDataCenter.sendMessage(queue, message);//TODO 此处还需要补充一个逻辑,通知消费者可以消费消息了}
2.8、补充线程安全问题
锁粒度过大,是否要调整?
这里加了很多锁,同时锁粒度还是挺大的,但也可以做出调整,细化锁粒度,但是影响不大,因为这里创建交换机、创建绑定、创建队列、删除交换机.... 这些都是属于低频操作!
既然是低频操作,所以遇到两个线程都去操作创建队列之类的情况本身概率就很低了,再加上 synchronized 本身也是偏向锁状态,只有真正遇到竞争的时候才真加锁,因此没必要调整锁粒度.
此处加锁还是十分有必要的,为了应对一些少数的极端情况.
既然 VirtualHost 这一层代码加锁了,里面的 MemoryDataCenter 中的操作是否就不需要加锁了?之前加锁是否就没意义了?
咱们也不知道这个类的方法是给哪个类调用的,当前 VirtualHost 自身保证了线程安全的,此时 VirtualHost 内部调用 MemoryDataCenter ,里面的不加锁问题也不大,但是如果是另一个别的类,也多线程调用 MemoryDataCenter,就不可预知了~~