golang timeoutHandler解析及kubernetes中的变种 (2)

kubernetes 为了防止某个请求hang死之后一直占用连接, 所以会对每个请求进行timeout的处理, 这部分逻辑是在一个handler chain中WithTimeoutForNonLongRunningRequests handler实现。其中返回的WithTimeout的实现如下:

// WithTimeout returns an http.Handler that runs h with a timeout // determined by timeoutFunc. The new http.Handler calls h.ServeHTTP to handle // each request, but if a call runs for longer than its time limit, the // handler responds with a 504 Gateway Timeout error and the message // provided. (If msg is empty, a suitable default message will be sent.) After // the handler times out, writes by h to its http.ResponseWriter will return // http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no // timeout will be enforced. recordFn is a function that will be invoked whenever // a timeout happens. func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, recordFn func(), err *apierrors.StatusError)) http.Handler { return &timeoutHandler{h, timeoutFunc} }

其中主要是timeoutHandler, 实现如下:

type timeoutHandler struct { handler http.Handler timeout func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError) } func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { after, recordFn, err := t.timeout(r) if after == nil { t.handler.ServeHTTP(w, r) return } result := make(chan interface{}) tw := newTimeoutWriter(w) go func() { defer func() { result <- recover() }() t.handler.ServeHTTP(tw, r) }() select { case err := <-result: if err != nil { panic(err) } return case <-after: recordFn() tw.timeout(err) } }

如上, 在ServeHTTP中主要做了几件事情:

调用timeoutHandler.timeout设置一个timer, 如果timeout时间到到达会通过after这个channel传递过来, 后面会监听该channel

创建timeoutWriter对象, 该timeoutWriter中有一个timeout方法, 该方法会在超时之后会被调用

异步调用ServeHTTP并将timeoutWriter传递进去,如果该groutine panic则进行捕获并通过channel传递到调用方groutine, 因为我们不能因为一个groutine panic导致整个进程退出,而且调用方groutine对这些panic信息比较感兴趣,需要传递过去。

监听定时器channel

如果定时器channel超时会调用timeoutWrite.timeout方法,该方法如下:

func (tw *baseTimeoutWriter) timeout(err *apierrors.StatusError) { tw.mu.Lock() defer tw.mu.Unlock() tw.timedOut = true // The timeout writer has not been used by the inner handler. // We can safely timeout the HTTP request by sending by a timeout // handler if !tw.wroteHeader && !tw.hijacked { tw.w.WriteHeader(http.StatusGatewayTimeout) enc := json.NewEncoder(tw.w) enc.Encode(&err.ErrStatus) } else { // The timeout writer has been used by the inner handler. There is // no way to timeout the HTTP request at the point. We have to shutdown // the connection for HTTP1 or reset stream for HTTP2. // // Note from: Brad Fitzpatrick // if the ServeHTTP goroutine panics, that will do the best possible thing for both // HTTP/1 and HTTP/2. In HTTP/1, assuming you're replying with at least HTTP/1.1 and // you've already flushed the headers so it's using HTTP chunking, it'll kill the TCP // connection immediately without a proper 0-byte EOF chunk, so the peer will recognize // the response as bogus. In HTTP/2 the server will just RST_STREAM the stream, leaving // the TCP connection open, but resetting the stream to the peer so it'll have an error, // like the HTTP/1 case. panic(errConnKilled) } }

可以看到, 如果此时还没有写入任何数据, 则直接返回504状态码, 否则直接panic。 上面有一大段注释说明为什么panic, 这段注释的出处在kubernetes issue:
API server panics when writing response #29001。 引用的是golang http包作者 Brad Fitzpatrick的话, 意思是: 如果我们已经往一个writer中写入了部分数据,我们是没有办法timeout, 此时goroutine panic或许是最好的选择, 无论是对于HTTP/1.1还是HTTP/2.0, 如果是HTTP/1.1, 他不会发送任何数据,直接断开tcp连接, 此时对端就能够识别出来server异常,如果是HTTP/2.0 此时srever会RST_STREAM该stream, 并且不会影响connnection, 对端也能够很好的处理。 这部分代码还是很有意思的, 很难想象kubernetes会以panic掉groutine的方式来处理一个request的超时。

panic掉一个groutine, 如果你上层没有任何recover机制的话, 整个程序都会退出, 对于kubenernetes apiserver肯定是不能接受的, kubernetes在每个request的handler chain中会有一个genericfilters.WithPanicRecovery进行捕获这样的panic, 避免整个进程崩溃。

Other

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdzdf.html