漫谈grpc 3:从实践到原理,带你参透 gRPC (6)

Serve 会根据外部传入的 Listener 不同而调用不同的监听模式,这也是 net.Listener 的魅力,灵活性和扩展性会比较高。而在 gRPC Server 中最常用的就是 TCPConn,基于 TCP Listener 去做。接下来我们一起看看具体的处理逻辑,如下:

图片

漫谈grpc 3:从实践到原理,带你参透 gRPC

循环处理连接,通过 lis.Accept 取出连接,如果队列中没有需处理的连接时,会形成阻塞等待。

若 lis.Accept 失败,则触发休眠机制,若为第一次失败那么休眠 5ms,否则翻倍,再次失败则不断翻倍直至上限休眠时间 1s,而休眠完毕后就会尝试去取下一个 “它”。

若 lis.Accept 成功,则重置休眠的时间计数和启动一个新的 goroutine 调用 handleRawConn 方法去执行/处理新的请求,也就是大家很喜欢说的 “每一个请求都是不同的 goroutine 在处理”。

在循环过程中,包含了 “退出” 服务的场景,主要是硬关闭和优雅重启服务两种情况。

客户端

图片

漫谈grpc 3:从实践到原理,带你参透 gRPC

一、创建拨号连接 // grpc.Dial(":"+PORT, grpc.WithInsecure()) func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {  cc := &ClientConn{   target:            target,   csMgr:             &connectivityStateManager{},   conns:             make(map[*addrConn]struct{}),   dopts:             defaultDialOptions(),   blockingpicker:    newPickerWrapper(),   czData:            new(channelzData),   firstResolveEvent: grpcsync.NewEvent(),  }  ...  chainUnaryClientInterceptors(cc)  chainStreamClientInterceptors(cc)  ... }

漫谈grpc 3:从实践到原理,带你参透 gRPC

grpc.Dial 方法实际上是对于 grpc.DialContext 的封装,区别在于 ctx 是直接传入 context.Background。其主要功能是创建与给定目标的客户端连接,其承担了以下职责:

初始化 ClientConn

初始化(基于进程 LB)负载均衡配置

初始化 channelz

初始化重试规则和客户端一元/流式拦截器

初始化协议栈上的基础信息

相关 context 的超时控制

初始化并解析地址信息

创建与服务端之间的连接

连没连

之前听到有的人说调用 grpc.Dial 后客户端就已经与服务端建立起了连接,但这对不对呢?我们先鸟瞰全貌,看看正在跑的 goroutine。如下:

图片

漫谈grpc 3:从实践到原理,带你参透 gRPC

我们可以有几个核心方法一直在等待/处理信号,通过分析底层源码可得知。涉及如下:

func (ac *addrConn) connect() func (ac *addrConn) resetTransport() func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) func (ac *addrConn) getReadyTransport()

漫谈grpc 3:从实践到原理,带你参透 gRPC

在这里主要分析 goroutine 提示的 resetTransport 方法,看看都做了啥。核心代码如下:

func (ac *addrConn) resetTransport() {  for i := 0; ; i++ {   if ac.state == connectivity.Shutdown {    return   }   ...   connectDeadline := time.Now().Add(dialDuration)   ac.updateConnectivityState(connectivity.Connecting)   newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)   if err != nil {    if ac.state == connectivity.Shutdown {     return    }    ac.updateConnectivityState(connectivity.TransientFailure)    timer := time.NewTimer(backoffFor)    select {    case <-timer.C:     ...    }    continue   }   if ac.state == connectivity.Shutdown {    newTr.Close()    return   }   ...   if !healthcheckManagingState {    ac.updateConnectivityState(connectivity.Ready)   }   ...   if ac.state == connectivity.Shutdown {    return   }   ac.updateConnectivityState(connectivity.TransientFailure)  } }

漫谈grpc 3:从实践到原理,带你参透 gRPC

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgffdj.html