然后是 PutStream,客户端不断向服务端发送数据,这两个过程是相同、但又相反的。
func (s *Server) PutStream(res yoyoyo.StreamTest_PutStreamServer) error { for { data, err := res.Recv() if err != nil { break } fmt.Println(data) } return nil }服务端的代码如上,和之前的客户端没有什么区别。然后客户端调用,和之前的服务端也是类似,所以它们是相同、但又相反的。
package main import ( "context" "fmt" "google.golang.org/grpc" "matsuri/yoyoyo" "time" ) func main() { conn, err := grpc.Dial("127.0.0.1:33333", grpc.WithInsecure()) if err != nil { fmt.Println(err) return } defer func() { _ = conn.Close() }() client := yoyoyo.NewStreamTestClient(conn) // 只需要接收一个 context response, _ := client.PutStream( context.Background(), ) for i := 0; i < 5; i++ { _ = response.Send(&yoyoyo.StreamRequestData{Data: fmt.Sprintf("%s%d", "椎名真白", i)}) time.Sleep(time.Second) } }服务端会打印输出,至于打印内容就不贴了,最后再来看一下双向流模式。
双向流模式的话,既可以 Send 又可以 Recv,我们直接开启两个协程去处理即可,服务端和客户端都是如此。先来看看服务端:
func (s *Server) AllStream(res yoyoyo.StreamTest_AllStreamServer) error { var wg = new(sync.WaitGroup) wg.Add(2) go func() { for ;; { if data, err := res.Recv(); err != nil { fmt.Println(err) break } else { fmt.Println("收到客户端消息:", data.Data) } } wg.Done() }() go func() { i := 0 for ;; i++ { if err := res.Send(&yoyoyo.StreamResponseData{Data: fmt.Sprintf("%s%d", "我是服务端", i)}); err != nil { fmt.Println(err) break } time.Sleep(time.Second) } wg.Done() }() wg.Wait() return nil }可以看到,逻辑的话就相当于将服务端流模式和客户端流模式组合起来了,客户端也是一样,逻辑高度一致。
package main import ( "context" "fmt" "google.golang.org/grpc" "matsuri/yoyoyo" "sync" "time" ) func main() { conn, err := grpc.Dial("127.0.0.1:33333", grpc.WithInsecure()) if err != nil { fmt.Println(err) return } defer func() { _ = conn.Close() }() client := yoyoyo.NewStreamTestClient(conn) // 只需要接收一个 context res, _ := client.AllStream( context.Background(), ) // 逻辑和客户端类似 var wg = new(sync.WaitGroup) wg.Add(2) go func() { for { if data, err := res.Recv(); err != nil { fmt.Println(err) break } else { fmt.Println("收到服务端消息:", data.Data) } } wg.Done() }() go func() { i := 0 for ;;i++{ // 这里需要注意,服务端 Send 的时候,里面的内容是 &StreamResponseData // 但客户端 Send 的内容是 &StreamRequestData if err := res.Send(&yoyoyo.StreamRequestData{Data: fmt.Sprintf("%s%d", "我是客户端", i)}); err != nil { fmt.Println(err) break } time.Sleep(time.Second) } wg.Done() }() wg.Wait() }执行服务端和客户端代码的话,会发现不断打印输出,当然我们这里没有做退出时候的处理,可以自己完善一下。
gRPC 进阶protobuf 和 gRPC 还有很多内容,我们没有说,它们还不止这么简单,下面来了解更多的内容。
常用的 protobuf 数据类型一个标量消息字段可以含有一个类型,然后在生成文件的时候会自动转换成与该语言对应的类型。那么 proto 文件中都可以定义哪些类型呢?它们和 Python、Go 之间的对应关系又是什么呢?
定义string类型参数: string name = 1;
定义bytes类型参数: bytes stream = 1;
定义bool类型参数: bool flag = 1;
定义int32类型参数: int32 age = 1;
定义int64类型参数: int64 count = 1;
定义float类型参数: float amount = 1;
定义repeated类型参数: repeated string hobby = 1;
定义map类型参数: map<string, string> data = 1;
另外,即使我们没有网 request 里面传递参数,那么服务端在通过 request 获取属性的时候也不会报错,会得到一个对应的类型的零值。这些类型后面会慢慢说。
Python 操作符合类型的一个坑像 repeated、map 我们称之为符合类型,但是 Python 在操作它们的时候有一个坑,必须要注意一下。假设 request 里面有一个参数 id_lst,是一个 repeated int32 类型。
# 按照之前的赋值方式是完全可以的,任何类型的参数都是如此 req1 = pb2.request(id_lst=[1, 2, 3]) # 但如果这么做是不行的 req2 = pb2.request() req2.id_lst = [1, 2, 3] # 这种方式只适用于标量,对于 repeated、map,甚至里面还可以是一个 request,对于它们不能采用这种方式 # 但是 id_lst 已经被解析为列表了,只不过它是一个空列表 req2.id_lst.extend([1, 2, 3]) # 这么做是可以的