【项目篇】仿照RabbitMQ模拟实现消息队列
大家好呀
我是浪前
项目篇:仿照RabbitMQ模拟实现消息队列
今天是项目的第一篇,我们先来创建出最核心的几个类。
仿照RabbitMQ模拟实现消息队列
- 创建Exchange类
- MessageQueue类
- Binding类
- Message类
- 1:BasicProperties类
- 2:正文部分
- 3:辅助用的一些属性:
开始之前,先给大家来一句哲理,给大家加加油~
如果你信, 有一天会要站到高处。
那现在的得与失,就不重要。 不至于有这么多的愤怒,不满,怀疑,怨念。 因为事后, 那都是小事,小钱,路人,故人。
没有敌人,大事,坏事,不可承受的损失。
如果你在乎。 那只可能是: 你不相信自己有一天会登顶, 会举世无双, 一览众山小。
我们是要创建一个SpringBoot项目:
这个项目里面包含了三个模块:
- 公共模块
- 客户端模块
- 服务器模块
目前我们先将brokerServer中的核心的概念表示出来:
- 交换机(exchange)
- 队列(queue)
- 绑定(binding)
- 消息(message)
上面的概念都是存储在brokerServer中的,所以对于客户端来说,客户端并不关心服务器有哪些交换机,有哪些队列,客户端只需要把当前的这些数据给发送到服务器(brokerServer),然后再经过brokerServer进行转发,所以上述的概念全部都放到mqserver这个包里面即可
所以第一步:
根据三个模块创建出三个包
第二步:
服务器的核心模块下,根据核心概念创建出这几个类,
下面这张图是交换机,队列,绑定需要写的所有属性:
下面这一张图是消息类需要撰写的所有属性:
下面我们一个一个地展开:
创建Exchange类
交换机里面有什么属性:
1: 交换机的身份标识(name)
//此处使用name来作为交换机的身份标识(唯一的)private String name;
2: 交换机的类型(枚举表示)
//交换机类型:DIRECT, FANOUT, TOPIC:private ExchangeType Type = ExchangeType.DIRECT;
枚举代码:这个枚举代码枚举了交换机的三种类型:
package org.example.mq.mqserver.core; //枚举类型: public enum ExchangeType { DIRECT(0), FANOUT(1), TOPIC(2); //属性type private final int type; //构造方法: private ExchangeType(int type){ this.type = type; } public int getType(){ return type; }
}
3: 交换机是否要进行持久化存储
//该交换机是否要持久化存储? true表示要持久化存储, false表示不必持久化
private boolean durable = false; //默认先是false
有些交换机,队列,绑定是需要进行持久化存储的,但是有些交换机和队列绑定是不需要的:
那么有了这个是否要存储的属性之后,用户在使用的时候,就可以通过开关(boolean值)来决定要不要进行持久化
4:属性:自动删除:如果当前交换机没有生产者使用了,那么就会这个交换机就会自动删除(RabbitMQ):
这个属性先列在这里,后续可以自己去自行扩展:
private boolean autoDelete = false;
5: 属性:arguments表示的是创建交换机时指定的一些额外的参数选项(后续未实现,可自行扩展):
private Map<String,Object> arguments = new HashMap<>();
什么叫做选项?
arguments可以称之为选项,也就是可选,可不选
可以有,也可以没有,通过这个选项来开启不同的功能:
女朋友可盐可甜,今天选择小盐模式的女友,明天选择一个小甜模式女友,我们就可以通过不同的风格来开启不同的选项,通故不同的选项来开启不同的功能
上述的就是大致的交换机的属性:
下面我们加上属性的Setter方法和Getter方法即可:
package org.example.mq.mqserver.core;
/*
这个类表示是交换机: */
import java.util.HashMap;
import java.util.Map; public class Exchange { //此处使用name来作为交换机的身份标识(唯一的) private String name; //交换机类型:DIRECT, FANOUT, TOPIC: private ExchangeType Type = ExchangeType.DIRECT; //该交换机是否要持久化存储? true表示要持久化存储, false表示不必持久化 private boolean durable = false; //默认先是false //交换机没人使用的,那就自动进行删除操作 private boolean autoDelete = false; //创建交换机的时候,指定一些额外的参数选项: private Map<String,Object> arguments = new HashMap<>(); public String getName() { return name; } public void setName(String name) { this.name = name; } public ExchangeType getType() { return Type; } public void setType(ExchangeType type) { Type = type; } public boolean isDurable() { return durable; } public void setDurable(boolean durable) { this.durable = durable; } public boolean isAutoDelete() { return autoDelete; } public void setAutoDelete(boolean autoDelete) { this.autoDelete = autoDelete; } public Map<String, Object> getArguments() { return arguments; } public void setArguments(Map<String, Object> arguments) { this.arguments = arguments; }
}
MessageQueue类
这个类是一个存储消息的的队列
拥有的属性一共如下所示:
1: 队列的身份标识:
private String name;
2:队列是否持久化: true表示持久化,false表示不持久化
private boolean durable;
3:exclusive(独有的,专有的):这个属性为true表示这个队列只能被一个消费者使用,别人用不了:
private boolean exclusive = false;
4:如果队列没有消费者使用,那就进行自动删除:true表示会自动删除,false表示不会自动删除
private boolean autoDelete = false;
5:选项功能:表示扩展参数,列在这里,暂时不实现:
private Map<String, Object> arguments = new HashMap<>();
虽然这些功能没有实现,但是把对应的字段都给列出来,
因为这些字段都是RabbitMQ支持的字段,在RabbitMQ中都有实现,我们这里也就顺便交代一下,一方面可以熟悉RabbitMQ的使用,一方面可以自己对项目进行扩展
之后我们加上Getter和Setter方法之后,这个MSGQueue类就创建完毕了:
所以这个存储消息的队列的属性代码如下所示:
package org.example.mq.mqserver.core; import java.util.HashMap;
import java.util.Map; /*
这个类表示是一个存储消息的队列
MSG -> Message: */public class MSGQueue { //队列的身份标识: private String name; //队列是否持久化: true表示持久化,false表示不持久化 private boolean durable; //这个属性为true表示这个队列只能被一个消费者使用,别人用不了: private boolean exclusive = false; //如果队列没有消费者使用,那就进行自动删除:true表示会自动删除,false表示不会自动删除 private boolean autoDelete = false; //选项功能:表示扩展参数,列在这里,暂时不实现: private Map<String, Object> arguments = new HashMap<>(); public String getName() { return name; } public void setName(String name) { this.name = name; } public boolean isDurable() { return durable; } public void setDurable(boolean durable) { this.durable = durable; } public boolean isExclusive() { return exclusive; } public void setExclusive(boolean exclusive) { this.exclusive = exclusive; } public boolean isAutoDelete() { return autoDelete; } public void setAutoDelete(boolean autoDelete) { this.autoDelete = autoDelete; } public Map<String, Object> getArguments() { return arguments; } public void setArguments(Map<String, Object> arguments) { this.arguments = arguments; }
}
项目为什么要进行扩展,就只实现代码行不行?
我们的项目在精不在多,即使只有一个项目,但是这个项目功能很丰富,内容很全面,技术点很密集,代码量很庞大,项目很健硕,自己很熟悉了解所有的细节,难点,也是很厉害的,你一个项目顶得上别人的两个三个项目
如果你的项目和别人的冲突了,你的项目没有扩展,那么你大概率就危险了
反之,你如果做了5个项目,听起来很高大上很牛逼的框架项目,但是你一个项目细节都不了解,没有吃透,那么你大概率也是很危险的
Binding类
Binding这个类用来表示队列和交换机之间的关联关系:
既然这个类表示队列和交换机之间的关联关系,那么这个类就是既和交换机有关,同时也和队列有关:
下面列出Binding的属性:
1: 写出队列和交换机这两个的身份标识:
private String exchangeName;
private String queueName;
有了上面这身份标识之后,我们就知道是哪一个交换机和哪一个队列之间有关联关系了
如果一个交换机和多个队列有关联,那就new 出多个Binding对象就可以了
2:bindingKey:就是在出题,要求领红包的人要画出一个“兔子”才可以领到红包:
bindingKey就是要出的题目。
private String bindingKey;
注意Binding这个类,是依附于Exchange和Queue的
比如:对于持久化来说,如果Exchange和Queue任何一个都没有进行持久化,那么此时针对这个Binding来进行持久化是没有意义的
之后我们加上Getter和Setter方法,此时Binding类就创建完毕了:
package org.example.mq.mqserver.core; /*
这个类表示队列和交换机之间的关联关系 */public class Binding { // 写出队列和交换机这两个的身份标识: private String exchangeName; private String queueName; //bindingKey:就是在出题,要求领红包的人要画出一个“兔子”才可以领到红包: private String bindingKey; public String getExchangeName() { return exchangeName; } public void setExchangeName(String exchangeName) { this.exchangeName = exchangeName; } public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } public String getBindingKey() { return bindingKey; } public void setBindingKey(String bindingKey) { this.bindingKey = bindingKey; }
}
Message类
Message类就是表示一个要传递的消息:
所以一个消息是怎么构成的呢?
一个Message主要包含三个部分:
1: 属性部分: 使用一个类BasicProperties
2: 正文部分:字节数组
3: 辅助用的一些属性
下面这张图是Message类的所有属性:
1:BasicProperties类
我们先来实现Message类的第一个部分:
BasicProperties类:
注意这个BasicProperties类也是需要我们自己去单独创建出来的:
下面我们先来介绍一下BasicProperties类中的属性:
1: 消息的唯一身份标识:messageId:此时为了保证唯一性,我们是使用了UUID来作为messageId
什么是UUID呢,UUID就是我们编程中的一个专门用来生成唯一id的算法:每调用一次UUID,所生成的id都是唯一的
private String messageId;
2:routingKey是一个消息上带有的内容,和bindingKey做匹配的暗号:
private String routingKey;
如果当前的交换机的类型是DIRECT,此时routingKey就表示要转发的队列名字
如果当前交换机的类型是FANOUT,此时routitngKey就是无意义的(不使用)
如果当前交换机的类型是TOPIC,此时routingKey就要和bindingKey做匹配,符合要求的才能够转发给对应的队列
3:deliverMode就是表示消息是否要进行持久化,1就表示这个消息不持久化,2就表示消息要持久化
private int deliverMode = 1;
其实针对RabbitMQ来说,BasicProperties里面还有很多别的属性,我们这里就暂时先不去实现了
最后给BasicProperties类加上Getter和Setter方法,这个BasicProperties类就定义好了:
package org.example.mq.mqserver.core; public class BasicProperties { //1.消息的身份标识:为了保证唯一性,我们使用UUID来作为messageId private String messageId; //2.routingKey是一个消息上带有的内容,和bindingKey做匹配的暗号 private String routingKey; //3.deliverMode就是表示消息是否要进行持久化,1就表示这个消息不持久化,2就表示消息要持久化 private int deliverMode = 1; public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } public String getRoutingKey() { return routingKey; } public void setRoutingKey(String routingKey) { this.routingKey = routingKey; } public int getDeliverMode() { return deliverMode; } public void setDeliverMode(int deliverMode) { this.deliverMode = deliverMode; }
}
2:正文部分
接着我们去实现Message类的第二个部分:
正文部分:
2: 正文部分: byte【】
我们这个正文部分使用的是字节数组,说明正文部分是可以支持二进制数据的,如果是String数组的话,是只能表示文本的,无法表示二进制数据
private byte[] body;
3:辅助用的一些属性:
Message后续如果持久化之后,会被存储到文件中去,不是存储在数据库中的,
而且一个文件中会存储很多的消息,如何找到某个消息就是通过下面的两个偏移量在文件中找到消息的,同时采取前闭后开的区间:【offsetBeg,offsetEnd)
private long offsetBeg = 0; //消息数据的开头距离文件开头的位置偏移(字节)
private long offsetEnd = 0; //消息数据的结尾距离文件开头的位置偏移(字节)
使用下面这个辅助属性表示该消息在文件中是否是有效消息:
isValid 意思就是是否合法,是否有效:
针对文件中的消息,如果删除,就使用逻辑删除的方式进行删除操作:
0x1表示有效,0x0表示无效:
private byte isValid = 0x1;
什么是逻辑删除?
当我们删除数据库,删除硬盘文件,删除数据的时候,很多时候都是使用的是“逻辑删除”的操作:
不是真的直接把这一块硬盘上的数据删除了,而是把这块硬盘上的数据标记为了无效,此时这块硬盘上的空间仍然可以去使用,此处我们也是采取逻辑删除的方式去针对文件中的消息进行删除
接着我们在Message类中自行实现一些好用的方法:
1: 快速拿到和设置Message的messageId:
//调用这个方法之后,立即返回Message的id:
public String getMessageId(){ return basicProperties.getMessageId();
}//设置Message的MessageId:
public void setMessageId(String messageId){ basicProperties.setMessageId(messageId);
}
下面是获取到RoutingKey和设置RoutingKey:
//设置和获取到RoutingKey:
public String getRoutingKey(){ return basicProperties.getRoutingKey();
}
//设置RoutingKey:
public void setRoutingKey(String routingKey){ basicProperties.setRoutingKey(routingKey);
}
下面是是否要进行持久化的方法:
//下面是设置了是否要进行持久化:
//得到是否持久化的消息:
public int getDeliverMode(){ return basicProperties.getDeliverMode();
} //下面是设置了是否要持久化:
public void setDeliverMode(int mode){ basicProperties.setDeliverMode(mode);
}
以上就是Message类的所有方法和属性,由于这个类的方法有些多了,想要直观地体现出来,有点麻烦,
所以我们就可以使用一个工厂方法,让工厂方法帮我们封装一下创建Message对象的过程
这个工厂方法中创建的Message对象,会自动生成一个唯一的MessageId
所以下面这个方法的目的是为了构造出一个Message对象
//创建一个工厂方法, 让工厂方法帮助我们封装一下创建Message对象的过程
//这个方法中创建的Message对象,会自动生成一个唯一的MessageId
public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body){ Message message = new Message(); if(basicProperties != null ){ //说明basicProperties是有值的, message.setBasicProperties(basicProperties); } //更新了routingKey: message.setRoutingKey(routingKey); //更新messageId,生成一个随机的id并转为字符串:(M作为前缀) message.setMessageId("M-"+UUID.randomUUID()); //更新消息的正文: message.setBody(body); return message; }
为什么这个工厂方法中创建对象的时候,遗漏了offsetBeg和offsetEnd还有isValid呢?
因为这三个属性都是在消息进行持久化的时候才会使用的到的消息,在把消息写入到文件中之前再来设定这三个属性,而此处的工厂方法,只是在内存中创建了一个Message对象。
但是要注意:上面的Message对象,是需要能够在网络上进行传输,这个消息是需要进行网络传输的,并且这个消息也能够写入到文件当中去,此时就需要对Message进行序列化和反序列化,此时在我们的项目中,我们就直接使用标准库中自带的方式去完成这个消息的序列化和反序列化了
实现这个序列化的前提是要先去给每一个需要序列化的类实现一个叫做Serializable的接口:
public class Message implements Serializable {
我们只需要实现这个接口就可以了,并不需要去重写接口里面的方法。
同时也需要给BasicProperties这个类也加上这个接口去一起实现序列化:
public class BasicProperties implements Serializable {
那么我们不使用json的方式去完成序列化和反序列化呢?
因为json本质上是文本格式,这个json里面存放的是文本类型的数据,而我们此处的Message存储的是二进制数据,这个二进制数据不能使用文本来表示,所以不可以使用json的方式来进行
虽然刚刚给整个Message类都进行了序列化处理,但是有两个属性是不需要被序列化保存到文件中的,因为当消息被写入到文件中之后,所在的位置就固定了,并不需要单独存储,这两个属性存在的目的主要是为了让内存中的Message对象能够快速找到对应的硬盘上的Message的位置,所以给这两个属性加上transient这个关键字就不会被序列化了:
private transient long offsetBeg = 0;
private transient long offsetEnd = 0;
到此为止,我们的Message类就算是编写完毕啦~,具体完整的Message类的代码如下所示:
package org.example.mq.mqserver.core; import java.io.Serial;
import java.io.Serializable;
import java.util.UUID; /*
表示一个要传递的消息 */public class Message implements Serializable { //1.属性部分,使用一个名叫BasicProperties的类: private BasicProperties basicProperties = new BasicProperties(); //2.正文部分:使用字节数组(可以表示二进制数据) private byte[] body; //辅助属性如下: private transient long offsetBeg = 0; //消息数据的开头距离文件开头的位置偏移(字节) private transient long offsetEnd = 0; //消息数据的结尾距离文件开头的位置偏移(字节) //使用下面这个辅助属性表示该消息在文件中是否是有效消息: //0x1表示有效,0x0表示无效: private byte isValid = 0x1; //创建一个工厂方法, 让工厂方法帮助我们封装一下创建Message对象的过程 //这个方法中创建的Message对象,会自动生成一个唯一的MessageId public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body){ Message message = new Message(); if(basicProperties != null ){ //说明basicProperties是有值的, message.setBasicProperties(basicProperties); } //更新了routingKey: message.setRoutingKey(routingKey); //更新messageId,生成一个随机的id并转为字符串:(M作为前缀) message.setMessageId("M-"+UUID.randomUUID()); //更新消息的正文: message.setBody(body); return message; } //调用这个方法之后,立即返回Message的id: public String getMessageId(){ return basicProperties.getMessageId(); } //设置Message的MessageId: public void setMessageId(String messageId){ basicProperties.setMessageId(messageId); } //设置和获取到RoutingKey: public String getRoutingKey(){ return basicProperties.getRoutingKey(); } //设置RoutingKey: public void setRoutingKey(String routingKey){ basicProperties.setRoutingKey(routingKey); } //下面是设置了是否要进行持久化: //得到是否持久化的消息: public int getDeliverMode(){ return basicProperties.getDeliverMode(); } //下面是设置了是否要持久化: public void setDeliverMode(int mode){ basicProperties.setDeliverMode(mode); } public BasicProperties getBasicProperties() { return basicProperties; } public void setBasicProperties(BasicProperties basicProperties) { this.basicProperties = basicProperties; } public byte[] getBody() { return body; } public void setBody(byte[] body) { this.body = body; } public long getOffsetBeg() { return offsetBeg; } public void setOffsetBeg(long offsetBeg) { this.offsetBeg = offsetBeg; } public long getOffsetEnd() { return offsetEnd; } public void setOffsetEnd(long offsetEnd) { this.offsetEnd = offsetEnd; } public byte getIsValid() { return isValid; } public void setIsValid(byte isValid) { this.isValid = isValid; }
}