模拟实现消息队列项目(系列3) -- 服务器模块(硬盘管理)

目录

前言

1. 创建项目

2. 创建核心类

2.1 Exchange

2.2 MSQueue

2.3 Binding

2.4 Message

3. 数据库设计

3.1 SQLite 配置

3.2 Mapper层代码实现

3.2.1 创建表操作

3.2.2 交换机 队列 绑定的增加和删除

3.3 实现DataBaseManager

3.4 DataBaseManager单元测试

4. 消息存储设计

4.1 创建MessageFileManager类

4.2 MessageFileManager单元测试

5. 整合数据库和文件操作(DiskDataCenter)

结语


前言

        我们上一节,对我们的项目的需求和模块的划分进行了总结,接下来我们进入代码环节,这里还是在强调一遍,一个项目的最好的开始就是对项目的需求分析以及模块的划分规划好,有了整天的架构,我们再写代码对功能进行一一实现.这个环节是必须要有的.接下来,本章节是对于服务器模块中硬盘管理进行总结,主要是数据库管理和文件管理.最后会将整个项目的Gitee链接放在文章末尾,欢迎访问.


1. 创建项目

        本项目是基于Spring boot框架的,这里如何创建Spring Boot 项目这里就不进行过多的赘述了,可以从之前的博客进行学习.

Spring Boot的创建与使用icon-default.png?t=N6B9https://blog.csdn.net/weixin_46114074/article/details/131652160

2. 创建核心类

2.1 Exchange

在mqServer中创建core文件夹新建Exchange.class 和 ExchangeType.class(使用枚举表示交换机的类型)

package com.example.demo.mqserver.core;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;import java.util.HashMap;
import java.util.Map;@Data
public class Exchange {// 1.交换机的名字,为身份标识,也就是唯一的private String name;// 2.交换机的类型// direct fanout topicprivate ExchangeType type = ExchangeType.DIRECT;// 3.交换机是否持久化储存private boolean durable = false;// 4.如果当前交换机次没人使用了(生产者),就会自动被删除(没有实现此功能)private boolean autoDelete = false;// 5.创建交换机的时候指定一些额外的选项(没有实现此功能)// 为了进行存储数据库,我们将map进行转换成Json格式的字符串在数据库中进行储存private Map<String, Object> arguments = new HashMap<>();/*** 这一组getter 和 setter 方法 用来和数据库进行交互的时候的使用* @return*/public String getArguments() {// 将arguments进行转换成Json格式ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return "{}";}public void setArguments(String argumentsJson) {ObjectMapper objectMapper = new ObjectMapper();// 将从数据库获取的argumentsJson转换成maptry {
//                                  转换的数据       转换成的类型,如果是简单对象就直接使用类对象即可,
//                                                 要是复杂的数据类型,就使用TypeReference匿名内部类,传入目标转换的类型this.arguments = objectMapper.readValue(argumentsJson,new TypeReference<HashMap<String,Object>>(){});} catch (JsonProcessingException e) {e.printStackTrace();}}// ---------------------------------------------------------------------------------------/*** 这一组getter 和 setter 方法供内部进行使用,更加简单的获取和设置键值对* @param key* @return*/public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key, Object value) {arguments.put(key,value);}// ---------------------------------------------------------------------------------------public void setArguments(Map<String,Object> arguments){this.arguments = arguments;}
}

2.2 MSQueue

 在mqServer中创建core文件夹新建MSQueue.class

package com.example.demo.mqserver.core;import com.example.demo.common.ConsumerEnv;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;/*** Created with IntelliJ IDEA.* Description:队列(存储消息的队列)* User: YAO* Date: 2023-07-26* Time: 16:43*/
@Data
public class MSQueue {// 1.队列的身份标识private String name;// 2.队列是否持久化private boolean durable = false;// 3.为true表示只可以被一个消费者使用,false表示大家都可以进行使用(没有实现此功能)private boolean exclusive = false;// 4.如果当前交换机次没人(消费者)使用了,就会自动被删除(没有实现此功能)private boolean autoDelete = false;// 5.创建交换机的时候指定一些额外的选项(没有实现此功能)private Map<String, Object> arguments = new HashMap<>();
}

2.3 Binding

 在mqServer中创建core文件夹新建Binding.class

package com.example.demo.mqserver.core;import lombok.Data;/*** Created with IntelliJ IDEA.* Description:绑定(交换机和队列之间的关联关系)* User: YAO* Date: 2023-07-26* Time: 16:43*/
@Data
public class Binding {// 1. 交换机的名字private String exchangeName;// 2. 队列的名字private String queueName;// 3. bindingKey(与routingKey进行匹配)private String bindingKey;// 4.binding这个定西依附于Exchange 和 queue 单独设计持久化就没有什么意义了
}

 2.4 Message

  在mqServer中创建core文件夹新建Message.class 和 BasicProperties.class(消息的属性)

