golang http/transport 代码分析

http.Client 表示一个 HTTP 客户端,用来处理 HTTP 相关的工作,例如 cookies、redirect、timeout 等,其内部包含一个 Transport 字段,类型为 RoundTripper interface。

// Client is shown here as an excerpt: only the Transport field is
// reproduced, and the remaining fields are elided with "...".
type Client struct {
	// Transport specifies the mechanism by which individual
	// HTTP requests are made.
	// If nil, DefaultTransport is used.
	Transport RoundTripper
	...
}

RoundTripper 定义了执行一次 HTTP 请求时,如何根据 request 返回 response。它的实现必须支持并发,允许多个 goroutine 同时调用:

// RoundTripper describes how a single HTTP transaction is executed:
// given a *Request it returns the corresponding *Response or an error.
// Per the surrounding article, implementations must be safe for
// concurrent use by multiple goroutines.
type RoundTripper interface {
	// RoundTrip performs one request and returns its response.
	RoundTrip(*Request) (*Response, error)
}

如果不给 http.Client 显式指定 RoundTripper,则会使用默认的 DefaultTransport。Transport 用来保存多个请求过程中的一些状态,并缓存 TCP 连接,使后续请求可以重用这些连接,避免每次都新建。Transport 需要同时支持 http 和 https,并且需要支持 HTTP/1.1 和 HTTP/2。DefaultTransport 默认就支持 HTTP/2,如果需要为自定义的 Transport 显式开启 HTTP/2,则调用 golang.org/x/net/http2 包中的 ConfigureTransport。

Transport 必须实现 RoundTripper interface 中的 RoundTrip 方法,其内部逻辑由未导出的 roundTrip 方法完成:

// roundTrip implements a RoundTripper over HTTP.
//
// The excerpt elides the function's prologue ("..."); ctx and trace used
// below are presumably defined there. The visible loop obtains a
// connection via getConn, sends the request on it, and either returns the
// response, returns a non-retryable error, or rewinds the body and retries.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
	...
	for {
		// Bail out before each attempt if the context is already done.
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, ctx.Err()
		default:
		}

		// treq gets modified by roundTrip, so we need to recreate for each retry.
		treq := &transportRequest{Request: req, trace: trace}
		cm, err := t.connectMethodForRequest(treq)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		// Obtain a connection.
		// Get the cached or newly-created connection to either the
		// host (for http or https), the http proxy, or the http proxy
		// pre-CONNECTed to https server. In any case, we'll be ready
		// to send it requests.
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			t.setReqCanceler(req, nil)
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// HTTP/2 path.
			t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host
			t.setReqCanceler(req, nil)   // not cancelable with CancelRequest
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			// Invoke this pconn's roundTrip method to obtain the response.
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			return resp, nil
		}
		if !pconn.shouldRetryRequest(req, err) {
			// Issue 16465: return underlying net.Conn.Read error from peek,
			// as we've historically done.
			if e, ok := err.(transportReadFromServerError); ok {
				err = e.err
			}
			return nil, err
		}
		testHookRoundTripRetried()

		// Rewind the body if we're able to.
		if req.GetBody != nil {
			// Retry with a shallow copy of the request carrying a fresh body.
			newReq := *req
			var err error
			newReq.Body, err = req.GetBody()
			if err != nil {
				return nil, err
			}
			req = &newReq
		}
	}
}

roundTrip 其实就是通过 getConn 获取一个连接 persistConn,并调用其 roundTrip 方法返回 response。其中 getConn 的实现如下:

// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS. If this doesn't return an error, the persistConn
// is ready to write requests to.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}
	// First try to take an idle connection from the idle-connection pool.
	if pc, idleSince := t.getIdleConn(cm); pc != nil {
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(pc.gotIdleConnTrace(idleSince))
		}
		// set request canceler to some non-nil function so we
		// can detect whether it was cleared between now and when
		// we enter roundTrip
		t.setReqCanceler(req, func(error) {})
		return pc, nil
	}

	type dialRes struct {
		pc  *persistConn
		err error
	}
	dialc := make(chan dialRes) // the dial result is delivered asynchronously on this channel
	cmKey := cm.key()           // key identifying a connection

	// Copy these hooks so we don't race on the postPendingDial in
	// the goroutine we launch. Issue 11136.
	testHookPrePendingDial := testHookPrePendingDial
	testHookPostPendingDial := testHookPostPendingDial

	// handlePendingDial is called when getConn returns before its own dial
	// has finished: a goroutine waits for the dial result and either hands
	// the connection to putOrCloseIdleConn or, on dial error, decrements
	// the per-host connection count.
	handlePendingDial := func() {
		testHookPrePendingDial()
		go func() {
			if v := <-dialc; v.err == nil {
				t.putOrCloseIdleConn(v.pc)
			} else {
				t.decHostConnCount(cmKey)
			}
			testHookPostPendingDial()
		}()
	}

	cancelc := make(chan error, 1)
	t.setReqCanceler(req, func(err error) { cancelc <- err })

	// Simultaneously try to bump the recorded per-host connection count,
	// wait for an idle connection, and watch for cancellation.
	if t.MaxConnsPerHost > 0 {
		select {
		case <-t.incHostConnCount(cmKey):
			// count below conn per host limit; proceed
		case pc := <-t.getIdleConnCh(cm):
			if trace != nil && trace.GotConn != nil {
				trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
			}
			return pc, nil
		case <-req.Cancel:
			return nil, errRequestCanceledConn
		case <-req.Context().Done():
			return nil, req.Context().Err()
		case err := <-cancelc:
			if err == errRequestCanceled {
				err = errRequestCanceledConn
			}
			return nil, err
		}
	}

	// Start dialing asynchronously.
	go func() {
		pc, err := t.dialConn(ctx, cm)
		dialc <- dialRes{pc, err}
	}()

	// Wait on several event sources:
	//  1. our own dial completes;
	//  2. another request finishes and an idle connection becomes reusable;
	//  3. the request is canceled.
	// Whichever of (1) and (2) happens first is returned directly.
	// Every case other than receiving from dialc calls handlePendingDial,
	// which decides what happens to the new connection once the dial returns.
	idleConnCh := t.getIdleConnCh(cm)
	select {
	case v := <-dialc:
		// The dial result arrives on this channel once dialing finishes.
		// Our dial finished.
		if v.pc != nil {
			if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
				trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
			}
			return v.pc, nil
		}
		// Our dial failed. See why to return a nicer error
		// value.
		t.decHostConnCount(cmKey)
		select {
		case <-req.Cancel:
			// It was an error due to cancelation, so prioritize that
			// error value. (Issue 16049)
			return nil, errRequestCanceledConn
		case <-req.Context().Done():
			return nil, req.Context().Err()
		case err := <-cancelc:
			if err == errRequestCanceled {
				err = errRequestCanceledConn
			}
			return nil, err
		default:
			// It wasn't an error due to cancelation, so
			// return the original error message:
			return nil, v.err
		}
	case pc := <-idleConnCh:
		// An idle connection became available; return it directly.
		// Another request finished first and its net.Conn
		// became available before our dial. Or somebody
		// else's dial that they didn't use.
		// But our dial is still going, so give it away
		// when it finishes:
		handlePendingDial()
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
		}
		return pc, nil
	case <-req.Cancel:
		handlePendingDial()
		return nil, errRequestCanceledConn
	case <-req.Context().Done():
		handlePendingDial()
		return nil, req.Context().Err()
	case err := <-cancelc:
		handlePendingDial()
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsswpx.html