下面进行代码分析:
func (t *Transport) queueForDial(w *wantConn) { w.beforeDial() // 小于零说明无限制,异步建立连接 if t.MaxConnsPerHost <= 0 { go t.dialConnFor(w) return } t.connsPerHostMu.Lock() defer t.connsPerHostMu.Unlock() // 每个 host 建立的连接数没达到上限,异步建立连接 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { if t.connsPerHost == nil { t.connsPerHost = make(map[connectMethodKey]int) } t.connsPerHost[w.key] = n + 1 go t.dialConnFor(w) return } //每个 host 建立的连接数已达到上限,需要进入等待队列 if t.connsPerHostWait == nil { t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue) } q := t.connsPerHostWait[w.key] q.cleanFront() q.pushBack(w) t.connsPerHostWait[w.key] = q}这里主要进行参数校验,如果最大连接数限制为零,亦或是每个 host 建立的连接数没达到上限,那么直接异步建立连接。
dialConnFor
func (t *Transport) dialConnFor(w *wantConn) { defer w.afterDial() // 建立连接 pc, err := t.dialConn(w.ctx, w.cm) // 连接绑定 wantConn delivered := w.tryDeliver(pc, err) // 建立连接成功,但是绑定 wantConn 失败 // 那么将该连接放置到空闲连接字典或调用 等待获取空闲 connection 字典 中的元素执行 if err == nil && (!delivered || pc.alt != nil) { t.putOrCloseIdleConn(pc) } if err != nil { t.decConnsPerHost(w.key) }}dialConnFor 会调用 dialConn 进行 TCP 连接创建,创建完毕之后调用 tryDeliver 方法和 wantConn 进行绑定。
dialConn
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { // 创建连接结构体 pconn = &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), writech: make(chan writeRequest, 1), closech: make(chan struct{}), writeErrCh: make(chan error, 1), writeLoopDone: make(chan struct{}), } ... if cm.scheme() == "https" && t.hasCustomTLSDialer() { ... } else { // 建立 tcp 连接 conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } pconn.conn = conn } ... if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { alt := next(cm.targetAddr, pconn.conn.(*tls.Conn)) if e, ok := alt.(http2erringRoundTripper); ok { // pconn.conn was closed by next (http2configureTransport.upgradeFn). return nil, e.err } return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil } } pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) //为每个连接异步处理读写数据 go pconn.readLoop() go pconn.writeLoop() return pconn, nil}这里会根据 schema 的不同设置不同的连接配置,我上面显示的是我们常用的 HTTP 连接的创建过程。对于 HTTP 来说会建立 tcp 连接,然后为连接异步处理读写数据,最后将创建好的连接返回。
等待响应这一部分的内容会稍微复杂一些,但确实非常的有趣。
在创建连接的时候会初始化两个 channel :writech 负责写入请求数据,reqch负责读取响应数据。我们在上面创建连接的时候,也提到了会为连接创建两个异步循环 readLoop 和 writeLoop 来负责处理读写数据。
在获取到连接之后,会调用连接的 roundTrip 方法,它首先会将请求数据写入到 writech 管道中,writeLoop 接收到数据之后就会处理请求。
然后 roundTrip 会将 requestAndChan 结构体写入到 reqch 管道中,然后 roundTrip 会循环等待。readLoop 读取到响应数据之后就会通过 requestAndChan 结构体中保存的管道将数据封装成 responseAndError 结构体回写,这样 roundTrip 就可以接受到响应数据结束循环等待并返回。
roundTrip
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { ... writeErrCh := make(chan error, 1) // 将请求数据写入到 writech 管道中 pc.writech <- writeRequest{req, writeErrCh, continueCh} // 用于接收响应的管道 resc := make(chan responseAndError) // 将用于接收响应的管道封装成 requestAndChan 写入到 reqch 管道中 pc.reqch <- requestAndChan{ req: req.Request, cancelKey: req.cancelKey, ch: resc, ... } ... for { testHookWaitResLoop() select { // 接收到响应数据 case re := <-resc: if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil)) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err) } if re.err != nil { return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } // 返回响应数据 return re.res, nil ... }}这里会封装好 writeRequest 作为发送请求的数据,并将用于接收响应的管道封装成 requestAndChan 写入到 reqch 管道中,然后循环等待接受响应。