package com.example.demo.mqserver.core;import lombok.Data;import java.io.Serializable;
import java.util.UUID;/*** Created with IntelliJ IDEA.* Description:消息(表示为要传递的消息)* 一, 主要包含两个部分: 属性 和 正文*      1. 属性部分 (BasicProperties)*      2. 正文属性 body** 二, 此处的message的对象要满足在网络中进行传输,并且进行写入文件中,所以我们要进行序列化和反序列化*      此处使用标准库自带的方法进行序列化,不使用Json,因为Json存储的是文本格式的数据,而我们的消息的body是二进制数据* User: YAO* Date: 2023-07-26* Time: 16:44*/
@Data
public class Message implements Serializable {// 版本号: 当程序员进行修改当前类的时候需要将当前版号进行变更private static final long serialVersionUID = 1L;// 1. 消息的属性(核心部分)private BasicProperties basicProperties = new BasicProperties();// 2. 消息的正文 (支持二进制数据)(核心属性)private byte[] body;// 辅助属性// 3. 一个文件中会存储很多消息,如何找到某个消息,在文件中的具体内容呢?// 使用两个偏移量进行表示// [offset,offend) 左闭右开// 不需要被序列化保存文件中(防止进行序列化)private transient long offsetBegin = 0;   // 消息数据的开头距离文件开头的位置偏移(字节)private transient long offsetEnd = 0;   // 消息数据的结尾距离文件开头的位置偏移(字节)// 4. 表示这条消息是否是有效信息(针对文件的删除使用逻辑删除)// 0x1 表示有效 0x0 表示无效private byte isValid = 0x1;// 5. 创建工厂方法,帮助我们进行封装创建message的过程public static Message createMessageWithId(String routingKey ,BasicProperties basicProperties,byte[] body){// 此方法会自动生成一个带有唯一的messageId的message对象Message message = new Message();if (basicProperties != null){message.setBasicProperties(basicProperties);}message.setMessageID("M-" + UUID.randomUUID());message.basicProperties.setRoutingKey(routingKey);message.setBody(body);// 此处将消息的主要的两个属性进行设置了,剩下的辅助属性在持久化之前进行设置return message;}public String getMessageID(){return basicProperties.getMessageId();}public void setMessageID(String messageId){basicProperties.setMessageId(messageId);}public String getRoutingKey(){return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey){basicProperties.setRoutingKey(routingKey);}public int getDeliverMode(){return basicProperties.getDeliverMode();}public void setDeliverMode(int mode){basicProperties.setDeliverMode(mode);}}

3. 数据库设计

        我们之前划分的模块是将交换机 队列 绑定这三个的信息存储在硬盘中,由于对于操作这三者不是很频繁,所以我们将其存储在数据库中,但是对于消息的存储,我们不能存储在数据库中,是因为我们频繁的进行操作消息,所以我们将其存储在硬盘的文件中.

3.1 SQLite 配置

此处我们使用的数据库是SQLite,是⼀个更轻量的数据库.我们可以在Maven的中央仓库进行导入该依赖,直接进行使用.

1. 引入环境依赖

<dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version>
</dependency>

2. 配置Spring Boot配置文件application.yaml文件

spring:datasource:
#   当前项目的路径下url: jdbc:sqlite:./data/meta.db
#   sqlite不需要设置进行设置用户名以及密码,这点不同于MySQL
#   sqlite不是客户端节后的程序,就只有自己一个人进行访问,所以不需要进行设置用户名以及密码username:password:driver-class-name: org.sqlite.JDBC
# 设置 Mybatis 的 xml 保存路径
mybatis:mapper-locations: classpath:mapper/**Mapper.xml
#  configuration: # 配置打印 MyBatis 执行的 SQL
#    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# 配置打印 MyBatis 执行的 SQL
#logging:
#  level:
#    com:
#      example:
#        demo: debug

3.2 Mapper层代码实现

3.2.1 创建表操作

@Mapper
public interface MetaMapper {// 1. 提供三个核心的建表方法void createExchangeTable();void createQueueTable();void createBindingTable();
}

对应MyBatis的Sql创建

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.demo.mqserver.mapper.MetaMapper"><!--    1. 消息队列服务器的表创建--><update id="createExchangeTable">create table if not exists exchange(name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024))</update><update id="createQueueTable">create table if not exists queue(name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024))</update><update id="createBindingTable">create table if not exists binding (exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256))</update>
</mapper>

3.2.2 交换机 队列 绑定的增加和删除

@Mapper
public interface MetaMapper {List<Exchange> selectAllExchange();List<MSQueue> selectAllMSQueue();List<Binding> selectAllBinding();void insertExchange(Exchange exchange);void deleteExchange(String exchangeName);void insertQueue(MSQueue queue);void deleteQueue(String queueName);void insertBinding(Binding binding);void deleteBinding(Binding binding);
}

对应MyBatis的Sql创建 

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.demo.mqserver.mapper.MetaMapper"><!--    查找数据操作--><select id="selectAllExchange" resultType="com.example.demo.mqserver.core.Exchange">select * from exchange</select><select id="selectAllMSQueue" resultType="com.example.demo.mqserver.core.MSQueue">select * from queue</select><select id="selectAllBinding" resultType="com.example.demo.mqserver.core.Binding">select * from binding</select><!--   插入数据操作--><insert id="insertExchange" >insert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});</insert><insert id="insertQueue">insert into queue values(#{name},#{durable},#{exclusive},#{autoDelete},#{arguments})</insert><insert id="insertBinding">insert into binding values(#{exchangeName},#{queueName},#{bindingKey})</insert><!--    删除数据操作--><delete id="deleteExchange">delete from exchange where name=#{exchangeName}</delete><delete id="deleteQueue">delete from queue where name=#{queueName}</delete><delete id="deleteBinding">delete from binding where exchangeName=#{exchangeName} and queueName=#{queueName}</delete>
</mapper>

3.3 实现DataBaseManager

路径 : mqserver.datacenter.DataBaseManager  对数据库的操作进行封装.

1. 初始化数据库的文件

2. 进行数据库建表操作

3. 插入默认的一条交换机数据

4. 提供删除数据库文件的操作,主要用于单元测试.

5. 封装其他数据库的操作(获取,插入,删除)

package com.example.demo.mqserver.datacenter;import com.example.demo.DemoApplication;
import com.example.demo.mqserver.core.Binding;
import com.example.demo.mqserver.core.Exchange;
import com.example.demo.mqserver.core.ExchangeType;
import com.example.demo.mqserver.core.MSQueue;
import com.example.demo.mqserver.mapper.MetaMapper;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.File;
import java.util.List;/*** Created with IntelliJ IDEA.* Description:操作数据库的类* User: YAO* Date: 2023-07-27* Time: 11:36*/
public class DataBaseManager {// 从Spring中获取Bean对象(手动获取)private MetaMapper metaMapper;/*** 数据库初始化*/public void init(){// 手动获取MetaMapper对象metaMapper = DemoApplication.context.getBean(MetaMapper.class);// 1.建库建表if (!checkBDExists()){// 创建data目录File dataDir = new File("./data");dataDir.mkdirs();createTable();// 2.插入默认的数据createDefaultData();System.out.println("[DataBaseManager]: 数据库初始化完成");}else {System.out.println("[DataBaseManager]: 数据库已经存在");}}/*** 删除数据库*/public void deleteDB(){File file = new File("./data/meta.db");boolean ret = file.delete();File dataDir = new File("./data");ret = dataDir.delete();if (ret){System.out.println("[DataBaseManager]: 数据库已经删除");}else {System.out.println("[DataBaseManager]: 数据库删除失败");}}/*** 判断数据库是否存在* @return*/private boolean checkBDExists() {File file = new File("./data/meta.db");if (file.exists()){return true;}return false;}/*** 建表,建库操作不需要自己手动执行* 首次执行到这了的数据操作的时候,就会创建出meta.db文件*/private void createTable(){metaMapper.createBindingTable();metaMapper.createQueueTable();metaMapper.createExchangeTable();System.out.println("[DataBaseManager]: 创建完成");}/*** 给数据库表中添加默认的数据* 添加一个默认的交换机* RabbitMq中有这样一个设定,带有一个匿名的交换机,类型为direct*/private void createDefaultData() {// 构造一个默认的交换机Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);System.out.println("[DataBaseManager]: 创建初始数据完成");}// 封装其他数据库操作public void insertExchange(Exchange exchange){metaMapper.insertExchange(exchange);};public void deleteExchange(String exchangeName){metaMapper.deleteExchange(exchangeName);};public void insertQueue(MSQueue queue){metaMapper.insertQueue(queue);};public void deleteQueue(String queueName){metaMapper.deleteQueue(queueName);};public void insertBinding(Binding binding){metaMapper.insertBinding(binding);};public void deleteBinding(Binding binding){metaMapper.deleteBinding(binding);};List<Exchange> selectAllExchange(){return metaMapper.selectAllExchange();};List<MSQueue> selectAllMSQueue(){return metaMapper.selectAllMSQueue();};List<Binding> selectAllBinding(){return metaMapper.selectAllBinding();};
}

3.4 DataBaseManager单元测试

