用一张时序图来描述下整个过程:
PS:RPC Proxy有时候也叫Stub(存根):(Client Stub,Server Stub)
为屏蔽客户调用远程主机上的对象,必须提供某种方式来模拟本地对象,这种本地对象称为存根(stub),存根负责接收本地方法调用,并将它们委派给各自的具体实现对象
PRC服务实现的过程中其实就两核心点:
消息协议:客户端调用的参数和服务端的返回值这些在网络上传输的数据以何种方式打包编码和拆包解码
经典代表:Protocol Buffers
传输控制:在网络中数据的收发传输控制具体如何实现(TCP/UDP/HTTP)
2.手写RPC下面我们就根据上面的流程来手写一个简单的RPC:
1.Client调用:
# client.py from client_stub import ClientStub def main(): stub = ClientStub(("192.168.36.144", 50051)) result = stub.get("sum", (1, 2)) print(f"1+2={result}") result = stub.get("sum", (1.1, 2)) print(f"1.1+2={result}") time_str = stub.get("get_time") print(time_str) if __name__ == "__main__": main()输出:
1+2=3 1.1+2.2=3.1 Wed Jan 16 222.Client Stub,客户端存根:(主要有打包、解包、和RPC服务器通信的方法)
# client_stub.py import socket class ClientStub(object): def __init__(self, address): """address ==> (ip,port)""" self.socket = socket.socket() self.socket.connect(address) def convert(self, obj): """根据类型转换成对应的类型编号""" if isinstance(obj, int): return 1 if isinstance(obj, float): return 2 if isinstance(obj, str): return 3 def pack(self, func, args): """打包:把方法和参数拼接成自定义的协议 格式:func:函数名@params:类型-参数,类型2-参数2... """ result = f"func:{func}" if args: params = "" # params:类型-参数,类型2-参数2... for item in args: params += f"{self.convert(item)}-{item}," # 去除最后一个, result += f"@params:{params[:-1]}" # print(result) # log 输出 return result.encode("utf-8") def unpack(self, data): """解包:获取返回结果""" msg = data.decode("utf-8") # 格式应该是"data:xxxx" params = msg.split(":") if len(params) > 1: return params[1] return None def get(self, func, args=None): """1.客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议""" data = self.pack(func, args) # 2.客户端的RPC Proxy组件在打包完成后通过网络把数据包发送给RPC Server self.socket.send(data) # 等待服务端返回结果 data = self.socket.recv(2048) if data: return self.unpack(data) return None简要说明下:(我根据流程在Code里面标注了,看起来应该很轻松)
之前有说到核心其实就是消息协议and传输控制,我客户端存根的消息协议是自定义的格式(后面会说简化方案):func:函数名@params:类型-参数,类型2-参数2...,传输我是基于TCP进行了简单的封装
3.Server端:(实现很简单)
# server.py import socket from server_stub import ServerStub class RPCServer(object): def __init__(self, address, mycode): self.mycode = mycode # 服务端存根(RPC Proxy) self.server_stub = ServerStub(mycode) # TCP Socket self.socket = socket.socket() # 端口复用 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 绑定端口 self.socket.bind(address) def run(self): self.socket.listen() while True: # 等待客户端连接 client_socket, client_addr = self.socket.accept() print(f"来自{client_addr}的请求:\n") # 交给服务端存根(Server Proxy)处理 self.server_stub.handle(client_socket, client_addr) if __name__ == "__main__": from server_code import MyCode server = RPCServer(('', 50051), MyCode()) print("Server启动ing,Port:50051") server.run()