C++ - 从零实现Json-Rpc框架-2(服务端模块 客户端模块 框架设计)
项⽬设计
本质上来讲,我们要实现的rpc(远端调⽤)思想上并不复杂,甚⾄可以说是简单,其实就是客⼾端想要完成某个任务的处理,但是这个处理的过程并不⾃⼰来完成,⽽是,将请求发送到服务器上,让服务器来帮其完成处理过程,并返回结果,客⼾端拿到结果后返回。
然⽽上图的模型中,是⼀种多对⼀或⼀对⼀的关系,⼀旦服务端掉线,则客⼾端⽆法进⾏远端调⽤,且其服务端的负载也会较⾼,因此在rpc实现中,我们不仅要实现其基本功能,还要再进⼀步,实现分布式架构的rpc。
分布式架构:简单理解就是由多个节点组成的⼀个系统,这些节点通常指的是服务器,将不同的业务或者同⼀个业务拆分分布在不同的节点上,通过协同⼯作解决⾼并发的问题,提⾼系统扩展性和可⽤性。
其实现思想也并不复杂,也就是在原来的模型基础上,增加⼀个注册中⼼,基于注册中⼼不同的服务提供服务器向注册中⼼进⾏服务注册,相当于告诉注册中⼼⾃⼰能够提供什么服务,⽽客⼾端在进⾏远端调⽤前,先通过注册中⼼进⾏服务发现,找到能够提供服务的服务器,然后发起调⽤。
⽽其次的发布订阅功能,则是依托于多个客⼾端围绕服务端进⾏消息的转发。
不过单纯的消息转发功能,并不能满⾜于⼤部分场景的需要,因此会在其基础上实现基于主题订阅的转发。
基于以上功能的合并,我们可以得到⼀个实现所有功能的结构图
在上图的结构中,我们甚⾄可以让每⼀个Server作为备⽤注册中⼼形成分布式架构,⼀旦⼀个注册中⼼下线,可以向备⽤中⼼进⾏注册以及请求,且在此基础上客⼾端在请求Rpc服务的时候,因为可以有多个rpc-provider可选,因此可以实现简单的负载均衡策略,且基于注册中⼼可以更简便实现发布订阅的功能。
项⽬的三个主要功能:
• rpc调⽤
• 服务的注册与发现以及服务的下线/上线通知
• 消息的发布订阅
服务端模块划分
服务端的功能需求:
• 基于⽹络通信接收客⼾端的请求,提供rpc服务
• 基于⽹络通信接收客⼾端的请求,提供服务注册与发现,上线&下线通知
• 基于⽹络通信接收客⼾端的请求,提供主题操作(创建/删除/订阅/取消),消息发布
在服务端的模块划分中,基于以上理解的功能,可以划分出这么⼏个模块
模块名称 功能描述 Network 负责网络通信(TCP/HTTP),接收和发送消息 Protocol 解析和封装 RPC 请求/响应协议(JSON、Protobuf 等) Dispatcher 消息调度模块,将不同的请求分发到正确的处理函数 RpcRouter 远程调用路由,确定请求应该发送到哪个服务器 Publish-Subscribe 发布-订阅模块,支持主题创建、订阅、消息推送 Registry-Discovery 服务注册与发现,管理可用的 RPC 服务 Server 核心服务端,整合所有模块并提供外部接口
1.Network网络通信
该模块为⽹络通信模块,实现底层的⽹络通信功能,这个模块本质上也是⼀个⽐较复杂庞⼤的模块,因此鉴于项⽬的庞⼤,该模块我们将使⽤陈硕⼤佬的Muduo库来进⾏搭建。
2.Protocol解析和封装报文
应⽤层通信协议模块的存在意义:解析数据,解决通信中有可能存在的粘包问题,能够获取到⼀条完整的消息。
在前边的muduo库基本使⽤中,我们能够知道想要让⼀个服务端/客⼾端对消息处理,就要设置⼀个onMessage的回调函数,在这个函数中对收到的数据进⾏应⽤层协议处理。
⽽Protocol模块就是是⽹络通信协议模块的设计,也就是在⽹络通信中,我们必须设计⼀个应⽤层的⽹络通信协议出来,以解决⽹络通信中可能存在的粘包问题,⽽解决粘包有三种⽅式:特殊字符间隔,定⻓,LV格式。(一个定长字段表示后面变长数据的大小)
⽽我们项⽬中将使⽤LV格式来定义应⽤层的通信协议格式
Length:该字段固定4字节⻓度,⽤于表⽰后续的本条消息数据⻓度(不包含本身)。
MType:该字段为Value中的固定字段,固定4字节⻓度,⽤于表⽰该条消息的类型。
◦ Rpc调⽤请求/响应类型消息
◦ 发布/订阅/取消订阅/消息推送类型消息
◦ 主题创建/删除类型消息
◦ 服务注册/发现/上线/下线类型消息
IDLength:为消息中的固定字段,该字段固定4字节⻓度,⽤于描述后续ID字段的实际⻓度。
MID:在每条消息中都会有⼀个固定字段为ID字段,⽤于唯⼀标识消息,ID字段⻓度不固定。
Body:消息主题正⽂数据字段,为请求或响应的实际内容字段。
3.Dispatcher消息派发
模块存在的意义:区分消息类型,根据不同的类型,调⽤不同的业务处理函数进⾏消息处理。
当muduo库底层通信收到数据后,在onMessage回调函数中对数据进⾏应⽤层协议解析,得到⼀条实际消息载荷后,我们就该决定这条消息代表这客⼾端的什么请求,以及应该如何处理。
因此,我们设计出了Dispatcher模块,作为⼀个分发模块,这个模块内部会保存有⼀个hash_map<消息类型, 回调函数>,以此由使⽤者来决定哪条消息⽤哪个业务函数进⾏处理,当收到消息后,在该模块找到其对应的处理回调函数进⾏调⽤即可。
1.Protocol(协议解析):
作用:
onMessage
处理收到的原始数据,解析成完整消息- Protocol 负责拆包,解析 LV 格式,确保数据完整
- 解析后的数据被传递给
onMessageCallback
进行后续处理流程:
- Muduo 网络层 收到数据后,触发
onMessage
回调。onMessage
调用Protocol
解析数据,并将完整消息传递给onMessageCallback
。onMessageCallback
进一步解析消息类型,并选择合适的处理方式。2.Dispatcher(消息分发器)
作用:
- 基于消息类型 (
MType
) 查找对应的业务处理函数- 维护
map<消息类型, 处理回调>
- 注册业务逻辑(
registerHandler()
),确保不同类型的消息能正确处理流程:
onMessage
解析后,提取MType
(消息类型)。- 通过
Dispatcher
的map<MType, 回调函数>
找到对应的业务处理函数。- 调用 注册的回调函数 处理具体的业务逻辑。
3.Muduo 网络模块
作用:
- 负责 TCP 连接管理(
onConnectionCallback
)- 负责 数据接收(
onMessage
)- 自动管理缓冲区,防止数据丢失
流程:
- 当有新的客户端连接,Muduo 触发
onConnectionCallback
进行管理。- 客户端发送数据后,Muduo 触发
onMessage
处理数据。- 解析后的消息被传递给
Dispatcher
进行业务处理。
整体框架:
Muduo::TcpServer / TcpClient (管理连接 & 数据收发) │ ▼ 收到数据 -> 触发 `onMessage` │ ▼ `Protocol` 解析完整消息 (解决粘包问题) │ ▼ `Dispatcher` 分发业务逻辑 │ ├── RPC 处理 ├── 服务注册 & 发现 ├── 发布订阅 └── 其他业务逻辑
关系总结
✅ Muduo 负责网络通信,触发
模块 作用 依赖 Muduo 网络通信,负责 TCP 连接、数据收发 onMessage
触发Protocol
解析Protocol 解析数据流,解决粘包问题,提取完整消息 解析数据后调用 Dispatcher
Dispatcher 基于消息类型 MType
分发业务逻辑依赖 Protocol
解析的MType, Body
业务逻辑 最终执行具体的 RPC、注册、订阅等业务 通过 registerHandler()
注册到Dispatcher
onMessage
✅ Protocol 负责协议解析,保证完整消息
✅ Dispatcher 负责分发业务逻辑,调用相应处理函数
4. RpcRouter RPC 路由器(处理 RPC
类型的消息)
RpcRouter模块存在的意义:提供rpc请求的处理回调函数,内部所要实现的功能,分辨出客⼾端请求的服务进⾏处理得到结果进⾏响应。
pc请求中,最关键的两个点:
• 请求⽅法名称
• 请求对应要处理的参数信息
在Rpc远端调⽤中,⾸先将客⼾端到服务端的通信链路打通,然后将⾃⼰所需要调⽤的服务名称,以及参数信息传递给服务端,由服务端进⾏接收处理,并返回结果。
⽽,不管是客⼾端要传递给服务端的服务名称以及参数信息,或者服务端返回的结果,都是在上边Protocol中定义的Body字段中,因此Body字段中就存在了另⼀层的正⽂序列化/反序列化过程。
序列化⽅式有很多种,鉴于当前我们是json-rpc,因此这个序列化过程我们就初步使⽤json序列化来进⾏.
需要注意的是,在服务端,当接收到这么⼀条消息后,Dispatcher模块会找到该Rpc请求类型的回调处理函数进⾏业务处理,但是在进⾏业务处理的时候,也是只会将 parameters 参数字段传⼊回调函数中进⾏处理。
然⽽,对服务端来说,应该从传⼊的Json::Value对象中,有什么样的参数,以及参数信息是否符合⾃⼰所提供的服务的要求,都应该有⼀个检测,是否符合要求,符合要求了再取出指定字段的数据进⾏处理。
因此,对服务端来说,在进⾏服务注册的时候,必须有⼀个服务描述,以代码段中的Add请求为例,该服务描述中就应该描述:
◦ 服务名称: Add,
◦ 参数名称: num1,是⼀个整形
◦ 参数名称: num2,是⼀个整形,
◦ 返回值类型:整形
有了这个描述,在回调函数中就可以先对传⼊的参数进⾏校验,没问题了则取出指定字段数据进⾏处理并返回结果
基于以上理解,在实现该模块时,该有以下设计:
1. 该模块必须具备⼀个Rpc路由管理,其中包含对于每个服务的参数校验功能
2. 该模块必须具备⼀个⽅法名称和⽅法业务回调的映射
3. 该模块必须向外提供 Rpc请求处理接口。
模块 作用 ServiceDescribe
维护 方法名、参数格式、回调函数 RpcRouter
维护方法名称与 ServiceDescribe
的映射,提供RPC
处理能力Dispatcher
分发 RPC 请求
到onRpcRequest
onRpcRequest
接收 Dispatcher
分发的请求,调用RpcRouter
处理
5.Publish-Subscribe 发布订阅(处理发布订阅请求)
Publish-Subscribe模块存在的意义:针对发布订阅请求进⾏处理,提供⼀个回调函数设置给
Dispatcher模块。
发布订阅所包含的请求操作:
• 主题的创建
• 主题的删除
• 主题的订阅
• 主题的取消订阅
• 主题消息的发布
在当前的项⽬中,我们也实现⼀个简单的发布订阅功能,该功能是围绕多个客⼾端与⼀个服务端来展开的。
即,任意⼀个客⼾端在发布或订阅之前先创建⼀个主题,⽐如在新闻发布中我们创建⼀个⾳乐新闻主题,哪些客⼾端希望能够收到⾳乐新闻相关的消息,则就订阅这个主题,服务端会建⽴起该主题与客⼾端之间的联系。
当某个客⼾端向服务端发布消息,且发布消息的⽬标主题是⾳乐新闻主题,则服务端会找出订阅了该主题的客⼾端,将消息推送给这些客⼾端。功能思想并不复杂,因此我们需要把更多的精⼒放到其实现设计上:
1. 该模块必须具备⼀个主题管理,且主题中需要保存订阅了该主题的客⼾端连接
a. 主题收到⼀条消息,需要将这条消息推送给订阅了该主题的所有客⼾端
2. 该模块必须具备⼀个订阅者管理,且每个订阅者描述中都必须保存⾃⼰所订阅的主题名称
a. ⽬的是为了当⼀个订阅客⼾端断开连接时,能够找到订阅信息的关联关系,进⾏删除
3. 该模块必须向外提供 主题创建/销毁,主题订阅/取消订阅,消息发布处理的业务处理函数
模块 作用 依赖 Dispatcher 基于 MType
分发订阅/发布
请求PubSubManager
PubSubManager 管理 Topic
&Subscriber
,执行订阅/发布
Topic
,Subscriber
Topic 维护订阅者列表,处理 publish()
Subscriber
Subscriber 记录已订阅的主题,用于取消订阅 Topic
Connection 代表客户端,标识某个 Subscriber
Subscriber
✅ Dispatcher 负责解析 & 分发
✅ PubSubManager 负责 订阅 & 主题 逻辑
✅ Topic 维护 订阅者列表,支持 消息推送
✅ Subscriber 维护 已订阅主题,支持 取消订阅
6.Registry-Discovery服务注册与发现请求的处理
Registry-Discovery模块存在的意义:就是针对服务注册与发现请求的处理。
• 服务注册/发现类型请求中的详细划分
◦ 服务注册:服务provider告诉中转中⼼,⾃⼰能提供哪些服务
◦ 服务发现:服务caller询问中转中⼼,谁能提供指定服务
◦ 服务上线:在⼀个provider上线了指定服务后,通知发现过该服务的客⼾端有个provider可以提供该服务
◦ 服务下线:在⼀个provider断开连接,通知发现过该服务的caller,谁下线了哪个服务
服务注册模块,该模块主要是为了实现分布式架构⽽存在,让每⼀个rpc客⼾端能够从不同的节点主机上获取⾃⼰所需的服务,让业务更具扩展性,系统更具健壮性。
⽽为了能够让rpc-caller知道有哪些rpc-provider能提供⾃⼰所需服务,那么就需要有⼀个注册中⼼让这些rpc-provider去注册登记⾃⼰的服务,让rpc-caller来发现这些服务。
因此,在我们的服务端功能中,还需实现服务的注册/发现,以及服务的上线/下线功能。该模块的设计如下:
1. 必须具备⼀个服务发现者的管理:
a. ⽅法与发现者:当⼀个客⼾端进⾏服务发现的时候,进⾏记录谁发现过该服务,当有⼀个新的提供者上线的时候,可以通知该发现者
b. 连接与发现者 :当⼀个发现者断开连接了,删除关联关系,往后就不需要通知了
2. 必须具备⼀个服务提供者的管理:
a. 连接与提供者:当⼀个提供者断开连接的时候,能够通知该提供者提供的服务对应的发现者,该主机的该服务下线了.(提供者的所有方法删除并通知对应的发现者)
b. ⽅法与提供者:能够知道谁的哪些⽅法下线了,然后通知发现过该⽅法的客⼾端(提供者的一部分方法删除并通知对应的发现者)
3. 必须向Dispatcher模块提供⼀个服务注册/发现的业务处理回调函数这样,当⼀个rpc-provider登记了服务,则将其管理起来,当rpc-caller进⾏服务发现时,则将保存的对应服务所对应的主机信息,响应给rpc-caller。
✅ Dispatcher 负责解析 & 分发
组件 作用 Dispatcher
解析 RPC MType
,分发服务请求RDManager
维护 Provider & Caller,提供 onServiceRequest()
Provider
记录 提供的 method
,管理host:port
Discoverer
记录 发现的 method
,支持通知 Caller
✅ RDManager 负责 服务注册 & 发现
✅ Provider 维护 服务信息,支持 上线/下线
✅ Discoverer 维护 发现信息,支持 Caller 通知
7.Server
当以上的所有功能模块都完成后,我们就可以将所有功能整合到⼀起来实现服务端程序了。
• RpcServer:rpc功能模块与⽹络通信部分结合。
• RegistryServer:服务发现注册功能模块与⽹络通信部分结合
• TopicServer:发布订阅功能模块与⽹络通信部分结合。
模块 | 作用 | 核心组件 |
---|---|---|
RpcServer | 提供 RPC 服务,并进行远程调用 | TcpServer 、RpcRouter 、Dispatcher |
RegistryServer | 管理服务注册/发现,通知上线/下线 | RegDisManager 、Registry 、Discovery |
TopicServer | 实现发布-订阅机制,支持消息推送 | PubSubManager 、Topic 、Subscriber |
客⼾端模块划分
在客⼾端的模块划分中,基于以上理解的功能,可以划分出这么⼏个模块
1. Protocol:应⽤层通信协议模块
2. Network:⽹络通信模块
3. Dispatcher:消息分发处理模块
4. Requestor:请求管理模块
5. RpcCaller:远端调⽤功能模块
6. Publish-Subscribe:发布订阅功能模块
7. Registry-Discovery:服务注册/发现/上线/下线功能模块
8. Client:基于以上模块整合⽽出的客⼾端模块
1 Network
⽹络通信基于muduo库实现⽹络通信客⼾端
2 Protocol
应⽤层通信协议处理,与服务端保持⼀致。
3 Dispatcher
IO数据分发处理,逻辑与服务端⼀致
4.Requestor 管理请求
Requestor模块存在的意义:针对客⼾端的每⼀条请求进⾏管理,以便于对请求对应的响应做出合适的操作。
⾸先,对于客⼾端来说,不同的地⽅在于,更多时候客⼾端是请求⽅,是主动发起请求服务的⼀⽅,⽽在多线程的⽹络通信中,多线程下,针对多个请求进⾏响应可能会存在时序的问题,这种情况下,则我们⽆法保证⼀个线程发送⼀个请求后,接下来接收到的响应就是针对⾃⼰这条请求的响应,这种情况是⾮常危险的⼀种情况。(请求顺序 ≠ 响应顺序)
其次,类似于Muduo库这种异步IO⽹络通信库,通常IO操作都是异步操作,即发送数据就是把数据放⼊发送缓冲区,但是什么时候会发送由底层的⽹络库来进⾏协调,并且也并不会提供recv接⼝(无法直接阻塞等待响应),⽽是在连接触发可读事件后,IO读取数据完成后调⽤处理回调进⾏数据处理,因此也⽆法直接在发送请求后去等待该条请求的响应。
针对以上问题,我们则创建出当前的请求管理模块来解决,它的思想也⾮常简单,就是给每⼀个请求都设定⼀个请求ID,服务端进⾏响应的时候标识响应针对的是哪个请求(也就是响应信息中会包含请求ID),因此客⼾端这边我们不管收到哪条请求的响应,将数据存储⼊⼀则hash_map中,以请求ID作为映射,并向外提供获取指定请求ID响应的阻塞接⼝,这样只要在发送请求的时候知道⾃⼰的请求ID,那么就能获取到⾃⼰想要的响应,⽽不会出现异常。
请求-响应匹配
- 在多线程异步环境中,请求顺序 ≠ 响应顺序,无法直接阻塞等待响应,需要
RID
进行匹配。异步 I/O
Muduo
等库不支持recv
操作,而是 事件触发回调处理,需要Requestor
统一管理。针对这个思想,我们再进⼀步,可以将每个请求进⼀步封装描述,添加⼊异步的future控制,或者设置回调函数的⽅式,在不仅可以阻塞获取响应,也可以实现异步获取响应以及回调处理响应。
1️⃣ 客户端调用 `send(request, callback)` 或 `send(request, future)`
2️⃣ `Requestor` 生成 `RID`,将请求存入 `map<rid, describe>`
3️⃣ `Requestor` 发送请求至 `Server`
4️⃣ `Server` 处理请求,并回传 `response(RID, result)`
5️⃣ `Requestor` 收到响应后:
- `RID` 关联到请求,找到 **回调函数** 或 **Future**
- 执行 **回调** 或 **解除 Future 阻塞**
组件 作用 Requestor 负责管理请求,存储 RID → 请求描述
request 客户端请求,包含 RID
、MType
、Body
describe 记录请求信息,可选 callback
或future
map<rid, describe> 记录所有待匹配的请求 onResponse() 解析 RID
,匹配对应请求,执行回调或解除阻塞Dispatcher 解析 MType
,分发不同类型的响应
5.RpcCaller 提供RPC调用接口
RpcCaller
是 客户端 RPC 远程调用的封装模块,它的主要作用是:
- 提供统一的 RPC 调用接口
- 支持不同的调用模式(同步、异步、回调)
- 封装底层
Requestor
,发送 RPC 请求并获取响应- 提高易用性,使用户无需关心底层网络通信
调用方式 接口 特点 同步调用 bool call(method, params, response)
阻塞等待,直到获取结果 异步调用 bool call(method, params, std::future<response>)
立即返回,稍后通过 future.get()
获取结果回调调用 bool call(method, params, callback)
立即返回,结果返回后自动触发 callback
1️⃣ 用户调用 `call(method, params, ...)`
2️⃣ `RpcCaller` 通过 `Requestor` 发送请求
3️⃣ `Server` 处理请求,返回 `response`
4️⃣ `Requestor` 解析响应,触发 `future.set_value()` 或 `callback`
5️⃣ `RpcCaller` 获取结果,并返回给用户
6.Publish-Subscribe 发布订阅
Publish-Subscribe模块存在意义:向⽤⼾提供发布订阅所需的接⼝,针对推送过来的消息进⾏处理。
发布订阅稍微能复杂⼀丢丢,因为在发布订阅中有两种⻆⾊,⼀个客⼾端可能是消息的发布者,也可能是消息的订阅者。
⽽且不管是哪个⻆⾊都是对主题进⾏操作,因此其中也包含了主题的相关操作,⽐如,要发布⼀条消息需要先创建主题。
且⼀个订阅者可能会订阅多个主题,每个主题的消息可能都会有不同的处理⽅式,因此需要有订阅者主题回调的管理。
Publish-Subscribe
(发布订阅)模块用于 消息推送,提供:
- 客户端的发布/订阅功能
- 主题管理(创建、删除、订阅、取消订阅)
- 消息路由,向订阅者分发消息
- 回调管理,保证不同主题有独立的回调处理逻辑
📌 主要特点
- 客户端既可以是 消息发布者,也可以是 订阅者
- 订阅者可能 同时订阅多个主题
- 每个主题的 消息处理方式可能不同
- 需要 确保订阅者能收到正确的主题消息
+------------------------------------------------------+ | Publish-Subscribe 模块 | +------------------------------------------------------+ | PubSubManager (管理发布 & 订阅) | | - createTopic(key) // 创建主题 | | - removeTopic(key) // 删除主题 | | - subscribe(key, callback) // 订阅主题 | | - cancel(key) // 取消订阅 | | - publish(key, content) // 发布消息 | |------------------------------------------------------| | Dispatcher (onPublish 处理 & 分发) | |------------------------------------------------------| | Requestor (发送订阅 & 发布请求到服务器) | +------------------------------------------------------+
组件 作用 PubSubManager 维护 主题
和订阅者
关系Dispatcher 处理 onPublish()
事件,匹配map<mtype, handler>
map<key, callback> 记录 主题 → 订阅者回调
关系Requestor 负责向 Server
发送发布/订阅
请求onPublish() 解析 发布请求
,调用订阅者回调
7.Registry-Discovery服务注册与发现
服务注册和发现模块需要实现的功能会稍微复杂⼀些,因为分为两个⻆⾊来完成其功能
1. 注册者:作为Rpc服务的提供者,需要向注册中⼼注册服务,因此需要实现向服务器注册服务的功能
2. 发现者:作为Rpc服务的调⽤者,需要先进⾏服务发现,也就是向服务器发送请求获取能够提供指定服务的主机地址,获取地址后需要管理起来留⽤,且作为发现者,需要关注注册中⼼发送过来的
服务上线/下线消息,以及时对已经下线的服务和主机进⾏管理。服务注册与发现模块主要包括:
- Registrar(注册者)
- 负责 Rpc 服务提供者 (
provider
) 向 注册中心 (RegistryServer
) 注册自己提供的服务。- 通过
Requestor
发送registryService()
请求。- Discoverer(发现者)
- 负责 Rpc 调用方 (
caller
) 发现可用的服务提供者 (provider
)。- 需要定期向 注册中心 发送
serviceDiscovery()
请求获取服务信息。- 监听
onServiceNotify()
处理 服务上线/下线通知,保持可用provider
列表更新。
✅ Registrar 负责 provider 注册服务
✅ Discoverer 负责 caller 发现服务
✅ onServiceNotify() 负责处理 provider 上下线通知
✅ Requestor 负责网络通信,确保请求发送到 RegistryServer
8.Client
以上模块进⾏整合就可以实现各个功能的客⼾端了。
客户端是整个 JSON-RPC 远程调用体系 的调用者,包含四大核心模块:
- RegistryClient —— 负责服务注册
- DiscoveryClient —— 负责服务发现
- RpcClient —— 负责调用 RPC 远程方法
- TopicClient —— 负责发布/订阅功能
框架设计
在当前项⽬的实现中,我们将整个项⽬的实现划分为三层来进⾏实现
1. 抽象层:将底层的⽹络通信以及应⽤层通信协议以及请求响应进⾏抽象,使项⽬更具扩展性和灵活性。
2. 具象层:针对抽象的功能进⾏具体的实现。
3. 业务层:基于抽象的框架在上层实现项⽬所需功能。
1.抽象层
在 Json-RPC 框架 中,我们采用了 Muduo 进行网络通信,并使用 LV 格式协议 解决粘包问题,同时 JSON 作为序列化方式。但:
- 未来可能更换网络库(如
asio
、libevent
)- 可能更换通信协议(如
protobuf
取代JSON
)- 可能调整底层实现(如
TCP
切换到UDP
)因此,设计一个抽象层,使上层业务代码不依赖具体实现,提升框架的可扩展性。
结构分析
✅
BaseBuffer
(数据缓冲)📌 作用
- 处理 字节流数据的解析
- 读取
int32_t
类型的消息头信息(如Length
)📌 核心函数
🔹 readableBytes() // 可读字节数 🔹 peekInt32() // 预读取 int32_t,不改变位置 🔹 readInt32() // 读取 int32_t 🔹 retrieveInt32() // 取出 int32_t 🔹 retrieveAsString(size_t) // 取出字符串
✅ 提供一个可复用的
Buffer
处理机制
✅
BaseMessage
(通用消息抽象)📌 作用
- 统一封装 请求 / 响应消息
- 进行 序列化 / 反序列化
- 设置消息 ID / 类型
📌 核心函数
🔹 mtype() // 获取消息类型 🔹 setMType(MsgType) // 设置消息类型 🔹 id() // 获取消息 ID 🔹 setId(const std::string &id) // 设置消息 ID 🔹 serialize(std::string &body) // 序列化 🔹 unserialize(const std::string &body) // 反序列化 🔹 check() // 检查消息完整性
✅ 无论是
JSON
还是protobuf
,都可以通过BaseMessage
进行封装
✅
BaseProtocol
(协议层)📌 作用
- 解析接收到的
BaseBuffer
- 检查数据是否完整
- 负责
onMessage()
回调📌 核心函数
🔹 onMessage(BaseBufferPtr &, BaseMessagePtr &) // 解析数据 🔹 serialize(const BaseMessagePtr &) // 进行序列化 🔹 canProcessed(const BaseBufferPtr &) // 检查数据是否完整
✅ 统一不同的
通信协议
解析流程
✅
BaseConnection
(通用连接封装)📌 作用
- 处理 发送 / 关闭 / 连接状态
- 用于
Server
和Client
复用📌 核心函数
🔹 send(const BaseMessagePtr &) // 发送数据 🔹 shutdown() // 关闭连接 🔹 connected() // 是否连接
✅ 无论是
TCP
还是UDP
,都可以使用BaseConnection
✅
BaseServer
(通用服务器)📌 作用
- 提供 连接 / 关闭 / 消息回调
- 底层可以使用
Muduo
或asio
📌 核心函数
🔹 setConnectionCallback(const ConnectionCallback &) // 连接回调 🔹 setCloseCallback(const CloseCallback &) // 关闭回调 🔹 setMessageCallback(const MessageCallback &) // 消息处理回调 🔹 start() // 启动服务器
✅ 让
Server
具备统一接口,支持不同网络库
✅
BaseClient
(通用客户端)📌 作用
- 负责 客户端网络通信
- 统一 不同
网络库
的Client
行为📌 核心函数
🔹 setConnectionCallback(const ConnectionCallback &) // 连接回调 🔹 setCloseCallback(const CloseCallback &) // 关闭回调 🔹 setMessageCallback(const MessageCallback &) // 消息回调 🔹 connect() // 连接 🔹 send(const BaseMessagePtr &) // 发送消息 🔹 shutdown() // 关闭 🔹 connected() // 是否连接 🔹 BaseConnectionPtr connection() // 获取连接
✅ 使
Client
具有统一封装,支持多种通信方式
结论
✅
BaseBuffer
统一数据流
处理
✅BaseMessage
统一序列化
行为
✅BaseProtocol
统一协议解析
✅BaseConnection
统一连接管理
✅BaseServer/BaseClient
统一网络通信
2.具象层
具象层就是针对抽象的具体实现。
⽽具体的实现也⽐较简单,从抽象类派⽣出具体功能的派⽣类,然后在内部实现各个接⼝功能即可。
• 基于Muduo库实现⽹络通信部分抽象
• 基于LV通信协议实现Protocol部分抽象
不过这⼀层中⽐较特殊的是,我们需要针对不同的请求,从BaseMessage中派⽣出不同的请求和响应类型,以便于在针对指定消息处理时,能够更加轻松的获取或设置请求及响应中的各项数据元素。
3.业务层
业务层就是基于底层的通信框架,针对项⽬中具体的业务功能的实现了,⽐如Rpc请求的处理,发布订阅请求的处理以及服务注册与发现的处理等等。
rpc
整个架构分为三个部分:
- Server 端 (紫色区域):处理
RPC
逻辑,解析请求并执行业务。- Network 层 (黄色区域):负责数据收发,与
Dispatcher
模块交互。- Client 端 (绿色区域):提供
RPC
访问接口,向外部应用暴露调用能力。
RPC 请求流程
Client
端调用RpcCaller.call("Add")
RpcCaller
通过Requestor
发送请求Network
层将请求发送到Server
Server
端RpcRouter
解析请求Dispatcher
分发至onRpcRequest
处理- 业务逻辑执行完成,
Server
端返回结果Network
层转发至Client
onRpcResponse
处理响应,并返回给RpcCaller
发布订阅
该架构图展示了 发布-订阅(Pub-Sub)模式 的完整交互流程,主要涉及:
- Server(紫色区域) - 处理 订阅、取消订阅、消息发布 逻辑
- Network(黄色区域) - 负责 通信数据的转发
- Client(绿色区域) - 负责 向服务器发送订阅/发布请求,并处理收到的推送消息
整个系统由以下 五大核心组件 组成:
✅ 1.
PSManager
(发布订阅管理器)
- Server 端 和 Client 端 各自拥有一个
PSManager
- 在
Server
端,PSManager
负责管理 所有的主题和订阅者- 在
Client
端,PSManager
负责 维护本地订阅状态✅ 2.
Dispatcher
(请求分发器)
Dispatcher
负责将不同的订阅/发布请求分发到对应的 请求处理器- 例如:
topicSubscribe
请求 -> 订阅一个主题topicCancel
请求 -> 取消订阅onPublish
请求 -> 触发消息发布✅ 3.
Requestor
(客户端请求管理)
Requestor
管理客户端发出的所有订阅和发布请求- 维护 请求 ID,确保正确匹配 响应数据
- 例如:
- 发送
topicSubscribe
请求时,Requestor
记录该请求- 当服务器返回订阅确认时,
Requestor
处理该响应✅ 4.
Network
(网络通信模块)
- 负责
Client
和Server
之间的 数据传输- 作用:
- 发送
topicSubscribe
、topicPublish
请求- 转发 服务器推送的消息给
Client
✅ 5.
Publish
(消息发布处理器)
- 负责 将消息推送给订阅该主题的客户端
- 例如:
PSManager
发现Topic_A
有新消息Publish
找到所有订阅Topic_A
的客户端- 通过
Network
发送推送消息
数据流分析
✅ 1. 订阅主题
📝 场景:
Client
订阅Topic_A
- Client 端
topicSubscribe("Topic_A")
被调用Requestor
发送 订阅请求Network
将请求 转发至Server
- Server 端
Dispatcher
接收topicSubscribe
请求PSManager
记录该 客户端对Topic_A
的订阅- 返回订阅成功 响应给
Client
- Client 端
onTopicResponse
处理订阅响应- 订阅成功 ✅
✅ 2. 发布消息
📝 场景:
Client
向Topic_A
发布一条消息
Client 端
onPublish("Topic_A", "Hello!")
被调用Requestor
发送 消息发布请求Network
将请求发送给Server
Server 端
Dispatcher
接收onPublish
请求PSManager
检查 哪些客户端订阅了Topic_A
- 通过
Publish
模块,将消息发送给订阅者Client 端
- 订阅
Topic_A
的客户端 收到推送消息onTopicResponse
处理消息推送 ✅
✅ 3. 取消订阅
📝 场景:
Client
取消订阅Topic_A
Client 端
topicCancel("Topic_A")
被调用Requestor
发送 取消订阅请求Network
将请求发送给Server
Server 端
Dispatcher
接收topicCancel
请求PSManager
删除该客户端的 订阅记录- 返回取消成功 响应给
Client
Client 端
onTopicResponse
处理取消订阅响应 ✅
✅
Server
端 (PSManager
) 负责:管理 主题 & 订阅者
✅Client
端 (PSManager
) 负责:维护 本地订阅状态
✅Dispatcher
负责:分发请求到正确的 订阅/发布处理器
✅Requestor
负责:管理请求,并匹配服务器的 响应
✅Publish
负责:将消息推送给 所有订阅该主题的客户端
服务注册&发现
该架构图展示了 服务注册 & 发现(Registry-Discovery, RD) 的完整交互流程,主要涉及:
- Server(紫色区域) - 负责 服务注册、发现 & 维护
- Network(黄色区域) - 负责 数据转发
- Client(绿色区域) - 负责 服务注册、发现 & 监听变更
整个系统由以下 六大核心组件 组成:
✅ 1.
RDManager
(注册发现管理器)
- Server 端 和 Client 端 各自拥有一个
RDManager
- 在
Server
端,RDManager
负责管理 所有服务信息- 在
Client
端,RDManager
负责 维护本地服务状态✅ 2.
Provider
(服务提供者)
- 代表 提供 RPC 服务的节点
- 通过
serviceRegistry()
向RDManager
注册自己- 断开连接后,会 触发
offlineNotify
通知Client
✅ 3.
Discoverer
(服务发现者)
- 代表 需要调用远程服务的客户端
- 通过
serviceDiscovery()
询问RDManager
可用的服务- 监听
onlineNotify
和offlineNotify
,获取 服务上下线事件✅ 4.
Dispatcher
(请求分发器)
Dispatcher
负责将不同的 服务发现 & 变更通知请求 分发到对应的处理器- 例如:
onRegistry
处理 服务注册onDiscovery
处理 服务发现onlineNotify
&offlineNotify
处理 服务上下线✅ 5.
Requestor
(请求管理)
- 管理客户端发出的所有
RD
相关请求- 维护 请求 ID,确保正确匹配 响应数据
- 例如:
- 发送
serviceDiscovery()
请求时,Requestor
记录该请求- 当服务器返回服务信息时,
Requestor
处理该响应✅ 6.
Network
(网络通信)
- 负责
Client
和Server
之间的RD
数据传输- 作用:
- 发送
serviceRegistry()
、serviceDiscovery()
请求- 转发 服务器推送的 服务上下线通知
数据流分析
✅ 1. 服务注册
📝 场景:
Provider
注册服务
Client 端
Provider
调用serviceRegistry("Add", host:1234)
Requestor
发送 注册请求Network
将请求转发至Server
Server 端
Dispatcher
解析serviceRegistry
RDManager
记录Add
方法的可用host
- 返回注册成功 响应给
Client
Client 端
onRDResponse
处理注册成功响应 ✅
✅ 2. 服务发现
📝 场景:
Discoverer
查询可用Add
服务
Client 端
serviceDiscovery("Add")
被调用Requestor
发送 发现请求Network
将请求转发至Server
Server 端
Dispatcher
解析serviceDiscovery
RDManager
查询 当前可用的Add
提供者- 返回
Add
提供者列表 给Client
Client 端
onRDResponse
处理 发现响应Discoverer
记录可用 服务提供者列表- ✅ 成功找到
Add
服务可用节点
✅ 3. 服务上线通知
📝 场景:
Provider
新增可用Add
服务
- Server 端
Provider
向RDManager
注册新的Add
服务RDManager
发现 有Discoverer
查询过Add
- 发送
onlineNotify
给所有Discoverer
- Client 端
onRDNotify
处理 新服务上线通知Discoverer
更新可用Add
提供者列表 ✅
✅ 4. 服务下线通知
📝 场景:
Provider
断开连接,Add
服务下线
- Server 端
Provider
断开,RDManager
检测到Add
服务不可用RDManager
向所有Discoverer
发送offlineNotify
- Client 端
onRDNotify
处理 服务下线通知Discoverer
更新可用Add
提供者列表 ✅