        我们使用Spring自带的单元测试,具体怎么生成单元测试,之前的文章也进行讲解过,就不再进行单独的讲解,其实很简单,只不过别忘了运行环境的注解@SpringBootTest.

package com.example.demo.mqserver.datacenter;import com.example.demo.DemoApplication;
import com.example.demo.mqserver.core.Binding;
import com.example.demo.mqserver.core.Exchange;
import com.example.demo.mqserver.core.ExchangeType;
import com.example.demo.mqserver.core.MSQueue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;import java.io.File;
import java.util.List;
import java.util.Map;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:测试用例(操作数据库)* User: YAO* Date: 2023-07-27* Time: 14:17*/
class DataBaseManagerTest {private DataBaseManager dataBaseManager = new DataBaseManager();/*** 执行测试前的准备工作*/@BeforeEachvoid setUp() {// 由于我们在init中,需要通过context中拿到MetaMapper实例DemoApplication.context = SpringApplication.run(DemoApplication.class);dataBaseManager.init(); // 初始化了MetaMapper}/*** 执行测试完成的后续工作*/@AfterEachvoid tearDown() {// 数据库给清空// 不能直接就进行删除操作 ===>> 先释放Context对象// 先将Context对象进行关闭,因为Context对象持有了MetaMapper的实例,MetaMapper实例又打开了meta.db数据库文件// meta.db被别人打开了,此时进行删除是执行失败的,在Windows是这样的在Linux中可以直接删除// 另一个方面,获取Context操作,会占用8080端口,如果不进行释放,那么下一个单元测试再重新获取Context对象时是获取不到的DemoApplication.context.close();dataBaseManager.deleteDB();}@Testvoid init() {// 在setUp中调用过,我们只需要检查数据库状态即可// 查交换机表 --> 1条数据// 查队列表 --> 0条数据// 查绑定表 --> 0条数据List<Exchange> exchangeList = dataBaseManager.selectAllExchange();List<MSQueue> msQueueList = dataBaseManager.selectAllMSQueue();List<Binding> bindingList = dataBaseManager.selectAllBinding();// 我们使用断言进行查看结果Assertions.assertEquals(1,exchangeList.size());Assertions.assertEquals("",exchangeList.get(0).getName());Assertions.assertEquals(ExchangeType.DIRECT,exchangeList.get(0).getType());Assertions.assertEquals(0,msQueueList.size());Assertions.assertEquals(0,msQueueList.size());}@Testvoid deleteDB() {}private Exchange createTestExchange(String exchangeName){Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.FANOUT);exchange.setAutoDelete(false);exchange.setDurable(true);exchange.setArguments("aa",1);exchange.setArguments("bb",2);return exchange;}@Testvoid insertExchange() {// 构造一个Exchange对象Exchange exchange = createTestExchange("testExchange");// 插入数据库dataBaseManager.insertExchange(exchange);// 查询结果List<Exchange> exchangeList = dataBaseManager.selectAllExchange();// 对查询结果进行验证Assertions.assertEquals(2,exchangeList.size());Assertions.assertEquals("testExchange",exchangeList.get(1).getName());Assertions.assertEquals(ExchangeType.FANOUT,exchangeList.get(1).getType());assertFalse(exchangeList.get(1).isAutoDelete());assertTrue(exchangeList.get(1).isDurable());Assertions.assertEquals(1,exchangeList.get(1).getArguments("aa"));Assertions.assertEquals(2,exchangeList.get(1).getArguments("bb"));}@Testvoid deleteExchange() {// 先构造叫交换机Exchange exchange = createTestExchange("TestExchange");dataBaseManager.insertExchange(exchange);List<Exchange> exchangeList = dataBaseManager.selectAllExchange();// 对查询结果进行验证Assertions.assertEquals(2,exchangeList.size());Assertions.assertEquals("TestExchange",exchangeList.get(1).getName());dataBaseManager.deleteExchange("TestExchange");List<Exchange> exchangeList2= dataBaseManager.selectAllExchange();Assertions.assertEquals(1,exchangeList2.size());Assertions.assertEquals("",exchangeList2.get(0).getName());}private MSQueue createTestMSQueue(String queueName){MSQueue queue = new MSQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);queue.setArguments("aaa", 1);queue.setArguments("bbb", 2);return queue;}@Testvoid insertQueue() {MSQueue queue = createTestMSQueue("testQueue");dataBaseManager.insertQueue(queue);List<MSQueue> queueList = dataBaseManager.selectAllMSQueue();Assertions.assertEquals(1, queueList.size());MSQueue newQueue = queueList.get(0);Assertions.assertEquals("testQueue", newQueue.getName());assertTrue(newQueue.isDurable());assertFalse(newQueue.isAutoDelete());assertFalse(newQueue.isExclusive());Assertions.assertEquals(1, newQueue.getArguments("aaa"));Assertions.assertEquals(2, newQueue.getArguments("bbb"));}@Testvoid deleteQueue() {MSQueue queue = createTestMSQueue("testQueue");dataBaseManager.insertQueue(queue);List<MSQueue> queueList = dataBaseManager.selectAllMSQueue();Assertions.assertEquals(1, queueList.size());// 进行删除dataBaseManager.deleteQueue("testQueue");queueList = dataBaseManager.selectAllMSQueue();Assertions.assertEquals(0, queueList.size());}private Binding createTestBinding(String exchangeName, String queueName) {Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey("testBindingKey");return binding;}@Testpublic void testInsertBinding() {Binding binding = createTestBinding("testExchange", "testQueue");dataBaseManager.insertBinding(binding);List<Binding> bindingList = dataBaseManager.selectAllBinding();Assertions.assertEquals(1, bindingList.size());Assertions.assertEquals("testExchange", bindingList.get(0).getExchangeName());Assertions.assertEquals("testQueue", bindingList.get(0).getQueueName());Assertions.assertEquals("testBindingKey", bindingList.get(0).getBindingKey());}@Testpublic void testDeleteBinding() {Binding binding = createTestBinding("testExchange", "testQueue");dataBaseManager.insertBinding(binding);List<Binding> bindingList = dataBaseManager.selectAllBinding();Assertions.assertEquals(1, bindingList.size());// 删除Binding toDeleteBinding = createTestBinding("testExchange", "testQueue");dataBaseManager.deleteBinding(toDeleteBinding);bindingList = dataBaseManager.selectAllBinding();Assertions.assertEquals(0, bindingList.size());}
}

以上就完成了数据库相关的操作

4. 消息存储设计

