仿 RabbitMQ 的消息队列3(实战项目)

七. 消息存储设计

上一篇博客已经将消息统计文件的读写代码实现了,下一步我们将实现创建队列文件和目录。

实现创建队列文件和目录

初始化 0\t0 这样的初始值.

//创建队列对应的文件和目录:public void createQueueFile(String queueName) throws IOException, MqException {//先创建对应的目录:File file = new File(getQueueDir(queueName));if(!file.exists()){boolean ok = file.mkdirs();if(!ok) throw new IOException("创建队列目录失败。baseDir:"+file.getAbsolutePath());}else{throw new MqException("[createQueueFile] 队列对应的目录已经被创建过了,创建失败");}//下面开始创建 数据文件:File dataFile = new File(getQueueDataDir(queueName));if(!dataFile.exists()){boolean ok = dataFile.createNewFile();if(!ok) throw new IOException("创建数据文件失败。queuedataDir:"+dataFile.getAbsolutePath());}else{throw new MqException("[createQueueFile] 队列对应的数据文件已经被创建过了,创建失败");}//创建 统计文件:File statFile = new File(getQueueStatDir(queueName));if(!statFile.exists()){boolean ok = statFile.createNewFile();if(!ok) throw new IOException("创建统计文件失败。queuestatDir:"+statFile.getAbsolutePath());}else{throw new MqException("[createQueueFile] 队列对应的统计文件已经被创建过了,创建失败");}//给消息统计文件设定初始值 0\t0, (消息数量:0,有效消息数量:0)// 目的:不用在今后使用的时候对空文件做一些特殊的判定Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;//再写入:writeStat(queueName,stat);}

实现删除文件或目录

注意:File 类的 delete ⽅法只能删除空⽬录. 因此需要先把内部的⽂件先删除掉,如果还存在多余文件,就会删除失败。

//删除队列的文件或目录://队列也是可以删除的,当队列删除后,对应的消息文件啥的,也要随之删除。public void deleteQueueFile(String queueName) throws IOException{//先删除 数据文件:File queueDataFile = new File(getQueueDataDir(queueName));boolean ok1 = queueDataFile.delete();//再删除 统计文件:File queueStatFile = new File(getQueueStatDir(queueName));boolean ok2 = queueStatFile.delete();//再删除目录;File file = new File(getQueueDir(queueName));boolean ok3 = file.delete();if(!(ok1 && ok2 && ok3)){//任意一个删除失败,就失败,抛出异常:throw new IOException("删除队列文件或目录失败");}}

检查队列⽂件是否存在

判定该队列的消息⽂件和统计⽂件是否存在. ⼀旦出现缺失, 则不能进⾏后续⼯作.

//检查队列的 文件或目录 是否存在: 目的:判断是否队列之前被 别人用过。//用处1:如果后续有生产者给 broker server 生产消息了,这个消息可能需要记录到文件上,此时需要判断文件是否存在(持久化的应用)。public boolean checkFilesExists(String queueName){//如果队列的 数据文件和统计文件都存在,才存在:File queueDataFiles = new File(getQueueDataDir(queueName));if(!queueDataFiles.exists()) return false;File queueStatFiles = new File(getQueueStatDir(queueName));if(!queueStatFiles.exists()) return false;//都存在,则返回true;return true;}

实现消息对象序列化/反序列化

先创建工具类BinaryTool用与序列化/反序列化。
在这里插入图片描述

  • 使⽤ ByteArrayInputStream / ByteArrayOutputStream 针对 byte[] 进⾏封装, ⽅便后续操作. (这两个流对象是纯内存的, 不需要进⾏ close).
  • 使⽤ ObjectInputStream / ObjectOutputStream 进⾏序列化 / 反序列化操作. 通过内部的readObject / writeObject 即可完成对应操作.
