golang http/transport 代码分析 (2)

getConn首先从空闲连接池中获取连接,如果没有,则新建连接。在新建过程中,如果连接池中有空闲连接则也复用空闲连接。
继续看一下dialConn是如何建立连接的:

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) { // 注意这里初始化的各种channle pconn := &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), // 用于给readLoop发送request writech: make(chan writeRequest, 1), // 用于给writeLoop发送request closech: make(chan struct{}), // 当连接关闭是用于传递信息 writeErrCh: make(chan error, 1), // 由writeLoop返回给roundTrip错误信息 writeLoopDone: make(chan struct{}), // 当writeLoop结束的时候会关闭该channel } trace := httptrace.ContextClientTrace(ctx) wrapErr := func(err error) error { if cm.proxyURL != nil { // Return a typed error, per Issue 16997 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err} } return err } conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } pconn.conn = conn // 包装一个请求成另一个结构体,方便后续处理 if t.MaxConnsPerHost > 0 { pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey} } // 包装读写conn并开启读取和写入groutine pconn.br = bufio.NewReader(pconn) pconn.bw = bufio.NewWriter(persistConnWriter{pconn}) go pconn.readLoop() go pconn.writeLoop() return pconn, nil }

可以看到首先调用dial函数,获取一个conn对象,然后封装为pconn的, 启动readLoop和wirteLoop后将该pconn返回。
以readLoop为例,看看是如何从一个pc中读取response的:

func (pc *persistConn) readLoop() { // 默认是失败,如果失败则进行处理,移除该连接,使用defer语句表示在程序退出的时候执行,也就是说该groutine在正常情况下不会退出,是个死循环,通过channel与其它groutine通信,处理请求 closeErr := errReadLoopExiting // default value, if not changed below defer func() { pc.close(closeErr) pc.t.removeIdleConn(pc) }() // 尝试将该连接重新返回闲置连接池 tryPutIdleConn := func(trace *httptrace.ClientTrace) bool { if err := pc.t.tryPutIdleConn(pc); err != nil { closeErr = err if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled { trace.PutIdleConn(err) } return false } if trace != nil && trace.PutIdleConn != nil { trace.PutIdleConn(nil) } return true } // 用来保证先后次序,先归还连接再读取response.Body // eofc is used to block caller goroutines reading from Response.Body // at EOF until this goroutines has (potentially) added the connection // back to the idle pool. eofc := make(chan struct{}) defer close(eofc) // unblock reader on errors // Read this once, before loop starts. (to avoid races in tests) testHookMu.Lock() testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead testHookMu.Unlock() alive := true for alive { pc.readLimit = pc.maxHeaderResponseSize() _, err := pc.br.Peek(1) pc.mu.Lock() if pc.numExpectedResponses == 0 { pc.readLoopPeekFailLocked(err) pc.mu.Unlock() return } pc.mu.Unlock() // 获取一个新连接来处理 rc := <-pc.reqch trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response if err == nil { // 读取返回结果 resp, err = pc.readResponse(rc, trace) } else { err = transportReadFromServerError{err} closeErr = err } if err != nil { if pc.readLimit <= 0 { err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize()) } select { case rc.ch <- responseAndError{err: err}: case <-rc.callerGone: return } return } pc.readLimit = maxInt64 // effictively no limit for response bodies pc.mu.Lock() pc.numExpectedResponses-- pc.mu.Unlock() bodyWritable := resp.bodyIsWritable() hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false } if !hasBody || bodyWritable { pc.t.setReqCanceler(rc.req, nil) // Put the idle conn back into the pool before we send the response // so if they process it quickly and make another request, they'll // get this same conn. But we use the unbuffered channel 'rc' // to guarantee that persistConn.roundTrip got out of its select // potentially waiting for this persistConn to close. // but after alive = alive && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) if bodyWritable { closeErr = errCallerOwnsConn } select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } // Now that they've read from the unbuffered channel, they're safely // out of the select that also waits on this goroutine to die, so // we're allowed to exit now if needed (if alive is false) testHookReadLoopBeforeNextRead() continue } // bodyEOFSignal实现了io.ReadCloser interface, 保证读取的时候,该response已经收到了eof waitForBodyRead := make(chan bool, 2) body := &bodyEOFSignal{ body: resp.Body, earlyCloseFn: func() error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function return nil }, fn: func(err error) error { isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, } resp.Body = body if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") { resp.Body = &gzipReader{body: body} resp.Header.Del("Content-Encoding") resp.Header.Del("Content-Length") resp.ContentLength = -1 resp.Uncompressed = true } select { // 将分装好的repsponse发送回去 case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } // Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancelation or death) select { case bodyEOF := <-waitForBodyRead: pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) if bodyEOF { eofc <- struct{}{} //前面所有检查完毕,通知对端开始读取 } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): alive = false pc.t.cancelRequest(rc.req, rc.req.Context().Err()) case <-pc.closech: alive = false } testHookReadLoopBeforeNextRead() } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsswpx.html