 我们给每个队列分配一个目录,目录的名字为data+队列名 ./data/testQueue

 

 4.1 创建MessageFileManager类

路径 :  mqserver.database.MessageFileManager 

实现方法:

  • 1. 获取指定队列对应的消息文件所在的文件夹
  • 2. 获取指定队列的对应消息的数据文件路径
  • 3. 获取指定队列的对应消息的统计文件路径
  • 4. 创建队列对应的文件和目录
  • 5. 删除指定队列对应的消息的目录和文件
  • 6. 对于队列对应消息的文件进行判断是否存在
  • 7. 往队列对应文件中,添加消息
  • 8. 删除消息 逻辑删除,对消息的的isValid进行设置成0
  • 9. 加载指定名称的队列对应的所有消息存放在一个链表中
  • 10.实现文件消息的垃圾回收
package com.example.demo.mqserver.datacenter;import com.example.demo.common.BinaryTool;
import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.MSQueue;
import com.example.demo.mqserver.core.Message;import java.io.*;
import java.util.LinkedList;
import java.util.Scanner;/*** Created with IntelliJ IDEA.* Description:对硬盘上的消息进行管理* User: YAO* Date: 2023-07-27* Time: 17:43*/
public class MessageFileManager {// 1. 约定消息文件所在的目录和文件名public void init(){// 暂时不需要进行初始化}/*** 1. 获取指定队列对应的消息文件所在的文件夹* @param queueName* @return*/private String getQueueDir(String queueName){return "./data/" + queueName;}/*** 2.获取指定队列的对应消息的数据文件路径* @param queueName* @return*/private String getQueueDataPath(String queueName){return getQueueDir(queueName) + "/queue_data.txt";}/*** 3.获取指定队列的对应消息的统计文件路径* @param queueName* @return*/private String getQueueStatPath(String queueName){return getQueueDir(queueName) + "/queue_stat.txt";}/*** 4. 定义一个静态内部类进行描述,消息的统计信息文件的属性** 对于简单的类,就直接设置为public的成员变量*/static public class Stat{// 1. 定义总消息的数量public int totalCount;// 2. 定义有效消息的数量public int validCount;}/*** 4.1  读取统计文件的信息* @param queueName* @return*/private Stat readStat(String queueName){// 由于当前的统计信息的文件类型是文本文件,我们可以直接使用Scanner进行读取Stat stat = new Stat();try(InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}/*** 4.2 往统计文件写入信息* @param queueName* @param stat* @return*/private void writeStat(String queueName, Stat stat) {// 使用 PrintWrite 来写文件.// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}/*** 5. 创建队列对应的文件和目录*/public void createQueueFiles(String queueName) throws IOException {// 1. 先创建队列对应的消息目录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()){// 不存在就进行创建文件路径boolean ok = baseDir.mkdirs();if(!ok){throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());}}// 2. 创建消息数据文件File queueDataDir = new File(getQueueDataPath(queueName));if (!queueDataDir.exists()){// 不存在就进行创建消息数据文件boolean ok = queueDataDir.createNewFile();if(!ok){throw new IOException("创建消息数据文件失败! baseDir=" + queueDataDir.getAbsolutePath());}}// 3. 创建消息统计文件File queueStatDir = new File(getQueueStatPath(queueName));if (!queueStatDir.exists()){// 不存在就进行创建消息统计文件boolean ok = queueStatDir.createNewFile();if(!ok){throw new IOException("创建消息数据文件失败! baseDir=" + queueStatDir.getAbsolutePath());}}// 4. 给消息统计文件进行设置初始值  0\t0Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName,stat);}/*** 6. 删除指定队列对应的消息的目录和文件* 当队列被删除的时候,对应的消息文件也要删除* @param queueName*/public void destroyQueueFiles(String queueName) throws IOException {// 1. 删除文件里面的内容File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();// 2. 删除目录File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3){throw new IOException("删除消息目录和文件失败, baseDir:"+baseDir.getAbsolutePath());}}/*** 7. 对于队列对应消息的文件进行判断是否存在**  后续生产者进行生产消息了,如果此时的消息设置的是持久化类型,就需要判断之前的存放消息的文件夹是否存在* @param queueName* @return*/public boolean checkFilesExits(String queueName){File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()){return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()){return false;}return true;}/*** 8. 往队列对应文件中,添加消息* @param queue   写入的队列* @param message 当前消息内容*/public void sendMessage(MSQueue queue, Message message) throws MqException, IOException {// 1. 先检查要写入队列对应的文件和目录是否存在if (!checkFilesExits(queue.getName())){// 自己定义异常进行抛出throw new MqException("[MessageFileManager] 队列对应文件不存在! queueName=" + queue.getName());}// 2. 将Message对象进行序列化,然后进行写入文件byte[] messageBinary = BinaryTool.toBytes(message);// 3. 获取当前二进制数据的长度,计算 Message的offsetBeg 和 offsetEnd// offsetBeg = 当前文件长度 + 4   (4个字节用来表示文件的长度)// offsetEnd = 当前文件长度 + 4 + 文件长度synchronized (queue){File queueDataFile = new File(getQueueDataPath(queue.getName()));message.setOffsetBegin(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 3. 将二进制消息数据进行写入文件try(OutputStream outputStream = new FileOutputStream(queueDataFile,true)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){// 使用DataOutputStream写可以将一个int写入的时候是四个字节// 写入消息长度(四个字节)dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);}}// 5. 更新消息统计文件Stat stat = readStat(queue.getName());stat.validCount += 1;stat.totalCount += 1;writeStat(queue.getName(),stat);}}/*** 9. 删除消息  逻辑删除,对消息的的isValid进行设置成0*   1. 将文件的二级制数据进行读取,还原成Message对象*   2. 将该Message对象的isValid属性进行设置为0*   3. 将修改过的Message对象进行转换成二进制数据重新写入到文件中*   此处这个参数的Message必须包含有效的offsetBeg 和 offsetEnd* @param queue* @param message*/public void deleteMessage(MSQueue queue, Message message) throws IOException, ClassNotFoundException {// 1. 将文件的二级制数据进行读取,还原成Message对象// 读取的是随机读取,指定位置进行读取  RandomAccessFile// 对其进行加锁synchronized (queue){try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()),"rw")){// 1.1先从文件中读取对应的Message数据byte[] bufferSrc = new byte[(int) (message.getOffsetEnd()-message.getOffsetBegin())];// 1.2将光标指定到消息数据的开始位置randomAccessFile.seek(message.getOffsetBegin());randomAccessFile.read(bufferSrc);Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);// 2. 将该Message对象的isValid属性进行设置为0x0diskMessage.setIsValid((byte) 0x0);// 3. 将修改过的Message对象进行转换成二进制数据重新写入到文件中byte[] bufferSet = BinaryTool.toBytes(diskMessage);// 记得移动光标到消息的开始位置,光标会随着读写操作会发生移动randomAccessFile.seek(message.getOffsetBegin());randomAccessFile.write(bufferSet);// 以上操作对于文件来说就是一个字节进行了调整}// 4.最后进行更新统计文件Stat stat = readStat(queue.getName());if (stat.validCount > 0){stat.validCount -= 1;}writeStat(queue.getName(),stat);}}/*** 10. 加载指定名称的队列对应的所有消息存放在一个链表中*     调用时机: 当程序进行启动的时候进行调用()*     使用LinkedList: 后续为了进行头删除操作* @param queueName 队列名字* @return 指定队列的所有的消息*/public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {// 1. 创建消息链表LinkedList<Message> messages = new LinkedList<>();// 2. 读取消息文件(顺序读取)try(InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){try(DataInputStream dataInputStream = new DataInputStream(inputStream)){long currentOffset = 0;while (true){// 1. 读取文件长度int messageSize = dataInputStream.readInt();// 读到文件末尾,就会抛出异常// 2. 按照消息长度进行读消息byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize){// 如果不匹配,说明文件的问题,格式错乱了throw new MqException("[MessageFileManager] 文件格式有误! queueName=" + queueName);}// 3. 把读取二级制消息数据进行反序列化,添加到链表中Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判断消息对象是否是无效消息if (message.getIsValid() != 0x1){// 无效数据直接跳过,但是光标要进行更新currentOffset += (4 + messageSize);continue;}// 5. 有效数据,则需要把这个Message对象添加到链表,加入之前还需要填写offsetBeg 和 offsetEnd//  进行计算offset的时候,需要记录当前文件光标的位置message.setOffsetBegin(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);// 更新光标的位置currentOffset += (4 + messageSize);messages.add(message);}}catch (EOFException e){// 这个catch 并不是处理异常,而是正常的业务逻辑,表示文章读到了末尾,会被ReadeInt抛出异常System.out.println("[MessageFileManager] 恢复Message数据完成");}}return messages;}/*** 11. 实现文件消息的垃圾回收*  实现思路: 使用垃圾回收的复制算法*  具体: 判断当文件中消息总数为2000的时候,并且有效消息不足50%的时候,触发垃圾回收*       此时就把当前文件的有效数据进行提取,单独的写入到新的文件中,删除旧文件,使用新文件进行代替*//*** 11.1 检测当前队列的消息文件是否要进行触发垃圾回收* @param queueName* @return*/public boolean checkGC(String queueName) throws MqException {if (!checkFilesExits(queueName)){throw new MqException("[MessageFileManager] 队列对应文件不存在! queueName=" + queueName);}// 判断总消息数量和有效数量Stat stat = readStat(queueName);if (stat.totalCount > 200 && (double)stat.validCount / (double)stat.totalCount < 0.5){return true;}return false;}/*** 11.2 创建存放有效数据的新的文件夹* @param queueName* @return*/private String getQueueDataNewPath(String queueName){return getQueueDir(queueName) + "/queue_data_new.txt";}/*** 11.3    进行垃圾回收  (复制算法)*   1. 创建新的文件*   2. 把之前有效的消息进行读取,写入到新的文件中*   3. 删除原来的文件*   4. 将新的文件进行重命名操作(修改成原来的文件)*   5. 更新消息的统计文件的数据**   注意: 进行垃圾回收的时候是对数据的大改动,所以不允许别的线程进行对消息文件进行改动,所以要进行加锁的操作** @param queue*/public void gc(MSQueue queue) throws IOException, MqException, ClassNotFoundException {synchronized (queue){// gc操作可能比较耗时,我们记录一下消耗的时间long gcBeg = System.currentTimeMillis();// 1. 创建新的文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()){// 正常情况下是不存在的,如果存在就代表上次gc没有完成throw  new MqException("[MessageFileManager] gc时发现该队列的queue_data_new 已经存在 queueName:" + queue.getName());}boolean ok = queueDataNewFile.createNewFile();if (!ok){throw  new MqException("[MessageFileManager] 队列对应的消息的queue_data_new文件创建失败 queueName:" + queue.getName());}// 2. 把之前有效的消息进行读取,写入到新的文件中LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());try(OutputStream outputStream = new FileOutputStream(queueDataNewFile)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){for (Message message : messages){byte[] buffer = BinaryTool.toBytes(message);// 先写四个字节的消息的长度dataOutputStream.writeInt(buffer.length);// 写入消息dataOutputStream.write(buffer);}}}// 3. 删除原来的文件File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete(); if (!ok){throw  new MqException("[MessageFileManager] 队列对应的消息的queue_data_old文件删除失败 queueName:" + queue.getName());}// 4. 将新的文件进行重命名操作(修改成原来的文件)ok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok){throw  new MqException("[MessageFileManager] 队列对应的消息的queue_data_new文件重命名失败 queueName:" + queue.getName());}// 5. 更新消息的统计文件的数据Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc执行完毕, queueName=" + queue.getName() + ",time" + (gcEnd-gcBeg) + "ms");}}
}

4.2 MessageFileManager单元测试

package com.example.demo.mqserver.datacenter;import com.example.demo.DemoApplication;
import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.MSQueue;
import com.example.demo.mqserver.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:测试硬盘中消息管理的方法* User: YAO* Date: 2023-07-28* Time: 14:06*/
@SpringBootTest
class MessageFileManagerTest {private MessageFileManager messageFileManager = new MessageFileManager();public static final String queueName1 = "testQueue1";public static final String queueName2 = "testQueue2";@BeforeEachvoid setUp() throws IOException {messageFileManager.createQueueFiles(queueName1);messageFileManager.createQueueFiles(queueName2);}@AfterEachvoid tearDown() throws IOException {messageFileManager.destroyQueueFiles(queueName1);messageFileManager.destroyQueueFiles(queueName2);}@Testvoid createQueueFiles() {// 1.创建队列消息文件已经执行// 2.直接验证文件是否勋在File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");assertTrue(queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");assertTrue(queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");assertTrue(queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");assertTrue(queueStatFile2.isFile());}@Testvoid readAndWriteStat() {MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 100;stat.validCount = 50;// 此处使用反射的机制进行调用writeStat 和 readStat// 此处使用Spring封装好的反射类ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);Assertions.assertEquals(100,newStat.totalCount);Assertions.assertEquals(50,newStat.validCount);}private MSQueue createTestQueue(String queueName) {MSQueue queue = new MSQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);return queue;}private Message createTestMessage(String content) {Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());return message;}@Testvoid sendMessage() throws IOException, MqException, ClassNotFoundException {//  构造消息 , 构造队列Message message = createTestMessage("testMessage");// 对应的目录和文件啥的都存在才行.MSQueue queue = createTestQueue(queueName1);// 调用发送消息方法messageFileManager.sendMessage(queue, message);// 检查 stat 文件.MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(1, stat.totalCount);Assertions.assertEquals(1, stat.validCount);// 检查 data 文件LinkedList<Message> messages = messageFileManager.loadAllMessageFromQueue(queueName1);System.out.println(messages);Assertions.assertEquals(1, messages.size());Message curMessage = messages.get(0);Assertions.assertEquals(message.getMessageID(), curMessage.getMessageID());Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());// 比较两个字节数组的内容是否相同, 不能直接使用 assertEquals 了.Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());System.out.println("message: " + curMessage);}@Testvoid loadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 往队列中插入100条数据,验证100条消息,验证加载之后是否跟之前是一致的List<Message> expectMessages = new LinkedList<>();MSQueue queue = createTestQueue(queueName1);for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue,message);expectMessages.add(message);}// 从硬盘中读取LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectMessages.size(),actualMessages.size());for (int i = 0; i < expectMessages.size(); i++) {Message expectMessage = expectMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i +"]" + "actualMessages:" + actualMessage);Assertions.assertEquals(expectMessage.getMessageID(), actualMessage.getMessageID());Assertions.assertEquals(expectMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testvoid deleteMessage() throws IOException, MqException, ClassNotFoundException {// 创建队列, 写入 10 个消息. 删除其中的几个消息. 再把所有消息读取出来, 判定是否符合预期.MSQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 删除其中的三个消息messageFileManager.deleteMessage(queue, expectedMessages.get(7));messageFileManager.deleteMessage(queue, expectedMessages.get(8));messageFileManager.deleteMessage(queue, expectedMessages.get(9));// 对比这里的内容是否正确.LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageID(), actualMessage.getMessageID());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testvoid gc() throws IOException, MqException, ClassNotFoundException {// 先往队列中写 100 个消息. 获取到文件大小.// 再把 100 个消息中的一半, 都给删除掉(比如把下标为偶数的消息都删除)// 再手动调用 gc 方法, 检测得到的新的文件的大小是否比之前缩小了.MSQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 获取 gc 前的文件大小File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long beforeGCLength = beforeGCFile.length();// 删除偶数下标的消息for (int i = 0; i < 100; i += 2) {messageFileManager.deleteMessage(queue, expectedMessages.get(i));}// 手动调用 gcmessageFileManager.gc(queue);// 重新读取文件, 验证新的文件的内容是不是和之前的内容匹配LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {// 把之前消息偶数下标的删了, 剩下的就是奇数下标的元素了.// actual 中的 0 对应 expected 的 1// actual 中的 1 对应 expected 的 3// actual 中的 2 对应 expected 的 5// actual 中的 i 对应 expected 的 2 * i + 1Message expectedMessage = expectedMessages.get(2 * i + 1);Message actualMessage = actualMessages.get(i);Assertions.assertEquals(expectedMessage.getMessageID(), actualMessage.getMessageID());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}// 获取新的文件的大小File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long afterGCLength = afterGCFile.length();System.out.println("before: " + beforeGCLength);System.out.println("after: " + afterGCLength);Assertions.assertTrue(beforeGCLength > afterGCLength);// 验证gc之后的是stat文件的内容// 反射进行获取MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);Assertions.assertEquals(50,newStat.validCount);Assertions.assertEquals(50,newStat.totalCount);}
}

单元测试全部通过: 

5. 整合数据库和文件操作(DiskDataCenter)