/*** 这个类用来序列化 与反序列化* 此处我们采用的是java标注库里的 ObjectOutputStream 和ObjectInputStream 两个流对象,但是序列化的对象必须要实现Serializable接口、** 由于将序列化,反序列化当做一个工具,很多数据都可能用到,所以我们将它的方法搞成静态的**/
public class BinaryTool {//序列化:public static byte[] toBytes(Object object) throws IOException {//由于在try里面写流对象能自动关闭省去我们不少事,所以,直接写在try()里//这里 使用ByteArrayOutputStream是因为 未知的byte数组的长度,这个类能自动记录。try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){//将byteArrayOutputStream 传入ObjectOutputStream 就相当于将他们相互关联了,当objectOutputStream调用writeObject方法//时会将这个对象写入关联的byteArrayOutputStream里,然后直接调用byteArrayOutputStream里的方法,将序列化的数据转换成直接数组就行了//其实这个 ObjectOutputStream 不仅可以关联数组,还可以是文件,网络。关联了文件就将对象序列化到文件里,关联了网络,就是网络数据的传输socket。try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){objectOutputStream.writeObject(object);}//这个操作就是把byteArrayOutputStream中持有的二进制数据取出来,转成byte[]return byteArrayOutputStream.toByteArray();}}//反序列化:public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){return objectInputStream.readObject();}}}}

实现写入消息文件

  • 考虑线程安全, 按照队列维度进⾏加锁.
  • 使⽤ DataOutputStream 进⾏⼆进制写操作. ⽐原⽣ OutputStream 要⽅便.
  • 需要记录 Message 对象在⽂件中的偏移量. 后续的删除操作依赖这个偏移量定位到消息,这也是message里的偏移量初始化的时候,就是发送消息的时候。offsetBeg是原有⽂件⼤⼩的基础上, 再 + 4. 4 个字节是存放消息大小的空间.
  • 写完消息, 要同时更新统计信息.
 //该方法将传来的一个新的消息放到对应的文件当中:新增消息public void sendMessage(MESGQueue queue, Message message) throws IOException, MqException {//先检查文件是否存在:如果不存在怎抛出异常,这个异常可以自定义。if(!checkFilesExists(queue.getName())){throw new MqException("[MessageFileManager对应的文件不存在]!queueName"+queue.getName());}//先进行序列化:byte[] binaryMessage = BinaryTool.toBytes(message);//为了解决线程安全问题,我们引入锁,如果此时的锁对象 是同一个队列,那就阻塞等待。synchronized (queue){//先将数据文件new出来,看看此时文件里已经写入的数据长度,方便我们后续计算offsetbegin和offsetendFile file = new File(getQueueDataDir(queue.getName()));//在写入消息的时候才对message里的offsetbegin和offsetend 进行赋值:message.setOffsetBeg(file.length()+4);message.setOffsetEnd(file.length()+4+binaryMessage.length);//由于我们的写入是追加写入,所以不要忘记 truetry (OutputStream outputStream = new FileOutputStream(file,true)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){//先写入4个字节的消息长度:dataOutputStream.writeInt(binaryMessage.length);//再写入offsetbegin 和 offsetenddataOutputStream.write(binaryMessage);}}//此时已经将消息数据文件写完了,不要忘记消息统计文件:Stat stat = readStat(queue.getName());stat.validCount++;stat.totalCount++;writeStat(queue.getName(),stat);}}

创建异常类MqException

作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常
在这里插入图片描述

public class MqException extends Exception{public MqException(String reason){super(reason);}
}

实现删除消息

此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.

