当前位置: 首页 > news >正文

从零手写 RPC-version1

一、 前置知识

1. 反射

获取字节码的三种方式

  • Class.forName("全类名") (全类名,即包名+类名)
  • 类名.class
  • 对象.getClass() (任意对象都可调用,因为该方法来自Object类)

获取成员方法

Method getMethod(String name,Class<?>...parameterTypes)

参数为:方法名,参数类型<可变>

执行成员方法

Object invoke(Object obj, Object ... args)

参数 1:哪个对象来调用该方法

参数 2 :传入的实参

2. 动态代理

具体代码实现参考下方的"代码实现"部分

动态代理实现流程:

  1. 创建一个类来实现InvocationHandler接口(重写invoke方法)
  2. 调用Proxy.newProxyInstance()来创建代理类(需要将第一步创建的类作为参数传进来)
  3. 通过代理类来调用方法

二、上一个版本中的问题&解决思路

**问题 1:**服务端当前仅支持一个服务,当提供多个服务时,客户端如何指定要调用哪个服务?(当前客户端发送请求数据时,而只会傻乎乎地发送数据,而无法指定具体调用哪个接口)

当服务端提供多个服务时,客户端需要指定调用的服务名称。

创建一个请求对象类Request,其中包含的成员属性有:接口名称、要调用的方法名称、传递的参数数据,服务端利用这些信息,来使用反射调用相应的服务

**问题 2:**当前服务端中的方法返回类型是固定的,但是当有不同的方法时,返回数据类型可能不同,如果客户端要处理这些不同类型的数据,就需要提前知道服务端返回的数据类型。但这显然会违背解耦的原则,降低灵活性,而且不便于后续维护和扩展

引入统一的响应格式——将返回数据封装到一个公共类型中

创建一个响应对象类Response,其中包含的成员属性有:状态码、状态描述、响应数据(这种思想在 javaweb 开发也经常使用)

**问题 3:**在上个版本中,客户端和目标主机建立连接时,采用了硬编码,不够优雅

**问题 4:**如果仍然采用上个版本的客户端代码,那么代码耦合性较高(建立连接、发送请求的代码、接收响应、处理响应结果都写在了一起)

针对问题 3、4,将这些问题抽象了出来,建立一个 IOClient 类,专门建立连接、发送请求、接收响应。

三、本版本目标

  1. 将请求、响应的数据各自封装到一个公共类中,这样在请求和响应时就能进行统一,便于后续代码的书写和维护
  2. 服务端采用循环+BIO的形式,当接收到请求对象时,利用反射调用对应方法,并将执行结果发送给客户端
  3. 将建立连接、发送数据、接收数据的过程封装到专用组件中(会将请求参数封装到一个请求对象中(包含方法、参数类型、参数列表等))
  4. 利用动态代理,拦截所有对接口方法的调用,将其转换为 RPC 请求

四、代码实现

服务端