 管理所有硬盘上的数据(对数据库与和文件对的操作进行封装) 

1. 数据库: 交换机 绑定 队列 

2. 数据文件: 消息

上层逻辑要操作银盘,就进行调用通过这个类,(上层代码关心的是数据存储在数据库还是文件中)

package com.example.demo.mqserver.datacenter;import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.Binding;
import com.example.demo.mqserver.core.Exchange;
import com.example.demo.mqserver.core.MSQueue;
import com.example.demo.mqserver.core.Message;
import lombok.Data;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;/*** Created with IntelliJ IDEA.* Description:管理所有硬盘上的数据(对数据库与和文件对的操作进行封装)*      1. 数据库: 交换机 绑定 队列*      2. 数据文件: 消息*  上层逻辑要操作银盘,就进行调用通过这个类,(上层代码关心的是数据存储在数据库还是文件中)* User: YAO* Date: 2023-07-28* Time: 16:25*/
@Data
public class DiskDataCenter {// 管理数据库中数据private DataBaseManager dataBaseManager = new DataBaseManager();// 管理文件中的数据private MessageFileManager messageFileManager = new MessageFileManager();public void init(){// 初始化dataBaseManager所有的条件dataBaseManager.init();// 当前messageFileManager没有任何初始化的内容messageFileManager.init();}// 封装交换机, 队列, 绑定的操作-----------------------------------------------------------public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public void insertQueue(MSQueue queue) throws IOException {// 创建队列的同时,不仅仅是把队列对象写到数据库,而且还需要创建出对应的目录和文件messageFileManager.createQueueFiles(queue.getName());dataBaseManager.insertQueue(queue);}public void deleteQueue(String queueName) throws IOException {// 删除队列的同时,不仅仅是把队列对象在数据库中进行删除,而且还要删除队列对应的目录和文件messageFileManager.destroyQueueFiles(queueName);dataBaseManager.deleteQueue(queueName);}public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public List<Exchange> selectAllExchange(){return dataBaseManager.selectAllExchange();}public List<MSQueue> selectAllMSQueue(){return dataBaseManager.selectAllMSQueue();}public List<Binding> selectAllBinding(){return dataBaseManager.selectAllBinding();}// 封装消息的操作-----------------------------------------------------------public void sendMessage(MSQueue queue, Message message) throws MqException, IOException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);// 进行判断是否进行gcif (messageFileManager.checkGC(queue.getName())){// 进行gcmessageFileManager.gc(queue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}
}

以上我们就完成了相关硬盘数据的封装,之后上层代码在调用的时候就不必考虑数据是存储在哪个地方.


结语

本文主要是对硬盘存储的数据进行封装以及相关API的实现,为以后虚拟主机操作交换机 队列 绑定 消息提供服务.

完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇

模拟实现消息队列icon-default.png?t=N6B9https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/80720.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

“他“是怎么拿offer的?全网最全,性能测试面试题+答案(超全整理)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、什么是负载测试…

R语言3_安装SeurateData

环境Ubuntu22/20, R4.1 在命令行中键入&#xff0c; apt-get update apt install libcurl4-openssl-dev libssl-dev libxml2-dev libcairo2-dev libgtk-3-dev # libcairo2-dev :: systemfonts # libgtk :: textshaping进入r语言交互环境&#xff0c;键入&#xff0c; instal…

7.物联网操作系统互斥信号量

1.使用互斥信号量解决信号量导致的优先级反转&#xff0c; 2.使用递归互斥信号量解决互斥信号量导致的死锁。 3.高优先级主函数中多次使用同一信号量的使用&#xff0c;使用递归互斥信号量&#xff0c;但要注意每个信号量的使用要对应一个释放 优先级翻转问题 优先级翻转功能需…

Git仓关联多个远程仓路径

前言 Git仓如果需要将代码push到多个仓&#xff0c;常用的做法是添加多个远程仓路径&#xff0c;然后分别push。这样虽然可以实现目的&#xff0c;但是需要多次执行push指令&#xff0c;很麻烦。 本文介绍关联多个远程仓路径且执行一次push指令的方法&#xff1a;git remote …

conda 环境 numpy 安装报错需要 Microsoft Visual C++ 14.0

到公司装深度学校环境。项目较旧&#xff0c;安装依赖&#xff0c;一堆报错&#xff08;基于 conda 环境&#xff09;&#xff1a; numpy 安装报需要 C 14.0 No module named numpy.distutils._msvccompiler in numpy.distutils; trying from distutilserror: Microsoft Visu…

Maven-生命周期及命令

关于本文 ✍写作原因 之前在学校学习的时候&#xff0c;编写代码使用的项目都是单体架构&#xff0c;导入开源框架依赖时只需要在pom.xml里面添加依赖&#xff0c;点一下reload按钮即可解决大部分需求&#xff1b;但是在公司使用了dubbo微服务架构之后发现只知道使用reload不足…

COSV Schema 1.0正式对外发布,棱镜七彩参与制定工作

近期&#xff0c;CCF版开源漏洞信息描述规范COSV Schema 1.0正式制定并对外发布&#xff0c;棱镜七彩参与制定工作。 图 COSV Schema 1.0制定过程贡献单位及专家名单 作为开源软件治理与软件供应链安全领域的先行者&#xff0c;棱镜七彩一直致力于提升开源效能、防范开源漏洞。…

一起学数据结构(3)——万字解析:链表的概念及单链表的实现

上篇文章介绍了数据结构的一些基本概念&#xff0c;以及顺序表的概念和实现&#xff0c;本文来介绍链表的概念和单链表的实现&#xff0c;在此之前&#xff0c;首先来回顾以下顺序表的特点&#xff1a; 1.顺序表特点回顾&#xff1a; 1. 顺序表是一组地址连续的存储单元依次存…

FL Studio21高级中文版本下载及切换中文语言教程

FL Studio对新人有极高的友好度&#xff0c;成为编曲软件的入门首选&#xff01;FL Studio官方提供多达31款各类插件&#xff0c;令你编曲功力大涨&#xff01;FL Studio是超多顶级音乐人的启蒙首选&#xff01;包括百大DJ冠军Martin Garrix&#xff0c;六获格莱美提名的Deadma…

Java入门2022黑马-200-1

1-5 常用cmd命令 dir可以查看隐藏的文件&#xff0c; exit 退出 6-20 20-30 30-40 37 三元表达式 switch新特性 统计 while continue break 50

ELK企业级日志分析系统

目录 一、ELK 概述 1.ElasticSearch 2.Kiabana 3.Logstash 可以添加的其它组件 1.Filebeat 2.Fluentd 三、为什么要使用 ELK 四、ELK 的工作原理 五、 ELK Elasticsearch 集群部署 更改主机名、配置域名解析、查看Java环境 部署 Elasticsearch 软件 修改elasticsearc…

怎么合并多个视频?简单视频合并方法分享

合并多个视频可以将它们组合成一个更长的视频&#xff0c;这对于需要播放多个短视频的情况非常有用。此外&#xff0c;合并视频还可以使视频编辑过程更加高效&#xff0c;因为不必将多个独立的视频文件分别处理。最后&#xff0c;合并视频可以减少文件数量&#xff0c;从而使整…

K8S系列文章之 Kind 部署K8S的 服务发布

安装kind 下载 https://github.com/kubernetes-sigs/kind/releases/download/0.17.0/kind-linux-amd64 执行以下命令&#xff1a; mv kind-linux-amd64 /usr/local/bin/kind chmod 777 /usr/local/bin/kind 之前需要先在本地主机安装好docker yum -y install yum-utils d…

vscode Google代码风格设置无效解决

1. 采用第一个方法设置google代码设置风格 2. 安装了clangd后需要在格式化风格做选择 vscode 安装 clang-format插件 $ code /home/tony/.config/Code/User/settings.json 这就能解决google风格设置无效的问题了&#xff0c;原来根因在于使用的格式化插件没有生效导致&#xf…

MemFire教程|FastAPI+MemFire Cloud+LangChain开发ChatGPT应用-Part2

基本介绍 上篇文章我们讲解了使用FastAPIMemFire CloudLangChain进行GPT知识库开发的基本原理和关键路径的代码实现。目前完整的实现代码已经上传到了github&#xff0c;感兴趣的可以自己玩一下&#xff1a; https://github.com/MemFire-Cloud/memfirecloud-qa 目前代码主要…

VIM 编辑器: Bram Moolenaar

VIM 用了很长时间&#xff0c; 个人的 VIM 配置文件差不多10年没有更新了。以前写程序的时候&#xff0c; 编辑都用这个。 linux kernel&#xff0c; boost规模的代码都不在话下。现在虽然代码写的少了&#xff0c;依然是我打开文件的首选。 现在用手机了&#xff0c;配个蓝牙键…

UE中低延时播放RTSP监控视频解决方案

第1章 方案简介 1.1 行业痛点 在各种智慧城市、智慧社区、智慧水利、智慧矿山等数字孪生项目中&#xff0c;经常使用通UE来开发三维可视化场景。在这些场景中通常都需要把现场的各种监控视频在UE的可视化场景中接入&#xff0c;主要包含海康威视、大华、宇视、华为等众多监控…

腾讯云-宝塔添加MySQL数据库

1. 数据库菜单 2. 添加数据库 3. 数据库添加成功 4. 上传数据库文件 5. 导入数据库文件 6. 开启数据库权限 7. 添加安全组 (宝塔/腾讯云) 8. Navicat 连接成功

小白到运维工程师自学之路 第六十五集 (docker-compose)

一、概述 Docker Compose 的前身是 Fig&#xff0c;它是一个定义及运行多个 Docker 容器的工具。可以使用YAML文件来配置应用程序的服务。然后&#xff0c;使用单个命令&#xff0c;您可以创建并启动配置中的所有服务。Docker Compose 会通过解析容器间的依赖关系&#xff08;…

网络编程——深入理解TCP/IP协议——OSI模型和TCP/IP模型:构建网络通信的基石

TCP/IP协议— 一、简介 TCP/IP协议&#xff0c;即传输控制协议/互联网协议&#xff0c;是一组用于在计算机网络中实现通信的协议。它由两个主要的协议组成&#xff1a;TCP&#xff08;传输控制协议&#xff09;和IP&#xff08;互联网协议&#xff09;。TCP负责确保数据的可靠…