  • 使⽤ RandomAccessFile 来随机访问到⽂件的内容.(随机访问其实没什么玄乎的,就像数组一样,能通过下标快速访问某个元素,这就是随机访问的原理。内存是支持随机访问的)。
  • 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在⽂件中的位置. 通过randomAccessFile.seek 操作⽂件指针偏移过去. 再读取.
  • 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回⽂件. 注意写的时候要重新设定文件指针的位置. ⽂件指针会随着上述的读操作产⽣改变,所以要重新seek,将光标移动到开始。
  • 最后, 要记得更新统计⽂件, 把合法消息 - 1.
 //删除消息:主要的操作歩奏:// 1,将消息读出来//2,将消息里的isVail 改成0x0//3,将消息放回文件中public void deleteMessage(MESGQueue queue,Message message) throws IOException, ClassNotFoundException {//由于删除消息的时候也可能收到线程安全问题,所以我们要加锁:synchronized (queue){//先将消息读出来://由于我们正常使用的FileInputStream,只能从头开始读。而此时的场景更倾向于 随机读取,所以我们使用到了RandomAccessFile进行随机读取//注意这个RandomAccessFile类的第二个参数:rw可读可写。try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataDir(queue.getName()),"rw")){//先准备一个byte数组用来放 读出来的二进制数据:byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];//先将光标刷新到 offsetbeginrandomAccessFile.seek(message.getOffsetBeg());//将message二进制数据读出来randomAccessFile.read(bufferSrc);//转换成message对象:Message message1 = (Message) BinaryTool.fromBytes(bufferSrc);//将message里的isVail 改成0x0message1.setIsVail((byte) 0x0);//将新的message1 转成二进制:byte[] bufferDest = BinaryTool.toBytes(message1);//由于上一次读文件光标已经发生了变化,所以此时还要调整光标到offsetbeginrandomAccessFile.seek(message.getOffsetBeg());//将数据写入文件:randomAccessFile.write(bufferDest);//此时已经将数据文件里的vail改成无效,那我们需不需要将这个内存中的 message对象里的vail也改成无效呢?//可以是可以,但是没有必要:想象一下我们将一个文件标记成无效的场景是不是我们此时要删掉这个文件的时候,//此时我们都要删掉这个文件了,当然要连同文件里的数据和内存中的数据都删了呀,文件里的数据可能需要一些歩奏,//但是在内存中删一个对象实在太容易了,今后会有内存中的删除消息操作。这就相当于让一个将死之人多活几秒,但他终究逃不过死亡//这就是message里的vail 其实不需要改动的原因。}//不要忘了统计文件也要更新, 由于我们此时已经将数据文件里的一个消息改成无效的,所以此时统计文件里的有效消息就要--了Stat stat = readStat(queue.getName());if(stat.validCount >0){stat.validCount --;}//再将更新后的统计信息 写入文件writeStat(queue.getName(),stat);}}

实现消息加载

这个功能在服务器重启, 和垃圾回收的时候都很关键

  • 使⽤ DataInputStream 读取数据. 先读 4 个字节为消息的⻓度, 然后再按照这个⻓度来读取实际消息内容.
  • 读取完毕之后, 转换成 Message 对象.
  • 同时计算出该对象的 offsetBeg 和 offsetEnd.
  • 最终把结果整理成链表, 返回出去.
  • 注意, 对于 DataInputStream 来说, 如果读取到 EOF, 会抛出⼀个 EOFException , ⽽不是返回特定值. 因此需要注意上述循环的结束条件.
//从消息数据文件当中读出所有消息://由于是服务器刚启动的时候才会调用这个方法,此时的队列还不能处理各种请求,所以不需要考虑线程安全问题。public LinkedList<Message> loadAllMessagesFromQueueDataFile(String queueName) throws IOException, ClassNotFoundException, MqException {//先new出来一个linkedList来放所有消息:使用链表是因为要进行头删和尾删等操作:LinkedList<Message> messages = new LinkedList<>();//创建流对象:try(InputStream inputStream = new FileInputStream(getQueueDataDir(queueName))){//与上面的DataOutputStream对应,此时用的是DataInputStreamtry(DataInputStream dataInputStream = new DataInputStream(inputStream)){//由于要读的消息可能不止一条,所以用一个while循环://但是如果我们直接这样写会一直重复的读一个消息,而DataInputStream不能控制光标的移动,所以要定义一个量来//记录我们读到哪里了,另外,这个量也为后续message对象的offsetbegin和offsetend的初始化提供便利long currentOffset = 0;//写完大概得逻辑以后不知道不会不会有疑问,这个while的条件可是true啊,这不死循环了嘛,//其实这也是无奈之举,主要原因是dataInputStream.readInt()读到文件的末尾并不会返回-1,EOF啥的,而是//直接抛出 EOFException异常,直接结束循环,因此我们只用在外层catch住这个异常就行了,这是一个很特别的预料之内的循环结束方式while(true){//先读4个字节,求出数据的长度:int messageLen = dataInputStream.readInt();//创建一个刚好能装messageLen长度的字节数组:byte[] messageBinary = new byte[messageLen];//读出消息数据:并且用变量接收,判断读出的数据是否和预期的数据长度一致,若不一致,说明格式不正确,错乱了则抛出异常int realMessageLen = dataInputStream.read(messageBinary);if(realMessageLen != messageLen){throw new MqException("[MessageFileManager] 文件格式错误!!!queueName:"+queueName);}//将数组反序列化成message对象Message message = (Message) BinaryTool.fromBytes(messageBinary);//如果读到的消息是无效的,则跳过这个无效消息,更新currentoffset:if(message.getIsVail() == 0x0){currentOffset+=(4+messageLen);continue;}//再将message里的offsetbegin和offsetend给初始化:message.setOffsetBeg(currentOffset+4);message.setOffsetEnd(currentOffset+4+messageLen);//正常读完后,别忘了,将currentoffset更新currentOffset+=(4+messageLen);//再将消息加入到链表当中messages.add(message);}}catch (EOFException e){System.out.println("[MessageFileManager] 恢复Message 数据完成!!!");}}return messages;}

实现垃圾回收(GC)

