与服务端数据流模式相反,这次是客户端不断地向服务端发送数据,而在发送结束后,由服务端返回一个响应,典型的例子是物联网终端向服务器发送数据。比如大棚里面的温度传感器,显然要把里面的温度实时上报给服务器。
双向数据流:
顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是实现实时交互,典型的例子是聊天机器人。
然后我们来编写 proto 文件,实现一下上面的几种流模式。
syntax = "proto3"; option go_package = ".;yoyoyo"; message StreamRequestData { string data = 1; } message StreamResponseData { string data = 1; } service StreamTest { // 服务端流模式,在返回值前面加上一个 stream rpc GetStream(StreamRequestData) returns (stream StreamResponseData){} // 客户端流模式,在参数前面加上一个 stream rpc PutStream(stream StreamRequestData) returns (StreamResponseData){} // 双向流模式 rpc AllStream(stream StreamRequestData) returns (stream StreamResponseData){} } // 所以我们看到一个服务里面的方法可以有很多个,并且这里面的参数和返回值都是 StreamRequestData 和 StreamResponseData // 但是不同的方法,我们也可以指定不同的参数下面我们就来生成对应的 Go 源文件,然后编写对应的服务端,我们以 Go 来演示,Python 也是类似的。
package main import ( "fmt" "google.golang.org/grpc" "matsuri/yoyoyo" "net" ) // 还是定义一个结构体,然后为结构体绑定方法 type Server struct { } // 但是这是流模式,绑定的方法里面的参数和返回值还和之间一样吗?我们看一下自动生成的文件吧 /* type StreamTestServer interface { // 服务端流模式,在返回值前面加上一个 stream GetStream(*StreamRequestData, StreamTest_GetStreamServer) error // 客户端流模式,在参数前面加上一个 stream PutStream(StreamTest_PutStreamServer) error // 双向流模式 AllStream(StreamTest_AllStreamServer) error } */ // 我们后面在使用 RegisterStreamTestServer 进行注册的时候,第二个参数接收的实际上是一个接口 StreamTestServer // 所以如果你想注册成服务的话,那么就必须实现上面三个方法。而且我们看到,在自动生成代码的是帮我们把注释也加上去了 func (s *Server) GetStream(request *yoyoyo.StreamRequestData, res yoyoyo.StreamTest_GetStreamServer) error { return nil } func (s *Server) PutStream(res yoyoyo.StreamTest_PutStreamServer) error { return nil } func (s *Server) AllStream(res yoyoyo.StreamTest_AllStreamServer) error { return nil } func main() { // 直接进行 grpc 服务端创建、注册等逻辑没有变化 server := grpc.NewServer() yoyoyo.RegisterStreamTestServer(server, &Server{}) listener, err := net.Listen("tcp", ":33333") if err != nil { fmt.Println(err) return } if err = server.Serve(listener); err != nil { fmt.Println(err) return } }整体逻辑是没有问题的,但是现在还不能执行,因为方法里面只返回了一个 nil。这里我们看到流模式对应函数的参数的和返回值,与之前的简单模式是不一样的。因为这是肯定的,流模式要求源源不断地返回,所以肯定不能通过 return 语句实现,因此流模式的返回值只有一个 error。了解了这些之后,我们再来编写里面的方法。
func (s *Server) GetStream(request *yoyoyo.StreamRequestData, res yoyoyo.StreamTest_GetStreamServer) error { // 那么服务端流模式要如何返回数据呢?答案是通过 res.Send 方法即可 data := request.Data i := 1 for i < 6{ // 但 Send 的里面的内容还是 StreamResponseData,因为要被序列化嘛 _ = res.Send(&yoyoyo.StreamResponseData{Data: fmt.Sprintf("%s%d", data, i)}) i ++ time.Sleep(time.Second) } return nil } func (s *Server) PutStream(res yoyoyo.StreamTest_PutStreamServer) error { return nil } func (s *Server) AllStream(res yoyoyo.StreamTest_AllStreamServer) error { return nil }这里我们先编写 GetStream,然后编写客户端去访问。
package main import ( "context" "fmt" "google.golang.org/grpc" "matsuri/yoyoyo" ) func main() { conn, err := grpc.Dial("127.0.0.1:33333", grpc.WithInsecure()) if err != nil { fmt.Println(err) return } defer func() { _ = conn.Close() }() client := yoyoyo.NewStreamTestClient(conn) // 注意:客户端调用依旧是之前的模式,因为它是基于 proto 文件来的 response, _ := client.GetStream( context.Background(), &yoyoyo.StreamRequestData{Data: "神乐"}, ) // 然后我们可以进行测试了 for { // 当服务端返回之后,那么 err 会得到一个 EOF data, err := response.Recv() if err != nil { fmt.Println(err) break } fmt.Println(data) } /* data:"神乐1" data:"神乐2" data:"神乐3" data:"神乐4" data:"神乐5" EOF */ }打印的是一个结构体,我们可以调用里面的 Data 成员,当然这不是重点,重点是数据是实时返回的。