protobuf 介绍,Python 和 Go 编写 rpc 服务(gRPC) (19)

然后我们来编写服务端,获取 metadata 信息。

package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "matsuri/yoyoyo" "net" ) type Server struct { } func (s *Server) SayHello (ctx context.Context, request *yoyoyo.HelloRequest) (*yoyoyo.HelloResponse, error) { res := new(yoyoyo.HelloResponse) // 获取客户端传递的 context 对象会交给参数 ctx,我们从中解析出 md md, _ := metadata.FromIncomingContext(ctx) // 注意:返回值是一个 []string content_type, cookie := md["content-type"], md["cookie"] res.Info = map[string]string{"content-type": content_type[0] + " 来自服务端", "cookie": cookie[0] + " 来自服务端"} return res, nil } func main() { server := grpc.NewServer() yoyoyo.RegisterHelloServer(server, &Server{}) listener, err := net.Listen("tcp", ":33333") if err != nil { fmt.Println(err) return } if err = server.Serve(listener); err != nil { fmt.Println(err) return } }

以上我们便实现了 metadata 机制,注意:除了我们自己设置的之外,metadata 里面还有一些自带的信息,可以自己尝试打印一下看看都有啥。

Python 操作 metadata

Go 操作 metadata 我们见识过了,那么看看 Python 如何操作,这里我们先来编写服务端,还是用之前的 hello.proto 文件。

import grpc # 因为我们把 proto 文件里面的 timestamp 给删掉了,只保留一个 info # 所以不需要导入内置的 proto 文件了,因此这里也就不需要设置环境变量了 pb2, pb2_grpc = grpc.protos_and_services("hello.proto") class Hello(pb2_grpc.HelloServicer): def SayHello(self, request, context): # 调用 context 的 invocation_metadata 可以拿到 metadata md = context.invocation_metadata() # 通过 context.set_trailing_metadata() 可以设置 metadata context.set_trailing_metadata( tuple((k, v + " from server") for k, v in md) ) return pb2.HelloResponse(info={"name": "mea", "age": "38"}) if __name__ == \'__main__\': from concurrent.futures import ThreadPoolExecutor grpc_server = grpc.server(ThreadPoolExecutor(max_workers=4)) pb2_grpc.add_HelloServicer_to_server(Hello(), grpc_server) grpc_server.add_insecure_port("127.0.0.1:22222") grpc_server.start() grpc_server.wait_for_termination()

然后是客户端:

import grpc pb2, pb2_grpc = grpc.protos_and_services("hello.proto") channel = grpc.insecure_channel("127.0.0.1:22222") client = pb2_grpc.HelloStub(channel=channel) # 调用 SayHello.with_call 来传递,Python 显然就更简单了 response, call = client.SayHello.with_call( pb2.HelloRequest(), metadata=( # 注意:里面的第一个元素相当于 key,一定要小写,否则报错 # 另外不能有横杠,否则显示不出来 ("content_type", "Json"), ("cookie", "SessionId"), ) ) # 返回值 print(response.info) # {\'name\': \'mea\', \'age\': \'38\'} # 通过 call 获取 metadata for k, v in call.trailing_metadata(): print(k, "----------", v) """ content_type ---------- Json from server cookie ---------- SessionId from server user-agent ---------- grpc-python/1.32.0 grpc-c/12.0.0 (windows; chttp2) from server """

我们看到它返回了一个 user-agent,通过它的 value 我们能看出服务端是可以区分到底是哪种语言的客户端调用的。

grpc 拦截器 - Go

对于做 web 开发而言,拦截器是非常常见的,基本上所有的 web 框架都会提供一个拦截器的功能。拦截器的功能很好理解,就是请求到来在进入处理逻辑之前会先被拦截,然后做一些预处理,比如判断是否登录、通过 User-Agent 判断是否是爬虫等等。

rpc 服务也是如此,尽管是客户端来调用,但我们仍然需要拦截器,因为我们不希望在业务层面上做大量重复的通用逻辑。而 gRPC 内置了拦截器的设置,我们只需要实现拦截器的逻辑,然后把拦截器配置到服务端或者客户端即可。下面我们来看看如何在 Go 的 gRPC 中实现拦截器,还是基于之前的 hello.proto 文件,尽量保证逻辑简单,先来编写服务端:

package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "matsuri/yoyoyo" "net" ) type Server struct { } func (s *Server) SayHello (ctx context.Context, request *yoyoyo.HelloRequest) (*yoyoyo.HelloResponse, error) { res := new(yoyoyo.HelloResponse) res.Info = map[string]string{"name": "神乐七奈"} return res, nil } // 下面是拦截器的逻辑,我们编写一个函数,然后传递到 grpc.UnaryInterceptor 中即可返回一个拦截器(ServerOption),然后创建 Server 的时候传进去(可以是任意个) // 然后函数的类型如下: // type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error) func MyInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // ctx 就相当于 SayHello 里面的 ctx、req 就相当于 SayHello 的 request(使用的时候需要先做一下类型断言) // 注意:并不是这里调用完毕之后再去调用服务端的方法,服务端的方法能否调用取决于拦截器里面的逻辑 // 参数 handler 就表示对应的服务端的方法,我们必须执行 handler(ctx, req),等价于 SayHello(ctx, request) // 如果没有这一步的话,那么服务端的方法是不会被调用的,我们做一下判断吧 md, _ := metadata.FromIncomingContext(ctx) // 用户必须通过 metadata 传递一个名为 "matsuri" 的 key、然后值为 "fubuki",否则就是验证失败 if val, ok := md["matsuri"]; ok && val[0] == "fubuki" { // 执行服务端的方法 return handler(ctx, req) } else { // 否则的话直接返回,仍然要返回一个 *yoyoyo.HelloResponse res := new(yoyoyo.HelloResponse) res.Info = map[string]string{"error": "缺少名为 \"matsuri\" 的 key,或者该 key 对应的 value 不正确"} return res, nil // 或者你还可以将错误信息放在 error 中,grpc 下面有一个 status 用来返回错误、codes 用来设置状态码 // status.Error(codes.Unauthenticated, "缺少名为 \"matsuri\" 的 key,或者该 key 对应的 value 不正确") } } func main() { // 传递一个函数,返回一个 ServerOption opt := grpc.UnaryInterceptor(MyInterceptor) // 放入到 NewServer 中 server := grpc.NewServer(opt) yoyoyo.RegisterHelloServer(server, &Server{}) listener, err := net.Listen("tcp", ":33333") if err != nil { fmt.Println(err) return } if err = server.Serve(listener); err != nil { fmt.Println(err) return } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwfsgj.html