  • 上述删除操作, 只是把消息在⽂件上标记成了⽆效. 并没有腾出硬盘空间. 最终⽂件⼤⼩可能会越积越多. 因此需要定期的进⾏批量清除.
  • 此处使⽤类似于复制算法. 当总消息数超过 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC。GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 代替旧⽂件即可.
 public void gc(MESGQueue queue) throws MqException, IOException, ClassNotFoundException {//根据以前的写代码经验,次GC过程可能有线程安全问题,所以我们直接加锁://其实这也是为什么形参传入的是一个队列,而不是队列的名字的其中一个原因。synchronized (queue){//由于GC的执行时间可能很慢,我们手动的将时间计算出来,如果将来服务器运行半天无响应了,如果是GC的问题//我们也能知道long gcBegin = System.currentTimeMillis();//先创建新的文件:File newQueueFile = new File(getQueueDataNewPath(queue.getName()));//如果文件已经存在了,可能上一次gc有残留,这是不正常的,所以抛出异常if(newQueueFile.exists()){throw new MqException("[MessageFilemanager] 队列的queue_new_data.txt已经存在!!queueName:"+queue.getName());}//如果执行到这,说明文件不存在,则创建新文件:boolean ok = newQueueFile.createNewFile();//如果创建文件失败,则抛出异常:if(!ok){throw new MqException("[MessageFileManager] 创建文件失败!!newQueueDataFile:"+newQueueFile.getAbsolutePath());}//先创建一个链表用来存储从原来的文件中取出来的message对象:此处可以用到之前的方法://取出原来文件里的所有有效文件:LinkedList<Message> messages = loadAllMessagesFromQueueDataFile(queue.getName());//new出相应的流对象用来写入新文件://这里我写错了,将queue.getName()传入了,但是明明是一个不存在的路径,他竟然还能正常写?底层也不抛出异常,//我真是又惊讶,又惊吓。//之后我又去查了查资料:原来是FileOutputStream的问题啊,FileOutputStream太nb了,//如果传入的字符串对应的路径不存在,FileOutputStream会自动给你创建一个文件用于写入,这这这也太贴心了吧,//不过我还是希望他能直接抛异常,毕竟我找bug也找了这么久了,况且天知道他会把我的数据写到哪里://其实也知道:如果是绝对路径,他会自动创建路径下的文件;如果是相对路径,他会在当前工作空间创建一个文件。//找了一圈以后发下,在我的mq路径下,就存在一个queuetest1的文件,里面正好是之前我写入的数据,呜呜呜,要哭了。
//            try(OutputStream outputStream = new FileOutputStream(queue.getName())){try(OutputStream outputStream = new FileOutputStream(newQueueFile)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){//循环读取messages,将对象重新写入新文件:for(Message m : messages){//先将消息序列化:一个字节数组:byte[] buffer = BinaryTool.toBytes(m);//将二进制数组写入新的文件:注意遵循之前的约定:dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}//删除旧文件,这里以前传入的旧文件的路径写错了,直接传成了名字,所以写代码一定要细心啊。
//            File oldQueueFile = new File(queue.getName());File oldQueueFile = new File(getQueueDataDir(queue.getName()));boolean ok2 = oldQueueFile.delete();System.out.println("[ok2]oldQueueFile 文件删除:"+ok2);//如果删除失败,可能是没有权限之类的,抛出异常:if(!ok2){throw new MqException("MessageFileManager 删除旧文件失败!! oldDataQueueFile:"+oldQueueFile.getAbsolutePath());}//重命名新文件:boolean ok3 = newQueueFile.renameTo(oldQueueFile);//如果重命名失败,抛出异常:if(!ok3) {throw new MqException("[MessageFileManager] 新文件重命名失败!!oldDataQueueFile:"+oldQueueFile.getAbsolutePath()+" , newDataQueueFile="+newQueueFile.getAbsolutePath());}//不要忘记更新统计文件:Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] GC执行完毕!!! 执行的时间:"+(gcEnd - gcBegin)+"ms");}}

测试MessageFileManager

创建MessageFileManagerTest类用于测试:
在这里插入图片描述

测试前的准备:

  • 创建两个队列, ⽤来辅助测试.
  • 使⽤ ReflectionTestUtils.invokeMethod 来调⽤私有⽅法(这就是传说中的反射,注意它的参数,用法)。
  •     ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
    

这个反射的参数:
第一个参数:类的实例。
第二个参数:你想调用的方法
后面的参数就是不定参数了(数量不确定),能确定的是:后面的参数的就是你想调用的方法的参数。


@SpringBootTest
public class MessageFileManagerTest {private MessageFileManager messageFileManager = new MessageFileManager();private static final String queueName1 = "queuetest1";private static final String queueName2 = "queuetest2";@BeforeEachpublic void setUp() throws IOException, MqException {//由于我们要测试的是队列,所以准备工作就是先创建队列文件:messageFileManager.createQueueFile(queueName1);messageFileManager.createQueueFile(queueName2);}//这个@AfterEach注解我试过了,即使测试方法执行过程中抛出了异常,这个方法还是在每次执行完测试单元以后该执行他还是执行他,//无关乎异常,真nb@AfterEachpublic void tearDown() throws IOException {//首尾工作,将刚才创建的队列文件删掉:messageFileManager.deleteQueueFile(queueName1);messageFileManager.deleteQueueFile(queueName2);}}

测试代码:

@Testpublic void testCreateFile(){//其实就测试创建的队列文件是否存在://由于我们在MessageFileManager里的get路径方法是 private修饰的,所以不能直接调用get路径方法,只能手动写上//检验 队列数据文件是否存在:File queueDataFile1 = new File("./data/"+queueName1+"/queue_data.txt");//此处用的方法是isFile 而不是exists,因为要判定这是个文件,并不是只是存在就行,存在了也可能是个目录。Assertions.assertEquals(true,queueDataFile1.isFile());//检验 队列统计文件是否存在:File queueStatFile1 = new File("./data/"+queueName1+"/queue_stat.txt");Assertions.assertEquals(true,queueStatFile1.isFile());//检验 队列数据文件是否存在:File queueDataFile2 = new File("./data/"+queueName2+"/queue_data.txt");Assertions.assertEquals(true,queueDataFile2.isFile());//检验 队列统计文件是否存在:File queueStatFile2 = new File("./data/"+queueName2+"/queue_stat.txt");Assertions.assertEquals(true,queueStatFile2.isFile());System.out.println("[CreateFileText] 测试创建队列文件成功!!!");}@Testpublic void testReadAndWriteStat(){//先创建出stat类,由于他是内部类,所以要类名. 调用出来:MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 200;stat.validCount = 100;//此时写入stat 到统计文件当中:但是如果直接用messageFileManager. 由于writeStat是private修饰,所以肯定调用不出来,//此时就要用spring带的 反射方法了:
//        messageFileManager.//用反射将 stat写入统计文件:ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);//用反射将 写入的统计文件读出来:MessageFileManager.Stat statNew = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);//判断读出来的stat和我们设定的stat是否一样Assertions.assertEquals(200,statNew.totalCount);Assertions.assertEquals(100,statNew.validCount);System.out.println("[testReadAndWriteStat] 测试成功!!!");}//要想测试发送消息,首先要有队列和消息吧,所以,我们先写创建队列和消息的方法:private MESGQueue createQueue(){MESGQueue queue = new MESGQueue();//这里的队列名字不能随便取,因为随便取的队列名字也没有对应的文件啊,要用就要用已经创建了文件的队列名,//考虑到这个队列要与文件交互,而我们只创建了queuename1和queuename2两个名字对应的文件,所以只能用这两个名字的一个。queue.setName(queueName1);queue.setDurable(true);queue.setExclusive(false);queue.setAutoDelete(false);HashMap<String, Object> hashMap = new HashMap<>();hashMap.put("aaa", "111");hashMap.put("bbb", "222");queue.setArguments(hashMap);return queue;}private Message createMessage(String context){//此时能用到我们之前在message里写的创建 message的工厂类了:Message message = Message.createMessageWithId("testRoutingKey",null,context.getBytes());return message;}@Testpublic void testSendMessage() throws IOException, MqException, ClassNotFoundException {//先创建队列与消息:MESGQueue queue = createQueue();Message message = createMessage("abcdefghijklmnopqrstuvwxyz");//发送消息:messageFileManager.sendMessage(queue,message);//验证stat文件:MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);Assertions.assertEquals(1,stat.totalCount);Assertions.assertEquals(1,stat.validCount);//验证data文件:使用loadAllMessagesFromQueueDataFile读取文件内容:LinkedList<Message> messages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//验证:Assertions.assertEquals(1,messages.size());Message message1 = messages.get(0);//判断这个message1和我们之前的消息message是否一样:Assertions.assertEquals(message.getMessageId(),message1.getMessageId());Assertions.assertEquals(message.getRoutingKey(),message1.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(),message1.getDeliverMode());Assertions.assertArrayEquals(message.getBody(),message1.getBody());System.out.println("[testSendMessage] 测试成功!!!");}//虽然上一个testSendMessage 已经间接测试过这个方法,但是为了求稳,再测试一遍@Testpublic void testLoadAllMessagesFromQueueDataFile() throws IOException, MqException, ClassNotFoundException {//我们需要准备200条数据用来加载://先创建一个队列用来存放消息:注意这个方法使用的是queueName1创建的队列MESGQueue queue = createQueue();//先创建一个链表用来保存消息,和后面新加载的消息作对比:LinkedList<Message> expectedMessages = new LinkedList<>();//使用for循环创建消息:for(int i =0;i<200;i++){Message message = createMessage("testMessage"+i);//将消息写入文件:messageFileManager.sendMessage(queue,message);//记录消息:expectedMessages.add(message);}//调用loadAllMessagesFromQueueDataFile取出所有消息:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先验证队列的数目是否一致:Assertions.assertEquals(200,realMessages.size());//验证基本属性:for(int i= 0;i<realMessages.size();i++){Message realMessage = realMessages.get(i);Message expectMessage = expectedMessages.get(i);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}System.out.println("[testLoadAllMessagesFromQueueDataFile] 测试成功!!!");}//测试删除消息:@Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {//先创建一个队列:MESGQueue queue = createQueue();//创建一个链表用来保存预期消息:LinkedList<Message> expectedMessages = new LinkedList<>();//再将20条消息都写入队列:for(int i =0;i<20;i++){Message message = createMessage("testMessage"+i);//将消息写入队列文件:messageFileManager.sendMessage(queue,message);//记录消息:expectedMessages.add(message);}//这里 就以删除前三条消息为例:messageFileManager.deleteMessage(queue,expectedMessages.get(0));messageFileManager.deleteMessage(queue,expectedMessages.get(1));messageFileManager.deleteMessage(queue,expectedMessages.get(2));//读出消息,对比:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先判断个数:Assertions.assertEquals(17,realMessages.size());for(int i =3;i<20;i++){Message realMessage = realMessages.get(i-3);Message expectMessage = expectedMessages.get(i);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}System.out.println("[testDeleteMessage] 测试成功!!!");}//测试GC,这里的GC其实只是测试,不用管消息总数是否大于2000或有效消息占比不到50%,因为那是业务上的判定,会有专门的类来进一步封装//而此处我们只进行测试GC这个方法//计划将100条消息都存入队列,然后将奇数下标的消息都删除,然后执行GC,验证现在的文件是否比原来的文件小:@Testpublic void testGC() throws IOException, MqException, ClassNotFoundException {//创建一个队列:MESGQueue queue = createQueue();//创建一个链表用来记录消息:LinkedList<Message> expectedMessages = new LinkedList<>();//先发送100条消息:for(int i =0;i<100;i++){Message message = createMessage("testMessage"+i);//发送到队列:messageFileManager.sendMessage(queue,message);//记录expectedMessages.add(message);}//删除奇数下标的消息:for(int i =1;i<100;i+=2){messageFileManager.deleteMessage(queue,expectedMessages.get(i));}//先记录执行GG之前的文件大小:File oldFile = new File("./data/"+queueName1+"/queue_data.txt");long oldFileLength = oldFile.length();//执行GCmessageFileManager.gc(queue);//记录执行完GC之后的文件大小:File newFile = new File("./data/"+queueName1+"/queue_data.txt");long newFileLength = newFile.length();//取出真实的消息:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先验证消息数量是否对的上:Assertions.assertEquals(50,realMessages.size());//挨个验证消息:for(int i = 0;i<50;i++){Message realMessage = realMessages.get(i);Message expectMessage = expectedMessages.get(i*2);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}//验证文件大小://这个验证的原理其实是://删除一个文件并不是直接删除,而是逻辑删除,通过标记统计文件里的vail来标识的,此时的数据文件即使有很多无效的文件,但是他的大小依旧是total//而非vail有效文件的大小。但是如果进行了文件的GC迁移,此时的新文件的大小就是旧文件的vail有效文件的大小了。所以,新文件会小于旧文件的大小。System.out.println("[oldFileLength]:"+oldFileLength);System.out.println("[newFileLength]:"+newFileLength);Assertions.assertTrue(newFileLength<oldFileLength);System.out.println("[testGC] 测试成功!!!");}

测试结果:没问题

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/6567.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【STM32HAL-----GPIO】

1. 什么是GPIO&#xff1f;&#xff08;了解&#xff09; 2. STM32 GPIO简介 2.1. GPIO特点 2.2. GPIO电气特性 2.3. GPIO引脚分布图 IO引脚分布特点&#xff1a;按组存在、组数视芯片而定、每组最多16个IO引脚。 3. IO端口基本结构介绍 4. GPIO八种工作模式 4.1. 输入浮空 特…

Midjourney基础-常用修饰词+权重的用法大全

用好修饰词很关键 Midjourney要用除了掌握好提示词的写法&#xff0c;按照上一篇《做Midjourney最好图文教程-提示词公式以及高级参数讲解》画面主体 场景氛围 主体行为 构图方式 艺术风格 图像质量。 要画出有质感的内容我们必须要掌握好“修饰词”&#xff0c;这些修饰…

二叉树和堆

树概念及结构&#xff08;了解&#xff09; 树的概念&#xff08;看看就行&#xff09; 树是一种 非线性 的数据结构&#xff0c;它是由 n &#xff08; n>0 &#xff09;个有限结点组成一个具有层次关系的集合。 把它叫做树是因 为它看起来像一棵倒挂的树&#xff0c;也就是…

C语言 指针_野指针 指针运算

野指针&#xff1a; 概念&#xff1a;野指针就是指针指向的位置是不可知的&#xff08;随机的、不正确的、没有明确限制的&#xff09; 指针非法访问&#xff1a; int main() {int* p;//p没有初始化&#xff0c;就意味着没有明确的指向//一个局部变量不初始化&#xff0c;放…

腾讯 Hunyuan3D-2: 高分辨率3D 资产生成

腾讯 Hunyuan3D-2&#xff1a;高分辨率 3D 资产生成的突破 前言 在当今数字化时代&#xff0c;3D 资产生成技术正变得越来越重要。无论是游戏开发、影视制作还是虚拟现实领域&#xff0c;高质量的 3D 模型和纹理都是创造沉浸式体验的关键。然而&#xff0c;传统的 3D 资产制作…

Java如何实现反转义

Java如何实现反转义 前提 最近做的一个需求&#xff0c;是热搜词增加换一批的功能。功能做完自测后&#xff0c;交给了测试伙伴&#xff0c;但是测试第二天后就提了一个bug&#xff0c;出现了未知词 levis。第一眼看着像公司售卖的一个品牌-李维斯。然后再扒前人写的代码&…

Java 高级工程师面试高频题:JVM+Redis+ 并发 + 算法 + 框架

前言 在过 2 个月即将进入 3 月了&#xff0c;然而面对今年的大环境而言&#xff0c;跳槽成功的难度比往年高了很多&#xff0c;很明显的感受就是&#xff1a;对于今年的 java 开发朋友跳槽面试&#xff0c;无论一面还是二面&#xff0c;都开始考验一个 Java 程序员的技术功底…

后端:MyBatis

文章目录 1. MyBatis1-1. Mybatis 工具类的封装1-2. Mybatis 通过集合或实体类传递参数-实现插入数据(增)1-3. MyBatis 实现删除数据(删)1-4. MyBatis 实现修改数据(改)1-5. MyBatis 实现查询数据(查) 2. MyBatis 配置文件中的一些标签和属性2-1.environments标签2-2. dataSour…

安卓14自由窗口圆角处理之绘制圆角轮廓线

背景&#xff1a; 前面文章已经分享过&#xff1a; 如何一行代码搞定自由窗口的圆角处理&#xff1f;-wms/自由窗口/sf实战开发 但是又有学员朋友提出另一个blog的成果&#xff1a; 安卓aosp14上自由窗口划线边框Freeform Caption实战开发-千里马framework实战 想要把划线和…

【Unity3D】3D物体摆放、场景优化案例Demo

目录 PlaceManager.cs(放置管理类) Ground.cs(地板类) 和 GroundData.cs(地板数据类) 额外知识点说明 1、MeshFilter和MeshRenderer的Bounds区别 2、Gizmos 绘制一个平行于斜面的立方体 通过网盘分享的文件&#xff1a;PlaceGameDemo2.unitypackage 链接: https://pan.baid…

高效沟通驱动LabVIEW项目成功

在LabVIEW项目开发中&#xff0c;由于涉及软件、硬件及多方协作&#xff0c;项目沟通效率的高低直接影响开发进度与最终质量。不明确的需求、信息传递中的误解以及跨部门协作的阻碍&#xff0c;常导致项目延误甚至失败。因此&#xff0c;建立高效的沟通机制&#xff0c;确保信息…

信息收集(下)

一.端口信息收集 1.端口基础认知 A.端口简介 在 Internet 环境中&#xff0c;各主机依据 TCP/IP 协议实现数据包的发送与接收。数据包凭借目的主机的 IP 地址&#xff0c;在互联网络里完成路由选择&#xff0c;进而精准地传至目标主机。然而&#xff0c;当目的主机同时运行多…

springBoot 整合ModBus TCP

ModBus是什么&#xff1a; ModBus是一种串行通信协议&#xff0c;主要用于从仪器和控制设备传输信号到主控制器或数据采集系统&#xff0c;例如用于测量温度和湿度并将结果传输到计算机的系统。&#xff08;百度答案&#xff09; ModBus 有些什么东西&#xff1a; ModBus其分…

composer安装指定php版本, 忽略平台原因导致的报错

windows下 //composer安装指定php版本, 写出完整的php和composer.phar路径 D:\phpstudy_pro\Extensions\php\php8.1.11nts\php.exe D:\phpstudy_pro\Extensions\composer1.8.5\composer.phar install windows下一些扩展不支持, 如下图, 所以本地composer安装组件时可以忽略 …

【论文投稿】Python 网络爬虫:探秘网页数据抓取的奇妙世界

目录 前言 一、Python—— 网络爬虫的绝佳拍档 二、网络爬虫基础&#xff1a;揭开神秘面纱 &#xff08;一&#xff09;工作原理&#xff1a;步步为营的数据狩猎 &#xff08;二&#xff09;分类&#xff1a;各显神通的爬虫家族 三、Python 网络爬虫核心库深度剖析 &…

大模型应用与部署 技术方案

大模型应用与部署 技术方案 一、引言 人工智能蓬勃发展,Qwen 大模型在自然语言处理领域地位关键,其架构优势尽显,能处理文本创作等多类复杂任务,提供优质交互。Milvus 向量数据库则是向量数据存储检索利器,有高效索引算法(如 IVF_FLAT、HNSWLIB 等)助力大规模数据集相似…

Postman接口测试工具详解

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;还请三连支持一波哇ヾ(&#xff20;^∇^&#xff20;)ノ&#xff09; 目录 引言 Postman简介 Postman的特点 Postman的下载与安装 Postman…

电路研究9.2——合宙Air780EP使用AT指令

这里正式研究AT指令的学习了&#xff0c;之前只是接触的AT指令&#xff0c;这里则是深入分析AT指令了。 软件的开发方式&#xff1a; AT&#xff1a;MCU 做主控&#xff0c;MCU 发 AT 命令给模组的开发方式&#xff0c;模组仅提供标准的 AT 固件&#xff0c; 所有的业务控制逻辑…

百度APP iOS端磁盘优化实践(上)

01 概览 在APP的开发中&#xff0c;磁盘管理已成为不可忽视的部分。随着功能的复杂化和数据量的快速增长&#xff0c;如何高效管理磁盘空间直接关系到用户体验和APP性能。本文将结合磁盘管理的实践经验&#xff0c;详细介绍iOS沙盒环境下的文件存储规范&#xff0c;探讨业务缓…

Sharding-JDBC 5.4.1+SpringBoot3.4.1+MySQL8.4.1 使用案例

最近在升级 SpringBoot 项目&#xff0c;原版本是 2.7.16&#xff0c;要升级到 3.4.0 &#xff0c;JDK 版本要从 JDK8 升级 JDK21&#xff0c;原项目中使用了 Sharding-JDBC&#xff0c;版本 4.0.0-RC1&#xff0c;在升级 SpringBoot 版本到 3.4.0 之后&#xff0c;服务启动失败…