Finish技术生态计划: FinishRpc
finishRpc
简介
纯个人兴趣设计的项目:
因为失业在家摆烂 所以没事就想写点代码 本身也比较喜欢自己写一些好玩的demo
这个项目的设计完全是取悦自己又菜又有一个创造框架的梦想
可以用于提升框架设计思路以及实践一些常用技术的练习
可以用于校园中的练习 , 如果能对你有所帮助 那最好
最终的目标是能够在小环境的微服务中成为一种简单. 好用 性能尚可的框架
后续可能在b站出教学视频
finishRpc 也可以理解成一种思路 它也是在我学习了数个rpc文章 视频以后 按照各个教学中我个人认为的优点加上自己寻思的东西设计出一个利好我开发思想的rpc 千人千面 你可以将自己先实践的技术加入其中 让它成为你的finish 共勉
**ps: 如果你想要跟着文章编写finishRpc 并且文档可能有些细节无法完全展现,有一定的技术门槛 需要了解微服务和网路通信知识 如果有问题还请多一些些包容 遇到不合理的地方可以尝试自己设计 **
文档不可能做到尽善尽美 建议搭配源码一起了解
gitee地址 :https://gitee.com/cold-abyss_admin/finishRpc
环境
- jdk 8 -17 都可以
- nacos2.5.x
- netty 4.1.51
需要实现功能
- 基于netty的网络通信
- 以心跳的方式维护一个不长不短刚好够用的连接
- 自定义编码器
- 使用future的方式异步的执行远程调用
- 以nacos作为服务中心 更加贴合常用的微服务环境 便于组合
- 基于nacos完成动态的服务注册与获取 以及未来的配置获取
- 基于nacos的任务状态实现以事件驱动的缓存
- 使用调用端令牌桶+客户端熔断+重试的方式来确保可靠性
- 提供轮询和哈希一致性的方式实现最基础的负载均衡
可能要做的:
- 链路追踪定位 基于skywalking
- 实现多个自定义注解来更好的springboot环境 提升使用效率
纯原生版本
我们在base版本 要做的时间其实很简单 :
- 实现基础的rpc 就是固定地址+ip 然后动态代理直接类对象的方法
- 加入nacos 基于nacos来完成服务的注册获取和调用
结构与依赖
结构
父项目: idea纯maven项目 创建三个模块
-
finishRpc-core 核心库作为rpc核心 这里会暂时作为简单的客户端和调用端来作为测试
-
finishRpc-api:模拟的微服务 需要远程调用的接口
依赖
finishRPC-core
的依赖库
<dependencies><dependency><groupId>org.reflections</groupId><artifactId>reflections</artifactId><version>0.10.2</version> <!-- 请检查是否有更新版本 --></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.10</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><!-- nacos--><dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-client</artifactId><version>2.5.1</version></dependency><!-- java轻量级高可用框架--><dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-circuitbreaker</artifactId><version>1.7.1</version></dependency><dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-retry</artifactId><version>1.7.1</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.51.Final</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>com.hyc</groupId><artifactId>finishRpc-api</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies>
接口提供
当前包结构:
实体类
package com.hyc.pojo;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Task implements Serializable {// 客户端和服务端共有的private Integer id;private String TaskName;private boolean complete;
}
接口
package com.hyc.service;import com.hyc.pojo.Task;public interface TaskService {Task getTaskById(Integer id);Integer insertTaskId(Task user);
}
实现类
package com.hyc.service;import com.hyc.pojo.Task;import java.util.Random;
import java.util.UUID;public class taskServiceImpl implements TaskService {@Overridepublic Task getTaskById(Integer id) {System.out.println("客户端查询了" + id + "的用户");// 模拟从数据库中取用户的行为Random random = new Random();Task user = Task.builder().TaskName(UUID.randomUUID().toString()).id(id).complete(random.nextBoolean()).build();return user;}@Overridepublic Integer insertTaskId(Task task) {System.out.println("插入数据成功" + task.getTaskName());return task.getId();}
}
核心库
常量
RPCConstant
package com.hyc.constant;/*** @author 冷环渊* @date 2025/4/15 15:49* @description RPCConstant*/
public class RPCConstant {public static final String RPC_GROUP_NAME = "RPC";public static final String CONFIG_FILE_PREFIX = "FinishRPC";public static final String NACOS_USERNAME = "nacos";public static final String NACOS_PASSWORD = "nacos";public static final String SERVER_ADDR = "127.0.0.1:8848";
}
请求响应规范
请求类
package com.hyc.Message;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcRequest implements Serializable {private String requestId;private MessageType type;//服务类名,客户端只知道接口private String interfaceName;//调用的方法名private String methodName;//参数列表private Object[] params;//参数类型private Class<?>[] paramsType;public static RpcRequest heartBeat() {return RpcRequest.builder().type(MessageType.HEARTBEAT).build();}
}
响应类
package com.hyc.Message;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcResponse implements Serializable {private String requestId;//状态信息private int code;private String message;//具体数据private Object data;//更新:加入传输数据的类型,以便在自定义序列化器中解析private Class<?> dataType;//构造成功信息public static RpcResponse success(Object data) {return RpcResponse.builder().code(200).data(data).build();}public static RpcResponse nettySuccess(String requestId, Object data, Class<?> dataType) {return RpcResponse.builder().requestId(requestId).code(200).data(data).dataType(dataType).build();}//构造失败信息public static RpcResponse fail() {return RpcResponse.builder().code(500).message("服务器发生错误").build();}public static RpcResponse fail(String msg) {return RpcResponse.builder().code(500).message(msg).build();}
}
消息类型
package com.hyc.Message;import lombok.AllArgsConstructor;/*** 用于对消息读取机制拓展** @author 冷环渊* date: 2025/3/23 22:18*/
@AllArgsConstructor
public enum MessageType {REQUEST(0), RESPONSE(1), HEARTBEAT(2);private int code;public int getCode() {return code;}
}
编解码器
这里这么设计的原因是 想提供多种序列化器的 目前暂时用json和最原始的序列化方式 之后可能会拓展 更好的序列化格式
没有那么复杂的编解码需求 开发中希望能在编解码时多一些自定义校验 所以自己编写相对简单的编解码器
当前包位置:
序列化接口
package com.hyc.Serializer;import java.util.HashMap;
import java.util.Map;/*** 用于提供对象的序列化功能* 通过静态工厂方法根据类型返回具体的序列化器实例* @author 冷环渊* date: 2025/3/23 22:21*/
public interface Serializer {
// 把对象序列化成字节数组byte[] serialize(Object obj);
/*
* 将一个字节数组反序列化为一个Java对象
* 如果用Java自带的类型 不用设置也可以得到对象
* 如果有指定消息格式 就根据messageType转化为消息对象
* */Object deserializer(byte[] bytes,int messageType);int getType();// 定义静态常量 serializerMapstatic final Map<Integer, Serializer> serializerMap = new HashMap<>();static Serializer getSerializerByCode(int code) {// 静态映射,保证只初始化一次if(serializerMap.isEmpty()) {serializerMap.put(0, new ObjectSerializer());serializerMap.put(1, new JsonSerializer());}return serializerMap.get(code); // 如果不存在,则返回 null}
}
Json序列化
package com.hyc.Serializer;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hyc.Message.MessageType;
import com.hyc.Message.RpcRequest;
import com.hyc.Message.RpcResponse;
import lombok.extern.slf4j.Slf4j;/*** 使用 fastjson来实现对象的序列化和反序列化** @author 冷环渊* date: 2025/3/23 22:32*/
@Slf4j
public class JsonSerializer implements Serializer {@Overridepublic byte[] serialize(Object obj) {byte[] bytes = JSONObject.toJSONBytes(obj);return bytes;}@Overridepublic Object deserializer(byte[] bytes, int messageType) {Object obj = null;// 传输的消息分为request与responseswitch (messageType) {case 0:RpcRequest request = JSON.parseObject(bytes, RpcRequest.class);
// 处理心跳信息if (request.getType() == MessageType.HEARTBEAT) {obj = request;break;}Object[] objects = new Object[request.getParams().length];// 把json字串转化成对应的对象, fastjson可以读出基本数据类型,不用转化// 对转换后的request中的params属性逐个进行类型判断for (int i = 0; i < objects.length; i++) {Class<?> paramsType = request.getParamsType()[i];//判断每个对象类型是否和paramsTypes中的一致if (!paramsType.isAssignableFrom(request.getParams()[i].getClass())) {//如果不一致,就行进行类型转换objects[i] = JSONObject.toJavaObject((JSONObject) request.getParams()[i], request.getParamsType()[i]);} else {//如果一致就直接赋给objects[i]objects[i] = request.getParams()[i];}}request.setParams(objects);obj = request;break;case 1:RpcResponse response = JSON.parseObject(bytes, RpcResponse.class);Class<?> dataType = response.getDataType();//判断转化后的response对象中的data的类型是否正确if (!dataType.isAssignableFrom(response.getData().getClass())) {response.setData(JSONObject.toJavaObject((JSONObject) response.getData(), dataType));}obj = response;break;default:log.warn("暂时不支持此种消息");throw new RuntimeException();}return obj;}//1 代表json序列化方式@Overridepublic int getType() {return 1;}@Overridepublic String toString() {return "Json";}
}
编码器
package com.hyc.Serializer;import com.hyc.Message.MessageType;
import com.hyc.Message.RpcRequest;
import com.hyc.Message.RpcResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.AllArgsConstructor;/*** @author 冷环渊* @date 2025/3/23 22:42* @description MyEncoder*/
@AllArgsConstructor
public class MyEncoder extends MessageToByteEncoder {private Serializer serializer;// netty写出数据的时候使用该编码器将Java对象转换为二进制数据@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
// 日志输出对象类名System.out.println(o.getClass());
// 判断消息是否是请求或者响应类型 根据类型写入if (o instanceof RpcRequest){byteBuf.writeShort(MessageType.REQUEST.getCode());}else if (o instanceof RpcResponse){byteBuf.writeShort(MessageType.RESPONSE.getCode());}
// 写入当前序列化器的表示byteBuf.writeShort(serializer.getType());
// 将消息转化为字符数组byte[] serialize = serializer.serialize(o);byteBuf.writeInt(serialize.length);byteBuf.writeBytes(serialize);}
}
解码器
package com.hyc.Serializer;import com.hyc.Message.MessageType;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;import java.util.List;/*** @author 冷环渊* @date 2025/3/23 22:47* @description MyDecoder*/
@Slf4j
public class MyDecoder extends ByteToMessageDecoder {
// 这个类负责传入字节流解码为业务对象然后将解码后的对象添加到输出@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// 读取消息类型short messageType = byteBuf.readShort();if (messageType!= MessageType.REQUEST.getCode()&&messageType!= MessageType.RESPONSE.getCode()) {log.error("暂时不支持这个类型的数据");}
// 读取序列化类型short serializerType = byteBuf.readShort();
// 根据类型返回适当的序列化器Serializer serializer = Serializer.getSerializerByCode(serializerType);if (serializer==null){throw new RuntimeException("不存在对应的序列化器");}// 将反序列化对象放到out列表中int length = byteBuf.readInt();byte[] bytes=new byte[length];
// 读取出对象byteBuf.readBytes(bytes);
// 反序列出对象加入到加入到输出列表Object deserialize= serializer.deserializer(bytes, messageType);list.add(deserialize);}
}
提供端
这是整个提供端与客户端的包结构
服务本地注册
提供端口的整个流程如下 :
服务端入口类 -> 初始化服务 -> 加入provider->netty收到客户端信息从处理器调用provider的获取服务 -> 提取服务执行方法-> 返回结果
我们最原始的rpc 需要一个收集所有服务然后提供服务的出口,将服务节点先注册到本地
package com.hyc.provider;import com.hyc.ServiceRegister.NacosServiceRegister;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;//本地服务存放器
public class ServiceProvider {private String host;private int port;//集合中存放服务的实例private Map<String, Object> interfaceProvider;public ServiceProvider(String host, int port) {this.host = host;this.port = port;this.interfaceProvider = new HashMap<>();}public ServiceProvider() {this.interfaceProvider = new HashMap<>();}//本地注册服务public void provideServiceInterface(Object service, boolean canRetry) {Class<?>[] interfaceName = service.getClass().getInterfaces();for (Class<?> clazz : interfaceName) {
// 本机映射表interfaceProvider.put(clazz.getName(), service);}}//获取服务实例public Object getService(String interfaceName) {return interfaceProvider.get(interfaceName);}
}
netty服务端
目前我们netty服务端需要做的事情 将标记的服务注册到本地(map) 来作为服务提供列表返回给客户端,并且维持链接, 用心跳包的方式来控制链接的关闭,所以我需要准备两个handler 我喜欢分开写 如果你懂得netty知识 一个handler也是可以的
简单的阐述一下netty在非Spring 环境下的 固定使用流程
- 你可以直接在nettyserver或nettyclient里全部以表达式的形式去创建循环组的设置 这样会显得比较臃肿 耦合度也很高
- 这里我们采用最常见的 处理器-> 初始化 -> 启动类的方式 来实现netty的交互
- 处理器用于处理netty信息的消费和收发 以及事件的处理
- 初始化 用于我们定义netty的通道的配置 比如组件的初始化顺序 处理器的初始化 编解码器设置 拆包粘包处理器等
处理器
- HeartbeatHandler 用于处理心跳事件
- NettyRPCServerHandler 用于日常的收发请求
HeartbeatHandler
package com.hyc.netty.server.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;/*** 用于处理服务器的心跳数据** @author 冷环渊* @date 2025/4/9 22:32* @description HeartbeatHandler*/
@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {// 用于处理客户端的心跳包@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {try {IdleStateEvent event = (IdleStateEvent) evt;log.info("触发事件:{}", event);
// 如果是触发读空闲 说明很久没有收到客户端的心跳包了if (IdleState.READER_IDLE.equals(event.state())) {log.info("超过10s没有收到客户端心跳 channel:{}", ctx.channel());ctx.close();} else if (IdleState.WRITER_IDLE.equals(event.state())) {log.info("超过25s没有写数据 channel:{}", ctx.channel());ctx.close();}} catch (Exception e) {log.error("处理事件发生异常,{}", e.getMessage());}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.info("连接已关闭: {}", ctx.channel());}
}
NettyRPCServerHandler
package com.hyc.netty.server.handler;import com.hyc.Message.MessageType;
import com.hyc.Message.RpcRequest;
import com.hyc.Message.RpcResponse;
import com.hyc.provider.ServiceProvider;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;/*** 因为是服务器端,我们知道接受到请求格式是RPCRequest** @author 冷环渊* date: 2025/3/22 23:21*/
@AllArgsConstructor
@Slf4j
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RpcRequest> {private ServiceProvider serviceProvider;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {if (request == null) {log.error("RpcRequest is null");return;}if (request.getType() == MessageType.HEARTBEAT) {log.info("接收到心跳包");return;}//接收request,读取并调用服务RpcResponse response = getResponse(request);ctx.writeAndFlush(response);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}private RpcResponse getResponse(RpcRequest rpcRequest) {//得到服务名String interfaceName = rpcRequest.getInterfaceName();//得到服务端相应服务实现类Object service = serviceProvider.getService(interfaceName);//反射调用方法Method method = null;try {method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());Object invoke = method.invoke(service, rpcRequest.getParams());RpcResponse rpcResponse = RpcResponse.nettySuccess(rpcRequest.getRequestId(), invoke, method.getReturnType());System.out.println(rpcResponse);return rpcResponse;} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {e.printStackTrace();log.error("方法执行错误");return RpcResponse.fail();}}
}
初始化
package com.hyc.netty.server;import com.hyc.Serializer.JsonSerializer;
import com.hyc.Serializer.MyDecoder;
import com.hyc.Serializer.MyEncoder;
import com.hyc.netty.server.handler.HeartbeatHandler;
import com.hyc.netty.server.handler.NettyRPCServerHandler;
import com.hyc.provider.ServiceProvider;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.AllArgsConstructor;/*** 用于服务器的channel初始化** @author 冷环渊* date: 2025/3/22 23:20*/
@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {private ServiceProvider serviceProvider;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));//计算当前待发送消息的长度,写入到前4个字节中pipeline.addLast(new LengthFieldPrepender(4));//使用自己的编码逻辑 默认使用json来进行序列化编码pipeline.addLast(new MyEncoder(new JsonSerializer()));//使用自己的解码逻辑pipeline.addLast(new MyDecoder());// 服务器持续的关注客户端的读写事件 如果一定时间没有收到消息 触发idlepipeline.addLast(new IdleStateHandler(15, 25, 0));pipeline.addLast(new HeartbeatHandler());pipeline.addLast(new NettyRPCServerHandler(serviceProvider));}
}
启动类
package com.hyc.netty.server;import com.hyc.provider.ServiceProvider;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;/*** @author 冷环渊* @date 2025/3/22 16:54* @description NettyRpcServer*/
@Slf4j
@AllArgsConstructor
public class NettyRpcServer {public NettyRpcServer(ServiceProvider serviceProvide) {this.serviceProvide = serviceProvide;}private ServiceProvider serviceProvide;private ChannelFuture channelFuture;ServerBootstrap serverBootstrap = new ServerBootstrap();NioEventLoopGroup workerGroup = new NioEventLoopGroup();NioEventLoopGroup bossGroup = new NioEventLoopGroup();public void start(int port) {serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new NettyServerInitializer(serviceProvide));log.info("Netty服务端启动了");try {channelFuture = serverBootstrap.bind(port).sync();log.info("Netty服务端已绑定端口:{}", port);// 将关闭通道的方式 也设置为异步的// 阻塞finally中的代码执行channelFuture.channel().closeFuture().sync();} catch (Exception e) {Thread.currentThread().interrupt();log.error("Netty服务端启动中断:{}", e.getMessage(), e);}}public void stop() {if (channelFuture != null) {try {channelFuture.channel().close().sync();log.info("Netty服务端主通道已关闭");} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("关闭Netty服务端主通道时中断:{}", e.getMessage(), e);}} else {log.warn("Netty服务端主通道尚未启动,无法关闭");}}private void shutdown(NioEventLoopGroup bossGroup, NioEventLoopGroup workGroup) {if (bossGroup != null) {bossGroup.shutdownGracefully().syncUninterruptibly();}if (workGroup != null) {workGroup.shutdownGracefully().syncUninterruptibly();}}
}
客户端
在客户端的使用中 核心的其实就是基于反射的代理 我们需要代理出所使用的对象 并且组合rpc请求到提供端获取执行结果
执行流程
客户端启动类-> 获取需要执行服务的代理->在代理类中初始化nettyclient->构建rpcrequest并且发送到提供端->异步执行-> 返回结果
为什么异步的执行呢 因为同步的方式 心跳包的发送会占用优先级 导致只有链接断开的时候才能拿到结果
netty客户端
netty客户端的入口代码和服务端就很相似了 一样的处理器 初始化和启动类
处理器
HeartbeatHandler
这里我的设置是单项心跳, 如果你想 按照这个项目的配置你完全可以实现双向心跳
package com.hyc.netty.client.handler;import com.hyc.Message.RpcRequest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;/*** 用于处理客户端心跳发送** @author 冷环渊* @date 2025/4/9 22:32* @description HeartbeatHandler*/
@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;log.info("触发事件:{}", event);IdleState idleState = event.state();
// 每一段时间都需要发送一个心跳包给服务器用于维护通道if (IdleState.WRITER_IDLE.equals(idleState)) {log.info("超过8s没有写数据 发送心跳包维持链接");ctx.writeAndFlush(RpcRequest.heartBeat());}}}
NettyClientHandler
这里的使用的future在处理器的read按需执行,之后在代理类中get结果
package com.hyc.netty.client.handler;import com.hyc.Message.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CompletableFuture;/*** @author 冷环渊* @date 2025/3/22 16:06* @description simpleChennlInitlaizer*/
@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception {AttributeKey<CompletableFuture<RpcResponse>> key = AttributeKey.valueOf("RpcResponseFuture");
// 异步传递future 在需要的时候执行 并且不关闭连接 继续使用CompletableFuture<RpcResponse> future = channelHandlerContext.channel().attr(key).getAndRemove();if (future != null) {future.complete(rpcResponse);}}
}
初始化
package com.hyc.netty.client;import com.hyc.Serializer.JsonSerializer;
import com.hyc.Serializer.MyDecoder;
import com.hyc.Serializer.MyEncoder;
import com.hyc.netty.client.handler.HeartbeatHandler;
import com.hyc.netty.client.handler.NettyClientHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler;/*** netty 初始化** @author 冷环渊* date: 2025/3/22 16:22*/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();/** 为了避免拆包/粘包 使用netty提供的基于自定义长度的解码器 参数设置为* 1 设置长度字节为int类型 一般使用最大值* 2 长度字段在发送的字节数组中的下标位置 从0开始* 3 长度字段大小 因为我们用的是int 所以是4字节* 4 我们希望服务端只接受消息体就行所以设置跳过消息长度存储的字节数也就是 4字节* */pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));//计算当前待发送消息的长度,写入到前4个字节中pipeline.addLast(new LengthFieldPrepender(4));//使用自己的编码逻辑 默认使用json来进行序列化编码pipeline.addLast(new MyEncoder(new JsonSerializer()));//使用自己的解码逻辑pipeline.addLast(new MyDecoder());pipeline.addLast(new NettyClientHandler());pipeline.addLast(new IdleStateHandler(0, 8, 0));pipeline.addLast(new HeartbeatHandler());}
}
启动类
package com.hyc.netty.client;import com.hyc.Message.RpcRequest;
import com.hyc.Message.RpcResponse;
import com.hyc.ServiceCenter.NacosServiceCenter;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CompletableFuture;/*** @author 冷环渊* @date 2025/4/10 14:20* @description NettyRpcClient*/
public class NettyClient {private String host;private int port;private static final Bootstrap bootstrap;private static final EventLoopGroup eventLoopGroup;public NettyClient(String host, int port) {this.host = host;this.port = port;}//netty客户端初始化static {eventLoopGroup = new NioEventLoopGroup();bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)//NettyClientInitializer这里 配置netty对消息的处理机制.handler(new NettyClientInitializer());}public CompletableFuture<RpcResponse> sendRequest(RpcRequest request) {CompletableFuture<RpcResponse> future = new CompletableFuture<>();// 连接到远程服务ChannelFuture channelFuture = null;channelFuture = bootstrap.connect(host, port).addListener((ChannelFutureListener) connectFuture -> {if (!connectFuture.isSuccess()) {future.completeExceptionally(connectFuture.cause());}}).syncUninterruptibly();Channel channel = channelFuture.channel();// 存储 future 到 Channel 属性中,供后续获取AttributeKey<CompletableFuture<RpcResponse>> responseFutureKey = AttributeKey.valueOf("RpcResponseFuture");channel.attr(responseFutureKey).set(future);// 发送数据channel.writeAndFlush(request).addListener((ChannelFutureListener) writeFuture -> {if (!writeFuture.isSuccess()) {future.completeExceptionally(writeFuture.cause());channel.close();}});return future;}
}
服务代理
在这个类获取到future get结果然后返回
package com.hyc.proxy;import com.hyc.Message.MessageType;
import com.hyc.Message.RpcRequest;
import com.hyc.Message.RpcResponse;
import com.hyc.ServiceCenter.NacosServiceCenter;
import com.hyc.netty.client.NettyRpcClient;
import com.hyc.retry.Resilience4jRetry;
import com.hyc.retry.RpcCircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;/*** 用于创建需要本地执行的任务代理实例** @author 冷环渊* @date 2025/4/10 14:18* @description ServiceProxy*/
public class ServiceProxy implements InvocationHandler {NettyRpcClient rpcClient;NacosServiceCenter nacosServiceCenter;public ServiceProxy() {this.rpcClient = new NettyRpcClient();}public <T> T getProxy(Class<T> clazz) {Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);return (T) o;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//构建requestRpcRequest request = RpcRequest.builder().requestId(UUID.randomUUID().toString()).type(MessageType.REQUEST).interfaceName(method.getDeclaringClass().getName()).methodName(method.getName()).params(args).paramsType(method.getParameterTypes()).build();CompletableFuture<RpcResponse> future = rpcClient.sendRequest(request);RpcResponse rpcResponse = future.whenComplete((res, ex) -> {if (ex != null) {ex.printStackTrace();}}).get();return rpcResponse.getData();}
}
测试和小结
到这里最基础的netty实现rpc就结束了 ,我们这个rpc是之后搭建的基础,远程调用的核心已经完成了 之后都是不断优化
- 加入注册中心 集中的管理服务地址 并且基于注册中心来实现缓存等一系列操作
- 加入服务端限流 客户端熔断重试等保证可靠性的优化
- 加入客户端负载均衡来提升系统的稳定性
- 抽象配置,集成到nacos中或者读取配置文件
- 注解化 将启动, 服务注册(之后可能还会更加的优化为按照方法注解 类似 openfeign),重试熔断抽象为注解 可以快速的加入到项目中
testClient
package com.hyc;import com.hyc.pojo.Task;
import com.hyc.proxy.ServiceProxy;
import com.hyc.service.TaskService;/*** @author 冷环渊* @date 2025/4/10 21:18* @description testClient*/
public class testClient {public static void main(String[] args) {ServiceProxy proxy = new ServiceProxy();TaskService taskService = proxy.getProxy(TaskService.class);Task TaskByUserId = taskService.getTaskById(1);System.out.println("获取到任务: " + userByUserId);}
}
testServer
package com.hyc;import com.hyc.netty.server.NettyRpcServer;
import com.hyc.provider.ServiceProvider;
import com.hyc.service.TaskService;
import com.hyc.service.taskServiceImpl;
import lombok.extern.slf4j.Slf4j;/*** @author 冷环渊* @date 2025/4/10 21:14* @description testServer*/
@Slf4j
public class testServer {public static void main(String[] args) {// 创建 UserService 实例TaskService taskService = new taskServiceImpl();ServiceProvider serviceProvider = new ServiceProvider("127.0.0.1", 9999);// 发布服务接口到 ServiceProviderserviceProvider.provideServiceInterface(taskService, true); // 可以设置是否支持重试// 启动 RPC 服务器并监听端口NettyRpcServer server = new NettyRpcServer(serviceProvider);server.start(9999); // 启动 Netty RPC 服务,监听 port 端口log.info("RPC 服务端启动,监听端口" + 9999);}
}
测试结果
这是调用端
服务端
注册中心 NACOS
我们使用nacos的意义是 集中的管理服务节点并且基于注册中心来实现缓存等一系列操作
在这一次的拓展中 我们将实现:
- 以nacos作为中心获取服务连接
- 基于nacos事件动态缓存节点地址
- 服务器限流
- 客户端负载均衡
我使用的nacos 是 2.5.1版本, 可以去官网下载
https://nacos.io/
解压完以后,我们需要先配置nacos , 新版的nacos需要开启权限和单机启动 , 可以去官方文档或者搜索如何配置
这是我的配置, 请忽略和这个key哈哈随便写一下就行
修改为 单机启动
复制一份 改名为 startupSingle
修改这个属性 默认是集群模式 改为单机启动
当前的包结构
服务端
我们还是先服务端再客户端, 有了nacos以后,我们需要设置一些使用nacos需要的常量,这些之后都会优化成配置文件读取
我们的思路是将rpc服务和注册节点的服务分开来 , 所有的rpc节点都在rpc分组下
服务注册nacos
package com.hyc.ServiceRegister;import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.hyc.constant.RPCConstant;import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Properties;/*** @author 冷环渊* @date 2025/4/1 21:32* @description NacosServiceRegister*/
public class NacosServiceRegister {private NamingService namingService;// 命名空间private String nameSpace;public NacosServiceRegister(String nameSpace) {this.nameSpace = nameSpace;init();}public NacosServiceRegister() {init();}// 初始化public void init() {try {Properties properties = new Properties();properties.setProperty("username", RPCConstant.NACOS_USERNAME);properties.setProperty("password", RPCConstant.NACOS_PASSWORD);properties.setProperty("serverAddr", RPCConstant.SERVER_ADDR);if (nameSpace != null && !nameSpace.isEmpty()) {properties.setProperty("nameSpace", nameSpace);}namingService = NacosFactory.createNamingService(properties);} catch (NacosException e) {throw new RuntimeException("连接nacos失败", e);}}/*** 注册服务 负责将需要发现的服务注册到nacos管理** @author 冷环渊* date: 2025/4/1 21:40*/public void register(String serviceName, InetSocketAddress serviceAddress, String clusterName, Map<String, String> metadata) {Instance instance = new Instance();instance.setIp(serviceAddress.getHostName());instance.setPort(serviceAddress.getPort());instance.setServiceName(serviceName);if (clusterName != null && !clusterName.isEmpty()) {instance.setClusterName(clusterName);}if (metadata != null && !metadata.keySet().isEmpty()) {instance.setMetadata(metadata);}try {namingService.registerInstance(serviceName, RPCConstant.RPC_GROUP_NAME, instance);System.out.println("服务注册成功" + serviceName + "-" + serviceAddress.getHostName());} catch (NacosException e) {throw new RuntimeException("服务注册失败", e);}}
}
有了这个方法 我们就可以将服务以服务名+ip的形式 注册到nacos里
之后我们在provider中作为注册的起点 , 这里的canRetry是用于之后的是否开启重试服务的flag 存储在nacos服务节点的meta数据中
package com.hyc.provider;import com.hyc.ServiceRegister.NacosServiceRegister;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;//本地服务存放器
public class ServiceProvider {private String host;private int port;//集合中存放服务的实例private Map<String, Object> interfaceProvider;NacosServiceRegister nacosServiceRegister;// nacos的元数据private Map<String, String> nacosMeta;public ServiceProvider(String host, int port) {this.host = host;this.port = port;this.interfaceProvider = new HashMap<>();this.nacosServiceRegister = new NacosServiceRegister();this.nacosMeta = new HashMap<>();}public ServiceProvider() {this.interfaceProvider = new HashMap<>();}//本地注册服务public void provideServiceInterface(Object service, boolean canRetry) {Class<?>[] interfaceName = service.getClass().getInterfaces();for (Class<?> clazz : interfaceName) {
// 本机映射表interfaceProvider.put(clazz.getName(), service);nacosMeta.put(clazz.getName(), String.valueOf(canRetry));nacosServiceRegister.register(clazz.getName(), new InetSocketAddress(host, port), null, nacosMeta);}}//获取服务实例public Object getService(String interfaceName) {return interfaceProvider.get(interfaceName);}
}
到这里 nacos加入到服务端就结束了 ,之后我们就可以在nacos中获取的对应的服务地址,然后发起请求了
基于令牌桶限流
包结构
开闭限流是服务器保证自己服务稳定性的一种手段 ,我们简单的实现一个令牌桶来对服务器进行限制,
我们的思路有两种 一种是以服务名为基础的限流,一种是以ip为基础的限流, 这里我们以服务名作为限流标识
后面想要更多的限流方式只需要不断实现接口 就可以达到效果
限流服务构建
package com.hyc.Server.provider;import com.hyc.rateLimit.RateLimit;
import com.hyc.rateLimit.TokenBucketRateLimit;
import lombok.extern.slf4j.Slf4j;import java.util.HashMap;
import java.util.Map;/*** 基于令牌桶限制的服务提供** @author 冷环渊* @date 2025/3/30 14:22* @description RateLimitProvider*/
@Slf4j
public class RateLimitProvider {// 用于存储每隔接口和对应速率显示器的映射关系private Map<String, RateLimit> rateLimitsMap = new HashMap<>();/*** 根据结构名称获取对应的限制器实例 如不存在将创建** @author 冷环渊* date: 2025/3/30 14:24*/public RateLimit getRateLimitByInterFaceName(String interFaceName) {if (!rateLimitsMap.containsKey(interFaceName)) {
// todo 这里先默认的使用固定值创建新的限流器 之后会修改成从配置文件中读取RateLimit rateLimit = new TokenBucketRateLimit(100, 15);rateLimitsMap.put(interFaceName, rateLimit);return rateLimit;}return rateLimitsMap.get(interFaceName);}// todo 这里采用的是服务名限流 之后会拓展基于IP限流 思路大致一致 改变的只会是粗细度
// public RateLimit getRateLimitByInterFaceName()
}
令牌桶算法实现
package com.hyc.Server.rateLimit;/*** 用于自定义限流机制 这里我们采用令牌桶的方式* @author 冷环渊* date: 2025/3/30 13:58*/
public interface RateLimit {boolean getToken();
}
package com.hyc.rateLimit;import lombok.extern.slf4j.Slf4j;/*** 服务器令牌桶限流机制具体实现** @author 冷环渊* @date 2025/3/30 13:59* @description TokenBucketRateLimit*/
@Slf4j
public class TokenBucketRateLimit implements RateLimit {// 临牌通产生token的速率 单位:msprivate final int RATE;// 具体的桶容量private final int CAPACITY;// 当前桶容量private volatile int curCAPACITY;// 上次请求的时间private volatile long timeStamp;//
// 需要在创建此算法的时候 就设定好 需要的容量和生成的速率public TokenBucketRateLimit(int rate, int capacity) {
// 每隔rate ms 就会生成一个令牌RATE = rate;CAPACITY = capacity;
// 初始化的时候当前容量curCAPACITY = capacity;timeStamp = System.currentTimeMillis();}public boolean getToken() {
// 实例限流独立加锁synchronized (this) {// 如果桶内还有令牌 直接消费 并且返回if (curCAPACITY > 0) {curCAPACITY--;return true;}long currentTime = System.currentTimeMillis();
// 时间间隔long elapsedTime = currentTime - timeStamp;int newToken = (int) (elapsedTime / RATE);
// newToken大于0说明时间间隔允许补充令牌了if (newToken > 0) {curCAPACITY += newToken;if (curCAPACITY > CAPACITY) {curCAPACITY = CAPACITY;}
// 更新时间戳timeStamp = currentTime;
// 消耗当前请求的令牌 确保请求次数和桶内令牌次数一致curCAPACITY--;return true;}return false;}}// 测试用例public static void main(String[] args) throws InterruptedException {TokenBucketRateLimit limit = new TokenBucketRateLimit(1000, 15);for (int i = 0; i < 15; i++) {if (!limit.getToken()) {System.out.println("被限流了");} else {System.out.println("正常执行" + i + "次");}}}
}
限流运用
将限流服务加入到serviceprovider中
@Getter
private RateLimitProvider rateLimitProvider;public ServiceProvider(String host, int port) {this.host = host;this.port = port;this.interfaceProvider = new HashMap<>();this.nacosServiceRegister = new NacosServiceRegister();this.nacosMeta = new HashMap<>();this.rateLimitProvider = new RateLimitProvider();
}
我们的服务提供入口是在 NettyRPCServerHandler的getResponse方法 , 修改如下
private RpcResponse getResponse(RpcRequest rpcRequest) {//得到服务名String interfaceName = rpcRequest.getInterfaceName();
// 创建限流器boolean token = serviceProvider.getRateLimitProvider().getRateLimitByInterFaceName(interfaceName).getToken();if (token) {log.info("服务请求被限流");}//得到服务端相应服务实现类Object service = serviceProvider.getService(interfaceName);//反射调用方法Method method = null;try {method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());Object invoke = method.invoke(service, rpcRequest.getParams());RpcResponse rpcResponse = RpcResponse.nettySuccess(rpcRequest.getRequestId(), invoke, method.getReturnType());System.out.println(rpcResponse);return rpcResponse;} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {e.printStackTrace();log.error("方法执行错误");return RpcResponse.fail();}}
客户端
这里的服务发现,返回多个列表, 先获取第一个,之后会使用负载均衡的方式 经可能按照需求分散到各个节点
package com.hyc.ServiceCenter;import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.hyc.ServiceCenter.balance.impl.ConsistencyHashBalance;
import com.hyc.cache.serviceCache;
import com.hyc.constant.RPCConstant;import java.net.InetSocketAddress;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;/*** 用于发现nacos上所需要的服务** @author 冷环渊* @date 2025/4/1 22:11* @description NacosServiceCenter*/
public class NacosServiceCenter {private NamingService namingService;private boolean checkRetry = false;// 命名空间private String nameSpace;public NacosServiceCenter(String nameSpace) {this.nameSpace = nameSpace;init();}public NacosServiceCenter() {init();}// 初始化public void init() {try {Properties properties = new Properties();properties.setProperty("username", RPCConstant.NACOS_USERNAME);properties.setProperty("password", RPCConstant.NACOS_PASSWORD);properties.setProperty("serverAddr", RPCConstant.SERVER_ADDR);if (nameSpace != null && !nameSpace.isEmpty()) {properties.setProperty("nameSpace", nameSpace);}namingService = NacosFactory.createNamingService(properties);} catch (NacosException e) {throw new RuntimeException("连接nacos失败", e);}}public InetSocketAddress serviceDiscovery(String serviceName) {try {List<String> addressList = instances.stream().map(Instance::toInetAddr).collect(Collectors.toList());String s = instances.get(0).getMetadata().get(serviceName);checkRetry = Boolean.parseBoolean(s);return parseAddress(addressList.get(0));} catch (NacosException e) {throw new RuntimeException(e);}}public boolean checkRetry() {return checkRetry;}// 将字符串解析成为地址public InetSocketAddress parseAddress(String address) {String[] split = address.split(":");return new InetSocketAddress(split[0], Integer.parseInt(split[1]));}
}
有了服务发现的方法之后, 我们客户端就可以以注册中心作为连接点,直接获取到对应服务名的连接从而开始调用
package com.hyc.netty.client;import com.hyc.Message.RpcRequest;
import com.hyc.Message.RpcResponse;
import com.hyc.ServiceCenter.NacosServiceCenter;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CompletableFuture;/*** @author 冷环渊* @date 2025/4/10 14:20* @description NettyRpcClient*/
@Slf4j
public class NettyRpcClient {NacosServiceCenter nacosServiceCenter;private static final Bootstrap bootstrap;private static final EventLoopGroup eventLoopGroup;public NettyRpcClient() {this.nacosServiceCenter = new NacosServiceCenter();}//netty客户端初始化static {eventLoopGroup = new NioEventLoopGroup();bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)//NettyClientInitializer这里 配置netty对消息的处理机制.handler(new NettyClientInitializer());}public CompletableFuture<RpcResponse> sendRequest(RpcRequest request) {CompletableFuture<RpcResponse> future = new CompletableFuture<>();// 连接到远程服务ChannelFuture channelFuture = null;channelFuture = bootstrap.connect(nacosServiceCenter.serviceDiscovery(request.getInterfaceName())).addListener((ChannelFutureListener) connectFuture -> {if (!connectFuture.isSuccess()) {future.completeExceptionally(connectFuture.cause());}}).syncUninterruptibly();Channel channel = channelFuture.channel();// 存储 future 到 Channel 属性中,供后续获取AttributeKey<CompletableFuture<RpcResponse>> responseFutureKey = AttributeKey.valueOf("RpcResponseFuture");channel.attr(responseFutureKey).set(future);// 发送数据channel.writeAndFlush(request).addListener((ChannelFutureListener) writeFuture -> {if (!writeFuture.isSuccess()) {future.completeExceptionally(writeFuture.cause());channel.close();}});return future;}}
节点缓存
我们客户端其实是需要缓存的, 它可以提升请求的性能 节省不必要的连接开销, 比如我需要多次获取task 每次都要去注册中心请求地址, 本地才是最快的, 所以我们可以在本地以监听器+容器的方式 根据nacos的事件创造一个动态的本地缓存
package com.hyc.cache;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author 冷环渊* 用于本地缓存实例* @date 2025/3/24 19:01* @description serviceCache*/
@Slf4j
public class serviceCache {// 存储服务名和地址列表public static Map<String, List<String>> cacheMap = new ConcurrentHashMap<>();//添加服务public void addServiceToCacheMap(String serviceName, String address) {if (cacheMap.containsKey(serviceName)) {List<String> list = cacheMap.get(serviceName);list.add(address);log.info("将名称为{}和地址为{}的服务添加到本地缓存中", serviceName, address);} else {List<String> addressList = new ArrayList<>();addressList.add(address);cacheMap.put(serviceName, addressList);log.info("创建了新的缓存节点 {} ", serviceName);}}//从缓存中取服务地址public List<String> getServiceFromCacheMap(String serviceName) {if (!cacheMap.containsKey(serviceName)) {// 返回空列表避免空指针异常return Collections.emptyList();}List<String> res = cacheMap.get(serviceName);return res;}//从缓存中删除服务地址public void delete(String serviceName, String address) {List<String> addressList = cacheMap.get(serviceName);if (addressList != null && addressList.contains(address)) {addressList.remove(address);log.info("将name为{}和地址为{}的服务从本地缓存中删除", serviceName, address);if (addressList.isEmpty()) {cacheMap.remove(serviceName);log.info("{}服务下没有所需列表,已清空", serviceName);}} else {log.warn("删除失败 地址{}不在服务{}的列表中", address, serviceName);}}
}
nacose监听器
思路就是监听rpc分组下的节点状态信息 以节点健康问基准动态的删除或新增至缓存
package com.hyc.ServiceCenter;import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.hyc.cache.serviceCache;
import com.hyc.constant.RPCConstant;import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;/*** 用于获取并监控 nacos提供rpc服务的节点的信息 更新缓存** @author 冷环渊* date: 2025/4/15 15:40*/
public class ServiceChangeListener implements EventListener {//本地缓存private serviceCache cache;List<String> healthyIps = new CopyOnWriteArrayList<>();public ServiceChangeListener() {this.cache = new serviceCache();}@Overridepublic void onEvent(Event event) {if (event instanceof NamingEvent) {NamingEvent namingEvent = (NamingEvent) event;String serviceName = namingEvent.getServiceName();if (RPCConstant.RPC_GROUP_NAME.equals(namingEvent.getGroupName())) {healthyIps = namingEvent.getInstances().stream().filter(Instance::isHealthy).map(Instance::toInetAddr).collect(Collectors.toList());updateInstanceCache(serviceName, healthyIps);}}}/*** 处理节点缓存更新** @author 冷环渊* date: 2025/4/15 17:00*/private void updateInstanceCache(String serviceName, List<String> healthyIps) {List<String> cacheMapList = cache.getServiceFromCacheMap(serviceName);healthyIps.stream().filter(ip -> !cacheMapList.contains(ip)).forEach(ip -> cache.addServiceToCacheMap(serviceName, ip));// 检测需要删除的缓存for (String ip : cacheMapList) {if (!healthyIps.contains(ip)) {cache.delete(serviceName, ip);}}
// 会出现并发读取的问题 切换成线程安全list也许能解决
// cacheMapList.stream().filter(ip -> !healthyIps.contains(ip)).forEach(ip -> cache.delete(serviceName, ip));}}
客户端负载均衡
负载均衡其实是一个很有意思的东西 ,这里我暂时先提供 两种负载均衡的方式,
- 一种是最简单和普遍的轮训法
- 第二种是基于节点hash来负载均衡
接口
package com.hyc.ServiceCenter.balance;import java.util.List;/*** @author 冷环渊* @date 2025/3/26 15:09* @description LoadBalance*/
public interface LoadBalance {// 负责实现具体算法 返回分配地址String balance(List<String> addressList);// 添加节点void addNode(String Node);// 删除节点void delNode(String Node);
}
轮询法
package com.hyc.ServiceCenter.balance.impl;import com.hyc.ServiceCenter.balance.LoadBalance;
import lombok.extern.slf4j.Slf4j;import java.util.List;/*** 基于轮询算法的负载均衡** @author 冷环渊* @date 2025/3/26 15:10* @description RoundLoadBalance*/
@Slf4j
public class RoundLoadBalance implements LoadBalance {// 初始化轮询索引private int flag = -1;@Overridepublic String balance(List<String> addressList) {flag++;flag = flag % addressList.size();log.info("轮询至 {} 服务器", flag);return addressList.get(flag);}@Overridepublic void addNode(String Node) {}@Overridepublic void delNode(String Node) {}
}
hash一致性
这个方法借鉴于一个rpc教学思路 . 这里的负载均衡思路 将节点映射在哈希环中, 通过哈希值来找到环中最近的节点, 通过虚拟节点来增加哈希命中的概率.
package com.hyc.ServiceCenter.balance.impl;import com.hyc.ServiceCenter.balance.LoadBalance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;/*** 基于一致性hash负载均衡* 虚拟节点解决hash偏移问题的** @author 冷环渊* @date 2025/3/26 15:10* @description ConsistencyHashBalance*/
public class ConsistencyHashBalance implements LoadBalance {// 每个真实节点所需要的虚拟节点数public static final int VIRTUAL_NUM = 5;// 虚拟节点标识public static final String VIRTUAL_STR = "&&VIRTUAL_NODE";private static final Logger log = LoggerFactory.getLogger(ConsistencyHashBalance.class);// 保存虚拟节点的hash值以及对应的虚拟节点 key为hash值 value为虚拟节点名称private SortedMap<Integer, String> shards = new TreeMap<Integer, String>();// 考虑线程安全问题 并发读的时候无需上锁 并且新增不会直接往容器写private List<String> realNodes = new CopyOnWriteArrayList<>();private void init(List<String> addressList) {for (String server : addressList) {addNode(server);}}/*** 基于fnv1_32** @author 冷环渊*/public int getHash(String str) {final int FNV_PRIME = 16777619;
// fnv初始哈希值 截断为intint hash = (int) 2166136261L;for (int i = 0; i < str.length(); i++) {hash = (hash ^ str.charAt(i)) * FNV_PRIME;// 增强步骤:增加位移和异或操作hash = (hash ^ (hash >>> 16)) * 0x85ebca6b; // 混合高低位hash ^= (hash >>> 13); // 二次扩散if (hash < 0) {hash = Math.abs(hash);}}return hash;}public String getServer(String node, List<String> serviceList) {// 初始化真实节点和虚拟节点if (shards.isEmpty()) {init(serviceList);}int hash = getHash(node);log.info("这次请求所分析的地址hash {}", hash);Integer key = null;
// 获取hash值大于等于请求哈希值的所有虚拟节点SortedMap<Integer, String> nodeMap = shards.tailMap(hash);
// 如果没有找到对应的节点集合 那么久选择最大的节点 否则选择第一个虚拟节点if (nodeMap.isEmpty()) {key = shards.lastKey();} else {key = nodeMap.firstKey();}// 返回真实节点String virtualNode = shards.get(key);log.info("这次请求所前往的虚拟节点 {}", virtualNode);return virtualNode.substring(0, virtualNode.indexOf("&&"));}@Overridepublic String balance(List<String> addressList) {if (addressList == null || addressList.isEmpty()) {throw new IllegalArgumentException("服务列表地址为空");}String requestPath = UUID.randomUUID().toString();log.info("发起请求的地址 {}", requestPath);return getServer(requestPath, addressList);}@Overridepublic void addNode(String Node) {if (!realNodes.contains(Node)) {realNodes.add(Node);log.info("真实节点 {} 被添加", Node);for (int i = 0; i < VIRTUAL_NUM; i++) {
// 虚拟节点命名规则为server&&VIRTUAL_STR+i i是虚拟节点的编号String virtualNode = Node + VIRTUAL_STR + i;int hash = getHash(virtualNode);
// 根据hash值对节点排序shards.put(hash, virtualNode);log.info("虚拟节点 {} 被添加,hash为 {}", virtualNode, hash);}}}@Overridepublic void delNode(String Node) {if (realNodes.contains(Node)) {realNodes.remove(Node);log.info("真实节点 {} 以下线", Node);for (int i = 0; i < VIRTUAL_NUM; i++) {
// 虚拟节点命名规则为server&&VIRTUAL_STR+i i是虚拟节点的编号String virtualNode = Node + VIRTUAL_STR + i;int hash = getHash(virtualNode);
// 根据hash值对节点排序shards.remove(hash, virtualNode);log.info("虚拟节点 {} 被下线,hash {} 被移除", virtualNode, hash);}}}}
将负载均衡和缓存加入到netty的请求入口
package com.hyc.ServiceCenter;import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.hyc.ServiceCenter.balance.impl.ConsistencyHashBalance;
import com.hyc.cache.serviceCache;
import com.hyc.constant.RPCConstant;import java.net.InetSocketAddress;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;/*** 用于发现nacos上所需要的服务** @author 冷环渊* @date 2025/4/1 22:11* @description NacosServiceCenter*/
public class NacosServiceCenter {private NamingService namingService;private serviceCache serviceCache;ServiceChangeListener serviceChangeListener;// 命名空间private String nameSpace;public NacosServiceCenter(String nameSpace) {this.serviceCache = new serviceCache();this.nameSpace = nameSpace;init();}public NacosServiceCenter() {this.serviceCache = new serviceCache();init();}// 初始化public void init() {try {Properties properties = new Properties();properties.setProperty("username", RPCConstant.NACOS_USERNAME);properties.setProperty("password", RPCConstant.NACOS_PASSWORD);properties.setProperty("serverAddr", RPCConstant.SERVER_ADDR);if (nameSpace != null && !nameSpace.isEmpty()) {properties.setProperty("nameSpace", nameSpace);}namingService = NacosFactory.createNamingService(properties);serviceChangeListener = new ServiceChangeListener();} catch (NacosException e) {throw new RuntimeException("连接nacos失败", e);}}public InetSocketAddress serviceDiscovery(String serviceName) {try {List<Instance> instances = namingService.selectInstances(serviceName, RPCConstant.RPC_GROUP_NAME, true);namingService.subscribe(serviceName, RPCConstant.RPC_GROUP_NAME, serviceChangeListener);List<String> addressList = serviceCache.getServiceFromCacheMap(serviceName);if (addressList.isEmpty()) {addressList = instances.stream().map(Instance::toInetAddr).collect(Collectors.toList());}String balance = new ConsistencyHashBalance().balance(addressList);return parseAddress(balance);} catch (NacosException e) {throw new RuntimeException(e);}}public boolean checkRetry(String serviceName) {List<Instance> instances = null;try {instances = namingService.selectInstances(serviceName, RPCConstant.RPC_GROUP_NAME, true);} catch (NacosException e) {throw new RuntimeException(e);}String s = instances.get(0).getMetadata().get(serviceName);return Boolean.parseBoolean(s);}// 将字符串解析成为地址public InetSocketAddress parseAddress(String address) {String[] split = address.split(":");return new InetSocketAddress(split[0], Integer.parseInt(split[1]));}
}
测试
服务端日志
客户端日志
可以看到 缓存和负载均衡均已生效
重试与熔断
我们基于resilience4j 来实现重试与熔断
重试配置
package com.hyc.retry;import com.hyc.Message.RpcResponse;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;import java.time.Duration;/*** 基于Resilience4j的重试策略** @author 冷环渊* @date 2025/4/1 19:19* @description Resilience4jRetry*/
public class Resilience4jRetry {public static Retry GetRetryTemplate() {// 配置重试策略RetryConfig config = RetryConfig.custom().maxAttempts(2) // 最大重试次数(包括首次调用).waitDuration(Duration.ofSeconds(15)) // 重试间隔.retryOnResult(response -> ((RpcResponse) response).getCode() == 500).retryOnException(e -> e instanceof Exception) // 对哪些异常进行重试.failAfterMaxAttempts(true) // 达到最大重试次数后是否抛出异常.build();// 从注册表中获取或创建重试实例return RetryRegistry.of(config).retry("rpcServiceRetry");}
}
熔断配置
package com.hyc.retry;import com.hyc.Message.RpcResponse;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;import java.time.Duration;/*** 熔断器配置 获取类** @author 冷环渊* @date 2025/4/1 14:51* @description RpcCircuitBreaker*/
public class RpcCircuitBreaker {public static CircuitBreaker getCircuitBreaker() {// 配置熔断器CircuitBreakerConfig config = CircuitBreakerConfig.custom().recordResult(response -> ((RpcResponse) response).getCode() == 500).failureRateThreshold(50) // 失败率阈值百分比.slowCallRateThreshold(50) // 慢调用率阈值百分比.slowCallDurationThreshold(Duration.ofSeconds(2)) // 慢调用时间阈值.waitDurationInOpenState(Duration.ofSeconds(15)) // 熔断器从打开到半开的等待时间.permittedNumberOfCallsInHalfOpenState(10) // 半开状态下允许的调用次数.minimumNumberOfCalls(10) // 计算失败率或慢调用率之前所需的最小调用次数.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) // 滑动窗口类型.slidingWindowSize(15) // 滑动窗口大小.recordExceptions(Exception.class) // 记录哪些异常.build();// 创建熔断器注册表CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);// 从注册表中获取或创建熔断器return registry.circuitBreaker("rpcServiceCircuitBreaker");}
}
服务代理
我们更新服务代理类的代码 用于监控和编写重试熔断逻辑
package com.hyc.proxy;import com.hyc.Message.MessageType;
import com.hyc.Message.RpcRequest;
import com.hyc.Message.RpcResponse;
import com.hyc.ServiceCenter.NacosServiceCenter;
import com.hyc.netty.client.NettyRpcClient;
import com.hyc.retry.Resilience4jRetry;
import com.hyc.retry.RpcCircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.function.Supplier;/*** 用于创建需要本地执行的任务代理实例** @author 冷环渊* @date 2025/4/10 14:18* @description ServiceProxy*/
public class ServiceProxy implements InvocationHandler {NettyRpcClient rpcClient;NacosServiceCenter nacosServiceCenter;// 熔断降级组件private final CircuitBreaker circuitBreaker;// 重试组件private final Retry r4jRetry;public ServiceProxy() {this.rpcClient = new NettyRpcClient();this.nacosServiceCenter = new NacosServiceCenter();this.circuitBreaker = RpcCircuitBreaker.getCircuitBreaker();this.r4jRetry = Resilience4jRetry.GetRetryTemplate();setupEventListeners();}public <T> T getProxy(Class<T> clazz) {Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);return (T) o;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//构建requestRpcRequest request = RpcRequest.builder().requestId(UUID.randomUUID().toString()).type(MessageType.REQUEST).interfaceName(method.getDeclaringClass().getName()).methodName(method.getName()).params(args).paramsType(method.getParameterTypes()).build();boolean checkRetry = nacosServiceCenter.checkRetry(request.getInterfaceName());// 定义熔断RPC调用逻辑Supplier<RpcResponse> rpcCall = () -> {try {return rpcClient.sendRequest(request).get();} catch (Exception e) {throw new RuntimeException(e);}};RpcResponse rpcResponse;if (checkRetry) {rpcResponse = CircuitBreaker.decorateSupplier(circuitBreaker, () -> Retry.decorateSupplier(r4jRetry, rpcCall).get()).get();} else {rpcResponse = rpcCall.get();}System.out.println("本次RPC方法请求的结果" + rpcResponse);return rpcResponse.getData();}/*** 熔断器状态监听** @author 冷环渊* date: 2025/4/17 10:25*/public void setupEventListeners() {// 用于调试重试r4jRetry.getEventPublisher().onRetry(event -> {System.out.println("触发重试" + event.getNumberOfRetryAttempts());}).onSuccess(event -> System.out.println("重试成功" + event.getNumberOfRetryAttempts())).onError(event -> System.out.println("重试失败" + event.getNumberOfRetryAttempts()));
// 用于调试熔断器circuitBreaker.getEventPublisher().onStateTransition(event -> {switch (event.getStateTransition().getToState()) {case OPEN:System.out.println("熔断器 open");break;case HALF_OPEN:System.out.println("熔断器 半开");break;case CLOSED:System.out.println("熔断器 关闭");break;default:event.getStateTransition();break;}});}
}
测试
熔断测试客户端请求案例
public static void main(String[] args) throws InterruptedException {ServiceProxy clientProxy = new ServiceProxy();TaskService proxy = clientProxy.getProxy(TaskService.class);for (int i = 0; i < 90; i++) {Integer i1 = i;if (i % 30 == 0) {Thread.sleep(10000);}new Thread(() -> {try {Task user = proxy.getTaskById(i1);System.out.println("获取到任务=" + user.toString());} catch (NullPointerException e) {System.out.println("task为空");e.printStackTrace();}}).start();}}
测试结果
我们将服务端的限流控制在初始只有一个令牌 也就是请求百分之99都错误的情况
客户端此时默认是开启重试与熔断的
package com.hyc.Server.provider;import com.hyc.rateLimit.RateLimit;
import com.hyc.rateLimit.TokenBucketRateLimit;
import lombok.extern.slf4j.Slf4j;import java.util.HashMap;
import java.util.Map;/*** 基于令牌桶限制的服务提供** @author 冷环渊* @date 2025/3/30 14:22* @description RateLimitProvider*/
@Slf4j
public class RateLimitProvider {// 用于存储每隔接口和对应速率显示器的映射关系private Map<String, RateLimit> rateLimitsMap = new HashMap<>();/*** 根据结构名称获取对应的限制器实例 如不存在将创建** @author 冷环渊* date: 2025/3/30 14:24*/public RateLimit getRateLimitByInterFaceName(String interFaceName) {if (!rateLimitsMap.containsKey(interFaceName)) {
// todo 这里先默认的使用固定值创建新的限流器 之后会修改成从配置文件中读取RateLimit rateLimit = new TokenBucketRateLimit(100, 1);rateLimitsMap.put(interFaceName, rateLimit);return rateLimit;}return rateLimitsMap.get(interFaceName);}// todo 这里采用的是服务名限流 之后会拓展基于IP限流 思路大致一致 改变的只会是粗细度
// public RateLimit getRateLimitByInterFaceName()
}
启动服务和客户端
触发批量请求的重试
抢到令牌
失败过多 判断为服务器挂了 开启熔断
读取配置
这个类用于存放默认配置
package com.hyc.config;import com.hyc.Serializer.Serializer;
import com.hyc.ServiceCenter.balance.impl.ConsistencyHashBalance;
import com.hyc.ServiceRegister.NacosServiceRegister;
import lombok.*;/*** @author 冷环渊* @date 2025/4/20 19:48* @description RpcConfigTemplate*/
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
@ToString
public class RpcConfigTemplate {//名称private final String name = "FinishRpc";//端口private Integer port = 9999;//主机名private String host = "localhost";//版本号private String version = "1.0.0";//注册中心 暂时只支持nacos 之后如果有时间会多拓展几个private String registry = new NacosServiceRegister().toString();//序列化器 之后回去支持 protoprivate String serializer = Serializer.getSerializerByCode(1).toString();//负载均衡private String loadBalance = new ConsistencyHashBalance().toString();
}
工具类
package com.hyc.util;import cn.hutool.core.util.StrUtil;
import cn.hutool.setting.dialect.Props;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class ConfigUtil {// 加载配置文件,使用默认环境public static <T> T loadConfig(Class<T> targetClass, String prefix) {return loadConfig(targetClass, prefix, "");}// 加载配置文件,支持指定环境public static <T> T loadConfig(Class<T> targetClass, String prefix, String environment) {StringBuilder configFileNameBuilder = new StringBuilder("application");if (StrUtil.isNotBlank(environment)) {configFileNameBuilder.append("-").append(environment);}configFileNameBuilder.append(".properties");// 加载配置文件Props properties = new Props(configFileNameBuilder.toString());if (properties.isEmpty()) {log.warn("配置文件 '{}' 为空或加载失败!", configFileNameBuilder.toString());} else {log.info("加载配置文件: '{}'", configFileNameBuilder.toString());}// 返回转化后的配置对象try {return properties.toBean(targetClass, prefix);} catch (Exception e) {log.error("配置转换失败,目标类: {}", targetClass.getName(), e);throw new RuntimeException("配置加载失败", e);}}
}
创建测试项目
- servera 是客户端
- server b 是服务提供
客户端没什么说的 ,这里展示服务器的配置类
FinishRpc.version=1.0.0
FinishRpc.port=9999
FinishRpc.serializer=Json
FinishRpc.host=localhost
FinishRpc.registry=Nacos
FinishRpc.loadBalance=ConsistencyHash
到这里我们的基础设置就结束了 之后会在自动配置中使用此小节代码
注解启动与服务注册
我们使用reflections
来作为反射工具 为什么不用spring呢 因为基础版本我想作为原生的版本 他依旧可以在spring项目中使用 如果通过spring实现rpc 那么只需要将resttemplate封装并且适配一下rpc就实现了 有空可能去实现一下
启动注解
package com.hyc.annotation;import java.lang.annotation.*;@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EnableFinishRpcServer {/*** 是否强制使用默认配置(覆盖配置文件)*/boolean useDefaultConfig() default false;
}
注册注解
package com.hyc.annotation;import java.lang.annotation.*;@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface FinishService {String serviceName() default "";boolean canRetry() default false;
}
autoconfig
新建一个autoconfig类
思路就是 :
- 启动类带有启动注解 自动配置服务端 并且扫描带有注册注解的类基于启动类的目录
package com.hyc.config;import com.hyc.annotation.EnableFinishRpcServer;
import com.hyc.annotation.FinishService;
import com.hyc.constant.RPCConstant;
import com.hyc.netty.server.NettyRpcServer;
import com.hyc.provider.ServiceProvider;
import com.hyc.util.ConfigUtil;
import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;/*** 用于读取配置并且初始化rpc 并且支持启动注解** @author 冷环渊* @date 2025/4/20 20:40* @description RpcAutoConfiguration*/
@Slf4j
public class RpcAutoConfiguration {private static volatile RpcConfigTemplate rpcConfigTemplate;//通过注解初始化 扫描类路径static {scanAndInitializeServer();}/*** 扫描类路径,检查是否有 @EnableFinishRpcServer 注解*/private static void scanAndInitializeServer() {try {String callerPackage = getCallerPackage();System.out.println("包名" + callerPackage);// 检查是否标记了 @EnableFinishRpcServerList<Object> FinishServiceClass = scanServices(callerPackage, FinishService.class);EnableFinishRpcServer EnableFinishRpc = scanService(callerPackage, EnableFinishRpcServer.class).getAnnotation(EnableFinishRpcServer.class);if (EnableFinishRpc.useDefaultConfig()) {initialize(new RpcConfigTemplate()); // 强制使用默认配置} else {initialize(); // 正常加载配置}log.info("RPC 框架通过 @EnableFinishRpcServer 注解初始化完成");if (!FinishServiceClass.isEmpty()) {for (Object scanService : FinishServiceClass) {System.out.println("拿到的类" + scanService);FinishService serviceAnnotation = scanService.getClass().getAnnotation(FinishService.class);ServiceProvider serviceProvider = new ServiceProvider(rpcConfigTemplate.getHost(), rpcConfigTemplate.getPort());if (!serviceAnnotation.serviceName().isEmpty()) {// 发布服务接口到 ServiceProviderserviceProvider.provideServiceInterface(scanService, serviceAnnotation.serviceName(), serviceAnnotation.canRetry()); // 可以设置是否支持重试// 启动 RPC 服务器并监听端口NettyRpcServer server = new NettyRpcServer(serviceProvider);server.start(rpcConfigTemplate.getPort());} else {// 发布服务接口到 ServiceProviderserviceProvider.provideServiceInterface(scanService, serviceAnnotation.canRetry()); // 可以设置是否支持重试// 启动 RPC 服务器并监听端口NettyRpcServer server = new NettyRpcServer(serviceProvider);server.start(rpcConfigTemplate.getPort());}}}} catch (Exception e) {log.warn("未找到 @EnableFinishRpcServer 注解,RPC 框架未自动初始化", e);}}/*** 扫描主类包 用于默认的服务扫描** @author 冷环渊* date: 2025/4/20 21:32*/public static String getCallerPackage() {// 获取调用栈,找到入口类(通常是用户的主类)StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();String mainClassName = stackTrace[stackTrace.length - 1].getClassName();try {Class<?> mainClass = Class.forName(mainClassName);return mainClass.getPackage().getName(); // 返回主类所在包} catch (ClassNotFoundException e) {return ""; // 如果无法获取,则默认全扫描(需过滤 JDK 类)}}public static void initialize(RpcConfigTemplate customRpcConfig) {rpcConfigTemplate = customRpcConfig;System.out.println("RPC 框架初始化,配置 = " + customRpcConfig);}public static void initialize() {RpcConfigTemplate customRpcConfig;try {customRpcConfig = ConfigUtil.loadConfig(RpcConfigTemplate.class, RPCConstant.CONFIG_FILE_PREFIX);} catch (Exception e) {// 配置加载失败,使用默认配置log.warn("配置加载失败,使用默认配置");customRpcConfig = new RpcConfigTemplate();}initialize(customRpcConfig);}public static RpcConfigTemplate getRpcConfig() {if (rpcConfigTemplate == null) {synchronized (RpcConfigTemplate.class) {initialize(); // 确保在第一次调用时初始化}}return rpcConfigTemplate;}/*** 获取带有注解实现类的接口** @author 冷环渊* date: 2025/4/21 14:03*/public static List<Object> scanServices(String basePackage, Class<? extends Annotation> annotationClass) {// 创建Reflections实例,指定要扫描的基础包Reflections reflections = new Reflections(basePackage);ArrayList<Object> objects = new ArrayList<>();// 使用getTypesAnnotatedWith方法获取带有指定注解的所有类Set<Class<?>> annotatedClasses = reflections.getTypesAnnotatedWith(annotationClass);for (Class<?> annotatedClass : annotatedClasses) {try {objects.add(annotatedClass.newInstance());} catch (Exception e) {throw new RuntimeException(e);}}// 将Set转换为List并返回return objects;}public static Class<?> scanService(String basePackage, Class<? extends Annotation> annotationClass) {// 创建Reflections实例,指定要扫描的基础包Reflections reflections = new Reflections(basePackage);Class<?> res = null;// 使用getTypesAnnotatedWith方法获取带有指定注解的所有类Set<Class<?>> annotatedClasses = reflections.getTypesAnnotatedWith(annotationClass);for (Class<?> annotatedClass : annotatedClasses) {res = annotatedClass;}return res;}}
测试
将api项目中的实现类直接诺到我们的serverB
中
加入注解
启动类
@Slf4j
@EnableFinishRpcServer
public class testServer {public static void main(String[] args) {// 框架已在静态代码块中初始化System.out.println("RPC 配置: " + RpcAutoConfiguration.getRpcConfig());}
直接启动测试项目
serverB打印
自动配置完成
总结
目前来看整个的 finishRpc已经具备了最基础的rpc以及可靠性和提升效率的功能 , 初次设计可能编写的很青涩 此项目也会作为博主提升的里程碑 ,之后考虑去基于finish的思路 实现一个网关 , 注册中心等一系列生态 , 然后目前的思路也可能去实现适配spring的rpc , 本篇结束 你的提升来自于你的热爱和想法 我们下一篇见