【项目篇之垃圾回收】仿照RabbitMQ模拟实现消息队列
实现垃圾回收
- 消息垃圾回收
- 为什么要去实现垃圾回收
- 如何实现这个垃圾回收?
- 编写代码
- 编写触发垃圾回收的条件
- 触发垃圾回收的条件
- 约定新文件所在的位置
- 实现垃圾回收的算法(重点)
- 总结
消息垃圾回收
为什么要去实现垃圾回收
由于当前会不停地往消息文件中写入新消息,并且删除消息只是逻辑删除,这就会导致消息文件越来越大,并且里面还包含了很多的无效消息,所以就需要对消息进行垃圾回收的操作
如何实现这个垃圾回收?
我们项目中实现垃圾回收,使用的是复制算法:
编写代码
编写触发垃圾回收的条件
写代码就是未言胜,先言败,写代码先不要去想成功了会怎么样,先去优先思考代码如果出现了问题应该怎么办,如何去处理这个问题和异常
所有的代码加上注释
触发垃圾回收的条件
这个方法也就是检测是否要针对这个队列进行垃圾回收
如果总消息数量大于等于2000个同时有效消息数量少于总数量的50%,如果队列同时满足了这两个条件就需要进行垃圾回收,如果队列没有同时满足,就不进行垃圾回收:
//触发垃圾回收的条件:
//检查当前是否要针对该队列进行垃圾回收
public boolean checkGC(String queueName){ //判定是否要GC,是根据中消息数量和有效消息数量,这两个值都是在消息统计文件中的 //所以就需要先去读取消息统计文件 Stat stat = new Stat(); if(stat.totalCount >= 2000 && (double) (stat.validCount / stat.totalCount) <= 0.5){ return true; } return false;
}
约定新文件所在的位置
//约定新文件所在的位置
private String getQueueDataNewPath(String queueName){ return getQueueDir(queueName) + "/queue_data_new.txt";
}
实现垃圾回收的算法(重点)
垃圾回收算法使用复制算法来完成,这个方法的大体逻辑步骤如下所示:
- 对整个垃圾回收的实现代码进行加锁操作
- 创建一个新的文件,名字就是queue_data_new.txt
- 把之前消息数据文件中的有效消息都读出来,写到新文件中
- 删除旧的文件,再把新的文件改名回queue_data.txt
- 同时要更新消息统计文件
具体的实现代码如下所示:
//这个方法是实现垃圾回收的算法
//这个方法是真正执行消息数据文件的垃圾回收操作
//使用复制算法来完成
//创建一个新的文件,名字就是queue_data_new.txt
//把之前消息数据文件中的有效消息都读出来,写到新文件中
//删除旧的文件,再把新的文件改名回queue_data.txt
//同时要记得更新消息统计文件
public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException { //为什么要进行加锁操作 //进行GC的时候是针对消息数据文件进行大洗牌,在这个过程中,其他线程不能针对该队列的消息文件做任何修改 //因为这个垃圾回收是一个比较耗时的操作,文件中的垃圾数量是未知的,可能很多,所以耗时长 //所以要统计一下垃圾回收消耗的时间 synchronized (queue){ //统计回收时间 long gcBeg = System.currentTimeMillis(); //1. 第一步,创建一个新的文件 File queueDataNewFile = new File(getQueueDataNewPath(queue.getName())); if(queueDataNewFile.exists()){ //正常情况下,这个文件不应该存在,如果存在,说明上一轮GC出了bug,上一轮GC到一半程序就意外崩溃了 throw new MqException("[MessageFileManager] GC的时候发现该队列的queue_data_new已经存在,queueName=" + queue.getName()) } //开始创建这个新文件 boolean ok = queueDataNewFile.createNewFile(); //如果创建失败了,继续抛出异常 if(!ok){ throw new MqException("[MessageFileManager] 创建文件失败,queueDataNewFile=" + queueDataNewFile.getAbsolutePath()); } //2.从旧的文件中读取出所有的有效消息对象,直接调用那个读取消息的方法即可 LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName()); //3. 把有效消息写入到新的文件中 try(OutputStream outputStream = new FileOutputStream(queueDataNewFile)){ try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){ for(Message message : messages){ byte[] buffer = BinaryTool.toBytes(message); //先写四个字节的长度 dataOutputStream.writeInt(buffer.length); dataOutputStream.write(buffer); } } } //4. 删除旧的数据文件,并且把新的文件给重命名 File queueDataOldFile = new File(getDataPath(queue.getName())); ok = queueDataOldFile.delete(); if(!ok){ throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath()); } //5.把新的文件名字重命名为旧的文件名字 ok = queueDataNewFile.renameTo(queueDataOldFile); if(!ok){ throw new MqException("[MessageFileManager] 危文件重命名失败!queueDataNewFile="+queueDataNewFile.getAbsolutePath() +" , queueDataOldFile=" + queueDataOldFile.getAbsolutePath()); } //6.更新统计文件 Stat stat = readStat(queue.getName()); stat.totalCount = messages.size(); stat.validCount = messages.size(); writeStat(queue.getName(),stat); long gcEnd = System.currentTimeMillis(); System.out.println("[MessageFileManager] 垃圾回收执行完毕!queueName="+queue.getName() + ", time=" + (gcEnd - gcBeg) + "ms"); }
}
总结
以上就是整个队列的垃圾回收的算法实现
- 如何实现垃圾回收
- 触发垃圾回收的条件
- 约定新文件的位置
- 实现垃圾回收的算法(重点)
最后,所有实现代码如下所示:
//触发垃圾回收的条件:
//检查当前是否要针对该队列进行垃圾回收
public boolean checkGC(String queueName){ //判定是否要GC,是根据中消息数量和有效消息数量,这两个值都是在消息统计文件中的 //所以就需要先去读取消息统计文件 Stat stat = new Stat(); if(stat.totalCount >= 2000 && (double) (stat.validCount / stat.totalCount) <= 0.5){ return true; } return false;
} //约定新文件所在的位置
private String getQueueDataNewPath(String queueName){ return getQueueDir(queueName) + "/queue_data_new.txt";
} //这个方法是实现垃圾回收的算法
//这个方法是真正执行消息数据文件的垃圾回收操作
//使用复制算法来完成
//创建一个新的文件,名字就是queue_data_new.txt
//把之前消息数据文件中的有效消息都读出来,写到新文件中
//删除旧的文件,再把新的文件改名回queue_data.txt
//同时要记得更新消息统计文件
public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException { //为什么要进行加锁操作 //进行GC的时候是针对消息数据文件进行大洗牌,在这个过程中,其他线程不能针对该队列的消息文件做任何修改 //因为这个垃圾回收是一个比较耗时的操作,文件中的垃圾数量是未知的,可能很多,所以耗时长 //所以要统计一下垃圾回收消耗的时间 synchronized (queue){ //统计回收时间 long gcBeg = System.currentTimeMillis(); //1. 第一步,创建一个新的文件 File queueDataNewFile = new File(getQueueDataNewPath(queue.getName())); if(queueDataNewFile.exists()){ //正常情况下,这个文件不应该存在,如果存在,说明上一轮GC出了bug,上一轮GC到一半程序就意外崩溃了 throw new MqException("[MessageFileManager] GC的时候发现该队列的queue_data_new已经存在,queueName=" + queue.getName()) } //开始创建这个新文件 boolean ok = queueDataNewFile.createNewFile(); //如果创建失败了,继续抛出异常 if(!ok){ throw new MqException("[MessageFileManager] 创建文件失败,queueDataNewFile=" + queueDataNewFile.getAbsolutePath()); } //2.从旧的文件中读取出所有的有效消息对象,直接调用那个读取消息的方法即可 LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName()); //3. 把有效消息写入到新的文件中 try(OutputStream outputStream = new FileOutputStream(queueDataNewFile)){ try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){ for(Message message : messages){ byte[] buffer = BinaryTool.toBytes(message); //先写四个字节的长度 dataOutputStream.writeInt(buffer.length); dataOutputStream.write(buffer); } } } //4. 删除旧的数据文件,并且把新的文件给重命名 File queueDataOldFile = new File(getDataPath(queue.getName())); ok = queueDataOldFile.delete(); if(!ok){ throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath()); } //5.把新的文件名字重命名为旧的文件名字 ok = queueDataNewFile.renameTo(queueDataOldFile); if(!ok){ throw new MqException("[MessageFileManager] 危文件重命名失败!queueDataNewFile="+queueDataNewFile.getAbsolutePath() +" , queueDataOldFile=" + queueDataOldFile.getAbsolutePath()); } //6.更新统计文件 Stat stat = readStat(queue.getName()); stat.totalCount = messages.size(); stat.validCount = messages.size(); writeStat(queue.getName(),stat); long gcEnd = System.currentTimeMillis(); System.out.println("[MessageFileManager] 垃圾回收执行完毕!queueName="+queue.getName() + ", time=" + (gcEnd - gcBeg) + "ms"); }
}