server/Server
package com.chanlee.crpc.v1.server;import com.chanlee.crpc.v1.common.RpcRequestDTO;
import com.chanlee.crpc.v1.common.RpcResponseDTO;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
/*** 服务端代码*/
public class Server {private static final int SERVER_PORT = 8005;public static void main(String[] args) {UserServiceImpl userService = new UserServiceImpl();try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT)){System.out.println("服务器已启动...");while(true){Socket socket = serverSocket.accept();//接收到连接请求后,启动一个新线程去处理任务new Thread(() -> {try {// 获取输入、输出流ObjectOutputStream objectOutput = new ObjectOutputStream(socket.getOutputStream());ObjectInputStream objectInput = new ObjectInputStream(socket.getInputStream());//读取接收到的请求RpcRequestDTO request = (RpcRequestDTO) objectInput.readObject();//利用反射调用对应方法Method method = userService.getClass().getMethod(request.getMethod(), request.getParamsTypes());Object invoke = method.invoke(userService, request.getParams());//将调用结果进行封装objectOutput.writeObject(RpcResponseDTO.success(invoke));//及时传递消息objectOutput.flush();} catch (Exception e) {e.printStackTrace();}}).start();}} catch (IOException e) {System.out.println("服务器启动失败...");}}
}
server/UserServiceImpl
package com.chanlee.crpc.v1.server;import com.chanlee.crpc.v1.common.User;
import com.chanlee.crpc.v1.service.UserService;import java.util.Random;
import java.util.UUID;/*** 服务端接口实现类*/
public class UserServiceImpl implements UserService {public User getUserById(int id) {System.out.println("客户端调用id 为 " + id + " 的用户");Random random = new Random();User user = User.builder().id(id).realName(UUID.randomUUID().toString()).age(random.nextInt(50)).build();return user;}public Integer insertUser(User user) {System.out.println("插入用户成功:" + user);return 1;}
}

客户端

client/Client
package com.chanlee.crpc.v1.client;import com.chanlee.crpc.v1.common.User;
import com.chanlee.crpc.v1.service.UserService;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;/*** 客户端主代码*/
public class Client{public static void main(String[] args){ClientProxy clientProxy = new ClientProxy("127.0.0.1", 8005);UserService proxy = clientProxy.getProxy(UserService.class);//调用方法 1User user = proxy.getUserById(1);System.out.println("对应的user为:" + user);//调用方法 2User codingBoy = User.builder().age(25).id(32).realName("coding boy").build();Integer i = proxy.insertUser(codingBoy);System.out.println("向服务端插入的 user的Id为:" + i);}
}
client/Proxy
package com.chanlee.crpc.v1.client;import com.chanlee.crpc.v1.common.RpcRequestDTO;
import com.chanlee.crpc.v1.common.RpcResponseDTO;
import lombok.AllArgsConstructor;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;import static com.chanlee.crpc.v1.client.IOClient.sendRequest;/*** 客户端代理类*/
@AllArgsConstructor
public class ClientProxy implements InvocationHandler {/*** 服务端 IP*/private String host;/*** 服务端端口号*/private int port;public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//构建request请求RpcRequestDTO request = RpcRequestDTO.builder().interfaceName(method.getDeclaringClass().getName()).method(method.getName()).paramsTypes(method.getParameterTypes()).params(args).build();//发送请求并获取响应RpcResponseDTO<Object> response = sendRequest(host, port, request);//返回结果数据return response.getData();}public <T> T getProxy(Class<T> tClass){Object o = Proxy.newProxyInstance(tClass.getClassLoader(),new Class[]{tClass},this);return (T)o;}
}
client/IOClient
package com.chanlee.crpc.v1.client;import com.chanlee.crpc.v1.common.RpcRequestDTO;
import com.chanlee.crpc.v1.common.RpcResponseDTO;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;/*** 客户端 IO 组件*/
@Slf4j
public class IOClient implements Serializable {public static <T> RpcResponseDTO<T> sendRequest(String host, int port, RpcRequestDTO request){//和服务器建立连接try {Socket socket = new Socket(host, port);// 获取输入流和输出流ObjectOutputStream objectOutput = new ObjectOutputStream(socket.getOutputStream());ObjectInputStream objectInput = new ObjectInputStream(socket.getInputStream());//发送请求objectOutput.writeObject(request);objectOutput.flush();//接收结果RpcResponseDTO<T> response = (RpcResponseDTO<T>) objectInput.readObject();//关闭连接socket.close();//返回结果return response;} catch (IOException e) {log.error("和服务器建立连接失败: {}", e);throw new RuntimeException(e);} catch (ClassNotFoundException e) {log.error("接收结果失败: {}", e);throw new RuntimeException(e);}}
}

服务层

service/UserService
package com.chanlee.crpc.v1.service;import com.chanlee.crpc.v1.common.User;/*** 服务端接口*/
public interface UserService {/*** 根据id获取用户信息* @param id* @return*/User getUserById(int id);/*** 插入用户信息* @param user* @return*/Integer insertUser(User user);
}

公共类

convention/BaseErrorCode
package com.chanlee.crpc.v1.common.convention;/*** 基础错误码*/
public enum BaseErrorCode implements ErrorCode {SERVER_ERROR("A000001", "服务端内部错误");private final String code;private final String message;BaseErrorCode(String code, String message) {this.code = code;this.message = message;}@Overridepublic String code() {return code;}@Overridepublic String message() {return message;}
}
convention/ErrorCode
package com.chanlee.crpc.v1.common.convention;/*** 错误码接口*/
public interface ErrorCode {/*** 错误码*/String code();/*** 错误信息*/String message();
}
commom/RpcRequestDTO
package com.chanlee.crpc.v1.common;import lombok.Builder;
import lombok.Data;import java.io.Serializable;/*** 请求对象体*/
@Builder
@Data
public class RpcRequestDTO implements Serializable {/*** 接口名*/private String interfaceName;/*** 方法名*/private String method;/*** 参数*/private Object[] params;/*** 参数类型*/private Class<?>[] paramsTypes;
}
Common/RpcRespDTO
package com.chanlee.crpc.v1.common;import com.chanlee.crpc.v1.common.convention.BaseErrorCode;
import lombok.Data;
import lombok.experimental.Accessors;import java.io.Serializable;/*** 响应对象体*/
@Data
@Accessors(chain = true)
public class RpcResponseDTO<T> implements Serializable {/*** 正确返回码*/public static final String SUCCESS_CODE = "200";/*** 返回码*/private String code;/*** 返回消息*/private String message;/*** 响应数据*/private T data;public static RpcResponseDTO<Void> success(){return new RpcResponseDTO<Void>().setCode(SUCCESS_CODE);}public static <T> RpcResponseDTO<T> success(T data){return new RpcResponseDTO<T>().setCode(SUCCESS_CODE).setData(data);}public static RpcResponseDTO<Void> failure(){return new RpcResponseDTO<Void>().setMessage(BaseErrorCode.SERVER_ERROR.code()).setCode(BaseErrorCode.SERVER_ERROR.message());}
}
common/User
package com.chanlee.crpc.v1.common;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;
/*** 用户类*/
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {/*** 用户id*/Integer id;/*** 用户真实姓名*/String realName;/*** 用户年龄*/Integer age;
}

相关文章:

  • docker在windows下wsl存储路径的变更与数据迁移
  • 区块链技术在物联网中的应用:构建可信的智能世界
  • 从白平衡色温坐标系调整的角度消除硬件不一致性引起的偏色问题
  • Godot学习-3D基本环境设置以及3D角色移动
  • 叶面温度传感器选清易高精度电热感应器 叶片温度变送器
  • OpenGL学习笔记(Blinn-Phong、伽马矫正、阴影)
  • 第一章:基于Docker环境快速搭建LangChain框架的智能对话系统:从langchain环境搭建到多轮对话代码实现(大语言模型加载)
  • 【专题刷题】滑动窗口(四):
  • 介绍 IntelliJ IDEA 快捷键操作
  • HCIP-OSPF综合实验
  • 通过智能分块策略、动态分块、多路召回与重排序融合、异构数据关联与溯源提升Ragflow与LangChain提升RAG的召回率
  • 【高频考点精讲】JavaScript中的访问者模式:从AST解析到数据转换的艺术
  • windos端远程控制ubuntu运行脚本程序并转发ubuntu端脚本输出的网页
  • 开发NESMA辅助工具版本之需求匹配
  • 【KWDB 创作者计划】_上位机知识篇---PlatformIO
  • 深入详解Java中的@PostConstruct注解:实现简洁而高效初始化操作
  • 量子计算浪潮下的安全应对之法
  • 一个关于相对速度的假想的故事-7
  • 迅为RK3562开发板ARM四核A53核心板多种系统适配全开源
  • 汽车免拆诊断案例 | 2013款大众辉腾车发动机抖动
  • 下周起上海浦东将投放5000万元消费券,预计分五周发放
  • 与包乐史驾帆航行|航海、钓鱼和写书:一个记者的再就业之路
  • 一夜跌去200美元,黄金巨震冲上热搜!涨势已近尾声?
  • 灰鹦鹉爆粗口三年未改?云南野生动物园:在持续引导
  • 中国围棋协会将不组队参加今年的LG杯世界棋王赛
  • “很多中国企业竞争力独一无二”,这场对接会上他频频为协同供应链点赞